PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
} LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 96 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1299 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MAXCONNINFO, MemSet, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, NI_MAXHOST, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

1300 {
1301  TupleDesc tupdesc;
1302  Datum *values;
1303  bool *nulls;
1304  int pid;
1305  bool ready_to_display;
1306  WalRcvState state;
1307  XLogRecPtr receive_start_lsn;
1308  TimeLineID receive_start_tli;
1309  XLogRecPtr written_lsn;
1310  XLogRecPtr flushed_lsn;
1311  TimeLineID received_tli;
1312  TimestampTz last_send_time;
1313  TimestampTz last_receipt_time;
1314  XLogRecPtr latest_end_lsn;
1315  TimestampTz latest_end_time;
1316  char sender_host[NI_MAXHOST];
1317  int sender_port = 0;
1318  char slotname[NAMEDATALEN];
1319  char conninfo[MAXCONNINFO];
1320 
1321  /* Take a lock to ensure value consistency */
1322  SpinLockAcquire(&WalRcv->mutex);
1323  pid = (int) WalRcv->pid;
1324  ready_to_display = WalRcv->ready_to_display;
1325  state = WalRcv->walRcvState;
1326  receive_start_lsn = WalRcv->receiveStart;
1327  receive_start_tli = WalRcv->receiveStartTLI;
1328  written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1329  flushed_lsn = WalRcv->flushedUpto;
1330  received_tli = WalRcv->receivedTLI;
1331  last_send_time = WalRcv->lastMsgSendTime;
1332  last_receipt_time = WalRcv->lastMsgReceiptTime;
1333  latest_end_lsn = WalRcv->latestWalEnd;
1334  latest_end_time = WalRcv->latestWalEndTime;
1335  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1336  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1337  sender_port = WalRcv->sender_port;
1338  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1339  SpinLockRelease(&WalRcv->mutex);
1340 
1341  /*
1342  * No WAL receiver (or not ready yet), just return a tuple with NULL
1343  * values
1344  */
1345  if (pid == 0 || !ready_to_display)
1346  PG_RETURN_NULL();
1347 
1348  /* determine result type */
1349  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1350  elog(ERROR, "return type must be a row type");
1351 
1352  values = palloc0(sizeof(Datum) * tupdesc->natts);
1353  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1354 
1355  /* Fetch values */
1356  values[0] = Int32GetDatum(pid);
1357 
1358  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1359  {
1360  /*
1361  * Only superusers and members of pg_read_all_stats can see details.
1362  * Other users only get the pid value to know whether it is a WAL
1363  * receiver, but no details.
1364  */
1365  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1366  }
1367  else
1368  {
1369  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1370 
1371  if (XLogRecPtrIsInvalid(receive_start_lsn))
1372  nulls[2] = true;
1373  else
1374  values[2] = LSNGetDatum(receive_start_lsn);
1375  values[3] = Int32GetDatum(receive_start_tli);
1376  if (XLogRecPtrIsInvalid(written_lsn))
1377  nulls[4] = true;
1378  else
1379  values[4] = LSNGetDatum(written_lsn);
1380  if (XLogRecPtrIsInvalid(flushed_lsn))
1381  nulls[5] = true;
1382  else
1383  values[5] = LSNGetDatum(flushed_lsn);
1384  values[6] = Int32GetDatum(received_tli);
1385  if (last_send_time == 0)
1386  nulls[7] = true;
1387  else
1388  values[7] = TimestampTzGetDatum(last_send_time);
1389  if (last_receipt_time == 0)
1390  nulls[8] = true;
1391  else
1392  values[8] = TimestampTzGetDatum(last_receipt_time);
1393  if (XLogRecPtrIsInvalid(latest_end_lsn))
1394  nulls[9] = true;
1395  else
1396  values[9] = LSNGetDatum(latest_end_lsn);
1397  if (latest_end_time == 0)
1398  nulls[10] = true;
1399  else
1400  values[10] = TimestampTzGetDatum(latest_end_time);
1401  if (*slotname == '\0')
1402  nulls[11] = true;
1403  else
1404  values[11] = CStringGetTextDatum(slotname);
1405  if (*sender_host == '\0')
1406  nulls[12] = true;
1407  else
1408  values[12] = CStringGetTextDatum(sender_host);
1409  if (sender_port == 0)
1410  nulls[13] = true;
1411  else
1412  values[13] = Int32GetDatum(sender_port);
1413  if (*conninfo == '\0')
1414  nulls[14] = true;
1415  else
1416  values[14] = CStringGetTextDatum(conninfo);
1417  }
1418 
1419  /* Returns the record as Datum */
1420  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1421 }
int sender_port
Definition: walreceiver.h:117
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1274
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
uint32 TimeLineID
Definition: xlogdefs.h:52
slock_t mutex
Definition: walreceiver.h:143
Oid GetUserId(void)
Definition: miscinit.c:476
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:64
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:84
#define MemSet(start, val, len)
Definition: c.h:996
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
WalRcvState
Definition: walreceiver.h:44
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:151
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:45
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:63
#define MAXCONNINFO
Definition: walreceiver.h:36
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:981
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
bool ready_to_display
Definition: walreceiver.h:132
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4919
XLogRecPtr latestWalEnd
Definition: walreceiver.h:103
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:221
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:165
#define Int32GetDatum(X)
Definition: postgres.h:479
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:116
#define elog(elevel,...)
Definition: elog.h:228
#define CStringGetTextDatum(s)
Definition: builtins.h:82
XLogRecPtr receiveStart
Definition: walreceiver.h:73
XLogRecPtr flushedUpto
Definition: walreceiver.h:83
#define PG_RETURN_NULL()
Definition: fmgr.h:345
char slotname[NAMEDATALEN]
Definition: walreceiver.h:123
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:110

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 145 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and ShutdownRequestPending.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), walrcv_clear_result(), WalRcvWaitForStartPosition(), and WalReceiverMain().

146 {
147  /*
148  * Although walreceiver interrupt handling doesn't use the same scheme as
149  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
150  * any incoming signals on Win32, and also to make sure we process any
151  * barrier events.
152  */
153  CHECK_FOR_INTERRUPTS();
154 
155  if (ShutdownRequestPending)
156  {
157  ereport(FATAL,
158  (errcode(ERRCODE_ADMIN_SHUTDOWN),
159  errmsg("terminating walreceiver process due to administrator command")));
160  }
161 }
int errcode(int sqlerrcode)
Definition: elog.c:704
#define FATAL
Definition: elog.h:54
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:27
#define ereport(elevel,...)
Definition: elog.h:155
int errmsg(const char *fmt,...)
Definition: elog.c:915
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1203 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, message_level_is_interesting(), WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1204 {
1205  WalRcvData *walrcv = WalRcv;
1206 
1207  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1208 
1209  /* Update shared-memory status */
1210  SpinLockAcquire(&walrcv->mutex);
1211  if (walrcv->latestWalEnd < walEnd)
1212  walrcv->latestWalEndTime = sendTime;
1213  walrcv->latestWalEnd = walEnd;
1214  walrcv->lastMsgSendTime = sendTime;
1215  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1216  SpinLockRelease(&walrcv->mutex);
1217 
1218  if (message_level_is_interesting(DEBUG2))
1219  {
1220  char *sendtime;
1221  char *receipttime;
1222  int applyDelay;
1223 
1224  /* Copy because timestamptz_to_str returns a static buffer */
1225  sendtime = pstrdup(timestamptz_to_str(sendTime));
1226  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1227  applyDelay = GetReplicationApplyDelay();
1228 
1229  /* apply delay is not available */
1230  if (applyDelay == -1)
1231  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1232  sendtime,
1233  receipttime,
1234  GetReplicationTransferLatency());
1235  else
1236  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1237  sendtime,
1238  receipttime,
1239  applyDelay,
1240  GetReplicationTransferLatency());
1241 
1242  pfree(sendtime);
1243  pfree(receipttime);
1244  }
1245 }
slock_t mutex
Definition: walreceiver.h:143
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1187
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1057
#define DEBUG2
Definition: elog.h:24
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
XLogRecPtr latestWalEnd
Definition: walreceiver.h:103
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:228
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1772

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 769 of file walreceiver.c.

References Assert, WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

770 {
771  WalRcvData *walrcv = WalRcv;
772 
773  /* Ensure that all WAL records received are flushed to disk */
774  XLogWalRcvFlush(true);
775 
776  /* Mark ourselves inactive in shared memory */
777  SpinLockAcquire(&walrcv->mutex);
778  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
779  walrcv->walRcvState == WALRCV_RESTARTING ||
780  walrcv->walRcvState == WALRCV_STARTING ||
781  walrcv->walRcvState == WALRCV_WAITING ||
782  walrcv->walRcvState == WALRCV_STOPPING);
783  Assert(walrcv->pid == MyProcPid);
784  walrcv->walRcvState = WALRCV_STOPPED;
785  walrcv->pid = 0;
786  walrcv->ready_to_display = false;
787  walrcv->latch = NULL;
788  SpinLockRelease(&walrcv->mutex);
789 
790  /* Terminate the connection gracefully. */
791  if (wrconn != NULL)
792  walrcv_disconnect(wrconn);
793 
794  /* Wake up the startup process to notice promptly that we're gone */
795  WakeupRecovery();
796 }
int MyProcPid
Definition: globals.c:41
slock_t mutex
Definition: walreceiver.h:143
WalRcvState walRcvState
Definition: walreceiver.h:64
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12813
pid_t pid
Definition: walreceiver.h:63
Latch * latch
Definition: walreceiver.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:132
#define Assert(condition)
Definition: c.h:792
#define walrcv_disconnect(conn)
Definition: walreceiver.h:426
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:974
WalRcvData * WalRcv

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 713 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

714 {
715  TimeLineID tli;
716 
717  for (tli = first; tli <= last; tli++)
718  {
719  /* there's no history file for timeline 1 */
720  if (tli != 1 && !existsTimeLineHistory(tli))
721  {
722  char *fname;
723  char *content;
724  int len;
725  char expectedfname[MAXFNAMELEN];
726 
727  ereport(LOG,
728  (errmsg("fetching timeline history file for timeline %u from primary server",
729  tli)));
730 
731  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
732 
733  /*
734  * Check that the filename on the primary matches what we
735  * calculated ourselves. This is just a sanity check, it should
736  * always match.
737  */
738  TLHistoryFileName(expectedfname, tli);
739  if (strcmp(fname, expectedfname) != 0)
740  ereport(ERROR,
741  (errcode(ERRCODE_PROTOCOL_VIOLATION),
742  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
743  tli)));
744 
745  /*
746  * Write the file to pg_wal.
747  */
748  writeTimeLineHistoryFile(tli, content, len);
749 
750  /*
751  * Mark the streamed history file as ready for archiving
752  * if archive_mode is always.
753  */
754  if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
755  XLogArchiveForceDone(fname);
756  else
757  XLogArchiveNotify(fname);
758 
759  pfree(fname);
760  pfree(content);
761  }
762  }
763 }
uint32 TimeLineID
Definition: xlogdefs.h:52
int errcode(int sqlerrcode)
Definition: elog.c:704
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:467
void pfree(void *pointer)
Definition: mcxt.c:1057
#define ERROR
Definition: elog.h:45
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:467
int XLogArchiveMode
Definition: xlog.c:96
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:517
#define ereport(elevel,...)
Definition: elog.h:155
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1002
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:410
int errmsg(const char *fmt,...)
Definition: elog.c:915

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1256 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

1257 {
1258  Latch *latch;
1259 
1260  WalRcv->force_reply = true;
1261  /* fetching the latch pointer might not be atomic, so use spinlock */
1262  SpinLockAcquire(&WalRcv->mutex);
1263  latch = WalRcv->latch;
1264  SpinLockRelease(&WalRcv->mutex);
1265  if (latch)
1266  SetLatch(latch);
1267 }
slock_t mutex
Definition: walreceiver.h:143
sig_atomic_t force_reply
Definition: walreceiver.h:158
void SetLatch(Latch *latch)
Definition: latch.c:505
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: latch.h:110
Latch * latch
Definition: walreceiver.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1274 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1275 {
1276  switch (state)
1277  {
1278  case WALRCV_STOPPED:
1279  return "stopped";
1280  case WALRCV_STARTING:
1281  return "starting";
1282  case WALRCV_STREAMING:
1283  return "streaming";
1284  case WALRCV_WAITING:
1285  return "waiting";
1286  case WALRCV_RESTARTING:
1287  return "restarting";
1288  case WALRCV_STOPPING:
1289  return "stopping";
1290  }
1291  return "UNKNOWN";
1292 }
Definition: regguts.h:298

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr * startpoint,
TimeLineID * startpointTLI 
)
static

Definition at line 632 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, WalRcvData::mutex, MyLatch, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

633 {
634  WalRcvData *walrcv = WalRcv;
635  int state;
636 
637  SpinLockAcquire(&walrcv->mutex);
638  state = walrcv->walRcvState;
639  if (state != WALRCV_STREAMING)
640  {
641  SpinLockRelease(&walrcv->mutex);
642  if (state == WALRCV_STOPPING)
643  proc_exit(0);
644  else
645  elog(FATAL, "unexpected walreceiver state");
646  }
647  walrcv->walRcvState = WALRCV_WAITING;
648  walrcv->receiveStart = InvalidXLogRecPtr;
649  walrcv->receiveStartTLI = 0;
650  SpinLockRelease(&walrcv->mutex);
651 
652  set_ps_display("idle");
653 
654  /*
655  * nudge startup process to notice that we've stopped streaming and are
656  * now waiting for instructions.
657  */
658  WakeupRecovery();
659  for (;;)
660  {
661  ResetLatch(MyLatch);
662 
663  ProcessWalRcvInterrupts();
664 
665  SpinLockAcquire(&walrcv->mutex);
666  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
667  walrcv->walRcvState == WALRCV_WAITING ||
668  walrcv->walRcvState == WALRCV_STOPPING);
669  if (walrcv->walRcvState == WALRCV_RESTARTING)
670  {
671  /*
672  * No need to handle changes in primary_conninfo or
673  * primary_slotname here. Startup process will signal us to
674  * terminate in case those change.
675  */
676  *startpoint = walrcv->receiveStart;
677  *startpointTLI = walrcv->receiveStartTLI;
678  walrcv->walRcvState = WALRCV_STREAMING;
679  SpinLockRelease(&walrcv->mutex);
680  break;
681  }
682  if (walrcv->walRcvState == WALRCV_STOPPING)
683  {
684  /*
685  * We should've received SIGTERM if the startup process wants us
686  * to die, but might as well check it here too.
687  */
688  SpinLockRelease(&walrcv->mutex);
689  exit(1);
690  }
691  SpinLockRelease(&walrcv->mutex);
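692 
693  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
694  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
695  }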
696 
697  if (update_process_title)
698  {
699  char activitymsg[50];
700 
701  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
702  (uint32) (*startpoint >> 32),
703  (uint32) *startpoint);
704  set_ps_display(activitymsg);
705  }
706 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:143
bool update_process_title
Definition: ps_status.c:36
WalRcvState walRcvState
Definition: walreceiver.h:64
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:145
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:588
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:390
void set_ps_display(const char *activity)
Definition: ps_status.c:349
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12813
#define FATAL
Definition: elog.h:54
unsigned int uint32
Definition: c.h:429
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:792
Definition: regguts.h:298
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:228
struct Latch * MyLatch
Definition: globals.c:55
XLogRecPtr receiveStart
Definition: walreceiver.h:73
#define snprintf
Definition: port.h:215
#define WL_LATCH_SET
Definition: latch.h:124
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 166 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, buf, close, cluster_name, ConfigReloadPending, WalRcvData::conninfo, DEBUG1, elog, ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyLatch, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, NI_MAXHOST, now(), on_shmem_exit(), options, PANIC, pfree(), pg_atomic_init_u64(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, procsignal_sigusr1_handler(), WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf, SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvData::walRcvState, 
WalRcvWaitForStartPosition(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain(), and walrcv_clear_result().

/*
 * WalReceiverMain (listing lines 167-626) -- main loop of the walreceiver
 * process.
 *
 * Flow visible in this listing:
 *   1. Under walrcv->mutex, inspect walRcvState: leave via proc_exit(1) when
 *      it is STOPPING or STOPPED, PANIC on anything other than STARTING;
 *      otherwise publish our pid, set the state to WALRCV_STREAMING and copy
 *      conninfo, slot name, temp-slot flag and start position into locals.
 *   2. Install signal handlers, load "libpqwalreceiver" and connect to the
 *      primary with walrcv_connect(); raise ERROR if the connection fails.
 *   3. Overwrite the shared conninfo with the user-visible string and
 *      publish the sender host and port.
 *   4. Loop: check the primary's system identifier and timeline, fetch
 *      missing timeline history files, create a temporary slot if asked,
 *      start streaming and process messages until end of WAL, then close
 *      the current segment file and call WalRcvWaitForStartPosition() to
 *      obtain the next start point.
 *
 * The outer for (;;) contains no break and the code after it is marked
 * "not reached".
 *
 * NOTE(review): this listing has gaps -- several numbered lines (for
 * example 178, 252, 255, 263-266, 325, 405-407, 431, 433, 436-437,
 * 502-507, 604, 607 and 615) are absent from the extract, so a few
 * statements below appear truncated.  Confirm against walreceiver.c before
 * relying on the exact statement sequence.
 */
167 {
168  char conninfo[MAXCONNINFO];
169  char *tmp_conninfo;
170  char slotname[NAMEDATALEN];
171  bool is_temp_slot;
172  XLogRecPtr startpoint;
173  TimeLineID startpointTLI;
174  TimeLineID primaryTLI;
175  bool first_stream;
176  WalRcvData *walrcv = WalRcv;
177  TimestampTz last_recv_timestamp;
179  bool ping_sent;
180  char *err;
181  char *sender_host = NULL;
182  int sender_port = 0;
183 
184  /*
185  * WalRcv should be set up already (if we are a backend, we inherit this
186  * by fork() or EXEC_BACKEND mechanism from the postmaster).
187  */
188  Assert(walrcv != NULL);
189 
 /* NOTE(review): the declaration of 'now' is not visible (listing line 178 is absent). */
190  now = GetCurrentTimestamp();
191 
192  /*
193  * Mark walreceiver as running in shared memory.
194  *
195  * Do this as early as possible, so that if we fail later on, we'll set
196  * state to STOPPED. If we die before this, the startup process will keep
197  * waiting for us to start up, until it times out.
198  */
199  SpinLockAcquire(&walrcv->mutex);
200  Assert(walrcv->pid == 0);
201  switch (walrcv->walRcvState)
202  {
203  case WALRCV_STOPPING:
204  /* If we've already been requested to stop, don't start up. */
205  walrcv->walRcvState = WALRCV_STOPPED;
206  /* fall through */
207 
208  case WALRCV_STOPPED:
209  SpinLockRelease(&walrcv->mutex);
210  proc_exit(1);
211  break;
212 
213  case WALRCV_STARTING:
214  /* The usual case */
215  break;
216 
217  case WALRCV_WAITING:
218  case WALRCV_STREAMING:
219  case WALRCV_RESTARTING:
220  default:
221  /* Shouldn't happen */
222  SpinLockRelease(&walrcv->mutex);
223  elog(PANIC, "walreceiver still running according to shared memory state");
224  }
225  /* Advertise our PID so that the startup process can kill us */
226  walrcv->pid = MyProcPid;
227  walrcv->walRcvState = WALRCV_STREAMING;
228 
229  /* Fetch information required to start streaming */
230  walrcv->ready_to_display = false;
231  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
232  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
233  is_temp_slot = walrcv->is_temp_slot;
234  startpoint = walrcv->receiveStart;
235  startpointTLI = walrcv->receiveStartTLI;
236 
237  /*
238  * At most one of is_temp_slot and slotname can be set; otherwise,
239  * RequestXLogStreaming messed up.
240  */
241  Assert(!is_temp_slot || (slotname[0] == '\0'));
242 
243  /* Initialise to a sanish value */
244  walrcv->lastMsgSendTime =
245  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
246 
247  /* Report the latch to use to awaken this process */
248  walrcv->latch = &MyProc->procLatch;
249 
250  SpinLockRelease(&walrcv->mutex);
251 
253 
254  /* Arrange to clean up at walreceiver exit */
256 
257  /* Properly accept or ignore signals the postmaster might send us */
258  pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
259  * file */
260  pqsignal(SIGINT, SIG_IGN);
261  pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
262  /* SIGQUIT handler was already set up by InitPostmasterChild */
267 
268  /* Reset some signals that are accepted by postmaster but not here */
270 
271  /* Load the libpq-specific functions */
272  load_file("libpqwalreceiver", false);
273  if (WalReceiverFunctions == NULL)
274  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
275 
276  /* Unblock signals (they were blocked when the postmaster forked us) */
278 
279  /* Establish the connection to the primary for XLOG streaming */
280  wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
281  if (!wrconn)
282  ereport(ERROR,
283  (errmsg("could not connect to the primary server: %s", err)));
284 
285  /*
286  * Save user-visible connection string. This clobbers the original
287  * conninfo, for security. Also save host and port of the sender server
288  * this walreceiver is connected to.
289  */
290  tmp_conninfo = walrcv_get_conninfo(wrconn);
291  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
292  SpinLockAcquire(&walrcv->mutex);
293  memset(walrcv->conninfo, 0, MAXCONNINFO);
294  if (tmp_conninfo)
295  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
296 
297  memset(walrcv->sender_host, 0, NI_MAXHOST);
298  if (sender_host)
299  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
300 
301  walrcv->sender_port = sender_port;
302  walrcv->ready_to_display = true;
303  SpinLockRelease(&walrcv->mutex);
304 
305  if (tmp_conninfo)
306  pfree(tmp_conninfo);
307 
308  if (sender_host)
309  pfree(sender_host);
310 
311  first_stream = true;
312  for (;;)
313  {
314  char *primary_sysid;
315  char standby_sysid[32];
317 
318  /*
319  * Check that we're connected to a valid server using the
320  * IDENTIFY_SYSTEM replication command.
321  */
322  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
323 
324  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
326  if (strcmp(primary_sysid, standby_sysid) != 0)
327  {
328  ereport(ERROR,
329  (errmsg("database system identifier differs between the primary and standby"),
330  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
331  primary_sysid, standby_sysid)));
332  }
333 
334  /*
335  * Confirm that the current timeline of the primary is the same or
336  * ahead of ours.
337  */
338  if (primaryTLI < startpointTLI)
339  ereport(ERROR,
340  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
341  primaryTLI, startpointTLI)));
342 
343  /*
344  * Get any missing history files. We do this always, even when we're
345  * not interested in that timeline, so that if we're promoted to
346  * become the primary later on, we don't select the same timeline that
347  * was already used in the current primary. This isn't bullet-proof -
348  * you'll need some external software to manage your cluster if you
349  * need to ensure that a unique timeline id is chosen in every case,
350  * but let's avoid the confusion of timeline id collisions where we
351  * can.
352  */
353  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
354 
355  /*
356  * Create temporary replication slot if requested, and update slot
357  * name in shared memory. (Note the slot name cannot already be set
358  * in this case.)
359  */
360  if (is_temp_slot)
361  {
362  snprintf(slotname, sizeof(slotname),
363  "pg_walreceiver_%lld",
364  (long long int) walrcv_get_backend_pid(wrconn));
365 
366  walrcv_create_slot(wrconn, slotname, true, 0, NULL);
367 
368  SpinLockAcquire(&walrcv->mutex);
369  strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
370  SpinLockRelease(&walrcv->mutex);
371  }
372 
373  /*
374  * Start streaming.
375  *
376  * We'll try to start at the requested starting point and timeline,
377  * even if it's different from the server's latest timeline. In case
378  * we've already reached the end of the old timeline, the server will
379  * finish the streaming immediately, and we will go back to await
380  * orders from the startup process. If recovery_target_timeline is
381  * 'latest', the startup process will scan pg_wal and find the new
382  * history file, bump recovery target timeline, and ask us to restart
383  * on the new timeline.
384  */
385  options.logical = false;
386  options.startpoint = startpoint;
387  options.slotname = slotname[0] != '\0' ? slotname : NULL;
388  options.proto.physical.startpointTLI = startpointTLI;
389  ThisTimeLineID = startpointTLI;
390  if (walrcv_startstreaming(wrconn, &options))
391  {
392  if (first_stream)
393  ereport(LOG,
394  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
395  (uint32) (startpoint >> 32), (uint32) startpoint,
396  startpointTLI)));
397  else
398  ereport(LOG,
399  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
400  (uint32) (startpoint >> 32), (uint32) startpoint,
401  startpointTLI)));
402  first_stream = false;
403 
404  /* Initialize LogstreamResult and buffers for processing messages */
408 
409  /* Initialize the last recv timestamp */
410  last_recv_timestamp = GetCurrentTimestamp();
411  ping_sent = false;
412 
413  /* Loop until end-of-streaming or error */
414  for (;;)
415  {
416  char *buf;
417  int len;
418  bool endofwal = false;
419  pgsocket wait_fd = PGINVALID_SOCKET;
420  int rc;
421 
422  /*
423  * Exit walreceiver if we're not in recovery. This should not
424  * happen, but cross-check the status here.
425  */
426  if (!RecoveryInProgress())
427  ereport(FATAL,
428  (errmsg("cannot continue WAL streaming, recovery has already ended")));
429 
430  /* Process any requests or signals received recently */
432 
434  {
435  ConfigReloadPending = false;
438  }
439 
440  /* See if we can read data immediately */
441  len = walrcv_receive(wrconn, &buf, &wait_fd);
442  if (len != 0)
443  {
444  /*
445  * Process the received data, and any subsequent data we
446  * can read without blocking.
447  */
448  for (;;)
449  {
450  if (len > 0)
451  {
452  /*
453  * Something was received from primary, so reset
454  * timeout
455  */
456  last_recv_timestamp = GetCurrentTimestamp();
457  ping_sent = false;
458  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
459  }
460  else if (len == 0)
461  break;
462  else if (len < 0)
463  {
464  ereport(LOG,
465  (errmsg("replication terminated by primary server"),
466  errdetail("End of WAL reached on timeline %u at %X/%X.",
467  startpointTLI,
468  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
469  endofwal = true;
470  break;
471  }
472  len = walrcv_receive(wrconn, &buf, &wait_fd);
473  }
474 
475  /* Let the primary know that we received some data. */
476  XLogWalRcvSendReply(false, false);
477 
478  /*
479  * If we've written some records, flush them to disk and
480  * let the startup process and primary server know about
481  * them.
482  */
483  XLogWalRcvFlush(false);
484  }
485 
486  /* Check if we need to exit the streaming loop. */
487  if (endofwal)
488  break;
489 
490  /*
491  * Ideally we would reuse a WaitEventSet object repeatedly
492  * here to avoid the overheads of WaitLatchOrSocket on epoll
493  * systems, but we can't be sure that libpq (or any other
494  * walreceiver implementation) has the same socket (even if
495  * the fd is the same number, it may have been closed and
496  * reopened since the last time). In future, if there is a
497  * function for removing sockets from WaitEventSet, then we
498  * could add and remove just the socket each time, potentially
499  * avoiding some system calls.
500  */
501  Assert(wait_fd != PGINVALID_SOCKET);
 /*
  * NOTE(review): the call that assigns rc is only partly visible (listing
  * lines 502-504 and 506-507 are absent); presumably WaitLatchOrSocket(),
  * per the comment above -- confirm.
  */
505  wait_fd,
508  if (rc & WL_LATCH_SET)
509  {
512 
513  if (walrcv->force_reply)
514  {
515  /*
516  * The recovery process has asked us to send apply
517  * feedback now. Make sure the flag is really set to
518  * false in shared memory before sending the reply, so
519  * we don't miss a new request for a reply.
520  */
521  walrcv->force_reply = false;
523  XLogWalRcvSendReply(true, false);
524  }
525  }
526  if (rc & WL_TIMEOUT)
527  {
528  /*
529  * We didn't receive anything new. If we haven't heard
530  * anything from the server for more than
531  * wal_receiver_timeout / 2, ping the server. Also, if
532  * it's been longer than wal_receiver_status_interval
533  * since the last update we sent, send a status update to
534  * the primary anyway, to report any progress in applying
535  * WAL.
536  */
537  bool requestReply = false;
538 
539  /*
540  * Check if time since last receive from primary has
541  * reached the configured limit.
542  */
543  if (wal_receiver_timeout > 0)
544  {
546  TimestampTz timeout;
547 
548  timeout =
549  TimestampTzPlusMilliseconds(last_recv_timestamp,
551 
552  if (now >= timeout)
553  ereport(ERROR,
554  (errmsg("terminating walreceiver due to timeout")));
555 
556  /*
557  * We didn't receive anything new, for half of
558  * receiver replication timeout. Ping the server.
559  */
560  if (!ping_sent)
561  {
562  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
563  (wal_receiver_timeout / 2));
564  if (now >= timeout)
565  {
566  requestReply = true;
567  ping_sent = true;
568  }
569  }
570  }
571 
 /* requestReply doubles as the 'force' argument, so a ping is always sent at once. */
572  XLogWalRcvSendReply(requestReply, requestReply);
574  }
575  }
576 
577  /*
578  * The backend finished streaming. Exit streaming COPY-mode from
579  * our side, too.
580  */
581  walrcv_endstreaming(wrconn, &primaryTLI);
582 
583  /*
584  * If the server had switched to a new timeline that we didn't
585  * know about when we began streaming, fetch its timeline history
586  * file now.
587  */
588  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
589  }
590  else
591  ereport(LOG,
592  (errmsg("primary server contains no more WAL on requested timeline %u",
593  startpointTLI)));
594 
595  /*
596  * End of WAL reached on the requested timeline. Close the last
597  * segment, and wait for new orders from the startup process.
598  */
599  if (recvFile >= 0)
600  {
601  char xlogfname[MAXFNAMELEN];
602 
603  XLogWalRcvFlush(false);
604-gap: /* NOTE(review): listing line 604, which presumably fills xlogfname, is absent -- confirm. */
605  if (close(recvFile) != 0)
606  ereport(PANIC,
608  errmsg("could not close log segment %s: %m",
609  xlogfname)));
610 
611  /*
612  * Create .done file forcibly to prevent the streamed segment from
613  * being archived later.
614  */
616  XLogArchiveForceDone(xlogfname);
617  else
618  XLogArchiveNotify(xlogfname);
619  }
620  recvFile = -1;
621 
622  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
623  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
624  }
625  /* not reached */
626 }
int sender_port
Definition: walreceiver.h:117
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:713
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:402
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:414
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:41
uint32 TimeLineID
Definition: xlogdefs.h:52
slock_t mutex
Definition: walreceiver.h:143
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:118
#define SIGUSR1
Definition: win32_port.h:171
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
#define SIGCHLD
Definition: win32_port.h:169
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:632
sig_atomic_t force_reply
Definition: walreceiver.h:158
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:406
WalRcvState walRcvState
Definition: walreceiver.h:64
static StringInfoData incoming_message
Definition: walreceiver.c:118
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1102
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:416
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:145
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:412
void proc_exit(int code)
Definition: ipc.c:104
static struct @22 LogstreamResult
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:802
#define SIGPIPE
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:172
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:8148
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:420
#define PANIC
Definition: elog.h:55
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:96
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
void ResetLatch(Latch *latch)
Definition: latch.c:588
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:89
Latch procLatch
Definition: proc.h:130
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:151
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:117
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:467
void pfree(void *pointer)
Definition: mcxt.c:1057
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:415
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:45
#define FATAL
Definition: elog.h:54
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11638
pid_t pid
Definition: walreceiver.h:63
#define MAXCONNINFO
Definition: walreceiver.h:36
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
static char * buf
Definition: pg_test_fsync.c:68
int XLogArchiveMode
Definition: xlog.c:96
int errdetail(const char *fmt,...)
Definition: elog.c:1048
int errcode_for_file_access(void)
Definition: elog.c:727
#define SIGHUP
Definition: win32_port.h:159
XLogRecPtr startpoint
Definition: walreceiver.h:168
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
unsigned int uint32
Definition: c.h:429
int pgsocket
Definition: port.h:31
sigset_t UnBlockSig
Definition: pqsignal.c:22
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
static TimeLineID recvFileTLI
Definition: walreceiver.c:104
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:422
Definition: guc.h:72
char * cluster_name
Definition: guc.c:557
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:104
Latch * latch
Definition: walreceiver.h:141
#define SIG_IGN
Definition: win32_port.h:156
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static XLogSegNo recvSegNo
Definition: walreceiver.c:105
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:517
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:94
#define PGINVALID_SOCKET
Definition: port.h:33
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1033
bool ready_to_display
Definition: walreceiver.h:132
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
TimeLineID ThisTimeLineID
Definition: xlog.c:193
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:155
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:145
#define SIG_DFL
Definition: win32_port.h:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:792
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define SIGALRM
Definition: win32_port.h:165
bool is_temp_slot
Definition: walreceiver.h:129
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:974
WalRcvData * WalRcv
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4941
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:116
int errmsg(const char *fmt,...)
Definition: elog.c:915
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:769
union WalRcvStreamOptions::@105 proto
#define elog(elevel,...)
Definition: elog.h:228
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:404
struct Latch * MyLatch
Definition: globals.c:55
struct WalRcvStreamOptions::@105::@106 physical
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:73
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:652
static int recvFile
Definition: walreceiver.c:103
#define snprintf
Definition: port.h:215
#define WL_LATCH_SET
Definition: latch.h:124
#define UINT64_FORMAT
Definition: c.h:472
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
char slotname[NAMEDATALEN]
Definition: walreceiver.h:123
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:110
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:398

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 974 of file walreceiver.c.

References AllowCascadeReplication, WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

/*
 * XLogWalRcvFlush (listing lines 975-1017)
 *
 * When LogstreamResult.Write is ahead of LogstreamResult.Flush: advance
 * Flush to Write, publish the new flush position in shared memory (WalRcv)
 * under walrcv->mutex, wake the startup process, and report progress in the
 * process title.  Unless 'dying' is true, also send a status reply and hot
 * standby feedback to the primary.  Does nothing when Flush has already
 * caught up with Write.
 *
 * NOTE(review): listing lines 980, 996 and 1000 are absent from this
 * extract.  They presumably hold the fsync of the segment file and the
 * conditions guarding WalSndWakeup() and the ps-display block -- confirm
 * against walreceiver.c.
 */
975 {
976  if (LogstreamResult.Flush < LogstreamResult.Write)
977  {
978  WalRcvData *walrcv = WalRcv;
979 
981 
982  LogstreamResult.Flush = LogstreamResult.Write;
983 
984  /* Update shared-memory status */
985  SpinLockAcquire(&walrcv->mutex);
 /* Only ever move flushedUpto forward; the old value becomes latestChunkStart. */
986  if (walrcv->flushedUpto < LogstreamResult.Flush)
987  {
988  walrcv->latestChunkStart = walrcv->flushedUpto;
989  walrcv->flushedUpto = LogstreamResult.Flush;
990  walrcv->receivedTLI = ThisTimeLineID;
991  }
992  SpinLockRelease(&walrcv->mutex);
993 
994  /* Signal the startup process and walsender that new WAL has arrived */
995  WakeupRecovery();
997  WalSndWakeup();
998 
999  /* Report XLOG streaming progress in PS display */
1001  {
1002  char activitymsg[50];
1003 
1004  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1005  (uint32) (LogstreamResult.Write >> 32),
1006  (uint32) LogstreamResult.Write);
1007  set_ps_display(activitymsg);
1008  }
1009 
1010  /* Also let the primary know that we made some progress */
1011  if (!dying)
1012  {
1013  XLogWalRcvSendReply(false, false);
1014  XLogWalRcvSendHSFeedback(false);
1015  }
1016  }
1017 }
slock_t mutex
Definition: walreceiver.h:143
bool update_process_title
Definition: ps_status.c:36
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10566
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1102
TimeLineID receivedTLI
Definition: walreceiver.h:84
static struct @22 LogstreamResult
void set_ps_display(const char *activity)
Definition: ps_status.c:349
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12813
XLogRecPtr latestChunkStart
Definition: walreceiver.h:92
#define AllowCascadeReplication()
Definition: walreceiver.h:39
unsigned int uint32
Definition: c.h:429
static XLogSegNo recvSegNo
Definition: walreceiver.c:105
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1033
TimeLineID ThisTimeLineID
Definition: xlog.c:193
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:103
XLogRecPtr flushedUpto
Definition: walreceiver.h:83
#define snprintf
Definition: port.h:215
void WalSndWakeup(void)
Definition: walsender.c:3108

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 802 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

/*
 * XLogWalRcvProcessMsg (listing lines 803-863)
 *
 * Dispatch one message received from the primary according to its type
 * byte:
 *
 *   'w'  WAL data.  The header is three int64 fields (dataStart, walEnd,
 *        sendTime); everything after the header is handed to
 *        XLogWalRcvWrite() with dataStart as the write position.
 *   'k'  Keepalive.  The body must be exactly two int64 fields (walEnd,
 *        sendTime) plus one byte (replyRequested); a reply is sent at once
 *        when that byte is non-zero.
 *
 * A short 'w' message, a 'k' message of any other length, or an unknown
 * type byte raises ERROR with ERRCODE_PROTOCOL_VIOLATION.
 *
 * buf and len describe the message body without the leading type byte (the
 * caller in this file passes &buf[1] and len - 1).
 *
 * NOTE(review): listing lines 810, 822 and 843 are absent from this
 * extract.  The fields are read from incoming_message, so those lines
 * presumably reset that buffer and copy the header bytes into it -- confirm
 * against walreceiver.c.
 */
803 {
804  int hdrlen;
805  XLogRecPtr dataStart;
806  XLogRecPtr walEnd;
807  TimestampTz sendTime;
808  bool replyRequested;
809 
811 
812  switch (type)
813  {
814  case 'w': /* WAL records */
815  {
816  /* copy message to StringInfo */
817  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
818  if (len < hdrlen)
819  ereport(ERROR,
820  (errcode(ERRCODE_PROTOCOL_VIOLATION),
821  errmsg_internal("invalid WAL message received from primary")));
823 
824  /* read the fields */
825  dataStart = pq_getmsgint64(&incoming_message);
826  walEnd = pq_getmsgint64(&incoming_message);
827  sendTime = pq_getmsgint64(&incoming_message);
828  ProcessWalSndrMessage(walEnd, sendTime);
829 
 /* Skip past the header so only the WAL payload is written. */
830  buf += hdrlen;
831  len -= hdrlen;
832  XLogWalRcvWrite(buf, len, dataStart);
833  break;
834  }
835  case 'k': /* Keepalive */
836  {
837  /* copy message to StringInfo */
838  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
839  if (len != hdrlen)
840  ereport(ERROR,
841  (errcode(ERRCODE_PROTOCOL_VIOLATION),
842  errmsg_internal("invalid keepalive message received from primary")));
844 
845  /* read the fields */
846  walEnd = pq_getmsgint64(&incoming_message);
847  sendTime = pq_getmsgint64(&incoming_message);
848  replyRequested = pq_getmsgbyte(&incoming_message);
849 
850  ProcessWalSndrMessage(walEnd, sendTime);
851 
852  /* If the primary requested a reply, send one immediately */
853  if (replyRequested)
854  XLogWalRcvSendReply(true, false);
855  break;
856  }
857  default:
858  ereport(ERROR,
859  (errcode(ERRCODE_PROTOCOL_VIOLATION),
860  errmsg_internal("invalid replication message type %d",
861  type)));
862  }
863 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1203
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:118
int errcode(int sqlerrcode)
Definition: elog.c:704
#define ERROR
Definition: elog.h:45
static char * buf
Definition: pg_test_fsync.c:68
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1033
#define ereport(elevel,...)
Definition: elog.h:155
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1002
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:869

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1102 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReadNextFullTransactionId(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, wal_receiver_status_interval, walrcv_send, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

/*
 * XLogWalRcvSendHSFeedback (listing lines 1103-1194)
 *
 * Build a hot standby feedback message (type byte 'h') carrying xmin,
 * catalog_xmin and an epoch for each, for delivery to the primary.
 *
 * immed: when false, the function returns early unless
 * TimestampDifferenceExceeds() reports that enough time has passed since
 * the last send; when true that rate limit is skipped.
 *
 * Two function-local statics persist across calls: sendTime (time of the
 * last rate-limited send) and primary_has_standby_xmin (whether the last
 * message carried a valid xmin or catalog_xmin).
 *
 * Nothing is sent while HotStandbyActive() is false.
 *
 * NOTE(review): listing lines 1120, 1133, 1155, 1182, 1184 and 1189 are
 * absent from this extract.  They presumably hold the first half of the
 * early-exit test, the interval argument, the condition choosing between
 * real and invalid horizons, the buffer reset, one further message field,
 * and the actual send call -- confirm against walreceiver.c.
 */
1103 {
1104  TimestampTz now;
1105  FullTransactionId nextFullXid;
1106  TransactionId nextXid;
1107  uint32 xmin_epoch,
1108  catalog_xmin_epoch;
1109  TransactionId xmin,
1110  catalog_xmin;
1111  static TimestampTz sendTime = 0;
1112 
1113  /* initially true so we always send at least one feedback message */
1114  static bool primary_has_standby_xmin = true;
1115 
1116  /*
1117  * If the user doesn't want status to be reported to the primary, be sure
1118  * to exit before doing anything at all.
1119  */
1121  !primary_has_standby_xmin)
1122  return;
1123 
1124  /* Get current timestamp. */
1125  now = GetCurrentTimestamp();
1126 
1127  if (!immed)
1128  {
1129  /*
1130  * Send feedback at most once per wal_receiver_status_interval.
1131  */
1132  if (!TimestampDifferenceExceeds(sendTime, now,
1134  return;
1135  sendTime = now;
1136  }
1137 
1138  /*
1139  * If Hot Standby is not yet accepting connections there is nothing to
1140  * send. Check this after the interval has expired to reduce number of
1141  * calls.
1142  *
1143  * Bailing out here also ensures that we don't send feedback until we've
1144  * read our own replication slot state, so we don't tell the primary to
1145  * discard needed xmin or catalog_xmin from any slots that may exist on
1146  * this replica.
1147  */
1148  if (!HotStandbyActive())
1149  return;
1150 
1151  /*
1152  * Make the expensive call to get the oldest xmin once we are certain
1153  * everything else has been checked.
1154  */
1156  {
1157  GetReplicationHorizons(&xmin, &catalog_xmin);
1158  }
1159  else
1160  {
1161  xmin = InvalidTransactionId;
1162  catalog_xmin = InvalidTransactionId;
1163  }
1164 
1165  /*
1166  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1167  * the epoch boundary.
1168  */
1169  nextFullXid = ReadNextFullTransactionId();
1170  nextXid = XidFromFullTransactionId(nextFullXid);
1171  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1172  catalog_xmin_epoch = xmin_epoch;
 /* An xmin numerically above nextXid is treated as belonging to the previous epoch. */
1173  if (nextXid < xmin)
1174  xmin_epoch--;
1175  if (nextXid < catalog_xmin)
1176  catalog_xmin_epoch--;
1177 
1178  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1179  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1180 
1181  /* Construct the message and send it. */
1183  pq_sendbyte(&reply_message, 'h');
1185  pq_sendint32(&reply_message, xmin);
1186  pq_sendint32(&reply_message, xmin_epoch);
1187  pq_sendint32(&reply_message, catalog_xmin);
1188  pq_sendint32(&reply_message, catalog_xmin_epoch);
 /* Record whether this message carried a valid horizon; the early-exit test above reads the flag. */
1190  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1191  primary_has_standby_xmin = true;
1192  else
1193  primary_has_standby_xmin = false;
1194 }
bool hot_standby_feedback
Definition: walreceiver.c:90
uint32 TransactionId
Definition: c.h:575
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
bool HotStandbyActive(void)
Definition: xlog.c:8222
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:88
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1709
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static StringInfoData reply_message
Definition: walreceiver.c:117
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:429
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:1950
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:418
#define elog(elevel,...)
Definition: elog.h:228
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1033 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, now(), pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1034 {
1035  static XLogRecPtr writePtr = 0;
1036  static XLogRecPtr flushPtr = 0;
1037  XLogRecPtr applyPtr;
1038  static TimestampTz sendTime = 0;
1039  TimestampTz now;
1040 
1041  /*
1042  * If the user doesn't want status to be reported to the primary, be sure
1043  * to exit before doing anything at all.
1044  */
1045  if (!force && wal_receiver_status_interval <= 0)
1046  return;
1047 
1048  /* Get current timestamp. */
1049  now = GetCurrentTimestamp();
1050 
1051  /*
1052  * We can compare the write and flush positions to the last message we
1053  * sent without taking any lock, but the apply position requires a spin
1054  * lock, so we don't check that unless something else has changed or 10
1055  * seconds have passed. This means that the apply WAL location will
1056  * appear, from the primary's point of view, to lag slightly, but since
1057  * this is only for reporting purposes and only on idle systems, that's
1058  * probably OK.
1059  */
1060  if (!force
1061  && writePtr == LogstreamResult.Write
1062  && flushPtr == LogstreamResult.Flush
1063  && !TimestampDifferenceExceeds(sendTime, now,
1064  wal_receiver_status_interval * 1000))
1065  return;
1066  sendTime = now;
1067 
1068  /* Construct a new message */
1069  writePtr = LogstreamResult.Write;
1070  flushPtr = LogstreamResult.Flush;
1071  applyPtr = GetXLogReplayRecPtr(NULL);
1072 
1073  resetStringInfo(&reply_message);
1074  pq_sendbyte(&reply_message, 'r');
1075  pq_sendint64(&reply_message, writePtr);
1076  pq_sendint64(&reply_message, flushPtr);
1077  pq_sendint64(&reply_message, applyPtr);
1078  pq_sendint64(&reply_message, GetCurrentTimestamp());
1079  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1080 
1081  /* Send it */
1082  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1083  (uint32) (writePtr >> 32), (uint32) writePtr,
1084  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1085  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1086  requestReply ? " (reply requested)" : "");
1087 
1088  walrcv_send(wrconn, reply_message.data, reply_message.len);
1089 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
static struct @22 LogstreamResult
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:88
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1709
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:117
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11638
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:429
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:418
#define elog(elevel,...)
Definition: elog.h:228
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 869 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite(), recvFile, recvFileTLI, recvSegNo, ThisTimeLineID, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogSegmentOffset, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

870 {
871  int startoff;
872  int byteswritten;
873 
874  while (nbytes > 0)
875  {
876  int segbytes;
877 
878  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
879  {
880  bool use_existent;
881 
882  /*
883  * fsync() and close current file before we switch to next one. We
884  * would otherwise have to reopen this file to fsync it later
885  */
886  if (recvFile >= 0)
887  {
888  char xlogfname[MAXFNAMELEN];
889 
890  XLogWalRcvFlush(false);
891 
892  XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
893 
894  /*
895  * XLOG segment files will be re-read by recovery in startup
896  * process soon, so we don't advise the OS to release cache
897  * pages associated with the file like XLogFileClose() does.
898  */
899  if (close(recvFile) != 0)
900  ereport(PANIC,
901  (errcode_for_file_access(),
902  errmsg("could not close log segment %s: %m",
903  xlogfname)));
904 
905  /*
906  * Create .done file forcibly to prevent the streamed segment
907  * from being archived later.
908  */
909  if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
910  XLogArchiveForceDone(xlogfname);
911  else
912  XLogArchiveNotify(xlogfname);
913  }
914  recvFile = -1;
915 
916  /* Create/use new log file */
917  XLByteToSeg(recptr, recvSegNo, wal_segment_size);
918  use_existent = true;
919  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
920  recvFileTLI = ThisTimeLineID;
921  }
922 
923  /* Calculate the start offset of the received logs */
924  startoff = XLogSegmentOffset(recptr, wal_segment_size);
925 
926  if (startoff + nbytes > wal_segment_size)
927  segbytes = wal_segment_size - startoff;
928  else
929  segbytes = nbytes;
930 
931  /* OK to write the logs */
932  errno = 0;
933 
934  byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
935  if (byteswritten <= 0)
936  {
937  char xlogfname[MAXFNAMELEN];
938  int save_errno;
939 
940  /* if write didn't set errno, assume no disk space */
941  if (errno == 0)
942  errno = ENOSPC;
943 
944  save_errno = errno;
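945  XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
946  errno = save_errno;
947  ereport(PANIC,
948  (errcode_for_file_access(),
949  errmsg("could not write to log segment %s "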
950  "at offset %u, length %lu: %m",
951  xlogfname, startoff, (unsigned long) segbytes)));
952  }
953 
954  /* Update state for write */
955  recptr += byteswritten;
956 
957  nbytes -= byteswritten;
958  buf += byteswritten;
959 
960  LogstreamResult.Write = recptr;
961  }
962 
963  /* Update shared-memory status */
964  pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
965 }
int wal_segment_size
Definition: xlog.c:118
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3267
static struct @22 LogstreamResult
#define PANIC
Definition: elog.h:55
ssize_t pg_pwrite(int fd, const void *buf, size_t nbyte, off_t offset)
Definition: pwrite.c:27
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:151
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:467
static char * buf
Definition: pg_test_fsync.c:68
int XLogArchiveMode
Definition: xlog.c:96
int errcode_for_file_access(void)
Definition: elog.c:727
static TimeLineID recvFileTLI
Definition: walreceiver.c:104
static XLogSegNo recvSegNo
Definition: walreceiver.c:105
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:517
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
TimeLineID ThisTimeLineID
Definition: xlog.c:193
#define ereport(elevel,...)
Definition: elog.h:155
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:974
WalRcvData * WalRcv
int errmsg(const char *fmt,...)
Definition: elog.c:915
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:103
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 114 of file walreceiver.c.

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 90 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 118 of file walreceiver.c.

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 103 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 105 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

Definition at line 117 of file walreceiver.c.

Referenced by send_feedback().

◆ wal_receiver_status_interval

int wal_receiver_status_interval

Definition at line 88 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ wal_receiver_timeout

int wal_receiver_timeout

Definition at line 89 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 94 of file walreceiver.c.

Referenced by _PG_init().

◆ wrconn

WalReceiverConn* wrconn = NULL
static

Definition at line 93 of file walreceiver.c.

◆ Write

XLogRecPtr Write

Definition at line 113 of file walreceiver.c.