PostgreSQL Source Code  git master
commit_ts.c File Reference
#include "postgres.h"
#include "access/commit_ts.h"
#include "access/htup_details.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "storage/shmem.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for commit_ts.c:

Go to the source code of this file.

Data Structures

struct  CommitTimestampEntry
 
struct  CommitTimestampShared
 

Macros

#define SizeOfCommitTimestampEntry
 
#define COMMIT_TS_XACTS_PER_PAGE    (BLCKSZ / SizeOfCommitTimestampEntry)
 
#define TransactionIdToCTsPage(xid)    ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
 
#define TransactionIdToCTsEntry(xid)    ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
 
#define CommitTsCtl   (&CommitTsCtlData)
 

Typedefs

typedef struct CommitTimestampEntry CommitTimestampEntry
 
typedef struct CommitTimestampShared CommitTimestampShared
 

Functions

static void SetXidCommitTsInPage (TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int pageno)
 
static void TransactionIdSetCommitTs (TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
 
static void error_commit_ts_disabled (void)
 
static int ZeroCommitTsPage (int pageno, bool writeXlog)
 
static bool CommitTsPagePrecedes (int page1, int page2)
 
static void ActivateCommitTs (void)
 
static void DeactivateCommitTs (void)
 
static void WriteZeroPageXlogRec (int pageno)
 
static void WriteTruncateXlogRec (int pageno, TransactionId oldestXid)
 
void TransactionTreeSetCommitTsData (TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
 
bool TransactionIdGetCommitTsData (TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
 
TransactionId GetLatestCommitTsData (TimestampTz *ts, RepOriginId *nodeid)
 
Datum pg_xact_commit_timestamp (PG_FUNCTION_ARGS)
 
Datum pg_last_committed_xact (PG_FUNCTION_ARGS)
 
Datum pg_xact_commit_timestamp_origin (PG_FUNCTION_ARGS)
 
Size CommitTsShmemBuffers (void)
 
Size CommitTsShmemSize (void)
 
void CommitTsShmemInit (void)
 
void BootStrapCommitTs (void)
 
void StartupCommitTs (void)
 
void CompleteCommitTsInitialization (void)
 
void CommitTsParameterChange (bool newvalue, bool oldvalue)
 
void CheckPointCommitTs (void)
 
void ExtendCommitTs (TransactionId newestXact)
 
void TruncateCommitTs (TransactionId oldestXact)
 
void SetCommitTsLimit (TransactionId oldestXact, TransactionId newestXact)
 
void AdvanceOldestCommitTsXid (TransactionId oldestXact)
 
void commit_ts_redo (XLogReaderState *record)
 
int committssyncfiletag (const FileTag *ftag, char *path)
 

Variables

static SlruCtlData CommitTsCtlData
 
static CommitTimestampSharedcommitTsShared
 
bool track_commit_timestamp
 

Macro Definition Documentation

◆ COMMIT_TS_XACTS_PER_PAGE

#define COMMIT_TS_XACTS_PER_PAGE    (BLCKSZ / SizeOfCommitTimestampEntry)

Definition at line 65 of file commit_ts.c.

◆ CommitTsCtl

#define CommitTsCtl   (&CommitTsCtlData)

Definition at line 78 of file commit_ts.c.

◆ SizeOfCommitTimestampEntry

#define SizeOfCommitTimestampEntry
Value:
(offsetof(CommitTimestampEntry, nodeid) + \
sizeof(RepOriginId))
uint16 RepOriginId
Definition: xlogdefs.h:65

Definition at line 62 of file commit_ts.c.

◆ TransactionIdToCTsEntry

#define TransactionIdToCTsEntry (   xid)     ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)

Definition at line 70 of file commit_ts.c.

◆ TransactionIdToCTsPage

#define TransactionIdToCTsPage (   xid)     ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)

Definition at line 68 of file commit_ts.c.

Typedef Documentation

◆ CommitTimestampEntry

◆ CommitTimestampShared

Function Documentation

◆ ActivateCommitTs()

static void ActivateCommitTs ( void  )
static

Definition at line 662 of file commit_ts.c.

663 {
664  TransactionId xid;
665  int pageno;
666 
667  /* If we've done this already, there's nothing to do */
668  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
670  {
671  LWLockRelease(CommitTsLock);
672  return;
673  }
674  LWLockRelease(CommitTsLock);
675 
677  pageno = TransactionIdToCTsPage(xid);
678 
679  /*
680  * Re-Initialize our idea of the latest page number.
681  */
682  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
683  CommitTsCtl->shared->latest_page_number = pageno;
684  LWLockRelease(CommitTsSLRULock);
685 
686  /*
687  * If CommitTs is enabled, but it wasn't in the previous server run, we
688  * need to set the oldest and newest values to the next Xid; that way, we
689  * will not try to read data that might not have been set.
690  *
691  * XXX does this have a problem if a server is started with commitTs
692  * enabled, then started with commitTs disabled, then restarted with it
693  * enabled again? It doesn't look like it does, because there should be a
694  * checkpoint that sets the value to InvalidTransactionId at end of
695  * recovery; and so any chance of injecting new transactions without
696  * CommitTs values would occur after the oldestCommitTsXid has been set to
697  * Invalid temporarily.
698  */
699  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
701  {
704  }
705  LWLockRelease(CommitTsLock);
706 
707  /* Create the current segment file, if necessary */
709  {
710  int slotno;
711 
712  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
713  slotno = ZeroCommitTsPage(pageno, false);
715  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
716  LWLockRelease(CommitTsSLRULock);
717  }
718 
719  /* Change the activation status in shared memory. */
720  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
722  LWLockRelease(CommitTsLock);
723 }
uint32 TransactionId
Definition: c.h:641
static CommitTimestampShared * commitTsShared
Definition: commit_ts.c:98
static int ZeroCommitTsPage(int pageno, bool writeXlog)
Definition: commit_ts.c:572
#define CommitTsCtl
Definition: commit_ts.c:78
#define TransactionIdToCTsPage(xid)
Definition: commit_ts.c:68
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1808
@ LW_EXCLUSIVE
Definition: lwlock.h:116
void SimpleLruWritePage(SlruCtl ctl, int slotno)
Definition: slru.c:615
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
Definition: slru.c:627
FullTransactionId nextXid
Definition: transam.h:220
TransactionId newestCommitTsXid
Definition: transam.h:233
TransactionId oldestCommitTsXid
Definition: transam.h:232
static TransactionId ReadNextTransactionId(void)
Definition: transam.h:315
#define InvalidTransactionId
Definition: transam.h:31
#define XidFromFullTransactionId(x)
Definition: transam.h:48
VariableCache ShmemVariableCache
Definition: varsup.c:34

References Assert(), CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, InvalidTransactionId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), VariableCacheData::newestCommitTsXid, VariableCacheData::nextXid, VariableCacheData::oldestCommitTsXid, ReadNextTransactionId(), ShmemVariableCache, SimpleLruDoesPhysicalPageExist(), SimpleLruWritePage(), TransactionIdToCTsPage, XidFromFullTransactionId, and ZeroCommitTsPage().

Referenced by CommitTsParameterChange(), CompleteCommitTsInitialization(), and StartupCommitTs().

◆ AdvanceOldestCommitTsXid()

void AdvanceOldestCommitTsXid ( TransactionId  oldestXact)

Definition at line 887 of file commit_ts.c.

888 {
889  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
893  LWLockRelease(CommitTsLock);
894 }
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References InvalidTransactionId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), VariableCacheData::oldestCommitTsXid, ShmemVariableCache, and TransactionIdPrecedes().

Referenced by commit_ts_redo(), and vac_truncate_clog().

◆ BootStrapCommitTs()

void BootStrapCommitTs ( void  )

Definition at line 553 of file commit_ts.c.

554 {
555  /*
556  * Nothing to do here at present, unlike most other SLRU modules; segments
557  * are created when the server is started with this module enabled. See
558  * ActivateCommitTs.
559  */
560 }

Referenced by BootStrapXLOG().

◆ CheckPointCommitTs()

void CheckPointCommitTs ( void  )

Definition at line 774 of file commit_ts.c.

775 {
776  /*
777  * Write dirty CommitTs pages to disk. This may result in sync requests
778  * queued for later handling by ProcessSyncRequests(), as part of the
779  * checkpoint.
780  */
782 }
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
Definition: slru.c:1157

References CommitTsCtl, and SimpleLruWriteAll().

Referenced by CheckPointGuts().

◆ commit_ts_redo()

void commit_ts_redo ( XLogReaderState record)

Definition at line 967 of file commit_ts.c.

968 {
969  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
970 
971  /* Backup blocks are not used in commit_ts records */
972  Assert(!XLogRecHasAnyBlockRefs(record));
973 
974  if (info == COMMIT_TS_ZEROPAGE)
975  {
976  int pageno;
977  int slotno;
978 
979  memcpy(&pageno, XLogRecGetData(record), sizeof(int));
980 
981  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
982 
983  slotno = ZeroCommitTsPage(pageno, false);
985  Assert(!CommitTsCtl->shared->page_dirty[slotno]);
986 
987  LWLockRelease(CommitTsSLRULock);
988  }
989  else if (info == COMMIT_TS_TRUNCATE)
990  {
992 
994 
995  /*
996  * During XLOG replay, latest_page_number isn't set up yet; insert a
997  * suitable value to bypass the sanity test in SimpleLruTruncate.
998  */
999  CommitTsCtl->shared->latest_page_number = trunc->pageno;
1000 
1002  }
1003  else
1004  elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1005 }
unsigned char uint8
Definition: c.h:493
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
Definition: commit_ts.c:887
#define COMMIT_TS_ZEROPAGE
Definition: commit_ts.h:47
#define COMMIT_TS_TRUNCATE
Definition: commit_ts.h:48
#define PANIC
Definition: elog.h:42
void SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
Definition: slru.c:1227
TransactionId oldestXid
Definition: commit_ts.h:64
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:417
#define XLR_INFO_MASK
Definition: xlogrecord.h:62

References AdvanceOldestCommitTsXid(), Assert(), COMMIT_TS_TRUNCATE, COMMIT_TS_ZEROPAGE, CommitTsCtl, elog(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), xl_commit_ts_truncate::oldestXid, xl_commit_ts_truncate::pageno, PANIC, SimpleLruTruncate(), SimpleLruWritePage(), XLogRecGetData, XLogRecGetInfo, XLogRecHasAnyBlockRefs, XLR_INFO_MASK, and ZeroCommitTsPage().

◆ CommitTsPagePrecedes()

static bool CommitTsPagePrecedes ( int  page1,
int  page2 
)
static

Definition at line 921 of file commit_ts.c.

922 {
923  TransactionId xid1;
924  TransactionId xid2;
925 
926  xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
927  xid1 += FirstNormalTransactionId + 1;
928  xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
929  xid2 += FirstNormalTransactionId + 1;
930 
931  return (TransactionIdPrecedes(xid1, xid2) &&
933 }
#define COMMIT_TS_XACTS_PER_PAGE
Definition: commit_ts.c:65
#define FirstNormalTransactionId
Definition: transam.h:34

References COMMIT_TS_XACTS_PER_PAGE, FirstNormalTransactionId, and TransactionIdPrecedes().

Referenced by CommitTsShmemInit().

◆ CommitTsParameterChange()

void CommitTsParameterChange ( bool  newvalue,
bool  oldvalue 
)

Definition at line 621 of file commit_ts.c.

622 {
623  /*
624  * If the commit_ts module is disabled in this server and we get word from
625  * the primary server that it is enabled there, activate it so that we can
626  * replay future WAL records involving it; also mark it as active on
627  * pg_control. If the old value was already set, we already did this, so
628  * don't do anything.
629  *
630  * If the module is disabled in the primary, disable it here too, unless
631  * the module is enabled locally.
632  *
633  * Note this only runs in the recovery process, so an unlocked read is
634  * fine.
635  */
636  if (newvalue)
637  {
640  }
641  else if (commitTsShared->commitTsActive)
643 }
static void DeactivateCommitTs(void)
Definition: commit_ts.c:736
static void ActivateCommitTs(void)
Definition: commit_ts.c:662

References ActivateCommitTs(), CommitTimestampShared::commitTsActive, commitTsShared, and DeactivateCommitTs().

Referenced by xlog_redo().

◆ CommitTsShmemBuffers()

Size CommitTsShmemBuffers ( void  )

Definition at line 498 of file commit_ts.c.

499 {
500  return Min(256, Max(4, NBuffers / 256));
501 }
#define Min(x, y)
Definition: c.h:993
#define Max(x, y)
Definition: c.h:987
int NBuffers
Definition: globals.c:136

References Max, Min, and NBuffers.

Referenced by CommitTsShmemInit(), and CommitTsShmemSize().

◆ CommitTsShmemInit()

void CommitTsShmemInit ( void  )

Definition at line 518 of file commit_ts.c.

519 {
520  bool found;
521 
522  CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
524  CommitTsSLRULock, "pg_commit_ts",
528 
529  commitTsShared = ShmemInitStruct("CommitTs shared",
530  sizeof(CommitTimestampShared),
531  &found);
532 
533  if (!IsUnderPostmaster)
534  {
535  Assert(!found);
536 
541  }
542  else
543  Assert(found);
544 }
Size CommitTsShmemBuffers(void)
Definition: commit_ts.c:498
static bool CommitTsPagePrecedes(int page1, int page2)
Definition: commit_ts.c:921
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:158
bool IsUnderPostmaster
Definition: globals.c:113
@ LWTRANCHE_COMMITTS_BUFFER
Definition: lwlock.h:182
#define InvalidRepOriginId
Definition: origin.h:33
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler)
Definition: slru.c:188
#define SlruPagePrecedesUnitTests(ctl, per_page)
Definition: slru.h:156
TimestampTz time
Definition: commit_ts.c:58
RepOriginId nodeid
Definition: commit_ts.c:59
CommitTimestampEntry dataLastCommit
Definition: commit_ts.c:94
TransactionId xidLastCommit
Definition: commit_ts.c:93
@ SYNC_HANDLER_COMMIT_TS
Definition: sync.h:39

References Assert(), COMMIT_TS_XACTS_PER_PAGE, CommitTimestampShared::commitTsActive, CommitTsCtl, CommitTsPagePrecedes(), commitTsShared, CommitTsShmemBuffers(), CommitTimestampShared::dataLastCommit, InvalidRepOriginId, InvalidTransactionId, IsUnderPostmaster, LWTRANCHE_COMMITTS_BUFFER, CommitTimestampEntry::nodeid, ShmemInitStruct(), SimpleLruInit(), SlruPagePrecedesUnitTests, SYNC_HANDLER_COMMIT_TS, CommitTimestampEntry::time, TIMESTAMP_NOBEGIN, and CommitTimestampShared::xidLastCommit.

Referenced by CreateSharedMemoryAndSemaphores().

◆ CommitTsShmemSize()

Size CommitTsShmemSize ( void  )

Definition at line 507 of file commit_ts.c.

508 {
510  sizeof(CommitTimestampShared);
511 }
struct CommitTimestampShared CommitTimestampShared
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:156

References CommitTsShmemBuffers(), and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

◆ committssyncfiletag()

int committssyncfiletag ( const FileTag ftag,
char *  path 
)

Definition at line 1011 of file commit_ts.c.

1012 {
1013  return SlruSyncFileTag(CommitTsCtl, ftag, path);
1014 }
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
Definition: slru.c:1594

References CommitTsCtl, and SlruSyncFileTag().

◆ CompleteCommitTsInitialization()

void CompleteCommitTsInitialization ( void  )

Definition at line 599 of file commit_ts.c.

600 {
601  /*
602  * If the feature is not enabled, turn it off for good. This also removes
603  * any leftover data.
604  *
605  * Conversely, we activate the module if the feature is enabled. This is
606  * necessary for primary and standby as the activation depends on the
607  * control file contents at the beginning of recovery or when a
608  * XLOG_PARAMETER_CHANGE is replayed.
609  */
612  else
614 }
bool track_commit_timestamp
Definition: commit_ts.c:102

References ActivateCommitTs(), DeactivateCommitTs(), and track_commit_timestamp.

Referenced by StartupXLOG().

◆ DeactivateCommitTs()

static void DeactivateCommitTs ( void  )
static

Definition at line 736 of file commit_ts.c.

737 {
738  /*
739  * Cleanup the status in the shared memory.
740  *
741  * We reset everything in the commitTsShared record to prevent user from
742  * getting confusing data about last committed transaction on the standby
743  * when the module was activated repeatedly on the primary.
744  */
745  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
746 
751 
754 
755  LWLockRelease(CommitTsLock);
756 
757  /*
758  * Remove *all* files. This is necessary so that there are no leftover
759  * files; in the case where this feature is later enabled after running
760  * with it disabled for some time there may be a gap in the file sequence.
761  * (We can probably tolerate out-of-sequence files, as they are going to
762  * be overwritten anyway when we wrap around, but it seems better to be
763  * tidy.)
764  */
765  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
767  LWLockRelease(CommitTsSLRULock);
768 }
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1531
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1554

References CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, CommitTimestampShared::dataLastCommit, InvalidRepOriginId, InvalidTransactionId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), VariableCacheData::newestCommitTsXid, CommitTimestampEntry::nodeid, VariableCacheData::oldestCommitTsXid, ShmemVariableCache, SlruScanDirCbDeleteAll(), SlruScanDirectory(), CommitTimestampEntry::time, TIMESTAMP_NOBEGIN, and CommitTimestampShared::xidLastCommit.

Referenced by CommitTsParameterChange(), and CompleteCommitTsInitialization().

◆ error_commit_ts_disabled()

static void error_commit_ts_disabled ( void  )
static

Definition at line 373 of file commit_ts.c.

374 {
375  ereport(ERROR,
376  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
377  errmsg("could not get commit timestamp data"),
379  errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
380  "track_commit_timestamp") :
381  errhint("Make sure the configuration parameter \"%s\" is set.",
382  "track_commit_timestamp")));
383 }
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
bool RecoveryInProgress(void)
Definition: xlog.c:5948

References ereport, errcode(), errhint(), errmsg(), ERROR, and RecoveryInProgress().

Referenced by GetLatestCommitTsData(), and TransactionIdGetCommitTsData().

◆ ExtendCommitTs()

void ExtendCommitTs ( TransactionId  newestXact)

Definition at line 796 of file commit_ts.c.

797 {
798  int pageno;
799 
800  /*
801  * Nothing to do if module not enabled. Note we do an unlocked read of
802  * the flag here, which is okay because this routine is only called from
803  * GetNewTransactionId, which is never called in a standby.
804  */
805  Assert(!InRecovery);
807  return;
808 
809  /*
810  * No work except at first XID of a page. But beware: just after
811  * wraparound, the first XID of page zero is FirstNormalTransactionId.
812  */
813  if (TransactionIdToCTsEntry(newestXact) != 0 &&
815  return;
816 
817  pageno = TransactionIdToCTsPage(newestXact);
818 
819  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
820 
821  /* Zero the page and make an XLOG entry about it */
822  ZeroCommitTsPage(pageno, !InRecovery);
823 
824  LWLockRelease(CommitTsSLRULock);
825 }
#define TransactionIdToCTsEntry(xid)
Definition: commit_ts.c:70
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
bool InRecovery
Definition: xlogutils.c:53

References Assert(), CommitTimestampShared::commitTsActive, commitTsShared, FirstNormalTransactionId, InRecovery, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), TransactionIdEquals, TransactionIdToCTsEntry, TransactionIdToCTsPage, and ZeroCommitTsPage().

Referenced by GetNewTransactionId().

◆ GetLatestCommitTsData()

TransactionId GetLatestCommitTsData ( TimestampTz ts,
RepOriginId nodeid 
)

Definition at line 352 of file commit_ts.c.

353 {
354  TransactionId xid;
355 
356  LWLockAcquire(CommitTsLock, LW_SHARED);
357 
358  /* Error if module not enabled */
361 
363  if (ts)
365  if (nodeid)
367  LWLockRelease(CommitTsLock);
368 
369  return xid;
370 }
static void error_commit_ts_disabled(void)
Definition: commit_ts.c:373
@ LW_SHARED
Definition: lwlock.h:117

References CommitTimestampShared::commitTsActive, commitTsShared, CommitTimestampShared::dataLastCommit, error_commit_ts_disabled(), LW_SHARED, LWLockAcquire(), LWLockRelease(), CommitTimestampEntry::nodeid, CommitTimestampEntry::time, and CommitTimestampShared::xidLastCommit.

Referenced by pg_last_committed_xact().

◆ pg_last_committed_xact()

Datum pg_last_committed_xact ( PG_FUNCTION_ARGS  )

Definition at line 412 of file commit_ts.c.

413 {
414  TransactionId xid;
415  RepOriginId nodeid;
416  TimestampTz ts;
417  Datum values[3];
418  bool nulls[3];
419  TupleDesc tupdesc;
420  HeapTuple htup;
421 
422  /* and construct a tuple with our data */
423  xid = GetLatestCommitTsData(&ts, &nodeid);
424 
425  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
426  elog(ERROR, "return type must be a row type");
427 
428  if (!TransactionIdIsNormal(xid))
429  {
430  memset(nulls, true, sizeof(nulls));
431  }
432  else
433  {
434  values[0] = TransactionIdGetDatum(xid);
435  nulls[0] = false;
436 
437  values[1] = TimestampTzGetDatum(ts);
438  nulls[1] = false;
439 
440  values[2] = ObjectIdGetDatum((Oid) nodeid);
441  nulls[2] = false;
442  }
443 
444  htup = heap_form_tuple(tupdesc, values, nulls);
445 
447 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:352
int64 TimestampTz
Definition: timestamp.h:39
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1108
static Datum TransactionIdGetDatum(TransactionId X)
Definition: postgres.h:272
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
unsigned int Oid
Definition: postgres_ext.h:31
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52

References elog(), ERROR, get_call_result_type(), GetLatestCommitTsData(), heap_form_tuple(), HeapTupleGetDatum(), ObjectIdGetDatum(), PG_RETURN_DATUM, TimestampTzGetDatum(), TransactionIdGetDatum(), TransactionIdIsNormal, TYPEFUNC_COMPOSITE, and values.

◆ pg_xact_commit_timestamp()

Datum pg_xact_commit_timestamp ( PG_FUNCTION_ARGS  )

Definition at line 389 of file commit_ts.c.

390 {
392  TimestampTz ts;
393  bool found;
394 
395  found = TransactionIdGetCommitTsData(xid, &ts, NULL);
396 
397  if (!found)
398  PG_RETURN_NULL();
399 
401 }
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:266
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_GETARG_TRANSACTIONID(n)
Definition: fmgr.h:279
#define PG_RETURN_TIMESTAMPTZ(x)
Definition: timestamp.h:68

References PG_GETARG_TRANSACTIONID, PG_RETURN_NULL, PG_RETURN_TIMESTAMPTZ, and TransactionIdGetCommitTsData().

◆ pg_xact_commit_timestamp_origin()

Datum pg_xact_commit_timestamp_origin ( PG_FUNCTION_ARGS  )

Definition at line 456 of file commit_ts.c.

457 {
459  RepOriginId nodeid;
460  TimestampTz ts;
461  Datum values[2];
462  bool nulls[2];
463  TupleDesc tupdesc;
464  HeapTuple htup;
465  bool found;
466 
467  found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
468 
469  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
470  elog(ERROR, "return type must be a row type");
471 
472  if (!found)
473  {
474  memset(nulls, true, sizeof(nulls));
475  }
476  else
477  {
478  values[0] = TimestampTzGetDatum(ts);
479  nulls[0] = false;
480 
481  values[1] = ObjectIdGetDatum((Oid) nodeid);
482  nulls[1] = false;
483  }
484 
485  htup = heap_form_tuple(tupdesc, values, nulls);
486 
488 }

References elog(), ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum(), ObjectIdGetDatum(), PG_GETARG_TRANSACTIONID, PG_RETURN_DATUM, TimestampTzGetDatum(), TransactionIdGetCommitTsData(), TYPEFUNC_COMPOSITE, and values.

◆ SetCommitTsLimit()

void SetCommitTsLimit ( TransactionId  oldestXact,
TransactionId  newestXact 
)

Definition at line 860 of file commit_ts.c.

861 {
862  /*
863  * Be careful not to overwrite values that are either further into the
864  * "future" or signal a disabled committs.
865  */
866  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
868  {
873  }
874  else
875  {
879  }
880  LWLockRelease(CommitTsLock);
881 }

References Assert(), InvalidTransactionId, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), VariableCacheData::newestCommitTsXid, VariableCacheData::oldestCommitTsXid, ShmemVariableCache, and TransactionIdPrecedes().

Referenced by BootStrapXLOG(), and StartupXLOG().

◆ SetXidCommitTsInPage()

static void SetXidCommitTsInPage ( TransactionId  xid,
int  nsubxids,
TransactionId subxids,
TimestampTz  ts,
RepOriginId  nodeid,
int  pageno 
)
static

Definition at line 215 of file commit_ts.c.

218 {
219  int slotno;
220  int i;
221 
222  LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
223 
224  slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
225 
226  TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
227  for (i = 0; i < nsubxids; i++)
228  TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
229 
230  CommitTsCtl->shared->page_dirty[slotno] = true;
231 
232  LWLockRelease(CommitTsSLRULock);
233 }
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Definition: commit_ts.c:241
int i
Definition: isn.c:73
int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid)
Definition: slru.c:396

References CommitTsCtl, i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SimpleLruReadPage(), and TransactionIdSetCommitTs().

Referenced by TransactionTreeSetCommitTsData().

◆ StartupCommitTs()

void StartupCommitTs ( void  )

Definition at line 589 of file commit_ts.c.

590 {
592 }

References ActivateCommitTs().

Referenced by StartupXLOG().

◆ TransactionIdGetCommitTsData()

bool TransactionIdGetCommitTsData ( TransactionId  xid,
TimestampTz ts,
RepOriginId nodeid 
)

Definition at line 266 of file commit_ts.c.

268 {
269  int pageno = TransactionIdToCTsPage(xid);
270  int entryno = TransactionIdToCTsEntry(xid);
271  int slotno;
272  CommitTimestampEntry entry;
273  TransactionId oldestCommitTsXid;
274  TransactionId newestCommitTsXid;
275 
276  if (!TransactionIdIsValid(xid))
277  ereport(ERROR,
278  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
279  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
280  else if (!TransactionIdIsNormal(xid))
281  {
282  /* frozen and bootstrap xids are always committed far in the past */
283  *ts = 0;
284  if (nodeid)
285  *nodeid = 0;
286  return false;
287  }
288 
289  LWLockAcquire(CommitTsLock, LW_SHARED);
290 
291  /* Error if module not enabled */
294 
295  /*
296  * If we're asked for the cached value, return that. Otherwise, fall
297  * through to read from SLRU.
298  */
299  if (commitTsShared->xidLastCommit == xid)
300  {
302  if (nodeid)
304 
305  LWLockRelease(CommitTsLock);
306  return *ts != 0;
307  }
308 
309  oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
310  newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
311  /* neither is invalid, or both are */
312  Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
313  LWLockRelease(CommitTsLock);
314 
315  /*
316  * Return empty if the requested value is outside our valid range.
317  */
318  if (!TransactionIdIsValid(oldestCommitTsXid) ||
319  TransactionIdPrecedes(xid, oldestCommitTsXid) ||
320  TransactionIdPrecedes(newestCommitTsXid, xid))
321  {
322  *ts = 0;
323  if (nodeid)
324  *nodeid = InvalidRepOriginId;
325  return false;
326  }
327 
328  /* lock is acquired by SimpleLruReadPage_ReadOnly */
329  slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
330  memcpy(&entry,
331  CommitTsCtl->shared->page_buffer[slotno] +
332  SizeOfCommitTimestampEntry * entryno,
334 
335  *ts = entry.time;
336  if (nodeid)
337  *nodeid = entry.nodeid;
338 
339  LWLockRelease(CommitTsSLRULock);
340  return *ts != 0;
341 }
#define SizeOfCommitTimestampEntry
Definition: commit_ts.c:62
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
Definition: slru.c:496
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert(), CommitTimestampShared::commitTsActive, CommitTsCtl, commitTsShared, CommitTimestampShared::dataLastCommit, ereport, errcode(), errmsg(), ERROR, error_commit_ts_disabled(), InvalidRepOriginId, LW_SHARED, LWLockAcquire(), LWLockRelease(), VariableCacheData::newestCommitTsXid, CommitTimestampEntry::nodeid, VariableCacheData::oldestCommitTsXid, ShmemVariableCache, SimpleLruReadPage_ReadOnly(), SizeOfCommitTimestampEntry, CommitTimestampEntry::time, TransactionIdIsNormal, TransactionIdIsValid, TransactionIdPrecedes(), TransactionIdToCTsEntry, TransactionIdToCTsPage, and CommitTimestampShared::xidLastCommit.

Referenced by pg_xact_commit_timestamp(), and pg_xact_commit_timestamp_origin().

◆ TransactionIdSetCommitTs()

static void TransactionIdSetCommitTs ( TransactionId  xid,
TimestampTz  ts,
RepOriginId  nodeid,
int  slotno 
)
static

Definition at line 241 of file commit_ts.c.

243 {
244  int entryno = TransactionIdToCTsEntry(xid);
245  CommitTimestampEntry entry;
246 
248 
249  entry.time = ts;
250  entry.nodeid = nodeid;
251 
252  memcpy(CommitTsCtl->shared->page_buffer[slotno] +
253  SizeOfCommitTimestampEntry * entryno,
255 }

References Assert(), CommitTsCtl, CommitTimestampEntry::nodeid, SizeOfCommitTimestampEntry, CommitTimestampEntry::time, TransactionIdIsNormal, and TransactionIdToCTsEntry.

Referenced by SetXidCommitTsInPage().

◆ TransactionTreeSetCommitTsData()

void TransactionTreeSetCommitTsData ( TransactionId  xid,
int  nsubxids,
TransactionId subxids,
TimestampTz  timestamp,
RepOriginId  nodeid 
)

Definition at line 134 of file commit_ts.c.

137 {
138  int i;
139  TransactionId headxid;
140  TransactionId newestXact;
141 
142  /*
143  * No-op if the module is not active.
144  *
145  * An unlocked read here is fine, because in a standby (the only place
146  * where the flag can change in flight) this routine is only called by the
147  * recovery process, which is also the only process which can change the
148  * flag.
149  */
151  return;
152 
153  /*
154  * Figure out the latest Xid in this batch: either the last subxid if
155  * there's any, otherwise the parent xid.
156  */
157  if (nsubxids > 0)
158  newestXact = subxids[nsubxids - 1];
159  else
160  newestXact = xid;
161 
162  /*
163  * We split the xids to set the timestamp to in groups belonging to the
164  * same SLRU page; the first element in each such set is its head. The
165  * first group has the main XID as the head; subsequent sets use the first
166  * subxid not on the previous page as head. This way, we only have to
167  * lock/modify each SLRU page once.
168  */
169  headxid = xid;
170  i = 0;
171  for (;;)
172  {
173  int pageno = TransactionIdToCTsPage(headxid);
174  int j;
175 
176  for (j = i; j < nsubxids; j++)
177  {
178  if (TransactionIdToCTsPage(subxids[j]) != pageno)
179  break;
180  }
181  /* subxids[i..j] are on the same page as the head */
182 
183  SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
184  pageno);
185 
186  /* if we wrote out all subxids, we're done. */
187  if (j >= nsubxids)
188  break;
189 
190  /*
191  * Set the new head and skip over it, as well as over the subxids we
192  * just wrote.
193  */
194  headxid = subxids[j];
195  i = j + 1;
196  }
197 
198  /* update the cached value in shared memory */
199  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
203 
204  /* and move forwards our endpoint, if needed */
207  LWLockRelease(CommitTsLock);
208 }
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int pageno)
Definition: commit_ts.c:215
int j
Definition: isn.c:74
int64 timestamp

References CommitTimestampShared::commitTsActive, commitTsShared, CommitTimestampShared::dataLastCommit, i, j, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), VariableCacheData::newestCommitTsXid, CommitTimestampEntry::nodeid, SetXidCommitTsInPage(), ShmemVariableCache, CommitTimestampEntry::time, TransactionIdPrecedes(), TransactionIdToCTsPage, and CommitTimestampShared::xidLastCommit.

Referenced by RecordTransactionCommit(), RecordTransactionCommitPrepared(), and xact_redo_commit().

◆ TruncateCommitTs()

void TruncateCommitTs ( TransactionId  oldestXact)

Definition at line 834 of file commit_ts.c.

835 {
836  int cutoffPage;
837 
838  /*
839  * The cutoff point is the start of the segment containing oldestXact. We
840  * pass the *page* containing oldestXact to SimpleLruTruncate.
841  */
842  cutoffPage = TransactionIdToCTsPage(oldestXact);
843 
844  /* Check to see if there's any files that could be removed */
846  &cutoffPage))
847  return; /* nothing to remove */
848 
849  /* Write XLOG record */
850  WriteTruncateXlogRec(cutoffPage, oldestXact);
851 
852  /* Now we can remove the old CommitTs segment(s) */
853  SimpleLruTruncate(CommitTsCtl, cutoffPage);
854 }
static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
Definition: commit_ts.c:951
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
Definition: slru.c:1501

References CommitTsCtl, SimpleLruTruncate(), SlruScanDirCbReportPresence(), SlruScanDirectory(), TransactionIdToCTsPage, and WriteTruncateXlogRec().

Referenced by vac_truncate_clog().

◆ WriteTruncateXlogRec()

static void WriteTruncateXlogRec ( int  pageno,
TransactionId  oldestXid 
)
static

Definition at line 951 of file commit_ts.c.

952 {
953  xl_commit_ts_truncate xlrec;
954 
955  xlrec.pageno = pageno;
956  xlrec.oldestXid = oldestXid;
957 
958  XLogBeginInsert();
959  XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
960  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
961 }
#define SizeOfCommitTsTruncate
Definition: commit_ts.h:67
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:351
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:461
void XLogBeginInsert(void)
Definition: xloginsert.c:150

References COMMIT_TS_TRUNCATE, xl_commit_ts_truncate::oldestXid, xl_commit_ts_truncate::pageno, SizeOfCommitTsTruncate, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by TruncateCommitTs().

◆ WriteZeroPageXlogRec()

static void WriteZeroPageXlogRec ( int  pageno)
static

Definition at line 940 of file commit_ts.c.

941 {
942  XLogBeginInsert();
943  XLogRegisterData((char *) (&pageno), sizeof(int));
944  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
945 }

References COMMIT_TS_ZEROPAGE, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by ZeroCommitTsPage().

◆ ZeroCommitTsPage()

static int ZeroCommitTsPage ( int  pageno,
bool  writeXlog 
)
static

Definition at line 572 of file commit_ts.c.

573 {
574  int slotno;
575 
576  slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
577 
578  if (writeXlog)
579  WriteZeroPageXlogRec(pageno);
580 
581  return slotno;
582 }
static void WriteZeroPageXlogRec(int pageno)
Definition: commit_ts.c:940
int SimpleLruZeroPage(SlruCtl ctl, int pageno)
Definition: slru.c:281

References CommitTsCtl, SimpleLruZeroPage(), and WriteZeroPageXlogRec().

Referenced by ActivateCommitTs(), commit_ts_redo(), and ExtendCommitTs().

Variable Documentation

◆ CommitTsCtlData

SlruCtlData CommitTsCtlData
static

Definition at line 76 of file commit_ts.c.

◆ commitTsShared

◆ track_commit_timestamp

bool track_commit_timestamp