PostgreSQL Source Code  git master
walreceiverfuncs.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
#include <signal.h>
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "pgstat.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiverfuncs.c:

Go to the source code of this file.

Macros

#define WALRCV_STARTUP_TIMEOUT   10
 

Functions

Size WalRcvShmemSize (void)
 
void WalRcvShmemInit (void)
 
bool WalRcvRunning (void)
 
bool WalRcvStreaming (void)
 
void ShutdownWalRcv (void)
 
void RequestXLogStreaming (TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot)
 
XLogRecPtr GetWalRcvFlushRecPtr (XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 
XLogRecPtr GetWalRcvWriteRecPtr (void)
 
int GetReplicationApplyDelay (void)
 
int GetReplicationTransferLatency (void)
 

Variables

WalRcvData * WalRcv = NULL
 

Macro Definition Documentation

◆ WALRCV_STARTUP_TIMEOUT

#define WALRCV_STARTUP_TIMEOUT   10

Definition at line 40 of file walreceiverfuncs.c.

Function Documentation

◆ GetReplicationApplyDelay()

int GetReplicationApplyDelay ( void  )

Definition at line 365 of file walreceiverfuncs.c.

366 {
367  WalRcvData *walrcv = WalRcv;
368  XLogRecPtr receivePtr;
369  XLogRecPtr replayPtr;
370  TimestampTz chunkReplayStartTime;
371 
372  SpinLockAcquire(&walrcv->mutex);
373  receivePtr = walrcv->flushedUpto;
374  SpinLockRelease(&walrcv->mutex);
375 
376  replayPtr = GetXLogReplayRecPtr(NULL);
377 
378  if (receivePtr == replayPtr)
379  return 0;
380 
381  chunkReplayStartTime = GetCurrentChunkReplayStartTime();
382 
383  if (chunkReplayStartTime == 0)
384  return -1;
385 
386  return TimestampDifferenceMilliseconds(chunkReplayStartTime,
387  GetCurrentTimestamp());
388 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1695
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flushedUpto
Definition: walreceiver.h:87
slock_t mutex
Definition: walreceiver.h:147
WalRcvData * WalRcv
uint64 XLogRecPtr
Definition: xlogdefs.h:21
TimestampTz GetCurrentChunkReplayStartTime(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References WalRcvData::flushedUpto, GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, TimestampDifferenceMilliseconds(), and WalRcv.

Referenced by ProcessWalSndrMessage().

◆ GetReplicationTransferLatency()

int GetReplicationTransferLatency ( void  )

Definition at line 395 of file walreceiverfuncs.c.

396 {
397  WalRcvData *walrcv = WalRcv;
398  TimestampTz lastMsgSendTime;
399  TimestampTz lastMsgReceiptTime;
400 
401  SpinLockAcquire(&walrcv->mutex);
402  lastMsgSendTime = walrcv->lastMsgSendTime;
403  lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
404  SpinLockRelease(&walrcv->mutex);
405 
406  return TimestampDifferenceMilliseconds(lastMsgSendTime,
407  lastMsgReceiptTime);
408 }
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:102
TimestampTz lastMsgSendTime
Definition: walreceiver.h:101

References WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, TimestampDifferenceMilliseconds(), and WalRcv.

Referenced by ProcessWalSndrMessage().

◆ GetWalRcvFlushRecPtr()

XLogRecPtr GetWalRcvFlushRecPtr ( XLogRecPtr latestChunkStart,
TimeLineID receiveTLI 
)

Definition at line 332 of file walreceiverfuncs.c.

333 {
334  WalRcvData *walrcv = WalRcv;
335  XLogRecPtr recptr;
336 
337  SpinLockAcquire(&walrcv->mutex);
338  recptr = walrcv->flushedUpto;
339  if (latestChunkStart)
340  *latestChunkStart = walrcv->latestChunkStart;
341  if (receiveTLI)
342  *receiveTLI = walrcv->receivedTLI;
343  SpinLockRelease(&walrcv->mutex);
344 
345  return recptr;
346 }
TimeLineID receivedTLI
Definition: walreceiver.h:88
XLogRecPtr latestChunkStart
Definition: walreceiver.h:96
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263

References WalRcvData::flushedUpto, WalRcvData::latestChunkStart, WalRcvData::mutex, WalRcvData::receivedTLI, receiveTLI, SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by CreateRestartPoint(), GetStandbyFlushRecPtr(), pg_last_wal_receive_lsn(), and WaitForWALToBecomeAvailable().

◆ GetWalRcvWriteRecPtr()

XLogRecPtr GetWalRcvWriteRecPtr ( void  )

Definition at line 353 of file walreceiverfuncs.c.

354 {
355  WalRcvData *walrcv = WalRcv;
356 
357  return pg_atomic_read_u64(&walrcv->writtenUpto);
358 }
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:424
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:155

References pg_atomic_read_u64(), WalRcv, and WalRcvData::writtenUpto.

◆ RequestXLogStreaming()

void RequestXLogStreaming ( TimeLineID  tli,
XLogRecPtr  recptr,
const char *  conninfo,
const char *  slotname,
bool  create_temp_slot 
)

Definition at line 246 of file walreceiverfuncs.c.

248 {
249  WalRcvData *walrcv = WalRcv;
250  bool launch = false;
251  pg_time_t now = (pg_time_t) time(NULL);
252  Latch *latch;
253 
254  /*
255  * We always start at the beginning of the segment. That prevents a broken
256  * segment (i.e., with no records in the first half of a segment) from
257  * being created by XLOG streaming, which might cause trouble later on if
258  * the segment is e.g archived.
259  */
260  if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
261  recptr -= XLogSegmentOffset(recptr, wal_segment_size);
262 
263  SpinLockAcquire(&walrcv->mutex);
264 
265  /* It better be stopped if we try to restart it */
266  Assert(walrcv->walRcvState == WALRCV_STOPPED ||
267  walrcv->walRcvState == WALRCV_WAITING);
268 
269  if (conninfo != NULL)
270  strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
271  else
272  walrcv->conninfo[0] = '\0';
273 
274  /*
275  * Use configured replication slot if present, and ignore the value of
276  * create_temp_slot as the slot name should be persistent. Otherwise, use
277  * create_temp_slot to determine whether this WAL receiver should create a
278  * temporary slot by itself and use it, or not.
279  */
280  if (slotname != NULL && slotname[0] != '\0')
281  {
282  strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
283  walrcv->is_temp_slot = false;
284  }
285  else
286  {
287  walrcv->slotname[0] = '\0';
288  walrcv->is_temp_slot = create_temp_slot;
289  }
290 
291  if (walrcv->walRcvState == WALRCV_STOPPED)
292  {
293  launch = true;
294  walrcv->walRcvState = WALRCV_STARTING;
295  }
296  else
297  walrcv->walRcvState = WALRCV_RESTARTING;
298  walrcv->startTime = now;
299 
300  /*
301  * If this is the first startup of walreceiver (on this timeline),
302  * initialize flushedUpto and latestChunkStart to the starting point.
303  */
304  if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
305  {
306  walrcv->flushedUpto = recptr;
307  walrcv->receivedTLI = tli;
308  walrcv->latestChunkStart = recptr;
309  }
310  walrcv->receiveStart = recptr;
311  walrcv->receiveStartTLI = tli;
312 
313  latch = walrcv->latch;
314 
315  SpinLockRelease(&walrcv->mutex);
316 
317  if (launch)
318  SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
319  else if (latch)
320  SetLatch(latch);
321 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
void SetLatch(Latch *latch)
Definition: latch.c:605
Assert(fmt[strlen(fmt) - 1] !='\n')
#define NAMEDATALEN
int64 pg_time_t
Definition: pgtime.h:23
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:181
@ PMSIGNAL_START_WALRECEIVER
Definition: pmsignal.h:41
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
Definition: latch.h:111
TimeLineID receiveStartTLI
Definition: walreceiver.h:78
char slotname[NAMEDATALEN]
Definition: walreceiver.h:127
Latch * latch
Definition: walreceiver.h:145
XLogRecPtr receiveStart
Definition: walreceiver.h:77
bool is_temp_slot
Definition: walreceiver.h:133
pg_time_t startTime
Definition: walreceiver.h:69
WalRcvState walRcvState
Definition: walreceiver.h:67
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:114
#define MAXCONNINFO
Definition: walreceiver.h:39
@ WALRCV_STARTING
Definition: walreceiver.h:50
@ WALRCV_STOPPED
Definition: walreceiver.h:49
@ WALRCV_RESTARTING
Definition: walreceiver.h:54
@ WALRCV_WAITING
Definition: walreceiver.h:53
int wal_segment_size
Definition: xlog.c:146
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)

References Assert(), WalRcvData::conninfo, WalRcvData::flushedUpto, WalRcvData::is_temp_slot, WalRcvData::latch, WalRcvData::latestChunkStart, MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, now(), PMSIGNAL_START_WALRECEIVER, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, SendPostmasterSignal(), SetLatch(), WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, strlcpy(), wal_segment_size, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_WAITING, WalRcvData::walRcvState, and XLogSegmentOffset.

Referenced by WaitForWALToBecomeAvailable().

◆ ShutdownWalRcv()

void ShutdownWalRcv ( void  )

Definition at line 179 of file walreceiverfuncs.c.

180 {
181  WalRcvData *walrcv = WalRcv;
182  pid_t walrcvpid = 0;
183  bool stopped = false;
184 
185  /*
186  * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
187  * mode once it's finished, and will also request postmaster to not
188  * restart itself.
189  */
190  SpinLockAcquire(&walrcv->mutex);
191  switch (walrcv->walRcvState)
192  {
193  case WALRCV_STOPPED:
194  break;
195  case WALRCV_STARTING:
196  walrcv->walRcvState = WALRCV_STOPPED;
197  stopped = true;
198  break;
199 
200  case WALRCV_STREAMING:
201  case WALRCV_WAITING:
202  case WALRCV_RESTARTING:
203  walrcv->walRcvState = WALRCV_STOPPING;
204  /* fall through */
205  case WALRCV_STOPPING:
206  walrcvpid = walrcv->pid;
207  break;
208  }
209  SpinLockRelease(&walrcv->mutex);
210 
211  /* Unnecessary but consistent. */
212  if (stopped)
213  ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
214 
215  /*
216  * Signal walreceiver process if it was still running.
217  */
218  if (walrcvpid != 0)
219  kill(walrcvpid, SIGTERM);
220 
221  /*
222  * Wait for walreceiver to acknowledge its death by setting state to
223  * WALRCV_STOPPED.
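224  */
225  ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
226  while (WalRcvRunning())
227  ConditionVariableSleep(&walrcv->walRcvStoppedCV,
228  WAIT_EVENT_WAL_RECEIVER_EXIT);
229  ConditionVariableCancelSleep();
230 }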
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
pid_t pid
Definition: walreceiver.h:66
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:68
@ WALRCV_STREAMING
Definition: walreceiver.h:52
@ WALRCV_STOPPING
Definition: walreceiver.h:55
bool WalRcvRunning(void)
#define kill(pid, sig)
Definition: win32_port.h:485

References ConditionVariableBroadcast(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), kill, WalRcvData::mutex, WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvRunning(), WalRcvData::walRcvState, and WalRcvData::walRcvStoppedCV.

Referenced by XLogShutdownWalRcv().

◆ WalRcvRunning()

bool WalRcvRunning ( void  )

Definition at line 76 of file walreceiverfuncs.c.

77 {
78  WalRcvData *walrcv = WalRcv;
79  WalRcvState state;
80  pg_time_t startTime;
81 
82  SpinLockAcquire(&walrcv->mutex);
83 
84  state = walrcv->walRcvState;
85  startTime = walrcv->startTime;
86 
87  SpinLockRelease(&walrcv->mutex);
88 
89  /*
90  * If it has taken too long for walreceiver to start up, give up. Setting
91  * the state to STOPPED ensures that if walreceiver later does start up
92  * after all, it will see that it's not supposed to be running and die
93  * without doing anything.
94  */
95  if (state == WALRCV_STARTING)
96  {
97  pg_time_t now = (pg_time_t) time(NULL);
98 
99  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
100  {
101  bool stopped = false;
102 
103  SpinLockAcquire(&walrcv->mutex);
104  if (walrcv->walRcvState == WALRCV_STARTING)
105  {
106  state = walrcv->walRcvState = WALRCV_STOPPED;
107  stopped = true;
108  }
109  SpinLockRelease(&walrcv->mutex);
110 
111  if (stopped)
112  ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
113  }
114  }
115 
116  if (state != WALRCV_STOPPED)
117  return true;
118  else
119  return false;
120 }
Definition: regguts.h:323
WalRcvState
Definition: walreceiver.h:48
#define WALRCV_STARTUP_TIMEOUT

References ConditionVariableBroadcast(), WalRcvData::mutex, now(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, WalRcvData::walRcvState, and WalRcvData::walRcvStoppedCV.

Referenced by ShutdownWalRcv(), and StartupRequestWalReceiverRestart().

◆ WalRcvShmemInit()

void WalRcvShmemInit ( void  )

Definition at line 55 of file walreceiverfuncs.c.

56 {
57  bool found;
58 
59  WalRcv = (WalRcvData *)
60  ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
61 
62  if (!found)
63  {
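64  /* First time through, so initialize */
65  MemSet(WalRcv, 0, WalRcvShmemSize());
66  WalRcv->walRcvState = WALRCV_STOPPED;
67  ConditionVariableInit(&WalRcv->walRcvStoppedCV);
68  SpinLockInit(&WalRcv->mutex);
69  pg_atomic_init_u64(&WalRcv->writtenUpto, 0);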
70  WalRcv->latch = NULL;
71  }
72 }
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:410
#define MemSet(start, val, len)
Definition: c.h:1009
void ConditionVariableInit(ConditionVariable *cv)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
#define SpinLockInit(lock)
Definition: spin.h:60
Size WalRcvShmemSize(void)

References ConditionVariableInit(), WalRcvData::latch, MemSet, WalRcvData::mutex, pg_atomic_init_u64(), ShmemInitStruct(), SpinLockInit, WalRcv, WALRCV_STOPPED, WalRcvShmemSize(), WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, and WalRcvData::writtenUpto.

Referenced by CreateSharedMemoryAndSemaphores().

◆ WalRcvShmemSize()

Size WalRcvShmemSize ( void  )

Definition at line 44 of file walreceiverfuncs.c.

45 {
46  Size size = 0;
47 
48  size = add_size(size, sizeof(WalRcvData));
49 
50  return size;
51 }
size_t Size
Definition: c.h:594
Size add_size(Size s1, Size s2)
Definition: shmem.c:502

References add_size().

Referenced by CalculateShmemSize(), and WalRcvShmemInit().

◆ WalRcvStreaming()

bool WalRcvStreaming ( void  )

Definition at line 127 of file walreceiverfuncs.c.

128 {
129  WalRcvData *walrcv = WalRcv;
130  WalRcvState state;
131  pg_time_t startTime;
132 
133  SpinLockAcquire(&walrcv->mutex);
134 
135  state = walrcv->walRcvState;
136  startTime = walrcv->startTime;
137 
138  SpinLockRelease(&walrcv->mutex);
139 
140  /*
141  * If it has taken too long for walreceiver to start up, give up. Setting
142  * the state to STOPPED ensures that if walreceiver later does start up
143  * after all, it will see that it's not supposed to be running and die
144  * without doing anything.
145  */
146  if (state == WALRCV_STARTING)
147  {
148  pg_time_t now = (pg_time_t) time(NULL);
149 
150  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
151  {
152  bool stopped = false;
153 
154  SpinLockAcquire(&walrcv->mutex);
155  if (walrcv->walRcvState == WALRCV_STARTING)
156  {
157  state = walrcv->walRcvState = WALRCV_STOPPED;
158  stopped = true;
159  }
160  SpinLockRelease(&walrcv->mutex);
161 
162  if (stopped)
163  ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
164  }
165  }
166 
167  if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
168  state == WALRCV_RESTARTING)
169  return true;
170  else
171  return false;
172 }

References ConditionVariableBroadcast(), WalRcvData::mutex, now(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, WALRCV_STREAMING, WalRcvData::walRcvState, and WalRcvData::walRcvStoppedCV.

Referenced by FinishWalRecovery(), and WaitForWALToBecomeAvailable().

Variable Documentation

◆ WalRcv