PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
snapbuild.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "miscadmin.h"
#include "access/heapam_xlog.h"
#include "access/transam.h"
#include "access/xact.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapshot.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
#include "storage/block.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/standby.h"
Include dependency graph for snapbuild.c:

Go to the source code of this file.

Data Structures

struct  SnapBuild
 
struct  SnapBuildOnDisk
 

Macros

#define SnapBuildOnDiskConstantSize   offsetof(SnapBuildOnDisk, builder)
 
#define SnapBuildOnDiskNotChecksummedSize   offsetof(SnapBuildOnDisk, version)
 
#define SNAPBUILD_MAGIC   0x51A1E001
 
#define SNAPBUILD_VERSION   2
 

Typedefs

typedef struct SnapBuildOnDisk SnapBuildOnDisk
 

Functions

static void SnapBuildEndTxn (SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 
static bool SnapBuildTxnIsRunning (SnapBuild *builder, TransactionId xid)
 
static void SnapBuildPurgeCommittedTxn (SnapBuild *builder)
 
static Snapshot SnapBuildBuildSnapshot (SnapBuild *builder, TransactionId xid)
 
static void SnapBuildFreeSnapshot (Snapshot snap)
 
static void SnapBuildSnapIncRefcount (Snapshot snap)
 
static void SnapBuildDistributeNewCatalogSnapshot (SnapBuild *builder, XLogRecPtr lsn)
 
static bool SnapBuildFindSnapshot (SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
 
static void SnapBuildSerialize (SnapBuild *builder, XLogRecPtr lsn)
 
static bool SnapBuildRestore (SnapBuild *builder, XLogRecPtr lsn)
 
SnapBuildAllocateSnapshotBuilder (ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn)
 
void FreeSnapshotBuilder (SnapBuild *builder)
 
SnapBuildState SnapBuildCurrentState (SnapBuild *builder)
 
bool SnapBuildXactNeedsSkip (SnapBuild *builder, XLogRecPtr ptr)
 
void SnapBuildSnapDecRefcount (Snapshot snap)
 
const char * SnapBuildExportSnapshot (SnapBuild *builder)
 
Snapshot SnapBuildGetOrBuildSnapshot (SnapBuild *builder, TransactionId xid)
 
void SnapBuildClearExportedSnapshot (void)
 
bool SnapBuildProcessChange (SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 
void SnapBuildProcessNewCid (SnapBuild *builder, TransactionId xid, XLogRecPtr lsn, xl_heap_new_cid *xlrec)
 
static void SnapBuildAddCommittedTxn (SnapBuild *builder, TransactionId xid)
 
void SnapBuildAbortTxn (SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, TransactionId *subxacts)
 
void SnapBuildCommitTxn (SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, TransactionId *subxacts)
 
void SnapBuildProcessRunningXacts (SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
 
void SnapBuildSerializationPoint (SnapBuild *builder, XLogRecPtr lsn)
 
void CheckPointSnapBuild (void)
 

Variables

static ResourceOwner SavedResourceOwnerDuringExport = NULL
 
static bool ExportInProgress = false
 

Macro Definition Documentation

#define SNAPBUILD_MAGIC   0x51A1E001

Definition at line 1427 of file snapbuild.c.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

#define SNAPBUILD_VERSION   2

Definition at line 1428 of file snapbuild.c.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

#define SnapBuildOnDiskConstantSize   offsetof(SnapBuildOnDisk, builder)

Definition at line 1422 of file snapbuild.c.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

#define SnapBuildOnDiskNotChecksummedSize   offsetof(SnapBuildOnDisk, version)

Definition at line 1424 of file snapbuild.c.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

Typedef Documentation

Function Documentation

SnapBuild* AllocateSnapshotBuilder ( ReorderBuffer reorder,
TransactionId  xmin_horizon,
XLogRecPtr  start_lsn 
)

Definition at line 281 of file snapbuild.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), SnapBuild::committed, SnapBuild::context, CurrentMemoryContext, SnapBuild::includes_all_transactions, SnapBuild::initial_xmin_horizon, MemoryContextSwitchTo(), palloc0(), SnapBuild::reorder, SNAPBUILD_START, SnapBuild::start_decoding_at, SnapBuild::state, SnapBuild::xcnt, SnapBuild::xcnt_space, and SnapBuild::xip.

Referenced by StartupDecodingContext().

284 {
285  MemoryContext context;
286  MemoryContext oldcontext;
287  SnapBuild *builder;
288 
289  /* allocate memory in own context, to have better accountability */
291  "snapshot builder context",
293  oldcontext = MemoryContextSwitchTo(context);
294 
295  builder = palloc0(sizeof(SnapBuild));
296 
297  builder->state = SNAPBUILD_START;
298  builder->context = context;
299  builder->reorder = reorder;
300  /* Other struct members initialized by zeroing via palloc0 above */
301 
302  builder->committed.xcnt = 0;
303  builder->committed.xcnt_space = 128; /* arbitrary number */
304  builder->committed.xip =
305  palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
306  builder->committed.includes_all_transactions = true;
307 
308  builder->initial_xmin_horizon = xmin_horizon;
309  builder->start_decoding_at = start_lsn;
310 
311  MemoryContextSwitchTo(oldcontext);
312 
313  return builder;
314 }
uint32 TransactionId
Definition: c.h:394
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
size_t xcnt_space
Definition: snapbuild.c:200
size_t xcnt
Definition: snapbuild.c:199
SnapBuildState state
Definition: snapbuild.c:144
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:145
ReorderBuffer * reorder
Definition: snapbuild.c:180
TransactionId initial_xmin_horizon
Definition: snapbuild.c:165
TransactionId * xip
Definition: snapbuild.c:201
bool includes_all_transactions
Definition: snapbuild.c:221
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
void * palloc0(Size size)
Definition: mcxt.c:920
MemoryContext context
Definition: snapbuild.c:147
struct SnapBuild::@23 committed
XLogRecPtr start_decoding_at
Definition: snapbuild.c:159
void CheckPointSnapBuild ( void  )

Definition at line 1835 of file snapbuild.c.

References AllocateDir(), dirent::d_name, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), FreeDir(), GetRedoRecPtr(), InvalidXLogRecPtr, LOG, lstat, MAXPGPATH, NULL, ReadDir(), ReplicationSlotsComputeLogicalRestartLSN(), snprintf(), and unlink().

Referenced by CheckPointGuts().

1836 {
1837  XLogRecPtr cutoff;
1838  XLogRecPtr redo;
1839  DIR *snap_dir;
1840  struct dirent *snap_de;
1841  char path[MAXPGPATH];
1842 
1843  /*
1844  * We start of with a minimum of the last redo pointer. No new replication
1845  * slot will start before that, so that's a safe upper bound for removal.
1846  */
1847  redo = GetRedoRecPtr();
1848 
1849  /* now check for the restart ptrs from existing slots */
1851 
1852  /* don't start earlier than the restart lsn */
1853  if (redo < cutoff)
1854  cutoff = redo;
1855 
1856  snap_dir = AllocateDir("pg_logical/snapshots");
1857  while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
1858  {
1859  uint32 hi;
1860  uint32 lo;
1861  XLogRecPtr lsn;
1862  struct stat statbuf;
1863 
1864  if (strcmp(snap_de->d_name, ".") == 0 ||
1865  strcmp(snap_de->d_name, "..") == 0)
1866  continue;
1867 
1868  snprintf(path, MAXPGPATH, "pg_logical/snapshots/%s", snap_de->d_name);
1869 
1870  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1871  {
1872  elog(DEBUG1, "only regular files expected: %s", path);
1873  continue;
1874  }
1875 
1876  /*
1877  * temporary filenames from SnapBuildSerialize() include the LSN and
1878  * everything but are postfixed by .$pid.tmp. We can just remove them
1879  * the same as other files because there can be none that are
1880  * currently being written that are older than cutoff.
1881  *
1882  * We just log a message if a file doesn't fit the pattern, it's
1883  * probably some editors lock/state file or similar...
1884  */
1885  if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
1886  {
1887  ereport(LOG,
1888  (errmsg("could not parse file name \"%s\"", path)));
1889  continue;
1890  }
1891 
1892  lsn = ((uint64) hi) << 32 | lo;
1893 
1894  /* check whether we still need it */
1895  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1896  {
1897  elog(DEBUG1, "removing snapbuild snapshot %s", path);
1898 
1899  /*
1900  * It's not particularly harmful, though strange, if we can't
1901  * remove the file here. Don't prevent the checkpoint from
1902  * completing, that'd be cure worse than the disease.
1903  */
1904  if (unlink(path) < 0)
1905  {
1906  ereport(LOG,
1908  errmsg("could not remove file \"%s\": %m",
1909  path)));
1910  continue;
1911  }
1912  }
1913  }
1914  FreeDir(snap_dir);
1915 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define LOG
Definition: elog.h:26
Definition: dirent.h:9
Definition: dirent.c:25
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:598
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:707
unsigned int uint32
Definition: c.h:265
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2284
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2350
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8095
int errmsg(const char *fmt,...)
Definition: elog.c:797
char d_name[MAX_PATH]
Definition: dirent.h:14
#define elog
Definition: elog.h:219
#define lstat(path, sb)
Definition: win32.h:272
int FreeDir(DIR *dir)
Definition: fd.c:2393
void FreeSnapshotBuilder ( SnapBuild builder)

Definition at line 320 of file snapbuild.c.

References SnapBuild::context, MemoryContextDelete(), NULL, SnapBuildSnapDecRefcount(), and SnapBuild::snapshot.

Referenced by FreeDecodingContext().

321 {
322  MemoryContext context = builder->context;
323 
324  /* free snapshot explicitly, that contains some error checking */
325  if (builder->snapshot != NULL)
326  {
328  builder->snapshot = NULL;
329  }
330 
331  /* other resources are deallocated via memory context reset */
332  MemoryContextDelete(context);
333 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
Snapshot snapshot
Definition: snapbuild.c:170
#define NULL
Definition: c.h:226
MemoryContext context
Definition: snapbuild.c:147
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:397
void SnapBuildAbortTxn ( SnapBuild builder,
XLogRecPtr  lsn,
TransactionId  xid,
int  nsubxacts,
TransactionId subxacts 
)

Definition at line 930 of file snapbuild.c.

References i, and SnapBuildEndTxn().

Referenced by DecodeAbort().

933 {
934  int i;
935 
936  for (i = 0; i < nsubxacts; i++)
937  {
938  TransactionId subxid = subxacts[i];
939 
940  SnapBuildEndTxn(builder, lsn, subxid);
941  }
942 
943  SnapBuildEndTxn(builder, lsn, xid);
944 }
uint32 TransactionId
Definition: c.h:394
static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
Definition: snapbuild.c:894
int i
static void SnapBuildAddCommittedTxn ( SnapBuild builder,
TransactionId  xid 
)
static

Definition at line 823 of file snapbuild.c.

References Assert, SnapBuild::committed, DEBUG1, elog, repalloc(), TransactionIdIsValid, SnapBuild::xcnt, SnapBuild::xcnt_space, and SnapBuild::xip.

Referenced by SnapBuildCommitTxn().

824 {
826 
827  if (builder->committed.xcnt == builder->committed.xcnt_space)
828  {
829  builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
830 
831  elog(DEBUG1, "increasing space for committed transactions to %u",
832  (uint32) builder->committed.xcnt_space);
833 
834  builder->committed.xip = repalloc(builder->committed.xip,
835  builder->committed.xcnt_space * sizeof(TransactionId));
836  }
837 
838  /*
839  * TODO: It might make sense to keep the array sorted here instead of
840  * doing it every time we build a new snapshot. On the other hand this
841  * gets called repeatedly when a transaction with subtransactions commits.
842  */
843  builder->committed.xip[builder->committed.xcnt++] = xid;
844 }
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:394
size_t xcnt_space
Definition: snapbuild.c:200
size_t xcnt
Definition: snapbuild.c:199
TransactionId * xip
Definition: snapbuild.c:201
unsigned int uint32
Definition: c.h:265
#define Assert(condition)
Definition: c.h:671
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1021
struct SnapBuild::@23 committed
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static Snapshot SnapBuildBuildSnapshot ( SnapBuild builder,
TransactionId  xid 
)
static

Definition at line 429 of file snapbuild.c.

References SnapshotData::active_count, Assert, SnapBuild::committed, SnapBuild::context, SnapshotData::copied, SnapshotData::curcid, FirstCommandId, HeapTupleSatisfiesHistoricMVCC(), MemoryContextAllocZero(), NULL, qsort, SnapshotData::regd_count, SnapshotData::satisfies, SNAPBUILD_FULL_SNAPSHOT, SnapBuild::state, SnapshotData::suboverflowed, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::takenDuringRecovery, TransactionIdIsNormal, SnapshotData::xcnt, SnapBuild::xcnt, xidComparator(), SnapshotData::xip, SnapBuild::xip, SnapshotData::xmax, SnapBuild::xmax, SnapshotData::xmin, and SnapBuild::xmin.

Referenced by SnapBuildCommitTxn(), SnapBuildExportSnapshot(), SnapBuildGetOrBuildSnapshot(), SnapBuildProcessChange(), and SnapBuildRestore().

430 {
431  Snapshot snapshot;
432  Size ssize;
433 
434  Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
435 
436  ssize = sizeof(SnapshotData)
437  + sizeof(TransactionId) * builder->committed.xcnt
438  + sizeof(TransactionId) * 1 /* toplevel xid */ ;
439 
440  snapshot = MemoryContextAllocZero(builder->context, ssize);
441 
443 
444  /*
445  * We misuse the original meaning of SnapshotData's xip and subxip fields
446  * to make the more fitting for our needs.
447  *
448  * In the 'xip' array we store transactions that have to be treated as
449  * committed. Since we will only ever look at tuples from transactions
450  * that have modified the catalog it's more efficient to store those few
451  * that exist between xmin and xmax (frequently there are none).
452  *
453  * Snapshots that are used in transactions that have modified the catalog
454  * also use the 'subxip' array to store their toplevel xid and all the
455  * subtransaction xids so we can recognize when we need to treat rows as
456  * visible that are not in xip but still need to be visible. Subxip only
457  * gets filled when the transaction is copied into the context of a
458  * catalog modifying transaction since we otherwise share a snapshot
459  * between transactions. As long as a txn hasn't modified the catalog it
460  * doesn't need to treat any uncommitted rows as visible, so there is no
461  * need for those xids.
462  *
463  * Both arrays are qsort'ed so that we can use bsearch() on them.
464  */
465  Assert(TransactionIdIsNormal(builder->xmin));
466  Assert(TransactionIdIsNormal(builder->xmax));
467 
468  snapshot->xmin = builder->xmin;
469  snapshot->xmax = builder->xmax;
470 
471  /* store all transactions to be treated as committed by this snapshot */
472  snapshot->xip =
473  (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
474  snapshot->xcnt = builder->committed.xcnt;
475  memcpy(snapshot->xip,
476  builder->committed.xip,
477  builder->committed.xcnt * sizeof(TransactionId));
478 
479  /* sort so we can bsearch() */
480  qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
481 
482  /*
483  * Initially, subxip is empty, i.e. it's a snapshot to be used by
484  * transactions that don't modify the catalog. Will be filled by
485  * ReorderBufferCopySnap() if necessary.
486  */
487  snapshot->subxcnt = 0;
488  snapshot->subxip = NULL;
489 
490  snapshot->suboverflowed = false;
491  snapshot->takenDuringRecovery = false;
492  snapshot->copied = false;
493  snapshot->curcid = FirstCommandId;
494  snapshot->active_count = 0;
495  snapshot->regd_count = 0;
496 
497  return snapshot;
498 }
SnapshotSatisfiesFunc satisfies
Definition: snapshot.h:54
uint32 TransactionId
Definition: c.h:394
bool copied
Definition: snapshot.h:94
bool suboverflowed
Definition: snapshot.h:91
size_t xcnt
Definition: snapbuild.c:199
uint32 regd_count
Definition: snapshot.h:108
#define FirstCommandId
Definition: c.h:410
SnapBuildState state
Definition: snapbuild.c:144
TransactionId * xip
Definition: snapbuild.c:201
struct SnapshotData SnapshotData
TransactionId xmax
Definition: snapshot.h:67
TransactionId xmin
Definition: snapshot.h:66
TransactionId * xip
Definition: snapshot.h:77
TransactionId xmax
Definition: snapbuild.c:153
bool HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, Buffer buffer)
Definition: tqual.c:1652
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:784
CommandId curcid
Definition: snapshot.h:96
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
bool takenDuringRecovery
Definition: snapshot.h:93
size_t Size
Definition: c.h:353
MemoryContext context
Definition: snapbuild.c:147
uint32 xcnt
Definition: snapshot.h:78
TransactionId xmin
Definition: snapbuild.c:150
struct SnapBuild::@23 committed
#define qsort(a, b, c, d)
Definition: port.h:440
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
TransactionId * subxip
Definition: snapshot.h:89
uint32 active_count
Definition: snapshot.h:107
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:90
void SnapBuildClearExportedSnapshot ( void  )

Definition at line 630 of file snapbuild.c.

References AbortCurrentTransaction(), CurrentResourceOwner, elog, ERROR, ExportInProgress, IsTransactionState(), NULL, and SavedResourceOwnerDuringExport.

Referenced by exec_replication_command().

631 {
632  /* nothing exported, that is the usual case */
633  if (!ExportInProgress)
634  return;
635 
636  if (!IsTransactionState())
637  elog(ERROR, "clearing exported snapshot in wrong transaction state");
638 
639  /* make sure nothing could have ever happened */
641 
644  ExportInProgress = false;
645 }
void AbortCurrentTransaction(void)
Definition: xact.c:2984
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
static ResourceOwner SavedResourceOwnerDuringExport
Definition: snapbuild.c:245
#define ERROR
Definition: elog.h:43
static bool ExportInProgress
Definition: snapbuild.c:246
#define NULL
Definition: c.h:226
bool IsTransactionState(void)
Definition: xact.c:349
#define elog
Definition: elog.h:219
void SnapBuildCommitTxn ( SnapBuild builder,
XLogRecPtr  lsn,
TransactionId  xid,
int  nsubxacts,
TransactionId subxacts 
)

Definition at line 950 of file snapbuild.c.

References SnapBuild::committed, DEBUG1, DEBUG2, elog, SnapBuild::includes_all_transactions, NormalTransactionIdFollows, SnapBuild::reorder, ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), SNAPBUILD_CONSISTENT, SNAPBUILD_FULL_SNAPSHOT, SnapBuildAddCommittedTxn(), SnapBuildBuildSnapshot(), SnapBuildDistributeNewCatalogSnapshot(), SnapBuildEndTxn(), SnapBuildSnapDecRefcount(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, SnapBuild::start_decoding_at, SnapBuild::state, TransactionIdAdvance, TransactionIdFollowsOrEquals(), TransactionIdIsValid, and SnapBuild::xmax.

Referenced by DecodeCommit().

952 {
953  int nxact;
954 
955  bool forced_timetravel = false;
956  bool sub_needs_timetravel = false;
957  bool top_needs_timetravel = false;
958 
959  TransactionId xmax = xid;
960 
961  /*
962  * If we couldn't observe every change of a transaction because it was
963  * already running at the point we started to observe we have to assume it
964  * made catalog changes.
965  *
966  * This has the positive benefit that we afterwards have enough
967  * information to build an exportable snapshot that's usable by pg_dump et
968  * al.
969  */
970  if (builder->state < SNAPBUILD_CONSISTENT)
971  {
972  /* ensure that only commits after this are getting replayed */
973  if (builder->start_decoding_at <= lsn)
974  builder->start_decoding_at = lsn + 1;
975 
976  /*
977  * We could avoid treating !SnapBuildTxnIsRunning transactions as
978  * timetravel ones, but we want to be able to export a snapshot when
979  * we reached consistency.
980  */
981  forced_timetravel = true;
982  elog(DEBUG1, "forced to assume catalog changes for xid %u because it was running too early", xid);
983  }
984 
985  for (nxact = 0; nxact < nsubxacts; nxact++)
986  {
987  TransactionId subxid = subxacts[nxact];
988 
989  /*
990  * make sure txn is not tracked in running txn's anymore, switch state
991  */
992  SnapBuildEndTxn(builder, lsn, subxid);
993 
994  /*
995  * If we're forcing timetravel we also need visibility information
996  * about subtransaction, so keep track of subtransaction's state.
997  */
998  if (forced_timetravel)
999  {
1000  SnapBuildAddCommittedTxn(builder, subxid);
1001  if (NormalTransactionIdFollows(subxid, xmax))
1002  xmax = subxid;
1003  }
1004 
1005  /*
1006  * Add subtransaction to base snapshot if it DDL, we don't distinguish
1007  * to toplevel transactions there.
1008  */
1009  else if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
1010  {
1011  sub_needs_timetravel = true;
1012 
1013  elog(DEBUG1, "found subtransaction %u:%u with catalog changes.",
1014  xid, subxid);
1015 
1016  SnapBuildAddCommittedTxn(builder, subxid);
1017 
1018  if (NormalTransactionIdFollows(subxid, xmax))
1019  xmax = subxid;
1020  }
1021  }
1022 
1023  /*
1024  * Make sure toplevel txn is not tracked in running txn's anymore, switch
1025  * state to consistent if possible.
1026  */
1027  SnapBuildEndTxn(builder, lsn, xid);
1028 
1029  if (forced_timetravel)
1030  {
1031  elog(DEBUG2, "forced transaction %u to do timetravel.", xid);
1032 
1033  SnapBuildAddCommittedTxn(builder, xid);
1034  }
1035  /* add toplevel transaction to base snapshot */
1036  else if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1037  {
1038  elog(DEBUG2, "found top level transaction %u, with catalog changes!",
1039  xid);
1040 
1041  top_needs_timetravel = true;
1042  SnapBuildAddCommittedTxn(builder, xid);
1043  }
1044  else if (sub_needs_timetravel)
1045  {
1046  /* mark toplevel txn as timetravel as well */
1047  SnapBuildAddCommittedTxn(builder, xid);
1048  }
1049 
1050  /* if there's any reason to build a historic snapshot, do so now */
1051  if (forced_timetravel || top_needs_timetravel || sub_needs_timetravel)
1052  {
1053  /*
1054  * Adjust xmax of the snapshot builder, we only do that for committed,
1055  * catalog modifying, transactions, everything else isn't interesting
1056  * for us since we'll never look at the respective rows.
1057  */
1058  if (!TransactionIdIsValid(builder->xmax) ||
1059  TransactionIdFollowsOrEquals(xmax, builder->xmax))
1060  {
1061  builder->xmax = xmax;
1062  TransactionIdAdvance(builder->xmax);
1063  }
1064 
1065  /*
1066  * If we haven't built a complete snapshot yet there's no need to hand
1067  * it out, it wouldn't (and couldn't) be used anyway.
1068  */
1069  if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1070  return;
1071 
1072  /*
1073  * Decrease the snapshot builder's refcount of the old snapshot, note
1074  * that it still will be used if it has been handed out to the
1075  * reorderbuffer earlier.
1076  */
1077  if (builder->snapshot)
1079 
1080  builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
1081 
1082  /* we might need to execute invalidations, add snapshot */
1083  if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1084  {
1086  ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1087  builder->snapshot);
1088  }
1089 
1090  /* refcount of the snapshot builder for the new snapshot */
1092 
1093  /* add a new Snapshot to all currently running transactions */
1095  }
1096  else
1097  {
1098  /* record that we cannot export a general snapshot anymore */
1099  builder->committed.includes_all_transactions = false;
1100  }
1101 }
#define TransactionIdAdvance(dest)
Definition: transam.h:48
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
#define DEBUG1
Definition: elog.h:25
static void SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:823
uint32 TransactionId
Definition: c.h:394
Snapshot snapshot
Definition: snapbuild.c:170
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:349
static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
Definition: snapbuild.c:894
SnapBuildState state
Definition: snapbuild.c:144
static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:777
#define DEBUG2
Definition: elog.h:24
ReorderBuffer * reorder
Definition: snapbuild.c:180
bool includes_all_transactions
Definition: snapbuild.c:221
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:429
TransactionId xmax
Definition: snapbuild.c:153
#define NormalTransactionIdFollows(id1, id2)
Definition: transam.h:67
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:397
struct SnapBuild::@23 committed
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition: snapbuild.c:385
XLogRecPtr start_decoding_at
Definition: snapbuild.c:159
SnapBuildState SnapBuildCurrentState ( SnapBuild builder)

Definition at line 364 of file snapbuild.c.

References SnapBuild::state.

Referenced by DecodeHeap2Op(), DecodeHeapOp(), DecodeLogicalMsgOp(), DecodeXactOp(), and DecodingContextReady().

365 {
366  return builder->state;
367 }
SnapBuildState state
Definition: snapbuild.c:144
static void SnapBuildDistributeNewCatalogSnapshot ( SnapBuild builder,
XLogRecPtr  lsn 
)
static

Definition at line 777 of file snapbuild.c.

References Assert, dlist_iter::cur, DEBUG2, dlist_container, dlist_foreach, elog, SnapBuild::reorder, ReorderBufferAddSnapshot(), ReorderBufferXidHasBaseSnapshot(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, and ReorderBufferTXN::xid.

Referenced by SnapBuildCommitTxn().

778 {
779  dlist_iter txn_i;
780  ReorderBufferTXN *txn;
781 
782  /*
783  * Iterate through all toplevel transactions. This can include
784  * subtransactions which we just don't yet know to be that, but that's
785  * fine, they will just get an unnecessary snapshot queued.
786  */
787  dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
788  {
789  txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
790 
792 
793  /*
794  * If we don't have a base snapshot yet, there are no changes in this
795  * transaction which in turn implies we don't yet need a snapshot at
796  * all. We'll add a snapshot when the first change gets queued.
797  *
798  * NB: This works correctly even for subtransactions because
799  * ReorderBufferCommitChild() takes care to pass the parent the base
800  * snapshot, and while iterating the changequeue we'll get the change
801  * from the subtxn.
802  */
803  if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
804  continue;
805 
806  elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
807  txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
808 
809  /*
810  * increase the snapshot's refcount for the transaction we are handing
811  * it out to
812  */
814  ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
815  builder->snapshot);
816  }
817 }
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
Snapshot snapshot
Definition: snapbuild.c:170
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define DEBUG2
Definition: elog.h:24
ReorderBuffer * reorder
Definition: snapbuild.c:180
unsigned int uint32
Definition: c.h:265
dlist_head toplevel_by_lsn
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
#define Assert(condition)
Definition: c.h:671
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition: snapbuild.c:385
static void SnapBuildEndTxn ( SnapBuild builder,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 894 of file snapbuild.c.

References Assert, ereport, errdetail(), errmsg(), LOG, SnapBuild::running, SNAPBUILD_CONSISTENT, SnapBuildTxnIsRunning(), SnapBuild::state, and SnapBuild::xcnt.

Referenced by SnapBuildAbortTxn(), and SnapBuildCommitTxn().

895 {
896  if (builder->state == SNAPBUILD_CONSISTENT)
897  return;
898 
899  /*
900  * NB: This handles subtransactions correctly even if we started from
901  * suboverflowed xl_running_xacts because we only keep track of toplevel
902  * transactions. Since the latter are always allocated before their
903  * subxids and since they end at the same time it's sufficient to deal
904  * with them here.
905  */
906  if (SnapBuildTxnIsRunning(builder, xid))
907  {
908  Assert(builder->running.xcnt > 0);
909 
910  if (!--builder->running.xcnt)
911  {
912  /*
913  * None of the originally running transaction is running anymore,
914  * so our incrementally built snapshot now is consistent.
915  */
916  ereport(LOG,
917  (errmsg("logical decoding found consistent point at %X/%X",
918  (uint32) (lsn >> 32), (uint32) lsn),
919  errdetail("Transaction ID %u finished; no more running transactions.",
920  xid)));
921  builder->state = SNAPBUILD_CONSISTENT;
922  }
923  }
924 }
size_t xcnt
Definition: snapbuild.c:199
#define LOG
Definition: elog.h:26
SnapBuildState state
Definition: snapbuild.c:144
struct SnapBuild::@22 running
int errdetail(const char *fmt,...)
Definition: elog.c:873
unsigned int uint32
Definition: c.h:265
#define ereport(elevel, rest)
Definition: elog.h:122
static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:744
#define Assert(condition)
Definition: c.h:671
int errmsg(const char *fmt,...)
Definition: elog.c:797
const char* SnapBuildExportSnapshot ( SnapBuild builder)

Definition at line 512 of file snapbuild.c.

References Assert, SnapBuild::committed, CurrentResourceOwner, elog, ereport, errmsg_plural(), ERROR, ExportInProgress, ExportSnapshot(), FirstSnapshotSet, GetMaxSnapshotXidCount(), GetTopTransactionId(), SnapBuild::includes_all_transactions, IsTransactionOrTransactionBlock(), LOG, MyPgXact, NormalTransactionIdPrecedes, NULL, palloc(), SNAPBUILD_CONSISTENT, SnapBuildBuildSnapshot(), StartTransactionCommand(), SnapBuild::state, test(), TransactionIdAdvance, TransactionIdIsValid, XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, SnapshotData::xcnt, xidComparator(), SnapshotData::xip, SnapshotData::xmax, SnapshotData::xmin, and PGXACT::xmin.

Referenced by CreateReplicationSlot().

513 {
514  Snapshot snap;
515  char *snapname;
516  TransactionId xid;
517  TransactionId *newxip;
518  int newxcnt = 0;
519 
520  if (builder->state != SNAPBUILD_CONSISTENT)
521  elog(ERROR, "cannot export a snapshot before reaching a consistent state");
522 
523  if (!builder->committed.includes_all_transactions)
524  elog(ERROR, "cannot export a snapshot, not all transactions are monitored anymore");
525 
526  /* so we don't overwrite the existing value */
528  elog(ERROR, "cannot export a snapshot when MyPgXact->xmin already is valid");
529 
531  elog(ERROR, "cannot export a snapshot from within a transaction");
532 
534  elog(ERROR, "can only export one snapshot at a time");
535 
537  ExportInProgress = true;
538 
540 
542 
543  /* There doesn't seem to a nice API to set these */
545  XactReadOnly = true;
546 
547  snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
548 
549  /*
550  * We know that snap->xmin is alive, enforced by the logical xmin
551  * mechanism. Due to that we can do this without locks, we're only
552  * changing our own value.
553  */
554  MyPgXact->xmin = snap->xmin;
555 
556  /* allocate in transaction context */
557  newxip = (TransactionId *)
559 
560  /*
561  * snapbuild.c builds transactions in an "inverted" manner, which means it
562  * stores committed transactions in ->xip, not ones in progress. Build a
563  * classical snapshot by marking all non-committed transactions as
564  * in-progress. This can be expensive.
565  */
566  for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
567  {
568  void *test;
569 
570  /*
571  * Check whether transaction committed using the decoding snapshot
572  * meaning of ->xip.
573  */
574  test = bsearch(&xid, snap->xip, snap->xcnt,
575  sizeof(TransactionId), xidComparator);
576 
577  if (test == NULL)
578  {
579  if (newxcnt >= GetMaxSnapshotXidCount())
580  elog(ERROR, "snapshot too large");
581 
582  newxip[newxcnt++] = xid;
583  }
584 
586  }
587 
588  snap->xcnt = newxcnt;
589  snap->xip = newxip;
590 
591  /*
592  * now that we've built a plain snapshot, use the normal mechanisms for
593  * exporting it
594  */
595  snapname = ExportSnapshot(snap);
596 
597  ereport(LOG,
598  (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
599  "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
600  snap->xcnt,
601  snapname, snap->xcnt)));
602  return snapname;
603 }
#define TransactionIdAdvance(dest)
Definition: transam.h:48
static void test(void)
uint32 TransactionId
Definition: c.h:394
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:850
TransactionId xmin
Definition: proc.h:203
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
#define XACT_REPEATABLE_READ
Definition: xact.h:30
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4320
char * ExportSnapshot(Snapshot snapshot)
Definition: snapmgr.c:1150
TransactionId GetTopTransactionId(void)
Definition: xact.c:388
#define LOG
Definition: elog.h:26
static ResourceOwner SavedResourceOwnerDuringExport
Definition: snapbuild.c:245
PGXACT * MyPgXact
Definition: proc.c:68
SnapBuildState state
Definition: snapbuild.c:144
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:203
static bool ExportInProgress
Definition: snapbuild.c:246
bool includes_all_transactions
Definition: snapbuild.c:221
TransactionId xmax
Definition: snapshot.h:67
TransactionId xmin
Definition: snapshot.h:66
#define ereport(elevel, rest)
Definition: elog.h:122
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:429
TransactionId * xip
Definition: snapshot.h:77
int GetMaxSnapshotXidCount(void)
Definition: procarray.c:1451
bool XactReadOnly
Definition: xact.c:77
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
void StartTransactionCommand(void)
Definition: xact.c:2675
int XactIsoLevel
Definition: xact.c:74
#define NormalTransactionIdPrecedes(id1, id2)
Definition: transam.h:62
uint32 xcnt
Definition: snapshot.h:78
void * palloc(Size size)
Definition: mcxt.c:891
struct SnapBuild::@23 committed
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
static bool SnapBuildFindSnapshot ( SnapBuild builder,
XLogRecPtr  lsn,
xl_running_xacts running 
)
static

Definition at line 1210 of file snapbuild.c.

References Assert, SnapBuild::context, DEBUG1, elog, ereport, errdetail(), errdetail_internal(), errdetail_plural(), errmsg(), errmsg_internal(), ERROR, SnapBuild::initial_xmin_horizon, InvalidTransactionId, InvalidXLogRecPtr, LOG, MemoryContextAlloc(), xl_running_xacts::nextXid, NormalTransactionIdPrecedes, NULL, xl_running_xacts::oldestRunningXid, qsort, SnapBuild::running, SNAPBUILD_CONSISTENT, SNAPBUILD_FULL_SNAPSHOT, SnapBuildRestore(), SnapBuild::start_decoding_at, SnapBuild::state, TransactionIdAdvance, TransactionIdIsCurrentTransactionId(), TransactionIdIsNormal, TransactionIdRetreat, XactLockTableWait(), xl_running_xacts::xcnt, SnapBuild::xcnt, SnapBuild::xcnt_space, xidComparator(), xl_running_xacts::xids, SnapBuild::xip, XLTW_None, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildProcessRunningXacts().

1211 {
1212  /* ---
1213  * Build catalog decoding snapshot incrementally using information about
1214  * the currently running transactions. There are several ways to do that:
1215  *
1216  * a) There were no running transactions when the xl_running_xacts record
1217  * was inserted, jump to CONSISTENT immediately. We might find such a
1218  * state we were waiting for b) and c).
1219  *
1220  * b) Wait for all toplevel transactions that were running to end. We
1221  * simply track the number of in-progress toplevel transactions and
1222  * lower it whenever one commits or aborts. When that number
1223  * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
1224  * to CONSISTENT.
1225  * NB: We need to search running.xip when seeing a transaction's end to
1226  * make sure it's a toplevel transaction and it's been one of the
1227  * initially running ones.
1228  * Interestingly, in contrast to HS, this allows us not to care about
1229  * subtransactions - and by extension suboverflowed xl_running_xacts -
1230  * at all.
1231  *
1232  * c) This (in a previous run) or another decoding slot serialized a
1233  * snapshot to disk that we can use.
1234  * ---
1235  */
1236 
1237  /*
1238  * xl_running_xact record is older than what we can use, we might not have
1239  * all necessary catalog rows anymore.
1240  */
1243  builder->initial_xmin_horizon))
1244  {
1245  ereport(DEBUG1,
1246  (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
1247  (uint32) (lsn >> 32), (uint32) lsn),
1248  errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1249  builder->initial_xmin_horizon, running->oldestRunningXid)));
1250  return true;
1251  }
1252 
1253  /*
1254  * a) No transaction were running, we can jump to consistent.
1255  *
1256  * NB: We might have already started to incrementally assemble a snapshot,
1257  * so we need to be careful to deal with that.
1258  */
1259  if (running->xcnt == 0)
1260  {
1261  if (builder->start_decoding_at == InvalidXLogRecPtr ||
1262  builder->start_decoding_at <= lsn)
1263  /* can decode everything after this */
1264  builder->start_decoding_at = lsn + 1;
1265 
1266  /* As no transactions were running xmin/xmax can be trivially set. */
1267  builder->xmin = running->nextXid; /* < are finished */
1268  builder->xmax = running->nextXid; /* >= are running */
1269 
1270  /* so we can safely use the faster comparisons */
1271  Assert(TransactionIdIsNormal(builder->xmin));
1272  Assert(TransactionIdIsNormal(builder->xmax));
1273 
1274  /* no transactions running now */
1275  builder->running.xcnt = 0;
1276  builder->running.xmin = InvalidTransactionId;
1277  builder->running.xmax = InvalidTransactionId;
1278 
1279  builder->state = SNAPBUILD_CONSISTENT;
1280 
1281  ereport(LOG,
1282  (errmsg("logical decoding found consistent point at %X/%X",
1283  (uint32) (lsn >> 32), (uint32) lsn),
1284  errdetail("There are no running transactions.")));
1285 
1286  return false;
1287  }
1288  /* c) valid on disk state */
1289  else if (SnapBuildRestore(builder, lsn))
1290  {
1291  /* there won't be any state to cleanup */
1292  return false;
1293  }
1294 
1295  /*
1296  * b) first encounter of a useable xl_running_xacts record. If we had
1297  * found one earlier we would either track running transactions (i.e.
1298  * builder->running.xcnt != 0) or be consistent (this function wouldn't
1299  * get called).
1300  */
1301  else if (!builder->running.xcnt)
1302  {
1303  int off;
1304 
1305  /*
1306  * We only care about toplevel xids as those are the ones we
1307  * definitely see in the wal stream. As snapbuild.c tracks committed
1308  * instead of running transactions we don't need to know anything
1309  * about uncommitted subtransactions.
1310  */
1311 
1312  /*
1313  * Start with an xmin/xmax that's correct for future, when all the
1314  * currently running transactions have finished. We'll update both
1315  * while waiting for the pending transactions to finish.
1316  */
1317  builder->xmin = running->nextXid; /* < are finished */
1318  builder->xmax = running->nextXid; /* >= are running */
1319 
1320  /* so we can safely use the faster comparisons */
1321  Assert(TransactionIdIsNormal(builder->xmin));
1322  Assert(TransactionIdIsNormal(builder->xmax));
1323 
1324  builder->running.xcnt = running->xcnt;
1325  builder->running.xcnt_space = running->xcnt;
1326  builder->running.xip =
1327  MemoryContextAlloc(builder->context,
1328  builder->running.xcnt * sizeof(TransactionId));
1329  memcpy(builder->running.xip, running->xids,
1330  builder->running.xcnt * sizeof(TransactionId));
1331 
1332  /* sort so we can do a binary search */
1333  qsort(builder->running.xip, builder->running.xcnt,
1334  sizeof(TransactionId), xidComparator);
1335 
1336  builder->running.xmin = builder->running.xip[0];
1337  builder->running.xmax = builder->running.xip[running->xcnt - 1];
1338 
1339  /* makes comparisons cheaper later */
1340  TransactionIdRetreat(builder->running.xmin);
1341  TransactionIdAdvance(builder->running.xmax);
1342 
1343  builder->state = SNAPBUILD_FULL_SNAPSHOT;
1344 
1345  ereport(LOG,
1346  (errmsg("logical decoding found initial starting point at %X/%X",
1347  (uint32) (lsn >> 32), (uint32) lsn),
1348  errdetail_plural("%u transaction needs to finish.",
1349  "%u transactions need to finish.",
1350  builder->running.xcnt,
1351  (uint32) builder->running.xcnt)));
1352 
1353  /*
1354  * Iterate through all xids, wait for them to finish.
1355  *
1356  * This isn't required for the correctness of decoding, but to allow
1357  * isolationtester to notice that we're currently waiting for
1358  * something.
1359  */
1360  for (off = 0; off < builder->running.xcnt; off++)
1361  {
1362  TransactionId xid = builder->running.xip[off];
1363 
1364  /*
1365  * Upper layers should prevent that we ever need to wait on
1366  * ourselves. Check anyway, since failing to do so would either
1367  * result in an endless wait or an Assert() failure.
1368  */
1370  elog(ERROR, "waiting for ourselves");
1371 
1373  }
1374 
1375  /* nothing could have built up so far, so don't perform cleanup */
1376  return false;
1377  }
1378 
1379  /*
1380  * We already started to track running xacts and need to wait for all
1381  * in-progress ones to finish. We fall through to the normal processing of
1382  * records so incremental cleanup can be performed.
1383  */
1384  return true;
1385 }
#define TransactionIdAdvance(dest)
Definition: transam.h:48
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1643
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
uint32 TransactionId
Definition: c.h:394
bool TransactionIdIsCurrentTransactionId(TransactionId xid)
Definition: xact.c:772
size_t xcnt_space
Definition: snapbuild.c:200
size_t xcnt
Definition: snapbuild.c:199
#define LOG
Definition: elog.h:26
#define TransactionIdRetreat(dest)
Definition: transam.h:56
int errdetail_internal(const char *fmt,...)
Definition: elog.c:900
TransactionId xids[FLEXIBLE_ARRAY_MEMBER]
Definition: standbydefs.h:56
SnapBuildState state
Definition: snapbuild.c:144
#define ERROR
Definition: elog.h:43
Definition: lmgr.h:26
struct SnapBuild::@22 running
TransactionId initial_xmin_horizon
Definition: snapbuild.c:165
TransactionId * xip
Definition: snapbuild.c:201
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:265
#define ereport(elevel, rest)
Definition: elog.h:122
TransactionId xmax
Definition: snapbuild.c:153
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:554
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:965
MemoryContext context
Definition: snapbuild.c:147
TransactionId nextXid
Definition: standbydefs.h:52
#define NormalTransactionIdPrecedes(id1, id2)
Definition: transam.h:62
int errmsg(const char *fmt,...)
Definition: elog.c:797
TransactionId xmin
Definition: snapbuild.c:150
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
TransactionId oldestRunningXid
Definition: standbydefs.h:53
#define elog
Definition: elog.h:219
#define qsort(a, b, c, d)
Definition: port.h:440
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
XLogRecPtr start_decoding_at
Definition: snapbuild.c:159
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
static void SnapBuildFreeSnapshot ( Snapshot  snap)
static

Definition at line 339 of file snapbuild.c.

References SnapshotData::active_count, Assert, SnapshotData::copied, SnapshotData::curcid, elog, ERROR, FirstCommandId, HeapTupleSatisfiesHistoricMVCC(), pfree(), SnapshotData::regd_count, SnapshotData::satisfies, SnapshotData::suboverflowed, and SnapshotData::takenDuringRecovery.

Referenced by SnapBuildSnapDecRefcount().

340 {
341  /* make sure we don't get passed an external snapshot */
343 
344  /* make sure nobody modified our snapshot */
345  Assert(snap->curcid == FirstCommandId);
346  Assert(!snap->suboverflowed);
347  Assert(!snap->takenDuringRecovery);
348  Assert(snap->regd_count == 0);
349 
350  /* slightly more likely, so it's checked even without c-asserts */
351  if (snap->copied)
352  elog(ERROR, "cannot free a copied snapshot");
353 
354  if (snap->active_count)
355  elog(ERROR, "cannot free an active snapshot");
356 
357  pfree(snap);
358 }
SnapshotSatisfiesFunc satisfies
Definition: snapshot.h:54
bool copied
Definition: snapshot.h:94
bool suboverflowed
Definition: snapshot.h:91
uint32 regd_count
Definition: snapshot.h:108
#define FirstCommandId
Definition: c.h:410
void pfree(void *pointer)
Definition: mcxt.c:992
#define ERROR
Definition: elog.h:43
bool HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, Buffer buffer)
Definition: tqual.c:1652
CommandId curcid
Definition: snapshot.h:96
#define Assert(condition)
Definition: c.h:671
bool takenDuringRecovery
Definition: snapshot.h:93
#define elog
Definition: elog.h:219
uint32 active_count
Definition: snapshot.h:107
Snapshot SnapBuildGetOrBuildSnapshot ( SnapBuild builder,
TransactionId  xid 
)

Definition at line 609 of file snapbuild.c.

References Assert, NULL, SNAPBUILD_CONSISTENT, SnapBuildBuildSnapshot(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, and SnapBuild::state.

Referenced by DecodeLogicalMsgOp().

610 {
611  Assert(builder->state == SNAPBUILD_CONSISTENT);
612 
613  /* only build a new snapshot if we don't have a prebuilt one */
614  if (builder->snapshot == NULL)
615  {
616  builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
617  /* increase refcount for the snapshot builder */
619  }
620 
621  return builder->snapshot;
622 }
Snapshot snapshot
Definition: snapbuild.c:170
SnapBuildState state
Definition: snapbuild.c:144
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:429
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition: snapbuild.c:385
bool SnapBuildProcessChange ( SnapBuild builder,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 653 of file snapbuild.c.

References NULL, SnapBuild::reorder, ReorderBufferSetBaseSnapshot(), ReorderBufferXidHasBaseSnapshot(), SNAPBUILD_CONSISTENT, SNAPBUILD_FULL_SNAPSHOT, SnapBuildBuildSnapshot(), SnapBuildSnapIncRefcount(), SnapBuildTxnIsRunning(), SnapBuild::snapshot, and SnapBuild::state.

Referenced by DecodeHeap2Op(), DecodeHeapOp(), and DecodeLogicalMsgOp().

654 {
655  /*
656  * We can't handle data in transactions if we haven't built a snapshot
657  * yet, so don't store them.
658  */
659  if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
660  return false;
661 
662  /*
663  * No point in keeping track of changes in transactions that we don't have
664  * enough information about to decode. This means that they started before
665  * we got into the SNAPBUILD_FULL_SNAPSHOT state.
666  */
667  if (builder->state < SNAPBUILD_CONSISTENT &&
668  SnapBuildTxnIsRunning(builder, xid))
669  return false;
670 
671  /*
672  * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
673  * be needed to decode the change we're currently processing.
674  */
675  if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
676  {
677  /* only build a new snapshot if we don't have a prebuilt one */
678  if (builder->snapshot == NULL)
679  {
680  builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
681  /* increase refcount for the snapshot builder */
683  }
684 
685  /*
686  * Increase refcount for the transaction we're handing the snapshot
687  * out to.
688  */
690  ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
691  builder->snapshot);
692  }
693 
694  return true;
695 }
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
Snapshot snapshot
Definition: snapbuild.c:170
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
SnapBuildState state
Definition: snapbuild.c:144
ReorderBuffer * reorder
Definition: snapbuild.c:180
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:429
static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:744
#define NULL
Definition: c.h:226
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition: snapbuild.c:385
void SnapBuildProcessNewCid ( SnapBuild builder,
TransactionId  xid,
XLogRecPtr  lsn,
xl_heap_new_cid xlrec 
)

Definition at line 703 of file snapbuild.c.

References xl_heap_new_cid::cmax, xl_heap_new_cid::cmin, xl_heap_new_cid::combocid, elog, ERROR, InvalidCommandId, Max, SnapBuild::reorder, ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferXidSetCatalogChanges(), xl_heap_new_cid::target_node, xl_heap_new_cid::target_tid, and xl_heap_new_cid::top_xid.

Referenced by DecodeHeap2Op().

705 {
706  CommandId cid;
707 
708  /*
709  * we only log new_cid's if a catalog tuple was modified, so mark the
710  * transaction as containing catalog modifications
711  */
712  ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
713 
714  ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
715  xlrec->target_node, xlrec->target_tid,
716  xlrec->cmin, xlrec->cmax,
717  xlrec->combocid);
718 
719  /* figure out new command id */
720  if (xlrec->cmin != InvalidCommandId &&
721  xlrec->cmax != InvalidCommandId)
722  cid = Max(xlrec->cmin, xlrec->cmax);
723  else if (xlrec->cmax != InvalidCommandId)
724  cid = xlrec->cmax;
725  else if (xlrec->cmin != InvalidCommandId)
726  cid = xlrec->cmin;
727  else
728  {
729  cid = InvalidCommandId; /* silence compiler */
730  elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
731  }
732 
733  ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
734 }
uint32 CommandId
Definition: c.h:408
CommandId combocid
Definition: heapam_xlog.h:348
CommandId cmax
Definition: heapam_xlog.h:341
ItemPointerData target_tid
Definition: heapam_xlog.h:354
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
#define ERROR
Definition: elog.h:43
ReorderBuffer * reorder
Definition: snapbuild.c:180
RelFileNode target_node
Definition: heapam_xlog.h:353
#define InvalidCommandId
Definition: c.h:411
#define Max(x, y)
Definition: c.h:796
CommandId cmin
Definition: heapam_xlog.h:340
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
#define elog
Definition: elog.h:219
TransactionId top_xid
Definition: heapam_xlog.h:339
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
void SnapBuildProcessRunningXacts ( SnapBuild builder,
XLogRecPtr  lsn,
xl_running_xacts running 
)

Definition at line 1115 of file snapbuild.c.

References ReorderBuffer::current_restart_decoding_lsn, DEBUG3, elog, InvalidXLogRecPtr, SnapBuild::last_serialized_snapshot, LogicalIncreaseRestartDecodingForSlot(), LogicalIncreaseXminForSlot(), NULL, xl_running_xacts::oldestRunningXid, SnapBuild::reorder, ReorderBufferGetOldestTXN(), ReorderBufferTXN::restart_decoding_lsn, SNAPBUILD_CONSISTENT, SnapBuildFindSnapshot(), SnapBuildPurgeCommittedTxn(), SnapBuildSerialize(), SnapBuild::state, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by DecodeStandbyOp().

1116 {
1117  ReorderBufferTXN *txn;
1118 
1119  /*
1120  * If we're not consistent yet, inspect the record to see whether it
1121  * allows to get closer to being consistent. If we are consistent, dump
1122  * our snapshot so others or we, after a restart, can use it.
1123  */
1124  if (builder->state < SNAPBUILD_CONSISTENT)
1125  {
1126  /* returns false if there's no point in performing cleanup just yet */
1127  if (!SnapBuildFindSnapshot(builder, lsn, running))
1128  return;
1129  }
1130  else
1131  SnapBuildSerialize(builder, lsn);
1132 
1133  /*
1134  * Update range of interesting xids based on the running xacts
1135  * information. We don't increase ->xmax using it, because once we are in
1136  * a consistent state we can do that ourselves and much more efficiently
1137  * so, because we only need to do it for catalog transactions since we
1138  * only ever look at those.
1139  *
1140  * NB: Because of that xmax can be lower than xmin, because we only
1141  * increase xmax when a catalog modifying transaction commits. While odd
1142  * looking, it's correct and actually more efficient this way since we hit
1143  * fast paths in tqual.c.
1144  */
1145  builder->xmin = running->oldestRunningXid;
1146 
1147  /* Remove transactions we don't need to keep track off anymore */
1148  SnapBuildPurgeCommittedTxn(builder);
1149 
1150  elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u",
1151  builder->xmin, builder->xmax,
1152  running->oldestRunningXid);
1153 
1154  /*
1155  * Inrease shared memory limits, so vacuum can work on tuples we prevented
1156  * from being pruned till now.
1157  */
1159 
1160  /*
1161  * Also tell the slot where we can restart decoding from. We don't want to
1162  * do that after every commit because changing that implies an fsync of
1163  * the logical slot's state file, so we only do it every time we see a
1164  * running xacts record.
1165  *
1166  * Do so by looking for the oldest in progress transaction (determined by
1167  * the first LSN of any of its relevant records). Every transaction
1168  * remembers the last location we stored the snapshot to disk before its
1169  * beginning. That point is where we can restart from.
1170  */
1171 
1172  /*
1173  * Can't know about a serialized snapshot's location if we're not
1174  * consistent.
1175  */
1176  if (builder->state < SNAPBUILD_CONSISTENT)
1177  return;
1178 
1179  txn = ReorderBufferGetOldestTXN(builder->reorder);
1180 
1181  /*
1182  * oldest ongoing txn might have started when we didn't yet serialize
1183  * anything because we hadn't reached a consistent state yet.
1184  */
1185  if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1187 
1188  /*
1189  * No in-progress transaction, can reuse the last serialized snapshot if
1190  * we have one.
1191  */
1192  else if (txn == NULL &&
1196  builder->last_serialized_snapshot);
1197 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG3
Definition: elog.h:23
void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
Definition: logical.c:815
XLogRecPtr current_restart_decoding_lsn
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
Definition: snapbuild.c:1210
SnapBuildState state
Definition: snapbuild.c:144
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1450
ReorderBuffer * reorder
Definition: snapbuild.c:180
XLogRecPtr last_serialized_snapshot
Definition: snapbuild.c:175
TransactionId xmax
Definition: snapbuild.c:153
#define NULL
Definition: c.h:226
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
Definition: logical.c:758
TransactionId xmin
Definition: snapbuild.c:150
XLogRecPtr restart_decoding_lsn
TransactionId oldestRunningXid
Definition: standbydefs.h:53
#define elog
Definition: elog.h:219
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder)
Definition: snapbuild.c:852
static void SnapBuildPurgeCommittedTxn ( SnapBuild builder)
static

Definition at line 852 of file snapbuild.c.

References SnapBuild::committed, SnapBuild::context, DEBUG3, elog, MemoryContextAlloc(), NormalTransactionIdPrecedes, pfree(), TransactionIdIsNormal, SnapBuild::xcnt, SnapBuild::xip, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildProcessRunningXacts().

853 {
854  int off;
855  TransactionId *workspace;
856  int surviving_xids = 0;
857 
858  /* not ready yet */
859  if (!TransactionIdIsNormal(builder->xmin))
860  return;
861 
862  /* TODO: Neater algorithm than just copying and iterating? */
863  workspace =
864  MemoryContextAlloc(builder->context,
865  builder->committed.xcnt * sizeof(TransactionId));
866 
867  /* copy xids that still are interesting to workspace */
868  for (off = 0; off < builder->committed.xcnt; off++)
869  {
870  if (NormalTransactionIdPrecedes(builder->committed.xip[off],
871  builder->xmin))
872  ; /* remove */
873  else
874  workspace[surviving_xids++] = builder->committed.xip[off];
875  }
876 
877  /* copy workspace back to persistent state */
878  memcpy(builder->committed.xip, workspace,
879  surviving_xids * sizeof(TransactionId));
880 
881  elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
882  (uint32) builder->committed.xcnt, (uint32) surviving_xids,
883  builder->xmin, builder->xmax);
884  builder->committed.xcnt = surviving_xids;
885 
886  pfree(workspace);
887 }
uint32 TransactionId
Definition: c.h:394
#define DEBUG3
Definition: elog.h:23
size_t xcnt
Definition: snapbuild.c:199
void pfree(void *pointer)
Definition: mcxt.c:992
TransactionId * xip
Definition: snapbuild.c:201
unsigned int uint32
Definition: c.h:265
TransactionId xmax
Definition: snapbuild.c:153
MemoryContext context
Definition: snapbuild.c:147
#define NormalTransactionIdPrecedes(id1, id2)
Definition: transam.h:62
TransactionId xmin
Definition: snapbuild.c:150
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
struct SnapBuild::@23 committed
#define elog
Definition: elog.h:219
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool SnapBuildRestore ( SnapBuild builder,
XLogRecPtr  lsn 
)
static

Definition at line 1643 of file snapbuild.c.

References Assert, SnapBuildOnDisk::builder, SnapBuildOnDisk::checksum, CloseTransientFile(), SnapBuild::committed, COMP_CRC32C, SnapBuild::context, EQ_CRC32C, ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, fd(), FIN_CRC32C, fsync_fname(), INIT_CRC32C, SnapBuild::initial_xmin_horizon, InvalidTransactionId, LOG, SnapBuildOnDisk::magic, MAXPGPATH, MemoryContextAllocZero(), NULL, OpenTransientFile(), pfree(), PG_BINARY, read, SnapBuild::reorder, ReorderBufferSetRestartPoint(), SnapBuild::running, SNAPBUILD_CONSISTENT, SNAPBUILD_MAGIC, SNAPBUILD_VERSION, SnapBuildBuildSnapshot(), SnapBuildOnDiskConstantSize, SnapBuildOnDiskNotChecksummedSize, SnapBuildSnapDecRefcount(), SnapBuildSnapIncRefcount(), SnapBuild::snapshot, SnapBuild::state, TransactionIdPrecedes(), SnapBuildOnDisk::version, SnapBuild::xcnt, SnapBuild::xcnt_space, SnapBuild::xip, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildFindSnapshot(), and SnapBuildSerializationPoint().

1644 {
1645  SnapBuildOnDisk ondisk;
1646  int fd;
1647  char path[MAXPGPATH];
1648  Size sz;
1649  int readBytes;
1650  pg_crc32c checksum;
1651 
1652  /* no point in loading a snapshot if we're already there */
1653  if (builder->state == SNAPBUILD_CONSISTENT)
1654  return false;
1655 
1656  sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1657  (uint32) (lsn >> 32), (uint32) lsn);
1658 
1659  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1660 
1661  if (fd < 0 && errno == ENOENT)
1662  return false;
1663  else if (fd < 0)
1664  ereport(ERROR,
1666  errmsg("could not open file \"%s\": %m", path)));
1667 
1668  /* ----
1669  * Make sure the snapshot had been stored safely to disk, that's normally
1670  * cheap.
1671  * Note that we do not need PANIC here, nobody will be able to use the
1672  * slot without fsyncing, and saving it won't succeed without an fsync()
1673  * either...
1674  * ----
1675  */
1676  fsync_fname(path, false);
1677  fsync_fname("pg_logical/snapshots", true);
1678 
1679 
1680  /* read statically sized portion of snapshot */
1681  readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
1682  if (readBytes != SnapBuildOnDiskConstantSize)
1683  {
1684  CloseTransientFile(fd);
1685  ereport(ERROR,
1687  errmsg("could not read file \"%s\", read %d of %d: %m",
1688  path, readBytes, (int) SnapBuildOnDiskConstantSize)));
1689  }
1690 
1691  if (ondisk.magic != SNAPBUILD_MAGIC)
1692  ereport(ERROR,
1693  (errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1694  path, ondisk.magic, SNAPBUILD_MAGIC)));
1695 
1696  if (ondisk.version != SNAPBUILD_VERSION)
1697  ereport(ERROR,
1698  (errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1699  path, ondisk.version, SNAPBUILD_VERSION)));
1700 
1701  INIT_CRC32C(checksum);
1702  COMP_CRC32C(checksum,
1703  ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1705 
1706  /* read SnapBuild */
1707  readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
1708  if (readBytes != sizeof(SnapBuild))
1709  {
1710  CloseTransientFile(fd);
1711  ereport(ERROR,
1713  errmsg("could not read file \"%s\", read %d of %d: %m",
1714  path, readBytes, (int) sizeof(SnapBuild))));
1715  }
1716  COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
1717 
1718  /* restore running xacts information */
1719  sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
1720  ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
1721  readBytes = read(fd, ondisk.builder.running.xip, sz);
1722  if (readBytes != sz)
1723  {
1724  CloseTransientFile(fd);
1725  ereport(ERROR,
1727  errmsg("could not read file \"%s\", read %d of %d: %m",
1728  path, readBytes, (int) sz)));
1729  }
1730  COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
1731 
1732  /* restore committed xacts information */
1733  sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1734  ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1735  readBytes = read(fd, ondisk.builder.committed.xip, sz);
1736  if (readBytes != sz)
1737  {
1738  CloseTransientFile(fd);
1739  ereport(ERROR,
1741  errmsg("could not read file \"%s\", read %d of %d: %m",
1742  path, readBytes, (int) sz)));
1743  }
1744  COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
1745 
1746  CloseTransientFile(fd);
1747 
1748  FIN_CRC32C(checksum);
1749 
1750  /* verify checksum of what we've read */
1751  if (!EQ_CRC32C(checksum, ondisk.checksum))
1752  ereport(ERROR,
1754  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1755  path, checksum, ondisk.checksum)));
1756 
1757  /*
1758  * ok, we now have a sensible snapshot here, figure out if it has more
1759  * information than we have.
1760  */
1761 
1762  /*
1763  * We are only interested in consistent snapshots for now, comparing
1764  * whether one incomplete snapshot is more "advanced" seems to be
1765  * unnecessarily complex.
1766  */
1767  if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1768  goto snapshot_not_interesting;
1769 
1770  /*
1771  * Don't use a snapshot that requires an xmin that we cannot guarantee to
1772  * be available.
1773  */
1775  goto snapshot_not_interesting;
1776 
1777 
1778  /* ok, we think the snapshot is sensible, copy over everything important */
1779  builder->xmin = ondisk.builder.xmin;
1780  builder->xmax = ondisk.builder.xmax;
1781  builder->state = ondisk.builder.state;
1782 
1783  builder->committed.xcnt = ondisk.builder.committed.xcnt;
1784  /* We only allocated/stored xcnt, not xcnt_space xids ! */
1785  /* don't overwrite preallocated xip, if we don't have anything here */
1786  if (builder->committed.xcnt > 0)
1787  {
1788  pfree(builder->committed.xip);
1789  builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1790  builder->committed.xip = ondisk.builder.committed.xip;
1791  }
1792  ondisk.builder.committed.xip = NULL;
1793 
1794  builder->running.xcnt = ondisk.builder.running.xcnt;
1795  if (builder->running.xip)
1796  pfree(builder->running.xip);
1797  builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
1798  builder->running.xip = ondisk.builder.running.xip;
1799 
1800  /* our snapshot is not interesting anymore, build a new one */
1801  if (builder->snapshot != NULL)
1802  {
1804  }
1807 
1808  ReorderBufferSetRestartPoint(builder->reorder, lsn);
1809 
1810  Assert(builder->state == SNAPBUILD_CONSISTENT);
1811 
1812  ereport(LOG,
1813  (errmsg("logical decoding found consistent point at %X/%X",
1814  (uint32) (lsn >> 32), (uint32) lsn),
1815  errdetail("Logical decoding will begin using saved snapshot.")));
1816  return true;
1817 
1818 snapshot_not_interesting:
1819  if (ondisk.builder.running.xip != NULL)
1820  pfree(ondisk.builder.running.xip);
1821  if (ondisk.builder.committed.xip != NULL)
1822  pfree(ondisk.builder.committed.xip);
1823  return false;
1824 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define SNAPBUILD_VERSION
Definition: snapbuild.c:1428
uint32 TransactionId
Definition: c.h:394
#define SNAPBUILD_MAGIC
Definition: snapbuild.c:1427
pg_crc32c checksum
Definition: snapbuild.c:1407
Snapshot snapshot
Definition: snapbuild.c:170
uint32 pg_crc32c
Definition: pg_crc32c.h:38
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:567
size_t xcnt_space
Definition: snapbuild.c:200
size_t xcnt
Definition: snapbuild.c:199
#define LOG
Definition: elog.h:26
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
SnapBuildState state
Definition: snapbuild.c:144
void pfree(void *pointer)
Definition: mcxt.c:992
#define ERROR
Definition: elog.h:43
struct SnapBuild::@22 running
#define MAXPGPATH
SnapBuild builder
Definition: snapbuild.c:1417
ReorderBuffer * reorder
Definition: snapbuild.c:180
TransactionId initial_xmin_horizon
Definition: snapbuild.c:165
TransactionId * xip
Definition: snapbuild.c:201
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2093
int errdetail(const char *fmt,...)
Definition: elog.c:873
int errcode_for_file_access(void)
Definition: elog.c:598
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:265
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
#define ereport(elevel, rest)
Definition: elog.h:122
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:429
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
int CloseTransientFile(int fd)
Definition: fd.c:2254
TransactionId xmax
Definition: snapbuild.c:153
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:784
#define SnapBuildOnDiskConstantSize
Definition: snapbuild.c:1422
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
#define SnapBuildOnDiskNotChecksummedSize
Definition: snapbuild.c:1424
size_t Size
Definition: c.h:353
MemoryContext context
Definition: snapbuild.c:147
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:397
int errmsg(const char *fmt,...)
Definition: elog.c:797
TransactionId xmin
Definition: snapbuild.c:150
struct SnapBuild::@23 committed
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:73
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition: snapbuild.c:385
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:78
#define read(a, b, c)
Definition: win32.h:18
void SnapBuildSerializationPoint ( SnapBuild builder,
XLogRecPtr  lsn 
)

Definition at line 1437 of file snapbuild.c.

References SNAPBUILD_CONSISTENT, SnapBuildRestore(), SnapBuildSerialize(), and SnapBuild::state.

Referenced by DecodeXLogOp().

1438 {
1439  if (builder->state < SNAPBUILD_CONSISTENT)
1440  SnapBuildRestore(builder, lsn);
1441  else
1442  SnapBuildSerialize(builder, lsn);
1443 }
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1643
SnapBuildState state
Definition: snapbuild.c:144
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1450
static void SnapBuildSerialize ( SnapBuild builder,
XLogRecPtr  lsn 
)
static

Definition at line 1450 of file snapbuild.c.

References Assert, SnapBuildOnDisk::builder, SnapBuildOnDisk::checksum, CloseTransientFile(), SnapBuild::committed, COMP_CRC32C, SnapBuild::context, DEBUG1, elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FIN_CRC32C, fsync_fname(), INIT_CRC32C, InvalidXLogRecPtr, SnapBuild::last_serialized_snapshot, SnapBuildOnDisk::length, SnapBuildOnDisk::magic, MAXPGPATH, MemoryContextAllocZero(), MyProcPid, NULL, OpenTransientFile(), PG_BINARY, pg_fsync(), SnapBuild::reorder, ReorderBufferSetRestartPoint(), SnapBuild::running, SNAPBUILD_CONSISTENT, SNAPBUILD_MAGIC, SNAPBUILD_VERSION, SnapBuildOnDiskConstantSize, SnapBuildOnDiskNotChecksummedSize, SnapBuild::snapshot, SnapBuild::state, unlink(), SnapBuildOnDisk::version, write, SnapBuild::xcnt, SnapBuild::xcnt_space, and SnapBuild::xip.

Referenced by SnapBuildProcessRunningXacts(), and SnapBuildSerializationPoint().

1451 {
1452  Size needed_length;
1453  SnapBuildOnDisk *ondisk;
1454  char *ondisk_c;
1455  int fd;
1456  char tmppath[MAXPGPATH];
1457  char path[MAXPGPATH];
1458  int ret;
1459  struct stat stat_buf;
1460  Size sz;
1461 
1462  Assert(lsn != InvalidXLogRecPtr);
1464  builder->last_serialized_snapshot <= lsn);
1465 
1466  /*
1467  * no point in serializing if we cannot continue to work immediately after
1468  * restoring the snapshot
1469  */
1470  if (builder->state < SNAPBUILD_CONSISTENT)
1471  return;
1472 
1473  /*
1474  * We identify snapshots by the LSN they are valid for. We don't need to
1475  * include timelines in the name as each LSN maps to exactly one timeline
1476  * unless the user used pg_resetwal or similar. If a user did so, there's
1477  * no hope continuing to decode anyway.
1478  */
1479  sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1480  (uint32) (lsn >> 32), (uint32) lsn);
1481 
1482  /*
1483  * first check whether some other backend already has written the snapshot
1484  * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1485  * as a valid state. Everything else is an unexpected error.
1486  */
1487  ret = stat(path, &stat_buf);
1488 
1489  if (ret != 0 && errno != ENOENT)
1490  ereport(ERROR,
1491  (errmsg("could not stat file \"%s\": %m", path)));
1492 
1493  else if (ret == 0)
1494  {
1495  /*
1496  * somebody else has already serialized to this point, don't overwrite
1497  * but remember location, so we don't need to read old data again.
1498  *
1499  * To be sure it has been synced to disk after the rename() from the
1500  * tempfile filename to the real filename, we just repeat the fsync.
1501  * That ought to be cheap because in most scenarios it should already
1502  * be safely on disk.
1503  */
1504  fsync_fname(path, false);
1505  fsync_fname("pg_logical/snapshots", true);
1506 
1507  builder->last_serialized_snapshot = lsn;
1508  goto out;
1509  }
1510 
1511  /*
1512  * there is an obvious race condition here between the time we stat(2) the
1513  * file and us writing the file. But we rename the file into place
1514  * atomically and all files created need to contain the same data anyway,
1515  * so this is perfectly fine, although a bit of a resource waste. Locking
1516  * seems like pointless complication.
1517  */
1518  elog(DEBUG1, "serializing snapshot to %s", path);
1519 
1520  /* to make sure only we will write to this tempfile, include pid */
1521  sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
1522  (uint32) (lsn >> 32), (uint32) lsn, MyProcPid);
1523 
1524  /*
1525  * Unlink temporary file if it already exists, needs to have been before a
1526  * crash/error since we won't enter this function twice from within a
1527  * single decoding slot/backend and the temporary file contains the pid of
1528  * the current process.
1529  */
1530  if (unlink(tmppath) != 0 && errno != ENOENT)
1531  ereport(ERROR,
1533  errmsg("could not remove file \"%s\": %m", path)));
1534 
1535  needed_length = sizeof(SnapBuildOnDisk) +
1536  sizeof(TransactionId) * builder->running.xcnt_space +
1537  sizeof(TransactionId) * builder->committed.xcnt;
1538 
1539  ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
1540  ondisk = (SnapBuildOnDisk *) ondisk_c;
1541  ondisk->magic = SNAPBUILD_MAGIC;
1542  ondisk->version = SNAPBUILD_VERSION;
1543  ondisk->length = needed_length;
1544  INIT_CRC32C(ondisk->checksum);
1545  COMP_CRC32C(ondisk->checksum,
1546  ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1548  ondisk_c += sizeof(SnapBuildOnDisk);
1549 
1550  memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1551  /* NULL-ify memory-only data */
1552  ondisk->builder.context = NULL;
1553  ondisk->builder.snapshot = NULL;
1554  ondisk->builder.reorder = NULL;
1555  ondisk->builder.running.xip = NULL;
1556  ondisk->builder.committed.xip = NULL;
1557 
1558  COMP_CRC32C(ondisk->checksum,
1559  &ondisk->builder,
1560  sizeof(SnapBuild));
1561 
1562  /* copy running xacts */
1563  sz = sizeof(TransactionId) * builder->running.xcnt_space;
1564  memcpy(ondisk_c, builder->running.xip, sz);
1565  COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1566  ondisk_c += sz;
1567 
1568  /* copy committed xacts */
1569  sz = sizeof(TransactionId) * builder->committed.xcnt;
1570  memcpy(ondisk_c, builder->committed.xip, sz);
1571  COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1572  ondisk_c += sz;
1573 
1574  FIN_CRC32C(ondisk->checksum);
1575 
1576  /* we have valid data now, open tempfile and write it there */
1577  fd = OpenTransientFile(tmppath,
1578  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1579  S_IRUSR | S_IWUSR);
1580  if (fd < 0)
1581  ereport(ERROR,
1582  (errmsg("could not open file \"%s\": %m", path)));
1583 
1584  if ((write(fd, ondisk, needed_length)) != needed_length)
1585  {
1586  CloseTransientFile(fd);
1587  ereport(ERROR,
1589  errmsg("could not write to file \"%s\": %m", tmppath)));
1590  }
1591 
1592  /*
1593  * fsync the file before renaming so that even if we crash after this we
1594  * have either a fully valid file or nothing.
1595  *
1596  * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1597  * some noticeable overhead since it's performed synchronously during
1598  * decoding?
1599  */
1600  if (pg_fsync(fd) != 0)
1601  {
1602  CloseTransientFile(fd);
1603  ereport(ERROR,
1605  errmsg("could not fsync file \"%s\": %m", tmppath)));
1606  }
1607  CloseTransientFile(fd);
1608 
1609  fsync_fname("pg_logical/snapshots", true);
1610 
1611  /*
1612  * We may overwrite the work from some other backend, but that's ok, our
1613  * snapshot is valid as well, we'll just have done some superfluous work.
1614  */
1615  if (rename(tmppath, path) != 0)
1616  {
1617  ereport(ERROR,
1619  errmsg("could not rename file \"%s\" to \"%s\": %m",
1620  tmppath, path)));
1621  }
1622 
1623  /* make sure we persist */
1624  fsync_fname(path, false);
1625  fsync_fname("pg_logical/snapshots", true);
1626 
1627  /*
1628  * Now there's no way we can loose the dumped state anymore, remember this
1629  * as a serialization point.
1630  */
1631  builder->last_serialized_snapshot = lsn;
1632 
1633 out:
1635  builder->last_serialized_snapshot);
1636 }
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:38
#define SNAPBUILD_VERSION
Definition: snapbuild.c:1428
uint32 TransactionId
Definition: c.h:394
#define SNAPBUILD_MAGIC
Definition: snapbuild.c:1427
#define write(a, b, c)
Definition: win32.h:19
pg_crc32c checksum
Definition: snapbuild.c:1407
Snapshot snapshot
Definition: snapbuild.c:170
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:567
size_t xcnt_space
Definition: snapbuild.c:200
size_t xcnt
Definition: snapbuild.c:199
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
SnapBuildState state
Definition: snapbuild.c:144
#define ERROR
Definition: elog.h:43
struct stat stat_buf
Definition: pg_standby.c:101
struct SnapBuild::@22 running
#define MAXPGPATH
SnapBuild builder
Definition: snapbuild.c:1417
ReorderBuffer * reorder
Definition: snapbuild.c:180
TransactionId * xip
Definition: snapbuild.c:201
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2093
int errcode_for_file_access(void)
Definition: elog.c:598
unsigned int uint32
Definition: c.h:265
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
XLogRecPtr last_serialized_snapshot
Definition: snapbuild.c:175
int CloseTransientFile(int fd)
Definition: fd.c:2254
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:784
#define SnapBuildOnDiskConstantSize
Definition: snapbuild.c:1422
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
#define SnapBuildOnDiskNotChecksummedSize
Definition: snapbuild.c:1424
size_t Size
Definition: c.h:353
MemoryContext context
Definition: snapbuild.c:147
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct SnapBuild::@23 committed
int pg_fsync(int fd)
Definition: fd.c:333
struct SnapBuildOnDisk SnapBuildOnDisk
#define elog
Definition: elog.h:219
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:73
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:78
void SnapBuildSnapDecRefcount ( Snapshot  snap)

Definition at line 397 of file snapbuild.c.

References SnapshotData::active_count, Assert, SnapshotData::copied, SnapshotData::curcid, elog, ERROR, FirstCommandId, HeapTupleSatisfiesHistoricMVCC(), SnapshotData::regd_count, SnapshotData::satisfies, SnapBuildFreeSnapshot(), SnapshotData::suboverflowed, and SnapshotData::takenDuringRecovery.

Referenced by FreeSnapshotBuilder(), ReorderBufferCleanupTXN(), ReorderBufferFreeSnap(), SnapBuildCommitTxn(), and SnapBuildRestore().

398 {
399  /* make sure we don't get passed an external snapshot */
401 
402  /* make sure nobody modified our snapshot */
403  Assert(snap->curcid == FirstCommandId);
404  Assert(!snap->suboverflowed);
405  Assert(!snap->takenDuringRecovery);
406 
407  Assert(snap->regd_count == 0);
408 
409  Assert(snap->active_count > 0);
410 
411  /* slightly more likely, so it's checked even without casserts */
412  if (snap->copied)
413  elog(ERROR, "cannot free a copied snapshot");
414 
415  snap->active_count--;
416  if (snap->active_count == 0)
417  SnapBuildFreeSnapshot(snap);
418 }
SnapshotSatisfiesFunc satisfies
Definition: snapshot.h:54
bool copied
Definition: snapshot.h:94
bool suboverflowed
Definition: snapshot.h:91
uint32 regd_count
Definition: snapshot.h:108
#define FirstCommandId
Definition: c.h:410
#define ERROR
Definition: elog.h:43
static void SnapBuildFreeSnapshot(Snapshot snap)
Definition: snapbuild.c:339
bool HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, Buffer buffer)
Definition: tqual.c:1652
CommandId curcid
Definition: snapshot.h:96
#define Assert(condition)
Definition: c.h:671
bool takenDuringRecovery
Definition: snapshot.h:93
#define elog
Definition: elog.h:219
uint32 active_count
Definition: snapshot.h:107
static void SnapBuildSnapIncRefcount ( Snapshot  snap)
static
static bool SnapBuildTxnIsRunning ( SnapBuild builder,
TransactionId  xid 
)
static

Definition at line 744 of file snapbuild.c.

References Assert, NormalTransactionIdFollows, NormalTransactionIdPrecedes, NULL, SnapBuild::running, SNAPBUILD_CONSISTENT, SnapBuild::state, TransactionIdIsNormal, SnapBuild::xcnt, SnapBuild::xcnt_space, xidComparator(), SnapBuild::xip, SnapBuild::xmax, and SnapBuild::xmin.

Referenced by SnapBuildEndTxn(), and SnapBuildProcessChange().

745 {
746  Assert(builder->state < SNAPBUILD_CONSISTENT);
749 
750  if (builder->running.xcnt &&
751  NormalTransactionIdFollows(xid, builder->running.xmin) &&
753  {
754  TransactionId *search =
755  bsearch(&xid, builder->running.xip, builder->running.xcnt_space,
756  sizeof(TransactionId), xidComparator);
757 
758  if (search != NULL)
759  {
760  Assert(*search == xid);
761  return true;
762  }
763  }
764 
765  return false;
766 }
uint32 TransactionId
Definition: c.h:394
size_t xcnt_space
Definition: snapbuild.c:200
size_t xcnt
Definition: snapbuild.c:199
SnapBuildState state
Definition: snapbuild.c:144
struct SnapBuild::@22 running
TransactionId * xip
Definition: snapbuild.c:201
TransactionId xmax
Definition: snapbuild.c:153
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
#define NormalTransactionIdFollows(id1, id2)
Definition: transam.h:67
#define NormalTransactionIdPrecedes(id1, id2)
Definition: transam.h:62
TransactionId xmin
Definition: snapbuild.c:150
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
bool SnapBuildXactNeedsSkip ( SnapBuild builder,
XLogRecPtr  ptr 
)

Definition at line 373 of file snapbuild.c.

References SnapBuild::start_decoding_at.

Referenced by DecodeCommit(), and DecodeLogicalMsgOp().

374 {
375  return ptr < builder->start_decoding_at;
376 }
XLogRecPtr start_decoding_at
Definition: snapbuild.c:159

Variable Documentation

bool ExportInProgress = false
static

Definition at line 246 of file snapbuild.c.

Referenced by SnapBuildClearExportedSnapshot(), and SnapBuildExportSnapshot().

ResourceOwner SavedResourceOwnerDuringExport = NULL
static

Definition at line 245 of file snapbuild.c.

Referenced by SnapBuildClearExportedSnapshot().