PostgreSQL Source Code git master
Loading...
Searching...
No Matches
xlogwait.c File Reference
#include "postgres.h"
#include <float.h>
#include "access/xlog.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/subsystems.h"
#include "utils/fmgrprotos.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
#include "utils/wait_event.h"
Include dependency graph for xlogwait.c:

Go to the source code of this file.

Macros

#define WAKEUP_PROC_STATIC_ARRAY_SIZE   (16)
 

Functions

static int waitlsn_cmp (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
static void WaitLSNShmemRequest (void *arg)
 
static void WaitLSNShmemInit (void *arg)
 
 StaticAssertDecl (lengthof(WaitLSNWaitEvents)==WAIT_LSN_TYPE_COUNT, "WaitLSNWaitEvents must match WaitLSNType enum")
 
XLogRecPtr GetCurrentLSNForWaitType (WaitLSNType lsnType)
 
static void updateMinWaitedLSN (WaitLSNType lsnType)
 
static void addLSNWaiter (XLogRecPtr lsn, WaitLSNType lsnType)
 
static void deleteLSNWaiter (WaitLSNType lsnType)
 
static void wakeupWaiters (WaitLSNType lsnType, XLogRecPtr currentLSN)
 
void WaitLSNWakeup (WaitLSNType lsnType, XLogRecPtr currentLSN)
 
void WaitLSNCleanup (void)
 
static bool WaitLSNTypeRequiresRecovery (WaitLSNType t)
 
WaitLSNResult WaitForLSN (WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 

Variables

struct WaitLSNState * waitLSNState = NULL
 
const ShmemCallbacks WaitLSNShmemCallbacks
 
static const uint32 WaitLSNWaitEvents []
 

Macro Definition Documentation

◆ WAKEUP_PROC_STATIC_ARRAY_SIZE

#define WAKEUP_PROC_STATIC_ARRAY_SIZE   (16)

Definition at line 244 of file xlogwait.c.

Function Documentation

◆ addLSNWaiter()

static void addLSNWaiter ( XLogRecPtr  lsn,
WaitLSNType  lsnType 
)
static

Definition at line 194 of file xlogwait.c.

195{
197 int i = (int) lsnType;
198
199 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
200
202
203 procInfo->procno = MyProcNumber;
204 procInfo->waitLSN = lsn;
205 procInfo->lsnType = lsnType;
206
207 Assert(!procInfo->inHeap);
209 procInfo->inHeap = true;
210 updateMinWaitedLSN(lsnType);
211
213}
#define Assert(condition)
Definition c.h:943
ProcNumber MyProcNumber
Definition globals.c:92
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
static int fb(int x)
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition xlogwait.h:97
pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:91
struct WaitLSNState * waitLSNState
Definition xlogwait.c:70
static void updateMinWaitedLSN(WaitLSNType lsnType)
Definition xlogwait.c:173
#define WAIT_LSN_TYPE_COUNT
Definition xlogwait.h:47

References Assert, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, pairingheap_add(), WaitLSNState::procInfos, updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by WaitForLSN().

◆ deleteLSNWaiter()

static void deleteLSNWaiter ( WaitLSNType  lsnType)
static

Definition at line 219 of file xlogwait.c.

220{
222 int i = (int) lsnType;
223
224 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
225
227
228 Assert(procInfo->lsnType == lsnType);
229
230 if (procInfo->inHeap)
231 {
233 procInfo->inHeap = false;
234 updateMinWaitedLSN(lsnType);
235 }
236
238}
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)

References Assert, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, pairingheap_remove(), WaitLSNState::procInfos, updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by WaitForLSN(), and WaitLSNCleanup().

◆ GetCurrentLSNForWaitType()

XLogRecPtr GetCurrentLSNForWaitType ( WaitLSNType  lsnType)

Definition at line 98 of file xlogwait.c.

99{
100 Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
101
102 switch (lsnType)
103 {
106
108 return GetWalRcvWriteRecPtr();
109
112
114 return GetFlushRecPtr(NULL);
115 }
116
117 elog(ERROR, "invalid LSN wait type: %d", lsnType);
119}
#define pg_unreachable()
Definition c.h:367
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetWalRcvWriteRecPtr(void)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6995
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
@ WAIT_LSN_TYPE_PRIMARY_FLUSH
Definition xlogwait.h:44
@ WAIT_LSN_TYPE_STANDBY_REPLAY
Definition xlogwait.h:39
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40

References Assert, elog, ERROR, fb(), GetFlushRecPtr(), GetWalRcvFlushRecPtr(), GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), pg_unreachable, WAIT_LSN_TYPE_COUNT, WAIT_LSN_TYPE_PRIMARY_FLUSH, WAIT_LSN_TYPE_STANDBY_FLUSH, WAIT_LSN_TYPE_STANDBY_REPLAY, and WAIT_LSN_TYPE_STANDBY_WRITE.

Referenced by ExecWaitStmt(), and WaitForLSN().

◆ StaticAssertDecl()

StaticAssertDecl ( lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
"WaitLSNWaitEvents must match WaitLSNType enum"   
)

◆ updateMinWaitedLSN()

static void updateMinWaitedLSN ( WaitLSNType  lsnType)
static

Definition at line 173 of file xlogwait.c.

174{
175 XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
176 int i = (int) lsnType;
177
178 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
179
181 {
184
185 minWaitedLSN = procInfo->waitLSN;
186 }
188}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
#define PG_UINT64_MAX
Definition c.h:677
pairingheap_node * pairingheap_first(pairingheap *heap)
#define pairingheap_is_empty(h)
Definition pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition pairingheap.h:43
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:85
uint64 XLogRecPtr
Definition xlogdefs.h:21

References Assert, fb(), i, WaitLSNState::minWaitedLSN, pairingheap_container, pairingheap_first(), pairingheap_is_empty, pg_atomic_write_u64(), PG_UINT64_MAX, WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by addLSNWaiter(), deleteLSNWaiter(), and wakeupWaiters().

◆ WaitForLSN()

WaitLSNResult WaitForLSN ( WaitLSNType  lsnType,
XLogRecPtr  targetLSN,
int64  timeout 
)

Definition at line 378 of file xlogwait.c.

379{
383
384 /* Shouldn't be called when shmem isn't initialized */
386
387 /* Should have a valid proc number */
389
390 if (timeout > 0)
391 {
394 }
395
396 /*
397 * Add our process to the waiters heap. It might happen that target LSN
398 * gets reached before we do. The check at the beginning of the loop
399 * below prevents the race condition.
400 */
401 addLSNWaiter(targetLSN, lsnType);
402
403 for (;;)
404 {
405 int rc;
406 long delay_ms = -1;
407
408 /* Get current LSN for the wait type */
410
411 /* Check that recovery is still in-progress */
413 {
414 /*
415 * Recovery was ended, but check if target LSN was already
416 * reached.
417 */
418 deleteLSNWaiter(lsnType);
419
423 }
424 else
425 {
426 /* Check if the waited LSN has been reached */
427 if (targetLSN <= currentLSN)
428 break;
429 }
430
431 if (timeout > 0)
432 {
434 if (delay_ms <= 0)
435 break;
436 }
437
439
440 rc = WaitLatch(MyLatch, wake_events, delay_ms,
441 WaitLSNWaitEvents[lsnType]);
442
443 /*
444 * Emergency bailout if postmaster has died. This is to avoid the
445 * necessity for manual cleanup of all postmaster children.
446 */
447 if (rc & WL_POSTMASTER_DEATH)
450 errmsg("terminating connection due to unexpected postmaster exit"),
451 errcontext("while waiting for LSN"));
452
453 if (rc & WL_LATCH_SET)
455 }
456
457 /*
458 * Delete our process from the shared memory heap. We might already be
459 * deleted by the startup process. The 'inHeap' flags prevents us from
460 * the double deletion.
461 */
462 deleteLSNWaiter(lsnType);
463
464 /*
465 * If we didn't reach the target LSN, we must be exited by timeout.
466 */
467 if (targetLSN > currentLSN)
469
471}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1751
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1639
int64 TimestampTz
Definition timestamp.h:39
int errcode(int sqlerrcode)
Definition elog.c:874
#define errcontext
Definition elog.h:200
#define FATAL
Definition elog.h:42
#define ereport(elevel,...)
Definition elog.h:152
int MaxBackends
Definition globals.c:149
struct Latch * MyLatch
Definition globals.c:65
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static char * errmsg
#define NUM_AUXILIARY_PROCS
Definition proc.h:527
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
#define WL_TIMEOUT
#define WL_LATCH_SET
#define WL_POSTMASTER_DEATH
bool RecoveryInProgress(void)
Definition xlog.c:6830
bool PromoteIsTriggered(void)
static bool WaitLSNTypeRequiresRecovery(WaitLSNType t)
Definition xlogwait.c:362
static void deleteLSNWaiter(WaitLSNType lsnType)
Definition xlogwait.c:219
XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType)
Definition xlogwait.c:98
static void addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
Definition xlogwait.c:194
static const uint32 WaitLSNWaitEvents[]
Definition xlogwait.c:84
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition xlogwait.h:27

References addLSNWaiter(), Assert, CHECK_FOR_INTERRUPTS, deleteLSNWaiter(), ereport, errcode(), errcontext, errmsg, FATAL, fb(), GetCurrentLSNForWaitType(), GetCurrentTimestamp(), MaxBackends, MyLatch, MyProcNumber, NUM_AUXILIARY_PROCS, PromoteIsTriggered(), RecoveryInProgress(), ResetLatch(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, WAIT_LSN_RESULT_NOT_IN_RECOVERY, WAIT_LSN_RESULT_SUCCESS, WAIT_LSN_RESULT_TIMEOUT, WaitLatch(), waitLSNState, WaitLSNTypeRequiresRecovery(), WaitLSNWaitEvents, WL_LATCH_SET, WL_POSTMASTER_DEATH, and WL_TIMEOUT.

Referenced by decode_concurrent_changes(), and ExecWaitStmt().

◆ waitlsn_cmp()

static int waitlsn_cmp ( const pairingheap_node *  a,
const pairingheap_node *  b,
void *  arg 
)
static

Definition at line 156 of file xlogwait.c.

157{
160
161 if (aproc->waitLSN < bproc->waitLSN)
162 return 1;
163 else if (aproc->waitLSN > bproc->waitLSN)
164 return -1;
165 else
166 return 0;
167}
int b
Definition isn.c:74
int a
Definition isn.c:73
#define pairingheap_const_container(type, membername, ptr)
Definition pairingheap.h:51

References a, b, fb(), and pairingheap_const_container.

Referenced by WaitLSNShmemInit().

◆ WaitLSNCleanup()

void WaitLSNCleanup ( void  )

Definition at line 341 of file xlogwait.c.

342{
343 if (waitLSNState)
344 {
345 /*
346 * We do a fast-path check of the inHeap flag without the lock. This
347 * flag is set to true only by the process itself. So, it's only
348 * possible to get a false positive. But that will be eliminated by a
349 * recheck inside deleteLSNWaiter().
350 */
353 }
354}
WaitLSNType lsnType
Definition xlogwait.h:60

References deleteLSNWaiter(), WaitLSNProcInfo::inHeap, WaitLSNProcInfo::lsnType, MyProcNumber, WaitLSNState::procInfos, and waitLSNState.

Referenced by AbortTransaction(), and ProcKill().

◆ WaitLSNShmemInit()

static void WaitLSNShmemInit ( void *  arg)
static

Definition at line 137 of file xlogwait.c.

138{
139 /* Initialize heaps and tracking */
140 for (int i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
141 {
144 }
145
146 /* Initialize process info array */
149}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition pairingheap.c:60
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition xlogwait.c:156

References fb(), i, MaxBackends, WaitLSNState::minWaitedLSN, NUM_AUXILIARY_PROCS, pairingheap_initialize(), pg_atomic_init_u64(), PG_UINT64_MAX, WaitLSNState::procInfos, WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, waitlsn_cmp(), and waitLSNState.

◆ WaitLSNShmemRequest()

static void WaitLSNShmemRequest ( void *  arg)
static

Definition at line 123 of file xlogwait.c.

124{
125 Size size;
126
127 size = offsetof(WaitLSNState, procInfos);
129 ShmemRequestStruct(.name = "WaitLSNState",
130 .size = size,
131 .ptr = (void **) &waitLSNState,
132 );
133}
size_t Size
Definition c.h:689
Size add_size(Size s1, Size s2)
Definition shmem.c:1048
Size mul_size(Size s1, Size s2)
Definition shmem.c:1063
#define ShmemRequestStruct(...)
Definition shmem.h:176
const char * name

References add_size(), fb(), MaxBackends, mul_size(), name, NUM_AUXILIARY_PROCS, WaitLSNState::procInfos, ShmemRequestStruct, and waitLSNState.

◆ WaitLSNTypeRequiresRecovery()

static bool WaitLSNTypeRequiresRecovery ( WaitLSNType  t)
inline static

◆ WaitLSNWakeup()

void WaitLSNWakeup ( WaitLSNType  lsnType,
XLogRecPtr  currentLSN 
)

Definition at line 320 of file xlogwait.c.

321{
322 int i = (int) lsnType;
323
324 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
325
326 /*
327 * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
328 * "wake all waiters" (e.g., during promotion when recovery ends).
329 */
332 return;
333
334 wakeupWaiters(lsnType, currentLSN);
335}
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
static void wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:259

References Assert, fb(), i, WaitLSNState::minWaitedLSN, pg_atomic_read_u64(), WAIT_LSN_TYPE_COUNT, waitLSNState, wakeupWaiters(), and XLogRecPtrIsValid.

Referenced by PerformWalRecovery(), StartupXLOG(), XLogBackgroundFlush(), XLogFlush(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ wakeupWaiters()

static void wakeupWaiters ( WaitLSNType  lsnType,
XLogRecPtr  currentLSN 
)
static

Definition at line 259 of file xlogwait.c.

260{
262 int numWakeUpProcs;
263 int i = (int) lsnType;
264
265 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
266
267 do
268 {
269 int j;
270
271 numWakeUpProcs = 0;
273
274 /*
275 * Iterate the waiters heap until we find LSN not yet reached. Record
276 * process numbers to wake up, but send wakeups after releasing lock.
277 */
279 {
282
283 /* Get procInfo using appropriate heap node */
285
287 break;
288
292
293 /* Update appropriate flag */
294 procInfo->inHeap = false;
295
297 break;
298 }
299
300 updateMinWaitedLSN(lsnType);
302
303 /*
304 * Set latches for processes whose waited LSNs have been reached.
305 * Since SetLatch() is a time-consuming operation, we do this outside
306 * of WaitLSNLock. This is safe because procLatch is never freed, so
307 * at worst we may set a latch for the wrong process or for no process
308 * at all, which is harmless.
309 */
310 for (j = 0; j < numWakeUpProcs; j++)
312
314}
int j
Definition isn.c:78
void SetLatch(Latch *latch)
Definition latch.c:290
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
#define GetPGProcByNumber(n)
Definition proc.h:504
int ProcNumber
Definition procnumber.h:24
#define WAKEUP_PROC_STATIC_ARRAY_SIZE
Definition xlogwait.c:244

References Assert, fb(), GetPGProcByNumber, i, j, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pairingheap_container, pairingheap_first(), pairingheap_is_empty, pairingheap_remove_first(), SetLatch(), updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, waitLSNState, WAKEUP_PROC_STATIC_ARRAY_SIZE, and XLogRecPtrIsValid.

Referenced by WaitLSNWakeup().

Variable Documentation

◆ WaitLSNShmemCallbacks

const ShmemCallbacks WaitLSNShmemCallbacks
Initial value:
= {
.request_fn = WaitLSNShmemRequest,
.init_fn = WaitLSNShmemInit,
}
static void WaitLSNShmemRequest(void *arg)
Definition xlogwait.c:123
static void WaitLSNShmemInit(void *arg)
Definition xlogwait.c:137

Definition at line 75 of file xlogwait.c.

75 {
76 .request_fn = WaitLSNShmemRequest,
77 .init_fn = WaitLSNShmemInit,
78};

◆ waitLSNState

struct WaitLSNState * waitLSNState = NULL

Definition at line 70 of file xlogwait.c.

◆ WaitLSNWaitEvents

static const uint32 WaitLSNWaitEvents[]
static

Definition at line 84 of file xlogwait.c.