PostgreSQL Source Code  git master
walreceiverfuncs.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
#include <signal.h>
#include "access/xlog_internal.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiverfuncs.c:

Go to the source code of this file.

Macros

#define WALRCV_STARTUP_TIMEOUT   10
 

Functions

Size WalRcvShmemSize (void)
 
void WalRcvShmemInit (void)
 
bool WalRcvRunning (void)
 
bool WalRcvStreaming (void)
 
void ShutdownWalRcv (void)
 
void RequestXLogStreaming (TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot)
 
XLogRecPtr GetWalRcvFlushRecPtr (XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 
XLogRecPtr GetWalRcvWriteRecPtr (void)
 
int GetReplicationApplyDelay (void)
 
int GetReplicationTransferLatency (void)
 

Variables

WalRcvData * WalRcv = NULL
 

Macro Definition Documentation

◆ WALRCV_STARTUP_TIMEOUT

#define WALRCV_STARTUP_TIMEOUT   10

Definition at line 38 of file walreceiverfuncs.c.

Referenced by WalRcvRunning(), and WalRcvStreaming().

Function Documentation

◆ GetReplicationApplyDelay()

int GetReplicationApplyDelay ( void  )

Definition at line 349 of file walreceiverfuncs.c.

References WalRcvData::flushedUpto, GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, TimestampDifferenceMilliseconds(), and WalRcv.

Referenced by ProcessWalSndrMessage(), and walrcv_clear_result().

350 {
351  WalRcvData *walrcv = WalRcv;
352  XLogRecPtr receivePtr;
353  XLogRecPtr replayPtr;
354  TimestampTz chunkReplayStartTime;
355 
356  SpinLockAcquire(&walrcv->mutex);
357  receivePtr = walrcv->flushedUpto;
358  SpinLockRelease(&walrcv->mutex);
359 
360  replayPtr = GetXLogReplayRecPtr(NULL);
361 
362  if (receivePtr == replayPtr)
363  return 0;
364 
365  chunkReplayStartTime = GetCurrentChunkReplayStartTime();
366 
367  if (chunkReplayStartTime == 0)
368  return -1;
369 
370  return TimestampDifferenceMilliseconds(chunkReplayStartTime,
371  GetCurrentTimestamp());
372 }
slock_t mutex
Definition: walreceiver.h:143
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
#define SpinLockAcquire(lock)
Definition: spin.h:62
TimestampTz GetCurrentChunkReplayStartTime(void)
Definition: xlog.c:6216
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11596
#define SpinLockRelease(lock)
Definition: spin.h:64
uint64 XLogRecPtr
Definition: xlogdefs.h:21
WalRcvData * WalRcv
XLogRecPtr flushedUpto
Definition: walreceiver.h:83
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1691

◆ GetReplicationTransferLatency()

int GetReplicationTransferLatency ( void  )

Definition at line 379 of file walreceiverfuncs.c.

References WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, TimestampDifferenceMilliseconds(), and WalRcv.

Referenced by ProcessWalSndrMessage(), and walrcv_clear_result().

380 {
381  WalRcvData *walrcv = WalRcv;
382  TimestampTz lastMsgSendTime;
383  TimestampTz lastMsgReceiptTime;
384 
385  SpinLockAcquire(&walrcv->mutex);
386  lastMsgSendTime = walrcv->lastMsgSendTime;
387  lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
388  SpinLockRelease(&walrcv->mutex);
389 
390  return TimestampDifferenceMilliseconds(lastMsgSendTime,
391  lastMsgReceiptTime);
392 }
slock_t mutex
Definition: walreceiver.h:143
int64 TimestampTz
Definition: timestamp.h:39
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1691

◆ GetWalRcvFlushRecPtr()

XLogRecPtr GetWalRcvFlushRecPtr ( XLogRecPtr *  latestChunkStart,
TimeLineID *  receiveTLI 
)

Definition at line 316 of file walreceiverfuncs.c.

References WalRcvData::flushedUpto, WalRcvData::latestChunkStart, WalRcvData::mutex, WalRcvData::receivedTLI, SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by CreateRestartPoint(), GetStandbyFlushRecPtr(), pg_last_wal_receive_lsn(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

317 {
318  WalRcvData *walrcv = WalRcv;
319  XLogRecPtr recptr;
320 
321  SpinLockAcquire(&walrcv->mutex);
322  recptr = walrcv->flushedUpto;
323  if (latestChunkStart)
324  *latestChunkStart = walrcv->latestChunkStart;
325  if (receiveTLI)
326  *receiveTLI = walrcv->receivedTLI;
327  SpinLockRelease(&walrcv->mutex);
328 
329  return recptr;
330 }
slock_t mutex
Definition: walreceiver.h:143
TimeLineID receivedTLI
Definition: walreceiver.h:84
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr latestChunkStart
Definition: walreceiver.h:92
#define SpinLockRelease(lock)
Definition: spin.h:64
static TimeLineID receiveTLI
Definition: xlog.c:215
uint64 XLogRecPtr
Definition: xlogdefs.h:21
WalRcvData * WalRcv
XLogRecPtr flushedUpto
Definition: walreceiver.h:83

◆ GetWalRcvWriteRecPtr()

XLogRecPtr GetWalRcvWriteRecPtr ( void  )

Definition at line 337 of file walreceiverfuncs.c.

References pg_atomic_read_u64(), WalRcv, and WalRcvData::writtenUpto.

Referenced by walrcv_clear_result().

338 {
339  WalRcvData *walrcv = WalRcv;
340 
341  return pg_atomic_read_u64(&walrcv->writtenUpto);
342 }
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:151
WalRcvData * WalRcv
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429

◆ RequestXLogStreaming()

void RequestXLogStreaming ( TimeLineID  tli,
XLogRecPtr  recptr,
const char *  conninfo,
const char *  slotname,
bool  create_temp_slot 
)

Definition at line 230 of file walreceiverfuncs.c.

References Assert, WalRcvData::conninfo, WalRcvData::flushedUpto, WalRcvData::is_temp_slot, WalRcvData::latch, WalRcvData::latestChunkStart, MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, now(), PMSIGNAL_START_WALRECEIVER, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, SendPostmasterSignal(), SetLatch(), WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, strlcpy(), wal_segment_size, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_WAITING, WalRcvData::walRcvState, and XLogSegmentOffset.

Referenced by WaitForWALToBecomeAvailable(), and walrcv_clear_result().

232 {
233  WalRcvData *walrcv = WalRcv;
234  bool launch = false;
235  pg_time_t now = (pg_time_t) time(NULL);
236  Latch *latch;
237 
238  /*
239  * We always start at the beginning of the segment. That prevents a broken
240  * segment (i.e., with no records in the first half of a segment) from
241  * being created by XLOG streaming, which might cause trouble later on if
242  * the segment is e.g archived.
243  */
244  if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
245  recptr -= XLogSegmentOffset(recptr, wal_segment_size);
246 
247  SpinLockAcquire(&walrcv->mutex);
248 
249  /* It better be stopped if we try to restart it */
250  Assert(walrcv->walRcvState == WALRCV_STOPPED ||
251  walrcv->walRcvState == WALRCV_WAITING);
252 
253  if (conninfo != NULL)
254  strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
255  else
256  walrcv->conninfo[0] = '\0';
257 
258  /*
259  * Use configured replication slot if present, and ignore the value of
260  * create_temp_slot as the slot name should be persistent. Otherwise, use
261  * create_temp_slot to determine whether this WAL receiver should create a
262  * temporary slot by itself and use it, or not.
263  */
264  if (slotname != NULL && slotname[0] != '\0')
265  {
266  strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
267  walrcv->is_temp_slot = false;
268  }
269  else
270  {
271  walrcv->slotname[0] = '\0';
272  walrcv->is_temp_slot = create_temp_slot;
273  }
274 
275  if (walrcv->walRcvState == WALRCV_STOPPED)
276  {
277  launch = true;
278  walrcv->walRcvState = WALRCV_STARTING;
279  }
280  else
281  walrcv->walRcvState = WALRCV_RESTARTING;
282  walrcv->startTime = now;
283 
284  /*
285  * If this is the first startup of walreceiver (on this timeline),
286  * initialize flushedUpto and latestChunkStart to the starting point.
287  */
288  if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
289  {
290  walrcv->flushedUpto = recptr;
291  walrcv->receivedTLI = tli;
292  walrcv->latestChunkStart = recptr;
293  }
294  walrcv->receiveStart = recptr;
295  walrcv->receiveStartTLI = tli;
296 
297  latch = walrcv->latch;
298 
299  SpinLockRelease(&walrcv->mutex);
300 
301  if (launch)
302  SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
303  else if (latch)
304  SetLatch(latch);
305 }
int64 pg_time_t
Definition: pgtime.h:23
slock_t mutex
Definition: walreceiver.h:143
int wal_segment_size
Definition: xlog.c:118
WalRcvState walRcvState
Definition: walreceiver.h:64
TimeLineID receivedTLI
Definition: walreceiver.h:84
void SetLatch(Latch *latch)
Definition: latch.c:567
pg_time_t startTime
Definition: walreceiver.h:65
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define MAXCONNINFO
Definition: walreceiver.h:36
XLogRecPtr latestChunkStart
Definition: walreceiver.h:92
Latch
Definition: latch.h:110
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
Latch * latch
Definition: walreceiver.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define Assert(condition)
Definition: c.h:804
bool is_temp_slot
Definition: walreceiver.h:129
WalRcvData * WalRcv
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:153
XLogRecPtr receiveStart
Definition: walreceiver.h:73
XLogRecPtr flushedUpto
Definition: walreceiver.h:83
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
char slotname[NAMEDATALEN]
Definition: walreceiver.h:123
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:110

◆ ShutdownWalRcv()

void ShutdownWalRcv ( void  )

Definition at line 164 of file walreceiverfuncs.c.

References HandleStartupProcInterrupts(), kill, WalRcvData::mutex, pg_usleep(), WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvRunning(), and WalRcvData::walRcvState.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

165 {
166  WalRcvData *walrcv = WalRcv;
167  pid_t walrcvpid = 0;
168 
169  /*
170  * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
171  * mode once it's finished, and will also request postmaster to not
172  * restart itself.
173  */
174  SpinLockAcquire(&walrcv->mutex);
175  switch (walrcv->walRcvState)
176  {
177  case WALRCV_STOPPED:
178  break;
179  case WALRCV_STARTING:
180  walrcv->walRcvState = WALRCV_STOPPED;
181  break;
182 
183  case WALRCV_STREAMING:
184  case WALRCV_WAITING:
185  case WALRCV_RESTARTING:
186  walrcv->walRcvState = WALRCV_STOPPING;
187  /* fall through */
188  case WALRCV_STOPPING:
189  walrcvpid = walrcv->pid;
190  break;
191  }
192  SpinLockRelease(&walrcv->mutex);
193 
194  /*
195  * Signal walreceiver process if it was still running.
196  */
197  if (walrcvpid != 0)
198  kill(walrcvpid, SIGTERM);
199 
200  /*
201  * Wait for walreceiver to acknowledge its death by setting state to
202  * WALRCV_STOPPED.
203  */
204  while (WalRcvRunning())
205  {
206  /*
207  * This possibly-long loop needs to handle interrupts of startup
208  * process.
209  */
210  HandleStartupProcInterrupts();
211 
212  pg_usleep(100000); /* 100ms */
213  }
214 }
slock_t mutex
Definition: walreceiver.h:143
WalRcvState walRcvState
Definition: walreceiver.h:64
#define kill(pid, sig)
Definition: win32_port.h:454
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
pid_t pid
Definition: walreceiver.h:63
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv
void HandleStartupProcInterrupts(void)
Definition: startup.c:135
bool WalRcvRunning(void)

◆ WalRcvRunning()

bool WalRcvRunning ( void  )

Definition at line 73 of file walreceiverfuncs.c.

References WalRcvData::mutex, now(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, and WalRcvData::walRcvState.

Referenced by ShutdownWalRcv(), StartupRequestWalReceiverRestart(), and walrcv_clear_result().

74 {
75  WalRcvData *walrcv = WalRcv;
76  WalRcvState state;
77  pg_time_t startTime;
78 
79  SpinLockAcquire(&walrcv->mutex);
80 
81  state = walrcv->walRcvState;
82  startTime = walrcv->startTime;
83 
84  SpinLockRelease(&walrcv->mutex);
85 
86  /*
87  * If it has taken too long for walreceiver to start up, give up. Setting
88  * the state to STOPPED ensures that if walreceiver later does start up
89  * after all, it will see that it's not supposed to be running and die
90  * without doing anything.
91  */
92  if (state == WALRCV_STARTING)
93  {
94  pg_time_t now = (pg_time_t) time(NULL);
95 
96  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
97  {
98  SpinLockAcquire(&walrcv->mutex);
99 
100  if (walrcv->walRcvState == WALRCV_STARTING)
101  state = walrcv->walRcvState = WALRCV_STOPPED;
102 
103  SpinLockRelease(&walrcv->mutex);
104  }
105  }
106 
107  if (state != WALRCV_STOPPED)
108  return true;
109  else
110  return false;
111 }
int64 pg_time_t
Definition: pgtime.h:23
slock_t mutex
Definition: walreceiver.h:143
WalRcvState walRcvState
Definition: walreceiver.h:64
pg_time_t startTime
Definition: walreceiver.h:65
WalRcvState
Definition: walreceiver.h:44
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WALRCV_STARTUP_TIMEOUT
Definition: walreceiverfuncs.c:38
WalRcvData * WalRcv
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542

◆ WalRcvShmemInit()

void WalRcvShmemInit ( void  )

Definition at line 53 of file walreceiverfuncs.c.

References WalRcvData::latch, MemSet, WalRcvData::mutex, pg_atomic_init_u64(), ShmemInitStruct(), SpinLockInit, WALRCV_STOPPED, WalRcvShmemSize(), WalRcvData::walRcvState, and WalRcvData::writtenUpto.

Referenced by CreateSharedMemoryAndSemaphores(), and walrcv_clear_result().

54 {
55  bool found;
56 
57  WalRcv = (WalRcvData *)
58  ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
59 
60  if (!found)
61  {
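62  /* First time through, so initialize */
63  MemSet(WalRcv, 0, WalRcvShmemSize());
64  WalRcv->walRcvState = WALRCV_STOPPED;
65  SpinLockInit(&WalRcv->mutex);
66  pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
67  WalRcv->latch = NULL;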
68  }
69 }
slock_t mutex
Definition: walreceiver.h:143
WalRcvState walRcvState
Definition: walreceiver.h:64
#define SpinLockInit(lock)
Definition: spin.h:60
#define MemSet(start, val, len)
Definition: c.h:1008
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:151
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:415
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Latch * latch
Definition: walreceiver.h:141
Size WalRcvShmemSize(void)
WalRcvData * WalRcv

◆ WalRcvShmemSize()

Size WalRcvShmemSize ( void  )

Definition at line 42 of file walreceiverfuncs.c.

References add_size().

Referenced by CreateSharedMemoryAndSemaphores(), walrcv_clear_result(), and WalRcvShmemInit().

43 {
44  Size size = 0;
45 
46  size = add_size(size, sizeof(WalRcvData));
47 
48  return size;
49 }
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:540

◆ WalRcvStreaming()

bool WalRcvStreaming ( void  )

Definition at line 118 of file walreceiverfuncs.c.

References WalRcvData::mutex, now(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, WALRCV_STREAMING, and WalRcvData::walRcvState.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

119 {
120  WalRcvData *walrcv = WalRcv;
121  WalRcvState state;
122  pg_time_t startTime;
123 
124  SpinLockAcquire(&walrcv->mutex);
125 
126  state = walrcv->walRcvState;
127  startTime = walrcv->startTime;
128 
129  SpinLockRelease(&walrcv->mutex);
130 
131  /*
132  * If it has taken too long for walreceiver to start up, give up. Setting
133  * the state to STOPPED ensures that if walreceiver later does start up
134  * after all, it will see that it's not supposed to be running and die
135  * without doing anything.
136  */
137  if (state == WALRCV_STARTING)
138  {
139  pg_time_t now = (pg_time_t) time(NULL);
140 
141  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
142  {
143  SpinLockAcquire(&walrcv->mutex);
144 
145  if (walrcv->walRcvState == WALRCV_STARTING)
146  state = walrcv->walRcvState = WALRCV_STOPPED;
147 
148  SpinLockRelease(&walrcv->mutex);
149  }
150  }
151 
152  if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
153  state == WALRCV_RESTARTING)
154  return true;
155  else
156  return false;
157 }
int64 pg_time_t
Definition: pgtime.h:23
slock_t mutex
Definition: walreceiver.h:143
WalRcvState walRcvState
Definition: walreceiver.h:64
pg_time_t startTime
Definition: walreceiver.h:65
WalRcvState
Definition: walreceiver.h:44
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WALRCV_STARTUP_TIMEOUT
Definition: walreceiverfuncs.c:38
WalRcvData * WalRcv
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542

Variable Documentation

◆ WalRcv