PostgreSQL Source Code git master
Loading...
Searching...
No Matches
commit_ts.c File Reference
#include "postgres.h"
#include "access/commit_ts.h"
#include "access/htup_details.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/shmem.h"
#include "utils/fmgrprotos.h"
#include "utils/guc_hooks.h"
#include "utils/timestamp.h"
Include dependency graph for commit_ts.c:

Go to the source code of this file.

Data Structures

struct  CommitTimestampEntry
 
struct  CommitTimestampShared
 

Macros

#define SizeOfCommitTimestampEntry
 
#define COMMIT_TS_XACTS_PER_PAGE    (BLCKSZ / SizeOfCommitTimestampEntry)
 
#define TransactionIdToCTsEntry(xid)    ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
 
#define CommitTsCtl   (&CommitTsCtlData)
 

Typedefs

typedef struct CommitTimestampEntry CommitTimestampEntry
 
typedef struct CommitTimestampShared CommitTimestampShared
 

Functions

static int64 TransactionIdToCTsPage (TransactionId xid)
 
static void SetXidCommitTsInPage (TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, ReplOriginId nodeid, int64 pageno)
 
static void TransactionIdSetCommitTs (TransactionId xid, TimestampTz ts, ReplOriginId nodeid, int slotno)
 
static void error_commit_ts_disabled (void)
 
static bool CommitTsPagePrecedes (int64 page1, int64 page2)
 
static int commit_ts_errdetail_for_io_error (const void *opaque_data)
 
static void ActivateCommitTs (void)
 
static void DeactivateCommitTs (void)
 
static void WriteTruncateXlogRec (int64 pageno, TransactionId oldestXid)
 
void TransactionTreeSetCommitTsData (TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, ReplOriginId nodeid)
 
bool TransactionIdGetCommitTsData (TransactionId xid, TimestampTz *ts, ReplOriginId *nodeid)
 
TransactionId GetLatestCommitTsData (TimestampTz *ts, ReplOriginId *nodeid)
 
Datum pg_xact_commit_timestamp (PG_FUNCTION_ARGS)
 
Datum pg_last_committed_xact (PG_FUNCTION_ARGS)
 
Datum pg_xact_commit_timestamp_origin (PG_FUNCTION_ARGS)
 
static int CommitTsShmemBuffers (void)
 
Size CommitTsShmemSize (void)
 
void CommitTsShmemInit (void)
 
bool check_commit_ts_buffers (int *newval, void **extra, GucSource source)
 
void BootStrapCommitTs (void)
 
void StartupCommitTs (void)
 
void CompleteCommitTsInitialization (void)
 
void CommitTsParameterChange (bool newvalue, bool oldvalue)
 
void CheckPointCommitTs (void)
 
void ExtendCommitTs (TransactionId newestXact)
 
void TruncateCommitTs (TransactionId oldestXact)
 
void SetCommitTsLimit (TransactionId oldestXact, TransactionId newestXact)
 
void AdvanceOldestCommitTsXid (TransactionId oldestXact)
 
void commit_ts_redo (XLogReaderState *record)
 
int committssyncfiletag (const FileTag *ftag, char *path)
 

Variables

static SlruCtlData CommitTsCtlData
 
static CommitTimestampSharedcommitTsShared
 
bool track_commit_timestamp
 

Macro Definition Documentation

◆ COMMIT_TS_XACTS_PER_PAGE

#define COMMIT_TS_XACTS_PER_PAGE    (BLCKSZ / SizeOfCommitTimestampEntry)

Definition at line 63 of file commit_ts.c.

72{
73 return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
74}
75
76#define TransactionIdToCTsEntry(xid) \
77 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
78
79/*
80 * Link to shared-memory data structures for CommitTs control
81 */
83
84#define CommitTsCtl (&CommitTsCtlData)
85
86/*
87 * We keep a cache of the last value set in shared memory.
88 *
89 * This is also good place to keep the activation status. We keep this
90 * separate from the GUC so that the standby can activate the module if the
91 * primary has it active independently of the value of the GUC.
92 *
93 * This is protected by CommitTsLock. In some places, we use commitTsActive
94 * without acquiring the lock; where this happens, a comment explains the
95 * rationale for it.
96 */
97typedef struct CommitTimestampShared
98{
101 bool commitTsActive;
103
105
106
107/* GUC variable */
109
110static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
111 TransactionId *subxids, TimestampTz ts,
112 ReplOriginId nodeid, int64 pageno);
114 ReplOriginId nodeid, int slotno);
115static void error_commit_ts_disabled(void);
117static int commit_ts_errdetail_for_io_error(const void *opaque_data);
118static void ActivateCommitTs(void);
119static void DeactivateCommitTs(void);
120static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
121
122/*
123 * TransactionTreeSetCommitTsData
124 *
125 * Record the final commit timestamp of transaction entries in the commit log
126 * for a transaction and its subtransaction tree, as efficiently as possible.
127 *
128 * xid is the top level transaction id.
129 *
130 * subxids is an array of xids of length nsubxids, representing subtransactions
131 * in the tree of xid. In various cases nsubxids may be zero.
132 * The reason why tracking just the parent xid commit timestamp is not enough
133 * is that the subtrans SLRU does not stay valid across crashes (it's not
134 * permanent) so we need to keep the information about them here. If the
135 * subtrans implementation changes in the future, we might want to revisit the
136 * decision of storing timestamp info for each subxid.
137 */
138void
141 ReplOriginId nodeid)
142{
143 int i;
146
147 /*
148 * No-op if the module is not active.
149 *
150 * An unlocked read here is fine, because in a standby (the only place
151 * where the flag can change in flight) this routine is only called by the
152 * recovery process, which is also the only process which can change the
153 * flag.
154 */
156 return;
157
158 /*
159 * Figure out the latest Xid in this batch: either the last subxid if
160 * there's any, otherwise the parent xid.
161 */
162 if (nsubxids > 0)
163 newestXact = subxids[nsubxids - 1];
164 else
165 newestXact = xid;
166
167 /*
168 * We split the xids to set the timestamp to in groups belonging to the
169 * same SLRU page; the first element in each such set is its head. The
170 * first group has the main XID as the head; subsequent sets use the first
171 * subxid not on the previous page as head. This way, we only have to
172 * lock/modify each SLRU page once.
173 */
174 headxid = xid;
175 i = 0;
176 for (;;)
177 {
179 int j;
180
181 for (j = i; j < nsubxids; j++)
182 {
183 if (TransactionIdToCTsPage(subxids[j]) != pageno)
184 break;
185 }
186 /* subxids[i..j] are on the same page as the head */
187
188 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
189 pageno);
190
191 /* if we wrote out all subxids, we're done. */
192 if (j >= nsubxids)
193 break;
194
195 /*
196 * Set the new head and skip over it, as well as over the subxids we
197 * just wrote.
198 */
199 headxid = subxids[j];
200 i = j + 1;
201 }
202
203 /* update the cached value in shared memory */
208
209 /* and move forwards our endpoint, if needed */
213}
214
215/*
216 * Record the commit timestamp of transaction entries in the commit log for all
217 * entries on a single page. Atomic only on this page.
218 */
219static void
221 TransactionId *subxids, TimestampTz ts,
222 ReplOriginId nodeid, int64 pageno)
223{
224 LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
225 int slotno;
226 int i;
227
229
230 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, &xid);
231
232 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
233 for (i = 0; i < nsubxids; i++)
234 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
235
236 CommitTsCtl->shared->page_dirty[slotno] = true;
237
238 LWLockRelease(lock);
239}
240
241/*
242 * Sets the commit timestamp of a single transaction.
243 *
244 * Caller must hold the correct SLRU bank lock, will be held at exit
245 */
246static void
248 ReplOriginId nodeid, int slotno)
249{
252
254
255 entry.time = ts;
256 entry.nodeid = nodeid;
257
258 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
261}
262
263/*
264 * Interrogate the commit timestamp of a transaction.
265 *
266 * The return value indicates whether a commit timestamp record was found for
267 * the given xid. The timestamp value is returned in *ts (which may not be
268 * null), and the origin node for the Xid is returned in *nodeid, if it's not
269 * null.
270 */
271bool
273 ReplOriginId *nodeid)
274{
275 int64 pageno = TransactionIdToCTsPage(xid);
277 int slotno;
279 TransactionId oldestCommitTsXid;
280 TransactionId newestCommitTsXid;
281
282 if (!TransactionIdIsValid(xid))
285 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
286 else if (!TransactionIdIsNormal(xid))
287 {
288 /* frozen and bootstrap xids are always committed far in the past */
289 *ts = 0;
290 if (nodeid)
291 *nodeid = 0;
292 return false;
293 }
294
296
297 /* Error if module not enabled */
300
301 /*
302 * If we're asked for the cached value, return that. Otherwise, fall
303 * through to read from SLRU.
304 */
305 if (commitTsShared->xidLastCommit == xid)
306 {
308 if (nodeid)
310
312 return *ts != 0;
313 }
314
315 oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
316 newestCommitTsXid = TransamVariables->newestCommitTsXid;
317 /* neither is invalid, or both are */
318 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
320
321 /*
322 * Return empty if the requested value is outside our valid range.
323 */
324 if (!TransactionIdIsValid(oldestCommitTsXid) ||
325 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
326 TransactionIdPrecedes(newestCommitTsXid, xid))
327 {
328 *ts = 0;
329 if (nodeid)
330 *nodeid = InvalidReplOriginId;
331 return false;
332 }
333
334 /* lock is acquired by SimpleLruReadPage_ReadOnly */
336 memcpy(&entry,
337 CommitTsCtl->shared->page_buffer[slotno] +
340
341 *ts = entry.time;
342 if (nodeid)
343 *nodeid = entry.nodeid;
344
346 return *ts != 0;
347}
348
349/*
350 * Return the Xid of the latest committed transaction. (As far as this module
351 * is concerned, anyway; it's up to the caller to ensure the value is useful
352 * for its purposes.)
353 *
354 * ts and nodeid are filled with the corresponding data; they can be passed
355 * as NULL if not wanted.
356 */
359{
360 TransactionId xid;
361
363
364 /* Error if module not enabled */
367
369 if (ts)
371 if (nodeid)
374
375 return xid;
376}
377
378static void
380{
383 errmsg("could not get commit timestamp data"),
385 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
386 "track_commit_timestamp") :
387 errhint("Make sure the configuration parameter \"%s\" is set.",
388 "track_commit_timestamp")));
389}
390
391/*
392 * SQL-callable wrapper to obtain commit time of a transaction
393 */
394Datum
396{
398 TimestampTz ts;
399 bool found;
400
401 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
402
403 if (!found)
405
407}
408
409
410/*
411 * pg_last_committed_xact
412 *
413 * SQL-callable wrapper to obtain some information about the latest
414 * committed transaction: transaction ID, timestamp and replication
415 * origin.
416 */
417Datum
419{
420 TransactionId xid;
421 ReplOriginId nodeid;
422 TimestampTz ts;
423 Datum values[3];
424 bool nulls[3];
425 TupleDesc tupdesc;
426 HeapTuple htup;
427
428 /* and construct a tuple with our data */
429 xid = GetLatestCommitTsData(&ts, &nodeid);
430
431 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
432 elog(ERROR, "return type must be a row type");
433
434 if (!TransactionIdIsNormal(xid))
435 {
436 memset(nulls, true, sizeof(nulls));
437 }
438 else
439 {
441 nulls[0] = false;
442
444 nulls[1] = false;
445
446 values[2] = ObjectIdGetDatum((Oid) nodeid);
447 nulls[2] = false;
448 }
449
450 htup = heap_form_tuple(tupdesc, values, nulls);
451
453}
454
455/*
456 * pg_xact_commit_timestamp_origin
457 *
458 * SQL-callable wrapper to obtain commit timestamp and replication origin
459 * of a given transaction.
460 */
461Datum
463{
465 ReplOriginId nodeid;
466 TimestampTz ts;
467 Datum values[2];
468 bool nulls[2];
469 TupleDesc tupdesc;
470 HeapTuple htup;
471 bool found;
472
473 found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
474
475 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
476 elog(ERROR, "return type must be a row type");
477
478 if (!found)
479 {
480 memset(nulls, true, sizeof(nulls));
481 }
482 else
483 {
485 nulls[0] = false;
486
487 values[1] = ObjectIdGetDatum((Oid) nodeid);
488 nulls[1] = false;
489 }
490
491 htup = heap_form_tuple(tupdesc, values, nulls);
492
494}
495
496/*
497 * Number of shared CommitTS buffers.
498 *
499 * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
500 * Otherwise just cap the configured amount to be between 16 and the maximum
501 * allowed.
502 */
503static int
505{
506 /* auto-tune based on shared buffers */
508 return SimpleLruAutotuneBuffers(512, 1024);
509
511}
512
513/*
514 * Shared memory sizing for CommitTs
515 */
516Size
518{
520 sizeof(CommitTimestampShared);
521}
522
523/*
524 * Initialize CommitTs at system startup (postmaster start or standalone
525 * backend)
526 */
527void
529{
530 bool found;
531
532 /* If auto-tuning is requested, now is the time to do it */
534 {
535 char buf[32];
536
537 snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
538 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
540
541 /*
542 * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
543 * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
544 * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
545 * that and we must force the matter with PGC_S_OVERRIDE.
546 */
547 if (commit_timestamp_buffers == 0) /* failed to apply it? */
548 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
550 }
552
553 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
554 CommitTsCtl->errdetail_for_io_error = commit_ts_errdetail_for_io_error;
555 SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
556 "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
559 false);
561
562 commitTsShared = ShmemInitStruct("CommitTs shared",
563 sizeof(CommitTimestampShared),
564 &found);
565
567 {
568 Assert(!found);
569
574 }
575 else
576 Assert(found);
577}
578
579/*
580 * GUC check_hook for commit_timestamp_buffers
581 */
582bool
584{
585 return check_slru_buffers("commit_timestamp_buffers", newval);
586}
587
588/*
589 * This function must be called ONCE on system install.
590 *
591 * (The CommitTs directory is assumed to have been created by initdb, and
592 * CommitTsShmemInit must have been called already.)
593 */
594void
596{
597 /*
598 * Nothing to do here at present, unlike most other SLRU modules; segments
599 * are created when the server is started with this module enabled. See
600 * ActivateCommitTs.
601 */
602}
603
604/*
605 * This must be called ONCE during postmaster or standalone-backend startup,
606 * after StartupXLOG has initialized TransamVariables->nextXid.
607 */
608void
609StartupCommitTs(void)
610{
612}
613
614/*
615 * This must be called ONCE during postmaster or standalone-backend startup,
616 * after recovery has finished.
617 */
618void
620{
621 /*
622 * If the feature is not enabled, turn it off for good. This also removes
623 * any leftover data.
624 *
625 * Conversely, we activate the module if the feature is enabled. This is
626 * necessary for primary and standby as the activation depends on the
627 * control file contents at the beginning of recovery or when a
628 * XLOG_PARAMETER_CHANGE is replayed.
629 */
632 else
634}
635
636/*
637 * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
638 * XLog record during recovery.
639 */
640void
642{
643 /*
644 * If the commit_ts module is disabled in this server and we get word from
645 * the primary server that it is enabled there, activate it so that we can
646 * replay future WAL records involving it; also mark it as active on
647 * pg_control. If the old value was already set, we already did this, so
648 * don't do anything.
649 *
650 * If the module is disabled in the primary, disable it here too, unless
651 * the module is enabled locally.
652 *
653 * Note this only runs in the recovery process, so an unlocked read is
654 * fine.
655 */
656 if (newvalue)
657 {
660 }
663}
664
665/*
666 * Activate this module whenever necessary.
667 * This must happen during postmaster or standalone-backend startup,
668 * or during WAL replay anytime the track_commit_timestamp setting is
669 * changed in the primary.
670 *
671 * The reason why this SLRU needs separate activation/deactivation functions is
672 * that it can be enabled/disabled during start and the activation/deactivation
673 * on the primary is propagated to the standby via replay. Other SLRUs don't
674 * have this property and they can be just initialized during normal startup.
675 *
676 * This is in charge of creating the currently active segment, if it's not
677 * already there. The reason for this is that the server might have been
678 * running with this module disabled for a while and thus might have skipped
679 * the normal creation point.
680 */
681static void
683{
684 TransactionId xid;
685 int64 pageno;
686
687 /*
688 * During bootstrap, we should not register commit timestamps so skip the
689 * activation in this case.
690 */
692 return;
693
694 /* If we've done this already, there's nothing to do */
697 {
699 return;
700 }
702
704 pageno = TransactionIdToCTsPage(xid);
705
706 /*
707 * Re-Initialize our idea of the latest page number.
708 */
709 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
710
711 /*
712 * If CommitTs is enabled, but it wasn't in the previous server run, we
713 * need to set the oldest and newest values to the next Xid; that way, we
714 * will not try to read data that might not have been set.
715 *
716 * XXX does this have a problem if a server is started with commitTs
717 * enabled, then started with commitTs disabled, then restarted with it
718 * enabled again? It doesn't look like it does, because there should be a
719 * checkpoint that sets the value to InvalidTransactionId at end of
720 * recovery; and so any chance of injecting new transactions without
721 * CommitTs values would occur after the oldestCommitTsXid has been set to
722 * Invalid temporarily.
723 */
726 {
729 }
731
732 /* Create the current segment file, if necessary */
735
736 /* Change the activation status in shared memory. */
740}
741
742/*
743 * Deactivate this module.
744 *
745 * This must be called when the track_commit_timestamp parameter is turned off.
746 * This happens during postmaster or standalone-backend startup, or during WAL
747 * replay.
748 *
749 * Resets CommitTs into invalid state to make sure we don't hand back
750 * possibly-invalid data; also removes segments of old data.
751 */
752static void
754{
755 /*
756 * Cleanup the status in the shared memory.
757 *
758 * We reset everything in the commitTsShared record to prevent user from
759 * getting confusing data about last committed transaction on the standby
760 * when the module was activated repeatedly on the primary.
761 */
763
768
771
772 /*
773 * Remove *all* files. This is necessary so that there are no leftover
774 * files; in the case where this feature is later enabled after running
775 * with it disabled for some time there may be a gap in the file sequence.
776 * (We can probably tolerate out-of-sequence files, as they are going to
777 * be overwritten anyway when we wrap around, but it seems better to be
778 * tidy.)
779 *
780 * Note that we do this with CommitTsLock acquired in exclusive mode. This
781 * is very heavy-handed, but since this routine can only be called in the
782 * replica and should happen very rarely, we don't worry too much about
783 * it. Note also that no process should be consulting this SLRU if we
784 * have just deactivated it.
785 */
787
789}
790
791/*
792 * Perform a checkpoint --- either during shutdown, or on-the-fly
793 */
794void
796{
797 /*
798 * Write dirty CommitTs pages to disk. This may result in sync requests
799 * queued for later handling by ProcessSyncRequests(), as part of the
800 * checkpoint.
801 */
803}
804
805/*
806 * Make sure that CommitTs has room for a newly-allocated XID.
807 *
808 * NB: this is called while holding XidGenLock. We want it to be very fast
809 * most of the time; even when it's not so fast, no actual I/O need happen
810 * unless we're forced to write out a dirty CommitTs or xlog page to make room
811 * in shared memory.
812 *
813 * NB: the current implementation relies on track_commit_timestamp being
814 * PGC_POSTMASTER.
815 */
816void
818{
819 int64 pageno;
820 LWLock *lock;
821
822 /*
823 * Nothing to do if module not enabled. Note we do an unlocked read of
824 * the flag here, which is okay because this routine is only called from
825 * GetNewTransactionId, which is never called in a standby.
826 */
829 return;
830
831 /*
832 * No work except at first XID of a page. But beware: just after
833 * wraparound, the first XID of page zero is FirstNormalTransactionId.
834 */
837 return;
838
840
841 lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
842
844
845 /* Zero the page ... */
847
848 /* and make a WAL entry about that, unless we're in REDO */
849 if (!InRecovery)
851
852 LWLockRelease(lock);
853}
854
855/*
856 * Remove all CommitTs segments before the one holding the passed
857 * transaction ID.
858 *
859 * Note that we don't need to flush XLOG here.
860 */
861void
863{
865
866 /*
867 * The cutoff point is the start of the segment containing oldestXact. We
868 * pass the *page* containing oldestXact to SimpleLruTruncate.
869 */
871
872 /* Check to see if there's any files that could be removed */
874 &cutoffPage))
875 return; /* nothing to remove */
876
877 /* Write XLOG record */
878 WriteTruncateXlogRec(cutoffPage, oldestXact);
879
880 /* Now we can remove the old CommitTs segment(s) */
882}
883
884/*
885 * Set the limit values between which commit TS can be consulted.
886 */
887void
889{
890 /*
891 * Be careful not to overwrite values that are either further into the
892 * "future" or signal a disabled committs.
893 */
896 {
901 }
902 else
903 {
907 }
909}
910
911/*
912 * Move forwards the oldest commitTS value that can be consulted
913 */
914void
916{
922}
923
924
925/*
926 * Decide whether a commitTS page number is "older" for truncation purposes.
927 * Analogous to CLOGPagePrecedes().
928 *
929 * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
930 * introduces differences compared to CLOG and the other SLRUs having (1 <<
931 * 31) % per_page == 0. This function never tests exactly
932 * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
933 * there are two possible counts of page boundaries between oldestXact and the
934 * latest XID assigned, depending on whether oldestXact is within the first
935 * 128 entries of its page. Since this function doesn't know the location of
936 * oldestXact within page2, it returns false for one page that actually is
937 * expendable. This is a wider (yet still negligible) version of the
938 * truncation opportunity that CLOGPagePrecedes() cannot recognize.
939 *
940 * For the sake of a worked example, number entries with decimal values such
941 * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
942 * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
943 * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
944 * because entry=2.85 is the border that toggles whether entries precede the
945 * last entry of the oldestXact page. While page 2 is expendable at
946 * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
947 */
948static bool
950{
953
958
959 return (TransactionIdPrecedes(xid1, xid2) &&
961}
962
963static int
965{
966 TransactionId xid = *(const TransactionId *) opaque_data;
967
968 return errdetail("Could not access commit timestamp of transaction %u.", xid);
969}
970
971/*
972 * Write a TRUNCATE xlog record
973 */
974static void
976{
978
979 xlrec.pageno = pageno;
980 xlrec.oldestXid = oldestXid;
981
985}
986
987/*
988 * CommitTS resource manager's routines
989 */
990void
992{
993 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
994
995 /* Backup blocks are not used in commit_ts records */
997
998 if (info == COMMIT_TS_ZEROPAGE)
999 {
1000 int64 pageno;
1001
1002 memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1004 }
1005 else if (info == COMMIT_TS_TRUNCATE)
1006 {
1008
1009 AdvanceOldestCommitTsXid(trunc->oldestXid);
1010
1011 /*
1012 * During XLOG replay, latest_page_number isn't set up yet; insert a
1013 * suitable value to bypass the sanity test in SimpleLruTruncate.
1014 */
1015 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1016 trunc->pageno);
1017
1019 }
1020 else
1021 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1022}
1023
1024/*
1025 * Entrypoint for sync.c to sync commit_ts files.
1026 */
1027int
1028committssyncfiletag(const FileTag *ftag, char *path)
1029{
1030 return SlruSyncFileTag(CommitTsCtl, ftag, path);
1031}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
static Datum values[MAXATTR]
Definition bootstrap.c:188
#define Min(x, y)
Definition c.h:1093
uint8_t uint8
Definition c.h:616
#define Max(x, y)
Definition c.h:1087
#define Assert(condition)
Definition c.h:945
int64_t int64
Definition c.h:615
uint32 TransactionId
Definition c.h:738
size_t Size
Definition c.h:691
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, ReplOriginId nodeid, int64 pageno)
Definition commit_ts.c:221
void StartupCommitTs(void)
Definition commit_ts.c:610
static SlruCtlData CommitTsCtlData
Definition commit_ts.c:83
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
Definition commit_ts.c:463
static int commit_ts_errdetail_for_io_error(const void *opaque_data)
Definition commit_ts.c:965
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition commit_ts.c:419
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition commit_ts.c:642
#define COMMIT_TS_XACTS_PER_PAGE
Definition commit_ts.c:63
#define TransactionIdToCTsEntry(xid)
Definition commit_ts.c:77
static void DeactivateCommitTs(void)
Definition commit_ts.c:754
Size CommitTsShmemSize(void)
Definition commit_ts.c:518
bool track_commit_timestamp
Definition commit_ts.c:109
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition commit_ts.c:916
static CommitTimestampShared * commitTsShared
Definition commit_ts.c:105
int committssyncfiletag(const FileTag *ftag, char *path)
Definition commit_ts.c:1029
void CompleteCommitTsInitialization(void)
Definition commit_ts.c:620
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, ReplOriginId nodeid)
Definition commit_ts.c:140
bool check_commit_ts_buffers(int *newval, void **extra, GucSource source)
Definition commit_ts.c:584
static void ActivateCommitTs(void)
Definition commit_ts.c:683
static int64 TransactionIdToCTsPage(TransactionId xid)
Definition commit_ts.c:72
void TruncateCommitTs(TransactionId oldestXact)
Definition commit_ts.c:863
void commit_ts_redo(XLogReaderState *record)
Definition commit_ts.c:992
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition commit_ts.c:396
static int CommitTsShmemBuffers(void)
Definition commit_ts.c:505
static void error_commit_ts_disabled(void)
Definition commit_ts.c:380
static bool CommitTsPagePrecedes(int64 page1, int64 page2)
Definition commit_ts.c:950
#define SizeOfCommitTimestampEntry
Definition commit_ts.c:60
void BootStrapCommitTs(void)
Definition commit_ts.c:596
void CommitTsShmemInit(void)
Definition commit_ts.c:529
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition commit_ts.c:889
#define CommitTsCtl
Definition commit_ts.c:85
TransactionId GetLatestCommitTsData(TimestampTz *ts, ReplOriginId *nodeid)
Definition commit_ts.c:359
void ExtendCommitTs(TransactionId newestXact)
Definition commit_ts.c:818
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, ReplOriginId *nodeid)
Definition commit_ts.c:273
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, ReplOriginId nodeid, int slotno)
Definition commit_ts.c:248
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
Definition commit_ts.c:976
void CheckPointCommitTs(void)
Definition commit_ts.c:796
#define COMMIT_TS_ZEROPAGE
Definition commit_ts.h:46
#define SizeOfCommitTsTruncate
Definition commit_ts.h:55
#define COMMIT_TS_TRUNCATE
Definition commit_ts.h:47
int64 TimestampTz
Definition timestamp.h:39
#define TIMESTAMP_NOBEGIN(j)
Definition timestamp.h:159
int errcode(int sqlerrcode)
Definition elog.c:874
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define PANIC
Definition elog.h:42
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_GETARG_TRANSACTIONID(n)
Definition fmgr.h:280
#define PG_RETURN_DATUM(x)
Definition fmgr.h:354
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition funcapi.h:230
bool IsUnderPostmaster
Definition globals.c:120
int commit_timestamp_buffers
Definition globals.c:161
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition guc.c:4228
#define newval
GucSource
Definition guc.h:112
@ PGC_S_DYNAMIC_DEFAULT
Definition guc.h:114
@ PGC_S_OVERRIDE
Definition guc.h:123
@ PGC_POSTMASTER
Definition guc.h:74
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1037
int j
Definition isn.c:78
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
#define IsBootstrapProcessingMode()
Definition miscadmin.h:477
static char * errmsg
#define InvalidReplOriginId
Definition origin.h:33
static rewind_source * source
Definition pg_rewind.c:89
static char buf[DEFAULT_XLOG_SEG_SIZE]
int64 timestamp
#define snprintf
Definition port.h:260
static Datum TransactionIdGetDatum(TransactionId X)
Definition postgres.h:292
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
unsigned int Oid
static int fb(int x)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition slru.c:254
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
Definition slru.c:1355
int SimpleLruAutotuneBuffers(int divisor, int max)
Definition slru.c:233
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, const void *opaque_data)
Definition slru.c:533
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)
Definition slru.c:778
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition slru.c:1824
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1777
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
Definition slru.c:1864
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition slru.c:380
void SimpleLruZeroAndWritePage(SlruCtl ctl, int64 pageno)
Definition slru.c:449
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition slru.c:1441
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, const void *opaque_data)
Definition slru.c:637
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition slru.c:200
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1745
bool check_slru_buffers(const char *name, int *newval)
Definition slru.c:360
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition slru.h:171
#define SlruPagePrecedesUnitTests(ctl, per_page)
Definition slru.h:196
#define SLRU_MAX_ALLOWED_BUFFERS
Definition slru.h:25
TimestampTz time
Definition commit_ts.c:56
ReplOriginId nodeid
Definition commit_ts.c:57
CommitTimestampEntry dataLastCommit
Definition commit_ts.c:101
TransactionId xidLastCommit
Definition commit_ts.c:100
Definition sync.h:51
TransactionId oldestCommitTsXid
Definition transam.h:232
TransactionId newestCommitTsXid
Definition transam.h:233
FullTransactionId nextXid
Definition transam.h:220
@ SYNC_HANDLER_COMMIT_TS
Definition sync.h:39
static TransactionId ReadNextTransactionId(void)
Definition transam.h:377
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdEquals(id1, id2)
Definition transam.h:43
#define XidFromFullTransactionId(x)
Definition transam.h:48
#define FirstNormalTransactionId
Definition transam.h:34
#define TransactionIdIsValid(xid)
Definition transam.h:41
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
#define PG_RETURN_TIMESTAMPTZ(x)
Definition timestamp.h:68
TransamVariablesData * TransamVariables
Definition varsup.c:34
bool RecoveryInProgress(void)
Definition xlog.c:6444
uint16 ReplOriginId
Definition xlogdefs.h:69
XLogRecPtr XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value)
Definition xloginsert.c:544
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:479
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:369
void XLogBeginInsert(void)
Definition xloginsert.c:153
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:409
#define XLogRecGetData(decoder)
Definition xlogreader.h:414
#define XLogRecHasAnyBlockRefs(decoder)
Definition xlogreader.h:416
bool InRecovery
Definition xlogutils.c:50

◆ CommitTsCtl

#define CommitTsCtl   (&CommitTsCtlData)

Definition at line 85 of file commit_ts.c.

◆ SizeOfCommitTimestampEntry

#define SizeOfCommitTimestampEntry
Value:

Definition at line 60 of file commit_ts.c.

◆ TransactionIdToCTsEntry

#define TransactionIdToCTsEntry (   xid)     ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)

Definition at line 77 of file commit_ts.c.

Typedef Documentation

◆ CommitTimestampEntry

◆ CommitTimestampShared

Function Documentation

◆ ActivateCommitTs()

static void ActivateCommitTs ( void  )
static

Definition at line 683 of file commit_ts.c.

684{
685 TransactionId xid;
686 int64 pageno;
687
688 /*
689 * During bootstrap, we should not register commit timestamps so skip the
690 * activation in this case.
691 */
693 return;
694
695 /* If we've done this already, there's nothing to do */
698 {
700 return;
701 }
703
705 pageno = TransactionIdToCTsPage(xid);
706
707 /*
708 * Re-Initialize our idea of the latest page number.
709 */
710 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
711
712 /*
713 * If CommitTs is enabled, but it wasn't in the previous server run, we
714 * need to set the oldest and newest values to the next Xid; that way, we
715 * will not try to read data that might not have been set.
716 *
717 * XXX does this have a problem if a server is started with commitTs
718 * enabled, then started with commitTs disabled, then restarted with it
719 * enabled again? It doesn't look like it does, because there should be a
720 * checkpoint that sets the value to InvalidTransactionId at end of
721 * recovery; and so any chance of injecting new transactions without
722 * CommitTs values would occur after the oldestCommitTsXid has been set to
723 * Invalid temporarily.
724 */
727 {
730 }
732
733 /* Create the current segment file, if necessary */
736
737 /* Change the activation status in shared memory. */
741}

References CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, fb(), InvalidTransactionId, IsBootstrapProcessingMode, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, TransamVariablesData::nextXid, TransamVariablesData::oldestCommitTsXid, pg_atomic_write_u64(), ReadNextTransactionId(), SimpleLruDoesPhysicalPageExist(), SimpleLruZeroAndWritePage(), TransactionIdToCTsPage(), TransamVariables, and XidFromFullTransactionId.

Referenced by CommitTsParameterChange(), CompleteCommitTsInitialization(), and StartupCommitTs().

◆ AdvanceOldestCommitTsXid()

◆ BootStrapCommitTs()

void BootStrapCommitTs ( void  )

Definition at line 596 of file commit_ts.c.

597{
598 /*
599 * Nothing to do here at present, unlike most other SLRU modules; segments
600 * are created when the server is started with this module enabled. See
601 * ActivateCommitTs.
602 */
603}

Referenced by BootStrapXLOG().

◆ check_commit_ts_buffers()

bool check_commit_ts_buffers ( int newval,
void **  extra,
GucSource  source 
)

Definition at line 584 of file commit_ts.c.

585{
586 return check_slru_buffers("commit_timestamp_buffers", newval);
587}

References check_slru_buffers(), and newval.

◆ CheckPointCommitTs()

void CheckPointCommitTs ( void  )

Definition at line 796 of file commit_ts.c.

797{
798 /*
799 * Write dirty CommitTs pages to disk. This may result in sync requests
800 * queued for later handling by ProcessSyncRequests(), as part of the
801 * checkpoint.
802 */
804}

References CommitTsCtl, and SimpleLruWriteAll().

Referenced by CheckPointGuts().

◆ commit_ts_errdetail_for_io_error()

static int commit_ts_errdetail_for_io_error ( const void opaque_data)
static

Definition at line 965 of file commit_ts.c.

966{
967 TransactionId xid = *(const TransactionId *) opaque_data;
968
969 return errdetail("Could not access commit timestamp of transaction %u.", xid);
970}

References errdetail(), and fb().

Referenced by CommitTsShmemInit().

◆ commit_ts_redo()

void commit_ts_redo ( XLogReaderState record)

Definition at line 992 of file commit_ts.c.

993{
994 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
995
996 /* Backup blocks are not used in commit_ts records */
998
999 if (info == COMMIT_TS_ZEROPAGE)
1000 {
1001 int64 pageno;
1002
1003 memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1005 }
1006 else if (info == COMMIT_TS_TRUNCATE)
1007 {
1009
1010 AdvanceOldestCommitTsXid(trunc->oldestXid);
1011
1012 /*
1013 * During XLOG replay, latest_page_number isn't set up yet; insert a
1014 * suitable value to bypass the sanity test in SimpleLruTruncate.
1015 */
1016 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1017 trunc->pageno);
1018
1020 }
1021 else
1022 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1023}

References AdvanceOldestCommitTsXid(), Assert, COMMIT_TS_TRUNCATE, COMMIT_TS_ZEROPAGE, CommitTsCtl, elog, fb(), PANIC, pg_atomic_write_u64(), SimpleLruTruncate(), SimpleLruZeroAndWritePage(), XLogRecGetData, XLogRecGetInfo, and XLogRecHasAnyBlockRefs.

◆ CommitTsPagePrecedes()

◆ CommitTsParameterChange()

void CommitTsParameterChange ( bool  newvalue,
bool  oldvalue 
)

Definition at line 642 of file commit_ts.c.

643{
644 /*
645 * If the commit_ts module is disabled in this server and we get word from
646 * the primary server that it is enabled there, activate it so that we can
647 * replay future WAL records involving it; also mark it as active on
648 * pg_control. If the old value was already set, we already did this, so
649 * don't do anything.
650 *
651 * If the module is disabled in the primary, disable it here too, unless
652 * the module is enabled locally.
653 *
654 * Note this only runs in the recovery process, so an unlocked read is
655 * fine.
656 */
657 if (newvalue)
658 {
661 }
664}

References ActivateCommitTs(), CommitTimestampShared::commitTsActive, commitTsShared, DeactivateCommitTs(), and fb().

Referenced by xlog_redo().

◆ CommitTsShmemBuffers()

static int CommitTsShmemBuffers ( void  )
static

Definition at line 505 of file commit_ts.c.

506{
507 /* auto-tune based on shared buffers */
509 return SimpleLruAutotuneBuffers(512, 1024);
510
512}

References commit_timestamp_buffers, Max, Min, SimpleLruAutotuneBuffers(), and SLRU_MAX_ALLOWED_BUFFERS.

Referenced by CommitTsShmemInit(), and CommitTsShmemSize().

◆ CommitTsShmemInit()

void CommitTsShmemInit ( void  )

Definition at line 529 of file commit_ts.c.

530{
531 bool found;
532
533 /* If auto-tuning is requested, now is the time to do it */
535 {
536 char buf[32];
537
538 snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
539 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
541
542 /*
543 * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
544 * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
545 * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
546 * that and we must force the matter with PGC_S_OVERRIDE.
547 */
548 if (commit_timestamp_buffers == 0) /* failed to apply it? */
549 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
551 }
553
554 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
555 CommitTsCtl->errdetail_for_io_error = commit_ts_errdetail_for_io_error;
556 SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
557 "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
560 false);
562
563 commitTsShared = ShmemInitStruct("CommitTs shared",
564 sizeof(CommitTimestampShared),
565 &found);
566
568 {
569 Assert(!found);
570
575 }
576 else
577 Assert(found);
578}

References Assert, buf, commit_timestamp_buffers, commit_ts_errdetail_for_io_error(), COMMIT_TS_XACTS_PER_PAGE, CommitTimestampShared::commitTsActive, CommitTsCtl, CommitTsPagePrecedes(), commitTsShared, CommitTsShmemBuffers(), CommitTimestampShared::dataLastCommit, fb(), InvalidReplOriginId, InvalidTransactionId, IsUnderPostmaster, CommitTimestampEntry::nodeid, PGC_POSTMASTER, PGC_S_DYNAMIC_DEFAULT, PGC_S_OVERRIDE, SetConfigOption(), ShmemInitStruct(), SimpleLruInit(), SlruPagePrecedesUnitTests, snprintf, SYNC_HANDLER_COMMIT_TS, CommitTimestampEntry::time, TIMESTAMP_NOBEGIN, and CommitTimestampShared::xidLastCommit.

Referenced by CreateOrAttachShmemStructs().

◆ CommitTsShmemSize()

Size CommitTsShmemSize ( void  )

Definition at line 518 of file commit_ts.c.

519{
521 sizeof(CommitTimestampShared);
522}

References CommitTsShmemBuffers(), and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

◆ committssyncfiletag()

int committssyncfiletag ( const FileTag ftag,
char path 
)

Definition at line 1029 of file commit_ts.c.

1030{
1031 return SlruSyncFileTag(CommitTsCtl, ftag, path);
1032}

References CommitTsCtl, and SlruSyncFileTag().

◆ CompleteCommitTsInitialization()

void CompleteCommitTsInitialization ( void  )

Definition at line 620 of file commit_ts.c.

621{
622 /*
623 * If the feature is not enabled, turn it off for good. This also removes
624 * any leftover data.
625 *
626 * Conversely, we activate the module if the feature is enabled. This is
627 * necessary for primary and standby as the activation depends on the
628 * control file contents at the beginning of recovery or when a
629 * XLOG_PARAMETER_CHANGE is replayed.
630 */
633 else
635}

References ActivateCommitTs(), DeactivateCommitTs(), and track_commit_timestamp.

Referenced by StartupXLOG().

◆ DeactivateCommitTs()

static void DeactivateCommitTs ( void  )
static

Definition at line 754 of file commit_ts.c.

755{
756 /*
757 * Cleanup the status in the shared memory.
758 *
759 * We reset everything in the commitTsShared record to prevent user from
760 * getting confusing data about last committed transaction on the standby
761 * when the module was activated repeatedly on the primary.
762 */
764
769
772
773 /*
774 * Remove *all* files. This is necessary so that there are no leftover
775 * files; in the case where this feature is later enabled after running
776 * with it disabled for some time there may be a gap in the file sequence.
777 * (We can probably tolerate out-of-sequence files, as they are going to
778 * be overwritten anyway when we wrap around, but it seems better to be
779 * tidy.)
780 *
781 * Note that we do this with CommitTsLock acquired in exclusive mode. This
782 * is very heavy-handed, but since this routine can only be called in the
783 * replica and should happen very rarely, we don't worry too much about
784 * it. Note also that no process should be consulting this SLRU if we
785 * have just deactivated it.
786 */
788
790}

References CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, CommitTimestampShared::dataLastCommit, fb(), InvalidReplOriginId, InvalidTransactionId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, CommitTimestampEntry::nodeid, TransamVariablesData::oldestCommitTsXid, SlruScanDirCbDeleteAll(), SlruScanDirectory(), CommitTimestampEntry::time, TIMESTAMP_NOBEGIN, TransamVariables, and CommitTimestampShared::xidLastCommit.

Referenced by CommitTsParameterChange(), and CompleteCommitTsInitialization().

◆ error_commit_ts_disabled()

static void error_commit_ts_disabled ( void  )
static

Definition at line 380 of file commit_ts.c.

381{
384 errmsg("could not get commit timestamp data"),
386 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
387 "track_commit_timestamp") :
388 errhint("Make sure the configuration parameter \"%s\" is set.",
389 "track_commit_timestamp")));
390}

References ereport, errcode(), errhint(), errmsg, ERROR, fb(), and RecoveryInProgress().

Referenced by GetLatestCommitTsData(), and TransactionIdGetCommitTsData().

◆ ExtendCommitTs()

void ExtendCommitTs ( TransactionId  newestXact)

Definition at line 818 of file commit_ts.c.

819{
820 int64 pageno;
821 LWLock *lock;
822
823 /*
824 * Nothing to do if module not enabled. Note we do an unlocked read of
825 * the flag here, which is okay because this routine is only called from
826 * GetNewTransactionId, which is never called in a standby.
827 */
830 return;
831
832 /*
833 * No work except at first XID of a page. But beware: just after
834 * wraparound, the first XID of page zero is FirstNormalTransactionId.
835 */
838 return;
839
841
842 lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
843
845
846 /* Zero the page ... */
848
849 /* and make a WAL entry about that, unless we're in REDO */
850 if (!InRecovery)
852
853 LWLockRelease(lock);
854}

References Assert, COMMIT_TS_ZEROPAGE, CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, fb(), FirstNormalTransactionId, InRecovery, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SimpleLruGetBankLock(), SimpleLruZeroPage(), TransactionIdEquals, TransactionIdToCTsEntry, TransactionIdToCTsPage(), and XLogSimpleInsertInt64().

Referenced by GetNewTransactionId().

◆ GetLatestCommitTsData()

◆ pg_last_committed_xact()

Datum pg_last_committed_xact ( PG_FUNCTION_ARGS  )

Definition at line 419 of file commit_ts.c.

420{
421 TransactionId xid;
422 ReplOriginId nodeid;
423 TimestampTz ts;
424 Datum values[3];
425 bool nulls[3];
426 TupleDesc tupdesc;
427 HeapTuple htup;
428
429 /* and construct a tuple with our data */
430 xid = GetLatestCommitTsData(&ts, &nodeid);
431
432 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
433 elog(ERROR, "return type must be a row type");
434
435 if (!TransactionIdIsNormal(xid))
436 {
437 memset(nulls, true, sizeof(nulls));
438 }
439 else
440 {
442 nulls[0] = false;
443
445 nulls[1] = false;
446
447 values[2] = ObjectIdGetDatum((Oid) nodeid);
448 nulls[2] = false;
449 }
450
451 htup = heap_form_tuple(tupdesc, values, nulls);
452
454}

References elog, ERROR, fb(), get_call_result_type(), GetLatestCommitTsData(), heap_form_tuple(), HeapTupleGetDatum(), ObjectIdGetDatum(), PG_RETURN_DATUM, TimestampTzGetDatum(), TransactionIdGetDatum(), TransactionIdIsNormal, TYPEFUNC_COMPOSITE, and values.

◆ pg_xact_commit_timestamp()

Datum pg_xact_commit_timestamp ( PG_FUNCTION_ARGS  )

Definition at line 396 of file commit_ts.c.

397{
399 TimestampTz ts;
400 bool found;
401
402 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
403
404 if (!found)
406
408}

References fb(), PG_GETARG_TRANSACTIONID, PG_RETURN_NULL, PG_RETURN_TIMESTAMPTZ, and TransactionIdGetCommitTsData().

◆ pg_xact_commit_timestamp_origin()

Datum pg_xact_commit_timestamp_origin ( PG_FUNCTION_ARGS  )

Definition at line 463 of file commit_ts.c.

464{
466 ReplOriginId nodeid;
467 TimestampTz ts;
468 Datum values[2];
469 bool nulls[2];
470 TupleDesc tupdesc;
471 HeapTuple htup;
472 bool found;
473
474 found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
475
476 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
477 elog(ERROR, "return type must be a row type");
478
479 if (!found)
480 {
481 memset(nulls, true, sizeof(nulls));
482 }
483 else
484 {
486 nulls[0] = false;
487
488 values[1] = ObjectIdGetDatum((Oid) nodeid);
489 nulls[1] = false;
490 }
491
492 htup = heap_form_tuple(tupdesc, values, nulls);
493
495}

References elog, ERROR, fb(), get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum(), ObjectIdGetDatum(), PG_GETARG_TRANSACTIONID, PG_RETURN_DATUM, TimestampTzGetDatum(), TransactionIdGetCommitTsData(), TYPEFUNC_COMPOSITE, and values.

◆ SetCommitTsLimit()

void SetCommitTsLimit ( TransactionId  oldestXact,
TransactionId  newestXact 
)

◆ SetXidCommitTsInPage()

static void SetXidCommitTsInPage ( TransactionId  xid,
int  nsubxids,
TransactionId subxids,
TimestampTz  ts,
ReplOriginId  nodeid,
int64  pageno 
)
static

Definition at line 221 of file commit_ts.c.

224{
225 LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
226 int slotno;
227 int i;
228
230
231 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, &xid);
232
233 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
234 for (i = 0; i < nsubxids; i++)
235 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
236
237 CommitTsCtl->shared->page_dirty[slotno] = true;
238
239 LWLockRelease(lock);
240}

References CommitTsCtl, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SimpleLruGetBankLock(), SimpleLruReadPage(), and TransactionIdSetCommitTs().

Referenced by TransactionTreeSetCommitTsData().

◆ StartupCommitTs()

void StartupCommitTs ( void  )

Definition at line 610 of file commit_ts.c.

611{
613}

References ActivateCommitTs().

Referenced by StartupXLOG().

◆ TransactionIdGetCommitTsData()

bool TransactionIdGetCommitTsData ( TransactionId  xid,
TimestampTz ts,
ReplOriginId nodeid 
)

Definition at line 273 of file commit_ts.c.

275{
276 int64 pageno = TransactionIdToCTsPage(xid);
278 int slotno;
280 TransactionId oldestCommitTsXid;
281 TransactionId newestCommitTsXid;
282
283 if (!TransactionIdIsValid(xid))
286 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
287 else if (!TransactionIdIsNormal(xid))
288 {
289 /* frozen and bootstrap xids are always committed far in the past */
290 *ts = 0;
291 if (nodeid)
292 *nodeid = 0;
293 return false;
294 }
295
297
298 /* Error if module not enabled */
301
302 /*
303 * If we're asked for the cached value, return that. Otherwise, fall
304 * through to read from SLRU.
305 */
306 if (commitTsShared->xidLastCommit == xid)
307 {
309 if (nodeid)
311
313 return *ts != 0;
314 }
315
316 oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
317 newestCommitTsXid = TransamVariables->newestCommitTsXid;
318 /* neither is invalid, or both are */
319 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
321
322 /*
323 * Return empty if the requested value is outside our valid range.
324 */
325 if (!TransactionIdIsValid(oldestCommitTsXid) ||
326 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
327 TransactionIdPrecedes(newestCommitTsXid, xid))
328 {
329 *ts = 0;
330 if (nodeid)
331 *nodeid = InvalidReplOriginId;
332 return false;
333 }
334
335 /* lock is acquired by SimpleLruReadPage_ReadOnly */
337 memcpy(&entry,
338 CommitTsCtl->shared->page_buffer[slotno] +
341
342 *ts = entry.time;
343 if (nodeid)
344 *nodeid = entry.nodeid;
345
347 return *ts != 0;
348}

References Assert, CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, CommitTimestampShared::dataLastCommit, ereport, errcode(), errmsg, ERROR, error_commit_ts_disabled(), fb(), InvalidReplOriginId, LW_SHARED, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, CommitTimestampEntry::nodeid, TransamVariablesData::oldestCommitTsXid, SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), SizeOfCommitTimestampEntry, CommitTimestampEntry::time, TransactionIdIsNormal, TransactionIdIsValid, TransactionIdPrecedes(), TransactionIdToCTsEntry, TransactionIdToCTsPage(), TransamVariables, and CommitTimestampShared::xidLastCommit.

Referenced by GetTupleTransactionInfo(), pg_xact_commit_timestamp(), pg_xact_commit_timestamp_origin(), and update_most_recent_deletion_info().

◆ TransactionIdSetCommitTs()

static void TransactionIdSetCommitTs ( TransactionId  xid,
TimestampTz  ts,
ReplOriginId  nodeid,
int  slotno 
)
static

Definition at line 248 of file commit_ts.c.

250{
253
255
256 entry.time = ts;
257 entry.nodeid = nodeid;
258
259 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
262}

References Assert, CommitTsCtl, fb(), CommitTimestampEntry::nodeid, SizeOfCommitTimestampEntry, CommitTimestampEntry::time, TransactionIdIsNormal, and TransactionIdToCTsEntry.

Referenced by SetXidCommitTsInPage().

◆ TransactionIdToCTsPage()

static int64 TransactionIdToCTsPage ( TransactionId  xid)
inlinestatic

◆ TransactionTreeSetCommitTsData()

void TransactionTreeSetCommitTsData ( TransactionId  xid,
int  nsubxids,
TransactionId subxids,
TimestampTz  timestamp,
ReplOriginId  nodeid 
)

Definition at line 140 of file commit_ts.c.

143{
144 int i;
147
148 /*
149 * No-op if the module is not active.
150 *
151 * An unlocked read here is fine, because in a standby (the only place
152 * where the flag can change in flight) this routine is only called by the
153 * recovery process, which is also the only process which can change the
154 * flag.
155 */
157 return;
158
159 /*
160 * Figure out the latest Xid in this batch: either the last subxid if
161 * there's any, otherwise the parent xid.
162 */
163 if (nsubxids > 0)
164 newestXact = subxids[nsubxids - 1];
165 else
166 newestXact = xid;
167
168 /*
169 * We split the xids to set the timestamp to in groups belonging to the
170 * same SLRU page; the first element in each such set is its head. The
171 * first group has the main XID as the head; subsequent sets use the first
172 * subxid not on the previous page as head. This way, we only have to
173 * lock/modify each SLRU page once.
174 */
175 headxid = xid;
176 i = 0;
177 for (;;)
178 {
180 int j;
181
182 for (j = i; j < nsubxids; j++)
183 {
184 if (TransactionIdToCTsPage(subxids[j]) != pageno)
185 break;
186 }
187 /* subxids[i..j] are on the same page as the head */
188
189 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
190 pageno);
191
192 /* if we wrote out all subxids, we're done. */
193 if (j >= nsubxids)
194 break;
195
196 /*
197 * Set the new head and skip over it, as well as over the subxids we
198 * just wrote.
199 */
200 headxid = subxids[j];
201 i = j + 1;
202 }
203
204 /* update the cached value in shared memory */
209
210 /* and move forwards our endpoint, if needed */
214}

References CommitTimestampShared::commitTsActive, commitTsShared, CommitTimestampShared::dataLastCommit, fb(), i, j, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, CommitTimestampEntry::nodeid, SetXidCommitTsInPage(), CommitTimestampEntry::time, TransactionIdPrecedes(), TransactionIdToCTsPage(), TransamVariables, and CommitTimestampShared::xidLastCommit.

Referenced by RecordTransactionCommit(), RecordTransactionCommitPrepared(), and xact_redo_commit().

◆ TruncateCommitTs()

void TruncateCommitTs ( TransactionId  oldestXact)

Definition at line 863 of file commit_ts.c.

864{
866
867 /*
868 * The cutoff point is the start of the segment containing oldestXact. We
869 * pass the *page* containing oldestXact to SimpleLruTruncate.
870 */
872
873 /* Check to see if there's any files that could be removed */
875 &cutoffPage))
876 return; /* nothing to remove */
877
878 /* Write XLOG record */
879 WriteTruncateXlogRec(cutoffPage, oldestXact);
880
881 /* Now we can remove the old CommitTs segment(s) */
883}

References CommitTsCtl, fb(), SimpleLruTruncate(), SlruScanDirCbReportPresence(), SlruScanDirectory(), TransactionIdToCTsPage(), and WriteTruncateXlogRec().

Referenced by vac_truncate_clog().

◆ WriteTruncateXlogRec()

static void WriteTruncateXlogRec ( int64  pageno,
TransactionId  oldestXid 
)
static

Variable Documentation

◆ CommitTsCtlData

SlruCtlData CommitTsCtlData
static

Definition at line 83 of file commit_ts.c.

◆ commitTsShared

◆ track_commit_timestamp