33 #include "utils/fmgrprotos.h"
60 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
63 #define COMMIT_TS_XACTS_PER_PAGE \
64 (BLCKSZ / SizeOfCommitTimestampEntry)
77 #define TransactionIdToCTsEntry(xid) \
78 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
85 #define CommitTsCtl (&CommitTsCtlData)
165 newestXact = subxids[nsubxids - 1];
183 for (
j =
i;
j < nsubxids;
j++)
201 headxid = subxids[
j];
235 for (
i = 0;
i < nsubxids;
i++)
286 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
287 errmsg(
"cannot retrieve commit timestamp for transaction %u", xid)));
384 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385 errmsg(
"could not get commit timestamp data"),
387 errhint(
"Make sure the configuration parameter \"%s\" is set on the primary server.",
388 "track_commit_timestamp") :
389 errhint(
"Make sure the configuration parameter \"%s\" is set.",
390 "track_commit_timestamp")));
434 elog(
ERROR,
"return type must be a row type");
438 memset(nulls,
true,
sizeof(nulls));
478 elog(
ERROR,
"return type must be a row type");
482 memset(nulls,
true,
sizeof(nulls));
1056 elog(
PANIC,
"commit_ts_redo: unknown op code %u", info);
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static Datum values[MAXATTR]
#define Assert(condition)
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno)
static void WriteZeroPageXlogRec(int64 pageno)
void StartupCommitTs(void)
static SlruCtlData CommitTsCtlData
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
struct CommitTimestampEntry CommitTimestampEntry
struct CommitTimestampShared CommitTimestampShared
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
void CommitTsParameterChange(bool newvalue, bool oldvalue)
#define COMMIT_TS_XACTS_PER_PAGE
#define TransactionIdToCTsEntry(xid)
static void DeactivateCommitTs(void)
Size CommitTsShmemSize(void)
bool track_commit_timestamp
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
static CommitTimestampShared * commitTsShared
int committssyncfiletag(const FileTag *ftag, char *path)
void CompleteCommitTsInitialization(void)
bool check_commit_ts_buffers(int *newval, void **extra, GucSource source)
static void ActivateCommitTs(void)
static int64 TransactionIdToCTsPage(TransactionId xid)
void TruncateCommitTs(TransactionId oldestXact)
void commit_ts_redo(XLogReaderState *record)
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
static int CommitTsShmemBuffers(void)
static void error_commit_ts_disabled(void)
static bool CommitTsPagePrecedes(int64 page1, int64 page2)
#define SizeOfCommitTimestampEntry
void BootStrapCommitTs(void)
void CommitTsShmemInit(void)
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
void ExtendCommitTs(TransactionId newestXact)
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
static int ZeroCommitTsPage(int64 pageno, bool writeXlog)
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
void CheckPointCommitTs(void)
#define COMMIT_TS_ZEROPAGE
#define SizeOfCommitTsTruncate
#define COMMIT_TS_TRUNCATE
#define TIMESTAMP_NOBEGIN(j)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define PG_GETARG_TRANSACTIONID(n)
#define PG_RETURN_DATUM(x)
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
int commit_timestamp_buffers
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
@ LWTRANCHE_COMMITTS_BUFFER
@ LWTRANCHE_COMMITTS_SLRU
#define InvalidRepOriginId
static rewind_source * source
static Datum TransactionIdGetDatum(TransactionId X)
static Datum ObjectIdGetDatum(Oid X)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
void SimpleLruWritePage(SlruCtl ctl, int slotno)
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
int SimpleLruAutotuneBuffers(int divisor, int max)
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Size SimpleLruShmemSize(int nslots, int nlsns)
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage, void *data)
bool check_slru_buffers(const char *name, int *newval)
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
#define SlruPagePrecedesUnitTests(ctl, per_page)
#define SLRU_MAX_ALLOWED_BUFFERS
CommitTimestampEntry dataLastCommit
TransactionId xidLastCommit
TransactionId oldestCommitTsXid
TransactionId newestCommitTsXid
FullTransactionId nextXid
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
static TransactionId ReadNextTransactionId(void)
#define InvalidTransactionId
#define TransactionIdEquals(id1, id2)
#define XidFromFullTransactionId(x)
#define FirstNormalTransactionId
#define TransactionIdIsValid(xid)
#define TransactionIdIsNormal(xid)
static Datum TimestampTzGetDatum(TimestampTz X)
#define PG_RETURN_TIMESTAMPTZ(x)
TransamVariablesData * TransamVariables
bool RecoveryInProgress(void)
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogRegisterData(const char *data, uint32 len)
void XLogBeginInsert(void)
#define XLogRecGetInfo(decoder)
#define XLogRecGetData(decoder)
#define XLogRecHasAnyBlockRefs(decoder)