PostgreSQL Source Code git master
Loading...
Searching...
No Matches
snapbuild.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/heapam_xlog.h"
#include "access/transam.h"
#include "access/xact.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/snapbuild_internal.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/standby.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/snapshot.h"
#include "utils/wait_event.h"
Include dependency graph for snapbuild.c:

Go to the source code of this file.

Macros

#define SnapBuildOnDiskConstantSize    offsetof(SnapBuildOnDisk, builder)
 
#define SnapBuildOnDiskNotChecksummedSize    offsetof(SnapBuildOnDisk, version)
 
#define SNAPBUILD_MAGIC   0x51A1E001
 
#define SNAPBUILD_VERSION   6
 

Functions

static void SnapBuildPurgeOlderTxn (SnapBuild *builder)
 
static Snapshot SnapBuildBuildSnapshot (SnapBuild *builder)
 
static void SnapBuildFreeSnapshot (Snapshot snap)
 
static void SnapBuildSnapIncRefcount (Snapshot snap)
 
static void SnapBuildDistributeSnapshotAndInval (SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 
static bool SnapBuildXidHasCatalogChanges (SnapBuild *builder, TransactionId xid, uint32 xinfo)
 
static bool SnapBuildFindSnapshot (SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
 
static void SnapBuildWaitSnapshot (xl_running_xacts *running, TransactionId cutoff)
 
static void SnapBuildSerialize (SnapBuild *builder, XLogRecPtr lsn)
 
static bool SnapBuildRestore (SnapBuild *builder, XLogRecPtr lsn)
 
static void SnapBuildRestoreContents (int fd, void *dest, Size size, const char *path)
 
SnapBuild * AllocateSnapshotBuilder (ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, bool in_slot_creation, XLogRecPtr two_phase_at)
 
void FreeSnapshotBuilder (SnapBuild *builder)
 
SnapBuildState SnapBuildCurrentState (SnapBuild *builder)
 
XLogRecPtr SnapBuildGetTwoPhaseAt (SnapBuild *builder)
 
void SnapBuildSetTwoPhaseAt (SnapBuild *builder, XLogRecPtr ptr)
 
bool SnapBuildXactNeedsSkip (SnapBuild *builder, XLogRecPtr ptr)
 
void SnapBuildSnapDecRefcount (Snapshot snap)
 
Snapshot SnapBuildInitialSnapshot (SnapBuild *builder)
 
const char * SnapBuildExportSnapshot (SnapBuild *builder)
 
Snapshot SnapBuildGetOrBuildSnapshot (SnapBuild *builder)
 
void SnapBuildClearExportedSnapshot (void)
 
void SnapBuildResetExportedSnapshotState (void)
 
bool SnapBuildProcessChange (SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 
void SnapBuildProcessNewCid (SnapBuild *builder, TransactionId xid, XLogRecPtr lsn, xl_heap_new_cid *xlrec)
 
static void SnapBuildAddCommittedTxn (SnapBuild *builder, TransactionId xid)
 
void SnapBuildCommitTxn (SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, TransactionId *subxacts, uint32 xinfo)
 
void SnapBuildProcessRunningXacts (SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
 
void SnapBuildSerializationPoint (SnapBuild *builder, XLogRecPtr lsn)
 
bool SnapBuildRestoreSnapshot (SnapBuildOnDisk *ondisk, XLogRecPtr lsn, MemoryContext context, bool missing_ok)
 
void CheckPointSnapBuild (void)
 
bool SnapBuildSnapshotExists (XLogRecPtr lsn)
 

Variables

static ResourceOwner SavedResourceOwnerDuringExport = NULL
 
static bool ExportInProgress = false
 

Macro Definition Documentation

◆ SNAPBUILD_MAGIC

#define SNAPBUILD_MAGIC   0x51A1E001

Definition at line 1478 of file snapbuild.c.

◆ SNAPBUILD_VERSION

#define SNAPBUILD_VERSION   6

Definition at line 1479 of file snapbuild.c.

◆ SnapBuildOnDiskConstantSize

#define SnapBuildOnDiskConstantSize    offsetof(SnapBuildOnDisk, builder)

Definition at line 1473 of file snapbuild.c.

1489{
1490 if (builder->state < SNAPBUILD_CONSISTENT)
1491 SnapBuildRestore(builder, lsn);
1492 else
1493 SnapBuildSerialize(builder, lsn);
1494}
1495
1496/*
1497 * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1498 * been done by another decoding process.
1499 */
1500static void
1502{
1504 SnapBuildOnDisk *ondisk = NULL;
1507 size_t catchange_xcnt;
1508 char *ondisk_c;
1509 int fd;
1510 char tmppath[MAXPGPATH];
1511 char path[MAXPGPATH];
1512 int ret;
1513 struct stat stat_buf;
1514 Size sz;
1515
1518 builder->last_serialized_snapshot <= lsn);
1519
1520 /*
1521 * no point in serializing if we cannot continue to work immediately after
1522 * restoring the snapshot
1523 */
1524 if (builder->state < SNAPBUILD_CONSISTENT)
1525 return;
1526
1527 /* consistent snapshots have no next phase */
1529
1530 /*
1531 * We identify snapshots by the LSN they are valid for. We don't need to
1532 * include timelines in the name as each LSN maps to exactly one timeline
1533 * unless the user used pg_resetwal or similar. If a user did so, there's
1534 * no hope continuing to decode anyway.
1535 */
1536 sprintf(path, "%s/%X-%X.snap",
1538 LSN_FORMAT_ARGS(lsn));
1539
1540 /*
1541 * first check whether some other backend already has written the snapshot
1542 * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1543 * as a valid state. Everything else is an unexpected error.
1544 */
1545 ret = stat(path, &stat_buf);
1546
1547 if (ret != 0 && errno != ENOENT)
1548 ereport(ERROR,
1550 errmsg("could not stat file \"%s\": %m", path)));
1551
1552 else if (ret == 0)
1553 {
1554 /*
1555 * somebody else has already serialized to this point, don't overwrite
1556 * but remember location, so we don't need to read old data again.
1557 *
1558 * To be sure it has been synced to disk after the rename() from the
1559 * tempfile filename to the real filename, we just repeat the fsync.
1560 * That ought to be cheap because in most scenarios it should already
1561 * be safely on disk.
1562 */
1563 fsync_fname(path, false);
1565
1566 builder->last_serialized_snapshot = lsn;
1567 goto out;
1568 }
1569
1570 /*
1571 * there is an obvious race condition here between the time we stat(2) the
1572 * file and us writing the file. But we rename the file into place
1573 * atomically and all files created need to contain the same data anyway,
1574 * so this is perfectly fine, although a bit of a resource waste. Locking
1575 * seems like pointless complication.
1576 */
1577 elog(DEBUG1, "serializing snapshot to %s", path);
1578
1579 /* to make sure only we will write to this tempfile, include pid */
1580 sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
1583
1584 /*
1585 * Unlink temporary file if it already exists, needs to have been before a
1586 * crash/error since we won't enter this function twice from within a
1587 * single decoding slot/backend and the temporary file contains the pid of
1588 * the current process.
1589 */
1590 if (unlink(tmppath) != 0 && errno != ENOENT)
1591 ereport(ERROR,
1593 errmsg("could not remove file \"%s\": %m", tmppath)));
1594
1596
1597 /* Get the catalog modifying transactions that are yet not committed */
1600
1601 needed_length = sizeof(SnapBuildOnDisk) +
1602 sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
1603
1605 ondisk = (SnapBuildOnDisk *) ondisk_c;
1606 ondisk->magic = SNAPBUILD_MAGIC;
1607 ondisk->version = SNAPBUILD_VERSION;
1608 ondisk->length = needed_length;
1609 INIT_CRC32C(ondisk->checksum);
1610 COMP_CRC32C(ondisk->checksum,
1611 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1613 ondisk_c += sizeof(SnapBuildOnDisk);
1614
1615 memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1616 /* NULL-ify memory-only data */
1617 ondisk->builder.context = NULL;
1618 ondisk->builder.snapshot = NULL;
1619 ondisk->builder.reorder = NULL;
1620 ondisk->builder.committed.xip = NULL;
1621 ondisk->builder.catchange.xip = NULL;
1622 /* update catchange only on disk data */
1624
1625 COMP_CRC32C(ondisk->checksum,
1626 &ondisk->builder,
1627 sizeof(SnapBuild));
1628
1629 /* copy committed xacts */
1630 if (builder->committed.xcnt > 0)
1631 {
1632 sz = sizeof(TransactionId) * builder->committed.xcnt;
1633 memcpy(ondisk_c, builder->committed.xip, sz);
1634 COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1635 ondisk_c += sz;
1636 }
1637
1638 /* copy catalog modifying xacts */
1639 if (catchange_xcnt > 0)
1640 {
1641 sz = sizeof(TransactionId) * catchange_xcnt;
1643 COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1644 ondisk_c += sz;
1645 }
1646
1647 FIN_CRC32C(ondisk->checksum);
1648
1649 /* we have valid data now, open tempfile and write it there */
1652 if (fd < 0)
1653 ereport(ERROR,
1655 errmsg("could not open file \"%s\": %m", tmppath)));
1656
1657 errno = 0;
1659 if ((write(fd, ondisk, needed_length)) != needed_length)
1660 {
1661 int save_errno = errno;
1662
1664
1665 /* if write didn't set errno, assume problem is no disk space */
1667 ereport(ERROR,
1669 errmsg("could not write to file \"%s\": %m", tmppath)));
1670 }
1672
1673 /*
1674 * fsync the file before renaming so that even if we crash after this we
1675 * have either a fully valid file or nothing.
1676 *
1677 * It's safe to just ERROR on fsync() here because we'll retry the whole
1678 * operation including the writes.
1679 *
1680 * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1681 * some noticeable overhead since it's performed synchronously during
1682 * decoding?
1683 */
1685 if (pg_fsync(fd) != 0)
1686 {
1687 int save_errno = errno;
1688
1690 errno = save_errno;
1691 ereport(ERROR,
1693 errmsg("could not fsync file \"%s\": %m", tmppath)));
1694 }
1696
1697 if (CloseTransientFile(fd) != 0)
1698 ereport(ERROR,
1700 errmsg("could not close file \"%s\": %m", tmppath)));
1701
1703
1704 /*
1705 * We may overwrite the work from some other backend, but that's ok, our
1706 * snapshot is valid as well, we'll just have done some superfluous work.
1707 */
1708 if (rename(tmppath, path) != 0)
1709 {
1710 ereport(ERROR,
1712 errmsg("could not rename file \"%s\" to \"%s\": %m",
1713 tmppath, path)));
1714 }
1715
1716 /* make sure we persist */
1717 fsync_fname(path, false);
1719
1720 /*
1721 * Now there's no way we can lose the dumped state anymore, remember this
1722 * as a serialization point.
1723 */
1724 builder->last_serialized_snapshot = lsn;
1725
1727
1728out:
1730 builder->last_serialized_snapshot);
1731 /* be tidy */
1732 if (ondisk)
1733 pfree(ondisk);
1734 if (catchange_xip)
1736}
1737
1738/*
1739 * Restore the logical snapshot file contents to 'ondisk'.
1740 *
1741 * 'context' is the memory context where the catalog modifying/committed xid
1742 * will live.
1743 * If 'missing_ok' is true, will not throw an error if the file is not found.
1744 */
1745bool
1747 MemoryContext context, bool missing_ok)
1748{
1749 int fd;
1750 pg_crc32c checksum;
1751 Size sz;
1752 char path[MAXPGPATH];
1753
1754 sprintf(path, "%s/%X-%X.snap",
1756 LSN_FORMAT_ARGS(lsn));
1757
1759
1760 if (fd < 0)
1761 {
1762 if (missing_ok && errno == ENOENT)
1763 return false;
1764
1765 ereport(ERROR,
1767 errmsg("could not open file \"%s\": %m", path)));
1768 }
1769
1770 /* ----
1771 * Make sure the snapshot had been stored safely to disk, that's normally
1772 * cheap.
1773 * Note that we do not need PANIC here, nobody will be able to use the
1774 * slot without fsyncing, and saving it won't succeed without an fsync()
1775 * either...
1776 * ----
1777 */
1778 fsync_fname(path, false);
1780
1781 /* read statically sized portion of snapshot */
1783
1784 if (ondisk->magic != SNAPBUILD_MAGIC)
1785 ereport(ERROR,
1787 errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1788 path, ondisk->magic, SNAPBUILD_MAGIC)));
1789
1790 if (ondisk->version != SNAPBUILD_VERSION)
1791 ereport(ERROR,
1793 errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1794 path, ondisk->version, SNAPBUILD_VERSION)));
1795
1796 INIT_CRC32C(checksum);
1797 COMP_CRC32C(checksum,
1798 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1800
1801 /* read SnapBuild */
1802 SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
1803 COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
1804
1805 /* restore committed xacts information */
1806 if (ondisk->builder.committed.xcnt > 0)
1807 {
1808 sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
1809 ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
1811 COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
1812 }
1813
1814 /* restore catalog modifying xacts information */
1815 if (ondisk->builder.catchange.xcnt > 0)
1816 {
1817 sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
1818 ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
1820 COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
1821 }
1822
1823 if (CloseTransientFile(fd) != 0)
1824 ereport(ERROR,
1826 errmsg("could not close file \"%s\": %m", path)));
1827
1828 FIN_CRC32C(checksum);
1829
1830 /* verify checksum of what we've read */
1831 if (!EQ_CRC32C(checksum, ondisk->checksum))
1832 ereport(ERROR,
1834 errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1835 path, checksum, ondisk->checksum)));
1836
1837 return true;
1838}
1839
1840/*
1841 * Restore a snapshot into 'builder' if previously one has been stored at the
1842 * location indicated by 'lsn'. Returns true if successful, false otherwise.
1843 */
1844static bool
1846{
1847 SnapBuildOnDisk ondisk;
1848
1849 /* no point in loading a snapshot if we're already there */
1850 if (builder->state == SNAPBUILD_CONSISTENT)
1851 return false;
1852
1853 /* validate and restore the snapshot to 'ondisk' */
1854 if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
1855 return false;
1856
1857 /*
1858 * ok, we now have a sensible snapshot here, figure out if it has more
1859 * information than we have.
1860 */
1861
1862 /*
1863 * We are only interested in consistent snapshots for now, comparing
1864 * whether one incomplete snapshot is more "advanced" seems to be
1865 * unnecessarily complex.
1866 */
1867 if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1869
1870 /*
1871 * Don't use a snapshot that requires an xmin that we cannot guarantee to
1872 * be available.
1873 */
1876
1877 /*
1878 * Consistent snapshots have no next phase. Reset next_phase_at as it is
1879 * possible that an old value may remain.
1880 */
1883
1884 /* ok, we think the snapshot is sensible, copy over everything important */
1885 builder->xmin = ondisk.builder.xmin;
1886 builder->xmax = ondisk.builder.xmax;
1887 builder->state = ondisk.builder.state;
1888
1889 builder->committed.xcnt = ondisk.builder.committed.xcnt;
1890 /* We only allocated/stored xcnt, not xcnt_space xids ! */
1891 /* don't overwrite preallocated xip, if we don't have anything here */
1892 if (builder->committed.xcnt > 0)
1893 {
1894 pfree(builder->committed.xip);
1895 builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1896 builder->committed.xip = ondisk.builder.committed.xip;
1897 }
1898 ondisk.builder.committed.xip = NULL;
1899
1900 /* set catalog modifying transactions */
1901 if (builder->catchange.xip)
1902 pfree(builder->catchange.xip);
1903 builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
1904 builder->catchange.xip = ondisk.builder.catchange.xip;
1905 ondisk.builder.catchange.xip = NULL;
1906
1907 /* our snapshot is not interesting anymore, build a new one */
1908 if (builder->snapshot != NULL)
1909 {
1911 }
1912 builder->snapshot = SnapBuildBuildSnapshot(builder);
1914
1916
1917 Assert(builder->state == SNAPBUILD_CONSISTENT);
1918
1920 errmsg("logical decoding found consistent point at %X/%08X",
1921 LSN_FORMAT_ARGS(lsn)),
1922 errdetail("Logical decoding will begin using saved snapshot."));
1923 return true;
1924
1926 if (ondisk.builder.committed.xip != NULL)
1927 pfree(ondisk.builder.committed.xip);
1928 if (ondisk.builder.catchange.xip != NULL)
1929 pfree(ondisk.builder.catchange.xip);
1930 return false;
1931}
1932
1933/*
1934 * Read the contents of the serialized snapshot to 'dest'.
1935 */
1936static void
1937SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
1938{
1939 int readBytes;
1940
1942 readBytes = read(fd, dest, size);
1944 if (readBytes != size)
1945 {
1946 int save_errno = errno;
1947
1949
1950 if (readBytes < 0)
1951 {
1952 errno = save_errno;
1953 ereport(ERROR,
1955 errmsg("could not read file \"%s\": %m", path)));
1956 }
1957 else
1958 ereport(ERROR,
1960 errmsg("could not read file \"%s\": read %d of %zu",
1961 path, readBytes, size)));
1962 }
1963}
1964
1965/*
1966 * Remove all serialized snapshots that are not required anymore because no
1967 * slot can need them. This doesn't actually have to run during a checkpoint,
1968 * but it's a convenient point to schedule this.
1969 *
1970 * NB: We run this during checkpoints even if logical decoding is disabled so
1971 * we cleanup old slots at some point after it got disabled.
1972 */
1973void
1975{
1976 XLogRecPtr cutoff;
1977 XLogRecPtr redo;
1978 DIR *snap_dir;
1979 struct dirent *snap_de;
1980 char path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
1981
1982 /*
1983 * We start off with a minimum of the last redo pointer. No new
1984 * replication slot will start before that, so that's a safe upper bound
1985 * for removal.
1986 */
1987 redo = GetRedoRecPtr();
1988
1989 /* now check for the restart ptrs from existing slots */
1991
1992 /* don't start earlier than the restart lsn */
1993 if (redo < cutoff)
1994 cutoff = redo;
1995
1998 {
1999 uint32 hi;
2000 uint32 lo;
2001 XLogRecPtr lsn;
2003
2004 if (strcmp(snap_de->d_name, ".") == 0 ||
2005 strcmp(snap_de->d_name, "..") == 0)
2006 continue;
2007
2008 snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
2009 de_type = get_dirent_type(path, snap_de, false, DEBUG1);
2010
2012 {
2013 elog(DEBUG1, "only regular files expected: %s", path);
2014 continue;
2015 }
2016
2017 /*
2018 * temporary filenames from SnapBuildSerialize() include the LSN and
2019 * everything but are postfixed by .$pid.tmp. We can just remove them
2020 * the same as other files because there can be none that are
2021 * currently being written that are older than cutoff.
2022 *
2023 * We just log a message if a file doesn't fit the pattern, it's
2024 * probably some editors lock/state file or similar...
2025 */
2026 if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2027 {
2028 ereport(LOG,
2029 (errmsg("could not parse file name \"%s\"", path)));
2030 continue;
2031 }
2032
2033 lsn = ((uint64) hi) << 32 | lo;
2034
2035 /* check whether we still need it */
2036 if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
2037 {
2038 elog(DEBUG1, "removing snapbuild snapshot %s", path);
2039
2040 /*
2041 * It's not particularly harmful, though strange, if we can't
2042 * remove the file here. Don't prevent the checkpoint from
2043 * completing, that'd be a cure worse than the disease.
2044 */
2045 if (unlink(path) < 0)
2046 {
2047 ereport(LOG,
2049 errmsg("could not remove file \"%s\": %m",
2050 path)));
2051 continue;
2052 }
2053 }
2054 }
2056}
2057
2058/*
2059 * Check if a logical snapshot at the specified point has been serialized.
2060 */
2061bool
2063{
2064 char path[MAXPGPATH];
2065 int ret;
2066 struct stat stat_buf;
2067
2068 sprintf(path, "%s/%X-%X.snap",
2070 LSN_FORMAT_ARGS(lsn));
2071
2072 ret = stat(path, &stat_buf);
2073
2074 if (ret != 0 && errno != ENOENT)
2075 ereport(ERROR,
2077 errmsg("could not stat file \"%s\": %m", path)));
2078
2079 return ret == 0;
2080}
#define Assert(condition)
Definition c.h:943
#define PG_BINARY
Definition c.h:1386
uint64_t uint64
Definition c.h:625
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
size_t Size
Definition c.h:689
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
int errcode_for_file_access(void)
Definition elog.c:898
int errcode(int sqlerrcode)
Definition elog.c:875
#define LOG
Definition elog.h:32
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
int FreeDir(DIR *dir)
Definition fd.c:3009
int CloseTransientFile(int fd)
Definition fd.c:2855
void fsync_fname(const char *fname, bool isdir)
Definition fd.c:757
DIR * AllocateDir(const char *dirname)
Definition fd.c:2891
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition fd.c:2957
int pg_fsync(int fd)
Definition fd.c:390
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition file_utils.c:547
PGFileType
Definition file_utils.h:19
@ PGFILETYPE_REG
Definition file_utils.h:22
@ PGFILETYPE_ERROR
Definition file_utils.h:20
int MyProcPid
Definition globals.c:49
static uint32 dclist_count(const dclist_head *head)
Definition ilist.h:932
#define write(a, b, c)
Definition win32.h:14
#define read(a, b, c)
Definition win32.h:13
#define LogicalDecodingLogLevel()
Definition logical.h:175
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition mcxt.c:1269
void pfree(void *pointer)
Definition mcxt.c:1619
void * palloc0(Size size)
Definition mcxt.c:1420
static char * errmsg
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
#define ERRCODE_DATA_CORRUPTED
#define MAXPGPATH
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:177
#define EQ_CRC32C(c1, c2)
Definition pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:182
#define sprintf
Definition port.h:263
#define snprintf
Definition port.h:261
static int fd(const char *x, int i)
static int fb(int x)
TransactionId * ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
#define PG_LOGICAL_SNAPSHOTS_DIR
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition slot.c:1374
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
Definition snapbuild.c:1501
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition snapbuild.c:332
bool SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn, MemoryContext context, bool missing_ok)
Definition snapbuild.c:1746
#define SNAPBUILD_VERSION
Definition snapbuild.c:1479
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition snapbuild.c:320
#define SnapBuildOnDiskNotChecksummedSize
Definition snapbuild.c:1475
bool SnapBuildSnapshotExists(XLogRecPtr lsn)
Definition snapbuild.c:2062
void CheckPointSnapBuild(void)
Definition snapbuild.c:1974
#define SNAPBUILD_MAGIC
Definition snapbuild.c:1478
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder)
Definition snapbuild.c:364
#define SnapBuildOnDiskConstantSize
Definition snapbuild.c:1473
static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
Definition snapbuild.c:1937
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
Definition snapbuild.c:1845
@ SNAPBUILD_CONSISTENT
Definition snapbuild.h:58
Definition dirent.c:26
dclist_head catchange_txns
SnapBuildState state
TransactionId xmin
TransactionId initial_xmin_horizon
TransactionId xmax
TransactionId * xip
Snapshot snapshot
struct SnapBuild::@127 catchange
TransactionId next_phase_at
XLogRecPtr last_serialized_snapshot
MemoryContext context
struct SnapBuild::@126 committed
ReorderBuffer * reorder
#define InvalidTransactionId
Definition transam.h:31
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:67
static void pgstat_report_wait_end(void)
Definition wait_event.h:83
#define stat
Definition win32_port.h:74
XLogRecPtr GetRedoRecPtr(void)
Definition xlog.c:6937
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21

◆ SnapBuildOnDiskNotChecksummedSize

#define SnapBuildOnDiskNotChecksummedSize    offsetof(SnapBuildOnDisk, version)

Definition at line 1475 of file snapbuild.c.

Function Documentation

◆ AllocateSnapshotBuilder()

SnapBuild * AllocateSnapshotBuilder ( ReorderBuffer * reorder,
TransactionId  xmin_horizon,
XLogRecPtr  start_lsn,
bool  need_full_snapshot,
bool  in_slot_creation,
XLogRecPtr  two_phase_at 
)

Definition at line 189 of file snapbuild.c.

195{
196 MemoryContext context;
197 MemoryContext oldcontext;
198 SnapBuild *builder;
199
200 /* allocate memory in own context, to have better accountability */
202 "snapshot builder context",
204 oldcontext = MemoryContextSwitchTo(context);
205
206 builder = palloc0_object(SnapBuild);
207
208 builder->state = SNAPBUILD_START;
209 builder->context = context;
210 builder->reorder = reorder;
211 /* Other struct members initialized by zeroing via palloc0 above */
212
213 builder->committed.xcnt = 0;
214 builder->committed.xcnt_space = 128; /* arbitrary number */
215 builder->committed.xip =
217 builder->committed.includes_all_transactions = true;
218
219 builder->catchange.xcnt = 0;
220 builder->catchange.xip = NULL;
221
223 builder->start_decoding_at = start_lsn;
224 builder->in_slot_creation = in_slot_creation;
226 builder->two_phase_at = two_phase_at;
227
228 MemoryContextSwitchTo(oldcontext);
229
230 return builder;
231}
#define palloc0_array(type, count)
Definition fe_memutils.h:92
#define palloc0_object(type)
Definition fe_memutils.h:90
MemoryContext CurrentMemoryContext
Definition mcxt.c:161
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
@ SNAPBUILD_START
Definition snapbuild.h:35
XLogRecPtr start_decoding_at
XLogRecPtr two_phase_at
bool building_full_snapshot
bool includes_all_transactions

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, SnapBuild::building_full_snapshot, SnapBuild::catchange, SnapBuild::committed, SnapBuild::context, CurrentMemoryContext, fb(), SnapBuild::in_slot_creation, SnapBuild::includes_all_transactions, SnapBuild::initial_xmin_horizon, MemoryContextSwitchTo(), palloc0_array, palloc0_object, SnapBuild::reorder, SNAPBUILD_START, SnapBuild::start_decoding_at, SnapBuild::state, SnapBuild::two_phase_at, SnapBuild::xcnt, SnapBuild::xcnt_space, and SnapBuild::xip.

Referenced by StartupDecodingContext().

◆ CheckPointSnapBuild()

void CheckPointSnapBuild ( void  )

Definition at line 1974 of file snapbuild.c.

1975{
1976 XLogRecPtr cutoff;
1977 XLogRecPtr redo;
1978 DIR *snap_dir;
1979 struct dirent *snap_de;
1980 char path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
1981
1982 /*
1983 * We start off with a minimum of the last redo pointer. No new
1984 * replication slot will start before that, so that's a safe upper bound
1985 * for removal.
1986 */
1987 redo = GetRedoRecPtr();
1988
1989 /* now check for the restart ptrs from existing slots */
1991
1992 /* don't start earlier than the restart lsn */
1993 if (redo < cutoff)
1994 cutoff = redo;
1995
1998 {
1999 uint32 hi;
2000 uint32 lo;
2001 XLogRecPtr lsn;
2003
2004 if (strcmp(snap_de->d_name, ".") == 0 ||
2005 strcmp(snap_de->d_name, "..") == 0)
2006 continue;
2007
2008 snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
2009 de_type = get_dirent_type(path, snap_de, false, DEBUG1);
2010
2012 {
2013 elog(DEBUG1, "only regular files expected: %s", path);
2014 continue;
2015 }
2016
2017 /*
2018 * temporary filenames from SnapBuildSerialize() include the LSN and
2019 * everything but are postfixed by .$pid.tmp. We can just remove them
2020 * the same as other files because there can be none that are
2021 * currently being written that are older than cutoff.
2022 *
2023 * We just log a message if a file doesn't fit the pattern, it's
2024 * probably some editors lock/state file or similar...
2025 */
2026 if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2027 {
2028 ereport(LOG,
2029 (errmsg("could not parse file name \"%s\"", path)));
2030 continue;
2031 }
2032
2033 lsn = ((uint64) hi) << 32 | lo;
2034
2035 /* check whether we still need it */
2036 if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
2037 {
2038 elog(DEBUG1, "removing snapbuild snapshot %s", path);
2039
2040 /*
2041 * It's not particularly harmful, though strange, if we can't
2042 * remove the file here. Don't prevent the checkpoint from
2043 * completing, that'd be a cure worse than the disease.
2044 */
2045 if (unlink(path) < 0)
2046 {
2047 ereport(LOG,
2049 errmsg("could not remove file \"%s\": %m",
2050 path)));
2051 continue;
2052 }
2053 }
2054 }
2056}

References AllocateDir(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg, fb(), FreeDir(), get_dirent_type(), GetRedoRecPtr(), LOG, MAXPGPATH, PG_LOGICAL_SNAPSHOTS_DIR, PGFILETYPE_ERROR, PGFILETYPE_REG, ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), snprintf, and XLogRecPtrIsValid.

Referenced by CheckPointGuts().

◆ FreeSnapshotBuilder()

void FreeSnapshotBuilder ( SnapBuild * builder)

Definition at line 237 of file snapbuild.c.

238{
239 MemoryContext context = builder->context;
240
241 /* free snapshot explicitly, that contains some error checking */
242 if (builder->snapshot != NULL)
243 {
245 builder->snapshot = NULL;
246 }
247
248 /* other resources are deallocated via memory context reset */
249 MemoryContextDelete(context);
250}
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:475

References SnapBuild::context, fb(), MemoryContextDelete(), SnapBuildSnapDecRefcount(), and SnapBuild::snapshot.

Referenced by FreeDecodingContext().

◆ SnapBuildAddCommittedTxn()

static void SnapBuildAddCommittedTxn ( SnapBuild * builder,
TransactionId  xid 
)
static

Definition at line 832 of file snapbuild.c.

833{
835
836 if (builder->committed.xcnt == builder->committed.xcnt_space)
837 {
838 builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
839
840 elog(DEBUG1, "increasing space for committed transactions to %u",
841 (uint32) builder->committed.xcnt_space);
842
843 builder->committed.xip = repalloc_array(builder->committed.xip,
845 builder->committed.xcnt_space);
846 }
847
848 /*
849 * TODO: It might make sense to keep the array sorted here instead of
850 * doing it every time we build a new snapshot. On the other hand this
851 * gets called repeatedly when a transaction with subtransactions commits.
852 */
853 builder->committed.xip[builder->committed.xcnt++] = xid;
854}
#define repalloc_array(pointer, type, count)
Definition fe_memutils.h:94
#define TransactionIdIsValid(xid)
Definition transam.h:41

References Assert, SnapBuild::committed, DEBUG1, elog, repalloc_array, TransactionIdIsValid, SnapBuild::xcnt, SnapBuild::xcnt_space, and SnapBuild::xip.

Referenced by SnapBuildCommitTxn().

◆ SnapBuildBuildSnapshot()

static Snapshot SnapBuildBuildSnapshot ( SnapBuild builder)
static

Definition at line 364 of file snapbuild.c.

365{
366 Snapshot snapshot;
367 Size ssize;
368
370
371 ssize = sizeof(SnapshotData)
372 + sizeof(TransactionId) * builder->committed.xcnt
373 + sizeof(TransactionId) * 1 /* toplevel xid */ ;
374
375 snapshot = MemoryContextAllocZero(builder->context, ssize);
376
378
379 /*
380 * We misuse the original meaning of SnapshotData's xip and subxip fields
381 * to make the more fitting for our needs.
382 *
383 * In the 'xip' array we store transactions that have to be treated as
384 * committed. Since we will only ever look at tuples from transactions
385 * that have modified the catalog it's more efficient to store those few
386 * that exist between xmin and xmax (frequently there are none).
387 *
388 * Snapshots that are used in transactions that have modified the catalog
389 * also use the 'subxip' array to store their toplevel xid and all the
390 * subtransaction xids so we can recognize when we need to treat rows as
391 * visible that are not in xip but still need to be visible. Subxip only
392 * gets filled when the transaction is copied into the context of a
393 * catalog modifying transaction since we otherwise share a snapshot
394 * between transactions. As long as a txn hasn't modified the catalog it
395 * doesn't need to treat any uncommitted rows as visible, so there is no
396 * need for those xids.
397 *
398 * Both arrays are qsort'ed so that we can use bsearch() on them.
399 */
402
403 snapshot->xmin = builder->xmin;
404 snapshot->xmax = builder->xmax;
405
406 /* store all transactions to be treated as committed by this snapshot */
407 snapshot->xip =
408 (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
409 snapshot->xcnt = builder->committed.xcnt;
410 memcpy(snapshot->xip,
411 builder->committed.xip,
412 builder->committed.xcnt * sizeof(TransactionId));
413
414 /* sort so we can bsearch() */
415 qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
416
417 /*
418 * Initially, subxip is empty, i.e. it's a snapshot to be used by
419 * transactions that don't modify the catalog. Will be filled by
420 * ReorderBufferCopySnap() if necessary.
421 */
422 snapshot->subxcnt = 0;
423 snapshot->subxip = NULL;
424
425 snapshot->suboverflowed = false;
426 snapshot->takenDuringRecovery = false;
427 snapshot->copied = false;
428 snapshot->curcid = FirstCommandId;
429 snapshot->active_count = 0;
430 snapshot->regd_count = 0;
431 snapshot->snapXactCompletionCount = 0;
432
433 return snapshot;
434}
#define FirstCommandId
Definition c.h:752
#define qsort(a, b, c, d)
Definition port.h:496
@ SNAPBUILD_FULL_SNAPSHOT
Definition snapbuild.h:51
@ SNAPSHOT_HISTORIC_MVCC
Definition snapshot.h:105
TransactionId xmin
Definition snapshot.h:153
int32 subxcnt
Definition snapshot.h:177
uint32 regd_count
Definition snapshot.h:201
uint32 active_count
Definition snapshot.h:200
CommandId curcid
Definition snapshot.h:183
uint32 xcnt
Definition snapshot.h:165
TransactionId * subxip
Definition snapshot.h:176
uint64 snapXactCompletionCount
Definition snapshot.h:209
TransactionId xmax
Definition snapshot.h:154
SnapshotType snapshot_type
Definition snapshot.h:140
TransactionId * xip
Definition snapshot.h:164
bool suboverflowed
Definition snapshot.h:178
bool takenDuringRecovery
Definition snapshot.h:180
#define TransactionIdIsNormal(xid)
Definition transam.h:42
int xidComparator(const void *arg1, const void *arg2)
Definition xid.c:152

References SnapshotData::active_count, Assert, SnapBuild::committed, SnapBuild::context, SnapshotData::copied, SnapshotData::curcid, fb(), FirstCommandId, memcpy(), MemoryContextAllocZero(), qsort, SnapshotData::regd_count, SNAPBUILD_FULL_SNAPSHOT, SNAPSHOT_HISTORIC_MVCC, SnapshotData::snapshot_type, SnapshotData::snapXactCompletionCount, SnapBuild::state, SnapshotData::suboverflowed, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::takenDuringRecovery, TransactionIdIsNormal, SnapBuild::xcnt, SnapshotData::xcnt, xidComparator(), SnapBuild::xip, SnapshotData::xip, SnapBuild::xmax, SnapshotData::xmax, SnapBuild::xmin, and SnapshotData::xmin.

Referenced by SnapBuildCommitTxn(), SnapBuildGetOrBuildSnapshot(), SnapBuildInitialSnapshot(), SnapBuildProcessChange(), and SnapBuildRestore().

◆ SnapBuildClearExportedSnapshot()

void SnapBuildClearExportedSnapshot ( void  )

Definition at line 603 of file snapbuild.c.

604{
606
607 /* nothing exported, that is the usual case */
608 if (!ExportInProgress)
609 return;
610
611 if (!IsTransactionState())
612 elog(ERROR, "clearing exported snapshot in wrong transaction state");
613
614 /*
615 * AbortCurrentTransaction() takes care of resetting the snapshot state,
616 * so remember SavedResourceOwnerDuringExport.
617 */
619
620 /* make sure nothing could have ever happened */
622
624}
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
static ResourceOwner SavedResourceOwnerDuringExport
Definition snapbuild.c:154
static bool ExportInProgress
Definition snapbuild.c:155
bool IsTransactionState(void)
Definition xact.c:389
void AbortCurrentTransaction(void)
Definition xact.c:3501

References AbortCurrentTransaction(), CurrentResourceOwner, elog, ERROR, ExportInProgress, fb(), IsTransactionState(), and SavedResourceOwnerDuringExport.

Referenced by exec_replication_command().

◆ SnapBuildCommitTxn()

void SnapBuildCommitTxn ( SnapBuild builder,
XLogRecPtr  lsn,
TransactionId  xid,
int  nsubxacts,
TransactionId subxacts,
uint32  xinfo 
)

Definition at line 944 of file snapbuild.c.

946{
947 int nxact;
948
949 bool needs_snapshot = false;
950 bool needs_timetravel = false;
951 bool sub_needs_timetravel = false;
952
953 TransactionId xmax = xid;
954
955 /*
956 * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
957 * will they be part of a snapshot. So we don't need to record anything.
958 */
959 if (builder->state == SNAPBUILD_START ||
960 (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
961 TransactionIdPrecedes(xid, builder->next_phase_at)))
962 {
963 /* ensure that only commits after this are getting replayed */
964 if (builder->start_decoding_at <= lsn)
965 builder->start_decoding_at = lsn + 1;
966 return;
967 }
968
969 if (builder->state < SNAPBUILD_CONSISTENT)
970 {
971 /* ensure that only commits after this are getting replayed */
972 if (builder->start_decoding_at <= lsn)
973 builder->start_decoding_at = lsn + 1;
974
975 /*
976 * If building an exportable snapshot, force xid to be tracked, even
977 * if the transaction didn't modify the catalog.
978 */
979 if (builder->building_full_snapshot)
980 {
981 needs_timetravel = true;
982 }
983 }
984
985 for (nxact = 0; nxact < nsubxacts; nxact++)
986 {
987 TransactionId subxid = subxacts[nxact];
988
989 /*
990 * Add subtransaction to base snapshot if catalog modifying, we don't
991 * distinguish to toplevel transactions there.
992 */
993 if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
994 {
996 needs_snapshot = true;
997
998 elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
999 xid, subxid);
1000
1001 SnapBuildAddCommittedTxn(builder, subxid);
1002
1003 if (NormalTransactionIdFollows(subxid, xmax))
1004 xmax = subxid;
1005 }
1006
1007 /*
1008 * If we're forcing timetravel we also need visibility information
1009 * about subtransaction, so keep track of subtransaction's state, even
1010 * if not catalog modifying. Don't need to distribute a snapshot in
1011 * that case.
1012 */
1013 else if (needs_timetravel)
1014 {
1015 SnapBuildAddCommittedTxn(builder, subxid);
1016 if (NormalTransactionIdFollows(subxid, xmax))
1017 xmax = subxid;
1018 }
1019 }
1020
1021 /* if top-level modified catalog, it'll need a snapshot */
1022 if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
1023 {
1024 elog(DEBUG2, "found top level transaction %u, with catalog changes",
1025 xid);
1026 needs_snapshot = true;
1027 needs_timetravel = true;
1028 SnapBuildAddCommittedTxn(builder, xid);
1029 }
1030 else if (sub_needs_timetravel)
1031 {
1032 /* track toplevel txn as well, subxact alone isn't meaningful */
1033 elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
1034 xid);
1035 needs_timetravel = true;
1036 SnapBuildAddCommittedTxn(builder, xid);
1037 }
1038 else if (needs_timetravel)
1039 {
1040 elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1041
1042 SnapBuildAddCommittedTxn(builder, xid);
1043 }
1044
1045 if (!needs_timetravel)
1046 {
1047 /* record that we cannot export a general snapshot anymore */
1048 builder->committed.includes_all_transactions = false;
1049 }
1050
1052
1053 /*
1054 * Adjust xmax of the snapshot builder, we only do that for committed,
1055 * catalog modifying, transactions, everything else isn't interesting for
1056 * us since we'll never look at the respective rows.
1057 */
1058 if (needs_timetravel &&
1059 (!TransactionIdIsValid(builder->xmax) ||
1060 TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1061 {
1062 builder->xmax = xmax;
1063 TransactionIdAdvance(builder->xmax);
1064 }
1065
1066 /* if there's any reason to build a historic snapshot, do so now */
1067 if (needs_snapshot)
1068 {
1069 /*
1070 * If we haven't built a complete snapshot yet there's no need to hand
1071 * it out, it wouldn't (and couldn't) be used anyway.
1072 */
1073 if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1074 return;
1075
1076 /*
1077 * Decrease the snapshot builder's refcount of the old snapshot, note
1078 * that it still will be used if it has been handed out to the
1079 * reorderbuffer earlier.
1080 */
1081 if (builder->snapshot)
1083
1084 builder->snapshot = SnapBuildBuildSnapshot(builder);
1085
1086 /* we might need to execute invalidations, add snapshot */
1087 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1088 {
1090 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1091 builder->snapshot);
1092 }
1093
1094 /* refcount of the snapshot builder for the new snapshot */
1096
1097 /*
1098 * Add a new catalog snapshot and invalidations messages to all
1099 * currently running transactions.
1100 */
1101 SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
1102 }
1103}
#define DEBUG2
Definition elog.h:30
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
static void SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
Definition snapbuild.c:832
static bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, uint32 xinfo)
Definition snapbuild.c:1110
static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
Definition snapbuild.c:734
@ SNAPBUILD_BUILDING_SNAPSHOT
Definition snapbuild.h:41
static bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:312
#define NormalTransactionIdFollows(id1, id2)
Definition transam.h:152
#define TransactionIdAdvance(dest)
Definition transam.h:91

References Assert, SnapBuild::building_full_snapshot, SnapBuild::committed, DEBUG1, DEBUG2, elog, fb(), SnapBuild::includes_all_transactions, SnapBuild::next_phase_at, NormalTransactionIdFollows, SnapBuild::reorder, ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), SNAPBUILD_BUILDING_SNAPSHOT, SNAPBUILD_CONSISTENT, SNAPBUILD_FULL_SNAPSHOT, SNAPBUILD_START, SnapBuildAddCommittedTxn(), SnapBuildBuildSnapshot(), SnapBuildDistributeSnapshotAndInval(), SnapBuildSnapDecRefcount(), SnapBuildSnapIncRefcount(), SnapBuildXidHasCatalogChanges(), SnapBuild::snapshot, SnapBuild::start_decoding_at, SnapBuild::state, TransactionIdAdvance, TransactionIdFollowsOrEquals(), TransactionIdIsValid, TransactionIdPrecedes(), and SnapBuild::xmax.

Referenced by DecodeCommit().

◆ SnapBuildCurrentState()

SnapBuildState SnapBuildCurrentState ( SnapBuild builder)

◆ SnapBuildDistributeSnapshotAndInval()

static void SnapBuildDistributeSnapshotAndInval ( SnapBuild builder,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 734 of file snapbuild.c.

735{
737 ReorderBufferTXN *txn;
738
739 /*
740 * Iterate through all toplevel transactions. This can include
741 * subtransactions which we just don't yet know to be that, but that's
742 * fine, they will just get an unnecessary snapshot and invalidations
743 * queued.
744 */
746 {
747 txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
748
750
751 /*
752 * If we don't have a base snapshot yet, there are no changes in this
753 * transaction which in turn implies we don't yet need a snapshot at
754 * all. We'll add a snapshot when the first change gets queued.
755 *
756 * Similarly, we don't need to add invalidations to a transaction
757 * whose base snapshot is not yet set. Once a base snapshot is built,
758 * it will include the xids of committed transactions that have
759 * modified the catalog, thus reflecting the new catalog contents. The
760 * existing catalog cache will have already been invalidated after
761 * processing the invalidations in the transaction that modified
762 * catalogs, ensuring that a fresh cache is constructed during
763 * decoding.
764 *
765 * NB: This works correctly even for subtransactions because
766 * ReorderBufferAssignChild() takes care to transfer the base snapshot
767 * to the top-level transaction, and while iterating the changequeue
768 * we'll get the change from the subtxn.
769 */
770 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
771 continue;
772
773 /*
774 * We don't need to add snapshot or invalidations to prepared
775 * transactions as they should not see the new catalog contents.
776 */
777 if (rbtxn_is_prepared(txn))
778 continue;
779
780 elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
781 txn->xid, LSN_FORMAT_ARGS(lsn));
782
783 /*
784 * increase the snapshot's refcount for the transaction we are handing
785 * it out to
786 */
788 ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
789 builder->snapshot);
790
791 /*
792 * Add invalidation messages to the reorder buffer of in-progress
793 * transactions except the current committed transaction, for which we
794 * will execute invalidations at the end.
795 *
796 * It is required, otherwise, we will end up using the stale catcache
797 * contents built by the current transaction even after its decoding,
798 * which should have been invalidated due to concurrent catalog
799 * changing transaction.
800 *
801 * Distribute only the invalidation messages generated by the current
802 * committed transaction. Invalidation messages received from other
803 * transactions would have already been propagated to the relevant
804 * in-progress transactions. This transaction would have processed
805 * those invalidations, ensuring that subsequent transactions observe
806 * a consistent cache state.
807 */
808 if (txn->xid != xid)
809 {
810 uint32 ninvalidations;
812
813 ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
814 xid, &msgs);
815
816 if (ninvalidations > 0)
817 {
818 Assert(msgs != NULL);
819
821 txn->xid, lsn,
822 ninvalidations, msgs);
823 }
824 }
825 }
826}
#define dlist_foreach(iter, lhead)
Definition ilist.h:623
#define dlist_container(type, membername, ptr)
Definition ilist.h:593
uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)
void ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
#define rbtxn_is_prepared(txn)
TransactionId xid
dlist_head toplevel_by_lsn

References Assert, DEBUG2, dlist_container, dlist_foreach, elog, fb(), LSN_FORMAT_ARGS, rbtxn_is_prepared, SnapBuild::reorder, ReorderBufferAddDistributedInvalidations(), ReorderBufferAddSnapshot(), ReorderBufferGetInvalidations(), ReorderBufferXidHasBaseSnapshot(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, and ReorderBufferTXN::xid.

Referenced by SnapBuildCommitTxn().

◆ SnapBuildExportSnapshot()

const char * SnapBuildExportSnapshot ( SnapBuild builder)

Definition at line 542 of file snapbuild.c.

543{
545 char *snapname;
546
548 elog(ERROR, "cannot export a snapshot from within a transaction");
549
551 elog(ERROR, "can only export one snapshot at a time");
552
554 ExportInProgress = true;
555
557
558 /* There doesn't seem to be a nice API to set these */
560 XactReadOnly = true;
561
563
564 /*
565 * now that we've built a plain snapshot, make it active and use the
566 * normal mechanisms for exporting it
567 */
569
570 ereport(LOG,
571 (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
572 "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
573 snap->xcnt,
574 snapname, snap->xcnt)));
575 return snapname;
576}
int int int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...) pg_attribute_printf(1
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:444
char * ExportSnapshot(Snapshot snapshot)
Definition snapmgr.c:1115
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5040
bool XactReadOnly
Definition xact.c:84
void StartTransactionCommand(void)
Definition xact.c:3109
int XactIsoLevel
Definition xact.c:81
#define XACT_REPEATABLE_READ
Definition xact.h:38

References CurrentResourceOwner, elog, ereport, errmsg_plural(), ERROR, ExportInProgress, ExportSnapshot(), fb(), IsTransactionOrTransactionBlock(), LOG, SavedResourceOwnerDuringExport, SnapBuildInitialSnapshot(), StartTransactionCommand(), XACT_REPEATABLE_READ, XactIsoLevel, and XactReadOnly.

Referenced by CreateReplicationSlot().

◆ SnapBuildFindSnapshot()

static bool SnapBuildFindSnapshot ( SnapBuild builder,
XLogRecPtr  lsn,
xl_running_xacts running 
)
static

Definition at line 1242 of file snapbuild.c.

1243{
1244 /* ---
1245 * Build catalog decoding snapshot incrementally using information about
1246 * the currently running transactions. There are several ways to do that:
1247 *
1248 * a) There were no running transactions when the xl_running_xacts record
1249 * was inserted, jump to CONSISTENT immediately. We might find such a
1250 * state while waiting on c)'s sub-states.
1251 *
1252 * b) This (in a previous run) or another decoding slot serialized a
1253 * snapshot to disk that we can use. Can't use this method while finding
1254 * the start point for decoding changes as the restart LSN would be an
1255 * arbitrary LSN but we need to find the start point to extract changes
1256 * where we won't see the data for partial transactions. Also, we cannot
1257 * use this method when a slot needs a full snapshot for export or direct
1258 * use, as that snapshot will only contain catalog modifying transactions.
1259 *
1260 * c) First incrementally build a snapshot for catalog tuples
1261 * (BUILDING_SNAPSHOT), that requires all, already in-progress,
1262 * transactions to finish. Every transaction starting after that
1263 * (FULL_SNAPSHOT state), has enough information to be decoded. But
1264 * for older running transactions no viable snapshot exists yet, so
1265 * CONSISTENT will only be reached once all of those have finished.
1266 * ---
1267 */
1268
1269 /*
1270 * xl_running_xacts record is older than what we can use, we might not
1271 * have all necessary catalog rows anymore.
1272 */
1275 builder->initial_xmin_horizon))
1276 {
1278 errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
1279 LSN_FORMAT_ARGS(lsn)),
1280 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1281 builder->initial_xmin_horizon, running->oldestRunningXid));
1282
1283
1285
1286 return true;
1287 }
1288
1289 /*
1290 * a) No transaction were running, we can jump to consistent.
1291 *
1292 * This is not affected by races around xl_running_xacts, because we can
1293 * miss transaction commits, but currently not transactions starting.
1294 *
1295 * NB: We might have already started to incrementally assemble a snapshot,
1296 * so we need to be careful to deal with that.
1297 */
1298 if (running->oldestRunningXid == running->nextXid)
1299 {
1300 if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
1301 builder->start_decoding_at <= lsn)
1302 /* can decode everything after this */
1303 builder->start_decoding_at = lsn + 1;
1304
1305 /* As no transactions were running xmin/xmax can be trivially set. */
1306 builder->xmin = running->nextXid; /* < are finished */
1307 builder->xmax = running->nextXid; /* >= are running */
1308
1309 /* so we can safely use the faster comparisons */
1312
1313 builder->state = SNAPBUILD_CONSISTENT;
1315
1317 errmsg("logical decoding found consistent point at %X/%08X",
1318 LSN_FORMAT_ARGS(lsn)),
1319 errdetail("There are no running transactions."));
1320
1321 return false;
1322 }
1323
1324 /*
1325 * b) valid on disk state and while neither building full snapshot nor
1326 * creating a slot.
1327 */
1328 else if (!builder->building_full_snapshot &&
1329 !builder->in_slot_creation &&
1330 SnapBuildRestore(builder, lsn))
1331 {
1332 /* there won't be any state to cleanup */
1333 return false;
1334 }
1335
1336 /*
1337 * c) transition from START to BUILDING_SNAPSHOT.
1338 *
1339 * In START state, and a xl_running_xacts record with running xacts is
1340 * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
1341 * record xl_running_xacts->nextXid. Once all running xacts have finished
1342 * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
1343 * might look like we could use xl_running_xacts's ->xids information to
1344 * get there quicker, but that is problematic because transactions marked
1345 * as running, might already have inserted their commit record - it's
1346 * infeasible to change that with locking.
1347 */
1348 else if (builder->state == SNAPBUILD_START)
1349 {
1351 builder->next_phase_at = running->nextXid;
1352
1353 /*
1354 * Start with an xmin/xmax that's correct for future, when all the
1355 * currently running transactions have finished. We'll update both
1356 * while waiting for the pending transactions to finish.
1357 */
1358 builder->xmin = running->nextXid; /* < are finished */
1359 builder->xmax = running->nextXid; /* >= are running */
1360
1361 /* so we can safely use the faster comparisons */
1364
1365 ereport(LOG,
1366 errmsg("logical decoding found initial starting point at %X/%08X",
1367 LSN_FORMAT_ARGS(lsn)),
1368 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1369 running->xcnt, running->nextXid));
1370
1371 SnapBuildWaitSnapshot(running, running->nextXid);
1372 }
1373
1374 /*
1375 * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1376 *
1377 * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1378 * is >= nextXid from when we switched to BUILDING_SNAPSHOT. This
1379 * means all transactions starting afterwards have enough information to
1380 * be decoded. Switch to FULL_SNAPSHOT.
1381 */
1382 else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1384 running->oldestRunningXid))
1385 {
1386 builder->state = SNAPBUILD_FULL_SNAPSHOT;
1387 builder->next_phase_at = running->nextXid;
1388
1389 ereport(LOG,
1390 errmsg("logical decoding found initial consistent point at %X/%08X",
1391 LSN_FORMAT_ARGS(lsn)),
1392 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1393 running->xcnt, running->nextXid));
1394
1395 SnapBuildWaitSnapshot(running, running->nextXid);
1396 }
1397
1398 /*
1399 * c) transition from FULL_SNAPSHOT to CONSISTENT.
1400 *
1401 * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
1402 * >= nextXid from when we switched to FULL_SNAPSHOT. This means all
1403 * transactions that are currently in progress have a catalog snapshot,
1404 * and all their changes have been collected. Switch to CONSISTENT.
1405 */
1406 else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1408 running->oldestRunningXid))
1409 {
1410 builder->state = SNAPBUILD_CONSISTENT;
1412
1414 errmsg("logical decoding found consistent point at %X/%08X",
1415 LSN_FORMAT_ARGS(lsn)),
1416 errdetail("There are no old transactions anymore."));
1417 }
1418
1419 /*
1420 * We already started to track running xacts and need to wait for all
1421 * in-progress ones to finish. We fall through to the normal processing of
1422 * records so incremental cleanup can be performed.
1423 */
1424 return true;
1425}
int int errdetail_internal(const char *fmt,...) pg_attribute_printf(1
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
Definition snapbuild.c:1439
TransactionId oldestRunningXid
Definition standbydefs.h:53
TransactionId nextXid
Definition standbydefs.h:52
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define NormalTransactionIdPrecedes(id1, id2)
Definition transam.h:147

References Assert, SnapBuild::building_full_snapshot, DEBUG1, ereport, errdetail(), errdetail_internal(), errmsg, errmsg_internal(), SnapBuild::in_slot_creation, SnapBuild::initial_xmin_horizon, InvalidTransactionId, LOG, LogicalDecodingLogLevel, LSN_FORMAT_ARGS, SnapBuild::next_phase_at, xl_running_xacts::nextXid, NormalTransactionIdPrecedes, xl_running_xacts::oldestRunningXid, SNAPBUILD_BUILDING_SNAPSHOT, SNAPBUILD_CONSISTENT, SNAPBUILD_FULL_SNAPSHOT, SNAPBUILD_START, SnapBuildRestore(), SnapBuildWaitSnapshot(), SnapBuild::start_decoding_at, SnapBuild::state, TransactionIdIsNormal, TransactionIdPrecedesOrEquals(), xl_running_xacts::xcnt, XLogRecPtrIsValid, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ SnapBuildFreeSnapshot()

static void SnapBuildFreeSnapshot ( Snapshot  snap)
static

Definition at line 256 of file snapbuild.c.

257{
258 /* make sure we don't get passed an external snapshot */
259 Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
260
261 /* make sure nobody modified our snapshot */
262 Assert(snap->curcid == FirstCommandId);
263 Assert(!snap->suboverflowed);
264 Assert(!snap->takenDuringRecovery);
265 Assert(snap->regd_count == 0);
266
267 /* slightly more likely, so it's checked even without c-asserts */
268 if (snap->copied)
269 elog(ERROR, "cannot free a copied snapshot");
270
271 if (snap->active_count)
272 elog(ERROR, "cannot free an active snapshot");
273
274 pfree(snap);
275}

References Assert, elog, ERROR, fb(), FirstCommandId, pfree(), and SNAPSHOT_HISTORIC_MVCC.

Referenced by SnapBuildSnapDecRefcount().

◆ SnapBuildGetOrBuildSnapshot()

Snapshot SnapBuildGetOrBuildSnapshot ( SnapBuild builder)

Definition at line 582 of file snapbuild.c.

583{
584 Assert(builder->state == SNAPBUILD_CONSISTENT);
585
586 /* only build a new snapshot if we don't have a prebuilt one */
587 if (builder->snapshot == NULL)
588 {
589 builder->snapshot = SnapBuildBuildSnapshot(builder);
590 /* increase refcount for the snapshot builder */
592 }
593
594 return builder->snapshot;
595}

References Assert, fb(), SNAPBUILD_CONSISTENT, SnapBuildBuildSnapshot(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, and SnapBuild::state.

Referenced by logicalmsg_decode().

◆ SnapBuildGetTwoPhaseAt()

XLogRecPtr SnapBuildGetTwoPhaseAt ( SnapBuild builder)

Definition at line 290 of file snapbuild.c.

291{
292 return builder->two_phase_at;
293}

References SnapBuild::two_phase_at.

Referenced by DecodeCommit().

◆ SnapBuildInitialSnapshot()

Snapshot SnapBuildInitialSnapshot ( SnapBuild builder)

Definition at line 444 of file snapbuild.c.

445{
447 TransactionId xid;
450 int newxcnt = 0;
451
454
455 /* don't allow older snapshots */
456 InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */
458 elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
460
461 if (builder->state != SNAPBUILD_CONSISTENT)
462 elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
463
465 elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
466
467 /* so we don't overwrite the existing value */
469 elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
470
471 snap = SnapBuildBuildSnapshot(builder);
472
473 /*
474 * We know that snap->xmin is alive, enforced by the logical xmin
475 * mechanism. Due to that we can do this without locks, we're only
476 * changing our own value.
477 *
478 * Building an initial snapshot is expensive and an unenforced xmin
479 * horizon would have bad consequences, therefore always double-check that
480 * the horizon is enforced.
481 */
485
487 elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
488 safeXid, snap->xmin);
489
490 MyProc->xmin = snap->xmin;
491
492 /* allocate in transaction context */
494
495 /*
496 * snapbuild.c builds transactions in an "inverted" manner, which means it
497 * stores committed transactions in ->xip, not ones in progress. Build a
498 * classical snapshot by marking all non-committed transactions as
499 * in-progress. This can be expensive.
500 */
501 for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
502 {
503 void *test;
504
505 /*
506 * Check whether transaction committed using the decoding snapshot
507 * meaning of ->xip.
508 */
509 test = bsearch(&xid, snap->xip, snap->xcnt,
511
512 if (test == NULL)
513 {
517 errmsg("initial slot snapshot too large")));
518
519 newxip[newxcnt++] = xid;
520 }
521
523 }
524
525 /* adjust remaining snapshot fields as needed */
526 snap->snapshot_type = SNAPSHOT_MVCC;
527 snap->xcnt = newxcnt;
528 snap->xip = newxip;
529
530 return snap;
531}
#define palloc_array(type, count)
Definition fe_memutils.h:91
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition pgbench.c:77
static void test(void)
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
Definition procarray.c:2898
int GetMaxSnapshotXidCount(void)
Definition procarray.c:2008
bool HistoricSnapshotActive(void)
Definition snapmgr.c:1692
bool HaveRegisteredOrActiveSnapshot(void)
Definition snapmgr.c:1644
void InvalidateCatalogSnapshot(void)
Definition snapmgr.c:455
@ SNAPSHOT_MVCC
Definition snapshot.h:46
PGPROC * MyProc
Definition proc.c:71
TransactionId xmin
Definition proc.h:242
static bool TransactionIdFollows(TransactionId id1, TransactionId id2)
Definition transam.h:297

References Assert, SnapBuild::building_full_snapshot, SnapBuild::committed, elog, ereport, errcode(), ERRCODE_T_R_SERIALIZATION_FAILURE, errmsg, ERROR, fb(), GetMaxSnapshotXidCount(), GetOldestSafeDecodingTransactionId(), HaveRegisteredOrActiveSnapshot(), HistoricSnapshotActive(), SnapBuild::includes_all_transactions, InvalidateCatalogSnapshot(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProc, NormalTransactionIdPrecedes, palloc_array, SNAPBUILD_CONSISTENT, SnapBuildBuildSnapshot(), SNAPSHOT_MVCC, SnapBuild::state, test(), TransactionIdAdvance, TransactionIdFollows(), TransactionIdIsValid, XACT_REPEATABLE_READ, XactIsoLevel, xidComparator(), and PGPROC::xmin.

Referenced by CreateReplicationSlot(), RepackWorkerMain(), and SnapBuildExportSnapshot().

◆ SnapBuildProcessChange()

bool SnapBuildProcessChange ( SnapBuild builder,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 642 of file snapbuild.c.

643{
644 /*
645 * We can't handle data in transactions if we haven't built a snapshot
646 * yet, so don't store them.
647 */
648 if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
649 return false;
650
651 /*
652 * No point in keeping track of changes in transactions that we don't have
653 * enough information about to decode. This means that they started before
654 * we got into the SNAPBUILD_FULL_SNAPSHOT state.
655 */
656 if (builder->state < SNAPBUILD_CONSISTENT &&
657 TransactionIdPrecedes(xid, builder->next_phase_at))
658 return false;
659
660 /*
661 * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
662 * be needed to decode the change we're currently processing.
663 */
664 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
665 {
666 /* only build a new snapshot if we don't have a prebuilt one */
667 if (builder->snapshot == NULL)
668 {
669 builder->snapshot = SnapBuildBuildSnapshot(builder);
670 /* increase refcount for the snapshot builder */
671 SnapBuildSnapIncRefcount(builder->snapshot);
672 }
673
674 /*
675 * Increase refcount for the transaction we're handing the snapshot
676 * out to.
677 */
678 SnapBuildSnapIncRefcount(builder->snapshot);
679 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
680 builder->snapshot);
681 }
682
683 return true;
684}

References fb(), SnapBuild::next_phase_at, SnapBuild::reorder, ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), SNAPBUILD_CONSISTENT, SNAPBUILD_FULL_SNAPSHOT, SnapBuildBuildSnapshot(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, SnapBuild::state, and TransactionIdPrecedes().

Referenced by heap2_decode(), heap_decode(), and logicalmsg_decode().

◆ SnapBuildProcessNewCid()

void SnapBuildProcessNewCid ( SnapBuild builder,
TransactionId  xid,
XLogRecPtr  lsn,
xl_heap_new_cid xlrec 
)

Definition at line 692 of file snapbuild.c.

694{
695 CommandId cid;
696
697 /*
698 * we only log new_cid's if a catalog tuple was modified, so mark the
699 * transaction as containing catalog modifications
700 */
701 ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
702
703 ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
704 xlrec->target_locator, xlrec->target_tid,
705 xlrec->cmin, xlrec->cmax,
706 xlrec->combocid);
707
708 /* figure out new command id */
709 if (xlrec->cmin != InvalidCommandId &&
710 xlrec->cmax != InvalidCommandId)
711 cid = Max(xlrec->cmin, xlrec->cmax);
712 else if (xlrec->cmax != InvalidCommandId)
713 cid = xlrec->cmax;
714 else if (xlrec->cmin != InvalidCommandId)
715 cid = xlrec->cmin;
716 else
717 {
718 cid = InvalidCommandId; /* silence compiler */
719 elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
720 }
721
722 ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
723}
#define InvalidCommandId
Definition c.h:753
#define Max(x, y)
Definition c.h:1085
uint32 CommandId
Definition c.h:750
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)

References elog, ERROR, fb(), InvalidCommandId, Max, SnapBuild::reorder, ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), and ReorderBufferXidSetCatalogChanges().

Referenced by heap2_decode().

◆ SnapBuildProcessRunningXacts()

void SnapBuildProcessRunningXacts ( SnapBuild builder,
XLogRecPtr  lsn,
xl_running_xacts running 
)

Definition at line 1140 of file snapbuild.c.

1141{
1142 ReorderBufferTXN *txn;
1143 TransactionId xmin;
1144
1145 /*
1146 * If we're not consistent yet, inspect the record to see whether it
1147 * allows to get closer to being consistent. If we are consistent, dump
1148 * our snapshot so others or we, after a restart, can use it.
1149 */
1150 if (builder->state < SNAPBUILD_CONSISTENT)
1151 {
1152 /* returns false if there's no point in performing cleanup just yet */
1153 if (!SnapBuildFindSnapshot(builder, lsn, running))
1154 return;
1155 }
1156 else
1157 SnapBuildSerialize(builder, lsn);
1158
1159 /*
1160 * Update range of interesting xids based on the running xacts
1161 * information. We don't increase ->xmax using it, because once we are in
1162 * a consistent state we can do that ourselves and much more efficiently
1163 * so, because we only need to do it for catalog transactions since we
1164 * only ever look at those.
1165 *
1166 * NB: We only increase xmax when a catalog modifying transaction commits
1167 * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1168 * xmin, which looks odd but is correct and actually more efficient, since
1169 * we hit fast paths in heapam_visibility.c.
1170 */
1171 builder->xmin = running->oldestRunningXid;
1172
1173 /* Remove transactions we don't need to keep track off anymore */
1174 SnapBuildPurgeOlderTxn(builder);
1175
1176 /*
1177 * Advance the xmin limit for the current replication slot, to allow
1178 * vacuum to clean up the tuples this slot has been protecting.
1179 *
1180 * The reorderbuffer might have an xmin among the currently running
1181 * snapshots; use it if so. If not, we need only consider the snapshots
1182 * we'll produce later, which can't be less than the oldest running xid in
1183 * the record we're reading now.
1184 */
1185 xmin = ReorderBufferGetOldestXmin(builder->reorder);
1186 if (xmin == InvalidTransactionId)
1187 xmin = running->oldestRunningXid;
1188 elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1189 builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1190 LogicalIncreaseXminForSlot(lsn, xmin);
1191
1192 /*
1193 * Also tell the slot where we can restart decoding from. We don't want to
1194 * do that after every commit because changing that implies an fsync of
1195 * the logical slot's state file, so we only do it every time we see a
1196 * running xacts record.
1197 *
1198 * Do so by looking for the oldest in progress transaction (determined by
1199 * the first LSN of any of its relevant records). Every transaction
1200 * remembers the last location we stored the snapshot to disk before its
1201 * beginning. That point is where we can restart from.
1202 */
1203
1204 /*
1205 * Can't know about a serialized snapshot's location if we're not
1206 * consistent.
1207 */
1208 if (builder->state < SNAPBUILD_CONSISTENT)
1209 return;
1210
1211 txn = ReorderBufferGetOldestTXN(builder->reorder);
1212
1213 /*
1214 * oldest ongoing txn might have started when we didn't yet serialize
1215 * anything because we hadn't reached a consistent state yet.
1216 */
1217 if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn))
1218 LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1219
1220 /*
1221 * No in-progress transaction, can reuse the last serialized snapshot if
1222 * we have one.
1223 */
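1224 else if (txn == NULL &&
1225 XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) &&
1226 XLogRecPtrIsValid(builder->last_serialized_snapshot))
1227 LogicalIncreaseRestartDecodingForSlot(lsn,
1228 builder->last_serialized_snapshot);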
1229}
#define DEBUG3
Definition elog.h:29
void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
Definition logical.c:1737
void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
Definition logical.c:1669
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
Definition snapbuild.c:1242
static void SnapBuildPurgeOlderTxn(SnapBuild *builder)
Definition snapbuild.c:867
XLogRecPtr restart_decoding_lsn
XLogRecPtr current_restart_decoding_lsn

References ReorderBuffer::current_restart_decoding_lsn, DEBUG3, elog, fb(), InvalidTransactionId, SnapBuild::last_serialized_snapshot, LogicalIncreaseRestartDecodingForSlot(), LogicalIncreaseXminForSlot(), xl_running_xacts::oldestRunningXid, SnapBuild::reorder, ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferTXN::restart_decoding_lsn, SNAPBUILD_CONSISTENT, SnapBuildFindSnapshot(), SnapBuildPurgeOlderTxn(), SnapBuildSerialize(), SnapBuild::state, XLogRecPtrIsValid, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by standby_decode().

◆ SnapBuildPurgeOlderTxn()

static void SnapBuildPurgeOlderTxn ( SnapBuild builder)
static

Definition at line 867 of file snapbuild.c.

868{
869 int off;
870 TransactionId *workspace;
871 int surviving_xids = 0;
872
873 /* not ready yet */
874 if (!TransactionIdIsNormal(builder->xmin))
875 return;
876
877 /* TODO: Neater algorithm than just copying and iterating? */
878 workspace =
879 MemoryContextAlloc(builder->context,
880 builder->committed.xcnt * sizeof(TransactionId));
881
882 /* copy xids that still are interesting to workspace */
883 for (off = 0; off < builder->committed.xcnt; off++)
884 {
885 if (NormalTransactionIdPrecedes(builder->committed.xip[off],
886 builder->xmin))
887 ; /* remove */
888 else
889 workspace[surviving_xids++] = builder->committed.xip[off];
890 }
891
892 /* copy workspace back to persistent state */
893 memcpy(builder->committed.xip, workspace,
894 surviving_xids * sizeof(TransactionId));
895
896 elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
897 (uint32) builder->committed.xcnt, (uint32) surviving_xids,
898 builder->xmin, builder->xmax);
899 builder->committed.xcnt = surviving_xids;
900
901 pfree(workspace);
902
903 /*
904 * Purge xids in ->catchange as well. The purged array must also be sorted
905 * in xidComparator order.
906 */
907 if (builder->catchange.xcnt > 0)
908 {
909 /*
910 * Since catchange.xip is sorted, we find the lower bound of xids that
911 * are still interesting.
912 */
913 for (off = 0; off < builder->catchange.xcnt; off++)
914 {
915 if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
916 builder->xmin))
917 break;
918 }
919
920 surviving_xids = builder->catchange.xcnt - off;
921
922 if (surviving_xids > 0)
923 {
924 memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
925 surviving_xids * sizeof(TransactionId));
926 }
927 else
928 {
929 pfree(builder->catchange.xip);
930 builder->catchange.xip = NULL;
931 }
932
933 elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
934 (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
935 builder->xmin, builder->xmax);
936 builder->catchange.xcnt = surviving_xids;
937 }
938}
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1235

References SnapBuild::catchange, SnapBuild::committed, SnapBuild::context, DEBUG3, elog, fb(), memcpy(), MemoryContextAlloc(), NormalTransactionIdPrecedes, pfree(), TransactionIdFollowsOrEquals(), TransactionIdIsNormal, SnapBuild::xcnt, SnapBuild::xip, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ SnapBuildResetExportedSnapshotState()

void SnapBuildResetExportedSnapshotState ( void  )

Definition at line 630 of file snapbuild.c.

631{
632 SavedResourceOwnerDuringExport = NULL;
633 ExportInProgress = false;
634}

References ExportInProgress, fb(), and SavedResourceOwnerDuringExport.

Referenced by AbortTransaction().

◆ SnapBuildRestore()

static bool SnapBuildRestore ( SnapBuild builder,
XLogRecPtr  lsn 
)
static

Definition at line 1845 of file snapbuild.c.

1846{
1847 SnapBuildOnDisk ondisk;
1848
1849 /* no point in loading a snapshot if we're already there */
1850 if (builder->state == SNAPBUILD_CONSISTENT)
1851 return false;
1852
1853 /* validate and restore the snapshot to 'ondisk' */
1854 if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
1855 return false;
1856
1857 /*
1858 * ok, we now have a sensible snapshot here, figure out if it has more
1859 * information than we have.
1860 */
1861
1862 /*
1863 * We are only interested in consistent snapshots for now, comparing
1864 * whether one incomplete snapshot is more "advanced" seems to be
1865 * unnecessarily complex.
1866 */
1867 if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1868 goto snapshot_not_interesting;
1869
1870 /*
1871 * Don't use a snapshot that requires an xmin that we cannot guarantee to
1872 * be available.
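1873 */
1874 if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1875 goto snapshot_not_interesting;
1876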
1877 /*
1878 * Consistent snapshots have no next phase. Reset next_phase_at as it is
1879 * possible that an old value may remain.
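1880 */
1881 Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
1882 builder->next_phase_at = InvalidTransactionId;
1883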
1884 /* ok, we think the snapshot is sensible, copy over everything important */
1885 builder->xmin = ondisk.builder.xmin;
1886 builder->xmax = ondisk.builder.xmax;
1887 builder->state = ondisk.builder.state;
1888
1889 builder->committed.xcnt = ondisk.builder.committed.xcnt;
1890 /* We only allocated/stored xcnt, not xcnt_space xids ! */
1891 /* don't overwrite preallocated xip, if we don't have anything here */
1892 if (builder->committed.xcnt > 0)
1893 {
1894 pfree(builder->committed.xip);
1895 builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1896 builder->committed.xip = ondisk.builder.committed.xip;
1897 }
1898 ondisk.builder.committed.xip = NULL;
1899
1900 /* set catalog modifying transactions */
1901 if (builder->catchange.xip)
1902 pfree(builder->catchange.xip);
1903 builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
1904 builder->catchange.xip = ondisk.builder.catchange.xip;
1905 ondisk.builder.catchange.xip = NULL;
1906
1907 /* our snapshot is not interesting anymore, build a new one */
1908 if (builder->snapshot != NULL)
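1909 {
1910 SnapBuildSnapDecRefcount(builder->snapshot);
1911 }
1912 builder->snapshot = SnapBuildBuildSnapshot(builder);
1913 SnapBuildSnapIncRefcount(builder->snapshot);
1914
1915 ReorderBufferSetRestartPoint(builder->reorder, lsn);
1916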
1917 Assert(builder->state == SNAPBUILD_CONSISTENT);
1918
1920 errmsg("logical decoding found consistent point at %X/%08X",
1921 LSN_FORMAT_ARGS(lsn)),
1922 errdetail("Logical decoding will begin using saved snapshot."));
1923 return true;
1924
1925snapshot_not_interesting:
1926 if (ondisk.builder.committed.xip != NULL)
1927 pfree(ondisk.builder.committed.xip);
1928 if (ondisk.builder.catchange.xip != NULL)
1929 pfree(ondisk.builder.catchange.xip);
1930 return false;
1931}

References Assert, SnapBuildOnDisk::builder, SnapBuild::catchange, SnapBuild::committed, SnapBuild::context, ereport, errdetail(), errmsg, fb(), SnapBuild::initial_xmin_horizon, InvalidTransactionId, LogicalDecodingLogLevel, LSN_FORMAT_ARGS, SnapBuild::next_phase_at, pfree(), SnapBuild::reorder, ReorderBufferSetRestartPoint(), SNAPBUILD_CONSISTENT, SnapBuildBuildSnapshot(), SnapBuildRestoreSnapshot(), SnapBuildSnapDecRefcount(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, SnapBuild::state, TransactionIdPrecedes(), SnapBuild::xcnt, SnapBuild::xcnt_space, SnapBuild::xip, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildFindSnapshot(), and SnapBuildSerializationPoint().

◆ SnapBuildRestoreContents()

static void SnapBuildRestoreContents ( int  fd,
void dest,
Size  size,
const char path 
)
static

Definition at line 1937 of file snapbuild.c.

1938{
1939 int readBytes;
1940
1941 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1942 readBytes = read(fd, dest, size);
1943 pgstat_report_wait_end();
1944 if (readBytes != size)
1945 {
1946 int save_errno = errno;
1947
1948 CloseTransientFile(fd);
1949
1950 if (readBytes < 0)
1951 {
1952 errno = save_errno;
1953 ereport(ERROR,
1954 (errcode_for_file_access(),
1955 errmsg("could not read file \"%s\": %m", path)));
1956 }
1957 else
1958 ereport(ERROR,
1959 (errcode(ERRCODE_DATA_CORRUPTED),
1960 errmsg("could not read file \"%s\": read %d of %zu",
1961 path, readBytes, size)));
1962 }
1963}

References CloseTransientFile(), ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg, ERROR, fb(), fd(), pgstat_report_wait_end(), pgstat_report_wait_start(), and read.

Referenced by SnapBuildRestoreSnapshot().

◆ SnapBuildRestoreSnapshot()

bool SnapBuildRestoreSnapshot ( SnapBuildOnDisk ondisk,
XLogRecPtr  lsn,
MemoryContext  context,
bool  missing_ok 
)

Definition at line 1746 of file snapbuild.c.

1748{
1749 int fd;
1750 pg_crc32c checksum;
1751 Size sz;
1752 char path[MAXPGPATH];
1753
1754 sprintf(path, "%s/%X-%X.snap",
1755 PG_LOGICAL_SNAPSHOTS_DIR,
1756 LSN_FORMAT_ARGS(lsn));
1757
1758 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1759
1760 if (fd < 0)
1761 {
1762 if (missing_ok && errno == ENOENT)
1763 return false;
1764
1765 ereport(ERROR,
1766 (errcode_for_file_access(),
1767 errmsg("could not open file \"%s\": %m", path)));
1768 }
1769
1770 /* ----
1771 * Make sure the snapshot had been stored safely to disk, that's normally
1772 * cheap.
1773 * Note that we do not need PANIC here, nobody will be able to use the
1774 * slot without fsyncing, and saving it won't succeed without an fsync()
1775 * either...
1776 * ----
1777 */
1778 fsync_fname(path, false);
1779 fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1780
1781 /* read statically sized portion of snapshot */
1782 SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
1783
1784 if (ondisk->magic != SNAPBUILD_MAGIC)
1785 ereport(ERROR,
1786 (errcode(ERRCODE_DATA_CORRUPTED),
1787 errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1788 path, ondisk->magic, SNAPBUILD_MAGIC)));
1789
1790 if (ondisk->version != SNAPBUILD_VERSION)
1791 ereport(ERROR,
1792 (errcode(ERRCODE_DATA_CORRUPTED),
1793 errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1794 path, ondisk->version, SNAPBUILD_VERSION)));
1795
1796 INIT_CRC32C(checksum);
1797 COMP_CRC32C(checksum,
1798 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1799 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1800
1801 /* read SnapBuild */
1802 SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
1803 COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
1804
1805 /* restore committed xacts information */
1806 if (ondisk->builder.committed.xcnt > 0)
1807 {
1808 sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
1809 ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
1810 SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
1811 COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
1812 }
1813
1814 /* restore catalog modifying xacts information */
1815 if (ondisk->builder.catchange.xcnt > 0)
1816 {
1817 sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
1818 ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
1819 SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
1820 COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
1821 }
1822
1823 if (CloseTransientFile(fd) != 0)
1824 ereport(ERROR,
1825 (errcode_for_file_access(),
1826 errmsg("could not close file \"%s\": %m", path)));
1827
1828 FIN_CRC32C(checksum);
1829
1830 /* verify checksum of what we've read */
1831 if (!EQ_CRC32C(checksum, ondisk->checksum))
1832 ereport(ERROR,
1833 (errcode(ERRCODE_DATA_CORRUPTED),
1834 errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1835 path, checksum, ondisk->checksum)));
1836
1837 return true;
1838}

References SnapBuildOnDisk::builder, SnapBuild::catchange, SnapBuildOnDisk::checksum, CloseTransientFile(), SnapBuild::committed, COMP_CRC32C, EQ_CRC32C, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg, ERROR, fb(), fd(), FIN_CRC32C, fsync_fname(), INIT_CRC32C, LSN_FORMAT_ARGS, SnapBuildOnDisk::magic, MAXPGPATH, MemoryContextAllocZero(), OpenTransientFile(), PG_BINARY, PG_LOGICAL_SNAPSHOTS_DIR, SNAPBUILD_MAGIC, SNAPBUILD_VERSION, SnapBuildOnDiskConstantSize, SnapBuildOnDiskNotChecksummedSize, SnapBuildRestoreContents(), sprintf, SnapBuildOnDisk::version, SnapBuild::xcnt, and SnapBuild::xip.

Referenced by pg_get_logical_snapshot_info(), pg_get_logical_snapshot_meta(), and SnapBuildRestore().

◆ SnapBuildSerializationPoint()

void SnapBuildSerializationPoint ( SnapBuild builder,
XLogRecPtr  lsn 
)

Definition at line 1488 of file snapbuild.c.

1489{
1490 if (builder->state < SNAPBUILD_CONSISTENT)
1491 SnapBuildRestore(builder, lsn);
1492 else
1493 SnapBuildSerialize(builder, lsn);
1494}

References SNAPBUILD_CONSISTENT, SnapBuildRestore(), SnapBuildSerialize(), and SnapBuild::state.

Referenced by xlog_decode().

◆ SnapBuildSerialize()

static void SnapBuildSerialize ( SnapBuild builder,
XLogRecPtr  lsn 
)
static

Definition at line 1501 of file snapbuild.c.

1502{
1504 SnapBuildOnDisk *ondisk = NULL;
1507 size_t catchange_xcnt;
1508 char *ondisk_c;
1509 int fd;
1510 char tmppath[MAXPGPATH];
1511 char path[MAXPGPATH];
1512 int ret;
1513 struct stat stat_buf;
1514 Size sz;
1515
1518 builder->last_serialized_snapshot <= lsn);
1519
1520 /*
1521 * no point in serializing if we cannot continue to work immediately after
1522 * restoring the snapshot
1523 */
1524 if (builder->state < SNAPBUILD_CONSISTENT)
1525 return;
1526
1527 /* consistent snapshots have no next phase */
1529
1530 /*
1531 * We identify snapshots by the LSN they are valid for. We don't need to
1532 * include timelines in the name as each LSN maps to exactly one timeline
1533 * unless the user used pg_resetwal or similar. If a user did so, there's
1534 * no hope continuing to decode anyway.
1535 */
1536 sprintf(path, "%s/%X-%X.snap",
1537 PG_LOGICAL_SNAPSHOTS_DIR,
1538 LSN_FORMAT_ARGS(lsn));
1539
1540 /*
1541 * first check whether some other backend already has written the snapshot
1542 * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1543 * as a valid state. Everything else is an unexpected error.
1544 */
1545 ret = stat(path, &stat_buf);
1546
1547 if (ret != 0 && errno != ENOENT)
1548 ereport(ERROR,
1549 (errcode_for_file_access(),
1550 errmsg("could not stat file \"%s\": %m", path)));
1551
1552 else if (ret == 0)
1553 {
1554 /*
1555 * somebody else has already serialized to this point, don't overwrite
1556 * but remember location, so we don't need to read old data again.
1557 *
1558 * To be sure it has been synced to disk after the rename() from the
1559 * tempfile filename to the real filename, we just repeat the fsync.
1560 * That ought to be cheap because in most scenarios it should already
1561 * be safely on disk.
1562 */
1563 fsync_fname(path, false);
1564 fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1565
1566 builder->last_serialized_snapshot = lsn;
1567 goto out;
1568 }
1569
1570 /*
1571 * there is an obvious race condition here between the time we stat(2) the
1572 * file and us writing the file. But we rename the file into place
1573 * atomically and all files created need to contain the same data anyway,
1574 * so this is perfectly fine, although a bit of a resource waste. Locking
1575 * seems like pointless complication.
1576 */
1577 elog(DEBUG1, "serializing snapshot to %s", path);
1578
1579 /* to make sure only we will write to this tempfile, include pid */
1580 sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
1583
1584 /*
1585 * Unlink temporary file if it already exists, needs to have been before a
1586 * crash/error since we won't enter this function twice from within a
1587 * single decoding slot/backend and the temporary file contains the pid of
1588 * the current process.
1589 */
1590 if (unlink(tmppath) != 0 && errno != ENOENT)
1591 ereport(ERROR,
1592 (errcode_for_file_access(),
1593 errmsg("could not remove file \"%s\": %m", tmppath)));
1594
1596
1597 /* Get the catalog modifying transactions that are yet not committed */
1600
1601 needed_length = sizeof(SnapBuildOnDisk) +
1602 sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
1603
1605 ondisk = (SnapBuildOnDisk *) ondisk_c;
1606 ondisk->magic = SNAPBUILD_MAGIC;
1607 ondisk->version = SNAPBUILD_VERSION;
1608 ondisk->length = needed_length;
1609 INIT_CRC32C(ondisk->checksum);
1610 COMP_CRC32C(ondisk->checksum,
1611 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1613 ondisk_c += sizeof(SnapBuildOnDisk);
1614
1615 memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1616 /* NULL-ify memory-only data */
1617 ondisk->builder.context = NULL;
1618 ondisk->builder.snapshot = NULL;
1619 ondisk->builder.reorder = NULL;
1620 ondisk->builder.committed.xip = NULL;
1621 ondisk->builder.catchange.xip = NULL;
1622 /* update catchange only on disk data */
1624
1625 COMP_CRC32C(ondisk->checksum,
1626 &ondisk->builder,
1627 sizeof(SnapBuild));
1628
1629 /* copy committed xacts */
1630 if (builder->committed.xcnt > 0)
1631 {
1632 sz = sizeof(TransactionId) * builder->committed.xcnt;
1633 memcpy(ondisk_c, builder->committed.xip, sz);
1634 COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1635 ondisk_c += sz;
1636 }
1637
1638 /* copy catalog modifying xacts */
1639 if (catchange_xcnt > 0)
1640 {
1641 sz = sizeof(TransactionId) * catchange_xcnt;
1643 COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1644 ondisk_c += sz;
1645 }
1646
1647 FIN_CRC32C(ondisk->checksum);
1648
1649 /* we have valid data now, open tempfile and write it there */
1652 if (fd < 0)
1653 ereport(ERROR,
1654 (errcode_for_file_access(),
1655 errmsg("could not open file \"%s\": %m", tmppath)));
1656
1657 errno = 0;
1659 if ((write(fd, ondisk, needed_length)) != needed_length)
1660 {
1661 int save_errno = errno;
1662
1664
1665 /* if write didn't set errno, assume problem is no disk space */
1666 errno = save_errno ? save_errno : ENOSPC;
1667 ereport(ERROR,
1668 (errcode_for_file_access(),
1669 errmsg("could not write to file \"%s\": %m", tmppath)));
1670 }
1672
1673 /*
1674 * fsync the file before renaming so that even if we crash after this we
1675 * have either a fully valid file or nothing.
1676 *
1677 * It's safe to just ERROR on fsync() here because we'll retry the whole
1678 * operation including the writes.
1679 *
1680 * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1681 * some noticeable overhead since it's performed synchronously during
1682 * decoding?
1683 */
1685 if (pg_fsync(fd) != 0)
1686 {
1687 int save_errno = errno;
1688
1690 errno = save_errno;
1691 ereport(ERROR,
1692 (errcode_for_file_access(),
1693 errmsg("could not fsync file \"%s\": %m", tmppath)));
1694 }
1696
1697 if (CloseTransientFile(fd) != 0)
1698 ereport(ERROR,
1699 (errcode_for_file_access(),
1700 errmsg("could not close file \"%s\": %m", tmppath)));
1701
1703
1704 /*
1705 * We may overwrite the work from some other backend, but that's ok, our
1706 * snapshot is valid as well, we'll just have done some superfluous work.
1707 */
1708 if (rename(tmppath, path) != 0)
1709 {
1710 ereport(ERROR,
1711 (errcode_for_file_access(),
1712 errmsg("could not rename file \"%s\" to \"%s\": %m",
1713 tmppath, path)));
1714 }
1715
1716 /* make sure we persist */
1717 fsync_fname(path, false);
1718 fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1719
1720 /*
1721 * Now there's no way we can lose the dumped state anymore, remember this
1722 * as a serialization point.
1723 */
1724 builder->last_serialized_snapshot = lsn;
1725
1727
1728out:
1730 builder->last_serialized_snapshot);
1731 /* be tidy */
1732 if (ondisk)
1733 pfree(ondisk);
1734 if (catchange_xip)
1736}

References Assert, SnapBuildOnDisk::builder, SnapBuild::catchange, ReorderBuffer::catchange_txns, SnapBuildOnDisk::checksum, CloseTransientFile(), SnapBuild::committed, COMP_CRC32C, SnapBuild::context, dclist_count(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg, ERROR, fb(), fd(), FIN_CRC32C, fsync_fname(), INIT_CRC32C, InvalidTransactionId, SnapBuild::last_serialized_snapshot, SnapBuildOnDisk::length, LSN_FORMAT_ARGS, SnapBuildOnDisk::magic, MAXPGPATH, memcpy(), MemoryContextSwitchTo(), MyProcPid, SnapBuild::next_phase_at, OpenTransientFile(), palloc0(), pfree(), PG_BINARY, pg_fsync(), PG_LOGICAL_SNAPSHOTS_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), SnapBuild::reorder, ReorderBufferGetCatalogChangesXacts(), ReorderBufferSetRestartPoint(), SNAPBUILD_CONSISTENT, SNAPBUILD_MAGIC, SNAPBUILD_VERSION, SnapBuildOnDiskConstantSize, SnapBuildOnDiskNotChecksummedSize, SnapBuild::snapshot, sprintf, stat, SnapBuild::state, SnapBuildOnDisk::version, write, SnapBuild::xcnt, SnapBuild::xip, and XLogRecPtrIsValid.

Referenced by SnapBuildProcessRunningXacts(), and SnapBuildSerializationPoint().

◆ SnapBuildSetTwoPhaseAt()

void SnapBuildSetTwoPhaseAt ( SnapBuild builder,
XLogRecPtr  ptr 
)

Definition at line 299 of file snapbuild.c.

300{
301 builder->two_phase_at = ptr;
302}

References SnapBuild::two_phase_at.

Referenced by CreateDecodingContext().

◆ SnapBuildSnapDecRefcount()

void SnapBuildSnapDecRefcount ( Snapshot  snap)

Definition at line 332 of file snapbuild.c.

333{
334 /* make sure we don't get passed an external snapshot */
335 Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
336
337 /* make sure nobody modified our snapshot */
338 Assert(snap->curcid == FirstCommandId);
339 Assert(!snap->suboverflowed);
340 Assert(!snap->takenDuringRecovery);
341
342 Assert(snap->regd_count == 0);
343
344 Assert(snap->active_count > 0);
345
346 /* slightly more likely, so it's checked even without casserts */
347 if (snap->copied)
348 elog(ERROR, "cannot free a copied snapshot");
349
350 snap->active_count--;
351 if (snap->active_count == 0)
352 SnapBuildFreeSnapshot(snap);
353}
static void SnapBuildFreeSnapshot(Snapshot snap)
Definition snapbuild.c:256

References Assert, elog, ERROR, fb(), FirstCommandId, SnapBuildFreeSnapshot(), and SNAPSHOT_HISTORIC_MVCC.

Referenced by FreeSnapshotBuilder(), ReorderBufferCleanupTXN(), ReorderBufferFreeSnap(), ReorderBufferTransferSnapToParent(), SnapBuildCommitTxn(), and SnapBuildRestore().

◆ SnapBuildSnapIncRefcount()

static void SnapBuildSnapIncRefcount ( Snapshot  snap)
static

Definition at line 320 of file snapbuild.c.

321{
322 snap->active_count++;
323}

References fb().

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeSnapshotAndInval(), SnapBuildGetOrBuildSnapshot(), SnapBuildProcessChange(), and SnapBuildRestore().

◆ SnapBuildSnapshotExists()

bool SnapBuildSnapshotExists ( XLogRecPtr  lsn)

Definition at line 2062 of file snapbuild.c.

2063{
2064 char path[MAXPGPATH];
2065 int ret;
2066 struct stat stat_buf;
2067
2068 sprintf(path, "%s/%X-%X.snap",
2069 PG_LOGICAL_SNAPSHOTS_DIR,
2070 LSN_FORMAT_ARGS(lsn));
2071
2072 ret = stat(path, &stat_buf);
2073
2074 if (ret != 0 && errno != ENOENT)
2075 ereport(ERROR,
2076 (errcode_for_file_access(),
2077 errmsg("could not stat file \"%s\": %m", path)));
2078
2079 return ret == 0;
2080}

References ereport, errcode_for_file_access(), errmsg, ERROR, fb(), LSN_FORMAT_ARGS, MAXPGPATH, PG_LOGICAL_SNAPSHOTS_DIR, sprintf, and stat.

Referenced by update_local_synced_slot().

◆ SnapBuildWaitSnapshot()

static void SnapBuildWaitSnapshot ( xl_running_xacts running,
TransactionId  cutoff 
)
static

Definition at line 1439 of file snapbuild.c.

1440{
1441 int off;
1442
1443 for (off = 0; off < running->xcnt; off++)
1444 {
1445 TransactionId xid = running->xids[off];
1446
1447 /*
1448 * Upper layers should prevent that we ever need to wait on ourselves.
1449 * Check anyway, since failing to do so would either result in an
1450 * endless wait or an Assert() failure.
1451 */
1452 if (TransactionIdIsCurrentTransactionId(xid))
1453 elog(ERROR, "waiting for ourselves");
1454
1455 if (TransactionIdFollows(xid, cutoff))
1456 continue;
1457
1458 XactLockTableWait(xid, NULL, NULL, XLTW_None);
1459 }
1460
1461 /*
1462 * All transactions we needed to finish finished - try to ensure there is
1463 * another xl_running_xacts record in a timely manner, without having to
1464 * wait for bgwriter or checkpointer to log one. During recovery we can't
1465 * enforce that, so we'll have to wait.
1466 */
1467 if (!RecoveryInProgress())
1468 {
1469 LogStandbySnapshot();
1470 }
1471}
void XactLockTableWait(TransactionId xid, Relation rel, const ItemPointerData *ctid, XLTW_Oper oper)
Definition lmgr.c:663
@ XLTW_None
Definition lmgr.h:26
XLogRecPtr LogStandbySnapshot(void)
Definition standby.c:1284
TransactionId xids[FLEXIBLE_ARRAY_MEMBER]
Definition standbydefs.h:56
bool TransactionIdIsCurrentTransactionId(TransactionId xid)
Definition xact.c:943
bool RecoveryInProgress(void)
Definition xlog.c:6834

References elog, ERROR, fb(), LogStandbySnapshot(), RecoveryInProgress(), TransactionIdFollows(), TransactionIdIsCurrentTransactionId(), XactLockTableWait(), xl_running_xacts::xcnt, xl_running_xacts::xids, and XLTW_None.

Referenced by SnapBuildFindSnapshot().

◆ SnapBuildXactNeedsSkip()

bool SnapBuildXactNeedsSkip ( SnapBuild builder,
XLogRecPtr  ptr 
)

Definition at line 308 of file snapbuild.c.

309{
310 return ptr < builder->start_decoding_at;
311}

References SnapBuild::start_decoding_at.

Referenced by AssertTXNLsnOrder(), DecodeTXNNeedSkip(), logicalmsg_decode(), and ReorderBufferCanStartStreaming().

◆ SnapBuildXidHasCatalogChanges()

static bool SnapBuildXidHasCatalogChanges ( SnapBuild builder,
TransactionId  xid,
uint32  xinfo 
)
inlinestatic

Definition at line 1110 of file snapbuild.c.

1112{
1113 if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1114 return true;
1115
1116 /*
1117 * The transactions that have changed catalogs must have invalidation
1118 * info.
1119 */
1120 if (!(xinfo & XACT_XINFO_HAS_INVALS))
1121 return false;
1122
1123 /* Check the catchange XID array */
1124 return ((builder->catchange.xcnt > 0) &&
1125 (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
1126 sizeof(TransactionId), xidComparator) != NULL));
1127}
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
#define XACT_XINFO_HAS_INVALS
Definition xact.h:192

References SnapBuild::catchange, fb(), SnapBuild::reorder, ReorderBufferXidHasCatalogChanges(), XACT_XINFO_HAS_INVALS, SnapBuild::xcnt, xidComparator(), and SnapBuild::xip.

Referenced by SnapBuildCommitTxn().

Variable Documentation

◆ ExportInProgress

bool ExportInProgress = false
static

◆ SavedResourceOwnerDuringExport

ResourceOwner SavedResourceOwnerDuringExport = NULL
static