PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvSigHupHandler (SIGNAL_ARGS)
 
static void WalRcvSigUsr1Handler (SIGNAL_ARGS)
 
static void WalRcvShutdownHandler (SIGNAL_ARGS)
 
static void WalRcvQuickDieHandler (SIGNAL_ARGS)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
static uint32 recvOff = 0
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t got_SIGTERM = false
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
} LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 83 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1340 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MAXCONNINFO, MemSet, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, NI_MAXHOST, palloc0(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, and XLogRecPtrIsInvalid.

1341 {
1342  TupleDesc tupdesc;
1343  Datum *values;
1344  bool *nulls;
1345  int pid;
1346  bool ready_to_display;
1347  WalRcvState state;
1348  XLogRecPtr receive_start_lsn;
1349  TimeLineID receive_start_tli;
1350  XLogRecPtr received_lsn;
1351  TimeLineID received_tli;
1352  TimestampTz last_send_time;
1353  TimestampTz last_receipt_time;
1354  XLogRecPtr latest_end_lsn;
1355  TimestampTz latest_end_time;
1356  char sender_host[NI_MAXHOST];
1357  int sender_port = 0;
1358  char slotname[NAMEDATALEN];
1359  char conninfo[MAXCONNINFO];
1360 
1361  /* Take a lock to ensure value consistency */
1362  SpinLockAcquire(&WalRcv->mutex);
1363  pid = (int) WalRcv->pid;
1364  ready_to_display = WalRcv->ready_to_display;
1365  state = WalRcv->walRcvState;
1366  receive_start_lsn = WalRcv->receiveStart;
1367  receive_start_tli = WalRcv->receiveStartTLI;
1368  received_lsn = WalRcv->receivedUpto;
1369  received_tli = WalRcv->receivedTLI;
1370  last_send_time = WalRcv->lastMsgSendTime;
1371  last_receipt_time = WalRcv->lastMsgReceiptTime;
1372  latest_end_lsn = WalRcv->latestWalEnd;
1373  latest_end_time = WalRcv->latestWalEndTime;
1374  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1375  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1376  sender_port = WalRcv->sender_port;
1377  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1378  SpinLockRelease(&WalRcv->mutex);
1379 
1380  /*
1381  * No WAL receiver (or not ready yet), just return a tuple with NULL
1382  * values
1383  */
1384  if (pid == 0 || !ready_to_display)
1385  PG_RETURN_NULL();
1386 
1387  /* determine result type */
1388  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1389  elog(ERROR, "return type must be a row type");
1390 
1391  values = palloc0(sizeof(Datum) * tupdesc->natts);
1392  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1393 
1394  /* Fetch values */
1395  values[0] = Int32GetDatum(pid);
1396 
1397  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1398  {
1399  /*
1400  * Only superusers and members of pg_read_all_stats can see details.
1401  * Other users only get the pid value to know whether it is a WAL
1402  * receiver, but no details.
1403  */
1404  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1405  }
1406  else
1407  {
1408  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1409 
1410  if (XLogRecPtrIsInvalid(receive_start_lsn))
1411  nulls[2] = true;
1412  else
1413  values[2] = LSNGetDatum(receive_start_lsn);
1414  values[3] = Int32GetDatum(receive_start_tli);
1415  if (XLogRecPtrIsInvalid(received_lsn))
1416  nulls[4] = true;
1417  else
1418  values[4] = LSNGetDatum(received_lsn);
1419  values[5] = Int32GetDatum(received_tli);
1420  if (last_send_time == 0)
1421  nulls[6] = true;
1422  else
1423  values[6] = TimestampTzGetDatum(last_send_time);
1424  if (last_receipt_time == 0)
1425  nulls[7] = true;
1426  else
1427  values[7] = TimestampTzGetDatum(last_receipt_time);
1428  if (XLogRecPtrIsInvalid(latest_end_lsn))
1429  nulls[8] = true;
1430  else
1431  values[8] = LSNGetDatum(latest_end_lsn);
1432  if (latest_end_time == 0)
1433  nulls[9] = true;
1434  else
1435  values[9] = TimestampTzGetDatum(latest_end_time);
1436  if (*slotname == '\0')
1437  nulls[10] = true;
1438  else
1439  values[10] = CStringGetTextDatum(slotname);
1440  if (*sender_host == '\0')
1441  nulls[11] = true;
1442  else
1443  values[11] = CStringGetTextDatum(sender_host);
1444  if (sender_port == 0)
1445  nulls[12] = true;
1446  else
1447  values[12] = Int32GetDatum(sender_port);
1448  if (*conninfo == '\0')
1449  nulls[13] = true;
1450  else
1451  values[13] = CStringGetTextDatum(conninfo);
1452  }
1453 
1454  /* Returns the record as Datum */
1455  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1456 }
int sender_port
Definition: walreceiver.h:116
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1315
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
uint32 TimeLineID
Definition: xlogdefs.h:52
slock_t mutex
Definition: walreceiver.h:136
Oid GetUserId(void)
Definition: miscinit.c:380
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:63
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:83
#define MemSet(start, val, len)
Definition: c.h:955
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
WalRcvState
Definition: walreceiver.h:43
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:43
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:955
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:343
bool ready_to_display
Definition: walreceiver.h:125
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4932
XLogRecPtr latestWalEnd
Definition: walreceiver.h:102
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:221
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define Int32GetDatum(X)
Definition: postgres.h:479
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:115
#define elog(elevel,...)
Definition: elog.h:226
#define CStringGetTextDatum(s)
Definition: builtins.h:83
XLogRecPtr receiveStart
Definition: walreceiver.h:72
#define PG_RETURN_NULL()
Definition: fmgr.h:335
char slotname[NAMEDATALEN]
Definition: walreceiver.h:122
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 147 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and got_SIGTERM.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), walrcv_clear_result(), WalRcvWaitForStartPosition(), and WalReceiverMain().

148 {
149  /*
150  * Although walreceiver interrupt handling doesn't use the same scheme as
151  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
152  * any incoming signals on Win32.
153  */
154  CHECK_FOR_INTERRUPTS();
155 
156  if (got_SIGTERM)
157  {
158  ereport(FATAL,
159  (errcode(ERRCODE_ADMIN_SHUTDOWN),
160  errmsg("terminating walreceiver process due to administrator command")));
161  }
162 }
int errcode(int sqlerrcode)
Definition: elog.c:570
#define FATAL
Definition: elog.h:52
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:100

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1244 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, log_min_messages, WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1245 {
1246  WalRcvData *walrcv = WalRcv;
1247 
1248  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1249 
1250  /* Update shared-memory status */
1251  SpinLockAcquire(&walrcv->mutex);
1252  if (walrcv->latestWalEnd < walEnd)
1253  walrcv->latestWalEndTime = sendTime;
1254  walrcv->latestWalEnd = walEnd;
1255  walrcv->lastMsgSendTime = sendTime;
1256  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1257  SpinLockRelease(&walrcv->mutex);
1258 
1259  if (log_min_messages <= DEBUG2)
1260  {
1261  char *sendtime;
1262  char *receipttime;
1263  int applyDelay;
1264 
1265  /* Copy because timestamptz_to_str returns a static buffer */
1266  sendtime = pstrdup(timestamptz_to_str(sendTime));
1267  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1268  applyDelay = GetReplicationApplyDelay();
1269 
1270  /* apply delay is not available */
1271  if (applyDelay == -1)
1272  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1273  sendtime,
1274  receipttime,
1275  GetReplicationTransferLatency());
1276  else
1277  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1278  sendtime,
1279  receipttime,
1280  applyDelay,
1281  GetReplicationTransferLatency());
1282 
1283  pfree(sendtime);
1284  pfree(receipttime);
1285  }
1286 }
slock_t mutex
Definition: walreceiver.h:136
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1161
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1031
#define DEBUG2
Definition: elog.h:24
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
int log_min_messages
Definition: guc.c:510
XLogRecPtr latestWalEnd
Definition: walreceiver.h:102
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:226
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1729

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 732 of file walreceiver.c.

References Assert, WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

733 {
734  WalRcvData *walrcv = WalRcv;
735 
736  /* Ensure that all WAL records received are flushed to disk */
737  XLogWalRcvFlush(true);
738 
739  /* Mark ourselves inactive in shared memory */
740  SpinLockAcquire(&walrcv->mutex);
741  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
742  walrcv->walRcvState == WALRCV_RESTARTING ||
743  walrcv->walRcvState == WALRCV_STARTING ||
744  walrcv->walRcvState == WALRCV_WAITING ||
745  walrcv->walRcvState == WALRCV_STOPPING);
746  Assert(walrcv->pid == MyProcPid);
747  walrcv->walRcvState = WALRCV_STOPPED;
748  walrcv->pid = 0;
749  walrcv->ready_to_display = false;
750  walrcv->latch = NULL;
751  SpinLockRelease(&walrcv->mutex);
752 
753  /* Terminate the connection gracefully. */
754  if (wrconn != NULL)
755  walrcv_disconnect(wrconn);
756 
757  /* Wake up the startup process to notice promptly that we're gone */
758  WakeupRecovery();
759 }
int MyProcPid
Definition: globals.c:40
slock_t mutex
Definition: walreceiver.h:136
WalRcvState walRcvState
Definition: walreceiver.h:63
static WalReceiverConn * wrconn
Definition: walreceiver.c:80
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12225
pid_t pid
Definition: walreceiver.h:62
Latch * latch
Definition: walreceiver.h:134
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:125
#define Assert(condition)
Definition: c.h:732
#define walrcv_disconnect(conn)
Definition: walreceiver.h:281
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1000
WalRcvData * WalRcv

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 685 of file walreceiver.c.

References ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, and writeTimeLineHistoryFile().

Referenced by WalReceiverMain().

686 {
687  TimeLineID tli;
688 
689  for (tli = first; tli <= last; tli++)
690  {
691  /* there's no history file for timeline 1 */
692  if (tli != 1 && !existsTimeLineHistory(tli))
693  {
694  char *fname;
695  char *content;
696  int len;
697  char expectedfname[MAXFNAMELEN];
698 
699  ereport(LOG,
700  (errmsg("fetching timeline history file for timeline %u from primary server",
701  tli)));
702 
703  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
704 
705  /*
706  * Check that the filename on the master matches what we
707  * calculated ourselves. This is just a sanity check, it should
708  * always match.
709  */
710  TLHistoryFileName(expectedfname, tli);
711  if (strcmp(fname, expectedfname) != 0)
712  ereport(ERROR,
713  (errcode(ERRCODE_PROTOCOL_VIOLATION),
714  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
715  tli)));
716 
717  /*
718  * Write the file to pg_wal.
719  */
720  writeTimeLineHistoryFile(tli, content, len);
721 
722  pfree(fname);
723  pfree(content);
724  }
725  }
726 }
uint32 TimeLineID
Definition: xlogdefs.h:52
int errcode(int sqlerrcode)
Definition: elog.c:570
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:80
void pfree(void *pointer)
Definition: mcxt.c:1031
#define ERROR
Definition: elog.h:43
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:450
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:207
#define ereport(elevel, rest)
Definition: elog.h:141
#define MAXFNAMELEN
int errmsg_internal(const char *fmt,...)
Definition: elog.c:814
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:267
int errmsg(const char *fmt,...)
Definition: elog.c:784

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1297 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

1298 {
1299  Latch *latch;
1300 
1301  WalRcv->force_reply = true;
1302  /* fetching the latch pointer might not be atomic, so use spinlock */
1303  SpinLockAcquire(&WalRcv->mutex);
1304  latch = WalRcv->latch;
1305  SpinLockRelease(&WalRcv->mutex);
1306  if (latch)
1307  SetLatch(latch);
1308 }
slock_t mutex
Definition: walreceiver.h:136
sig_atomic_t force_reply
Definition: walreceiver.h:143
void SetLatch(Latch *latch)
Definition: latch.c:436
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: latch.h:110
Latch * latch
Definition: walreceiver.h:134
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1315 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1316 {
1317  switch (state)
1318  {
1319  case WALRCV_STOPPED:
1320  return "stopped";
1321  case WALRCV_STARTING:
1322  return "starting";
1323  case WALRCV_STREAMING:
1324  return "streaming";
1325  case WALRCV_WAITING:
1326  return "waiting";
1327  case WALRCV_RESTARTING:
1328  return "restarting";
1329  case WALRCV_STOPPING:
1330  return "stopping";
1331  }
1332  return "UNKNOWN";
1333 }
Definition: regguts.h:298

◆ WalRcvQuickDieHandler()

static void WalRcvQuickDieHandler ( SIGNAL_ARGS  )
static

Definition at line 801 of file walreceiver.c.

Referenced by WalReceiverMain().

802 {
803  /*
804  * We DO NOT want to run proc_exit() or atexit() callbacks -- we're here
805  * because shared memory may be corrupted, so we don't want to try to
806  * clean up our transaction. Just nail the windows shut and get out of
807  * town. The callbacks wouldn't be safe to run from a signal handler,
808  * anyway.
809  *
810  * Note we use _exit(2) not _exit(0). This is to force the postmaster
811  * into a system reset cycle if someone sends a manual SIGQUIT to a random
812  * backend. This is necessary precisely because we don't clean up our
813  * shared memory state. (The "dead man switch" mechanism in pmsignal.c
814  * should ensure the postmaster sees this as a crash, too, but no harm in
815  * being doubly sure.)
816  */
817  _exit(2);
818 }

◆ WalRcvShutdownHandler()

static void WalRcvShutdownHandler ( SIGNAL_ARGS  )
static

Definition at line 782 of file walreceiver.c.

References got_SIGTERM, WalRcvData::latch, SetLatch(), and WalRcv.

Referenced by WalReceiverMain().

783 {
784  int save_errno = errno;
785 
786  got_SIGTERM = true;
787 
788  if (WalRcv->latch)
789  SetLatch(WalRcv->latch);
790 
791  errno = save_errno;
792 }
void SetLatch(Latch *latch)
Definition: latch.c:436
Latch * latch
Definition: walreceiver.h:134
WalRcvData * WalRcv
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:100

◆ WalRcvSigHupHandler()

static void WalRcvSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 763 of file walreceiver.c.

References got_SIGHUP.

Referenced by WalReceiverMain().

764 {
765  got_SIGHUP = true;
766 }
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:99

◆ WalRcvSigUsr1Handler()

static void WalRcvSigUsr1Handler ( SIGNAL_ARGS  )
static

Definition at line 771 of file walreceiver.c.

References latch_sigusr1_handler().

Referenced by WalReceiverMain().

772 {
773  int save_errno = errno;
774 
775  latch_sigusr1_handler();
776 
777  errno = save_errno;
778 }
void latch_sigusr1_handler(void)
Definition: latch.c:1515

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 607 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, WalRcvData::latch, WalRcvData::mutex, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

608 {
609  WalRcvData *walrcv = WalRcv;
610  int state;
611 
612  SpinLockAcquire(&walrcv->mutex);
613  state = walrcv->walRcvState;
614  if (state != WALRCV_STREAMING)
615  {
616  SpinLockRelease(&walrcv->mutex);
617  if (state == WALRCV_STOPPING)
618  proc_exit(0);
619  else
620  elog(FATAL, "unexpected walreceiver state");
621  }
622  walrcv->walRcvState = WALRCV_WAITING;
623  walrcv->receiveStart = InvalidXLogRecPtr;
624  walrcv->receiveStartTLI = 0;
625  SpinLockRelease(&walrcv->mutex);
626 
627  if (update_process_title)
628  set_ps_display("idle", false);
629 
630  /*
631  * nudge startup process to notice that we've stopped streaming and are
632  * now waiting for instructions.
633  */
634  WakeupRecovery();
635  for (;;)
636  {
637  ResetLatch(walrcv->latch);
638 
639  ProcessWalRcvInterrupts();
640 
641  SpinLockAcquire(&walrcv->mutex);
642  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
643  walrcv->walRcvState == WALRCV_WAITING ||
644  walrcv->walRcvState == WALRCV_STOPPING);
645  if (walrcv->walRcvState == WALRCV_RESTARTING)
646  {
647  /* we don't expect primary_conninfo to change */
648  *startpoint = walrcv->receiveStart;
649  *startpointTLI = walrcv->receiveStartTLI;
650  walrcv->walRcvState = WALRCV_STREAMING;
651  SpinLockRelease(&walrcv->mutex);
652  break;
653  }
654  if (walrcv->walRcvState == WALRCV_STOPPING)
655  {
656  /*
657  * We should've received SIGTERM if the startup process wants us
658  * to die, but might as well check it here too.
659  */
660  SpinLockRelease(&walrcv->mutex);
661  exit(1);
662  }
663  SpinLockRelease(&walrcv->mutex);
664 
665  (void) WaitLatch(walrcv->latch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
666  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
667  }
668 
669  if (update_process_title)
670  {
671  char activitymsg[50];
672 
673  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
674  (uint32) (*startpoint >> 32),
675  (uint32) *startpoint);
676  set_ps_display(activitymsg, false);
677  }
678 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:136
bool update_process_title
Definition: ps_status.c:35
WalRcvState walRcvState
Definition: walreceiver.h:63
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:147
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:331
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:519
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:344
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12225
#define FATAL
Definition: elog.h:52
unsigned int uint32
Definition: c.h:358
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
Latch * latch
Definition: walreceiver.h:134
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:732
Definition: regguts.h:298
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:226
XLogRecPtr receiveStart
Definition: walreceiver.h:72
#define snprintf
Definition: port.h:192
#define WL_LATCH_SET
Definition: latch.h:124
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 167 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, BlockSig, buf, close, cluster_name, WalRcvData::conninfo, DEBUG1, elog, ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, NI_MAXHOST, now(), on_shmem_exit(), options, PANIC, pfree(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SIGPIPE, SIGQUIT, SIGUSR1, SIGUSR2, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf, SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, wal_segment_size, WalRcv, walrcv_connect, walrcv_endstreaming, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvQuickDieHandler(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvSigUsr1Handler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, 
XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogFileNameP(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain(), and walrcv_clear_result().

168 {
     /*
      * Main entry point of the WAL receiver process. As far as this listing
      * shows, it:
      *  - marks the walreceiver as running in shared memory and advertises
      *    its PID and latch there;
      *  - installs signal handlers and loads "libpqwalreceiver";
      *  - connects to the primary and publishes a user-visible connection
      *    string plus the sender's host and port;
      *  - then loops forever: identify the remote system, check timelines,
      *    fetch missing timeline history files, start streaming, process
      *    incoming messages until end-of-WAL, close the current segment and
      *    wait for a new start position from the startup process.
      *
      * NOTE(review): this listing is an extract. Numbered lines such as 178,
      * 246, 253-256, 259, 270, 309, 318, 380-382, 406, 411-412, 478-482, 486,
      * 497, 520, 525, 548, 581, 583 and 589-590 are absent, so statements
      * announced by the surrounding comments (e.g. registering the exit
      * callback, unblocking signals) are not visible here -- consult the
      * actual walreceiver.c before relying on the details.
      */
169  char conninfo[MAXCONNINFO];
170  char *tmp_conninfo;
171  char slotname[NAMEDATALEN];
172  XLogRecPtr startpoint;
173  TimeLineID startpointTLI;
174  TimeLineID primaryTLI;
175  bool first_stream;
176  WalRcvData *walrcv = WalRcv;
177  TimestampTz last_recv_timestamp;
179  bool ping_sent;
180  char *err;
181  char *sender_host = NULL;
182  int sender_port = 0;
183 
184  /*
185  * WalRcv should be set up already (if we are a backend, we inherit this
186  * by fork() or EXEC_BACKEND mechanism from the postmaster).
187  */
188  Assert(walrcv != NULL);
189 
190  now = GetCurrentTimestamp();
191 
192  /*
193  * Mark walreceiver as running in shared memory.
194  *
195  * Do this as early as possible, so that if we fail later on, we'll set
196  * state to STOPPED. If we die before this, the startup process will keep
197  * waiting for us to start up, until it times out.
198  */
199  SpinLockAcquire(&walrcv->mutex);
200  Assert(walrcv->pid == 0);
201  switch (walrcv->walRcvState)
202  {
203  case WALRCV_STOPPING:
204  /* If we've already been requested to stop, don't start up. */
205  walrcv->walRcvState = WALRCV_STOPPED;
206  /* fall through */
207 
208  case WALRCV_STOPPED:
209  SpinLockRelease(&walrcv->mutex);
210  proc_exit(1);
211  break;
212 
213  case WALRCV_STARTING:
214  /* The usual case */
215  break;
216 
217  case WALRCV_WAITING:
218  case WALRCV_STREAMING:
219  case WALRCV_RESTARTING:
220  default:
221  /* Shouldn't happen */
222  SpinLockRelease(&walrcv->mutex);
223  elog(PANIC, "walreceiver still running according to shared memory state");
224  }
225  /* Advertise our PID so that the startup process can kill us */
226  walrcv->pid = MyProcPid;
227  walrcv->walRcvState = WALRCV_STREAMING;
228 
229  /* Fetch information required to start streaming */
230  walrcv->ready_to_display = false;
231  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
232  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
233  startpoint = walrcv->receiveStart;
234  startpointTLI = walrcv->receiveStartTLI;
235 
236  /* Initialise to a sanish value */
237  walrcv->lastMsgSendTime =
238  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
239 
240  /* Report the latch to use to awaken this process */
241  walrcv->latch = &MyProc->procLatch;
242 
243  SpinLockRelease(&walrcv->mutex);
244 
245  /* Arrange to clean up at walreceiver exit */
247 
248  /* Properly accept or ignore signals the postmaster might send us */
249  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
250  pqsignal(SIGINT, SIG_IGN);
251  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
252  pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
257 
258  /* Reset some signals that are accepted by postmaster but not here */
260 
261  /* We allow SIGQUIT (quickdie) at all times */
262  sigdelset(&BlockSig, SIGQUIT);
263 
264  /* Load the libpq-specific functions */
265  load_file("libpqwalreceiver", false);
266  if (WalReceiverFunctions == NULL)
267  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
268 
269  /* Unblock signals (they were blocked when the postmaster forked us) */
271 
272  /* Establish the connection to the primary for XLOG streaming */
273  wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
274  if (!wrconn)
275  ereport(ERROR,
276  (errmsg("could not connect to the primary server: %s", err)));
277 
278  /*
279  * Save user-visible connection string. This clobbers the original
280  * conninfo, for security. Also save host and port of the sender server
281  * this walreceiver is connected to.
282  */
283  tmp_conninfo = walrcv_get_conninfo(wrconn);
284  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
285  SpinLockAcquire(&walrcv->mutex);
286  memset(walrcv->conninfo, 0, MAXCONNINFO);
287  if (tmp_conninfo)
288  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
289 
290  memset(walrcv->sender_host, 0, NI_MAXHOST);
291  if (sender_host)
292  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
293 
294  walrcv->sender_port = sender_port;
295  walrcv->ready_to_display = true;
296  SpinLockRelease(&walrcv->mutex);
297 
298  if (tmp_conninfo)
299  pfree(tmp_conninfo);
300 
301  if (sender_host)
302  pfree(sender_host);
303 
304  first_stream = true;
305  for (;;)
306  {
307  char *primary_sysid;
308  char standby_sysid[32];
310 
311  /*
312  * Check that we're connected to a valid server using the
313  * IDENTIFY_SYSTEM replication command.
314  */
315  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
316 
317  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
319  if (strcmp(primary_sysid, standby_sysid) != 0)
320  {
321  ereport(ERROR,
322  (errmsg("database system identifier differs between the primary and standby"),
323  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
324  primary_sysid, standby_sysid)));
325  }
326 
327  /*
328  * Confirm that the current timeline of the primary is the same or
329  * ahead of ours.
330  */
331  if (primaryTLI < startpointTLI)
332  ereport(ERROR,
333  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
334  primaryTLI, startpointTLI)));
335 
336  /*
337  * Get any missing history files. We do this always, even when we're
338  * not interested in that timeline, so that if we're promoted to
339  * become the master later on, we don't select the same timeline that
340  * was already used in the current master. This isn't bullet-proof -
341  * you'll need some external software to manage your cluster if you
342  * need to ensure that a unique timeline id is chosen in every case,
343  * but let's avoid the confusion of timeline id collisions where we
344  * can.
345  */
346  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
347 
348  /*
349  * Start streaming.
350  *
351  * We'll try to start at the requested starting point and timeline,
352  * even if it's different from the server's latest timeline. In case
353  * we've already reached the end of the old timeline, the server will
354  * finish the streaming immediately, and we will go back to await
355  * orders from the startup process. If recovery_target_timeline is
356  * 'latest', the startup process will scan pg_wal and find the new
357  * history file, bump recovery target timeline, and ask us to restart
358  * on the new timeline.
359  */
360  options.logical = false;
361  options.startpoint = startpoint;
362  options.slotname = slotname[0] != '\0' ? slotname : NULL;
363  options.proto.physical.startpointTLI = startpointTLI;
364  ThisTimeLineID = startpointTLI;
365  if (walrcv_startstreaming(wrconn, &options))
366  {
367  if (first_stream)
368  ereport(LOG,
369  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
370  (uint32) (startpoint >> 32), (uint32) startpoint,
371  startpointTLI)));
372  else
373  ereport(LOG,
374  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
375  (uint32) (startpoint >> 32), (uint32) startpoint,
376  startpointTLI)));
377  first_stream = false;
378 
379  /* Initialize LogstreamResult and buffers for processing messages */
383 
384  /* Initialize the last recv timestamp */
385  last_recv_timestamp = GetCurrentTimestamp();
386  ping_sent = false;
387 
388  /* Loop until end-of-streaming or error */
389  for (;;)
390  {
391  char *buf;
392  int len;
393  bool endofwal = false;
394  pgsocket wait_fd = PGINVALID_SOCKET;
395  int rc;
396 
397  /*
398  * Exit walreceiver if we're not in recovery. This should not
399  * happen, but cross-check the status here.
400  */
401  if (!RecoveryInProgress())
402  ereport(FATAL,
403  (errmsg("cannot continue WAL streaming, recovery has already ended")));
404 
405  /* Process any requests or signals received recently */
407 
408  if (got_SIGHUP)
409  {
410  got_SIGHUP = false;
413  }
414 
415  /* See if we can read data immediately */
416  len = walrcv_receive(wrconn, &buf, &wait_fd);
417  if (len != 0)
418  {
419  /*
420  * Process the received data, and any subsequent data we
421  * can read without blocking.
422  */
423  for (;;)
424  {
425  if (len > 0)
426  {
427  /*
428  * Something was received from master, so reset
429  * timeout
430  */
431  last_recv_timestamp = GetCurrentTimestamp();
432  ping_sent = false;
433  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
434  }
435  else if (len == 0)
436  break;
437  else if (len < 0)
438  {
439  ereport(LOG,
440  (errmsg("replication terminated by primary server"),
441  errdetail("End of WAL reached on timeline %u at %X/%X.",
442  startpointTLI,
443  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
444  endofwal = true;
445  break;
446  }
447  len = walrcv_receive(wrconn, &buf, &wait_fd);
448  }
449 
450  /* Let the master know that we received some data. */
451  XLogWalRcvSendReply(false, false);
452 
453  /*
454  * If we've written some records, flush them to disk and
455  * let the startup process and primary server know about
456  * them.
457  */
458  XLogWalRcvFlush(false);
459  }
460 
461  /* Check if we need to exit the streaming loop. */
462  if (endofwal)
463  break;
464 
465  /*
466  * Ideally we would reuse a WaitEventSet object repeatedly
467  * here to avoid the overheads of WaitLatchOrSocket on epoll
468  * systems, but we can't be sure that libpq (or any other
469  * walreceiver implementation) has the same socket (even if
470  * the fd is the same number, it may have been closed and
471  * reopened since the last time). In future, if there is a
472  * function for removing sockets from WaitEventSet, then we
473  * could add and remove just the socket each time, potentially
474  * avoiding some system calls.
475  */
476  Assert(wait_fd != PGINVALID_SOCKET);
477  rc = WaitLatchOrSocket(walrcv->latch,
480  wait_fd,
483  if (rc & WL_LATCH_SET)
484  {
485  ResetLatch(walrcv->latch);
487 
488  if (walrcv->force_reply)
489  {
490  /*
491  * The recovery process has asked us to send apply
492  * feedback now. Make sure the flag is really set to
493  * false in shared memory before sending the reply, so
494  * we don't miss a new request for a reply.
495  */
496  walrcv->force_reply = false;
498  XLogWalRcvSendReply(true, false);
499  }
500  }
501  if (rc & WL_TIMEOUT)
502  {
503  /*
504  * We didn't receive anything new. If we haven't heard
505  * anything from the server for more than
506  * wal_receiver_timeout / 2, ping the server. Also, if
507  * it's been longer than wal_receiver_status_interval
508  * since the last update we sent, send a status update to
509  * the master anyway, to report any progress in applying
510  * WAL.
511  */
512  bool requestReply = false;
513 
514  /*
515  * Check if time since last receive from the primary has
516  * reached the configured limit.
517  */
518  if (wal_receiver_timeout > 0)
519  {
521  TimestampTz timeout;
522 
523  timeout =
524  TimestampTzPlusMilliseconds(last_recv_timestamp,
526 
527  if (now >= timeout)
528  ereport(ERROR,
529  (errmsg("terminating walreceiver due to timeout")));
530 
531  /*
532  * We haven't received anything new for half of
533  * wal_receiver_timeout. Ping the server.
534  */
535  if (!ping_sent)
536  {
537  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
538  (wal_receiver_timeout / 2));
539  if (now >= timeout)
540  {
541  requestReply = true;
542  ping_sent = true;
543  }
544  }
545  }
546 
547  XLogWalRcvSendReply(requestReply, requestReply);
549  }
550  }
551 
552  /*
553  * The backend finished streaming. Exit streaming COPY-mode from
554  * our side, too.
555  */
556  walrcv_endstreaming(wrconn, &primaryTLI);
557 
558  /*
559  * If the server had switched to a new timeline that we didn't
560  * know about when we began streaming, fetch its timeline history
561  * file now.
562  */
563  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
564  }
565  else
566  ereport(LOG,
567  (errmsg("primary server contains no more WAL on requested timeline %u",
568  startpointTLI)));
569 
570  /*
571  * End of WAL reached on the requested timeline. Close the last
572  * segment, and wait for new orders from the startup process.
573  */
574  if (recvFile >= 0)
575  {
576  char xlogfname[MAXFNAMELEN];
577 
578  XLogWalRcvFlush(false);
579  if (close(recvFile) != 0)
580  ereport(PANIC,
582  errmsg("could not close log segment %s: %m",
584 
585  /*
586  * Create .done file forcibly to prevent the streamed segment from
587  * being archived later.
588  */
591  XLogArchiveForceDone(xlogfname);
592  else
593  XLogArchiveNotify(xlogfname);
594  }
595  recvFile = -1;
596 
597  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
598  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
599  }
600  /* not reached */
601 }
static struct @25 LogstreamResult
int sender_port
Definition: walreceiver.h:116
#define SIGQUIT
Definition: win32_port.h:164
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:685
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:259
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:271
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
uint32 TimeLineID
Definition: xlogdefs.h:52
slock_t mutex
Definition: walreceiver.h:136
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:112
#define SIGUSR1
Definition: win32_port.h:175
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
#define SIGCHLD
Definition: win32_port.h:173
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:607
sig_atomic_t force_reply
Definition: walreceiver.h:143
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:263
WalRcvState walRcvState
Definition: walreceiver.h:63
static StringInfoData incoming_message
Definition: walreceiver.c:113
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1128
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:273
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:147
union WalRcvStreamOptions::@106 proto
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:763
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:269
void proc_exit(int code)
Definition: ipc.c:104
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:824
#define SIGPIPE
Definition: win32_port.h:168
#define SIGUSR2
Definition: win32_port.h:176
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7898
#define PANIC
Definition: elog.h:53
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:83
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
void ResetLatch(Latch *latch)
Definition: latch.c:519
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:76
Latch procLatch
Definition: proc.h:104
static WalReceiverConn * wrconn
Definition: walreceiver.c:80
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:112
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:504
void pfree(void *pointer)
Definition: mcxt.c:1031
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:43
static void WalRcvSigUsr1Handler(SIGNAL_ARGS)
Definition: walreceiver.c:771
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11130
pid_t pid
Definition: walreceiver.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10130
static char * buf
Definition: pg_test_fsync.c:68
int XLogArchiveMode
Definition: xlog.c:91
int errdetail(const char *fmt,...)
Definition: elog.c:860
int errcode_for_file_access(void)
Definition: elog.c:593
#define SIGHUP
Definition: win32_port.h:163
XLogRecPtr startpoint
Definition: walreceiver.h:153
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
unsigned int uint32
Definition: c.h:358
int pgsocket
Definition: port.h:31
sigset_t UnBlockSig
Definition: pqsignal.c:22
struct WalRcvStreamOptions::@106::@107 physical
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:99
#define ereport(elevel, rest)
Definition: elog.h:141
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
static TimeLineID recvFileTLI
Definition: walreceiver.c:91
Definition: guc.h:72
char * cluster_name
Definition: guc.c:521
Latch * latch
Definition: walreceiver.h:134
#define SIG_IGN
Definition: win32_port.h:160
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static XLogSegNo recvSegNo
Definition: walreceiver.c:92
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
sigset_t BlockSig
Definition: pqsignal.c:22
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:554
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:81
#define PGINVALID_SOCKET
Definition: port.h:33
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1059
bool ready_to_display
Definition: walreceiver.h:125
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:782
TimeLineID ThisTimeLineID
Definition: xlog.c:187
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:148
#define SIG_DFL
Definition: win32_port.h:158
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define SIGALRM
Definition: win32_port.h:169
static void WalRcvQuickDieHandler(SIGNAL_ARGS)
Definition: walreceiver.c:801
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1000
WalRcvData * WalRcv
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4802
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:115
int errmsg(const char *fmt,...)
Definition: elog.c:784
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:732
#define elog(elevel,...)
Definition: elog.h:226
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:261
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:72
static int recvFile
Definition: walreceiver.c:90
#define snprintf
Definition: port.h:192
#define WL_LATCH_SET
Definition: latch.h:124
#define UINT64_FORMAT
Definition: c.h:401
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
char slotname[NAMEDATALEN]
Definition: walreceiver.h:122
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:255

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 1000 of file walreceiver.c.

References AllowCascadeReplication, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

1001 {
      /*
       * Bring the flush position up to the write position and tell everyone
       * who cares. Nothing happens unless LogstreamResult.Write is ahead of
       * LogstreamResult.Flush.
       *
       * dying: when true, the final status reply and hot-standby feedback to
       *        the primary are skipped.
       *
       * NOTE(review): listing lines 1006, 1022 and 1026 are absent from this
       * extract. The reference list for this function names
       * issue_xlog_fsync(), recvFile, recvSegNo, AllowCascadeReplication and
       * update_process_title, so presumably 1006 fsyncs the current segment
       * and 1022/1026 are the conditions guarding WalSndWakeup() and the PS
       * display update -- confirm against the real source.
       */
1002  if (LogstreamResult.Flush < LogstreamResult.Write)
1003  {
1004  WalRcvData *walrcv = WalRcv;
1005 
1007 
1008  LogstreamResult.Flush = LogstreamResult.Write;
1009 
1010  /*
      * Update shared-memory status. receivedUpto only ever moves forward
      * here; the previous value is remembered as latestChunkStart.
      */
1011  SpinLockAcquire(&walrcv->mutex);
1012  if (walrcv->receivedUpto < LogstreamResult.Flush)
1013  {
1014  walrcv->latestChunkStart = walrcv->receivedUpto;
1015  walrcv->receivedUpto = LogstreamResult.Flush;
1016  walrcv->receivedTLI = ThisTimeLineID;
1017  }
1018  SpinLockRelease(&walrcv->mutex);
1019 
1020  /* Signal the startup process and walsender that new WAL has arrived */
1021  WakeupRecovery();
1023  WalSndWakeup();
1024 
1025  /* Report XLOG streaming progress in PS display */
1027  {
1028  char activitymsg[50];
1029 
1030  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1031  (uint32) (LogstreamResult.Write >> 32),
1032  (uint32) LogstreamResult.Write);
1033  set_ps_display(activitymsg, false);
1034  }
1035 
1036  /* Also let the master know that we made some progress */
1037  if (!dying)
1038  {
1039  XLogWalRcvSendReply(false, false);
1040  XLogWalRcvSendHSFeedback(false);
1041  }
1042  }
1043 }
static struct @25 LogstreamResult
slock_t mutex
Definition: walreceiver.h:136
bool update_process_title
Definition: ps_status.c:35
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10085
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1128
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:331
TimeLineID receivedTLI
Definition: walreceiver.h:83
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12225
XLogRecPtr latestChunkStart
Definition: walreceiver.h:91
#define AllowCascadeReplication()
Definition: walreceiver.h:38
unsigned int uint32
Definition: c.h:358
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
static XLogSegNo recvSegNo
Definition: walreceiver.c:92
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1059
TimeLineID ThisTimeLineID
Definition: xlog.c:187
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:90
#define snprintf
Definition: port.h:192
void WalSndWakeup(void)
Definition: walsender.c:3084

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 824 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

825 {
     /*
      * Decode one message received from the primary and act on it.
      *
      * type:    message type byte; 'w' (WAL data) and 'k' (keepalive) are
      *          handled, anything else raises ERROR.
      * buf/len: the message contents after the type byte and their length
      *          (the caller in this file passes &buf[1] and len - 1).
      *
      * NOTE(review): listing lines 832, 844 and 865 are absent from this
      * extract. Given the "copy message to StringInfo" comments, the
      * subsequent reads from incoming_message, and the reference list naming
      * resetStringInfo() and appendBinaryStringInfo(), they presumably reset
      * incoming_message and append the header bytes to it -- confirm against
      * the real source.
      */
826  int hdrlen;
827  XLogRecPtr dataStart;
828  XLogRecPtr walEnd;
829  TimestampTz sendTime;
830  bool replyRequested;
831 
833 
834  switch (type)
835  {
836  case 'w': /* WAL records */
837  {
838  /* copy message to StringInfo */
     /* header is three int64 fields: dataStart, walEnd, sendTime */
839  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
     /* WAL payload follows the header, so only a short message is invalid */
840  if (len < hdrlen)
841  ereport(ERROR,
842  (errcode(ERRCODE_PROTOCOL_VIOLATION),
843  errmsg_internal("invalid WAL message received from primary")));
845 
846  /* read the fields */
847  dataStart = pq_getmsgint64(&incoming_message);
848  walEnd = pq_getmsgint64(&incoming_message);
849  sendTime = pq_getmsgint64(&incoming_message);
850  ProcessWalSndrMessage(walEnd, sendTime);
851 
     /* skip the header; the remainder is handed to XLogWalRcvWrite at dataStart */
852  buf += hdrlen;
853  len -= hdrlen;
854  XLogWalRcvWrite(buf, len, dataStart);
855  break;
856  }
857  case 'k': /* Keepalive */
858  {
859  /* copy message to StringInfo */
     /* fixed-size message: walEnd, sendTime and a one-byte reply flag */
860  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
     /* no payload follows, so the length must match exactly */
861  if (len != hdrlen)
862  ereport(ERROR,
863  (errcode(ERRCODE_PROTOCOL_VIOLATION),
864  errmsg_internal("invalid keepalive message received from primary")));
866 
867  /* read the fields */
868  walEnd = pq_getmsgint64(&incoming_message);
869  sendTime = pq_getmsgint64(&incoming_message);
870  replyRequested = pq_getmsgbyte(&incoming_message);
871 
872  ProcessWalSndrMessage(walEnd, sendTime);
873 
874  /* If the primary requested a reply, send one immediately */
875  if (replyRequested)
876  XLogWalRcvSendReply(true, false);
877  break;
878  }
879  default:
     /* unknown message type: treat as a protocol violation */
880  ereport(ERROR,
881  (errcode(ERRCODE_PROTOCOL_VIOLATION),
882  errmsg_internal("invalid replication message type %d",
883  type)));
884  }
885 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1244
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:113
int errcode(int sqlerrcode)
Definition: elog.c:570
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:68
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
#define ereport(elevel, rest)
Definition: elog.h:141
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1059
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int errmsg_internal(const char *fmt,...)
Definition: elog.c:814
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:214
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:891

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1128 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetOldestXmin(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), PROCARRAY_FLAGS_DEFAULT, PROCARRAY_SLOTS_XMIN, ProcArrayGetReplicationSlotXmin(), ReadNextFullTransactionId(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, TransactionIdPrecedes(), wal_receiver_status_interval, walrcv_send, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

1129 {
      /*
       * Build and send a hot standby feedback ('h') message carrying xmin,
       * xmin_epoch, catalog_xmin and catalog_xmin_epoch.
       *
       * immed: when true, the "at most once per
       *        wal_receiver_status_interval" rate limiting below is skipped.
       *
       * sendTime and master_has_standby_xmin are static, so they persist
       * across calls within this process.
       *
       * NOTE(review): listing lines 1146, 1159, 1181, 1192, 1223, 1225 and
       * 1230 are absent from this extract. The reference list names
       * wal_receiver_status_interval, hot_standby_feedback,
       * PROCARRAY_FLAGS_DEFAULT, PROCARRAY_SLOTS_XMIN, resetStringInfo(),
       * pq_sendint64() and walrcv_send, so those lines presumably hold the
       * leading part of the early-exit condition, the interval argument, the
       * hot_standby_feedback test, the GetOldestXmin() flags, and the
       * reset / timestamp / send steps of the message -- confirm against the
       * real source.
       */
1130  TimestampTz now;
1131  FullTransactionId nextFullXid;
1132  TransactionId nextXid;
1133  uint32 xmin_epoch,
1134  catalog_xmin_epoch;
1135  TransactionId xmin,
1136  catalog_xmin;
1137  static TimestampTz sendTime = 0;
1138 
1139  /* initially true so we always send at least one feedback message */
1140  static bool master_has_standby_xmin = true;
1141 
1142  /*
1143  * If the user doesn't want status to be reported to the master, be sure
1144  * to exit before doing anything at all.
1145  */
1147  !master_has_standby_xmin)
1148  return;
1149 
1150  /* Get current timestamp. */
1151  now = GetCurrentTimestamp();
1152 
1153  if (!immed)
1154  {
1155  /*
1156  * Send feedback at most once per wal_receiver_status_interval.
1157  */
1158  if (!TimestampDifferenceExceeds(sendTime, now,
1160  return;
1161  sendTime = now;
1162  }
1163 
1164  /*
1165  * If Hot Standby is not yet accepting connections there is nothing to
1166  * send. Check this after the interval has expired to reduce number of
1167  * calls.
1168  *
1169  * Bailing out here also ensures that we don't send feedback until we've
1170  * read our own replication slot state, so we don't tell the master to
1171  * discard needed xmin or catalog_xmin from any slots that may exist on
1172  * this replica.
1173  */
1174  if (!HotStandbyActive())
1175  return;
1176 
1177  /*
1178  * Make the expensive call to get the oldest xmin once we are certain
1179  * everything else has been checked.
1180  */
1182  {
1183  TransactionId slot_xmin;
1184 
1185  /*
1186  * Usually GetOldestXmin() would include both global replication slot
1187  * xmin and catalog_xmin in its calculations, but we want to derive
1188  * separate values for each of those. So we ask for an xmin that
1189  * excludes the catalog_xmin.
1190  */
1191  xmin = GetOldestXmin(NULL,
1193 
1194  ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1195 
      /* report the older of our own xmin and the slots' xmin */
1196  if (TransactionIdIsValid(slot_xmin) &&
1197  TransactionIdPrecedes(slot_xmin, xmin))
1198  xmin = slot_xmin;
1199  }
1200  else
1201  {
1202  xmin = InvalidTransactionId;
1203  catalog_xmin = InvalidTransactionId;
1204  }
1205 
1206  /*
1207  * Get epoch and adjust if nextXid and oldestXmin are on different sides of
1208  * the epoch boundary.
1209  */
1210  nextFullXid = ReadNextFullTransactionId();
1211  nextXid = XidFromFullTransactionId(nextFullXid);
1212  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1213  catalog_xmin_epoch = xmin_epoch;
1214  if (nextXid < xmin)
1215  xmin_epoch--;
1216  if (nextXid < catalog_xmin)
1217  catalog_xmin_epoch--;
1218 
1219  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1220  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1221 
1222  /* Construct the message and send it. */
1224  pq_sendbyte(&reply_message, 'h');
1226  pq_sendint32(&reply_message, xmin);
1227  pq_sendint32(&reply_message, xmin_epoch);
1228  pq_sendint32(&reply_message, catalog_xmin);
1229  pq_sendint32(&reply_message, catalog_xmin_epoch);
      /* remember whether the message just built carried any valid xmin */
1231  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1232  master_has_standby_xmin = true;
1233  else
1234  master_has_standby_xmin = false;
1235 }
bool hot_standby_feedback
Definition: walreceiver.c:77
uint32 TransactionId
Definition: c.h:507
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
bool HotStandbyActive(void)
Definition: xlog.c:7954
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:75
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1668
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3003
static WalReceiverConn * wrconn
Definition: walreceiver.c:80
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static StringInfoData reply_message
Definition: walreceiver.c:112
#define PROCARRAY_SLOTS_XMIN
Definition: procarray.h:37
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:358
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
#define PROCARRAY_FLAGS_DEFAULT
Definition: procarray.h:50
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
TransactionId GetOldestXmin(Relation rel, int flags)
Definition: procarray.c:1304
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:275
#define elog(elevel,...)
Definition: elog.h:226
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1059 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, now(), pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

/*
 * XLogWalRcvSendReply: build a status message (type byte 'r') carrying the
 * current write, flush and apply WAL positions.
 *
 * force        - when true, neither the "status reporting disabled" check
 *                nor the "nothing changed since the last message" check can
 *                cause an early return.
 * requestReply - copied into the last byte of the message as 1 or 0.
 *
 * writePtr, flushPtr and sendTime are static, so they keep the values that
 * went into the previous message and are what the "nothing changed" test
 * compares against.
 *
 * NOTE(review): this listing skips original lines 1090, 1099, 1104 and 1114.
 * Presumably they hold the interval argument of TimestampDifferenceExceeds(),
 * the reset of reply_message, one more message field, and the call that
 * actually transmits the buffer -- confirm against the complete source
 * before relying on the comments below.
 */
1060 {
1061  static XLogRecPtr writePtr = 0;
1062  static XLogRecPtr flushPtr = 0;
1063  XLogRecPtr applyPtr;
1064  static TimestampTz sendTime = 0;
1065  TimestampTz now;
1066 
1067  /*
1068  * If the user doesn't want status to be reported to the master, be sure
1069  * to exit before doing anything at all.
1070  */
1071  if (!force && wal_receiver_status_interval <= 0)
1072  return;
1073 
1074  /* Get current timestamp. */
1075  now = GetCurrentTimestamp();
1076 
1077  /*
1078  * We can compare the write and flush positions to the last message we
1079  * sent without taking any lock, but the apply position requires a spin
1080  * lock, so we don't check that unless something else has changed or 10
1081  * seconds have passed. This means that the apply WAL location will
1082  * appear, from the master's point of view, to lag slightly, but since
1083  * this is only for reporting purposes and only on idle systems, that's
1084  * probably OK.
1085  */
1086  if (!force
1087  && writePtr == LogstreamResult.Write
1088  && flushPtr == LogstreamResult.Flush
1089  && !TimestampDifferenceExceeds(sendTime, now,
1091  return;
 /* Remember when this message was built; compared against on the next call. */
1092  sendTime = now;
1093 
1094  /* Construct a new message */
 /* Snapshot the positions; write/flush are also kept for the next call. */
1095  writePtr = LogstreamResult.Write;
1096  flushPtr = LogstreamResult.Flush;
1097  applyPtr = GetXLogReplayRecPtr(NULL);
1098 
 /* Fields are appended in order: type byte, write, flush, apply, ..., flag. */
1100  pq_sendbyte(&reply_message, 'r');
1101  pq_sendint64(&reply_message, writePtr);
1102  pq_sendint64(&reply_message, flushPtr);
1103  pq_sendint64(&reply_message, applyPtr);
1105  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1106 
1107  /* Send it */
 /* Each 64-bit position is logged as two 32-bit halves (high/low). */
1108  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1109  (uint32) (writePtr >> 32), (uint32) writePtr,
1110  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1111  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1112  requestReply ? " (reply requested)" : "");
1113 
1115 }
static struct @25 LogstreamResult
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:75
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1668
static WalReceiverConn * wrconn
Definition: walreceiver.c:80
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:112
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11130
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:358
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:275
#define elog(elevel,...)
Definition: elog.h:226
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 891 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvOff, recvSegNo, ThisTimeLineID, wal_segment_size, write, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogFileNameP(), XLogSegmentOffset, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

/*
 * XLogWalRcvWrite: write nbytes bytes from buf into the WAL segment file(s),
 * starting at WAL position recptr.
 *
 * The loop runs until nbytes reaches zero.  Each pass writes at most up to
 * the end of the segment that contains recptr; when recptr falls outside the
 * currently open segment (or no file is open) the old file is flushed and
 * closed and another one is opened.  After every successful write
 * LogstreamResult.Write is advanced to the new recptr.
 *
 * Failures of close(), lseek() and write() are reported at PANIC level.
 *
 * NOTE(review): this listing skips original lines 921, 923, 929-930, 938,
 * 941, 958, 960, 975 and 978.  Presumably they hold the error-code and
 * file-name arguments of the ereport() calls, the condition choosing between
 * XLogArchiveForceDone() and XLogArchiveNotify(), and the assignments of
 * recvSegNo / recvFileTLI -- confirm against the complete source.
 */
892 {
893  int startoff;
894  int byteswritten;
895 
896  while (nbytes > 0)
897  {
898  int segbytes;
899 
 /* Need a (different) segment file: none open, or recptr lies outside it. */
900  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
901  {
902  bool use_existent;
903 
904  /*
905  * fsync() and close current file before we switch to next one. We
906  * would otherwise have to reopen this file to fsync it later
907  */
908  if (recvFile >= 0)
909  {
910  char xlogfname[MAXFNAMELEN];
911 
912  XLogWalRcvFlush(false);
913 
914  /*
915  * XLOG segment files will be re-read by recovery in startup
916  * process soon, so we don't advise the OS to release cache
917  * pages associated with the file like XLogFileClose() does.
918  */
919  if (close(recvFile) != 0)
920  ereport(PANIC,
922  errmsg("could not close log segment %s: %m",
924 
925  /*
926  * Create .done file forcibly to prevent the streamed segment
927  * from being archived later.
928  */
931  XLogArchiveForceDone(xlogfname);
932  else
933  XLogArchiveNotify(xlogfname);
934  }
 /* Mark "no file open" until XLogFileInit() below hands back a descriptor. */
935  recvFile = -1;
936 
937  /* Create/use new log file */
939  use_existent = true;
940  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
 /* Tracked file offset restarts at the beginning for the new file. */
942  recvOff = 0;
943  }
944 
945  /* Calculate the start offset of the received logs */
946  startoff = XLogSegmentOffset(recptr, wal_segment_size);
947 
 /* Clamp this pass to the end of the segment; the rest waits for the next pass. */
948  if (startoff + nbytes > wal_segment_size)
949  segbytes = wal_segment_size - startoff;
950  else
951  segbytes = nbytes;
952 
953  /* Need to seek in the file? */
 /* recvOff is the offset we believe the descriptor is at; seek only on mismatch. */
954  if (recvOff != startoff)
955  {
956  if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
957  ereport(PANIC,
959  errmsg("could not seek in log segment %s to offset %u: %m",
961  startoff)));
962  recvOff = startoff;
963  }
964 
965  /* OK to write the logs */
 /* Clear errno first so a failing write() that leaves it untouched is detectable. */
966  errno = 0;
967 
968  byteswritten = write(recvFile, buf, segbytes);
 /* Zero or negative is fatal; a short positive write just advances and loops. */
969  if (byteswritten <= 0)
970  {
971  /* if write didn't set errno, assume no disk space */
972  if (errno == 0)
973  errno = ENOSPC;
974  ereport(PANIC,
976  errmsg("could not write to log segment %s "
977  "at offset %u, length %lu: %m",
979  recvOff, (unsigned long) segbytes)));
980  }
981 
982  /* Update state for write */
983  recptr += byteswritten;
984 
985  recvOff += byteswritten;
986  nbytes -= byteswritten;
987  buf += byteswritten;
988 
 /* Publish how far WAL has been written (not necessarily flushed) so far. */
989  LogstreamResult.Write = recptr;
990  }
991 }
static struct @25 LogstreamResult
int wal_segment_size
Definition: xlog.c:112
#define write(a, b, c)
Definition: win32.h:14
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3205
#define PANIC
Definition: elog.h:53
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:504
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10130
static char * buf
Definition: pg_test_fsync.c:68
int XLogArchiveMode
Definition: xlog.c:91
int errcode_for_file_access(void)
Definition: elog.c:593
#define ereport(elevel, rest)
Definition: elog.h:141
static TimeLineID recvFileTLI
Definition: walreceiver.c:91
static XLogSegNo recvSegNo
Definition: walreceiver.c:92
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:554
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
TimeLineID ThisTimeLineID
Definition: xlog.c:187
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1000
int errmsg(const char *fmt,...)
Definition: elog.c:784
static uint32 recvOff
Definition: walreceiver.c:93
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:90
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 109 of file walreceiver.c.

◆ got_SIGHUP

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 99 of file walreceiver.c.

Referenced by WalRcvSigHupHandler(), and WalReceiverMain().

◆ got_SIGTERM

volatile sig_atomic_t got_SIGTERM = false
static

Definition at line 100 of file walreceiver.c.

Referenced by ProcessWalRcvInterrupts(), and WalRcvShutdownHandler().

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 77 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 113 of file walreceiver.c.

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 90 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 91 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

◆ recvOff

uint32 recvOff = 0
static

Definition at line 93 of file walreceiver.c.

Referenced by XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 92 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

Definition at line 112 of file walreceiver.c.

Referenced by send_feedback().

◆ wal_receiver_status_interval

int wal_receiver_status_interval

Definition at line 75 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ wal_receiver_timeout

int wal_receiver_timeout

Definition at line 76 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 81 of file walreceiver.c.

Referenced by _PG_init().

◆ wrconn

WalReceiverConn* wrconn = NULL
static

Definition at line 80 of file walreceiver.c.

◆ Write

XLogRecPtr Write

Definition at line 108 of file walreceiver.c.