159 #define SNAPSHOT_EXPORT_DIR "pg_snapshots" 274 Assert(FirstXactSnapshot == NULL);
278 "cannot take query snapshot during a parallel operation");
333 "cannot update SecondarySnapshot during a parallel operation");
359 Snapshot OldestRegisteredSnapshot = NULL;
366 RegisteredLSN = OldestRegisteredSnapshot->
lsn;
369 if (OldestActiveSnapshot != NULL)
374 return OldestActiveSnapshot->
as_snap;
377 return OldestRegisteredSnapshot;
417 if (CatalogSnapshot &&
422 if (CatalogSnapshot == NULL)
461 CatalogSnapshot = NULL;
479 if (CatalogSnapshot &&
480 ActiveSnapshot == NULL &&
496 CurrentSnapshot->
curcid = curcid;
497 if (SecondarySnapshot)
498 SecondarySnapshot->
curcid = curcid;
512 int sourcepid,
PGPROC *sourceproc)
521 Assert(FirstXactSnapshot == NULL);
535 CurrentSnapshot->
xmin = sourcesnap->
xmin;
536 CurrentSnapshot->
xmax = sourcesnap->
xmax;
537 CurrentSnapshot->
xcnt = sourcesnap->
xcnt;
539 memcpy(CurrentSnapshot->
xip, sourcesnap->
xip,
562 if (sourceproc != NULL)
566 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
567 errmsg(
"could not import the requested snapshot"),
568 errdetail(
"The source transaction is not running anymore.")));
572 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
573 errmsg(
"could not import the requested snapshot"),
574 errdetail(
"The source process with PID %d is not running anymore.",
629 if (snapshot->
xcnt > 0)
632 memcpy(newsnap->
xip, snapshot->
xip,
692 if (snap == CurrentSnapshot || snap == SecondarySnapshot || !snap->
copied)
702 ActiveSnapshot = newactive;
703 if (OldestActiveSnapshot == NULL)
733 Assert(ActiveSnapshot != NULL);
748 elog(
ERROR,
"cannot modify commandid in active snapshot during a parallel operation");
763 newstack = ActiveSnapshot->
as_next;
773 pfree(ActiveSnapshot);
774 ActiveSnapshot = newstack;
775 if (ActiveSnapshot == NULL)
776 OldestActiveSnapshot = NULL;
788 Assert(ActiveSnapshot != NULL);
790 return ActiveSnapshot->
as_snap;
800 return ActiveSnapshot != NULL;
854 if (snapshot == NULL)
867 if (snapshot == NULL)
929 if (ActiveSnapshot != NULL)
957 for (active = ActiveSnapshot; active != NULL; active = active->
as_next)
973 while (ActiveSnapshot && ActiveSnapshot->
as_level >= level)
977 next = ActiveSnapshot->
as_next;
991 pfree(ActiveSnapshot);
993 ActiveSnapshot =
next;
994 if (ActiveSnapshot == NULL)
995 OldestActiveSnapshot = NULL;
1017 if (FirstXactSnapshot != NULL)
1023 FirstXactSnapshot = NULL;
1028 if (exportedSnapshots !=
NIL)
1043 foreach(lc, exportedSnapshots)
1055 exportedSnapshots =
NIL;
1067 elog(
WARNING,
"registered snapshots seem to remain after cleanup");
1070 for (active = ActiveSnapshot; active != NULL; active = active->
as_next)
1071 elog(
WARNING,
"snapshot %p still active", active);
1078 ActiveSnapshot = NULL;
1079 OldestActiveSnapshot = NULL;
1082 CurrentSnapshot = NULL;
1083 SecondarySnapshot = NULL;
1146 (
errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1147 errmsg(
"cannot export a snapshot from a subtransaction")));
1175 exportedSnapshots =
lappend(exportedSnapshots, esnap);
1211 for (i = 0; i < snapshot->
xcnt; i++)
1227 for (i = 0; i < snapshot->
subxcnt; i++)
1229 for (i = 0; i < nchildren; i++)
1240 snprintf(pathtmp,
sizeof(pathtmp),
"%s.tmp", path);
1244 errmsg(
"could not create file \"%s\": %m", pathtmp)));
1246 if (fwrite(buf.
data, buf.
len, 1, f) != 1)
1249 errmsg(
"could not write to file \"%s\": %m", pathtmp)));
1256 errmsg(
"could not write to file \"%s\": %m", pathtmp)));
1262 if (rename(pathtmp, path) < 0)
1265 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
1300 int prefixlen = strlen(prefix);
1303 if (strncmp(ptr, prefix, prefixlen) != 0)
1305 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1306 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1308 if (sscanf(ptr,
"%d", &val) != 1)
1310 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1311 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1312 ptr = strchr(ptr,
'\n');
1315 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1316 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1325 int prefixlen = strlen(prefix);
1328 if (strncmp(ptr, prefix, prefixlen) != 0)
1330 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1331 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1333 if (sscanf(ptr,
"%u", &val) != 1)
1335 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1336 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1337 ptr = strchr(ptr,
'\n');
1340 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1341 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1351 int prefixlen = strlen(prefix);
1353 if (strncmp(ptr, prefix, prefixlen) != 0)
1355 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1356 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1360 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1361 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1362 ptr = strchr(ptr,
'\n');
1365 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1366 errmsg(
"invalid snapshot data in file \"%s\"", filename)));
1381 struct stat stat_buf;
1402 (
errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1403 errmsg(
"SET TRANSACTION SNAPSHOT must be called before any query")));
1411 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1412 errmsg(
"a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
1418 if (strspn(idstr,
"0123456789ABCDEF-") != strlen(idstr))
1420 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1421 errmsg(
"invalid snapshot identifier: \"%s\"", idstr)));
1429 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1430 errmsg(
"invalid snapshot identifier: \"%s\"", idstr)));
1433 if (
fstat(fileno(f), &stat_buf))
1434 elog(
ERROR,
"could not stat file \"%s\": %m", path);
1438 if (fread(filebuf, stat_buf.
st_size, 1, f) != 1)
1439 elog(
ERROR,
"could not read file \"%s\": %m", path);
1441 filebuf[stat_buf.
st_size] =
'\0';
1448 memset(&snapshot, 0,
sizeof(snapshot));
1467 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1468 errmsg(
"invalid snapshot data in file \"%s\"", path)));
1471 for (i = 0; i < xcnt; i++)
1483 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1484 errmsg(
"invalid snapshot data in file \"%s\"", path)));
1487 for (i = 0; i < xcnt; i++)
1508 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1509 errmsg(
"invalid snapshot data in file \"%s\"", path)));
1521 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1522 errmsg(
"a serializable transaction cannot import a snapshot from a non-serializable transaction")));
1525 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1526 errmsg(
"a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
1540 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1541 errmsg(
"cannot import a snapshot from a different database")));
1554 return (exportedSnapshots !=
NIL);
1580 if (strcmp(s_de->
d_name,
".") == 0 ||
1581 strcmp(s_de->
d_name,
"..") == 0)
1586 if (unlink(
buf) != 0)
1589 errmsg(
"could not remove file \"%s\": %m",
buf)));
1643 if (now <= oldSnapshotControl->current_timestamp)
1668 return threshold_timestamp;
1709 bool in_mapping =
false;
1722 if (offset > oldSnapshotControl->
count_used - 1)
1724 offset = (oldSnapshotControl->
head_offset + offset)
1765 Assert(limit_ts != NULL && limit_xid != NULL);
1789 xlimit = latest_xmin;
1804 if (ts == threshold_timestamp)
1810 xlimit = threshold_xid;
1812 else if (ts == next_map_update_ts)
1818 xlimit = latest_xmin;
1835 xlimit = latest_xmin;
1842 *limit_xid = xlimit;
1859 bool map_update_required =
false;
1876 map_update_required =
true;
1883 if (!map_update_required)
1899 "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
1906 "MaintainOldSnapshotTimeMapping called with xmin = %lu",
1907 (
unsigned long) xmin);
1927 else if (ts < oldSnapshotControl->head_timestamp)
1932 "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
1952 int distance_to_new_tail;
1953 int distance_to_current_tail;
1972 distance_to_new_tail =
1974 distance_to_current_tail =
1976 advance = distance_to_new_tail - distance_to_current_tail;
1992 for (i = 0; i < advance; i++)
2033 Assert(historic_snapshot != NULL);
2036 HistoricSnapshot = historic_snapshot;
2039 tuplecid_data = tuplecids;
2049 HistoricSnapshot = NULL;
2050 tuplecid_data = NULL;
2056 return HistoricSnapshot != NULL;
2105 serialized_snapshot.
xmin = snapshot->
xmin;
2106 serialized_snapshot.
xmax = snapshot->
xmax;
2107 serialized_snapshot.
xcnt = snapshot->
xcnt;
2113 serialized_snapshot.
lsn = snapshot->
lsn;
2121 serialized_snapshot.
subxcnt = 0;
2124 memcpy(start_address,
2128 if (snapshot->
xcnt > 0)
2139 if (serialized_snapshot.
subxcnt > 0)
2164 memcpy(&serialized_snapshot, start_address,
2177 snapshot->
xmin = serialized_snapshot.
xmin;
2178 snapshot->
xmax = serialized_snapshot.
xmax;
2179 snapshot->
xip = NULL;
2180 snapshot->
xcnt = serialized_snapshot.
xcnt;
2187 snapshot->
lsn = serialized_snapshot.
lsn;
2191 if (serialized_snapshot.
xcnt > 0)
2194 memcpy(snapshot->
xip, serialized_xids,
2199 if (serialized_snapshot.
subxcnt > 0)
2202 serialized_snapshot.
xcnt;
2203 memcpy(snapshot->
subxip, serialized_xids + serialized_snapshot.
xcnt,
2275 for (j = 0; j < snapshot->
subxcnt; j++)
2300 for (i = 0; i < snapshot->
xcnt; i++)
2339 for (j = 0; j < snapshot->
subxcnt; j++)
void ImportSnapshot(const char *idstr)
int xactGetCommittedChildren(TransactionId **ptr)
pairingheap_node * pairingheap_first(pairingheap *heap)
uint64 snapXactCompletionCount
#define InvalidXLogRecPtr
void UpdateActiveSnapshotCommandId(void)
SnapshotData CatalogSnapshotData
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
void SetSerializableTransactionSnapshot(Snapshot snapshot, VirtualTransactionId *sourcevxid, int sourcepid)
bool RelationHasSysCache(Oid relid)
static TransactionId parseXidFromText(const char *prefix, char **s, const char *filename)
TimestampTz GetOldSnapshotThresholdTimestamp(void)
bool XactHasExportedSnapshots(void)
Snapshot RestoreSnapshot(char *start_address)
MemoryContext TopTransactionContext
#define TransactionIdEquals(id1, id2)
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
static int parseIntFromText(const char *prefix, char **s, const char *filename)
Snapshot RegisterSnapshot(Snapshot snapshot)
#define pairingheap_reset(h)
static void FreeSnapshot(Snapshot snapshot)
TransactionId SubTransGetTopmostTransaction(TransactionId xid)
TimestampTz GetCurrentTimestamp(void)
static void SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid, int sourcepid, PGPROC *sourceproc)
static bool OldSnapshotThresholdActive(void)
ResourceOwner CurrentResourceOwner
char * pstrdup(const char *in)
#define SpinLockInit(lock)
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
static void SnapshotResetXmin(void)
#define RelationAllowsEarlyPruning(rel)
void ResourceOwnerEnlargeSnapshots(ResourceOwner owner)
Snapshot GetCatalogSnapshot(Oid relid)
bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define IsolationUsesXactSnapshot()
static Snapshot HistoricSnapshot
Snapshot GetActiveSnapshot(void)
#define pairingheap_is_empty(h)
bool ThereAreNoPriorRegisteredSnapshots(void)
int errcode(int sqlerrcode)
char * ExportSnapshot(Snapshot snapshot)
void PopActiveSnapshot(void)
void TeardownHistoricSnapshot(bool is_error)
static SnapshotData CurrentSnapshotData
static Snapshot FirstXactSnapshot
void AtSubCommit_Snapshot(int level)
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
static bool GetOldSnapshotFromTimeMapping(TimestampTz ts, TransactionId *xlimitp)
struct SnapshotData * Snapshot
bool TransactionIdLimitedForOldSnapshots(TransactionId recentXmin, Relation relation, TransactionId *limit_xid, TimestampTz *limit_ts)
LocalTransactionId localTransactionId
Snapshot GetTransactionSnapshot(void)
#define OidIsValid(objectId)
SnapshotData SnapshotAnyData
static Snapshot CurrentSnapshot
Snapshot GetNonHistoricCatalogSnapshot(Oid relid)
void SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
#define XACT_SERIALIZABLE
TransactionId TransactionXmin
static List * exportedSnapshots
#define SNAPSHOT_EXPORT_DIR
void LWLockRelease(LWLock *lock)
struct ActiveSnapshotElt ActiveSnapshotElt
#define pairingheap_container(type, membername, ptr)
void AtSubAbort_Snapshot(int level)
#define SpinLockAcquire(lock)
void pfree(void *pointer)
bool IsInParallelMode(void)
void appendStringInfo(StringInfo str, const char *fmt,...)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
TimestampTz threshold_timestamp
#define FirstNormalTransactionId
static Snapshot CatalogSnapshot
TimestampTz GetSnapshotCurrentTimestamp(void)
static pairingheap RegisteredSnapshots
void SerializeSnapshot(Snapshot snapshot, char *start_address)
slock_t mutex_latest_xmin
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
TimestampTz next_map_update
static ActiveSnapshotElt * OldestActiveSnapshot
void PushCopiedSnapshot(Snapshot snapshot)
TimestampTz current_timestamp
TransactionId threshold_xid
void appendStringInfoString(StringInfo str, const char *s)
struct SerializedSnapshotData SerializedSnapshotData
static void parseVxidFromText(const char *prefix, char **s, const char *filename, VirtualTransactionId *vxid)
void PushActiveSnapshot(Snapshot snap)
void ResourceOwnerForgetSnapshot(ResourceOwner owner, Snapshot snapshot)
int errdetail(const char *fmt,...)
int errcode_for_file_access(void)
struct SnapshotData SnapshotData
FILE * AllocateFile(const char *name, const char *mode)
void SnapshotTooOldMagicForTest(void)
#define InvalidTransactionId
struct ActiveSnapshotElt * as_next
SnapshotType snapshot_type
Datum pg_export_snapshot(PG_FUNCTION_ARGS)
DIR * AllocateDir(const char *dirname)
bool ActiveSnapshotSet(void)
TransactionId GetTopTransactionIdIfAny(void)
volatile OldSnapshotControlData * oldSnapshotControl
#define pairingheap_const_container(type, membername, ptr)
void InvalidateCatalogSnapshot(void)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
static ActiveSnapshotElt * ActiveSnapshot
void UnregisterSnapshot(Snapshot snapshot)
List * lappend(List *list, void *datum)
static HTAB * tuplecid_data
void initStringInfo(StringInfo str)
#define XLogRecPtrIsInvalid(r)
#define VirtualTransactionIdIsValid(vxid)
#define SpinLockRelease(lock)
Size EstimateSnapshotSpace(Snapshot snap)
Size mul_size(Size s1, Size s2)
Size add_size(Size s1, Size s2)
Snapshot GetOldestSnapshot(void)
static SnapshotData SecondarySnapshotData
static Snapshot CopySnapshot(Snapshot snapshot)
#define ereport(elevel,...)
int GetMaxSnapshotXidCount(void)
Snapshot RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
int GetCurrentTransactionNestLevel(void)
#define PG_RETURN_TEXT_P(x)
text * cstring_to_text(const char *s)
TransactionId latest_xmin
#define Assert(condition)
bool RelationInvalidatesSnapshotsOnly(Oid relid)
Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot)
void InvalidateCatalogSnapshotConditionally(void)
TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER]
SnapshotData SnapshotSelfData
Snapshot GetSnapshotData(Snapshot snapshot)
static int list_length(const List *l)
void UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts)
bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
Snapshot GetLatestSnapshot(void)
void SnapshotSetCommandId(CommandId curcid)
bool IsSubTransaction(void)
HTAB * HistoricSnapshotGetTupleCids(void)
bool HistoricSnapshotActive(void)
int errmsg(const char *fmt,...)
void ResourceOwnerRememberSnapshot(ResourceOwner owner, Snapshot snapshot)
#define IsolationIsSerializable()
void * MemoryContextAlloc(MemoryContext context, Size size)
void DeleteAllExportedSnapshotFiles(void)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
int old_snapshot_threshold
TimestampTz head_timestamp
static Snapshot SecondarySnapshot
int GetMaxSnapshotSubxidCount(void)
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
CommandId GetCurrentCommandId(bool used)
#define TransactionIdIsValid(xid)
void MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
#define OLD_SNAPSHOT_TIME_MAP_ENTRIES
#define TransactionIdIsNormal(xid)
bool ProcArrayInstallImportedXmin(TransactionId xmin, VirtualTransactionId *sourcevxid)
Size SnapMgrShmemSize(void)
struct ExportedSnapshot ExportedSnapshot
Datum now(PG_FUNCTION_ARGS)
#define offsetof(type, field)
void AtEOXact_Snapshot(bool isCommit, bool resetXmin)
#define pairingheap_is_singular(h)