PostgreSQL Source Code  git master
waitlsn.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * waitlsn.c
 *	  Implements waiting for the given replay LSN, which is used in
 *	  CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout bigint).
 *
 * Copyright (c) 2024, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/commands/waitlsn.c
 *
 *-------------------------------------------------------------------------
 */
14 
15 #include "postgres.h"
16 
17 #include <float.h>
18 #include <math.h>
19 
20 #include "pgstat.h"
21 #include "access/xlog.h"
22 #include "access/xlogrecovery.h"
23 #include "commands/waitlsn.h"
24 #include "funcapi.h"
25 #include "miscadmin.h"
26 #include "storage/latch.h"
27 #include "storage/proc.h"
28 #include "storage/shmem.h"
29 #include "utils/fmgrprotos.h"
30 #include "utils/pg_lsn.h"
31 #include "utils/snapmgr.h"
32 #include "utils/wait_event_types.h"
33 
34 static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
35  void *arg);
36 
37 struct WaitLSNState *waitLSNState = NULL;
38 
39 /* Report the amount of shared memory space needed for WaitLSNState. */
40 Size
42 {
43  Size size;
44 
45  size = offsetof(WaitLSNState, procInfos);
47  return size;
48 }
49 
50 /* Initialize the WaitLSNState in the shared memory. */
51 void
53 {
54  bool found;
55 
56  waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
58  &found);
59  if (!found)
60  {
63  memset(&waitLSNState->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
64  }
65 }
66 
67 /*
68  * Comparison function for waitLSN->waitersHeap heap. Waiting processes are
69  * ordered by lsn, so that the waiter with smallest lsn is at the top.
70  */
71 static int
73 {
76 
77  if (aproc->waitLSN < bproc->waitLSN)
78  return 1;
79  else if (aproc->waitLSN > bproc->waitLSN)
80  return -1;
81  else
82  return 0;
83 }
84 
85 /*
86  * Update waitLSN->minWaitedLSN according to the current state of
87  * waitLSN->waitersHeap.
88  */
89 static void
91 {
93 
95  {
97 
98  minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
99  }
100 
102 }
103 
104 /*
105  * Put the current process into the heap of LSN waiters.
106  */
107 static void
109 {
111 
112  LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
113 
114  Assert(!procInfo->inHeap);
115 
116  procInfo->latch = MyLatch;
117  procInfo->waitLSN = lsn;
118 
120  procInfo->inHeap = true;
122 
123  LWLockRelease(WaitLSNLock);
124 }
125 
126 /*
127  * Remove the current process from the heap of LSN waiters if it's there.
128  */
129 static void
131 {
133 
134  LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
135 
136  if (!procInfo->inHeap)
137  {
138  LWLockRelease(WaitLSNLock);
139  return;
140  }
141 
143  procInfo->inHeap = false;
145 
146  LWLockRelease(WaitLSNLock);
147 }
148 
149 /*
150  * Remove waiters whose LSN has been replayed from the heap and set their
151  * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
152  * and set latches for all waiters.
153  */
154 void
156 {
157  int i;
158  Latch **wakeUpProcLatches;
159  int numWakeUpProcs = 0;
160 
161  wakeUpProcLatches = palloc(sizeof(Latch *) * MaxBackends);
162 
163  LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
164 
165  /*
166  * Iterate the pairing heap of waiting processes till we find LSN not yet
167  * replayed. Record the process latches to set them later.
168  */
170  {
172  WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node);
173 
174  if (!XLogRecPtrIsInvalid(currentLSN) &&
175  procInfo->waitLSN > currentLSN)
176  break;
177 
178  wakeUpProcLatches[numWakeUpProcs++] = procInfo->latch;
180  procInfo->inHeap = false;
181  }
182 
184 
185  LWLockRelease(WaitLSNLock);
186 
187  /*
188  * Set latches for processes, whose waited LSNs are already replayed. As
189  * the time consuming operations, we do it this outside of WaitLSNLock.
190  * This is actually fine because procLatch isn't ever freed, so we just
191  * can potentially set the wrong process' (or no process') latch.
192  */
193  for (i = 0; i < numWakeUpProcs; i++)
194  {
195  SetLatch(wakeUpProcLatches[i]);
196  }
197  pfree(wakeUpProcLatches);
198 }
199 
200 /*
201  * Delete our item from shmem array if any.
202  */
203 void
205 {
206  /*
207  * We do a fast-path check of the 'inHeap' flag without the lock. This
208  * flag is set to true only by the process itself. So, it's only possible
209  * to get a false positive. But that will be eliminated by a recheck
210  * inside deleteLSNWaiter().
211  */
213  deleteLSNWaiter();
214 }
215 
216 /*
217  * Wait using MyLatch till the given LSN is replayed, the postmaster dies or
218  * timeout happens.
219  */
220 static void
221 WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
222 {
223  XLogRecPtr currentLSN;
224  TimestampTz endtime = 0;
225  int wake_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
226 
227  /* Shouldn't be called when shmem isn't initialized */
229 
230  /* Should have a valid proc number */
232 
233  if (!RecoveryInProgress())
234  {
235  /*
236  * Recovery is not in progress. Given that we detected this in the
237  * very first check, this procedure was mistakenly called on primary.
238  * However, it's possible that standby was promoted concurrently to
239  * the procedure call, while target LSN is replayed. So, we still
240  * check the last replay LSN before reporting an error.
241  */
242  if (targetLSN <= GetXLogReplayRecPtr(NULL))
243  return;
244  ereport(ERROR,
245  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
246  errmsg("recovery is not in progress"),
247  errhint("Waiting for LSN can only be executed during recovery.")));
248  }
249  else
250  {
251  /* If target LSN is already replayed, exit immediately */
252  if (targetLSN <= GetXLogReplayRecPtr(NULL))
253  return;
254  }
255 
256  if (timeout > 0)
257  {
258  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
259  wake_events |= WL_TIMEOUT;
260  }
261 
262  /*
263  * Add our process to the pairing heap of waiters. It might happen that
264  * target LSN gets replayed before we do. Another check at the beginning
265  * of the loop below prevents the race condition.
266  */
267  addLSNWaiter(targetLSN);
268 
269  for (;;)
270  {
271  int rc;
272  long delay_ms = 0;
273 
274  /* Recheck that recovery is still in-progress */
275  if (!RecoveryInProgress())
276  {
277  /*
278  * Recovery was ended, but recheck if target LSN was already
279  * replayed.
280  */
281  currentLSN = GetXLogReplayRecPtr(NULL);
282  if (targetLSN <= currentLSN)
283  return;
284  ereport(ERROR,
285  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
286  errmsg("recovery is not in progress"),
287  errdetail("Recovery ended before replaying target LSN %X/%X; last replay LSN %X/%X.",
288  LSN_FORMAT_ARGS(targetLSN),
289  LSN_FORMAT_ARGS(currentLSN))));
290  }
291  else
292  {
293  /* Check if the waited LSN has been replayed */
294  currentLSN = GetXLogReplayRecPtr(NULL);
295  if (targetLSN <= currentLSN)
296  break;
297  }
298 
299  /*
300  * If the timeout value is specified, calculate the number of
301  * milliseconds before the timeout. Exit if the timeout is already
302  * achieved.
303  */
304  if (timeout > 0)
305  {
307  if (delay_ms <= 0)
308  break;
309  }
310 
312 
313  rc = WaitLatch(MyLatch, wake_events, delay_ms,
314  WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
315 
316  if (rc & WL_LATCH_SET)
318  }
319 
320  /*
321  * Delete our process from the shared memory pairing heap. We might
322  * already be deleted by the startup process. The 'inHeap' flag prevents
323  * us from the double deletion.
324  */
325  deleteLSNWaiter();
326 
327  /*
328  * If we didn't achieve the target LSN, we must be exited by timeout.
329  */
330  if (targetLSN > currentLSN)
331  {
332  ereport(ERROR,
333  (errcode(ERRCODE_QUERY_CANCELED),
334  errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
335  LSN_FORMAT_ARGS(targetLSN),
336  LSN_FORMAT_ARGS(currentLSN))));
337  }
338 }
339 
340 Datum
342 {
343  XLogRecPtr target_lsn = PG_GETARG_LSN(0);
344  int64 timeout = PG_GETARG_INT64(1);
345 
346  if (timeout < 0)
347  ereport(ERROR,
348  (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
349  errmsg("\"timeout\" must not be negative")));
350 
351  /*
352  * We are going to wait for the LSN replay. We should first care that we
353  * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
354  * Otherwise, our snapshot could prevent the replay of WAL records
355  * implying a kind of self-deadlock. This is the reason why
356  * pg_wal_replay_wait() is a procedure, not a function.
357  *
358  * At first, we should check there is no active snapshot. According to
359  * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
360  * processed with a snapshot. Thankfully, we can pop this snapshot,
361  * because PortalRunUtility() can tolerate this.
362  */
363  if (ActiveSnapshotSet())
365 
366  /*
367  * At second, invalidate a catalog snapshot if any. And we should be done
368  * with the preparation.
369  */
371 
372  /* Give up if there is still an active or registered snapshot. */
373  if (GetOldestSnapshot())
374  ereport(ERROR,
375  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
376  errmsg("pg_wal_replay_wait() must be only called without an active or registered snapshot"),
377  errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction with an isolation level higher than READ COMMITTED, another procedure, or a function.")));
378 
379  /*
380  * As the result we should hold no snapshot, and correspondingly our xmin
381  * should be unset.
382  */
384 
385  (void) WaitForLSNReplay(target_lsn, timeout);
386 
387  PG_RETURN_VOID();
388 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:485
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:453
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
#define Assert(condition)
Definition: c.h:858
#define PG_UINT64_MAX
Definition: c.h:593
size_t Size
Definition: c.h:605
int64 TimestampTz
Definition: timestamp.h:39
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_INT64(n)
Definition: fmgr.h:283
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
ProcNumber MyProcNumber
Definition: globals.c:89
int MaxBackends
Definition: globals.c:145
struct Latch * MyLatch
Definition: globals.c:62
int b
Definition: isn.c:70
int a
Definition: isn.c:69
int i
Definition: isn.c:73
void SetLatch(Latch *latch)
Definition: latch.c:632
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc(Size size)
Definition: mcxt.c:1317
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:184
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:126
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:144
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:60
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
Definition: pairingheap.c:159
#define pairingheap_is_empty(h)
Definition: pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51
void * arg
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:33
uintptr_t Datum
Definition: postgres.h:64
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static pg_noinline void Size size
Definition: slab.c:607
bool ActiveSnapshotSet(void)
Definition: snapmgr.c:782
void PopActiveSnapshot(void)
Definition: snapmgr.c:743
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:422
Snapshot GetOldestSnapshot(void)
Definition: snapmgr.c:323
PGPROC * MyProc
Definition: proc.c:67
Definition: latch.h:113
TransactionId xmin
Definition: proc.h:172
pairingheap_node phNode
Definition: waitlsn.h:39
XLogRecPtr waitLSN
Definition: waitlsn.h:30
Latch * latch
Definition: waitlsn.h:36
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition: waitlsn.h:70
pairingheap waitersHeap
Definition: waitlsn.h:64
pg_atomic_uint64 minWaitedLSN
Definition: waitlsn.h:58
#define InvalidTransactionId
Definition: transam.h:31
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
void WaitLSNShmemInit(void)
Definition: waitlsn.c:52
static void addLSNWaiter(XLogRecPtr lsn)
Definition: waitlsn.c:108
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition: waitlsn.c:72
void WaitLSNCleanup(void)
Definition: waitlsn.c:204
Datum pg_wal_replay_wait(PG_FUNCTION_ARGS)
Definition: waitlsn.c:341
struct WaitLSNState * waitLSNState
Definition: waitlsn.c:37
static void deleteLSNWaiter(void)
Definition: waitlsn.c:130
static void WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
Definition: waitlsn.c:221
Size WaitLSNShmemSize(void)
Definition: waitlsn.c:41
void WaitLSNSetLatches(XLogRecPtr currentLSN)
Definition: waitlsn.c:155
static void updateMinWaitedLSN(void)
Definition: waitlsn.c:90
bool RecoveryInProgress(void)
Definition: xlog.c:6333
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)