PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walreceiverfuncs.c File Reference
#include "postgres.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
#include <signal.h>
#include "access/xlog_internal.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiverfuncs.c:

Go to the source code of this file.

Macros

#define WALRCV_STARTUP_TIMEOUT   10
 

Functions

Size WalRcvShmemSize (void)
 
void WalRcvShmemInit (void)
 
bool WalRcvRunning (void)
 
bool WalRcvStreaming (void)
 
void ShutdownWalRcv (void)
 
void RequestXLogStreaming (TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname)
 
XLogRecPtr GetWalRcvWriteRecPtr (XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 
int GetReplicationApplyDelay (void)
 
int GetReplicationTransferLatency (void)
 

Variables

WalRcvData * WalRcv = NULL
 

Macro Definition Documentation

#define WALRCV_STARTUP_TIMEOUT   10

Definition at line 39 of file walreceiverfuncs.c.

Referenced by WalRcvRunning(), and WalRcvStreaming().

Function Documentation

int GetReplicationApplyDelay ( void  )

Definition at line 316 of file walreceiverfuncs.c.

References GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), WalRcvData::mutex, NULL, WalRcvData::receivedUpto, SpinLockAcquire, SpinLockRelease, TimestampDifference(), and WalRcv.

Referenced by ProcessWalSndrMessage().

317 {
318  WalRcvData *walrcv = WalRcv;
319  XLogRecPtr receivePtr;
320  XLogRecPtr replayPtr;
321 
322  long secs;
323  int usecs;
324 
325  TimestampTz chunckReplayStartTime;
326 
327  SpinLockAcquire(&walrcv->mutex);
328  receivePtr = walrcv->receivedUpto;
329  SpinLockRelease(&walrcv->mutex);
330 
331  replayPtr = GetXLogReplayRecPtr(NULL);
332 
333  if (receivePtr == replayPtr)
334  return 0;
335 
336  chunckReplayStartTime = GetCurrentChunkReplayStartTime();
337 
338  if (chunckReplayStartTime == 0)
339  return -1;
340 
341  TimestampDifference(chunckReplayStartTime,
342  GetCurrentTimestamp(),
343  &secs, &usecs);
344 
345  return (((int) secs * 1000) + (usecs / 1000));
346 }
slock_t mutex
Definition: walreceiver.h:114
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1688
#define SpinLockAcquire(lock)
Definition: spin.h:62
TimestampTz GetCurrentChunkReplayStartTime(void)
Definition: xlog.c:6054
double TimestampTz
Definition: timestamp.h:51
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11004
XLogRecPtr receivedUpto
Definition: walreceiver.h:79
#define SpinLockRelease(lock)
Definition: spin.h:64
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
WalRcvData * WalRcv
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1791
int GetReplicationTransferLatency ( void  )

Definition at line 353 of file walreceiverfuncs.c.

References WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, TimestampDifference(), and WalRcv.

Referenced by ProcessWalSndrMessage().

354 {
355  WalRcvData *walrcv = WalRcv;
356 
357  TimestampTz lastMsgSendTime;
358  TimestampTz lastMsgReceiptTime;
359 
360  long secs = 0;
361  int usecs = 0;
362  int ms;
363 
364  SpinLockAcquire(&walrcv->mutex);
365  lastMsgSendTime = walrcv->lastMsgSendTime;
366  lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
367  SpinLockRelease(&walrcv->mutex);
368 
369  TimestampDifference(lastMsgSendTime,
370  lastMsgReceiptTime,
371  &secs, &usecs);
372 
373  ms = ((int) secs * 1000) + (usecs / 1000);
374 
375  return ms;
376 }
slock_t mutex
Definition: walreceiver.h:114
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:94
TimestampTz lastMsgSendTime
Definition: walreceiver.h:93
#define SpinLockAcquire(lock)
Definition: spin.h:62
double TimestampTz
Definition: timestamp.h:51
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1791
XLogRecPtr GetWalRcvWriteRecPtr ( XLogRecPtr *  latestChunkStart,
TimeLineID *  receiveTLI 
)

Definition at line 295 of file walreceiverfuncs.c.

References WalRcvData::latestChunkStart, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by CreateRestartPoint(), GetStandbyFlushRecPtr(), pg_last_wal_receive_location(), and WaitForWALToBecomeAvailable().

296 {
297  WalRcvData *walrcv = WalRcv;
298  XLogRecPtr recptr;
299 
300  SpinLockAcquire(&walrcv->mutex);
301  recptr = walrcv->receivedUpto;
302  if (latestChunkStart)
303  *latestChunkStart = walrcv->latestChunkStart;
304  if (receiveTLI)
305  *receiveTLI = walrcv->receivedTLI;
306  SpinLockRelease(&walrcv->mutex);
307 
308  return recptr;
309 }
slock_t mutex
Definition: walreceiver.h:114
TimeLineID receivedTLI
Definition: walreceiver.h:80
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr latestChunkStart
Definition: walreceiver.h:88
XLogRecPtr receivedUpto
Definition: walreceiver.h:79
#define SpinLockRelease(lock)
Definition: spin.h:64
static TimeLineID receiveTLI
Definition: xlog.c:199
uint64 XLogRecPtr
Definition: xlogdefs.h:21
WalRcvData * WalRcv
void RequestXLogStreaming ( TimeLineID  tli,
XLogRecPtr  recptr,
const char *  conninfo,
const char *  slotname 
)

Definition at line 224 of file walreceiverfuncs.c.

References Assert, WalRcvData::conninfo, WalRcvData::latch, WalRcvData::latestChunkStart, MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, now(), NULL, PMSIGNAL_START_WALRECEIVER, WalRcvData::receivedTLI, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, SendPostmasterSignal(), SetLatch(), WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, strlcpy(), WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_WAITING, WalRcvData::walRcvState, and XLogSegSize.

Referenced by WaitForWALToBecomeAvailable().

226 {
227  WalRcvData *walrcv = WalRcv;
228  bool launch = false;
229  pg_time_t now = (pg_time_t) time(NULL);
230 
231  /*
232  * We always start at the beginning of the segment. That prevents a broken
233  * segment (i.e., with no records in the first half of a segment) from
234  * being created by XLOG streaming, which might cause trouble later on if
235  * the segment is e.g archived.
236  */
237  if (recptr % XLogSegSize != 0)
238  recptr -= recptr % XLogSegSize;
239 
240  SpinLockAcquire(&walrcv->mutex);
241 
242  /* It better be stopped if we try to restart it */
243  Assert(walrcv->walRcvState == WALRCV_STOPPED ||
244  walrcv->walRcvState == WALRCV_WAITING);
245 
246  if (conninfo != NULL)
247  strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
248  else
249  walrcv->conninfo[0] = '\0';
250 
251  if (slotname != NULL)
252  strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
253  else
254  walrcv->slotname[0] = '\0';
255 
256  if (walrcv->walRcvState == WALRCV_STOPPED)
257  {
258  launch = true;
259  walrcv->walRcvState = WALRCV_STARTING;
260  }
261  else
262  walrcv->walRcvState = WALRCV_RESTARTING;
263  walrcv->startTime = now;
264 
265  /*
266  * If this is the first startup of walreceiver (on this timeline),
267  * initialize receivedUpto and latestChunkStart to the starting point.
268  */
269  if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
270  {
271  walrcv->receivedUpto = recptr;
272  walrcv->receivedTLI = tli;
273  walrcv->latestChunkStart = recptr;
274  }
275  walrcv->receiveStart = recptr;
276  walrcv->receiveStartTLI = tli;
277 
278  SpinLockRelease(&walrcv->mutex);
279 
280  if (launch)
281  SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
282  else if (walrcv->latch)
283  SetLatch(walrcv->latch);
284 }
#define XLogSegSize
Definition: xlog_internal.h:92
int64 pg_time_t
Definition: pgtime.h:23
slock_t mutex
Definition: walreceiver.h:114
WalRcvState walRcvState
Definition: walreceiver.h:60
TimeLineID receivedTLI
Definition: walreceiver.h:80
pg_time_t startTime
Definition: walreceiver.h:61
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define MAXCONNINFO
Definition: walreceiver.h:32
XLogRecPtr latestChunkStart
Definition: walreceiver.h:88
XLogRecPtr receivedUpto
Definition: walreceiver.h:79
TimeLineID receiveStartTLI
Definition: walreceiver.h:70
Latch * latch
Definition: walreceiver.h:132
#define SpinLockRelease(lock)
Definition: spin.h:64
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
void SetLatch(volatile Latch *latch)
Definition: latch.c:380
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:670
WalRcvData * WalRcv
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
XLogRecPtr receiveStart
Definition: walreceiver.h:69
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1652
char slotname[NAMEDATALEN]
Definition: walreceiver.h:112
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:106
void ShutdownWalRcv ( void  )

Definition at line 164 of file walreceiverfuncs.c.

References HandleStartupProcInterrupts(), WalRcvData::mutex, pg_usleep(), WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvRunning(), and WalRcvData::walRcvState.

Referenced by StartupXLOG(), and WaitForWALToBecomeAvailable().

165 {
166  WalRcvData *walrcv = WalRcv;
167  pid_t walrcvpid = 0;
168 
169  /*
170  * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
171  * mode once it's finished, and will also request postmaster to not
172  * restart itself.
173  */
174  SpinLockAcquire(&walrcv->mutex);
175  switch (walrcv->walRcvState)
176  {
177  case WALRCV_STOPPED:
178  break;
179  case WALRCV_STARTING:
180  walrcv->walRcvState = WALRCV_STOPPED;
181  break;
182 
183  case WALRCV_STREAMING:
184  case WALRCV_WAITING:
185  case WALRCV_RESTARTING:
186  walrcv->walRcvState = WALRCV_STOPPING;
187  /* fall through */
188  case WALRCV_STOPPING:
189  walrcvpid = walrcv->pid;
190  break;
191  }
192  SpinLockRelease(&walrcv->mutex);
193 
194  /*
195  * Signal walreceiver process if it was still running.
196  */
197  if (walrcvpid != 0)
198  kill(walrcvpid, SIGTERM);
199 
200  /*
201  * Wait for walreceiver to acknowledge its death by setting state to
202  * WALRCV_STOPPED.
203  */
204  while (WalRcvRunning())
205  {
206  /*
207  * This possibly-long loop needs to handle interrupts of startup
208  * process.
209  */
210  HandleStartupProcInterrupts();
211 
212  pg_usleep(100000); /* 100ms */
213  }
214 }
slock_t mutex
Definition: walreceiver.h:114
WalRcvState walRcvState
Definition: walreceiver.h:60
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
pid_t pid
Definition: walreceiver.h:59
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv
void HandleStartupProcInterrupts(void)
Definition: startup.c:147
bool WalRcvRunning(void)
bool WalRcvRunning ( void  )

Definition at line 73 of file walreceiverfuncs.c.

References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, and WalRcvData::walRcvState.

Referenced by ShutdownWalRcv().

74 {
75  WalRcvData *walrcv = WalRcv;
76  WalRcvState state;
77  pg_time_t startTime;
78 
79  SpinLockAcquire(&walrcv->mutex);
80 
81  state = walrcv->walRcvState;
82  startTime = walrcv->startTime;
83 
84  SpinLockRelease(&walrcv->mutex);
85 
86  /*
87  * If it has taken too long for walreceiver to start up, give up. Setting
88  * the state to STOPPED ensures that if walreceiver later does start up
89  * after all, it will see that it's not supposed to be running and die
90  * without doing anything.
91  */
92  if (state == WALRCV_STARTING)
93  {
94  pg_time_t now = (pg_time_t) time(NULL);
95 
96  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
97  {
98  SpinLockAcquire(&walrcv->mutex);
99 
100  if (walrcv->walRcvState == WALRCV_STARTING)
101  state = walrcv->walRcvState = WALRCV_STOPPED;
102 
103  SpinLockRelease(&walrcv->mutex);
104  }
105  }
106 
107  if (state != WALRCV_STOPPED)
108  return true;
109  else
110  return false;
111 }
int64 pg_time_t
Definition: pgtime.h:23
slock_t mutex
Definition: walreceiver.h:114
WalRcvState walRcvState
Definition: walreceiver.h:60
pg_time_t startTime
Definition: walreceiver.h:61
WalRcvState
Definition: walreceiver.h:40
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WALRCV_STARTUP_TIMEOUT
#define NULL
Definition: c.h:226
Definition: regguts.h:298
WalRcvData * WalRcv
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1652
void WalRcvShmemInit ( void  )

Definition at line 54 of file walreceiverfuncs.c.

References WalRcvData::latch, MemSet, WalRcvData::mutex, NULL, ShmemInitStruct(), SpinLockInit, WALRCV_STOPPED, WalRcvShmemSize(), and WalRcvData::walRcvState.

Referenced by CreateSharedMemoryAndSemaphores().

55 {
56  bool found;
57 
58  WalRcv = (WalRcvData *)
59  ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
60 
61  if (!found)
62  {
63  /* First time through, so initialize */
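64  MemSet(WalRcv, 0, WalRcvShmemSize());
65  WalRcv->walRcvState = WALRCV_STOPPED;
66  SpinLockInit(&WalRcv->mutex);
67  WalRcv->latch = NULL;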
68  }
69 }
slock_t mutex
Definition: walreceiver.h:114
WalRcvState walRcvState
Definition: walreceiver.h:60
#define SpinLockInit(lock)
Definition: spin.h:60
#define MemSet(start, val, len)
Definition: c.h:852
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
Latch * latch
Definition: walreceiver.h:132
Size WalRcvShmemSize(void)
#define NULL
Definition: c.h:226
WalRcvData * WalRcv
Size WalRcvShmemSize ( void  )

Definition at line 43 of file walreceiverfuncs.c.

References add_size().

Referenced by CreateSharedMemoryAndSemaphores(), and WalRcvShmemInit().

44 {
45  Size size = 0;
46 
47  size = add_size(size, sizeof(WalRcvData));
48 
49  return size;
50 }
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:352
bool WalRcvStreaming ( void  )

Definition at line 118 of file walreceiverfuncs.c.

References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, WALRCV_STREAMING, and WalRcvData::walRcvState.

Referenced by WaitForWALToBecomeAvailable().

119 {
120  WalRcvData *walrcv = WalRcv;
121  WalRcvState state;
122  pg_time_t startTime;
123 
124  SpinLockAcquire(&walrcv->mutex);
125 
126  state = walrcv->walRcvState;
127  startTime = walrcv->startTime;
128 
129  SpinLockRelease(&walrcv->mutex);
130 
131  /*
132  * If it has taken too long for walreceiver to start up, give up. Setting
133  * the state to STOPPED ensures that if walreceiver later does start up
134  * after all, it will see that it's not supposed to be running and die
135  * without doing anything.
136  */
137  if (state == WALRCV_STARTING)
138  {
139  pg_time_t now = (pg_time_t) time(NULL);
140 
141  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
142  {
143  SpinLockAcquire(&walrcv->mutex);
144 
145  if (walrcv->walRcvState == WALRCV_STARTING)
146  state = walrcv->walRcvState = WALRCV_STOPPED;
147 
148  SpinLockRelease(&walrcv->mutex);
149  }
150  }
151 
152  if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
153  state == WALRCV_RESTARTING)
154  return true;
155  else
156  return false;
157 }
int64 pg_time_t
Definition: pgtime.h:23
slock_t mutex
Definition: walreceiver.h:114
WalRcvState walRcvState
Definition: walreceiver.h:60
pg_time_t startTime
Definition: walreceiver.h:61
WalRcvState
Definition: walreceiver.h:40
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WALRCV_STARTUP_TIMEOUT
#define NULL
Definition: c.h:226
Definition: regguts.h:298
WalRcvData * WalRcv
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1652

Variable Documentation