PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
snapbuild.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/heapam_xlog.h"
#include "access/transam.h"
#include "access/xact.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/snapbuild_internal.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/standby.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/snapshot.h"
Include dependency graph for snapbuild.c:

Go to the source code of this file.

Macros

#define SnapBuildOnDiskConstantSize    offsetof(SnapBuildOnDisk, builder)
 
#define SnapBuildOnDiskNotChecksummedSize    offsetof(SnapBuildOnDisk, version)
 
#define SNAPBUILD_MAGIC   0x51A1E001
 
#define SNAPBUILD_VERSION   6
 

Functions

static void SnapBuildPurgeOlderTxn (SnapBuild *builder)
 
static Snapshot SnapBuildBuildSnapshot (SnapBuild *builder)
 
static void SnapBuildFreeSnapshot (Snapshot snap)
 
static void SnapBuildSnapIncRefcount (Snapshot snap)
 
static void SnapBuildDistributeSnapshotAndInval (SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 
static bool SnapBuildXidHasCatalogChanges (SnapBuild *builder, TransactionId xid, uint32 xinfo)
 
static bool SnapBuildFindSnapshot (SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
 
static void SnapBuildWaitSnapshot (xl_running_xacts *running, TransactionId cutoff)
 
static void SnapBuildSerialize (SnapBuild *builder, XLogRecPtr lsn)
 
static bool SnapBuildRestore (SnapBuild *builder, XLogRecPtr lsn)
 
static void SnapBuildRestoreContents (int fd, void *dest, Size size, const char *path)
 
SnapBuild * AllocateSnapshotBuilder (ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, bool in_slot_creation, XLogRecPtr two_phase_at)
 
void FreeSnapshotBuilder (SnapBuild *builder)
 
SnapBuildState SnapBuildCurrentState (SnapBuild *builder)
 
XLogRecPtr SnapBuildGetTwoPhaseAt (SnapBuild *builder)
 
void SnapBuildSetTwoPhaseAt (SnapBuild *builder, XLogRecPtr ptr)
 
bool SnapBuildXactNeedsSkip (SnapBuild *builder, XLogRecPtr ptr)
 
void SnapBuildSnapDecRefcount (Snapshot snap)
 
Snapshot SnapBuildInitialSnapshot (SnapBuild *builder)
 
const char * SnapBuildExportSnapshot (SnapBuild *builder)
 
Snapshot SnapBuildGetOrBuildSnapshot (SnapBuild *builder)
 
void SnapBuildClearExportedSnapshot (void)
 
void SnapBuildResetExportedSnapshotState (void)
 
bool SnapBuildProcessChange (SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 
void SnapBuildProcessNewCid (SnapBuild *builder, TransactionId xid, XLogRecPtr lsn, xl_heap_new_cid *xlrec)
 
static void SnapBuildAddCommittedTxn (SnapBuild *builder, TransactionId xid)
 
void SnapBuildCommitTxn (SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, TransactionId *subxacts, uint32 xinfo)
 
void SnapBuildProcessRunningXacts (SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
 
void SnapBuildSerializationPoint (SnapBuild *builder, XLogRecPtr lsn)
 
bool SnapBuildRestoreSnapshot (SnapBuildOnDisk *ondisk, XLogRecPtr lsn, MemoryContext context, bool missing_ok)
 
void CheckPointSnapBuild (void)
 
bool SnapBuildSnapshotExists (XLogRecPtr lsn)
 

Variables

static ResourceOwner SavedResourceOwnerDuringExport = NULL
 
static bool ExportInProgress = false
 

Macro Definition Documentation

◆ SNAPBUILD_MAGIC

#define SNAPBUILD_MAGIC   0x51A1E001

Definition at line 1466 of file snapbuild.c.

◆ SNAPBUILD_VERSION

#define SNAPBUILD_VERSION   6

Definition at line 1467 of file snapbuild.c.

◆ SnapBuildOnDiskConstantSize

#define SnapBuildOnDiskConstantSize    offsetof(SnapBuildOnDisk, builder)

Definition at line 1461 of file snapbuild.c.

◆ SnapBuildOnDiskNotChecksummedSize

#define SnapBuildOnDiskNotChecksummedSize    offsetof(SnapBuildOnDisk, version)

Definition at line 1463 of file snapbuild.c.

Function Documentation

◆ AllocateSnapshotBuilder()

SnapBuild * AllocateSnapshotBuilder ( ReorderBuffer * reorder,
TransactionId  xmin_horizon,
XLogRecPtr  start_lsn,
bool  need_full_snapshot,
bool  in_slot_creation,
XLogRecPtr  two_phase_at 
)

Definition at line 185 of file snapbuild.c.

191{
192 MemoryContext context;
193 MemoryContext oldcontext;
194 SnapBuild *builder;
195
196 /* allocate memory in own context, to have better accountability */
197 context = AllocSetContextCreate(CurrentMemoryContext,
198 "snapshot builder context",
199 ALLOCSET_DEFAULT_SIZES);
200 oldcontext = MemoryContextSwitchTo(context);
201
202 builder = palloc0(sizeof(SnapBuild));
203
204 builder->state = SNAPBUILD_START;
205 builder->context = context;
206 builder->reorder = reorder;
207 /* Other struct members initialized by zeroing via palloc0 above */
208
209 builder->committed.xcnt = 0;
210 builder->committed.xcnt_space = 128; /* arbitrary number */
211 builder->committed.xip =
212 palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
213 builder->committed.includes_all_transactions = true;
214
215 builder->catchange.xcnt = 0;
216 builder->catchange.xip = NULL;
217
218 builder->initial_xmin_horizon = xmin_horizon;
219 builder->start_decoding_at = start_lsn;
220 builder->in_slot_creation = in_slot_creation;
221 builder->building_full_snapshot = need_full_snapshot;
222 builder->two_phase_at = two_phase_at;
223
224 MemoryContextSwitchTo(oldcontext);
225
226 return builder;
227}
uint32 TransactionId
Definition: c.h:623
void * palloc0(Size size)
Definition: mcxt.c:1969
MemoryContext CurrentMemoryContext
Definition: mcxt.c:159
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
@ SNAPBUILD_START
Definition: snapbuild.h:27
XLogRecPtr start_decoding_at
SnapBuildState state
TransactionId initial_xmin_horizon
struct SnapBuild::@117 committed
TransactionId * xip
XLogRecPtr two_phase_at
bool building_full_snapshot
bool includes_all_transactions
MemoryContext context
ReorderBuffer * reorder
struct SnapBuild::@118 catchange

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, SnapBuild::building_full_snapshot, SnapBuild::catchange, SnapBuild::committed, SnapBuild::context, CurrentMemoryContext, SnapBuild::in_slot_creation, SnapBuild::includes_all_transactions, SnapBuild::initial_xmin_horizon, MemoryContextSwitchTo(), palloc0(), SnapBuild::reorder, SNAPBUILD_START, SnapBuild::start_decoding_at, SnapBuild::state, SnapBuild::two_phase_at, SnapBuild::xcnt, SnapBuild::xcnt_space, and SnapBuild::xip.

Referenced by StartupDecodingContext().

◆ CheckPointSnapBuild()

void CheckPointSnapBuild ( void  )

Definition at line 1962 of file snapbuild.c.

1963{
1964 XLogRecPtr cutoff;
1965 XLogRecPtr redo;
1966 DIR *snap_dir;
1967 struct dirent *snap_de;
1968 char path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
1969
1970 /*
1971 * We start off with a minimum of the last redo pointer. No new
1972 * replication slot will start before that, so that's a safe upper bound
1973 * for removal.
1974 */
1975 redo = GetRedoRecPtr();
1976
1977 /* now check for the restart ptrs from existing slots */
1978 cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1979
1980 /* don't start earlier than the restart lsn */
1981 if (redo < cutoff)
1982 cutoff = redo;
1983
1984 snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
1985 while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
1986 {
1987 uint32 hi;
1988 uint32 lo;
1989 XLogRecPtr lsn;
1990 PGFileType de_type;
1991
1992 if (strcmp(snap_de->d_name, ".") == 0 ||
1993 strcmp(snap_de->d_name, "..") == 0)
1994 continue;
1995
1996 snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
1997 de_type = get_dirent_type(path, snap_de, false, DEBUG1);
1998
1999 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
2000 {
2001 elog(DEBUG1, "only regular files expected: %s", path);
2002 continue;
2003 }
2004
2005 /*
2006 * temporary filenames from SnapBuildSerialize() include the LSN and
2007 * everything but are postfixed by .$pid.tmp. We can just remove them
2008 * the same as other files because there can be none that are
2009 * currently being written that are older than cutoff.
2010 *
2011 * We just log a message if a file doesn't fit the pattern, it's
2012 * probably some editor's lock/state file or similar...
2013 */
2014 if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2015 {
2016 ereport(LOG,
2017 (errmsg("could not parse file name \"%s\"", path)));
2018 continue;
2019 }
2020
2021 lsn = ((uint64) hi) << 32 | lo;
2022
2023 /* check whether we still need it */
2024 if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
2025 {
2026 elog(DEBUG1, "removing snapbuild snapshot %s", path);
2027
2028 /*
2029 * It's not particularly harmful, though strange, if we can't
2030 * remove the file here. Don't prevent the checkpoint from
2031 * completing, that'd be a cure worse than the disease.
2032 */
2033 if (unlink(path) < 0)
2034 {
2035 ereport(LOG,
2036 (errcode_for_file_access(),
2037 errmsg("could not remove file \"%s\": %m",
2038 path)));
2039 continue;
2040 }
2041 }
2042 }
2043 FreeDir(snap_dir);
2044}
uint64_t uint64
Definition: c.h:503
uint32_t uint32
Definition: c.h:502
int errcode_for_file_access(void)
Definition: elog.c:877
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:149
int FreeDir(DIR *dir)
Definition: fd.c:3025
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2907
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2973
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:547
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_REG
Definition: file_utils.h:22
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
#define MAXPGPATH
#define snprintf
Definition: port.h:239
#define PG_LOGICAL_SNAPSHOTS_DIR
Definition: reorderbuffer.h:24
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:1205
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6625
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AllocateDir(), dirent::d_name, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), FreeDir(), get_dirent_type(), GetRedoRecPtr(), InvalidXLogRecPtr, LOG, MAXPGPATH, PG_LOGICAL_SNAPSHOTS_DIR, PGFILETYPE_ERROR, PGFILETYPE_REG, ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), and snprintf.

Referenced by CheckPointGuts().

◆ FreeSnapshotBuilder()

void FreeSnapshotBuilder ( SnapBuild * builder)

Definition at line 233 of file snapbuild.c.

234{
235 MemoryContext context = builder->context;
236
237 /* free snapshot explicitly, that contains some error checking */
238 if (builder->snapshot != NULL)
239 {
240 SnapBuildSnapDecRefcount(builder->snapshot);
241 builder->snapshot = NULL;
242 }
243
244 /* other resources are deallocated via memory context reset */
245 MemoryContextDelete(context);
246}
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:485
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:328
Snapshot snapshot

References SnapBuild::context, MemoryContextDelete(), SnapBuildSnapDecRefcount(), and SnapBuild::snapshot.

Referenced by FreeDecodingContext().

◆ SnapBuildAddCommittedTxn()

static void SnapBuildAddCommittedTxn ( SnapBuild * builder,
TransactionId  xid 
)
static

Definition at line 821 of file snapbuild.c.

822{
823 Assert(TransactionIdIsValid(xid));
824
825 if (builder->committed.xcnt == builder->committed.xcnt_space)
826 {
827 builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
828
829 elog(DEBUG1, "increasing space for committed transactions to %u",
830 (uint32) builder->committed.xcnt_space);
831
832 builder->committed.xip = repalloc(builder->committed.xip,
833 builder->committed.xcnt_space * sizeof(TransactionId));
834 }
835
836 /*
837 * TODO: It might make sense to keep the array sorted here instead of
838 * doing it every time we build a new snapshot. On the other hand this
839 * gets called repeatedly when a transaction with subtransactions commits.
840 */
841 builder->committed.xip[builder->committed.xcnt++] = xid;
842}
Assert(PointerIsAligned(start, uint64))
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:2166
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert(), SnapBuild::committed, DEBUG1, elog, repalloc(), TransactionIdIsValid, SnapBuild::xcnt, SnapBuild::xcnt_space, and SnapBuild::xip.

Referenced by SnapBuildCommitTxn().

◆ SnapBuildBuildSnapshot()

static Snapshot SnapBuildBuildSnapshot ( SnapBuild * builder)
static

Definition at line 360 of file snapbuild.c.

361{
362 Snapshot snapshot;
363 Size ssize;
364
365 Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
366
367 ssize = sizeof(SnapshotData)
368 + sizeof(TransactionId) * builder->committed.xcnt
369 + sizeof(TransactionId) * 1 /* toplevel xid */ ;
370
371 snapshot = MemoryContextAllocZero(builder->context, ssize);
372
373 snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
374
375 /*
376 * We misuse the original meaning of SnapshotData's xip and subxip fields
377 * to make them more fitting for our needs.
378 *
379 * In the 'xip' array we store transactions that have to be treated as
380 * committed. Since we will only ever look at tuples from transactions
381 * that have modified the catalog it's more efficient to store those few
382 * that exist between xmin and xmax (frequently there are none).
383 *
384 * Snapshots that are used in transactions that have modified the catalog
385 * also use the 'subxip' array to store their toplevel xid and all the
386 * subtransaction xids so we can recognize when we need to treat rows as
387 * visible that are not in xip but still need to be visible. Subxip only
388 * gets filled when the transaction is copied into the context of a
389 * catalog modifying transaction since we otherwise share a snapshot
390 * between transactions. As long as a txn hasn't modified the catalog it
391 * doesn't need to treat any uncommitted rows as visible, so there is no
392 * need for those xids.
393 *
394 * Both arrays are qsort'ed so that we can use bsearch() on them.
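395 */
396 Assert(TransactionIdIsNormal(builder->xmin));
397 Assert(TransactionIdIsNormal(builder->xmax));
398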
399 snapshot->xmin = builder->xmin;
400 snapshot->xmax = builder->xmax;
401
402 /* store all transactions to be treated as committed by this snapshot */
403 snapshot->xip =
404 (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
405 snapshot->xcnt = builder->committed.xcnt;
406 memcpy(snapshot->xip,
407 builder->committed.xip,
408 builder->committed.xcnt * sizeof(TransactionId));
409
410 /* sort so we can bsearch() */
411 qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
412
413 /*
414 * Initially, subxip is empty, i.e. it's a snapshot to be used by
415 * transactions that don't modify the catalog. Will be filled by
416 * ReorderBufferCopySnap() if necessary.
417 */
418 snapshot->subxcnt = 0;
419 snapshot->subxip = NULL;
420
421 snapshot->suboverflowed = false;
422 snapshot->takenDuringRecovery = false;
423 snapshot->copied = false;
424 snapshot->curcid = FirstCommandId;
425 snapshot->active_count = 0;
426 snapshot->regd_count = 0;
427 snapshot->snapXactCompletionCount = 0;
428
429 return snapshot;
430}
#define FirstCommandId
Definition: c.h:639
size_t Size
Definition: c.h:576
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1290
#define qsort(a, b, c, d)
Definition: port.h:479
@ SNAPBUILD_FULL_SNAPSHOT
Definition: snapbuild.h:43
struct SnapshotData SnapshotData
@ SNAPSHOT_HISTORIC_MVCC
Definition: snapshot.h:105
TransactionId xmin
TransactionId xmax
TransactionId xmin
Definition: snapshot.h:153
int32 subxcnt
Definition: snapshot.h:177
bool copied
Definition: snapshot.h:181
uint32 regd_count
Definition: snapshot.h:201
uint32 active_count
Definition: snapshot.h:200
CommandId curcid
Definition: snapshot.h:183
uint32 xcnt
Definition: snapshot.h:165
TransactionId * subxip
Definition: snapshot.h:176
uint64 snapXactCompletionCount
Definition: snapshot.h:209
TransactionId xmax
Definition: snapshot.h:154
SnapshotType snapshot_type
Definition: snapshot.h:140
TransactionId * xip
Definition: snapshot.h:164
bool suboverflowed
Definition: snapshot.h:178
bool takenDuringRecovery
Definition: snapshot.h:180
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:152

References SnapshotData::active_count, Assert(), SnapBuild::committed, SnapBuild::context, SnapshotData::copied, SnapshotData::curcid, FirstCommandId, MemoryContextAllocZero(), qsort, SnapshotData::regd_count, SNAPBUILD_FULL_SNAPSHOT, SNAPSHOT_HISTORIC_MVCC, SnapshotData::snapshot_type, SnapshotData::snapXactCompletionCount, SnapBuild::state, SnapshotData::suboverflowed, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::takenDuringRecovery, TransactionIdIsNormal, SnapBuild::xcnt, SnapshotData::xcnt, xidComparator(), SnapBuild::xip, SnapshotData::xip, SnapBuild::xmax, SnapshotData::xmax, SnapBuild::xmin, and SnapshotData::xmin.

Referenced by SnapBuildCommitTxn(), SnapBuildGetOrBuildSnapshot(), SnapBuildInitialSnapshot(), SnapBuildProcessChange(), and SnapBuildRestore().

◆ SnapBuildClearExportedSnapshot()

void SnapBuildClearExportedSnapshot ( void  )

Definition at line 600 of file snapbuild.c.

601{
602 ResourceOwner tmpResOwner;
603
604 /* nothing exported, that is the usual case */
605 if (!ExportInProgress)
606 return;
607
608 if (!IsTransactionState())
609 elog(ERROR, "clearing exported snapshot in wrong transaction state");
610
611 /*
612 * AbortCurrentTransaction() takes care of resetting the snapshot state,
613 * so remember SavedResourceOwnerDuringExport.
614 */
615 tmpResOwner = SavedResourceOwnerDuringExport;
616
617 /* make sure nothing could have ever happened */
618 AbortCurrentTransaction();
619
620 CurrentResourceOwner = tmpResOwner;
621}
#define ERROR
Definition: elog.h:39
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
static ResourceOwner SavedResourceOwnerDuringExport
Definition: snapbuild.c:151
static bool ExportInProgress
Definition: snapbuild.c:152
bool IsTransactionState(void)
Definition: xact.c:387
void AbortCurrentTransaction(void)
Definition: xact.c:3451

References AbortCurrentTransaction(), CurrentResourceOwner, elog, ERROR, ExportInProgress, IsTransactionState(), and SavedResourceOwnerDuringExport.

Referenced by exec_replication_command().

◆ SnapBuildCommitTxn()

void SnapBuildCommitTxn ( SnapBuild * builder,
XLogRecPtr  lsn,
TransactionId  xid,
int  nsubxacts,
TransactionId * subxacts,
uint32  xinfo 
)

Definition at line 932 of file snapbuild.c.

934{
935 int nxact;
936
937 bool needs_snapshot = false;
938 bool needs_timetravel = false;
939 bool sub_needs_timetravel = false;
940
941 TransactionId xmax = xid;
942
943 /*
944 * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
945 * will they be part of a snapshot. So we don't need to record anything.
946 */
947 if (builder->state == SNAPBUILD_START ||
948 (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
949 TransactionIdPrecedes(xid, builder->next_phase_at)))
950 {
951 /* ensure that only commits after this are getting replayed */
952 if (builder->start_decoding_at <= lsn)
953 builder->start_decoding_at = lsn + 1;
954 return;
955 }
956
957 if (builder->state < SNAPBUILD_CONSISTENT)
958 {
959 /* ensure that only commits after this are getting replayed */
960 if (builder->start_decoding_at <= lsn)
961 builder->start_decoding_at = lsn + 1;
962
963 /*
964 * If building an exportable snapshot, force xid to be tracked, even
965 * if the transaction didn't modify the catalog.
966 */
967 if (builder->building_full_snapshot)
968 {
969 needs_timetravel = true;
970 }
971 }
972
973 for (nxact = 0; nxact < nsubxacts; nxact++)
974 {
975 TransactionId subxid = subxacts[nxact];
976
977 /*
978 * Add subtransaction to base snapshot if catalog modifying, we don't
979 * distinguish to toplevel transactions there.
980 */
981 if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
982 {
983 sub_needs_timetravel = true;
984 needs_snapshot = true;
985
986 elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
987 xid, subxid);
988
989 SnapBuildAddCommittedTxn(builder, subxid);
990
991 if (NormalTransactionIdFollows(subxid, xmax))
992 xmax = subxid;
993 }
994
995 /*
996 * If we're forcing timetravel we also need visibility information
997 * about subtransaction, so keep track of subtransaction's state, even
998 * if not catalog modifying. Don't need to distribute a snapshot in
999 * that case.
1000 */
1001 else if (needs_timetravel)
1002 {
1003 SnapBuildAddCommittedTxn(builder, subxid);
1004 if (NormalTransactionIdFollows(subxid, xmax))
1005 xmax = subxid;
1006 }
1007 }
1008
1009 /* if top-level modified catalog, it'll need a snapshot */
1010 if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
1011 {
1012 elog(DEBUG2, "found top level transaction %u, with catalog changes",
1013 xid);
1014 needs_snapshot = true;
1015 needs_timetravel = true;
1016 SnapBuildAddCommittedTxn(builder, xid);
1017 }
1018 else if (sub_needs_timetravel)
1019 {
1020 /* track toplevel txn as well, subxact alone isn't meaningful */
1021 elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
1022 xid);
1023 needs_timetravel = true;
1024 SnapBuildAddCommittedTxn(builder, xid);
1025 }
1026 else if (needs_timetravel)
1027 {
1028 elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1029
1030 SnapBuildAddCommittedTxn(builder, xid);
1031 }
1032
1033 if (!needs_timetravel)
1034 {
1035 /* record that we cannot export a general snapshot anymore */
1036 builder->committed.includes_all_transactions = false;
1037 }
1038
1039 Assert(!needs_snapshot || needs_timetravel);
1040
1041 /*
1042 * Adjust xmax of the snapshot builder, we only do that for committed,
1043 * catalog modifying, transactions, everything else isn't interesting for
1044 * us since we'll never look at the respective rows.
1045 */
1046 if (needs_timetravel &&
1047 (!TransactionIdIsValid(builder->xmax) ||
1048 TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1049 {
1050 builder->xmax = xmax;
1051 TransactionIdAdvance(builder->xmax);
1052 }
1053
1054 /* if there's any reason to build a historic snapshot, do so now */
1055 if (needs_snapshot)
1056 {
1057 /*
1058 * If we haven't built a complete snapshot yet there's no need to hand
1059 * it out, it wouldn't (and couldn't) be used anyway.
1060 */
1061 if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1062 return;
1063
1064 /*
1065 * Decrease the snapshot builder's refcount of the old snapshot, note
1066 * that it still will be used if it has been handed out to the
1067 * reorderbuffer earlier.
1068 */
1069 if (builder->snapshot)
1070 SnapBuildSnapDecRefcount(builder->snapshot);
1071
1072 builder->snapshot = SnapBuildBuildSnapshot(builder);
1073
1074 /* we might need to execute invalidations, add snapshot */
1075 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1076 {
1077 SnapBuildSnapIncRefcount(builder->snapshot);
1078 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1079 builder->snapshot);
1080 }
1081
1082 /* refcount of the snapshot builder for the new snapshot */
1083 SnapBuildSnapIncRefcount(builder->snapshot);
1084
1085 /*
1086 * Add a new catalog snapshot and invalidations messages to all
1087 * currently running transactions.
1088 */
1089 SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
1090 }
1091}
#define DEBUG2
Definition: elog.h:29
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition: snapbuild.c:316
static void SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:821
static bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, uint32 xinfo)
Definition: snapbuild.c:1098
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder)
Definition: snapbuild.c:360
static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
Definition: snapbuild.c:731
@ SNAPBUILD_BUILDING_SNAPSHOT
Definition: snapbuild.h:33
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:50
TransactionId next_phase_at
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:329
#define NormalTransactionIdFollows(id1, id2)
Definition: transam.h:152
#define TransactionIdAdvance(dest)
Definition: transam.h:91

References Assert(), SnapBuild::building_full_snapshot, SnapBuild::committed, DEBUG1, DEBUG2, elog, SnapBuild::includes_all_transactions, SnapBuild::next_phase_at, NormalTransactionIdFollows, SnapBuild::reorder, ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), SNAPBUILD_BUILDING_SNAPSHOT, SNAPBUILD_CONSISTENT, SNAPBUILD_FULL_SNAPSHOT, SNAPBUILD_START, SnapBuildAddCommittedTxn(), SnapBuildBuildSnapshot(), SnapBuildDistributeSnapshotAndInval(), SnapBuildSnapDecRefcount(), SnapBuildSnapIncRefcount(), SnapBuildXidHasCatalogChanges(), SnapBuild::snapshot, SnapBuild::start_decoding_at, SnapBuild::state, TransactionIdAdvance, TransactionIdFollowsOrEquals(), TransactionIdIsValid, TransactionIdPrecedes(), and SnapBuild::xmax.

Referenced by DecodeCommit().

◆ SnapBuildCurrentState()

SnapBuildState SnapBuildCurrentState ( SnapBuild * builder)

◆ SnapBuildDistributeSnapshotAndInval()

static void SnapBuildDistributeSnapshotAndInval ( SnapBuild * builder,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 731 of file snapbuild.c.

732{
733 dlist_iter txn_i;
734 ReorderBufferTXN *txn;
735
736 /*
737 * Iterate through all toplevel transactions. This can include
738 * subtransactions which we just don't yet know to be that, but that's
739 * fine, they will just get an unnecessary snapshot and invalidations
740 * queued.
741 */
742 dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
743 {
744 txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
745
746 Assert(TransactionIdIsValid(txn->xid));
747
748 /*
749 * If we don't have a base snapshot yet, there are no changes in this
750 * transaction which in turn implies we don't yet need a snapshot at
751 * all. We'll add a snapshot when the first change gets queued.
752 *
753 * Similarly, we don't need to add invalidations to a transaction
754 * whose base snapshot is not yet set. Once a base snapshot is built,
755 * it will include the xids of committed transactions that have
756 * modified the catalog, thus reflecting the new catalog contents. The
757 * existing catalog cache will have already been invalidated after
758 * processing the invalidations in the transaction that modified
759 * catalogs, ensuring that a fresh cache is constructed during
760 * decoding.
761 *
762 * NB: This works correctly even for subtransactions because
763 * ReorderBufferAssignChild() takes care to transfer the base snapshot
764 * to the top-level transaction, and while iterating the changequeue
765 * we'll get the change from the subtxn.
766 */
767 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
768 continue;
769
770 /*
771 * We don't need to add snapshot or invalidations to prepared
772 * transactions as they should not see the new catalog contents.
773 */
774 if (rbtxn_is_prepared(txn))
775 continue;
776
777 elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
778 txn->xid, LSN_FORMAT_ARGS(lsn));
779
780 /*
781 * increase the snapshot's refcount for the transaction we are handing
782 * it out to
783 */
784 SnapBuildSnapIncRefcount(builder->snapshot);
785 ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
786 builder->snapshot);
787
788 /*
789 * Add invalidation messages to the reorder buffer of in-progress
790 * transactions except the current committed transaction, for which we
791 * will execute invalidations at the end.
792 *
793 * It is required, otherwise, we will end up using the stale catcache
794 * contents built by the current transaction even after its decoding,
795 * which should have been invalidated due to concurrent catalog
796 * changing transaction.
797 */
798 if (txn->xid != xid)
799 {
800 uint32 ninvalidations;
801 SharedInvalidationMessage *msgs = NULL;
802
803 ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
804 xid, &msgs);
805
806 if (ninvalidations > 0)
807 {
808 Assert(msgs != NULL);
809
810 ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
811 ninvalidations, msgs);
812 }
813 }
814 }
815}
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
#define rbtxn_is_prepared(txn)
TransactionId xid
dlist_head toplevel_by_lsn
dlist_node * cur
Definition: ilist.h:179
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References Assert(), dlist_iter::cur, DEBUG2, dlist_container, dlist_foreach, elog, LSN_FORMAT_ARGS, rbtxn_is_prepared, SnapBuild::reorder, ReorderBufferAddInvalidations(), ReorderBufferAddSnapshot(), ReorderBufferGetInvalidations(), ReorderBufferXidHasBaseSnapshot(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, and ReorderBufferTXN::xid.

Referenced by SnapBuildCommitTxn().

◆ SnapBuildExportSnapshot()

const char * SnapBuildExportSnapshot ( SnapBuild * builder)

Definition at line 539 of file snapbuild.c.

540{
541 Snapshot snap;
542 char *snapname;
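543
544 if (IsTransactionOrTransactionBlock())
545 elog(ERROR, "cannot export a snapshot from within a transaction");
546
547 if (SavedResourceOwnerDuringExport)
548 elog(ERROR, "can only export one snapshot at a time");
549
550 SavedResourceOwnerDuringExport = CurrentResourceOwner;
551 ExportInProgress = true;
552
553 StartTransactionCommand();
554
555 /* There doesn't seem to be a nice API to set these */
556 XactIsoLevel = XACT_REPEATABLE_READ;
557 XactReadOnly = true;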
558
559 snap = SnapBuildInitialSnapshot(builder);
560
561 /*
562 * now that we've built a plain snapshot, make it active and use the
563 * normal mechanisms for exporting it
564 */
565 snapname = ExportSnapshot(snap);
566
567 ereport(LOG,
568 (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
569 "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
570 snap->xcnt,
571 snapname, snap->xcnt)));
572 return snapname;
573}
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:1181
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:440
char * ExportSnapshot(Snapshot snapshot)
Definition: snapmgr.c:1102
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4989
bool XactReadOnly
Definition: xact.c:82
void StartTransactionCommand(void)
Definition: xact.c:3059
int XactIsoLevel
Definition: xact.c:79
#define XACT_REPEATABLE_READ
Definition: xact.h:38

References CurrentResourceOwner, elog, ereport, errmsg_plural(), ERROR, ExportInProgress, ExportSnapshot(), IsTransactionOrTransactionBlock(), LOG, SavedResourceOwnerDuringExport, SnapBuildInitialSnapshot(), StartTransactionCommand(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and SnapshotData::xcnt.

Referenced by CreateReplicationSlot().

◆ SnapBuildFindSnapshot()

static bool SnapBuildFindSnapshot ( SnapBuild *builder,
XLogRecPtr  lsn,
xl_running_xacts *running
)
static

Definition at line 1230 of file snapbuild.c.

1231{
1232 /* ---
1233 * Build catalog decoding snapshot incrementally using information about
1234 * the currently running transactions. There are several ways to do that:
1235 *
1236 * a) There were no running transactions when the xl_running_xacts record
1237 * was inserted, jump to CONSISTENT immediately. We might find such a
1238 * state while waiting on c)'s sub-states.
1239 *
1240 * b) This (in a previous run) or another decoding slot serialized a
1241 * snapshot to disk that we can use. Can't use this method while finding
1242 * the start point for decoding changes as the restart LSN would be an
1243 * arbitrary LSN but we need to find the start point to extract changes
1244 * where we won't see the data for partial transactions. Also, we cannot
1245 * use this method when a slot needs a full snapshot for export or direct
1246 * use, as that snapshot will only contain catalog modifying transactions.
1247 *
1248 * c) First incrementally build a snapshot for catalog tuples
1249 * (BUILDING_SNAPSHOT), that requires all, already in-progress,
1250 * transactions to finish. Every transaction starting after that
1251 * (FULL_SNAPSHOT state), has enough information to be decoded. But
1252 * for older running transactions no viable snapshot exists yet, so
1253 * CONSISTENT will only be reached once all of those have finished.
1254 * ---
1255 */
1256
1257 /*
1258 * xl_running_xacts record is older than what we can use, we might not
1259 * have all necessary catalog rows anymore.
1260 */
1263 builder->initial_xmin_horizon))
1264 {
1266 (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
1267 LSN_FORMAT_ARGS(lsn)),
1268 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1269 builder->initial_xmin_horizon, running->oldestRunningXid)));
1270
1271
1273
1274 return true;
1275 }
1276
1277 /*
1278 * a) No transactions were running, so we can jump to consistent.
1279 *
1280 * This is not affected by races around xl_running_xacts, because we can
1281 * miss transaction commits, but currently not transactions starting.
1282 *
1283 * NB: We might have already started to incrementally assemble a snapshot,
1284 * so we need to be careful to deal with that.
1285 */
1286 if (running->oldestRunningXid == running->nextXid)
1287 {
1288 if (builder->start_decoding_at == InvalidXLogRecPtr ||
1289 builder->start_decoding_at <= lsn)
1290 /* can decode everything after this */
1291 builder->start_decoding_at = lsn + 1;
1292
1293 /* As no transactions were running xmin/xmax can be trivially set. */
1294 builder->xmin = running->nextXid; /* < are finished */
1295 builder->xmax = running->nextXid; /* >= are running */
1296
1297 /* so we can safely use the faster comparisons */
1300
1301 builder->state = SNAPBUILD_CONSISTENT;
1303
1304 ereport(LOG,
1305 (errmsg("logical decoding found consistent point at %X/%X",
1306 LSN_FORMAT_ARGS(lsn)),
1307 errdetail("There are no running transactions.")));
1308
1309 return false;
1310 }
1311
1312 /*
1313 * b) valid on disk state and while neither building full snapshot nor
1314 * creating a slot.
1315 */
1316 else if (!builder->building_full_snapshot &&
1317 !builder->in_slot_creation &&
1318 SnapBuildRestore(builder, lsn))
1319 {
1320 /* there won't be any state to cleanup */
1321 return false;
1322 }
1323
1324 /*
1325 * c) transition from START to BUILDING_SNAPSHOT.
1326 *
1327 * In START state, an xl_running_xacts record with running xacts is
1328 * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
1329 * record xl_running_xacts->nextXid. Once all running xacts have finished
1330 * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
1331 * might look that we could use xl_running_xacts's ->xids information to
1332 * get there quicker, but that is problematic because transactions marked
1333 * as running, might already have inserted their commit record - it's
1334 * infeasible to change that with locking.
1335 */
1336 else if (builder->state == SNAPBUILD_START)
1337 {
1339 builder->next_phase_at = running->nextXid;
1340
1341 /*
1342 * Start with an xmin/xmax that's correct for future, when all the
1343 * currently running transactions have finished. We'll update both
1344 * while waiting for the pending transactions to finish.
1345 */
1346 builder->xmin = running->nextXid; /* < are finished */
1347 builder->xmax = running->nextXid; /* >= are running */
1348
1349 /* so we can safely use the faster comparisons */
1352
1353 ereport(LOG,
1354 (errmsg("logical decoding found initial starting point at %X/%X",
1355 LSN_FORMAT_ARGS(lsn)),
1356 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1357 running->xcnt, running->nextXid)));
1358
1359 SnapBuildWaitSnapshot(running, running->nextXid);
1360 }
1361
1362 /*
1363 * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1364 *
1365 * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1366 * is >= nextXid from when we switched to BUILDING_SNAPSHOT. This
1367 * means all transactions starting afterwards have enough information to
1368 * be decoded. Switch to FULL_SNAPSHOT.
1369 */
1370 else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1372 running->oldestRunningXid))
1373 {
1374 builder->state = SNAPBUILD_FULL_SNAPSHOT;
1375 builder->next_phase_at = running->nextXid;
1376
1377 ereport(LOG,
1378 (errmsg("logical decoding found initial consistent point at %X/%X",
1379 LSN_FORMAT_ARGS(lsn)),
1380 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1381 running->xcnt, running->nextXid)));
1382
1383 SnapBuildWaitSnapshot(running, running->nextXid);
1384 }
1385
1386 /*
1387 * c) transition from FULL_SNAPSHOT to CONSISTENT.
1388 *
1389 * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
1390 * >= nextXid from when we switched to FULL_SNAPSHOT. This means all
1391 * transactions that are currently in progress have a catalog snapshot,
1392 * and all their changes have been collected. Switch to CONSISTENT.
1393 */
1394 else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1396 running->oldestRunningXid))
1397 {
1398 builder->state = SNAPBUILD_CONSISTENT;
1400
1401 ereport(LOG,
1402 (errmsg("logical decoding found consistent point at %X/%X",
1403 LSN_FORMAT_ARGS(lsn)),
1404 errdetail("There are no old transactions anymore.")));
1405 }
1406
1407 /*
1408 * We already started to track running xacts and need to wait for all
1409 * in-progress ones to finish. We fall through to the normal processing of
1410 * records so incremental cleanup can be performed.
1411 */
1412 return true;
1413}
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1158
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1231
int errdetail(const char *fmt,...)
Definition: elog.c:1204
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
Definition: snapbuild.c:1427
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1833
TransactionId oldestRunningXid
Definition: standbydefs.h:53
TransactionId nextXid
Definition: standbydefs.h:52
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define InvalidTransactionId
Definition: transam.h:31
#define NormalTransactionIdPrecedes(id1, id2)
Definition: transam.h:147

References Assert(), SnapBuild::building_full_snapshot, DEBUG1, ereport, errdetail(), errdetail_internal(), errmsg(), errmsg_internal(), SnapBuild::in_slot_creation, SnapBuild::initial_xmin_horizon, InvalidTransactionId, InvalidXLogRecPtr, LOG, LSN_FORMAT_ARGS, SnapBuild::next_phase_at, xl_running_xacts::nextXid, NormalTransactionIdPrecedes, xl_running_xacts::oldestRunningXid, SNAPBUILD_BUILDING_SNAPSHOT, SNAPBUILD_CONSISTENT, SNAPBUILD_FULL_SNAPSHOT, SNAPBUILD_START, SnapBuildRestore(), SnapBuildWaitSnapshot(), SnapBuild::start_decoding_at, SnapBuild::state, TransactionIdIsNormal, TransactionIdPrecedesOrEquals(), xl_running_xacts::xcnt, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ SnapBuildFreeSnapshot()

static void SnapBuildFreeSnapshot ( Snapshot  snap)
static

Definition at line 252 of file snapbuild.c.

253{
254 /* make sure we don't get passed an external snapshot */
256
257 /* make sure nobody modified our snapshot */
258 Assert(snap->curcid == FirstCommandId);
259 Assert(!snap->suboverflowed);
261 Assert(snap->regd_count == 0);
262
263 /* slightly more likely, so it's checked even without c-asserts */
264 if (snap->copied)
265 elog(ERROR, "cannot free a copied snapshot");
266
267 if (snap->active_count)
268 elog(ERROR, "cannot free an active snapshot");
269
270 pfree(snap);
271}
void pfree(void *pointer)
Definition: mcxt.c:2146

References SnapshotData::active_count, Assert(), SnapshotData::copied, SnapshotData::curcid, elog, ERROR, FirstCommandId, pfree(), SnapshotData::regd_count, SNAPSHOT_HISTORIC_MVCC, SnapshotData::snapshot_type, SnapshotData::suboverflowed, and SnapshotData::takenDuringRecovery.

Referenced by SnapBuildSnapDecRefcount().

◆ SnapBuildGetOrBuildSnapshot()

Snapshot SnapBuildGetOrBuildSnapshot ( SnapBuild *builder)

Definition at line 579 of file snapbuild.c.

580{
581 Assert(builder->state == SNAPBUILD_CONSISTENT);
582
583 /* only build a new snapshot if we don't have a prebuilt one */
584 if (builder->snapshot == NULL)
585 {
586 builder->snapshot = SnapBuildBuildSnapshot(builder);
587 /* increase refcount for the snapshot builder */
589 }
590
591 return builder->snapshot;
592}

References Assert(), SNAPBUILD_CONSISTENT, SnapBuildBuildSnapshot(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, and SnapBuild::state.

Referenced by logicalmsg_decode().

◆ SnapBuildGetTwoPhaseAt()

XLogRecPtr SnapBuildGetTwoPhaseAt ( SnapBuild *builder)

Definition at line 286 of file snapbuild.c.

287{
288 return builder->two_phase_at;
289}

References SnapBuild::two_phase_at.

Referenced by DecodeCommit().

◆ SnapBuildInitialSnapshot()

Snapshot SnapBuildInitialSnapshot ( SnapBuild *builder)

Definition at line 440 of file snapbuild.c.

441{
442 Snapshot snap;
443 TransactionId xid;
444 TransactionId safeXid;
445 TransactionId *newxip;
446 int newxcnt = 0;
447
450
451 /* don't allow older snapshots */
452 InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */
454 elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
456
457 if (builder->state != SNAPBUILD_CONSISTENT)
458 elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
459
461 elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
462
463 /* so we don't overwrite the existing value */
465 elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
466
467 snap = SnapBuildBuildSnapshot(builder);
468
469 /*
470 * We know that snap->xmin is alive, enforced by the logical xmin
471 * mechanism. Due to that we can do this without locks, we're only
472 * changing our own value.
473 *
474 * Building an initial snapshot is expensive and an unenforced xmin
475 * horizon would have bad consequences, therefore always double-check that
476 * the horizon is enforced.
477 */
478 LWLockAcquire(ProcArrayLock, LW_SHARED);
479 safeXid = GetOldestSafeDecodingTransactionId(false);
480 LWLockRelease(ProcArrayLock);
481
482 if (TransactionIdFollows(safeXid, snap->xmin))
483 elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
484 safeXid, snap->xmin);
485
486 MyProc->xmin = snap->xmin;
487
488 /* allocate in transaction context */
489 newxip = (TransactionId *)
491
492 /*
493 * snapbuild.c builds snapshots in an "inverted" manner, which means it
494 * stores committed transactions in ->xip, not ones in progress. Build a
495 * classical snapshot by marking all non-committed transactions as
496 * in-progress. This can be expensive.
497 */
498 for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
499 {
500 void *test;
501
502 /*
503 * Check whether transaction committed using the decoding snapshot
504 * meaning of ->xip.
505 */
506 test = bsearch(&xid, snap->xip, snap->xcnt,
508
509 if (test == NULL)
510 {
511 if (newxcnt >= GetMaxSnapshotXidCount())
514 errmsg("initial slot snapshot too large")));
515
516 newxip[newxcnt++] = xid;
517 }
518
520 }
521
522 /* adjust remaining snapshot fields as needed */
524 snap->xcnt = newxcnt;
525 snap->xip = newxip;
526
527 return snap;
528}
int errcode(int sqlerrcode)
Definition: elog.c:854
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1182
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1902
@ LW_SHARED
Definition: lwlock.h:115
void * palloc(Size size)
Definition: mcxt.c:1939
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition: pgbench.c:77
static void test(void)
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
Definition: procarray.c:2945
int GetMaxSnapshotXidCount(void)
Definition: procarray.c:2069
bool HistoricSnapshotActive(void)
Definition: snapmgr.c:1679
bool HaveRegisteredOrActiveSnapshot(void)
Definition: snapmgr.c:1631
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:443
@ SNAPSHOT_MVCC
Definition: snapshot.h:46
PGPROC * MyProc
Definition: proc.c:67
TransactionId xmin
Definition: proc.h:178
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
Definition: transam.c:314

References Assert(), SnapBuild::building_full_snapshot, SnapBuild::committed, elog, ereport, errcode(), ERRCODE_T_R_SERIALIZATION_FAILURE, errmsg(), ERROR, GetMaxSnapshotXidCount(), GetOldestSafeDecodingTransactionId(), HaveRegisteredOrActiveSnapshot(), HistoricSnapshotActive(), SnapBuild::includes_all_transactions, InvalidateCatalogSnapshot(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProc, NormalTransactionIdPrecedes, palloc(), SNAPBUILD_CONSISTENT, SnapBuildBuildSnapshot(), SNAPSHOT_MVCC, SnapshotData::snapshot_type, SnapBuild::state, test(), TransactionIdAdvance, TransactionIdFollows(), TransactionIdIsValid, XACT_REPEATABLE_READ, XactIsoLevel, SnapshotData::xcnt, xidComparator(), SnapshotData::xip, SnapshotData::xmax, PGPROC::xmin, and SnapshotData::xmin.

Referenced by CreateReplicationSlot(), and SnapBuildExportSnapshot().

◆ SnapBuildProcessChange()

bool SnapBuildProcessChange ( SnapBuild *builder,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 639 of file snapbuild.c.

640{
641 /*
642 * We can't handle data in transactions if we haven't built a snapshot
643 * yet, so don't store them.
644 */
645 if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
646 return false;
647
648 /*
649 * No point in keeping track of changes in transactions that we don't have
650 * enough information about to decode. This means that they started before
651 * we got into the SNAPBUILD_FULL_SNAPSHOT state.
652 */
653 if (builder->state < SNAPBUILD_CONSISTENT &&
655 return false;
656
657 /*
658 * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
659 * be needed to decode the change we're currently processing.
660 */
661 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
662 {
663 /* only build a new snapshot if we don't have a prebuilt one */
664 if (builder->snapshot == NULL)
665 {
666 builder->snapshot = SnapBuildBuildSnapshot(builder);
667 /* increase refcount for the snapshot builder */
669 }
670
671 /*
672 * Increase refcount for the transaction we're handing the snapshot
673 * out to.
674 */
676 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
677 builder->snapshot);
678 }
679
680 return true;
681}

References SnapBuild::next_phase_at, SnapBuild::reorder, ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), SNAPBUILD_CONSISTENT, SNAPBUILD_FULL_SNAPSHOT, SnapBuildBuildSnapshot(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, SnapBuild::state, and TransactionIdPrecedes().

Referenced by heap2_decode(), heap_decode(), and logicalmsg_decode().

◆ SnapBuildProcessNewCid()

void SnapBuildProcessNewCid ( SnapBuild *builder,
TransactionId  xid,
XLogRecPtr  lsn,
xl_heap_new_cid *xlrec
)

Definition at line 689 of file snapbuild.c.

691{
692 CommandId cid;
693
694 /*
695 * we only log new_cid's if a catalog tuple was modified, so mark the
696 * transaction as containing catalog modifications
697 */
698 ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
699
700 ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
701 xlrec->target_locator, xlrec->target_tid,
702 xlrec->cmin, xlrec->cmax,
703 xlrec->combocid);
704
705 /* figure out new command id */
706 if (xlrec->cmin != InvalidCommandId &&
707 xlrec->cmax != InvalidCommandId)
708 cid = Max(xlrec->cmin, xlrec->cmax);
709 else if (xlrec->cmax != InvalidCommandId)
710 cid = xlrec->cmax;
711 else if (xlrec->cmin != InvalidCommandId)
712 cid = xlrec->cmin;
713 else
714 {
715 cid = InvalidCommandId; /* silence compiler */
716 elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
717 }
718
719 ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
720}
#define InvalidCommandId
Definition: c.h:640
#define Max(x, y)
Definition: c.h:969
uint32 CommandId
Definition: c.h:637
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
CommandId cmin
Definition: heapam_xlog.h:459
CommandId combocid
Definition: heapam_xlog.h:461
ItemPointerData target_tid
Definition: heapam_xlog.h:467
TransactionId top_xid
Definition: heapam_xlog.h:458
CommandId cmax
Definition: heapam_xlog.h:460
RelFileLocator target_locator
Definition: heapam_xlog.h:466

References xl_heap_new_cid::cmax, xl_heap_new_cid::cmin, xl_heap_new_cid::combocid, elog, ERROR, InvalidCommandId, Max, SnapBuild::reorder, ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferXidSetCatalogChanges(), xl_heap_new_cid::target_locator, xl_heap_new_cid::target_tid, and xl_heap_new_cid::top_xid.

Referenced by heap2_decode().

◆ SnapBuildProcessRunningXacts()

void SnapBuildProcessRunningXacts ( SnapBuild *builder,
XLogRecPtr  lsn,
xl_running_xacts *running
)

Definition at line 1128 of file snapbuild.c.

1129{
1130 ReorderBufferTXN *txn;
1131 TransactionId xmin;
1132
1133 /*
1134 * If we're not consistent yet, inspect the record to see whether it
1135 * allows us to get closer to being consistent. If we are consistent, dump
1136 * our snapshot so others or we, after a restart, can use it.
1137 */
1138 if (builder->state < SNAPBUILD_CONSISTENT)
1139 {
1140 /* returns false if there's no point in performing cleanup just yet */
1141 if (!SnapBuildFindSnapshot(builder, lsn, running))
1142 return;
1143 }
1144 else
1145 SnapBuildSerialize(builder, lsn);
1146
1147 /*
1148 * Update range of interesting xids based on the running xacts
1149 * information. We don't increase ->xmax using it, because once we are in
1150 * a consistent state we can do that ourselves and much more efficiently
1151 * so, because we only need to do it for catalog transactions since we
1152 * only ever look at those.
1153 *
1154 * NB: We only increase xmax when a catalog modifying transaction commits
1155 * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1156 * xmin, which looks odd but is correct and actually more efficient, since
1157 * we hit fast paths in heapam_visibility.c.
1158 */
1159 builder->xmin = running->oldestRunningXid;
1160
1161 /* Remove transactions we don't need to keep track of anymore */
1162 SnapBuildPurgeOlderTxn(builder);
1163
1164 /*
1165 * Advance the xmin limit for the current replication slot, to allow
1166 * vacuum to clean up the tuples this slot has been protecting.
1167 *
1168 * The reorderbuffer might have an xmin among the currently running
1169 * snapshots; use it if so. If not, we need only consider the snapshots
1170 * we'll produce later, which can't be less than the oldest running xid in
1171 * the record we're reading now.
1172 */
1173 xmin = ReorderBufferGetOldestXmin(builder->reorder);
1174 if (xmin == InvalidTransactionId)
1175 xmin = running->oldestRunningXid;
1176 elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1177 builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1178 LogicalIncreaseXminForSlot(lsn, xmin);
1179
1180 /*
1181 * Also tell the slot where we can restart decoding from. We don't want to
1182 * do that after every commit because changing that implies an fsync of
1183 * the logical slot's state file, so we only do it every time we see a
1184 * running xacts record.
1185 *
1186 * Do so by looking for the oldest in progress transaction (determined by
1187 * the first LSN of any of its relevant records). Every transaction
1188 * remembers the last location we stored the snapshot to disk before its
1189 * beginning. That point is where we can restart from.
1190 */
1191
1192 /*
1193 * Can't know about a serialized snapshot's location if we're not
1194 * consistent.
1195 */
1196 if (builder->state < SNAPBUILD_CONSISTENT)
1197 return;
1198
1199 txn = ReorderBufferGetOldestTXN(builder->reorder);
1200
1201 /*
1202 * oldest ongoing txn might have started when we didn't yet serialize
1203 * anything because we hadn't reached a consistent state yet.
1204 */
1205 if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1207
1208 /*
1209 * No in-progress transaction, can reuse the last serialized snapshot if
1210 * we have one.
1211 */
1212 else if (txn == NULL &&
1216 builder->last_serialized_snapshot);
1217}
#define DEBUG3
Definition: elog.h:28
void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
Definition: logical.c:1742
void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
Definition: logical.c:1674
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1489
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
Definition: snapbuild.c:1230
static void SnapBuildPurgeOlderTxn(SnapBuild *builder)
Definition: snapbuild.c:855
XLogRecPtr restart_decoding_lsn
XLogRecPtr current_restart_decoding_lsn
XLogRecPtr last_serialized_snapshot

References ReorderBuffer::current_restart_decoding_lsn, DEBUG3, elog, InvalidTransactionId, InvalidXLogRecPtr, SnapBuild::last_serialized_snapshot, LogicalIncreaseRestartDecodingForSlot(), LogicalIncreaseXminForSlot(), xl_running_xacts::oldestRunningXid, SnapBuild::reorder, ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferTXN::restart_decoding_lsn, SNAPBUILD_CONSISTENT, SnapBuildFindSnapshot(), SnapBuildPurgeOlderTxn(), SnapBuildSerialize(), SnapBuild::state, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by standby_decode().

◆ SnapBuildPurgeOlderTxn()

static void SnapBuildPurgeOlderTxn ( SnapBuild *builder)
static

Definition at line 855 of file snapbuild.c.

856{
857 int off;
858 TransactionId *workspace;
859 int surviving_xids = 0;
860
861 /* not ready yet */
862 if (!TransactionIdIsNormal(builder->xmin))
863 return;
864
865 /* TODO: Neater algorithm than just copying and iterating? */
866 workspace =
868 builder->committed.xcnt * sizeof(TransactionId));
869
870 /* copy xids that still are interesting to workspace */
871 for (off = 0; off < builder->committed.xcnt; off++)
872 {
873 if (NormalTransactionIdPrecedes(builder->committed.xip[off],
874 builder->xmin))
875 ; /* remove */
876 else
877 workspace[surviving_xids++] = builder->committed.xip[off];
878 }
879
880 /* copy workspace back to persistent state */
881 memcpy(builder->committed.xip, workspace,
882 surviving_xids * sizeof(TransactionId));
883
884 elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
885 (uint32) builder->committed.xcnt, (uint32) surviving_xids,
886 builder->xmin, builder->xmax);
887 builder->committed.xcnt = surviving_xids;
888
889 pfree(workspace);
890
891 /*
892 * Purge xids in ->catchange as well. The purged array must also be sorted
893 * in xidComparator order.
894 */
895 if (builder->catchange.xcnt > 0)
896 {
897 /*
898 * Since catchange.xip is sorted, we find the lower bound of xids that
899 * are still interesting.
900 */
901 for (off = 0; off < builder->catchange.xcnt; off++)
902 {
904 builder->xmin))
905 break;
906 }
907
908 surviving_xids = builder->catchange.xcnt - off;
909
910 if (surviving_xids > 0)
911 {
912 memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
913 surviving_xids * sizeof(TransactionId));
914 }
915 else
916 {
917 pfree(builder->catchange.xip);
918 builder->catchange.xip = NULL;
919 }
920
921 elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
922 (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
923 builder->xmin, builder->xmax);
924 builder->catchange.xcnt = surviving_xids;
925 }
926}
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1256

References SnapBuild::catchange, SnapBuild::committed, SnapBuild::context, DEBUG3, elog, MemoryContextAlloc(), NormalTransactionIdPrecedes, pfree(), TransactionIdFollowsOrEquals(), TransactionIdIsNormal, SnapBuild::xcnt, SnapBuild::xip, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildProcessRunningXacts().

◆ SnapBuildResetExportedSnapshotState()

void SnapBuildResetExportedSnapshotState ( void  )

Definition at line 627 of file snapbuild.c.

628{
630 ExportInProgress = false;
631}

References ExportInProgress, and SavedResourceOwnerDuringExport.

Referenced by AbortTransaction().

◆ SnapBuildRestore()

static bool SnapBuildRestore ( SnapBuild *builder,
XLogRecPtr  lsn 
)
static

Definition at line 1833 of file snapbuild.c.

1834{
1835 SnapBuildOnDisk ondisk;
1836
1837 /* no point in loading a snapshot if we're already there */
1838 if (builder->state == SNAPBUILD_CONSISTENT)
1839 return false;
1840
1841 /* validate and restore the snapshot to 'ondisk' */
1842 if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
1843 return false;
1844
1845 /*
1846 * ok, we now have a sensible snapshot here, figure out if it has more
1847 * information than we have.
1848 */
1849
1850 /*
1851 * We are only interested in consistent snapshots for now, comparing
1852 * whether one incomplete snapshot is more "advanced" seems to be
1853 * unnecessarily complex.
1854 */
1855 if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1856 goto snapshot_not_interesting;
1857
1858 /*
1859 * Don't use a snapshot that requires an xmin that we cannot guarantee to
1860 * be available.
1861 */
1863 goto snapshot_not_interesting;
1864
1865 /*
1866 * Consistent snapshots have no next phase. Reset next_phase_at as it is
1867 * possible that an old value may remain.
1868 */
1871
1872 /* ok, we think the snapshot is sensible, copy over everything important */
1873 builder->xmin = ondisk.builder.xmin;
1874 builder->xmax = ondisk.builder.xmax;
1875 builder->state = ondisk.builder.state;
1876
1877 builder->committed.xcnt = ondisk.builder.committed.xcnt;
1878 /* We only allocated/stored xcnt, not xcnt_space xids! */
1879 /* don't overwrite preallocated xip, if we don't have anything here */
1880 if (builder->committed.xcnt > 0)
1881 {
1882 pfree(builder->committed.xip);
1883 builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1884 builder->committed.xip = ondisk.builder.committed.xip;
1885 }
1886 ondisk.builder.committed.xip = NULL;
1887
1888 /* set catalog modifying transactions */
1889 if (builder->catchange.xip)
1890 pfree(builder->catchange.xip);
1891 builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
1892 builder->catchange.xip = ondisk.builder.catchange.xip;
1893 ondisk.builder.catchange.xip = NULL;
1894
1895 /* our snapshot is not interesting anymore, build a new one */
1896 if (builder->snapshot != NULL)
1897 {
1899 }
1900 builder->snapshot = SnapBuildBuildSnapshot(builder);
1902
1904
1905 Assert(builder->state == SNAPBUILD_CONSISTENT);
1906
1907 ereport(LOG,
1908 (errmsg("logical decoding found consistent point at %X/%X",
1909 LSN_FORMAT_ARGS(lsn)),
1910 errdetail("Logical decoding will begin using saved snapshot.")));
1911 return true;
1912
1913snapshot_not_interesting:
1914 if (ondisk.builder.committed.xip != NULL)
1915 pfree(ondisk.builder.committed.xip);
1916 if (ondisk.builder.catchange.xip != NULL)
1917 pfree(ondisk.builder.catchange.xip);
1918 return false;
1919}
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
bool SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn, MemoryContext context, bool missing_ok)
Definition: snapbuild.c:1734

References Assert(), SnapBuildOnDisk::builder, SnapBuild::catchange, SnapBuild::committed, SnapBuild::context, ereport, errdetail(), errmsg(), SnapBuild::initial_xmin_horizon, InvalidTransactionId, LOG, LSN_FORMAT_ARGS, SnapBuild::next_phase_at, pfree(), SnapBuild::reorder, ReorderBufferSetRestartPoint(), SNAPBUILD_CONSISTENT, SnapBuildBuildSnapshot(), SnapBuildRestoreSnapshot(), SnapBuildSnapDecRefcount(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, SnapBuild::state, TransactionIdPrecedes(), SnapBuild::xcnt, SnapBuild::xcnt_space, SnapBuild::xip, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildFindSnapshot(), and SnapBuildSerializationPoint().

◆ SnapBuildRestoreContents()

static void SnapBuildRestoreContents ( int  fd,
void *  dest,
Size  size,
const char *  path 
)
static

Definition at line 1925 of file snapbuild.c.

1926{
1927 int readBytes;
1928
1929 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1930 readBytes = read(fd, dest, size);
1932 if (readBytes != size)
1933 {
1934 int save_errno = errno;
1935
1937
1938 if (readBytes < 0)
1939 {
1940 errno = save_errno;
1941 ereport(ERROR,
1943 errmsg("could not read file \"%s\": %m", path)));
1944 }
1945 else
1946 ereport(ERROR,
1948 errmsg("could not read file \"%s\": read %d of %zu",
1949 path, readBytes, size)));
1950 }
1951}
int CloseTransientFile(int fd)
Definition: fd.c:2871
#define read(a, b, c)
Definition: win32.h:13
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
static int fd(const char *x, int i)
Definition: preproc-init.c:105
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101

References CloseTransientFile(), generate_unaccent_rules::dest, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), pgstat_report_wait_end(), pgstat_report_wait_start(), and read.

Referenced by SnapBuildRestoreSnapshot().

◆ SnapBuildRestoreSnapshot()

bool SnapBuildRestoreSnapshot ( SnapBuildOnDisk ondisk,
XLogRecPtr  lsn,
MemoryContext  context,
bool  missing_ok 
)

Definition at line 1734 of file snapbuild.c.

1736{
1737 int fd;
1738 pg_crc32c checksum;
1739 Size sz;
1740 char path[MAXPGPATH];
1741
1742 sprintf(path, "%s/%X-%X.snap",
1744 LSN_FORMAT_ARGS(lsn));
1745
1746 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1747
1748 if (fd < 0)
1749 {
1750 if (missing_ok && errno == ENOENT)
1751 return false;
1752
1753 ereport(ERROR,
1755 errmsg("could not open file \"%s\": %m", path)));
1756 }
1757
1758 /* ----
1759 * Make sure the snapshot had been stored safely to disk, that's normally
1760 * cheap.
1761 * Note that we do not need PANIC here, nobody will be able to use the
1762 * slot without fsyncing, and saving it won't succeed without an fsync()
1763 * either...
1764 * ----
1765 */
1766 fsync_fname(path, false);
1768
1769 /* read statically sized portion of snapshot */
1771
1772 if (ondisk->magic != SNAPBUILD_MAGIC)
1773 ereport(ERROR,
1775 errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1776 path, ondisk->magic, SNAPBUILD_MAGIC)));
1777
1778 if (ondisk->version != SNAPBUILD_VERSION)
1779 ereport(ERROR,
1781 errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1782 path, ondisk->version, SNAPBUILD_VERSION)));
1783
1784 INIT_CRC32C(checksum);
1785 COMP_CRC32C(checksum,
1786 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1788
1789 /* read SnapBuild */
1790 SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
1791 COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
1792
1793 /* restore committed xacts information */
1794 if (ondisk->builder.committed.xcnt > 0)
1795 {
1796 sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
1797 ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
1798 SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
1799 COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
1800 }
1801
1802 /* restore catalog modifying xacts information */
1803 if (ondisk->builder.catchange.xcnt > 0)
1804 {
1805 sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
1806 ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
1807 SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
1808 COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
1809 }
1810
1811 if (CloseTransientFile(fd) != 0)
1812 ereport(ERROR,
1814 errmsg("could not close file \"%s\": %m", path)));
1815
1816 FIN_CRC32C(checksum);
1817
1818 /* verify checksum of what we've read */
1819 if (!EQ_CRC32C(checksum, ondisk->checksum))
1820 ereport(ERROR,
1822 errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1823 path, checksum, ondisk->checksum)));
1824
1825 return true;
1826}
#define PG_BINARY
Definition: c.h:1244
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:756
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2694
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:153
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:158
#define sprintf
Definition: port.h:241
#define SNAPBUILD_VERSION
Definition: snapbuild.c:1467
#define SnapBuildOnDiskNotChecksummedSize
Definition: snapbuild.c:1463
#define SNAPBUILD_MAGIC
Definition: snapbuild.c:1466
#define SnapBuildOnDiskConstantSize
Definition: snapbuild.c:1461
static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
Definition: snapbuild.c:1925

References SnapBuildOnDisk::builder, SnapBuild::catchange, SnapBuildOnDisk::checksum, CloseTransientFile(), SnapBuild::committed, COMP_CRC32C, EQ_CRC32C, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), FIN_CRC32C, fsync_fname(), INIT_CRC32C, LSN_FORMAT_ARGS, SnapBuildOnDisk::magic, MAXPGPATH, MemoryContextAllocZero(), OpenTransientFile(), PG_BINARY, PG_LOGICAL_SNAPSHOTS_DIR, SNAPBUILD_MAGIC, SNAPBUILD_VERSION, SnapBuildOnDiskConstantSize, SnapBuildOnDiskNotChecksummedSize, SnapBuildRestoreContents(), sprintf, SnapBuildOnDisk::version, SnapBuild::xcnt, and SnapBuild::xip.

Referenced by pg_get_logical_snapshot_info(), pg_get_logical_snapshot_meta(), and SnapBuildRestore().

◆ SnapBuildSerializationPoint()

void SnapBuildSerializationPoint ( SnapBuild builder,
XLogRecPtr  lsn 
)

Definition at line 1476 of file snapbuild.c.

1477{
1478 if (builder->state < SNAPBUILD_CONSISTENT)
1479 SnapBuildRestore(builder, lsn);
1480 else
1481 SnapBuildSerialize(builder, lsn);
1482}

References SNAPBUILD_CONSISTENT, SnapBuildRestore(), SnapBuildSerialize(), and SnapBuild::state.

Referenced by xlog_decode().

◆ SnapBuildSerialize()

static void SnapBuildSerialize ( SnapBuild builder,
XLogRecPtr  lsn 
)
static

Definition at line 1489 of file snapbuild.c.

1490{
1491 Size needed_length;
1492 SnapBuildOnDisk *ondisk = NULL;
1493 TransactionId *catchange_xip = NULL;
1494 MemoryContext old_ctx;
1495 size_t catchange_xcnt;
1496 char *ondisk_c;
1497 int fd;
1498 char tmppath[MAXPGPATH];
1499 char path[MAXPGPATH];
1500 int ret;
1501 struct stat stat_buf;
1502 Size sz;
1503
1504 Assert(lsn != InvalidXLogRecPtr);
1506 builder->last_serialized_snapshot <= lsn);
1507
1508 /*
1509 * no point in serializing if we cannot continue to work immediately after
1510 * restoring the snapshot
1511 */
1512 if (builder->state < SNAPBUILD_CONSISTENT)
1513 return;
1514
1515 /* consistent snapshots have no next phase */
1517
1518 /*
1519 * We identify snapshots by the LSN they are valid for. We don't need to
1520 * include timelines in the name as each LSN maps to exactly one timeline
1521 * unless the user used pg_resetwal or similar. If a user did so, there's
1522 * no hope continuing to decode anyway.
1523 */
1524 sprintf(path, "%s/%X-%X.snap",
1526 LSN_FORMAT_ARGS(lsn));
1527
1528 /*
1529 * first check whether some other backend already has written the snapshot
1530 * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1531 * as a valid state. Everything else is an unexpected error.
1532 */
1533 ret = stat(path, &stat_buf);
1534
1535 if (ret != 0 && errno != ENOENT)
1536 ereport(ERROR,
1538 errmsg("could not stat file \"%s\": %m", path)));
1539
1540 else if (ret == 0)
1541 {
1542 /*
1543 * somebody else has already serialized to this point, don't overwrite
1544 * but remember location, so we don't need to read old data again.
1545 *
1546 * To be sure it has been synced to disk after the rename() from the
1547 * tempfile filename to the real filename, we just repeat the fsync.
1548 * That ought to be cheap because in most scenarios it should already
1549 * be safely on disk.
1550 */
1551 fsync_fname(path, false);
1553
1554 builder->last_serialized_snapshot = lsn;
1555 goto out;
1556 }
1557
1558 /*
1559 * there is an obvious race condition here between the time we stat(2) the
1560 * file and us writing the file. But we rename the file into place
1561 * atomically and all files created need to contain the same data anyway,
1562 * so this is perfectly fine, although a bit of a resource waste. Locking
1563 * seems like pointless complication.
1564 */
1565 elog(DEBUG1, "serializing snapshot to %s", path);
1566
1567 /* to make sure only we will write to this tempfile, include pid */
1568 sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
1571
1572 /*
1573 * Unlink temporary file if it already exists, needs to have been before a
1574 * crash/error since we won't enter this function twice from within a
1575 * single decoding slot/backend and the temporary file contains the pid of
1576 * the current process.
1577 */
1578 if (unlink(tmppath) != 0 && errno != ENOENT)
1579 ereport(ERROR,
1581 errmsg("could not remove file \"%s\": %m", tmppath)));
1582
1583 old_ctx = MemoryContextSwitchTo(builder->context);
1584
1585 /* Get the catalog modifying transactions that are yet not committed */
1586 catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
1587 catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
1588
1589 needed_length = sizeof(SnapBuildOnDisk) +
1590 sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
1591
1592 ondisk_c = palloc0(needed_length);
1593 ondisk = (SnapBuildOnDisk *) ondisk_c;
1594 ondisk->magic = SNAPBUILD_MAGIC;
1595 ondisk->version = SNAPBUILD_VERSION;
1596 ondisk->length = needed_length;
1597 INIT_CRC32C(ondisk->checksum);
1598 COMP_CRC32C(ondisk->checksum,
1599 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1601 ondisk_c += sizeof(SnapBuildOnDisk);
1602
1603 memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1604 /* NULL-ify memory-only data */
1605 ondisk->builder.context = NULL;
1606 ondisk->builder.snapshot = NULL;
1607 ondisk->builder.reorder = NULL;
1608 ondisk->builder.committed.xip = NULL;
1609 ondisk->builder.catchange.xip = NULL;
1610 /* update catchange only on disk data */
1611 ondisk->builder.catchange.xcnt = catchange_xcnt;
1612
1613 COMP_CRC32C(ondisk->checksum,
1614 &ondisk->builder,
1615 sizeof(SnapBuild));
1616
1617 /* copy committed xacts */
1618 if (builder->committed.xcnt > 0)
1619 {
1620 sz = sizeof(TransactionId) * builder->committed.xcnt;
1621 memcpy(ondisk_c, builder->committed.xip, sz);
1622 COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1623 ondisk_c += sz;
1624 }
1625
1626 /* copy catalog modifying xacts */
1627 if (catchange_xcnt > 0)
1628 {
1629 sz = sizeof(TransactionId) * catchange_xcnt;
1630 memcpy(ondisk_c, catchange_xip, sz);
1631 COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1632 ondisk_c += sz;
1633 }
1634
1635 FIN_CRC32C(ondisk->checksum);
1636
1637 /* we have valid data now, open tempfile and write it there */
1638 fd = OpenTransientFile(tmppath,
1639 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1640 if (fd < 0)
1641 ereport(ERROR,
1643 errmsg("could not open file \"%s\": %m", tmppath)));
1644
1645 errno = 0;
1646 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
1647 if ((write(fd, ondisk, needed_length)) != needed_length)
1648 {
1649 int save_errno = errno;
1650
1652
1653 /* if write didn't set errno, assume problem is no disk space */
1654 errno = save_errno ? save_errno : ENOSPC;
1655 ereport(ERROR,
1657 errmsg("could not write to file \"%s\": %m", tmppath)));
1658 }
1660
1661 /*
1662 * fsync the file before renaming so that even if we crash after this we
1663 * have either a fully valid file or nothing.
1664 *
1665 * It's safe to just ERROR on fsync() here because we'll retry the whole
1666 * operation including the writes.
1667 *
1668 * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1669 * some noticeable overhead since it's performed synchronously during
1670 * decoding?
1671 */
1672 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
1673 if (pg_fsync(fd) != 0)
1674 {
1675 int save_errno = errno;
1676
1678 errno = save_errno;
1679 ereport(ERROR,
1681 errmsg("could not fsync file \"%s\": %m", tmppath)));
1682 }
1684
1685 if (CloseTransientFile(fd) != 0)
1686 ereport(ERROR,
1688 errmsg("could not close file \"%s\": %m", tmppath)));
1689
1691
1692 /*
1693 * We may overwrite the work from some other backend, but that's ok, our
1694 * snapshot is valid as well, we'll just have done some superfluous work.
1695 */
1696 if (rename(tmppath, path) != 0)
1697 {
1698 ereport(ERROR,
1700 errmsg("could not rename file \"%s\" to \"%s\": %m",
1701 tmppath, path)));
1702 }
1703
1704 /* make sure we persist */
1705 fsync_fname(path, false);
1707
1708 /*
1709 * Now there's no way we can lose the dumped state anymore, remember this
1710 * as a serialization point.
1711 */
1712 builder->last_serialized_snapshot = lsn;
1713
1714 MemoryContextSwitchTo(old_ctx);
1715
1716out:
1718 builder->last_serialized_snapshot);
1719 /* be tidy */
1720 if (ondisk)
1721 pfree(ondisk);
1722 if (catchange_xip)
1723 pfree(catchange_xip);
1724}
int pg_fsync(int fd)
Definition: fd.c:386
int MyProcPid
Definition: globals.c:48
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define write(a, b, c)
Definition: win32.h:14
TransactionId * ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
struct SnapBuildOnDisk SnapBuildOnDisk
dclist_head catchange_txns
#define stat
Definition: win32_port.h:274

References Assert(), SnapBuildOnDisk::builder, SnapBuild::catchange, ReorderBuffer::catchange_txns, SnapBuildOnDisk::checksum, CloseTransientFile(), SnapBuild::committed, COMP_CRC32C, SnapBuild::context, dclist_count(), DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FIN_CRC32C, fsync_fname(), INIT_CRC32C, InvalidTransactionId, InvalidXLogRecPtr, SnapBuild::last_serialized_snapshot, SnapBuildOnDisk::length, LSN_FORMAT_ARGS, SnapBuildOnDisk::magic, MAXPGPATH, MemoryContextSwitchTo(), MyProcPid, SnapBuild::next_phase_at, OpenTransientFile(), palloc0(), pfree(), PG_BINARY, pg_fsync(), PG_LOGICAL_SNAPSHOTS_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), SnapBuild::reorder, ReorderBufferGetCatalogChangesXacts(), ReorderBufferSetRestartPoint(), SNAPBUILD_CONSISTENT, SNAPBUILD_MAGIC, SNAPBUILD_VERSION, SnapBuildOnDiskConstantSize, SnapBuildOnDiskNotChecksummedSize, SnapBuild::snapshot, sprintf, stat, SnapBuild::state, SnapBuildOnDisk::version, write, SnapBuild::xcnt, and SnapBuild::xip.

Referenced by SnapBuildProcessRunningXacts(), and SnapBuildSerializationPoint().

◆ SnapBuildSetTwoPhaseAt()

void SnapBuildSetTwoPhaseAt ( SnapBuild builder,
XLogRecPtr  ptr 
)

Definition at line 295 of file snapbuild.c.

296{
297 builder->two_phase_at = ptr;
298}

References SnapBuild::two_phase_at.

Referenced by CreateDecodingContext().

◆ SnapBuildSnapDecRefcount()

void SnapBuildSnapDecRefcount ( Snapshot  snap)

Definition at line 328 of file snapbuild.c.

329{
330 /* make sure we don't get passed an external snapshot */
332
333 /* make sure nobody modified our snapshot */
334 Assert(snap->curcid == FirstCommandId);
335 Assert(!snap->suboverflowed);
337
338 Assert(snap->regd_count == 0);
339
340 Assert(snap->active_count > 0);
341
342 /* slightly more likely, so it's checked even without casserts */
343 if (snap->copied)
344 elog(ERROR, "cannot free a copied snapshot");
345
346 snap->active_count--;
347 if (snap->active_count == 0)
349}
static void SnapBuildFreeSnapshot(Snapshot snap)
Definition: snapbuild.c:252

References SnapshotData::active_count, Assert(), SnapshotData::copied, SnapshotData::curcid, elog, ERROR, FirstCommandId, SnapshotData::regd_count, SnapBuildFreeSnapshot(), SNAPSHOT_HISTORIC_MVCC, SnapshotData::snapshot_type, SnapshotData::suboverflowed, and SnapshotData::takenDuringRecovery.

Referenced by FreeSnapshotBuilder(), ReorderBufferCleanupTXN(), ReorderBufferFreeSnap(), ReorderBufferTransferSnapToParent(), SnapBuildCommitTxn(), and SnapBuildRestore().

◆ SnapBuildSnapIncRefcount()

static void SnapBuildSnapIncRefcount ( Snapshot  snap)
static

◆ SnapBuildSnapshotExists()

bool SnapBuildSnapshotExists ( XLogRecPtr  lsn)

Definition at line 2050 of file snapbuild.c.

2051{
2052 char path[MAXPGPATH];
2053 int ret;
2054 struct stat stat_buf;
2055
2056 sprintf(path, "%s/%X-%X.snap",
2058 LSN_FORMAT_ARGS(lsn));
2059
2060 ret = stat(path, &stat_buf);
2061
2062 if (ret != 0 && errno != ENOENT)
2063 ereport(ERROR,
2065 errmsg("could not stat file \"%s\": %m", path)));
2066
2067 return ret == 0;
2068}

References ereport, errcode_for_file_access(), errmsg(), ERROR, LSN_FORMAT_ARGS, MAXPGPATH, PG_LOGICAL_SNAPSHOTS_DIR, sprintf, and stat.

Referenced by update_local_synced_slot().

◆ SnapBuildWaitSnapshot()

static void SnapBuildWaitSnapshot ( xl_running_xacts running,
TransactionId  cutoff 
)
static

Definition at line 1427 of file snapbuild.c.

1428{
1429 int off;
1430
1431 for (off = 0; off < running->xcnt; off++)
1432 {
1433 TransactionId xid = running->xids[off];
1434
1435 /*
1436 * Upper layers should prevent that we ever need to wait on ourselves.
1437 * Check anyway, since failing to do so would either result in an
1438 * endless wait or an Assert() failure.
1439 */
1441 elog(ERROR, "waiting for ourselves");
1442
1443 if (TransactionIdFollows(xid, cutoff))
1444 continue;
1445
1446 XactLockTableWait(xid, NULL, NULL, XLTW_None);
1447 }
1448
1449 /*
1450 * All transactions we needed to finish finished - try to ensure there is
1451 * another xl_running_xacts record in a timely manner, without having to
1452 * wait for bgwriter or checkpointer to log one. During recovery we can't
1453 * enforce that, so we'll have to wait.
1454 */
1455 if (!RecoveryInProgress())
1456 {
1458 }
1459}
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:663
@ XLTW_None
Definition: lmgr.h:26
XLogRecPtr LogStandbySnapshot(void)
Definition: standby.c:1282
TransactionId xids[FLEXIBLE_ARRAY_MEMBER]
Definition: standbydefs.h:56
bool TransactionIdIsCurrentTransactionId(TransactionId xid)
Definition: xact.c:941
bool RecoveryInProgress(void)
Definition: xlog.c:6522

References elog, ERROR, LogStandbySnapshot(), RecoveryInProgress(), TransactionIdFollows(), TransactionIdIsCurrentTransactionId(), XactLockTableWait(), xl_running_xacts::xcnt, xl_running_xacts::xids, and XLTW_None.

Referenced by SnapBuildFindSnapshot().

◆ SnapBuildXactNeedsSkip()

bool SnapBuildXactNeedsSkip ( SnapBuild builder,
XLogRecPtr  ptr 
)

Definition at line 304 of file snapbuild.c.

305{
306 return ptr < builder->start_decoding_at;
307}

References SnapBuild::start_decoding_at.

Referenced by AssertTXNLsnOrder(), DecodeTXNNeedSkip(), logicalmsg_decode(), and ReorderBufferCanStartStreaming().

◆ SnapBuildXidHasCatalogChanges()

static bool SnapBuildXidHasCatalogChanges ( SnapBuild builder,
TransactionId  xid,
uint32  xinfo 
)
inlinestatic

Definition at line 1098 of file snapbuild.c.

1100{
1101 if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1102 return true;
1103
1104 /*
1105 * The transactions that have changed catalogs must have invalidation
1106 * info.
1107 */
1108 if (!(xinfo & XACT_XINFO_HAS_INVALS))
1109 return false;
1110
1111 /* Check the catchange XID array */
1112 return ((builder->catchange.xcnt > 0) &&
1113 (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
1114 sizeof(TransactionId), xidComparator) != NULL));
1115}
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
#define XACT_XINFO_HAS_INVALS
Definition: xact.h:191

References SnapBuild::catchange, SnapBuild::reorder, ReorderBufferXidHasCatalogChanges(), XACT_XINFO_HAS_INVALS, SnapBuild::xcnt, xidComparator(), and SnapBuild::xip.

Referenced by SnapBuildCommitTxn().

Variable Documentation

◆ ExportInProgress

bool ExportInProgress = false
static

◆ SavedResourceOwnerDuringExport

ResourceOwner SavedResourceOwnerDuringExport = NULL
static