PostgreSQL Source Code  git master
commit_ts.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  * PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_xact-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes. Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26 
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "access/xlogutils.h"
32 #include "catalog/pg_type.h"
33 #include "funcapi.h"
34 #include "miscadmin.h"
35 #include "pg_trace.h"
36 #include "storage/shmem.h"
37 #include "utils/builtins.h"
38 #include "utils/snapmgr.h"
39 #include "utils/timestamp.h"
40 
41 /*
42  * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
43  * everywhere else in Postgres.
44  *
45  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
46  * CommitTs page numbering also wraps around at
47  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
48  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
49  * explicit notice of that fact in this module, except when comparing segment
50  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
51  */
52 
53 /*
54  * We need 8+2 bytes per xact. Note that enlarging this struct might mean
55  * the largest possible file name is more than 5 chars long; see
56  * SlruScanDirectory.
57  */
58 typedef struct CommitTimestampEntry
59 {
63 
64 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
65  sizeof(RepOriginId))
66 
67 #define COMMIT_TS_XACTS_PER_PAGE \
68  (BLCKSZ / SizeOfCommitTimestampEntry)
69 
70 #define TransactionIdToCTsPage(xid) \
71  ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
72 #define TransactionIdToCTsEntry(xid) \
73  ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
74 
75 /*
76  * Link to shared-memory data structures for CommitTs control
77  */
79 
80 #define CommitTsCtl (&CommitTsCtlData)
81 
82 /*
83  * We keep a cache of the last value set in shared memory.
84  *
85  * This is also a good place to keep the activation status. We keep this
86  * separate from the GUC so that the standby can activate the module if the
87  * primary has it active independently of the value of the GUC.
88  *
89  * This is protected by CommitTsLock. In some places, we use commitTsActive
90  * without acquiring the lock; where this happens, a comment explains the
91  * rationale for it.
92  */
93 typedef struct CommitTimestampShared
94 {
99 
101 
102 
103 /* GUC variable */
105 
106 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
107  TransactionId *subxids, TimestampTz ts,
108  RepOriginId nodeid, int pageno);
110  RepOriginId nodeid, int slotno);
111 static void error_commit_ts_disabled(void);
112 static int ZeroCommitTsPage(int pageno, bool writeXlog);
113 static bool CommitTsPagePrecedes(int page1, int page2);
114 static void ActivateCommitTs(void);
115 static void DeactivateCommitTs(void);
116 static void WriteZeroPageXlogRec(int pageno);
117 static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
118 
119 /*
120  * TransactionTreeSetCommitTsData
121  *
122  * Record the final commit timestamp of transaction entries in the commit log
123  * for a transaction and its subtransaction tree, as efficiently as possible.
124  *
125  * xid is the top level transaction id.
126  *
127  * subxids is an array of xids of length nsubxids, representing subtransactions
128  * in the tree of xid. In various cases nsubxids may be zero.
129  * The reason why tracking just the parent xid commit timestamp is not enough
130  * is that the subtrans SLRU does not stay valid across crashes (it's not
131  * permanent) so we need to keep the information about them here. If the
132  * subtrans implementation changes in the future, we might want to revisit the
133  * decision of storing timestamp info for each subxid.
134  */
135 void
139 {
140  int i;
141  TransactionId headxid;
142  TransactionId newestXact;
143 
144  /*
145  * No-op if the module is not active.
146  *
147  * An unlocked read here is fine, because in a standby (the only place
148  * where the flag can change in flight) this routine is only called by the
149  * recovery process, which is also the only process which can change the
150  * flag.
151  */
152  if (!commitTsShared->commitTsActive)
153  return;
154 
155  /*
156  * Figure out the latest Xid in this batch: either the last subxid if
157  * there's any, otherwise the parent xid.
158  */
159  if (nsubxids > 0)
160  newestXact = subxids[nsubxids - 1];
161  else
162  newestXact = xid;
163 
164  /*
165  * We split the xids to set the timestamp to in groups belonging to the
166  * same SLRU page; the first element in each such set is its head. The
167  * first group has the main XID as the head; subsequent sets use the first
168  * subxid not on the previous page as head. This way, we only have to
169  * lock/modify each SLRU page once.
170  */
171  for (i = 0, headxid = xid;;)
172  {
173  int pageno = TransactionIdToCTsPage(headxid);
174  int j;
175 
176  for (j = i; j < nsubxids; j++)
177  {
178  if (TransactionIdToCTsPage(subxids[j]) != pageno)
179  break;
180  }
181  /* subxids[i..j] are on the same page as the head */
182 
183  SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
184  pageno);
185 
186  /* if we wrote out all subxids, we're done. */
187  if (j + 1 >= nsubxids)
188  break;
189 
190  /*
191  * Set the new head and skip over it, as well as over the subxids we
192  * just wrote.
193  */
194  headxid = subxids[j];
195  i += j - i + 1;
196  }
197 
198  /* update the cached value in shared memory */
199  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
200  commitTsShared->xidLastCommit = xid;
201  commitTsShared->dataLastCommit.time = timestamp;
202  commitTsShared->dataLastCommit.nodeid = nodeid;
203 
204  /* and move forwards our endpoint, if needed */
207  LWLockRelease(CommitTsLock);
208 }
209 
210 /*
211  * Record the commit timestamp of transaction entries in the commit log for all
212  * entries on a single page. Atomic only on this page.
213  */
214 static void
216  TransactionId *subxids, TimestampTz ts,
217  RepOriginId nodeid, int pageno)
218 {
219  int slotno;
220  int i;
221 
222  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
223 
224  slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
225 
226  TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
227  for (i = 0; i < nsubxids; i++)
228  TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
229 
230  CommitTsCtl->shared->page_dirty[slotno] = true;
231 
232  LWLockRelease(CommitTsSLRULock);
233 }
234 
235 /*
236  * Sets the commit timestamp of a single transaction.
237  *
238  * Must be called with CommitTsSLRULock held
239  */
240 static void
242  RepOriginId nodeid, int slotno)
243 {
244  int entryno = TransactionIdToCTsEntry(xid);
245  CommitTimestampEntry entry;
246 
248 
249  entry.time = ts;
250  entry.nodeid = nodeid;
251 
252  memcpy(CommitTsCtl->shared->page_buffer[slotno] +
253  SizeOfCommitTimestampEntry * entryno,
255 }
256 
257 /*
258  * Interrogate the commit timestamp of a transaction.
259  *
260  * The return value indicates whether a commit timestamp record was found for
261  * the given xid. The timestamp value is returned in *ts (which may not be
262  * null), and the origin node for the Xid is returned in *nodeid, if it's not
263  * null.
264  */
265 bool
268 {
269  int pageno = TransactionIdToCTsPage(xid);
270  int entryno = TransactionIdToCTsEntry(xid);
271  int slotno;
272  CommitTimestampEntry entry;
273  TransactionId oldestCommitTsXid;
274  TransactionId newestCommitTsXid;
275 
276  if (!TransactionIdIsValid(xid))
277  ereport(ERROR,
278  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
279  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
280  else if (!TransactionIdIsNormal(xid))
281  {
282  /* frozen and bootstrap xids are always committed far in the past */
283  *ts = 0;
284  if (nodeid)
285  *nodeid = 0;
286  return false;
287  }
288 
289  LWLockAcquire(CommitTsLock, LW_SHARED);
290 
291  /* Error if module not enabled */
292  if (!commitTsShared->commitTsActive)
294 
295  /*
296  * If we're asked for the cached value, return that. Otherwise, fall
297  * through to read from SLRU.
298  */
299  if (commitTsShared->xidLastCommit == xid)
300  {
301  *ts = commitTsShared->dataLastCommit.time;
302  if (nodeid)
303  *nodeid = commitTsShared->dataLastCommit.nodeid;
304 
305  LWLockRelease(CommitTsLock);
306  return *ts != 0;
307  }
308 
309  oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
310  newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
311  /* neither is invalid, or both are */
312  Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
313  LWLockRelease(CommitTsLock);
314 
315  /*
316  * Return empty if the requested value is outside our valid range.
317  */
318  if (!TransactionIdIsValid(oldestCommitTsXid) ||
319  TransactionIdPrecedes(xid, oldestCommitTsXid) ||
320  TransactionIdPrecedes(newestCommitTsXid, xid))
321  {
322  *ts = 0;
323  if (nodeid)
324  *nodeid = InvalidRepOriginId;
325  return false;
326  }
327 
328  /* lock is acquired by SimpleLruReadPage_ReadOnly */
329  slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
330  memcpy(&entry,
331  CommitTsCtl->shared->page_buffer[slotno] +
332  SizeOfCommitTimestampEntry * entryno,
334 
335  *ts = entry.time;
336  if (nodeid)
337  *nodeid = entry.nodeid;
338 
339  LWLockRelease(CommitTsSLRULock);
340  return *ts != 0;
341 }
342 
343 /*
344  * Return the Xid of the latest committed transaction. (As far as this module
345  * is concerned, anyway; it's up to the caller to ensure the value is useful
346  * for its purposes.)
347  *
348  * ts and nodeid are filled with the corresponding data; they can be passed
349  * as NULL if not wanted.
350  */
353 {
354  TransactionId xid;
355 
356  LWLockAcquire(CommitTsLock, LW_SHARED);
357 
358  /* Error if module not enabled */
359  if (!commitTsShared->commitTsActive)
361 
362  xid = commitTsShared->xidLastCommit;
363  if (ts)
364  *ts = commitTsShared->dataLastCommit.time;
365  if (nodeid)
366  *nodeid = commitTsShared->dataLastCommit.nodeid;
367  LWLockRelease(CommitTsLock);
368 
369  return xid;
370 }
371 
372 static void
374 {
375  ereport(ERROR,
376  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
377  errmsg("could not get commit timestamp data"),
379  errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
380  "track_commit_timestamp") :
381  errhint("Make sure the configuration parameter \"%s\" is set.",
382  "track_commit_timestamp")));
383 }
384 
385 /*
386  * SQL-callable wrapper to obtain commit time of a transaction
387  */
388 Datum
390 {
392  TimestampTz ts;
393  bool found;
394 
395  found = TransactionIdGetCommitTsData(xid, &ts, NULL);
396 
397  if (!found)
398  PG_RETURN_NULL();
399 
401 }
402 
403 
404 /*
405  * pg_last_committed_xact
406  *
407  * SQL-callable wrapper to obtain some information about the latest
408  * committed transaction: transaction ID, timestamp and replication
409  * origin.
410  */
411 Datum
413 {
414  TransactionId xid;
416  TimestampTz ts;
417  Datum values[3];
418  bool nulls[3];
419  TupleDesc tupdesc;
420  HeapTuple htup;
421 
422  /* and construct a tuple with our data */
423  xid = GetLatestCommitTsData(&ts, &nodeid);
424 
425  /*
426  * Construct a tuple descriptor for the result row. This must match this
427  * function's pg_proc entry!
428  */
429  tupdesc = CreateTemplateTupleDesc(3);
430  TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
431  XIDOID, -1, 0);
432  TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
433  TIMESTAMPTZOID, -1, 0);
434  TupleDescInitEntry(tupdesc, (AttrNumber) 3, "roident",
435  OIDOID, -1, 0);
436  tupdesc = BlessTupleDesc(tupdesc);
437 
438  if (!TransactionIdIsNormal(xid))
439  {
440  memset(nulls, true, sizeof(nulls));
441  }
442  else
443  {
444  values[0] = TransactionIdGetDatum(xid);
445  nulls[0] = false;
446 
447  values[1] = TimestampTzGetDatum(ts);
448  nulls[1] = false;
449 
450  values[2] = ObjectIdGetDatum((Oid) nodeid);
451  nulls[2] = false;
452  }
453 
454  htup = heap_form_tuple(tupdesc, values, nulls);
455 
457 }
458 
459 /*
460  * pg_xact_commit_timestamp_origin
461  *
462  * SQL-callable wrapper to obtain commit timestamp and replication origin
463  * of a given transaction.
464  */
465 Datum
467 {
470  TimestampTz ts;
471  Datum values[2];
472  bool nulls[2];
473  TupleDesc tupdesc;
474  HeapTuple htup;
475  bool found;
476 
477  found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
478 
479  /*
480  * Construct a tuple descriptor for the result row. This must match this
481  * function's pg_proc entry!
482  */
483  tupdesc = CreateTemplateTupleDesc(2);
484  TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp",
485  TIMESTAMPTZOID, -1, 0);
486  TupleDescInitEntry(tupdesc, (AttrNumber) 2, "roident",
487  OIDOID, -1, 0);
488  tupdesc = BlessTupleDesc(tupdesc);
489 
490  if (!found)
491  {
492  memset(nulls, true, sizeof(nulls));
493  }
494  else
495  {
496  values[0] = TimestampTzGetDatum(ts);
497  nulls[0] = false;
498 
499  values[1] = ObjectIdGetDatum((Oid) nodeid);
500  nulls[1] = false;
501  }
502 
503  htup = heap_form_tuple(tupdesc, values, nulls);
504 
506 }
507 
508 /*
509  * Number of shared CommitTS buffers.
510  *
511  * We use a very similar logic as for the number of CLOG buffers; see comments
512  * in CLOGShmemBuffers.
513  */
514 Size
516 {
517  return Min(16, Max(4, NBuffers / 1024));
518 }
519 
520 /*
521  * Shared memory sizing for CommitTs
522  */
523 Size
525 {
527  sizeof(CommitTimestampShared);
528 }
529 
530 /*
531  * Initialize CommitTs at system startup (postmaster start or standalone
532  * backend)
533  */
534 void
536 {
537  bool found;
538 
539  CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
541  CommitTsSLRULock, "pg_commit_ts",
545 
546  commitTsShared = ShmemInitStruct("CommitTs shared",
547  sizeof(CommitTimestampShared),
548  &found);
549 
550  if (!IsUnderPostmaster)
551  {
552  Assert(!found);
553 
554  commitTsShared->xidLastCommit = InvalidTransactionId;
555  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
556  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
557  commitTsShared->commitTsActive = false;
558  }
559  else
560  Assert(found);
561 }
562 
563 /*
564  * This function must be called ONCE on system install.
565  *
566  * (The CommitTs directory is assumed to have been created by initdb, and
567  * CommitTsShmemInit must have been called already.)
568  */
569 void
571 {
572  /*
573  * Nothing to do here at present, unlike most other SLRU modules; segments
574  * are created when the server is started with this module enabled. See
575  * ActivateCommitTs.
576  */
577 }
578 
579 /*
580  * Initialize (or reinitialize) a page of CommitTs to zeroes.
581  * If writeXlog is true, also emit an XLOG record saying we did this.
582  *
583  * The page is not actually written, just set up in shared memory.
584  * The slot number of the new page is returned.
585  *
586  * Control lock must be held at entry, and will be held at exit.
587  */
588 static int
589 ZeroCommitTsPage(int pageno, bool writeXlog)
590 {
591  int slotno;
592 
593  slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
594 
595  if (writeXlog)
596  WriteZeroPageXlogRec(pageno);
597 
598  return slotno;
599 }
600 
601 /*
602  * This must be called ONCE during postmaster or standalone-backend startup,
603  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
604  */
605 void
607 {
609 }
610 
611 /*
612  * This must be called ONCE during postmaster or standalone-backend startup,
613  * after recovery has finished.
614  */
615 void
617 {
618  /*
619  * If the feature is not enabled, turn it off for good. This also removes
620  * any leftover data.
621  *
622  * Conversely, we activate the module if the feature is enabled. This is
623  * necessary for primary and standby as the activation depends on the
624  * control file contents at the beginning of recovery or when a
625  * XLOG_PARAMETER_CHANGE is replayed.
626  */
629  else
631 }
632 
633 /*
634  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
635  * XLog record during recovery.
636  */
637 void
638 CommitTsParameterChange(bool newvalue, bool oldvalue)
639 {
640  /*
641  * If the commit_ts module is disabled in this server and we get word from
642  * the primary server that it is enabled there, activate it so that we can
643  * replay future WAL records involving it; also mark it as active on
644  * pg_control. If the old value was already set, we already did this, so
645  * don't do anything.
646  *
647  * If the module is disabled in the primary, disable it here too, unless
648  * the module is enabled locally.
649  *
650  * Note this only runs in the recovery process, so an unlocked read is
651  * fine.
652  */
653  if (newvalue)
654  {
655  if (!commitTsShared->commitTsActive)
657  }
658  else if (commitTsShared->commitTsActive)
660 }
661 
662 /*
663  * Activate this module whenever necessary.
664  * This must happen during postmaster or standalone-backend startup,
665  * or during WAL replay anytime the track_commit_timestamp setting is
666  * changed in the primary.
667  *
668  * The reason why this SLRU needs separate activation/deactivation functions is
669  * that it can be enabled/disabled during start and the activation/deactivation
670  * on the primary is propagated to the standby via replay. Other SLRUs don't
671  * have this property and they can be just initialized during normal startup.
672  *
673  * This is in charge of creating the currently active segment, if it's not
674  * already there. The reason for this is that the server might have been
675  * running with this module disabled for a while and thus might have skipped
676  * the normal creation point.
677  */
678 static void
680 {
681  TransactionId xid;
682  int pageno;
683 
684  /* If we've done this already, there's nothing to do */
685  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
686  if (commitTsShared->commitTsActive)
687  {
688  LWLockRelease(CommitTsLock);
689  return;
690  }
691  LWLockRelease(CommitTsLock);
692 
694  pageno = TransactionIdToCTsPage(xid);
695 
696  /*
697  * Re-Initialize our idea of the latest page number.
698  */
699  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
700  CommitTsCtl->shared->latest_page_number = pageno;
701  LWLockRelease(CommitTsSLRULock);
702 
703  /*
704  * If CommitTs is enabled, but it wasn't in the previous server run, we
705  * need to set the oldest and newest values to the next Xid; that way, we
706  * will not try to read data that might not have been set.
707  *
708  * XXX does this have a problem if a server is started with commitTs
709  * enabled, then started with commitTs disabled, then restarted with it
710  * enabled again? It doesn't look like it does, because there should be a
711  * checkpoint that sets the value to InvalidTransactionId at end of
712  * recovery; and so any chance of injecting new transactions without
713  * CommitTs values would occur after the oldestCommitTsXid has been set to
714  * Invalid temporarily.
715  */
716  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
718  {
721  }
722  LWLockRelease(CommitTsLock);
723 
724  /* Create the current segment file, if necessary */
726  {
727  int slotno;
728 
729  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
730  slotno = ZeroCommitTsPage(pageno, false);
732  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
733  LWLockRelease(CommitTsSLRULock);
734  }
735 
736  /* Change the activation status in shared memory. */
737  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
738  commitTsShared->commitTsActive = true;
739  LWLockRelease(CommitTsLock);
740 }
741 
742 /*
743  * Deactivate this module.
744  *
745  * This must be called when the track_commit_timestamp parameter is turned off.
746  * This happens during postmaster or standalone-backend startup, or during WAL
747  * replay.
748  *
749  * Resets CommitTs into invalid state to make sure we don't hand back
750  * possibly-invalid data; also removes segments of old data.
751  */
752 static void
754 {
755  /*
756  * Cleanup the status in the shared memory.
757  *
758  * We reset everything in the commitTsShared record to prevent user from
759  * getting confusing data about last committed transaction on the standby
760  * when the module was activated repeatedly on the primary.
761  */
762  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
763 
764  commitTsShared->commitTsActive = false;
765  commitTsShared->xidLastCommit = InvalidTransactionId;
766  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
767  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
768 
771 
772  LWLockRelease(CommitTsLock);
773 
774  /*
775  * Remove *all* files. This is necessary so that there are no leftover
776  * files; in the case where this feature is later enabled after running
777  * with it disabled for some time there may be a gap in the file sequence.
778  * (We can probably tolerate out-of-sequence files, as they are going to
779  * be overwritten anyway when we wrap around, but it seems better to be
780  * tidy.)
781  */
782  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
784  LWLockRelease(CommitTsSLRULock);
785 }
786 
787 /*
788  * Perform a checkpoint --- either during shutdown, or on-the-fly
789  */
790 void
792 {
793  /*
794  * Write dirty CommitTs pages to disk. This may result in sync requests
795  * queued for later handling by ProcessSyncRequests(), as part of the
796  * checkpoint.
797  */
799 }
800 
801 /*
802  * Make sure that CommitTs has room for a newly-allocated XID.
803  *
804  * NB: this is called while holding XidGenLock. We want it to be very fast
805  * most of the time; even when it's not so fast, no actual I/O need happen
806  * unless we're forced to write out a dirty CommitTs or xlog page to make room
807  * in shared memory.
808  *
809  * NB: the current implementation relies on track_commit_timestamp being
810  * PGC_POSTMASTER.
811  */
812 void
814 {
815  int pageno;
816 
817  /*
818  * Nothing to do if module not enabled. Note we do an unlocked read of
819  * the flag here, which is okay because this routine is only called from
820  * GetNewTransactionId, which is never called in a standby.
821  */
822  Assert(!InRecovery);
823  if (!commitTsShared->commitTsActive)
824  return;
825 
826  /*
827  * No work except at first XID of a page. But beware: just after
828  * wraparound, the first XID of page zero is FirstNormalTransactionId.
829  */
830  if (TransactionIdToCTsEntry(newestXact) != 0 &&
832  return;
833 
834  pageno = TransactionIdToCTsPage(newestXact);
835 
836  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
837 
838  /* Zero the page and make an XLOG entry about it */
839  ZeroCommitTsPage(pageno, !InRecovery);
840 
841  LWLockRelease(CommitTsSLRULock);
842 }
843 
844 /*
845  * Remove all CommitTs segments before the one holding the passed
846  * transaction ID.
847  *
848  * Note that we don't need to flush XLOG here.
849  */
850 void
852 {
853  int cutoffPage;
854 
855  /*
856  * The cutoff point is the start of the segment containing oldestXact. We
857  * pass the *page* containing oldestXact to SimpleLruTruncate.
858  */
859  cutoffPage = TransactionIdToCTsPage(oldestXact);
860 
861  /* Check to see if there's any files that could be removed */
863  &cutoffPage))
864  return; /* nothing to remove */
865 
866  /* Write XLOG record */
867  WriteTruncateXlogRec(cutoffPage, oldestXact);
868 
869  /* Now we can remove the old CommitTs segment(s) */
870  SimpleLruTruncate(CommitTsCtl, cutoffPage);
871 }
872 
873 /*
874  * Set the limit values between which commit TS can be consulted.
875  */
876 void
878 {
879  /*
880  * Be careful not to overwrite values that are either further into the
881  * "future" or signal a disabled committs.
882  */
883  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
885  {
890  }
891  else
892  {
896  }
897  LWLockRelease(CommitTsLock);
898 }
899 
900 /*
901  * Move forwards the oldest commitTS value that can be consulted
902  */
903 void
905 {
906  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
910  LWLockRelease(CommitTsLock);
911 }
912 
913 
914 /*
915  * Decide whether a commitTS page number is "older" for truncation purposes.
916  * Analogous to CLOGPagePrecedes().
917  *
918  * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
919  * introduces differences compared to CLOG and the other SLRUs having (1 <<
920  * 31) % per_page == 0. This function never tests exactly
921  * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
922  * there are two possible counts of page boundaries between oldestXact and the
923  * latest XID assigned, depending on whether oldestXact is within the first
924  * 128 entries of its page. Since this function doesn't know the location of
925  * oldestXact within page2, it returns false for one page that actually is
926  * expendable. This is a wider (yet still negligible) version of the
927  * truncation opportunity that CLOGPagePrecedes() cannot recognize.
928  *
929  * For the sake of a worked example, number entries with decimal values such
930  * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
931  * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
932  * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
933  * because entry=2.85 is the border that toggles whether entries precede the
934  * last entry of the oldestXact page. While page 2 is expendable at
935  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
936  */
937 static bool
938 CommitTsPagePrecedes(int page1, int page2)
939 {
940  TransactionId xid1;
941  TransactionId xid2;
942 
943  xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
944  xid1 += FirstNormalTransactionId + 1;
945  xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
946  xid2 += FirstNormalTransactionId + 1;
947 
948  return (TransactionIdPrecedes(xid1, xid2) &&
950 }
951 
952 
953 /*
954  * Write a ZEROPAGE xlog record
955  */
956 static void
958 {
959  XLogBeginInsert();
960  XLogRegisterData((char *) (&pageno), sizeof(int));
961  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
962 }
963 
964 /*
965  * Write a TRUNCATE xlog record
966  */
967 static void
968 WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
969 {
970  xl_commit_ts_truncate xlrec;
971 
972  xlrec.pageno = pageno;
973  xlrec.oldestXid = oldestXid;
974 
975  XLogBeginInsert();
976  XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
977  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
978 }
979 
980 /*
981  * CommitTS resource manager's routines
982  */
983 void
985 {
986  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
987 
988  /* Backup blocks are not used in commit_ts records */
989  Assert(!XLogRecHasAnyBlockRefs(record));
990 
991  if (info == COMMIT_TS_ZEROPAGE)
992  {
993  int pageno;
994  int slotno;
995 
996  memcpy(&pageno, XLogRecGetData(record), sizeof(int));
997 
998  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
999 
1000  slotno = ZeroCommitTsPage(pageno, false);
1002  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1003 
1004  LWLockRelease(CommitTsSLRULock);
1005  }
1006  else if (info == COMMIT_TS_TRUNCATE)
1007  {
1009 
1011 
1012  /*
1013  * During XLOG replay, latest_page_number isn't set up yet; insert a
1014  * suitable value to bypass the sanity test in SimpleLruTruncate.
1015  */
1016  CommitTsCtl->shared->latest_page_number = trunc->pageno;
1017 
1019  }
1020  else
1021  elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1022 }
1023 
1024 /*
1025  * Entrypoint for sync.c to sync commit_ts files.
1026  */
1027 int
1028 committssyncfiletag(const FileTag *ftag, char *path)
1029 {
1030  return SlruSyncFileTag(CommitTsCtl, ftag, path);
1031 }
#define COMMIT_TS_ZEROPAGE
Definition: commit_ts.h:51
CommitTimestampEntry dataLastCommit
Definition: commit_ts.c:96
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition: commit_ts.c:412
int errhint(const char *fmt,...)
Definition: elog.c:1156
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
Definition: slru.c:1593
uint32 TransactionId
Definition: c.h:587
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition: commit_ts.c:877
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1530
#define SizeOfCommitTimestampEntry
Definition: commit_ts.c:64
int64 timestamp
int64 TimestampTz
Definition: timestamp.h:39
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int pageno)
Definition: commit_ts.c:215
#define CommitTsCtl
Definition: commit_ts.c:80
static int ZeroCommitTsPage(int pageno, bool writeXlog)
Definition: commit_ts.c:589
int committssyncfiletag(const FileTag *ftag, char *path)
Definition: commit_ts.c:1028
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1226
#define Min(x, y)
Definition: c.h:986
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
Definition: commit_ts.c:136
unsigned char uint8
Definition: c.h:439
uint16 RepOriginId
Definition: xlogdefs.h:65
void StartupCommitTs(void)
Definition: commit_ts.c:606
int errcode(int sqlerrcode)
Definition: elog.c:698
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
static TransactionId ReadNextTransactionId(void)
Definition: transam.h:316
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:8226
static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
Definition: commit_ts.c:968
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition: commit_ts.c:389
FullTransactionId nextXid
Definition: transam.h:220
#define PANIC
Definition: elog.h:50
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:156
#define PG_RETURN_TIMESTAMPTZ(x)
Definition: timestamp.h:40
Size CommitTsShmemBuffers(void)
Definition: commit_ts.c:515
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler)
Definition: slru.c:187
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition: commit_ts.c:904
#define XidFromFullTransactionId(x)
Definition: transam.h:48
void CompleteCommitTsInitialization(void)
Definition: commit_ts.c:616
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define XLogRecGetData(decoder)
Definition: xlogreader.h:310
#define FirstNormalTransactionId
Definition: transam.h:34
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition: commit_ts.c:241
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
#define COMMIT_TS_XACTS_PER_PAGE
Definition: commit_ts.c:67
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition: commit_ts.c:638
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:395
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
Definition: execTuples.c:2082
bool track_commit_timestamp
Definition: commit_ts.c:104
#define PG_GETARG_TRANSACTIONID(n)
Definition: fmgr.h:279
bool IsUnderPostmaster
Definition: globals.c:112
void commit_ts_redo(XLogReaderState *record)
Definition: commit_ts.c:984
VariableCache ShmemVariableCache
Definition: varsup.c:34
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
Definition: slru.c:1156
#define InvalidTransactionId
Definition: transam.h:31
TransactionId oldestXid
Definition: commit_ts.h:68
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:112
static void ActivateCommitTs(void)
Definition: commit_ts.c:679
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
Definition: slru.c:626
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:614
static void WriteZeroPageXlogRec(int pageno)
Definition: commit_ts.c:957
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:583
struct CommitTimestampEntry CommitTimestampEntry
TransactionId oldestCommitTsXid
Definition: transam.h:232
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:305
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
void TruncateCommitTs(TransactionId oldestXact)
Definition: commit_ts.c:851
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:340
TimestampTz time
Definition: commit_ts.c:60
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:432
#define TransactionIdGetDatum(X)
Definition: postgres.h:565
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1500
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:266
#define TransactionIdToCTsPage(xid)
Definition: commit_ts.c:70
uintptr_t Datum
Definition: postgres.h:411
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
#define COMMIT_TS_TRUNCATE
Definition: commit_ts.h:52
static bool CommitTsPagePrecedes(int page1, int page2)
Definition: commit_ts.c:938
TransactionId xidLastCommit
Definition: commit_ts.c:95
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:495
#define ereport(elevel,...)
Definition: elog.h:157
bool InRecovery
Definition: xlogutils.c:52
void CheckPointCommitTs(void)
Definition: commit_ts.c:791
#define Max(x, y)
Definition: c.h:980
#define Assert(condition)
Definition: c.h:804
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
void ExtendCommitTs(TransactionId newestXact)
Definition: commit_ts.c:813
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1553
TransactionId newestCommitTsXid
Definition: transam.h:233
size_t Size
Definition: c.h:540
Size CommitTsShmemSize(void)
Definition: commit_ts.c:524
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
#define TransactionIdToCTsEntry(xid)
Definition: commit_ts.c:72
static SlruCtlData CommitTsCtlData
Definition: commit_ts.c:78
#define InvalidRepOriginId
Definition: origin.h:33
static void DeactivateCommitTs(void)
Definition: commit_ts.c:753
static Datum values[MAXATTR]
Definition: bootstrap.c:166
int errmsg(const char *fmt,...)
Definition: elog.c:909
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
Definition: commit_ts.c:466
#define elog(elevel,...)
Definition: elog.h:232
int i
int NBuffers
Definition: globals.c:135
#define SlruPagePrecedesUnitTests(ctl, per_page)
Definition: slru.h:156
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:312
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void BootStrapCommitTs(void)
Definition: commit_ts.c:570
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
RepOriginId nodeid
Definition: commit_ts.c:61
void XLogBeginInsert(void)
Definition: xloginsert.c:135
CommitTimestampShared * commitTsShared
Definition: commit_ts.c:100
int16 AttrNumber
Definition: attnum.h:21
static void error_commit_ts_disabled(void)
Definition: commit_ts.c:373
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:280
#define SizeOfCommitTsTruncate
Definition: commit_ts.h:71
#define PG_RETURN_NULL()
Definition: fmgr.h:345
Definition: sync.h:50
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:352
void CommitTsShmemInit(void)
Definition: commit_ts.c:535
struct CommitTimestampShared CommitTimestampShared