PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvSigHupHandler (SIGNAL_ARGS)
 
static void WalRcvShutdownHandler (SIGNAL_ARGS)
 
static void WalRcvQuickDieHandler (SIGNAL_ARGS)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConnwrconn = NULL
 
WalReceiverFunctionsTypeWalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
static uint32 recvOff = 0
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t got_SIGTERM = false
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 84 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1343 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MAXCONNINFO, MemSet, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, NI_MAXHOST, palloc0(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, and XLogRecPtrIsInvalid.

1344 {
1345  TupleDesc tupdesc;
1346  Datum *values;
1347  bool *nulls;
1348  int pid;
1349  bool ready_to_display;
1351  XLogRecPtr receive_start_lsn;
1352  TimeLineID receive_start_tli;
1353  XLogRecPtr received_lsn;
1354  TimeLineID received_tli;
1355  TimestampTz last_send_time;
1356  TimestampTz last_receipt_time;
1357  XLogRecPtr latest_end_lsn;
1358  TimestampTz latest_end_time;
1359  char sender_host[NI_MAXHOST];
1360  int sender_port = 0;
1361  char slotname[NAMEDATALEN];
1362  char conninfo[MAXCONNINFO];
1363 
1364  /* Take a lock to ensure value consistency */
1366  pid = (int) WalRcv->pid;
1367  ready_to_display = WalRcv->ready_to_display;
1368  state = WalRcv->walRcvState;
1369  receive_start_lsn = WalRcv->receiveStart;
1370  receive_start_tli = WalRcv->receiveStartTLI;
1371  received_lsn = WalRcv->receivedUpto;
1372  received_tli = WalRcv->receivedTLI;
1373  last_send_time = WalRcv->lastMsgSendTime;
1374  last_receipt_time = WalRcv->lastMsgReceiptTime;
1375  latest_end_lsn = WalRcv->latestWalEnd;
1376  latest_end_time = WalRcv->latestWalEndTime;
1377  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1378  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1379  sender_port = WalRcv->sender_port;
1380  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1382 
1383  /*
1384  * No WAL receiver (or not ready yet), just return a tuple with NULL
1385  * values
1386  */
1387  if (pid == 0 || !ready_to_display)
1388  PG_RETURN_NULL();
1389 
1390  /* determine result type */
1391  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1392  elog(ERROR, "return type must be a row type");
1393 
1394  values = palloc0(sizeof(Datum) * tupdesc->natts);
1395  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1396 
1397  /* Fetch values */
1398  values[0] = Int32GetDatum(pid);
1399 
1400  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1401  {
1402  /*
1403  * Only superusers and members of pg_read_all_stats can see details.
1404  * Other users only get the pid value to know whether it is a WAL
1405  * receiver, but no details.
1406  */
1407  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1408  }
1409  else
1410  {
1411  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1412 
1413  if (XLogRecPtrIsInvalid(receive_start_lsn))
1414  nulls[2] = true;
1415  else
1416  values[2] = LSNGetDatum(receive_start_lsn);
1417  values[3] = Int32GetDatum(receive_start_tli);
1418  if (XLogRecPtrIsInvalid(received_lsn))
1419  nulls[4] = true;
1420  else
1421  values[4] = LSNGetDatum(received_lsn);
1422  values[5] = Int32GetDatum(received_tli);
1423  if (last_send_time == 0)
1424  nulls[6] = true;
1425  else
1426  values[6] = TimestampTzGetDatum(last_send_time);
1427  if (last_receipt_time == 0)
1428  nulls[7] = true;
1429  else
1430  values[7] = TimestampTzGetDatum(last_receipt_time);
1431  if (XLogRecPtrIsInvalid(latest_end_lsn))
1432  nulls[8] = true;
1433  else
1434  values[8] = LSNGetDatum(latest_end_lsn);
1435  if (latest_end_time == 0)
1436  nulls[9] = true;
1437  else
1438  values[9] = TimestampTzGetDatum(latest_end_time);
1439  if (*slotname == '\0')
1440  nulls[10] = true;
1441  else
1442  values[10] = CStringGetTextDatum(slotname);
1443  if (*sender_host == '\0')
1444  nulls[11] = true;
1445  else
1446  values[11] = CStringGetTextDatum(sender_host);
1447  if (sender_port == 0)
1448  nulls[12] = true;
1449  else
1450  values[12] = Int32GetDatum(sender_port);
1451  if (*conninfo == '\0')
1452  nulls[13] = true;
1453  else
1454  values[13] = CStringGetTextDatum(conninfo);
1455  }
1456 
1457  /* Returns the record as Datum */
1458  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1459 }
int sender_port
Definition: walreceiver.h:116
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1318
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
uint32 TimeLineID
Definition: xlogdefs.h:52
slock_t mutex
Definition: walreceiver.h:136
Oid GetUserId(void)
Definition: miscinit.c:380
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:63
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:83
#define MemSet(start, val, len)
Definition: c.h:962
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
WalRcvState
Definition: walreceiver.h:43
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:43
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:980
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:343
bool ready_to_display
Definition: walreceiver.h:125
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4924
XLogRecPtr latestWalEnd
Definition: walreceiver.h:102
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define Int32GetDatum(X)
Definition: postgres.h:479
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:115
#define elog(elevel,...)
Definition: elog.h:228
#define CStringGetTextDatum(s)
Definition: builtins.h:83
XLogRecPtr receiveStart
Definition: walreceiver.h:72
#define PG_RETURN_NULL()
Definition: fmgr.h:335
char slotname[NAMEDATALEN]
Definition: walreceiver.h:122
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 147 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and got_SIGTERM.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), walrcv_clear_result(), WalRcvWaitForStartPosition(), and WalReceiverMain().

148 {
149  /*
150  * Although walreceiver interrupt handling doesn't use the same scheme as
151  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
152  * any incoming signals on Win32.
153  */
155 
156  if (got_SIGTERM)
157  {
158  ereport(FATAL,
159  (errcode(ERRCODE_ADMIN_SHUTDOWN),
160  errmsg("terminating walreceiver process due to administrator command")));
161  }
162 }
int errcode(int sqlerrcode)
Definition: elog.c:608
#define FATAL
Definition: elog.h:52
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:101

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1247 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, log_min_messages, WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1248 {
1249  WalRcvData *walrcv = WalRcv;
1250 
1251  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1252 
1253  /* Update shared-memory status */
1254  SpinLockAcquire(&walrcv->mutex);
1255  if (walrcv->latestWalEnd < walEnd)
1256  walrcv->latestWalEndTime = sendTime;
1257  walrcv->latestWalEnd = walEnd;
1258  walrcv->lastMsgSendTime = sendTime;
1259  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1260  SpinLockRelease(&walrcv->mutex);
1261 
1262  if (log_min_messages <= DEBUG2)
1263  {
1264  char *sendtime;
1265  char *receipttime;
1266  int applyDelay;
1267 
1268  /* Copy because timestamptz_to_str returns a static buffer */
1269  sendtime = pstrdup(timestamptz_to_str(sendTime));
1270  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1271  applyDelay = GetReplicationApplyDelay();
1272 
1273  /* apply delay is not available */
1274  if (applyDelay == -1)
1275  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1276  sendtime,
1277  receipttime,
1279  else
1280  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1281  sendtime,
1282  receipttime,
1283  applyDelay,
1285 
1286  pfree(sendtime);
1287  pfree(receipttime);
1288  }
1289 }
slock_t mutex
Definition: walreceiver.h:136
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1186
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1056
#define DEBUG2
Definition: elog.h:24
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
int log_min_messages
Definition: guc.c:513
XLogRecPtr latestWalEnd
Definition: walreceiver.h:102
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:228
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1743

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 732 of file walreceiver.c.

References Assert, WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

733 {
734  WalRcvData *walrcv = WalRcv;
735 
736  /* Ensure that all WAL records received are flushed to disk */
737  XLogWalRcvFlush(true);
738 
739  /* Mark ourselves inactive in shared memory */
740  SpinLockAcquire(&walrcv->mutex);
741  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
742  walrcv->walRcvState == WALRCV_RESTARTING ||
743  walrcv->walRcvState == WALRCV_STARTING ||
744  walrcv->walRcvState == WALRCV_WAITING ||
745  walrcv->walRcvState == WALRCV_STOPPING);
746  Assert(walrcv->pid == MyProcPid);
747  walrcv->walRcvState = WALRCV_STOPPED;
748  walrcv->pid = 0;
749  walrcv->ready_to_display = false;
750  walrcv->latch = NULL;
751  SpinLockRelease(&walrcv->mutex);
752 
753  /* Terminate the connection gracefully. */
754  if (wrconn != NULL)
756 
757  /* Wake up the startup process to notice promptly that we're gone */
758  WakeupRecovery();
759 }
int MyProcPid
Definition: globals.c:40
slock_t mutex
Definition: walreceiver.h:136
WalRcvState walRcvState
Definition: walreceiver.h:63
static WalReceiverConn * wrconn
Definition: walreceiver.c:81
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12279
pid_t pid
Definition: walreceiver.h:62
Latch * latch
Definition: walreceiver.h:134
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:125
#define Assert(condition)
Definition: c.h:739
#define walrcv_disconnect(conn)
Definition: walreceiver.h:281
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1003
WalRcvData * WalRcv

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 685 of file walreceiver.c.

References ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, and writeTimeLineHistoryFile().

Referenced by WalReceiverMain().

686 {
687  TimeLineID tli;
688 
689  for (tli = first; tli <= last; tli++)
690  {
691  /* there's no history file for timeline 1 */
692  if (tli != 1 && !existsTimeLineHistory(tli))
693  {
694  char *fname;
695  char *content;
696  int len;
697  char expectedfname[MAXFNAMELEN];
698 
699  ereport(LOG,
700  (errmsg("fetching timeline history file for timeline %u from primary server",
701  tli)));
702 
703  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
704 
705  /*
706  * Check that the filename on the master matches what we
707  * calculated ourselves. This is just a sanity check, it should
708  * always match.
709  */
710  TLHistoryFileName(expectedfname, tli);
711  if (strcmp(fname, expectedfname) != 0)
712  ereport(ERROR,
713  (errcode(ERRCODE_PROTOCOL_VIOLATION),
714  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
715  tli)));
716 
717  /*
718  * Write the file to pg_wal.
719  */
720  writeTimeLineHistoryFile(tli, content, len);
721 
722  pfree(fname);
723  pfree(content);
724  }
725  }
726 }
uint32 TimeLineID
Definition: xlogdefs.h:52
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:81
void pfree(void *pointer)
Definition: mcxt.c:1056
#define ERROR
Definition: elog.h:43
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:450
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:207
#define ereport(elevel, rest)
Definition: elog.h:141
#define MAXFNAMELEN
int errmsg_internal(const char *fmt,...)
Definition: elog.c:909
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:267
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1300 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

1301 {
1302  Latch *latch;
1303 
1304  WalRcv->force_reply = true;
1305  /* fetching the latch pointer might not be atomic, so use spinlock */
1307  latch = WalRcv->latch;
1309  if (latch)
1310  SetLatch(latch);
1311 }
slock_t mutex
Definition: walreceiver.h:136
sig_atomic_t force_reply
Definition: walreceiver.h:143
void SetLatch(Latch *latch)
Definition: latch.c:436
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: latch.h:110
Latch * latch
Definition: walreceiver.h:134
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1318 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1319 {
1320  switch (state)
1321  {
1322  case WALRCV_STOPPED:
1323  return "stopped";
1324  case WALRCV_STARTING:
1325  return "starting";
1326  case WALRCV_STREAMING:
1327  return "streaming";
1328  case WALRCV_WAITING:
1329  return "waiting";
1330  case WALRCV_RESTARTING:
1331  return "restarting";
1332  case WALRCV_STOPPING:
1333  return "stopping";
1334  }
1335  return "UNKNOWN";
1336 }
Definition: regguts.h:298

◆ WalRcvQuickDieHandler()

static void WalRcvQuickDieHandler ( SIGNAL_ARGS  )
static

Definition at line 790 of file walreceiver.c.

Referenced by WalReceiverMain().

791 {
792  /*
793  * We DO NOT want to run proc_exit() or atexit() callbacks -- we're here
794  * because shared memory may be corrupted, so we don't want to try to
795  * clean up our transaction. Just nail the windows shut and get out of
796  * town. The callbacks wouldn't be safe to run from a signal handler,
797  * anyway.
798  *
799  * Note we use _exit(2) not _exit(0). This is to force the postmaster
800  * into a system reset cycle if someone sends a manual SIGQUIT to a random
801  * backend. This is necessary precisely because we don't clean up our
802  * shared memory state. (The "dead man switch" mechanism in pmsignal.c
803  * should ensure the postmaster sees this as a crash, too, but no harm in
804  * being doubly sure.)
805  */
806  _exit(2);
807 }

◆ WalRcvShutdownHandler()

static void WalRcvShutdownHandler ( SIGNAL_ARGS  )
static

Definition at line 771 of file walreceiver.c.

References got_SIGTERM, WalRcvData::latch, SetLatch(), and WalRcv.

Referenced by WalReceiverMain().

772 {
773  int save_errno = errno;
774 
775  got_SIGTERM = true;
776 
777  if (WalRcv->latch)
779 
780  errno = save_errno;
781 }
void SetLatch(Latch *latch)
Definition: latch.c:436
Latch * latch
Definition: walreceiver.h:134
WalRcvData * WalRcv
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:101

◆ WalRcvSigHupHandler()

static void WalRcvSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 763 of file walreceiver.c.

References got_SIGHUP.

Referenced by WalReceiverMain().

764 {
765  got_SIGHUP = true;
766 }
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:100

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 607 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, WalRcvData::latch, WalRcvData::mutex, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

608 {
609  WalRcvData *walrcv = WalRcv;
610  int state;
611 
612  SpinLockAcquire(&walrcv->mutex);
613  state = walrcv->walRcvState;
614  if (state != WALRCV_STREAMING)
615  {
616  SpinLockRelease(&walrcv->mutex);
617  if (state == WALRCV_STOPPING)
618  proc_exit(0);
619  else
620  elog(FATAL, "unexpected walreceiver state");
621  }
622  walrcv->walRcvState = WALRCV_WAITING;
624  walrcv->receiveStartTLI = 0;
625  SpinLockRelease(&walrcv->mutex);
626 
628  set_ps_display("idle", false);
629 
630  /*
631  * nudge startup process to notice that we've stopped streaming and are
632  * now waiting for instructions.
633  */
634  WakeupRecovery();
635  for (;;)
636  {
637  ResetLatch(walrcv->latch);
638 
640 
641  SpinLockAcquire(&walrcv->mutex);
642  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
643  walrcv->walRcvState == WALRCV_WAITING ||
644  walrcv->walRcvState == WALRCV_STOPPING);
645  if (walrcv->walRcvState == WALRCV_RESTARTING)
646  {
647  /* we don't expect primary_conninfo to change */
648  *startpoint = walrcv->receiveStart;
649  *startpointTLI = walrcv->receiveStartTLI;
650  walrcv->walRcvState = WALRCV_STREAMING;
651  SpinLockRelease(&walrcv->mutex);
652  break;
653  }
654  if (walrcv->walRcvState == WALRCV_STOPPING)
655  {
656  /*
657  * We should've received SIGTERM if the startup process wants us
658  * to die, but might as well check it here too.
659  */
660  SpinLockRelease(&walrcv->mutex);
661  exit(1);
662  }
663  SpinLockRelease(&walrcv->mutex);
664 
665  (void) WaitLatch(walrcv->latch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
667  }
668 
670  {
671  char activitymsg[50];
672 
673  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
674  (uint32) (*startpoint >> 32),
675  (uint32) *startpoint);
676  set_ps_display(activitymsg, false);
677  }
678 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:136
bool update_process_title
Definition: ps_status.c:35
WalRcvState walRcvState
Definition: walreceiver.h:63
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:147
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:331
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:519
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:344
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12279
#define FATAL
Definition: elog.h:52
unsigned int uint32
Definition: c.h:359
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
Latch * latch
Definition: walreceiver.h:134
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:739
Definition: regguts.h:298
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:228
XLogRecPtr receiveStart
Definition: walreceiver.h:72
#define snprintf
Definition: port.h:192
#define WL_LATCH_SET
Definition: latch.h:124
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 167 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, BlockSig, buf, close, cluster_name, WalRcvData::conninfo, DEBUG1, elog, ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, NI_MAXHOST, now(), on_shmem_exit(), options, PANIC, pfree(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, procsignal_sigusr1_handler(), WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SIGPIPE, SIGQUIT, SIGUSR1, SIGUSR2, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf, SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, wal_segment_size, WalRcv, walrcv_connect, walrcv_endstreaming, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvQuickDieHandler(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain(), and walrcv_clear_result().

168 {
169  char conninfo[MAXCONNINFO];
170  char *tmp_conninfo;
171  char slotname[NAMEDATALEN];
172  XLogRecPtr startpoint;
173  TimeLineID startpointTLI;
174  TimeLineID primaryTLI;
175  bool first_stream;
176  WalRcvData *walrcv = WalRcv;
177  TimestampTz last_recv_timestamp;
179  bool ping_sent;
180  char *err;
181  char *sender_host = NULL;
182  int sender_port = 0;
183 
184  /*
185  * WalRcv should be set up already (if we are a backend, we inherit this
186  * by fork() or EXEC_BACKEND mechanism from the postmaster).
187  */
188  Assert(walrcv != NULL);
189 
190  now = GetCurrentTimestamp();
191 
192  /*
193  * Mark walreceiver as running in shared memory.
194  *
195  * Do this as early as possible, so that if we fail later on, we'll set
196  * state to STOPPED. If we die before this, the startup process will keep
197  * waiting for us to start up, until it times out.
198  */
199  SpinLockAcquire(&walrcv->mutex);
200  Assert(walrcv->pid == 0);
201  switch (walrcv->walRcvState)
202  {
203  case WALRCV_STOPPING:
204  /* If we've already been requested to stop, don't start up. */
205  walrcv->walRcvState = WALRCV_STOPPED;
206  /* fall through */
207 
208  case WALRCV_STOPPED:
209  SpinLockRelease(&walrcv->mutex);
210  proc_exit(1);
211  break;
212 
213  case WALRCV_STARTING:
214  /* The usual case */
215  break;
216 
217  case WALRCV_WAITING:
218  case WALRCV_STREAMING:
219  case WALRCV_RESTARTING:
220  default:
221  /* Shouldn't happen */
222  SpinLockRelease(&walrcv->mutex);
223  elog(PANIC, "walreceiver still running according to shared memory state");
224  }
225  /* Advertise our PID so that the startup process can kill us */
226  walrcv->pid = MyProcPid;
227  walrcv->walRcvState = WALRCV_STREAMING;
228 
229  /* Fetch information required to start streaming */
230  walrcv->ready_to_display = false;
231  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
232  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
233  startpoint = walrcv->receiveStart;
234  startpointTLI = walrcv->receiveStartTLI;
235 
236  /* Initialise to a sanish value */
237  walrcv->lastMsgSendTime =
238  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
239 
240  /* Report the latch to use to awaken this process */
241  walrcv->latch = &MyProc->procLatch;
242 
243  SpinLockRelease(&walrcv->mutex);
244 
245  /* Arrange to clean up at walreceiver exit */
247 
248  /* Properly accept or ignore signals the postmaster might send us */
249  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
250  pqsignal(SIGINT, SIG_IGN);
251  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
252  pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
257 
258  /* Reset some signals that are accepted by postmaster but not here */
260 
261  /* We allow SIGQUIT (quickdie) at all times */
262  sigdelset(&BlockSig, SIGQUIT);
263 
264  /* Load the libpq-specific functions */
265  load_file("libpqwalreceiver", false);
266  if (WalReceiverFunctions == NULL)
267  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
268 
269  /* Unblock signals (they were blocked when the postmaster forked us) */
271 
272  /* Establish the connection to the primary for XLOG streaming */
273  wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
274  if (!wrconn)
275  ereport(ERROR,
276  (errmsg("could not connect to the primary server: %s", err)));
277 
278  /*
279  * Save user-visible connection string. This clobbers the original
280  * conninfo, for security. Also save host and port of the sender server
281  * this walreceiver is connected to.
282  */
283  tmp_conninfo = walrcv_get_conninfo(wrconn);
284  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
285  SpinLockAcquire(&walrcv->mutex);
286  memset(walrcv->conninfo, 0, MAXCONNINFO);
287  if (tmp_conninfo)
288  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
289 
290  memset(walrcv->sender_host, 0, NI_MAXHOST);
291  if (sender_host)
292  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
293 
294  walrcv->sender_port = sender_port;
295  walrcv->ready_to_display = true;
296  SpinLockRelease(&walrcv->mutex);
297 
298  if (tmp_conninfo)
299  pfree(tmp_conninfo);
300 
301  if (sender_host)
302  pfree(sender_host);
303 
304  first_stream = true;
305  for (;;)
306  {
307  char *primary_sysid;
308  char standby_sysid[32];
310 
311  /*
312  * Check that we're connected to a valid server using the
313  * IDENTIFY_SYSTEM replication command.
314  */
315  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
316 
317  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
319  if (strcmp(primary_sysid, standby_sysid) != 0)
320  {
321  ereport(ERROR,
322  (errmsg("database system identifier differs between the primary and standby"),
323  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
324  primary_sysid, standby_sysid)));
325  }
326 
327  /*
328  * Confirm that the current timeline of the primary is the same or
329  * ahead of ours.
330  */
331  if (primaryTLI < startpointTLI)
332  ereport(ERROR,
333  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
334  primaryTLI, startpointTLI)));
335 
336  /*
337  * Get any missing history files. We do this always, even when we're
338  * not interested in that timeline, so that if we're promoted to
339  * become the master later on, we don't select the same timeline that
340  * was already used in the current master. This isn't bullet-proof -
341  * you'll need some external software to manage your cluster if you
342  * need to ensure that a unique timeline id is chosen in every case,
343  * but let's avoid the confusion of timeline id collisions where we
344  * can.
345  */
346  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
347 
348  /*
349  * Start streaming.
350  *
351  * We'll try to start at the requested starting point and timeline,
352  * even if it's different from the server's latest timeline. In case
353  * we've already reached the end of the old timeline, the server will
354  * finish the streaming immediately, and we will go back to await
355  * orders from the startup process. If recovery_target_timeline is
356  * 'latest', the startup process will scan pg_wal and find the new
357  * history file, bump recovery target timeline, and ask us to restart
358  * on the new timeline.
359  */
360  options.logical = false;
361  options.startpoint = startpoint;
362  options.slotname = slotname[0] != '\0' ? slotname : NULL;
363  options.proto.physical.startpointTLI = startpointTLI;
364  ThisTimeLineID = startpointTLI;
365  if (walrcv_startstreaming(wrconn, &options))
366  {
367  if (first_stream)
368  ereport(LOG,
369  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
370  (uint32) (startpoint >> 32), (uint32) startpoint,
371  startpointTLI)));
372  else
373  ereport(LOG,
374  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
375  (uint32) (startpoint >> 32), (uint32) startpoint,
376  startpointTLI)));
377  first_stream = false;
378 
379  /* Initialize LogstreamResult and buffers for processing messages */
383 
384  /* Initialize the last recv timestamp */
385  last_recv_timestamp = GetCurrentTimestamp();
386  ping_sent = false;
387 
388  /* Loop until end-of-streaming or error */
389  for (;;)
390  {
391  char *buf;
392  int len;
393  bool endofwal = false;
394  pgsocket wait_fd = PGINVALID_SOCKET;
395  int rc;
396 
397  /*
398  * Exit walreceiver if we're not in recovery. This should not
399  * happen, but cross-check the status here.
400  */
401  if (!RecoveryInProgress())
402  ereport(FATAL,
403  (errmsg("cannot continue WAL streaming, recovery has already ended")));
404 
405  /* Process any requests or signals received recently */
407 
408  if (got_SIGHUP)
409  {
410  got_SIGHUP = false;
413  }
414 
415  /* See if we can read data immediately */
416  len = walrcv_receive(wrconn, &buf, &wait_fd);
417  if (len != 0)
418  {
419  /*
420  * Process the received data, and any subsequent data we
421  * can read without blocking.
422  */
423  for (;;)
424  {
425  if (len > 0)
426  {
427  /*
428  * Something was received from master, so reset
429  * timeout
430  */
431  last_recv_timestamp = GetCurrentTimestamp();
432  ping_sent = false;
433  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
434  }
435  else if (len == 0)
436  break;
437  else if (len < 0)
438  {
439  ereport(LOG,
440  (errmsg("replication terminated by primary server"),
441  errdetail("End of WAL reached on timeline %u at %X/%X.",
442  startpointTLI,
443  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
444  endofwal = true;
445  break;
446  }
447  len = walrcv_receive(wrconn, &buf, &wait_fd);
448  }
449 
450  /* Let the master know that we received some data. */
451  XLogWalRcvSendReply(false, false);
452 
453  /*
454  * If we've written some records, flush them to disk and
455  * let the startup process and primary server know about
456  * them.
457  */
458  XLogWalRcvFlush(false);
459  }
460 
461  /* Check if we need to exit the streaming loop. */
462  if (endofwal)
463  break;
464 
465  /*
466  * Ideally we would reuse a WaitEventSet object repeatedly
467  * here to avoid the overheads of WaitLatchOrSocket on epoll
468  * systems, but we can't be sure that libpq (or any other
469  * walreceiver implementation) has the same socket (even if
470  * the fd is the same number, it may have been closed and
471  * reopened since the last time). In future, if there is a
472  * function for removing sockets from WaitEventSet, then we
473  * could add and remove just the socket each time, potentially
474  * avoiding some system calls.
475  */
476  Assert(wait_fd != PGINVALID_SOCKET);
477  rc = WaitLatchOrSocket(walrcv->latch,
480  wait_fd,
483  if (rc & WL_LATCH_SET)
484  {
485  ResetLatch(walrcv->latch);
487 
488  if (walrcv->force_reply)
489  {
490  /*
491  * The recovery process has asked us to send apply
492  * feedback now. Make sure the flag is really set to
493  * false in shared memory before sending the reply, so
494  * we don't miss a new request for a reply.
495  */
496  walrcv->force_reply = false;
498  XLogWalRcvSendReply(true, false);
499  }
500  }
501  if (rc & WL_TIMEOUT)
502  {
503  /*
504  * We didn't receive anything new. If we haven't heard
505  * anything from the server for more than
506  * wal_receiver_timeout / 2, ping the server. Also, if
507  * it's been longer than wal_receiver_status_interval
508  * since the last update we sent, send a status update to
509  * the master anyway, to report any progress in applying
510  * WAL.
511  */
512  bool requestReply = false;
513 
514  /*
515  * Check if time since last receive from standby has
516  * reached the configured limit.
517  */
518  if (wal_receiver_timeout > 0)
519  {
521  TimestampTz timeout;
522 
523  timeout =
524  TimestampTzPlusMilliseconds(last_recv_timestamp,
526 
527  if (now >= timeout)
528  ereport(ERROR,
529  (errmsg("terminating walreceiver due to timeout")));
530 
531  /*
532  * We didn't receive anything new, for half of
533  * receiver replication timeout. Ping the server.
534  */
535  if (!ping_sent)
536  {
537  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
538  (wal_receiver_timeout / 2));
539  if (now >= timeout)
540  {
541  requestReply = true;
542  ping_sent = true;
543  }
544  }
545  }
546 
547  XLogWalRcvSendReply(requestReply, requestReply);
549  }
550  }
551 
552  /*
553  * The backend finished streaming. Exit streaming COPY-mode from
554  * our side, too.
555  */
556  walrcv_endstreaming(wrconn, &primaryTLI);
557 
558  /*
559  * If the server had switched to a new timeline that we didn't
560  * know about when we began streaming, fetch its timeline history
561  * file now.
562  */
563  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
564  }
565  else
566  ereport(LOG,
567  (errmsg("primary server contains no more WAL on requested timeline %u",
568  startpointTLI)));
569 
570  /*
571  * End of WAL reached on the requested timeline. Close the last
572  * segment, and await for new orders from the startup process.
573  */
574  if (recvFile >= 0)
575  {
576  char xlogfname[MAXFNAMELEN];
577 
578  XLogWalRcvFlush(false);
580  if (close(recvFile) != 0)
581  ereport(PANIC,
583  errmsg("could not close log segment %s: %m",
584  xlogfname)));
585 
586  /*
587  * Create .done file forcibly to prevent the streamed segment from
588  * being archived later.
589  */
591  XLogArchiveForceDone(xlogfname);
592  else
593  XLogArchiveNotify(xlogfname);
594  }
595  recvFile = -1;
596 
597  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
598  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
599  }
600  /* not reached */
601 }
static struct @25 LogstreamResult
int sender_port
Definition: walreceiver.h:116
#define SIGQUIT
Definition: win32_port.h:155
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:685
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:259
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:271
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
uint32 TimeLineID
Definition: xlogdefs.h:52
slock_t mutex
Definition: walreceiver.h:136
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:112
#define SIGUSR1
Definition: win32_port.h:166
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
#define SIGCHLD
Definition: win32_port.h:164
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:607
sig_atomic_t force_reply
Definition: walreceiver.h:143
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:263
WalRcvState walRcvState
Definition: walreceiver.h:63
static StringInfoData incoming_message
Definition: walreceiver.c:114
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1131
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:273
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:147
union WalRcvStreamOptions::@106 proto
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:763
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:269
void proc_exit(int code)
Definition: ipc.c:104
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:813
#define SIGPIPE
Definition: win32_port.h:159
#define SIGUSR2
Definition: win32_port.h:167
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7935
#define PANIC
Definition: elog.h:53
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:84
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
void ResetLatch(Latch *latch)
Definition: latch.c:519
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:77
Latch procLatch
Definition: proc.h:104
static WalReceiverConn * wrconn
Definition: walreceiver.c:81
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:113
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:511
void pfree(void *pointer)
Definition: mcxt.c:1056
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:43
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11172
pid_t pid
Definition: walreceiver.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
static char * buf
Definition: pg_test_fsync.c:67
int XLogArchiveMode
Definition: xlog.c:91
int errdetail(const char *fmt,...)
Definition: elog.c:955
int errcode_for_file_access(void)
Definition: elog.c:631
#define SIGHUP
Definition: win32_port.h:154
XLogRecPtr startpoint
Definition: walreceiver.h:153
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
unsigned int uint32
Definition: c.h:359
int pgsocket
Definition: port.h:31
sigset_t UnBlockSig
Definition: pqsignal.c:22
struct WalRcvStreamOptions::@106::@107 physical
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:100
#define ereport(elevel, rest)
Definition: elog.h:141
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
static TimeLineID recvFileTLI
Definition: walreceiver.c:92
Definition: guc.h:72
char * cluster_name
Definition: guc.c:528
Latch * latch
Definition: walreceiver.h:134
#define SIG_IGN
Definition: win32_port.h:151
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static XLogSegNo recvSegNo
Definition: walreceiver.c:93
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
sigset_t BlockSig
Definition: pqsignal.c:22
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:561
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:82
#define PGINVALID_SOCKET
Definition: port.h:33
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1062
bool ready_to_display
Definition: walreceiver.h:125
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:771
TimeLineID ThisTimeLineID
Definition: xlog.c:187
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:145
#define SIG_DFL
Definition: win32_port.h:149
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define SIGALRM
Definition: win32_port.h:160
static void WalRcvQuickDieHandler(SIGNAL_ARGS)
Definition: walreceiver.c:790
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1003
WalRcvData * WalRcv
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4799
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:115
int errmsg(const char *fmt,...)
Definition: elog.c:822
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:732
#define elog(elevel,...)
Definition: elog.h:228
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:261
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:72
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:260
static int recvFile
Definition: walreceiver.c:91
#define snprintf
Definition: port.h:192
#define WL_LATCH_SET
Definition: latch.h:124
#define UINT64_FORMAT
Definition: c.h:402
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
char slotname[NAMEDATALEN]
Definition: walreceiver.h:122
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:255

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 1003 of file walreceiver.c.

References AllowCascadeReplication, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

1004 {
1005  if (LogstreamResult.Flush < LogstreamResult.Write)
1006  {
1007  WalRcvData *walrcv = WalRcv;
1008 
1010 
1011  LogstreamResult.Flush = LogstreamResult.Write;
1012 
1013  /* Update shared-memory status */
1014  SpinLockAcquire(&walrcv->mutex);
1015  if (walrcv->receivedUpto < LogstreamResult.Flush)
1016  {
1017  walrcv->latestChunkStart = walrcv->receivedUpto;
1018  walrcv->receivedUpto = LogstreamResult.Flush;
1019  walrcv->receivedTLI = ThisTimeLineID;
1020  }
1021  SpinLockRelease(&walrcv->mutex);
1022 
1023  /* Signal the startup process and walsender that new WAL has arrived */
1024  WakeupRecovery();
1026  WalSndWakeup();
1027 
1028  /* Report XLOG streaming progress in PS display */
1030  {
1031  char activitymsg[50];
1032 
1033  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1034  (uint32) (LogstreamResult.Write >> 32),
1035  (uint32) LogstreamResult.Write);
1036  set_ps_display(activitymsg, false);
1037  }
1038 
1039  /* Also let the master know that we made some progress */
1040  if (!dying)
1041  {
1042  XLogWalRcvSendReply(false, false);
1043  XLogWalRcvSendHSFeedback(false);
1044  }
1045  }
1046 }
static struct @25 LogstreamResult
slock_t mutex
Definition: walreceiver.h:136
bool update_process_title
Definition: ps_status.c:35
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10131
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1131
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:331
TimeLineID receivedTLI
Definition: walreceiver.h:83
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12279
XLogRecPtr latestChunkStart
Definition: walreceiver.h:91
#define AllowCascadeReplication()
Definition: walreceiver.h:38
unsigned int uint32
Definition: c.h:359
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
static XLogSegNo recvSegNo
Definition: walreceiver.c:93
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1062
TimeLineID ThisTimeLineID
Definition: xlog.c:187
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:91
#define snprintf
Definition: port.h:192
void WalSndWakeup(void)
Definition: walsender.c:3033

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 813 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

814 {
815  int hdrlen;
816  XLogRecPtr dataStart;
817  XLogRecPtr walEnd;
818  TimestampTz sendTime;
819  bool replyRequested;
820 
822 
823  switch (type)
824  {
825  case 'w': /* WAL records */
826  {
827  /* copy message to StringInfo */
828  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
829  if (len < hdrlen)
830  ereport(ERROR,
831  (errcode(ERRCODE_PROTOCOL_VIOLATION),
832  errmsg_internal("invalid WAL message received from primary")));
834 
835  /* read the fields */
836  dataStart = pq_getmsgint64(&incoming_message);
837  walEnd = pq_getmsgint64(&incoming_message);
838  sendTime = pq_getmsgint64(&incoming_message);
839  ProcessWalSndrMessage(walEnd, sendTime);
840 
841  buf += hdrlen;
842  len -= hdrlen;
843  XLogWalRcvWrite(buf, len, dataStart);
844  break;
845  }
846  case 'k': /* Keepalive */
847  {
848  /* copy message to StringInfo */
849  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
850  if (len != hdrlen)
851  ereport(ERROR,
852  (errcode(ERRCODE_PROTOCOL_VIOLATION),
853  errmsg_internal("invalid keepalive message received from primary")));
855 
856  /* read the fields */
857  walEnd = pq_getmsgint64(&incoming_message);
858  sendTime = pq_getmsgint64(&incoming_message);
859  replyRequested = pq_getmsgbyte(&incoming_message);
860 
861  ProcessWalSndrMessage(walEnd, sendTime);
862 
863  /* If the primary requested a reply, send one immediately */
864  if (replyRequested)
865  XLogWalRcvSendReply(true, false);
866  break;
867  }
868  default:
869  ereport(ERROR,
870  (errcode(ERRCODE_PROTOCOL_VIOLATION),
871  errmsg_internal("invalid replication message type %d",
872  type)));
873  }
874 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1247
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:114
int errcode(int sqlerrcode)
Definition: elog.c:608
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:67
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
#define ereport(elevel, rest)
Definition: elog.h:141
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1062
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int errmsg_internal(const char *fmt,...)
Definition: elog.c:909
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:880

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1131 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetOldestXmin(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), PROCARRAY_FLAGS_DEFAULT, PROCARRAY_SLOTS_XMIN, ProcArrayGetReplicationSlotXmin(), ReadNextFullTransactionId(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, TransactionIdPrecedes(), wal_receiver_status_interval, walrcv_send, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

1132 {
1133  TimestampTz now;
1134  FullTransactionId nextFullXid;
1135  TransactionId nextXid;
1136  uint32 xmin_epoch,
1137  catalog_xmin_epoch;
1138  TransactionId xmin,
1139  catalog_xmin;
1140  static TimestampTz sendTime = 0;
1141 
1142  /* initially true so we always send at least one feedback message */
1143  static bool master_has_standby_xmin = true;
1144 
1145  /*
1146  * If the user doesn't want status to be reported to the master, be sure
1147  * to exit before doing anything at all.
1148  */
1150  !master_has_standby_xmin)
1151  return;
1152 
1153  /* Get current timestamp. */
1154  now = GetCurrentTimestamp();
1155 
1156  if (!immed)
1157  {
1158  /*
1159  * Send feedback at most once per wal_receiver_status_interval.
1160  */
1161  if (!TimestampDifferenceExceeds(sendTime, now,
1163  return;
1164  sendTime = now;
1165  }
1166 
1167  /*
1168  * If Hot Standby is not yet accepting connections there is nothing to
1169  * send. Check this after the interval has expired to reduce number of
1170  * calls.
1171  *
1172  * Bailing out here also ensures that we don't send feedback until we've
1173  * read our own replication slot state, so we don't tell the master to
1174  * discard needed xmin or catalog_xmin from any slots that may exist on
1175  * this replica.
1176  */
1177  if (!HotStandbyActive())
1178  return;
1179 
1180  /*
1181  * Make the expensive call to get the oldest xmin once we are certain
1182  * everything else has been checked.
1183  */
1185  {
1186  TransactionId slot_xmin;
1187 
1188  /*
1189  * Usually GetOldestXmin() would include both global replication slot
1190  * xmin and catalog_xmin in its calculations, but we want to derive
1191  * separate values for each of those. So we ask for an xmin that
1192  * excludes the catalog_xmin.
1193  */
1194  xmin = GetOldestXmin(NULL,
1196 
1197  ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1198 
1199  if (TransactionIdIsValid(slot_xmin) &&
1200  TransactionIdPrecedes(slot_xmin, xmin))
1201  xmin = slot_xmin;
1202  }
1203  else
1204  {
1205  xmin = InvalidTransactionId;
1206  catalog_xmin = InvalidTransactionId;
1207  }
1208 
1209  /*
1210  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1211  * the epoch boundary.
1212  */
1213  nextFullXid = ReadNextFullTransactionId();
1214  nextXid = XidFromFullTransactionId(nextFullXid);
1215  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1216  catalog_xmin_epoch = xmin_epoch;
1217  if (nextXid < xmin)
1218  xmin_epoch--;
1219  if (nextXid < catalog_xmin)
1220  catalog_xmin_epoch--;
1221 
1222  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1223  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1224 
1225  /* Construct the message and send it. */
1227  pq_sendbyte(&reply_message, 'h');
1229  pq_sendint32(&reply_message, xmin);
1230  pq_sendint32(&reply_message, xmin_epoch);
1231  pq_sendint32(&reply_message, catalog_xmin);
1232  pq_sendint32(&reply_message, catalog_xmin_epoch);
1234  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1235  master_has_standby_xmin = true;
1236  else
1237  master_has_standby_xmin = false;
1238 }
bool hot_standby_feedback
Definition: walreceiver.c:78
uint32 TransactionId
Definition: c.h:514
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
bool HotStandbyActive(void)
Definition: xlog.c:7991
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:76
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3117
static WalReceiverConn * wrconn
Definition: walreceiver.c:81
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static StringInfoData reply_message
Definition: walreceiver.c:113
#define PROCARRAY_SLOTS_XMIN
Definition: procarray.h:37
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:359
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
#define PROCARRAY_FLAGS_DEFAULT
Definition: procarray.h:50
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
TransactionId GetOldestXmin(Relation rel, int flags)
Definition: procarray.c:1306
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:275
#define elog(elevel,...)
Definition: elog.h:228
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1062 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, now(), pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1063 {
1064  static XLogRecPtr writePtr = 0;
1065  static XLogRecPtr flushPtr = 0;
1066  XLogRecPtr applyPtr;
1067  static TimestampTz sendTime = 0;
1068  TimestampTz now;
1069 
1070  /*
1071  * If the user doesn't want status to be reported to the master, be sure
1072  * to exit before doing anything at all.
1073  */
1074  if (!force && wal_receiver_status_interval <= 0)
1075  return;
1076 
1077  /* Get current timestamp. */
1078  now = GetCurrentTimestamp();
1079 
1080  /*
1081  * We can compare the write and flush positions to the last message we
1082  * sent without taking any lock, but the apply position requires a spin
1083  * lock, so we don't check that unless something else has changed or 10
1084  * seconds have passed. This means that the apply WAL location will
1085  * appear, from the master's point of view, to lag slightly, but since
1086  * this is only for reporting purposes and only on idle systems, that's
1087  * probably OK.
1088  */
1089  if (!force
1090  && writePtr == LogstreamResult.Write
1091  && flushPtr == LogstreamResult.Flush
1092  && !TimestampDifferenceExceeds(sendTime, now,
1094  return;
1095  sendTime = now;
1096 
1097  /* Construct a new message */
1098  writePtr = LogstreamResult.Write;
1099  flushPtr = LogstreamResult.Flush;
1100  applyPtr = GetXLogReplayRecPtr(NULL);
1101 
1103  pq_sendbyte(&reply_message, 'r');
1104  pq_sendint64(&reply_message, writePtr);
1105  pq_sendint64(&reply_message, flushPtr);
1106  pq_sendint64(&reply_message, applyPtr);
1108  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1109 
1110  /* Send it */
1111  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1112  (uint32) (writePtr >> 32), (uint32) writePtr,
1113  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1114  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1115  requestReply ? " (reply requested)" : "");
1116 
1118 }
static struct @25 LogstreamResult
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:76
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
static WalReceiverConn * wrconn
Definition: walreceiver.c:81
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:113
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11172
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:359
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:275
#define elog(elevel,...)
Definition: elog.h:228
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 880 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvOff, recvSegNo, ThisTimeLineID, wal_segment_size, write, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogSegmentOffset, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

881 {
882  int startoff;
883  int byteswritten;
884 
885  while (nbytes > 0)
886  {
887  int segbytes;
888 
889  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
890  {
891  bool use_existent;
892 
893  /*
894  * fsync() and close current file before we switch to next one. We
895  * would otherwise have to reopen this file to fsync it later
896  */
897  if (recvFile >= 0)
898  {
899  char xlogfname[MAXFNAMELEN];
900 
901  XLogWalRcvFlush(false);
902 
904 
905  /*
906  * XLOG segment files will be re-read by recovery in startup
907  * process soon, so we don't advise the OS to release cache
908  * pages associated with the file like XLogFileClose() does.
909  */
910  if (close(recvFile) != 0)
911  ereport(PANIC,
913  errmsg("could not close log segment %s: %m",
914  xlogfname)));
915 
916  /*
917  * Create .done file forcibly to prevent the streamed segment
918  * from being archived later.
919  */
921  XLogArchiveForceDone(xlogfname);
922  else
923  XLogArchiveNotify(xlogfname);
924  }
925  recvFile = -1;
926 
927  /* Create/use new log file */
929  use_existent = true;
930  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
932  recvOff = 0;
933  }
934 
935  /* Calculate the start offset of the received logs */
936  startoff = XLogSegmentOffset(recptr, wal_segment_size);
937 
938  if (startoff + nbytes > wal_segment_size)
939  segbytes = wal_segment_size - startoff;
940  else
941  segbytes = nbytes;
942 
943  /* Need to seek in the file? */
944  if (recvOff != startoff)
945  {
946  if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
947  {
948  char xlogfname[MAXFNAMELEN];
949  int save_errno = errno;
950 
952  errno = save_errno;
953  ereport(PANIC,
955  errmsg("could not seek in log segment %s to offset %u: %m",
956  xlogfname, startoff)));
957  }
958 
959  recvOff = startoff;
960  }
961 
962  /* OK to write the logs */
963  errno = 0;
964 
965  byteswritten = write(recvFile, buf, segbytes);
966  if (byteswritten <= 0)
967  {
968  char xlogfname[MAXFNAMELEN];
969  int save_errno;
970 
971  /* if write didn't set errno, assume no disk space */
972  if (errno == 0)
973  errno = ENOSPC;
974 
975  save_errno = errno;
977  errno = save_errno;
978  ereport(PANIC,
980  errmsg("could not write to log segment %s "
981  "at offset %u, length %lu: %m",
982  xlogfname, recvOff, (unsigned long) segbytes)));
983  }
984 
985  /* Update state for write */
986  recptr += byteswritten;
987 
988  recvOff += byteswritten;
989  nbytes -= byteswritten;
990  buf += byteswritten;
991 
992  LogstreamResult.Write = recptr;
993  }
994 }
static struct @25 LogstreamResult
int wal_segment_size
Definition: xlog.c:112
#define write(a, b, c)
Definition: win32.h:14
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3212
#define PANIC
Definition: elog.h:53
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:511
static char * buf
Definition: pg_test_fsync.c:67
int XLogArchiveMode
Definition: xlog.c:91
int errcode_for_file_access(void)
Definition: elog.c:631
#define ereport(elevel, rest)
Definition: elog.h:141
static TimeLineID recvFileTLI
Definition: walreceiver.c:92
static XLogSegNo recvSegNo
Definition: walreceiver.c:93
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:561
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
TimeLineID ThisTimeLineID
Definition: xlog.c:187
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1003
int errmsg(const char *fmt,...)
Definition: elog.c:822
static uint32 recvOff
Definition: walreceiver.c:94
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:91
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 110 of file walreceiver.c.

◆ got_SIGHUP

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 100 of file walreceiver.c.

Referenced by WalRcvSigHupHandler(), and WalReceiverMain().

◆ got_SIGTERM

volatile sig_atomic_t got_SIGTERM = false
static

Definition at line 101 of file walreceiver.c.

Referenced by ProcessWalRcvInterrupts(), and WalRcvShutdownHandler().

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 78 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 114 of file walreceiver.c.

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 91 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 92 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

◆ recvOff

uint32 recvOff = 0
static

Definition at line 94 of file walreceiver.c.

Referenced by XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 93 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

Definition at line 113 of file walreceiver.c.

Referenced by send_feedback().

◆ wal_receiver_status_interval

int wal_receiver_status_interval

Definition at line 76 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ wal_receiver_timeout

int wal_receiver_timeout

Definition at line 77 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 82 of file walreceiver.c.

Referenced by _PG_init().

◆ wrconn

WalReceiverConn* wrconn = NULL
static

Definition at line 81 of file walreceiver.c.

◆ Write

XLogRecPtr Write

Definition at line 109 of file walreceiver.c.