PostgreSQL Source Code  git master
commit_ts.c File Reference
#include "postgres.h"
#include "access/commit_ts.h"
#include "access/htup_details.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/shmem.h"
#include "utils/fmgrprotos.h"
#include "utils/guc_hooks.h"
#include "utils/timestamp.h"
Include dependency graph for commit_ts.c:

Go to the source code of this file.

Data Structures

struct  CommitTimestampEntry
 
struct  CommitTimestampShared
 

Macros

#define SizeOfCommitTimestampEntry
 
#define COMMIT_TS_XACTS_PER_PAGE    (BLCKSZ / SizeOfCommitTimestampEntry)
 
#define TransactionIdToCTsEntry(xid)    ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
 
#define CommitTsCtl   (&CommitTsCtlData)
 

Typedefs

typedef struct CommitTimestampEntry CommitTimestampEntry
 
typedef struct CommitTimestampShared CommitTimestampShared
 

Functions

static int64 TransactionIdToCTsPage (TransactionId xid)
 
static void SetXidCommitTsInPage (TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno)
 
static void TransactionIdSetCommitTs (TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
 
static void error_commit_ts_disabled (void)
 
static int ZeroCommitTsPage (int64 pageno, bool writeXlog)
 
static bool CommitTsPagePrecedes (int64 page1, int64 page2)
 
static void ActivateCommitTs (void)
 
static void DeactivateCommitTs (void)
 
static void WriteZeroPageXlogRec (int64 pageno)
 
static void WriteTruncateXlogRec (int64 pageno, TransactionId oldestXid)
 
void TransactionTreeSetCommitTsData (TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
 
bool TransactionIdGetCommitTsData (TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
 
TransactionId GetLatestCommitTsData (TimestampTz *ts, RepOriginId *nodeid)
 
Datum pg_xact_commit_timestamp (PG_FUNCTION_ARGS)
 
Datum pg_last_committed_xact (PG_FUNCTION_ARGS)
 
Datum pg_xact_commit_timestamp_origin (PG_FUNCTION_ARGS)
 
static int CommitTsShmemBuffers (void)
 
Size CommitTsShmemSize (void)
 
void CommitTsShmemInit (void)
 
bool check_commit_ts_buffers (int *newval, void **extra, GucSource source)
 
void BootStrapCommitTs (void)
 
void StartupCommitTs (void)
 
void CompleteCommitTsInitialization (void)
 
void CommitTsParameterChange (bool newvalue, bool oldvalue)
 
void CheckPointCommitTs (void)
 
void ExtendCommitTs (TransactionId newestXact)
 
void TruncateCommitTs (TransactionId oldestXact)
 
void SetCommitTsLimit (TransactionId oldestXact, TransactionId newestXact)
 
void AdvanceOldestCommitTsXid (TransactionId oldestXact)
 
void commit_ts_redo (XLogReaderState *record)
 
int committssyncfiletag (const FileTag *ftag, char *path)
 

Variables

static SlruCtlData CommitTsCtlData
 
static CommitTimestampSharedcommitTsShared
 
bool track_commit_timestamp
 

Macro Definition Documentation

◆ COMMIT_TS_XACTS_PER_PAGE

#define COMMIT_TS_XACTS_PER_PAGE    (BLCKSZ / SizeOfCommitTimestampEntry)

Definition at line 63 of file commit_ts.c.

◆ CommitTsCtl

#define CommitTsCtl   (&CommitTsCtlData)

Definition at line 85 of file commit_ts.c.

◆ SizeOfCommitTimestampEntry

#define SizeOfCommitTimestampEntry
Value:
(offsetof(CommitTimestampEntry, nodeid) + \
sizeof(RepOriginId))
uint16 RepOriginId
Definition: xlogdefs.h:65

Definition at line 60 of file commit_ts.c.

◆ TransactionIdToCTsEntry

#define TransactionIdToCTsEntry (   xid)     ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)

Definition at line 77 of file commit_ts.c.

Typedef Documentation

◆ CommitTimestampEntry

◆ CommitTimestampShared

Function Documentation

◆ ActivateCommitTs()

static void ActivateCommitTs ( void  )
static

Definition at line 705 of file commit_ts.c.

706 {
707  TransactionId xid;
708  int64 pageno;
709 
710  /* If we've done this already, there's nothing to do */
711  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
713  {
714  LWLockRelease(CommitTsLock);
715  return;
716  }
717  LWLockRelease(CommitTsLock);
718 
720  pageno = TransactionIdToCTsPage(xid);
721 
722  /*
723  * Re-Initialize our idea of the latest page number.
724  */
725  pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
726 
727  /*
728  * If CommitTs is enabled, but it wasn't in the previous server run, we
729  * need to set the oldest and newest values to the next Xid; that way, we
730  * will not try to read data that might not have been set.
731  *
732  * XXX does this have a problem if a server is started with commitTs
733  * enabled, then started with commitTs disabled, then restarted with it
734  * enabled again? It doesn't look like it does, because there should be a
735  * checkpoint that sets the value to InvalidTransactionId at end of
736  * recovery; and so any chance of injecting new transactions without
737  * CommitTs values would occur after the oldestCommitTsXid has been set to
738  * Invalid temporarily.
739  */
740  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
742  {
745  }
746  LWLockRelease(CommitTsLock);
747 
748  /* Create the current segment file, if necessary */
750  {
751  LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
752  int slotno;
753 
755  slotno = ZeroCommitTsPage(pageno, false);
757  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
758  LWLockRelease(lock);
759  }
760 
761  /* Change the activation status in shared memory. */
762  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
764  LWLockRelease(CommitTsLock);
765 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:480
uint32 TransactionId
Definition: c.h:639
static CommitTimestampShared * commitTsShared
Definition: commit_ts.c:105
static int64 TransactionIdToCTsPage(TransactionId xid)
Definition: commit_ts.c:72
#define CommitTsCtl
Definition: commit_ts.c:85
static int ZeroCommitTsPage(int64 pageno, bool writeXlog)
Definition: commit_ts.c:615
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1172
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1785
@ LW_EXCLUSIVE
Definition: lwlock.h:116
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:715
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)
Definition: slru.c:729
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition: slru.h:179
Definition: lwlock.h:41
TransactionId oldestCommitTsXid
Definition: transam.h:232
TransactionId newestCommitTsXid
Definition: transam.h:233
FullTransactionId nextXid
Definition: transam.h:220
static TransactionId ReadNextTransactionId(void)
Definition: transam.h:315
#define InvalidTransactionId
Definition: transam.h:31
#define XidFromFullTransactionId(x)
Definition: transam.h:48
TransamVariablesData * TransamVariables
Definition: varsup.c:34

References Assert(), CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, InvalidTransactionId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, TransamVariablesData::nextXid, TransamVariablesData::oldestCommitTsXid, pg_atomic_write_u64(), ReadNextTransactionId(), SimpleLruDoesPhysicalPageExist(), SimpleLruGetBankLock(), SimpleLruWritePage(), TransactionIdToCTsPage(), TransamVariables, XidFromFullTransactionId, and ZeroCommitTsPage().

Referenced by CommitTsParameterChange(), CompleteCommitTsInitialization(), and StartupCommitTs().

◆ AdvanceOldestCommitTsXid()

void AdvanceOldestCommitTsXid ( TransactionId  oldestXact)

Definition at line 936 of file commit_ts.c.

937 {
938  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
941  TransamVariables->oldestCommitTsXid = oldestXact;
942  LWLockRelease(CommitTsLock);
943 }
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References InvalidTransactionId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::oldestCommitTsXid, TransactionIdPrecedes(), and TransamVariables.

Referenced by commit_ts_redo(), and vac_truncate_clog().

◆ BootStrapCommitTs()

void BootStrapCommitTs ( void  )

Definition at line 596 of file commit_ts.c.

597 {
598  /*
599  * Nothing to do here at present, unlike most other SLRU modules; segments
600  * are created when the server is started with this module enabled. See
601  * ActivateCommitTs.
602  */
603 }

Referenced by BootStrapXLOG().

◆ check_commit_ts_buffers()

bool check_commit_ts_buffers ( int *  newval,
void **  extra,
GucSource  source 
)

Definition at line 584 of file commit_ts.c.

585 {
586  return check_slru_buffers("commit_timestamp_buffers", newval);
587 }
#define newval
bool check_slru_buffers(const char *name, int *newval)
Definition: slru.c:341

References check_slru_buffers(), and newval.

◆ CheckPointCommitTs()

void CheckPointCommitTs ( void  )

Definition at line 820 of file commit_ts.c.

821 {
822  /*
823  * Write dirty CommitTs pages to disk. This may result in sync requests
824  * queued for later handling by ProcessSyncRequests(), as part of the
825  * checkpoint.
826  */
828 }
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
Definition: slru.c:1305

References CommitTsCtl, and SimpleLruWriteAll().

Referenced by CheckPointGuts().

◆ commit_ts_redo()

void commit_ts_redo ( XLogReaderState record)

Definition at line 1016 of file commit_ts.c.

1017 {
1018  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1019 
1020  /* Backup blocks are not used in commit_ts records */
1021  Assert(!XLogRecHasAnyBlockRefs(record));
1022 
1023  if (info == COMMIT_TS_ZEROPAGE)
1024  {
1025  int64 pageno;
1026  int slotno;
1027  LWLock *lock;
1028 
1029  memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1030 
1031  lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
1032  LWLockAcquire(lock, LW_EXCLUSIVE);
1033 
1034  slotno = ZeroCommitTsPage(pageno, false);
1036  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1037 
1038  LWLockRelease(lock);
1039  }
1040  else if (info == COMMIT_TS_TRUNCATE)
1041  {
1043 
1045 
1046  /*
1047  * During XLOG replay, latest_page_number isn't set up yet; insert a
1048  * suitable value to bypass the sanity test in SimpleLruTruncate.
1049  */
1050  pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1051  trunc->pageno);
1052 
1054  }
1055  else
1056  elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1057 }
unsigned char uint8
Definition: c.h:491
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition: commit_ts.c:936
#define COMMIT_TS_ZEROPAGE
Definition: commit_ts.h:46
#define COMMIT_TS_TRUNCATE
Definition: commit_ts.h:47
#define PANIC
Definition: elog.h:42
#define elog(elevel,...)
Definition: elog.h:224
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition: slru.c:1391
TransactionId oldestXid
Definition: commit_ts.h:63
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:417
#define XLR_INFO_MASK
Definition: xlogrecord.h:62

References AdvanceOldestCommitTsXid(), Assert(), COMMIT_TS_TRUNCATE, COMMIT_TS_ZEROPAGE, CommitTsCtl, elog, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), xl_commit_ts_truncate::oldestXid, xl_commit_ts_truncate::pageno, PANIC, pg_atomic_write_u64(), SimpleLruGetBankLock(), SimpleLruTruncate(), SimpleLruWritePage(), XLogRecGetData, XLogRecGetInfo, XLogRecHasAnyBlockRefs, XLR_INFO_MASK, and ZeroCommitTsPage().

◆ CommitTsPagePrecedes()

static bool CommitTsPagePrecedes ( int64  page1,
int64  page2 
)
static

Definition at line 970 of file commit_ts.c.

971 {
972  TransactionId xid1;
973  TransactionId xid2;
974 
975  xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
976  xid1 += FirstNormalTransactionId + 1;
977  xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
978  xid2 += FirstNormalTransactionId + 1;
979 
980  return (TransactionIdPrecedes(xid1, xid2) &&
982 }
#define COMMIT_TS_XACTS_PER_PAGE
Definition: commit_ts.c:63
#define FirstNormalTransactionId
Definition: transam.h:34

References COMMIT_TS_XACTS_PER_PAGE, FirstNormalTransactionId, and TransactionIdPrecedes().

Referenced by CommitTsShmemInit().

◆ CommitTsParameterChange()

void CommitTsParameterChange ( bool  newvalue,
bool  oldvalue 
)

Definition at line 664 of file commit_ts.c.

665 {
666  /*
667  * If the commit_ts module is disabled in this server and we get word from
668  * the primary server that it is enabled there, activate it so that we can
669  * replay future WAL records involving it; also mark it as active on
670  * pg_control. If the old value was already set, we already did this, so
671  * don't do anything.
672  *
673  * If the module is disabled in the primary, disable it here too, unless
674  * the module is enabled locally.
675  *
676  * Note this only runs in the recovery process, so an unlocked read is
677  * fine.
678  */
679  if (newvalue)
680  {
683  }
684  else if (commitTsShared->commitTsActive)
686 }
static void DeactivateCommitTs(void)
Definition: commit_ts.c:778
static void ActivateCommitTs(void)
Definition: commit_ts.c:705

References ActivateCommitTs(), CommitTimestampShared::commitTsActive, commitTsShared, and DeactivateCommitTs().

Referenced by xlog_redo().

◆ CommitTsShmemBuffers()

static int CommitTsShmemBuffers ( void  )
static

Definition at line 506 of file commit_ts.c.

507 {
508  /* auto-tune based on shared buffers */
509  if (commit_timestamp_buffers == 0)
510  return SimpleLruAutotuneBuffers(512, 1024);
511 
513 }
#define Min(x, y)
Definition: c.h:991
#define Max(x, y)
Definition: c.h:985
int commit_timestamp_buffers
Definition: globals.c:161
int SimpleLruAutotuneBuffers(int divisor, int max)
Definition: slru.c:217
#define SLRU_MAX_ALLOWED_BUFFERS
Definition: slru.h:24

References commit_timestamp_buffers, Max, Min, SimpleLruAutotuneBuffers(), and SLRU_MAX_ALLOWED_BUFFERS.

Referenced by CommitTsShmemInit(), and CommitTsShmemSize().

◆ CommitTsShmemInit()

void CommitTsShmemInit ( void  )

Definition at line 530 of file commit_ts.c.

531 {
532  bool found;
533 
534  /* If auto-tuning is requested, now is the time to do it */
535  if (commit_timestamp_buffers == 0)
536  {
537  char buf[32];
538 
539  snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
540  SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
542 
543  /*
544  * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
545  * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
546  * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
547  * that and we must force the matter with PGC_S_OVERRIDE.
548  */
549  if (commit_timestamp_buffers == 0) /* failed to apply it? */
550  SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
552  }
554 
555  CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
556  SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
557  "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
560  false);
562 
563  commitTsShared = ShmemInitStruct("CommitTs shared",
564  sizeof(CommitTimestampShared),
565  &found);
566 
567  if (!IsUnderPostmaster)
568  {
569  Assert(!found);
570 
575  }
576  else
577  Assert(found);
578 }
static int CommitTsShmemBuffers(void)
Definition: commit_ts.c:506
static bool CommitTsPagePrecedes(int64 page1, int64 page2)
Definition: commit_ts.c:970
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:159
bool IsUnderPostmaster
Definition: globals.c:116
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4257
@ PGC_S_DYNAMIC_DEFAULT
Definition: guc.h:110
@ PGC_S_OVERRIDE
Definition: guc.h:119
@ PGC_POSTMASTER
Definition: guc.h:70
@ LWTRANCHE_COMMITTS_BUFFER
Definition: lwlock.h:182
@ LWTRANCHE_COMMITTS_SLRU
Definition: lwlock.h:212
#define InvalidRepOriginId
Definition: origin.h:33
static char * buf
Definition: pg_test_fsync.c:73
#define snprintf
Definition: port.h:238
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition: slru.c:238
#define SlruPagePrecedesUnitTests(ctl, per_page)
Definition: slru.h:203
TimestampTz time
Definition: commit_ts.c:56
RepOriginId nodeid
Definition: commit_ts.c:57
CommitTimestampEntry dataLastCommit
Definition: commit_ts.c:101
TransactionId xidLastCommit
Definition: commit_ts.c:100
@ SYNC_HANDLER_COMMIT_TS
Definition: sync.h:39

References Assert(), buf, commit_timestamp_buffers, COMMIT_TS_XACTS_PER_PAGE, CommitTimestampShared::commitTsActive, CommitTsCtl, CommitTsPagePrecedes(), commitTsShared, CommitTsShmemBuffers(), CommitTimestampShared::dataLastCommit, InvalidRepOriginId, InvalidTransactionId, IsUnderPostmaster, LWTRANCHE_COMMITTS_BUFFER, LWTRANCHE_COMMITTS_SLRU, CommitTimestampEntry::nodeid, PGC_POSTMASTER, PGC_S_DYNAMIC_DEFAULT, PGC_S_OVERRIDE, SetConfigOption(), ShmemInitStruct(), SimpleLruInit(), SlruPagePrecedesUnitTests, snprintf, SYNC_HANDLER_COMMIT_TS, CommitTimestampEntry::time, TIMESTAMP_NOBEGIN, and CommitTimestampShared::xidLastCommit.

Referenced by CreateOrAttachShmemStructs().

◆ CommitTsShmemSize()

Size CommitTsShmemSize ( void  )

Definition at line 519 of file commit_ts.c.

520 {
522  sizeof(CommitTimestampShared);
523 }
struct CommitTimestampShared CommitTimestampShared
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:184

References CommitTsShmemBuffers(), and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

◆ committssyncfiletag()

int committssyncfiletag ( const FileTag ftag,
char *  path 
)

Definition at line 1063 of file commit_ts.c.

1064 {
1065  return SlruSyncFileTag(CommitTsCtl, ftag, path);
1066 }
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
Definition: slru.c:1814

References CommitTsCtl, and SlruSyncFileTag().

◆ CompleteCommitTsInitialization()

void CompleteCommitTsInitialization ( void  )

Definition at line 642 of file commit_ts.c.

643 {
644  /*
645  * If the feature is not enabled, turn it off for good. This also removes
646  * any leftover data.
647  *
648  * Conversely, we activate the module if the feature is enabled. This is
649  * necessary for primary and standby as the activation depends on the
650  * control file contents at the beginning of recovery or when a
651  * XLOG_PARAMETER_CHANGE is replayed.
652  */
655  else
657 }
bool track_commit_timestamp
Definition: commit_ts.c:109

References ActivateCommitTs(), DeactivateCommitTs(), and track_commit_timestamp.

Referenced by StartupXLOG().

◆ DeactivateCommitTs()

static void DeactivateCommitTs ( void  )
static

Definition at line 778 of file commit_ts.c.

779 {
780  /*
781  * Cleanup the status in the shared memory.
782  *
783  * We reset everything in the commitTsShared record to prevent user from
784  * getting confusing data about last committed transaction on the standby
785  * when the module was activated repeatedly on the primary.
786  */
787  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
788 
793 
796 
797  /*
798  * Remove *all* files. This is necessary so that there are no leftover
799  * files; in the case where this feature is later enabled after running
800  * with it disabled for some time there may be a gap in the file sequence.
801  * (We can probably tolerate out-of-sequence files, as they are going to
802  * be overwritten anyway when we wrap around, but it seems better to be
803  * tidy.)
804  *
805  * Note that we do this with CommitTsLock acquired in exclusive mode. This
806  * is very heavy-handed, but since this routine can only be called in the
807  * replica and should happen very rarely, we don't worry too much about
808  * it. Note also that no process should be consulting this SLRU if we
809  * have just deactivated it.
810  */
812 
813  LWLockRelease(CommitTsLock);
814 }
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1774
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1727

References CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, CommitTimestampShared::dataLastCommit, InvalidRepOriginId, InvalidTransactionId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, CommitTimestampEntry::nodeid, TransamVariablesData::oldestCommitTsXid, SlruScanDirCbDeleteAll(), SlruScanDirectory(), CommitTimestampEntry::time, TIMESTAMP_NOBEGIN, TransamVariables, and CommitTimestampShared::xidLastCommit.

Referenced by CommitTsParameterChange(), and CompleteCommitTsInitialization().

◆ error_commit_ts_disabled()

static void error_commit_ts_disabled ( void  )
static

Definition at line 381 of file commit_ts.c.

382 {
383  ereport(ERROR,
384  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385  errmsg("could not get commit timestamp data"),
387  errhint("Make sure the configuration parameter %s is set on the primary server.",
388  "track_commit_timestamp") :
389  errhint("Make sure the configuration parameter %s is set.",
390  "track_commit_timestamp")));
391 }
int errhint(const char *fmt,...)
Definition: elog.c:1319
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
bool RecoveryInProgress(void)
Definition: xlog.c:6201

References ereport, errcode(), errhint(), errmsg(), ERROR, and RecoveryInProgress().

Referenced by GetLatestCommitTsData(), and TransactionIdGetCommitTsData().

◆ ExtendCommitTs()

void ExtendCommitTs ( TransactionId  newestXact)

Definition at line 842 of file commit_ts.c.

843 {
844  int64 pageno;
845  LWLock *lock;
846 
847  /*
848  * Nothing to do if module not enabled. Note we do an unlocked read of
849  * the flag here, which is okay because this routine is only called from
850  * GetNewTransactionId, which is never called in a standby.
851  */
852  Assert(!InRecovery);
854  return;
855 
856  /*
857  * No work except at first XID of a page. But beware: just after
858  * wraparound, the first XID of page zero is FirstNormalTransactionId.
859  */
860  if (TransactionIdToCTsEntry(newestXact) != 0 &&
862  return;
863 
864  pageno = TransactionIdToCTsPage(newestXact);
865 
866  lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
867 
869 
870  /* Zero the page and make an XLOG entry about it */
871  ZeroCommitTsPage(pageno, !InRecovery);
872 
873  LWLockRelease(lock);
874 }
#define TransactionIdToCTsEntry(xid)
Definition: commit_ts.c:77
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
bool InRecovery
Definition: xlogutils.c:50

References Assert(), CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, FirstNormalTransactionId, InRecovery, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SimpleLruGetBankLock(), TransactionIdEquals, TransactionIdToCTsEntry, TransactionIdToCTsPage(), and ZeroCommitTsPage().

Referenced by GetNewTransactionId().

◆ GetLatestCommitTsData()

TransactionId GetLatestCommitTsData ( TimestampTz ts,
RepOriginId nodeid 
)

Definition at line 360 of file commit_ts.c.

361 {
362  TransactionId xid;
363 
364  LWLockAcquire(CommitTsLock, LW_SHARED);
365 
366  /* Error if module not enabled */
369 
371  if (ts)
373  if (nodeid)
375  LWLockRelease(CommitTsLock);
376 
377  return xid;
378 }
static void error_commit_ts_disabled(void)
Definition: commit_ts.c:381
@ LW_SHARED
Definition: lwlock.h:117

References CommitTimestampShared::commitTsActive, commitTsShared, CommitTimestampShared::dataLastCommit, error_commit_ts_disabled(), LW_SHARED, LWLockAcquire(), LWLockRelease(), CommitTimestampEntry::nodeid, CommitTimestampEntry::time, and CommitTimestampShared::xidLastCommit.

Referenced by pg_last_committed_xact().

◆ pg_last_committed_xact()

Datum pg_last_committed_xact ( PG_FUNCTION_ARGS  )

Definition at line 420 of file commit_ts.c.

421 {
422  TransactionId xid;
423  RepOriginId nodeid;
424  TimestampTz ts;
425  Datum values[3];
426  bool nulls[3];
427  TupleDesc tupdesc;
428  HeapTuple htup;
429 
430  /* and construct a tuple with our data */
431  xid = GetLatestCommitTsData(&ts, &nodeid);
432 
433  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
434  elog(ERROR, "return type must be a row type");
435 
436  if (!TransactionIdIsNormal(xid))
437  {
438  memset(nulls, true, sizeof(nulls));
439  }
440  else
441  {
442  values[0] = TransactionIdGetDatum(xid);
443  nulls[0] = false;
444 
445  values[1] = TimestampTzGetDatum(ts);
446  nulls[1] = false;
447 
448  values[2] = ObjectIdGetDatum((Oid) nodeid);
449  nulls[2] = false;
450  }
451 
452  htup = heap_form_tuple(tupdesc, values, nulls);
453 
455 }
static Datum values[MAXATTR]
Definition: bootstrap.c:152
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:360
int64 TimestampTz
Definition: timestamp.h:39
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
static Datum TransactionIdGetDatum(TransactionId X)
Definition: postgres.h:272
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
unsigned int Oid
Definition: postgres_ext.h:31
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52

References elog, ERROR, get_call_result_type(), GetLatestCommitTsData(), heap_form_tuple(), HeapTupleGetDatum(), ObjectIdGetDatum(), PG_RETURN_DATUM, TimestampTzGetDatum(), TransactionIdGetDatum(), TransactionIdIsNormal, TYPEFUNC_COMPOSITE, and values.

◆ pg_xact_commit_timestamp()

Datum pg_xact_commit_timestamp ( PG_FUNCTION_ARGS  )

Definition at line 397 of file commit_ts.c.

398 {
400  TimestampTz ts;
401  bool found;
402 
403  found = TransactionIdGetCommitTsData(xid, &ts, NULL);
404 
405  if (!found)
406  PG_RETURN_NULL();
407 
409 }
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:274
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_GETARG_TRANSACTIONID(n)
Definition: fmgr.h:279
#define PG_RETURN_TIMESTAMPTZ(x)
Definition: timestamp.h:68

References PG_GETARG_TRANSACTIONID, PG_RETURN_NULL, PG_RETURN_TIMESTAMPTZ, and TransactionIdGetCommitTsData().

◆ pg_xact_commit_timestamp_origin()

Datum pg_xact_commit_timestamp_origin ( PG_FUNCTION_ARGS  )

Definition at line 464 of file commit_ts.c.

465 {
467  RepOriginId nodeid;
468  TimestampTz ts;
469  Datum values[2];
470  bool nulls[2];
471  TupleDesc tupdesc;
472  HeapTuple htup;
473  bool found;
474 
475  found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
476 
477  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
478  elog(ERROR, "return type must be a row type");
479 
480  if (!found)
481  {
482  memset(nulls, true, sizeof(nulls));
483  }
484  else
485  {
486  values[0] = TimestampTzGetDatum(ts);
487  nulls[0] = false;
488 
489  values[1] = ObjectIdGetDatum((Oid) nodeid);
490  nulls[1] = false;
491  }
492 
493  htup = heap_form_tuple(tupdesc, values, nulls);
494 
496 }

References elog, ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum(), ObjectIdGetDatum(), PG_GETARG_TRANSACTIONID, PG_RETURN_DATUM, TimestampTzGetDatum(), TransactionIdGetCommitTsData(), TYPEFUNC_COMPOSITE, and values.

◆ SetCommitTsLimit()

void SetCommitTsLimit ( TransactionId  oldestXact,
TransactionId  newestXact 
)

Definition at line 909 of file commit_ts.c.

910 {
911  /*
912  * Be careful not to overwrite values that are either further into the
913  * "future" or signal a disabled committs.
914  */
915  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
917  {
919  TransamVariables->oldestCommitTsXid = oldestXact;
921  TransamVariables->newestCommitTsXid = newestXact;
922  }
923  else
924  {
926  TransamVariables->oldestCommitTsXid = oldestXact;
927  TransamVariables->newestCommitTsXid = newestXact;
928  }
929  LWLockRelease(CommitTsLock);
930 }

References Assert(), InvalidTransactionId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, TransamVariablesData::oldestCommitTsXid, TransactionIdPrecedes(), and TransamVariables.

Referenced by BootStrapXLOG(), and StartupXLOG().

◆ SetXidCommitTsInPage()

static void SetXidCommitTsInPage ( TransactionId  xid,
int  nsubxids,
TransactionId subxids,
TimestampTz  ts,
RepOriginId  nodeid,
int64  pageno 
)
static

Definition at line 222 of file commit_ts.c.

225 {
226  LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
227  int slotno;
228  int i;
229 
231 
232  slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
233 
234  TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
235  for (i = 0; i < nsubxids; i++)
236  TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
237 
238  CommitTsCtl->shared->page_dirty[slotno] = true;
239 
240  LWLockRelease(lock);
241 }
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition: commit_ts.c:249
int i
Definition: isn.c:73
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition: slru.c:488

References CommitTsCtl, i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SimpleLruGetBankLock(), SimpleLruReadPage(), and TransactionIdSetCommitTs().

Referenced by TransactionTreeSetCommitTsData().

◆ StartupCommitTs()

void StartupCommitTs ( void  )

Definition at line 632 of file commit_ts.c.

633 {
635 }

References ActivateCommitTs().

Referenced by StartupXLOG().

◆ TransactionIdGetCommitTsData()

bool TransactionIdGetCommitTsData ( TransactionId  xid,
TimestampTz ts,
RepOriginId nodeid 
)

Definition at line 274 of file commit_ts.c.

276 {
277  int64 pageno = TransactionIdToCTsPage(xid);
278  int entryno = TransactionIdToCTsEntry(xid);
279  int slotno;
280  CommitTimestampEntry entry;
281  TransactionId oldestCommitTsXid;
282  TransactionId newestCommitTsXid;
283 
284  if (!TransactionIdIsValid(xid))
285  ereport(ERROR,
286  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
287  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
288  else if (!TransactionIdIsNormal(xid))
289  {
290  /* frozen and bootstrap xids are always committed far in the past */
291  *ts = 0;
292  if (nodeid)
293  *nodeid = 0;
294  return false;
295  }
296 
297  LWLockAcquire(CommitTsLock, LW_SHARED);
298 
299  /* Error if module not enabled */
302 
303  /*
304  * If we're asked for the cached value, return that. Otherwise, fall
305  * through to read from SLRU.
306  */
307  if (commitTsShared->xidLastCommit == xid)
308  {
310  if (nodeid)
312 
313  LWLockRelease(CommitTsLock);
314  return *ts != 0;
315  }
316 
317  oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
318  newestCommitTsXid = TransamVariables->newestCommitTsXid;
319  /* neither is invalid, or both are */
320  Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
321  LWLockRelease(CommitTsLock);
322 
323  /*
324  * Return empty if the requested value is outside our valid range.
325  */
326  if (!TransactionIdIsValid(oldestCommitTsXid) ||
327  TransactionIdPrecedes(xid, oldestCommitTsXid) ||
328  TransactionIdPrecedes(newestCommitTsXid, xid))
329  {
330  *ts = 0;
331  if (nodeid)
332  *nodeid = InvalidRepOriginId;
333  return false;
334  }
335 
336  /* lock is acquired by SimpleLruReadPage_ReadOnly */
337  slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
338  memcpy(&entry,
339  CommitTsCtl->shared->page_buffer[slotno] +
340  SizeOfCommitTimestampEntry * entryno,
342 
343  *ts = entry.time;
344  if (nodeid)
345  *nodeid = entry.nodeid;
346 
348  return *ts != 0;
349 }
#define SizeOfCommitTimestampEntry
Definition: commit_ts.c:60
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
Definition: slru.c:592
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert(), CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, CommitTimestampShared::dataLastCommit, ereport, errcode(), errmsg(), ERROR, error_commit_ts_disabled(), InvalidRepOriginId, LW_SHARED, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, CommitTimestampEntry::nodeid, TransamVariablesData::oldestCommitTsXid, SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), SizeOfCommitTimestampEntry, CommitTimestampEntry::time, TransactionIdIsNormal, TransactionIdIsValid, TransactionIdPrecedes(), TransactionIdToCTsEntry, TransactionIdToCTsPage(), TransamVariables, and CommitTimestampShared::xidLastCommit.

Referenced by pg_xact_commit_timestamp(), and pg_xact_commit_timestamp_origin().

◆ TransactionIdSetCommitTs()

static void TransactionIdSetCommitTs ( TransactionId  xid,
TimestampTz  ts,
RepOriginId  nodeid,
int  slotno 
)
static

Definition at line 249 of file commit_ts.c.

251 {
252  int entryno = TransactionIdToCTsEntry(xid);
253  CommitTimestampEntry entry;
254 
256 
257  entry.time = ts;
258  entry.nodeid = nodeid;
259 
260  memcpy(CommitTsCtl->shared->page_buffer[slotno] +
261  SizeOfCommitTimestampEntry * entryno,
263 }

References Assert(), CommitTsCtl, CommitTimestampEntry::nodeid, SizeOfCommitTimestampEntry, CommitTimestampEntry::time, TransactionIdIsNormal, and TransactionIdToCTsEntry.

Referenced by SetXidCommitTsInPage().

◆ TransactionIdToCTsPage()

static int64 TransactionIdToCTsPage ( TransactionId  xid)
inlinestatic

◆ TransactionTreeSetCommitTsData()

void TransactionTreeSetCommitTsData ( TransactionId  xid,
int  nsubxids,
TransactionId subxids,
TimestampTz  timestamp,
RepOriginId  nodeid 
)

Definition at line 141 of file commit_ts.c.

144 {
145  int i;
146  TransactionId headxid;
147  TransactionId newestXact;
148 
149  /*
150  * No-op if the module is not active.
151  *
152  * An unlocked read here is fine, because in a standby (the only place
153  * where the flag can change in flight) this routine is only called by the
154  * recovery process, which is also the only process which can change the
155  * flag.
156  */
158  return;
159 
160  /*
161  * Figure out the latest Xid in this batch: either the last subxid if
162  * there's any, otherwise the parent xid.
163  */
164  if (nsubxids > 0)
165  newestXact = subxids[nsubxids - 1];
166  else
167  newestXact = xid;
168 
169  /*
170  * We split the xids to set the timestamp to in groups belonging to the
171  * same SLRU page; the first element in each such set is its head. The
172  * first group has the main XID as the head; subsequent sets use the first
173  * subxid not on the previous page as head. This way, we only have to
174  * lock/modify each SLRU page once.
175  */
176  headxid = xid;
177  i = 0;
178  for (;;)
179  {
180  int64 pageno = TransactionIdToCTsPage(headxid);
181  int j;
182 
183  for (j = i; j < nsubxids; j++)
184  {
185  if (TransactionIdToCTsPage(subxids[j]) != pageno)
186  break;
187  }
188  /* subxids[i..j] are on the same page as the head */
189 
190  SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
191  pageno);
192 
193  /* if we wrote out all subxids, we're done. */
194  if (j >= nsubxids)
195  break;
196 
197  /*
198  * Set the new head and skip over it, as well as over the subxids we
199  * just wrote.
200  */
201  headxid = subxids[j];
202  i = j + 1;
203  }
204 
205  /* update the cached value in shared memory */
206  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
210 
211  /* and move forwards our endpoint, if needed */
213  TransamVariables->newestCommitTsXid = newestXact;
214  LWLockRelease(CommitTsLock);
215 }
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno)
Definition: commit_ts.c:222
int j
Definition: isn.c:74
int64 timestamp

References CommitTimestampShared::commitTsActive, commitTsShared, CommitTimestampShared::dataLastCommit, i, j, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransamVariablesData::newestCommitTsXid, CommitTimestampEntry::nodeid, SetXidCommitTsInPage(), CommitTimestampEntry::time, TransactionIdPrecedes(), TransactionIdToCTsPage(), TransamVariables, and CommitTimestampShared::xidLastCommit.

Referenced by RecordTransactionCommit(), RecordTransactionCommitPrepared(), and xact_redo_commit().

◆ TruncateCommitTs()

void TruncateCommitTs ( TransactionId  oldestXact)

Definition at line 883 of file commit_ts.c.

884 {
885  int64 cutoffPage;
886 
887  /*
888  * The cutoff point is the start of the segment containing oldestXact. We
889  * pass the *page* containing oldestXact to SimpleLruTruncate.
890  */
891  cutoffPage = TransactionIdToCTsPage(oldestXact);
892 
893  /* Check to see if there's any files that could be removed */
895  &cutoffPage))
896  return; /* nothing to remove */
897 
898  /* Write XLOG record */
899  WriteTruncateXlogRec(cutoffPage, oldestXact);
900 
901  /* Now we can remove the old CommitTs segment(s) */
902  SimpleLruTruncate(CommitTsCtl, cutoffPage);
903 }
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
Definition: commit_ts.c:1000
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1695

References CommitTsCtl, SimpleLruTruncate(), SlruScanDirCbReportPresence(), SlruScanDirectory(), TransactionIdToCTsPage(), and WriteTruncateXlogRec().

Referenced by vac_truncate_clog().

◆ WriteTruncateXlogRec()

static void WriteTruncateXlogRec ( int64  pageno,
TransactionId  oldestXid 
)
static

Definition at line 1000 of file commit_ts.c.

1001 {
1002  xl_commit_ts_truncate xlrec;
1003 
1004  xlrec.pageno = pageno;
1005  xlrec.oldestXid = oldestXid;
1006 
1007  XLogBeginInsert();
1008  XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
1009  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
1010 }
#define SizeOfCommitTsTruncate
Definition: commit_ts.h:66
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:364
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogBeginInsert(void)
Definition: xloginsert.c:149

References COMMIT_TS_TRUNCATE, xl_commit_ts_truncate::oldestXid, xl_commit_ts_truncate::pageno, SizeOfCommitTsTruncate, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by TruncateCommitTs().

◆ WriteZeroPageXlogRec()

static void WriteZeroPageXlogRec ( int64  pageno)
static

Definition at line 989 of file commit_ts.c.

990 {
991  XLogBeginInsert();
992  XLogRegisterData((char *) (&pageno), sizeof(pageno));
993  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
994 }

References COMMIT_TS_ZEROPAGE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by ZeroCommitTsPage().

◆ ZeroCommitTsPage()

static int ZeroCommitTsPage ( int64  pageno,
bool  writeXlog 
)
static

Definition at line 615 of file commit_ts.c.

616 {
617  int slotno;
618 
619  slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
620 
621  if (writeXlog)
622  WriteZeroPageXlogRec(pageno);
623 
624  return slotno;
625 }
static void WriteZeroPageXlogRec(int64 pageno)
Definition: commit_ts.c:989
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition: slru.c:361

References CommitTsCtl, SimpleLruZeroPage(), and WriteZeroPageXlogRec().

Referenced by ActivateCommitTs(), commit_ts_redo(), and ExtendCommitTs().

Variable Documentation

◆ CommitTsCtlData

SlruCtlData CommitTsCtlData
static

Definition at line 83 of file commit_ts.c.

◆ commitTsShared

◆ track_commit_timestamp

bool track_commit_timestamp