PostgreSQL Source Code  git master
commit_ts.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  * PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_xact-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes. Also, one XLOG record is
11  * generated for setting of values when the caller requests it; this allows
12  * us to support values coming from places other than transaction commit.
13  * Other writes of CommitTS come from recording of transaction commit in
14  * xact.c, which generates its own XLOG records for these events and will
15  * re-perform the status update on redo; so we need make no additional XLOG
16  * entry here.
17  *
18  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
19  * Portions Copyright (c) 1994, Regents of the University of California
20  *
21  * src/backend/access/transam/commit_ts.c
22  *
23  *-------------------------------------------------------------------------
24  */
25 #include "postgres.h"
26 
27 #include "access/commit_ts.h"
28 #include "access/htup_details.h"
29 #include "access/slru.h"
30 #include "access/transam.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "pg_trace.h"
35 #include "storage/shmem.h"
36 #include "utils/builtins.h"
37 #include "utils/snapmgr.h"
38 #include "utils/timestamp.h"
39 
40 /*
41  * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
42  * everywhere else in Postgres.
43  *
44  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
45  * CommitTs page numbering also wraps around at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
47  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
48  * explicit notice of that fact in this module, except when comparing segment
49  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
50  */
51 
52 /*
53  * We need 8+2 bytes per xact. Note that enlarging this struct might mean
54  * the largest possible file name is more than 5 chars long; see
55  * SlruScanDirectory.
56  */
57 typedef struct CommitTimestampEntry
58 {
62 
63 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
64  sizeof(RepOriginId))
65 
66 #define COMMIT_TS_XACTS_PER_PAGE \
67  (BLCKSZ / SizeOfCommitTimestampEntry)
68 
69 #define TransactionIdToCTsPage(xid) \
70  ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
71 #define TransactionIdToCTsEntry(xid) \
72  ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
73 
74 /*
75  * Link to shared-memory data structures for CommitTs control
76  */
78 
79 #define CommitTsCtl (&CommitTsCtlData)
80 
81 /*
82  * We keep a cache of the last value set in shared memory.
83  *
84  * This is also a good place to keep the activation status. We keep this
85  * separate from the GUC so that the standby can activate the module if the
86  * primary has it active independently of the value of the GUC.
87  *
88  * This is protected by CommitTsLock. In some places, we use commitTsActive
89  * without acquiring the lock; where this happens, a comment explains the
90  * rationale for it.
91  */
92 typedef struct CommitTimestampShared
93 {
98 
100 
101 
102 /* GUC variable */
104 
105 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
106  TransactionId *subxids, TimestampTz ts,
107  RepOriginId nodeid, int pageno);
109  RepOriginId nodeid, int slotno);
110 static void error_commit_ts_disabled(void);
111 static int ZeroCommitTsPage(int pageno, bool writeXlog);
112 static bool CommitTsPagePrecedes(int page1, int page2);
113 static void ActivateCommitTs(void);
114 static void DeactivateCommitTs(void);
115 static void WriteZeroPageXlogRec(int pageno);
116 static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
117 static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
120 
121 /*
122  * TransactionTreeSetCommitTsData
123  *
124  * Record the final commit timestamp of transaction entries in the commit log
125  * for a transaction and its subtransaction tree, as efficiently as possible.
126  *
127  * xid is the top level transaction id.
128  *
129  * subxids is an array of xids of length nsubxids, representing subtransactions
130  * in the tree of xid. In various cases nsubxids may be zero.
131  * The reason why tracking just the parent xid commit timestamp is not enough
132  * is that the subtrans SLRU does not stay valid across crashes (it's not
133  * permanent) so we need to keep the information about them here. If the
134  * subtrans implementation changes in the future, we might want to revisit the
135  * decision of storing timestamp info for each subxid.
136  *
137  * The write_xlog parameter tells us whether to include an XLog record of this
138  * or not. Normally, this is called from transaction commit routines (both
139  * normal and prepared) and the information will be stored in the transaction
140  * commit XLog record, and so they should pass "false" for this. The XLog redo
141  * code should use "false" here as well. Other callers probably want to pass
142  * true, so that the given values persist in case of crashes.
143  */
144 void
147  RepOriginId nodeid, bool write_xlog)
148 {
149  int i;
150  TransactionId headxid;
151  TransactionId newestXact;
152 
153  /*
154  * No-op if the module is not active.
155  *
156  * An unlocked read here is fine, because in a standby (the only place
157  * where the flag can change in flight) this routine is only called by the
158  * recovery process, which is also the only process which can change the
159  * flag.
160  */
161  if (!commitTsShared->commitTsActive)
162  return;
163 
164  /*
165  * Comply with the WAL-before-data rule: if caller specified it wants this
166  * value to be recorded in WAL, do so before touching the data.
167  */
168  if (write_xlog)
169  WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
170 
171  /*
172  * Figure out the latest Xid in this batch: either the last subxid if
173  * there's any, otherwise the parent xid.
174  */
175  if (nsubxids > 0)
176  newestXact = subxids[nsubxids - 1];
177  else
178  newestXact = xid;
179 
180  /*
181  * We split the xids to set the timestamp to in groups belonging to the
182  * same SLRU page; the first element in each such set is its head. The
183  * first group has the main XID as the head; subsequent sets use the first
184  * subxid not on the previous page as head. This way, we only have to
185  * lock/modify each SLRU page once.
186  */
187  for (i = 0, headxid = xid;;)
188  {
189  int pageno = TransactionIdToCTsPage(headxid);
190  int j;
191 
192  for (j = i; j < nsubxids; j++)
193  {
194  if (TransactionIdToCTsPage(subxids[j]) != pageno)
195  break;
196  }
197  /* subxids[i..j] are on the same page as the head */
198 
199  SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
200  pageno);
201 
202  /* if we wrote out all subxids, we're done. */
203  if (j + 1 >= nsubxids)
204  break;
205 
206  /*
207  * Set the new head and skip over it, as well as over the subxids we
208  * just wrote.
209  */
210  headxid = subxids[j];
211  i += j - i + 1;
212  }
213 
214  /* update the cached value in shared memory */
215  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
216  commitTsShared->xidLastCommit = xid;
217  commitTsShared->dataLastCommit.time = timestamp;
218  commitTsShared->dataLastCommit.nodeid = nodeid;
219 
220  /* and move forwards our endpoint, if needed */
223  LWLockRelease(CommitTsLock);
224 }
225 
226 /*
227  * Record the commit timestamp of transaction entries in the commit log for all
228  * entries on a single page. Atomic only on this page.
229  */
230 static void
232  TransactionId *subxids, TimestampTz ts,
233  RepOriginId nodeid, int pageno)
234 {
235  int slotno;
236  int i;
237 
238  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
239 
240  slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
241 
242  TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
243  for (i = 0; i < nsubxids; i++)
244  TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
245 
246  CommitTsCtl->shared->page_dirty[slotno] = true;
247 
248  LWLockRelease(CommitTsSLRULock);
249 }
250 
251 /*
252  * Sets the commit timestamp of a single transaction.
253  *
254  * Must be called with CommitTsSLRULock held
255  */
256 static void
258  RepOriginId nodeid, int slotno)
259 {
260  int entryno = TransactionIdToCTsEntry(xid);
261  CommitTimestampEntry entry;
262 
264 
265  entry.time = ts;
266  entry.nodeid = nodeid;
267 
268  memcpy(CommitTsCtl->shared->page_buffer[slotno] +
269  SizeOfCommitTimestampEntry * entryno,
271 }
272 
273 /*
274  * Interrogate the commit timestamp of a transaction.
275  *
276  * The return value indicates whether a commit timestamp record was found for
277  * the given xid. The timestamp value is returned in *ts (which may not be
278  * null), and the origin node for the Xid is returned in *nodeid, if it's not
279  * null.
280  */
281 bool
284 {
285  int pageno = TransactionIdToCTsPage(xid);
286  int entryno = TransactionIdToCTsEntry(xid);
287  int slotno;
288  CommitTimestampEntry entry;
289  TransactionId oldestCommitTsXid;
290  TransactionId newestCommitTsXid;
291 
292  if (!TransactionIdIsValid(xid))
293  ereport(ERROR,
294  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296  else if (!TransactionIdIsNormal(xid))
297  {
298  /* frozen and bootstrap xids are always committed far in the past */
299  *ts = 0;
300  if (nodeid)
301  *nodeid = 0;
302  return false;
303  }
304 
305  LWLockAcquire(CommitTsLock, LW_SHARED);
306 
307  /* Error if module not enabled */
308  if (!commitTsShared->commitTsActive)
310 
311  /*
312  * If we're asked for the cached value, return that. Otherwise, fall
313  * through to read from SLRU.
314  */
315  if (commitTsShared->xidLastCommit == xid)
316  {
317  *ts = commitTsShared->dataLastCommit.time;
318  if (nodeid)
319  *nodeid = commitTsShared->dataLastCommit.nodeid;
320 
321  LWLockRelease(CommitTsLock);
322  return *ts != 0;
323  }
324 
325  oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
326  newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
327  /* neither is invalid, or both are */
328  Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
329  LWLockRelease(CommitTsLock);
330 
331  /*
332  * Return empty if the requested value is outside our valid range.
333  */
334  if (!TransactionIdIsValid(oldestCommitTsXid) ||
335  TransactionIdPrecedes(xid, oldestCommitTsXid) ||
336  TransactionIdPrecedes(newestCommitTsXid, xid))
337  {
338  *ts = 0;
339  if (nodeid)
340  *nodeid = InvalidRepOriginId;
341  return false;
342  }
343 
344  /* lock is acquired by SimpleLruReadPage_ReadOnly */
345  slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
346  memcpy(&entry,
347  CommitTsCtl->shared->page_buffer[slotno] +
348  SizeOfCommitTimestampEntry * entryno,
350 
351  *ts = entry.time;
352  if (nodeid)
353  *nodeid = entry.nodeid;
354 
355  LWLockRelease(CommitTsSLRULock);
356  return *ts != 0;
357 }
358 
359 /*
360  * Return the Xid of the latest committed transaction. (As far as this module
361  * is concerned, anyway; it's up to the caller to ensure the value is useful
362  * for its purposes.)
363  *
364  * ts and nodeid are filled with the corresponding data; they can be passed
365  * as NULL if not wanted.
366  */
369 {
370  TransactionId xid;
371 
372  LWLockAcquire(CommitTsLock, LW_SHARED);
373 
374  /* Error if module not enabled */
375  if (!commitTsShared->commitTsActive)
377 
378  xid = commitTsShared->xidLastCommit;
379  if (ts)
380  *ts = commitTsShared->dataLastCommit.time;
381  if (nodeid)
382  *nodeid = commitTsShared->dataLastCommit.nodeid;
383  LWLockRelease(CommitTsLock);
384 
385  return xid;
386 }
387 
388 static void
390 {
391  ereport(ERROR,
392  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393  errmsg("could not get commit timestamp data"),
395  errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
396  "track_commit_timestamp") :
397  errhint("Make sure the configuration parameter \"%s\" is set.",
398  "track_commit_timestamp")));
399 }
400 
401 /*
402  * SQL-callable wrapper to obtain commit time of a transaction
403  */
404 Datum
406 {
408  TimestampTz ts;
409  bool found;
410 
411  found = TransactionIdGetCommitTsData(xid, &ts, NULL);
412 
413  if (!found)
414  PG_RETURN_NULL();
415 
417 }
418 
419 
420 /*
421  * pg_last_committed_xact
422  *
423  * SQL-callable wrapper to obtain some information about the latest
424  * committed transaction: transaction ID, timestamp and replication
425  * origin.
426  */
427 Datum
429 {
430  TransactionId xid;
432  TimestampTz ts;
433  Datum values[3];
434  bool nulls[3];
435  TupleDesc tupdesc;
436  HeapTuple htup;
437 
438  /* and construct a tuple with our data */
439  xid = GetLatestCommitTsData(&ts, &nodeid);
440 
441  /*
442  * Construct a tuple descriptor for the result row. This must match this
443  * function's pg_proc entry!
444  */
445  tupdesc = CreateTemplateTupleDesc(3);
446  TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
447  XIDOID, -1, 0);
448  TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
449  TIMESTAMPTZOID, -1, 0);
450  TupleDescInitEntry(tupdesc, (AttrNumber) 3, "roident",
451  OIDOID, -1, 0);
452  tupdesc = BlessTupleDesc(tupdesc);
453 
454  if (!TransactionIdIsNormal(xid))
455  {
456  memset(nulls, true, sizeof(nulls));
457  }
458  else
459  {
460  values[0] = TransactionIdGetDatum(xid);
461  nulls[0] = false;
462 
463  values[1] = TimestampTzGetDatum(ts);
464  nulls[1] = false;
465 
466  values[2] = ObjectIdGetDatum((Oid) nodeid);
467  nulls[2] = false;
468  }
469 
470  htup = heap_form_tuple(tupdesc, values, nulls);
471 
473 }
474 
475 /*
476  * pg_xact_commit_timestamp_origin
477  *
478  * SQL-callable wrapper to obtain commit timestamp and replication origin
479  * of a given transaction.
480  */
481 Datum
483 {
486  TimestampTz ts;
487  Datum values[2];
488  bool nulls[2];
489  TupleDesc tupdesc;
490  HeapTuple htup;
491  bool found;
492 
493  found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
494 
495  /*
496  * Construct a tuple descriptor for the result row. This must match this
497  * function's pg_proc entry!
498  */
499  tupdesc = CreateTemplateTupleDesc(2);
500  TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp",
501  TIMESTAMPTZOID, -1, 0);
502  TupleDescInitEntry(tupdesc, (AttrNumber) 2, "roident",
503  OIDOID, -1, 0);
504  tupdesc = BlessTupleDesc(tupdesc);
505 
506  if (!found)
507  {
508  memset(nulls, true, sizeof(nulls));
509  }
510  else
511  {
512  values[0] = TimestampTzGetDatum(ts);
513  nulls[0] = false;
514 
515  values[1] = ObjectIdGetDatum((Oid) nodeid);
516  nulls[1] = false;
517  }
518 
519  htup = heap_form_tuple(tupdesc, values, nulls);
520 
522 }
523 
524 /*
525  * Number of shared CommitTS buffers.
526  *
527  * We use a very similar logic as for the number of CLOG buffers; see comments
528  * in CLOGShmemBuffers.
529  */
530 Size
532 {
533  return Min(16, Max(4, NBuffers / 1024));
534 }
535 
536 /*
537  * Shared memory sizing for CommitTs
538  */
539 Size
541 {
543  sizeof(CommitTimestampShared);
544 }
545 
546 /*
547  * Initialize CommitTs at system startup (postmaster start or standalone
548  * backend)
549  */
550 void
552 {
553  bool found;
554 
555  CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
557  CommitTsSLRULock, "pg_commit_ts",
560 
561  commitTsShared = ShmemInitStruct("CommitTs shared",
562  sizeof(CommitTimestampShared),
563  &found);
564 
565  if (!IsUnderPostmaster)
566  {
567  Assert(!found);
568 
569  commitTsShared->xidLastCommit = InvalidTransactionId;
570  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
571  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
572  commitTsShared->commitTsActive = false;
573  }
574  else
575  Assert(found);
576 }
577 
578 /*
579  * This function must be called ONCE on system install.
580  *
581  * (The CommitTs directory is assumed to have been created by initdb, and
582  * CommitTsShmemInit must have been called already.)
583  */
584 void
586 {
587  /*
588  * Nothing to do here at present, unlike most other SLRU modules; segments
589  * are created when the server is started with this module enabled. See
590  * ActivateCommitTs.
591  */
592 }
593 
594 /*
595  * Initialize (or reinitialize) a page of CommitTs to zeroes.
596  * If writeXlog is true, also emit an XLOG record saying we did this.
597  *
598  * The page is not actually written, just set up in shared memory.
599  * The slot number of the new page is returned.
600  *
601  * Control lock must be held at entry, and will be held at exit.
602  */
603 static int
604 ZeroCommitTsPage(int pageno, bool writeXlog)
605 {
606  int slotno;
607 
608  slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
609 
610  if (writeXlog)
611  WriteZeroPageXlogRec(pageno);
612 
613  return slotno;
614 }
615 
616 /*
617  * This must be called ONCE during postmaster or standalone-backend startup,
618  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
619  */
620 void
622 {
624 }
625 
626 /*
627  * This must be called ONCE during postmaster or standalone-backend startup,
628  * after recovery has finished.
629  */
630 void
632 {
633  /*
634  * If the feature is not enabled, turn it off for good. This also removes
635  * any leftover data.
636  *
637  * Conversely, we activate the module if the feature is enabled. This is
638  * necessary for primary and standby as the activation depends on the
639  * control file contents at the beginning of recovery or when a
640  * XLOG_PARAMETER_CHANGE is replayed.
641  */
644  else
646 }
647 
648 /*
649  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
650  * XLog record during recovery.
651  */
652 void
653 CommitTsParameterChange(bool newvalue, bool oldvalue)
654 {
655  /*
656  * If the commit_ts module is disabled in this server and we get word from
657  * the primary server that it is enabled there, activate it so that we can
658  * replay future WAL records involving it; also mark it as active on
659  * pg_control. If the old value was already set, we already did this, so
660  * don't do anything.
661  *
662  * If the module is disabled in the primary, disable it here too, unless
663  * the module is enabled locally.
664  *
665  * Note this only runs in the recovery process, so an unlocked read is
666  * fine.
667  */
668  if (newvalue)
669  {
670  if (!commitTsShared->commitTsActive)
672  }
673  else if (commitTsShared->commitTsActive)
675 }
676 
677 /*
678  * Activate this module whenever necessary.
679  * This must happen during postmaster or standalone-backend startup,
680  * or during WAL replay anytime the track_commit_timestamp setting is
681  * changed in the primary.
682  *
683  * The reason why this SLRU needs separate activation/deactivation functions is
684  * that it can be enabled/disabled during start and the activation/deactivation
685  * on the primary is propagated to the standby via replay. Other SLRUs don't
686  * have this property and they can be just initialized during normal startup.
687  *
688  * This is in charge of creating the currently active segment, if it's not
689  * already there. The reason for this is that the server might have been
690  * running with this module disabled for a while and thus might have skipped
691  * the normal creation point.
692  */
693 static void
695 {
696  TransactionId xid;
697  int pageno;
698 
699  /* If we've done this already, there's nothing to do */
700  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
701  if (commitTsShared->commitTsActive)
702  {
703  LWLockRelease(CommitTsLock);
704  return;
705  }
706  LWLockRelease(CommitTsLock);
707 
709  pageno = TransactionIdToCTsPage(xid);
710 
711  /*
712  * Re-Initialize our idea of the latest page number.
713  */
714  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
715  CommitTsCtl->shared->latest_page_number = pageno;
716  LWLockRelease(CommitTsSLRULock);
717 
718  /*
719  * If CommitTs is enabled, but it wasn't in the previous server run, we
720  * need to set the oldest and newest values to the next Xid; that way, we
721  * will not try to read data that might not have been set.
722  *
723  * XXX does this have a problem if a server is started with commitTs
724  * enabled, then started with commitTs disabled, then restarted with it
725  * enabled again? It doesn't look like it does, because there should be a
726  * checkpoint that sets the value to InvalidTransactionId at end of
727  * recovery; and so any chance of injecting new transactions without
728  * CommitTs values would occur after the oldestCommitTsXid has been set to
729  * Invalid temporarily.
730  */
731  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
733  {
736  }
737  LWLockRelease(CommitTsLock);
738 
739  /* Create the current segment file, if necessary */
741  {
742  int slotno;
743 
744  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
745  slotno = ZeroCommitTsPage(pageno, false);
747  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
748  LWLockRelease(CommitTsSLRULock);
749  }
750 
751  /* Change the activation status in shared memory. */
752  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
753  commitTsShared->commitTsActive = true;
754  LWLockRelease(CommitTsLock);
755 }
756 
757 /*
758  * Deactivate this module.
759  *
760  * This must be called when the track_commit_timestamp parameter is turned off.
761  * This happens during postmaster or standalone-backend startup, or during WAL
762  * replay.
763  *
764  * Resets CommitTs into invalid state to make sure we don't hand back
765  * possibly-invalid data; also removes segments of old data.
766  */
767 static void
769 {
770  /*
771  * Cleanup the status in the shared memory.
772  *
773  * We reset everything in the commitTsShared record to prevent user from
774  * getting confusing data about last committed transaction on the standby
775  * when the module was activated repeatedly on the primary.
776  */
777  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
778 
779  commitTsShared->commitTsActive = false;
780  commitTsShared->xidLastCommit = InvalidTransactionId;
781  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
782  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
783 
786 
787  LWLockRelease(CommitTsLock);
788 
789  /*
790  * Remove *all* files. This is necessary so that there are no leftover
791  * files; in the case where this feature is later enabled after running
792  * with it disabled for some time there may be a gap in the file sequence.
793  * (We can probably tolerate out-of-sequence files, as they are going to
794  * be overwritten anyway when we wrap around, but it seems better to be
795  * tidy.)
796  */
797  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
799  LWLockRelease(CommitTsSLRULock);
800 }
801 
802 /*
803  * Perform a checkpoint --- either during shutdown, or on-the-fly
804  */
805 void
807 {
808  /*
809  * Write dirty CommitTs pages to disk. This may result in sync requests
810  * queued for later handling by ProcessSyncRequests(), as part of the
811  * checkpoint.
812  */
814 }
815 
816 /*
817  * Make sure that CommitTs has room for a newly-allocated XID.
818  *
819  * NB: this is called while holding XidGenLock. We want it to be very fast
820  * most of the time; even when it's not so fast, no actual I/O need happen
821  * unless we're forced to write out a dirty CommitTs or xlog page to make room
822  * in shared memory.
823  *
824  * NB: the current implementation relies on track_commit_timestamp being
825  * PGC_POSTMASTER.
826  */
827 void
829 {
830  int pageno;
831 
832  /*
833  * Nothing to do if module not enabled. Note we do an unlocked read of
834  * the flag here, which is okay because this routine is only called from
835  * GetNewTransactionId, which is never called in a standby.
836  */
837  Assert(!InRecovery);
838  if (!commitTsShared->commitTsActive)
839  return;
840 
841  /*
842  * No work except at first XID of a page. But beware: just after
843  * wraparound, the first XID of page zero is FirstNormalTransactionId.
844  */
845  if (TransactionIdToCTsEntry(newestXact) != 0 &&
847  return;
848 
849  pageno = TransactionIdToCTsPage(newestXact);
850 
851  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
852 
853  /* Zero the page and make an XLOG entry about it */
854  ZeroCommitTsPage(pageno, !InRecovery);
855 
856  LWLockRelease(CommitTsSLRULock);
857 }
858 
859 /*
860  * Remove all CommitTs segments before the one holding the passed
861  * transaction ID.
862  *
863  * Note that we don't need to flush XLOG here.
864  */
865 void
867 {
868  int cutoffPage;
869 
870  /*
871  * The cutoff point is the start of the segment containing oldestXact. We
872  * pass the *page* containing oldestXact to SimpleLruTruncate.
873  */
874  cutoffPage = TransactionIdToCTsPage(oldestXact);
875 
876  /* Check to see if there's any files that could be removed */
878  &cutoffPage))
879  return; /* nothing to remove */
880 
881  /* Write XLOG record */
882  WriteTruncateXlogRec(cutoffPage, oldestXact);
883 
884  /* Now we can remove the old CommitTs segment(s) */
885  SimpleLruTruncate(CommitTsCtl, cutoffPage);
886 }
887 
888 /*
889  * Set the limit values between which commit TS can be consulted.
890  */
891 void
893 {
894  /*
895  * Be careful not to overwrite values that are either further into the
896  * "future" or signal a disabled committs.
897  */
898  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
900  {
905  }
906  else
907  {
911  }
912  LWLockRelease(CommitTsLock);
913 }
914 
915 /*
916  * Move forwards the oldest commitTS value that can be consulted
917  */
918 void
920 {
921  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
925  LWLockRelease(CommitTsLock);
926 }
927 
928 
929 /*
930  * Decide which of two commitTS page numbers is "older" for truncation
931  * purposes.
932  *
933  * We need to use comparison of TransactionIds here in order to do the right
934  * thing with wraparound XID arithmetic. However, if we are asked about
935  * page number zero, we don't want to hand InvalidTransactionId to
936  * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
937  * offset both xids by FirstNormalTransactionId to avoid that.
938  */
939 static bool
940 CommitTsPagePrecedes(int page1, int page2)
941 {
942  TransactionId xid1;
943  TransactionId xid2;
944 
945  xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
946  xid1 += FirstNormalTransactionId;
947  xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
948  xid2 += FirstNormalTransactionId;
949 
950  return TransactionIdPrecedes(xid1, xid2);
951 }
952 
953 
954 /*
955  * Write a ZEROPAGE xlog record
956  */
957 static void
959 {
960  XLogBeginInsert();
961  XLogRegisterData((char *) (&pageno), sizeof(int));
962  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
963 }
964 
965 /*
966  * Write a TRUNCATE xlog record
967  */
968 static void
969 WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
970 {
971  xl_commit_ts_truncate xlrec;
972 
973  xlrec.pageno = pageno;
974  xlrec.oldestXid = oldestXid;
975 
976  XLogBeginInsert();
977  XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
978  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
979 }
980 
981 /*
982  * Write a SETTS xlog record
983  */
984 static void
988 {
989  xl_commit_ts_set record;
990 
991  record.timestamp = timestamp;
992  record.nodeid = nodeid;
993  record.mainxid = mainxid;
994 
995  XLogBeginInsert();
996  XLogRegisterData((char *) &record,
997  offsetof(xl_commit_ts_set, mainxid) +
998  sizeof(TransactionId));
999  XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
1000  XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
1001 }
1002 
1003 /*
1004  * CommitTS resource manager's routines
1005  */
1006 void
1008 {
1009  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1010 
1011  /* Backup blocks are not used in commit_ts records */
1012  Assert(!XLogRecHasAnyBlockRefs(record));
1013 
1014  if (info == COMMIT_TS_ZEROPAGE)
1015  {
1016  int pageno;
1017  int slotno;
1018 
1019  memcpy(&pageno, XLogRecGetData(record), sizeof(int));
1020 
1021  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
1022 
1023  slotno = ZeroCommitTsPage(pageno, false);
1025  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1026 
1027  LWLockRelease(CommitTsSLRULock);
1028  }
1029  else if (info == COMMIT_TS_TRUNCATE)
1030  {
1032 
1034 
1035  /*
1036  * During XLOG replay, latest_page_number isn't set up yet; insert a
1037  * suitable value to bypass the sanity test in SimpleLruTruncate.
1038  */
1039  CommitTsCtl->shared->latest_page_number = trunc->pageno;
1040 
1042  }
1043  else if (info == COMMIT_TS_SETTS)
1044  {
1045  xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
1046  int nsubxids;
1047  TransactionId *subxids;
1048 
1049  nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
1050  sizeof(TransactionId));
1051  if (nsubxids > 0)
1052  {
1053  subxids = palloc(sizeof(TransactionId) * nsubxids);
1054  memcpy(subxids,
1056  sizeof(TransactionId) * nsubxids);
1057  }
1058  else
1059  subxids = NULL;
1060 
1061  TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
1062  setts->timestamp, setts->nodeid, true);
1063  if (subxids)
1064  pfree(subxids);
1065  }
1066  else
1067  elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1068 }
1069 
1070 /*
1071  * Entrypoint for sync.c to sync commit_ts files.
1072  */
1073 int
1074 committssyncfiletag(const FileTag *ftag, char *path)
1075 {
1076  return SlruSyncFileTag(CommitTsCtl, ftag, path);
1077 }
#define COMMIT_TS_ZEROPAGE
Definition: commit_ts.h:52
#define PG_GETARG_UINT32(n)
Definition: fmgr.h:270
CommitTimestampEntry dataLastCommit
Definition: commit_ts.c:95
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition: commit_ts.c:428
int errhint(const char *fmt,...)
Definition: elog.c:1068
#define COMMIT_TS_SETTS
Definition: commit_ts.h:54
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
Definition: slru.c:1489
uint32 TransactionId
Definition: c.h:521
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition: commit_ts.c:892
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1426
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
Definition: commit_ts.c:985
#define SizeOfCommitTimestampEntry
Definition: commit_ts.c:63
int64 timestamp
int64 TimestampTz
Definition: timestamp.h:39
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int pageno)
Definition: commit_ts.c:231
#define CommitTsCtl
Definition: commit_ts.c:79
static int ZeroCommitTsPage(int pageno, bool writeXlog)
Definition: commit_ts.c:604
int committssyncfiletag(const FileTag *ftag, char *path)
Definition: commit_ts.c:1074
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1225
bool InRecovery
Definition: xlog.c:205
#define Min(x, y)
Definition: c.h:928
unsigned char uint8
Definition: c.h:373
uint16 RepOriginId
Definition: xlogdefs.h:58
void StartupCommitTs(void)
Definition: commit_ts.c:621
int errcode(int sqlerrcode)
Definition: elog.c:610
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:8076
static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
Definition: commit_ts.c:969
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition: commit_ts.c:405
FullTransactionId nextXid
Definition: transam.h:213
#define PANIC
Definition: elog.h:53
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:155
#define PG_RETURN_TIMESTAMPTZ(x)
Definition: timestamp.h:40
Size CommitTsShmemBuffers(void)
Definition: commit_ts.c:531
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler)
Definition: slru.c:186
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition: commit_ts.c:919
TimestampTz timestamp
Definition: commit_ts.h:58
#define XidFromFullTransactionId(x)
Definition: transam.h:48
void CompleteCommitTsInitialization(void)
Definition: commit_ts.c:631
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
void pfree(void *pointer)
Definition: mcxt.c:1057
#define XLogRecGetData(decoder)
Definition: xlogreader.h:310
#define FirstNormalTransactionId
Definition: transam.h:34
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition: commit_ts.c:257
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define XLogRecGetDataLen(decoder)
Definition: xlogreader.h:311
#define COMMIT_TS_XACTS_PER_PAGE
Definition: commit_ts.c:66
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition: commit_ts.c:653
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:394
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
Definition: execTuples.c:2052
bool track_commit_timestamp
Definition: commit_ts.c:103
#define SizeOfCommitTsSet
Definition: commit_ts.h:64
bool IsUnderPostmaster
Definition: globals.c:109
void commit_ts_redo(XLogReaderState *record)
Definition: commit_ts.c:1007
VariableCache ShmemVariableCache
Definition: varsup.c:34
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
Definition: slru.c:1155
#define InvalidTransactionId
Definition: transam.h:31
TransactionId oldestXid
Definition: commit_ts.h:70
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:112
static void ActivateCommitTs(void)
Definition: commit_ts.c:694
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
Definition: slru.c:625
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:613
static void WriteZeroPageXlogRec(int pageno)
Definition: commit_ts.c:958
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:603
struct CommitTimestampEntry CommitTimestampEntry
TransactionId oldestCommitTsXid
Definition: transam.h:225
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:305
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
void TruncateCommitTs(TransactionId oldestXact)
Definition: commit_ts.c:866
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
TimestampTz time
Definition: commit_ts.c:59
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
#define TransactionIdGetDatum(X)
Definition: postgres.h:521
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1394
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:282
#define TransactionIdToCTsPage(xid)
Definition: commit_ts.c:69
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:352
#define COMMIT_TS_TRUNCATE
Definition: commit_ts.h:53
static bool CommitTsPagePrecedes(int page1, int page2)
Definition: commit_ts.c:940
TransactionId xidLastCommit
Definition: commit_ts.c:94
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:494
#define ereport(elevel,...)
Definition: elog.h:144
void CheckPointCommitTs(void)
Definition: commit_ts.c:806
#define Max(x, y)
Definition: c.h:922
TransactionId mainxid
Definition: commit_ts.h:60
#define Assert(condition)
Definition: c.h:746
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
void ExtendCommitTs(TransactionId newestXact)
Definition: commit_ts.c:828
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1449
TransactionId newestCommitTsXid
Definition: transam.h:226
size_t Size
Definition: c.h:474
Size CommitTsShmemSize(void)
Definition: commit_ts.c:540
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:221
#define TransactionIdToCTsEntry(xid)
Definition: commit_ts.c:71
static SlruCtlData CommitTsCtlData
Definition: commit_ts.c:77
#define InvalidRepOriginId
Definition: origin.h:33
static TransactionId ReadNewTransactionId(void)
Definition: transam.h:308
static void DeactivateCommitTs(void)
Definition: commit_ts.c:768
static Datum values[MAXATTR]
Definition: bootstrap.c:165
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:821
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
Definition: commit_ts.c:482
RepOriginId nodeid
Definition: commit_ts.h:59
#define elog(elevel,...)
Definition: elog.h:214
int i
int NBuffers
Definition: globals.c:132
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:312
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void BootStrapCommitTs(void)
Definition: commit_ts.c:585
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
RepOriginId nodeid
Definition: commit_ts.c:60
void XLogBeginInsert(void)
Definition: xloginsert.c:123
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid, bool write_xlog)
Definition: commit_ts.c:145
CommitTimestampShared * commitTsShared
Definition: commit_ts.c:99
int16 AttrNumber
Definition: attnum.h:21
static void error_commit_ts_disabled(void)
Definition: commit_ts.c:389
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:279
#define SizeOfCommitTsTruncate
Definition: commit_ts.h:73
#define PG_RETURN_NULL()
Definition: fmgr.h:344
Definition: sync.h:50
#define offsetof(type, field)
Definition: c.h:669
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:368
void CommitTsShmemInit(void)
Definition: commit_ts.c:551
struct CommitTimestampShared CommitTimestampShared