29 #include "utils/fmgrprotos.h"
32 #include "utils/wait_event_types.h"
158 Latch **wakeUpProcLatches;
159 int numWakeUpProcs = 0;
175 procInfo->
waitLSN > currentLSN)
178 wakeUpProcLatches[numWakeUpProcs++] = procInfo->
latch;
193 for (
i = 0;
i < numWakeUpProcs;
i++)
197 pfree(wakeUpProcLatches);
245 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
246 errmsg(
"recovery is not in progress"),
247 errhint(
"Waiting for LSN can only be executed during recovery.")));
282 if (targetLSN <= currentLSN)
285 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
286 errmsg(
"recovery is not in progress"),
287 errdetail(
"Recovery ended before replaying target LSN %X/%X; last replay LSN %X/%X.",
295 if (targetLSN <= currentLSN)
314 WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
330 if (targetLSN > currentLSN)
333 (
errcode(ERRCODE_QUERY_CANCELED),
334 errmsg(
"timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
348 (
errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
349 errmsg(
"\"timeout\" must not be negative")));
375 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
376 errmsg(
"pg_wal_replay_wait() must be only called without an active or registered snapshot"),
377 errdetail(
"Make sure pg_wal_replay_wait() isn't called within a transaction with an isolation level higher than READ COMMITTED, another procedure, or a function.")));
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
TimestampTz GetCurrentTimestamp(void)
#define Assert(condition)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define PG_GETARG_INT64(n)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void pfree(void *pointer)
#define CHECK_FOR_INTERRUPTS()
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
pairingheap_node * pairingheap_first(pairingheap *heap)
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
#define pairingheap_is_empty(h)
#define pairingheap_container(type, membername, ptr)
#define pairingheap_const_container(type, membername, ptr)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Size mul_size(Size s1, Size s2)
static pg_noinline void Size size
bool ActiveSnapshotSet(void)
void PopActiveSnapshot(void)
void InvalidateCatalogSnapshot(void)
Snapshot GetOldestSnapshot(void)
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
pg_atomic_uint64 minWaitedLSN
#define InvalidTransactionId
#define TimestampTzPlusMilliseconds(tz, ms)
void WaitLSNShmemInit(void)
static void addLSNWaiter(XLogRecPtr lsn)
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
void WaitLSNCleanup(void)
Datum pg_wal_replay_wait(PG_FUNCTION_ARGS)
struct WaitLSNState * waitLSNState
static void deleteLSNWaiter(void)
static void WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
Size WaitLSNShmemSize(void)
void WaitLSNSetLatches(XLogRecPtr currentLSN)
static void updateMinWaitedLSN(void)
bool RecoveryInProgress(void)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)