PostgreSQL Source Code git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_authid.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)
 

Typedefs

typedef enum WalRcvWakeupReason WalRcvWakeupReason
 

Enumerations

enum  WalRcvWakeupReason { WALRCV_WAKEUP_TERMINATE , WALRCV_WAKEUP_PING , WALRCV_WAKEUP_REPLY , WALRCV_WAKEUP_HSFEEDBACK }
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason, TimestampTz now)
 
void ProcessWalRcvInterrupts (void)
 
void WalReceiverMain (char *startup_data, size_t startup_data_len)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
} LogstreamResult
 
static TimestampTz wakeup [NUM_WALRCV_WAKEUPS]
 
static StringInfoData reply_message
 

Macro Definition Documentation

◆ NUM_WALRCV_WAKEUPS

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)

Definition at line 123 of file walreceiver.c.

Typedef Documentation

◆ WalRcvWakeupReason

typedef enum WalRcvWakeupReason WalRcvWakeupReason

Enumeration Type Documentation

◆ WalRcvWakeupReason

Enumerator
WALRCV_WAKEUP_TERMINATE 
WALRCV_WAKEUP_PING 
WALRCV_WAKEUP_REPLY 
WALRCV_WAKEUP_HSFEEDBACK 

Definition at line 117 of file walreceiver.c.

118{
123#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
WalRcvWakeupReason
Definition: walreceiver.c:118
@ WALRCV_WAKEUP_TERMINATE
Definition: walreceiver.c:119
@ WALRCV_WAKEUP_REPLY
Definition: walreceiver.c:121
@ WALRCV_WAKEUP_PING
Definition: walreceiver.c:120
@ WALRCV_WAKEUP_HSFEEDBACK
Definition: walreceiver.c:122

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1402 of file walreceiver.c.

1403{
1404 TupleDesc tupdesc;
1405 Datum *values;
1406 bool *nulls;
1407 int pid;
1408 bool ready_to_display;
1410 XLogRecPtr receive_start_lsn;
1411 TimeLineID receive_start_tli;
1412 XLogRecPtr written_lsn;
1413 XLogRecPtr flushed_lsn;
1414 TimeLineID received_tli;
1415 TimestampTz last_send_time;
1416 TimestampTz last_receipt_time;
1417 XLogRecPtr latest_end_lsn;
1418 TimestampTz latest_end_time;
1419 char sender_host[NI_MAXHOST];
1420 int sender_port = 0;
1421 char slotname[NAMEDATALEN];
1422 char conninfo[MAXCONNINFO];
1423
1424 /* Take a lock to ensure value consistency */
1426 pid = (int) WalRcv->pid;
1427 ready_to_display = WalRcv->ready_to_display;
1429 receive_start_lsn = WalRcv->receiveStart;
1430 receive_start_tli = WalRcv->receiveStartTLI;
1431 flushed_lsn = WalRcv->flushedUpto;
1432 received_tli = WalRcv->receivedTLI;
1433 last_send_time = WalRcv->lastMsgSendTime;
1434 last_receipt_time = WalRcv->lastMsgReceiptTime;
1435 latest_end_lsn = WalRcv->latestWalEnd;
1436 latest_end_time = WalRcv->latestWalEndTime;
1437 strlcpy(slotname, WalRcv->slotname, sizeof(slotname));
1438 strlcpy(sender_host, WalRcv->sender_host, sizeof(sender_host));
1439 sender_port = WalRcv->sender_port;
1440 strlcpy(conninfo, WalRcv->conninfo, sizeof(conninfo));
1442
1443 /*
1444 * No WAL receiver (or not ready yet), just return a tuple with NULL
1445 * values
1446 */
1447 if (pid == 0 || !ready_to_display)
1449
1450 /*
1451 * Read "writtenUpto" without holding a spinlock. Note that it may not be
1452 * consistent with the other shared variables of the WAL receiver
1453 * protected by a spinlock, but this should not be used for data integrity
1454 * checks.
1455 */
1456 written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1457
1458 /* determine result type */
1459 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1460 elog(ERROR, "return type must be a row type");
1461
1462 values = palloc0(sizeof(Datum) * tupdesc->natts);
1463 nulls = palloc0(sizeof(bool) * tupdesc->natts);
1464
1465 /* Fetch values */
1466 values[0] = Int32GetDatum(pid);
1467
1468 if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1469 {
1470 /*
1471 * Only superusers and roles with privileges of pg_read_all_stats can
1472 * see details. Other users only get the pid value to know whether it
1473 * is a WAL receiver, but no details.
1474 */
1475 memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1476 }
1477 else
1478 {
1480
1481 if (XLogRecPtrIsInvalid(receive_start_lsn))
1482 nulls[2] = true;
1483 else
1484 values[2] = LSNGetDatum(receive_start_lsn);
1485 values[3] = Int32GetDatum(receive_start_tli);
1486 if (XLogRecPtrIsInvalid(written_lsn))
1487 nulls[4] = true;
1488 else
1489 values[4] = LSNGetDatum(written_lsn);
1490 if (XLogRecPtrIsInvalid(flushed_lsn))
1491 nulls[5] = true;
1492 else
1493 values[5] = LSNGetDatum(flushed_lsn);
1494 values[6] = Int32GetDatum(received_tli);
1495 if (last_send_time == 0)
1496 nulls[7] = true;
1497 else
1498 values[7] = TimestampTzGetDatum(last_send_time);
1499 if (last_receipt_time == 0)
1500 nulls[8] = true;
1501 else
1502 values[8] = TimestampTzGetDatum(last_receipt_time);
1503 if (XLogRecPtrIsInvalid(latest_end_lsn))
1504 nulls[9] = true;
1505 else
1506 values[9] = LSNGetDatum(latest_end_lsn);
1507 if (latest_end_time == 0)
1508 nulls[10] = true;
1509 else
1510 values[10] = TimestampTzGetDatum(latest_end_time);
1511 if (*slotname == '\0')
1512 nulls[11] = true;
1513 else
1514 values[11] = CStringGetTextDatum(slotname);
1515 if (*sender_host == '\0')
1516 nulls[12] = true;
1517 else
1518 values[12] = CStringGetTextDatum(sender_host);
1519 if (sender_port == 0)
1520 nulls[13] = true;
1521 else
1522 values[13] = Int32GetDatum(sender_port);
1523 if (*conninfo == '\0')
1524 nulls[14] = true;
1525 else
1526 values[14] = CStringGetTextDatum(conninfo);
1527 }
1528
1529 /* Returns the record as Datum */
1531}
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5268
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:467
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define CStringGetTextDatum(s)
Definition: builtins.h:97
int64 TimestampTz
Definition: timestamp.h:39
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void * palloc0(Size size)
Definition: mcxt.c:1347
Oid GetUserId(void)
Definition: miscinit.c:517
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uintptr_t Datum
Definition: postgres.h:69
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:217
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:111
XLogRecPtr latestWalEnd
Definition: walreceiver.h:116
TimeLineID receiveStartTLI
Definition: walreceiver.h:87
TimeLineID receivedTLI
Definition: walreceiver.h:97
char slotname[NAMEDATALEN]
Definition: walreceiver.h:136
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:129
pid_t pid
Definition: walreceiver.h:68
XLogRecPtr receiveStart
Definition: walreceiver.h:86
XLogRecPtr flushedUpto
Definition: walreceiver.h:96
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:155
TimestampTz lastMsgSendTime
Definition: walreceiver.h:110
WalRcvState walRcvState
Definition: walreceiver.h:71
TimestampTz latestWalEndTime
Definition: walreceiver.h:117
bool ready_to_display
Definition: walreceiver.h:145
int sender_port
Definition: walreceiver.h:130
slock_t mutex
Definition: walreceiver.h:147
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:123
Definition: regguts.h:323
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1377
#define MAXCONNINFO
Definition: walreceiver.h:37
WalRcvState
Definition: walreceiver.h:46
WalRcvData * WalRcv
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum(), Int32GetDatum(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum(), MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum(), TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

◆ ProcessWalRcvInterrupts()

void ProcessWalRcvInterrupts ( void  )

Definition at line 162 of file walreceiver.c.

163{
164 /*
165 * Although walreceiver interrupt handling doesn't use the same scheme as
166 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
167 * any incoming signals on Win32, and also to make sure we process any
168 * barrier events.
169 */
171
173 {
175 (errcode(ERRCODE_ADMIN_SHUTDOWN),
176 errmsg("terminating walreceiver process due to administrator command")));
177 }
178}
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define FATAL
Definition: elog.h:41
#define ereport(elevel,...)
Definition: elog.h:149
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, and ShutdownRequestPending.

Referenced by libpqrcv_connect(), libpqrcv_PQgetResult(), libpqrcv_processTuples(), WalRcvWaitForStartPosition(), and WalReceiverMain().

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1266 of file walreceiver.c.

1267{
1268 WalRcvData *walrcv = WalRcv;
1269 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1270
1271 /* Update shared-memory status */
1272 SpinLockAcquire(&walrcv->mutex);
1273 if (walrcv->latestWalEnd < walEnd)
1274 walrcv->latestWalEndTime = sendTime;
1275 walrcv->latestWalEnd = walEnd;
1276 walrcv->lastMsgSendTime = sendTime;
1277 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1278 SpinLockRelease(&walrcv->mutex);
1279
1281 {
1282 char *sendtime;
1283 char *receipttime;
1284 int applyDelay;
1285
1286 /* Copy because timestamptz_to_str returns a static buffer */
1287 sendtime = pstrdup(timestamptz_to_str(sendTime));
1288 receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1289 applyDelay = GetReplicationApplyDelay();
1290
1291 /* apply delay is not available */
1292 if (applyDelay == -1)
1293 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1294 sendtime,
1295 receipttime,
1297 else
1298 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1299 sendtime,
1300 receipttime,
1301 applyDelay,
1303
1304 pfree(sendtime);
1305 pfree(receipttime);
1306 }
1307}
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1843
bool message_level_is_interesting(int elevel)
Definition: elog.c:272
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void pfree(void *pointer)
Definition: mcxt.c:1521
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, message_level_is_interesting(), WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvComputeNextWakeup()

static void WalRcvComputeNextWakeup ( WalRcvWakeupReason  reason,
TimestampTz  now 
)
static

Definition at line 1318 of file walreceiver.c.

1319{
1320 switch (reason)
1321 {
1323 if (wal_receiver_timeout <= 0)
1324 wakeup[reason] = TIMESTAMP_INFINITY;
1325 else
1327 break;
1328 case WALRCV_WAKEUP_PING:
1329 if (wal_receiver_timeout <= 0)
1330 wakeup[reason] = TIMESTAMP_INFINITY;
1331 else
1333 break;
1336 wakeup[reason] = TIMESTAMP_INFINITY;
1337 else
1339 break;
1342 wakeup[reason] = TIMESTAMP_INFINITY;
1343 else
1345 break;
1346 /* there's intentionally no default: here */
1347 }
1348}
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
#define TIMESTAMP_INFINITY
Definition: timestamp.h:151
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define TimestampTzPlusSeconds(tz, s)
Definition: timestamp.h:86
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
Definition: walreceiver.c:129
bool hot_standby_feedback
Definition: walreceiver.c:89
int wal_receiver_status_interval
Definition: walreceiver.c:87
int wal_receiver_timeout
Definition: walreceiver.c:88

References hot_standby_feedback, now(), TIMESTAMP_INFINITY, TimestampTzPlusMilliseconds, TimestampTzPlusSeconds, wakeup, wal_receiver_status_interval, wal_receiver_timeout, WALRCV_WAKEUP_HSFEEDBACK, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, and WALRCV_WAKEUP_TERMINATE.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 802 of file walreceiver.c.

803{
804 WalRcvData *walrcv = WalRcv;
805 TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
806
807 Assert(*startpointTLI_p != 0);
808
809 /* Ensure that all WAL records received are flushed to disk */
810 XLogWalRcvFlush(true, *startpointTLI_p);
811
812 /* Mark ourselves inactive in shared memory */
813 SpinLockAcquire(&walrcv->mutex);
815 walrcv->walRcvState == WALRCV_RESTARTING ||
816 walrcv->walRcvState == WALRCV_STARTING ||
817 walrcv->walRcvState == WALRCV_WAITING ||
818 walrcv->walRcvState == WALRCV_STOPPING);
819 Assert(walrcv->pid == MyProcPid);
820 walrcv->walRcvState = WALRCV_STOPPED;
821 walrcv->pid = 0;
822 walrcv->procno = INVALID_PROC_NUMBER;
823 walrcv->ready_to_display = false;
824 SpinLockRelease(&walrcv->mutex);
825
827
828 /* Terminate the connection gracefully. */
829 if (wrconn != NULL)
831
832 /* Wake up the startup process to notice promptly that we're gone */
834}
#define Assert(condition)
Definition: c.h:815
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition: globals.c:46
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:317
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
ProcNumber procno
Definition: walreceiver.h:67
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:72
static WalReceiverConn * wrconn
Definition: walreceiver.c:92
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
Definition: walreceiver.c:994
@ WALRCV_STARTING
Definition: walreceiver.h:48
@ WALRCV_STOPPED
Definition: walreceiver.h:47
@ WALRCV_RESTARTING
Definition: walreceiver.h:52
@ WALRCV_STREAMING
Definition: walreceiver.h:50
@ WALRCV_WAITING
Definition: walreceiver.h:51
@ WALRCV_STOPPING
Definition: walreceiver.h:53
#define walrcv_disconnect(conn)
Definition: walreceiver.h:467
void WakeupRecovery(void)

References arg, Assert, ConditionVariableBroadcast(), DatumGetPointer(), INVALID_PROC_NUMBER, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::procno, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 746 of file walreceiver.c.

747{
748 TimeLineID tli;
749
750 for (tli = first; tli <= last; tli++)
751 {
752 /* there's no history file for timeline 1 */
753 if (tli != 1 && !existsTimeLineHistory(tli))
754 {
755 char *fname;
756 char *content;
757 int len;
758 char expectedfname[MAXFNAMELEN];
759
760 ereport(LOG,
761 (errmsg("fetching timeline history file for timeline %u from primary server",
762 tli)));
763
764 walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
765
766 /*
767 * Check that the filename on the primary matches what we
768 * calculated ourselves. This is just a sanity check, it should
769 * always match.
770 */
771 TLHistoryFileName(expectedfname, tli);
772 if (strcmp(fname, expectedfname) != 0)
774 (errcode(ERRCODE_PROTOCOL_VIOLATION),
775 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
776 tli)));
777
778 /*
779 * Write the file to pg_wal.
780 */
781 writeTimeLineHistoryFile(tli, content, len);
782
783 /*
784 * Mark the streamed history file as ready for archiving if
785 * archive_mode is always.
786 */
789 else
790 XLogArchiveNotify(fname);
791
792 pfree(fname);
793 pfree(content);
794 }
795 }
796}
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:463
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
#define LOG
Definition: elog.h:31
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:449
int XLogArchiveMode
Definition: xlog.c:119
@ ARCHIVE_MODE_ALWAYS
Definition: xlog.h:67
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:510
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:444

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName(), walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1359 of file walreceiver.c.

1360{
1361 ProcNumber procno;
1362
1363 WalRcv->force_reply = true;
1364 /* fetching the proc number is probably atomic, but don't rely on it */
1366 procno = WalRcv->procno;
1368 if (procno != INVALID_PROC_NUMBER)
1369 SetLatch(&GetPGProcByNumber(procno)->procLatch);
1370}
void SetLatch(Latch *latch)
Definition: latch.c:632
#define GetPGProcByNumber(n)
Definition: proc.h:423
int ProcNumber
Definition: procnumber.h:24
sig_atomic_t force_reply
Definition: walreceiver.h:162

References WalRcvData::force_reply, GetPGProcByNumber, INVALID_PROC_NUMBER, WalRcvData::mutex, WalRcvData::procno, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvGetStateString()

static const char * WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1377 of file walreceiver.c.

1378{
1379 switch (state)
1380 {
1381 case WALRCV_STOPPED:
1382 return "stopped";
1383 case WALRCV_STARTING:
1384 return "starting";
1385 case WALRCV_STREAMING:
1386 return "streaming";
1387 case WALRCV_WAITING:
1388 return "waiting";
1389 case WALRCV_RESTARTING:
1390 return "restarting";
1391 case WALRCV_STOPPING:
1392 return "stopping";
1393 }
1394 return "UNKNOWN";
1395}

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr * startpoint,
TimeLineID * startpointTLI 
)
static

Definition at line 666 of file walreceiver.c.

667{
668 WalRcvData *walrcv = WalRcv;
669 int state;
670
671 SpinLockAcquire(&walrcv->mutex);
672 state = walrcv->walRcvState;
673 if (state != WALRCV_STREAMING)
674 {
675 SpinLockRelease(&walrcv->mutex);
676 if (state == WALRCV_STOPPING)
677 proc_exit(0);
678 else
679 elog(FATAL, "unexpected walreceiver state");
680 }
681 walrcv->walRcvState = WALRCV_WAITING;
683 walrcv->receiveStartTLI = 0;
684 SpinLockRelease(&walrcv->mutex);
685
686 set_ps_display("idle");
687
688 /*
689 * nudge startup process to notice that we've stopped streaming and are
690 * now waiting for instructions.
691 */
693 for (;;)
694 {
696
698
699 SpinLockAcquire(&walrcv->mutex);
701 walrcv->walRcvState == WALRCV_WAITING ||
702 walrcv->walRcvState == WALRCV_STOPPING);
703 if (walrcv->walRcvState == WALRCV_RESTARTING)
704 {
705 /*
706 * No need to handle changes in primary_conninfo or
707 * primary_slot_name here. Startup process will signal us to
708 * terminate in case those change.
709 */
710 *startpoint = walrcv->receiveStart;
711 *startpointTLI = walrcv->receiveStartTLI;
713 SpinLockRelease(&walrcv->mutex);
714 break;
715 }
716 if (walrcv->walRcvState == WALRCV_STOPPING)
717 {
718 /*
719 * We should've received SIGTERM if the startup process wants us
720 * to die, but might as well check it here too.
721 */
722 SpinLockRelease(&walrcv->mutex);
723 exit(1);
724 }
725 SpinLockRelease(&walrcv->mutex);
726
728 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
729 }
730
732 {
733 char activitymsg[50];
734
735 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
736 LSN_FORMAT_ARGS(*startpoint));
737 set_ps_display(activitymsg);
738 }
739}
struct Latch * MyLatch
Definition: globals.c:62
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
exit(1)
#define snprintf
Definition: port.h:239
bool update_process_title
Definition: ps_status.c:31
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
void ProcessWalRcvInterrupts(void)
Definition: walreceiver.c:162
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert, elog, exit(), FATAL, InvalidXLogRecPtr, LSN_FORMAT_ARGS, WalRcvData::mutex, MyLatch, proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( char *  startup_data,
size_t  startup_data_len 
)

Definition at line 183 of file walreceiver.c.

184{
185 char conninfo[MAXCONNINFO];
186 char *tmp_conninfo;
187 char slotname[NAMEDATALEN];
188 bool is_temp_slot;
189 XLogRecPtr startpoint;
190 TimeLineID startpointTLI;
191 TimeLineID primaryTLI;
192 bool first_stream;
193 WalRcvData *walrcv;
195 char *err;
196 char *sender_host = NULL;
197 int sender_port = 0;
198 char *appname;
199
200 Assert(startup_data_len == 0);
201
204
205 /*
206 * WalRcv should be set up already (if we are a backend, we inherit this
207 * by fork() or EXEC_BACKEND mechanism from the postmaster).
208 */
209 walrcv = WalRcv;
210 Assert(walrcv != NULL);
211
212 /*
213 * Mark walreceiver as running in shared memory.
214 *
215 * Do this as early as possible, so that if we fail later on, we'll set
216 * state to STOPPED. If we die before this, the startup process will keep
217 * waiting for us to start up, until it times out.
218 */
219 SpinLockAcquire(&walrcv->mutex);
220 Assert(walrcv->pid == 0);
221 switch (walrcv->walRcvState)
222 {
223 case WALRCV_STOPPING:
224 /* If we've already been requested to stop, don't start up. */
225 walrcv->walRcvState = WALRCV_STOPPED;
226 /* fall through */
227
228 case WALRCV_STOPPED:
229 SpinLockRelease(&walrcv->mutex);
231 proc_exit(1);
232 break;
233
234 case WALRCV_STARTING:
235 /* The usual case */
236 break;
237
238 case WALRCV_WAITING:
239 case WALRCV_STREAMING:
241 default:
242 /* Shouldn't happen */
243 SpinLockRelease(&walrcv->mutex);
244 elog(PANIC, "walreceiver still running according to shared memory state");
245 }
246 /* Advertise our PID so that the startup process can kill us */
247 walrcv->pid = MyProcPid;
249
250 /* Fetch information required to start streaming */
251 walrcv->ready_to_display = false;
252 strlcpy(conninfo, walrcv->conninfo, MAXCONNINFO);
253 strlcpy(slotname, walrcv->slotname, NAMEDATALEN);
254 is_temp_slot = walrcv->is_temp_slot;
255 startpoint = walrcv->receiveStart;
256 startpointTLI = walrcv->receiveStartTLI;
257
258 /*
259 * At most one of is_temp_slot and slotname can be set; otherwise,
260 * RequestXLogStreaming messed up.
261 */
262 Assert(!is_temp_slot || (slotname[0] == '\0'));
263
264 /* Initialize to a sane value */
266 walrcv->lastMsgSendTime =
267 walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
268
269 /* Report our proc number so that others can wake us up */
270 walrcv->procno = MyProcNumber;
271
272 SpinLockRelease(&walrcv->mutex);
273
275
276 /* Arrange to clean up at walreceiver exit */
277 on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
278
279 /* Properly accept or ignore signals the postmaster might send us */
280 pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
281 * file */
282 pqsignal(SIGINT, SIG_IGN);
283 pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
284 /* SIGQUIT handler was already set up by InitPostmasterChild */
285 pqsignal(SIGALRM, SIG_IGN);
286 pqsignal(SIGPIPE, SIG_IGN);
288 pqsignal(SIGUSR2, SIG_IGN);
289
290 /* Reset some signals that are accepted by postmaster but not here */
291 pqsignal(SIGCHLD, SIG_DFL);
292
293 /* Load the libpq-specific functions */
294 load_file("libpqwalreceiver", false);
295 if (WalReceiverFunctions == NULL)
296 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
297
298 /* Unblock signals (they were blocked when the postmaster forked us) */
299 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
300
301 /* Establish the connection to the primary for XLOG streaming */
302 appname = cluster_name[0] ? cluster_name : "walreceiver";
303 wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
304 if (!wrconn)
306 (errcode(ERRCODE_CONNECTION_FAILURE),
307 errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
308 appname, err)));
309
310 /*
311 * Save user-visible connection string. This clobbers the original
312 * conninfo, for security. Also save host and port of the sender server
313 * this walreceiver is connected to.
314 */
315 tmp_conninfo = walrcv_get_conninfo(wrconn);
316 walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
317 SpinLockAcquire(&walrcv->mutex);
318 memset(walrcv->conninfo, 0, MAXCONNINFO);
319 if (tmp_conninfo)
320 strlcpy(walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
321
322 memset(walrcv->sender_host, 0, NI_MAXHOST);
323 if (sender_host)
324 strlcpy(walrcv->sender_host, sender_host, NI_MAXHOST);
325
326 walrcv->sender_port = sender_port;
327 walrcv->ready_to_display = true;
328 SpinLockRelease(&walrcv->mutex);
329
330 if (tmp_conninfo)
331 pfree(tmp_conninfo);
332
333 if (sender_host)
334 pfree(sender_host);
335
336 first_stream = true;
337 for (;;)
338 {
339 char *primary_sysid;
340 char standby_sysid[32];
342
343 /*
344 * Check that we're connected to a valid server using the
345 * IDENTIFY_SYSTEM replication command.
346 */
347 primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
348
349 snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
351 if (strcmp(primary_sysid, standby_sysid) != 0)
352 {
354 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
355 errmsg("database system identifier differs between the primary and standby"),
356 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
357 primary_sysid, standby_sysid)));
358 }
359
360 /*
361 * Confirm that the current timeline of the primary is the same or
362 * ahead of ours.
363 */
364 if (primaryTLI < startpointTLI)
366 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
367 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
368 primaryTLI, startpointTLI)));
369
370 /*
371 * Get any missing history files. We do this always, even when we're
372 * not interested in that timeline, so that if we're promoted to
373 * become the primary later on, we don't select the same timeline that
374 * was already used in the current primary. This isn't bullet-proof -
375 * you'll need some external software to manage your cluster if you
376 * need to ensure that a unique timeline id is chosen in every case,
377 * but let's avoid the confusion of timeline id collisions where we
378 * can.
379 */
380 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
381
382 /*
383 * Create temporary replication slot if requested, and update slot
384 * name in shared memory. (Note the slot name cannot already be set
385 * in this case.)
386 */
387 if (is_temp_slot)
388 {
389 snprintf(slotname, sizeof(slotname),
390 "pg_walreceiver_%lld",
391 (long long int) walrcv_get_backend_pid(wrconn));
392
393 walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
394
395 SpinLockAcquire(&walrcv->mutex);
396 strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
397 SpinLockRelease(&walrcv->mutex);
398 }
399
400 /*
401 * Start streaming.
402 *
403 * We'll try to start at the requested starting point and timeline,
404 * even if it's different from the server's latest timeline. In case
405 * we've already reached the end of the old timeline, the server will
406 * finish the streaming immediately, and we will go back to await
407 * orders from the startup process. If recovery_target_timeline is
408 * 'latest', the startup process will scan pg_wal and find the new
409 * history file, bump recovery target timeline, and ask us to restart
410 * on the new timeline.
411 */
412 options.logical = false;
413 options.startpoint = startpoint;
414 options.slotname = slotname[0] != '\0' ? slotname : NULL;
415 options.proto.physical.startpointTLI = startpointTLI;
417 {
418 if (first_stream)
419 ereport(LOG,
420 (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
421 LSN_FORMAT_ARGS(startpoint), startpointTLI)));
422 else
423 ereport(LOG,
424 (errmsg("restarted WAL streaming at %X/%X on timeline %u",
425 LSN_FORMAT_ARGS(startpoint), startpointTLI)));
426 first_stream = false;
427
428 /* Initialize LogstreamResult and buffers for processing messages */
431
432 /* Initialize nap wakeup times. */
434 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
436
437 /* Send initial reply/feedback messages. */
438 XLogWalRcvSendReply(true, false);
440
441 /* Loop until end-of-streaming or error */
442 for (;;)
443 {
444 char *buf;
445 int len;
446 bool endofwal = false;
447 pgsocket wait_fd = PGINVALID_SOCKET;
448 int rc;
449 TimestampTz nextWakeup;
450 long nap;
451
452 /*
453 * Exit walreceiver if we're not in recovery. This should not
454 * happen, but cross-check the status here.
455 */
456 if (!RecoveryInProgress())
458 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
459 errmsg("cannot continue WAL streaming, recovery has already ended")));
460
461 /* Process any requests or signals received recently */
463
465 {
466 ConfigReloadPending = false;
468 /* recompute wakeup times */
470 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
473 }
474
475 /* See if we can read data immediately */
476 len = walrcv_receive(wrconn, &buf, &wait_fd);
477 if (len != 0)
478 {
479 /*
480 * Process the received data, and any subsequent data we
481 * can read without blocking.
482 */
483 for (;;)
484 {
485 if (len > 0)
486 {
487 /*
488 * Something was received from primary, so adjust
489 * the ping and terminate wakeup times.
490 */
493 now);
495 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
496 startpointTLI);
497 }
498 else if (len == 0)
499 break;
500 else if (len < 0)
501 {
502 ereport(LOG,
503 (errmsg("replication terminated by primary server"),
504 errdetail("End of WAL reached on timeline %u at %X/%X.",
505 startpointTLI,
507 endofwal = true;
508 break;
509 }
510 len = walrcv_receive(wrconn, &buf, &wait_fd);
511 }
512
513 /* Let the primary know that we received some data. */
514 XLogWalRcvSendReply(false, false);
515
516 /*
517 * If we've written some records, flush them to disk and
518 * let the startup process and primary server know about
519 * them.
520 */
521 XLogWalRcvFlush(false, startpointTLI);
522 }
523
524 /* Check if we need to exit the streaming loop. */
525 if (endofwal)
526 break;
527
528 /* Find the soonest wakeup time, to limit our nap. */
529 nextWakeup = TIMESTAMP_INFINITY;
530 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
531 nextWakeup = Min(wakeup[i], nextWakeup);
532
533 /* Calculate the nap time, clamping as necessary. */
535 nap = TimestampDifferenceMilliseconds(now, nextWakeup);
536
537 /*
538 * Ideally we would reuse a WaitEventSet object repeatedly
539 * here to avoid the overheads of WaitLatchOrSocket on epoll
540 * systems, but we can't be sure that libpq (or any other
541 * walreceiver implementation) has the same socket (even if
542 * the fd is the same number, it may have been closed and
543 * reopened since the last time). In future, if there is a
544 * function for removing sockets from WaitEventSet, then we
545 * could add and remove just the socket each time, potentially
546 * avoiding some system calls.
547 */
548 Assert(wait_fd != PGINVALID_SOCKET);
552 wait_fd,
553 nap,
554 WAIT_EVENT_WAL_RECEIVER_MAIN);
555 if (rc & WL_LATCH_SET)
556 {
559
560 if (walrcv->force_reply)
561 {
562 /*
563 * The recovery process has asked us to send apply
564 * feedback now. Make sure the flag is really set to
565 * false in shared memory before sending the reply, so
566 * we don't miss a new request for a reply.
567 */
568 walrcv->force_reply = false;
570 XLogWalRcvSendReply(true, false);
571 }
572 }
573 if (rc & WL_TIMEOUT)
574 {
575 /*
576 * We didn't receive anything new. If we haven't heard
577 * anything from the server for more than
578 * wal_receiver_timeout / 2, ping the server. Also, if
579 * it's been longer than wal_receiver_status_interval
580 * since the last update we sent, send a status update to
581 * the primary anyway, to report any progress in applying
582 * WAL.
583 */
584 bool requestReply = false;
585
586 /*
587 * Check if time since last receive from primary has
588 * reached the configured limit.
589 */
593 (errcode(ERRCODE_CONNECTION_FAILURE),
594 errmsg("terminating walreceiver due to timeout")));
595
596 /*
597 * If we didn't receive anything new for half of
598 * wal_receiver_timeout, then ping the server.
599 */
601 {
602 requestReply = true;
604 }
605
606 XLogWalRcvSendReply(requestReply, requestReply);
608 }
609 }
610
611 /*
612 * The backend finished streaming. Exit streaming COPY-mode from
613 * our side, too.
614 */
615 walrcv_endstreaming(wrconn, &primaryTLI);
616
617 /*
618 * If the server had switched to a new timeline that we didn't
619 * know about when we began streaming, fetch its timeline history
620 * file now.
621 */
622 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
623 }
624 else
625 ereport(LOG,
626 (errmsg("primary server contains no more WAL on requested timeline %u",
627 startpointTLI)));
628
629 /*
630 * End of WAL reached on the requested timeline. Close the last
631 * segment, and wait for new orders from the startup process.
632 */
633 if (recvFile >= 0)
634 {
635 char xlogfname[MAXFNAMELEN];
636
637 XLogWalRcvFlush(false, startpointTLI);
639 if (close(recvFile) != 0)
642 errmsg("could not close WAL segment %s: %m",
643 xlogfname)));
644
645 /*
646 * Create .done file forcibly to prevent the streamed segment from
647 * being archived later.
648 */
650 XLogArchiveForceDone(xlogfname);
651 else
652 XLogArchiveNotify(xlogfname);
653 }
654 recvFile = -1;
655
656 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
657 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
658 }
659 /* not reached */
660}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:485
#define pg_memory_barrier()
Definition: atomics.h:143
void AuxiliaryProcessMainCommon(void)
Definition: auxprocess.c:39
sigset_t UnBlockSig
Definition: pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756
#define Min(x, y)
Definition: c.h:961
#define UINT64_FORMAT
Definition: c.h:507
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:134
int errcode_for_file_access(void)
Definition: elog.c:876
int errdetail(const char *fmt,...)
Definition: elog.c:1203
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
void err(int eval, const char *fmt,...)
Definition: err.c:43
ProcNumber MyProcNumber
Definition: globals.c:89
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
char * cluster_name
Definition: guc_tables.c:537
#define close(a)
Definition: win32.h:12
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:105
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:72
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
@ B_WAL_RECEIVER
Definition: miscadmin.h:364
BackendType MyBackendType
Definition: miscinit.c:64
static char ** options
static char * buf
Definition: pg_test_fsync.c:72
#define pqsignal
Definition: port.h:521
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:327
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:671
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
bool is_temp_slot
Definition: walreceiver.h:142
#define NUM_WALRCV_WAKEUPS
Definition: walreceiver.c:123
static StringInfoData reply_message
Definition: walreceiver.c:131
static int recvFile
Definition: walreceiver.c:100
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:746
static TimeLineID recvFileTLI
Definition: walreceiver.c:101
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:93
static XLogSegNo recvSegNo
Definition: walreceiver.c:102
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1170
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:666
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
Definition: walreceiver.c:840
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
Definition: walreceiver.c:1318
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:802
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1101
static struct @19 LogstreamResult
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:451
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:441
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:459
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:439
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:443
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:463
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:455
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGALRM
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:171
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4589
bool RecoveryInProgress(void)
Definition: xlog.c:6355
int wal_segment_size
Definition: xlog.c:143
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert, AuxiliaryProcessMainCommon(), B_WAL_RECEIVER, buf, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, WalRcvData::conninfo, DEBUG1, elog, ereport, err(), errcode(), errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), i, initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEndTime, len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, Min, WalRcvData::mutex, MyBackendType, MyLatch, MyProcNumber, MyProcPid, NAMEDATALEN, now(), NUM_WALRCV_WAKEUPS, on_shmem_exit(), options, PANIC, pfree(), pg_atomic_write_u64(), pg_memory_barrier, PGC_SIGHUP, PGINVALID_SOCKET, WalRcvData::pid, PointerGetDatum(), pqsignal, proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), WalRcvData::procno, procsignal_sigusr1_handler(), WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, snprintf, SpinLockAcquire, SpinLockRelease, strlcpy(), TIMESTAMP_INFINITY, TimestampDifferenceMilliseconds(), UINT64_FORMAT, UnBlockSig, WaitLatchOrSocket(), wakeup, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_TERMINATE, WalRcvComputeNextWakeup(), WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvData::walRcvState, 
WalRcvData::walRcvStoppedCV, WalRcvWaitForStartPosition(), WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1049 of file walreceiver.c.

1050{
1051 char xlogfname[MAXFNAMELEN];
1052
1054 Assert(tli != 0);
1055
1056 /*
1057 * fsync() and close the current file before we switch to the next one. We
1058 * would otherwise have to reopen this file to fsync it later.
1059 */
1060 XLogWalRcvFlush(false, tli);
1061
1063
1064 /*
1065 * XLOG segment files will be re-read by recovery in startup process soon,
1066 * so we don't advise the OS to release cache pages associated with the
1067 * file like XLogFileClose() does.
1068 */
1069 if (close(recvFile) != 0)
1070 ereport(PANIC,
1072 errmsg("could not close WAL segment %s: %m",
1073 xlogfname)));
1074
1075 /*
1076 * Create .done file forcibly to prevent the streamed segment from being
1077 * archived later.
1078 */
1080 XLogArchiveForceDone(xlogfname);
1081 else
1082 XLogArchiveNotify(xlogfname);
1083
1084 recvFile = -1;
1085}
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert, close, ereport, errcode_for_file_access(), errmsg(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 994 of file walreceiver.c.

995{
996 Assert(tli != 0);
997
998 if (LogstreamResult.Flush < LogstreamResult.Write)
999 {
1000 WalRcvData *walrcv = WalRcv;
1001
1003
1004 LogstreamResult.Flush = LogstreamResult.Write;
1005
1006 /* Update shared-memory status */
1007 SpinLockAcquire(&walrcv->mutex);
1008 if (walrcv->flushedUpto < LogstreamResult.Flush)
1009 {
1010 walrcv->latestChunkStart = walrcv->flushedUpto;
1011 walrcv->flushedUpto = LogstreamResult.Flush;
1012 walrcv->receivedTLI = tli;
1013 }
1014 SpinLockRelease(&walrcv->mutex);
1015
1016 /* Signal the startup process and walsender that new WAL has arrived */
1019 WalSndWakeup(true, false);
1020
1021 /* Report XLOG streaming progress in PS display */
1023 {
1024 char activitymsg[50];
1025
1026 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1028 set_ps_display(activitymsg);
1029 }
1030
1031 /* Also let the primary know that we made some progress */
1032 if (!dying)
1033 {
1034 XLogWalRcvSendReply(false, false);
1036 }
1037 }
1038}
XLogRecPtr latestChunkStart
Definition: walreceiver.h:105
#define AllowCascadeReplication()
Definition: walreceiver.h:40
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3637
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:8704

References AllowCascadeReplication, Assert, WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, LSN_FORMAT_ARGS, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 840 of file walreceiver.c.

841{
842 int hdrlen;
843 XLogRecPtr dataStart;
844 XLogRecPtr walEnd;
845 TimestampTz sendTime;
846 bool replyRequested;
847
848 switch (type)
849 {
850 case 'w': /* WAL records */
851 {
852 StringInfoData incoming_message;
853
854 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
855 if (len < hdrlen)
857 (errcode(ERRCODE_PROTOCOL_VIOLATION),
858 errmsg_internal("invalid WAL message received from primary")));
859
860 /* initialize a StringInfo with the given buffer */
861 initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
862
863 /* read the fields */
864 dataStart = pq_getmsgint64(&incoming_message);
865 walEnd = pq_getmsgint64(&incoming_message);
866 sendTime = pq_getmsgint64(&incoming_message);
867 ProcessWalSndrMessage(walEnd, sendTime);
868
869 buf += hdrlen;
870 len -= hdrlen;
871 XLogWalRcvWrite(buf, len, dataStart, tli);
872 break;
873 }
874 case 'k': /* Keepalive */
875 {
876 StringInfoData incoming_message;
877
878 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
879 if (len != hdrlen)
881 (errcode(ERRCODE_PROTOCOL_VIOLATION),
882 errmsg_internal("invalid keepalive message received from primary")));
883
884 /* initialize a StringInfo with the given buffer */
885 initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
886
887 /* read the fields */
888 walEnd = pq_getmsgint64(&incoming_message);
889 sendTime = pq_getmsgint64(&incoming_message);
890 replyRequested = pq_getmsgbyte(&incoming_message);
891
892 ProcessWalSndrMessage(walEnd, sendTime);
893
894 /* If the primary requested a reply, send one immediately */
895 if (replyRequested)
896 XLogWalRcvSendReply(true, false);
897 break;
898 }
899 default:
901 (errcode(ERRCODE_PROTOCOL_VIOLATION),
902 errmsg_internal("invalid replication message type %d",
903 type)));
904 }
905}
int64_t int64
Definition: c.h:485
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:157
const char * type
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1266
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:911

References buf, ereport, errcode(), errmsg_internal(), ERROR, initReadOnlyStringInfo(), len, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1170 of file walreceiver.c.

1171{
1173 FullTransactionId nextFullXid;
1174 TransactionId nextXid;
1175 uint32 xmin_epoch,
1176 catalog_xmin_epoch;
1177 TransactionId xmin,
1178 catalog_xmin;
1179
1180 /* initially true so we always send at least one feedback message */
1181 static bool primary_has_standby_xmin = true;
1182
1183 /*
1184 * If the user doesn't want status to be reported to the primary, be sure
1185 * to exit before doing anything at all.
1186 */
1188 !primary_has_standby_xmin)
1189 return;
1190
1191 /* Get current timestamp. */
1193
1194 /* Send feedback at most once per wal_receiver_status_interval. */
1195 if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1196 return;
1197
1198 /* Make sure we wake up when it's time to send feedback again. */
1200
1201 /*
1202 * If Hot Standby is not yet accepting connections there is nothing to
1203 * send. Check this after the interval has expired to reduce number of
1204 * calls.
1205 *
1206 * Bailing out here also ensures that we don't send feedback until we've
1207 * read our own replication slot state, so we don't tell the primary to
1208 * discard needed xmin or catalog_xmin from any slots that may exist on
1209 * this replica.
1210 */
1211 if (!HotStandbyActive())
1212 return;
1213
1214 /*
1215 * Make the expensive call to get the oldest xmin once we are certain
1216 * everything else has been checked.
1217 */
1219 {
1220 GetReplicationHorizons(&xmin, &catalog_xmin);
1221 }
1222 else
1223 {
1224 xmin = InvalidTransactionId;
1225 catalog_xmin = InvalidTransactionId;
1226 }
1227
1228 /*
1229 * Get the epoch and adjust it if nextXid and xmin (or catalog_xmin) are on
1230 * different sides of the epoch boundary.
1231 */
1232 nextFullXid = ReadNextFullTransactionId();
1233 nextXid = XidFromFullTransactionId(nextFullXid);
1234 xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1235 catalog_xmin_epoch = xmin_epoch;
1236 if (nextXid < xmin)
1237 xmin_epoch--;
1238 if (nextXid < catalog_xmin)
1239 catalog_xmin_epoch--;
1240
1241 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1242 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1243
1244 /* Construct the message and send it. */
1249 pq_sendint32(&reply_message, xmin_epoch);
1250 pq_sendint32(&reply_message, catalog_xmin);
1251 pq_sendint32(&reply_message, catalog_xmin_epoch);
1253 if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1254 primary_has_standby_xmin = true;
1255 else
1256 primary_has_standby_xmin = false;
1257}
uint32_t uint32
Definition: c.h:488
uint32 TransactionId
Definition: c.h:609
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2047
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
#define InvalidTransactionId
Definition: transam.h:31
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define TransactionIdIsValid(xid)
Definition: transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:457
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReadNextFullTransactionId(), reply_message, resetStringInfo(), TransactionIdIsValid, wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_HSFEEDBACK, WalRcvComputeNextWakeup(), wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1101 of file walreceiver.c.

1102{
1103 static XLogRecPtr writePtr = 0;
1104 static XLogRecPtr flushPtr = 0;
1105 XLogRecPtr applyPtr;
1107
1108 /*
1109 * If the user doesn't want status to be reported to the primary, be sure
1110 * to exit before doing anything at all.
1111 */
1112 if (!force && wal_receiver_status_interval <= 0)
1113 return;
1114
1115 /* Get current timestamp. */
1117
1118 /*
1119 * We can compare the write and flush positions to the last message we
1120 * sent without taking any lock, but the apply position requires a spin
1121 * lock, so we don't check that unless something else has changed or
1122 * wal_receiver_status_interval has elapsed. This means that the apply WAL location will
1123 * appear, from the primary's point of view, to lag slightly, but since
1124 * this is only for reporting purposes and only on idle systems, that's
1125 * probably OK.
1126 */
1127 if (!force
1128 && writePtr == LogstreamResult.Write
1129 && flushPtr == LogstreamResult.Flush
1131 return;
1132
1133 /* Make sure we wake up when it's time to send another reply. */
1135
1136 /* Construct a new message */
1137 writePtr = LogstreamResult.Write;
1138 flushPtr = LogstreamResult.Flush;
1139 applyPtr = GetXLogReplayRecPtr(NULL);
1140
1143 pq_sendint64(&reply_message, writePtr);
1144 pq_sendint64(&reply_message, flushPtr);
1145 pq_sendint64(&reply_message, applyPtr);
1147 pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1148
1149 /* Send it */
1150 elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1151 LSN_FORMAT_ARGS(writePtr),
1152 LSN_FORMAT_ARGS(flushPtr),
1153 LSN_FORMAT_ARGS(applyPtr),
1154 requestReply ? " (reply requested)" : "");
1155
1157}

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_REPLY, WalRcvComputeNextWakeup(), and wrconn.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 911 of file walreceiver.c.

912{
913 int startoff;
914 int byteswritten;
915
916 Assert(tli != 0);
917
918 while (nbytes > 0)
919 {
920 int segbytes;
921
922 /* Close the current segment if it's completed */
923 if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
924 XLogWalRcvClose(recptr, tli);
925
926 if (recvFile < 0)
927 {
928 /* Create/use new log file */
929 XLByteToSeg(recptr, recvSegNo, wal_segment_size);
930 recvFile = XLogFileInit(recvSegNo, tli);
931 recvFileTLI = tli;
932 }
933
934 /* Calculate the start offset of the received logs */
935 startoff = XLogSegmentOffset(recptr, wal_segment_size);
936
937 if (startoff + nbytes > wal_segment_size)
938 segbytes = wal_segment_size - startoff;
939 else
940 segbytes = nbytes;
941
942 /* OK to write the logs */
943 errno = 0;
944
945 byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
946 if (byteswritten <= 0)
947 {
948 char xlogfname[MAXFNAMELEN];
949 int save_errno;
950
951 /* if write didn't set errno, assume no disk space */
952 if (errno == 0)
953 errno = ENOSPC;
954
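955 save_errno = errno;
956 XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
957 errno = save_errno;
958 ereport(PANIC,
959 (errcode_for_file_access(),
960 errmsg("could not write to WAL segment %s "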
961 "at offset %d, length %lu: %m",
962 xlogfname, startoff, (unsigned long) segbytes)));
963 }
964
965 /* Update state for write */
966 recptr += byteswritten;
967
968 nbytes -= byteswritten;
969 buf += byteswritten;
970
971 LogstreamResult.Write = recptr;
972 }
973
974 /* Update shared-memory status */
975 pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
976
977 /*
978 * Close the current segment if it's fully written up in the last cycle of
979 * the loop, to create its archive notification file soon. Otherwise WAL
980 * archiving of the segment will be delayed until any data in the next
981 * segment is received and written.
982 */
983 if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
984 XLogWalRcvClose(recptr, tli);
985}
#define pg_pwrite
Definition: port.h:227
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:1049
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition: xlog.c:3401
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert, buf, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite, recvFile, recvFileTLI, recvSegNo, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName(), XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 111 of file walreceiver.c.

Referenced by XLogWrite().

◆ hot_standby_feedback

bool hot_standby_feedback

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 100 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 101 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 102 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

◆ wakeup

◆ wal_receiver_status_interval

int wal_receiver_status_interval

◆ wal_receiver_timeout

int wal_receiver_timeout

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 93 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 110 of file walreceiver.c.

Referenced by XLogWrite().