PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void ProcessWalRcvInterrupts (void)
 
static void EnableWalRcvImmediateExit (void)
 
static void DisableWalRcvImmediateExit (void)
 
static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvSigHupHandler (SIGNAL_ARGS)
 
static void WalRcvSigUsr1Handler (SIGNAL_ARGS)
 
static void WalRcvShutdownHandler (SIGNAL_ARGS)
 
static void WalRcvQuickDieHandler (SIGNAL_ARGS)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
static uint32 recvOff = 0
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t got_SIGTERM = false
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
} LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 
static volatile bool WalRcvImmediateInterruptOK = false
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 82 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

◆ DisableWalRcvImmediateExit()

static void DisableWalRcvImmediateExit ( void  )
static

Definition at line 180 of file walreceiver.c.

References ProcessWalRcvInterrupts(), and WalRcvImmediateInterruptOK.

Referenced by WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().

181 {
182  WalRcvImmediateInterruptOK = false;
183  ProcessWalRcvInterrupts();
184 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154

◆ EnableWalRcvImmediateExit()

static void EnableWalRcvImmediateExit ( void  )
static

Definition at line 173 of file walreceiver.c.

References ProcessWalRcvInterrupts(), and WalRcvImmediateInterruptOK.

Referenced by WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().

174 {
175  WalRcvImmediateInterruptOK = true;
176  ProcessWalRcvInterrupts();
177 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1389 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, DEFAULT_ROLE_READ_ALL_STATS, elog, ERROR, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MAXCONNINFO, MemSet, WalRcvData::mutex, NAMEDATALEN, tupleDesc::natts, palloc0(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, and XLogRecPtrIsInvalid.

1390 {
1391  TupleDesc tupdesc;
1392  Datum *values;
1393  bool *nulls;
1394  int pid;
1395  bool ready_to_display;
1396  WalRcvState state;
1397  XLogRecPtr receive_start_lsn;
1398  TimeLineID receive_start_tli;
1399  XLogRecPtr received_lsn;
1400  TimeLineID received_tli;
1401  TimestampTz last_send_time;
1402  TimestampTz last_receipt_time;
1403  XLogRecPtr latest_end_lsn;
1404  TimestampTz latest_end_time;
1405  char slotname[NAMEDATALEN];
1406  char conninfo[MAXCONNINFO];
1407 
1408  /* Take a lock to ensure value consistency */
1409  SpinLockAcquire(&WalRcv->mutex);
1410  pid = (int) WalRcv->pid;
1411  ready_to_display = WalRcv->ready_to_display;
1412  state = WalRcv->walRcvState;
1413  receive_start_lsn = WalRcv->receiveStart;
1414  receive_start_tli = WalRcv->receiveStartTLI;
1415  received_lsn = WalRcv->receivedUpto;
1416  received_tli = WalRcv->receivedTLI;
1417  last_send_time = WalRcv->lastMsgSendTime;
1418  last_receipt_time = WalRcv->lastMsgReceiptTime;
1419  latest_end_lsn = WalRcv->latestWalEnd;
1420  latest_end_time = WalRcv->latestWalEndTime;
1421  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1422  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1423  SpinLockRelease(&WalRcv->mutex);
1424 
1425  /*
1426  * No WAL receiver (or not ready yet), just return a tuple with NULL
1427  * values
1428  */
1429  if (pid == 0 || !ready_to_display)
1430  PG_RETURN_NULL();
1431 
1432  /* determine result type */
1433  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1434  elog(ERROR, "return type must be a row type");
1435 
1436  values = palloc0(sizeof(Datum) * tupdesc->natts);
1437  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1438 
1439  /* Fetch values */
1440  values[0] = Int32GetDatum(pid);
1441 
1442  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1443  {
1444  /*
1445  * Only superusers can see details. Other users only get the pid value
1446  * to know whether it is a WAL receiver, but no details.
1447  */
1448  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1449  }
1450  else
1451  {
1452  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1453 
1454  if (XLogRecPtrIsInvalid(receive_start_lsn))
1455  nulls[2] = true;
1456  else
1457  values[2] = LSNGetDatum(receive_start_lsn);
1458  values[3] = Int32GetDatum(receive_start_tli);
1459  if (XLogRecPtrIsInvalid(received_lsn))
1460  nulls[4] = true;
1461  else
1462  values[4] = LSNGetDatum(received_lsn);
1463  values[5] = Int32GetDatum(received_tli);
1464  if (last_send_time == 0)
1465  nulls[6] = true;
1466  else
1467  values[6] = TimestampTzGetDatum(last_send_time);
1468  if (last_receipt_time == 0)
1469  nulls[7] = true;
1470  else
1471  values[7] = TimestampTzGetDatum(last_receipt_time);
1472  if (XLogRecPtrIsInvalid(latest_end_lsn))
1473  nulls[8] = true;
1474  else
1475  values[8] = LSNGetDatum(latest_end_lsn);
1476  if (latest_end_time == 0)
1477  nulls[9] = true;
1478  else
1479  values[9] = TimestampTzGetDatum(latest_end_time);
1480  if (*slotname == '\0')
1481  nulls[10] = true;
1482  else
1483  values[10] = CStringGetTextDatum(slotname);
1484  if (*conninfo == '\0')
1485  nulls[11] = true;
1486  else
1487  values[11] = CStringGetTextDatum(conninfo);
1488  }
1489 
1490  /* Returns the record as Datum */
1491  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1492 }
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1364
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
uint32 TimeLineID
Definition: xlogdefs.h:45
slock_t mutex
Definition: walreceiver.h:129
Oid GetUserId(void)
Definition: miscinit.c:284
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:63
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:83
#define MemSet(start, val, len)
Definition: c.h:853
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
int natts
Definition: tupdesc.h:79
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
WalRcvState
Definition: walreceiver.h:43
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:877
uintptr_t Datum
Definition: postgres.h:372
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:313
bool ready_to_display
Definition: walreceiver.h:118
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
#define DEFAULT_ROLE_READ_ALL_STATS
Definition: pg_authid.h:108
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4854
XLogRecPtr latestWalEnd
Definition: walreceiver.h:102
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:230
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:164
#define Int32GetDatum(X)
Definition: postgres.h:485
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
XLogRecPtr receiveStart
Definition: walreceiver.h:72
#define PG_RETURN_NULL()
Definition: fmgr.h:305
char slotname[NAMEDATALEN]
Definition: walreceiver.h:115
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109

◆ ProcessWalRcvInterrupts()

static void ProcessWalRcvInterrupts ( void  )
static

Definition at line 154 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, got_SIGTERM, and WalRcvImmediateInterruptOK.

Referenced by DisableWalRcvImmediateExit(), EnableWalRcvImmediateExit(), WalRcvShutdownHandler(), WalRcvWaitForStartPosition(), and WalReceiverMain().

155 {
156  /*
157  * Although walreceiver interrupt handling doesn't use the same scheme as
158  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
159  * any incoming signals on Win32.
160  */
161  CHECK_FOR_INTERRUPTS();
162 
163  if (got_SIGTERM)
164  {
165  WalRcvImmediateInterruptOK = false;
166  ereport(FATAL,
167  (errcode(ERRCODE_ADMIN_SHUTDOWN),
168  errmsg("terminating walreceiver process due to administrator command")));
169  }
170 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
int errcode(int sqlerrcode)
Definition: elog.c:575
#define FATAL
Definition: elog.h:52
#define ereport(elevel, rest)
Definition: elog.h:122
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:99

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1293 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, log_min_messages, WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1294 {
1295  WalRcvData *walrcv = WalRcv;
1296 
1297  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1298 
1299  /* Update shared-memory status */
1300  SpinLockAcquire(&walrcv->mutex);
1301  if (walrcv->latestWalEnd < walEnd)
1302  walrcv->latestWalEndTime = sendTime;
1303  walrcv->latestWalEnd = walEnd;
1304  walrcv->lastMsgSendTime = sendTime;
1305  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1306  SpinLockRelease(&walrcv->mutex);
1307 
1308  if (log_min_messages <= DEBUG2)
1309  {
1310  char *sendtime;
1311  char *receipttime;
1312  int applyDelay;
1313 
1314  /* Copy because timestamptz_to_str returns a static buffer */
1315  sendtime = pstrdup(timestamptz_to_str(sendTime));
1316  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1317  applyDelay = GetReplicationApplyDelay();
1318 
1319  /* apply delay is not available */
1320  if (applyDelay == -1)
1321  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1322  sendtime,
1323  receipttime,
1324  GetReplicationTransferLatency());
1325  else
1326  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1327  sendtime,
1328  receipttime,
1329  applyDelay,
1330  GetReplicationTransferLatency());
1331 
1332  pfree(sendtime);
1333  pfree(receipttime);
1334  }
1335 }
slock_t mutex
Definition: walreceiver.h:129
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1076
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:949
#define DEBUG2
Definition: elog.h:24
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
int log_min_messages
Definition: guc.c:452
XLogRecPtr latestWalEnd
Definition: walreceiver.h:102
WalRcvData * WalRcv
#define elog
Definition: elog.h:219
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1710

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 774 of file walreceiver.c.

References Assert, WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

775 {
776  WalRcvData *walrcv = WalRcv;
777 
778  /* Ensure that all WAL records received are flushed to disk */
779  XLogWalRcvFlush(true);
780 
781  /* Mark ourselves inactive in shared memory */
782  SpinLockAcquire(&walrcv->mutex);
783  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
784  walrcv->walRcvState == WALRCV_RESTARTING ||
785  walrcv->walRcvState == WALRCV_STARTING ||
786  walrcv->walRcvState == WALRCV_WAITING ||
787  walrcv->walRcvState == WALRCV_STOPPING);
788  Assert(walrcv->pid == MyProcPid);
789  walrcv->walRcvState = WALRCV_STOPPED;
790  walrcv->pid = 0;
791  walrcv->ready_to_display = false;
792  walrcv->latch = NULL;
793  SpinLockRelease(&walrcv->mutex);
794 
795  /* Terminate the connection gracefully. */
796  if (wrconn != NULL)
797  walrcv_disconnect(wrconn);
798 
799  /* Wake up the startup process to notice promptly that we're gone */
800  WakeupRecovery();
801 }
int MyProcPid
Definition: globals.c:39
slock_t mutex
Definition: walreceiver.h:129
WalRcvState walRcvState
Definition: walreceiver.h:63
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12138
pid_t pid
Definition: walreceiver.h:62
Latch * latch
Definition: walreceiver.h:127
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:118
#define Assert(condition)
Definition: c.h:670
#define walrcv_disconnect(conn)
Definition: walreceiver.h:265
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1052
WalRcvData * WalRcv

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 725 of file walreceiver.c.

References DisableWalRcvImmediateExit(), EnableWalRcvImmediateExit(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, and writeTimeLineHistoryFile().

Referenced by WalReceiverMain().

726 {
727  TimeLineID tli;
728 
729  for (tli = first; tli <= last; tli++)
730  {
731  /* there's no history file for timeline 1 */
732  if (tli != 1 && !existsTimeLineHistory(tli))
733  {
734  char *fname;
735  char *content;
736  int len;
737  char expectedfname[MAXFNAMELEN];
738 
739  ereport(LOG,
740  (errmsg("fetching timeline history file for timeline %u from primary server",
741  tli)));
742 
743  EnableWalRcvImmediateExit();
744  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
745  DisableWalRcvImmediateExit();
746 
747  /*
748  * Check that the filename on the master matches what we
749  * calculated ourselves. This is just a sanity check, it should
750  * always match.
751  */
752  TLHistoryFileName(expectedfname, tli);
753  if (strcmp(fname, expectedfname) != 0)
754  ereport(ERROR,
755  (errcode(ERRCODE_PROTOCOL_VIOLATION),
756  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
757  tli)));
758 
759  /*
760  * Write the file to pg_wal.
761  */
762  writeTimeLineHistoryFile(tli, content, len);
763 
764  pfree(fname);
765  pfree(content);
766  }
767  }
768 }
uint32 TimeLineID
Definition: xlogdefs.h:45
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
void pfree(void *pointer)
Definition: mcxt.c:949
#define ERROR
Definition: elog.h:43
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:447
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:180
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:207
#define ereport(elevel, rest)
Definition: elog.h:122
#define MAXFNAMELEN
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:173
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:251
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1346 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

1347 {
1348  Latch *latch;
1349 
1350  WalRcv->force_reply = true;
1351  /* fetching the latch pointer might not be atomic, so use spinlock */
1352  SpinLockAcquire(&WalRcv->mutex);
1353  latch = WalRcv->latch;
1354  SpinLockRelease(&WalRcv->mutex);
1355  if (latch)
1356  SetLatch(latch);
1357 }
slock_t mutex
Definition: walreceiver.h:129
sig_atomic_t force_reply
Definition: walreceiver.h:136
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: latch.h:110
Latch * latch
Definition: walreceiver.h:127
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
WalRcvData * WalRcv

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1364 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1365 {
1366  switch (state)
1367  {
1368  case WALRCV_STOPPED:
1369  return "stopped";
1370  case WALRCV_STARTING:
1371  return "starting";
1372  case WALRCV_STREAMING:
1373  return "streaming";
1374  case WALRCV_WAITING:
1375  return "waiting";
1376  case WALRCV_RESTARTING:
1377  return "restarting";
1378  case WALRCV_STOPPING:
1379  return "stopping";
1380  }
1381  return "UNKNOWN";
1382 }
Definition: regguts.h:298

◆ WalRcvQuickDieHandler()

static void WalRcvQuickDieHandler ( SIGNAL_ARGS  )
static

Definition at line 847 of file walreceiver.c.

References BlockSig, on_exit_reset(), and PG_SETMASK.

Referenced by WalReceiverMain().

848 {
849  PG_SETMASK(&BlockSig);
850 
851  /*
852  * We DO NOT want to run proc_exit() callbacks -- we're here because
853  * shared memory may be corrupted, so we don't want to try to clean up our
854  * transaction. Just nail the windows shut and get out of town. Now that
855  * there's an atexit callback to prevent third-party code from breaking
856  * things by calling exit() directly, we have to reset the callbacks
857  * explicitly to make this work as intended.
858  */
859  on_exit_reset();
860 
861  /*
862  * Note we do exit(2) not exit(0). This is to force the postmaster into a
863  * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
864  * backend. This is necessary precisely because we don't clean up our
865  * shared memory state. (The "dead man switch" mechanism in pmsignal.c
866  * should ensure the postmaster sees this as a crash, too, but no harm in
867  * being doubly sure.)
868  */
869  exit(2);
870 }
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
void on_exit_reset(void)
Definition: ipc.c:396
sigset_t BlockSig
Definition: pqsignal.c:22

◆ WalRcvShutdownHandler()

static void WalRcvShutdownHandler ( SIGNAL_ARGS  )
static

Definition at line 824 of file walreceiver.c.

References got_SIGTERM, WalRcvData::latch, proc_exit_inprogress, ProcessWalRcvInterrupts(), SetLatch(), WalRcv, and WalRcvImmediateInterruptOK.

Referenced by WalReceiverMain().

825 {
826  int save_errno = errno;
827 
828  got_SIGTERM = true;
829 
830  if (WalRcv->latch)
831  SetLatch(WalRcv->latch);
832 
833  /* Don't joggle the elbow of proc_exit */
834  if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
835  ProcessWalRcvInterrupts();
836 
837  errno = save_errno;
838 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
Latch * latch
Definition: walreceiver.h:127
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
bool proc_exit_inprogress
Definition: ipc.c:40
WalRcvData * WalRcv
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:99

◆ WalRcvSigHupHandler()

static void WalRcvSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 805 of file walreceiver.c.

References got_SIGHUP.

Referenced by WalReceiverMain().

806 {
807  got_SIGHUP = true;
808 }
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:98

◆ WalRcvSigUsr1Handler()

static void WalRcvSigUsr1Handler ( SIGNAL_ARGS  )
static

Definition at line 813 of file walreceiver.c.

References latch_sigusr1_handler().

Referenced by WalReceiverMain().

814 {
815  int save_errno = errno;
816 
817  latch_sigusr1_handler();
818 
819  errno = save_errno;
820 }
void latch_sigusr1_handler(void)
Definition: latch.c:1473

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr * startpoint,
TimeLineID * startpointTLI 
)
static

Definition at line 640 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, WalRcvData::latch, WalRcvData::mutex, PostmasterIsAlive(), proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_LATCH_SET, and WL_POSTMASTER_DEATH.

Referenced by WalReceiverMain().

641 {
642  WalRcvData *walrcv = WalRcv;
643  int state;
644 
645  SpinLockAcquire(&walrcv->mutex);
646  state = walrcv->walRcvState;
647  if (state != WALRCV_STREAMING)
648  {
649  SpinLockRelease(&walrcv->mutex);
650  if (state == WALRCV_STOPPING)
651  proc_exit(0);
652  else
653  elog(FATAL, "unexpected walreceiver state");
654  }
655  walrcv->walRcvState = WALRCV_WAITING;
656  walrcv->receiveStart = InvalidXLogRecPtr;
657  walrcv->receiveStartTLI = 0;
658  SpinLockRelease(&walrcv->mutex);
659 
660  if (update_process_title)
661  set_ps_display("idle", false);
662 
663  /*
664  * nudge startup process to notice that we've stopped streaming and are
665  * now waiting for instructions.
666  */
667  WakeupRecovery();
668  for (;;)
669  {
670  ResetLatch(walrcv->latch);
671 
672  /*
673  * Emergency bailout if postmaster has died. This is to avoid the
674  * necessity for manual cleanup of all postmaster children.
675  */
676  if (!PostmasterIsAlive())
677  exit(1);
678 
679  ProcessWalRcvInterrupts();
680 
681  SpinLockAcquire(&walrcv->mutex);
682  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
683  walrcv->walRcvState == WALRCV_WAITING ||
684  walrcv->walRcvState == WALRCV_STOPPING);
685  if (walrcv->walRcvState == WALRCV_RESTARTING)
686  {
687  /* we don't expect primary_conninfo to change */
688  *startpoint = walrcv->receiveStart;
689  *startpointTLI = walrcv->receiveStartTLI;
690  walrcv->walRcvState = WALRCV_STREAMING;
691  SpinLockRelease(&walrcv->mutex);
692  break;
693  }
694  if (walrcv->walRcvState == WALRCV_STOPPING)
695  {
696  /*
697  * We should've received SIGTERM if the startup process wants us
698  * to die, but might as well check it here too.
699  */
700  SpinLockRelease(&walrcv->mutex);
701  exit(1);
702  }
703  SpinLockRelease(&walrcv->mutex);
704 
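705  WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
706  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
707  }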
708 
709  if (update_process_title)
710  {
711  char activitymsg[50];
712 
713  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
714  (uint32) (*startpoint >> 32),
715  (uint32) *startpoint);
716  set_ps_display(activitymsg, false);
717  }
718 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:129
bool update_process_title
Definition: ps_status.c:35
WalRcvState walRcvState
Definition: walreceiver.h:63
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
void proc_exit(int code)
Definition: ipc.c:99
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
#define SpinLockAcquire(lock)
Definition: spin.h:62
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:336
void WakeupRecovery(void)
Definition: xlog.c:12138
#define FATAL
Definition: elog.h:52
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
unsigned int uint32
Definition: c.h:296
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
Latch * latch
Definition: walreceiver.h:127
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
#define Assert(condition)
Definition: c.h:670
Definition: regguts.h:298
WalRcvData * WalRcv
#define elog
Definition: elog.h:219
XLogRecPtr receiveStart
Definition: walreceiver.h:72
#define WL_LATCH_SET
Definition: latch.h:124

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 188 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, BlockSig, buf, close, WalRcvData::conninfo, CurrentResourceOwner, DEBUG1, DisableWalRcvImmediateExit(), elog, EnableWalRcvImmediateExit(), ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, now(), on_shmem_exit(), options, PANIC, pfree(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), ResourceOwnerCreate(), server_version, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf(), SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, wal_segment_size, WalRcv, walrcv_connect, walrcv_endstreaming, walrcv_get_conninfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvQuickDieHandler(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvSigUsr1Handler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), WL_LATCH_SET, 
WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_TIMEOUT, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogFileNameP(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain(), and walrcv_clear_result().

/*
 * WalReceiverMain -- main entry point of the walreceiver process.
 *
 * Overview of what the visible code does:
 *   1. Under the WalRcv spinlock, checks the shared walRcvState: exits with
 *      proc_exit(1) if a stop was requested or we are already stopped,
 *      PANICs on any state other than WALRCV_STARTING, otherwise advertises
 *      our PID, sets the state to WALRCV_STREAMING, copies conninfo, slot
 *      name and start position into locals, and publishes our latch.
 *   2. Installs signal handlers, loads "libpqwalreceiver" and connects to
 *      the primary, then overwrites the shared conninfo with the
 *      user-visible connection string.
 *   3. Loops forever: IDENTIFY_SYSTEM, system identifier and timeline sanity
 *      checks, timeline history fetch, start streaming, receive/process
 *      messages until end of WAL, end streaming, close the current segment,
 *      and finally block in WalRcvWaitForStartPosition() for new orders.
 *
 * The function takes no arguments and is not expected to return (see the
 * "not reached" marker at the end); it leaves via proc_exit(), exit() or an
 * ereport() of ERROR or higher.
 *
 * NOTE(review): this listing is not contiguous.  The source line numbers at
 * the start of each line have gaps (e.g. 198->200, 264->266, 271->276,
 * 340->342, 501->504->507), so some declarations and statements are missing
 * and a few calls appear truncated.  Consult the original file before
 * relying on the exact statement sequence.
 */
189 {
190  char conninfo[MAXCONNINFO];
191  char *tmp_conninfo;
192  char slotname[NAMEDATALEN];
193  XLogRecPtr startpoint;
194  TimeLineID startpointTLI;
195  TimeLineID primaryTLI;
196  bool first_stream;
197  WalRcvData *walrcv = WalRcv;
198  TimestampTz last_recv_timestamp;
/*
 * NOTE(review): line 199 is missing here; "now" is assigned at line 209 but
 * no declaration is visible, so presumably it is declared on that line --
 * confirm against the original file.
 */
200  bool ping_sent;
201  char *err;
202 
203  /*
204  * WalRcv should be set up already (if we are a backend, we inherit this
205  * by fork() or EXEC_BACKEND mechanism from the postmaster).
206  */
207  Assert(walrcv != NULL);
208 
209  now = GetCurrentTimestamp();
210 
211  /*
212  * Mark walreceiver as running in shared memory.
213  *
214  * Do this as early as possible, so that if we fail later on, we'll set
215  * state to STOPPED. If we die before this, the startup process will keep
216  * waiting for us to start up, until it times out.
217  */
218  SpinLockAcquire(&walrcv->mutex);
219  Assert(walrcv->pid == 0);
220  switch (walrcv->walRcvState)
221  {
222  case WALRCV_STOPPING:
223  /* If we've already been requested to stop, don't start up. */
224  walrcv->walRcvState = WALRCV_STOPPED;
225  /* fall through */
226 
227  case WALRCV_STOPPED:
228  SpinLockRelease(&walrcv->mutex);
229  proc_exit(1);
230  break;
231 
232  case WALRCV_STARTING:
233  /* The usual case */
234  break;
235 
236  case WALRCV_WAITING:
237  case WALRCV_STREAMING:
238  case WALRCV_RESTARTING:
239  default:
240  /* Shouldn't happen */
241  SpinLockRelease(&walrcv->mutex);
242  elog(PANIC, "walreceiver still running according to shared memory state");
243  }
244  /* Advertise our PID so that the startup process can kill us */
245  walrcv->pid = MyProcPid;
246  walrcv->walRcvState = WALRCV_STREAMING;
247 
248  /* Fetch information required to start streaming */
249  walrcv->ready_to_display = false;
250  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
251  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
252  startpoint = walrcv->receiveStart;
253  startpointTLI = walrcv->receiveStartTLI;
254 
255  /* Initialise to a sanish value */
256  walrcv->lastMsgSendTime =
257  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
258 
259  /* Report the latch to use to awaken this process */
260  walrcv->latch = &MyProc->procLatch;
261 
262  SpinLockRelease(&walrcv->mutex);
263 
264  /* Arrange to clean up at walreceiver exit */
/*
 * NOTE(review): line 265 is missing; the References list names
 * on_shmem_exit() and WalRcvDie(), so presumably the exit callback is
 * registered here -- confirm.
 */
266 
267  /* Properly accept or ignore signals the postmaster might send us */
268  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
269  pqsignal(SIGINT, SIG_IGN);
270  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
271  pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
276 
277  /* Reset some signals that are accepted by postmaster but not here */
283 
284  /* We allow SIGQUIT (quickdie) at all times */
285  sigdelset(&BlockSig, SIGQUIT);
286 
287  /* Load the libpq-specific functions */
288  load_file("libpqwalreceiver", false);
289  if (WalReceiverFunctions == NULL)
290  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
291 
292  /*
293  * Create a resource owner to keep track of our resources (not clear that
294  * we need this, but may as well have one).
295  */
296  CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
297 
298  /* Unblock signals (they were blocked when the postmaster forked us) */
300 
301  /* Establish the connection to the primary for XLOG streaming */
303  wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
304  if (!wrconn)
305  ereport(ERROR,
306  (errmsg("could not connect to the primary server: %s", err)));
308 
309  /*
310  * Save user-visible connection string. This clobbers the original
311  * conninfo, for security.
312  */
313  tmp_conninfo = walrcv_get_conninfo(wrconn);
314  SpinLockAcquire(&walrcv->mutex);
315  memset(walrcv->conninfo, 0, MAXCONNINFO);
316  if (tmp_conninfo)
317  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
318  walrcv->ready_to_display = true;
319  SpinLockRelease(&walrcv->mutex);
320 
321  if (tmp_conninfo)
322  pfree(tmp_conninfo);
323 
	/*
	 * first_stream only selects which LOG message ("started" vs.
	 * "restarted") is emitted when streaming begins; it is cleared after
	 * the first successful walrcv_startstreaming().
	 */
324  first_stream = true;
325  for (;;)
326  {
327  char *primary_sysid;
328  char standby_sysid[32];
329  int server_version;
331 
332  /*
333  * Check that we're connected to a valid server using the
334  * IDENTIFY_SYSTEM replication command.
335  */
337  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
338  &server_version);
339 
340  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
/*
 * NOTE(review): line 341 (the last snprintf argument and the closing
 * parenthesis) is missing; the References list includes
 * GetSystemIdentifier(), presumably the value formatted here -- confirm.
 */
342  if (strcmp(primary_sysid, standby_sysid) != 0)
343  {
344  ereport(ERROR,
345  (errmsg("database system identifier differs between the primary and standby"),
346  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
347  primary_sysid, standby_sysid)));
348  }
350 
351  /*
352  * Confirm that the current timeline of the primary is the same or
353  * ahead of ours.
354  */
355  if (primaryTLI < startpointTLI)
356  ereport(ERROR,
357  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
358  primaryTLI, startpointTLI)));
359 
360  /*
361  * Get any missing history files. We do this always, even when we're
362  * not interested in that timeline, so that if we're promoted to
363  * become the master later on, we don't select the same timeline that
364  * was already used in the current master. This isn't bullet-proof -
365  * you'll need some external software to manage your cluster if you
366  * need to ensure that a unique timeline id is chosen in every case,
367  * but let's avoid the confusion of timeline id collisions where we
368  * can.
369  */
370  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
371 
372  /*
373  * Start streaming.
374  *
375  * We'll try to start at the requested starting point and timeline,
376  * even if it's different from the server's latest timeline. In case
377  * we've already reached the end of the old timeline, the server will
378  * finish the streaming immediately, and we will go back to await
379  * orders from the startup process. If recovery_target_timeline is
380  * 'latest', the startup process will scan pg_wal and find the new
381  * history file, bump recovery target timeline, and ask us to restart
382  * on the new timeline.
383  */
384  options.logical = false;
385  options.startpoint = startpoint;
386  options.slotname = slotname[0] != '\0' ? slotname : NULL;
387  options.proto.physical.startpointTLI = startpointTLI;
388  ThisTimeLineID = startpointTLI;
389  if (walrcv_startstreaming(wrconn, &options))
390  {
391  if (first_stream)
392  ereport(LOG,
393  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
394  (uint32) (startpoint >> 32), (uint32) startpoint,
395  startpointTLI)));
396  else
397  ereport(LOG,
398  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
399  (uint32) (startpoint >> 32), (uint32) startpoint,
400  startpointTLI)));
401  first_stream = false;
402 
403  /* Initialize LogstreamResult and buffers for processing messages */
407 
408  /* Initialize the last recv timestamp */
409  last_recv_timestamp = GetCurrentTimestamp();
410  ping_sent = false;
411 
412  /* Loop until end-of-streaming or error */
413  for (;;)
414  {
415  char *buf;
416  int len;
417  bool endofwal = false;
418  pgsocket wait_fd = PGINVALID_SOCKET;
419  int rc;
420 
421  /*
422  * Exit walreceiver if we're not in recovery. This should not
423  * happen, but cross-check the status here.
424  */
425  if (!RecoveryInProgress())
426  ereport(FATAL,
427  (errmsg("cannot continue WAL streaming, recovery has already ended")));
428 
429  /* Process any requests or signals received recently */
431 
432  if (got_SIGHUP)
433  {
434  got_SIGHUP = false;
437  }
438 
439  /* See if we can read data immediately */
440  len = walrcv_receive(wrconn, &buf, &wait_fd);
441  if (len != 0)
442  {
443  /*
444  * Process the received data, and any subsequent data we
445  * can read without blocking.
446  */
				/*
				 * In this loop len > 0 is a message to process (first
				 * byte is the message type, the rest is the payload),
				 * len == 0 means nothing more to read right now, and
				 * len < 0 is treated as end of WAL on this timeline.
				 */
447  for (;;)
448  {
449  if (len > 0)
450  {
451  /*
452  * Something was received from master, so reset
453  * timeout
454  */
455  last_recv_timestamp = GetCurrentTimestamp();
456  ping_sent = false;
457  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
458  }
459  else if (len == 0)
460  break;
461  else if (len < 0)
462  {
463  ereport(LOG,
464  (errmsg("replication terminated by primary server"),
465  errdetail("End of WAL reached on timeline %u at %X/%X.",
466  startpointTLI,
467  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
468  endofwal = true;
469  break;
470  }
471  len = walrcv_receive(wrconn, &buf, &wait_fd);
472  }
473 
474  /* Let the master know that we received some data. */
475  XLogWalRcvSendReply(false, false);
476 
477  /*
478  * If we've written some records, flush them to disk and
479  * let the startup process and primary server know about
480  * them.
481  */
482  XLogWalRcvFlush(false);
483  }
484 
485  /* Check if we need to exit the streaming loop. */
486  if (endofwal)
487  break;
488 
489  /*
490  * Ideally we would reuse a WaitEventSet object repeatedly
491  * here to avoid the overheads of WaitLatchOrSocket on epoll
492  * systems, but we can't be sure that libpq (or any other
493  * walreceiver implementation) has the same socket (even if
494  * the fd is the same number, it may have been closed and
495  * reopened since the last time). In future, if there is a
496  * function for removing sockets from WaitEventSet, then we
497  * could add and remove just the socket each time, potentially
498  * avoiding some system calls.
499  */
500  Assert(wait_fd != PGINVALID_SOCKET);
501  rc = WaitLatchOrSocket(walrcv->latch,
/*
 * NOTE(review): lines 502-503 and 505-506 are missing, so this call is
 * truncated.  The References list names WL_SOCKET_READABLE, WL_TIMEOUT,
 * NAPTIME_PER_CYCLE and WAIT_EVENT_WAL_RECEIVER_MAIN, presumably the
 * remaining wake-event flags, timeout and wait-event arguments -- confirm.
 */
504  wait_fd,
507  if (rc & WL_LATCH_SET)
508  {
509  ResetLatch(walrcv->latch);
510  if (walrcv->force_reply)
511  {
512  /*
513  * The recovery process has asked us to send apply
514  * feedback now. Make sure the flag is really set to
515  * false in shared memory before sending the reply, so
516  * we don't miss a new request for a reply.
517  */
518  walrcv->force_reply = false;
520  XLogWalRcvSendReply(true, false);
521  }
522  }
523  if (rc & WL_POSTMASTER_DEATH)
524  {
525  /*
526  * Emergency bailout if postmaster has died. This is to
527  * avoid the necessity for manual cleanup of all
528  * postmaster children.
529  */
530  exit(1);
531  }
532  if (rc & WL_TIMEOUT)
533  {
534  /*
535  * We didn't receive anything new. If we haven't heard
536  * anything from the server for more than
537  * wal_receiver_timeout / 2, ping the server. Also, if
538  * it's been longer than wal_receiver_status_interval
539  * since the last update we sent, send a status update to
540  * the master anyway, to report any progress in applying
541  * WAL.
542  */
543  bool requestReply = false;
544 
545  /*
546  * Check if time since last receive from primary has
547  * reached the configured limit.
548  */
549  if (wal_receiver_timeout > 0)
550  {
552  TimestampTz timeout;
553 
554  timeout =
555  TimestampTzPlusMilliseconds(last_recv_timestamp,
557 
558  if (now >= timeout)
559  ereport(ERROR,
560  (errmsg("terminating walreceiver due to timeout")));
561 
562  /*
563  * We didn't receive anything new, for half of
564  * receiver replication timeout. Ping the server.
565  */
566  if (!ping_sent)
567  {
568  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
569  (wal_receiver_timeout / 2));
570  if (now >= timeout)
571  {
572  requestReply = true;
573  ping_sent = true;
574  }
575  }
576  }
577 
					/*
					 * requestReply is passed as both arguments: when a
					 * ping is due the reply is forced and also asks the
					 * server to respond; otherwise neither flag is set.
					 */
578  XLogWalRcvSendReply(requestReply, requestReply);
580  }
581  }
582 
583  /*
584  * The backend finished streaming. Exit streaming COPY-mode from
585  * our side, too.
586  */
588  walrcv_endstreaming(wrconn, &primaryTLI);
590 
591  /*
592  * If the server had switched to a new timeline that we didn't
593  * know about when we began streaming, fetch its timeline history
594  * file now.
595  */
596  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
597  }
598  else
599  ereport(LOG,
600  (errmsg("primary server contains no more WAL on requested timeline %u",
601  startpointTLI)));
602 
603  /*
604  * End of WAL reached on the requested timeline. Close the last
605  * segment, and wait for new orders from the startup process.
606  */
607  if (recvFile >= 0)
608  {
609  char xlogfname[MAXFNAMELEN];
610 
611  XLogWalRcvFlush(false);
612  if (close(recvFile) != 0)
613  ereport(PANIC,
615  errmsg("could not close log segment %s: %m",
617 
618  /*
619  * Create .done file forcibly to prevent the streamed segment from
620  * being archived later.
621  */
/*
 * NOTE(review): lines 622-623 are missing, so the "if" that pairs with the
 * "else" below is not visible.  The References list names XLogFileName,
 * XLogArchiveMode and ARCHIVE_MODE_ALWAYS, presumably used to build
 * xlogfname and to choose between the two calls -- confirm.
 */
624  XLogArchiveForceDone(xlogfname);
625  else
626  XLogArchiveNotify(xlogfname);
627  }
628  recvFile = -1;
629 
630  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
631  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
632  }
633  /* not reached */
634 }
static struct @25 LogstreamResult
#define SIGQUIT
Definition: win32_port.h:164
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:725
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:247
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:255
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:39
uint32 TimeLineID
Definition: xlogdefs.h:45
#define SIGTTOU
Definition: win32_port.h:175
slock_t mutex
Definition: walreceiver.h:129
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:113
#define SIGTTIN
Definition: win32_port.h:174
#define SIGUSR1
Definition: win32_port.h:177
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
#define SIGCHLD
Definition: win32_port.h:173
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:640
sig_atomic_t force_reply
Definition: walreceiver.h:136
#define SIGWINCH
Definition: win32_port.h:176
#define SIGCONT
Definition: win32_port.h:172
WalRcvState walRcvState
Definition: walreceiver.h:63
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
static StringInfoData incoming_message
Definition: walreceiver.c:112
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1180
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:257
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:805
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:253
void proc_exit(int code)
Definition: ipc.c:99
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:876
#define SIGPIPE
Definition: win32_port.h:168
#define SIGUSR2
Definition: win32_port.h:178
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7929
#define PANIC
Definition: elog.h:53
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:82
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:75
Latch procLatch
Definition: proc.h:104
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:111
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:514
void pfree(void *pointer)
Definition: mcxt.c:949
#define ERROR
Definition: elog.h:43
static void WalRcvSigUsr1Handler(SIGNAL_ARGS)
Definition: walreceiver.c:813
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11128
pid_t pid
Definition: walreceiver.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:249
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10166
static char * buf
Definition: pg_test_fsync.c:67
int XLogArchiveMode
Definition: xlog.c:94
int errdetail(const char *fmt,...)
Definition: elog.c:873
int errcode_for_file_access(void)
Definition: elog.c:598
#define SIGHUP
Definition: win32_port.h:163
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:180
XLogRecPtr startpoint
Definition: walreceiver.h:146
unsigned int uint32
Definition: c.h:296
int pgsocket
Definition: port.h:31
sigset_t UnBlockSig
Definition: pqsignal.c:22
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:98
#define ereport(elevel, rest)
Definition: elog.h:122
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
static TimeLineID recvFileTLI
Definition: walreceiver.c:90
Definition: guc.h:72
Latch * latch
Definition: walreceiver.h:127
#define SIG_IGN
Definition: win32_port.h:160
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static XLogSegNo recvSegNo
Definition: walreceiver.c:91
#define MAXFNAMELEN
union WalRcvStreamOptions::@97 proto
#define SpinLockRelease(lock)
Definition: spin.h:64
sigset_t BlockSig
Definition: pqsignal.c:22
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:173
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:564
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:80
#define PGINVALID_SOCKET
Definition: port.h:33
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1111
bool ready_to_display
Definition: walreceiver.h:118
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:824
TimeLineID ThisTimeLineID
Definition: xlog.c:181
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:148
#define SIG_DFL
Definition: win32_port.h:158
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:670
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static int server_version
Definition: pg_dumpall.c:82
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define SIGALRM
Definition: win32_port.h:169
static void WalRcvQuickDieHandler(SIGNAL_ARGS)
Definition: walreceiver.c:847
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1052
WalRcvData * WalRcv
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4718
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:774
struct WalRcvStreamOptions::@97::@98 physical
#define elog
Definition: elog.h:219
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:72
static int recvFile
Definition: walreceiver.c:89
#define WL_LATCH_SET
Definition: latch.h:124
#define UINT64_FORMAT
Definition: c.h:339
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
char slotname[NAMEDATALEN]
Definition: walreceiver.h:115
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:243

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 1052 of file walreceiver.c.

References AllowCascadeReplication, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, recvFile, recvSegNo, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

/*
 * XLogWalRcvFlush -- make received WAL durable and announce the progress.
 *
 * Does nothing unless LogstreamResult.Write is ahead of
 * LogstreamResult.Flush.  Otherwise it advances Flush to Write, publishes
 * the new position in shared memory under the WalRcv spinlock
 * (latestChunkStart, receivedUpto, receivedTLI), wakes the startup process
 * and walsenders, updates the ps display, and finally reports progress to
 * the primary.
 *
 * dying: when true, the trailing XLogWalRcvSendReply() and
 * XLogWalRcvSendHSFeedback() calls are skipped.
 *
 * NOTE(review): the listing has numbering gaps (1057->1059, 1073->1075,
 * 1077->1079), so three source lines are missing; see the notes below.
 */
1053 {
1054  if (LogstreamResult.Flush < LogstreamResult.Write)
1055  {
1056  WalRcvData *walrcv = WalRcv;
1057 
/*
 * NOTE(review): line 1058 is missing; the References list names
 * issue_xlog_fsync(), recvFile and recvSegNo, so presumably the fsync of
 * the current segment happens here, before Flush is advanced -- confirm.
 */
1059 
1060  LogstreamResult.Flush = LogstreamResult.Write;
1061 
1062  /* Update shared-memory status */
1063  SpinLockAcquire(&walrcv->mutex);
		/* Only ever move receivedUpto forward; never publish an older position */
1064  if (walrcv->receivedUpto < LogstreamResult.Flush)
1065  {
1066  walrcv->latestChunkStart = walrcv->receivedUpto;
1067  walrcv->receivedUpto = LogstreamResult.Flush;
1068  walrcv->receivedTLI = ThisTimeLineID;
1069  }
1070  SpinLockRelease(&walrcv->mutex);
1071 
1072  /* Signal the startup process and walsender that new WAL has arrived */
1073  WakeupRecovery();
/*
 * NOTE(review): line 1074 is missing; the References list names
 * AllowCascadeReplication, which presumably guards the WalSndWakeup() call
 * below -- confirm.
 */
1075  WalSndWakeup();
1076 
1077  /* Report XLOG streaming progress in PS display */
/*
 * NOTE(review): line 1078 is missing; the References list names
 * update_process_title, which presumably guards the block below -- confirm.
 */
1079  {
1080  char activitymsg[50];
1081 
1082  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1083  (uint32) (LogstreamResult.Write >> 32),
1084  (uint32) LogstreamResult.Write);
1085  set_ps_display(activitymsg, false);
1086  }
1087 
1088  /* Also let the master know that we made some progress */
1089  if (!dying)
1090  {
1091  XLogWalRcvSendReply(false, false);
1092  XLogWalRcvSendHSFeedback(false);
1093  }
1094  }
1095 }
static struct @25 LogstreamResult
slock_t mutex
Definition: walreceiver.h:129
bool update_process_title
Definition: ps_status.c:35
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10123
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1180
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
TimeLineID receivedTLI
Definition: walreceiver.h:83
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12138
XLogRecPtr latestChunkStart
Definition: walreceiver.h:91
#define AllowCascadeReplication()
Definition: walreceiver.h:38
unsigned int uint32
Definition: c.h:296
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
static XLogSegNo recvSegNo
Definition: walreceiver.c:91
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1111
TimeLineID ThisTimeLineID
Definition: xlog.c:181
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:89
void WalSndWakeup(void)
Definition: walsender.c:3004

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 876 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

/*
 * XLogWalRcvProcessMsg -- dispatch one message received from the primary.
 *
 * type: message type byte; 'w' (WAL data) and 'k' (keepalive) are accepted,
 *       anything else raises ERROR with ERRCODE_PROTOCOL_VIOLATION.
 * buf:  message payload following the type byte.
 * len:  number of payload bytes in buf.
 *
 * The caller in WalReceiverMain passes buf[0] as type and &buf[1] / len - 1
 * as the payload.
 *
 * NOTE(review): the listing has numbering gaps (883->885, 895->897,
 * 916->918).  The References list names resetStringInfo() and
 * appendBinaryStringInfo(), so presumably incoming_message is reset at the
 * top and the header bytes are copied into it after each length check
 * (matching the "copy message to StringInfo" comments) -- confirm.
 */
877 {
878  int hdrlen;
879  XLogRecPtr dataStart;
880  XLogRecPtr walEnd;
881  TimestampTz sendTime;
882  bool replyRequested;
883 
885 
886  switch (type)
887  {
888  case 'w': /* WAL records */
889  {
890  /* copy message to StringInfo */
				/* header = three int64 fields: dataStart, walEnd, sendTime */
891  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
				/* WAL payload follows the header, so only a short header is an error */
892  if (len < hdrlen)
893  ereport(ERROR,
894  (errcode(ERRCODE_PROTOCOL_VIOLATION),
895  errmsg_internal("invalid WAL message received from primary")));
897 
898  /* read the fields */
899  dataStart = pq_getmsgint64(&incoming_message);
900  walEnd = pq_getmsgint64(&incoming_message);
901  sendTime = pq_getmsgint64(&incoming_message);
902  ProcessWalSndrMessage(walEnd, sendTime);
903 
				/* skip the header; what remains is WAL data starting at dataStart */
904  buf += hdrlen;
905  len -= hdrlen;
906  XLogWalRcvWrite(buf, len, dataStart);
907  break;
908  }
909  case 'k': /* Keepalive */
910  {
911  /* copy message to StringInfo */
				/* header = walEnd (int64), sendTime (int64), replyRequested (1 byte) */
912  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
				/* a keepalive carries no payload, so the length must match exactly */
913  if (len != hdrlen)
914  ereport(ERROR,
915  (errcode(ERRCODE_PROTOCOL_VIOLATION),
916  errmsg_internal("invalid keepalive message received from primary")));
918 
919  /* read the fields */
920  walEnd = pq_getmsgint64(&incoming_message);
921  sendTime = pq_getmsgint64(&incoming_message);
922  replyRequested = pq_getmsgbyte(&incoming_message);
923 
924  ProcessWalSndrMessage(walEnd, sendTime);
925 
926  /* If the primary requested a reply, send one immediately */
927  if (replyRequested)
928  XLogWalRcvSendReply(true, false);
929  break;
930  }
931  default:
932  ereport(ERROR,
933  (errcode(ERRCODE_PROTOCOL_VIOLATION),
934  errmsg_internal("invalid replication message type %d",
935  type)));
936  }
937 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1293
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:112
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:67
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
#define ereport(elevel, rest)
Definition: elog.h:122
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1111
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:208
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:943

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1180 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetNextXidAndEpoch(), GetOldestXmin(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), PROCARRAY_FLAGS_DEFAULT, PROCARRAY_SLOTS_XMIN, ProcArrayGetReplicationSlotXmin(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, TransactionIdPrecedes(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

/*
 * XLogWalRcvSendHSFeedback -- build and send a hot standby feedback ('h')
 * message carrying this standby's xmin and catalog_xmin, each with its
 * epoch.
 *
 * immed: when false, the message is rate-limited using the static sendTime
 *        (the TimestampDifferenceExceeds() check below); when true that
 *        check is skipped and sendTime is left untouched.
 *
 * State kept across calls (function-local statics):
 *   sendTime                - time of the last rate-limited send.
 *   master_has_standby_xmin - set at the end of each send: true if a valid
 *                             xmin or catalog_xmin was reported, else false.
 *
 * Returns early without sending if the initial guard triggers, if the
 * rate-limit interval has not elapsed, or if HotStandbyActive() is false.
 *
 * NOTE(review): the listing has numbering gaps (1196->1198, 1209->1211,
 * 1231->1233, 1242->1244, 1271->1273->1275, 1278->1280), so several source
 * lines are missing; see the notes below.
 */
1181 {
1182  TimestampTz now;
1183  TransactionId nextXid;
1184  uint32 xmin_epoch,
1185  catalog_xmin_epoch;
1186  TransactionId xmin,
1187  catalog_xmin;
1188  static TimestampTz sendTime = 0;
1189 
1190  /* initially true so we always send at least one feedback message */
1191  static bool master_has_standby_xmin = true;
1192 
1193  /*
1194  * If the user doesn't want status to be reported to the master, be sure
1195  * to exit before doing anything at all.
1196  */
/*
 * NOTE(review): line 1197 (the start of this condition) is missing; the
 * References list names wal_receiver_status_interval and
 * hot_standby_feedback, presumably the settings tested here -- confirm.
 */
1198  !master_has_standby_xmin)
1199  return;
1200 
1201  /* Get current timestamp. */
1202  now = GetCurrentTimestamp();
1203 
1204  if (!immed)
1205  {
1206  /*
1207  * Send feedback at most once per wal_receiver_status_interval.
1208  */
1209  if (!TimestampDifferenceExceeds(sendTime, now,
1211  return;
1212  sendTime = now;
1213  }
1214 
1215  /*
1216  * If Hot Standby is not yet accepting connections there is nothing to
1217  * send. Check this after the interval has expired to reduce number of
1218  * calls.
1219  *
1220  * Bailing out here also ensures that we don't send feedback until we've
1221  * read our own replication slot state, so we don't tell the master to
1222  * discard needed xmin or catalog_xmin from any slots that may exist on
1223  * this replica.
1224  */
1225  if (!HotStandbyActive())
1226  return;
1227 
1228  /*
1229  * Make the expensive call to get the oldest xmin once we are certain
1230  * everything else has been checked.
1231  */
/*
 * NOTE(review): line 1232 (the "if" that pairs with the "else" below) is
 * missing; presumably it tests hot_standby_feedback, since the else branch
 * reports invalid xmins -- confirm.
 */
1233  {
1234  TransactionId slot_xmin;
1235 
1236  /*
1237  * Usually GetOldestXmin() would include both global replication slot
1238  * xmin and catalog_xmin in its calculations, but we want to derive
1239  * separate values for each of those. So we ask for an xmin that
1240  * excludes the catalog_xmin.
1241  */
1242  xmin = GetOldestXmin(NULL,
1244 
1245  ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1246 
		/* Report the older of our own xmin and the replication slots' xmin */
1247  if (TransactionIdIsValid(slot_xmin) &&
1248  TransactionIdPrecedes(slot_xmin, xmin))
1249  xmin = slot_xmin;
1250  }
1251  else
1252  {
1253  xmin = InvalidTransactionId;
1254  catalog_xmin = InvalidTransactionId;
1255  }
1256 
1257  /*
1258  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1259  * the epoch boundary.
1260  */
1261  GetNextXidAndEpoch(&nextXid, &xmin_epoch);
1262  catalog_xmin_epoch = xmin_epoch;
	/* An xmin numerically above nextXid is taken to be from the previous epoch */
1263  if (nextXid < xmin)
1264  xmin_epoch--;
1265  if (nextXid < catalog_xmin)
1266  catalog_xmin_epoch--;
1267 
1268  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1269  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1270 
1271  /* Construct the message and send it. */
/*
 * NOTE(review): lines 1272, 1274 and 1279 are missing.  The References list
 * names resetStringInfo(), pq_sendint64(), GetCurrentTimestamp() and
 * walrcv_send, so presumably reply_message is reset first, a timestamp
 * field follows the 'h' byte, and the message is sent after the last
 * pq_sendint32() -- confirm.
 */
1273  pq_sendbyte(&reply_message, 'h');
1275  pq_sendint32(&reply_message, xmin);
1276  pq_sendint32(&reply_message, xmin_epoch);
1277  pq_sendint32(&reply_message, catalog_xmin);
1278  pq_sendint32(&reply_message, catalog_xmin_epoch);
1280  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1281  master_has_standby_xmin = true;
1282  else
1283  master_has_standby_xmin = false;
1284 }
bool hot_standby_feedback
Definition: walreceiver.c:76
uint32 TransactionId
Definition: c.h:445
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint32(StringInfo buf, int32 i)
Definition: pqformat.h:148
bool HotStandbyActive(void)
Definition: xlog.c:7985
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
int wal_receiver_status_interval
Definition: walreceiver.c:74
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2986
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
static StringInfoData reply_message
Definition: walreceiver.c:111
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8330
#define PROCARRAY_SLOTS_XMIN
Definition: procarray.h:37
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:296
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define PROCARRAY_FLAGS_DEFAULT
Definition: procarray.h:50
TransactionId GetOldestXmin(Relation rel, int flags)
Definition: procarray.c:1315
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:259
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1111 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, now(), pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1112 {
1113  static XLogRecPtr writePtr = 0;
1114  static XLogRecPtr flushPtr = 0;
1115  XLogRecPtr applyPtr;
1116  static TimestampTz sendTime = 0;
1117  TimestampTz now;
1118 
1119  /*
1120  * If the user doesn't want status to be reported to the master, be sure
1121  * to exit before doing anything at all.
1122  */
1123  if (!force && wal_receiver_status_interval <= 0)
1124  return;
1125 
1126  /* Get current timestamp. */
1127  now = GetCurrentTimestamp();
1128 
1129  /*
1130  * We can compare the write and flush positions to the last message we
1131  * sent without taking any lock, but the apply position requires a spin
1132  * lock, so we don't check that unless something else has changed or 10
1133  * seconds have passed. This means that the apply WAL location will
1134  * appear, from the master's point of view, to lag slightly, but since
1135  * this is only for reporting purposes and only on idle systems, that's
1136  * probably OK.
1137  */
1138  if (!force
1139  && writePtr == LogstreamResult.Write
1140  && flushPtr == LogstreamResult.Flush
1141  && !TimestampDifferenceExceeds(sendTime, now,
1142  wal_receiver_status_interval * 1000))
1143  return;
1144  sendTime = now;
1145 
1146  /* Construct a new message */
1147  writePtr = LogstreamResult.Write;
1148  flushPtr = LogstreamResult.Flush;
1149  applyPtr = GetXLogReplayRecPtr(NULL);
1150 
1151  resetStringInfo(&reply_message);
1152  pq_sendbyte(&reply_message, 'r');
1153  pq_sendint64(&reply_message, writePtr);
1154  pq_sendint64(&reply_message, flushPtr);
1155  pq_sendint64(&reply_message, applyPtr);
1156  pq_sendint64(&reply_message, GetCurrentTimestamp());
1157  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1158 
1159  /* Send it */
1160  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1161  (uint32) (writePtr >> 32), (uint32) writePtr,
1162  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1163  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1164  requestReply ? " (reply requested)" : "");
1165 
1166  walrcv_send(wrconn, reply_message.data, reply_message.len);
1167 }
static struct @25 LogstreamResult
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
int wal_receiver_status_interval
Definition: walreceiver.c:74
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
static StringInfoData reply_message
Definition: walreceiver.c:111
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11128
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:296
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:259
#define elog
Definition: elog.h:219
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 943 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvOff, recvSegNo, ThisTimeLineID, wal_segment_size, write, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogFileNameP(), XLogSegmentOffset, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

944 {
945  int startoff;
946  int byteswritten;
947 
948  while (nbytes > 0)
949  {
950  int segbytes;
951 
952  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
953  {
954  bool use_existent;
955 
956  /*
957  * fsync() and close current file before we switch to next one. We
958  * would otherwise have to reopen this file to fsync it later
959  */
960  if (recvFile >= 0)
961  {
962  char xlogfname[MAXFNAMELEN];
963 
964  XLogWalRcvFlush(false);
965 
966  /*
967  * XLOG segment files will be re-read by recovery in startup
968  * process soon, so we don't advise the OS to release cache
969  * pages associated with the file like XLogFileClose() does.
970  */
971  if (close(recvFile) != 0)
972  ereport(PANIC,
973  (errcode_for_file_access(),
974  errmsg("could not close log segment %s: %m",
975  XLogFileNameP(recvFileTLI, recvSegNo))));
976 
977  /*
978  * Create .done file forcibly to prevent the streamed segment
979  * from being archived later.
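980  */
981  XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
982  if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)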
983  XLogArchiveForceDone(xlogfname);
984  else
985  XLogArchiveNotify(xlogfname);
986  }
987  recvFile = -1;
988 
989  /* Create/use new log file */
990  XLByteToSeg(recptr, recvSegNo, wal_segment_size);
991  use_existent = true;
992  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
993  recvFileTLI = ThisTimeLineID;
994  recvOff = 0;
995  }
996 
997  /* Calculate the start offset of the received logs */
998  startoff = XLogSegmentOffset(recptr, wal_segment_size);
999 
1000  if (startoff + nbytes > wal_segment_size)
1001  segbytes = wal_segment_size - startoff;
1002  else
1003  segbytes = nbytes;
1004 
1005  /* Need to seek in the file? */
1006  if (recvOff != startoff)
1007  {
1008  if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
1009  ereport(PANIC,
1010  (errcode_for_file_access(),
1011  errmsg("could not seek in log segment %s to offset %u: %m",
1012  XLogFileNameP(recvFileTLI, recvSegNo),
1013  startoff)));
1014  recvOff = startoff;
1015  }
1016 
1017  /* OK to write the logs */
1018  errno = 0;
1019 
1020  byteswritten = write(recvFile, buf, segbytes);
1021  if (byteswritten <= 0)
1022  {
1023  /* if write didn't set errno, assume no disk space */
1024  if (errno == 0)
1025  errno = ENOSPC;
1026  ereport(PANIC,
1027  (errcode_for_file_access(),
1028  errmsg("could not write to log segment %s "
1029  "at offset %u, length %lu: %m",
1030  XLogFileNameP(recvFileTLI, recvSegNo),
1031  recvOff, (unsigned long) segbytes)));
1032  }
1033 
1034  /* Update state for write */
1035  recptr += byteswritten;
1036 
1037  recvOff += byteswritten;
1038  nbytes -= byteswritten;
1039  buf += byteswritten;
1040 
1041  LogstreamResult.Write = recptr;
1042  }
1043 }
static struct @25 LogstreamResult
int wal_segment_size
Definition: xlog.c:113
#define write(a, b, c)
Definition: win32.h:14
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3160
#define PANIC
Definition: elog.h:53
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:514
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10166
static char * buf
Definition: pg_test_fsync.c:67
int XLogArchiveMode
Definition: xlog.c:94
int errcode_for_file_access(void)
Definition: elog.c:598
#define ereport(elevel, rest)
Definition: elog.h:122
static TimeLineID recvFileTLI
Definition: walreceiver.c:90
static XLogSegNo recvSegNo
Definition: walreceiver.c:91
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:564
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
TimeLineID ThisTimeLineID
Definition: xlog.c:181
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1052
int errmsg(const char *fmt,...)
Definition: elog.c:797
static uint32 recvOff
Definition: walreceiver.c:92
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:89
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 108 of file walreceiver.c.

◆ got_SIGHUP

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 98 of file walreceiver.c.

Referenced by WalRcvSigHupHandler(), and WalReceiverMain().

◆ got_SIGTERM

volatile sig_atomic_t got_SIGTERM = false
static

Definition at line 99 of file walreceiver.c.

Referenced by ProcessWalRcvInterrupts(), and WalRcvShutdownHandler().

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 76 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 112 of file walreceiver.c.

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 89 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 90 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

◆ recvOff

uint32 recvOff = 0
static

Definition at line 92 of file walreceiver.c.

Referenced by XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 91 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

Definition at line 111 of file walreceiver.c.

Referenced by send_feedback().

◆ wal_receiver_status_interval

int wal_receiver_status_interval

Definition at line 74 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ wal_receiver_timeout

int wal_receiver_timeout

Definition at line 75 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

◆ WalRcvImmediateInterruptOK

volatile bool WalRcvImmediateInterruptOK = false
static

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 80 of file walreceiver.c.

Referenced by _PG_init().

◆ wrconn

WalReceiverConn* wrconn = NULL
static

Definition at line 79 of file walreceiver.c.

◆ Write

XLogRecPtr Write

Definition at line 107 of file walreceiver.c.