PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_authid.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)
 

Typedefs

typedef enum WalRcvWakeupReason WalRcvWakeupReason
 

Enumerations

enum  WalRcvWakeupReason { WALRCV_WAKEUP_TERMINATE , WALRCV_WAKEUP_PING , WALRCV_WAKEUP_REPLY , WALRCV_WAKEUP_HSFEEDBACK }
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason, TimestampTz now)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (char *startup_data, size_t startup_data_len)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
} LogstreamResult
 
static TimestampTz wakeup [NUM_WALRCV_WAKEUPS]
 
static StringInfoData reply_message
 

Macro Definition Documentation

◆ NUM_WALRCV_WAKEUPS

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)

Definition at line 123 of file walreceiver.c.

Typedef Documentation

◆ WalRcvWakeupReason

typedef enum WalRcvWakeupReason WalRcvWakeupReason

Enumeration Type Documentation

◆ WalRcvWakeupReason

Enumerator
WALRCV_WAKEUP_TERMINATE 
WALRCV_WAKEUP_PING 
WALRCV_WAKEUP_REPLY 
WALRCV_WAKEUP_HSFEEDBACK 

Definition at line 117 of file walreceiver.c.

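118 {
119  WALRCV_WAKEUP_TERMINATE,
120  WALRCV_WAKEUP_PING,
121  WALRCV_WAKEUP_REPLY,
122  WALRCV_WAKEUP_HSFEEDBACK,
123 #define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
124 } WalRcvWakeupReason;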
WalRcvWakeupReason
Definition: walreceiver.c:118
@ WALRCV_WAKEUP_TERMINATE
Definition: walreceiver.c:119
@ WALRCV_WAKEUP_REPLY
Definition: walreceiver.c:121
@ WALRCV_WAKEUP_PING
Definition: walreceiver.c:120
@ WALRCV_WAKEUP_HSFEEDBACK
Definition: walreceiver.c:122

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1401 of file walreceiver.c.

1402 {
1403  TupleDesc tupdesc;
1404  Datum *values;
1405  bool *nulls;
1406  int pid;
1407  bool ready_to_display;
1408  WalRcvState state;
1409  XLogRecPtr receive_start_lsn;
1410  TimeLineID receive_start_tli;
1411  XLogRecPtr written_lsn;
1412  XLogRecPtr flushed_lsn;
1413  TimeLineID received_tli;
1414  TimestampTz last_send_time;
1415  TimestampTz last_receipt_time;
1416  XLogRecPtr latest_end_lsn;
1417  TimestampTz latest_end_time;
1418  char sender_host[NI_MAXHOST];
1419  int sender_port = 0;
1420  char slotname[NAMEDATALEN];
1421  char conninfo[MAXCONNINFO];
1422 
1423  /* Take a lock to ensure value consistency */
1424  SpinLockAcquire(&WalRcv->mutex);
1425  pid = (int) WalRcv->pid;
1426  ready_to_display = WalRcv->ready_to_display;
1427  state = WalRcv->walRcvState;
1428  receive_start_lsn = WalRcv->receiveStart;
1429  receive_start_tli = WalRcv->receiveStartTLI;
1430  flushed_lsn = WalRcv->flushedUpto;
1431  received_tli = WalRcv->receivedTLI;
1432  last_send_time = WalRcv->lastMsgSendTime;
1433  last_receipt_time = WalRcv->lastMsgReceiptTime;
1434  latest_end_lsn = WalRcv->latestWalEnd;
1435  latest_end_time = WalRcv->latestWalEndTime;
1436  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1437  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1438  sender_port = WalRcv->sender_port;
1439  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1440  SpinLockRelease(&WalRcv->mutex);
1441 
1442  /*
1443  * No WAL receiver (or not ready yet), just return a tuple with NULL
1444  * values
1445  */
1446  if (pid == 0 || !ready_to_display)
1447  PG_RETURN_NULL();
1448 
1449  /*
1450  * Read "writtenUpto" without holding a spinlock. Note that it may not be
1451  * consistent with the other shared variables of the WAL receiver
1452  * protected by a spinlock, but this should not be used for data integrity
1453  * checks.
1454  */
1455  written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1456 
1457  /* determine result type */
1458  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1459  elog(ERROR, "return type must be a row type");
1460 
1461  values = palloc0(sizeof(Datum) * tupdesc->natts);
1462  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1463 
1464  /* Fetch values */
1465  values[0] = Int32GetDatum(pid);
1466 
1467  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1468  {
1469  /*
1470  * Only superusers and roles with privileges of pg_read_all_stats can
1471  * see details. Other users only get the pid value to know whether it
1472  * is a WAL receiver, but no details.
1473  */
1474  memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1475  }
1476  else
1477  {
1478  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1479 
1480  if (XLogRecPtrIsInvalid(receive_start_lsn))
1481  nulls[2] = true;
1482  else
1483  values[2] = LSNGetDatum(receive_start_lsn);
1484  values[3] = Int32GetDatum(receive_start_tli);
1485  if (XLogRecPtrIsInvalid(written_lsn))
1486  nulls[4] = true;
1487  else
1488  values[4] = LSNGetDatum(written_lsn);
1489  if (XLogRecPtrIsInvalid(flushed_lsn))
1490  nulls[5] = true;
1491  else
1492  values[5] = LSNGetDatum(flushed_lsn);
1493  values[6] = Int32GetDatum(received_tli);
1494  if (last_send_time == 0)
1495  nulls[7] = true;
1496  else
1497  values[7] = TimestampTzGetDatum(last_send_time);
1498  if (last_receipt_time == 0)
1499  nulls[8] = true;
1500  else
1501  values[8] = TimestampTzGetDatum(last_receipt_time);
1502  if (XLogRecPtrIsInvalid(latest_end_lsn))
1503  nulls[9] = true;
1504  else
1505  values[9] = LSNGetDatum(latest_end_lsn);
1506  if (latest_end_time == 0)
1507  nulls[10] = true;
1508  else
1509  values[10] = TimestampTzGetDatum(latest_end_time);
1510  if (*slotname == '\0')
1511  nulls[11] = true;
1512  else
1513  values[11] = CStringGetTextDatum(slotname);
1514  if (*sender_host == '\0')
1515  nulls[12] = true;
1516  else
1517  values[12] = CStringGetTextDatum(sender_host);
1518  if (sender_port == 0)
1519  nulls[13] = true;
1520  else
1521  values[13] = Int32GetDatum(sender_port);
1522  if (*conninfo == '\0')
1523  nulls[14] = true;
1524  else
1525  values[14] = CStringGetTextDatum(conninfo);
1526  }
1527 
1528  /* Returns the record as Datum */
1529  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1530 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5128
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:462
static Datum values[MAXATTR]
Definition: bootstrap.c:152
#define CStringGetTextDatum(s)
Definition: builtins.h:97
int64 TimestampTz
Definition: timestamp.h:39
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
void * palloc0(Size size)
Definition: mcxt.c:1346
Oid GetUserId(void)
Definition: miscinit.c:514
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uintptr_t Datum
Definition: postgres.h:64
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:101
XLogRecPtr latestWalEnd
Definition: walreceiver.h:106
TimeLineID receiveStartTLI
Definition: walreceiver.h:77
TimeLineID receivedTLI
Definition: walreceiver.h:87
char slotname[NAMEDATALEN]
Definition: walreceiver.h:126
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:119
pid_t pid
Definition: walreceiver.h:65
XLogRecPtr receiveStart
Definition: walreceiver.h:76
XLogRecPtr flushedUpto
Definition: walreceiver.h:86
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:154
TimestampTz lastMsgSendTime
Definition: walreceiver.h:100
WalRcvState walRcvState
Definition: walreceiver.h:66
TimestampTz latestWalEndTime
Definition: walreceiver.h:107
bool ready_to_display
Definition: walreceiver.h:135
int sender_port
Definition: walreceiver.h:120
slock_t mutex
Definition: walreceiver.h:146
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:113
Definition: regguts.h:323
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1376
#define MAXCONNINFO
Definition: walreceiver.h:38
WalRcvState
Definition: walreceiver.h:47
WalRcvData * WalRcv
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum(), Int32GetDatum(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum(), MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum(), TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 162 of file walreceiver.c.

163 {
164  /*
165  * Although walreceiver interrupt handling doesn't use the same scheme as
166  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
167  * any incoming signals on Win32, and also to make sure we process any
168  * barrier events.
169  */
170  CHECK_FOR_INTERRUPTS();
171 
172  if (ShutdownRequestPending)
173  {
174  ereport(FATAL,
175  (errcode(ERRCODE_ADMIN_SHUTDOWN),
176  errmsg("terminating walreceiver process due to administrator command")));
177  }
178 }
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define FATAL
Definition: elog.h:41
#define ereport(elevel,...)
Definition: elog.h:149
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and ShutdownRequestPending.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), WalRcvWaitForStartPosition(), and WalReceiverMain().

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1265 of file walreceiver.c.

1266 {
1267  WalRcvData *walrcv = WalRcv;
1268  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1269 
1270  /* Update shared-memory status */
1271  SpinLockAcquire(&walrcv->mutex);
1272  if (walrcv->latestWalEnd < walEnd)
1273  walrcv->latestWalEndTime = sendTime;
1274  walrcv->latestWalEnd = walEnd;
1275  walrcv->lastMsgSendTime = sendTime;
1276  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1277  SpinLockRelease(&walrcv->mutex);
1278 
1279  if (message_level_is_interesting(DEBUG2))
1280  {
1281  char *sendtime;
1282  char *receipttime;
1283  int applyDelay;
1284 
1285  /* Copy because timestamptz_to_str returns a static buffer */
1286  sendtime = pstrdup(timestamptz_to_str(sendTime));
1287  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1288  applyDelay = GetReplicationApplyDelay();
1289 
1290  /* apply delay is not available */
1291  if (applyDelay == -1)
1292  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1293  sendtime,
1294  receipttime,
1295  GetReplicationTransferLatency());
1296  else
1297  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1298  sendtime,
1299  receipttime,
1300  applyDelay,
1301  GetReplicationTransferLatency());
1302 
1303  pfree(sendtime);
1304  pfree(receipttime);
1305  }
1306 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1654
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1853
bool message_level_is_interesting(int elevel)
Definition: elog.c:276
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:1695
void pfree(void *pointer)
Definition: mcxt.c:1520
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, message_level_is_interesting(), WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvComputeNextWakeup()

static void WalRcvComputeNextWakeup ( WalRcvWakeupReason  reason,
TimestampTz  now 
)
static

Definition at line 1317 of file walreceiver.c.

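1318 {
1319  switch (reason)
1320  {
1321  case WALRCV_WAKEUP_TERMINATE:
1322  if (wal_receiver_timeout <= 0)
1323  wakeup[reason] = TIMESTAMP_INFINITY;
1324  else
1325  wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
1326  break;
1327  case WALRCV_WAKEUP_PING:
1328  if (wal_receiver_timeout <= 0)
1329  wakeup[reason] = TIMESTAMP_INFINITY;
1330  else
1331  wakeup[reason] = TimestampTzPlusMilliseconds(now, (wal_receiver_timeout / 2));
1332  break;
1333  case WALRCV_WAKEUP_HSFEEDBACK:
1334  if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
1335  wakeup[reason] = TIMESTAMP_INFINITY;
1336  else
1337  wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1338  break;
1339  case WALRCV_WAKEUP_REPLY:
1340  if (wal_receiver_status_interval <= 0)
1341  wakeup[reason] = TIMESTAMP_INFINITY;
1342  else
1343  wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1344  break;
1345  /* there's intentionally no default: here */
1346  }
1347 }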
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1618
#define TIMESTAMP_INFINITY
Definition: timestamp.h:151
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define TimestampTzPlusSeconds(tz, s)
Definition: timestamp.h:86
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
Definition: walreceiver.c:129
bool hot_standby_feedback
Definition: walreceiver.c:89
int wal_receiver_status_interval
Definition: walreceiver.c:87
int wal_receiver_timeout
Definition: walreceiver.c:88

References hot_standby_feedback, now(), TIMESTAMP_INFINITY, TimestampTzPlusMilliseconds, TimestampTzPlusSeconds, wakeup, wal_receiver_status_interval, wal_receiver_timeout, WALRCV_WAKEUP_HSFEEDBACK, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, and WALRCV_WAKEUP_TERMINATE.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 801 of file walreceiver.c.

802 {
803  WalRcvData *walrcv = WalRcv;
804  TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
805 
806  Assert(*startpointTLI_p != 0);
807 
808  /* Ensure that all WAL records received are flushed to disk */
809  XLogWalRcvFlush(true, *startpointTLI_p);
810 
811  /* Mark ourselves inactive in shared memory */
812  SpinLockAcquire(&walrcv->mutex);
813  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
814  walrcv->walRcvState == WALRCV_RESTARTING ||
815  walrcv->walRcvState == WALRCV_STARTING ||
816  walrcv->walRcvState == WALRCV_WAITING ||
817  walrcv->walRcvState == WALRCV_STOPPING);
818  Assert(walrcv->pid == MyProcPid);
819  walrcv->walRcvState = WALRCV_STOPPED;
820  walrcv->pid = 0;
821  walrcv->ready_to_display = false;
822  walrcv->latch = NULL;
823  SpinLockRelease(&walrcv->mutex);
824 
825  ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
826 
827  /* Terminate the connection gracefully. */
828  if (wrconn != NULL)
829  walrcv_disconnect(wrconn);
830 
831  /* Wake up the startup process to notice promptly that we're gone */
832  WakeupRecovery();
833 }
#define Assert(condition)
Definition: c.h:858
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition: globals.c:45
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
Latch * latch
Definition: walreceiver.h:144
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:67
static WalReceiverConn * wrconn
Definition: walreceiver.c:92
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
Definition: walreceiver.c:993
@ WALRCV_STARTING
Definition: walreceiver.h:49
@ WALRCV_STOPPED
Definition: walreceiver.h:48
@ WALRCV_RESTARTING
Definition: walreceiver.h:53
@ WALRCV_STREAMING
Definition: walreceiver.h:51
@ WALRCV_WAITING
Definition: walreceiver.h:52
@ WALRCV_STOPPING
Definition: walreceiver.h:54
#define walrcv_disconnect(conn)
Definition: walreceiver.h:464
void WakeupRecovery(void)

References arg, Assert, ConditionVariableBroadcast(), DatumGetPointer(), WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 745 of file walreceiver.c.

746 {
747  TimeLineID tli;
748 
749  for (tli = first; tli <= last; tli++)
750  {
751  /* there's no history file for timeline 1 */
752  if (tli != 1 && !existsTimeLineHistory(tli))
753  {
754  char *fname;
755  char *content;
756  int len;
757  char expectedfname[MAXFNAMELEN];
758 
759  ereport(LOG,
760  (errmsg("fetching timeline history file for timeline %u from primary server",
761  tli)));
762 
763  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
764 
765  /*
766  * Check that the filename on the primary matches what we
767  * calculated ourselves. This is just a sanity check, it should
768  * always match.
769  */
770  TLHistoryFileName(expectedfname, tli);
771  if (strcmp(fname, expectedfname) != 0)
772  ereport(ERROR,
773  (errcode(ERRCODE_PROTOCOL_VIOLATION),
774  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
775  tli)));
776 
777  /*
778  * Write the file to pg_wal.
779  */
780  writeTimeLineHistoryFile(tli, content, len);
781 
782  /*
783  * Mark the streamed history file as ready for archiving if
784  * archive_mode is always.
785  */
786  if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
787  XLogArchiveForceDone(fname);
788  else
789  XLogArchiveNotify(fname);
790 
791  pfree(fname);
792  pfree(content);
793  }
794  }
795 }
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:463
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1159
#define LOG
Definition: elog.h:31
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:446
int XLogArchiveMode
Definition: xlog.c:119
@ ARCHIVE_MODE_ALWAYS
Definition: xlog.h:65
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:510
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:444

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName(), walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1358 of file walreceiver.c.

1359 {
1360  Latch *latch;
1361 
1362  WalRcv->force_reply = true;
1363  /* fetching the latch pointer might not be atomic, so use spinlock */
1364  SpinLockAcquire(&WalRcv->mutex);
1365  latch = WalRcv->latch;
1366  SpinLockRelease(&WalRcv->mutex);
1367  if (latch)
1368  SetLatch(latch);
1369 }
void SetLatch(Latch *latch)
Definition: latch.c:632
Definition: latch.h:113
sig_atomic_t force_reply
Definition: walreceiver.h:161

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1376 of file walreceiver.c.

1377 {
1378  switch (state)
1379  {
1380  case WALRCV_STOPPED:
1381  return "stopped";
1382  case WALRCV_STARTING:
1383  return "starting";
1384  case WALRCV_STREAMING:
1385  return "streaming";
1386  case WALRCV_WAITING:
1387  return "waiting";
1388  case WALRCV_RESTARTING:
1389  return "restarting";
1390  case WALRCV_STOPPING:
1391  return "stopping";
1392  }
1393  return "UNKNOWN";
1394 }

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr * startpoint,
TimeLineID * startpointTLI 
)
static

Definition at line 665 of file walreceiver.c.

666 {
667  WalRcvData *walrcv = WalRcv;
668  int state;
669 
670  SpinLockAcquire(&walrcv->mutex);
671  state = walrcv->walRcvState;
672  if (state != WALRCV_STREAMING)
673  {
674  SpinLockRelease(&walrcv->mutex);
675  if (state == WALRCV_STOPPING)
676  proc_exit(0);
677  else
678  elog(FATAL, "unexpected walreceiver state");
679  }
680  walrcv->walRcvState = WALRCV_WAITING;
681  walrcv->receiveStart = InvalidXLogRecPtr;
682  walrcv->receiveStartTLI = 0;
683  SpinLockRelease(&walrcv->mutex);
684 
685  set_ps_display("idle");
686 
687  /*
688  * nudge startup process to notice that we've stopped streaming and are
689  * now waiting for instructions.
690  */
691  WakeupRecovery();
692  for (;;)
693  {
694  ResetLatch(MyLatch);
695 
696  ProcessWalRcvInterrupts();
697 
698  SpinLockAcquire(&walrcv->mutex);
699  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
700  walrcv->walRcvState == WALRCV_WAITING ||
701  walrcv->walRcvState == WALRCV_STOPPING);
702  if (walrcv->walRcvState == WALRCV_RESTARTING)
703  {
704  /*
705  * No need to handle changes in primary_conninfo or
706  * primary_slot_name here. Startup process will signal us to
707  * terminate in case those change.
708  */
709  *startpoint = walrcv->receiveStart;
710  *startpointTLI = walrcv->receiveStartTLI;
711  walrcv->walRcvState = WALRCV_STREAMING;
712  SpinLockRelease(&walrcv->mutex);
713  break;
714  }
715  if (walrcv->walRcvState == WALRCV_STOPPING)
716  {
717  /*
718  * We should've received SIGTERM if the startup process wants us
719  * to die, but might as well check it here too.
720  */
721  SpinLockRelease(&walrcv->mutex);
722  exit(1);
723  }
724  SpinLockRelease(&walrcv->mutex);
725 
726  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
727  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
728  }
729 
730  if (update_process_title)
731  {
732  char activitymsg[50];
733 
734  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
735  LSN_FORMAT_ARGS(*startpoint));
736  set_ps_display(activitymsg);
737  }
738 }
struct Latch * MyLatch
Definition: globals.c:60
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
exit(1)
#define snprintf
Definition: port.h:238
bool update_process_title
Definition: ps_status.c:29
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:162
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert, elog, exit(), FATAL, InvalidXLogRecPtr, LSN_FORMAT_ARGS, WalRcvData::mutex, MyLatch, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( char *  startup_data,
size_t  startup_data_len 
)

Definition at line 183 of file walreceiver.c.

184 {
185  char conninfo[MAXCONNINFO];
186  char *tmp_conninfo;
187  char slotname[NAMEDATALEN];
188  bool is_temp_slot;
189  XLogRecPtr startpoint;
190  TimeLineID startpointTLI;
191  TimeLineID primaryTLI;
192  bool first_stream;
193  WalRcvData *walrcv;
195  char *err;
196  char *sender_host = NULL;
197  int sender_port = 0;
198 
199  Assert(startup_data_len == 0);
200 
203 
204  /*
205  * WalRcv should be set up already (if we are a backend, we inherit this
206  * by fork() or EXEC_BACKEND mechanism from the postmaster).
207  */
208  walrcv = WalRcv;
209  Assert(walrcv != NULL);
210 
211  /*
212  * Mark walreceiver as running in shared memory.
213  *
214  * Do this as early as possible, so that if we fail later on, we'll set
215  * state to STOPPED. If we die before this, the startup process will keep
216  * waiting for us to start up, until it times out.
217  */
218  SpinLockAcquire(&walrcv->mutex);
219  Assert(walrcv->pid == 0);
220  switch (walrcv->walRcvState)
221  {
222  case WALRCV_STOPPING:
223  /* If we've already been requested to stop, don't start up. */
224  walrcv->walRcvState = WALRCV_STOPPED;
225  /* fall through */
226 
227  case WALRCV_STOPPED:
228  SpinLockRelease(&walrcv->mutex);
230  proc_exit(1);
231  break;
232 
233  case WALRCV_STARTING:
234  /* The usual case */
235  break;
236 
237  case WALRCV_WAITING:
238  case WALRCV_STREAMING:
239  case WALRCV_RESTARTING:
240  default:
241  /* Shouldn't happen */
242  SpinLockRelease(&walrcv->mutex);
243  elog(PANIC, "walreceiver still running according to shared memory state");
244  }
245  /* Advertise our PID so that the startup process can kill us */
246  walrcv->pid = MyProcPid;
247  walrcv->walRcvState = WALRCV_STREAMING;
248 
249  /* Fetch information required to start streaming */
250  walrcv->ready_to_display = false;
251  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
252  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
253  is_temp_slot = walrcv->is_temp_slot;
254  startpoint = walrcv->receiveStart;
255  startpointTLI = walrcv->receiveStartTLI;
256 
257  /*
258  * At most one of is_temp_slot and slotname can be set; otherwise,
259  * RequestXLogStreaming messed up.
260  */
261  Assert(!is_temp_slot || (slotname[0] == '\0'));
262 
263  /* Initialize to a reasonably sane value */
265  walrcv->lastMsgSendTime =
266  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
267 
268  /* Report the latch to use to awaken this process */
269  walrcv->latch = &MyProc->procLatch;
270 
271  SpinLockRelease(&walrcv->mutex);
272 
274 
275  /* Arrange to clean up at walreceiver exit */
276  on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
277 
278  /* Properly accept or ignore signals the postmaster might send us */
279  pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
280  * file */
281  pqsignal(SIGINT, SIG_IGN);
282  pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
283  /* SIGQUIT handler was already set up by InitPostmasterChild */
288 
289  /* Reset some signals that are accepted by postmaster but not here */
291 
292  /* Load the libpq-specific functions */
293  load_file("libpqwalreceiver", false);
294  if (WalReceiverFunctions == NULL)
295  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
296 
297  /* Unblock signals (they were blocked when the postmaster forked us) */
298  sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
299 
300  /* Establish the connection to the primary for XLOG streaming */
301  wrconn = walrcv_connect(conninfo, true, false, false,
302  cluster_name[0] ? cluster_name : "walreceiver",
303  &err);
304  if (!wrconn)
305  ereport(ERROR,
306  (errcode(ERRCODE_CONNECTION_FAILURE),
307  errmsg("could not connect to the primary server: %s", err)));
308 
309  /*
310  * Save user-visible connection string. This clobbers the original
311  * conninfo, for security. Also save host and port of the sender server
312  * this walreceiver is connected to.
313  */
314  tmp_conninfo = walrcv_get_conninfo(wrconn);
315  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
316  SpinLockAcquire(&walrcv->mutex);
317  memset(walrcv->conninfo, 0, MAXCONNINFO);
318  if (tmp_conninfo)
319  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
320 
321  memset(walrcv->sender_host, 0, NI_MAXHOST);
322  if (sender_host)
323  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
324 
325  walrcv->sender_port = sender_port;
326  walrcv->ready_to_display = true;
327  SpinLockRelease(&walrcv->mutex);
328 
329  if (tmp_conninfo)
330  pfree(tmp_conninfo);
331 
332  if (sender_host)
333  pfree(sender_host);
334 
335  first_stream = true;
336  for (;;)
337  {
338  char *primary_sysid;
339  char standby_sysid[32];
341 
342  /*
343  * Check that we're connected to a valid server using the
344  * IDENTIFY_SYSTEM replication command.
345  */
346  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
347 
348  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
350  if (strcmp(primary_sysid, standby_sysid) != 0)
351  {
352  ereport(ERROR,
353  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
354  errmsg("database system identifier differs between the primary and standby"),
355  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
356  primary_sysid, standby_sysid)));
357  }
358 
359  /*
360  * Confirm that the current timeline of the primary is the same or
361  * ahead of ours.
362  */
363  if (primaryTLI < startpointTLI)
364  ereport(ERROR,
365  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
366  errmsg("highest timeline %u of the primary is behind recovery timeline %u",
367  primaryTLI, startpointTLI)));
368 
369  /*
370  * Get any missing history files. We do this always, even when we're
371  * not interested in that timeline, so that if we're promoted to
372  * become the primary later on, we don't select the same timeline that
373  * was already used in the current primary. This isn't bullet-proof -
374  * you'll need some external software to manage your cluster if you
375  * need to ensure that a unique timeline id is chosen in every case,
376  * but let's avoid the confusion of timeline id collisions where we
377  * can.
378  */
379  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
380 
381  /*
382  * Create temporary replication slot if requested, and update slot
383  * name in shared memory. (Note the slot name cannot already be set
384  * in this case.)
385  */
386  if (is_temp_slot)
387  {
388  snprintf(slotname, sizeof(slotname),
389  "pg_walreceiver_%lld",
390  (long long int) walrcv_get_backend_pid(wrconn));
391 
392  walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
393 
394  SpinLockAcquire(&walrcv->mutex);
395  strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
396  SpinLockRelease(&walrcv->mutex);
397  }
398 
399  /*
400  * Start streaming.
401  *
402  * We'll try to start at the requested starting point and timeline,
403  * even if it's different from the server's latest timeline. In case
404  * we've already reached the end of the old timeline, the server will
405  * finish the streaming immediately, and we will go back to await
406  * orders from the startup process. If recovery_target_timeline is
407  * 'latest', the startup process will scan pg_wal and find the new
408  * history file, bump recovery target timeline, and ask us to restart
409  * on the new timeline.
410  */
411  options.logical = false;
412  options.startpoint = startpoint;
413  options.slotname = slotname[0] != '\0' ? slotname : NULL;
414  options.proto.physical.startpointTLI = startpointTLI;
416  {
417  if (first_stream)
418  ereport(LOG,
419  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
420  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
421  else
422  ereport(LOG,
423  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
424  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
425  first_stream = false;
426 
427  /* Initialize LogstreamResult and buffers for processing messages */
430 
431  /* Initialize nap wakeup times. */
433  for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
435 
436  /* Send initial reply/feedback messages. */
437  XLogWalRcvSendReply(true, false);
439 
440  /* Loop until end-of-streaming or error */
441  for (;;)
442  {
443  char *buf;
444  int len;
445  bool endofwal = false;
446  pgsocket wait_fd = PGINVALID_SOCKET;
447  int rc;
448  TimestampTz nextWakeup;
449  long nap;
450 
451  /*
452  * Exit walreceiver if we're not in recovery. This should not
453  * happen, but cross-check the status here.
454  */
455  if (!RecoveryInProgress())
456  ereport(FATAL,
457  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
458  errmsg("cannot continue WAL streaming, recovery has already ended")));
459 
460  /* Process any requests or signals received recently */
462 
464  {
465  ConfigReloadPending = false;
467  /* recompute wakeup times */
469  for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
472  }
473 
474  /* See if we can read data immediately */
475  len = walrcv_receive(wrconn, &buf, &wait_fd);
476  if (len != 0)
477  {
478  /*
479  * Process the received data, and any subsequent data we
480  * can read without blocking.
481  */
482  for (;;)
483  {
484  if (len > 0)
485  {
486  /*
487  * Something was received from primary, so adjust
488  * the ping and terminate wakeup times.
489  */
492  now);
494  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
495  startpointTLI);
496  }
497  else if (len == 0)
498  break;
499  else if (len < 0)
500  {
501  ereport(LOG,
502  (errmsg("replication terminated by primary server"),
503  errdetail("End of WAL reached on timeline %u at %X/%X.",
504  startpointTLI,
506  endofwal = true;
507  break;
508  }
509  len = walrcv_receive(wrconn, &buf, &wait_fd);
510  }
511 
512  /* Let the primary know that we received some data. */
513  XLogWalRcvSendReply(false, false);
514 
515  /*
516  * If we've written some records, flush them to disk and
517  * let the startup process and primary server know about
518  * them.
519  */
520  XLogWalRcvFlush(false, startpointTLI);
521  }
522 
523  /* Check if we need to exit the streaming loop. */
524  if (endofwal)
525  break;
526 
527  /* Find the soonest wakeup time, to limit our nap. */
528  nextWakeup = TIMESTAMP_INFINITY;
529  for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
530  nextWakeup = Min(wakeup[i], nextWakeup);
531 
532  /* Calculate the nap time, clamping as necessary. */
534  nap = TimestampDifferenceMilliseconds(now, nextWakeup);
535 
536  /*
537  * Ideally we would reuse a WaitEventSet object repeatedly
538  * here to avoid the overheads of WaitLatchOrSocket on epoll
539  * systems, but we can't be sure that libpq (or any other
540  * walreceiver implementation) has the same socket (even if
541  * the fd is the same number, it may have been closed and
542  * reopened since the last time). In future, if there is a
543  * function for removing sockets from WaitEventSet, then we
544  * could add and remove just the socket each time, potentially
545  * avoiding some system calls.
546  */
547  Assert(wait_fd != PGINVALID_SOCKET);
551  wait_fd,
552  nap,
553  WAIT_EVENT_WAL_RECEIVER_MAIN);
554  if (rc & WL_LATCH_SET)
555  {
558 
559  if (walrcv->force_reply)
560  {
561  /*
562  * The recovery process has asked us to send apply
563  * feedback now. Make sure the flag is really set to
564  * false in shared memory before sending the reply, so
565  * we don't miss a new request for a reply.
566  */
567  walrcv->force_reply = false;
569  XLogWalRcvSendReply(true, false);
570  }
571  }
572  if (rc & WL_TIMEOUT)
573  {
574  /*
575  * We didn't receive anything new. If we haven't heard
576  * anything from the server for more than
577  * wal_receiver_timeout / 2, ping the server. Also, if
578  * it's been longer than wal_receiver_status_interval
579  * since the last update we sent, send a status update to
580  * the primary anyway, to report any progress in applying
581  * WAL.
582  */
583  bool requestReply = false;
584 
585  /*
586  * Check if time since last receive from primary has
587  * reached the configured limit.
588  */
591  ereport(ERROR,
592  (errcode(ERRCODE_CONNECTION_FAILURE),
593  errmsg("terminating walreceiver due to timeout")));
594 
595  /*
596  * If we didn't receive anything new for half of receiver
597  * replication timeout, then ping the server.
598  */
599  if (now >= wakeup[WALRCV_WAKEUP_PING])
600  {
601  requestReply = true;
603  }
604 
605  XLogWalRcvSendReply(requestReply, requestReply);
607  }
608  }
609 
610  /*
611  * The backend finished streaming. Exit streaming COPY-mode from
612  * our side, too.
613  */
614  walrcv_endstreaming(wrconn, &primaryTLI);
615 
616  /*
617  * If the server had switched to a new timeline that we didn't
618  * know about when we began streaming, fetch its timeline history
619  * file now.
620  */
621  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
622  }
623  else
624  ereport(LOG,
625  (errmsg("primary server contains no more WAL on requested timeline %u",
626  startpointTLI)));
627 
628  /*
629  * End of WAL reached on the requested timeline. Close the last
630  * segment, and wait for new orders from the startup process.
631  */
632  if (recvFile >= 0)
633  {
634  char xlogfname[MAXFNAMELEN];
635 
636  XLogWalRcvFlush(false, startpointTLI);
638  if (close(recvFile) != 0)
639  ereport(PANIC,
641  errmsg("could not close WAL segment %s: %m",
642  xlogfname)));
643 
644  /*
645  * Create .done file forcibly to prevent the streamed segment from
646  * being archived later.
647  */
649  XLogArchiveForceDone(xlogfname);
650  else
651  XLogArchiveNotify(xlogfname);
652  }
653  recvFile = -1;
654 
655  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
656  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
657  }
658  /* not reached */
659 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:480
#define pg_memory_barrier()
Definition: atomics.h:138
void AuxiliaryProcessMainCommon(void)
Definition: auxprocess.c:44
sigset_t UnBlockSig
Definition: pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1766
#define Min(x, y)
Definition: c.h:1004
#define UINT64_FORMAT
Definition: c.h:549
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
int errcode_for_file_access(void)
Definition: elog.c:882
int errdetail(const char *fmt,...)
Definition: elog.c:1205
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
void err(int eval, const char *fmt,...)
Definition: err.c:43
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
char * cluster_name
Definition: guc_tables.c:540
#define close(a)
Definition: win32.h:12
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:105
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
@ B_WAL_RECEIVER
Definition: miscadmin.h:359
BackendType MyBackendType
Definition: miscinit.c:63
static char ** options
static char * buf
Definition: pg_test_fsync.c:73
pqsigfunc pqsignal(int signo, pqsigfunc func)
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:635
PGPROC * MyProc
Definition: proc.c:66
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Latch procLatch
Definition: proc.h:165
bool is_temp_slot
Definition: walreceiver.h:132
#define NUM_WALRCV_WAKEUPS
Definition: walreceiver.c:123
static StringInfoData reply_message
Definition: walreceiver.c:131
static int recvFile
Definition: walreceiver.c:100
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:745
static TimeLineID recvFileTLI
Definition: walreceiver.c:101
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:93
static struct @21 LogstreamResult
static XLogSegNo recvSegNo
Definition: walreceiver.c:102
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1169
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:665
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
Definition: walreceiver.c:839
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
Definition: walreceiver.c:1317
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:801
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1100
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:448
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:432
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:438
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:456
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:436
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:450
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:440
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:460
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:452
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGALRM
Definition: win32_port.h:174
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4535
bool RecoveryInProgress(void)
Definition: xlog.c:6290
int wal_segment_size
Definition: xlog.c:143
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert, AuxiliaryProcessMainCommon(), B_WAL_RECEIVER, buf, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, WalRcvData::conninfo, DEBUG1, elog, ereport, err(), errcode(), errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), i, initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, Min, WalRcvData::mutex, MyBackendType, MyLatch, MyProc, MyProcPid, NAMEDATALEN, now(), NUM_WALRCV_WAKEUPS, on_shmem_exit(), options, PANIC, pfree(), pg_atomic_write_u64(), pg_memory_barrier, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvData::pid, PointerGetDatum(), pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, procsignal_sigusr1_handler(), WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, snprintf, SpinLockAcquire, SpinLockRelease, strlcpy(), TIMESTAMP_INFINITY, TimestampDifferenceMilliseconds(), UINT64_FORMAT, UnBlockSig, WaitLatchOrSocket(), wakeup, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_TERMINATE, WalRcvComputeNextWakeup(), WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), 
WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, WalRcvWaitForStartPosition(), WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1048 of file walreceiver.c.

1049 {
1050  char xlogfname[MAXFNAMELEN];
1051 
1053  Assert(tli != 0);
1054 
1055  /*
1056  * fsync() and close current file before we switch to next one. We would
1057  * otherwise have to reopen this file to fsync it later.
1058  */
1059  XLogWalRcvFlush(false, tli);
1060 
1062 
1063  /*
1064  * XLOG segment files will be re-read by recovery in startup process soon,
1065  * so we don't advise the OS to release cache pages associated with the
1066  * file like XLogFileClose() does.
1067  */
1068  if (close(recvFile) != 0)
1069  ereport(PANIC,
1071  errmsg("could not close WAL segment %s: %m",
1072  xlogfname)));
1073 
1074  /*
1075  * Create .done file forcibly to prevent the streamed segment from being
1076  * archived later.
1077  */
1079  XLogArchiveForceDone(xlogfname);
1080  else
1081  XLogArchiveNotify(xlogfname);
1082 
1083  recvFile = -1;
1084 }
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert, close, ereport, errcode_for_file_access(), errmsg(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 993 of file walreceiver.c.

994 {
995  Assert(tli != 0);
996 
997  if (LogstreamResult.Flush < LogstreamResult.Write)
998  {
999  WalRcvData *walrcv = WalRcv;
1000 
1002 
1003  LogstreamResult.Flush = LogstreamResult.Write;
1004 
1005  /* Update shared-memory status */
1006  SpinLockAcquire(&walrcv->mutex);
1007  if (walrcv->flushedUpto < LogstreamResult.Flush)
1008  {
1009  walrcv->latestChunkStart = walrcv->flushedUpto;
1010  walrcv->flushedUpto = LogstreamResult.Flush;
1011  walrcv->receivedTLI = tli;
1012  }
1013  SpinLockRelease(&walrcv->mutex);
1014 
1015  /* Signal the startup process and walsender that new WAL has arrived */
1016  WakeupRecovery();
1018  WalSndWakeup(true, false);
1019 
1020  /* Report XLOG streaming progress in PS display */
1022  {
1023  char activitymsg[50];
1024 
1025  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1027  set_ps_display(activitymsg);
1028  }
1029 
1030  /* Also let the primary know that we made some progress */
1031  if (!dying)
1032  {
1033  XLogWalRcvSendReply(false, false);
1034  XLogWalRcvSendHSFeedback(false);
1035  }
1036  }
1037 }
XLogRecPtr latestChunkStart
Definition: walreceiver.h:95
#define AllowCascadeReplication()
Definition: walreceiver.h:41
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3666
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:8603

References AllowCascadeReplication, Assert, WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, LSN_FORMAT_ARGS, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 839 of file walreceiver.c.

840 {
841  int hdrlen;
842  XLogRecPtr dataStart;
843  XLogRecPtr walEnd;
844  TimestampTz sendTime;
845  bool replyRequested;
846 
847  switch (type)
848  {
849  case 'w': /* WAL records */
850  {
851  StringInfoData incoming_message;
852 
853  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
854  if (len < hdrlen)
855  ereport(ERROR,
856  (errcode(ERRCODE_PROTOCOL_VIOLATION),
857  errmsg_internal("invalid WAL message received from primary")));
858 
859  /* initialize a StringInfo with the given buffer */
860  initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
861 
862  /* read the fields */
863  dataStart = pq_getmsgint64(&incoming_message);
864  walEnd = pq_getmsgint64(&incoming_message);
865  sendTime = pq_getmsgint64(&incoming_message);
866  ProcessWalSndrMessage(walEnd, sendTime);
867 
868  buf += hdrlen;
869  len -= hdrlen;
870  XLogWalRcvWrite(buf, len, dataStart, tli);
871  break;
872  }
873  case 'k': /* Keepalive */
874  {
875  StringInfoData incoming_message;
876 
877  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
878  if (len != hdrlen)
879  ereport(ERROR,
880  (errcode(ERRCODE_PROTOCOL_VIOLATION),
881  errmsg_internal("invalid keepalive message received from primary")));
882 
883  /* initialize a StringInfo with the given buffer */
884  initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
885 
886  /* read the fields */
887  walEnd = pq_getmsgint64(&incoming_message);
888  sendTime = pq_getmsgint64(&incoming_message);
889  replyRequested = pq_getmsgbyte(&incoming_message);
890 
891  ProcessWalSndrMessage(walEnd, sendTime);
892 
893  /* If the primary requested a reply, send one immediately */
894  if (replyRequested)
895  XLogWalRcvSendReply(true, false);
896  break;
897  }
898  default:
899  ereport(ERROR,
900  (errcode(ERRCODE_PROTOCOL_VIOLATION),
901  errmsg_internal("invalid replication message type %d",
902  type)));
903  }
904 }
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:130
const char * type
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1265
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:910

References buf, ereport, errcode(), errmsg_internal(), ERROR, initReadOnlyStringInfo(), len, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1169 of file walreceiver.c.

1170 {
1171  TimestampTz now;
1172  FullTransactionId nextFullXid;
1173  TransactionId nextXid;
1174  uint32 xmin_epoch,
1175  catalog_xmin_epoch;
1176  TransactionId xmin,
1177  catalog_xmin;
1178 
1179  /* initially true so we always send at least one feedback message */
1180  static bool primary_has_standby_xmin = true;
1181 
1182  /*
1183  * If the user doesn't want status to be reported to the primary, be sure
1184  * to exit before doing anything at all.
1185  */
1187  !primary_has_standby_xmin)
1188  return;
1189 
1190  /* Get current timestamp. */
1192 
1193  /* Send feedback at most once per wal_receiver_status_interval. */
1194  if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1195  return;
1196 
1197  /* Make sure we wake up when it's time to send feedback again. */
1199 
1200  /*
1201  * If Hot Standby is not yet accepting connections there is nothing to
1202  * send. Check this after the interval has expired to reduce number of
1203  * calls.
1204  *
1205  * Bailing out here also ensures that we don't send feedback until we've
1206  * read our own replication slot state, so we don't tell the primary to
1207  * discard needed xmin or catalog_xmin from any slots that may exist on
1208  * this replica.
1209  */
1210  if (!HotStandbyActive())
1211  return;
1212 
1213  /*
1214  * Make the expensive call to get the oldest xmin once we are certain
1215  * everything else has been checked.
1216  */
1218  {
1219  GetReplicationHorizons(&xmin, &catalog_xmin);
1220  }
1221  else
1222  {
1223  xmin = InvalidTransactionId;
1224  catalog_xmin = InvalidTransactionId;
1225  }
1226 
1227  /*
1228  * Get epoch and adjust if nextXid and oldestXmin are on different sides
1229  * of the epoch boundary.
1230  */
1231  nextFullXid = ReadNextFullTransactionId();
1232  nextXid = XidFromFullTransactionId(nextFullXid);
1233  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1234  catalog_xmin_epoch = xmin_epoch;
1235  if (nextXid < xmin)
1236  xmin_epoch--;
1237  if (nextXid < catalog_xmin)
1238  catalog_xmin_epoch--;
1239 
1240  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1241  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1242 
1243  /* Construct the message and send it. */
1245  pq_sendbyte(&reply_message, 'h');
1247  pq_sendint32(&reply_message, xmin);
1248  pq_sendint32(&reply_message, xmin_epoch);
1249  pq_sendint32(&reply_message, catalog_xmin);
1250  pq_sendint32(&reply_message, catalog_xmin_epoch);
1252  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1253  primary_has_standby_xmin = true;
1254  else
1255  primary_has_standby_xmin = false;
1256 }
unsigned int uint32
Definition: c.h:506
uint32 TransactionId
Definition: c.h:652
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2035
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
#define InvalidTransactionId
Definition: transam.h:31
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define TransactionIdIsValid(xid)
Definition: transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:454
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReadNextFullTransactionId(), reply_message, resetStringInfo(), TransactionIdIsValid, wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_HSFEEDBACK, WalRcvComputeNextWakeup(), wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1100 of file walreceiver.c.

1101 { /* Send a standby status update ('r' message) carrying the write, flush and apply positions to the primary. */
1102  static XLogRecPtr writePtr = 0;
1103  static XLogRecPtr flushPtr = 0;
1104  XLogRecPtr applyPtr;
1105  TimestampTz now;
1106 
1107  /*
1108  * If the user doesn't want status to be reported to the primary, be sure
1109  * to exit before doing anything at all.
1110  */
1111  if (!force && wal_receiver_status_interval <= 0)
1112  return;
1113 
1114  /* Get current timestamp. */
1115  now = GetCurrentTimestamp();
1116 
1117  /*
1118  * We can compare the write and flush positions to the last message we
1119  * sent without taking any lock, but the apply position requires a spin
1120  * lock, so we don't check that unless something else has changed or 10
1121  * seconds have passed. This means that the apply WAL location will
1122  * appear, from the primary's point of view, to lag slightly, but since
1123  * this is only for reporting purposes and only on idle systems, that's
1124  * probably OK.
1125  */
1126  if (!force
1127  && writePtr == LogstreamResult.Write
1128  && flushPtr == LogstreamResult.Flush
1129  && now < wakeup[WALRCV_WAKEUP_REPLY]) /* nothing changed and the next scheduled reply is not due yet */
1130  return;
1131 
1132  /* Make sure we wake up when it's time to send another reply. */
1133  WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
1134 
1135  /* Construct a new message */
1136  writePtr = LogstreamResult.Write;
1137  flushPtr = LogstreamResult.Flush;
1138  applyPtr = GetXLogReplayRecPtr(NULL);
1139 
1140  resetStringInfo(&reply_message); /* reply_message is a reused file-level buffer; start from empty */
1141  pq_sendbyte(&reply_message, 'r');
1142  pq_sendint64(&reply_message, writePtr);
1143  pq_sendint64(&reply_message, flushPtr);
1144  pq_sendint64(&reply_message, applyPtr);
1145  pq_sendint64(&reply_message, GetCurrentTimestamp()); /* sender's clock: the timestamp field that precedes the reply-request byte */
1146  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1147 
1148  /* Send it */
1149  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1150  LSN_FORMAT_ARGS(writePtr),
1151  LSN_FORMAT_ARGS(flushPtr),
1152  LSN_FORMAT_ARGS(applyPtr),
1153  requestReply ? " (reply requested)" : "");
1154 
1155  walrcv_send(wrconn, reply_message.data, reply_message.len);
1156 }

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_REPLY, WalRcvComputeNextWakeup(), and wrconn.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 910 of file walreceiver.c.

911 { /* Write nbytes of received WAL from buf at position recptr, switching segment files as segment boundaries are crossed. */
912  int startoff;
913  int byteswritten;
914 
915  Assert(tli != 0);
916 
917  while (nbytes > 0)
918  {
919  int segbytes;
920 
921  /* Close the current segment if it's completed */
922  if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
923  XLogWalRcvClose(recptr, tli);
924 
925  if (recvFile < 0)
926  {
927  /* Create/use new log file */
928  XLByteToSeg(recptr, recvSegNo, wal_segment_size); /* sets recvSegNo to the segment containing recptr */
929  recvFile = XLogFileInit(recvSegNo, tli);
930  recvFileTLI = tli;
931  }
932 
933  /* Calculate the start offset of the received logs */
934  startoff = XLogSegmentOffset(recptr, wal_segment_size);
935 
936  if (startoff + nbytes > wal_segment_size)
937  segbytes = wal_segment_size - startoff;
938  else
939  segbytes = nbytes;
940 
941  /* OK to write the logs */
942  errno = 0;
943 
944  byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
945  if (byteswritten <= 0)
946  {
947  char xlogfname[MAXFNAMELEN];
948  int save_errno;
949 
950  /* if write didn't set errno, assume no disk space */
951  if (errno == 0)
952  errno = ENOSPC;
953 
954  save_errno = errno;
955  XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); /* errno is saved and restored around this call so %m still reports the write failure */
956  errno = save_errno;
957  ereport(PANIC,
958  (errcode_for_file_access(),
959  errmsg("could not write to WAL segment %s "
960  "at offset %d, length %lu: %m",
961  xlogfname, startoff, (unsigned long) segbytes)));
962  }
963 
964  /* Update state for write */
965  recptr += byteswritten;
966 
967  nbytes -= byteswritten;
968  buf += byteswritten;
969 
970  LogstreamResult.Write = recptr;
971  }
972 
973  /* Update shared-memory status */
974  pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
975 
976  /*
977  * Close the current segment if it's fully written up in the last cycle of
978  * the loop, to create its archive notification file soon. Otherwise WAL
979  * archiving of the segment will be delayed until any data in the next
980  * segment is received and written.
981  */
982  if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
983  XLogWalRcvClose(recptr, tli);
984 }
#define pg_pwrite
Definition: port.h:226
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:1048
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition: xlog.c:3369
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert, buf, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite, recvFile, recvFileTLI, recvSegNo, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName(), XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 111 of file walreceiver.c.

Referenced by XLogWrite().

◆ hot_standby_feedback

bool hot_standby_feedback

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 100 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 101 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 102 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

◆ wakeup

◆ wal_receiver_status_interval

int wal_receiver_status_interval

◆ wal_receiver_timeout

int wal_receiver_timeout

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 93 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 110 of file walreceiver.c.

Referenced by XLogWrite().