159 #define SNAPSHOT_EXPORT_DIR "pg_snapshots"
278 "cannot take query snapshot during a parallel operation");
333 "cannot update SecondarySnapshot during a parallel operation");
359 Snapshot OldestRegisteredSnapshot = NULL;
366 RegisteredLSN = OldestRegisteredSnapshot->
lsn;
377 return OldestRegisteredSnapshot;
512 int sourcepid,
PGPROC *sourceproc)
539 if (sourcesnap->
xcnt > 0)
564 if (sourceproc != NULL)
568 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
569 errmsg(
"could not import the requested snapshot"),
570 errdetail(
"The source transaction is not running anymore.")));
574 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
575 errmsg(
"could not import the requested snapshot"),
576 errdetail(
"The source process with PID %d is not running anymore.",
631 if (snapshot->
xcnt > 0)
634 memcpy(newsnap->
xip, snapshot->
xip,
765 elog(
ERROR,
"cannot modify commandid in active snapshot during a parallel operation");
871 if (snapshot == NULL)
884 if (snapshot == NULL)
1084 elog(
WARNING,
"registered snapshots seem to remain after cleanup");
1088 elog(
WARNING,
"snapshot %p still active", active);
1163 (
errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1164 errmsg(
"cannot export a snapshot from a subtransaction")));
1228 for (
i = 0;
i < snapshot->
xcnt;
i++)
1246 for (
i = 0;
i < nchildren;
i++)
1257 snprintf(pathtmp,
sizeof(pathtmp),
"%s.tmp", path);
1261 errmsg(
"could not create file \"%s\": %m", pathtmp)));
1263 if (fwrite(
buf.data,
buf.len, 1, f) != 1)
1266 errmsg(
"could not write to file \"%s\": %m", pathtmp)));
1273 errmsg(
"could not write to file \"%s\": %m", pathtmp)));
1279 if (rename(pathtmp, path) < 0)
1282 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
1317 int prefixlen = strlen(prefix);
1320 if (strncmp(ptr, prefix, prefixlen) != 0)
1322 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1325 if (sscanf(ptr,
"%d", &
val) != 1)
1327 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1329 ptr = strchr(ptr,
'\n');
1332 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1342 int prefixlen = strlen(prefix);
1345 if (strncmp(ptr, prefix, prefixlen) != 0)
1347 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1350 if (sscanf(ptr,
"%u", &
val) != 1)
1352 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1354 ptr = strchr(ptr,
'\n');
1357 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1368 int prefixlen = strlen(prefix);
1370 if (strncmp(ptr, prefix, prefixlen) != 0)
1372 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1377 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1379 ptr = strchr(ptr,
'\n');
1382 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1398 struct stat stat_buf;
1419 (
errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1420 errmsg(
"SET TRANSACTION SNAPSHOT must be called before any query")));
1428 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1429 errmsg(
"a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
1435 if (strspn(idstr,
"0123456789ABCDEF-") != strlen(idstr))
1437 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1438 errmsg(
"invalid snapshot identifier: \"%s\"", idstr)));
1446 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1447 errmsg(
"invalid snapshot identifier: \"%s\"", idstr)));
1450 if (
fstat(fileno(f), &stat_buf))
1451 elog(
ERROR,
"could not stat file \"%s\": %m", path);
1455 if (fread(filebuf, stat_buf.
st_size, 1, f) != 1)
1456 elog(
ERROR,
"could not read file \"%s\": %m", path);
1458 filebuf[stat_buf.
st_size] =
'\0';
1465 memset(&snapshot, 0,
sizeof(snapshot));
1484 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1485 errmsg(
"invalid snapshot data in file \"%s\"", path)));
1488 for (
i = 0;
i < xcnt;
i++)
1500 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1501 errmsg(
"invalid snapshot data in file \"%s\"", path)));
1504 for (
i = 0;
i < xcnt;
i++)
1525 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1526 errmsg(
"invalid snapshot data in file \"%s\"", path)));
1538 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1539 errmsg(
"a serializable transaction cannot import a snapshot from a non-serializable transaction")));
1542 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1543 errmsg(
"a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
1557 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1558 errmsg(
"cannot import a snapshot from a different database")));
1597 if (strcmp(s_de->
d_name,
".") == 0 ||
1598 strcmp(s_de->
d_name,
"..") == 0)
1603 if (unlink(
buf) != 0)
1606 errmsg(
"could not remove file \"%s\": %m",
buf)));
1686 if (now <= oldSnapshotControl->current_timestamp)
1711 return threshold_timestamp;
1752 bool in_mapping =
false;
1808 Assert(limit_ts != NULL && limit_xid != NULL);
1836 xlimit = latest_xmin;
1851 if (ts == threshold_timestamp)
1857 xlimit = threshold_xid;
1859 else if (ts == next_map_update_ts)
1865 xlimit = latest_xmin;
1882 xlimit = latest_xmin;
1889 *limit_xid = xlimit;
1906 bool map_update_required =
false;
1923 map_update_required =
true;
1930 if (!map_update_required)
1946 "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
1953 "MaintainOldSnapshotTimeMapping called with xmin = %lu",
1954 (
unsigned long) xmin);
1974 else if (ts < oldSnapshotControl->head_timestamp)
1979 "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
1999 int distance_to_new_tail;
2000 int distance_to_current_tail;
2019 distance_to_new_tail =
2021 distance_to_current_tail =
2023 advance = distance_to_new_tail - distance_to_current_tail;
2039 for (
i = 0;
i < advance;
i++)
2080 Assert(historic_snapshot != NULL);
2152 serialized_snapshot.
xmin = snapshot->
xmin;
2153 serialized_snapshot.
xmax = snapshot->
xmax;
2154 serialized_snapshot.
xcnt = snapshot->
xcnt;
2160 serialized_snapshot.
lsn = snapshot->
lsn;
2168 serialized_snapshot.
subxcnt = 0;
2171 memcpy(start_address,
2175 if (snapshot->
xcnt > 0)
2186 if (serialized_snapshot.
subxcnt > 0)
2211 memcpy(&serialized_snapshot, start_address,
2224 snapshot->
xmin = serialized_snapshot.
xmin;
2225 snapshot->
xmax = serialized_snapshot.
xmax;
2226 snapshot->
xip = NULL;
2227 snapshot->
xcnt = serialized_snapshot.
xcnt;
2234 snapshot->
lsn = serialized_snapshot.
lsn;
2238 if (serialized_snapshot.
xcnt > 0)
2241 memcpy(snapshot->
xip, serialized_xids,
2246 if (serialized_snapshot.
subxcnt > 0)
2249 serialized_snapshot.
xcnt;
2250 memcpy(snapshot->
subxip, serialized_xids + serialized_snapshot.
xcnt,
2347 for (
i = 0;
i < snapshot->
xcnt;
i++)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
#define offsetof(type, field)
#define OidIsValid(objectId)
elog(ERROR, "%s: %s", p2, msg)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
FILE * AllocateFile(const char *name, const char *mode)
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
DIR * AllocateDir(const char *dirname)
#define PG_RETURN_TEXT_P(x)
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
#define VirtualTransactionIdIsValid(vxid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void pfree(void *pointer)
void * MemoryContextAlloc(MemoryContext context, Size size)
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
pairingheap_node * pairingheap_first(pairingheap *heap)
#define pairingheap_is_empty(h)
#define pairingheap_is_singular(h)
#define pairingheap_container(type, membername, ptr)
#define pairingheap_const_container(type, membername, ptr)
#define pairingheap_reset(h)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int list_length(const List *l)
void SetSerializableTransactionSnapshot(Snapshot snapshot, VirtualTransactionId *sourcevxid, int sourcepid)
Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot)
int GetMaxSnapshotSubxidCount(void)
Snapshot GetSnapshotData(Snapshot snapshot)
int GetMaxSnapshotXidCount(void)
bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
bool ProcArrayInstallImportedXmin(TransactionId xmin, VirtualTransactionId *sourcevxid)
#define RelationNeedsWAL(relation)
ResourceOwner CurrentResourceOwner
void ResourceOwnerRememberSnapshot(ResourceOwner owner, Snapshot snapshot)
void ResourceOwnerForgetSnapshot(ResourceOwner owner, Snapshot snapshot)
void ResourceOwnerEnlargeSnapshots(ResourceOwner owner)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Size mul_size(Size s1, Size s2)
static Snapshot HistoricSnapshot
void SnapshotTooOldMagicForTest(void)
static Snapshot FirstXactSnapshot
void MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
SnapshotData CatalogSnapshotData
void UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
static void SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid, int sourcepid, PGPROC *sourceproc)
TimestampTz GetSnapshotCurrentTimestamp(void)
void AtSubAbort_Snapshot(int level)
void SerializeSnapshot(Snapshot snapshot, char *start_address)
Size EstimateSnapshotSpace(Snapshot snap)
SnapshotData SnapshotSelfData
void AtEOXact_Snapshot(bool isCommit, bool resetXmin)
struct ActiveSnapshotElt ActiveSnapshotElt
static Snapshot CurrentSnapshot
static Snapshot SecondarySnapshot
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
static List * exportedSnapshots
static pairingheap RegisteredSnapshots
static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts)
Snapshot GetTransactionSnapshot(void)
Snapshot GetLatestSnapshot(void)
void TeardownHistoricSnapshot(bool is_error)
Snapshot GetCatalogSnapshot(Oid relid)
void UnregisterSnapshot(Snapshot snapshot)
static Snapshot CopySnapshot(Snapshot snapshot)
static ActiveSnapshotElt * OldestActiveSnapshot
Snapshot RestoreSnapshot(char *start_address)
static bool GetOldSnapshotFromTimeMapping(TimestampTz ts, TransactionId *xlimitp)
HTAB * HistoricSnapshotGetTupleCids(void)
void AtSubCommit_Snapshot(int level)
void UpdateActiveSnapshotCommandId(void)
static void SnapshotResetXmin(void)
static int parseIntFromText(const char *prefix, char **s, const char *filename)
static SnapshotData SecondarySnapshotData
char * ExportSnapshot(Snapshot snapshot)
static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
TransactionId TransactionXmin
SnapshotData SnapshotAnyData
bool HistoricSnapshotActive(void)
void ImportSnapshot(const char *idstr)
bool ActiveSnapshotSet(void)
Snapshot RegisterSnapshot(Snapshot snapshot)
bool XactHasExportedSnapshots(void)
void DeleteAllExportedSnapshotFiles(void)
static void parseVxidFromText(const char *prefix, char **s, const char *filename, VirtualTransactionId *vxid)
static void FreeSnapshot(Snapshot snapshot)
#define SNAPSHOT_EXPORT_DIR
bool HaveRegisteredOrActiveSnapshot(void)
void InvalidateCatalogSnapshotConditionally(void)
static SnapshotData CurrentSnapshotData
bool ThereAreNoPriorRegisteredSnapshots(void)
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
bool TransactionIdLimitedForOldSnapshots(TransactionId recentXmin, Relation relation, TransactionId *limit_xid, TimestampTz *limit_ts)
void SnapshotSetCommandId(CommandId curcid)
void PushActiveSnapshotWithLevel(Snapshot snap, int snap_level)
struct SerializedSnapshotData SerializedSnapshotData
void PopActiveSnapshot(void)
void PushCopiedSnapshot(Snapshot snapshot)
void PushActiveSnapshot(Snapshot snap)
static ActiveSnapshotElt * ActiveSnapshot
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
TimestampTz GetOldSnapshotThresholdTimestamp(void)
int old_snapshot_threshold
static HTAB * tuplecid_data
Snapshot RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
static TransactionId parseXidFromText(const char *prefix, char **s, const char *filename)
void InvalidateCatalogSnapshot(void)
Snapshot GetNonHistoricCatalogSnapshot(Oid relid)
struct ExportedSnapshot ExportedSnapshot
volatile OldSnapshotControlData * oldSnapshotControl
Size SnapMgrShmemSize(void)
Snapshot GetOldestSnapshot(void)
void SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
static Snapshot CatalogSnapshot
Snapshot GetActiveSnapshot(void)
Datum pg_export_snapshot(PG_FUNCTION_ARGS)
#define RelationAllowsEarlyPruning(rel)
static bool OldSnapshotThresholdActive(void)
#define OLD_SNAPSHOT_TIME_MAP_ENTRIES
struct SnapshotData * Snapshot
struct SnapshotData SnapshotData
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
struct ActiveSnapshotElt * as_next
slock_t mutex_latest_xmin
TimestampTz next_map_update
TimestampTz threshold_timestamp
TransactionId latest_xmin
TimestampTz head_timestamp
TimestampTz current_timestamp
TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER]
TransactionId threshold_xid
uint64 snapXactCompletionCount
SnapshotType snapshot_type
LocalTransactionId localTransactionId
TransactionId SubTransGetTopmostTransaction(TransactionId xid)
bool RelationHasSysCache(Oid relid)
bool RelationInvalidatesSnapshotsOnly(Oid relid)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdEquals(id1, id2)
#define FirstNormalTransactionId
#define TransactionIdIsValid(xid)
#define TransactionIdIsNormal(xid)
text * cstring_to_text(const char *s)
int GetCurrentTransactionNestLevel(void)
TransactionId GetTopTransactionIdIfAny(void)
bool IsSubTransaction(void)
bool IsInParallelMode(void)
int xactGetCommittedChildren(TransactionId **ptr)
CommandId GetCurrentCommandId(bool used)
#define XACT_SERIALIZABLE
#define IsolationUsesXactSnapshot()
#define IsolationIsSerializable()
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr