113#define TWOPHASE_DIR "pg_twophase"
370 errmsg(
"transaction identifier \"%s\" is too long",
377 errmsg(
"prepared transactions are disabled"),
378 errhint(
"Set \"max_prepared_transactions\" to a nonzero value.")));
397 errmsg(
"transaction identifier \"%s\" is already in use",
406 errmsg(
"maximum number of prepared transactions reached"),
407 errhint(
"Increase \"max_prepared_transactions\" (currently %d).",
414 gxact->ondisk =
false;
485 gxact->prepared_at = prepared_at;
487 gxact->owner = owner;
489 gxact->valid =
false;
490 gxact->inredo =
false;
584 errmsg(
"prepared transaction with identifier \"%s\" is busy",
590 errmsg(
"permission denied to finish prepared transaction"),
591 errhint(
"Must be superuser or the user that prepared the transaction.")));
602 errmsg(
"prepared transaction belongs to another database"),
603 errhint(
"Connect to the database where the transaction was prepared to finish it.")));
618 errmsg(
"prepared transaction with identifier \"%s\" does not exist",
654 elog(
ERROR,
"failed to find %p in GlobalTransaction array",
gxact);
689 for (
i = 0;
i < num;
i++)
839 elog(
ERROR,
"failed to find GlobalTransaction for xid %u",
912 return gxact->pgprocno;
976#define TWOPHASE_MAGIC 0x57F94534
1177 errmsg(
"two-phase state file maximum length exceeded")));
1215 gxact->prepare_end_lsn);
1317 errmsg(
"could not open file \"%s\": %m", path)));
1329 errmsg(
"could not stat file \"%s\": %m", path)));
1338 "incorrect size of file \"%s\": %lld bytes",
1346 errmsg(
"incorrect alignment of CRC offset for file \"%s\"",
1361 errmsg(
"could not read file \"%s\": %m", path)));
1364 (
errmsg(
"could not read file \"%s\": read %d of %lld",
1373 errmsg(
"could not close file \"%s\": %m", path)));
1379 errmsg(
"invalid magic number stored in file \"%s\"",
1385 errmsg(
"invalid size stored in file \"%s\"",
1397 errmsg(
"calculated CRC checksum does not match value stored in file \"%s\"",
1427 errdetail(
"Failed while allocating a WAL reading processor.")));
1437 errmsg(
"could not read two-phase state from WAL at %X/%08X: %s",
1442 errmsg(
"could not read two-phase state from WAL at %X/%08X",
1450 errmsg(
"expected two-phase state data is not present in WAL at %X/%08X",
1597 gxact->valid =
false;
1661 ondisk =
gxact->ondisk;
1705 if (callbacks[record->
rmid] !=
NULL)
1706 callbacks[record->
rmid] (fxid, record->
info, bufptr, record->
len);
1732 errmsg(
"could not remove file \"%s\": %m", path)));
1760 errmsg(
"could not recreate file \"%s\": %m", path)));
1772 errmsg(
"could not write file \"%s\": %m", path)));
1781 errmsg(
"could not write file \"%s\": %m", path)));
1793 errmsg(
"could not fsync file \"%s\": %m", path)));
1799 errmsg(
"could not close file \"%s\": %m", path)));
1866 gxact->ondisk =
true;
1888 "for a long-running prepared transaction",
1889 "%u two-phase state files were written "
1890 "for long-running prepared transactions",
1914 strspn(
clde->d_name,
"0123456789ABCDEF") == 16)
1922 true,
false,
false);
1986 gxact->prepare_start_lsn,
1987 gxact->ondisk,
false,
true);
2002 if (
nxids == allocsize)
2011 allocsize = allocsize * 2;
2015 xids[
nxids++] = xid;
2058 gxact->prepare_start_lsn,
2059 gxact->ondisk,
true,
false);
2108 gxact->prepare_start_lsn,
2109 gxact->ondisk,
true,
false);
2114 (
errmsg(
"recovering prepared transaction %u of epoch %u from shared memory",
2122 gid = (
const char *) bufptr;
2141 gxact->inredo =
false;
2210 (
errmsg(
"removing stale two-phase state file for transaction %u of epoch %u",
2218 (
errmsg(
"removing stale two-phase state from memory for transaction %u of epoch %u",
2232 (
errmsg(
"removing future two-phase state file for transaction %u of epoch %u",
2240 (
errmsg(
"removing future two-phase state from memory for transaction %u of epoch %u",
2266 errmsg(
"corrupted two-phase state file for transaction %u of epoch %u",
2272 errmsg(
"corrupted two-phase state in memory for transaction %u of epoch %u",
2456 elog(
PANIC,
"cannot abort transaction %u, it was already committed",
2527 gid = (
const char *) bufptr;
2560 (
errmsg(
"could not recover two-phase state file for transaction %u",
2562 errdetail(
"Two-phase state file has been found in WAL record %X/%08X, but this transaction has already been restored from disk.",
2570 errmsg(
"could not access file \"%s\": %m", path)));
2577 errmsg(
"maximum number of prepared transactions reached"),
2578 errhint(
"Increase \"max_prepared_transactions\" (currently %d).",
2584 gxact->prepare_start_lsn = start_lsn;
2585 gxact->prepare_end_lsn = end_lsn;
2589 gxact->valid =
false;
2591 gxact->inredo =
true;
2605 elog(
DEBUG2,
"added 2PC data in shared memory for transaction %u of epoch %u",
2650 elog(
DEBUG2,
"removing 2PC data for transaction %u of epoch %u ",
2866 oldestRunningXid = xid;
2871 return oldestRunningXid;
#define pg_write_barrier()
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
TimestampTz GetCurrentTimestamp(void)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
#define MemSet(start, val, len)
#define OidIsValid(objectId)
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, ReplOriginId nodeid)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
int CloseTransientFile(int fd)
void fsync_fname(const char *fname, bool isdir)
DIR * AllocateDir(const char *dirname)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int OpenTransientFile(const char *fileName, int fileFlags)
#define palloc_object(type)
#define palloc_array(type, count)
#define palloc0_object(type)
#define SRF_IS_FIRSTCALL()
#define SRF_PERCALL_SETUP()
#define SRF_RETURN_NEXT(_funcctx, _result)
#define SRF_FIRSTCALL_INIT()
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
#define SRF_RETURN_DONE(_funcctx)
bool IsPostmasterEnvironment
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
static void dlist_init(dlist_head *head)
static void dlist_node_init(dlist_node *node)
#define INJECTION_POINT_CACHED(name, arg)
#define INJECTION_POINT_LOAD(name)
int xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs, bool *RelcacheInitFileInval)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
#define VirtualTransactionIdIsValid(vxid)
#define GET_VXID_FROM_PGPROC(vxid_dst, proc)
#define LocalTransactionIdIsValid(lxid)
#define VirtualTransactionIdEquals(vxid1, vxid2)
bool LWLockHeldByMe(LWLock *lock)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
#define NUM_LOCK_PARTITIONS
void * repalloc(void *pointer, Size size)
void pfree(void *pointer)
void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo)
#define RESUME_INTERRUPTS()
#define AmStartupProcess()
#define START_CRIT_SECTION()
#define HOLD_INTERRUPTS()
#define END_CRIT_SECTION()
ReplOriginXactState replorigin_xact_state
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
#define InvalidReplOriginId
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define ERRCODE_DATA_CORRUPTED
#define COMP_CRC32C(crc, data, len)
#define EQ_CRC32C(c1, c2)
static char buf[DEFAULT_XLOG_SEG_SIZE]
void pgstat_execute_transactional_drops(int ndrops, struct xl_xact_stats_item *items, bool is_redo)
void AtEOXact_PgStat(bool isCommit, bool parallel)
int pgstat_get_transactional_drops(bool isCommit, xl_xact_stats_item **items)
static Datum TransactionIdGetDatum(TransactionId X)
static Datum ObjectIdGetDatum(Oid X)
void PredicateLockTwoPhaseFinish(FullTransactionId fxid, bool isCommit)
static int fd(const char *x, int i)
#define DELAY_CHKPT_IN_COMMIT
#define GetPGProcByNumber(n)
#define PGPROC_MAX_CACHED_SUBXIDS
#define GetNumberFromPGProc(proc)
#define DELAY_CHKPT_START
void ProcArrayAdd(PGPROC *proc)
void ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
#define INVALID_PROC_NUMBER
void RelationCacheInitFilePostInvalidate(void)
void RelationCacheInitFilePreInvalidate(void)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
PGPROC * PreparedXactProcs
void StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
int smgrGetPendingDeletes(bool forCommit, RelFileLocator **ptr)
#define ERRCODE_DUPLICATE_OBJECT
XLogRecPtr prepare_start_lsn
XLogRecPtr prepare_end_lsn
ProcNumber locking_backend
pg_atomic_uint64 waitStart
XidCacheStatus subxidStatus
dlist_head myProcLocks[NUM_LOCK_PARTITIONS]
ProcWaitStatus waitStatus
TimestampTz origin_timestamp
struct StateFileChunk * next
FullTransactionId nextXid
GlobalTransaction freeGXacts
GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER]
TransactionId xids[PGPROC_MAX_CACHED_SUBXIDS]
TimestampTz origin_timestamp
void SubTransSetParent(TransactionId xid, TransactionId parent)
bool superuser_arg(Oid roleid)
void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
TransactionId TransactionIdLatest(TransactionId mainxid, int nxids, const TransactionId *xids)
bool TransactionIdDidCommit(TransactionId transactionId)
void TransactionIdCommitTree(TransactionId xid, int nxids, TransactionId *xids)
void TransactionIdAbortTree(TransactionId xid, int nxids, TransactionId *xids)
bool TransactionIdDidAbort(TransactionId transactionId)
static bool TransactionIdFollows(TransactionId id1, TransactionId id2)
#define FullTransactionIdEquals(a, b)
#define InvalidTransactionId
static FullTransactionId FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid, TransactionId xid)
#define EpochFromFullTransactionId(x)
static FullTransactionId FullTransactionIdFromU64(uint64 value)
#define FullTransactionIdFollowsOrEquals(a, b)
#define TransactionIdEquals(id1, id2)
#define XidFromFullTransactionId(x)
#define TransactionIdIsValid(xid)
#define FullTransactionIdIsValid(x)
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
TupleDesc CreateTemplateTupleDesc(int natts)
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
static char * ReadTwoPhaseFile(FullTransactionId fxid, bool missing_ok)
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
ProcNumber TwoPhaseGetDummyProcNumber(FullTransactionId fxid, bool lock_held)
TransactionId TwoPhaseGetOldestXidInCommit(void)
static void ProcessRecords(char *bufptr, FullTransactionId fxid, const TwoPhaseCallback callbacks[])
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
void RecoverPreparedTransactions(void)
static bool twophaseExitRegistered
void restoreTwoPhaseData(void)
static GlobalTransaction TwoPhaseGetGXact(FullTransactionId fxid, bool lock_held)
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Size TwoPhaseShmemSize(void)
GlobalTransaction MarkAsPreparing(FullTransactionId fxid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid)
static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, RelFileLocator *rels, int nstats, xl_xact_stats_item *stats, const char *gid)
void RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info, const void *data, uint32 len)
static FullTransactionId AdjustToFullTransactionId(TransactionId xid)
static void RecordTransactionCommitPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, RelFileLocator *rels, int nstats, xl_xact_stats_item *stats, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, bool initfileinval, const char *gid)
static void RemoveGXact(GlobalTransaction gxact)
void PrepareRedoAdd(FullTransactionId fxid, char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn, ReplOriginId origin_id)
static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
static GlobalTransaction MyLockedGxact
static TwoPhaseStateData * TwoPhaseState
void AtAbort_Twophase(void)
static void save_state_data(const void *data, uint32 len)
void FinishPreparedTransaction(const char *gid, bool isCommit)
TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid, bool *have_more)
static char * ProcessTwoPhaseBuffer(FullTransactionId fxid, XLogRecPtr prepare_start_lsn, bool fromdisk, bool setParent, bool setNextXid)
static void GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts, TransactionId *children)
void PrepareRedoRemove(TransactionId xid, bool giveWarning)
Datum pg_prepared_xact(PG_FUNCTION_ARGS)
void EndPrepare(GlobalTransaction gxact)
TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
void StartPrepare(GlobalTransaction gxact)
static int GetPreparedTransactionList(GlobalTransaction *gxacts)
void TwoPhaseShmemInit(void)
void StandbyRecoverPreparedTransactions(void)
static void RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len)
static void AtProcExit_Twophase(int code, Datum arg)
static void PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning)
static int TwoPhaseFilePath(char *path, FullTransactionId fxid)
static void MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
void PostPrepare_Twophase(void)
bool LookupGXactBySubid(Oid subid)
PGPROC * TwoPhaseGetDummyProc(FullTransactionId fxid, bool lock_held)
xl_xact_prepare TwoPhaseFileHeader
void CheckPointTwoPhase(XLogRecPtr redo_horizon)
bool StandbyTransactionIdIsPrepared(TransactionId xid)
static GlobalTransaction LockGXact(const char *gid, Oid user)
static void MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid)
static bool IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
static struct xllist records
struct GlobalTransactionData * GlobalTransaction
const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID+1]
const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID+1]
const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID+1]
void(* TwoPhaseCallback)(FullTransactionId fxid, uint16 info, void *recdata, uint32 len)
#define TWOPHASE_RM_MAX_ID
#define TWOPHASE_RM_END_ID
static Datum TimestampTzGetDatum(TimestampTz X)
FullTransactionId ReadNextFullTransactionId(void)
void AdvanceNextFullTransactionIdPastXid(TransactionId xid)
TransamVariablesData * TransamVariables
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileLocator *rels, int ndroppedstats, xl_xact_stats_item *droppedstats, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, int xactflags, TransactionId twophase_xid, const char *twophase_gid)
int xactGetCommittedChildren(TransactionId **ptr)
XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileLocator *rels, int ndroppedstats, xl_xact_stats_item *droppedstats, int xactflags, TransactionId twophase_xid, const char *twophase_gid)
#define XLOG_XACT_PREPARE
#define XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK
XLogRecPtr ProcLastRecPtr
bool RecoveryInProgress(void)
XLogRecPtr XactLastRecEnd
void XLogFlush(XLogRecPtr record)
#define XLOG_INCLUDE_ORIGIN
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogRegisterData(const void *data, uint32 len)
void XLogSetRecordFlags(uint8 flags)
void XLogBeginInsert(void)
void XLogEnsureRecordSpace(int max_block_id, int ndatas)
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
void XLogReaderFree(XLogReaderState *state)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
#define XLogRecGetDataLen(decoder)
#define XLogRecGetInfo(decoder)
#define XLogRecGetRmid(decoder)
#define XLogRecGetData(decoder)
static XLogReaderState * xlogreader
void wal_segment_close(XLogReaderState *state)
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)