PostgreSQL Source Code  git master
commit_ts.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  * PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_xact-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes. Other writes of CommitTS come
11  * from recording of transaction commit in xact.c, which generates its own
12  * XLOG records for these events and will re-perform the status update on
13  * redo; so we need make no additional XLOG entry here.
14  *
15  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
16  * Portions Copyright (c) 1994, Regents of the University of California
17  *
18  * src/backend/access/transam/commit_ts.c
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres.h"
23 
24 #include "access/commit_ts.h"
25 #include "access/htup_details.h"
26 #include "access/slru.h"
27 #include "access/transam.h"
28 #include "access/xloginsert.h"
29 #include "access/xlogutils.h"
30 #include "catalog/pg_type.h"
31 #include "funcapi.h"
32 #include "miscadmin.h"
33 #include "pg_trace.h"
34 #include "storage/shmem.h"
35 #include "utils/builtins.h"
36 #include "utils/snapmgr.h"
37 #include "utils/timestamp.h"
38 
39 /*
40  * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
41  * everywhere else in Postgres.
42  *
43  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44  * CommitTs page numbering also wraps around at
45  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
47  * explicit notice of that fact in this module, except when comparing segment
48  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
49  */
50 
51 /*
52  * We need 8+2 bytes per xact. Note that enlarging this struct might mean
53  * the largest possible file name is more than 5 chars long; see
54  * SlruScanDirectory.
55  */
56 typedef struct CommitTimestampEntry
57 {
61 
62 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
63  sizeof(RepOriginId))
64 
65 #define COMMIT_TS_XACTS_PER_PAGE \
66  (BLCKSZ / SizeOfCommitTimestampEntry)
67 
68 
69 /*
70  * Although we return an int64 the actual value can't currently exceed
71  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
72  */
73 static inline int64
75 {
76  return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
77 }
78 
79 #define TransactionIdToCTsEntry(xid) \
80  ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
81 
82 /*
83  * Link to shared-memory data structures for CommitTs control
84  */
86 
87 #define CommitTsCtl (&CommitTsCtlData)
88 
89 /*
90  * We keep a cache of the last value set in shared memory.
91  *
92  * This is also a good place to keep the activation status. We keep this
93  * separate from the GUC so that the standby can activate the module if the
94  * primary has it active independently of the value of the GUC.
95  *
96  * This is protected by CommitTsLock. In some places, we use commitTsActive
97  * without acquiring the lock; where this happens, a comment explains the
98  * rationale for it.
99  */
100 typedef struct CommitTimestampShared
101 {
106 
108 
109 
110 /* GUC variable */
112 
113 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
114  TransactionId *subxids, TimestampTz ts,
115  RepOriginId nodeid, int64 pageno);
117  RepOriginId nodeid, int slotno);
118 static void error_commit_ts_disabled(void);
119 static int ZeroCommitTsPage(int64 pageno, bool writeXlog);
120 static bool CommitTsPagePrecedes(int64 page1, int64 page2);
121 static void ActivateCommitTs(void);
122 static void DeactivateCommitTs(void);
123 static void WriteZeroPageXlogRec(int64 pageno);
124 static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
125 
126 /*
127  * TransactionTreeSetCommitTsData
128  *
129  * Record the final commit timestamp of transaction entries in the commit log
130  * for a transaction and its subtransaction tree, as efficiently as possible.
131  *
132  * xid is the top level transaction id.
133  *
134  * subxids is an array of xids of length nsubxids, representing subtransactions
135  * in the tree of xid. In various cases nsubxids may be zero.
136  * The reason why tracking just the parent xid commit timestamp is not enough
137  * is that the subtrans SLRU does not stay valid across crashes (it's not
138  * permanent) so we need to keep the information about them here. If the
139  * subtrans implementation changes in the future, we might want to revisit the
140  * decision of storing timestamp info for each subxid.
141  */
142 void
145  RepOriginId nodeid)
146 {
147  int i;
148  TransactionId headxid;
149  TransactionId newestXact;
150 
151  /*
152  * No-op if the module is not active.
153  *
154  * An unlocked read here is fine, because in a standby (the only place
155  * where the flag can change in flight) this routine is only called by the
156  * recovery process, which is also the only process which can change the
157  * flag.
158  */
160  return;
161 
162  /*
163  * Figure out the latest Xid in this batch: either the last subxid if
164  * there's any, otherwise the parent xid.
165  */
166  if (nsubxids > 0)
167  newestXact = subxids[nsubxids - 1];
168  else
169  newestXact = xid;
170 
171  /*
172  * We split the xids to set the timestamp to in groups belonging to the
173  * same SLRU page; the first element in each such set is its head. The
174  * first group has the main XID as the head; subsequent sets use the first
175  * subxid not on the previous page as head. This way, we only have to
176  * lock/modify each SLRU page once.
177  */
178  headxid = xid;
179  i = 0;
180  for (;;)
181  {
182  int64 pageno = TransactionIdToCTsPage(headxid);
183  int j;
184 
185  for (j = i; j < nsubxids; j++)
186  {
187  if (TransactionIdToCTsPage(subxids[j]) != pageno)
188  break;
189  }
190  /* subxids[i..j] are on the same page as the head */
191 
192  SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
193  pageno);
194 
195  /* if we wrote out all subxids, we're done. */
196  if (j >= nsubxids)
197  break;
198 
199  /*
200  * Set the new head and skip over it, as well as over the subxids we
201  * just wrote.
202  */
203  headxid = subxids[j];
204  i = j + 1;
205  }
206 
207  /* update the cached value in shared memory */
208  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
212 
213  /* and move forwards our endpoint, if needed */
216  LWLockRelease(CommitTsLock);
217 }
218 
219 /*
220  * Record the commit timestamp of transaction entries in the commit log for all
221  * entries on a single page. Atomic only on this page.
222  */
223 static void
225  TransactionId *subxids, TimestampTz ts,
226  RepOriginId nodeid, int64 pageno)
227 {
228  int slotno;
229  int i;
230 
231  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
232 
233  slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
234 
235  TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
236  for (i = 0; i < nsubxids; i++)
237  TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
238 
239  CommitTsCtl->shared->page_dirty[slotno] = true;
240 
241  LWLockRelease(CommitTsSLRULock);
242 }
243 
244 /*
245  * Sets the commit timestamp of a single transaction.
246  *
247  * Must be called with CommitTsSLRULock held
248  */
249 static void
251  RepOriginId nodeid, int slotno)
252 {
253  int entryno = TransactionIdToCTsEntry(xid);
254  CommitTimestampEntry entry;
255 
257 
258  entry.time = ts;
259  entry.nodeid = nodeid;
260 
261  memcpy(CommitTsCtl->shared->page_buffer[slotno] +
262  SizeOfCommitTimestampEntry * entryno,
264 }
265 
266 /*
267  * Interrogate the commit timestamp of a transaction.
268  *
269  * The return value indicates whether a commit timestamp record was found for
270  * the given xid. The timestamp value is returned in *ts (which may not be
271  * null), and the origin node for the Xid is returned in *nodeid, if it's not
272  * null.
273  */
274 bool
276  RepOriginId *nodeid)
277 {
278  int64 pageno = TransactionIdToCTsPage(xid);
279  int entryno = TransactionIdToCTsEntry(xid);
280  int slotno;
281  CommitTimestampEntry entry;
282  TransactionId oldestCommitTsXid;
283  TransactionId newestCommitTsXid;
284 
285  if (!TransactionIdIsValid(xid))
286  ereport(ERROR,
287  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
288  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
289  else if (!TransactionIdIsNormal(xid))
290  {
291  /* frozen and bootstrap xids are always committed far in the past */
292  *ts = 0;
293  if (nodeid)
294  *nodeid = 0;
295  return false;
296  }
297 
298  LWLockAcquire(CommitTsLock, LW_SHARED);
299 
300  /* Error if module not enabled */
303 
304  /*
305  * If we're asked for the cached value, return that. Otherwise, fall
306  * through to read from SLRU.
307  */
308  if (commitTsShared->xidLastCommit == xid)
309  {
311  if (nodeid)
313 
314  LWLockRelease(CommitTsLock);
315  return *ts != 0;
316  }
317 
318  oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
319  newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
320  /* neither is invalid, or both are */
321  Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
322  LWLockRelease(CommitTsLock);
323 
324  /*
325  * Return empty if the requested value is outside our valid range.
326  */
327  if (!TransactionIdIsValid(oldestCommitTsXid) ||
328  TransactionIdPrecedes(xid, oldestCommitTsXid) ||
329  TransactionIdPrecedes(newestCommitTsXid, xid))
330  {
331  *ts = 0;
332  if (nodeid)
333  *nodeid = InvalidRepOriginId;
334  return false;
335  }
336 
337  /* lock is acquired by SimpleLruReadPage_ReadOnly */
338  slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
339  memcpy(&entry,
340  CommitTsCtl->shared->page_buffer[slotno] +
341  SizeOfCommitTimestampEntry * entryno,
343 
344  *ts = entry.time;
345  if (nodeid)
346  *nodeid = entry.nodeid;
347 
348  LWLockRelease(CommitTsSLRULock);
349  return *ts != 0;
350 }
351 
352 /*
353  * Return the Xid of the latest committed transaction. (As far as this module
354  * is concerned, anyway; it's up to the caller to ensure the value is useful
355  * for its purposes.)
356  *
357  * ts and nodeid are filled with the corresponding data; they can be passed
358  * as NULL if not wanted.
359  */
362 {
363  TransactionId xid;
364 
365  LWLockAcquire(CommitTsLock, LW_SHARED);
366 
367  /* Error if module not enabled */
370 
372  if (ts)
374  if (nodeid)
376  LWLockRelease(CommitTsLock);
377 
378  return xid;
379 }
380 
381 static void
383 {
384  ereport(ERROR,
385  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
386  errmsg("could not get commit timestamp data"),
388  errhint("Make sure the configuration parameter %s is set on the primary server.",
389  "track_commit_timestamp") :
390  errhint("Make sure the configuration parameter %s is set.",
391  "track_commit_timestamp")));
392 }
393 
394 /*
395  * SQL-callable wrapper to obtain commit time of a transaction
396  */
397 Datum
399 {
401  TimestampTz ts;
402  bool found;
403 
404  found = TransactionIdGetCommitTsData(xid, &ts, NULL);
405 
406  if (!found)
407  PG_RETURN_NULL();
408 
410 }
411 
412 
413 /*
414  * pg_last_committed_xact
415  *
416  * SQL-callable wrapper to obtain some information about the latest
417  * committed transaction: transaction ID, timestamp and replication
418  * origin.
419  */
420 Datum
422 {
423  TransactionId xid;
424  RepOriginId nodeid;
425  TimestampTz ts;
426  Datum values[3];
427  bool nulls[3];
428  TupleDesc tupdesc;
429  HeapTuple htup;
430 
431  /* and construct a tuple with our data */
432  xid = GetLatestCommitTsData(&ts, &nodeid);
433 
434  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
435  elog(ERROR, "return type must be a row type");
436 
437  if (!TransactionIdIsNormal(xid))
438  {
439  memset(nulls, true, sizeof(nulls));
440  }
441  else
442  {
443  values[0] = TransactionIdGetDatum(xid);
444  nulls[0] = false;
445 
446  values[1] = TimestampTzGetDatum(ts);
447  nulls[1] = false;
448 
449  values[2] = ObjectIdGetDatum((Oid) nodeid);
450  nulls[2] = false;
451  }
452 
453  htup = heap_form_tuple(tupdesc, values, nulls);
454 
456 }
457 
458 /*
459  * pg_xact_commit_timestamp_origin
460  *
461  * SQL-callable wrapper to obtain commit timestamp and replication origin
462  * of a given transaction.
463  */
464 Datum
466 {
468  RepOriginId nodeid;
469  TimestampTz ts;
470  Datum values[2];
471  bool nulls[2];
472  TupleDesc tupdesc;
473  HeapTuple htup;
474  bool found;
475 
476  found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
477 
478  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
479  elog(ERROR, "return type must be a row type");
480 
481  if (!found)
482  {
483  memset(nulls, true, sizeof(nulls));
484  }
485  else
486  {
487  values[0] = TimestampTzGetDatum(ts);
488  nulls[0] = false;
489 
490  values[1] = ObjectIdGetDatum((Oid) nodeid);
491  nulls[1] = false;
492  }
493 
494  htup = heap_form_tuple(tupdesc, values, nulls);
495 
497 }
498 
499 /*
500  * Number of shared CommitTS buffers.
501  *
502  * We use a very similar logic as for the number of CLOG buffers (except we
503  * scale up twice as fast with shared buffers, and the maximum is twice as
504  * high); see comments in CLOGShmemBuffers.
505  */
506 Size
508 {
509  return Min(256, Max(4, NBuffers / 256));
510 }
511 
512 /*
513  * Shared memory sizing for CommitTs
514  */
515 Size
517 {
519  sizeof(CommitTimestampShared);
520 }
521 
522 /*
523  * Initialize CommitTs at system startup (postmaster start or standalone
524  * backend)
525  */
526 void
528 {
529  bool found;
530 
531  CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
533  CommitTsSLRULock, "pg_commit_ts",
536  false);
538 
539  commitTsShared = ShmemInitStruct("CommitTs shared",
540  sizeof(CommitTimestampShared),
541  &found);
542 
543  if (!IsUnderPostmaster)
544  {
545  Assert(!found);
546 
551  }
552  else
553  Assert(found);
554 }
555 
556 /*
557  * This function must be called ONCE on system install.
558  *
559  * (The CommitTs directory is assumed to have been created by initdb, and
560  * CommitTsShmemInit must have been called already.)
561  */
562 void
564 {
565  /*
566  * Nothing to do here at present, unlike most other SLRU modules; segments
567  * are created when the server is started with this module enabled. See
568  * ActivateCommitTs.
569  */
570 }
571 
572 /*
573  * Initialize (or reinitialize) a page of CommitTs to zeroes.
574  * If writeXlog is true, also emit an XLOG record saying we did this.
575  *
576  * The page is not actually written, just set up in shared memory.
577  * The slot number of the new page is returned.
578  *
579  * Control lock must be held at entry, and will be held at exit.
580  */
581 static int
582 ZeroCommitTsPage(int64 pageno, bool writeXlog)
583 {
584  int slotno;
585 
586  slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
587 
588  if (writeXlog)
589  WriteZeroPageXlogRec(pageno);
590 
591  return slotno;
592 }
593 
594 /*
595  * This must be called ONCE during postmaster or standalone-backend startup,
596  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
597  */
598 void
600 {
602 }
603 
604 /*
605  * This must be called ONCE during postmaster or standalone-backend startup,
606  * after recovery has finished.
607  */
608 void
610 {
611  /*
612  * If the feature is not enabled, turn it off for good. This also removes
613  * any leftover data.
614  *
615  * Conversely, we activate the module if the feature is enabled. This is
616  * necessary for primary and standby as the activation depends on the
617  * control file contents at the beginning of recovery or when a
618  * XLOG_PARAMETER_CHANGE is replayed.
619  */
622  else
624 }
625 
626 /*
627  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
628  * XLog record during recovery.
629  */
630 void
631 CommitTsParameterChange(bool newvalue, bool oldvalue)
632 {
633  /*
634  * If the commit_ts module is disabled in this server and we get word from
635  * the primary server that it is enabled there, activate it so that we can
636  * replay future WAL records involving it; also mark it as active on
637  * pg_control. If the old value was already set, we already did this, so
638  * don't do anything.
639  *
640  * If the module is disabled in the primary, disable it here too, unless
641  * the module is enabled locally.
642  *
643  * Note this only runs in the recovery process, so an unlocked read is
644  * fine.
645  */
646  if (newvalue)
647  {
650  }
651  else if (commitTsShared->commitTsActive)
653 }
654 
655 /*
656  * Activate this module whenever necessary.
657  * This must happen during postmaster or standalone-backend startup,
658  * or during WAL replay anytime the track_commit_timestamp setting is
659  * changed in the primary.
660  *
661  * The reason why this SLRU needs separate activation/deactivation functions is
662  * that it can be enabled/disabled during start and the activation/deactivation
663  * on the primary is propagated to the standby via replay. Other SLRUs don't
664  * have this property and they can be just initialized during normal startup.
665  *
666  * This is in charge of creating the currently active segment, if it's not
667  * already there. The reason for this is that the server might have been
668  * running with this module disabled for a while and thus might have skipped
669  * the normal creation point.
670  */
671 static void
673 {
674  TransactionId xid;
675  int64 pageno;
676 
677  /* If we've done this already, there's nothing to do */
678  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
680  {
681  LWLockRelease(CommitTsLock);
682  return;
683  }
684  LWLockRelease(CommitTsLock);
685 
687  pageno = TransactionIdToCTsPage(xid);
688 
689  /*
690  * Re-Initialize our idea of the latest page number.
691  */
692  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
693  CommitTsCtl->shared->latest_page_number = pageno;
694  LWLockRelease(CommitTsSLRULock);
695 
696  /*
697  * If CommitTs is enabled, but it wasn't in the previous server run, we
698  * need to set the oldest and newest values to the next Xid; that way, we
699  * will not try to read data that might not have been set.
700  *
701  * XXX does this have a problem if a server is started with commitTs
702  * enabled, then started with commitTs disabled, then restarted with it
703  * enabled again? It doesn't look like it does, because there should be a
704  * checkpoint that sets the value to InvalidTransactionId at end of
705  * recovery; and so any chance of injecting new transactions without
706  * CommitTs values would occur after the oldestCommitTsXid has been set to
707  * Invalid temporarily.
708  */
709  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
711  {
714  }
715  LWLockRelease(CommitTsLock);
716 
717  /* Create the current segment file, if necessary */
719  {
720  int slotno;
721 
722  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
723  slotno = ZeroCommitTsPage(pageno, false);
725  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
726  LWLockRelease(CommitTsSLRULock);
727  }
728 
729  /* Change the activation status in shared memory. */
730  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
732  LWLockRelease(CommitTsLock);
733 }
734 
735 /*
736  * Deactivate this module.
737  *
738  * This must be called when the track_commit_timestamp parameter is turned off.
739  * This happens during postmaster or standalone-backend startup, or during WAL
740  * replay.
741  *
742  * Resets CommitTs into invalid state to make sure we don't hand back
743  * possibly-invalid data; also removes segments of old data.
744  */
745 static void
747 {
748  /*
749  * Cleanup the status in the shared memory.
750  *
751  * We reset everything in the commitTsShared record to prevent user from
752  * getting confusing data about last committed transaction on the standby
753  * when the module was activated repeatedly on the primary.
754  */
755  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
756 
761 
764 
765  LWLockRelease(CommitTsLock);
766 
767  /*
768  * Remove *all* files. This is necessary so that there are no leftover
769  * files; in the case where this feature is later enabled after running
770  * with it disabled for some time there may be a gap in the file sequence.
771  * (We can probably tolerate out-of-sequence files, as they are going to
772  * be overwritten anyway when we wrap around, but it seems better to be
773  * tidy.)
774  */
775  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
777  LWLockRelease(CommitTsSLRULock);
778 }
779 
780 /*
781  * Perform a checkpoint --- either during shutdown, or on-the-fly
782  */
783 void
785 {
786  /*
787  * Write dirty CommitTs pages to disk. This may result in sync requests
788  * queued for later handling by ProcessSyncRequests(), as part of the
789  * checkpoint.
790  */
792 }
793 
794 /*
795  * Make sure that CommitTs has room for a newly-allocated XID.
796  *
797  * NB: this is called while holding XidGenLock. We want it to be very fast
798  * most of the time; even when it's not so fast, no actual I/O need happen
799  * unless we're forced to write out a dirty CommitTs or xlog page to make room
800  * in shared memory.
801  *
802  * NB: the current implementation relies on track_commit_timestamp being
803  * PGC_POSTMASTER.
804  */
805 void
807 {
808  int64 pageno;
809 
810  /*
811  * Nothing to do if module not enabled. Note we do an unlocked read of
812  * the flag here, which is okay because this routine is only called from
813  * GetNewTransactionId, which is never called in a standby.
814  */
815  Assert(!InRecovery);
817  return;
818 
819  /*
820  * No work except at first XID of a page. But beware: just after
821  * wraparound, the first XID of page zero is FirstNormalTransactionId.
822  */
823  if (TransactionIdToCTsEntry(newestXact) != 0 &&
825  return;
826 
827  pageno = TransactionIdToCTsPage(newestXact);
828 
829  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
830 
831  /* Zero the page and make an XLOG entry about it */
832  ZeroCommitTsPage(pageno, !InRecovery);
833 
834  LWLockRelease(CommitTsSLRULock);
835 }
836 
837 /*
838  * Remove all CommitTs segments before the one holding the passed
839  * transaction ID.
840  *
841  * Note that we don't need to flush XLOG here.
842  */
843 void
845 {
846  int64 cutoffPage;
847 
848  /*
849  * The cutoff point is the start of the segment containing oldestXact. We
850  * pass the *page* containing oldestXact to SimpleLruTruncate.
851  */
852  cutoffPage = TransactionIdToCTsPage(oldestXact);
853 
854  /* Check to see if there's any files that could be removed */
856  &cutoffPage))
857  return; /* nothing to remove */
858 
859  /* Write XLOG record */
860  WriteTruncateXlogRec(cutoffPage, oldestXact);
861 
862  /* Now we can remove the old CommitTs segment(s) */
863  SimpleLruTruncate(CommitTsCtl, cutoffPage);
864 }
865 
866 /*
867  * Set the limit values between which commit TS can be consulted.
868  */
869 void
871 {
872  /*
873  * Be careful not to overwrite values that are either further into the
874  * "future" or signal a disabled committs.
875  */
876  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
878  {
883  }
884  else
885  {
889  }
890  LWLockRelease(CommitTsLock);
891 }
892 
893 /*
894  * Move forwards the oldest commitTS value that can be consulted
895  */
896 void
898 {
899  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
903  LWLockRelease(CommitTsLock);
904 }
905 
906 
907 /*
908  * Decide whether a commitTS page number is "older" for truncation purposes.
909  * Analogous to CLOGPagePrecedes().
910  *
911  * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
912  * introduces differences compared to CLOG and the other SLRUs having (1 <<
913  * 31) % per_page == 0. This function never tests exactly
914  * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
915  * there are two possible counts of page boundaries between oldestXact and the
916  * latest XID assigned, depending on whether oldestXact is within the first
917  * 128 entries of its page. Since this function doesn't know the location of
918  * oldestXact within page2, it returns false for one page that actually is
919  * expendable. This is a wider (yet still negligible) version of the
920  * truncation opportunity that CLOGPagePrecedes() cannot recognize.
921  *
922  * For the sake of a worked example, number entries with decimal values such
923  * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
924  * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
925  * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
926  * because entry=2.85 is the border that toggles whether entries precede the
927  * last entry of the oldestXact page. While page 2 is expendable at
928  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
929  */
930 static bool
931 CommitTsPagePrecedes(int64 page1, int64 page2)
932 {
933  TransactionId xid1;
934  TransactionId xid2;
935 
936  xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
937  xid1 += FirstNormalTransactionId + 1;
938  xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
939  xid2 += FirstNormalTransactionId + 1;
940 
941  return (TransactionIdPrecedes(xid1, xid2) &&
943 }
944 
945 
946 /*
947  * Write a ZEROPAGE xlog record
948  */
949 static void
950 WriteZeroPageXlogRec(int64 pageno)
951 {
952  XLogBeginInsert();
953  XLogRegisterData((char *) (&pageno), sizeof(pageno));
954  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
955 }
956 
957 /*
958  * Write a TRUNCATE xlog record
959  */
960 static void
961 WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
962 {
963  xl_commit_ts_truncate xlrec;
964 
965  xlrec.pageno = pageno;
966  xlrec.oldestXid = oldestXid;
967 
968  XLogBeginInsert();
969  XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
970  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
971 }
972 
973 /*
974  * CommitTS resource manager's routines
975  */
976 void
978 {
979  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
980 
981  /* Backup blocks are not used in commit_ts records */
982  Assert(!XLogRecHasAnyBlockRefs(record));
983 
984  if (info == COMMIT_TS_ZEROPAGE)
985  {
986  int64 pageno;
987  int slotno;
988 
989  memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
990 
991  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
992 
993  slotno = ZeroCommitTsPage(pageno, false);
995  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
996 
997  LWLockRelease(CommitTsSLRULock);
998  }
999  else if (info == COMMIT_TS_TRUNCATE)
1000  {
1002 
1004 
1005  /*
1006  * During XLOG replay, latest_page_number isn't set up yet; insert a
1007  * suitable value to bypass the sanity test in SimpleLruTruncate.
1008  */
1009  CommitTsCtl->shared->latest_page_number = trunc->pageno;
1010 
1012  }
1013  else
1014  elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1015 }
1016 
1017 /*
1018  * Entrypoint for sync.c to sync commit_ts files.
1019  */
1020 int
1021 committssyncfiletag(const FileTag *ftag, char *path)
1022 {
1023  return SlruSyncFileTag(CommitTsCtl, ftag, path);
1024 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define Min(x, y)
Definition: c.h:993
#define Max(x, y)
Definition: c.h:987
unsigned char uint8
Definition: c.h:493
uint32 TransactionId
Definition: c.h:641
size_t Size
Definition: c.h:594
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno)
Definition: commit_ts.c:224
static void WriteZeroPageXlogRec(int64 pageno)
Definition: commit_ts.c:950
void StartupCommitTs(void)
Definition: commit_ts.c:599
static SlruCtlData CommitTsCtlData
Definition: commit_ts.c:85
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
Definition: commit_ts.c:465
struct CommitTimestampEntry CommitTimestampEntry
struct CommitTimestampShared CommitTimestampShared
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition: commit_ts.c:421
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:361
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition: commit_ts.c:631
Size CommitTsShmemBuffers(void)
Definition: commit_ts.c:507
#define COMMIT_TS_XACTS_PER_PAGE
Definition: commit_ts.c:65
#define TransactionIdToCTsEntry(xid)
Definition: commit_ts.c:79
static void DeactivateCommitTs(void)
Definition: commit_ts.c:746
Size CommitTsShmemSize(void)
Definition: commit_ts.c:516
bool track_commit_timestamp
Definition: commit_ts.c:111
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition: commit_ts.c:897
static CommitTimestampShared * commitTsShared
Definition: commit_ts.c:107
int committssyncfiletag(const FileTag *ftag, char *path)
Definition: commit_ts.c:1021
void CompleteCommitTsInitialization(void)
Definition: commit_ts.c:609
static void ActivateCommitTs(void)
Definition: commit_ts.c:672
static int64 TransactionIdToCTsPage(TransactionId xid)
Definition: commit_ts.c:74
void TruncateCommitTs(TransactionId oldestXact)
Definition: commit_ts.c:844
void commit_ts_redo(XLogReaderState *record)
Definition: commit_ts.c:977
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:275
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition: commit_ts.c:250
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition: commit_ts.c:398
static void error_commit_ts_disabled(void)
Definition: commit_ts.c:382
static bool CommitTsPagePrecedes(int64 page1, int64 page2)
Definition: commit_ts.c:931
#define SizeOfCommitTimestampEntry
Definition: commit_ts.c:62
void BootStrapCommitTs(void)
Definition: commit_ts.c:563
void CommitTsShmemInit(void)
Definition: commit_ts.c:527
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition: commit_ts.c:870
#define CommitTsCtl
Definition: commit_ts.c:87
void ExtendCommitTs(TransactionId newestXact)
Definition: commit_ts.c:806
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
Definition: commit_ts.c:143
static int ZeroCommitTsPage(int64 pageno, bool writeXlog)
Definition: commit_ts.c:582
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
Definition: commit_ts.c:961
void CheckPointCommitTs(void)
Definition: commit_ts.c:784
#define COMMIT_TS_ZEROPAGE
Definition: commit_ts.h:47
#define SizeOfCommitTsTruncate
Definition: commit_ts.h:67
#define COMMIT_TS_TRUNCATE
Definition: commit_ts.h:48
int64 TimestampTz
Definition: timestamp.h:39
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:159
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_GETARG_TRANSACTIONID(n)
Definition: fmgr.h:279
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
int NBuffers
Definition: globals.c:138
bool IsUnderPostmaster
Definition: globals.c:115
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
int j
Definition: isn.c:74
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1808
@ LWTRANCHE_COMMITTS_BUFFER
Definition: lwlock.h:182
@ LW_SHARED
Definition: lwlock.h:117
@ LW_EXCLUSIVE
Definition: lwlock.h:116
#define InvalidRepOriginId
Definition: origin.h:33
int64 timestamp
static Datum TransactionIdGetDatum(TransactionId X)
Definition: postgres.h:272
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
unsigned int Oid
Definition: postgres_ext.h:31
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
Definition: slru.c:523
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:642
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
Definition: slru.c:1184
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition: slru.c:214
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)
Definition: slru.c:654
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1607
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1560
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition: slru.c:423
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
Definition: slru.c:1647
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition: slru.c:308
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition: slru.c:1254
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:182
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1528
#define SlruPagePrecedesUnitTests(ctl, per_page)
Definition: slru.h:165
TimestampTz time
Definition: commit_ts.c:58
RepOriginId nodeid
Definition: commit_ts.c:59
CommitTimestampEntry dataLastCommit
Definition: commit_ts.c:103
TransactionId xidLastCommit
Definition: commit_ts.c:102
Definition: sync.h:51
FullTransactionId nextXid
Definition: transam.h:220
TransactionId newestCommitTsXid
Definition: transam.h:233
TransactionId oldestCommitTsXid
Definition: transam.h:232
TransactionId oldestXid
Definition: commit_ts.h:64
@ SYNC_HANDLER_COMMIT_TS
Definition: sync.h:39
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
static TransactionId ReadNextTransactionId(void)
Definition: transam.h:315
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define FirstNormalTransactionId
Definition: transam.h:34
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
#define PG_RETURN_TIMESTAMPTZ(x)
Definition: timestamp.h:68
VariableCache ShmemVariableCache
Definition: varsup.c:34
bool RecoveryInProgress(void)
Definition: xlog.c:6037
uint16 RepOriginId
Definition: xlogdefs.h:65
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:365
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:475
void XLogBeginInsert(void)
Definition: xloginsert.c:150
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:417
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
bool InRecovery
Definition: xlogutils.c:53