PostgreSQL Source Code git master
Loading...
Searching...
No Matches
xlogwait.c File Reference
#include "postgres.h"
#include <float.h>
#include "access/xlog.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "utils/fmgrprotos.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
Include dependency graph for xlogwait.c:

Go to the source code of this file.

Macros

#define WAKEUP_PROC_STATIC_ARRAY_SIZE   (16)
 

Functions

static int waitlsn_cmp (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
 StaticAssertDecl (lengthof(WaitLSNWaitEvents)==WAIT_LSN_TYPE_COUNT, "WaitLSNWaitEvents must match WaitLSNType enum")
 
XLogRecPtr GetCurrentLSNForWaitType (WaitLSNType lsnType)
 
Size WaitLSNShmemSize (void)
 
void WaitLSNShmemInit (void)
 
static void updateMinWaitedLSN (WaitLSNType lsnType)
 
static void addLSNWaiter (XLogRecPtr lsn, WaitLSNType lsnType)
 
static void deleteLSNWaiter (WaitLSNType lsnType)
 
static void wakeupWaiters (WaitLSNType lsnType, XLogRecPtr currentLSN)
 
void WaitLSNWakeup (WaitLSNType lsnType, XLogRecPtr currentLSN)
 
void WaitLSNCleanup (void)
 
static bool WaitLSNTypeRequiresRecovery (WaitLSNType t)
 
WaitLSNResult WaitForLSN (WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 

Variables

struct WaitLSNState * waitLSNState = NULL
 
static const uint32 WaitLSNWaitEvents []
 

Macro Definition Documentation

◆ WAKEUP_PROC_STATIC_ARRAY_SIZE

#define WAKEUP_PROC_STATIC_ARRAY_SIZE   (16)

Definition at line 241 of file xlogwait.c.

Function Documentation

◆ addLSNWaiter()

static void addLSNWaiter ( XLogRecPtr  lsn,
WaitLSNType  lsnType 
)
static

Definition at line 191 of file xlogwait.c.

192{
194 int i = (int) lsnType;
195
196 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
197
199
200 procInfo->procno = MyProcNumber;
201 procInfo->waitLSN = lsn;
202 procInfo->lsnType = lsnType;
203
204 Assert(!procInfo->inHeap);
206 procInfo->inHeap = true;
207 updateMinWaitedLSN(lsnType);
208
210}
#define Assert(condition)
Definition c.h:883
ProcNumber MyProcNumber
Definition globals.c:90
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_EXCLUSIVE
Definition lwlock.h:112
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
static int fb(int x)
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition xlogwait.h:97
pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:91
struct WaitLSNState * waitLSNState
Definition xlogwait.c:68
static void updateMinWaitedLSN(WaitLSNType lsnType)
Definition xlogwait.c:170
#define WAIT_LSN_TYPE_COUNT
Definition xlogwait.h:47

References Assert, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, pairingheap_add(), WaitLSNState::procInfos, updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by WaitForLSN().

◆ deleteLSNWaiter()

static void deleteLSNWaiter ( WaitLSNType  lsnType)
static

Definition at line 216 of file xlogwait.c.

217{
219 int i = (int) lsnType;
220
221 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
222
224
225 Assert(procInfo->lsnType == lsnType);
226
227 if (procInfo->inHeap)
228 {
230 procInfo->inHeap = false;
231 updateMinWaitedLSN(lsnType);
232 }
233
235}
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)

References Assert, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, pairingheap_remove(), WaitLSNState::procInfos, updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by WaitForLSN(), and WaitLSNCleanup().

◆ GetCurrentLSNForWaitType()

XLogRecPtr GetCurrentLSNForWaitType ( WaitLSNType  lsnType)

Definition at line 88 of file xlogwait.c.

89{
90 Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
91
92 switch (lsnType)
93 {
96
98 return GetWalRcvWriteRecPtr();
99
102
104 return GetFlushRecPtr(NULL);
105 }
106
107 elog(ERROR, "invalid LSN wait type: %d", lsnType);
109}
#define pg_unreachable()
Definition c.h:351
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetWalRcvWriteRecPtr(void)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6626
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
@ WAIT_LSN_TYPE_PRIMARY_FLUSH
Definition xlogwait.h:44
@ WAIT_LSN_TYPE_STANDBY_REPLAY
Definition xlogwait.h:39
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40

References Assert, elog, ERROR, fb(), GetFlushRecPtr(), GetWalRcvFlushRecPtr(), GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), pg_unreachable, WAIT_LSN_TYPE_COUNT, WAIT_LSN_TYPE_PRIMARY_FLUSH, WAIT_LSN_TYPE_STANDBY_FLUSH, WAIT_LSN_TYPE_STANDBY_REPLAY, and WAIT_LSN_TYPE_STANDBY_WRITE.

Referenced by ExecWaitStmt(), and WaitForLSN().

◆ StaticAssertDecl()

StaticAssertDecl ( lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
"WaitLSNWaitEvents must match WaitLSNType enum"   
)

◆ updateMinWaitedLSN()

static void updateMinWaitedLSN ( WaitLSNType  lsnType)
static

Definition at line 170 of file xlogwait.c.

171{
172 XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
173 int i = (int) lsnType;
174
175 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
176
178 {
181
182 minWaitedLSN = procInfo->waitLSN;
183 }
185}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
#define PG_UINT64_MAX
Definition c.h:617
pairingheap_node * pairingheap_first(pairingheap *heap)
#define pairingheap_is_empty(h)
Definition pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition pairingheap.h:43
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:85
uint64 XLogRecPtr
Definition xlogdefs.h:21

References Assert, fb(), i, WaitLSNState::minWaitedLSN, pairingheap_container, pairingheap_first(), pairingheap_is_empty, pg_atomic_write_u64(), PG_UINT64_MAX, WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by addLSNWaiter(), deleteLSNWaiter(), and wakeupWaiters().

◆ WaitForLSN()

WaitLSNResult WaitForLSN ( WaitLSNType  lsnType,
XLogRecPtr  targetLSN,
int64  timeout 
)

Definition at line 375 of file xlogwait.c.

376{
380
381 /* Shouldn't be called when shmem isn't initialized */
383
384 /* Should have a valid proc number */
386
387 if (timeout > 0)
388 {
391 }
392
393 /*
394 * Add our process to the waiters heap. It might happen that target LSN
395 * gets reached before we do. The check at the beginning of the loop
396 * below prevents the race condition.
397 */
398 addLSNWaiter(targetLSN, lsnType);
399
400 for (;;)
401 {
402 int rc;
403 long delay_ms = -1;
404
405 /* Get current LSN for the wait type */
407
408 /* Check that recovery is still in-progress */
410 {
411 /*
412 * Recovery has ended, but check whether the target LSN was
413 * already reached.
414 */
415 deleteLSNWaiter(lsnType);
416
420 }
421 else
422 {
423 /* Check if the waited LSN has been reached */
424 if (targetLSN <= currentLSN)
425 break;
426 }
427
428 if (timeout > 0)
429 {
431 if (delay_ms <= 0)
432 break;
433 }
434
436
437 rc = WaitLatch(MyLatch, wake_events, delay_ms,
438 WaitLSNWaitEvents[lsnType]);
439
440 /*
441 * Emergency bailout if postmaster has died. This is to avoid the
442 * necessity for manual cleanup of all postmaster children.
443 */
444 if (rc & WL_POSTMASTER_DEATH)
447 errmsg("terminating connection due to unexpected postmaster exit"),
448 errcontext("while waiting for LSN"));
449
450 if (rc & WL_LATCH_SET)
452 }
453
454 /*
455 * Delete our process from the shared memory heap. We might already
456 * have been deleted by the startup process. The 'inHeap' flag protects
457 * us from a double deletion.
458 */
459 deleteLSNWaiter(lsnType);
460
461 /*
462 * If we didn't reach the target LSN, we must have exited due to the timeout.
463 */
464 if (targetLSN > currentLSN)
466
468}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1757
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
int64 TimestampTz
Definition timestamp.h:39
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define errcontext
Definition elog.h:198
#define FATAL
Definition elog.h:41
#define ereport(elevel,...)
Definition elog.h:150
int MaxBackends
Definition globals.c:146
struct Latch * MyLatch
Definition globals.c:63
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define NUM_AUXILIARY_PROCS
Definition proc.h:469
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
#define WL_TIMEOUT
#define WL_LATCH_SET
#define WL_POSTMASTER_DEATH
bool RecoveryInProgress(void)
Definition xlog.c:6461
bool PromoteIsTriggered(void)
static bool WaitLSNTypeRequiresRecovery(WaitLSNType t)
Definition xlogwait.c:359
static void deleteLSNWaiter(WaitLSNType lsnType)
Definition xlogwait.c:216
XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType)
Definition xlogwait.c:88
static void addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
Definition xlogwait.c:191
static const uint32 WaitLSNWaitEvents[]
Definition xlogwait.c:74
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition xlogwait.h:27

References addLSNWaiter(), Assert, CHECK_FOR_INTERRUPTS, deleteLSNWaiter(), ereport, errcode(), errcontext, errmsg(), FATAL, fb(), GetCurrentLSNForWaitType(), GetCurrentTimestamp(), MaxBackends, MyLatch, MyProcNumber, NUM_AUXILIARY_PROCS, PromoteIsTriggered(), RecoveryInProgress(), ResetLatch(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, WAIT_LSN_RESULT_NOT_IN_RECOVERY, WAIT_LSN_RESULT_SUCCESS, WAIT_LSN_RESULT_TIMEOUT, WaitLatch(), waitLSNState, WaitLSNTypeRequiresRecovery(), WaitLSNWaitEvents, WL_LATCH_SET, WL_POSTMASTER_DEATH, and WL_TIMEOUT.

Referenced by ExecWaitStmt().

◆ waitlsn_cmp()

static int waitlsn_cmp ( const pairingheap_node * a,
const pairingheap_node * b,
void * arg 
)
static

Definition at line 153 of file xlogwait.c.

154{
157
158 if (aproc->waitLSN < bproc->waitLSN)
159 return 1;
160 else if (aproc->waitLSN > bproc->waitLSN)
161 return -1;
162 else
163 return 0;
164}
int b
Definition isn.c:74
int a
Definition isn.c:73
#define pairingheap_const_container(type, membername, ptr)
Definition pairingheap.h:51

References a, b, fb(), and pairingheap_const_container.

Referenced by WaitLSNShmemInit().

◆ WaitLSNCleanup()

void WaitLSNCleanup ( void  )

Definition at line 338 of file xlogwait.c.

339{
340 if (waitLSNState)
341 {
342 /*
343 * We do a fast-path check of the inHeap flag without the lock. This
344 * flag is set to true only by the process itself. So, it's only
345 * possible to get a false positive. But that will be eliminated by a
346 * recheck inside deleteLSNWaiter().
347 */
350 }
351}
WaitLSNType lsnType
Definition xlogwait.h:60

References deleteLSNWaiter(), WaitLSNProcInfo::inHeap, WaitLSNProcInfo::lsnType, MyProcNumber, WaitLSNState::procInfos, and waitLSNState.

Referenced by AbortTransaction(), and ProcKill().

◆ WaitLSNShmemInit()

void WaitLSNShmemInit ( void  )

Definition at line 124 of file xlogwait.c.

125{
126 bool found;
127
128 waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
130 &found);
131 if (!found)
132 {
133 int i;
134
135 /* Initialize heaps and tracking */
136 for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
137 {
140 }
141
142 /* Initialize process info array */
145 }
146}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition pairingheap.c:60
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:389
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition xlogwait.c:153
Size WaitLSNShmemSize(void)
Definition xlogwait.c:113

References fb(), i, MaxBackends, WaitLSNState::minWaitedLSN, NUM_AUXILIARY_PROCS, pairingheap_initialize(), pg_atomic_init_u64(), PG_UINT64_MAX, WaitLSNState::procInfos, ShmemInitStruct(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, waitlsn_cmp(), WaitLSNShmemSize(), and waitLSNState.

Referenced by CreateOrAttachShmemStructs().

◆ WaitLSNShmemSize()

Size WaitLSNShmemSize ( void  )

Definition at line 113 of file xlogwait.c.

114{
115 Size size;
116
117 size = offsetof(WaitLSNState, procInfos);
119 return size;
120}
size_t Size
Definition c.h:629
Size add_size(Size s1, Size s2)
Definition shmem.c:495
Size mul_size(Size s1, Size s2)
Definition shmem.c:510

References add_size(), fb(), MaxBackends, mul_size(), NUM_AUXILIARY_PROCS, and WaitLSNState::procInfos.

Referenced by CalculateShmemSize(), and WaitLSNShmemInit().

◆ WaitLSNTypeRequiresRecovery()

static bool WaitLSNTypeRequiresRecovery ( WaitLSNType  t)
inline static

◆ WaitLSNWakeup()

void WaitLSNWakeup ( WaitLSNType  lsnType,
XLogRecPtr  currentLSN 
)

Definition at line 317 of file xlogwait.c.

318{
319 int i = (int) lsnType;
320
321 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
322
323 /*
324 * Fast path check. The check itself is skipped if currentLSN is InvalidXLogRecPtr,
325 * which means "wake all waiters" (e.g., during promotion when recovery ends).
326 */
329 return;
330
331 wakeupWaiters(lsnType, currentLSN);
332}
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
static void wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:256

References Assert, fb(), i, WaitLSNState::minWaitedLSN, pg_atomic_read_u64(), WAIT_LSN_TYPE_COUNT, waitLSNState, wakeupWaiters(), and XLogRecPtrIsValid.

Referenced by PerformWalRecovery(), StartupXLOG(), XLogBackgroundFlush(), XLogFlush(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ wakeupWaiters()

static void wakeupWaiters ( WaitLSNType  lsnType,
XLogRecPtr  currentLSN 
)
static

Definition at line 256 of file xlogwait.c.

257{
259 int numWakeUpProcs;
260 int i = (int) lsnType;
261
262 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
263
264 do
265 {
266 int j;
267
268 numWakeUpProcs = 0;
270
271 /*
272 * Iterate the waiters heap until we find LSN not yet reached. Record
273 * process numbers to wake up, but send wakeups after releasing lock.
274 */
276 {
279
280 /* Get procInfo using appropriate heap node */
282
284 break;
285
289
290 /* Update appropriate flag */
291 procInfo->inHeap = false;
292
294 break;
295 }
296
297 updateMinWaitedLSN(lsnType);
299
300 /*
301 * Set latches for processes whose waited LSNs have been reached.
302 * Since SetLatch() is a time-consuming operation, we do this outside
303 * of WaitLSNLock. This is safe because procLatch is never freed, so
304 * at worst we may set a latch for the wrong process or for no process
305 * at all, which is harmless.
306 */
307 for (j = 0; j < numWakeUpProcs; j++)
309
311}
int j
Definition isn.c:78
void SetLatch(Latch *latch)
Definition latch.c:290
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
#define GetPGProcByNumber(n)
Definition proc.h:446
int ProcNumber
Definition procnumber.h:24
#define WAKEUP_PROC_STATIC_ARRAY_SIZE
Definition xlogwait.c:241

References Assert, fb(), GetPGProcByNumber, i, j, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pairingheap_container, pairingheap_first(), pairingheap_is_empty, pairingheap_remove_first(), SetLatch(), updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, waitLSNState, WAKEUP_PROC_STATIC_ARRAY_SIZE, and XLogRecPtrIsValid.

Referenced by WaitLSNWakeup().

Variable Documentation

◆ waitLSNState

◆ WaitLSNWaitEvents