PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)
 

Typedefs

typedef enum WalRcvWakeupReason WalRcvWakeupReason
 

Enumerations

enum  WalRcvWakeupReason { WALRCV_WAKEUP_TERMINATE , WALRCV_WAKEUP_PING , WALRCV_WAKEUP_REPLY }
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason, TimestampTz now)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConnwrconn = NULL
 
WalReceiverFunctionsTypeWalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
LogstreamResult
 
static TimestampTz wakeup [NUM_WALRCV_WAKEUPS]
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 

Macro Definition Documentation

◆ NUM_WALRCV_WAKEUPS

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)

Definition at line 126 of file walreceiver.c.

Typedef Documentation

◆ WalRcvWakeupReason

Enumeration Type Documentation

◆ WalRcvWakeupReason

Enumerator
WALRCV_WAKEUP_TERMINATE 
WALRCV_WAKEUP_PING 
WALRCV_WAKEUP_REPLY 

Definition at line 120 of file walreceiver.c.

121 {
125  WALRCV_WAKEUP_HSFEEDBACK
126 #define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
WalRcvWakeupReason
Definition: walreceiver.c:121
@ WALRCV_WAKEUP_TERMINATE
Definition: walreceiver.c:122
@ WALRCV_WAKEUP_REPLY
Definition: walreceiver.c:124
@ WALRCV_WAKEUP_PING
Definition: walreceiver.c:123

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1396 of file walreceiver.c.

1397 {
1398  TupleDesc tupdesc;
1399  Datum *values;
1400  bool *nulls;
1401  int pid;
1402  bool ready_to_display;
1404  XLogRecPtr receive_start_lsn;
1405  TimeLineID receive_start_tli;
1406  XLogRecPtr written_lsn;
1407  XLogRecPtr flushed_lsn;
1408  TimeLineID received_tli;
1409  TimestampTz last_send_time;
1410  TimestampTz last_receipt_time;
1411  XLogRecPtr latest_end_lsn;
1412  TimestampTz latest_end_time;
1413  char sender_host[NI_MAXHOST];
1414  int sender_port = 0;
1415  char slotname[NAMEDATALEN];
1416  char conninfo[MAXCONNINFO];
1417 
1418  /* Take a lock to ensure value consistency */
1420  pid = (int) WalRcv->pid;
1421  ready_to_display = WalRcv->ready_to_display;
1423  receive_start_lsn = WalRcv->receiveStart;
1424  receive_start_tli = WalRcv->receiveStartTLI;
1425  flushed_lsn = WalRcv->flushedUpto;
1426  received_tli = WalRcv->receivedTLI;
1427  last_send_time = WalRcv->lastMsgSendTime;
1428  last_receipt_time = WalRcv->lastMsgReceiptTime;
1429  latest_end_lsn = WalRcv->latestWalEnd;
1430  latest_end_time = WalRcv->latestWalEndTime;
1431  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1432  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1433  sender_port = WalRcv->sender_port;
1434  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1436 
1437  /*
1438  * No WAL receiver (or not ready yet), just return a tuple with NULL
1439  * values
1440  */
1441  if (pid == 0 || !ready_to_display)
1442  PG_RETURN_NULL();
1443 
1444  /*
1445  * Read "writtenUpto" without holding a spinlock. Note that it may not be
1446  * consistent with the other shared variables of the WAL receiver
1447  * protected by a spinlock, but this should not be used for data integrity
1448  * checks.
1449  */
1450  written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1451 
1452  /* determine result type */
1453  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1454  elog(ERROR, "return type must be a row type");
1455 
1456  values = palloc0(sizeof(Datum) * tupdesc->natts);
1457  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1458 
1459  /* Fetch values */
1460  values[0] = Int32GetDatum(pid);
1461 
1462  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1463  {
1464  /*
1465  * Only superusers and roles with privileges of pg_read_all_stats can
1466  * see details. Other users only get the pid value to know whether it
1467  * is a WAL receiver, but no details.
1468  */
1469  memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1470  }
1471  else
1472  {
1474 
1475  if (XLogRecPtrIsInvalid(receive_start_lsn))
1476  nulls[2] = true;
1477  else
1478  values[2] = LSNGetDatum(receive_start_lsn);
1479  values[3] = Int32GetDatum(receive_start_tli);
1480  if (XLogRecPtrIsInvalid(written_lsn))
1481  nulls[4] = true;
1482  else
1483  values[4] = LSNGetDatum(written_lsn);
1484  if (XLogRecPtrIsInvalid(flushed_lsn))
1485  nulls[5] = true;
1486  else
1487  values[5] = LSNGetDatum(flushed_lsn);
1488  values[6] = Int32GetDatum(received_tli);
1489  if (last_send_time == 0)
1490  nulls[7] = true;
1491  else
1492  values[7] = TimestampTzGetDatum(last_send_time);
1493  if (last_receipt_time == 0)
1494  nulls[8] = true;
1495  else
1496  values[8] = TimestampTzGetDatum(last_receipt_time);
1497  if (XLogRecPtrIsInvalid(latest_end_lsn))
1498  nulls[9] = true;
1499  else
1500  values[9] = LSNGetDatum(latest_end_lsn);
1501  if (latest_end_time == 0)
1502  nulls[10] = true;
1503  else
1504  values[10] = TimestampTzGetDatum(latest_end_time);
1505  if (*slotname == '\0')
1506  nulls[11] = true;
1507  else
1508  values[11] = CStringGetTextDatum(slotname);
1509  if (*sender_host == '\0')
1510  nulls[12] = true;
1511  else
1512  values[12] = CStringGetTextDatum(sender_host);
1513  if (sender_port == 0)
1514  nulls[13] = true;
1515  else
1516  values[13] = Int32GetDatum(sender_port);
1517  if (*conninfo == '\0')
1518  nulls[14] = true;
1519  else
1520  values[14] = CStringGetTextDatum(conninfo);
1521  }
1522 
1523  /* Returns the record as Datum */
1525 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:4961
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:424
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:94
int64 TimestampTz
Definition: timestamp.h:39
#define ERROR
Definition: elog.h:39
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1108
void * palloc0(Size size)
Definition: mcxt.c:1257
Oid GetUserId(void)
Definition: miscinit.c:509
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uintptr_t Datum
Definition: postgres.h:64
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:102
XLogRecPtr latestWalEnd
Definition: walreceiver.h:107
TimeLineID receiveStartTLI
Definition: walreceiver.h:78
TimeLineID receivedTLI
Definition: walreceiver.h:88
char slotname[NAMEDATALEN]
Definition: walreceiver.h:127
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:120
pid_t pid
Definition: walreceiver.h:66
XLogRecPtr receiveStart
Definition: walreceiver.h:77
XLogRecPtr flushedUpto
Definition: walreceiver.h:87
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:155
TimestampTz lastMsgSendTime
Definition: walreceiver.h:101
WalRcvState walRcvState
Definition: walreceiver.h:67
TimestampTz latestWalEndTime
Definition: walreceiver.h:108
bool ready_to_display
Definition: walreceiver.h:136
int sender_port
Definition: walreceiver.h:121
slock_t mutex
Definition: walreceiver.h:147
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:114
Definition: regguts.h:323
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1371
#define MAXCONNINFO
Definition: walreceiver.h:39
WalRcvState
Definition: walreceiver.h:48
WalRcvData * WalRcv
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59

References WalRcvData::conninfo, CStringGetTextDatum, elog(), ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum(), Int32GetDatum(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum(), MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum(), TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 166 of file walreceiver.c.

167 {
168  /*
169  * Although walreceiver interrupt handling doesn't use the same scheme as
170  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
171  * any incoming signals on Win32, and also to make sure we process any
172  * barrier events.
173  */
175 
177  {
178  ereport(FATAL,
179  (errcode(ERRCODE_ADMIN_SHUTDOWN),
180  errmsg("terminating walreceiver process due to administrator command")));
181  }
182 }
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define FATAL
Definition: elog.h:41
#define ereport(elevel,...)
Definition: elog.h:149
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and ShutdownRequestPending.

Referenced by libpqrcv_PQgetResult(), libpqrcv_processTuples(), WalRcvWaitForStartPosition(), and WalReceiverMain().

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1260 of file walreceiver.c.

1261 {
1262  WalRcvData *walrcv = WalRcv;
1263  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1264 
1265  /* Update shared-memory status */
1266  SpinLockAcquire(&walrcv->mutex);
1267  if (walrcv->latestWalEnd < walEnd)
1268  walrcv->latestWalEndTime = sendTime;
1269  walrcv->latestWalEnd = walEnd;
1270  walrcv->lastMsgSendTime = sendTime;
1271  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1272  SpinLockRelease(&walrcv->mutex);
1273 
1275  {
1276  char *sendtime;
1277  char *receipttime;
1278  int applyDelay;
1279 
1280  /* Copy because timestamptz_to_str returns a static buffer */
1281  sendtime = pstrdup(timestamptz_to_str(sendTime));
1282  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1283  applyDelay = GetReplicationApplyDelay();
1284 
1285  /* apply delay is not available */
1286  if (applyDelay == -1)
1287  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1288  sendtime,
1289  receipttime,
1291  else
1292  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1293  sendtime,
1294  receipttime,
1295  applyDelay,
1297 
1298  pfree(sendtime);
1299  pfree(receipttime);
1300  }
1301 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1782
bool message_level_is_interesting(int elevel)
Definition: elog.c:277
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:1644
void pfree(void *pointer)
Definition: mcxt.c:1456
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog(), GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, message_level_is_interesting(), WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvComputeNextWakeup()

static void WalRcvComputeNextWakeup ( WalRcvWakeupReason  reason,
TimestampTz  now 
)
static

Definition at line 1312 of file walreceiver.c.

1313 {
1314  switch (reason)
1315  {
1317  if (wal_receiver_timeout <= 0)
1318  wakeup[reason] = TIMESTAMP_INFINITY;
1319  else
1321  break;
1322  case WALRCV_WAKEUP_PING:
1323  if (wal_receiver_timeout <= 0)
1324  wakeup[reason] = TIMESTAMP_INFINITY;
1325  else
1327  break;
1328  case WALRCV_WAKEUP_HSFEEDBACK:
1330  wakeup[reason] = TIMESTAMP_INFINITY;
1331  else
1333  break;
1334  case WALRCV_WAKEUP_REPLY:
1336  wakeup[reason] = TIMESTAMP_INFINITY;
1337  else
1339  break;
1340  /* there's intentionally no default: here */
1341  }
1342 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
#define TIMESTAMP_INFINITY
Definition: timestamp.h:150
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define TimestampTzPlusSeconds(tz, s)
Definition: timestamp.h:86
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
Definition: walreceiver.c:132
bool hot_standby_feedback
Definition: walreceiver.c:92
int wal_receiver_status_interval
Definition: walreceiver.c:90
int wal_receiver_timeout
Definition: walreceiver.c:91

References hot_standby_feedback, now(), TIMESTAMP_INFINITY, TimestampTzPlusMilliseconds, TimestampTzPlusSeconds, wakeup, wal_receiver_status_interval, wal_receiver_timeout, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, and WALRCV_WAKEUP_TERMINATE.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 800 of file walreceiver.c.

801 {
802  WalRcvData *walrcv = WalRcv;
803  TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
804 
805  Assert(*startpointTLI_p != 0);
806 
807  /* Ensure that all WAL records received are flushed to disk */
808  XLogWalRcvFlush(true, *startpointTLI_p);
809 
810  /* Mark ourselves inactive in shared memory */
811  SpinLockAcquire(&walrcv->mutex);
812  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
813  walrcv->walRcvState == WALRCV_RESTARTING ||
814  walrcv->walRcvState == WALRCV_STARTING ||
815  walrcv->walRcvState == WALRCV_WAITING ||
816  walrcv->walRcvState == WALRCV_STOPPING);
817  Assert(walrcv->pid == MyProcPid);
818  walrcv->walRcvState = WALRCV_STOPPED;
819  walrcv->pid = 0;
820  walrcv->ready_to_display = false;
821  walrcv->latch = NULL;
822  SpinLockRelease(&walrcv->mutex);
823 
825 
826  /* Terminate the connection gracefully. */
827  if (wrconn != NULL)
829 
830  /* Wake up the startup process to notice promptly that we're gone */
831  WakeupRecovery();
832 }
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition: globals.c:44
Assert(fmt[strlen(fmt) - 1] !='\n')
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
Latch * latch
Definition: walreceiver.h:145
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:68
static WalReceiverConn * wrconn
Definition: walreceiver.c:95
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
Definition: walreceiver.c:988
@ WALRCV_STARTING
Definition: walreceiver.h:50
@ WALRCV_STOPPED
Definition: walreceiver.h:49
@ WALRCV_RESTARTING
Definition: walreceiver.h:54
@ WALRCV_STREAMING
Definition: walreceiver.h:52
@ WALRCV_WAITING
Definition: walreceiver.h:53
@ WALRCV_STOPPING
Definition: walreceiver.h:55
#define walrcv_disconnect(conn)
Definition: walreceiver.h:438
void WakeupRecovery(void)

References arg, Assert(), ConditionVariableBroadcast(), DatumGetPointer(), WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 744 of file walreceiver.c.

745 {
746  TimeLineID tli;
747 
748  for (tli = first; tli <= last; tli++)
749  {
750  /* there's no history file for timeline 1 */
751  if (tli != 1 && !existsTimeLineHistory(tli))
752  {
753  char *fname;
754  char *content;
755  int len;
756  char expectedfname[MAXFNAMELEN];
757 
758  ereport(LOG,
759  (errmsg("fetching timeline history file for timeline %u from primary server",
760  tli)));
761 
762  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
763 
764  /*
765  * Check that the filename on the primary matches what we
766  * calculated ourselves. This is just a sanity check, it should
767  * always match.
768  */
769  TLHistoryFileName(expectedfname, tli);
770  if (strcmp(fname, expectedfname) != 0)
771  ereport(ERROR,
772  (errcode(ERRCODE_PROTOCOL_VIOLATION),
773  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
774  tli)));
775 
776  /*
777  * Write the file to pg_wal.
778  */
779  writeTimeLineHistoryFile(tli, content, len);
780 
781  /*
782  * Mark the streamed history file as ready for archiving if
783  * archive_mode is always.
784  */
786  XLogArchiveForceDone(fname);
787  else
788  XLogArchiveNotify(fname);
789 
790  pfree(fname);
791  pfree(content);
792  }
793  }
794 }
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:463
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1156
#define LOG
Definition: elog.h:31
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:422
int XLogArchiveMode
Definition: xlog.c:122
@ ARCHIVE_MODE_ALWAYS
Definition: xlog.h:62
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:504
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:438

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName(), walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1353 of file walreceiver.c.

1354 {
1355  Latch *latch;
1356 
1357  WalRcv->force_reply = true;
1358  /* fetching the latch pointer might not be atomic, so use spinlock */
1360  latch = WalRcv->latch;
1362  if (latch)
1363  SetLatch(latch);
1364 }
void SetLatch(Latch *latch)
Definition: latch.c:605
Definition: latch.h:111
sig_atomic_t force_reply
Definition: walreceiver.h:162

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1371 of file walreceiver.c.

1372 {
1373  switch (state)
1374  {
1375  case WALRCV_STOPPED:
1376  return "stopped";
1377  case WALRCV_STARTING:
1378  return "starting";
1379  case WALRCV_STREAMING:
1380  return "streaming";
1381  case WALRCV_WAITING:
1382  return "waiting";
1383  case WALRCV_RESTARTING:
1384  return "restarting";
1385  case WALRCV_STOPPING:
1386  return "stopping";
1387  }
1388  return "UNKNOWN";
1389 }

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 664 of file walreceiver.c.

665 {
666  WalRcvData *walrcv = WalRcv;
667  int state;
668 
669  SpinLockAcquire(&walrcv->mutex);
670  state = walrcv->walRcvState;
671  if (state != WALRCV_STREAMING)
672  {
673  SpinLockRelease(&walrcv->mutex);
674  if (state == WALRCV_STOPPING)
675  proc_exit(0);
676  else
677  elog(FATAL, "unexpected walreceiver state");
678  }
679  walrcv->walRcvState = WALRCV_WAITING;
681  walrcv->receiveStartTLI = 0;
682  SpinLockRelease(&walrcv->mutex);
683 
684  set_ps_display("idle");
685 
686  /*
687  * nudge startup process to notice that we've stopped streaming and are
688  * now waiting for instructions.
689  */
690  WakeupRecovery();
691  for (;;)
692  {
694 
696 
697  SpinLockAcquire(&walrcv->mutex);
698  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
699  walrcv->walRcvState == WALRCV_WAITING ||
700  walrcv->walRcvState == WALRCV_STOPPING);
701  if (walrcv->walRcvState == WALRCV_RESTARTING)
702  {
703  /*
704  * No need to handle changes in primary_conninfo or
705  * primary_slot_name here. Startup process will signal us to
706  * terminate in case those change.
707  */
708  *startpoint = walrcv->receiveStart;
709  *startpointTLI = walrcv->receiveStartTLI;
710  walrcv->walRcvState = WALRCV_STREAMING;
711  SpinLockRelease(&walrcv->mutex);
712  break;
713  }
714  if (walrcv->walRcvState == WALRCV_STOPPING)
715  {
716  /*
717  * We should've received SIGTERM if the startup process wants us
718  * to die, but might as well check it here too.
719  */
720  SpinLockRelease(&walrcv->mutex);
721  exit(1);
722  }
723  SpinLockRelease(&walrcv->mutex);
724 
726  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
727  }
728 
730  {
731  char activitymsg[50];
732 
733  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
734  LSN_FORMAT_ARGS(*startpoint));
735  set_ps_display(activitymsg);
736  }
737 }
struct Latch * MyLatch
Definition: globals.c:58
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:697
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:490
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
exit(1)
#define snprintf
Definition: port.h:238
bool update_process_title
Definition: ps_status.c:31
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:166
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), elog(), exit(), FATAL, InvalidXLogRecPtr, LSN_FORMAT_ARGS, WalRcvData::mutex, MyLatch, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 187 of file walreceiver.c.

188 {
189  char conninfo[MAXCONNINFO];
190  char *tmp_conninfo;
191  char slotname[NAMEDATALEN];
192  bool is_temp_slot;
193  XLogRecPtr startpoint;
194  TimeLineID startpointTLI;
195  TimeLineID primaryTLI;
196  bool first_stream;
197  WalRcvData *walrcv = WalRcv;
199  char *err;
200  char *sender_host = NULL;
201  int sender_port = 0;
202 
203  /*
204  * WalRcv should be set up already (if we are a backend, we inherit this
205  * by fork() or EXEC_BACKEND mechanism from the postmaster).
206  */
207  Assert(walrcv != NULL);
208 
209  /*
210  * Mark walreceiver as running in shared memory.
211  *
212  * Do this as early as possible, so that if we fail later on, we'll set
213  * state to STOPPED. If we die before this, the startup process will keep
214  * waiting for us to start up, until it times out.
215  */
216  SpinLockAcquire(&walrcv->mutex);
217  Assert(walrcv->pid == 0);
218  switch (walrcv->walRcvState)
219  {
220  case WALRCV_STOPPING:
221  /* If we've already been requested to stop, don't start up. */
222  walrcv->walRcvState = WALRCV_STOPPED;
223  /* fall through */
224 
225  case WALRCV_STOPPED:
226  SpinLockRelease(&walrcv->mutex);
228  proc_exit(1);
229  break;
230 
231  case WALRCV_STARTING:
232  /* The usual case */
233  break;
234 
235  case WALRCV_WAITING:
236  case WALRCV_STREAMING:
237  case WALRCV_RESTARTING:
238  default:
239  /* Shouldn't happen */
240  SpinLockRelease(&walrcv->mutex);
241  elog(PANIC, "walreceiver still running according to shared memory state");
242  }
243  /* Advertise our PID so that the startup process can kill us */
244  walrcv->pid = MyProcPid;
245  walrcv->walRcvState = WALRCV_STREAMING;
246 
247  /* Fetch information required to start streaming */
248  walrcv->ready_to_display = false;
249  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
250  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
251  is_temp_slot = walrcv->is_temp_slot;
252  startpoint = walrcv->receiveStart;
253  startpointTLI = walrcv->receiveStartTLI;
254 
255  /*
256  * At most one of is_temp_slot and slotname can be set; otherwise,
257  * RequestXLogStreaming messed up.
258  */
259  Assert(!is_temp_slot || (slotname[0] == '\0'));
260 
261  /* Initialise to a sanish value */
263  walrcv->lastMsgSendTime =
264  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
265 
266  /* Report the latch to use to awaken this process */
267  walrcv->latch = &MyProc->procLatch;
268 
269  SpinLockRelease(&walrcv->mutex);
270 
272 
273  /* Arrange to clean up at walreceiver exit */
274  on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
275 
276  /* Properly accept or ignore signals the postmaster might send us */
277  pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
278  * file */
279  pqsignal(SIGINT, SIG_IGN);
280  pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
281  /* SIGQUIT handler was already set up by InitPostmasterChild */
286 
287  /* Reset some signals that are accepted by postmaster but not here */
289 
290  /* Load the libpq-specific functions */
291  load_file("libpqwalreceiver", false);
292  if (WalReceiverFunctions == NULL)
293  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
294 
295  /* Unblock signals (they were blocked when the postmaster forked us) */
296  sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
297 
298  /* Establish the connection to the primary for XLOG streaming */
299  wrconn = walrcv_connect(conninfo, false, false,
300  cluster_name[0] ? cluster_name : "walreceiver",
301  &err);
302  if (!wrconn)
303  ereport(ERROR,
304  (errcode(ERRCODE_CONNECTION_FAILURE),
305  errmsg("could not connect to the primary server: %s", err)));
306 
307  /*
308  * Save user-visible connection string. This clobbers the original
309  * conninfo, for security. Also save host and port of the sender server
310  * this walreceiver is connected to.
311  */
312  tmp_conninfo = walrcv_get_conninfo(wrconn);
313  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
314  SpinLockAcquire(&walrcv->mutex);
315  memset(walrcv->conninfo, 0, MAXCONNINFO);
316  if (tmp_conninfo)
317  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
318 
319  memset(walrcv->sender_host, 0, NI_MAXHOST);
320  if (sender_host)
321  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
322 
323  walrcv->sender_port = sender_port;
324  walrcv->ready_to_display = true;
325  SpinLockRelease(&walrcv->mutex);
326 
327  if (tmp_conninfo)
328  pfree(tmp_conninfo);
329 
330  if (sender_host)
331  pfree(sender_host);
332 
333  first_stream = true;
334  for (;;)
335  {
336  char *primary_sysid;
337  char standby_sysid[32];
339 
340  /*
341  * Check that we're connected to a valid server using the
342  * IDENTIFY_SYSTEM replication command.
343  */
344  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
345 
346  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
348  if (strcmp(primary_sysid, standby_sysid) != 0)
349  {
350  ereport(ERROR,
351  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
352  errmsg("database system identifier differs between the primary and standby"),
353  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
354  primary_sysid, standby_sysid)));
355  }
356 
357  /*
358  * Confirm that the current timeline of the primary is the same or
359  * ahead of ours.
360  */
361  if (primaryTLI < startpointTLI)
362  ereport(ERROR,
363  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
364  errmsg("highest timeline %u of the primary is behind recovery timeline %u",
365  primaryTLI, startpointTLI)));
366 
367  /*
368  * Get any missing history files. We do this always, even when we're
369  * not interested in that timeline, so that if we're promoted to
370  * become the primary later on, we don't select the same timeline that
371  * was already used in the current primary. This isn't bullet-proof -
372  * you'll need some external software to manage your cluster if you
373  * need to ensure that a unique timeline id is chosen in every case,
374  * but let's avoid the confusion of timeline id collisions where we
375  * can.
376  */
377  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
378 
379  /*
380  * Create temporary replication slot if requested, and update slot
381  * name in shared memory. (Note the slot name cannot already be set
382  * in this case.)
383  */
384  if (is_temp_slot)
385  {
386  snprintf(slotname, sizeof(slotname),
387  "pg_walreceiver_%lld",
388  (long long int) walrcv_get_backend_pid(wrconn));
389 
390  walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
391 
392  SpinLockAcquire(&walrcv->mutex);
393  strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
394  SpinLockRelease(&walrcv->mutex);
395  }
396 
397  /*
398  * Start streaming.
399  *
400  * We'll try to start at the requested starting point and timeline,
401  * even if it's different from the server's latest timeline. In case
402  * we've already reached the end of the old timeline, the server will
403  * finish the streaming immediately, and we will go back to await
404  * orders from the startup process. If recovery_target_timeline is
405  * 'latest', the startup process will scan pg_wal and find the new
406  * history file, bump recovery target timeline, and ask us to restart
407  * on the new timeline.
408  */
409  options.logical = false;
410  options.startpoint = startpoint;
411  options.slotname = slotname[0] != '\0' ? slotname : NULL;
412  options.proto.physical.startpointTLI = startpointTLI;
414  {
415  if (first_stream)
416  ereport(LOG,
417  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
418  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
419  else
420  ereport(LOG,
421  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
422  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
423  first_stream = false;
424 
425  /* Initialize LogstreamResult and buffers for processing messages */
429 
430  /* Initialize nap wakeup times. */
432  for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
434 
435  /* Send initial reply/feedback messages. */
436  XLogWalRcvSendReply(true, false);
438 
439  /* Loop until end-of-streaming or error */
440  for (;;)
441  {
442  char *buf;
443  int len;
444  bool endofwal = false;
445  pgsocket wait_fd = PGINVALID_SOCKET;
446  int rc;
447  TimestampTz nextWakeup;
448  long nap;
449 
450  /*
451  * Exit walreceiver if we're not in recovery. This should not
452  * happen, but cross-check the status here.
453  */
454  if (!RecoveryInProgress())
455  ereport(FATAL,
456  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
457  errmsg("cannot continue WAL streaming, recovery has already ended")));
458 
459  /* Process any requests or signals received recently */
461 
463  {
464  ConfigReloadPending = false;
466  /* recompute wakeup times */
468  for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
471  }
472 
473  /* See if we can read data immediately */
474  len = walrcv_receive(wrconn, &buf, &wait_fd);
475  if (len != 0)
476  {
477  /*
478  * Process the received data, and any subsequent data we
479  * can read without blocking.
480  */
481  for (;;)
482  {
483  if (len > 0)
484  {
485  /*
486  * Something was received from primary, so adjust
487  * the ping and terminate wakeup times.
488  */
491  now);
493  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
494  startpointTLI);
495  }
496  else if (len == 0)
497  break;
498  else if (len < 0)
499  {
500  ereport(LOG,
501  (errmsg("replication terminated by primary server"),
502  errdetail("End of WAL reached on timeline %u at %X/%X.",
503  startpointTLI,
505  endofwal = true;
506  break;
507  }
508  len = walrcv_receive(wrconn, &buf, &wait_fd);
509  }
510 
511  /* Let the primary know that we received some data. */
512  XLogWalRcvSendReply(false, false);
513 
514  /*
515  * If we've written some records, flush them to disk and
516  * let the startup process and primary server know about
517  * them.
518  */
519  XLogWalRcvFlush(false, startpointTLI);
520  }
521 
522  /* Check if we need to exit the streaming loop. */
523  if (endofwal)
524  break;
525 
526  /* Find the soonest wakeup time, to limit our nap. */
527  nextWakeup = TIMESTAMP_INFINITY;
528  for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
529  nextWakeup = Min(wakeup[i], nextWakeup);
530 
531  /* Calculate the nap time, clamping as necessary. */
533  nap = TimestampDifferenceMilliseconds(now, nextWakeup);
534 
535  /*
536  * Ideally we would reuse a WaitEventSet object repeatedly
537  * here to avoid the overheads of WaitLatchOrSocket on epoll
538  * systems, but we can't be sure that libpq (or any other
539  * walreceiver implementation) has the same socket (even if
540  * the fd is the same number, it may have been closed and
541  * reopened since the last time). In future, if there is a
542  * function for removing sockets from WaitEventSet, then we
543  * could add and remove just the socket each time, potentially
544  * avoiding some system calls.
545  */
546  Assert(wait_fd != PGINVALID_SOCKET);
550  wait_fd,
551  nap,
552  WAIT_EVENT_WAL_RECEIVER_MAIN);
553  if (rc & WL_LATCH_SET)
554  {
557 
558  if (walrcv->force_reply)
559  {
560  /*
561  * The recovery process has asked us to send apply
562  * feedback now. Make sure the flag is really set to
563  * false in shared memory before sending the reply, so
564  * we don't miss a new request for a reply.
565  */
566  walrcv->force_reply = false;
568  XLogWalRcvSendReply(true, false);
569  }
570  }
571  if (rc & WL_TIMEOUT)
572  {
573  /*
574  * We didn't receive anything new. If we haven't heard
575  * anything from the server for more than
576  * wal_receiver_timeout / 2, ping the server. Also, if
577  * it's been longer than wal_receiver_status_interval
578  * since the last update we sent, send a status update to
579  * the primary anyway, to report any progress in applying
580  * WAL.
581  */
582  bool requestReply = false;
583 
584  /*
585  * Check if time since last receive from primary has
586  * reached the configured limit.
587  */
590  ereport(ERROR,
591  (errcode(ERRCODE_CONNECTION_FAILURE),
592  errmsg("terminating walreceiver due to timeout")));
593 
594  /*
595  * If we didn't receive anything new for half of receiver
596  * replication timeout, then ping the server.
597  */
598  if (now >= wakeup[WALRCV_WAKEUP_PING])
599  {
600  requestReply = true;
602  }
603 
604  XLogWalRcvSendReply(requestReply, requestReply);
606  }
607  }
608 
609  /*
610  * The backend finished streaming. Exit streaming COPY-mode from
611  * our side, too.
612  */
613  walrcv_endstreaming(wrconn, &primaryTLI);
614 
615  /*
616  * If the server had switched to a new timeline that we didn't
617  * know about when we began streaming, fetch its timeline history
618  * file now.
619  */
620  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
621  }
622  else
623  ereport(LOG,
624  (errmsg("primary server contains no more WAL on requested timeline %u",
625  startpointTLI)));
626 
627  /*
628  * End of WAL reached on the requested timeline. Close the last
629  * segment, and await for new orders from the startup process.
630  */
631  if (recvFile >= 0)
632  {
633  char xlogfname[MAXFNAMELEN];
634 
635  XLogWalRcvFlush(false, startpointTLI);
637  if (close(recvFile) != 0)
638  ereport(PANIC,
640  errmsg("could not close WAL segment %s: %m",
641  xlogfname)));
642 
643  /*
644  * Create .done file forcibly to prevent the streamed segment from
645  * being archived later.
646  */
648  XLogArchiveForceDone(xlogfname);
649  else
650  XLogArchiveNotify(xlogfname);
651  }
652  recvFile = -1;
653 
654  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
655  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
656  }
657  /* not reached */
658 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:433
#define pg_memory_barrier()
Definition: atomics.h:140
sigset_t UnBlockSig
Definition: pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1695
#define Min(x, y)
Definition: c.h:993
#define UINT64_FORMAT
Definition: c.h:538
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
int errcode_for_file_access(void)
Definition: elog.c:881
int errdetail(const char *fmt,...)
Definition: elog.c:1202
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
void err(int eval, const char *fmt,...)
Definition: err.c:43
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
char * cluster_name
Definition: guc_tables.c:535
#define close(a)
Definition: win32.h:12
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:109
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:538
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
static char ** options
static char * buf
Definition: pg_test_fsync.c:67
pqsigfunc pqsignal(int signo, pqsigfunc func)
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:639
PGPROC * MyProc
Definition: proc.c:66
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Latch procLatch
Definition: proc.h:170
bool is_temp_slot
Definition: walreceiver.h:133
#define NUM_WALRCV_WAKEUPS
Definition: walreceiver.c:126
static StringInfoData reply_message
Definition: walreceiver.c:134
static int recvFile
Definition: walreceiver.c:103
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:744
static StringInfoData incoming_message
Definition: walreceiver.c:135
static struct @18 LogstreamResult
static TimeLineID recvFileTLI
Definition: walreceiver.c:104
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:96
static XLogSegNo recvSegNo
Definition: walreceiver.c:105
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1164
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:664
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
Definition: walreceiver.c:838
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
Definition: walreceiver.c:1312
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:800
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1095
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
Definition: walreceiver.h:432
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:424
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:416
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:414
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:426
#define walrcv_connect(conninfo, logical, must_use_password, appname, err)
Definition: walreceiver.h:410
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:418
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:434
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:428
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGALRM
Definition: win32_port.h:174
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4203
bool RecoveryInProgress(void)
Definition: xlog.c:5948
int wal_segment_size
Definition: xlog.c:146
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert(), buf, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, WalRcvData::conninfo, DEBUG1, elog(), ereport, err(), errcode(), errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), i, incoming_message, initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, Min, WalRcvData::mutex, MyLatch, MyProc, MyProcPid, NAMEDATALEN, now(), NUM_WALRCV_WAKEUPS, on_shmem_exit(), options, PANIC, pfree(), pg_atomic_write_u64(), pg_memory_barrier, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvData::pid, PointerGetDatum(), pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, procsignal_sigusr1_handler(), WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, snprintf, SpinLockAcquire, SpinLockRelease, strlcpy(), TIMESTAMP_INFINITY, TimestampDifferenceMilliseconds(), UINT64_FORMAT, UnBlockSig, WaitLatchOrSocket(), wakeup, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_TERMINATE, WalRcvComputeNextWakeup(), WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, WalRcvWaitForStartPosition(), WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1043 of file walreceiver.c.

1044 {
1045  char xlogfname[MAXFNAMELEN];
1046 
1048  Assert(tli != 0);
1049 
1050  /*
1051  * fsync() and close current file before we switch to next one. We would
1052  * otherwise have to reopen this file to fsync it later
1053  */
1054  XLogWalRcvFlush(false, tli);
1055 
1057 
1058  /*
1059  * XLOG segment files will be re-read by recovery in startup process soon,
1060  * so we don't advise the OS to release cache pages associated with the
1061  * file like XLogFileClose() does.
1062  */
1063  if (close(recvFile) != 0)
1064  ereport(PANIC,
1066  errmsg("could not close WAL segment %s: %m",
1067  xlogfname)));
1068 
1069  /*
1070  * Create .done file forcibly to prevent the streamed segment from being
1071  * archived later.
1072  */
1074  XLogArchiveForceDone(xlogfname);
1075  else
1076  XLogArchiveNotify(xlogfname);
1077 
1078  recvFile = -1;
1079 }
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert(), close, ereport, errcode_for_file_access(), errmsg(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 988 of file walreceiver.c.

989 {
990  Assert(tli != 0);
991 
992  if (LogstreamResult.Flush < LogstreamResult.Write)
993  {
994  WalRcvData *walrcv = WalRcv;
995 
997 
998  LogstreamResult.Flush = LogstreamResult.Write;
999 
1000  /* Update shared-memory status */
1001  SpinLockAcquire(&walrcv->mutex);
1002  if (walrcv->flushedUpto < LogstreamResult.Flush)
1003  {
1004  walrcv->latestChunkStart = walrcv->flushedUpto;
1005  walrcv->flushedUpto = LogstreamResult.Flush;
1006  walrcv->receivedTLI = tli;
1007  }
1008  SpinLockRelease(&walrcv->mutex);
1009 
1010  /* Signal the startup process and walsender that new WAL has arrived */
1011  WakeupRecovery();
1013  WalSndWakeup(true, false);
1014 
1015  /* Report XLOG streaming progress in PS display */
1017  {
1018  char activitymsg[50];
1019 
1020  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1022  set_ps_display(activitymsg);
1023  }
1024 
1025  /* Also let the primary know that we made some progress */
1026  if (!dying)
1027  {
1028  XLogWalRcvSendReply(false, false);
1029  XLogWalRcvSendHSFeedback(false);
1030  }
1031  }
1032 }
XLogRecPtr latestChunkStart
Definition: walreceiver.h:96
#define AllowCascadeReplication()
Definition: walreceiver.h:42
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3333
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:8186

References AllowCascadeReplication, Assert(), WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, LSN_FORMAT_ARGS, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 838 of file walreceiver.c.

839 {
840  int hdrlen;
841  XLogRecPtr dataStart;
842  XLogRecPtr walEnd;
843  TimestampTz sendTime;
844  bool replyRequested;
845 
847 
848  switch (type)
849  {
850  case 'w': /* WAL records */
851  {
852  /* copy message to StringInfo */
853  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
854  if (len < hdrlen)
855  ereport(ERROR,
856  (errcode(ERRCODE_PROTOCOL_VIOLATION),
857  errmsg_internal("invalid WAL message received from primary")));
859 
860  /* read the fields */
861  dataStart = pq_getmsgint64(&incoming_message);
862  walEnd = pq_getmsgint64(&incoming_message);
863  sendTime = pq_getmsgint64(&incoming_message);
864  ProcessWalSndrMessage(walEnd, sendTime);
865 
866  buf += hdrlen;
867  len -= hdrlen;
868  XLogWalRcvWrite(buf, len, dataStart, tli);
869  break;
870  }
871  case 'k': /* Keepalive */
872  {
873  /* copy message to StringInfo */
874  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
875  if (len != hdrlen)
876  ereport(ERROR,
877  (errcode(ERRCODE_PROTOCOL_VIOLATION),
878  errmsg_internal("invalid keepalive message received from primary")));
880 
881  /* read the fields */
882  walEnd = pq_getmsgint64(&incoming_message);
883  sendTime = pq_getmsgint64(&incoming_message);
884  replyRequested = pq_getmsgbyte(&incoming_message);
885 
886  ProcessWalSndrMessage(walEnd, sendTime);
887 
888  /* If the primary requested a reply, send one immediately */
889  if (replyRequested)
890  XLogWalRcvSendReply(true, false);
891  break;
892  }
893  default:
894  ereport(ERROR,
895  (errcode(ERRCODE_PROTOCOL_VIOLATION),
896  errmsg_internal("invalid replication message type %d",
897  type)));
898  }
899 }
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:402
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:456
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:227
const char * type
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1260
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:905

References appendBinaryStringInfo(), buf, ereport, errcode(), errmsg_internal(), ERROR, incoming_message, len, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1164 of file walreceiver.c.

1165 {
1166  TimestampTz now;
1167  FullTransactionId nextFullXid;
1168  TransactionId nextXid;
1169  uint32 xmin_epoch,
1170  catalog_xmin_epoch;
1171  TransactionId xmin,
1172  catalog_xmin;
1173 
1174  /* initially true so we always send at least one feedback message */
1175  static bool primary_has_standby_xmin = true;
1176 
1177  /*
1178  * If the user doesn't want status to be reported to the primary, be sure
1179  * to exit before doing anything at all.
1180  */
1182  !primary_has_standby_xmin)
1183  return;
1184 
1185  /* Get current timestamp. */
1187 
1188  /* Send feedback at most once per wal_receiver_status_interval. */
1189  if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1190  return;
1191 
1192  /* Make sure we wake up when it's time to send feedback again. */
1193  WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
1194 
1195  /*
1196  * If Hot Standby is not yet accepting connections there is nothing to
1197  * send. Check this after the interval has expired to reduce number of
1198  * calls.
1199  *
1200  * Bailing out here also ensures that we don't send feedback until we've
1201  * read our own replication slot state, so we don't tell the primary to
1202  * discard needed xmin or catalog_xmin from any slots that may exist on
1203  * this replica.
1204  */
1205  if (!HotStandbyActive())
1206  return;
1207 
1208  /*
1209  * Make the expensive call to get the oldest xmin once we are certain
1210  * everything else has been checked.
1211  */
1213  {
1214  GetReplicationHorizons(&xmin, &catalog_xmin);
1215  }
1216  else
1217  {
1218  xmin = InvalidTransactionId;
1219  catalog_xmin = InvalidTransactionId;
1220  }
1221 
1222  /*
1223  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1224  * the epoch boundary.
1225  */
1226  nextFullXid = ReadNextFullTransactionId();
1227  nextXid = XidFromFullTransactionId(nextFullXid);
1228  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1229  catalog_xmin_epoch = xmin_epoch;
1230  if (nextXid < xmin)
1231  xmin_epoch--;
1232  if (nextXid < catalog_xmin)
1233  catalog_xmin_epoch--;
1234 
1235  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1236  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1237 
1238  /* Construct the message and send it. */
1240  pq_sendbyte(&reply_message, 'h');
1242  pq_sendint32(&reply_message, xmin);
1243  pq_sendint32(&reply_message, xmin_epoch);
1244  pq_sendint32(&reply_message, catalog_xmin);
1245  pq_sendint32(&reply_message, catalog_xmin_epoch);
1247  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1248  primary_has_standby_xmin = true;
1249  else
1250  primary_has_standby_xmin = false;
1251 }
unsigned int uint32
Definition: c.h:495
uint32 TransactionId
Definition: c.h:641
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2028
#define InvalidTransactionId
Definition: transam.h:31
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define TransactionIdIsValid(xid)
Definition: transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:430
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog(), EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReadNextFullTransactionId(), reply_message, resetStringInfo(), TransactionIdIsValid, wakeup, wal_receiver_status_interval, walrcv_send, WalRcvComputeNextWakeup(), wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1095 of file walreceiver.c.

1096 {
1097  static XLogRecPtr writePtr = 0;
1098  static XLogRecPtr flushPtr = 0;
1099  XLogRecPtr applyPtr;
1100  TimestampTz now;
1101 
1102  /*
1103  * If the user doesn't want status to be reported to the primary, be sure
1104  * to exit before doing anything at all.
1105  */
1106  if (!force && wal_receiver_status_interval <= 0)
1107  return;
1108 
1109  /* Get current timestamp. */
1111 
1112  /*
1113  * We can compare the write and flush positions to the last message we
1114  * sent without taking any lock, but the apply position requires a spin
1115  * lock, so we don't check that unless something else has changed or 10
1116  * seconds have passed. This means that the apply WAL location will
1117  * appear, from the primary's point of view, to lag slightly, but since
1118  * this is only for reporting purposes and only on idle systems, that's
1119  * probably OK.
1120  */
1121  if (!force
1122  && writePtr == LogstreamResult.Write
1123  && flushPtr == LogstreamResult.Flush
1125  return;
1126 
1127  /* Make sure we wake up when it's time to send another reply. */
1129 
1130  /* Construct a new message */
1131  writePtr = LogstreamResult.Write;
1132  flushPtr = LogstreamResult.Flush;
1133  applyPtr = GetXLogReplayRecPtr(NULL);
1134 
1136  pq_sendbyte(&reply_message, 'r');
1137  pq_sendint64(&reply_message, writePtr);
1138  pq_sendint64(&reply_message, flushPtr);
1139  pq_sendint64(&reply_message, applyPtr);
1141  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1142 
1143  /* Send it */
1144  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1145  LSN_FORMAT_ARGS(writePtr),
1146  LSN_FORMAT_ARGS(flushPtr),
1147  LSN_FORMAT_ARGS(applyPtr),
1148  requestReply ? " (reply requested)" : "");
1149 
1151 }

References StringInfoData::data, DEBUG2, elog(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_REPLY, WalRcvComputeNextWakeup(), and wrconn.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 905 of file walreceiver.c.

906 {
907  int startoff;
908  int byteswritten;
909 
910  Assert(tli != 0);
911 
912  while (nbytes > 0)
913  {
914  int segbytes;
915 
916  /* Close the current segment if it's completed */
917  if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
918  XLogWalRcvClose(recptr, tli);
919 
920  if (recvFile < 0)
921  {
922  /* Create/use new log file */
925  recvFileTLI = tli;
926  }
927 
928  /* Calculate the start offset of the received logs */
929  startoff = XLogSegmentOffset(recptr, wal_segment_size);
930 
931  if (startoff + nbytes > wal_segment_size)
932  segbytes = wal_segment_size - startoff;
933  else
934  segbytes = nbytes;
935 
936  /* OK to write the logs */
937  errno = 0;
938 
939  byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
940  if (byteswritten <= 0)
941  {
942  char xlogfname[MAXFNAMELEN];
943  int save_errno;
944 
945  /* if write didn't set errno, assume no disk space */
946  if (errno == 0)
947  errno = ENOSPC;
948 
949  save_errno = errno;
951  errno = save_errno;
952  ereport(PANIC,
954  errmsg("could not write to WAL segment %s "
955  "at offset %u, length %lu: %m",
956  xlogfname, startoff, (unsigned long) segbytes)));
957  }
958 
959  /* Update state for write */
960  recptr += byteswritten;
961 
962  nbytes -= byteswritten;
963  buf += byteswritten;
964 
965  LogstreamResult.Write = recptr;
966  }
967 
968  /* Update shared-memory status */
970 
971  /*
972  * Close the current segment if it's fully written up in the last cycle of
973  * the loop, to create its archive notification file soon. Otherwise WAL
974  * archiving of the segment will be delayed until any data in the next
975  * segment is received and written.
976  */
977  if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
978  XLogWalRcvClose(recptr, tli);
979 }
#define pg_pwrite
Definition: port.h:226
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:1043
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition: xlog.c:3113
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert(), buf, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite, recvFile, recvFileTLI, recvSegNo, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName(), XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 114 of file walreceiver.c.

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 92 of file walreceiver.c.

Referenced by WalRcvComputeNextWakeup(), and XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 135 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvProcessMsg().

◆ 

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 103 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 105 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

◆ wakeup

◆ wal_receiver_status_interval

int wal_receiver_status_interval

◆ wal_receiver_timeout

int wal_receiver_timeout

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 96 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 113 of file walreceiver.c.