PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walreceiver.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void ProcessWalRcvInterrupts (void)
 
static void EnableWalRcvImmediateExit (void)
 
static void DisableWalRcvImmediateExit (void)
 
static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvSigHupHandler (SIGNAL_ARGS)
 
static void WalRcvSigUsr1Handler (SIGNAL_ARGS)
 
static void WalRcvShutdownHandler (SIGNAL_ARGS)
 
static void WalRcvQuickDieHandler (SIGNAL_ARGS)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConnwrconn = NULL
 
WalReceiverFunctionsTypeWalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
static uint32 recvOff = 0
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t got_SIGTERM = false
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 
static volatile bool WalRcvImmediateInterruptOK = false
 

Macro Definition Documentation

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 82 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

static void DisableWalRcvImmediateExit ( void  )
static

Definition at line 180 of file walreceiver.c.

References ProcessWalRcvInterrupts(), and WalRcvImmediateInterruptOK.

Referenced by WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().

181 {
184 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
static void EnableWalRcvImmediateExit ( void  )
static

Definition at line 173 of file walreceiver.c.

References ProcessWalRcvInterrupts(), and WalRcvImmediateInterruptOK.

Referenced by WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().

174 {
177 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1378 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, DEFAULT_ROLE_READ_ALL_STATS, elog, ERROR, get_call_result_type(), GetUserId(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, is_member_of_role(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MemSet, WalRcvData::mutex, tupleDesc::natts, NULL, palloc0(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, pstrdup(), WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, and XLogRecPtrIsInvalid.

1379 {
1380  TupleDesc tupdesc;
1381  Datum *values;
1382  bool *nulls;
1383  WalRcvData *walrcv = WalRcv;
1385  XLogRecPtr receive_start_lsn;
1386  TimeLineID receive_start_tli;
1387  XLogRecPtr received_lsn;
1388  TimeLineID received_tli;
1389  TimestampTz last_send_time;
1390  TimestampTz last_receipt_time;
1391  XLogRecPtr latest_end_lsn;
1392  TimestampTz latest_end_time;
1393  char *slotname;
1394  char *conninfo;
1395 
1396  /*
1397  * No WAL receiver (or not ready yet), just return a tuple with NULL
1398  * values
1399  */
1400  if (walrcv->pid == 0 || !walrcv->ready_to_display)
1401  PG_RETURN_NULL();
1402 
1403  /* determine result type */
1404  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1405  elog(ERROR, "return type must be a row type");
1406 
1407  values = palloc0(sizeof(Datum) * tupdesc->natts);
1408  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1409 
1410  /* Take a lock to ensure value consistency */
1411  SpinLockAcquire(&walrcv->mutex);
1412  state = walrcv->walRcvState;
1413  receive_start_lsn = walrcv->receiveStart;
1414  receive_start_tli = walrcv->receiveStartTLI;
1415  received_lsn = walrcv->receivedUpto;
1416  received_tli = walrcv->receivedTLI;
1417  last_send_time = walrcv->lastMsgSendTime;
1418  last_receipt_time = walrcv->lastMsgReceiptTime;
1419  latest_end_lsn = walrcv->latestWalEnd;
1420  latest_end_time = walrcv->latestWalEndTime;
1421  slotname = pstrdup(walrcv->slotname);
1422  conninfo = pstrdup(walrcv->conninfo);
1423  SpinLockRelease(&walrcv->mutex);
1424 
1425  /* Fetch values */
1426  values[0] = Int32GetDatum(walrcv->pid);
1427 
1429  {
1430  /*
1431  * Only superusers can see details. Other users only get the pid value
1432  * to know whether it is a WAL receiver, but no details.
1433  */
1434  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1435  }
1436  else
1437  {
1438  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1439 
1440  if (XLogRecPtrIsInvalid(receive_start_lsn))
1441  nulls[2] = true;
1442  else
1443  values[2] = LSNGetDatum(receive_start_lsn);
1444  values[3] = Int32GetDatum(receive_start_tli);
1445  if (XLogRecPtrIsInvalid(received_lsn))
1446  nulls[4] = true;
1447  else
1448  values[4] = LSNGetDatum(received_lsn);
1449  values[5] = Int32GetDatum(received_tli);
1450  if (last_send_time == 0)
1451  nulls[6] = true;
1452  else
1453  values[6] = TimestampTzGetDatum(last_send_time);
1454  if (last_receipt_time == 0)
1455  nulls[7] = true;
1456  else
1457  values[7] = TimestampTzGetDatum(last_receipt_time);
1458  if (XLogRecPtrIsInvalid(latest_end_lsn))
1459  nulls[8] = true;
1460  else
1461  values[8] = LSNGetDatum(latest_end_lsn);
1462  if (latest_end_time == 0)
1463  nulls[9] = true;
1464  else
1465  values[9] = TimestampTzGetDatum(latest_end_time);
1466  if (*slotname == '\0')
1467  nulls[10] = true;
1468  else
1469  values[10] = CStringGetTextDatum(slotname);
1470  if (*conninfo == '\0')
1471  nulls[11] = true;
1472  else
1473  values[11] = CStringGetTextDatum(conninfo);
1474  }
1475 
1476  /* Returns the record as Datum */
1478  heap_form_tuple(tupdesc, values, nulls)));
1479 }
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1353
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
uint32 TimeLineID
Definition: xlogdefs.h:45
slock_t mutex
Definition: walreceiver.h:117
Oid GetUserId(void)
Definition: miscinit.c:283
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:63
char * pstrdup(const char *in)
Definition: mcxt.c:1077
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:83
#define MemSet(start, val, len)
Definition: c.h:857
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
int natts
Definition: tupdesc.h:73
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
WalRcvState
Definition: walreceiver.h:43
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:62
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:878
uintptr_t Datum
Definition: postgres.h:372
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:313
bool ready_to_display
Definition: walreceiver.h:126
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
#define DEFAULT_ROLE_READ_ALL_STATS
Definition: pg_authid.h:108
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4837
XLogRecPtr latestWalEnd
Definition: walreceiver.h:102
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:222
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define Int32GetDatum(X)
Definition: postgres.h:485
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
XLogRecPtr receiveStart
Definition: walreceiver.h:72
#define PG_RETURN_NULL()
Definition: fmgr.h:305
char slotname[NAMEDATALEN]
Definition: walreceiver.h:115
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109
static void ProcessWalRcvInterrupts ( void  )
static

Definition at line 154 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, got_SIGTERM, and WalRcvImmediateInterruptOK.

Referenced by DisableWalRcvImmediateExit(), EnableWalRcvImmediateExit(), WalRcvShutdownHandler(), WalRcvWaitForStartPosition(), and WalReceiverMain().

155 {
156  /*
157  * Although walreceiver interrupt handling doesn't use the same scheme as
158  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
159  * any incoming signals on Win32.
160  */
162 
163  if (got_SIGTERM)
164  {
166  ereport(FATAL,
167  (errcode(ERRCODE_ADMIN_SHUTDOWN),
168  errmsg("terminating walreceiver process due to administrator command")));
169  }
170 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
int errcode(int sqlerrcode)
Definition: elog.c:575
#define FATAL
Definition: elog.h:52
#define ereport(elevel, rest)
Definition: elog.h:122
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:99
static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1288 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, log_min_messages, WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1289 {
1290  WalRcvData *walrcv = WalRcv;
1291 
1292  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1293 
1294  /* Update shared-memory status */
1295  SpinLockAcquire(&walrcv->mutex);
1296  if (walrcv->latestWalEnd < walEnd)
1297  walrcv->latestWalEndTime = sendTime;
1298  walrcv->latestWalEnd = walEnd;
1299  walrcv->lastMsgSendTime = sendTime;
1300  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1301  SpinLockRelease(&walrcv->mutex);
1302 
1303  if (log_min_messages <= DEBUG2)
1304  {
1305  char *sendtime;
1306  char *receipttime;
1307  int applyDelay;
1308 
1309  /* Copy because timestamptz_to_str returns a static buffer */
1310  sendtime = pstrdup(timestamptz_to_str(sendTime));
1311  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1312  applyDelay = GetReplicationApplyDelay();
1313 
1314  /* apply delay is not available */
1315  if (applyDelay == -1)
1316  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1317  sendtime,
1318  receipttime,
1320  else
1321  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1322  sendtime,
1323  receipttime,
1324  applyDelay,
1326 
1327  pfree(sendtime);
1328  pfree(receipttime);
1329  }
1330 }
slock_t mutex
Definition: walreceiver.h:117
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1077
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:950
#define DEBUG2
Definition: elog.h:24
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
int log_min_messages
Definition: guc.c:451
XLogRecPtr latestWalEnd
Definition: walreceiver.h:102
WalRcvData * WalRcv
#define elog
Definition: elog.h:219
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1710
static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 769 of file walreceiver.c.

References Assert, WalRcvData::latch, WalRcvData::mutex, MyProcPid, NULL, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

770 {
771  WalRcvData *walrcv = WalRcv;
772 
773  /* Ensure that all WAL records received are flushed to disk */
774  XLogWalRcvFlush(true);
775 
776  walrcv->latch = NULL;
777 
778  SpinLockAcquire(&walrcv->mutex);
779  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
780  walrcv->walRcvState == WALRCV_RESTARTING ||
781  walrcv->walRcvState == WALRCV_STARTING ||
782  walrcv->walRcvState == WALRCV_WAITING ||
783  walrcv->walRcvState == WALRCV_STOPPING);
784  Assert(walrcv->pid == MyProcPid);
785  walrcv->walRcvState = WALRCV_STOPPED;
786  walrcv->pid = 0;
787  walrcv->ready_to_display = false;
788  SpinLockRelease(&walrcv->mutex);
789 
790  /* Terminate the connection gracefully. */
791  if (wrconn != NULL)
793 
794  /* Wake up the startup process to notice promptly that we're gone */
795  WakeupRecovery();
796 }
int MyProcPid
Definition: globals.c:38
slock_t mutex
Definition: walreceiver.h:117
WalRcvState walRcvState
Definition: walreceiver.h:63
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12092
pid_t pid
Definition: walreceiver.h:62
Latch * latch
Definition: walreceiver.h:135
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:126
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1047
WalRcvData * WalRcv
static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 720 of file walreceiver.c.

References DisableWalRcvImmediateExit(), EnableWalRcvImmediateExit(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, and writeTimeLineHistoryFile().

Referenced by WalReceiverMain().

721 {
722  TimeLineID tli;
723 
724  for (tli = first; tli <= last; tli++)
725  {
726  /* there's no history file for timeline 1 */
727  if (tli != 1 && !existsTimeLineHistory(tli))
728  {
729  char *fname;
730  char *content;
731  int len;
732  char expectedfname[MAXFNAMELEN];
733 
734  ereport(LOG,
735  (errmsg("fetching timeline history file for timeline %u from primary server",
736  tli)));
737 
739  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
741 
742  /*
743  * Check that the filename on the master matches what we
744  * calculated ourselves. This is just a sanity check, it should
745  * always match.
746  */
747  TLHistoryFileName(expectedfname, tli);
748  if (strcmp(fname, expectedfname) != 0)
749  ereport(ERROR,
750  (errcode(ERRCODE_PROTOCOL_VIOLATION),
751  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
752  tli)));
753 
754  /*
755  * Write the file to pg_wal.
756  */
757  writeTimeLineHistoryFile(tli, content, len);
758 
759  pfree(fname);
760  pfree(content);
761  }
762  }
763 }
uint32 TimeLineID
Definition: xlogdefs.h:45
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
void pfree(void *pointer)
Definition: mcxt.c:950
#define ERROR
Definition: elog.h:43
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:448
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:180
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:207
#define ereport(elevel, rest)
Definition: elog.h:122
#define MAXFNAMELEN
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:173
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:250
int errmsg(const char *fmt,...)
Definition: elog.c:797
void WalRcvForceReply ( void  )

Definition at line 1341 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, SetLatch(), and WalRcv.

Referenced by StartupXLOG(), and WaitForWALToBecomeAvailable().

1342 {
1343  WalRcv->force_reply = true;
1344  if (WalRcv->latch)
1345  SetLatch(WalRcv->latch);
1346 }
Latch * latch
Definition: walreceiver.h:135
bool force_reply
Definition: walreceiver.h:123
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
WalRcvData * WalRcv
static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1353 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1354 {
1355  switch (state)
1356  {
1357  case WALRCV_STOPPED:
1358  return "stopped";
1359  case WALRCV_STARTING:
1360  return "starting";
1361  case WALRCV_STREAMING:
1362  return "streaming";
1363  case WALRCV_WAITING:
1364  return "waiting";
1365  case WALRCV_RESTARTING:
1366  return "restarting";
1367  case WALRCV_STOPPING:
1368  return "stopping";
1369  }
1370  return "UNKNOWN";
1371 }
Definition: regguts.h:298
static void WalRcvQuickDieHandler ( SIGNAL_ARGS  )
static

Definition at line 842 of file walreceiver.c.

References BlockSig, on_exit_reset(), and PG_SETMASK.

Referenced by WalReceiverMain().

843 {
845 
846  /*
847  * We DO NOT want to run proc_exit() callbacks -- we're here because
848  * shared memory may be corrupted, so we don't want to try to clean up our
849  * transaction. Just nail the windows shut and get out of town. Now that
850  * there's an atexit callback to prevent third-party code from breaking
851  * things by calling exit() directly, we have to reset the callbacks
852  * explicitly to make this work as intended.
853  */
854  on_exit_reset();
855 
856  /*
857  * Note we do exit(2) not exit(0). This is to force the postmaster into a
858  * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
859  * backend. This is necessary precisely because we don't clean up our
860  * shared memory state. (The "dead man switch" mechanism in pmsignal.c
861  * should ensure the postmaster sees this as a crash, too, but no harm in
862  * being doubly sure.)
863  */
864  exit(2);
865 }
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
void on_exit_reset(void)
Definition: ipc.c:396
sigset_t BlockSig
Definition: pqsignal.c:22
static void WalRcvShutdownHandler ( SIGNAL_ARGS  )
static

Definition at line 819 of file walreceiver.c.

References got_SIGTERM, WalRcvData::latch, proc_exit_inprogress, ProcessWalRcvInterrupts(), SetLatch(), WalRcv, and WalRcvImmediateInterruptOK.

Referenced by WalReceiverMain().

820 {
821  int save_errno = errno;
822 
823  got_SIGTERM = true;
824 
825  if (WalRcv->latch)
827 
828  /* Don't joggle the elbow of proc_exit */
831 
832  errno = save_errno;
833 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:130
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
Latch * latch
Definition: walreceiver.h:135
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
bool proc_exit_inprogress
Definition: ipc.c:40
WalRcvData * WalRcv
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:99
static void WalRcvSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 800 of file walreceiver.c.

References got_SIGHUP.

Referenced by WalReceiverMain().

801 {
802  got_SIGHUP = true;
803 }
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:98
static void WalRcvSigUsr1Handler ( SIGNAL_ARGS  )
static

Definition at line 808 of file walreceiver.c.

References latch_sigusr1_handler().

Referenced by WalReceiverMain().

809 {
810  int save_errno = errno;
811 
813 
814  errno = save_errno;
815 }
void latch_sigusr1_handler(void)
Definition: latch.c:1467
static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 635 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, WalRcvData::latch, WalRcvData::mutex, PostmasterIsAlive(), proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_LATCH_SET, and WL_POSTMASTER_DEATH.

Referenced by WalReceiverMain().

636 {
637  WalRcvData *walrcv = WalRcv;
638  int state;
639 
640  SpinLockAcquire(&walrcv->mutex);
641  state = walrcv->walRcvState;
642  if (state != WALRCV_STREAMING)
643  {
644  SpinLockRelease(&walrcv->mutex);
645  if (state == WALRCV_STOPPING)
646  proc_exit(0);
647  else
648  elog(FATAL, "unexpected walreceiver state");
649  }
650  walrcv->walRcvState = WALRCV_WAITING;
652  walrcv->receiveStartTLI = 0;
653  SpinLockRelease(&walrcv->mutex);
654 
656  set_ps_display("idle", false);
657 
658  /*
659  * nudge startup process to notice that we've stopped streaming and are
660  * now waiting for instructions.
661  */
662  WakeupRecovery();
663  for (;;)
664  {
665  ResetLatch(walrcv->latch);
666 
667  /*
668  * Emergency bailout if postmaster has died. This is to avoid the
669  * necessity for manual cleanup of all postmaster children.
670  */
671  if (!PostmasterIsAlive())
672  exit(1);
673 
675 
676  SpinLockAcquire(&walrcv->mutex);
677  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
678  walrcv->walRcvState == WALRCV_WAITING ||
679  walrcv->walRcvState == WALRCV_STOPPING);
680  if (walrcv->walRcvState == WALRCV_RESTARTING)
681  {
682  /* we don't expect primary_conninfo to change */
683  *startpoint = walrcv->receiveStart;
684  *startpointTLI = walrcv->receiveStartTLI;
685  walrcv->walRcvState = WALRCV_STREAMING;
686  SpinLockRelease(&walrcv->mutex);
687  break;
688  }
689  if (walrcv->walRcvState == WALRCV_STOPPING)
690  {
691  /*
692  * We should've received SIGTERM if the startup process wants us
693  * to die, but might as well check it here too.
694  */
695  SpinLockRelease(&walrcv->mutex);
696  exit(1);
697  }
698  SpinLockRelease(&walrcv->mutex);
699 
702  }
703 
705  {
706  char activitymsg[50];
707 
708  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
709  (uint32) (*startpoint >> 32),
710  (uint32) *startpoint);
711  set_ps_display(activitymsg, false);
712  }
713 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:117
bool update_process_title
Definition: ps_status.c:35
WalRcvState walRcvState
Definition: walreceiver.h:63
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
void proc_exit(int code)
Definition: ipc.c:99
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
#define SpinLockAcquire(lock)
Definition: spin.h:62
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:336
void WakeupRecovery(void)
Definition: xlog.c:12092
#define FATAL
Definition: elog.h:52
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
unsigned int uint32
Definition: c.h:268
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
Latch * latch
Definition: walreceiver.h:135
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
#define Assert(condition)
Definition: c.h:675
Definition: regguts.h:298
WalRcvData * WalRcv
#define elog
Definition: elog.h:219
XLogRecPtr receiveStart
Definition: walreceiver.h:72
#define WL_LATCH_SET
Definition: latch.h:124
void WalReceiverMain ( void  )

Definition at line 188 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, BlockSig, buf, close, WalRcvData::conninfo, CurrentResourceOwner, DEBUG1, DisableWalRcvImmediateExit(), elog, EnableWalRcvImmediateExit(), ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, now(), NULL, on_shmem_exit(), options, PANIC, pfree(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), ResourceOwnerCreate(), server_version, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf(), SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, WalRcv, walrcv_connect, walrcv_endstreaming, walrcv_get_conninfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvQuickDieHandler(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvSigUsr1Handler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_SOCKET_READABLE, WL_TIMEOUT, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogFileNameP(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain().

189 {
190  char conninfo[MAXCONNINFO];
191  char *tmp_conninfo;
192  char slotname[NAMEDATALEN];
193  XLogRecPtr startpoint;
194  TimeLineID startpointTLI;
195  TimeLineID primaryTLI;
196  bool first_stream;
197  WalRcvData *walrcv = WalRcv;
198  TimestampTz last_recv_timestamp;
199  bool ping_sent;
200  char *err;
201 
202  /*
203  * WalRcv should be set up already (if we are a backend, we inherit this
204  * by fork() or EXEC_BACKEND mechanism from the postmaster).
205  */
206  Assert(walrcv != NULL);
207 
208  /*
209  * Mark walreceiver as running in shared memory.
210  *
211  * Do this as early as possible, so that if we fail later on, we'll set
212  * state to STOPPED. If we die before this, the startup process will keep
213  * waiting for us to start up, until it times out.
214  */
215  SpinLockAcquire(&walrcv->mutex);
216  Assert(walrcv->pid == 0);
217  switch (walrcv->walRcvState)
218  {
219  case WALRCV_STOPPING:
220  /* If we've already been requested to stop, don't start up. */
221  walrcv->walRcvState = WALRCV_STOPPED;
222  /* fall through */
223 
224  case WALRCV_STOPPED:
225  SpinLockRelease(&walrcv->mutex);
226  proc_exit(1);
227  break;
228 
229  case WALRCV_STARTING:
230  /* The usual case */
231  break;
232 
233  case WALRCV_WAITING:
234  case WALRCV_STREAMING:
235  case WALRCV_RESTARTING:
236  default:
237  /* Shouldn't happen */
238  elog(PANIC, "walreceiver still running according to shared memory state");
239  }
240  /* Advertise our PID so that the startup process can kill us */
241  walrcv->pid = MyProcPid;
242  walrcv->walRcvState = WALRCV_STREAMING;
243 
244  /* Fetch information required to start streaming */
245  walrcv->ready_to_display = false;
246  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
247  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
248  startpoint = walrcv->receiveStart;
249  startpointTLI = walrcv->receiveStartTLI;
250 
251  /* Initialise to a sanish value */
253 
254  SpinLockRelease(&walrcv->mutex);
255 
256  /* Arrange to clean up at walreceiver exit */
258 
259  walrcv->latch = &MyProc->procLatch;
260 
261  /* Properly accept or ignore signals the postmaster might send us */
262  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config
263  * file */
264  pqsignal(SIGINT, SIG_IGN);
265  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
266  pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
271 
272  /* Reset some signals that are accepted by postmaster but not here */
278 
279  /* We allow SIGQUIT (quickdie) at all times */
280  sigdelset(&BlockSig, SIGQUIT);
281 
282  /* Load the libpq-specific functions */
283  load_file("libpqwalreceiver", false);
284  if (WalReceiverFunctions == NULL)
285  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
286 
287  /*
288  * Create a resource owner to keep track of our resources (not clear that
289  * we need this, but may as well have one).
290  */
291  CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
292 
293  /* Unblock signals (they were blocked when the postmaster forked us) */
295 
296  /* Establish the connection to the primary for XLOG streaming */
298  wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
299  if (!wrconn)
300  ereport(ERROR,
301  (errmsg("could not connect to the primary server: %s", err)));
303 
304  /*
305  * Save user-visible connection string. This clobbers the original
306  * conninfo, for security.
307  */
308  tmp_conninfo = walrcv_get_conninfo(wrconn);
309  SpinLockAcquire(&walrcv->mutex);
310  memset(walrcv->conninfo, 0, MAXCONNINFO);
311  if (tmp_conninfo)
312  {
313  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
314  pfree(tmp_conninfo);
315  }
316  walrcv->ready_to_display = true;
317  SpinLockRelease(&walrcv->mutex);
318 
319  first_stream = true;
320  for (;;)
321  {
322  char *primary_sysid;
323  char standby_sysid[32];
324  int server_version;
326 
327  /*
328  * Check that we're connected to a valid server using the
329  * IDENTIFY_SYSTEM replication command.
330  */
332  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
333  &server_version);
334 
335  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
337  if (strcmp(primary_sysid, standby_sysid) != 0)
338  {
339  ereport(ERROR,
340  (errmsg("database system identifier differs between the primary and standby"),
341  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
342  primary_sysid, standby_sysid)));
343  }
345 
346  /*
347  * Confirm that the current timeline of the primary is the same or
348  * ahead of ours.
349  */
350  if (primaryTLI < startpointTLI)
351  ereport(ERROR,
352  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
353  primaryTLI, startpointTLI)));
354 
355  /*
356  * Get any missing history files. We do this always, even when we're
357  * not interested in that timeline, so that if we're promoted to
358  * become the master later on, we don't select the same timeline that
359  * was already used in the current master. This isn't bullet-proof -
360  * you'll need some external software to manage your cluster if you
361  * need to ensure that a unique timeline id is chosen in every case,
362  * but let's avoid the confusion of timeline id collisions where we
363  * can.
364  */
365  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
366 
367  /*
368  * Start streaming.
369  *
370  * We'll try to start at the requested starting point and timeline,
371  * even if it's different from the server's latest timeline. In case
372  * we've already reached the end of the old timeline, the server will
373  * finish the streaming immediately, and we will go back to await
374  * orders from the startup process. If recovery_target_timeline is
375  * 'latest', the startup process will scan pg_wal and find the new
376  * history file, bump recovery target timeline, and ask us to restart
377  * on the new timeline.
378  */
379  options.logical = false;
380  options.startpoint = startpoint;
381  options.slotname = slotname[0] != '\0' ? slotname : NULL;
382  options.proto.physical.startpointTLI = startpointTLI;
383  ThisTimeLineID = startpointTLI;
384  if (walrcv_startstreaming(wrconn, &options))
385  {
386  if (first_stream)
387  ereport(LOG,
388  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
389  (uint32) (startpoint >> 32), (uint32) startpoint,
390  startpointTLI)));
391  else
392  ereport(LOG,
393  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
394  (uint32) (startpoint >> 32), (uint32) startpoint,
395  startpointTLI)));
396  first_stream = false;
397 
398  /* Initialize LogstreamResult and buffers for processing messages */
402 
403  /* Initialize the last recv timestamp */
404  last_recv_timestamp = GetCurrentTimestamp();
405  ping_sent = false;
406 
407  /* Loop until end-of-streaming or error */
408  for (;;)
409  {
410  char *buf;
411  int len;
412  bool endofwal = false;
413  pgsocket wait_fd = PGINVALID_SOCKET;
414  int rc;
415 
416  /*
417  * Exit walreceiver if we're not in recovery. This should not
418  * happen, but cross-check the status here.
419  */
420  if (!RecoveryInProgress())
421  ereport(FATAL,
422  (errmsg("cannot continue WAL streaming, recovery has already ended")));
423 
424  /* Process any requests or signals received recently */
426 
427  if (got_SIGHUP)
428  {
429  got_SIGHUP = false;
432  }
433 
434  /* See if we can read data immediately */
435  len = walrcv_receive(wrconn, &buf, &wait_fd);
436  if (len != 0)
437  {
438  /*
439  * Process the received data, and any subsequent data we
440  * can read without blocking.
441  */
442  for (;;)
443  {
444  if (len > 0)
445  {
446  /*
447  * Something was received from master, so reset
448  * timeout
449  */
450  last_recv_timestamp = GetCurrentTimestamp();
451  ping_sent = false;
452  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
453  }
454  else if (len == 0)
455  break;
456  else if (len < 0)
457  {
458  ereport(LOG,
459  (errmsg("replication terminated by primary server"),
460  errdetail("End of WAL reached on timeline %u at %X/%X.",
461  startpointTLI,
462  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
463  endofwal = true;
464  break;
465  }
466  len = walrcv_receive(wrconn, &buf, &wait_fd);
467  }
468 
469  /* Let the master know that we received some data. */
470  XLogWalRcvSendReply(false, false);
471 
472  /*
473  * If we've written some records, flush them to disk and
474  * let the startup process and primary server know about
475  * them.
476  */
477  XLogWalRcvFlush(false);
478  }
479 
480  /* Check if we need to exit the streaming loop. */
481  if (endofwal)
482  break;
483 
484  /*
485  * Ideally we would reuse a WaitEventSet object repeatedly
486  * here to avoid the overheads of WaitLatchOrSocket on epoll
487  * systems, but we can't be sure that libpq (or any other
488  * walreceiver implementation) has the same socket (even if
489  * the fd is the same number, it may have been closed and
490  * reopened since the last time). In future, if there is a
491  * function for removing sockets from WaitEventSet, then we
492  * could add and remove just the socket each time, potentially
493  * avoiding some system calls.
494  */
495  Assert(wait_fd != PGINVALID_SOCKET);
496  rc = WaitLatchOrSocket(walrcv->latch,
499  wait_fd,
502  if (rc & WL_LATCH_SET)
503  {
504  ResetLatch(walrcv->latch);
505  if (walrcv->force_reply)
506  {
507  /*
508  * The recovery process has asked us to send apply
509  * feedback now. Make sure the flag is really set to
510  * false in shared memory before sending the reply, so
511  * we don't miss a new request for a reply.
512  */
513  walrcv->force_reply = false;
515  XLogWalRcvSendReply(true, false);
516  }
517  }
518  if (rc & WL_POSTMASTER_DEATH)
519  {
520  /*
521  * Emergency bailout if postmaster has died. This is to
522  * avoid the necessity for manual cleanup of all
523  * postmaster children.
524  */
525  exit(1);
526  }
527  if (rc & WL_TIMEOUT)
528  {
529  /*
530  * We didn't receive anything new. If we haven't heard
531  * anything from the server for more than
532  * wal_receiver_timeout / 2, ping the server. Also, if
533  * it's been longer than wal_receiver_status_interval
534  * since the last update we sent, send a status update to
535  * the master anyway, to report any progress in applying
536  * WAL.
537  */
538  bool requestReply = false;
539 
540  /*
541  * Check if time since last receive from standby has
542  * reached the configured limit.
543  */
544  if (wal_receiver_timeout > 0)
545  {
547  TimestampTz timeout;
548 
549  timeout =
550  TimestampTzPlusMilliseconds(last_recv_timestamp,
552 
553  if (now >= timeout)
554  ereport(ERROR,
555  (errmsg("terminating walreceiver due to timeout")));
556 
557  /*
558  * We didn't receive anything new, for half of
559  * receiver replication timeout. Ping the server.
560  */
561  if (!ping_sent)
562  {
563  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
564  (wal_receiver_timeout / 2));
565  if (now >= timeout)
566  {
567  requestReply = true;
568  ping_sent = true;
569  }
570  }
571  }
572 
573  XLogWalRcvSendReply(requestReply, requestReply);
575  }
576  }
577 
578  /*
579  * The backend finished streaming. Exit streaming COPY-mode from
580  * our side, too.
581  */
583  walrcv_endstreaming(wrconn, &primaryTLI);
585 
586  /*
587  * If the server had switched to a new timeline that we didn't
588  * know about when we began streaming, fetch its timeline history
589  * file now.
590  */
591  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
592  }
593  else
594  ereport(LOG,
595  (errmsg("primary server contains no more WAL on requested timeline %u",
596  startpointTLI)));
597 
598  /*
599  * End of WAL reached on the requested timeline. Close the last
600  * segment, and await for new orders from the startup process.
601  */
602  if (recvFile >= 0)
603  {
604  char xlogfname[MAXFNAMELEN];
605 
606  XLogWalRcvFlush(false);
607  if (close(recvFile) != 0)
608  ereport(PANIC,
610  errmsg("could not close log segment %s: %m",
612 
613  /*
614  * Create .done file forcibly to prevent the streamed segment from
615  * being archived later.
616  */
617  XLogFileName(xlogfname, recvFileTLI, recvSegNo);
619  XLogArchiveForceDone(xlogfname);
620  else
621  XLogArchiveNotify(xlogfname);
622  }
623  recvFile = -1;
624 
625  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
626  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
627  }
628  /* not reached */
629 }
#define SIGUSR1
Definition: win32.h:202
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:720
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:246
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:254
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:38
#define SIGCONT
Definition: win32.h:197
uint32 TimeLineID
Definition: xlogdefs.h:45
slock_t mutex
Definition: walreceiver.h:117
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:635
WalRcvState walRcvState
Definition: walreceiver.h:63
#define SIGWINCH
Definition: win32.h:201
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
static StringInfoData incoming_message
Definition: walreceiver.c:112
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:154
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1175
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:256
#define XLogFileName(fname, tli, logSegNo)
#define SIGTTIN
Definition: win32.h:199
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:800
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:252
void proc_exit(int code)
Definition: ipc.c:99
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:871
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7873
#define PANIC
Definition: elog.h:53
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:82
#define SIGQUIT
Definition: win32.h:189
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:97
TimestampTz lastMsgSendTime
Definition: walreceiver.h:96
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:75
Latch procLatch
Definition: proc.h:103
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:111
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:512
void pfree(void *pointer)
Definition: mcxt.c:950
#define SIG_IGN
Definition: win32.h:185
#define ERROR
Definition: elog.h:43
static void WalRcvSigUsr1Handler(SIGNAL_ARGS)
Definition: walreceiver.c:808
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11084
pid_t pid
Definition: walreceiver.h:62
#define MAXCONNINFO
Definition: walreceiver.h:35
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:248
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10126
static char * buf
Definition: pg_test_fsync.c:66
int XLogArchiveMode
Definition: xlog.c:94
int errdetail(const char *fmt,...)
Definition: elog.c:873
int errcode_for_file_access(void)
Definition: elog.c:598
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:180
XLogRecPtr startpoint
Definition: walreceiver.h:145
unsigned int uint32
Definition: c.h:268
int pgsocket
Definition: port.h:22
sigset_t UnBlockSig
Definition: pqsignal.c:22
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:98
#define ereport(elevel, rest)
Definition: elog.h:122
TimeLineID receiveStartTLI
Definition: walreceiver.h:73
static TimeLineID recvFileTLI
Definition: walreceiver.c:90
Definition: guc.h:72
Latch * latch
Definition: walreceiver.h:135
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static XLogSegNo recvSegNo
Definition: walreceiver.c:91
bool force_reply
Definition: walreceiver.h:123
#define MAXFNAMELEN
union WalRcvStreamOptions::@97 proto
#define SpinLockRelease(lock)
Definition: spin.h:64
sigset_t BlockSig
Definition: pqsignal.c:22
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:173
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:562
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:80
#define PGINVALID_SOCKET
Definition: port.h:24
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1106
bool ready_to_display
Definition: walreceiver.h:126
TimestampTz latestWalEndTime
Definition: walreceiver.h:103
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:819
#define SIGPIPE
Definition: win32.h:193
#define SIGHUP
Definition: win32.h:188
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define SIG_DFL
Definition: win32.h:183
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:148
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static int server_version
Definition: pg_dumpall.c:82
static void WalRcvQuickDieHandler(SIGNAL_ARGS)
Definition: walreceiver.c:842
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1047
WalRcvData * WalRcv
#define SIGTTOU
Definition: win32.h:200
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4690
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:769
struct WalRcvStreamOptions::@97::@98 physical
static struct @26 LogstreamResult
#define elog
Definition: elog.h:219
#define close(a)
Definition: win32.h:12
XLogRecPtr receiveStart
Definition: walreceiver.h:72
static int recvFile
Definition: walreceiver.c:89
#define SIGCHLD
Definition: win32.h:198
#define WL_LATCH_SET
Definition: latch.h:124
#define SIGALRM
Definition: win32.h:194
#define UINT64_FORMAT
Definition: c.h:316
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
char slotname[NAMEDATALEN]
Definition: walreceiver.h:115
#define SIGUSR2
Definition: win32.h:203
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:109
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 1047 of file walreceiver.c.

References AllowCascadeReplication, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, recvFile, recvSegNo, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

1048 {
1049  if (LogstreamResult.Flush < LogstreamResult.Write)
1050  {
1051  WalRcvData *walrcv = WalRcv;
1052 
1054 
1055  LogstreamResult.Flush = LogstreamResult.Write;
1056 
1057  /* Update shared-memory status */
1058  SpinLockAcquire(&walrcv->mutex);
1059  if (walrcv->receivedUpto < LogstreamResult.Flush)
1060  {
1061  walrcv->latestChunkStart = walrcv->receivedUpto;
1062  walrcv->receivedUpto = LogstreamResult.Flush;
1063  walrcv->receivedTLI = ThisTimeLineID;
1064  }
1065  SpinLockRelease(&walrcv->mutex);
1066 
1067  /* Signal the startup process and walsender that new WAL has arrived */
1068  WakeupRecovery();
1070  WalSndWakeup();
1071 
1072  /* Report XLOG streaming progress in PS display */
1074  {
1075  char activitymsg[50];
1076 
1077  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1078  (uint32) (LogstreamResult.Write >> 32),
1079  (uint32) LogstreamResult.Write);
1080  set_ps_display(activitymsg, false);
1081  }
1082 
1083  /* Also let the master know that we made some progress */
1084  if (!dying)
1085  {
1086  XLogWalRcvSendReply(false, false);
1087  XLogWalRcvSendHSFeedback(false);
1088  }
1089  }
1090 }
slock_t mutex
Definition: walreceiver.h:117
bool update_process_title
Definition: ps_status.c:35
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:10083
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1175
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
TimeLineID receivedTLI
Definition: walreceiver.h:83
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:12092
XLogRecPtr latestChunkStart
Definition: walreceiver.h:91
#define AllowCascadeReplication()
Definition: walreceiver.h:38
unsigned int uint32
Definition: c.h:268
XLogRecPtr receivedUpto
Definition: walreceiver.h:82
static XLogSegNo recvSegNo
Definition: walreceiver.c:91
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1106
TimeLineID ThisTimeLineID
Definition: xlog.c:179
WalRcvData * WalRcv
static struct @26 LogstreamResult
static int recvFile
Definition: walreceiver.c:89
void WalSndWakeup(void)
Definition: walsender.c:3017
static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 871 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

872 {
873  int hdrlen;
874  XLogRecPtr dataStart;
875  XLogRecPtr walEnd;
876  TimestampTz sendTime;
877  bool replyRequested;
878 
880 
881  switch (type)
882  {
883  case 'w': /* WAL records */
884  {
885  /* copy message to StringInfo */
886  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
887  if (len < hdrlen)
888  ereport(ERROR,
889  (errcode(ERRCODE_PROTOCOL_VIOLATION),
890  errmsg_internal("invalid WAL message received from primary")));
892 
893  /* read the fields */
894  dataStart = pq_getmsgint64(&incoming_message);
895  walEnd = pq_getmsgint64(&incoming_message);
896  sendTime = pq_getmsgint64(&incoming_message);
897  ProcessWalSndrMessage(walEnd, sendTime);
898 
899  buf += hdrlen;
900  len -= hdrlen;
901  XLogWalRcvWrite(buf, len, dataStart);
902  break;
903  }
904  case 'k': /* Keepalive */
905  {
906  /* copy message to StringInfo */
907  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
908  if (len != hdrlen)
909  ereport(ERROR,
910  (errcode(ERRCODE_PROTOCOL_VIOLATION),
911  errmsg_internal("invalid keepalive message received from primary")));
913 
914  /* read the fields */
915  walEnd = pq_getmsgint64(&incoming_message);
916  sendTime = pq_getmsgint64(&incoming_message);
917  replyRequested = pq_getmsgbyte(&incoming_message);
918 
919  ProcessWalSndrMessage(walEnd, sendTime);
920 
921  /* If the primary requested a reply, send one immediately */
922  if (replyRequested)
923  XLogWalRcvSendReply(true, false);
924  break;
925  }
926  default:
927  ereport(ERROR,
928  (errcode(ERRCODE_PROTOCOL_VIOLATION),
929  errmsg_internal("invalid replication message type %d",
930  type)));
931  }
932 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1288
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:112
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:66
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
#define ereport(elevel, rest)
Definition: elog.h:122
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1106
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:208
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:938
static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1175 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetNextXidAndEpoch(), GetOldestXmin(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), NULL, pq_sendbyte(), pq_sendint(), pq_sendint64(), PROCARRAY_FLAGS_DEFAULT, PROCARRAY_SLOTS_XMIN, ProcArrayGetReplicationSlotXmin(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, TransactionIdPrecedes(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

1176 {
1177  TimestampTz now;
1178  TransactionId nextXid;
1179  uint32 xmin_epoch,
1180  catalog_xmin_epoch;
1181  TransactionId xmin,
1182  catalog_xmin;
1183  static TimestampTz sendTime = 0;
1184 
1185  /* initially true so we always send at least one feedback message */
1186  static bool master_has_standby_xmin = true;
1187 
1188  /*
1189  * If the user doesn't want status to be reported to the master, be sure
1190  * to exit before doing anything at all.
1191  */
1193  !master_has_standby_xmin)
1194  return;
1195 
1196  /* Get current timestamp. */
1197  now = GetCurrentTimestamp();
1198 
1199  if (!immed)
1200  {
1201  /*
1202  * Send feedback at most once per wal_receiver_status_interval.
1203  */
1204  if (!TimestampDifferenceExceeds(sendTime, now,
1206  return;
1207  sendTime = now;
1208  }
1209 
1210  /*
1211  * If Hot Standby is not yet accepting connections there is nothing to
1212  * send. Check this after the interval has expired to reduce number of
1213  * calls.
1214  *
1215  * Bailing out here also ensures that we don't send feedback until we've
1216  * read our own replication slot state, so we don't tell the master to
1217  * discard needed xmin or catalog_xmin from any slots that may exist on
1218  * this replica.
1219  */
1220  if (!HotStandbyActive())
1221  return;
1222 
1223  /*
1224  * Make the expensive call to get the oldest xmin once we are certain
1225  * everything else has been checked.
1226  */
1228  {
1229  TransactionId slot_xmin;
1230 
1231  /*
1232  * Usually GetOldestXmin() would include both global replication slot
1233  * xmin and catalog_xmin in its calculations, but we want to derive
1234  * separate values for each of those. So we ask for an xmin that
1235  * excludes the catalog_xmin.
1236  */
1237  xmin = GetOldestXmin(NULL,
1239 
1240  ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1241 
1242  if (TransactionIdIsValid(slot_xmin) &&
1243  TransactionIdPrecedes(slot_xmin, xmin))
1244  xmin = slot_xmin;
1245  }
1246  else
1247  {
1248  xmin = InvalidTransactionId;
1249  catalog_xmin = InvalidTransactionId;
1250  }
1251 
1252  /*
1253  * Get epoch and adjust if nextXid and oldestXmin are different sides of
1254  * the epoch boundary.
1255  */
1256  GetNextXidAndEpoch(&nextXid, &xmin_epoch);
1257  catalog_xmin_epoch = xmin_epoch;
1258  if (nextXid < xmin)
1259  xmin_epoch--;
1260  if (nextXid < catalog_xmin)
1261  catalog_xmin_epoch--;
1262 
1263  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1264  xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1265 
1266  /* Construct the message and send it. */
1268  pq_sendbyte(&reply_message, 'h');
1270  pq_sendint(&reply_message, xmin, 4);
1271  pq_sendint(&reply_message, xmin_epoch, 4);
1272  pq_sendint(&reply_message, catalog_xmin, 4);
1273  pq_sendint(&reply_message, catalog_xmin_epoch, 4);
1275  if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1276  master_has_standby_xmin = true;
1277  else
1278  master_has_standby_xmin = false;
1279 }
bool hot_standby_feedback
Definition: walreceiver.c:76
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
uint32 TransactionId
Definition: c.h:397
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
bool HotStandbyActive(void)
Definition: xlog.c:7929
int wal_receiver_status_interval
Definition: walreceiver.c:74
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2982
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
static StringInfoData reply_message
Definition: walreceiver.c:111
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8291
#define PROCARRAY_SLOTS_XMIN
Definition: procarray.h:38
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:268
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define PROCARRAY_FLAGS_DEFAULT
Definition: procarray.h:51
TransactionId GetOldestXmin(Relation rel, int flags)
Definition: procarray.c:1314
#define NULL
Definition: c.h:229
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:258
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1106 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, now(), NULL, pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1107 {
1108  static XLogRecPtr writePtr = 0;
1109  static XLogRecPtr flushPtr = 0;
1110  XLogRecPtr applyPtr;
1111  static TimestampTz sendTime = 0;
1112  TimestampTz now;
1113 
1114  /*
1115  * If the user doesn't want status to be reported to the master, be sure
1116  * to exit before doing anything at all.
1117  */
1118  if (!force && wal_receiver_status_interval <= 0)
1119  return;
1120 
1121  /* Get current timestamp. */
1122  now = GetCurrentTimestamp();
1123 
1124  /*
1125  * We can compare the write and flush positions to the last message we
1126  * sent without taking any lock, but the apply position requires a spin
1127  * lock, so we don't check that unless something else has changed or 10
1128  * seconds have passed. This means that the apply WAL location will
1129  * appear, from the master's point of view, to lag slightly, but since
1130  * this is only for reporting purposes and only on idle systems, that's
1131  * probably OK.
1132  */
1133  if (!force
1134  && writePtr == LogstreamResult.Write
1135  && flushPtr == LogstreamResult.Flush
1136  && !TimestampDifferenceExceeds(sendTime, now,
1138  return;
1139  sendTime = now;
1140 
1141  /* Construct a new message */
1142  writePtr = LogstreamResult.Write;
1143  flushPtr = LogstreamResult.Flush;
1144  applyPtr = GetXLogReplayRecPtr(NULL);
1145 
1147  pq_sendbyte(&reply_message, 'r');
1148  pq_sendint64(&reply_message, writePtr);
1149  pq_sendint64(&reply_message, flushPtr);
1150  pq_sendint64(&reply_message, applyPtr);
1152  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1153 
1154  /* Send it */
1155  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1156  (uint32) (writePtr >> 32), (uint32) writePtr,
1157  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1158  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1159  requestReply ? " (reply requested)" : "");
1160 
1162 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
int wal_receiver_status_interval
Definition: walreceiver.c:74
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
static WalReceiverConn * wrconn
Definition: walreceiver.c:79
static StringInfoData reply_message
Definition: walreceiver.c:111
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11084
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:268
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:258
static struct @26 LogstreamResult
#define elog
Definition: elog.h:219
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 938 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvOff, recvSegNo, ThisTimeLineID, write, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogFileNameP(), XLogSegSize, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

939 {
940  int startoff;
941  int byteswritten;
942 
943  while (nbytes > 0)
944  {
945  int segbytes;
946 
947  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
948  {
949  bool use_existent;
950 
951  /*
952  * fsync() and close current file before we switch to next one. We
953  * would otherwise have to reopen this file to fsync it later
954  */
955  if (recvFile >= 0)
956  {
957  char xlogfname[MAXFNAMELEN];
958 
959  XLogWalRcvFlush(false);
960 
961  /*
962  * XLOG segment files will be re-read by recovery in startup
963  * process soon, so we don't advise the OS to release cache
964  * pages associated with the file like XLogFileClose() does.
965  */
966  if (close(recvFile) != 0)
967  ereport(PANIC,
969  errmsg("could not close log segment %s: %m",
971 
972  /*
973  * Create .done file forcibly to prevent the streamed segment
974  * from being archived later.
975  */
976  XLogFileName(xlogfname, recvFileTLI, recvSegNo);
978  XLogArchiveForceDone(xlogfname);
979  else
980  XLogArchiveNotify(xlogfname);
981  }
982  recvFile = -1;
983 
984  /* Create/use new log file */
985  XLByteToSeg(recptr, recvSegNo);
986  use_existent = true;
987  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
989  recvOff = 0;
990  }
991 
992  /* Calculate the start offset of the received logs */
993  startoff = recptr % XLogSegSize;
994 
995  if (startoff + nbytes > XLogSegSize)
996  segbytes = XLogSegSize - startoff;
997  else
998  segbytes = nbytes;
999 
1000  /* Need to seek in the file? */
1001  if (recvOff != startoff)
1002  {
1003  if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
1004  ereport(PANIC,
1006  errmsg("could not seek in log segment %s to offset %u: %m",
1008  startoff)));
1009  recvOff = startoff;
1010  }
1011 
1012  /* OK to write the logs */
1013  errno = 0;
1014 
1015  byteswritten = write(recvFile, buf, segbytes);
1016  if (byteswritten <= 0)
1017  {
1018  /* if write didn't set errno, assume no disk space */
1019  if (errno == 0)
1020  errno = ENOSPC;
1021  ereport(PANIC,
1023  errmsg("could not write to log segment %s "
1024  "at offset %u, length %lu: %m",
1026  recvOff, (unsigned long) segbytes)));
1027  }
1028 
1029  /* Update state for write */
1030  recptr += byteswritten;
1031 
1032  recvOff += byteswritten;
1033  nbytes -= byteswritten;
1034  buf += byteswritten;
1035 
1036  LogstreamResult.Write = recptr;
1037  }
1038 }
#define XLogSegSize
Definition: xlog_internal.h:92
#define write(a, b, c)
Definition: win32.h:14
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3154
#define XLogFileName(fname, tli, logSegNo)
#define PANIC
Definition: elog.h:53
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:512
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10126
static char * buf
Definition: pg_test_fsync.c:66
int XLogArchiveMode
Definition: xlog.c:94
int errcode_for_file_access(void)
Definition: elog.c:598
#define ereport(elevel, rest)
Definition: elog.h:122
static TimeLineID recvFileTLI
Definition: walreceiver.c:90
static XLogSegNo recvSegNo
Definition: walreceiver.c:91
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:562
TimeLineID ThisTimeLineID
Definition: xlog.c:179
#define XLByteToSeg(xlrp, logSegNo)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1047
int errmsg(const char *fmt,...)
Definition: elog.c:797
static struct @26 LogstreamResult
static uint32 recvOff
Definition: walreceiver.c:92
#define close(a)
Definition: win32.h:12
static int recvFile
Definition: walreceiver.c:89
#define XLByteInSeg(xlrp, logSegNo)

Variable Documentation

XLogRecPtr Flush

Definition at line 108 of file walreceiver.c.

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 98 of file walreceiver.c.

Referenced by WalRcvSigHupHandler(), and WalReceiverMain().

volatile sig_atomic_t got_SIGTERM = false
static

Definition at line 99 of file walreceiver.c.

Referenced by ProcessWalRcvInterrupts(), and WalRcvShutdownHandler().

bool hot_standby_feedback

Definition at line 76 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

StringInfoData incoming_message
static

Definition at line 112 of file walreceiver.c.

struct { ... } LogstreamResult
int recvFile = -1
static

Definition at line 89 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

TimeLineID recvFileTLI = 0
static

Definition at line 90 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

uint32 recvOff = 0
static

Definition at line 92 of file walreceiver.c.

Referenced by XLogWalRcvWrite().

XLogSegNo recvSegNo = 0
static

Definition at line 91 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

StringInfoData reply_message
static

Definition at line 111 of file walreceiver.c.

Referenced by send_feedback().

int wal_receiver_status_interval

Definition at line 74 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

int wal_receiver_timeout

Definition at line 75 of file walreceiver.c.

Referenced by logicalrep_worker_launch(), LogicalRepApplyLoop(), and WalReceiverMain().

volatile bool WalRcvImmediateInterruptOK = false
static
WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 80 of file walreceiver.c.

Referenced by _PG_init().

WalReceiverConn* wrconn = NULL
static

Definition at line 79 of file walreceiver.c.

XLogRecPtr Write

Definition at line 107 of file walreceiver.c.