112 #define TWOPHASE_DIR "pg_twophase"
226 bool fromdisk,
bool setParent,
bool setNextXid);
367 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
368 errmsg(
"transaction identifier \"%s\" is too long",
374 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
375 errmsg(
"prepared transactions are disabled"),
376 errhint(
"Set \"max_prepared_transactions\" to a nonzero value.")));
391 if (strcmp(gxact->
gid, gid) == 0)
395 errmsg(
"transaction identifier \"%s\" is already in use",
403 (
errcode(ERRCODE_OUT_OF_MEMORY),
404 errmsg(
"maximum number of prepared transactions reached"),
405 errhint(
"Increase \"max_prepared_transactions\" (currently %d).",
483 gxact->
owner = owner;
485 gxact->
valid =
false;
487 strcpy(gxact->
gid, gid);
573 if (strcmp(gxact->
gid, gid) != 0)
579 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
580 errmsg(
"prepared transaction with identifier \"%s\" is busy",
585 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
586 errmsg(
"permission denied to finish prepared transaction"),
587 errhint(
"Must be superuser or the user that prepared the transaction.")));
597 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
598 errmsg(
"prepared transaction belongs to another database"),
599 errhint(
"Connect to the database where the transaction was prepared to finish it.")));
613 (
errcode(ERRCODE_UNDEFINED_OBJECT),
614 errmsg(
"prepared transaction with identifier \"%s\" does not exist",
650 elog(
ERROR,
"failed to find %p in GlobalTransaction array", gxact);
685 for (
i = 0;
i < num;
i++)
737 TIMESTAMPTZOID, -1, 0);
814 if (xid == cached_xid)
824 if (gxact->
xid == xid)
835 elog(
ERROR,
"failed to find GlobalTransaction for xid %u", xid);
838 cached_gxact = result;
989 #define TWOPHASE_MAGIC 0x57F94534
1189 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1190 errmsg(
"two-phase state file maximum length exceeded")));
1320 if (missing_ok && errno == ENOENT)
1325 errmsg(
"could not open file \"%s\": %m", path)));
1337 errmsg(
"could not stat file \"%s\": %m", path)));
1346 "incorrect size of file \"%s\": %lld bytes",
1351 if (crc_offset !=
MAXALIGN(crc_offset))
1354 errmsg(
"incorrect alignment of CRC offset for file \"%s\"",
1369 errmsg(
"could not read file \"%s\": %m", path)));
1372 (
errmsg(
"could not read file \"%s\": read %d of %lld",
1381 errmsg(
"could not close file \"%s\": %m", path)));
1387 errmsg(
"invalid magic number stored in file \"%s\"",
1393 errmsg(
"invalid size stored in file \"%s\"",
1405 errmsg(
"calculated CRC checksum does not match value stored in file \"%s\"",
1433 (
errcode(ERRCODE_OUT_OF_MEMORY),
1435 errdetail(
"Failed while allocating a WAL reading processor.")));
1445 errmsg(
"could not read two-phase state from WAL at %X/%X: %s",
1450 errmsg(
"could not read two-phase state from WAL at %X/%X",
1458 errmsg(
"expected two-phase state data is not present in WAL at %X/%X",
1600 gxact->
valid =
false;
1611 delrels = commitrels;
1616 delrels = abortrels;
1702 if (callbacks[record->
rmid] != NULL)
1703 callbacks[record->
rmid] (xid, record->
info,
1704 (
void *) bufptr, record->
len);
1723 if (errno != ENOENT || giveWarning)
1726 errmsg(
"could not remove file \"%s\": %m", path)));
1750 O_CREAT | O_TRUNC | O_WRONLY |
PG_BINARY);
1754 errmsg(
"could not recreate file \"%s\": %m", path)));
1766 errmsg(
"could not write file \"%s\": %m", path)));
1775 errmsg(
"could not write file \"%s\": %m", path)));
1787 errmsg(
"could not fsync file \"%s\": %m", path)));
1793 errmsg(
"could not close file \"%s\": %m", path)));
1819 int serialized_xacts = 0;
1824 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1877 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1882 "for a long-running prepared transaction",
1883 "%u two-phase state files were written "
1884 "for long-running prepared transactions",
1886 serialized_xacts)));
1907 if (strlen(clde->
d_name) == 16 &&
1908 strspn(clde->
d_name,
"0123456789ABCDEF") == 16)
1918 true,
false,
false);
1985 gxact->
ondisk,
false,
true);
1999 if (nxids == allocsize)
2008 allocsize = allocsize * 2;
2012 xids[nxids++] = xid;
2059 gxact->
ondisk,
true,
false);
2111 gxact->
ondisk,
true,
false);
2116 (
errmsg(
"recovering prepared transaction %u from shared memory", xid)));
2121 gid = (
const char *) bufptr;
2189 bool setParent,
bool setNextXid)
2209 (
errmsg(
"removing stale two-phase state file for transaction %u",
2216 (
errmsg(
"removing stale two-phase state from memory for transaction %u",
2229 (
errmsg(
"removing future two-phase state file for transaction %u",
2236 (
errmsg(
"removing future two-phase state from memory for transaction %u",
2261 errmsg(
"corrupted two-phase state file for transaction %u",
2266 errmsg(
"corrupted two-phase state in memory for transaction %u",
2341 nchildren, children, nrels, rels,
2343 ninvalmsgs, invalmsgs,
2428 elog(
PANIC,
"cannot abort transaction %u, it was already committed",
2439 nchildren, children,
2491 gid = (
const char *) bufptr;
2520 if (
access(path, F_OK) == 0)
2523 (
errmsg(
"could not recover two-phase state file for transaction %u",
2525 errdetail(
"Two-phase state file has been found in WAL record %X/%X, but this transaction has already been restored from disk.",
2530 if (errno != ENOENT)
2533 errmsg(
"could not access file \"%s\": %m", path)));
2539 (
errcode(ERRCODE_OUT_OF_MEMORY),
2540 errmsg(
"maximum number of prepared transactions reached"),
2541 errhint(
"Increase \"max_prepared_transactions\" (currently %d).",
2552 gxact->
valid =
false;
2555 strcpy(gxact->
gid, gid);
2568 elog(
DEBUG2,
"added 2PC data in shared memory for transaction %u", gxact->
xid);
2594 if (gxact->
xid == xid)
2611 elog(
DEBUG2,
"removing 2PC data for transaction %u", xid);
2645 if (gxact->
valid && strcmp(gxact->
gid, gid) == 0)
2698 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2701 snprintf(gid_res, szgid,
"pg_gid_%u_%u", subid, xid);
2718 ret = sscanf(gid,
"pg_gid_%u_%u", &subid_from_gid, &xid_from_gid);
2724 if (ret != 2 || subid != subid_from_gid)
2734 return strcmp(gid, gid_tmp) == 0;
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
TimestampTz GetCurrentTimestamp(void)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
#define strtou64(str, endptr, base)
#define MemSet(start, val, len)
#define OidIsValid(objectId)
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int CloseTransientFile(int fd)
void fsync_fname(const char *fname, bool isdir)
int OpenTransientFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
#define SRF_IS_FIRSTCALL()
#define SRF_PERCALL_SETUP()
#define SRF_RETURN_NEXT(_funcctx, _result)
#define SRF_FIRSTCALL_INIT()
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
#define SRF_RETURN_DONE(_funcctx)
bool IsPostmasterEnvironment
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
static void dlist_init(dlist_head *head)
static void dlist_node_init(dlist_node *node)
int xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs, bool *RelcacheInitFileInval)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
#define VirtualTransactionIdIsValid(vxid)
#define GET_VXID_FROM_PGPROC(vxid_dst, proc)
#define LocalTransactionIdIsValid(lxid)
#define VirtualTransactionIdEquals(vxid1, vxid2)
bool LWLockHeldByMe(LWLock *lock)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
#define NUM_LOCK_PARTITIONS
void pfree(void *pointer)
void * palloc0(Size size)
void * repalloc(void *pointer, Size size)
void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo)
#define RESUME_INTERRUPTS()
#define AmStartupProcess()
#define START_CRIT_SECTION()
#define HOLD_INTERRUPTS()
#define END_CRIT_SECTION()
TimestampTz replorigin_session_origin_timestamp
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
RepOriginId replorigin_session_origin
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
XLogRecPtr replorigin_session_origin_lsn
#define InvalidRepOriginId
#define ERRCODE_DATA_CORRUPTED
#define COMP_CRC32C(crc, data, len)
#define EQ_CRC32C(c1, c2)
void pgstat_execute_transactional_drops(int ndrops, struct xl_xact_stats_item *items, bool is_redo)
void AtEOXact_PgStat(bool isCommit, bool parallel)
int pgstat_get_transactional_drops(bool isCommit, xl_xact_stats_item **items)
static Datum TransactionIdGetDatum(TransactionId X)
static Datum ObjectIdGetDatum(Oid X)
void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
static int fd(const char *x, int i)
#define GetPGProcByNumber(n)
#define PGPROC_MAX_CACHED_SUBXIDS
#define GetNumberFromPGProc(proc)
#define DELAY_CHKPT_START
void ProcArrayAdd(PGPROC *proc)
void ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
#define INVALID_PROC_NUMBER
MemoryContextSwitchTo(old_ctx)
void RelationCacheInitFilePostInvalidate(void)
void RelationCacheInitFilePreInvalidate(void)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Size mul_size(Size s1, Size s2)
void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
static pg_noinline void Size size
PGPROC * PreparedXactProcs
void StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
int smgrGetPendingDeletes(bool forCommit, RelFileLocator **ptr)
#define ERRCODE_DUPLICATE_OBJECT
MemoryContext multi_call_memory_ctx
XLogRecPtr prepare_start_lsn
XLogRecPtr prepare_end_lsn
ProcNumber locking_backend
pg_atomic_uint64 waitStart
XidCacheStatus subxidStatus
dlist_head myProcLocks[NUM_LOCK_PARTITIONS]
ProcWaitStatus waitStatus
struct StateFileChunk * next
FullTransactionId nextXid
GlobalTransaction freeGXacts
GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER]
TransactionId xids[PGPROC_MAX_CACHED_SUBXIDS]
TimestampTz origin_timestamp
void SubTransSetParent(TransactionId xid, TransactionId parent)
bool superuser_arg(Oid roleid)
void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
TransactionId TransactionIdLatest(TransactionId mainxid, int nxids, const TransactionId *xids)
bool TransactionIdDidCommit(TransactionId transactionId)
void TransactionIdCommitTree(TransactionId xid, int nxids, TransactionId *xids)
void TransactionIdAbortTree(TransactionId xid, int nxids, TransactionId *xids)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdDidAbort(TransactionId transactionId)
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define EpochFromFullTransactionId(x)
static FullTransactionId FullTransactionIdFromU64(uint64 value)
#define TransactionIdEquals(id1, id2)
#define XidFromFullTransactionId(x)
#define TransactionIdIsValid(xid)
static FullTransactionId FullTransactionIdFromEpochAndXid(uint32 epoch, TransactionId xid)
TupleDesc CreateTemplateTupleDesc(int natts)
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
void RecoverPreparedTransactions(void)
static bool twophaseExitRegistered
void restoreTwoPhaseData(void)
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Size TwoPhaseShmemSize(void)
static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, RelFileLocator *rels, int nstats, xl_xact_stats_item *stats, const char *gid)
void RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info, const void *data, uint32 len)
static FullTransactionId AdjustToFullTransactionId(TransactionId xid)
static void RecordTransactionCommitPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, RelFileLocator *rels, int nstats, xl_xact_stats_item *stats, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, bool initfileinval, const char *gid)
static void RemoveGXact(GlobalTransaction gxact)
struct TwoPhaseStateData TwoPhaseStateData
static GlobalTransaction MyLockedGxact
static TwoPhaseStateData * TwoPhaseState
static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[])
void AtAbort_Twophase(void)
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid)
struct GlobalTransactionData GlobalTransactionData
static void save_state_data(const void *data, uint32 len)
void FinishPreparedTransaction(const char *gid, bool isCommit)
struct TwoPhaseRecordOnDisk TwoPhaseRecordOnDisk
TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid, bool *have_more)
static void GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts, TransactionId *children)
void PrepareRedoRemove(TransactionId xid, bool giveWarning)
Datum pg_prepared_xact(PG_FUNCTION_ARGS)
void EndPrepare(GlobalTransaction gxact)
static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
static char * ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
void StartPrepare(GlobalTransaction gxact)
static int GetPreparedTransactionList(GlobalTransaction *gxacts)
PGPROC * TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
ProcNumber TwoPhaseGetDummyProcNumber(TransactionId xid, bool lock_held)
void TwoPhaseShmemInit(void)
void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn, RepOriginId origin_id)
static int TwoPhaseFilePath(char *path, TransactionId xid)
static GlobalTransaction TwoPhaseGetGXact(TransactionId xid, bool lock_held)
void StandbyRecoverPreparedTransactions(void)
static void AtProcExit_Twophase(int code, Datum arg)
static char * ProcessTwoPhaseBuffer(TransactionId xid, XLogRecPtr prepare_start_lsn, bool fromdisk, bool setParent, bool setNextXid)
static void MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
void PostPrepare_Twophase(void)
bool LookupGXactBySubid(Oid subid)
xl_xact_prepare TwoPhaseFileHeader
void CheckPointTwoPhase(XLogRecPtr redo_horizon)
struct StateFileChunk StateFileChunk
bool StandbyTransactionIdIsPrepared(TransactionId xid)
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid)
static GlobalTransaction LockGXact(const char *gid, Oid user)
static bool IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
static struct xllist records
struct GlobalTransactionData * GlobalTransaction
const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID+1]
const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID+1]
const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID+1]
#define TWOPHASE_RM_MAX_ID
#define TWOPHASE_RM_END_ID
void(* TwoPhaseCallback)(TransactionId xid, uint16 info, void *recdata, uint32 len)
static Datum TimestampTzGetDatum(TimestampTz X)
void AdvanceNextFullTransactionIdPastXid(TransactionId xid)
TransamVariablesData * TransamVariables
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
static const unsigned __int64 epoch
XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileLocator *rels, int ndroppedstats, xl_xact_stats_item *droppedstats, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, int xactflags, TransactionId twophase_xid, const char *twophase_gid)
int xactGetCommittedChildren(TransactionId **ptr)
XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileLocator *rels, int ndroppedstats, xl_xact_stats_item *droppedstats, int xactflags, TransactionId twophase_xid, const char *twophase_gid)
#define XLOG_XACT_PREPARE
#define XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK
XLogRecPtr ProcLastRecPtr
bool RecoveryInProgress(void)
XLogRecPtr XactLastRecEnd
void XLogFlush(XLogRecPtr record)
#define XLOG_INCLUDE_ORIGIN
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogSetRecordFlags(uint8 flags)
void XLogRegisterData(const char *data, uint32 len)
void XLogBeginInsert(void)
void XLogEnsureRecordSpace(int max_block_id, int ndatas)
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
void XLogReaderFree(XLogReaderState *state)
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
#define XLogRecGetDataLen(decoder)
#define XLogRecGetInfo(decoder)
#define XLogRecGetRmid(decoder)
#define XLogRecGetData(decoder)
static XLogReaderState * xlogreader
void wal_segment_close(XLogReaderState *state)
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)