PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
snapbuild.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * snapbuild.c
4  *
5  * Infrastructure for building historic catalog snapshots based on contents
6  * of the WAL, for the purpose of decoding heapam.c style values in the
7  * WAL.
8  *
9  * NOTES:
10  *
11  * We build snapshots which can *only* be used to read catalog contents and we
12  * do so by reading and interpreting the WAL stream. The aim is to build a
13  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14  * at the time the XLogRecord was generated.
15  *
16  * To build the snapshots we reuse the infrastructure built for Hot
17  * Standby. The in-memory snapshots we build look different than HS' because
18  * we have different needs. To successfully decode data from the WAL we only
19  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20  * tables since the data we decode is wholly contained in the WAL
21  * records. Also, our snapshots need to be different in comparison to normal
22  * MVCC ones because in contrast to those we cannot fully rely on the clog and
23  * pg_subtrans for information about committed transactions because they might
24  * commit in the future from the POV of the WAL entry we're currently
25  * decoding. This definition has the advantage that we only need to prevent
26  * removal of catalog rows, while normal table's rows can still be
27  * removed. This is achieved by using the replication slot mechanism.
28  *
29  * As the percentage of transactions modifying the catalog normally is fairly
30  * small in comparison to ones only manipulating user data, we keep track of
31  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32  * track of all running transactions like it's done in a normal snapshot. Note
33  * that we're generally only looking at transactions that have acquired an
34  * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35  * that we consider committed, everything else is considered aborted/in
36  * progress. That also allows us not to care about subtransactions before they
37  * have committed which means this module, in contrast to HS, doesn't have to
38  * care about suboverflowed subtransactions and similar.
39  *
40  * One complexity of doing this is that to e.g. handle mixed DDL/DML
41  * transactions we need Snapshots that see intermediate versions of the
42  * catalog in a transaction. During normal operation this is achieved by using
43  * CommandIds/cmin/cmax. The problem with that however is that for space
44  * efficiency reasons only one value of that is stored
45  * (c.f. combocid.c). Since ComboCids are only available in memory we log
46  * additional information which allows us to get the original (cmin, cmax)
47  * pair during visibility checks. Check the reorderbuffer.c's comment above
48  * ResolveCminCmaxDuringDecoding() for details.
49  *
50  * To facilitate all this we need our own visibility routine, as the normal
51  * ones are optimized for different usecases.
52  *
53  * To replace the normal catalog snapshots with decoding ones use the
54  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
55  *
56  *
57  *
58  * The snapbuild machinery is starting up in several stages, as illustrated
59  * by the following graph describing the SnapBuild->state transitions:
60  *
61  *         +-------------------------+
62  *    +----|          START          |-------------+
63  *    |    +-------------------------+             |
64  *    |                 |                          |
65  *    |                 |                          |
66  *    |         running_xacts #1                   |
67  *    |                 |                          |
68  *    |                 |                          |
69  *    |                 v                          |
70  *    |    +-------------------------+             v
71  *    |    |    BUILDING_SNAPSHOT    |------------>|
72  *    |    +-------------------------+             |
73  *    |                 |                          |
74  *    |                 |                          |
75  *    | running_xacts #2, xacts from #1 finished   |
76  *    |                 |                          |
77  *    |                 |                          |
78  *    |                 v                          |
79  *    |    +-------------------------+             v
80  *    |    |      FULL_SNAPSHOT      |------------>|
81  *    |    +-------------------------+             |
82  *    |                 |                          |
83  * running_xacts        |                    saved snapshot
84  * with zero xacts      |                at running_xacts's lsn
85  *    |                 |                          |
86  *    | running_xacts with xacts from #2 finished  |
87  *    |                 |                          |
88  *    |                 v                          |
89  *    |    +-------------------------+             |
90  *    +--->|  SNAPBUILD_CONSISTENT   |<------------+
91  *         +-------------------------+
92  *
93  * Initially the machinery is in the START stage. When an xl_running_xacts
94  * record is read that is sufficiently new (above the safe xmin horizon),
95  * there's a state transition. If there were no running xacts when the
96  * running_xacts record was generated, we'll directly go into CONSISTENT
97  * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
98  * snapshot means that all transactions that start henceforth can be decoded
99  * in their entirety, but transactions that started previously can't. In
100  * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
101  * running transactions have committed or aborted.
102  *
103  * Only transactions that commit after CONSISTENT state has been reached will
104  * be replayed, even though they might have started while still in
105  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
106  * changes have been exported, but all the following ones will be. That point
107  * is a convenient point to initialize replication from, which is why we
108  * export a snapshot at that point, which *can* be used to read normal data.
109  *
110  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
111  *
112  * IDENTIFICATION
113  * src/backend/replication/snapbuild.c
114  *
115  *-------------------------------------------------------------------------
116  */
117 
118 #include "postgres.h"
119 
120 #include <sys/stat.h>
121 #include <unistd.h>
122 
123 #include "miscadmin.h"
124 
125 #include "access/heapam_xlog.h"
126 #include "access/transam.h"
127 #include "access/xact.h"
128 
129 #include "pgstat.h"
130 
131 #include "replication/logical.h"
133 #include "replication/snapbuild.h"
134 
135 #include "utils/builtins.h"
136 #include "utils/memutils.h"
137 #include "utils/snapshot.h"
138 #include "utils/snapmgr.h"
139 #include "utils/tqual.h"
140 
141 #include "storage/block.h" /* debugging output */
142 #include "storage/fd.h"
143 #include "storage/lmgr.h"
144 #include "storage/proc.h"
145 #include "storage/procarray.h"
146 #include "storage/standby.h"
147 
148 /*
149  * This struct contains the current state of the snapshot building
150  * machinery. Besides a forward declaration in the header, it is not exposed
151  * to the public, so we can easily change its contents.
     *
     * NOTE(review): several member declarations are absent from this listing;
     * only their comments remain. Code further down accesses them as
     * ->state, ->context, ->xmin, ->xmax, ->start_decoding_at,
     * ->initial_xmin_horizon, ->building_full_snapshot, ->snapshot, ->reorder,
     * ->was_running.was_xmax, ->committed.includes_all_transactions and
     * ->committed.xip.
152  */
153 struct SnapBuild
154 {
155  /* how far are we along building our first full snapshot */
157 
158  /* private memory context used to allocate memory for this module. */
160 
161  /* all transactions < this have committed/aborted */
163 
164  /* all transactions >= this are uncommitted */
166 
167  /*
168  * Don't replay commits from an LSN < this LSN. This can be set externally
169  * but it will also be advanced (never retreat) from within snapbuild.c.
170  */
172 
173  /*
174  * Don't start decoding WAL until the "xl_running_xacts" information
175  * indicates there are no running xids with an xid smaller than this.
176  */
178 
179  /* Indicates if we are building a full snapshot or just a catalog one. */
181 
182  /*
183  * Snapshot that's valid to see the catalog state seen at this moment.
184  */
186 
187  /*
188  * LSN of the last location we are sure a snapshot has been serialized to.
189  */
191 
192  /*
193  * The reorderbuffer we need to update with usable snapshots et al.
194  */
196 
197  /*
198  * Outdated: This struct isn't used for its original purpose anymore, but
199  * can't be removed / changed in a minor version, because it's stored
200  * on-disk.
201  */
202  struct
203  {
204  /*
205  * NB: This field is misused, until a major version can break on-disk
206  * compatibility. See SnapBuildNextPhaseAt() /
207  * SnapBuildStartNextPhaseAt().
208  */
211 
212  size_t was_xcnt; /* number of used xip entries */
213  size_t was_xcnt_space; /* allocated size of xip */
214  TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
215  } was_running;
216 
217  /*
218  * Array of transactions which could have catalog changes that committed
219  * between xmin and xmax.
220  */
221  struct
222  {
223  /* number of committed transactions */
224  size_t xcnt;
225 
226  /* available space for committed transactions */
227  size_t xcnt_space;
228 
229  /*
230  * Until we reach a CONSISTENT state, we record commits of all
231  * transactions, not just the catalog changing ones. Record when that
232  * changes so we know we cannot export a snapshot safely anymore.
233  */
235 
236  /*
237  * Array of committed transactions that have modified the catalog.
238  *
239  * As this array is frequently modified we do *not* keep it in
240  * xidComparator order. Instead we sort the array when building &
241  * distributing a snapshot.
242  *
243  * TODO: It's unclear whether that reasoning has much merit. Every
244  * time we add something here after becoming consistent will also
245  * require distributing a snapshot. Storing them sorted would
246  * potentially also make it easier to purge (but more complicated wrt
247  * wraparound?). Should be improved if sorting while building the
248  * snapshot shows up in profiles.
249  */
251  } committed;
252 };
253 
254 /*
255  * Starting a transaction -- which we need to do while exporting a snapshot --
256  * removes knowledge about the previously used resowner, so we save it here.
     *
     * NOTE(review): the declaration of the saved resource owner
     * (SavedResourceOwnerDuringExport, used by the export/clear functions
     * below) is absent from this listing.
257  */
/*
 * ExportInProgress is set to true when a snapshot export starts and reset to
 * false by the routine that clears the exported snapshot; that routine
 * returns immediately while the flag is false.
 */
259 static bool ExportInProgress = false;
260 
261 /* ->committed manipulation */
262 static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
263 
264 /* snapshot building/manipulation/distribution functions */
265 static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
266 
267 static void SnapBuildFreeSnapshot(Snapshot snap);
268 
269 static void SnapBuildSnapIncRefcount(Snapshot snap);
270 
272 
273 /* xlog reading helper functions for SnapBuildProcessRecord */
274 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
275 static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
276 
277 /* serialization functions */
278 static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
279 static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
280 
281 /*
282  * Return TransactionId after which the next phase of initial snapshot
283  * building will happen.
     *
     * The value is read from builder->was_running.was_xmax, the same field the
     * matching setter writes to.
284  */
285 static inline TransactionId
287 {
288  /*
289  * For backward compatibility reasons this has to be stored in the wrongly
290  * named field. Will be fixed in next major version.
291  */
292  return builder->was_running.was_xmax;
293 }
294 
295 /*
296  * Set TransactionId after which the next phase of initial snapshot building
297  * will happen.
     *
     * 'at' is stored in builder->was_running.was_xmax, the same field the
     * matching getter reads.
298  */
299 static inline void
301 {
302  /*
303  * For backward compatibility reasons this has to be stored in the wrongly
304  * named field. Will be fixed in next major version.
305  */
306  builder->was_running.was_xmax = at;
307 }
308 
309 /*
310  * Allocate a new snapshot builder.
311  *
312  * xmin_horizon is the xid from which on (>=) we can be sure no catalog rows
313  * have been removed; it is stored in builder->initial_xmin_horizon.
     * start_lsn is the LSN from which on (>=) we want to replay commits; it is
     * stored in builder->start_decoding_at.
     * need_full_snapshot is stored in builder->building_full_snapshot.
     * 'reorder' (its parameter line is absent from this listing) is stored in
     * builder->reorder.
     *
     * The builder starts in SNAPBUILD_START with an empty committed-xid array
     * sized for 128 entries and includes_all_transactions set to true. The
     * previously current memory context is restored before returning.
314  */
315 SnapBuild *
317  TransactionId xmin_horizon,
318  XLogRecPtr start_lsn,
319  bool need_full_snapshot)
320 {
321  MemoryContext context;
322  MemoryContext oldcontext;
323  SnapBuild *builder;
324 
325  /* allocate memory in own context, to have better accountability */
327  "snapshot builder context",
329  oldcontext = MemoryContextSwitchTo(context);
330 
     /* the builder itself is allocated while 'context' is current */
331  builder = palloc0(sizeof(SnapBuild));
332 
333  builder->state = SNAPBUILD_START;
334  builder->context = context;
335  builder->reorder = reorder;
336  /* Other struct members initialized by zeroing via palloc0 above */
337 
338  builder->committed.xcnt = 0;
339  builder->committed.xcnt_space = 128; /* arbitrary number */
340  builder->committed.xip =
341  palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
342  builder->committed.includes_all_transactions = true;
343 
344  builder->initial_xmin_horizon = xmin_horizon;
345  builder->start_decoding_at = start_lsn;
346  builder->building_full_snapshot = need_full_snapshot;
347 
348  MemoryContextSwitchTo(oldcontext);
349 
350  return builder;
351 }
352 
353 /*
354  * Free a snapshot builder.
     *
     * The builder's memory context is remembered up front because deleting it
     * is the last step; builder->snapshot, if set, is handled explicitly first.
355  */
356 void
358 {
359  MemoryContext context = builder->context;
360 
361  /* free snapshot explicitly, that contains some error checking */
362  if (builder->snapshot != NULL)
363  {
     /* NOTE(review): the statement releasing the snapshot is absent from this listing */
365  builder->snapshot = NULL;
366  }
367 
368  /* other resources are deallocated by deleting the memory context */
369  MemoryContextDelete(context);
370 }
371 
372 /*
373  * Free an unreferenced snapshot that has previously been built by us.
     *
     * Raises ERROR instead of freeing if the snapshot is marked as copied or
     * still has a non-zero active_count; otherwise the snapshot is pfree'd.
374  */
375 static void
377 {
378  /* make sure we don't get passed an external snapshot */
380 
381  /* make sure nobody modified our snapshot */
382  Assert(snap->curcid == FirstCommandId);
383  Assert(!snap->suboverflowed);
384  Assert(!snap->takenDuringRecovery);
385  Assert(snap->regd_count == 0);
386 
387  /* slightly more likely, so it's checked even without c-asserts */
388  if (snap->copied)
389  elog(ERROR, "cannot free a copied snapshot");
390 
     /* a non-zero active_count means somebody still holds a reference */
391  if (snap->active_count)
392  elog(ERROR, "cannot free an active snapshot");
393 
394  pfree(snap);
395 }
396 
397 /*
398  * In which state of snapshot building are we?
     *
     * Simply returns builder->state.
399  */
402 {
403  return builder->state;
404 }
405 
406 /*
407  * Should the contents of the transaction ending at 'ptr' be skipped, i.e.
     * *not* be decoded?
     *
     * Returns true when 'ptr' lies before builder->start_decoding_at (commits
     * from an LSN below that value are not replayed), false otherwise.
408  */
409 bool
411 {
412  return ptr < builder->start_decoding_at;
413 }
414 
415 /*
416  * Increase refcount of a snapshot.
417  *
418  * This is used when handing out a snapshot to some external resource or when
419  * adding a Snapshot as builder->snapshot.
     *
     * The snapshot's active_count field serves as the reference counter.
420  */
421 static void
423 {
424  snap->active_count++;
425 }
426 
427 /*
428  * Decrease refcount of a snapshot and free if the refcount reaches zero.
429  *
430  * Externally visible, so that external resources that have been handed an
431  * IncRef'ed Snapshot can adjust its refcount easily.
     *
     * The refcount is snap->active_count. Raises ERROR if the snapshot is
     * marked as copied.
432  */
433 void
435 {
436  /* make sure we don't get passed an external snapshot */
438 
439  /* make sure nobody modified our snapshot */
440  Assert(snap->curcid == FirstCommandId);
441  Assert(!snap->suboverflowed);
442  Assert(!snap->takenDuringRecovery);
443 
444  Assert(snap->regd_count == 0);
445 
     /* there has to be a reference left that we can drop */
446  Assert(snap->active_count > 0);
447 
448  /* slightly more likely, so it's checked even without casserts */
449  if (snap->copied)
450  elog(ERROR, "cannot free a copied snapshot");
451 
     /* drop our reference; the last one out frees the snapshot */
452  snap->active_count--;
453  if (snap->active_count == 0)
454  SnapBuildFreeSnapshot(snap);
455 }
456 
457 /*
458  * Build a new snapshot, based on currently committed catalog-modifying
459  * transactions.
460  *
461  * In-progress transactions with catalog access are *not* allowed to modify
462  * these snapshots; they have to copy them and fill in appropriate ->curcid
463  * and ->subxip/subxcnt values.
     *
     * The snapshot is allocated in builder->context and returned with
     * active_count == 0, so it is the caller's job to take a reference.
464  */
465 static Snapshot
467 {
468  Snapshot snapshot;
469  Size ssize;
470 
471  Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
472 
     /*
      * One allocation holds the SnapshotData header, the xip array and one
      * additional TransactionId slot.
      */
473  ssize = sizeof(SnapshotData)
474  + sizeof(TransactionId) * builder->committed.xcnt
475  + sizeof(TransactionId) * 1 /* toplevel xid */ ;
476 
477  snapshot = MemoryContextAllocZero(builder->context, ssize);
478 
480 
481  /*
482  * We misuse the original meaning of SnapshotData's xip and subxip fields
483  * to make them more fitting for our needs.
484  *
485  * In the 'xip' array we store transactions that have to be treated as
486  * committed. Since we will only ever look at tuples from transactions
487  * that have modified the catalog it's more efficient to store those few
488  * that exist between xmin and xmax (frequently there are none).
489  *
490  * Snapshots that are used in transactions that have modified the catalog
491  * also use the 'subxip' array to store their toplevel xid and all the
492  * subtransaction xids so we can recognize when we need to treat rows as
493  * visible that are not in xip but still need to be visible. Subxip only
494  * gets filled when the snapshot is copied into the context of a
495  * catalog modifying transaction since we otherwise share a snapshot
496  * between transactions. As long as a txn hasn't modified the catalog it
497  * doesn't need to treat any uncommitted rows as visible, so there is no
498  * need for those xids.
499  *
500  * Both arrays are qsort'ed so that we can use bsearch() on them.
501  */
502  Assert(TransactionIdIsNormal(builder->xmin));
503  Assert(TransactionIdIsNormal(builder->xmax));
504 
505  snapshot->xmin = builder->xmin;
506  snapshot->xmax = builder->xmax;
507 
508  /* store all transactions to be treated as committed by this snapshot */
     /* the xip array starts directly behind the SnapshotData header */
509  snapshot->xip =
510  (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
511  snapshot->xcnt = builder->committed.xcnt;
512  memcpy(snapshot->xip,
513  builder->committed.xip,
514  builder->committed.xcnt * sizeof(TransactionId));
515 
516  /* sort so we can bsearch() */
517  qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
518 
519  /*
520  * Initially, subxip is empty, i.e. it's a snapshot to be used by
521  * transactions that don't modify the catalog. Will be filled by
522  * ReorderBufferCopySnap() if necessary.
523  */
524  snapshot->subxcnt = 0;
525  snapshot->subxip = NULL;
526 
     /* these values are what the free/decref routines later assert on */
527  snapshot->suboverflowed = false;
528  snapshot->takenDuringRecovery = false;
529  snapshot->copied = false;
530  snapshot->curcid = FirstCommandId;
531  snapshot->active_count = 0;
532  snapshot->regd_count = 0;
533 
534  return snapshot;
535 }
536 
537 /*
538  * Build the initial slot snapshot and convert it to a normal snapshot that
539  * is understood by HeapTupleSatisfiesMVCC.
540  *
541  * The snapshot will be usable directly in current transaction or exported
542  * for loading in different transaction.
     *
     * Raises ERROR if the builder is not in SNAPBUILD_CONSISTENT state, if not
     * all transactions were tracked (committed.includes_all_transactions is
     * false), or if the converted snapshot would need GetMaxSnapshotXidCount()
     * or more in-progress xids. As a side effect MyPgXact->xmin is set to the
     * snapshot's xmin.
543  */
544 Snapshot
546 {
547  Snapshot snap;
548  TransactionId xid;
549  TransactionId *newxip;
550  int newxcnt = 0;
551 
554 
555  if (builder->state != SNAPBUILD_CONSISTENT)
556  elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
557 
558  if (!builder->committed.includes_all_transactions)
559  elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
560 
561  /* so we don't overwrite the existing value */
     /* NOTE(review): the condition guarding this elog is absent from this listing */
563  elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
564 
565  snap = SnapBuildBuildSnapshot(builder);
566 
567  /*
568  * We know that snap->xmin is alive, enforced by the logical xmin
569  * mechanism. Due to that we can do this without locks, we're only
570  * changing our own value.
571  */
572 #ifdef USE_ASSERT_CHECKING
573  {
574  TransactionId safeXid;
575 
576  LWLockAcquire(ProcArrayLock, LW_SHARED);
577  safeXid = GetOldestSafeDecodingTransactionId(false);
578  LWLockRelease(ProcArrayLock);
579 
580  Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin));
581  }
582 #endif
583 
584  MyPgXact->xmin = snap->xmin;
585 
586  /* allocate in transaction context */
     /* NOTE(review): the allocation call itself is absent from this listing */
587  newxip = (TransactionId *)
589 
590  /*
591  * snapbuild.c builds transactions in an "inverted" manner, which means it
592  * stores committed transactions in ->xip, not ones in progress. Build a
593  * classical snapshot by marking all non-committed transactions as
594  * in-progress. This can be expensive.
595  */
     /*
      * NOTE(review): the for header has no increment clause and the statement
      * advancing xid is absent from this listing.
      */
596  for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
597  {
598  void *test;
599 
600  /*
601  * Check whether transaction committed using the decoding snapshot
602  * meaning of ->xip.
603  */
604  test = bsearch(&xid, snap->xip, snap->xcnt,
605  sizeof(TransactionId), xidComparator);
606 
     /* not found among the committed xids: record it as in-progress */
607  if (test == NULL)
608  {
609  if (newxcnt >= GetMaxSnapshotXidCount())
610  ereport(ERROR,
611  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
612  errmsg("initial slot snapshot too large")));
613 
614  newxip[newxcnt++] = xid;
615  }
616 
618  }
619 
     /* from here on ->xip has its classical "in progress" meaning */
620  snap->xcnt = newxcnt;
621  snap->xip = newxip;
622 
623  return snap;
624 }
625 
626 /*
627  * Export a snapshot so it can be set in another session with SET TRANSACTION
628  * SNAPSHOT.
629  *
630  * For that we need to start a transaction in the current backend as the
631  * importing side checks whether the source transaction is still open to make
632  * sure the xmin horizon hasn't advanced since then.
     *
     * Returns the name handed back by ExportSnapshot(). Raises ERROR if a
     * previous export has not been cleared yet. CurrentResourceOwner is saved
     * in SavedResourceOwnerDuringExport and ExportInProgress is set so that
     * the export can be undone later.
633  */
634 const char *
636 {
637  Snapshot snap;
638  char *snapname;
639 
     /* NOTE(review): the condition guarding this elog is absent from this listing */
641  elog(ERROR, "cannot export a snapshot from within a transaction");
642 
643  if (SavedResourceOwnerDuringExport)
644  elog(ERROR, "can only export one snapshot at a time");
645 
646  SavedResourceOwnerDuringExport = CurrentResourceOwner;
647  ExportInProgress = true;
648 
650 
651  /* There doesn't seem to be a nice API to set these */
653  XactReadOnly = true;
654 
655  snap = SnapBuildInitialSnapshot(builder);
656 
657  /*
658  * now that we've built a plain snapshot, make it active and use the
659  * normal mechanisms for exporting it
660  */
661  snapname = ExportSnapshot(snap);
662 
663  ereport(LOG,
664  (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
665  "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
666  snap->xcnt,
667  snapname, snap->xcnt)));
668  return snapname;
669 }
670 
671 /*
672  * Ensure there is a snapshot and if not build one for current transaction.
     *
     * Returns builder->snapshot, building it first if it is still NULL. May
     * only be called once the builder is in SNAPBUILD_CONSISTENT state.
673  */
674 Snapshot
676 {
677  Assert(builder->state == SNAPBUILD_CONSISTENT);
678 
679  /* only build a new snapshot if we don't have a prebuilt one */
680  if (builder->snapshot == NULL)
681  {
682  builder->snapshot = SnapBuildBuildSnapshot(builder);
683  /* increase refcount for the snapshot builder */
685  }
686 
687  return builder->snapshot;
688 }
689 
690 /*
691  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
692  * any. Aborts the previously started transaction and resets the resource
693  * owner back to its original value.
     *
     * Does nothing when no export is in progress. Raises ERROR if an export is
     * in progress but we are not in a transaction state.
694  */
695 void
697 {
698  /* nothing exported, that is the usual case */
699  if (!ExportInProgress)
700  return;
701 
702  if (!IsTransactionState())
703  elog(ERROR, "clearing exported snapshot in wrong transaction state");
704 
705  /* make sure nothing could have ever happened */
707 
     /* forget the saved resource owner and mark the export as finished */
709  SavedResourceOwnerDuringExport = NULL;
710  ExportInProgress = false;
711 }
712 
713 /*
714  * Handle the effects of a single heap change, appropriate to the current state
715  * of the snapshot builder and returns whether changes made at (xid, lsn) can
716  * be decoded.
     *
     * Returns false while the builder has not reached SNAPBUILD_FULL_SNAPSHOT,
     * and also for the not-yet-consistent case checked below. Otherwise makes
     * sure the reorderbuffer has a base snapshot for 'xid' and returns true.
717  */
718 bool
720 {
721  /*
722  * We can't handle data in transactions if we haven't built a snapshot
723  * yet, so don't store them.
724  */
725  if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
726  return false;
727 
728  /*
729  * No point in keeping track of changes in transactions that we don't have
730  * enough information about to decode. This means that they started before
731  * we got into the SNAPBUILD_FULL_SNAPSHOT state.
732  */
     /* NOTE(review): the second half of this condition is absent from this listing */
733  if (builder->state < SNAPBUILD_CONSISTENT &&
735  return false;
736 
737  /*
738  * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
739  * be needed to decode the change we're currently processing.
740  */
741  if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
742  {
743  /* only build a new snapshot if we don't have a prebuilt one */
744  if (builder->snapshot == NULL)
745  {
746  builder->snapshot = SnapBuildBuildSnapshot(builder);
747  /* increase refcount for the snapshot builder */
749  }
750 
751  /*
752  * Increase refcount for the transaction we're handing the snapshot
753  * out to.
754  */
756  ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
757  builder->snapshot);
758  }
759 
760  return true;
761 }
762 
763 /*
764  * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record.
765  * This implies that a transaction has done some form of write to system
766  * catalogs.
     *
     * Marks 'xid' as having catalog changes, hands the record's cmin/cmax/
     * combocid mapping to the reorderbuffer (keyed by xlrec->top_xid) and
     * queues a new command id for 'xid'. Raises ERROR if neither cmin nor cmax
     * of the record is valid.
767  */
768 void
770  XLogRecPtr lsn, xl_heap_new_cid *xlrec)
771 {
772  CommandId cid;
773 
774  /*
775  * we only log new_cid's if a catalog tuple was modified, so mark the
776  * transaction as containing catalog modifications
777  */
778  ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
779 
780  ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
781  xlrec->target_node, xlrec->target_tid,
782  xlrec->cmin, xlrec->cmax,
783  xlrec->combocid);
784 
785  /* figure out new command id */
     /* use the larger of cmin/cmax if both are valid, else whichever is valid */
786  if (xlrec->cmin != InvalidCommandId &&
787  xlrec->cmax != InvalidCommandId)
788  cid = Max(xlrec->cmin, xlrec->cmax);
789  else if (xlrec->cmax != InvalidCommandId)
790  cid = xlrec->cmax;
791  else if (xlrec->cmin != InvalidCommandId)
792  cid = xlrec->cmin;
793  else
794  {
795  cid = InvalidCommandId; /* silence compiler */
796  elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
797  }
798 
     /* the command id queued is the one following the id found above */
799  ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
800 }
801 
802 /*
803  * Add a new Snapshot to all transactions we're decoding that currently are
804  * in-progress so they can see new catalog contents made by the transaction
805  * that just committed. This is necessary because those in-progress
806  * transactions will use the new catalog's contents from here on (at the very
807  * least everything they do needs to be compatible with newer catalog
808  * contents).
     *
     * The snapshot handed out is builder->snapshot; transactions that have no
     * base snapshot yet are skipped.
809  */
810 static void
812 {
813  dlist_iter txn_i;
814  ReorderBufferTXN *txn;
815 
816  /*
817  * Iterate through all toplevel transactions. This can include
818  * subtransactions which we just don't yet know to be that, but that's
819  * fine, they will just get an unnecessary snapshot queued.
820  */
821  dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
822  {
823  txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
824 
826 
827  /*
828  * If we don't have a base snapshot yet, there are no changes in this
829  * transaction which in turn implies we don't yet need a snapshot at
830  * all. We'll add a snapshot when the first change gets queued.
831  *
832  * NB: This works correctly even for subtransactions because
833  * ReorderBufferCommitChild() takes care to pass the parent the base
834  * snapshot, and while iterating the changequeue we'll get the change
835  * from the subtxn.
836  */
837  if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
838  continue;
839 
     /* the LSN is printed as two 32 bit halves, high part first */
840  elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
841  txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
842 
843  /*
844  * increase the snapshot's refcount for the transaction we are handing
845  * it out to
846  */
848  ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
849  builder->snapshot);
850  }
851 }
852 
853 /*
854  * Keep track of a new catalog changing transaction that has committed.
     *
     * Appends 'xid' to builder->committed.xip, enlarging the array first when
     * it is full. The array is left unsorted.
855  */
856 static void
858 {
860 
     /* array is full: grow it to twice the current space plus one */
861  if (builder->committed.xcnt == builder->committed.xcnt_space)
862  {
863  builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
864 
865  elog(DEBUG1, "increasing space for committed transactions to %u",
866  (uint32) builder->committed.xcnt_space);
867 
868  builder->committed.xip = repalloc(builder->committed.xip,
869  builder->committed.xcnt_space * sizeof(TransactionId));
870  }
871 
872  /*
873  * TODO: It might make sense to keep the array sorted here instead of
874  * doing it every time we build a new snapshot. On the other hand this
875  * gets called repeatedly when a transaction with subtransactions commits.
876  */
877  builder->committed.xip[builder->committed.xcnt++] = xid;
878 }
879 
880 /*
881  * Remove knowledge about transactions we treat as committed that are smaller
882  * than ->xmin. Those won't ever get checked via the ->committed array but via
883  * the clog machinery, so we don't need to waste memory on them.
     *
     * Does nothing while builder->xmin is not a normal xid. Surviving xids
     * keep their relative order, and committed.xcnt is reduced to the number
     * of survivors; the allocated space (xcnt_space) is left unchanged.
884  */
885 static void
887 {
     /* NOTE(review): 'off' is an int but is compared against the size_t xcnt */
888  int off;
889  TransactionId *workspace;
890  int surviving_xids = 0;
891 
892  /* not ready yet */
893  if (!TransactionIdIsNormal(builder->xmin))
894  return;
895 
896  /* TODO: Neater algorithm than just copying and iterating? */
897  workspace =
898  MemoryContextAlloc(builder->context,
899  builder->committed.xcnt * sizeof(TransactionId));
900 
901  /* copy xids that still are interesting to workspace */
902  for (off = 0; off < builder->committed.xcnt; off++)
903  {
904  if (NormalTransactionIdPrecedes(builder->committed.xip[off],
905  builder->xmin))
906  ; /* remove */
907  else
908  workspace[surviving_xids++] = builder->committed.xip[off];
909  }
910 
911  /* copy workspace back to persistent state */
912  memcpy(builder->committed.xip, workspace,
913  surviving_xids * sizeof(TransactionId));
914 
     /* log the old count before it gets overwritten below */
915  elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
916  (uint32) builder->committed.xcnt, (uint32) surviving_xids,
917  builder->xmin, builder->xmax);
918  builder->committed.xcnt = surviving_xids;
919 
920  pfree(workspace);
921 }
922 
923 /*
924  * Handle everything that needs to be done when a transaction commits
925  */
926 void
928  int nsubxacts, TransactionId *subxacts)
929 {
930  int nxact;
931 
932  bool needs_snapshot = false;
933  bool needs_timetravel = false;
934  bool sub_needs_timetravel = false;
935 
936  TransactionId xmax = xid;
937 
938  /*
939  * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
940  * will they be part of a snapshot. So we don't need to record anything.
941  */
942  if (builder->state == SNAPBUILD_START ||
943  (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
945  {
946  /* ensure that only commits after this are getting replayed */
947  if (builder->start_decoding_at <= lsn)
948  builder->start_decoding_at = lsn + 1;
949  return;
950  }
951 
952  if (builder->state < SNAPBUILD_CONSISTENT)
953  {
954  /* ensure that only commits after this are getting replayed */
955  if (builder->start_decoding_at <= lsn)
956  builder->start_decoding_at = lsn + 1;
957 
958  /*
959  * If building an exportable snapshot, force xid to be tracked, even
960  * if the transaction didn't modify the catalog.
961  */
962  if (builder->building_full_snapshot)
963  {
964  needs_timetravel = true;
965  }
966  }
967 
968  for (nxact = 0; nxact < nsubxacts; nxact++)
969  {
970  TransactionId subxid = subxacts[nxact];
971 
972  /*
973  * Add subtransaction to base snapshot if catalog modifying, we don't
974  * distinguish to toplevel transactions there.
975  */
976  if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
977  {
978  sub_needs_timetravel = true;
979  needs_snapshot = true;
980 
981  elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
982  xid, subxid);
983 
984  SnapBuildAddCommittedTxn(builder, subxid);
985 
986  if (NormalTransactionIdFollows(subxid, xmax))
987  xmax = subxid;
988  }
989 
990  /*
991  * If we're forcing timetravel we also need visibility information
992  * about subtransaction, so keep track of subtransaction's state, even
993  * if not catalog modifying. Don't need to distribute a snapshot in
994  * that case.
995  */
996  else if (needs_timetravel)
997  {
998  SnapBuildAddCommittedTxn(builder, subxid);
999  if (NormalTransactionIdFollows(subxid, xmax))
1000  xmax = subxid;
1001  }
1002  }
1003 
1004  /* if top-level modified catalog, it'll need a snapshot */
1005  if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1006  {
1007  elog(DEBUG2, "found top level transaction %u, with catalog changes",
1008  xid);
1009  needs_snapshot = true;
1010  needs_timetravel = true;
1011  SnapBuildAddCommittedTxn(builder, xid);
1012  }
1013  else if (sub_needs_timetravel)
1014  {
1015  /* track toplevel txn as well, subxact alone isn't meaningful */
1016  SnapBuildAddCommittedTxn(builder, xid);
1017  }
1018  else if (needs_timetravel)
1019  {
1020  elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1021 
1022  SnapBuildAddCommittedTxn(builder, xid);
1023  }
1024 
1025  if (!needs_timetravel)
1026  {
1027  /* record that we cannot export a general snapshot anymore */
1028  builder->committed.includes_all_transactions = false;
1029  }
1030 
1031  Assert(!needs_snapshot || needs_timetravel);
1032 
1033  /*
1034  * Adjust xmax of the snapshot builder, we only do that for committed,
1035  * catalog modifying, transactions, everything else isn't interesting for
1036  * us since we'll never look at the respective rows.
1037  */
1038  if (needs_timetravel &&
1039  (!TransactionIdIsValid(builder->xmax) ||
1040  TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1041  {
1042  builder->xmax = xmax;
1043  TransactionIdAdvance(builder->xmax);
1044  }
1045 
1046  /* if there's any reason to build a historic snapshot, do so now */
1047  if (needs_snapshot)
1048  {
1049  /*
1050  * If we haven't built a complete snapshot yet there's no need to hand
1051  * it out, it wouldn't (and couldn't) be used anyway.
1052  */
1053  if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1054  return;
1055 
1056  /*
1057  * Decrease the snapshot builder's refcount of the old snapshot, note
1058  * that it still will be used if it has been handed out to the
1059  * reorderbuffer earlier.
1060  */
1061  if (builder->snapshot)
1063 
1064  builder->snapshot = SnapBuildBuildSnapshot(builder);
1065 
1066  /* we might need to execute invalidations, add snapshot */
1067  if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1068  {
1070  ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1071  builder->snapshot);
1072  }
1073 
1074  /* refcount of the snapshot builder for the new snapshot */
1076 
1077  /* add a new Snapshot to all currently running transactions */
1079  }
1080 }
1081 
1082 
1083 /* -----------------------------------
1084  * Snapshot building functions dealing with xlog records
1085  * -----------------------------------
1086  */
1087 
1088 /*
1089  * Process a running xacts record, and use its information to first build a
1090  * historic snapshot and later to release resources that aren't needed
1091  * anymore.
1092  */
1093 void
1095 {
1096  ReorderBufferTXN *txn;
1097 
1098  /*
1099  * If we're not consistent yet, inspect the record to see whether it
1100  * allows us to get closer to being consistent. If we are consistent, dump
1101  * our snapshot so others or we, after a restart, can use it.
1102  */
1103  if (builder->state < SNAPBUILD_CONSISTENT)
1104  {
1105  /* returns false if there's no point in performing cleanup just yet */
1106  if (!SnapBuildFindSnapshot(builder, lsn, running))
1107  return;
1108  }
1109  else
1110  SnapBuildSerialize(builder, lsn);
1111 
1112  /*
1113  * Update range of interesting xids based on the running xacts
1114  * information. We don't increase ->xmax using it, because once we are in
1115  * a consistent state we can do that ourselves and much more efficiently
1116  * so, because we only need to do it for catalog transactions since we
1117  * only ever look at those.
1118  *
1119  * NB: We only increase xmax when a catalog modifying transaction commits
1120  * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1121  * xmin, which looks odd but is correct and actually more efficient, since
1122  * we hit fast paths in tqual.c.
1123  */
1124  builder->xmin = running->oldestRunningXid;
1125 
1126  /* Remove transactions we don't need to keep track of anymore */
1127  SnapBuildPurgeCommittedTxn(builder);
1128 
1129  elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u",
1130  builder->xmin, builder->xmax,
1131  running->oldestRunningXid);
1132 
1133  /*
1134  * Increase shared memory limits, so vacuum can work on tuples we
1135  * prevented from being pruned till now.
1136  */
1138 
1139  /*
1140  * Also tell the slot where we can restart decoding from. We don't want to
1141  * do that after every commit because changing that implies an fsync of
1142  * the logical slot's state file, so we only do it every time we see a
1143  * running xacts record.
1144  *
1145  * Do so by looking for the oldest in progress transaction (determined by
1146  * the first LSN of any of its relevant records). Every transaction
1147  * remembers the last location we stored the snapshot to disk before its
1148  * beginning. That point is where we can restart from.
1149  */
1150 
1151  /*
1152  * Can't know about a serialized snapshot's location if we're not
1153  * consistent.
1154  */
1155  if (builder->state < SNAPBUILD_CONSISTENT)
1156  return;
1157 
1158  txn = ReorderBufferGetOldestTXN(builder->reorder);
1159 
1160  /*
1161  * oldest ongoing txn might have started when we didn't yet serialize
1162  * anything because we hadn't reached a consistent state yet.
1163  */
1164  if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1166 
1167  /*
1168  * No in-progress transaction, can reuse the last serialized snapshot if
1169  * we have one.
1170  */
1171  else if (txn == NULL &&
1175  builder->last_serialized_snapshot);
1176 }
1177 
1178 
1179 /*
1180  * Build the start of a snapshot that's capable of decoding the catalog.
1181  *
1182  * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1183  * consistent.
1184  *
1185  * Returns true if there is a point in performing internal maintenance/cleanup
1186  * using the xl_running_xacts record.
1187  */
1188 static bool
1190 {
1191  /* ---
1192  * Build catalog decoding snapshot incrementally using information about
1193  * the currently running transactions. There are several ways to do that:
1194  *
1195  * a) There were no running transactions when the xl_running_xacts record
1196  * was inserted, jump to CONSISTENT immediately. We might find such a
1197  * state while waiting on c)'s sub-states.
1198  *
1199  * b) This (in a previous run) or another decoding slot serialized a
1200  * snapshot to disk that we can use. Can't use this method for the
1201  * initial snapshot when slot is being created and needs full snapshot
1202  * for export or direct use, as that snapshot will only contain catalog
1203  * modifying transactions.
1204  *
1205  * c) First incrementally build a snapshot for catalog tuples
1206  * (BUILDING_SNAPSHOT), that requires all, already in-progress,
1207  * transactions to finish. Every transaction starting after that
1208  * (FULL_SNAPSHOT state), has enough information to be decoded. But
1209  * for older running transactions no viable snapshot exists yet, so
1210  * CONSISTENT will only be reached once all of those have finished.
1211  * ---
1212  */
1213 
1214  /*
1215  * xl_running_xact record is older than what we can use, we might not have
1216  * all necessary catalog rows anymore.
1217  */
1220  builder->initial_xmin_horizon))
1221  {
1222  ereport(DEBUG1,
1223  (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
1224  (uint32) (lsn >> 32), (uint32) lsn),
1225  errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1226  builder->initial_xmin_horizon, running->oldestRunningXid)));
1227 
1228 
1229  SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1230 
1231  return true;
1232  }
1233 
1234  /*
1235  * a) No transactions were running, we can jump to consistent.
1236  *
1237  * This is not affected by races around xl_running_xacts, because we can
1238  * miss transaction commits, but currently not transactions starting.
1239  *
1240  * NB: We might have already started to incrementally assemble a snapshot,
1241  * so we need to be careful to deal with that.
1242  */
1243  if (running->oldestRunningXid == running->nextXid)
1244  {
1245  if (builder->start_decoding_at == InvalidXLogRecPtr ||
1246  builder->start_decoding_at <= lsn)
1247  /* can decode everything after this */
1248  builder->start_decoding_at = lsn + 1;
1249 
1250  /* As no transactions were running xmin/xmax can be trivially set. */
1251  builder->xmin = running->nextXid; /* < are finished */
1252  builder->xmax = running->nextXid; /* >= are running */
1253 
1254  /* so we can safely use the faster comparisons */
1255  Assert(TransactionIdIsNormal(builder->xmin));
1256  Assert(TransactionIdIsNormal(builder->xmax));
1257 
1258  builder->state = SNAPBUILD_CONSISTENT;
1260 
1261  ereport(LOG,
1262  (errmsg("logical decoding found consistent point at %X/%X",
1263  (uint32) (lsn >> 32), (uint32) lsn),
1264  errdetail("There are no running transactions.")));
1265 
1266  return false;
1267  }
1268  /* b) valid on disk state and not building full snapshot */
1269  else if (!builder->building_full_snapshot &&
1270  SnapBuildRestore(builder, lsn))
1271  {
1272  /* there won't be any state to cleanup */
1273  return false;
1274  }
1275 
1276  /*
1277  * c) transition from START to BUILDING_SNAPSHOT.
1278  *
1279  * In START state, and a xl_running_xacts record with running xacts is
1280  * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
1281  * record xl_running_xacts->nextXid. Once all running xacts have finished
1282  * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
1283  * might look that we could use xl_running_xact's ->xids information to
1284  * get there quicker, but that is problematic because transactions marked
1285  * as running, might already have inserted their commit record - it's
1286  * infeasible to change that with locking.
1287  */
1288  else if (builder->state == SNAPBUILD_START)
1289  {
1291  SnapBuildStartNextPhaseAt(builder, running->nextXid);
1292 
1293  /*
1294  * Start with an xmin/xmax that's correct for future, when all the
1295  * currently running transactions have finished. We'll update both
1296  * while waiting for the pending transactions to finish.
1297  */
1298  builder->xmin = running->nextXid; /* < are finished */
1299  builder->xmax = running->nextXid; /* >= are running */
1300 
1301  /* so we can safely use the faster comparisons */
1302  Assert(TransactionIdIsNormal(builder->xmin));
1303  Assert(TransactionIdIsNormal(builder->xmax));
1304 
1305  ereport(LOG,
1306  (errmsg("logical decoding found initial starting point at %X/%X",
1307  (uint32) (lsn >> 32), (uint32) lsn),
1308  errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1309  running->xcnt, running->nextXid)));
1310 
1311  SnapBuildWaitSnapshot(running, running->nextXid);
1312  }
1313 
1314  /*
1315  * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1316  *
1317  * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1318  * is >= nextXid from when we switched to BUILDING_SNAPSHOT. This
1319  * means all transactions starting afterwards have enough information to
1320  * be decoded. Switch to FULL_SNAPSHOT.
1321  */
1322  else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1324  running->oldestRunningXid))
1325  {
1326  builder->state = SNAPBUILD_FULL_SNAPSHOT;
1327  SnapBuildStartNextPhaseAt(builder, running->nextXid);
1328 
1329  ereport(LOG,
1330  (errmsg("logical decoding found initial consistent point at %X/%X",
1331  (uint32) (lsn >> 32), (uint32) lsn),
1332  errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1333  running->xcnt, running->nextXid)));
1334 
1335  SnapBuildWaitSnapshot(running, running->nextXid);
1336  }
1337 
1338  /*
1339  * c) transition from FULL_SNAPSHOT to CONSISTENT.
1340  *
1341  * In FULL_SNAPSHOT state (see c) above), and this xl_running_xacts'
1342  * oldestRunningXid is >= nextXid from when we switched to
1343  * FULL_SNAPSHOT. This means all transactions that are currently in
1344  * progress have a catalog snapshot, and all their changes have been
1345  * collected. Switch to CONSISTENT.
1346  */
1347  else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1349  running->oldestRunningXid))
1350  {
1351  builder->state = SNAPBUILD_CONSISTENT;
1353 
1354  ereport(LOG,
1355  (errmsg("logical decoding found consistent point at %X/%X",
1356  (uint32) (lsn >> 32), (uint32) lsn),
1357  errdetail("There are no old transactions anymore.")));
1358  }
1359 
1360  /*
1361  * We already started to track running xacts and need to wait for all
1362  * in-progress ones to finish. We fall through to the normal processing of
1363  * records so incremental cleanup can be performed.
1364  */
1365  return true;
1366 
1367 }
1368 
1369 /* ---
1370  * Iterate through xids in record, wait for all older than the cutoff to
1371  * finish. Then, if possible, log a new xl_running_xacts record.
1372  *
1373  * This isn't required for the correctness of decoding, but to:
1374  * a) allow isolationtester to notice that we're currently waiting for
1375  * something.
1376  * b) log a new xl_running_xacts record where it'd be helpful, without having
1377  * to wait for bgwriter or checkpointer.
1378  * ---
1379  */
1380 static void
1382 {
1383  int off;
1384 
1385  for (off = 0; off < running->xcnt; off++)
1386  {
1387  TransactionId xid = running->xids[off];
1388 
1389  /*
1390  * Upper layers should prevent that we ever need to wait on ourselves.
1391  * Check anyway, since failing to do so would either result in an
1392  * endless wait or an Assert() failure.
1393  */
1395  elog(ERROR, "waiting for ourselves");
1396 
1397  if (TransactionIdFollows(xid, cutoff))
1398  continue;
1399 
1400  XactLockTableWait(xid, NULL, NULL, XLTW_None);
1401  }
1402 
1403  /*
1404  * All transactions we needed to finish finished - try to ensure there is
1405  * another xl_running_xacts record in a timely manner, without having to
1406  * wait for bgwriter or checkpointer to log one. During recovery we
1407  * can't enforce that, so we'll have to wait.
1408  */
1409  if (!RecoveryInProgress())
1410  {
1412  }
1413 }
1414 
1415 /* -----------------------------------
1416  * Snapshot serialization support
1417  * -----------------------------------
1418  */
1419 
1420 /*
1421  * We store current state of struct SnapBuild on disk in the following manner:
1422  *
1423  * struct SnapBuildOnDisk;
1424  * TransactionId * running.xcnt_space;
1425  * TransactionId * committed.xcnt; (*not xcnt_space*)
1426  *
1427  */
1428 typedef struct SnapBuildOnDisk
1429 {
1430  /* first part of this struct needs to be version independent */
1431 
1432  /* data not covered by checksum */
1435 
1436  /* data covered by checksum */
1437 
1438  /* version, in case we want to support pg_upgrade */
1440  /* how large is the on disk data, excluding the constant sized part */
1442 
1443  /* version dependent part */
1445 
1446  /* variable amount of TransactionIds follows */
1447 } SnapBuildOnDisk;
1448 
1449 #define SnapBuildOnDiskConstantSize \
1450  offsetof(SnapBuildOnDisk, builder)
1451 #define SnapBuildOnDiskNotChecksummedSize \
1452  offsetof(SnapBuildOnDisk, version)
1453 
1454 #define SNAPBUILD_MAGIC 0x51A1E001
1455 #define SNAPBUILD_VERSION 2
1456 
1457 /*
1458  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1459  *
1460  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1461  * a record that's a potential location for a serialized snapshot.
1462  */
1463 void
1465 {
1466  if (builder->state < SNAPBUILD_CONSISTENT)
1467  SnapBuildRestore(builder, lsn);
1468  else
1469  SnapBuildSerialize(builder, lsn);
1470 }
1471 
1472 /*
1473  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1474  * been done by another decoding process.
1475  */
1476 static void
1478 {
1479  Size needed_length;
1480  SnapBuildOnDisk *ondisk;
1481  char *ondisk_c;
1482  int fd;
1483  char tmppath[MAXPGPATH];
1484  char path[MAXPGPATH];
1485  int ret;
1486  struct stat stat_buf;
1487  Size sz;
1488 
1489  Assert(lsn != InvalidXLogRecPtr);
1491  builder->last_serialized_snapshot <= lsn);
1492 
1493  /*
1494  * no point in serializing if we cannot continue to work immediately after
1495  * restoring the snapshot
1496  */
1497  if (builder->state < SNAPBUILD_CONSISTENT)
1498  return;
1499 
1500  /*
1501  * We identify snapshots by the LSN they are valid for. We don't need to
1502  * include timelines in the name as each LSN maps to exactly one timeline
1503  * unless the user used pg_resetwal or similar. If a user did so, there's
1504  * no hope continuing to decode anyway.
1505  */
1506  sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1507  (uint32) (lsn >> 32), (uint32) lsn);
1508 
1509  /*
1510  * first check whether some other backend already has written the snapshot
1511  * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1512  * as a valid state. Everything else is an unexpected error.
1513  */
1514  ret = stat(path, &stat_buf);
1515 
1516  if (ret != 0 && errno != ENOENT)
1517  ereport(ERROR,
1518  (errmsg("could not stat file \"%s\": %m", path)));
1519 
1520  else if (ret == 0)
1521  {
1522  /*
1523  * somebody else has already serialized to this point, don't overwrite
1524  * but remember location, so we don't need to read old data again.
1525  *
1526  * To be sure it has been synced to disk after the rename() from the
1527  * tempfile filename to the real filename, we just repeat the fsync.
1528  * That ought to be cheap because in most scenarios it should already
1529  * be safely on disk.
1530  */
1531  fsync_fname(path, false);
1532  fsync_fname("pg_logical/snapshots", true);
1533 
1534  builder->last_serialized_snapshot = lsn;
1535  goto out;
1536  }
1537 
1538  /*
1539  * there is an obvious race condition here between the time we stat(2) the
1540  * file and us writing the file. But we rename the file into place
1541  * atomically and all files created need to contain the same data anyway,
1542  * so this is perfectly fine, although a bit of a resource waste. Locking
1543  * seems like pointless complication.
1544  */
1545  elog(DEBUG1, "serializing snapshot to %s", path);
1546 
1547  /* to make sure only we will write to this tempfile, include pid */
1548  sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
1549  (uint32) (lsn >> 32), (uint32) lsn, MyProcPid);
1550 
1551  /*
1552  * Unlink temporary file if it already exists; it must be left over from
1553  * before a crash/error, since we won't enter this function twice from within
1554  * a single decoding slot/backend and the temporary file contains the pid of
1555  * the current process.
1556  */
1557  if (unlink(tmppath) != 0 && errno != ENOENT)
1558  ereport(ERROR,
1560  errmsg("could not remove file \"%s\": %m", tmppath)));
1561 
1562  needed_length = sizeof(SnapBuildOnDisk) +
1563  sizeof(TransactionId) * builder->committed.xcnt;
1564 
1565  ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
1566  ondisk = (SnapBuildOnDisk *) ondisk_c;
1567  ondisk->magic = SNAPBUILD_MAGIC;
1568  ondisk->version = SNAPBUILD_VERSION;
1569  ondisk->length = needed_length;
1570  INIT_CRC32C(ondisk->checksum);
1571  COMP_CRC32C(ondisk->checksum,
1572  ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1574  ondisk_c += sizeof(SnapBuildOnDisk);
1575 
1576  memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1577  /* NULL-ify memory-only data */
1578  ondisk->builder.context = NULL;
1579  ondisk->builder.snapshot = NULL;
1580  ondisk->builder.reorder = NULL;
1581  ondisk->builder.committed.xip = NULL;
1582 
1583  COMP_CRC32C(ondisk->checksum,
1584  &ondisk->builder,
1585  sizeof(SnapBuild));
1586 
1587  /* there shouldn't be any running xacts */
1588  Assert(builder->was_running.was_xcnt == 0);
1589 
1590  /* copy committed xacts */
1591  sz = sizeof(TransactionId) * builder->committed.xcnt;
1592  memcpy(ondisk_c, builder->committed.xip, sz);
1593  COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1594  ondisk_c += sz;
1595 
1596  FIN_CRC32C(ondisk->checksum);
1597 
1598  /* we have valid data now, open tempfile and write it there */
1599  fd = OpenTransientFile(tmppath,
1600  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1601  S_IRUSR | S_IWUSR);
1602  if (fd < 0)
1603  ereport(ERROR,
1604  (errmsg("could not open file \"%s\": %m", tmppath)));
1605 
1607  if ((write(fd, ondisk, needed_length)) != needed_length)
1608  {
1609  CloseTransientFile(fd);
1610  ereport(ERROR,
1612  errmsg("could not write to file \"%s\": %m", tmppath)));
1613  }
1615 
1616  /*
1617  * fsync the file before renaming so that even if we crash after this we
1618  * have either a fully valid file or nothing.
1619  *
1620  * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1621  * some noticeable overhead since it's performed synchronously during
1622  * decoding?
1623  */
1625  if (pg_fsync(fd) != 0)
1626  {
1627  CloseTransientFile(fd);
1628  ereport(ERROR,
1630  errmsg("could not fsync file \"%s\": %m", tmppath)));
1631  }
1633  CloseTransientFile(fd);
1634 
1635  fsync_fname("pg_logical/snapshots", true);
1636 
1637  /*
1638  * We may overwrite the work from some other backend, but that's ok, our
1639  * snapshot is valid as well, we'll just have done some superfluous work.
1640  */
1641  if (rename(tmppath, path) != 0)
1642  {
1643  ereport(ERROR,
1645  errmsg("could not rename file \"%s\" to \"%s\": %m",
1646  tmppath, path)));
1647  }
1648 
1649  /* make sure we persist */
1650  fsync_fname(path, false);
1651  fsync_fname("pg_logical/snapshots", true);
1652 
1653  /*
1654  * Now there's no way we can lose the dumped state anymore, remember this
1655  * as a serialization point.
1656  */
1657  builder->last_serialized_snapshot = lsn;
1658 
1659 out:
1661  builder->last_serialized_snapshot);
1662 }
1663 
1664 /*
1665  * Restore a snapshot into 'builder' if previously one has been stored at the
1666  * location indicated by 'lsn'. Returns true if successful, false otherwise.
1667  */
1668 static bool
1670 {
1671  SnapBuildOnDisk ondisk;
1672  int fd;
1673  char path[MAXPGPATH];
1674  Size sz;
1675  int readBytes;
1676  pg_crc32c checksum;
1677 
1678  /* no point in loading a snapshot if we're already there */
1679  if (builder->state == SNAPBUILD_CONSISTENT)
1680  return false;
1681 
1682  sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1683  (uint32) (lsn >> 32), (uint32) lsn);
1684 
1685  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
1686 
1687  if (fd < 0 && errno == ENOENT)
1688  return false;
1689  else if (fd < 0)
1690  ereport(ERROR,
1692  errmsg("could not open file \"%s\": %m", path)));
1693 
1694  /* ----
1695  * Make sure the snapshot had been stored safely to disk, that's normally
1696  * cheap.
1697  * Note that we do not need PANIC here, nobody will be able to use the
1698  * slot without fsyncing, and saving it won't succeed without an fsync()
1699  * either...
1700  * ----
1701  */
1702  fsync_fname(path, false);
1703  fsync_fname("pg_logical/snapshots", true);
1704 
1705 
1706  /* read statically sized portion of snapshot */
1708  readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
1710  if (readBytes != SnapBuildOnDiskConstantSize)
1711  {
1712  CloseTransientFile(fd);
1713  ereport(ERROR,
1715  errmsg("could not read file \"%s\", read %d of %d: %m",
1716  path, readBytes, (int) SnapBuildOnDiskConstantSize)));
1717  }
1718 
1719  if (ondisk.magic != SNAPBUILD_MAGIC)
1720  ereport(ERROR,
1721  (errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1722  path, ondisk.magic, SNAPBUILD_MAGIC)));
1723 
1724  if (ondisk.version != SNAPBUILD_VERSION)
1725  ereport(ERROR,
1726  (errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1727  path, ondisk.version, SNAPBUILD_VERSION)));
1728 
1729  INIT_CRC32C(checksum);
1730  COMP_CRC32C(checksum,
1731  ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1733 
1734  /* read SnapBuild */
1736  readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
1738  if (readBytes != sizeof(SnapBuild))
1739  {
1740  CloseTransientFile(fd);
1741  ereport(ERROR,
1743  errmsg("could not read file \"%s\", read %d of %d: %m",
1744  path, readBytes, (int) sizeof(SnapBuild))));
1745  }
1746  COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
1747 
1748  /* restore running xacts (dead, but kept for backward compat) */
1749  sz = sizeof(TransactionId) * ondisk.builder.was_running.was_xcnt_space;
1750  ondisk.builder.was_running.was_xip =
1751  MemoryContextAllocZero(builder->context, sz);
1753  readBytes = read(fd, ondisk.builder.was_running.was_xip, sz);
1755  if (readBytes != sz)
1756  {
1757  CloseTransientFile(fd);
1758  ereport(ERROR,
1760  errmsg("could not read file \"%s\", read %d of %d: %m",
1761  path, readBytes, (int) sz)));
1762  }
1763  COMP_CRC32C(checksum, ondisk.builder.was_running.was_xip, sz);
1764 
1765  /* restore committed xacts information */
1766  sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1767  ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1769  readBytes = read(fd, ondisk.builder.committed.xip, sz);
1771  if (readBytes != sz)
1772  {
1773  CloseTransientFile(fd);
1774  ereport(ERROR,
1776  errmsg("could not read file \"%s\", read %d of %d: %m",
1777  path, readBytes, (int) sz)));
1778  }
1779  COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
1780 
1781  CloseTransientFile(fd);
1782 
1783  FIN_CRC32C(checksum);
1784 
1785  /* verify checksum of what we've read */
1786  if (!EQ_CRC32C(checksum, ondisk.checksum))
1787  ereport(ERROR,
1789  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1790  path, checksum, ondisk.checksum)));
1791 
1792  /*
1793  * ok, we now have a sensible snapshot here, figure out if it has more
1794  * information than we have.
1795  */
1796 
1797  /*
1798  * We are only interested in consistent snapshots for now, comparing
1799  * whether one incomplete snapshot is more "advanced" seems to be
1800  * unnecessarily complex.
1801  */
1802  if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1803  goto snapshot_not_interesting;
1804 
1805  /*
1806  * Don't use a snapshot that requires an xmin that we cannot guarantee to
1807  * be available.
1808  */
1810  goto snapshot_not_interesting;
1811 
1812 
1813  /* ok, we think the snapshot is sensible, copy over everything important */
1814  builder->xmin = ondisk.builder.xmin;
1815  builder->xmax = ondisk.builder.xmax;
1816  builder->state = ondisk.builder.state;
1817 
1818  builder->committed.xcnt = ondisk.builder.committed.xcnt;
1819  /* We only allocated/stored xcnt, not xcnt_space xids ! */
1820  /* don't overwrite preallocated xip, if we don't have anything here */
1821  if (builder->committed.xcnt > 0)
1822  {
1823  pfree(builder->committed.xip);
1824  builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1825  builder->committed.xip = ondisk.builder.committed.xip;
1826  }
1827  ondisk.builder.committed.xip = NULL;
1828 
1829  /* our snapshot is not interesting anymore, build a new one */
1830  if (builder->snapshot != NULL)
1831  {
1833  }
1834  builder->snapshot = SnapBuildBuildSnapshot(builder);
1836 
1837  ReorderBufferSetRestartPoint(builder->reorder, lsn);
1838 
1839  Assert(builder->state == SNAPBUILD_CONSISTENT);
1840 
1841  ereport(LOG,
1842  (errmsg("logical decoding found consistent point at %X/%X",
1843  (uint32) (lsn >> 32), (uint32) lsn),
1844  errdetail("Logical decoding will begin using saved snapshot.")));
1845  return true;
1846 
1847 snapshot_not_interesting:
1848  if (ondisk.builder.committed.xip != NULL)
1849  pfree(ondisk.builder.committed.xip);
1850  return false;
1851 }
1852 
1853 /*
1854  * Remove all serialized snapshots that are not required anymore because no
1855  * slot can need them. This doesn't actually have to run during a checkpoint,
1856  * but it's a convenient point to schedule this.
1857  *
1858  * NB: We run this during checkpoints even if logical decoding is disabled so
1859  * we cleanup old slots at some point after it got disabled.
1860  */
1861 void
1863 {
1864  XLogRecPtr cutoff;
1865  XLogRecPtr redo;
1866  DIR *snap_dir;
1867  struct dirent *snap_de;
1868  char path[MAXPGPATH + 21];
1869 
1870  /*
1871  * We start off with a minimum of the last redo pointer. No new
1872  * replication slot will start before that, so that's a safe upper bound
1873  * for removal.
1874  */
1875  redo = GetRedoRecPtr();
1876 
1877  /* now check for the restart ptrs from existing slots */
1879 
1880  /* the cutoff must never be later than the last redo pointer */
1881  if (redo < cutoff)
1882  cutoff = redo;
1883 
1884  snap_dir = AllocateDir("pg_logical/snapshots");
1885  while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
1886  {
1887  uint32 hi;
1888  uint32 lo;
1889  XLogRecPtr lsn;
1890  struct stat statbuf;
1891 
1892  if (strcmp(snap_de->d_name, ".") == 0 ||
1893  strcmp(snap_de->d_name, "..") == 0)
1894  continue;
1895 
1896  snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
1897 
1898  if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1899  {
1900  elog(DEBUG1, "only regular files expected: %s", path);
1901  continue;
1902  }
1903 
1904  /*
1905  * temporary filenames from SnapBuildSerialize() include the LSN and
1906  * everything but are postfixed by .$pid.tmp. We can just remove them
1907  * the same as other files because there can be none that are
1908  * currently being written that are older than cutoff.
1909  *
1910  * We just log a message if a file doesn't fit the pattern, it's
1911  * probably some editor's lock/state file or similar...
1912  */
1913  if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
1914  {
1915  ereport(LOG,
1916  (errmsg("could not parse file name \"%s\"", path)));
1917  continue;
1918  }
1919 
1920  lsn = ((uint64) hi) << 32 | lo;
1921 
1922  /* check whether we still need it */
1923  if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1924  {
1925  elog(DEBUG1, "removing snapbuild snapshot %s", path);
1926 
1927  /*
1928  * It's not particularly harmful, though strange, if we can't
1929  * remove the file here. Don't prevent the checkpoint from
1930  * completing, that'd be a cure worse than the disease.
1931  */
1932  if (unlink(path) < 0)
1933  {
1934  ereport(LOG,
1936  errmsg("could not remove file \"%s\": %m",
1937  path)));
1938  continue;
1939  }
1940  }
1941  }
1942  FreeDir(snap_dir);
1943 }
#define TransactionIdAdvance(dest)
Definition: transam.h:48
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1669
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
uint32 CommandId
Definition: c.h:405
void AbortCurrentTransaction(void)
Definition: xact.c:2992
SnapshotSatisfiesFunc satisfies
Definition: snapshot.h:55
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TransactionId was_xmin
Definition: snapbuild.c:209
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
Definition: procarray.c:2158
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
struct SnapBuild::@24 committed
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:39
static void SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:857
#define SNAPBUILD_VERSION
Definition: snapbuild.c:1455
static void test(void)
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
Definition: transam.c:334
uint32 TransactionId
Definition: c.h:391
bool copied
Definition: snapshot.h:96
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
Definition: snapbuild.c:1381
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:410
#define SNAPBUILD_MAGIC
Definition: snapbuild.c:1454
void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, TransactionId *subxacts)
Definition: snapbuild.c:927
#define DEBUG3
Definition: elog.h:23
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:401
bool TransactionIdIsCurrentTransactionId(TransactionId xid)
Definition: xact.c:774
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:850
#define write(a, b, c)
Definition: win32.h:14
TransactionId xmin
Definition: proc.h:225
CommandId combocid
Definition: heapam_xlog.h:348
pg_crc32c checksum
Definition: snapbuild.c:1434
Snapshot snapshot
Definition: snapbuild.c:185
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
uint32 pg_crc32c
Definition: pg_crc32c.h:38
void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
Definition: logical.c:845
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
XLogRecPtr current_restart_decoding_lsn
#define XACT_REPEATABLE_READ
Definition: xact.h:30
bool building_full_snapshot
Definition: snapbuild.c:180
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:567
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:349
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int errcode(int sqlerrcode)
Definition: elog.c:575
CommandId cmax
Definition: heapam_xlog.h:341
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4473
size_t xcnt_space
Definition: snapbuild.c:227
ItemPointerData target_tid
Definition: heapam_xlog.h:354
char * ExportSnapshot(Snapshot snapshot)
Definition: snapmgr.c:1159
bool suboverflowed
Definition: snapshot.h:93
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
Definition: snapbuild.c:1189
size_t xcnt
Definition: snapbuild.c:224
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7962
Definition: dirent.h:9
uint32 regd_count
Definition: snapshot.h:110
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1027
static ResourceOwner SavedResourceOwnerDuringExport
Definition: snapbuild.c:258
PGXACT * MyPgXact
Definition: proc.c:68
#define FirstCommandId
Definition: c.h:407
int errdetail_internal(const char *fmt,...)
Definition: elog.c:900
bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
Definition: snapbuild.c:719
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
TransactionId xids[FLEXIBLE_ARRAY_MEMBER]
Definition: standbydefs.h:56
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:635
SnapBuildState state
Definition: snapbuild.c:156
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1477
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:949
XLogRecPtr LogStandbySnapshot(void)
Definition: standby.c:909
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
Definition: lmgr.h:26
struct SnapBuild::@23 was_running
TransactionId * was_xip
Definition: snapbuild.c:214
bool FirstSnapshotSet
Definition: snapmgr.c:203
size_t was_xcnt
Definition: snapbuild.c:212
#define MAXPGPATH
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:811
static bool ExportInProgress
Definition: snapbuild.c:259
#define DEBUG2
Definition: elog.h:24
SnapBuild builder
Definition: snapbuild.c:1444
ReorderBuffer * reorder
Definition: snapbuild.c:195
TransactionId initial_xmin_horizon
Definition: snapbuild.c:177
TransactionId * xip
Definition: snapbuild.c:250
void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn, xl_heap_new_cid *xlrec)
Definition: snapbuild.c:769
static void SnapBuildFreeSnapshot(Snapshot snap)
Definition: snapbuild.c:376
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errdetail(const char *fmt,...)
Definition: elog.c:873
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder)
Definition: snapbuild.c:466
int errcode_for_file_access(void)
Definition: elog.c:598
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:784
struct SnapshotData SnapshotData
bool includes_all_transactions
Definition: snapbuild.c:234
#define InvalidTransactionId
Definition: transam.h:31
void CheckPointSnapBuild(void)
Definition: snapbuild.c:1862
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
unsigned int uint32
Definition: c.h:258
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2335
void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
Definition: snapbuild.c:1094
TransactionId xmax
Definition: snapshot.h:69
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1244
TransactionId xmin
Definition: snapshot.h:68
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
SnapBuildState
Definition: snapbuild.h:18
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
XLogRecPtr last_serialized_snapshot
Definition: snapbuild.c:190
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId * xip
Definition: snapshot.h:79
int CloseTransientFile(int fd)
Definition: fd.c:2305
TransactionId xmax
Definition: snapbuild.c:165
RelFileNode target_node
Definition: heapam_xlog.h:353
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void * palloc0(Size size)
Definition: mcxt.c:877
bool HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, Buffer buffer)
Definition: tqual.c:1674
#define InvalidCommandId
Definition: c.h:408
dlist_head toplevel_by_lsn
TransactionId xid
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:741
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
CommandId curcid
Definition: snapshot.h:98
#define SnapBuildOnDiskConstantSize
Definition: snapbuild.c:1449
TransactionId was_xmax
Definition: snapbuild.c:210
int GetMaxSnapshotXidCount(void)
Definition: procarray.c:1456
bool XactReadOnly
Definition: xact.c:77
void FreeSnapshotBuilder(SnapBuild *builder)
Definition: snapbuild.c:357
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
#define Max(x, y)
Definition: c.h:789
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:554
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:664
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2401
void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
Definition: logical.c:788
void StartTransactionCommand(void)
Definition: xact.c:2681
#define SnapBuildOnDiskNotChecksummedSize
Definition: snapbuild.c:1451
bool takenDuringRecovery
Definition: snapshot.h:95
#define NormalTransactionIdFollows(id1, id2)
Definition: transam.h:67
size_t Size
Definition: c.h:350
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1220
int XactIsoLevel
Definition: xact.c:74
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
bool IsTransactionState(void)
Definition: xact.c:351
MemoryContext context
Definition: snapbuild.c:159
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:8255
TransactionId nextXid
Definition: standbydefs.h:52
#define NormalTransactionIdPrecedes(id1, id2)
Definition: transam.h:62
static void SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at)
Definition: snapbuild.c:300
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:962
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:434
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:545
uint32 xcnt
Definition: snapshot.h:80
void * palloc(Size size)
Definition: mcxt.c:848
int errmsg(const char *fmt,...)
Definition: elog.c:797
TransactionId xmin
Definition: snapbuild.c:162
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:706
void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
Definition: snapbuild.c:1464
XLogRecPtr restart_decoding_lsn
TransactionId oldestRunningXid
Definition: standbydefs.h:53
CommandId cmin
Definition: heapam_xlog.h:340
int pg_fsync(int fd)
Definition: fd.c:333
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
char d_name[MAX_PATH]
Definition: dirent.h:14
struct SnapBuildOnDisk SnapBuildOnDisk
#define elog
Definition: elog.h:219
#define qsort(a, b, c, d)
Definition: port.h:443
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:73
static void SnapBuildSnapIncRefcount(Snapshot snap)
Definition: snapbuild.c:422
#define lstat(path, sb)
Definition: win32.h:262
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:78
SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot)
Definition: snapbuild.c:316
static TransactionId SnapBuildNextPhaseAt(SnapBuild *builder)
Definition: snapbuild.c:286
XLogRecPtr start_decoding_at
Definition: snapbuild.c:171
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder)
Definition: snapbuild.c:886
#define read(a, b, c)
Definition: win32.h:13
int FreeDir(DIR *dir)
Definition: fd.c:2444
size_t was_xcnt_space
Definition: snapbuild.c:213
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:696
TransactionId * subxip
Definition: snapshot.h:91
uint32 active_count
Definition: snapshot.h:109
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:92
TransactionId top_xid
Definition: heapam_xlog.h:339
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
Definition: snapbuild.c:675