115 #define TWOPHASE_DIR "pg_twophase"
230 bool fromdisk,
bool setParent,
bool setNextXid);
385 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
386 errmsg(
"transaction identifier \"%s\" is too long",
392 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393 errmsg(
"prepared transactions are disabled"),
394 errhint(
"Set max_prepared_transactions to a nonzero value.")));
409 if (strcmp(gxact->
gid, gid) == 0)
413 errmsg(
"transaction identifier \"%s\" is already in use",
421 (
errcode(ERRCODE_OUT_OF_MEMORY),
422 errmsg(
"maximum number of prepared transactions reached"),
423 errhint(
"Increase max_prepared_transactions (currently %d).",
502 gxact->
owner = owner;
504 gxact->
valid =
false;
506 strcpy(gxact->
gid, gid);
592 if (strcmp(gxact->
gid, gid) != 0)
598 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
599 errmsg(
"prepared transaction with identifier \"%s\" is busy",
604 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
605 errmsg(
"permission denied to finish prepared transaction"),
606 errhint(
"Must be superuser or the user that prepared the transaction.")));
616 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
617 errmsg(
"prepared transaction belongs to another database"),
618 errhint(
"Connect to the database where the transaction was prepared to finish it.")));
632 (
errcode(ERRCODE_UNDEFINED_OBJECT),
633 errmsg(
"prepared transaction with identifier \"%s\" does not exist",
669 elog(
ERROR,
"failed to find %p in GlobalTransaction array", gxact);
704 for (
i = 0;
i < num;
i++)
756 TIMESTAMPTZOID, -1, 0);
833 if (xid == cached_xid)
843 if (gxact->
xid == xid)
854 elog(
ERROR,
"failed to find GlobalTransaction for xid %u", xid);
857 cached_gxact = result;
945 #define TwoPhaseFilePath(path, xid) \
946 snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
967 #define TWOPHASE_MAGIC 0x57F94534
1167 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1168 errmsg(
"two-phase state file maximum length exceeded")));
1298 if (missing_ok && errno == ENOENT)
1303 errmsg(
"could not open file \"%s\": %m", path)));
1315 errmsg(
"could not stat file \"%s\": %m", path)));
1324 "incorrect size of file \"%s\": %lld bytes",
1329 if (crc_offset !=
MAXALIGN(crc_offset))
1332 errmsg(
"incorrect alignment of CRC offset for file \"%s\"",
1347 errmsg(
"could not read file \"%s\": %m", path)));
1350 (
errmsg(
"could not read file \"%s\": read %d of %lld",
1359 errmsg(
"could not close file \"%s\": %m", path)));
1365 errmsg(
"invalid magic number stored in file \"%s\"",
1371 errmsg(
"invalid size stored in file \"%s\"",
1383 errmsg(
"calculated CRC checksum does not match value stored in file \"%s\"",
1411 (
errcode(ERRCODE_OUT_OF_MEMORY),
1413 errdetail(
"Failed while allocating a WAL reading processor.")));
1423 errmsg(
"could not read two-phase state from WAL at %X/%X: %s",
1428 errmsg(
"could not read two-phase state from WAL at %X/%X",
1436 errmsg(
"expected two-phase state data is not present in WAL at %X/%X",
1578 gxact->
valid =
false;
1589 delrels = commitrels;
1594 delrels = abortrels;
1680 if (callbacks[record->
rmid] != NULL)
1681 callbacks[record->
rmid] (xid, record->
info,
1682 (
void *) bufptr, record->
len);
1701 if (errno != ENOENT || giveWarning)
1704 errmsg(
"could not remove file \"%s\": %m", path)));
1728 O_CREAT | O_TRUNC | O_WRONLY |
PG_BINARY);
1732 errmsg(
"could not recreate file \"%s\": %m", path)));
1744 errmsg(
"could not write file \"%s\": %m", path)));
1753 errmsg(
"could not write file \"%s\": %m", path)));
1765 errmsg(
"could not fsync file \"%s\": %m", path)));
1771 errmsg(
"could not close file \"%s\": %m", path)));
1797 int serialized_xacts = 0;
1802 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1855 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1860 "for a long-running prepared transaction",
1861 "%u two-phase state files were written "
1862 "for long-running prepared transactions",
1864 serialized_xacts)));
1885 if (strlen(clde->
d_name) == 8 &&
1886 strspn(clde->
d_name,
"0123456789ABCDEF") == 8)
1894 true,
false,
false);
1961 gxact->
ondisk,
false,
true);
1975 if (nxids == allocsize)
1984 allocsize = allocsize * 2;
1988 xids[nxids++] = xid;
2036 gxact->
ondisk,
false,
false);
2088 gxact->
ondisk,
true,
false);
2093 (
errmsg(
"recovering prepared transaction %u from shared memory", xid)));
2098 gid = (
const char *) bufptr;
2166 bool setParent,
bool setNextXid)
2186 (
errmsg(
"removing stale two-phase state file for transaction %u",
2193 (
errmsg(
"removing stale two-phase state from memory for transaction %u",
2206 (
errmsg(
"removing future two-phase state file for transaction %u",
2213 (
errmsg(
"removing future two-phase state from memory for transaction %u",
2238 errmsg(
"corrupted two-phase state file for transaction %u",
2243 errmsg(
"corrupted two-phase state in memory for transaction %u",
2318 nchildren, children, nrels, rels,
2320 ninvalmsgs, invalmsgs,
2405 elog(
PANIC,
"cannot abort transaction %u, it was already committed",
2416 nchildren, children,
2468 gid = (
const char *) bufptr;
2497 if (
access(path, F_OK) == 0)
2500 (
errmsg(
"could not recover two-phase state file for transaction %u",
2502 errdetail(
"Two-phase state file has been found in WAL record %X/%X, but this transaction has already been restored from disk.",
2507 if (errno != ENOENT)
2510 errmsg(
"could not access file \"%s\": %m", path)));
2516 (
errcode(ERRCODE_OUT_OF_MEMORY),
2517 errmsg(
"maximum number of prepared transactions reached"),
2518 errhint(
"Increase max_prepared_transactions (currently %d).",
2529 gxact->
valid =
false;
2532 strcpy(gxact->
gid, gid);
2545 elog(
DEBUG2,
"added 2PC data in shared memory for transaction %u", gxact->
xid);
2571 if (gxact->
xid == xid)
2588 elog(
DEBUG2,
"removing 2PC data for transaction %u", xid);
2622 if (gxact->
valid && strcmp(gxact->
gid, gid) == 0)
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
TimestampTz GetCurrentTimestamp(void)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define FLEXIBLE_ARRAY_MEMBER
#define MemSet(start, val, len)
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
elog(ERROR, "%s: %s", p2, msg)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int CloseTransientFile(int fd)
void fsync_fname(const char *fname, bool isdir)
int OpenTransientFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
#define SRF_IS_FIRSTCALL()
#define SRF_PERCALL_SETUP()
#define SRF_RETURN_NEXT(_funcctx, _result)
#define SRF_FIRSTCALL_INIT()
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
#define SRF_RETURN_DONE(_funcctx)
bool IsPostmasterEnvironment
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
static void dlist_init(dlist_head *head)
static void dlist_node_init(dlist_node *node)
int xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs, bool *RelcacheInitFileInval)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Assert(fmt[strlen(fmt) - 1] !='\n')
#define VirtualTransactionIdIsValid(vxid)
#define LocalTransactionIdIsValid(lxid)
#define VirtualTransactionIdEquals(vxid1, vxid2)
#define GET_VXID_FROM_PGPROC(vxid, proc)
bool LWLockHeldByMe(LWLock *lock)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
#define NUM_LOCK_PARTITIONS
void pfree(void *pointer)
void * palloc0(Size size)
void * repalloc(void *pointer, Size size)
void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo)
#define RESUME_INTERRUPTS()
#define AmStartupProcess()
#define START_CRIT_SECTION()
#define HOLD_INTERRUPTS()
#define END_CRIT_SECTION()
TimestampTz replorigin_session_origin_timestamp
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
RepOriginId replorigin_session_origin
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
XLogRecPtr replorigin_session_origin_lsn
#define InvalidRepOriginId
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define ERRCODE_DATA_CORRUPTED
#define COMP_CRC32C(crc, data, len)
#define EQ_CRC32C(c1, c2)
void pgstat_execute_transactional_drops(int ndrops, struct xl_xact_stats_item *items, bool is_redo)
void AtEOXact_PgStat(bool isCommit, bool parallel)
int pgstat_get_transactional_drops(bool isCommit, xl_xact_stats_item **items)
static Datum TransactionIdGetDatum(TransactionId X)
static Datum ObjectIdGetDatum(Oid X)
void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
static int fd(const char *x, int i)
#define PGPROC_MAX_CACHED_SUBXIDS
#define DELAY_CHKPT_START
void ProcArrayAdd(PGPROC *proc)
void ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
void RelationCacheInitFilePostInvalidate(void)
void RelationCacheInitFilePreInvalidate(void)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Size mul_size(Size s1, Size s2)
void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
PGPROC * PreparedXactProcs
void StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
int smgrGetPendingDeletes(bool forCommit, RelFileLocator **ptr)
#define ERRCODE_DUPLICATE_OBJECT
MemoryContext multi_call_memory_ctx
XLogRecPtr prepare_start_lsn
XLogRecPtr prepare_end_lsn
BackendId locking_backend
pg_atomic_uint64 waitStart
XidCacheStatus subxidStatus
dlist_head myProcLocks[NUM_LOCK_PARTITIONS]
ProcWaitStatus waitStatus
struct StateFileChunk * next
GlobalTransaction freeGXacts
GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER]
FullTransactionId nextXid
TransactionId xids[PGPROC_MAX_CACHED_SUBXIDS]
TimestampTz origin_timestamp
void SubTransSetParent(TransactionId xid, TransactionId parent)
bool superuser_arg(Oid roleid)
void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
TransactionId TransactionIdLatest(TransactionId mainxid, int nxids, const TransactionId *xids)
bool TransactionIdDidCommit(TransactionId transactionId)
void TransactionIdCommitTree(TransactionId xid, int nxids, TransactionId *xids)
void TransactionIdAbortTree(TransactionId xid, int nxids, TransactionId *xids)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdDidAbort(TransactionId transactionId)
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdEquals(id1, id2)
#define XidFromFullTransactionId(x)
#define TransactionIdIsValid(xid)
TupleDesc CreateTemplateTupleDesc(int natts)
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
void RecoverPreparedTransactions(void)
static bool twophaseExitRegistered
void restoreTwoPhaseData(void)
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Size TwoPhaseShmemSize(void)
static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, RelFileLocator *rels, int nstats, xl_xact_stats_item *stats, const char *gid)
void RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info, const void *data, uint32 len)
static void RecordTransactionCommitPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, RelFileLocator *rels, int nstats, xl_xact_stats_item *stats, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, bool initfileinval, const char *gid)
static void RemoveGXact(GlobalTransaction gxact)
struct TwoPhaseStateData TwoPhaseStateData
static GlobalTransaction MyLockedGxact
static TwoPhaseStateData * TwoPhaseState
static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[])
void AtAbort_Twophase(void)
BackendId TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held)
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid)
struct GlobalTransactionData GlobalTransactionData
static void save_state_data(const void *data, uint32 len)
void FinishPreparedTransaction(const char *gid, bool isCommit)
#define TwoPhaseFilePath(path, xid)
struct TwoPhaseRecordOnDisk TwoPhaseRecordOnDisk
TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid, bool *have_more)
static void GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts, TransactionId *children)
void PrepareRedoRemove(TransactionId xid, bool giveWarning)
Datum pg_prepared_xact(PG_FUNCTION_ARGS)
void EndPrepare(GlobalTransaction gxact)
static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
static char * ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
void StartPrepare(GlobalTransaction gxact)
static int GetPreparedTransactionList(GlobalTransaction *gxacts)
PGPROC * TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
void TwoPhaseShmemInit(void)
void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn, RepOriginId origin_id)
static GlobalTransaction TwoPhaseGetGXact(TransactionId xid, bool lock_held)
void StandbyRecoverPreparedTransactions(void)
static void AtProcExit_Twophase(int code, Datum arg)
static char * ProcessTwoPhaseBuffer(TransactionId xid, XLogRecPtr prepare_start_lsn, bool fromdisk, bool setParent, bool setNextXid)
static void MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
void PostPrepare_Twophase(void)
xl_xact_prepare TwoPhaseFileHeader
void CheckPointTwoPhase(XLogRecPtr redo_horizon)
struct StateFileChunk StateFileChunk
bool StandbyTransactionIdIsPrepared(TransactionId xid)
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid)
static GlobalTransaction LockGXact(const char *gid, Oid user)
static struct xllist records
struct GlobalTransactionData * GlobalTransaction
const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID+1]
const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID+1]
const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID+1]
#define TWOPHASE_RM_MAX_ID
#define TWOPHASE_RM_END_ID
void(* TwoPhaseCallback)(TransactionId xid, uint16 info, void *recdata, uint32 len)
static Datum TimestampTzGetDatum(TimestampTz X)
void AdvanceNextFullTransactionIdPastXid(TransactionId xid)
VariableCache ShmemVariableCache
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileLocator *rels, int ndroppedstats, xl_xact_stats_item *droppedstats, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, int xactflags, TransactionId twophase_xid, const char *twophase_gid)
int xactGetCommittedChildren(TransactionId **ptr)
XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileLocator *rels, int ndroppedstats, xl_xact_stats_item *droppedstats, int xactflags, TransactionId twophase_xid, const char *twophase_gid)
#define XLOG_XACT_PREPARE
#define XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK
XLogRecPtr ProcLastRecPtr
bool RecoveryInProgress(void)
XLogRecPtr XactLastRecEnd
void XLogFlush(XLogRecPtr record)
#define XLOG_INCLUDE_ORIGIN
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
void XLogRegisterData(char *data, uint32 len)
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogSetRecordFlags(uint8 flags)
void XLogBeginInsert(void)
void XLogEnsureRecordSpace(int max_block_id, int ndatas)
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
void XLogReaderFree(XLogReaderState *state)
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
#define XLogRecGetDataLen(decoder)
#define XLogRecGetInfo(decoder)
#define XLogRecGetRmid(decoder)
#define XLogRecGetData(decoder)
static XLogReaderState * xlogreader
void wal_segment_close(XLogReaderState *state)
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)