PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvSigHupHandler (SIGNAL_ARGS)
 
static void WalRcvShutdownHandler (SIGNAL_ARGS)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConnwrconn = NULL
 
WalReceiverFunctionsTypeWalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t got_SIGTERM = false
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 96 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1341 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MAXCONNINFO, MemSet, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, NI_MAXHOST, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

1342 {
1343  TupleDesc tupdesc;
1344  Datum *values;
1345  bool *nulls;
1346  int pid;
1347  bool ready_to_display;
1349  XLogRecPtr receive_start_lsn;
1350  TimeLineID receive_start_tli;
1351  XLogRecPtr written_lsn;
1352  XLogRecPtr flushed_lsn;
1353  TimeLineID received_tli;
1354  TimestampTz last_send_time;
1355  TimestampTz last_receipt_time;
1356  XLogRecPtr latest_end_lsn;
1357  TimestampTz latest_end_time;
1358  char sender_host[NI_MAXHOST];
1359  int sender_port = 0;
1360  char slotname[NAMEDATALEN];
1361  char conninfo[MAXCONNINFO];
1362 
1363  /* Take a lock to ensure value consistency */
1365  pid = (int) WalRcv->pid;
1366  ready_to_display = WalRcv->ready_to_display;
1367  state = WalRcv->walRcvState;
1368  receive_start_lsn = WalRcv->receiveStart;
1369  receive_start_tli = WalRcv->receiveStartTLI;
1370  written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1371  flushed_lsn = WalRcv->flushedUpto;
1372  received_tli = WalRcv->receivedTLI;
1373  last_send_time = WalRcv->lastMsgSendTime;
1374  last_receipt_time = WalRcv->lastMsgReceiptTime;
1375  latest_end_lsn = WalRcv->latestWalEnd;
1376  latest_end_time = WalRcv->latestWalEndTime;
1377  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1378  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1379  sender_port = WalRcv->sender_port;
1380  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1382 
1383  /*
1384  * No WAL receiver (or not ready yet), just return a tuple with NULL
1385  * values
1386  */
1387  if (pid == 0 || !ready_to_display)
1388  PG_RETURN_NULL();
1389 
1390  /* determine result type */
1391  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1392  elog(ERROR, "return type must be a row type");
1393 
1394  values = palloc0(sizeof(Datum) * tupdesc->natts);
1395  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1396 
1397  /* Fetch values */
1398  values[0] = Int32GetDatum(pid);
1399 
1400  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1401  {
1402  /*
1403  * Only superusers and members of pg_read_all_stats can see details.
1404  * Other users only get the pid value to know whether it is a WAL
1405  * receiver, but no details.
1406  */
1407  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1408  }
1409  else
1410  {
1411  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1412 
1413  if (XLogRecPtrIsInvalid(receive_start_lsn))
1414  nulls[2] = true;
1415  else
1416  values[2] = LSNGetDatum(receive_start_lsn);
1417  values[3] = Int32GetDatum(receive_start_tli);
1418  if (XLogRecPtrIsInvalid(written_lsn))
1419  nulls[4] = true;
1420  else
1421  values[4] = LSNGetDatum(written_lsn);
1422  if (XLogRecPtrIsInvalid(flushed_lsn))
1423  nulls[5] = true;
1424  else
1425  values[5] = LSNGetDatum(flushed_lsn);
1426  values[6] = Int32GetDatum(received_tli);
1427  if (last_send_time == 0)
1428  nulls[7] = true;
1429  else
1430  values[7] = TimestampTzGetDatum(last_send_time);
1431  if (last_receipt_time == 0)
1432  nulls[8] = true;
1433  else
1434  values[8] = TimestampTzGetDatum(last_receipt_time);
1435  if (XLogRecPtrIsInvalid(latest_end_lsn))
1436  nulls[9] = true;
1437  else
1438  values[9] = LSNGetDatum(latest_end_lsn);
1439  if (latest_end_time == 0)
1440  nulls[10] = true;
1441  else
1442  values[10] = TimestampTzGetDatum(latest_end_time);
1443  if (*slotname == '\0')
1444  nulls[11] = true;
1445  else
1446  values[11] = CStringGetTextDatum(slotname);
1447  if (*sender_host == '\0')
1448  nulls[12] = true;
1449  else
1450  values[12] = CStringGetTextDatum(sender_host);
1451  if (sender_port == 0)
1452  nulls[13] = true;
1453  else
1454  values[13] = Int32GetDatum(sender_port);
1455  if (*conninfo == '\0')
1456  nulls[14] = true;
1457  else
1458  values[14] = CStringGetTextDatum(conninfo);
1459  }
1460 
1461  /* Returns the record as Datum */
1462  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1463 }
int sender_port
Definition: walreceiver.h:117
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1316
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:205
uint32 TimeLineID
Definition: xlogdefs.h:52
slock_t mutex
Definition: walreceiver.h:143
Oid GetUserId(void)
Definition: miscinit.c:448
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:64
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:84
#define MemSet(start, val, len)
Definition: c.h:971
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
WalRcvState
Definition: walreceiver.h:44
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:151
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:43
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:63
#define MAXCONNINFO
Definition: walreceiver.h:36
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:980
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:352
bool ready_to_display
Definition: walreceiver.h:132
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4916
XLogRecPtr latestWalEnd
Definition: walreceiver.h:103
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define Int32GetDatum(X)
Definition: postgres.h:479
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:116
#define elog(elevel,...)
Definition: elog.h:214
#define CStringGetTextDatum(s)
Definition: builtins.h:87
XLogRecPtr receiveStart
Definition: walreceiver.h:73
XLogRecPtr flushedUpto
Definition: walreceiver.h:83
#define PG_RETURN_NULL()
Definition: fmgr.h:344
char slotname[NAMEDATALEN]
Definition: walreceiver.h:123
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:110

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 157 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and got_SIGTERM.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), walrcv_clear_result(), WalRcvWaitForStartPosition(), and WalReceiverMain().

158 {
159  /*
160  * Although walreceiver interrupt handling doesn't use the same scheme as
161  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
162  * any incoming signals on Win32, and also to make sure we process any
163  * barrier events.
164  */
166 
167  if (got_SIGTERM)
168  {
169  ereport(FATAL,
170  (errcode(ERRCODE_ADMIN_SHUTDOWN),
171  errmsg("terminating walreceiver process due to administrator command")));
172  }
173 }
int errcode(int sqlerrcode)
Definition: elog.c:610
#define FATAL
Definition: elog.h:52
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:112

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1245 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, log_min_messages, WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1246 {
1247  WalRcvData *walrcv = WalRcv;
1248 
1249  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1250 
1251  /* Update shared-memory status */
1252  SpinLockAcquire(&walrcv->mutex);
1253  if (walrcv->latestWalEnd < walEnd)
1254  walrcv->latestWalEndTime = sendTime;
1255  walrcv->latestWalEnd = walEnd;
1256  walrcv->lastMsgSendTime = sendTime;
1257  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1258  SpinLockRelease(&walrcv->mutex);
1259 
1260  if (log_min_messages <= DEBUG2)
1261  {
1262  char *sendtime;
1263  char *receipttime;
1264  int applyDelay;
1265 
1266  /* Copy because timestamptz_to_str returns a static buffer */
1267  sendtime = pstrdup(timestamptz_to_str(sendTime));
1268  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1269  applyDelay = GetReplicationApplyDelay();
1270 
1271  /* apply delay is not available */
1272  if (applyDelay == -1)
1273  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1274  sendtime,
1275  receipttime,
1277  else
1278  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1279  sendtime,
1280  receipttime,
1281  applyDelay,
1283 
1284  pfree(sendtime);
1285  pfree(receipttime);
1286  }
1287 }
slock_t mutex
Definition: walreceiver.h:143
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1186
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1056
#define DEBUG2
Definition: elog.h:24
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
int log_min_messages
Definition: guc.c:543
XLogRecPtr latestWalEnd
Definition: walreceiver.h:103
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:214
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1736

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 774 of file walreceiver.c.

References Assert, WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

775 {
776  WalRcvData *walrcv = WalRcv;
777 
778  /* Ensure that all WAL records received are flushed to disk */
779  XLogWalRcvFlush(true);
780 
781  /* Mark ourselves inactive in shared memory */
782  SpinLockAcquire(&walrcv->mutex);
783  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
784  walrcv->walRcvState == WALRCV_RESTARTING ||
785  walrcv->walRcvState == WALRCV_STARTING ||
786  walrcv->walRcvState == WALRCV_WAITING ||
787  walrcv->walRcvState == WALRCV_STOPPING);
788  Assert(walrcv->pid == MyProcPid);
789  walrcv->walRcvState = WALRCV_STOPPED;
790  walrcv->pid = 0;
791  walrcv->ready_to_display = false;
792  walrcv->latch = NULL;
793  SpinLockRelease(&walrcv->mutex);
794 
795  /* Terminate the connection gracefully. */
796  if (wrconn != NULL)
798 
799  /* Wake up the startup process to notice promptly that we're gone */
800  WakeupRecovery();
801 }
int MyProcPid
Definition: globals.c:40
slock_t mutex
Definition: walreceiver.h:143
WalRcvState walRcvState
Definition: walreceiver.h:64
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12660
pid_t pid
Definition: walreceiver.h:63
Latch * latch
Definition: walreceiver.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:132
#define Assert(condition)
Definition: c.h:738
#define walrcv_disconnect(conn)
Definition: walreceiver.h:300
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1001
WalRcvData * WalRcv

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 727 of file walreceiver.c.

References ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, and writeTimeLineHistoryFile().

Referenced by WalReceiverMain().

728 {
729  TimeLineID tli;
730 
731  for (tli = first; tli <= last; tli++)
732  {
733  /* there's no history file for timeline 1 */
734  if (tli != 1 && !existsTimeLineHistory(tli))
735  {
736  char *fname;
737  char *content;
738  int len;
739  char expectedfname[MAXFNAMELEN];
740 
741  ereport(LOG,
742  (errmsg("fetching timeline history file for timeline %u from primary server",
743  tli)));
744 
745  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
746 
747  /*
748  * Check that the filename on the master matches what we
749  * calculated ourselves. This is just a sanity check, it should
750  * always match.
751  */
752  TLHistoryFileName(expectedfname, tli);
753  if (strcmp(fname, expectedfname) != 0)
754  ereport(ERROR,
755  (errcode(ERRCODE_PROTOCOL_VIOLATION),
756  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
757  tli)));
758 
759  /*
760  * Write the file to pg_wal.
761  */
762  writeTimeLineHistoryFile(tli, content, len);
763 
764  pfree(fname);
765  pfree(content);
766  }
767  }
768 }
uint32 TimeLineID
Definition: xlogdefs.h:52
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
void pfree(void *pointer)
Definition: mcxt.c:1056
#define ERROR
Definition: elog.h:43
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:467
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg_internal(const char *fmt,...)
Definition: elog.c:911
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:284
int errmsg(const char *fmt,...)
Definition: elog.c:824

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1298 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

1299 {
1300  Latch *latch;
1301 
1302  WalRcv->force_reply = true;
1303  /* fetching the latch pointer might not be atomic, so use spinlock */
1305  latch = WalRcv->latch;
1307  if (latch)
1308  SetLatch(latch);
1309 }
slock_t mutex
Definition: walreceiver.h:143
sig_atomic_t force_reply
Definition: walreceiver.h:158
void SetLatch(Latch *latch)
Definition: latch.c:457
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: latch.h:110
Latch * latch
Definition: walreceiver.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1316 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1317 {
1318  switch (state)
1319  {
1320  case WALRCV_STOPPED:
1321  return "stopped";
1322  case WALRCV_STARTING:
1323  return "starting";
1324  case WALRCV_STREAMING:
1325  return "streaming";
1326  case WALRCV_WAITING:
1327  return "waiting";
1328  case WALRCV_RESTARTING:
1329  return "restarting";
1330  case WALRCV_STOPPING:
1331  return "stopping";
1332  }
1333  return "UNKNOWN";
1334 }
Definition: regguts.h:298

◆ WalRcvShutdownHandler()

static void WalRcvShutdownHandler ( SIGNAL_ARGS  )
static

Definition at line 813 of file walreceiver.c.

References got_SIGTERM, WalRcvData::latch, SetLatch(), and WalRcv.

Referenced by WalReceiverMain().

814 {
815  int save_errno = errno;
816 
817  got_SIGTERM = true;
818 
819  if (WalRcv->latch)
821 
822  errno = save_errno;
823 }
void SetLatch(Latch *latch)
Definition: latch.c:457
Latch * latch
Definition: walreceiver.h:141
WalRcvData * WalRcv
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:112

◆ WalRcvSigHupHandler()

static void WalRcvSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 805 of file walreceiver.c.

References got_SIGHUP.

Referenced by WalReceiverMain().

806 {
807  got_SIGHUP = true;
808 }
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:111

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 646 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, WalRcvData::latch, WalRcvData::mutex, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

647 {
648  WalRcvData *walrcv = WalRcv;
649  int state;
650 
651  SpinLockAcquire(&walrcv->mutex);
652  state = walrcv->walRcvState;
653  if (state != WALRCV_STREAMING)
654  {
655  SpinLockRelease(&walrcv->mutex);
656  if (state == WALRCV_STOPPING)
657  proc_exit(0);
658  else
659  elog(FATAL, "unexpected walreceiver state");
660  }
661  walrcv->walRcvState = WALRCV_WAITING;
663  walrcv->receiveStartTLI = 0;
664  SpinLockRelease(&walrcv->mutex);
665 
666  set_ps_display("idle");
667 
668  /*
669  * nudge startup process to notice that we've stopped streaming and are
670  * now waiting for instructions.
671  */
672  WakeupRecovery();
673  for (;;)
674  {
675  ResetLatch(walrcv->latch);
676 
678 
679  SpinLockAcquire(&walrcv->mutex);
680  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
681  walrcv->walRcvState == WALRCV_WAITING ||
682  walrcv->walRcvState == WALRCV_STOPPING);
683  if (walrcv->walRcvState == WALRCV_RESTARTING)
684  {
685  /*
686  * No need to handle changes in primary_conninfo or
687  * primary_slotname here. Startup process will signal us to
688  * terminate in case those change.
689  */
690  *startpoint = walrcv->receiveStart;
691  *startpointTLI = walrcv->receiveStartTLI;
692  walrcv->walRcvState = WALRCV_STREAMING;
693  SpinLockRelease(&walrcv->mutex);
694  break;
695  }
696  if (walrcv->walRcvState == WALRCV_STOPPING)
697  {
698  /*
699  * We should've received SIGTERM if the startup process wants us
700  * to die, but might as well check it here too.
701  */
702  SpinLockRelease(&walrcv->mutex);
703  exit(1);
704  }
705  SpinLockRelease(&walrcv->mutex);
706 
707  (void) WaitLatch(walrcv->latch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
709  }
710 
712  {
713  char activitymsg[50];
714 
715  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
716  (uint32) (*startpoint >> 32),
717  (uint32) *startpoint);
718  set_ps_display(activitymsg);
719  }
720 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:143
bool update_process_title
Definition: ps_status.c:36
WalRcvState walRcvState
Definition: walreceiver.h:64
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:157
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:540
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:365
void set_ps_display(const char *activity)
Definition: ps_status.c:349
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12660
#define FATAL
Definition: elog.h:52
unsigned int uint32
Definition: c.h:367
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
Latch * latch
Definition: walreceiver.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:738
Definition: regguts.h:298
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:214
XLogRecPtr receiveStart
Definition: walreceiver.h:73
#define snprintf
Definition: port.h:193
#define WL_LATCH_SET
Definition: latch.h:124
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 178 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, BlockSig, buf, close, cluster_name, WalRcvData::conninfo, DEBUG1, elog, ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, NI_MAXHOST, now(), on_shmem_exit(), options, PANIC, pfree(), pg_atomic_init_u64(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, procsignal_sigusr1_handler(), WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForCrashExit(), SIGPIPE, SIGQUIT, SIGUSR1, SIGUSR2, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf, SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain(), and walrcv_clear_result().

179 {
180  char conninfo[MAXCONNINFO];
181  char *tmp_conninfo;
182  char slotname[NAMEDATALEN];
183  bool is_temp_slot;
184  XLogRecPtr startpoint;
185  TimeLineID startpointTLI;
186  TimeLineID primaryTLI;
187  bool first_stream;
188  WalRcvData *walrcv = WalRcv;
189  TimestampTz last_recv_timestamp;
191  bool ping_sent;
192  char *err;
193  char *sender_host = NULL;
194  int sender_port = 0;
195 
196  /*
197  * WalRcv should be set up already (if we are a backend, we inherit this
198  * by fork() or EXEC_BACKEND mechanism from the postmaster).
199  */
200  Assert(walrcv != NULL);
201 
202  now = GetCurrentTimestamp();
203 
204  /*
205  * Mark walreceiver as running in shared memory.
206  *
207  * Do this as early as possible, so that if we fail later on, we'll set
208  * state to STOPPED. If we die before this, the startup process will keep
209  * waiting for us to start up, until it times out.
210  */
211  SpinLockAcquire(&walrcv->mutex);
212  Assert(walrcv->pid == 0);
213  switch (walrcv->walRcvState)
214  {
215  case WALRCV_STOPPING:
216  /* If we've already been requested to stop, don't start up. */
217  walrcv->walRcvState = WALRCV_STOPPED;
218  /* fall through */
219 
220  case WALRCV_STOPPED:
221  SpinLockRelease(&walrcv->mutex);
222  proc_exit(1);
223  break;
224 
225  case WALRCV_STARTING:
226  /* The usual case */
227  break;
228 
229  case WALRCV_WAITING:
230  case WALRCV_STREAMING:
231  case WALRCV_RESTARTING:
232  default:
233  /* Shouldn't happen */
234  SpinLockRelease(&walrcv->mutex);
235  elog(PANIC, "walreceiver still running according to shared memory state");
236  }
237  /* Advertise our PID so that the startup process can kill us */
238  walrcv->pid = MyProcPid;
239  walrcv->walRcvState = WALRCV_STREAMING;
240 
241  /* Fetch information required to start streaming */
242  walrcv->ready_to_display = false;
243  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
244  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
245  is_temp_slot = walrcv->is_temp_slot;
246  startpoint = walrcv->receiveStart;
247  startpointTLI = walrcv->receiveStartTLI;
248 
249  /*
250  * At most one of is_temp_slot and slotname can be set; otherwise,
251  * RequestXLogStreaming messed up.
252  */
253  Assert(!is_temp_slot || (slotname[0] == '\0'));
254 
255  /* Initialise to a sanish value */
256  walrcv->lastMsgSendTime =
257  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
258 
259  /* Report the latch to use to awaken this process */
260  walrcv->latch = &MyProc->procLatch;
261 
262  SpinLockRelease(&walrcv->mutex);
263 
265 
266  /* Arrange to clean up at walreceiver exit */
268 
269  /* Properly accept or ignore signals the postmaster might send us */
270  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
271  pqsignal(SIGINT, SIG_IGN);
272  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
278 
279  /* Reset some signals that are accepted by postmaster but not here */
281 
282  /* We allow SIGQUIT (quickdie) at all times */
283  sigdelset(&BlockSig, SIGQUIT);
284 
285  /* Load the libpq-specific functions */
286  load_file("libpqwalreceiver", false);
287  if (WalReceiverFunctions == NULL)
288  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
289 
290  /* Unblock signals (they were blocked when the postmaster forked us) */
292 
293  /* Establish the connection to the primary for XLOG streaming */
294  wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
295  if (!wrconn)
296  ereport(ERROR,
297  (errmsg("could not connect to the primary server: %s", err)));
298 
299  /*
300  * Save user-visible connection string. This clobbers the original
301  * conninfo, for security. Also save host and port of the sender server
302  * this walreceiver is connected to.
303  */
304  tmp_conninfo = walrcv_get_conninfo(wrconn);
305  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
306  SpinLockAcquire(&walrcv->mutex);
307  memset(walrcv->conninfo, 0, MAXCONNINFO);
308  if (tmp_conninfo)
309  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
310 
311  memset(walrcv->sender_host, 0, NI_MAXHOST);
312  if (sender_host)
313  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
314 
315  walrcv->sender_port = sender_port;
316  walrcv->ready_to_display = true;
317  SpinLockRelease(&walrcv->mutex);
318 
319  if (tmp_conninfo)
320  pfree(tmp_conninfo);
321 
322  if (sender_host)
323  pfree(sender_host);
324 
325  first_stream = true;
326  for (;;)
327  {
328  char *primary_sysid;
329  char standby_sysid[32];
331 
332  /*
333  * Check that we're connected to a valid server using the
334  * IDENTIFY_SYSTEM replication command.
335  */
336  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
337 
338  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
340  if (strcmp(primary_sysid, standby_sysid) != 0)
341  {
342  ereport(ERROR,
343  (errmsg("database system identifier differs between the primary and standby"),
344  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
345  primary_sysid, standby_sysid)));
346  }
347 
348  /*
349  * Confirm that the current timeline of the primary is the same or
350  * ahead of ours.
351  */
352  if (primaryTLI < startpointTLI)
353  ereport(ERROR,
354  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
355  primaryTLI, startpointTLI)));
356 
357  /*
358  * Get any missing history files. We do this always, even when we're
359  * not interested in that timeline, so that if we're promoted to
360  * become the master later on, we don't select the same timeline that
361  * was already used in the current master. This isn't bullet-proof -
362  * you'll need some external software to manage your cluster if you
363  * need to ensure that a unique timeline id is chosen in every case,
364  * but let's avoid the confusion of timeline id collisions where we
365  * can.
366  */
367  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
368 
369  /*
370  * Create temporary replication slot if requested, and update slot
371  * name in shared memory. (Note the slot name cannot already be set
372  * in this case.)
373  */
374  if (is_temp_slot)
375  {
376  snprintf(slotname, sizeof(slotname),
377  "pg_walreceiver_%lld",
378  (long long int) walrcv_get_backend_pid(wrconn));
379 
380  walrcv_create_slot(wrconn, slotname, true, 0, NULL);
381 
382  SpinLockAcquire(&walrcv->mutex);
383  strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
384  SpinLockRelease(&walrcv->mutex);
385  }
386 
387  /*
388  * Start streaming.
389  *
390  * We'll try to start at the requested starting point and timeline,
391  * even if it's different from the server's latest timeline. In case
392  * we've already reached the end of the old timeline, the server will
393  * finish the streaming immediately, and we will go back to await
394  * orders from the startup process. If recovery_target_timeline is
395  * 'latest', the startup process will scan pg_wal and find the new
396  * history file, bump recovery target timeline, and ask us to restart
397  * on the new timeline.
398  */
399  options.logical = false;
400  options.startpoint = startpoint;
401  options.slotname = slotname[0] != '\0' ? slotname : NULL;
402  options.proto.physical.startpointTLI = startpointTLI;
403  ThisTimeLineID = startpointTLI;
404  if (walrcv_startstreaming(wrconn, &options))
405  {
406  if (first_stream)
407  ereport(LOG,
408  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
409  (uint32) (startpoint >> 32), (uint32) startpoint,
410  startpointTLI)));
411  else
412  ereport(LOG,
413  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
414  (uint32) (startpoint >> 32), (uint32) startpoint,
415  startpointTLI)));
416  first_stream = false;
417 
418  /* Initialize LogstreamResult and buffers for processing messages */
422 
423  /* Initialize the last recv timestamp */
424  last_recv_timestamp = GetCurrentTimestamp();
425  ping_sent = false;
426 
427  /* Loop until end-of-streaming or error */
428  for (;;)
429  {
430  char *buf;
431  int len;
432  bool endofwal = false;
433  pgsocket wait_fd = PGINVALID_SOCKET;
434  int rc;
435 
436  /*
437  * Exit walreceiver if we're not in recovery. This should not
438  * happen, but cross-check the status here.
439  */
440  if (!RecoveryInProgress())
441  ereport(FATAL,
442  (errmsg("cannot continue WAL streaming, recovery has already ended")));
443 
444  /* Process any requests or signals received recently */
446 
447  if (got_SIGHUP)
448  {
449  got_SIGHUP = false;
452  }
453 
454  /* See if we can read data immediately */
455  len = walrcv_receive(wrconn, &buf, &wait_fd);
456  if (len != 0)
457  {
458  /*
459  * Process the received data, and any subsequent data we
460  * can read without blocking.
461  */
462  for (;;)
463  {
464  if (len > 0)
465  {
466  /*
467  * Something was received from master, so reset
468  * timeout
469  */
470  last_recv_timestamp = GetCurrentTimestamp();
471  ping_sent = false;
472  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
473  }
474  else if (len == 0)
475  break;
476  else if (len < 0)
477  {
478  ereport(LOG,
479  (errmsg("replication terminated by primary server"),
480  errdetail("End of WAL reached on timeline %u at %X/%X.",
481  startpointTLI,
482  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
483  endofwal = true;
484  break;
485  }
486  len = walrcv_receive(wrconn, &buf, &wait_fd);
487  }
488 
489  /* Let the master know that we received some data. */
490  XLogWalRcvSendReply(false, false);
491 
492  /*
493  * If we've written some records, flush them to disk and
494  * let the startup process and primary server know about
495  * them.
496  */
497  XLogWalRcvFlush(false);
498  }
499 
500  /* Check if we need to exit the streaming loop. */
501  if (endofwal)
502  break;
503 
504  /*
505  * Ideally we would reuse a WaitEventSet object repeatedly
506  * here to avoid the overheads of WaitLatchOrSocket on epoll
507  * systems, but we can't be sure that libpq (or any other
508  * walreceiver implementation) has the same socket (even if
509  * the fd is the same number, it may have been closed and
510  * reopened since the last time). In future, if there is a
511  * function for removing sockets from WaitEventSet, then we
512  * could add and remove just the socket each time, potentially
513  * avoiding some system calls.
514  */
515  Assert(wait_fd != PGINVALID_SOCKET);
516  rc = WaitLatchOrSocket(walrcv->latch,
519  wait_fd,
522  if (rc & WL_LATCH_SET)
523  {
524  ResetLatch(walrcv->latch);
526 
527  if (walrcv->force_reply)
528  {
529  /*
530  * The recovery process has asked us to send apply
531  * feedback now. Make sure the flag is really set to
532  * false in shared memory before sending the reply, so
533  * we don't miss a new request for a reply.
534  */
535  walrcv->force_reply = false;
537  XLogWalRcvSendReply(true, false);
538  }
539  }
540  if (rc & WL_TIMEOUT)
541  {
542  /*
543  * We didn't receive anything new. If we haven't heard
544  * anything from the server for more than
545  * wal_receiver_timeout / 2, ping the server. Also, if
546  * it's been longer than wal_receiver_status_interval
547  * since the last update we sent, send a status update to
548  * the master anyway, to report any progress in applying
549  * WAL.
550  */
551  bool requestReply = false;
552 
553  /*
554  * Check if time since last receive from standby has
555  * reached the configured limit.
556  */
557  if (wal_receiver_timeout > 0)
558  {
560  TimestampTz timeout;
561 
562  timeout =
563  TimestampTzPlusMilliseconds(last_recv_timestamp,
565 
566  if (now >= timeout)
567  ereport(ERROR,
568  (errmsg("terminating walreceiver due to timeout")));
569 
570  /*
571  * We didn't receive anything new, for half of
572  * receiver replication timeout. Ping the server.
573  */
574  if (!ping_sent)
575  {
576  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
577  (wal_receiver_timeout / 2));
578  if (now >= timeout)
579  {
580  requestReply = true;
581  ping_sent = true;
582  }
583  }
584  }
585 
586  XLogWalRcvSendReply(requestReply, requestReply);
588  }
589  }
590 
591  /*
592  * The backend finished streaming. Exit streaming COPY-mode from
593  * our side, too.
594  */
595  walrcv_endstreaming(wrconn, &primaryTLI);
596 
597  /*
598  * If the server had switched to a new timeline that we didn't
599  * know about when we began streaming, fetch its timeline history
600  * file now.
601  */
602  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
603  }
604  else
605  ereport(LOG,
606  (errmsg("primary server contains no more WAL on requested timeline %u",
607  startpointTLI)));
608 
609  /*
610  * End of WAL reached on the requested timeline. Close the last
611  * segment, and await for new orders from the startup process.
612  */
613  if (recvFile >= 0)
614  {
615  char xlogfname[MAXFNAMELEN];
616 
617  XLogWalRcvFlush(false);
619  if (close(recvFile) != 0)
620  ereport(PANIC,
622  errmsg("could not close log segment %s: %m",
623  xlogfname)));
624 
625  /*
626  * Create .done file forcibly to prevent the streamed segment from
627  * being archived later.
628  */
630  XLogArchiveForceDone(xlogfname);
631  else
632  XLogArchiveNotify(xlogfname);
633  }
634  recvFile = -1;
635 
636  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
637  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
638  }
639  /* not reached */
640 }
int sender_port
Definition: walreceiver.h:117
#define SIGQUIT
Definition: win32_port.h:154
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:727
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:276
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:288
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
uint32 TimeLineID
Definition: xlogdefs.h:52
slock_t mutex
Definition: walreceiver.h:143
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:116
#define SIGUSR1
Definition: win32_port.h:165
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
#define SIGCHLD
Definition: win32_port.h:163
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:646
sig_atomic_t force_reply
Definition: walreceiver.h:158
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:280
WalRcvState walRcvState
Definition: walreceiver.h:64
static StringInfoData incoming_message
Definition: walreceiver.c:125
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1129
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:290
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:157
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:805
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:286
union WalRcvStreamOptions::@104 proto
void proc_exit(int code)
Definition: ipc.c:104
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:829
#define SIGPIPE
Definition: win32_port.h:158
#define SIGUSR2
Definition: win32_port.h:166
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:8069
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:294
#define PANIC
Definition: elog.h:53
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:96
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
void ResetLatch(Latch *latch)
Definition: latch.c:540
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:89
Latch procLatch
Definition: proc.h:104
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:151
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:124
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:466
void SignalHandlerForCrashExit(SIGNAL_ARGS)
Definition: interrupt.c:72
void pfree(void *pointer)
Definition: mcxt.c:1056
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:415
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:43
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11463
pid_t pid
Definition: walreceiver.h:63
#define MAXCONNINFO
Definition: walreceiver.h:36
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
static char * buf
Definition: pg_test_fsync.c:67
int XLogArchiveMode
Definition: xlog.c:94
int errdetail(const char *fmt,...)
Definition: elog.c:957
static struct @24 LogstreamResult
int errcode_for_file_access(void)
Definition: elog.c:633
#define SIGHUP
Definition: win32_port.h:153
XLogRecPtr startpoint
Definition: walreceiver.h:168
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:390
unsigned int uint32
Definition: c.h:367
int pgsocket
Definition: port.h:31
sigset_t UnBlockSig
Definition: pqsignal.c:22
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:111
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
static TimeLineID recvFileTLI
Definition: walreceiver.c:104
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:296
Definition: guc.h:72
char * cluster_name
Definition: guc.c:560
Latch * latch
Definition: walreceiver.h:141
#define SIG_IGN
Definition: win32_port.h:150
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static XLogSegNo recvSegNo
Definition: walreceiver.c:105
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
sigset_t BlockSig
Definition: pqsignal.c:22
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:516
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:94
#define PGINVALID_SOCKET
Definition: port.h:33
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1060
bool ready_to_display
Definition: walreceiver.h:132
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:813
TimeLineID ThisTimeLineID
Definition: xlog.c:191
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:144
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:145
#define SIG_DFL
Definition: win32_port.h:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:738
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define SIGALRM
Definition: win32_port.h:159
bool is_temp_slot
Definition: walreceiver.h:129
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1001
WalRcvData * WalRcv
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4910
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:116
int errmsg(const char *fmt,...)
Definition: elog.c:824
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:774
#define elog(elevel,...)
Definition: elog.h:214
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:278
struct WalRcvStreamOptions::@104::@105 physical
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:73
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:533
static int recvFile
Definition: walreceiver.c:103
#define snprintf
Definition: port.h:193
#define WL_LATCH_SET
Definition: latch.h:124
#define UINT64_FORMAT
Definition: c.h:410
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
char slotname[NAMEDATALEN]
Definition: walreceiver.h:123
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:110
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:272

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 1001 of file walreceiver.c.

References AllowCascadeReplication, WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

1002 {
1003  if (LogstreamResult.Flush < LogstreamResult.Write)
1004  {
1005  WalRcvData *walrcv = WalRcv;
1006 
1008 
1009  LogstreamResult.Flush = LogstreamResult.Write;
1010 
1011  /* Update shared-memory status */
1012  SpinLockAcquire(&walrcv->mutex);
1013  if (walrcv->flushedUpto < LogstreamResult.Flush)
1014  {
1015  walrcv->latestChunkStart = walrcv->flushedUpto;
1016  walrcv->flushedUpto = LogstreamResult.Flush;
1017  walrcv->receivedTLI = ThisTimeLineID;
1018  }
1019  SpinLockRelease(&walrcv->mutex);
1020 
1021  /* Signal the startup process and walsender that new WAL has arrived */
1022  WakeupRecovery();
1024  WalSndWakeup();
1025 
1026  /* Report XLOG streaming progress in PS display */
1028  {
1029  char activitymsg[50];
1030 
1031  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1032  (uint32) (LogstreamResult.Write >> 32),
1033  (uint32) LogstreamResult.Write);
1034  set_ps_display(activitymsg);
1035  }
1036 
1037  /* Also let the master know that we made some progress */
1038  if (!dying)
1039  {
1040  XLogWalRcvSendReply(false, false);
1041  XLogWalRcvSendHSFeedback(false);
1042  }
1043  }
1044 }
slock_t mutex
Definition: walreceiver.h:143
bool update_process_title
Definition: ps_status.c:36
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10381
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1129
TimeLineID receivedTLI
Definition: walreceiver.h:84
void set_ps_display(const char *activity)
Definition: ps_status.c:349
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12660
XLogRecPtr latestChunkStart
Definition: walreceiver.h:92
static struct @24 LogstreamResult
#define AllowCascadeReplication()
Definition: walreceiver.h:39
unsigned int uint32
Definition: c.h:367
static XLogSegNo recvSegNo
Definition: walreceiver.c:105
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1060
TimeLineID ThisTimeLineID
Definition: xlog.c:191
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:103
XLogRecPtr flushedUpto
Definition: walreceiver.h:83
#define snprintf
Definition: port.h:193
void WalSndWakeup(void)
Definition: walsender.c:3112

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 829 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

830 {
831  int hdrlen;
832  XLogRecPtr dataStart;
833  XLogRecPtr walEnd;
834  TimestampTz sendTime;
835  bool replyRequested;
836 
838 
839  switch (type)
840  {
841  case 'w': /* WAL records */
842  {
843  /* copy message to StringInfo */
844  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
845  if (len < hdrlen)
846  ereport(ERROR,
847  (errcode(ERRCODE_PROTOCOL_VIOLATION),
848  errmsg_internal("invalid WAL message received from primary")));
850 
851  /* read the fields */
852  dataStart = pq_getmsgint64(&incoming_message);
853  walEnd = pq_getmsgint64(&incoming_message);
854  sendTime = pq_getmsgint64(&incoming_message);
855  ProcessWalSndrMessage(walEnd, sendTime);
856 
857  buf += hdrlen;
858  len -= hdrlen;
859  XLogWalRcvWrite(buf, len, dataStart);
860  break;
861  }
862  case 'k': /* Keepalive */
863  {
864  /* copy message to StringInfo */
865  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
866  if (len != hdrlen)
867  ereport(ERROR,
868  (errcode(ERRCODE_PROTOCOL_VIOLATION),
869  errmsg_internal("invalid keepalive message received from primary")));
871 
872  /* read the fields */
873  walEnd = pq_getmsgint64(&incoming_message);
874  sendTime = pq_getmsgint64(&incoming_message);
875  replyRequested = pq_getmsgbyte(&incoming_message);
876 
877  ProcessWalSndrMessage(walEnd, sendTime);
878 
879  /* If the primary requested a reply, send one immediately */
880  if (replyRequested)
881  XLogWalRcvSendReply(true, false);
882  break;
883  }
884  default:
885  ereport(ERROR,
886  (errcode(ERRCODE_PROTOCOL_VIOLATION),
887  errmsg_internal("invalid replication message type %d",
888  type)));
889  }
890 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1245
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:125
int errcode(int sqlerrcode)
Definition: elog.c:610
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:67
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1060
#define ereport(elevel,...)
Definition: elog.h:144
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int errmsg_internal(const char *fmt,...)
Definition: elog.c:911
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:896

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1129 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetOldestXmin(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), PROCARRAY_FLAGS_DEFAULT, PROCARRAY_SLOTS_XMIN, ProcArrayGetReplicationSlotXmin(), ReadNextFullTransactionId(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, TransactionIdPrecedes(), wal_receiver_status_interval, walrcv_send, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

1130 {
1131  TimestampTz now;
1132  FullTransactionId nextFullXid;
1133  TransactionId nextXid;
1134  uint32 xmin_epoch,
1135  catalog_xmin_epoch;
1136  TransactionId xmin,
1137  catalog_xmin;
1138  static TimestampTz sendTime = 0;
1139 
1140  /* initially true so we always send at least one feedback message */
1141  static bool master_has_standby_xmin = true;
1142 
1143  /*
1144  * If the user doesn't want status to be reported to the master, be sure
1145  * to exit before doing anything at all.
1146  */
1148  !master_has_standby_xmin)
1149  return;
1150 
1151  /* Get current timestamp. */
1152  now = GetCurrentTimestamp();
1153 
1154  if (!immed)
1155  {
1156  /*
1157  * Send feedback at most once per wal_receiver_status_interval.
1158  */
1159  if (!TimestampDifferenceExceeds(sendTime, now,
1161  return;
1162  sendTime = now;
1163  }
1164 
1165  /*
1166  * If Hot Standby is not yet accepting connections there is nothing to
1167  * send. Check this after the interval has expired to reduce number of
1168  * calls.
1169  *
1170  * Bailing out here also ensures that we don't send feedback until we've
1171  * read our own replication slot state, so we don't tell the master to
1172  * discard needed xmin or catalog_xmin from any slots that may exist on
1173  * this replica.
1174  */
1175  if (!HotStandbyActive())
1176  return;
1177 
1178  /*
1179  * Make the expensive call to get the oldest xmin once we are certain
1180  * everything else has been checked.
1181  */
1183  {
1184  TransactionId slot_xmin;
1185 
1186  /*
1187  * Usually GetOldestXmin() would include both global replication slot
1188  * xmin and catalog_xmin in its calculations, but we want to derive
1189  * separate values for each of those. So we ask for an xmin that
1190  * excludes the catalog_xmin.
1191  */
1192  xmin = GetOldestXmin(NULL,
1194 
1195  ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1196 
1197  if (TransactionIdIsValid(slot_xmin) &&
1198  TransactionIdPrecedes(slot_xmin, xmin))
1199  xmin = slot_xmin;
1200  }
1201  else
1202  {
1203  xmin = InvalidTransactionId;
1204  catalog_xmin = InvalidTransactionId;
1205  }
1206 
1207  /*
1208  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1209  * the epoch boundary.
1210  */
1211  nextFullXid = ReadNextFullTransactionId();
1212  nextXid = XidFromFullTransactionId(nextFullXid);
1213  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1214  catalog_xmin_epoch = xmin_epoch;
1215  if (nextXid < xmin)
1216  xmin_epoch--;
1217  if (nextXid < catalog_xmin)
1218  catalog_xmin_epoch--;
1219 
1220  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1221  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1222 
1223  /* Construct the message and send it. */
1225  pq_sendbyte(&reply_message, 'h');
1227  pq_sendint32(&reply_message, xmin);
1228  pq_sendint32(&reply_message, xmin_epoch);
1229  pq_sendint32(&reply_message, catalog_xmin);
1230  pq_sendint32(&reply_message, catalog_xmin_epoch);
1232  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1233  master_has_standby_xmin = true;
1234  else
1235  master_has_standby_xmin = false;
1236 }
bool hot_standby_feedback
Definition: walreceiver.c:90
uint32 TransactionId
Definition: c.h:513
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
int64 TimestampTz
Definition: timestamp.h:39
bool HotStandbyActive(void)
Definition: xlog.c:8143
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:88
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1673
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3114
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static StringInfoData reply_message
Definition: walreceiver.c:124
#define PROCARRAY_SLOTS_XMIN
Definition: procarray.h:37
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:367
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
#define PROCARRAY_FLAGS_DEFAULT
Definition: procarray.h:50
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
TransactionId GetOldestXmin(Relation rel, int flags)
Definition: procarray.c:1305
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:292
#define elog(elevel,...)
Definition: elog.h:214
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1060 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, now(), pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1061 {
1062  static XLogRecPtr writePtr = 0;
1063  static XLogRecPtr flushPtr = 0;
1064  XLogRecPtr applyPtr;
1065  static TimestampTz sendTime = 0;
1066  TimestampTz now;
1067 
1068  /*
1069  * If the user doesn't want status to be reported to the master, be sure
1070  * to exit before doing anything at all.
1071  */
1072  if (!force && wal_receiver_status_interval <= 0)
1073  return;
1074 
1075  /* Get current timestamp. */
1076  now = GetCurrentTimestamp();
1077 
1078  /*
1079  * We can compare the write and flush positions to the last message we
1080  * sent without taking any lock, but the apply position requires a spin
1081  * lock, so we don't check that unless something else has changed or 10
1082  * seconds have passed. This means that the apply WAL location will
1083  * appear, from the master's point of view, to lag slightly, but since
1084  * this is only for reporting purposes and only on idle systems, that's
1085  * probably OK.
1086  */
1087  if (!force
1088  && writePtr == LogstreamResult.Write
1089  && flushPtr == LogstreamResult.Flush
1090  && !TimestampDifferenceExceeds(sendTime, now,
1092  return;
1093  sendTime = now;
1094 
1095  /* Construct a new message */
1096  writePtr = LogstreamResult.Write;
1097  flushPtr = LogstreamResult.Flush;
1098  applyPtr = GetXLogReplayRecPtr(NULL);
1099 
1101  pq_sendbyte(&reply_message, 'r');
1102  pq_sendint64(&reply_message, writePtr);
1103  pq_sendint64(&reply_message, flushPtr);
1104  pq_sendint64(&reply_message, applyPtr);
1106  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1107 
1108  /* Send it */
1109  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1110  (uint32) (writePtr >> 32), (uint32) writePtr,
1111  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1112  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1113  requestReply ? " (reply requested)" : "");
1114 
1116 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:88
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1673
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:124
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11463
#define DEBUG2
Definition: elog.h:24
static struct @24 LogstreamResult
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:367
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:292
#define elog(elevel,...)
Definition: elog.h:214
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 896 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite(), recvFile, recvFileTLI, recvSegNo, ThisTimeLineID, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogSegmentOffset, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

897 {
898  int startoff;
899  int byteswritten;
900 
901  while (nbytes > 0)
902  {
903  int segbytes;
904 
905  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
906  {
907  bool use_existent;
908 
909  /*
910  * fsync() and close current file before we switch to next one. We
911  * would otherwise have to reopen this file to fsync it later
912  */
913  if (recvFile >= 0)
914  {
915  char xlogfname[MAXFNAMELEN];
916 
917  XLogWalRcvFlush(false);
918 
920 
921  /*
922  * XLOG segment files will be re-read by recovery in startup
923  * process soon, so we don't advise the OS to release cache
924  * pages associated with the file like XLogFileClose() does.
925  */
926  if (close(recvFile) != 0)
927  ereport(PANIC,
929  errmsg("could not close log segment %s: %m",
930  xlogfname)));
931 
932  /*
933  * Create .done file forcibly to prevent the streamed segment
934  * from being archived later.
935  */
937  XLogArchiveForceDone(xlogfname);
938  else
939  XLogArchiveNotify(xlogfname);
940  }
941  recvFile = -1;
942 
943  /* Create/use new log file */
945  use_existent = true;
946  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
948  }
949 
950  /* Calculate the start offset of the received logs */
951  startoff = XLogSegmentOffset(recptr, wal_segment_size);
952 
953  if (startoff + nbytes > wal_segment_size)
954  segbytes = wal_segment_size - startoff;
955  else
956  segbytes = nbytes;
957 
958  /* OK to write the logs */
959  errno = 0;
960 
961  byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
962  if (byteswritten <= 0)
963  {
964  char xlogfname[MAXFNAMELEN];
965  int save_errno;
966 
967  /* if write didn't set errno, assume no disk space */
968  if (errno == 0)
969  errno = ENOSPC;
970 
971  save_errno = errno;
973  errno = save_errno;
974  ereport(PANIC,
976  errmsg("could not write to log segment %s "
977  "at offset %u, length %lu: %m",
978  xlogfname, startoff, (unsigned long) segbytes)));
979  }
980 
981  /* Update state for write */
982  recptr += byteswritten;
983 
984  nbytes -= byteswritten;
985  buf += byteswritten;
986 
987  LogstreamResult.Write = recptr;
988  }
989 
990  /* Update shared-memory status */
992 }
int wal_segment_size
Definition: xlog.c:116
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3251
#define PANIC
Definition: elog.h:53
ssize_t pg_pwrite(int fd, const void *buf, size_t nbyte, off_t offset)
Definition: pwrite.c:27
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:151
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:466
static char * buf
Definition: pg_test_fsync.c:67
int XLogArchiveMode
Definition: xlog.c:94
static struct @24 LogstreamResult
int errcode_for_file_access(void)
Definition: elog.c:633
static TimeLineID recvFileTLI
Definition: walreceiver.c:104
static XLogSegNo recvSegNo
Definition: walreceiver.c:105
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:516
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
TimeLineID ThisTimeLineID
Definition: xlog.c:191
#define ereport(elevel,...)
Definition: elog.h:144
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1001
WalRcvData * WalRcv
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:103
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 121 of file walreceiver.c.

◆ got_SIGHUP

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 111 of file walreceiver.c.

Referenced by WalRcvSigHupHandler(), and WalReceiverMain().

◆ got_SIGTERM

volatile sig_atomic_t got_SIGTERM = false
static

Definition at line 112 of file walreceiver.c.

Referenced by ProcessWalRcvInterrupts(), and WalRcvShutdownHandler().

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 90 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 125 of file walreceiver.c.

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 103 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 105 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

Definition at line 124 of file walreceiver.c.

Referenced by send_feedback().

◆ wal_receiver_status_interval

int wal_receiver_status_interval

Definition at line 88 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ wal_receiver_timeout

int wal_receiver_timeout

Definition at line 89 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 94 of file walreceiver.c.

Referenced by _PG_init().

◆ wrconn

WalReceiverConn* wrconn = NULL
static

Definition at line 93 of file walreceiver.c.

◆ Write

XLogRecPtr Write

Definition at line 120 of file walreceiver.c.