PostgreSQL Source Code  git master
commit_ts.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * commit_ts.c
4  * PostgreSQL commit timestamp manager
5  *
6  * This module is a pg_xact-like system that stores the commit timestamp
7  * for each transaction.
8  *
9  * XLOG interactions: this module generates an XLOG record whenever a new
10  * CommitTs page is initialized to zeroes. Other writes of CommitTS come
11  * from recording of transaction commit in xact.c, which generates its own
12  * XLOG records for these events and will re-perform the status update on
13  * redo; so we need make no additional XLOG entry here.
14  *
15  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
16  * Portions Copyright (c) 1994, Regents of the University of California
17  *
18  * src/backend/access/transam/commit_ts.c
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres.h"
23 
24 #include "access/commit_ts.h"
25 #include "access/htup_details.h"
26 #include "access/slru.h"
27 #include "access/transam.h"
28 #include "access/xloginsert.h"
29 #include "access/xlogutils.h"
30 #include "funcapi.h"
31 #include "miscadmin.h"
32 #include "storage/shmem.h"
33 #include "utils/fmgrprotos.h"
34 #include "utils/guc_hooks.h"
35 #include "utils/timestamp.h"
36 
37 /*
38  * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
39  * everywhere else in Postgres.
40  *
41  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
42  * CommitTs page numbering also wraps around at
43  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
44  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
45  * explicit notice of that fact in this module, except when comparing segment
46  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
47  */
48 
49 /*
50  * We need 8+2 bytes per xact. Note that enlarging this struct might mean
51  * the largest possible file name is more than 5 chars long; see
52  * SlruScanDirectory.
53  */
54 typedef struct CommitTimestampEntry
55 {
59 
60 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
61  sizeof(RepOriginId))
62 
63 #define COMMIT_TS_XACTS_PER_PAGE \
64  (BLCKSZ / SizeOfCommitTimestampEntry)
65 
66 
67 /*
68  * Although we return an int64 the actual value can't currently exceed
69  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
70  */
71 static inline int64
73 {
74  return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
75 }
76 
77 #define TransactionIdToCTsEntry(xid) \
78  ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
79 
80 /*
81  * Link to shared-memory data structures for CommitTs control
82  */
84 
85 #define CommitTsCtl (&CommitTsCtlData)
86 
87 /*
88  * We keep a cache of the last value set in shared memory.
89  *
90  * This is also good place to keep the activation status. We keep this
91  * separate from the GUC so that the standby can activate the module if the
92  * primary has it active independently of the value of the GUC.
93  *
94  * This is protected by CommitTsLock. In some places, we use commitTsActive
95  * without acquiring the lock; where this happens, a comment explains the
96  * rationale for it.
97  */
98 typedef struct CommitTimestampShared
99 {
104 
106 
107 
108 /* GUC variable */
110 
111 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
112  TransactionId *subxids, TimestampTz ts,
113  RepOriginId nodeid, int64 pageno);
115  RepOriginId nodeid, int slotno);
116 static void error_commit_ts_disabled(void);
117 static int ZeroCommitTsPage(int64 pageno, bool writeXlog);
118 static bool CommitTsPagePrecedes(int64 page1, int64 page2);
119 static void ActivateCommitTs(void);
120 static void DeactivateCommitTs(void);
121 static void WriteZeroPageXlogRec(int64 pageno);
122 static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
123 
124 /*
125  * TransactionTreeSetCommitTsData
126  *
127  * Record the final commit timestamp of transaction entries in the commit log
128  * for a transaction and its subtransaction tree, as efficiently as possible.
129  *
130  * xid is the top level transaction id.
131  *
132  * subxids is an array of xids of length nsubxids, representing subtransactions
133  * in the tree of xid. In various cases nsubxids may be zero.
134  * The reason why tracking just the parent xid commit timestamp is not enough
135  * is that the subtrans SLRU does not stay valid across crashes (it's not
136  * permanent) so we need to keep the information about them here. If the
137  * subtrans implementation changes in the future, we might want to revisit the
138  * decision of storing timestamp info for each subxid.
139  */
140 void
143  RepOriginId nodeid)
144 {
145  int i;
146  TransactionId headxid;
147  TransactionId newestXact;
148 
149  /*
150  * No-op if the module is not active.
151  *
152  * An unlocked read here is fine, because in a standby (the only place
153  * where the flag can change in flight) this routine is only called by the
154  * recovery process, which is also the only process which can change the
155  * flag.
156  */
158  return;
159 
160  /*
161  * Figure out the latest Xid in this batch: either the last subxid if
162  * there's any, otherwise the parent xid.
163  */
164  if (nsubxids > 0)
165  newestXact = subxids[nsubxids - 1];
166  else
167  newestXact = xid;
168 
169  /*
170  * We split the xids to set the timestamp to in groups belonging to the
171  * same SLRU page; the first element in each such set is its head. The
172  * first group has the main XID as the head; subsequent sets use the first
173  * subxid not on the previous page as head. This way, we only have to
174  * lock/modify each SLRU page once.
175  */
176  headxid = xid;
177  i = 0;
178  for (;;)
179  {
180  int64 pageno = TransactionIdToCTsPage(headxid);
181  int j;
182 
183  for (j = i; j < nsubxids; j++)
184  {
185  if (TransactionIdToCTsPage(subxids[j]) != pageno)
186  break;
187  }
188  /* subxids[i..j] are on the same page as the head */
189 
190  SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
191  pageno);
192 
193  /* if we wrote out all subxids, we're done. */
194  if (j >= nsubxids)
195  break;
196 
197  /*
198  * Set the new head and skip over it, as well as over the subxids we
199  * just wrote.
200  */
201  headxid = subxids[j];
202  i = j + 1;
203  }
204 
205  /* update the cached value in shared memory */
206  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
210 
211  /* and move forwards our endpoint, if needed */
213  TransamVariables->newestCommitTsXid = newestXact;
214  LWLockRelease(CommitTsLock);
215 }
216 
217 /*
218  * Record the commit timestamp of transaction entries in the commit log for all
219  * entries on a single page. Atomic only on this page.
220  */
221 static void
223  TransactionId *subxids, TimestampTz ts,
224  RepOriginId nodeid, int64 pageno)
225 {
226  LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
227  int slotno;
228  int i;
229 
231 
232  slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
233 
234  TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
235  for (i = 0; i < nsubxids; i++)
236  TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
237 
238  CommitTsCtl->shared->page_dirty[slotno] = true;
239 
240  LWLockRelease(lock);
241 }
242 
243 /*
244  * Sets the commit timestamp of a single transaction.
245  *
246  * Caller must hold the correct SLRU bank lock, will be held at exit
247  */
248 static void
250  RepOriginId nodeid, int slotno)
251 {
252  int entryno = TransactionIdToCTsEntry(xid);
253  CommitTimestampEntry entry;
254 
256 
257  entry.time = ts;
258  entry.nodeid = nodeid;
259 
260  memcpy(CommitTsCtl->shared->page_buffer[slotno] +
261  SizeOfCommitTimestampEntry * entryno,
263 }
264 
265 /*
266  * Interrogate the commit timestamp of a transaction.
267  *
268  * The return value indicates whether a commit timestamp record was found for
269  * the given xid. The timestamp value is returned in *ts (which may not be
270  * null), and the origin node for the Xid is returned in *nodeid, if it's not
271  * null.
272  */
273 bool
275  RepOriginId *nodeid)
276 {
277  int64 pageno = TransactionIdToCTsPage(xid);
278  int entryno = TransactionIdToCTsEntry(xid);
279  int slotno;
280  CommitTimestampEntry entry;
281  TransactionId oldestCommitTsXid;
282  TransactionId newestCommitTsXid;
283 
284  if (!TransactionIdIsValid(xid))
285  ereport(ERROR,
286  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
287  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
288  else if (!TransactionIdIsNormal(xid))
289  {
290  /* frozen and bootstrap xids are always committed far in the past */
291  *ts = 0;
292  if (nodeid)
293  *nodeid = 0;
294  return false;
295  }
296 
297  LWLockAcquire(CommitTsLock, LW_SHARED);
298 
299  /* Error if module not enabled */
302 
303  /*
304  * If we're asked for the cached value, return that. Otherwise, fall
305  * through to read from SLRU.
306  */
307  if (commitTsShared->xidLastCommit == xid)
308  {
310  if (nodeid)
312 
313  LWLockRelease(CommitTsLock);
314  return *ts != 0;
315  }
316 
317  oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
318  newestCommitTsXid = TransamVariables->newestCommitTsXid;
319  /* neither is invalid, or both are */
320  Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
321  LWLockRelease(CommitTsLock);
322 
323  /*
324  * Return empty if the requested value is outside our valid range.
325  */
326  if (!TransactionIdIsValid(oldestCommitTsXid) ||
327  TransactionIdPrecedes(xid, oldestCommitTsXid) ||
328  TransactionIdPrecedes(newestCommitTsXid, xid))
329  {
330  *ts = 0;
331  if (nodeid)
332  *nodeid = InvalidRepOriginId;
333  return false;
334  }
335 
336  /* lock is acquired by SimpleLruReadPage_ReadOnly */
337  slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
338  memcpy(&entry,
339  CommitTsCtl->shared->page_buffer[slotno] +
340  SizeOfCommitTimestampEntry * entryno,
342 
343  *ts = entry.time;
344  if (nodeid)
345  *nodeid = entry.nodeid;
346 
348  return *ts != 0;
349 }
350 
351 /*
352  * Return the Xid of the latest committed transaction. (As far as this module
353  * is concerned, anyway; it's up to the caller to ensure the value is useful
354  * for its purposes.)
355  *
356  * ts and nodeid are filled with the corresponding data; they can be passed
357  * as NULL if not wanted.
358  */
361 {
362  TransactionId xid;
363 
364  LWLockAcquire(CommitTsLock, LW_SHARED);
365 
366  /* Error if module not enabled */
369 
371  if (ts)
373  if (nodeid)
375  LWLockRelease(CommitTsLock);
376 
377  return xid;
378 }
379 
380 static void
382 {
383  ereport(ERROR,
384  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385  errmsg("could not get commit timestamp data"),
387  errhint("Make sure the configuration parameter %s is set on the primary server.",
388  "track_commit_timestamp") :
389  errhint("Make sure the configuration parameter %s is set.",
390  "track_commit_timestamp")));
391 }
392 
393 /*
394  * SQL-callable wrapper to obtain commit time of a transaction
395  */
396 Datum
398 {
400  TimestampTz ts;
401  bool found;
402 
403  found = TransactionIdGetCommitTsData(xid, &ts, NULL);
404 
405  if (!found)
406  PG_RETURN_NULL();
407 
409 }
410 
411 
412 /*
413  * pg_last_committed_xact
414  *
415  * SQL-callable wrapper to obtain some information about the latest
416  * committed transaction: transaction ID, timestamp and replication
417  * origin.
418  */
419 Datum
421 {
422  TransactionId xid;
423  RepOriginId nodeid;
424  TimestampTz ts;
425  Datum values[3];
426  bool nulls[3];
427  TupleDesc tupdesc;
428  HeapTuple htup;
429 
430  /* and construct a tuple with our data */
431  xid = GetLatestCommitTsData(&ts, &nodeid);
432 
433  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
434  elog(ERROR, "return type must be a row type");
435 
436  if (!TransactionIdIsNormal(xid))
437  {
438  memset(nulls, true, sizeof(nulls));
439  }
440  else
441  {
442  values[0] = TransactionIdGetDatum(xid);
443  nulls[0] = false;
444 
445  values[1] = TimestampTzGetDatum(ts);
446  nulls[1] = false;
447 
448  values[2] = ObjectIdGetDatum((Oid) nodeid);
449  nulls[2] = false;
450  }
451 
452  htup = heap_form_tuple(tupdesc, values, nulls);
453 
455 }
456 
457 /*
458  * pg_xact_commit_timestamp_origin
459  *
460  * SQL-callable wrapper to obtain commit timestamp and replication origin
461  * of a given transaction.
462  */
463 Datum
465 {
467  RepOriginId nodeid;
468  TimestampTz ts;
469  Datum values[2];
470  bool nulls[2];
471  TupleDesc tupdesc;
472  HeapTuple htup;
473  bool found;
474 
475  found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
476 
477  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
478  elog(ERROR, "return type must be a row type");
479 
480  if (!found)
481  {
482  memset(nulls, true, sizeof(nulls));
483  }
484  else
485  {
486  values[0] = TimestampTzGetDatum(ts);
487  nulls[0] = false;
488 
489  values[1] = ObjectIdGetDatum((Oid) nodeid);
490  nulls[1] = false;
491  }
492 
493  htup = heap_form_tuple(tupdesc, values, nulls);
494 
496 }
497 
498 /*
499  * Number of shared CommitTS buffers.
500  *
501  * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
502  * Otherwise just cap the configured amount to be between 16 and the maximum
503  * allowed.
504  */
505 static int
507 {
508  /* auto-tune based on shared buffers */
509  if (commit_timestamp_buffers == 0)
510  return SimpleLruAutotuneBuffers(512, 1024);
511 
513 }
514 
515 /*
516  * Shared memory sizing for CommitTs
517  */
518 Size
520 {
522  sizeof(CommitTimestampShared);
523 }
524 
525 /*
526  * Initialize CommitTs at system startup (postmaster start or standalone
527  * backend)
528  */
529 void
531 {
532  bool found;
533 
534  /* If auto-tuning is requested, now is the time to do it */
535  if (commit_timestamp_buffers == 0)
536  {
537  char buf[32];
538 
539  snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
540  SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
542 
543  /*
544  * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
545  * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
546  * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
547  * that and we must force the matter with PGC_S_OVERRIDE.
548  */
549  if (commit_timestamp_buffers == 0) /* failed to apply it? */
550  SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
552  }
554 
555  CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
556  SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
557  "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
560  false);
562 
563  commitTsShared = ShmemInitStruct("CommitTs shared",
564  sizeof(CommitTimestampShared),
565  &found);
566 
567  if (!IsUnderPostmaster)
568  {
569  Assert(!found);
570 
575  }
576  else
577  Assert(found);
578 }
579 
580 /*
581  * GUC check_hook for commit_timestamp_buffers
582  */
583 bool
585 {
586  return check_slru_buffers("commit_timestamp_buffers", newval);
587 }
588 
589 /*
590  * This function must be called ONCE on system install.
591  *
592  * (The CommitTs directory is assumed to have been created by initdb, and
593  * CommitTsShmemInit must have been called already.)
594  */
595 void
597 {
598  /*
599  * Nothing to do here at present, unlike most other SLRU modules; segments
600  * are created when the server is started with this module enabled. See
601  * ActivateCommitTs.
602  */
603 }
604 
605 /*
606  * Initialize (or reinitialize) a page of CommitTs to zeroes.
607  * If writeXlog is true, also emit an XLOG record saying we did this.
608  *
609  * The page is not actually written, just set up in shared memory.
610  * The slot number of the new page is returned.
611  *
612  * Control lock must be held at entry, and will be held at exit.
613  */
614 static int
615 ZeroCommitTsPage(int64 pageno, bool writeXlog)
616 {
617  int slotno;
618 
619  slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
620 
621  if (writeXlog)
622  WriteZeroPageXlogRec(pageno);
623 
624  return slotno;
625 }
626 
627 /*
628  * This must be called ONCE during postmaster or standalone-backend startup,
629  * after StartupXLOG has initialized TransamVariables->nextXid.
630  */
631 void
633 {
635 }
636 
637 /*
638  * This must be called ONCE during postmaster or standalone-backend startup,
639  * after recovery has finished.
640  */
641 void
643 {
644  /*
645  * If the feature is not enabled, turn it off for good. This also removes
646  * any leftover data.
647  *
648  * Conversely, we activate the module if the feature is enabled. This is
649  * necessary for primary and standby as the activation depends on the
650  * control file contents at the beginning of recovery or when a
651  * XLOG_PARAMETER_CHANGE is replayed.
652  */
655  else
657 }
658 
659 /*
660  * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
661  * XLog record during recovery.
662  */
663 void
664 CommitTsParameterChange(bool newvalue, bool oldvalue)
665 {
666  /*
667  * If the commit_ts module is disabled in this server and we get word from
668  * the primary server that it is enabled there, activate it so that we can
669  * replay future WAL records involving it; also mark it as active on
670  * pg_control. If the old value was already set, we already did this, so
671  * don't do anything.
672  *
673  * If the module is disabled in the primary, disable it here too, unless
674  * the module is enabled locally.
675  *
676  * Note this only runs in the recovery process, so an unlocked read is
677  * fine.
678  */
679  if (newvalue)
680  {
683  }
684  else if (commitTsShared->commitTsActive)
686 }
687 
688 /*
689  * Activate this module whenever necessary.
690  * This must happen during postmaster or standalone-backend startup,
691  * or during WAL replay anytime the track_commit_timestamp setting is
692  * changed in the primary.
693  *
694  * The reason why this SLRU needs separate activation/deactivation functions is
695  * that it can be enabled/disabled during start and the activation/deactivation
696  * on the primary is propagated to the standby via replay. Other SLRUs don't
697  * have this property and they can be just initialized during normal startup.
698  *
699  * This is in charge of creating the currently active segment, if it's not
700  * already there. The reason for this is that the server might have been
701  * running with this module disabled for a while and thus might have skipped
702  * the normal creation point.
703  */
704 static void
706 {
707  TransactionId xid;
708  int64 pageno;
709 
710  /* If we've done this already, there's nothing to do */
711  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
713  {
714  LWLockRelease(CommitTsLock);
715  return;
716  }
717  LWLockRelease(CommitTsLock);
718 
720  pageno = TransactionIdToCTsPage(xid);
721 
722  /*
723  * Re-Initialize our idea of the latest page number.
724  */
725  pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
726 
727  /*
728  * If CommitTs is enabled, but it wasn't in the previous server run, we
729  * need to set the oldest and newest values to the next Xid; that way, we
730  * will not try to read data that might not have been set.
731  *
732  * XXX does this have a problem if a server is started with commitTs
733  * enabled, then started with commitTs disabled, then restarted with it
734  * enabled again? It doesn't look like it does, because there should be a
735  * checkpoint that sets the value to InvalidTransactionId at end of
736  * recovery; and so any chance of injecting new transactions without
737  * CommitTs values would occur after the oldestCommitTsXid has been set to
738  * Invalid temporarily.
739  */
740  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
742  {
745  }
746  LWLockRelease(CommitTsLock);
747 
748  /* Create the current segment file, if necessary */
750  {
751  LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
752  int slotno;
753 
755  slotno = ZeroCommitTsPage(pageno, false);
757  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
758  LWLockRelease(lock);
759  }
760 
761  /* Change the activation status in shared memory. */
762  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
764  LWLockRelease(CommitTsLock);
765 }
766 
767 /*
768  * Deactivate this module.
769  *
770  * This must be called when the track_commit_timestamp parameter is turned off.
771  * This happens during postmaster or standalone-backend startup, or during WAL
772  * replay.
773  *
774  * Resets CommitTs into invalid state to make sure we don't hand back
775  * possibly-invalid data; also removes segments of old data.
776  */
777 static void
779 {
780  /*
781  * Cleanup the status in the shared memory.
782  *
783  * We reset everything in the commitTsShared record to prevent user from
784  * getting confusing data about last committed transaction on the standby
785  * when the module was activated repeatedly on the primary.
786  */
787  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
788 
793 
796 
797  /*
798  * Remove *all* files. This is necessary so that there are no leftover
799  * files; in the case where this feature is later enabled after running
800  * with it disabled for some time there may be a gap in the file sequence.
801  * (We can probably tolerate out-of-sequence files, as they are going to
802  * be overwritten anyway when we wrap around, but it seems better to be
803  * tidy.)
804  *
805  * Note that we do this with CommitTsLock acquired in exclusive mode. This
806  * is very heavy-handed, but since this routine can only be called in the
807  * replica and should happen very rarely, we don't worry too much about
808  * it. Note also that no process should be consulting this SLRU if we
809  * have just deactivated it.
810  */
812 
813  LWLockRelease(CommitTsLock);
814 }
815 
816 /*
817  * Perform a checkpoint --- either during shutdown, or on-the-fly
818  */
819 void
821 {
822  /*
823  * Write dirty CommitTs pages to disk. This may result in sync requests
824  * queued for later handling by ProcessSyncRequests(), as part of the
825  * checkpoint.
826  */
828 }
829 
830 /*
831  * Make sure that CommitTs has room for a newly-allocated XID.
832  *
833  * NB: this is called while holding XidGenLock. We want it to be very fast
834  * most of the time; even when it's not so fast, no actual I/O need happen
835  * unless we're forced to write out a dirty CommitTs or xlog page to make room
836  * in shared memory.
837  *
838  * NB: the current implementation relies on track_commit_timestamp being
839  * PGC_POSTMASTER.
840  */
841 void
843 {
844  int64 pageno;
845  LWLock *lock;
846 
847  /*
848  * Nothing to do if module not enabled. Note we do an unlocked read of
849  * the flag here, which is okay because this routine is only called from
850  * GetNewTransactionId, which is never called in a standby.
851  */
852  Assert(!InRecovery);
854  return;
855 
856  /*
857  * No work except at first XID of a page. But beware: just after
858  * wraparound, the first XID of page zero is FirstNormalTransactionId.
859  */
860  if (TransactionIdToCTsEntry(newestXact) != 0 &&
862  return;
863 
864  pageno = TransactionIdToCTsPage(newestXact);
865 
866  lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
867 
869 
870  /* Zero the page and make an XLOG entry about it */
871  ZeroCommitTsPage(pageno, !InRecovery);
872 
873  LWLockRelease(lock);
874 }
875 
876 /*
877  * Remove all CommitTs segments before the one holding the passed
878  * transaction ID.
879  *
880  * Note that we don't need to flush XLOG here.
881  */
882 void
884 {
885  int64 cutoffPage;
886 
887  /*
888  * The cutoff point is the start of the segment containing oldestXact. We
889  * pass the *page* containing oldestXact to SimpleLruTruncate.
890  */
891  cutoffPage = TransactionIdToCTsPage(oldestXact);
892 
893  /* Check to see if there's any files that could be removed */
895  &cutoffPage))
896  return; /* nothing to remove */
897 
898  /* Write XLOG record */
899  WriteTruncateXlogRec(cutoffPage, oldestXact);
900 
901  /* Now we can remove the old CommitTs segment(s) */
902  SimpleLruTruncate(CommitTsCtl, cutoffPage);
903 }
904 
905 /*
906  * Set the limit values between which commit TS can be consulted.
907  */
908 void
910 {
911  /*
912  * Be careful not to overwrite values that are either further into the
913  * "future" or signal a disabled committs.
914  */
915  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
917  {
919  TransamVariables->oldestCommitTsXid = oldestXact;
921  TransamVariables->newestCommitTsXid = newestXact;
922  }
923  else
924  {
926  TransamVariables->oldestCommitTsXid = oldestXact;
927  TransamVariables->newestCommitTsXid = newestXact;
928  }
929  LWLockRelease(CommitTsLock);
930 }
931 
932 /*
933  * Move forwards the oldest commitTS value that can be consulted
934  */
935 void
937 {
938  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
941  TransamVariables->oldestCommitTsXid = oldestXact;
942  LWLockRelease(CommitTsLock);
943 }
944 
945 
946 /*
947  * Decide whether a commitTS page number is "older" for truncation purposes.
948  * Analogous to CLOGPagePrecedes().
949  *
950  * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
951  * introduces differences compared to CLOG and the other SLRUs having (1 <<
952  * 31) % per_page == 0. This function never tests exactly
953  * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
954  * there are two possible counts of page boundaries between oldestXact and the
955  * latest XID assigned, depending on whether oldestXact is within the first
956  * 128 entries of its page. Since this function doesn't know the location of
957  * oldestXact within page2, it returns false for one page that actually is
958  * expendable. This is a wider (yet still negligible) version of the
959  * truncation opportunity that CLOGPagePrecedes() cannot recognize.
960  *
961  * For the sake of a worked example, number entries with decimal values such
962  * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
963  * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
964  * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
965  * because entry=2.85 is the border that toggles whether entries precede the
966  * last entry of the oldestXact page. While page 2 is expendable at
967  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
968  */
969 static bool
970 CommitTsPagePrecedes(int64 page1, int64 page2)
971 {
972  TransactionId xid1;
973  TransactionId xid2;
974 
975  xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
976  xid1 += FirstNormalTransactionId + 1;
977  xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
978  xid2 += FirstNormalTransactionId + 1;
979 
980  return (TransactionIdPrecedes(xid1, xid2) &&
982 }
983 
984 
985 /*
986  * Write a ZEROPAGE xlog record
987  */
988 static void
989 WriteZeroPageXlogRec(int64 pageno)
990 {
991  XLogBeginInsert();
992  XLogRegisterData((char *) (&pageno), sizeof(pageno));
993  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
994 }
995 
996 /*
997  * Write a TRUNCATE xlog record
998  */
999 static void
1000 WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
1001 {
1002  xl_commit_ts_truncate xlrec;
1003 
1004  xlrec.pageno = pageno;
1005  xlrec.oldestXid = oldestXid;
1006 
1007  XLogBeginInsert();
1008  XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
1009  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
1010 }
1011 
1012 /*
1013  * CommitTS resource manager's routines
1014  */
1015 void
1017 {
1018  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1019 
1020  /* Backup blocks are not used in commit_ts records */
1021  Assert(!XLogRecHasAnyBlockRefs(record));
1022 
1023  if (info == COMMIT_TS_ZEROPAGE)
1024  {
1025  int64 pageno;
1026  int slotno;
1027  LWLock *lock;
1028 
1029  memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1030 
1031  lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
1032  LWLockAcquire(lock, LW_EXCLUSIVE);
1033 
1034  slotno = ZeroCommitTsPage(pageno, false);
1036  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1037 
1038  LWLockRelease(lock);
1039  }
1040  else if (info == COMMIT_TS_TRUNCATE)
1041  {
1043 
1045 
1046  /*
1047  * During XLOG replay, latest_page_number isn't set up yet; insert a
1048  * suitable value to bypass the sanity test in SimpleLruTruncate.
1049  */
1050  pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1051  trunc->pageno);
1052 
1054  }
1055  else
1056  elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1057 }
1058 
1059 /*
1060  * Entrypoint for sync.c to sync commit_ts files.
1061  */
1062 int
1063 committssyncfiletag(const FileTag *ftag, char *path)
1064 {
1065  return SlruSyncFileTag(CommitTsCtl, ftag, path);
1066 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:480
static Datum values[MAXATTR]
Definition: bootstrap.c:152
#define Min(x, y)
Definition: c.h:991
#define Max(x, y)
Definition: c.h:985
unsigned char uint8
Definition: c.h:491
uint32 TransactionId
Definition: c.h:639
size_t Size
Definition: c.h:592
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno)
Definition: commit_ts.c:222
static void WriteZeroPageXlogRec(int64 pageno)
Definition: commit_ts.c:989
void StartupCommitTs(void)
Definition: commit_ts.c:632
static SlruCtlData CommitTsCtlData
Definition: commit_ts.c:83
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
Definition: commit_ts.c:464
struct CommitTimestampEntry CommitTimestampEntry
struct CommitTimestampShared CommitTimestampShared
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition: commit_ts.c:420
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:360
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition: commit_ts.c:664
#define COMMIT_TS_XACTS_PER_PAGE
Definition: commit_ts.c:63
#define TransactionIdToCTsEntry(xid)
Definition: commit_ts.c:77
static void DeactivateCommitTs(void)
Definition: commit_ts.c:778
Size CommitTsShmemSize(void)
Definition: commit_ts.c:519
bool track_commit_timestamp
Definition: commit_ts.c:109
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition: commit_ts.c:936
static CommitTimestampShared * commitTsShared
Definition: commit_ts.c:105
int committssyncfiletag(const FileTag *ftag, char *path)
Definition: commit_ts.c:1063
void CompleteCommitTsInitialization(void)
Definition: commit_ts.c:642
bool check_commit_ts_buffers(int *newval, void **extra, GucSource source)
Definition: commit_ts.c:584
static void ActivateCommitTs(void)
Definition: commit_ts.c:705
static int64 TransactionIdToCTsPage(TransactionId xid)
Definition: commit_ts.c:72
void TruncateCommitTs(TransactionId oldestXact)
Definition: commit_ts.c:883
void commit_ts_redo(XLogReaderState *record)
Definition: commit_ts.c:1016
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:274
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition: commit_ts.c:249
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition: commit_ts.c:397
static int CommitTsShmemBuffers(void)
Definition: commit_ts.c:506
static void error_commit_ts_disabled(void)
Definition: commit_ts.c:381
static bool CommitTsPagePrecedes(int64 page1, int64 page2)
Definition: commit_ts.c:970
#define SizeOfCommitTimestampEntry
Definition: commit_ts.c:60
void BootStrapCommitTs(void)
Definition: commit_ts.c:596
void CommitTsShmemInit(void)
Definition: commit_ts.c:530
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition: commit_ts.c:909
#define CommitTsCtl
Definition: commit_ts.c:85
void ExtendCommitTs(TransactionId newestXact)
Definition: commit_ts.c:842
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
Definition: commit_ts.c:141
static int ZeroCommitTsPage(int64 pageno, bool writeXlog)
Definition: commit_ts.c:615
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
Definition: commit_ts.c:1000
void CheckPointCommitTs(void)
Definition: commit_ts.c:820
#define COMMIT_TS_ZEROPAGE
Definition: commit_ts.h:46
#define SizeOfCommitTsTruncate
Definition: commit_ts.h:66
#define COMMIT_TS_TRUNCATE
Definition: commit_ts.h:47
int64 TimestampTz
Definition: timestamp.h:39
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:159
int errhint(const char *fmt,...)
Definition: elog.c:1319
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_GETARG_TRANSACTIONID(n)
Definition: fmgr.h:279
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
bool IsUnderPostmaster
Definition: globals.c:117
int commit_timestamp_buffers
Definition: globals.c:162
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4275
#define newval
GucSource
Definition: guc.h:108
@ PGC_S_DYNAMIC_DEFAULT
Definition: guc.h:110
@ PGC_S_OVERRIDE
Definition: guc.h:119
@ PGC_POSTMASTER
Definition: guc.h:70
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
int j
Definition: isn.c:74
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1169
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1782
@ LWTRANCHE_COMMITTS_BUFFER
Definition: lwlock.h:180
@ LWTRANCHE_COMMITTS_SLRU
Definition: lwlock.h:210
@ LW_SHARED
Definition: lwlock.h:115
@ LW_EXCLUSIVE
Definition: lwlock.h:114
#define InvalidRepOriginId
Definition: origin.h:33
static rewind_source * source
Definition: pg_rewind.c:89
static char * buf
Definition: pg_test_fsync.c:73
int64 timestamp
#define snprintf
Definition: port.h:238
static Datum TransactionIdGetDatum(TransactionId X)
Definition: postgres.h:272
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
unsigned int Oid
Definition: postgres_ext.h:31
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition: slru.c:238
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
Definition: slru.c:591
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:715
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
Definition: slru.c:1305
int SimpleLruAutotuneBuffers(int divisor, int max)
Definition: slru.c:217
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)
Definition: slru.c:729
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1774
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1727
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition: slru.c:488
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
Definition: slru.c:1814
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition: slru.c:361
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition: slru.c:1391
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:184
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1695
bool check_slru_buffers(const char *name, int *newval)
Definition: slru.c:341
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition: slru.h:179
#define SlruPagePrecedesUnitTests(ctl, per_page)
Definition: slru.h:203
#define SLRU_MAX_ALLOWED_BUFFERS
Definition: slru.h:24
TimestampTz time
Definition: commit_ts.c:56
RepOriginId nodeid
Definition: commit_ts.c:57
CommitTimestampEntry dataLastCommit
Definition: commit_ts.c:101
TransactionId xidLastCommit
Definition: commit_ts.c:100
Definition: sync.h:51
Definition: lwlock.h:42
TransactionId oldestCommitTsXid
Definition: transam.h:232
TransactionId newestCommitTsXid
Definition: transam.h:233
FullTransactionId nextXid
Definition: transam.h:220
TransactionId oldestXid
Definition: commit_ts.h:63
@ SYNC_HANDLER_COMMIT_TS
Definition: sync.h:39
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
static TransactionId ReadNextTransactionId(void)
Definition: transam.h:315
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define FirstNormalTransactionId
Definition: transam.h:34
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
#define PG_RETURN_TIMESTAMPTZ(x)
Definition: timestamp.h:68
TransamVariablesData * TransamVariables
Definition: varsup.c:34
bool RecoveryInProgress(void)
Definition: xlog.c:6201
uint16 RepOriginId
Definition: xlogdefs.h:65
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:364
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogBeginInsert(void)
Definition: xloginsert.c:149
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:417
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
bool InRecovery
Definition: xlogutils.c:50