PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn *wrconn = NULL
 
WalReceiverFunctionsType *WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct {
   XLogRecPtr   Write
   XLogRecPtr   Flush
} LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 98 of file walreceiver.c.

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1340 of file walreceiver.c.

/* Return one row describing the WAL receiver from a spinlock-protected snapshot of WalRcv; returns NULL if no receiver is running or it is not ready to display. */
1341 {
1342  TupleDesc tupdesc;
1343  Datum *values;
1344  bool *nulls;
1345  int pid;
1346  bool ready_to_display;
1347  WalRcvState state;
1348  XLogRecPtr receive_start_lsn;
1349  TimeLineID receive_start_tli;
1350  XLogRecPtr written_lsn;
1351  XLogRecPtr flushed_lsn;
1352  TimeLineID received_tli;
1353  TimestampTz last_send_time;
1354  TimestampTz last_receipt_time;
1355  XLogRecPtr latest_end_lsn;
1356  TimestampTz latest_end_time;
1357  char sender_host[NI_MAXHOST];
1358  int sender_port = 0;
1359  char slotname[NAMEDATALEN];
1360  char conninfo[MAXCONNINFO];
1361 
1362  /* Take a lock to ensure value consistency */
1363  SpinLockAcquire(&WalRcv->mutex);
1364  pid = (int) WalRcv->pid;
1365  ready_to_display = WalRcv->ready_to_display;
1366  state = WalRcv->walRcvState;
1367  receive_start_lsn = WalRcv->receiveStart;
1368  receive_start_tli = WalRcv->receiveStartTLI;
1369  flushed_lsn = WalRcv->flushedUpto;
1370  received_tli = WalRcv->receivedTLI;
1371  last_send_time = WalRcv->lastMsgSendTime;
1372  last_receipt_time = WalRcv->lastMsgReceiptTime;
1373  latest_end_lsn = WalRcv->latestWalEnd;
1374  latest_end_time = WalRcv->latestWalEndTime;
1375  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1376  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1377  sender_port = WalRcv->sender_port;
1378  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1379  SpinLockRelease(&WalRcv->mutex);
1380 
1381  /*
1382  * No WAL receiver (or not ready yet), just return a tuple with NULL
1383  * values
1384  */
1385  if (pid == 0 || !ready_to_display)
1386  PG_RETURN_NULL();
1387 
1388  /*
1389  * Read "writtenUpto" without holding a spinlock. Note that it may not be
1390  * consistent with the other shared variables of the WAL receiver
1391  * protected by a spinlock, but this should not be used for data integrity
1392  * checks.
1393  */
1394  written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1395 
1396  /* determine result type */
1397  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1398  elog(ERROR, "return type must be a row type");
1399 
1400  values = palloc0(sizeof(Datum) * tupdesc->natts);
1401  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1402 
1403  /* Fetch values */
1404  values[0] = Int32GetDatum(pid);
1405 
1406  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1407  {
1408  /*
1409  * Only superusers and roles with privileges of pg_read_all_stats can
1410  * see details. Other users only get the pid value to know whether it
1411  * is a WAL receiver, but no details.
1412  */
1413  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1414  }
1415  else
1416  {
1417  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1418 
1419  if (XLogRecPtrIsInvalid(receive_start_lsn))
1420  nulls[2] = true;
1421  else
1422  values[2] = LSNGetDatum(receive_start_lsn);
1423  values[3] = Int32GetDatum(receive_start_tli);
1424  if (XLogRecPtrIsInvalid(written_lsn))
1425  nulls[4] = true;
1426  else
1427  values[4] = LSNGetDatum(written_lsn);
1428  if (XLogRecPtrIsInvalid(flushed_lsn))
1429  nulls[5] = true;
1430  else
1431  values[5] = LSNGetDatum(flushed_lsn);
1432  values[6] = Int32GetDatum(received_tli);
1433  if (last_send_time == 0)
1434  nulls[7] = true;
1435  else
1436  values[7] = TimestampTzGetDatum(last_send_time);
1437  if (last_receipt_time == 0)
1438  nulls[8] = true;
1439  else
1440  values[8] = TimestampTzGetDatum(last_receipt_time);
1441  if (XLogRecPtrIsInvalid(latest_end_lsn))
1442  nulls[9] = true;
1443  else
1444  values[9] = LSNGetDatum(latest_end_lsn);
1445  if (latest_end_time == 0)
1446  nulls[10] = true;
1447  else
1448  values[10] = TimestampTzGetDatum(latest_end_time);
1449  if (*slotname == '\0')
1450  nulls[11] = true;
1451  else
1452  values[11] = CStringGetTextDatum(slotname);
1453  if (*sender_host == '\0')
1454  nulls[12] = true;
1455  else
1456  values[12] = CStringGetTextDatum(sender_host);
1457  if (sender_port == 0)
1458  nulls[13] = true;
1459  else
1460  values[13] = Int32GetDatum(sender_port);
1461  if (*conninfo == '\0')
1462  nulls[14] = true;
1463  else
1464  values[14] = CStringGetTextDatum(conninfo);
1465  }
1466 
1467  /* Returns the record as Datum */
1468  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1469 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:4951
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:85
#define MemSet(start, val, len)
Definition: c.h:1008
int64 TimestampTz
Definition: timestamp.h:39
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
#define NI_MAXHOST
Definition: getaddrinfo.h:88
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
void * palloc0(Size size)
Definition: mcxt.c:1099
Oid GetUserId(void)
Definition: miscinit.c:492
#define NAMEDATALEN
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uintptr_t Datum
Definition: postgres.h:411
#define Int32GetDatum(X)
Definition: postgres.h:523
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:100
XLogRecPtr latestWalEnd
Definition: walreceiver.h:105
TimeLineID receiveStartTLI
Definition: walreceiver.h:76
TimeLineID receivedTLI
Definition: walreceiver.h:86
char slotname[NAMEDATALEN]
Definition: walreceiver.h:125
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:118
pid_t pid
Definition: walreceiver.h:64
XLogRecPtr receiveStart
Definition: walreceiver.h:75
XLogRecPtr flushedUpto
Definition: walreceiver.h:85
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:153
TimestampTz lastMsgSendTime
Definition: walreceiver.h:99
WalRcvState walRcvState
Definition: walreceiver.h:65
TimestampTz latestWalEndTime
Definition: walreceiver.h:106
bool ready_to_display
Definition: walreceiver.h:134
int sender_port
Definition: walreceiver.h:119
slock_t mutex
Definition: walreceiver.h:145
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:112
Definition: regguts.h:318
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1315
#define MAXCONNINFO
Definition: walreceiver.h:37
WalRcvState
Definition: walreceiver.h:46
WalRcvData * WalRcv
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MAXCONNINFO, MemSet, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, NI_MAXHOST, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 150 of file walreceiver.c.

/* Service pending interrupts, then exit via ereport(FATAL) if ShutdownRequestPending is set. */
151 {
152  /*
153  * Although walreceiver interrupt handling doesn't use the same scheme as
154  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
155  * any incoming signals on Win32, and also to make sure we process any
156  * barrier events.
157  */
158  CHECK_FOR_INTERRUPTS();
159 
160  if (ShutdownRequestPending)
161  {
162  ereport(FATAL,
163  (errcode(ERRCODE_ADMIN_SHUTDOWN),
164  errmsg("terminating walreceiver process due to administrator command")));
165  }
166 }
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define FATAL
Definition: elog.h:35
#define ereport(elevel,...)
Definition: elog.h:143
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and ShutdownRequestPending.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), WalRcvWaitForStartPosition(), and WalReceiverMain().

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1244 of file walreceiver.c.

/* Record the sender's WAL end and send time in shared memory; when DEBUG2 is enabled, also log apply delay and transfer latency. */
1245 {
1246  WalRcvData *walrcv = WalRcv;
1247 
1248  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1249 
1250  /* Update shared-memory status */
1251  SpinLockAcquire(&walrcv->mutex);
1252  if (walrcv->latestWalEnd < walEnd)
1253  walrcv->latestWalEndTime = sendTime;
1254  walrcv->latestWalEnd = walEnd;
1255  walrcv->lastMsgSendTime = sendTime;
1256  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1257  SpinLockRelease(&walrcv->mutex);
1258 
1259  if (message_level_is_interesting(DEBUG2))
1260  {
1261  char *sendtime;
1262  char *receipttime;
1263  int applyDelay;
1264 
1265  /* Copy because timestamptz_to_str returns a static buffer */
1266  sendtime = pstrdup(timestamptz_to_str(sendTime));
1267  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1268  applyDelay = GetReplicationApplyDelay();
1269 
1270  /* apply delay is not available */
1271  if (applyDelay == -1)
1272  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1273  sendtime,
1274  receipttime,
1275  GetReplicationTransferLatency());
1276  else
1277  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1278  sendtime,
1279  receipttime,
1280  applyDelay,
1281  GetReplicationTransferLatency());
1282 
1283  pfree(sendtime);
1284  pfree(receipttime);
1285  }
1286 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1768
bool message_level_is_interesting(int elevel)
Definition: elog.c:265
#define DEBUG2
Definition: elog.h:23
char * pstrdup(const char *in)
Definition: mcxt.c:1305
void pfree(void *pointer)
Definition: mcxt.c:1175
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, message_level_is_interesting(), WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 779 of file walreceiver.c.

/* on_shmem_exit callback: flush received WAL, mark the receiver STOPPED in shared memory, disconnect, and wake the startup process. */
780 {
781  WalRcvData *walrcv = WalRcv;
782  TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
783 
784  Assert(*startpointTLI_p != 0);
785 
786  /* Ensure that all WAL records received are flushed to disk */
787  XLogWalRcvFlush(true, *startpointTLI_p);
788 
789  /* Mark ourselves inactive in shared memory */
790  SpinLockAcquire(&walrcv->mutex);
791  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
792  walrcv->walRcvState == WALRCV_RESTARTING ||
793  walrcv->walRcvState == WALRCV_STARTING ||
794  walrcv->walRcvState == WALRCV_WAITING ||
795  walrcv->walRcvState == WALRCV_STOPPING);
796  Assert(walrcv->pid == MyProcPid);
797  walrcv->walRcvState = WALRCV_STOPPED;
798  walrcv->pid = 0;
799  walrcv->ready_to_display = false;
800  walrcv->latch = NULL;
801  SpinLockRelease(&walrcv->mutex);
802 
803  ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
804 
805  /* Terminate the connection gracefully. */
806  if (wrconn != NULL)
807  walrcv_disconnect(wrconn);
808 
809  /* Wake up the startup process to notice promptly that we're gone */
810  WakeupRecovery();
811 }
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition: globals.c:44
Assert(fmt[strlen(fmt) - 1] !='\n')
void * arg
#define DatumGetPointer(X)
Definition: postgres.h:593
Latch * latch
Definition: walreceiver.h:143
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:66
static WalReceiverConn * wrconn
Definition: walreceiver.c:95
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
Definition: walreceiver.c:967
@ WALRCV_STARTING
Definition: walreceiver.h:48
@ WALRCV_STOPPED
Definition: walreceiver.h:47
@ WALRCV_RESTARTING
Definition: walreceiver.h:52
@ WALRCV_STREAMING
Definition: walreceiver.h:50
@ WALRCV_WAITING
Definition: walreceiver.h:51
@ WALRCV_STOPPING
Definition: walreceiver.h:53
#define walrcv_disconnect(conn)
Definition: walreceiver.h:432
void WakeupRecovery(void)

References arg, Assert(), ConditionVariableBroadcast(), DatumGetPointer, WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 723 of file walreceiver.c.

/* Fetch from the primary every timeline history file in [first, last] that is missing locally (timeline 1 has none). */
724 {
725  TimeLineID tli;
726 
727  for (tli = first; tli <= last; tli++)
728  {
729  /* there's no history file for timeline 1 */
730  if (tli != 1 && !existsTimeLineHistory(tli))
731  {
732  char *fname;
733  char *content;
734  int len;
735  char expectedfname[MAXFNAMELEN];
736 
737  ereport(LOG,
738  (errmsg("fetching timeline history file for timeline %u from primary server",
739  tli)));
740 
741  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
742 
743  /*
744  * Check that the filename on the primary matches what we
745  * calculated ourselves. This is just a sanity check, it should
746  * always match.
747  */
748  TLHistoryFileName(expectedfname, tli);
749  if (strcmp(fname, expectedfname) != 0)
750  ereport(ERROR,
751  (errcode(ERRCODE_PROTOCOL_VIOLATION),
752  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
753  tli)));
754 
755  /*
756  * Write the file to pg_wal.
757  */
758  writeTimeLineHistoryFile(tli, content, len);
759 
760  /*
761  * Mark the streamed history file as ready for archiving if
762  * archive_mode is always.
763  */
764  if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
765  XLogArchiveForceDone(fname);
766  else
767  XLogArchiveNotify(fname);
768 
769  pfree(fname);
770  pfree(content);
771  }
772  }
773 }
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:467
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
int errmsg_internal(const char *fmt,...)
Definition: elog.c:991
#define LOG
Definition: elog.h:25
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:416
int XLogArchiveMode
Definition: xlog.c:120
@ ARCHIVE_MODE_ALWAYS
Definition: xlog.h:62
#define TLHistoryFileName(fname, tli)
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:540
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:474

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1297 of file walreceiver.c.

/* Ask the WAL receiver to reply promptly: set force_reply and set its latch, if one is advertised. */
1298 {
1299  Latch *latch;
1300 
1301  WalRcv->force_reply = true;
1302  /* fetching the latch pointer might not be atomic, so use spinlock */
1303  SpinLockAcquire(&WalRcv->mutex);
1304  latch = WalRcv->latch;
1305  SpinLockRelease(&WalRcv->mutex);
1306  if (latch)
1307  SetLatch(latch);
1308 }
void SetLatch(Latch *latch)
Definition: latch.c:566
Definition: latch.h:111
sig_atomic_t force_reply
Definition: walreceiver.h:160

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1315 of file walreceiver.c.

/* Map a WalRcvState to the lowercase name used by pg_stat_get_wal_receiver(); any value outside the enum yields "UNKNOWN". */
1316 {
1317  switch (state)
1318  {
1319  case WALRCV_STOPPED:
1320  return "stopped";
1321  case WALRCV_STARTING:
1322  return "starting";
1323  case WALRCV_STREAMING:
1324  return "streaming";
1325  case WALRCV_WAITING:
1326  return "waiting";
1327  case WALRCV_RESTARTING:
1328  return "restarting";
1329  case WALRCV_STOPPING:
1330  return "stopping";
1331  }
1332  return "UNKNOWN";
1333 }

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr * startpoint,
TimeLineID * startpointTLI 
)
static

Definition at line 643 of file walreceiver.c.

/* Switch to WAITING and sleep until the startup process requests a restart (returning its start point/TLI) or a stop. */
644 {
645  WalRcvData *walrcv = WalRcv;
646  int state;
647 
648  SpinLockAcquire(&walrcv->mutex);
649  state = walrcv->walRcvState;
650  if (state != WALRCV_STREAMING)
651  {
652  SpinLockRelease(&walrcv->mutex);
653  if (state == WALRCV_STOPPING)
654  proc_exit(0);
655  else
656  elog(FATAL, "unexpected walreceiver state");
657  }
658  walrcv->walRcvState = WALRCV_WAITING;
659  walrcv->receiveStart = InvalidXLogRecPtr;
660  walrcv->receiveStartTLI = 0;
661  SpinLockRelease(&walrcv->mutex);
662 
663  set_ps_display("idle");
664 
665  /*
666  * nudge startup process to notice that we've stopped streaming and are
667  * now waiting for instructions.
668  */
669  WakeupRecovery();
670  for (;;)
671  {
672  ResetLatch(MyLatch);
673 
674  ProcessWalRcvInterrupts();
675 
676  SpinLockAcquire(&walrcv->mutex);
677  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
678  walrcv->walRcvState == WALRCV_WAITING ||
679  walrcv->walRcvState == WALRCV_STOPPING);
680  if (walrcv->walRcvState == WALRCV_RESTARTING)
681  {
682  /*
683  * No need to handle changes in primary_conninfo or
684  * primary_slot_name here. Startup process will signal us to
685  * terminate in case those change.
686  */
687  *startpoint = walrcv->receiveStart;
688  *startpointTLI = walrcv->receiveStartTLI;
689  walrcv->walRcvState = WALRCV_STREAMING;
690  SpinLockRelease(&walrcv->mutex);
691  break;
692  }
693  if (walrcv->walRcvState == WALRCV_STOPPING)
694  {
695  /*
696  * We should've received SIGTERM if the startup process wants us
697  * to die, but might as well check it here too.
698  */
699  SpinLockRelease(&walrcv->mutex);
700  exit(1);
701  }
702  SpinLockRelease(&walrcv->mutex);
703 
704  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
705  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
706  }
707 
708  if (update_process_title)
709  {
710  char activitymsg[50];
711 
712  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
713  LSN_FORMAT_ARGS(*startpoint));
714  set_ps_display(activitymsg);
715  }
716 }
struct Latch * MyLatch
Definition: globals.c:58
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:658
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:451
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
exit(1)
#define snprintf
Definition: port.h:225
bool update_process_title
Definition: ps_status.c:36
void set_ps_display(const char *activity)
Definition: ps_status.c:349
@ WAIT_EVENT_WAL_RECEIVER_WAIT_START
Definition: wait_event.h:130
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:150
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), elog, exit(), FATAL, InvalidXLogRecPtr, LSN_FORMAT_ARGS, WalRcvData::mutex, MyLatch, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 171 of file walreceiver.c.

172 {
173  char conninfo[MAXCONNINFO];
174  char *tmp_conninfo;
175  char slotname[NAMEDATALEN];
176  bool is_temp_slot;
177  XLogRecPtr startpoint;
178  TimeLineID startpointTLI;
179  TimeLineID primaryTLI;
180  bool first_stream;
181  WalRcvData *walrcv = WalRcv;
182  TimestampTz last_recv_timestamp;
184  bool ping_sent;
185  char *err;
186  char *sender_host = NULL;
187  int sender_port = 0;
188 
189  /*
190  * WalRcv should be set up already (if we are a backend, we inherit this
191  * by fork() or EXEC_BACKEND mechanism from the postmaster).
192  */
193  Assert(walrcv != NULL);
194 
196 
197  /*
198  * Mark walreceiver as running in shared memory.
199  *
200  * Do this as early as possible, so that if we fail later on, we'll set
201  * state to STOPPED. If we die before this, the startup process will keep
202  * waiting for us to start up, until it times out.
203  */
204  SpinLockAcquire(&walrcv->mutex);
205  Assert(walrcv->pid == 0);
206  switch (walrcv->walRcvState)
207  {
208  case WALRCV_STOPPING:
209  /* If we've already been requested to stop, don't start up. */
210  walrcv->walRcvState = WALRCV_STOPPED;
211  /* fall through */
212 
213  case WALRCV_STOPPED:
214  SpinLockRelease(&walrcv->mutex);
216  proc_exit(1);
217  break;
218 
219  case WALRCV_STARTING:
220  /* The usual case */
221  break;
222 
223  case WALRCV_WAITING:
224  case WALRCV_STREAMING:
225  case WALRCV_RESTARTING:
226  default:
227  /* Shouldn't happen */
228  SpinLockRelease(&walrcv->mutex);
229  elog(PANIC, "walreceiver still running according to shared memory state");
230  }
231  /* Advertise our PID so that the startup process can kill us */
232  walrcv->pid = MyProcPid;
233  walrcv->walRcvState = WALRCV_STREAMING;
234 
235  /* Fetch information required to start streaming */
236  walrcv->ready_to_display = false;
237  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
238  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
239  is_temp_slot = walrcv->is_temp_slot;
240  startpoint = walrcv->receiveStart;
241  startpointTLI = walrcv->receiveStartTLI;
242 
243  /*
244  * At most one of is_temp_slot and slotname can be set; otherwise,
245  * RequestXLogStreaming messed up.
246  */
247  Assert(!is_temp_slot || (slotname[0] == '\0'));
248 
249  /* Initialise to a sanish value */
250  walrcv->lastMsgSendTime =
251  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
252 
253  /* Report the latch to use to awaken this process */
254  walrcv->latch = &MyProc->procLatch;
255 
256  SpinLockRelease(&walrcv->mutex);
257 
259 
260  /* Arrange to clean up at walreceiver exit */
261  on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
262 
263  /* Properly accept or ignore signals the postmaster might send us */
264  pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
265  * file */
266  pqsignal(SIGINT, SIG_IGN);
267  pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
268  /* SIGQUIT handler was already set up by InitPostmasterChild */
273 
274  /* Reset some signals that are accepted by postmaster but not here */
276 
277  /* Load the libpq-specific functions */
278  load_file("libpqwalreceiver", false);
279  if (WalReceiverFunctions == NULL)
280  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
281 
282  /* Unblock signals (they were blocked when the postmaster forked us) */
284 
285  /* Establish the connection to the primary for XLOG streaming */
286  wrconn = walrcv_connect(conninfo, false,
287  cluster_name[0] ? cluster_name : "walreceiver",
288  &err);
289  if (!wrconn)
290  ereport(ERROR,
291  (errcode(ERRCODE_CONNECTION_FAILURE),
292  errmsg("could not connect to the primary server: %s", err)));
293 
294  /*
295  * Save user-visible connection string. This clobbers the original
296  * conninfo, for security. Also save host and port of the sender server
297  * this walreceiver is connected to.
298  */
299  tmp_conninfo = walrcv_get_conninfo(wrconn);
300  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
301  SpinLockAcquire(&walrcv->mutex);
302  memset(walrcv->conninfo, 0, MAXCONNINFO);
303  if (tmp_conninfo)
304  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
305 
306  memset(walrcv->sender_host, 0, NI_MAXHOST);
307  if (sender_host)
308  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
309 
310  walrcv->sender_port = sender_port;
311  walrcv->ready_to_display = true;
312  SpinLockRelease(&walrcv->mutex);
313 
314  if (tmp_conninfo)
315  pfree(tmp_conninfo);
316 
317  if (sender_host)
318  pfree(sender_host);
319 
320  first_stream = true;
321  for (;;)
322  {
323  char *primary_sysid;
324  char standby_sysid[32];
326 
327  /*
328  * Check that we're connected to a valid server using the
329  * IDENTIFY_SYSTEM replication command.
330  */
331  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
332 
333  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
335  if (strcmp(primary_sysid, standby_sysid) != 0)
336  {
337  ereport(ERROR,
338  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
339  errmsg("database system identifier differs between the primary and standby"),
340  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
341  primary_sysid, standby_sysid)));
342  }
343 
344  /*
345  * Confirm that the current timeline of the primary is the same or
346  * ahead of ours.
347  */
348  if (primaryTLI < startpointTLI)
349  ereport(ERROR,
350  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
351  errmsg("highest timeline %u of the primary is behind recovery timeline %u",
352  primaryTLI, startpointTLI)));
353 
354  /*
355  * Get any missing history files. We do this always, even when we're
356  * not interested in that timeline, so that if we're promoted to
357  * become the primary later on, we don't select the same timeline that
358  * was already used in the current primary. This isn't bullet-proof -
359  * you'll need some external software to manage your cluster if you
360  * need to ensure that a unique timeline id is chosen in every case,
361  * but let's avoid the confusion of timeline id collisions where we
362  * can.
363  */
364  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
365 
366  /*
367  * Create temporary replication slot if requested, and update slot
368  * name in shared memory. (Note the slot name cannot already be set
369  * in this case.)
370  */
371  if (is_temp_slot)
372  {
373  snprintf(slotname, sizeof(slotname),
374  "pg_walreceiver_%lld",
375  (long long int) walrcv_get_backend_pid(wrconn));
376 
377  walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
378 
379  SpinLockAcquire(&walrcv->mutex);
380  strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
381  SpinLockRelease(&walrcv->mutex);
382  }
383 
384  /*
385  * Start streaming.
386  *
387  * We'll try to start at the requested starting point and timeline,
388  * even if it's different from the server's latest timeline. In case
389  * we've already reached the end of the old timeline, the server will
390  * finish the streaming immediately, and we will go back to await
391  * orders from the startup process. If recovery_target_timeline is
392  * 'latest', the startup process will scan pg_wal and find the new
393  * history file, bump recovery target timeline, and ask us to restart
394  * on the new timeline.
395  */
396  options.logical = false;
397  options.startpoint = startpoint;
398  options.slotname = slotname[0] != '\0' ? slotname : NULL;
399  options.proto.physical.startpointTLI = startpointTLI;
401  {
402  if (first_stream)
403  ereport(LOG,
404  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
405  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
406  else
407  ereport(LOG,
408  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
409  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
410  first_stream = false;
411 
412  /* Initialize LogstreamResult and buffers for processing messages */
416 
417  /* Initialize the last recv timestamp */
418  last_recv_timestamp = GetCurrentTimestamp();
419  ping_sent = false;
420 
421  /* Loop until end-of-streaming or error */
422  for (;;)
423  {
424  char *buf;
425  int len;
426  bool endofwal = false;
427  pgsocket wait_fd = PGINVALID_SOCKET;
428  int rc;
429 
430  /*
431  * Exit walreceiver if we're not in recovery. This should not
432  * happen, but cross-check the status here.
433  */
434  if (!RecoveryInProgress())
435  ereport(FATAL,
436  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
437  errmsg("cannot continue WAL streaming, recovery has already ended")));
438 
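439  /* Process any requests or signals received recently */
440  ProcessWalRcvInterrupts();
441 
442  if (ConfigReloadPending)
443  {
444  ConfigReloadPending = false;
445  ProcessConfigFile(PGC_SIGHUP);
446  XLogWalRcvSendHSFeedback(true);
447  }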
448 
449  /* See if we can read data immediately */
450  len = walrcv_receive(wrconn, &buf, &wait_fd);
451  if (len != 0)
452  {
453  /*
454  * Process the received data, and any subsequent data we
455  * can read without blocking.
456  */
457  for (;;)
458  {
459  if (len > 0)
460  {
461  /*
462  * Something was received from primary, so reset
463  * timeout
464  */
465  last_recv_timestamp = GetCurrentTimestamp();
466  ping_sent = false;
467  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
468  startpointTLI);
469  }
470  else if (len == 0)
471  break;
472  else if (len < 0)
473  {
474  ereport(LOG,
475  (errmsg("replication terminated by primary server"),
476  errdetail("End of WAL reached on timeline %u at %X/%X.",
477  startpointTLI,
478  LSN_FORMAT_ARGS(LogstreamResult.Write))));
479  endofwal = true;
480  break;
481  }
482  len = walrcv_receive(wrconn, &buf, &wait_fd);
483  }
484 
485  /* Let the primary know that we received some data. */
486  XLogWalRcvSendReply(false, false);
487 
488  /*
489  * If we've written some records, flush them to disk and
490  * let the startup process and primary server know about
491  * them.
492  */
493  XLogWalRcvFlush(false, startpointTLI);
494  }
495 
496  /* Check if we need to exit the streaming loop. */
497  if (endofwal)
498  break;
499 
500  /*
501  * Ideally we would reuse a WaitEventSet object repeatedly
502  * here to avoid the overheads of WaitLatchOrSocket on epoll
503  * systems, but we can't be sure that libpq (or any other
504  * walreceiver implementation) has the same socket (even if
505  * the fd is the same number, it may have been closed and
506  * reopened since the last time). In future, if there is a
507  * function for removing sockets from WaitEventSet, then we
508  * could add and remove just the socket each time, potentially
509  * avoiding some system calls.
510  */
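511  Assert(wait_fd != PGINVALID_SOCKET);
512  rc = WaitLatchOrSocket(MyLatch,
513  WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
514  WL_TIMEOUT | WL_LATCH_SET,
515  wait_fd,
516  NAPTIME_PER_CYCLE,
517  WAIT_EVENT_WAL_RECEIVER_MAIN);
518  if (rc & WL_LATCH_SET)
519  {
520  ResetLatch(MyLatch);
521  ProcessWalRcvInterrupts();
522 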
523  if (walrcv->force_reply)
524  {
525  /*
526  * The recovery process has asked us to send apply
527  * feedback now. Make sure the flag is really set to
528  * false in shared memory before sending the reply, so
529  * we don't miss a new request for a reply.
530  */
531  walrcv->force_reply = false;
532  pg_memory_barrier();
533  XLogWalRcvSendReply(true, false);
534  }
535  }
536  if (rc & WL_TIMEOUT)
537  {
538  /*
539  * We didn't receive anything new. If we haven't heard
540  * anything from the server for more than
541  * wal_receiver_timeout / 2, ping the server. Also, if
542  * it's been longer than wal_receiver_status_interval
543  * since the last update we sent, send a status update to
544  * the primary anyway, to report any progress in applying
545  * WAL.
546  */
547  bool requestReply = false;
548 
549  /*
550  * Check if time since last receive from primary has
551  * reached the configured limit.
552  */
553  if (wal_receiver_timeout > 0)
554  {
555  TimestampTz now = GetCurrentTimestamp();
556  TimestampTz timeout;
557 
558  timeout =
559  TimestampTzPlusMilliseconds(last_recv_timestamp,
560  wal_receiver_timeout);
561 
562  if (now >= timeout)
563  ereport(ERROR,
564  (errcode(ERRCODE_CONNECTION_FAILURE),
565  errmsg("terminating walreceiver due to timeout")));
566 
567  /*
568  * We didn't receive anything new, for half of
569  * receiver replication timeout. Ping the server.
570  */
571  if (!ping_sent)
572  {
573  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
574  (wal_receiver_timeout / 2));
575  if (now >= timeout)
576  {
577  requestReply = true;
578  ping_sent = true;
579  }
580  }
581  }
582 
583  XLogWalRcvSendReply(requestReply, requestReply);
584  XLogWalRcvSendHSFeedback(false);
585  }
586  }
587 
588  /*
589  * The backend finished streaming. Exit streaming COPY-mode from
590  * our side, too.
591  */
592  walrcv_endstreaming(wrconn, &primaryTLI);
593 
594  /*
595  * If the server had switched to a new timeline that we didn't
596  * know about when we began streaming, fetch its timeline history
597  * file now.
598  */
599  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
600  }
601  else
602  ereport(LOG,
603  (errmsg("primary server contains no more WAL on requested timeline %u",
604  startpointTLI)));
605 
606  /*
607  * End of WAL reached on the requested timeline. Close the last
608  * segment, and await for new orders from the startup process.
609  */
610  if (recvFile >= 0)
611  {
612  char xlogfname[MAXFNAMELEN];
613 
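614  XLogWalRcvFlush(false, startpointTLI);
615  XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
616  if (close(recvFile) != 0)
617  ereport(PANIC,
618  (errcode_for_file_access(),
619  errmsg("could not close log segment %s: %m",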
620  xlogfname)));
621 
622  /*
623  * Create .done file forcibly to prevent the streamed segment from
624  * being archived later.
625  */
626  if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
627  XLogArchiveForceDone(xlogfname);
628  else
629  XLogArchiveNotify(xlogfname);
630  }
631  recvFile = -1;
632 
633  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
634  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
635  }
636  /* not reached */
637 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
#define pg_memory_barrier()
Definition: atomics.h:145
sigset_t UnBlockSig
Definition: pqsignal.c:22
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
#define UINT64_FORMAT
Definition: c.h:484
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
int errcode_for_file_access(void)
Definition: elog.c:716
int errdetail(const char *fmt,...)
Definition: elog.c:1037
#define PANIC
Definition: elog.h:36
#define DEBUG1
Definition: elog.h:24
char * cluster_name
Definition: guc.c:653
@ PGC_SIGHUP
Definition: guc.h:72
void ProcessConfigFile(GucContext context)
#define close(a)
Definition: win32.h:12
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:108
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:499
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
static char ** options
static char * buf
Definition: pg_test_fsync.c:67
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
#define PointerGetDatum(X)
Definition: postgres.h:600
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:638
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:180
PGPROC * MyProc
Definition: proc.c:68
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Latch procLatch
Definition: proc.h:168
bool is_temp_slot
Definition: walreceiver.h:131
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
@ WAIT_EVENT_WAL_RECEIVER_MAIN
Definition: wait_event.h:47
static struct @17 LogstreamResult
static StringInfoData reply_message
Definition: walreceiver.c:119
static int recvFile
Definition: walreceiver.c:105
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:98
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:723
static StringInfoData incoming_message
Definition: walreceiver.c:120
static TimeLineID recvFileTLI
Definition: walreceiver.c:106
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:96
int wal_receiver_timeout
Definition: walreceiver.c:91
static XLogSegNo recvSegNo
Definition: walreceiver.c:107
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1143
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:643
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
Definition: walreceiver.c:817
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:779
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1074
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
Definition: walreceiver.h:426
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:418
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:410
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:408
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:420
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:412
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:428
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:422
#define SIGCHLD
Definition: win32_port.h:177
#define SIGHUP
Definition: win32_port.h:167
#define SIG_DFL
Definition: win32_port.h:162
#define SIGPIPE
Definition: win32_port.h:172
#define SIGUSR1
Definition: win32_port.h:179
#define SIGALRM
Definition: win32_port.h:173
#define SIGUSR2
Definition: win32_port.h:180
#define SIG_IGN
Definition: win32_port.h:164
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4206
bool RecoveryInProgress(void)
Definition: xlog.c:5753
int wal_segment_size
Definition: xlog.c:144
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert(), buf, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, WalRcvData::conninfo, DEBUG1, elog, ereport, errcode(), errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), incoming_message, initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyLatch, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, NI_MAXHOST, now(), on_shmem_exit(), options, PANIC, pfree(), pg_atomic_write_u64(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvData::pid, PointerGetDatum, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, procsignal_sigusr1_handler(), WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, snprintf, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, WalRcvWaitForStartPosition(), 
WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1022 of file walreceiver.c.

1023 {
1024  char xlogfname[MAXFNAMELEN];
1025 
1026  Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
1027  Assert(tli != 0);
1028 
1029  /*
1030  * fsync() and close current file before we switch to next one. We would
1031  * otherwise have to reopen this file to fsync it later
1032  */
1033  XLogWalRcvFlush(false, tli);
1034 
1035  XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1036 
1037  /*
1038  * XLOG segment files will be re-read by recovery in startup process soon,
1039  * so we don't advise the OS to release cache pages associated with the
1040  * file like XLogFileClose() does.
1041  */
1042  if (close(recvFile) != 0)
1043  ereport(PANIC,
1044  (errcode_for_file_access(),
1045  errmsg("could not close log segment %s: %m",
1046  xlogfname)));
1047 
1048  /*
1049  * Create .done file forcibly to prevent the streamed segment from being
1050  * archived later.
1051  */
1052  if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1053  XLogArchiveForceDone(xlogfname);
1054  else
1055  XLogArchiveNotify(xlogfname);
1056 
1057  recvFile = -1;
1058 }
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert(), close, ereport, errcode_for_file_access(), errmsg(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 967 of file walreceiver.c.

968 {
969  Assert(tli != 0);
970 
971  if (LogstreamResult.Flush < LogstreamResult.Write)
972  {
973  WalRcvData *walrcv = WalRcv;
974 
975  issue_xlog_fsync(recvFile, recvSegNo, tli);
976 
977  LogstreamResult.Flush = LogstreamResult.Write;
978 
979  /* Update shared-memory status */
980  SpinLockAcquire(&walrcv->mutex);
981  if (walrcv->flushedUpto < LogstreamResult.Flush)
982  {
983  walrcv->latestChunkStart = walrcv->flushedUpto;
984  walrcv->flushedUpto = LogstreamResult.Flush;
985  walrcv->receivedTLI = tli;
986  }
987  SpinLockRelease(&walrcv->mutex);
988 
989  /* Signal the startup process and walsender that new WAL has arrived */
990  WakeupRecovery();
991  if (AllowCascadeReplication())
992  WalSndWakeup();
993 
994  /* Report XLOG streaming progress in PS display */
995  if (update_process_title)
996  {
997  char activitymsg[50];
998 
999  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1000  LSN_FORMAT_ARGS(LogstreamResult.Write));
1001  set_ps_display(activitymsg);
1002  }
1003 
1004  /* Also let the primary know that we made some progress */
1005  if (!dying)
1006  {
1007  XLogWalRcvSendReply(false, false);
1008  XLogWalRcvSendHSFeedback(false);
1009  }
1010  }
1011 }
XLogRecPtr latestChunkStart
Definition: walreceiver.h:94
#define AllowCascadeReplication()
Definition: walreceiver.h:40
void WalSndWakeup(void)
Definition: walsender.c:3309
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:7968

References AllowCascadeReplication, Assert(), WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, LSN_FORMAT_ARGS, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 817 of file walreceiver.c.

818 {
819  int hdrlen;
820  XLogRecPtr dataStart;
821  XLogRecPtr walEnd;
822  TimestampTz sendTime;
823  bool replyRequested;
824 
825  resetStringInfo(&incoming_message);
826 
827  switch (type)
828  {
829  case 'w': /* WAL records */
830  {
831  /* copy message to StringInfo */
832  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
833  if (len < hdrlen)
834  ereport(ERROR,
835  (errcode(ERRCODE_PROTOCOL_VIOLATION),
836  errmsg_internal("invalid WAL message received from primary")));
837  appendBinaryStringInfo(&incoming_message, buf, hdrlen);
838 
839  /* read the fields */
840  dataStart = pq_getmsgint64(&incoming_message);
841  walEnd = pq_getmsgint64(&incoming_message);
842  sendTime = pq_getmsgint64(&incoming_message);
843  ProcessWalSndrMessage(walEnd, sendTime);
844 
845  buf += hdrlen;
846  len -= hdrlen;
847  XLogWalRcvWrite(buf, len, dataStart, tli);
848  break;
849  }
850  case 'k': /* Keepalive */
851  {
852  /* copy message to StringInfo */
853  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
854  if (len != hdrlen)
855  ereport(ERROR,
856  (errcode(ERRCODE_PROTOCOL_VIOLATION),
857  errmsg_internal("invalid keepalive message received from primary")));
858  appendBinaryStringInfo(&incoming_message, buf, hdrlen);
859 
860  /* read the fields */
861  walEnd = pq_getmsgint64(&incoming_message);
862  sendTime = pq_getmsgint64(&incoming_message);
863  replyRequested = pq_getmsgbyte(&incoming_message);
864 
865  ProcessWalSndrMessage(walEnd, sendTime);
866 
867  /* If the primary requested a reply, send one immediately */
868  if (replyRequested)
869  XLogWalRcvSendReply(true, false);
870  break;
871  }
872  default:
873  ereport(ERROR,
874  (errcode(ERRCODE_PROTOCOL_VIOLATION),
875  errmsg_internal("invalid replication message type %d",
876  type)));
877  }
878 }
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1244
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:884

References appendBinaryStringInfo(), buf, ereport, errcode(), errmsg_internal(), ERROR, incoming_message, len, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1143 of file walreceiver.c.

1144 {
1145  TimestampTz now;
1146  FullTransactionId nextFullXid;
1147  TransactionId nextXid;
1148  uint32 xmin_epoch,
1149  catalog_xmin_epoch;
1150  TransactionId xmin,
1151  catalog_xmin;
1152  static TimestampTz sendTime = 0;
1153 
1154  /* initially true so we always send at least one feedback message */
1155  static bool primary_has_standby_xmin = true;
1156 
1157  /*
1158  * If the user doesn't want status to be reported to the primary, be sure
1159  * to exit before doing anything at all.
1160  */
1161  if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1162  !primary_has_standby_xmin)
1163  return;
1164 
1165  /* Get current timestamp. */
1166  now = GetCurrentTimestamp();
1167 
1168  if (!immed)
1169  {
1170  /*
1171  * Send feedback at most once per wal_receiver_status_interval.
1172  */
1173  if (!TimestampDifferenceExceeds(sendTime, now,
1174  wal_receiver_status_interval * 1000))
1175  return;
1176  sendTime = now;
1177  }
1178 
1179  /*
1180  * If Hot Standby is not yet accepting connections there is nothing to
1181  * send. Check this after the interval has expired to reduce number of
1182  * calls.
1183  *
1184  * Bailing out here also ensures that we don't send feedback until we've
1185  * read our own replication slot state, so we don't tell the primary to
1186  * discard needed xmin or catalog_xmin from any slots that may exist on
1187  * this replica.
1188  */
1189  if (!HotStandbyActive())
1190  return;
1191 
1192  /*
1193  * Make the expensive call to get the oldest xmin once we are certain
1194  * everything else has been checked.
1195  */
1196  if (hot_standby_feedback)
1197  {
1198  GetReplicationHorizons(&xmin, &catalog_xmin);
1199  }
1200  else
1201  {
1202  xmin = InvalidTransactionId;
1203  catalog_xmin = InvalidTransactionId;
1204  }
1205 
1206  /*
1207  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1208  * the epoch boundary.
1209  */
1210  nextFullXid = ReadNextFullTransactionId();
1211  nextXid = XidFromFullTransactionId(nextFullXid);
1212  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1213  catalog_xmin_epoch = xmin_epoch;
1214  if (nextXid < xmin)
1215  xmin_epoch--;
1216  if (nextXid < catalog_xmin)
1217  catalog_xmin_epoch--;
1218 
1219  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1220  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1221 
1222  /* Construct the message and send it. */
1223  resetStringInfo(&reply_message);
1224  pq_sendbyte(&reply_message, 'h');
1225  pq_sendint64(&reply_message, GetCurrentTimestamp());
1226  pq_sendint32(&reply_message, xmin);
1227  pq_sendint32(&reply_message, xmin_epoch);
1228  pq_sendint32(&reply_message, catalog_xmin);
1229  pq_sendint32(&reply_message, catalog_xmin_epoch);
1230  walrcv_send(wrconn, reply_message.data, reply_message.len);
1231  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1232  primary_has_standby_xmin = true;
1233  else
1234  primary_has_standby_xmin = false;
1235 }
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1705
unsigned int uint32
Definition: c.h:441
uint32 TransactionId
Definition: c.h:587
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2063
#define InvalidTransactionId
Definition: transam.h:31
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define TransactionIdIsValid(xid)
Definition: transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
bool hot_standby_feedback
Definition: walreceiver.c:92
int wal_receiver_status_interval
Definition: walreceiver.c:90
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:424
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReadNextFullTransactionId(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, wal_receiver_status_interval, walrcv_send, wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1074 of file walreceiver.c.

1075 {
1076  static XLogRecPtr writePtr = 0;
1077  static XLogRecPtr flushPtr = 0;
1078  XLogRecPtr applyPtr;
1079  static TimestampTz sendTime = 0;
1080  TimestampTz now;
1081 
1082  /*
1083  * If the user doesn't want status to be reported to the primary, be sure
1084  * to exit before doing anything at all.
1085  */
1086  if (!force && wal_receiver_status_interval <= 0)
1087  return;
1088 
1089  /* Get current timestamp. */
1090  now = GetCurrentTimestamp();
1091 
1092  /*
1093  * We can compare the write and flush positions to the last message we
1094  * sent without taking any lock, but the apply position requires a spin
1095  * lock, so we don't check that unless something else has changed or 10
1096  * seconds have passed. This means that the apply WAL location will
1097  * appear, from the primary's point of view, to lag slightly, but since
1098  * this is only for reporting purposes and only on idle systems, that's
1099  * probably OK.
1100  */
1101  if (!force
1102  && writePtr == LogstreamResult.Write
1103  && flushPtr == LogstreamResult.Flush
1104  && !TimestampDifferenceExceeds(sendTime, now,
1105  wal_receiver_status_interval * 1000))
1106  return;
1107  sendTime = now;
1108 
1109  /* Construct a new message */
1110  writePtr = LogstreamResult.Write;
1111  flushPtr = LogstreamResult.Flush;
1112  applyPtr = GetXLogReplayRecPtr(NULL);
1113 
1114  resetStringInfo(&reply_message);
1115  pq_sendbyte(&reply_message, 'r');
1116  pq_sendint64(&reply_message, writePtr);
1117  pq_sendint64(&reply_message, flushPtr);
1118  pq_sendint64(&reply_message, applyPtr);
1119  pq_sendint64(&reply_message, GetCurrentTimestamp());
1120  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1121 
1122  /* Send it */
1123  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1124  LSN_FORMAT_ARGS(writePtr),
1125  LSN_FORMAT_ARGS(flushPtr),
1126  LSN_FORMAT_ARGS(applyPtr),
1127  requestReply ? " (reply requested)" : "");
1128 
1129  walrcv_send(wrconn, reply_message.data, reply_message.len);
1130 }

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, walrcv_send, and wrconn.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 884 of file walreceiver.c.

885 {
886  int startoff;
887  int byteswritten;
888 
889  Assert(tli != 0);
890 
891  while (nbytes > 0)
892  {
893  int segbytes;
894 
895  /* Close the current segment if it's completed */
896  if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
897  XLogWalRcvClose(recptr, tli);
898 
899  if (recvFile < 0)
900  {
901  /* Create/use new log file */
902  XLByteToSeg(recptr, recvSegNo, wal_segment_size);
903  recvFile = XLogFileInit(recvSegNo, tli);
904  recvFileTLI = tli;
905  }
906 
907  /* Calculate the start offset of the received logs */
908  startoff = XLogSegmentOffset(recptr, wal_segment_size);
909 
910  if (startoff + nbytes > wal_segment_size)
911  segbytes = wal_segment_size - startoff;
912  else
913  segbytes = nbytes;
914 
915  /* OK to write the logs */
916  errno = 0;
917 
918  byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
919  if (byteswritten <= 0)
920  {
921  char xlogfname[MAXFNAMELEN];
922  int save_errno;
923 
924  /* if write didn't set errno, assume no disk space */
925  if (errno == 0)
926  errno = ENOSPC;
927 
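928  save_errno = errno;
929  XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
930  errno = save_errno;
931  ereport(PANIC,
932  (errcode_for_file_access(),
933  errmsg("could not write to log segment %s "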
934  "at offset %u, length %lu: %m",
935  xlogfname, startoff, (unsigned long) segbytes)));
936  }
937 
938  /* Update state for write */
939  recptr += byteswritten;
940 
941  nbytes -= byteswritten;
942  buf += byteswritten;
943 
944  LogstreamResult.Write = recptr;
945  }
946 
947  /* Update shared-memory status */
948  pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
949 
950  /*
951  * Close the current segment if it's fully written up in the last cycle of
952  * the loop, to create its archive notification file soon. Otherwise WAL
953  * archiving of the segment will be delayed until any data in the next
954  * segment is received and written.
955  */
956  if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
957  XLogWalRcvClose(recptr, tli);
958 }
ssize_t pg_pwrite(int fd, const void *buf, size_t nbyte, off_t offset)
Definition: pwrite.c:27
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:1022
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition: xlog.c:3106
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert(), buf, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite(), recvFile, recvFileTLI, recvSegNo, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName, XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 116 of file walreceiver.c.

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 92 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 120 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvProcessMsg().

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 105 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 106 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 107 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

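StringInfoData reply_message
static

Definition at line 119 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().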

◆ wal_receiver_status_interval

int wal_receiver_status_interval

Definition at line 90 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ wal_receiver_timeout

int wal_receiver_timeout

Definition at line 91 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 96 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 115 of file walreceiver.c.