PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "catalog/pg_authid.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)
 

Typedefs

typedef enum WalRcvWakeupReason WalRcvWakeupReason
 

Enumerations

enum  WalRcvWakeupReason { WALRCV_WAKEUP_TERMINATE , WALRCV_WAKEUP_PING , WALRCV_WAKEUP_REPLY , WALRCV_WAKEUP_HSFEEDBACK }
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply, bool checkApply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason, TimestampTz now)
 
void WalReceiverMain (const void *startup_data, size_t startup_data_len)
 
void WalRcvRequestApplyReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct { 
 
   XLogRecPtr   Write 
 
   XLogRecPtr   Flush 
 
} LogstreamResult 
 
static TimestampTz wakeup [NUM_WALRCV_WAKEUPS]
 
static StringInfoData reply_message
 

Macro Definition Documentation

◆ NUM_WALRCV_WAKEUPS

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)

Definition at line 126 of file walreceiver.c.

Typedef Documentation

◆ WalRcvWakeupReason

Enumeration Type Documentation

◆ WalRcvWakeupReason

Enumerator
WALRCV_WAKEUP_TERMINATE 
WALRCV_WAKEUP_PING 
WALRCV_WAKEUP_REPLY 
WALRCV_WAKEUP_HSFEEDBACK 

Definition at line 120 of file walreceiver.c.

121{
126#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
WalRcvWakeupReason
@ WALRCV_WAKEUP_TERMINATE
@ WALRCV_WAKEUP_REPLY
@ WALRCV_WAKEUP_PING
@ WALRCV_WAKEUP_HSFEEDBACK

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1436 of file walreceiver.c.

1437{
1438 TupleDesc tupdesc;
1439 Datum *values;
1440 bool *nulls;
1441 int pid;
1442 bool ready_to_display;
1449 TimestampTz last_send_time;
1453 char sender_host[NI_MAXHOST];
1454 int sender_port = 0;
1455 char slotname[NAMEDATALEN];
1456 char conninfo[MAXCONNINFO];
1457
1458 /* Take a lock to ensure value consistency */
1460 pid = (int) WalRcv->pid;
1461 ready_to_display = WalRcv->ready_to_display;
1467 last_send_time = WalRcv->lastMsgSendTime;
1471 strlcpy(slotname, WalRcv->slotname, sizeof(slotname));
1472 strlcpy(sender_host, WalRcv->sender_host, sizeof(sender_host));
1473 sender_port = WalRcv->sender_port;
1474 strlcpy(conninfo, WalRcv->conninfo, sizeof(conninfo));
1476
1477 /*
1478 * No WAL receiver (or not ready yet), just return a tuple with NULL
1479 * values
1480 */
1481 if (pid == 0 || !ready_to_display)
1483
1484 /*
1485 * Read "writtenUpto" without holding a spinlock. Note that it may not be
1486 * consistent with the other shared variables of the WAL receiver
1487 * protected by a spinlock, but this should not be used for data integrity
1488 * checks.
1489 */
1491
1492 /* determine result type */
1493 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1494 elog(ERROR, "return type must be a row type");
1495
1496 values = palloc0_array(Datum, tupdesc->natts);
1497 nulls = palloc0_array(bool, tupdesc->natts);
1498
1499 /* Fetch values */
1500 values[0] = Int32GetDatum(pid);
1501
1503 {
1504 /*
1505 * Only superusers and roles with privileges of pg_read_all_stats can
1506 * see details. Other users only get the pid value to know whether it
1507 * is a WAL receiver, but no details.
1508 */
1509 memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1510 }
1511 else
1512 {
1514
1516 nulls[2] = true;
1517 else
1521 nulls[4] = true;
1522 else
1525 nulls[5] = true;
1526 else
1529 if (last_send_time == 0)
1530 nulls[7] = true;
1531 else
1532 values[7] = TimestampTzGetDatum(last_send_time);
1533 if (last_receipt_time == 0)
1534 nulls[8] = true;
1535 else
1538 nulls[9] = true;
1539 else
1541 if (latest_end_time == 0)
1542 nulls[10] = true;
1543 else
1545 if (*slotname == '\0')
1546 nulls[11] = true;
1547 else
1548 values[11] = CStringGetTextDatum(slotname);
1549 if (*sender_host == '\0')
1550 nulls[12] = true;
1551 else
1552 values[12] = CStringGetTextDatum(sender_host);
1553 if (sender_port == 0)
1554 nulls[13] = true;
1555 else
1556 values[13] = Int32GetDatum(sender_port);
1557 if (*conninfo == '\0')
1558 nulls[14] = true;
1559 else
1560 values[14] = CStringGetTextDatum(conninfo);
1561 }
1562
1563 /* Returns the record as Datum */
1565}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5314
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
int64 TimestampTz
Definition timestamp.h:39
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define palloc0_array(type, count)
Definition fe_memutils.h:92
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_RETURN_DATUM(x)
Definition fmgr.h:354
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
Oid GetUserId(void)
Definition miscinit.c:470
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
uint64_t Datum
Definition postgres.h:70
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
static int fb(int x)
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
TimestampTz lastMsgReceiptTime
XLogRecPtr latestWalEnd
TimeLineID receiveStartTLI
Definition walreceiver.h:88
TimeLineID receivedTLI
Definition walreceiver.h:98
char slotname[NAMEDATALEN]
char sender_host[NI_MAXHOST]
XLogRecPtr receiveStart
Definition walreceiver.h:87
XLogRecPtr flushedUpto
Definition walreceiver.h:97
pg_atomic_uint64 writtenUpto
TimestampTz lastMsgSendTime
WalRcvState walRcvState
Definition walreceiver.h:72
TimestampTz latestWalEndTime
bool ready_to_display
slock_t mutex
char conninfo[MAXCONNINFO]
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
static const char * WalRcvGetStateString(WalRcvState state)
#define MAXCONNINFO
Definition walreceiver.h:37
WalRcvState
Definition walreceiver.h:46
WalRcvData * WalRcv
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, fb(), WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum(), Int32GetDatum(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum(), MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, palloc0_array, pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire(), SpinLockRelease(), strlcpy(), TimestampTzGetDatum(), TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsValid.

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1298 of file walreceiver.c.

1299{
1301 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1302
1303 /* Update shared-memory status */
1304 SpinLockAcquire(&walrcv->mutex);
1305 if (walrcv->latestWalEnd < walEnd)
1306 walrcv->latestWalEndTime = sendTime;
1307 walrcv->latestWalEnd = walEnd;
1308 walrcv->lastMsgSendTime = sendTime;
1309 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1310 SpinLockRelease(&walrcv->mutex);
1311
1313 {
1314 char *sendtime;
1315 char *receipttime;
1316 int applyDelay;
1317
1318 /* Copy because timestamptz_to_str returns a static buffer */
1320 receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1322
1323 /* apply delay is not available */
1324 if (applyDelay == -1)
1325 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1326 sendtime,
1329 else
1330 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1331 sendtime,
1333 applyDelay,
1335
1336 pfree(sendtime);
1338 }
1339}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1870
bool message_level_is_interesting(int elevel)
Definition elog.c:285
#define DEBUG2
Definition elog.h:30
char * pstrdup(const char *in)
Definition mcxt.c:1910
void pfree(void *pointer)
Definition mcxt.c:1619
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog, fb(), GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), message_level_is_interesting(), pfree(), pstrdup(), SpinLockAcquire(), SpinLockRelease(), timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvComputeNextWakeup()

static void WalRcvComputeNextWakeup ( WalRcvWakeupReason  reason,
TimestampTz  now 
)
static

Definition at line 1350 of file walreceiver.c.

1351{
1352 switch (reason)
1353 {
1355 if (wal_receiver_timeout <= 0)
1356 wakeup[reason] = TIMESTAMP_INFINITY;
1357 else
1359 break;
1360 case WALRCV_WAKEUP_PING:
1361 if (wal_receiver_timeout <= 0)
1362 wakeup[reason] = TIMESTAMP_INFINITY;
1363 else
1365 break;
1368 wakeup[reason] = TIMESTAMP_INFINITY;
1369 else
1371 break;
1374 wakeup[reason] = TIMESTAMP_INFINITY;
1375 else
1377 break;
1378 /* there's intentionally no default: here */
1379 }
1380}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1613
#define TIMESTAMP_INFINITY
Definition timestamp.h:151
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
#define TimestampTzPlusSeconds(tz, s)
Definition timestamp.h:86
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
bool hot_standby_feedback
Definition walreceiver.c:92
int wal_receiver_status_interval
Definition walreceiver.c:90
int wal_receiver_timeout
Definition walreceiver.c:91

References hot_standby_feedback, now(), TIMESTAMP_INFINITY, TimestampTzPlusMilliseconds, TimestampTzPlusSeconds, wakeup, wal_receiver_status_interval, wal_receiver_timeout, WALRCV_WAKEUP_HSFEEDBACK, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, and WALRCV_WAKEUP_TERMINATE.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 795 of file walreceiver.c.

796{
799
800 Assert(*startpointTLI_p != 0);
801
802 /* Ensure that all WAL records received are flushed to disk */
804
805 /* Mark ourselves inactive in shared memory */
806 SpinLockAcquire(&walrcv->mutex);
807 Assert(walrcv->walRcvState == WALRCV_STREAMING ||
808 walrcv->walRcvState == WALRCV_CONNECTING ||
809 walrcv->walRcvState == WALRCV_RESTARTING ||
810 walrcv->walRcvState == WALRCV_STARTING ||
811 walrcv->walRcvState == WALRCV_WAITING ||
812 walrcv->walRcvState == WALRCV_STOPPING);
813 Assert(walrcv->pid == MyProcPid);
814 walrcv->walRcvState = WALRCV_STOPPED;
815 walrcv->pid = 0;
816 walrcv->procno = INVALID_PROC_NUMBER;
817 walrcv->ready_to_display = false;
818 SpinLockRelease(&walrcv->mutex);
819
820 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
821
822 /* Terminate the connection gracefully. */
823 if (wrconn != NULL)
825
826 /* Wake up the startup process to notice promptly that we're gone */
828}
#define Assert(condition)
Definition c.h:943
void ConditionVariableBroadcast(ConditionVariable *cv)
Datum arg
Definition elog.c:1323
int MyProcPid
Definition globals.c:49
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
static WalReceiverConn * wrconn
Definition walreceiver.c:95
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
@ WALRCV_STARTING
Definition walreceiver.h:48
@ WALRCV_STOPPED
Definition walreceiver.h:47
@ WALRCV_CONNECTING
Definition walreceiver.h:50
@ WALRCV_RESTARTING
Definition walreceiver.h:53
@ WALRCV_STREAMING
Definition walreceiver.h:51
@ WALRCV_WAITING
Definition walreceiver.h:52
@ WALRCV_STOPPING
Definition walreceiver.h:54
#define walrcv_disconnect(conn)
void WakeupRecovery(void)

References arg, Assert, ConditionVariableBroadcast(), DatumGetPointer(), fb(), INVALID_PROC_NUMBER, MyProcPid, SpinLockAcquire(), SpinLockRelease(), WakeupRecovery(), WalRcv, WALRCV_CONNECTING, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 739 of file walreceiver.c.

740{
741 TimeLineID tli;
742
743 for (tli = first; tli <= last; tli++)
744 {
745 /* there's no history file for timeline 1 */
746 if (tli != 1 && !existsTimeLineHistory(tli))
747 {
748 char *fname;
749 char *content;
750 int len;
752
753 ereport(LOG,
754 (errmsg("fetching timeline history file for timeline %u from primary server",
755 tli)));
756
757 walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
758
759 /*
760 * Check that the filename on the primary matches what we
761 * calculated ourselves. This is just a sanity check, it should
762 * always match.
763 */
765 if (strcmp(fname, expectedfname) != 0)
768 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
769 tli)));
770
771 /*
772 * Write the file to pg_wal.
773 */
774 writeTimeLineHistoryFile(tli, content, len);
775
776 /*
777 * Mark the streamed history file as ready for archiving if
778 * archive_mode is always.
779 */
782 else
783 XLogArchiveNotify(fname);
784
785 pfree(fname);
786 pfree(content);
787 }
788 }
789}
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition timeline.c:464
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition timeline.c:223
int errcode(int sqlerrcode)
Definition elog.c:875
#define LOG
Definition elog.h:32
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define ereport(elevel,...)
Definition elog.h:152
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
static char * errmsg
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
int XLogArchiveMode
Definition xlog.c:126
@ ARCHIVE_MODE_ALWAYS
Definition xlog.h:69
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
void XLogArchiveNotify(const char *xlog)

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, errmsg_internal(), ERROR, existsTimeLineHistory(), fb(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName(), walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvGetStateString()

static const char * WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1409 of file walreceiver.c.

1410{
1411 switch (state)
1412 {
1413 case WALRCV_STOPPED:
1414 return "stopped";
1415 case WALRCV_STARTING:
1416 return "starting";
1417 case WALRCV_CONNECTING:
1418 return "connecting";
1419 case WALRCV_STREAMING:
1420 return "streaming";
1421 case WALRCV_WAITING:
1422 return "waiting";
1423 case WALRCV_RESTARTING:
1424 return "restarting";
1425 case WALRCV_STOPPING:
1426 return "stopping";
1427 }
1428 return "UNKNOWN";
1429}

References WALRCV_CONNECTING, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvRequestApplyReply()

void WalRcvRequestApplyReply ( void  )

Definition at line 1391 of file walreceiver.c.

1392{
1393 ProcNumber procno;
1394
1396 /* fetching the proc number is probably atomic, but don't rely on it */
1398 procno = WalRcv->procno;
1400 if (procno != INVALID_PROC_NUMBER)
1401 SetLatch(&GetPGProcByNumber(procno)->procLatch);
1402}
void SetLatch(Latch *latch)
Definition latch.c:290
#define GetPGProcByNumber(n)
Definition proc.h:504
int ProcNumber
Definition procnumber.h:24
ProcNumber procno
Definition walreceiver.h:68
sig_atomic_t apply_reply_requested

References WalRcvData::apply_reply_requested, GetPGProcByNumber, INVALID_PROC_NUMBER, WalRcvData::mutex, WalRcvData::procno, SetLatch(), SpinLockAcquire(), SpinLockRelease(), and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr * startpoint,
TimeLineID * startpointTLI 
)
static

Definition at line 659 of file walreceiver.c.

660{
662 int state;
663
664 SpinLockAcquire(&walrcv->mutex);
665 state = walrcv->walRcvState;
667 {
668 SpinLockRelease(&walrcv->mutex);
669 if (state == WALRCV_STOPPING)
670 proc_exit(0);
671 else
672 elog(FATAL, "unexpected walreceiver state");
673 }
674 walrcv->walRcvState = WALRCV_WAITING;
675 walrcv->receiveStart = InvalidXLogRecPtr;
676 walrcv->receiveStartTLI = 0;
677 SpinLockRelease(&walrcv->mutex);
678
679 set_ps_display("idle");
680
681 /*
682 * nudge startup process to notice that we've stopped streaming and are
683 * now waiting for instructions.
684 */
686 for (;;)
687 {
689
691
692 SpinLockAcquire(&walrcv->mutex);
693 Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
694 walrcv->walRcvState == WALRCV_WAITING ||
695 walrcv->walRcvState == WALRCV_STOPPING);
696 if (walrcv->walRcvState == WALRCV_RESTARTING)
697 {
698 /*
699 * No need to handle changes in primary_conninfo or
700 * primary_slot_name here. Startup process will signal us to
701 * terminate in case those change.
702 */
703 *startpoint = walrcv->receiveStart;
704 *startpointTLI = walrcv->receiveStartTLI;
705 walrcv->walRcvState = WALRCV_CONNECTING;
706 SpinLockRelease(&walrcv->mutex);
707 break;
708 }
709 if (walrcv->walRcvState == WALRCV_STOPPING)
710 {
711 /*
712 * We should've received SIGTERM if the startup process wants us
713 * to die, but might as well check it here too.
714 */
715 SpinLockRelease(&walrcv->mutex);
716 proc_exit(1);
717 }
718 SpinLockRelease(&walrcv->mutex);
719
722 }
723
725 {
726 char activitymsg[50];
727
728 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%08X",
729 LSN_FORMAT_ARGS(*startpoint));
731 }
732}
#define FATAL
Definition elog.h:42
struct Latch * MyLatch
Definition globals.c:65
void proc_exit(int code)
Definition ipc.c:105
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
#define snprintf
Definition port.h:261
bool update_process_title
Definition ps_status.c:31
static void set_ps_display(const char *activity)
Definition ps_status.h:40
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References Assert, CHECK_FOR_INTERRUPTS, elog, FATAL, fb(), InvalidXLogRecPtr, LSN_FORMAT_ARGS, MyLatch, proc_exit(), ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire(), SpinLockRelease(), update_process_title, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_CONNECTING, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( const void * startup_data,
size_t  startup_data_len 
)

Definition at line 154 of file walreceiver.c.

155{
156 char conninfo[MAXCONNINFO];
157 char *tmp_conninfo;
158 char slotname[NAMEDATALEN];
159 bool is_temp_slot;
160 XLogRecPtr startpoint;
161 TimeLineID startpointTLI;
163 bool first_stream;
166 char *err;
167 char *sender_host = NULL;
168 int sender_port = 0;
169 char *appname;
170
172
174
175 /*
176 * WalRcv should be set up already (if we are a backend, we inherit this
177 * by fork() or EXEC_BACKEND mechanism from the postmaster).
178 */
179 walrcv = WalRcv;
180 Assert(walrcv != NULL);
181
182 /*
183 * Mark walreceiver as running in shared memory.
184 *
185 * Do this as early as possible, so that if we fail later on, we'll set
186 * state to STOPPED. If we die before this, the startup process will keep
187 * waiting for us to start up, until it times out.
188 */
189 SpinLockAcquire(&walrcv->mutex);
190 Assert(walrcv->pid == 0);
191 switch (walrcv->walRcvState)
192 {
193 case WALRCV_STOPPING:
194 /* If we've already been requested to stop, don't start up. */
195 walrcv->walRcvState = WALRCV_STOPPED;
197
198 case WALRCV_STOPPED:
199 SpinLockRelease(&walrcv->mutex);
200 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
201 proc_exit(1);
202 break;
203
204 case WALRCV_STARTING:
205 /* The usual case */
206 break;
207
209 case WALRCV_WAITING:
210 case WALRCV_STREAMING:
212 default:
213 /* Shouldn't happen */
214 SpinLockRelease(&walrcv->mutex);
215 elog(PANIC, "walreceiver still running according to shared memory state");
216 }
217 /* Advertise our PID so that the startup process can kill us */
218 walrcv->pid = MyProcPid;
219 walrcv->walRcvState = WALRCV_CONNECTING;
220
221 /* Fetch information required to start streaming */
222 walrcv->ready_to_display = false;
223 strlcpy(conninfo, walrcv->conninfo, MAXCONNINFO);
224 strlcpy(slotname, walrcv->slotname, NAMEDATALEN);
225 is_temp_slot = walrcv->is_temp_slot;
226 startpoint = walrcv->receiveStart;
227 startpointTLI = walrcv->receiveStartTLI;
228
229 /*
230 * At most one of is_temp_slot and slotname can be set; otherwise,
231 * RequestXLogStreaming messed up.
232 */
233 Assert(!is_temp_slot || (slotname[0] == '\0'));
234
235 /* Initialise to a sanish value */
237 walrcv->lastMsgSendTime =
238 walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
239
240 /* Report our proc number so that others can wake us up */
241 walrcv->procno = MyProcNumber;
242
243 SpinLockRelease(&walrcv->mutex);
244
245 /* Arrange to clean up at walreceiver exit */
246 on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
247
248 /* Properly accept or ignore signals the postmaster might send us */
249 pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
250 * file */
252 pqsignal(SIGTERM, die); /* request shutdown */
253 /* SIGQUIT handler was already set up by InitPostmasterChild */
258
259 /* Reset some signals that are accepted by postmaster but not here */
261
262 /* Load the libpq-specific functions */
263 load_file("libpqwalreceiver", false);
265 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
266
267 /* Unblock signals (they were blocked when the postmaster forked us) */
269
270 /* Establish the connection to the primary for XLOG streaming */
271 appname = cluster_name[0] ? cluster_name : "walreceiver";
272 wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
273 if (!wrconn)
276 errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
277 appname, err)));
278
279 /*
280 * Save user-visible connection string. This clobbers the original
281 * conninfo, for security. Also save host and port of the sender server
282 * this walreceiver is connected to.
283 */
285 walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
286 SpinLockAcquire(&walrcv->mutex);
287 memset(walrcv->conninfo, 0, MAXCONNINFO);
288 if (tmp_conninfo)
290
291 memset(walrcv->sender_host, 0, NI_MAXHOST);
292 if (sender_host)
293 strlcpy(walrcv->sender_host, sender_host, NI_MAXHOST);
294
295 walrcv->sender_port = sender_port;
296 walrcv->ready_to_display = true;
297 SpinLockRelease(&walrcv->mutex);
298
299 if (tmp_conninfo)
301
302 if (sender_host)
303 pfree(sender_host);
304
305 /* Initialize buffers for processing messages */
307
308 first_stream = true;
309 for (;;)
310 {
311 char *primary_sysid;
312 char standby_sysid[32];
314
315 /*
316 * Check that we're connected to a valid server using the
317 * IDENTIFY_SYSTEM replication command.
318 */
320
324 {
327 errmsg("database system identifier differs between the primary and standby"),
328 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
330 }
332
333 /*
334 * Confirm that the current timeline of the primary is the same or
335 * ahead of ours.
336 */
337 if (primaryTLI < startpointTLI)
340 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
341 primaryTLI, startpointTLI)));
342
343 /*
344 * Get any missing history files. We do this always, even when we're
345 * not interested in that timeline, so that if we're promoted to
346 * become the primary later on, we don't select the same timeline that
347 * was already used in the current primary. This isn't bullet-proof -
348 * you'll need some external software to manage your cluster if you
349 * need to ensure that a unique timeline id is chosen in every case,
350 * but let's avoid the confusion of timeline id collisions where we
351 * can.
352 */
354
355 /*
356 * Create temporary replication slot if requested, and update slot
357 * name in shared memory. (Note the slot name cannot already be set
358 * in this case.)
359 */
360 if (is_temp_slot)
361 {
362 snprintf(slotname, sizeof(slotname),
363 "pg_walreceiver_%lld",
364 (long long int) walrcv_get_backend_pid(wrconn));
365
366 walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
367
368 SpinLockAcquire(&walrcv->mutex);
369 strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
370 SpinLockRelease(&walrcv->mutex);
371 }
372
373 /*
374 * Start streaming.
375 *
376 * We'll try to start at the requested starting point and timeline,
377 * even if it's different from the server's latest timeline. In case
378 * we've already reached the end of the old timeline, the server will
379 * finish the streaming immediately, and we will go back to await
380 * orders from the startup process. If recovery_target_timeline is
381 * 'latest', the startup process will scan pg_wal and find the new
382 * history file, bump recovery target timeline, and ask us to restart
383 * on the new timeline.
384 */
385 options.logical = false;
386 options.startpoint = startpoint;
387 options.slotname = slotname[0] != '\0' ? slotname : NULL;
388 options.proto.physical.startpointTLI = startpointTLI;
390 {
391 if (first_stream)
392 ereport(LOG,
393 errmsg("started streaming WAL from primary at %X/%08X on timeline %u",
394 LSN_FORMAT_ARGS(startpoint), startpointTLI));
395 else
396 ereport(LOG,
397 errmsg("restarted WAL streaming at %X/%08X on timeline %u",
398 LSN_FORMAT_ARGS(startpoint), startpointTLI));
399 first_stream = false;
400
401 /*
402 * Switch to STREAMING after a successful connection if current
403 * state is CONNECTING. This switch happens after an initial
404 * startup, or after a restart as determined by
405 * WalRcvWaitForStartPosition().
406 */
407 SpinLockAcquire(&walrcv->mutex);
408 if (walrcv->walRcvState == WALRCV_CONNECTING)
409 walrcv->walRcvState = WALRCV_STREAMING;
410 SpinLockRelease(&walrcv->mutex);
411
412 /* Initialize LogstreamResult for processing messages */
414
415 /* Initialize nap wakeup times. */
417 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
419
420 /* Send initial reply/feedback messages. */
421 XLogWalRcvSendReply(true, false, false);
423
424 /* Loop until end-of-streaming or error */
425 for (;;)
426 {
427 char *buf;
428 int len;
429 bool endofwal = false;
431 int rc;
433 long nap;
434
435 /*
436 * Exit walreceiver if we're not in recovery. This should not
437 * happen, but cross-check the status here.
438 */
439 if (!RecoveryInProgress())
442 errmsg("cannot continue WAL streaming, recovery has already ended")));
443
444 /* Process any requests or signals received recently */
446
448 {
449 ConfigReloadPending = false;
451 /* recompute wakeup times */
453 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
456 }
457
458 /* See if we can read data immediately */
460 if (len != 0)
461 {
462 /*
463 * Process the received data, and any subsequent data we
464 * can read without blocking.
465 */
466 for (;;)
467 {
468 if (len > 0)
469 {
470 /*
471 * Something was received from primary, so adjust
472 * the ping and terminate wakeup times.
473 */
476 now);
478 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
479 startpointTLI);
480 }
481 else if (len == 0)
482 break;
483 else if (len < 0)
484 {
485 ereport(LOG,
486 (errmsg("replication terminated by primary server"),
487 errdetail("End of WAL reached on timeline %u at %X/%08X.",
488 startpointTLI,
490 endofwal = true;
491 break;
492 }
494 }
495
496 /* Let the primary know that we received some data. */
497 XLogWalRcvSendReply(false, false, false);
498
499 /*
500 * If we've written some records, flush them to disk and
501 * let the startup process and primary server know about
502 * them.
503 */
504 XLogWalRcvFlush(false, startpointTLI);
505 }
506
507 /* Check if we need to exit the streaming loop. */
508 if (endofwal)
509 break;
510
511 /* Find the soonest wakeup time, to limit our nap. */
513 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
515
516 /* Calculate the nap time, clamping as necessary. */
519
520 /*
521 * Ideally we would reuse a WaitEventSet object repeatedly
522 * here to avoid the overheads of WaitLatchOrSocket on epoll
523 * systems, but we can't be sure that libpq (or any other
524 * walreceiver implementation) has the same socket (even if
525 * the fd is the same number, it may have been closed and
526 * reopened since the last time). In future, if there is a
527 * function for removing sockets from WaitEventSet, then we
528 * could add and remove just the socket each time, potentially
529 * avoiding some system calls.
530 */
535 wait_fd,
536 nap,
538 if (rc & WL_LATCH_SET)
539 {
542
543 if (walrcv->apply_reply_requested)
544 {
545 /*
546 * The recovery process has asked us to send apply
547 * feedback now. Make sure the flag is really set to
548 * false in shared memory before sending the reply, so
549 * we don't miss a new request for a reply.
550 */
551 walrcv->apply_reply_requested = false;
553 XLogWalRcvSendReply(false, false, true);
554 }
555 }
556 if (rc & WL_TIMEOUT)
557 {
558 /*
559 * We didn't receive anything new. If we haven't heard
560 * anything from the server for more than
561 * wal_receiver_timeout / 2, ping the server. Also, if
562 * it's been longer than wal_receiver_status_interval
563 * since the last update we sent, send a status update to
564 * the primary anyway, to report any progress in applying
565 * WAL.
566 */
567 bool requestReply = false;
568
569 /*
570 * Report pending statistics to the cumulative stats
571 * system. This location is useful for the report as it
572 * is not within a tight loop in the WAL receiver, to
573 * avoid bloating pgstats with requests, while also making
574 * sure that the reports happen each time a status update
575 * is sent.
576 */
577 pgstat_report_wal(false);
578
579 /*
580 * Check if time since last receive from primary has
581 * reached the configured limit.
582 */
587 errmsg("terminating walreceiver due to timeout")));
588
589 /*
590 * If we didn't receive anything new for half of receiver
591 * replication timeout, then ping the server.
592 */
594 {
595 requestReply = true;
597 }
598
601 }
602 }
603
604 /*
605 * The backend finished streaming. Exit streaming COPY-mode from
606 * our side, too.
607 */
609
610 /*
611 * If the server had switched to a new timeline that we didn't
612 * know about when we began streaming, fetch its timeline history
613 * file now.
614 */
616 }
617 else
618 ereport(LOG,
619 (errmsg("primary server contains no more WAL on requested timeline %u",
620 startpointTLI)));
621
622 /*
623 * End of WAL reached on the requested timeline. Close the last
624 * segment, and await for new orders from the startup process.
625 */
626 if (recvFile >= 0)
627 {
629
630 XLogWalRcvFlush(false, startpointTLI);
632 if (close(recvFile) != 0)
635 errmsg("could not close WAL segment %s: %m",
636 xlogfname)));
637
638 /*
639 * Create .done file forcibly to prevent the streamed segment from
640 * being archived later.
641 */
644 else
646 }
647 recvFile = -1;
648
649 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
650 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
651 }
652 /* not reached */
653}
#define pg_memory_barrier()
Definition atomics.h:141
void AuxiliaryProcessMainCommon(void)
Definition auxprocess.c:41
sigset_t UnBlockSig
Definition pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1765
#define Min(x, y)
Definition c.h:1091
#define UINT64_FORMAT
Definition c.h:635
#define pg_fallthrough
Definition c.h:161
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errcode_for_file_access(void)
Definition elog.c:898
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define PANIC
Definition elog.h:44
#define DEBUG1
Definition elog.h:31
void err(int eval, const char *fmt,...)
Definition err.c:43
ProcNumber MyProcNumber
Definition globals.c:92
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
char * cluster_name
Definition guc_tables.c:582
#define close(a)
Definition win32.h:12
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define die(msg)
void pgstat_report_wal(bool force)
Definition pgstat_wal.c:46
#define pqsignal
Definition port.h:548
#define PG_SIG_IGN
Definition port.h:552
int pgsocket
Definition port.h:29
#define PGINVALID_SOCKET
Definition port.h:31
#define PG_SIG_DFL
Definition port.h:551
#define PointerGetDatum(X)
Definition postgres.h:354
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:688
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define NUM_WALRCV_WAKEUPS
static StringInfoData reply_message
static int recvFile
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
static TimeLineID recvFileTLI
WalReceiverFunctionsType * WalReceiverFunctions
Definition walreceiver.c:96
static void XLogWalRcvSendReply(bool force, bool requestReply, bool checkApply)
static XLogSegNo recvSegNo
static void XLogWalRcvSendHSFeedback(bool immed)
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
static void WalRcvDie(int code, Datum arg)
static struct @19 LogstreamResult
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_get_conninfo(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_get_backend_pid(conn)
#define walrcv_receive(conn, buffer, wait_fd)
#define SIGCHLD
Definition win32_port.h:168
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGALRM
Definition win32_port.h:164
#define SIGUSR2
Definition win32_port.h:171
uint64 GetSystemIdentifier(void)
Definition xlog.c:4643
bool RecoveryInProgress(void)
Definition xlog.c:6832
int wal_segment_size
Definition xlog.c:150
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert, AuxiliaryProcessMainCommon(), buf, CHECK_FOR_INTERRUPTS, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, DEBUG1, die, elog, ereport, err(), errcode(), errcode_for_file_access(), errdetail(), errmsg, ERROR, FATAL, fb(), GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), i, initStringInfo(), len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, Min, MyLatch, MyProcNumber, MyProcPid, NAMEDATALEN, now(), NUM_WALRCV_WAKEUPS, on_shmem_exit(), PANIC, pfree(), pg_fallthrough, pg_memory_barrier, PG_SIG_DFL, PG_SIG_IGN, PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_wal(), PointerGetDatum, pqsignal, proc_exit(), ProcessConfigFile(), procsignal_sigusr1_handler(), RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, snprintf, SpinLockAcquire(), SpinLockRelease(), strlcpy(), TIMESTAMP_INFINITY, TimestampDifferenceMilliseconds(), UINT64_FORMAT, UnBlockSig, WaitLatchOrSocket(), wakeup, wal_segment_size, WalRcv, walrcv_connect, WALRCV_CONNECTING, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_TERMINATE, WalRcvComputeNextWakeup(), WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvWaitForStartPosition(), WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1067 of file walreceiver.c.

1068{
1069 char xlogfname[MAXFNAMELEN];
1070
1072 Assert(tli != 0);
1073
1074 /*
1075 * fsync() and close current file before we switch to next one. We would
1076 * otherwise have to reopen this file to fsync it later.
1077 */
1078 XLogWalRcvFlush(false, tli);
1079
1081
1082 /*
1083 * XLOG segment files will be re-read by recovery in startup process soon,
1084 * so we don't advise the OS to release cache pages associated with the
1085 * file like XLogFileClose() does.
1086 */
1087 if (close(recvFile) != 0)
1088 ereport(PANIC,
1090 errmsg("could not close WAL segment %s: %m",
1091 xlogfname)));
1092
1093 /*
1094 * Create .done file forcibly to prevent the streamed segment from being
1095 * archived later.
1096 */
1099 else
1101
1102 recvFile = -1;
1103}
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert, close, ereport, errcode_for_file_access(), errmsg, fb(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 1006 of file walreceiver.c.

1007{
1008 Assert(tli != 0);
1009
1010 if (LogstreamResult.Flush < LogstreamResult.Write)
1011 {
1013
1015
1016 LogstreamResult.Flush = LogstreamResult.Write;
1017
1018 /* Update shared-memory status */
1019 SpinLockAcquire(&walrcv->mutex);
1020 if (walrcv->flushedUpto < LogstreamResult.Flush)
1021 {
1022 walrcv->latestChunkStart = walrcv->flushedUpto;
1023 walrcv->flushedUpto = LogstreamResult.Flush;
1024 walrcv->receivedTLI = tli;
1025 }
1026 SpinLockRelease(&walrcv->mutex);
1027
1028 /*
1029 * Wake up processes waiting for standby flush LSN to reach current
1030 * flush position.
1031 */
1033
1034 /* Signal the startup process and walsender that new WAL has arrived */
1037 WalSndWakeup(true, false);
1038
1039 /* Report XLOG streaming progress in PS display */
1041 {
1042 char activitymsg[50];
1043
1044 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
1047 }
1048
1049 /* Also let the primary know that we made some progress */
1050 if (!dying)
1051 {
1052 XLogWalRcvSendReply(false, false, false);
1054 }
1055 }
1056}
#define AllowCascadeReplication()
Definition walreceiver.h:40
void WalSndWakeup(bool physical, bool logical)
Definition walsender.c:4012
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition xlog.c:9358
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:344
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41

References AllowCascadeReplication, Assert, fb(), issue_xlog_fsync(), LogstreamResult, LSN_FORMAT_ARGS, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire(), SpinLockRelease(), update_process_title, WAIT_LSN_TYPE_STANDBY_FLUSH, WaitLSNWakeup(), WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 834 of file walreceiver.c.

835{
836 int hdrlen;
840 bool replyRequested;
841
842 switch (type)
843 {
845 {
847
848 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
849 if (len < hdrlen)
852 errmsg_internal("invalid WAL message received from primary")));
853
854 /* initialize a StringInfo with the given buffer */
856
857 /* read the fields */
862
863 buf += hdrlen;
864 len -= hdrlen;
866 break;
867 }
869 {
871
872 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
873 if (len != hdrlen)
876 errmsg_internal("invalid keepalive message received from primary")));
877
878 /* initialize a StringInfo with the given buffer */
880
881 /* read the fields */
885
887
888 /* If the primary requested a reply, send one immediately */
889 if (replyRequested)
890 XLogWalRcvSendReply(true, false, false);
891 break;
892 }
893 default:
896 errmsg_internal("invalid replication message type %d",
897 type)));
898 }
899}
int64_t int64
Definition c.h:621
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
#define PqReplMsg_WALData
Definition protocol.h:77
#define PqReplMsg_Keepalive
Definition protocol.h:75
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition stringinfo.h:157
const char * type
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)

References buf, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, fb(), initReadOnlyStringInfo(), len, pq_getmsgbyte(), pq_getmsgint64(), PqReplMsg_Keepalive, PqReplMsg_WALData, ProcessWalSndrMessage(), type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1202 of file walreceiver.c.

1203{
1206 TransactionId nextXid;
1209 TransactionId xmin,
1210 catalog_xmin;
1211
1212 /* initially true so we always send at least one feedback message */
1213 static bool primary_has_standby_xmin = true;
1214
1215 /*
1216 * If the user doesn't want status to be reported to the primary, be sure
1217 * to exit before doing anything at all.
1218 */
1221 return;
1222
1223 /* Get current timestamp. */
1225
1226 /* Send feedback at most once per wal_receiver_status_interval. */
1228 return;
1229
1230 /* Make sure we wake up when it's time to send feedback again. */
1232
1233 /*
1234 * If Hot Standby is not yet accepting connections there is nothing to
1235 * send. Check this after the interval has expired to reduce number of
1236 * calls.
1237 *
1238 * Bailing out here also ensures that we don't send feedback until we've
1239 * read our own replication slot state, so we don't tell the primary to
1240 * discard needed xmin or catalog_xmin from any slots that may exist on
1241 * this replica.
1242 */
1243 if (!HotStandbyActive())
1244 return;
1245
1246 /*
1247 * Make the expensive call to get the oldest xmin once we are certain
1248 * everything else has been checked.
1249 */
1251 {
1252 GetReplicationHorizons(&xmin, &catalog_xmin);
1253 }
1254 else
1255 {
1256 xmin = InvalidTransactionId;
1257 catalog_xmin = InvalidTransactionId;
1258 }
1259
1260 /*
1261 * Get epoch and adjust if nextXid and oldestXmin are different sides of
1262 * the epoch boundary.
1263 */
1268 if (nextXid < xmin)
1269 xmin_epoch--;
1270 if (nextXid < catalog_xmin)
1272
1273 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1274 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1275
1276 /* Construct the message and send it. */
1282 pq_sendint32(&reply_message, catalog_xmin);
1285 if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1287 else
1289}
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition procarray.c:1986
#define PqReplMsg_HotStandbyFeedback
Definition protocol.h:82
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
#define InvalidTransactionId
Definition transam.h:31
#define EpochFromFullTransactionId(x)
Definition transam.h:47
#define XidFromFullTransactionId(x)
Definition transam.h:48
#define TransactionIdIsValid(xid)
Definition transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition varsup.c:283
#define walrcv_send(conn, buffer, nbytes)
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, fb(), GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), PqReplMsg_HotStandbyFeedback, ReadNextFullTransactionId(), reply_message, resetStringInfo(), TransactionIdIsValid, wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_HSFEEDBACK, WalRcvComputeNextWakeup(), wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply,
bool  checkApply 
)
static

Definition at line 1129 of file walreceiver.c.

1130{
1136
1137 /*
1138 * If the user doesn't want status to be reported to the primary, be sure
1139 * to exit before doing anything at all.
1140 */
1141 if (!force && wal_receiver_status_interval <= 0)
1142 return;
1143
1144 /* Get current timestamp. */
1146
1147 /*
1148 * We can compare the write and flush positions to the last message we
1149 * sent without taking any lock, but the apply position requires a spin
1150 * lock, so we don't check that unless it is expected to advance since the
1151 * previous update, i.e., when 'checkApply' is true.
1152 */
1153 if (!force && now < wakeup[WALRCV_WAKEUP_REPLY])
1154 {
1155 if (checkApply)
1157
1158 if (writePtr == LogstreamResult.Write
1159 && flushPtr == LogstreamResult.Flush
1161 return;
1162 }
1163
1164 /* Make sure we wake up when it's time to send another reply. */
1166
1167 /* Construct a new message */
1168 writePtr = LogstreamResult.Write;
1169 flushPtr = LogstreamResult.Flush;
1172
1180
1181 /* Send it */
1182 elog(DEBUG2, "sending write %X/%08X flush %X/%08X apply %X/%08X%s",
1186 requestReply ? " (reply requested)" : "");
1187
1189}
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84

References StringInfoData::data, DEBUG2, elog, fb(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), InvalidXLogRecPtr, StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), PqReplMsg_StandbyStatusUpdate, reply_message, resetStringInfo(), wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_REPLY, WalRcvComputeNextWakeup(), wrconn, and XLogRecPtrIsValid.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 905 of file walreceiver.c.

906{
907 int startoff;
908 int byteswritten;
910
911 Assert(tli != 0);
912
913 while (nbytes > 0)
914 {
915 int segbytes;
916
917 /* Close the current segment if it's completed */
920
921 if (recvFile < 0)
922 {
923 /* Create/use new log file */
926 recvFileTLI = tli;
927 }
928
929 /* Calculate the start offset of the received logs */
931
932 if (startoff + nbytes > wal_segment_size)
934 else
935 segbytes = nbytes;
936
937 /* OK to write the logs */
938 errno = 0;
939
940 /*
941 * Measure I/O timing to write WAL data, for pg_stat_io.
942 */
944
948
951
952 if (byteswritten <= 0)
953 {
955 int save_errno;
956
957 /* if write didn't set errno, assume no disk space */
958 if (errno == 0)
959 errno = ENOSPC;
960
966 errmsg("could not write to WAL segment %s "
967 "at offset %d, length %d: %m",
969 }
970
971 /* Update state for write */
973
974 nbytes -= byteswritten;
975 buf += byteswritten;
976
977 LogstreamResult.Write = recptr;
978 }
979
980 /* Update shared-memory status */
982
983 /*
984 * Wake up processes waiting for standby write LSN to reach current write
985 * position.
986 */
988
989 /*
990 * Close the current segment if it's fully written up in the last cycle of
991 * the loop, to create its archive notification file soon. Otherwise WAL
992 * archiving of the segment will be delayed until any data in the next
993 * segment is received and written.
994 */
997}
static void pg_atomic_write_membarrier_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:504
return str start
@ IOOBJECT_WAL
Definition pgstat.h:283
@ IOCONTEXT_NORMAL
Definition pgstat.h:293
@ IOOP_WRITE
Definition pgstat.h:320
instr_time pgstat_prepare_io_time(bool track_io_guc)
Definition pgstat_io.c:91
void pgstat_count_io_op_time(IOObject io_object, IOContext io_context, IOOp io_op, instr_time start_time, uint32 cnt, uint64 bytes)
Definition pgstat_io.c:122
#define pg_pwrite
Definition port.h:249
off_t pgoff_t
Definition port.h:422
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:67
static void pgstat_report_wait_end(void)
Definition wait_event.h:83
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition xlog.c:3431
bool track_wal_io_timing
Definition xlog.c:144
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40

References Assert, buf, ereport, errcode_for_file_access(), errmsg, fb(), IOCONTEXT_NORMAL, IOOBJECT_WAL, IOOP_WRITE, LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_membarrier_u64(), pg_pwrite, pgstat_count_io_op_time(), pgstat_prepare_io_time(), pgstat_report_wait_end(), pgstat_report_wait_start(), recvFile, recvFileTLI, recvSegNo, start, track_wal_io_timing, WAIT_LSN_TYPE_STANDBY_WRITE, WaitLSNWakeup(), wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName(), XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 114 of file walreceiver.c.

Referenced by XLogWrite().

◆ hot_standby_feedback

bool hot_standby_feedback

◆ [struct]

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 103 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 105 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

◆ wakeup

◆ wal_receiver_status_interval

◆ wal_receiver_timeout

int wal_receiver_timeout

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 96 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 113 of file walreceiver.c.

Referenced by XLogWrite().