PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
} LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 97 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1299 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MAXCONNINFO, MemSet, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, NI_MAXHOST, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

1300 {
1301  TupleDesc tupdesc;
1302  Datum *values;
1303  bool *nulls;
1304  int pid;
1305  bool ready_to_display;
1306  WalRcvState state;
1307  XLogRecPtr receive_start_lsn;
1308  TimeLineID receive_start_tli;
1309  XLogRecPtr written_lsn;
1310  XLogRecPtr flushed_lsn;
1311  TimeLineID received_tli;
1312  TimestampTz last_send_time;
1313  TimestampTz last_receipt_time;
1314  XLogRecPtr latest_end_lsn;
1315  TimestampTz latest_end_time;
1316  char sender_host[NI_MAXHOST];
1317  int sender_port = 0;
1318  char slotname[NAMEDATALEN];
1319  char conninfo[MAXCONNINFO];
1320 
1321  /* Take a lock to ensure value consistency */
1322  SpinLockAcquire(&WalRcv->mutex);
1323  pid = (int) WalRcv->pid;
1324  ready_to_display = WalRcv->ready_to_display;
1325  state = WalRcv->walRcvState;
1326  receive_start_lsn = WalRcv->receiveStart;
1327  receive_start_tli = WalRcv->receiveStartTLI;
1328  flushed_lsn = WalRcv->flushedUpto;
1329  received_tli = WalRcv->receivedTLI;
1330  last_send_time = WalRcv->lastMsgSendTime;
1331  last_receipt_time = WalRcv->lastMsgReceiptTime;
1332  latest_end_lsn = WalRcv->latestWalEnd;
1333  latest_end_time = WalRcv->latestWalEndTime;
1334  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1335  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1336  sender_port = WalRcv->sender_port;
1337  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1338  SpinLockRelease(&WalRcv->mutex);
1339 
1340  /*
1341  * No WAL receiver (or not ready yet), just return a tuple with NULL
1342  * values
1343  */
1344  if (pid == 0 || !ready_to_display)
1345  PG_RETURN_NULL();
1346 
1347  /*
1348  * Read "writtenUpto" without holding a spinlock. Note that it may not be
1349  * consistent with the other shared variables of the WAL receiver
1350  * protected by a spinlock, but this should not be used for data integrity
1351  * checks.
1352  */
1353  written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1354 
1355  /* determine result type */
1356  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1357  elog(ERROR, "return type must be a row type");
1358 
1359  values = palloc0(sizeof(Datum) * tupdesc->natts);
1360  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1361 
1362  /* Fetch values */
1363  values[0] = Int32GetDatum(pid);
1364 
1365  if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1366  {
1367  /*
1368  * Only superusers and members of pg_read_all_stats can see details.
1369  * Other users only get the pid value to know whether it is a WAL
1370  * receiver, but no details.
1371  */
1372  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1373  }
1374  else
1375  {
1376  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1377 
1378  if (XLogRecPtrIsInvalid(receive_start_lsn))
1379  nulls[2] = true;
1380  else
1381  values[2] = LSNGetDatum(receive_start_lsn);
1382  values[3] = Int32GetDatum(receive_start_tli);
1383  if (XLogRecPtrIsInvalid(written_lsn))
1384  nulls[4] = true;
1385  else
1386  values[4] = LSNGetDatum(written_lsn);
1387  if (XLogRecPtrIsInvalid(flushed_lsn))
1388  nulls[5] = true;
1389  else
1390  values[5] = LSNGetDatum(flushed_lsn);
1391  values[6] = Int32GetDatum(received_tli);
1392  if (last_send_time == 0)
1393  nulls[7] = true;
1394  else
1395  values[7] = TimestampTzGetDatum(last_send_time);
1396  if (last_receipt_time == 0)
1397  nulls[8] = true;
1398  else
1399  values[8] = TimestampTzGetDatum(last_receipt_time);
1400  if (XLogRecPtrIsInvalid(latest_end_lsn))
1401  nulls[9] = true;
1402  else
1403  values[9] = LSNGetDatum(latest_end_lsn);
1404  if (latest_end_time == 0)
1405  nulls[10] = true;
1406  else
1407  values[10] = TimestampTzGetDatum(latest_end_time);
1408  if (*slotname == '\0')
1409  nulls[11] = true;
1410  else
1411  values[11] = CStringGetTextDatum(slotname);
1412  if (*sender_host == '\0')
1413  nulls[12] = true;
1414  else
1415  values[12] = CStringGetTextDatum(sender_host);
1416  if (sender_port == 0)
1417  nulls[13] = true;
1418  else
1419  values[13] = Int32GetDatum(sender_port);
1420  if (*conninfo == '\0')
1421  nulls[14] = true;
1422  else
1423  values[14] = CStringGetTextDatum(conninfo);
1424  }
1425 
1426  /* Returns the record as Datum */
1427  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1428 }
int sender_port
Definition: walreceiver.h:119
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1274
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
uint32 TimeLineID
Definition: xlogdefs.h:59
slock_t mutex
Definition: walreceiver.h:145
Oid GetUserId(void)
Definition: miscinit.c:478
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:65
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:86
#define MemSet(start, val, len)
Definition: c.h:1008
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:100
TimestampTz lastMsgSendTime
Definition: walreceiver.h:99
WalRcvState
Definition: walreceiver.h:45
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:153
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:46
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:64
#define MAXCONNINFO
Definition: walreceiver.h:37
TimeLineID receiveStartTLI
Definition: walreceiver.h:76
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:1093
uintptr_t Datum
Definition: postgres.h:411
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
bool ready_to_display
Definition: walreceiver.h:134
TimestampTz latestWalEndTime
Definition: walreceiver.h:106
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4869
XLogRecPtr latestWalEnd
Definition: walreceiver.h:105
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:317
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:221
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:166
#define Int32GetDatum(X)
Definition: postgres.h:523
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:118
#define elog(elevel,...)
Definition: elog.h:232
#define CStringGetTextDatum(s)
Definition: builtins.h:82
XLogRecPtr receiveStart
Definition: walreceiver.h:75
XLogRecPtr flushedUpto
Definition: walreceiver.h:85
#define PG_RETURN_NULL()
Definition: fmgr.h:345
char slotname[NAMEDATALEN]
Definition: walreceiver.h:125
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:112

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 146 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and ShutdownRequestPending.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), walrcv_clear_result(), WalRcvWaitForStartPosition(), and WalReceiverMain().

147 {
148  /*
149  * Although walreceiver interrupt handling doesn't use the same scheme as
150  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
151  * any incoming signals on Win32, and also to make sure we process any
152  * barrier events.
153  */
154  CHECK_FOR_INTERRUPTS();
155 
156  if (ShutdownRequestPending)
157  {
158  ereport(FATAL,
159  (errcode(ERRCODE_ADMIN_SHUTDOWN),
160  errmsg("terminating walreceiver process due to administrator command")));
161  }
162 }
int errcode(int sqlerrcode)
Definition: elog.c:698
#define FATAL
Definition: elog.h:49
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:27
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1203 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, message_level_is_interesting(), WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1204 {
1205  WalRcvData *walrcv = WalRcv;
1206 
1207  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1208 
1209  /* Update shared-memory status */
1210  SpinLockAcquire(&walrcv->mutex);
1211  if (walrcv->latestWalEnd < walEnd)
1212  walrcv->latestWalEndTime = sendTime;
1213  walrcv->latestWalEnd = walEnd;
1214  walrcv->lastMsgSendTime = sendTime;
1215  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1216  SpinLockRelease(&walrcv->mutex);
1217 
1218  if (message_level_is_interesting(DEBUG2))
1219  {
1220  char *sendtime;
1221  char *receipttime;
1222  int applyDelay;
1223 
1224  /* Copy because timestamptz_to_str returns a static buffer */
1225  sendtime = pstrdup(timestamptz_to_str(sendTime));
1226  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1227  applyDelay = GetReplicationApplyDelay();
1228 
1229  /* apply delay is not available */
1230  if (applyDelay == -1)
1231  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1232  sendtime,
1233  receipttime,
1234  GetReplicationTransferLatency());
1235  else
1236  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1237  sendtime,
1238  receipttime,
1239  applyDelay,
1240  GetReplicationTransferLatency());
1241 
1242  pfree(sendtime);
1243  pfree(receipttime);
1244  }
1245 }
slock_t mutex
Definition: walreceiver.h:145
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:100
TimestampTz lastMsgSendTime
Definition: walreceiver.h:99
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1169
#define DEBUG2
Definition: elog.h:24
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:106
XLogRecPtr latestWalEnd
Definition: walreceiver.h:105
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:232
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 768 of file walreceiver.c.

References Assert, ConditionVariableBroadcast(), WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

769 {
770  WalRcvData *walrcv = WalRcv;
771 
772  /* Ensure that all WAL records received are flushed to disk */
773  XLogWalRcvFlush(true);
774 
775  /* Mark ourselves inactive in shared memory */
776  SpinLockAcquire(&walrcv->mutex);
777  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
778  walrcv->walRcvState == WALRCV_RESTARTING ||
779  walrcv->walRcvState == WALRCV_STARTING ||
780  walrcv->walRcvState == WALRCV_WAITING ||
781  walrcv->walRcvState == WALRCV_STOPPING);
782  Assert(walrcv->pid == MyProcPid);
783  walrcv->walRcvState = WALRCV_STOPPED;
784  walrcv->pid = 0;
785  walrcv->ready_to_display = false;
786  walrcv->latch = NULL;
787  SpinLockRelease(&walrcv->mutex);
788 
789  ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
790 
791  /* Terminate the connection gracefully. */
792  if (wrconn != NULL)
793  walrcv_disconnect(wrconn);
794 
795  /* Wake up the startup process to notice promptly that we're gone */
796  WakeupRecovery();
797 }
int MyProcPid
Definition: globals.c:43
slock_t mutex
Definition: walreceiver.h:145
WalRcvState walRcvState
Definition: walreceiver.h:65
void ConditionVariableBroadcast(ConditionVariable *cv)
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12935
pid_t pid
Definition: walreceiver.h:64
Latch * latch
Definition: walreceiver.h:143
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:134
#define Assert(condition)
Definition: c.h:804
#define walrcv_disconnect(conn)
Definition: walreceiver.h:429
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:975
WalRcvData * WalRcv
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:66

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 712 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

713 {
714  TimeLineID tli;
715 
716  for (tli = first; tli <= last; tli++)
717  {
718  /* there's no history file for timeline 1 */
719  if (tli != 1 && !existsTimeLineHistory(tli))
720  {
721  char *fname;
722  char *content;
723  int len;
724  char expectedfname[MAXFNAMELEN];
725 
726  ereport(LOG,
727  (errmsg("fetching timeline history file for timeline %u from primary server",
728  tli)));
729 
730  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
731 
732  /*
733  * Check that the filename on the primary matches what we
734  * calculated ourselves. This is just a sanity check, it should
735  * always match.
736  */
737  TLHistoryFileName(expectedfname, tli);
738  if (strcmp(fname, expectedfname) != 0)
739  ereport(ERROR,
740  (errcode(ERRCODE_PROTOCOL_VIOLATION),
741  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
742  tli)));
743 
744  /*
745  * Write the file to pg_wal.
746  */
747  writeTimeLineHistoryFile(tli, content, len);
748 
749  /*
750  * Mark the streamed history file as ready for archiving
751  * if archive_mode is always.
752  */
753  if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
754  XLogArchiveForceDone(fname);
755  else
756  XLogArchiveNotify(fname);
757 
758  pfree(fname);
759  pfree(content);
760  }
761  }
762 }
uint32 TimeLineID
Definition: xlogdefs.h:59
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:467
void pfree(void *pointer)
Definition: mcxt.c:1169
#define ERROR
Definition: elog.h:46
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:467
int XLogArchiveMode
Definition: xlog.c:97
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:517
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:413
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1256 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

1257 {
1258  Latch *latch;
1259 
1260  WalRcv->force_reply = true;
1261  /* fetching the latch pointer might not be atomic, so use spinlock */
1262  SpinLockAcquire(&WalRcv->mutex);
1263  latch = WalRcv->latch;
1264  SpinLockRelease(&WalRcv->mutex);
1265  if (latch)
1266  SetLatch(latch);
1267 }
slock_t mutex
Definition: walreceiver.h:145
sig_atomic_t force_reply
Definition: walreceiver.h:160
void SetLatch(Latch *latch)
Definition: latch.c:567
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: latch.h:110
Latch * latch
Definition: walreceiver.h:143
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1274 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1275 {
1276  switch (state)
1277  {
1278  case WALRCV_STOPPED:
1279  return "stopped";
1280  case WALRCV_STARTING:
1281  return "starting";
1282  case WALRCV_STREAMING:
1283  return "streaming";
1284  case WALRCV_WAITING:
1285  return "waiting";
1286  case WALRCV_RESTARTING:
1287  return "restarting";
1288  case WALRCV_STOPPING:
1289  return "stopping";
1290  }
1291  return "UNKNOWN";
1292 }
Definition: regguts.h:317

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 632 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, LSN_FORMAT_ARGS, WalRcvData::mutex, MyLatch, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

633 {
634  WalRcvData *walrcv = WalRcv;
635  int state;
636 
637  SpinLockAcquire(&walrcv->mutex);
638  state = walrcv->walRcvState;
639  if (state != WALRCV_STREAMING)
640  {
641  SpinLockRelease(&walrcv->mutex);
642  if (state == WALRCV_STOPPING)
643  proc_exit(0);
644  else
645  elog(FATAL, "unexpected walreceiver state");
646  }
647  walrcv->walRcvState = WALRCV_WAITING;
648  walrcv->receiveStart = InvalidXLogRecPtr;
649  walrcv->receiveStartTLI = 0;
650  SpinLockRelease(&walrcv->mutex);
651 
652  set_ps_display("idle");
653 
654  /*
655  * nudge startup process to notice that we've stopped streaming and are
656  * now waiting for instructions.
657  */
658  WakeupRecovery();
659  for (;;)
660  {
661  ResetLatch(MyLatch);
662 
663  ProcessWalRcvInterrupts();
664 
665  SpinLockAcquire(&walrcv->mutex);
666  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
667  walrcv->walRcvState == WALRCV_WAITING ||
668  walrcv->walRcvState == WALRCV_STOPPING);
669  if (walrcv->walRcvState == WALRCV_RESTARTING)
670  {
671  /*
672  * No need to handle changes in primary_conninfo or
673  * primary_slotname here. Startup process will signal us to
674  * terminate in case those change.
675  */
676  *startpoint = walrcv->receiveStart;
677  *startpointTLI = walrcv->receiveStartTLI;
678  walrcv->walRcvState = WALRCV_STREAMING;
679  SpinLockRelease(&walrcv->mutex);
680  break;
681  }
682  if (walrcv->walRcvState == WALRCV_STOPPING)
683  {
684  /*
685  * We should've received SIGTERM if the startup process wants us
686  * to die, but might as well check it here too.
687  */
688  SpinLockRelease(&walrcv->mutex);
689  exit(1);
690  }
691  SpinLockRelease(&walrcv->mutex);
692 
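693  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
694  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
695  }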
696 
697  if (update_process_title)
698  {
699  char activitymsg[50];
700 
701  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
702  LSN_FORMAT_ARGS(*startpoint));
703  set_ps_display(activitymsg);
704  }
705 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:145
bool update_process_title
Definition: ps_status.c:36
WalRcvState walRcvState
Definition: walreceiver.h:65
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:146
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:660
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:452
void set_ps_display(const char *activity)
Definition: ps_status.c:349
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void WakeupRecovery(void)
Definition: xlog.c:12935
#define FATAL
Definition: elog.h:49
TimeLineID receiveStartTLI
Definition: walreceiver.h:76
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:804
Definition: regguts.h:317
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:232
struct Latch * MyLatch
Definition: globals.c:57
XLogRecPtr receiveStart
Definition: walreceiver.h:75
#define snprintf
Definition: port.h:216
#define WL_LATCH_SET
Definition: latch.h:125
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 167 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, buf, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, WalRcvData::conninfo, DEBUG1, elog, ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyLatch, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, NI_MAXHOST, now(), on_shmem_exit(), options, PANIC, pfree(), pg_atomic_write_u64(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, procsignal_sigusr1_handler(), WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf, SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), 
WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, WalRcvWaitForStartPosition(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain(), and walrcv_clear_result().

168 {
 /*
  * Main routine of the WAL receiver process, as visible in this listing:
  *   1. mark the walreceiver as running in shared memory and copy the
  *      connection parameters out of it,
  *   2. install signal handlers and load libpqwalreceiver,
  *   3. connect to the primary and publish the sanitized connection info,
  *   4. loop forever: identify the primary, fetch timeline history files,
  *      optionally create a temporary slot, stream WAL until the stream
  *      ends, then wait for a new start position.
  *
  * NOTE(review): the embedded source line numbers have gaps (for example
  * 179, 211, 254, 257, 265-268, 327, 405-407, 502-507), so some statements
  * of the real function are not shown here. Consult the full source before
  * relying on this listing.
  */
169  char conninfo[MAXCONNINFO];
170  char *tmp_conninfo;
171  char slotname[NAMEDATALEN];
172  bool is_temp_slot;
173  XLogRecPtr startpoint;
174  TimeLineID startpointTLI;
175  TimeLineID primaryTLI;
176  bool first_stream;
177  WalRcvData *walrcv = WalRcv;
178  TimestampTz last_recv_timestamp;
180  bool ping_sent;
181  char *err;
182  char *sender_host = NULL;
183  int sender_port = 0;
184 
185  /*
186  * WalRcv should be set up already (if we are a backend, we inherit this
187  * by fork() or EXEC_BACKEND mechanism from the postmaster).
188  */
189  Assert(walrcv != NULL);
190 
 /* The declaration of now is not visible here (line 179 is omitted). */
191  now = GetCurrentTimestamp();
192 
193  /*
194  * Mark walreceiver as running in shared memory.
195  *
196  * Do this as early as possible, so that if we fail later on, we'll set
197  * state to STOPPED. If we die before this, the startup process will keep
198  * waiting for us to start up, until it times out.
199  */
200  SpinLockAcquire(&walrcv->mutex);
201  Assert(walrcv->pid == 0);
202  switch (walrcv->walRcvState)
203  {
204  case WALRCV_STOPPING:
205  /* If we've already been requested to stop, don't start up. */
206  walrcv->walRcvState = WALRCV_STOPPED;
207  /* fall through */
208 
209  case WALRCV_STOPPED:
210  SpinLockRelease(&walrcv->mutex);
212  proc_exit(1);
213  break;
214 
215  case WALRCV_STARTING:
216  /* The usual case */
217  break;
218 
219  case WALRCV_WAITING:
220  case WALRCV_STREAMING:
221  case WALRCV_RESTARTING:
222  default:
223  /* Shouldn't happen */
224  SpinLockRelease(&walrcv->mutex);
225  elog(PANIC, "walreceiver still running according to shared memory state");
226  }
 /* Only the WALRCV_STARTING case reaches this point; the mutex is still held. */
227  /* Advertise our PID so that the startup process can kill us */
228  walrcv->pid = MyProcPid;
229  walrcv->walRcvState = WALRCV_STREAMING;
230 
231  /* Fetch information required to start streaming */
232  walrcv->ready_to_display = false;
233  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
234  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
235  is_temp_slot = walrcv->is_temp_slot;
236  startpoint = walrcv->receiveStart;
237  startpointTLI = walrcv->receiveStartTLI;
238 
239  /*
240  * At most one of is_temp_slot and slotname can be set; otherwise,
241  * RequestXLogStreaming messed up.
242  */
243  Assert(!is_temp_slot || (slotname[0] == '\0'));
244 
245  /* Initialise to a sanish value */
246  walrcv->lastMsgSendTime =
247  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
248 
249  /* Report the latch to use to awaken this process */
250  walrcv->latch = &MyProc->procLatch;
251 
252  SpinLockRelease(&walrcv->mutex);
253 
255 
256  /* Arrange to clean up at walreceiver exit */
 /* NOTE(review): the statement for this step (line 257) is omitted from the listing. */
258 
259  /* Properly accept or ignore signals the postmaster might send us */
260  pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
261  * file */
262  pqsignal(SIGINT, SIG_IGN);
263  pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
264  /* SIGQUIT handler was already set up by InitPostmasterChild */
269 
270  /* Reset some signals that are accepted by postmaster but not here */
272 
273  /* Load the libpq-specific functions */
274  load_file("libpqwalreceiver", false);
275  if (WalReceiverFunctions == NULL)
276  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
277 
278  /* Unblock signals (they were blocked when the postmaster forked us) */
280 
281  /* Establish the connection to the primary for XLOG streaming */
 /* The application name is cluster_name when it is non-empty, otherwise the fixed default. */
282  wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
283  if (!wrconn)
284  ereport(ERROR,
285  (errmsg("could not connect to the primary server: %s", err)));
286 
287  /*
288  * Save user-visible connection string. This clobbers the original
289  * conninfo, for security. Also save host and port of the sender server
290  * this walreceiver is connected to.
291  */
292  tmp_conninfo = walrcv_get_conninfo(wrconn);
293  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
294  SpinLockAcquire(&walrcv->mutex);
295  memset(walrcv->conninfo, 0, MAXCONNINFO);
296  if (tmp_conninfo)
297  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
298 
299  memset(walrcv->sender_host, 0, NI_MAXHOST);
300  if (sender_host)
301  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
302 
303  walrcv->sender_port = sender_port;
304  walrcv->ready_to_display = true;
305  SpinLockRelease(&walrcv->mutex);
306 
 /* The shared-memory copies were made above, so the local strings can be released. */
307  if (tmp_conninfo)
308  pfree(tmp_conninfo);
309 
310  if (sender_host)
311  pfree(sender_host);
312 
 /* first_stream only selects which LOG message is emitted below (started vs restarted). */
313  first_stream = true;
314  for (;;)
315  {
316  char *primary_sysid;
317  char standby_sysid[32];
319 
320  /*
321  * Check that we're connected to a valid server using the
322  * IDENTIFY_SYSTEM replication command.
323  */
324  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
325 
 /* NOTE(review): the final argument of this snprintf call (line 327) is omitted from the listing. */
326  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
328  if (strcmp(primary_sysid, standby_sysid) != 0)
329  {
330  ereport(ERROR,
331  (errmsg("database system identifier differs between the primary and standby"),
332  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
333  primary_sysid, standby_sysid)));
334  }
335 
336  /*
337  * Confirm that the current timeline of the primary is the same or
338  * ahead of ours.
339  */
340  if (primaryTLI < startpointTLI)
341  ereport(ERROR,
342  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
343  primaryTLI, startpointTLI)));
344 
345  /*
346  * Get any missing history files. We do this always, even when we're
347  * not interested in that timeline, so that if we're promoted to
348  * become the primary later on, we don't select the same timeline that
349  * was already used in the current primary. This isn't bullet-proof -
350  * you'll need some external software to manage your cluster if you
351  * need to ensure that a unique timeline id is chosen in every case,
352  * but let's avoid the confusion of timeline id collisions where we
353  * can.
354  */
355  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
356 
357  /*
358  * Create temporary replication slot if requested, and update slot
359  * name in shared memory. (Note the slot name cannot already be set
360  * in this case.)
361  */
362  if (is_temp_slot)
363  {
 /* The slot name is derived from the PID reported by walrcv_get_backend_pid. */
364  snprintf(slotname, sizeof(slotname),
365  "pg_walreceiver_%lld",
366  (long long int) walrcv_get_backend_pid(wrconn));
367 
368  walrcv_create_slot(wrconn, slotname, true, 0, NULL);
369 
370  SpinLockAcquire(&walrcv->mutex);
371  strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
372  SpinLockRelease(&walrcv->mutex);
373  }
374 
375  /*
376  * Start streaming.
377  *
378  * We'll try to start at the requested starting point and timeline,
379  * even if it's different from the server's latest timeline. In case
380  * we've already reached the end of the old timeline, the server will
381  * finish the streaming immediately, and we will go back to await
382  * orders from the startup process. If recovery_target_timeline is
383  * 'latest', the startup process will scan pg_wal and find the new
384  * history file, bump recovery target timeline, and ask us to restart
385  * on the new timeline.
386  */
387  options.logical = false;
388  options.startpoint = startpoint;
 /* An empty slot name is passed on as NULL. */
389  options.slotname = slotname[0] != '\0' ? slotname : NULL;
390  options.proto.physical.startpointTLI = startpointTLI;
391  ThisTimeLineID = startpointTLI;
392  if (walrcv_startstreaming(wrconn, &options))
393  {
394  if (first_stream)
395  ereport(LOG,
396  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
397  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
398  else
399  ereport(LOG,
400  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
401  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
402  first_stream = false;
403 
404  /* Initialize LogstreamResult and buffers for processing messages */
 /* NOTE(review): the initialization statements (lines 405-407) are omitted from the listing. */
408 
409  /* Initialize the last recv timestamp */
410  last_recv_timestamp = GetCurrentTimestamp();
 /* ping_sent limits the reply request below to one per silent period; it is cleared whenever data arrives. */
411  ping_sent = false;
412 
413  /* Loop until end-of-streaming or error */
414  for (;;)
415  {
416  char *buf;
417  int len;
418  bool endofwal = false;
419  pgsocket wait_fd = PGINVALID_SOCKET;
420  int rc;
421 
422  /*
423  * Exit walreceiver if we're not in recovery. This should not
424  * happen, but cross-check the status here.
425  */
426  if (!RecoveryInProgress())
427  ereport(FATAL,
428  (errmsg("cannot continue WAL streaming, recovery has already ended")));
429 
430  /* Process any requests or signals received recently */
432 
 /* NOTE(review): the condition guarding this block (line 433) and two statements inside it (436-437) are omitted. */
434  {
435  ConfigReloadPending = false;
438  }
439 
440  /* See if we can read data immediately */
 /*
  * As handled below: len > 0 is a received message, len == 0 means
  * nothing more is available right now, and len < 0 means the primary
  * ended the stream.
  */
441  len = walrcv_receive(wrconn, &buf, &wait_fd);
442  if (len != 0)
443  {
444  /*
445  * Process the received data, and any subsequent data we
446  * can read without blocking.
447  */
448  for (;;)
449  {
450  if (len > 0)
451  {
452  /*
453  * Something was received from primary, so reset
454  * timeout
455  */
456  last_recv_timestamp = GetCurrentTimestamp();
457  ping_sent = false;
 /* The first byte is passed as the message type, the remaining len - 1 bytes as the payload. */
458  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
459  }
460  else if (len == 0)
461  break;
462  else if (len < 0)
463  {
464  ereport(LOG,
465  (errmsg("replication terminated by primary server"),
466  errdetail("End of WAL reached on timeline %u at %X/%X.",
467  startpointTLI,
469  endofwal = true;
470  break;
471  }
472  len = walrcv_receive(wrconn, &buf, &wait_fd);
473  }
474 
475  /* Let the primary know that we received some data. */
476  XLogWalRcvSendReply(false, false);
477 
478  /*
479  * If we've written some records, flush them to disk and
480  * let the startup process and primary server know about
481  * them.
482  */
483  XLogWalRcvFlush(false);
484  }
485 
486  /* Check if we need to exit the streaming loop. */
487  if (endofwal)
488  break;
489 
490  /*
491  * Ideally we would reuse a WaitEventSet object repeatedly
492  * here to avoid the overheads of WaitLatchOrSocket on epoll
493  * systems, but we can't be sure that libpq (or any other
494  * walreceiver implementation) has the same socket (even if
495  * the fd is the same number, it may have been closed and
496  * reopened since the last time). In future, if there is a
497  * function for removing sockets from WaitEventSet, then we
498  * could add and remove just the socket each time, potentially
499  * avoiding some system calls.
500  */
501  Assert(wait_fd != PGINVALID_SOCKET);
 /*
  * NOTE(review): most of the wait call that assigns rc (lines 502-504
  * and 506-507) is omitted; only its wait_fd argument is visible.
  */
505  wait_fd,
508  if (rc & WL_LATCH_SET)
509  {
512 
513  if (walrcv->force_reply)
514  {
515  /*
516  * The recovery process has asked us to send apply
517  * feedback now. Make sure the flag is really set to
518  * false in shared memory before sending the reply, so
519  * we don't miss a new request for a reply.
520  */
521  walrcv->force_reply = false;
523  XLogWalRcvSendReply(true, false);
524  }
525  }
526  if (rc & WL_TIMEOUT)
527  {
528  /*
529  * We didn't receive anything new. If we haven't heard
530  * anything from the server for more than
531  * wal_receiver_timeout / 2, ping the server. Also, if
532  * it's been longer than wal_receiver_status_interval
533  * since the last update we sent, send a status update to
534  * the primary anyway, to report any progress in applying
535  * WAL.
536  */
537  bool requestReply = false;
538 
539  /*
540  * Check if time since last receive from primary has
541  * reached the configured limit.
542  */
 /* The timeout checks are skipped entirely when wal_receiver_timeout is not positive. */
543  if (wal_receiver_timeout > 0)
544  {
546  TimestampTz timeout;
547 
 /* NOTE(review): the second argument of this call (line 550) is omitted from the listing. */
548  timeout =
549  TimestampTzPlusMilliseconds(last_recv_timestamp,
551 
552  if (now >= timeout)
553  ereport(ERROR,
554  (errmsg("terminating walreceiver due to timeout")));
555 
556  /*
557  * We didn't receive anything new, for half of
558  * receiver replication timeout. Ping the server.
559  */
560  if (!ping_sent)
561  {
562  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
563  (wal_receiver_timeout / 2));
564  if (now >= timeout)
565  {
566  requestReply = true;
567  ping_sent = true;
568  }
569  }
570  }
571 
 /* requestReply is passed for both parameters: a ping is sent as a forced reply that also requests a reply. */
572  XLogWalRcvSendReply(requestReply, requestReply);
574  }
575  }
576 
577  /*
578  * The backend finished streaming. Exit streaming COPY-mode from
579  * our side, too.
580  */
581  walrcv_endstreaming(wrconn, &primaryTLI);
582 
583  /*
584  * If the server had switched to a new timeline that we didn't
585  * know about when we began streaming, fetch its timeline history
586  * file now.
587  */
588  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
589  }
590  else
591  ereport(LOG,
592  (errmsg("primary server contains no more WAL on requested timeline %u",
593  startpointTLI)));
594 
595  /*
596  * End of WAL reached on the requested timeline. Close the last
597  * segment, and wait for new orders from the startup process.
598  */
599  if (recvFile >= 0)
600  {
601  char xlogfname[MAXFNAMELEN];
602 
603  XLogWalRcvFlush(false);
 /* NOTE(review): the statement that fills xlogfname (line 604) is omitted from the listing. */
605  if (close(recvFile) != 0)
606  ereport(PANIC,
608  errmsg("could not close log segment %s: %m",
609  xlogfname)));
610 
611  /*
612  * Create .done file forcibly to prevent the streamed segment from
613  * being archived later.
614  */
 /* NOTE(review): the if condition choosing between the two calls below (line 615) is omitted. */
616  XLogArchiveForceDone(xlogfname);
617  else
618  XLogArchiveNotify(xlogfname);
619  }
 /* Mark the segment file as closed. */
620  recvFile = -1;
621 
622  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
 /* The next outer-loop iteration restarts with the start position and timeline returned here. */
623  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
624  }
625  /* not reached */
626 }
int sender_port
Definition: walreceiver.h:119
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:712
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:405
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:417
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
uint32 TimeLineID
Definition: xlogdefs.h:59
slock_t mutex
Definition: walreceiver.h:145
#define WL_TIMEOUT
Definition: latch.h:128
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:121
#define SIGUSR1
Definition: win32_port.h:171
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
#define SIGCHLD
Definition: win32_port.h:169
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:632
sig_atomic_t force_reply
Definition: walreceiver.h:160
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:409
WalRcvState walRcvState
Definition: walreceiver.h:65
static StringInfoData incoming_message
Definition: walreceiver.c:119
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1102
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:419
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:146
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:415
void ConditionVariableBroadcast(ConditionVariable *cv)
void proc_exit(int code)
Definition: ipc.c:104
#define WL_SOCKET_READABLE
Definition: latch.h:126
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:803
#define SIGPIPE
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:172
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:8237
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:423
#define PANIC
Definition: elog.h:50
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:97
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:100
void ResetLatch(Latch *latch)
Definition: latch.c:660
TimestampTz lastMsgSendTime
Definition: walreceiver.h:99
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
int wal_receiver_timeout
Definition: walreceiver.c:90
Latch procLatch
Definition: proc.h:130
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:153
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:118
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:467
void pfree(void *pointer)
Definition: mcxt.c:1169
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:46
#define FATAL
Definition: elog.h:49
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11728
pid_t pid
Definition: walreceiver.h:64
#define MAXCONNINFO
Definition: walreceiver.h:37
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
static char * buf
Definition: pg_test_fsync.c:68
int XLogArchiveMode
Definition: xlog.c:97
int errdetail(const char *fmt,...)
Definition: elog.c:1042
int errcode_for_file_access(void)
Definition: elog.c:721
#define SIGHUP
Definition: win32_port.h:159
XLogRecPtr startpoint
Definition: walreceiver.h:170
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
int pgsocket
Definition: port.h:31
sigset_t UnBlockSig
Definition: pqsignal.c:22
TimeLineID receiveStartTLI
Definition: walreceiver.h:76
static TimeLineID recvFileTLI
Definition: walreceiver.c:105
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:425
Definition: guc.h:72
char * cluster_name
Definition: guc.c:582
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:104
Latch * latch
Definition: walreceiver.h:143
#define SIG_IGN
Definition: win32_port.h:156
struct WalRcvStreamOptions::@103::@104 physical
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static XLogSegNo recvSegNo
Definition: walreceiver.c:106
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:517
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:95
#define PGINVALID_SOCKET
Definition: port.h:33
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1033
bool ready_to_display
Definition: walreceiver.h:134
TimestampTz latestWalEndTime
Definition: walreceiver.h:106
TimeLineID ThisTimeLineID
Definition: xlog.c:196
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:157
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:145
#define SIG_DFL
Definition: win32_port.h:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define SIGALRM
Definition: win32_port.h:165
bool is_temp_slot
Definition: walreceiver.h:131
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:975
WalRcvData * WalRcv
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4965
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:118
int errmsg(const char *fmt,...)
Definition: elog.c:909
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:768
#define elog(elevel,...)
Definition: elog.h:232
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:407
struct Latch * MyLatch
Definition: globals.c:57
union WalRcvStreamOptions::@103 proto
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:75
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:642
static int recvFile
Definition: walreceiver.c:104
#define snprintf
Definition: port.h:216
#define WL_LATCH_SET
Definition: latch.h:125
#define UINT64_FORMAT
Definition: c.h:484
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
char slotname[NAMEDATALEN]
Definition: walreceiver.h:125
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:66
static struct @17 LogstreamResult
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:112
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:401

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 975 of file walreceiver.c.

References AllowCascadeReplication, WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, LSN_FORMAT_ARGS, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

976 {
 /*
  * Bring LogstreamResult.Flush up to LogstreamResult.Write when it lags
  * behind, publish the new flush position in shared memory, and wake up
  * the startup process and walsenders.
  *
  * dying: when true, the reply and hot standby feedback to the primary
  * at the end are skipped.
  *
  * Does nothing when Flush is already at or beyond Write.
  *
  * NOTE(review): lines 981, 997, 1001 and 1006 are omitted from this
  * listing. The reference list names issue_xlog_fsync, so line 981
  * presumably performs the actual fsync; confirm against the full source.
  */
977  if (LogstreamResult.Flush < LogstreamResult.Write)
978  {
979  WalRcvData *walrcv = WalRcv;
980 
982 
983  LogstreamResult.Flush = LogstreamResult.Write;
984 
985  /* Update shared-memory status */
986  SpinLockAcquire(&walrcv->mutex);
 /* The shared position only moves forward; the previous flush position becomes latestChunkStart. */
987  if (walrcv->flushedUpto < LogstreamResult.Flush)
988  {
989  walrcv->latestChunkStart = walrcv->flushedUpto;
990  walrcv->flushedUpto = LogstreamResult.Flush;
991  walrcv->receivedTLI = ThisTimeLineID;
992  }
993  SpinLockRelease(&walrcv->mutex);
994 
995  /* Signal the startup process and walsender that new WAL has arrived */
996  WakeupRecovery();
 /* NOTE(review): line 997 is omitted; it may be a condition guarding the walsender wakeup. */
998  WalSndWakeup();
999 
1000  /* Report XLOG streaming progress in PS display */
 /* NOTE(review): the guarding condition (line 1001) and the format arguments (line 1006) are omitted. */
1002  {
1003  char activitymsg[50];
1004 
1005  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1007  set_ps_display(activitymsg);
1008  }
1009 
1010  /* Also let the primary know that we made some progress */
1011  if (!dying)
1012  {
 /* Both calls are non-forced. */
1013  XLogWalRcvSendReply(false, false);
1014  XLogWalRcvSendHSFeedback(false);
1015  }
1016  }
1017 }
slock_t mutex
Definition: walreceiver.h:145
bool update_process_title
Definition: ps_status.c:36
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10629
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1102
TimeLineID receivedTLI
Definition: walreceiver.h:86
void set_ps_display(const char *activity)
Definition: ps_status.c:349
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void WakeupRecovery(void)
Definition: xlog.c:12935
XLogRecPtr latestChunkStart
Definition: walreceiver.h:94
#define AllowCascadeReplication()
Definition: walreceiver.h:40
static XLogSegNo recvSegNo
Definition: walreceiver.c:106
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1033
TimeLineID ThisTimeLineID
Definition: xlog.c:196
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:104
XLogRecPtr flushedUpto
Definition: walreceiver.h:85
#define snprintf
Definition: port.h:216
void WalSndWakeup(void)
Definition: walsender.c:3101
static struct @17 LogstreamResult

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 803 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

804 {
 /*
  * Dispatch one message received from the primary, based on its type byte.
  *
  * type: message type; w (WAL data) and k (keepalive) are accepted.
  * buf:  message payload following the type byte.
  * len:  number of payload bytes in buf.
  *
  * Any other type, or a payload whose length does not fit the expected
  * header, raises ERROR with ERRCODE_PROTOCOL_VIOLATION.
  *
  * NOTE(review): lines 811, 823 and 844 are omitted from this listing. The
  * reference list names resetStringInfo and appendBinaryStringInfo, so they
  * presumably reset incoming_message and copy the header into it before
  * the fields are read; confirm against the full source.
  */
805  int hdrlen;
806  XLogRecPtr dataStart;
807  XLogRecPtr walEnd;
808  TimestampTz sendTime;
809  bool replyRequested;
810 
812 
813  switch (type)
814  {
815  case 'w': /* WAL records */
816  {
817  /* copy message to StringInfo */
 /* Header is three int64 fields: dataStart, walEnd, sendTime. WAL data may follow, so only a minimum length is enforced. */
818  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
819  if (len < hdrlen)
820  ereport(ERROR,
821  (errcode(ERRCODE_PROTOCOL_VIOLATION),
822  errmsg_internal("invalid WAL message received from primary")));
824 
825  /* read the fields */
826  dataStart = pq_getmsgint64(&incoming_message);
827  walEnd = pq_getmsgint64(&incoming_message);
828  sendTime = pq_getmsgint64(&incoming_message);
829  ProcessWalSndrMessage(walEnd, sendTime);
830 
 /* Skip past the header so that only the WAL data is written, starting at dataStart. */
831  buf += hdrlen;
832  len -= hdrlen;
833  XLogWalRcvWrite(buf, len, dataStart);
834  break;
835  }
836  case 'k': /* Keepalive */
837  {
838  /* copy message to StringInfo */
 /* A keepalive is exactly two int64 fields plus one byte, so the length must match exactly. */
839  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
840  if (len != hdrlen)
841  ereport(ERROR,
842  (errcode(ERRCODE_PROTOCOL_VIOLATION),
843  errmsg_internal("invalid keepalive message received from primary")));
845 
846  /* read the fields */
847  walEnd = pq_getmsgint64(&incoming_message);
848  sendTime = pq_getmsgint64(&incoming_message);
849  replyRequested = pq_getmsgbyte(&incoming_message);
850 
851  ProcessWalSndrMessage(walEnd, sendTime);
852 
853  /* If the primary requested a reply, send one immediately */
854  if (replyRequested)
855  XLogWalRcvSendReply(true, false);
856  break;
857  }
858  default:
859  ereport(ERROR,
860  (errcode(ERRCODE_PROTOCOL_VIOLATION),
861  errmsg_internal("invalid replication message type %d",
862  type)));
863  }
864 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1203
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:119
int errcode(int sqlerrcode)
Definition: elog.c:698
#define ERROR
Definition: elog.h:46
static char * buf
Definition: pg_test_fsync.c:68
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1033
#define ereport(elevel,...)
Definition: elog.h:157
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:870

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1102 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReadNextFullTransactionId(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, wal_receiver_status_interval, walrcv_send, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

1103 {
 /*
  * Build and send a hot standby feedback message (type byte h) carrying
  * xmin and catalog_xmin together with their epochs.
  *
  * immed: when true, the once-per-interval rate limit below is bypassed
  * and sendTime is left unchanged.
  *
  * Returns early without sending when the (partly omitted) initial check
  * fails, when the rate limit has not expired, or when HotStandbyActive()
  * is false.
  *
  * NOTE(review): lines 1120, 1133, 1155, 1182, 1184 and 1189 are omitted
  * from this listing, including the first half of the initial condition,
  * the condition guarding GetReplicationHorizons, and presumably the
  * actual send call (walrcv_send is in the reference list); confirm
  * against the full source.
  */
1104  TimestampTz now;
1105  FullTransactionId nextFullXid;
1106  TransactionId nextXid;
1107  uint32 xmin_epoch,
1108  catalog_xmin_epoch;
1109  TransactionId xmin,
1110  catalog_xmin;
 /* Static: keeps the time of the last rate-limited send across calls. */
1111  static TimestampTz sendTime = 0;
1112 
1113  /* initially true so we always send at least one feedback message */
1114  static bool primary_has_standby_xmin = true;
1115 
1116  /*
1117  * If the user doesn't want status to be reported to the primary, be sure
1118  * to exit before doing anything at all.
1119  */
1121  !primary_has_standby_xmin)
1122  return;
1123 
1124  /* Get current timestamp. */
1125  now = GetCurrentTimestamp();
1126 
1127  if (!immed)
1128  {
1129  /*
1130  * Send feedback at most once per wal_receiver_status_interval.
1131  */
1132  if (!TimestampDifferenceExceeds(sendTime, now,
1134  return;
1135  sendTime = now;
1136  }
1137 
1138  /*
1139  * If Hot Standby is not yet accepting connections there is nothing to
1140  * send. Check this after the interval has expired to reduce number of
1141  * calls.
1142  *
1143  * Bailing out here also ensures that we don't send feedback until we've
1144  * read our own replication slot state, so we don't tell the primary to
1145  * discard needed xmin or catalog_xmin from any slots that may exist on
1146  * this replica.
1147  */
1148  if (!HotStandbyActive())
1149  return;
1150 
1151  /*
1152  * Make the expensive call to get the oldest xmin once we are certain
1153  * everything else has been checked.
1154  */
1156  {
1157  GetReplicationHorizons(&xmin, &catalog_xmin);
1158  }
1159  else
1160  {
 /* Invalid values are sent in this branch; they also clear primary_has_standby_xmin at the end. */
1161  xmin = InvalidTransactionId;
1162  catalog_xmin = InvalidTransactionId;
1163  }
1164 
1165  /*
1166  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1167  * the epoch boundary.
1168  */
1169  nextFullXid = ReadNextFullTransactionId();
1170  nextXid = XidFromFullTransactionId(nextFullXid);
1171  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1172  catalog_xmin_epoch = xmin_epoch;
 /* A horizon numerically above nextXid is assigned the previous epoch. */
1173  if (nextXid < xmin)
1174  xmin_epoch--;
1175  if (nextXid < catalog_xmin)
1176  catalog_xmin_epoch--;
1177 
1178  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1179  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1180 
1181  /* Construct the message and send it. */
1183  pq_sendbyte(&reply_message, 'h');
1185  pq_sendint32(&reply_message, xmin);
1186  pq_sendint32(&reply_message, xmin_epoch);
1187  pq_sendint32(&reply_message, catalog_xmin);
1188  pq_sendint32(&reply_message, catalog_xmin_epoch);
 /* Record whether a valid horizon was just sent; the check at the top of the function reads this flag. */
1190  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1191  primary_has_standby_xmin = true;
1192  else
1193  primary_has_standby_xmin = false;
1194 }
bool hot_standby_feedback
Definition: walreceiver.c:91
uint32 TransactionId
Definition: c.h:587
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
bool HotStandbyActive(void)
Definition: xlog.c:8311
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:89
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static StringInfoData reply_message
Definition: walreceiver.c:118
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:441
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:1981
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:421
#define elog(elevel,...)
Definition: elog.h:232
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1033 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1034 {
1035  static XLogRecPtr writePtr = 0;
1036  static XLogRecPtr flushPtr = 0;
1037  XLogRecPtr applyPtr;
1038  static TimestampTz sendTime = 0;
1039  TimestampTz now;
1040 
1041  /*
1042  * If the user doesn't want status to be reported to the primary, be sure
1043  * to exit before doing anything at all.
1044  */
1045  if (!force && wal_receiver_status_interval <= 0)
1046  return;
1047 
1048  /* Get current timestamp. */
1049  now = GetCurrentTimestamp();
1050 
1051  /*
1052  * We can compare the write and flush positions to the last message we
1053  * sent without taking any lock, but the apply position requires a spin
1054  * lock, so we don't check that unless something else has changed or 10
1055  * seconds have passed. This means that the apply WAL location will
1056  * appear, from the primary's point of view, to lag slightly, but since
1057  * this is only for reporting purposes and only on idle systems, that's
1058  * probably OK.
1059  */
1060  if (!force
1061  && writePtr == LogstreamResult.Write
1062  && flushPtr == LogstreamResult.Flush
1063  && !TimestampDifferenceExceeds(sendTime, now,
1065  return;
1066  sendTime = now;
1067 
1068  /* Construct a new message */
1069  writePtr = LogstreamResult.Write;
1070  flushPtr = LogstreamResult.Flush;
1071  applyPtr = GetXLogReplayRecPtr(NULL);
1072 
1074  pq_sendbyte(&reply_message, 'r');
1075  pq_sendint64(&reply_message, writePtr);
1076  pq_sendint64(&reply_message, flushPtr);
1077  pq_sendint64(&reply_message, applyPtr);
1079  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1080 
1081  /* Send it */
1082  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1083  LSN_FORMAT_ARGS(writePtr),
1084  LSN_FORMAT_ARGS(flushPtr),
1085  LSN_FORMAT_ARGS(applyPtr),
1086  requestReply ? " (reply requested)" : "");
1087 
1089 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:89
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:118
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11728
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:421
#define elog(elevel,...)
Definition: elog.h:232
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
static struct @17 LogstreamResult

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 870 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite(), recvFile, recvFileTLI, recvSegNo, ThisTimeLineID, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogSegmentOffset, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

871 {
872  int startoff;
873  int byteswritten;
874 
875  while (nbytes > 0)
876  {
877  int segbytes;
878 
879  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
880  {
881  bool use_existent;
882 
883  /*
884  * fsync() and close current file before we switch to next one. We
885  * would otherwise have to reopen this file to fsync it later
886  */
887  if (recvFile >= 0)
888  {
889  char xlogfname[MAXFNAMELEN];
890 
891  XLogWalRcvFlush(false);
892 
894 
895  /*
896  * XLOG segment files will be re-read by recovery in startup
897  * process soon, so we don't advise the OS to release cache
898  * pages associated with the file like XLogFileClose() does.
899  */
900  if (close(recvFile) != 0)
901  ereport(PANIC,
903  errmsg("could not close log segment %s: %m",
904  xlogfname)));
905 
906  /*
907  * Create .done file forcibly to prevent the streamed segment
908  * from being archived later.
909  */
911  XLogArchiveForceDone(xlogfname);
912  else
913  XLogArchiveNotify(xlogfname);
914  }
915  recvFile = -1;
916 
917  /* Create/use new log file */
919  use_existent = true;
920  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
922  }
923 
924  /* Calculate the start offset of the received logs */
925  startoff = XLogSegmentOffset(recptr, wal_segment_size);
926 
927  if (startoff + nbytes > wal_segment_size)
928  segbytes = wal_segment_size - startoff;
929  else
930  segbytes = nbytes;
931 
932  /* OK to write the logs */
933  errno = 0;
934 
935  byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
936  if (byteswritten <= 0)
937  {
938  char xlogfname[MAXFNAMELEN];
939  int save_errno;
940 
941  /* if write didn't set errno, assume no disk space */
942  if (errno == 0)
943  errno = ENOSPC;
944 
945  save_errno = errno;
947  errno = save_errno;
948  ereport(PANIC,
950  errmsg("could not write to log segment %s "
951  "at offset %u, length %lu: %m",
952  xlogfname, startoff, (unsigned long) segbytes)));
953  }
954 
955  /* Update state for write */
956  recptr += byteswritten;
957 
958  nbytes -= byteswritten;
959  buf += byteswritten;
960 
961  LogstreamResult.Write = recptr;
962  }
963 
964  /* Update shared-memory status */
966 }
int wal_segment_size
Definition: xlog.c:121
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3288
#define PANIC
Definition: elog.h:50
ssize_t pg_pwrite(int fd, const void *buf, size_t nbyte, off_t offset)
Definition: pwrite.c:27
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:153
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:467
static char * buf
Definition: pg_test_fsync.c:68
int XLogArchiveMode
Definition: xlog.c:97
int errcode_for_file_access(void)
Definition: elog.c:721
static TimeLineID recvFileTLI
Definition: walreceiver.c:105
static XLogSegNo recvSegNo
Definition: walreceiver.c:106
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:517
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
TimeLineID ThisTimeLineID
Definition: xlog.c:196
#define ereport(elevel,...)
Definition: elog.h:157
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:975
WalRcvData * WalRcv
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:104
static struct @17 LogstreamResult
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 115 of file walreceiver.c.

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 91 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 119 of file walreceiver.c.

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 105 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 106 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

Definition at line 118 of file walreceiver.c.

Referenced by send_feedback().

◆ wal_receiver_status_interval

int wal_receiver_status_interval

Definition at line 89 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ wal_receiver_timeout

int wal_receiver_timeout

Definition at line 90 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 95 of file walreceiver.c.

Referenced by _PG_init().

◆ wrconn

WalReceiverConn* wrconn = NULL
static

Definition at line 94 of file walreceiver.c.

◆ Write

XLogRecPtr Write

Definition at line 114 of file walreceiver.c.