PostgreSQL Source Code  git master
commit_ts.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  * PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_xact-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes. Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26 
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "access/xloginsert.h"
32 #include "access/xlogutils.h"
33 #include "catalog/pg_type.h"
34 #include "funcapi.h"
35 #include "miscadmin.h"
36 #include "pg_trace.h"
37 #include "storage/shmem.h"
38 #include "utils/builtins.h"
39 #include "utils/snapmgr.h"
40 #include "utils/timestamp.h"
41 
42 /*
43  * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
44  * everywhere else in Postgres.
45  *
46  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
47  * CommitTs page numbering also wraps around at
48  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
49  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
50  * explicit notice of that fact in this module, except when comparing segment
51  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
52  */
53 
54 /*
55  * We need 8+2 bytes per xact. Note that enlarging this struct might mean
56  * the largest possible file name is more than 5 chars long; see
57  * SlruScanDirectory.
58  */
59 typedef struct CommitTimestampEntry
60 {
64 
65 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
66  sizeof(RepOriginId))
67 
68 #define COMMIT_TS_XACTS_PER_PAGE \
69  (BLCKSZ / SizeOfCommitTimestampEntry)
70 
71 #define TransactionIdToCTsPage(xid) \
72  ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
73 #define TransactionIdToCTsEntry(xid) \
74  ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
75 
76 /*
77  * Link to shared-memory data structures for CommitTs control
78  */
80 
81 #define CommitTsCtl (&CommitTsCtlData)
82 
83 /*
84  * We keep a cache of the last value set in shared memory.
85  *
86  * This is also a good place to keep the activation status. We keep this
87  * separate from the GUC so that the standby can activate the module if the
88  * primary has it active independently of the value of the GUC.
89  *
90  * This is protected by CommitTsLock. In some places, we use commitTsActive
91  * without acquiring the lock; where this happens, a comment explains the
92  * rationale for it.
93  */
94 typedef struct CommitTimestampShared
95 {
100 
102 
103 
104 /* GUC variable */
106 
107 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
108  TransactionId *subxids, TimestampTz ts,
109  RepOriginId nodeid, int pageno);
111  RepOriginId nodeid, int slotno);
112 static void error_commit_ts_disabled(void);
113 static int ZeroCommitTsPage(int pageno, bool writeXlog);
114 static bool CommitTsPagePrecedes(int page1, int page2);
115 static void ActivateCommitTs(void);
116 static void DeactivateCommitTs(void);
117 static void WriteZeroPageXlogRec(int pageno);
118 static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
119 
120 /*
121  * TransactionTreeSetCommitTsData
122  *
123  * Record the final commit timestamp of transaction entries in the commit log
124  * for a transaction and its subtransaction tree, as efficiently as possible.
125  *
126  * xid is the top level transaction id.
127  *
128  * subxids is an array of xids of length nsubxids, representing subtransactions
129  * in the tree of xid. In various cases nsubxids may be zero.
130  * The reason why tracking just the parent xid commit timestamp is not enough
131  * is that the subtrans SLRU does not stay valid across crashes (it's not
132  * permanent) so we need to keep the information about them here. If the
133  * subtrans implementation changes in the future, we might want to revisit the
134  * decision of storing timestamp info for each subxid.
135  */
136 void
139  RepOriginId nodeid)
140 {
141  int i;
142  TransactionId headxid;
143  TransactionId newestXact;
144 
145  /*
146  * No-op if the module is not active.
147  *
148  * An unlocked read here is fine, because in a standby (the only place
149  * where the flag can change in flight) this routine is only called by the
150  * recovery process, which is also the only process which can change the
151  * flag.
152  */
154  return;
155 
156  /*
157  * Figure out the latest Xid in this batch: either the last subxid if
158  * there's any, otherwise the parent xid.
159  */
160  if (nsubxids > 0)
161  newestXact = subxids[nsubxids - 1];
162  else
163  newestXact = xid;
164 
165  /*
166  * We split the xids to set the timestamp to in groups belonging to the
167  * same SLRU page; the first element in each such set is its head. The
168  * first group has the main XID as the head; subsequent sets use the first
169  * subxid not on the previous page as head. This way, we only have to
170  * lock/modify each SLRU page once.
171  */
172  headxid = xid;
173  i = 0;
174  for (;;)
175  {
176  int pageno = TransactionIdToCTsPage(headxid);
177  int j;
178 
179  for (j = i; j < nsubxids; j++)
180  {
181  if (TransactionIdToCTsPage(subxids[j]) != pageno)
182  break;
183  }
184  /* subxids[i..j-1] are on the same page as the head; j is the first one that is not (or nsubxids) */
185 
186  SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
187  pageno);
188 
189  /* if we wrote out all subxids, we're done. */
190  if (j >= nsubxids)
191  break;
192 
193  /*
194  * Set the new head and skip over it, as well as over the subxids we
195  * just wrote.
196  */
197  headxid = subxids[j];
198  i = j + 1;
199  }
200 
201  /* update the cached value in shared memory */
202  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
206 
207  /* and move forwards our endpoint, if needed */
210  LWLockRelease(CommitTsLock);
211 }
212 
213 /*
214  * Record the commit timestamp of transaction entries in the commit log for all
215  * entries on a single page. Atomic only on this page.
216  */
217 static void
219  TransactionId *subxids, TimestampTz ts,
220  RepOriginId nodeid, int pageno)
221 {
222  int slotno;
223  int i;
224 
225  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
226 
227  slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
228 
229  TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
230  for (i = 0; i < nsubxids; i++)
231  TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
232 
233  CommitTsCtl->shared->page_dirty[slotno] = true;
234 
235  LWLockRelease(CommitTsSLRULock);
236 }
237 
238 /*
239  * Sets the commit timestamp of a single transaction.
240  *
241  * Must be called with CommitTsSLRULock held
242  */
243 static void
245  RepOriginId nodeid, int slotno)
246 {
247  int entryno = TransactionIdToCTsEntry(xid);
248  CommitTimestampEntry entry;
249 
251 
252  entry.time = ts;
253  entry.nodeid = nodeid;
254 
255  memcpy(CommitTsCtl->shared->page_buffer[slotno] +
256  SizeOfCommitTimestampEntry * entryno,
258 }
259 
260 /*
261  * Interrogate the commit timestamp of a transaction.
262  *
263  * The return value indicates whether a commit timestamp record was found for
264  * the given xid. The timestamp value is returned in *ts (which may not be
265  * null), and the origin node for the Xid is returned in *nodeid, if it's not
266  * null.
267  */
268 bool
270  RepOriginId *nodeid)
271 {
272  int pageno = TransactionIdToCTsPage(xid);
273  int entryno = TransactionIdToCTsEntry(xid);
274  int slotno;
275  CommitTimestampEntry entry;
276  TransactionId oldestCommitTsXid;
277  TransactionId newestCommitTsXid;
278 
279  if (!TransactionIdIsValid(xid))
280  ereport(ERROR,
281  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
282  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
283  else if (!TransactionIdIsNormal(xid))
284  {
285  /* frozen and bootstrap xids are always committed far in the past */
286  *ts = 0;
287  if (nodeid)
288  *nodeid = 0;
289  return false;
290  }
291 
292  LWLockAcquire(CommitTsLock, LW_SHARED);
293 
294  /* Error if module not enabled */
297 
298  /*
299  * If we're asked for the cached value, return that. Otherwise, fall
300  * through to read from SLRU.
301  */
302  if (commitTsShared->xidLastCommit == xid)
303  {
305  if (nodeid)
307 
308  LWLockRelease(CommitTsLock);
309  return *ts != 0;
310  }
311 
312  oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
313  newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
314  /* neither is invalid, or both are */
315  Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
316  LWLockRelease(CommitTsLock);
317 
318  /*
319  * Return empty if the requested value is outside our valid range.
320  */
321  if (!TransactionIdIsValid(oldestCommitTsXid) ||
322  TransactionIdPrecedes(xid, oldestCommitTsXid) ||
323  TransactionIdPrecedes(newestCommitTsXid, xid))
324  {
325  *ts = 0;
326  if (nodeid)
327  *nodeid = InvalidRepOriginId;
328  return false;
329  }
330 
331  /* lock is acquired by SimpleLruReadPage_ReadOnly */
332  slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
333  memcpy(&entry,
334  CommitTsCtl->shared->page_buffer[slotno] +
335  SizeOfCommitTimestampEntry * entryno,
337 
338  *ts = entry.time;
339  if (nodeid)
340  *nodeid = entry.nodeid;
341 
342  LWLockRelease(CommitTsSLRULock);
343  return *ts != 0;
344 }
345 
346 /*
347  * Return the Xid of the latest committed transaction. (As far as this module
348  * is concerned, anyway; it's up to the caller to ensure the value is useful
349  * for its purposes.)
350  *
351  * ts and nodeid are filled with the corresponding data; they can be passed
352  * as NULL if not wanted.
353  */
356 {
357  TransactionId xid;
358 
359  LWLockAcquire(CommitTsLock, LW_SHARED);
360 
361  /* Error if module not enabled */
364 
366  if (ts)
368  if (nodeid)
370  LWLockRelease(CommitTsLock);
371 
372  return xid;
373 }
374 
375 static void
377 {
378  ereport(ERROR,
379  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
380  errmsg("could not get commit timestamp data"),
382  errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
383  "track_commit_timestamp") :
384  errhint("Make sure the configuration parameter \"%s\" is set.",
385  "track_commit_timestamp")));
386 }
387 
388 /*
389  * SQL-callable wrapper to obtain commit time of a transaction
390  */
391 Datum
393 {
395  TimestampTz ts;
396  bool found;
397 
398  found = TransactionIdGetCommitTsData(xid, &ts, NULL);
399 
400  if (!found)
401  PG_RETURN_NULL();
402 
404 }
405 
406 
407 /*
408  * pg_last_committed_xact
409  *
410  * SQL-callable wrapper to obtain some information about the latest
411  * committed transaction: transaction ID, timestamp and replication
412  * origin.
413  */
414 Datum
416 {
417  TransactionId xid;
418  RepOriginId nodeid;
419  TimestampTz ts;
420  Datum values[3];
421  bool nulls[3];
422  TupleDesc tupdesc;
423  HeapTuple htup;
424 
425  /* fetch the latest committed xid together with its timestamp and origin */
426  xid = GetLatestCommitTsData(&ts, &nodeid);
427 
428  /*
429  * Construct a tuple descriptor for the result row. This must match this
430  * function's pg_proc entry!
431  */
432  tupdesc = CreateTemplateTupleDesc(3);
433  TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
434  XIDOID, -1, 0);
435  TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
436  TIMESTAMPTZOID, -1, 0);
437  TupleDescInitEntry(tupdesc, (AttrNumber) 3, "roident",
438  OIDOID, -1, 0);
439  tupdesc = BlessTupleDesc(tupdesc);
440 
441  if (!TransactionIdIsNormal(xid))
442  {
443  memset(nulls, true, sizeof(nulls));
444  }
445  else
446  {
447  values[0] = TransactionIdGetDatum(xid);
448  nulls[0] = false;
449 
450  values[1] = TimestampTzGetDatum(ts);
451  nulls[1] = false;
452 
453  values[2] = ObjectIdGetDatum((Oid) nodeid);
454  nulls[2] = false;
455  }
456 
457  htup = heap_form_tuple(tupdesc, values, nulls);
458 
460 }
461 
462 /*
463  * pg_xact_commit_timestamp_origin
464  *
465  * SQL-callable wrapper to obtain commit timestamp and replication origin
466  * of a given transaction.
467  */
468 Datum
470 {
472  RepOriginId nodeid;
473  TimestampTz ts;
474  Datum values[2];
475  bool nulls[2];
476  TupleDesc tupdesc;
477  HeapTuple htup;
478  bool found;
479 
480  found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
481 
482  /*
483  * Construct a tuple descriptor for the result row. This must match this
484  * function's pg_proc entry!
485  */
486  tupdesc = CreateTemplateTupleDesc(2);
487  TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp",
488  TIMESTAMPTZOID, -1, 0);
489  TupleDescInitEntry(tupdesc, (AttrNumber) 2, "roident",
490  OIDOID, -1, 0);
491  tupdesc = BlessTupleDesc(tupdesc);
492 
493  if (!found)
494  {
495  memset(nulls, true, sizeof(nulls));
496  }
497  else
498  {
499  values[0] = TimestampTzGetDatum(ts);
500  nulls[0] = false;
501 
502  values[1] = ObjectIdGetDatum((Oid) nodeid);
503  nulls[1] = false;
504  }
505 
506  htup = heap_form_tuple(tupdesc, values, nulls);
507 
509 }
510 
511 /*
512  * Number of shared CommitTS buffers.
513  *
514  * We use a very similar logic as for the number of CLOG buffers (except we
515  * scale up twice as fast with shared buffers, and the maximum is twice as
516  * high); see comments in CLOGShmemBuffers.
517  */
518 Size
520 {
521  return Min(256, Max(4, NBuffers / 256));
522 }
523 
524 /*
525  * Shared memory sizing for CommitTs
526  */
527 Size
529 {
531  sizeof(CommitTimestampShared);
532 }
533 
534 /*
535  * Initialize CommitTs at system startup (postmaster start or standalone
536  * backend)
537  */
538 void
540 {
541  bool found;
542 
543  CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
545  CommitTsSLRULock, "pg_commit_ts",
549 
550  commitTsShared = ShmemInitStruct("CommitTs shared",
551  sizeof(CommitTimestampShared),
552  &found);
553 
554  if (!IsUnderPostmaster)
555  {
556  Assert(!found);
557 
562  }
563  else
564  Assert(found);
565 }
566 
567 /*
568  * This function must be called ONCE on system install.
569  *
570  * (The CommitTs directory is assumed to have been created by initdb, and
571  * CommitTsShmemInit must have been called already.)
572  */
573 void
575 {
576  /*
577  * Nothing to do here at present, unlike most other SLRU modules; segments
578  * are created when the server is started with this module enabled. See
579  * ActivateCommitTs.
580  */
581 }
582 
583 /*
584  * Initialize (or reinitialize) a page of CommitTs to zeroes.
585  * If writeXlog is true, also emit an XLOG record saying we did this.
586  *
587  * The page is not actually written, just set up in shared memory.
588  * The slot number of the new page is returned.
589  *
590  * Control lock must be held at entry, and will be held at exit.
591  */
592 static int
593 ZeroCommitTsPage(int pageno, bool writeXlog)
594 {
595  int slotno;
596 
597  slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
598 
599  if (writeXlog)
600  WriteZeroPageXlogRec(pageno);
601 
602  return slotno;
603 }
604 
605 /*
606  * This must be called ONCE during postmaster or standalone-backend startup,
607  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
608  */
609 void
611 {
613 }
614 
615 /*
616  * This must be called ONCE during postmaster or standalone-backend startup,
617  * after recovery has finished.
618  */
619 void
621 {
622  /*
623  * If the feature is not enabled, turn it off for good. This also removes
624  * any leftover data.
625  *
626  * Conversely, we activate the module if the feature is enabled. This is
627  * necessary for primary and standby as the activation depends on the
628  * control file contents at the beginning of recovery or when a
629  * XLOG_PARAMETER_CHANGE is replayed.
630  */
633  else
635 }
636 
637 /*
638  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
639  * XLog record during recovery.
640  */
641 void
642 CommitTsParameterChange(bool newvalue, bool oldvalue)
643 {
644  /*
645  * If the commit_ts module is disabled in this server and we get word from
646  * the primary server that it is enabled there, activate it so that we can
647  * replay future WAL records involving it; also mark it as active on
648  * pg_control. If the old value was already set, we already did this, so
649  * don't do anything.
650  *
651  * If the module is disabled in the primary, disable it here too, unless
652  * the module is enabled locally.
653  *
654  * Note this only runs in the recovery process, so an unlocked read is
655  * fine.
656  */
657  if (newvalue)
658  {
661  }
662  else if (commitTsShared->commitTsActive)
664 }
665 
666 /*
667  * Activate this module whenever necessary.
668  * This must happen during postmaster or standalone-backend startup,
669  * or during WAL replay anytime the track_commit_timestamp setting is
670  * changed in the primary.
671  *
672  * The reason why this SLRU needs separate activation/deactivation functions is
673  * that it can be enabled/disabled during start and the activation/deactivation
674  * on the primary is propagated to the standby via replay. Other SLRUs don't
675  * have this property and they can be just initialized during normal startup.
676  *
677  * This is in charge of creating the currently active segment, if it's not
678  * already there. The reason for this is that the server might have been
679  * running with this module disabled for a while and thus might have skipped
680  * the normal creation point.
681  */
682 static void
684 {
685  TransactionId xid;
686  int pageno;
687 
688  /* If we've done this already, there's nothing to do */
689  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
691  {
692  LWLockRelease(CommitTsLock);
693  return;
694  }
695  LWLockRelease(CommitTsLock);
696 
698  pageno = TransactionIdToCTsPage(xid);
699 
700  /*
701  * Re-Initialize our idea of the latest page number.
702  */
703  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
704  CommitTsCtl->shared->latest_page_number = pageno;
705  LWLockRelease(CommitTsSLRULock);
706 
707  /*
708  * If CommitTs is enabled, but it wasn't in the previous server run, we
709  * need to set the oldest and newest values to the next Xid; that way, we
710  * will not try to read data that might not have been set.
711  *
712  * XXX does this have a problem if a server is started with commitTs
713  * enabled, then started with commitTs disabled, then restarted with it
714  * enabled again? It doesn't look like it does, because there should be a
715  * checkpoint that sets the value to InvalidTransactionId at end of
716  * recovery; and so any chance of injecting new transactions without
717  * CommitTs values would occur after the oldestCommitTsXid has been set to
718  * Invalid temporarily.
719  */
720  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
722  {
725  }
726  LWLockRelease(CommitTsLock);
727 
728  /* Create the current segment file, if necessary */
730  {
731  int slotno;
732 
733  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
734  slotno = ZeroCommitTsPage(pageno, false);
736  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
737  LWLockRelease(CommitTsSLRULock);
738  }
739 
740  /* Change the activation status in shared memory. */
741  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
743  LWLockRelease(CommitTsLock);
744 }
745 
746 /*
747  * Deactivate this module.
748  *
749  * This must be called when the track_commit_timestamp parameter is turned off.
750  * This happens during postmaster or standalone-backend startup, or during WAL
751  * replay.
752  *
753  * Resets CommitTs into invalid state to make sure we don't hand back
754  * possibly-invalid data; also removes segments of old data.
755  */
756 static void
758 {
759  /*
760  * Cleanup the status in the shared memory.
761  *
762  * We reset everything in the commitTsShared record to prevent user from
763  * getting confusing data about last committed transaction on the standby
764  * when the module was activated repeatedly on the primary.
765  */
766  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
767 
772 
775 
776  LWLockRelease(CommitTsLock);
777 
778  /*
779  * Remove *all* files. This is necessary so that there are no leftover
780  * files; in the case where this feature is later enabled after running
781  * with it disabled for some time there may be a gap in the file sequence.
782  * (We can probably tolerate out-of-sequence files, as they are going to
783  * be overwritten anyway when we wrap around, but it seems better to be
784  * tidy.)
785  */
786  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
788  LWLockRelease(CommitTsSLRULock);
789 }
790 
791 /*
792  * Perform a checkpoint --- either during shutdown, or on-the-fly
793  */
794 void
796 {
797  /*
798  * Write dirty CommitTs pages to disk. This may result in sync requests
799  * queued for later handling by ProcessSyncRequests(), as part of the
800  * checkpoint.
801  */
803 }
804 
805 /*
806  * Make sure that CommitTs has room for a newly-allocated XID.
807  *
808  * NB: this is called while holding XidGenLock. We want it to be very fast
809  * most of the time; even when it's not so fast, no actual I/O need happen
810  * unless we're forced to write out a dirty CommitTs or xlog page to make room
811  * in shared memory.
812  *
813  * NB: the current implementation relies on track_commit_timestamp being
814  * PGC_POSTMASTER.
815  */
816 void
818 {
819  int pageno;
820 
821  /*
822  * Nothing to do if module not enabled. Note we do an unlocked read of
823  * the flag here, which is okay because this routine is only called from
824  * GetNewTransactionId, which is never called in a standby.
825  */
826  Assert(!InRecovery);
828  return;
829 
830  /*
831  * No work except at first XID of a page. But beware: just after
832  * wraparound, the first XID of page zero is FirstNormalTransactionId.
833  */
834  if (TransactionIdToCTsEntry(newestXact) != 0 &&
836  return;
837 
838  pageno = TransactionIdToCTsPage(newestXact);
839 
840  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
841 
842  /* Zero the page and make an XLOG entry about it */
843  ZeroCommitTsPage(pageno, !InRecovery);
844 
845  LWLockRelease(CommitTsSLRULock);
846 }
847 
848 /*
849  * Remove all CommitTs segments before the one holding the passed
850  * transaction ID.
851  *
852  * Note that we don't need to flush XLOG here.
853  */
854 void
856 {
857  int cutoffPage;
858 
859  /*
860  * The cutoff point is the start of the segment containing oldestXact. We
861  * pass the *page* containing oldestXact to SimpleLruTruncate.
862  */
863  cutoffPage = TransactionIdToCTsPage(oldestXact);
864 
865  /* Check to see if there are any files that could be removed */
867  &cutoffPage))
868  return; /* nothing to remove */
869 
870  /* Write the TRUNCATE XLOG record before the segments are removed */
871  WriteTruncateXlogRec(cutoffPage, oldestXact);
872 
873  /* Now we can remove the old CommitTs segment(s) */
874  SimpleLruTruncate(CommitTsCtl, cutoffPage);
875 }
876 
877 /*
878  * Set the limit values between which commit TS can be consulted.
879  */
880 void
882 {
883  /*
884  * Be careful not to overwrite values that are either further into the
885  * "future" or signal a disabled committs.
886  */
887  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
889  {
894  }
895  else
896  {
900  }
901  LWLockRelease(CommitTsLock);
902 }
903 
904 /*
905  * Move forwards the oldest commitTS value that can be consulted
906  */
907 void
909 {
910  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
914  LWLockRelease(CommitTsLock);
915 }
916 
917 
918 /*
919  * Decide whether a commitTS page number is "older" for truncation purposes.
920  * Analogous to CLOGPagePrecedes().
921  *
922  * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
923  * introduces differences compared to CLOG and the other SLRUs having (1 <<
924  * 31) % per_page == 0. This function never tests exactly
925  * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
926  * there are two possible counts of page boundaries between oldestXact and the
927  * latest XID assigned, depending on whether oldestXact is within the first
928  * 128 entries of its page. Since this function doesn't know the location of
929  * oldestXact within page2, it returns false for one page that actually is
930  * expendable. This is a wider (yet still negligible) version of the
931  * truncation opportunity that CLOGPagePrecedes() cannot recognize.
932  *
933  * For the sake of a worked example, number entries with decimal values such
934  * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
935  * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
936  * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
937  * because entry=2.85 is the border that toggles whether entries precede the
938  * last entry of the oldestXact page. While page 2 is expendable at
939  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
940  */
941 static bool
942 CommitTsPagePrecedes(int page1, int page2)
943 {
944  TransactionId xid1;
945  TransactionId xid2;
946 
947  xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
948  xid1 += FirstNormalTransactionId + 1;
949  xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
950  xid2 += FirstNormalTransactionId + 1;
951 
952  return (TransactionIdPrecedes(xid1, xid2) &&
954 }
955 
956 
957 /*
958  * Write a ZEROPAGE xlog record
959  */
960 static void
962 {
963  XLogBeginInsert();
964  XLogRegisterData((char *) (&pageno), sizeof(int));
965  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
966 }
967 
968 /*
969  * Write a TRUNCATE xlog record
970  */
971 static void
972 WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
973 {
974  xl_commit_ts_truncate xlrec;
975 
976  xlrec.pageno = pageno;
977  xlrec.oldestXid = oldestXid;
978 
979  XLogBeginInsert();
980  XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
981  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
982 }
983 
984 /*
985  * CommitTS resource manager's routines
986  */
987 void
989 {
990  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
991 
992  /* Backup blocks are not used in commit_ts records */
993  Assert(!XLogRecHasAnyBlockRefs(record));
994 
995  if (info == COMMIT_TS_ZEROPAGE)
996  {
997  int pageno;
998  int slotno;
999 
1000  memcpy(&pageno, XLogRecGetData(record), sizeof(int));
1001 
1002  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
1003 
1004  slotno = ZeroCommitTsPage(pageno, false);
1006  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1007 
1008  LWLockRelease(CommitTsSLRULock);
1009  }
1010  else if (info == COMMIT_TS_TRUNCATE)
1011  {
1013 
1015 
1016  /*
1017  * During XLOG replay, latest_page_number isn't set up yet; insert a
1018  * suitable value to bypass the sanity test in SimpleLruTruncate.
1019  */
1020  CommitTsCtl->shared->latest_page_number = trunc->pageno;
1021 
1023  }
1024  else
1025  elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1026 }
1027 
1028 /*
1029  * Entrypoint for sync.c to sync commit_ts files.
1030  */
1031 int
1032 committssyncfiletag(const FileTag *ftag, char *path)
1033 {
1034  return SlruSyncFileTag(CommitTsCtl, ftag, path);
1035 }
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define Min(x, y)
Definition: c.h:986
#define Max(x, y)
Definition: c.h:980
unsigned char uint8
Definition: c.h:439
uint32 TransactionId
Definition: c.h:587
size_t Size
Definition: c.h:540
void StartupCommitTs(void)
Definition: commit_ts.c:610
static SlruCtlData CommitTsCtlData
Definition: commit_ts.c:79
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
Definition: commit_ts.c:469
struct CommitTimestampEntry CommitTimestampEntry
struct CommitTimestampShared CommitTimestampShared
static void WriteZeroPageXlogRec(int pageno)
Definition: commit_ts.c:961
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition: commit_ts.c:415
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:355
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition: commit_ts.c:642
Size CommitTsShmemBuffers(void)
Definition: commit_ts.c:519
#define COMMIT_TS_XACTS_PER_PAGE
Definition: commit_ts.c:68
#define TransactionIdToCTsEntry(xid)
Definition: commit_ts.c:73
static void DeactivateCommitTs(void)
Definition: commit_ts.c:757
Size CommitTsShmemSize(void)
Definition: commit_ts.c:528
bool track_commit_timestamp
Definition: commit_ts.c:105
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition: commit_ts.c:908
static CommitTimestampShared * commitTsShared
Definition: commit_ts.c:101
int committssyncfiletag(const FileTag *ftag, char *path)
Definition: commit_ts.c:1032
void CompleteCommitTsInitialization(void)
Definition: commit_ts.c:620
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int pageno)
Definition: commit_ts.c:218
static int ZeroCommitTsPage(int pageno, bool writeXlog)
Definition: commit_ts.c:593
static void ActivateCommitTs(void)
Definition: commit_ts.c:683
void TruncateCommitTs(TransactionId oldestXact)
Definition: commit_ts.c:855
void commit_ts_redo(XLogReaderState *record)
Definition: commit_ts.c:988
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:269
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition: commit_ts.c:244
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition: commit_ts.c:392
static void error_commit_ts_disabled(void)
Definition: commit_ts.c:376
#define SizeOfCommitTimestampEntry
Definition: commit_ts.c:65
void BootStrapCommitTs(void)
Definition: commit_ts.c:574
static bool CommitTsPagePrecedes(int page1, int page2)
Definition: commit_ts.c:942
void CommitTsShmemInit(void)
Definition: commit_ts.c:539
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition: commit_ts.c:881
#define CommitTsCtl
Definition: commit_ts.c:81
void ExtendCommitTs(TransactionId newestXact)
Definition: commit_ts.c:817
static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
Definition: commit_ts.c:972
#define TransactionIdToCTsPage(xid)
Definition: commit_ts.c:71
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
Definition: commit_ts.c:137
void CheckPointCommitTs(void)
Definition: commit_ts.c:795
#define COMMIT_TS_ZEROPAGE
Definition: commit_ts.h:47
#define SizeOfCommitTsTruncate
Definition: commit_ts.h:67
#define COMMIT_TS_TRUNCATE
Definition: commit_ts.h:48
int64 TimestampTz
Definition: timestamp.h:39
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:151
int errhint(const char *fmt,...)
Definition: elog.c:1151
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define PANIC
Definition: elog.h:36
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
Definition: execTuples.c:2071
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_GETARG_TRANSACTIONID(n)
Definition: fmgr.h:279
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
int NBuffers
Definition: globals.c:136
bool IsUnderPostmaster
Definition: globals.c:113
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
int j
Definition: isn.c:74
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1196
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1800
@ LWTRANCHE_COMMITTS_BUFFER
Definition: lwlock.h:170
@ LW_SHARED
Definition: lwlock.h:105
@ LW_EXCLUSIVE
Definition: lwlock.h:104
#define InvalidRepOriginId
Definition: origin.h:33
int64 timestamp
#define TransactionIdGetDatum(X)
Definition: postgres.h:565
uintptr_t Datum
Definition: postgres.h:411
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
unsigned int Oid
Definition: postgres_ext.h:31
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:614
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
Definition: slru.c:1156
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1500
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1530
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler)
Definition: slru.c:187
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1226
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1553
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
Definition: slru.c:1593
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:280
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:495
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
Definition: slru.c:626
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:395
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:156
#define SlruPagePrecedesUnitTests(ctl, per_page)
Definition: slru.h:156
TimestampTz time
Definition: commit_ts.c:61
RepOriginId nodeid
Definition: commit_ts.c:62
CommitTimestampEntry dataLastCommit
Definition: commit_ts.c:97
TransactionId xidLastCommit
Definition: commit_ts.c:96
Definition: sync.h:51
FullTransactionId nextXid
Definition: transam.h:220
TransactionId newestCommitTsXid
Definition: transam.h:233
TransactionId oldestCommitTsXid
Definition: transam.h:232
TransactionId oldestXid
Definition: commit_ts.h:64
@ SYNC_HANDLER_COMMIT_TS
Definition: sync.h:39
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static TransactionId ReadNextTransactionId(void)
Definition: transam.h:316
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define FirstNormalTransactionId
Definition: transam.h:34
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:583
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define PG_RETURN_TIMESTAMPTZ(x)
Definition: timestamp.h:40
VariableCache ShmemVariableCache
Definition: varsup.c:34
bool RecoveryInProgress(void)
Definition: xlog.c:5753
uint16 RepOriginId
Definition: xlogdefs.h:65
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:443
void XLogBeginInsert(void)
Definition: xloginsert.c:150
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:351
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:408
#define XLogRecGetData(decoder)
Definition: xlogreader.h:413
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:415
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
bool InRecovery
Definition: xlogutils.c:53