215 #define SNAPSHOT_EXPORT_DIR "pg_snapshots" 330 Assert(FirstXactSnapshot == NULL);
334 "cannot take query snapshot during a parallel operation");
389 "cannot update SecondarySnapshot during a parallel operation");
415 Snapshot OldestRegisteredSnapshot = NULL;
422 RegisteredLSN = OldestRegisteredSnapshot->
lsn;
425 if (OldestActiveSnapshot != NULL)
430 return OldestActiveSnapshot->
as_snap;
433 return OldestRegisteredSnapshot;
473 if (CatalogSnapshot &&
478 if (CatalogSnapshot == NULL)
517 CatalogSnapshot = NULL;
535 if (CatalogSnapshot &&
536 ActiveSnapshot == NULL &&
552 CurrentSnapshot->
curcid = curcid;
553 if (SecondarySnapshot)
554 SecondarySnapshot->
curcid = curcid;
568 int sourcepid,
PGPROC *sourceproc)
577 Assert(FirstXactSnapshot == NULL);
593 CurrentSnapshot->
xmin = sourcesnap->
xmin;
594 CurrentSnapshot->
xmax = sourcesnap->
xmax;
595 CurrentSnapshot->
xcnt = sourcesnap->
xcnt;
597 memcpy(CurrentSnapshot->
xip, sourcesnap->
xip,
618 if (sourceproc != NULL)
622 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
623 errmsg(
"could not import the requested snapshot"),
624 errdetail(
"The source transaction is not running anymore.")));
628 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
629 errmsg(
"could not import the requested snapshot"),
630 errdetail(
"The source process with PID %d is not running anymore.",
684 if (snapshot->
xcnt > 0)
687 memcpy(newsnap->
xip, snapshot->
xip,
747 if (snap == CurrentSnapshot || snap == SecondarySnapshot || !snap->
copied)
757 ActiveSnapshot = newactive;
758 if (OldestActiveSnapshot == NULL)
788 Assert(ActiveSnapshot != NULL);
803 elog(
ERROR,
"cannot modify commandid in active snapshot during a parallel operation");
818 newstack = ActiveSnapshot->
as_next;
828 pfree(ActiveSnapshot);
829 ActiveSnapshot = newstack;
830 if (ActiveSnapshot == NULL)
831 OldestActiveSnapshot = NULL;
843 Assert(ActiveSnapshot != NULL);
845 return ActiveSnapshot->
as_snap;
855 return ActiveSnapshot != NULL;
909 if (snapshot == NULL)
922 if (snapshot == NULL)
982 epoch = nextxid_epoch - 1;
984 epoch = nextxid_epoch;
1014 if (ActiveSnapshot != NULL)
1042 for (active = ActiveSnapshot; active != NULL; active = active->
as_next)
1058 while (ActiveSnapshot && ActiveSnapshot->
as_level >= level)
1062 next = ActiveSnapshot->
as_next;
1076 pfree(ActiveSnapshot);
1078 ActiveSnapshot =
next;
1079 if (ActiveSnapshot == NULL)
1080 OldestActiveSnapshot = NULL;
1102 if (FirstXactSnapshot != NULL)
1108 FirstXactSnapshot = NULL;
1113 if (exportedSnapshots !=
NIL)
1128 foreach(lc, exportedSnapshots)
1140 exportedSnapshots =
NIL;
1152 elog(
WARNING,
"registered snapshots seem to remain after cleanup");
1155 for (active = ActiveSnapshot; active != NULL; active = active->
as_next)
1156 elog(
WARNING,
"snapshot %p still active", active);
1163 ActiveSnapshot = NULL;
1164 OldestActiveSnapshot = NULL;
1167 CurrentSnapshot = NULL;
1168 SecondarySnapshot = NULL;
1231 (
errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1232 errmsg(
"cannot export a snapshot from a subtransaction")));
1260 exportedSnapshots =
lappend(exportedSnapshots, esnap);
1296 for (i = 0; i < snapshot->
xcnt; i++)
1312 for (i = 0; i < snapshot->
subxcnt; i++)
1314 for (i = 0; i < nchildren; i++)
1325 snprintf(pathtmp,
sizeof(pathtmp),
"%s.tmp", path);
1329 errmsg(
"could not create file \"%s\": %m", pathtmp)));
1331 if (fwrite(buf.
data, buf.
len, 1, f) != 1)
1334 errmsg(
"could not write to file \"%s\": %m", pathtmp)));
1341 errmsg(
"could not write to file \"%s\": %m", pathtmp)));
1347 if (rename(pathtmp, path) < 0)
1350 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
1385 int prefixlen = strlen(prefix);
1388 if (strncmp(ptr, prefix, prefixlen) != 0)
1390 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1391 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1393 if (sscanf(ptr,
"%d", &val) != 1)
1395 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1396 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1397 ptr = strchr(ptr,
'\n');
1400 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1401 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1410 int prefixlen = strlen(prefix);
1413 if (strncmp(ptr, prefix, prefixlen) != 0)
1415 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1416 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1418 if (sscanf(ptr,
"%u", &val) != 1)
1420 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1421 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1422 ptr = strchr(ptr,
'\n');
1425 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1426 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1436 int prefixlen = strlen(prefix);
1438 if (strncmp(ptr, prefix, prefixlen) != 0)
1440 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1441 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1445 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1446 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1447 ptr = strchr(ptr,
'\n');
1450 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1451 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1466 struct stat stat_buf;
1487 (
errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1488 errmsg(
"SET TRANSACTION SNAPSHOT must be called before any query")));
1496 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1497 errmsg(
"a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
1503 if (strspn(idstr,
"0123456789ABCDEF-") != strlen(idstr))
1505 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1506 errmsg(
"invalid snapshot identifier: \"%s\"", idstr)));
1514 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1515 errmsg(
"invalid snapshot identifier: \"%s\"", idstr)));
1518 if (fstat(fileno(f), &stat_buf))
1519 elog(
ERROR,
"could not stat file \"%s\": %m", path);
1522 filebuf = (
char *)
palloc(stat_buf.st_size + 1);
1523 if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
1524 elog(
ERROR,
"could not read file \"%s\": %m", path);
1526 filebuf[stat_buf.st_size] =
'\0';
1533 memset(&snapshot, 0,
sizeof(snapshot));
1552 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1553 errmsg(
"invalid snapshot data in file \"%s\"", path)));
1556 for (i = 0; i < xcnt; i++)
1568 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1569 errmsg(
"invalid snapshot data in file \"%s\"", path)));
1572 for (i = 0; i < xcnt; i++)
1593 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1594 errmsg(
"invalid snapshot data in file \"%s\"", path)));
1606 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1607 errmsg(
"a serializable transaction cannot import a snapshot from a non-serializable transaction")));
1610 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1611 errmsg(
"a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
1625 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1626 errmsg(
"cannot import a snapshot from a different database")));
1639 return (exportedSnapshots !=
NIL);
1665 if (strcmp(s_de->
d_name,
".") == 0 ||
1666 strcmp(s_de->
d_name,
"..") == 0)
1671 if (unlink(
buf) != 0)
1674 errmsg(
"could not remove file \"%s\": %m",
buf)));
1786 bool same_ts_as_threshold =
false;
1821 same_ts_as_threshold =
true;
1825 if (!same_ts_as_threshold)
1827 if (ts == update_ts)
1844 if (offset > oldSnapshotControl->
count_used - 1)
1846 offset = (oldSnapshotControl->
head_offset + offset)
1887 bool map_update_required =
false;
1904 map_update_required =
true;
1911 if (!map_update_required)
1927 "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
1934 "MaintainOldSnapshotTimeMapping called with xmin = %lu",
1935 (
unsigned long) xmin);
1960 "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
1997 for (i = 0; i < advance; i++)
2037 Assert(historic_snapshot != NULL);
2040 HistoricSnapshot = historic_snapshot;
2043 tuplecid_data = tuplecids;
2053 HistoricSnapshot = NULL;
2054 tuplecid_data = NULL;
2060 return HistoricSnapshot != NULL;
2109 serialized_snapshot.
xmin = snapshot->
xmin;
2110 serialized_snapshot.
xmax = snapshot->
xmax;
2111 serialized_snapshot.
xcnt = snapshot->
xcnt;
2117 serialized_snapshot.
lsn = snapshot->
lsn;
2125 serialized_snapshot.
subxcnt = 0;
2128 memcpy(start_address,
2132 if (snapshot->
xcnt > 0)
2143 if (serialized_snapshot.
subxcnt > 0)
2168 memcpy(&serialized_snapshot, start_address,
2181 snapshot->
xmin = serialized_snapshot.
xmin;
2182 snapshot->
xmax = serialized_snapshot.
xmax;
2183 snapshot->
xip = NULL;
2184 snapshot->
xcnt = serialized_snapshot.
xcnt;
2191 snapshot->
lsn = serialized_snapshot.
lsn;
2194 if (serialized_snapshot.
xcnt > 0)
2197 memcpy(snapshot->
xip, serialized_xids,
2202 if (serialized_snapshot.
subxcnt > 0)
2205 serialized_snapshot.
xcnt;
2206 memcpy(snapshot->
subxip, serialized_xids + serialized_snapshot.
xcnt,
2278 for (j = 0; j < snapshot->
subxcnt; j++)
2303 for (i = 0; i < snapshot->
xcnt; i++)
2342 for (j = 0; j < snapshot->
subxcnt; j++)
void ImportSnapshot(const char *idstr)
int xactGetCommittedChildren(TransactionId **ptr)
pairingheap_node * pairingheap_first(pairingheap *heap)
#define InvalidXLogRecPtr
void UpdateActiveSnapshotCommandId(void)
SnapshotData CatalogSnapshotData
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
void SetSerializableTransactionSnapshot(Snapshot snapshot, VirtualTransactionId *sourcevxid, int sourcepid)
bool RelationHasSysCache(Oid relid)
static TransactionId parseXidFromText(const char *prefix, char **s, const char *filename)
TimestampTz GetOldSnapshotThresholdTimestamp(void)
bool XactHasExportedSnapshots(void)
Snapshot RestoreSnapshot(char *start_address)
MemoryContext TopTransactionContext
#define TransactionIdEquals(id1, id2)
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
static int parseIntFromText(const char *prefix, char **s, const char *filename)
Snapshot RegisterSnapshot(Snapshot snapshot)
#define pairingheap_reset(h)
static void FreeSnapshot(Snapshot snapshot)
TransactionId SubTransGetTopmostTransaction(TransactionId xid)
TimestampTz GetCurrentTimestamp(void)
static void SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid, int sourcepid, PGPROC *sourceproc)
ResourceOwner CurrentResourceOwner
char * pstrdup(const char *in)
#define SpinLockInit(lock)
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
static void SnapshotResetXmin(void)
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
#define RelationAllowsEarlyPruning(rel)
void ResourceOwnerEnlargeSnapshots(ResourceOwner owner)
TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin, Relation relation)
Snapshot GetCatalogSnapshot(Oid relid)
bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define IsolationUsesXactSnapshot()
static Snapshot HistoricSnapshot
Snapshot GetActiveSnapshot(void)
#define pairingheap_is_empty(h)
bool ThereAreNoPriorRegisteredSnapshots(void)
int errcode(int sqlerrcode)
char * ExportSnapshot(Snapshot snapshot)
void PopActiveSnapshot(void)
void TeardownHistoricSnapshot(bool is_error)
static SnapshotData CurrentSnapshotData
static Snapshot FirstXactSnapshot
void AtSubCommit_Snapshot(int level)
struct SnapshotData * Snapshot
LocalTransactionId localTransactionId
Snapshot GetTransactionSnapshot(void)
#define OidIsValid(objectId)
SnapshotData SnapshotAnyData
static Snapshot CurrentSnapshot
Snapshot GetNonHistoricCatalogSnapshot(Oid relid)
#define XACT_SERIALIZABLE
#define XidFromFullTransactionId(x)
TransactionId TransactionXmin
static List * exportedSnapshots
#define SNAPSHOT_EXPORT_DIR
void LWLockRelease(LWLock *lock)
struct ActiveSnapshotElt ActiveSnapshotElt
#define pairingheap_container(type, membername, ptr)
void AtSubAbort_Snapshot(int level)
#define SpinLockAcquire(lock)
void pfree(void *pointer)
bool IsInParallelMode(void)
void appendStringInfo(StringInfo str, const char *fmt,...)
TimestampTz threshold_timestamp
#define FirstNormalTransactionId
static Snapshot CatalogSnapshot
TimestampTz GetSnapshotCurrentTimestamp(void)
static pairingheap RegisteredSnapshots
void SerializeSnapshot(Snapshot snapshot, char *start_address)
slock_t mutex_latest_xmin
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
TimestampTz next_map_update
static ActiveSnapshotElt * OldestActiveSnapshot
void PushCopiedSnapshot(Snapshot snapshot)
TimestampTz current_timestamp
TransactionId threshold_xid
void appendStringInfoString(StringInfo str, const char *s)
struct SerializedSnapshotData SerializedSnapshotData
static void SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
static void parseVxidFromText(const char *prefix, char **s, const char *filename, VirtualTransactionId *vxid)
void PushActiveSnapshot(Snapshot snap)
TransactionId RecentGlobalXmin
void ResourceOwnerForgetSnapshot(ResourceOwner owner, Snapshot snapshot)
int errdetail(const char *fmt,...)
int errcode_for_file_access(void)
struct SnapshotData SnapshotData
FILE * AllocateFile(const char *name, const char *mode)
#define InvalidTransactionId
struct ActiveSnapshotElt * as_next
SnapshotType snapshot_type
Datum pg_export_snapshot(PG_FUNCTION_ARGS)
DIR * AllocateDir(const char *dirname)
bool ActiveSnapshotSet(void)
TransactionId GetTopTransactionIdIfAny(void)
static volatile OldSnapshotControlData * oldSnapshotControl
TransactionId RecentGlobalDataXmin
#define ereport(elevel, rest)
#define pairingheap_const_container(type, membername, ptr)
void InvalidateCatalogSnapshot(void)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
static ActiveSnapshotElt * ActiveSnapshot
FullTransactionId ReadNextFullTransactionId(void)
void UnregisterSnapshot(Snapshot snapshot)
List * lappend(List *list, void *datum)
static HTAB * tuplecid_data
void initStringInfo(StringInfo str)
#define XLogRecPtrIsInvalid(r)
#define VirtualTransactionIdIsValid(vxid)
#define SpinLockRelease(lock)
Size EstimateSnapshotSpace(Snapshot snap)
Size mul_size(Size s1, Size s2)
Size add_size(Size s1, Size s2)
Snapshot GetOldestSnapshot(void)
static SnapshotData SecondarySnapshotData
#define EpochFromFullTransactionId(x)
static Snapshot CopySnapshot(Snapshot snapshot)
int GetMaxSnapshotXidCount(void)
Snapshot RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
int GetCurrentTransactionNestLevel(void)
#define PG_RETURN_TEXT_P(x)
text * cstring_to_text(const char *s)
TransactionId latest_xmin
#define Assert(condition)
bool RelationInvalidatesSnapshotsOnly(Oid relid)
Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot)
void InvalidateCatalogSnapshotConditionally(void)
TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER]
SnapshotData SnapshotSelfData
#define NormalTransactionIdFollows(id1, id2)
Snapshot GetSnapshotData(Snapshot snapshot)
static int list_length(const List *l)
void UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
static FullTransactionId FullTransactionIdFromEpochAndXid(uint32 epoch, TransactionId xid)
static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts)
bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
Snapshot GetLatestSnapshot(void)
void SnapshotSetCommandId(CommandId curcid)
bool IsSubTransaction(void)
HTAB * HistoricSnapshotGetTupleCids(void)
bool HistoricSnapshotActive(void)
int errmsg(const char *fmt,...)
void ResourceOwnerRememberSnapshot(ResourceOwner owner, Snapshot snapshot)
#define IsolationIsSerializable()
void * MemoryContextAlloc(MemoryContext context, Size size)
void DeleteAllExportedSnapshotFiles(void)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
int old_snapshot_threshold
TimestampTz head_timestamp
static const unsigned __int64 epoch
static Snapshot SecondarySnapshot
struct OldSnapshotControlData OldSnapshotControlData
int GetMaxSnapshotSubxidCount(void)
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
CommandId GetCurrentCommandId(bool used)
#define TransactionIdIsValid(xid)
void MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
#define OLD_SNAPSHOT_TIME_MAP_ENTRIES
#define TransactionIdIsNormal(xid)
bool ProcArrayInstallImportedXmin(TransactionId xmin, VirtualTransactionId *sourcevxid)
Size SnapMgrShmemSize(void)
struct ExportedSnapshot ExportedSnapshot
Datum now(PG_FUNCTION_ARGS)
FullTransactionId GetFullRecentGlobalXmin(void)
#define offsetof(type, field)
void AtEOXact_Snapshot(bool isCommit, bool resetXmin)
#define pairingheap_is_singular(h)