PostgreSQL Source Code git master
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "catalog/pg_authid.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)
 

Typedefs

typedef enum WalRcvWakeupReason WalRcvWakeupReason
 

Enumerations

enum  WalRcvWakeupReason { WALRCV_WAKEUP_TERMINATE , WALRCV_WAKEUP_PING , WALRCV_WAKEUP_REPLY , WALRCV_WAKEUP_HSFEEDBACK }
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason, TimestampTz now)
 
void WalReceiverMain (const void *startup_data, size_t startup_data_len)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct {
   XLogRecPtr   Write
 
   XLogRecPtr   Flush
 
} LogstreamResult
 
static TimestampTz wakeup [NUM_WALRCV_WAKEUPS]
 
static StringInfoData reply_message
 

Macro Definition Documentation

◆ NUM_WALRCV_WAKEUPS

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)

Definition at line 125 of file walreceiver.c.

Typedef Documentation

◆ WalRcvWakeupReason

typedef enum WalRcvWakeupReason WalRcvWakeupReason

Enumeration Type Documentation

◆ WalRcvWakeupReason

Enumerator
WALRCV_WAKEUP_TERMINATE 
WALRCV_WAKEUP_PING 
WALRCV_WAKEUP_REPLY 
WALRCV_WAKEUP_HSFEEDBACK 

Definition at line 119 of file walreceiver.c.

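120{
121 WALRCV_WAKEUP_TERMINATE,
122 WALRCV_WAKEUP_PING,
123 WALRCV_WAKEUP_REPLY,
124 WALRCV_WAKEUP_HSFEEDBACK,
125#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
126} WalRcvWakeupReason;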
WalRcvWakeupReason
Definition: walreceiver.c:120
@ WALRCV_WAKEUP_TERMINATE
Definition: walreceiver.c:121
@ WALRCV_WAKEUP_REPLY
Definition: walreceiver.c:123
@ WALRCV_WAKEUP_PING
Definition: walreceiver.c:122
@ WALRCV_WAKEUP_HSFEEDBACK
Definition: walreceiver.c:124

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1411 of file walreceiver.c.

1412{
1413 TupleDesc tupdesc;
1414 Datum *values;
1415 bool *nulls;
1416 int pid;
1417 bool ready_to_display;
1418 WalRcvState state;
1419 XLogRecPtr receive_start_lsn;
1420 TimeLineID receive_start_tli;
1421 XLogRecPtr written_lsn;
1422 XLogRecPtr flushed_lsn;
1423 TimeLineID received_tli;
1424 TimestampTz last_send_time;
1425 TimestampTz last_receipt_time;
1426 XLogRecPtr latest_end_lsn;
1427 TimestampTz latest_end_time;
1428 char sender_host[NI_MAXHOST];
1429 int sender_port = 0;
1430 char slotname[NAMEDATALEN];
1431 char conninfo[MAXCONNINFO];
1432
1433 /* Take a lock to ensure value consistency */
1434 SpinLockAcquire(&WalRcv->mutex);
1435 pid = (int) WalRcv->pid;
1436 ready_to_display = WalRcv->ready_to_display;
1437 state = WalRcv->walRcvState;
1438 receive_start_lsn = WalRcv->receiveStart;
1439 receive_start_tli = WalRcv->receiveStartTLI;
1440 flushed_lsn = WalRcv->flushedUpto;
1441 received_tli = WalRcv->receivedTLI;
1442 last_send_time = WalRcv->lastMsgSendTime;
1443 last_receipt_time = WalRcv->lastMsgReceiptTime;
1444 latest_end_lsn = WalRcv->latestWalEnd;
1445 latest_end_time = WalRcv->latestWalEndTime;
1446 strlcpy(slotname, WalRcv->slotname, sizeof(slotname));
1447 strlcpy(sender_host, WalRcv->sender_host, sizeof(sender_host));
1448 sender_port = WalRcv->sender_port;
1449 strlcpy(conninfo, WalRcv->conninfo, sizeof(conninfo));
1450 SpinLockRelease(&WalRcv->mutex);
1451
1452 /*
1453 * No WAL receiver (or not ready yet), just return a tuple with NULL
1454 * values
1455 */
1456 if (pid == 0 || !ready_to_display)
1457 PG_RETURN_NULL();
1458
1459 /*
1460 * Read "writtenUpto" without holding a spinlock. Note that it may not be
1461 * consistent with the other shared variables of the WAL receiver
1462 * protected by a spinlock, but this should not be used for data integrity
1463 * checks.
1464 */
1465 written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1466
1467 /* determine result type */
1468 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1469 elog(ERROR, "return type must be a row type");
1470
1471 values = palloc0_array(Datum, tupdesc->natts);
1472 nulls = palloc0_array(bool, tupdesc->natts);
1473
1474 /* Fetch values */
1475 values[0] = Int32GetDatum(pid);
1476
1477 if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1478 {
1479 /*
1480 * Only superusers and roles with privileges of pg_read_all_stats can
1481 * see details. Other users only get the pid value to know whether it
1482 * is a WAL receiver, but no details.
1483 */
1484 memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1485 }
1486 else
1487 {
1488 values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1489
1490 if (!XLogRecPtrIsValid(receive_start_lsn))
1491 nulls[2] = true;
1492 else
1493 values[2] = LSNGetDatum(receive_start_lsn);
1494 values[3] = Int32GetDatum(receive_start_tli);
1495 if (!XLogRecPtrIsValid(written_lsn))
1496 nulls[4] = true;
1497 else
1498 values[4] = LSNGetDatum(written_lsn);
1499 if (!XLogRecPtrIsValid(flushed_lsn))
1500 nulls[5] = true;
1501 else
1502 values[5] = LSNGetDatum(flushed_lsn);
1503 values[6] = Int32GetDatum(received_tli);
1504 if (last_send_time == 0)
1505 nulls[7] = true;
1506 else
1507 values[7] = TimestampTzGetDatum(last_send_time);
1508 if (last_receipt_time == 0)
1509 nulls[8] = true;
1510 else
1511 values[8] = TimestampTzGetDatum(last_receipt_time);
1512 if (!XLogRecPtrIsValid(latest_end_lsn))
1513 nulls[9] = true;
1514 else
1515 values[9] = LSNGetDatum(latest_end_lsn);
1516 if (latest_end_time == 0)
1517 nulls[10] = true;
1518 else
1519 values[10] = TimestampTzGetDatum(latest_end_time);
1520 if (*slotname == '\0')
1521 nulls[11] = true;
1522 else
1523 values[11] = CStringGetTextDatum(slotname);
1524 if (*sender_host == '\0')
1525 nulls[12] = true;
1526 else
1527 values[12] = CStringGetTextDatum(sender_host);
1528 if (sender_port == 0)
1529 nulls[13] = true;
1530 else
1531 values[13] = Int32GetDatum(sender_port);
1532 if (*conninfo == '\0')
1533 nulls[14] = true;
1534 else
1535 values[14] = CStringGetTextDatum(conninfo);
1536 }
1537
1538 /* Returns the record as Datum */
1539 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1540}
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5284
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:467
static Datum values[MAXATTR]
Definition: bootstrap.c:155
#define CStringGetTextDatum(s)
Definition: builtins.h:97
int64 TimestampTz
Definition: timestamp.h:39
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define palloc0_array(type, count)
Definition: fe_memutils.h:77
#define PG_RETURN_NULL()
Definition: fmgr.h:346
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:354
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition: funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition: funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
Oid GetUserId(void)
Definition: miscinit.c:469
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:31
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uint64_t Datum
Definition: postgres.h:70
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:222
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:111
XLogRecPtr latestWalEnd
Definition: walreceiver.h:116
TimeLineID receiveStartTLI
Definition: walreceiver.h:87
TimeLineID receivedTLI
Definition: walreceiver.h:97
char slotname[NAMEDATALEN]
Definition: walreceiver.h:136
char sender_host[NI_MAXHOST]
Definition: walreceiver.h:129
pid_t pid
Definition: walreceiver.h:68
XLogRecPtr receiveStart
Definition: walreceiver.h:86
XLogRecPtr flushedUpto
Definition: walreceiver.h:96
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:155
TimestampTz lastMsgSendTime
Definition: walreceiver.h:110
WalRcvState walRcvState
Definition: walreceiver.h:71
TimestampTz latestWalEndTime
Definition: walreceiver.h:117
bool ready_to_display
Definition: walreceiver.h:145
int sender_port
Definition: walreceiver.h:130
slock_t mutex
Definition: walreceiver.h:147
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:123
Definition: regguts.h:323
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static const char * WalRcvGetStateString(WalRcvState state)
Definition: walreceiver.c:1386
#define MAXCONNINFO
Definition: walreceiver.h:37
WalRcvState
Definition: walreceiver.h:46
WalRcvData * WalRcv
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:63

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum(), Int32GetDatum(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum(), MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, palloc0_array, pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum(), TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsValid.

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1275 of file walreceiver.c.

1276{
1277 WalRcvData *walrcv = WalRcv;
1278 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1279
1280 /* Update shared-memory status */
1281 SpinLockAcquire(&walrcv->mutex);
1282 if (walrcv->latestWalEnd < walEnd)
1283 walrcv->latestWalEndTime = sendTime;
1284 walrcv->latestWalEnd = walEnd;
1285 walrcv->lastMsgSendTime = sendTime;
1286 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1287 SpinLockRelease(&walrcv->mutex);
1288
1289 if (message_level_is_interesting(DEBUG2))
1290 {
1291 char *sendtime;
1292 char *receipttime;
1293 int applyDelay;
1294
1295 /* Copy because timestamptz_to_str returns a static buffer */
1296 sendtime = pstrdup(timestamptz_to_str(sendTime));
1297 receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1298 applyDelay = GetReplicationApplyDelay();
1299
1300 /* apply delay is not available */
1301 if (applyDelay == -1)
1302 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1303 sendtime,
1304 receipttime,
1305 GetReplicationTransferLatency());
1306 else
1307 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1308 sendtime,
1309 receipttime,
1310 applyDelay,
1311 GetReplicationTransferLatency());
1312
1313 pfree(sendtime);
1314 pfree(receipttime);
1315 }
1316}
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1862
bool message_level_is_interesting(int elevel)
Definition: elog.c:273
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:1781
void pfree(void *pointer)
Definition: mcxt.c:1616
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, message_level_is_interesting(), WalRcvData::mutex, pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvComputeNextWakeup()

static void WalRcvComputeNextWakeup ( WalRcvWakeupReason  reason,
TimestampTz  now 
)
static

Definition at line 1327 of file walreceiver.c.

1328{
1329 switch (reason)
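1330 {
1331 case WALRCV_WAKEUP_TERMINATE:
1332 if (wal_receiver_timeout <= 0)
1333 wakeup[reason] = TIMESTAMP_INFINITY;
1334 else
1335 wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
1336 break;
1337 case WALRCV_WAKEUP_PING:
1338 if (wal_receiver_timeout <= 0)
1339 wakeup[reason] = TIMESTAMP_INFINITY;
1340 else
1341 wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
1342 break;
1343 case WALRCV_WAKEUP_HSFEEDBACK:
1344 if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
1345 wakeup[reason] = TIMESTAMP_INFINITY;
1346 else
1347 wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1348 break;
1349 case WALRCV_WAKEUP_REPLY:
1350 if (wal_receiver_status_interval <= 0)
1351 wakeup[reason] = TIMESTAMP_INFINITY;
1352 else
1353 wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1354 break;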
1355 /* there's intentionally no default: here */
1356 }
1357}
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
#define TIMESTAMP_INFINITY
Definition: timestamp.h:151
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define TimestampTzPlusSeconds(tz, s)
Definition: timestamp.h:86
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
Definition: walreceiver.c:131
bool hot_standby_feedback
Definition: walreceiver.c:91
int wal_receiver_status_interval
Definition: walreceiver.c:89
int wal_receiver_timeout
Definition: walreceiver.c:90

References hot_standby_feedback, now(), TIMESTAMP_INFINITY, TimestampTzPlusMilliseconds, TimestampTzPlusSeconds, wakeup, wal_receiver_status_interval, wal_receiver_timeout, WALRCV_WAKEUP_HSFEEDBACK, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, and WALRCV_WAKEUP_TERMINATE.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 782 of file walreceiver.c.

783{
784 WalRcvData *walrcv = WalRcv;
785 TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
786
787 Assert(*startpointTLI_p != 0);
788
789 /* Ensure that all WAL records received are flushed to disk */
790 XLogWalRcvFlush(true, *startpointTLI_p);
791
792 /* Mark ourselves inactive in shared memory */
793 SpinLockAcquire(&walrcv->mutex);
794 Assert(walrcv->walRcvState == WALRCV_STREAMING ||
795 walrcv->walRcvState == WALRCV_RESTARTING ||
796 walrcv->walRcvState == WALRCV_STARTING ||
797 walrcv->walRcvState == WALRCV_WAITING ||
798 walrcv->walRcvState == WALRCV_STOPPING);
799 Assert(walrcv->pid == MyProcPid);
800 walrcv->walRcvState = WALRCV_STOPPED;
801 walrcv->pid = 0;
802 walrcv->procno = INVALID_PROC_NUMBER;
803 walrcv->ready_to_display = false;
804 SpinLockRelease(&walrcv->mutex);
805
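806 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
807
808 /* Terminate the connection gracefully. */
809 if (wrconn != NULL)
810 walrcv_disconnect(wrconn);
811
812 /* Wake up the startup process to notice promptly that we're gone */
813 WakeupRecovery();
814}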
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition: globals.c:47
Assert(PointerIsAligned(start, uint64))
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:342
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
ProcNumber procno
Definition: walreceiver.h:67
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:72
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
Definition: walreceiver.c:994
@ WALRCV_STARTING
Definition: walreceiver.h:48
@ WALRCV_STOPPED
Definition: walreceiver.h:47
@ WALRCV_RESTARTING
Definition: walreceiver.h:52
@ WALRCV_STREAMING
Definition: walreceiver.h:50
@ WALRCV_WAITING
Definition: walreceiver.h:51
@ WALRCV_STOPPING
Definition: walreceiver.h:53
#define walrcv_disconnect(conn)
Definition: walreceiver.h:467
void WakeupRecovery(void)

References arg, Assert(), ConditionVariableBroadcast(), DatumGetPointer(), INVALID_PROC_NUMBER, WalRcvData::mutex, MyProcPid, WalRcvData::pid, WalRcvData::procno, WalRcvData::ready_to_display, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 726 of file walreceiver.c.

727{
728 TimeLineID tli;
729
730 for (tli = first; tli <= last; tli++)
731 {
732 /* there's no history file for timeline 1 */
733 if (tli != 1 && !existsTimeLineHistory(tli))
734 {
735 char *fname;
736 char *content;
737 int len;
738 char expectedfname[MAXFNAMELEN];
739
740 ereport(LOG,
741 (errmsg("fetching timeline history file for timeline %u from primary server",
742 tli)));
743
744 walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
745
746 /*
747 * Check that the filename on the primary matches what we
748 * calculated ourselves. This is just a sanity check, it should
749 * always match.
750 */
751 TLHistoryFileName(expectedfname, tli);
752 if (strcmp(fname, expectedfname) != 0)
753 ereport(ERROR,
754 (errcode(ERRCODE_PROTOCOL_VIOLATION),
755 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
756 tli)));
757
758 /*
759 * Write the file to pg_wal.
760 */
761 writeTimeLineHistoryFile(tli, content, len);
762
763 /*
764 * Mark the streamed history file as ready for archiving if
765 * archive_mode is always.
766 */
767 if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
768 XLogArchiveForceDone(fname);
769 else
770 XLogArchiveNotify(fname);
771
772 pfree(fname);
773 pfree(content);
774 }
775 }
776}
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition: timeline.c:463
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition: timeline.c:222
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
#define ereport(elevel,...)
Definition: elog.h:150
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
Definition: walreceiver.h:449
int XLogArchiveMode
Definition: xlog.c:122
@ ARCHIVE_MODE_ALWAYS
Definition: xlog.h:68
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
Definition: xlogarchive.c:510
void XLogArchiveNotify(const char *xlog)
Definition: xlogarchive.c:444

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName(), walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1368 of file walreceiver.c.

1369{
1370 ProcNumber procno;
1371
1372 WalRcv->force_reply = true;
1373 /* fetching the proc number is probably atomic, but don't rely on it */
1374 SpinLockAcquire(&WalRcv->mutex);
1375 procno = WalRcv->procno;
1376 SpinLockRelease(&WalRcv->mutex);
1377 if (procno != INVALID_PROC_NUMBER)
1378 SetLatch(&GetPGProcByNumber(procno)->procLatch);
1379}
void SetLatch(Latch *latch)
Definition: latch.c:290
#define GetPGProcByNumber(n)
Definition: proc.h:440
int ProcNumber
Definition: procnumber.h:24
sig_atomic_t force_reply
Definition: walreceiver.h:162

References WalRcvData::force_reply, GetPGProcByNumber, INVALID_PROC_NUMBER, WalRcvData::mutex, WalRcvData::procno, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvGetStateString()

static const char * WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1386 of file walreceiver.c.

1387{
1388 switch (state)
1389 {
1390 case WALRCV_STOPPED:
1391 return "stopped";
1392 case WALRCV_STARTING:
1393 return "starting";
1394 case WALRCV_STREAMING:
1395 return "streaming";
1396 case WALRCV_WAITING:
1397 return "waiting";
1398 case WALRCV_RESTARTING:
1399 return "restarting";
1400 case WALRCV_STOPPING:
1401 return "stopping";
1402 }
1403 return "UNKNOWN";
1404}

References WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr * startpoint,
TimeLineID * startpointTLI 
)
static

Definition at line 646 of file walreceiver.c.

647{
648 WalRcvData *walrcv = WalRcv;
649 int state;
650
651 SpinLockAcquire(&walrcv->mutex);
652 state = walrcv->walRcvState;
653 if (state != WALRCV_STREAMING)
654 {
655 SpinLockRelease(&walrcv->mutex);
656 if (state == WALRCV_STOPPING)
657 proc_exit(0);
658 else
659 elog(FATAL, "unexpected walreceiver state");
660 }
661 walrcv->walRcvState = WALRCV_WAITING;
662 walrcv->receiveStart = InvalidXLogRecPtr;
663 walrcv->receiveStartTLI = 0;
664 SpinLockRelease(&walrcv->mutex);
665
666 set_ps_display("idle");
667
668 /*
669 * nudge startup process to notice that we've stopped streaming and are
670 * now waiting for instructions.
671 */
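672 WakeupRecovery();
673 for (;;)
674 {
675 ResetLatch(MyLatch);
676
677 CHECK_FOR_INTERRUPTS();
678
679 SpinLockAcquire(&walrcv->mutex);
680 Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
681 walrcv->walRcvState == WALRCV_WAITING ||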
682 walrcv->walRcvState == WALRCV_STOPPING);
683 if (walrcv->walRcvState == WALRCV_RESTARTING)
684 {
685 /*
686 * No need to handle changes in primary_conninfo or
687 * primary_slot_name here. Startup process will signal us to
688 * terminate in case those change.
689 */
690 *startpoint = walrcv->receiveStart;
691 *startpointTLI = walrcv->receiveStartTLI;
692 walrcv->walRcvState = WALRCV_STREAMING;
693 SpinLockRelease(&walrcv->mutex);
694 break;
695 }
696 if (walrcv->walRcvState == WALRCV_STOPPING)
697 {
698 /*
699 * We should've received SIGTERM if the startup process wants us
700 * to die, but might as well check it here too.
701 */
702 SpinLockRelease(&walrcv->mutex);
703 exit(1);
704 }
705 SpinLockRelease(&walrcv->mutex);
706
707 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
708 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
709 }
710
711 if (update_process_title)
712 {
713 char activitymsg[50];
714
715 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%08X",
716 LSN_FORMAT_ARGS(*startpoint));
717 set_ps_display(activitymsg);
718 }
719}
#define FATAL
Definition: elog.h:41
struct Latch * MyLatch
Definition: globals.c:63
void proc_exit(int code)
Definition: ipc.c:104
void ResetLatch(Latch *latch)
Definition: latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
#define snprintf
Definition: port.h:260
bool update_process_title
Definition: ps_status.c:31
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), CHECK_FOR_INTERRUPTS, elog, FATAL, InvalidXLogRecPtr, LSN_FORMAT_ARGS, WalRcvData::mutex, MyLatch, proc_exit(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( const void *  startup_data,
size_t  startup_data_len 
)

Definition at line 153 of file walreceiver.c.

154{
155 char conninfo[MAXCONNINFO];
156 char *tmp_conninfo;
157 char slotname[NAMEDATALEN];
158 bool is_temp_slot;
159 XLogRecPtr startpoint;
160 TimeLineID startpointTLI;
161 TimeLineID primaryTLI;
162 bool first_stream;
163 WalRcvData *walrcv;
165 char *err;
166 char *sender_host = NULL;
167 int sender_port = 0;
168 char *appname;
169
170 Assert(startup_data_len == 0);
171
174
175 /*
176 * WalRcv should be set up already (if we are a backend, we inherit this
177 * by fork() or EXEC_BACKEND mechanism from the postmaster).
178 */
179 walrcv = WalRcv;
180 Assert(walrcv != NULL);
181
182 /*
183 * Mark walreceiver as running in shared memory.
184 *
185 * Do this as early as possible, so that if we fail later on, we'll set
186 * state to STOPPED. If we die before this, the startup process will keep
187 * waiting for us to start up, until it times out.
188 */
189 SpinLockAcquire(&walrcv->mutex);
190 Assert(walrcv->pid == 0);
191 switch (walrcv->walRcvState)
192 {
193 case WALRCV_STOPPING:
194 /* If we've already been requested to stop, don't start up. */
195 walrcv->walRcvState = WALRCV_STOPPED;
196 /* fall through */
197
198 case WALRCV_STOPPED:
199 SpinLockRelease(&walrcv->mutex);
201 proc_exit(1);
202 break;
203
204 case WALRCV_STARTING:
205 /* The usual case */
206 break;
207
208 case WALRCV_WAITING:
209 case WALRCV_STREAMING:
211 default:
212 /* Shouldn't happen */
213 SpinLockRelease(&walrcv->mutex);
214 elog(PANIC, "walreceiver still running according to shared memory state");
215 }
216 /* Advertise our PID so that the startup process can kill us */
217 walrcv->pid = MyProcPid;
219
220 /* Fetch information required to start streaming */
221 walrcv->ready_to_display = false;
222 strlcpy(conninfo, walrcv->conninfo, MAXCONNINFO);
223 strlcpy(slotname, walrcv->slotname, NAMEDATALEN);
224 is_temp_slot = walrcv->is_temp_slot;
225 startpoint = walrcv->receiveStart;
226 startpointTLI = walrcv->receiveStartTLI;
227
228 /*
229 * At most one of is_temp_slot and slotname can be set; otherwise,
230 * RequestXLogStreaming messed up.
231 */
232 Assert(!is_temp_slot || (slotname[0] == '\0'));
233
234 /* Initialise to a sanish value */
236 walrcv->lastMsgSendTime =
237 walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
238
239 /* Report our proc number so that others can wake us up */
240 walrcv->procno = MyProcNumber;
241
242 SpinLockRelease(&walrcv->mutex);
243
245
246 /* Arrange to clean up at walreceiver exit */
247 on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
248
249 /* Properly accept or ignore signals the postmaster might send us */
250 pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
251 * file */
252 pqsignal(SIGINT, SIG_IGN);
253 pqsignal(SIGTERM, die); /* request shutdown */
254 /* SIGQUIT handler was already set up by InitPostmasterChild */
255 pqsignal(SIGALRM, SIG_IGN);
256 pqsignal(SIGPIPE, SIG_IGN);
258 pqsignal(SIGUSR2, SIG_IGN);
259
260 /* Reset some signals that are accepted by postmaster but not here */
261 pqsignal(SIGCHLD, SIG_DFL);
262
263 /* Load the libpq-specific functions */
264 load_file("libpqwalreceiver", false);
265 if (WalReceiverFunctions == NULL)
266 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
267
268 /* Unblock signals (they were blocked when the postmaster forked us) */
269 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
270
271 /* Establish the connection to the primary for XLOG streaming */
272 appname = cluster_name[0] ? cluster_name : "walreceiver";
273 wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
274 if (!wrconn)
276 (errcode(ERRCODE_CONNECTION_FAILURE),
277 errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
278 appname, err)));
279
280 /*
281 * Save user-visible connection string. This clobbers the original
282 * conninfo, for security. Also save host and port of the sender server
283 * this walreceiver is connected to.
284 */
285 tmp_conninfo = walrcv_get_conninfo(wrconn);
286 walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
287 SpinLockAcquire(&walrcv->mutex);
288 memset(walrcv->conninfo, 0, MAXCONNINFO);
289 if (tmp_conninfo)
290 strlcpy(walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
291
292 memset(walrcv->sender_host, 0, NI_MAXHOST);
293 if (sender_host)
294 strlcpy(walrcv->sender_host, sender_host, NI_MAXHOST);
295
296 walrcv->sender_port = sender_port;
297 walrcv->ready_to_display = true;
298 SpinLockRelease(&walrcv->mutex);
299
300 if (tmp_conninfo)
301 pfree(tmp_conninfo);
302
303 if (sender_host)
304 pfree(sender_host);
305
306 first_stream = true;
307 for (;;)
308 {
309 char *primary_sysid;
310 char standby_sysid[32];
312
313 /*
314 * Check that we're connected to a valid server using the
315 * IDENTIFY_SYSTEM replication command.
316 */
317 primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
318
319 snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
321 if (strcmp(primary_sysid, standby_sysid) != 0)
322 {
324 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
325 errmsg("database system identifier differs between the primary and standby"),
326 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
327 primary_sysid, standby_sysid)));
328 }
329
330 /*
331 * Confirm that the current timeline of the primary is the same or
332 * ahead of ours.
333 */
334 if (primaryTLI < startpointTLI)
336 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
337 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
338 primaryTLI, startpointTLI)));
339
340 /*
341 * Get any missing history files. We do this always, even when we're
342 * not interested in that timeline, so that if we're promoted to
343 * become the primary later on, we don't select the same timeline that
344 * was already used in the current primary. This isn't bullet-proof -
345 * you'll need some external software to manage your cluster if you
346 * need to ensure that a unique timeline id is chosen in every case,
347 * but let's avoid the confusion of timeline id collisions where we
348 * can.
349 */
350 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
351
352 /*
353 * Create temporary replication slot if requested, and update slot
354 * name in shared memory. (Note the slot name cannot already be set
355 * in this case.)
356 */
357 if (is_temp_slot)
358 {
359 snprintf(slotname, sizeof(slotname),
360 "pg_walreceiver_%lld",
361 (long long int) walrcv_get_backend_pid(wrconn));
362
363 walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
364
365 SpinLockAcquire(&walrcv->mutex);
366 strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
367 SpinLockRelease(&walrcv->mutex);
368 }
369
370 /*
371 * Start streaming.
372 *
373 * We'll try to start at the requested starting point and timeline,
374 * even if it's different from the server's latest timeline. In case
375 * we've already reached the end of the old timeline, the server will
376 * finish the streaming immediately, and we will go back to await
377 * orders from the startup process. If recovery_target_timeline is
378 * 'latest', the startup process will scan pg_wal and find the new
379 * history file, bump recovery target timeline, and ask us to restart
380 * on the new timeline.
381 */
382 options.logical = false;
383 options.startpoint = startpoint;
384 options.slotname = slotname[0] != '\0' ? slotname : NULL;
385 options.proto.physical.startpointTLI = startpointTLI;
387 {
388 if (first_stream)
389 ereport(LOG,
390 errmsg("started streaming WAL from primary at %X/%08X on timeline %u",
391 LSN_FORMAT_ARGS(startpoint), startpointTLI));
392 else
393 ereport(LOG,
394 errmsg("restarted WAL streaming at %X/%08X on timeline %u",
395 LSN_FORMAT_ARGS(startpoint), startpointTLI));
396 first_stream = false;
397
398 /* Initialize LogstreamResult and buffers for processing messages */
401
402 /* Initialize nap wakeup times. */
404 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
406
407 /* Send initial reply/feedback messages. */
408 XLogWalRcvSendReply(true, false);
410
411 /* Loop until end-of-streaming or error */
412 for (;;)
413 {
414 char *buf;
415 int len;
416 bool endofwal = false;
417 pgsocket wait_fd = PGINVALID_SOCKET;
418 int rc;
419 TimestampTz nextWakeup;
420 long nap;
421
422 /*
423 * Exit walreceiver if we're not in recovery. This should not
424 * happen, but cross-check the status here.
425 */
426 if (!RecoveryInProgress())
428 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
429 errmsg("cannot continue WAL streaming, recovery has already ended")));
430
431 /* Process any requests or signals received recently */
433
435 {
436 ConfigReloadPending = false;
438 /* recompute wakeup times */
440 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
443 }
444
445 /* See if we can read data immediately */
446 len = walrcv_receive(wrconn, &buf, &wait_fd);
447 if (len != 0)
448 {
449 /*
450 * Process the received data, and any subsequent data we
451 * can read without blocking.
452 */
453 for (;;)
454 {
455 if (len > 0)
456 {
457 /*
458 * Something was received from primary, so adjust
459 * the ping and terminate wakeup times.
460 */
463 now);
465 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
466 startpointTLI);
467 }
468 else if (len == 0)
469 break;
470 else if (len < 0)
471 {
472 ereport(LOG,
473 (errmsg("replication terminated by primary server"),
474 errdetail("End of WAL reached on timeline %u at %X/%08X.",
475 startpointTLI,
477 endofwal = true;
478 break;
479 }
480 len = walrcv_receive(wrconn, &buf, &wait_fd);
481 }
482
483 /* Let the primary know that we received some data. */
484 XLogWalRcvSendReply(false, false);
485
486 /*
487 * If we've written some records, flush them to disk and
488 * let the startup process and primary server know about
489 * them.
490 */
491 XLogWalRcvFlush(false, startpointTLI);
492 }
493
494 /* Check if we need to exit the streaming loop. */
495 if (endofwal)
496 break;
497
498 /* Find the soonest wakeup time, to limit our nap. */
499 nextWakeup = TIMESTAMP_INFINITY;
500 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
501 nextWakeup = Min(wakeup[i], nextWakeup);
502
503 /* Calculate the nap time, clamping as necessary. */
505 nap = TimestampDifferenceMilliseconds(now, nextWakeup);
506
507 /*
508 * Ideally we would reuse a WaitEventSet object repeatedly
509 * here to avoid the overheads of WaitLatchOrSocket on epoll
510 * systems, but we can't be sure that libpq (or any other
511 * walreceiver implementation) has the same socket (even if
512 * the fd is the same number, it may have been closed and
513 * reopened since the last time). In future, if there is a
514 * function for removing sockets from WaitEventSet, then we
515 * could add and remove just the socket each time, potentially
516 * avoiding some system calls.
517 */
518 Assert(wait_fd != PGINVALID_SOCKET);
522 wait_fd,
523 nap,
524 WAIT_EVENT_WAL_RECEIVER_MAIN);
525 if (rc & WL_LATCH_SET)
526 {
529
530 if (walrcv->force_reply)
531 {
532 /*
533 * The recovery process has asked us to send apply
534 * feedback now. Make sure the flag is really set to
535 * false in shared memory before sending the reply, so
536 * we don't miss a new request for a reply.
537 */
538 walrcv->force_reply = false;
540 XLogWalRcvSendReply(true, false);
541 }
542 }
543 if (rc & WL_TIMEOUT)
544 {
545 /*
546 * We didn't receive anything new. If we haven't heard
547 * anything from the server for more than
548 * wal_receiver_timeout / 2, ping the server. Also, if
549 * it's been longer than wal_receiver_status_interval
550 * since the last update we sent, send a status update to
551 * the primary anyway, to report any progress in applying
552 * WAL.
553 */
554 bool requestReply = false;
555
556 /*
557 * Report pending statistics to the cumulative stats
558 * system. This location is useful for the report as it
559 * is not within a tight loop in the WAL receiver, to
560 * avoid bloating pgstats with requests, while also making
561 * sure that the reports happen each time a status update
562 * is sent.
563 */
564 pgstat_report_wal(false);
565
566 /*
567 * Check if time since last receive from primary has
568 * reached the configured limit.
569 */
573 (errcode(ERRCODE_CONNECTION_FAILURE),
574 errmsg("terminating walreceiver due to timeout")));
575
576 /*
577 * If we didn't receive anything new for half of receiver
578 * replication timeout, then ping the server.
579 */
581 {
582 requestReply = true;
584 }
585
586 XLogWalRcvSendReply(requestReply, requestReply);
588 }
589 }
590
591 /*
592 * The backend finished streaming. Exit streaming COPY-mode from
593 * our side, too.
594 */
595 walrcv_endstreaming(wrconn, &primaryTLI);
596
597 /*
598 * If the server had switched to a new timeline that we didn't
599 * know about when we began streaming, fetch its timeline history
600 * file now.
601 */
602 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
603 }
604 else
605 ereport(LOG,
606 (errmsg("primary server contains no more WAL on requested timeline %u",
607 startpointTLI)));
608
609 /*
610 * End of WAL reached on the requested timeline. Close the last
611 * segment, and wait for new orders from the startup process.
612 */
613 if (recvFile >= 0)
614 {
615 char xlogfname[MAXFNAMELEN];
616
617 XLogWalRcvFlush(false, startpointTLI);
619 if (close(recvFile) != 0)
622 errmsg("could not close WAL segment %s: %m",
623 xlogfname)));
624
625 /*
626 * Create .done file forcibly to prevent the streamed segment from
627 * being archived later.
628 */
630 XLogArchiveForceDone(xlogfname);
631 else
632 XLogArchiveNotify(xlogfname);
633 }
634 recvFile = -1;
635
636 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
637 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
638 }
639 /* not reached */
640}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:485
#define pg_memory_barrier()
Definition: atomics.h:141
void AuxiliaryProcessMainCommon(void)
Definition: auxprocess.c:39
sigset_t UnBlockSig
Definition: pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757
#define Min(x, y)
Definition: c.h:1003
#define UINT64_FORMAT
Definition: c.h:571
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:149
int errcode_for_file_access(void)
Definition: elog.c:886
int errdetail(const char *fmt,...)
Definition: elog.c:1216
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
void err(int eval, const char *fmt,...)
Definition: err.c:43
ProcNumber MyProcNumber
Definition: globals.c:90
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
char * cluster_name
Definition: guc_tables.c:555
#define close(a)
Definition: win32.h:12
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:223
@ B_WAL_RECEIVER
Definition: miscadmin.h:366
BackendType MyBackendType
Definition: miscinit.c:64
static char ** options
static char buf[DEFAULT_XLOG_SEG_SIZE]
Definition: pg_test_fsync.c:71
#define die(msg)
Definition: pg_test_fsync.c:99
void pgstat_report_wal(bool force)
Definition: pgstat_wal.c:46
#define pqsignal
Definition: port.h:551
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:352
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:677
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
bool is_temp_slot
Definition: walreceiver.h:142
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define NUM_WALRCV_WAKEUPS
Definition: walreceiver.c:125
static StringInfoData reply_message
Definition: walreceiver.c:133
static int recvFile
Definition: walreceiver.c:102
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
Definition: walreceiver.c:726
static TimeLineID recvFileTLI
Definition: walreceiver.c:103
WalReceiverFunctionsType * WalReceiverFunctions
Definition: walreceiver.c:95
static XLogSegNo recvSegNo
Definition: walreceiver.c:104
static void XLogWalRcvSendHSFeedback(bool immed)
Definition: walreceiver.c:1179
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
Definition: walreceiver.c:646
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
Definition: walreceiver.c:820
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
Definition: walreceiver.c:1327
static void WalRcvDie(int code, Datum arg)
Definition: walreceiver.c:782
static void XLogWalRcvSendReply(bool force, bool requestReply)
Definition: walreceiver.c:1110
static struct @19 LogstreamResult
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:451
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
Definition: walreceiver.h:441
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:459
#define walrcv_get_conninfo(conn)
Definition: walreceiver.h:439
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:443
#define walrcv_get_backend_pid(conn)
Definition: walreceiver.h:463
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:455
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGALRM
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:171
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4628
bool RecoveryInProgress(void)
Definition: xlog.c:6461
int wal_segment_size
Definition: xlog.c:146
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert(), AuxiliaryProcessMainCommon(), B_WAL_RECEIVER, buf, CHECK_FOR_INTERRUPTS, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, WalRcvData::conninfo, DEBUG1, die, elog, ereport, err(), errcode(), errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, WalRcvData::force_reply, GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), i, initStringInfo(), WalRcvData::is_temp_slot, WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEndTime, len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, Min, WalRcvData::mutex, MyBackendType, MyLatch, MyProcNumber, MyProcPid, NAMEDATALEN, now(), NUM_WALRCV_WAKEUPS, on_shmem_exit(), options, PANIC, pfree(), pg_atomic_write_u64(), pg_memory_barrier, PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_wal(), WalRcvData::pid, PointerGetDatum(), pqsignal, proc_exit(), ProcessConfigFile(), WalRcvData::procno, procsignal_sigusr1_handler(), WalRcvData::ready_to_display, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), WalRcvData::sender_host, WalRcvData::sender_port, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, WalRcvData::slotname, snprintf, SpinLockAcquire, SpinLockRelease, strlcpy(), TIMESTAMP_INFINITY, TimestampDifferenceMilliseconds(), UINT64_FORMAT, UnBlockSig, WaitLatchOrSocket(), wakeup, wal_segment_size, WalRcv, walrcv_connect, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_TERMINATE, WalRcvComputeNextWakeup(), WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, 
WalRcvWaitForStartPosition(), WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1058 of file walreceiver.c.

/*
 * Close the WAL segment file that is currently open in recvFile.
 *
 * Steps visible in this listing: flush what has been written so far with
 * XLogWalRcvFlush(false, tli), close() the descriptor (a failure is
 * reported at PANIC), call either XLogArchiveForceDone() or
 * XLogArchiveNotify() on the segment's file name, and finally reset
 * recvFile to -1 so callers see that no segment is open.
 *
 * tli must be nonzero (asserted).
 *
 * NOTE(review): listing lines 1062, 1071, 1080 and 1088 are absent from
 * this rendering.  They presumably hold the remaining assertion, the code
 * that fills xlogfname, the errcode argument of the ereport, and the
 * condition that selects between the two archive calls -- confirm against
 * the real source file.
 */
1059{
1060 char xlogfname[MAXFNAMELEN];
1061
1063 Assert(tli != 0);
1064
1065 /*
1066 * fsync() and close the current file before we switch to the next one.
1067 * We would otherwise have to reopen this file to fsync it later.
1068 */
1069 XLogWalRcvFlush(false, tli);
1070
1072
1073 /*
1074 * XLOG segment files will be re-read by recovery in startup process soon,
1075 * so we don't advise the OS to release cache pages associated with the
1076 * file like XLogFileClose() does.
1077 */
1078 if (close(recvFile) != 0)
1079 ereport(PANIC,
1081 errmsg("could not close WAL segment %s: %m",
1082 xlogfname)));
1083
1084 /*
1085 * Create .done file forcibly to prevent the streamed segment from being
1086 * archived later.
1087 */
1089 XLogArchiveForceDone(xlogfname);
1090 else
1091 XLogArchiveNotify(xlogfname);
1092
/* Mark that no segment file is open any more. */
1093 recvFile = -1;
1094}
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert(), close, ereport, errcode_for_file_access(), errmsg(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 994 of file walreceiver.c.

/*
 * Bring the flush position up to the write position and publish it.
 *
 * This is a no-op unless LogstreamResult.Flush is behind
 * LogstreamResult.Write.  Otherwise Flush is set to Write and, while
 * holding walrcv->mutex, the shared flushedUpto / latestChunkStart /
 * receivedTLI fields are advanced if the new flush position is ahead of
 * the published one.  Afterwards WalSndWakeup(true, false) is called and,
 * unless 'dying' is true, a status reply is sent to the primary.
 *
 * dying: when true, skip the reply to the primary.
 * tli:   timeline stored into walrcv->receivedTLI; must be nonzero
 *        (asserted).
 *
 * NOTE(review): listing lines 1002, 1022-1023, 1026-1027, 1031, 1036 and
 * 1044 are absent from this rendering.  They presumably hold the actual
 * fsync of recvFile, the waiter wakeup call, the startup-process wakeup,
 * the condition guarding the PS-display update with its LSN argument, and
 * a hot standby feedback call -- confirm against the real source file.
 */
995{
996 Assert(tli != 0);
997
998 if (LogstreamResult.Flush < LogstreamResult.Write)
999 {
1000 WalRcvData *walrcv = WalRcv;
1001
1003
1004 LogstreamResult.Flush = LogstreamResult.Write;
1005
1006 /* Update shared-memory status */
1007 SpinLockAcquire(&walrcv->mutex);
1008 if (walrcv->flushedUpto < LogstreamResult.Flush)
1009 {
/* The previously published flush point becomes the start of the latest chunk. */
1010 walrcv->latestChunkStart = walrcv->flushedUpto;
1011 walrcv->flushedUpto = LogstreamResult.Flush;
1012 walrcv->receivedTLI = tli;
1013 }
1014 SpinLockRelease(&walrcv->mutex);
1015
1016 /*
1017 * If we flushed an LSN that someone was waiting for, notify the
1018 * waiters.
1019 */
1020 if (waitLSNState &&
1021 (LogstreamResult.Flush >=
1024
1025 /* Signal the startup process and walsender that new WAL has arrived */
1028 WalSndWakeup(true, false);
1029
1030 /* Report XLOG streaming progress in PS display */
1032 {
1033 char activitymsg[50];
1034
1035 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
1037 set_ps_display(activitymsg);
1038 }
1039
1040 /* Also let the primary know that we made some progress */
1041 if (!dying)
1042 {
1043 XLogWalRcvSendReply(false, false);
1045 }
1046 }
1047}
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition: xlogwait.h:85
XLogRecPtr latestChunkStart
Definition: walreceiver.h:105
#define AllowCascadeReplication()
Definition: walreceiver.h:40
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3800
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:8856
struct WaitLSNState * waitLSNState
Definition: xlogwait.c:69
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition: xlogwait.c:318
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition: xlogwait.h:41

References AllowCascadeReplication, Assert(), WalRcvData::flushedUpto, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, LSN_FORMAT_ARGS, WaitLSNState::minWaitedLSN, WalRcvData::mutex, pg_atomic_read_u64(), WalRcvData::receivedTLI, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_LSN_TYPE_STANDBY_FLUSH, waitLSNState, WaitLSNWakeup(), WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char *  buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 820 of file walreceiver.c.

/*
 * Process one message received from the primary, dispatching on its type.
 *
 * type: message type code; the caller passes the first byte of the
 *       received buffer.
 * buf:  message contents following the type byte.
 * len:  number of bytes in buf.
 * tli:  timeline passed through to XLogWalRcvWrite() for WAL data.
 *
 * Two message layouts are handled:
 *   - WAL data: a header of three int64 fields (data start position, WAL
 *     end position, send time) followed by the payload.  The payload is
 *     handed to XLogWalRcvWrite() at the data start position.
 *   - Keepalive: exactly two int64 fields (WAL end position, send time)
 *     plus one byte telling whether a reply is requested.
 * In both cases ProcessWalSndrMessage() is called with the WAL end
 * position and send time.  Malformed messages and unknown types are
 * reported with ERRCODE_PROTOCOL_VIOLATION.
 *
 * NOTE(review): listing lines 830, 836, 854, 860 and 880 are absent from
 * this rendering.  They presumably hold the two case labels and the
 * opening of each ereport call (including its error level) -- confirm
 * against the real source file.
 */
821{
822 int hdrlen;
823 XLogRecPtr dataStart;
824 XLogRecPtr walEnd;
825 TimestampTz sendTime;
826 bool replyRequested;
827
828 switch (type)
829 {
831 {
832 StringInfoData incoming_message;
833
/* Fixed header: dataStart, walEnd, sendTime. */
834 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
/* A WAL data message may carry any amount of payload, but never less than the header. */
835 if (len < hdrlen)
837 (errcode(ERRCODE_PROTOCOL_VIOLATION),
838 errmsg_internal("invalid WAL message received from primary")));
839
840 /* initialize a StringInfo with the given buffer */
841 initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
842
843 /* read the fields */
844 dataStart = pq_getmsgint64(&incoming_message);
845 walEnd = pq_getmsgint64(&incoming_message);
846 sendTime = pq_getmsgint64(&incoming_message);
847 ProcessWalSndrMessage(walEnd, sendTime);
848
/* Skip past the header; what remains is the WAL payload. */
849 buf += hdrlen;
850 len -= hdrlen;
851 XLogWalRcvWrite(buf, len, dataStart, tli);
852 break;
853 }
855 {
856 StringInfoData incoming_message;
857
/* Fixed-size message: walEnd, sendTime, replyRequested flag. */
858 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
/* Unlike WAL data, a keepalive has no payload, so the length must match exactly. */
859 if (len != hdrlen)
861 (errcode(ERRCODE_PROTOCOL_VIOLATION),
862 errmsg_internal("invalid keepalive message received from primary")));
863
864 /* initialize a StringInfo with the given buffer */
865 initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
866
867 /* read the fields */
868 walEnd = pq_getmsgint64(&incoming_message);
869 sendTime = pq_getmsgint64(&incoming_message);
870 replyRequested = pq_getmsgbyte(&incoming_message);
871
872 ProcessWalSndrMessage(walEnd, sendTime);
873
874 /* If the primary requested a reply, send one immediately */
875 if (replyRequested)
876 XLogWalRcvSendReply(true, false);
877 break;
878 }
879 default:
881 (errcode(ERRCODE_PROTOCOL_VIOLATION),
882 errmsg_internal("invalid replication message type %d",
883 type)));
884 }
885}
int64_t int64
Definition: c.h:549
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:398
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:452
#define PqReplMsg_WALData
Definition: protocol.h:77
#define PqReplMsg_Keepalive
Definition: protocol.h:75
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:157
const char * type
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
Definition: walreceiver.c:1275
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:891

References buf, ereport, errcode(), errmsg_internal(), ERROR, initReadOnlyStringInfo(), len, pq_getmsgbyte(), pq_getmsgint64(), PqReplMsg_Keepalive, PqReplMsg_WALData, ProcessWalSndrMessage(), type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1179 of file walreceiver.c.

/*
 * Build and send a hot standby feedback message carrying this standby's
 * xmin and catalog_xmin, each paired with an epoch.
 *
 * immed: when true, do not wait for the WALRCV_WAKEUP_HSFEEDBACK wakeup
 *        time to be reached before sending.
 *
 * The function returns without sending when the first guard below fires,
 * when 'immed' is false and the feedback wakeup time has not yet arrived,
 * or when HotStandbyActive() is false.
 *
 * primary_has_standby_xmin is a function-local static, so it persists
 * across calls; it records whether the last message carried a valid xmin
 * or catalog_xmin.
 *
 * NOTE(review): listing lines 1181, 1196, 1201, 1208, 1227, 1254-1257 and
 * 1261 are absent from this rendering.  They presumably hold the 'now'
 * declaration and timestamp fetch, the first half of the early-exit
 * condition, the wakeup recomputation, the condition deciding whether
 * real horizons are fetched, the start of the message construction, and
 * the send call -- confirm against the real source file.
 */
1180{
1182 FullTransactionId nextFullXid;
1183 TransactionId nextXid;
1184 uint32 xmin_epoch,
1185 catalog_xmin_epoch;
1186 TransactionId xmin,
1187 catalog_xmin;
1188
1189 /* initially true so we always send at least one feedback message */
1190 static bool primary_has_standby_xmin = true;
1191
1192 /*
1193 * If the user doesn't want status to be reported to the primary, be sure
1194 * to exit before doing anything at all.
1195 */
1197 !primary_has_standby_xmin)
1198 return;
1199
1200 /* Get current timestamp. */
1202
1203 /* Send feedback at most once per wal_receiver_status_interval. */
1204 if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1205 return;
1206
1207 /* Make sure we wake up when it's time to send feedback again. */
1209
1210 /*
1211 * If Hot Standby is not yet accepting connections there is nothing to
1212 * send. Check this after the interval has expired to reduce number of
1213 * calls.
1214 *
1215 * Bailing out here also ensures that we don't send feedback until we've
1216 * read our own replication slot state, so we don't tell the primary to
1217 * discard needed xmin or catalog_xmin from any slots that may exist on
1218 * this replica.
1219 */
1220 if (!HotStandbyActive())
1221 return;
1222
1223 /*
1224 * Make the expensive call to get the oldest xmin once we are certain
1225 * everything else has been checked.
1226 */
1228 {
1229 GetReplicationHorizons(&xmin, &catalog_xmin);
1230 }
1231 else
1232 {
1233 xmin = InvalidTransactionId;
1234 catalog_xmin = InvalidTransactionId;
1235 }
1236
1237 /*
1238 * Get epoch and adjust if nextXid and oldestXmin are different sides of
1239 * the epoch boundary.
1240 */
1241 nextFullXid = ReadNextFullTransactionId();
1242 nextXid = XidFromFullTransactionId(nextFullXid);
1243 xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1244 catalog_xmin_epoch = xmin_epoch;
/* A horizon numerically above nextXid is attributed to the preceding epoch. */
1245 if (nextXid < xmin)
1246 xmin_epoch--;
1247 if (nextXid < catalog_xmin)
1248 catalog_xmin_epoch--;
1249
1250 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1251 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1252
1253 /* Construct the message and send it. */
1258 pq_sendint32(&reply_message, xmin_epoch);
1259 pq_sendint32(&reply_message, catalog_xmin);
1260 pq_sendint32(&reply_message, catalog_xmin_epoch);
/* Remember whether this message carried any valid horizon, for the early-exit check on later calls. */
1262 if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1263 primary_has_standby_xmin = true;
1264 else
1265 primary_has_standby_xmin = false;
1266}
uint32_t uint32
Definition: c.h:552
uint32 TransactionId
Definition: c.h:672
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition: procarray.c:1997
#define PqReplMsg_HotStandbyFeedback
Definition: protocol.h:82
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
#define InvalidTransactionId
Definition: transam.h:31
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define TransactionIdIsValid(xid)
Definition: transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:457
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), PqReplMsg_HotStandbyFeedback, ReadNextFullTransactionId(), reply_message, resetStringInfo(), TransactionIdIsValid, wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_HSFEEDBACK, WalRcvComputeNextWakeup(), wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1110 of file walreceiver.c.

/*
 * Send a status reply to the primary reporting this standby's write,
 * flush and apply positions.
 *
 * force:        send even if status reporting is disabled
 *               (wal_receiver_status_interval <= 0) and even if the write
 *               and flush positions are unchanged since the last reply.
 * requestReply: encoded as the final byte of the message (1 or 0); when
 *               set, the debug log line is suffixed with
 *               " (reply requested)".
 *
 * writePtr and flushPtr are function-local statics: they keep the
 * positions reported in the previous message so that an unchanged state
 * can be detected without taking a lock.
 *
 * NOTE(review): listing lines 1115, 1125, 1139, 1143, 1150-1151, 1155 and
 * 1165 are absent from this rendering.  They presumably hold the 'now'
 * declaration and timestamp fetch, the time-based part of the skip
 * condition, the wakeup recomputation, the start of the message
 * (reset and type byte), the timestamp field, and the send call --
 * confirm against the real source file.
 */
1111{
1112 static XLogRecPtr writePtr = 0;
1113 static XLogRecPtr flushPtr = 0;
1114 XLogRecPtr applyPtr;
1116
1117 /*
1118 * If the user doesn't want status to be reported to the primary, be sure
1119 * to exit before doing anything at all.
1120 */
1121 if (!force && wal_receiver_status_interval <= 0)
1122 return;
1123
1124 /* Get current timestamp. */
1126
1127 /*
1128 * We can compare the write and flush positions to the last message we
1129 * sent without taking any lock, but the apply position requires a spin
1130 * lock, so we don't check that unless something else has changed or 10
1131 * seconds have passed. This means that the apply WAL location will
1132 * appear, from the primary's point of view, to lag slightly, but since
1133 * this is only for reporting purposes and only on idle systems, that's
1134 * probably OK.
1135 */
1136 if (!force
1137 && writePtr == LogstreamResult.Write
1138 && flushPtr == LogstreamResult.Flush
1140 return;
1141
1142 /* Make sure we wake up when it's time to send another reply. */
1144
1145 /* Construct a new message */
1146 writePtr = LogstreamResult.Write;
1147 flushPtr = LogstreamResult.Flush;
1148 applyPtr = GetXLogReplayRecPtr(NULL);
1149
1152 pq_sendint64(&reply_message, writePtr);
1153 pq_sendint64(&reply_message, flushPtr);
1154 pq_sendint64(&reply_message, applyPtr);
1156 pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1157
1158 /* Send it */
1159 elog(DEBUG2, "sending write %X/%08X flush %X/%08X apply %X/%08X%s",
1160 LSN_FORMAT_ARGS(writePtr),
1161 LSN_FORMAT_ARGS(flushPtr),
1162 LSN_FORMAT_ARGS(applyPtr),
1163 requestReply ? " (reply requested)" : "");
1164
1166}
#define PqReplMsg_StandbyStatusUpdate
Definition: protocol.h:84

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), PqReplMsg_StandbyStatusUpdate, reply_message, resetStringInfo(), wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_REPLY, WalRcvComputeNextWakeup(), and wrconn.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char *  buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 891 of file walreceiver.c.

/*
 * Write nbytes of received WAL from buf to disk, starting at WAL position
 * recptr.
 *
 * buf:    data to write.
 * nbytes: number of bytes in buf.
 * recptr: WAL position of the first byte of buf.
 * tli:    timeline of the data; must be nonzero (asserted).  It is stored
 *         in recvFileTLI when a new segment file is opened and passed to
 *         XLogWalRcvClose() when a segment is closed.
 *
 * The data is written one segment at a time: each loop iteration writes
 * at most up to the end of the segment containing recptr, and the
 * currently open segment is closed first whenever recptr has moved out of
 * it.  LogstreamResult.Write is advanced after every successful write.
 * A write that returns <= 0 is treated as an error, with errno defaulting
 * to ENOSPC if the write did not set it.
 *
 * NOTE(review): listing lines 895, 910-911, 929, 933, 935, 948, 950-951,
 * 967 and 974-975 are absent from this rendering.  They presumably hold
 * the 'start' declaration, the code opening the segment file, the I/O
 * timing start, the wait-event end, the opening of the
 * pgstat_count_io_op_time call, the code filling xlogfname, the opening
 * of the ereport call (including its error level), the shared-memory
 * writtenUpto store, and the waiter wakeup -- confirm against the real
 * source file.
 */
892{
893 int startoff;
894 int byteswritten;
896
897 Assert(tli != 0);
898
899 while (nbytes > 0)
900 {
901 int segbytes;
902
903 /* Close the current segment if it's completed */
904 if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
905 XLogWalRcvClose(recptr, tli);
906
907 if (recvFile < 0)
908 {
909 /* Create/use new log file */
912 recvFileTLI = tli;
913 }
914
915 /* Calculate the start offset of the received logs */
916 startoff = XLogSegmentOffset(recptr, wal_segment_size);
917
/* Never write past the end of the current segment; the rest goes to the next iteration. */
918 if (startoff + nbytes > wal_segment_size)
919 segbytes = wal_segment_size - startoff;
920 else
921 segbytes = nbytes;
922
923 /* OK to write the logs */
/* Clear errno so a failed write that leaves it untouched can be detected below. */
924 errno = 0;
925
926 /*
927 * Measure I/O timing to write WAL data, for pg_stat_io.
928 */
930
931 pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
932 byteswritten = pg_pwrite(recvFile, buf, segbytes, (pgoff_t) startoff);
934
936 IOOP_WRITE, start, 1, byteswritten);
937
938 if (byteswritten <= 0)
939 {
940 char xlogfname[MAXFNAMELEN];
941 int save_errno;
942
943 /* if write didn't set errno, assume no disk space */
944 if (errno == 0)
945 errno = ENOSPC;
946
/* Preserve errno across the intervening code so that %m reports the write failure. */
947 save_errno = errno;
949 errno = save_errno;
952 errmsg("could not write to WAL segment %s "
953 "at offset %d, length %d: %m",
954 xlogfname, startoff, segbytes)));
955 }
956
957 /* Update state for write */
/* Advance by the bytes actually written, which may be fewer than segbytes. */
958 recptr += byteswritten;
959
960 nbytes -= byteswritten;
961 buf += byteswritten;
962
963 LogstreamResult.Write = recptr;
964 }
965
966 /* Update shared-memory status */
968
969 /*
970 * If we wrote an LSN that someone was waiting for, notify the waiters.
971 */
972 if (waitLSNState &&
973 (LogstreamResult.Write >=
976
977 /*
978 * Close the current segment if it's fully written up in the last cycle of
979 * the loop, to create its archive notification file soon. Otherwise WAL
980 * archiving of the segment will be delayed until any data in the next
981 * segment is received and written.
982 */
983 if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
984 XLogWalRcvClose(recptr, tli);
985}
return str start
@ IOOBJECT_WAL
Definition: pgstat.h:279
@ IOCONTEXT_NORMAL
Definition: pgstat.h:289
@ IOOP_WRITE
Definition: pgstat.h:316
instr_time pgstat_prepare_io_time(bool track_io_guc)
Definition: pgstat_io.c:91
void pgstat_count_io_op_time(IOObject io_object, IOContext io_context, IOOp io_op, instr_time start_time, uint32 cnt, uint64 bytes)
Definition: pgstat_io.c:122
#define pg_pwrite
Definition: port.h:248
off_t pgoff_t
Definition: port.h:421
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:69
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
Definition: walreceiver.c:1058
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition: xlog.c:3418
bool track_wal_io_timing
Definition: xlog.c:140
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition: xlogwait.h:40

References Assert(), buf, ereport, errcode_for_file_access(), errmsg(), IOCONTEXT_NORMAL, IOOBJECT_WAL, IOOP_WRITE, LogstreamResult, MAXFNAMELEN, WaitLSNState::minWaitedLSN, PANIC, pg_atomic_read_u64(), pg_atomic_write_u64(), pg_pwrite, pgstat_count_io_op_time(), pgstat_prepare_io_time(), pgstat_report_wait_end(), pgstat_report_wait_start(), recvFile, recvFileTLI, recvSegNo, start, track_wal_io_timing, WAIT_LSN_TYPE_STANDBY_WRITE, waitLSNState, WaitLSNWakeup(), wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName(), XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 113 of file walreceiver.c.

Referenced by XLogWrite().

◆ hot_standby_feedback

bool hot_standby_feedback

◆ 

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 102 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 103 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

◆ wakeup

◆ wal_receiver_status_interval

int wal_receiver_status_interval

◆ wal_receiver_timeout

int wal_receiver_timeout

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 95 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 112 of file walreceiver.c.

Referenced by XLogWrite().