PostgreSQL Source Code git master
Loading...
Searching...
No Matches
xlogwait.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * xlogwait.c
4 * Implements waiting for WAL operations to reach specific LSNs.
5 *
6 * Copyright (c) 2025-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/access/transam/xlogwait.c
10 *
11 * NOTES
12 * This file implements waiting for WAL operations to reach specific LSNs
13 * on both physical standby and primary servers. The core idea is simple:
14 * every process that wants to wait publishes the LSN it needs in
15 * shared memory, and the appropriate process (the startup process or
16 * walreceiver on a standby, or the WAL writer or a backend on a primary)
17 * wakes it once that LSN has been reached.
18 *
19 * The shared memory used by this module comprises a per-backend
20 * procInfos array that records the LSN each backend process is
21 * awaiting. The elements of that array are organized into pairing
22 * heaps (waitersHeap), one for each WaitLSNType, which makes finding
23 * the least awaited LSN for each type very fast.
24 *
25 * In addition, the least-awaited LSN for each type is cached in the
26 * minWaitedLSN array. A waiting process publishes information about
27 * itself in shared memory and sleeps on its latch until it is woken up
28 * by the appropriate process, the standby is promoted, the timeout (if
29 * any) expires, or the postmaster dies. It then removes its entry.
30 *
31 * On standby servers:
32 * - After replaying a WAL record, the startup process performs a fast
33 * path check minWaitedLSN[REPLAY] > replayLSN. If that check fails,
34 * it examines waitersHeap[REPLAY] and wakes up the backends whose
35 * awaited LSNs have been reached.
36 * - After receiving WAL, the walreceiver process performs similar checks
37 * against the flush and write LSNs, waking up waiters in the FLUSH
38 * and WRITE heaps, respectively.
39 *
40 * On primary servers: After flushing WAL, the WAL writer or backend
41 * process performs a similar check against the flush LSN and wakes up
42 * waiters whose target flush LSNs have been reached.
43 *
44 *-------------------------------------------------------------------------
45 */
46
47#include "postgres.h"
48
49#include <float.h>
50
51#include "access/xlog.h"
52#include "access/xlogrecovery.h"
53#include "access/xlogwait.h"
54#include "miscadmin.h"
55#include "pgstat.h"
57#include "storage/latch.h"
58#include "storage/proc.h"
59#include "storage/shmem.h"
60#include "utils/fmgrprotos.h"
61#include "utils/pg_lsn.h"
62#include "utils/snapmgr.h"
63#include "utils/wait_event.h"
64
65
66static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
67 void *arg);
68
70
71/*
72 * Wait event for each WaitLSNType, used with WaitLatch() to report
73 * the wait in pg_stat_activity.
 *
 * WaitForLSN() indexes this array directly by WaitLSNType, so entries
 * must appear in enum order. The static assertion below guards against
 * the array and the WaitLSNType enum drifting apart.
74 */
81
83 "WaitLSNWaitEvents must match WaitLSNType enum");
84
85/*
86 * Get the current LSN for the specified wait type.
 *
 * The returned position is what waiters of lsnType compare their target
 * LSN against. Sources used by the switch include GetWalRcvWriteRecPtr()
 * and GetFlushRecPtr(); NOTE(review): the case labels mapping each
 * WaitLSNType to its source are not visible in this extract.
 *
 * An out-of-range lsnType trips the Assert in assert-enabled builds; a
 * type not handled by the switch raises ERROR.
87 */
90{
91 Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
92
93 switch (lsnType)
94 {
97
99 return GetWalRcvWriteRecPtr();
100
103
105 return GetFlushRecPtr(NULL);
106 }
107
108 elog(ERROR, "invalid LSN wait type: %d", lsnType);
110}
111
112/*
 * Report the amount of shared memory space needed for WaitLSNState.
 *
 * WaitLSNState ends with a flexible per-process procInfos array, so the
 * result depends on the number of process slots. NOTE(review): the size
 * computation is not visible in this extract; presumably it covers
 * MaxBackends plus NUM_AUXILIARY_PROCS entries -- confirm.
 */
113Size
115{
116 Size size;
117
120 return size;
121}
122
123/*
 * Initialize the WaitLSNState in the shared memory.
 *
 * Attaches to the "WaitLSNState" shared memory struct, creating it if it
 * does not exist yet. Only when the struct was newly created (found is
 * false) are the per-type waiter heaps, their tracking state, and the
 * per-process info array initialized; later callers just attach.
 */
124void
126{
127 bool found;
128
129 waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
131 &found);
132 if (!found)
133 {
134 int i;
135
136 /* Initialize heaps and tracking */
137 for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
138 {
141 }
142
143 /* Initialize process info array */
146 }
147}
148
149/*
150 * Comparison function for LSN waiters heaps. Waiting processes are ordered by
151 * LSN, so that the waiter with smallest LSN is at the top.
 *
 * Note the inverted sign: a positive result is returned when a's waitLSN
 * is smaller than b's. That inversion is what places the smallest awaited
 * LSN at the top of the heap. The arg parameter is unused.
152 */
153static int
155{
158
159 if (aproc->waitLSN < bproc->waitLSN)
160 return 1;
161 else if (aproc->waitLSN > bproc->waitLSN)
162 return -1;
163 else
164 return 0;
165}
166
167/*
168 * Update minimum waited LSN for the specified LSN type.
 *
 * Refreshes the cached minWaitedLSN[lsnType] from waitersHeap[lsnType].
 * This file calls it after every insertion into or removal from that
 * heap, so the cached value can be used for the wakers' fast-path check.
 * NOTE(review): the body is not visible in this extract; presumably an
 * empty heap is represented by PG_UINT64_MAX so that the fast path always
 * skips -- confirm.
169 */
170static void
187
188/*
189 * Add current process to appropriate waiters heap based on LSN type.
 *
 * Records MyProcNumber, the awaited lsn, and lsnType in this process's
 * procInfos entry. It then inserts the entry into waitersHeap[lsnType],
 * marks it as in the heap, and refreshes the cached minimum LSN for that
 * type. The process must not already be in a heap (asserted).
190 */
191static void
193{
195 int i = (int) lsnType;
196
197 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
198
200
201 procInfo->procno = MyProcNumber;
202 procInfo->waitLSN = lsn;
203 procInfo->lsnType = lsnType;
204
205 Assert(!procInfo->inHeap);
207 procInfo->inHeap = true;
208 updateMinWaitedLSN(lsnType);
209
211}
212
213/*
214 * Remove current process from appropriate waiters heap based on LSN type.
 *
 * The entry is removed only if its inHeap flag is still set. A waker
 * (wakeupWaiters()) may already have removed it and cleared the flag; in
 * that case the heap and cached minimum are left untouched. This makes
 * repeated calls harmless. The stored lsnType must match the argument
 * (asserted).
215 */
216static void
218{
220 int i = (int) lsnType;
221
222 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
223
225
226 Assert(procInfo->lsnType == lsnType);
227
228 if (procInfo->inHeap)
229 {
231 procInfo->inHeap = false;
232 updateMinWaitedLSN(lsnType);
233 }
234
236}
237
238/*
239 * Maximum number of processes wakeupWaiters() collects in its on-stack
240 * array per pass before releasing the lock and setting their latches.
 * A single pass should suffice in most cases; larger sets of waiters are
 * handled in multiple passes.
241 */
242#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
243
244/*
245 * Remove waiters whose LSN has been reached from the heap and set their
246 * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
247 * and set latches for all waiters.
248 *
249 * This function first accumulates waiters to wake up into an array, then
250 * wakes them up without holding the WaitLSNLock. The array size is static and
251 * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
252 * to wake up all the waiters at once in the vast majority of cases. However,
253 * if there are more waiters, this function will loop to process them in
254 * multiple chunks.
255 */
256static void
258{
260 int numWakeUpProcs;
261 int i = (int) lsnType;
262
263 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
264
265 do
266 {
267 int j;
268
269 numWakeUpProcs = 0;
271
272 /*
273 * Iterate the waiters heap until we find an LSN not yet reached. Record
274 * process numbers to wake up, but send wakeups after releasing lock.
275 */
277 {
280
281 /* Get procInfo using appropriate heap node */
283
285 break;
286
290
291 /* Mark the entry as removed so the waiter's deleteLSNWaiter() skips it */
292 procInfo->inHeap = false;
293
295 break;
296 }
297
298 updateMinWaitedLSN(lsnType);
300
301 /*
302 * Set latches for processes whose waited LSNs have been reached.
303 * Since SetLatch() is a time-consuming operation, we do this outside
304 * of WaitLSNLock. This is safe because procLatch is never freed, so
305 * at worst we may set a latch for the wrong process or for no process
306 * at all, which is harmless.
307 */
308 for (j = 0; j < numWakeUpProcs; j++)
310
312}
313
314/*
315 * Wake up processes waiting for LSN to reach currentLSN.
 *
 * Passing InvalidXLogRecPtr as currentLSN wakes every waiter of lsnType.
316 */
317void
319{
320 int i = (int) lsnType;
321
322 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
323
324 /*
325 * Fast-path check: return without touching the heap when no waiter of
 * this type can have been satisfied yet (the cached minimum awaited LSN
 * is still ahead of currentLSN). The check is bypassed when currentLSN
 * is InvalidXLogRecPtr, which means
326 * "wake all waiters" (e.g., during promotion when recovery ends).
327 */
330 return;
331
332 wakeupWaiters(lsnType, currentLSN);
333}
334
335/*
336 * Clean up LSN waiters for exiting process.
 *
 * Removes the exiting process from whichever waiters heap it may still be
 * in. Does nothing when waitLSNState is NULL, i.e. the shared state was
 * never set up in this process.
337 */
338void
340{
341 if (waitLSNState)
342 {
343 /*
344 * We do a fast-path check of the inHeap flag without the lock. This
345 * flag is set to true only by the process itself. So, it's only
346 * possible to get a false positive. But that will be eliminated by a
347 * recheck inside deleteLSNWaiter().
348 */
351 }
352}
353
354/*
355 * Check if the given LSN type requires recovery to be in progress.
356 * Standby wait types (WAIT_LSN_TYPE_STANDBY_REPLAY, _WRITE and _FLUSH)
357 * require recovery; the primary wait type (WAIT_LSN_TYPE_PRIMARY_FLUSH)
 * does not, since waiting for local WAL flushes is meaningful outside
 * recovery.
358 */
359static inline bool
366
367/*
368 * Wait using MyLatch until the given LSN is reached, the replica gets
369 * promoted, the timeout expires (only when timeout > 0), or the
 * postmaster dies.
370 *
371 * Returns WAIT_LSN_RESULT_SUCCESS if the target LSN was reached.
372 * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if, for a wait type that
373 * requires recovery (see WaitLSNTypeRequiresRecovery()), recovery is not
 * or is no longer in progress before the target LSN was reached.
 * Returns WAIT_LSN_RESULT_TIMEOUT if timeout > 0 and it expired before
 * the target LSN was reached.
 * On postmaster death this bails out via ereport() instead of returning.
374 */
377{
381
382 /* Shouldn't be called when shmem isn't initialized */
384
385 /* Should have a valid proc number */
387
388 if (timeout > 0)
389 {
392 }
393
394 /*
395 * Add our process to the waiters heap. It might happen that target LSN
396 * gets reached before we do. The check at the beginning of the loop
397 * below prevents the race condition.
398 */
399 addLSNWaiter(targetLSN, lsnType);
400
401 for (;;)
402 {
403 int rc;
404 long delay_ms = -1;
405
406 /* Get current LSN for the wait type */
408
409 /* Check that recovery is still in progress */
411 {
412 /*
413 * Recovery has ended, but check whether the target LSN was
414 * already reached.
415 */
416 deleteLSNWaiter(lsnType);
417
421 }
422 else
423 {
424 /* Check if the waited LSN has been reached */
425 if (targetLSN <= currentLSN)
426 break;
427 }
428
429 if (timeout > 0)
430 {
432 if (delay_ms <= 0)
433 break;
434 }
435
437
438 rc = WaitLatch(MyLatch, wake_events, delay_ms,
439 WaitLSNWaitEvents[lsnType]);
440
441 /*
442 * Emergency bailout if postmaster has died. This is to avoid the
443 * necessity for manual cleanup of all postmaster children.
444 */
445 if (rc & WL_POSTMASTER_DEATH)
448 errmsg("terminating connection due to unexpected postmaster exit"),
449 errcontext("while waiting for LSN"));
450
451 if (rc & WL_LATCH_SET)
453 }
454
455 /*
456 * Delete our process from the shared memory heap. We might already
457 * have been deleted by the process that woke us up; the 'inHeap' flag
458 * prevents a double deletion.
459 */
460 deleteLSNWaiter(lsnType);
461
462 /*
463 * If the target LSN was not reached, the loop can only have exited
 * because the timeout expired.
464 */
465 if (targetLSN > currentLSN)
467
469}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1748
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1636
#define Assert(condition)
Definition c.h:945
int64_t int64
Definition c.h:615
#define pg_unreachable()
Definition c.h:361
uint32_t uint32
Definition c.h:618
#define lengthof(array)
Definition c.h:875
#define PG_UINT64_MAX
Definition c.h:679
#define StaticAssertDecl(condition, errmessage)
Definition c.h:1010
size_t Size
Definition c.h:691
int64 TimestampTz
Definition timestamp.h:39
Datum arg
Definition elog.c:1322
int errcode(int sqlerrcode)
Definition elog.c:874
#define errcontext
Definition elog.h:198
#define FATAL
Definition elog.h:41
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
ProcNumber MyProcNumber
Definition globals.c:90
int MaxBackends
Definition globals.c:146
struct Latch * MyLatch
Definition globals.c:63
int b
Definition isn.c:74
int a
Definition isn.c:73
int j
Definition isn.c:78
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
@ LW_EXCLUSIVE
Definition lwlock.h:112
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
static char * errmsg
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition pairingheap.c:60
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
pairingheap_node * pairingheap_first(pairingheap *heap)
#define pairingheap_is_empty(h)
Definition pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition pairingheap.h:43
#define pairingheap_const_container(type, membername, ptr)
Definition pairingheap.h:51
static int fb(int x)
#define NUM_AUXILIARY_PROCS
Definition proc.h:524
#define GetPGProcByNumber(n)
Definition proc.h:501
int ProcNumber
Definition procnumber.h:24
Size add_size(Size s1, Size s2)
Definition shmem.c:485
Size mul_size(Size s1, Size s2)
Definition shmem.c:500
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381
WaitLSNType lsnType
Definition xlogwait.h:60
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition xlogwait.h:97
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:85
pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:91
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
#define WL_TIMEOUT
#define WL_LATCH_SET
#define WL_POSTMASTER_DEATH
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetWalRcvWriteRecPtr(void)
bool RecoveryInProgress(void)
Definition xlog.c:6444
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6609
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21
bool PromoteIsTriggered(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void WaitLSNShmemInit(void)
Definition xlogwait.c:125
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition xlogwait.c:154
void WaitLSNCleanup(void)
Definition xlogwait.c:339
#define WAKEUP_PROC_STATIC_ARRAY_SIZE
Definition xlogwait.c:242
static bool WaitLSNTypeRequiresRecovery(WaitLSNType t)
Definition xlogwait.c:360
struct WaitLSNState * waitLSNState
Definition xlogwait.c:69
static void updateMinWaitedLSN(WaitLSNType lsnType)
Definition xlogwait.c:171
static void deleteLSNWaiter(WaitLSNType lsnType)
Definition xlogwait.c:217
XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType)
Definition xlogwait.c:89
WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
Definition xlogwait.c:376
static void wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:257
static void addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
Definition xlogwait.c:192
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:318
Size WaitLSNShmemSize(void)
Definition xlogwait.c:114
static const uint32 WaitLSNWaitEvents[]
Definition xlogwait.c:75
#define WAIT_LSN_TYPE_COUNT
Definition xlogwait.h:47
WaitLSNResult
Definition xlogwait.h:26
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition xlogwait.h:27
WaitLSNType
Definition xlogwait.h:37
@ WAIT_LSN_TYPE_PRIMARY_FLUSH
Definition xlogwait.h:44
@ WAIT_LSN_TYPE_STANDBY_REPLAY
Definition xlogwait.h:39
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40