PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_authid.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)
 

Typedefs

typedef enum WalRcvWakeupReason WalRcvWakeupReason
 

Enumerations

enum  WalRcvWakeupReason { WALRCV_WAKEUP_TERMINATE , WALRCV_WAKEUP_PING , WALRCV_WAKEUP_REPLY , WALRCV_WAKEUP_HSFEEDBACK }
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason, TimestampTz now)
 
void WalReceiverMain (const void *startup_data, size_t startup_data_len)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
} LogstreamResult
 
static TimestampTz wakeup [NUM_WALRCV_WAKEUPS]
 
static StringInfoData reply_message
 

Macro Definition Documentation

◆ NUM_WALRCV_WAKEUPS

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)

Definition at line 124 of file walreceiver.c.

Typedef Documentation

◆ WalRcvWakeupReason

Enumeration Type Documentation

◆ WalRcvWakeupReason

Enumerator
WALRCV_WAKEUP_TERMINATE 
WALRCV_WAKEUP_PING 
WALRCV_WAKEUP_REPLY 
WALRCV_WAKEUP_HSFEEDBACK 

Definition at line 118 of file walreceiver.c.

119{
124#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
WalRcvWakeupReason
Definition: walreceiver.c:119
@ WALRCV_WAKEUP_TERMINATE
Definition: walreceiver.c:120
@ WALRCV_WAKEUP_REPLY
Definition: walreceiver.c:122
@ WALRCV_WAKEUP_PING
Definition: walreceiver.c:121
@ WALRCV_WAKEUP_HSFEEDBACK
Definition: walreceiver.c:123

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1393 of file walreceiver.c.

1394{
1395 TupleDesc tupdesc;
1396 Datum *values;
1397 bool *nulls;
1398 int pid;
1399 bool ready_to_display;
1401 XLogRecPtr receive_start_lsn;
1402 TimeLineID receive_start_tli;
1403 XLogRecPtr written_lsn;
1404 XLogRecPtr flushed_lsn;
1405 TimeLineID received_tli;
1406 TimestampTz last_send_time;
1407 TimestampTz last_receipt_time;
1408 XLogRecPtr latest_end_lsn;
1409 TimestampTz latest_end_time;
1410 char sender_host[NI_MAXHOST];
1411 int sender_port = 0;
1412 char slotname[NAMEDATALEN];
1413 char conninfo[MAXCONNINFO];
1414
1415 /* Take a lock to ensure value consistency */
1416 SpinLockAcquire(&WalRcv->mutex);
1417 pid = (int) WalRcv->pid;
1418 ready_to_display = WalRcv->ready_to_display;
1420 receive_start_lsn = WalRcv->receiveStart;
1421 receive_start_tli = WalRcv->receiveStartTLI;
1422 flushed_lsn = WalRcv->flushedUpto;
1423 received_tli = WalRcv->receivedTLI;
1424 last_send_time = WalRcv->lastMsgSendTime;
1425 last_receipt_time = WalRcv->lastMsgReceiptTime;
1426 latest_end_lsn = WalRcv->latestWalEnd;
1427 latest_end_time = WalRcv->latestWalEndTime;
1428 strlcpy(slotname, WalRcv->slotname, sizeof(slotname));
1429 strlcpy(sender_host, WalRcv->sender_host, sizeof(sender_host));
1430 sender_port = WalRcv->sender_port;
1431 strlcpy(conninfo, WalRcv->conninfo, sizeof(conninfo));
1432 SpinLockRelease(&WalRcv->mutex);
1433
1434 /*
1435 * No WAL receiver (or not ready yet), just return a tuple with NULL
1436 * values
1437 */
1438 if (pid == 0 || !ready_to_display)
1439 PG_RETURN_NULL();
1440
1441 /*
1442 * Read "writtenUpto" without holding a spinlock. Note that it may not be
1443 * consistent with the other shared variables of the WAL receiver
1444 * protected by a spinlock, but this should not be used for data integrity
1445 * checks.
1446 */
1447 written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1448
1449 /* determine result type */
1450 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1451 elog(ERROR, "return type must be a row type");
1452
1453 values = palloc0(sizeof(Datum) * tupdesc->natts);
1454 nulls = palloc0(sizeof(bool) * tupdesc->natts);
1455
1456 /* Fetch values */
1457 values[0] = Int32GetDatum(pid);
1458
1459 if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1460 {
1461 /*
1462 * Only superusers and roles with privileges of pg_read_all_stats can
1463 * see details. Other users only get the pid value to know whether it
1464 * is a WAL receiver, but no details.
1465 */
1466 memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1467 }
1468 else
1469 {
1471
1472 if (XLogRecPtrIsInvalid(receive_start_lsn))
1473 nulls[2] = true;
1474 else
1475 values[2] = LSNGetDatum(receive_start_lsn);
1476 values[3] = Int32GetDatum(receive_start_tli);
1477 if (XLogRecPtrIsInvalid(written_lsn))
1478 nulls[4] = true;
1479 else
1480 values[4] = LSNGetDatum(written_lsn);
1481 if (XLogRecPtrIsInvalid(flushed_lsn))
1482 nulls[5] = true;
1483 else
1484 values[5] = LSNGetDatum(flushed_lsn);
1485 values[6] = Int32GetDatum(received_tli);
1486 if (last_send_time == 0)
1487 nulls[7] = true;
1488 else
1489 values[7] = TimestampTzGetDatum(last_send_time);
1490 if (last_receipt_time == 0)
1491 nulls[8] = true;
1492 else
1493 values[8] = TimestampTzGetDatum(last_receipt_time);
1494 if (XLogRecPtrIsInvalid(latest_end_lsn))
1495 nulls[9] = true;
1496 else
1497 values[9] = LSNGetDatum(latest_end_lsn);
1498 if (latest_end_time == 0)
1499 nulls[10] = true;
1500 else
1501 values[10] = TimestampTzGetDatum(latest_end_time);
1502 if (*slotname == '\0')
1503 nulls[11] = true;
1504 else
1505 values[11] = CStringGetTextDatum(slotname);
1506 if (*sender_host == '\0')
1507 nulls[12] = true;
1508 else
1509 values[12] = CStringGetTextDatum(sender_host);
1510 if (sender_port == 0)
1511 nulls[13] = true;
1512 else
1513 values[13] = Int32GetDatum(sender_port);
1514 if (*conninfo == '\0')
1515 nulls[14] = true;
1516 else
1517 values[14] = CStringGetTextDatum(conninfo);
1518 }
1519
1520 /* Returns the record as Datum */
1521 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1522}
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5268
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:467
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define CStringGetTextDatum(s)
Definition: builtins.h:97
int64 TimestampTz
Definition: timestamp.h:39
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void * palloc0(Size size)
Definition: mcxt.c:1969
Oid GetUserId(void)
Definition: miscinit.c:520
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uintptr_t Datum
Definition: postgres.h:69
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:217
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:111
XLogRecPtr latestWalEnd
Definition: walreceiver.h:116
TimeLineID receiveStartTLI
Definition: walreceiver.h:87
TimeLineID receivedTLI
Definition: walreceiver.h:97
char slotname[NAMEDATALEN]
Definition: walreceiver.h:136
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:129
pid_t pid
Definition: walreceiver.h:68
XLogRecPtr receiveStart
Definition: walreceiver.h:86
XLogRecPtr flushedUpto
Definition: walreceiver.h:96
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:155
TimestampTz lastMsgSendTime
Definition: walreceiver.h:110
WalRcvState walRcvState
Definition: walreceiver.h:71
TimestampTz latestWalEndTime
Definition: walreceiver.h:117
bool ready_to_display
Definition: walreceiver.h:145
int sender_port
Definition: walreceiver.h:130
slock_t mutex
Definition: walreceiver.h:147
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:123
Definition: regguts.h:323
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1368
#define MAXCONNINFO
Definition: walreceiver.h:37
WalRcvState
Definition: walreceiver.h:46
WalRcvData * WalRcv
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum(), Int32GetDatum(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum(), MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, palloc0(), pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum(), TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsInvalid.

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1257 of file walreceiver.c.

1258{
1259 WalRcvData *walrcv = WalRcv;
1260 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1261
1262 /* Update shared-memory status */
1263 SpinLockAcquire(&walrcv->mutex);
1264 if (walrcv->latestWalEnd < walEnd)
1265 walrcv->latestWalEndTime = sendTime;
1266 walrcv->latestWalEnd = walEnd;
1267 walrcv->lastMsgSendTime = sendTime;
1268 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1269 SpinLockRelease(&walrcv->mutex);
1270
1271 if (message_level_is_interesting(DEBUG2))
1272 {
1273 char *sendtime;
1274 char *receipttime;
1275 int applyDelay;
1276
1277 /* Copy because timestamptz_to_str returns a static buffer */
1278 sendtime = pstrdup(timestamptz_to_str(sendTime));
1279 receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1280 applyDelay = GetReplicationApplyDelay();
1281
1282 /* apply delay is not available */
1283 if (applyDelay == -1)
1284 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1285 sendtime,
1286 receipttime,
1287 GetReplicationTransferLatency());
1288 else
1289 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1290 sendtime,
1291 receipttime,
1292 applyDelay,
1293 GetReplicationTransferLatency());
1294
1295 pfree(sendtime);
1296 pfree(receipttime);
1297 }
1298}
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1862
bool message_level_is_interesting(int elevel)
Definition: elog.c:273
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:2321
void pfree(void *pointer)
Definition: mcxt.c:2146
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, message_level_is_interesting(), WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvComputeNextWakeup()

static void WalRcvComputeNextWakeup ( WalRcvWakeupReason  reason,
TimestampTz  now 
)
static

Definition at line 1309 of file walreceiver.c.

1310{
1311 switch (reason)
1312 {
1314 if (wal_receiver_timeout <= 0)
1315 wakeup[reason] = TIMESTAMP_INFINITY;
1316 else
1318 break;
1319 case WALRCV_WAKEUP_PING:
1320 if (wal_receiver_timeout <= 0)
1321 wakeup[reason] = TIMESTAMP_INFINITY;
1322 else
1324 break;
1327 wakeup[reason] = TIMESTAMP_INFINITY;
1328 else
1330 break;
1333 wakeup[reason] = TIMESTAMP_INFINITY;
1334 else
1336 break;
1337 /* there's intentionally no default: here */
1338 }
1339}
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
#define TIMESTAMP_INFINITY
Definition: timestamp.h:151
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define TimestampTzPlusSeconds(tz, s)
Definition: timestamp.h:86
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
Definition: walreceiver.c:130
bool hot_standby_feedback
Definition: walreceiver.c:90
int wal_receiver_status_interval
Definition: walreceiver.c:88
int wal_receiver_timeout
Definition: walreceiver.c:89

References hot_standby_feedback, now(), TIMESTAMP_INFINITY, TimestampTzPlusMilliseconds, TimestampTzPlusSeconds, wakeup, wal_receiver_status_interval, wal_receiver_timeout, WALRCV_WAKEUP_HSFEEDBACK, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, and WALRCV_WAKEUP_TERMINATE.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 781 of file walreceiver.c.

782{
783 WalRcvData *walrcv = WalRcv;
784 TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
785
786 Assert(*startpointTLI_p != 0);
787
788 /* Ensure that all WAL records received are flushed to disk */
789 XLogWalRcvFlush(true, *startpointTLI_p);
790
791 /* Mark ourselves inactive in shared memory */
792 SpinLockAcquire(&walrcv->mutex);
793 Assert(walrcv->walRcvState == WALRCV_STREAMING ||
794 walrcv->walRcvState == WALRCV_RESTARTING ||
795 walrcv->walRcvState == WALRCV_STARTING ||
796 walrcv->walRcvState == WALRCV_WAITING ||
797 walrcv->walRcvState == WALRCV_STOPPING);
798 Assert(walrcv->pid == MyProcPid);
799 walrcv->walRcvState = WALRCV_STOPPED;
800 walrcv->pid = 0;
801 walrcv->procno = INVALID_PROC_NUMBER;
802 walrcv->ready_to_display = false;
803 SpinLockRelease(&walrcv->mutex);
804
806
807 /* Terminate the connection gracefully. */
808 if (wrconn != NULL)
809 walrcv_disconnect(wrconn);
810
811 /* Wake up the startup process to notice promptly that we're gone */
812 WakeupRecovery();
813}
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition: globals.c:48
Assert(PointerIsAligned(start, uint64))
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:317
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
ProcNumber procno
Definition: walreceiver.h:67
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:72
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
Definition: walreceiver.c:985
@ WALRCV_STARTING
Definition: walreceiver.h:48
@ WALRCV_STOPPED
Definition: walreceiver.h:47
@ WALRCV_RESTARTING
Definition: walreceiver.h:52
@ WALRCV_STREAMING
Definition: walreceiver.h:50
@ WALRCV_WAITING
Definition: walreceiver.h:51
@ WALRCV_STOPPING
Definition: walreceiver.h:53
#define walrcv_disconnect(conn)
Definition: walreceiver.h:467
void WakeupRecovery(void)

References arg, Assert(), ConditionVariableBroadcast(), DatumGetPointer(), INVALID_PROC_NUMBER, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::procno, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 725 of file walreceiver.c.

726{
727 TimeLineID tli;
728
729 for (tli = first; tli <= last; tli++)
730 {
731 /* there's no history file for timeline 1 */
732 if (tli != 1 && !existsTimeLineHistory(tli))
733 {
734 char *fname;
735 char *content;
736 int len;
737 char expectedfname[MAXFNAMELEN];
738
739 ereport(LOG,
740 (errmsg("fetching timeline history file for timeline %u from primary server",
741 tli)));
742
743 walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
744
745 /*
746 * Check that the filename on the primary matches what we
747 * calculated ourselves. This is just a sanity check, it should
748 * always match.
749 */
750 TLHistoryFileName(expectedfname, tli);
751 if (strcmp(fname, expectedfname) != 0)
753 (errcode(ERRCODE_PROTOCOL_VIOLATION),
754 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
755 tli)));
756
757 /*
758 * Write the file to pg_wal.
759 */
760 writeTimeLineHistoryFile(tli, content, len);
761
762 /*
763 * Mark the streamed history file as ready for archiving if
764 * archive_mode is always.
765 */
768 else
769 XLogArchiveNotify(fname);
770
771 pfree(fname);
772 pfree(content);
773 }
774 }
775}
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:463
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1158
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define LOG
Definition: elog.h:31
#define ereport(elevel,...)
Definition: elog.h:149
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:449
int XLogArchiveMode
Definition: xlog.c:119
@ ARCHIVE_MODE_ALWAYS
Definition: xlog.h:67
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:510
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:444

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName(), walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1350 of file walreceiver.c.

1351{
1352 ProcNumber procno;
1353
1354 WalRcv->force_reply = true;
1355 /* fetching the proc number is probably atomic, but don't rely on it */
1356 SpinLockAcquire(&WalRcv->mutex);
1357 procno = WalRcv->procno;
1358 SpinLockRelease(&WalRcv->mutex);
1359 if (procno != INVALID_PROC_NUMBER)
1360 SetLatch(&GetPGProcByNumber(procno)->procLatch);
1361}
void SetLatch(Latch *latch)
Definition: latch.c:288
#define GetPGProcByNumber(n)
Definition: proc.h:424
int ProcNumber
Definition: procnumber.h:24
sig_atomic_t force_reply
Definition: walreceiver.h:162

References WalRcvData::force_reply, GetPGProcByNumber, INVALID_PROC_NUMBER, WalRcvData::mutex, WalRcvData::procno, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvGetStateString()

static const char * WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1368 of file walreceiver.c.

1369{
1370 switch (state)
1371 {
1372 case WALRCV_STOPPED:
1373 return "stopped";
1374 case WALRCV_STARTING:
1375 return "starting";
1376 case WALRCV_STREAMING:
1377 return "streaming";
1378 case WALRCV_WAITING:
1379 return "waiting";
1380 case WALRCV_RESTARTING:
1381 return "restarting";
1382 case WALRCV_STOPPING:
1383 return "stopping";
1384 }
1385 return "UNKNOWN";
1386}

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 645 of file walreceiver.c.

646{
647 WalRcvData *walrcv = WalRcv;
648 int state;
649
650 SpinLockAcquire(&walrcv->mutex);
651 state = walrcv->walRcvState;
652 if (state != WALRCV_STREAMING)
653 {
654 SpinLockRelease(&walrcv->mutex);
655 if (state == WALRCV_STOPPING)
656 proc_exit(0);
657 else
658 elog(FATAL, "unexpected walreceiver state");
659 }
660 walrcv->walRcvState = WALRCV_WAITING;
662 walrcv->receiveStartTLI = 0;
663 SpinLockRelease(&walrcv->mutex);
664
665 set_ps_display("idle");
666
667 /*
668 * nudge startup process to notice that we've stopped streaming and are
669 * now waiting for instructions.
670 */
672 for (;;)
673 {
675
677
678 SpinLockAcquire(&walrcv->mutex);
680 walrcv->walRcvState == WALRCV_WAITING ||
681 walrcv->walRcvState == WALRCV_STOPPING);
682 if (walrcv->walRcvState == WALRCV_RESTARTING)
683 {
684 /*
685 * No need to handle changes in primary_conninfo or
686 * primary_slot_name here. Startup process will signal us to
687 * terminate in case those change.
688 */
689 *startpoint = walrcv->receiveStart;
690 *startpointTLI = walrcv->receiveStartTLI;
692 SpinLockRelease(&walrcv->mutex);
693 break;
694 }
695 if (walrcv->walRcvState == WALRCV_STOPPING)
696 {
697 /*
698 * We should've received SIGTERM if the startup process wants us
699 * to die, but might as well check it here too.
700 */
701 SpinLockRelease(&walrcv->mutex);
702 exit(1);
703 }
704 SpinLockRelease(&walrcv->mutex);
705
707 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
708 }
709
711 {
712 char activitymsg[50];
713
714 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
715 LSN_FORMAT_ARGS(*startpoint));
716 set_ps_display(activitymsg);
717 }
718}
#define FATAL
Definition: elog.h:41
struct Latch * MyLatch
Definition: globals.c:64
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:372
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
#define snprintf
Definition: port.h:239
bool update_process_title
Definition: ps_status.c:31
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), CHECK_FOR_INTERRUPTS, elog, FATAL, InvalidXLogRecPtr, LSN_FORMAT_ARGS, WalRcvData::mutex, MyLatch, proc_exit(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( const void *  startup_data,
size_t  startup_data_len 
)

Definition at line 152 of file walreceiver.c.

153{
154 char conninfo[MAXCONNINFO];
155 char *tmp_conninfo;
156 char slotname[NAMEDATALEN];
157 bool is_temp_slot;
158 XLogRecPtr startpoint;
159 TimeLineID startpointTLI;
160 TimeLineID primaryTLI;
161 bool first_stream;
162 WalRcvData *walrcv;
164 char *err;
165 char *sender_host = NULL;
166 int sender_port = 0;
167 char *appname;
168
169 Assert(startup_data_len == 0);
170
173
174 /*
175 * WalRcv should be set up already (if we are a backend, we inherit this
176 * by fork() or EXEC_BACKEND mechanism from the postmaster).
177 */
178 walrcv = WalRcv;
179 Assert(walrcv != NULL);
180
181 /*
182 * Mark walreceiver as running in shared memory.
183 *
184 * Do this as early as possible, so that if we fail later on, we'll set
185 * state to STOPPED. If we die before this, the startup process will keep
186 * waiting for us to start up, until it times out.
187 */
188 SpinLockAcquire(&walrcv->mutex);
189 Assert(walrcv->pid == 0);
190 switch (walrcv->walRcvState)
191 {
192 case WALRCV_STOPPING:
193 /* If we've already been requested to stop, don't start up. */
194 walrcv->walRcvState = WALRCV_STOPPED;
195 /* fall through */
196
197 case WALRCV_STOPPED:
198 SpinLockRelease(&walrcv->mutex);
200 proc_exit(1);
201 break;
202
203 case WALRCV_STARTING:
204 /* The usual case */
205 break;
206
207 case WALRCV_WAITING:
208 case WALRCV_STREAMING:
210 default:
211 /* Shouldn't happen */
212 SpinLockRelease(&walrcv->mutex);
213 elog(PANIC, "walreceiver still running according to shared memory state");
214 }
215 /* Advertise our PID so that the startup process can kill us */
216 walrcv->pid = MyProcPid;
218
219 /* Fetch information required to start streaming */
220 walrcv->ready_to_display = false;
221 strlcpy(conninfo, walrcv->conninfo, MAXCONNINFO);
222 strlcpy(slotname, walrcv->slotname, NAMEDATALEN);
223 is_temp_slot = walrcv->is_temp_slot;
224 startpoint = walrcv->receiveStart;
225 startpointTLI = walrcv->receiveStartTLI;
226
227 /*
228 * At most one of is_temp_slot and slotname can be set; otherwise,
229 * RequestXLogStreaming messed up.
230 */
231 Assert(!is_temp_slot || (slotname[0] == '\0'));
232
233 /* Initialise to a sanish value */
235 walrcv->lastMsgSendTime =
236 walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
237
238 /* Report our proc number so that others can wake us up */
239 walrcv->procno = MyProcNumber;
240
241 SpinLockRelease(&walrcv->mutex);
242
244
245 /* Arrange to clean up at walreceiver exit */
246 on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
247
248 /* Properly accept or ignore signals the postmaster might send us */
249 pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
250 * file */
251 pqsignal(SIGINT, SIG_IGN);
252 pqsignal(SIGTERM, die); /* request shutdown */
253 /* SIGQUIT handler was already set up by InitPostmasterChild */
254 pqsignal(SIGALRM, SIG_IGN);
255 pqsignal(SIGPIPE, SIG_IGN);
257 pqsignal(SIGUSR2, SIG_IGN);
258
259 /* Reset some signals that are accepted by postmaster but not here */
260 pqsignal(SIGCHLD, SIG_DFL);
261
262 /* Load the libpq-specific functions */
263 load_file("libpqwalreceiver", false);
264 if (WalReceiverFunctions == NULL)
265 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
266
267 /* Unblock signals (they were blocked when the postmaster forked us) */
268 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
269
270 /* Establish the connection to the primary for XLOG streaming */
271 appname = cluster_name[0] ? cluster_name : "walreceiver";
272 wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
273 if (!wrconn)
275 (errcode(ERRCODE_CONNECTION_FAILURE),
276 errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
277 appname, err)));
278
279 /*
280 * Save user-visible connection string. This clobbers the original
281 * conninfo, for security. Also save host and port of the sender server
282 * this walreceiver is connected to.
283 */
284 tmp_conninfo = walrcv_get_conninfo(wrconn);
285 walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
286 SpinLockAcquire(&walrcv->mutex);
287 memset(walrcv->conninfo, 0, MAXCONNINFO);
288 if (tmp_conninfo)
289 strlcpy(walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
290
291 memset(walrcv->sender_host, 0, NI_MAXHOST);
292 if (sender_host)
293 strlcpy(walrcv->sender_host, sender_host, NI_MAXHOST);
294
295 walrcv->sender_port = sender_port;
296 walrcv->ready_to_display = true;
297 SpinLockRelease(&walrcv->mutex);
298
299 if (tmp_conninfo)
300 pfree(tmp_conninfo);
301
302 if (sender_host)
303 pfree(sender_host);
304
305 first_stream = true;
306 for (;;)
307 {
308 char *primary_sysid;
309 char standby_sysid[32];
311
312 /*
313 * Check that we're connected to a valid server using the
314 * IDENTIFY_SYSTEM replication command.
315 */
316 primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
317
318 snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
320 if (strcmp(primary_sysid, standby_sysid) != 0)
321 {
323 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
324 errmsg("database system identifier differs between the primary and standby"),
325 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
326 primary_sysid, standby_sysid)));
327 }
328
329 /*
330 * Confirm that the current timeline of the primary is the same or
331 * ahead of ours.
332 */
333 if (primaryTLI < startpointTLI)
335 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
336 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
337 primaryTLI, startpointTLI)));
338
339 /*
340 * Get any missing history files. We do this always, even when we're
341 * not interested in that timeline, so that if we're promoted to
342 * become the primary later on, we don't select the same timeline that
343 * was already used in the current primary. This isn't bullet-proof -
344 * you'll need some external software to manage your cluster if you
345 * need to ensure that a unique timeline id is chosen in every case,
346 * but let's avoid the confusion of timeline id collisions where we
347 * can.
348 */
349 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
350
351 /*
352 * Create temporary replication slot if requested, and update slot
353 * name in shared memory. (Note the slot name cannot already be set
354 * in this case.)
355 */
356 if (is_temp_slot)
357 {
358 snprintf(slotname, sizeof(slotname),
359 "pg_walreceiver_%lld",
360 (long long int) walrcv_get_backend_pid(wrconn));
361
362 walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
363
364 SpinLockAcquire(&walrcv->mutex);
365 strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
366 SpinLockRelease(&walrcv->mutex);
367 }
368
369 /*
370 * Start streaming.
371 *
372 * We'll try to start at the requested starting point and timeline,
373 * even if it's different from the server's latest timeline. In case
374 * we've already reached the end of the old timeline, the server will
375 * finish the streaming immediately, and we will go back to await
376 * orders from the startup process. If recovery_target_timeline is
377 * 'latest', the startup process will scan pg_wal and find the new
378 * history file, bump recovery target timeline, and ask us to restart
379 * on the new timeline.
380 */
381 options.logical = false;
382 options.startpoint = startpoint;
383 options.slotname = slotname[0] != '\0' ? slotname : NULL;
384 options.proto.physical.startpointTLI = startpointTLI;
386 {
387 if (first_stream)
388 ereport(LOG,
389 (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
390 LSN_FORMAT_ARGS(startpoint), startpointTLI)));
391 else
392 ereport(LOG,
393 (errmsg("restarted WAL streaming at %X/%X on timeline %u",
394 LSN_FORMAT_ARGS(startpoint), startpointTLI)));
395 first_stream = false;
396
397 /* Initialize LogstreamResult and buffers for processing messages */
400
401 /* Initialize nap wakeup times. */
403 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
405
406 /* Send initial reply/feedback messages. */
407 XLogWalRcvSendReply(true, false);
409
410 /* Loop until end-of-streaming or error */
411 for (;;)
412 {
413 char *buf;
414 int len;
415 bool endofwal = false;
416 pgsocket wait_fd = PGINVALID_SOCKET;
417 int rc;
418 TimestampTz nextWakeup;
419 long nap;
420
421 /*
422 * Exit walreceiver if we're not in recovery. This should not
423 * happen, but cross-check the status here.
424 */
425 if (!RecoveryInProgress())
427 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
428 errmsg("cannot continue WAL streaming, recovery has already ended")));
429
430 /* Process any requests or signals received recently */
432
434 {
435 ConfigReloadPending = false;
437 /* recompute wakeup times */
439 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
442 }
443
444 /* See if we can read data immediately */
445 len = walrcv_receive(wrconn, &buf, &wait_fd);
446 if (len != 0)
447 {
448 /*
449 * Process the received data, and any subsequent data we
450 * can read without blocking.
451 */
452 for (;;)
453 {
454 if (len > 0)
455 {
456 /*
457 * Something was received from primary, so adjust
458 * the ping and terminate wakeup times.
459 */
462 now);
464 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
465 startpointTLI);
466 }
467 else if (len == 0)
468 break;
469 else if (len < 0)
470 {
471 ereport(LOG,
472 (errmsg("replication terminated by primary server"),
473 errdetail("End of WAL reached on timeline %u at %X/%X.",
474 startpointTLI,
476 endofwal = true;
477 break;
478 }
479 len = walrcv_receive(wrconn, &buf, &wait_fd);
480 }
481
482 /* Let the primary know that we received some data. */
483 XLogWalRcvSendReply(false, false);
484
485 /*
486 * If we've written some records, flush them to disk and
487 * let the startup process and primary server know about
488 * them.
489 */
490 XLogWalRcvFlush(false, startpointTLI);
491 }
492
493 /* Check if we need to exit the streaming loop. */
494 if (endofwal)
495 break;
496
497 /* Find the soonest wakeup time, to limit our nap. */
498 nextWakeup = TIMESTAMP_INFINITY;
499 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
500 nextWakeup = Min(wakeup[i], nextWakeup);
501
502 /* Calculate the nap time, clamping as necessary. */
504 nap = TimestampDifferenceMilliseconds(now, nextWakeup);
505
506 /*
507 * Ideally we would reuse a WaitEventSet object repeatedly
508 * here to avoid the overheads of WaitLatchOrSocket on epoll
509 * systems, but we can't be sure that libpq (or any other
510 * walreceiver implementation) has the same socket (even if
511 * the fd is the same number, it may have been closed and
512 * reopened since the last time). In future, if there is a
513 * function for removing sockets from WaitEventSet, then we
514 * could add and remove just the socket each time, potentially
515 * avoiding some system calls.
516 */
517 Assert(wait_fd != PGINVALID_SOCKET);
521 wait_fd,
522 nap,
523 WAIT_EVENT_WAL_RECEIVER_MAIN);
524 if (rc & WL_LATCH_SET)
525 {
528
529 if (walrcv->force_reply)
530 {
531 /*
532 * The recovery process has asked us to send apply
533 * feedback now. Make sure the flag is really set to
534 * false in shared memory before sending the reply, so
535 * we don't miss a new request for a reply.
536 */
537 walrcv->force_reply = false;
539 XLogWalRcvSendReply(true, false);
540 }
541 }
542 if (rc & WL_TIMEOUT)
543 {
544 /*
545 * We didn't receive anything new. If we haven't heard
546 * anything from the server for more than
547 * wal_receiver_timeout / 2, ping the server. Also, if
548 * it's been longer than wal_receiver_status_interval
549 * since the last update we sent, send a status update to
550 * the primary anyway, to report any progress in applying
551 * WAL.
552 */
553 bool requestReply = false;
554
555 /*
556 * Report pending statistics to the cumulative stats
557 * system. This location is useful for the report as it
558 * is not within a tight loop in the WAL receiver, to
559 * avoid bloating pgstats with requests, while also making
560 * sure that the reports happen each time a status update
561 * is sent.
562 */
563 pgstat_report_wal(false);
564
565 /*
566 * Check if time since last receive from primary has
567 * reached the configured limit.
568 */
572 (errcode(ERRCODE_CONNECTION_FAILURE),
573 errmsg("terminating walreceiver due to timeout")));
574
575 /*
576 * If we didn't receive anything new for half of receiver
577 * replication timeout, then ping the server.
578 */
580 {
581 requestReply = true;
583 }
584
585 XLogWalRcvSendReply(requestReply, requestReply);
587 }
588 }
589
590 /*
591 * The backend finished streaming. Exit streaming COPY-mode from
592 * our side, too.
593 */
594 walrcv_endstreaming(wrconn, &primaryTLI);
595
596 /*
597 * If the server had switched to a new timeline that we didn't
598 * know about when we began streaming, fetch its timeline history
599 * file now.
600 */
601 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
602 }
603 else
604 ereport(LOG,
605 (errmsg("primary server contains no more WAL on requested timeline %u",
606 startpointTLI)));
607
608 /*
609 * End of WAL reached on the requested timeline. Close the last
610 * segment, and await for new orders from the startup process.
611 */
612 if (recvFile >= 0)
613 {
614 char xlogfname[MAXFNAMELEN];
615
616 XLogWalRcvFlush(false, startpointTLI);
618 if (close(recvFile) != 0)
621 errmsg("could not close WAL segment %s: %m",
622 xlogfname)));
623
624 /*
625 * Create .done file forcibly to prevent the streamed segment from
626 * being archived later.
627 */
629 XLogArchiveForceDone(xlogfname);
630 else
631 XLogArchiveNotify(xlogfname);
632 }
633 recvFile = -1;
634
635 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
636 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
637 }
638 /* not reached */
639}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:485
#define pg_memory_barrier()
Definition: atomics.h:143
void AuxiliaryProcessMainCommon(void)
Definition: auxprocess.c:39
sigset_t UnBlockSig
Definition: pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757
#define Min(x, y)
Definition: c.h:975
#define UINT64_FORMAT
Definition: c.h:521
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:134
int errcode_for_file_access(void)
Definition: elog.c:877
int errdetail(const char *fmt,...)
Definition: elog.c:1204
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
void err(int eval, const char *fmt,...)
Definition: err.c:43
ProcNumber MyProcNumber
Definition: globals.c:91
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
char * cluster_name
Definition: guc_tables.c:554
#define close(a)
Definition: win32.h:12
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:65
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:221
@ B_WAL_RECEIVER
Definition: miscadmin.h:366
BackendType MyBackendType
Definition: miscinit.c:64
static char ** options
#define die(msg)
static char * buf
Definition: pg_test_fsync.c:72
void pgstat_report_wal(bool force)
Definition: pgstat_wal.c:46
#define pqsignal
Definition: port.h:531
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:327
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:673
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
bool is_temp_slot
Definition: walreceiver.h:142
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define NUM_WALRCV_WAKEUPS
Definition: walreceiver.c:124
static StringInfoData reply_message
Definition: walreceiver.c:132
static int recvFile
Definition: walreceiver.c:101
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:725
static TimeLineID recvFileTLI
Definition: walreceiver.c:102
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:94
static XLogSegNo recvSegNo
Definition: walreceiver.c:103
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1161
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:645
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
Definition: walreceiver.c:819
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
Definition: walreceiver.c:1309
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:781
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1092
static struct @19 LogstreamResult
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:451
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:441
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:459
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:439
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:443
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:463
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:455
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGALRM
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:171
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4734
bool RecoveryInProgress(void)
Definition: xlog.c:6522
int wal_segment_size
Definition: xlog.c:143
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert(), AuxiliaryProcessMainCommon(), B_WAL_RECEIVER, buf, CHECK_FOR_INTERRUPTS, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, WalRcvData::conninfo, DEBUG1, die, elog, ereport, err(), errcode(), errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), i, initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEndTime, len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, Min, WalRcvData::mutex, MyBackendType, MyLatch, MyProcNumber, MyProcPid, NAMEDATALEN, now(), NUM_WALRCV_WAKEUPS, on_shmem_exit(), options, PANIC, pfree(), pg_atomic_write_u64(), pg_memory_barrier, PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_wal(), WalRcvData::pid, PointerGetDatum(), pqsignal, proc_exit(), ProcessConfigFile(), WalRcvData::procno, procsignal_sigusr1_handler(), WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, snprintf, SpinLockAcquire, SpinLockRelease, strlcpy(), TIMESTAMP_INFINITY, TimestampDifferenceMilliseconds(), UINT64_FORMAT, UnBlockSig, WaitLatchOrSocket(), wakeup, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_TERMINATE, WalRcvComputeNextWakeup(), WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, 
WalRcvWaitForStartPosition(), WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1040 of file walreceiver.c.

1041{
1042 char xlogfname[MAXFNAMELEN];
1043
1045 Assert(tli != 0);
1046
1047 /*
1048 * fsync() and close current file before we switch to next one. We would
1049 * otherwise have to reopen this file to fsync it later
1050 */
1051 XLogWalRcvFlush(false, tli);
1052
1054
1055 /*
1056 * XLOG segment files will be re-read by recovery in startup process soon,
1057 * so we don't advise the OS to release cache pages associated with the
1058 * file like XLogFileClose() does.
1059 */
1060 if (close(recvFile) != 0)
1061 ereport(PANIC,
1063 errmsg("could not close WAL segment %s: %m",
1064 xlogfname)));
1065
1066 /*
1067 * Create .done file forcibly to prevent the streamed segment from being
1068 * archived later.
1069 */
1071 XLogArchiveForceDone(xlogfname);
1072 else
1073 XLogArchiveNotify(xlogfname);
1074
1075 recvFile = -1;
1076}
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert(), close, ereport, errcode_for_file_access(), errmsg(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 985 of file walreceiver.c.

986{
987 Assert(tli != 0);
988
989 if (LogstreamResult.Flush < LogstreamResult.Write)
990 {
991 WalRcvData *walrcv = WalRcv;
992
994
995 LogstreamResult.Flush = LogstreamResult.Write;
996
997 /* Update shared-memory status */
998 SpinLockAcquire(&walrcv->mutex);
999 if (walrcv->flushedUpto < LogstreamResult.Flush)
1000 {
1001 walrcv->latestChunkStart = walrcv->flushedUpto;
1002 walrcv->flushedUpto = LogstreamResult.Flush;
1003 walrcv->receivedTLI = tli;
1004 }
1005 SpinLockRelease(&walrcv->mutex);
1006
1007 /* Signal the startup process and walsender that new WAL has arrived */
1010 WalSndWakeup(true, false);
1011
1012 /* Report XLOG streaming progress in PS display */
1014 {
1015 char activitymsg[50];
1016
1017 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1019 set_ps_display(activitymsg);
1020 }
1021
1022 /* Also let the primary know that we made some progress */
1023 if (!dying)
1024 {
1025 XLogWalRcvSendReply(false, false);
1027 }
1028 }
1029}
XLogRecPtr latestChunkStart
Definition: walreceiver.h:105
#define AllowCascadeReplication()
Definition: walreceiver.h:40
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3671
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:8871

References AllowCascadeReplication, Assert(), WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, LSN_FORMAT_ARGS, WalRcvData::mutex, WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 819 of file walreceiver.c.

820{
821 int hdrlen;
822 XLogRecPtr dataStart;
823 XLogRecPtr walEnd;
824 TimestampTz sendTime;
825 bool replyRequested;
826
827 switch (type)
828 {
829 case 'w': /* WAL records */
830 {
831 StringInfoData incoming_message;
832
833 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
834 if (len < hdrlen)
836 (errcode(ERRCODE_PROTOCOL_VIOLATION),
837 errmsg_internal("invalid WAL message received from primary")));
838
839 /* initialize a StringInfo with the given buffer */
840 initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
841
842 /* read the fields */
843 dataStart = pq_getmsgint64(&incoming_message);
844 walEnd = pq_getmsgint64(&incoming_message);
845 sendTime = pq_getmsgint64(&incoming_message);
846 ProcessWalSndrMessage(walEnd, sendTime);
847
848 buf += hdrlen;
849 len -= hdrlen;
850 XLogWalRcvWrite(buf, len, dataStart, tli);
851 break;
852 }
853 case 'k': /* Keepalive */
854 {
855 StringInfoData incoming_message;
856
857 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
858 if (len != hdrlen)
860 (errcode(ERRCODE_PROTOCOL_VIOLATION),
861 errmsg_internal("invalid keepalive message received from primary")));
862
863 /* initialize a StringInfo with the given buffer */
864 initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
865
866 /* read the fields */
867 walEnd = pq_getmsgint64(&incoming_message);
868 sendTime = pq_getmsgint64(&incoming_message);
869 replyRequested = pq_getmsgbyte(&incoming_message);
870
871 ProcessWalSndrMessage(walEnd, sendTime);
872
873 /* If the primary requested a reply, send one immediately */
874 if (replyRequested)
875 XLogWalRcvSendReply(true, false);
876 break;
877 }
878 default:
880 (errcode(ERRCODE_PROTOCOL_VIOLATION),
881 errmsg_internal("invalid replication message type %d",
882 type)));
883 }
884}
int64_t int64
Definition: c.h:499
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:157
const char * type
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1257
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:890

References buf, ereport, errcode(), errmsg_internal(), ERROR, initReadOnlyStringInfo(), len, pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1161 of file walreceiver.c.

1162{
1164 FullTransactionId nextFullXid;
1165 TransactionId nextXid;
1166 uint32 xmin_epoch,
1167 catalog_xmin_epoch;
1168 TransactionId xmin,
1169 catalog_xmin;
1170
1171 /* initially true so we always send at least one feedback message */
1172 static bool primary_has_standby_xmin = true;
1173
1174 /*
1175 * If the user doesn't want status to be reported to the primary, be sure
1176 * to exit before doing anything at all.
1177 */
1179 !primary_has_standby_xmin)
1180 return;
1181
1182 /* Get current timestamp. */
1184
1185 /* Send feedback at most once per wal_receiver_status_interval. */
1186 if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1187 return;
1188
1189 /* Make sure we wake up when it's time to send feedback again. */
1191
1192 /*
1193 * If Hot Standby is not yet accepting connections there is nothing to
1194 * send. Check this after the interval has expired to reduce number of
1195 * calls.
1196 *
1197 * Bailing out here also ensures that we don't send feedback until we've
1198 * read our own replication slot state, so we don't tell the primary to
1199 * discard needed xmin or catalog_xmin from any slots that may exist on
1200 * this replica.
1201 */
1202 if (!HotStandbyActive())
1203 return;
1204
1205 /*
1206 * Make the expensive call to get the oldest xmin once we are certain
1207 * everything else has been checked.
1208 */
1210 {
1211 GetReplicationHorizons(&xmin, &catalog_xmin);
1212 }
1213 else
1214 {
1215 xmin = InvalidTransactionId;
1216 catalog_xmin = InvalidTransactionId;
1217 }
1218
1219 /*
1220 * Get epoch and adjust if nextXid and oldestXmin are different sides of
1221 * the epoch boundary.
1222 */
1223 nextFullXid = ReadNextFullTransactionId();
1224 nextXid = XidFromFullTransactionId(nextFullXid);
1225 xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1226 catalog_xmin_epoch = xmin_epoch;
1227 if (nextXid < xmin)
1228 xmin_epoch--;
1229 if (nextXid < catalog_xmin)
1230 catalog_xmin_epoch--;
1231
1232 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1233 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1234
1235 /* Construct the message and send it. */
1240 pq_sendint32(&reply_message, xmin_epoch);
1241 pq_sendint32(&reply_message, catalog_xmin);
1242 pq_sendint32(&reply_message, catalog_xmin_epoch);
1244 if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1245 primary_has_standby_xmin = true;
1246 else
1247 primary_has_standby_xmin = false;
1248}
uint32_t uint32
Definition: c.h:502
uint32 TransactionId
Definition: c.h:623
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:2047
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
#define InvalidTransactionId
Definition: transam.h:31
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define TransactionIdIsValid(xid)
Definition: transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:457
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReadNextFullTransactionId(), reply_message, resetStringInfo(), TransactionIdIsValid, wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_HSFEEDBACK, WalRcvComputeNextWakeup(), wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1092 of file walreceiver.c.

1093{
1094 static XLogRecPtr writePtr = 0;
1095 static XLogRecPtr flushPtr = 0;
1096 XLogRecPtr applyPtr;
1098
1099 /*
1100 * If the user doesn't want status to be reported to the primary, be sure
1101 * to exit before doing anything at all.
1102 */
1103 if (!force && wal_receiver_status_interval <= 0)
1104 return;
1105
1106 /* Get current timestamp. */
1108
1109 /*
1110 * We can compare the write and flush positions to the last message we
1111 * sent without taking any lock, but the apply position requires a spin
1112 * lock, so we don't check that unless something else has changed or 10
1113 * seconds have passed. This means that the apply WAL location will
1114 * appear, from the primary's point of view, to lag slightly, but since
1115 * this is only for reporting purposes and only on idle systems, that's
1116 * probably OK.
1117 */
1118 if (!force
1119 && writePtr == LogstreamResult.Write
1120 && flushPtr == LogstreamResult.Flush
1122 return;
1123
1124 /* Make sure we wake up when it's time to send another reply. */
1126
1127 /* Construct a new message */
1128 writePtr = LogstreamResult.Write;
1129 flushPtr = LogstreamResult.Flush;
1130 applyPtr = GetXLogReplayRecPtr(NULL);
1131
1134 pq_sendint64(&reply_message, writePtr);
1135 pq_sendint64(&reply_message, flushPtr);
1136 pq_sendint64(&reply_message, applyPtr);
1138 pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1139
1140 /* Send it */
1141 elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1142 LSN_FORMAT_ARGS(writePtr),
1143 LSN_FORMAT_ARGS(flushPtr),
1144 LSN_FORMAT_ARGS(applyPtr),
1145 requestReply ? " (reply requested)" : "");
1146
1148}

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_REPLY, WalRcvComputeNextWakeup(), and wrconn.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 890 of file walreceiver.c.

891{
892 int startoff;
893 int byteswritten;
895
896 Assert(tli != 0);
897
898 while (nbytes > 0)
899 {
900 int segbytes;
901
902 /* Close the current segment if it's completed */
903 if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
904 XLogWalRcvClose(recptr, tli);
905
906 if (recvFile < 0)
907 {
908 /* Create/use new log file */
911 recvFileTLI = tli;
912 }
913
914 /* Calculate the start offset of the received logs */
915 startoff = XLogSegmentOffset(recptr, wal_segment_size);
916
917 if (startoff + nbytes > wal_segment_size)
918 segbytes = wal_segment_size - startoff;
919 else
920 segbytes = nbytes;
921
922 /* OK to write the logs */
923 errno = 0;
924
925 /*
926 * Measure I/O timing to write WAL data, for pg_stat_io.
927 */
929
930 pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
931 byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
933
935 IOOP_WRITE, start, 1, byteswritten);
936
937 if (byteswritten <= 0)
938 {
939 char xlogfname[MAXFNAMELEN];
940 int save_errno;
941
942 /* if write didn't set errno, assume no disk space */
943 if (errno == 0)
944 errno = ENOSPC;
945
946 save_errno = errno;
948 errno = save_errno;
951 errmsg("could not write to WAL segment %s "
952 "at offset %d, length %lu: %m",
953 xlogfname, startoff, (unsigned long) segbytes)));
954 }
955
956 /* Update state for write */
957 recptr += byteswritten;
958
959 nbytes -= byteswritten;
960 buf += byteswritten;
961
962 LogstreamResult.Write = recptr;
963 }
964
965 /* Update shared-memory status */
967
968 /*
969 * Close the current segment if it's fully written up in the last cycle of
970 * the loop, to create its archive notification file soon. Otherwise WAL
971 * archiving of the segment will be delayed until any data in the next
972 * segment is received and written.
973 */
974 if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
975 XLogWalRcvClose(recptr, tli);
976}
return str start
@ IOOBJECT_WAL
Definition: pgstat.h:276
@ IOCONTEXT_NORMAL
Definition: pgstat.h:286
@ IOOP_WRITE
Definition: pgstat.h:313
instr_time pgstat_prepare_io_time(bool track_io_guc)
Definition: pgstat_io.c:90
void pgstat_count_io_op_time(IOObject io_object, IOContext io_context, IOOp io_op, instr_time start_time, uint32 cnt, uint64 bytes)
Definition: pgstat_io.c:121
#define pg_pwrite
Definition: port.h:227
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:1040
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition: xlog.c:3519
bool track_wal_io_timing
Definition: xlog.c:137
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert(), buf, ereport, errcode_for_file_access(), errmsg(), IOCONTEXT_NORMAL, IOOBJECT_WAL, IOOP_WRITE, LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_u64(), pg_pwrite, pgstat_count_io_op_time(), pgstat_prepare_io_time(), pgstat_report_wait_end(), pgstat_report_wait_start(), recvFile, recvFileTLI, recvSegNo, start, track_wal_io_timing, wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName(), XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 112 of file walreceiver.c.

Referenced by XLogWrite().

◆ hot_standby_feedback

bool hot_standby_feedback

◆ LogstreamResult

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 101 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 102 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 103 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

◆ wakeup

◆ wal_receiver_status_interval

int wal_receiver_status_interval

◆ wal_receiver_timeout

int wal_receiver_timeout

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 94 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 111 of file walreceiver.c.

Referenced by XLogWrite().