PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Typedefs

typedef enum WalRcvWakeupReason WalRcvWakeupReason
 

Enumerations

enum  WalRcvWakeupReason {
  WALRCV_WAKEUP_TERMINATE , WALRCV_WAKEUP_PING , WALRCV_WAKEUP_REPLY , WALRCV_WAKEUP_HSFEEDBACK ,
  NUM_WALRCV_WAKEUPS
}
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason, TimestampTz now)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConnwrconn = NULL
 
WalReceiverFunctionsTypeWalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
LogstreamResult
 
static TimestampTz wakeup [NUM_WALRCV_WAKEUPS]
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 

Typedef Documentation

◆ WalRcvWakeupReason

Enumeration Type Documentation

◆ WalRcvWakeupReason

Enumerator
WALRCV_WAKEUP_TERMINATE 
WALRCV_WAKEUP_PING 
WALRCV_WAKEUP_REPLY 
WALRCV_WAKEUP_HSFEEDBACK 
NUM_WALRCV_WAKEUPS 

Definition at line 120 of file walreceiver.c.

121 {
WalRcvWakeupReason
Definition: walreceiver.c:121
@ NUM_WALRCV_WAKEUPS
Definition: walreceiver.c:126
@ WALRCV_WAKEUP_TERMINATE
Definition: walreceiver.c:122
@ WALRCV_WAKEUP_REPLY
Definition: walreceiver.c:124
@ WALRCV_WAKEUP_PING
Definition: walreceiver.c:123
@ WALRCV_WAKEUP_HSFEEDBACK
Definition: walreceiver.c:125

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1401 of file walreceiver.c.

1402 {
1403  TupleDesc tupdesc;
1404  Datum *values;
1405  bool *nulls;
1406  int pid;
1407  bool ready_to_display;
1409  XLogRecPtr receive_start_lsn;
1410  TimeLineID receive_start_tli;
1411  XLogRecPtr written_lsn;
1412  XLogRecPtr flushed_lsn;
1413  TimeLineID received_tli;
1414  TimestampTz last_send_time;
1415  TimestampTz last_receipt_time;
1416  XLogRecPtr latest_end_lsn;
1417  TimestampTz latest_end_time;
1418  char sender_host[NI_MAXHOST];
1419  int sender_port = 0;
1420  char slotname[NAMEDATALEN];
1421  char conninfo[MAXCONNINFO];
1422 
1423  /* Take a lock to ensure value consistency */
1425  pid = (int) WalRcv->pid;
1426  ready_to_display = WalRcv->ready_to_display;
1428  receive_start_lsn = WalRcv->receiveStart;
1429  receive_start_tli = WalRcv->receiveStartTLI;
1430  flushed_lsn = WalRcv->flushedUpto;
1431  received_tli = WalRcv->receivedTLI;
1432  last_send_time = WalRcv->lastMsgSendTime;
1433  last_receipt_time = WalRcv->lastMsgReceiptTime;
1434  latest_end_lsn = WalRcv->latestWalEnd;
1435  latest_end_time = WalRcv->latestWalEndTime;
1436  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1437  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1438  sender_port = WalRcv->sender_port;
1439  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1441 
1442  /*
1443  * No WAL receiver (or not ready yet), just return a tuple with NULL
1444  * values
1445  */
1446  if (pid == 0 || !ready_to_display)
1447  PG_RETURN_NULL();
1448 
1449  /*
1450  * Read "writtenUpto" without holding a spinlock. Note that it may not be
1451  * consistent with the other shared variables of the WAL receiver
1452  * protected by a spinlock, but this should not be used for data integrity
1453  * checks.
1454  */
1455  written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1456 
1457  /* determine result type */
1458  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1459  elog(ERROR, "return type must be a row type");
1460 
1461  values = palloc0(sizeof(Datum) * tupdesc->natts);
1462  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1463 
1464  /* Fetch values */
1465  values[0] = Int32GetDatum(pid);
1466 
1467  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1468  {
1469  /*
1470  * Only superusers and roles with privileges of pg_read_all_stats can
1471  * see details. Other users only get the pid value to know whether it
1472  * is a WAL receiver, but no details.
1473  */
1474  memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1475  }
1476  else
1477  {
1479 
1480  if (XLogRecPtrIsInvalid(receive_start_lsn))
1481  nulls[2] = true;
1482  else
1483  values[2] = LSNGetDatum(receive_start_lsn);
1484  values[3] = Int32GetDatum(receive_start_tli);
1485  if (XLogRecPtrIsInvalid(written_lsn))
1486  nulls[4] = true;
1487  else
1488  values[4] = LSNGetDatum(written_lsn);
1489  if (XLogRecPtrIsInvalid(flushed_lsn))
1490  nulls[5] = true;
1491  else
1492  values[5] = LSNGetDatum(flushed_lsn);
1493  values[6] = Int32GetDatum(received_tli);
1494  if (last_send_time == 0)
1495  nulls[7] = true;
1496  else
1497  values[7] = TimestampTzGetDatum(last_send_time);
1498  if (last_receipt_time == 0)
1499  nulls[8] = true;
1500  else
1501  values[8] = TimestampTzGetDatum(last_receipt_time);
1502  if (XLogRecPtrIsInvalid(latest_end_lsn))
1503  nulls[9] = true;
1504  else
1505  values[9] = LSNGetDatum(latest_end_lsn);
1506  if (latest_end_time == 0)
1507  nulls[10] = true;
1508  else
1509  values[10] = TimestampTzGetDatum(latest_end_time);
1510  if (*slotname == '\0')
1511  nulls[11] = true;
1512  else
1513  values[11] = CStringGetTextDatum(slotname);
1514  if (*sender_host == '\0')
1515  nulls[12] = true;
1516  else
1517  values[12] = CStringGetTextDatum(sender_host);
1518  if (sender_port == 0)
1519  nulls[13] = true;
1520  else
1521  values[13] = Int32GetDatum(sender_port);
1522  if (*conninfo == '\0')
1523  nulls[14] = true;
1524  else
1525  values[14] = CStringGetTextDatum(conninfo);
1526  }
1527 
1528  /* Returns the record as Datum */
1530 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:4933
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:424
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:85
int64 TimestampTz
Definition: timestamp.h:39
#define ERROR
Definition: elog.h:35
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
void * palloc0(Size size)
Definition: mcxt.c:1230
Oid GetUserId(void)
Definition: miscinit.c:497
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uintptr_t Datum
Definition: postgres.h:412
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:560
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:102
XLogRecPtr latestWalEnd
Definition: walreceiver.h:107
TimeLineID receiveStartTLI
Definition: walreceiver.h:78
TimeLineID receivedTLI
Definition: walreceiver.h:88
char slotname[NAMEDATALEN]
Definition: walreceiver.h:127
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:120
pid_t pid
Definition: walreceiver.h:66
XLogRecPtr receiveStart
Definition: walreceiver.h:77
XLogRecPtr flushedUpto
Definition: walreceiver.h:87
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:155
TimestampTz lastMsgSendTime
Definition: walreceiver.h:101
WalRcvState walRcvState
Definition: walreceiver.h:67
TimestampTz latestWalEndTime
Definition: walreceiver.h:108
bool ready_to_display
Definition: walreceiver.h:136
int sender_port
Definition: walreceiver.h:121
slock_t mutex
Definition: walreceiver.h:147
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:114
Definition: regguts.h:318
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1376
#define MAXCONNINFO
Definition: walreceiver.h:39
WalRcvState
Definition: walreceiver.h:48
WalRcvData * WalRcv
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59

References WalRcvData::conninfo, CStringGetTextDatum, elog(), ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum(), Int32GetDatum(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum(), MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum(), TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 166 of file walreceiver.c.

167 {
168  /*
169  * Although walreceiver interrupt handling doesn't use the same scheme as
170  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
171  * any incoming signals on Win32, and also to make sure we process any
172  * barrier events.
173  */
175 
177  {
178  ereport(FATAL,
179  (errcode(ERRCODE_ADMIN_SHUTDOWN),
180  errmsg("terminating walreceiver process due to administrator command")));
181  }
182 }
int errcode(int sqlerrcode)
Definition: elog.c:695
int errmsg(const char *fmt,...)
Definition: elog.c:906
#define FATAL
Definition: elog.h:37
#define ereport(elevel,...)
Definition: elog.h:145
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and ShutdownRequestPending.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), WalRcvWaitForStartPosition(), and WalReceiverMain().

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1266 of file walreceiver.c.

1267 {
1268  WalRcvData *walrcv = WalRcv;
1269 
1270  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1271 
1272  /* Update shared-memory status */
1273  SpinLockAcquire(&walrcv->mutex);
1274  if (walrcv->latestWalEnd < walEnd)
1275  walrcv->latestWalEndTime = sendTime;
1276  walrcv->latestWalEnd = walEnd;
1277  walrcv->lastMsgSendTime = sendTime;
1278  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1279  SpinLockRelease(&walrcv->mutex);
1280 
1282  {
1283  char *sendtime;
1284  char *receipttime;
1285  int applyDelay;
1286 
1287  /* Copy because timestamptz_to_str returns a static buffer */
1288  sendtime = pstrdup(timestamptz_to_str(sendTime));
1289  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1290  applyDelay = GetReplicationApplyDelay();
1291 
1292  /* apply delay is not available */
1293  if (applyDelay == -1)
1294  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1295  sendtime,
1296  receipttime,
1298  else
1299  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1300  sendtime,
1301  receipttime,
1302  applyDelay,
1304 
1305  pfree(sendtime);
1306  pfree(receipttime);
1307  }
1308 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1573
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1782
bool message_level_is_interesting(int elevel)
Definition: elog.c:269
#define DEBUG2
Definition: elog.h:25
char * pstrdup(const char *in)
Definition: mcxt.c:1483
void pfree(void *pointer)
Definition: mcxt.c:1306
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog(), GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, message_level_is_interesting(), WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvComputeNextWakeup()

static void WalRcvComputeNextWakeup ( WalRcvWakeupReason  reason,
TimestampTz  now 
)
static

Definition at line 1316 of file walreceiver.c.

1317 {
1318  switch (reason)
1319  {
1321  if (wal_receiver_timeout <= 0)
1322  wakeup[reason] = PG_INT64_MAX;
1323  else
1324  wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
1325  break;
1326  case WALRCV_WAKEUP_PING:
1327  if (wal_receiver_timeout <= 0)
1328  wakeup[reason] = PG_INT64_MAX;
1329  else
1330  wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
1331  break;
1334  wakeup[reason] = PG_INT64_MAX;
1335  else
1336  wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
1337  break;
1338  case WALRCV_WAKEUP_REPLY:
1340  wakeup[reason] = PG_INT64_MAX;
1341  else
1342  wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
1343  break;
1344  default:
1345  break;
1346  }
1347 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1537
#define PG_INT64_MAX
Definition: c.h:528
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
Definition: walreceiver.c:132
bool hot_standby_feedback
Definition: walreceiver.c:92
int wal_receiver_status_interval
Definition: walreceiver.c:90
int wal_receiver_timeout
Definition: walreceiver.c:91

References hot_standby_feedback, now(), PG_INT64_MAX, wakeup, wal_receiver_status_interval, wal_receiver_timeout, WALRCV_WAKEUP_HSFEEDBACK, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, and WALRCV_WAKEUP_TERMINATE.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 806 of file walreceiver.c.

807 {
808  WalRcvData *walrcv = WalRcv;
809  TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
810 
811  Assert(*startpointTLI_p != 0);
812 
813  /* Ensure that all WAL records received are flushed to disk */
814  XLogWalRcvFlush(true, *startpointTLI_p);
815 
816  /* Mark ourselves inactive in shared memory */
817  SpinLockAcquire(&walrcv->mutex);
818  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
819  walrcv->walRcvState == WALRCV_RESTARTING ||
820  walrcv->walRcvState == WALRCV_STARTING ||
821  walrcv->walRcvState == WALRCV_WAITING ||
822  walrcv->walRcvState == WALRCV_STOPPING);
823  Assert(walrcv->pid == MyProcPid);
824  walrcv->walRcvState = WALRCV_STOPPED;
825  walrcv->pid = 0;
826  walrcv->ready_to_display = false;
827  walrcv->latch = NULL;
828  SpinLockRelease(&walrcv->mutex);
829 
831 
832  /* Terminate the connection gracefully. */
833  if (wrconn != NULL)
835 
836  /* Wake up the startup process to notice promptly that we're gone */
837  WakeupRecovery();
838 }
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition: globals.c:44
Assert(fmt[strlen(fmt) - 1] !='\n')
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:660
Latch * latch
Definition: walreceiver.h:145
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:68
static WalReceiverConn * wrconn
Definition: walreceiver.c:95
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
Definition: walreceiver.c:994
@ WALRCV_STARTING
Definition: walreceiver.h:50
@ WALRCV_STOPPED
Definition: walreceiver.h:49
@ WALRCV_RESTARTING
Definition: walreceiver.h:54
@ WALRCV_STREAMING
Definition: walreceiver.h:52
@ WALRCV_WAITING
Definition: walreceiver.h:53
@ WALRCV_STOPPING
Definition: walreceiver.h:55
#define walrcv_disconnect(conn)
Definition: walreceiver.h:436
void WakeupRecovery(void)

References arg, Assert(), ConditionVariableBroadcast(), DatumGetPointer(), WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 750 of file walreceiver.c.

751 {
752  TimeLineID tli;
753 
754  for (tli = first; tli <= last; tli++)
755  {
756  /* there's no history file for timeline 1 */
757  if (tli != 1 && !existsTimeLineHistory(tli))
758  {
759  char *fname;
760  char *content;
761  int len;
762  char expectedfname[MAXFNAMELEN];
763 
764  ereport(LOG,
765  (errmsg("fetching timeline history file for timeline %u from primary server",
766  tli)));
767 
768  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
769 
770  /*
771  * Check that the filename on the primary matches what we
772  * calculated ourselves. This is just a sanity check, it should
773  * always match.
774  */
775  TLHistoryFileName(expectedfname, tli);
776  if (strcmp(fname, expectedfname) != 0)
777  ereport(ERROR,
778  (errcode(ERRCODE_PROTOCOL_VIOLATION),
779  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
780  tli)));
781 
782  /*
783  * Write the file to pg_wal.
784  */
785  writeTimeLineHistoryFile(tli, content, len);
786 
787  /*
788  * Mark the streamed history file as ready for archiving if
789  * archive_mode is always.
790  */
792  XLogArchiveForceDone(fname);
793  else
794  XLogArchiveNotify(fname);
795 
796  pfree(fname);
797  pfree(content);
798  }
799  }
800 }
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:463
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
int errmsg_internal(const char *fmt,...)
Definition: elog.c:993
#define LOG
Definition: elog.h:27
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:420
int XLogArchiveMode
Definition: xlog.c:122
@ ARCHIVE_MODE_ALWAYS
Definition: xlog.h:62
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:542
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:476

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName(), walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1358 of file walreceiver.c.

1359 {
1360  Latch *latch;
1361 
1362  WalRcv->force_reply = true;
1363  /* fetching the latch pointer might not be atomic, so use spinlock */
1365  latch = WalRcv->latch;
1367  if (latch)
1368  SetLatch(latch);
1369 }
void SetLatch(Latch *latch)
Definition: latch.c:591
Definition: latch.h:111
sig_atomic_t force_reply
Definition: walreceiver.h:162

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1376 of file walreceiver.c.

1377 {
1378  switch (state)
1379  {
1380  case WALRCV_STOPPED:
1381  return "stopped";
1382  case WALRCV_STARTING:
1383  return "starting";
1384  case WALRCV_STREAMING:
1385  return "streaming";
1386  case WALRCV_WAITING:
1387  return "waiting";
1388  case WALRCV_RESTARTING:
1389  return "restarting";
1390  case WALRCV_STOPPING:
1391  return "stopping";
1392  }
1393  return "UNKNOWN";
1394 }

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 670 of file walreceiver.c.

671 {
672  WalRcvData *walrcv = WalRcv;
673  int state;
674 
675  SpinLockAcquire(&walrcv->mutex);
676  state = walrcv->walRcvState;
677  if (state != WALRCV_STREAMING)
678  {
679  SpinLockRelease(&walrcv->mutex);
680  if (state == WALRCV_STOPPING)
681  proc_exit(0);
682  else
683  elog(FATAL, "unexpected walreceiver state");
684  }
685  walrcv->walRcvState = WALRCV_WAITING;
687  walrcv->receiveStartTLI = 0;
688  SpinLockRelease(&walrcv->mutex);
689 
690  set_ps_display("idle");
691 
692  /*
693  * nudge startup process to notice that we've stopped streaming and are
694  * now waiting for instructions.
695  */
696  WakeupRecovery();
697  for (;;)
698  {
700 
702 
703  SpinLockAcquire(&walrcv->mutex);
704  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
705  walrcv->walRcvState == WALRCV_WAITING ||
706  walrcv->walRcvState == WALRCV_STOPPING);
707  if (walrcv->walRcvState == WALRCV_RESTARTING)
708  {
709  /*
710  * No need to handle changes in primary_conninfo or
711  * primary_slot_name here. Startup process will signal us to
712  * terminate in case those change.
713  */
714  *startpoint = walrcv->receiveStart;
715  *startpointTLI = walrcv->receiveStartTLI;
716  walrcv->walRcvState = WALRCV_STREAMING;
717  SpinLockRelease(&walrcv->mutex);
718  break;
719  }
720  if (walrcv->walRcvState == WALRCV_STOPPING)
721  {
722  /*
723  * We should've received SIGTERM if the startup process wants us
724  * to die, but might as well check it here too.
725  */
726  SpinLockRelease(&walrcv->mutex);
727  exit(1);
728  }
729  SpinLockRelease(&walrcv->mutex);
730 
733  }
734 
736  {
737  char activitymsg[50];
738 
739  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
740  LSN_FORMAT_ARGS(*startpoint));
741  set_ps_display(activitymsg);
742  }
743 }
struct Latch * MyLatch
Definition: globals.c:58
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:683
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:476
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
exit(1)
#define snprintf
Definition: port.h:238
bool update_process_title
Definition: ps_status.c:35
void set_ps_display(const char *activity)
Definition: ps_status.c:342
@ WAIT_EVENT_WAL_RECEIVER_WAIT_START
Definition: wait_event.h:130
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:166
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), elog(), exit(), FATAL, InvalidXLogRecPtr, LSN_FORMAT_ARGS, WalRcvData::mutex, MyLatch, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 187 of file walreceiver.c.

188 {
189  char conninfo[MAXCONNINFO];
190  char *tmp_conninfo;
191  char slotname[NAMEDATALEN];
192  bool is_temp_slot;
193  XLogRecPtr startpoint;
194  TimeLineID startpointTLI;
195  TimeLineID primaryTLI;
196  bool first_stream;
197  WalRcvData *walrcv = WalRcv;
199  char *err;
200  char *sender_host = NULL;
201  int sender_port = 0;
202 
203  /*
204  * WalRcv should be set up already (if we are a backend, we inherit this
205  * by fork() or EXEC_BACKEND mechanism from the postmaster).
206  */
207  Assert(walrcv != NULL);
208 
210 
211  /*
212  * Mark walreceiver as running in shared memory.
213  *
214  * Do this as early as possible, so that if we fail later on, we'll set
215  * state to STOPPED. If we die before this, the startup process will keep
216  * waiting for us to start up, until it times out.
217  */
218  SpinLockAcquire(&walrcv->mutex);
219  Assert(walrcv->pid == 0);
220  switch (walrcv->walRcvState)
221  {
222  case WALRCV_STOPPING:
223  /* If we've already been requested to stop, don't start up. */
224  walrcv->walRcvState = WALRCV_STOPPED;
225  /* fall through */
226 
227  case WALRCV_STOPPED:
228  SpinLockRelease(&walrcv->mutex);
230  proc_exit(1);
231  break;
232 
233  case WALRCV_STARTING:
234  /* The usual case */
235  break;
236 
237  case WALRCV_WAITING:
238  case WALRCV_STREAMING:
239  case WALRCV_RESTARTING:
240  default:
241  /* Shouldn't happen */
242  SpinLockRelease(&walrcv->mutex);
243  elog(PANIC, "walreceiver still running according to shared memory state");
244  }
245  /* Advertise our PID so that the startup process can kill us */
246  walrcv->pid = MyProcPid;
247  walrcv->walRcvState = WALRCV_STREAMING;
248 
249  /* Fetch information required to start streaming */
250  walrcv->ready_to_display = false;
251  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
252  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
253  is_temp_slot = walrcv->is_temp_slot;
254  startpoint = walrcv->receiveStart;
255  startpointTLI = walrcv->receiveStartTLI;
256 
257  /*
258  * At most one of is_temp_slot and slotname can be set; otherwise,
259  * RequestXLogStreaming messed up.
260  */
261  Assert(!is_temp_slot || (slotname[0] == '\0'));
262 
263  /* Initialise to a sanish value */
264  walrcv->lastMsgSendTime =
265  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
266 
267  /* Report the latch to use to awaken this process */
268  walrcv->latch = &MyProc->procLatch;
269 
270  SpinLockRelease(&walrcv->mutex);
271 
273 
274  /* Arrange to clean up at walreceiver exit */
275  on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
276 
277  /* Properly accept or ignore signals the postmaster might send us */
278  pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
279  * file */
280  pqsignal(SIGINT, SIG_IGN);
281  pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
282  /* SIGQUIT handler was already set up by InitPostmasterChild */
287 
288  /* Reset some signals that are accepted by postmaster but not here */
290 
291  /* Load the libpq-specific functions */
292  load_file("libpqwalreceiver", false);
293  if (WalReceiverFunctions == NULL)
294  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
295 
296  /* Unblock signals (they were blocked when the postmaster forked us) */
298 
299  /* Establish the connection to the primary for XLOG streaming */
300  wrconn = walrcv_connect(conninfo, false,
301  cluster_name[0] ? cluster_name : "walreceiver",
302  &err);
303  if (!wrconn)
304  ereport(ERROR,
305  (errcode(ERRCODE_CONNECTION_FAILURE),
306  errmsg("could not connect to the primary server: %s", err)));
307 
308  /*
309  * Save user-visible connection string. This clobbers the original
310  * conninfo, for security. Also save host and port of the sender server
311  * this walreceiver is connected to.
312  */
313  tmp_conninfo = walrcv_get_conninfo(wrconn);
314  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
315  SpinLockAcquire(&walrcv->mutex);
316  memset(walrcv->conninfo, 0, MAXCONNINFO);
317  if (tmp_conninfo)
318  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
319 
320  memset(walrcv->sender_host, 0, NI_MAXHOST);
321  if (sender_host)
322  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
323 
324  walrcv->sender_port = sender_port;
325  walrcv->ready_to_display = true;
326  SpinLockRelease(&walrcv->mutex);
327 
328  if (tmp_conninfo)
329  pfree(tmp_conninfo);
330 
331  if (sender_host)
332  pfree(sender_host);
333 
334  first_stream = true;
335  for (;;)
336  {
337  char *primary_sysid;
338  char standby_sysid[32];
340 
341  /*
342  * Check that we're connected to a valid server using the
343  * IDENTIFY_SYSTEM replication command.
344  */
345  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
346 
347  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
349  if (strcmp(primary_sysid, standby_sysid) != 0)
350  {
351  ereport(ERROR,
352  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
353  errmsg("database system identifier differs between the primary and standby"),
354  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
355  primary_sysid, standby_sysid)));
356  }
357 
358  /*
359  * Confirm that the current timeline of the primary is the same or
360  * ahead of ours.
361  */
362  if (primaryTLI < startpointTLI)
363  ereport(ERROR,
364  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
365  errmsg("highest timeline %u of the primary is behind recovery timeline %u",
366  primaryTLI, startpointTLI)));
367 
368  /*
369  * Get any missing history files. We do this always, even when we're
370  * not interested in that timeline, so that if we're promoted to
371  * become the primary later on, we don't select the same timeline that
372  * was already used in the current primary. This isn't bullet-proof -
373  * you'll need some external software to manage your cluster if you
374  * need to ensure that a unique timeline id is chosen in every case,
375  * but let's avoid the confusion of timeline id collisions where we
376  * can.
377  */
378  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
379 
380  /*
381  * Create temporary replication slot if requested, and update slot
382  * name in shared memory. (Note the slot name cannot already be set
383  * in this case.)
384  */
385  if (is_temp_slot)
386  {
387  snprintf(slotname, sizeof(slotname),
388  "pg_walreceiver_%lld",
389  (long long int) walrcv_get_backend_pid(wrconn));
390 
391  walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
392 
393  SpinLockAcquire(&walrcv->mutex);
394  strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
395  SpinLockRelease(&walrcv->mutex);
396  }
397 
398  /*
399  * Start streaming.
400  *
401  * We'll try to start at the requested starting point and timeline,
402  * even if it's different from the server's latest timeline. In case
403  * we've already reached the end of the old timeline, the server will
404  * finish the streaming immediately, and we will go back to await
405  * orders from the startup process. If recovery_target_timeline is
406  * 'latest', the startup process will scan pg_wal and find the new
407  * history file, bump recovery target timeline, and ask us to restart
408  * on the new timeline.
409  */
410  options.logical = false;
411  options.startpoint = startpoint;
412  options.slotname = slotname[0] != '\0' ? slotname : NULL;
413  options.proto.physical.startpointTLI = startpointTLI;
415  {
416  if (first_stream)
417  ereport(LOG,
418  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
419  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
420  else
421  ereport(LOG,
422  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
423  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
424  first_stream = false;
425 
426  /* Initialize LogstreamResult and buffers for processing messages */
430 
431  /* Initialize nap wakeup times. */
433  for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
435 
436  /* Send initial reply/feedback messages. */
437  XLogWalRcvSendReply(true, false);
439 
440  /* Loop until end-of-streaming or error */
441  for (;;)
442  {
443  char *buf;
444  int len;
445  bool endofwal = false;
446  pgsocket wait_fd = PGINVALID_SOCKET;
447  int rc;
448  TimestampTz nextWakeup;
449  int nap;
450 
451  /*
452  * Exit walreceiver if we're not in recovery. This should not
453  * happen, but cross-check the status here.
454  */
455  if (!RecoveryInProgress())
456  ereport(FATAL,
457  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
458  errmsg("cannot continue WAL streaming, recovery has already ended")));
459 
460  /* Process any requests or signals received recently */
462 
464  {
465  ConfigReloadPending = false;
468  for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
471  }
472 
473  /* See if we can read data immediately */
474  len = walrcv_receive(wrconn, &buf, &wait_fd);
476  if (len != 0)
477  {
478  /*
479  * Process the received data, and any subsequent data we
480  * can read without blocking.
481  */
482  for (;;)
483  {
484  if (len > 0)
485  {
486  /*
487  * Something was received from primary, so adjust
488  * the ping and terminate wakeup times.
489  */
491  now);
493  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
494  startpointTLI);
495  }
496  else if (len == 0)
497  break;
498  else if (len < 0)
499  {
500  ereport(LOG,
501  (errmsg("replication terminated by primary server"),
502  errdetail("End of WAL reached on timeline %u at %X/%X.",
503  startpointTLI,
505  endofwal = true;
506  break;
507  }
508  len = walrcv_receive(wrconn, &buf, &wait_fd);
510  }
511 
512  /* Let the primary know that we received some data. */
513  XLogWalRcvSendReply(false, false);
514 
515  /*
516  * If we've written some records, flush them to disk and
517  * let the startup process and primary server know about
518  * them.
519  */
520  XLogWalRcvFlush(false, startpointTLI);
521  }
522 
523  /* Check if we need to exit the streaming loop. */
524  if (endofwal)
525  break;
526 
527  /* Find the soonest wakeup time, to limit our nap. */
528  nextWakeup = PG_INT64_MAX;
529  for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
530  nextWakeup = Min(wakeup[i], nextWakeup);
531 
532  /*
533  * Calculate the nap time. WaitLatchOrSocket() doesn't accept
534  * timeouts longer than INT_MAX milliseconds, so we limit the
535  * result accordingly. Also, we round up to the next
536  * millisecond to avoid waking up too early and spinning until
537  * one of the wakeup times.
538  */
539  nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
540 
541  /*
542  * Ideally we would reuse a WaitEventSet object repeatedly
543  * here to avoid the overheads of WaitLatchOrSocket on epoll
544  * systems, but we can't be sure that libpq (or any other
545  * walreceiver implementation) has the same socket (even if
546  * the fd is the same number, it may have been closed and
547  * reopened since the last time). In future, if there is a
548  * function for removing sockets from WaitEventSet, then we
549  * could add and remove just the socket each time, potentially
550  * avoiding some system calls.
551  */
552  Assert(wait_fd != PGINVALID_SOCKET);
556  wait_fd,
557  nap,
560  if (rc & WL_LATCH_SET)
561  {
564 
565  if (walrcv->force_reply)
566  {
567  /*
568  * The recovery process has asked us to send apply
569  * feedback now. Make sure the flag is really set to
570  * false in shared memory before sending the reply, so
571  * we don't miss a new request for a reply.
572  */
573  walrcv->force_reply = false;
575  XLogWalRcvSendReply(true, false);
576  }
577  }
578  if (rc & WL_TIMEOUT)
579  {
580  /*
581  * We didn't receive anything new. If we haven't heard
582  * anything from the server for more than
583  * wal_receiver_timeout / 2, ping the server. Also, if
584  * it's been longer than wal_receiver_status_interval
585  * since the last update we sent, send a status update to
586  * the primary anyway, to report any progress in applying
587  * WAL.
588  */
589  bool requestReply = false;
590 
591  /*
592  * Check if time since last receive from primary has
593  * reached the configured limit.
594  */
596  ereport(ERROR,
597  (errcode(ERRCODE_CONNECTION_FAILURE),
598  errmsg("terminating walreceiver due to timeout")));
599 
600  /*
601  * We didn't receive anything new, for half of receiver
602  * replication timeout. Ping the server.
603  */
604  if (now >= wakeup[WALRCV_WAKEUP_PING])
605  {
606  requestReply = true;
608  }
609 
610  XLogWalRcvSendReply(requestReply, requestReply);
612  }
613  }
614 
615  /*
616  * The backend finished streaming. Exit streaming COPY-mode from
617  * our side, too.
618  */
619  walrcv_endstreaming(wrconn, &primaryTLI);
620 
621  /*
622  * If the server had switched to a new timeline that we didn't
623  * know about when we began streaming, fetch its timeline history
624  * file now.
625  */
626  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
627  }
628  else
629  ereport(LOG,
630  (errmsg("primary server contains no more WAL on requested timeline %u",
631  startpointTLI)));
632 
633  /*
634  * End of WAL reached on the requested timeline. Close the last
635  * segment, and await for new orders from the startup process.
636  */
637  if (recvFile >= 0)
638  {
639  char xlogfname[MAXFNAMELEN];
640 
641  XLogWalRcvFlush(false, startpointTLI);
643  if (close(recvFile) != 0)
644  ereport(PANIC,
646  errmsg("could not close WAL segment %s: %m",
647  xlogfname)));
648 
649  /*
650  * Create .done file forcibly to prevent the streamed segment from
651  * being archived later.
652  */
654  XLogArchiveForceDone(xlogfname);
655  else
656  XLogArchiveNotify(xlogfname);
657  }
658  recvFile = -1;
659 
660  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
661  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
662  }
663  /* not reached */
664 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:433
#define pg_memory_barrier()
Definition: atomics.h:140
sigset_t UnBlockSig
Definition: pqsignal.c:22
#define Min(x, y)
Definition: c.h:937
#define Max(x, y)
Definition: c.h:931
#define UINT64_FORMAT
Definition: c.h:485
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
int errcode_for_file_access(void)
Definition: elog.c:718
int errdetail(const char *fmt,...)
Definition: elog.c:1039
#define PANIC
Definition: elog.h:38
#define DEBUG1
Definition: elog.h:26
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
char * cluster_name
Definition: guc_tables.c:502
#define close(a)
Definition: win32.h:12
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:108
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:524
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
static char ** options
static char * buf
Definition: pg_test_fsync.c:67
pqsigfunc pqsignal(int signo, pqsigfunc func)
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:670
#define PG_SETMASK(mask)
Definition: pqsignal.h:18
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:638
PGPROC * MyProc
Definition: proc.c:68
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Latch procLatch
Definition: proc.h:170
bool is_temp_slot
Definition: walreceiver.h:133
@ WAIT_EVENT_WAL_RECEIVER_MAIN
Definition: wait_event.h:47
static StringInfoData reply_message
Definition: walreceiver.c:134
static int recvFile
Definition: walreceiver.c:103
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:750
static StringInfoData incoming_message
Definition: walreceiver.c:135
static struct @18 LogstreamResult
static TimeLineID recvFileTLI
Definition: walreceiver.c:104
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:96
static XLogSegNo recvSegNo
Definition: walreceiver.c:105
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1170
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:670
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
Definition: walreceiver.c:844
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
Definition: walreceiver.c:1316
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:806
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1101
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
Definition: walreceiver.h:430
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:422
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:408
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:414
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:412
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:424
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:416
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:432
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:426
#define SIGCHLD
Definition: win32_port.h:186
#define SIGHUP
Definition: win32_port.h:176
#define SIG_DFL
Definition: win32_port.h:171
#define SIGPIPE
Definition: win32_port.h:181
#define SIGUSR1
Definition: win32_port.h:188
#define SIGALRM
Definition: win32_port.h:182
#define SIGUSR2
Definition: win32_port.h:189
#define SIG_IGN
Definition: win32_port.h:173
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4181
bool RecoveryInProgress(void)
Definition: xlog.c:5912
int wal_segment_size
Definition: xlog.c:146
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert(), buf, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, WalRcvData::conninfo, DEBUG1, elog(), ereport, errcode(), errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), i, incoming_message, initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, Max, MAXCONNINFO, MAXFNAMELEN, Min, WalRcvData::mutex, MyLatch, MyProc, MyProcPid, NAMEDATALEN, now(), NUM_WALRCV_WAKEUPS, on_shmem_exit(), options, PANIC, pfree(), pg_atomic_write_u64(), PG_INT64_MAX, pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvData::pid, PointerGetDatum(), pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, procsignal_sigusr1_handler(), WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, snprintf, SpinLockAcquire, SpinLockRelease, strlcpy(), UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wakeup, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_TERMINATE, WalRcvComputeNextWakeup(), WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, WalRcvWaitForStartPosition(), WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1049 of file walreceiver.c.

1050 {
1051  char xlogfname[MAXFNAMELEN];
1052 
1054  Assert(tli != 0);
1055 
1056  /*
1057  * fsync() and close current file before we switch to next one. We would
1058  * otherwise have to reopen this file to fsync it later
1059  */
1060  XLogWalRcvFlush(false, tli);
1061 
1063 
1064  /*
1065  * XLOG segment files will be re-read by recovery in startup process soon,
1066  * so we don't advise the OS to release cache pages associated with the
1067  * file like XLogFileClose() does.
1068  */
1069  if (close(recvFile) != 0)
1070  ereport(PANIC,
1072  errmsg("could not close WAL segment %s: %m",
1073  xlogfname)));
1074 
1075  /*
1076  * Create .done file forcibly to prevent the streamed segment from being
1077  * archived later.
1078  */
1080  XLogArchiveForceDone(xlogfname);
1081  else
1082  XLogArchiveNotify(xlogfname);
1083 
1084  recvFile = -1;
1085 }
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert(), close, ereport, errcode_for_file_access(), errmsg(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 994 of file walreceiver.c.

995 {
996  Assert(tli != 0);
997 
998  if (LogstreamResult.Flush < LogstreamResult.Write)
999  {
1000  WalRcvData *walrcv = WalRcv;
1001 
1003 
1004  LogstreamResult.Flush = LogstreamResult.Write;
1005 
1006  /* Update shared-memory status */
1007  SpinLockAcquire(&walrcv->mutex);
1008  if (walrcv->flushedUpto < LogstreamResult.Flush)
1009  {
1010  walrcv->latestChunkStart = walrcv->flushedUpto;
1011  walrcv->flushedUpto = LogstreamResult.Flush;
1012  walrcv->receivedTLI = tli;
1013  }
1014  SpinLockRelease(&walrcv->mutex);
1015 
1016  /* Signal the startup process and walsender that new WAL has arrived */
1017  WakeupRecovery();
1019  WalSndWakeup();
1020 
1021  /* Report XLOG streaming progress in PS display */
1023  {
1024  char activitymsg[50];
1025 
1026  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1028  set_ps_display(activitymsg);
1029  }
1030 
1031  /* Also let the primary know that we made some progress */
1032  if (!dying)
1033  {
1034  XLogWalRcvSendReply(false, false);
1035  XLogWalRcvSendHSFeedback(false);
1036  }
1037  }
1038 }
XLogRecPtr latestChunkStart
Definition: walreceiver.h:96
#define AllowCascadeReplication()
Definition: walreceiver.h:42
void WalSndWakeup(void)
Definition: walsender.c:3296
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:8138

References AllowCascadeReplication, Assert(), WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, LSN_FORMAT_ARGS, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 844 of file walreceiver.c.

845 {
846  int hdrlen;
847  XLogRecPtr dataStart;
848  XLogRecPtr walEnd;
849  TimestampTz sendTime;
850  bool replyRequested;
851 
853 
854  switch (type)
855  {
856  case 'w': /* WAL records */
857  {
858  /* copy message to StringInfo */
859  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
860  if (len < hdrlen)
861  ereport(ERROR,
862  (errcode(ERRCODE_PROTOCOL_VIOLATION),
863  errmsg_internal("invalid WAL message received from primary")));
865 
866  /* read the fields */
867  dataStart = pq_getmsgint64(&incoming_message);
868  walEnd = pq_getmsgint64(&incoming_message);
869  sendTime = pq_getmsgint64(&incoming_message);
870  ProcessWalSndrMessage(walEnd, sendTime);
871 
872  buf += hdrlen;
873  len -= hdrlen;
874  XLogWalRcvWrite(buf, len, dataStart, tli);
875  break;
876  }
877  case 'k': /* Keepalive */
878  {
879  /* copy message to StringInfo */
880  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
881  if (len != hdrlen)
882  ereport(ERROR,
883  (errcode(ERRCODE_PROTOCOL_VIOLATION),
884  errmsg_internal("invalid keepalive message received from primary")));
886 
887  /* read the fields */
888  walEnd = pq_getmsgint64(&incoming_message);
889  sendTime = pq_getmsgint64(&incoming_message);
890  replyRequested = pq_getmsgbyte(&incoming_message);
891 
892  ProcessWalSndrMessage(walEnd, sendTime);
893 
894  /* If the primary requested a reply, send one immediately */
895  if (replyRequested)
896  XLogWalRcvSendReply(true, false);
897  break;
898  }
899  default:
900  ereport(ERROR,
901  (errcode(ERRCODE_PROTOCOL_VIOLATION),
902  errmsg_internal("invalid replication message type %d",
903  type)));
904  }
905 }
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1266
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:911

References appendBinaryStringInfo(), buf, ereport, errcode(), errmsg_internal(), ERROR, incoming_message, len, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), generate_unaccent_rules::type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1170 of file walreceiver.c.

1171 {
1172  TimestampTz now;
1173  FullTransactionId nextFullXid;
1174  TransactionId nextXid;
1175  uint32 xmin_epoch,
1176  catalog_xmin_epoch;
1177  TransactionId xmin,
1178  catalog_xmin;
1179 
1180  /* initially true so we always send at least one feedback message */
1181  static bool primary_has_standby_xmin = true;
1182 
1183  /*
1184  * If the user doesn't want status to be reported to the primary, be sure
1185  * to exit before doing anything at all.
1186  */
1188  !primary_has_standby_xmin)
1189  return;
1190 
1191  /* Get current timestamp. */
1193 
1194  /* Send feedback at most once per wal_receiver_status_interval. */
1195  if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1196  return;
1197 
1198  /* Make sure we wake up when it's time to send feedback again. */
1200 
1201  /*
1202  * If Hot Standby is not yet accepting connections there is nothing to
1203  * send. Check this after the interval has expired to reduce number of
1204  * calls.
1205  *
1206  * Bailing out here also ensures that we don't send feedback until we've
1207  * read our own replication slot state, so we don't tell the primary to
1208  * discard needed xmin or catalog_xmin from any slots that may exist on
1209  * this replica.
1210  */
1211  if (!HotStandbyActive())
1212  return;
1213 
1214  /*
1215  * Make the expensive call to get the oldest xmin once we are certain
1216  * everything else has been checked.
1217  */
1219  {
1220  GetReplicationHorizons(&xmin, &catalog_xmin);
1221  }
1222  else
1223  {
1224  xmin = InvalidTransactionId;
1225  catalog_xmin = InvalidTransactionId;
1226  }
1227 
1228  /*
1229  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1230  * the epoch boundary.
1231  */
1232  nextFullXid = ReadNextFullTransactionId();
1233  nextXid = XidFromFullTransactionId(nextFullXid);
1234  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1235  catalog_xmin_epoch = xmin_epoch;
1236  if (nextXid < xmin)
1237  xmin_epoch--;
1238  if (nextXid < catalog_xmin)
1239  catalog_xmin_epoch--;
1240 
1241  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1242  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1243 
1244  /* Construct the message and send it. */
1246  pq_sendbyte(&reply_message, 'h');
1248  pq_sendint32(&reply_message, xmin);
1249  pq_sendint32(&reply_message, xmin_epoch);
1250  pq_sendint32(&reply_message, catalog_xmin);
1251  pq_sendint32(&reply_message, catalog_xmin_epoch);
1253  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1254  primary_has_standby_xmin = true;
1255  else
1256  primary_has_standby_xmin = false;
1257 }
unsigned int uint32
Definition: c.h:442
uint32 TransactionId
Definition: c.h:588
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2044
#define InvalidTransactionId
Definition: transam.h:31
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define TransactionIdIsValid(xid)
Definition: transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:428
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog(), EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReadNextFullTransactionId(), reply_message, resetStringInfo(), TransactionIdIsValid, wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_HSFEEDBACK, WalRcvComputeNextWakeup(), wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1101 of file walreceiver.c.

1102 {
1103  static XLogRecPtr writePtr = 0;
1104  static XLogRecPtr flushPtr = 0;
1105  XLogRecPtr applyPtr;
1106  TimestampTz now;
1107 
1108  /*
1109  * If the user doesn't want status to be reported to the primary, be sure
1110  * to exit before doing anything at all.
1111  */
1112  if (!force && wal_receiver_status_interval <= 0)
1113  return;
1114 
1115  /* Get current timestamp. */
1117 
1118  /*
1119  * We can compare the write and flush positions to the last message we
1120  * sent without taking any lock, but the apply position requires a spin
1121  * lock, so we don't check that unless something else has changed or 10
1122  * seconds have passed. This means that the apply WAL location will
1123  * appear, from the primary's point of view, to lag slightly, but since
1124  * this is only for reporting purposes and only on idle systems, that's
1125  * probably OK.
1126  */
1127  if (!force
1128  && writePtr == LogstreamResult.Write
1129  && flushPtr == LogstreamResult.Flush
1131  return;
1132 
1133  /* Make sure we wake up when it's time to send another reply. */
1135 
1136  /* Construct a new message */
1137  writePtr = LogstreamResult.Write;
1138  flushPtr = LogstreamResult.Flush;
1139  applyPtr = GetXLogReplayRecPtr(NULL);
1140 
1142  pq_sendbyte(&reply_message, 'r');
1143  pq_sendint64(&reply_message, writePtr);
1144  pq_sendint64(&reply_message, flushPtr);
1145  pq_sendint64(&reply_message, applyPtr);
1147  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1148 
1149  /* Send it */
1150  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1151  LSN_FORMAT_ARGS(writePtr),
1152  LSN_FORMAT_ARGS(flushPtr),
1153  LSN_FORMAT_ARGS(applyPtr),
1154  requestReply ? " (reply requested)" : "");
1155 
1157 }

References StringInfoData::data, DEBUG2, elog(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_REPLY, WalRcvComputeNextWakeup(), and wrconn.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 911 of file walreceiver.c.

912 {
913  int startoff;
914  int byteswritten;
915 
916  Assert(tli != 0);
917 
918  while (nbytes > 0)
919  {
920  int segbytes;
921 
922  /* Close the current segment if it's completed */
923  if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
924  XLogWalRcvClose(recptr, tli);
925 
926  if (recvFile < 0)
927  {
928  /* Create/use new log file */
931  recvFileTLI = tli;
932  }
933 
934  /* Calculate the start offset of the received logs */
935  startoff = XLogSegmentOffset(recptr, wal_segment_size);
936 
937  if (startoff + nbytes > wal_segment_size)
938  segbytes = wal_segment_size - startoff;
939  else
940  segbytes = nbytes;
941 
942  /* OK to write the logs */
943  errno = 0;
944 
945  byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
946  if (byteswritten <= 0)
947  {
948  char xlogfname[MAXFNAMELEN];
949  int save_errno;
950 
951  /* if write didn't set errno, assume no disk space */
952  if (errno == 0)
953  errno = ENOSPC;
954 
955  save_errno = errno;
957  errno = save_errno;
958  ereport(PANIC,
960  errmsg("could not write to WAL segment %s "
961  "at offset %u, length %lu: %m",
962  xlogfname, startoff, (unsigned long) segbytes)));
963  }
964 
965  /* Update state for write */
966  recptr += byteswritten;
967 
968  nbytes -= byteswritten;
969  buf += byteswritten;
970 
971  LogstreamResult.Write = recptr;
972  }
973 
974  /* Update shared-memory status */
976 
977  /*
978  * Close the current segment if it's fully written up in the last cycle of
979  * the loop, to create its archive notification file soon. Otherwise WAL
980  * archiving of the segment will be delayed until any data in the next
981  * segment is received and written.
982  */
983  if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
984  XLogWalRcvClose(recptr, tli);
985 }
#define pg_pwrite
Definition: port.h:226
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:1049
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition: xlog.c:3085
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert(), buf, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite, recvFile, recvFileTLI, recvSegNo, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName(), XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 114 of file walreceiver.c.

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 92 of file walreceiver.c.

Referenced by WalRcvComputeNextWakeup(), and XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 135 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvProcessMsg().

◆ 

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 103 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 105 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

◆ wakeup

◆ wal_receiver_status_interval

int wal_receiver_status_interval

◆ wal_receiver_timeout

int wal_receiver_timeout

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 96 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 113 of file walreceiver.c.