PostgreSQL Source Code  git master
commit_ts.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  * PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_xact-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes. Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26 
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "storage/shmem.h"
36 #include "utils/builtins.h"
37 #include "utils/snapmgr.h"
38 #include "utils/timestamp.h"
39 
40 /*
41  * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
42  * everywhere else in Postgres.
43  *
44  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
45  * CommitTs page numbering also wraps around at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
47  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
48  * explicit notice of that fact in this module, except when comparing segment
49  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
50  */
51 
52 /*
53  * We need 8+2 bytes per xact. Note that enlarging this struct might mean
54  * the largest possible file name is more than 5 chars long; see
55  * SlruScanDirectory.
56  */
57 typedef struct CommitTimestampEntry
58 {
62 
63 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
64  sizeof(RepOriginId))
65 
66 #define COMMIT_TS_XACTS_PER_PAGE \
67  (BLCKSZ / SizeOfCommitTimestampEntry)
68 
69 #define TransactionIdToCTsPage(xid) \
70  ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
71 #define TransactionIdToCTsEntry(xid) \
72  ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
73 
74 /*
75  * Link to shared-memory data structures for CommitTs control
76  */
78 
79 #define CommitTsCtl (&CommitTsCtlData)
80 
81 /*
82  * We keep a cache of the last value set in shared memory.
83  *
84  * This is also a good place to keep the activation status. We keep this
85  * separate from the GUC so that the standby can activate the module if the
86  * primary has it active independently of the value of the GUC.
87  *
88  * This is protected by CommitTsLock. In some places, we use commitTsActive
89  * without acquiring the lock; where this happens, a comment explains the
90  * rationale for it.
91  */
92 typedef struct CommitTimestampShared
93 {
98 
100 
101 
102 /* GUC variable */
104 
105 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
106  TransactionId *subxids, TimestampTz ts,
107  RepOriginId nodeid, int pageno);
109  RepOriginId nodeid, int slotno);
110 static void error_commit_ts_disabled(void);
111 static int ZeroCommitTsPage(int pageno, bool writeXlog);
112 static bool CommitTsPagePrecedes(int page1, int page2);
113 static void ActivateCommitTs(void);
114 static void DeactivateCommitTs(void);
115 static void WriteZeroPageXlogRec(int pageno);
116 static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
117 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
120 
121 /*
122  * TransactionTreeSetCommitTsData
123  *
124  * Record the final commit timestamp of transaction entries in the commit log
125  * for a transaction and its subtransaction tree, as efficiently as possible.
126  *
127  * xid is the top level transaction id.
128  *
129  * subxids is an array of xids of length nsubxids, representing subtransactions
130  * in the tree of xid. In various cases nsubxids may be zero.
131  * The reason why tracking just the parent xid commit timestamp is not enough
132  * is that the subtrans SLRU does not stay valid across crashes (it's not
133  * permanent) so we need to keep the information about them here. If the
134  * subtrans implementation changes in the future, we might want to revisit the
135  * decision of storing timestamp info for each subxid.
136  *
137  * The write_xlog parameter tells us whether to include an XLog record of this
138  * or not. Normally, this is called from transaction commit routines (both
139  * normal and prepared) and the information will be stored in the transaction
140  * commit XLog record, and so they should pass "false" for this. The XLog redo
141  * code should use "false" here as well. Other callers probably want to pass
142  * true, so that the given values persist in case of crashes.
143  */
144 void
147  RepOriginId nodeid, bool write_xlog)
148 {
149  int i;
150  TransactionId headxid;
151  TransactionId newestXact;
152 
153  /*
154  * No-op if the module is not active.
155  *
156  * An unlocked read here is fine, because in a standby (the only place
157  * where the flag can change in flight) this routine is only called by the
158  * recovery process, which is also the only process which can change the
159  * flag.
160  */
161  if (!commitTsShared->commitTsActive)
162  return;
163 
164  /*
165  * Comply with the WAL-before-data rule: if caller specified it wants this
166  * value to be recorded in WAL, do so before touching the data.
167  */
168  if (write_xlog)
169  WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
170 
171  /*
172  * Figure out the latest Xid in this batch: either the last subxid if
173  * there's any, otherwise the parent xid.
174  */
175  if (nsubxids > 0)
176  newestXact = subxids[nsubxids - 1];
177  else
178  newestXact = xid;
179 
180  /*
181  * We split the xids to set the timestamp to in groups belonging to the
182  * same SLRU page; the first element in each such set is its head. The
183  * first group has the main XID as the head; subsequent sets use the first
184  * subxid not on the previous page as head. This way, we only have to
185  * lock/modify each SLRU page once.
186  */
187  for (i = 0, headxid = xid;;)
188  {
189  int pageno = TransactionIdToCTsPage(headxid);
190  int j;
191 
192  for (j = i; j < nsubxids; j++)
193  {
194  if (TransactionIdToCTsPage(subxids[j]) != pageno)
195  break;
196  }
197  /* subxids[i..j] are on the same page as the head */
198 
199  SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
200  pageno);
201 
202  /* if we wrote out all subxids, we're done. */
203  if (j + 1 >= nsubxids)
204  break;
205 
206  /*
207  * Set the new head and skip over it, as well as over the subxids we
208  * just wrote.
209  */
210  headxid = subxids[j];
211  i += j - i + 1;
212  }
213 
214  /* update the cached value in shared memory */
215  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
216  commitTsShared->xidLastCommit = xid;
217  commitTsShared->dataLastCommit.time = timestamp;
218  commitTsShared->dataLastCommit.nodeid = nodeid;
219 
220  /* and move forwards our endpoint, if needed */
223  LWLockRelease(CommitTsLock);
224 }
225 
226 /*
227  * Record the commit timestamp of transaction entries in the commit log for all
228  * entries on a single page. Atomic only on this page.
229  */
230 static void
232  TransactionId *subxids, TimestampTz ts,
233  RepOriginId nodeid, int pageno)
234 {
235  int slotno;
236  int i;
237 
238  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
239 
240  slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
241 
242  TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
243  for (i = 0; i < nsubxids; i++)
244  TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
245 
246  CommitTsCtl->shared->page_dirty[slotno] = true;
247 
248  LWLockRelease(CommitTsSLRULock);
249 }
250 
251 /*
252  * Sets the commit timestamp of a single transaction.
253  *
254  * Must be called with CommitTsSLRULock held
255  */
256 static void
258  RepOriginId nodeid, int slotno)
259 {
260  int entryno = TransactionIdToCTsEntry(xid);
261  CommitTimestampEntry entry;
262 
264 
265  entry.time = ts;
266  entry.nodeid = nodeid;
267 
268  memcpy(CommitTsCtl->shared->page_buffer[slotno] +
269  SizeOfCommitTimestampEntry * entryno,
271 }
272 
273 /*
274  * Interrogate the commit timestamp of a transaction.
275  *
276  * The return value indicates whether a commit timestamp record was found for
277  * the given xid. The timestamp value is returned in *ts (which may not be
278  * null), and the origin node for the Xid is returned in *nodeid, if it's not
279  * null.
280  */
281 bool
284 {
285  int pageno = TransactionIdToCTsPage(xid);
286  int entryno = TransactionIdToCTsEntry(xid);
287  int slotno;
288  CommitTimestampEntry entry;
289  TransactionId oldestCommitTsXid;
290  TransactionId newestCommitTsXid;
291 
292  if (!TransactionIdIsValid(xid))
293  ereport(ERROR,
294  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296  else if (!TransactionIdIsNormal(xid))
297  {
298  /* frozen and bootstrap xids are always committed far in the past */
299  *ts = 0;
300  if (nodeid)
301  *nodeid = 0;
302  return false;
303  }
304 
305  LWLockAcquire(CommitTsLock, LW_SHARED);
306 
307  /* Error if module not enabled */
308  if (!commitTsShared->commitTsActive)
310 
311  /*
312  * If we're asked for the cached value, return that. Otherwise, fall
313  * through to read from SLRU.
314  */
315  if (commitTsShared->xidLastCommit == xid)
316  {
317  *ts = commitTsShared->dataLastCommit.time;
318  if (nodeid)
319  *nodeid = commitTsShared->dataLastCommit.nodeid;
320 
321  LWLockRelease(CommitTsLock);
322  return *ts != 0;
323  }
324 
325  oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
326  newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
327  /* neither is invalid, or both are */
328  Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
329  LWLockRelease(CommitTsLock);
330 
331  /*
332  * Return empty if the requested value is outside our valid range.
333  */
334  if (!TransactionIdIsValid(oldestCommitTsXid) ||
335  TransactionIdPrecedes(xid, oldestCommitTsXid) ||
336  TransactionIdPrecedes(newestCommitTsXid, xid))
337  {
338  *ts = 0;
339  if (nodeid)
340  *nodeid = InvalidRepOriginId;
341  return false;
342  }
343 
344  /* lock is acquired by SimpleLruReadPage_ReadOnly */
345  slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
346  memcpy(&entry,
347  CommitTsCtl->shared->page_buffer[slotno] +
348  SizeOfCommitTimestampEntry * entryno,
350 
351  *ts = entry.time;
352  if (nodeid)
353  *nodeid = entry.nodeid;
354 
355  LWLockRelease(CommitTsSLRULock);
356  return *ts != 0;
357 }
358 
359 /*
360  * Return the Xid of the latest committed transaction. (As far as this module
361  * is concerned, anyway; it's up to the caller to ensure the value is useful
362  * for its purposes.)
363  *
364  * ts and nodeid are filled with the corresponding data; they can be passed
365  * as NULL if not wanted.
366  */
369 {
370  TransactionId xid;
371 
372  LWLockAcquire(CommitTsLock, LW_SHARED);
373 
374  /* Error if module not enabled */
375  if (!commitTsShared->commitTsActive)
377 
378  xid = commitTsShared->xidLastCommit;
379  if (ts)
380  *ts = commitTsShared->dataLastCommit.time;
381  if (nodeid)
382  *nodeid = commitTsShared->dataLastCommit.nodeid;
383  LWLockRelease(CommitTsLock);
384 
385  return xid;
386 }
387 
388 static void
390 {
391  ereport(ERROR,
392  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393  errmsg("could not get commit timestamp data"),
395  errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
396  "track_commit_timestamp") :
397  errhint("Make sure the configuration parameter \"%s\" is set.",
398  "track_commit_timestamp")));
399 }
400 
401 /*
402  * SQL-callable wrapper to obtain commit time of a transaction
403  */
404 Datum
406 {
408  TimestampTz ts;
409  bool found;
410 
411  found = TransactionIdGetCommitTsData(xid, &ts, NULL);
412 
413  if (!found)
414  PG_RETURN_NULL();
415 
417 }
418 
419 
420 /*
421  * pg_last_committed_xact
422  *
423  * SQL-callable wrapper to obtain some information about the latest
424  * committed transaction: transaction ID, timestamp and replication
425  * origin.
426  */
427 Datum
429 {
430  TransactionId xid;
432  TimestampTz ts;
433  Datum values[3];
434  bool nulls[3];
435  TupleDesc tupdesc;
436  HeapTuple htup;
437 
438  /* and construct a tuple with our data */
439  xid = GetLatestCommitTsData(&ts, &nodeid);
440 
441  /*
442  * Construct a tuple descriptor for the result row. This must match this
443  * function's pg_proc entry!
444  */
445  tupdesc = CreateTemplateTupleDesc(3);
446  TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
447  XIDOID, -1, 0);
448  TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
449  TIMESTAMPTZOID, -1, 0);
450  TupleDescInitEntry(tupdesc, (AttrNumber) 3, "roident",
451  OIDOID, -1, 0);
452  tupdesc = BlessTupleDesc(tupdesc);
453 
454  if (!TransactionIdIsNormal(xid))
455  {
456  memset(nulls, true, sizeof(nulls));
457  }
458  else
459  {
460  values[0] = TransactionIdGetDatum(xid);
461  nulls[0] = false;
462 
463  values[1] = TimestampTzGetDatum(ts);
464  nulls[1] = false;
465 
466  values[2] = ObjectIdGetDatum((Oid) nodeid);
467  nulls[2] = false;
468  }
469 
470  htup = heap_form_tuple(tupdesc, values, nulls);
471 
473 }
474 
475 /*
476  * pg_xact_commit_timestamp_origin
477  *
478  * SQL-callable wrapper to obtain commit timestamp and replication origin
479  * of a given transaction.
480  */
481 Datum
483 {
486  TimestampTz ts;
487  Datum values[2];
488  bool nulls[2];
489  TupleDesc tupdesc;
490  HeapTuple htup;
491  bool found;
492 
493  found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
494 
495  /*
496  * Construct a tuple descriptor for the result row. This must match this
497  * function's pg_proc entry!
498  */
499  tupdesc = CreateTemplateTupleDesc(2);
500  TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp",
501  TIMESTAMPTZOID, -1, 0);
502  TupleDescInitEntry(tupdesc, (AttrNumber) 2, "roident",
503  OIDOID, -1, 0);
504  tupdesc = BlessTupleDesc(tupdesc);
505 
506  if (!found)
507  {
508  memset(nulls, true, sizeof(nulls));
509  }
510  else
511  {
512  values[0] = TimestampTzGetDatum(ts);
513  nulls[0] = false;
514 
515  values[1] = ObjectIdGetDatum((Oid) nodeid);
516  nulls[1] = false;
517  }
518 
519  htup = heap_form_tuple(tupdesc, values, nulls);
520 
522 }
523 
524 /*
525  * Number of shared CommitTS buffers.
526  *
527  * We use a very similar logic as for the number of CLOG buffers; see comments
528  * in CLOGShmemBuffers.
529  */
530 Size
532 {
533  return Min(16, Max(4, NBuffers / 1024));
534 }
535 
536 /*
537  * Shared memory sizing for CommitTs
538  */
539 Size
541 {
543  sizeof(CommitTimestampShared);
544 }
545 
546 /*
547  * Initialize CommitTs at system startup (postmaster start or standalone
548  * backend)
549  */
550 void
552 {
553  bool found;
554 
555  CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
557  CommitTsSLRULock, "pg_commit_ts",
559 
560  commitTsShared = ShmemInitStruct("CommitTs shared",
561  sizeof(CommitTimestampShared),
562  &found);
563 
564  if (!IsUnderPostmaster)
565  {
566  Assert(!found);
567 
568  commitTsShared->xidLastCommit = InvalidTransactionId;
569  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
570  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
571  commitTsShared->commitTsActive = false;
572  }
573  else
574  Assert(found);
575 }
576 
577 /*
578  * This function must be called ONCE on system install.
579  *
580  * (The CommitTs directory is assumed to have been created by initdb, and
581  * CommitTsShmemInit must have been called already.)
582  */
583 void
585 {
586  /*
587  * Nothing to do here at present, unlike most other SLRU modules; segments
588  * are created when the server is started with this module enabled. See
589  * ActivateCommitTs.
590  */
591 }
592 
593 /*
594  * Initialize (or reinitialize) a page of CommitTs to zeroes.
595  * If writeXlog is true, also emit an XLOG record saying we did this.
596  *
597  * The page is not actually written, just set up in shared memory.
598  * The slot number of the new page is returned.
599  *
600  * Control lock must be held at entry, and will be held at exit.
601  */
602 static int
603 ZeroCommitTsPage(int pageno, bool writeXlog)
604 {
605  int slotno;
606 
607  slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
608 
609  if (writeXlog)
610  WriteZeroPageXlogRec(pageno);
611 
612  return slotno;
613 }
614 
615 /*
616  * This must be called ONCE during postmaster or standalone-backend startup,
617  * after StartupXLOG has initialized ShmemVariableCache->nextFullXid.
618  */
619 void
621 {
623 }
624 
625 /*
626  * This must be called ONCE during postmaster or standalone-backend startup,
627  * after recovery has finished.
628  */
629 void
631 {
632  /*
633  * If the feature is not enabled, turn it off for good. This also removes
634  * any leftover data.
635  *
636  * Conversely, we activate the module if the feature is enabled. This is
637  * necessary for primary and standby as the activation depends on the
638  * control file contents at the beginning of recovery or when a
639  * XLOG_PARAMETER_CHANGE is replayed.
640  */
643  else
645 }
646 
647 /*
648  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
649  * XLog record during recovery.
650  */
651 void
652 CommitTsParameterChange(bool newvalue, bool oldvalue)
653 {
654  /*
655  * If the commit_ts module is disabled in this server and we get word from
656  * the primary server that it is enabled there, activate it so that we can
657  * replay future WAL records involving it; also mark it as active on
658  * pg_control. If the old value was already set, we already did this, so
659  * don't do anything.
660  *
661  * If the module is disabled in the primary, disable it here too, unless
662  * the module is enabled locally.
663  *
664  * Note this only runs in the recovery process, so an unlocked read is
665  * fine.
666  */
667  if (newvalue)
668  {
669  if (!commitTsShared->commitTsActive)
671  }
672  else if (commitTsShared->commitTsActive)
674 }
675 
676 /*
677  * Activate this module whenever necessary.
678  * This must happen during postmaster or standalone-backend startup,
679  * or during WAL replay anytime the track_commit_timestamp setting is
680  * changed in the primary.
681  *
682  * The reason why this SLRU needs separate activation/deactivation functions is
683  * that it can be enabled/disabled during start and the activation/deactivation
684  * on the primary is propagated to the standby via replay. Other SLRUs don't
685  * have this property and they can be just initialized during normal startup.
686  *
687  * This is in charge of creating the currently active segment, if it's not
688  * already there. The reason for this is that the server might have been
689  * running with this module disabled for a while and thus might have skipped
690  * the normal creation point.
691  */
692 static void
694 {
695  TransactionId xid;
696  int pageno;
697 
698  /* If we've done this already, there's nothing to do */
699  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
700  if (commitTsShared->commitTsActive)
701  {
702  LWLockRelease(CommitTsLock);
703  return;
704  }
705  LWLockRelease(CommitTsLock);
706 
708  pageno = TransactionIdToCTsPage(xid);
709 
710  /*
711  * Re-Initialize our idea of the latest page number.
712  */
713  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
714  CommitTsCtl->shared->latest_page_number = pageno;
715  LWLockRelease(CommitTsSLRULock);
716 
717  /*
718  * If CommitTs is enabled, but it wasn't in the previous server run, we
719  * need to set the oldest and newest values to the next Xid; that way, we
720  * will not try to read data that might not have been set.
721  *
722  * XXX does this have a problem if a server is started with commitTs
723  * enabled, then started with commitTs disabled, then restarted with it
724  * enabled again? It doesn't look like it does, because there should be a
725  * checkpoint that sets the value to InvalidTransactionId at end of
726  * recovery; and so any chance of injecting new transactions without
727  * CommitTs values would occur after the oldestCommitTsXid has been set to
728  * Invalid temporarily.
729  */
730  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
732  {
735  }
736  LWLockRelease(CommitTsLock);
737 
738  /* Create the current segment file, if necessary */
740  {
741  int slotno;
742 
743  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
744  slotno = ZeroCommitTsPage(pageno, false);
746  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
747  LWLockRelease(CommitTsSLRULock);
748  }
749 
750  /* Change the activation status in shared memory. */
751  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
752  commitTsShared->commitTsActive = true;
753  LWLockRelease(CommitTsLock);
754 }
755 
756 /*
757  * Deactivate this module.
758  *
759  * This must be called when the track_commit_timestamp parameter is turned off.
760  * This happens during postmaster or standalone-backend startup, or during WAL
761  * replay.
762  *
763  * Resets CommitTs into invalid state to make sure we don't hand back
764  * possibly-invalid data; also removes segments of old data.
765  */
766 static void
768 {
769  /*
770  * Cleanup the status in the shared memory.
771  *
772  * We reset everything in the commitTsShared record to prevent user from
773  * getting confusing data about last committed transaction on the standby
774  * when the module was activated repeatedly on the primary.
775  */
776  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
777 
778  commitTsShared->commitTsActive = false;
779  commitTsShared->xidLastCommit = InvalidTransactionId;
780  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
781  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
782 
785 
786  LWLockRelease(CommitTsLock);
787 
788  /*
789  * Remove *all* files. This is necessary so that there are no leftover
790  * files; in the case where this feature is later enabled after running
791  * with it disabled for some time there may be a gap in the file sequence.
792  * (We can probably tolerate out-of-sequence files, as they are going to
793  * be overwritten anyway when we wrap around, but it seems better to be
794  * tidy.)
795  */
796  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
798  LWLockRelease(CommitTsSLRULock);
799 }
800 
801 /*
802  * This must be called ONCE during postmaster or standalone-backend shutdown
803  */
804 void
806 {
807  /* Flush dirty CommitTs pages to disk */
808  SimpleLruFlush(CommitTsCtl, false);
809 
810  /*
811  * fsync pg_commit_ts to ensure that any files flushed previously are
812  * durably on disk.
813  */
814  fsync_fname("pg_commit_ts", true);
815 }
816 
817 /*
818  * Perform a checkpoint --- either during shutdown, or on-the-fly
819  */
820 void
822 {
823  /* Flush dirty CommitTs pages to disk */
825 
826  /*
827  * fsync pg_commit_ts to ensure that any files flushed previously are
828  * durably on disk.
829  */
830  fsync_fname("pg_commit_ts", true);
831 }
832 
833 /*
834  * Make sure that CommitTs has room for a newly-allocated XID.
835  *
836  * NB: this is called while holding XidGenLock. We want it to be very fast
837  * most of the time; even when it's not so fast, no actual I/O need happen
838  * unless we're forced to write out a dirty CommitTs or xlog page to make room
839  * in shared memory.
840  *
841  * NB: the current implementation relies on track_commit_timestamp being
842  * PGC_POSTMASTER.
843  */
844 void
846 {
847  int pageno;
848 
849  /*
850  * Nothing to do if module not enabled. Note we do an unlocked read of
851  * the flag here, which is okay because this routine is only called from
852  * GetNewTransactionId, which is never called in a standby.
853  */
854  Assert(!InRecovery);
855  if (!commitTsShared->commitTsActive)
856  return;
857 
858  /*
859  * No work except at first XID of a page. But beware: just after
860  * wraparound, the first XID of page zero is FirstNormalTransactionId.
861  */
862  if (TransactionIdToCTsEntry(newestXact) != 0 &&
864  return;
865 
866  pageno = TransactionIdToCTsPage(newestXact);
867 
868  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
869 
870  /* Zero the page and make an XLOG entry about it */
871  ZeroCommitTsPage(pageno, !InRecovery);
872 
873  LWLockRelease(CommitTsSLRULock);
874 }
875 
876 /*
877  * Remove all CommitTs segments before the one holding the passed
878  * transaction ID.
879  *
880  * Note that we don't need to flush XLOG here.
881  */
882 void
884 {
885  int cutoffPage;
886 
887  /*
888  * The cutoff point is the start of the segment containing oldestXact. We
889  * pass the *page* containing oldestXact to SimpleLruTruncate.
890  */
891  cutoffPage = TransactionIdToCTsPage(oldestXact);
892 
893  /* Check to see if there's any files that could be removed */
895  &cutoffPage))
896  return; /* nothing to remove */
897 
898  /* Write XLOG record */
899  WriteTruncateXlogRec(cutoffPage, oldestXact);
900 
901  /* Now we can remove the old CommitTs segment(s) */
902  SimpleLruTruncate(CommitTsCtl, cutoffPage);
903 }
904 
905 /*
906  * Set the limit values between which commit TS can be consulted.
907  */
908 void
910 {
911  /*
912  * Be careful not to overwrite values that are either further into the
913  * "future" or signal a disabled committs.
914  */
915  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
917  {
922  }
923  else
924  {
928  }
929  LWLockRelease(CommitTsLock);
930 }
931 
932 /*
933  * Move forwards the oldest commitTS value that can be consulted
934  */
935 void
937 {
938  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
942  LWLockRelease(CommitTsLock);
943 }
944 
945 
946 /*
947  * Decide which of two commitTS page numbers is "older" for truncation
948  * purposes.
949  *
950  * We need to use comparison of TransactionIds here in order to do the right
951  * thing with wraparound XID arithmetic. However, if we are asked about
952  * page number zero, we don't want to hand InvalidTransactionId to
953  * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
954  * offset both xids by FirstNormalTransactionId to avoid that.
955  */
956 static bool
957 CommitTsPagePrecedes(int page1, int page2)
958 {
959  TransactionId xid1;
960  TransactionId xid2;
961 
962  xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
963  xid1 += FirstNormalTransactionId;
964  xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
965  xid2 += FirstNormalTransactionId;
966 
967  return TransactionIdPrecedes(xid1, xid2);
968 }
969 
970 
971 /*
972  * Write a ZEROPAGE xlog record
973  */
974 static void
976 {
977  XLogBeginInsert();
978  XLogRegisterData((char *) (&pageno), sizeof(int));
979  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
980 }
981 
982 /*
983  * Write a TRUNCATE xlog record
984  */
985 static void
986 WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
987 {
988  xl_commit_ts_truncate xlrec;
989 
990  xlrec.pageno = pageno;
991  xlrec.oldestXid = oldestXid;
992 
993  XLogBeginInsert();
994  XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
995  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
996 }
997 
998 /*
999  * Write a SETTS xlog record
1000  */
1001 static void
1005 {
1006  xl_commit_ts_set record;
1007 
1008  record.timestamp = timestamp;
1009  record.nodeid = nodeid;
1010  record.mainxid = mainxid;
1011 
1012  XLogBeginInsert();
1013  XLogRegisterData((char *) &record,
1014  offsetof(xl_commit_ts_set, mainxid) +
1015  sizeof(TransactionId));
1016  XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
1017  XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
1018 }
1019 
1020 /*
1021  * CommitTS resource manager's routines
1022  */
1023 void
1025 {
1026  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1027 
1028  /* Backup blocks are not used in commit_ts records */
1029  Assert(!XLogRecHasAnyBlockRefs(record));
1030 
1031  if (info == COMMIT_TS_ZEROPAGE)
1032  {
1033  int pageno;
1034  int slotno;
1035 
1036  memcpy(&pageno, XLogRecGetData(record), sizeof(int));
1037 
1038  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
1039 
1040  slotno = ZeroCommitTsPage(pageno, false);
1042  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1043 
1044  LWLockRelease(CommitTsSLRULock);
1045  }
1046  else if (info == COMMIT_TS_TRUNCATE)
1047  {
1049 
1051 
1052  /*
1053  * During XLOG replay, latest_page_number isn't set up yet; insert a
1054  * suitable value to bypass the sanity test in SimpleLruTruncate.
1055  */
1056  CommitTsCtl->shared->latest_page_number = trunc->pageno;
1057 
1059  }
1060  else if (info == COMMIT_TS_SETTS)
1061  {
1062  xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
1063  int nsubxids;
1064  TransactionId *subxids;
1065 
1066  nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
1067  sizeof(TransactionId));
1068  if (nsubxids > 0)
1069  {
1070  subxids = palloc(sizeof(TransactionId) * nsubxids);
1071  memcpy(subxids,
1073  sizeof(TransactionId) * nsubxids);
1074  }
1075  else
1076  subxids = NULL;
1077 
1078  TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
1079  setts->timestamp, setts->nodeid, true);
1080  if (subxids)
1081  pfree(subxids);
1082  }
1083  else
1084  elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1085 }
#define COMMIT_TS_ZEROPAGE
Definition: commit_ts.h:49
#define PG_GETARG_UINT32(n)
Definition: fmgr.h:270
CommitTimestampEntry dataLastCommit
Definition: commit_ts.c:95
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition: commit_ts.c:428
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define COMMIT_TS_SETTS
Definition: commit_ts.h:51
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
uint32 TransactionId
Definition: c.h:520
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition: commit_ts.c:909
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1384
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
Definition: commit_ts.c:1002
#define SizeOfCommitTimestampEntry
Definition: commit_ts.c:63
int64 timestamp
int64 TimestampTz
Definition: timestamp.h:39
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int pageno)
Definition: commit_ts.c:231
#define CommitTsCtl
Definition: commit_ts.c:79
static int ZeroCommitTsPage(int pageno, bool writeXlog)
Definition: commit_ts.c:603
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1196
bool InRecovery
Definition: xlog.c:204
#define Min(x, y)
Definition: c.h:927
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:632
unsigned char uint8
Definition: c.h:372
uint16 RepOriginId
Definition: xlogdefs.h:58
void StartupCommitTs(void)
Definition: commit_ts.c:620
int errcode(int sqlerrcode)
Definition: elog.c:610
FullTransactionId nextFullXid
Definition: transam.h:178
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:8069
static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
Definition: commit_ts.c:986
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition: commit_ts.c:405
#define PANIC
Definition: elog.h:53
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:144
#define PG_RETURN_TIMESTAMPTZ(x)
Definition: timestamp.h:40
void SimpleLruFlush(SlruCtl ctl, bool allow_redirtied)
Definition: slru.c:1128
Size CommitTsShmemBuffers(void)
Definition: commit_ts.c:531
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition: commit_ts.c:936
TimestampTz timestamp
Definition: commit_ts.h:55
#define XidFromFullTransactionId(x)
Definition: transam.h:48
void CompleteCommitTsInitialization(void)
Definition: commit_ts.c:630
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
void pfree(void *pointer)
Definition: mcxt.c:1056
#define XLogRecGetData(decoder)
Definition: xlogreader.h:310
#define FirstNormalTransactionId
Definition: transam.h:34
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition: commit_ts.c:257
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define XLogRecGetDataLen(decoder)
Definition: xlogreader.h:311
#define COMMIT_TS_XACTS_PER_PAGE
Definition: commit_ts.c:66
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition: commit_ts.c:652
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:382
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
Definition: execTuples.c:2052
bool track_commit_timestamp
Definition: commit_ts.c:103
#define SizeOfCommitTsSet
Definition: commit_ts.h:61
bool IsUnderPostmaster
Definition: globals.c:109
void commit_ts_redo(XLogReaderState *record)
Definition: commit_ts.c:1024
VariableCache ShmemVariableCache
Definition: varsup.c:34
#define InvalidTransactionId
Definition: transam.h:31
TransactionId oldestXid
Definition: commit_ts.h:67
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:112
static void ActivateCommitTs(void)
Definition: commit_ts.c:693
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
Definition: slru.c:609
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:597
static void WriteZeroPageXlogRec(int pageno)
Definition: commit_ts.c:975
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:603
struct CommitTimestampEntry CommitTimestampEntry
TransactionId oldestCommitTsXid
Definition: transam.h:190
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:305
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
void TruncateCommitTs(TransactionId oldestXact)
Definition: commit_ts.c:883
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
TimestampTz time
Definition: commit_ts.c:59
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
#define TransactionIdGetDatum(X)
Definition: postgres.h:521
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1352
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:282
#define TransactionIdToCTsPage(xid)
Definition: commit_ts.c:69
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:352
#define COMMIT_TS_TRUNCATE
Definition: commit_ts.h:50
static bool CommitTsPagePrecedes(int page1, int page2)
Definition: commit_ts.c:957
TransactionId xidLastCommit
Definition: commit_ts.c:94
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:482
#define ereport(elevel,...)
Definition: elog.h:144
void CheckPointCommitTs(void)
Definition: commit_ts.c:821
#define Max(x, y)
Definition: c.h:921
TransactionId mainxid
Definition: commit_ts.h:57
void ShutdownCommitTs(void)
Definition: commit_ts.c:805
#define Assert(condition)
Definition: c.h:745
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
void ExtendCommitTs(TransactionId newestXact)
Definition: commit_ts.c:845
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1407
TransactionId newestCommitTsXid
Definition: transam.h:191
size_t Size
Definition: c.h:473
Size CommitTsShmemSize(void)
Definition: commit_ts.c:540
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
#define TransactionIdToCTsEntry(xid)
Definition: commit_ts.c:71
static SlruCtlData CommitTsCtlData
Definition: commit_ts.c:77
#define InvalidRepOriginId
Definition: origin.h:33
static TransactionId ReadNewTransactionId(void)
Definition: transam.h:258
static void DeactivateCommitTs(void)
Definition: commit_ts.c:767
static Datum values[MAXATTR]
Definition: bootstrap.c:167
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:824
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
Definition: commit_ts.c:482
RepOriginId nodeid
Definition: commit_ts.h:56
#define elog(elevel,...)
Definition: elog.h:214
int i
int NBuffers
Definition: globals.c:132
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:312
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void BootStrapCommitTs(void)
Definition: commit_ts.c:584
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
RepOriginId nodeid
Definition: commit_ts.c:60
void XLogBeginInsert(void)
Definition: xloginsert.c:123
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid, bool write_xlog)
Definition: commit_ts.c:145
CommitTimestampShared * commitTsShared
Definition: commit_ts.c:99
int16 AttrNumber
Definition: attnum.h:21
static void error_commit_ts_disabled(void)
Definition: commit_ts.c:389
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:267
#define SizeOfCommitTsTruncate
Definition: commit_ts.h:70
#define PG_RETURN_NULL()
Definition: fmgr.h:344
#define offsetof(type, field)
Definition: c.h:668
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:368
void CommitTsShmemInit(void)
Definition: commit_ts.c:551
struct CommitTimestampShared CommitTimestampShared
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id)
Definition: slru.c:175