PostgreSQL Source Code git master
Loading...
Searching...
No Matches
xlogwait.c File Reference
#include "postgres.h"
#include <float.h>
#include "access/xlog.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "utils/fmgrprotos.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
#include "utils/wait_event.h"
Include dependency graph for xlogwait.c:

Go to the source code of this file.

Macros

#define WAKEUP_PROC_STATIC_ARRAY_SIZE   (16)
 

Functions

static int waitlsn_cmp (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
 StaticAssertDecl (lengthof(WaitLSNWaitEvents)==WAIT_LSN_TYPE_COUNT, "WaitLSNWaitEvents must match WaitLSNType enum")
 
XLogRecPtr GetCurrentLSNForWaitType (WaitLSNType lsnType)
 
Size WaitLSNShmemSize (void)
 
void WaitLSNShmemInit (void)
 
static void updateMinWaitedLSN (WaitLSNType lsnType)
 
static void addLSNWaiter (XLogRecPtr lsn, WaitLSNType lsnType)
 
static void deleteLSNWaiter (WaitLSNType lsnType)
 
static void wakeupWaiters (WaitLSNType lsnType, XLogRecPtr currentLSN)
 
void WaitLSNWakeup (WaitLSNType lsnType, XLogRecPtr currentLSN)
 
void WaitLSNCleanup (void)
 
static bool WaitLSNTypeRequiresRecovery (WaitLSNType t)
 
WaitLSNResult WaitForLSN (WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 

Variables

struct WaitLSNStatewaitLSNState = NULL
 
static const uint32 WaitLSNWaitEvents []
 

Macro Definition Documentation

◆ WAKEUP_PROC_STATIC_ARRAY_SIZE

#define WAKEUP_PROC_STATIC_ARRAY_SIZE   (16)

Definition at line 242 of file xlogwait.c.

Function Documentation

◆ addLSNWaiter()

static void addLSNWaiter ( XLogRecPtr  lsn,
WaitLSNType  lsnType 
)
static

Definition at line 192 of file xlogwait.c.

193{
195 int i = (int) lsnType;
196
197 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
198
200
201 procInfo->procno = MyProcNumber;
202 procInfo->waitLSN = lsn;
203 procInfo->lsnType = lsnType;
204
205 Assert(!procInfo->inHeap);
207 procInfo->inHeap = true;
208 updateMinWaitedLSN(lsnType);
209
211}
#define Assert(condition)
Definition c.h:945
ProcNumber MyProcNumber
Definition globals.c:90
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
@ LW_EXCLUSIVE
Definition lwlock.h:112
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
static int fb(int x)
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition xlogwait.h:97
pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:91
struct WaitLSNState * waitLSNState
Definition xlogwait.c:69
static void updateMinWaitedLSN(WaitLSNType lsnType)
Definition xlogwait.c:171
#define WAIT_LSN_TYPE_COUNT
Definition xlogwait.h:47

References Assert, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, pairingheap_add(), WaitLSNState::procInfos, updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by WaitForLSN().

◆ deleteLSNWaiter()

static void deleteLSNWaiter ( WaitLSNType  lsnType)
static

Definition at line 217 of file xlogwait.c.

218{
220 int i = (int) lsnType;
221
222 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
223
225
226 Assert(procInfo->lsnType == lsnType);
227
228 if (procInfo->inHeap)
229 {
231 procInfo->inHeap = false;
232 updateMinWaitedLSN(lsnType);
233 }
234
236}
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)

References Assert, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, pairingheap_remove(), WaitLSNState::procInfos, updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by WaitForLSN(), and WaitLSNCleanup().

◆ GetCurrentLSNForWaitType()

XLogRecPtr GetCurrentLSNForWaitType ( WaitLSNType  lsnType)

Definition at line 89 of file xlogwait.c.

90{
91 Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
92
93 switch (lsnType)
94 {
97
99 return GetWalRcvWriteRecPtr();
100
103
105 return GetFlushRecPtr(NULL);
106 }
107
108 elog(ERROR, "invalid LSN wait type: %d", lsnType);
110}
#define pg_unreachable()
Definition c.h:361
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetWalRcvWriteRecPtr(void)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6609
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
@ WAIT_LSN_TYPE_PRIMARY_FLUSH
Definition xlogwait.h:44
@ WAIT_LSN_TYPE_STANDBY_REPLAY
Definition xlogwait.h:39
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40

References Assert, elog, ERROR, fb(), GetFlushRecPtr(), GetWalRcvFlushRecPtr(), GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), pg_unreachable, WAIT_LSN_TYPE_COUNT, WAIT_LSN_TYPE_PRIMARY_FLUSH, WAIT_LSN_TYPE_STANDBY_FLUSH, WAIT_LSN_TYPE_STANDBY_REPLAY, and WAIT_LSN_TYPE_STANDBY_WRITE.

Referenced by ExecWaitStmt(), and WaitForLSN().

◆ StaticAssertDecl()

StaticAssertDecl ( lengthof(WaitLSNWaitEvents = =WAIT_LSN_TYPE_COUNT,
"WaitLSNWaitEvents must match WaitLSNType enum"   
)

◆ updateMinWaitedLSN()

static void updateMinWaitedLSN ( WaitLSNType  lsnType)
static

Definition at line 171 of file xlogwait.c.

172{
173 XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
174 int i = (int) lsnType;
175
176 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
177
179 {
182
183 minWaitedLSN = procInfo->waitLSN;
184 }
186}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
#define PG_UINT64_MAX
Definition c.h:679
pairingheap_node * pairingheap_first(pairingheap *heap)
#define pairingheap_is_empty(h)
Definition pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition pairingheap.h:43
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:85
uint64 XLogRecPtr
Definition xlogdefs.h:21

References Assert, fb(), i, WaitLSNState::minWaitedLSN, pairingheap_container, pairingheap_first(), pairingheap_is_empty, pg_atomic_write_u64(), PG_UINT64_MAX, WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by addLSNWaiter(), deleteLSNWaiter(), and wakeupWaiters().

◆ WaitForLSN()

WaitLSNResult WaitForLSN ( WaitLSNType  lsnType,
XLogRecPtr  targetLSN,
int64  timeout 
)

Definition at line 376 of file xlogwait.c.

377{
381
382 /* Shouldn't be called when shmem isn't initialized */
384
385 /* Should have a valid proc number */
387
388 if (timeout > 0)
389 {
392 }
393
394 /*
395 * Add our process to the waiters heap. It might happen that target LSN
396 * gets reached before we do. The check at the beginning of the loop
397 * below prevents the race condition.
398 */
399 addLSNWaiter(targetLSN, lsnType);
400
401 for (;;)
402 {
403 int rc;
404 long delay_ms = -1;
405
406 /* Get current LSN for the wait type */
408
409 /* Check that recovery is still in-progress */
411 {
412 /*
413 * Recovery was ended, but check if target LSN was already
414 * reached.
415 */
416 deleteLSNWaiter(lsnType);
417
421 }
422 else
423 {
424 /* Check if the waited LSN has been reached */
425 if (targetLSN <= currentLSN)
426 break;
427 }
428
429 if (timeout > 0)
430 {
432 if (delay_ms <= 0)
433 break;
434 }
435
437
438 rc = WaitLatch(MyLatch, wake_events, delay_ms,
439 WaitLSNWaitEvents[lsnType]);
440
441 /*
442 * Emergency bailout if postmaster has died. This is to avoid the
443 * necessity for manual cleanup of all postmaster children.
444 */
445 if (rc & WL_POSTMASTER_DEATH)
448 errmsg("terminating connection due to unexpected postmaster exit"),
449 errcontext("while waiting for LSN"));
450
451 if (rc & WL_LATCH_SET)
453 }
454
455 /*
456 * Delete our process from the shared memory heap. We might already be
457 * deleted by the startup process. The 'inHeap' flags prevents us from
458 * the double deletion.
459 */
460 deleteLSNWaiter(lsnType);
461
462 /*
463 * If we didn't reach the target LSN, we must be exited by timeout.
464 */
465 if (targetLSN > currentLSN)
467
469}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1748
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1636
int64 TimestampTz
Definition timestamp.h:39
int errcode(int sqlerrcode)
Definition elog.c:874
#define errcontext
Definition elog.h:198
#define FATAL
Definition elog.h:41
#define ereport(elevel,...)
Definition elog.h:150
int MaxBackends
Definition globals.c:146
struct Latch * MyLatch
Definition globals.c:63
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
static char * errmsg
#define NUM_AUXILIARY_PROCS
Definition proc.h:524
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
#define WL_TIMEOUT
#define WL_LATCH_SET
#define WL_POSTMASTER_DEATH
bool RecoveryInProgress(void)
Definition xlog.c:6444
bool PromoteIsTriggered(void)
static bool WaitLSNTypeRequiresRecovery(WaitLSNType t)
Definition xlogwait.c:360
static void deleteLSNWaiter(WaitLSNType lsnType)
Definition xlogwait.c:217
XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType)
Definition xlogwait.c:89
static void addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
Definition xlogwait.c:192
static const uint32 WaitLSNWaitEvents[]
Definition xlogwait.c:75
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition xlogwait.h:27

References addLSNWaiter(), Assert, CHECK_FOR_INTERRUPTS, deleteLSNWaiter(), ereport, errcode(), errcontext, errmsg, FATAL, fb(), GetCurrentLSNForWaitType(), GetCurrentTimestamp(), MaxBackends, MyLatch, MyProcNumber, NUM_AUXILIARY_PROCS, PromoteIsTriggered(), RecoveryInProgress(), ResetLatch(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, WAIT_LSN_RESULT_NOT_IN_RECOVERY, WAIT_LSN_RESULT_SUCCESS, WAIT_LSN_RESULT_TIMEOUT, WaitLatch(), waitLSNState, WaitLSNTypeRequiresRecovery(), WaitLSNWaitEvents, WL_LATCH_SET, WL_POSTMASTER_DEATH, and WL_TIMEOUT.

Referenced by ExecWaitStmt().

◆ waitlsn_cmp()

static int waitlsn_cmp ( const pairingheap_node a,
const pairingheap_node b,
void arg 
)
static

Definition at line 154 of file xlogwait.c.

155{
158
159 if (aproc->waitLSN < bproc->waitLSN)
160 return 1;
161 else if (aproc->waitLSN > bproc->waitLSN)
162 return -1;
163 else
164 return 0;
165}
int b
Definition isn.c:74
int a
Definition isn.c:73
#define pairingheap_const_container(type, membername, ptr)
Definition pairingheap.h:51

References a, b, fb(), and pairingheap_const_container.

Referenced by WaitLSNShmemInit().

◆ WaitLSNCleanup()

void WaitLSNCleanup ( void  )

Definition at line 339 of file xlogwait.c.

340{
341 if (waitLSNState)
342 {
343 /*
344 * We do a fast-path check of the inHeap flag without the lock. This
345 * flag is set to true only by the process itself. So, it's only
346 * possible to get a false positive. But that will be eliminated by a
347 * recheck inside deleteLSNWaiter().
348 */
351 }
352}
WaitLSNType lsnType
Definition xlogwait.h:60

References deleteLSNWaiter(), WaitLSNProcInfo::inHeap, WaitLSNProcInfo::lsnType, MyProcNumber, WaitLSNState::procInfos, and waitLSNState.

Referenced by AbortTransaction(), and ProcKill().

◆ WaitLSNShmemInit()

void WaitLSNShmemInit ( void  )

Definition at line 125 of file xlogwait.c.

126{
127 bool found;
128
129 waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
131 &found);
132 if (!found)
133 {
134 int i;
135
136 /* Initialize heaps and tracking */
137 for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
138 {
141 }
142
143 /* Initialize process info array */
146 }
147}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition pairingheap.c:60
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition xlogwait.c:154
Size WaitLSNShmemSize(void)
Definition xlogwait.c:114

References fb(), i, MaxBackends, WaitLSNState::minWaitedLSN, NUM_AUXILIARY_PROCS, pairingheap_initialize(), pg_atomic_init_u64(), PG_UINT64_MAX, WaitLSNState::procInfos, ShmemInitStruct(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, waitlsn_cmp(), WaitLSNShmemSize(), and waitLSNState.

Referenced by CreateOrAttachShmemStructs().

◆ WaitLSNShmemSize()

Size WaitLSNShmemSize ( void  )

Definition at line 114 of file xlogwait.c.

115{
116 Size size;
117
118 size = offsetof(WaitLSNState, procInfos);
120 return size;
121}
size_t Size
Definition c.h:691
Size add_size(Size s1, Size s2)
Definition shmem.c:485
Size mul_size(Size s1, Size s2)
Definition shmem.c:500

References add_size(), fb(), MaxBackends, mul_size(), NUM_AUXILIARY_PROCS, and WaitLSNState::procInfos.

Referenced by CalculateShmemSize(), and WaitLSNShmemInit().

◆ WaitLSNTypeRequiresRecovery()

static bool WaitLSNTypeRequiresRecovery ( WaitLSNType  t)
inlinestatic

◆ WaitLSNWakeup()

void WaitLSNWakeup ( WaitLSNType  lsnType,
XLogRecPtr  currentLSN 
)

Definition at line 318 of file xlogwait.c.

319{
320 int i = (int) lsnType;
321
322 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
323
324 /*
325 * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
326 * "wake all waiters" (e.g., during promotion when recovery ends).
327 */
330 return;
331
332 wakeupWaiters(lsnType, currentLSN);
333}
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
static void wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:257

References Assert, fb(), i, WaitLSNState::minWaitedLSN, pg_atomic_read_u64(), WAIT_LSN_TYPE_COUNT, waitLSNState, wakeupWaiters(), and XLogRecPtrIsValid.

Referenced by PerformWalRecovery(), StartupXLOG(), XLogBackgroundFlush(), XLogFlush(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ wakeupWaiters()

static void wakeupWaiters ( WaitLSNType  lsnType,
XLogRecPtr  currentLSN 
)
static

Definition at line 257 of file xlogwait.c.

258{
260 int numWakeUpProcs;
261 int i = (int) lsnType;
262
263 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
264
265 do
266 {
267 int j;
268
269 numWakeUpProcs = 0;
271
272 /*
273 * Iterate the waiters heap until we find LSN not yet reached. Record
274 * process numbers to wake up, but send wakeups after releasing lock.
275 */
277 {
280
281 /* Get procInfo using appropriate heap node */
283
285 break;
286
290
291 /* Update appropriate flag */
292 procInfo->inHeap = false;
293
295 break;
296 }
297
298 updateMinWaitedLSN(lsnType);
300
301 /*
302 * Set latches for processes whose waited LSNs have been reached.
303 * Since SetLatch() is a time-consuming operation, we do this outside
304 * of WaitLSNLock. This is safe because procLatch is never freed, so
305 * at worst we may set a latch for the wrong process or for no process
306 * at all, which is harmless.
307 */
308 for (j = 0; j < numWakeUpProcs; j++)
310
312}
int j
Definition isn.c:78
void SetLatch(Latch *latch)
Definition latch.c:290
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
#define GetPGProcByNumber(n)
Definition proc.h:501
int ProcNumber
Definition procnumber.h:24
#define WAKEUP_PROC_STATIC_ARRAY_SIZE
Definition xlogwait.c:242

References Assert, fb(), GetPGProcByNumber, i, j, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pairingheap_container, pairingheap_first(), pairingheap_is_empty, pairingheap_remove_first(), SetLatch(), updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, waitLSNState, WAKEUP_PROC_STATIC_ARRAY_SIZE, and XLogRecPtrIsValid.

Referenced by WaitLSNWakeup().

Variable Documentation

◆ waitLSNState

◆ WaitLSNWaitEvents