PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walreceiver.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */
 

Functions

static void ProcessWalRcvInterrupts (void)
 
static void EnableWalRcvImmediateExit (void)
 
static void DisableWalRcvImmediateExit (void)
 
static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
 
static void XLogWalRcvFlush (bool dying)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvSigHupHandler (SIGNAL_ARGS)
 
static void WalRcvSigUsr1Handler (SIGNAL_ARGS)
 
static void WalRcvShutdownHandler (SIGNAL_ARGS)
 
static void WalRcvQuickDieHandler (SIGNAL_ARGS)
 
void WalReceiverMain (void)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
static uint32 recvOff = 0
 
static volatile sig_atomic_t got_SIGHUP = false
 
static volatile sig_atomic_t got_SIGTERM = false
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
} LogstreamResult
 
static StringInfoData reply_message
 
static StringInfoData incoming_message
 
static volatile bool WalRcvImmediateInterruptOK = false
 

Macro Definition Documentation

#define NAPTIME_PER_CYCLE   100 /* max sleep time between cycles (100ms) */

Definition at line 81 of file walreceiver.c.

Referenced by WalReceiverMain().

Function Documentation

static void DisableWalRcvImmediateExit ( void  )
static

Definition at line 179 of file walreceiver.c.

References ProcessWalRcvInterrupts(), and WalRcvImmediateInterruptOK.

Referenced by WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().

180 {
183 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:129
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:153
static void EnableWalRcvImmediateExit ( void  )
static

Definition at line 172 of file walreceiver.c.

References ProcessWalRcvInterrupts(), and WalRcvImmediateInterruptOK.

Referenced by WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().

173 {
176 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:129
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:153
Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1349 of file walreceiver.c.

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, Int32GetDatum, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum, MemSet, WalRcvData::mutex, tupleDesc::natts, NULL, palloc0(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, pstrdup(), WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, superuser(), TimestampTzGetDatum, TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, and XLogRecPtrIsInvalid.

1350 {
1351  TupleDesc tupdesc;
1352  Datum *values;
1353  bool *nulls;
1354  WalRcvData *walrcv = WalRcv;
1355  WalRcvState state;
1356  XLogRecPtr receive_start_lsn;
1357  TimeLineID receive_start_tli;
1358  XLogRecPtr received_lsn;
1359  TimeLineID received_tli;
1360  TimestampTz last_send_time;
1361  TimestampTz last_receipt_time;
1362  XLogRecPtr latest_end_lsn;
1363  TimestampTz latest_end_time;
1364  char *slotname;
1365  char *conninfo;
1366 
1367  /*
1368  * No WAL receiver (or not ready yet), just return a tuple with NULL
1369  * values
1370  */
1371  if (walrcv->pid == 0 || !walrcv->ready_to_display)
1372  PG_RETURN_NULL();
1373 
1374  /* determine result type */
1375  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1376  elog(ERROR, "return type must be a row type");
1377 
1378  values = palloc0(sizeof(Datum) * tupdesc->natts);
1379  nulls = palloc0(sizeof(bool) * tupdesc->natts);
1380 
1381  /* Take a lock to ensure value consistency */
1382  SpinLockAcquire(&walrcv->mutex);
1383  state = walrcv->walRcvState;
1384  receive_start_lsn = walrcv->receiveStart;
1385  receive_start_tli = walrcv->receiveStartTLI;
1386  received_lsn = walrcv->receivedUpto;
1387  received_tli = walrcv->receivedTLI;
1388  last_send_time = walrcv->lastMsgSendTime;
1389  last_receipt_time = walrcv->lastMsgReceiptTime;
1390  latest_end_lsn = walrcv->latestWalEnd;
1391  latest_end_time = walrcv->latestWalEndTime;
1392  slotname = pstrdup(walrcv->slotname);
1393  conninfo = pstrdup(walrcv->conninfo);
1394  SpinLockRelease(&walrcv->mutex);
1395 
1396  /* Fetch values */
1397  values[0] = Int32GetDatum(walrcv->pid);
1398 
1399  if (!superuser())
1400  {
1401  /*
1402  * Only superusers can see details. Other users only get the pid value
1403  * to know whether it is a WAL receiver, but no details.
1404  */
1405  MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1406  }
1407  else
1408  {
1409  values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1410 
1411  if (XLogRecPtrIsInvalid(receive_start_lsn))
1412  nulls[2] = true;
1413  else
1414  values[2] = LSNGetDatum(receive_start_lsn);
1415  values[3] = Int32GetDatum(receive_start_tli);
1416  if (XLogRecPtrIsInvalid(received_lsn))
1417  nulls[4] = true;
1418  else
1419  values[4] = LSNGetDatum(received_lsn);
1420  values[5] = Int32GetDatum(received_tli);
1421  if (last_send_time == 0)
1422  nulls[6] = true;
1423  else
1424  values[6] = TimestampTzGetDatum(last_send_time);
1425  if (last_receipt_time == 0)
1426  nulls[7] = true;
1427  else
1428  values[7] = TimestampTzGetDatum(last_receipt_time);
1429  if (XLogRecPtrIsInvalid(latest_end_lsn))
1430  nulls[8] = true;
1431  else
1432  values[8] = LSNGetDatum(latest_end_lsn);
1433  if (latest_end_time == 0)
1434  nulls[9] = true;
1435  else
1436  values[9] = TimestampTzGetDatum(latest_end_time);
1437  if (*slotname == '\0')
1438  nulls[10] = true;
1439  else
1440  values[10] = CStringGetTextDatum(slotname);
1441  if (*conninfo == '\0')
1442  nulls[11] = true;
1443  else
1444  values[11] = CStringGetTextDatum(conninfo);
1445  }
1446 
1447  /* Returns the record as Datum */
1448  PG_RETURN_DATUM(HeapTupleGetDatum(
1449  heap_form_tuple(tupdesc, values, nulls)));
1450 }
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1324
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
uint32 TimeLineID
Definition: xlogdefs.h:45
slock_t mutex
Definition: walreceiver.h:114
int64 TimestampTz
Definition: timestamp.h:39
WalRcvState walRcvState
Definition: walreceiver.h:60
char * pstrdup(const char *in)
Definition: mcxt.c:1165
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TimeLineID receivedTLI
Definition: walreceiver.h:80
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:853
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
int natts
Definition: tupdesc.h:73
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:94
TimestampTz lastMsgSendTime
Definition: walreceiver.h:93
WalRcvState
Definition: walreceiver.h:40
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
pid_t pid
Definition: walreceiver.h:59
XLogRecPtr receivedUpto
Definition: walreceiver.h:79
TimeLineID receiveStartTLI
Definition: walreceiver.h:70
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
void * palloc0(Size size)
Definition: mcxt.c:920
uintptr_t Datum
Definition: postgres.h:374
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:297
bool ready_to_display
Definition: walreceiver.h:123
TimestampTz latestWalEndTime
Definition: walreceiver.h:100
XLogRecPtr latestWalEnd
Definition: walreceiver.h:99
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
Definition: regguts.h:298
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:222
WalRcvData * WalRcv
static Datum values[MAXATTR]
Definition: bootstrap.c:162
#define Int32GetDatum(X)
Definition: postgres.h:487
#define CStringGetTextDatum(s)
Definition: builtins.h:90
#define elog
Definition: elog.h:219
XLogRecPtr receiveStart
Definition: walreceiver.h:69
#define PG_RETURN_NULL()
Definition: fmgr.h:289
char slotname[NAMEDATALEN]
Definition: walreceiver.h:112
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:106
static void ProcessWalRcvInterrupts ( void  )
static

Definition at line 153 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, got_SIGTERM, and WalRcvImmediateInterruptOK.

Referenced by DisableWalRcvImmediateExit(), EnableWalRcvImmediateExit(), WalRcvShutdownHandler(), WalRcvWaitForStartPosition(), and WalReceiverMain().

154 {
155  /*
156  * Although walreceiver interrupt handling doesn't use the same scheme as
157  * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
158  * any incoming signals on Win32.
159  */
160  CHECK_FOR_INTERRUPTS();
161 
162  if (got_SIGTERM)
163  {
165  ereport(FATAL,
166  (errcode(ERRCODE_ADMIN_SHUTDOWN),
167  errmsg("terminating walreceiver process due to administrator command")));
168  }
169 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:129
int errcode(int sqlerrcode)
Definition: elog.c:575
#define FATAL
Definition: elog.h:52
#define ereport(elevel, rest)
Definition: elog.h:122
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:98
static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1259 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, log_min_messages, WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

1260 {
1261  WalRcvData *walrcv = WalRcv;
1262 
1263  TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1264 
1265  /* Update shared-memory status */
1266  SpinLockAcquire(&walrcv->mutex);
1267  if (walrcv->latestWalEnd < walEnd)
1268  walrcv->latestWalEndTime = sendTime;
1269  walrcv->latestWalEnd = walEnd;
1270  walrcv->lastMsgSendTime = sendTime;
1271  walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1272  SpinLockRelease(&walrcv->mutex);
1273 
1274  if (log_min_messages <= DEBUG2)
1275  {
1276  char *sendtime;
1277  char *receipttime;
1278  int applyDelay;
1279 
1280  /* Copy because timestamptz_to_str returns a static buffer */
1281  sendtime = pstrdup(timestamptz_to_str(sendTime));
1282  receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1283  applyDelay = GetReplicationApplyDelay();
1284 
1285  /* apply delay is not available */
1286  if (applyDelay == -1)
1287  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1288  sendtime,
1289  receipttime,
1290  GetReplicationTransferLatency());
1291  else
1292  elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1293  sendtime,
1294  receipttime,
1295  applyDelay,
1296  GetReplicationTransferLatency());
1297 
1298  pfree(sendtime);
1299  pfree(receipttime);
1300  }
1301 }
slock_t mutex
Definition: walreceiver.h:114
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1165
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:94
TimestampTz lastMsgSendTime
Definition: walreceiver.h:93
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:992
#define DEBUG2
Definition: elog.h:24
int GetReplicationApplyDelay(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
TimestampTz latestWalEndTime
Definition: walreceiver.h:100
int log_min_messages
Definition: guc.c:453
XLogRecPtr latestWalEnd
Definition: walreceiver.h:99
WalRcvData * WalRcv
#define elog
Definition: elog.h:219
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1709
static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 768 of file walreceiver.c.

References Assert, WalRcvData::latch, WalRcvData::mutex, MyProcPid, NULL, WalRcvData::pid, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

769 {
770  WalRcvData *walrcv = WalRcv;
771 
772  /* Ensure that all WAL records received are flushed to disk */
773  XLogWalRcvFlush(true);
774 
775  walrcv->latch = NULL;
776 
777  SpinLockAcquire(&walrcv->mutex);
778  Assert(walrcv->walRcvState == WALRCV_STREAMING ||
779  walrcv->walRcvState == WALRCV_RESTARTING ||
780  walrcv->walRcvState == WALRCV_STARTING ||
781  walrcv->walRcvState == WALRCV_WAITING ||
782  walrcv->walRcvState == WALRCV_STOPPING);
783  Assert(walrcv->pid == MyProcPid);
784  walrcv->walRcvState = WALRCV_STOPPED;
785  walrcv->pid = 0;
786  walrcv->ready_to_display = false;
787  SpinLockRelease(&walrcv->mutex);
788 
789  /* Terminate the connection gracefully. */
790  if (wrconn != NULL)
791  walrcv_disconnect(wrconn);
792 
793  /* Wake up the startup process to notice promptly that we're gone */
794  WakeupRecovery();
795 }
int MyProcPid
Definition: globals.c:38
slock_t mutex
Definition: walreceiver.h:114
WalRcvState walRcvState
Definition: walreceiver.h:60
static WalReceiverConn * wrconn
Definition: walreceiver.c:78
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:11975
pid_t pid
Definition: walreceiver.h:59
Latch * latch
Definition: walreceiver.h:132
#define SpinLockRelease(lock)
Definition: spin.h:64
bool ready_to_display
Definition: walreceiver.h:123
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
#define walrcv_disconnect(conn)
Definition: walreceiver.h:231
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1046
WalRcvData * WalRcv
static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 719 of file walreceiver.c.

References DisableWalRcvImmediateExit(), EnableWalRcvImmediateExit(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, MAXFNAMELEN, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, and writeTimeLineHistoryFile().

Referenced by WalReceiverMain().

720 {
721  TimeLineID tli;
722 
723  for (tli = first; tli <= last; tli++)
724  {
725  /* there's no history file for timeline 1 */
726  if (tli != 1 && !existsTimeLineHistory(tli))
727  {
728  char *fname;
729  char *content;
730  int len;
731  char expectedfname[MAXFNAMELEN];
732 
733  ereport(LOG,
734  (errmsg("fetching timeline history file for timeline %u from primary server",
735  tli)));
736 
737  EnableWalRcvImmediateExit();
738  walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
739  DisableWalRcvImmediateExit();
740 
741  /*
742  * Check that the filename on the master matches what we
743  * calculated ourselves. This is just a sanity check, it should
744  * always match.
745  */
746  TLHistoryFileName(expectedfname, tli);
747  if (strcmp(fname, expectedfname) != 0)
748  ereport(ERROR,
749  (errcode(ERRCODE_PROTOCOL_VIOLATION),
750  errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
751  tli)));
752 
753  /*
754  * Write the file to pg_wal.
755  */
756  writeTimeLineHistoryFile(tli, content, len);
757 
758  pfree(fname);
759  pfree(content);
760  }
761  }
762 }
uint32 TimeLineID
Definition: xlogdefs.h:45
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
static WalReceiverConn * wrconn
Definition: walreceiver.c:78
void pfree(void *pointer)
Definition: mcxt.c:992
#define ERROR
Definition: elog.h:43
#define TLHistoryFileName(fname, tli)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:441
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:179
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:206
#define ereport(elevel, rest)
Definition: elog.h:122
#define MAXFNAMELEN
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:172
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:217
int errmsg(const char *fmt,...)
Definition: elog.c:797
void WalRcvForceReply ( void  )

Definition at line 1312 of file walreceiver.c.

References WalRcvData::force_reply, WalRcvData::latch, SetLatch(), and WalRcv.

Referenced by StartupXLOG().

1313 {
1314  WalRcv->force_reply = true;
1315  if (WalRcv->latch)
1316  SetLatch(WalRcv->latch);
1317 }
Latch * latch
Definition: walreceiver.h:132
bool force_reply
Definition: walreceiver.h:120
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
WalRcvData * WalRcv
static const char* WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1324 of file walreceiver.c.

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

1325 {
1326  switch (state)
1327  {
1328  case WALRCV_STOPPED:
1329  return "stopped";
1330  case WALRCV_STARTING:
1331  return "starting";
1332  case WALRCV_STREAMING:
1333  return "streaming";
1334  case WALRCV_WAITING:
1335  return "waiting";
1336  case WALRCV_RESTARTING:
1337  return "restarting";
1338  case WALRCV_STOPPING:
1339  return "stopping";
1340  }
1341  return "UNKNOWN";
1342 }
Definition: regguts.h:298
static void WalRcvQuickDieHandler ( SIGNAL_ARGS  )
static

Definition at line 841 of file walreceiver.c.

References BlockSig, on_exit_reset(), and PG_SETMASK.

Referenced by WalReceiverMain().

842 {
843  PG_SETMASK(&BlockSig);
844 
845  /*
846  * We DO NOT want to run proc_exit() callbacks -- we're here because
847  * shared memory may be corrupted, so we don't want to try to clean up our
848  * transaction. Just nail the windows shut and get out of town. Now that
849  * there's an atexit callback to prevent third-party code from breaking
850  * things by calling exit() directly, we have to reset the callbacks
851  * explicitly to make this work as intended.
852  */
853  on_exit_reset();
854 
855  /*
856  * Note we do exit(2) not exit(0). This is to force the postmaster into a
857  * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
858  * backend. This is necessary precisely because we don't clean up our
859  * shared memory state. (The "dead man switch" mechanism in pmsignal.c
860  * should ensure the postmaster sees this as a crash, too, but no harm in
861  * being doubly sure.)
862  */
863  exit(2);
864 }
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
void on_exit_reset(void)
Definition: ipc.c:396
sigset_t BlockSig
Definition: pqsignal.c:22
static void WalRcvShutdownHandler ( SIGNAL_ARGS  )
static

Definition at line 818 of file walreceiver.c.

References got_SIGTERM, WalRcvData::latch, proc_exit_inprogress, ProcessWalRcvInterrupts(), SetLatch(), WalRcv, and WalRcvImmediateInterruptOK.

Referenced by WalReceiverMain().

819 {
820  int save_errno = errno;
821 
822  got_SIGTERM = true;
823 
824  if (WalRcv->latch)
825  SetLatch(WalRcv->latch);
826 
827  /* Don't joggle the elbow of proc_exit */
828  if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
829  ProcessWalRcvInterrupts();
830 
831  errno = save_errno;
832 }
static volatile bool WalRcvImmediateInterruptOK
Definition: walreceiver.c:129
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:153
Latch * latch
Definition: walreceiver.h:132
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
bool proc_exit_inprogress
Definition: ipc.c:40
WalRcvData * WalRcv
static volatile sig_atomic_t got_SIGTERM
Definition: walreceiver.c:98
static void WalRcvSigHupHandler ( SIGNAL_ARGS  )
static

Definition at line 799 of file walreceiver.c.

References got_SIGHUP.

Referenced by WalReceiverMain().

800 {
801  got_SIGHUP = true;
802 }
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:97
static void WalRcvSigUsr1Handler ( SIGNAL_ARGS  )
static

Definition at line 807 of file walreceiver.c.

References latch_sigusr1_handler().

Referenced by WalReceiverMain().

808 {
809  int save_errno = errno;
810 
811  latch_sigusr1_handler();
812 
813  errno = save_errno;
814 }
void latch_sigusr1_handler(void)
Definition: latch.c:1540
static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 634 of file walreceiver.c.

References Assert, elog, FATAL, InvalidXLogRecPtr, WalRcvData::latch, WalRcvData::mutex, PostmasterIsAlive(), proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_EVENT_WAL_RECEIVER_WAIT_START, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_LATCH_SET, and WL_POSTMASTER_DEATH.

Referenced by WalReceiverMain().

635 {
636  WalRcvData *walrcv = WalRcv;
637  int state;
638 
639  SpinLockAcquire(&walrcv->mutex);
640  state = walrcv->walRcvState;
641  if (state != WALRCV_STREAMING)
642  {
643  SpinLockRelease(&walrcv->mutex);
644  if (state == WALRCV_STOPPING)
645  proc_exit(0);
646  else
647  elog(FATAL, "unexpected walreceiver state");
648  }
649  walrcv->walRcvState = WALRCV_WAITING;
650  walrcv->receiveStart = InvalidXLogRecPtr;
651  walrcv->receiveStartTLI = 0;
652  SpinLockRelease(&walrcv->mutex);
653 
655  set_ps_display("idle", false);
656 
657  /*
658  * nudge startup process to notice that we've stopped streaming and are
659  * now waiting for instructions.
660  */
661  WakeupRecovery();
662  for (;;)
663  {
664  ResetLatch(walrcv->latch);
665 
666  /*
667  * Emergency bailout if postmaster has died. This is to avoid the
668  * necessity for manual cleanup of all postmaster children.
669  */
670  if (!PostmasterIsAlive())
671  exit(1);
672 
674 
675  SpinLockAcquire(&walrcv->mutex);
676  Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
677  walrcv->walRcvState == WALRCV_WAITING ||
678  walrcv->walRcvState == WALRCV_STOPPING);
679  if (walrcv->walRcvState == WALRCV_RESTARTING)
680  {
681  /* we don't expect primary_conninfo to change */
682  *startpoint = walrcv->receiveStart;
683  *startpointTLI = walrcv->receiveStartTLI;
684  walrcv->walRcvState = WALRCV_STREAMING;
685  SpinLockRelease(&walrcv->mutex);
686  break;
687  }
688  if (walrcv->walRcvState == WALRCV_STOPPING)
689  {
690  /*
691  * We should've received SIGTERM if the startup process wants us
692  * to die, but might as well check it here too.
693  */
694  SpinLockRelease(&walrcv->mutex);
695  exit(1);
696  }
697  SpinLockRelease(&walrcv->mutex);
698 
701  }
702 
704  {
705  char activitymsg[50];
706 
707  snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
708  (uint32) (*startpoint >> 32),
709  (uint32) *startpoint);
710  set_ps_display(activitymsg, false);
711  }
712 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
slock_t mutex
Definition: walreceiver.h:114
bool update_process_title
Definition: ps_status.c:35
WalRcvState walRcvState
Definition: walreceiver.h:60
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:153
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
void proc_exit(int code)
Definition: ipc.c:99
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
#define SpinLockAcquire(lock)
Definition: spin.h:62
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:300
void WakeupRecovery(void)
Definition: xlog.c:11975
#define FATAL
Definition: elog.h:52
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
unsigned int uint32
Definition: c.h:265
TimeLineID receiveStartTLI
Definition: walreceiver.h:70
Latch * latch
Definition: walreceiver.h:132
#define SpinLockRelease(lock)
Definition: spin.h:64
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
#define Assert(condition)
Definition: c.h:671
Definition: regguts.h:298
WalRcvData * WalRcv
#define elog
Definition: elog.h:219
XLogRecPtr receiveStart
Definition: walreceiver.h:69
#define WL_LATCH_SET
Definition: latch.h:124
void WalReceiverMain ( void  )

Definition at line 187 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, Assert, BlockSig, buf, close, WalRcvData::conninfo, CurrentResourceOwner, DEBUG1, DisableWalRcvImmediateExit(), elog, EnableWalRcvImmediateExit(), ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, WalRcvStreamOptions::logical, LogstreamResult, MAXCONNINFO, MAXFNAMELEN, WalRcvData::mutex, MyProc, MyProcPid, NAMEDATALEN, NAPTIME_PER_CYCLE, now(), NULL, on_shmem_exit(), options, PANIC, pfree(), pg_memory_barrier, PG_SETMASK, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvStreamOptions::physical, WalRcvData::pid, pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), PGPROC::procLatch, WalRcvStreamOptions::proto, WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResetLatch(), ResourceOwnerCreate(), server_version, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalRcvData::slotname, WalRcvStreamOptions::slotname, snprintf(), SpinLockAcquire, SpinLockRelease, WalRcvStreamOptions::startpoint, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UINT64_FORMAT, UnBlockSig, WAIT_EVENT_WAL_RECEIVER_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, WalRcv, walrcv_connect, walrcv_endstreaming, walrcv_get_conninfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvQuickDieHandler(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvSigUsr1Handler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), WL_LATCH_SET, WL_POSTMASTER_DEATH, 
WL_SOCKET_READABLE, WL_TIMEOUT, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName, XLogFileNameP(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain().

188 {
189  char conninfo[MAXCONNINFO];
190  char *tmp_conninfo;
191  char slotname[NAMEDATALEN];
192  XLogRecPtr startpoint;
193  TimeLineID startpointTLI;
194  TimeLineID primaryTLI;
195  bool first_stream;
196  WalRcvData *walrcv = WalRcv;
197  TimestampTz last_recv_timestamp;
198  bool ping_sent;
199  char *err;
200 
201  /*
202  * WalRcv should be set up already (if we are a backend, we inherit this
203  * by fork() or EXEC_BACKEND mechanism from the postmaster).
204  */
205  Assert(walrcv != NULL);
206 
207  /*
208  * Mark walreceiver as running in shared memory.
209  *
210  * Do this as early as possible, so that if we fail later on, we'll set
211  * state to STOPPED. If we die before this, the startup process will keep
212  * waiting for us to start up, until it times out.
213  */
214  SpinLockAcquire(&walrcv->mutex);
215  Assert(walrcv->pid == 0);
216  switch (walrcv->walRcvState)
217  {
218  case WALRCV_STOPPING:
219  /* If we've already been requested to stop, don't start up. */
220  walrcv->walRcvState = WALRCV_STOPPED;
221  /* fall through */
222 
223  case WALRCV_STOPPED:
224  SpinLockRelease(&walrcv->mutex);
225  proc_exit(1);
226  break;
227 
228  case WALRCV_STARTING:
229  /* The usual case */
230  break;
231 
232  case WALRCV_WAITING:
233  case WALRCV_STREAMING:
234  case WALRCV_RESTARTING:
235  default:
236  /* Shouldn't happen */
237  elog(PANIC, "walreceiver still running according to shared memory state");
238  }
239  /* Advertise our PID so that the startup process can kill us */
240  walrcv->pid = MyProcPid;
241  walrcv->walRcvState = WALRCV_STREAMING;
242 
243  /* Fetch information required to start streaming */
244  walrcv->ready_to_display = false;
245  strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
246  strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
247  startpoint = walrcv->receiveStart;
248  startpointTLI = walrcv->receiveStartTLI;
249 
250  /* Initialise to a sanish value */
252 
253  SpinLockRelease(&walrcv->mutex);
254 
255  /* Arrange to clean up at walreceiver exit */
257 
258  walrcv->latch = &MyProc->procLatch;
259 
260  /* Properly accept or ignore signals the postmaster might send us */
261  pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config
262  * file */
263  pqsignal(SIGINT, SIG_IGN);
264  pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
265  pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
270 
271  /* Reset some signals that are accepted by postmaster but not here */
277 
278  /* We allow SIGQUIT (quickdie) at all times */
279  sigdelset(&BlockSig, SIGQUIT);
280 
281  /* Load the libpq-specific functions */
282  load_file("libpqwalreceiver", false);
283  if (WalReceiverFunctions == NULL)
284  elog(ERROR, "libpqwalreceiver didn't initialize correctly");
285 
286  /*
287  * Create a resource owner to keep track of our resources (not clear that
288  * we need this, but may as well have one).
289  */
290  CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
291 
292  /* Unblock signals (they were blocked when the postmaster forked us) */
294 
295  /* Establish the connection to the primary for XLOG streaming */
297  wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
298  if (!wrconn)
299  ereport(ERROR,
300  (errmsg("could not connect to the primary server: %s", err)));
302 
303  /*
304  * Save user-visible connection string. This clobbers the original
305  * conninfo, for security.
306  */
307  tmp_conninfo = walrcv_get_conninfo(wrconn);
308  SpinLockAcquire(&walrcv->mutex);
309  memset(walrcv->conninfo, 0, MAXCONNINFO);
310  if (tmp_conninfo)
311  {
312  strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
313  pfree(tmp_conninfo);
314  }
315  walrcv->ready_to_display = true;
316  SpinLockRelease(&walrcv->mutex);
317 
318  first_stream = true;
319  for (;;)
320  {
321  char *primary_sysid;
322  char standby_sysid[32];
323  int server_version;
325 
326  /*
327  * Check that we're connected to a valid server using the
328  * IDENTIFY_SYSTEM replication command.
329  */
331  primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
332  &server_version);
333 
334  snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
336  if (strcmp(primary_sysid, standby_sysid) != 0)
337  {
338  ereport(ERROR,
339  (errmsg("database system identifier differs between the primary and standby"),
340  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
341  primary_sysid, standby_sysid)));
342  }
344 
345  /*
346  * Confirm that the current timeline of the primary is the same or
347  * ahead of ours.
348  */
349  if (primaryTLI < startpointTLI)
350  ereport(ERROR,
351  (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
352  primaryTLI, startpointTLI)));
353 
354  /*
355  * Get any missing history files. We do this always, even when we're
356  * not interested in that timeline, so that if we're promoted to
357  * become the master later on, we don't select the same timeline that
358  * was already used in the current master. This isn't bullet-proof -
359  * you'll need some external software to manage your cluster if you
360  * need to ensure that a unique timeline id is chosen in every case,
361  * but let's avoid the confusion of timeline id collisions where we
362  * can.
363  */
364  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
365 
366  /*
367  * Start streaming.
368  *
369  * We'll try to start at the requested starting point and timeline,
370  * even if it's different from the server's latest timeline. In case
371  * we've already reached the end of the old timeline, the server will
372  * finish the streaming immediately, and we will go back to await
373  * orders from the startup process. If recovery_target_timeline is
374  * 'latest', the startup process will scan pg_wal and find the new
375  * history file, bump recovery target timeline, and ask us to restart
376  * on the new timeline.
377  */
378  options.logical = false;
379  options.startpoint = startpoint;
380  options.slotname = slotname[0] != '\0' ? slotname : NULL;
381  options.proto.physical.startpointTLI = startpointTLI;
382  ThisTimeLineID = startpointTLI;
383  if (walrcv_startstreaming(wrconn, &options))
384  {
385  if (first_stream)
386  ereport(LOG,
387  (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
388  (uint32) (startpoint >> 32), (uint32) startpoint,
389  startpointTLI)));
390  else
391  ereport(LOG,
392  (errmsg("restarted WAL streaming at %X/%X on timeline %u",
393  (uint32) (startpoint >> 32), (uint32) startpoint,
394  startpointTLI)));
395  first_stream = false;
396 
397  /* Initialize LogstreamResult and buffers for processing messages */
401 
402  /* Initialize the last recv timestamp */
403  last_recv_timestamp = GetCurrentTimestamp();
404  ping_sent = false;
405 
406  /* Loop until end-of-streaming or error */
407  for (;;)
408  {
409  char *buf;
410  int len;
411  bool endofwal = false;
412  pgsocket wait_fd = PGINVALID_SOCKET;
413  int rc;
414 
415  /*
416  * Exit walreceiver if we're not in recovery. This should not
417  * happen, but cross-check the status here.
418  */
419  if (!RecoveryInProgress())
420  ereport(FATAL,
421  (errmsg("cannot continue WAL streaming, recovery has already ended")));
422 
423  /* Process any requests or signals received recently */
425 
426  if (got_SIGHUP)
427  {
428  got_SIGHUP = false;
431  }
432 
433  /* See if we can read data immediately */
434  len = walrcv_receive(wrconn, &buf, &wait_fd);
435  if (len != 0)
436  {
437  /*
438  * Process the received data, and any subsequent data we
439  * can read without blocking.
440  */
441  for (;;)
442  {
443  if (len > 0)
444  {
445  /*
446  * Something was received from master, so reset
447  * timeout
448  */
449  last_recv_timestamp = GetCurrentTimestamp();
450  ping_sent = false;
451  XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
452  }
453  else if (len == 0)
454  break;
455  else if (len < 0)
456  {
457  ereport(LOG,
458  (errmsg("replication terminated by primary server"),
459  errdetail("End of WAL reached on timeline %u at %X/%X.",
460  startpointTLI,
461  (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
462  endofwal = true;
463  break;
464  }
465  len = walrcv_receive(wrconn, &buf, &wait_fd);
466  }
467 
468  /* Let the master know that we received some data. */
469  XLogWalRcvSendReply(false, false);
470 
471  /*
472  * If we've written some records, flush them to disk and
473  * let the startup process and primary server know about
474  * them.
475  */
476  XLogWalRcvFlush(false);
477  }
478 
479  /* Check if we need to exit the streaming loop. */
480  if (endofwal)
481  break;
482 
483  /*
484  * Ideally we would reuse a WaitEventSet object repeatedly
485  * here to avoid the overheads of WaitLatchOrSocket on epoll
486  * systems, but we can't be sure that libpq (or any other
487  * walreceiver implementation) has the same socket (even if
488  * the fd is the same number, it may have been closed and
489  * reopened since the last time). In future, if there is a
490  * function for removing sockets from WaitEventSet, then we
491  * could add and remove just the socket each time, potentially
492  * avoiding some system calls.
493  */
494  Assert(wait_fd != PGINVALID_SOCKET);
495  rc = WaitLatchOrSocket(walrcv->latch,
498  wait_fd,
501  if (rc & WL_LATCH_SET)
502  {
503  ResetLatch(walrcv->latch);
504  if (walrcv->force_reply)
505  {
506  /*
507  * The recovery process has asked us to send apply
508  * feedback now. Make sure the flag is really set to
509  * false in shared memory before sending the reply, so
510  * we don't miss a new request for a reply.
511  */
512  walrcv->force_reply = false;
514  XLogWalRcvSendReply(true, false);
515  }
516  }
517  if (rc & WL_POSTMASTER_DEATH)
518  {
519  /*
520  * Emergency bailout if postmaster has died. This is to
521  * avoid the necessity for manual cleanup of all
522  * postmaster children.
523  */
524  exit(1);
525  }
526  if (rc & WL_TIMEOUT)
527  {
528  /*
529  * We didn't receive anything new. If we haven't heard
530  * anything from the server for more than
531  * wal_receiver_timeout / 2, ping the server. Also, if
532  * it's been longer than wal_receiver_status_interval
533  * since the last update we sent, send a status update to
534  * the master anyway, to report any progress in applying
535  * WAL.
536  */
537  bool requestReply = false;
538 
539  /*
540  * Check if time since last receive from primary has
541  * reached the configured limit.
542  */
543  if (wal_receiver_timeout > 0)
544  {
546  TimestampTz timeout;
547 
548  timeout =
549  TimestampTzPlusMilliseconds(last_recv_timestamp,
551 
552  if (now >= timeout)
553  ereport(ERROR,
554  (errmsg("terminating walreceiver due to timeout")));
555 
556  /*
557  * We didn't receive anything new for half of the
558  * receiver replication timeout. Ping the server.
559  */
560  if (!ping_sent)
561  {
562  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
563  (wal_receiver_timeout / 2));
564  if (now >= timeout)
565  {
566  requestReply = true;
567  ping_sent = true;
568  }
569  }
570  }
571 
572  XLogWalRcvSendReply(requestReply, requestReply);
574  }
575  }
576 
577  /*
578  * The backend finished streaming. Exit streaming COPY-mode from
579  * our side, too.
580  */
582  walrcv_endstreaming(wrconn, &primaryTLI);
584 
585  /*
586  * If the server had switched to a new timeline that we didn't
587  * know about when we began streaming, fetch its timeline history
588  * file now.
589  */
590  WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
591  }
592  else
593  ereport(LOG,
594  (errmsg("primary server contains no more WAL on requested timeline %u",
595  startpointTLI)));
596 
597  /*
598  * End of WAL reached on the requested timeline. Close the last
599  * segment, and wait for new orders from the startup process.
600  */
601  if (recvFile >= 0)
602  {
603  char xlogfname[MAXFNAMELEN];
604 
605  XLogWalRcvFlush(false);
606  if (close(recvFile) != 0)
607  ereport(PANIC,
609  errmsg("could not close log segment %s: %m",
611 
612  /*
613  * Create .done file forcibly to prevent the streamed segment from
614  * being archived later.
615  */
616  XLogFileName(xlogfname, recvFileTLI, recvSegNo);
618  XLogArchiveForceDone(xlogfname);
619  else
620  XLogArchiveNotify(xlogfname);
621  }
622  recvFile = -1;
623 
624  elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
625  WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
626  }
627  /* not reached */
628 }
#define SIGUSR1
Definition: win32.h:211
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:719
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:213
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:221
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:38
#define SIGCONT
Definition: win32.h:205
uint32 TimeLineID
Definition: xlogdefs.h:45
slock_t mutex
Definition: walreceiver.h:114
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:634
WalRcvState walRcvState
Definition: walreceiver.h:60
#define SIGWINCH
Definition: win32.h:209
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
static StringInfoData incoming_message
Definition: walreceiver.c:111
static void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:153
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1174
union WalRcvStreamOptions::@53 proto
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:223
#define XLogFileName(fname, tli, logSegNo)
#define SIGTTIN
Definition: win32.h:207
static void WalRcvSigHupHandler(SIGNAL_ARGS)
Definition: walreceiver.c:799
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:219
void proc_exit(int code)
Definition: ipc.c:99
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
Definition: walreceiver.c:870
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7805
#define PANIC
Definition: elog.h:53
#define NAPTIME_PER_CYCLE
Definition: walreceiver.c:81
#define SIGQUIT
Definition: win32.h:197
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:94
TimestampTz lastMsgSendTime
Definition: walreceiver.h:93
#define PG_SETMASK(mask)
Definition: pqsignal.h:19
int wal_receiver_timeout
Definition: walreceiver.c:74
Latch procLatch
Definition: proc.h:93
static WalReceiverConn * wrconn
Definition: walreceiver.c:78
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
static StringInfoData reply_message
Definition: walreceiver.c:110
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:512
void pfree(void *pointer)
Definition: mcxt.c:992
#define SIG_IGN
Definition: win32.h:193
#define ERROR
Definition: elog.h:43
static void WalRcvSigUsr1Handler(SIGNAL_ARGS)
Definition: walreceiver.c:807
#define FATAL
Definition: elog.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:10984
pid_t pid
Definition: walreceiver.h:59
#define MAXCONNINFO
Definition: walreceiver.h:32
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:215
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10040
static char * buf
Definition: pg_test_fsync.c:65
int XLogArchiveMode
Definition: xlog.c:93
int errdetail(const char *fmt,...)
Definition: elog.c:873
int errcode_for_file_access(void)
Definition: elog.c:598
static void DisableWalRcvImmediateExit(void)
Definition: walreceiver.c:179
XLogRecPtr startpoint
Definition: walreceiver.h:144
unsigned int uint32
Definition: c.h:265
struct WalRcvStreamOptions::@53::@54 physical
int pgsocket
Definition: port.h:22
sigset_t UnBlockSig
Definition: pqsignal.c:22
static volatile sig_atomic_t got_SIGHUP
Definition: walreceiver.c:97
#define ereport(elevel, rest)
Definition: elog.h:122
TimeLineID receiveStartTLI
Definition: walreceiver.h:70
static TimeLineID recvFileTLI
Definition: walreceiver.c:89
Definition: guc.h:72
Latch * latch
Definition: walreceiver.h:132
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
static XLogSegNo recvSegNo
Definition: walreceiver.c:90
bool force_reply
Definition: walreceiver.h:120
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
sigset_t BlockSig
Definition: pqsignal.c:22
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static void EnableWalRcvImmediateExit(void)
Definition: walreceiver.c:172
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:562
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:79
#define PGINVALID_SOCKET
Definition: port.h:24
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1105
bool ready_to_display
Definition: walreceiver.h:123
TimestampTz latestWalEndTime
Definition: walreceiver.h:100
static void WalRcvShutdownHandler(SIGNAL_ARGS)
Definition: walreceiver.c:818
#define SIGPIPE
Definition: win32.h:201
#define SIGHUP
Definition: win32.h:196
TimeLineID ThisTimeLineID
Definition: xlog.c:178
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define SIG_DFL
Definition: win32.h:191
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define pg_memory_barrier()
Definition: atomics.h:147
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:320
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:133
static int server_version
Definition: pg_dumpall.c:77
static void WalRcvQuickDieHandler(SIGNAL_ARGS)
Definition: walreceiver.c:841
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1046
WalRcvData * WalRcv
#define SIGTTOU
Definition: win32.h:208
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4661
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:768
#define elog
Definition: elog.h:219
#define close(a)
Definition: win32.h:17
XLogRecPtr receiveStart
Definition: walreceiver.h:69
static int recvFile
Definition: walreceiver.c:88
#define SIGCHLD
Definition: win32.h:206
#define WL_LATCH_SET
Definition: latch.h:124
#define SIGALRM
Definition: win32.h:202
#define UINT64_FORMAT
Definition: c.h:313
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
char slotname[NAMEDATALEN]
Definition: walreceiver.h:112
#define SIGUSR2
Definition: win32.h:212
static struct @24 LogstreamResult
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:106
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:209
static void XLogWalRcvFlush ( bool  dying)
static

Definition at line 1046 of file walreceiver.c.

References AllowCascadeReplication, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, recvFile, recvSegNo, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

1047 {
1048  if (LogstreamResult.Flush < LogstreamResult.Write)
1049  {
1050  WalRcvData *walrcv = WalRcv;
1051 
1053 
1054  LogstreamResult.Flush = LogstreamResult.Write;
1055 
1056  /* Update shared-memory status */
1057  SpinLockAcquire(&walrcv->mutex);
1058  if (walrcv->receivedUpto < LogstreamResult.Flush)
1059  {
1060  walrcv->latestChunkStart = walrcv->receivedUpto;
1061  walrcv->receivedUpto = LogstreamResult.Flush;
1062  walrcv->receivedTLI = ThisTimeLineID;
1063  }
1064  SpinLockRelease(&walrcv->mutex);
1065 
1066  /* Signal the startup process and walsender that new WAL has arrived */
1067  WakeupRecovery();
1069  WalSndWakeup();
1070 
1071  /* Report XLOG streaming progress in PS display */
1073  {
1074  char activitymsg[50];
1075 
1076  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1077  (uint32) (LogstreamResult.Write >> 32),
1078  (uint32) LogstreamResult.Write);
1079  set_ps_display(activitymsg, false);
1080  }
1081 
1082  /* Also let the master know that we made some progress */
1083  if (!dying)
1084  {
1085  XLogWalRcvSendReply(false, false);
1086  XLogWalRcvSendHSFeedback(false);
1087  }
1088  }
1089 }
slock_t mutex
Definition: walreceiver.h:114
bool update_process_title
Definition: ps_status.c:35
void issue_xlog_fsync(int fd, XLogSegNo segno)
Definition: xlog.c:9997
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1174
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
TimeLineID receivedTLI
Definition: walreceiver.h:80
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define SpinLockAcquire(lock)
Definition: spin.h:62
void WakeupRecovery(void)
Definition: xlog.c:11975
XLogRecPtr latestChunkStart
Definition: walreceiver.h:88
#define AllowCascadeReplication()
Definition: walreceiver.h:35
unsigned int uint32
Definition: c.h:265
XLogRecPtr receivedUpto
Definition: walreceiver.h:79
static XLogSegNo recvSegNo
Definition: walreceiver.c:90
#define SpinLockRelease(lock)
Definition: spin.h:64
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1105
TimeLineID ThisTimeLineID
Definition: xlog.c:178
WalRcvData * WalRcv
static int recvFile
Definition: walreceiver.c:88
void WalSndWakeup(void)
Definition: walsender.c:2631
static struct @24 LogstreamResult
static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len 
)
static

Definition at line 870 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

871 {
872  int hdrlen;
873  XLogRecPtr dataStart;
874  XLogRecPtr walEnd;
875  TimestampTz sendTime;
876  bool replyRequested;
877 
879 
880  switch (type)
881  {
882  case 'w': /* WAL records */
883  {
884  /* copy message to StringInfo */
885  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
886  if (len < hdrlen)
887  ereport(ERROR,
888  (errcode(ERRCODE_PROTOCOL_VIOLATION),
889  errmsg_internal("invalid WAL message received from primary")));
891 
892  /* read the fields */
893  dataStart = pq_getmsgint64(&incoming_message);
894  walEnd = pq_getmsgint64(&incoming_message);
895  sendTime = pq_getmsgint64(&incoming_message);
896  ProcessWalSndrMessage(walEnd, sendTime);
897 
898  buf += hdrlen;
899  len -= hdrlen;
900  XLogWalRcvWrite(buf, len, dataStart);
901  break;
902  }
903  case 'k': /* Keepalive */
904  {
905  /* copy message to StringInfo */
906  hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
907  if (len != hdrlen)
908  ereport(ERROR,
909  (errcode(ERRCODE_PROTOCOL_VIOLATION),
910  errmsg_internal("invalid keepalive message received from primary")));
912 
913  /* read the fields */
914  walEnd = pq_getmsgint64(&incoming_message);
915  sendTime = pq_getmsgint64(&incoming_message);
916  replyRequested = pq_getmsgbyte(&incoming_message);
917 
918  ProcessWalSndrMessage(walEnd, sendTime);
919 
920  /* If the primary requested a reply, send one immediately */
921  if (replyRequested)
922  XLogWalRcvSendReply(true, false);
923  break;
924  }
925  default:
926  ereport(ERROR,
927  (errcode(ERRCODE_PROTOCOL_VIOLATION),
928  errmsg_internal("invalid replication message type %d",
929  type)));
930  }
931 }
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1259
int64 TimestampTz
Definition: timestamp.h:39
static StringInfoData incoming_message
Definition: walreceiver.c:111
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ERROR
Definition: elog.h:43
static char * buf
Definition: pg_test_fsync.c:65
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
#define ereport(elevel, rest)
Definition: elog.h:122
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1105
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:240
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
Definition: walreceiver.c:937
static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1174 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetNextXidAndEpoch(), GetOldestXmin(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), NULL, pq_sendbyte(), pq_sendint(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

1175 {
1176  TimestampTz now;
1177  TransactionId nextXid;
1178  uint32 nextEpoch;
1179  TransactionId xmin;
1180  static TimestampTz sendTime = 0;
1181  /* initially true so we always send at least one feedback message */
1182  static bool master_has_standby_xmin = true;
1183 
1184  /*
1185  * If the user doesn't want status to be reported to the master, be sure
1186  * to exit before doing anything at all.
1187  */
1189  !master_has_standby_xmin)
1190  return;
1191 
1192  /* Get current timestamp. */
1193  now = GetCurrentTimestamp();
1194 
1195  if (!immed)
1196  {
1197  /*
1198  * Send feedback at most once per wal_receiver_status_interval.
1199  */
1200  if (!TimestampDifferenceExceeds(sendTime, now,
1202  return;
1203  sendTime = now;
1204  }
1205 
1206  /*
1207  * If Hot Standby is not yet accepting connections there is nothing to
1208  * send. Check this after the interval has expired to reduce number of
1209  * calls.
1210  *
1211  * Bailing out here also ensures that we don't send feedback until we've
1212  * read our own replication slot state, so we don't tell the master to
1213  * discard needed xmin or catalog_xmin from any slots that may exist
1214  * on this replica.
1215  */
1216  if (!HotStandbyActive())
1217  return;
1218 
1219  /*
1220  * Make the expensive call to get the oldest xmin once we are certain
1221  * everything else has been checked.
1222  */
1224  xmin = GetOldestXmin(NULL, false);
1225  else
1226  xmin = InvalidTransactionId;
1227 
1228  /*
1229  * Get epoch and adjust if nextXid and oldestXmin are on different sides of
1230  * the epoch boundary.
1231  */
1232  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1233  if (nextXid < xmin)
1234  nextEpoch--;
1235 
1236  elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
1237  xmin, nextEpoch);
1238 
1239  /* Construct the message and send it. */
1241  pq_sendbyte(&reply_message, 'h');
1243  pq_sendint(&reply_message, xmin, 4);
1244  pq_sendint(&reply_message, nextEpoch, 4);
1246  if (TransactionIdIsValid(xmin))
1247  master_has_standby_xmin = true;
1248  else
1249  master_has_standby_xmin = false;
1250 }
bool hot_standby_feedback
Definition: walreceiver.c:75
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
uint32 TransactionId
Definition: c.h:394
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
bool HotStandbyActive(void)
Definition: xlog.c:7861
int wal_receiver_status_interval
Definition: walreceiver.c:73
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1648
static WalReceiverConn * wrconn
Definition: walreceiver.c:78
static StringInfoData reply_message
Definition: walreceiver.c:110
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8223
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
unsigned int uint32
Definition: c.h:265
TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum)
Definition: procarray.c:1305
#define NULL
Definition: c.h:226
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:225
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1105 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, now(), NULL, pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

1106 {
1107  static XLogRecPtr writePtr = 0;
1108  static XLogRecPtr flushPtr = 0;
1109  XLogRecPtr applyPtr;
1110  static TimestampTz sendTime = 0;
1111  TimestampTz now;
1112 
1113  /*
1114  * If the user doesn't want status to be reported to the master, be sure
1115  * to exit before doing anything at all.
1116  */
1117  if (!force && wal_receiver_status_interval <= 0)
1118  return;
1119 
1120  /* Get current timestamp. */
1121  now = GetCurrentTimestamp();
1122 
1123  /*
1124  * We can compare the write and flush positions to the last message we
1125  * sent without taking any lock, but the apply position requires a spin
1126  * lock, so we don't check that unless something else has changed or 10
1127  * seconds have passed. This means that the apply log position will
1128  * appear, from the master's point of view, to lag slightly, but since
1129  * this is only for reporting purposes and only on idle systems, that's
1130  * probably OK.
1131  */
1132  if (!force
1133  && writePtr == LogstreamResult.Write
1134  && flushPtr == LogstreamResult.Flush
1135  && !TimestampDifferenceExceeds(sendTime, now,
1136  wal_receiver_status_interval * 1000))
1137  return;
1138  sendTime = now;
1139 
1140  /* Construct a new message */
1141  writePtr = LogstreamResult.Write;
1142  flushPtr = LogstreamResult.Flush;
1143  applyPtr = GetXLogReplayRecPtr(NULL);
1144 
1145  resetStringInfo(&reply_message);
1146  pq_sendbyte(&reply_message, 'r');
1147  pq_sendint64(&reply_message, writePtr);
1148  pq_sendint64(&reply_message, flushPtr);
1149  pq_sendint64(&reply_message, applyPtr);
1150  pq_sendint64(&reply_message, GetCurrentTimestamp());
1151  pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1152 
1153  /* Send it */
1154  elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1155  (uint32) (writePtr >> 32), (uint32) writePtr,
1156  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1157  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1158  requestReply ? " (reply requested)" : "");
1159 
1160  walrcv_send(wrconn, reply_message.data, reply_message.len);
1161 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
int64 TimestampTz
Definition: timestamp.h:39
int wal_receiver_status_interval
Definition: walreceiver.c:73
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1648
static WalReceiverConn * wrconn
Definition: walreceiver.c:78
static StringInfoData reply_message
Definition: walreceiver.c:110
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:10984
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
unsigned int uint32
Definition: c.h:265
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:225
#define elog
Definition: elog.h:219
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
static struct @24 LogstreamResult
static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr 
)
static

Definition at line 937 of file walreceiver.c.

References ARCHIVE_MODE_ALWAYS, close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvOff, recvSegNo, ThisTimeLineID, write, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileInit(), XLogFileName, XLogFileNameP(), XLogSegSize, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

938 {
939  int startoff;
940  int byteswritten;
941 
942  while (nbytes > 0)
943  {
944  int segbytes;
945 
946  if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
947  {
948  bool use_existent;
949 
950  /*
951  * fsync() and close current file before we switch to next one. We
952  * would otherwise have to reopen this file to fsync it later
953  */
954  if (recvFile >= 0)
955  {
956  char xlogfname[MAXFNAMELEN];
957 
958  XLogWalRcvFlush(false);
959 
960  /*
961  * XLOG segment files will be re-read by recovery in startup
962  * process soon, so we don't advise the OS to release cache
963  * pages associated with the file like XLogFileClose() does.
964  */
965  if (close(recvFile) != 0)
966  ereport(PANIC,
967  (errcode_for_file_access(),
968  errmsg("could not close log segment %s: %m",
969  XLogFileNameP(recvFileTLI, recvSegNo))));
970 
971  /*
972  * Create .done file forcibly to prevent the streamed segment
973  * from being archived later.
974  */
975  XLogFileName(xlogfname, recvFileTLI, recvSegNo);
976  if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
977  XLogArchiveForceDone(xlogfname);
978  else
979  XLogArchiveNotify(xlogfname);
980  }
981  recvFile = -1;
982 
983  /* Create/use new log file */
984  XLByteToSeg(recptr, recvSegNo);
985  use_existent = true;
986  recvFile = XLogFileInit(recvSegNo, &use_existent, true);
987  recvFileTLI = ThisTimeLineID;
988  recvOff = 0;
989  }
990 
991  /* Calculate the start offset of the received logs */
992  startoff = recptr % XLogSegSize;
993 
994  if (startoff + nbytes > XLogSegSize)
995  segbytes = XLogSegSize - startoff;
996  else
997  segbytes = nbytes;
998 
999  /* Need to seek in the file? */
1000  if (recvOff != startoff)
1001  {
1002  if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
1003  ereport(PANIC,
1004  (errcode_for_file_access(),
1005  errmsg("could not seek in log segment %s to offset %u: %m",
1006  XLogFileNameP(recvFileTLI, recvSegNo),
1007  startoff)));
1008  recvOff = startoff;
1009  }
1010 
1011  /* OK to write the logs */
1012  errno = 0;
1013 
1014  byteswritten = write(recvFile, buf, segbytes);
1015  if (byteswritten <= 0)
1016  {
1017  /* if write didn't set errno, assume no disk space */
1018  if (errno == 0)
1019  errno = ENOSPC;
1020  ereport(PANIC,
1021  (errcode_for_file_access(),
1022  errmsg("could not write to log segment %s "
1023  "at offset %u, length %lu: %m",
1024  XLogFileNameP(recvFileTLI, recvSegNo),
1025  recvOff, (unsigned long) segbytes)));
1026  }
1027 
1028  /* Update state for write */
1029  recptr += byteswritten;
1030 
1031  recvOff += byteswritten;
1032  nbytes -= byteswritten;
1033  buf += byteswritten;
1034 
1035  LogstreamResult.Write = recptr;
1036  }
1037 }
#define XLogSegSize
Definition: xlog_internal.h:92
#define write(a, b, c)
Definition: win32.h:19
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
Definition: xlog.c:3141
#define XLogFileName(fname, tli, logSegNo)
#define PANIC
Definition: elog.h:53
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:512
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10040
static char * buf
Definition: pg_test_fsync.c:65
int XLogArchiveMode
Definition: xlog.c:93
int errcode_for_file_access(void)
Definition: elog.c:598
#define ereport(elevel, rest)
Definition: elog.h:122
static TimeLineID recvFileTLI
Definition: walreceiver.c:89
static XLogSegNo recvSegNo
Definition: walreceiver.c:90
#define MAXFNAMELEN
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:562
TimeLineID ThisTimeLineID
Definition: xlog.c:178
#define XLByteToSeg(xlrp, logSegNo)
static void XLogWalRcvFlush(bool dying)
Definition: walreceiver.c:1046
int errmsg(const char *fmt,...)
Definition: elog.c:797
static uint32 recvOff
Definition: walreceiver.c:91
#define close(a)
Definition: win32.h:17
static int recvFile
Definition: walreceiver.c:88
#define XLByteInSeg(xlrp, logSegNo)
static struct @24 LogstreamResult

Variable Documentation

XLogRecPtr Flush

Definition at line 107 of file walreceiver.c.

volatile sig_atomic_t got_SIGHUP = false
static

Definition at line 97 of file walreceiver.c.

Referenced by WalRcvSigHupHandler(), and WalReceiverMain().

volatile sig_atomic_t got_SIGTERM = false
static

Definition at line 98 of file walreceiver.c.

Referenced by ProcessWalRcvInterrupts(), and WalRcvShutdownHandler().

bool hot_standby_feedback

Definition at line 75 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

StringInfoData incoming_message
static

Definition at line 111 of file walreceiver.c.

struct { ... } LogstreamResult
int recvFile = -1
static

Definition at line 88 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

TimeLineID recvFileTLI = 0
static

Definition at line 89 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

uint32 recvOff = 0
static

Definition at line 91 of file walreceiver.c.

Referenced by XLogWalRcvWrite().

XLogSegNo recvSegNo = 0
static

Definition at line 90 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

StringInfoData reply_message
static

Definition at line 110 of file walreceiver.c.

Referenced by send_feedback().

int wal_receiver_status_interval

Definition at line 73 of file walreceiver.c.

Referenced by send_feedback(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

int wal_receiver_timeout

Definition at line 74 of file walreceiver.c.

Referenced by ApplyLoop(), and WalReceiverMain().

volatile bool WalRcvImmediateInterruptOK = false
static
WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 79 of file walreceiver.c.

Referenced by _PG_init().

WalReceiverConn* wrconn = NULL
static

Definition at line 78 of file walreceiver.c.

XLogRecPtr Write

Definition at line 106 of file walreceiver.c.