PostgreSQL Source Code  git master
walreceiverfuncs.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
#include <signal.h>
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiverfuncs.c:

Go to the source code of this file.

Macros

#define WALRCV_STARTUP_TIMEOUT   10
 

Functions

Size WalRcvShmemSize (void)
 
void WalRcvShmemInit (void)
 
bool WalRcvRunning (void)
 
bool WalRcvStreaming (void)
 
void ShutdownWalRcv (void)
 
void RequestXLogStreaming (TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot)
 
XLogRecPtr GetWalRcvFlushRecPtr (XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 
XLogRecPtr GetWalRcvWriteRecPtr (void)
 
int GetReplicationApplyDelay (void)
 
int GetReplicationTransferLatency (void)
 

Variables

WalRcvData * WalRcv = NULL
 

Macro Definition Documentation

◆ WALRCV_STARTUP_TIMEOUT

#define WALRCV_STARTUP_TIMEOUT   10

Definition at line 39 of file walreceiverfuncs.c.

Function Documentation

◆ GetReplicationApplyDelay()

int GetReplicationApplyDelay ( void  )

Definition at line 364 of file walreceiverfuncs.c.

365 {
366  WalRcvData *walrcv = WalRcv;
367  XLogRecPtr receivePtr;
368  XLogRecPtr replayPtr;
369  TimestampTz chunkReplayStartTime;
370 
371  SpinLockAcquire(&walrcv->mutex);
372  receivePtr = walrcv->flushedUpto;
373  SpinLockRelease(&walrcv->mutex);
374 
375  replayPtr = GetXLogReplayRecPtr(NULL);
376 
377  if (receivePtr == replayPtr)
378  return 0;
379 
380  chunkReplayStartTime = GetCurrentChunkReplayStartTime();
381 
382  if (chunkReplayStartTime == 0)
383  return -1;
384 
385  return TimestampDifferenceMilliseconds(chunkReplayStartTime,
386  GetCurrentTimestamp());
387 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1766
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1654
int64 TimestampTz
Definition: timestamp.h:39
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flushedUpto
Definition: walreceiver.h:86
slock_t mutex
Definition: walreceiver.h:146
WalRcvData * WalRcv
uint64 XLogRecPtr
Definition: xlogdefs.h:21
TimestampTz GetCurrentChunkReplayStartTime(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References WalRcvData::flushedUpto, GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, TimestampDifferenceMilliseconds(), and WalRcv.

Referenced by ProcessWalSndrMessage().

◆ GetReplicationTransferLatency()

int GetReplicationTransferLatency ( void  )

Definition at line 394 of file walreceiverfuncs.c.

395 {
396  WalRcvData *walrcv = WalRcv;
397  TimestampTz lastMsgSendTime;
398  TimestampTz lastMsgReceiptTime;
399 
400  SpinLockAcquire(&walrcv->mutex);
401  lastMsgSendTime = walrcv->lastMsgSendTime;
402  lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
403  SpinLockRelease(&walrcv->mutex);
404 
405  return TimestampDifferenceMilliseconds(lastMsgSendTime,
406  lastMsgReceiptTime);
407 }
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:101
TimestampTz lastMsgSendTime
Definition: walreceiver.h:100

References WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, TimestampDifferenceMilliseconds(), and WalRcv.

Referenced by ProcessWalSndrMessage().

◆ GetWalRcvFlushRecPtr()

XLogRecPtr GetWalRcvFlushRecPtr ( XLogRecPtr * latestChunkStart,
TimeLineID * receiveTLI 
)

Definition at line 331 of file walreceiverfuncs.c.

332 {
333  WalRcvData *walrcv = WalRcv;
334  XLogRecPtr recptr;
335 
336  SpinLockAcquire(&walrcv->mutex);
337  recptr = walrcv->flushedUpto;
338  if (latestChunkStart)
339  *latestChunkStart = walrcv->latestChunkStart;
340  if (receiveTLI)
341  *receiveTLI = walrcv->receivedTLI;
342  SpinLockRelease(&walrcv->mutex);
343 
344  return recptr;
345 }
TimeLineID receivedTLI
Definition: walreceiver.h:87
XLogRecPtr latestChunkStart
Definition: walreceiver.h:95
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263

References WalRcvData::flushedUpto, WalRcvData::latestChunkStart, WalRcvData::mutex, WalRcvData::receivedTLI, receiveTLI, SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by CreateRestartPoint(), GetLatestLSN(), GetStandbyFlushRecPtr(), pg_last_wal_receive_lsn(), reserve_wal_for_local_slot(), and WaitForWALToBecomeAvailable().

◆ GetWalRcvWriteRecPtr()

XLogRecPtr GetWalRcvWriteRecPtr ( void  )

Definition at line 352 of file walreceiverfuncs.c.

353 {
354  WalRcvData *walrcv = WalRcv;
355 
356  return pg_atomic_read_u64(&walrcv->writtenUpto);
357 }
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:462
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:154

References pg_atomic_read_u64(), WalRcv, and WalRcvData::writtenUpto.

◆ RequestXLogStreaming()

void RequestXLogStreaming ( TimeLineID  tli,
XLogRecPtr  recptr,
const char *  conninfo,
const char *  slotname,
bool  create_temp_slot 
)

Definition at line 245 of file walreceiverfuncs.c.

247 {
248  WalRcvData *walrcv = WalRcv;
249  bool launch = false;
250  pg_time_t now = (pg_time_t) time(NULL);
251  Latch *latch;
252 
253  /*
254  * We always start at the beginning of the segment. That prevents a broken
255  * segment (i.e., with no records in the first half of a segment) from
256  * being created by XLOG streaming, which might cause trouble later on if
257  * the segment is e.g archived.
258  */
259  if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
260  recptr -= XLogSegmentOffset(recptr, wal_segment_size);
261 
262  SpinLockAcquire(&walrcv->mutex);
263 
264  /* It better be stopped if we try to restart it */
265  Assert(walrcv->walRcvState == WALRCV_STOPPED ||
266  walrcv->walRcvState == WALRCV_WAITING);
267 
268  if (conninfo != NULL)
269  strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
270  else
271  walrcv->conninfo[0] = '\0';
272 
273  /*
274  * Use configured replication slot if present, and ignore the value of
275  * create_temp_slot as the slot name should be persistent. Otherwise, use
276  * create_temp_slot to determine whether this WAL receiver should create a
277  * temporary slot by itself and use it, or not.
278  */
279  if (slotname != NULL && slotname[0] != '\0')
280  {
281  strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
282  walrcv->is_temp_slot = false;
283  }
284  else
285  {
286  walrcv->slotname[0] = '\0';
287  walrcv->is_temp_slot = create_temp_slot;
288  }
289 
290  if (walrcv->walRcvState == WALRCV_STOPPED)
291  {
292  launch = true;
293  walrcv->walRcvState = WALRCV_STARTING;
294  }
295  else
296  walrcv->walRcvState = WALRCV_RESTARTING;
297  walrcv->startTime = now;
298 
299  /*
300  * If this is the first startup of walreceiver (on this timeline),
301  * initialize flushedUpto and latestChunkStart to the starting point.
302  */
303  if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
304  {
305  walrcv->flushedUpto = recptr;
306  walrcv->receivedTLI = tli;
307  walrcv->latestChunkStart = recptr;
308  }
309  walrcv->receiveStart = recptr;
310  walrcv->receiveStartTLI = tli;
311 
312  latch = walrcv->latch;
313 
314  SpinLockRelease(&walrcv->mutex);
315 
316  if (launch)
317  SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
318  else if (latch)
319  SetLatch(latch);
320 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1618
void SetLatch(Latch *latch)
Definition: latch.c:632
Assert(fmt[strlen(fmt) - 1] !='\n')
#define NAMEDATALEN
int64 pg_time_t
Definition: pgtime.h:23
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:181
@ PMSIGNAL_START_WALRECEIVER
Definition: pmsignal.h:41
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
Definition: latch.h:113
TimeLineID receiveStartTLI
Definition: walreceiver.h:77
char slotname[NAMEDATALEN]
Definition: walreceiver.h:126
Latch * latch
Definition: walreceiver.h:144
XLogRecPtr receiveStart
Definition: walreceiver.h:76
bool is_temp_slot
Definition: walreceiver.h:132
pg_time_t startTime
Definition: walreceiver.h:68
WalRcvState walRcvState
Definition: walreceiver.h:66
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:113
#define MAXCONNINFO
Definition: walreceiver.h:38
@ WALRCV_STARTING
Definition: walreceiver.h:49
@ WALRCV_STOPPED
Definition: walreceiver.h:48
@ WALRCV_RESTARTING
Definition: walreceiver.h:53
@ WALRCV_WAITING
Definition: walreceiver.h:52
int wal_segment_size
Definition: xlog.c:143
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)

References Assert(), WalRcvData::conninfo, WalRcvData::flushedUpto, WalRcvData::is_temp_slot, WalRcvData::latch, WalRcvData::latestChunkStart, MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, now(), PMSIGNAL_START_WALRECEIVER, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, SendPostmasterSignal(), SetLatch(), WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, strlcpy(), wal_segment_size, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_WAITING, WalRcvData::walRcvState, and XLogSegmentOffset.

Referenced by WaitForWALToBecomeAvailable().

◆ ShutdownWalRcv()

void ShutdownWalRcv ( void  )

Definition at line 178 of file walreceiverfuncs.c.

179 {
180  WalRcvData *walrcv = WalRcv;
181  pid_t walrcvpid = 0;
182  bool stopped = false;
183 
184  /*
185  * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
186  * mode once it's finished, and will also request postmaster to not
187  * restart itself.
188  */
189  SpinLockAcquire(&walrcv->mutex);
190  switch (walrcv->walRcvState)
191  {
192  case WALRCV_STOPPED:
193  break;
194  case WALRCV_STARTING:
195  walrcv->walRcvState = WALRCV_STOPPED;
196  stopped = true;
197  break;
198 
199  case WALRCV_STREAMING:
200  case WALRCV_WAITING:
201  case WALRCV_RESTARTING:
202  walrcv->walRcvState = WALRCV_STOPPING;
203  /* fall through */
204  case WALRCV_STOPPING:
205  walrcvpid = walrcv->pid;
206  break;
207  }
208  SpinLockRelease(&walrcv->mutex);
209 
210  /* Unnecessary but consistent. */
211  if (stopped)
212  ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
213 
214  /*
215  * Signal walreceiver process if it was still running.
216  */
217  if (walrcvpid != 0)
218  kill(walrcvpid, SIGTERM);
219 
220  /*
221  * Wait for walreceiver to acknowledge its death by setting state to
222  * WALRCV_STOPPED.
223  */
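224  ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
225  while (WalRcvRunning())
226  ConditionVariableSleep(&walrcv->walRcvStoppedCV,
227  WAIT_EVENT_WAL_RECEIVER_EXIT);
228  ConditionVariableCancelSleep();
229 }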
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
pid_t pid
Definition: walreceiver.h:65
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:67
@ WALRCV_STREAMING
Definition: walreceiver.h:51
@ WALRCV_STOPPING
Definition: walreceiver.h:54
bool WalRcvRunning(void)
#define kill(pid, sig)
Definition: win32_port.h:485

References ConditionVariableBroadcast(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), kill, WalRcvData::mutex, WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvRunning(), WalRcvData::walRcvState, and WalRcvData::walRcvStoppedCV.

Referenced by XLogShutdownWalRcv().

◆ WalRcvRunning()

bool WalRcvRunning ( void  )

Definition at line 75 of file walreceiverfuncs.c.

76 {
77  WalRcvData *walrcv = WalRcv;
78  WalRcvState state;
79  pg_time_t startTime;
80 
81  SpinLockAcquire(&walrcv->mutex);
82 
83  state = walrcv->walRcvState;
84  startTime = walrcv->startTime;
85 
86  SpinLockRelease(&walrcv->mutex);
87 
88  /*
89  * If it has taken too long for walreceiver to start up, give up. Setting
90  * the state to STOPPED ensures that if walreceiver later does start up
91  * after all, it will see that it's not supposed to be running and die
92  * without doing anything.
93  */
94  if (state == WALRCV_STARTING)
95  {
96  pg_time_t now = (pg_time_t) time(NULL);
97 
98  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
99  {
100  bool stopped = false;
101 
102  SpinLockAcquire(&walrcv->mutex);
103  if (walrcv->walRcvState == WALRCV_STARTING)
104  {
105  state = walrcv->walRcvState = WALRCV_STOPPED;
106  stopped = true;
107  }
108  SpinLockRelease(&walrcv->mutex);
109 
110  if (stopped)
111  ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
112  }
113  }
114 
115  if (state != WALRCV_STOPPED)
116  return true;
117  else
118  return false;
119 }
Definition: regguts.h:323
WalRcvState
Definition: walreceiver.h:47
#define WALRCV_STARTUP_TIMEOUT

References ConditionVariableBroadcast(), WalRcvData::mutex, now(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, WalRcvData::walRcvState, and WalRcvData::walRcvStoppedCV.

Referenced by ShutdownWalRcv(), and StartupRequestWalReceiverRestart().

◆ WalRcvShmemInit()

void WalRcvShmemInit ( void  )

Definition at line 54 of file walreceiverfuncs.c.

55 {
56  bool found;
57 
58  WalRcv = (WalRcvData *)
59  ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
60 
61  if (!found)
62  {
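63  /* First time through, so initialize */
64  MemSet(WalRcv, 0, WalRcvShmemSize());
65  WalRcv->walRcvState = WALRCV_STOPPED;
66  ConditionVariableInit(&WalRcv->walRcvStoppedCV);
67  SpinLockInit(&WalRcv->mutex);
68  pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
69  WalRcv->latch = NULL;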
70  }
71 }
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:448
#define MemSet(start, val, len)
Definition: c.h:1007
void ConditionVariableInit(ConditionVariable *cv)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:60
Size WalRcvShmemSize(void)

References ConditionVariableInit(), WalRcvData::latch, MemSet, WalRcvData::mutex, pg_atomic_init_u64(), ShmemInitStruct(), SpinLockInit, WalRcv, WALRCV_STOPPED, WalRcvShmemSize(), WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, and WalRcvData::writtenUpto.

Referenced by CreateOrAttachShmemStructs().

◆ WalRcvShmemSize()

Size WalRcvShmemSize ( void  )

Definition at line 43 of file walreceiverfuncs.c.

44 {
45  Size size = 0;
46 
47  size = add_size(size, sizeof(WalRcvData));
48 
49  return size;
50 }
size_t Size
Definition: c.h:592
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), and size.

Referenced by CalculateShmemSize(), and WalRcvShmemInit().

◆ WalRcvStreaming()

bool WalRcvStreaming ( void  )

Definition at line 126 of file walreceiverfuncs.c.

127 {
128  WalRcvData *walrcv = WalRcv;
129  WalRcvState state;
130  pg_time_t startTime;
131 
132  SpinLockAcquire(&walrcv->mutex);
133 
134  state = walrcv->walRcvState;
135  startTime = walrcv->startTime;
136 
137  SpinLockRelease(&walrcv->mutex);
138 
139  /*
140  * If it has taken too long for walreceiver to start up, give up. Setting
141  * the state to STOPPED ensures that if walreceiver later does start up
142  * after all, it will see that it's not supposed to be running and die
143  * without doing anything.
144  */
145  if (state == WALRCV_STARTING)
146  {
147  pg_time_t now = (pg_time_t) time(NULL);
148 
149  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
150  {
151  bool stopped = false;
152 
153  SpinLockAcquire(&walrcv->mutex);
154  if (walrcv->walRcvState == WALRCV_STARTING)
155  {
156  state = walrcv->walRcvState = WALRCV_STOPPED;
157  stopped = true;
158  }
159  SpinLockRelease(&walrcv->mutex);
160 
161  if (stopped)
162  ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
163  }
164  }
165 
166  if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
167  state == WALRCV_RESTARTING)
168  return true;
169  else
170  return false;
171 }

References ConditionVariableBroadcast(), WalRcvData::mutex, now(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, WALRCV_STREAMING, WalRcvData::walRcvState, and WalRcvData::walRcvStoppedCV.

Referenced by FinishWalRecovery(), and WaitForWALToBecomeAvailable().

Variable Documentation

◆ WalRcv