PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvSigHupHandler (SIGNAL_ARGS)
 
static void WalRcvShutdownHandler (SIGNAL_ARGS)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

bool wal_receiver_create_temp_slot
 
int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConnwrconn = NULL
 
WalReceiverFunctionsTypeWalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
static uint32 recvOff = 0
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t got_SIGTERM = false
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 85 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1359 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MAXCONNINFO, MemSet, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, NI_MAXHOST, palloc0(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, and XLogRecPtrIsInvalid.

1360 {
1361  TupleDesc tupdesc;
1362  Datum *values;
1363  bool *nulls;
1364  int pid;
1365  bool ready_to_display;
1367  XLogRecPtr receive_start_lsn;
1368  TimeLineID receive_start_tli;
1369  XLogRecPtr received_lsn;
1370  TimeLineID received_tli;
1371  TimestampTz last_send_time;
1372  TimestampTz last_receipt_time;
1373  XLogRecPtr latest_end_lsn;
1374  TimestampTz latest_end_time;
1375  char sender_host[NI_MAXHOST];
1376  int sender_port = 0;
1377  char slotname[NAMEDATALEN];
1378  char conninfo[MAXCONNINFO];
1379 
1380  /* Take a lock to ensure value consistency */
1382  pid = (int) WalRcv->pid;
1383  ready_to_display = WalRcv->ready_to_display;
1384  state = WalRcv->walRcvState;
1385  receive_start_lsn = WalRcv->receiveStart;
1386  receive_start_tli = WalRcv->receiveStartTLI;
1387  received_lsn = WalRcv->receivedUpto;
1388  received_tli = WalRcv->receivedTLI;
1389  last_send_time = WalRcv->lastMsgSendTime;
1390  last_receipt_time = WalRcv->lastMsgReceiptTime;
1391  latest_end_lsn = WalRcv->latestWalEnd;
1392  latest_end_time = WalRcv->latestWalEndTime;
1393  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1394  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1395  sender_port = WalRcv->sender_port;
1396  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1398 
1399  /*
1400  * No WAL receiver (or not ready yet), just return a tuple with NULL
1401  * values
1402  */
1403  if (pid == 0 || !ready_to_display)
1404  PG_RETURN_NULL();
1405 
1406  /* determine result type */
1407  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1408  elog(ERROR, "return type must be a row type");
1409 
1410  values = palloc0(sizeof(Datum) * tupdesc->natts);
1411  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1412 
1413  /* Fetch values */
1414  values[0] = Int32GetDatum(pid);
1415 
1416  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1417  {
1418  /*
1419  * Only superusers and members of pg_read_all_stats can see details.
1420  * Other users only get the pid value to know whether it is a WAL
1421  * receiver, but no details.
1422  */
1423  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1424  }
1425  else
1426  {
1427  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1428 
1429  if (XLogRecPtrIsInvalid(receive_start_lsn))
1430  nulls[2] = true;
1431  else
1432  values[2] = LSNGetDatum(receive_start_lsn);
1433  values[3] = Int32GetDatum(receive_start_tli);
1434  if (XLogRecPtrIsInvalid(received_lsn))
1435  nulls[4] = true;
1436  else
1437  values[4] = LSNGetDatum(received_lsn);
1438  values[5] = Int32GetDatum(received_tli);
1439  if (last_send_time == 0)
1440  nulls[6] = true;
1441  else
1442  values[6] = TimestampTzGetDatum(last_send_time);
1443  if (last_receipt_time == 0)
1444  nulls[7] = true;
1445  else
1446  values[7] = TimestampTzGetDatum(last_receipt_time);
1447  if (XLogRecPtrIsInvalid(latest_end_lsn))
1448  nulls[8] = true;
1449  else
1450  values[8] = LSNGetDatum(latest_end_lsn);
1451  if (latest_end_time == 0)
1452  nulls[9] = true;
1453  else
1454  values[9] = TimestampTzGetDatum(latest_end_time);
1455  if (*slotname == '\0')
1456  nulls[10] = true;
1457  else
1458  values[10] = CStringGetTextDatum(slotname);
1459  if (*sender_host == '\0')
1460  nulls[11] = true;
1461  else
1462  values[11] = CStringGetTextDatum(sender_host);
1463  if (sender_port == 0)
1464  nulls[12] = true;
1465  else
1466  values[12] = Int32GetDatum(sender_port);
1467  if (*conninfo == '\0')
1468  nulls[13] = true;
1469  else
1470  values[13] = CStringGetTextDatum(conninfo);
1471  }
1472 
1473  /* Returns the record as Datum */
1474  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1475 }
int sender_port
Definition: walreceiver.h:117
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1334
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
uint32 TimeLineID
Definition: xlogdefs.h:52
slock_t mutex
Definition: walreceiver.h:143
Oid GetUserId(void)
Definition: miscinit.c:380
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:64
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:84
#define MemSet(start, val, len)
Definition: c.h:962
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
WalRcvState
Definition: walreceiver.h:44
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:43
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:63
#define MAXCONNINFO
Definition: walreceiver.h:36
XLogRecPtr receivedUpto
Definition: walreceiver.h:83
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:980
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:343
bool ready_to_display
Definition: walreceiver.h:132
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4924
XLogRecPtr latestWalEnd
Definition: walreceiver.h:103
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define Int32GetDatum(X)
Definition: postgres.h:479
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:116
#define elog(elevel,...)
Definition: elog.h:228
#define CStringGetTextDatum(s)
Definition: builtins.h:83
XLogRecPtr receiveStart
Definition: walreceiver.h:73
#define PG_RETURN_NULL()
Definition: fmgr.h:335
char slotname[NAMEDATALEN]
Definition: walreceiver.h:123
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:110

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 147 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and got_SIGTERM.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), walrcv_clear_result(), WalRcvWaitForStartPosition(), and WalReceiverMain().

148 {
149  /*
150  * Although walreceiver interrupt handling doesn't use the same scheme as
151  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
152  * any incoming signals on Win32, and also to make sure we process any
153  * barrier events.
154  */
156 
157  if (got_SIGTERM)
158  {
159  ereport(FATAL,
160  (errcode(ERRCODE_ADMIN_SHUTDOWN),
161  errmsg("terminating walreceiver process due to administrator command")));
162  }
163 }
int errcode(int sqlerrcode)
Definition: elog.c:608
#define FATAL
Definition: elog.h:52
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:102

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1263 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, log_min_messages, WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1264 {
1265  WalRcvData *walrcv = WalRcv;
1266 
1267  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1268 
1269  /* Update shared-memory status */
1270  SpinLockAcquire(&walrcv->mutex);
1271  if (walrcv->latestWalEnd < walEnd)
1272  walrcv->latestWalEndTime = sendTime;
1273  walrcv->latestWalEnd = walEnd;
1274  walrcv->lastMsgSendTime = sendTime;
1275  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1276  SpinLockRelease(&walrcv->mutex);
1277 
1278  if (log_min_messages <= DEBUG2)
1279  {
1280  char *sendtime;
1281  char *receipttime;
1282  int applyDelay;
1283 
1284  /* Copy because timestamptz_to_str returns a static buffer */
1285  sendtime = pstrdup(timestamptz_to_str(sendTime));
1286  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1287  applyDelay = GetReplicationApplyDelay();
1288 
1289  /* apply delay is not available */
1290  if (applyDelay == -1)
1291  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1292  sendtime,
1293  receipttime,
1295  else
1296  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1297  sendtime,
1298  receipttime,
1299  applyDelay,
1301 
1302  pfree(sendtime);
1303  pfree(receipttime);
1304  }
1305 }
slock_t mutex
Definition: walreceiver.h:143
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1186
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1056
#define DEBUG2
Definition: elog.h:24
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
int log_min_messages
Definition: guc.c:518
XLogRecPtr latestWalEnd
Definition: walreceiver.h:103
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:228
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1743

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 774 of file walreceiver.c.

References Assert, WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

775 {
776  WalRcvData *walrcv = WalRcv;
777 
778  /* Ensure that all WAL records received are flushed to disk */
779  XLogWalRcvFlush(true);
780 
781  /* Mark ourselves inactive in shared memory */
782  SpinLockAcquire(&walrcv->mutex);
783  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
784  walrcv->walRcvState == WALRCV_RESTARTING ||
785  walrcv->walRcvState == WALRCV_STARTING ||
786  walrcv->walRcvState == WALRCV_WAITING ||
787  walrcv->walRcvState == WALRCV_STOPPING);
788  Assert(walrcv->pid == MyProcPid);
789  walrcv->walRcvState = WALRCV_STOPPED;
790  walrcv->pid = 0;
791  walrcv->ready_to_display = false;
792  walrcv->latch = NULL;
793  SpinLockRelease(&walrcv->mutex);
794 
795  /* Terminate the connection gracefully. */
796  if (wrconn != NULL)
798 
799  /* Wake up the startup process to notice promptly that we're gone */
800  WakeupRecovery();
801 }
int MyProcPid
Definition: globals.c:40
slock_t mutex
Definition: walreceiver.h:143
WalRcvState walRcvState
Definition: walreceiver.h:64
static WalReceiverConn * wrconn
Definition: walreceiver.c:82
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12300
pid_t pid
Definition: walreceiver.h:63
Latch * latch
Definition: walreceiver.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:132
#define Assert(condition)
Definition: c.h:739
#define walrcv_disconnect(conn)
Definition: walreceiver.h:292
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1019
WalRcvData * WalRcv

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 727 of file walreceiver.c.

References ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, and writeTimeLineHistoryFile().

Referenced by WalReceiverMain().

728 {
729  TimeLineID tli;
730 
731  for (tli = first; tli <= last; tli++)
732  {
733  /* there's no history file for timeline 1 */
734  if (tli != 1 && !existsTimeLineHistory(tli))
735  {
736  char *fname;
737  char *content;
738  int len;
739  char expectedfname[MAXFNAMELEN];
740 
741  ereport(LOG,
742  (errmsg("fetching timeline history file for timeline %u from primary server",
743  tli)));
744 
745  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
746 
747  /*
748  * Check that the filename on the master matches what we
749  * calculated ourselves. This is just a sanity check, it should
750  * always match.
751  */
752  TLHistoryFileName(expectedfname, tli);
753  if (strcmp(fname, expectedfname) != 0)
754  ereport(ERROR,
755  (errcode(ERRCODE_PROTOCOL_VIOLATION),
756  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
757  tli)));
758 
759  /*
760  * Write the file to pg_wal.
761  */
762  writeTimeLineHistoryFile(tli, content, len);
763 
764  pfree(fname);
765  pfree(content);
766  }
767  }
768 }
uint32 TimeLineID
Definition: xlogdefs.h:52
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:82
void pfree(void *pointer)
Definition: mcxt.c:1056
#define ERROR
Definition: elog.h:43
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:450
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:207
#define ereport(elevel, rest)
Definition: elog.h:141
#define MAXFNAMELEN
int errmsg_internal(const char *fmt,...)
Definition: elog.c:909
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:276
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1316 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

1317 {
1318  Latch *latch;
1319 
1320  WalRcv->force_reply = true;
1321  /* fetching the latch pointer might not be atomic, so use spinlock */
1323  latch = WalRcv->latch;
1325  if (latch)
1326  SetLatch(latch);
1327 }
slock_t mutex
Definition: walreceiver.h:143
sig_atomic_t force_reply
Definition: walreceiver.h:150
void SetLatch(Latch *latch)
Definition: latch.c:436
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: latch.h:110
Latch * latch
Definition: walreceiver.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1334 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1335 {
1336  switch (state)
1337  {
1338  case WALRCV_STOPPED:
1339  return "stopped";
1340  case WALRCV_STARTING:
1341  return "starting";
1342  case WALRCV_STREAMING:
1343  return "streaming";
1344  case WALRCV_WAITING:
1345  return "waiting";
1346  case WALRCV_RESTARTING:
1347  return "restarting";
1348  case WALRCV_STOPPING:
1349  return "stopping";
1350  }
1351  return "UNKNOWN";
1352 }
Definition: regguts.h:298

◆ WalRcvShutdownHandler()

static void WalRcvShutdownHandler ( SIGNAL_ARGS  )
static

Definition at line 813 of file walreceiver.c.

References got_SIGTERM, WalRcvData::latch, SetLatch(), and WalRcv.

Referenced by WalReceiverMain().

814 {
815  int save_errno = errno;
816 
817  got_SIGTERM = true;
818 
819  if (WalRcv->latch)
821 
822  errno = save_errno;
823 }
void SetLatch(Latch *latch)
Definition: latch.c:436
Latch * latch
Definition: walreceiver.h:141
WalRcvData * WalRcv
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:102

◆ WalRcvSigHupHandler()

static void WalRcvSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 805 of file walreceiver.c.

References got_SIGHUP.

Referenced by WalReceiverMain().

806 {
807  got_SIGHUP = true;
808 }
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:101

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 649 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, WalRcvData::latch, WalRcvData::mutex, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

650 {
651  WalRcvData *walrcv = WalRcv;
652  int state;
653 
654  SpinLockAcquire(&walrcv->mutex);
655  state = walrcv->walRcvState;
656  if (state != WALRCV_STREAMING)
657  {
658  SpinLockRelease(&walrcv->mutex);
659  if (state == WALRCV_STOPPING)
660  proc_exit(0);
661  else
662  elog(FATAL, "unexpected walreceiver state");
663  }
664  walrcv->walRcvState = WALRCV_WAITING;
666  walrcv->receiveStartTLI = 0;
667  SpinLockRelease(&walrcv->mutex);
668 
670  set_ps_display("idle", false);
671 
672  /*
673  * nudge startup process to notice that we've stopped streaming and are
674  * now waiting for instructions.
675  */
676  WakeupRecovery();
677  for (;;)
678  {
679  ResetLatch(walrcv->latch);
680 
682 
683  SpinLockAcquire(&walrcv->mutex);
684  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
685  walrcv->walRcvState == WALRCV_WAITING ||
686  walrcv->walRcvState == WALRCV_STOPPING);
687  if (walrcv->walRcvState == WALRCV_RESTARTING)
688  {
689  /* we don't expect primary_conninfo to change */
690  *startpoint = walrcv->receiveStart;
691  *startpointTLI = walrcv->receiveStartTLI;
692  walrcv->walRcvState = WALRCV_STREAMING;
693  SpinLockRelease(&walrcv->mutex);
694  break;
695  }
696  if (walrcv->walRcvState == WALRCV_STOPPING)
697  {
698  /*
699  * We should've received SIGTERM if the startup process wants us
700  * to die, but might as well check it here too.
701  */
702  SpinLockRelease(&walrcv->mutex);
703  exit(1);
704  }
705  SpinLockRelease(&walrcv->mutex);
706 
707  (void) WaitLatch(walrcv->latch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
709  }
710 
712  {
713  char activitymsg[50];
714 
715  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
716  (uint32) (*startpoint >> 32),
717  (uint32) *startpoint);
718  set_ps_display(activitymsg, false);
719  }
720 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:143
bool update_process_title
Definition: ps_status.c:35
WalRcvState walRcvState
Definition: walreceiver.h:64
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:147
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:335
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:519
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:344
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12300
#define FATAL
Definition: elog.h:52
unsigned int uint32
Definition: c.h:359
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
Latch * latch
Definition: walreceiver.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:739
Definition: regguts.h:298
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:228
XLogRecPtr receiveStart
Definition: walreceiver.h:73
#define snprintf
Definition: port.h:192
#define WL_LATCH_SET
Definition: latch.h:124
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 168 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, BlockSig, buf, close, cluster_name, WalRcvData::conninfo, DEBUG1, elog, ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, NI_MAXHOST, now(), on_shmem_exit(), options, PANIC, pfree(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, procsignal_sigusr1_handler(), WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForCrashExit(), SIGPIPE, SIGQUIT, SIGUSR1, SIGUSR2, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf, SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_create_temp_slot, wal_receiver_timeout, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain(), and walrcv_clear_result().

169 {
170  char conninfo[MAXCONNINFO];
171  char *tmp_conninfo;
172  char slotname[NAMEDATALEN];
173  bool is_temp_slot;
174  XLogRecPtr startpoint;
175  TimeLineID startpointTLI;
176  TimeLineID primaryTLI;
177  bool first_stream;
178  WalRcvData *walrcv = WalRcv;
179  TimestampTz last_recv_timestamp;
181  bool ping_sent;
182  char *err;
183  char *sender_host = NULL;
184  int sender_port = 0;
185 
186  /*
187  * WalRcv should be set up already (if we are a backend, we inherit this
188  * by fork() or EXEC_BACKEND mechanism from the postmaster).
189  */
190  Assert(walrcv != NULL);
191 
192  now = GetCurrentTimestamp();
193 
194  /*
195  * Mark walreceiver as running in shared memory.
196  *
197  * Do this as early as possible, so that if we fail later on, we'll set
198  * state to STOPPED. If we die before this, the startup process will keep
199  * waiting for us to start up, until it times out.
200  */
201  SpinLockAcquire(&walrcv->mutex);
202  Assert(walrcv->pid == 0);
203  switch (walrcv->walRcvState)
204  {
205  case WALRCV_STOPPING:
206  /* If we've already been requested to stop, don't start up. */
207  walrcv->walRcvState = WALRCV_STOPPED;
208  /* fall through */
209 
210  case WALRCV_STOPPED:
211  SpinLockRelease(&walrcv->mutex);
212  proc_exit(1);
213  break;
214 
215  case WALRCV_STARTING:
216  /* The usual case */
217  break;
218 
219  case WALRCV_WAITING:
220  case WALRCV_STREAMING:
221  case WALRCV_RESTARTING:
222  default:
223  /* Shouldn't happen */
224  SpinLockRelease(&walrcv->mutex);
225  elog(PANIC, "walreceiver still running according to shared memory state");
226  }
227  /* Advertise our PID so that the startup process can kill us */
228  walrcv->pid = MyProcPid;
229  walrcv->walRcvState = WALRCV_STREAMING;
230 
231  /* Fetch information required to start streaming */
232  walrcv->ready_to_display = false;
233  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
234  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
235  is_temp_slot = walrcv->is_temp_slot;
236  startpoint = walrcv->receiveStart;
237  startpointTLI = walrcv->receiveStartTLI;
238 
239  /* Initialise to a sanish value */
240  walrcv->lastMsgSendTime =
241  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
242 
243  /* Report the latch to use to awaken this process */
244  walrcv->latch = &MyProc->procLatch;
245 
246  SpinLockRelease(&walrcv->mutex);
247 
248  /* Arrange to clean up at walreceiver exit */
250 
251  /* Properly accept or ignore signals the postmaster might send us */
252  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
253  pqsignal(SIGINT, SIG_IGN);
254  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
260 
261  /* Reset some signals that are accepted by postmaster but not here */
263 
264  /* We allow SIGQUIT (quickdie) at all times */
265  sigdelset(&BlockSig, SIGQUIT);
266 
267  /* Load the libpq-specific functions */
268  load_file("libpqwalreceiver", false);
269  if (WalReceiverFunctions == NULL)
270  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
271 
272  /* Unblock signals (they were blocked when the postmaster forked us) */
274 
275  /* Establish the connection to the primary for XLOG streaming */
276  wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
277  if (!wrconn)
278  ereport(ERROR,
279  (errmsg("could not connect to the primary server: %s", err)));
280 
281  /*
282  * Save user-visible connection string. This clobbers the original
283  * conninfo, for security. Also save host and port of the sender server
284  * this walreceiver is connected to.
285  */
286  tmp_conninfo = walrcv_get_conninfo(wrconn);
287  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
288  SpinLockAcquire(&walrcv->mutex);
289  memset(walrcv->conninfo, 0, MAXCONNINFO);
290  if (tmp_conninfo)
291  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
292 
293  memset(walrcv->sender_host, 0, NI_MAXHOST);
294  if (sender_host)
295  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
296 
297  walrcv->sender_port = sender_port;
298  walrcv->ready_to_display = true;
299  SpinLockRelease(&walrcv->mutex);
300 
301  if (tmp_conninfo)
302  pfree(tmp_conninfo);
303 
304  if (sender_host)
305  pfree(sender_host);
306 
307  first_stream = true;
308  for (;;)
309  {
310  char *primary_sysid;
311  char standby_sysid[32];
313 
314  /*
315  * Check that we're connected to a valid server using the
316  * IDENTIFY_SYSTEM replication command.
317  */
318  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
319 
320  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
322  if (strcmp(primary_sysid, standby_sysid) != 0)
323  {
324  ereport(ERROR,
325  (errmsg("database system identifier differs between the primary and standby"),
326  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
327  primary_sysid, standby_sysid)));
328  }
329 
330  /*
331  * Confirm that the current timeline of the primary is the same or
332  * ahead of ours.
333  */
334  if (primaryTLI < startpointTLI)
335  ereport(ERROR,
336  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
337  primaryTLI, startpointTLI)));
338 
339  /*
340  * Get any missing history files. We do this always, even when we're
341  * not interested in that timeline, so that if we're promoted to
342  * become the master later on, we don't select the same timeline that
343  * was already used in the current master. This isn't bullet-proof -
344  * you'll need some external software to manage your cluster if you
345  * need to ensure that a unique timeline id is chosen in every case,
346  * but let's avoid the confusion of timeline id collisions where we
347  * can.
348  */
349  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
350 
351  /*
352  * Create temporary replication slot if no slot name is configured or
353  * the slot from the previous run was temporary, unless
354  * wal_receiver_create_temp_slot is disabled. We also need to handle
355  * the case where the previous run used a temporary slot but
356  * wal_receiver_create_temp_slot was changed in the meantime. In that
357  * case, we delete the old slot name in shared memory. (This would
358  * all be a bit easier if we just didn't copy the slot name into
359  * shared memory, since we won't need it again later, but then we
360  * can't see the slot name in the stats views.)
361  */
362  if (slotname[0] == '\0' || is_temp_slot)
363  {
364  bool changed = false;
365 
367  {
368  snprintf(slotname, sizeof(slotname),
369  "pg_walreceiver_%lld",
370  (long long int) walrcv_get_backend_pid(wrconn));
371 
372  walrcv_create_slot(wrconn, slotname, true, 0, NULL);
373  changed = true;
374  }
375  else if (slotname[0] != '\0')
376  {
377  slotname[0] = '\0';
378  changed = true;
379  }
380 
381  if (changed)
382  {
383  SpinLockAcquire(&walrcv->mutex);
384  strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
386  SpinLockRelease(&walrcv->mutex);
387  }
388  }
389 
390  /*
391  * Start streaming.
392  *
393  * We'll try to start at the requested starting point and timeline,
394  * even if it's different from the server's latest timeline. In case
395  * we've already reached the end of the old timeline, the server will
396  * finish the streaming immediately, and we will go back to await
397  * orders from the startup process. If recovery_target_timeline is
398  * 'latest', the startup process will scan pg_wal and find the new
399  * history file, bump recovery target timeline, and ask us to restart
400  * on the new timeline.
401  */
402  options.logical = false;
403  options.startpoint = startpoint;
404  options.slotname = slotname[0] != '\0' ? slotname : NULL;
405  options.proto.physical.startpointTLI = startpointTLI;
406  ThisTimeLineID = startpointTLI;
407  if (walrcv_startstreaming(wrconn, &options))
408  {
409  if (first_stream)
410  ereport(LOG,
411  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
412  (uint32) (startpoint >> 32), (uint32) startpoint,
413  startpointTLI)));
414  else
415  ereport(LOG,
416  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
417  (uint32) (startpoint >> 32), (uint32) startpoint,
418  startpointTLI)));
419  first_stream = false;
420 
421  /* Initialize LogstreamResult and buffers for processing messages */
425 
426  /* Initialize the last recv timestamp */
427  last_recv_timestamp = GetCurrentTimestamp();
428  ping_sent = false;
429 
430  /* Loop until end-of-streaming or error */
431  for (;;)
432  {
433  char *buf;
434  int len;
435  bool endofwal = false;
436  pgsocket wait_fd = PGINVALID_SOCKET;
437  int rc;
438 
439  /*
440  * Exit walreceiver if we're not in recovery. This should not
441  * happen, but cross-check the status here.
442  */
443  if (!RecoveryInProgress())
444  ereport(FATAL,
445  (errmsg("cannot continue WAL streaming, recovery has already ended")));
446 
447  /* Process any requests or signals received recently */
449 
450  if (got_SIGHUP)
451  {
452  got_SIGHUP = false;
455  }
456 
457  /* See if we can read data immediately */
458  len = walrcv_receive(wrconn, &buf, &wait_fd);
459  if (len != 0)
460  {
461  /*
462  * Process the received data, and any subsequent data we
463  * can read without blocking.
464  */
465  for (;;)
466  {
467  if (len > 0)
468  {
469  /*
470  * Something was received from master, so reset
471  * timeout
472  */
473  last_recv_timestamp = GetCurrentTimestamp();
474  ping_sent = false;
475  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
476  }
477  else if (len == 0)
478  break;
479  else if (len < 0)
480  {
481  ereport(LOG,
482  (errmsg("replication terminated by primary server"),
483  errdetail("End of WAL reached on timeline %u at %X/%X.",
484  startpointTLI,
485  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
486  endofwal = true;
487  break;
488  }
489  len = walrcv_receive(wrconn, &buf, &wait_fd);
490  }
491 
492  /* Let the master know that we received some data. */
493  XLogWalRcvSendReply(false, false);
494 
495  /*
496  * If we've written some records, flush them to disk and
497  * let the startup process and primary server know about
498  * them.
499  */
500  XLogWalRcvFlush(false);
501  }
502 
503  /* Check if we need to exit the streaming loop. */
504  if (endofwal)
505  break;
506 
507  /*
508  * Ideally we would reuse a WaitEventSet object repeatedly
509  * here to avoid the overheads of WaitLatchOrSocket on epoll
510  * systems, but we can't be sure that libpq (or any other
511  * walreceiver implementation) has the same socket (even if
512  * the fd is the same number, it may have been closed and
513  * reopened since the last time). In future, if there is a
514  * function for removing sockets from WaitEventSet, then we
515  * could add and remove just the socket each time, potentially
516  * avoiding some system calls.
517  */
518  Assert(wait_fd != PGINVALID_SOCKET);
519  rc = WaitLatchOrSocket(walrcv->latch,
522  wait_fd,
525  if (rc & WL_LATCH_SET)
526  {
527  ResetLatch(walrcv->latch);
529 
530  if (walrcv->force_reply)
531  {
532  /*
533  * The recovery process has asked us to send apply
534  * feedback now. Make sure the flag is really set to
535  * false in shared memory before sending the reply, so
536  * we don't miss a new request for a reply.
537  */
538  walrcv->force_reply = false;
540  XLogWalRcvSendReply(true, false);
541  }
542  }
543  if (rc & WL_TIMEOUT)
544  {
545  /*
546  * We didn't receive anything new. If we haven't heard
547  * anything from the server for more than
548  * wal_receiver_timeout / 2, ping the server. Also, if
549  * it's been longer than wal_receiver_status_interval
550  * since the last update we sent, send a status update to
551  * the master anyway, to report any progress in applying
552  * WAL.
553  */
554  bool requestReply = false;
555 
556  /*
557  * Check if time since last receive from standby has
558  * reached the configured limit.
559  */
560  if (wal_receiver_timeout > 0)
561  {
563  TimestampTz timeout;
564 
565  timeout =
566  TimestampTzPlusMilliseconds(last_recv_timestamp,
568 
569  if (now >= timeout)
570  ereport(ERROR,
571  (errmsg("terminating walreceiver due to timeout")));
572 
573  /*
574  * We didn't receive anything new, for half of
575  * receiver replication timeout. Ping the server.
576  */
577  if (!ping_sent)
578  {
579  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
580  (wal_receiver_timeout / 2));
581  if (now >= timeout)
582  {
583  requestReply = true;
584  ping_sent = true;
585  }
586  }
587  }
588 
589  XLogWalRcvSendReply(requestReply, requestReply);
591  }
592  }
593 
594  /*
595  * The backend finished streaming. Exit streaming COPY-mode from
596  * our side, too.
597  */
598  walrcv_endstreaming(wrconn, &primaryTLI);
599 
600  /*
601  * If the server had switched to a new timeline that we didn't
602  * know about when we began streaming, fetch its timeline history
603  * file now.
604  */
605  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
606  }
607  else
608  ereport(LOG,
609  (errmsg("primary server contains no more WAL on requested timeline %u",
610  startpointTLI)));
611 
612  /*
613  * End of WAL reached on the requested timeline. Close the last
614  * segment, and await for new orders from the startup process.
615  */
616  if (recvFile >= 0)
617  {
618  char xlogfname[MAXFNAMELEN];
619 
620  XLogWalRcvFlush(false);
622  if (close(recvFile) != 0)
623  ereport(PANIC,
625  errmsg("could not close log segment %s: %m",
626  xlogfname)));
627 
628  /*
629  * Create .done file forcibly to prevent the streamed segment from
630  * being archived later.
631  */
633  XLogArchiveForceDone(xlogfname);
634  else
635  XLogArchiveNotify(xlogfname);
636  }
637  recvFile = -1;
638 
639  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
640  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
641  }
642  /* not reached */
643 }
static struct @25 LogstreamResult
int sender_port
Definition: walreceiver.h:117
#define SIGQUIT
Definition: win32_port.h:155
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:727
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:268
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:280
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
uint32 TimeLineID
Definition: xlogdefs.h:52
slock_t mutex
Definition: walreceiver.h:143
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:112
#define SIGUSR1
Definition: win32_port.h:166
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
#define SIGCHLD
Definition: win32_port.h:164
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:649
sig_atomic_t force_reply
Definition: walreceiver.h:150
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:272
WalRcvState walRcvState
Definition: walreceiver.h:64
static StringInfoData incoming_message
Definition: walreceiver.c:115
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1147
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:282
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:147
union WalRcvStreamOptions::@106 proto
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:805
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:278
void proc_exit(int code)
Definition: ipc.c:104
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:829
#define SIGPIPE
Definition: win32_port.h:159
#define SIGUSR2
Definition: win32_port.h:167
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7930
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:286
#define PANIC
Definition: elog.h:53
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:85
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:98
void ResetLatch(Latch *latch)
Definition: latch.c:519
TimestampTz lastMsgSendTime
Definition: walreceiver.h:97
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:78
Latch procLatch
Definition: proc.h:104
static WalReceiverConn * wrconn
Definition: walreceiver.c:82
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:114
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:511
void SignalHandlerForCrashExit(SIGNAL_ARGS)
Definition: interrupt.c:72
void pfree(void *pointer)
Definition: mcxt.c:1056
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:43
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11193
pid_t pid
Definition: walreceiver.h:63
#define MAXCONNINFO
Definition: walreceiver.h:36
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
static char * buf
Definition: pg_test_fsync.c:67
int XLogArchiveMode
Definition: xlog.c:91
int errdetail(const char *fmt,...)
Definition: elog.c:955
int errcode_for_file_access(void)
Definition: elog.c:631
#define SIGHUP
Definition: win32_port.h:154
XLogRecPtr startpoint
Definition: walreceiver.h:160
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
unsigned int uint32
Definition: c.h:359
int pgsocket
Definition: port.h:31
sigset_t UnBlockSig
Definition: pqsignal.c:22
struct WalRcvStreamOptions::@106::@107 physical
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:101
#define ereport(elevel, rest)
Definition: elog.h:141
TimeLineID receiveStartTLI
Definition: walreceiver.h:74
static TimeLineID recvFileTLI
Definition: walreceiver.c:93
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:288
Definition: guc.h:72
char * cluster_name
Definition: guc.c:533
Latch * latch
Definition: walreceiver.h:141
#define SIG_IGN
Definition: win32_port.h:151
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static XLogSegNo recvSegNo
Definition: walreceiver.c:94
bool wal_receiver_create_temp_slot
Definition: walreceiver.c:76
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
sigset_t BlockSig
Definition: pqsignal.c:22
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:561
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:83
#define PGINVALID_SOCKET
Definition: port.h:33
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1078
bool ready_to_display
Definition: walreceiver.h:132
TimestampTz latestWalEndTime
Definition: walreceiver.h:104
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:813
TimeLineID ThisTimeLineID
Definition: xlog.c:187
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:145
#define SIG_DFL
Definition: win32_port.h:149
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define SIGALRM
Definition: win32_port.h:160
bool is_temp_slot
Definition: walreceiver.h:129
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1019
WalRcvData * WalRcv
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4798
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:116
int errmsg(const char *fmt,...)
Definition: elog.c:822
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:774
#define elog(elevel,...)
Definition: elog.h:228
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:270
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:73
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:533
static int recvFile
Definition: walreceiver.c:92
#define snprintf
Definition: port.h:192
#define WL_LATCH_SET
Definition: latch.h:124
#define UINT64_FORMAT
Definition: c.h:402
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
char slotname[NAMEDATALEN]
Definition: walreceiver.h:123
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:110
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:264

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 1019 of file walreceiver.c.

References AllowCascadeReplication, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

1020 {
1021  if (LogstreamResult.Flush < LogstreamResult.Write)
1022  {
1023  WalRcvData *walrcv = WalRcv;
1024 
1026 
1027  LogstreamResult.Flush = LogstreamResult.Write;
1028 
1029  /* Update shared-memory status */
1030  SpinLockAcquire(&walrcv->mutex);
1031  if (walrcv->receivedUpto < LogstreamResult.Flush)
1032  {
1033  walrcv->latestChunkStart = walrcv->receivedUpto;
1034  walrcv->receivedUpto = LogstreamResult.Flush;
1035  walrcv->receivedTLI = ThisTimeLineID;
1036  }
1037  SpinLockRelease(&walrcv->mutex);
1038 
1039  /* Signal the startup process and walsender that new WAL has arrived */
1040  WakeupRecovery();
1042  WalSndWakeup();
1043 
1044  /* Report XLOG streaming progress in PS display */
1046  {
1047  char activitymsg[50];
1048 
1049  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1050  (uint32) (LogstreamResult.Write >> 32),
1051  (uint32) LogstreamResult.Write);
1052  set_ps_display(activitymsg, false);
1053  }
1054 
1055  /* Also let the master know that we made some progress */
1056  if (!dying)
1057  {
1058  XLogWalRcvSendReply(false, false);
1059  XLogWalRcvSendHSFeedback(false);
1060  }
1061  }
1062 }
static struct @25 LogstreamResult
slock_t mutex
Definition: walreceiver.h:143
bool update_process_title
Definition: ps_status.c:35
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10126
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1147
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:335
TimeLineID receivedTLI
Definition: walreceiver.h:84
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12300
XLogRecPtr latestChunkStart
Definition: walreceiver.h:92
#define AllowCascadeReplication()
Definition: walreceiver.h:39
unsigned int uint32
Definition: c.h:359
XLogRecPtr receivedUpto
Definition: walreceiver.h:83
static XLogSegNo recvSegNo
Definition: walreceiver.c:94
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1078
TimeLineID ThisTimeLineID
Definition: xlog.c:187
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:92
#define snprintf
Definition: port.h:192
void WalSndWakeup(void)
Definition: walsender.c:3044

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 829 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

830 {
831  int hdrlen;
832  XLogRecPtr dataStart;
833  XLogRecPtr walEnd;
834  TimestampTz sendTime;
835  bool replyRequested;
836 
838 
839  switch (type)
840  {
841  case 'w': /* WAL records */
842  {
843  /* copy message to StringInfo */
844  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
845  if (len < hdrlen)
846  ereport(ERROR,
847  (errcode(ERRCODE_PROTOCOL_VIOLATION),
848  errmsg_internal("invalid WAL message received from primary")));
850 
851  /* read the fields */
852  dataStart = pq_getmsgint64(&incoming_message);
853  walEnd = pq_getmsgint64(&incoming_message);
854  sendTime = pq_getmsgint64(&incoming_message);
855  ProcessWalSndrMessage(walEnd, sendTime);
856 
857  buf += hdrlen;
858  len -= hdrlen;
859  XLogWalRcvWrite(buf, len, dataStart);
860  break;
861  }
862  case 'k': /* Keepalive */
863  {
864  /* copy message to StringInfo */
865  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
866  if (len != hdrlen)
867  ereport(ERROR,
868  (errcode(ERRCODE_PROTOCOL_VIOLATION),
869  errmsg_internal("invalid keepalive message received from primary")));
871 
872  /* read the fields */
873  walEnd = pq_getmsgint64(&incoming_message);
874  sendTime = pq_getmsgint64(&incoming_message);
875  replyRequested = pq_getmsgbyte(&incoming_message);
876 
877  ProcessWalSndrMessage(walEnd, sendTime);
878 
879  /* If the primary requested a reply, send one immediately */
880  if (replyRequested)
881  XLogWalRcvSendReply(true, false);
882  break;
883  }
884  default:
885  ereport(ERROR,
886  (errcode(ERRCODE_PROTOCOL_VIOLATION),
887  errmsg_internal("invalid replication message type %d",
888  type)));
889  }
890 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1263
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:115
int errcode(int sqlerrcode)
Definition: elog.c:608
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:67
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
#define ereport(elevel, rest)
Definition: elog.h:141
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1078
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int errmsg_internal(const char *fmt,...)
Definition: elog.c:909
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:896

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1147 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetOldestXmin(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), PROCARRAY_FLAGS_DEFAULT, PROCARRAY_SLOTS_XMIN, ProcArrayGetReplicationSlotXmin(), ReadNextFullTransactionId(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, TransactionIdPrecedes(), wal_receiver_status_interval, walrcv_send, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

1148 {
1149  TimestampTz now;
1150  FullTransactionId nextFullXid;
1151  TransactionId nextXid;
1152  uint32 xmin_epoch,
1153  catalog_xmin_epoch;
1154  TransactionId xmin,
1155  catalog_xmin;
1156  static TimestampTz sendTime = 0;
1157 
1158  /* initially true so we always send at least one feedback message */
1159  static bool master_has_standby_xmin = true;
1160 
1161  /*
1162  * If the user doesn't want status to be reported to the master, be sure
1163  * to exit before doing anything at all.
1164  */
1166  !master_has_standby_xmin)
1167  return;
1168 
1169  /* Get current timestamp. */
1170  now = GetCurrentTimestamp();
1171 
1172  if (!immed)
1173  {
1174  /*
1175  * Send feedback at most once per wal_receiver_status_interval.
1176  */
1177  if (!TimestampDifferenceExceeds(sendTime, now,
1179  return;
1180  sendTime = now;
1181  }
1182 
1183  /*
1184  * If Hot Standby is not yet accepting connections there is nothing to
1185  * send. Check this after the interval has expired to reduce number of
1186  * calls.
1187  *
1188  * Bailing out here also ensures that we don't send feedback until we've
1189  * read our own replication slot state, so we don't tell the master to
1190  * discard needed xmin or catalog_xmin from any slots that may exist on
1191  * this replica.
1192  */
1193  if (!HotStandbyActive())
1194  return;
1195 
1196  /*
1197  * Make the expensive call to get the oldest xmin once we are certain
1198  * everything else has been checked.
1199  */
1201  {
1202  TransactionId slot_xmin;
1203 
1204  /*
1205  * Usually GetOldestXmin() would include both global replication slot
1206  * xmin and catalog_xmin in its calculations, but we want to derive
1207  * separate values for each of those. So we ask for an xmin that
1208  * excludes the catalog_xmin.
1209  */
1210  xmin = GetOldestXmin(NULL,
1212 
1213  ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1214 
1215  if (TransactionIdIsValid(slot_xmin) &&
1216  TransactionIdPrecedes(slot_xmin, xmin))
1217  xmin = slot_xmin;
1218  }
1219  else
1220  {
1221  xmin = InvalidTransactionId;
1222  catalog_xmin = InvalidTransactionId;
1223  }
1224 
1225  /*
1226  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1227  * the epoch boundary.
1228  */
1229  nextFullXid = ReadNextFullTransactionId();
1230  nextXid = XidFromFullTransactionId(nextFullXid);
1231  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1232  catalog_xmin_epoch = xmin_epoch;
1233  if (nextXid < xmin)
1234  xmin_epoch--;
1235  if (nextXid < catalog_xmin)
1236  catalog_xmin_epoch--;
1237 
1238  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1239  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1240 
1241  /* Construct the message and send it. */
1243  pq_sendbyte(&reply_message, 'h');
1245  pq_sendint32(&reply_message, xmin);
1246  pq_sendint32(&reply_message, xmin_epoch);
1247  pq_sendint32(&reply_message, catalog_xmin);
1248  pq_sendint32(&reply_message, catalog_xmin_epoch);
1250  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1251  master_has_standby_xmin = true;
1252  else
1253  master_has_standby_xmin = false;
1254 }
bool hot_standby_feedback
Definition: walreceiver.c:79
uint32 TransactionId
Definition: c.h:514
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
bool HotStandbyActive(void)
Definition: xlog.c:7986
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:77
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:3117
static WalReceiverConn * wrconn
Definition: walreceiver.c:82
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static StringInfoData reply_message
Definition: walreceiver.c:114
#define PROCARRAY_SLOTS_XMIN
Definition: procarray.h:37
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:359
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
#define PROCARRAY_FLAGS_DEFAULT
Definition: procarray.h:50
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
TransactionId GetOldestXmin(Relation rel, int flags)
Definition: procarray.c:1306
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:284
#define elog(elevel,...)
Definition: elog.h:228
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1078 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, now(), pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1079 {
1080  static XLogRecPtr writePtr = 0;
1081  static XLogRecPtr flushPtr = 0;
1082  XLogRecPtr applyPtr;
1083  static TimestampTz sendTime = 0;
1084  TimestampTz now;
1085 
1086  /*
1087  * If the user doesn't want status to be reported to the master, be sure
1088  * to exit before doing anything at all.
1089  */
1090  if (!force && wal_receiver_status_interval <= 0)
1091  return;
1092 
1093  /* Get current timestamp. */
1094  now = GetCurrentTimestamp();
1095 
1096  /*
1097  * We can compare the write and flush positions to the last message we
1098  * sent without taking any lock, but the apply position requires a spin
1099  * lock, so we don't check that unless something else has changed or 10
1100  * seconds have passed. This means that the apply WAL location will
1101  * appear, from the master's point of view, to lag slightly, but since
1102  * this is only for reporting purposes and only on idle systems, that's
1103  * probably OK.
1104  */
1105  if (!force
1106  && writePtr == LogstreamResult.Write
1107  && flushPtr == LogstreamResult.Flush
1108  && !TimestampDifferenceExceeds(sendTime, now,
1110  return;
1111  sendTime = now;
1112 
1113  /* Construct a new message */
1114  writePtr = LogstreamResult.Write;
1115  flushPtr = LogstreamResult.Flush;
1116  applyPtr = GetXLogReplayRecPtr(NULL);
1117 
1119  pq_sendbyte(&reply_message, 'r');
1120  pq_sendint64(&reply_message, writePtr);
1121  pq_sendint64(&reply_message, flushPtr);
1122  pq_sendint64(&reply_message, applyPtr);
1124  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1125 
1126  /* Send it */
1127  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1128  (uint32) (writePtr >> 32), (uint32) writePtr,
1129  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1130  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1131  requestReply ? " (reply requested)" : "");
1132 
1134 }
static struct @25 LogstreamResult
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:77
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
static WalReceiverConn * wrconn
Definition: walreceiver.c:82
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:114
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11193
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:359
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:284
#define elog(elevel,...)
Definition: elog.h:228
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 896 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvOff, recvSegNo, ThisTimeLineID, wal_segment_size, write, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogSegmentOffset, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

897 {
898  int startoff;
899  int byteswritten;
900 
901  while (nbytes > 0)
902  {
903  int segbytes;
904 
905  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
906  {
907  bool use_existent;
908 
909  /*
910  * fsync() and close current file before we switch to next one. We
911  * would otherwise have to reopen this file to fsync it later
912  */
913  if (recvFile >= 0)
914  {
915  char xlogfname[MAXFNAMELEN];
916 
917  XLogWalRcvFlush(false);
918 
920 
921  /*
922  * XLOG segment files will be re-read by recovery in startup
923  * process soon, so we don't advise the OS to release cache
924  * pages associated with the file like XLogFileClose() does.
925  */
926  if (close(recvFile) != 0)
927  ereport(PANIC,
929  errmsg("could not close log segment %s: %m",
930  xlogfname)));
931 
932  /*
933  * Create .done file forcibly to prevent the streamed segment
934  * from being archived later.
935  */
937  XLogArchiveForceDone(xlogfname);
938  else
939  XLogArchiveNotify(xlogfname);
940  }
941  recvFile = -1;
942 
943  /* Create/use new log file */
945  use_existent = true;
946  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
948  recvOff = 0;
949  }
950 
951  /* Calculate the start offset of the received logs */
952  startoff = XLogSegmentOffset(recptr, wal_segment_size);
953 
954  if (startoff + nbytes > wal_segment_size)
955  segbytes = wal_segment_size - startoff;
956  else
957  segbytes = nbytes;
958 
959  /* Need to seek in the file? */
960  if (recvOff != startoff)
961  {
962  if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
963  {
964  char xlogfname[MAXFNAMELEN];
965  int save_errno = errno;
966 
968  errno = save_errno;
969  ereport(PANIC,
971  errmsg("could not seek in log segment %s to offset %u: %m",
972  xlogfname, startoff)));
973  }
974 
975  recvOff = startoff;
976  }
977 
978  /* OK to write the logs */
979  errno = 0;
980 
981  byteswritten = write(recvFile, buf, segbytes);
982  if (byteswritten <= 0)
983  {
984  char xlogfname[MAXFNAMELEN];
985  int save_errno;
986 
987  /* if write didn't set errno, assume no disk space */
988  if (errno == 0)
989  errno = ENOSPC;
990 
991  save_errno = errno;
993  errno = save_errno;
994  ereport(PANIC,
996  errmsg("could not write to log segment %s "
997  "at offset %u, length %lu: %m",
998  xlogfname, recvOff, (unsigned long) segbytes)));
999  }
1000 
1001  /* Update state for write */
1002  recptr += byteswritten;
1003 
1004  recvOff += byteswritten;
1005  nbytes -= byteswritten;
1006  buf += byteswritten;
1007 
1008  LogstreamResult.Write = recptr;
1009  }
1010 }
static struct @25 LogstreamResult
int wal_segment_size
Definition: xlog.c:112
#define write(a, b, c)
Definition: win32.h:14
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3211
#define PANIC
Definition: elog.h:53
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:511
static char * buf
Definition: pg_test_fsync.c:67
int XLogArchiveMode
Definition: xlog.c:91
int errcode_for_file_access(void)
Definition: elog.c:631
#define ereport(elevel, rest)
Definition: elog.h:141
static TimeLineID recvFileTLI
Definition: walreceiver.c:93
static XLogSegNo recvSegNo
Definition: walreceiver.c:94
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:561
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
TimeLineID ThisTimeLineID
Definition: xlog.c:187
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1019
int errmsg(const char *fmt,...)
Definition: elog.c:822
static uint32 recvOff
Definition: walreceiver.c:95
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:92
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 111 of file walreceiver.c.

◆ got_SIGHUP

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 101 of file walreceiver.c.

Referenced by WalRcvSigHupHandler(), and WalReceiverMain().

◆ got_SIGTERM

volatile sig_atomic_t got_SIGTERM = false
static

Definition at line 102 of file walreceiver.c.

Referenced by ProcessWalRcvInterrupts(), and WalRcvShutdownHandler().

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 79 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 115 of file walreceiver.c.

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 92 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 93 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

◆ recvOff

uint32 recvOff = 0
static

Definition at line 95 of file walreceiver.c.

Referenced by XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 94 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

Definition at line 114 of file walreceiver.c.

Referenced by send_feedback().

◆ wal_receiver_create_temp_slot

bool wal_receiver_create_temp_slot

Definition at line 76 of file walreceiver.c.

Referenced by WalReceiverMain().

◆ wal_receiver_status_interval

int wal_receiver_status_interval

Definition at line 77 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ wal_receiver_timeout

int wal_receiver_timeout

Definition at line 78 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 83 of file walreceiver.c.

Referenced by _PG_init().

◆ wrconn

WalReceiverConn* wrconn = NULL
static

Definition at line 82 of file walreceiver.c.

◆ Write

XLogRecPtr Write

Definition at line 110 of file walreceiver.c.