160 #define SNAPSHOT_EXPORT_DIR "pg_snapshots"
279 "cannot take query snapshot during a parallel operation");
334 "cannot update SecondarySnapshot during a parallel operation");
360 Snapshot OldestRegisteredSnapshot = NULL;
367 RegisteredLSN = OldestRegisteredSnapshot->
lsn;
378 return OldestRegisteredSnapshot;
513 int sourcepid,
PGPROC *sourceproc)
540 if (sourcesnap->
xcnt > 0)
565 if (sourceproc != NULL)
569 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
570 errmsg(
"could not import the requested snapshot"),
571 errdetail(
"The source transaction is not running anymore.")));
575 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
576 errmsg(
"could not import the requested snapshot"),
577 errdetail(
"The source process with PID %d is not running anymore.",
632 if (snapshot->
xcnt > 0)
635 memcpy(newsnap->
xip, snapshot->
xip,
767 elog(
ERROR,
"cannot modify commandid in active snapshot during a parallel operation");
873 if (snapshot == NULL)
886 if (snapshot == NULL)
1086 elog(
WARNING,
"registered snapshots seem to remain after cleanup");
1090 elog(
WARNING,
"snapshot %p still active", active);
1165 (
errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1166 errmsg(
"cannot export a snapshot from a subtransaction")));
1230 for (
i = 0;
i < snapshot->
xcnt;
i++)
1248 for (
i = 0;
i < nchildren;
i++)
1259 snprintf(pathtmp,
sizeof(pathtmp),
"%s.tmp", path);
1263 errmsg(
"could not create file \"%s\": %m", pathtmp)));
1265 if (fwrite(
buf.data,
buf.len, 1, f) != 1)
1268 errmsg(
"could not write to file \"%s\": %m", pathtmp)));
1275 errmsg(
"could not write to file \"%s\": %m", pathtmp)));
1281 if (rename(pathtmp, path) < 0)
1284 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
1319 int prefixlen = strlen(prefix);
1322 if (strncmp(ptr, prefix, prefixlen) != 0)
1324 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1327 if (sscanf(ptr,
"%d", &
val) != 1)
1329 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1331 ptr = strchr(ptr,
'\n');
1334 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1344 int prefixlen = strlen(prefix);
1347 if (strncmp(ptr, prefix, prefixlen) != 0)
1349 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1352 if (sscanf(ptr,
"%u", &
val) != 1)
1354 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1356 ptr = strchr(ptr,
'\n');
1359 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1370 int prefixlen = strlen(prefix);
1372 if (strncmp(ptr, prefix, prefixlen) != 0)
1374 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1379 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1381 ptr = strchr(ptr,
'\n');
1384 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1400 struct stat stat_buf;
1421 (
errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1422 errmsg(
"SET TRANSACTION SNAPSHOT must be called before any query")));
1430 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1431 errmsg(
"a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
1437 if (strspn(idstr,
"0123456789ABCDEF-") != strlen(idstr))
1439 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1440 errmsg(
"invalid snapshot identifier: \"%s\"", idstr)));
1448 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1449 errmsg(
"invalid snapshot identifier: \"%s\"", idstr)));
1452 if (
fstat(fileno(f), &stat_buf))
1453 elog(
ERROR,
"could not stat file \"%s\": %m", path);
1457 if (fread(filebuf, stat_buf.
st_size, 1, f) != 1)
1458 elog(
ERROR,
"could not read file \"%s\": %m", path);
1460 filebuf[stat_buf.
st_size] =
'\0';
1467 memset(&snapshot, 0,
sizeof(snapshot));
1486 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1487 errmsg(
"invalid snapshot data in file \"%s\"", path)));
1490 for (
i = 0;
i < xcnt;
i++)
1502 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1503 errmsg(
"invalid snapshot data in file \"%s\"", path)));
1506 for (
i = 0;
i < xcnt;
i++)
1527 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1528 errmsg(
"invalid snapshot data in file \"%s\"", path)));
1540 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1541 errmsg(
"a serializable transaction cannot import a snapshot from a non-serializable transaction")));
1544 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1545 errmsg(
"a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
1559 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1560 errmsg(
"cannot import a snapshot from a different database")));
1599 if (strcmp(s_de->
d_name,
".") == 0 ||
1600 strcmp(s_de->
d_name,
"..") == 0)
1605 if (unlink(
buf) != 0)
1608 errmsg(
"could not remove file \"%s\": %m",
buf)));
1713 return threshold_timestamp;
1754 bool in_mapping =
false;
1810 Assert(limit_ts != NULL && limit_xid != NULL);
1838 xlimit = latest_xmin;
1853 if (ts == threshold_timestamp)
1859 xlimit = threshold_xid;
1861 else if (ts == next_map_update_ts)
1867 xlimit = latest_xmin;
1884 xlimit = latest_xmin;
1891 *limit_xid = xlimit;
1908 bool map_update_required =
false;
1925 map_update_required =
true;
1932 if (!map_update_required)
1948 "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
1955 "MaintainOldSnapshotTimeMapping called with xmin = %lu",
1956 (
unsigned long) xmin);
1976 else if (ts < oldSnapshotControl->head_timestamp)
1981 "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
2001 int distance_to_new_tail;
2002 int distance_to_current_tail;
2021 distance_to_new_tail =
2023 distance_to_current_tail =
2025 advance = distance_to_new_tail - distance_to_current_tail;
2041 for (
i = 0;
i < advance;
i++)
2082 Assert(historic_snapshot != NULL);
2154 serialized_snapshot.
xmin = snapshot->
xmin;
2155 serialized_snapshot.
xmax = snapshot->
xmax;
2156 serialized_snapshot.
xcnt = snapshot->
xcnt;
2162 serialized_snapshot.
lsn = snapshot->
lsn;
2170 serialized_snapshot.
subxcnt = 0;
2173 memcpy(start_address,
2177 if (snapshot->
xcnt > 0)
2188 if (serialized_snapshot.
subxcnt > 0)
2213 memcpy(&serialized_snapshot, start_address,
2226 snapshot->
xmin = serialized_snapshot.
xmin;
2227 snapshot->
xmax = serialized_snapshot.
xmax;
2228 snapshot->
xip = NULL;
2229 snapshot->
xcnt = serialized_snapshot.
xcnt;
2236 snapshot->
lsn = serialized_snapshot.
lsn;
2240 if (serialized_snapshot.
xcnt > 0)
2243 memcpy(snapshot->
xip, serialized_xids,
2248 if (serialized_snapshot.
subxcnt > 0)
2251 serialized_snapshot.
xcnt;
2252 memcpy(snapshot->
subxip, serialized_xids + serialized_snapshot.
xcnt,
Datum current_timestamp(PG_FUNCTION_ARGS)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
#define OidIsValid(objectId)
elog(ERROR, "%s: %s", p2, msg)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
FILE * AllocateFile(const char *name, const char *mode)
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
DIR * AllocateDir(const char *dirname)
#define PG_RETURN_TEXT_P(x)
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
#define VirtualTransactionIdIsValid(vxid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void pfree(void *pointer)
void * MemoryContextAlloc(MemoryContext context, Size size)
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
pairingheap_node * pairingheap_first(pairingheap *heap)
#define pairingheap_is_empty(h)
#define pairingheap_is_singular(h)
#define pairingheap_container(type, membername, ptr)
#define pairingheap_const_container(type, membername, ptr)
#define pairingheap_reset(h)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static bool pg_lfind32(uint32 key, uint32 *base, uint32 nelem)
static int list_length(const List *l)
void SetSerializableTransactionSnapshot(Snapshot snapshot, VirtualTransactionId *sourcevxid, int sourcepid)
Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot)
int GetMaxSnapshotSubxidCount(void)
Snapshot GetSnapshotData(Snapshot snapshot)
int GetMaxSnapshotXidCount(void)
bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
bool ProcArrayInstallImportedXmin(TransactionId xmin, VirtualTransactionId *sourcevxid)
#define RelationNeedsWAL(relation)
ResourceOwner CurrentResourceOwner
void ResourceOwnerRememberSnapshot(ResourceOwner owner, Snapshot snapshot)
void ResourceOwnerForgetSnapshot(ResourceOwner owner, Snapshot snapshot)
void ResourceOwnerEnlargeSnapshots(ResourceOwner owner)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Size mul_size(Size s1, Size s2)
static Snapshot HistoricSnapshot
void SnapshotTooOldMagicForTest(void)
static Snapshot FirstXactSnapshot
void MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
SnapshotData CatalogSnapshotData
void UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
static void SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid, int sourcepid, PGPROC *sourceproc)
TimestampTz GetSnapshotCurrentTimestamp(void)
void AtSubAbort_Snapshot(int level)
void SerializeSnapshot(Snapshot snapshot, char *start_address)
SnapshotData SnapshotSelfData
void AtEOXact_Snapshot(bool isCommit, bool resetXmin)
struct ActiveSnapshotElt ActiveSnapshotElt
static Snapshot CurrentSnapshot
static Snapshot SecondarySnapshot
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
static List * exportedSnapshots
static pairingheap RegisteredSnapshots
static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts)
Snapshot GetTransactionSnapshot(void)
Snapshot GetLatestSnapshot(void)
void TeardownHistoricSnapshot(bool is_error)
Snapshot GetCatalogSnapshot(Oid relid)
void UnregisterSnapshot(Snapshot snapshot)
void PushActiveSnapshot(Snapshot snapshot)
static Snapshot CopySnapshot(Snapshot snapshot)
static ActiveSnapshotElt * OldestActiveSnapshot
Snapshot RestoreSnapshot(char *start_address)
static bool GetOldSnapshotFromTimeMapping(TimestampTz ts, TransactionId *xlimitp)
HTAB * HistoricSnapshotGetTupleCids(void)
void AtSubCommit_Snapshot(int level)
void UpdateActiveSnapshotCommandId(void)
static void SnapshotResetXmin(void)
static int parseIntFromText(const char *prefix, char **s, const char *filename)
static SnapshotData SecondarySnapshotData
char * ExportSnapshot(Snapshot snapshot)
static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
TransactionId TransactionXmin
SnapshotData SnapshotAnyData
bool HistoricSnapshotActive(void)
void ImportSnapshot(const char *idstr)
bool ActiveSnapshotSet(void)
Snapshot RegisterSnapshot(Snapshot snapshot)
bool XactHasExportedSnapshots(void)
void DeleteAllExportedSnapshotFiles(void)
static void parseVxidFromText(const char *prefix, char **s, const char *filename, VirtualTransactionId *vxid)
static void FreeSnapshot(Snapshot snapshot)
#define SNAPSHOT_EXPORT_DIR
bool HaveRegisteredOrActiveSnapshot(void)
void InvalidateCatalogSnapshotConditionally(void)
static SnapshotData CurrentSnapshotData
bool ThereAreNoPriorRegisteredSnapshots(void)
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
bool TransactionIdLimitedForOldSnapshots(TransactionId recentXmin, Relation relation, TransactionId *limit_xid, TimestampTz *limit_ts)
void SnapshotSetCommandId(CommandId curcid)
struct SerializedSnapshotData SerializedSnapshotData
void PopActiveSnapshot(void)
void PushCopiedSnapshot(Snapshot snapshot)
Size EstimateSnapshotSpace(Snapshot snapshot)
static ActiveSnapshotElt * ActiveSnapshot
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
TimestampTz GetOldSnapshotThresholdTimestamp(void)
int old_snapshot_threshold
static HTAB * tuplecid_data
Snapshot RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
static TransactionId parseXidFromText(const char *prefix, char **s, const char *filename)
void InvalidateCatalogSnapshot(void)
void PushActiveSnapshotWithLevel(Snapshot snapshot, int snap_level)
Snapshot GetNonHistoricCatalogSnapshot(Oid relid)
struct ExportedSnapshot ExportedSnapshot
volatile OldSnapshotControlData * oldSnapshotControl
Size SnapMgrShmemSize(void)
Snapshot GetOldestSnapshot(void)
void SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
static Snapshot CatalogSnapshot
Snapshot GetActiveSnapshot(void)
Datum pg_export_snapshot(PG_FUNCTION_ARGS)
#define RelationAllowsEarlyPruning(rel)
static bool OldSnapshotThresholdActive(void)
#define OLD_SNAPSHOT_TIME_MAP_ENTRIES
struct SnapshotData * Snapshot
struct SnapshotData SnapshotData
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
struct ActiveSnapshotElt * as_next
slock_t mutex_latest_xmin
TimestampTz next_map_update
TimestampTz threshold_timestamp
TransactionId latest_xmin
TimestampTz head_timestamp
TimestampTz current_timestamp
TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER]
TransactionId threshold_xid
uint64 snapXactCompletionCount
SnapshotType snapshot_type
LocalTransactionId localTransactionId
TransactionId SubTransGetTopmostTransaction(TransactionId xid)
bool RelationHasSysCache(Oid relid)
bool RelationInvalidatesSnapshotsOnly(Oid relid)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define FirstNormalTransactionId
#define TransactionIdIsValid(xid)
#define TransactionIdIsNormal(xid)
text * cstring_to_text(const char *s)
int GetCurrentTransactionNestLevel(void)
TransactionId GetTopTransactionIdIfAny(void)
bool IsSubTransaction(void)
bool IsInParallelMode(void)
int xactGetCommittedChildren(TransactionId **ptr)
CommandId GetCurrentCommandId(bool used)
#define XACT_SERIALIZABLE
#define IsolationUsesXactSnapshot()
#define IsolationIsSerializable()
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr