PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walreceiver.h File Reference
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "fmgr.h"
#include "replication/logicalproto.h"
#include "replication/walsender.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "pgtime.h"
#include "utils/tuplestore.h"
Include dependency graph for walreceiver.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  WalRcvData
 
struct  WalRcvStreamOptions
 
struct  WalRcvExecResult
 
struct  WalReceiverFunctionsType
 

Macros

#define MAXCONNINFO   1024
 
#define AllowCascadeReplication()   (EnableHotStandby && max_wal_senders > 0)
 
#define walrcv_connect(conninfo, logical, appname, err)   WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)
 
#define walrcv_check_conninfo(conninfo)   WalReceiverFunctions->walrcv_check_conninfo(conninfo)
 
#define walrcv_get_conninfo(conn)   WalReceiverFunctions->walrcv_get_conninfo(conn)
 
#define walrcv_identify_system(conn, primary_tli, server_version)   WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_version)
 
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)   WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
 
#define walrcv_startstreaming(conn, options)   WalReceiverFunctions->walrcv_startstreaming(conn, options)
 
#define walrcv_endstreaming(conn, next_tli)   WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
 
#define walrcv_receive(conn, buffer, wait_fd)   WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
 
#define walrcv_send(conn, buffer, nbytes)   WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
 
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)   WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
 
#define walrcv_exec(conn, exec, nRetTypes, retTypes)   WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
 
#define walrcv_disconnect(conn)   WalReceiverFunctions->walrcv_disconnect(conn)
 

Typedefs

typedef struct WalReceiverConn WalReceiverConn
 
typedef struct WalRcvExecResult WalRcvExecResult
 
typedef WalReceiverConn *(* walrcv_connect_fn )(const char *conninfo, bool logical, const char *appname, char **err)
 
typedef void(* walrcv_check_conninfo_fn )(const char *conninfo)
 
typedef char *(* walrcv_get_conninfo_fn )(WalReceiverConn *conn)
 
typedef char *(* walrcv_identify_system_fn )(WalReceiverConn *conn, TimeLineID *primary_tli, int *server_version)
 
typedef void(* walrcv_readtimelinehistoryfile_fn )(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *size)
 
typedef bool(* walrcv_startstreaming_fn )(WalReceiverConn *conn, const WalRcvStreamOptions *options)
 
typedef void(* walrcv_endstreaming_fn )(WalReceiverConn *conn, TimeLineID *next_tli)
 
typedef int(* walrcv_receive_fn )(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
 
typedef void(* walrcv_send_fn )(WalReceiverConn *conn, const char *buffer, int nbytes)
 
typedef char *(* walrcv_create_slot_fn )(WalReceiverConn *conn, const char *slotname, bool temporary, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
 
typedef WalRcvExecResult *(* walrcv_exec_fn )(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
 
typedef void(* walrcv_disconnect_fn )(WalReceiverConn *conn)
 
typedef struct WalReceiverFunctionsType WalReceiverFunctionsType
 

Enumerations

enum  WalRcvState {
  WALRCV_STOPPED, WALRCV_STARTING, WALRCV_STREAMING, WALRCV_WAITING,
  WALRCV_RESTARTING, WALRCV_STOPPING
}
 
enum  WalRcvExecStatus {
  WALRCV_ERROR, WALRCV_OK_COMMAND, WALRCV_OK_TUPLES, WALRCV_OK_COPY_IN,
  WALRCV_OK_COPY_OUT, WALRCV_OK_COPY_BOTH
}
 

Functions

static void walrcv_clear_result (WalRcvExecResult *walres)
 
void WalReceiverMain (void) pg_attribute_noreturn()
 
Size WalRcvShmemSize (void)
 
void WalRcvShmemInit (void)
 
void ShutdownWalRcv (void)
 
bool WalRcvStreaming (void)
 
bool WalRcvRunning (void)
 
void RequestXLogStreaming (TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname)
 
XLogRecPtr GetWalRcvWriteRecPtr (XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 
int GetReplicationApplyDelay (void)
 
int GetReplicationTransferLatency (void)
 
void WalRcvForceReply (void)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
WalRcvData * WalRcv
 
PGDLLIMPORT WalReceiverFunctionsType * WalReceiverFunctions
 

Macro Definition Documentation

#define AllowCascadeReplication ( )    (EnableHotStandby && max_wal_senders > 0)

Definition at line 38 of file walreceiver.h.

Referenced by StartupXLOG(), and XLogWalRcvFlush().

#define MAXCONNINFO   1024

Definition at line 35 of file walreceiver.h.

Referenced by RequestXLogStreaming(), and WalReceiverMain().

#define walrcv_check_conninfo (   conninfo)    WalReceiverFunctions->walrcv_check_conninfo(conninfo)

Definition at line 244 of file walreceiver.h.

Referenced by AlterSubscription(), and CreateSubscription().

#define walrcv_connect (   conninfo,
  logical,
  appname,
  err 
)    WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)
#define walrcv_create_slot (   conn,
  slotname,
  temporary,
  snapshot_action,
  lsn 
)    WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)

Definition at line 260 of file walreceiver.h.

Referenced by CreateSubscription(), and LogicalRepSyncTableStart().

#define walrcv_disconnect (   conn)    WalReceiverFunctions->walrcv_disconnect(conn)
#define walrcv_endstreaming (   conn,
  next_tli 
)    WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
#define walrcv_exec (   conn,
  exec,
  nRetTypes,
  retTypes 
)    WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_get_conninfo (   conn)    WalReceiverFunctions->walrcv_get_conninfo(conn)

Definition at line 246 of file walreceiver.h.

Referenced by WalReceiverMain().

#define walrcv_identify_system (   conn,
  primary_tli,
  server_version 
)    WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_version)

Definition at line 248 of file walreceiver.h.

Referenced by ApplyWorkerMain(), and WalReceiverMain().

#define walrcv_readtimelinehistoryfile (   conn,
  tli,
  filename,
  content,
  size 
)    WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)

Definition at line 250 of file walreceiver.h.

Referenced by WalRcvFetchTimeLineHistoryFiles().

#define walrcv_receive (   conn,
  buffer,
  wait_fd 
)    WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)

Definition at line 256 of file walreceiver.h.

Referenced by copy_read_data(), LogicalRepApplyLoop(), and WalReceiverMain().

#define walrcv_send (   conn,
  buffer,
  nbytes 
)    WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)

Definition at line 258 of file walreceiver.h.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

#define walrcv_startstreaming (   conn,
  options 
)    WalReceiverFunctions->walrcv_startstreaming(conn, options)

Definition at line 252 of file walreceiver.h.

Referenced by ApplyWorkerMain(), and WalReceiverMain().

Typedef Documentation

typedef void(* walrcv_check_conninfo_fn)(const char *conninfo)

Definition at line 197 of file walreceiver.h.

typedef WalReceiverConn*(* walrcv_connect_fn)(const char *conninfo, bool logical, const char *appname, char **err)

Definition at line 194 of file walreceiver.h.

typedef char*(* walrcv_create_slot_fn)(WalReceiverConn *conn, const char *slotname, bool temporary, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)

Definition at line 214 of file walreceiver.h.

typedef void(* walrcv_disconnect_fn)(WalReceiverConn *conn)

Definition at line 222 of file walreceiver.h.

typedef void(* walrcv_endstreaming_fn)(WalReceiverConn *conn, TimeLineID *next_tli)

Definition at line 208 of file walreceiver.h.

typedef WalRcvExecResult*(* walrcv_exec_fn)(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)

Definition at line 218 of file walreceiver.h.

typedef char*(* walrcv_get_conninfo_fn)(WalReceiverConn *conn)

Definition at line 198 of file walreceiver.h.

typedef char*(* walrcv_identify_system_fn)(WalReceiverConn *conn, TimeLineID *primary_tli, int *server_version)

Definition at line 199 of file walreceiver.h.

typedef void(* walrcv_readtimelinehistoryfile_fn)(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *size)

Definition at line 202 of file walreceiver.h.

typedef int(* walrcv_receive_fn)(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)

Definition at line 210 of file walreceiver.h.

typedef void(* walrcv_send_fn)(WalReceiverConn *conn, const char *buffer, int nbytes)

Definition at line 212 of file walreceiver.h.

typedef bool(* walrcv_startstreaming_fn)(WalReceiverConn *conn, const WalRcvStreamOptions *options)

Definition at line 206 of file walreceiver.h.

Definition at line 162 of file walreceiver.h.

Enumeration Type Documentation

enum WalRcvExecStatus

Enumerator
WALRCV_ERROR 
WALRCV_OK_COMMAND 
WALRCV_OK_TUPLES 
WALRCV_OK_COPY_IN 
WALRCV_OK_COPY_OUT 
WALRCV_OK_COPY_BOTH 

Definition at line 169 of file walreceiver.h.

170 {
171  WALRCV_ERROR, /* There was error when executing the query. */
172  WALRCV_OK_COMMAND, /* Query executed utility or replication
173  * command. */
174  WALRCV_OK_TUPLES, /* Query returned tuples. */
175  WALRCV_OK_COPY_IN, /* Query started COPY FROM. */
176  WALRCV_OK_COPY_OUT, /* Query started COPY TO. */
177  WALRCV_OK_COPY_BOTH /* Query started COPY BOTH replication
178  * protocol. */
179 } WalRcvExecStatus;
WalRcvExecStatus
Definition: walreceiver.h:169
enum WalRcvState

Enumerator
WALRCV_STOPPED 
WALRCV_STARTING 
WALRCV_STREAMING 
WALRCV_WAITING 
WALRCV_RESTARTING 
WALRCV_STOPPING 

Definition at line 43 of file walreceiver.h.

44 {
45  WALRCV_STOPPED, /* stopped and mustn't start up again */
46  WALRCV_STARTING, /* launched, but the process hasn't
47  * initialized yet */
48  WALRCV_STREAMING, /* walreceiver is streaming */
49  WALRCV_WAITING, /* stopped streaming, waiting for orders */
50  WALRCV_RESTARTING, /* asked to restart streaming */
51  WALRCV_STOPPING /* requested to stop, but still running */
52 } WalRcvState;
WalRcvState
Definition: walreceiver.h:43

Function Documentation

int GetReplicationApplyDelay ( void  )

Definition at line 315 of file walreceiverfuncs.c.

References GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), WalRcvData::mutex, NULL, WalRcvData::receivedUpto, SpinLockAcquire, SpinLockRelease, TimestampDifference(), and WalRcv.

Referenced by ProcessWalSndrMessage().

316 {
317  WalRcvData *walrcv = WalRcv;
318  XLogRecPtr receivePtr;
319  XLogRecPtr replayPtr;
320 
321  long secs;
322  int usecs;
323 
324  TimestampTz chunkReplayStartTime;
325 
326  SpinLockAcquire(&walrcv->mutex);
327  receivePtr = walrcv->receivedUpto;
328  SpinLockRelease(&walrcv->mutex);
329 
330  replayPtr = GetXLogReplayRecPtr(NULL);
331 
332  if (receivePtr == replayPtr)
333  return 0;
334 
335  chunkReplayStartTime = GetCurrentChunkReplayStartTime();
336 
337  if (chunkReplayStartTime == 0)
338  return -1;
339 
340  TimestampDifference(chunkReplayStartTime,
341  GetCurrentTimestamp(),
342  &secs, &usecs);
343 
344  return (((int) secs * 1000) + (usecs / 1000));
345 }
slock_t mutex
Definition: walreceiver.h:117
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
#define SpinLockAcquire(lock)
Definition: spin.h:62
TimestampTz GetCurrentChunkReplayStartTime(void)
Definition: xlog.c:6090
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11088
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
#define SpinLockRelease(lock)
Definition: spin.h:64
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
WalRcvData * WalRcv
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
int GetReplicationTransferLatency ( void  )

Definition at line 352 of file walreceiverfuncs.c.

References WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, TimestampDifference(), and WalRcv.

Referenced by ProcessWalSndrMessage().

353 {
354  WalRcvData *walrcv = WalRcv;
355 
356  TimestampTz lastMsgSendTime;
357  TimestampTz lastMsgReceiptTime;
358 
359  long secs = 0;
360  int usecs = 0;
361  int ms;
362 
363  SpinLockAcquire(&walrcv->mutex);
364  lastMsgSendTime = walrcv->lastMsgSendTime;
365  lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
366  SpinLockRelease(&walrcv->mutex);
367 
368  TimestampDifference(lastMsgSendTime,
369  lastMsgReceiptTime,
370  &secs, &usecs);
371 
372  ms = ((int) secs * 1000) + (usecs / 1000);
373 
374  return ms;
375 }
slock_t mutex
Definition: walreceiver.h:117
int64 TimestampTz
Definition: timestamp.h:39
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
XLogRecPtr GetWalRcvWriteRecPtr ( XLogRecPtr *  latestChunkStart,
TimeLineID *  receiveTLI 
)

Definition at line 294 of file walreceiverfuncs.c.

References WalRcvData::latestChunkStart, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by CreateRestartPoint(), GetStandbyFlushRecPtr(), pg_last_wal_receive_lsn(), and WaitForWALToBecomeAvailable().

295 {
296  WalRcvData *walrcv = WalRcv;
297  XLogRecPtr recptr;
298 
299  SpinLockAcquire(&walrcv->mutex);
300  recptr = walrcv->receivedUpto;
301  if (latestChunkStart)
302  *latestChunkStart = walrcv->latestChunkStart;
303  if (receiveTLI)
304  *receiveTLI = walrcv->receivedTLI;
305  SpinLockRelease(&walrcv->mutex);
306 
307  return recptr;
308 }
slock_t mutex
Definition: walreceiver.h:117
TimeLineID receivedTLI
Definition: walreceiver.h:83
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr latestChunkStart
Definition: walreceiver.h:91
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
#define SpinLockRelease(lock)
Definition: spin.h:64
static TimeLineID receiveTLI
Definition: xlog.c:201
uint64 XLogRecPtr
Definition: xlogdefs.h:21
WalRcvData * WalRcv
void RequestXLogStreaming ( TimeLineID  tli,
XLogRecPtr  recptr,
const char *  conninfo,
const char *  slotname 
)

Definition at line 223 of file walreceiverfuncs.c.

References Assert, WalRcvData::conninfo, WalRcvData::latch, WalRcvData::latestChunkStart, MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, now(), NULL, PMSIGNAL_START_WALRECEIVER, WalRcvData::receivedTLI, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, SendPostmasterSignal(), SetLatch(), WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, strlcpy(), WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_WAITING, WalRcvData::walRcvState, and XLogSegSize.

Referenced by WaitForWALToBecomeAvailable().

225 {
226  WalRcvData *walrcv = WalRcv;
227  bool launch = false;
228  pg_time_t now = (pg_time_t) time(NULL);
229 
230  /*
231  * We always start at the beginning of the segment. That prevents a broken
232  * segment (i.e., with no records in the first half of a segment) from
233  * being created by XLOG streaming, which might cause trouble later on if
234  * the segment is e.g archived.
235  */
236  if (recptr % XLogSegSize != 0)
237  recptr -= recptr % XLogSegSize;
238 
239  SpinLockAcquire(&walrcv->mutex);
240 
241  /* It better be stopped if we try to restart it */
242  Assert(walrcv->walRcvState == WALRCV_STOPPED ||
243  walrcv->walRcvState == WALRCV_WAITING);
244 
245  if (conninfo != NULL)
246  strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
247  else
248  walrcv->conninfo[0] = '\0';
249 
250  if (slotname != NULL)
251  strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
252  else
253  walrcv->slotname[0] = '\0';
254 
255  if (walrcv->walRcvState == WALRCV_STOPPED)
256  {
257  launch = true;
258  walrcv->walRcvState = WALRCV_STARTING;
259  }
260  else
261  walrcv->walRcvState = WALRCV_RESTARTING;
262  walrcv->startTime = now;
263 
264  /*
265  * If this is the first startup of walreceiver (on this timeline),
266  * initialize receivedUpto and latestChunkStart to the starting point.
267  */
268  if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
269  {
270  walrcv->receivedUpto = recptr;
271  walrcv->receivedTLI = tli;
272  walrcv->latestChunkStart = recptr;
273  }
274  walrcv->receiveStart = recptr;
275  walrcv->receiveStartTLI = tli;
276 
277  SpinLockRelease(&walrcv->mutex);
278 
279  if (launch)
280  SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
281  else if (walrcv->latch)
282  SetLatch(walrcv->latch);
283 }
#define XLogSegSize
Definition: xlog_internal.h:92
int64 pg_time_t
Definition: pgtime.h:23
slock_t mutex
Definition: walreceiver.h:117
WalRcvState walRcvState
Definition: walreceiver.h:63
TimeLineID receivedTLI
Definition: walreceiver.h:83
pg_time_t startTime
Definition: walreceiver.h:64
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
XLogRecPtr latestChunkStart
Definition: walreceiver.h:91
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
Latch * latch
Definition: walreceiver.h:135
#define SpinLockRelease(lock)
Definition: spin.h:64
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
WalRcvData * WalRcv
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
XLogRecPtr receiveStart
Definition: walreceiver.h:72
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
char slotname[NAMEDATALEN]
Definition: walreceiver.h:115
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109
void ShutdownWalRcv ( void  )

Definition at line 163 of file walreceiverfuncs.c.

References HandleStartupProcInterrupts(), WalRcvData::mutex, pg_usleep(), WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvRunning(), and WalRcvData::walRcvState.

Referenced by StartupXLOG(), and WaitForWALToBecomeAvailable().

164 {
165  WalRcvData *walrcv = WalRcv;
166  pid_t walrcvpid = 0;
167 
168  /*
169  * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
170  * mode once it's finished, and will also request postmaster to not
171  * restart itself.
172  */
173  SpinLockAcquire(&walrcv->mutex);
174  switch (walrcv->walRcvState)
175  {
176  case WALRCV_STOPPED:
177  break;
178  case WALRCV_STARTING:
179  walrcv->walRcvState = WALRCV_STOPPED;
180  break;
181 
182  case WALRCV_STREAMING:
183  case WALRCV_WAITING:
184  case WALRCV_RESTARTING:
185  walrcv->walRcvState = WALRCV_STOPPING;
186  /* fall through */
187  case WALRCV_STOPPING:
188  walrcvpid = walrcv->pid;
189  break;
190  }
191  SpinLockRelease(&walrcv->mutex);
192 
193  /*
194  * Signal walreceiver process if it was still running.
195  */
196  if (walrcvpid != 0)
197  kill(walrcvpid, SIGTERM);
198 
199  /*
200  * Wait for walreceiver to acknowledge its death by setting state to
201  * WALRCV_STOPPED.
202  */
203  while (WalRcvRunning())
204  {
205  /*
206  * This possibly-long loop needs to handle interrupts of startup
207  * process.
208  */
209  HandleStartupProcInterrupts();
210 
211  pg_usleep(100000); /* 100ms */
212  }
213 }
slock_t mutex
Definition: walreceiver.h:117
WalRcvState walRcvState
Definition: walreceiver.h:63
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
pid_t pid
Definition: walreceiver.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv
void HandleStartupProcInterrupts(void)
Definition: startup.c:148
bool WalRcvRunning(void)
static void walrcv_clear_result ( WalRcvExecResult *  walres)
inline static

Definition at line 268 of file walreceiver.h.

References WalRcvExecResult::err, FreeTupleDesc(), pfree(), WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, and tuplestore_end().

Referenced by copy_table(), DropSubscription(), fetch_remote_table_info(), fetch_table_list(), and LogicalRepSyncTableStart().

269 {
270  if (!walres)
271  return;
272 
273  if (walres->err)
274  pfree(walres->err);
275 
276  if (walres->tuplestore)
277  tuplestore_end(walres->tuplestore);
278 
279  if (walres->tupledesc)
280  FreeTupleDesc(walres->tupledesc);
281 
282  pfree(walres);
283 }
void pfree(void *pointer)
Definition: mcxt.c:950
TupleDesc tupledesc
Definition: walreceiver.h:190
Tuplestorestate * tuplestore
Definition: walreceiver.h:189
void tuplestore_end(Tuplestorestate *state)
Definition: tuplestore.c:453
void FreeTupleDesc(TupleDesc tupdesc)
Definition: tupdesc.c:268
void WalRcvForceReply ( void  )

Definition at line 1340 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, SetLatch(), and WalRcv.

Referenced by StartupXLOG(), and WaitForWALToBecomeAvailable().

1341 {
1342  WalRcv->force_reply = true;
1343  if (WalRcv->latch)
1344  SetLatch(WalRcv->latch);
1345 }
Latch * latch
Definition: walreceiver.h:135
bool force_reply
Definition: walreceiver.h:123
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
WalRcvData * WalRcv
bool WalRcvRunning ( void  )

Definition at line 72 of file walreceiverfuncs.c.

References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, and WalRcvData::walRcvState.

Referenced by ShutdownWalRcv().

73 {
74  WalRcvData *walrcv = WalRcv;
75  WalRcvState state;
76  pg_time_t startTime;
77 
78  SpinLockAcquire(&walrcv->mutex);
79 
80  state = walrcv->walRcvState;
81  startTime = walrcv->startTime;
82 
83  SpinLockRelease(&walrcv->mutex);
84 
85  /*
86  * If it has taken too long for walreceiver to start up, give up. Setting
87  * the state to STOPPED ensures that if walreceiver later does start up
88  * after all, it will see that it's not supposed to be running and die
89  * without doing anything.
90  */
91  if (state == WALRCV_STARTING)
92  {
93  pg_time_t now = (pg_time_t) time(NULL);
94 
95  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
96  {
97  SpinLockAcquire(&walrcv->mutex);
98 
99  if (walrcv->walRcvState == WALRCV_STARTING)
100  state = walrcv->walRcvState = WALRCV_STOPPED;
101 
102  SpinLockRelease(&walrcv->mutex);
103  }
104  }
105 
106  if (state != WALRCV_STOPPED)
107  return true;
108  else
109  return false;
110 }
int64 pg_time_t
Definition: pgtime.h:23
slock_t mutex
Definition: walreceiver.h:117
WalRcvState walRcvState
Definition: walreceiver.h:63
pg_time_t startTime
Definition: walreceiver.h:64
WalRcvState
Definition: walreceiver.h:43
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WALRCV_STARTUP_TIMEOUT
#define NULL
Definition: c.h:229
Definition: regguts.h:298
WalRcvData * WalRcv
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
void WalRcvShmemInit ( void  )

Definition at line 53 of file walreceiverfuncs.c.

References WalRcvData::latch, MemSet, WalRcvData::mutex, NULL, ShmemInitStruct(), SpinLockInit, WALRCV_STOPPED, WalRcvShmemSize(), and WalRcvData::walRcvState.

Referenced by CreateSharedMemoryAndSemaphores().

54 {
55  bool found;
56 
57  WalRcv = (WalRcvData *)
58  ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
59 
60  if (!found)
61  {
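62  /* First time through, so initialize */
63  MemSet(WalRcv, 0, WalRcvShmemSize());
64  WalRcv->walRcvState = WALRCV_STOPPED;
65  SpinLockInit(&WalRcv->mutex);
66  WalRcv->latch = NULL;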
67  }
68 }
slock_t mutex
Definition: walreceiver.h:117
WalRcvState walRcvState
Definition: walreceiver.h:63
#define SpinLockInit(lock)
Definition: spin.h:60
#define MemSet(start, val, len)
Definition: c.h:857
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
Latch * latch
Definition: walreceiver.h:135
Size WalRcvShmemSize(void)
#define NULL
Definition: c.h:229
WalRcvData * WalRcv
Size WalRcvShmemSize ( void  )

Definition at line 42 of file walreceiverfuncs.c.

References add_size().

Referenced by CreateSharedMemoryAndSemaphores(), and WalRcvShmemInit().

43 {
44  Size size = 0;
45 
46  size = add_size(size, sizeof(WalRcvData));
47 
48  return size;
49 }
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:356
bool WalRcvStreaming ( void  )

Definition at line 117 of file walreceiverfuncs.c.

References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, WALRCV_STREAMING, and WalRcvData::walRcvState.

Referenced by WaitForWALToBecomeAvailable().

118 {
119  WalRcvData *walrcv = WalRcv;
120  WalRcvState state;
121  pg_time_t startTime;
122 
123  SpinLockAcquire(&walrcv->mutex);
124 
125  state = walrcv->walRcvState;
126  startTime = walrcv->startTime;
127 
128  SpinLockRelease(&walrcv->mutex);
129 
130  /*
131  * If it has taken too long for walreceiver to start up, give up. Setting
132  * the state to STOPPED ensures that if walreceiver later does start up
133  * after all, it will see that it's not supposed to be running and die
134  * without doing anything.
135  */
136  if (state == WALRCV_STARTING)
137  {
138  pg_time_t now = (pg_time_t) time(NULL);
139 
140  if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
141  {
142  SpinLockAcquire(&walrcv->mutex);
143 
144  if (walrcv->walRcvState == WALRCV_STARTING)
145  state = walrcv->walRcvState = WALRCV_STOPPED;
146 
147  SpinLockRelease(&walrcv->mutex);
148  }
149  }
150 
151  if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
152  state == WALRCV_RESTARTING)
153  return true;
154  else
155  return false;
156 }
int64 pg_time_t
Definition: pgtime.h:23
slock_t mutex
Definition: walreceiver.h:117
WalRcvState walRcvState
Definition: walreceiver.h:63
pg_time_t startTime
Definition: walreceiver.h:64
WalRcvState
Definition: walreceiver.h:43
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WALRCV_STARTUP_TIMEOUT
#define NULL
Definition: c.h:229
Definition: regguts.h:298
WalRcvData * WalRcv
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
void WalReceiverMain ( void  )

Definition at line 188 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, BlockSig, buf, close, WalRcvData::conninfo, CurrentResourceOwner, DEBUG1, DisableWalRcvImmediateExit(), elog, EnableWalRcvImmediateExit(), ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, now(), NULL, on_shmem_exit(), options, PANIC, pfree(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), ResourceOwnerCreate(), server_version, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf(), SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, WalRcv, walrcv_connect, walrcv_endstreaming, walrcv_get_conninfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvQuickDieHandler(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvSigUsr1Handler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), WL_LATCH_SET, WL_POSTMASTER_DEATH, 
WL_SOCKET_READABLE, WL_TIMEOUT, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogFileNameP(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain().

189 {
190  char conninfo[MAXCONNINFO];
191  char *tmp_conninfo;
192  char slotname[NAMEDATALEN];
193  XLogRecPtr startpoint;
194  TimeLineID startpointTLI;
195  TimeLineID primaryTLI;
196  bool first_stream;
197  WalRcvData *walrcv = WalRcv;
198  TimestampTz last_recv_timestamp;
199  bool ping_sent;
200  char *err;
201 
202  /*
203  * WalRcv should be set up already (if we are a backend, we inherit this
204  * by fork() or EXEC_BACKEND mechanism from the postmaster).
205  */
206  Assert(walrcv != NULL);
207 
208  /*
209  * Mark walreceiver as running in shared memory.
210  *
211  * Do this as early as possible, so that if we fail later on, we'll set
212  * state to STOPPED. If we die before this, the startup process will keep
213  * waiting for us to start up, until it times out.
214  */
215  SpinLockAcquire(&walrcv->mutex);
216  Assert(walrcv->pid == 0);
217  switch (walrcv->walRcvState)
218  {
219  case WALRCV_STOPPING:
220  /* If we've already been requested to stop, don't start up. */
221  walrcv->walRcvState = WALRCV_STOPPED;
222  /* fall through */
223 
224  case WALRCV_STOPPED:
225  SpinLockRelease(&walrcv->mutex);
226  proc_exit(1);
227  break;
228 
229  case WALRCV_STARTING:
230  /* The usual case */
231  break;
232 
233  case WALRCV_WAITING:
234  case WALRCV_STREAMING:
235  case WALRCV_RESTARTING:
236  default:
237  /* Shouldn't happen */
238  elog(PANIC, "walreceiver still running according to shared memory state");
239  }
240  /* Advertise our PID so that the startup process can kill us */
241  walrcv->pid = MyProcPid;
242  walrcv->walRcvState = WALRCV_STREAMING;
243 
244  /* Fetch information required to start streaming */
245  walrcv->ready_to_display = false;
246  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
247  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
248  startpoint = walrcv->receiveStart;
249  startpointTLI = walrcv->receiveStartTLI;
250 
251  /* Initialise to a sanish value */
253 
254  SpinLockRelease(&walrcv->mutex);
255 
256  /* Arrange to clean up at walreceiver exit */
258 
259  walrcv->latch = &MyProc->procLatch;
260 
261  /* Properly accept or ignore signals the postmaster might send us */
262  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
263  pqsignal(SIGINT, SIG_IGN);
264  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
265  pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
270 
271  /* Reset some signals that are accepted by postmaster but not here */
277 
278  /* We allow SIGQUIT (quickdie) at all times */
279  sigdelset(&BlockSig, SIGQUIT);
280 
281  /* Load the libpq-specific functions */
282  load_file("libpqwalreceiver", false);
283  if (WalReceiverFunctions == NULL)
284  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
285 
286  /*
287  * Create a resource owner to keep track of our resources (not clear that
288  * we need this, but may as well have one).
289  */
290  CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
291 
292  /* Unblock signals (they were blocked when the postmaster forked us) */
294 
295  /* Establish the connection to the primary for XLOG streaming */
297  wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
298  if (!wrconn)
299  ereport(ERROR,
300  (errmsg("could not connect to the primary server: %s", err)));
302 
303  /*
304  * Save user-visible connection string. This clobbers the original
305  * conninfo, for security.
306  */
307  tmp_conninfo = walrcv_get_conninfo(wrconn);
308  SpinLockAcquire(&walrcv->mutex);
309  memset(walrcv->conninfo, 0, MAXCONNINFO);
310  if (tmp_conninfo)
311  {
312  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
313  pfree(tmp_conninfo);
314  }
315  walrcv->ready_to_display = true;
316  SpinLockRelease(&walrcv->mutex);
317 
318  first_stream = true;
319  for (;;)
320  {
321  char *primary_sysid;
322  char standby_sysid[32];
323  int server_version;
325 
326  /*
327  * Check that we're connected to a valid server using the
328  * IDENTIFY_SYSTEM replication command.
329  */
331  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
332  &server_version);
333 
334  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
335  GetSystemIdentifier());
336  if (strcmp(primary_sysid, standby_sysid) != 0)
337  {
338  ereport(ERROR,
339  (errmsg("database system identifier differs between the primary and standby"),
340  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
341  primary_sysid, standby_sysid)));
342  }
344 
345  /*
346  * Confirm that the current timeline of the primary is the same or
347  * ahead of ours.
348  */
349  if (primaryTLI < startpointTLI)
350  ereport(ERROR,
351  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
352  primaryTLI, startpointTLI)));
353 
354  /*
355  * Get any missing history files. We do this always, even when we're
356  * not interested in that timeline, so that if we're promoted to
357  * become the master later on, we don't select the same timeline that
358  * was already used in the current master. This isn't bullet-proof -
359  * you'll need some external software to manage your cluster if you
360  * need to ensure that a unique timeline id is chosen in every case,
361  * but let's avoid the confusion of timeline id collisions where we
362  * can.
363  */
364  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
365 
366  /*
367  * Start streaming.
368  *
369  * We'll try to start at the requested starting point and timeline,
370  * even if it's different from the server's latest timeline. In case
371  * we've already reached the end of the old timeline, the server will
372  * finish the streaming immediately, and we will go back to await
373  * orders from the startup process. If recovery_target_timeline is
374  * 'latest', the startup process will scan pg_wal and find the new
375  * history file, bump recovery target timeline, and ask us to restart
376  * on the new timeline.
377  */
378  options.logical = false;
379  options.startpoint = startpoint;
380  options.slotname = slotname[0] != '\0' ? slotname : NULL;
381  options.proto.physical.startpointTLI = startpointTLI;
382  ThisTimeLineID = startpointTLI;
383  if (walrcv_startstreaming(wrconn, &options))
384  {
385  if (first_stream)
386  ereport(LOG,
387  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
388  (uint32) (startpoint >> 32), (uint32) startpoint,
389  startpointTLI)));
390  else
391  ereport(LOG,
392  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
393  (uint32) (startpoint >> 32), (uint32) startpoint,
394  startpointTLI)));
395  first_stream = false;
396 
397  /* Initialize LogstreamResult and buffers for processing messages */
401 
402  /* Initialize the last recv timestamp */
403  last_recv_timestamp = GetCurrentTimestamp();
404  ping_sent = false;
405 
406  /* Loop until end-of-streaming or error */
407  for (;;)
408  {
409  char *buf;
410  int len;
411  bool endofwal = false;
412  pgsocket wait_fd = PGINVALID_SOCKET;
413  int rc;
414 
415  /*
416  * Exit walreceiver if we're not in recovery. This should not
417  * happen, but cross-check the status here.
418  */
419  if (!RecoveryInProgress())
420  ereport(FATAL,
421  (errmsg("cannot continue WAL streaming, recovery has already ended")));
422 
423  /* Process any requests or signals received recently */
425 
426  if (got_SIGHUP)
427  {
428  got_SIGHUP = false;
431  }
432 
433  /* See if we can read data immediately */
434  len = walrcv_receive(wrconn, &buf, &wait_fd);
435  if (len != 0)
436  {
437  /*
438  * Process the received data, and any subsequent data we
439  * can read without blocking.
440  */
441  for (;;)
442  {
443  if (len > 0)
444  {
445  /*
446  * Something was received from master, so reset
447  * timeout
448  */
449  last_recv_timestamp = GetCurrentTimestamp();
450  ping_sent = false;
451  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
452  }
453  else if (len == 0)
454  break;
455  else if (len < 0)
456  {
457  ereport(LOG,
458  (errmsg("replication terminated by primary server"),
459  errdetail("End of WAL reached on timeline %u at %X/%X.",
460  startpointTLI,
461  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
462  endofwal = true;
463  break;
464  }
465  len = walrcv_receive(wrconn, &buf, &wait_fd);
466  }
467 
468  /* Let the master know that we received some data. */
469  XLogWalRcvSendReply(false, false);
470 
471  /*
472  * If we've written some records, flush them to disk and
473  * let the startup process and primary server know about
474  * them.
475  */
476  XLogWalRcvFlush(false);
477  }
478 
479  /* Check if we need to exit the streaming loop. */
480  if (endofwal)
481  break;
482 
483  /*
484  * Ideally we would reuse a WaitEventSet object repeatedly
485  * here to avoid the overheads of WaitLatchOrSocket on epoll
486  * systems, but we can't be sure that libpq (or any other
487  * walreceiver implementation) has the same socket (even if
488  * the fd is the same number, it may have been closed and
489  * reopened since the last time). In future, if there is a
490  * function for removing sockets from WaitEventSet, then we
491  * could add and remove just the socket each time, potentially
492  * avoiding some system calls.
493  */
494  Assert(wait_fd != PGINVALID_SOCKET);
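495  rc = WaitLatchOrSocket(walrcv->latch,
496  WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
497  WL_TIMEOUT | WL_LATCH_SET,
498  wait_fd,
499  NAPTIME_PER_CYCLE,
500  WAIT_EVENT_WAL_RECEIVER_MAIN);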
501  if (rc & WL_LATCH_SET)
502  {
503  ResetLatch(walrcv->latch);
504  if (walrcv->force_reply)
505  {
506  /*
507  * The recovery process has asked us to send apply
508  * feedback now. Make sure the flag is really set to
509  * false in shared memory before sending the reply, so
510  * we don't miss a new request for a reply.
511  */
512  walrcv->force_reply = false;
514  XLogWalRcvSendReply(true, false);
515  }
516  }
517  if (rc & WL_POSTMASTER_DEATH)
518  {
519  /*
520  * Emergency bailout if postmaster has died. This is to
521  * avoid the necessity for manual cleanup of all
522  * postmaster children.
523  */
524  exit(1);
525  }
526  if (rc & WL_TIMEOUT)
527  {
528  /*
529  * We didn't receive anything new. If we haven't heard
530  * anything from the server for more than
531  * wal_receiver_timeout / 2, ping the server. Also, if
532  * it's been longer than wal_receiver_status_interval
533  * since the last update we sent, send a status update to
534  * the master anyway, to report any progress in applying
535  * WAL.
536  */
537  bool requestReply = false;
538 
539  /*
540  * Check if time since last receive from primary has
541  * reached the configured limit.
542  */
543  if (wal_receiver_timeout > 0)
544  {
545  TimestampTz now = GetCurrentTimestamp();
546  TimestampTz timeout;
547 
548  timeout =
549  TimestampTzPlusMilliseconds(last_recv_timestamp,
550  wal_receiver_timeout);
551 
552  if (now >= timeout)
553  ereport(ERROR,
554  (errmsg("terminating walreceiver due to timeout")));
555 
556  /*
557  * We didn't receive anything new, for half of
558  * receiver replication timeout. Ping the server.
559  */
560  if (!ping_sent)
561  {
562  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
563  (wal_receiver_timeout / 2));
564  if (now >= timeout)
565  {
566  requestReply = true;
567  ping_sent = true;
568  }
569  }
570  }
571 
572  XLogWalRcvSendReply(requestReply, requestReply);
574  }
575  }
576 
577  /*
578  * The backend finished streaming. Exit streaming COPY-mode from
579  * our side, too.
580  */
582  walrcv_endstreaming(wrconn, &primaryTLI);
584 
585  /*
586  * If the server had switched to a new timeline that we didn't
587  * know about when we began streaming, fetch its timeline history
588  * file now.
589  */
590  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
591  }
592  else
593  ereport(LOG,
594  (errmsg("primary server contains no more WAL on requested timeline %u",
595  startpointTLI)));
596 
597  /*
598  * End of WAL reached on the requested timeline. Close the last
599  * segment, and wait for new orders from the startup process.
600  */
601  if (recvFile >= 0)
602  {
603  char xlogfname[MAXFNAMELEN];
604 
605  XLogWalRcvFlush(false);
606  if (close(recvFile) != 0)
607  ereport(PANIC,
608  (errcode_for_file_access(),
609  errmsg("could not close log segment %s: %m",
610  XLogFileNameP(recvFileTLI, recvSegNo))));
611 
612  /*
613  * Create .done file forcibly to prevent the streamed segment from
614  * being archived later.
615  */
616  XLogFileName(xlogfname, recvFileTLI, recvSegNo);
617  if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
618  XLogArchiveForceDone(xlogfname);
619  else
620  XLogArchiveNotify(xlogfname);
621  }
622  recvFile = -1;
623 
624  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
625  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
626  }
627  /* not reached */
628 }
static struct @26 LogstreamResult
#define SIGUSR1
Definition: win32.h:202
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:719
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:246
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:254
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:39
#define SIGCONT
Definition: win32.h:197
uint32 TimeLineID
Definition: xlogdefs.h:45
slock_t mutex
Definition: walreceiver.h:117
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:634
WalRcvState walRcvState
Definition: walreceiver.h:63
#define SIGWINCH
Definition: win32.h:201
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
static StringInfoData incoming_message
Definition: walreceiver.c:112
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1174
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:256
#define XLogFileName(fname, tli, logSegNo)
#define SIGTTIN
Definition: win32.h:199
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:799
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:252
void proc_exit(int code)
Definition: ipc.c:99
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:870
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7872
#define PANIC
Definition: elog.h:53
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:82
#define SIGQUIT
Definition: win32.h:189
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:75
Latch procLatch
Definition: proc.h:103
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:111
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:512
void pfree(void *pointer)
Definition: mcxt.c:950
#define SIG_IGN
Definition: win32.h:185
#define ERROR
Definition: elog.h:43
static void WalRcvSigUsr1Handler(SIGNAL_ARGS)
Definition: walreceiver.c:807
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11088
pid_t pid
Definition: walreceiver.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:248
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10130
static char * buf
Definition: pg_test_fsync.c:66
int XLogArchiveMode
Definition: xlog.c:94
int errdetail(const char *fmt,...)
Definition: elog.c:873
int errcode_for_file_access(void)
Definition: elog.c:598
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:180
XLogRecPtr startpoint
Definition: walreceiver.h:145
unsigned int uint32
Definition: c.h:268
int pgsocket
Definition: port.h:22
sigset_t UnBlockSig
Definition: pqsignal.c:22
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:98
#define ereport(elevel, rest)
Definition: elog.h:122
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
static TimeLineID recvFileTLI
Definition: walreceiver.c:90
Definition: guc.h:72
Latch * latch
Definition: walreceiver.h:135
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static XLogSegNo recvSegNo
Definition: walreceiver.c:91
bool force_reply
Definition: walreceiver.h:123
#define MAXFNAMELEN
union WalRcvStreamOptions::@97 proto
#define SpinLockRelease(lock)
Definition: spin.h:64
sigset_t BlockSig
Definition: pqsignal.c:22
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:173
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:562
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:80
#define PGINVALID_SOCKET
Definition: port.h:24
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1105
bool ready_to_display
Definition: walreceiver.h:126
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:818
#define SIGPIPE
Definition: win32.h:193
#define SIGHUP
Definition: win32.h:188
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define SIG_DFL
Definition: win32.h:183
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:148
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static int server_version
Definition: pg_dumpall.c:82
static void WalRcvQuickDieHandler(SIGNAL_ARGS)
Definition: walreceiver.c:841
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1046
WalRcvData * WalRcv
#define SIGTTOU
Definition: win32.h:200
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4689
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:768
struct WalRcvStreamOptions::@97::@98 physical
#define elog
Definition: elog.h:219
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:72
static int recvFile
Definition: walreceiver.c:89
#define SIGCHLD
Definition: win32.h:198
#define WL_LATCH_SET
Definition: latch.h:124
#define SIGALRM
Definition: win32.h:194
#define UINT64_FORMAT
Definition: c.h:316
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
char slotname[NAMEDATALEN]
Definition: walreceiver.h:115
#define SIGUSR2
Definition: win32.h:203
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242

Variable Documentation

bool hot_standby_feedback

Definition at line 76 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

int wal_receiver_status_interval

Definition at line 74 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

int wal_receiver_timeout

Definition at line 75 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

PGDLLIMPORT WalReceiverFunctionsType* WalReceiverFunctions

Definition at line 80 of file walreceiver.c.

Referenced by _PG_init().