PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
} LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 97 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1303 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MAXCONNINFO, MemSet, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, NI_MAXHOST, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

1304 {
1305  TupleDesc tupdesc;
1306  Datum *values;
1307  bool *nulls;
1308  int pid;
1309  bool ready_to_display;
1310  WalRcvState state;
1311  XLogRecPtr receive_start_lsn;
1312  TimeLineID receive_start_tli;
1313  XLogRecPtr written_lsn;
1314  XLogRecPtr flushed_lsn;
1315  TimeLineID received_tli;
1316  TimestampTz last_send_time;
1317  TimestampTz last_receipt_time;
1318  XLogRecPtr latest_end_lsn;
1319  TimestampTz latest_end_time;
1320  char sender_host[NI_MAXHOST];
1321  int sender_port = 0;
1322  char slotname[NAMEDATALEN];
1323  char conninfo[MAXCONNINFO];
1324 
1325  /* Take a lock to ensure value consistency */
1326  SpinLockAcquire(&WalRcv->mutex);
1327  pid = (int) WalRcv->pid;
1328  ready_to_display = WalRcv->ready_to_display;
1329  state = WalRcv->walRcvState;
1330  receive_start_lsn = WalRcv->receiveStart;
1331  receive_start_tli = WalRcv->receiveStartTLI;
1332  flushed_lsn = WalRcv->flushedUpto;
1333  received_tli = WalRcv->receivedTLI;
1334  last_send_time = WalRcv->lastMsgSendTime;
1335  last_receipt_time = WalRcv->lastMsgReceiptTime;
1336  latest_end_lsn = WalRcv->latestWalEnd;
1337  latest_end_time = WalRcv->latestWalEndTime;
1338  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1339  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1340  sender_port = WalRcv->sender_port;
1341  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1342  SpinLockRelease(&WalRcv->mutex);
1343 
1344  /*
1345  * No WAL receiver (or not ready yet), just return a tuple with NULL
1346  * values
1347  */
1348  if (pid == 0 || !ready_to_display)
1349  PG_RETURN_NULL();
1350 
1351  /*
1352  * Read "writtenUpto" without holding a spinlock. Note that it may not be
1353  * consistent with the other shared variables of the WAL receiver
1354  * protected by a spinlock, but this should not be used for data integrity
1355  * checks.
1356  */
1357  written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1358 
1359  /* determine result type */
1360  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1361  elog(ERROR, "return type must be a row type");
1362 
1363  values = palloc0(sizeof(Datum) * tupdesc->natts);
1364  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1365 
1366  /* Fetch values */
1367  values[0] = Int32GetDatum(pid);
1368 
1369  if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1370  {
1371  /*
1372  * Only superusers and members of pg_read_all_stats can see details.
1373  * Other users only get the pid value to know whether it is a WAL
1374  * receiver, but no details.
1375  */
1376  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1377  }
1378  else
1379  {
1380  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1381 
1382  if (XLogRecPtrIsInvalid(receive_start_lsn))
1383  nulls[2] = true;
1384  else
1385  values[2] = LSNGetDatum(receive_start_lsn);
1386  values[3] = Int32GetDatum(receive_start_tli);
1387  if (XLogRecPtrIsInvalid(written_lsn))
1388  nulls[4] = true;
1389  else
1390  values[4] = LSNGetDatum(written_lsn);
1391  if (XLogRecPtrIsInvalid(flushed_lsn))
1392  nulls[5] = true;
1393  else
1394  values[5] = LSNGetDatum(flushed_lsn);
1395  values[6] = Int32GetDatum(received_tli);
1396  if (last_send_time == 0)
1397  nulls[7] = true;
1398  else
1399  values[7] = TimestampTzGetDatum(last_send_time);
1400  if (last_receipt_time == 0)
1401  nulls[8] = true;
1402  else
1403  values[8] = TimestampTzGetDatum(last_receipt_time);
1404  if (XLogRecPtrIsInvalid(latest_end_lsn))
1405  nulls[9] = true;
1406  else
1407  values[9] = LSNGetDatum(latest_end_lsn);
1408  if (latest_end_time == 0)
1409  nulls[10] = true;
1410  else
1411  values[10] = TimestampTzGetDatum(latest_end_time);
1412  if (*slotname == '\0')
1413  nulls[11] = true;
1414  else
1415  values[11] = CStringGetTextDatum(slotname);
1416  if (*sender_host == '\0')
1417  nulls[12] = true;
1418  else
1419  values[12] = CStringGetTextDatum(sender_host);
1420  if (sender_port == 0)
1421  nulls[13] = true;
1422  else
1423  values[13] = Int32GetDatum(sender_port);
1424  if (*conninfo == '\0')
1425  nulls[14] = true;
1426  else
1427  values[14] = CStringGetTextDatum(conninfo);
1428  }
1429 
1430  /* Returns the record as Datum */
1431  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1432 }
int sender_port
Definition: walreceiver.h:119
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1278
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
uint32 TimeLineID
Definition: xlogdefs.h:59
slock_t mutex
Definition: walreceiver.h:145
Oid GetUserId(void)
Definition: miscinit.c:478
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:65
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:86
#define MemSet(start, val, len)
Definition: c.h:1008
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:100
TimestampTz lastMsgSendTime
Definition: walreceiver.h:99
WalRcvState
Definition: walreceiver.h:45
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:153
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:46
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:64
#define MAXCONNINFO
Definition: walreceiver.h:37
TimeLineID receiveStartTLI
Definition: walreceiver.h:76
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:1093
uintptr_t Datum
Definition: postgres.h:411
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
bool ready_to_display
Definition: walreceiver.h:134
TimestampTz latestWalEndTime
Definition: walreceiver.h:106
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4869
XLogRecPtr latestWalEnd
Definition: walreceiver.h:105
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:317
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:166
#define Int32GetDatum(X)
Definition: postgres.h:523
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:118
#define elog(elevel,...)
Definition: elog.h:232
#define CStringGetTextDatum(s)
Definition: builtins.h:82
XLogRecPtr receiveStart
Definition: walreceiver.h:75
XLogRecPtr flushedUpto
Definition: walreceiver.h:85
#define PG_RETURN_NULL()
Definition: fmgr.h:345
char slotname[NAMEDATALEN]
Definition: walreceiver.h:125
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:112

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 146 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and ShutdownRequestPending.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), walrcv_clear_result(), WalRcvWaitForStartPosition(), and WalReceiverMain().

147 {
148  /*
149  * Although walreceiver interrupt handling doesn't use the same scheme as
150  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
151  * any incoming signals on Win32, and also to make sure we process any
152  * barrier events.
153  */
154  CHECK_FOR_INTERRUPTS();
155 
156  if (ShutdownRequestPending)
157  {
158  ereport(FATAL,
159  (errcode(ERRCODE_ADMIN_SHUTDOWN),
160  errmsg("terminating walreceiver process due to administrator command")));
161  }
162 }
int errcode(int sqlerrcode)
Definition: elog.c:698
#define FATAL
Definition: elog.h:49
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:27
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1207 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, message_level_is_interesting(), WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1208 {
1209  WalRcvData *walrcv = WalRcv;
1210 
1211  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1212 
1213  /* Update shared-memory status */
1214  SpinLockAcquire(&walrcv->mutex);
1215  if (walrcv->latestWalEnd < walEnd)
1216  walrcv->latestWalEndTime = sendTime;
1217  walrcv->latestWalEnd = walEnd;
1218  walrcv->lastMsgSendTime = sendTime;
1219  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1220  SpinLockRelease(&walrcv->mutex);
1221 
1222  if (message_level_is_interesting(DEBUG2))
1223  {
1224  char *sendtime;
1225  char *receipttime;
1226  int applyDelay;
1227 
1228  /* Copy because timestamptz_to_str returns a static buffer */
1229  sendtime = pstrdup(timestamptz_to_str(sendTime));
1230  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1231  applyDelay = GetReplicationApplyDelay();
1232 
1233  /* apply delay is not available */
1234  if (applyDelay == -1)
1235  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1236  sendtime,
1237  receipttime,
1238  GetReplicationTransferLatency());
1239  else
1240  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1241  sendtime,
1242  receipttime,
1243  applyDelay,
1244  GetReplicationTransferLatency());
1245 
1246  pfree(sendtime);
1247  pfree(receipttime);
1248  }
1249 }
slock_t mutex
Definition: walreceiver.h:145
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:100
TimestampTz lastMsgSendTime
Definition: walreceiver.h:99
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1169
#define DEBUG2
Definition: elog.h:24
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:106
XLogRecPtr latestWalEnd
Definition: walreceiver.h:105
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:232
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 775 of file walreceiver.c.

References Assert, ConditionVariableBroadcast(), WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

776 {
777  WalRcvData *walrcv = WalRcv;
778 
779  /* Ensure that all WAL records received are flushed to disk */
780  XLogWalRcvFlush(true);
781 
782  /* Mark ourselves inactive in shared memory */
783  SpinLockAcquire(&walrcv->mutex);
784  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
785  walrcv->walRcvState == WALRCV_RESTARTING ||
786  walrcv->walRcvState == WALRCV_STARTING ||
787  walrcv->walRcvState == WALRCV_WAITING ||
788  walrcv->walRcvState == WALRCV_STOPPING);
789  Assert(walrcv->pid == MyProcPid);
790  walrcv->walRcvState = WALRCV_STOPPED;
791  walrcv->pid = 0;
792  walrcv->ready_to_display = false;
793  walrcv->latch = NULL;
794  SpinLockRelease(&walrcv->mutex);
795 
796  ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
797 
798  /* Terminate the connection gracefully. */
799  if (wrconn != NULL)
800  walrcv_disconnect(wrconn);
801 
802  /* Wake up the startup process to notice promptly that we're gone */
803  WakeupRecovery();
804 }
int MyProcPid
Definition: globals.c:43
slock_t mutex
Definition: walreceiver.h:145
WalRcvState walRcvState
Definition: walreceiver.h:65
void ConditionVariableBroadcast(ConditionVariable *cv)
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:13000
pid_t pid
Definition: walreceiver.h:64
Latch * latch
Definition: walreceiver.h:143
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:134
#define Assert(condition)
Definition: c.h:804
#define walrcv_disconnect(conn)
Definition: walreceiver.h:432
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:979
WalRcvData * WalRcv
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:66

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 719 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

720 {
721  TimeLineID tli;
722 
723  for (tli = first; tli <= last; tli++)
724  {
725  /* there's no history file for timeline 1 */
726  if (tli != 1 && !existsTimeLineHistory(tli))
727  {
728  char *fname;
729  char *content;
730  int len;
731  char expectedfname[MAXFNAMELEN];
732 
733  ereport(LOG,
734  (errmsg("fetching timeline history file for timeline %u from primary server",
735  tli)));
736 
737  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
738 
739  /*
740  * Check that the filename on the primary matches what we
741  * calculated ourselves. This is just a sanity check, it should
742  * always match.
743  */
744  TLHistoryFileName(expectedfname, tli);
745  if (strcmp(fname, expectedfname) != 0)
746  ereport(ERROR,
747  (errcode(ERRCODE_PROTOCOL_VIOLATION),
748  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
749  tli)));
750 
751  /*
752  * Write the file to pg_wal.
753  */
754  writeTimeLineHistoryFile(tli, content, len);
755 
756  /*
757  * Mark the streamed history file as ready for archiving if
758  * archive_mode is always.
759  */
760  if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
761  XLogArchiveForceDone(fname);
762  else
763  XLogArchiveNotify(fname);
764 
765  pfree(fname);
766  pfree(content);
767  }
768  }
769 }
uint32 TimeLineID
Definition: xlogdefs.h:59
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:467
void pfree(void *pointer)
Definition: mcxt.c:1169
#define ERROR
Definition: elog.h:46
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:467
int XLogArchiveMode
Definition: xlog.c:96
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:517
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:416
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1260 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

1261 {
1262  Latch *latch;
1263 
1264  WalRcv->force_reply = true;
1265  /* fetching the latch pointer might not be atomic, so use spinlock */
1266  SpinLockAcquire(&WalRcv->mutex);
1267  latch = WalRcv->latch;
1268  SpinLockRelease(&WalRcv->mutex);
1269  if (latch)
1270  SetLatch(latch);
1271 }
slock_t mutex
Definition: walreceiver.h:145
sig_atomic_t force_reply
Definition: walreceiver.h:160
void SetLatch(Latch *latch)
Definition: latch.c:567
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: latch.h:110
Latch * latch
Definition: walreceiver.h:143
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1278 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1279 {
1280  switch (state)
1281  {
1282  case WALRCV_STOPPED:
1283  return "stopped";
1284  case WALRCV_STARTING:
1285  return "starting";
1286  case WALRCV_STREAMING:
1287  return "streaming";
1288  case WALRCV_WAITING:
1289  return "waiting";
1290  case WALRCV_RESTARTING:
1291  return "restarting";
1292  case WALRCV_STOPPING:
1293  return "stopping";
1294  }
1295  return "UNKNOWN";
1296 }
Definition: regguts.h:317

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 639 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, LSN_FORMAT_ARGS, WalRcvData::mutex, MyLatch, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

640 {
641  WalRcvData *walrcv = WalRcv;
642  int state;
643 
644  SpinLockAcquire(&walrcv->mutex);
645  state = walrcv->walRcvState;
646  if (state != WALRCV_STREAMING)
647  {
648  SpinLockRelease(&walrcv->mutex);
649  if (state == WALRCV_STOPPING)
650  proc_exit(0);
651  else
652  elog(FATAL, "unexpected walreceiver state");
653  }
654  walrcv->walRcvState = WALRCV_WAITING;
655  walrcv->receiveStart = InvalidXLogRecPtr;
656  walrcv->receiveStartTLI = 0;
657  SpinLockRelease(&walrcv->mutex);
658 
659  set_ps_display("idle");
660 
661  /*
662  * nudge startup process to notice that we've stopped streaming and are
663  * now waiting for instructions.
664  */
665  WakeupRecovery();
666  for (;;)
667  {
668  ResetLatch(MyLatch);
669 
670  ProcessWalRcvInterrupts();
671 
672  SpinLockAcquire(&walrcv->mutex);
673  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
674  walrcv->walRcvState == WALRCV_WAITING ||
675  walrcv->walRcvState == WALRCV_STOPPING);
676  if (walrcv->walRcvState == WALRCV_RESTARTING)
677  {
678  /*
679  * No need to handle changes in primary_conninfo or
680  * primary_slotname here. Startup process will signal us to
681  * terminate in case those change.
682  */
683  *startpoint = walrcv->receiveStart;
684  *startpointTLI = walrcv->receiveStartTLI;
685  walrcv->walRcvState = WALRCV_STREAMING;
686  SpinLockRelease(&walrcv->mutex);
687  break;
688  }
689  if (walrcv->walRcvState == WALRCV_STOPPING)
690  {
691  /*
692  * We should've received SIGTERM if the startup process wants us
693  * to die, but might as well check it here too.
694  */
695  SpinLockRelease(&walrcv->mutex);
696  exit(1);
697  }
698  SpinLockRelease(&walrcv->mutex);
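699 
700  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
701  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
702  }
703 
704  if (update_process_title)
705  {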
706  char activitymsg[50];
707 
708  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
709  LSN_FORMAT_ARGS(*startpoint));
710  set_ps_display(activitymsg);
711  }
712 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:145
bool update_process_title
Definition: ps_status.c:36
WalRcvState walRcvState
Definition: walreceiver.h:65
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:146
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:660
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:452
void set_ps_display(const char *activity)
Definition: ps_status.c:349
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void WakeupRecovery(void)
Definition: xlog.c:13000
#define FATAL
Definition: elog.h:49
TimeLineID receiveStartTLI
Definition: walreceiver.h:76
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:804
Definition: regguts.h:317
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:232
struct Latch * MyLatch
Definition: globals.c:57
XLogRecPtr receiveStart
Definition: walreceiver.h:75
#define snprintf
Definition: port.h:216
#define WL_LATCH_SET
Definition: latch.h:125
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 167 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, buf, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, WalRcvData::conninfo, DEBUG1, elog, ereport, errcode(), errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyLatch, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, NI_MAXHOST, now(), on_shmem_exit(), options, PANIC, pfree(), pg_atomic_write_u64(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, procsignal_sigusr1_handler(), WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf, SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), 
WalRcvFetchTimeLineHistoryFiles(), WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, WalRcvWaitForStartPosition(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain(), and walrcv_clear_result().

/*
 * WalReceiverMain -- body of the walreceiver process.
 *
 * As shown in this listing, the function:
 *   1. Under walrcv->mutex, checks the shared walRcvState: exits with
 *      proc_exit(1) when a stop was already requested (STOPPING/STOPPED),
 *      PANICs on any state other than STARTING, and otherwise advertises
 *      its PID, sets the state to WALRCV_STREAMING and copies conninfo,
 *      slot name, temp-slot flag and start position/timeline into locals.
 *   2. Installs signal handlers, loads "libpqwalreceiver" and connects to
 *      the primary with walrcv_connect() (ERROR if that fails).
 *   3. Replaces the shared conninfo with the user-visible string and
 *      publishes the sender host/port.
 *   4. Loops forever: identify the primary, verify system identifier and
 *      timeline, fetch timeline history files, optionally create a
 *      temporary slot, start streaming, run the receive loop until
 *      end-of-WAL, then close the current segment and call
 *      WalRcvWaitForStartPosition() for the next start point.
 *
 * The outer loop has no exit; the function leaves only through
 * proc_exit() or an ereport/elog at ERROR or above.
 *
 * NOTE(review): the embedded source line numbers in this listing are not
 * contiguous (e.g. 178 is followed by 180, 210 by 212, 329 by 331), so
 * some statements of the original file are not shown here.  Do not treat
 * this listing as complete or compilable.
 */
168 {
169  char conninfo[MAXCONNINFO];
170  char *tmp_conninfo;
171  char slotname[NAMEDATALEN];
172  bool is_temp_slot;
173  XLogRecPtr startpoint;
174  TimeLineID startpointTLI;
175  TimeLineID primaryTLI;
176  bool first_stream;
177  WalRcvData *walrcv = WalRcv;
178  TimestampTz last_recv_timestamp;
 /* NOTE(review): source line 179 is not shown; `now` is assigned at line 191 with no visible declaration -- presumably declared here, confirm against the file. */
180  bool ping_sent;
181  char *err;
182  char *sender_host = NULL;
183  int sender_port = 0;
184 
185  /*
186  * WalRcv should be set up already (if we are a backend, we inherit this
187  * by fork() or EXEC_BACKEND mechanism from the postmaster).
188  */
189  Assert(walrcv != NULL);
190 
191  now = GetCurrentTimestamp();
192 
193  /*
194  * Mark walreceiver as running in shared memory.
195  *
196  * Do this as early as possible, so that if we fail later on, we'll set
197  * state to STOPPED. If we die before this, the startup process will keep
198  * waiting for us to start up, until it times out.
199  */
200  SpinLockAcquire(&walrcv->mutex);
201  Assert(walrcv->pid == 0);
202  switch (walrcv->walRcvState)
203  {
204  case WALRCV_STOPPING:
205  /* If we've already been requested to stop, don't start up. */
206  walrcv->walRcvState = WALRCV_STOPPED;
207  /* fall through */
208 
209  case WALRCV_STOPPED:
210  SpinLockRelease(&walrcv->mutex);
212  proc_exit(1);
213  break;
214 
215  case WALRCV_STARTING:
216  /* The usual case */
217  break;
218 
219  case WALRCV_WAITING:
220  case WALRCV_STREAMING:
221  case WALRCV_RESTARTING:
222  default:
223  /* Shouldn't happen */
224  SpinLockRelease(&walrcv->mutex);
225  elog(PANIC, "walreceiver still running according to shared memory state");
226  }
227  /* Advertise our PID so that the startup process can kill us */
228  walrcv->pid = MyProcPid;
229  walrcv->walRcvState = WALRCV_STREAMING;
230 
231  /* Fetch information required to start streaming */
232  walrcv->ready_to_display = false;
233  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
234  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
235  is_temp_slot = walrcv->is_temp_slot;
236  startpoint = walrcv->receiveStart;
237  startpointTLI = walrcv->receiveStartTLI;
238 
239  /*
240  * At most one of is_temp_slot and slotname can be set; otherwise,
241  * RequestXLogStreaming messed up.
242  */
243  Assert(!is_temp_slot || (slotname[0] == '\0'));
244 
245  /* Initialise to a sanish value */
246  walrcv->lastMsgSendTime =
247  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
248 
249  /* Report the latch to use to awaken this process */
250  walrcv->latch = &MyProc->procLatch;
251 
252  SpinLockRelease(&walrcv->mutex);
253 
255 
256  /* Arrange to clean up at walreceiver exit */
 /* NOTE(review): source line 257 is not shown; the References list names on_shmem_exit() and WalRcvDie() -- presumably the exit callback is registered here, confirm against the file. */
258 
259  /* Properly accept or ignore signals the postmaster might send us */
260  pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
261  * file */
262  pqsignal(SIGINT, SIG_IGN);
263  pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
264  /* SIGQUIT handler was already set up by InitPostmasterChild */
269 
270  /* Reset some signals that are accepted by postmaster but not here */
272 
273  /* Load the libpq-specific functions */
274  load_file("libpqwalreceiver", false);
275  if (WalReceiverFunctions == NULL)
276  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
277 
278  /* Unblock signals (they were blocked when the postmaster forked us) */
280 
281  /* Establish the connection to the primary for XLOG streaming */
282  wrconn = walrcv_connect(conninfo, false,
283  cluster_name[0] ? cluster_name : "walreceiver",
284  &err);
285  if (!wrconn)
286  ereport(ERROR,
287  (errcode(ERRCODE_CONNECTION_FAILURE),
288  errmsg("could not connect to the primary server: %s", err)));
289 
290  /*
291  * Save user-visible connection string. This clobbers the original
292  * conninfo, for security. Also save host and port of the sender server
293  * this walreceiver is connected to.
294  */
295  tmp_conninfo = walrcv_get_conninfo(wrconn);
296  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
297  SpinLockAcquire(&walrcv->mutex);
298  memset(walrcv->conninfo, 0, MAXCONNINFO);
299  if (tmp_conninfo)
300  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
301 
302  memset(walrcv->sender_host, 0, NI_MAXHOST);
303  if (sender_host)
304  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
305 
306  walrcv->sender_port = sender_port;
307  walrcv->ready_to_display = true;
308  SpinLockRelease(&walrcv->mutex);
309 
310  if (tmp_conninfo)
311  pfree(tmp_conninfo);
312 
313  if (sender_host)
314  pfree(sender_host);
315 
316  first_stream = true;
317  for (;;)
318  {
319  char *primary_sysid;
320  char standby_sysid[32];
322 
323  /*
324  * Check that we're connected to a valid server using the
325  * IDENTIFY_SYSTEM replication command.
326  */
327  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
328 
 /* NOTE(review): source line 330 (the value formatted by the snprintf below) is not shown; the References list names GetSystemIdentifier() -- presumably that is the argument, confirm against the file. */
329  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
331  if (strcmp(primary_sysid, standby_sysid) != 0)
332  {
333  ereport(ERROR,
334  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
335  errmsg("database system identifier differs between the primary and standby"),
336  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
337  primary_sysid, standby_sysid)));
338  }
339 
340  /*
341  * Confirm that the current timeline of the primary is the same or
342  * ahead of ours.
343  */
344  if (primaryTLI < startpointTLI)
345  ereport(ERROR,
346  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
347  errmsg("highest timeline %u of the primary is behind recovery timeline %u",
348  primaryTLI, startpointTLI)));
349 
350  /*
351  * Get any missing history files. We do this always, even when we're
352  * not interested in that timeline, so that if we're promoted to
353  * become the primary later on, we don't select the same timeline that
354  * was already used in the current primary. This isn't bullet-proof -
355  * you'll need some external software to manage your cluster if you
356  * need to ensure that a unique timeline id is chosen in every case,
357  * but let's avoid the confusion of timeline id collisions where we
358  * can.
359  */
360  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
361 
362  /*
363  * Create temporary replication slot if requested, and update slot
364  * name in shared memory. (Note the slot name cannot already be set
365  * in this case.)
366  */
367  if (is_temp_slot)
368  {
369  snprintf(slotname, sizeof(slotname),
370  "pg_walreceiver_%lld",
371  (long long int) walrcv_get_backend_pid(wrconn));
372 
373  walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
374 
375  SpinLockAcquire(&walrcv->mutex);
376  strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
377  SpinLockRelease(&walrcv->mutex);
378  }
379 
380  /*
381  * Start streaming.
382  *
383  * We'll try to start at the requested starting point and timeline,
384  * even if it's different from the server's latest timeline. In case
385  * we've already reached the end of the old timeline, the server will
386  * finish the streaming immediately, and we will go back to await
387  * orders from the startup process. If recovery_target_timeline is
388  * 'latest', the startup process will scan pg_wal and find the new
389  * history file, bump recovery target timeline, and ask us to restart
390  * on the new timeline.
391  */
392  options.logical = false;
393  options.startpoint = startpoint;
394  options.slotname = slotname[0] != '\0' ? slotname : NULL;
395  options.proto.physical.startpointTLI = startpointTLI;
396  ThisTimeLineID = startpointTLI;
397  if (walrcv_startstreaming(wrconn, &options))
398  {
399  if (first_stream)
400  ereport(LOG,
401  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
402  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
403  else
404  ereport(LOG,
405  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
406  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
407  first_stream = false;
408 
409  /* Initialize LogstreamResult and buffers for processing messages */
413 
414  /* Initialize the last recv timestamp */
415  last_recv_timestamp = GetCurrentTimestamp();
416  ping_sent = false;
417 
418  /* Loop until end-of-streaming or error */
419  for (;;)
420  {
421  char *buf;
422  int len;
423  bool endofwal = false;
424  pgsocket wait_fd = PGINVALID_SOCKET;
425  int rc;
426 
427  /*
428  * Exit walreceiver if we're not in recovery. This should not
429  * happen, but cross-check the status here.
430  */
431  if (!RecoveryInProgress())
432  ereport(FATAL,
433  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
434  errmsg("cannot continue WAL streaming, recovery has already ended")));
435 
436  /* Process any requests or signals received recently */
 /* NOTE(review): source lines 437, 439, 442 and 443 are not shown, so the condition guarding the block below and the rest of its body are missing; the References list names ProcessWalRcvInterrupts(), ProcessConfigFile() and XLogWalRcvSendHSFeedback() -- presumably they are called here, confirm against the file. */
438 
440  {
441  ConfigReloadPending = false;
444  }
445 
446  /* See if we can read data immediately */
447  len = walrcv_receive(wrconn, &buf, &wait_fd);
448  if (len != 0)
449  {
450  /*
451  * Process the received data, and any subsequent data we
452  * can read without blocking.
453  */
454  for (;;)
455  {
456  if (len > 0)
457  {
458  /*
459  * Something was received from primary, so reset
460  * timeout
461  */
462  last_recv_timestamp = GetCurrentTimestamp();
463  ping_sent = false;
 /* First byte is the message type; the remainder is the message body. */
464  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
465  }
466  else if (len == 0)
467  break;
468  else if (len < 0)
469  {
470  ereport(LOG,
471  (errmsg("replication terminated by primary server"),
472  errdetail("End of WAL reached on timeline %u at %X/%X.",
473  startpointTLI,
475  endofwal = true;
476  break;
477  }
478  len = walrcv_receive(wrconn, &buf, &wait_fd);
479  }
480 
481  /* Let the primary know that we received some data. */
482  XLogWalRcvSendReply(false, false);
483 
484  /*
485  * If we've written some records, flush them to disk and
486  * let the startup process and primary server know about
487  * them.
488  */
489  XLogWalRcvFlush(false);
490  }
491 
492  /* Check if we need to exit the streaming loop. */
493  if (endofwal)
494  break;
495 
496  /*
497  * Ideally we would reuse a WaitEventSet object repeatedly
498  * here to avoid the overheads of WaitLatchOrSocket on epoll
499  * systems, but we can't be sure that libpq (or any other
500  * walreceiver implementation) has the same socket (even if
501  * the fd is the same number, it may have been closed and
502  * reopened since the last time). In future, if there is a
503  * function for removing sockets from WaitEventSet, then we
504  * could add and remove just the socket each time, potentially
505  * avoiding some system calls.
506  */
 /* NOTE(review): source lines 508-510 and 512-513 are not shown; only the `wait_fd,` argument of the wait call survives.  `rc` tested below is presumably the result of WaitLatchOrSocket() (named in the References list and in the comment above) -- confirm against the file. */
507  Assert(wait_fd != PGINVALID_SOCKET);
511  wait_fd,
514  if (rc & WL_LATCH_SET)
515  {
518 
519  if (walrcv->force_reply)
520  {
521  /*
522  * The recovery process has asked us to send apply
523  * feedback now. Make sure the flag is really set to
524  * false in shared memory before sending the reply, so
525  * we don't miss a new request for a reply.
526  */
527  walrcv->force_reply = false;
529  XLogWalRcvSendReply(true, false);
530  }
531  }
532  if (rc & WL_TIMEOUT)
533  {
534  /*
535  * We didn't receive anything new. If we haven't heard
536  * anything from the server for more than
537  * wal_receiver_timeout / 2, ping the server. Also, if
538  * it's been longer than wal_receiver_status_interval
539  * since the last update we sent, send a status update to
540  * the primary anyway, to report any progress in applying
541  * WAL.
542  */
543  bool requestReply = false;
544 
545  /*
546  * Check if time since last receive from primary has
547  * reached the configured limit.
548  */
549  if (wal_receiver_timeout > 0)
550  {
552  TimestampTz timeout;
553 
554  timeout =
555  TimestampTzPlusMilliseconds(last_recv_timestamp,
557 
558  if (now >= timeout)
559  ereport(ERROR,
560  (errcode(ERRCODE_CONNECTION_FAILURE),
561  errmsg("terminating walreceiver due to timeout")));
562 
563  /*
564  * We didn't receive anything new, for half of
565  * receiver replication timeout. Ping the server.
566  */
567  if (!ping_sent)
568  {
569  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
570  (wal_receiver_timeout / 2));
571  if (now >= timeout)
572  {
 /* ping_sent stays true until the next message arrives (reset at line 463), so only one ping is requested per silent period. */
573  requestReply = true;
574  ping_sent = true;
575  }
576  }
577  }
578 
579  XLogWalRcvSendReply(requestReply, requestReply);
581  }
582  }
583 
584  /*
585  * The backend finished streaming. Exit streaming COPY-mode from
586  * our side, too.
587  */
588  walrcv_endstreaming(wrconn, &primaryTLI);
589 
590  /*
591  * If the server had switched to a new timeline that we didn't
592  * know about when we began streaming, fetch its timeline history
593  * file now.
594  */
595  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
596  }
597  else
598  ereport(LOG,
599  (errmsg("primary server contains no more WAL on requested timeline %u",
600  startpointTLI)));
601 
602  /*
603  * End of WAL reached on the requested timeline. Close the last
604  * segment, and await for new orders from the startup process.
605  */
606  if (recvFile >= 0)
607  {
608  char xlogfname[MAXFNAMELEN];
609 
610  XLogWalRcvFlush(false);
612  if (close(recvFile) != 0)
613  ereport(PANIC,
615  errmsg("could not close log segment %s: %m",
616  xlogfname)));
617 
 /* NOTE(review): source line 622 is not shown, so the `else` below has no visible `if`; the References list names XLogArchiveMode and ARCHIVE_MODE_ALWAYS -- presumably the missing condition tests the archive mode, confirm against the file.  Line 611, which presumably fills xlogfname (XLogFileName is in the References list), is also not shown. */
618  /*
619  * Create .done file forcibly to prevent the streamed segment from
620  * being archived later.
621  */
623  XLogArchiveForceDone(xlogfname);
624  else
625  XLogArchiveNotify(xlogfname);
626  }
627  recvFile = -1;
628 
629  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
630  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
631  }
632  /* not reached */
633 }
int sender_port
Definition: walreceiver.h:119
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:719
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:408
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:420
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
uint32 TimeLineID
Definition: xlogdefs.h:59
slock_t mutex
Definition: walreceiver.h:145
#define WL_TIMEOUT
Definition: latch.h:128
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:119
#define SIGUSR1
Definition: win32_port.h:171
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
#define SIGCHLD
Definition: win32_port.h:169
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:639
sig_atomic_t force_reply
Definition: walreceiver.h:160
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:412
WalRcvState walRcvState
Definition: walreceiver.h:65
static StringInfoData incoming_message
Definition: walreceiver.c:119
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1106
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:422
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:146
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:418
union WalRcvStreamOptions::@104 proto
void ConditionVariableBroadcast(ConditionVariable *cv)
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
#define WL_SOCKET_READABLE
Definition: latch.h:126
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:810
#define SIGPIPE
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:172
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:8248
#define PANIC
Definition: elog.h:50
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:97
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:100
void ResetLatch(Latch *latch)
Definition: latch.c:660
TimestampTz lastMsgSendTime
Definition: walreceiver.h:99
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
int wal_receiver_timeout
Definition: walreceiver.c:90
Latch procLatch
Definition: proc.h:130
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:153
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:118
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:467
void pfree(void *pointer)
Definition: mcxt.c:1169
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:46
#define FATAL
Definition: elog.h:49
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11773
pid_t pid
Definition: walreceiver.h:64
#define MAXCONNINFO
Definition: walreceiver.h:37
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
static char * buf
Definition: pg_test_fsync.c:68
int XLogArchiveMode
Definition: xlog.c:96
int errdetail(const char *fmt,...)
Definition: elog.c:1042
int errcode_for_file_access(void)
Definition: elog.c:721
#define SIGHUP
Definition: win32_port.h:159
XLogRecPtr startpoint
Definition: walreceiver.h:170
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
int pgsocket
Definition: port.h:31
sigset_t UnBlockSig
Definition: pqsignal.c:22
TimeLineID receiveStartTLI
Definition: walreceiver.h:76
static TimeLineID recvFileTLI
Definition: walreceiver.c:105
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:428
Definition: guc.h:72
char * cluster_name
Definition: guc.c:613
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:104
Latch * latch
Definition: walreceiver.h:143
#define SIG_IGN
Definition: win32_port.h:156
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static XLogSegNo recvSegNo
Definition: walreceiver.c:106
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:517
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:95
#define PGINVALID_SOCKET
Definition: port.h:33
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1037
bool ready_to_display
Definition: walreceiver.h:134
TimestampTz latestWalEndTime
Definition: walreceiver.h:106
TimeLineID ThisTimeLineID
Definition: xlog.c:194
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:157
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:145
#define SIG_DFL
Definition: win32_port.h:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define SIGALRM
Definition: win32_port.h:165
bool is_temp_slot
Definition: walreceiver.h:131
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:979
WalRcvData * WalRcv
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4981
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:118
int errmsg(const char *fmt,...)
Definition: elog.c:909
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:775
#define elog(elevel,...)
Definition: elog.h:232
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:410
struct Latch * MyLatch
Definition: globals.c:57
struct WalRcvStreamOptions::@104::@105 physical
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:75
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:642
static int recvFile
Definition: walreceiver.c:104
#define snprintf
Definition: port.h:216
#define WL_LATCH_SET
Definition: latch.h:125
#define UINT64_FORMAT
Definition: c.h:484
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
char slotname[NAMEDATALEN]
Definition: walreceiver.h:125
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:66
static struct @17 LogstreamResult
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:112
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
Definition: walreceiver.h:426
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 979 of file walreceiver.c.

References AllowCascadeReplication, WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, LSN_FORMAT_ARGS, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

/*
 * XLogWalRcvFlush -- make received WAL durable and announce the progress.
 *
 * Does nothing unless LogstreamResult.Write is ahead of
 * LogstreamResult.Flush.  Otherwise it advances Flush to Write, publishes
 * the new flushedUpto (plus latestChunkStart and receivedTLI) in shared
 * memory under walrcv->mutex, wakes up recovery and the walsenders, and,
 * unless `dying` is true, sends a status reply and hot standby feedback
 * to the primary.
 *
 * dying: when true, the replies to the primary at the end are skipped.
 *
 * NOTE(review): source lines 985, 1001, 1005 and 1010 are not shown in
 * this listing.  The References list names issue_xlog_fsync(),
 * AllowCascadeReplication() and update_process_title -- presumably the
 * fsync call and the conditions guarding WalSndWakeup() and the PS display
 * block; confirm against the file.
 */
980 {
981  if (LogstreamResult.Flush < LogstreamResult.Write)
982  {
983  WalRcvData *walrcv = WalRcv;
984 
986 
987  LogstreamResult.Flush = LogstreamResult.Write;
988 
989  /* Update shared-memory status */
990  SpinLockAcquire(&walrcv->mutex);
 /* Only move the shared flush pointer forward; the old value becomes the start of the latest chunk. */
991  if (walrcv->flushedUpto < LogstreamResult.Flush)
992  {
993  walrcv->latestChunkStart = walrcv->flushedUpto;
994  walrcv->flushedUpto = LogstreamResult.Flush;
995  walrcv->receivedTLI = ThisTimeLineID;
996  }
997  SpinLockRelease(&walrcv->mutex);
998 
999  /* Signal the startup process and walsender that new WAL has arrived */
1000  WakeupRecovery();
1002  WalSndWakeup();
1003 
1004  /* Report XLOG streaming progress in PS display */
1006  {
1007  char activitymsg[50];
1008 
1009  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1011  set_ps_display(activitymsg);
1012  }
1013 
1014  /* Also let the primary know that we made some progress */
1015  if (!dying)
1016  {
1017  XLogWalRcvSendReply(false, false);
1018  XLogWalRcvSendHSFeedback(false);
1019  }
1020  }
1021 }
slock_t mutex
Definition: walreceiver.h:145
bool update_process_title
Definition: ps_status.c:36
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10672
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1106
TimeLineID receivedTLI
Definition: walreceiver.h:86
void set_ps_display(const char *activity)
Definition: ps_status.c:349
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void WakeupRecovery(void)
Definition: xlog.c:13000
XLogRecPtr latestChunkStart
Definition: walreceiver.h:94
#define AllowCascadeReplication()
Definition: walreceiver.h:40
static XLogSegNo recvSegNo
Definition: walreceiver.c:106
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1037
TimeLineID ThisTimeLineID
Definition: xlog.c:194
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:104
XLogRecPtr flushedUpto
Definition: walreceiver.h:85
#define snprintf
Definition: port.h:216
void WalSndWakeup(void)
Definition: walsender.c:3120
static struct @17 LogstreamResult

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 810 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

/*
 * XLogWalRcvProcessMsg -- handle one message received from the primary.
 *
 * type: message type byte; 'w' (WAL data) and 'k' (keepalive) are
 *       accepted, anything else raises ERROR with
 *       ERRCODE_PROTOCOL_VIOLATION.
 * buf:  message body following the type byte.
 * len:  number of bytes in buf.
 *
 * 'w': the body must be at least three int64 long (dataStart, walEnd,
 *      sendTime); the bytes after that header are handed to
 *      XLogWalRcvWrite() at position dataStart.
 * 'k': the body must be exactly two int64 plus one byte (walEnd,
 *      sendTime, replyRequested); a reply is forced when replyRequested
 *      is nonzero.
 * In both cases walEnd and sendTime are passed to ProcessWalSndrMessage().
 *
 * NOTE(review): source lines 818, 830 and 851 are not shown in this
 * listing.  The fields are read from incoming_message, and the References
 * list names resetStringInfo() and appendBinaryStringInfo() -- presumably
 * the missing lines reset that buffer and copy the header into it
 * ("copy message to StringInfo"); confirm against the file.
 */
811 {
812  int hdrlen;
813  XLogRecPtr dataStart;
814  XLogRecPtr walEnd;
815  TimestampTz sendTime;
816  bool replyRequested;
817 
819 
820  switch (type)
821  {
822  case 'w': /* WAL records */
823  {
824  /* copy message to StringInfo */
825  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
826  if (len < hdrlen)
827  ereport(ERROR,
828  (errcode(ERRCODE_PROTOCOL_VIOLATION),
829  errmsg_internal("invalid WAL message received from primary")));
831 
832  /* read the fields */
833  dataStart = pq_getmsgint64(&incoming_message);
834  walEnd = pq_getmsgint64(&incoming_message);
835  sendTime = pq_getmsgint64(&incoming_message);
836  ProcessWalSndrMessage(walEnd, sendTime);
837 
 /* Skip past the fixed header; what remains is the WAL payload. */
838  buf += hdrlen;
839  len -= hdrlen;
840  XLogWalRcvWrite(buf, len, dataStart);
841  break;
842  }
843  case 'k': /* Keepalive */
844  {
845  /* copy message to StringInfo */
846  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
 /* Unlike 'w', a keepalive carries no payload, so the length must match exactly. */
847  if (len != hdrlen)
848  ereport(ERROR,
849  (errcode(ERRCODE_PROTOCOL_VIOLATION),
850  errmsg_internal("invalid keepalive message received from primary")));
852 
853  /* read the fields */
854  walEnd = pq_getmsgint64(&incoming_message);
855  sendTime = pq_getmsgint64(&incoming_message);
856  replyRequested = pq_getmsgbyte(&incoming_message);
857 
858  ProcessWalSndrMessage(walEnd, sendTime);
859 
860  /* If the primary requested a reply, send one immediately */
861  if (replyRequested)
862  XLogWalRcvSendReply(true, false);
863  break;
864  }
865  default:
866  ereport(ERROR,
867  (errcode(ERRCODE_PROTOCOL_VIOLATION),
868  errmsg_internal("invalid replication message type %d",
869  type)));
870  }
871 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1207
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:119
int errcode(int sqlerrcode)
Definition: elog.c:698
#define ERROR
Definition: elog.h:46
static char * buf
Definition: pg_test_fsync.c:68
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1037
#define ereport(elevel,...)
Definition: elog.h:157
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:877

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1106 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReadNextFullTransactionId(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, wal_receiver_status_interval, walrcv_send, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

/*
 * XLogWalRcvSendHSFeedback -- build a hot standby feedback ('h') message
 * carrying this standby's xmin and catalog_xmin with their epochs.
 *
 * immed: when false, the function returns early unless
 *        TimestampDifferenceExceeds() reports that enough time has passed
 *        since the static sendTime, which is then updated; when true,
 *        that rate limit is bypassed.
 *
 * The function also returns without sending when HotStandbyActive() is
 * false.  The static primary_has_standby_xmin starts out true and is
 * updated at the end: true if either xmin or catalog_xmin that was put
 * in the message is valid, false otherwise.
 *
 * NOTE(review): source lines 1124, 1137, 1159, 1186, 1188 and 1193 are
 * not shown in this listing.  That hides the first half of the early-exit
 * condition, the interval argument, the condition choosing between real
 * and invalid horizons, and the buffer reset / send calls.  The
 * References list names hot_standby_feedback, wal_receiver_status_interval,
 * resetStringInfo(), pq_sendint64() and walrcv_send -- presumably those
 * are what the missing lines use; confirm against the file.
 */
1107 {
1108  TimestampTz now;
1109  FullTransactionId nextFullXid;
1110  TransactionId nextXid;
1111  uint32 xmin_epoch,
1112  catalog_xmin_epoch;
1113  TransactionId xmin,
1114  catalog_xmin;
1115  static TimestampTz sendTime = 0;
1116 
1117  /* initially true so we always send at least one feedback message */
1118  static bool primary_has_standby_xmin = true;
1119 
1120  /*
1121  * If the user doesn't want status to be reported to the primary, be sure
1122  * to exit before doing anything at all.
1123  */
1125  !primary_has_standby_xmin)
1126  return;
1127 
1128  /* Get current timestamp. */
1129  now = GetCurrentTimestamp();
1130 
1131  if (!immed)
1132  {
1133  /*
1134  * Send feedback at most once per wal_receiver_status_interval.
1135  */
1136  if (!TimestampDifferenceExceeds(sendTime, now,
1138  return;
1139  sendTime = now;
1140  }
1141 
1142  /*
1143  * If Hot Standby is not yet accepting connections there is nothing to
1144  * send. Check this after the interval has expired to reduce number of
1145  * calls.
1146  *
1147  * Bailing out here also ensures that we don't send feedback until we've
1148  * read our own replication slot state, so we don't tell the primary to
1149  * discard needed xmin or catalog_xmin from any slots that may exist on
1150  * this replica.
1151  */
1152  if (!HotStandbyActive())
1153  return;
1154 
1155  /*
1156  * Make the expensive call to get the oldest xmin once we are certain
1157  * everything else has been checked.
1158  */
1160  {
1161  GetReplicationHorizons(&xmin, &catalog_xmin);
1162  }
1163  else
1164  {
1165  xmin = InvalidTransactionId;
1166  catalog_xmin = InvalidTransactionId;
1167  }
1168 
1169  /*
1170  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1171  * the epoch boundary.
1172  */
1173  nextFullXid = ReadNextFullTransactionId();
1174  nextXid = XidFromFullTransactionId(nextFullXid);
1175  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1176  catalog_xmin_epoch = xmin_epoch;
 /* A horizon numerically greater than nextXid is taken to belong to the previous epoch. */
1177  if (nextXid < xmin)
1178  xmin_epoch--;
1179  if (nextXid < catalog_xmin)
1180  catalog_xmin_epoch--;
1181 
1182  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1183  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1184 
1185  /* Construct the message and send it. */
1187  pq_sendbyte(&reply_message, 'h');
1189  pq_sendint32(&reply_message, xmin);
1190  pq_sendint32(&reply_message, xmin_epoch);
1191  pq_sendint32(&reply_message, catalog_xmin);
1192  pq_sendint32(&reply_message, catalog_xmin_epoch);
 /* Record whether a valid horizon was just reported; this flag feeds the early-exit test at the top on the next call. */
1194  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1195  primary_has_standby_xmin = true;
1196  else
1197  primary_has_standby_xmin = false;
1198 }
bool hot_standby_feedback
Definition: walreceiver.c:91
uint32 TransactionId
Definition: c.h:587
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
bool HotStandbyActive(void)
Definition: xlog.c:8322
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:89
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static StringInfoData reply_message
Definition: walreceiver.c:118
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:441
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2005
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:424
#define elog(elevel,...)
Definition: elog.h:232
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1037 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1038 {
1039  static XLogRecPtr writePtr = 0;
1040  static XLogRecPtr flushPtr = 0;
1041  XLogRecPtr applyPtr;
1042  static TimestampTz sendTime = 0;
1043  TimestampTz now;
1044 
1045  /*
1046  * If the user doesn't want status to be reported to the primary, be sure
1047  * to exit before doing anything at all.
1048  */
1049  if (!force && wal_receiver_status_interval <= 0)
1050  return;
1051 
1052  /* Get current timestamp. */
1053  now = GetCurrentTimestamp();
1054 
1055  /*
1056  * We can compare the write and flush positions to the last message we
1057  * sent without taking any lock, but the apply position requires a spin
1058  * lock, so we don't check that unless something else has changed or 10
1059  * seconds have passed. This means that the apply WAL location will
1060  * appear, from the primary's point of view, to lag slightly, but since
1061  * this is only for reporting purposes and only on idle systems, that's
1062  * probably OK.
1063  */
1064  if (!force
1065  && writePtr == LogstreamResult.Write
1066  && flushPtr == LogstreamResult.Flush
1067  && !TimestampDifferenceExceeds(sendTime, now,
1069  return;
1070  sendTime = now;
1071 
1072  /* Construct a new message */
1073  writePtr = LogstreamResult.Write;
1074  flushPtr = LogstreamResult.Flush;
1075  applyPtr = GetXLogReplayRecPtr(NULL);
1076 
1078  pq_sendbyte(&reply_message, 'r');
1079  pq_sendint64(&reply_message, writePtr);
1080  pq_sendint64(&reply_message, flushPtr);
1081  pq_sendint64(&reply_message, applyPtr);
1083  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1084 
1085  /* Send it */
1086  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1087  LSN_FORMAT_ARGS(writePtr),
1088  LSN_FORMAT_ARGS(flushPtr),
1089  LSN_FORMAT_ARGS(applyPtr),
1090  requestReply ? " (reply requested)" : "");
1091 
1093 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:89
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:118
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11773
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:424
#define elog(elevel,...)
Definition: elog.h:232
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
static struct @17 LogstreamResult

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 877 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite(), recvFile, recvFileTLI, recvSegNo, ThisTimeLineID, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogSegmentOffset, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

878 {
879  int startoff;
880  int byteswritten;
881 
882  while (nbytes > 0)
883  {
884  int segbytes;
885 
886  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
887  {
888  /*
889  * fsync() and close current file before we switch to next one. We
890  * would otherwise have to reopen this file to fsync it later
891  */
892  if (recvFile >= 0)
893  {
894  char xlogfname[MAXFNAMELEN];
895 
896  XLogWalRcvFlush(false);
897 
899 
900  /*
901  * XLOG segment files will be re-read by recovery in startup
902  * process soon, so we don't advise the OS to release cache
903  * pages associated with the file like XLogFileClose() does.
904  */
905  if (close(recvFile) != 0)
906  ereport(PANIC,
908  errmsg("could not close log segment %s: %m",
909  xlogfname)));
910 
911  /*
912  * Create .done file forcibly to prevent the streamed segment
913  * from being archived later.
914  */
916  XLogArchiveForceDone(xlogfname);
917  else
918  XLogArchiveNotify(xlogfname);
919  }
920  recvFile = -1;
921 
922  /* Create/use new log file */
926  }
927 
928  /* Calculate the start offset of the received logs */
929  startoff = XLogSegmentOffset(recptr, wal_segment_size);
930 
931  if (startoff + nbytes > wal_segment_size)
932  segbytes = wal_segment_size - startoff;
933  else
934  segbytes = nbytes;
935 
936  /* OK to write the logs */
937  errno = 0;
938 
939  byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
940  if (byteswritten <= 0)
941  {
942  char xlogfname[MAXFNAMELEN];
943  int save_errno;
944 
945  /* if write didn't set errno, assume no disk space */
946  if (errno == 0)
947  errno = ENOSPC;
948 
949  save_errno = errno;
951  errno = save_errno;
952  ereport(PANIC,
954  errmsg("could not write to log segment %s "
955  "at offset %u, length %lu: %m",
956  xlogfname, startoff, (unsigned long) segbytes)));
957  }
958 
959  /* Update state for write */
960  recptr += byteswritten;
961 
962  nbytes -= byteswritten;
963  buf += byteswritten;
964 
965  LogstreamResult.Write = recptr;
966  }
967 
968  /* Update shared-memory status */
970 }
int wal_segment_size
Definition: xlog.c:119
int XLogFileInit(XLogSegNo logsegno)
Definition: xlog.c:3466
#define PANIC
Definition: elog.h:50
ssize_t pg_pwrite(int fd, const void *buf, size_t nbyte, off_t offset)
Definition: pwrite.c:27
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:153
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:467
static char * buf
Definition: pg_test_fsync.c:68
int XLogArchiveMode
Definition: xlog.c:96
int errcode_for_file_access(void)
Definition: elog.c:721
static TimeLineID recvFileTLI
Definition: walreceiver.c:105
static XLogSegNo recvSegNo
Definition: walreceiver.c:106
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:517
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
TimeLineID ThisTimeLineID
Definition: xlog.c:194
#define ereport(elevel,...)
Definition: elog.h:157
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:979
WalRcvData * WalRcv
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:104
static struct @17 LogstreamResult
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 115 of file walreceiver.c.

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 91 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 119 of file walreceiver.c.

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 105 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 106 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

Definition at line 118 of file walreceiver.c.

Referenced by send_feedback().

◆ wal_receiver_status_interval

int wal_receiver_status_interval

Definition at line 89 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ wal_receiver_timeout

int wal_receiver_timeout

Definition at line 90 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 95 of file walreceiver.c.

Referenced by _PG_init().

◆ wrconn

WalReceiverConn* wrconn = NULL
static

Definition at line 94 of file walreceiver.c.

Referenced by AlterSubscription_refresh(), CreateSubscription(), and DropSubscription().

◆ Write

XLogRecPtr Write

Definition at line 114 of file walreceiver.c.