PostgreSQL Source Code git master
xlogwait.c File Reference
#include "postgres.h"
#include <float.h>
#include <math.h>
#include "access/xlog.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "utils/fmgrprotos.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
Include dependency graph for xlogwait.c:

Go to the source code of this file.

Macros

#define WAKEUP_PROC_STATIC_ARRAY_SIZE   (16)
 

Functions

static int waitlsn_cmp (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
Size WaitLSNShmemSize (void)
 
void WaitLSNShmemInit (void)
 
static void updateMinWaitedLSN (WaitLSNType lsnType)
 
static void addLSNWaiter (XLogRecPtr lsn, WaitLSNType lsnType)
 
static void deleteLSNWaiter (WaitLSNType lsnType)
 
static void wakeupWaiters (WaitLSNType lsnType, XLogRecPtr currentLSN)
 
void WaitLSNWakeup (WaitLSNType lsnType, XLogRecPtr currentLSN)
 
void WaitLSNCleanup (void)
 
WaitLSNResult WaitForLSN (WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 

Variables

struct WaitLSNState * waitLSNState = NULL
 

Macro Definition Documentation

◆ WAKEUP_PROC_STATIC_ARRAY_SIZE

#define WAKEUP_PROC_STATIC_ARRAY_SIZE   (16)

Definition at line 195 of file xlogwait.c.

Function Documentation

◆ addLSNWaiter()

static void addLSNWaiter ( XLogRecPtr  lsn,
WaitLSNType  lsnType 
)
static

Definition at line 145 of file xlogwait.c.

146{
148 int i = (int) lsnType;
149
150 Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
151
152 LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
153
154 procInfo->procno = MyProcNumber;
155 procInfo->waitLSN = lsn;
156 procInfo->lsnType = lsnType;
157
158 Assert(!procInfo->inHeap);
160 procInfo->inHeap = true;
161 updateMinWaitedLSN(lsnType);
162
163 LWLockRelease(WaitLSNLock);
164}
ProcNumber MyProcNumber
Definition: globals.c:90
Assert(PointerIsAligned(start, uint64))
int i
Definition: isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_EXCLUSIVE
Definition: lwlock.h:112
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:126
ProcNumber procno
Definition: xlogwait.h:57
pairingheap_node heapNode
Definition: xlogwait.h:66
XLogRecPtr waitLSN
Definition: xlogwait.h:51
WaitLSNType lsnType
Definition: xlogwait.h:54
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition: xlogwait.h:91
pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT]
Definition: xlogwait.h:85
struct WaitLSNState * waitLSNState
Definition: xlogwait.c:63
static void updateMinWaitedLSN(WaitLSNType lsnType)
Definition: xlogwait.c:124
@ WAIT_LSN_TYPE_COUNT
Definition: xlogwait.h:40

References Assert(), WaitLSNProcInfo::heapNode, i, WaitLSNProcInfo::inHeap, WaitLSNProcInfo::lsnType, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, pairingheap_add(), WaitLSNState::procInfos, WaitLSNProcInfo::procno, updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, WaitLSNProcInfo::waitLSN, and waitLSNState.

Referenced by WaitForLSN().

◆ deleteLSNWaiter()

static void deleteLSNWaiter ( WaitLSNType  lsnType)
static

Definition at line 170 of file xlogwait.c.

171{
173 int i = (int) lsnType;
174
175 Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
176
177 LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
178
179 Assert(procInfo->lsnType == lsnType);
180
181 if (procInfo->inHeap)
182 {
184 procInfo->inHeap = false;
185 updateMinWaitedLSN(lsnType);
186 }
187
188 LWLockRelease(WaitLSNLock);
189}
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:184

References Assert(), WaitLSNProcInfo::heapNode, i, WaitLSNProcInfo::inHeap, WaitLSNProcInfo::lsnType, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, pairingheap_remove(), WaitLSNState::procInfos, updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by WaitForLSN(), and WaitLSNCleanup().

◆ updateMinWaitedLSN()

static void updateMinWaitedLSN ( WaitLSNType  lsnType)
static

Definition at line 124 of file xlogwait.c.

125{
126 XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
127 int i = (int) lsnType;
128
129 Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
130
132 {
134 WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
135
136 minWaitedLSN = procInfo->waitLSN;
137 }
139}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:483
#define PG_UINT64_MAX
Definition: c.h:601
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:144
#define pairingheap_is_empty(h)
Definition: pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition: xlogwait.h:79
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References Assert(), i, WaitLSNState::minWaitedLSN, pairingheap_container, pairingheap_first(), pairingheap_is_empty, pg_atomic_write_u64(), PG_UINT64_MAX, WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, WaitLSNProcInfo::waitLSN, and waitLSNState.

Referenced by addLSNWaiter(), deleteLSNWaiter(), and wakeupWaiters().

◆ WaitForLSN()

WaitLSNResult WaitForLSN ( WaitLSNType  lsnType,
XLogRecPtr  targetLSN,
int64  timeout 
)

Definition at line 314 of file xlogwait.c.

315{
316 XLogRecPtr currentLSN;
317 TimestampTz endtime = 0;
318 int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
319
320 /* Shouldn't be called when shmem isn't initialized */
322
323 /* Should have a valid proc number */
325
326 if (timeout > 0)
327 {
329 wake_events |= WL_TIMEOUT;
330 }
331
332 /*
333 * Add our process to the waiters heap. It might happen that target LSN
334 * gets reached before we do. The check at the beginning of the loop
335 * below prevents the race condition.
336 */
337 addLSNWaiter(targetLSN, lsnType);
338
339 for (;;)
340 {
341 int rc;
342 long delay_ms = -1;
343
344 if (lsnType == WAIT_LSN_TYPE_REPLAY)
345 currentLSN = GetXLogReplayRecPtr(NULL);
346 else
347 currentLSN = GetFlushRecPtr(NULL);
348
349 /* Check that recovery is still in-progress */
350 if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
351 {
352 /*
353 * Recovery has ended, but check whether the target LSN was already
354 * reached.
355 */
356 deleteLSNWaiter(lsnType);
357
358 if (PromoteIsTriggered() && targetLSN <= currentLSN)
361 }
362 else
363 {
364 /* Check if the waited LSN has been reached */
365 if (targetLSN <= currentLSN)
366 break;
367 }
368
369 if (timeout > 0)
370 {
372 if (delay_ms <= 0)
373 break;
374 }
375
377
378 rc = WaitLatch(MyLatch, wake_events, delay_ms,
379 (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
380
381 /*
382 * Emergency bailout if postmaster has died. This is to avoid the
383 * necessity for manual cleanup of all postmaster children.
384 */
385 if (rc & WL_POSTMASTER_DEATH)
387 errcode(ERRCODE_ADMIN_SHUTDOWN),
388 errmsg("terminating connection due to unexpected postmaster exit"),
389 errcontext("while waiting for LSN"));
390
391 if (rc & WL_LATCH_SET)
393 }
394
395 /*
396 * Delete our process from the shared memory heap. We might already be
397 * deleted by the startup process. The 'inHeap' flag prevents a
398 * double deletion.
399 */
400 deleteLSNWaiter(lsnType);
401
402 /*
403 * If we didn't reach the target LSN, we must have exited by timeout.
404 */
405 if (targetLSN > currentLSN)
407
409}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
int64 TimestampTz
Definition: timestamp.h:39
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define errcontext
Definition: elog.h:198
#define FATAL
Definition: elog.h:41
#define ereport(elevel,...)
Definition: elog.h:150
int MaxBackends
Definition: globals.c:146
struct Latch * MyLatch
Definition: globals.c:63
void ResetLatch(Latch *latch)
Definition: latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
#define NUM_AUXILIARY_PROCS
Definition: proc.h:463
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define WL_POSTMASTER_DEATH
Definition: waiteventset.h:38
bool RecoveryInProgress(void)
Definition: xlog.c:6406
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6571
bool PromoteIsTriggered(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
static void deleteLSNWaiter(WaitLSNType lsnType)
Definition: xlogwait.c:170
static void addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
Definition: xlogwait.c:145
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition: xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition: xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition: xlogwait.h:27
@ WAIT_LSN_TYPE_REPLAY
Definition: xlogwait.h:38

References addLSNWaiter(), Assert(), CHECK_FOR_INTERRUPTS, deleteLSNWaiter(), ereport, errcode(), errcontext, errmsg(), FATAL, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), MaxBackends, MyLatch, MyProcNumber, NUM_AUXILIARY_PROCS, PromoteIsTriggered(), RecoveryInProgress(), ResetLatch(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, WAIT_LSN_RESULT_NOT_IN_RECOVERY, WAIT_LSN_RESULT_SUCCESS, WAIT_LSN_RESULT_TIMEOUT, WAIT_LSN_TYPE_REPLAY, WaitLatch(), waitLSNState, WL_LATCH_SET, WL_POSTMASTER_DEATH, and WL_TIMEOUT.

Referenced by ExecWaitStmt().

◆ waitlsn_cmp()

static int waitlsn_cmp ( const pairingheap_node *  a,
const pairingheap_node *  b,
void *  arg 
)
static

Definition at line 107 of file xlogwait.c.

108{
111
112 if (aproc->waitLSN < bproc->waitLSN)
113 return 1;
114 else if (aproc->waitLSN > bproc->waitLSN)
115 return -1;
116 else
117 return 0;
118}
int b
Definition: isn.c:74
int a
Definition: isn.c:73
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51

References a, b, pairingheap_const_container, and WaitLSNProcInfo::waitLSN.

Referenced by WaitLSNShmemInit().

◆ WaitLSNCleanup()

void WaitLSNCleanup ( void  )

Definition at line 290 of file xlogwait.c.

291{
292 if (waitLSNState)
293 {
294 /*
295 * We do a fast-path check of the inHeap flag without the lock. This
296 * flag is set to true only by the process itself. So, it's only
297 * possible to get a false positive. But that will be eliminated by a
298 * recheck inside deleteLSNWaiter().
299 */
302 }
303}

References deleteLSNWaiter(), WaitLSNProcInfo::inHeap, WaitLSNProcInfo::lsnType, MyProcNumber, WaitLSNState::procInfos, and waitLSNState.

Referenced by AbortTransaction(), and ProcKill().

◆ WaitLSNShmemInit()

void WaitLSNShmemInit ( void  )

Definition at line 78 of file xlogwait.c.

79{
80 bool found;
81
82 waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
84 &found);
85 if (!found)
86 {
87 int i;
88
89 /* Initialize heaps and tracking */
90 for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
91 {
94 }
95
96 /* Initialize process info array */
97 memset(&waitLSNState->procInfos, 0,
99 }
100}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:451
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:60
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition: xlogwait.c:107
Size WaitLSNShmemSize(void)
Definition: xlogwait.c:67

References i, MaxBackends, WaitLSNState::minWaitedLSN, NUM_AUXILIARY_PROCS, pairingheap_initialize(), pg_atomic_init_u64(), PG_UINT64_MAX, WaitLSNState::procInfos, ShmemInitStruct(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, waitlsn_cmp(), WaitLSNShmemSize(), and waitLSNState.

Referenced by CreateOrAttachShmemStructs().

◆ WaitLSNShmemSize()

Size WaitLSNShmemSize ( void  )

Definition at line 67 of file xlogwait.c.

68{
69 Size size;
70
71 size = offsetof(WaitLSNState, procInfos);
73 return size;
74}
size_t Size
Definition: c.h:613
Size add_size(Size s1, Size s2)
Definition: shmem.c:495
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), MaxBackends, mul_size(), NUM_AUXILIARY_PROCS, and WaitLSNState::procInfos.

Referenced by CalculateShmemSize(), and WaitLSNShmemInit().

◆ WaitLSNWakeup()

void WaitLSNWakeup ( WaitLSNType  lsnType,
XLogRecPtr  currentLSN 
)

Definition at line 269 of file xlogwait.c.

270{
271 int i = (int) lsnType;
272
273 Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
274
275 /*
276 * Fast path check. It is bypassed if currentLSN is InvalidXLogRecPtr, which
277 * means "wake all waiters" (e.g., during promotion when recovery ends).
278 */
279 if (XLogRecPtrIsValid(currentLSN) &&
281 return;
282
283 wakeupWaiters(lsnType, currentLSN);
284}
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:465
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
static void wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition: xlogwait.c:210

References Assert(), i, WaitLSNState::minWaitedLSN, pg_atomic_read_u64(), WAIT_LSN_TYPE_COUNT, waitLSNState, wakeupWaiters(), and XLogRecPtrIsValid.

Referenced by PerformWalRecovery(), and StartupXLOG().

◆ wakeupWaiters()

static void wakeupWaiters ( WaitLSNType  lsnType,
XLogRecPtr  currentLSN 
)
static

Definition at line 210 of file xlogwait.c.

211{
213 int numWakeUpProcs;
214 int i = (int) lsnType;
215
216 Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
217
218 do
219 {
220 numWakeUpProcs = 0;
221 LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
222
223 /*
224 * Iterate the waiters heap until we find LSN not yet reached. Record
225 * process numbers to wake up, but send wakeups after releasing lock.
226 */
228 {
230 WaitLSNProcInfo *procInfo;
231
232 /* Get procInfo using appropriate heap node */
233 procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
234
235 if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
236 break;
237
238 Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
239 wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
241
242 /* Update appropriate flag */
243 procInfo->inHeap = false;
244
245 if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
246 break;
247 }
248
249 updateMinWaitedLSN(lsnType);
250 LWLockRelease(WaitLSNLock);
251
252 /*
253 * Set latches for processes whose waited LSNs have been reached.
254 * Since SetLatch() is a time-consuming operation, we do this outside
255 * of WaitLSNLock. This is safe because procLatch is never freed, so
256 * at worst we may set a latch for the wrong process or for no process
257 * at all, which is harmless.
258 */
259 for (i = 0; i < numWakeUpProcs; i++)
260 SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
261
262 } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
263}
void SetLatch(Latch *latch)
Definition: latch.c:290
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
Definition: pairingheap.c:159
#define GetPGProcByNumber(n)
Definition: proc.h:440
int ProcNumber
Definition: procnumber.h:24
#define WAKEUP_PROC_STATIC_ARRAY_SIZE
Definition: xlogwait.c:195

References Assert(), GetPGProcByNumber, i, WaitLSNProcInfo::inHeap, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pairingheap_container, pairingheap_first(), pairingheap_is_empty, pairingheap_remove_first(), WaitLSNProcInfo::procno, SetLatch(), updateMinWaitedLSN(), WAIT_LSN_TYPE_COUNT, WaitLSNState::waitersHeap, WaitLSNProcInfo::waitLSN, waitLSNState, WAKEUP_PROC_STATIC_ARRAY_SIZE, and XLogRecPtrIsValid.

Referenced by WaitLSNWakeup().

Variable Documentation

◆ waitLSNState