PostgreSQL Source Code  git master
waitlsn.c File Reference
#include "postgres.h"
#include <float.h>
#include <math.h>
#include "pgstat.h"
#include "access/xlog.h"
#include "access/xlogrecovery.h"
#include "commands/waitlsn.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "utils/fmgrprotos.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
#include "utils/wait_event_types.h"
Include dependency graph for waitlsn.c:

Go to the source code of this file.

Functions

static int waitlsn_cmp (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
Size WaitLSNShmemSize (void)
 
void WaitLSNShmemInit (void)
 
static void updateMinWaitedLSN (void)
 
static void addLSNWaiter (XLogRecPtr lsn)
 
static void deleteLSNWaiter (void)
 
void WaitLSNSetLatches (XLogRecPtr currentLSN)
 
void WaitLSNCleanup (void)
 
static void WaitForLSNReplay (XLogRecPtr targetLSN, int64 timeout)
 
Datum pg_wal_replay_wait (PG_FUNCTION_ARGS)
 

Variables

struct WaitLSNState * waitLSNState = NULL
 

Function Documentation

◆ addLSNWaiter()

static void addLSNWaiter ( XLogRecPtr  lsn)
static

Definition at line 108 of file waitlsn.c.

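109 {
110  WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
111 
112  LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
113 
114  Assert(!procInfo->inHeap);
115 
116  procInfo->latch = MyLatch;
117  procInfo->waitLSN = lsn;
118 
119  pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode);
120  procInfo->inHeap = true;
121  updateMinWaitedLSN();
122 
123  LWLockRelease(WaitLSNLock);
124 }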
#define Assert(condition)
Definition: c.h:858
ProcNumber MyProcNumber
Definition: globals.c:89
struct Latch * MyLatch
Definition: globals.c:62
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:126
pairingheap_node phNode
Definition: waitlsn.h:39
XLogRecPtr waitLSN
Definition: waitlsn.h:30
Latch * latch
Definition: waitlsn.h:36
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition: waitlsn.h:70
pairingheap waitersHeap
Definition: waitlsn.h:64
struct WaitLSNState * waitLSNState
Definition: waitlsn.c:37
static void updateMinWaitedLSN(void)
Definition: waitlsn.c:90

References Assert, WaitLSNProcInfo::inHeap, WaitLSNProcInfo::latch, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyLatch, MyProcNumber, pairingheap_add(), WaitLSNProcInfo::phNode, WaitLSNState::procInfos, updateMinWaitedLSN(), WaitLSNState::waitersHeap, WaitLSNProcInfo::waitLSN, and waitLSNState.

Referenced by WaitForLSNReplay().

◆ deleteLSNWaiter()

static void deleteLSNWaiter ( void  )
static

Definition at line 130 of file waitlsn.c.

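131 {
132  WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
133 
134  LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
135 
136  if (!procInfo->inHeap)
137  {
138  LWLockRelease(WaitLSNLock);
139  return;
140  }
141 
142  pairingheap_remove(&waitLSNState->waitersHeap, &procInfo->phNode);
143  procInfo->inHeap = false;
144  updateMinWaitedLSN();
145 
146  LWLockRelease(WaitLSNLock);
147 }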
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:184

References WaitLSNProcInfo::inHeap, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, pairingheap_remove(), WaitLSNProcInfo::phNode, WaitLSNState::procInfos, updateMinWaitedLSN(), WaitLSNState::waitersHeap, and waitLSNState.

Referenced by WaitForLSNReplay(), and WaitLSNCleanup().

◆ pg_wal_replay_wait()

Datum pg_wal_replay_wait ( PG_FUNCTION_ARGS  )

Definition at line 341 of file waitlsn.c.

342 {
343  XLogRecPtr target_lsn = PG_GETARG_LSN(0);
344  int64 timeout = PG_GETARG_INT64(1);
345 
346  if (timeout < 0)
347  ereport(ERROR,
348  (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
349  errmsg("\"timeout\" must not be negative")));
350 
351  /*
352  * We are going to wait for the LSN replay. We should first care that we
353  * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
354  * Otherwise, our snapshot could prevent the replay of WAL records
355  * implying a kind of self-deadlock. This is the reason why
356  * pg_wal_replay_wait() is a procedure, not a function.
357  *
358  * At first, we should check there is no active snapshot. According to
359  * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
360  * processed with a snapshot. Thankfully, we can pop this snapshot,
361  * because PortalRunUtility() can tolerate this.
362  */
363  if (ActiveSnapshotSet())
364  PopActiveSnapshot();
365 
366  /*
367  * At second, invalidate a catalog snapshot if any. And we should be done
368  * with the preparation.
369  */
370  InvalidateCatalogSnapshot();
371 
372  /* Give up if there is still an active or registered snapshot. */
373  if (GetOldestSnapshot())
374  ereport(ERROR,
375  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
376  errmsg("pg_wal_replay_wait() must be only called without an active or registered snapshot"),
377  errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction with an isolation level higher than READ COMMITTED, another procedure, or a function.")));
378 
379  /*
380  * As the result we should hold no snapshot, and correspondingly our xmin
381  * should be unset.
382  */
383  Assert(MyProc->xmin == InvalidTransactionId);
384 
385  (void) WaitForLSNReplay(target_lsn, timeout);
386 
387  PG_RETURN_VOID();
388 }
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_INT64(n)
Definition: fmgr.h:283
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:33
bool ActiveSnapshotSet(void)
Definition: snapmgr.c:782
void PopActiveSnapshot(void)
Definition: snapmgr.c:743
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:422
Snapshot GetOldestSnapshot(void)
Definition: snapmgr.c:323
PGPROC * MyProc
Definition: proc.c:67
TransactionId xmin
Definition: proc.h:172
#define InvalidTransactionId
Definition: transam.h:31
static void WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
Definition: waitlsn.c:221
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References ActiveSnapshotSet(), Assert, ereport, errcode(), errdetail(), errmsg(), ERROR, GetOldestSnapshot(), InvalidateCatalogSnapshot(), InvalidTransactionId, MyProc, PG_GETARG_INT64, PG_GETARG_LSN, PG_RETURN_VOID, PopActiveSnapshot(), WaitForLSNReplay(), and PGPROC::xmin.

◆ updateMinWaitedLSN()

static void updateMinWaitedLSN ( void  )
static

Definition at line 90 of file waitlsn.c.

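91 {
92  XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
93 
94  if (!pairingheap_is_empty(&waitLSNState->waitersHeap))
95  {
96  pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
97 
98  minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
99  }
100 
101  pg_atomic_write_u64(&waitLSNState->minWaitedLSN, minWaitedLSN);
102 }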
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:485
#define PG_UINT64_MAX
Definition: c.h:593
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:144
#define pairingheap_is_empty(h)
Definition: pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43
pg_atomic_uint64 minWaitedLSN
Definition: waitlsn.h:58

References WaitLSNState::minWaitedLSN, pairingheap_container, pairingheap_first(), pairingheap_is_empty, pg_atomic_write_u64(), PG_UINT64_MAX, WaitLSNState::waitersHeap, and waitLSNState.

Referenced by addLSNWaiter(), deleteLSNWaiter(), and WaitLSNSetLatches().

◆ WaitForLSNReplay()

static void WaitForLSNReplay ( XLogRecPtr  targetLSN,
int64  timeout 
)
static

Definition at line 221 of file waitlsn.c.

222 {
223  XLogRecPtr currentLSN;
224  TimestampTz endtime = 0;
225  int wake_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
226 
227  /* Shouldn't be called when shmem isn't initialized */
228  Assert(waitLSNState);
229 
230  /* Should have a valid proc number */
231  Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends);
232 
233  if (!RecoveryInProgress())
234  {
235  /*
236  * Recovery is not in progress. Given that we detected this in the
237  * very first check, this procedure was mistakenly called on primary.
238  * However, it's possible that standby was promoted concurrently to
239  * the procedure call, while target LSN is replayed. So, we still
240  * check the last replay LSN before reporting an error.
241  */
242  if (targetLSN <= GetXLogReplayRecPtr(NULL))
243  return;
244  ereport(ERROR,
245  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
246  errmsg("recovery is not in progress"),
247  errhint("Waiting for LSN can only be executed during recovery.")));
248  }
249  else
250  {
251  /* If target LSN is already replayed, exit immediately */
252  if (targetLSN <= GetXLogReplayRecPtr(NULL))
253  return;
254  }
255 
256  if (timeout > 0)
257  {
258  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
259  wake_events |= WL_TIMEOUT;
260  }
261 
262  /*
263  * Add our process to the pairing heap of waiters. It might happen that
264  * target LSN gets replayed before we do. Another check at the beginning
265  * of the loop below prevents the race condition.
266  */
267  addLSNWaiter(targetLSN);
268 
269  for (;;)
270  {
271  int rc;
272  long delay_ms = 0;
273 
274  /* Recheck that recovery is still in-progress */
275  if (!RecoveryInProgress())
276  {
277  /*
278  * Recovery was ended, but recheck if target LSN was already
279  * replayed.
280  */
281  currentLSN = GetXLogReplayRecPtr(NULL);
282  if (targetLSN <= currentLSN)
283  return;
284  ereport(ERROR,
285  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
286  errmsg("recovery is not in progress"),
287  errdetail("Recovery ended before replaying target LSN %X/%X; last replay LSN %X/%X.",
288  LSN_FORMAT_ARGS(targetLSN),
289  LSN_FORMAT_ARGS(currentLSN))));
290  }
291  else
292  {
293  /* Check if the waited LSN has been replayed */
294  currentLSN = GetXLogReplayRecPtr(NULL);
295  if (targetLSN <= currentLSN)
296  break;
297  }
298 
299  /*
300  * If the timeout value is specified, calculate the number of
301  * milliseconds before the timeout. Exit if the timeout is already
302  * achieved.
303  */
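304  if (timeout > 0)
305  {
306  delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
307  if (delay_ms <= 0)
308  break;
309  }
310 
311  CHECK_FOR_INTERRUPTS();
312 
313  rc = WaitLatch(MyLatch, wake_events, delay_ms,
314  WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
315 
316  if (rc & WL_LATCH_SET)
317  ResetLatch(MyLatch);
318  }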
319 
320  /*
321  * Delete our process from the shared memory pairing heap. We might
322  * already be deleted by the startup process. The 'inHeap' flag prevents
323  * us from the double deletion.
324  */
325  deleteLSNWaiter();
326 
327  /*
328  * If we didn't achieve the target LSN, we must be exited by timeout.
329  */
330  if (targetLSN > currentLSN)
331  {
332  ereport(ERROR,
333  (errcode(ERRCODE_QUERY_CANCELED),
334  errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
335  LSN_FORMAT_ARGS(targetLSN),
336  LSN_FORMAT_ARGS(currentLSN))));
337  }
338 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
int64 TimestampTz
Definition: timestamp.h:39
int errhint(const char *fmt,...)
Definition: elog.c:1317
int MaxBackends
Definition: globals.c:145
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
static void addLSNWaiter(XLogRecPtr lsn)
Definition: waitlsn.c:108
static void deleteLSNWaiter(void)
Definition: waitlsn.c:130
bool RecoveryInProgress(void)
Definition: xlog.c:6333
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References addLSNWaiter(), Assert, CHECK_FOR_INTERRUPTS, deleteLSNWaiter(), ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, GetCurrentTimestamp(), GetXLogReplayRecPtr(), LSN_FORMAT_ARGS, MaxBackends, MyLatch, MyProcNumber, RecoveryInProgress(), ResetLatch(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, WaitLatch(), waitLSNState, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by pg_wal_replay_wait().

◆ waitlsn_cmp()

static int waitlsn_cmp ( const pairingheap_node *  a,
const pairingheap_node *  b,
void *  arg 
)
static

Definition at line 72 of file waitlsn.c.

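73 {
74  const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a);
75  const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b);
76 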
77  if (aproc->waitLSN < bproc->waitLSN)
78  return 1;
79  else if (aproc->waitLSN > bproc->waitLSN)
80  return -1;
81  else
82  return 0;
83 }
int b
Definition: isn.c:70
int a
Definition: isn.c:69
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51

References a, b, pairingheap_const_container, and WaitLSNProcInfo::waitLSN.

Referenced by WaitLSNShmemInit().

◆ WaitLSNCleanup()

void WaitLSNCleanup ( void  )

Definition at line 204 of file waitlsn.c.

205 {
206  /*
207  * We do a fast-path check of the 'inHeap' flag without the lock. This
208  * flag is set to true only by the process itself. So, it's only possible
209  * to get a false positive. But that will be eliminated by a recheck
210  * inside deleteLSNWaiter().
211  */
212  if (waitLSNState->procInfos[MyProcNumber].inHeap)
213  deleteLSNWaiter();
214 }

References deleteLSNWaiter(), WaitLSNProcInfo::inHeap, MyProcNumber, WaitLSNState::procInfos, and waitLSNState.

Referenced by AbortTransaction(), and ProcKill().

◆ WaitLSNSetLatches()

void WaitLSNSetLatches ( XLogRecPtr  currentLSN)

Definition at line 155 of file waitlsn.c.

156 {
157  int i;
158  Latch **wakeUpProcLatches;
159  int numWakeUpProcs = 0;
160 
161  wakeUpProcLatches = palloc(sizeof(Latch *) * MaxBackends);
162 
163  LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
164 
165  /*
166  * Iterate the pairing heap of waiting processes till we find LSN not yet
167  * replayed. Record the process latches to set them later.
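168  */
169  while (!pairingheap_is_empty(&waitLSNState->waitersHeap))
170  {
171  pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
172  WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node);
173 
174  if (!XLogRecPtrIsInvalid(currentLSN) &&
175  procInfo->waitLSN > currentLSN)
176  break;
177 
178  wakeUpProcLatches[numWakeUpProcs++] = procInfo->latch;
179  (void) pairingheap_remove_first(&waitLSNState->waitersHeap);
180  procInfo->inHeap = false;
181  }
182 
183  updateMinWaitedLSN();
184 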
185  LWLockRelease(WaitLSNLock);
186 
187  /*
188  * Set latches for processes, whose waited LSNs are already replayed. As
189  * the time consuming operations, we do it this outside of WaitLSNLock.
190  * This is actually fine because procLatch isn't ever freed, so we just
191  * can potentially set the wrong process' (or no process') latch.
192  */
193  for (i = 0; i < numWakeUpProcs; i++)
194  {
195  SetLatch(wakeUpProcLatches[i]);
196  }
197  pfree(wakeUpProcLatches);
198 }
int i
Definition: isn.c:73
void SetLatch(Latch *latch)
Definition: latch.c:632
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc(Size size)
Definition: mcxt.c:1317
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
Definition: pairingheap.c:159
Latch
Definition: latch.h:113
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References i, WaitLSNProcInfo::inHeap, WaitLSNProcInfo::latch, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, pairingheap_container, pairingheap_first(), pairingheap_is_empty, pairingheap_remove_first(), palloc(), pfree(), SetLatch(), updateMinWaitedLSN(), WaitLSNState::waitersHeap, WaitLSNProcInfo::waitLSN, waitLSNState, and XLogRecPtrIsInvalid.

Referenced by PerformWalRecovery(), and StartupXLOG().

◆ WaitLSNShmemInit()

void WaitLSNShmemInit ( void  )

Definition at line 52 of file waitlsn.c.

53 {
54  bool found;
55 
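56  waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
57  WaitLSNShmemSize(),
58  &found);
59  if (!found)
60  {
61  pg_atomic_init_u64(&waitLSNState->minWaitedLSN, PG_UINT64_MAX);
62  pairingheap_initialize(&waitLSNState->waitersHeap, waitlsn_cmp, NULL);
63  memset(&waitLSNState->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
64  }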
65 }
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:453
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:60
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition: waitlsn.c:72
Size WaitLSNShmemSize(void)
Definition: waitlsn.c:41

References MaxBackends, WaitLSNState::minWaitedLSN, pairingheap_initialize(), pg_atomic_init_u64(), PG_UINT64_MAX, WaitLSNState::procInfos, ShmemInitStruct(), WaitLSNState::waitersHeap, waitlsn_cmp(), WaitLSNShmemSize(), and waitLSNState.

Referenced by CreateOrAttachShmemStructs().

◆ WaitLSNShmemSize()

Size WaitLSNShmemSize ( void  )

Definition at line 41 of file waitlsn.c.

42 {
43  Size size;
44 
45  size = offsetof(WaitLSNState, procInfos);
46  size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
47  return size;
48 }
size_t Size
Definition: c.h:605
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), MaxBackends, mul_size(), WaitLSNState::procInfos, and size.

Referenced by CalculateShmemSize(), and WaitLSNShmemInit().

Variable Documentation

◆ waitLSNState