PostgreSQL Source Code  git master
commit_ts.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  * PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_xact-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes. Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26 
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "storage/shmem.h"
36 #include "utils/builtins.h"
37 #include "utils/snapmgr.h"
38 #include "utils/timestamp.h"
39 
40 /*
41  * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
42  * everywhere else in Postgres.
43  *
44  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
45  * CommitTs page numbering also wraps around at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
47  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
48  * explicit notice of that fact in this module, except when comparing segment
49  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
50  */
51 
52 /*
53  * We need 8+2 bytes per xact. Note that enlarging this struct might mean
54  * the largest possible file name is more than 5 chars long; see
55  * SlruScanDirectory.
56  */
57 typedef struct CommitTimestampEntry
58 {
62 
63 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
64  sizeof(RepOriginId))
65 
66 #define COMMIT_TS_XACTS_PER_PAGE \
67  (BLCKSZ / SizeOfCommitTimestampEntry)
68 
69 #define TransactionIdToCTsPage(xid) \
70  ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
71 #define TransactionIdToCTsEntry(xid) \
72  ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
73 
74 /*
75  * Link to shared-memory data structures for CommitTs control
76  */
78 
79 #define CommitTsCtl (&CommitTsCtlData)
80 
81 /*
82  * We keep a cache of the last value set in shared memory.
83  *
84  * This is also a good place to keep the activation status. We keep this
85  * separate from the GUC so that the standby can activate the module if the
86  * primary has it active independently of the value of the GUC.
87  *
88  * This is protected by CommitTsLock. In some places, we use commitTsActive
89  * without acquiring the lock; where this happens, a comment explains the
90  * rationale for it.
91  */
92 typedef struct CommitTimestampShared
93 {
98 
100 
101 
102 /* GUC variable */
104 
105 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
106  TransactionId *subxids, TimestampTz ts,
107  RepOriginId nodeid, int pageno);
109  RepOriginId nodeid, int slotno);
110 static void error_commit_ts_disabled(void);
111 static int ZeroCommitTsPage(int pageno, bool writeXlog);
112 static bool CommitTsPagePrecedes(int page1, int page2);
113 static void ActivateCommitTs(void);
114 static void DeactivateCommitTs(void);
115 static void WriteZeroPageXlogRec(int pageno);
116 static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
117 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
120 
121 /*
122  * TransactionTreeSetCommitTsData
123  *
124  * Record the final commit timestamp of transaction entries in the commit log
125  * for a transaction and its subtransaction tree, as efficiently as possible.
126  *
127  * xid is the top level transaction id.
128  *
129  * subxids is an array of xids of length nsubxids, representing subtransactions
130  * in the tree of xid. In various cases nsubxids may be zero.
131  * The reason why tracking just the parent xid commit timestamp is not enough
132  * is that the subtrans SLRU does not stay valid across crashes (it's not
133  * permanent) so we need to keep the information about them here. If the
134  * subtrans implementation changes in the future, we might want to revisit the
135  * decision of storing timestamp info for each subxid.
136  *
137  * The write_xlog parameter tells us whether to include an XLog record of this
138  * or not. Normally, this is called from transaction commit routines (both
139  * normal and prepared) and the information will be stored in the transaction
140  * commit XLog record, and so they should pass "false" for this. The XLog redo
141  * code should use "false" here as well. Other callers probably want to pass
142  * true, so that the given values persist in case of crashes.
143  */
144 void
147  RepOriginId nodeid, bool write_xlog)
148 {
149  int i;
150  TransactionId headxid;
151  TransactionId newestXact;
152 
153  /*
154  * No-op if the module is not active.
155  *
156  * An unlocked read here is fine, because in a standby (the only place
157  * where the flag can change in flight) this routine is only called by the
158  * recovery process, which is also the only process which can change the
159  * flag.
160  */
161  if (!commitTsShared->commitTsActive)
162  return;
163 
164  /*
165  * Comply with the WAL-before-data rule: if caller specified it wants this
166  * value to be recorded in WAL, do so before touching the data.
167  */
168  if (write_xlog)
169  WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
170 
171  /*
172  * Figure out the latest Xid in this batch: either the last subxid if
173  * there's any, otherwise the parent xid.
174  */
175  if (nsubxids > 0)
176  newestXact = subxids[nsubxids - 1];
177  else
178  newestXact = xid;
179 
180  /*
181  * We split the xids to set the timestamp to in groups belonging to the
182  * same SLRU page; the first element in each such set is its head. The
183  * first group has the main XID as the head; subsequent sets use the first
184  * subxid not on the previous page as head. This way, we only have to
185  * lock/modify each SLRU page once.
186  */
187  for (i = 0, headxid = xid;;)
188  {
189  int pageno = TransactionIdToCTsPage(headxid);
190  int j;
191 
192  for (j = i; j < nsubxids; j++)
193  {
194  if (TransactionIdToCTsPage(subxids[j]) != pageno)
195  break;
196  }
197  /* subxids[i..j] are on the same page as the head */
198 
199  SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
200  pageno);
201 
202  /* if we wrote out all subxids, we're done. */
203  if (j + 1 >= nsubxids)
204  break;
205 
206  /*
207  * Set the new head and skip over it, as well as over the subxids we
208  * just wrote.
209  */
210  headxid = subxids[j];
211  i += j - i + 1;
212  }
213 
214  /* update the cached value in shared memory */
215  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
216  commitTsShared->xidLastCommit = xid;
217  commitTsShared->dataLastCommit.time = timestamp;
218  commitTsShared->dataLastCommit.nodeid = nodeid;
219 
220  /* and move forwards our endpoint, if needed */
223  LWLockRelease(CommitTsLock);
224 }
225 
226 /*
227  * Record the commit timestamp of transaction entries in the commit log for all
228  * entries on a single page. Atomic only on this page.
229  */
230 static void
232  TransactionId *subxids, TimestampTz ts,
233  RepOriginId nodeid, int pageno)
234 {
235  int slotno;
236  int i;
237 
238  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
239 
240  slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
241 
242  TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
243  for (i = 0; i < nsubxids; i++)
244  TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
245 
246  CommitTsCtl->shared->page_dirty[slotno] = true;
247 
248  LWLockRelease(CommitTsSLRULock);
249 }
250 
251 /*
252  * Sets the commit timestamp of a single transaction.
253  *
254  * Must be called with CommitTsSLRULock held
255  */
256 static void
258  RepOriginId nodeid, int slotno)
259 {
260  int entryno = TransactionIdToCTsEntry(xid);
261  CommitTimestampEntry entry;
262 
264 
265  entry.time = ts;
266  entry.nodeid = nodeid;
267 
268  memcpy(CommitTsCtl->shared->page_buffer[slotno] +
269  SizeOfCommitTimestampEntry * entryno,
271 }
272 
273 /*
274  * Interrogate the commit timestamp of a transaction.
275  *
276  * The return value indicates whether a commit timestamp record was found for
277  * the given xid. The timestamp value is returned in *ts (which may not be
278  * null), and the origin node for the Xid is returned in *nodeid, if it's not
279  * null.
280  */
281 bool
284 {
285  int pageno = TransactionIdToCTsPage(xid);
286  int entryno = TransactionIdToCTsEntry(xid);
287  int slotno;
288  CommitTimestampEntry entry;
289  TransactionId oldestCommitTsXid;
290  TransactionId newestCommitTsXid;
291 
292  if (!TransactionIdIsValid(xid))
293  ereport(ERROR,
294  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296  else if (!TransactionIdIsNormal(xid))
297  {
298  /* frozen and bootstrap xids are always committed far in the past */
299  *ts = 0;
300  if (nodeid)
301  *nodeid = 0;
302  return false;
303  }
304 
305  LWLockAcquire(CommitTsLock, LW_SHARED);
306 
307  /* Error if module not enabled */
308  if (!commitTsShared->commitTsActive)
310 
311  /*
312  * If we're asked for the cached value, return that. Otherwise, fall
313  * through to read from SLRU.
314  */
315  if (commitTsShared->xidLastCommit == xid)
316  {
317  *ts = commitTsShared->dataLastCommit.time;
318  if (nodeid)
319  *nodeid = commitTsShared->dataLastCommit.nodeid;
320 
321  LWLockRelease(CommitTsLock);
322  return *ts != 0;
323  }
324 
325  oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
326  newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
327  /* neither is invalid, or both are */
328  Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
329  LWLockRelease(CommitTsLock);
330 
331  /*
332  * Return empty if the requested value is outside our valid range.
333  */
334  if (!TransactionIdIsValid(oldestCommitTsXid) ||
335  TransactionIdPrecedes(xid, oldestCommitTsXid) ||
336  TransactionIdPrecedes(newestCommitTsXid, xid))
337  {
338  *ts = 0;
339  if (nodeid)
340  *nodeid = InvalidRepOriginId;
341  return false;
342  }
343 
344  /* lock is acquired by SimpleLruReadPage_ReadOnly */
345  slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
346  memcpy(&entry,
347  CommitTsCtl->shared->page_buffer[slotno] +
348  SizeOfCommitTimestampEntry * entryno,
350 
351  *ts = entry.time;
352  if (nodeid)
353  *nodeid = entry.nodeid;
354 
355  LWLockRelease(CommitTsSLRULock);
356  return *ts != 0;
357 }
358 
359 /*
360  * Return the Xid of the latest committed transaction. (As far as this module
361  * is concerned, anyway; it's up to the caller to ensure the value is useful
362  * for its purposes.)
363  *
364  * ts and nodeid are filled with the corresponding data; they can be passed
365  * as NULL if not wanted.
366  */
369 {
370  TransactionId xid;
371 
372  LWLockAcquire(CommitTsLock, LW_SHARED);
373 
374  /* Error if module not enabled */
375  if (!commitTsShared->commitTsActive)
377 
378  xid = commitTsShared->xidLastCommit;
379  if (ts)
380  *ts = commitTsShared->dataLastCommit.time;
381  if (nodeid)
382  *nodeid = commitTsShared->dataLastCommit.nodeid;
383  LWLockRelease(CommitTsLock);
384 
385  return xid;
386 }
387 
388 static void
390 {
391  ereport(ERROR,
392  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393  errmsg("could not get commit timestamp data"),
395  errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
396  "track_commit_timestamp") :
397  errhint("Make sure the configuration parameter \"%s\" is set.",
398  "track_commit_timestamp")));
399 }
400 
401 /*
402  * SQL-callable wrapper to obtain commit time of a transaction
403  */
404 Datum
406 {
408  TimestampTz ts;
409  bool found;
410 
411  found = TransactionIdGetCommitTsData(xid, &ts, NULL);
412 
413  if (!found)
414  PG_RETURN_NULL();
415 
417 }
418 
419 
420 /*
421  * pg_last_committed_xact
422  *
423  * SQL-callable wrapper to obtain some information about the latest
424  * committed transaction: transaction ID, timestamp and replication
425  * origin.
426  */
427 Datum
429 {
430  TransactionId xid;
432  TimestampTz ts;
433  Datum values[3];
434  bool nulls[3];
435  TupleDesc tupdesc;
436  HeapTuple htup;
437 
438  /* and construct a tuple with our data */
439  xid = GetLatestCommitTsData(&ts, &nodeid);
440 
441  /*
442  * Construct a tuple descriptor for the result row. This must match this
443  * function's pg_proc entry!
444  */
445  tupdesc = CreateTemplateTupleDesc(3);
446  TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
447  XIDOID, -1, 0);
448  TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
449  TIMESTAMPTZOID, -1, 0);
450  TupleDescInitEntry(tupdesc, (AttrNumber) 3, "roident",
451  OIDOID, -1, 0);
452  tupdesc = BlessTupleDesc(tupdesc);
453 
454  if (!TransactionIdIsNormal(xid))
455  {
456  memset(nulls, true, sizeof(nulls));
457  }
458  else
459  {
460  values[0] = TransactionIdGetDatum(xid);
461  nulls[0] = false;
462 
463  values[1] = TimestampTzGetDatum(ts);
464  nulls[1] = false;
465 
466  values[2] = ObjectIdGetDatum((Oid) nodeid);
467  nulls[2] = false;
468  }
469 
470  htup = heap_form_tuple(tupdesc, values, nulls);
471 
473 }
474 
475 /*
476  * pg_xact_commit_timestamp_origin
477  *
478  * SQL-callable wrapper to obtain commit timestamp and replication origin
479  * of a given transaction.
480  */
481 Datum
483 {
486  TimestampTz ts;
487  Datum values[2];
488  bool nulls[2];
489  TupleDesc tupdesc;
490  HeapTuple htup;
491  bool found;
492 
493  found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
494 
495  /*
496  * Construct a tuple descriptor for the result row. This must match this
497  * function's pg_proc entry!
498  */
499  tupdesc = CreateTemplateTupleDesc(2);
500  TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp",
501  TIMESTAMPTZOID, -1, 0);
502  TupleDescInitEntry(tupdesc, (AttrNumber) 2, "roident",
503  OIDOID, -1, 0);
504  tupdesc = BlessTupleDesc(tupdesc);
505 
506  if (!found)
507  {
508  memset(nulls, true, sizeof(nulls));
509  }
510  else
511  {
512  values[0] = TimestampTzGetDatum(ts);
513  nulls[0] = false;
514 
515  values[1] = ObjectIdGetDatum((Oid) nodeid);
516  nulls[1] = false;
517  }
518 
519  htup = heap_form_tuple(tupdesc, values, nulls);
520 
522 }
523 
524 /*
525  * Number of shared CommitTS buffers.
526  *
527  * We use a very similar logic as for the number of CLOG buffers; see comments
528  * in CLOGShmemBuffers.
529  */
530 Size
532 {
533  return Min(16, Max(4, NBuffers / 1024));
534 }
535 
536 /*
537  * Shared memory sizing for CommitTs
538  */
539 Size
541 {
543  sizeof(CommitTimestampShared);
544 }
545 
546 /*
547  * Initialize CommitTs at system startup (postmaster start or standalone
548  * backend)
549  */
550 void
552 {
553  bool found;
554 
555  CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
557  CommitTsSLRULock, "pg_commit_ts",
561 
562  commitTsShared = ShmemInitStruct("CommitTs shared",
563  sizeof(CommitTimestampShared),
564  &found);
565 
566  if (!IsUnderPostmaster)
567  {
568  Assert(!found);
569 
570  commitTsShared->xidLastCommit = InvalidTransactionId;
571  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
572  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
573  commitTsShared->commitTsActive = false;
574  }
575  else
576  Assert(found);
577 }
578 
579 /*
580  * This function must be called ONCE on system install.
581  *
582  * (The CommitTs directory is assumed to have been created by initdb, and
583  * CommitTsShmemInit must have been called already.)
584  */
585 void
587 {
588  /*
589  * Nothing to do here at present, unlike most other SLRU modules; segments
590  * are created when the server is started with this module enabled. See
591  * ActivateCommitTs.
592  */
593 }
594 
595 /*
596  * Initialize (or reinitialize) a page of CommitTs to zeroes.
597  * If writeXlog is true, also emit an XLOG record saying we did this.
598  *
599  * The page is not actually written, just set up in shared memory.
600  * The slot number of the new page is returned.
601  *
602  * Control lock must be held at entry, and will be held at exit.
603  */
604 static int
605 ZeroCommitTsPage(int pageno, bool writeXlog)
606 {
607  int slotno;
608 
609  slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
610 
611  if (writeXlog)
612  WriteZeroPageXlogRec(pageno);
613 
614  return slotno;
615 }
616 
617 /*
618  * This must be called ONCE during postmaster or standalone-backend startup,
619  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
620  */
621 void
623 {
625 }
626 
627 /*
628  * This must be called ONCE during postmaster or standalone-backend startup,
629  * after recovery has finished.
630  */
631 void
633 {
634  /*
635  * If the feature is not enabled, turn it off for good. This also removes
636  * any leftover data.
637  *
638  * Conversely, we activate the module if the feature is enabled. This is
639  * necessary for primary and standby as the activation depends on the
640  * control file contents at the beginning of recovery or when a
641  * XLOG_PARAMETER_CHANGE is replayed.
642  */
645  else
647 }
648 
649 /*
650  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
651  * XLog record during recovery.
652  */
653 void
654 CommitTsParameterChange(bool newvalue, bool oldvalue)
655 {
656  /*
657  * If the commit_ts module is disabled in this server and we get word from
658  * the primary server that it is enabled there, activate it so that we can
659  * replay future WAL records involving it; also mark it as active on
660  * pg_control. If the old value was already set, we already did this, so
661  * don't do anything.
662  *
663  * If the module is disabled in the primary, disable it here too, unless
664  * the module is enabled locally.
665  *
666  * Note this only runs in the recovery process, so an unlocked read is
667  * fine.
668  */
669  if (newvalue)
670  {
671  if (!commitTsShared->commitTsActive)
673  }
674  else if (commitTsShared->commitTsActive)
676 }
677 
678 /*
679  * Activate this module whenever necessary.
680  * This must happen during postmaster or standalone-backend startup,
681  * or during WAL replay anytime the track_commit_timestamp setting is
682  * changed in the primary.
683  *
684  * The reason why this SLRU needs separate activation/deactivation functions is
685  * that it can be enabled/disabled during start and the activation/deactivation
686  * on the primary is propagated to the standby via replay. Other SLRUs don't
687  * have this property and they can be just initialized during normal startup.
688  *
689  * This is in charge of creating the currently active segment, if it's not
690  * already there. The reason for this is that the server might have been
691  * running with this module disabled for a while and thus might have skipped
692  * the normal creation point.
693  */
694 static void
696 {
697  TransactionId xid;
698  int pageno;
699 
700  /* If we've done this already, there's nothing to do */
701  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
702  if (commitTsShared->commitTsActive)
703  {
704  LWLockRelease(CommitTsLock);
705  return;
706  }
707  LWLockRelease(CommitTsLock);
708 
710  pageno = TransactionIdToCTsPage(xid);
711 
712  /*
713  * Re-Initialize our idea of the latest page number.
714  */
715  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
716  CommitTsCtl->shared->latest_page_number = pageno;
717  LWLockRelease(CommitTsSLRULock);
718 
719  /*
720  * If CommitTs is enabled, but it wasn't in the previous server run, we
721  * need to set the oldest and newest values to the next Xid; that way, we
722  * will not try to read data that might not have been set.
723  *
724  * XXX does this have a problem if a server is started with commitTs
725  * enabled, then started with commitTs disabled, then restarted with it
726  * enabled again? It doesn't look like it does, because there should be a
727  * checkpoint that sets the value to InvalidTransactionId at end of
728  * recovery; and so any chance of injecting new transactions without
729  * CommitTs values would occur after the oldestCommitTsXid has been set to
730  * Invalid temporarily.
731  */
732  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
734  {
737  }
738  LWLockRelease(CommitTsLock);
739 
740  /* Create the current segment file, if necessary */
742  {
743  int slotno;
744 
745  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
746  slotno = ZeroCommitTsPage(pageno, false);
748  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
749  LWLockRelease(CommitTsSLRULock);
750  }
751 
752  /* Change the activation status in shared memory. */
753  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
754  commitTsShared->commitTsActive = true;
755  LWLockRelease(CommitTsLock);
756 }
757 
758 /*
759  * Deactivate this module.
760  *
761  * This must be called when the track_commit_timestamp parameter is turned off.
762  * This happens during postmaster or standalone-backend startup, or during WAL
763  * replay.
764  *
765  * Resets CommitTs into invalid state to make sure we don't hand back
766  * possibly-invalid data; also removes segments of old data.
767  */
768 static void
770 {
771  /*
772  * Cleanup the status in the shared memory.
773  *
774  * We reset everything in the commitTsShared record to prevent user from
775  * getting confusing data about last committed transaction on the standby
776  * when the module was activated repeatedly on the primary.
777  */
778  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
779 
780  commitTsShared->commitTsActive = false;
781  commitTsShared->xidLastCommit = InvalidTransactionId;
782  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
783  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
784 
787 
788  LWLockRelease(CommitTsLock);
789 
790  /*
791  * Remove *all* files. This is necessary so that there are no leftover
792  * files; in the case where this feature is later enabled after running
793  * with it disabled for some time there may be a gap in the file sequence.
794  * (We can probably tolerate out-of-sequence files, as they are going to
795  * be overwritten anyway when we wrap around, but it seems better to be
796  * tidy.)
797  */
798  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
800  LWLockRelease(CommitTsSLRULock);
801 }
802 
803 /*
804  * Perform a checkpoint --- either during shutdown, or on-the-fly
805  */
806 void
808 {
809  /*
810  * Write dirty CommitTs pages to disk. This may result in sync requests
811  * queued for later handling by ProcessSyncRequests(), as part of the
812  * checkpoint.
813  */
815 }
816 
817 /*
818  * Make sure that CommitTs has room for a newly-allocated XID.
819  *
820  * NB: this is called while holding XidGenLock. We want it to be very fast
821  * most of the time; even when it's not so fast, no actual I/O need happen
822  * unless we're forced to write out a dirty CommitTs or xlog page to make room
823  * in shared memory.
824  *
825  * NB: the current implementation relies on track_commit_timestamp being
826  * PGC_POSTMASTER.
827  */
828 void
830 {
831  int pageno;
832 
833  /*
834  * Nothing to do if module not enabled. Note we do an unlocked read of
835  * the flag here, which is okay because this routine is only called from
836  * GetNewTransactionId, which is never called in a standby.
837  */
838  Assert(!InRecovery);
839  if (!commitTsShared->commitTsActive)
840  return;
841 
842  /*
843  * No work except at first XID of a page. But beware: just after
844  * wraparound, the first XID of page zero is FirstNormalTransactionId.
845  */
846  if (TransactionIdToCTsEntry(newestXact) != 0 &&
848  return;
849 
850  pageno = TransactionIdToCTsPage(newestXact);
851 
852  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
853 
854  /* Zero the page and make an XLOG entry about it */
855  ZeroCommitTsPage(pageno, !InRecovery);
856 
857  LWLockRelease(CommitTsSLRULock);
858 }
859 
860 /*
861  * Remove all CommitTs segments before the one holding the passed
862  * transaction ID.
863  *
864  * Note that we don't need to flush XLOG here.
865  */
866 void
868 {
869  int cutoffPage;
870 
871  /*
872  * The cutoff point is the start of the segment containing oldestXact. We
873  * pass the *page* containing oldestXact to SimpleLruTruncate.
874  */
875  cutoffPage = TransactionIdToCTsPage(oldestXact);
876 
877  /* Check to see if there's any files that could be removed */
879  &cutoffPage))
880  return; /* nothing to remove */
881 
882  /* Write XLOG record */
883  WriteTruncateXlogRec(cutoffPage, oldestXact);
884 
885  /* Now we can remove the old CommitTs segment(s) */
886  SimpleLruTruncate(CommitTsCtl, cutoffPage);
887 }
888 
889 /*
890  * Set the limit values between which commit TS can be consulted.
891  */
892 void
894 {
895  /*
896  * Be careful not to overwrite values that are either further into the
897  * "future" or signal a disabled committs.
898  */
899  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
901  {
906  }
907  else
908  {
912  }
913  LWLockRelease(CommitTsLock);
914 }
915 
916 /*
917  * Move forwards the oldest commitTS value that can be consulted
918  */
919 void
921 {
922  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
926  LWLockRelease(CommitTsLock);
927 }
928 
929 
930 /*
931  * Decide whether a commitTS page number is "older" for truncation purposes.
932  * Analogous to CLOGPagePrecedes().
933  *
934  * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
935  * introduces differences compared to CLOG and the other SLRUs having (1 <<
936  * 31) % per_page == 0. This function never tests exactly
937  * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
938  * there are two possible counts of page boundaries between oldestXact and the
939  * latest XID assigned, depending on whether oldestXact is within the first
940  * 128 entries of its page. Since this function doesn't know the location of
941  * oldestXact within page2, it returns false for one page that actually is
942  * expendable. This is a wider (yet still negligible) version of the
943  * truncation opportunity that CLOGPagePrecedes() cannot recognize.
944  *
945  * For the sake of a worked example, number entries with decimal values such
946  * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
947  * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
948  * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
949  * because entry=2.85 is the border that toggles whether entries precede the
950  * last entry of the oldestXact page. While page 2 is expendable at
951  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
952  */
953 static bool
954 CommitTsPagePrecedes(int page1, int page2)
955 {
956  TransactionId xid1;
957  TransactionId xid2;
958 
959  xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
960  xid1 += FirstNormalTransactionId + 1;
961  xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
962  xid2 += FirstNormalTransactionId + 1;
963 
964  return (TransactionIdPrecedes(xid1, xid2) &&
966 }
967 
968 
969 /*
970  * Write a ZEROPAGE xlog record
971  */
972 static void
974 {
975  XLogBeginInsert();
976  XLogRegisterData((char *) (&pageno), sizeof(int));
977  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
978 }
979 
980 /*
981  * Write a TRUNCATE xlog record
982  */
983 static void
984 WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
985 {
986  xl_commit_ts_truncate xlrec;
987 
988  xlrec.pageno = pageno;
989  xlrec.oldestXid = oldestXid;
990 
991  XLogBeginInsert();
992  XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
993  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
994 }
995 
996 /*
997  * Write a SETTS xlog record
998  */
999 static void
1003 {
1004  xl_commit_ts_set record;
1005 
1006  record.timestamp = timestamp;
1007  record.nodeid = nodeid;
1008  record.mainxid = mainxid;
1009 
1010  XLogBeginInsert();
1011  XLogRegisterData((char *) &record,
1012  offsetof(xl_commit_ts_set, mainxid) +
1013  sizeof(TransactionId));
1014  XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
1015  XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
1016 }
1017 
1018 /*
1019  * CommitTS resource manager's routines
1020  */
1021 void
1023 {
1024  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1025 
1026  /* Backup blocks are not used in commit_ts records */
1027  Assert(!XLogRecHasAnyBlockRefs(record));
1028 
1029  if (info == COMMIT_TS_ZEROPAGE)
1030  {
1031  int pageno;
1032  int slotno;
1033 
1034  memcpy(&pageno, XLogRecGetData(record), sizeof(int));
1035 
1036  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
1037 
1038  slotno = ZeroCommitTsPage(pageno, false);
1040  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1041 
1042  LWLockRelease(CommitTsSLRULock);
1043  }
1044  else if (info == COMMIT_TS_TRUNCATE)
1045  {
1047 
1049 
1050  /*
1051  * During XLOG replay, latest_page_number isn't set up yet; insert a
1052  * suitable value to bypass the sanity test in SimpleLruTruncate.
1053  */
1054  CommitTsCtl->shared->latest_page_number = trunc->pageno;
1055 
1057  }
1058  else if (info == COMMIT_TS_SETTS)
1059  {
1060  xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
1061  int nsubxids;
1062  TransactionId *subxids;
1063 
1064  nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
1065  sizeof(TransactionId));
1066  if (nsubxids > 0)
1067  {
1068  subxids = palloc(sizeof(TransactionId) * nsubxids);
1069  memcpy(subxids,
1071  sizeof(TransactionId) * nsubxids);
1072  }
1073  else
1074  subxids = NULL;
1075 
1076  TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
1077  setts->timestamp, setts->nodeid, true);
1078  if (subxids)
1079  pfree(subxids);
1080  }
1081  else
1082  elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1083 }
1084 
1085 /*
1086  * Entrypoint for sync.c to sync commit_ts files.
1087  */
1088 int
1089 committssyncfiletag(const FileTag *ftag, char *path)
1090 {
1091  return SlruSyncFileTag(CommitTsCtl, ftag, path);
1092 }
#define COMMIT_TS_ZEROPAGE
Definition: commit_ts.h:51
CommitTimestampEntry dataLastCommit
Definition: commit_ts.c:95
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition: commit_ts.c:428
int errhint(const char *fmt,...)
Definition: elog.c:1162
#define COMMIT_TS_SETTS
Definition: commit_ts.h:53
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
Definition: slru.c:1592
uint32 TransactionId
Definition: c.h:575
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition: commit_ts.c:893
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1529
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
Definition: commit_ts.c:1000
#define SizeOfCommitTimestampEntry
Definition: commit_ts.c:63
int64 timestamp
int64 TimestampTz
Definition: timestamp.h:39
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int pageno)
Definition: commit_ts.c:231
#define CommitTsCtl
Definition: commit_ts.c:79
static int ZeroCommitTsPage(int pageno, bool writeXlog)
Definition: commit_ts.c:605
int committssyncfiletag(const FileTag *ftag, char *path)
Definition: commit_ts.c:1089
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1225
bool InRecovery
Definition: xlog.c:206
#define Min(x, y)
Definition: c.h:974
unsigned char uint8
Definition: c.h:427
uint16 RepOriginId
Definition: xlogdefs.h:58
void StartupCommitTs(void)
Definition: commit_ts.c:622
int errcode(int sqlerrcode)
Definition: elog.c:704
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:8148
static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
Definition: commit_ts.c:984
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition: commit_ts.c:405
FullTransactionId nextXid
Definition: transam.h:213
#define PANIC
Definition: elog.h:55
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:155
#define PG_RETURN_TIMESTAMPTZ(x)
Definition: timestamp.h:40
Size CommitTsShmemBuffers(void)
Definition: commit_ts.c:531
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler)
Definition: slru.c:186
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition: commit_ts.c:920
TimestampTz timestamp
Definition: commit_ts.h:57
#define XidFromFullTransactionId(x)
Definition: transam.h:48
void CompleteCommitTsInitialization(void)
Definition: commit_ts.c:632
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1808
void pfree(void *pointer)
Definition: mcxt.c:1057
#define XLogRecGetData(decoder)
Definition: xlogreader.h:310
#define FirstNormalTransactionId
Definition: transam.h:34
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition: commit_ts.c:257
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:45
#define XLogRecGetDataLen(decoder)
Definition: xlogreader.h:311
#define COMMIT_TS_XACTS_PER_PAGE
Definition: commit_ts.c:66
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition: commit_ts.c:654
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:394
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
Definition: execTuples.c:2052
bool track_commit_timestamp
Definition: commit_ts.c:103
#define SizeOfCommitTsSet
Definition: commit_ts.h:63
#define PG_GETARG_TRANSACTIONID(n)
Definition: fmgr.h:279
bool IsUnderPostmaster
Definition: globals.c:110
void commit_ts_redo(XLogReaderState *record)
Definition: commit_ts.c:1022
VariableCache ShmemVariableCache
Definition: varsup.c:34
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
Definition: slru.c:1155
#define InvalidTransactionId
Definition: transam.h:31
TransactionId oldestXid
Definition: commit_ts.h:69
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:112
static void ActivateCommitTs(void)
Definition: commit_ts.c:695
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
Definition: slru.c:625
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:613
static void WriteZeroPageXlogRec(int pageno)
Definition: commit_ts.c:973
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:603
struct CommitTimestampEntry CommitTimestampEntry
TransactionId oldestCommitTsXid
Definition: transam.h:225
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:305
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
void TruncateCommitTs(TransactionId oldestXact)
Definition: commit_ts.c:867
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
TimestampTz time
Definition: commit_ts.c:59
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
#define TransactionIdGetDatum(X)
Definition: postgres.h:521
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1499
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:282
#define TransactionIdToCTsPage(xid)
Definition: commit_ts.c:69
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
#define COMMIT_TS_TRUNCATE
Definition: commit_ts.h:52
static bool CommitTsPagePrecedes(int page1, int page2)
Definition: commit_ts.c:954
TransactionId xidLastCommit
Definition: commit_ts.c:94
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:494
#define ereport(elevel,...)
Definition: elog.h:155
void CheckPointCommitTs(void)
Definition: commit_ts.c:807
#define Max(x, y)
Definition: c.h:968
TransactionId mainxid
Definition: commit_ts.h:59
#define Assert(condition)
Definition: c.h:792
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
void ExtendCommitTs(TransactionId newestXact)
Definition: commit_ts.c:829
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1552
TransactionId newestCommitTsXid
Definition: transam.h:226
size_t Size
Definition: c.h:528
Size CommitTsShmemSize(void)
Definition: commit_ts.c:540
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1206
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:221
#define TransactionIdToCTsEntry(xid)
Definition: commit_ts.c:71
static SlruCtlData CommitTsCtlData
Definition: commit_ts.c:77
#define InvalidRepOriginId
Definition: origin.h:33
static TransactionId ReadNewTransactionId(void)
Definition: transam.h:308
static void DeactivateCommitTs(void)
Definition: commit_ts.c:769
static Datum values[MAXATTR]
Definition: bootstrap.c:165
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:915
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
Definition: commit_ts.c:482
RepOriginId nodeid
Definition: commit_ts.h:58
#define elog(elevel,...)
Definition: elog.h:228
int i
int NBuffers
Definition: globals.c:133
#define SlruPagePrecedesUnitTests(ctl, per_page)
Definition: slru.h:156
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:312
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void BootStrapCommitTs(void)
Definition: commit_ts.c:586
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
RepOriginId nodeid
Definition: commit_ts.c:60
void XLogBeginInsert(void)
Definition: xloginsert.c:123
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid, bool write_xlog)
Definition: commit_ts.c:145
CommitTimestampShared * commitTsShared
Definition: commit_ts.c:99
int16 AttrNumber
Definition: attnum.h:21
static void error_commit_ts_disabled(void)
Definition: commit_ts.c:389
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:279
#define SizeOfCommitTsTruncate
Definition: commit_ts.h:72
#define PG_RETURN_NULL()
Definition: fmgr.h:345
FileTag
Definition: sync.h:50
#define offsetof(type, field)
Definition: c.h:715
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:368
void CommitTsShmemInit(void)
Definition: commit_ts.c:551
struct CommitTimestampShared CommitTimestampShared