PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
snapbuild.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "miscadmin.h"
#include "access/heapam_xlog.h"
#include "access/transam.h"
#include "access/xact.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapshot.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
#include "storage/block.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/standby.h"
Include dependency graph for snapbuild.c:

Go to the source code of this file.

Data Structures

struct  SnapBuild
 
struct  SnapBuildOnDisk
 

Macros

#define SnapBuildOnDiskConstantSize   offsetof(SnapBuildOnDisk, builder)
 
#define SnapBuildOnDiskNotChecksummedSize   offsetof(SnapBuildOnDisk, version)
 
#define SNAPBUILD_MAGIC   0x51A1E001
 
#define SNAPBUILD_VERSION   2
 

Typedefs

typedef struct SnapBuildOnDisk SnapBuildOnDisk
 

Functions

static void SnapBuildEndTxn (SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 
static bool SnapBuildTxnIsRunning (SnapBuild *builder, TransactionId xid)
 
static void SnapBuildPurgeCommittedTxn (SnapBuild *builder)
 
static Snapshot SnapBuildBuildSnapshot (SnapBuild *builder, TransactionId xid)
 
static void SnapBuildFreeSnapshot (Snapshot snap)
 
static void SnapBuildSnapIncRefcount (Snapshot snap)
 
static void SnapBuildDistributeNewCatalogSnapshot (SnapBuild *builder, XLogRecPtr lsn)
 
static bool SnapBuildFindSnapshot (SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
 
static void SnapBuildSerialize (SnapBuild *builder, XLogRecPtr lsn)
 
static bool SnapBuildRestore (SnapBuild *builder, XLogRecPtr lsn)
 
SnapBuild * AllocateSnapshotBuilder (ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn)
 
void FreeSnapshotBuilder (SnapBuild *builder)
 
SnapBuildState SnapBuildCurrentState (SnapBuild *builder)
 
bool SnapBuildXactNeedsSkip (SnapBuild *builder, XLogRecPtr ptr)
 
void SnapBuildSnapDecRefcount (Snapshot snap)
 
Snapshot SnapBuildInitialSnapshot (SnapBuild *builder)
 
const char * SnapBuildExportSnapshot (SnapBuild *builder)
 
Snapshot SnapBuildGetOrBuildSnapshot (SnapBuild *builder, TransactionId xid)
 
void SnapBuildClearExportedSnapshot (void)
 
bool SnapBuildProcessChange (SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 
void SnapBuildProcessNewCid (SnapBuild *builder, TransactionId xid, XLogRecPtr lsn, xl_heap_new_cid *xlrec)
 
static void SnapBuildAddCommittedTxn (SnapBuild *builder, TransactionId xid)
 
void SnapBuildAbortTxn (SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, TransactionId *subxacts)
 
void SnapBuildCommitTxn (SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, TransactionId *subxacts)
 
void SnapBuildProcessRunningXacts (SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
 
void SnapBuildSerializationPoint (SnapBuild *builder, XLogRecPtr lsn)
 
void CheckPointSnapBuild (void)
 

Variables

static ResourceOwner SavedResourceOwnerDuringExport = NULL
 
static bool ExportInProgress = false
 

Macro Definition Documentation

#define SNAPBUILD_MAGIC   0x51A1E001

Definition at line 1445 of file snapbuild.c.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

#define SNAPBUILD_VERSION   2

Definition at line 1446 of file snapbuild.c.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

#define SnapBuildOnDiskConstantSize   offsetof(SnapBuildOnDisk, builder)

Definition at line 1440 of file snapbuild.c.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

#define SnapBuildOnDiskNotChecksummedSize   offsetof(SnapBuildOnDisk, version)

Definition at line 1442 of file snapbuild.c.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

Typedef Documentation

Function Documentation

SnapBuild* AllocateSnapshotBuilder ( ReorderBuffer reorder,
TransactionId  xmin_horizon,
XLogRecPtr  start_lsn 
)

Definition at line 282 of file snapbuild.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), SnapBuild::committed, SnapBuild::context, CurrentMemoryContext, SnapBuild::includes_all_transactions, SnapBuild::initial_xmin_horizon, MemoryContextSwitchTo(), palloc0(), SnapBuild::reorder, SNAPBUILD_START, SnapBuild::start_decoding_at, SnapBuild::state, SnapBuild::xcnt, SnapBuild::xcnt_space, and SnapBuild::xip.

Referenced by StartupDecodingContext().

285 {
286  MemoryContext context;
287  MemoryContext oldcontext;
288  SnapBuild *builder;
289 
290  /* allocate memory in own context, to have better accountability */
292  "snapshot builder context",
294  oldcontext = MemoryContextSwitchTo(context);
295 
296  builder = palloc0(sizeof(SnapBuild));
297 
298  builder->state = SNAPBUILD_START;
299  builder->context = context;
300  builder->reorder = reorder;
301  /* Other struct members initialized by zeroing via palloc0 above */
302 
303  builder->committed.xcnt = 0;
304  builder->committed.xcnt_space = 128; /* arbitrary number */
305  builder->committed.xip =
306  palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
307  builder->committed.includes_all_transactions = true;
308 
309  builder->initial_xmin_horizon = xmin_horizon;
310  builder->start_decoding_at = start_lsn;
311 
312  MemoryContextSwitchTo(oldcontext);
313 
314  return builder;
315 }
uint32 TransactionId
Definition: c.h:397
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
size_t xcnt_space
Definition: snapbuild.c:201
size_t xcnt
Definition: snapbuild.c:200
SnapBuildState state
Definition: snapbuild.c:145
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
ReorderBuffer * reorder
Definition: snapbuild.c:181
TransactionId initial_xmin_horizon
Definition: snapbuild.c:166
TransactionId * xip
Definition: snapbuild.c:202
bool includes_all_transactions
Definition: snapbuild.c:222
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void * palloc0(Size size)
Definition: mcxt.c:878
MemoryContext context
Definition: snapbuild.c:148
struct SnapBuild::@23 committed
XLogRecPtr start_decoding_at
Definition: snapbuild.c:160
void CheckPointSnapBuild ( void  )

Definition at line 1865 of file snapbuild.c.

References AllocateDir(), dirent::d_name, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), FreeDir(), GetRedoRecPtr(), InvalidXLogRecPtr, LOG, lstat, MAXPGPATH, NULL, ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), snprintf(), and unlink().

Referenced by CheckPointGuts().

1866 {
1867  XLogRecPtr cutoff;
1868  XLogRecPtr redo;
1869  DIR *snap_dir;
1870  struct dirent *snap_de;
1871  char path[MAXPGPATH];
1872 
1873  /*
1874  * We start off with a minimum of the last redo pointer. No new replication
1875  * slot will start before that, so that's a safe upper bound for removal.
1876  */
1877  redo = GetRedoRecPtr();
1878 
1879  /* now check for the restart ptrs from existing slots */
1881 
1882  /* don't start earlier than the restart lsn */
1883  if (redo < cutoff)
1884  cutoff = redo;
1885 
1886  snap_dir = AllocateDir("pg_logical/snapshots");
1887  while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
1888  {
1889  uint32 hi;
1890  uint32 lo;
1891  XLogRecPtr lsn;
1892  struct stat statbuf;
1893 
1894  if (strcmp(snap_de->d_name, ".") == 0 ||
1895  strcmp(snap_de->d_name, "..") == 0)
1896  continue;
1897 
1898  snprintf(path, MAXPGPATH, "pg_logical/snapshots/%s", snap_de->d_name);
1899 
1900  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1901  {
1902  elog(DEBUG1, "only regular files expected: %s", path);
1903  continue;
1904  }
1905 
1906  /*
1907  * temporary filenames from SnapBuildSerialize() include the LSN and
1908  * everything but are postfixed by .$pid.tmp. We can just remove them
1909  * the same as other files because there can be none that are
1910  * currently being written that are older than cutoff.
1911  *
1912  * We just log a message if a file doesn't fit the pattern, it's
1913  * probably some editor's lock/state file or similar...
1914  */
1915  if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
1916  {
1917  ereport(LOG,
1918  (errmsg("could not parse file name \"%s\"", path)));
1919  continue;
1920  }
1921 
1922  lsn = ((uint64) hi) << 32 | lo;
1923 
1924  /* check whether we still need it */
1925  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1926  {
1927  elog(DEBUG1, "removing snapbuild snapshot %s", path);
1928 
1929  /*
1930  * It's not particularly harmful, though strange, if we can't
1931  * remove the file here. Don't prevent the checkpoint from
1932  * completing, that'd be a cure worse than the disease.
1933  */
1934  if (unlink(path) < 0)
1935  {
1936  ereport(LOG,
1938  errmsg("could not remove file \"%s\": %m",
1939  path)));
1940  continue;
1941  }
1942  }
1943  }
1944  FreeDir(snap_dir);
1945 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define LOG
Definition: elog.h:26
Definition: dirent.h:9
Definition: dirent.c:25
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:598
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:708
unsigned int uint32
Definition: c.h:268
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2335
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2401
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8150
int errmsg(const char *fmt,...)
Definition: elog.c:797
char d_name[MAX_PATH]
Definition: dirent.h:14
#define elog
Definition: elog.h:219
#define lstat(path, sb)
Definition: win32.h:272
int FreeDir(DIR *dir)
Definition: fd.c:2444
void FreeSnapshotBuilder ( SnapBuild builder)

Definition at line 321 of file snapbuild.c.

References SnapBuild::context, MemoryContextDelete(), NULL, SnapBuildSnapDecRefcount(), and SnapBuild::snapshot.

Referenced by FreeDecodingContext().

322 {
323  MemoryContext context = builder->context;
324 
325  /* free snapshot explicitly, that contains some error checking */
326  if (builder->snapshot != NULL)
327  {
329  builder->snapshot = NULL;
330  }
331 
332  /* other resources are deallocated via memory context reset */
333  MemoryContextDelete(context);
334 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
Snapshot snapshot
Definition: snapbuild.c:171
#define NULL
Definition: c.h:229
MemoryContext context
Definition: snapbuild.c:148
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:398
void SnapBuildAbortTxn ( SnapBuild builder,
XLogRecPtr  lsn,
TransactionId  xid,
int  nsubxacts,
TransactionId subxacts 
)

Definition at line 948 of file snapbuild.c.

References i, and SnapBuildEndTxn().

Referenced by DecodeAbort().

951 {
952  int i;
953 
954  for (i = 0; i < nsubxacts; i++)
955  {
956  TransactionId subxid = subxacts[i];
957 
958  SnapBuildEndTxn(builder, lsn, subxid);
959  }
960 
961  SnapBuildEndTxn(builder, lsn, xid);
962 }
uint32 TransactionId
Definition: c.h:397
static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
Definition: snapbuild.c:912
int i
static void SnapBuildAddCommittedTxn ( SnapBuild builder,
TransactionId  xid 
)
static

Definition at line 841 of file snapbuild.c.

References Assert, SnapBuild::committed, DEBUG1, elog, repalloc(), TransactionIdIsValid, SnapBuild::xcnt, SnapBuild::xcnt_space, and SnapBuild::xip.

Referenced by SnapBuildCommitTxn().

842 {
844 
845  if (builder->committed.xcnt == builder->committed.xcnt_space)
846  {
847  builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
848 
849  elog(DEBUG1, "increasing space for committed transactions to %u",
850  (uint32) builder->committed.xcnt_space);
851 
852  builder->committed.xip = repalloc(builder->committed.xip,
853  builder->committed.xcnt_space * sizeof(TransactionId));
854  }
855 
856  /*
857  * TODO: It might make sense to keep the array sorted here instead of
858  * doing it every time we build a new snapshot. On the other hand this
859  * gets called repeatedly when a transaction with subtransactions commits.
860  */
861  builder->committed.xip[builder->committed.xcnt++] = xid;
862 }
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:397
size_t xcnt_space
Definition: snapbuild.c:201
size_t xcnt
Definition: snapbuild.c:200
TransactionId * xip
Definition: snapbuild.c:202
unsigned int uint32
Definition: c.h:268
#define Assert(condition)
Definition: c.h:675
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:963
struct SnapBuild::@23 committed
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static Snapshot SnapBuildBuildSnapshot ( SnapBuild builder,
TransactionId  xid 
)
static

Definition at line 430 of file snapbuild.c.

References SnapshotData::active_count, Assert, SnapBuild::committed, SnapBuild::context, SnapshotData::copied, SnapshotData::curcid, FirstCommandId, HeapTupleSatisfiesHistoricMVCC(), MemoryContextAllocZero(), NULL, qsort, SnapshotData::regd_count, SnapshotData::satisfies, SNAPBUILD_FULL_SNAPSHOT, SnapBuild::state, SnapshotData::suboverflowed, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::takenDuringRecovery, TransactionIdIsNormal, SnapshotData::xcnt, SnapBuild::xcnt, xidComparator(), SnapshotData::xip, SnapBuild::xip, SnapshotData::xmax, SnapBuild::xmax, SnapshotData::xmin, and SnapBuild::xmin.

Referenced by SnapBuildCommitTxn(), SnapBuildGetOrBuildSnapshot(), SnapBuildInitialSnapshot(), SnapBuildProcessChange(), and SnapBuildRestore().

431 {
432  Snapshot snapshot;
433  Size ssize;
434 
435  Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
436 
437  ssize = sizeof(SnapshotData)
438  + sizeof(TransactionId) * builder->committed.xcnt
439  + sizeof(TransactionId) * 1 /* toplevel xid */ ;
440 
441  snapshot = MemoryContextAllocZero(builder->context, ssize);
442 
444 
445  /*
446  * We misuse the original meaning of SnapshotData's xip and subxip fields
447  * to make them more fitting for our needs.
448  *
449  * In the 'xip' array we store transactions that have to be treated as
450  * committed. Since we will only ever look at tuples from transactions
451  * that have modified the catalog it's more efficient to store those few
452  * that exist between xmin and xmax (frequently there are none).
453  *
454  * Snapshots that are used in transactions that have modified the catalog
455  * also use the 'subxip' array to store their toplevel xid and all the
456  * subtransaction xids so we can recognize when we need to treat rows as
457  * visible that are not in xip but still need to be visible. Subxip only
458  * gets filled when the transaction is copied into the context of a
459  * catalog modifying transaction since we otherwise share a snapshot
460  * between transactions. As long as a txn hasn't modified the catalog it
461  * doesn't need to treat any uncommitted rows as visible, so there is no
462  * need for those xids.
463  *
464  * Both arrays are qsort'ed so that we can use bsearch() on them.
465  */
466  Assert(TransactionIdIsNormal(builder->xmin));
467  Assert(TransactionIdIsNormal(builder->xmax));
468 
469  snapshot->xmin = builder->xmin;
470  snapshot->xmax = builder->xmax;
471 
472  /* store all transactions to be treated as committed by this snapshot */
473  snapshot->xip =
474  (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
475  snapshot->xcnt = builder->committed.xcnt;
476  memcpy(snapshot->xip,
477  builder->committed.xip,
478  builder->committed.xcnt * sizeof(TransactionId));
479 
480  /* sort so we can bsearch() */
481  qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
482 
483  /*
484  * Initially, subxip is empty, i.e. it's a snapshot to be used by
485  * transactions that don't modify the catalog. Will be filled by
486  * ReorderBufferCopySnap() if necessary.
487  */
488  snapshot->subxcnt = 0;
489  snapshot->subxip = NULL;
490 
491  snapshot->suboverflowed = false;
492  snapshot->takenDuringRecovery = false;
493  snapshot->copied = false;
494  snapshot->curcid = FirstCommandId;
495  snapshot->active_count = 0;
496  snapshot->regd_count = 0;
497 
498  return snapshot;
499 }
SnapshotSatisfiesFunc satisfies
Definition: snapshot.h:54
uint32 TransactionId
Definition: c.h:397
bool copied
Definition: snapshot.h:94
bool suboverflowed
Definition: snapshot.h:91
size_t xcnt
Definition: snapbuild.c:200
uint32 regd_count
Definition: snapshot.h:108
#define FirstCommandId
Definition: c.h:413
SnapBuildState state
Definition: snapbuild.c:145
TransactionId * xip
Definition: snapbuild.c:202
struct SnapshotData SnapshotData
TransactionId xmax
Definition: snapshot.h:67
TransactionId xmin
Definition: snapshot.h:66
TransactionId * xip
Definition: snapshot.h:77
TransactionId xmax
Definition: snapbuild.c:154
bool HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, Buffer buffer)
Definition: tqual.c:1652
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:742
CommandId curcid
Definition: snapshot.h:96
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
bool takenDuringRecovery
Definition: snapshot.h:93
size_t Size
Definition: c.h:356
MemoryContext context
Definition: snapbuild.c:148
uint32 xcnt
Definition: snapshot.h:78
TransactionId xmin
Definition: snapbuild.c:151
struct SnapBuild::@23 committed
#define qsort(a, b, c, d)
Definition: port.h:440
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
TransactionId * subxip
Definition: snapshot.h:89
uint32 active_count
Definition: snapshot.h:107
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:90
void SnapBuildClearExportedSnapshot ( void  )

Definition at line 648 of file snapbuild.c.

References AbortCurrentTransaction(), CurrentResourceOwner, elog, ERROR, ExportInProgress, IsTransactionState(), NULL, and SavedResourceOwnerDuringExport.

Referenced by exec_replication_command().

649 {
650  /* nothing exported, that is the usual case */
651  if (!ExportInProgress)
652  return;
653 
654  if (!IsTransactionState())
655  elog(ERROR, "clearing exported snapshot in wrong transaction state");
656 
657  /* make sure nothing could have ever happened */
659 
662  ExportInProgress = false;
663 }
void AbortCurrentTransaction(void)
Definition: xact.c:2986
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
static ResourceOwner SavedResourceOwnerDuringExport
Definition: snapbuild.c:246
#define ERROR
Definition: elog.h:43
static bool ExportInProgress
Definition: snapbuild.c:247
#define NULL
Definition: c.h:229
bool IsTransactionState(void)
Definition: xact.c:350
#define elog
Definition: elog.h:219
void SnapBuildCommitTxn ( SnapBuild builder,
XLogRecPtr  lsn,
TransactionId  xid,
int  nsubxacts,
TransactionId subxacts 
)

Definition at line 968 of file snapbuild.c.

References SnapBuild::committed, DEBUG1, DEBUG2, elog, SnapBuild::includes_all_transactions, NormalTransactionIdFollows, SnapBuild::reorder, ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), SNAPBUILD_CONSISTENT, SNAPBUILD_FULL_SNAPSHOT, SnapBuildAddCommittedTxn(), SnapBuildBuildSnapshot(), SnapBuildDistributeNewCatalogSnapshot(), SnapBuildEndTxn(), SnapBuildSnapDecRefcount(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, SnapBuild::start_decoding_at, SnapBuild::state, TransactionIdAdvance, TransactionIdFollowsOrEquals(), TransactionIdIsValid, and SnapBuild::xmax.

Referenced by DecodeCommit().

970 {
971  int nxact;
972 
973  bool forced_timetravel = false;
974  bool sub_needs_timetravel = false;
975  bool top_needs_timetravel = false;
976 
977  TransactionId xmax = xid;
978 
979  /*
980  * If we couldn't observe every change of a transaction because it was
981  * already running at the point we started to observe we have to assume it
982  * made catalog changes.
983  *
984  * This has the positive benefit that we afterwards have enough
985  * information to build an exportable snapshot that's usable by pg_dump et
986  * al.
987  */
988  if (builder->state < SNAPBUILD_CONSISTENT)
989  {
990  /* ensure that only commits after this are getting replayed */
991  if (builder->start_decoding_at <= lsn)
992  builder->start_decoding_at = lsn + 1;
993 
994  /*
995  * We could avoid treating !SnapBuildTxnIsRunning transactions as
996  * timetravel ones, but we want to be able to export a snapshot when
997  * we reached consistency.
998  */
999  forced_timetravel = true;
1000  elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running too early", xid);
1001  }
1002 
1003  for (nxact = 0; nxact < nsubxacts; nxact++)
1004  {
1005  TransactionId subxid = subxacts[nxact];
1006 
1007  /*
1008  * make sure txn is not tracked in running txn's anymore, switch state
1009  */
1010  SnapBuildEndTxn(builder, lsn, subxid);
1011 
1012  /*
1013  * If we're forcing timetravel we also need visibility information
1014  * about subtransaction, so keep track of subtransaction's state.
1015  */
1016  if (forced_timetravel)
1017  {
1018  SnapBuildAddCommittedTxn(builder, subxid);
1019  if (NormalTransactionIdFollows(subxid, xmax))
1020  xmax = subxid;
1021  }
1022 
1023  /*
1024  * Add subtransaction to base snapshot if it did DDL; we don't distinguish
1025  * it from toplevel transactions there.
1026  */
1027  else if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
1028  {
1029  sub_needs_timetravel = true;
1030 
1031  elog(DEBUG1, "found subtransaction %u:%u with catalog changes.",
1032  xid, subxid);
1033 
1034  SnapBuildAddCommittedTxn(builder, subxid);
1035 
1036  if (NormalTransactionIdFollows(subxid, xmax))
1037  xmax = subxid;
1038  }
1039  }
1040 
1041  /*
1042  * Make sure toplevel txn is not tracked in running txn's anymore, switch
1043  * state to consistent if possible.
1044  */
1045  SnapBuildEndTxn(builder, lsn, xid);
1046 
1047  if (forced_timetravel)
1048  {
1049  elog(DEBUG2, "forced transaction %u to do timetravel.", xid);
1050 
1051  SnapBuildAddCommittedTxn(builder, xid);
1052  }
1053  /* add toplevel transaction to base snapshot */
1054  else if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1055  {
1056  elog(DEBUG2, "found top level transaction %u, with catalog changes!",
1057  xid);
1058 
1059  top_needs_timetravel = true;
1060  SnapBuildAddCommittedTxn(builder, xid);
1061  }
1062  else if (sub_needs_timetravel)
1063  {
1064  /* mark toplevel txn as timetravel as well */
1065  SnapBuildAddCommittedTxn(builder, xid);
1066  }
1067 
1068  /* if there's any reason to build a historic snapshot, do so now */
1069  if (forced_timetravel || top_needs_timetravel || sub_needs_timetravel)
1070  {
1071  /*
1072  * Adjust xmax of the snapshot builder, we only do that for committed,
1073  * catalog modifying, transactions, everything else isn't interesting
1074  * for us since we'll never look at the respective rows.
1075  */
1076  if (!TransactionIdIsValid(builder->xmax) ||
1077  TransactionIdFollowsOrEquals(xmax, builder->xmax))
1078  {
1079  builder->xmax = xmax;
1080  TransactionIdAdvance(builder->xmax);
1081  }
1082 
1083  /*
1084  * If we haven't built a complete snapshot yet there's no need to hand
1085  * it out, it wouldn't (and couldn't) be used anyway.
1086  */
1087  if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1088  return;
1089 
1090  /*
1091  * Decrease the snapshot builder's refcount of the old snapshot, note
1092  * that it still will be used if it has been handed out to the
1093  * reorderbuffer earlier.
1094  */
1095  if (builder->snapshot)
1097 
1098  builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
1099 
1100  /* we might need to execute invalidations, add snapshot */
1101  if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1102  {
1104  ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1105  builder->snapshot);
1106  }
1107 
1108  /* refcount of the snapshot builder for the new snapshot */
1110 
1111  /* add a new Snapshot to all currently running transactions */
1113  }
1114  else
1115  {
1116  /* record that we cannot export a general snapshot anymore */
1117  builder->committed.includes_all_transactions = false;
1118  }
1119 }
#define TransactionIdAdvance(dest)
Definition: transam.h:48
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
#define DEBUG1
Definition: elog.h:25
static void SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:841
uint32 TransactionId
Definition: c.h:397
Snapshot snapshot
Definition: snapbuild.c:171
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:349
static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
Definition: snapbuild.c:912
SnapBuildState state
Definition: snapbuild.c:145
static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:795
#define DEBUG2
Definition: elog.h:24
ReorderBuffer * reorder
Definition: snapbuild.c:181
bool includes_all_transactions
Definition: snapbuild.c:222
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:430
TransactionId xmax
Definition: snapbuild.c:154
#define NormalTransactionIdFollows(id1, id2)
Definition: transam.h:67
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:398
struct SnapBuild::@23 committed
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition: snapbuild.c:386
XLogRecPtr start_decoding_at
Definition: snapbuild.c:160
SnapBuildState SnapBuildCurrentState ( SnapBuild builder)

Definition at line 365 of file snapbuild.c.

References SnapBuild::state.

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeXactOp(), and DecodingContextReady().

366 {
367  return builder->state;
368 }
SnapBuildState state
Definition: snapbuild.c:145
static void SnapBuildDistributeNewCatalogSnapshot ( SnapBuild builder,
XLogRecPtr  lsn 
)
static

Definition at line 795 of file snapbuild.c.

References Assert, dlist_iter::cur, DEBUG2, dlist_container, dlist_foreach, elog, SnapBuild::reorder, ReorderBufferAddSnapshot(), ReorderBufferXidHasBaseSnapshot(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, and ReorderBufferTXN::xid.

Referenced by SnapBuildCommitTxn().

796 {
797  dlist_iter txn_i;
798  ReorderBufferTXN *txn;
799 
800  /*
801  * Iterate through all toplevel transactions. This can include
802  * subtransactions which we just don't yet know to be that, but that's
803  * fine, they will just get an unnecessary snapshot queued.
804  */
805  dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
806  {
807  txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
808 
810 
811  /*
812  * If we don't have a base snapshot yet, there are no changes in this
813  * transaction which in turn implies we don't yet need a snapshot at
814  * all. We'll add a snapshot when the first change gets queued.
815  *
816  * NB: This works correctly even for subtransactions because
817  * ReorderBufferCommitChild() takes care to pass the parent the base
818  * snapshot, and while iterating the changequeue we'll get the change
819  * from the subtxn.
820  */
821  if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
822  continue;
823 
824  elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
825  txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
826 
827  /*
828  * increase the snapshot's refcount for the transaction we are handing
829  * it out to
830  */
832  ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
833  builder->snapshot);
834  }
835 }
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
Snapshot snapshot
Definition: snapbuild.c:171
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define DEBUG2
Definition: elog.h:24
ReorderBuffer * reorder
Definition: snapbuild.c:181
unsigned int uint32
Definition: c.h:268
dlist_head toplevel_by_lsn
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
#define Assert(condition)
Definition: c.h:675
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition: snapbuild.c:386
static void SnapBuildEndTxn ( SnapBuild builder,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 912 of file snapbuild.c.

References Assert, ereport, errdetail(), errmsg(), LOG, SnapBuild::running, SNAPBUILD_CONSISTENT, SnapBuildTxnIsRunning(), SnapBuild::state, and SnapBuild::xcnt.

Referenced by SnapBuildAbortTxn(), and SnapBuildCommitTxn().

913 {
914  if (builder->state == SNAPBUILD_CONSISTENT)
915  return;
916 
917  /*
918  * NB: This handles subtransactions correctly even if we started from
919  * suboverflowed xl_running_xacts because we only keep track of toplevel
920  * transactions. Since the latter are always allocated before their
921  * subxids and since they end at the same time it's sufficient to deal
922  * with them here.
923  */
924  if (SnapBuildTxnIsRunning(builder, xid))
925  {
926  Assert(builder->running.xcnt > 0);
927 
928  if (!--builder->running.xcnt)
929  {
930  /*
931  * None of the originally running transactions are running anymore,
932  * so our incrementally built snapshot now is consistent.
933  */
934  ereport(LOG,
935  (errmsg("logical decoding found consistent point at %X/%X",
936  (uint32) (lsn >> 32), (uint32) lsn),
937  errdetail("Transaction ID %u finished; no more running transactions.",
938  xid)));
939  builder->state = SNAPBUILD_CONSISTENT;
940  }
941  }
942 }
size_t xcnt
Definition: snapbuild.c:200
#define LOG
Definition: elog.h:26
SnapBuildState state
Definition: snapbuild.c:145
struct SnapBuild::@22 running
int errdetail(const char *fmt,...)
Definition: elog.c:873
unsigned int uint32
Definition: c.h:268
#define ereport(elevel, rest)
Definition: elog.h:122
static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:762
#define Assert(condition)
Definition: c.h:675
int errmsg(const char *fmt,...)
Definition: elog.c:797
const char* SnapBuildExportSnapshot ( SnapBuild builder)

Definition at line 587 of file snapbuild.c.

References CurrentResourceOwner, elog, ereport, errmsg_plural(), ERROR, ExportInProgress, ExportSnapshot(), IsTransactionOrTransactionBlock(), LOG, SnapBuildInitialSnapshot(), StartTransactionCommand(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and SnapshotData::xcnt.

Referenced by CreateReplicationSlot().

588 {
589  Snapshot snap;
590  char *snapname;
591 
593  elog(ERROR, "cannot export a snapshot from within a transaction");
594 
596  elog(ERROR, "can only export one snapshot at a time");
597 
599  ExportInProgress = true;
600 
602 
603  /* There doesn't seem to be a nice API to set these */
605  XactReadOnly = true;
606 
607  snap = SnapBuildInitialSnapshot(builder);
608 
609  /*
610  * now that we've built a plain snapshot, make it active and use the
611  * normal mechanisms for exporting it
612  */
613  snapname = ExportSnapshot(snap);
614 
615  ereport(LOG,
616  (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
617  "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
618  snap->xcnt,
619  snapname, snap->xcnt)));
620  return snapname;
621 }
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:850
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
#define XACT_REPEATABLE_READ
Definition: xact.h:30
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4322
char * ExportSnapshot(Snapshot snapshot)
Definition: snapmgr.c:1150
#define LOG
Definition: elog.h:26
static ResourceOwner SavedResourceOwnerDuringExport
Definition: snapbuild.c:246
#define ERROR
Definition: elog.h:43
static bool ExportInProgress
Definition: snapbuild.c:247
#define ereport(elevel, rest)
Definition: elog.h:122
bool XactReadOnly
Definition: xact.c:77
void StartTransactionCommand(void)
Definition: xact.c:2677
int XactIsoLevel
Definition: xact.c:74
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:509
uint32 xcnt
Definition: snapshot.h:78
#define elog
Definition: elog.h:219
static bool SnapBuildFindSnapshot ( SnapBuild * builder,
XLogRecPtr  lsn,
xl_running_xacts * running 
)
static

Definition at line 1228 of file snapbuild.c.

References Assert, SnapBuild::context, DEBUG1, elog, ereport, errdetail(), errdetail_internal(), errdetail_plural(), errmsg(), errmsg_internal(), ERROR, SnapBuild::initial_xmin_horizon, InvalidTransactionId, InvalidXLogRecPtr, LOG, MemoryContextAlloc(), xl_running_xacts::nextXid, NormalTransactionIdPrecedes, NULL, xl_running_xacts::oldestRunningXid, qsort, SnapBuild::running, SNAPBUILD_CONSISTENT, SNAPBUILD_FULL_SNAPSHOT, SnapBuildRestore(), SnapBuild::start_decoding_at, SnapBuild::state, TransactionIdAdvance, TransactionIdIsCurrentTransactionId(), TransactionIdIsNormal, TransactionIdRetreat, XactLockTableWait(), xl_running_xacts::xcnt, SnapBuild::xcnt, SnapBuild::xcnt_space, xidComparator(), xl_running_xacts::xids, SnapBuild::xip, XLTW_None, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildProcessRunningXacts().

1229 {
1230  /* ---
1231  * Build catalog decoding snapshot incrementally using information about
1232  * the currently running transactions. There are several ways to do that:
1233  *
1234  * a) There were no running transactions when the xl_running_xacts record
1235  * was inserted, jump to CONSISTENT immediately. We might find such a
1236  * state while we were waiting for b) and c).
1237  *
1238  * b) Wait for all toplevel transactions that were running to end. We
1239  * simply track the number of in-progress toplevel transactions and
1240  * lower it whenever one commits or aborts. When that number
1241  * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
1242  * to CONSISTENT.
1243  * NB: We need to search running.xip when seeing a transaction's end to
1244  * make sure it's a toplevel transaction and it's been one of the
1245  * initially running ones.
1246  * Interestingly, in contrast to HS, this allows us not to care about
1247  * subtransactions - and by extension suboverflowed xl_running_xacts -
1248  * at all.
1249  *
1250  * c) This (in a previous run) or another decoding slot serialized a
1251  * snapshot to disk that we can use.
1252  * ---
1253  */
1254 
1255  /*
1256  * xl_running_xacts record is older than what we can use; we might not have
1257  * all necessary catalog rows anymore.
1258  */
1261  builder->initial_xmin_horizon))
1262  {
1263  ereport(DEBUG1,
1264  (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
1265  (uint32) (lsn >> 32), (uint32) lsn),
1266  errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1267  builder->initial_xmin_horizon, running->oldestRunningXid)));
1268  return true;
1269  }
1270 
1271  /*
1272  * a) No transactions were running, so we can jump to consistent.
1273  *
1274  * NB: We might have already started to incrementally assemble a snapshot,
1275  * so we need to be careful to deal with that.
1276  */
1277  if (running->xcnt == 0)
1278  {
1279  if (builder->start_decoding_at == InvalidXLogRecPtr ||
1280  builder->start_decoding_at <= lsn)
1281  /* can decode everything after this */
1282  builder->start_decoding_at = lsn + 1;
1283 
1284  /* As no transactions were running xmin/xmax can be trivially set. */
1285  builder->xmin = running->nextXid; /* < are finished */
1286  builder->xmax = running->nextXid; /* >= are running */
1287 
1288  /* so we can safely use the faster comparisons */
1289  Assert(TransactionIdIsNormal(builder->xmin));
1290  Assert(TransactionIdIsNormal(builder->xmax));
1291 
1292  /* no transactions running now */
1293  builder->running.xcnt = 0;
1294  builder->running.xmin = InvalidTransactionId;
1295  builder->running.xmax = InvalidTransactionId;
1296 
1297  builder->state = SNAPBUILD_CONSISTENT;
1298 
1299  ereport(LOG,
1300  (errmsg("logical decoding found consistent point at %X/%X",
1301  (uint32) (lsn >> 32), (uint32) lsn),
1302  errdetail("There are no running transactions.")));
1303 
1304  return false;
1305  }
1306  /* c) valid on disk state */
1307  else if (SnapBuildRestore(builder, lsn))
1308  {
1309  /* there won't be any state to clean up */
1310  return false;
1311  }
1312 
1313  /*
1314  * b) first encounter of a useable xl_running_xacts record. If we had
1315  * found one earlier we would either track running transactions (i.e.
1316  * builder->running.xcnt != 0) or be consistent (this function wouldn't
1317  * get called).
1318  */
1319  else if (!builder->running.xcnt)
1320  {
1321  int off;
1322 
1323  /*
1324  * We only care about toplevel xids as those are the ones we
1325  * definitely see in the wal stream. As snapbuild.c tracks committed
1326  * instead of running transactions we don't need to know anything
1327  * about uncommitted subtransactions.
1328  */
1329 
1330  /*
1331  * Start with an xmin/xmax that's correct for the future, when all the
1332  * currently running transactions have finished. We'll update both
1333  * while waiting for the pending transactions to finish.
1334  */
1335  builder->xmin = running->nextXid; /* < are finished */
1336  builder->xmax = running->nextXid; /* >= are running */
1337 
1338  /* so we can safely use the faster comparisons */
1339  Assert(TransactionIdIsNormal(builder->xmin));
1340  Assert(TransactionIdIsNormal(builder->xmax));
1341 
1342  builder->running.xcnt = running->xcnt;
1343  builder->running.xcnt_space = running->xcnt;
1344  builder->running.xip =
1345  MemoryContextAlloc(builder->context,
1346  builder->running.xcnt * sizeof(TransactionId));
1347  memcpy(builder->running.xip, running->xids,
1348  builder->running.xcnt * sizeof(TransactionId));
1349 
1350  /* sort so we can do a binary search */
1351  qsort(builder->running.xip, builder->running.xcnt,
1352  sizeof(TransactionId), xidComparator);
1353 
1354  builder->running.xmin = builder->running.xip[0];
1355  builder->running.xmax = builder->running.xip[running->xcnt - 1];
1356 
1357  /* makes comparisons cheaper later */
1358  TransactionIdRetreat(builder->running.xmin);
1359  TransactionIdAdvance(builder->running.xmax);
1360 
1361  builder->state = SNAPBUILD_FULL_SNAPSHOT;
1362 
1363  ereport(LOG,
1364  (errmsg("logical decoding found initial starting point at %X/%X",
1365  (uint32) (lsn >> 32), (uint32) lsn),
1366  errdetail_plural("%u transaction needs to finish.",
1367  "%u transactions need to finish.",
1368  builder->running.xcnt,
1369  (uint32) builder->running.xcnt)));
1370 
1371  /*
1372  * Iterate through all xids, wait for them to finish.
1373  *
1374  * This isn't required for the correctness of decoding, but to allow
1375  * isolationtester to notice that we're currently waiting for
1376  * something.
1377  */
1378  for (off = 0; off < builder->running.xcnt; off++)
1379  {
1380  TransactionId xid = builder->running.xip[off];
1381 
1382  /*
1383  * Upper layers should prevent that we ever need to wait on
1384  * ourselves. Check anyway, since failing to do so would either
1385  * result in an endless wait or an Assert() failure.
1386  */
1388  elog(ERROR, "waiting for ourselves");
1389 
1391  }
1392 
1393  /* nothing could have built up so far, so don't perform cleanup */
1394  return false;
1395  }
1396 
1397  /*
1398  * We already started to track running xacts and need to wait for all
1399  * in-progress ones to finish. We fall through to the normal processing of
1400  * records so incremental cleanup can be performed.
1401  */
1402  return true;
1403 }
#define TransactionIdAdvance(dest)
Definition: transam.h:48
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1665
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:397
bool TransactionIdIsCurrentTransactionId(TransactionId xid)
Definition: xact.c:773
size_t xcnt_space
Definition: snapbuild.c:201
size_t xcnt
Definition: snapbuild.c:200
#define LOG
Definition: elog.h:26
#define TransactionIdRetreat(dest)
Definition: transam.h:56
int errdetail_internal(const char *fmt,...)
Definition: elog.c:900
TransactionId xids[FLEXIBLE_ARRAY_MEMBER]
Definition: standbydefs.h:56
SnapBuildState state
Definition: snapbuild.c:145
#define ERROR
Definition: elog.h:43
Definition: lmgr.h:26
struct SnapBuild::@22 running
TransactionId initial_xmin_horizon
Definition: snapbuild.c:166
TransactionId * xip
Definition: snapbuild.c:202
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:268
#define ereport(elevel, rest)
Definition: elog.h:122
TransactionId xmax
Definition: snapbuild.c:154
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:554
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:965
MemoryContext context
Definition: snapbuild.c:148
TransactionId nextXid
Definition: standbydefs.h:52
#define NormalTransactionIdPrecedes(id1, id2)
Definition: transam.h:62
int errmsg(const char *fmt,...)
Definition: elog.c:797
TransactionId xmin
Definition: snapbuild.c:151
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
TransactionId oldestRunningXid
Definition: standbydefs.h:53
#define elog
Definition: elog.h:219
#define qsort(a, b, c, d)
Definition: port.h:440
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
XLogRecPtr start_decoding_at
Definition: snapbuild.c:160
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
static void SnapBuildFreeSnapshot ( Snapshot  snap)
static

Definition at line 340 of file snapbuild.c.

References SnapshotData::active_count, Assert, SnapshotData::copied, SnapshotData::curcid, elog, ERROR, FirstCommandId, HeapTupleSatisfiesHistoricMVCC(), pfree(), SnapshotData::regd_count, SnapshotData::satisfies, SnapshotData::suboverflowed, and SnapshotData::takenDuringRecovery.

Referenced by SnapBuildSnapDecRefcount().

341 {
342  /* make sure we don't get passed an external snapshot */
344 
345  /* make sure nobody modified our snapshot */
346  Assert(snap->curcid == FirstCommandId);
347  Assert(!snap->suboverflowed);
348  Assert(!snap->takenDuringRecovery);
349  Assert(snap->regd_count == 0);
350 
351  /* slightly more likely, so it's checked even without c-asserts */
352  if (snap->copied)
353  elog(ERROR, "cannot free a copied snapshot");
354 
355  if (snap->active_count)
356  elog(ERROR, "cannot free an active snapshot");
357 
358  pfree(snap);
359 }
SnapshotSatisfiesFunc satisfies
Definition: snapshot.h:54
bool copied
Definition: snapshot.h:94
bool suboverflowed
Definition: snapshot.h:91
uint32 regd_count
Definition: snapshot.h:108
#define FirstCommandId
Definition: c.h:413
void pfree(void *pointer)
Definition: mcxt.c:950
#define ERROR
Definition: elog.h:43
bool HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, Buffer buffer)
Definition: tqual.c:1652
CommandId curcid
Definition: snapshot.h:96
#define Assert(condition)
Definition: c.h:675
bool takenDuringRecovery
Definition: snapshot.h:93
#define elog
Definition: elog.h:219
uint32 active_count
Definition: snapshot.h:107
Snapshot SnapBuildGetOrBuildSnapshot ( SnapBuild * builder,
TransactionId  xid 
)

Definition at line 627 of file snapbuild.c.

References Assert, NULL, SNAPBUILD_CONSISTENT, SnapBuildBuildSnapshot(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, and SnapBuild::state.

Referenced by DecodeLogicalMsgOp().

628 {
629  Assert(builder->state == SNAPBUILD_CONSISTENT);
630 
631  /* only build a new snapshot if we don't have a prebuilt one */
632  if (builder->snapshot == NULL)
633  {
634  builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
635  /* increase refcount for the snapshot builder */
637  }
638 
639  return builder->snapshot;
640 }
Snapshot snapshot
Definition: snapbuild.c:171
SnapBuildState state
Definition: snapbuild.c:145
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:430
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition: snapbuild.c:386
Snapshot SnapBuildInitialSnapshot ( SnapBuild * builder)

Definition at line 509 of file snapbuild.c.

References Assert, SnapBuild::committed, elog, ereport, errcode(), errmsg(), ERROR, FirstSnapshotSet, GetMaxSnapshotXidCount(), GetTopTransactionId(), SnapBuild::includes_all_transactions, MyPgXact, NormalTransactionIdPrecedes, NULL, palloc(), SNAPBUILD_CONSISTENT, SnapBuildBuildSnapshot(), SnapBuild::state, test(), TransactionIdAdvance, TransactionIdIsValid, XACT_REPEATABLE_READ, XactIsoLevel, SnapshotData::xcnt, xidComparator(), SnapshotData::xip, SnapshotData::xmax, SnapshotData::xmin, and PGXACT::xmin.

Referenced by CreateReplicationSlot(), and SnapBuildExportSnapshot().

510 {
511  Snapshot snap;
512  TransactionId xid;
513  TransactionId *newxip;
514  int newxcnt = 0;
515 
518 
519  if (builder->state != SNAPBUILD_CONSISTENT)
520  elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
521 
522  if (!builder->committed.includes_all_transactions)
523  elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
524 
525  /* so we don't overwrite the existing value */
527  elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
528 
529  snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
530 
531  /*
532  * We know that snap->xmin is alive, enforced by the logical xmin
533  * mechanism. Due to that we can do this without locks, we're only
534  * changing our own value.
535  */
536  MyPgXact->xmin = snap->xmin;
537 
538  /* allocate in transaction context */
539  newxip = (TransactionId *)
541 
542  /*
543  * snapbuild.c builds transactions in an "inverted" manner, which means it
544  * stores committed transactions in ->xip, not ones in progress. Build a
545  * classical snapshot by marking all non-committed transactions as
546  * in-progress. This can be expensive.
547  */
548  for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
549  {
550  void *test;
551 
552  /*
553  * Check whether transaction committed using the decoding snapshot
554  * meaning of ->xip.
555  */
556  test = bsearch(&xid, snap->xip, snap->xcnt,
557  sizeof(TransactionId), xidComparator);
558 
559  if (test == NULL)
560  {
561  if (newxcnt >= GetMaxSnapshotXidCount())
562  ereport(ERROR,
563  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
564  errmsg("initial slot snapshot too large")));
565 
566  newxip[newxcnt++] = xid;
567  }
568 
570  }
571 
572  snap->xcnt = newxcnt;
573  snap->xip = newxip;
574 
575  return snap;
576 }
#define TransactionIdAdvance(dest)
Definition: transam.h:48
static void test(void)
uint32 TransactionId
Definition: c.h:397
TransactionId xmin
Definition: proc.h:213
#define XACT_REPEATABLE_READ
Definition: xact.h:30
int errcode(int sqlerrcode)
Definition: elog.c:575
TransactionId GetTopTransactionId(void)
Definition: xact.c:389
PGXACT * MyPgXact
Definition: proc.c:68
SnapBuildState state
Definition: snapbuild.c:145
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:203
bool includes_all_transactions
Definition: snapbuild.c:222
TransactionId xmax
Definition: snapshot.h:67
TransactionId xmin
Definition: snapshot.h:66
#define ereport(elevel, rest)
Definition: elog.h:122
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:430
TransactionId * xip
Definition: snapshot.h:77
int GetMaxSnapshotXidCount(void)
Definition: procarray.c:1452
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
int XactIsoLevel
Definition: xact.c:74
#define NormalTransactionIdPrecedes(id1, id2)
Definition: transam.h:62
uint32 xcnt
Definition: snapshot.h:78
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct SnapBuild::@23 committed
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
bool SnapBuildProcessChange ( SnapBuild * builder,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 671 of file snapbuild.c.

References NULL, SnapBuild::reorder, ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), SNAPBUILD_CONSISTENT, SNAPBUILD_FULL_SNAPSHOT, SnapBuildBuildSnapshot(), SnapBuildSnapIncRefcount(), SnapBuildTxnIsRunning(), SnapBuild::snapshot, and SnapBuild::state.

Referenced by DecodeHeap2Op(), DecodeHeapOp(), and DecodeLogicalMsgOp().

672 {
673  /*
674  * We can't handle data in transactions if we haven't built a snapshot
675  * yet, so don't store them.
676  */
677  if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
678  return false;
679 
680  /*
681  * No point in keeping track of changes in transactions that we don't have
682  * enough information about to decode. This means that they started before
683  * we got into the SNAPBUILD_FULL_SNAPSHOT state.
684  */
685  if (builder->state < SNAPBUILD_CONSISTENT &&
686  SnapBuildTxnIsRunning(builder, xid))
687  return false;
688 
689  /*
690  * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
691  * be needed to decode the change we're currently processing.
692  */
693  if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
694  {
695  /* only build a new snapshot if we don't have a prebuilt one */
696  if (builder->snapshot == NULL)
697  {
698  builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
699  /* increase refcount for the snapshot builder */
701  }
702 
703  /*
704  * Increase refcount for the transaction we're handing the snapshot
705  * out to.
706  */
708  ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
709  builder->snapshot);
710  }
711 
712  return true;
713 }
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
Snapshot snapshot
Definition: snapbuild.c:171
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
SnapBuildState state
Definition: snapbuild.c:145
ReorderBuffer * reorder
Definition: snapbuild.c:181
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:430
static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:762
#define NULL
Definition: c.h:229
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition: snapbuild.c:386
void SnapBuildProcessNewCid ( SnapBuild * builder,
TransactionId  xid,
XLogRecPtr  lsn,
xl_heap_new_cid * xlrec 
)

Definition at line 721 of file snapbuild.c.

References xl_heap_new_cid::cmax, xl_heap_new_cid::cmin, xl_heap_new_cid::combocid, elog, ERROR, InvalidCommandId, Max, SnapBuild::reorder, ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferXidSetCatalogChanges(), xl_heap_new_cid::target_node, xl_heap_new_cid::target_tid, and xl_heap_new_cid::top_xid.

Referenced by DecodeHeap2Op().

723 {
724  CommandId cid;
725 
726  /*
727  * we only log new_cid's if a catalog tuple was modified, so mark the
728  * transaction as containing catalog modifications
729  */
730  ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
731 
732  ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
733  xlrec->target_node, xlrec->target_tid,
734  xlrec->cmin, xlrec->cmax,
735  xlrec->combocid);
736 
737  /* figure out new command id */
738  if (xlrec->cmin != InvalidCommandId &&
739  xlrec->cmax != InvalidCommandId)
740  cid = Max(xlrec->cmin, xlrec->cmax);
741  else if (xlrec->cmax != InvalidCommandId)
742  cid = xlrec->cmax;
743  else if (xlrec->cmin != InvalidCommandId)
744  cid = xlrec->cmin;
745  else
746  {
747  cid = InvalidCommandId; /* silence compiler */
748  elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
749  }
750 
751  ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
752 }
uint32 CommandId
Definition: c.h:411
CommandId combocid
Definition: heapam_xlog.h:348
CommandId cmax
Definition: heapam_xlog.h:341
ItemPointerData target_tid
Definition: heapam_xlog.h:354
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
#define ERROR
Definition: elog.h:43
ReorderBuffer * reorder
Definition: snapbuild.c:181
RelFileNode target_node
Definition: heapam_xlog.h:353
#define InvalidCommandId
Definition: c.h:414
#define Max(x, y)
Definition: c.h:800
CommandId cmin
Definition: heapam_xlog.h:340
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
#define elog
Definition: elog.h:219
TransactionId top_xid
Definition: heapam_xlog.h:339
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
void SnapBuildProcessRunningXacts ( SnapBuild * builder,
XLogRecPtr  lsn,
xl_running_xacts * running 
)

Definition at line 1133 of file snapbuild.c.

References ReorderBuffer::current_restart_decoding_lsn, DEBUG3, elog, InvalidXLogRecPtr, SnapBuild::last_serialized_snapshot, LogicalIncreaseRestartDecodingForSlot(), LogicalIncreaseXminForSlot(), NULL, xl_running_xacts::oldestRunningXid, SnapBuild::reorder, ReorderBufferGetOldestTXN(), ReorderBufferTXN::restart_decoding_lsn, SNAPBUILD_CONSISTENT, SnapBuildFindSnapshot(), SnapBuildPurgeCommittedTxn(), SnapBuildSerialize(), SnapBuild::state, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by DecodeStandbyOp().

1134 {
1135  ReorderBufferTXN *txn;
1136 
1137  /*
1138  * If we're not consistent yet, inspect the record to see whether it
1139  * allows us to get closer to being consistent. If we are consistent, dump
1140  * our snapshot so others or we, after a restart, can use it.
1141  */
1142  if (builder->state < SNAPBUILD_CONSISTENT)
1143  {
1144  /* returns false if there's no point in performing cleanup just yet */
1145  if (!SnapBuildFindSnapshot(builder, lsn, running))
1146  return;
1147  }
1148  else
1149  SnapBuildSerialize(builder, lsn);
1150 
1151  /*
1152  * Update range of interesting xids based on the running xacts
1153  * information. We don't increase ->xmax using it, because once we are in
1154  * a consistent state we can do that ourselves and much more efficiently
1155  * so, because we only need to do it for catalog transactions since we
1156  * only ever look at those.
1157  *
1158  * NB: Because of that xmax can be lower than xmin, because we only
1159  * increase xmax when a catalog modifying transaction commits. While odd
1160  * looking, it's correct and actually more efficient this way since we hit
1161  * fast paths in tqual.c.
1162  */
1163  builder->xmin = running->oldestRunningXid;
1164 
1165  /* Remove transactions we don't need to keep track of anymore */
1166  SnapBuildPurgeCommittedTxn(builder);
1167 
1168  elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u",
1169  builder->xmin, builder->xmax,
1170  running->oldestRunningXid);
1171 
1172  /*
1173  * Increase shared memory limits, so vacuum can work on tuples we prevented
1174  * from being pruned till now.
1175  */
1177 
1178  /*
1179  * Also tell the slot where we can restart decoding from. We don't want to
1180  * do that after every commit because changing that implies an fsync of
1181  * the logical slot's state file, so we only do it every time we see a
1182  * running xacts record.
1183  *
1184  * Do so by looking for the oldest in progress transaction (determined by
1185  * the first LSN of any of its relevant records). Every transaction
1186  * remembers the last location we stored the snapshot to disk before its
1187  * beginning. That point is where we can restart from.
1188  */
1189 
1190  /*
1191  * Can't know about a serialized snapshot's location if we're not
1192  * consistent.
1193  */
1194  if (builder->state < SNAPBUILD_CONSISTENT)
1195  return;
1196 
1197  txn = ReorderBufferGetOldestTXN(builder->reorder);
1198 
1199  /*
1200  * oldest ongoing txn might have started when we didn't yet serialize
1201  * anything because we hadn't reached a consistent state yet.
1202  */
1203  if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1205 
1206  /*
1207  * No in-progress transaction, can reuse the last serialized snapshot if
1208  * we have one.
1209  */
1210  else if (txn == NULL &&
1214  builder->last_serialized_snapshot);
1215 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG3
Definition: elog.h:23
void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
Definition: logical.c:815
XLogRecPtr current_restart_decoding_lsn
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
Definition: snapbuild.c:1228
SnapBuildState state
Definition: snapbuild.c:145
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1468
ReorderBuffer * reorder
Definition: snapbuild.c:181
XLogRecPtr last_serialized_snapshot
Definition: snapbuild.c:176
TransactionId xmax
Definition: snapbuild.c:154
#define NULL
Definition: c.h:229
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
Definition: logical.c:758
TransactionId xmin
Definition: snapbuild.c:151
XLogRecPtr restart_decoding_lsn
TransactionId oldestRunningXid
Definition: standbydefs.h:53
#define elog
Definition: elog.h:219
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder)
Definition: snapbuild.c:870
static void SnapBuildPurgeCommittedTxn ( SnapBuild * builder)
static

Definition at line 870 of file snapbuild.c.

References SnapBuild::committed, SnapBuild::context, DEBUG3, elog, MemoryContextAlloc(), NormalTransactionIdPrecedes, pfree(), TransactionIdIsNormal, SnapBuild::xcnt, SnapBuild::xip, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildProcessRunningXacts().

871 {
872  int off;
873  TransactionId *workspace;
874  int surviving_xids = 0;
875 
876  /* not ready yet */
877  if (!TransactionIdIsNormal(builder->xmin))
878  return;
879 
880  /* TODO: Neater algorithm than just copying and iterating? */
881  workspace =
882  MemoryContextAlloc(builder->context,
883  builder->committed.xcnt * sizeof(TransactionId));
884 
885  /* copy xids that still are interesting to workspace */
886  for (off = 0; off < builder->committed.xcnt; off++)
887  {
888  if (NormalTransactionIdPrecedes(builder->committed.xip[off],
889  builder->xmin))
890  ; /* remove */
891  else
892  workspace[surviving_xids++] = builder->committed.xip[off];
893  }
894 
895  /* copy workspace back to persistent state */
896  memcpy(builder->committed.xip, workspace,
897  surviving_xids * sizeof(TransactionId));
898 
899  elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
900  (uint32) builder->committed.xcnt, (uint32) surviving_xids,
901  builder->xmin, builder->xmax);
902  builder->committed.xcnt = surviving_xids;
903 
904  pfree(workspace);
905 }
uint32 TransactionId
Definition: c.h:397
#define DEBUG3
Definition: elog.h:23
size_t xcnt
Definition: snapbuild.c:200
void pfree(void *pointer)
Definition: mcxt.c:950
TransactionId * xip
Definition: snapbuild.c:202
unsigned int uint32
Definition: c.h:268
TransactionId xmax
Definition: snapbuild.c:154
MemoryContext context
Definition: snapbuild.c:148
#define NormalTransactionIdPrecedes(id1, id2)
Definition: transam.h:62
TransactionId xmin
Definition: snapbuild.c:151
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
struct SnapBuild::@23 committed
#define elog
Definition: elog.h:219
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool SnapBuildRestore ( SnapBuild builder,
XLogRecPtr  lsn 
)
static

Definition at line 1665 of file snapbuild.c.

References Assert, SnapBuildOnDisk::builder, SnapBuildOnDisk::checksum, CloseTransientFile(), SnapBuild::committed, COMP_CRC32C, SnapBuild::context, EQ_CRC32C, ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, fd(), FIN_CRC32C, fsync_fname(), INIT_CRC32C, SnapBuild::initial_xmin_horizon, InvalidTransactionId, LOG, SnapBuildOnDisk::magic, MAXPGPATH, MemoryContextAllocZero(), NULL, OpenTransientFile(), pfree(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), read, SnapBuild::reorder, ReorderBufferSetRestartPoint(), SnapBuild::running, SNAPBUILD_CONSISTENT, SNAPBUILD_MAGIC, SNAPBUILD_VERSION, SnapBuildBuildSnapshot(), SnapBuildOnDiskConstantSize, SnapBuildOnDiskNotChecksummedSize, SnapBuildSnapDecRefcount(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, SnapBuild::state, TransactionIdPrecedes(), SnapBuildOnDisk::version, WAIT_EVENT_SNAPBUILD_READ, SnapBuild::xcnt, SnapBuild::xcnt_space, SnapBuild::xip, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildFindSnapshot(), and SnapBuildSerializationPoint().

1666 {
1667  SnapBuildOnDisk ondisk; /* on-disk header plus SnapBuild image, filled by the reads below */
1668  int fd;
1669  char path[MAXPGPATH];
1670  Size sz;
1671  int readBytes;
1672  pg_crc32c checksum; /* CRC recomputed over what we read, compared with ondisk.checksum */
1673 
1674  /* no point in loading a snapshot if we're already there */
1675  if (builder->state == SNAPBUILD_CONSISTENT)
1676  return false;
1677 
1678  sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1679  (uint32) (lsn >> 32), (uint32) lsn);
1680 
1681  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1682 
1683  if (fd < 0 && errno == ENOENT)
1684  return false; /* no snapshot file exists for this LSN */
1685  else if (fd < 0)
1686  ereport(ERROR,
1688  errmsg("could not open file \"%s\": %m", path)));
1689 
1690  /* ----
1691  * Make sure the snapshot has been stored safely to disk; that's normally
1692  * cheap.
1693  * Note that we do not need PANIC here, nobody will be able to use the
1694  * slot without fsyncing, and saving it won't succeed without an fsync()
1695  * either...
1696  * ----
1697  */
1698  fsync_fname(path, false);
1699  fsync_fname("pg_logical/snapshots", true);
1700 
1701 
1702  /* read statically sized portion of snapshot */
1704  readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
1706  if (readBytes != SnapBuildOnDiskConstantSize)
1707  {
1708  CloseTransientFile(fd);
1709  ereport(ERROR,
1711  errmsg("could not read file \"%s\", read %d of %d: %m",
1712  path, readBytes, (int) SnapBuildOnDiskConstantSize)));
1713  }
1714 
1715  if (ondisk.magic != SNAPBUILD_MAGIC) /* not a snapbuild state file */
1716  ereport(ERROR,
1717  (errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1718  path, ondisk.magic, SNAPBUILD_MAGIC)));
1719 
1720  if (ondisk.version != SNAPBUILD_VERSION) /* written with a different format version */
1721  ereport(ERROR,
1722  (errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1723  path, ondisk.version, SNAPBUILD_VERSION)));
1724 
1725  INIT_CRC32C(checksum);
1726  COMP_CRC32C(checksum,
1727  ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1729 
1730  /* read SnapBuild */
1732  readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
1734  if (readBytes != sizeof(SnapBuild))
1735  {
1736  CloseTransientFile(fd);
1737  ereport(ERROR,
1739  errmsg("could not read file \"%s\", read %d of %d: %m",
1740  path, readBytes, (int) sizeof(SnapBuild))));
1741  }
1742  COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
1743 
1744  /* restore running xacts information (sized by xcnt_space, not xcnt) */
1745  sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
1746  ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
1748  readBytes = read(fd, ondisk.builder.running.xip, sz);
1750  if (readBytes != sz)
1751  {
1752  CloseTransientFile(fd);
1753  ereport(ERROR,
1755  errmsg("could not read file \"%s\", read %d of %d: %m",
1756  path, readBytes, (int) sz)));
1757  }
1758  COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
1759 
1760  /* restore committed xacts information (sized by xcnt) */
1761  sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1762  ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1764  readBytes = read(fd, ondisk.builder.committed.xip, sz);
1766  if (readBytes != sz)
1767  {
1768  CloseTransientFile(fd);
1769  ereport(ERROR,
1771  errmsg("could not read file \"%s\", read %d of %d: %m",
1772  path, readBytes, (int) sz)));
1773  }
1774  COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
1775 
1776  CloseTransientFile(fd);
1777 
1778  FIN_CRC32C(checksum);
1779 
1780  /* verify checksum of what we've read */
1781  if (!EQ_CRC32C(checksum, ondisk.checksum))
1782  ereport(ERROR,
1784  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1785  path, checksum, ondisk.checksum)));
1786 
1787  /*
1788  * ok, we now have a sensible snapshot here, figure out if it has more
1789  * information than we have.
1790  */
1791 
1792  /*
1793  * We are only interested in consistent snapshots for now, comparing
1794  * whether one incomplete snapshot is more "advanced" seems to be
1795  * unnecessarily complex.
1796  */
1797  if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1798  goto snapshot_not_interesting;
1799 
1800  /*
1801  * Don't use a snapshot that requires an xmin that we cannot guarantee to
1802  * be available.
1803  */
1805  goto snapshot_not_interesting;
1806 
1807 
1808  /* ok, we think the snapshot is sensible, copy over everything important */
1809  builder->xmin = ondisk.builder.xmin;
1810  builder->xmax = ondisk.builder.xmax;
1811  builder->state = ondisk.builder.state;
1812 
1813  builder->committed.xcnt = ondisk.builder.committed.xcnt;
1814  /* We only allocated/stored xcnt, not xcnt_space xids ! */
1815  /* don't overwrite preallocated xip, if we don't have anything here */
1816  if (builder->committed.xcnt > 0)
1817  {
1818  pfree(builder->committed.xip);
1819  builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1820  builder->committed.xip = ondisk.builder.committed.xip;
1821  }
1822  ondisk.builder.committed.xip = NULL; /* NOTE(review): when xcnt is 0 the array allocated above is neither handed over nor pfree'd here - confirm it is released with builder->context */
1823 
1824  builder->running.xcnt = ondisk.builder.running.xcnt;
1825  if (builder->running.xip)
1826  pfree(builder->running.xip);
1827  builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
1828  builder->running.xip = ondisk.builder.running.xip; /* builder now owns the array read from disk */
1829 
1830  /* our snapshot is not interesting anymore, build a new one */
1831  if (builder->snapshot != NULL)
1832  {
1834  }
1837 
1838  ReorderBufferSetRestartPoint(builder->reorder, lsn);
1839 
1840  Assert(builder->state == SNAPBUILD_CONSISTENT);
1841 
1842  ereport(LOG,
1843  (errmsg("logical decoding found consistent point at %X/%X",
1844  (uint32) (lsn >> 32), (uint32) lsn),
1845  errdetail("Logical decoding will begin using saved snapshot.")));
1846  return true;
1847 
1848 snapshot_not_interesting: /* stored snapshot rejected: free the arrays allocated above */
1849  if (ondisk.builder.running.xip != NULL)
1850  pfree(ondisk.builder.running.xip);
1851  if (ondisk.builder.committed.xip != NULL)
1852  pfree(ondisk.builder.committed.xip);
1853  return false;
1854 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define SNAPBUILD_VERSION
Definition: snapbuild.c:1446
uint32 TransactionId
Definition: c.h:397
#define SNAPBUILD_MAGIC
Definition: snapbuild.c:1445
pg_crc32c checksum
Definition: snapbuild.c:1425
Snapshot snapshot
Definition: snapbuild.c:171
uint32 pg_crc32c
Definition: pg_crc32c.h:38
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:567
size_t xcnt_space
Definition: snapbuild.c:201
size_t xcnt
Definition: snapbuild.c:200
#define LOG
Definition: elog.h:26
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
SnapBuildState state
Definition: snapbuild.c:145
void pfree(void *pointer)
Definition: mcxt.c:950
#define ERROR
Definition: elog.h:43
struct SnapBuild::@22 running
#define MAXPGPATH
SnapBuild builder
Definition: snapbuild.c:1435
ReorderBuffer * reorder
Definition: snapbuild.c:181
TransactionId initial_xmin_horizon
Definition: snapbuild.c:166
TransactionId * xip
Definition: snapbuild.c:202
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errdetail(const char *fmt,...)
Definition: elog.c:873
int errcode_for_file_access(void)
Definition: elog.c:598
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:268
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1231
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
#define ereport(elevel, rest)
Definition: elog.h:122
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:430
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
int CloseTransientFile(int fd)
Definition: fd.c:2305
TransactionId xmax
Definition: snapbuild.c:154
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:742
#define SnapBuildOnDiskConstantSize
Definition: snapbuild.c:1440
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define SnapBuildOnDiskNotChecksummedSize
Definition: snapbuild.c:1442
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1207
MemoryContext context
Definition: snapbuild.c:148
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:398
int errmsg(const char *fmt,...)
Definition: elog.c:797
TransactionId xmin
Definition: snapbuild.c:151
struct SnapBuild::@23 committed
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:73
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition: snapbuild.c:386
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:78
#define read(a, b, c)
Definition: win32.h:18
void SnapBuildSerializationPoint ( SnapBuild builder,
XLogRecPtr  lsn 
)

Definition at line 1455 of file snapbuild.c.

References SNAPBUILD_CONSISTENT, SnapBuildRestore(), SnapBuildSerialize(), and SnapBuild::state.

Referenced by DecodeXLogOp().

1456 {
1457  if (builder->state >= SNAPBUILD_CONSISTENT)
1458  SnapBuildSerialize(builder, lsn); /* already consistent: persist the current state */
1459  else
1460  SnapBuildRestore(builder, lsn); /* not consistent yet: try to load a saved snapshot */
1461 }
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1665
SnapBuildState state
Definition: snapbuild.c:145
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1468
static void SnapBuildSerialize ( SnapBuild builder,
XLogRecPtr  lsn 
)
static

Definition at line 1468 of file snapbuild.c.

References Assert, SnapBuildOnDisk::builder, SnapBuildOnDisk::checksum, CloseTransientFile(), SnapBuild::committed, COMP_CRC32C, SnapBuild::context, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FIN_CRC32C, fsync_fname(), INIT_CRC32C, InvalidXLogRecPtr, SnapBuild::last_serialized_snapshot, SnapBuildOnDisk::length, SnapBuildOnDisk::magic, MAXPGPATH, MemoryContextAllocZero(), MyProcPid, NULL, OpenTransientFile(), PG_BINARY, pg_fsync(), pgstat_report_wait_end(), pgstat_report_wait_start(), SnapBuild::reorder, ReorderBufferSetRestartPoint(), SnapBuild::running, SNAPBUILD_CONSISTENT, SNAPBUILD_MAGIC, SNAPBUILD_VERSION, SnapBuildOnDiskConstantSize, SnapBuildOnDiskNotChecksummedSize, SnapBuild::snapshot, SnapBuild::state, unlink(), SnapBuildOnDisk::version, WAIT_EVENT_SNAPBUILD_SYNC, WAIT_EVENT_SNAPBUILD_WRITE, write, SnapBuild::xcnt, SnapBuild::xcnt_space, and SnapBuild::xip.

Referenced by SnapBuildProcessRunningXacts(), and SnapBuildSerializationPoint().

1469 { /* Write the builder's state for 'lsn' to pg_logical/snapshots: pid-tagged temp file, fsync, then rename. */
1470  Size needed_length;
1471  SnapBuildOnDisk *ondisk;
1472  char *ondisk_c;
1473  int fd;
1474  char tmppath[MAXPGPATH];
1475  char path[MAXPGPATH];
1476  int ret;
1477  struct stat stat_buf;
1478  Size sz;
1479 
1480  Assert(lsn != InvalidXLogRecPtr);
1482  builder->last_serialized_snapshot <= lsn);
1483 
1484  /*
1485  * no point in serializing if we cannot continue to work immediately after
1486  * restoring the snapshot
1487  */
1488  if (builder->state < SNAPBUILD_CONSISTENT)
1489  return;
1490 
1491  /*
1492  * We identify snapshots by the LSN they are valid for. We don't need to
1493  * include timelines in the name as each LSN maps to exactly one timeline
1494  * unless the user used pg_resetwal or similar. If a user did so, there's
1495  * no hope continuing to decode anyway.
1496  */
1497  sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1498  (uint32) (lsn >> 32), (uint32) lsn);
1499 
1500  /*
1501  * first check whether some other backend already has written the snapshot
1502  * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1503  * as a valid state. Everything else is an unexpected error.
1504  */
1505  ret = stat(path, &stat_buf);
1506 
1507  if (ret != 0 && errno != ENOENT)
1508  ereport(ERROR,
1509  (errmsg("could not stat file \"%s\": %m", path)));
1510 
1511  else if (ret == 0)
1512  {
1513  /*
1514  * somebody else has already serialized to this point, don't overwrite
1515  * but remember location, so we don't need to read old data again.
1516  *
1517  * To be sure it has been synced to disk after the rename() from the
1518  * tempfile filename to the real filename, we just repeat the fsync.
1519  * That ought to be cheap because in most scenarios it should already
1520  * be safely on disk.
1521  */
1522  fsync_fname(path, false);
1523  fsync_fname("pg_logical/snapshots", true);
1524 
1525  builder->last_serialized_snapshot = lsn;
1526  goto out;
1527  }
1528 
1529  /*
1530  * there is an obvious race condition here between the time we stat(2) the
1531  * file and us writing the file. But we rename the file into place
1532  * atomically and all files created need to contain the same data anyway,
1533  * so this is perfectly fine, although a bit of a resource waste. Locking
1534  * seems like pointless complication.
1535  */
1536  elog(DEBUG1, "serializing snapshot to %s", path);
1537 
1538  /* to make sure only we will write to this tempfile, include pid */
1539  sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
1540  (uint32) (lsn >> 32), (uint32) lsn, MyProcPid);
1541 
1542  /*
1543  * Unlink the temporary file if it already exists. It can only be a
1544  * leftover from a crash/error, since we won't enter this function twice
1545  * from within a single decoding slot/backend and the temporary file
1546  * name contains the pid of the current process.
1547  */
1548  if (unlink(tmppath) != 0 && errno != ENOENT)
1549  ereport(ERROR,
1551  errmsg("could not remove file \"%s\": %m", tmppath))); /* report the file we tried to unlink */
1552 
1553  needed_length = sizeof(SnapBuildOnDisk) +
1554  sizeof(TransactionId) * builder->running.xcnt_space +
1555  sizeof(TransactionId) * builder->committed.xcnt;
1556 
1557  ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
1558  ondisk = (SnapBuildOnDisk *) ondisk_c;
1559  ondisk->magic = SNAPBUILD_MAGIC;
1560  ondisk->version = SNAPBUILD_VERSION;
1561  ondisk->length = needed_length;
1562  INIT_CRC32C(ondisk->checksum);
1563  COMP_CRC32C(ondisk->checksum,
1564  ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1566  ondisk_c += sizeof(SnapBuildOnDisk); /* ondisk_c now points at the variable-length tail */
1567 
1568  memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1569  /* NULL-ify memory-only data */
1570  ondisk->builder.context = NULL;
1571  ondisk->builder.snapshot = NULL;
1572  ondisk->builder.reorder = NULL;
1573  ondisk->builder.running.xip = NULL;
1574  ondisk->builder.committed.xip = NULL;
1575 
1576  COMP_CRC32C(ondisk->checksum,
1577  &ondisk->builder,
1578  sizeof(SnapBuild));
1579 
1580  /* copy running xacts (all xcnt_space slots) */
1581  sz = sizeof(TransactionId) * builder->running.xcnt_space;
1582  memcpy(ondisk_c, builder->running.xip, sz);
1583  COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1584  ondisk_c += sz;
1585 
1586  /* copy committed xacts (only the xcnt entries in use) */
1587  sz = sizeof(TransactionId) * builder->committed.xcnt;
1588  memcpy(ondisk_c, builder->committed.xip, sz);
1589  COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1590  ondisk_c += sz;
1591 
1592  FIN_CRC32C(ondisk->checksum);
1593 
1594  /* we have valid data now, open tempfile and write it there */
1595  fd = OpenTransientFile(tmppath,
1596  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1597  S_IRUSR | S_IWUSR);
1598  if (fd < 0)
1599  ereport(ERROR,
1600  (errmsg("could not open file \"%s\": %m", tmppath))); /* report the file we tried to open */
1601 
1603  if ((write(fd, ondisk, needed_length)) != needed_length)
1604  {
1605  CloseTransientFile(fd);
1606  ereport(ERROR,
1608  errmsg("could not write to file \"%s\": %m", tmppath)));
1609  }
1611 
1612  /*
1613  * fsync the file before renaming so that even if we crash after this we
1614  * have either a fully valid file or nothing.
1615  *
1616  * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1617  * some noticeable overhead since it's performed synchronously during
1618  * decoding?
1619  */
1621  if (pg_fsync(fd) != 0)
1622  {
1623  CloseTransientFile(fd);
1624  ereport(ERROR,
1626  errmsg("could not fsync file \"%s\": %m", tmppath)));
1627  }
1629  CloseTransientFile(fd);
1630 
1631  fsync_fname("pg_logical/snapshots", true);
1632 
1633  /*
1634  * We may overwrite the work from some other backend, but that's ok, our
1635  * snapshot is valid as well, we'll just have done some superfluous work.
1636  */
1637  if (rename(tmppath, path) != 0)
1638  {
1639  ereport(ERROR,
1641  errmsg("could not rename file \"%s\" to \"%s\": %m",
1642  tmppath, path)));
1643  }
1644 
1645  /* make sure we persist */
1646  fsync_fname(path, false);
1647  fsync_fname("pg_logical/snapshots", true);
1648 
1649  /*
1650  * Now there's no way we can lose the dumped state anymore, remember this
1651  * as a serialization point.
1652  */
1653  builder->last_serialized_snapshot = lsn;
1654 
1655 out:
1657  builder->last_serialized_snapshot);
1658 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:38
#define SNAPBUILD_VERSION
Definition: snapbuild.c:1446
uint32 TransactionId
Definition: c.h:397
#define SNAPBUILD_MAGIC
Definition: snapbuild.c:1445
#define write(a, b, c)
Definition: win32.h:19
pg_crc32c checksum
Definition: snapbuild.c:1425
Snapshot snapshot
Definition: snapbuild.c:171
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:567
size_t xcnt_space
Definition: snapbuild.c:201
size_t xcnt
Definition: snapbuild.c:200
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
SnapBuildState state
Definition: snapbuild.c:145
#define ERROR
Definition: elog.h:43
struct stat stat_buf
Definition: pg_standby.c:101
struct SnapBuild::@22 running
#define MAXPGPATH
SnapBuild builder
Definition: snapbuild.c:1435
ReorderBuffer * reorder
Definition: snapbuild.c:181
TransactionId * xip
Definition: snapbuild.c:202
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:268
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1231
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
XLogRecPtr last_serialized_snapshot
Definition: snapbuild.c:176
int CloseTransientFile(int fd)
Definition: fd.c:2305
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:742
#define SnapBuildOnDiskConstantSize
Definition: snapbuild.c:1440
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define SnapBuildOnDiskNotChecksummedSize
Definition: snapbuild.c:1442
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1207
MemoryContext context
Definition: snapbuild.c:148
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct SnapBuild::@23 committed
int pg_fsync(int fd)
Definition: fd.c:333
struct SnapBuildOnDisk SnapBuildOnDisk
#define elog
Definition: elog.h:219
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:73
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:78
void SnapBuildSnapDecRefcount ( Snapshot  snap)

Definition at line 398 of file snapbuild.c.

References SnapshotData::active_count, Assert, SnapshotData::copied, SnapshotData::curcid, elog, ERROR, FirstCommandId, HeapTupleSatisfiesHistoricMVCC(), SnapshotData::regd_count, SnapshotData::satisfies, SnapBuildFreeSnapshot(), SnapshotData::suboverflowed, and SnapshotData::takenDuringRecovery.

Referenced by FreeSnapshotBuilder(), ReorderBufferCleanupTXN(), ReorderBufferFreeSnap(), SnapBuildCommitTxn(), and SnapBuildRestore().

399 {
400  /* make sure we don't get passed an external snapshot */
402 
403  /* make sure nobody modified our snapshot */
404  Assert(snap->curcid == FirstCommandId);
405  Assert(!snap->suboverflowed);
406  Assert(!snap->takenDuringRecovery);
407 
408  Assert(snap->regd_count == 0); /* regd_count is expected to be unused for these snapshots */
409 
410  Assert(snap->active_count > 0); /* there must be a reference left to drop */
411 
412  /* slightly more likely, so it's checked even without casserts */
413  if (snap->copied)
414  elog(ERROR, "cannot free a copied snapshot");
415 
416  snap->active_count--; /* drop one reference */
417  if (snap->active_count == 0) /* that was the last reference: release the snapshot */
418  SnapBuildFreeSnapshot(snap);
419 }
SnapshotSatisfiesFunc satisfies
Definition: snapshot.h:54
bool copied
Definition: snapshot.h:94
bool suboverflowed
Definition: snapshot.h:91
uint32 regd_count
Definition: snapshot.h:108
#define FirstCommandId
Definition: c.h:413
#define ERROR
Definition: elog.h:43
static void SnapBuildFreeSnapshot(Snapshot snap)
Definition: snapbuild.c:340
bool HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, Buffer buffer)
Definition: tqual.c:1652
CommandId curcid
Definition: snapshot.h:96
#define Assert(condition)
Definition: c.h:675
bool takenDuringRecovery
Definition: snapshot.h:93
#define elog
Definition: elog.h:219
uint32 active_count
Definition: snapshot.h:107
static void SnapBuildSnapIncRefcount ( Snapshot  snap)
static
static bool SnapBuildTxnIsRunning ( SnapBuild builder,
TransactionId  xid 
)
static

Definition at line 762 of file snapbuild.c.

References Assert, NormalTransactionIdFollows, NormalTransactionIdPrecedes, NULL, SnapBuild::running, SNAPBUILD_CONSISTENT, SnapBuild::state, TransactionIdIsNormal, SnapBuild::xcnt, SnapBuild::xcnt_space, xidComparator(), SnapBuild::xip, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildEndTxn(), and SnapBuildProcessChange().

763 {
764  Assert(builder->state < SNAPBUILD_CONSISTENT); /* only called before the builder is consistent */
767 
768  if (builder->running.xcnt && /* nothing to look up when no xacts remain */
769  NormalTransactionIdFollows(xid, builder->running.xmin) &&
771  {
772  TransactionId *search =
773  bsearch(&xid, builder->running.xip, builder->running.xcnt_space, /* searches all xcnt_space slots, not just xcnt */
774  sizeof(TransactionId), xidComparator); /* NOTE(review): presumably xip is kept sorted per xidComparator - confirm */
775 
776  if (search != NULL)
777  {
778  Assert(*search == xid);
779  return true;
780  }
781  }
782 
783  return false; /* xid is not in the tracked array */
784 }
uint32 TransactionId
Definition: c.h:397
size_t xcnt_space
Definition: snapbuild.c:201
size_t xcnt
Definition: snapbuild.c:200
SnapBuildState state
Definition: snapbuild.c:145
struct SnapBuild::@22 running
TransactionId * xip
Definition: snapbuild.c:202
TransactionId xmax
Definition: snapbuild.c:154
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define NormalTransactionIdFollows(id1, id2)
Definition: transam.h:67
#define NormalTransactionIdPrecedes(id1, id2)
Definition: transam.h:62
TransactionId xmin
Definition: snapbuild.c:151
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
bool SnapBuildXactNeedsSkip ( SnapBuild builder,
XLogRecPtr  ptr 
)

Definition at line 374 of file snapbuild.c.

References SnapBuild::start_decoding_at.

Referenced by DecodeCommit(), and DecodeLogicalMsgOp().

375 {
376  return builder->start_decoding_at > ptr; /* true when ptr lies before the decoding start point */
377 }
XLogRecPtr start_decoding_at
Definition: snapbuild.c:160

Variable Documentation

bool ExportInProgress = false
static

Definition at line 247 of file snapbuild.c.

Referenced by SnapBuildClearExportedSnapshot(), and SnapBuildExportSnapshot().

ResourceOwner SavedResourceOwnerDuringExport = NULL
static

Definition at line 246 of file snapbuild.c.

Referenced by SnapBuildClearExportedSnapshot().