PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walreceiver.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void ProcessWalRcvInterrupts (void)
 
static void EnableWalRcvImmediateExit (void)
 
static void DisableWalRcvImmediateExit (void)
 
static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvSigHupHandler (SIGNAL_ARGS)
 
static void WalRcvSigUsr1Handler (SIGNAL_ARGS)
 
static void WalRcvShutdownHandler (SIGNAL_ARGS)
 
static void WalRcvQuickDieHandler (SIGNAL_ARGS)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConnwrconn = NULL
 
WalReceiverFunctionsTypeWalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
static uint32 recvOff = 0
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t got_SIGTERM = false
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 
static volatile bool WalRcvImmediateInterruptOK = false
 

Macro Definition Documentation

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 82 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

static void DisableWalRcvImmediateExit ( void  )
static

Definition at line 180 of file walreceiver.c.

References ProcessWalRcvInterrupts(), and WalRcvImmediateInterruptOK.

Referenced by WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().

181 {
184 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
static void EnableWalRcvImmediateExit ( void  )
static

Definition at line 173 of file walreceiver.c.

References ProcessWalRcvInterrupts(), and WalRcvImmediateInterruptOK.

Referenced by WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().

174 {
177 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1377 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, DEFAULT_ROLE_READ_ALL_STATS, elog, ERROR, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MemSet, WalRcvData::mutex, tupleDesc::natts, NULL, palloc0(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, pstrdup(), WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, and XLogRecPtrIsInvalid.

1378 {
1379  TupleDesc tupdesc;
1380  Datum *values;
1381  bool *nulls;
1382  int pid;
1383  bool ready_to_display;
1385  XLogRecPtr receive_start_lsn;
1386  TimeLineID receive_start_tli;
1387  XLogRecPtr received_lsn;
1388  TimeLineID received_tli;
1389  TimestampTz last_send_time;
1390  TimestampTz last_receipt_time;
1391  XLogRecPtr latest_end_lsn;
1392  TimestampTz latest_end_time;
1393  char *slotname;
1394  char *conninfo;
1395 
1396  /* Take a lock to ensure value consistency */
1398  pid = (int) WalRcv->pid;
1399  ready_to_display = WalRcv->ready_to_display;
1400  state = WalRcv->walRcvState;
1401  receive_start_lsn = WalRcv->receiveStart;
1402  receive_start_tli = WalRcv->receiveStartTLI;
1403  received_lsn = WalRcv->receivedUpto;
1404  received_tli = WalRcv->receivedTLI;
1405  last_send_time = WalRcv->lastMsgSendTime;
1406  last_receipt_time = WalRcv->lastMsgReceiptTime;
1407  latest_end_lsn = WalRcv->latestWalEnd;
1408  latest_end_time = WalRcv->latestWalEndTime;
1409  slotname = pstrdup(WalRcv->slotname);
1410  conninfo = pstrdup(WalRcv->conninfo);
1412 
1413  /*
1414  * No WAL receiver (or not ready yet), just return a tuple with NULL
1415  * values
1416  */
1417  if (pid == 0 || !ready_to_display)
1418  PG_RETURN_NULL();
1419 
1420  /* determine result type */
1421  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1422  elog(ERROR, "return type must be a row type");
1423 
1424  values = palloc0(sizeof(Datum) * tupdesc->natts);
1425  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1426 
1427  /* Fetch values */
1428  values[0] = Int32GetDatum(pid);
1429 
1431  {
1432  /*
1433  * Only superusers can see details. Other users only get the pid value
1434  * to know whether it is a WAL receiver, but no details.
1435  */
1436  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1437  }
1438  else
1439  {
1440  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1441 
1442  if (XLogRecPtrIsInvalid(receive_start_lsn))
1443  nulls[2] = true;
1444  else
1445  values[2] = LSNGetDatum(receive_start_lsn);
1446  values[3] = Int32GetDatum(receive_start_tli);
1447  if (XLogRecPtrIsInvalid(received_lsn))
1448  nulls[4] = true;
1449  else
1450  values[4] = LSNGetDatum(received_lsn);
1451  values[5] = Int32GetDatum(received_tli);
1452  if (last_send_time == 0)
1453  nulls[6] = true;
1454  else
1455  values[6] = TimestampTzGetDatum(last_send_time);
1456  if (last_receipt_time == 0)
1457  nulls[7] = true;
1458  else
1459  values[7] = TimestampTzGetDatum(last_receipt_time);
1460  if (XLogRecPtrIsInvalid(latest_end_lsn))
1461  nulls[8] = true;
1462  else
1463  values[8] = LSNGetDatum(latest_end_lsn);
1464  if (latest_end_time == 0)
1465  nulls[9] = true;
1466  else
1467  values[9] = TimestampTzGetDatum(latest_end_time);
1468  if (*slotname == '\0')
1469  nulls[10] = true;
1470  else
1471  values[10] = CStringGetTextDatum(slotname);
1472  if (*conninfo == '\0')
1473  nulls[11] = true;
1474  else
1475  values[11] = CStringGetTextDatum(conninfo);
1476  }
1477 
1478  /* Returns the record as Datum */
1479  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1480 }
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1352
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
uint32 TimeLineID
Definition: xlogdefs.h:45
slock_t mutex
Definition: walreceiver.h:120
Oid GetUserId(void)
Definition: miscinit.c:284
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:63
char * pstrdup(const char *in)
Definition: mcxt.c:1077
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:83
#define MemSet(start, val, len)
Definition: c.h:857
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
int natts
Definition: tupdesc.h:73
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
WalRcvState
Definition: walreceiver.h:43
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:62
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:878
uintptr_t Datum
Definition: postgres.h:372
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:313
bool ready_to_display
Definition: walreceiver.h:118
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
#define DEFAULT_ROLE_READ_ALL_STATS
Definition: pg_authid.h:108
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4837
XLogRecPtr latestWalEnd
Definition: walreceiver.h:102
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:222
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define Int32GetDatum(X)
Definition: postgres.h:485
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
XLogRecPtr receiveStart
Definition: walreceiver.h:72
#define PG_RETURN_NULL()
Definition: fmgr.h:305
char slotname[NAMEDATALEN]
Definition: walreceiver.h:115
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109
static void ProcessWalRcvInterrupts ( void  )
static

Definition at line 154 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, got_SIGTERM, and WalRcvImmediateInterruptOK.

Referenced by DisableWalRcvImmediateExit(), EnableWalRcvImmediateExit(), WalRcvShutdownHandler(), WalRcvWaitForStartPosition(), and WalReceiverMain().

155 {
156  /*
157  * Although walreceiver interrupt handling doesn't use the same scheme as
158  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
159  * any incoming signals on Win32.
160  */
162 
163  if (got_SIGTERM)
164  {
166  ereport(FATAL,
167  (errcode(ERRCODE_ADMIN_SHUTDOWN),
168  errmsg("terminating walreceiver process due to administrator command")));
169  }
170 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
int errcode(int sqlerrcode)
Definition: elog.c:575
#define FATAL
Definition: elog.h:52
#define ereport(elevel, rest)
Definition: elog.h:122
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:99
static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1287 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, log_min_messages, WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1288 {
1289  WalRcvData *walrcv = WalRcv;
1290 
1291  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1292 
1293  /* Update shared-memory status */
1294  SpinLockAcquire(&walrcv->mutex);
1295  if (walrcv->latestWalEnd < walEnd)
1296  walrcv->latestWalEndTime = sendTime;
1297  walrcv->latestWalEnd = walEnd;
1298  walrcv->lastMsgSendTime = sendTime;
1299  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1300  SpinLockRelease(&walrcv->mutex);
1301 
1302  if (log_min_messages <= DEBUG2)
1303  {
1304  char *sendtime;
1305  char *receipttime;
1306  int applyDelay;
1307 
1308  /* Copy because timestamptz_to_str returns a static buffer */
1309  sendtime = pstrdup(timestamptz_to_str(sendTime));
1310  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1311  applyDelay = GetReplicationApplyDelay();
1312 
1313  /* apply delay is not available */
1314  if (applyDelay == -1)
1315  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1316  sendtime,
1317  receipttime,
1319  else
1320  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1321  sendtime,
1322  receipttime,
1323  applyDelay,
1325 
1326  pfree(sendtime);
1327  pfree(receipttime);
1328  }
1329 }
slock_t mutex
Definition: walreceiver.h:120
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1077
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:950
#define DEBUG2
Definition: elog.h:24
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
int log_min_messages
Definition: guc.c:451
XLogRecPtr latestWalEnd
Definition: walreceiver.h:102
WalRcvData * WalRcv
#define elog
Definition: elog.h:219
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1710
static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 768 of file walreceiver.c.

References Assert, WalRcvData::latch, WalRcvData::mutex, MyProcPid, NULL, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

769 {
770  WalRcvData *walrcv = WalRcv;
771 
772  /* Ensure that all WAL records received are flushed to disk */
773  XLogWalRcvFlush(true);
774 
775  walrcv->latch = NULL;
776 
777  SpinLockAcquire(&walrcv->mutex);
778  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
779  walrcv->walRcvState == WALRCV_RESTARTING ||
780  walrcv->walRcvState == WALRCV_STARTING ||
781  walrcv->walRcvState == WALRCV_WAITING ||
782  walrcv->walRcvState == WALRCV_STOPPING);
783  Assert(walrcv->pid == MyProcPid);
784  walrcv->walRcvState = WALRCV_STOPPED;
785  walrcv->pid = 0;
786  walrcv->ready_to_display = false;
787  SpinLockRelease(&walrcv->mutex);
788 
789  /* Terminate the connection gracefully. */
790  if (wrconn != NULL)
792 
793  /* Wake up the startup process to notice promptly that we're gone */
794  WakeupRecovery();
795 }
int MyProcPid
Definition: globals.c:39
slock_t mutex
Definition: walreceiver.h:120
WalRcvState walRcvState
Definition: walreceiver.h:63
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12102
pid_t pid
Definition: walreceiver.h:62
Latch * latch
Definition: walreceiver.h:135
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:118
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1046
WalRcvData * WalRcv
static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 719 of file walreceiver.c.

References DisableWalRcvImmediateExit(), EnableWalRcvImmediateExit(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, and writeTimeLineHistoryFile().

Referenced by WalReceiverMain().

720 {
721  TimeLineID tli;
722 
723  for (tli = first; tli <= last; tli++)
724  {
725  /* there's no history file for timeline 1 */
726  if (tli != 1 && !existsTimeLineHistory(tli))
727  {
728  char *fname;
729  char *content;
730  int len;
731  char expectedfname[MAXFNAMELEN];
732 
733  ereport(LOG,
734  (errmsg("fetching timeline history file for timeline %u from primary server",
735  tli)));
736 
738  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
740 
741  /*
742  * Check that the filename on the master matches what we
743  * calculated ourselves. This is just a sanity check, it should
744  * always match.
745  */
746  TLHistoryFileName(expectedfname, tli);
747  if (strcmp(fname, expectedfname) != 0)
748  ereport(ERROR,
749  (errcode(ERRCODE_PROTOCOL_VIOLATION),
750  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
751  tli)));
752 
753  /*
754  * Write the file to pg_wal.
755  */
756  writeTimeLineHistoryFile(tli, content, len);
757 
758  pfree(fname);
759  pfree(content);
760  }
761  }
762 }
uint32 TimeLineID
Definition: xlogdefs.h:45
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
void pfree(void *pointer)
Definition: mcxt.c:950
#define ERROR
Definition: elog.h:43
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:448
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:180
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:207
#define ereport(elevel, rest)
Definition: elog.h:122
#define MAXFNAMELEN
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:173
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:250
int errmsg(const char *fmt,...)
Definition: elog.c:797
void WalRcvForceReply ( void  )

Definition at line 1340 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, SetLatch(), and WalRcv.

Referenced by StartupXLOG(), and WaitForWALToBecomeAvailable().

1341 {
1342  WalRcv->force_reply = true;
1343  if (WalRcv->latch)
1344  SetLatch(WalRcv->latch);
1345 }
Latch * latch
Definition: walreceiver.h:135
bool force_reply
Definition: walreceiver.h:126
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
WalRcvData * WalRcv
static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1352 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1353 {
1354  switch (state)
1355  {
1356  case WALRCV_STOPPED:
1357  return "stopped";
1358  case WALRCV_STARTING:
1359  return "starting";
1360  case WALRCV_STREAMING:
1361  return "streaming";
1362  case WALRCV_WAITING:
1363  return "waiting";
1364  case WALRCV_RESTARTING:
1365  return "restarting";
1366  case WALRCV_STOPPING:
1367  return "stopping";
1368  }
1369  return "UNKNOWN";
1370 }
Definition: regguts.h:298
static void WalRcvQuickDieHandler ( SIGNAL_ARGS  )
static

Definition at line 841 of file walreceiver.c.

References BlockSig, on_exit_reset(), and PG_SETMASK.

Referenced by WalReceiverMain().

842 {
844 
845  /*
846  * We DO NOT want to run proc_exit() callbacks -- we're here because
847  * shared memory may be corrupted, so we don't want to try to clean up our
848  * transaction. Just nail the windows shut and get out of town. Now that
849  * there's an atexit callback to prevent third-party code from breaking
850  * things by calling exit() directly, we have to reset the callbacks
851  * explicitly to make this work as intended.
852  */
853  on_exit_reset();
854 
855  /*
856  * Note we do exit(2) not exit(0). This is to force the postmaster into a
857  * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
858  * backend. This is necessary precisely because we don't clean up our
859  * shared memory state. (The "dead man switch" mechanism in pmsignal.c
860  * should ensure the postmaster sees this as a crash, too, but no harm in
861  * being doubly sure.)
862  */
863  exit(2);
864 }
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
void on_exit_reset(void)
Definition: ipc.c:396
sigset_t BlockSig
Definition: pqsignal.c:22
static void WalRcvShutdownHandler ( SIGNAL_ARGS  )
static

Definition at line 818 of file walreceiver.c.

References got_SIGTERM, WalRcvData::latch, proc_exit_inprogress, ProcessWalRcvInterrupts(), SetLatch(), WalRcv, and WalRcvImmediateInterruptOK.

Referenced by WalReceiverMain().

819 {
820  int save_errno = errno;
821 
822  got_SIGTERM = true;
823 
824  if (WalRcv->latch)
826 
827  /* Don't joggle the elbow of proc_exit */
830 
831  errno = save_errno;
832 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
Latch * latch
Definition: walreceiver.h:135
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
bool proc_exit_inprogress
Definition: ipc.c:40
WalRcvData * WalRcv
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:99
static void WalRcvSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 799 of file walreceiver.c.

References got_SIGHUP.

Referenced by WalReceiverMain().

800 {
801  got_SIGHUP = true;
802 }
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:98
static void WalRcvSigUsr1Handler ( SIGNAL_ARGS  )
static

Definition at line 807 of file walreceiver.c.

References latch_sigusr1_handler().

Referenced by WalReceiverMain().

808 {
809  int save_errno = errno;
810 
812 
813  errno = save_errno;
814 }
void latch_sigusr1_handler(void)
Definition: latch.c:1467
static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 634 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, WalRcvData::latch, WalRcvData::mutex, PostmasterIsAlive(), proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_LATCH_SET, and WL_POSTMASTER_DEATH.

Referenced by WalReceiverMain().

635 {
636  WalRcvData *walrcv = WalRcv;
637  int state;
638 
639  SpinLockAcquire(&walrcv->mutex);
640  state = walrcv->walRcvState;
641  if (state != WALRCV_STREAMING)
642  {
643  SpinLockRelease(&walrcv->mutex);
644  if (state == WALRCV_STOPPING)
645  proc_exit(0);
646  else
647  elog(FATAL, "unexpected walreceiver state");
648  }
649  walrcv->walRcvState = WALRCV_WAITING;
651  walrcv->receiveStartTLI = 0;
652  SpinLockRelease(&walrcv->mutex);
653 
655  set_ps_display("idle", false);
656 
657  /*
658  * nudge startup process to notice that we've stopped streaming and are
659  * now waiting for instructions.
660  */
661  WakeupRecovery();
662  for (;;)
663  {
664  ResetLatch(walrcv->latch);
665 
666  /*
667  * Emergency bailout if postmaster has died. This is to avoid the
668  * necessity for manual cleanup of all postmaster children.
669  */
670  if (!PostmasterIsAlive())
671  exit(1);
672 
674 
675  SpinLockAcquire(&walrcv->mutex);
676  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
677  walrcv->walRcvState == WALRCV_WAITING ||
678  walrcv->walRcvState == WALRCV_STOPPING);
679  if (walrcv->walRcvState == WALRCV_RESTARTING)
680  {
681  /* we don't expect primary_conninfo to change */
682  *startpoint = walrcv->receiveStart;
683  *startpointTLI = walrcv->receiveStartTLI;
684  walrcv->walRcvState = WALRCV_STREAMING;
685  SpinLockRelease(&walrcv->mutex);
686  break;
687  }
688  if (walrcv->walRcvState == WALRCV_STOPPING)
689  {
690  /*
691  * We should've received SIGTERM if the startup process wants us
692  * to die, but might as well check it here too.
693  */
694  SpinLockRelease(&walrcv->mutex);
695  exit(1);
696  }
697  SpinLockRelease(&walrcv->mutex);
698 
701  }
702 
704  {
705  char activitymsg[50];
706 
707  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
708  (uint32) (*startpoint >> 32),
709  (uint32) *startpoint);
710  set_ps_display(activitymsg, false);
711  }
712 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:120
bool update_process_title
Definition: ps_status.c:35
WalRcvState walRcvState
Definition: walreceiver.h:63
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
void proc_exit(int code)
Definition: ipc.c:99
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
#define SpinLockAcquire(lock)
Definition: spin.h:62
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:336
void WakeupRecovery(void)
Definition: xlog.c:12102
#define FATAL
Definition: elog.h:52
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
unsigned int uint32
Definition: c.h:268
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
Latch * latch
Definition: walreceiver.h:135
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
#define Assert(condition)
Definition: c.h:675
Definition: regguts.h:298
WalRcvData * WalRcv
#define elog
Definition: elog.h:219
XLogRecPtr receiveStart
Definition: walreceiver.h:72
#define WL_LATCH_SET
Definition: latch.h:124
void WalReceiverMain ( void  )

Definition at line 188 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, BlockSig, buf, close, WalRcvData::conninfo, CurrentResourceOwner, DEBUG1, DisableWalRcvImmediateExit(), elog, EnableWalRcvImmediateExit(), ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, now(), NULL, on_shmem_exit(), options, PANIC, pfree(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), ResourceOwnerCreate(), server_version, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf(), SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, WalRcv, walrcv_connect, walrcv_endstreaming, walrcv_get_conninfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvQuickDieHandler(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvSigUsr1Handler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_TIMEOUT, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogFileNameP(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain().

189 {
190  char conninfo[MAXCONNINFO];
191  char *tmp_conninfo;
192  char slotname[NAMEDATALEN];
193  XLogRecPtr startpoint;
194  TimeLineID startpointTLI;
195  TimeLineID primaryTLI;
196  bool first_stream;
197  WalRcvData *walrcv = WalRcv;
198  TimestampTz last_recv_timestamp;
199  bool ping_sent;
200  char *err;
201 
202  /*
203  * WalRcv should be set up already (if we are a backend, we inherit this
204  * by fork() or EXEC_BACKEND mechanism from the postmaster).
205  */
206  Assert(walrcv != NULL);
207 
208  /*
209  * Mark walreceiver as running in shared memory.
210  *
211  * Do this as early as possible, so that if we fail later on, we'll set
212  * state to STOPPED. If we die before this, the startup process will keep
213  * waiting for us to start up, until it times out.
214  */
215  SpinLockAcquire(&walrcv->mutex);
216  Assert(walrcv->pid == 0);
217  switch (walrcv->walRcvState)
218  {
219  case WALRCV_STOPPING:
220  /* If we've already been requested to stop, don't start up. */
221  walrcv->walRcvState = WALRCV_STOPPED;
222  /* fall through */
223 
224  case WALRCV_STOPPED:
225  SpinLockRelease(&walrcv->mutex);
226  proc_exit(1);
227  break;
228 
229  case WALRCV_STARTING:
230  /* The usual case */
231  break;
232 
233  case WALRCV_WAITING:
234  case WALRCV_STREAMING:
235  case WALRCV_RESTARTING:
236  default:
237  /* Shouldn't happen */
238  elog(PANIC, "walreceiver still running according to shared memory state");
239  }
240  /* Advertise our PID so that the startup process can kill us */
241  walrcv->pid = MyProcPid;
242  walrcv->walRcvState = WALRCV_STREAMING;
243 
244  /* Fetch information required to start streaming */
245  walrcv->ready_to_display = false;
246  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
247  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
248  startpoint = walrcv->receiveStart;
249  startpointTLI = walrcv->receiveStartTLI;
250 
251  /* Initialise to a sanish value */
253 
254  SpinLockRelease(&walrcv->mutex);
255 
256  /* Arrange to clean up at walreceiver exit */
258 
259  walrcv->latch = &MyProc->procLatch;
260 
261  /* Properly accept or ignore signals the postmaster might send us */
262  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
263  pqsignal(SIGINT, SIG_IGN);
264  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
265  pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
270 
271  /* Reset some signals that are accepted by postmaster but not here */
277 
278  /* We allow SIGQUIT (quickdie) at all times */
279  sigdelset(&BlockSig, SIGQUIT);
280 
281  /* Load the libpq-specific functions */
282  load_file("libpqwalreceiver", false);
283  if (WalReceiverFunctions == NULL)
284  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
285 
286  /*
287  * Create a resource owner to keep track of our resources (not clear that
288  * we need this, but may as well have one).
289  */
290  CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
291 
292  /* Unblock signals (they were blocked when the postmaster forked us) */
294 
295  /* Establish the connection to the primary for XLOG streaming */
297  wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
298  if (!wrconn)
299  ereport(ERROR,
300  (errmsg("could not connect to the primary server: %s", err)));
302 
303  /*
304  * Save user-visible connection string. This clobbers the original
305  * conninfo, for security.
306  */
307  tmp_conninfo = walrcv_get_conninfo(wrconn);
308  SpinLockAcquire(&walrcv->mutex);
309  memset(walrcv->conninfo, 0, MAXCONNINFO);
310  if (tmp_conninfo)
311  {
312  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
313  pfree(tmp_conninfo);
314  }
315  walrcv->ready_to_display = true;
316  SpinLockRelease(&walrcv->mutex);
317 
318  first_stream = true;
319  for (;;)
320  {
321  char *primary_sysid;
322  char standby_sysid[32];
323  int server_version;
325 
326  /*
327  * Check that we're connected to a valid server using the
328  * IDENTIFY_SYSTEM replication command.
329  */
331  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
332  &server_version);
333 
334  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
336  if (strcmp(primary_sysid, standby_sysid) != 0)
337  {
338  ereport(ERROR,
339  (errmsg("database system identifier differs between the primary and standby"),
340  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
341  primary_sysid, standby_sysid)));
342  }
344 
345  /*
346  * Confirm that the current timeline of the primary is the same or
347  * ahead of ours.
348  */
349  if (primaryTLI < startpointTLI)
350  ereport(ERROR,
351  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
352  primaryTLI, startpointTLI)));
353 
354  /*
355  * Get any missing history files. We do this always, even when we're
356  * not interested in that timeline, so that if we're promoted to
357  * become the master later on, we don't select the same timeline that
358  * was already used in the current master. This isn't bullet-proof -
359  * you'll need some external software to manage your cluster if you
360  * need to ensure that a unique timeline id is chosen in every case,
361  * but let's avoid the confusion of timeline id collisions where we
362  * can.
363  */
364  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
365 
366  /*
367  * Start streaming.
368  *
369  * We'll try to start at the requested starting point and timeline,
370  * even if it's different from the server's latest timeline. In case
371  * we've already reached the end of the old timeline, the server will
372  * finish the streaming immediately, and we will go back to await
373  * orders from the startup process. If recovery_target_timeline is
374  * 'latest', the startup process will scan pg_wal and find the new
375  * history file, bump recovery target timeline, and ask us to restart
376  * on the new timeline.
377  */
378  options.logical = false;
379  options.startpoint = startpoint;
380  options.slotname = slotname[0] != '\0' ? slotname : NULL;
381  options.proto.physical.startpointTLI = startpointTLI;
382  ThisTimeLineID = startpointTLI;
383  if (walrcv_startstreaming(wrconn, &options))
384  {
385  if (first_stream)
386  ereport(LOG,
387  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
388  (uint32) (startpoint >> 32), (uint32) startpoint,
389  startpointTLI)));
390  else
391  ereport(LOG,
392  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
393  (uint32) (startpoint >> 32), (uint32) startpoint,
394  startpointTLI)));
395  first_stream = false;
396 
397  /* Initialize LogstreamResult and buffers for processing messages */
401 
402  /* Initialize the last recv timestamp */
403  last_recv_timestamp = GetCurrentTimestamp();
404  ping_sent = false;
405 
406  /* Loop until end-of-streaming or error */
407  for (;;)
408  {
409  char *buf;
410  int len;
411  bool endofwal = false;
412  pgsocket wait_fd = PGINVALID_SOCKET;
413  int rc;
414 
415  /*
416  * Exit walreceiver if we're not in recovery. This should not
417  * happen, but cross-check the status here.
418  */
419  if (!RecoveryInProgress())
420  ereport(FATAL,
421  (errmsg("cannot continue WAL streaming, recovery has already ended")));
422 
423  /* Process any requests or signals received recently */
425 
426  if (got_SIGHUP)
427  {
428  got_SIGHUP = false;
431  }
432 
433  /* See if we can read data immediately */
434  len = walrcv_receive(wrconn, &buf, &wait_fd);
435  if (len != 0)
436  {
437  /*
438  * Process the received data, and any subsequent data we
439  * can read without blocking.
440  */
441  for (;;)
442  {
443  if (len > 0)
444  {
445  /*
446  * Something was received from master, so reset
447  * timeout
448  */
449  last_recv_timestamp = GetCurrentTimestamp();
450  ping_sent = false;
451  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
452  }
453  else if (len == 0)
454  break;
455  else if (len < 0)
456  {
457  ereport(LOG,
458  (errmsg("replication terminated by primary server"),
459  errdetail("End of WAL reached on timeline %u at %X/%X.",
460  startpointTLI,
461  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
462  endofwal = true;
463  break;
464  }
465  len = walrcv_receive(wrconn, &buf, &wait_fd);
466  }
467 
468  /* Let the master know that we received some data. */
469  XLogWalRcvSendReply(false, false);
470 
471  /*
472  * If we've written some records, flush them to disk and
473  * let the startup process and primary server know about
474  * them.
475  */
476  XLogWalRcvFlush(false);
477  }
478 
479  /* Check if we need to exit the streaming loop. */
480  if (endofwal)
481  break;
482 
483  /*
484  * Ideally we would reuse a WaitEventSet object repeatedly
485  * here to avoid the overheads of WaitLatchOrSocket on epoll
486  * systems, but we can't be sure that libpq (or any other
487  * walreceiver implementation) has the same socket (even if
488  * the fd is the same number, it may have been closed and
489  * reopened since the last time). In future, if there is a
490  * function for removing sockets from WaitEventSet, then we
491  * could add and remove just the socket each time, potentially
492  * avoiding some system calls.
493  */
494  Assert(wait_fd != PGINVALID_SOCKET);
495  rc = WaitLatchOrSocket(walrcv->latch,
498  wait_fd,
501  if (rc & WL_LATCH_SET)
502  {
503  ResetLatch(walrcv->latch);
504  if (walrcv->force_reply)
505  {
506  /*
507  * The recovery process has asked us to send apply
508  * feedback now. Make sure the flag is really set to
509  * false in shared memory before sending the reply, so
510  * we don't miss a new request for a reply.
511  */
512  walrcv->force_reply = false;
514  XLogWalRcvSendReply(true, false);
515  }
516  }
517  if (rc & WL_POSTMASTER_DEATH)
518  {
519  /*
520  * Emergency bailout if postmaster has died. This is to
521  * avoid the necessity for manual cleanup of all
522  * postmaster children.
523  */
524  exit(1);
525  }
526  if (rc & WL_TIMEOUT)
527  {
528  /*
529  * We didn't receive anything new. If we haven't heard
530  * anything from the server for more than
531  * wal_receiver_timeout / 2, ping the server. Also, if
532  * it's been longer than wal_receiver_status_interval
533  * since the last update we sent, send a status update to
534  * the master anyway, to report any progress in applying
535  * WAL.
536  */
537  bool requestReply = false;
538 
539  /*
540  * Check if time since last receive from standby has
541  * reached the configured limit.
542  */
543  if (wal_receiver_timeout > 0)
544  {
546  TimestampTz timeout;
547 
548  timeout =
549  TimestampTzPlusMilliseconds(last_recv_timestamp,
551 
552  if (now >= timeout)
553  ereport(ERROR,
554  (errmsg("terminating walreceiver due to timeout")));
555 
556  /*
557  * We didn't receive anything new, for half of
558  * receiver replication timeout. Ping the server.
559  */
560  if (!ping_sent)
561  {
562  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
563  (wal_receiver_timeout / 2));
564  if (now >= timeout)
565  {
566  requestReply = true;
567  ping_sent = true;
568  }
569  }
570  }
571 
572  XLogWalRcvSendReply(requestReply, requestReply);
574  }
575  }
576 
577  /*
578  * The backend finished streaming. Exit streaming COPY-mode from
579  * our side, too.
580  */
582  walrcv_endstreaming(wrconn, &primaryTLI);
584 
585  /*
586  * If the server had switched to a new timeline that we didn't
587  * know about when we began streaming, fetch its timeline history
588  * file now.
589  */
590  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
591  }
592  else
593  ereport(LOG,
594  (errmsg("primary server contains no more WAL on requested timeline %u",
595  startpointTLI)));
596 
597  /*
598  * End of WAL reached on the requested timeline. Close the last
599  * segment, and await for new orders from the startup process.
600  */
601  if (recvFile >= 0)
602  {
603  char xlogfname[MAXFNAMELEN];
604 
605  XLogWalRcvFlush(false);
606  if (close(recvFile) != 0)
607  ereport(PANIC,
609  errmsg("could not close log segment %s: %m",
611 
612  /*
613  * Create .done file forcibly to prevent the streamed segment from
614  * being archived later.
615  */
616  XLogFileName(xlogfname, recvFileTLI, recvSegNo);
618  XLogArchiveForceDone(xlogfname);
619  else
620  XLogArchiveNotify(xlogfname);
621  }
622  recvFile = -1;
623 
624  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
625  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
626  }
627  /* not reached */
628 }
static struct @26 LogstreamResult
#define SIGUSR1
Definition: win32.h:202
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:719
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:246
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:254
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:39
#define SIGCONT
Definition: win32.h:197
uint32 TimeLineID
Definition: xlogdefs.h:45
slock_t mutex
Definition: walreceiver.h:120
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:634
WalRcvState walRcvState
Definition: walreceiver.h:63
#define SIGWINCH
Definition: win32.h:201
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
static StringInfoData incoming_message
Definition: walreceiver.c:112
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1174
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:256
#define XLogFileName(fname, tli, logSegNo)
#define SIGTTIN
Definition: win32.h:199
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:799
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:252
void proc_exit(int code)
Definition: ipc.c:99
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:870
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7878
#define PANIC
Definition: elog.h:53
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:82
#define SIGQUIT
Definition: win32.h:189
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:75
Latch procLatch
Definition: proc.h:103
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:111
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:512
void pfree(void *pointer)
Definition: mcxt.c:950
#define SIG_IGN
Definition: win32.h:185
#define ERROR
Definition: elog.h:43
static void WalRcvSigUsr1Handler(SIGNAL_ARGS)
Definition: walreceiver.c:807
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11094
pid_t pid
Definition: walreceiver.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:248
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10136
static char * buf
Definition: pg_test_fsync.c:66
int XLogArchiveMode
Definition: xlog.c:94
int errdetail(const char *fmt,...)
Definition: elog.c:873
int errcode_for_file_access(void)
Definition: elog.c:598
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:180
XLogRecPtr startpoint
Definition: walreceiver.h:145
unsigned int uint32
Definition: c.h:268
int pgsocket
Definition: port.h:22
sigset_t UnBlockSig
Definition: pqsignal.c:22
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:98
#define ereport(elevel, rest)
Definition: elog.h:122
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
static TimeLineID recvFileTLI
Definition: walreceiver.c:90
Definition: guc.h:72
Latch * latch
Definition: walreceiver.h:135
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static XLogSegNo recvSegNo
Definition: walreceiver.c:91
bool force_reply
Definition: walreceiver.h:126
#define MAXFNAMELEN
union WalRcvStreamOptions::@97 proto
#define SpinLockRelease(lock)
Definition: spin.h:64
sigset_t BlockSig
Definition: pqsignal.c:22
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:173
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:562
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:80
#define PGINVALID_SOCKET
Definition: port.h:24
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1105
bool ready_to_display
Definition: walreceiver.h:118
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:818
#define SIGPIPE
Definition: win32.h:193
#define SIGHUP
Definition: win32.h:188
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define SIG_DFL
Definition: win32.h:183
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:148
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static int server_version
Definition: pg_dumpall.c:82
static void WalRcvQuickDieHandler(SIGNAL_ARGS)
Definition: walreceiver.c:841
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1046
WalRcvData * WalRcv
#define SIGTTOU
Definition: win32.h:200
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4695
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:768
struct WalRcvStreamOptions::@97::@98 physical
#define elog
Definition: elog.h:219
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:72
static int recvFile
Definition: walreceiver.c:89
#define SIGCHLD
Definition: win32.h:198
#define WL_LATCH_SET
Definition: latch.h:124
#define SIGALRM
Definition: win32.h:194
#define UINT64_FORMAT
Definition: c.h:316
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
char slotname[NAMEDATALEN]
Definition: walreceiver.h:115
#define SIGUSR2
Definition: win32.h:203
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 1046 of file walreceiver.c.

References AllowCascadeReplication, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, recvFile, recvSegNo, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

1047 {
1048  if (LogstreamResult.Flush < LogstreamResult.Write)
1049  {
1050  WalRcvData *walrcv = WalRcv;
1051 
1053 
1054  LogstreamResult.Flush = LogstreamResult.Write;
1055 
1056  /* Update shared-memory status */
1057  SpinLockAcquire(&walrcv->mutex);
1058  if (walrcv->receivedUpto < LogstreamResult.Flush)
1059  {
1060  walrcv->latestChunkStart = walrcv->receivedUpto;
1061  walrcv->receivedUpto = LogstreamResult.Flush;
1062  walrcv->receivedTLI = ThisTimeLineID;
1063  }
1064  SpinLockRelease(&walrcv->mutex);
1065 
1066  /* Signal the startup process and walsender that new WAL has arrived */
1067  WakeupRecovery();
1069  WalSndWakeup();
1070 
1071  /* Report XLOG streaming progress in PS display */
1073  {
1074  char activitymsg[50];
1075 
1076  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1077  (uint32) (LogstreamResult.Write >> 32),
1078  (uint32) LogstreamResult.Write);
1079  set_ps_display(activitymsg, false);
1080  }
1081 
1082  /* Also let the master know that we made some progress */
1083  if (!dying)
1084  {
1085  XLogWalRcvSendReply(false, false);
1086  XLogWalRcvSendHSFeedback(false);
1087  }
1088  }
1089 }
static struct @26 LogstreamResult
slock_t mutex
Definition: walreceiver.h:120
bool update_process_title
Definition: ps_status.c:35
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10093
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1174
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
TimeLineID receivedTLI
Definition: walreceiver.h:83
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12102
XLogRecPtr latestChunkStart
Definition: walreceiver.h:91
#define AllowCascadeReplication()
Definition: walreceiver.h:38
unsigned int uint32
Definition: c.h:268
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
static XLogSegNo recvSegNo
Definition: walreceiver.c:91
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1105
TimeLineID ThisTimeLineID
Definition: xlog.c:179
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:89
void WalSndWakeup(void)
Definition: walsender.c:3004
static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 870 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

871 {
872  int hdrlen;
873  XLogRecPtr dataStart;
874  XLogRecPtr walEnd;
875  TimestampTz sendTime;
876  bool replyRequested;
877 
879 
880  switch (type)
881  {
882  case 'w': /* WAL records */
883  {
884  /* copy message to StringInfo */
885  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
886  if (len < hdrlen)
887  ereport(ERROR,
888  (errcode(ERRCODE_PROTOCOL_VIOLATION),
889  errmsg_internal("invalid WAL message received from primary")));
891 
892  /* read the fields */
893  dataStart = pq_getmsgint64(&incoming_message);
894  walEnd = pq_getmsgint64(&incoming_message);
895  sendTime = pq_getmsgint64(&incoming_message);
896  ProcessWalSndrMessage(walEnd, sendTime);
897 
898  buf += hdrlen;
899  len -= hdrlen;
900  XLogWalRcvWrite(buf, len, dataStart);
901  break;
902  }
903  case 'k': /* Keepalive */
904  {
905  /* copy message to StringInfo */
906  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
907  if (len != hdrlen)
908  ereport(ERROR,
909  (errcode(ERRCODE_PROTOCOL_VIOLATION),
910  errmsg_internal("invalid keepalive message received from primary")));
912 
913  /* read the fields */
914  walEnd = pq_getmsgint64(&incoming_message);
915  sendTime = pq_getmsgint64(&incoming_message);
916  replyRequested = pq_getmsgbyte(&incoming_message);
917 
918  ProcessWalSndrMessage(walEnd, sendTime);
919 
920  /* If the primary requested a reply, send one immediately */
921  if (replyRequested)
922  XLogWalRcvSendReply(true, false);
923  break;
924  }
925  default:
926  ereport(ERROR,
927  (errcode(ERRCODE_PROTOCOL_VIOLATION),
928  errmsg_internal("invalid replication message type %d",
929  type)));
930  }
931 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1287
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:112
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:66
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
#define ereport(elevel, rest)
Definition: elog.h:122
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1105
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:208
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:937
static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1174 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetNextXidAndEpoch(), GetOldestXmin(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), NULL, pq_sendbyte(), pq_sendint(), pq_sendint64(), PROCARRAY_FLAGS_DEFAULT, PROCARRAY_SLOTS_XMIN, ProcArrayGetReplicationSlotXmin(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, TransactionIdPrecedes(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

1175 {
1176  TimestampTz now;
1177  TransactionId nextXid;
1178  uint32 xmin_epoch,
1179  catalog_xmin_epoch;
1180  TransactionId xmin,
1181  catalog_xmin;
1182  static TimestampTz sendTime = 0;
1183 
1184  /* initially true so we always send at least one feedback message */
1185  static bool master_has_standby_xmin = true;
1186 
1187  /*
1188  * If the user doesn't want status to be reported to the master, be sure
1189  * to exit before doing anything at all.
1190  */
1192  !master_has_standby_xmin)
1193  return;
1194 
1195  /* Get current timestamp. */
1196  now = GetCurrentTimestamp();
1197 
1198  if (!immed)
1199  {
1200  /*
1201  * Send feedback at most once per wal_receiver_status_interval.
1202  */
1203  if (!TimestampDifferenceExceeds(sendTime, now,
1205  return;
1206  sendTime = now;
1207  }
1208 
1209  /*
1210  * If Hot Standby is not yet accepting connections there is nothing to
1211  * send. Check this after the interval has expired to reduce number of
1212  * calls.
1213  *
1214  * Bailing out here also ensures that we don't send feedback until we've
1215  * read our own replication slot state, so we don't tell the master to
1216  * discard needed xmin or catalog_xmin from any slots that may exist on
1217  * this replica.
1218  */
1219  if (!HotStandbyActive())
1220  return;
1221 
1222  /*
1223  * Make the expensive call to get the oldest xmin once we are certain
1224  * everything else has been checked.
1225  */
1227  {
1228  TransactionId slot_xmin;
1229 
1230  /*
1231  * Usually GetOldestXmin() would include both global replication slot
1232  * xmin and catalog_xmin in its calculations, but we want to derive
1233  * separate values for each of those. So we ask for an xmin that
1234  * excludes the catalog_xmin.
1235  */
1236  xmin = GetOldestXmin(NULL,
1238 
1239  ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1240 
1241  if (TransactionIdIsValid(slot_xmin) &&
1242  TransactionIdPrecedes(slot_xmin, xmin))
1243  xmin = slot_xmin;
1244  }
1245  else
1246  {
1247  xmin = InvalidTransactionId;
1248  catalog_xmin = InvalidTransactionId;
1249  }
1250 
1251  /*
1252  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1253  * the epoch boundary.
1254  */
1255  GetNextXidAndEpoch(&nextXid, &xmin_epoch);
1256  catalog_xmin_epoch = xmin_epoch;
1257  if (nextXid < xmin)
1258  xmin_epoch--;
1259  if (nextXid < catalog_xmin)
1260  catalog_xmin_epoch--;
1261 
1262  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1263  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1264 
1265  /* Construct the message and send it. */
1267  pq_sendbyte(&reply_message, 'h');
1269  pq_sendint(&reply_message, xmin, 4);
1270  pq_sendint(&reply_message, xmin_epoch, 4);
1271  pq_sendint(&reply_message, catalog_xmin, 4);
1272  pq_sendint(&reply_message, catalog_xmin_epoch, 4);
1274  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1275  master_has_standby_xmin = true;
1276  else
1277  master_has_standby_xmin = false;
1278 }
bool hot_standby_feedback
Definition: walreceiver.c:76
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
uint32 TransactionId
Definition: c.h:397
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
bool HotStandbyActive(void)
Definition: xlog.c:7934
int wal_receiver_status_interval
Definition: walreceiver.c:74
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2985
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
static StringInfoData reply_message
Definition: walreceiver.c:111
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8296
#define PROCARRAY_SLOTS_XMIN
Definition: procarray.h:37
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:268
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define PROCARRAY_FLAGS_DEFAULT
Definition: procarray.h:50
TransactionId GetOldestXmin(Relation rel, int flags)
Definition: procarray.c:1314
#define NULL
Definition: c.h:229
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:258
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1105 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, now(), NULL, pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1106 {
1107  static XLogRecPtr writePtr = 0;
1108  static XLogRecPtr flushPtr = 0;
1109  XLogRecPtr applyPtr;
1110  static TimestampTz sendTime = 0;
1111  TimestampTz now;
1112 
1113  /*
1114  * If the user doesn't want status to be reported to the master, be sure
1115  * to exit before doing anything at all.
1116  */
1117  if (!force && wal_receiver_status_interval <= 0)
1118  return;
1119 
1120  /* Get current timestamp. */
1121  now = GetCurrentTimestamp();
1122 
1123  /*
1124  * We can compare the write and flush positions to the last message we
1125  * sent without taking any lock, but the apply position requires a spin
1126  * lock, so we don't check that unless something else has changed or 10
1127  * seconds have passed. This means that the apply WAL location will
1128  * appear, from the master's point of view, to lag slightly, but since
1129  * this is only for reporting purposes and only on idle systems, that's
1130  * probably OK.
1131  */
1132  if (!force
1133  && writePtr == LogstreamResult.Write
1134  && flushPtr == LogstreamResult.Flush
1135  && !TimestampDifferenceExceeds(sendTime, now,
1137  return;
1138  sendTime = now;
1139 
1140  /* Construct a new message */
1141  writePtr = LogstreamResult.Write;
1142  flushPtr = LogstreamResult.Flush;
1143  applyPtr = GetXLogReplayRecPtr(NULL);
1144 
1146  pq_sendbyte(&reply_message, 'r');
1147  pq_sendint64(&reply_message, writePtr);
1148  pq_sendint64(&reply_message, flushPtr);
1149  pq_sendint64(&reply_message, applyPtr);
1151  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1152 
1153  /* Send it */
1154  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1155  (uint32) (writePtr >> 32), (uint32) writePtr,
1156  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1157  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1158  requestReply ? " (reply requested)" : "");
1159 
1161 }
static struct @26 LogstreamResult
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
int wal_receiver_status_interval
Definition: walreceiver.c:74
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
static StringInfoData reply_message
Definition: walreceiver.c:111
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11094
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:268
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:258
#define elog
Definition: elog.h:219
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 937 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvOff, recvSegNo, ThisTimeLineID, write, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogFileNameP(), XLogSegSize, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

938 {
939  int startoff;
940  int byteswritten;
941 
942  while (nbytes > 0)
943  {
944  int segbytes;
945 
946  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
947  {
948  bool use_existent;
949 
950  /*
951  * fsync() and close current file before we switch to next one. We
952  * would otherwise have to reopen this file to fsync it later
953  */
954  if (recvFile >= 0)
955  {
956  char xlogfname[MAXFNAMELEN];
957 
958  XLogWalRcvFlush(false);
959 
960  /*
961  * XLOG segment files will be re-read by recovery in startup
962  * process soon, so we don't advise the OS to release cache
963  * pages associated with the file like XLogFileClose() does.
964  */
965  if (close(recvFile) != 0)
966  ereport(PANIC,
968  errmsg("could not close log segment %s: %m",
970 
971  /*
972  * Create .done file forcibly to prevent the streamed segment
973  * from being archived later.
974  */
975  XLogFileName(xlogfname, recvFileTLI, recvSegNo);
977  XLogArchiveForceDone(xlogfname);
978  else
979  XLogArchiveNotify(xlogfname);
980  }
981  recvFile = -1;
982 
983  /* Create/use new log file */
984  XLByteToSeg(recptr, recvSegNo);
985  use_existent = true;
986  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
988  recvOff = 0;
989  }
990 
991  /* Calculate the start offset of the received logs */
992  startoff = recptr % XLogSegSize;
993 
994  if (startoff + nbytes > XLogSegSize)
995  segbytes = XLogSegSize - startoff;
996  else
997  segbytes = nbytes;
998 
999  /* Need to seek in the file? */
1000  if (recvOff != startoff)
1001  {
1002  if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
1003  ereport(PANIC,
1005  errmsg("could not seek in log segment %s to offset %u: %m",
1007  startoff)));
1008  recvOff = startoff;
1009  }
1010 
1011  /* OK to write the logs */
1012  errno = 0;
1013 
1014  byteswritten = write(recvFile, buf, segbytes);
1015  if (byteswritten <= 0)
1016  {
1017  /* if write didn't set errno, assume no disk space */
1018  if (errno == 0)
1019  errno = ENOSPC;
1020  ereport(PANIC,
1022  errmsg("could not write to log segment %s "
1023  "at offset %u, length %lu: %m",
1025  recvOff, (unsigned long) segbytes)));
1026  }
1027 
1028  /* Update state for write */
1029  recptr += byteswritten;
1030 
1031  recvOff += byteswritten;
1032  nbytes -= byteswritten;
1033  buf += byteswritten;
1034 
1035  LogstreamResult.Write = recptr;
1036  }
1037 }
#define XLogSegSize
Definition: xlog_internal.h:92
static struct @26 LogstreamResult
#define write(a, b, c)
Definition: win32.h:14
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3153
#define XLogFileName(fname, tli, logSegNo)
#define PANIC
Definition: elog.h:53
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:512
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10136
static char * buf
Definition: pg_test_fsync.c:66
int XLogArchiveMode
Definition: xlog.c:94
int errcode_for_file_access(void)
Definition: elog.c:598
#define ereport(elevel, rest)
Definition: elog.h:122
static TimeLineID recvFileTLI
Definition: walreceiver.c:90
static XLogSegNo recvSegNo
Definition: walreceiver.c:91
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:562
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define XLByteToSeg(xlrp, logSegNo)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1046
int errmsg(const char *fmt,...)
Definition: elog.c:797
static uint32 recvOff
Definition: walreceiver.c:92
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:89
#define XLByteInSeg(xlrp, logSegNo)

Variable Documentation

XLogRecPtr Flush

Definition at line 108 of file walreceiver.c.

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 98 of file walreceiver.c.

Referenced by WalRcvSigHupHandler(), and WalReceiverMain().

volatile sig_atomic_t got_SIGTERM = false
static

Definition at line 99 of file walreceiver.c.

Referenced by ProcessWalRcvInterrupts(), and WalRcvShutdownHandler().

bool hot_standby_feedback

Definition at line 76 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

StringInfoData incoming_message
static

Definition at line 112 of file walreceiver.c.

struct { ... } LogstreamResult
int recvFile = -1
static

Definition at line 89 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

TimeLineID recvFileTLI = 0
static

Definition at line 90 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

uint32 recvOff = 0
static

Definition at line 92 of file walreceiver.c.

Referenced by XLogWalRcvWrite().

XLogSegNo recvSegNo = 0
static

Definition at line 91 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

StringInfoData reply_message
static

Definition at line 111 of file walreceiver.c.

Referenced by send_feedback().

int wal_receiver_status_interval

Definition at line 74 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

int wal_receiver_timeout

Definition at line 75 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

volatile bool WalRcvImmediateInterruptOK = false
static
WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 80 of file walreceiver.c.

Referenced by _PG_init().

WalReceiverConn* wrconn = NULL
static

Definition at line 79 of file walreceiver.c.

XLogRecPtr Write

Definition at line 107 of file walreceiver.c.