PostgreSQL Source Code git master
Loading...
Searching...
No Matches
xlogwait.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * xlogwait.c
4 * Implements waiting for WAL operations to reach specific LSNs.
5 *
6 * Copyright (c) 2025-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/access/transam/xlogwait.c
10 *
11 * NOTES
12 * This file implements waiting for WAL operations to reach specific LSNs
13 * on both physical standby and primary servers. The core idea is simple:
14 * every process that wants to wait publishes the LSN it needs to the
15 * shared memory, and the appropriate process (startup on standby,
16 * walreceiver on standby, or WAL writer/backend on primary) wakes it
17 * once that LSN has been reached.
18 *
19 * The shared memory used by this module comprises a procInfos
20 * per-backend array with the information of the awaited LSN for each
21 * of the backend processes. The elements of that array are organized
22 * into pairing heaps (waitersHeap), one for each WaitLSNType, which
23 * allows for very fast finding of the least awaited LSN for each type.
24 *
25 * In addition, the least-awaited LSN for each type is cached in the
26 * minWaitedLSN array. The waiter process publishes information about
27 * itself to the shared memory and waits on the latch until it is woken
28 * up by the appropriate process, standby is promoted, or the postmaster
29 * dies. Then, it removes its information from the shared memory.
30 *
31 * On standby servers:
32 * - After replaying a WAL record, the startup process performs a fast
 33 * path check: if minWaitedLSN[REPLAY] > replayLSN, no waiter can be
 34 * woken yet. Otherwise, it checks waitersHeap[REPLAY] and wakes up the
 35 * backends whose awaited LSNs have been reached.
36 * - After receiving WAL, the walreceiver process performs similar checks
37 * against the flush and write LSNs, waking up waiters in the FLUSH
38 * and WRITE heaps, respectively.
39 *
40 * On primary servers: After flushing WAL, the WAL writer or backend
41 * process performs a similar check against the flush LSN and wakes up
42 * waiters whose target flush LSNs have been reached.
43 *
44 *-------------------------------------------------------------------------
45 */
46
47#include "postgres.h"
48
49#include <float.h>
50
51#include "access/xlog.h"
52#include "access/xlogrecovery.h"
53#include "access/xlogwait.h"
54#include "miscadmin.h"
55#include "pgstat.h"
57#include "storage/latch.h"
58#include "storage/proc.h"
59#include "storage/shmem.h"
60#include "utils/fmgrprotos.h"
61#include "utils/pg_lsn.h"
62#include "utils/snapmgr.h"
63
64
65static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
66 void *arg);
67
69
70/*
71 * Wait event for each WaitLSNType, used with WaitLatch() to report
72 * the wait in pg_stat_activity.
73 */
80
82 "WaitLSNWaitEvents must match WaitLSNType enum");
83
84/*
85 * Get the current LSN for the specified wait type.
86 */
89{
90 Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
91
92 switch (lsnType)
93 {
96
98 return GetWalRcvWriteRecPtr();
99
102
104 return GetFlushRecPtr(NULL);
105 }
106
107 elog(ERROR, "invalid LSN wait type: %d", lsnType);
109}
110
111/* Report the amount of shared memory space needed for WaitLSNState. */
112Size
114{
115 Size size;
116
119 return size;
120}
121
122/* Initialize the WaitLSNState in the shared memory. */
123void
125{
126 bool found;
127
128 waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
130 &found);
131 if (!found)
132 {
133 int i;
134
135 /* Initialize heaps and tracking */
136 for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
137 {
140 }
141
142 /* Initialize process info array */
145 }
146}
147
148/*
149 * Comparison function for LSN waiters heaps. Waiting processes are ordered by
150 * LSN, so that the waiter with smallest LSN is at the top.
151 */
152static int
154{
157
158 if (aproc->waitLSN < bproc->waitLSN)
159 return 1;
160 else if (aproc->waitLSN > bproc->waitLSN)
161 return -1;
162 else
163 return 0;
164}
165
166/*
167 * Update minimum waited LSN for the specified LSN type
168 */
169static void
186
187/*
188 * Add current process to appropriate waiters heap based on LSN type
189 */
190static void
192{
194 int i = (int) lsnType;
195
196 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
197
199
200 procInfo->procno = MyProcNumber;
201 procInfo->waitLSN = lsn;
202 procInfo->lsnType = lsnType;
203
204 Assert(!procInfo->inHeap);
206 procInfo->inHeap = true;
207 updateMinWaitedLSN(lsnType);
208
210}
211
212/*
213 * Remove current process from appropriate waiters heap based on LSN type
214 */
215static void
217{
219 int i = (int) lsnType;
220
221 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
222
224
225 Assert(procInfo->lsnType == lsnType);
226
227 if (procInfo->inHeap)
228 {
230 procInfo->inHeap = false;
231 updateMinWaitedLSN(lsnType);
232 }
233
235}
236
237/*
238 * Size of the on-stack array of procs to wake up, used by wakeupWaiters().
 239 * It should be enough to handle all waiters in a single iteration in most cases.
240 */
241#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
242
243/*
244 * Remove waiters whose LSN has been reached from the heap and set their
245 * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
246 * and set latches for all waiters.
247 *
248 * This function first accumulates waiters to wake up into an array, then
249 * wakes them up without holding a WaitLSNLock. The array size is static and
250 * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
251 * to wake up all the waiters at once in the vast majority of cases. However,
252 * if there are more waiters, this function will loop to process them in
253 * multiple chunks.
254 */
255static void
257{
259 int numWakeUpProcs;
260 int i = (int) lsnType;
261
262 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
263
264 do
265 {
266 int j;
267
268 numWakeUpProcs = 0;
270
271 /*
272 * Iterate the waiters heap until we find LSN not yet reached. Record
273 * process numbers to wake up, but send wakeups after releasing lock.
274 */
276 {
279
280 /* Get procInfo using appropriate heap node */
282
284 break;
285
289
290 /* Update appropriate flag */
291 procInfo->inHeap = false;
292
294 break;
295 }
296
297 updateMinWaitedLSN(lsnType);
299
300 /*
301 * Set latches for processes whose waited LSNs have been reached.
302 * Since SetLatch() is a time-consuming operation, we do this outside
303 * of WaitLSNLock. This is safe because procLatch is never freed, so
304 * at worst we may set a latch for the wrong process or for no process
305 * at all, which is harmless.
306 */
307 for (j = 0; j < numWakeUpProcs; j++)
309
311}
312
313/*
314 * Wake up processes waiting for LSN to reach currentLSN
315 */
316void
318{
319 int i = (int) lsnType;
320
321 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
322
323 /*
324 * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
325 * "wake all waiters" (e.g., during promotion when recovery ends).
326 */
329 return;
330
331 wakeupWaiters(lsnType, currentLSN);
332}
333
334/*
335 * Clean up LSN waiters for exiting process
336 */
337void
339{
340 if (waitLSNState)
341 {
342 /*
343 * We do a fast-path check of the inHeap flag without the lock. This
344 * flag is set to true only by the process itself. So, it's only
345 * possible to get a false positive. But that will be eliminated by a
346 * recheck inside deleteLSNWaiter().
347 */
350 }
351}
352
353/*
354 * Check if the given LSN type requires recovery to be in progress.
355 * Standby wait types (replay, write, flush) require recovery;
356 * primary wait types (flush) do not.
357 */
358static inline bool
365
366/*
367 * Wait using MyLatch until the given LSN is reached, the timeout expires,
 368 * the replica gets promoted, or the postmaster dies.
 369 *
 370 * Returns WAIT_LSN_RESULT_SUCCESS if the target LSN was reached;
 371 * WAIT_LSN_RESULT_NOT_IN_RECOVERY if not in recovery (standby wait types)
 372 * or the replica got promoted first; WAIT_LSN_RESULT_TIMEOUT on timeout.
373 */
376{
380
381 /* Shouldn't be called when shmem isn't initialized */
383
384 /* Should have a valid proc number */
386
387 if (timeout > 0)
388 {
391 }
392
393 /*
394 * Add our process to the waiters heap. It might happen that target LSN
395 * gets reached before we do. The check at the beginning of the loop
396 * below prevents the race condition.
397 */
398 addLSNWaiter(targetLSN, lsnType);
399
400 for (;;)
401 {
402 int rc;
403 long delay_ms = -1;
404
405 /* Get current LSN for the wait type */
407
408 /* Check that recovery is still in-progress */
410 {
411 /*
412 * Recovery was ended, but check if target LSN was already
413 * reached.
414 */
415 deleteLSNWaiter(lsnType);
416
420 }
421 else
422 {
423 /* Check if the waited LSN has been reached */
424 if (targetLSN <= currentLSN)
425 break;
426 }
427
428 if (timeout > 0)
429 {
431 if (delay_ms <= 0)
432 break;
433 }
434
436
437 rc = WaitLatch(MyLatch, wake_events, delay_ms,
438 WaitLSNWaitEvents[lsnType]);
439
440 /*
441 * Emergency bailout if postmaster has died. This is to avoid the
442 * necessity for manual cleanup of all postmaster children.
443 */
444 if (rc & WL_POSTMASTER_DEATH)
447 errmsg("terminating connection due to unexpected postmaster exit"),
448 errcontext("while waiting for LSN"));
449
450 if (rc & WL_LATCH_SET)
452 }
453
454 /*
455 * Delete our process from the shared memory heap. We might already be
456 * deleted by the startup process. The 'inHeap' flags prevents us from
457 * the double deletion.
458 */
459 deleteLSNWaiter(lsnType);
460
461 /*
462 * If we didn't reach the target LSN, we must be exited by timeout.
463 */
464 if (targetLSN > currentLSN)
466
468}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1757
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
#define Assert(condition)
Definition c.h:883
int64_t int64
Definition c.h:553
#define pg_unreachable()
Definition c.h:351
uint32_t uint32
Definition c.h:556
#define lengthof(array)
Definition c.h:813
#define PG_UINT64_MAX
Definition c.h:617
#define StaticAssertDecl(condition, errmessage)
Definition c.h:952
size_t Size
Definition c.h:629
int64 TimestampTz
Definition timestamp.h:39
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define errcontext
Definition elog.h:198
#define FATAL
Definition elog.h:41
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
ProcNumber MyProcNumber
Definition globals.c:90
int MaxBackends
Definition globals.c:146
struct Latch * MyLatch
Definition globals.c:63
int b
Definition isn.c:74
int a
Definition isn.c:73
int j
Definition isn.c:78
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_EXCLUSIVE
Definition lwlock.h:112
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition pairingheap.c:60
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
pairingheap_node * pairingheap_first(pairingheap *heap)
#define pairingheap_is_empty(h)
Definition pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition pairingheap.h:43
#define pairingheap_const_container(type, membername, ptr)
Definition pairingheap.h:51
void * arg
static int fb(int x)
#define NUM_AUXILIARY_PROCS
Definition proc.h:469
#define GetPGProcByNumber(n)
Definition proc.h:446
int ProcNumber
Definition procnumber.h:24
Size add_size(Size s1, Size s2)
Definition shmem.c:495
Size mul_size(Size s1, Size s2)
Definition shmem.c:510
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:389
WaitLSNType lsnType
Definition xlogwait.h:60
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition xlogwait.h:97
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:85
pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:91
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
#define WL_TIMEOUT
#define WL_LATCH_SET
#define WL_POSTMASTER_DEATH
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetWalRcvWriteRecPtr(void)
bool RecoveryInProgress(void)
Definition xlog.c:6461
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6626
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21
bool PromoteIsTriggered(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void WaitLSNShmemInit(void)
Definition xlogwait.c:124
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition xlogwait.c:153
void WaitLSNCleanup(void)
Definition xlogwait.c:338
#define WAKEUP_PROC_STATIC_ARRAY_SIZE
Definition xlogwait.c:241
static bool WaitLSNTypeRequiresRecovery(WaitLSNType t)
Definition xlogwait.c:359
struct WaitLSNState * waitLSNState
Definition xlogwait.c:68
static void updateMinWaitedLSN(WaitLSNType lsnType)
Definition xlogwait.c:170
static void deleteLSNWaiter(WaitLSNType lsnType)
Definition xlogwait.c:216
XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType)
Definition xlogwait.c:88
WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
Definition xlogwait.c:375
static void wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:256
static void addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
Definition xlogwait.c:191
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:317
Size WaitLSNShmemSize(void)
Definition xlogwait.c:113
static const uint32 WaitLSNWaitEvents[]
Definition xlogwait.c:74
#define WAIT_LSN_TYPE_COUNT
Definition xlogwait.h:47
WaitLSNResult
Definition xlogwait.h:26
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition xlogwait.h:27
WaitLSNType
Definition xlogwait.h:37
@ WAIT_LSN_TYPE_PRIMARY_FLUSH
Definition xlogwait.h:44
@ WAIT_LSN_TYPE_STANDBY_REPLAY
Definition xlogwait.h:39
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40