PostgreSQL Source Code git master
Loading...
Searching...
No Matches
xlogwait.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * xlogwait.c
4 * Implements waiting for WAL operations to reach specific LSNs.
5 *
6 * Copyright (c) 2025-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/access/transam/xlogwait.c
10 *
11 * NOTES
12 * This file implements waiting for WAL operations to reach specific LSNs
13 * on both physical standby and primary servers. The core idea is simple:
14 * every process that wants to wait publishes the LSN it needs in
15 * shared memory, and the appropriate process (startup on standby,
16 * walreceiver on standby, or WAL writer/backend on primary) wakes it
17 * once that LSN has been reached.
18 *
19 * The shared memory used by this module consists of a per-backend
20 * procInfos array recording the LSN awaited by each backend process.
21 * The elements of that array are organized into pairing heaps
22 * (waitersHeap), one for each WaitLSNType, which allows the smallest
23 * awaited LSN of each type to be found very quickly.
24 *
25 * In addition, the smallest awaited LSN for each type is cached in the
26 * minWaitedLSN array. A waiting process publishes information about
27 * itself in shared memory and waits on its latch until it is woken up
28 * by the appropriate process, its timeout expires, the standby is promoted,
29 * or the postmaster dies. It then removes itself from the waiters heap.
30 *
31 * On standby servers:
32 * - After replaying a WAL record, the startup process performs a fast-path
33 * check: if minWaitedLSN[REPLAY] > replayLSN, no waiter can be satisfied
34 * yet. Otherwise, it scans waitersHeap[REPLAY] and wakes up the backends
35 * whose awaited LSNs have been reached.
36 * - After receiving WAL, the walreceiver process performs similar checks
37 * against the flush and write LSNs, waking up waiters in the FLUSH
38 * and WRITE heaps, respectively.
39 *
40 * On primary servers: After flushing WAL, the WAL writer or backend
41 * process performs a similar check against the flush LSN and wakes up
42 * waiters whose target flush LSNs have been reached.
43 *
44 *-------------------------------------------------------------------------
45 */
46
47#include "postgres.h"
48
49#include <float.h>
50
51#include "access/xlog.h"
52#include "access/xlogrecovery.h"
53#include "access/xlogwait.h"
54#include "miscadmin.h"
55#include "pgstat.h"
57#include "storage/latch.h"
58#include "storage/proc.h"
59#include "storage/shmem.h"
60#include "storage/subsystems.h"
61#include "utils/fmgrprotos.h"
62#include "utils/pg_lsn.h"
63#include "utils/snapmgr.h"
64#include "utils/wait_event.h"
65
66
67static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
68 void *arg);
69
71
72static void WaitLSNShmemRequest(void *arg);
73static void WaitLSNShmemInit(void *arg);
74
79
80/*
81 * Wait event for each WaitLSNType, used with WaitLatch() to report
82 * the wait in pg_stat_activity.
83 */
90
92 "WaitLSNWaitEvents must match WaitLSNType enum");
93
94/*
95 * Get the current LSN for the specified wait type.
96 */
99{
100 Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
101
102 switch (lsnType)
103 {
106
108 return GetWalRcvWriteRecPtr();
109
112
114 return GetFlushRecPtr(NULL);
115 }
116
117 elog(ERROR, "invalid LSN wait type: %d", lsnType);
119}
120
121/* Register the shared memory space needed for WaitLSNState. */
122static void
124{
125 Size size;
126
129 ShmemRequestStruct(.name = "WaitLSNState",
130 .size = size,
131 .ptr = (void **) &waitLSNState,
132 );
133}
134
135/* Initialize the WaitLSNState in the shared memory. */
136static void
138{
139 /* Initialize heaps and tracking */
140 for (int i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
141 {
144 }
145
146 /* Initialize process info array */
149}
150
151/*
152 * Comparison function for LSN waiters heaps. Waiting processes are ordered by
153 * LSN, so that the waiter with smallest LSN is at the top.
154 */
155static int
157{
160
161 if (aproc->waitLSN < bproc->waitLSN)
162 return 1;
163 else if (aproc->waitLSN > bproc->waitLSN)
164 return -1;
165 else
166 return 0;
167}
168
169/*
170 * Update minimum waited LSN for the specified LSN type
171 */
172static void
189
190/*
191 * Add current process to appropriate waiters heap based on LSN type
192 */
193static void
195{
197 int i = (int) lsnType;
198
199 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
200
202
203 procInfo->procno = MyProcNumber;
204 procInfo->waitLSN = lsn;
205 procInfo->lsnType = lsnType;
206
207 Assert(!procInfo->inHeap);
209 procInfo->inHeap = true;
210 updateMinWaitedLSN(lsnType);
211
213}
214
215/*
216 * Remove current process from appropriate waiters heap based on LSN type
217 */
218static void
220{
222 int i = (int) lsnType;
223
224 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
225
227
228 Assert(procInfo->lsnType == lsnType);
229
230 if (procInfo->inHeap)
231 {
233 procInfo->inHeap = false;
234 updateMinWaitedLSN(lsnType);
235 }
236
238}
239
240/*
241 * Size of the on-stack array of procs to wake up in wakeupWaiters(). It
242 * should be large enough to handle most cases in a single iteration.
243 */
244#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
245
246/*
247 * Remove waiters whose LSN has been reached from the heap and set their
248 * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
249 * and set latches for all waiters.
250 *
251 * This function first accumulates waiters to wake up into an array, then
252 * wakes them up without holding the WaitLSNLock. The array size is static and
253 * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
254 * to wake up all the waiters at once in the vast majority of cases. However,
255 * if there are more waiters, this function will loop to process them in
256 * multiple chunks.
257 */
258static void
260{
262 int numWakeUpProcs;
263 int i = (int) lsnType;
264
265 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
266
267 do
268 {
269 int j;
270
271 numWakeUpProcs = 0;
273
274 /*
275 * Iterate the waiters heap until we find LSN not yet reached. Record
276 * process numbers to wake up, but send wakeups after releasing lock.
277 */
279 {
282
283 /* Get procInfo using appropriate heap node */
285
287 break;
288
292
293 /* Update appropriate flag */
294 procInfo->inHeap = false;
295
297 break;
298 }
299
300 updateMinWaitedLSN(lsnType);
302
303 /*
304 * Set latches for processes whose waited LSNs have been reached.
305 * Since SetLatch() is a time-consuming operation, we do this outside
306 * of WaitLSNLock. This is safe because procLatch is never freed, so
307 * at worst we may set a latch for the wrong process or for no process
308 * at all, which is harmless.
309 */
310 for (j = 0; j < numWakeUpProcs; j++)
312
314}
315
316/*
317 * Wake up processes waiting for LSN to reach currentLSN
318 */
319void
321{
322 int i = (int) lsnType;
323
324 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
325
326 /*
327 * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
328 * "wake all waiters" (e.g., during promotion when recovery ends).
329 */
332 return;
333
334 wakeupWaiters(lsnType, currentLSN);
335}
336
337/*
338 * Clean up LSN waiters for exiting process
339 */
340void
342{
343 if (waitLSNState)
344 {
345 /*
346 * We do a fast-path check of the inHeap flag without the lock. This
347 * flag is set to true only by the process itself. So, it's only
348 * possible to get a false positive. But that will be eliminated by a
349 * recheck inside deleteLSNWaiter().
350 */
353 }
354}
355
356/*
357 * Check if the given LSN type requires recovery to be in progress.
358 * Standby wait types (replay, write, flush) require recovery;
359 * primary wait types (flush) do not.
360 */
361static inline bool
368
369/*
370 * Wait using MyLatch until the given LSN is reached, the timeout expires,
371 * the replica gets promoted, or the postmaster dies.
372 *
373 * Returns WAIT_LSN_RESULT_SUCCESS if the target LSN was reached.
374 * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if a recovery-only wait type is
375 * used outside recovery, or the replica got promoted before reaching it.
376 * Returns WAIT_LSN_RESULT_TIMEOUT if the timeout expired first.
379{
383
384 /* Shouldn't be called when shmem isn't initialized */
386
387 /* Should have a valid proc number */
389
390 if (timeout > 0)
391 {
394 }
395
396 /*
397 * Add our process to the waiters heap. It might happen that target LSN
398 * gets reached before we do. The check at the beginning of the loop
399 * below prevents the race condition.
400 */
401 addLSNWaiter(targetLSN, lsnType);
402
403 for (;;)
404 {
405 int rc;
406 long delay_ms = -1;
407
408 /* Get current LSN for the wait type */
410
411 /* Check that recovery is still in-progress */
413 {
414 /*
415 * Recovery was ended, but check if target LSN was already
416 * reached.
417 */
418 deleteLSNWaiter(lsnType);
419
423 }
424 else
425 {
426 /* Check if the waited LSN has been reached */
427 if (targetLSN <= currentLSN)
428 break;
429 }
430
431 if (timeout > 0)
432 {
434 if (delay_ms <= 0)
435 break;
436 }
437
439
440 rc = WaitLatch(MyLatch, wake_events, delay_ms,
441 WaitLSNWaitEvents[lsnType]);
442
443 /*
444 * Emergency bailout if postmaster has died. This is to avoid the
445 * necessity for manual cleanup of all postmaster children.
446 */
447 if (rc & WL_POSTMASTER_DEATH)
450 errmsg("terminating connection due to unexpected postmaster exit"),
451 errcontext("while waiting for LSN"));
452
453 if (rc & WL_LATCH_SET)
455 }
456
457 /*
458 * Delete our process from the shared memory heap. We might already have
459 * been removed by the process that woke us up. The 'inHeap' flag
460 * prevents double deletion.
461 */
462 deleteLSNWaiter(lsnType);
463
464 /*
465 * If we didn't reach the target LSN, we must have exited due to timeout.
466 */
467 if (targetLSN > currentLSN)
469
471}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1751
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1639
#define Assert(condition)
Definition c.h:943
int64_t int64
Definition c.h:621
#define pg_unreachable()
Definition c.h:367
uint32_t uint32
Definition c.h:624
#define lengthof(array)
Definition c.h:873
#define PG_UINT64_MAX
Definition c.h:677
#define StaticAssertDecl(condition, errmessage)
Definition c.h:1008
size_t Size
Definition c.h:689
int64 TimestampTz
Definition timestamp.h:39
Datum arg
Definition elog.c:1322
int errcode(int sqlerrcode)
Definition elog.c:874
#define errcontext
Definition elog.h:199
#define FATAL
Definition elog.h:41
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:227
#define ereport(elevel,...)
Definition elog.h:151
ProcNumber MyProcNumber
Definition globals.c:92
int MaxBackends
Definition globals.c:149
struct Latch * MyLatch
Definition globals.c:65
int b
Definition isn.c:74
int a
Definition isn.c:73
int j
Definition isn.c:78
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static char * errmsg
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition pairingheap.c:60
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
pairingheap_node * pairingheap_first(pairingheap *heap)
#define pairingheap_is_empty(h)
Definition pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition pairingheap.h:43
#define pairingheap_const_container(type, membername, ptr)
Definition pairingheap.h:51
static int fb(int x)
#define NUM_AUXILIARY_PROCS
Definition proc.h:527
#define GetPGProcByNumber(n)
Definition proc.h:504
int ProcNumber
Definition procnumber.h:24
Size add_size(Size s1, Size s2)
Definition shmem.c:1048
Size mul_size(Size s1, Size s2)
Definition shmem.c:1063
#define ShmemRequestStruct(...)
Definition shmem.h:176
ShmemRequestCallback request_fn
Definition shmem.h:133
WaitLSNType lsnType
Definition xlogwait.h:60
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition xlogwait.h:97
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:85
pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:91
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
const char * name
#define WL_TIMEOUT
#define WL_LATCH_SET
#define WL_POSTMASTER_DEATH
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetWalRcvWriteRecPtr(void)
bool RecoveryInProgress(void)
Definition xlog.c:6830
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6995
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21
bool PromoteIsTriggered(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition xlogwait.c:156
void WaitLSNCleanup(void)
Definition xlogwait.c:341
#define WAKEUP_PROC_STATIC_ARRAY_SIZE
Definition xlogwait.c:244
const ShmemCallbacks WaitLSNShmemCallbacks
Definition xlogwait.c:75
static bool WaitLSNTypeRequiresRecovery(WaitLSNType t)
Definition xlogwait.c:362
struct WaitLSNState * waitLSNState
Definition xlogwait.c:70
static void updateMinWaitedLSN(WaitLSNType lsnType)
Definition xlogwait.c:173
static void deleteLSNWaiter(WaitLSNType lsnType)
Definition xlogwait.c:219
XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType)
Definition xlogwait.c:98
WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
Definition xlogwait.c:378
static void WaitLSNShmemRequest(void *arg)
Definition xlogwait.c:123
static void WaitLSNShmemInit(void *arg)
Definition xlogwait.c:137
static void wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:259
static void addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
Definition xlogwait.c:194
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:320
static const uint32 WaitLSNWaitEvents[]
Definition xlogwait.c:84
#define WAIT_LSN_TYPE_COUNT
Definition xlogwait.h:47
WaitLSNResult
Definition xlogwait.h:26
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition xlogwait.h:27
WaitLSNType
Definition xlogwait.h:37
@ WAIT_LSN_TYPE_PRIMARY_FLUSH
Definition xlogwait.h:44
@ WAIT_LSN_TYPE_STANDBY_REPLAY
Definition xlogwait.h:39
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40