PostgreSQL Source Code git master
Loading...
Searching...
No Matches
commit_ts.c File Reference
#include "postgres.h"
#include "access/commit_ts.h"
#include "access/htup_details.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/shmem.h"
#include "utils/fmgrprotos.h"
#include "utils/guc_hooks.h"
#include "utils/timestamp.h"
Include dependency graph for commit_ts.c:

Go to the source code of this file.

Data Structures

struct  CommitTimestampEntry
 
struct  CommitTimestampShared
 

Macros

#define SizeOfCommitTimestampEntry
 
#define COMMIT_TS_XACTS_PER_PAGE    (BLCKSZ / SizeOfCommitTimestampEntry)
 
#define TransactionIdToCTsEntry(xid)    ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
 
#define CommitTsCtl   (&CommitTsCtlData)
 

Typedefs

typedef struct CommitTimestampEntry CommitTimestampEntry
 
typedef struct CommitTimestampShared CommitTimestampShared
 

Functions

static int64 TransactionIdToCTsPage (TransactionId xid)
 
static void SetXidCommitTsInPage (TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno)
 
static void TransactionIdSetCommitTs (TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
 
static void error_commit_ts_disabled (void)
 
static bool CommitTsPagePrecedes (int64 page1, int64 page2)
 
static void ActivateCommitTs (void)
 
static void DeactivateCommitTs (void)
 
static void WriteTruncateXlogRec (int64 pageno, TransactionId oldestXid)
 
void TransactionTreeSetCommitTsData (TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
 
bool TransactionIdGetCommitTsData (TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
 
TransactionId GetLatestCommitTsData (TimestampTz *ts, RepOriginId *nodeid)
 
Datum pg_xact_commit_timestamp (PG_FUNCTION_ARGS)
 
Datum pg_last_committed_xact (PG_FUNCTION_ARGS)
 
Datum pg_xact_commit_timestamp_origin (PG_FUNCTION_ARGS)
 
static int CommitTsShmemBuffers (void)
 
Size CommitTsShmemSize (void)
 
void CommitTsShmemInit (void)
 
bool check_commit_ts_buffers (int *newval, void **extra, GucSource source)
 
void BootStrapCommitTs (void)
 
void StartupCommitTs (void)
 
void CompleteCommitTsInitialization (void)
 
void CommitTsParameterChange (bool newvalue, bool oldvalue)
 
void CheckPointCommitTs (void)
 
void ExtendCommitTs (TransactionId newestXact)
 
void TruncateCommitTs (TransactionId oldestXact)
 
void SetCommitTsLimit (TransactionId oldestXact, TransactionId newestXact)
 
void AdvanceOldestCommitTsXid (TransactionId oldestXact)
 
void commit_ts_redo (XLogReaderState *record)
 
int committssyncfiletag (const FileTag *ftag, char *path)
 

Variables

static SlruCtlData CommitTsCtlData
 
static CommitTimestampSharedcommitTsShared
 
bool track_commit_timestamp
 

Macro Definition Documentation

◆ COMMIT_TS_XACTS_PER_PAGE

#define COMMIT_TS_XACTS_PER_PAGE    (BLCKSZ / SizeOfCommitTimestampEntry)

Definition at line 63 of file commit_ts.c.

72{
73 return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
74}
75
76#define TransactionIdToCTsEntry(xid) \
77 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
78
79/*
80 * Link to shared-memory data structures for CommitTs control
81 */
83
84#define CommitTsCtl (&CommitTsCtlData)
85
86/*
87 * We keep a cache of the last value set in shared memory.
88 *
89 * This is also good place to keep the activation status. We keep this
90 * separate from the GUC so that the standby can activate the module if the
91 * primary has it active independently of the value of the GUC.
92 *
93 * This is protected by CommitTsLock. In some places, we use commitTsActive
94 * without acquiring the lock; where this happens, a comment explains the
95 * rationale for it.
96 */
97typedef struct CommitTimestampShared
98{
101 bool commitTsActive;
103
105
106
107/* GUC variable */
109
110static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
111 TransactionId *subxids, TimestampTz ts,
112 RepOriginId nodeid, int64 pageno);
114 RepOriginId nodeid, int slotno);
115static void error_commit_ts_disabled(void);
117static void ActivateCommitTs(void);
118static void DeactivateCommitTs(void);
119static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
120
121/*
122 * TransactionTreeSetCommitTsData
123 *
124 * Record the final commit timestamp of transaction entries in the commit log
125 * for a transaction and its subtransaction tree, as efficiently as possible.
126 *
127 * xid is the top level transaction id.
128 *
129 * subxids is an array of xids of length nsubxids, representing subtransactions
130 * in the tree of xid. In various cases nsubxids may be zero.
131 * The reason why tracking just the parent xid commit timestamp is not enough
132 * is that the subtrans SLRU does not stay valid across crashes (it's not
133 * permanent) so we need to keep the information about them here. If the
134 * subtrans implementation changes in the future, we might want to revisit the
135 * decision of storing timestamp info for each subxid.
136 */
137void
140 RepOriginId nodeid)
141{
142 int i;
145
146 /*
147 * No-op if the module is not active.
148 *
149 * An unlocked read here is fine, because in a standby (the only place
150 * where the flag can change in flight) this routine is only called by the
151 * recovery process, which is also the only process which can change the
152 * flag.
153 */
155 return;
156
157 /*
158 * Figure out the latest Xid in this batch: either the last subxid if
159 * there's any, otherwise the parent xid.
160 */
161 if (nsubxids > 0)
162 newestXact = subxids[nsubxids - 1];
163 else
164 newestXact = xid;
165
166 /*
167 * We split the xids to set the timestamp to in groups belonging to the
168 * same SLRU page; the first element in each such set is its head. The
169 * first group has the main XID as the head; subsequent sets use the first
170 * subxid not on the previous page as head. This way, we only have to
171 * lock/modify each SLRU page once.
172 */
173 headxid = xid;
174 i = 0;
175 for (;;)
176 {
178 int j;
179
180 for (j = i; j < nsubxids; j++)
181 {
182 if (TransactionIdToCTsPage(subxids[j]) != pageno)
183 break;
184 }
185 /* subxids[i..j] are on the same page as the head */
186
187 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
188 pageno);
189
190 /* if we wrote out all subxids, we're done. */
191 if (j >= nsubxids)
192 break;
193
194 /*
195 * Set the new head and skip over it, as well as over the subxids we
196 * just wrote.
197 */
198 headxid = subxids[j];
199 i = j + 1;
200 }
201
202 /* update the cached value in shared memory */
207
208 /* and move forwards our endpoint, if needed */
212}
213
214/*
215 * Record the commit timestamp of transaction entries in the commit log for all
216 * entries on a single page. Atomic only on this page.
217 */
218static void
220 TransactionId *subxids, TimestampTz ts,
221 RepOriginId nodeid, int64 pageno)
222{
223 LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
224 int slotno;
225 int i;
226
228
229 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
230
231 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
232 for (i = 0; i < nsubxids; i++)
233 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
234
235 CommitTsCtl->shared->page_dirty[slotno] = true;
236
237 LWLockRelease(lock);
238}
239
240/*
241 * Sets the commit timestamp of a single transaction.
242 *
243 * Caller must hold the correct SLRU bank lock, will be held at exit
244 */
245static void
247 RepOriginId nodeid, int slotno)
248{
251
253
254 entry.time = ts;
255 entry.nodeid = nodeid;
256
257 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
260}
261
262/*
263 * Interrogate the commit timestamp of a transaction.
264 *
265 * The return value indicates whether a commit timestamp record was found for
266 * the given xid. The timestamp value is returned in *ts (which may not be
267 * null), and the origin node for the Xid is returned in *nodeid, if it's not
268 * null.
269 */
270bool
272 RepOriginId *nodeid)
273{
274 int64 pageno = TransactionIdToCTsPage(xid);
276 int slotno;
278 TransactionId oldestCommitTsXid;
279 TransactionId newestCommitTsXid;
280
281 if (!TransactionIdIsValid(xid))
284 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
285 else if (!TransactionIdIsNormal(xid))
286 {
287 /* frozen and bootstrap xids are always committed far in the past */
288 *ts = 0;
289 if (nodeid)
290 *nodeid = 0;
291 return false;
292 }
293
295
296 /* Error if module not enabled */
299
300 /*
301 * If we're asked for the cached value, return that. Otherwise, fall
302 * through to read from SLRU.
303 */
304 if (commitTsShared->xidLastCommit == xid)
305 {
307 if (nodeid)
309
311 return *ts != 0;
312 }
313
314 oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
315 newestCommitTsXid = TransamVariables->newestCommitTsXid;
316 /* neither is invalid, or both are */
317 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
319
320 /*
321 * Return empty if the requested value is outside our valid range.
322 */
323 if (!TransactionIdIsValid(oldestCommitTsXid) ||
324 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
325 TransactionIdPrecedes(newestCommitTsXid, xid))
326 {
327 *ts = 0;
328 if (nodeid)
329 *nodeid = InvalidRepOriginId;
330 return false;
331 }
332
333 /* lock is acquired by SimpleLruReadPage_ReadOnly */
335 memcpy(&entry,
336 CommitTsCtl->shared->page_buffer[slotno] +
339
340 *ts = entry.time;
341 if (nodeid)
342 *nodeid = entry.nodeid;
343
345 return *ts != 0;
346}
347
348/*
349 * Return the Xid of the latest committed transaction. (As far as this module
350 * is concerned, anyway; it's up to the caller to ensure the value is useful
351 * for its purposes.)
352 *
353 * ts and nodeid are filled with the corresponding data; they can be passed
354 * as NULL if not wanted.
355 */
358{
359 TransactionId xid;
360
362
363 /* Error if module not enabled */
366
368 if (ts)
370 if (nodeid)
373
374 return xid;
375}
376
377static void
379{
382 errmsg("could not get commit timestamp data"),
384 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
385 "track_commit_timestamp") :
386 errhint("Make sure the configuration parameter \"%s\" is set.",
387 "track_commit_timestamp")));
388}
389
390/*
391 * SQL-callable wrapper to obtain commit time of a transaction
392 */
393Datum
395{
397 TimestampTz ts;
398 bool found;
399
400 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
401
402 if (!found)
404
406}
407
408
409/*
410 * pg_last_committed_xact
411 *
412 * SQL-callable wrapper to obtain some information about the latest
413 * committed transaction: transaction ID, timestamp and replication
414 * origin.
415 */
416Datum
418{
419 TransactionId xid;
420 RepOriginId nodeid;
421 TimestampTz ts;
422 Datum values[3];
423 bool nulls[3];
424 TupleDesc tupdesc;
425 HeapTuple htup;
426
427 /* and construct a tuple with our data */
428 xid = GetLatestCommitTsData(&ts, &nodeid);
429
430 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
431 elog(ERROR, "return type must be a row type");
432
433 if (!TransactionIdIsNormal(xid))
434 {
435 memset(nulls, true, sizeof(nulls));
436 }
437 else
438 {
440 nulls[0] = false;
441
443 nulls[1] = false;
444
445 values[2] = ObjectIdGetDatum((Oid) nodeid);
446 nulls[2] = false;
447 }
448
449 htup = heap_form_tuple(tupdesc, values, nulls);
450
452}
453
454/*
455 * pg_xact_commit_timestamp_origin
456 *
457 * SQL-callable wrapper to obtain commit timestamp and replication origin
458 * of a given transaction.
459 */
460Datum
462{
464 RepOriginId nodeid;
465 TimestampTz ts;
466 Datum values[2];
467 bool nulls[2];
468 TupleDesc tupdesc;
469 HeapTuple htup;
470 bool found;
471
472 found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
473
474 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
475 elog(ERROR, "return type must be a row type");
476
477 if (!found)
478 {
479 memset(nulls, true, sizeof(nulls));
480 }
481 else
482 {
484 nulls[0] = false;
485
486 values[1] = ObjectIdGetDatum((Oid) nodeid);
487 nulls[1] = false;
488 }
489
490 htup = heap_form_tuple(tupdesc, values, nulls);
491
493}
494
495/*
496 * Number of shared CommitTS buffers.
497 *
498 * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
499 * Otherwise just cap the configured amount to be between 16 and the maximum
500 * allowed.
501 */
502static int
504{
505 /* auto-tune based on shared buffers */
507 return SimpleLruAutotuneBuffers(512, 1024);
508
510}
511
512/*
513 * Shared memory sizing for CommitTs
514 */
515Size
517{
519 sizeof(CommitTimestampShared);
520}
521
522/*
523 * Initialize CommitTs at system startup (postmaster start or standalone
524 * backend)
525 */
526void
528{
529 bool found;
530
531 /* If auto-tuning is requested, now is the time to do it */
533 {
534 char buf[32];
535
536 snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
537 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
539
540 /*
541 * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
542 * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
543 * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
544 * that and we must force the matter with PGC_S_OVERRIDE.
545 */
546 if (commit_timestamp_buffers == 0) /* failed to apply it? */
547 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
549 }
551
552 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
553 SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
554 "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
557 false);
559
560 commitTsShared = ShmemInitStruct("CommitTs shared",
561 sizeof(CommitTimestampShared),
562 &found);
563
565 {
566 Assert(!found);
567
572 }
573 else
574 Assert(found);
575}
576
577/*
578 * GUC check_hook for commit_timestamp_buffers
579 */
580bool
582{
583 return check_slru_buffers("commit_timestamp_buffers", newval);
584}
585
586/*
587 * This function must be called ONCE on system install.
588 *
589 * (The CommitTs directory is assumed to have been created by initdb, and
590 * CommitTsShmemInit must have been called already.)
591 */
592void
594{
595 /*
596 * Nothing to do here at present, unlike most other SLRU modules; segments
597 * are created when the server is started with this module enabled. See
598 * ActivateCommitTs.
599 */
600}
601
602/*
603 * This must be called ONCE during postmaster or standalone-backend startup,
604 * after StartupXLOG has initialized TransamVariables->nextXid.
605 */
606void
607StartupCommitTs(void)
608{
610}
611
612/*
613 * This must be called ONCE during postmaster or standalone-backend startup,
614 * after recovery has finished.
615 */
616void
618{
619 /*
620 * If the feature is not enabled, turn it off for good. This also removes
621 * any leftover data.
622 *
623 * Conversely, we activate the module if the feature is enabled. This is
624 * necessary for primary and standby as the activation depends on the
625 * control file contents at the beginning of recovery or when a
626 * XLOG_PARAMETER_CHANGE is replayed.
627 */
630 else
632}
633
634/*
635 * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
636 * XLog record during recovery.
637 */
638void
640{
641 /*
642 * If the commit_ts module is disabled in this server and we get word from
643 * the primary server that it is enabled there, activate it so that we can
644 * replay future WAL records involving it; also mark it as active on
645 * pg_control. If the old value was already set, we already did this, so
646 * don't do anything.
647 *
648 * If the module is disabled in the primary, disable it here too, unless
649 * the module is enabled locally.
650 *
651 * Note this only runs in the recovery process, so an unlocked read is
652 * fine.
653 */
654 if (newvalue)
655 {
658 }
661}
662
663/*
664 * Activate this module whenever necessary.
665 * This must happen during postmaster or standalone-backend startup,
666 * or during WAL replay anytime the track_commit_timestamp setting is
667 * changed in the primary.
668 *
669 * The reason why this SLRU needs separate activation/deactivation functions is
670 * that it can be enabled/disabled during start and the activation/deactivation
671 * on the primary is propagated to the standby via replay. Other SLRUs don't
672 * have this property and they can be just initialized during normal startup.
673 *
674 * This is in charge of creating the currently active segment, if it's not
675 * already there. The reason for this is that the server might have been
676 * running with this module disabled for a while and thus might have skipped
677 * the normal creation point.
678 */
679static void
681{
682 TransactionId xid;
683 int64 pageno;
684
685 /*
686 * During bootstrap, we should not register commit timestamps so skip the
687 * activation in this case.
688 */
690 return;
691
692 /* If we've done this already, there's nothing to do */
695 {
697 return;
698 }
700
702 pageno = TransactionIdToCTsPage(xid);
703
704 /*
705 * Re-Initialize our idea of the latest page number.
706 */
707 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
708
709 /*
710 * If CommitTs is enabled, but it wasn't in the previous server run, we
711 * need to set the oldest and newest values to the next Xid; that way, we
712 * will not try to read data that might not have been set.
713 *
714 * XXX does this have a problem if a server is started with commitTs
715 * enabled, then started with commitTs disabled, then restarted with it
716 * enabled again? It doesn't look like it does, because there should be a
717 * checkpoint that sets the value to InvalidTransactionId at end of
718 * recovery; and so any chance of injecting new transactions without
719 * CommitTs values would occur after the oldestCommitTsXid has been set to
720 * Invalid temporarily.
721 */
724 {
727 }
729
730 /* Create the current segment file, if necessary */
733
734 /* Change the activation status in shared memory. */
738}
739
740/*
741 * Deactivate this module.
742 *
743 * This must be called when the track_commit_timestamp parameter is turned off.
744 * This happens during postmaster or standalone-backend startup, or during WAL
745 * replay.
746 *
747 * Resets CommitTs into invalid state to make sure we don't hand back
748 * possibly-invalid data; also removes segments of old data.
749 */
750static void
752{
753 /*
754 * Cleanup the status in the shared memory.
755 *
756 * We reset everything in the commitTsShared record to prevent user from
757 * getting confusing data about last committed transaction on the standby
758 * when the module was activated repeatedly on the primary.
759 */
761
766
769
770 /*
771 * Remove *all* files. This is necessary so that there are no leftover
772 * files; in the case where this feature is later enabled after running
773 * with it disabled for some time there may be a gap in the file sequence.
774 * (We can probably tolerate out-of-sequence files, as they are going to
775 * be overwritten anyway when we wrap around, but it seems better to be
776 * tidy.)
777 *
778 * Note that we do this with CommitTsLock acquired in exclusive mode. This
779 * is very heavy-handed, but since this routine can only be called in the
780 * replica and should happen very rarely, we don't worry too much about
781 * it. Note also that no process should be consulting this SLRU if we
782 * have just deactivated it.
783 */
785
787}
788
789/*
790 * Perform a checkpoint --- either during shutdown, or on-the-fly
791 */
792void
794{
795 /*
796 * Write dirty CommitTs pages to disk. This may result in sync requests
797 * queued for later handling by ProcessSyncRequests(), as part of the
798 * checkpoint.
799 */
801}
802
803/*
804 * Make sure that CommitTs has room for a newly-allocated XID.
805 *
806 * NB: this is called while holding XidGenLock. We want it to be very fast
807 * most of the time; even when it's not so fast, no actual I/O need happen
808 * unless we're forced to write out a dirty CommitTs or xlog page to make room
809 * in shared memory.
810 *
811 * NB: the current implementation relies on track_commit_timestamp being
812 * PGC_POSTMASTER.
813 */
814void
816{
817 int64 pageno;
818 LWLock *lock;
819
820 /*
821 * Nothing to do if module not enabled. Note we do an unlocked read of
822 * the flag here, which is okay because this routine is only called from
823 * GetNewTransactionId, which is never called in a standby.
824 */
827 return;
828
829 /*
830 * No work except at first XID of a page. But beware: just after
831 * wraparound, the first XID of page zero is FirstNormalTransactionId.
832 */
835 return;
836
838
839 lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
840
842
843 /* Zero the page ... */
845
846 /* and make a WAL entry about that, unless we're in REDO */
847 if (!InRecovery)
849
850 LWLockRelease(lock);
851}
852
853/*
854 * Remove all CommitTs segments before the one holding the passed
855 * transaction ID.
856 *
857 * Note that we don't need to flush XLOG here.
858 */
859void
861{
863
864 /*
865 * The cutoff point is the start of the segment containing oldestXact. We
866 * pass the *page* containing oldestXact to SimpleLruTruncate.
867 */
869
870 /* Check to see if there's any files that could be removed */
872 &cutoffPage))
873 return; /* nothing to remove */
874
875 /* Write XLOG record */
876 WriteTruncateXlogRec(cutoffPage, oldestXact);
877
878 /* Now we can remove the old CommitTs segment(s) */
880}
881
882/*
883 * Set the limit values between which commit TS can be consulted.
884 */
885void
887{
888 /*
889 * Be careful not to overwrite values that are either further into the
890 * "future" or signal a disabled committs.
891 */
894 {
899 }
900 else
901 {
905 }
907}
908
909/*
910 * Move forwards the oldest commitTS value that can be consulted
911 */
912void
914{
920}
921
922
923/*
924 * Decide whether a commitTS page number is "older" for truncation purposes.
925 * Analogous to CLOGPagePrecedes().
926 *
927 * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
928 * introduces differences compared to CLOG and the other SLRUs having (1 <<
929 * 31) % per_page == 0. This function never tests exactly
930 * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
931 * there are two possible counts of page boundaries between oldestXact and the
932 * latest XID assigned, depending on whether oldestXact is within the first
933 * 128 entries of its page. Since this function doesn't know the location of
934 * oldestXact within page2, it returns false for one page that actually is
935 * expendable. This is a wider (yet still negligible) version of the
936 * truncation opportunity that CLOGPagePrecedes() cannot recognize.
937 *
938 * For the sake of a worked example, number entries with decimal values such
939 * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
940 * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
941 * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
942 * because entry=2.85 is the border that toggles whether entries precede the
943 * last entry of the oldestXact page. While page 2 is expendable at
944 * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
945 */
946static bool
948{
951
956
957 return (TransactionIdPrecedes(xid1, xid2) &&
959}
960
961
962/*
963 * Write a TRUNCATE xlog record
964 */
965static void
967{
969
970 xlrec.pageno = pageno;
971 xlrec.oldestXid = oldestXid;
972
976}
977
978/*
979 * CommitTS resource manager's routines
980 */
981void
983{
984 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
985
986 /* Backup blocks are not used in commit_ts records */
988
989 if (info == COMMIT_TS_ZEROPAGE)
990 {
991 int64 pageno;
992
993 memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
995 }
996 else if (info == COMMIT_TS_TRUNCATE)
997 {
999
1000 AdvanceOldestCommitTsXid(trunc->oldestXid);
1001
1002 /*
1003 * During XLOG replay, latest_page_number isn't set up yet; insert a
1004 * suitable value to bypass the sanity test in SimpleLruTruncate.
1005 */
1006 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1007 trunc->pageno);
1008
1010 }
1011 else
1012 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1013}
1014
1015/*
1016 * Entrypoint for sync.c to sync commit_ts files.
1017 */
1018int
1019committssyncfiletag(const FileTag *ftag, char *path)
1020{
1021 return SlruSyncFileTag(CommitTsCtl, ftag, path);
1022}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define Min(x, y)
Definition c.h:1007
uint8_t uint8
Definition c.h:554
#define Max(x, y)
Definition c.h:1001
#define Assert(condition)
Definition c.h:883
int64_t int64
Definition c.h:553
uint32 TransactionId
Definition c.h:676
size_t Size
Definition c.h:629
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno)
Definition commit_ts.c:220
void StartupCommitTs(void)
Definition commit_ts.c:608
static SlruCtlData CommitTsCtlData
Definition commit_ts.c:83
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
Definition commit_ts.c:462
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition commit_ts.c:418
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition commit_ts.c:358
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition commit_ts.c:640
#define COMMIT_TS_XACTS_PER_PAGE
Definition commit_ts.c:63
#define TransactionIdToCTsEntry(xid)
Definition commit_ts.c:77
static void DeactivateCommitTs(void)
Definition commit_ts.c:752
Size CommitTsShmemSize(void)
Definition commit_ts.c:517
bool track_commit_timestamp
Definition commit_ts.c:109
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition commit_ts.c:914
static CommitTimestampShared * commitTsShared
Definition commit_ts.c:105
int committssyncfiletag(const FileTag *ftag, char *path)
Definition commit_ts.c:1020
void CompleteCommitTsInitialization(void)
Definition commit_ts.c:618
bool check_commit_ts_buffers(int *newval, void **extra, GucSource source)
Definition commit_ts.c:582
static void ActivateCommitTs(void)
Definition commit_ts.c:681
static int64 TransactionIdToCTsPage(TransactionId xid)
Definition commit_ts.c:72
void TruncateCommitTs(TransactionId oldestXact)
Definition commit_ts.c:861
void commit_ts_redo(XLogReaderState *record)
Definition commit_ts.c:983
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition commit_ts.c:272
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition commit_ts.c:247
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition commit_ts.c:395
static int CommitTsShmemBuffers(void)
Definition commit_ts.c:504
static void error_commit_ts_disabled(void)
Definition commit_ts.c:379
static bool CommitTsPagePrecedes(int64 page1, int64 page2)
Definition commit_ts.c:948
#define SizeOfCommitTimestampEntry
Definition commit_ts.c:60
void BootStrapCommitTs(void)
Definition commit_ts.c:594
void CommitTsShmemInit(void)
Definition commit_ts.c:528
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition commit_ts.c:887
#define CommitTsCtl
Definition commit_ts.c:85
void ExtendCommitTs(TransactionId newestXact)
Definition commit_ts.c:816
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
Definition commit_ts.c:139
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
Definition commit_ts.c:967
void CheckPointCommitTs(void)
Definition commit_ts.c:794
#define COMMIT_TS_ZEROPAGE
Definition commit_ts.h:46
#define SizeOfCommitTsTruncate
Definition commit_ts.h:55
#define COMMIT_TS_TRUNCATE
Definition commit_ts.h:47
int64 TimestampTz
Definition timestamp.h:39
#define TIMESTAMP_NOBEGIN(j)
Definition timestamp.h:159
int errhint(const char *fmt,...)
Definition elog.c:1330
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define PANIC
Definition elog.h:42
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_GETARG_TRANSACTIONID(n)
Definition fmgr.h:280
#define PG_RETURN_DATUM(x)
Definition fmgr.h:354
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition funcapi.h:230
bool IsUnderPostmaster
Definition globals.c:120
int commit_timestamp_buffers
Definition globals.c:161
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition guc.c:4196
#define newval
GucSource
Definition guc.h:112
@ PGC_S_DYNAMIC_DEFAULT
Definition guc.h:114
@ PGC_S_OVERRIDE
Definition guc.h:123
@ PGC_POSTMASTER
Definition guc.h:74
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1117
int j
Definition isn.c:78
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
#define IsBootstrapProcessingMode()
Definition miscadmin.h:477
#define InvalidRepOriginId
Definition origin.h:33
static rewind_source * source
Definition pg_rewind.c:89
static char buf[DEFAULT_XLOG_SEG_SIZE]
int64 timestamp
#define snprintf
Definition port.h:260
static Datum TransactionIdGetDatum(TransactionId X)
Definition postgres.h:302
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
unsigned int Oid
static int fb(int x)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:389
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition slru.c:252
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
Definition slru.c:630
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
Definition slru.c:1347
int SimpleLruAutotuneBuffers(int divisor, int max)
Definition slru.c:231
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)
Definition slru.c:771
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition slru.c:1816
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1769
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition slru.c:527
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
Definition slru.c:1856
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition slru.c:375
void SimpleLruZeroAndWritePage(SlruCtl ctl, int64 pageno)
Definition slru.c:444
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition slru.c:1433
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition slru.c:198
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1737
bool check_slru_buffers(const char *name, int *newval)
Definition slru.c:355
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition slru.h:160
#define SlruPagePrecedesUnitTests(ctl, per_page)
Definition slru.h:185
#define SLRU_MAX_ALLOWED_BUFFERS
Definition slru.h:24
TimestampTz time
Definition commit_ts.c:56
RepOriginId nodeid
Definition commit_ts.c:57
CommitTimestampEntry dataLastCommit
Definition commit_ts.c:101
TransactionId xidLastCommit
Definition commit_ts.c:100
Definition sync.h:51
TransactionId oldestCommitTsXid
Definition transam.h:232
TransactionId newestCommitTsXid
Definition transam.h:233
FullTransactionId nextXid
Definition transam.h:220
@ SYNC_HANDLER_COMMIT_TS
Definition sync.h:39
static TransactionId ReadNextTransactionId(void)
Definition transam.h:377
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdEquals(id1, id2)
Definition transam.h:43
#define XidFromFullTransactionId(x)
Definition transam.h:48
#define FirstNormalTransactionId
Definition transam.h:34
#define TransactionIdIsValid(xid)
Definition transam.h:41
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
#define PG_RETURN_TIMESTAMPTZ(x)
Definition timestamp.h:68
TransamVariablesData * TransamVariables
Definition varsup.c:34
bool RecoveryInProgress(void)
Definition xlog.c:6461
uint16 RepOriginId
Definition xlogdefs.h:69
XLogRecPtr XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value)
Definition xloginsert.c:543
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:478
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:368
void XLogBeginInsert(void)
Definition xloginsert.c:152
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:409
#define XLogRecGetData(decoder)
Definition xlogreader.h:414
#define XLogRecHasAnyBlockRefs(decoder)
Definition xlogreader.h:416
bool InRecovery
Definition xlogutils.c:50

◆ CommitTsCtl

#define CommitTsCtl   (&CommitTsCtlData)

Definition at line 85 of file commit_ts.c.

◆ SizeOfCommitTimestampEntry

#define SizeOfCommitTimestampEntry
Value:

Definition at line 60 of file commit_ts.c.

◆ TransactionIdToCTsEntry

#define TransactionIdToCTsEntry (   xid)     ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)

Definition at line 77 of file commit_ts.c.

Typedef Documentation

◆ CommitTimestampEntry

◆ CommitTimestampShared

Function Documentation

◆ ActivateCommitTs()

static void ActivateCommitTs ( void  )
static

Definition at line 681 of file commit_ts.c.

682{
683 TransactionId xid;
684 int64 pageno;
685
686 /*
687 * During bootstrap, we should not register commit timestamps so skip the
688 * activation in this case.
689 */
691 return;
692
693 /* If we've done this already, there's nothing to do */
696 {
698 return;
699 }
701
703 pageno = TransactionIdToCTsPage(xid);
704
705 /*
706 * Re-Initialize our idea of the latest page number.
707 */
708 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
709
710 /*
711 * If CommitTs is enabled, but it wasn't in the previous server run, we
712 * need to set the oldest and newest values to the next Xid; that way, we
713 * will not try to read data that might not have been set.
714 *
715 * XXX does this have a problem if a server is started with commitTs
716 * enabled, then started with commitTs disabled, then restarted with it
717 * enabled again? It doesn't look like it does, because there should be a
718 * checkpoint that sets the value to InvalidTransactionId at end of
719 * recovery; and so any chance of injecting new transactions without
720 * CommitTs values would occur after the oldestCommitTsXid has been set to
721 * Invalid temporarily.
722 */
725 {
728 }
730
731 /* Create the current segment file, if necessary */
734
735 /* Change the activation status in shared memory. */
739}

References CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, fb(), InvalidTransactionId, IsBootstrapProcessingMode, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, TransamVariablesData::nextXid, TransamVariablesData::oldestCommitTsXid, pg_atomic_write_u64(), ReadNextTransactionId(), SimpleLruDoesPhysicalPageExist(), SimpleLruZeroAndWritePage(), TransactionIdToCTsPage(), TransamVariables, and XidFromFullTransactionId.

Referenced by CommitTsParameterChange(), CompleteCommitTsInitialization(), and StartupCommitTs().

◆ AdvanceOldestCommitTsXid()

◆ BootStrapCommitTs()

void BootStrapCommitTs ( void  )

Definition at line 594 of file commit_ts.c.

595{
596 /*
597 * Nothing to do here at present, unlike most other SLRU modules; segments
598 * are created when the server is started with this module enabled. See
599 * ActivateCommitTs.
600 */
601}

Referenced by BootStrapXLOG().

◆ check_commit_ts_buffers()

bool check_commit_ts_buffers ( int newval,
void **  extra,
GucSource  source 
)

Definition at line 582 of file commit_ts.c.

583{
584 return check_slru_buffers("commit_timestamp_buffers", newval);
585}

References check_slru_buffers(), and newval.

◆ CheckPointCommitTs()

void CheckPointCommitTs ( void  )

Definition at line 794 of file commit_ts.c.

795{
796 /*
797 * Write dirty CommitTs pages to disk. This may result in sync requests
798 * queued for later handling by ProcessSyncRequests(), as part of the
799 * checkpoint.
800 */
802}

References CommitTsCtl, and SimpleLruWriteAll().

Referenced by CheckPointGuts().

◆ commit_ts_redo()

void commit_ts_redo ( XLogReaderState record)

Definition at line 983 of file commit_ts.c.

984{
985 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
986
987 /* Backup blocks are not used in commit_ts records */
989
990 if (info == COMMIT_TS_ZEROPAGE)
991 {
992 int64 pageno;
993
994 memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
996 }
997 else if (info == COMMIT_TS_TRUNCATE)
998 {
1000
1001 AdvanceOldestCommitTsXid(trunc->oldestXid);
1002
1003 /*
1004 * During XLOG replay, latest_page_number isn't set up yet; insert a
1005 * suitable value to bypass the sanity test in SimpleLruTruncate.
1006 */
1007 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1008 trunc->pageno);
1009
1011 }
1012 else
1013 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1014}

References AdvanceOldestCommitTsXid(), Assert, COMMIT_TS_TRUNCATE, COMMIT_TS_ZEROPAGE, CommitTsCtl, elog, fb(), PANIC, pg_atomic_write_u64(), SimpleLruTruncate(), SimpleLruZeroAndWritePage(), XLogRecGetData, XLogRecGetInfo, and XLogRecHasAnyBlockRefs.

◆ CommitTsPagePrecedes()

◆ CommitTsParameterChange()

void CommitTsParameterChange ( bool  newvalue,
bool  oldvalue 
)

Definition at line 640 of file commit_ts.c.

641{
642 /*
643 * If the commit_ts module is disabled in this server and we get word from
644 * the primary server that it is enabled there, activate it so that we can
645 * replay future WAL records involving it; also mark it as active on
646 * pg_control. If the old value was already set, we already did this, so
647 * don't do anything.
648 *
649 * If the module is disabled in the primary, disable it here too, unless
650 * the module is enabled locally.
651 *
652 * Note this only runs in the recovery process, so an unlocked read is
653 * fine.
654 */
655 if (newvalue)
656 {
659 }
662}

References ActivateCommitTs(), CommitTimestampShared::commitTsActive, commitTsShared, DeactivateCommitTs(), and fb().

Referenced by xlog_redo().

◆ CommitTsShmemBuffers()

static int CommitTsShmemBuffers ( void  )
static

Definition at line 504 of file commit_ts.c.

505{
506 /* auto-tune based on shared buffers */
508 return SimpleLruAutotuneBuffers(512, 1024);
509
511}

References commit_timestamp_buffers, Max, Min, SimpleLruAutotuneBuffers(), and SLRU_MAX_ALLOWED_BUFFERS.

Referenced by CommitTsShmemInit(), and CommitTsShmemSize().

◆ CommitTsShmemInit()

void CommitTsShmemInit ( void  )

Definition at line 528 of file commit_ts.c.

529{
530 bool found;
531
532 /* If auto-tuning is requested, now is the time to do it */
534 {
535 char buf[32];
536
537 snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
538 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
540
541 /*
542 * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
543 * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
544 * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
545 * that and we must force the matter with PGC_S_OVERRIDE.
546 */
547 if (commit_timestamp_buffers == 0) /* failed to apply it? */
548 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
550 }
552
553 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
554 SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
555 "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
558 false);
560
561 commitTsShared = ShmemInitStruct("CommitTs shared",
562 sizeof(CommitTimestampShared),
563 &found);
564
566 {
567 Assert(!found);
568
573 }
574 else
575 Assert(found);
576}

References Assert, buf, commit_timestamp_buffers, COMMIT_TS_XACTS_PER_PAGE, CommitTimestampShared::commitTsActive, CommitTsCtl, CommitTsPagePrecedes(), commitTsShared, CommitTsShmemBuffers(), CommitTimestampShared::dataLastCommit, fb(), InvalidRepOriginId, InvalidTransactionId, IsUnderPostmaster, CommitTimestampEntry::nodeid, PGC_POSTMASTER, PGC_S_DYNAMIC_DEFAULT, PGC_S_OVERRIDE, SetConfigOption(), ShmemInitStruct(), SimpleLruInit(), SlruPagePrecedesUnitTests, snprintf, SYNC_HANDLER_COMMIT_TS, CommitTimestampEntry::time, TIMESTAMP_NOBEGIN, and CommitTimestampShared::xidLastCommit.

Referenced by CreateOrAttachShmemStructs().

◆ CommitTsShmemSize()

Size CommitTsShmemSize ( void  )

Definition at line 517 of file commit_ts.c.

518{
520 sizeof(CommitTimestampShared);
521}

References CommitTsShmemBuffers(), and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

◆ committssyncfiletag()

int committssyncfiletag ( const FileTag ftag,
char path 
)

Definition at line 1020 of file commit_ts.c.

1021{
1022 return SlruSyncFileTag(CommitTsCtl, ftag, path);
1023}

References CommitTsCtl, and SlruSyncFileTag().

◆ CompleteCommitTsInitialization()

void CompleteCommitTsInitialization ( void  )

Definition at line 618 of file commit_ts.c.

619{
620 /*
621 * If the feature is not enabled, turn it off for good. This also removes
622 * any leftover data.
623 *
624 * Conversely, we activate the module if the feature is enabled. This is
625 * necessary for primary and standby as the activation depends on the
626 * control file contents at the beginning of recovery or when a
627 * XLOG_PARAMETER_CHANGE is replayed.
628 */
631 else
633}

References ActivateCommitTs(), DeactivateCommitTs(), and track_commit_timestamp.

Referenced by StartupXLOG().

◆ DeactivateCommitTs()

static void DeactivateCommitTs ( void  )
static

Definition at line 752 of file commit_ts.c.

753{
754 /*
755 * Cleanup the status in the shared memory.
756 *
757 * We reset everything in the commitTsShared record to prevent user from
758 * getting confusing data about last committed transaction on the standby
759 * when the module was activated repeatedly on the primary.
760 */
762
767
770
771 /*
772 * Remove *all* files. This is necessary so that there are no leftover
773 * files; in the case where this feature is later enabled after running
774 * with it disabled for some time there may be a gap in the file sequence.
775 * (We can probably tolerate out-of-sequence files, as they are going to
776 * be overwritten anyway when we wrap around, but it seems better to be
777 * tidy.)
778 *
779 * Note that we do this with CommitTsLock acquired in exclusive mode. This
780 * is very heavy-handed, but since this routine can only be called in the
781 * replica and should happen very rarely, we don't worry too much about
782 * it. Note also that no process should be consulting this SLRU if we
783 * have just deactivated it.
784 */
786
788}

References CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, CommitTimestampShared::dataLastCommit, fb(), InvalidRepOriginId, InvalidTransactionId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, CommitTimestampEntry::nodeid, TransamVariablesData::oldestCommitTsXid, SlruScanDirCbDeleteAll(), SlruScanDirectory(), CommitTimestampEntry::time, TIMESTAMP_NOBEGIN, TransamVariables, and CommitTimestampShared::xidLastCommit.

Referenced by CommitTsParameterChange(), and CompleteCommitTsInitialization().

◆ error_commit_ts_disabled()

static void error_commit_ts_disabled ( void  )
static

Definition at line 379 of file commit_ts.c.

380{
383 errmsg("could not get commit timestamp data"),
385 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
386 "track_commit_timestamp") :
387 errhint("Make sure the configuration parameter \"%s\" is set.",
388 "track_commit_timestamp")));
389}

References ereport, errcode(), errhint(), errmsg(), ERROR, fb(), and RecoveryInProgress().

Referenced by GetLatestCommitTsData(), and TransactionIdGetCommitTsData().

◆ ExtendCommitTs()

void ExtendCommitTs ( TransactionId  newestXact)

Definition at line 816 of file commit_ts.c.

817{
818 int64 pageno;
819 LWLock *lock;
820
821 /*
822 * Nothing to do if module not enabled. Note we do an unlocked read of
823 * the flag here, which is okay because this routine is only called from
824 * GetNewTransactionId, which is never called in a standby.
825 */
828 return;
829
830 /*
831 * No work except at first XID of a page. But beware: just after
832 * wraparound, the first XID of page zero is FirstNormalTransactionId.
833 */
836 return;
837
839
840 lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
841
843
844 /* Zero the page ... */
846
847 /* and make a WAL entry about that, unless we're in REDO */
848 if (!InRecovery)
850
851 LWLockRelease(lock);
852}

References Assert, COMMIT_TS_ZEROPAGE, CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, fb(), FirstNormalTransactionId, InRecovery, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SimpleLruGetBankLock(), SimpleLruZeroPage(), TransactionIdEquals, TransactionIdToCTsEntry, TransactionIdToCTsPage(), and XLogSimpleInsertInt64().

Referenced by GetNewTransactionId().

◆ GetLatestCommitTsData()

◆ pg_last_committed_xact()

Datum pg_last_committed_xact ( PG_FUNCTION_ARGS  )

Definition at line 418 of file commit_ts.c.

419{
420 TransactionId xid;
421 RepOriginId nodeid;
422 TimestampTz ts;
423 Datum values[3];
424 bool nulls[3];
425 TupleDesc tupdesc;
426 HeapTuple htup;
427
428 /* and construct a tuple with our data */
429 xid = GetLatestCommitTsData(&ts, &nodeid);
430
431 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
432 elog(ERROR, "return type must be a row type");
433
434 if (!TransactionIdIsNormal(xid))
435 {
436 memset(nulls, true, sizeof(nulls));
437 }
438 else
439 {
441 nulls[0] = false;
442
444 nulls[1] = false;
445
446 values[2] = ObjectIdGetDatum((Oid) nodeid);
447 nulls[2] = false;
448 }
449
450 htup = heap_form_tuple(tupdesc, values, nulls);
451
453}

References elog, ERROR, fb(), get_call_result_type(), GetLatestCommitTsData(), heap_form_tuple(), HeapTupleGetDatum(), ObjectIdGetDatum(), PG_RETURN_DATUM, TimestampTzGetDatum(), TransactionIdGetDatum(), TransactionIdIsNormal, TYPEFUNC_COMPOSITE, and values.

◆ pg_xact_commit_timestamp()

Datum pg_xact_commit_timestamp ( PG_FUNCTION_ARGS  )

Definition at line 395 of file commit_ts.c.

396{
398 TimestampTz ts;
399 bool found;
400
401 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
402
403 if (!found)
405
407}

References fb(), PG_GETARG_TRANSACTIONID, PG_RETURN_NULL, PG_RETURN_TIMESTAMPTZ, and TransactionIdGetCommitTsData().

◆ pg_xact_commit_timestamp_origin()

Datum pg_xact_commit_timestamp_origin ( PG_FUNCTION_ARGS  )

Definition at line 462 of file commit_ts.c.

463{
465 RepOriginId nodeid;
466 TimestampTz ts;
467 Datum values[2];
468 bool nulls[2];
469 TupleDesc tupdesc;
470 HeapTuple htup;
471 bool found;
472
473 found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
474
475 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
476 elog(ERROR, "return type must be a row type");
477
478 if (!found)
479 {
480 memset(nulls, true, sizeof(nulls));
481 }
482 else
483 {
485 nulls[0] = false;
486
487 values[1] = ObjectIdGetDatum((Oid) nodeid);
488 nulls[1] = false;
489 }
490
491 htup = heap_form_tuple(tupdesc, values, nulls);
492
494}

References elog, ERROR, fb(), get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum(), ObjectIdGetDatum(), PG_GETARG_TRANSACTIONID, PG_RETURN_DATUM, TimestampTzGetDatum(), TransactionIdGetCommitTsData(), TYPEFUNC_COMPOSITE, and values.

◆ SetCommitTsLimit()

void SetCommitTsLimit ( TransactionId  oldestXact,
TransactionId  newestXact 
)

◆ SetXidCommitTsInPage()

static void SetXidCommitTsInPage ( TransactionId  xid,
int  nsubxids,
TransactionId subxids,
TimestampTz  ts,
RepOriginId  nodeid,
int64  pageno 
)
static

Definition at line 220 of file commit_ts.c.

223{
224 LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
225 int slotno;
226 int i;
227
229
230 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
231
232 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
233 for (i = 0; i < nsubxids; i++)
234 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
235
236 CommitTsCtl->shared->page_dirty[slotno] = true;
237
238 LWLockRelease(lock);
239}

References CommitTsCtl, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SimpleLruGetBankLock(), SimpleLruReadPage(), and TransactionIdSetCommitTs().

Referenced by TransactionTreeSetCommitTsData().

◆ StartupCommitTs()

void StartupCommitTs ( void  )

Definition at line 608 of file commit_ts.c.

609{
611}

References ActivateCommitTs().

Referenced by StartupXLOG().

◆ TransactionIdGetCommitTsData()

bool TransactionIdGetCommitTsData ( TransactionId  xid,
TimestampTz ts,
RepOriginId nodeid 
)

Definition at line 272 of file commit_ts.c.

274{
275 int64 pageno = TransactionIdToCTsPage(xid);
277 int slotno;
279 TransactionId oldestCommitTsXid;
280 TransactionId newestCommitTsXid;
281
282 if (!TransactionIdIsValid(xid))
285 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
286 else if (!TransactionIdIsNormal(xid))
287 {
288 /* frozen and bootstrap xids are always committed far in the past */
289 *ts = 0;
290 if (nodeid)
291 *nodeid = 0;
292 return false;
293 }
294
296
297 /* Error if module not enabled */
300
301 /*
302 * If we're asked for the cached value, return that. Otherwise, fall
303 * through to read from SLRU.
304 */
305 if (commitTsShared->xidLastCommit == xid)
306 {
308 if (nodeid)
310
312 return *ts != 0;
313 }
314
315 oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
316 newestCommitTsXid = TransamVariables->newestCommitTsXid;
317 /* neither is invalid, or both are */
318 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
320
321 /*
322 * Return empty if the requested value is outside our valid range.
323 */
324 if (!TransactionIdIsValid(oldestCommitTsXid) ||
325 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
326 TransactionIdPrecedes(newestCommitTsXid, xid))
327 {
328 *ts = 0;
329 if (nodeid)
330 *nodeid = InvalidRepOriginId;
331 return false;
332 }
333
334 /* lock is acquired by SimpleLruReadPage_ReadOnly */
336 memcpy(&entry,
337 CommitTsCtl->shared->page_buffer[slotno] +
340
341 *ts = entry.time;
342 if (nodeid)
343 *nodeid = entry.nodeid;
344
346 return *ts != 0;
347}

References Assert, CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, CommitTimestampShared::dataLastCommit, ereport, errcode(), errmsg(), ERROR, error_commit_ts_disabled(), fb(), InvalidRepOriginId, LW_SHARED, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, CommitTimestampEntry::nodeid, TransamVariablesData::oldestCommitTsXid, SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), SizeOfCommitTimestampEntry, CommitTimestampEntry::time, TransactionIdIsNormal, TransactionIdIsValid, TransactionIdPrecedes(), TransactionIdToCTsEntry, TransactionIdToCTsPage(), TransamVariables, and CommitTimestampShared::xidLastCommit.

Referenced by GetTupleTransactionInfo(), pg_xact_commit_timestamp(), pg_xact_commit_timestamp_origin(), and update_most_recent_deletion_info().

◆ TransactionIdSetCommitTs()

static void TransactionIdSetCommitTs ( TransactionId  xid,
TimestampTz  ts,
RepOriginId  nodeid,
int  slotno 
)
static

Definition at line 247 of file commit_ts.c.

249{
252
254
255 entry.time = ts;
256 entry.nodeid = nodeid;
257
258 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
261}

References Assert, CommitTsCtl, fb(), CommitTimestampEntry::nodeid, SizeOfCommitTimestampEntry, CommitTimestampEntry::time, TransactionIdIsNormal, and TransactionIdToCTsEntry.

Referenced by SetXidCommitTsInPage().

◆ TransactionIdToCTsPage()

static int64 TransactionIdToCTsPage ( TransactionId  xid)
inlinestatic

◆ TransactionTreeSetCommitTsData()

void TransactionTreeSetCommitTsData ( TransactionId  xid,
int  nsubxids,
TransactionId subxids,
TimestampTz  timestamp,
RepOriginId  nodeid 
)

Definition at line 139 of file commit_ts.c.

142{
143 int i;
146
147 /*
148 * No-op if the module is not active.
149 *
150 * An unlocked read here is fine, because in a standby (the only place
151 * where the flag can change in flight) this routine is only called by the
152 * recovery process, which is also the only process which can change the
153 * flag.
154 */
156 return;
157
158 /*
159 * Figure out the latest Xid in this batch: either the last subxid if
160 * there's any, otherwise the parent xid.
161 */
162 if (nsubxids > 0)
163 newestXact = subxids[nsubxids - 1];
164 else
165 newestXact = xid;
166
167 /*
168 * We split the xids to set the timestamp to in groups belonging to the
169 * same SLRU page; the first element in each such set is its head. The
170 * first group has the main XID as the head; subsequent sets use the first
171 * subxid not on the previous page as head. This way, we only have to
172 * lock/modify each SLRU page once.
173 */
174 headxid = xid;
175 i = 0;
176 for (;;)
177 {
179 int j;
180
181 for (j = i; j < nsubxids; j++)
182 {
183 if (TransactionIdToCTsPage(subxids[j]) != pageno)
184 break;
185 }
186 /* subxids[i..j] are on the same page as the head */
187
188 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
189 pageno);
190
191 /* if we wrote out all subxids, we're done. */
192 if (j >= nsubxids)
193 break;
194
195 /*
196 * Set the new head and skip over it, as well as over the subxids we
197 * just wrote.
198 */
199 headxid = subxids[j];
200 i = j + 1;
201 }
202
203 /* update the cached value in shared memory */
208
209 /* and move forwards our endpoint, if needed */
213}

References CommitTimestampShared::commitTsActive, commitTsShared, CommitTimestampShared::dataLastCommit, fb(), i, j, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, CommitTimestampEntry::nodeid, SetXidCommitTsInPage(), CommitTimestampEntry::time, TransactionIdPrecedes(), TransactionIdToCTsPage(), TransamVariables, and CommitTimestampShared::xidLastCommit.

Referenced by RecordTransactionCommit(), RecordTransactionCommitPrepared(), and xact_redo_commit().

◆ TruncateCommitTs()

void TruncateCommitTs ( TransactionId  oldestXact)

Definition at line 861 of file commit_ts.c.

862{
864
865 /*
866 * The cutoff point is the start of the segment containing oldestXact. We
867 * pass the *page* containing oldestXact to SimpleLruTruncate.
868 */
870
871 /* Check to see if there's any files that could be removed */
873 &cutoffPage))
874 return; /* nothing to remove */
875
876 /* Write XLOG record */
877 WriteTruncateXlogRec(cutoffPage, oldestXact);
878
879 /* Now we can remove the old CommitTs segment(s) */
881}

References CommitTsCtl, fb(), SimpleLruTruncate(), SlruScanDirCbReportPresence(), SlruScanDirectory(), TransactionIdToCTsPage(), and WriteTruncateXlogRec().

Referenced by vac_truncate_clog().

◆ WriteTruncateXlogRec()

static void WriteTruncateXlogRec ( int64  pageno,
TransactionId  oldestXid 
)
static

Variable Documentation

◆ CommitTsCtlData

SlruCtlData CommitTsCtlData
static

Definition at line 83 of file commit_ts.c.

◆ commitTsShared

◆ track_commit_timestamp