PostgreSQL Source Code git master
Loading...
Searching...
No Matches
xlogwait.c File Reference
#include "postgres.h"
#include <float.h>
#include "access/xlog.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/subsystems.h"
#include "utils/fmgrprotos.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
#include "utils/wait_event.h"
Include dependency graph for xlogwait.c:

Go to the source code of this file.

Macros

#define WAKEUP_PROC_STATIC_ARRAY_SIZE   (16)
 

Functions

static int waitlsn_cmp (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
static void WaitLSNShmemRequest (void *arg)
 
static void WaitLSNShmemInit (void *arg)
 
 StaticAssertDecl (lengthof(WaitLSNWaitEvents)==WAIT_LSN_TYPE_COUNT, "WaitLSNWaitEvents must match WaitLSNType enum")
 
XLogRecPtr GetCurrentLSNForWaitType (WaitLSNType lsnType)
 
static void updateMinWaitedLSN (WaitLSNType lsnType)
 
static void addLSNWaiter (XLogRecPtr lsn, WaitLSNType lsnType)
 
static void deleteLSNWaiter (WaitLSNType lsnType)
 
static void wakeupWaiters (WaitLSNType lsnType, XLogRecPtr currentLSN)
 
void WaitLSNWakeup (WaitLSNType lsnType, XLogRecPtr currentLSN)
 
void WaitLSNCleanup (void)
 
static bool WaitLSNTypeRequiresRecovery (WaitLSNType t)
 
WaitLSNResult WaitForLSN (WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 

Variables

struct WaitLSNStatewaitLSNState = NULL
 
const ShmemCallbacks WaitLSNShmemCallbacks
 
static const uint32 WaitLSNWaitEvents []
 

Macro Definition Documentation

◆ WAKEUP_PROC_STATIC_ARRAY_SIZE

#define WAKEUP_PROC_STATIC_ARRAY_SIZE   (16)

Definition at line 268 of file xlogwait.c.

Function Documentation

◆ addLSNWaiter()

static void addLSNWaiter ( XLogRecPtr  lsn,
WaitLSNType  lsnType 
)
static

Definition at line 218 of file xlogwait.c.

219{
221 int i = (int) lsnType;
222
223 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
224
226
227 procInfo->procno = MyProcNumber;
228 procInfo->waitLSN = lsn;
229 procInfo->lsnType = lsnType;
230
231 Assert(!procInfo->inHeap);
233 procInfo->inHeap = true;
234 updateMinWaitedLSN(lsnType);
235
237}
#define Assert(condition)
Definition c.h:943
ProcNumber MyProcNumber
Definition globals.c:92
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
static int fb(int x)
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition xlogwait.h:97
pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:91
struct WaitLSNState * waitLSNState
Definition xlogwait.c:70
static void updateMinWaitedLSN(WaitLSNType lsnType)
Definition xlogwait.c:196
#define WAIT_LSN_TYPE_COUNT
Definition xlogwait.h:47

References Assert, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, pairingheap_add(), WaitLSNState::procInfos, updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by WaitForLSN().

◆ deleteLSNWaiter()

static void deleteLSNWaiter ( WaitLSNType  lsnType)
static

Definition at line 243 of file xlogwait.c.

244{
246 int i = (int) lsnType;
247
248 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
249
251
252 Assert(procInfo->lsnType == lsnType);
253
254 if (procInfo->inHeap)
255 {
257 procInfo->inHeap = false;
258 updateMinWaitedLSN(lsnType);
259 }
260
262}
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)

References Assert, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, pairingheap_remove(), WaitLSNState::procInfos, updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by WaitForLSN(), and WaitLSNCleanup().

◆ GetCurrentLSNForWaitType()

XLogRecPtr GetCurrentLSNForWaitType ( WaitLSNType  lsnType)

Definition at line 99 of file xlogwait.c.

100{
101 Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
102
103 /*
104 * All of the cases below provide memory barrier semantics:
105 * GetWalRcvWriteRecPtr() and GetFlushRecPtr() have explicit barriers,
106 * while GetXLogReplayRecPtr() and GetWalRcvFlushRecPtr() use spinlocks.
107 */
108 switch (lsnType)
109 {
112
114 {
117
118 /*
119 * Use the replay position as a floor. WAL up to the replay
120 * point is already on disk from a base backup, archive
121 * restore, or prior streaming, so there is no reason to wait
122 * for the walreceiver to re-receive it.
123 */
124 return Max(recptr, replay);
125 }
126
128 {
131
132 /* Same floor as standby_write; see comment above. */
133 return Max(recptr, replay);
134 }
135
137 return GetFlushRecPtr(NULL);
138 }
139
140 elog(ERROR, "invalid LSN wait type: %d", lsnType);
142}
#define Max(x, y)
Definition c.h:1085
#define pg_unreachable()
Definition c.h:367
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetWalRcvWriteRecPtr(void)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6997
uint64 XLogRecPtr
Definition xlogdefs.h:21
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
@ WAIT_LSN_TYPE_PRIMARY_FLUSH
Definition xlogwait.h:44
@ WAIT_LSN_TYPE_STANDBY_REPLAY
Definition xlogwait.h:39
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40

References Assert, elog, ERROR, fb(), GetFlushRecPtr(), GetWalRcvFlushRecPtr(), GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), Max, pg_unreachable, WAIT_LSN_TYPE_COUNT, WAIT_LSN_TYPE_PRIMARY_FLUSH, WAIT_LSN_TYPE_STANDBY_FLUSH, WAIT_LSN_TYPE_STANDBY_REPLAY, and WAIT_LSN_TYPE_STANDBY_WRITE.

Referenced by ExecWaitStmt(), and WaitForLSN().

◆ StaticAssertDecl()

StaticAssertDecl ( lengthof(WaitLSNWaitEvents = =WAIT_LSN_TYPE_COUNT,
"WaitLSNWaitEvents must match WaitLSNType enum"   
)

◆ updateMinWaitedLSN()

static void updateMinWaitedLSN ( WaitLSNType  lsnType)
static

Definition at line 196 of file xlogwait.c.

197{
198 XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
199 int i = (int) lsnType;
200
201 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
202
204 {
207
208 minWaitedLSN = procInfo->waitLSN;
209 }
210 /* Pairs with pg_atomic_read_membarrier_u64() in WaitLSNWakeup(). */
212}
static void pg_atomic_write_membarrier_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:504
#define PG_UINT64_MAX
Definition c.h:677
pairingheap_node * pairingheap_first(pairingheap *heap)
#define pairingheap_is_empty(h)
Definition pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition pairingheap.h:43
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:85

References Assert, fb(), i, WaitLSNState::minWaitedLSN, pairingheap_container, pairingheap_first(), pairingheap_is_empty, pg_atomic_write_membarrier_u64(), PG_UINT64_MAX, WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by addLSNWaiter(), deleteLSNWaiter(), and wakeupWaiters().

◆ WaitForLSN()

WaitLSNResult WaitForLSN ( WaitLSNType  lsnType,
XLogRecPtr  targetLSN,
int64  timeout 
)

Definition at line 403 of file xlogwait.c.

404{
408
409 /* Shouldn't be called when shmem isn't initialized */
411
412 /* Should have a valid proc number */
414
415 if (timeout > 0)
416 {
419 }
420
421 /*
422 * Add our process to the waiters heap. It might happen that target LSN
423 * gets reached before we do. The check at the beginning of the loop
424 * below prevents the race condition.
425 */
426 addLSNWaiter(targetLSN, lsnType);
427
428 for (;;)
429 {
430 int rc;
431 long delay_ms = -1;
432
433 /* Get current LSN for the wait type */
435
436 /* Check that recovery is still in-progress */
438 {
439 /*
440 * Recovery was ended, but check if target LSN was already
441 * reached.
442 */
443 deleteLSNWaiter(lsnType);
444
448 }
449 else
450 {
451 /* Check if the waited LSN has been reached */
452 if (targetLSN <= currentLSN)
453 break;
454 }
455
456 if (timeout > 0)
457 {
459 if (delay_ms <= 0)
460 break;
461 }
462
464
465 rc = WaitLatch(MyLatch, wake_events, delay_ms,
466 WaitLSNWaitEvents[lsnType]);
467
468 /*
469 * Emergency bailout if postmaster has died. This is to avoid the
470 * necessity for manual cleanup of all postmaster children.
471 */
472 if (rc & WL_POSTMASTER_DEATH)
475 errmsg("terminating connection due to unexpected postmaster exit"),
476 errcontext("while waiting for LSN"));
477
479 }
480
481 /*
482 * Delete our process from the shared memory heap. We might already be
483 * deleted by the startup process. The 'inHeap' flags prevents us from
484 * the double deletion.
485 */
486 deleteLSNWaiter(lsnType);
487
488 /*
489 * If we didn't reach the target LSN, we must be exited by timeout.
490 */
491 if (targetLSN > currentLSN)
493
495}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1765
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
int64 TimestampTz
Definition timestamp.h:39
int errcode(int sqlerrcode)
Definition elog.c:875
#define errcontext
Definition elog.h:200
#define FATAL
Definition elog.h:42
#define ereport(elevel,...)
Definition elog.h:152
int MaxBackends
Definition globals.c:149
struct Latch * MyLatch
Definition globals.c:65
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static char * errmsg
#define NUM_AUXILIARY_PROCS
Definition proc.h:527
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
#define WL_TIMEOUT
#define WL_LATCH_SET
#define WL_POSTMASTER_DEATH
bool RecoveryInProgress(void)
Definition xlog.c:6832
bool PromoteIsTriggered(void)
static bool WaitLSNTypeRequiresRecovery(WaitLSNType t)
Definition xlogwait.c:387
static void deleteLSNWaiter(WaitLSNType lsnType)
Definition xlogwait.c:243
XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType)
Definition xlogwait.c:99
static void addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
Definition xlogwait.c:218
static const uint32 WaitLSNWaitEvents[]
Definition xlogwait.c:84
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition xlogwait.h:27

References addLSNWaiter(), Assert, CHECK_FOR_INTERRUPTS, deleteLSNWaiter(), ereport, errcode(), errcontext, errmsg, FATAL, fb(), GetCurrentLSNForWaitType(), GetCurrentTimestamp(), MaxBackends, MyLatch, MyProcNumber, NUM_AUXILIARY_PROCS, PromoteIsTriggered(), RecoveryInProgress(), ResetLatch(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, WAIT_LSN_RESULT_NOT_IN_RECOVERY, WAIT_LSN_RESULT_SUCCESS, WAIT_LSN_RESULT_TIMEOUT, WaitLatch(), waitLSNState, WaitLSNTypeRequiresRecovery(), WaitLSNWaitEvents, WL_LATCH_SET, WL_POSTMASTER_DEATH, and WL_TIMEOUT.

Referenced by decode_concurrent_changes(), and ExecWaitStmt().

◆ waitlsn_cmp()

static int waitlsn_cmp ( const pairingheap_node a,
const pairingheap_node b,
void arg 
)
static

Definition at line 179 of file xlogwait.c.

180{
183
184 if (aproc->waitLSN < bproc->waitLSN)
185 return 1;
186 else if (aproc->waitLSN > bproc->waitLSN)
187 return -1;
188 else
189 return 0;
190}
int b
Definition isn.c:74
int a
Definition isn.c:73
#define pairingheap_const_container(type, membername, ptr)
Definition pairingheap.h:51

References a, b, fb(), and pairingheap_const_container.

Referenced by WaitLSNShmemInit().

◆ WaitLSNCleanup()

void WaitLSNCleanup ( void  )

Definition at line 366 of file xlogwait.c.

367{
368 if (waitLSNState)
369 {
370 /*
371 * We do a fast-path check of the inHeap flag without the lock. This
372 * flag is set to true only by the process itself. So, it's only
373 * possible to get a false positive. But that will be eliminated by a
374 * recheck inside deleteLSNWaiter().
375 */
378 }
379}
WaitLSNType lsnType
Definition xlogwait.h:60

References deleteLSNWaiter(), WaitLSNProcInfo::inHeap, WaitLSNProcInfo::lsnType, MyProcNumber, WaitLSNState::procInfos, and waitLSNState.

Referenced by AbortSubTransaction(), AbortTransaction(), and ProcKill().

◆ WaitLSNShmemInit()

static void WaitLSNShmemInit ( void arg)
static

Definition at line 160 of file xlogwait.c.

161{
162 /* Initialize heaps and tracking */
163 for (int i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
164 {
167 }
168
169 /* Initialize process info array */
172}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition pairingheap.c:60
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition xlogwait.c:179

References fb(), i, MaxBackends, WaitLSNState::minWaitedLSN, NUM_AUXILIARY_PROCS, pairingheap_initialize(), pg_atomic_init_u64(), PG_UINT64_MAX, WaitLSNState::procInfos, WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, waitlsn_cmp(), and waitLSNState.

◆ WaitLSNShmemRequest()

static void WaitLSNShmemRequest ( void arg)
static

Definition at line 146 of file xlogwait.c.

147{
148 Size size;
149
150 size = offsetof(WaitLSNState, procInfos);
152 ShmemRequestStruct(.name = "WaitLSNState",
153 .size = size,
154 .ptr = (void **) &waitLSNState,
155 );
156}
size_t Size
Definition c.h:689
Size add_size(Size s1, Size s2)
Definition mcxt.c:1733
Size mul_size(Size s1, Size s2)
Definition mcxt.c:1752
#define ShmemRequestStruct(...)
Definition shmem.h:176
const char * name

References add_size(), fb(), MaxBackends, mul_size(), name, NUM_AUXILIARY_PROCS, WaitLSNState::procInfos, ShmemRequestStruct, and waitLSNState.

◆ WaitLSNTypeRequiresRecovery()

static bool WaitLSNTypeRequiresRecovery ( WaitLSNType  t)
inlinestatic

◆ WaitLSNWakeup()

void WaitLSNWakeup ( WaitLSNType  lsnType,
XLogRecPtr  currentLSN 
)

Definition at line 344 of file xlogwait.c.

345{
346 int i = (int) lsnType;
347
348 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
349
350 /*
351 * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
352 * "wake all waiters" (e.g., during promotion when recovery ends). Pairs
353 * with pg_atomic_write_membarrier_u64() in updateMinWaitedLSN().
354 */
357 return;
358
359 wakeupWaiters(lsnType, currentLSN);
360}
static uint64 pg_atomic_read_membarrier_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:476
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
static void wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:283

References Assert, fb(), i, WaitLSNState::minWaitedLSN, pg_atomic_read_membarrier_u64(), WAIT_LSN_TYPE_COUNT, waitLSNState, wakeupWaiters(), and XLogRecPtrIsValid.

Referenced by PerformWalRecovery(), StartupXLOG(), XLogBackgroundFlush(), XLogFlush(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ wakeupWaiters()

static void wakeupWaiters ( WaitLSNType  lsnType,
XLogRecPtr  currentLSN 
)
static

Definition at line 283 of file xlogwait.c.

284{
286 int numWakeUpProcs;
287 int i = (int) lsnType;
288
289 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
290
291 do
292 {
293 int j;
294
295 numWakeUpProcs = 0;
297
298 /*
299 * Iterate the waiters heap until we find LSN not yet reached. Record
300 * process numbers to wake up, but send wakeups after releasing lock.
301 */
303 {
306
307 /* Get procInfo using appropriate heap node */
309
311 break;
312
316
317 /* Update appropriate flag */
318 procInfo->inHeap = false;
319
321 break;
322 }
323
324 updateMinWaitedLSN(lsnType);
326
327 /*
328 * Set latches for processes whose waited LSNs have been reached.
329 * Since SetLatch() is a time-consuming operation, we do this outside
330 * of WaitLSNLock. This is safe because procLatch is never freed, so
331 * at worst we may set a latch for the wrong process or for no process
332 * at all, which is harmless.
333 */
334 for (j = 0; j < numWakeUpProcs; j++)
336
338}
int j
Definition isn.c:78
void SetLatch(Latch *latch)
Definition latch.c:290
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
#define GetPGProcByNumber(n)
Definition proc.h:504
int ProcNumber
Definition procnumber.h:24
#define WAKEUP_PROC_STATIC_ARRAY_SIZE
Definition xlogwait.c:268

References Assert, fb(), GetPGProcByNumber, i, j, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pairingheap_container, pairingheap_first(), pairingheap_is_empty, pairingheap_remove_first(), SetLatch(), updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, waitLSNState, WAKEUP_PROC_STATIC_ARRAY_SIZE, and XLogRecPtrIsValid.

Referenced by WaitLSNWakeup().

Variable Documentation

◆ WaitLSNShmemCallbacks

const ShmemCallbacks WaitLSNShmemCallbacks
Initial value:
= {
.request_fn = WaitLSNShmemRequest,
.init_fn = WaitLSNShmemInit,
}
static void WaitLSNShmemRequest(void *arg)
Definition xlogwait.c:146
static void WaitLSNShmemInit(void *arg)
Definition xlogwait.c:160

Definition at line 75 of file xlogwait.c.

75 {
76 .request_fn = WaitLSNShmemRequest,
77 .init_fn = WaitLSNShmemInit,
78};

◆ waitLSNState

◆ WaitLSNWaitEvents