PostgreSQL Source Code git master
Loading...
Searching...
No Matches
xlogwait.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * xlogwait.c
4 * Implements waiting for WAL operations to reach specific LSNs.
5 *
6 * Copyright (c) 2025-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/access/transam/xlogwait.c
10 *
11 * NOTES
12 * This file implements waiting for WAL operations to reach specific LSNs
13 * on both physical standby and primary servers. The core idea is simple:
14 * every process that wants to wait publishes the LSN it needs to the
15 * shared memory, and the appropriate process (startup on standby,
16 * walreceiver on standby, or WAL writer/backend on primary) wakes it
17 * once that LSN has been reached.
18 *
19 * The shared memory used by this module comprises a procInfos
20 * per-backend array with the information of the awaited LSN for each
21 * of the backend processes. The elements of that array are organized
22 * into pairing heaps (waitersHeap), one for each WaitLSNType, which
23 * allows for very fast finding of the least awaited LSN for each type.
24 *
25 * In addition, the least-awaited LSN for each type is cached in the
26 * minWaitedLSN array. The waiter process publishes information about
27 * itself to the shared memory and waits on the latch until it is woken
28 * up by the appropriate process, standby is promoted, or the postmaster
29 * dies. Then, it cleans information about itself in the shared memory.
30 *
31 * On standby servers:
32 * - After replaying a WAL record, the startup process performs a fast
33 * path check minWaitedLSN[REPLAY] > replayLSN. If this check is
34 * negative, it checks waitersHeap[REPLAY] and wakes up the backends
35 * whose awaited LSNs are reached.
36 * - After receiving WAL, the walreceiver process performs similar checks
37 * against the flush and write LSNs, waking up waiters in the FLUSH
38 * and WRITE heaps, respectively.
39 *
40 * On primary servers: After flushing WAL, the WAL writer or backend
41 * process performs a similar check against the flush LSN and wakes up
42 * waiters whose target flush LSNs have been reached.
43 *
44 *-------------------------------------------------------------------------
45 */
46
47#include "postgres.h"
48
49#include <float.h>
50
51#include "access/xlog.h"
52#include "access/xlogrecovery.h"
53#include "access/xlogwait.h"
54#include "miscadmin.h"
55#include "pgstat.h"
57#include "storage/latch.h"
58#include "storage/proc.h"
59#include "storage/shmem.h"
60#include "storage/subsystems.h"
61#include "utils/fmgrprotos.h"
62#include "utils/pg_lsn.h"
63#include "utils/snapmgr.h"
64#include "utils/wait_event.h"
65
66
67static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
68 void *arg);
69
71
72static void WaitLSNShmemRequest(void *arg);
73static void WaitLSNShmemInit(void *arg);
74
79
80/*
81 * Wait event for each WaitLSNType, used with WaitLatch() to report
82 * the wait in pg_stat_activity.
83 */
90
92 "WaitLSNWaitEvents must match WaitLSNType enum");
93
94/*
95 * Get the current LSN for the specified wait type. Provide memory
96 * barrier semantics before getting the value.
97 */
100{
101 Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
102
103 /*
104 * All of the cases below provide memory barrier semantics:
105 * GetWalRcvWriteRecPtr() and GetFlushRecPtr() have explicit barriers,
106 * while GetXLogReplayRecPtr() and GetWalRcvFlushRecPtr() use spinlocks.
107 */
108 switch (lsnType)
109 {
112
114 {
117
118 /*
119 * Use the replay position as a floor. WAL up to the replay
120 * point is already on disk from a base backup, archive
121 * restore, or prior streaming, so there is no reason to wait
122 * for the walreceiver to re-receive it.
123 */
124 return Max(recptr, replay);
125 }
126
128 {
131
132 /* Same floor as standby_write; see comment above. */
133 return Max(recptr, replay);
134 }
135
137 return GetFlushRecPtr(NULL);
138 }
139
140 elog(ERROR, "invalid LSN wait type: %d", lsnType);
142}
143
144/* Register the shared memory space needed for WaitLSNState. */
145static void
147{
148 Size size;
149
152 ShmemRequestStruct(.name = "WaitLSNState",
153 .size = size,
154 .ptr = (void **) &waitLSNState,
155 );
156}
157
158/* Initialize the WaitLSNState in the shared memory. */
159static void
161{
162 /* Initialize heaps and tracking */
163 for (int i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
164 {
167 }
168
169 /* Initialize process info array */
172}
173
174/*
175 * Comparison function for LSN waiters heaps. Waiting processes are ordered by
176 * LSN, so that the waiter with smallest LSN is at the top.
177 */
178static int
180{
183
184 if (aproc->waitLSN < bproc->waitLSN)
185 return 1;
186 else if (aproc->waitLSN > bproc->waitLSN)
187 return -1;
188 else
189 return 0;
190}
191
192/*
193 * Update minimum waited LSN for the specified LSN type
194 */
195static void
197{
199 int i = (int) lsnType;
200
201 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
202
204 {
207
208 minWaitedLSN = procInfo->waitLSN;
209 }
210 /* Pairs with pg_atomic_read_membarrier_u64() in WaitLSNWakeup(). */
212}
213
214/*
215 * Add current process to appropriate waiters heap based on LSN type
216 */
217static void
219{
221 int i = (int) lsnType;
222
223 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
224
226
227 procInfo->procno = MyProcNumber;
228 procInfo->waitLSN = lsn;
229 procInfo->lsnType = lsnType;
230
231 Assert(!procInfo->inHeap);
233 procInfo->inHeap = true;
234 updateMinWaitedLSN(lsnType);
235
237}
238
239/*
240 * Remove current process from appropriate waiters heap based on LSN type
241 */
242static void
244{
246 int i = (int) lsnType;
247
248 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
249
251
252 Assert(procInfo->lsnType == lsnType);
253
254 if (procInfo->inHeap)
255 {
257 procInfo->inHeap = false;
258 updateMinWaitedLSN(lsnType);
259 }
260
262}
263
264/*
265 * Size of a static array of procs to wake up by WaitLSNWakeup() allocated
266 * on the stack. It should be enough to take a single iteration in most cases.
267 */
268#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
269
270/*
271 * Remove waiters whose LSN has been reached from the heap and set their
272 * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
273 * and set latches for all waiters.
274 *
275 * This function first accumulates waiters to wake up into an array, then
276 * wakes them up without holding WaitLSNLock. The array size is static and
277 * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
278 * to wake up all the waiters at once in the vast majority of cases. However,
279 * if there are more waiters, this function will loop to process them in
280 * multiple chunks.
281 */
282static void
284{
286 int numWakeUpProcs;
287 int i = (int) lsnType;
288
289 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
290
291 do
292 {
293 int j;
294
295 numWakeUpProcs = 0;
297
298 /*
299 * Iterate the waiters heap until we find LSN not yet reached. Record
300 * process numbers to wake up, but send wakeups after releasing lock.
301 */
303 {
306
307 /* Get procInfo using appropriate heap node */
309
311 break;
312
316
317 /* Update appropriate flag */
318 procInfo->inHeap = false;
319
321 break;
322 }
323
324 updateMinWaitedLSN(lsnType);
326
327 /*
328 * Set latches for processes whose waited LSNs have been reached.
329 * Since SetLatch() is a time-consuming operation, we do this outside
330 * of WaitLSNLock. This is safe because procLatch is never freed, so
331 * at worst we may set a latch for the wrong process or for no process
332 * at all, which is harmless.
333 */
334 for (j = 0; j < numWakeUpProcs; j++)
336
338}
339
340/*
341 * Wake up processes waiting for LSN to reach currentLSN
342 */
343void
345{
346 int i = (int) lsnType;
347
348 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
349
350 /*
351 * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
352 * "wake all waiters" (e.g., during promotion when recovery ends). Pairs
353 * with pg_atomic_write_membarrier_u64() in updateMinWaitedLSN().
354 */
357 return;
358
359 wakeupWaiters(lsnType, currentLSN);
360}
361
362/*
363 * Clean up any LSN wait state for the current process.
364 */
365void
367{
368 if (waitLSNState)
369 {
370 /*
371 * We do a fast-path check of the inHeap flag without the lock. This
372 * flag is set to true only by the process itself. So, it's only
373 * possible to get a false positive. But that will be eliminated by a
374 * recheck inside deleteLSNWaiter().
375 */
378 }
379}
380
381/*
382 * Check if the given LSN type requires recovery to be in progress.
383 * Standby wait types (replay, write, flush) require recovery;
384 * primary wait types (flush) do not.
385 */
386static inline bool
393
394/*
395 * Wait using MyLatch till the given LSN is reached, the replica gets
396 * promoted, or the postmaster dies.
397 *
398 * Returns WAIT_LSN_RESULT_SUCCESS if the target LSN was reached.
399 * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if not run in recovery, or if
400 * the replica got promoted before the target LSN was reached.
401 */
404{
408
409 /* Shouldn't be called when shmem isn't initialized */
411
412 /* Should have a valid proc number */
414
415 if (timeout > 0)
416 {
419 }
420
421 /*
422 * Add our process to the waiters heap. It might happen that the target
423 * LSN gets reached before we are added. The check at the beginning of the
424 * loop below prevents the race condition.
425 */
426 addLSNWaiter(targetLSN, lsnType);
427
428 for (;;)
429 {
430 int rc;
431 long delay_ms = -1;
432
433 /* Get current LSN for the wait type */
435
436 /* Check that recovery is still in-progress */
438 {
439 /*
440 * Recovery was ended, but check if target LSN was already
441 * reached.
442 */
443 deleteLSNWaiter(lsnType);
444
448 }
449 else
450 {
451 /* Check if the waited LSN has been reached */
452 if (targetLSN <= currentLSN)
453 break;
454 }
455
456 if (timeout > 0)
457 {
459 if (delay_ms <= 0)
460 break;
461 }
462
464
465 rc = WaitLatch(MyLatch, wake_events, delay_ms,
466 WaitLSNWaitEvents[lsnType]);
467
468 /*
469 * Emergency bailout if postmaster has died. This is to avoid the
470 * necessity for manual cleanup of all postmaster children.
471 */
472 if (rc & WL_POSTMASTER_DEATH)
475 errmsg("terminating connection due to unexpected postmaster exit"),
476 errcontext("while waiting for LSN"));
477
479 }
480
481 /*
482 * Delete our process from the shared memory heap. We might already have
483 * been deleted by the startup process. The 'inHeap' flag prevents a
484 * double deletion.
485 */
486 deleteLSNWaiter(lsnType);
487
488 /*
489 * If we didn't reach the target LSN, we must have exited by timeout.
490 */
491 if (targetLSN > currentLSN)
493
495}
static uint64 pg_atomic_read_membarrier_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:476
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
static void pg_atomic_write_membarrier_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:504
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1765
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
#define Max(x, y)
Definition c.h:1085
#define Assert(condition)
Definition c.h:943
int64_t int64
Definition c.h:621
#define pg_unreachable()
Definition c.h:367
uint32_t uint32
Definition c.h:624
#define lengthof(array)
Definition c.h:873
#define PG_UINT64_MAX
Definition c.h:677
#define StaticAssertDecl(condition, errmessage)
Definition c.h:1008
size_t Size
Definition c.h:689
int64 TimestampTz
Definition timestamp.h:39
Datum arg
Definition elog.c:1323
int errcode(int sqlerrcode)
Definition elog.c:875
#define errcontext
Definition elog.h:200
#define FATAL
Definition elog.h:42
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
ProcNumber MyProcNumber
Definition globals.c:92
int MaxBackends
Definition globals.c:149
struct Latch * MyLatch
Definition globals.c:65
int b
Definition isn.c:74
int a
Definition isn.c:73
int j
Definition isn.c:78
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
Size add_size(Size s1, Size s2)
Definition mcxt.c:1733
Size mul_size(Size s1, Size s2)
Definition mcxt.c:1752
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static char * errmsg
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition pairingheap.c:60
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
pairingheap_node * pairingheap_first(pairingheap *heap)
#define pairingheap_is_empty(h)
Definition pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition pairingheap.h:43
#define pairingheap_const_container(type, membername, ptr)
Definition pairingheap.h:51
static int fb(int x)
#define NUM_AUXILIARY_PROCS
Definition proc.h:527
#define GetPGProcByNumber(n)
Definition proc.h:504
int ProcNumber
Definition procnumber.h:24
#define ShmemRequestStruct(...)
Definition shmem.h:176
ShmemRequestCallback request_fn
Definition shmem.h:133
WaitLSNType lsnType
Definition xlogwait.h:60
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition xlogwait.h:97
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:85
pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:91
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
const char * name
#define WL_TIMEOUT
#define WL_LATCH_SET
#define WL_POSTMASTER_DEATH
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetWalRcvWriteRecPtr(void)
bool RecoveryInProgress(void)
Definition xlog.c:6832
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6997
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21
bool PromoteIsTriggered(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition xlogwait.c:179
void WaitLSNCleanup(void)
Definition xlogwait.c:366
#define WAKEUP_PROC_STATIC_ARRAY_SIZE
Definition xlogwait.c:268
const ShmemCallbacks WaitLSNShmemCallbacks
Definition xlogwait.c:75
static bool WaitLSNTypeRequiresRecovery(WaitLSNType t)
Definition xlogwait.c:387
struct WaitLSNState * waitLSNState
Definition xlogwait.c:70
static void updateMinWaitedLSN(WaitLSNType lsnType)
Definition xlogwait.c:196
static void deleteLSNWaiter(WaitLSNType lsnType)
Definition xlogwait.c:243
XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType)
Definition xlogwait.c:99
WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
Definition xlogwait.c:403
static void WaitLSNShmemRequest(void *arg)
Definition xlogwait.c:146
static void WaitLSNShmemInit(void *arg)
Definition xlogwait.c:160
static void wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:283
static void addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
Definition xlogwait.c:218
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:344
static const uint32 WaitLSNWaitEvents[]
Definition xlogwait.c:84
#define WAIT_LSN_TYPE_COUNT
Definition xlogwait.h:47
WaitLSNResult
Definition xlogwait.h:26
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition xlogwait.h:27
WaitLSNType
Definition xlogwait.h:37
@ WAIT_LSN_TYPE_PRIMARY_FLUSH
Definition xlogwait.h:44
@ WAIT_LSN_TYPE_STANDBY_REPLAY
Definition xlogwait.h:39
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40