PostgreSQL Source Code git master
Loading...
Searching...
No Matches
commit_ts.c File Reference
#include "postgres.h"
#include "access/commit_ts.h"
#include "access/htup_details.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/shmem.h"
#include "storage/subsystems.h"
#include "utils/fmgrprotos.h"
#include "utils/guc_hooks.h"
#include "utils/timestamp.h"
Include dependency graph for commit_ts.c:

Go to the source code of this file.

Data Structures

struct  CommitTimestampEntry
 
struct  CommitTimestampShared
 

Macros

#define SizeOfCommitTimestampEntry
 
#define COMMIT_TS_XACTS_PER_PAGE    (BLCKSZ / SizeOfCommitTimestampEntry)
 
#define TransactionIdToCTsEntry(xid)    ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
 
#define CommitTsCtl   (&CommitTsSlruDesc)
 

Typedefs

typedef struct CommitTimestampEntry CommitTimestampEntry
 
typedef struct CommitTimestampShared CommitTimestampShared
 

Functions

static int64 TransactionIdToCTsPage (TransactionId xid)
 
static void CommitTsShmemRequest (void *arg)
 
static void CommitTsShmemInit (void *arg)
 
static bool CommitTsPagePrecedes (int64 page1, int64 page2)
 
static int commit_ts_errdetail_for_io_error (const void *opaque_data)
 
static void SetXidCommitTsInPage (TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, ReplOriginId nodeid, int64 pageno)
 
static void TransactionIdSetCommitTs (TransactionId xid, TimestampTz ts, ReplOriginId nodeid, int slotno)
 
static void error_commit_ts_disabled (void)
 
static void ActivateCommitTs (void)
 
static void DeactivateCommitTs (void)
 
static void WriteTruncateXlogRec (int64 pageno, TransactionId oldestXid)
 
void TransactionTreeSetCommitTsData (TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, ReplOriginId nodeid)
 
bool TransactionIdGetCommitTsData (TransactionId xid, TimestampTz *ts, ReplOriginId *nodeid)
 
TransactionId GetLatestCommitTsData (TimestampTz *ts, ReplOriginId *nodeid)
 
Datum pg_xact_commit_timestamp (PG_FUNCTION_ARGS)
 
Datum pg_last_committed_xact (PG_FUNCTION_ARGS)
 
Datum pg_xact_commit_timestamp_origin (PG_FUNCTION_ARGS)
 
static int CommitTsShmemBuffers (void)
 
bool check_commit_ts_buffers (int *newval, void **extra, GucSource source)
 
void BootStrapCommitTs (void)
 
void StartupCommitTs (void)
 
void CompleteCommitTsInitialization (void)
 
void CommitTsParameterChange (bool newvalue, bool oldvalue)
 
void CheckPointCommitTs (void)
 
void ExtendCommitTs (TransactionId newestXact)
 
void TruncateCommitTs (TransactionId oldestXact)
 
void SetCommitTsLimit (TransactionId oldestXact, TransactionId newestXact)
 
void AdvanceOldestCommitTsXid (TransactionId oldestXact)
 
void commit_ts_redo (XLogReaderState *record)
 
int committssyncfiletag (const FileTag *ftag, char *path)
 

Variables

const ShmemCallbacks CommitTsShmemCallbacks
 
static SlruDesc CommitTsSlruDesc
 
static CommitTimestampShared * commitTsShared
 
bool track_commit_timestamp
 

Macro Definition Documentation

◆ COMMIT_TS_XACTS_PER_PAGE

#define COMMIT_TS_XACTS_PER_PAGE    (BLCKSZ / SizeOfCommitTimestampEntry)

Definition at line 64 of file commit_ts.c.

73{
74 return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
75}
76
77#define TransactionIdToCTsEntry(xid) \
78 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
79
80/*
81 * Link to shared-memory data structures for CommitTs control
82 */
83static void CommitTsShmemRequest(void *arg);
84static void CommitTsShmemInit(void *arg);
86static int commit_ts_errdetail_for_io_error(const void *opaque_data);
87
90 .init_fn = CommitTsShmemInit,
91};
92
94
95#define CommitTsCtl (&CommitTsSlruDesc)
96
97/*
98 * We keep a cache of the last value set in shared memory.
99 *
100 * This is also a good place to keep the activation status. We keep this
101 * separate from the GUC so that the standby can activate the module if the
102 * primary has it active independently of the value of the GUC.
103 *
104 * This is protected by CommitTsLock. In some places, we use commitTsActive
105 * without acquiring the lock; where this happens, a comment explains the
106 * rationale for it.
107 */
108typedef struct CommitTimestampShared
109{
112 bool commitTsActive;
114
116
117static void CommitTsShmemInit(void *arg);
118
119/* GUC variable */
121
122static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
123 TransactionId *subxids, TimestampTz ts,
124 ReplOriginId nodeid, int64 pageno);
126 ReplOriginId nodeid, int slotno);
127static void error_commit_ts_disabled(void);
128static void ActivateCommitTs(void);
129static void DeactivateCommitTs(void);
130static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
131
132/*
133 * TransactionTreeSetCommitTsData
134 *
135 * Record the final commit timestamp of transaction entries in the commit log
136 * for a transaction and its subtransaction tree, as efficiently as possible.
137 *
138 * xid is the top level transaction id.
139 *
140 * subxids is an array of xids of length nsubxids, representing subtransactions
141 * in the tree of xid. In various cases nsubxids may be zero.
142 * The reason why tracking just the parent xid commit timestamp is not enough
143 * is that the subtrans SLRU does not stay valid across crashes (it's not
144 * permanent) so we need to keep the information about them here. If the
145 * subtrans implementation changes in the future, we might want to revisit the
146 * decision of storing timestamp info for each subxid.
147 */
148void
151 ReplOriginId nodeid)
152{
153 int i;
156
157 /*
158 * No-op if the module is not active.
159 *
160 * An unlocked read here is fine, because in a standby (the only place
161 * where the flag can change in flight) this routine is only called by the
162 * recovery process, which is also the only process which can change the
163 * flag.
164 */
166 return;
167
168 /*
169 * Figure out the latest Xid in this batch: either the last subxid if
170 * there is one, otherwise the parent xid.
171 */
172 if (nsubxids > 0)
173 newestXact = subxids[nsubxids - 1];
174 else
175 newestXact = xid;
176
177 /*
178 * We split the xids to set the timestamp to in groups belonging to the
179 * same SLRU page; the first element in each such set is its head. The
180 * first group has the main XID as the head; subsequent sets use the first
181 * subxid not on the previous page as head. This way, we only have to
182 * lock/modify each SLRU page once.
183 */
184 headxid = xid;
185 i = 0;
186 for (;;)
187 {
189 int j;
190
191 for (j = i; j < nsubxids; j++)
192 {
193 if (TransactionIdToCTsPage(subxids[j]) != pageno)
194 break;
195 }
196 /* subxids[i..j-1] are on the same page as the head */
197
198 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
199 pageno);
200
201 /* if we wrote out all subxids, we're done. */
202 if (j >= nsubxids)
203 break;
204
205 /*
206 * Set the new head and skip over it, as well as over the subxids we
207 * just wrote.
208 */
209 headxid = subxids[j];
210 i = j + 1;
211 }
212
213 /* update the cached value in shared memory */
218
219 /* and move forwards our endpoint, if needed */
223}
224
225/*
226 * Record the commit timestamp of transaction entries in the commit log for all
227 * entries on a single page. Atomic only on this page.
228 */
229static void
231 TransactionId *subxids, TimestampTz ts,
232 ReplOriginId nodeid, int64 pageno)
233{
234 LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
235 int slotno;
236 int i;
237
239
240 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, &xid);
241
242 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
243 for (i = 0; i < nsubxids; i++)
244 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
245
246 CommitTsCtl->shared->page_dirty[slotno] = true;
247
248 LWLockRelease(lock);
249}
250
251/*
252 * Sets the commit timestamp of a single transaction.
253 *
254 * Caller must hold the correct SLRU bank lock; it is still held at exit.
255 */
256static void
258 ReplOriginId nodeid, int slotno)
259{
262
264
265 entry.time = ts;
266 entry.nodeid = nodeid;
267
268 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
271}
272
273/*
274 * Interrogate the commit timestamp of a transaction.
275 *
276 * The return value indicates whether a commit timestamp record was found for
277 * the given xid. The timestamp value is returned in *ts (which may not be
278 * null), and the origin node for the Xid is returned in *nodeid, if it's not
279 * null.
280 */
281bool
283 ReplOriginId *nodeid)
284{
285 int64 pageno = TransactionIdToCTsPage(xid);
287 int slotno;
289 TransactionId oldestCommitTsXid;
290 TransactionId newestCommitTsXid;
291
292 if (!TransactionIdIsValid(xid))
295 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296 else if (!TransactionIdIsNormal(xid))
297 {
298 /* frozen and bootstrap xids are always committed far in the past */
299 *ts = 0;
300 if (nodeid)
301 *nodeid = 0;
302 return false;
303 }
304
306
307 /* Error if module not enabled */
310
311 /*
312 * If we're asked for the cached value, return that. Otherwise, fall
313 * through to read from SLRU.
314 */
315 if (commitTsShared->xidLastCommit == xid)
316 {
318 if (nodeid)
320
322 return *ts != 0;
323 }
324
325 oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
326 newestCommitTsXid = TransamVariables->newestCommitTsXid;
327 /* neither is invalid, or both are */
328 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
330
331 /*
332 * Return empty if the requested value is outside our valid range.
333 */
334 if (!TransactionIdIsValid(oldestCommitTsXid) ||
335 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
336 TransactionIdPrecedes(newestCommitTsXid, xid))
337 {
338 *ts = 0;
339 if (nodeid)
340 *nodeid = InvalidReplOriginId;
341 return false;
342 }
343
344 /* lock is acquired by SimpleLruReadPage_ReadOnly */
346 memcpy(&entry,
347 CommitTsCtl->shared->page_buffer[slotno] +
350
351 *ts = entry.time;
352 if (nodeid)
353 *nodeid = entry.nodeid;
354
356 return *ts != 0;
357}
358
359/*
360 * Return the Xid of the latest committed transaction. (As far as this module
361 * is concerned, anyway; it's up to the caller to ensure the value is useful
362 * for its purposes.)
363 *
364 * ts and nodeid are filled with the corresponding data; they can be passed
365 * as NULL if not wanted.
366 */
369{
370 TransactionId xid;
371
373
374 /* Error if module not enabled */
377
379 if (ts)
381 if (nodeid)
384
385 return xid;
386}
387
388static void
390{
393 errmsg("could not get commit timestamp data"),
395 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
396 "track_commit_timestamp") :
397 errhint("Make sure the configuration parameter \"%s\" is set.",
398 "track_commit_timestamp")));
399}
400
401/*
402 * SQL-callable wrapper to obtain commit time of a transaction
403 */
404Datum
406{
408 TimestampTz ts;
409 bool found;
410
411 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
412
413 if (!found)
415
417}
418
419
420/*
421 * pg_last_committed_xact
422 *
423 * SQL-callable wrapper to obtain some information about the latest
424 * committed transaction: transaction ID, timestamp and replication
425 * origin.
426 */
427Datum
429{
430 TransactionId xid;
431 ReplOriginId nodeid;
432 TimestampTz ts;
433 Datum values[3];
434 bool nulls[3];
435 TupleDesc tupdesc;
436 HeapTuple htup;
437
438 /* and construct a tuple with our data */
439 xid = GetLatestCommitTsData(&ts, &nodeid);
440
441 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
442 elog(ERROR, "return type must be a row type");
443
444 if (!TransactionIdIsNormal(xid))
445 {
446 memset(nulls, true, sizeof(nulls));
447 }
448 else
449 {
451 nulls[0] = false;
452
454 nulls[1] = false;
455
456 values[2] = ObjectIdGetDatum((Oid) nodeid);
457 nulls[2] = false;
458 }
459
460 htup = heap_form_tuple(tupdesc, values, nulls);
461
463}
464
465/*
466 * pg_xact_commit_timestamp_origin
467 *
468 * SQL-callable wrapper to obtain commit timestamp and replication origin
469 * of a given transaction.
470 */
471Datum
473{
475 ReplOriginId nodeid;
476 TimestampTz ts;
477 Datum values[2];
478 bool nulls[2];
479 TupleDesc tupdesc;
480 HeapTuple htup;
481 bool found;
482
483 found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
484
485 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
486 elog(ERROR, "return type must be a row type");
487
488 if (!found)
489 {
490 memset(nulls, true, sizeof(nulls));
491 }
492 else
493 {
495 nulls[0] = false;
496
497 values[1] = ObjectIdGetDatum((Oid) nodeid);
498 nulls[1] = false;
499 }
500
501 htup = heap_form_tuple(tupdesc, values, nulls);
502
504}
505
506/*
507 * Number of shared CommitTS buffers.
508 *
509 * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
510 * Otherwise just cap the configured amount to be between 16 and the maximum
511 * allowed.
512 */
513static int
515{
516 /* auto-tune based on shared buffers */
518 return SimpleLruAutotuneBuffers(512, 1024);
519
521}
522
523/*
524 * Register CommitTs shared memory needs at system startup (postmaster start
525 * or standalone backend)
526 */
527static void
529{
530 /* If auto-tuning is requested, now is the time to do it */
532 {
533 char buf[32];
534
535 snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
536 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
538
539 /*
540 * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
541 * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
542 * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
543 * that and we must force the matter with PGC_S_OVERRIDE.
544 */
545 if (commit_timestamp_buffers == 0) /* failed to apply it? */
546 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
548 }
551 .name = "commit_timestamp",
552 .Dir = "pg_commit_ts",
553 .long_segment_names = false,
554
555 .nslots = CommitTsShmemBuffers(),
556
557 .PagePrecedes = CommitTsPagePrecedes,
558 .errdetail_for_io_error = commit_ts_errdetail_for_io_error,
559
560 .sync_handler = SYNC_HANDLER_COMMIT_TS,
561 .buffer_tranche_id = LWTRANCHE_COMMITTS_BUFFER,
562 .bank_tranche_id = LWTRANCHE_COMMITTS_SLRU,
563 );
564
565 ShmemRequestStruct(.name = "CommitTs shared",
566 .size = sizeof(CommitTimestampShared),
567 .ptr = (void **) &commitTsShared,
568 );
569}
570
571static void
573{
578
580}
581
582/*
583 * GUC check_hook for commit_timestamp_buffers
584 */
585bool
587{
588 return check_slru_buffers("commit_timestamp_buffers", newval);
589}
590
591/*
592 * This function must be called ONCE on system install.
593 *
594 * (The CommitTs directory is assumed to have been created by initdb, and
595 * CommitTsShmemInit must have been called already.)
596 */
597void
599{
600 /*
601 * Nothing to do here at present, unlike most other SLRU modules; segments
602 * are created when the server is started with this module enabled. See
603 * ActivateCommitTs.
604 */
605}
606
607/*
608 * This must be called ONCE during postmaster or standalone-backend startup,
609 * after StartupXLOG has initialized TransamVariables->nextXid.
610 */
611void
612StartupCommitTs(void)
613{
615}
616
617/*
618 * This must be called ONCE during postmaster or standalone-backend startup,
619 * after recovery has finished.
620 */
621void
623{
624 /*
625 * If the feature is not enabled, turn it off for good. This also removes
626 * any leftover data.
627 *
628 * Conversely, we activate the module if the feature is enabled. This is
629 * necessary for primary and standby as the activation depends on the
630 * control file contents at the beginning of recovery or when an
631 * XLOG_PARAMETER_CHANGE is replayed.
632 */
635 else
637}
638
639/*
640 * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
641 * XLog record during recovery.
642 */
643void
645{
646 /*
647 * If the commit_ts module is disabled in this server and we get word from
648 * the primary server that it is enabled there, activate it so that we can
649 * replay future WAL records involving it; also mark it as active on
650 * pg_control. If the old value was already set, we already did this, so
651 * don't do anything.
652 *
653 * If the module is disabled in the primary, disable it here too, unless
654 * the module is enabled locally.
655 *
656 * Note this only runs in the recovery process, so an unlocked read is
657 * fine.
658 */
659 if (newvalue)
660 {
663 }
666}
667
668/*
669 * Activate this module whenever necessary.
670 * This must happen during postmaster or standalone-backend startup,
671 * or during WAL replay anytime the track_commit_timestamp setting is
672 * changed in the primary.
673 *
674 * The reason why this SLRU needs separate activation/deactivation functions is
675 * that it can be enabled/disabled during start and the activation/deactivation
676 * on the primary is propagated to the standby via replay. Other SLRUs don't
677 * have this property and they can be just initialized during normal startup.
678 *
679 * This is in charge of creating the currently active segment, if it's not
680 * already there. The reason for this is that the server might have been
681 * running with this module disabled for a while and thus might have skipped
682 * the normal creation point.
683 */
684static void
686{
687 TransactionId xid;
688 int64 pageno;
689
690 /*
691 * During bootstrap, we should not register commit timestamps so skip the
692 * activation in this case.
693 */
695 return;
696
697 /* If we've done this already, there's nothing to do */
700 {
702 return;
703 }
705
707 pageno = TransactionIdToCTsPage(xid);
708
709 /*
710 * Re-Initialize our idea of the latest page number.
711 */
712 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
713
714 /*
715 * If CommitTs is enabled, but it wasn't in the previous server run, we
716 * need to set the oldest and newest values to the next Xid; that way, we
717 * will not try to read data that might not have been set.
718 *
719 * XXX does this have a problem if a server is started with commitTs
720 * enabled, then started with commitTs disabled, then restarted with it
721 * enabled again? It doesn't look like it does, because there should be a
722 * checkpoint that sets the value to InvalidTransactionId at end of
723 * recovery; and so any chance of injecting new transactions without
724 * CommitTs values would occur after the oldestCommitTsXid has been set to
725 * Invalid temporarily.
726 */
729 {
732 }
734
735 /* Create the current segment file, if necessary */
738
739 /* Change the activation status in shared memory. */
743}
744
745/*
746 * Deactivate this module.
747 *
748 * This must be called when the track_commit_timestamp parameter is turned off.
749 * This happens during postmaster or standalone-backend startup, or during WAL
750 * replay.
751 *
752 * Resets CommitTs into invalid state to make sure we don't hand back
753 * possibly-invalid data; also removes segments of old data.
754 */
755static void
757{
758 /*
759 * Cleanup the status in the shared memory.
760 *
761 * We reset everything in the commitTsShared record to prevent user from
762 * getting confusing data about last committed transaction on the standby
763 * when the module was activated repeatedly on the primary.
764 */
766
771
774
775 /*
776 * Remove *all* files. This is necessary so that there are no leftover
777 * files; in the case where this feature is later enabled after running
778 * with it disabled for some time there may be a gap in the file sequence.
779 * (We can probably tolerate out-of-sequence files, as they are going to
780 * be overwritten anyway when we wrap around, but it seems better to be
781 * tidy.)
782 *
783 * Note that we do this with CommitTsLock acquired in exclusive mode. This
784 * is very heavy-handed, but since this routine can only be called in the
785 * replica and should happen very rarely, we don't worry too much about
786 * it. Note also that no process should be consulting this SLRU if we
787 * have just deactivated it.
788 */
790
792}
793
794/*
795 * Perform a checkpoint --- either during shutdown, or on-the-fly
796 */
797void
799{
800 /*
801 * Write dirty CommitTs pages to disk. This may result in sync requests
802 * queued for later handling by ProcessSyncRequests(), as part of the
803 * checkpoint.
804 */
806}
807
808/*
809 * Make sure that CommitTs has room for a newly-allocated XID.
810 *
811 * NB: this is called while holding XidGenLock. We want it to be very fast
812 * most of the time; even when it's not so fast, no actual I/O need happen
813 * unless we're forced to write out a dirty CommitTs or xlog page to make room
814 * in shared memory.
815 *
816 * NB: the current implementation relies on track_commit_timestamp being
817 * PGC_POSTMASTER.
818 */
819void
821{
822 int64 pageno;
823 LWLock *lock;
824
825 /*
826 * Nothing to do if module not enabled. Note we do an unlocked read of
827 * the flag here, which is okay because this routine is only called from
828 * GetNewTransactionId, which is never called in a standby.
829 */
832 return;
833
834 /*
835 * No work except at first XID of a page. But beware: just after
836 * wraparound, the first XID of page zero is FirstNormalTransactionId.
837 */
840 return;
841
843
844 lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
845
847
848 /* Zero the page ... */
850
851 /* and make a WAL entry about that, unless we're in REDO */
852 if (!InRecovery)
854
855 LWLockRelease(lock);
856}
857
858/*
859 * Remove all CommitTs segments before the one holding the passed
860 * transaction ID.
861 *
862 * Note that we don't need to flush XLOG here.
863 */
864void
866{
868
869 /*
870 * The cutoff point is the start of the segment containing oldestXact. We
871 * pass the *page* containing oldestXact to SimpleLruTruncate.
872 */
874
875 /* Check to see if there are any files that could be removed */
877 &cutoffPage))
878 return; /* nothing to remove */
879
880 /* Write XLOG record */
881 WriteTruncateXlogRec(cutoffPage, oldestXact);
882
883 /* Now we can remove the old CommitTs segment(s) */
885}
886
887/*
888 * Set the limit values between which commit TS can be consulted.
889 */
890void
892{
893 /*
894 * Be careful not to overwrite values that are either further into the
895 * "future" or signal a disabled committs.
896 */
899 {
904 }
905 else
906 {
910 }
912}
913
914/*
915 * Move forwards the oldest commitTS value that can be consulted
916 */
917void
919{
925}
926
927
928/*
929 * Decide whether a commitTS page number is "older" for truncation purposes.
930 * Analogous to CLOGPagePrecedes().
931 *
932 * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
933 * introduces differences compared to CLOG and the other SLRUs having (1 <<
934 * 31) % per_page == 0. This function never tests exactly
935 * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
936 * there are two possible counts of page boundaries between oldestXact and the
937 * latest XID assigned, depending on whether oldestXact is within the first
938 * 128 entries of its page. Since this function doesn't know the location of
939 * oldestXact within page2, it returns false for one page that actually is
940 * expendable. This is a wider (yet still negligible) version of the
941 * truncation opportunity that CLOGPagePrecedes() cannot recognize.
942 *
943 * For the sake of a worked example, number entries with decimal values such
944 * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
945 * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
946 * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
947 * because entry=2.85 is the border that toggles whether entries precede the
948 * last entry of the oldestXact page. While page 2 is expendable at
949 * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
950 */
951static bool
953{
956
961
962 return (TransactionIdPrecedes(xid1, xid2) &&
964}
965
966static int
968{
969 TransactionId xid = *(const TransactionId *) opaque_data;
970
971 return errdetail("Could not access commit timestamp of transaction %u.", xid);
972}
973
974/*
975 * Write a TRUNCATE xlog record
976 */
977static void
979{
981
982 xlrec.pageno = pageno;
983 xlrec.oldestXid = oldestXid;
984
988}
989
990/*
991 * CommitTS resource manager's routines
992 */
993void
995{
996 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
997
998 /* Backup blocks are not used in commit_ts records */
1000
1001 if (info == COMMIT_TS_ZEROPAGE)
1002 {
1003 int64 pageno;
1004
1005 memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1007 }
1008 else if (info == COMMIT_TS_TRUNCATE)
1009 {
1011
1012 AdvanceOldestCommitTsXid(trunc->oldestXid);
1013
1014 /*
1015 * During XLOG replay, latest_page_number isn't set up yet; insert a
1016 * suitable value to bypass the sanity test in SimpleLruTruncate.
1017 */
1018 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1019 trunc->pageno);
1020
1022 }
1023 else
1024 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1025}
1026
1027/*
1028 * Entrypoint for sync.c to sync commit_ts files.
1029 */
1030int
1031committssyncfiletag(const FileTag *ftag, char *path)
1032{
1033 return SlruSyncFileTag(CommitTsCtl, ftag, path);
1034}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define Min(x, y)
Definition c.h:1091
uint8_t uint8
Definition c.h:622
#define Max(x, y)
Definition c.h:1085
#define Assert(condition)
Definition c.h:943
int64_t int64
Definition c.h:621
uint32 TransactionId
Definition c.h:736
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, ReplOriginId nodeid, int64 pageno)
Definition commit_ts.c:231
void StartupCommitTs(void)
Definition commit_ts.c:613
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
Definition commit_ts.c:473
static int commit_ts_errdetail_for_io_error(const void *opaque_data)
Definition commit_ts.c:968
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
Definition commit_ts.c:429
static SlruDesc CommitTsSlruDesc
Definition commit_ts.c:94
void CommitTsParameterChange(bool newvalue, bool oldvalue)
Definition commit_ts.c:645
#define COMMIT_TS_XACTS_PER_PAGE
Definition commit_ts.c:64
#define TransactionIdToCTsEntry(xid)
Definition commit_ts.c:78
static void DeactivateCommitTs(void)
Definition commit_ts.c:757
bool track_commit_timestamp
Definition commit_ts.c:121
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition commit_ts.c:919
static void CommitTsShmemInit(void *arg)
Definition commit_ts.c:573
static CommitTimestampShared * commitTsShared
Definition commit_ts.c:116
int committssyncfiletag(const FileTag *ftag, char *path)
Definition commit_ts.c:1032
void CompleteCommitTsInitialization(void)
Definition commit_ts.c:623
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, ReplOriginId nodeid)
Definition commit_ts.c:150
bool check_commit_ts_buffers(int *newval, void **extra, GucSource source)
Definition commit_ts.c:587
static void ActivateCommitTs(void)
Definition commit_ts.c:686
static int64 TransactionIdToCTsPage(TransactionId xid)
Definition commit_ts.c:73
void TruncateCommitTs(TransactionId oldestXact)
Definition commit_ts.c:866
void commit_ts_redo(XLogReaderState *record)
Definition commit_ts.c:995
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
Definition commit_ts.c:406
static int CommitTsShmemBuffers(void)
Definition commit_ts.c:515
static void error_commit_ts_disabled(void)
Definition commit_ts.c:390
static bool CommitTsPagePrecedes(int64 page1, int64 page2)
Definition commit_ts.c:953
#define SizeOfCommitTimestampEntry
Definition commit_ts.c:61
void BootStrapCommitTs(void)
Definition commit_ts.c:599
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
Definition commit_ts.c:892
#define CommitTsCtl
Definition commit_ts.c:96
TransactionId GetLatestCommitTsData(TimestampTz *ts, ReplOriginId *nodeid)
Definition commit_ts.c:369
void ExtendCommitTs(TransactionId newestXact)
Definition commit_ts.c:821
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, ReplOriginId *nodeid)
Definition commit_ts.c:283
static void CommitTsShmemRequest(void *arg)
Definition commit_ts.c:529
const ShmemCallbacks CommitTsShmemCallbacks
Definition commit_ts.c:89
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, ReplOriginId nodeid, int slotno)
Definition commit_ts.c:258
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
Definition commit_ts.c:979
void CheckPointCommitTs(void)
Definition commit_ts.c:799
#define COMMIT_TS_ZEROPAGE
Definition commit_ts.h:44
#define SizeOfCommitTsTruncate
Definition commit_ts.h:53
#define COMMIT_TS_TRUNCATE
Definition commit_ts.h:45
int64 TimestampTz
Definition timestamp.h:39
#define TIMESTAMP_NOBEGIN(j)
Definition timestamp.h:159
Datum arg
Definition elog.c:1322
int errcode(int sqlerrcode)
Definition elog.c:874
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define PANIC
Definition elog.h:44
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_GETARG_TRANSACTIONID(n)
Definition fmgr.h:280
#define PG_RETURN_DATUM(x)
Definition fmgr.h:354
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition funcapi.h:230
int commit_timestamp_buffers
Definition globals.c:164
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition guc.c:4234
#define newval
GucSource
Definition guc.h:112
@ PGC_S_DYNAMIC_DEFAULT
Definition guc.h:114
@ PGC_S_OVERRIDE
Definition guc.h:123
@ PGC_POSTMASTER
Definition guc.h:74
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
int j
Definition isn.c:78
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
@ LW_EXCLUSIVE
Definition lwlock.h:104
#define IsBootstrapProcessingMode()
Definition miscadmin.h:495
static char * errmsg
#define InvalidReplOriginId
Definition origin.h:33
static rewind_source * source
Definition pg_rewind.c:89
static char buf[DEFAULT_XLOG_SEG_SIZE]
int64 timestamp
#define snprintf
Definition port.h:260
static Datum TransactionIdGetDatum(TransactionId X)
Definition postgres.h:292
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
unsigned int Oid
static int fb(int x)
#define ShmemRequestStruct(...)
Definition shmem.h:176
bool SlruScanDirectory(SlruDesc *ctl, SlruScanCallback callback, void *data)
Definition slru.c:1844
int SimpleLruReadPage_ReadOnly(SlruDesc *ctl, int64 pageno, const void *opaque_data)
Definition slru.c:654
int SimpleLruAutotuneBuffers(int divisor, int max)
Definition slru.c:235
bool SlruScanDirCbReportPresence(SlruDesc *ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1765
void SimpleLruTruncate(SlruDesc *ctl, int64 cutoffPage)
Definition slru.c:1458
void SimpleLruZeroAndWritePage(SlruDesc *ctl, int64 pageno)
Definition slru.c:466
int SimpleLruZeroPage(SlruDesc *ctl, int64 pageno)
Definition slru.c:397
bool SimpleLruDoesPhysicalPageExist(SlruDesc *ctl, int64 pageno)
Definition slru.c:795
bool SlruScanDirCbDeleteAll(SlruDesc *ctl, char *filename, int64 segpage, void *data)
Definition slru.c:1797
void SimpleLruWriteAll(SlruDesc *ctl, bool allow_redirtied)
Definition slru.c:1372
int SimpleLruReadPage(SlruDesc *ctl, int64 pageno, bool write_ok, const void *opaque_data)
Definition slru.c:550
bool check_slru_buffers(const char *name, int *newval)
Definition slru.c:377
int SlruSyncFileTag(SlruDesc *ctl, const FileTag *ftag, char *path)
Definition slru.c:1884
#define SlruPagePrecedesUnitTests(ctl, per_page)
Definition slru.h:233
#define SimpleLruRequest(...)
Definition slru.h:218
#define SLRU_MAX_ALLOWED_BUFFERS
Definition slru.h:26
static LWLock * SimpleLruGetBankLock(SlruDesc *ctl, int64 pageno)
Definition slru.h:207
TimestampTz time
Definition commit_ts.c:57
ReplOriginId nodeid
Definition commit_ts.c:58
CommitTimestampEntry dataLastCommit
Definition commit_ts.c:112
TransactionId xidLastCommit
Definition commit_ts.c:111
Definition sync.h:51
ShmemRequestCallback request_fn
Definition shmem.h:133
TransactionId oldestCommitTsXid
Definition transam.h:232
TransactionId newestCommitTsXid
Definition transam.h:233
FullTransactionId nextXid
Definition transam.h:220
@ SYNC_HANDLER_COMMIT_TS
Definition sync.h:39
static TransactionId ReadNextTransactionId(void)
Definition transam.h:375
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdEquals(id1, id2)
Definition transam.h:43
#define XidFromFullTransactionId(x)
Definition transam.h:48
#define FirstNormalTransactionId
Definition transam.h:34
#define TransactionIdIsValid(xid)
Definition transam.h:41
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
#define PG_RETURN_TIMESTAMPTZ(x)
Definition timestamp.h:68
TransamVariablesData * TransamVariables
Definition varsup.c:37
const char * name
bool RecoveryInProgress(void)
Definition xlog.c:6830
uint16 ReplOriginId
Definition xlogdefs.h:69
XLogRecPtr XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value)
Definition xloginsert.c:547
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:482
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:372
void XLogBeginInsert(void)
Definition xloginsert.c:153
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:410
#define XLogRecGetData(decoder)
Definition xlogreader.h:415
#define XLogRecHasAnyBlockRefs(decoder)
Definition xlogreader.h:417
bool InRecovery
Definition xlogutils.c:50

◆ CommitTsCtl

#define CommitTsCtl   (&CommitTsSlruDesc)

Definition at line 96 of file commit_ts.c.

◆ SizeOfCommitTimestampEntry

#define SizeOfCommitTimestampEntry
Value:

Definition at line 61 of file commit_ts.c.

◆ TransactionIdToCTsEntry

#define TransactionIdToCTsEntry (   xid)     ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)

Definition at line 78 of file commit_ts.c.

Typedef Documentation

◆ CommitTimestampEntry

◆ CommitTimestampShared

Function Documentation

◆ ActivateCommitTs()

static void ActivateCommitTs ( void  )
static

Definition at line 686 of file commit_ts.c.

687{
688 TransactionId xid;
689 int64 pageno;
690
691 /*
692 * During bootstrap, we should not register commit timestamps so skip the
693 * activation in this case.
694 */
696 return;
697
698 /* If we've done this already, there's nothing to do */
701 {
703 return;
704 }
706
708 pageno = TransactionIdToCTsPage(xid);
709
710 /*
711 * Re-Initialize our idea of the latest page number.
712 */
713 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
714
715 /*
716 * If CommitTs is enabled, but it wasn't in the previous server run, we
717 * need to set the oldest and newest values to the next Xid; that way, we
718 * will not try to read data that might not have been set.
719 *
720 * XXX does this have a problem if a server is started with commitTs
721 * enabled, then started with commitTs disabled, then restarted with it
722 * enabled again? It doesn't look like it does, because there should be a
723 * checkpoint that sets the value to InvalidTransactionId at end of
724 * recovery; and so any chance of injecting new transactions without
725 * CommitTs values would occur after the oldestCommitTsXid has been set to
726 * Invalid temporarily.
727 */
730 {
733 }
735
736 /* Create the current segment file, if necessary */
739
740 /* Change the activation status in shared memory. */
744}

References CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, fb(), InvalidTransactionId, IsBootstrapProcessingMode, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, TransamVariablesData::nextXid, TransamVariablesData::oldestCommitTsXid, pg_atomic_write_u64(), ReadNextTransactionId(), SimpleLruDoesPhysicalPageExist(), SimpleLruZeroAndWritePage(), TransactionIdToCTsPage(), TransamVariables, and XidFromFullTransactionId.

Referenced by CommitTsParameterChange(), CompleteCommitTsInitialization(), and StartupCommitTs().

◆ AdvanceOldestCommitTsXid()

◆ BootStrapCommitTs()

void BootStrapCommitTs ( void  )

Definition at line 599 of file commit_ts.c.

600{
601 /*
602 * Nothing to do here at present, unlike most other SLRU modules; segments
603 * are created when the server is started with this module enabled. See
604 * ActivateCommitTs.
605 */
606}

Referenced by BootStrapXLOG().

◆ check_commit_ts_buffers()

bool check_commit_ts_buffers ( int * newval,
void **  extra,
GucSource  source 
)

Definition at line 587 of file commit_ts.c.

588{
589 return check_slru_buffers("commit_timestamp_buffers", newval);
590}

References check_slru_buffers(), and newval.

◆ CheckPointCommitTs()

void CheckPointCommitTs ( void  )

Definition at line 799 of file commit_ts.c.

800{
801 /*
802 * Write dirty CommitTs pages to disk. This may result in sync requests
803 * queued for later handling by ProcessSyncRequests(), as part of the
804 * checkpoint.
805 */
807}

References CommitTsCtl, and SimpleLruWriteAll().

Referenced by CheckPointGuts().

◆ commit_ts_errdetail_for_io_error()

static int commit_ts_errdetail_for_io_error ( const void * opaque_data)
static

Definition at line 968 of file commit_ts.c.

969{
970 TransactionId xid = *(const TransactionId *) opaque_data;
971
972 return errdetail("Could not access commit timestamp of transaction %u.", xid);
973}

References errdetail(), and fb().

Referenced by CommitTsShmemRequest().

◆ commit_ts_redo()

void commit_ts_redo ( XLogReaderState record)

Definition at line 995 of file commit_ts.c.

996{
997 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
998
999 /* Backup blocks are not used in commit_ts records */
1001
1002 if (info == COMMIT_TS_ZEROPAGE)
1003 {
1004 int64 pageno;
1005
1006 memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1008 }
1009 else if (info == COMMIT_TS_TRUNCATE)
1010 {
1012
1013 AdvanceOldestCommitTsXid(trunc->oldestXid);
1014
1015 /*
1016 * During XLOG replay, latest_page_number isn't set up yet; insert a
1017 * suitable value to bypass the sanity test in SimpleLruTruncate.
1018 */
1019 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1020 trunc->pageno);
1021
1023 }
1024 else
1025 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1026}

References AdvanceOldestCommitTsXid(), Assert, COMMIT_TS_TRUNCATE, COMMIT_TS_ZEROPAGE, CommitTsCtl, elog, fb(), memcpy(), PANIC, pg_atomic_write_u64(), SimpleLruTruncate(), SimpleLruZeroAndWritePage(), XLogRecGetData, XLogRecGetInfo, and XLogRecHasAnyBlockRefs.

◆ CommitTsPagePrecedes()

◆ CommitTsParameterChange()

void CommitTsParameterChange ( bool  newvalue,
bool  oldvalue 
)

Definition at line 645 of file commit_ts.c.

646{
647 /*
648 * If the commit_ts module is disabled in this server and we get word from
649 * the primary server that it is enabled there, activate it so that we can
650 * replay future WAL records involving it; also mark it as active on
651 * pg_control. If the old value was already set, we already did this, so
652 * don't do anything.
653 *
654 * If the module is disabled in the primary, disable it here too, unless
655 * the module is enabled locally.
656 *
657 * Note this only runs in the recovery process, so an unlocked read is
658 * fine.
659 */
660 if (newvalue)
661 {
664 }
667}

References ActivateCommitTs(), CommitTimestampShared::commitTsActive, commitTsShared, DeactivateCommitTs(), and fb().

Referenced by xlog_redo().

◆ CommitTsShmemBuffers()

static int CommitTsShmemBuffers ( void  )
static

Definition at line 515 of file commit_ts.c.

516{
517 /* auto-tune based on shared buffers */
519 return SimpleLruAutotuneBuffers(512, 1024);
520
522}

References commit_timestamp_buffers, Max, Min, SimpleLruAutotuneBuffers(), and SLRU_MAX_ALLOWED_BUFFERS.

Referenced by CommitTsShmemRequest().

◆ CommitTsShmemInit()

◆ CommitTsShmemRequest()

static void CommitTsShmemRequest ( void * arg)
static

Definition at line 529 of file commit_ts.c.

530{
531 /* If auto-tuning is requested, now is the time to do it */
533 {
534 char buf[32];
535
536 snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
537 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
539
540 /*
541 * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
542 * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
543 * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
544 * that and we must force the matter with PGC_S_OVERRIDE.
545 */
546 if (commit_timestamp_buffers == 0) /* failed to apply it? */
547 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
549 }
552 .name = "commit_timestamp",
553 .Dir = "pg_commit_ts",
554 .long_segment_names = false,
555
556 .nslots = CommitTsShmemBuffers(),
557
558 .PagePrecedes = CommitTsPagePrecedes,
559 .errdetail_for_io_error = commit_ts_errdetail_for_io_error,
560
561 .sync_handler = SYNC_HANDLER_COMMIT_TS,
562 .buffer_tranche_id = LWTRANCHE_COMMITTS_BUFFER,
563 .bank_tranche_id = LWTRANCHE_COMMITTS_SLRU,
564 );
565
566 ShmemRequestStruct(.name = "CommitTs shared",
567 .size = sizeof(CommitTimestampShared),
568 .ptr = (void **) &commitTsShared,
569 );
570}

References Assert, buf, commit_timestamp_buffers, commit_ts_errdetail_for_io_error(), CommitTsPagePrecedes(), commitTsShared, CommitTsShmemBuffers(), CommitTsSlruDesc, fb(), name, PGC_POSTMASTER, PGC_S_DYNAMIC_DEFAULT, PGC_S_OVERRIDE, SetConfigOption(), ShmemRequestStruct, SimpleLruRequest, snprintf, and SYNC_HANDLER_COMMIT_TS.

◆ committssyncfiletag()

int committssyncfiletag ( const FileTag * ftag,
char * path 
)

Definition at line 1032 of file commit_ts.c.

1033{
1034 return SlruSyncFileTag(CommitTsCtl, ftag, path);
1035}

References CommitTsCtl, and SlruSyncFileTag().

◆ CompleteCommitTsInitialization()

void CompleteCommitTsInitialization ( void  )

Definition at line 623 of file commit_ts.c.

624{
625 /*
626 * If the feature is not enabled, turn it off for good. This also removes
627 * any leftover data.
628 *
629 * Conversely, we activate the module if the feature is enabled. This is
630 * necessary for primary and standby as the activation depends on the
631 * control file contents at the beginning of recovery or when a
632 * XLOG_PARAMETER_CHANGE is replayed.
633 */
636 else
638}

References ActivateCommitTs(), DeactivateCommitTs(), and track_commit_timestamp.

Referenced by StartupXLOG().

◆ DeactivateCommitTs()

static void DeactivateCommitTs ( void  )
static

Definition at line 757 of file commit_ts.c.

758{
759 /*
760 * Cleanup the status in the shared memory.
761 *
762 * We reset everything in the commitTsShared record to prevent the user
763 * from getting confusing data about the last committed transaction on the
764 * standby when the module was activated repeatedly on the primary.
765 */
767
772
775
776 /*
777 * Remove *all* files. This is necessary so that there are no leftover
778 * files; in the case where this feature is later enabled after running
779 * with it disabled for some time there may be a gap in the file sequence.
780 * (We can probably tolerate out-of-sequence files, as they are going to
781 * be overwritten anyway when we wrap around, but it seems better to be
782 * tidy.)
783 *
784 * Note that we do this with CommitTsLock acquired in exclusive mode. This
785 * is very heavy-handed, but since this routine can only be called in the
786 * replica and should happen very rarely, we don't worry too much about
787 * it. Note also that no process should be consulting this SLRU if we
788 * have just deactivated it.
789 */
791
793}

References CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, CommitTimestampShared::dataLastCommit, fb(), InvalidReplOriginId, InvalidTransactionId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, CommitTimestampEntry::nodeid, TransamVariablesData::oldestCommitTsXid, SlruScanDirCbDeleteAll(), SlruScanDirectory(), CommitTimestampEntry::time, TIMESTAMP_NOBEGIN, TransamVariables, and CommitTimestampShared::xidLastCommit.

Referenced by CommitTsParameterChange(), and CompleteCommitTsInitialization().

◆ error_commit_ts_disabled()

static void error_commit_ts_disabled ( void  )
static

Definition at line 390 of file commit_ts.c.

391{
394 errmsg("could not get commit timestamp data"),
396 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
397 "track_commit_timestamp") :
398 errhint("Make sure the configuration parameter \"%s\" is set.",
399 "track_commit_timestamp")));
400}

References ereport, errcode(), errhint(), errmsg, ERROR, fb(), and RecoveryInProgress().

Referenced by GetLatestCommitTsData(), and TransactionIdGetCommitTsData().

◆ ExtendCommitTs()

void ExtendCommitTs ( TransactionId  newestXact)

Definition at line 821 of file commit_ts.c.

822{
823 int64 pageno;
824 LWLock *lock;
825
826 /*
827 * Nothing to do if module not enabled. Note we do an unlocked read of
828 * the flag here, which is okay because this routine is only called from
829 * GetNewTransactionId, which is never called in a standby.
830 */
833 return;
834
835 /*
836 * No work except at first XID of a page. But beware: just after
837 * wraparound, the first XID of page zero is FirstNormalTransactionId.
838 */
841 return;
842
844
845 lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
846
848
849 /* Zero the page ... */
851
852 /* and make a WAL entry about that, unless we're in REDO */
853 if (!InRecovery)
855
856 LWLockRelease(lock);
857}

References Assert, COMMIT_TS_ZEROPAGE, CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, fb(), FirstNormalTransactionId, InRecovery, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SimpleLruGetBankLock(), SimpleLruZeroPage(), TransactionIdEquals, TransactionIdToCTsEntry, TransactionIdToCTsPage(), and XLogSimpleInsertInt64().

Referenced by GetNewTransactionId().

◆ GetLatestCommitTsData()

◆ pg_last_committed_xact()

Datum pg_last_committed_xact ( PG_FUNCTION_ARGS  )

Definition at line 429 of file commit_ts.c.

430{
431 TransactionId xid;
432 ReplOriginId nodeid;
433 TimestampTz ts;
434 Datum values[3];
435 bool nulls[3];
436 TupleDesc tupdesc;
437 HeapTuple htup;
438
439 /* fetch the latest commit data; the result tuple is built from it below */
440 xid = GetLatestCommitTsData(&ts, &nodeid);
441
442 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
443 elog(ERROR, "return type must be a row type");
444
445 if (!TransactionIdIsNormal(xid))
446 {
447 memset(nulls, true, sizeof(nulls));
448 }
449 else
450 {
452 nulls[0] = false;
453
455 nulls[1] = false;
456
457 values[2] = ObjectIdGetDatum((Oid) nodeid);
458 nulls[2] = false;
459 }
460
461 htup = heap_form_tuple(tupdesc, values, nulls);
462
464}

References elog, ERROR, fb(), get_call_result_type(), GetLatestCommitTsData(), heap_form_tuple(), HeapTupleGetDatum(), ObjectIdGetDatum(), PG_RETURN_DATUM, TimestampTzGetDatum(), TransactionIdGetDatum(), TransactionIdIsNormal, TYPEFUNC_COMPOSITE, and values.

◆ pg_xact_commit_timestamp()

Datum pg_xact_commit_timestamp ( PG_FUNCTION_ARGS  )

Definition at line 406 of file commit_ts.c.

407{
409 TimestampTz ts;
410 bool found;
411
412 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
413
414 if (!found)
416
418}

References fb(), PG_GETARG_TRANSACTIONID, PG_RETURN_NULL, PG_RETURN_TIMESTAMPTZ, and TransactionIdGetCommitTsData().

◆ pg_xact_commit_timestamp_origin()

Datum pg_xact_commit_timestamp_origin ( PG_FUNCTION_ARGS  )

Definition at line 473 of file commit_ts.c.

474{
476 ReplOriginId nodeid;
477 TimestampTz ts;
478 Datum values[2];
479 bool nulls[2];
480 TupleDesc tupdesc;
481 HeapTuple htup;
482 bool found;
483
484 found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
485
486 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
487 elog(ERROR, "return type must be a row type");
488
489 if (!found)
490 {
491 memset(nulls, true, sizeof(nulls));
492 }
493 else
494 {
496 nulls[0] = false;
497
498 values[1] = ObjectIdGetDatum((Oid) nodeid);
499 nulls[1] = false;
500 }
501
502 htup = heap_form_tuple(tupdesc, values, nulls);
503
505}

References elog, ERROR, fb(), get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum(), ObjectIdGetDatum(), PG_GETARG_TRANSACTIONID, PG_RETURN_DATUM, TimestampTzGetDatum(), TransactionIdGetCommitTsData(), TYPEFUNC_COMPOSITE, and values.

◆ SetCommitTsLimit()

void SetCommitTsLimit ( TransactionId  oldestXact,
TransactionId  newestXact 
)

◆ SetXidCommitTsInPage()

static void SetXidCommitTsInPage ( TransactionId  xid,
int  nsubxids,
TransactionId * subxids,
TimestampTz  ts,
ReplOriginId  nodeid,
int64  pageno 
)
static

Definition at line 231 of file commit_ts.c.

234{
235 LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
236 int slotno;
237 int i;
238
240
241 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, &xid);
242
243 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
244 for (i = 0; i < nsubxids; i++)
245 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
246
247 CommitTsCtl->shared->page_dirty[slotno] = true;
248
249 LWLockRelease(lock);
250}

References CommitTsCtl, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SimpleLruGetBankLock(), SimpleLruReadPage(), and TransactionIdSetCommitTs().

Referenced by TransactionTreeSetCommitTsData().

◆ StartupCommitTs()

void StartupCommitTs ( void  )

Definition at line 613 of file commit_ts.c.

614{
616}

References ActivateCommitTs().

Referenced by StartupXLOG().

◆ TransactionIdGetCommitTsData()

bool TransactionIdGetCommitTsData ( TransactionId  xid,
TimestampTz * ts,
ReplOriginId * nodeid 
)

Definition at line 283 of file commit_ts.c.

285{
286 int64 pageno = TransactionIdToCTsPage(xid);
288 int slotno;
290 TransactionId oldestCommitTsXid;
291 TransactionId newestCommitTsXid;
292
293 if (!TransactionIdIsValid(xid))
296 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
297 else if (!TransactionIdIsNormal(xid))
298 {
299 /* frozen and bootstrap xids are always committed far in the past */
300 *ts = 0;
301 if (nodeid)
302 *nodeid = 0;
303 return false;
304 }
305
307
308 /* Error if module not enabled */
311
312 /*
313 * If we're asked for the cached value, return that. Otherwise, fall
314 * through to read from SLRU.
315 */
316 if (commitTsShared->xidLastCommit == xid)
317 {
319 if (nodeid)
321
323 return *ts != 0;
324 }
325
326 oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
327 newestCommitTsXid = TransamVariables->newestCommitTsXid;
328 /* neither is invalid, or both are */
329 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
331
332 /*
333 * Return empty if the requested value is outside our valid range.
334 */
335 if (!TransactionIdIsValid(oldestCommitTsXid) ||
336 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
337 TransactionIdPrecedes(newestCommitTsXid, xid))
338 {
339 *ts = 0;
340 if (nodeid)
341 *nodeid = InvalidReplOriginId;
342 return false;
343 }
344
345 /* lock is acquired by SimpleLruReadPage_ReadOnly */
347 memcpy(&entry,
348 CommitTsCtl->shared->page_buffer[slotno] +
351
352 *ts = entry.time;
353 if (nodeid)
354 *nodeid = entry.nodeid;
355
357 return *ts != 0;
358}

References Assert, CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, CommitTimestampShared::dataLastCommit, ereport, errcode(), errmsg, ERROR, error_commit_ts_disabled(), fb(), InvalidReplOriginId, LW_SHARED, LWLockAcquire(), LWLockRelease(), memcpy(), TransamVariablesData::newestCommitTsXid, CommitTimestampEntry::nodeid, TransamVariablesData::oldestCommitTsXid, SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), SizeOfCommitTimestampEntry, CommitTimestampEntry::time, TransactionIdIsNormal, TransactionIdIsValid, TransactionIdPrecedes(), TransactionIdToCTsEntry, TransactionIdToCTsPage(), TransamVariables, and CommitTimestampShared::xidLastCommit.

Referenced by GetTupleTransactionInfo(), pg_xact_commit_timestamp(), pg_xact_commit_timestamp_origin(), and update_most_recent_deletion_info().

◆ TransactionIdSetCommitTs()

static void TransactionIdSetCommitTs ( TransactionId  xid,
TimestampTz  ts,
ReplOriginId  nodeid,
int  slotno 
)
static

◆ TransactionIdToCTsPage()

static int64 TransactionIdToCTsPage ( TransactionId  xid)
inlinestatic

◆ TransactionTreeSetCommitTsData()

void TransactionTreeSetCommitTsData ( TransactionId  xid,
int  nsubxids,
TransactionId * subxids,
TimestampTz  timestamp,
ReplOriginId  nodeid 
)

Definition at line 150 of file commit_ts.c.

153{
154 int i;
157
158 /*
159 * No-op if the module is not active.
160 *
161 * An unlocked read here is fine, because in a standby (the only place
162 * where the flag can change in flight) this routine is only called by the
163 * recovery process, which is also the only process which can change the
164 * flag.
165 */
167 return;
168
169 /*
170 * Figure out the latest Xid in this batch: either the last subxid if
171 * there's any, otherwise the parent xid.
172 */
173 if (nsubxids > 0)
174 newestXact = subxids[nsubxids - 1];
175 else
176 newestXact = xid;
177
178 /*
179 * We split the xids whose timestamp is to be set into groups that each
180 * belong to a single SLRU page; the first element in each group is its
181 * head. The first group has the main XID as the head; subsequent groups
182 * use the first subxid not on the previous page as head. This way, we
183 * only have to lock/modify each SLRU page once.
184 */
185 headxid = xid;
186 i = 0;
187 for (;;)
188 {
190 int j;
191
192 for (j = i; j < nsubxids; j++)
193 {
194 if (TransactionIdToCTsPage(subxids[j]) != pageno)
195 break;
196 }
197 /* subxids[i..j-1] are on the same page as the head */
198
199 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
200 pageno);
201
202 /* if we wrote out all subxids, we're done. */
203 if (j >= nsubxids)
204 break;
205
206 /*
207 * Set the new head and skip over it, as well as over the subxids we
208 * just wrote.
209 */
210 headxid = subxids[j];
211 i = j + 1;
212 }
213
214 /* update the cached value in shared memory */
219
220 /* and move forwards our endpoint, if needed */
224}

References CommitTimestampShared::commitTsActive, commitTsShared, CommitTimestampShared::dataLastCommit, fb(), i, j, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, CommitTimestampEntry::nodeid, SetXidCommitTsInPage(), CommitTimestampEntry::time, TransactionIdPrecedes(), TransactionIdToCTsPage(), TransamVariables, and CommitTimestampShared::xidLastCommit.

Referenced by RecordTransactionCommit(), RecordTransactionCommitPrepared(), and xact_redo_commit().

◆ TruncateCommitTs()

void TruncateCommitTs ( TransactionId  oldestXact)

Definition at line 866 of file commit_ts.c.

867{
869
870 /*
871 * The cutoff point is the start of the segment containing oldestXact. We
872 * pass the *page* containing oldestXact to SimpleLruTruncate.
873 */
875
876 /* Check to see if there are any files that could be removed */
878 &cutoffPage))
879 return; /* nothing to remove */
880
881 /* Write XLOG record */
882 WriteTruncateXlogRec(cutoffPage, oldestXact);
883
884 /* Now we can remove the old CommitTs segment(s) */
886}

References CommitTsCtl, fb(), SimpleLruTruncate(), SlruScanDirCbReportPresence(), SlruScanDirectory(), TransactionIdToCTsPage(), and WriteTruncateXlogRec().

Referenced by vac_truncate_clog().

◆ WriteTruncateXlogRec()

static void WriteTruncateXlogRec ( int64  pageno,
TransactionId  oldestXid 
)
static

Variable Documentation

◆ commitTsShared

◆ CommitTsShmemCallbacks

const ShmemCallbacks CommitTsShmemCallbacks
Initial value:
= {
.request_fn = CommitTsShmemRequest,
.init_fn = CommitTsShmemInit,
}

Definition at line 89 of file commit_ts.c.

89 {
90 .request_fn = CommitTsShmemRequest,
91 .init_fn = CommitTsShmemInit,
92};

◆ CommitTsSlruDesc

SlruDesc CommitTsSlruDesc
static

Definition at line 94 of file commit_ts.c.

Referenced by CommitTsShmemRequest().

◆ track_commit_timestamp