PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walreceiverfuncs.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walreceiverfuncs.c
4  *
5  * This file contains functions used by the startup process to communicate
6  * with the walreceiver process. Functions implementing walreceiver itself
7  * are in walreceiver.c.
8  *
9  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  * src/backend/replication/walreceiverfuncs.c
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18 
19 #include <sys/stat.h>
20 #include <sys/time.h>
21 #include <time.h>
22 #include <unistd.h>
23 #include <signal.h>
24 
#include "access/xlog_internal.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
31 
33 
34 /*
35  * How long to wait for walreceiver to start up after requesting
36  * postmaster to launch it. In seconds.
37  */
38 #define WALRCV_STARTUP_TIMEOUT 10
39 
40 /* Report shared memory space needed by WalRcvShmemInit */
41 Size
43 {
44  Size size = 0;
45 
46  size = add_size(size, sizeof(WalRcvData));
47 
48  return size;
49 }
50 
51 /* Allocate and initialize walreceiver-related shared memory */
52 void
54 {
55  bool found;
56 
57  WalRcv = (WalRcvData *)
58  ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
59 
60  if (!found)
61  {
62  /* First time through, so initialize */
63  MemSet(WalRcv, 0, WalRcvShmemSize());
64  WalRcv->walRcvState = WALRCV_STOPPED;
65  SpinLockInit(&WalRcv->mutex);
66  WalRcv->latch = NULL;
67  }
68 }
69 
70 /* Is walreceiver running (or starting up)? */
71 bool
73 {
74  WalRcvData *walrcv = WalRcv;
76  pg_time_t startTime;
77 
78  SpinLockAcquire(&walrcv->mutex);
79 
80  state = walrcv->walRcvState;
81  startTime = walrcv->startTime;
82 
83  SpinLockRelease(&walrcv->mutex);
84 
85  /*
86  * If it has taken too long for walreceiver to start up, give up. Setting
87  * the state to STOPPED ensures that if walreceiver later does start up
88  * after all, it will see that it's not supposed to be running and die
89  * without doing anything.
90  */
91  if (state == WALRCV_STARTING)
92  {
93  pg_time_t now = (pg_time_t) time(NULL);
94 
95  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
96  {
97  SpinLockAcquire(&walrcv->mutex);
98 
99  if (walrcv->walRcvState == WALRCV_STARTING)
100  state = walrcv->walRcvState = WALRCV_STOPPED;
101 
102  SpinLockRelease(&walrcv->mutex);
103  }
104  }
105 
106  if (state != WALRCV_STOPPED)
107  return true;
108  else
109  return false;
110 }
111 
112 /*
113  * Is walreceiver running and streaming (or at least attempting to connect,
114  * or starting up)?
115  */
116 bool
118 {
119  WalRcvData *walrcv = WalRcv;
121  pg_time_t startTime;
122 
123  SpinLockAcquire(&walrcv->mutex);
124 
125  state = walrcv->walRcvState;
126  startTime = walrcv->startTime;
127 
128  SpinLockRelease(&walrcv->mutex);
129 
130  /*
131  * If it has taken too long for walreceiver to start up, give up. Setting
132  * the state to STOPPED ensures that if walreceiver later does start up
133  * after all, it will see that it's not supposed to be running and die
134  * without doing anything.
135  */
136  if (state == WALRCV_STARTING)
137  {
138  pg_time_t now = (pg_time_t) time(NULL);
139 
140  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
141  {
142  SpinLockAcquire(&walrcv->mutex);
143 
144  if (walrcv->walRcvState == WALRCV_STARTING)
145  state = walrcv->walRcvState = WALRCV_STOPPED;
146 
147  SpinLockRelease(&walrcv->mutex);
148  }
149  }
150 
151  if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
152  state == WALRCV_RESTARTING)
153  return true;
154  else
155  return false;
156 }
157 
158 /*
159  * Stop walreceiver (if running) and wait for it to die.
160  * Executed by the Startup process.
161  */
162 void
164 {
165  WalRcvData *walrcv = WalRcv;
166  pid_t walrcvpid = 0;
167 
168  /*
169  * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
170  * mode once it's finished, and will also request postmaster to not
171  * restart itself.
172  */
173  SpinLockAcquire(&walrcv->mutex);
174  switch (walrcv->walRcvState)
175  {
176  case WALRCV_STOPPED:
177  break;
178  case WALRCV_STARTING:
179  walrcv->walRcvState = WALRCV_STOPPED;
180  break;
181 
182  case WALRCV_STREAMING:
183  case WALRCV_WAITING:
184  case WALRCV_RESTARTING:
185  walrcv->walRcvState = WALRCV_STOPPING;
186  /* fall through */
187  case WALRCV_STOPPING:
188  walrcvpid = walrcv->pid;
189  break;
190  }
191  SpinLockRelease(&walrcv->mutex);
192 
193  /*
194  * Signal walreceiver process if it was still running.
195  */
196  if (walrcvpid != 0)
197  kill(walrcvpid, SIGTERM);
198 
199  /*
200  * Wait for walreceiver to acknowledge its death by setting state to
201  * WALRCV_STOPPED.
202  */
203  while (WalRcvRunning())
204  {
205  /*
206  * This possibly-long loop needs to handle interrupts of startup
207  * process.
208  */
210 
211  pg_usleep(100000); /* 100ms */
212  }
213 }
214 
215 /*
216  * Request postmaster to start walreceiver.
217  *
218  * recptr indicates the position where streaming should begin, conninfo
219  * is a libpq connection string to use, and slotname is, optionally, the name
220  * of a replication slot to acquire.
221  */
222 void
223 RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
224  const char *slotname)
225 {
226  WalRcvData *walrcv = WalRcv;
227  bool launch = false;
228  pg_time_t now = (pg_time_t) time(NULL);
229 
230  /*
231  * We always start at the beginning of the segment. That prevents a broken
232  * segment (i.e., with no records in the first half of a segment) from
233  * being created by XLOG streaming, which might cause trouble later on if
234  * the segment is e.g archived.
235  */
236  if (recptr % XLogSegSize != 0)
237  recptr -= recptr % XLogSegSize;
238 
239  SpinLockAcquire(&walrcv->mutex);
240 
241  /* It better be stopped if we try to restart it */
242  Assert(walrcv->walRcvState == WALRCV_STOPPED ||
243  walrcv->walRcvState == WALRCV_WAITING);
244 
245  if (conninfo != NULL)
246  strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
247  else
248  walrcv->conninfo[0] = '\0';
249 
250  if (slotname != NULL)
251  strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
252  else
253  walrcv->slotname[0] = '\0';
254 
255  if (walrcv->walRcvState == WALRCV_STOPPED)
256  {
257  launch = true;
258  walrcv->walRcvState = WALRCV_STARTING;
259  }
260  else
261  walrcv->walRcvState = WALRCV_RESTARTING;
262  walrcv->startTime = now;
263 
264  /*
265  * If this is the first startup of walreceiver (on this timeline),
266  * initialize receivedUpto and latestChunkStart to the starting point.
267  */
268  if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
269  {
270  walrcv->receivedUpto = recptr;
271  walrcv->receivedTLI = tli;
272  walrcv->latestChunkStart = recptr;
273  }
274  walrcv->receiveStart = recptr;
275  walrcv->receiveStartTLI = tli;
276 
277  SpinLockRelease(&walrcv->mutex);
278 
279  if (launch)
281  else if (walrcv->latch)
282  SetLatch(walrcv->latch);
283 }
284 
285 /*
286  * Returns the last+1 byte position that walreceiver has written.
287  *
288  * Optionally, returns the previous chunk start, that is the first byte
289  * written in the most recent walreceiver flush cycle. Callers not
290  * interested in that value may pass NULL for latestChunkStart. Same for
291  * receiveTLI.
292  */
295 {
296  WalRcvData *walrcv = WalRcv;
297  XLogRecPtr recptr;
298 
299  SpinLockAcquire(&walrcv->mutex);
300  recptr = walrcv->receivedUpto;
301  if (latestChunkStart)
302  *latestChunkStart = walrcv->latestChunkStart;
303  if (receiveTLI)
304  *receiveTLI = walrcv->receivedTLI;
305  SpinLockRelease(&walrcv->mutex);
306 
307  return recptr;
308 }
309 
310 /*
311  * Returns the replication apply delay in ms or -1
312  * if the apply delay info is not available
313  */
314 int
316 {
317  WalRcvData *walrcv = WalRcv;
318  XLogRecPtr receivePtr;
319  XLogRecPtr replayPtr;
320 
321  long secs;
322  int usecs;
323 
324  TimestampTz chunkReplayStartTime;
325 
326  SpinLockAcquire(&walrcv->mutex);
327  receivePtr = walrcv->receivedUpto;
328  SpinLockRelease(&walrcv->mutex);
329 
330  replayPtr = GetXLogReplayRecPtr(NULL);
331 
332  if (receivePtr == replayPtr)
333  return 0;
334 
335  chunkReplayStartTime = GetCurrentChunkReplayStartTime();
336 
337  if (chunkReplayStartTime == 0)
338  return -1;
339 
340  TimestampDifference(chunkReplayStartTime,
342  &secs, &usecs);
343 
344  return (((int) secs * 1000) + (usecs / 1000));
345 }
346 
347 /*
348  * Returns the network latency in ms, note that this includes any
349  * difference in clock settings between the servers, as well as timezone.
350  */
351 int
353 {
354  WalRcvData *walrcv = WalRcv;
355 
356  TimestampTz lastMsgSendTime;
357  TimestampTz lastMsgReceiptTime;
358 
359  long secs = 0;
360  int usecs = 0;
361  int ms;
362 
363  SpinLockAcquire(&walrcv->mutex);
364  lastMsgSendTime = walrcv->lastMsgSendTime;
365  lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
366  SpinLockRelease(&walrcv->mutex);
367 
368  TimestampDifference(lastMsgSendTime,
369  lastMsgReceiptTime,
370  &secs, &usecs);
371 
372  ms = ((int) secs * 1000) + (usecs / 1000);
373 
374  return ms;
375 }
#define XLogSegSize
Definition: xlog_internal.h:92
uint32 TimeLineID
Definition: xlogdefs.h:45
int64 pg_time_t
Definition: pgtime.h:23
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
slock_t mutex
Definition: walreceiver.h:120
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:63
#define SpinLockInit(lock)
Definition: spin.h:60
TimeLineID receivedTLI
Definition: walreceiver.h:83
#define MemSet(start, val, len)
Definition: c.h:857
void WalRcvShmemInit(void)
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
pg_time_t startTime
Definition: walreceiver.h:64
WalRcvState
Definition: walreceiver.h:43
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
TimestampTz GetCurrentChunkReplayStartTime(void)
Definition: xlog.c:6096
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11094
pid_t pid
Definition: walreceiver.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
XLogRecPtr latestChunkStart
Definition: walreceiver.h:91
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
Latch * latch
Definition: walreceiver.h:135
int GetReplicationApplyDelay(void)
void ShutdownWalRcv(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
static TimeLineID receiveTLI
Definition: xlog.c:201
#define WALRCV_STARTUP_TIMEOUT
Size WalRcvShmemSize(void)
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
Definition: regguts.h:298
bool WalRcvStreaming(void)
size_t Size
Definition: c.h:356
WalRcvData * WalRcv
void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname)
void HandleStartupProcInterrupts(void)
Definition: startup.c:148
bool WalRcvRunning(void)
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
XLogRecPtr receiveStart
Definition: walreceiver.h:72
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
char slotname[NAMEDATALEN]
Definition: walreceiver.h:115
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109