PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walreceiverfuncs.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
#include <signal.h>
#include "access/xlog_internal.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiverfuncs.c:

Go to the source code of this file.

Macros

#define WALRCV_STARTUP_TIMEOUT   10
 

Functions

Size WalRcvShmemSize (void)
 
void WalRcvShmemInit (void)
 
bool WalRcvRunning (void)
 
bool WalRcvStreaming (void)
 
void ShutdownWalRcv (void)
 
void RequestXLogStreaming (TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname)
 
XLogRecPtr GetWalRcvWriteRecPtr (XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 
int GetReplicationApplyDelay (void)
 
int GetReplicationTransferLatency (void)
 

Variables

WalRcvData * WalRcv = NULL
 

Macro Definition Documentation

#define WALRCV_STARTUP_TIMEOUT   10

Definition at line 38 of file walreceiverfuncs.c.

Referenced by WalRcvRunning(), and WalRcvStreaming().

Function Documentation

int GetReplicationApplyDelay ( void  )

Definition at line 318 of file walreceiverfuncs.c.

References GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), WalRcvData::mutex, WalRcvData::receivedUpto, SpinLockAcquire, SpinLockRelease, TimestampDifference(), and WalRcv.

Referenced by ProcessWalSndrMessage().

319 {
320  WalRcvData *walrcv = WalRcv;
321  XLogRecPtr receivePtr;
322  XLogRecPtr replayPtr;
323 
324  long secs;
325  int usecs;
326 
327  TimestampTz chunkReplayStartTime;
328 
329  SpinLockAcquire(&walrcv->mutex);
330  receivePtr = walrcv->receivedUpto;
331  SpinLockRelease(&walrcv->mutex);
332 
333  replayPtr = GetXLogReplayRecPtr(NULL);
334 
335  if (receivePtr == replayPtr)
336  return 0;
337 
338  chunkReplayStartTime = GetCurrentChunkReplayStartTime();
339 
340  if (chunkReplayStartTime == 0)
341  return -1;
342 
343  TimestampDifference(chunkReplayStartTime,
344  GetCurrentTimestamp(),
345  &secs, &usecs);
346 
347  return (((int) secs * 1000) + (usecs / 1000));
348 }
slock_t mutex
Definition: walreceiver.h:129
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
#define SpinLockAcquire(lock)
Definition: spin.h:62
TimestampTz GetCurrentChunkReplayStartTime(void)
Definition: xlog.c:6176
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11177
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
#define SpinLockRelease(lock)
Definition: spin.h:64
uint64 XLogRecPtr
Definition: xlogdefs.h:21
WalRcvData * WalRcv
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
int GetReplicationTransferLatency ( void  )

Definition at line 355 of file walreceiverfuncs.c.

References WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, TimestampDifference(), and WalRcv.

Referenced by ProcessWalSndrMessage().

356 {
357  WalRcvData *walrcv = WalRcv;
358 
359  TimestampTz lastMsgSendTime;
360  TimestampTz lastMsgReceiptTime;
361 
362  long secs = 0;
363  int usecs = 0;
364  int ms;
365 
366  SpinLockAcquire(&walrcv->mutex);
367  lastMsgSendTime = walrcv->lastMsgSendTime;
368  lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
369  SpinLockRelease(&walrcv->mutex);
370 
371  TimestampDifference(lastMsgSendTime,
372  lastMsgReceiptTime,
373  &secs, &usecs);
374 
375  ms = ((int) secs * 1000) + (usecs / 1000);
376 
377  return ms;
378 }
slock_t mutex
Definition: walreceiver.h:129
int64 TimestampTz
Definition: timestamp.h:39
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
XLogRecPtr GetWalRcvWriteRecPtr ( XLogRecPtr *  latestChunkStart,
TimeLineID *  receiveTLI 
)

Definition at line 297 of file walreceiverfuncs.c.

References WalRcvData::latestChunkStart, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by CreateRestartPoint(), GetStandbyFlushRecPtr(), pg_last_wal_receive_lsn(), and WaitForWALToBecomeAvailable().

298 {
299  WalRcvData *walrcv = WalRcv;
300  XLogRecPtr recptr;
301 
302  SpinLockAcquire(&walrcv->mutex);
303  recptr = walrcv->receivedUpto;
304  if (latestChunkStart)
305  *latestChunkStart = walrcv->latestChunkStart;
306  if (receiveTLI)
307  *receiveTLI = walrcv->receivedTLI;
308  SpinLockRelease(&walrcv->mutex);
309 
310  return recptr;
311 }
slock_t mutex
Definition: walreceiver.h:129
TimeLineID receivedTLI
Definition: walreceiver.h:83
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr latestChunkStart
Definition: walreceiver.h:91
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
#define SpinLockRelease(lock)
Definition: spin.h:64
static TimeLineID receiveTLI
Definition: xlog.c:203
uint64 XLogRecPtr
Definition: xlogdefs.h:21
WalRcvData * WalRcv
void RequestXLogStreaming ( TimeLineID  tli,
XLogRecPtr  recptr,
const char *  conninfo,
const char *  slotname 
)

Definition at line 223 of file walreceiverfuncs.c.

References Assert, WalRcvData::conninfo, WalRcvData::latch, WalRcvData::latestChunkStart, MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, now(), PMSIGNAL_START_WALRECEIVER, WalRcvData::receivedTLI, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, SendPostmasterSignal(), SetLatch(), WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, strlcpy(), wal_segment_size, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_WAITING, WalRcvData::walRcvState, and XLogSegmentOffset.

Referenced by WaitForWALToBecomeAvailable().

225 {
226  WalRcvData *walrcv = WalRcv;
227  bool launch = false;
228  pg_time_t now = (pg_time_t) time(NULL);
229  Latch *latch;
230 
231  /*
232  * We always start at the beginning of the segment. That prevents a broken
233  * segment (i.e., with no records in the first half of a segment) from
234  * being created by XLOG streaming, which might cause trouble later on if
235  * the segment is e.g archived.
236  */
237  if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
238  recptr -= XLogSegmentOffset(recptr, wal_segment_size);
239 
240  SpinLockAcquire(&walrcv->mutex);
241 
242  /* It better be stopped if we try to restart it */
243  Assert(walrcv->walRcvState == WALRCV_STOPPED ||
244  walrcv->walRcvState == WALRCV_WAITING);
245 
246  if (conninfo != NULL)
247  strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
248  else
249  walrcv->conninfo[0] = '\0';
250 
251  if (slotname != NULL)
252  strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
253  else
254  walrcv->slotname[0] = '\0';
255 
256  if (walrcv->walRcvState == WALRCV_STOPPED)
257  {
258  launch = true;
259  walrcv->walRcvState = WALRCV_STARTING;
260  }
261  else
262  walrcv->walRcvState = WALRCV_RESTARTING;
263  walrcv->startTime = now;
264 
265  /*
266  * If this is the first startup of walreceiver (on this timeline),
267  * initialize receivedUpto and latestChunkStart to the starting point.
268  */
269  if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
270  {
271  walrcv->receivedUpto = recptr;
272  walrcv->receivedTLI = tli;
273  walrcv->latestChunkStart = recptr;
274  }
275  walrcv->receiveStart = recptr;
276  walrcv->receiveStartTLI = tli;
277 
278  latch = walrcv->latch;
279 
280  SpinLockRelease(&walrcv->mutex);
281 
282  if (launch)
283  SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
284  else if (latch)
285  SetLatch(latch);
286 }
int64 pg_time_t
Definition: pgtime.h:23
slock_t mutex
Definition: walreceiver.h:129
int wal_segment_size
Definition: xlog.c:113
WalRcvState walRcvState
Definition: walreceiver.h:63
TimeLineID receivedTLI
Definition: walreceiver.h:83
pg_time_t startTime
Definition: walreceiver.h:64
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
XLogRecPtr latestChunkStart
Definition: walreceiver.h:91
Latch
Definition: latch.h:110
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
Latch * latch
Definition: walreceiver.h:127
#define SpinLockRelease(lock)
Definition: spin.h:64
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
#define Assert(condition)
Definition: c.h:681
WalRcvData * WalRcv
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
XLogRecPtr receiveStart
Definition: walreceiver.h:72
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
char slotname[NAMEDATALEN]
Definition: walreceiver.h:115
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109
void ShutdownWalRcv ( void  )

Definition at line 163 of file walreceiverfuncs.c.

References HandleStartupProcInterrupts(), WalRcvData::mutex, pg_usleep(), WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvRunning(), and WalRcvData::walRcvState.

Referenced by StartupXLOG(), and WaitForWALToBecomeAvailable().

164 {
165  WalRcvData *walrcv = WalRcv;
166  pid_t walrcvpid = 0;
167 
168  /*
169  * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
170  * mode once it's finished, and will also request postmaster to not
171  * restart itself.
172  */
173  SpinLockAcquire(&walrcv->mutex);
174  switch (walrcv->walRcvState)
175  {
176  case WALRCV_STOPPED:
177  break;
178  case WALRCV_STARTING:
179  walrcv->walRcvState = WALRCV_STOPPED;
180  break;
181 
182  case WALRCV_STREAMING:
183  case WALRCV_WAITING:
184  case WALRCV_RESTARTING:
185  walrcv->walRcvState = WALRCV_STOPPING;
186  /* fall through */
187  case WALRCV_STOPPING:
188  walrcvpid = walrcv->pid;
189  break;
190  }
191  SpinLockRelease(&walrcv->mutex);
192 
193  /*
194  * Signal walreceiver process if it was still running.
195  */
196  if (walrcvpid != 0)
197  kill(walrcvpid, SIGTERM);
198 
199  /*
200  * Wait for walreceiver to acknowledge its death by setting state to
201  * WALRCV_STOPPED.
202  */
203  while (WalRcvRunning())
204  {
205  /*
206  * This possibly-long loop needs to handle interrupts of startup
207  * process.
208  */
209  HandleStartupProcInterrupts();
210 
211  pg_usleep(100000); /* 100ms */
212  }
213 }
slock_t mutex
Definition: walreceiver.h:129
WalRcvState walRcvState
Definition: walreceiver.h:63
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
pid_t pid
Definition: walreceiver.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv
void HandleStartupProcInterrupts(void)
Definition: startup.c:148
bool WalRcvRunning(void)
bool WalRcvRunning ( void  )

Definition at line 72 of file walreceiverfuncs.c.

References WalRcvData::mutex, now(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, and WalRcvData::walRcvState.

Referenced by ShutdownWalRcv().

73 {
74  WalRcvData *walrcv = WalRcv;
75  WalRcvState state;
76  pg_time_t startTime;
77 
78  SpinLockAcquire(&walrcv->mutex);
79 
80  state = walrcv->walRcvState;
81  startTime = walrcv->startTime;
82 
83  SpinLockRelease(&walrcv->mutex);
84 
85  /*
86  * If it has taken too long for walreceiver to start up, give up. Setting
87  * the state to STOPPED ensures that if walreceiver later does start up
88  * after all, it will see that it's not supposed to be running and die
89  * without doing anything.
90  */
91  if (state == WALRCV_STARTING)
92  {
93  pg_time_t now = (pg_time_t) time(NULL);
94 
95  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
96  {
97  SpinLockAcquire(&walrcv->mutex);
98 
99  if (walrcv->walRcvState == WALRCV_STARTING)
100  state = walrcv->walRcvState = WALRCV_STOPPED;
101 
102  SpinLockRelease(&walrcv->mutex);
103  }
104  }
105 
106  if (state != WALRCV_STOPPED)
107  return true;
108  else
109  return false;
110 }
int64 pg_time_t
Definition: pgtime.h:23
slock_t mutex
Definition: walreceiver.h:129
WalRcvState walRcvState
Definition: walreceiver.h:63
pg_time_t startTime
Definition: walreceiver.h:64
WalRcvState
Definition: walreceiver.h:43
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WALRCV_STARTUP_TIMEOUT
Definition: regguts.h:298
WalRcvData * WalRcv
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
void WalRcvShmemInit ( void  )

Definition at line 53 of file walreceiverfuncs.c.

References WalRcvData::latch, MemSet, WalRcvData::mutex, ShmemInitStruct(), SpinLockInit, WALRCV_STOPPED, WalRcvShmemSize(), and WalRcvData::walRcvState.

Referenced by CreateSharedMemoryAndSemaphores().

54 {
55  bool found;
56 
57  WalRcv = (WalRcvData *)
58  ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
59 
60  if (!found)
61  {
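62  /* First time through, so initialize */
63  MemSet(WalRcv, 0, WalRcvShmemSize());
64  WalRcv->walRcvState = WALRCV_STOPPED;
65  SpinLockInit(&WalRcv->mutex);
66  WalRcv->latch = NULL;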
67  }
68 }
slock_t mutex
Definition: walreceiver.h:129
WalRcvState walRcvState
Definition: walreceiver.h:63
#define SpinLockInit(lock)
Definition: spin.h:60
#define MemSet(start, val, len)
Definition: c.h:863
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
Latch * latch
Definition: walreceiver.h:127
Size WalRcvShmemSize(void)
WalRcvData * WalRcv
Size WalRcvShmemSize ( void  )

Definition at line 42 of file walreceiverfuncs.c.

References add_size().

Referenced by CreateSharedMemoryAndSemaphores(), and WalRcvShmemInit().

43 {
44  Size size = 0;
45 
46  size = add_size(size, sizeof(WalRcvData));
47 
48  return size;
49 }
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:350
bool WalRcvStreaming ( void  )

Definition at line 117 of file walreceiverfuncs.c.

References WalRcvData::mutex, now(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, WALRCV_STREAMING, and WalRcvData::walRcvState.

Referenced by WaitForWALToBecomeAvailable().

118 {
119  WalRcvData *walrcv = WalRcv;
120  WalRcvState state;
121  pg_time_t startTime;
122 
123  SpinLockAcquire(&walrcv->mutex);
124 
125  state = walrcv->walRcvState;
126  startTime = walrcv->startTime;
127 
128  SpinLockRelease(&walrcv->mutex);
129 
130  /*
131  * If it has taken too long for walreceiver to start up, give up. Setting
132  * the state to STOPPED ensures that if walreceiver later does start up
133  * after all, it will see that it's not supposed to be running and die
134  * without doing anything.
135  */
136  if (state == WALRCV_STARTING)
137  {
138  pg_time_t now = (pg_time_t) time(NULL);
139 
140  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
141  {
142  SpinLockAcquire(&walrcv->mutex);
143 
144  if (walrcv->walRcvState == WALRCV_STARTING)
145  state = walrcv->walRcvState = WALRCV_STOPPED;
146 
147  SpinLockRelease(&walrcv->mutex);
148  }
149  }
150 
151  if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
152  state == WALRCV_RESTARTING)
153  return true;
154  else
155  return false;
156 }
int64 pg_time_t
Definition: pgtime.h:23
slock_t mutex
Definition: walreceiver.h:129
WalRcvState walRcvState
Definition: walreceiver.h:63
pg_time_t startTime
Definition: walreceiver.h:64
WalRcvState
Definition: walreceiver.h:43
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WALRCV_STARTUP_TIMEOUT
Definition: regguts.h:298
WalRcvData * WalRcv
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

Variable Documentation