PostgreSQL Source Code  git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvClose (XLogRecPtr recptr)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
} LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 

Macro Definition Documentation

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 97 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1329 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MAXCONNINFO, MemSet, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, NI_MAXHOST, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

1330 {
1331  TupleDesc tupdesc;
1332  Datum *values;
1333  bool *nulls;
1334  int pid;
1335  bool ready_to_display;
1336  WalRcvState state;
1337  XLogRecPtr receive_start_lsn;
1338  TimeLineID receive_start_tli;
1339  XLogRecPtr written_lsn;
1340  XLogRecPtr flushed_lsn;
1341  TimeLineID received_tli;
1342  TimestampTz last_send_time;
1343  TimestampTz last_receipt_time;
1344  XLogRecPtr latest_end_lsn;
1345  TimestampTz latest_end_time;
1346  char sender_host[NI_MAXHOST];
1347  int sender_port = 0;
1348  char slotname[NAMEDATALEN];
1349  char conninfo[MAXCONNINFO];
1350 
1351  /* Take a lock to ensure value consistency */
1352  SpinLockAcquire(&WalRcv->mutex);
1353  pid = (int) WalRcv->pid;
1354  ready_to_display = WalRcv->ready_to_display;
1355  state = WalRcv->walRcvState;
1356  receive_start_lsn = WalRcv->receiveStart;
1357  receive_start_tli = WalRcv->receiveStartTLI;
1358  flushed_lsn = WalRcv->flushedUpto;
1359  received_tli = WalRcv->receivedTLI;
1360  last_send_time = WalRcv->lastMsgSendTime;
1361  last_receipt_time = WalRcv->lastMsgReceiptTime;
1362  latest_end_lsn = WalRcv->latestWalEnd;
1363  latest_end_time = WalRcv->latestWalEndTime;
1364  strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1365  strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1366  sender_port = WalRcv->sender_port;
1367  strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1368  SpinLockRelease(&WalRcv->mutex);
1369 
1370  /*
1371  * No WAL receiver (or not ready yet), just return a tuple with NULL
1372  * values
1373  */
1374  if (pid == 0 || !ready_to_display)
1375  PG_RETURN_NULL();
1376 
1377  /*
1378  * Read "writtenUpto" without holding a spinlock. Note that it may not be
1379  * consistent with the other shared variables of the WAL receiver
1380  * protected by a spinlock, but this should not be used for data integrity
1381  * checks.
1382  */
1383  written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1384 
1385  /* determine result type */
1386  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1387  elog(ERROR, "return type must be a row type");
1388 
1389  values = palloc0(sizeof(Datum) * tupdesc->natts);
1390  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1391 
1392  /* Fetch values */
1393  values[0] = Int32GetDatum(pid);
1394 
1395  if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1396  {
1397  /*
1398  * Only superusers and members of pg_read_all_stats can see details.
1399  * Other users only get the pid value to know whether it is a WAL
1400  * receiver, but no details.
1401  */
1402  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1403  }
1404  else
1405  {
1406  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1407 
1408  if (XLogRecPtrIsInvalid(receive_start_lsn))
1409  nulls[2] = true;
1410  else
1411  values[2] = LSNGetDatum(receive_start_lsn);
1412  values[3] = Int32GetDatum(receive_start_tli);
1413  if (XLogRecPtrIsInvalid(written_lsn))
1414  nulls[4] = true;
1415  else
1416  values[4] = LSNGetDatum(written_lsn);
1417  if (XLogRecPtrIsInvalid(flushed_lsn))
1418  nulls[5] = true;
1419  else
1420  values[5] = LSNGetDatum(flushed_lsn);
1421  values[6] = Int32GetDatum(received_tli);
1422  if (last_send_time == 0)
1423  nulls[7] = true;
1424  else
1425  values[7] = TimestampTzGetDatum(last_send_time);
1426  if (last_receipt_time == 0)
1427  nulls[8] = true;
1428  else
1429  values[8] = TimestampTzGetDatum(last_receipt_time);
1430  if (XLogRecPtrIsInvalid(latest_end_lsn))
1431  nulls[9] = true;
1432  else
1433  values[9] = LSNGetDatum(latest_end_lsn);
1434  if (latest_end_time == 0)
1435  nulls[10] = true;
1436  else
1437  values[10] = TimestampTzGetDatum(latest_end_time);
1438  if (*slotname == '\0')
1439  nulls[11] = true;
1440  else
1441  values[11] = CStringGetTextDatum(slotname);
1442  if (*sender_host == '\0')
1443  nulls[12] = true;
1444  else
1445  values[12] = CStringGetTextDatum(sender_host);
1446  if (sender_port == 0)
1447  nulls[13] = true;
1448  else
1449  values[13] = Int32GetDatum(sender_port);
1450  if (*conninfo == '\0')
1451  nulls[14] = true;
1452  else
1453  values[14] = CStringGetTextDatum(conninfo);
1454  }
1455 
1456  /* Returns the record as Datum */
1457  PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1458 }
int sender_port
Definition: walreceiver.h:119
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1304
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
uint32 TimeLineID
Definition: xlogdefs.h:59
slock_t mutex
Definition: walreceiver.h:145
Oid GetUserId(void)
Definition: miscinit.c:495
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:65
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:86
#define MemSet(start, val, len)
Definition: c.h:1008
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:100
TimestampTz lastMsgSendTime
Definition: walreceiver.h:99
WalRcvState
Definition: walreceiver.h:45
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:153
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:46
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:64
#define MAXCONNINFO
Definition: walreceiver.h:37
TimeLineID receiveStartTLI
Definition: walreceiver.h:76
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:1093
uintptr_t Datum
Definition: postgres.h:411
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
bool ready_to_display
Definition: walreceiver.h:134
TimestampTz latestWalEndTime
Definition: walreceiver.h:106
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4869
XLogRecPtr latestWalEnd
Definition: walreceiver.h:105
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:317
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define Int32GetDatum(X)
Definition: postgres.h:523
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:429
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:118
#define elog(elevel,...)
Definition: elog.h:232
#define CStringGetTextDatum(s)
Definition: builtins.h:86
XLogRecPtr receiveStart
Definition: walreceiver.h:75
XLogRecPtr flushedUpto
Definition: walreceiver.h:85
#define PG_RETURN_NULL()
Definition: fmgr.h:345
char slotname[NAMEDATALEN]
Definition: walreceiver.h:125
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:112

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 147 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and ShutdownRequestPending.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), walrcv_clear_result(), WalRcvWaitForStartPosition(), and WalReceiverMain().

148 {
149  /*
150  * Although walreceiver interrupt handling doesn't use the same scheme as
151  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
152  * any incoming signals on Win32, and also to make sure we process any
153  * barrier events.
154  */
155  CHECK_FOR_INTERRUPTS();
156 
157  if (ShutdownRequestPending)
158  {
159  ereport(FATAL,
160  (errcode(ERRCODE_ADMIN_SHUTDOWN),
161  errmsg("terminating walreceiver process due to administrator command")));
162  }
163 }
int errcode(int sqlerrcode)
Definition: elog.c:698
#define FATAL
Definition: elog.h:49
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:27
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1233 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, message_level_is_interesting(), WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1234 {
1235  WalRcvData *walrcv = WalRcv;
1236 
1237  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1238 
1239  /* Update shared-memory status */
1240  SpinLockAcquire(&walrcv->mutex);
1241  if (walrcv->latestWalEnd < walEnd)
1242  walrcv->latestWalEndTime = sendTime;
1243  walrcv->latestWalEnd = walEnd;
1244  walrcv->lastMsgSendTime = sendTime;
1245  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1246  SpinLockRelease(&walrcv->mutex);
1247 
1248  if (message_level_is_interesting(DEBUG2))
1249  {
1250  char *sendtime;
1251  char *receipttime;
1252  int applyDelay;
1253 
1254  /* Copy because timestamptz_to_str returns a static buffer */
1255  sendtime = pstrdup(timestamptz_to_str(sendTime));
1256  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1257  applyDelay = GetReplicationApplyDelay();
1258 
1259  /* apply delay is not available */
1260  if (applyDelay == -1)
1261  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1262  sendtime,
1263  receipttime,
1264  GetReplicationTransferLatency());
1265  else
1266  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1267  sendtime,
1268  receipttime,
1269  applyDelay,
1270  GetReplicationTransferLatency());
1271 
1272  pfree(sendtime);
1273  pfree(receipttime);
1274  }
1275 }
slock_t mutex
Definition: walreceiver.h:145
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:100
TimestampTz lastMsgSendTime
Definition: walreceiver.h:99
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1169
#define DEBUG2
Definition: elog.h:24
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:106
XLogRecPtr latestWalEnd
Definition: walreceiver.h:105
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:232
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 776 of file walreceiver.c.

References Assert, ConditionVariableBroadcast(), WalRcvData::latch, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

777 {
778  WalRcvData *walrcv = WalRcv;
779 
780  /* Ensure that all WAL records received are flushed to disk */
781  XLogWalRcvFlush(true);
782 
783  /* Mark ourselves inactive in shared memory */
784  SpinLockAcquire(&walrcv->mutex);
785  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
786  walrcv->walRcvState == WALRCV_RESTARTING ||
787  walrcv->walRcvState == WALRCV_STARTING ||
788  walrcv->walRcvState == WALRCV_WAITING ||
789  walrcv->walRcvState == WALRCV_STOPPING);
790  Assert(walrcv->pid == MyProcPid);
791  walrcv->walRcvState = WALRCV_STOPPED;
792  walrcv->pid = 0;
793  walrcv->ready_to_display = false;
794  walrcv->latch = NULL;
795  SpinLockRelease(&walrcv->mutex);
796 
797  ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
798 
799  /* Terminate the connection gracefully. */
800  if (wrconn != NULL)
801  walrcv_disconnect(wrconn);
802 
803  /* Wake up the startup process to notice promptly that we're gone */
804  WakeupRecovery();
805 }
int MyProcPid
Definition: globals.c:43
slock_t mutex
Definition: walreceiver.h:145
WalRcvState walRcvState
Definition: walreceiver.h:65
void ConditionVariableBroadcast(ConditionVariable *cv)
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:13168
pid_t pid
Definition: walreceiver.h:64
Latch * latch
Definition: walreceiver.h:143
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:134
#define Assert(condition)
Definition: c.h:804
#define walrcv_disconnect(conn)
Definition: walreceiver.h:432
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:959
WalRcvData * WalRcv
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:66

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 720 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

721 {
722  TimeLineID tli;
723 
724  for (tli = first; tli <= last; tli++)
725  {
726  /* there's no history file for timeline 1 */
727  if (tli != 1 && !existsTimeLineHistory(tli))
728  {
729  char *fname;
730  char *content;
731  int len;
732  char expectedfname[MAXFNAMELEN];
733 
734  ereport(LOG,
735  (errmsg("fetching timeline history file for timeline %u from primary server",
736  tli)));
737 
738  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
739 
740  /*
741  * Check that the filename on the primary matches what we
742  * calculated ourselves. This is just a sanity check, it should
743  * always match.
744  */
745  TLHistoryFileName(expectedfname, tli);
746  if (strcmp(fname, expectedfname) != 0)
747  ereport(ERROR,
748  (errcode(ERRCODE_PROTOCOL_VIOLATION),
749  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
750  tli)));
751 
752  /*
753  * Write the file to pg_wal.
754  */
755  writeTimeLineHistoryFile(tli, content, len);
756 
757  /*
758  * Mark the streamed history file as ready for archiving if
759  * archive_mode is always.
760  */
761  if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
762  XLogArchiveForceDone(fname);
763  else
764  XLogArchiveNotify(fname);
765 
766  pfree(fname);
767  pfree(content);
768  }
769  }
770 }
uint32 TimeLineID
Definition: xlogdefs.h:59
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:467
void pfree(void *pointer)
Definition: mcxt.c:1169
#define ERROR
Definition: elog.h:46
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:467
int XLogArchiveMode
Definition: xlog.c:96
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:517
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:416
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1286 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, WalRcvData::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by StartupXLOG(), WaitForWALToBecomeAvailable(), and walrcv_clear_result().

1287 {
1288  Latch *latch;
1289 
1290  WalRcv->force_reply = true;
1291  /* fetching the latch pointer might not be atomic, so use spinlock */
1292  SpinLockAcquire(&WalRcv->mutex);
1293  latch = WalRcv->latch;
1294  SpinLockRelease(&WalRcv->mutex);
1295  if (latch)
1296  SetLatch(latch);
1297 }
slock_t mutex
Definition: walreceiver.h:145
sig_atomic_t force_reply
Definition: walreceiver.h:160
void SetLatch(Latch *latch)
Definition: latch.c:567
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: latch.h:110
Latch * latch
Definition: walreceiver.h:143
#define SpinLockRelease(lock)
Definition: spin.h:64
WalRcvData * WalRcv

◆ WalRcvGetStateString()

static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1304 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1305 {
1306  switch (state)
1307  {
1308  case WALRCV_STOPPED:
1309  return "stopped";
1310  case WALRCV_STARTING:
1311  return "starting";
1312  case WALRCV_STREAMING:
1313  return "streaming";
1314  case WALRCV_WAITING:
1315  return "waiting";
1316  case WALRCV_RESTARTING:
1317  return "restarting";
1318  case WALRCV_STOPPING:
1319  return "stopping";
1320  }
1321  return "UNKNOWN";
1322 }
Definition: regguts.h:317

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 640 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, LSN_FORMAT_ARGS, WalRcvData::mutex, MyLatch, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

641 {
642  WalRcvData *walrcv = WalRcv;
643  int state;
644 
645  SpinLockAcquire(&walrcv->mutex);
646  state = walrcv->walRcvState;
647  if (state != WALRCV_STREAMING)
648  {
649  SpinLockRelease(&walrcv->mutex);
650  if (state == WALRCV_STOPPING)
651  proc_exit(0);
652  else
653  elog(FATAL, "unexpected walreceiver state");
654  }
655  walrcv->walRcvState = WALRCV_WAITING;
656  walrcv->receiveStart = InvalidXLogRecPtr;
657  walrcv->receiveStartTLI = 0;
658  SpinLockRelease(&walrcv->mutex);
659 
660  set_ps_display("idle");
661 
662  /*
663  * nudge startup process to notice that we've stopped streaming and are
664  * now waiting for instructions.
665  */
666  WakeupRecovery();
667  for (;;)
668  {
669  ResetLatch(MyLatch);
670 
671  ProcessWalRcvInterrupts();
672 
673  SpinLockAcquire(&walrcv->mutex);
674  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
675  walrcv->walRcvState == WALRCV_WAITING ||
676  walrcv->walRcvState == WALRCV_STOPPING);
677  if (walrcv->walRcvState == WALRCV_RESTARTING)
678  {
679  /*
680  * No need to handle changes in primary_conninfo or
681  * primary_slotname here. Startup process will signal us to
682  * terminate in case those change.
683  */
684  *startpoint = walrcv->receiveStart;
685  *startpointTLI = walrcv->receiveStartTLI;
686  walrcv->walRcvState = WALRCV_STREAMING;
687  SpinLockRelease(&walrcv->mutex);
688  break;
689  }
690  if (walrcv->walRcvState == WALRCV_STOPPING)
691  {
692  /*
693  * We should've received SIGTERM if the startup process wants us
694  * to die, but might as well check it here too.
695  */
696  SpinLockRelease(&walrcv->mutex);
697  exit(1);
698  }
699  SpinLockRelease(&walrcv->mutex);
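700 
701  (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
702  WAIT_EVENT_WAL_RECEIVER_WAIT_START);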
703  }
704 
705  if (update_process_title)
706  {
707  char activitymsg[50];
708 
709  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
710  LSN_FORMAT_ARGS(*startpoint));
711  set_ps_display(activitymsg);
712  }
713 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:145
bool update_process_title
Definition: ps_status.c:36
WalRcvState walRcvState
Definition: walreceiver.h:65
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:147
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:660
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:452
void set_ps_display(const char *activity)
Definition: ps_status.c:349
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void WakeupRecovery(void)
Definition: xlog.c:13168
#define FATAL
Definition: elog.h:49
TimeLineID receiveStartTLI
Definition: walreceiver.h:76
#define SpinLockRelease(lock)
Definition: spin.h:64
#define Assert(condition)
Definition: c.h:804
Definition: regguts.h:317
WalRcvData * WalRcv
#define elog(elevel,...)
Definition: elog.h:232
struct Latch * MyLatch
Definition: globals.c:57
XLogRecPtr receiveStart
Definition: walreceiver.h:75
#define snprintf
Definition: port.h:217
#define WL_LATCH_SET
Definition: latch.h:125
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130

◆ WalReceiverMain()

void WalReceiverMain ( void  )

Definition at line 168 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, buf, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, WalRcvData::conninfo, DEBUG1, elog, ereport, errcode(), errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyLatch, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, NI_MAXHOST, now(), on_shmem_exit(), options, PANIC, pfree(), pg_atomic_write_u64(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, procsignal_sigusr1_handler(), WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf, SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), 
WalRcvFetchTimeLineHistoryFiles(), WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, WalRcvWaitForStartPosition(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain(), and walrcv_clear_result().

169 {
     /*
      * Main entry point of the walreceiver process (listing of source lines
      * 169-634).
      *
      * Visible flow: publish our PID, state and latch in the shared WalRcv
      * struct, install signal handlers, load libpqwalreceiver, connect to
      * the primary, then loop forever: identify the primary, fetch missing
      * timeline history files, optionally create a temporary slot, stream
      * WAL until end-of-WAL or an error, close the current segment, and
      * block in WalRcvWaitForStartPosition() until a new start point is
      * handed to us.
      *
      * NOTE(review): this listing skips a number of source lines (for
      * example 180, 212, 255, 258, 266-269, 272, 280, 322, 331, 411-413,
      * 438, 440, 443-444, 475, 509-514, 517-518, 529, 552, 557, 581, 612,
      * 615 and 623). Statements on those lines are not shown here, so check
      * the real source before relying on this text.
      */
170  char conninfo[MAXCONNINFO];
171  char *tmp_conninfo;
172  char slotname[NAMEDATALEN];
173  bool is_temp_slot;
174  XLogRecPtr startpoint;
175  TimeLineID startpointTLI;
176  TimeLineID primaryTLI;
177  bool first_stream;
178  WalRcvData *walrcv = WalRcv;
179  TimestampTz last_recv_timestamp;
181  bool ping_sent;
182  char *err;
183  char *sender_host = NULL;
184  int sender_port = 0;
185 
186  /*
187  * WalRcv should be set up already (if we are a backend, we inherit this
188  * by fork() or EXEC_BACKEND mechanism from the postmaster).
189  */
190  Assert(walrcv != NULL);
191 
     /*
      * NOTE(review): 'now' is assigned here, but its declaration (presumably
      * on the elided line 180) is not part of this listing.
      */
192  now = GetCurrentTimestamp();
193 
194  /*
195  * Mark walreceiver as running in shared memory.
196  *
197  * Do this as early as possible, so that if we fail later on, we'll set
198  * state to STOPPED. If we die before this, the startup process will keep
199  * waiting for us to start up, until it times out.
200  */
201  SpinLockAcquire(&walrcv->mutex);
202  Assert(walrcv->pid == 0);
203  switch (walrcv->walRcvState)
204  {
205  case WALRCV_STOPPING:
206  /* If we've already been requested to stop, don't start up. */
207  walrcv->walRcvState = WALRCV_STOPPED;
208  /* fall through */
209 
210  case WALRCV_STOPPED:
211  SpinLockRelease(&walrcv->mutex);
213  proc_exit(1);
214  break;
215 
216  case WALRCV_STARTING:
217  /* The usual case */
218  break;
219 
220  case WALRCV_WAITING:
221  case WALRCV_STREAMING:
222  case WALRCV_RESTARTING:
223  default:
224  /* Shouldn't happen */
225  SpinLockRelease(&walrcv->mutex);
226  elog(PANIC, "walreceiver still running according to shared memory state");
227  }
228  /* Advertise our PID so that the startup process can kill us */
229  walrcv->pid = MyProcPid;
230  walrcv->walRcvState = WALRCV_STREAMING;
231 
232  /* Fetch information required to start streaming */
233  walrcv->ready_to_display = false;
234  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
235  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
236  is_temp_slot = walrcv->is_temp_slot;
237  startpoint = walrcv->receiveStart;
238  startpointTLI = walrcv->receiveStartTLI;
239 
240  /*
241  * At most one of is_temp_slot and slotname can be set; otherwise,
242  * RequestXLogStreaming messed up.
243  */
244  Assert(!is_temp_slot || (slotname[0] == '\0'));
245 
246  /* Initialise all three message timestamps to the time taken above */
247  walrcv->lastMsgSendTime =
248  walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
249 
250  /* Report the latch to use to awaken this process */
251  walrcv->latch = &MyProc->procLatch;
252 
253  SpinLockRelease(&walrcv->mutex);
254 
256 
257  /* Arrange to clean up at walreceiver exit */
259 
260  /* Properly accept or ignore signals the postmaster might send us */
261  pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
262  * file */
263  pqsignal(SIGINT, SIG_IGN);
264  pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
265  /* SIGQUIT handler was already set up by InitPostmasterChild */
270 
271  /* Reset some signals that are accepted by postmaster but not here */
273 
274  /* Load the libpq-specific functions */
275  load_file("libpqwalreceiver", false);
276  if (WalReceiverFunctions == NULL)
277  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
278 
279  /* Unblock signals (they were blocked when the postmaster forked us) */
281 
282  /* Establish the connection to the primary for XLOG streaming */
283  wrconn = walrcv_connect(conninfo, false,
284  cluster_name[0] ? cluster_name : "walreceiver",
285  &err);
286  if (!wrconn)
287  ereport(ERROR,
288  (errcode(ERRCODE_CONNECTION_FAILURE),
289  errmsg("could not connect to the primary server: %s", err)));
290 
291  /*
292  * Save user-visible connection string. This clobbers the original
293  * conninfo, for security. Also save host and port of the sender server
294  * this walreceiver is connected to.
295  */
296  tmp_conninfo = walrcv_get_conninfo(wrconn);
297  walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
298  SpinLockAcquire(&walrcv->mutex);
299  memset(walrcv->conninfo, 0, MAXCONNINFO);
300  if (tmp_conninfo)
301  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
302 
303  memset(walrcv->sender_host, 0, NI_MAXHOST);
304  if (sender_host)
305  strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
306 
307  walrcv->sender_port = sender_port;
308  walrcv->ready_to_display = true;
309  SpinLockRelease(&walrcv->mutex);
310 
311  if (tmp_conninfo)
312  pfree(tmp_conninfo);
313 
314  if (sender_host)
315  pfree(sender_host);
316 
317  first_stream = true;
318  for (;;)
319  {
320  char *primary_sysid;
321  char standby_sysid[32];
323 
324  /*
325  * Check that we're connected to a valid server using the
326  * IDENTIFY_SYSTEM replication command.
327  */
328  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
329 
330  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
332  if (strcmp(primary_sysid, standby_sysid) != 0)
333  {
334  ereport(ERROR,
335  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
336  errmsg("database system identifier differs between the primary and standby"),
337  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
338  primary_sysid, standby_sysid)));
339  }
340 
341  /*
342  * Confirm that the current timeline of the primary is the same or
343  * ahead of ours.
344  */
345  if (primaryTLI < startpointTLI)
346  ereport(ERROR,
347  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
348  errmsg("highest timeline %u of the primary is behind recovery timeline %u",
349  primaryTLI, startpointTLI)));
350 
351  /*
352  * Get any missing history files. We do this always, even when we're
353  * not interested in that timeline, so that if we're promoted to
354  * become the primary later on, we don't select the same timeline that
355  * was already used in the current primary. This isn't bullet-proof -
356  * you'll need some external software to manage your cluster if you
357  * need to ensure that a unique timeline id is chosen in every case,
358  * but let's avoid the confusion of timeline id collisions where we
359  * can.
360  */
361  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
362 
363  /*
364  * Create temporary replication slot if requested, and update slot
365  * name in shared memory. (Note the slot name cannot already be set
366  * in this case.)
367  */
368  if (is_temp_slot)
369  {
370  snprintf(slotname, sizeof(slotname),
371  "pg_walreceiver_%lld",
372  (long long int) walrcv_get_backend_pid(wrconn));
373 
374  walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
375 
376  SpinLockAcquire(&walrcv->mutex);
377  strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
378  SpinLockRelease(&walrcv->mutex);
379  }
380 
381  /*
382  * Start streaming.
383  *
384  * We'll try to start at the requested starting point and timeline,
385  * even if it's different from the server's latest timeline. In case
386  * we've already reached the end of the old timeline, the server will
387  * finish the streaming immediately, and we will go back to await
388  * orders from the startup process. If recovery_target_timeline is
389  * 'latest', the startup process will scan pg_wal and find the new
390  * history file, bump recovery target timeline, and ask us to restart
391  * on the new timeline.
392  */
393  options.logical = false;
394  options.startpoint = startpoint;
395  options.slotname = slotname[0] != '\0' ? slotname : NULL;
396  options.proto.physical.startpointTLI = startpointTLI;
397  ThisTimeLineID = startpointTLI;
398  if (walrcv_startstreaming(wrconn, &options))
399  {
400  if (first_stream)
401  ereport(LOG,
402  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
403  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
404  else
405  ereport(LOG,
406  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
407  LSN_FORMAT_ARGS(startpoint), startpointTLI)));
408  first_stream = false;
409 
410  /* Initialize LogstreamResult and buffers for processing messages */
414 
415  /* Initialize the last recv timestamp */
416  last_recv_timestamp = GetCurrentTimestamp();
417  ping_sent = false;
418 
419  /* Loop until end-of-streaming or error */
420  for (;;)
421  {
422  char *buf;
423  int len;
424  bool endofwal = false;
425  pgsocket wait_fd = PGINVALID_SOCKET;
426  int rc;
427 
428  /*
429  * Exit walreceiver if we're not in recovery. This should not
430  * happen, but cross-check the status here.
431  */
432  if (!RecoveryInProgress())
433  ereport(FATAL,
434  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
435  errmsg("cannot continue WAL streaming, recovery has already ended")));
436 
437  /* Process any requests or signals received recently */
439 
441  {
442  ConfigReloadPending = false;
445  }
446 
447  /* See if we can read data immediately */
448  len = walrcv_receive(wrconn, &buf, &wait_fd);
449  if (len != 0)
450  {
451  /*
452  * Process the received data, and any subsequent data we
453  * can read without blocking.
454  */
455  for (;;)
456  {
457  if (len > 0)
458  {
459  /*
460  * Something was received from primary, so reset
461  * timeout
462  */
463  last_recv_timestamp = GetCurrentTimestamp();
464  ping_sent = false;
     /* buf[0] is the message type byte; the payload follows it */
465  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
466  }
467  else if (len == 0)
468  break;
469  else if (len < 0)
470  {
471  ereport(LOG,
472  (errmsg("replication terminated by primary server"),
473  errdetail("End of WAL reached on timeline %u at %X/%X.",
474  startpointTLI,
476  endofwal = true;
477  break;
478  }
479  len = walrcv_receive(wrconn, &buf, &wait_fd);
480  }
481 
482  /* Let the primary know that we received some data. */
483  XLogWalRcvSendReply(false, false);
484 
485  /*
486  * If we've written some records, flush them to disk and
487  * let the startup process and primary server know about
488  * them.
489  */
490  XLogWalRcvFlush(false);
491  }
492 
493  /* Check if we need to exit the streaming loop. */
494  if (endofwal)
495  break;
496 
497  /*
498  * Ideally we would reuse a WaitEventSet object repeatedly
499  * here to avoid the overheads of WaitLatchOrSocket on epoll
500  * systems, but we can't be sure that libpq (or any other
501  * walreceiver implementation) has the same socket (even if
502  * the fd is the same number, it may have been closed and
503  * reopened since the last time). In future, if there is a
504  * function for removing sockets from WaitEventSet, then we
505  * could add and remove just the socket each time, potentially
506  * avoiding some system calls.
507  */
508  Assert(wait_fd != PGINVALID_SOCKET);
     /*
      * NOTE(review): most of the call that assigns 'rc' (source lines
      * 509-514) is missing from this listing; only its wait_fd argument
      * survives. Presumably this is the WaitLatchOrSocket() call the
      * comment above refers to - confirm against the real source.
      */
512  wait_fd,
515  if (rc & WL_LATCH_SET)
516  {
519 
520  if (walrcv->force_reply)
521  {
522  /*
523  * The recovery process has asked us to send apply
524  * feedback now. Make sure the flag is really set to
525  * false in shared memory before sending the reply, so
526  * we don't miss a new request for a reply.
527  */
528  walrcv->force_reply = false;
530  XLogWalRcvSendReply(true, false);
531  }
532  }
533  if (rc & WL_TIMEOUT)
534  {
535  /*
536  * We didn't receive anything new. If we haven't heard
537  * anything from the server for more than
538  * wal_receiver_timeout / 2, ping the server. Also, if
539  * it's been longer than wal_receiver_status_interval
540  * since the last update we sent, send a status update to
541  * the primary anyway, to report any progress in applying
542  * WAL.
543  */
544  bool requestReply = false;
545 
546  /*
547  * Check if time since last receive from primary has
548  * reached the configured limit.
549  */
550  if (wal_receiver_timeout > 0)
551  {
553  TimestampTz timeout;
554 
555  timeout =
556  TimestampTzPlusMilliseconds(last_recv_timestamp,
558 
559  if (now >= timeout)
560  ereport(ERROR,
561  (errcode(ERRCODE_CONNECTION_FAILURE),
562  errmsg("terminating walreceiver due to timeout")));
563 
564  /*
565  * Nothing new has arrived for half of wal_receiver_timeout.
566  * Ping the server, but only once per quiet period (ping_sent).
567  */
568  if (!ping_sent)
569  {
570  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
571  (wal_receiver_timeout / 2));
572  if (now >= timeout)
573  {
574  requestReply = true;
575  ping_sent = true;
576  }
577  }
578  }
579 
580  XLogWalRcvSendReply(requestReply, requestReply);
582  }
583  }
584 
585  /*
586  * The backend finished streaming. Exit streaming COPY-mode from
587  * our side, too.
588  */
589  walrcv_endstreaming(wrconn, &primaryTLI);
590 
591  /*
592  * If the server had switched to a new timeline that we didn't
593  * know about when we began streaming, fetch its timeline history
594  * file now.
595  */
596  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
597  }
598  else
599  ereport(LOG,
600  (errmsg("primary server contains no more WAL on requested timeline %u",
601  startpointTLI)));
602 
603  /*
604  * End of WAL reached on the requested timeline. Close the last
605  * segment, and wait for new orders from the startup process.
606  */
607  if (recvFile >= 0)
608  {
609  char xlogfname[MAXFNAMELEN];
610 
611  XLogWalRcvFlush(false);
613  if (close(recvFile) != 0)
614  ereport(PANIC,
616  errmsg("could not close log segment %s: %m",
617  xlogfname)));
618 
619  /*
620  * Create .done file forcibly to prevent the streamed segment from
621  * being archived later.
622  */
624  XLogArchiveForceDone(xlogfname);
625  else
626  XLogArchiveNotify(xlogfname);
627  }
628  recvFile = -1;
629 
630  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
631  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
632  }
633  /* not reached */
634 }
int sender_port
Definition: walreceiver.h:119
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:720
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:408
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:420
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
uint32 TimeLineID
Definition: xlogdefs.h:59
slock_t mutex
Definition: walreceiver.h:145
#define WL_TIMEOUT
Definition: latch.h:128
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:119
#define SIGUSR1
Definition: win32_port.h:179
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
#define SIGCHLD
Definition: win32_port.h:177
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:640
sig_atomic_t force_reply
Definition: walreceiver.h:160
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:412
WalRcvState walRcvState
Definition: walreceiver.h:65
static StringInfoData incoming_message
Definition: walreceiver.c:119
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1132
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:422
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:147
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:418
union WalRcvStreamOptions::@104 proto
void ConditionVariableBroadcast(ConditionVariable *cv)
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
#define WL_SOCKET_READABLE
Definition: latch.h:126
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:811
#define SIGPIPE
Definition: win32_port.h:172
#define SIGUSR2
Definition: win32_port.h:180
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:8328
#define PANIC
Definition: elog.h:50
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:97
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:100
void ResetLatch(Latch *latch)
Definition: latch.c:660
TimestampTz lastMsgSendTime
Definition: walreceiver.h:99
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
int wal_receiver_timeout
Definition: walreceiver.c:90
Latch procLatch
Definition: proc.h:130
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:153
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:118
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:467
void pfree(void *pointer)
Definition: mcxt.c:1169
#define NI_MAXHOST
Definition: getaddrinfo.h:88
#define ERROR
Definition: elog.h:46
#define FATAL
Definition: elog.h:49
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11927
pid_t pid
Definition: walreceiver.h:64
#define MAXCONNINFO
Definition: walreceiver.h:37
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
static char * buf
Definition: pg_test_fsync.c:68
int XLogArchiveMode
Definition: xlog.c:96
int errdetail(const char *fmt,...)
Definition: elog.c:1042
int errcode_for_file_access(void)
Definition: elog.c:721
#define SIGHUP
Definition: win32_port.h:167
XLogRecPtr startpoint
Definition: walreceiver.h:170
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
int pgsocket
Definition: port.h:31
sigset_t UnBlockSig
Definition: pqsignal.c:22
TimeLineID receiveStartTLI
Definition: walreceiver.h:76
static TimeLineID recvFileTLI
Definition: walreceiver.c:105
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:428
Definition: guc.h:72
char * cluster_name
Definition: guc.c:613
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:104
Latch * latch
Definition: walreceiver.h:143
#define SIG_IGN
Definition: win32_port.h:164
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static XLogSegNo recvSegNo
Definition: walreceiver.c:106
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:517
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:95
#define PGINVALID_SOCKET
Definition: port.h:33
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1063
bool ready_to_display
Definition: walreceiver.h:134
TimestampTz latestWalEndTime
Definition: walreceiver.h:106
TimeLineID ThisTimeLineID
Definition: xlog.c:194
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:157
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:145
#define SIG_DFL
Definition: win32_port.h:162
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define SIGALRM
Definition: win32_port.h:173
bool is_temp_slot
Definition: walreceiver.h:131
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:959
WalRcvData * WalRcv
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4998
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:118
int errmsg(const char *fmt,...)
Definition: elog.c:909
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:776
#define elog(elevel,...)
Definition: elog.h:232
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:410
struct Latch * MyLatch
Definition: globals.c:57
struct WalRcvStreamOptions::@104::@105 physical
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:75
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:642
static int recvFile
Definition: walreceiver.c:104
#define snprintf
Definition: port.h:217
#define WL_LATCH_SET
Definition: latch.h:125
#define UINT64_FORMAT
Definition: c.h:484
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
char slotname[NAMEDATALEN]
Definition: walreceiver.h:125
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:66
static struct @17 LogstreamResult
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:112
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
Definition: walreceiver.h:426
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr)
static

Definition at line 1012 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, close, ereport, errcode_for_file_access(), errmsg(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

1013 {
      /*
       * Close the WAL segment file currently open in recvFile.
       *
       * Visible steps: flush via XLogWalRcvFlush(false), close() the file
       * (PANIC on failure), hand the segment name to either
       * XLogArchiveForceDone() or XLogArchiveNotify(), and reset recvFile
       * to -1.
       *
       * NOTE(review): source lines 1016, 1024, 1033 and 1041 are missing
       * from this listing. They presumably hold an assertion involving
       * recptr, the XLogFileName() call that fills xlogfname, the
       * errcode_for_file_access() argument, and the condition choosing
       * between the two archive calls (the reference list names
       * XLogArchiveMode and ARCHIVE_MODE_ALWAYS) - confirm against the real
       * source. The recptr parameter is not used on any visible line.
       */
1014  char xlogfname[MAXFNAMELEN];
1015 
1017 
1018  /*
1019  * fsync() and close current file before we switch to next one. We would
1020  * otherwise have to reopen this file to fsync it later.
1021  */
1022  XLogWalRcvFlush(false);
1023 
1025 
1026  /*
1027  * XLOG segment files will be re-read by recovery in startup process soon,
1028  * so we don't advise the OS to release cache pages associated with the
1029  * file like XLogFileClose() does.
1030  */
1031  if (close(recvFile) != 0)
1032  ereport(PANIC,
1034  errmsg("could not close log segment %s: %m",
1035  xlogfname)));
1036 
1037  /*
1038  * Create .done file forcibly to prevent the streamed segment from being
1039  * archived later.
1040  */
1042  XLogArchiveForceDone(xlogfname);
1043  else
1044  XLogArchiveNotify(xlogfname);
1045 
      /* Mark that no segment file is open any more */
1046  recvFile = -1;
1047 }
int wal_segment_size
Definition: xlog.c:119
#define PANIC
Definition: elog.h:50
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:467
int XLogArchiveMode
Definition: xlog.c:96
int errcode_for_file_access(void)
Definition: elog.c:721
static TimeLineID recvFileTLI
Definition: walreceiver.c:105
static XLogSegNo recvSegNo
Definition: walreceiver.c:106
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:517
#define ereport(elevel,...)
Definition: elog.h:157
#define Assert(condition)
Definition: c.h:804
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:959
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:104

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 959 of file walreceiver.c.

References AllowCascadeReplication, WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, LSN_FORMAT_ARGS, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

960 {
     /*
      * Flush received-but-unflushed WAL and publish the new flush position.
      *
      * Does nothing unless LogstreamResult.Flush is behind
      * LogstreamResult.Write. Otherwise it advances LogstreamResult.Flush to
      * LogstreamResult.Write, updates flushedUpto, latestChunkStart and
      * receivedTLI in shared memory under the spinlock, wakes the startup
      * process, and - unless 'dying' is true - sends a reply to the primary.
      *
      * NOTE(review): source lines 965, 981, 985, 990 and 998 are missing
      * from this listing. Line 965 presumably performs the actual fsync
      * (the reference list names issue_xlog_fsync()), 981 and 985 presumably
      * guard the WalSndWakeup() call and the PS-display block, and 998
      * presumably sends hot standby feedback - confirm against the real
      * source.
      */
961  if (LogstreamResult.Flush < LogstreamResult.Write)
962  {
963  WalRcvData *walrcv = WalRcv;
964 
966 
967  LogstreamResult.Flush = LogstreamResult.Write;
968 
969  /* Update shared-memory status */
970  SpinLockAcquire(&walrcv->mutex);
971  if (walrcv->flushedUpto < LogstreamResult.Flush)
972  {
     /* the previous flush position becomes the start of the latest chunk */
973  walrcv->latestChunkStart = walrcv->flushedUpto;
974  walrcv->flushedUpto = LogstreamResult.Flush;
975  walrcv->receivedTLI = ThisTimeLineID;
976  }
977  SpinLockRelease(&walrcv->mutex);
978 
979  /* Signal the startup process and walsender that new WAL has arrived */
980  WakeupRecovery();
982  WalSndWakeup();
983 
984  /* Report XLOG streaming progress in PS display */
986  {
987  char activitymsg[50];
988 
989  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
991  set_ps_display(activitymsg);
992  }
993 
994  /* Also let the primary know that we made some progress */
995  if (!dying)
996  {
997  XLogWalRcvSendReply(false, false);
999  }
1000  }
1001 }
slock_t mutex
Definition: walreceiver.h:145
bool update_process_title
Definition: ps_status.c:36
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10826
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1132
TimeLineID receivedTLI
Definition: walreceiver.h:86
void set_ps_display(const char *activity)
Definition: ps_status.c:349
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void WakeupRecovery(void)
Definition: xlog.c:13168
XLogRecPtr latestChunkStart
Definition: walreceiver.h:94
#define AllowCascadeReplication()
Definition: walreceiver.h:40
static XLogSegNo recvSegNo
Definition: walreceiver.c:106
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1063
TimeLineID ThisTimeLineID
Definition: xlog.c:194
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:104
XLogRecPtr flushedUpto
Definition: walreceiver.h:85
#define snprintf
Definition: port.h:217
void WalSndWakeup(void)
Definition: walsender.c:3124
static struct @17 LogstreamResult

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 811 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

812 {
     /*
      * Dispatch one message received from the primary, based on its type.
      *
      * type: message type byte; 'w' (WAL records) and 'k' (keepalive) are
      *       handled, anything else raises ERROR as a protocol violation.
      * buf:  message payload following the type byte.
      * len:  number of payload bytes in buf.
      *
      * NOTE(review): source lines 819, 831 and 852 are missing from this
      * listing. The header fields are read from incoming_message, so those
      * lines presumably reset it and copy the header bytes from buf into it
      * (the reference list names resetStringInfo() and
      * appendBinaryStringInfo()) - confirm against the real source.
      */
813  int hdrlen;
814  XLogRecPtr dataStart;
815  XLogRecPtr walEnd;
816  TimestampTz sendTime;
817  bool replyRequested;
818 
820 
821  switch (type)
822  {
823  case 'w': /* WAL records */
824  {
825  /* copy message to StringInfo */
     /* header is three int64 fields: dataStart, walEnd, sendTime */
826  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
     /* the payload may be longer than the header: WAL data follows it */
827  if (len < hdrlen)
828  ereport(ERROR,
829  (errcode(ERRCODE_PROTOCOL_VIOLATION),
830  errmsg_internal("invalid WAL message received from primary")));
832 
833  /* read the fields */
834  dataStart = pq_getmsgint64(&incoming_message);
835  walEnd = pq_getmsgint64(&incoming_message);
836  sendTime = pq_getmsgint64(&incoming_message);
837  ProcessWalSndrMessage(walEnd, sendTime);
838 
     /* skip the header and pass the remaining bytes on as WAL at dataStart */
839  buf += hdrlen;
840  len -= hdrlen;
841  XLogWalRcvWrite(buf, len, dataStart);
842  break;
843  }
844  case 'k': /* Keepalive */
845  {
846  /* copy message to StringInfo */
     /* header is two int64 fields plus one byte: walEnd, sendTime, flag */
847  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
     /* a keepalive carries nothing but its header, so require an exact match */
848  if (len != hdrlen)
849  ereport(ERROR,
850  (errcode(ERRCODE_PROTOCOL_VIOLATION),
851  errmsg_internal("invalid keepalive message received from primary")));
853 
854  /* read the fields */
855  walEnd = pq_getmsgint64(&incoming_message);
856  sendTime = pq_getmsgint64(&incoming_message);
857  replyRequested = pq_getmsgbyte(&incoming_message);
858 
859  ProcessWalSndrMessage(walEnd, sendTime);
860 
861  /* If the primary requested a reply, send one immediately */
862  if (replyRequested)
863  XLogWalRcvSendReply(true, false);
864  break;
865  }
866  default:
867  ereport(ERROR,
868  (errcode(ERRCODE_PROTOCOL_VIOLATION),
869  errmsg_internal("invalid replication message type %d",
870  type)));
871  }
872 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1233
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:119
int errcode(int sqlerrcode)
Definition: elog.c:698
#define ERROR
Definition: elog.h:46
static char * buf
Definition: pg_test_fsync.c:68
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1063
#define ereport(elevel,...)
Definition: elog.h:157
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:878

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1132 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReadNextFullTransactionId(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, wal_receiver_status_interval, walrcv_send, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

1133 {
1134  TimestampTz now;
1135  FullTransactionId nextFullXid;
1136  TransactionId nextXid;
1137  uint32 xmin_epoch,
1138  catalog_xmin_epoch;
1139  TransactionId xmin,
1140  catalog_xmin;
1141  static TimestampTz sendTime = 0;
1142 
1143  /* initially true so we always send at least one feedback message */
1144  static bool primary_has_standby_xmin = true;
1145 
1146  /*
1147  * If the user doesn't want status to be reported to the primary, be sure
1148  * to exit before doing anything at all.
1149  */
1150  if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1151  !primary_has_standby_xmin)
1152  return;
1153 
1154  /* Get current timestamp. */
1155  now = GetCurrentTimestamp();
1156 
1157  if (!immed)
1158  {
1159  /*
1160  * Send feedback at most once per wal_receiver_status_interval.
1161  */
1162  if (!TimestampDifferenceExceeds(sendTime, now,
1163  wal_receiver_status_interval * 1000))
1164  return;
1165  sendTime = now;
1166  }
1167 
1168  /*
1169  * If Hot Standby is not yet accepting connections there is nothing to
1170  * send. Check this after the interval has expired to reduce number of
1171  * calls.
1172  *
1173  * Bailing out here also ensures that we don't send feedback until we've
1174  * read our own replication slot state, so we don't tell the primary to
1175  * discard needed xmin or catalog_xmin from any slots that may exist on
1176  * this replica.
1177  */
1178  if (!HotStandbyActive())
1179  return;
1180 
1181  /*
1182  * Make the expensive call to get the oldest xmin once we are certain
1183  * everything else has been checked.
1184  */
1185  if (hot_standby_feedback)
1186  {
1187  GetReplicationHorizons(&xmin, &catalog_xmin);
1188  }
1189  else
1190  {
1191  xmin = InvalidTransactionId;
1192  catalog_xmin = InvalidTransactionId;
1193  }
1194 
1195  /*
1196  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1197  * the epoch boundary.
1198  */
1199  nextFullXid = ReadNextFullTransactionId();
1200  nextXid = XidFromFullTransactionId(nextFullXid);
1201  xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1202  catalog_xmin_epoch = xmin_epoch;
1203  if (nextXid < xmin)
1204  xmin_epoch--;
1205  if (nextXid < catalog_xmin)
1206  catalog_xmin_epoch--;
1207 
1208  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1209  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1210 
1211  /* Construct the message and send it. */
1212  resetStringInfo(&reply_message);
1213  pq_sendbyte(&reply_message, 'h');
1214  pq_sendint64(&reply_message, GetCurrentTimestamp());
1215  pq_sendint32(&reply_message, xmin);
1216  pq_sendint32(&reply_message, xmin_epoch);
1217  pq_sendint32(&reply_message, catalog_xmin);
1218  pq_sendint32(&reply_message, catalog_xmin_epoch);
1219  walrcv_send(wrconn, reply_message.data, reply_message.len);
1220  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1221  primary_has_standby_xmin = true;
1222  else
1223  primary_has_standby_xmin = false;
1224 }
bool hot_standby_feedback
Definition: walreceiver.c:91
uint32 TransactionId
Definition: c.h:587
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
bool HotStandbyActive(void)
Definition: xlog.c:8402
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:89
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static StringInfoData reply_message
Definition: walreceiver.c:118
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:441
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2047
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:424
#define elog(elevel,...)
Definition: elog.h:232
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1063 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1064 {
1065  static XLogRecPtr writePtr = 0;
1066  static XLogRecPtr flushPtr = 0;
1067  XLogRecPtr applyPtr;
1068  static TimestampTz sendTime = 0;
1069  TimestampTz now;
1070 
1071  /*
1072  * If the user doesn't want status to be reported to the primary, be sure
1073  * to exit before doing anything at all.
1074  */
1075  if (!force && wal_receiver_status_interval <= 0)
1076  return;
1077 
1078  /* Get current timestamp. */
1079  now = GetCurrentTimestamp();
1080 
1081  /*
1082  * We can compare the write and flush positions to the last message we
1083  * sent without taking any lock, but the apply position requires a spin
1084  * lock, so we don't check that unless something else has changed or 10
1085  * seconds have passed. This means that the apply WAL location will
1086  * appear, from the primary's point of view, to lag slightly, but since
1087  * this is only for reporting purposes and only on idle systems, that's
1088  * probably OK.
1089  */
1090  if (!force
1091  && writePtr == LogstreamResult.Write
1092  && flushPtr == LogstreamResult.Flush
1093  && !TimestampDifferenceExceeds(sendTime, now,
1094  wal_receiver_status_interval * 1000))
1095  return;
1096  sendTime = now;
1097 
1098  /* Construct a new message */
1099  writePtr = LogstreamResult.Write;
1100  flushPtr = LogstreamResult.Flush;
1101  applyPtr = GetXLogReplayRecPtr(NULL);
1102 
1103  resetStringInfo(&reply_message);
1104  pq_sendbyte(&reply_message, 'r');
1105  pq_sendint64(&reply_message, writePtr);
1106  pq_sendint64(&reply_message, flushPtr);
1107  pq_sendint64(&reply_message, applyPtr);
1108  pq_sendint64(&reply_message, GetCurrentTimestamp());
1109  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1110 
1111  /* Send it */
1112  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1113  LSN_FORMAT_ARGS(writePtr),
1114  LSN_FORMAT_ARGS(flushPtr),
1115  LSN_FORMAT_ARGS(applyPtr),
1116  requestReply ? " (reply requested)" : "");
1117 
1118  walrcv_send(wrconn, reply_message.data, reply_message.len);
1119 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
int wal_receiver_status_interval
Definition: walreceiver.c:89
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static StringInfoData reply_message
Definition: walreceiver.c:118
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11927
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:424
#define elog(elevel,...)
Definition: elog.h:232
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
static struct @17 LogstreamResult

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 878 of file walreceiver.c.

References ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite(), recvFile, recvFileTLI, recvSegNo, ThisTimeLineID, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName, XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

879 {
880  int startoff;
881  int byteswritten;
882 
883  while (nbytes > 0)
884  {
885  int segbytes;
886 
887  /* Close the current segment if it's completed */
888  if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
889  XLogWalRcvClose(recptr);
890 
891  if (recvFile < 0)
892  {
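893  /* Create/use new log file */
894  XLByteToSeg(recptr, recvSegNo, wal_segment_size);
895  recvFile = XLogFileInit(recvSegNo);
896  recvFileTLI = ThisTimeLineID;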
897  }
898 
899  /* Calculate the start offset of the received logs */
900  startoff = XLogSegmentOffset(recptr, wal_segment_size);
901 
902  if (startoff + nbytes > wal_segment_size)
903  segbytes = wal_segment_size - startoff;
904  else
905  segbytes = nbytes;
906 
907  /* OK to write the logs */
908  errno = 0;
909 
910  byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
911  if (byteswritten <= 0)
912  {
913  char xlogfname[MAXFNAMELEN];
914  int save_errno;
915 
916  /* if write didn't set errno, assume no disk space */
917  if (errno == 0)
918  errno = ENOSPC;
919 
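920  save_errno = errno;
921  XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
922  errno = save_errno;
923  ereport(PANIC,
924  (errcode_for_file_access(),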
925  errmsg("could not write to log segment %s "
926  "at offset %u, length %lu: %m",
927  xlogfname, startoff, (unsigned long) segbytes)));
928  }
929 
930  /* Update state for write */
931  recptr += byteswritten;
932 
933  nbytes -= byteswritten;
934  buf += byteswritten;
935 
936  LogstreamResult.Write = recptr;
937  }
938 
939  /* Update shared-memory status */
940  pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
941 
942  /*
943  * Close the current segment if it's fully written up in the last cycle of
944  * the loop, to create its archive notification file soon. Otherwise WAL
945  * archiving of the segment will be delayed until any data in the next
946  * segment is received and written.
947  */
948  if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
949  XLogWalRcvClose(recptr);
950 }
int wal_segment_size
Definition: xlog.c:119
int XLogFileInit(XLogSegNo logsegno)
Definition: xlog.c:3472
#define PANIC
Definition: elog.h:50
ssize_t pg_pwrite(int fd, const void *buf, size_t nbyte, off_t offset)
Definition: pwrite.c:27
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:438
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:153
static char * buf
Definition: pg_test_fsync.c:68
int errcode_for_file_access(void)
Definition: elog.c:721
static TimeLineID recvFileTLI
Definition: walreceiver.c:105
static XLogSegNo recvSegNo
Definition: walreceiver.c:106
static void XLogWalRcvClose(XLogRecPtr recptr)
Definition: walreceiver.c:1012
#define MAXFNAMELEN
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
TimeLineID ThisTimeLineID
Definition: xlog.c:194
#define ereport(elevel,...)
Definition: elog.h:157
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
WalRcvData * WalRcv
int errmsg(const char *fmt,...)
Definition: elog.c:909
static int recvFile
Definition: walreceiver.c:104
static struct @17 LogstreamResult
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 115 of file walreceiver.c.

◆ hot_standby_feedback

bool hot_standby_feedback

Definition at line 91 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

◆ incoming_message

StringInfoData incoming_message
static

Definition at line 119 of file walreceiver.c.

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 105 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 106 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

Definition at line 118 of file walreceiver.c.

Referenced by send_feedback().

◆ wal_receiver_status_interval

int wal_receiver_status_interval

Definition at line 89 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ wal_receiver_timeout

int wal_receiver_timeout

Definition at line 90 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 95 of file walreceiver.c.

Referenced by _PG_init().

◆ wrconn

WalReceiverConn* wrconn = NULL
static

Definition at line 94 of file walreceiver.c.

Referenced by AlterSubscription_refresh(), CreateSubscription(), and DropSubscription().

◆ Write

XLogRecPtr Write

Definition at line 114 of file walreceiver.c.