PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "catalog/pg_authid.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)
 

Typedefs

typedef enum WalRcvWakeupReason WalRcvWakeupReason
 

Enumerations

enum  WalRcvWakeupReason { WALRCV_WAKEUP_TERMINATE , WALRCV_WAKEUP_PING , WALRCV_WAKEUP_REPLY , WALRCV_WAKEUP_HSFEEDBACK }
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply, bool checkApply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason, TimestampTz now)
 
void WalReceiverMain (const void *startup_data, size_t startup_data_len)
 
void WalRcvRequestApplyReply (void)
 
static const charWalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConnwrconn = NULL
 
WalReceiverFunctionsTypeWalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct { 
 
   XLogRecPtr   Write 
 
   XLogRecPtr   Flush 
 
LogstreamResult 
 
static TimestampTz wakeup [NUM_WALRCV_WAKEUPS]
 
static StringInfoData reply_message
 

Macro Definition Documentation

◆ NUM_WALRCV_WAKEUPS

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)

Definition at line 126 of file walreceiver.c.

Typedef Documentation

◆ WalRcvWakeupReason

Enumeration Type Documentation

◆ WalRcvWakeupReason

Enumerator
WALRCV_WAKEUP_TERMINATE 
WALRCV_WAKEUP_PING 
WALRCV_WAKEUP_REPLY 
WALRCV_WAKEUP_HSFEEDBACK 

Definition at line 120 of file walreceiver.c.

121{
126#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
WalRcvWakeupReason
@ WALRCV_WAKEUP_TERMINATE
@ WALRCV_WAKEUP_REPLY
@ WALRCV_WAKEUP_PING
@ WALRCV_WAKEUP_HSFEEDBACK

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1444 of file walreceiver.c.

1445{
1446 TupleDesc tupdesc;
1447 Datum *values;
1448 bool *nulls;
1449 int pid;
1450 bool ready_to_display;
1457 TimestampTz last_send_time;
1461 char sender_host[NI_MAXHOST];
1462 int sender_port = 0;
1463 char slotname[NAMEDATALEN];
1464 char conninfo[MAXCONNINFO];
1465
1466 /* Take a lock to ensure value consistency */
1468 pid = (int) WalRcv->pid;
1469 ready_to_display = WalRcv->ready_to_display;
1475 last_send_time = WalRcv->lastMsgSendTime;
1479 strlcpy(slotname, WalRcv->slotname, sizeof(slotname));
1480 strlcpy(sender_host, WalRcv->sender_host, sizeof(sender_host));
1481 sender_port = WalRcv->sender_port;
1482 strlcpy(conninfo, WalRcv->conninfo, sizeof(conninfo));
1484
1485 /*
1486 * No WAL receiver (or not ready yet), just return a tuple with NULL
1487 * values
1488 */
1489 if (pid == 0 || !ready_to_display)
1491
1492 /*
1493 * Read "writtenUpto" without holding a spinlock. Note that it may not be
1494 * consistent with the other shared variables of the WAL receiver
1495 * protected by a spinlock, but this should not be used for data integrity
1496 * checks.
1497 */
1499
1500 /* determine result type */
1501 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1502 elog(ERROR, "return type must be a row type");
1503
1504 values = palloc0_array(Datum, tupdesc->natts);
1505 nulls = palloc0_array(bool, tupdesc->natts);
1506
1507 /* Fetch values */
1508 values[0] = Int32GetDatum(pid);
1509
1511 {
1512 /*
1513 * Only superusers and roles with privileges of pg_read_all_stats can
1514 * see details. Other users only get the pid value to know whether it
1515 * is a WAL receiver, but no details.
1516 */
1517 memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1518 }
1519 else
1520 {
1522
1524 nulls[2] = true;
1525 else
1529 nulls[4] = true;
1530 else
1533 nulls[5] = true;
1534 else
1537 if (last_send_time == 0)
1538 nulls[7] = true;
1539 else
1540 values[7] = TimestampTzGetDatum(last_send_time);
1541 if (last_receipt_time == 0)
1542 nulls[8] = true;
1543 else
1546 nulls[9] = true;
1547 else
1549 if (latest_end_time == 0)
1550 nulls[10] = true;
1551 else
1553 if (*slotname == '\0')
1554 nulls[11] = true;
1555 else
1556 values[11] = CStringGetTextDatum(slotname);
1557 if (*sender_host == '\0')
1558 nulls[12] = true;
1559 else
1560 values[12] = CStringGetTextDatum(sender_host);
1561 if (sender_port == 0)
1562 nulls[13] = true;
1563 else
1564 values[13] = Int32GetDatum(sender_port);
1565 if (*conninfo == '\0')
1566 nulls[14] = true;
1567 else
1568 values[14] = CStringGetTextDatum(conninfo);
1569 }
1570
1571 /* Returns the record as Datum */
1573}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5314
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
int64 TimestampTz
Definition timestamp.h:39
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define palloc0_array(type, count)
Definition fe_memutils.h:92
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_RETURN_DATUM(x)
Definition fmgr.h:354
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
Oid GetUserId(void)
Definition miscinit.c:470
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
uint64_t Datum
Definition postgres.h:70
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
static int fb(int x)
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
TimestampTz lastMsgReceiptTime
XLogRecPtr latestWalEnd
TimeLineID receiveStartTLI
Definition walreceiver.h:88
TimeLineID receivedTLI
Definition walreceiver.h:98
char slotname[NAMEDATALEN]
char sender_host[NI_MAXHOST]
XLogRecPtr receiveStart
Definition walreceiver.h:87
XLogRecPtr flushedUpto
Definition walreceiver.h:97
pg_atomic_uint64 writtenUpto
TimestampTz lastMsgSendTime
WalRcvState walRcvState
Definition walreceiver.h:72
TimestampTz latestWalEndTime
bool ready_to_display
slock_t mutex
char conninfo[MAXCONNINFO]
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
static const char * WalRcvGetStateString(WalRcvState state)
#define MAXCONNINFO
Definition walreceiver.h:37
WalRcvState
Definition walreceiver.h:46
WalRcvData * WalRcv
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, fb(), WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum(), Int32GetDatum(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum(), MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, palloc0_array, pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire(), SpinLockRelease(), strlcpy(), TimestampTzGetDatum(), TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsValid.

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1306 of file walreceiver.c.

1307{
1309 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1310
1311 /* Update shared-memory status */
1312 SpinLockAcquire(&walrcv->mutex);
1313 if (walrcv->latestWalEnd < walEnd)
1314 walrcv->latestWalEndTime = sendTime;
1315 walrcv->latestWalEnd = walEnd;
1316 walrcv->lastMsgSendTime = sendTime;
1317 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1318 SpinLockRelease(&walrcv->mutex);
1319
1321 {
1322 char *sendtime;
1323 char *receipttime;
1324 int applyDelay;
1325
1326 /* Copy because timestamptz_to_str returns a static buffer */
1328 receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1330
1331 /* apply delay is not available */
1332 if (applyDelay == -1)
1333 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1334 sendtime,
1337 else
1338 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1339 sendtime,
1341 applyDelay,
1343
1344 pfree(sendtime);
1346 }
1347}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1870
bool message_level_is_interesting(int elevel)
Definition elog.c:285
#define DEBUG2
Definition elog.h:30
char * pstrdup(const char *in)
Definition mcxt.c:1910
void pfree(void *pointer)
Definition mcxt.c:1619
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog, fb(), GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), message_level_is_interesting(), pfree(), pstrdup(), SpinLockAcquire(), SpinLockRelease(), timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvComputeNextWakeup()

static void WalRcvComputeNextWakeup ( WalRcvWakeupReason  reason,
TimestampTz  now 
)
static

Definition at line 1358 of file walreceiver.c.

1359{
1360 switch (reason)
1361 {
1363 if (wal_receiver_timeout <= 0)
1364 wakeup[reason] = TIMESTAMP_INFINITY;
1365 else
1367 break;
1368 case WALRCV_WAKEUP_PING:
1369 if (wal_receiver_timeout <= 0)
1370 wakeup[reason] = TIMESTAMP_INFINITY;
1371 else
1373 break;
1376 wakeup[reason] = TIMESTAMP_INFINITY;
1377 else
1379 break;
1382 wakeup[reason] = TIMESTAMP_INFINITY;
1383 else
1385 break;
1386 /* there's intentionally no default: here */
1387 }
1388}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1613
#define TIMESTAMP_INFINITY
Definition timestamp.h:151
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
#define TimestampTzPlusSeconds(tz, s)
Definition timestamp.h:86
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
bool hot_standby_feedback
Definition walreceiver.c:92
int wal_receiver_status_interval
Definition walreceiver.c:90
int wal_receiver_timeout
Definition walreceiver.c:91

References hot_standby_feedback, now(), TIMESTAMP_INFINITY, TimestampTzPlusMilliseconds, TimestampTzPlusSeconds, wakeup, wal_receiver_status_interval, wal_receiver_timeout, WALRCV_WAKEUP_HSFEEDBACK, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, and WALRCV_WAKEUP_TERMINATE.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 803 of file walreceiver.c.

804{
807
808 Assert(*startpointTLI_p != 0);
809
810 /* Ensure that all WAL records received are flushed to disk */
812
813 /* Mark ourselves inactive in shared memory */
814 SpinLockAcquire(&walrcv->mutex);
815 Assert(walrcv->walRcvState == WALRCV_STREAMING ||
816 walrcv->walRcvState == WALRCV_CONNECTING ||
817 walrcv->walRcvState == WALRCV_RESTARTING ||
818 walrcv->walRcvState == WALRCV_STARTING ||
819 walrcv->walRcvState == WALRCV_WAITING ||
820 walrcv->walRcvState == WALRCV_STOPPING);
821 Assert(walrcv->pid == MyProcPid);
822 walrcv->walRcvState = WALRCV_STOPPED;
823 walrcv->pid = 0;
824 walrcv->procno = INVALID_PROC_NUMBER;
825 walrcv->ready_to_display = false;
826 SpinLockRelease(&walrcv->mutex);
827
828 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
829
830 /* Terminate the connection gracefully. */
831 if (wrconn != NULL)
833
834 /* Wake up the startup process to notice promptly that we're gone */
836}
#define Assert(condition)
Definition c.h:943
void ConditionVariableBroadcast(ConditionVariable *cv)
Datum arg
Definition elog.c:1323
int MyProcPid
Definition globals.c:49
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
static WalReceiverConn * wrconn
Definition walreceiver.c:95
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
@ WALRCV_STARTING
Definition walreceiver.h:48
@ WALRCV_STOPPED
Definition walreceiver.h:47
@ WALRCV_CONNECTING
Definition walreceiver.h:50
@ WALRCV_RESTARTING
Definition walreceiver.h:53
@ WALRCV_STREAMING
Definition walreceiver.h:51
@ WALRCV_WAITING
Definition walreceiver.h:52
@ WALRCV_STOPPING
Definition walreceiver.h:54
#define walrcv_disconnect(conn)
void WakeupRecovery(void)

References arg, Assert, ConditionVariableBroadcast(), DatumGetPointer(), fb(), INVALID_PROC_NUMBER, MyProcPid, SpinLockAcquire(), SpinLockRelease(), WakeupRecovery(), WalRcv, WALRCV_CONNECTING, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 747 of file walreceiver.c.

748{
749 TimeLineID tli;
750
751 for (tli = first; tli <= last; tli++)
752 {
753 /* there's no history file for timeline 1 */
754 if (tli != 1 && !existsTimeLineHistory(tli))
755 {
756 char *fname;
757 char *content;
758 int len;
760
761 ereport(LOG,
762 (errmsg("fetching timeline history file for timeline %u from primary server",
763 tli)));
764
765 walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
766
767 /*
768 * Check that the filename on the primary matches what we
769 * calculated ourselves. This is just a sanity check, it should
770 * always match.
771 */
773 if (strcmp(fname, expectedfname) != 0)
776 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
777 tli)));
778
779 /*
780 * Write the file to pg_wal.
781 */
782 writeTimeLineHistoryFile(tli, content, len);
783
784 /*
785 * Mark the streamed history file as ready for archiving if
786 * archive_mode is always.
787 */
790 else
791 XLogArchiveNotify(fname);
792
793 pfree(fname);
794 pfree(content);
795 }
796 }
797}
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition timeline.c:464
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition timeline.c:223
int errcode(int sqlerrcode)
Definition elog.c:875
#define LOG
Definition elog.h:32
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define ereport(elevel,...)
Definition elog.h:152
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
static char * errmsg
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
int XLogArchiveMode
Definition xlog.c:126
@ ARCHIVE_MODE_ALWAYS
Definition xlog.h:69
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
void XLogArchiveNotify(const char *xlog)

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, errmsg_internal(), ERROR, existsTimeLineHistory(), fb(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName(), walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvGetStateString()

static const char * WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1417 of file walreceiver.c.

1418{
1419 switch (state)
1420 {
1421 case WALRCV_STOPPED:
1422 return "stopped";
1423 case WALRCV_STARTING:
1424 return "starting";
1425 case WALRCV_CONNECTING:
1426 return "connecting";
1427 case WALRCV_STREAMING:
1428 return "streaming";
1429 case WALRCV_WAITING:
1430 return "waiting";
1431 case WALRCV_RESTARTING:
1432 return "restarting";
1433 case WALRCV_STOPPING:
1434 return "stopping";
1435 }
1436 return "UNKNOWN";
1437}

References WALRCV_CONNECTING, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvRequestApplyReply()

void WalRcvRequestApplyReply ( void  )

Definition at line 1399 of file walreceiver.c.

1400{
1401 ProcNumber procno;
1402
1404 /* fetching the proc number is probably atomic, but don't rely on it */
1406 procno = WalRcv->procno;
1408 if (procno != INVALID_PROC_NUMBER)
1409 SetLatch(&GetPGProcByNumber(procno)->procLatch);
1410}
void SetLatch(Latch *latch)
Definition latch.c:290
#define GetPGProcByNumber(n)
Definition proc.h:504
int ProcNumber
Definition procnumber.h:24
ProcNumber procno
Definition walreceiver.h:68
sig_atomic_t apply_reply_requested

References WalRcvData::apply_reply_requested, GetPGProcByNumber, INVALID_PROC_NUMBER, WalRcvData::mutex, WalRcvData::procno, SetLatch(), SpinLockAcquire(), SpinLockRelease(), and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr startpoint,
TimeLineID startpointTLI 
)
static

Definition at line 667 of file walreceiver.c.

668{
670 int state;
671
672 SpinLockAcquire(&walrcv->mutex);
673 state = walrcv->walRcvState;
675 {
676 SpinLockRelease(&walrcv->mutex);
677 if (state == WALRCV_STOPPING)
678 proc_exit(0);
679 else
680 elog(FATAL, "unexpected walreceiver state");
681 }
682 walrcv->walRcvState = WALRCV_WAITING;
683 walrcv->receiveStart = InvalidXLogRecPtr;
684 walrcv->receiveStartTLI = 0;
685 SpinLockRelease(&walrcv->mutex);
686
687 set_ps_display("idle");
688
689 /*
690 * nudge startup process to notice that we've stopped streaming and are
691 * now waiting for instructions.
692 */
694 for (;;)
695 {
697
699
700 SpinLockAcquire(&walrcv->mutex);
701 Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
702 walrcv->walRcvState == WALRCV_WAITING ||
703 walrcv->walRcvState == WALRCV_STOPPING);
704 if (walrcv->walRcvState == WALRCV_RESTARTING)
705 {
706 /*
707 * No need to handle changes in primary_conninfo or
708 * primary_slot_name here. Startup process will signal us to
709 * terminate in case those change.
710 */
711 *startpoint = walrcv->receiveStart;
712 *startpointTLI = walrcv->receiveStartTLI;
713 walrcv->walRcvState = WALRCV_CONNECTING;
714 SpinLockRelease(&walrcv->mutex);
715 break;
716 }
717 if (walrcv->walRcvState == WALRCV_STOPPING)
718 {
719 /*
720 * We should've received SIGTERM if the startup process wants us
721 * to die, but might as well check it here too.
722 */
723 SpinLockRelease(&walrcv->mutex);
724 proc_exit(1);
725 }
726 SpinLockRelease(&walrcv->mutex);
727
730 }
731
733 {
734 char activitymsg[50];
735
736 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%08X",
737 LSN_FORMAT_ARGS(*startpoint));
739 }
740}
#define FATAL
Definition elog.h:42
struct Latch * MyLatch
Definition globals.c:65
void proc_exit(int code)
Definition ipc.c:105
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
#define snprintf
Definition port.h:261
bool update_process_title
Definition ps_status.c:31
static void set_ps_display(const char *activity)
Definition ps_status.h:40
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References Assert, CHECK_FOR_INTERRUPTS, elog, FATAL, fb(), InvalidXLogRecPtr, LSN_FORMAT_ARGS, MyLatch, proc_exit(), ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire(), SpinLockRelease(), update_process_title, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_CONNECTING, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( const void startup_data,
size_t  startup_data_len 
)

Definition at line 154 of file walreceiver.c.

155{
156 char conninfo[MAXCONNINFO];
157 char *tmp_conninfo;
158 char slotname[NAMEDATALEN];
159 bool is_temp_slot;
160 XLogRecPtr startpoint;
161 TimeLineID startpointTLI;
163 bool first_stream;
166 char *err;
167 char *sender_host = NULL;
168 int sender_port = 0;
169 char *appname;
170
172
174
175 /*
176 * WalRcv should be set up already (if we are a backend, we inherit this
177 * by fork() or EXEC_BACKEND mechanism from the postmaster).
178 */
179 walrcv = WalRcv;
180 Assert(walrcv != NULL);
181
182 /*
183 * Mark walreceiver as running in shared memory.
184 *
185 * Do this as early as possible, so that if we fail later on, we'll set
186 * state to STOPPED. If we die before this, the startup process will keep
187 * waiting for us to start up, until it times out.
188 */
189 SpinLockAcquire(&walrcv->mutex);
190 Assert(walrcv->pid == 0);
191 switch (walrcv->walRcvState)
192 {
193 case WALRCV_STOPPING:
194 /* If we've already been requested to stop, don't start up. */
195 walrcv->walRcvState = WALRCV_STOPPED;
197
198 case WALRCV_STOPPED:
199 SpinLockRelease(&walrcv->mutex);
200 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
201 proc_exit(1);
202 break;
203
204 case WALRCV_STARTING:
205 /* The usual case */
206 break;
207
209 case WALRCV_WAITING:
210 case WALRCV_STREAMING:
212 default:
213 /* Shouldn't happen */
214 SpinLockRelease(&walrcv->mutex);
215 elog(PANIC, "walreceiver still running according to shared memory state");
216 }
217 /* Advertise our PID so that the startup process can kill us */
218 walrcv->pid = MyProcPid;
219 walrcv->walRcvState = WALRCV_CONNECTING;
220
221 /* Fetch information required to start streaming */
222 walrcv->ready_to_display = false;
223 strlcpy(conninfo, walrcv->conninfo, MAXCONNINFO);
224 strlcpy(slotname, walrcv->slotname, NAMEDATALEN);
225 is_temp_slot = walrcv->is_temp_slot;
226 startpoint = walrcv->receiveStart;
227 startpointTLI = walrcv->receiveStartTLI;
228
229 /*
230 * At most one of is_temp_slot and slotname can be set; otherwise,
231 * RequestXLogStreaming messed up.
232 */
233 Assert(!is_temp_slot || (slotname[0] == '\0'));
234
235 /* Initialise to a sanish value */
237 walrcv->lastMsgSendTime =
238 walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
239
240 /* Report our proc number so that others can wake us up */
241 walrcv->procno = MyProcNumber;
242
243 SpinLockRelease(&walrcv->mutex);
244
245 /* Arrange to clean up at walreceiver exit */
246 on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
247
248 /* Properly accept or ignore signals the postmaster might send us */
249 pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
250 * file */
252 pqsignal(SIGTERM, die); /* request shutdown */
253 /* SIGQUIT handler was already set up by InitPostmasterChild */
258
259 /* Reset some signals that are accepted by postmaster but not here */
261
262 /* Load the libpq-specific functions */
263 load_file("libpqwalreceiver", false);
265 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
266
267 /* Unblock signals (they were blocked when the postmaster forked us) */
269
270 /*
271 * Switch the WAL receiver state as ready for display before doing a
272 * connection attempt, so as its connecting state is visible before
273 * attempting to contact the primary server. Note that this resets the
274 * original conninfo, sender_port and sender_host, for security. These
275 * fields are filled once the connection is fully established.
276 */
277 SpinLockAcquire(&walrcv->mutex);
278 memset(walrcv->conninfo, 0, MAXCONNINFO);
279 memset(walrcv->sender_host, 0, NI_MAXHOST);
280 walrcv->sender_port = 0;
281 walrcv->ready_to_display = true;
282 SpinLockRelease(&walrcv->mutex);
283
284 /* Establish the connection to the primary for XLOG streaming */
285 appname = cluster_name[0] ? cluster_name : "walreceiver";
286 wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
287 if (!wrconn)
290 errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
291 appname, err)));
292
293 /*
294 * Save user-visible connection string, now that the connection has been
295 * achieved.
296 */
298 walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
299 SpinLockAcquire(&walrcv->mutex);
300 if (tmp_conninfo)
302 if (sender_host)
303 strlcpy(walrcv->sender_host, sender_host, NI_MAXHOST);
304 walrcv->sender_port = sender_port;
305 SpinLockRelease(&walrcv->mutex);
306
307 if (tmp_conninfo)
309
310 if (sender_host)
311 pfree(sender_host);
312
313 /* Initialize buffers for processing messages */
315
316 first_stream = true;
317 for (;;)
318 {
319 char *primary_sysid;
320 char standby_sysid[32];
322
323 /*
324 * Check that we're connected to a valid server using the
325 * IDENTIFY_SYSTEM replication command.
326 */
328
332 {
335 errmsg("database system identifier differs between the primary and standby"),
336 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
338 }
340
341 /*
342 * Confirm that the current timeline of the primary is the same or
343 * ahead of ours.
344 */
345 if (primaryTLI < startpointTLI)
348 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
349 primaryTLI, startpointTLI)));
350
351 /*
352 * Get any missing history files. We do this always, even when we're
353 * not interested in that timeline, so that if we're promoted to
354 * become the primary later on, we don't select the same timeline that
355 * was already used in the current primary. This isn't bullet-proof -
356 * you'll need some external software to manage your cluster if you
357 * need to ensure that a unique timeline id is chosen in every case,
358 * but let's avoid the confusion of timeline id collisions where we
359 * can.
360 */
362
363 /*
364 * Create temporary replication slot if requested, and update slot
365 * name in shared memory. (Note the slot name cannot already be set
366 * in this case.)
367 */
368 if (is_temp_slot)
369 {
370 snprintf(slotname, sizeof(slotname),
371 "pg_walreceiver_%lld",
372 (long long int) walrcv_get_backend_pid(wrconn));
373
374 walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
375
376 SpinLockAcquire(&walrcv->mutex);
377 strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
378 SpinLockRelease(&walrcv->mutex);
379 }
380
381 /*
382 * Start streaming.
383 *
384 * We'll try to start at the requested starting point and timeline,
385 * even if it's different from the server's latest timeline. In case
386 * we've already reached the end of the old timeline, the server will
387 * finish the streaming immediately, and we will go back to await
388 * orders from the startup process. If recovery_target_timeline is
389 * 'latest', the startup process will scan pg_wal and find the new
390 * history file, bump recovery target timeline, and ask us to restart
391 * on the new timeline.
392 */
393 options.logical = false;
394 options.startpoint = startpoint;
395 options.slotname = slotname[0] != '\0' ? slotname : NULL;
396 options.proto.physical.startpointTLI = startpointTLI;
398 {
399 if (first_stream)
400 ereport(LOG,
401 errmsg("started streaming WAL from primary at %X/%08X on timeline %u",
402 LSN_FORMAT_ARGS(startpoint), startpointTLI));
403 else
404 ereport(LOG,
405 errmsg("restarted WAL streaming at %X/%08X on timeline %u",
406 LSN_FORMAT_ARGS(startpoint), startpointTLI));
407 first_stream = false;
408
409 /*
410 * Switch to STREAMING after a successful connection if current
411 * state is CONNECTING. This switch happens after an initial
412 * startup, or after a restart as determined by
413 * WalRcvWaitForStartPosition().
414 */
415 SpinLockAcquire(&walrcv->mutex);
416 if (walrcv->walRcvState == WALRCV_CONNECTING)
417 walrcv->walRcvState = WALRCV_STREAMING;
418 SpinLockRelease(&walrcv->mutex);
419
420 /* Initialize LogstreamResult for processing messages */
422
423 /* Initialize nap wakeup times. */
425 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
427
428 /* Send initial reply/feedback messages. */
429 XLogWalRcvSendReply(true, false, false);
431
432 /* Loop until end-of-streaming or error */
433 for (;;)
434 {
435 char *buf;
436 int len;
437 bool endofwal = false;
439 int rc;
441 long nap;
442
443 /*
444 * Exit walreceiver if we're not in recovery. This should not
445 * happen, but cross-check the status here.
446 */
447 if (!RecoveryInProgress())
450 errmsg("cannot continue WAL streaming, recovery has already ended")));
451
452 /* Process any requests or signals received recently */
454
456 {
457 ConfigReloadPending = false;
459 /* recompute wakeup times */
461 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
464 }
465
466 /* See if we can read data immediately */
468 if (len != 0)
469 {
470 /*
471 * Process the received data, and any subsequent data we
472 * can read without blocking.
473 */
474 for (;;)
475 {
476 if (len > 0)
477 {
478 /*
479 * Something was received from primary, so adjust
480 * the ping and terminate wakeup times.
481 */
484 now);
486 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
487 startpointTLI);
488 }
489 else if (len == 0)
490 break;
491 else if (len < 0)
492 {
493 ereport(LOG,
494 (errmsg("replication terminated by primary server"),
495 errdetail("End of WAL reached on timeline %u at %X/%08X.",
496 startpointTLI,
498 endofwal = true;
499 break;
500 }
502 }
503
504 /* Let the primary know that we received some data. */
505 XLogWalRcvSendReply(false, false, false);
506
507 /*
508 * If we've written some records, flush them to disk and
509 * let the startup process and primary server know about
510 * them.
511 */
512 XLogWalRcvFlush(false, startpointTLI);
513 }
514
515 /* Check if we need to exit the streaming loop. */
516 if (endofwal)
517 break;
518
519 /* Find the soonest wakeup time, to limit our nap. */
521 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
523
524 /* Calculate the nap time, clamping as necessary. */
527
528 /*
529 * Ideally we would reuse a WaitEventSet object repeatedly
530 * here to avoid the overheads of WaitLatchOrSocket on epoll
531 * systems, but we can't be sure that libpq (or any other
532 * walreceiver implementation) has the same socket (even if
533 * the fd is the same number, it may have been closed and
534 * reopened since the last time). In future, if there is a
535 * function for removing sockets from WaitEventSet, then we
536 * could add and remove just the socket each time, potentially
537 * avoiding some system calls.
538 */
543 wait_fd,
544 nap,
546 if (rc & WL_LATCH_SET)
547 {
550
551 if (walrcv->apply_reply_requested)
552 {
553 /*
554 * The recovery process has asked us to send apply
555 * feedback now. Make sure the flag is really set to
556 * false in shared memory before sending the reply, so
557 * we don't miss a new request for a reply.
558 */
559 walrcv->apply_reply_requested = false;
561 XLogWalRcvSendReply(false, false, true);
562 }
563 }
564 if (rc & WL_TIMEOUT)
565 {
566 /*
567 * We didn't receive anything new. If we haven't heard
568 * anything from the server for more than
569 * wal_receiver_timeout / 2, ping the server. Also, if
570 * it's been longer than wal_receiver_status_interval
571 * since the last update we sent, send a status update to
572 * the primary anyway, to report any progress in applying
573 * WAL.
574 */
575 bool requestReply = false;
576
577 /*
578 * Report pending statistics to the cumulative stats
579 * system. This location is useful for the report as it
580 * is not within a tight loop in the WAL receiver, to
581 * avoid bloating pgstats with requests, while also making
582 * sure that the reports happen each time a status update
583 * is sent.
584 */
585 pgstat_report_wal(false);
586
587 /*
588 * Check if time since last receive from primary has
589 * reached the configured limit.
590 */
595 errmsg("terminating walreceiver due to timeout")));
596
597 /*
598 * If we didn't receive anything new for half of receiver
599 * replication timeout, then ping the server.
600 */
602 {
603 requestReply = true;
605 }
606
609 }
610 }
611
612 /*
613 * The backend finished streaming. Exit streaming COPY-mode from
614 * our side, too.
615 */
617
618 /*
619 * If the server had switched to a new timeline that we didn't
620 * know about when we began streaming, fetch its timeline history
621 * file now.
622 */
624 }
625 else
626 ereport(LOG,
627 (errmsg("primary server contains no more WAL on requested timeline %u",
628 startpointTLI)));
629
630 /*
631 * End of WAL reached on the requested timeline. Close the last
632 * segment, and await for new orders from the startup process.
633 */
634 if (recvFile >= 0)
635 {
637
638 XLogWalRcvFlush(false, startpointTLI);
640 if (close(recvFile) != 0)
643 errmsg("could not close WAL segment %s: %m",
644 xlogfname)));
645
646 /*
647 * Create .done file forcibly to prevent the streamed segment from
648 * being archived later.
649 */
652 else
654 }
655 recvFile = -1;
656
657 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
658 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
659 }
660 /* not reached */
661}
#define pg_memory_barrier()
Definition atomics.h:141
void AuxiliaryProcessMainCommon(void)
Definition auxprocess.c:41
sigset_t UnBlockSig
Definition pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1765
#define Min(x, y)
Definition c.h:1091
#define UINT64_FORMAT
Definition c.h:635
#define pg_fallthrough
Definition c.h:161
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errcode_for_file_access(void)
Definition elog.c:898
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define PANIC
Definition elog.h:44
#define DEBUG1
Definition elog.h:31
void err(int eval, const char *fmt,...)
Definition err.c:43
ProcNumber MyProcNumber
Definition globals.c:92
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
char * cluster_name
Definition guc_tables.c:582
#define close(a)
Definition win32.h:12
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define die(msg)
void pgstat_report_wal(bool force)
Definition pgstat_wal.c:46
#define pqsignal
Definition port.h:548
#define PG_SIG_IGN
Definition port.h:552
int pgsocket
Definition port.h:29
#define PGINVALID_SOCKET
Definition port.h:31
#define PG_SIG_DFL
Definition port.h:551
#define PointerGetDatum(X)
Definition postgres.h:354
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:696
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define NUM_WALRCV_WAKEUPS
static StringInfoData reply_message
static int recvFile
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
static TimeLineID recvFileTLI
WalReceiverFunctionsType * WalReceiverFunctions
Definition walreceiver.c:96
static void XLogWalRcvSendReply(bool force, bool requestReply, bool checkApply)
static XLogSegNo recvSegNo
static void XLogWalRcvSendHSFeedback(bool immed)
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
static void WalRcvDie(int code, Datum arg)
static struct @19 LogstreamResult
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_get_conninfo(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_get_backend_pid(conn)
#define walrcv_receive(conn, buffer, wait_fd)
#define SIGCHLD
Definition win32_port.h:168
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGALRM
Definition win32_port.h:164
#define SIGUSR2
Definition win32_port.h:171
uint64 GetSystemIdentifier(void)
Definition xlog.c:4643
bool RecoveryInProgress(void)
Definition xlog.c:6834
int wal_segment_size
Definition xlog.c:150
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert, AuxiliaryProcessMainCommon(), buf, CHECK_FOR_INTERRUPTS, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, DEBUG1, die, elog, ereport, err(), errcode(), errcode_for_file_access(), errdetail(), errmsg, ERROR, FATAL, fb(), GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), i, initStringInfo(), len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, Min, MyLatch, MyProcNumber, MyProcPid, NAMEDATALEN, now(), NUM_WALRCV_WAKEUPS, on_shmem_exit(), PANIC, pfree(), pg_fallthrough, pg_memory_barrier, PG_SIG_DFL, PG_SIG_IGN, PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_wal(), PointerGetDatum, pqsignal, proc_exit(), ProcessConfigFile(), procsignal_sigusr1_handler(), RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, snprintf, SpinLockAcquire(), SpinLockRelease(), strlcpy(), TIMESTAMP_INFINITY, TimestampDifferenceMilliseconds(), UINT64_FORMAT, UnBlockSig, WaitLatchOrSocket(), wakeup, wal_segment_size, WalRcv, walrcv_connect, WALRCV_CONNECTING, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_TERMINATE, WalRcvComputeNextWakeup(), WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvWaitForStartPosition(), WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1075 of file walreceiver.c.

1076{
1077 char xlogfname[MAXFNAMELEN];
1078
1080 Assert(tli != 0);
1081
1082 /*
1083 * fsync() and close current file before we switch to next one. We would
1084 * otherwise have to reopen this file to fsync it later
1085 */
1086 XLogWalRcvFlush(false, tli);
1087
1089
1090 /*
1091 * XLOG segment files will be re-read by recovery in startup process soon,
1092 * so we don't advise the OS to release cache pages associated with the
1093 * file like XLogFileClose() does.
1094 */
1095 if (close(recvFile) != 0)
1096 ereport(PANIC,
1098 errmsg("could not close WAL segment %s: %m",
1099 xlogfname)));
1100
1101 /*
1102 * Create .done file forcibly to prevent the streamed segment from being
1103 * archived later.
1104 */
1107 else
1109
1110 recvFile = -1;
1111}
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert, close, ereport, errcode_for_file_access(), errmsg, fb(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 1014 of file walreceiver.c.

1015{
1016 Assert(tli != 0);
1017
1018 if (LogstreamResult.Flush < LogstreamResult.Write)
1019 {
1021
1023
1024 LogstreamResult.Flush = LogstreamResult.Write;
1025
1026 /* Update shared-memory status */
1027 SpinLockAcquire(&walrcv->mutex);
1028 if (walrcv->flushedUpto < LogstreamResult.Flush)
1029 {
1030 walrcv->latestChunkStart = walrcv->flushedUpto;
1031 walrcv->flushedUpto = LogstreamResult.Flush;
1032 walrcv->receivedTLI = tli;
1033 }
1034 SpinLockRelease(&walrcv->mutex);
1035
1036 /*
1037 * Wake up processes waiting for standby flush LSN to reach current
1038 * flush position.
1039 */
1041
1042 /* Signal the startup process and walsender that new WAL has arrived */
1045 WalSndWakeup(true, false);
1046
1047 /* Report XLOG streaming progress in PS display */
1049 {
1050 char activitymsg[50];
1051
1052 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
1055 }
1056
1057 /* Also let the primary know that we made some progress */
1058 if (!dying)
1059 {
1060 XLogWalRcvSendReply(false, false, false);
1062 }
1063 }
1064}
#define AllowCascadeReplication()
Definition walreceiver.h:40
void WalSndWakeup(bool physical, bool logical)
Definition walsender.c:4012
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition xlog.c:9360
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:344
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41

References AllowCascadeReplication, Assert, fb(), issue_xlog_fsync(), LogstreamResult, LSN_FORMAT_ARGS, recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire(), SpinLockRelease(), update_process_title, WAIT_LSN_TYPE_STANDBY_FLUSH, WaitLSNWakeup(), WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 842 of file walreceiver.c.

843{
844 int hdrlen;
848 bool replyRequested;
849
850 switch (type)
851 {
853 {
855
856 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
857 if (len < hdrlen)
860 errmsg_internal("invalid WAL message received from primary")));
861
862 /* initialize a StringInfo with the given buffer */
864
865 /* read the fields */
870
871 buf += hdrlen;
872 len -= hdrlen;
874 break;
875 }
877 {
879
880 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
881 if (len != hdrlen)
884 errmsg_internal("invalid keepalive message received from primary")));
885
886 /* initialize a StringInfo with the given buffer */
888
889 /* read the fields */
893
895
896 /* If the primary requested a reply, send one immediately */
897 if (replyRequested)
898 XLogWalRcvSendReply(true, false, false);
899 break;
900 }
901 default:
904 errmsg_internal("invalid replication message type %d",
905 type)));
906 }
907}
int64_t int64
Definition c.h:621
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
#define PqReplMsg_WALData
Definition protocol.h:77
#define PqReplMsg_Keepalive
Definition protocol.h:75
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition stringinfo.h:157
const char * type
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)

References buf, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, fb(), initReadOnlyStringInfo(), len, pq_getmsgbyte(), pq_getmsgint64(), PqReplMsg_Keepalive, PqReplMsg_WALData, ProcessWalSndrMessage(), type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1210 of file walreceiver.c.

1211{
1214 TransactionId nextXid;
1217 TransactionId xmin,
1218 catalog_xmin;
1219
1220 /* initially true so we always send at least one feedback message */
1221 static bool primary_has_standby_xmin = true;
1222
1223 /*
1224 * If the user doesn't want status to be reported to the primary, be sure
1225 * to exit before doing anything at all.
1226 */
1229 return;
1230
1231 /* Get current timestamp. */
1233
1234 /* Send feedback at most once per wal_receiver_status_interval. */
1236 return;
1237
1238 /* Make sure we wake up when it's time to send feedback again. */
1240
1241 /*
1242 * If Hot Standby is not yet accepting connections there is nothing to
1243 * send. Check this after the interval has expired to reduce number of
1244 * calls.
1245 *
1246 * Bailing out here also ensures that we don't send feedback until we've
1247 * read our own replication slot state, so we don't tell the primary to
1248 * discard needed xmin or catalog_xmin from any slots that may exist on
1249 * this replica.
1250 */
1251 if (!HotStandbyActive())
1252 return;
1253
1254 /*
1255 * Make the expensive call to get the oldest xmin once we are certain
1256 * everything else has been checked.
1257 */
1259 {
1260 GetReplicationHorizons(&xmin, &catalog_xmin);
1261 }
1262 else
1263 {
1264 xmin = InvalidTransactionId;
1265 catalog_xmin = InvalidTransactionId;
1266 }
1267
1268 /*
1269 * Get epoch and adjust if nextXid and oldestXmin are different sides of
1270 * the epoch boundary.
1271 */
1276 if (nextXid < xmin)
1277 xmin_epoch--;
1278 if (nextXid < catalog_xmin)
1280
1281 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1282 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1283
1284 /* Construct the message and send it. */
1290 pq_sendint32(&reply_message, catalog_xmin);
1293 if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1295 else
1297}
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition procarray.c:1986
#define PqReplMsg_HotStandbyFeedback
Definition protocol.h:82
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
#define InvalidTransactionId
Definition transam.h:31
#define EpochFromFullTransactionId(x)
Definition transam.h:47
#define XidFromFullTransactionId(x)
Definition transam.h:48
#define TransactionIdIsValid(xid)
Definition transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition varsup.c:283
#define walrcv_send(conn, buffer, nbytes)
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, fb(), GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), PqReplMsg_HotStandbyFeedback, ReadNextFullTransactionId(), reply_message, resetStringInfo(), TransactionIdIsValid, wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_HSFEEDBACK, WalRcvComputeNextWakeup(), wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply,
bool  checkApply 
)
static

Definition at line 1137 of file walreceiver.c.

1138{
1144
1145 /*
1146 * If the user doesn't want status to be reported to the primary, be sure
1147 * to exit before doing anything at all.
1148 */
1149 if (!force && wal_receiver_status_interval <= 0)
1150 return;
1151
1152 /* Get current timestamp. */
1154
1155 /*
1156 * We can compare the write and flush positions to the last message we
1157 * sent without taking any lock, but the apply position requires a spin
1158 * lock, so we don't check that unless it is expected to advance since the
1159 * previous update, i.e., when 'checkApply' is true.
1160 */
1161 if (!force && now < wakeup[WALRCV_WAKEUP_REPLY])
1162 {
1163 if (checkApply)
1165
1166 if (writePtr == LogstreamResult.Write
1167 && flushPtr == LogstreamResult.Flush
1169 return;
1170 }
1171
1172 /* Make sure we wake up when it's time to send another reply. */
1174
1175 /* Construct a new message */
1176 writePtr = LogstreamResult.Write;
1177 flushPtr = LogstreamResult.Flush;
1180
1188
1189 /* Send it */
1190 elog(DEBUG2, "sending write %X/%08X flush %X/%08X apply %X/%08X%s",
1194 requestReply ? " (reply requested)" : "");
1195
1197}
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84

References StringInfoData::data, DEBUG2, elog, fb(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), InvalidXLogRecPtr, StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), PqReplMsg_StandbyStatusUpdate, reply_message, resetStringInfo(), wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_REPLY, WalRcvComputeNextWakeup(), wrconn, and XLogRecPtrIsValid.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 913 of file walreceiver.c.

914{
915 int startoff;
916 int byteswritten;
918
919 Assert(tli != 0);
920
921 while (nbytes > 0)
922 {
923 int segbytes;
924
925 /* Close the current segment if it's completed */
928
929 if (recvFile < 0)
930 {
931 /* Create/use new log file */
934 recvFileTLI = tli;
935 }
936
937 /* Calculate the start offset of the received logs */
939
940 if (startoff + nbytes > wal_segment_size)
942 else
943 segbytes = nbytes;
944
945 /* OK to write the logs */
946 errno = 0;
947
948 /*
949 * Measure I/O timing to write WAL data, for pg_stat_io.
950 */
952
956
959
960 if (byteswritten <= 0)
961 {
963 int save_errno;
964
965 /* if write didn't set errno, assume no disk space */
966 if (errno == 0)
967 errno = ENOSPC;
968
974 errmsg("could not write to WAL segment %s "
975 "at offset %d, length %d: %m",
977 }
978
979 /* Update state for write */
981
982 nbytes -= byteswritten;
983 buf += byteswritten;
984
985 LogstreamResult.Write = recptr;
986 }
987
988 /* Update shared-memory status */
990
991 /*
992 * Wake up processes waiting for standby write LSN to reach current write
993 * position.
994 */
996
997 /*
998 * Close the current segment if it's fully written up in the last cycle of
999 * the loop, to create its archive notification file soon. Otherwise WAL
1000 * archiving of the segment will be delayed until any data in the next
1001 * segment is received and written.
1002 */
1004 XLogWalRcvClose(recptr, tli);
1005}
static void pg_atomic_write_membarrier_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:504
return str start
@ IOOBJECT_WAL
Definition pgstat.h:283
@ IOCONTEXT_NORMAL
Definition pgstat.h:293
@ IOOP_WRITE
Definition pgstat.h:320
instr_time pgstat_prepare_io_time(bool track_io_guc)
Definition pgstat_io.c:91
void pgstat_count_io_op_time(IOObject io_object, IOContext io_context, IOOp io_op, instr_time start_time, uint32 cnt, uint64 bytes)
Definition pgstat_io.c:122
#define pg_pwrite
Definition port.h:249
off_t pgoff_t
Definition port.h:422
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:67
static void pgstat_report_wait_end(void)
Definition wait_event.h:83
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition xlog.c:3431
bool track_wal_io_timing
Definition xlog.c:144
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40

References Assert, buf, ereport, errcode_for_file_access(), errmsg, fb(), IOCONTEXT_NORMAL, IOOBJECT_WAL, IOOP_WRITE, LogstreamResult, MAXFNAMELEN, PANIC, pg_atomic_write_membarrier_u64(), pg_pwrite, pgstat_count_io_op_time(), pgstat_prepare_io_time(), pgstat_report_wait_end(), pgstat_report_wait_start(), recvFile, recvFileTLI, recvSegNo, start, track_wal_io_timing, WAIT_LSN_TYPE_STANDBY_WRITE, WaitLSNWakeup(), wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName(), XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 114 of file walreceiver.c.

Referenced by XLogWrite().

◆ hot_standby_feedback

bool hot_standby_feedback

◆ [struct]

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 103 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 105 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

◆ wakeup

◆ wal_receiver_status_interval

◆ wal_receiver_timeout

int wal_receiver_timeout

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 96 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 113 of file walreceiver.c.

Referenced by XLogWrite().