PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
snapmgr.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * snapmgr.c
4  * PostgreSQL snapshot manager
5  *
6  * We keep track of snapshots in two ways: those "registered" by resowner.c,
7  * and the "active snapshot" stack. All snapshots in either of them live in
8  * persistent memory. When a snapshot is no longer in any of these lists
9  * (tracked by separate refcounts on each snapshot), its memory can be freed.
10  *
11  * The FirstXactSnapshot, if any, is treated a bit specially: we increment its
12  * regd_count and list it in RegisteredSnapshots, but this reference is not
13  * tracked by a resource owner. We used to use the TopTransactionResourceOwner
14  * to track this snapshot reference, but that introduces logical circularity
15  * and thus makes it impossible to clean up in a sane fashion. It's better to
16  * handle this reference as an internally-tracked registration, so that this
17  * module is entirely lower-level than ResourceOwners.
18  *
19  * Likewise, any snapshots that have been exported by pg_export_snapshot
20  * have regd_count = 1 and are listed in RegisteredSnapshots, but are not
21  * tracked by any resource owner.
22  *
23  * Likewise, the CatalogSnapshot is listed in RegisteredSnapshots when it
24  * is valid, but is not tracked by any resource owner.
25  *
26  * The same is true for historic snapshots used during logical decoding;
27  * their lifetime is managed separately (as they live longer than one xact.c
28  * transaction).
29  *
30  * These arrangements let us reset MyPgXact->xmin when there are no snapshots
31  * referenced by this transaction, and advance it when the one with oldest
32  * Xmin is no longer referenced. For simplicity however, only registered
33  * snapshots, not active snapshots, participate in tracking which one is oldest;
34  * we don't try to change MyPgXact->xmin except when the active-snapshot
35  * stack is empty.
36  *
37  *
38  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
39  * Portions Copyright (c) 1994, Regents of the University of California
40  *
41  * IDENTIFICATION
42  * src/backend/utils/time/snapmgr.c
43  *
44  *-------------------------------------------------------------------------
45  */
46 #include "postgres.h"
47 
48 #include <sys/stat.h>
49 #include <unistd.h>
50 
51 #include "access/transam.h"
52 #include "access/xact.h"
53 #include "access/xlog.h"
54 #include "catalog/catalog.h"
55 #include "lib/pairingheap.h"
56 #include "miscadmin.h"
57 #include "storage/predicate.h"
58 #include "storage/proc.h"
59 #include "storage/procarray.h"
60 #include "storage/sinval.h"
61 #include "storage/sinvaladt.h"
62 #include "storage/spin.h"
63 #include "utils/builtins.h"
64 #include "utils/memutils.h"
65 #include "utils/rel.h"
66 #include "utils/resowner_private.h"
67 #include "utils/snapmgr.h"
68 #include "utils/syscache.h"
69 #include "utils/tqual.h"
70 
71 
72 /*
73  * GUC parameters
74  */
75 int old_snapshot_threshold; /* number of minutes, -1 disables */
76 
77 /*
78  * Structure for dealing with old_snapshot_threshold implementation.
79  */
80 typedef struct OldSnapshotControlData
81 {
82  /*
83  * Variables for old snapshot handling are shared among processes and are
84  * only allowed to move forward.
85  */
86  slock_t mutex_current; /* protect current_timestamp */
87  TimestampTz current_timestamp; /* latest snapshot timestamp */
88  slock_t mutex_latest_xmin; /* protect latest_xmin and next_map_update */
89  TransactionId latest_xmin; /* latest snapshot xmin */
90  TimestampTz next_map_update; /* latest snapshot valid up to */
91  slock_t mutex_threshold; /* protect threshold fields */
92  TimestampTz threshold_timestamp; /* earlier snapshot is old */
93  TransactionId threshold_xid; /* earlier xid may be gone */
94 
95  /*
96  * Keep one xid per minute for old snapshot error handling.
97  *
98  * Use a circular buffer with a head offset, a count of entries currently
99  * used, and a timestamp corresponding to the xid at the head offset. A
100  * count_used value of zero means that there are no times stored; a
101  * count_used value of OLD_SNAPSHOT_TIME_MAP_ENTRIES means that the buffer
102  * is full and the head must be advanced to add new entries. Use
103  * timestamps aligned to minute boundaries, since that seems less
104  * surprising than aligning based on the first usage timestamp. The
105  * latest bucket is effectively stored within latest_xmin. The circular
106  * buffer is updated when we get a new xmin value that doesn't fall into
107  * the same interval.
108  *
109  * It is OK if the xid for a given time slot is from earlier than
110  * calculated by adding the number of minutes corresponding to the
111  * (possibly wrapped) distance from the head offset to the time of the
112  * head entry, since that just results in the vacuuming of old tuples
113  * being slightly less aggressive. It would not be OK for it to be off in
114  * the other direction, since it might result in vacuuming tuples that are
115  * still expected to be there.
116  *
117  * Use of an SLRU was considered but not chosen because it is more
118  * heavyweight than is needed for this, and would probably not be any less
119  * code to implement.
120  *
121  * Persistence is not needed.
122  */
123  int head_offset; /* subscript of oldest tracked time */
124  TimestampTz head_timestamp; /* time corresponding to head xid */
125  int count_used; /* how many slots are in use */
126  TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
128 
130 
131 
132 /*
133  * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
134  * mode, and to the latest one taken in a read-committed transaction.
135  * SecondarySnapshot is a snapshot that's always up-to-date as of the current
136  * instant, even in transaction-snapshot mode. It should only be used for
137  * special-purpose code (say, RI checking.) CatalogSnapshot points to an
138  * MVCC snapshot intended to be used for catalog scans; we must invalidate it
139  * whenever a system catalog change occurs.
140  *
141  * These SnapshotData structs are static to simplify memory allocation
142  * (see the hack in GetSnapshotData to avoid repeated malloc/free).
143  */
147 
148 /* Pointers to valid snapshots */
153 
154 /*
155  * These are updated by GetSnapshotData. We initialize them this way
156  * for the convenience of TransactionIdIsInProgress: even in bootstrap
157  * mode, we don't want it to say that BootstrapTransactionId is in progress.
158  *
159  * RecentGlobalXmin and RecentGlobalDataXmin are initialized to
160  * InvalidTransactionId, to ensure that no one tries to use a stale
161  * value. Readers should ensure that it has been set to something else
162  * before using it.
163  */
168 
169 /* (table, ctid) => (cmin, cmax) mapping during timetravel */
171 
172 /*
173  * Elements of the active snapshot stack.
174  *
175  * Each element here accounts for exactly one active_count on SnapshotData.
176  *
177  * NB: the code assumes that elements in this list are in non-increasing
178  * order of as_level; also, the list must be NULL-terminated.
179  */
180 typedef struct ActiveSnapshotElt
181 {
183  int as_level;
186 
187 /* Top of the stack of active snapshots */
189 
190 /* Bottom of the stack of active snapshots */
192 
193 /*
194  * Currently registered Snapshots. Ordered in a heap by xmin, so that we can
195  * quickly find the one with lowest xmin, to advance our MyPgXact->xmin.
196  */
197 static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b,
198  void *arg);
199 
201 
202 /* first GetTransactionSnapshot call in a transaction? */
203 bool FirstSnapshotSet = false;
204 
205 /*
206  * Remember the serializable transaction snapshot, if any. We cannot trust
207  * FirstSnapshotSet in combination with IsolationUsesXactSnapshot(), because
208  * GUC may be reset before us, changing the value of IsolationUsesXactSnapshot.
209  */
211 
212 /* Define pathname of exported-snapshot files */
213 #define SNAPSHOT_EXPORT_DIR "pg_snapshots"
214 
215 /* Structure holding info about exported snapshot. */
216 typedef struct ExportedSnapshot
217 {
218  char *snapfile;
221 
222 /* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
224 
225 /* Prototypes for local functions */
227 static Snapshot CopySnapshot(Snapshot snapshot);
228 static void FreeSnapshot(Snapshot snapshot);
229 static void SnapshotResetXmin(void);
230 
231 /*
232  * Snapshot fields to be serialized.
233  *
234  * Only these fields need to be sent to the cooperating backend; the
235  * remaining ones can (and must) be set by the receiver upon restore.
236  */
238 {
249 
250 Size
252 {
253  Size size;
254 
255  size = offsetof(OldSnapshotControlData, xid_by_minute);
256  if (old_snapshot_threshold > 0)
257  size = add_size(size, mul_size(sizeof(TransactionId),
259 
260  return size;
261 }
262 
263 /*
264  * Initialize for managing old snapshot detection.
265  */
266 void
268 {
269  bool found;
270 
271  /*
272  * Create or attach to the OldSnapshotControlData structure.
273  */
274  oldSnapshotControl = (volatile OldSnapshotControlData *)
275  ShmemInitStruct("OldSnapshotControlData",
276  SnapMgrShmemSize(), &found);
277 
278  if (!found)
279  {
280  SpinLockInit(&oldSnapshotControl->mutex_current);
281  oldSnapshotControl->current_timestamp = 0;
282  SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
283  oldSnapshotControl->latest_xmin = InvalidTransactionId;
284  oldSnapshotControl->next_map_update = 0;
285  SpinLockInit(&oldSnapshotControl->mutex_threshold);
286  oldSnapshotControl->threshold_timestamp = 0;
287  oldSnapshotControl->threshold_xid = InvalidTransactionId;
288  oldSnapshotControl->head_offset = 0;
289  oldSnapshotControl->head_timestamp = 0;
290  oldSnapshotControl->count_used = 0;
291  }
292 }
293 
294 /*
295  * GetTransactionSnapshot
296  * Get the appropriate snapshot for a new query in a transaction.
297  *
298  * Note that the return value may point at static storage that will be modified
299  * by future calls and by CommandCounterIncrement(). Callers should call
300  * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be
301  * used very long.
302  */
303 Snapshot
305 {
306  /*
307  * Return historic snapshot if doing logical decoding. We'll never need a
308  * non-historic transaction snapshot in this (sub-)transaction, so there's
309  * no need to be careful to set one up for later calls to
310  * GetTransactionSnapshot().
311  */
313  {
315  return HistoricSnapshot;
316  }
317 
318  /* First call in transaction? */
319  if (!FirstSnapshotSet)
320  {
321  /*
322  * Don't allow catalog snapshot to be older than xact snapshot. Must
323  * do this first to allow the empty-heap Assert to succeed.
324  */
326 
327  Assert(pairingheap_is_empty(&RegisteredSnapshots));
328  Assert(FirstXactSnapshot == NULL);
329 
330  if (IsInParallelMode())
331  elog(ERROR,
332  "cannot take query snapshot during a parallel operation");
333 
334  /*
335  * In transaction-snapshot mode, the first snapshot must live until
336  * end of xact regardless of what the caller does with it, so we must
337  * make a copy of it rather than returning CurrentSnapshotData
338  * directly. Furthermore, if we're running in serializable mode,
339  * predicate.c needs to wrap the snapshot fetch in its own processing.
340  */
342  {
343  /* First, create the snapshot in CurrentSnapshotData */
345  CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData);
346  else
347  CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
348  /* Make a saved copy */
349  CurrentSnapshot = CopySnapshot(CurrentSnapshot);
350  FirstXactSnapshot = CurrentSnapshot;
351  /* Mark it as "registered" in FirstXactSnapshot */
352  FirstXactSnapshot->regd_count++;
353  pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
354  }
355  else
356  CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
357 
358  FirstSnapshotSet = true;
359  return CurrentSnapshot;
360  }
361 
363  return CurrentSnapshot;
364 
365  /* Don't allow catalog snapshot to be older than xact snapshot. */
367 
368  CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
369 
370  return CurrentSnapshot;
371 }
372 
373 /*
374  * GetLatestSnapshot
375  * Get a snapshot that is up-to-date as of the current instant,
376  * even if we are executing in transaction-snapshot mode.
377  */
378 Snapshot
380 {
381  /*
382  * We might be able to relax this, but nothing that could otherwise work
383  * needs it.
384  */
385  if (IsInParallelMode())
386  elog(ERROR,
387  "cannot update SecondarySnapshot during a parallel operation");
388 
389  /*
390  * So far there are no cases requiring support for GetLatestSnapshot()
391  * during logical decoding, but it wouldn't be hard to add if required.
392  */
394 
395  /* If first call in transaction, go ahead and set the xact snapshot */
396  if (!FirstSnapshotSet)
397  return GetTransactionSnapshot();
398 
399  SecondarySnapshot = GetSnapshotData(&SecondarySnapshotData);
400 
401  return SecondarySnapshot;
402 }
403 
404 /*
405  * GetOldestSnapshot
406  *
407  * Get the transaction's oldest known snapshot, as judged by the LSN.
408  * Will return NULL if there are no active or registered snapshots.
409  */
410 Snapshot
412 {
413  Snapshot OldestRegisteredSnapshot = NULL;
414  XLogRecPtr RegisteredLSN = InvalidXLogRecPtr;
415 
416  if (!pairingheap_is_empty(&RegisteredSnapshots))
417  {
418  OldestRegisteredSnapshot = pairingheap_container(SnapshotData, ph_node,
419  pairingheap_first(&RegisteredSnapshots));
420  RegisteredLSN = OldestRegisteredSnapshot->lsn;
421  }
422 
423  if (OldestActiveSnapshot != NULL)
424  {
425  XLogRecPtr ActiveLSN = OldestActiveSnapshot->as_snap->lsn;
426 
427  if (XLogRecPtrIsInvalid(RegisteredLSN) || RegisteredLSN > ActiveLSN)
428  return OldestActiveSnapshot->as_snap;
429  }
430 
431  return OldestRegisteredSnapshot;
432 }
433 
434 /*
435  * GetCatalogSnapshot
436  * Get a snapshot that is sufficiently up-to-date for scan of the
437  * system catalog with the specified OID.
438  */
439 Snapshot
441 {
442  /*
443  * Return historic snapshot while we're doing logical decoding, so we can
444  * see the appropriate state of the catalog.
445  *
446  * This is the primary reason for needing to reset the system caches after
447  * finishing decoding.
448  */
450  return HistoricSnapshot;
451 
452  return GetNonHistoricCatalogSnapshot(relid);
453 }
454 
455 /*
456  * GetNonHistoricCatalogSnapshot
457  * Get a snapshot that is sufficiently up-to-date for scan of the system
458  * catalog with the specified OID, even while historic snapshots are set
459  * up.
460  */
461 Snapshot
463 {
464  /*
465  * If the caller is trying to scan a relation that has no syscache, no
466  * catcache invalidations will be sent when it is updated. For a few key
467  * relations, snapshot invalidations are sent instead. If we're trying to
468  * scan a relation for which neither catcache nor snapshot invalidations
469  * are sent, we must refresh the snapshot every time.
470  */
471  if (CatalogSnapshot &&
473  !RelationHasSysCache(relid))
475 
476  if (CatalogSnapshot == NULL)
477  {
478  /* Get new snapshot. */
479  CatalogSnapshot = GetSnapshotData(&CatalogSnapshotData);
480 
481  /*
482  * Make sure the catalog snapshot will be accounted for in decisions
483  * about advancing PGXACT->xmin. We could apply RegisterSnapshot, but
484  * that would result in making a physical copy, which is overkill; and
485  * it would also create a dependency on some resource owner, which we
486  * do not want for reasons explained at the head of this file. Instead
487  * just shove the CatalogSnapshot into the pairing heap manually. This
488  * has to be reversed in InvalidateCatalogSnapshot, of course.
489  *
490  * NB: it had better be impossible for this to throw error, since the
491  * CatalogSnapshot pointer is already valid.
492  */
493  pairingheap_add(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
494  }
495 
496  return CatalogSnapshot;
497 }
498 
499 /*
500  * InvalidateCatalogSnapshot
501  * Mark the current catalog snapshot, if any, as invalid
502  *
503  * We could change this API to allow the caller to provide more fine-grained
504  * invalidation details, so that a change to relation A wouldn't prevent us
505  * from using our cached snapshot to scan relation B, but so far there's no
506  * evidence that the CPU cycles we spent tracking such fine details would be
507  * well-spent.
508  */
509 void
511 {
512  if (CatalogSnapshot)
513  {
514  pairingheap_remove(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
515  CatalogSnapshot = NULL;
517  }
518 }
519 
520 /*
521  * InvalidateCatalogSnapshotConditionally
522  * Drop catalog snapshot if it's the only one we have
523  *
524  * This is called when we are about to wait for client input, so we don't
525  * want to continue holding the catalog snapshot if it might mean that the
526  * global xmin horizon can't advance. However, if there are other snapshots
527  * still active or registered, the catalog snapshot isn't likely to be the
528  * oldest one, so we might as well keep it.
529  */
530 void
532 {
533  if (CatalogSnapshot &&
534  ActiveSnapshot == NULL &&
535  pairingheap_is_singular(&RegisteredSnapshots))
537 }
538 
539 /*
540  * SnapshotSetCommandId
541  * Propagate CommandCounterIncrement into the static snapshots, if set
542  */
543 void
545 {
546  if (!FirstSnapshotSet)
547  return;
548 
549  if (CurrentSnapshot)
550  CurrentSnapshot->curcid = curcid;
551  if (SecondarySnapshot)
552  SecondarySnapshot->curcid = curcid;
553  /* Should we do the same with CatalogSnapshot? */
554 }
555 
556 /*
557  * SetTransactionSnapshot
558  * Set the transaction's snapshot from an imported MVCC snapshot.
559  *
560  * Note that this is very closely tied to GetTransactionSnapshot --- it
561  * must take care of all the same considerations as the first-snapshot case
562  * in GetTransactionSnapshot.
563  */
564 static void
566  int sourcepid, PGPROC *sourceproc)
567 {
568  /* Caller should have checked this already */
570 
571  /* Better do this to ensure following Assert succeeds. */
573 
574  Assert(pairingheap_is_empty(&RegisteredSnapshots));
575  Assert(FirstXactSnapshot == NULL);
577 
578  /*
579  * Even though we are not going to use the snapshot it computes, we must
580  * call GetSnapshotData, for two reasons: (1) to be sure that
581  * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
582  * RecentXmin and RecentGlobalXmin. (We could alternatively include those
583  * two variables in exported snapshot files, but it seems better to have
584  * snapshot importers compute reasonably up-to-date values for them.)
585  */
586  CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
587 
588  /*
589  * Now copy appropriate fields from the source snapshot.
590  */
591  CurrentSnapshot->xmin = sourcesnap->xmin;
592  CurrentSnapshot->xmax = sourcesnap->xmax;
593  CurrentSnapshot->xcnt = sourcesnap->xcnt;
594  Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
595  memcpy(CurrentSnapshot->xip, sourcesnap->xip,
596  sourcesnap->xcnt * sizeof(TransactionId));
597  CurrentSnapshot->subxcnt = sourcesnap->subxcnt;
598  Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
599  memcpy(CurrentSnapshot->subxip, sourcesnap->subxip,
600  sourcesnap->subxcnt * sizeof(TransactionId));
601  CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
602  CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
603  /* NB: curcid should NOT be copied, it's a local matter */
604 
605  /*
606  * Now we have to fix what GetSnapshotData did with MyPgXact->xmin and
607  * TransactionXmin. There is a race condition: to make sure we are not
608  * causing the global xmin to go backwards, we have to test that the
609  * source transaction is still running, and that has to be done
610  * atomically. So let procarray.c do it.
611  *
612  * Note: in serializable mode, predicate.c will do this a second time. It
613  * doesn't seem worth contorting the logic here to avoid two calls,
614  * especially since it's not clear that predicate.c *must* do this.
615  */
616  if (sourceproc != NULL)
617  {
618  if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
619  ereport(ERROR,
620  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
621  errmsg("could not import the requested snapshot"),
622  errdetail("The source transaction is not running anymore.")));
623  }
624  else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
625  ereport(ERROR,
626  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
627  errmsg("could not import the requested snapshot"),
628  errdetail("The source process with pid %d is not running anymore.",
629  sourcepid)));
630 
631  /*
632  * In transaction-snapshot mode, the first snapshot must live until end of
633  * xact, so we must make a copy of it. Furthermore, if we're running in
634  * serializable mode, predicate.c needs to do its own processing.
635  */
637  {
639  SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
640  sourcepid);
641  /* Make a saved copy */
642  CurrentSnapshot = CopySnapshot(CurrentSnapshot);
643  FirstXactSnapshot = CurrentSnapshot;
644  /* Mark it as "registered" in FirstXactSnapshot */
645  FirstXactSnapshot->regd_count++;
646  pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
647  }
648 
649  FirstSnapshotSet = true;
650 }
651 
652 /*
653  * CopySnapshot
654  * Copy the given snapshot.
655  *
656  * The copy is palloc'd in TopTransactionContext and has initial refcounts set
657  * to 0. The returned snapshot has the copied flag set.
658  */
659 static Snapshot
661 {
662  Snapshot newsnap;
663  Size subxipoff;
664  Size size;
665 
666  Assert(snapshot != InvalidSnapshot);
667 
668  /* We allocate any XID arrays needed in the same palloc block. */
669  size = subxipoff = sizeof(SnapshotData) +
670  snapshot->xcnt * sizeof(TransactionId);
671  if (snapshot->subxcnt > 0)
672  size += snapshot->subxcnt * sizeof(TransactionId);
673 
675  memcpy(newsnap, snapshot, sizeof(SnapshotData));
676 
677  newsnap->regd_count = 0;
678  newsnap->active_count = 0;
679  newsnap->copied = true;
680 
681  /* setup XID array */
682  if (snapshot->xcnt > 0)
683  {
684  newsnap->xip = (TransactionId *) (newsnap + 1);
685  memcpy(newsnap->xip, snapshot->xip,
686  snapshot->xcnt * sizeof(TransactionId));
687  }
688  else
689  newsnap->xip = NULL;
690 
691  /*
692  * Setup subXID array. Don't bother to copy it if it had overflowed,
693  * though, because it's not used anywhere in that case. Except if it's a
694  * snapshot taken during recovery; all the top-level XIDs are in subxip as
695  * well in that case, so we mustn't lose them.
696  */
697  if (snapshot->subxcnt > 0 &&
698  (!snapshot->suboverflowed || snapshot->takenDuringRecovery))
699  {
700  newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
701  memcpy(newsnap->subxip, snapshot->subxip,
702  snapshot->subxcnt * sizeof(TransactionId));
703  }
704  else
705  newsnap->subxip = NULL;
706 
707  return newsnap;
708 }
709 
710 /*
711  * FreeSnapshot
712  * Free the memory associated with a snapshot.
713  */
714 static void
716 {
717  Assert(snapshot->regd_count == 0);
718  Assert(snapshot->active_count == 0);
719  Assert(snapshot->copied);
720 
721  pfree(snapshot);
722 }
723 
724 /*
725  * PushActiveSnapshot
726  * Set the given snapshot as the current active snapshot
727  *
728  * If the passed snapshot is a statically-allocated one, or it is possibly
729  * subject to a future command counter update, create a new long-lived copy
730  * with active refcount=1. Otherwise, only increment the refcount.
731  */
732 void
734 {
735  ActiveSnapshotElt *newactive;
736 
737  Assert(snap != InvalidSnapshot);
738 
740 
741  /*
742  * Checking SecondarySnapshot is probably useless here, but it seems
743  * better to be sure.
744  */
745  if (snap == CurrentSnapshot || snap == SecondarySnapshot || !snap->copied)
746  newactive->as_snap = CopySnapshot(snap);
747  else
748  newactive->as_snap = snap;
749 
750  newactive->as_next = ActiveSnapshot;
752 
753  newactive->as_snap->active_count++;
754 
755  ActiveSnapshot = newactive;
756  if (OldestActiveSnapshot == NULL)
757  OldestActiveSnapshot = ActiveSnapshot;
758 }
759 
760 /*
761  * PushCopiedSnapshot
762  * As above, except forcibly copy the presented snapshot.
763  *
764  * This should be used when the ActiveSnapshot has to be modifiable, for
765  * example if the caller intends to call UpdateActiveSnapshotCommandId.
766  * The new snapshot will be released when popped from the stack.
767  */
768 void
770 {
771  PushActiveSnapshot(CopySnapshot(snapshot));
772 }
773 
774 /*
775  * UpdateActiveSnapshotCommandId
776  *
777  * Update the current CID of the active snapshot. This can only be applied
778  * to a snapshot that is not referenced elsewhere.
779  */
780 void
782 {
783  CommandId save_curcid,
784  curcid;
785 
786  Assert(ActiveSnapshot != NULL);
787  Assert(ActiveSnapshot->as_snap->active_count == 1);
788  Assert(ActiveSnapshot->as_snap->regd_count == 0);
789 
790  /*
791  * Don't allow modification of the active snapshot during parallel
792  * operation. We share the snapshot to worker backends at the beginning
793  * of parallel operation, so any change to the snapshot can lead to
794  * inconsistencies. We have other defenses against
795  * CommandCounterIncrement, but there are a few places that call this
796  * directly, so we put an additional guard here.
797  */
798  save_curcid = ActiveSnapshot->as_snap->curcid;
799  curcid = GetCurrentCommandId(false);
800  if (IsInParallelMode() && save_curcid != curcid)
801  elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
802  ActiveSnapshot->as_snap->curcid = curcid;
803 }
804 
805 /*
806  * PopActiveSnapshot
807  *
808  * Remove the topmost snapshot from the active snapshot stack, decrementing the
809  * reference count, and free it if this was the last reference.
810  */
811 void
813 {
814  ActiveSnapshotElt *newstack;
815 
816  newstack = ActiveSnapshot->as_next;
817 
818  Assert(ActiveSnapshot->as_snap->active_count > 0);
819 
820  ActiveSnapshot->as_snap->active_count--;
821 
822  if (ActiveSnapshot->as_snap->active_count == 0 &&
823  ActiveSnapshot->as_snap->regd_count == 0)
824  FreeSnapshot(ActiveSnapshot->as_snap);
825 
826  pfree(ActiveSnapshot);
827  ActiveSnapshot = newstack;
828  if (ActiveSnapshot == NULL)
829  OldestActiveSnapshot = NULL;
830 
832 }
833 
834 /*
835  * GetActiveSnapshot
836  * Return the topmost snapshot in the Active stack.
837  */
838 Snapshot
840 {
841  Assert(ActiveSnapshot != NULL);
842 
843  return ActiveSnapshot->as_snap;
844 }
845 
846 /*
847  * ActiveSnapshotSet
848  * Return whether there is at least one snapshot in the Active stack
849  */
850 bool
852 {
853  return ActiveSnapshot != NULL;
854 }
855 
856 /*
857  * RegisterSnapshot
858  * Register a snapshot as being in use by the current resource owner
859  *
860  * If InvalidSnapshot is passed, it is not registered.
861  */
862 Snapshot
864 {
865  if (snapshot == InvalidSnapshot)
866  return InvalidSnapshot;
867 
869 }
870 
871 /*
872  * RegisterSnapshotOnOwner
873  * As above, but use the specified resource owner
874  */
875 Snapshot
877 {
878  Snapshot snap;
879 
880  if (snapshot == InvalidSnapshot)
881  return InvalidSnapshot;
882 
883  /* Static snapshot? Create a persistent copy */
884  snap = snapshot->copied ? snapshot : CopySnapshot(snapshot);
885 
886  /* and tell resowner.c about it */
888  snap->regd_count++;
889  ResourceOwnerRememberSnapshot(owner, snap);
890 
891  if (snap->regd_count == 1)
892  pairingheap_add(&RegisteredSnapshots, &snap->ph_node);
893 
894  return snap;
895 }
896 
897 /*
898  * UnregisterSnapshot
899  *
900  * Decrement the reference count of a snapshot, remove the corresponding
901  * reference from CurrentResourceOwner, and free the snapshot if no more
902  * references remain.
903  */
904 void
906 {
907  if (snapshot == NULL)
908  return;
909 
911 }
912 
913 /*
914  * UnregisterSnapshotFromOwner
915  * As above, but use the specified resource owner
916  */
917 void
919 {
920  if (snapshot == NULL)
921  return;
922 
923  Assert(snapshot->regd_count > 0);
924  Assert(!pairingheap_is_empty(&RegisteredSnapshots));
925 
926  ResourceOwnerForgetSnapshot(owner, snapshot);
927 
928  snapshot->regd_count--;
929  if (snapshot->regd_count == 0)
930  pairingheap_remove(&RegisteredSnapshots, &snapshot->ph_node);
931 
932  if (snapshot->regd_count == 0 && snapshot->active_count == 0)
933  {
934  FreeSnapshot(snapshot);
936  }
937 }
938 
939 /*
940  * Comparison function for RegisteredSnapshots heap. Snapshots are ordered
941  * by xmin, so that the snapshot with smallest xmin is at the top.
942  */
943 static int
944 xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
945 {
946  const SnapshotData *asnap = pairingheap_const_container(SnapshotData, ph_node, a);
947  const SnapshotData *bsnap = pairingheap_const_container(SnapshotData, ph_node, b);
948 
949  if (TransactionIdPrecedes(asnap->xmin, bsnap->xmin))
950  return 1;
951  else if (TransactionIdFollows(asnap->xmin, bsnap->xmin))
952  return -1;
953  else
954  return 0;
955 }
956 
957 /*
958  * SnapshotResetXmin
959  *
960  * If there are no more snapshots, we can reset our PGXACT->xmin to InvalidXid.
961  * Note we can do this without locking because we assume that storing an Xid
962  * is atomic.
963  *
964  * Even if there are some remaining snapshots, we may be able to advance our
965  * PGXACT->xmin to some degree. This typically happens when a portal is
966  * dropped. For efficiency, we only consider recomputing PGXACT->xmin when
967  * the active snapshot stack is empty; this allows us not to need to track
968  * which active snapshot is oldest.
969  *
970  * Note: it's tempting to use GetOldestSnapshot() here so that we can include
971  * active snapshots in the calculation. However, that compares by LSN not
972  * xmin so it's not entirely clear that it's the same thing. Also, we'd be
973  * critically dependent on the assumption that the bottommost active snapshot
974  * stack entry has the oldest xmin. (Current uses of GetOldestSnapshot() are
975  * not actually critical, but this would be.)
976  */
977 static void
979 {
980  Snapshot minSnapshot;
981 
982  if (ActiveSnapshot != NULL)
983  return;
984 
985  if (pairingheap_is_empty(&RegisteredSnapshots))
986  {
988  return;
989  }
990 
991  minSnapshot = pairingheap_container(SnapshotData, ph_node,
992  pairingheap_first(&RegisteredSnapshots));
993 
994  if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin))
995  MyPgXact->xmin = minSnapshot->xmin;
996 }
997 
998 /*
999  * AtSubCommit_Snapshot
1000  */
1001 void
1003 {
1004  ActiveSnapshotElt *active;
1005 
1006  /*
1007  * Relabel the active snapshots set in this subtransaction as though they
1008  * are owned by the parent subxact.
1009  */
1010  for (active = ActiveSnapshot; active != NULL; active = active->as_next)
1011  {
1012  if (active->as_level < level)
1013  break;
1014  active->as_level = level - 1;
1015  }
1016 }
1017 
1018 /*
1019  * AtSubAbort_Snapshot
1020  * Clean up snapshots after a subtransaction abort
 *
 * Pops every active-snapshot stack entry whose as_level is at least
 * `level`. Each popped entry's snapshot has its active_count decremented
 * and is freed once both active_count and regd_count are zero; the stack
 * element itself is always pfree'd.
 *
 * NOTE(review): embedded line 1023 (function name and the declaration of
 * `level`) is absent from this listing.
1021  */
1022 void
1024 {
1025  /* Forget the active snapshots set by this subtransaction */
1026  while (ActiveSnapshot && ActiveSnapshot->as_level >= level)
1027  {
 /*
  * NOTE(review): embedded line 1028 is absent from this listing; `next`,
  * used below, is presumably declared there.
  */
1029 
 /* remember the successor before the current element is freed */
1030  next = ActiveSnapshot->as_next;
1031 
1032  /*
1033  * Decrement the snapshot's active count. If it's still registered or
1034  * marked as active by an outer subtransaction, we can't free it yet.
1035  */
1036  Assert(ActiveSnapshot->as_snap->active_count >= 1);
1037  ActiveSnapshot->as_snap->active_count -= 1;
1038 
1039  if (ActiveSnapshot->as_snap->active_count == 0 &&
1040  ActiveSnapshot->as_snap->regd_count == 0)
1041  FreeSnapshot(ActiveSnapshot->as_snap);
1042 
1043  /* and free the stack element */
1044  pfree(ActiveSnapshot);
1045 
1046  ActiveSnapshot = next;
 /* an emptied stack has no oldest entry either */
1047  if (ActiveSnapshot == NULL)
1048  OldestActiveSnapshot = NULL;
1049  }
1050 
 /*
  * NOTE(review): embedded line 1051 is absent from this listing; a
  * statement that runs after the loop is not visible here.
  */
1052 }
1053 
1054 /*
1055  * AtEOXact_Snapshot
1056  * Snapshot manager's cleanup function for end of transaction
 *
 * isCommit: when true, WARNINGs are emitted for registered or active
 * snapshots that are still present after the cleanup steps below.
 * resetXmin: controls the final xmin-reset step; the Assert at the end
 * requires MyPgXact->xmin to be 0 already whenever resetXmin is false.
1057  */
1058 void
1059 AtEOXact_Snapshot(bool isCommit, bool resetXmin)
1060 {
1061  /*
1062  * In transaction-snapshot mode we must release our privately-managed
1063  * reference to the transaction snapshot. We must remove it from
1064  * RegisteredSnapshots to keep the check below happy. But we don't bother
1065  * to do FreeSnapshot, for two reasons: the memory will go away with
1066  * TopTransactionContext anyway, and if someone has left the snapshot
1067  * stacked as active, we don't want the code below to be chasing through a
1068  * dangling pointer.
1069  */
1070  if (FirstXactSnapshot != NULL)
1071  {
1072  Assert(FirstXactSnapshot->regd_count > 0);
1073  Assert(!pairingheap_is_empty(&RegisteredSnapshots));
1074  pairingheap_remove(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
1075  }
1076  FirstXactSnapshot = NULL;
1077 
1078  /*
1079  * If we exported any snapshots, clean them up.
1080  */
1081  if (exportedSnapshots != NIL)
1082  {
1083  ListCell *lc;
1084 
1085  /*
1086  * Get rid of the files. Unlink failure is only a WARNING because (1)
1087  * it's too late to abort the transaction, and (2) leaving a leaked
1088  * file around has little real consequence anyway.
1089  *
1090  * We also need to remove the snapshots from RegisteredSnapshots
1091  * to prevent a warning below.
1092  *
1093  * As with the FirstXactSnapshot, we don't need to free resources of
1094  * the snapshot itself as it will go away with the memory context.
1095  */
1096  foreach(lc, exportedSnapshots)
1097  {
1098  ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc);
1099 
1100  if (unlink(esnap->snapfile))
1101  elog(WARNING, "could not unlink file \"%s\": %m",
1102  esnap->snapfile);
1103 
1104  pairingheap_remove(&RegisteredSnapshots,
1105  &esnap->snapshot->ph_node);
1106  }
1107 
 /* the list cells are not freed here, only the list head is dropped */
1108  exportedSnapshots = NIL;
1109  }
1110 
1111  /* Drop catalog snapshot if any */
 /*
  * NOTE(review): embedded line 1112, the statement this comment describes,
  * is absent from this listing.
  */
1113 
1114  /* On commit, complain about leftover snapshots */
1115  if (isCommit)
1116  {
1117  ActiveSnapshotElt *active;
1118 
1119  if (!pairingheap_is_empty(&RegisteredSnapshots))
1120  elog(WARNING, "registered snapshots seem to remain after cleanup");
1121 
1122  /* complain about unpopped active snapshots */
 /* note: %p prints the address of the stack element, not of its as_snap */
1123  for (active = ActiveSnapshot; active != NULL; active = active->as_next)
1124  elog(WARNING, "snapshot %p still active", active);
1125  }
1126 
1127  /*
1128  * And reset our state. We don't need to free the memory explicitly --
1129  * it'll go away with TopTransactionContext.
1130  */
1131  ActiveSnapshot = NULL;
1132  OldestActiveSnapshot = NULL;
1133  pairingheap_reset(&RegisteredSnapshots);
1134 
1135  CurrentSnapshot = NULL;
1136  SecondarySnapshot = NULL;
1137 
1138  FirstSnapshotSet = false;
1139 
1140  /*
1141  * During normal commit processing, we call ProcArrayEndTransaction() to
1142  * reset the PgXact->xmin. That call happens prior to the call to
1143  * AtEOXact_Snapshot(), so we need not touch xmin here at all.
 *
 * NOTE(review): embedded line 1146, the statement controlled by the `if`
 * below, is absent from this listing; as printed here the `if` would
 * bind to the Assert that follows, which is not the original structure.
1144  */
1145  if (resetXmin)
1147 
1148  Assert(resetXmin || MyPgXact->xmin == 0);
1149 }
1150 
1151 
1152 /*
1153  * ExportSnapshot
1154  * Export the snapshot to a file so that other backends can import it.
1155  * Returns the token (the file name) that can be used to import this
1156  * snapshot.
 *
 * The returned string is a pstrdup'd copy of the file's basename, i.e. the
 * part of the path after SNAPSHOT_EXPORT_DIR and the slash. An ERROR is
 * raised when called inside a subtransaction, or when creating, writing,
 * closing or renaming the file fails.
 *
 * NOTE(review): embedded line 1159 (function name and the declaration of
 * the `snapshot` parameter) is absent from this listing.
1157  */
1158 char *
1160 {
1161  TransactionId topXid;
1162  TransactionId *children;
1163  ExportedSnapshot *esnap;
1164  int nchildren;
1165  int addTopXid;
 /*
  * NOTE(review): embedded line 1166 is absent from this listing; `buf`,
  * used below with initStringInfo/appendStringInfo, is presumably declared
  * there.
  */
1167  FILE *f;
1168  int i;
1169  MemoryContext oldcxt;
1170  char path[MAXPGPATH];
1171  char pathtmp[MAXPGPATH];
1172 
1173  /*
1174  * It's tempting to call RequireTransactionChain here, since it's not very
1175  * useful to export a snapshot that will disappear immediately afterwards.
1176  * However, we haven't got enough information to do that, since we don't
1177  * know if we're at top level or not. For example, we could be inside a
1178  * plpgsql function that is going to fire off other transactions via
1179  * dblink. Rather than disallow perfectly legitimate usages, don't make a
1180  * check.
1181  *
1182  * Also note that we don't make any restriction on the transaction's
1183  * isolation level; however, importers must check the level if they are
1184  * serializable.
1185  */
1186 
1187  /*
1188  * Get our transaction ID if there is one, to include in the snapshot.
1189  */
1190  topXid = GetTopTransactionIdIfAny();
1191 
1192  /*
1193  * We cannot export a snapshot from a subtransaction because there's no
1194  * easy way for importers to verify that the same subtransaction is still
1195  * running.
1196  */
1197  if (IsSubTransaction())
1198  ereport(ERROR,
1199  (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1200  errmsg("cannot export a snapshot from a subtransaction")));
1201 
1202  /*
1203  * We do however allow previous committed subtransactions to exist.
1204  * Importers of the snapshot must see them as still running, so get their
1205  * XIDs to add them to the snapshot.
1206  */
1207  nchildren = xactGetCommittedChildren(&children);
1208 
1209  /*
1210  * Generate file path for the snapshot. We start numbering of snapshots
1211  * inside the transaction from 1.
 *
 * The name is built from MyProc->backendId and MyProc->lxid in upper-case
 * hex plus the decimal sequence number, separated by hyphens.
1212  */
1213  snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
1214  MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1);
1215 
1216  /*
1217  * Copy the snapshot into TopTransactionContext, add it to the
1218  * exportedSnapshots list, and mark it pseudo-registered. We do this to
1219  * ensure that the snapshot's xmin is honored for the rest of the
1220  * transaction.
 *
 * NOTE(review): embedded line 1224 is absent from this listing; `oldcxt`
 * is restored below but its assignment is not visible here -- presumably
 * a switch into TopTransactionContext, confirm.
1221  */
1222  snapshot = CopySnapshot(snapshot);
1223 
1225  esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
1226  esnap->snapfile = pstrdup(path);
1227  esnap->snapshot = snapshot;
1228  exportedSnapshots = lappend(exportedSnapshots, esnap);
1229  MemoryContextSwitchTo(oldcxt);
1230 
 /* this registration has no resource owner; it is tracked only via the list */
1231  snapshot->regd_count++;
1232  pairingheap_add(&RegisteredSnapshots, &snapshot->ph_node);
1233 
1234  /*
1235  * Fill buf with a text serialization of the snapshot, plus identification
1236  * data about this transaction. The format expected by ImportSnapshot is
1237  * pretty rigid: each line must be fieldname:value.
1238  */
1239  initStringInfo(&buf);
1240 
1241  appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
1242  appendStringInfo(&buf, "pid:%d\n", MyProcPid);
1243  appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
1244  appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
1245  appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
1246 
1247  appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin);
1248  appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax);
1249 
1250  /*
1251  * We must include our own top transaction ID in the top-xid data, since
1252  * by definition we will still be running when the importing transaction
1253  * adopts the snapshot, but GetSnapshotData never includes our own XID in
1254  * the snapshot. (There must, therefore, be enough room to add it.)
1255  *
1256  * However, it could be that our topXid is after the xmax, in which case
1257  * we shouldn't include it because xip[] members are expected to be before
1258  * xmax. (We need not make the same check for subxip[] members, see
1259  * snapshot.h.)
1260  */
1261  addTopXid = (TransactionIdIsValid(topXid) &&
1262  TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
1263  appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
1264  for (i = 0; i < snapshot->xcnt; i++)
1265  appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
1266  if (addTopXid)
1267  appendStringInfo(&buf, "xip:%u\n", topXid);
1268 
1269  /*
1270  * Similarly, we add our subcommitted child XIDs to the subxid data. Here,
1271  * we have to cope with possible overflow.
 *
 * When overflowed, only "sof:1" is written and no sxcnt/sxp lines follow.
1272  */
1273  if (snapshot->suboverflowed ||
1274  snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount())
1275  appendStringInfoString(&buf, "sof:1\n");
1276  else
1277  {
1278  appendStringInfoString(&buf, "sof:0\n");
1279  appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren);
1280  for (i = 0; i < snapshot->subxcnt; i++)
1281  appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]);
1282  for (i = 0; i < nchildren; i++)
1283  appendStringInfo(&buf, "sxp:%u\n", children[i]);
1284  }
1285  appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);
1286 
1287  /*
1288  * Now write the text representation into a file. We first write to a
1289  * ".tmp" filename, and rename to final filename if no error. This
1290  * ensures that no other backend can read an incomplete file
1291  * (ImportSnapshot won't allow it because of its valid-characters check).
 *
 * NOTE(review): in each of the four ereport calls below, the embedded
 * line between `ereport(ERROR,` and `errmsg(` (1296, 1301, 1308, 1317)
 * is absent from this listing.
1292  */
1293  snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
1294  if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
1295  ereport(ERROR,
1297  errmsg("could not create file \"%s\": %m", pathtmp)));
1298 
 /* the whole buffer is written as one item, so anything but 1 is a failure */
1299  if (fwrite(buf.data, buf.len, 1, f) != 1)
1300  ereport(ERROR,
1302  errmsg("could not write to file \"%s\": %m", pathtmp)));
1303 
1304  /* no fsync() since file need not survive a system crash */
1305 
1306  if (FreeFile(f))
1307  ereport(ERROR,
1309  errmsg("could not write to file \"%s\": %m", pathtmp)));
1310 
1311  /*
1312  * Now that we have written everything into a .tmp file, rename the file
1313  * to remove the .tmp suffix.
1314  */
1315  if (rename(pathtmp, path) < 0)
1316  ereport(ERROR,
1318  errmsg("could not rename file \"%s\" to \"%s\": %m",
1319  pathtmp, path)));
1320 
1321  /*
1322  * The basename of the file is what we return from pg_export_snapshot().
1323  * It's already in path in a textual format and we know that the path
1324  * starts with SNAPSHOT_EXPORT_DIR. Skip over the prefix and the slash
1325  * and pstrdup it so as not to return the address of a local variable.
1326  */
1327  return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1);
1328 }
1329 
1330 /*
1331  * pg_export_snapshot
1332  * SQL-callable wrapper for ExportSnapshot.
 *
 * Exports the snapshot returned by GetActiveSnapshot() and returns the
 * resulting snapshot name converted to a text datum.
 *
 * NOTE(review): embedded line 1335 (function name and argument list) is
 * absent from this listing.
1333  */
1334 Datum
1336 {
1337  char *snapshotName;
1338 
1339  snapshotName = ExportSnapshot(GetActiveSnapshot());
1340  PG_RETURN_TEXT_P(cstring_to_text(snapshotName));
1341 }
1342 
1343 
1344 /*
1345  * Parsing subroutines for ImportSnapshot: parse a line with the given
1346  * prefix followed by a value, and advance *s to the next line. The
1347  * filename is provided for use in error messages.
1348  */
1349 static int
1350 parseIntFromText(const char *prefix, char **s, const char *filename)
1351 {
1352  char *ptr = *s;
1353  int prefixlen = strlen(prefix);
1354  int val;
1355 
1356  if (strncmp(ptr, prefix, prefixlen) != 0)
1357  ereport(ERROR,
1358  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1359  errmsg("invalid snapshot data in file \"%s\"", filename)));
1360  ptr += prefixlen;
1361  if (sscanf(ptr, "%d", &val) != 1)
1362  ereport(ERROR,
1363  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1364  errmsg("invalid snapshot data in file \"%s\"", filename)));
1365  ptr = strchr(ptr, '\n');
1366  if (!ptr)
1367  ereport(ERROR,
1368  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1369  errmsg("invalid snapshot data in file \"%s\"", filename)));
1370  *s = ptr + 1;
1371  return val;
1372 }
1373 
 /*
  * Same contract as the integer parser above, but the value is scanned
  * with "%u" and returned as a TransactionId.
  */
1374 static TransactionId
1375 parseXidFromText(const char *prefix, char **s, const char *filename)
1376 {
1377  char *ptr = *s;
1378  int prefixlen = strlen(prefix);
 /*
  * NOTE(review): embedded line 1379 is absent from this listing; `val`,
  * scanned into and returned below, is presumably declared there.
  */
1380 
 /* the line must start with the expected field name */
1381  if (strncmp(ptr, prefix, prefixlen) != 0)
1382  ereport(ERROR,
1383  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1384  errmsg("invalid snapshot data in file \"%s\"", filename)));
1385  ptr += prefixlen;
1386  if (sscanf(ptr, "%u", &val) != 1)
1387  ereport(ERROR,
1388  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1389  errmsg("invalid snapshot data in file \"%s\"", filename)));
 /* a line without a terminating newline is rejected as well */
1390  ptr = strchr(ptr, '\n');
1391  if (!ptr)
1392  ereport(ERROR,
1393  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1394  errmsg("invalid snapshot data in file \"%s\"", filename)));
1395  *s = ptr + 1;
1396  return val;
1397 }
1398 
1399 static void
1400 parseVxidFromText(const char *prefix, char **s, const char *filename,
1401  VirtualTransactionId *vxid)
1402 {
1403  char *ptr = *s;
1404  int prefixlen = strlen(prefix);
1405 
1406  if (strncmp(ptr, prefix, prefixlen) != 0)
1407  ereport(ERROR,
1408  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1409  errmsg("invalid snapshot data in file \"%s\"", filename)));
1410  ptr += prefixlen;
1411  if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
1412  ereport(ERROR,
1413  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1414  errmsg("invalid snapshot data in file \"%s\"", filename)));
1415  ptr = strchr(ptr, '\n');
1416  if (!ptr)
1417  ereport(ERROR,
1418  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1419  errmsg("invalid snapshot data in file \"%s\"", filename)));
1420  *s = ptr + 1;
1421 }
1422 
1423 /*
1424  * ImportSnapshot
1425  * Import a previously exported snapshot. The argument should be a
1426  * filename in SNAPSHOT_EXPORT_DIR. Load the snapshot from that file.
1427  * This is called by "SET TRANSACTION SNAPSHOT 'foo'".
 *
 * The file is read in full, parsed field by field in the fixed order
 * vxid, pid, dbid, iso, ro, xmin, xmax, xcnt, xip..., sof,
 * [sxcnt, sxp...], rec, sanity-checked, and then handed to
 * SetTransactionSnapshot. Every failure is reported with ereport/elog at
 * ERROR level.
1428  */
1429 void
1430 ImportSnapshot(const char *idstr)
1431 {
1432  char path[MAXPGPATH];
1433  FILE *f;
1434  struct stat stat_buf;
1435  char *filebuf;
1436  int xcnt;
1437  int i;
1438  VirtualTransactionId src_vxid;
1439  int src_pid;
1440  Oid src_dbid;
1441  int src_isolevel;
1442  bool src_readonly;
1443  SnapshotData snapshot;
1444 
1445  /*
1446  * Must be at top level of a fresh transaction. Note in particular that
1447  * we check we haven't acquired an XID --- if we have, it's conceivable
1448  * that the snapshot would show it as not running, making for very screwy
1449  * behavior.
 *
 * NOTE(review): embedded line 1452, the middle term of the condition
 * below, is absent from this listing; going by the comment above it is
 * presumably the has-an-XID test -- confirm.
1450  */
1451  if (FirstSnapshotSet ||
1453  IsSubTransaction())
1454  ereport(ERROR,
1455  (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1456  errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));
1457 
1458  /*
1459  * If we are in read committed mode then the next query would execute with
1460  * a new snapshot thus making this function call quite useless.
 *
 * NOTE(review): embedded line 1462, the `if` condition guarding the
 * ereport below, is absent from this listing.
1461  */
1463  ereport(ERROR,
1464  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1465  errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
1466 
1467  /*
1468  * Verify the identifier: only 0-9, A-F and hyphens are allowed. We do
1469  * this mainly to prevent reading arbitrary files.
1470  */
1471  if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
1472  ereport(ERROR,
1473  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1474  errmsg("invalid snapshot identifier: \"%s\"", idstr)));
1475 
1476  /* OK, read the file */
1477  snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr);
1478 
 /* a file that cannot be opened is reported as an invalid identifier */
1479  f = AllocateFile(path, PG_BINARY_R);
1480  if (!f)
1481  ereport(ERROR,
1482  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1483  errmsg("invalid snapshot identifier: \"%s\"", idstr)));
1484 
1485  /* get the size of the file so that we know how much memory we need */
1486  if (fstat(fileno(f), &stat_buf))
1487  elog(ERROR, "could not stat file \"%s\": %m", path);
1488 
1489  /* and read the file into a palloc'd string */
1490  filebuf = (char *) palloc(stat_buf.st_size + 1);
1491  if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
1492  elog(ERROR, "could not read file \"%s\": %m", path);
1493 
 /* NUL-terminate so the string functions in the parsers stop at the end */
1494  filebuf[stat_buf.st_size] = '\0';
1495 
1496  FreeFile(f);
1497 
1498  /*
1499  * Construct a snapshot struct by parsing the file content.
 *
 * Each parse call receives &filebuf and advances it to the next line, so
 * the start of the palloc'd buffer is not retained past this point.
1500  */
1501  memset(&snapshot, 0, sizeof(snapshot));
1502 
1503  parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
1504  src_pid = parseIntFromText("pid:", &filebuf, path);
1505  /* we abuse parseXidFromText a bit here ... */
1506  src_dbid = parseXidFromText("dbid:", &filebuf, path);
1507  src_isolevel = parseIntFromText("iso:", &filebuf, path);
1508  src_readonly = parseIntFromText("ro:", &filebuf, path);
1509 
1510  snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
1511  snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
1512 
1513  snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
1514 
1515  /* sanity-check the xid count before palloc */
1516  if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
1517  ereport(ERROR,
1518  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1519  errmsg("invalid snapshot data in file \"%s\"", path)));
1520 
1521  snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
1522  for (i = 0; i < xcnt; i++)
1523  snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);
1524 
1525  snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);
1526 
1527  if (!snapshot.suboverflowed)
1528  {
 /* xcnt is reused here to hold the subxid count */
1529  snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);
1530 
1531  /* sanity-check the xid count before palloc */
1532  if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
1533  ereport(ERROR,
1534  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1535  errmsg("invalid snapshot data in file \"%s\"", path)));
1536 
1537  snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
1538  for (i = 0; i < xcnt; i++)
1539  snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
1540  }
1541  else
1542  {
1543  snapshot.subxcnt = 0;
1544  snapshot.subxip = NULL;
1545  }
1546 
1547  snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
1548 
1549  /*
1550  * Do some additional sanity checking, just to protect ourselves. We
1551  * don't trouble to check the array elements, just the most critical
1552  * fields.
1553  */
1554  if (!VirtualTransactionIdIsValid(src_vxid) ||
1555  !OidIsValid(src_dbid) ||
1556  !TransactionIdIsNormal(snapshot.xmin) ||
1557  !TransactionIdIsNormal(snapshot.xmax))
1558  ereport(ERROR,
1559  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1560  errmsg("invalid snapshot data in file \"%s\"", path)));
1561 
1562  /*
1563  * If we're serializable, the source transaction must be too, otherwise
1564  * predicate.c has problems (SxactGlobalXmin could go backwards). Also, a
1565  * non-read-only transaction can't adopt a snapshot from a read-only
1566  * transaction, as predicate.c handles the cases very differently.
 *
 * NOTE(review): embedded line 1568, the `if` condition opening the block
 * below, is absent from this listing.
1567  */
1569  {
1570  if (src_isolevel != XACT_SERIALIZABLE)
1571  ereport(ERROR,
1572  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1573  errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
1574  if (src_readonly && !XactReadOnly)
1575  ereport(ERROR,
1576  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1577  errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
1578  }
1579 
1580  /*
1581  * We cannot import a snapshot that was taken in a different database,
1582  * because vacuum calculates OldestXmin on a per-database basis; so the
1583  * source transaction's xmin doesn't protect us from data loss. This
1584  * restriction could be removed if the source transaction were to mark its
1585  * xmin as being globally applicable. But that would require some
1586  * additional syntax, since that has to be known when the snapshot is
1587  * initially taken. (See pgsql-hackers discussion of 2011-10-21.)
1588  */
1589  if (src_dbid != MyDatabaseId)
1590  ereport(ERROR,
1591  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1592  errmsg("cannot import a snapshot from a different database")));
1593 
1594  /* OK, install the snapshot */
1595  SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
1596 }
1597 
1598 /*
1599  * XactHasExportedSnapshots
1600  * Test whether current transaction has exported any snapshots.
 *
 * Returns true exactly when the exportedSnapshots list is non-empty.
 *
 * NOTE(review): embedded line 1603 (function name and parameter list) is
 * absent from this listing.
1601  */
1602 bool
1604 {
1605  return (exportedSnapshots != NIL);
1606 }
1607 
1608 /*
1609  * DeleteAllExportedSnapshotFiles
1610  * Clean up any files that have been left behind by a crashed backend
1611  * that had exported snapshots before it died.
1612  *
1613  * This should be called during database startup or crash recovery.
 *
 * Every entry of SNAPSHOT_EXPORT_DIR other than "." and ".." is unlinked.
 * Failure to open the directory or to unlink an entry is only logged at
 * LOG level; nothing is reported back to the caller.
 *
 * NOTE(review): embedded line 1616 (function name and parameter list) is
 * absent from this listing.
1614  */
1615 void
1617 {
 /* room for the directory prefix plus a name of up to MAXPGPATH bytes */
1618  char buf[MAXPGPATH + sizeof(SNAPSHOT_EXPORT_DIR)];
1619  DIR *s_dir;
1620  struct dirent *s_de;
1621 
1622  if (!(s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR)))
1623  {
1624  /*
1625  * We really should have that directory in a sane cluster setup. But
1626  * then again if we don't, it's not fatal enough to make it FATAL.
1627  * Since we're running in the postmaster, LOG is our best bet.
1628  */
1629  elog(LOG, "could not open directory \"%s\": %m", SNAPSHOT_EXPORT_DIR);
1630  return;
1631  }
1632 
1633  while ((s_de = ReadDir(s_dir, SNAPSHOT_EXPORT_DIR)) != NULL)
1634  {
 /* skip the directory's own self and parent entries */
1635  if (strcmp(s_de->d_name, ".") == 0 ||
1636  strcmp(s_de->d_name, "..") == 0)
1637  continue;
1638 
1639  snprintf(buf, sizeof(buf), SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name);
1640  /* Again, unlink failure is not worthy of FATAL */
1641  if (unlink(buf))
1642  elog(LOG, "could not unlink file \"%s\": %m", buf);
1643  }
1644 
1645  FreeDir(s_dir);
1646 }
1647 
 /*
  * Returns true when the RegisteredSnapshots heap is empty or holds exactly
  * one entry, and false otherwise.
  *
  * NOTE(review): embedded line 1649 (function name and parameter list) is
  * absent from this listing.
  */
1648 bool
1650 {
1651  if (pairingheap_is_empty(&RegisteredSnapshots) ||
1652  pairingheap_is_singular(&RegisteredSnapshots))
1653  return true;
1654 
1655  return false;
1656 }
1657 
1658 
1659 /*
1660  * Return a timestamp that is exactly on a minute boundary.
1661  *
1662  * If the argument is already aligned, return that value, otherwise move to
1663  * the next minute boundary following the given time.
 *
 * NOTE(review): embedded line 1666 (function name and the declaration of
 * `ts`) is absent from this listing.
1664  */
1665 static TimestampTz
1667 {
 /* bias upward by one unit less than a minute, then drop the remainder */
1668  TimestampTz retval = ts + (USECS_PER_MINUTE - 1);
1669 
1670  return retval - (retval % USECS_PER_MINUTE);
1671 }
1672 
1673 /*
1674  * Get current timestamp for snapshots
1675  *
1676  * This is basically GetCurrentTimestamp(), but with a guarantee that
1677  * the result never moves backward.
 *
 * The guarantee comes from oldSnapshotControl->current_timestamp, which is
 * compared and updated under oldSnapshotControl->mutex_current: the value
 * returned is the larger of `now` and the stored timestamp.
 *
 * NOTE(review): embedded lines 1679-1680 (return type, function name and
 * parameter list) and 1682 (presumably the declaration and initialization
 * of `now`) are absent from this listing.
1678  */
1681 {
1683 
1684  /*
1685  * Don't let time move backward; if it hasn't advanced, use the old value.
1686  */
1687  SpinLockAcquire(&oldSnapshotControl->mutex_current);
1688  if (now <= oldSnapshotControl->current_timestamp)
1689  now = oldSnapshotControl->current_timestamp;
1690  else
1691  oldSnapshotControl->current_timestamp = now;
1692  SpinLockRelease(&oldSnapshotControl->mutex_current);
1693 
1694  return now;
1695 }
1696 
1697 /*
1698  * Get timestamp through which vacuum may have processed based on last stored
1699  * value for threshold_timestamp.
1700  *
1701  * XXX: So far, we never trust that a 64-bit value can be read atomically; if
1702  * that ever changes, we could get rid of the spinlock here.
 *
 * NOTE(review): embedded lines 1704-1705 (return type, function name and
 * parameter list) are absent from this listing.
1703  */
1706 {
1707  TimestampTz threshold_timestamp;
1708 
 /* copy the shared value out while holding mutex_threshold */
1709  SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1710  threshold_timestamp = oldSnapshotControl->threshold_timestamp;
1711  SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1712 
1713  return threshold_timestamp;
1714 }
1715 
 /*
  * Store `ts` and `xlimit` into oldSnapshotControl's threshold_timestamp and
  * threshold_xid; both are written while holding mutex_threshold.
  *
  * NOTE(review): embedded line 1717 (function name and the declarations of
  * `ts` and `xlimit`) is absent from this listing.
  */
1716 static void
1718 {
1719  SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1720  oldSnapshotControl->threshold_timestamp = ts;
1721  oldSnapshotControl->threshold_xid = xlimit;
1722  SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1723 }
1724 
1725 /*
1726  * TransactionIdLimitedForOldSnapshots
1727  *
1728  * Apply old snapshot limit, if any. This is intended to be called for page
1729  * pruning and table vacuuming, to allow old_snapshot_threshold to override
1730  * the normal global xmin value. Actual testing for snapshot too old will be
1731  * based on whether a snapshot timestamp is prior to the threshold timestamp
1732  * set in this function.
 *
 * Returns recentXmin unchanged unless recentXmin is a normal XID,
 * old_snapshot_threshold is >= 0 and the relation allows early pruning;
 * in that case a computed xlimit is returned when it follows recentXmin.
 *
 * NOTE(review): this listing drops several lines of the function: 1734-1735
 * (return type, name, first parameter), 1742 (presumably the declaration
 * of `ts`), 1768, 1773-1774, 1791, 1807 and 1811. The gaps are marked
 * below; do not treat the code as printed here as complete.
1733  */
1736  Relation relation)
1737 {
1738  if (TransactionIdIsNormal(recentXmin)
1739  && old_snapshot_threshold >= 0
1740  && RelationAllowsEarlyPruning(relation))
1741  {
1743  TransactionId xlimit = recentXmin;
1744  TransactionId latest_xmin;
1745  TimestampTz update_ts;
1746  bool same_ts_as_threshold = false;
1747 
 /* read both shared fields in one critical section under mutex_latest_xmin */
1748  SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1749  latest_xmin = oldSnapshotControl->latest_xmin;
1750  update_ts = oldSnapshotControl->next_map_update;
1751  SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1752 
1753  /*
1754  * Zero threshold always overrides to latest xmin, if valid. Without
1755  * some heuristic it will find its own snapshot too old on, for
1756  * example, a simple UPDATE -- which would make it useless for most
1757  * testing, but there is no principled way to ensure that it doesn't
1758  * fail in this way. Use a five-second delay to try to get useful
1759  * testing behavior, but this may need adjustment.
1760  */
1761  if (old_snapshot_threshold == 0)
1762  {
1763  if (TransactionIdPrecedes(latest_xmin, MyPgXact->xmin)
1764  && TransactionIdFollows(latest_xmin, xlimit))
1765  xlimit = latest_xmin;
1766 
1767  ts -= 5 * USECS_PER_SEC;
 /* NOTE(review): embedded line 1768 is absent from this listing. */
1769 
1770  return xlimit;
1771  }
1772 
 /*
  * NOTE(review): embedded lines 1773-1774 are absent from this listing;
  * presumably they adjust `ts` before the comparisons below -- confirm.
  */
1775 
1776  /* Check for fast exit without LW locking. */
1777  SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1778  if (ts == oldSnapshotControl->threshold_timestamp)
1779  {
1780  xlimit = oldSnapshotControl->threshold_xid;
1781  same_ts_as_threshold = true;
1782  }
1783  SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1784 
1785  if (!same_ts_as_threshold)
1786  {
1787  if (ts == update_ts)
1788  {
1789  xlimit = latest_xmin;
 /*
  * NOTE(review): embedded line 1791, the statement controlled by the
  * `if` below, is absent from this listing.
  */
1790  if (NormalTransactionIdFollows(xlimit, recentXmin))
1792  }
1793  else
1794  {
1795  LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
1796 
1797  if (oldSnapshotControl->count_used > 0
1798  && ts >= oldSnapshotControl->head_timestamp)
1799  {
1800  int offset;
1801 
 /* whole minutes between the map's head and ts, clamped to the last used entry */
1802  offset = ((ts - oldSnapshotControl->head_timestamp)
1803  / USECS_PER_MINUTE);
1804  if (offset > oldSnapshotControl->count_used - 1)
1805  offset = oldSnapshotControl->count_used - 1;
 /*
  * NOTE(review): embedded line 1807, the continuation of the expression
  * below, is absent from this listing.
  */
1806  offset = (oldSnapshotControl->head_offset + offset)
1808  xlimit = oldSnapshotControl->xid_by_minute[offset];
1809 
 /*
  * NOTE(review): embedded line 1811, the statement controlled by the
  * `if` below, is absent from this listing.
  */
1810  if (NormalTransactionIdFollows(xlimit, recentXmin))
1812  }
1813 
1814  LWLockRelease(OldSnapshotTimeMapLock);
1815  }
1816  }
1817 
1818  /*
1819  * Failsafe protection against vacuuming work of active transaction.
1820  *
1821  * This is not an assertion because we avoid the spinlock for
1822  * performance, leaving open the possibility that xlimit could advance
1823  * and be more current; but it seems prudent to apply this limit. It
1824  * might make pruning a tiny bit less aggressive than it could be, but
1825  * protects against data loss bugs.
1826  */
1827  if (TransactionIdIsNormal(latest_xmin)
1828  && TransactionIdPrecedes(latest_xmin, xlimit))
1829  xlimit = latest_xmin;
1830 
 /* the computed limit is used only when it actually follows recentXmin */
1831  if (NormalTransactionIdFollows(xlimit, recentXmin))
1832  return xlimit;
1833  }
1834 
1835  return recentXmin;
1836 }
1837 
1838 /*
1839  * Take care of the circular buffer that maps time to xid.
 *
 * First updates oldSnapshotControl->latest_xmin and next_map_update under
 * mutex_latest_xmin. The xid_by_minute map itself is touched only when
 * `ts` (whenTaken aligned to a minute boundary) is later than the
 * previous next_map_update and old_snapshot_threshold is non-zero; that
 * part runs under OldSnapshotTimeMapLock held exclusively.
 *
 * NOTE(review): this listing drops embedded lines 1842 (function name and
 * the declarations of `whenTaken` and `xmin`), 1850, 1932 and 1975. The
 * gaps are marked below.
1840  */
1841 void
1843 {
1844  TimestampTz ts;
1845  TransactionId latest_xmin;
1846  TimestampTz update_ts;
1847  bool map_update_required = false;
1848 
1849  /* Never call this function when old snapshot checking is disabled. */
 /*
  * NOTE(review): embedded line 1850 is absent from this listing; presumably
  * it is the check that enforces the comment above.
  */
1851 
1852  ts = AlignTimestampToMinuteBoundary(whenTaken);
1853 
1854  /*
1855  * Keep track of the latest xmin seen by any process. Update mapping with
1856  * a new value when we have crossed a bucket boundary.
1857  */
1858  SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1859  latest_xmin = oldSnapshotControl->latest_xmin;
1860  update_ts = oldSnapshotControl->next_map_update;
1861  if (ts > update_ts)
1862  {
1863  oldSnapshotControl->next_map_update = ts;
1864  map_update_required = true;
1865  }
1866  if (TransactionIdFollows(xmin, latest_xmin))
1867  oldSnapshotControl->latest_xmin = xmin;
1868  SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1869 
1870  /* We only needed to update the most recent xmin value. */
1871  if (!map_update_required)
1872  return;
1873 
1874  /* No further tracking needed for 0 (used for testing). */
1875  if (old_snapshot_threshold == 0)
1876  return;
1877 
1878  /*
1879  * We don't want to do something stupid with unusual values, but we don't
1880  * want to litter the log with warnings or break otherwise normal
1881  * processing for this feature; so if something seems unreasonable, just
1882  * log at DEBUG level and return without doing anything.
1883  */
1884  if (whenTaken < 0)
1885  {
1886  elog(DEBUG1,
1887  "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
1888  (long) whenTaken);
1889  return;
1890  }
1891  if (!TransactionIdIsNormal(xmin))
1892  {
1893  elog(DEBUG1,
1894  "MaintainOldSnapshotTimeMapping called with xmin = %lu",
1895  (unsigned long) xmin);
1896  return;
1897  }
1898 
1899  LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
1900 
1901  Assert(oldSnapshotControl->head_offset >= 0);
1902  Assert(oldSnapshotControl->head_offset < OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1903  Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
1904  Assert(oldSnapshotControl->count_used >= 0);
1905  Assert(oldSnapshotControl->count_used <= OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1906 
1907  if (oldSnapshotControl->count_used == 0)
1908  {
1909  /* set up first entry for empty mapping */
1910  oldSnapshotControl->head_offset = 0;
1911  oldSnapshotControl->head_timestamp = ts;
1912  oldSnapshotControl->count_used = 1;
1913  oldSnapshotControl->xid_by_minute[0] = xmin;
1914  }
1915  else if (ts < oldSnapshotControl->head_timestamp)
1916  {
1917  /* old ts; log it at DEBUG */
 /* the lock is released before logging on this early-return path */
1918  LWLockRelease(OldSnapshotTimeMapLock);
1919  elog(DEBUG1,
1920  "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
1921  (long) whenTaken);
1922  return;
1923  }
1924  else if (ts <= (oldSnapshotControl->head_timestamp +
1925  ((oldSnapshotControl->count_used - 1)
1926  * USECS_PER_MINUTE)))
1927  {
1928  /* existing mapping; advance xid if possible */
 /*
  * NOTE(review): embedded line 1932, the end of the initializer below, is
  * absent from this listing.
  */
1929  int bucket = (oldSnapshotControl->head_offset
1930  + ((ts - oldSnapshotControl->head_timestamp)
1931  / USECS_PER_MINUTE))
1933 
1934  if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
1935  oldSnapshotControl->xid_by_minute[bucket] = xmin;
1936  }
1937  else
1938  {
1939  /* We need a new bucket, but it might not be the very next one. */
 /* `advance` is computed from the old head_timestamp, before it is overwritten */
1940  int advance = ((ts - oldSnapshotControl->head_timestamp)
1941  / USECS_PER_MINUTE);
1942 
1943  oldSnapshotControl->head_timestamp = ts;
1944 
1945  if (advance >= OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1946  {
1947  /* Advance is so far that all old data is junk; start over. */
1948  oldSnapshotControl->head_offset = 0;
1949  oldSnapshotControl->count_used = 1;
1950  oldSnapshotControl->xid_by_minute[0] = xmin;
1951  }
1952  else
1953  {
1954  /* Store the new value in one or more buckets. */
1955  int i;
1956 
1957  for (i = 0; i < advance; i++)
1958  {
1959  if (oldSnapshotControl->count_used == OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1960  {
1961  /* Map full and new value replaces old head. */
1962  int old_head = oldSnapshotControl->head_offset;
1963 
 /* step the head forward by one slot, wrapping at the end of the array */
1964  if (old_head == (OLD_SNAPSHOT_TIME_MAP_ENTRIES - 1))
1965  oldSnapshotControl->head_offset = 0;
1966  else
1967  oldSnapshotControl->head_offset = old_head + 1;
1968  oldSnapshotControl->xid_by_minute[old_head] = xmin;
1969  }
1970  else
1971  {
1972  /* Extend map to unused entry. */
 /*
  * NOTE(review): embedded line 1975, the end of the initializer below, is
  * absent from this listing.
  */
1973  int new_tail = (oldSnapshotControl->head_offset
1974  + oldSnapshotControl->count_used)
1976 
1977  oldSnapshotControl->count_used++;
1978  oldSnapshotControl->xid_by_minute[new_tail] = xmin;
1979  }
1980  }
1981  }
1982  }
1983 
1984  LWLockRelease(OldSnapshotTimeMapLock);
1985 }
1986 
1987 
1988 /*
1989  * Setup a snapshot that replaces normal catalog snapshots that allows catalog
1990  * access to behave just like it did at a certain point in the past.
1991  *
1992  * Needed for logical decoding.
1993  */
1994 void
1995 SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
1996 {
1997  Assert(historic_snapshot != NULL);
1998 
1999  /* setup the timetravel snapshot */
2000  HistoricSnapshot = historic_snapshot;
2001 
2002  /* setup (cmin, cmax) lookup hash */
2003  tuplecid_data = tuplecids;
2004 }
2005 
2006 
2007 /*
2008  * Make catalog snapshots behave normally again.
2009  */
2010 void
2012 {
2013  HistoricSnapshot = NULL;
2014  tuplecid_data = NULL;
2015 }
2016 
2017 bool
2019 {
2020  return HistoricSnapshot != NULL;
2021 }
2022 
2023 HTAB *
2025 {
2027  return tuplecid_data;
2028 }
2029 
2030 /*
2031  * EstimateSnapshotSpace
2032  * Returns the size needed to store the given snapshot.
2033  *
2034  * We are exporting only required fields from the Snapshot, stored in
2035  * SerializedSnapshotData.
2036  */
2037 Size
2039 {
2040  Size size;
2041 
2042  Assert(snap != InvalidSnapshot);
2044 
2045  /* We allocate any XID arrays needed in the same palloc block. */
2046  size = add_size(sizeof(SerializedSnapshotData),
2047  mul_size(snap->xcnt, sizeof(TransactionId)));
2048  if (snap->subxcnt > 0 &&
2049  (!snap->suboverflowed || snap->takenDuringRecovery))
2050  size = add_size(size,
2051  mul_size(snap->subxcnt, sizeof(TransactionId)));
2052 
2053  return size;
2054 }
2055 
2056 /*
2057  * SerializeSnapshot
2058  * Dumps the serialized snapshot (extracted from given snapshot) onto the
2059  * memory location at start_address.
2060  */
2061 void
2062 SerializeSnapshot(Snapshot snapshot, char *start_address)
2063 {
2064  SerializedSnapshotData serialized_snapshot;
2065 
2066  Assert(snapshot->subxcnt >= 0);
2067 
2068  /* Copy all required fields */
2069  serialized_snapshot.xmin = snapshot->xmin;
2070  serialized_snapshot.xmax = snapshot->xmax;
2071  serialized_snapshot.xcnt = snapshot->xcnt;
2072  serialized_snapshot.subxcnt = snapshot->subxcnt;
2073  serialized_snapshot.suboverflowed = snapshot->suboverflowed;
2074  serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery;
2075  serialized_snapshot.curcid = snapshot->curcid;
2076  serialized_snapshot.whenTaken = snapshot->whenTaken;
2077  serialized_snapshot.lsn = snapshot->lsn;
2078 
2079  /*
2080  * Ignore the SubXID array if it has overflowed, unless the snapshot was
2081  * taken during recovery - in that case, top-level XIDs are in subxip as
2082  * well, and we mustn't lose them.
2083  */
2084  if (serialized_snapshot.suboverflowed && !snapshot->takenDuringRecovery)
2085  serialized_snapshot.subxcnt = 0;
2086 
2087  /* Copy struct to possibly-unaligned buffer */
2088  memcpy(start_address,
2089  &serialized_snapshot, sizeof(SerializedSnapshotData));
2090 
2091  /* Copy XID array */
2092  if (snapshot->xcnt > 0)
2093  memcpy((TransactionId *) (start_address +
2094  sizeof(SerializedSnapshotData)),
2095  snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
2096 
2097  /*
2098  * Copy SubXID array. Don't bother to copy it if it had overflowed,
2099  * though, because it's not used anywhere in that case. Except if it's a
2100  * snapshot taken during recovery; all the top-level XIDs are in subxip as
2101  * well in that case, so we mustn't lose them.
2102  */
2103  if (serialized_snapshot.subxcnt > 0)
2104  {
2105  Size subxipoff = sizeof(SerializedSnapshotData) +
2106  snapshot->xcnt * sizeof(TransactionId);
2107 
2108  memcpy((TransactionId *) (start_address + subxipoff),
2109  snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
2110  }
2111 }
2112 
2113 /*
2114  * RestoreSnapshot
2115  * Restore a serialized snapshot from the specified address.
2116  *
2117  * The copy is palloc'd in TopTransactionContext and has initial refcounts set
2118  * to 0. The returned snapshot has the copied flag set.
2119  */
2120 Snapshot
2121 RestoreSnapshot(char *start_address)
2122 {
2123  SerializedSnapshotData serialized_snapshot;
2124  Size size;
2125  Snapshot snapshot;
2126  TransactionId *serialized_xids;
2127 
2128  memcpy(&serialized_snapshot, start_address,
2129  sizeof(SerializedSnapshotData));
2130  serialized_xids = (TransactionId *)
2131  (start_address + sizeof(SerializedSnapshotData));
2132 
2133  /* We allocate any XID arrays needed in the same palloc block. */
2134  size = sizeof(SnapshotData)
2135  + serialized_snapshot.xcnt * sizeof(TransactionId)
2136  + serialized_snapshot.subxcnt * sizeof(TransactionId);
2137 
2138  /* Copy all required fields */
2140  snapshot->satisfies = HeapTupleSatisfiesMVCC;
2141  snapshot->xmin = serialized_snapshot.xmin;
2142  snapshot->xmax = serialized_snapshot.xmax;
2143  snapshot->xip = NULL;
2144  snapshot->xcnt = serialized_snapshot.xcnt;
2145  snapshot->subxip = NULL;
2146  snapshot->subxcnt = serialized_snapshot.subxcnt;
2147  snapshot->suboverflowed = serialized_snapshot.suboverflowed;
2148  snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery;
2149  snapshot->curcid = serialized_snapshot.curcid;
2150  snapshot->whenTaken = serialized_snapshot.whenTaken;
2151  snapshot->lsn = serialized_snapshot.lsn;
2152 
2153  /* Copy XIDs, if present. */
2154  if (serialized_snapshot.xcnt > 0)
2155  {
2156  snapshot->xip = (TransactionId *) (snapshot + 1);
2157  memcpy(snapshot->xip, serialized_xids,
2158  serialized_snapshot.xcnt * sizeof(TransactionId));
2159  }
2160 
2161  /* Copy SubXIDs, if present. */
2162  if (serialized_snapshot.subxcnt > 0)
2163  {
2164  snapshot->subxip = ((TransactionId *) (snapshot + 1)) +
2165  serialized_snapshot.xcnt;
2166  memcpy(snapshot->subxip, serialized_xids + serialized_snapshot.xcnt,
2167  serialized_snapshot.subxcnt * sizeof(TransactionId));
2168  }
2169 
2170  /* Set the copied flag so that the caller will set refcounts correctly. */
2171  snapshot->regd_count = 0;
2172  snapshot->active_count = 0;
2173  snapshot->copied = true;
2174 
2175  return snapshot;
2176 }
2177 
2178 /*
2179  * Install a restored snapshot as the transaction snapshot.
2180  *
2181  * The second argument is of type void * so that snapmgr.h need not include
2182  * the declaration for PGPROC.
2183  */
2184 void
2185 RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
2186 {
2187  SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc);
2188 }
void ImportSnapshot(const char *idstr)
Definition: snapmgr.c:1430
int slock_t
Definition: s_lock.h:888
#define NIL
Definition: pg_list.h:69
uint32 CommandId
Definition: c.h:411
int xactGetCommittedChildren(TransactionId **ptr)
Definition: xact.c:5108
SnapshotSatisfiesFunc satisfies
Definition: snapshot.h:54
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:130
Snapshot as_snap
Definition: snapmgr.c:182
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void UpdateActiveSnapshotCommandId(void)
Definition: snapmgr.c:781
SnapshotData CatalogSnapshotData
Definition: snapmgr.c:146
void SetSerializableTransactionSnapshot(Snapshot snapshot, VirtualTransactionId *sourcevxid, int sourcepid)
Definition: predicate.c:1674
bool RelationHasSysCache(Oid relid)
Definition: syscache.c:1398
static TransactionId parseXidFromText(const char *prefix, char **s, const char *filename)
Definition: snapmgr.c:1375
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:39
TimestampTz GetOldSnapshotThresholdTimestamp(void)
Definition: snapmgr.c:1705
bool XactHasExportedSnapshots(void)
Definition: snapmgr.c:1603
Snapshot RestoreSnapshot(char *start_address)
Definition: snapmgr.c:2121
static int32 next
Definition: blutils.c:210
MemoryContext TopTransactionContext
Definition: mcxt.c:48
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
Definition: transam.c:334
BackendId backendId
Definition: proc.h:112
uint32 TransactionId
Definition: c.h:397
bool copied
Definition: snapshot.h:94
static int parseIntFromText(const char *prefix, char **s, const char *filename)
Definition: snapmgr.c:1350
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:863
#define pairingheap_reset(h)
Definition: pairingheap.h:93
static void FreeSnapshot(Snapshot snapshot)
Definition: snapmgr.c:715
#define USECS_PER_SEC
Definition: timestamp.h:94
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
TransactionId xmin
Definition: proc.h:213
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid, int sourcepid, PGPROC *sourceproc)
Definition: snapmgr.c:565
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
char * pstrdup(const char *in)
Definition: mcxt.c:1077
slock_t mutex_threshold
Definition: snapmgr.c:91
#define SpinLockInit(lock)
Definition: spin.h:60
static void SnapshotResetXmin(void)
Definition: snapmgr.c:978
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2185
#define RelationAllowsEarlyPruning(rel)
Definition: snapmgr.h:38
void ResourceOwnerEnlargeSnapshots(ResourceOwner owner)
Definition: resowner.c:1143
TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin, Relation relation)
Definition: snapmgr.c:1735
Snapshot GetCatalogSnapshot(Oid relid)
Definition: snapmgr.c:440
XLogRecPtr lsn
Definition: snapshot.h:112
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define IsolationUsesXactSnapshot()
Definition: xact.h:43
static Snapshot HistoricSnapshot
Definition: snapmgr.c:152
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:839
#define pairingheap_is_empty(h)
Definition: pairingheap.h:96
bool ThereAreNoPriorRegisteredSnapshots(void)
Definition: snapmgr.c:1649
#define USECS_PER_MINUTE
Definition: timestamp.h:93
int errcode(int sqlerrcode)
Definition: elog.c:575
TransactionId RecentXmin
Definition: snapmgr.c:165
#define PG_BINARY_W
Definition: c.h:1042
TimestampTz whenTaken
Definition: snapmgr.c:246
char * ExportSnapshot(Snapshot snapshot)
Definition: snapmgr.c:1159
bool suboverflowed
Definition: snapshot.h:91
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void PopActiveSnapshot(void)
Definition: snapmgr.c:812
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2011
static SnapshotData CurrentSnapshotData
Definition: snapmgr.c:144
static Snapshot FirstXactSnapshot
Definition: snapmgr.c:210
void AtSubCommit_Snapshot(int level)
Definition: snapmgr.c:1002
#define LOG
Definition: elog.h:26
struct SnapshotData * Snapshot
Definition: snapshot.h:23
unsigned int Oid
Definition: postgres_ext.h:31
LocalTransactionId localTransactionId
Definition: lock.h:66
Definition: dirent.h:9
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:304
uint32 regd_count
Definition: snapshot.h:108
#define OidIsValid(objectId)
Definition: c.h:538
#define PG_BINARY_R
Definition: c.h:1041
static Snapshot CurrentSnapshot
Definition: snapmgr.c:149
Snapshot GetNonHistoricCatalogSnapshot(Oid relid)
Definition: snapmgr.c:462
signed int int32
Definition: c.h:256
#define XACT_SERIALIZABLE
Definition: xact.h:31
PGXACT * MyPgXact
Definition: proc.c:68
TransactionId TransactionXmin
Definition: snapmgr.c:164
static List * exportedSnapshots
Definition: snapmgr.c:223
#define SNAPSHOT_EXPORT_DIR
Definition: snapmgr.c:213
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
struct ActiveSnapshotElt ActiveSnapshotElt
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43
void AtSubAbort_Snapshot(int level)
Definition: snapmgr.c:1023
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: dynahash.c:208
void pfree(void *pointer)
Definition: mcxt.c:950
bool IsInParallelMode(void)
Definition: xact.c:913
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
TimestampTz threshold_timestamp
Definition: snapmgr.c:92
Definition: dirent.c:25
#define FirstNormalTransactionId
Definition: transam.h:34
static Snapshot CatalogSnapshot
Definition: snapmgr.c:151
#define ERROR
Definition: elog.h:43
TimestampTz GetSnapshotCurrentTimestamp(void)
Definition: snapmgr.c:1680
static pairingheap RegisteredSnapshots
Definition: snapmgr.c:200
void SerializeSnapshot(Snapshot snapshot, char *start_address)
Definition: snapmgr.c:2062
bool FirstSnapshotSet
Definition: snapmgr.c:203
slock_t mutex_latest_xmin
Definition: snapmgr.c:88
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
TimestampTz next_map_update
Definition: snapmgr.c:90
static ActiveSnapshotElt * OldestActiveSnapshot
Definition: snapmgr.c:191
#define MAXPGPATH
void PushCopiedSnapshot(Snapshot snapshot)
Definition: snapmgr.c:769
TimestampTz current_timestamp
Definition: snapmgr.c:87
TransactionId threshold_xid
Definition: snapmgr.c:93
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
struct SerializedSnapshotData SerializedSnapshotData
static void SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
Definition: snapmgr.c:1717
static void parseVxidFromText(const char *prefix, char **s, const char *filename, VirtualTransactionId *vxid)
Definition: snapmgr.c:1400
static char * buf
Definition: pg_test_fsync.c:66
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:733
TransactionId RecentGlobalXmin
Definition: snapmgr.c:166
void ResourceOwnerForgetSnapshot(ResourceOwner owner, Snapshot snapshot)
Definition: resowner.c:1163
int errdetail(const char *fmt,...)
Definition: elog.c:873
int errcode_for_file_access(void)
Definition: elog.c:598
struct SnapshotData SnapshotData
FILE * AllocateFile(const char *name, const char *mode)
Definition: fd.c:2094
#define InvalidTransactionId
Definition: transam.h:31
struct ActiveSnapshotElt * as_next
Definition: snapmgr.c:184
Datum pg_export_snapshot(PG_FUNCTION_ARGS)
Definition: snapmgr.c:1335
unsigned int uint32
Definition: c.h:268
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2335
bool ActiveSnapshotSet(void)
Definition: snapmgr.c:851
TransactionId xmax
Definition: snapshot.h:67
TransactionId xmin
Definition: snapshot.h:66
TransactionId GetTopTransactionIdIfAny(void)
Definition: xact.c:404
static volatile OldSnapshotControlData * oldSnapshotControl
Definition: snapmgr.c:129
TransactionId RecentGlobalDataXmin
Definition: snapmgr.c:167
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
Snapshot snapshot
Definition: snapmgr.c:219
void SnapMgrInit(void)
Definition: snapmgr.c:267
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:510
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId * xip
Definition: snapshot.h:77
static ActiveSnapshotElt * ActiveSnapshot
Definition: snapmgr.c:188
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:905
List * lappend(List *list, void *datum)
Definition: list.c:128
static HTAB * tuplecid_data
Definition: snapmgr.c:170
char * snapfile
Definition: snapmgr.c:218
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define WARNING
Definition: elog.h:40
#define VirtualTransactionIdIsValid(vxid)
Definition: lock.h:71
TransactionId xmax
Definition: snapmgr.c:240
#define InvalidSnapshot
Definition: snapshot.h:25
#define SpinLockRelease(lock)
Definition: spin.h:64
Size EstimateSnapshotSpace(Snapshot snap)
Definition: snapmgr.c:2038
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
uintptr_t Datum
Definition: postgres.h:372
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
Oid MyDatabaseId
Definition: globals.c:77
Snapshot GetOldestSnapshot(void)
Definition: snapmgr.c:411
static SnapshotData SecondarySnapshotData
Definition: snapmgr.c:145
CommandId curcid
Definition: snapshot.h:96
static Snapshot CopySnapshot(Snapshot snapshot)
Definition: snapmgr.c:660
int GetMaxSnapshotXidCount(void)
Definition: procarray.c:1456
Snapshot RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
Definition: snapmgr.c:876
bool XactReadOnly
Definition: xact.c:77
pairingheap_node ph_node
Definition: snapshot.h:109
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:761
#define PG_RETURN_TEXT_P(x)
Definition: fmgr.h:330
text * cstring_to_text(const char *s)
Definition: varlena.c:149
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
TransactionId latest_xmin
Definition: snapmgr.c:89
#define Assert(condition)
Definition: c.h:676
#define lfirst(lc)
Definition: pg_list.h:106
bool RelationInvalidatesSnapshotsOnly(Oid relid)
Definition: syscache.c:1375
BackendId backendId
Definition: lock.h:65
Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot)
Definition: predicate.c:1634
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2401
void InvalidateCatalogSnapshotConditionally(void)
Definition: snapmgr.c:531
TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER]
Definition: snapmgr.c:126
bool takenDuringRecovery
Definition: snapshot.h:93
#define NormalTransactionIdFollows(id1, id2)
Definition: transam.h:67
size_t Size
Definition: c.h:356
Snapshot GetSnapshotData(Snapshot snapshot)
Definition: procarray.c:1509
static int list_length(const List *l)
Definition: pg_list.h:89
int XactIsoLevel
Definition: xact.c:74
void UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
Definition: snapmgr.c:918
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
slock_t mutex_current
Definition: snapmgr.c:86
static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts)
Definition: snapmgr.c:1666
bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
Definition: procarray.c:1872
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:379
void SnapshotSetCommandId(CommandId curcid)
Definition: snapmgr.c:544
int FreeFile(FILE *file)
Definition: fd.c:2277
bool IsSubTransaction(void)
Definition: xact.c:4400
HTAB * HistoricSnapshotGetTupleCids(void)
Definition: snapmgr.c:2024
static char * filename
Definition: pg_dumpall.c:90
uint32 xcnt
Definition: snapshot.h:78
void * palloc(Size size)
Definition: mcxt.c:849
bool HistoricSnapshotActive(void)
Definition: snapmgr.c:2018
int errmsg(const char *fmt,...)
Definition: elog.c:797
void ResourceOwnerRememberSnapshot(ResourceOwner owner, Snapshot snapshot)
Definition: resowner.c:1154
#define IsolationIsSerializable()
Definition: xact.h:44
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
void DeleteAllExportedSnapshotFiles(void)
Definition: snapmgr.c:1616
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1995
static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition: snapmgr.c:944
int old_snapshot_threshold
Definition: snapmgr.c:75
TimestampTz head_timestamp
Definition: snapmgr.c:124
int i
TransactionId xmin
Definition: snapmgr.c:239
static Snapshot SecondarySnapshot
Definition: snapmgr.c:150
void * arg
struct OldSnapshotControlData OldSnapshotControlData
int GetMaxSnapshotSubxidCount(void)
Definition: procarray.c:1467
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
TimestampTz whenTaken
Definition: snapshot.h:111
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:170
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:112
char d_name[MAX_PATH]
Definition: dirent.h:14
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:687
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
Definition: snapmgr.c:1842
#define OLD_SNAPSHOT_TIME_MAP_ENTRIES
Definition: snapmgr.h:32
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
bool ProcArrayInstallImportedXmin(TransactionId xmin, VirtualTransactionId *sourcevxid)
Definition: procarray.c:1797
bool HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot, Buffer buffer)
Definition: tqual.c:963
Size SnapMgrShmemSize(void)
Definition: snapmgr.c:251
Definition: proc.h:94
Definition: pg_list.h:45
struct ExportedSnapshot ExportedSnapshot
long val
Definition: informix.c:689
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
int FreeDir(DIR *dir)
Definition: fd.c:2444
#define offsetof(type, field)
Definition: c.h:555
void AtEOXact_Snapshot(bool isCommit, bool resetXmin)
Definition: snapmgr.c:1059
TransactionId * subxip
Definition: snapshot.h:89
#define InvalidPid
Definition: miscadmin.h:31
#define pairingheap_is_singular(h)
Definition: pairingheap.h:99
uint32 active_count
Definition: snapshot.h:107
int32 subxcnt
Definition: snapshot.h:90
LocalTransactionId lxid
Definition: proc.h:105