PostgreSQL Source Code git master
commit_ts.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * commit_ts.c
4 * PostgreSQL commit timestamp manager
5 *
6 * This module is a pg_xact-like system that stores the commit timestamp
7 * for each transaction.
8 *
9 * XLOG interactions: this module generates an XLOG record whenever a new
10 * CommitTs page is initialized to zeroes. Other writes of CommitTS come
11 * from recording of transaction commit in xact.c, which generates its own
12 * XLOG records for these events and will re-perform the status update on
13 * redo; so we need make no additional XLOG entry here.
14 *
15 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
16 * Portions Copyright (c) 1994, Regents of the University of California
17 *
18 * src/backend/access/transam/commit_ts.c
19 *
20 *-------------------------------------------------------------------------
21 */
22#include "postgres.h"
23
24#include "access/commit_ts.h"
25#include "access/htup_details.h"
26#include "access/slru.h"
27#include "access/transam.h"
28#include "access/xloginsert.h"
29#include "access/xlogutils.h"
30#include "funcapi.h"
31#include "miscadmin.h"
32#include "storage/shmem.h"
33#include "utils/fmgrprotos.h"
34#include "utils/guc_hooks.h"
35#include "utils/timestamp.h"
36
37/*
38 * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
39 * everywhere else in Postgres.
40 *
41 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
42 * CommitTs page numbering also wraps around at
43 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
44 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
45 * explicit notice of that fact in this module, except when comparing segment
46 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
47 */
48
49/*
50 * We need 8+2 bytes per xact. Note that enlarging this struct might mean
51 * the largest possible file name is more than 5 chars long; see
52 * SlruScanDirectory.
53 */
55{
59
/*
 * Size used for one entry when laying entries out on a page: the bytes from
 * the start of CommitTimestampEntry through the end of its nodeid field.
 */
60#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
61 sizeof(RepOriginId))
62
/* Number of SizeOfCommitTimestampEntry-sized entries that fit in one BLCKSZ page. */
63#define COMMIT_TS_XACTS_PER_PAGE \
64 (BLCKSZ / SizeOfCommitTimestampEntry)
65
66
67/*
68 * Although we return an int64 the actual value can't currently exceed
69 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
70 */
71static inline int64
73{
74 return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
75}
76
/*
 * Index of xid's entry within its CommitTs page: xid modulo the number of
 * entries per page.
 */
77#define TransactionIdToCTsEntry(xid) \
78 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
79
80/*
81 * Link to shared-memory data structures for CommitTs control
82 */
84
85#define CommitTsCtl (&CommitTsCtlData)
86
87/*
88 * We keep a cache of the last value set in shared memory.
89 *
90 * This is also a good place to keep the activation status. We keep this
91 * separate from the GUC so that the standby can activate the module if the
92 * primary has it active independently of the value of the GUC.
93 *
94 * This is protected by CommitTsLock. In some places, we use commitTsActive
95 * without acquiring the lock; where this happens, a comment explains the
96 * rationale for it.
97 */
99{
104
106
107
108/* GUC variable */
110
111static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
112 TransactionId *subxids, TimestampTz ts,
113 RepOriginId nodeid, int64 pageno);
115 RepOriginId nodeid, int slotno);
116static void error_commit_ts_disabled(void);
117static int ZeroCommitTsPage(int64 pageno, bool writeXlog);
118static bool CommitTsPagePrecedes(int64 page1, int64 page2);
119static void ActivateCommitTs(void);
120static void DeactivateCommitTs(void);
121static void WriteZeroPageXlogRec(int64 pageno);
122static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
123
124/*
125 * TransactionTreeSetCommitTsData
126 *
127 * Record the final commit timestamp of transaction entries in the commit log
128 * for a transaction and its subtransaction tree, as efficiently as possible.
129 *
130 * xid is the top level transaction id.
131 *
132 * subxids is an array of xids of length nsubxids, representing subtransactions
133 * in the tree of xid. In various cases nsubxids may be zero.
134 * The reason why tracking just the parent xid commit timestamp is not enough
135 * is that the subtrans SLRU does not stay valid across crashes (it's not
136 * permanent) so we need to keep the information about them here. If the
137 * subtrans implementation changes in the future, we might want to revisit the
138 * decision of storing timestamp info for each subxid.
139 */
140void
143 RepOriginId nodeid)
144{
145 int i;
146 TransactionId headxid;
147 TransactionId newestXact;
148
149 /*
150 * No-op if the module is not active.
151 *
152 * An unlocked read here is fine, because in a standby (the only place
153 * where the flag can change in flight) this routine is only called by the
154 * recovery process, which is also the only process which can change the
155 * flag.
156 */
158 return;
159
160 /*
161 * Figure out the latest Xid in this batch: either the last subxid if
162 * there's any, otherwise the parent xid.
163 */
164 if (nsubxids > 0)
165 newestXact = subxids[nsubxids - 1];
166 else
167 newestXact = xid;
168
169 /*
170 * We split the xids to set the timestamp to in groups belonging to the
171 * same SLRU page; the first element in each such set is its head. The
172 * first group has the main XID as the head; subsequent sets use the first
173 * subxid not on the previous page as head. This way, we only have to
174 * lock/modify each SLRU page once.
175 */
176 headxid = xid;
177 i = 0;
178 for (;;)
179 {
180 int64 pageno = TransactionIdToCTsPage(headxid);
181 int j;
182
183 for (j = i; j < nsubxids; j++)
184 {
185 if (TransactionIdToCTsPage(subxids[j]) != pageno)
186 break;
187 }
188 /* subxids[i..j] are on the same page as the head */
189
190 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
191 pageno);
192
193 /* if we wrote out all subxids, we're done. */
194 if (j >= nsubxids)
195 break;
196
197 /*
198 * Set the new head and skip over it, as well as over the subxids we
199 * just wrote.
200 */
201 headxid = subxids[j];
202 i = j + 1;
203 }
204
205 /* update the cached value in shared memory */
206 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
210
211 /* and move forwards our endpoint, if needed */
214 LWLockRelease(CommitTsLock);
215}
216
217/*
218 * Record the commit timestamp of transaction entries in the commit log for all
219 * entries on a single page. Atomic only on this page.
220 */
221static void
223 TransactionId *subxids, TimestampTz ts,
224 RepOriginId nodeid, int64 pageno)
225{
226 LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
227 int slotno;
228 int i;
229
231
232 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
233
234 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
235 for (i = 0; i < nsubxids; i++)
236 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
237
238 CommitTsCtl->shared->page_dirty[slotno] = true;
239
240 LWLockRelease(lock);
241}
242
243/*
244 * Sets the commit timestamp of a single transaction.
245 *
246 * Caller must hold the correct SLRU bank lock; it will still be held at exit.
247 */
248static void
250 RepOriginId nodeid, int slotno)
251{
252 int entryno = TransactionIdToCTsEntry(xid);
254
256
257 entry.time = ts;
258 entry.nodeid = nodeid;
259
260 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
263}
264
265/*
266 * Interrogate the commit timestamp of a transaction.
267 *
268 * The return value indicates whether a commit timestamp record was found for
269 * the given xid. The timestamp value is returned in *ts (which may not be
270 * null), and the origin node for the Xid is returned in *nodeid, if it's not
271 * null.
272 */
273bool
275 RepOriginId *nodeid)
276{
277 int64 pageno = TransactionIdToCTsPage(xid);
278 int entryno = TransactionIdToCTsEntry(xid);
279 int slotno;
281 TransactionId oldestCommitTsXid;
282 TransactionId newestCommitTsXid;
283
284 if (!TransactionIdIsValid(xid))
286 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
287 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
288 else if (!TransactionIdIsNormal(xid))
289 {
290 /* frozen and bootstrap xids are always committed far in the past */
291 *ts = 0;
292 if (nodeid)
293 *nodeid = 0;
294 return false;
295 }
296
297 LWLockAcquire(CommitTsLock, LW_SHARED);
298
299 /* Error if module not enabled */
302
303 /*
304 * If we're asked for the cached value, return that. Otherwise, fall
305 * through to read from SLRU.
306 */
307 if (commitTsShared->xidLastCommit == xid)
308 {
310 if (nodeid)
312
313 LWLockRelease(CommitTsLock);
314 return *ts != 0;
315 }
316
317 oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
318 newestCommitTsXid = TransamVariables->newestCommitTsXid;
319 /* neither is invalid, or both are */
320 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
321 LWLockRelease(CommitTsLock);
322
323 /*
324 * Return empty if the requested value is outside our valid range.
325 */
326 if (!TransactionIdIsValid(oldestCommitTsXid) ||
327 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
328 TransactionIdPrecedes(newestCommitTsXid, xid))
329 {
330 *ts = 0;
331 if (nodeid)
332 *nodeid = InvalidRepOriginId;
333 return false;
334 }
335
336 /* lock is acquired by SimpleLruReadPage_ReadOnly */
337 slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
338 memcpy(&entry,
339 CommitTsCtl->shared->page_buffer[slotno] +
342
343 *ts = entry.time;
344 if (nodeid)
345 *nodeid = entry.nodeid;
346
348 return *ts != 0;
349}
350
351/*
352 * Return the Xid of the latest committed transaction. (As far as this module
353 * is concerned, anyway; it's up to the caller to ensure the value is useful
354 * for its purposes.)
355 *
356 * ts and nodeid are filled with the corresponding data; they can be passed
357 * as NULL if not wanted.
358 */
361{
362 TransactionId xid;
363
364 LWLockAcquire(CommitTsLock, LW_SHARED);
365
366 /* Error if module not enabled */
369
371 if (ts)
373 if (nodeid)
375 LWLockRelease(CommitTsLock);
376
377 return xid;
378}
379
380static void
382{
384 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385 errmsg("could not get commit timestamp data"),
387 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
388 "track_commit_timestamp") :
389 errhint("Make sure the configuration parameter \"%s\" is set.",
390 "track_commit_timestamp")));
391}
392
393/*
394 * SQL-callable wrapper to obtain commit time of a transaction
395 */
396Datum
398{
400 TimestampTz ts;
401 bool found;
402
403 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
404
405 if (!found)
407
409}
410
411
412/*
413 * pg_last_committed_xact
414 *
415 * SQL-callable wrapper to obtain some information about the latest
416 * committed transaction: transaction ID, timestamp and replication
417 * origin.
418 */
419Datum
421{
422 TransactionId xid;
423 RepOriginId nodeid;
424 TimestampTz ts;
425 Datum values[3];
426 bool nulls[3];
427 TupleDesc tupdesc;
428 HeapTuple htup;
429
430 /* fetch the data from which the result tuple is constructed below */
431 xid = GetLatestCommitTsData(&ts, &nodeid);
432
433 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
434 elog(ERROR, "return type must be a row type");
435
436 if (!TransactionIdIsNormal(xid))
437 {
438 memset(nulls, true, sizeof(nulls));
439 }
440 else
441 {
443 nulls[0] = false;
444
446 nulls[1] = false;
447
448 values[2] = ObjectIdGetDatum((Oid) nodeid);
449 nulls[2] = false;
450 }
451
452 htup = heap_form_tuple(tupdesc, values, nulls);
453
455}
456
457/*
458 * pg_xact_commit_timestamp_origin
459 *
460 * SQL-callable wrapper to obtain commit timestamp and replication origin
461 * of a given transaction.
462 */
463Datum
465{
467 RepOriginId nodeid;
468 TimestampTz ts;
469 Datum values[2];
470 bool nulls[2];
471 TupleDesc tupdesc;
472 HeapTuple htup;
473 bool found;
474
475 found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
476
477 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
478 elog(ERROR, "return type must be a row type");
479
480 if (!found)
481 {
482 memset(nulls, true, sizeof(nulls));
483 }
484 else
485 {
487 nulls[0] = false;
488
489 values[1] = ObjectIdGetDatum((Oid) nodeid);
490 nulls[1] = false;
491 }
492
493 htup = heap_form_tuple(tupdesc, values, nulls);
494
496}
497
498/*
499 * Number of shared CommitTS buffers.
500 *
501 * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
502 * Otherwise just cap the configured amount to be between 16 and the maximum
503 * allowed.
504 */
505static int
507{
508 /* auto-tune based on shared buffers */
510 return SimpleLruAutotuneBuffers(512, 1024);
511
513}
514
515/*
516 * Shared memory sizing for CommitTs
517 */
518Size
520{
522 sizeof(CommitTimestampShared);
523}
524
525/*
526 * Initialize CommitTs at system startup (postmaster start or standalone
527 * backend)
528 */
529void
531{
532 bool found;
533
534 /* If auto-tuning is requested, now is the time to do it */
536 {
537 char buf[32];
538
539 snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
540 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
542
543 /*
544 * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
545 * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
546 * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
547 * that and we must force the matter with PGC_S_OVERRIDE.
548 */
549 if (commit_timestamp_buffers == 0) /* failed to apply it? */
550 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
552 }
554
555 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
556 SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
557 "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
560 false);
562
563 commitTsShared = ShmemInitStruct("CommitTs shared",
564 sizeof(CommitTimestampShared),
565 &found);
566
568 {
569 Assert(!found);
570
575 }
576 else
577 Assert(found);
578}
579
580/*
581 * GUC check_hook for commit_timestamp_buffers
582 */
583bool
585{
586 return check_slru_buffers("commit_timestamp_buffers", newval);
587}
588
589/*
590 * This function must be called ONCE on system install.
591 *
592 * (The CommitTs directory is assumed to have been created by initdb, and
593 * CommitTsShmemInit must have been called already.)
594 */
595void
597{
598 /*
599 * Nothing to do here at present, unlike most other SLRU modules; segments
600 * are created when the server is started with this module enabled. See
601 * ActivateCommitTs.
602 */
603}
604
605/*
606 * Initialize (or reinitialize) a page of CommitTs to zeroes.
607 * If writeXlog is true, also emit an XLOG record saying we did this.
608 *
609 * The page is not actually written, just set up in shared memory.
610 * The slot number of the new page is returned.
611 *
612 * Control lock must be held at entry, and will be held at exit.
613 */
614static int
615ZeroCommitTsPage(int64 pageno, bool writeXlog)
616{
617 int slotno;
618
619 slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
620
621 if (writeXlog)
622 WriteZeroPageXlogRec(pageno);
623
624 return slotno;
625}
626
627/*
628 * This must be called ONCE during postmaster or standalone-backend startup,
629 * after StartupXLOG has initialized TransamVariables->nextXid.
630 */
631void
633{
635}
636
637/*
638 * This must be called ONCE during postmaster or standalone-backend startup,
639 * after recovery has finished.
640 */
641void
643{
644 /*
645 * If the feature is not enabled, turn it off for good. This also removes
646 * any leftover data.
647 *
648 * Conversely, we activate the module if the feature is enabled. This is
649 * necessary for primary and standby as the activation depends on the
650 * control file contents at the beginning of recovery or when an
651 * XLOG_PARAMETER_CHANGE is replayed.
652 */
655 else
657}
658
659/*
660 * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
661 * XLog record during recovery.
662 */
663void
664CommitTsParameterChange(bool newvalue, bool oldvalue)
665{
666 /*
667 * If the commit_ts module is disabled in this server and we get word from
668 * the primary server that it is enabled there, activate it so that we can
669 * replay future WAL records involving it; also mark it as active on
670 * pg_control. If the old value was already set, we already did this, so
671 * don't do anything.
672 *
673 * If the module is disabled in the primary, disable it here too, unless
674 * the module is enabled locally.
675 *
676 * Note this only runs in the recovery process, so an unlocked read is
677 * fine.
678 */
679 if (newvalue)
680 {
683 }
686}
687
688/*
689 * Activate this module whenever necessary.
690 * This must happen during postmaster or standalone-backend startup,
691 * or during WAL replay anytime the track_commit_timestamp setting is
692 * changed in the primary.
693 *
694 * The reason why this SLRU needs separate activation/deactivation functions is
695 * that it can be enabled/disabled during start and the activation/deactivation
696 * on the primary is propagated to the standby via replay. Other SLRUs don't
697 * have this property and they can be just initialized during normal startup.
698 *
699 * This is in charge of creating the currently active segment, if it's not
700 * already there. The reason for this is that the server might have been
701 * running with this module disabled for a while and thus might have skipped
702 * the normal creation point.
703 */
704static void
706{
707 TransactionId xid;
708 int64 pageno;
709
710 /* If we've done this already, there's nothing to do */
711 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
713 {
714 LWLockRelease(CommitTsLock);
715 return;
716 }
717 LWLockRelease(CommitTsLock);
718
720 pageno = TransactionIdToCTsPage(xid);
721
722 /*
723 * Re-Initialize our idea of the latest page number.
724 */
725 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
726
727 /*
728 * If CommitTs is enabled, but it wasn't in the previous server run, we
729 * need to set the oldest and newest values to the next Xid; that way, we
730 * will not try to read data that might not have been set.
731 *
732 * XXX does this have a problem if a server is started with commitTs
733 * enabled, then started with commitTs disabled, then restarted with it
734 * enabled again? It doesn't look like it does, because there should be a
735 * checkpoint that sets the value to InvalidTransactionId at end of
736 * recovery; and so any chance of injecting new transactions without
737 * CommitTs values would occur after the oldestCommitTsXid has been set to
738 * Invalid temporarily.
739 */
740 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
742 {
745 }
746 LWLockRelease(CommitTsLock);
747
748 /* Create the current segment file, if necessary */
750 {
751 LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
752 int slotno;
753
755 slotno = ZeroCommitTsPage(pageno, false);
757 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
758 LWLockRelease(lock);
759 }
760
761 /* Change the activation status in shared memory. */
762 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
764 LWLockRelease(CommitTsLock);
765}
766
767/*
768 * Deactivate this module.
769 *
770 * This must be called when the track_commit_timestamp parameter is turned off.
771 * This happens during postmaster or standalone-backend startup, or during WAL
772 * replay.
773 *
774 * Resets CommitTs into invalid state to make sure we don't hand back
775 * possibly-invalid data; also removes segments of old data.
776 */
777static void
779{
780 /*
781 * Clean up the status in shared memory.
782 *
783 * We reset everything in the commitTsShared record to prevent user from
784 * getting confusing data about last committed transaction on the standby
785 * when the module was activated repeatedly on the primary.
786 */
787 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
788
793
796
797 /*
798 * Remove *all* files. This is necessary so that there are no leftover
799 * files; in the case where this feature is later enabled after running
800 * with it disabled for some time there may be a gap in the file sequence.
801 * (We can probably tolerate out-of-sequence files, as they are going to
802 * be overwritten anyway when we wrap around, but it seems better to be
803 * tidy.)
804 *
805 * Note that we do this with CommitTsLock acquired in exclusive mode. This
806 * is very heavy-handed, but since this routine can only be called in the
807 * replica and should happen very rarely, we don't worry too much about
808 * it. Note also that no process should be consulting this SLRU if we
809 * have just deactivated it.
810 */
812
813 LWLockRelease(CommitTsLock);
814}
815
816/*
817 * Perform a checkpoint --- either during shutdown, or on-the-fly
818 */
819void
821{
822 /*
823 * Write dirty CommitTs pages to disk. This may result in sync requests
824 * queued for later handling by ProcessSyncRequests(), as part of the
825 * checkpoint.
826 */
828}
829
830/*
831 * Make sure that CommitTs has room for a newly-allocated XID.
832 *
833 * NB: this is called while holding XidGenLock. We want it to be very fast
834 * most of the time; even when it's not so fast, no actual I/O need happen
835 * unless we're forced to write out a dirty CommitTs or xlog page to make room
836 * in shared memory.
837 *
838 * NB: the current implementation relies on track_commit_timestamp being
839 * PGC_POSTMASTER.
840 */
841void
843{
844 int64 pageno;
845 LWLock *lock;
846
847 /*
848 * Nothing to do if module not enabled. Note we do an unlocked read of
849 * the flag here, which is okay because this routine is only called from
850 * GetNewTransactionId, which is never called in a standby.
851 */
854 return;
855
856 /*
857 * No work except at first XID of a page. But beware: just after
858 * wraparound, the first XID of page zero is FirstNormalTransactionId.
859 */
860 if (TransactionIdToCTsEntry(newestXact) != 0 &&
862 return;
863
864 pageno = TransactionIdToCTsPage(newestXact);
865
866 lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
867
869
870 /* Zero the page and make an XLOG entry about it */
872
873 LWLockRelease(lock);
874}
875
876/*
877 * Remove all CommitTs segments before the one holding the passed
878 * transaction ID.
879 *
880 * Note that we don't need to flush XLOG here.
881 */
882void
884{
885 int64 cutoffPage;
886
887 /*
888 * The cutoff point is the start of the segment containing oldestXact. We
889 * pass the *page* containing oldestXact to SimpleLruTruncate.
890 */
891 cutoffPage = TransactionIdToCTsPage(oldestXact);
892
893 /* Check to see if there are any files that could be removed */
895 &cutoffPage))
896 return; /* nothing to remove */
897
898 /* Write XLOG record */
899 WriteTruncateXlogRec(cutoffPage, oldestXact);
900
901 /* Now we can remove the old CommitTs segment(s) */
902 SimpleLruTruncate(CommitTsCtl, cutoffPage);
903}
904
905/*
906 * Set the limit values between which commit TS can be consulted.
907 */
908void
910{
911 /*
912 * Be careful not to overwrite values that are either further into the
913 * "future" or signal a disabled committs.
914 */
915 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
917 {
922 }
923 else
924 {
928 }
929 LWLockRelease(CommitTsLock);
930}
931
932/*
933 * Move forwards the oldest commitTS value that can be consulted
934 */
935void
937{
938 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
942 LWLockRelease(CommitTsLock);
943}
944
945
946/*
947 * Decide whether a commitTS page number is "older" for truncation purposes.
948 * Analogous to CLOGPagePrecedes().
949 *
950 * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
951 * introduces differences compared to CLOG and the other SLRUs having (1 <<
952 * 31) % per_page == 0. This function never tests exactly
953 * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
954 * there are two possible counts of page boundaries between oldestXact and the
955 * latest XID assigned, depending on whether oldestXact is within the first
956 * 128 entries of its page. Since this function doesn't know the location of
957 * oldestXact within page2, it returns false for one page that actually is
958 * expendable. This is a wider (yet still negligible) version of the
959 * truncation opportunity that CLOGPagePrecedes() cannot recognize.
960 *
961 * For the sake of a worked example, number entries with decimal values such
962 * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
963 * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
964 * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
965 * because entry=2.85 is the border that toggles whether entries precede the
966 * last entry of the oldestXact page. While page 2 is expendable at
967 * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
968 */
969static bool
971{
972 TransactionId xid1;
973 TransactionId xid2;
974
975 xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
976 xid1 += FirstNormalTransactionId + 1;
977 xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
978 xid2 += FirstNormalTransactionId + 1;
979
980 return (TransactionIdPrecedes(xid1, xid2) &&
982}
983
984
985/*
986 * Write a ZEROPAGE xlog record
987 */
988static void
990{
992 XLogRegisterData(&pageno, sizeof(pageno));
993 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
994}
995
996/*
997 * Write a TRUNCATE xlog record
998 */
999static void
1001{
1003
1004 xlrec.pageno = pageno;
1005 xlrec.oldestXid = oldestXid;
1006
1009 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
1010}
1011
1012/*
1013 * CommitTS resource manager's routines
1014 */
1015void
1017{
1018 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1019
1020 /* Backup blocks are not used in commit_ts records */
1022
1023 if (info == COMMIT_TS_ZEROPAGE)
1024 {
1025 int64 pageno;
1026 int slotno;
1027 LWLock *lock;
1028
1029 memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1030
1031 lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
1033
1034 slotno = ZeroCommitTsPage(pageno, false);
1036 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1037
1038 LWLockRelease(lock);
1039 }
1040 else if (info == COMMIT_TS_TRUNCATE)
1041 {
1043
1045
1046 /*
1047 * During XLOG replay, latest_page_number isn't set up yet; insert a
1048 * suitable value to bypass the sanity test in SimpleLruTruncate.
1049 */
1050 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1051 trunc->pageno);
1052
1054 }
1055 else
1056 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1057}
1058
1059/*
1060 * Entrypoint for sync.c to sync commit_ts files.
1061 */
1062int
1063committssyncfiletag(const FileTag *ftag, char *path)
1064{
1065 return SlruSyncFileTag(CommitTsCtl, ftag, path);
1066}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:485
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define Min(x, y)
Definition: c.h:975
uint8_t uint8
Definition: c.h:500
#define Max(x, y)
Definition: c.h:969
int64_t int64
Definition: c.h:499
uint32 TransactionId
Definition: c.h:623
size_t Size
Definition: c.h:576
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno)
Definition: commit_ts.c:222
static void WriteZeroPageXlogRec(int64 pageno)
Definition: commit_ts.c:989
void StartupCommitTs(void)
Definition: commit_ts.c:632
static SlruCtlData CommitTsCtlData
Definition: commit_ts.c:83
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
Definition: commit_ts.c:464
struct CommitTimestampEntry CommitTimestampEntry
struct CommitTimestampShared CommitTimestampShared
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition: commit_ts.c:420
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:360
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition: commit_ts.c:664
#define COMMIT_TS_XACTS_PER_PAGE
Definition: commit_ts.c:63
#define TransactionIdToCTsEntry(xid)
Definition: commit_ts.c:77
static void DeactivateCommitTs(void)
Definition: commit_ts.c:778
Size CommitTsShmemSize(void)
Definition: commit_ts.c:519
bool track_commit_timestamp
Definition: commit_ts.c:109
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition: commit_ts.c:936
static CommitTimestampShared * commitTsShared
Definition: commit_ts.c:105
int committssyncfiletag(const FileTag *ftag, char *path)
Definition: commit_ts.c:1063
void CompleteCommitTsInitialization(void)
Definition: commit_ts.c:642
bool check_commit_ts_buffers(int *newval, void **extra, GucSource source)
Definition: commit_ts.c:584
static void ActivateCommitTs(void)
Definition: commit_ts.c:705
static int64 TransactionIdToCTsPage(TransactionId xid)
Definition: commit_ts.c:72
void TruncateCommitTs(TransactionId oldestXact)
Definition: commit_ts.c:883
void commit_ts_redo(XLogReaderState *record)
Definition: commit_ts.c:1016
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:274
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition: commit_ts.c:249
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition: commit_ts.c:397
static int CommitTsShmemBuffers(void)
Definition: commit_ts.c:506
static void error_commit_ts_disabled(void)
Definition: commit_ts.c:381
static bool CommitTsPagePrecedes(int64 page1, int64 page2)
Definition: commit_ts.c:970
#define SizeOfCommitTimestampEntry
Definition: commit_ts.c:60
void BootStrapCommitTs(void)
Definition: commit_ts.c:596
void CommitTsShmemInit(void)
Definition: commit_ts.c:530
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition: commit_ts.c:909
#define CommitTsCtl
Definition: commit_ts.c:85
void ExtendCommitTs(TransactionId newestXact)
Definition: commit_ts.c:842
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
Definition: commit_ts.c:141
static int ZeroCommitTsPage(int64 pageno, bool writeXlog)
Definition: commit_ts.c:615
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
Definition: commit_ts.c:1000
void CheckPointCommitTs(void)
Definition: commit_ts.c:820
#define COMMIT_TS_ZEROPAGE
Definition: commit_ts.h:46
#define SizeOfCommitTsTruncate
Definition: commit_ts.h:66
#define COMMIT_TS_TRUNCATE
Definition: commit_ts.h:47
int64 TimestampTz
Definition: timestamp.h:39
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:159
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_GETARG_TRANSACTIONID(n)
Definition: fmgr.h:279
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
bool IsUnderPostmaster
Definition: globals.c:119
int commit_timestamp_buffers
Definition: globals.c:160
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4332
#define newval
GucSource
Definition: guc.h:112
@ PGC_S_DYNAMIC_DEFAULT
Definition: guc.h:114
@ PGC_S_OVERRIDE
Definition: guc.h:123
@ PGC_POSTMASTER
Definition: guc.h:74
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
int j
Definition: isn.c:75
int i
Definition: isn.c:74
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1179
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1899
@ LWTRANCHE_COMMITTS_BUFFER
Definition: lwlock.h:182
@ LWTRANCHE_COMMITTS_SLRU
Definition: lwlock.h:213
@ LW_SHARED
Definition: lwlock.h:115
@ LW_EXCLUSIVE
Definition: lwlock.h:114
#define InvalidRepOriginId
Definition: origin.h:33
static rewind_source * source
Definition: pg_rewind.c:89
static char * buf
Definition: pg_test_fsync.c:72
int64 timestamp
#define snprintf
Definition: port.h:239
static Datum TransactionIdGetDatum(TransactionId X)
Definition: postgres.h:277
uintptr_t Datum
Definition: postgres.h:69
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
unsigned int Oid
Definition: postgres_ext.h:32
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition: slru.c:252
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
Definition: slru.c:605
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:732
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
Definition: slru.c:1322
int SimpleLruAutotuneBuffers(int divisor, int max)
Definition: slru.c:232
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)
Definition: slru.c:746
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1791
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1744
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition: slru.c:502
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
Definition: slru.c:1831
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition: slru.c:375
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition: slru.c:1408
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:199
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1712
bool check_slru_buffers(const char *name, int *newval)
Definition: slru.c:355
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition: slru.h:175
#define SlruPagePrecedesUnitTests(ctl, per_page)
Definition: slru.h:199
#define SLRU_MAX_ALLOWED_BUFFERS
Definition: slru.h:24
TimestampTz time
Definition: commit_ts.c:56
RepOriginId nodeid
Definition: commit_ts.c:57
CommitTimestampEntry dataLastCommit
Definition: commit_ts.c:101
TransactionId xidLastCommit
Definition: commit_ts.c:100
Definition: sync.h:51
Definition: lwlock.h:42
TransactionId oldestCommitTsXid
Definition: transam.h:232
TransactionId newestCommitTsXid
Definition: transam.h:233
FullTransactionId nextXid
Definition: transam.h:220
TransactionId oldestXid
Definition: commit_ts.h:63
@ SYNC_HANDLER_COMMIT_TS
Definition: sync.h:39
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
static TransactionId ReadNextTransactionId(void)
Definition: transam.h:315
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define FirstNormalTransactionId
Definition: transam.h:34
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
#define PG_RETURN_TIMESTAMPTZ(x)
Definition: timestamp.h:68
TransamVariablesData * TransamVariables
Definition: varsup.c:34
bool RecoveryInProgress(void)
Definition: xlog.c:6380
uint16 RepOriginId
Definition: xlogdefs.h:65
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogRegisterData(const void *data, uint32 len)
Definition: xloginsert.c:364
void XLogBeginInsert(void)
Definition: xloginsert.c:149
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:417
bool InRecovery
Definition: xlogutils.c:50