PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvSigHupHandler (SIGNAL_ARGS)
 
static void WalRcvShutdownHandler (SIGNAL_ARGS)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConnwrconn = NULL
 
WalReceiverFunctionsTypeWalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t got_SIGTERM = false
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 96 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1332 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MAXCONNINFO, MemSet, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, NI_MAXHOST, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

1333 {
1334  TupleDesc tupdesc;
1335  Datum *values;
1336  bool *nulls;
1337  int pid;
1338  bool ready_to_display;
1340  XLogRecPtr receive_start_lsn;
1341  TimeLineID receive_start_tli;
1342  XLogRecPtr written_lsn;
1343  XLogRecPtr flushed_lsn;
1344  TimeLineID received_tli;
1345  TimestampTz last_send_time;
1346  TimestampTz last_receipt_time;
1347  XLogRecPtr latest_end_lsn;
1348  TimestampTz latest_end_time;
1349  char sender_host[NI_MAXHOST];
1350  int sender_port = 0;
1351  char slotname[NAMEDATALEN];
1352  char conninfo[MAXCONNINFO];
1353 
1354  /* Take a lock to ensure value consistency */
1356  pid = (int) WalRcv->pid;
1357  ready_to_display = WalRcv->ready_to_display;
1358  state = WalRcv->walRcvState;
1359  receive_start_lsn = WalRcv->receiveStart;
1360  receive_start_tli = WalRcv->receiveStartTLI;
1361  written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1362  flushed_lsn = WalRcv->flushedUpto;
1363  received_tli = WalRcv->receivedTLI;
1364  last_send_time = WalRcv->lastMsgSendTime;
1365  last_receipt_time = WalRcv->lastMsgReceiptTime;
1366  latest_end_lsn = WalRcv->latestWalEnd;
1367  latest_end_time = WalRcv->latestWalEndTime;
1368  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1369  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1370  sender_port = WalRcv->sender_port;
1371  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1373 
1374  /*
1375  * No WAL receiver (or not ready yet), just return a tuple with NULL
1376  * values
1377  */
1378  if (pid == 0 || !ready_to_display)
1379  PG_RETURN_NULL();
1380 
1381  /* determine result type */
1382  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1383  elog(ERROR, "return type must be a row type");
1384 
1385  values = palloc0(sizeof(Datum) * tupdesc->natts);
1386  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1387 
1388  /* Fetch values */
1389  values[0] = Int32GetDatum(pid);
1390 
1391  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1392  {
1393  /*
1394  * Only superusers and members of pg_read_all_stats can see details.
1395  * Other users only get the pid value to know whether it is a WAL
1396  * receiver, but no details.
1397  */
1398  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1399  }
1400  else
1401  {
1402  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1403 
1404  if (XLogRecPtrIsInvalid(receive_start_lsn))
1405  nulls[2] = true;
1406  else
1407  values[2] = LSNGetDatum(receive_start_lsn);
1408  values[3] = Int32GetDatum(receive_start_tli);
1409  if (XLogRecPtrIsInvalid(written_lsn))
1410  nulls[4] = true;
1411  else
1412  values[4] = LSNGetDatum(written_lsn);
1413  if (XLogRecPtrIsInvalid(flushed_lsn))
1414  nulls[5] = true;
1415  else
1416  values[5] = LSNGetDatum(flushed_lsn);
1417  values[6] = Int32GetDatum(received_tli);
1418  if (last_send_time == 0)
1419  nulls[7] = true;
1420  else
1421  values[7] = TimestampTzGetDatum(last_send_time);
1422  if (last_receipt_time == 0)
1423  nulls[8] = true;
1424  else
1425  values[8] = TimestampTzGetDatum(last_receipt_time);
1426  if (XLogRecPtrIsInvalid(latest_end_lsn))
1427  nulls[9] = true;
1428  else
1429  values[9] = LSNGetDatum(latest_end_lsn);
1430  if (latest_end_time == 0)
1431  nulls[10] = true;
1432  else
1433  values[10] = TimestampTzGetDatum(latest_end_time);
1434  if (*slotname == '\0')
1435  nulls[11] = true;
1436  else
1437  values[11] = CStringGetTextDatum(slotname);
1438  if (*sender_host == '\0')
1439  nulls[12] = true;
1440  else
1441  values[12] = CStringGetTextDatum(sender_host);
1442  if (sender_port == 0)
1443  nulls[13] = true;
1444  else
1445  values[13] = Int32GetDatum(sender_port);
1446  if (*conninfo == '\0')
1447  nulls[14] = true;
1448  else
1449  values[14] = CStringGetTextDatum(conninfo);
1450  }
1451 
1452  /* Returns the record as Datum */
1453  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1454 }
int sender_port
Definition: walreceiver.h:117
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1307
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:205
uint32 TimeLineID
Definition: xlogdefs.h:52
slock_t mutex
Definition: walreceiver.h:143
Oid GetUserId(void)
Definition: miscinit.c:476
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:64
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:84
#define MemSet(start, val, len)
Definition: c.h:950
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
WalRcvState
Definition: walreceiver.h:44
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:151
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:43
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:63
#define MAXCONNINFO
Definition: walreceiver.h:36
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:981
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:352
bool ready_to_display
Definition: walreceiver.h:132
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4916
XLogRecPtr latestWalEnd
Definition: walreceiver.h:103
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:221
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:165
#define Int32GetDatum(X)
Definition: postgres.h:479
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:116
#define elog(elevel,...)
Definition: elog.h:214
#define CStringGetTextDatum(s)
Definition: builtins.h:86
XLogRecPtr receiveStart
Definition: walreceiver.h:73
XLogRecPtr flushedUpto
Definition: walreceiver.h:83
#define PG_RETURN_NULL()
Definition: fmgr.h:344
char slotname[NAMEDATALEN]
Definition: walreceiver.h:123
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:110

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 157 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and got_SIGTERM.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), walrcv_clear_result(), WalRcvWaitForStartPosition(), and WalReceiverMain().

158 {
159  /*
160  * Although walreceiver interrupt handling doesn't use the same scheme as
161  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
162  * any incoming signals on Win32, and also to make sure we process any
163  * barrier events.
164  */
166 
167  if (got_SIGTERM)
168  {
169  ereport(FATAL,
170  (errcode(ERRCODE_ADMIN_SHUTDOWN),
171  errmsg("terminating walreceiver process due to administrator command")));
172  }
173 }
int errcode(int sqlerrcode)
Definition: elog.c:610
#define FATAL
Definition: elog.h:52
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg(const char *fmt,...)
Definition: elog.c:821
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:112

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1236 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, log_min_messages, WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1237 {
1238  WalRcvData *walrcv = WalRcv;
1239 
1240  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1241 
1242  /* Update shared-memory status */
1243  SpinLockAcquire(&walrcv->mutex);
1244  if (walrcv->latestWalEnd < walEnd)
1245  walrcv->latestWalEndTime = sendTime;
1246  walrcv->latestWalEnd = walEnd;
1247  walrcv->lastMsgSendTime = sendTime;
1248  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1249  SpinLockRelease(&walrcv->mutex);
1250 
1251  if (log_min_messages <= DEBUG2)
1252  {
1253  char *sendtime;
1254  char *receipttime;
1255  int applyDelay;
1256 
1257  /* Copy because timestamptz_to_str returns a static buffer */
1258  sendtime = pstrdup(timestamptz_to_str(sendTime));
1259  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1260  applyDelay = GetReplicationApplyDelay();
1261 
1262  /* apply delay is not available */
1263  if (applyDelay == -1)
1264  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1265  sendtime,
1266  receipttime,
1268  else
1269  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1270  sendtime,
1271  receipttime,
1272  applyDelay,
1274 
1275  pfree(sendtime);
1276  pfree(receipttime);
1277  }
1278 }
slock_t mutex
Definition: walreceiver.h:143
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1187
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1057
#define DEBUG2
Definition: elog.h:24
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
int log_min_messages
Definition: guc.c:538
XLogRecPtr latestWalEnd
Definition: walreceiver.h:103
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:214
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1740

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 780 of file walreceiver.c.

References Assert, WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

781 {
782  WalRcvData *walrcv = WalRcv;
783 
784  /* Ensure that all WAL records received are flushed to disk */
785  XLogWalRcvFlush(true);
786 
787  /* Mark ourselves inactive in shared memory */
788  SpinLockAcquire(&walrcv->mutex);
789  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
790  walrcv->walRcvState == WALRCV_RESTARTING ||
791  walrcv->walRcvState == WALRCV_STARTING ||
792  walrcv->walRcvState == WALRCV_WAITING ||
793  walrcv->walRcvState == WALRCV_STOPPING);
794  Assert(walrcv->pid == MyProcPid);
795  walrcv->walRcvState = WALRCV_STOPPED;
796  walrcv->pid = 0;
797  walrcv->ready_to_display = false;
798  walrcv->latch = NULL;
799  SpinLockRelease(&walrcv->mutex);
800 
801  /* Terminate the connection gracefully. */
802  if (wrconn != NULL)
804 
805  /* Wake up the startup process to notice promptly that we're gone */
806  WakeupRecovery();
807 }
int MyProcPid
Definition: globals.c:40
slock_t mutex
Definition: walreceiver.h:143
WalRcvState walRcvState
Definition: walreceiver.h:64
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12675
pid_t pid
Definition: walreceiver.h:63
Latch * latch
Definition: walreceiver.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:132
#define Assert(condition)
Definition: c.h:746
#define walrcv_disconnect(conn)
Definition: walreceiver.h:426
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1007
WalRcvData * WalRcv

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 724 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

725 {
726  TimeLineID tli;
727 
728  for (tli = first; tli <= last; tli++)
729  {
730  /* there's no history file for timeline 1 */
731  if (tli != 1 && !existsTimeLineHistory(tli))
732  {
733  char *fname;
734  char *content;
735  int len;
736  char expectedfname[MAXFNAMELEN];
737 
738  ereport(LOG,
739  (errmsg("fetching timeline history file for timeline %u from primary server",
740  tli)));
741 
742  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
743 
744  /*
745  * Check that the filename on the primary matches what we
746  * calculated ourselves. This is just a sanity check, it should
747  * always match.
748  */
749  TLHistoryFileName(expectedfname, tli);
750  if (strcmp(fname, expectedfname) != 0)
751  ereport(ERROR,
752  (errcode(ERRCODE_PROTOCOL_VIOLATION),
753  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
754  tli)));
755 
756  /*
757  * Write the file to pg_wal.
758  */
759  writeTimeLineHistoryFile(tli, content, len);
760 
761  /*
762  * Mark the streamed history file as ready for archiving
763  * if archive_mode is always.
764  */
766  XLogArchiveForceDone(fname);
767  else
768  XLogArchiveNotify(fname);
769 
770  pfree(fname);
771  pfree(content);
772  }
773  }
774 }
uint32 TimeLineID
Definition: xlogdefs.h:52
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:466
void pfree(void *pointer)
Definition: mcxt.c:1057
#define ERROR
Definition: elog.h:43
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:467
int XLogArchiveMode
Definition: xlog.c:95
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:516
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg_internal(const char *fmt,...)
Definition: elog.c:908
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:410
int errmsg(const char *fmt,...)
Definition: elog.c:821

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1289 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

1290 {
1291  Latch *latch;
1292 
1293  WalRcv->force_reply = true;
1294  /* fetching the latch pointer might not be atomic, so use spinlock */
1296  latch = WalRcv->latch;
1298  if (latch)
1299  SetLatch(latch);
1300 }
slock_t mutex
Definition: walreceiver.h:143
sig_atomic_t force_reply
Definition: walreceiver.h:158
void SetLatch(Latch *latch)
Definition: latch.c:505
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: latch.h:110
Latch * latch
Definition: walreceiver.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1307 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1308 {
1309  switch (state)
1310  {
1311  case WALRCV_STOPPED:
1312  return "stopped";
1313  case WALRCV_STARTING:
1314  return "starting";
1315  case WALRCV_STREAMING:
1316  return "streaming";
1317  case WALRCV_WAITING:
1318  return "waiting";
1319  case WALRCV_RESTARTING:
1320  return "restarting";
1321  case WALRCV_STOPPING:
1322  return "stopping";
1323  }
1324  return "UNKNOWN";
1325 }
Definition: regguts.h:298

◆ WalRcvShutdownHandler()

static void WalRcvShutdownHandler ( SIGNAL_ARGS  )
static

Definition at line 819 of file walreceiver.c.

References got_SIGTERM, WalRcvData::latch, SetLatch(), and WalRcv.

Referenced by WalReceiverMain().

820 {
821  int save_errno = errno;
822 
823  got_SIGTERM = true;
824 
825  if (WalRcv->latch)
827 
828  errno = save_errno;
829 }
void SetLatch(Latch *latch)
Definition: latch.c:505
Latch * latch
Definition: walreceiver.h:141
WalRcvData * WalRcv
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:112

◆ WalRcvSigHupHandler()

static void WalRcvSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 811 of file walreceiver.c.

References got_SIGHUP.

Referenced by WalReceiverMain().

812 {
813  got_SIGHUP = true;
814 }
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:111

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 643 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, WalRcvData::latch, WalRcvData::mutex, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

644 {
645  WalRcvData *walrcv = WalRcv;
646  int state;
647 
648  SpinLockAcquire(&walrcv->mutex);
649  state = walrcv->walRcvState;
650  if (state != WALRCV_STREAMING)
651  {
652  SpinLockRelease(&walrcv->mutex);
653  if (state == WALRCV_STOPPING)
654  proc_exit(0);
655  else
656  elog(FATAL, "unexpected walreceiver state");
657  }
658  walrcv->walRcvState = WALRCV_WAITING;
660  walrcv->receiveStartTLI = 0;
661  SpinLockRelease(&walrcv->mutex);
662 
663  set_ps_display("idle");
664 
665  /*
666  * nudge startup process to notice that we've stopped streaming and are
667  * now waiting for instructions.
668  */
669  WakeupRecovery();
670  for (;;)
671  {
672  ResetLatch(walrcv->latch);
673 
675 
676  SpinLockAcquire(&walrcv->mutex);
677  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
678  walrcv->walRcvState == WALRCV_WAITING ||
679  walrcv->walRcvState == WALRCV_STOPPING);
680  if (walrcv->walRcvState == WALRCV_RESTARTING)
681  {
682  /*
683  * No need to handle changes in primary_conninfo or
684  * primary_slotname here. Startup process will signal us to
685  * terminate in case those change.
686  */
687  *startpoint = walrcv->receiveStart;
688  *startpointTLI = walrcv->receiveStartTLI;
689  walrcv->walRcvState = WALRCV_STREAMING;
690  SpinLockRelease(&walrcv->mutex);
691  break;
692  }
693  if (walrcv->walRcvState == WALRCV_STOPPING)
694  {
695  /*
696  * We should've received SIGTERM if the startup process wants us
697  * to die, but might as well check it here too.
698  */
699  SpinLockRelease(&walrcv->mutex);
700  exit(1);
701  }
702  SpinLockRelease(&walrcv->mutex);
703 
704  (void) WaitLatch(walrcv->latch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
706  }
707 
709  {
710  char activitymsg[50];
711 
712  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
713  (uint32) (*startpoint >> 32),
714  (uint32) *startpoint);
715  set_ps_display(activitymsg);
716  }
717 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:143
bool update_process_title
Definition: ps_status.c:36
WalRcvState walRcvState
Definition: walreceiver.h:64
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:157
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:588
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:390
void set_ps_display(const char *activity)
Definition: ps_status.c:349
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12675
#define FATAL
Definition: elog.h:52
unsigned int uint32
Definition: c.h:375
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
Latch * latch
Definition: walreceiver.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:746
Definition: regguts.h:298
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:214
XLogRecPtr receiveStart
Definition: walreceiver.h:73
#define snprintf
Definition: port.h:215
#define WL_LATCH_SET
Definition: latch.h:124
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 178 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, buf, close, cluster_name, WalRcvData::conninfo, DEBUG1, elog, ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, NI_MAXHOST, now(), on_shmem_exit(), options, PANIC, pfree(), pg_atomic_init_u64(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, procsignal_sigusr1_handler(), WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf, SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain(), and walrcv_clear_result().

179 {
180  char conninfo[MAXCONNINFO];
181  char *tmp_conninfo;
182  char slotname[NAMEDATALEN];
183  bool is_temp_slot;
184  XLogRecPtr startpoint;
185  TimeLineID startpointTLI;
186  TimeLineID primaryTLI;
187  bool first_stream;
188  WalRcvData *walrcv = WalRcv;
189  TimestampTz last_recv_timestamp;
191  bool ping_sent;
192  char *err;
193  char *sender_host = NULL;
194  int sender_port = 0;
195 
196  /*
197  * WalRcv should be set up already (if we are a backend, we inherit this
198  * by fork() or EXEC_BACKEND mechanism from the postmaster).
199  */
200  Assert(walrcv != NULL);
201 
202  now = GetCurrentTimestamp();
203 
204  /*
205  * Mark walreceiver as running in shared memory.
206  *
207  * Do this as early as possible, so that if we fail later on, we'll set
208  * state to STOPPED. If we die before this, the startup process will keep
209  * waiting for us to start up, until it times out.
210  */
211  SpinLockAcquire(&walrcv->mutex);
212  Assert(walrcv->pid == 0);
213  switch (walrcv->walRcvState)
214  {
215  case WALRCV_STOPPING:
216  /* If we've already been requested to stop, don't start up. */
217  walrcv->walRcvState = WALRCV_STOPPED;
218  /* fall through */
219 
220  case WALRCV_STOPPED:
221  SpinLockRelease(&walrcv->mutex);
222  proc_exit(1);
223  break;
224 
225  case WALRCV_STARTING:
226  /* The usual case */
227  break;
228 
229  case WALRCV_WAITING:
230  case WALRCV_STREAMING:
231  case WALRCV_RESTARTING:
232  default:
233  /* Shouldn't happen */
234  SpinLockRelease(&walrcv->mutex);
235  elog(PANIC, "walreceiver still running according to shared memory state");
236  }
237  /* Advertise our PID so that the startup process can kill us */
238  walrcv->pid = MyProcPid;
239  walrcv->walRcvState = WALRCV_STREAMING;
240 
241  /* Fetch information required to start streaming */
242  walrcv->ready_to_display = false;
243  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
244  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
245  is_temp_slot = walrcv->is_temp_slot;
246  startpoint = walrcv->receiveStart;
247  startpointTLI = walrcv->receiveStartTLI;
248 
249  /*
250  * At most one of is_temp_slot and slotname can be set; otherwise,
251  * RequestXLogStreaming messed up.
252  */
253  Assert(!is_temp_slot || (slotname[0] == '\0'));
254 
255  /* Initialise to a sanish value */
256  walrcv->lastMsgSendTime =
257  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
258 
259  /* Report the latch to use to awaken this process */
260  walrcv->latch = &MyProc->procLatch;
261 
262  SpinLockRelease(&walrcv->mutex);
263 
265 
266  /* Arrange to clean up at walreceiver exit */
268 
269  /* Properly accept or ignore signals the postmaster might send us */
270  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
271  pqsignal(SIGINT, SIG_IGN);
272  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
273  /* SIGQUIT handler was already set up by InitPostmasterChild */
278 
279  /* Reset some signals that are accepted by postmaster but not here */
281 
282  /* Load the libpq-specific functions */
283  load_file("libpqwalreceiver", false);
284  if (WalReceiverFunctions == NULL)
285  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
286 
287  /* Unblock signals (they were blocked when the postmaster forked us) */
289 
290  /* Establish the connection to the primary for XLOG streaming */
291  wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
292  if (!wrconn)
293  ereport(ERROR,
294  (errmsg("could not connect to the primary server: %s", err)));
295 
296  /*
297  * Save user-visible connection string. This clobbers the original
298  * conninfo, for security. Also save host and port of the sender server
299  * this walreceiver is connected to.
300  */
301  tmp_conninfo = walrcv_get_conninfo(wrconn);
302  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
303  SpinLockAcquire(&walrcv->mutex);
304  memset(walrcv->conninfo, 0, MAXCONNINFO);
305  if (tmp_conninfo)
306  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
307 
308  memset(walrcv->sender_host, 0, NI_MAXHOST);
309  if (sender_host)
310  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
311 
312  walrcv->sender_port = sender_port;
313  walrcv->ready_to_display = true;
314  SpinLockRelease(&walrcv->mutex);
315 
316  if (tmp_conninfo)
317  pfree(tmp_conninfo);
318 
319  if (sender_host)
320  pfree(sender_host);
321 
322  first_stream = true;
323  for (;;)
324  {
325  char *primary_sysid;
326  char standby_sysid[32];
328 
329  /*
330  * Check that we're connected to a valid server using the
331  * IDENTIFY_SYSTEM replication command.
332  */
333  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
334 
335  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
337  if (strcmp(primary_sysid, standby_sysid) != 0)
338  {
339  ereport(ERROR,
340  (errmsg("database system identifier differs between the primary and standby"),
341  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
342  primary_sysid, standby_sysid)));
343  }
344 
345  /*
346  * Confirm that the current timeline of the primary is the same or
347  * ahead of ours.
348  */
349  if (primaryTLI < startpointTLI)
350  ereport(ERROR,
351  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
352  primaryTLI, startpointTLI)));
353 
354  /*
355  * Get any missing history files. We do this always, even when we're
356  * not interested in that timeline, so that if we're promoted to
357  * become the primary later on, we don't select the same timeline that
358  * was already used in the current primary. This isn't bullet-proof -
359  * you'll need some external software to manage your cluster if you
360  * need to ensure that a unique timeline id is chosen in every case,
361  * but let's avoid the confusion of timeline id collisions where we
362  * can.
363  */
364  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
365 
366  /*
367  * Create temporary replication slot if requested, and update slot
368  * name in shared memory. (Note the slot name cannot already be set
369  * in this case.)
370  */
371  if (is_temp_slot)
372  {
373  snprintf(slotname, sizeof(slotname),
374  "pg_walreceiver_%lld",
375  (long long int) walrcv_get_backend_pid(wrconn));
376 
377  walrcv_create_slot(wrconn, slotname, true, 0, NULL);
378 
379  SpinLockAcquire(&walrcv->mutex);
380  strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
381  SpinLockRelease(&walrcv->mutex);
382  }
383 
384  /*
385  * Start streaming.
386  *
387  * We'll try to start at the requested starting point and timeline,
388  * even if it's different from the server's latest timeline. In case
389  * we've already reached the end of the old timeline, the server will
390  * finish the streaming immediately, and we will go back to await
391  * orders from the startup process. If recovery_target_timeline is
392  * 'latest', the startup process will scan pg_wal and find the new
393  * history file, bump recovery target timeline, and ask us to restart
394  * on the new timeline.
395  */
396  options.logical = false;
397  options.startpoint = startpoint;
398  options.slotname = slotname[0] != '\0' ? slotname : NULL;
399  options.proto.physical.startpointTLI = startpointTLI;
400  ThisTimeLineID = startpointTLI;
401  if (walrcv_startstreaming(wrconn, &options))
402  {
403  if (first_stream)
404  ereport(LOG,
405  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
406  (uint32) (startpoint >> 32), (uint32) startpoint,
407  startpointTLI)));
408  else
409  ereport(LOG,
410  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
411  (uint32) (startpoint >> 32), (uint32) startpoint,
412  startpointTLI)));
413  first_stream = false;
414 
415  /* Initialize LogstreamResult and buffers for processing messages */
419 
420  /* Initialize the last recv timestamp */
421  last_recv_timestamp = GetCurrentTimestamp();
422  ping_sent = false;
423 
424  /* Loop until end-of-streaming or error */
425  for (;;)
426  {
427  char *buf;
428  int len;
429  bool endofwal = false;
430  pgsocket wait_fd = PGINVALID_SOCKET;
431  int rc;
432 
433  /*
434  * Exit walreceiver if we're not in recovery. This should not
435  * happen, but cross-check the status here.
436  */
437  if (!RecoveryInProgress())
438  ereport(FATAL,
439  (errmsg("cannot continue WAL streaming, recovery has already ended")));
440 
441  /* Process any requests or signals received recently */
443 
444  if (got_SIGHUP)
445  {
446  got_SIGHUP = false;
449  }
450 
451  /* See if we can read data immediately */
452  len = walrcv_receive(wrconn, &buf, &wait_fd);
453  if (len != 0)
454  {
455  /*
456  * Process the received data, and any subsequent data we
457  * can read without blocking.
458  */
459  for (;;)
460  {
461  if (len > 0)
462  {
463  /*
464  * Something was received from primary, so reset
465  * timeout
466  */
467  last_recv_timestamp = GetCurrentTimestamp();
468  ping_sent = false;
469  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
470  }
471  else if (len == 0)
472  break;
473  else if (len < 0)
474  {
475  ereport(LOG,
476  (errmsg("replication terminated by primary server"),
477  errdetail("End of WAL reached on timeline %u at %X/%X.",
478  startpointTLI,
479  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
480  endofwal = true;
481  break;
482  }
483  len = walrcv_receive(wrconn, &buf, &wait_fd);
484  }
485 
486  /* Let the primary know that we received some data. */
487  XLogWalRcvSendReply(false, false);
488 
489  /*
490  * If we've written some records, flush them to disk and
491  * let the startup process and primary server know about
492  * them.
493  */
494  XLogWalRcvFlush(false);
495  }
496 
497  /* Check if we need to exit the streaming loop. */
498  if (endofwal)
499  break;
500 
501  /*
502  * Ideally we would reuse a WaitEventSet object repeatedly
503  * here to avoid the overheads of WaitLatchOrSocket on epoll
504  * systems, but we can't be sure that libpq (or any other
505  * walreceiver implementation) has the same socket (even if
506  * the fd is the same number, it may have been closed and
507  * reopened since the last time). In future, if there is a
508  * function for removing sockets from WaitEventSet, then we
509  * could add and remove just the socket each time, potentially
510  * avoiding some system calls.
511  */
512  Assert(wait_fd != PGINVALID_SOCKET);
513  rc = WaitLatchOrSocket(walrcv->latch,
516  wait_fd,
519  if (rc & WL_LATCH_SET)
520  {
521  ResetLatch(walrcv->latch);
523 
524  if (walrcv->force_reply)
525  {
526  /*
527  * The recovery process has asked us to send apply
528  * feedback now. Make sure the flag is really set to
529  * false in shared memory before sending the reply, so
530  * we don't miss a new request for a reply.
531  */
532  walrcv->force_reply = false;
534  XLogWalRcvSendReply(true, false);
535  }
536  }
537  if (rc & WL_TIMEOUT)
538  {
539  /*
540  * We didn't receive anything new. If we haven't heard
541  * anything from the server for more than
542  * wal_receiver_timeout / 2, ping the server. Also, if
543  * it's been longer than wal_receiver_status_interval
544  * since the last update we sent, send a status update to
545  * the primary anyway, to report any progress in applying
546  * WAL.
547  */
548  bool requestReply = false;
549 
550  /*
551  * Check if time since last receive from standby has
552  * reached the configured limit.
553  */
554  if (wal_receiver_timeout > 0)
555  {
557  TimestampTz timeout;
558 
559  timeout =
560  TimestampTzPlusMilliseconds(last_recv_timestamp,
562 
563  if (now >= timeout)
564  ereport(ERROR,
565  (errmsg("terminating walreceiver due to timeout")));
566 
567  /*
568  * We didn't receive anything new, for half of
569  * receiver replication timeout. Ping the server.
570  */
571  if (!ping_sent)
572  {
573  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
574  (wal_receiver_timeout / 2));
575  if (now >= timeout)
576  {
577  requestReply = true;
578  ping_sent = true;
579  }
580  }
581  }
582 
583  XLogWalRcvSendReply(requestReply, requestReply);
585  }
586  }
587 
588  /*
589  * The backend finished streaming. Exit streaming COPY-mode from
590  * our side, too.
591  */
592  walrcv_endstreaming(wrconn, &primaryTLI);
593 
594  /*
595  * If the server had switched to a new timeline that we didn't
596  * know about when we began streaming, fetch its timeline history
597  * file now.
598  */
599  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
600  }
601  else
602  ereport(LOG,
603  (errmsg("primary server contains no more WAL on requested timeline %u",
604  startpointTLI)));
605 
606  /*
607  * End of WAL reached on the requested timeline. Close the last
608  * segment, and await for new orders from the startup process.
609  */
610  if (recvFile >= 0)
611  {
612  char xlogfname[MAXFNAMELEN];
613 
614  XLogWalRcvFlush(false);
616  if (close(recvFile) != 0)
617  ereport(PANIC,
619  errmsg("could not close log segment %s: %m",
620  xlogfname)));
621 
622  /*
623  * Create .done file forcibly to prevent the streamed segment from
624  * being archived later.
625  */
627  XLogArchiveForceDone(xlogfname);
628  else
629  XLogArchiveNotify(xlogfname);
630  }
631  recvFile = -1;
632 
633  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
634  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
635  }
636  /* not reached */
637 }
int sender_port
Definition: walreceiver.h:117
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:724
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:402
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:414
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
uint32 TimeLineID
Definition: xlogdefs.h:52
slock_t mutex
Definition: walreceiver.h:143
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:117
#define SIGUSR1
Definition: win32_port.h:171
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
#define SIGCHLD
Definition: win32_port.h:169
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:643
sig_atomic_t force_reply
Definition: walreceiver.h:158
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:406
WalRcvState walRcvState
Definition: walreceiver.h:64
static StringInfoData incoming_message
Definition: walreceiver.c:125
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1135
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:416
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:157
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:811
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:412
union WalRcvStreamOptions::@104 proto
void proc_exit(int code)
Definition: ipc.c:104
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:835
#define SIGPIPE
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:172
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:8076
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:420
#define PANIC
Definition: elog.h:53
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:96
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
void ResetLatch(Latch *latch)
Definition: latch.c:588
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:89
Latch procLatch
Definition: proc.h:121
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:151
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:124
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:466
void pfree(void *pointer)
Definition: mcxt.c:1057
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:415
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:43
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11500
pid_t pid
Definition: walreceiver.h:63
#define MAXCONNINFO
Definition: walreceiver.h:36
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
static char * buf
Definition: pg_test_fsync.c:68
int XLogArchiveMode
Definition: xlog.c:95
int errdetail(const char *fmt,...)
Definition: elog.c:954
static struct @24 LogstreamResult
int errcode_for_file_access(void)
Definition: elog.c:633
#define SIGHUP
Definition: win32_port.h:159
XLogRecPtr startpoint
Definition: walreceiver.h:168
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
unsigned int uint32
Definition: c.h:375
int pgsocket
Definition: port.h:31
sigset_t UnBlockSig
Definition: pqsignal.c:22
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:111
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
static TimeLineID recvFileTLI
Definition: walreceiver.c:104
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:422
Definition: guc.h:72
char * cluster_name
Definition: guc.c:555
Latch * latch
Definition: walreceiver.h:141
#define SIG_IGN
Definition: win32_port.h:156
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static XLogSegNo recvSegNo
Definition: walreceiver.c:105
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:516
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:94
#define PGINVALID_SOCKET
Definition: port.h:33
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1066
bool ready_to_display
Definition: walreceiver.h:132
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:819
TimeLineID ThisTimeLineID
Definition: xlog.c:192
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:144
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:145
#define SIG_DFL
Definition: win32_port.h:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:746
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define SIGALRM
Definition: win32_port.h:165
bool is_temp_slot
Definition: walreceiver.h:129
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1007
WalRcvData * WalRcv
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4913
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:116
int errmsg(const char *fmt,...)
Definition: elog.c:821
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:780
#define elog(elevel,...)
Definition: elog.h:214
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:404
struct WalRcvStreamOptions::@104::@105 physical
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:73
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:551
static int recvFile
Definition: walreceiver.c:103
#define snprintf
Definition: port.h:215
#define WL_LATCH_SET
Definition: latch.h:124
#define UINT64_FORMAT
Definition: c.h:418
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
char slotname[NAMEDATALEN]
Definition: walreceiver.h:123
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:110
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:398

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 1007 of file walreceiver.c.

References AllowCascadeReplication, WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

1008 {
1009  if (LogstreamResult.Flush < LogstreamResult.Write)
1010  {
1011  WalRcvData *walrcv = WalRcv;
1012 
1014 
1015  LogstreamResult.Flush = LogstreamResult.Write;
1016 
1017  /* Update shared-memory status */
1018  SpinLockAcquire(&walrcv->mutex);
1019  if (walrcv->flushedUpto < LogstreamResult.Flush)
1020  {
1021  walrcv->latestChunkStart = walrcv->flushedUpto;
1022  walrcv->flushedUpto = LogstreamResult.Flush;
1023  walrcv->receivedTLI = ThisTimeLineID;
1024  }
1025  SpinLockRelease(&walrcv->mutex);
1026 
1027  /* Signal the startup process and walsender that new WAL has arrived */
1028  WakeupRecovery();
1030  WalSndWakeup();
1031 
1032  /* Report XLOG streaming progress in PS display */
1034  {
1035  char activitymsg[50];
1036 
1037  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1038  (uint32) (LogstreamResult.Write >> 32),
1039  (uint32) LogstreamResult.Write);
1040  set_ps_display(activitymsg);
1041  }
1042 
1043  /* Also let the primary know that we made some progress */
1044  if (!dying)
1045  {
1046  XLogWalRcvSendReply(false, false);
1047  XLogWalRcvSendHSFeedback(false);
1048  }
1049  }
1050 }
slock_t mutex
Definition: walreceiver.h:143
bool update_process_title
Definition: ps_status.c:36
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10428
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1135
TimeLineID receivedTLI
Definition: walreceiver.h:84
void set_ps_display(const char *activity)
Definition: ps_status.c:349
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12675
XLogRecPtr latestChunkStart
Definition: walreceiver.h:92
static struct @24 LogstreamResult
#define AllowCascadeReplication()
Definition: walreceiver.h:39
unsigned int uint32
Definition: c.h:375
static XLogSegNo recvSegNo
Definition: walreceiver.c:105
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1066
TimeLineID ThisTimeLineID
Definition: xlog.c:192
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:103
XLogRecPtr flushedUpto
Definition: walreceiver.h:83
#define snprintf
Definition: port.h:215
void WalSndWakeup(void)
Definition: walsender.c:3120

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 835 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

836 {
837  int hdrlen;
838  XLogRecPtr dataStart;
839  XLogRecPtr walEnd;
840  TimestampTz sendTime;
841  bool replyRequested;
842 
844 
845  switch (type)
846  {
847  case 'w': /* WAL records */
848  {
849  /* copy message to StringInfo */
850  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
851  if (len < hdrlen)
852  ereport(ERROR,
853  (errcode(ERRCODE_PROTOCOL_VIOLATION),
854  errmsg_internal("invalid WAL message received from primary")));
856 
857  /* read the fields */
858  dataStart = pq_getmsgint64(&incoming_message);
859  walEnd = pq_getmsgint64(&incoming_message);
860  sendTime = pq_getmsgint64(&incoming_message);
861  ProcessWalSndrMessage(walEnd, sendTime);
862 
863  buf += hdrlen;
864  len -= hdrlen;
865  XLogWalRcvWrite(buf, len, dataStart);
866  break;
867  }
868  case 'k': /* Keepalive */
869  {
870  /* copy message to StringInfo */
871  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
872  if (len != hdrlen)
873  ereport(ERROR,
874  (errcode(ERRCODE_PROTOCOL_VIOLATION),
875  errmsg_internal("invalid keepalive message received from primary")));
877 
878  /* read the fields */
879  walEnd = pq_getmsgint64(&incoming_message);
880  sendTime = pq_getmsgint64(&incoming_message);
881  replyRequested = pq_getmsgbyte(&incoming_message);
882 
883  ProcessWalSndrMessage(walEnd, sendTime);
884 
885  /* If the primary requested a reply, send one immediately */
886  if (replyRequested)
887  XLogWalRcvSendReply(true, false);
888  break;
889  }
890  default:
891  ereport(ERROR,
892  (errcode(ERRCODE_PROTOCOL_VIOLATION),
893  errmsg_internal("invalid replication message type %d",
894  type)));
895  }
896 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1236
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:125
int errcode(int sqlerrcode)
Definition: elog.c:610
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:68
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1066
#define ereport(elevel,...)
Definition: elog.h:144
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int errmsg_internal(const char *fmt,...)
Definition: elog.c:908
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:902

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1135 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReadNextFullTransactionId(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, wal_receiver_status_interval, walrcv_send, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

1136 {
1137  TimestampTz now;
1138  FullTransactionId nextFullXid;
1139  TransactionId nextXid;
1140  uint32 xmin_epoch,
1141  catalog_xmin_epoch;
1142  TransactionId xmin,
1143  catalog_xmin;
1144  static TimestampTz sendTime = 0;
1145 
1146  /* initially true so we always send at least one feedback message */
1147  static bool primary_has_standby_xmin = true;
1148 
1149  /*
1150  * If the user doesn't want status to be reported to the primary, be sure
1151  * to exit before doing anything at all.
1152  */
1154  !primary_has_standby_xmin)
1155  return;
1156 
1157  /* Get current timestamp. */
1158  now = GetCurrentTimestamp();
1159 
1160  if (!immed)
1161  {
1162  /*
1163  * Send feedback at most once per wal_receiver_status_interval.
1164  */
1165  if (!TimestampDifferenceExceeds(sendTime, now,
1167  return;
1168  sendTime = now;
1169  }
1170 
1171  /*
1172  * If Hot Standby is not yet accepting connections there is nothing to
1173  * send. Check this after the interval has expired to reduce number of
1174  * calls.
1175  *
1176  * Bailing out here also ensures that we don't send feedback until we've
1177  * read our own replication slot state, so we don't tell the primary to
1178  * discard needed xmin or catalog_xmin from any slots that may exist on
1179  * this replica.
1180  */
1181  if (!HotStandbyActive())
1182  return;
1183 
1184  /*
1185  * Make the expensive call to get the oldest xmin once we are certain
1186  * everything else has been checked.
1187  */
1189  {
1190  GetReplicationHorizons(&xmin, &catalog_xmin);
1191  }
1192  else
1193  {
1194  xmin = InvalidTransactionId;
1195  catalog_xmin = InvalidTransactionId;
1196  }
1197 
1198  /*
1199  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1200  * the epoch boundary.
1201  */
1202  nextFullXid = ReadNextFullTransactionId();
1203  nextXid = XidFromFullTransactionId(nextFullXid);
1204  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1205  catalog_xmin_epoch = xmin_epoch;
1206  if (nextXid < xmin)
1207  xmin_epoch--;
1208  if (nextXid < catalog_xmin)
1209  catalog_xmin_epoch--;
1210 
1211  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1212  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1213 
1214  /* Construct the message and send it. */
1216  pq_sendbyte(&reply_message, 'h');
1218  pq_sendint32(&reply_message, xmin);
1219  pq_sendint32(&reply_message, xmin_epoch);
1220  pq_sendint32(&reply_message, catalog_xmin);
1221  pq_sendint32(&reply_message, catalog_xmin_epoch);
1223  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1224  primary_has_standby_xmin = true;
1225  else
1226  primary_has_standby_xmin = false;
1227 }
bool hot_standby_feedback
Definition: walreceiver.c:90
uint32 TransactionId
Definition: c.h:521
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
bool HotStandbyActive(void)
Definition: xlog.c:8150
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:88
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1677
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static StringInfoData reply_message
Definition: walreceiver.c:124
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:375
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:1946
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:418
#define elog(elevel,...)
Definition: elog.h:214
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1066 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, now(), pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1067 {
1068  static XLogRecPtr writePtr = 0;
1069  static XLogRecPtr flushPtr = 0;
1070  XLogRecPtr applyPtr;
1071  static TimestampTz sendTime = 0;
1072  TimestampTz now;
1073 
1074  /*
1075  * If the user doesn't want status to be reported to the primary, be sure
1076  * to exit before doing anything at all.
1077  */
1078  if (!force && wal_receiver_status_interval <= 0)
1079  return;
1080 
1081  /* Get current timestamp. */
1082  now = GetCurrentTimestamp();
1083 
1084  /*
1085  * We can compare the write and flush positions to the last message we
1086  * sent without taking any lock, but the apply position requires a spin
1087  * lock, so we don't check that unless something else has changed or 10
1088  * seconds have passed. This means that the apply WAL location will
1089  * appear, from the primary's point of view, to lag slightly, but since
1090  * this is only for reporting purposes and only on idle systems, that's
1091  * probably OK.
1092  */
1093  if (!force
1094  && writePtr == LogstreamResult.Write
1095  && flushPtr == LogstreamResult.Flush
1096  && !TimestampDifferenceExceeds(sendTime, now,
1098  return;
1099  sendTime = now;
1100 
1101  /* Construct a new message */
1102  writePtr = LogstreamResult.Write;
1103  flushPtr = LogstreamResult.Flush;
1104  applyPtr = GetXLogReplayRecPtr(NULL);
1105 
1107  pq_sendbyte(&reply_message, 'r');
1108  pq_sendint64(&reply_message, writePtr);
1109  pq_sendint64(&reply_message, flushPtr);
1110  pq_sendint64(&reply_message, applyPtr);
1112  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1113 
1114  /* Send it */
1115  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1116  (uint32) (writePtr >> 32), (uint32) writePtr,
1117  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1118  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1119  requestReply ? " (reply requested)" : "");
1120 
1122 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:88
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1677
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:124
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11500
#define DEBUG2
Definition: elog.h:24
static struct @24 LogstreamResult
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:375
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:418
#define elog(elevel,...)
Definition: elog.h:214
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 902 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite(), recvFile, recvFileTLI, recvSegNo, ThisTimeLineID, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogSegmentOffset, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

903 {
904  int startoff;
905  int byteswritten;
906 
907  while (nbytes > 0)
908  {
909  int segbytes;
910 
911  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
912  {
913  bool use_existent;
914 
915  /*
916  * fsync() and close current file before we switch to next one. We
917  * would otherwise have to reopen this file to fsync it later
918  */
919  if (recvFile >= 0)
920  {
921  char xlogfname[MAXFNAMELEN];
922 
923  XLogWalRcvFlush(false);
924 
926 
927  /*
928  * XLOG segment files will be re-read by recovery in startup
929  * process soon, so we don't advise the OS to release cache
930  * pages associated with the file like XLogFileClose() does.
931  */
932  if (close(recvFile) != 0)
933  ereport(PANIC,
935  errmsg("could not close log segment %s: %m",
936  xlogfname)));
937 
938  /*
939  * Create .done file forcibly to prevent the streamed segment
940  * from being archived later.
941  */
943  XLogArchiveForceDone(xlogfname);
944  else
945  XLogArchiveNotify(xlogfname);
946  }
947  recvFile = -1;
948 
949  /* Create/use new log file */
951  use_existent = true;
952  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
954  }
955 
956  /* Calculate the start offset of the received logs */
957  startoff = XLogSegmentOffset(recptr, wal_segment_size);
958 
959  if (startoff + nbytes > wal_segment_size)
960  segbytes = wal_segment_size - startoff;
961  else
962  segbytes = nbytes;
963 
964  /* OK to write the logs */
965  errno = 0;
966 
967  byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
968  if (byteswritten <= 0)
969  {
970  char xlogfname[MAXFNAMELEN];
971  int save_errno;
972 
973  /* if write didn't set errno, assume no disk space */
974  if (errno == 0)
975  errno = ENOSPC;
976 
977  save_errno = errno;
979  errno = save_errno;
980  ereport(PANIC,
982  errmsg("could not write to log segment %s "
983  "at offset %u, length %lu: %m",
984  xlogfname, startoff, (unsigned long) segbytes)));
985  }
986 
987  /* Update state for write */
988  recptr += byteswritten;
989 
990  nbytes -= byteswritten;
991  buf += byteswritten;
992 
993  LogstreamResult.Write = recptr;
994  }
995 
996  /* Update shared-memory status */
998 }
int wal_segment_size
Definition: xlog.c:117
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3254
#define PANIC
Definition: elog.h:53
ssize_t pg_pwrite(int fd, const void *buf, size_t nbyte, off_t offset)
Definition: pwrite.c:27
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:151
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:466
static char * buf
Definition: pg_test_fsync.c:68
int XLogArchiveMode
Definition: xlog.c:95
static struct @24 LogstreamResult
int errcode_for_file_access(void)
Definition: elog.c:633
static TimeLineID recvFileTLI
Definition: walreceiver.c:104
static XLogSegNo recvSegNo
Definition: walreceiver.c:105
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:516
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
TimeLineID ThisTimeLineID
Definition: xlog.c:192
#define ereport(elevel,...)
Definition: elog.h:144
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1007
WalRcvData * WalRcv
int errmsg(const char *fmt,...)
Definition: elog.c:821
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:103
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 121 of file walreceiver.c.

◆ got_SIGHUP

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 111 of file walreceiver.c.

Referenced by WalRcvSigHupHandler(), and WalReceiverMain().

◆ got_SIGTERM

volatile sig_atomic_t got_SIGTERM = false
static

Definition at line 112 of file walreceiver.c.

Referenced by ProcessWalRcvInterrupts(), and WalRcvShutdownHandler().

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 90 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 125 of file walreceiver.c.

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 103 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 105 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

Definition at line 124 of file walreceiver.c.

Referenced by send_feedback().

◆ wal_receiver_status_interval

int wal_receiver_status_interval

Definition at line 88 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ wal_receiver_timeout

int wal_receiver_timeout

Definition at line 89 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 94 of file walreceiver.c.

Referenced by _PG_init().

◆ wrconn

WalReceiverConn* wrconn = NULL
static

Definition at line 93 of file walreceiver.c.

◆ Write

XLogRecPtr Write

Definition at line 120 of file walreceiver.c.