PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void ProcessWalRcvInterrupts (void)
 
static void EnableWalRcvImmediateExit (void)
 
static void DisableWalRcvImmediateExit (void)
 
static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvSigHupHandler (SIGNAL_ARGS)
 
static void WalRcvSigUsr1Handler (SIGNAL_ARGS)
 
static void WalRcvShutdownHandler (SIGNAL_ARGS)
 
static void WalRcvQuickDieHandler (SIGNAL_ARGS)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConnwrconn = NULL
 
WalReceiverFunctionsTypeWalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
static uint32 recvOff = 0
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t got_SIGTERM = false
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 
static volatile bool WalRcvImmediateInterruptOK = false
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 83 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

◆ DisableWalRcvImmediateExit()

static void DisableWalRcvImmediateExit ( void  )
static

Definition at line 181 of file walreceiver.c.

References ProcessWalRcvInterrupts(), and WalRcvImmediateInterruptOK.

Referenced by WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().

182 {
185 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:131
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:155

◆ EnableWalRcvImmediateExit()

static void EnableWalRcvImmediateExit ( void  )
static

Definition at line 174 of file walreceiver.c.

References ProcessWalRcvInterrupts(), and WalRcvImmediateInterruptOK.

Referenced by WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().

175 {
178 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:131
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:155

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1403 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MAXCONNINFO, MemSet, WalRcvData::mutex, NAMEDATALEN, tupleDesc::natts, NI_MAXHOST, palloc0(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, and XLogRecPtrIsInvalid.

1404 {
1405  TupleDesc tupdesc;
1406  Datum *values;
1407  bool *nulls;
1408  int pid;
1409  bool ready_to_display;
1411  XLogRecPtr receive_start_lsn;
1412  TimeLineID receive_start_tli;
1413  XLogRecPtr received_lsn;
1414  TimeLineID received_tli;
1415  TimestampTz last_send_time;
1416  TimestampTz last_receipt_time;
1417  XLogRecPtr latest_end_lsn;
1418  TimestampTz latest_end_time;
1419  char sender_host[NI_MAXHOST];
1420  int sender_port = 0;
1421  char slotname[NAMEDATALEN];
1422  char conninfo[MAXCONNINFO];
1423 
1424  /* Take a lock to ensure value consistency */
1426  pid = (int) WalRcv->pid;
1427  ready_to_display = WalRcv->ready_to_display;
1428  state = WalRcv->walRcvState;
1429  receive_start_lsn = WalRcv->receiveStart;
1430  receive_start_tli = WalRcv->receiveStartTLI;
1431  received_lsn = WalRcv->receivedUpto;
1432  received_tli = WalRcv->receivedTLI;
1433  last_send_time = WalRcv->lastMsgSendTime;
1434  last_receipt_time = WalRcv->lastMsgReceiptTime;
1435  latest_end_lsn = WalRcv->latestWalEnd;
1436  latest_end_time = WalRcv->latestWalEndTime;
1437  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1438  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1439  sender_port = WalRcv->sender_port;
1440  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1442 
1443  /*
1444  * No WAL receiver (or not ready yet), just return a tuple with NULL
1445  * values
1446  */
1447  if (pid == 0 || !ready_to_display)
1448  PG_RETURN_NULL();
1449 
1450  /* determine result type */
1451  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1452  elog(ERROR, "return type must be a row type");
1453 
1454  values = palloc0(sizeof(Datum) * tupdesc->natts);
1455  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1456 
1457  /* Fetch values */
1458  values[0] = Int32GetDatum(pid);
1459 
1460  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1461  {
1462  /*
1463  * Only superusers and members of pg_read_all_stats can see details.
1464  * Other users only get the pid value to know whether it is a WAL
1465  * receiver, but no details.
1466  */
1467  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1468  }
1469  else
1470  {
1471  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1472 
1473  if (XLogRecPtrIsInvalid(receive_start_lsn))
1474  nulls[2] = true;
1475  else
1476  values[2] = LSNGetDatum(receive_start_lsn);
1477  values[3] = Int32GetDatum(receive_start_tli);
1478  if (XLogRecPtrIsInvalid(received_lsn))
1479  nulls[4] = true;
1480  else
1481  values[4] = LSNGetDatum(received_lsn);
1482  values[5] = Int32GetDatum(received_tli);
1483  if (last_send_time == 0)
1484  nulls[6] = true;
1485  else
1486  values[6] = TimestampTzGetDatum(last_send_time);
1487  if (last_receipt_time == 0)
1488  nulls[7] = true;
1489  else
1490  values[7] = TimestampTzGetDatum(last_receipt_time);
1491  if (XLogRecPtrIsInvalid(latest_end_lsn))
1492  nulls[8] = true;
1493  else
1494  values[8] = LSNGetDatum(latest_end_lsn);
1495  if (latest_end_time == 0)
1496  nulls[9] = true;
1497  else
1498  values[9] = TimestampTzGetDatum(latest_end_time);
1499  if (*slotname == '\0')
1500  nulls[10] = true;
1501  else
1502  values[10] = CStringGetTextDatum(slotname);
1503  if (*sender_host == '\0')
1504  nulls[11] = true;
1505  else
1506  values[11] = CStringGetTextDatum(sender_host);
1507  if (sender_port == 0)
1508  nulls[12] = true;
1509  else
1510  values[12] = Int32GetDatum(sender_port);
1511  if (*conninfo == '\0')
1512  nulls[13] = true;
1513  else
1514  values[13] = CStringGetTextDatum(conninfo);
1515  }
1516 
1517  /* Returns the record as Datum */
1518  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1519 }
int sender_port
Definition: walreceiver.h:117
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1378
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
uint32 TimeLineID
Definition: xlogdefs.h:45
slock_t mutex
Definition: walreceiver.h:137
Oid GetUserId(void)
Definition: miscinit.c:379
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:64
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:84
#define MemSet(start, val, len)
Definition: c.h:908
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1074
int natts
Definition: tupdesc.h:82
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
WalRcvState
Definition: walreceiver.h:44
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:43
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:63
#define MAXCONNINFO
Definition: walreceiver.h:36
XLogRecPtr receivedUpto
Definition: walreceiver.h:83
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:955
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:318
bool ready_to_display
Definition: walreceiver.h:126
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4857
XLogRecPtr latestWalEnd
Definition: walreceiver.h:103
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:231
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:164
#define Int32GetDatum(X)
Definition: postgres.h:464
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:116
#define CStringGetTextDatum(s)
Definition: builtins.h:95
#define elog
Definition: elog.h:219
XLogRecPtr receiveStart
Definition: walreceiver.h:73
#define PG_RETURN_NULL()
Definition: fmgr.h:310
char slotname[NAMEDATALEN]
Definition: walreceiver.h:123
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:110

◆ ProcessWalRcvInterrupts()

static void ProcessWalRcvInterrupts ( void  )
static

Definition at line 155 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, got_SIGTERM, and WalRcvImmediateInterruptOK.

Referenced by DisableWalRcvImmediateExit(), EnableWalRcvImmediateExit(), WalRcvShutdownHandler(), WalRcvWaitForStartPosition(), and WalReceiverMain().

156 {
157  /*
158  * Although walreceiver interrupt handling doesn't use the same scheme as
159  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
160  * any incoming signals on Win32.
161  */
163 
164  if (got_SIGTERM)
165  {
167  ereport(FATAL,
168  (errcode(ERRCODE_ADMIN_SHUTDOWN),
169  errmsg("terminating walreceiver process due to administrator command")));
170  }
171 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:131
int errcode(int sqlerrcode)
Definition: elog.c:575
#define FATAL
Definition: elog.h:52
#define ereport(elevel, rest)
Definition: elog.h:122
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:100

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1307 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, log_min_messages, WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1308 {
1309  WalRcvData *walrcv = WalRcv;
1310 
1311  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1312 
1313  /* Update shared-memory status */
1314  SpinLockAcquire(&walrcv->mutex);
1315  if (walrcv->latestWalEnd < walEnd)
1316  walrcv->latestWalEndTime = sendTime;
1317  walrcv->latestWalEnd = walEnd;
1318  walrcv->lastMsgSendTime = sendTime;
1319  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1320  SpinLockRelease(&walrcv->mutex);
1321 
1322  if (log_min_messages <= DEBUG2)
1323  {
1324  char *sendtime;
1325  char *receipttime;
1326  int applyDelay;
1327 
1328  /* Copy because timestamptz_to_str returns a static buffer */
1329  sendtime = pstrdup(timestamptz_to_str(sendTime));
1330  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1331  applyDelay = GetReplicationApplyDelay();
1332 
1333  /* apply delay is not available */
1334  if (applyDelay == -1)
1335  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1336  sendtime,
1337  receipttime,
1339  else
1340  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1341  sendtime,
1342  receipttime,
1343  applyDelay,
1345 
1346  pfree(sendtime);
1347  pfree(receipttime);
1348  }
1349 }
slock_t mutex
Definition: walreceiver.h:137
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1161
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1031
#define DEBUG2
Definition: elog.h:24
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
int log_min_messages
Definition: guc.c:454
XLogRecPtr latestWalEnd
Definition: walreceiver.h:103
WalRcvData * WalRcv
#define elog
Definition: elog.h:219
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1710

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 788 of file walreceiver.c.

References Assert, WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

789 {
790  WalRcvData *walrcv = WalRcv;
791 
792  /* Ensure that all WAL records received are flushed to disk */
793  XLogWalRcvFlush(true);
794 
795  /* Mark ourselves inactive in shared memory */
796  SpinLockAcquire(&walrcv->mutex);
797  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
798  walrcv->walRcvState == WALRCV_RESTARTING ||
799  walrcv->walRcvState == WALRCV_STARTING ||
800  walrcv->walRcvState == WALRCV_WAITING ||
801  walrcv->walRcvState == WALRCV_STOPPING);
802  Assert(walrcv->pid == MyProcPid);
803  walrcv->walRcvState = WALRCV_STOPPED;
804  walrcv->pid = 0;
805  walrcv->ready_to_display = false;
806  walrcv->latch = NULL;
807  SpinLockRelease(&walrcv->mutex);
808 
809  /* Terminate the connection gracefully. */
810  if (wrconn != NULL)
812 
813  /* Wake up the startup process to notice promptly that we're gone */
814  WakeupRecovery();
815 }
int MyProcPid
Definition: globals.c:40
slock_t mutex
Definition: walreceiver.h:137
WalRcvState walRcvState
Definition: walreceiver.h:64
static WalReceiverConn * wrconn
Definition: walreceiver.c:80
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12284
pid_t pid
Definition: walreceiver.h:63
Latch * latch
Definition: walreceiver.h:135
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:126
#define Assert(condition)
Definition: c.h:699
#define walrcv_disconnect(conn)
Definition: walreceiver.h:279
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1066
WalRcvData * WalRcv

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 739 of file walreceiver.c.

References DisableWalRcvImmediateExit(), EnableWalRcvImmediateExit(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, and writeTimeLineHistoryFile().

Referenced by WalReceiverMain().

740 {
741  TimeLineID tli;
742 
743  for (tli = first; tli <= last; tli++)
744  {
745  /* there's no history file for timeline 1 */
746  if (tli != 1 && !existsTimeLineHistory(tli))
747  {
748  char *fname;
749  char *content;
750  int len;
751  char expectedfname[MAXFNAMELEN];
752 
753  ereport(LOG,
754  (errmsg("fetching timeline history file for timeline %u from primary server",
755  tli)));
756 
758  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
760 
761  /*
762  * Check that the filename on the master matches what we
763  * calculated ourselves. This is just a sanity check, it should
764  * always match.
765  */
766  TLHistoryFileName(expectedfname, tli);
767  if (strcmp(fname, expectedfname) != 0)
768  ereport(ERROR,
769  (errcode(ERRCODE_PROTOCOL_VIOLATION),
770  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
771  tli)));
772 
773  /*
774  * Write the file to pg_wal.
775  */
776  writeTimeLineHistoryFile(tli, content, len);
777 
778  pfree(fname);
779  pfree(content);
780  }
781  }
782 }
uint32 TimeLineID
Definition: xlogdefs.h:45
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:80
void pfree(void *pointer)
Definition: mcxt.c:1031
#define ERROR
Definition: elog.h:43
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:447
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:181
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:207
#define ereport(elevel, rest)
Definition: elog.h:122
#define MAXFNAMELEN
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:174
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:265
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1360 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

1361 {
1362  Latch *latch;
1363 
1364  WalRcv->force_reply = true;
1365  /* fetching the latch pointer might not be atomic, so use spinlock */
1367  latch = WalRcv->latch;
1369  if (latch)
1370  SetLatch(latch);
1371 }
slock_t mutex
Definition: walreceiver.h:137
sig_atomic_t force_reply
Definition: walreceiver.h:144
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: latch.h:110
Latch * latch
Definition: walreceiver.h:135
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
WalRcvData * WalRcv

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1378 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1379 {
1380  switch (state)
1381  {
1382  case WALRCV_STOPPED:
1383  return "stopped";
1384  case WALRCV_STARTING:
1385  return "starting";
1386  case WALRCV_STREAMING:
1387  return "streaming";
1388  case WALRCV_WAITING:
1389  return "waiting";
1390  case WALRCV_RESTARTING:
1391  return "restarting";
1392  case WALRCV_STOPPING:
1393  return "stopping";
1394  }
1395  return "UNKNOWN";
1396 }
Definition: regguts.h:298

◆ WalRcvQuickDieHandler()

static void WalRcvQuickDieHandler ( SIGNAL_ARGS  )
static

Definition at line 861 of file walreceiver.c.

References BlockSig, on_exit_reset(), and PG_SETMASK.

Referenced by WalReceiverMain().

862 {
864 
865  /*
866  * We DO NOT want to run proc_exit() callbacks -- we're here because
867  * shared memory may be corrupted, so we don't want to try to clean up our
868  * transaction. Just nail the windows shut and get out of town. Now that
869  * there's an atexit callback to prevent third-party code from breaking
870  * things by calling exit() directly, we have to reset the callbacks
871  * explicitly to make this work as intended.
872  */
873  on_exit_reset();
874 
875  /*
876  * Note we do exit(2) not exit(0). This is to force the postmaster into a
877  * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
878  * backend. This is necessary precisely because we don't clean up our
879  * shared memory state. (The "dead man switch" mechanism in pmsignal.c
880  * should ensure the postmaster sees this as a crash, too, but no harm in
881  * being doubly sure.)
882  */
883  exit(2);
884 }
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
void on_exit_reset(void)
Definition: ipc.c:407
sigset_t BlockSig
Definition: pqsignal.c:22

◆ WalRcvShutdownHandler()

static void WalRcvShutdownHandler ( SIGNAL_ARGS  )
static

Definition at line 838 of file walreceiver.c.

References got_SIGTERM, WalRcvData::latch, proc_exit_inprogress, ProcessWalRcvInterrupts(), SetLatch(), WalRcv, and WalRcvImmediateInterruptOK.

Referenced by WalReceiverMain().

839 {
840  int save_errno = errno;
841 
842  got_SIGTERM = true;
843 
844  if (WalRcv->latch)
846 
847  /* Don't joggle the elbow of proc_exit */
850 
851  errno = save_errno;
852 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:131
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:155
Latch * latch
Definition: walreceiver.h:135
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
bool proc_exit_inprogress
Definition: ipc.c:40
WalRcvData * WalRcv
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:100

◆ WalRcvSigHupHandler()

static void WalRcvSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 819 of file walreceiver.c.

References got_SIGHUP.

Referenced by WalReceiverMain().

820 {
821  got_SIGHUP = true;
822 }
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:99

◆ WalRcvSigUsr1Handler()

static void WalRcvSigUsr1Handler ( SIGNAL_ARGS  )
static

Definition at line 827 of file walreceiver.c.

References latch_sigusr1_handler().

Referenced by WalReceiverMain().

828 {
829  int save_errno = errno;
830 
832 
833  errno = save_errno;
834 }
void latch_sigusr1_handler(void)
Definition: latch.c:1473

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 654 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, WalRcvData::latch, WalRcvData::mutex, PostmasterIsAlive(), proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_LATCH_SET, and WL_POSTMASTER_DEATH.

Referenced by WalReceiverMain().

655 {
656  WalRcvData *walrcv = WalRcv;
657  int state;
658 
659  SpinLockAcquire(&walrcv->mutex);
660  state = walrcv->walRcvState;
661  if (state != WALRCV_STREAMING)
662  {
663  SpinLockRelease(&walrcv->mutex);
664  if (state == WALRCV_STOPPING)
665  proc_exit(0);
666  else
667  elog(FATAL, "unexpected walreceiver state");
668  }
669  walrcv->walRcvState = WALRCV_WAITING;
671  walrcv->receiveStartTLI = 0;
672  SpinLockRelease(&walrcv->mutex);
673 
675  set_ps_display("idle", false);
676 
677  /*
678  * nudge startup process to notice that we've stopped streaming and are
679  * now waiting for instructions.
680  */
681  WakeupRecovery();
682  for (;;)
683  {
684  ResetLatch(walrcv->latch);
685 
686  /*
687  * Emergency bailout if postmaster has died. This is to avoid the
688  * necessity for manual cleanup of all postmaster children.
689  */
690  if (!PostmasterIsAlive())
691  exit(1);
692 
694 
695  SpinLockAcquire(&walrcv->mutex);
696  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
697  walrcv->walRcvState == WALRCV_WAITING ||
698  walrcv->walRcvState == WALRCV_STOPPING);
699  if (walrcv->walRcvState == WALRCV_RESTARTING)
700  {
701  /* we don't expect primary_conninfo to change */
702  *startpoint = walrcv->receiveStart;
703  *startpointTLI = walrcv->receiveStartTLI;
704  walrcv->walRcvState = WALRCV_STREAMING;
705  SpinLockRelease(&walrcv->mutex);
706  break;
707  }
708  if (walrcv->walRcvState == WALRCV_STOPPING)
709  {
710  /*
711  * We should've received SIGTERM if the startup process wants us
712  * to die, but might as well check it here too.
713  */
714  SpinLockRelease(&walrcv->mutex);
715  exit(1);
716  }
717  SpinLockRelease(&walrcv->mutex);
718 
721  }
722 
724  {
725  char activitymsg[50];
726 
727  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
728  (uint32) (*startpoint >> 32),
729  (uint32) *startpoint);
730  set_ps_display(activitymsg, false);
731  }
732 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:137
bool update_process_title
Definition: ps_status.c:35
WalRcvState walRcvState
Definition: walreceiver.h:64
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:155
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
void proc_exit(int code)
Definition: ipc.c:104
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
#define SpinLockAcquire(lock)
Definition: spin.h:62
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:336
void WakeupRecovery(void)
Definition: xlog.c:12284
#define FATAL
Definition: elog.h:52
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
unsigned int uint32
Definition: c.h:325
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
Latch * latch
Definition: walreceiver.h:135
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
#define Assert(condition)
Definition: c.h:699
Definition: regguts.h:298
WalRcvData * WalRcv
#define elog
Definition: elog.h:219
XLogRecPtr receiveStart
Definition: walreceiver.h:73
#define WL_LATCH_SET
Definition: latch.h:124

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 189 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, BlockSig, buf, close, WalRcvData::conninfo, CurrentResourceOwner, DEBUG1, DisableWalRcvImmediateExit(), elog, EnableWalRcvImmediateExit(), ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, NI_MAXHOST, now(), on_shmem_exit(), options, PANIC, pfree(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), ResourceOwnerCreate(), WalRcvData::sender_host, WalRcvData::sender_port, server_version, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf(), SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, wal_segment_size, WalRcv, walrcv_connect, walrcv_endstreaming, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvQuickDieHandler(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvSigUsr1Handler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_TIMEOUT, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogFileNameP(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain(), and walrcv_clear_result().

190 {
191  char conninfo[MAXCONNINFO];
192  char *tmp_conninfo;
193  char slotname[NAMEDATALEN];
194  XLogRecPtr startpoint;
195  TimeLineID startpointTLI;
196  TimeLineID primaryTLI;
197  bool first_stream;
198  WalRcvData *walrcv = WalRcv;
199  TimestampTz last_recv_timestamp;
201  bool ping_sent;
202  char *err;
203  char *sender_host = NULL;
204  int sender_port = 0;
205 
206  /*
207  * WalRcv should be set up already (if we are a backend, we inherit this
208  * by fork() or EXEC_BACKEND mechanism from the postmaster).
209  */
210  Assert(walrcv != NULL);
211 
212  now = GetCurrentTimestamp();
213 
214  /*
215  * Mark walreceiver as running in shared memory.
216  *
217  * Do this as early as possible, so that if we fail later on, we'll set
218  * state to STOPPED. If we die before this, the startup process will keep
219  * waiting for us to start up, until it times out.
220  */
221  SpinLockAcquire(&walrcv->mutex);
222  Assert(walrcv->pid == 0);
223  switch (walrcv->walRcvState)
224  {
225  case WALRCV_STOPPING:
226  /* If we've already been requested to stop, don't start up. */
227  walrcv->walRcvState = WALRCV_STOPPED;
228  /* fall through */
229 
230  case WALRCV_STOPPED:
231  SpinLockRelease(&walrcv->mutex);
232  proc_exit(1);
233  break;
234 
235  case WALRCV_STARTING:
236  /* The usual case */
237  break;
238 
239  case WALRCV_WAITING:
240  case WALRCV_STREAMING:
241  case WALRCV_RESTARTING:
242  default:
243  /* Shouldn't happen */
244  SpinLockRelease(&walrcv->mutex);
245  elog(PANIC, "walreceiver still running according to shared memory state");
246  }
247  /* Advertise our PID so that the startup process can kill us */
248  walrcv->pid = MyProcPid;
249  walrcv->walRcvState = WALRCV_STREAMING;
250 
251  /* Fetch information required to start streaming */
252  walrcv->ready_to_display = false;
253  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
254  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
255  startpoint = walrcv->receiveStart;
256  startpointTLI = walrcv->receiveStartTLI;
257 
258  /* Initialise to a sanish value */
259  walrcv->lastMsgSendTime =
260  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
261 
262  /* Report the latch to use to awaken this process */
263  walrcv->latch = &MyProc->procLatch;
264 
265  SpinLockRelease(&walrcv->mutex);
266 
267  /* Arrange to clean up at walreceiver exit */
269 
270  /* Properly accept or ignore signals the postmaster might send us */
271  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
272  pqsignal(SIGINT, SIG_IGN);
273  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
274  pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
279 
280  /* Reset some signals that are accepted by postmaster but not here */
286 
287  /* We allow SIGQUIT (quickdie) at all times */
288  sigdelset(&BlockSig, SIGQUIT);
289 
290  /* Load the libpq-specific functions */
291  load_file("libpqwalreceiver", false);
292  if (WalReceiverFunctions == NULL)
293  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
294 
295  /*
296  * Create a resource owner to keep track of our resources (not clear that
297  * we need this, but may as well have one).
298  */
299  CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
300 
301  /* Unblock signals (they were blocked when the postmaster forked us) */
303 
304  /* Establish the connection to the primary for XLOG streaming */
306  wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
307  if (!wrconn)
308  ereport(ERROR,
309  (errmsg("could not connect to the primary server: %s", err)));
311 
312  /*
313  * Save user-visible connection string. This clobbers the original
314  * conninfo, for security. Also save host and port of the sender server
315  * this walreceiver is connected to.
316  */
317  tmp_conninfo = walrcv_get_conninfo(wrconn);
318  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
319  SpinLockAcquire(&walrcv->mutex);
320  memset(walrcv->conninfo, 0, MAXCONNINFO);
321  if (tmp_conninfo)
322  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
323 
324  memset(walrcv->sender_host, 0, NI_MAXHOST);
325  if (sender_host)
326  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
327 
328  walrcv->sender_port = sender_port;
329  walrcv->ready_to_display = true;
330  SpinLockRelease(&walrcv->mutex);
331 
332  if (tmp_conninfo)
333  pfree(tmp_conninfo);
334 
335  if (sender_host)
336  pfree(sender_host);
337 
338  first_stream = true;
339  for (;;)
340  {
341  char *primary_sysid;
342  char standby_sysid[32];
343  int server_version;
345 
346  /*
347  * Check that we're connected to a valid server using the
348  * IDENTIFY_SYSTEM replication command.
349  */
351  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
352  &server_version);
353 
354  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
356  if (strcmp(primary_sysid, standby_sysid) != 0)
357  {
358  ereport(ERROR,
359  (errmsg("database system identifier differs between the primary and standby"),
360  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
361  primary_sysid, standby_sysid)));
362  }
364 
365  /*
366  * Confirm that the current timeline of the primary is the same or
367  * ahead of ours.
368  */
369  if (primaryTLI < startpointTLI)
370  ereport(ERROR,
371  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
372  primaryTLI, startpointTLI)));
373 
374  /*
375  * Get any missing history files. We do this always, even when we're
376  * not interested in that timeline, so that if we're promoted to
377  * become the master later on, we don't select the same timeline that
378  * was already used in the current master. This isn't bullet-proof -
379  * you'll need some external software to manage your cluster if you
380  * need to ensure that a unique timeline id is chosen in every case,
381  * but let's avoid the confusion of timeline id collisions where we
382  * can.
383  */
384  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
385 
386  /*
387  * Start streaming.
388  *
389  * We'll try to start at the requested starting point and timeline,
390  * even if it's different from the server's latest timeline. In case
391  * we've already reached the end of the old timeline, the server will
392  * finish the streaming immediately, and we will go back to await
393  * orders from the startup process. If recovery_target_timeline is
394  * 'latest', the startup process will scan pg_wal and find the new
395  * history file, bump recovery target timeline, and ask us to restart
396  * on the new timeline.
397  */
398  options.logical = false;
399  options.startpoint = startpoint;
400  options.slotname = slotname[0] != '\0' ? slotname : NULL;
401  options.proto.physical.startpointTLI = startpointTLI;
402  ThisTimeLineID = startpointTLI;
403  if (walrcv_startstreaming(wrconn, &options))
404  {
405  if (first_stream)
406  ereport(LOG,
407  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
408  (uint32) (startpoint >> 32), (uint32) startpoint,
409  startpointTLI)));
410  else
411  ereport(LOG,
412  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
413  (uint32) (startpoint >> 32), (uint32) startpoint,
414  startpointTLI)));
415  first_stream = false;
416 
417  /* Initialize LogstreamResult and buffers for processing messages */
421 
422  /* Initialize the last recv timestamp */
423  last_recv_timestamp = GetCurrentTimestamp();
424  ping_sent = false;
425 
426  /* Loop until end-of-streaming or error */
427  for (;;)
428  {
429  char *buf;
430  int len;
431  bool endofwal = false;
432  pgsocket wait_fd = PGINVALID_SOCKET;
433  int rc;
434 
435  /*
436  * Exit walreceiver if we're not in recovery. This should not
437  * happen, but cross-check the status here.
438  */
439  if (!RecoveryInProgress())
440  ereport(FATAL,
441  (errmsg("cannot continue WAL streaming, recovery has already ended")));
442 
443  /* Process any requests or signals received recently */
445 
446  if (got_SIGHUP)
447  {
448  got_SIGHUP = false;
451  }
452 
453  /* See if we can read data immediately */
454  len = walrcv_receive(wrconn, &buf, &wait_fd);
455  if (len != 0)
456  {
457  /*
458  * Process the received data, and any subsequent data we
459  * can read without blocking.
460  */
461  for (;;)
462  {
463  if (len > 0)
464  {
465  /*
466  * Something was received from master, so reset
467  * timeout
468  */
469  last_recv_timestamp = GetCurrentTimestamp();
470  ping_sent = false;
471  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
472  }
473  else if (len == 0)
474  break;
475  else if (len < 0)
476  {
477  ereport(LOG,
478  (errmsg("replication terminated by primary server"),
479  errdetail("End of WAL reached on timeline %u at %X/%X.",
480  startpointTLI,
481  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
482  endofwal = true;
483  break;
484  }
485  len = walrcv_receive(wrconn, &buf, &wait_fd);
486  }
487 
488  /* Let the master know that we received some data. */
489  XLogWalRcvSendReply(false, false);
490 
491  /*
492  * If we've written some records, flush them to disk and
493  * let the startup process and primary server know about
494  * them.
495  */
496  XLogWalRcvFlush(false);
497  }
498 
499  /* Check if we need to exit the streaming loop. */
500  if (endofwal)
501  break;
502 
503  /*
504  * Ideally we would reuse a WaitEventSet object repeatedly
505  * here to avoid the overheads of WaitLatchOrSocket on epoll
506  * systems, but we can't be sure that libpq (or any other
507  * walreceiver implementation) has the same socket (even if
508  * the fd is the same number, it may have been closed and
509  * reopened since the last time). In future, if there is a
510  * function for removing sockets from WaitEventSet, then we
511  * could add and remove just the socket each time, potentially
512  * avoiding some system calls.
513  */
514  Assert(wait_fd != PGINVALID_SOCKET);
515  rc = WaitLatchOrSocket(walrcv->latch,
518  wait_fd,
521  if (rc & WL_LATCH_SET)
522  {
523  ResetLatch(walrcv->latch);
524  if (walrcv->force_reply)
525  {
526  /*
527  * The recovery process has asked us to send apply
528  * feedback now. Make sure the flag is really set to
529  * false in shared memory before sending the reply, so
530  * we don't miss a new request for a reply.
531  */
532  walrcv->force_reply = false;
534  XLogWalRcvSendReply(true, false);
535  }
536  }
537  if (rc & WL_POSTMASTER_DEATH)
538  {
539  /*
540  * Emergency bailout if postmaster has died. This is to
541  * avoid the necessity for manual cleanup of all
542  * postmaster children.
543  */
544  exit(1);
545  }
546  if (rc & WL_TIMEOUT)
547  {
548  /*
549  * We didn't receive anything new. If we haven't heard
550  * anything from the server for more than
551  * wal_receiver_timeout / 2, ping the server. Also, if
552  * it's been longer than wal_receiver_status_interval
553  * since the last update we sent, send a status update to
554  * the master anyway, to report any progress in applying
555  * WAL.
556  */
557  bool requestReply = false;
558 
559  /*
560  * Check if time since last receive from standby has
561  * reached the configured limit.
562  */
563  if (wal_receiver_timeout > 0)
564  {
566  TimestampTz timeout;
567 
568  timeout =
569  TimestampTzPlusMilliseconds(last_recv_timestamp,
571 
572  if (now >= timeout)
573  ereport(ERROR,
574  (errmsg("terminating walreceiver due to timeout")));
575 
576  /*
577  * We didn't receive anything new, for half of
578  * receiver replication timeout. Ping the server.
579  */
580  if (!ping_sent)
581  {
582  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
583  (wal_receiver_timeout / 2));
584  if (now >= timeout)
585  {
586  requestReply = true;
587  ping_sent = true;
588  }
589  }
590  }
591 
592  XLogWalRcvSendReply(requestReply, requestReply);
594  }
595  }
596 
597  /*
598  * The backend finished streaming. Exit streaming COPY-mode from
599  * our side, too.
600  */
602  walrcv_endstreaming(wrconn, &primaryTLI);
604 
605  /*
606  * If the server had switched to a new timeline that we didn't
607  * know about when we began streaming, fetch its timeline history
608  * file now.
609  */
610  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
611  }
612  else
613  ereport(LOG,
614  (errmsg("primary server contains no more WAL on requested timeline %u",
615  startpointTLI)));
616 
617  /*
618  * End of WAL reached on the requested timeline. Close the last
619  * segment, and await for new orders from the startup process.
620  */
621  if (recvFile >= 0)
622  {
623  char xlogfname[MAXFNAMELEN];
624 
625  XLogWalRcvFlush(false);
626  if (close(recvFile) != 0)
627  ereport(PANIC,
629  errmsg("could not close log segment %s: %m",
631 
632  /*
633  * Create .done file forcibly to prevent the streamed segment from
634  * being archived later.
635  */
638  XLogArchiveForceDone(xlogfname);
639  else
640  XLogArchiveNotify(xlogfname);
641  }
642  recvFile = -1;
643 
644  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
645  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
646  }
647  /* not reached */
648 }
static struct @25 LogstreamResult
int sender_port
Definition: walreceiver.h:117
#define SIGQUIT
Definition: win32_port.h:164
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:739
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:259
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:269
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
uint32 TimeLineID
Definition: xlogdefs.h:45
struct WalRcvStreamOptions::@107::@108 physical
#define SIGTTOU
Definition: win32_port.h:175
slock_t mutex
Definition: walreceiver.h:137
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:113
#define SIGTTIN
Definition: win32_port.h:174
#define SIGUSR1
Definition: win32_port.h:177
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
#define SIGCHLD
Definition: win32_port.h:173
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:654
sig_atomic_t force_reply
Definition: walreceiver.h:144
#define SIGWINCH
Definition: win32_port.h:176
#define SIGCONT
Definition: win32_port.h:172
WalRcvState walRcvState
Definition: walreceiver.h:64
ResourceOwner CurrentResourceOwner
Definition: resowner.c:140
static StringInfoData incoming_message
Definition: walreceiver.c:113
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:155
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1194
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:271
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:819
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:267
void proc_exit(int code)
Definition: ipc.c:104
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:890
#define SIGPIPE
Definition: win32_port.h:168
#define SIGUSR2
Definition: win32_port.h:178
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7949
#define PANIC
Definition: elog.h:53
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:83
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:76
Latch procLatch
Definition: proc.h:104
static WalReceiverConn * wrconn
Definition: walreceiver.c:80
union WalRcvStreamOptions::@107 proto
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:112
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:514
void pfree(void *pointer)
Definition: mcxt.c:1031
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:43
static void WalRcvSigUsr1Handler(SIGNAL_ARGS)
Definition: walreceiver.c:827
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11194
pid_t pid
Definition: walreceiver.h:63
#define MAXCONNINFO
Definition: walreceiver.h:36
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:359
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:263
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10197
static char * buf
Definition: pg_test_fsync.c:67
int XLogArchiveMode
Definition: xlog.c:94
int errdetail(const char *fmt,...)
Definition: elog.c:873
int errcode_for_file_access(void)
Definition: elog.c:598
#define SIGHUP
Definition: win32_port.h:163
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:181
XLogRecPtr startpoint
Definition: walreceiver.h:154
unsigned int uint32
Definition: c.h:325
int pgsocket
Definition: port.h:31
sigset_t UnBlockSig
Definition: pqsignal.c:22
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:99
#define ereport(elevel, rest)
Definition: elog.h:122
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
static TimeLineID recvFileTLI
Definition: walreceiver.c:91
Definition: guc.h:72
Latch * latch
Definition: walreceiver.h:135
#define SIG_IGN
Definition: win32_port.h:160
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static XLogSegNo recvSegNo
Definition: walreceiver.c:92
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
sigset_t BlockSig
Definition: pqsignal.c:22
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:174
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:564
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:81
#define PGINVALID_SOCKET
Definition: port.h:33
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1125
bool ready_to_display
Definition: walreceiver.h:126
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:838
TimeLineID ThisTimeLineID
Definition: xlog.c:181
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:148
#define SIG_DFL
Definition: win32_port.h:158
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:699
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static int server_version
Definition: pg_dumpall.c:79
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define SIGALRM
Definition: win32_port.h:169
static void WalRcvQuickDieHandler(SIGNAL_ARGS)
Definition: walreceiver.c:861
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1066
WalRcvData * WalRcv
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4741
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:116
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:788
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:261
#define elog
Definition: elog.h:219
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:73
static int recvFile
Definition: walreceiver.c:90
#define WL_LATCH_SET
Definition: latch.h:124
#define UINT64_FORMAT
Definition: c.h:368
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
char slotname[NAMEDATALEN]
Definition: walreceiver.h:123
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:110
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:418
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:255

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 1066 of file walreceiver.c.

References AllowCascadeReplication, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, recvFile, recvSegNo, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

1067 {
1068  if (LogstreamResult.Flush < LogstreamResult.Write)
1069  {
1070  WalRcvData *walrcv = WalRcv;
1071 
1073 
1074  LogstreamResult.Flush = LogstreamResult.Write;
1075 
1076  /* Update shared-memory status */
1077  SpinLockAcquire(&walrcv->mutex);
1078  if (walrcv->receivedUpto < LogstreamResult.Flush)
1079  {
1080  walrcv->latestChunkStart = walrcv->receivedUpto;
1081  walrcv->receivedUpto = LogstreamResult.Flush;
1082  walrcv->receivedTLI = ThisTimeLineID;
1083  }
1084  SpinLockRelease(&walrcv->mutex);
1085 
1086  /* Signal the startup process and walsender that new WAL has arrived */
1087  WakeupRecovery();
1089  WalSndWakeup();
1090 
1091  /* Report XLOG streaming progress in PS display */
1093  {
1094  char activitymsg[50];
1095 
1096  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1097  (uint32) (LogstreamResult.Write >> 32),
1098  (uint32) LogstreamResult.Write);
1099  set_ps_display(activitymsg, false);
1100  }
1101 
1102  /* Also let the master know that we made some progress */
1103  if (!dying)
1104  {
1105  XLogWalRcvSendReply(false, false);
1106  XLogWalRcvSendHSFeedback(false);
1107  }
1108  }
1109 }
static struct @25 LogstreamResult
slock_t mutex
Definition: walreceiver.h:137
bool update_process_title
Definition: ps_status.c:35
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10154
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1194
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
TimeLineID receivedTLI
Definition: walreceiver.h:84
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12284
XLogRecPtr latestChunkStart
Definition: walreceiver.h:92
#define AllowCascadeReplication()
Definition: walreceiver.h:39
unsigned int uint32
Definition: c.h:325
XLogRecPtr receivedUpto
Definition: walreceiver.h:83
static XLogSegNo recvSegNo
Definition: walreceiver.c:92
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1125
TimeLineID ThisTimeLineID
Definition: xlog.c:181
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:90
void WalSndWakeup(void)
Definition: walsender.c:3020

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 890 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

891 {
892  int hdrlen;
893  XLogRecPtr dataStart;
894  XLogRecPtr walEnd;
895  TimestampTz sendTime;
896  bool replyRequested;
897 
899 
900  switch (type)
901  {
902  case 'w': /* WAL records */
903  {
904  /* copy message to StringInfo */
905  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
906  if (len < hdrlen)
907  ereport(ERROR,
908  (errcode(ERRCODE_PROTOCOL_VIOLATION),
909  errmsg_internal("invalid WAL message received from primary")));
911 
912  /* read the fields */
913  dataStart = pq_getmsgint64(&incoming_message);
914  walEnd = pq_getmsgint64(&incoming_message);
915  sendTime = pq_getmsgint64(&incoming_message);
916  ProcessWalSndrMessage(walEnd, sendTime);
917 
918  buf += hdrlen;
919  len -= hdrlen;
920  XLogWalRcvWrite(buf, len, dataStart);
921  break;
922  }
923  case 'k': /* Keepalive */
924  {
925  /* copy message to StringInfo */
926  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
927  if (len != hdrlen)
928  ereport(ERROR,
929  (errcode(ERRCODE_PROTOCOL_VIOLATION),
930  errmsg_internal("invalid keepalive message received from primary")));
932 
933  /* read the fields */
934  walEnd = pq_getmsgint64(&incoming_message);
935  sendTime = pq_getmsgint64(&incoming_message);
936  replyRequested = pq_getmsgbyte(&incoming_message);
937 
938  ProcessWalSndrMessage(walEnd, sendTime);
939 
940  /* If the primary requested a reply, send one immediately */
941  if (replyRequested)
942  XLogWalRcvSendReply(true, false);
943  break;
944  }
945  default:
946  ereport(ERROR,
947  (errcode(ERRCODE_PROTOCOL_VIOLATION),
948  errmsg_internal("invalid replication message type %d",
949  type)));
950  }
951 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1307
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:113
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:67
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
#define ereport(elevel, rest)
Definition: elog.h:122
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1125
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:208
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:957

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1194 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetNextXidAndEpoch(), GetOldestXmin(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), PROCARRAY_FLAGS_DEFAULT, PROCARRAY_SLOTS_XMIN, ProcArrayGetReplicationSlotXmin(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, TransactionIdPrecedes(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

1195 {
1196  TimestampTz now;
1197  TransactionId nextXid;
1198  uint32 xmin_epoch,
1199  catalog_xmin_epoch;
1200  TransactionId xmin,
1201  catalog_xmin;
1202  static TimestampTz sendTime = 0;
1203 
1204  /* initially true so we always send at least one feedback message */
1205  static bool master_has_standby_xmin = true;
1206 
1207  /*
1208  * If the user doesn't want status to be reported to the master, be sure
1209  * to exit before doing anything at all.
1210  */
1212  !master_has_standby_xmin)
1213  return;
1214 
1215  /* Get current timestamp. */
1216  now = GetCurrentTimestamp();
1217 
1218  if (!immed)
1219  {
1220  /*
1221  * Send feedback at most once per wal_receiver_status_interval.
1222  */
1223  if (!TimestampDifferenceExceeds(sendTime, now,
1225  return;
1226  sendTime = now;
1227  }
1228 
1229  /*
1230  * If Hot Standby is not yet accepting connections there is nothing to
1231  * send. Check this after the interval has expired to reduce number of
1232  * calls.
1233  *
1234  * Bailing out here also ensures that we don't send feedback until we've
1235  * read our own replication slot state, so we don't tell the master to
1236  * discard needed xmin or catalog_xmin from any slots that may exist on
1237  * this replica.
1238  */
1239  if (!HotStandbyActive())
1240  return;
1241 
1242  /*
1243  * Make the expensive call to get the oldest xmin once we are certain
1244  * everything else has been checked.
1245  */
1247  {
1248  TransactionId slot_xmin;
1249 
1250  /*
1251  * Usually GetOldestXmin() would include both global replication slot
1252  * xmin and catalog_xmin in its calculations, but we want to derive
1253  * separate values for each of those. So we ask for an xmin that
1254  * excludes the catalog_xmin.
1255  */
1256  xmin = GetOldestXmin(NULL,
1258 
1259  ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1260 
1261  if (TransactionIdIsValid(slot_xmin) &&
1262  TransactionIdPrecedes(slot_xmin, xmin))
1263  xmin = slot_xmin;
1264  }
1265  else
1266  {
1267  xmin = InvalidTransactionId;
1268  catalog_xmin = InvalidTransactionId;
1269  }
1270 
1271  /*
1272  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1273  * the epoch boundary.
1274  */
1275  GetNextXidAndEpoch(&nextXid, &xmin_epoch);
1276  catalog_xmin_epoch = xmin_epoch;
1277  if (nextXid < xmin)
1278  xmin_epoch--;
1279  if (nextXid < catalog_xmin)
1280  catalog_xmin_epoch--;
1281 
1282  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1283  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1284 
1285  /* Construct the message and send it. */
1287  pq_sendbyte(&reply_message, 'h');
1289  pq_sendint32(&reply_message, xmin);
1290  pq_sendint32(&reply_message, xmin_epoch);
1291  pq_sendint32(&reply_message, catalog_xmin);
1292  pq_sendint32(&reply_message, catalog_xmin_epoch);
1294  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1295  master_has_standby_xmin = true;
1296  else
1297  master_has_standby_xmin = false;
1298 }
bool hot_standby_feedback
Definition: walreceiver.c:77
uint32 TransactionId
Definition: c.h:474
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint32(StringInfo buf, int32 i)
Definition: pqformat.h:148
bool HotStandbyActive(void)
Definition: xlog.c:8005
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
int wal_receiver_status_interval
Definition: walreceiver.c:75
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2986
static WalReceiverConn * wrconn
Definition: walreceiver.c:80
static StringInfoData reply_message
Definition: walreceiver.c:112
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8350
#define PROCARRAY_SLOTS_XMIN
Definition: procarray.h:37
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:325
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define PROCARRAY_FLAGS_DEFAULT
Definition: procarray.h:50
TransactionId GetOldestXmin(Relation rel, int flags)
Definition: procarray.c:1315
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:273
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1125 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, now(), pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1126 {
1127  static XLogRecPtr writePtr = 0;
1128  static XLogRecPtr flushPtr = 0;
1129  XLogRecPtr applyPtr;
1130  static TimestampTz sendTime = 0;
1131  TimestampTz now;
1132 
1133  /*
1134  * If the user doesn't want status to be reported to the master, be sure
1135  * to exit before doing anything at all.
1136  */
1137  if (!force && wal_receiver_status_interval <= 0)
1138  return;
1139 
1140  /* Get current timestamp. */
1141  now = GetCurrentTimestamp();
1142 
1143  /*
1144  * We can compare the write and flush positions to the last message we
1145  * sent without taking any lock, but the apply position requires a spin
1146  * lock, so we don't check that unless something else has changed or 10
1147  * seconds have passed. This means that the apply WAL location will
1148  * appear, from the master's point of view, to lag slightly, but since
1149  * this is only for reporting purposes and only on idle systems, that's
1150  * probably OK.
1151  */
1152  if (!force
1153  && writePtr == LogstreamResult.Write
1154  && flushPtr == LogstreamResult.Flush
1155  && !TimestampDifferenceExceeds(sendTime, now,
1157  return;
1158  sendTime = now;
1159 
1160  /* Construct a new message */
1161  writePtr = LogstreamResult.Write;
1162  flushPtr = LogstreamResult.Flush;
1163  applyPtr = GetXLogReplayRecPtr(NULL);
1164 
1166  pq_sendbyte(&reply_message, 'r');
1167  pq_sendint64(&reply_message, writePtr);
1168  pq_sendint64(&reply_message, flushPtr);
1169  pq_sendint64(&reply_message, applyPtr);
1171  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1172 
1173  /* Send it */
1174  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1175  (uint32) (writePtr >> 32), (uint32) writePtr,
1176  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1177  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1178  requestReply ? " (reply requested)" : "");
1179 
1181 }
static struct @25 LogstreamResult
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
int wal_receiver_status_interval
Definition: walreceiver.c:75
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
static WalReceiverConn * wrconn
Definition: walreceiver.c:80
static StringInfoData reply_message
Definition: walreceiver.c:112
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11194
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:325
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:273
#define elog
Definition: elog.h:219
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 957 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvOff, recvSegNo, ThisTimeLineID, wal_segment_size, write, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogFileNameP(), XLogSegmentOffset, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

958 {
959  int startoff;
960  int byteswritten;
961 
962  while (nbytes > 0)
963  {
964  int segbytes;
965 
966  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
967  {
968  bool use_existent;
969 
970  /*
971  * fsync() and close current file before we switch to next one. We
972  * would otherwise have to reopen this file to fsync it later
973  */
974  if (recvFile >= 0)
975  {
976  char xlogfname[MAXFNAMELEN];
977 
978  XLogWalRcvFlush(false);
979 
980  /*
981  * XLOG segment files will be re-read by recovery in startup
982  * process soon, so we don't advise the OS to release cache
983  * pages associated with the file like XLogFileClose() does.
984  */
985  if (close(recvFile) != 0)
986  ereport(PANIC,
988  errmsg("could not close log segment %s: %m",
990 
991  /*
992  * Create .done file forcibly to prevent the streamed segment
993  * from being archived later.
994  */
997  XLogArchiveForceDone(xlogfname);
998  else
999  XLogArchiveNotify(xlogfname);
1000  }
1001  recvFile = -1;
1002 
1003  /* Create/use new log file */
1005  use_existent = true;
1006  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
1008  recvOff = 0;
1009  }
1010 
1011  /* Calculate the start offset of the received logs */
1012  startoff = XLogSegmentOffset(recptr, wal_segment_size);
1013 
1014  if (startoff + nbytes > wal_segment_size)
1015  segbytes = wal_segment_size - startoff;
1016  else
1017  segbytes = nbytes;
1018 
1019  /* Need to seek in the file? */
1020  if (recvOff != startoff)
1021  {
1022  if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
1023  ereport(PANIC,
1025  errmsg("could not seek in log segment %s to offset %u: %m",
1027  startoff)));
1028  recvOff = startoff;
1029  }
1030 
1031  /* OK to write the logs */
1032  errno = 0;
1033 
1034  byteswritten = write(recvFile, buf, segbytes);
1035  if (byteswritten <= 0)
1036  {
1037  /* if write didn't set errno, assume no disk space */
1038  if (errno == 0)
1039  errno = ENOSPC;
1040  ereport(PANIC,
1042  errmsg("could not write to log segment %s "
1043  "at offset %u, length %lu: %m",
1045  recvOff, (unsigned long) segbytes)));
1046  }
1047 
1048  /* Update state for write */
1049  recptr += byteswritten;
1050 
1051  recvOff += byteswritten;
1052  nbytes -= byteswritten;
1053  buf += byteswritten;
1054 
1055  LogstreamResult.Write = recptr;
1056  }
1057 }
static struct @25 LogstreamResult
int wal_segment_size
Definition: xlog.c:113
#define write(a, b, c)
Definition: win32.h:14
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3180
#define PANIC
Definition: elog.h:53
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:514
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10197
static char * buf
Definition: pg_test_fsync.c:67
int XLogArchiveMode
Definition: xlog.c:94
int errcode_for_file_access(void)
Definition: elog.c:598
#define ereport(elevel, rest)
Definition: elog.h:122
static TimeLineID recvFileTLI
Definition: walreceiver.c:91
static XLogSegNo recvSegNo
Definition: walreceiver.c:92
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:564
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
TimeLineID ThisTimeLineID
Definition: xlog.c:181
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1066
int errmsg(const char *fmt,...)
Definition: elog.c:797
static uint32 recvOff
Definition: walreceiver.c:93
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:90
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 109 of file walreceiver.c.

◆ got_SIGHUP

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 99 of file walreceiver.c.

Referenced by WalRcvSigHupHandler(), and WalReceiverMain().

◆ got_SIGTERM

volatile sig_atomic_t got_SIGTERM = false
static

Definition at line 100 of file walreceiver.c.

Referenced by ProcessWalRcvInterrupts(), and WalRcvShutdownHandler().

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 77 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 113 of file walreceiver.c.

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 90 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 91 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

◆ recvOff

uint32 recvOff = 0
static

Definition at line 93 of file walreceiver.c.

Referenced by XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 92 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

Definition at line 112 of file walreceiver.c.

Referenced by send_feedback().

◆ wal_receiver_status_interval

int wal_receiver_status_interval

Definition at line 75 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ wal_receiver_timeout

int wal_receiver_timeout

Definition at line 76 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

◆ WalRcvImmediateInterruptOK

volatile bool WalRcvImmediateInterruptOK = false
static

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 81 of file walreceiver.c.

Referenced by _PG_init().

◆ wrconn

WalReceiverConn* wrconn = NULL
static

Definition at line 80 of file walreceiver.c.

◆ Write

XLogRecPtr Write

Definition at line 108 of file walreceiver.c.