PostgreSQL Source Code  git master
waitlsn.h File Reference
#include "lib/pairingheap.h"
#include "postgres.h"
#include "port/atomics.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "tcop/dest.h"
Include dependency graph for waitlsn.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  WaitLSNProcInfo
 
struct  WaitLSNState
 

Typedefs

typedef struct WaitLSNProcInfo WaitLSNProcInfo
 
typedef struct WaitLSNState WaitLSNState
 

Functions

Size WaitLSNShmemSize (void)
 
void WaitLSNShmemInit (void)
 
void WaitLSNSetLatches (XLogRecPtr currentLSN)
 
void WaitLSNCleanup (void)
 
void WaitForLSNReplay (XLogRecPtr targetLSN, int64 timeout)
 

Variables

PGDLLIMPORT WaitLSNStatewaitLSNState
 

Typedef Documentation

◆ WaitLSNProcInfo

◆ WaitLSNState

typedef struct WaitLSNState WaitLSNState

Function Documentation

◆ WaitForLSNReplay()

void WaitForLSNReplay ( XLogRecPtr  targetLSN,
int64  timeout 
)

Definition at line 221 of file waitlsn.c.

222 {
223  XLogRecPtr currentLSN;
224  TimestampTz endtime = 0;
225  int wake_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
226 
227  /* Shouldn't be called when shmem isn't initialized */
229 
230  /* Should have a valid proc number */
232 
233  if (!RecoveryInProgress())
234  {
235  /*
236  * Recovery is not in progress. Given that we detected this in the
237  * very first check, this procedure was mistakenly called on primary.
238  * However, it's possible that standby was promoted concurrently to
239  * the procedure call, while target LSN is replayed. So, we still
240  * check the last replay LSN before reporting an error.
241  */
242  if (targetLSN <= GetXLogReplayRecPtr(NULL))
243  return;
244  ereport(ERROR,
245  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
246  errmsg("recovery is not in progress"),
247  errhint("Waiting for LSN can only be executed during recovery.")));
248  }
249  else
250  {
251  /* If target LSN is already replayed, exit immediately */
252  if (targetLSN <= GetXLogReplayRecPtr(NULL))
253  return;
254  }
255 
256  if (timeout > 0)
257  {
258  endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
259  wake_events |= WL_TIMEOUT;
260  }
261 
262  /*
263  * Add our process to the pairing heap of waiters. It might happen that
264  * target LSN gets replayed before we do. Another check at the beginning
265  * of the loop below prevents the race condition.
266  */
267  addLSNWaiter(targetLSN);
268 
269  for (;;)
270  {
271  int rc;
272  long delay_ms = 0;
273 
274  /* Recheck that recovery is still in-progress */
275  if (!RecoveryInProgress())
276  {
277  /*
278  * Recovery was ended, but recheck if target LSN was already
279  * replayed.
280  */
281  currentLSN = GetXLogReplayRecPtr(NULL);
282  if (targetLSN <= currentLSN)
283  return;
284  ereport(ERROR,
285  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
286  errmsg("recovery is not in progress"),
287  errdetail("Recovery ended before replaying target LSN %X/%X; last replay LSN %X/%X.",
288  LSN_FORMAT_ARGS(targetLSN),
289  LSN_FORMAT_ARGS(currentLSN))));
290  }
291  else
292  {
293  /* Check if the waited LSN has been replayed */
294  currentLSN = GetXLogReplayRecPtr(NULL);
295  if (targetLSN <= currentLSN)
296  break;
297  }
298 
299  /*
300  * If the timeout value is specified, calculate the number of
301  * milliseconds before the timeout. Exit if the timeout is already
302  * reached.
303  */
304  if (timeout > 0)
305  {
307  if (delay_ms <= 0)
308  break;
309  }
310 
312 
313  rc = WaitLatch(MyLatch, wake_events, delay_ms,
314  WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
315 
316  if (rc & WL_LATCH_SET)
318  }
319 
320  /*
321  * Delete our process from the shared memory pairing heap. We might
322  * already be deleted by the startup process. The 'inHeap' flag prevents
323  * us from the double deletion.
324  */
325  deleteLSNWaiter();
326 
327  /*
328  * If we didn't reach the target LSN, we must be exited by timeout.
329  */
330  if (targetLSN > currentLSN)
331  {
332  ereport(ERROR,
333  (errcode(ERRCODE_QUERY_CANCELED),
334  errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
335  LSN_FORMAT_ARGS(targetLSN),
336  LSN_FORMAT_ARGS(currentLSN))));
337  }
338 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
#define Assert(condition)
Definition: c.h:849
int64 TimestampTz
Definition: timestamp.h:39
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
ProcNumber MyProcNumber
Definition: globals.c:89
int MaxBackends
Definition: globals.c:145
struct Latch * MyLatch
Definition: globals.c:62
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
static void addLSNWaiter(XLogRecPtr lsn)
Definition: waitlsn.c:108
struct WaitLSNState * waitLSNState
Definition: waitlsn.c:37
static void deleteLSNWaiter(void)
Definition: waitlsn.c:130
bool RecoveryInProgress(void)
Definition: xlog.c:6333
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References addLSNWaiter(), Assert, CHECK_FOR_INTERRUPTS, deleteLSNWaiter(), ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, GetCurrentTimestamp(), GetXLogReplayRecPtr(), LSN_FORMAT_ARGS, MaxBackends, MyLatch, MyProcNumber, RecoveryInProgress(), ResetLatch(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, WaitLatch(), waitLSNState, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by pg_wal_replay_wait().

◆ WaitLSNCleanup()

void WaitLSNCleanup ( void  )

Definition at line 204 of file waitlsn.c.

205 {
206  /*
207  * We do a fast-path check of the 'inHeap' flag without the lock. This
208  * flag is set to true only by the process itself. So, it's only possible
209  * to get a false positive. But that will be eliminated by a recheck
210  * inside deleteLSNWaiter().
211  */
213  deleteLSNWaiter();
214 }
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition: waitlsn.h:70

References deleteLSNWaiter(), WaitLSNProcInfo::inHeap, MyProcNumber, WaitLSNState::procInfos, and waitLSNState.

Referenced by AbortTransaction(), and ProcKill().

◆ WaitLSNSetLatches()

void WaitLSNSetLatches ( XLogRecPtr  currentLSN)

Definition at line 155 of file waitlsn.c.

156 {
157  int i;
158  Latch **wakeUpProcLatches;
159  int numWakeUpProcs = 0;
160 
161  wakeUpProcLatches = palloc(sizeof(Latch *) * MaxBackends);
162 
163  LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
164 
165  /*
166  * Iterate the pairing heap of waiting processes till we find LSN not yet
167  * replayed. Record the process latches to set them later.
168  */
170  {
172  WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node);
173 
174  if (!XLogRecPtrIsInvalid(currentLSN) &&
175  procInfo->waitLSN > currentLSN)
176  break;
177 
178  wakeUpProcLatches[numWakeUpProcs++] = procInfo->latch;
180  procInfo->inHeap = false;
181  }
182 
184 
185  LWLockRelease(WaitLSNLock);
186 
187  /*
188  * Set latches for processes, whose waited LSNs are already replayed. As
189  * the time consuming operations, we do it this outside of WaitLSNLock.
190  * This is actually fine because procLatch isn't ever freed, so we just
191  * can potentially set the wrong process' (or no process') latch.
192  */
193  for (i = 0; i < numWakeUpProcs; i++)
194  {
195  SetLatch(wakeUpProcLatches[i]);
196  }
197  pfree(wakeUpProcLatches);
198 }
int i
Definition: isn.c:73
void SetLatch(Latch *latch)
Definition: latch.c:632
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc(Size size)
Definition: mcxt.c:1317
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:144
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
Definition: pairingheap.c:159
#define pairingheap_is_empty(h)
Definition: pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43
Definition: latch.h:113
XLogRecPtr waitLSN
Definition: waitlsn.h:30
Latch * latch
Definition: waitlsn.h:36
pairingheap waitersHeap
Definition: waitlsn.h:64
static void updateMinWaitedLSN(void)
Definition: waitlsn.c:90
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References i, WaitLSNProcInfo::inHeap, WaitLSNProcInfo::latch, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, pairingheap_container, pairingheap_first(), pairingheap_is_empty, pairingheap_remove_first(), palloc(), pfree(), SetLatch(), updateMinWaitedLSN(), WaitLSNState::waitersHeap, WaitLSNProcInfo::waitLSN, waitLSNState, and XLogRecPtrIsInvalid.

Referenced by PerformWalRecovery(), and StartupXLOG().

◆ WaitLSNShmemInit()

void WaitLSNShmemInit ( void  )

Definition at line 52 of file waitlsn.c.

53 {
54  bool found;
55 
56  waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
58  &found);
59  if (!found)
60  {
63  memset(&waitLSNState->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
64  }
65 }
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:453
#define PG_UINT64_MAX
Definition: c.h:584
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:60
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
pg_atomic_uint64 minWaitedLSN
Definition: waitlsn.h:58
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition: waitlsn.c:72
Size WaitLSNShmemSize(void)
Definition: waitlsn.c:41

References MaxBackends, WaitLSNState::minWaitedLSN, pairingheap_initialize(), pg_atomic_init_u64(), PG_UINT64_MAX, WaitLSNState::procInfos, ShmemInitStruct(), WaitLSNState::waitersHeap, waitlsn_cmp(), WaitLSNShmemSize(), and waitLSNState.

Referenced by CreateOrAttachShmemStructs().

◆ WaitLSNShmemSize()

Size WaitLSNShmemSize ( void  )

Definition at line 41 of file waitlsn.c.

42 {
43  Size size;
44 
45  size = offsetof(WaitLSNState, procInfos);
47  return size;
48 }
size_t Size
Definition: c.h:596
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), MaxBackends, mul_size(), WaitLSNState::procInfos, and size.

Referenced by CalculateShmemSize(), and WaitLSNShmemInit().

Variable Documentation

◆ waitLSNState