PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_authid.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)
 

Typedefs

typedef enum WalRcvWakeupReason WalRcvWakeupReason
 

Enumerations

enum  WalRcvWakeupReason { WALRCV_WAKEUP_TERMINATE , WALRCV_WAKEUP_PING , WALRCV_WAKEUP_REPLY , WALRCV_WAKEUP_HSFEEDBACK }
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason, TimestampTz now)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (char *startup_data, size_t startup_data_len)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConnwrconn = NULL
 
WalReceiverFunctionsTypeWalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
LogstreamResult
 
static TimestampTz wakeup [NUM_WALRCV_WAKEUPS]
 
static StringInfoData reply_message
 

Macro Definition Documentation

◆ NUM_WALRCV_WAKEUPS

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)

Definition at line 123 of file walreceiver.c.

Typedef Documentation

◆ WalRcvWakeupReason

Enumeration Type Documentation

◆ WalRcvWakeupReason

Enumerator
WALRCV_WAKEUP_TERMINATE 
WALRCV_WAKEUP_PING 
WALRCV_WAKEUP_REPLY 
WALRCV_WAKEUP_HSFEEDBACK 

Definition at line 117 of file walreceiver.c.

118 {
123 #define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
WalRcvWakeupReason
Definition: walreceiver.c:118
@ WALRCV_WAKEUP_TERMINATE
Definition: walreceiver.c:119
@ WALRCV_WAKEUP_REPLY
Definition: walreceiver.c:121
@ WALRCV_WAKEUP_PING
Definition: walreceiver.c:120
@ WALRCV_WAKEUP_HSFEEDBACK
Definition: walreceiver.c:122

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1402 of file walreceiver.c.

1403 {
1404  TupleDesc tupdesc;
1405  Datum *values;
1406  bool *nulls;
1407  int pid;
1408  bool ready_to_display;
1410  XLogRecPtr receive_start_lsn;
1411  TimeLineID receive_start_tli;
1412  XLogRecPtr written_lsn;
1413  XLogRecPtr flushed_lsn;
1414  TimeLineID received_tli;
1415  TimestampTz last_send_time;
1416  TimestampTz last_receipt_time;
1417  XLogRecPtr latest_end_lsn;
1418  TimestampTz latest_end_time;
1419  char sender_host[NI_MAXHOST];
1420  int sender_port = 0;
1421  char slotname[NAMEDATALEN];
1422  char conninfo[MAXCONNINFO];
1423 
1424  /* Take a lock to ensure value consistency */
1426  pid = (int) WalRcv->pid;
1427  ready_to_display = WalRcv->ready_to_display;
1429  receive_start_lsn = WalRcv->receiveStart;
1430  receive_start_tli = WalRcv->receiveStartTLI;
1431  flushed_lsn = WalRcv->flushedUpto;
1432  received_tli = WalRcv->receivedTLI;
1433  last_send_time = WalRcv->lastMsgSendTime;
1434  last_receipt_time = WalRcv->lastMsgReceiptTime;
1435  latest_end_lsn = WalRcv->latestWalEnd;
1436  latest_end_time = WalRcv->latestWalEndTime;
1437  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1438  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1439  sender_port = WalRcv->sender_port;
1440  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1442 
1443  /*
1444  * No WAL receiver (or not ready yet), just return a tuple with NULL
1445  * values
1446  */
1447  if (pid == 0 || !ready_to_display)
1448  PG_RETURN_NULL();
1449 
1450  /*
1451  * Read "writtenUpto" without holding a spinlock. Note that it may not be
1452  * consistent with the other shared variables of the WAL receiver
1453  * protected by a spinlock, but this should not be used for data integrity
1454  * checks.
1455  */
1456  written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1457 
1458  /* determine result type */
1459  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1460  elog(ERROR, "return type must be a row type");
1461 
1462  values = palloc0(sizeof(Datum) * tupdesc->natts);
1463  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1464 
1465  /* Fetch values */
1466  values[0] = Int32GetDatum(pid);
1467 
1468  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1469  {
1470  /*
1471  * Only superusers and roles with privileges of pg_read_all_stats can
1472  * see details. Other users only get the pid value to know whether it
1473  * is a WAL receiver, but no details.
1474  */
1475  memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1476  }
1477  else
1478  {
1480 
1481  if (XLogRecPtrIsInvalid(receive_start_lsn))
1482  nulls[2] = true;
1483  else
1484  values[2] = LSNGetDatum(receive_start_lsn);
1485  values[3] = Int32GetDatum(receive_start_tli);
1486  if (XLogRecPtrIsInvalid(written_lsn))
1487  nulls[4] = true;
1488  else
1489  values[4] = LSNGetDatum(written_lsn);
1490  if (XLogRecPtrIsInvalid(flushed_lsn))
1491  nulls[5] = true;
1492  else
1493  values[5] = LSNGetDatum(flushed_lsn);
1494  values[6] = Int32GetDatum(received_tli);
1495  if (last_send_time == 0)
1496  nulls[7] = true;
1497  else
1498  values[7] = TimestampTzGetDatum(last_send_time);
1499  if (last_receipt_time == 0)
1500  nulls[8] = true;
1501  else
1502  values[8] = TimestampTzGetDatum(last_receipt_time);
1503  if (XLogRecPtrIsInvalid(latest_end_lsn))
1504  nulls[9] = true;
1505  else
1506  values[9] = LSNGetDatum(latest_end_lsn);
1507  if (latest_end_time == 0)
1508  nulls[10] = true;
1509  else
1510  values[10] = TimestampTzGetDatum(latest_end_time);
1511  if (*slotname == '\0')
1512  nulls[11] = true;
1513  else
1514  values[11] = CStringGetTextDatum(slotname);
1515  if (*sender_host == '\0')
1516  nulls[12] = true;
1517  else
1518  values[12] = CStringGetTextDatum(sender_host);
1519  if (sender_port == 0)
1520  nulls[13] = true;
1521  else
1522  values[13] = Int32GetDatum(sender_port);
1523  if (*conninfo == '\0')
1524  nulls[14] = true;
1525  else
1526  values[14] = CStringGetTextDatum(conninfo);
1527  }
1528 
1529  /* Returns the record as Datum */
1531 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5268
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:467
static Datum values[MAXATTR]
Definition: bootstrap.c:150
#define CStringGetTextDatum(s)
Definition: builtins.h:97
int64 TimestampTz
Definition: timestamp.h:39
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
void * palloc0(Size size)
Definition: mcxt.c:1347
Oid GetUserId(void)
Definition: miscinit.c:514
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uintptr_t Datum
Definition: postgres.h:64
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:111
XLogRecPtr latestWalEnd
Definition: walreceiver.h:116
TimeLineID receiveStartTLI
Definition: walreceiver.h:87
TimeLineID receivedTLI
Definition: walreceiver.h:97
char slotname[NAMEDATALEN]
Definition: walreceiver.h:136
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:129
pid_t pid
Definition: walreceiver.h:68
XLogRecPtr receiveStart
Definition: walreceiver.h:86
XLogRecPtr flushedUpto
Definition: walreceiver.h:96
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:155
TimestampTz lastMsgSendTime
Definition: walreceiver.h:110
WalRcvState walRcvState
Definition: walreceiver.h:71
TimestampTz latestWalEndTime
Definition: walreceiver.h:117
bool ready_to_display
Definition: walreceiver.h:145
int sender_port
Definition: walreceiver.h:130
slock_t mutex
Definition: walreceiver.h:147
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:123
Definition: regguts.h:323
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1377
#define MAXCONNINFO
Definition: walreceiver.h:37
WalRcvState
Definition: walreceiver.h:46
WalRcvData * WalRcv
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum(), Int32GetDatum(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum(), MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum(), TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 162 of file walreceiver.c.

163 {
164  /*
165  * Although walreceiver interrupt handling doesn't use the same scheme as
166  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
167  * any incoming signals on Win32, and also to make sure we process any
168  * barrier events.
169  */
171 
173  {
174  ereport(FATAL,
175  (errcode(ERRCODE_ADMIN_SHUTDOWN),
176  errmsg("terminating walreceiver process due to administrator command")));
177  }
178 }
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define FATAL
Definition: elog.h:41
#define ereport(elevel,...)
Definition: elog.h:149
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and ShutdownRequestPending.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), WalRcvWaitForStartPosition(), and WalReceiverMain().

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1266 of file walreceiver.c.

1267 {
1268  WalRcvData *walrcv = WalRcv;
1269  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1270 
1271  /* Update shared-memory status */
1272  SpinLockAcquire(&walrcv->mutex);
1273  if (walrcv->latestWalEnd < walEnd)
1274  walrcv->latestWalEndTime = sendTime;
1275  walrcv->latestWalEnd = walEnd;
1276  walrcv->lastMsgSendTime = sendTime;
1277  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1278  SpinLockRelease(&walrcv->mutex);
1279 
1281  {
1282  char *sendtime;
1283  char *receipttime;
1284  int applyDelay;
1285 
1286  /* Copy because timestamptz_to_str returns a static buffer */
1287  sendtime = pstrdup(timestamptz_to_str(sendTime));
1288  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1289  applyDelay = GetReplicationApplyDelay();
1290 
1291  /* apply delay is not available */
1292  if (applyDelay == -1)
1293  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1294  sendtime,
1295  receipttime,
1297  else
1298  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1299  sendtime,
1300  receipttime,
1301  applyDelay,
1303 
1304  pfree(sendtime);
1305  pfree(receipttime);
1306  }
1307 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1843
bool message_level_is_interesting(int elevel)
Definition: elog.c:272
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void pfree(void *pointer)
Definition: mcxt.c:1521
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, message_level_is_interesting(), WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvComputeNextWakeup()

static void WalRcvComputeNextWakeup ( WalRcvWakeupReason  reason,
TimestampTz  now 
)
static

Definition at line 1318 of file walreceiver.c.

1319 {
1320  switch (reason)
1321  {
1323  if (wal_receiver_timeout <= 0)
1324  wakeup[reason] = TIMESTAMP_INFINITY;
1325  else
1327  break;
1328  case WALRCV_WAKEUP_PING:
1329  if (wal_receiver_timeout <= 0)
1330  wakeup[reason] = TIMESTAMP_INFINITY;
1331  else
1333  break;
1336  wakeup[reason] = TIMESTAMP_INFINITY;
1337  else
1339  break;
1340  case WALRCV_WAKEUP_REPLY:
1342  wakeup[reason] = TIMESTAMP_INFINITY;
1343  else
1345  break;
1346  /* there's intentionally no default: here */
1347  }
1348 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
#define TIMESTAMP_INFINITY
Definition: timestamp.h:151
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define TimestampTzPlusSeconds(tz, s)
Definition: timestamp.h:86
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
Definition: walreceiver.c:129
bool hot_standby_feedback
Definition: walreceiver.c:89
int wal_receiver_status_interval
Definition: walreceiver.c:87
int wal_receiver_timeout
Definition: walreceiver.c:88

References hot_standby_feedback, now(), TIMESTAMP_INFINITY, TimestampTzPlusMilliseconds, TimestampTzPlusSeconds, wakeup, wal_receiver_status_interval, wal_receiver_timeout, WALRCV_WAKEUP_HSFEEDBACK, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, and WALRCV_WAKEUP_TERMINATE.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 802 of file walreceiver.c.

803 {
804  WalRcvData *walrcv = WalRcv;
805  TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
806 
807  Assert(*startpointTLI_p != 0);
808 
809  /* Ensure that all WAL records received are flushed to disk */
810  XLogWalRcvFlush(true, *startpointTLI_p);
811 
812  /* Mark ourselves inactive in shared memory */
813  SpinLockAcquire(&walrcv->mutex);
814  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
815  walrcv->walRcvState == WALRCV_RESTARTING ||
816  walrcv->walRcvState == WALRCV_STARTING ||
817  walrcv->walRcvState == WALRCV_WAITING ||
818  walrcv->walRcvState == WALRCV_STOPPING);
819  Assert(walrcv->pid == MyProcPid);
820  walrcv->walRcvState = WALRCV_STOPPED;
821  walrcv->pid = 0;
822  walrcv->procno = INVALID_PROC_NUMBER;
823  walrcv->ready_to_display = false;
824  SpinLockRelease(&walrcv->mutex);
825 
827 
828  /* Terminate the connection gracefully. */
829  if (wrconn != NULL)
831 
832  /* Wake up the startup process to notice promptly that we're gone */
833  WakeupRecovery();
834 }
#define Assert(condition)
Definition: c.h:849
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition: globals.c:46
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
ProcNumber procno
Definition: walreceiver.h:67
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:72
static WalReceiverConn * wrconn
Definition: walreceiver.c:92
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
Definition: walreceiver.c:994
@ WALRCV_STARTING
Definition: walreceiver.h:48
@ WALRCV_STOPPED
Definition: walreceiver.h:47
@ WALRCV_RESTARTING
Definition: walreceiver.h:52
@ WALRCV_STREAMING
Definition: walreceiver.h:50
@ WALRCV_WAITING
Definition: walreceiver.h:51
@ WALRCV_STOPPING
Definition: walreceiver.h:53
#define walrcv_disconnect(conn)
Definition: walreceiver.h:467
void WakeupRecovery(void)

References arg, Assert, ConditionVariableBroadcast(), DatumGetPointer(), INVALID_PROC_NUMBER, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::procno, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 746 of file walreceiver.c.

747 {
748  TimeLineID tli;
749 
750  for (tli = first; tli <= last; tli++)
751  {
752  /* there's no history file for timeline 1 */
753  if (tli != 1 && !existsTimeLineHistory(tli))
754  {
755  char *fname;
756  char *content;
757  int len;
758  char expectedfname[MAXFNAMELEN];
759 
760  ereport(LOG,
761  (errmsg("fetching timeline history file for timeline %u from primary server",
762  tli)));
763 
764  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
765 
766  /*
767  * Check that the filename on the primary matches what we
768  * calculated ourselves. This is just a sanity check, it should
769  * always match.
770  */
771  TLHistoryFileName(expectedfname, tli);
772  if (strcmp(fname, expectedfname) != 0)
773  ereport(ERROR,
774  (errcode(ERRCODE_PROTOCOL_VIOLATION),
775  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
776  tli)));
777 
778  /*
779  * Write the file to pg_wal.
780  */
781  writeTimeLineHistoryFile(tli, content, len);
782 
783  /*
784  * Mark the streamed history file as ready for archiving if
785  * archive_mode is always.
786  */
788  XLogArchiveForceDone(fname);
789  else
790  XLogArchiveNotify(fname);
791 
792  pfree(fname);
793  pfree(content);
794  }
795  }
796 }
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:463
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
#define LOG
Definition: elog.h:31
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:449
int XLogArchiveMode
Definition: xlog.c:120
@ ARCHIVE_MODE_ALWAYS
Definition: xlog.h:67
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:510
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:444

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName(), walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1359 of file walreceiver.c.

1360 {
1361  ProcNumber procno;
1362 
1363  WalRcv->force_reply = true;
1364  /* fetching the proc number is probably atomic, but don't rely on it */
1366  procno = WalRcv->procno;
1368  if (procno != INVALID_PROC_NUMBER)
1369  SetLatch(&GetPGProcByNumber(procno)->procLatch);
1370 }
void SetLatch(Latch *latch)
Definition: latch.c:632
#define GetPGProcByNumber(n)
Definition: proc.h:436
int ProcNumber
Definition: procnumber.h:24
sig_atomic_t force_reply
Definition: walreceiver.h:162

References WalRcvData::force_reply, GetPGProcByNumber, INVALID_PROC_NUMBER, WalRcvData::mutex, WalRcvData::procno, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1377 of file walreceiver.c.

1378 {
1379  switch (state)
1380  {
1381  case WALRCV_STOPPED:
1382  return "stopped";
1383  case WALRCV_STARTING:
1384  return "starting";
1385  case WALRCV_STREAMING:
1386  return "streaming";
1387  case WALRCV_WAITING:
1388  return "waiting";
1389  case WALRCV_RESTARTING:
1390  return "restarting";
1391  case WALRCV_STOPPING:
1392  return "stopping";
1393  }
1394  return "UNKNOWN";
1395 }

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 666 of file walreceiver.c.

667 {
668  WalRcvData *walrcv = WalRcv;
669  int state;
670 
671  SpinLockAcquire(&walrcv->mutex);
672  state = walrcv->walRcvState;
673  if (state != WALRCV_STREAMING)
674  {
675  SpinLockRelease(&walrcv->mutex);
676  if (state == WALRCV_STOPPING)
677  proc_exit(0);
678  else
679  elog(FATAL, "unexpected walreceiver state");
680  }
681  walrcv->walRcvState = WALRCV_WAITING;
683  walrcv->receiveStartTLI = 0;
684  SpinLockRelease(&walrcv->mutex);
685 
686  set_ps_display("idle");
687 
688  /*
689  * nudge startup process to notice that we've stopped streaming and are
690  * now waiting for instructions.
691  */
692  WakeupRecovery();
693  for (;;)
694  {
696 
698 
699  SpinLockAcquire(&walrcv->mutex);
700  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
701  walrcv->walRcvState == WALRCV_WAITING ||
702  walrcv->walRcvState == WALRCV_STOPPING);
703  if (walrcv->walRcvState == WALRCV_RESTARTING)
704  {
705  /*
706  * No need to handle changes in primary_conninfo or
707  * primary_slot_name here. Startup process will signal us to
708  * terminate in case those change.
709  */
710  *startpoint = walrcv->receiveStart;
711  *startpointTLI = walrcv->receiveStartTLI;
712  walrcv->walRcvState = WALRCV_STREAMING;
713  SpinLockRelease(&walrcv->mutex);
714  break;
715  }
716  if (walrcv->walRcvState == WALRCV_STOPPING)
717  {
718  /*
719  * We should've received SIGTERM if the startup process wants us
720  * to die, but might as well check it here too.
721  */
722  SpinLockRelease(&walrcv->mutex);
723  exit(1);
724  }
725  SpinLockRelease(&walrcv->mutex);
726 
728  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
729  }
730 
732  {
733  char activitymsg[50];
734 
735  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
736  LSN_FORMAT_ARGS(*startpoint));
737  set_ps_display(activitymsg);
738  }
739 }
struct Latch * MyLatch
Definition: globals.c:62
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
exit(1)
#define snprintf
Definition: port.h:238
bool update_process_title
Definition: ps_status.c:29
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:162
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert, elog, exit(), FATAL, InvalidXLogRecPtr, LSN_FORMAT_ARGS, WalRcvData::mutex, MyLatch, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( char *  startup_data,
size_t  startup_data_len 
)

Definition at line 183 of file walreceiver.c.

184 {
185  char conninfo[MAXCONNINFO];
186  char *tmp_conninfo;
187  char slotname[NAMEDATALEN];
188  bool is_temp_slot;
189  XLogRecPtr startpoint;
190  TimeLineID startpointTLI;
191  TimeLineID primaryTLI;
192  bool first_stream;
193  WalRcvData *walrcv;
195  char *err;
196  char *sender_host = NULL;
197  int sender_port = 0;
198  char *appname;
199 
200  Assert(startup_data_len == 0);
201 
204 
205  /*
206  * WalRcv should be set up already (if we are a backend, we inherit this
207  * by fork() or EXEC_BACKEND mechanism from the postmaster).
208  */
209  walrcv = WalRcv;
210  Assert(walrcv != NULL);
211 
212  /*
213  * Mark walreceiver as running in shared memory.
214  *
215  * Do this as early as possible, so that if we fail later on, we'll set
216  * state to STOPPED. If we die before this, the startup process will keep
217  * waiting for us to start up, until it times out.
218  */
219  SpinLockAcquire(&walrcv->mutex);
220  Assert(walrcv->pid == 0);
221  switch (walrcv->walRcvState)
222  {
223  case WALRCV_STOPPING:
224  /* If we've already been requested to stop, don't start up. */
225  walrcv->walRcvState = WALRCV_STOPPED;
226  /* fall through */
227 
228  case WALRCV_STOPPED:
229  SpinLockRelease(&walrcv->mutex);
231  proc_exit(1);
232  break;
233 
234  case WALRCV_STARTING:
235  /* The usual case */
236  break;
237 
238  case WALRCV_WAITING:
239  case WALRCV_STREAMING:
240  case WALRCV_RESTARTING:
241  default:
242  /* Shouldn't happen */
243  SpinLockRelease(&walrcv->mutex);
244  elog(PANIC, "walreceiver still running according to shared memory state");
245  }
246  /* Advertise our PID so that the startup process can kill us */
247  walrcv->pid = MyProcPid;
248  walrcv->walRcvState = WALRCV_STREAMING;
249 
250  /* Fetch information required to start streaming */
251  walrcv->ready_to_display = false;
252  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
253  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
254  is_temp_slot = walrcv->is_temp_slot;
255  startpoint = walrcv->receiveStart;
256  startpointTLI = walrcv->receiveStartTLI;
257 
258  /*
259  * At most one of is_temp_slot and slotname can be set; otherwise,
260  * RequestXLogStreaming messed up.
261  */
262  Assert(!is_temp_slot || (slotname[0] == '\0'));
263 
264  /* Initialise to a sanish value */
266  walrcv->lastMsgSendTime =
267  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
268 
269  /* Report our proc number so that others can wake us up */
270  walrcv->procno = MyProcNumber;
271 
272  SpinLockRelease(&walrcv->mutex);
273 
275 
276  /* Arrange to clean up at walreceiver exit */
277  on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
278 
279  /* Properly accept or ignore signals the postmaster might send us */
280  pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
281  * file */
282  pqsignal(SIGINT, SIG_IGN);
283  pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
284  /* SIGQUIT handler was already set up by InitPostmasterChild */
289 
290  /* Reset some signals that are accepted by postmaster but not here */
292 
293  /* Load the libpq-specific functions */
294  load_file("libpqwalreceiver", false);
295  if (WalReceiverFunctions == NULL)
296  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
297 
298  /* Unblock signals (they were blocked when the postmaster forked us) */
299  sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
300 
301  /* Establish the connection to the primary for XLOG streaming */
302  appname = cluster_name[0] ? cluster_name : "walreceiver";
303  wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
304  if (!wrconn)
305  ereport(ERROR,
306  (errcode(ERRCODE_CONNECTION_FAILURE),
307  errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
308  appname, err)));
309 
310  /*
311  * Save user-visible connection string. This clobbers the original
312  * conninfo, for security. Also save host and port of the sender server
313  * this walreceiver is connected to.
314  */
315  tmp_conninfo = walrcv_get_conninfo(wrconn);
316  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
317  SpinLockAcquire(&walrcv->mutex);
318  memset(walrcv->conninfo, 0, MAXCONNINFO);
319  if (tmp_conninfo)
320  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
321 
322  memset(walrcv->sender_host, 0, NI_MAXHOST);
323  if (sender_host)
324  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
325 
326  walrcv->sender_port = sender_port;
327  walrcv->ready_to_display = true;
328  SpinLockRelease(&walrcv->mutex);
329 
330  if (tmp_conninfo)
331  pfree(tmp_conninfo);
332 
333  if (sender_host)
334  pfree(sender_host);
335 
336  first_stream = true;
337  for (;;)
338  {
339  char *primary_sysid;
340  char standby_sysid[32];
342 
343  /*
344  * Check that we're connected to a valid server using the
345  * IDENTIFY_SYSTEM replication command.
346  */
347  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
348 
349  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
351  if (strcmp(primary_sysid, standby_sysid) != 0)
352  {
353  ereport(ERROR,
354  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
355  errmsg("database system identifier differs between the primary and standby"),
356  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
357  primary_sysid, standby_sysid)));
358  }
359 
360  /*
361  * Confirm that the current timeline of the primary is the same or
362  * ahead of ours.
363  */
364  if (primaryTLI < startpointTLI)
365  ereport(ERROR,
366  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
367  errmsg("highest timeline %u of the primary is behind recovery timeline %u",
368  primaryTLI, startpointTLI)));
369 
370  /*
371  * Get any missing history files. We do this always, even when we're
372  * not interested in that timeline, so that if we're promoted to
373  * become the primary later on, we don't select the same timeline that
374  * was already used in the current primary. This isn't bullet-proof -
375  * you'll need some external software to manage your cluster if you
376  * need to ensure that a unique timeline id is chosen in every case,
377  * but let's avoid the confusion of timeline id collisions where we
378  * can.
379  */
380  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
381 
382  /*
383  * Create temporary replication slot if requested, and update slot
384  * name in shared memory. (Note the slot name cannot already be set
385  * in this case.)
386  */
387  if (is_temp_slot)
388  {
389  snprintf(slotname, sizeof(slotname),
390  "pg_walreceiver_%lld",
391  (long long int) walrcv_get_backend_pid(wrconn));
392 
393  walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
394 
395  SpinLockAcquire(&walrcv->mutex);
396  strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
397  SpinLockRelease(&walrcv->mutex);
398  }
399 
400  /*
401  * Start streaming.
402  *
403  * We'll try to start at the requested starting point and timeline,
404  * even if it's different from the server's latest timeline. In case
405  * we've already reached the end of the old timeline, the server will
406  * finish the streaming immediately, and we will go back to await
407  * orders from the startup process. If recovery_target_timeline is
408  * 'latest', the startup process will scan pg_wal and find the new
409  * history file, bump recovery target timeline, and ask us to restart
410  * on the new timeline.
411  */
412  options.logical = false;
413  options.startpoint = startpoint;
414  options.slotname = slotname[0] != '\0' ? slotname : NULL;
415  options.proto.physical.startpointTLI = startpointTLI;
417  {
418  if (first_stream)
419  ereport(LOG,
420  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
421  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
422  else
423  ereport(LOG,
424  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
425  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
426  first_stream = false;
427 
428  /* Initialize LogstreamResult and buffers for processing messages */
431 
432  /* Initialize nap wakeup times. */
434  for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
436 
437  /* Send initial reply/feedback messages. */
438  XLogWalRcvSendReply(true, false);
440 
441  /* Loop until end-of-streaming or error */
442  for (;;)
443  {
444  char *buf;
445  int len;
446  bool endofwal = false;
447  pgsocket wait_fd = PGINVALID_SOCKET;
448  int rc;
449  TimestampTz nextWakeup;
450  long nap;
451 
452  /*
453  * Exit walreceiver if we're not in recovery. This should not
454  * happen, but cross-check the status here.
455  */
456  if (!RecoveryInProgress())
457  ereport(FATAL,
458  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
459  errmsg("cannot continue WAL streaming, recovery has already ended")));
460 
461  /* Process any requests or signals received recently */
463 
465  {
466  ConfigReloadPending = false;
468  /* recompute wakeup times */
470  for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
473  }
474 
475  /* See if we can read data immediately */
476  len = walrcv_receive(wrconn, &buf, &wait_fd);
477  if (len != 0)
478  {
479  /*
480  * Process the received data, and any subsequent data we
481  * can read without blocking.
482  */
483  for (;;)
484  {
485  if (len > 0)
486  {
487  /*
488  * Something was received from primary, so adjust
489  * the ping and terminate wakeup times.
490  */
493  now);
495  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
496  startpointTLI);
497  }
498  else if (len == 0)
499  break;
500  else if (len < 0)
501  {
502  ereport(LOG,
503  (errmsg("replication terminated by primary server"),
504  errdetail("End of WAL reached on timeline %u at %X/%X.",
505  startpointTLI,
507  endofwal = true;
508  break;
509  }
510  len = walrcv_receive(wrconn, &buf, &wait_fd);
511  }
512 
513  /* Let the primary know that we received some data. */
514  XLogWalRcvSendReply(false, false);
515 
516  /*
517  * If we've written some records, flush them to disk and
518  * let the startup process and primary server know about
519  * them.
520  */
521  XLogWalRcvFlush(false, startpointTLI);
522  }
523 
524  /* Check if we need to exit the streaming loop. */
525  if (endofwal)
526  break;
527 
528  /* Find the soonest wakeup time, to limit our nap. */
529  nextWakeup = TIMESTAMP_INFINITY;
530  for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
531  nextWakeup = Min(wakeup[i], nextWakeup);
532 
533  /* Calculate the nap time, clamping as necessary. */
535  nap = TimestampDifferenceMilliseconds(now, nextWakeup);
536 
537  /*
538  * Ideally we would reuse a WaitEventSet object repeatedly
539  * here to avoid the overheads of WaitLatchOrSocket on epoll
540  * systems, but we can't be sure that libpq (or any other
541  * walreceiver implementation) has the same socket (even if
542  * the fd is the same number, it may have been closed and
543  * reopened since the last time). In future, if there is a
544  * function for removing sockets from WaitEventSet, then we
545  * could add and remove just the socket each time, potentially
546  * avoiding some system calls.
547  */
548  Assert(wait_fd != PGINVALID_SOCKET);
552  wait_fd,
553  nap,
554  WAIT_EVENT_WAL_RECEIVER_MAIN);
555  if (rc & WL_LATCH_SET)
556  {
559 
560  if (walrcv->force_reply)
561  {
562  /*
563  * The recovery process has asked us to send apply
564  * feedback now. Make sure the flag is really set to
565  * false in shared memory before sending the reply, so
566  * we don't miss a new request for a reply.
567  */
568  walrcv->force_reply = false;
570  XLogWalRcvSendReply(true, false);
571  }
572  }
573  if (rc & WL_TIMEOUT)
574  {
575  /*
576  * We didn't receive anything new. If we haven't heard
577  * anything from the server for more than
578  * wal_receiver_timeout / 2, ping the server. Also, if
579  * it's been longer than wal_receiver_status_interval
580  * since the last update we sent, send a status update to
581  * the primary anyway, to report any progress in applying
582  * WAL.
583  */
584  bool requestReply = false;
585 
586  /*
587  * Check if time since last receive from primary has
588  * reached the configured limit.
589  */
592  ereport(ERROR,
593  (errcode(ERRCODE_CONNECTION_FAILURE),
594  errmsg("terminating walreceiver due to timeout")));
595 
596  /*
597  * If we didn't receive anything new for half of receiver
598  * replication timeout, then ping the server.
599  */
600  if (now >= wakeup[WALRCV_WAKEUP_PING])
601  {
602  requestReply = true;
604  }
605 
606  XLogWalRcvSendReply(requestReply, requestReply);
608  }
609  }
610 
611  /*
612  * The backend finished streaming. Exit streaming COPY-mode from
613  * our side, too.
614  */
615  walrcv_endstreaming(wrconn, &primaryTLI);
616 
617  /*
618  * If the server had switched to a new timeline that we didn't
619  * know about when we began streaming, fetch its timeline history
620  * file now.
621  */
622  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
623  }
624  else
625  ereport(LOG,
626  (errmsg("primary server contains no more WAL on requested timeline %u",
627  startpointTLI)));
628 
629  /*
630  * End of WAL reached on the requested timeline. Close the last
631  * segment, and await for new orders from the startup process.
632  */
633  if (recvFile >= 0)
634  {
635  char xlogfname[MAXFNAMELEN];
636 
637  XLogWalRcvFlush(false, startpointTLI);
639  if (close(recvFile) != 0)
640  ereport(PANIC,
642  errmsg("could not close WAL segment %s: %m",
643  xlogfname)));
644 
645  /*
646  * Create .done file forcibly to prevent the streamed segment from
647  * being archived later.
648  */
650  XLogArchiveForceDone(xlogfname);
651  else
652  XLogArchiveNotify(xlogfname);
653  }
654  recvFile = -1;
655 
656  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
657  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
658  }
659  /* not reached */
660 }
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:485
#define pg_memory_barrier()
Definition: atomics.h:143
void AuxiliaryProcessMainCommon(void)
Definition: auxprocess.c:39
sigset_t UnBlockSig
Definition: pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756
#define Min(x, y)
Definition: c.h:995
#define UINT64_FORMAT
Definition: c.h:540
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
int errcode_for_file_access(void)
Definition: elog.c:876
int errdetail(const char *fmt,...)
Definition: elog.c:1203
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
void err(int eval, const char *fmt,...)
Definition: err.c:43
ProcNumber MyProcNumber
Definition: globals.c:89
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
char * cluster_name
Definition: guc_tables.c:537
#define close(a)
Definition: win32.h:12
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:105
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:72
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
@ B_WAL_RECEIVER
Definition: miscadmin.h:356
BackendType MyBackendType
Definition: miscinit.c:63
static char ** options
static char * buf
Definition: pg_test_fsync.c:73
pqsigfunc pqsignal(int signo, pqsigfunc func)
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:671
void initStringInfo(StringInfo str)
Definition: stringinfo.c:56
bool is_temp_slot
Definition: walreceiver.h:142
#define NUM_WALRCV_WAKEUPS
Definition: walreceiver.c:123
static StringInfoData reply_message
Definition: walreceiver.c:131
static int recvFile
Definition: walreceiver.c:100
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:746
static TimeLineID recvFileTLI
Definition: walreceiver.c:101
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:93
static XLogSegNo recvSegNo
Definition: walreceiver.c:102
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1170
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:666
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
Definition: walreceiver.c:840
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
Definition: walreceiver.c:1318
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:802
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1101
static struct @19 LogstreamResult
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:451
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:441
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:459
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:439
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:443
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:463
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:455
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGALRM
Definition: win32_port.h:174
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4569
bool RecoveryInProgress(void)
Definition: xlog.c:6341
int wal_segment_size
Definition: xlog.c:144
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert, AuxiliaryProcessMainCommon(), B_WAL_RECEIVER, buf, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, WalRcvData::conninfo, DEBUG1, elog, ereport, err(), errcode(), errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), i, initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEndTime, len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, Min, WalRcvData::mutex, MyBackendType, MyLatch, MyProcNumber, MyProcPid, NAMEDATALEN, now(), NUM_WALRCV_WAKEUPS, on_shmem_exit(), options, PANIC, pfree(), pg_atomic_write_u64(), pg_memory_barrier, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvData::pid, PointerGetDatum(), pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), WalRcvData::procno, procsignal_sigusr1_handler(), WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, snprintf, SpinLockAcquire, SpinLockRelease, strlcpy(), TIMESTAMP_INFINITY, TimestampDifferenceMilliseconds(), UINT64_FORMAT, UnBlockSig, WaitLatchOrSocket(), wakeup, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_TERMINATE, WalRcvComputeNextWakeup(), WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, WalRcvWaitForStartPosition(), WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1049 of file walreceiver.c.

1050 {
1051  char xlogfname[MAXFNAMELEN];
1052 
1054  Assert(tli != 0);
1055 
1056  /*
1057  * fsync() and close current file before we switch to next one. We would
1058  * otherwise have to reopen this file to fsync it later
1059  */
1060  XLogWalRcvFlush(false, tli);
1061 
1063 
1064  /*
1065  * XLOG segment files will be re-read by recovery in startup process soon,
1066  * so we don't advise the OS to release cache pages associated with the
1067  * file like XLogFileClose() does.
1068  */
1069  if (close(recvFile) != 0)
1070  ereport(PANIC,
1072  errmsg("could not close WAL segment %s: %m",
1073  xlogfname)));
1074 
1075  /*
1076  * Create .done file forcibly to prevent the streamed segment from being
1077  * archived later.
1078  */
1080  XLogArchiveForceDone(xlogfname);
1081  else
1082  XLogArchiveNotify(xlogfname);
1083 
1084  recvFile = -1;
1085 }
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert, close, ereport, errcode_for_file_access(), errmsg(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 994 of file walreceiver.c.

995 {
996  Assert(tli != 0);
997 
998  if (LogstreamResult.Flush < LogstreamResult.Write)
999  {
1000  WalRcvData *walrcv = WalRcv;
1001 
1003 
1004  LogstreamResult.Flush = LogstreamResult.Write;
1005 
1006  /* Update shared-memory status */
1007  SpinLockAcquire(&walrcv->mutex);
1008  if (walrcv->flushedUpto < LogstreamResult.Flush)
1009  {
1010  walrcv->latestChunkStart = walrcv->flushedUpto;
1011  walrcv->flushedUpto = LogstreamResult.Flush;
1012  walrcv->receivedTLI = tli;
1013  }
1014  SpinLockRelease(&walrcv->mutex);
1015 
1016  /* Signal the startup process and walsender that new WAL has arrived */
1017  WakeupRecovery();
1019  WalSndWakeup(true, false);
1020 
1021  /* Report XLOG streaming progress in PS display */
1023  {
1024  char activitymsg[50];
1025 
1026  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1028  set_ps_display(activitymsg);
1029  }
1030 
1031  /* Also let the primary know that we made some progress */
1032  if (!dying)
1033  {
1034  XLogWalRcvSendReply(false, false);
1035  XLogWalRcvSendHSFeedback(false);
1036  }
1037  }
1038 }
XLogRecPtr latestChunkStart
Definition: walreceiver.h:105
#define AllowCascadeReplication()
Definition: walreceiver.h:40
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3638
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:8690

References AllowCascadeReplication, Assert, WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, LSN_FORMAT_ARGS, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 840 of file walreceiver.c.

841 {
842  int hdrlen;
843  XLogRecPtr dataStart;
844  XLogRecPtr walEnd;
845  TimestampTz sendTime;
846  bool replyRequested;
847 
848  switch (type)
849  {
850  case 'w': /* WAL records */
851  {
852  StringInfoData incoming_message;
853 
854  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
855  if (len < hdrlen)
856  ereport(ERROR,
857  (errcode(ERRCODE_PROTOCOL_VIOLATION),
858  errmsg_internal("invalid WAL message received from primary")));
859 
860  /* initialize a StringInfo with the given buffer */
861  initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
862 
863  /* read the fields */
864  dataStart = pq_getmsgint64(&incoming_message);
865  walEnd = pq_getmsgint64(&incoming_message);
866  sendTime = pq_getmsgint64(&incoming_message);
867  ProcessWalSndrMessage(walEnd, sendTime);
868 
869  buf += hdrlen;
870  len -= hdrlen;
871  XLogWalRcvWrite(buf, len, dataStart, tli);
872  break;
873  }
874  case 'k': /* Keepalive */
875  {
876  StringInfoData incoming_message;
877 
878  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
879  if (len != hdrlen)
880  ereport(ERROR,
881  (errcode(ERRCODE_PROTOCOL_VIOLATION),
882  errmsg_internal("invalid keepalive message received from primary")));
883 
884  /* initialize a StringInfo with the given buffer */
885  initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
886 
887  /* read the fields */
888  walEnd = pq_getmsgint64(&incoming_message);
889  sendTime = pq_getmsgint64(&incoming_message);
890  replyRequested = pq_getmsgbyte(&incoming_message);
891 
892  ProcessWalSndrMessage(walEnd, sendTime);
893 
894  /* If the primary requested a reply, send one immediately */
895  if (replyRequested)
896  XLogWalRcvSendReply(true, false);
897  break;
898  }
899  default:
900  ereport(ERROR,
901  (errcode(ERRCODE_PROTOCOL_VIOLATION),
902  errmsg_internal("invalid replication message type %d",
903  type)));
904  }
905 }
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:130
const char * type
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1266
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:911

References buf, ereport, errcode(), errmsg_internal(), ERROR, initReadOnlyStringInfo(), len, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1170 of file walreceiver.c.

1171 {
1172  TimestampTz now;
1173  FullTransactionId nextFullXid;
1174  TransactionId nextXid;
1175  uint32 xmin_epoch,
1176  catalog_xmin_epoch;
1177  TransactionId xmin,
1178  catalog_xmin;
1179 
1180  /* initially true so we always send at least one feedback message */
1181  static bool primary_has_standby_xmin = true;
1182 
1183  /*
1184  * If the user doesn't want status to be reported to the primary, be sure
1185  * to exit before doing anything at all.
1186  */
1188  !primary_has_standby_xmin)
1189  return;
1190 
1191  /* Get current timestamp. */
1193 
1194  /* Send feedback at most once per wal_receiver_status_interval. */
1195  if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1196  return;
1197 
1198  /* Make sure we wake up when it's time to send feedback again. */
1200 
1201  /*
1202  * If Hot Standby is not yet accepting connections there is nothing to
1203  * send. Check this after the interval has expired to reduce number of
1204  * calls.
1205  *
1206  * Bailing out here also ensures that we don't send feedback until we've
1207  * read our own replication slot state, so we don't tell the primary to
1208  * discard needed xmin or catalog_xmin from any slots that may exist on
1209  * this replica.
1210  */
1211  if (!HotStandbyActive())
1212  return;
1213 
1214  /*
1215  * Make the expensive call to get the oldest xmin once we are certain
1216  * everything else has been checked.
1217  */
1219  {
1220  GetReplicationHorizons(&xmin, &catalog_xmin);
1221  }
1222  else
1223  {
1224  xmin = InvalidTransactionId;
1225  catalog_xmin = InvalidTransactionId;
1226  }
1227 
1228  /*
1229  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1230  * the epoch boundary.
1231  */
1232  nextFullXid = ReadNextFullTransactionId();
1233  nextXid = XidFromFullTransactionId(nextFullXid);
1234  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1235  catalog_xmin_epoch = xmin_epoch;
1236  if (nextXid < xmin)
1237  xmin_epoch--;
1238  if (nextXid < catalog_xmin)
1239  catalog_xmin_epoch--;
1240 
1241  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1242  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1243 
1244  /* Construct the message and send it. */
1246  pq_sendbyte(&reply_message, 'h');
1248  pq_sendint32(&reply_message, xmin);
1249  pq_sendint32(&reply_message, xmin_epoch);
1250  pq_sendint32(&reply_message, catalog_xmin);
1251  pq_sendint32(&reply_message, catalog_xmin_epoch);
1253  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1254  primary_has_standby_xmin = true;
1255  else
1256  primary_has_standby_xmin = false;
1257 }
unsigned int uint32
Definition: c.h:506
uint32 TransactionId
Definition: c.h:643
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2047
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
#define InvalidTransactionId
Definition: transam.h:31
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define TransactionIdIsValid(xid)
Definition: transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:457
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReadNextFullTransactionId(), reply_message, resetStringInfo(), TransactionIdIsValid, wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_HSFEEDBACK, WalRcvComputeNextWakeup(), wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1101 of file walreceiver.c.

1102 {
1103  static XLogRecPtr writePtr = 0;
1104  static XLogRecPtr flushPtr = 0;
1105  XLogRecPtr applyPtr;
1106  TimestampTz now;
1107 
1108  /*
1109  * If the user doesn't want status to be reported to the primary, be sure
1110  * to exit before doing anything at all.
1111  */
1112  if (!force && wal_receiver_status_interval <= 0)
1113  return;
1114 
1115  /* Get current timestamp. */
1117 
1118  /*
1119  * We can compare the write and flush positions to the last message we
1120  * sent without taking any lock, but the apply position requires a spin
1121  * lock, so we don't check that unless something else has changed or 10
1122  * seconds have passed. This means that the apply WAL location will
1123  * appear, from the primary's point of view, to lag slightly, but since
1124  * this is only for reporting purposes and only on idle systems, that's
1125  * probably OK.
1126  */
1127  if (!force
1128  && writePtr == LogstreamResult.Write
1129  && flushPtr == LogstreamResult.Flush
1131  return;
1132 
1133  /* Make sure we wake up when it's time to send another reply. */
1135 
1136  /* Construct a new message */
1137  writePtr = LogstreamResult.Write;
1138  flushPtr = LogstreamResult.Flush;
1139  applyPtr = GetXLogReplayRecPtr(NULL);
1140 
1142  pq_sendbyte(&reply_message, 'r');
1143  pq_sendint64(&reply_message, writePtr);
1144  pq_sendint64(&reply_message, flushPtr);
1145  pq_sendint64(&reply_message, applyPtr);
1147  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1148 
1149  /* Send it */
1150  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1151  LSN_FORMAT_ARGS(writePtr),
1152  LSN_FORMAT_ARGS(flushPtr),
1153  LSN_FORMAT_ARGS(applyPtr),
1154  requestReply ? " (reply requested)" : "");
1155 
1157 }

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_REPLY, WalRcvComputeNextWakeup(), and wrconn.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 911 of file walreceiver.c.

912 {
913  int startoff;
914  int byteswritten;
915 
916  Assert(tli != 0);
917 
918  while (nbytes > 0)
919  {
920  int segbytes;
921 
922  /* Close the current segment if it's completed */
923  if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
924  XLogWalRcvClose(recptr, tli);
925 
926  if (recvFile < 0)
927  {
928  /* Create/use new log file */
931  recvFileTLI = tli;
932  }
933 
934  /* Calculate the start offset of the received logs */
935  startoff = XLogSegmentOffset(recptr, wal_segment_size);
936 
937  if (startoff + nbytes > wal_segment_size)
938  segbytes = wal_segment_size - startoff;
939  else
940  segbytes = nbytes;
941 
942  /* OK to write the logs */
943  errno = 0;
944 
945  byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
946  if (byteswritten <= 0)
947  {
948  char xlogfname[MAXFNAMELEN];
949  int save_errno;
950 
951  /* if write didn't set errno, assume no disk space */
952  if (errno == 0)
953  errno = ENOSPC;
954 
955  save_errno = errno;
957  errno = save_errno;
958  ereport(PANIC,
960  errmsg("could not write to WAL segment %s "
961  "at offset %d, length %lu: %m",
962  xlogfname, startoff, (unsigned long) segbytes)));
963  }
964 
965  /* Update state for write */
966  recptr += byteswritten;
967 
968  nbytes -= byteswritten;
969  buf += byteswritten;
970 
971  LogstreamResult.Write = recptr;
972  }
973 
974  /* Update shared-memory status */
976 
977  /*
978  * Close the current segment if it's fully written up in the last cycle of
979  * the loop, to create its archive notification file soon. Otherwise WAL
980  * archiving of the segment will be delayed until any data in the next
981  * segment is received and written.
982  */
983  if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
984  XLogWalRcvClose(recptr, tli);
985 }
#define pg_pwrite
Definition: port.h:226
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:1049
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition: xlog.c:3381
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert, buf, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite, recvFile, recvFileTLI, recvSegNo, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName(), XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 111 of file walreceiver.c.

Referenced by XLogWrite().

◆ hot_standby_feedback

bool hot_standby_feedback

◆ 

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 100 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 101 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 102 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

◆ wakeup

◆ wal_receiver_status_interval

int wal_receiver_status_interval

◆ wal_receiver_timeout

int wal_receiver_timeout

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 93 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 110 of file walreceiver.c.

Referenced by XLogWrite().