PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "catalog/pg_authid.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)
 

Typedefs

typedef enum WalRcvWakeupReason WalRcvWakeupReason
 

Enumerations

enum  WalRcvWakeupReason { WALRCV_WAKEUP_TERMINATE , WALRCV_WAKEUP_PING , WALRCV_WAKEUP_REPLY , WALRCV_WAKEUP_HSFEEDBACK }
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply, bool checkApply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason, TimestampTz now)
 
void WalReceiverMain (const void *startup_data, size_t startup_data_len)
 
void WalRcvRequestApplyReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct { 
 
   XLogRecPtr   Write 
 
   XLogRecPtr   Flush 
 
} LogstreamResult 
 
static TimestampTz wakeup [NUM_WALRCV_WAKEUPS]
 
static StringInfoData reply_message
 

Macro Definition Documentation

◆ NUM_WALRCV_WAKEUPS

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)

Definition at line 126 of file walreceiver.c.

Typedef Documentation

◆ WalRcvWakeupReason

Enumeration Type Documentation

◆ WalRcvWakeupReason

Enumerator
WALRCV_WAKEUP_TERMINATE 
WALRCV_WAKEUP_PING 
WALRCV_WAKEUP_REPLY 
WALRCV_WAKEUP_HSFEEDBACK 

Definition at line 120 of file walreceiver.c.

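121{
122 WALRCV_WAKEUP_TERMINATE,
123 WALRCV_WAKEUP_PING,
124 WALRCV_WAKEUP_REPLY,
125 WALRCV_WAKEUP_HSFEEDBACK,
126#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
127} WalRcvWakeupReason;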
WalRcvWakeupReason
@ WALRCV_WAKEUP_TERMINATE
@ WALRCV_WAKEUP_REPLY
@ WALRCV_WAKEUP_PING
@ WALRCV_WAKEUP_HSFEEDBACK

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1438 of file walreceiver.c.

1439{
1440 TupleDesc tupdesc;
1441 Datum *values;
1442 bool *nulls;
1443 int pid;
1444 bool ready_to_display;
1451 TimestampTz last_send_time;
1455 char sender_host[NI_MAXHOST];
1456 int sender_port = 0;
1457 char slotname[NAMEDATALEN];
1458 char conninfo[MAXCONNINFO];
1459
1460 /* Take a lock to ensure value consistency */
1462 pid = (int) WalRcv->pid;
1463 ready_to_display = WalRcv->ready_to_display;
1469 last_send_time = WalRcv->lastMsgSendTime;
1473 strlcpy(slotname, WalRcv->slotname, sizeof(slotname));
1474 strlcpy(sender_host, WalRcv->sender_host, sizeof(sender_host));
1475 sender_port = WalRcv->sender_port;
1476 strlcpy(conninfo, WalRcv->conninfo, sizeof(conninfo));
1478
1479 /*
1480 * No WAL receiver (or not ready yet), just return a tuple with NULL
1481 * values
1482 */
1483 if (pid == 0 || !ready_to_display)
1485
1486 /*
1487 * Read "writtenUpto" without holding a spinlock. Note that it may not be
1488 * consistent with the other shared variables of the WAL receiver
1489 * protected by a spinlock, but this should not be used for data integrity
1490 * checks.
1491 */
1493
1494 /* determine result type */
1495 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1496 elog(ERROR, "return type must be a row type");
1497
1498 values = palloc0_array(Datum, tupdesc->natts);
1499 nulls = palloc0_array(bool, tupdesc->natts);
1500
1501 /* Fetch values */
1502 values[0] = Int32GetDatum(pid);
1503
1505 {
1506 /*
1507 * Only superusers and roles with privileges of pg_read_all_stats can
1508 * see details. Other users only get the pid value to know whether it
1509 * is a WAL receiver, but no details.
1510 */
1511 memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1512 }
1513 else
1514 {
1516
1518 nulls[2] = true;
1519 else
1523 nulls[4] = true;
1524 else
1527 nulls[5] = true;
1528 else
1531 if (last_send_time == 0)
1532 nulls[7] = true;
1533 else
1534 values[7] = TimestampTzGetDatum(last_send_time);
1535 if (last_receipt_time == 0)
1536 nulls[8] = true;
1537 else
1540 nulls[9] = true;
1541 else
1543 if (latest_end_time == 0)
1544 nulls[10] = true;
1545 else
1547 if (*slotname == '\0')
1548 nulls[11] = true;
1549 else
1550 values[11] = CStringGetTextDatum(slotname);
1551 if (*sender_host == '\0')
1552 nulls[12] = true;
1553 else
1554 values[12] = CStringGetTextDatum(sender_host);
1555 if (sender_port == 0)
1556 nulls[13] = true;
1557 else
1558 values[13] = Int32GetDatum(sender_port);
1559 if (*conninfo == '\0')
1560 nulls[14] = true;
1561 else
1562 values[14] = CStringGetTextDatum(conninfo);
1563 }
1564
1565 /* Returns the record as Datum */
1567}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5314
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
int64 TimestampTz
Definition timestamp.h:39
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define palloc0_array(type, count)
Definition fe_memutils.h:77
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_RETURN_DATUM(x)
Definition fmgr.h:354
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
Oid GetUserId(void)
Definition miscinit.c:470
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
uint64_t Datum
Definition postgres.h:70
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
static int fb(int x)
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
TimestampTz lastMsgReceiptTime
XLogRecPtr latestWalEnd
TimeLineID receiveStartTLI
Definition walreceiver.h:88
TimeLineID receivedTLI
Definition walreceiver.h:98
char slotname[NAMEDATALEN]
char sender_host[NI_MAXHOST]
XLogRecPtr receiveStart
Definition walreceiver.h:87
XLogRecPtr flushedUpto
Definition walreceiver.h:97
pg_atomic_uint64 writtenUpto
TimestampTz lastMsgSendTime
WalRcvState walRcvState
Definition walreceiver.h:72
TimestampTz latestWalEndTime
bool ready_to_display
slock_t mutex
char conninfo[MAXCONNINFO]
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
static const char * WalRcvGetStateString(WalRcvState state)
#define MAXCONNINFO
Definition walreceiver.h:37
WalRcvState
Definition walreceiver.h:46
WalRcvData * WalRcv
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, fb(), WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum(), Int32GetDatum(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum(), MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, palloc0_array, pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire(), SpinLockRelease(), strlcpy(), TimestampTzGetDatum(), TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsValid.

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1300 of file walreceiver.c.

1301{
1303 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1304
1305 /* Update shared-memory status */
1306 SpinLockAcquire(&walrcv->mutex);
1307 if (walrcv->latestWalEnd < walEnd)
1308 walrcv->latestWalEndTime = sendTime;
1309 walrcv->latestWalEnd = walEnd;
1310 walrcv->lastMsgSendTime = sendTime;
1311 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1312 SpinLockRelease(&walrcv->mutex);
1313
1315 {
1316 char *sendtime;
1317 char *receipttime;
1318 int applyDelay;
1319
1320 /* Copy because timestamptz_to_str returns a static buffer */
1322 receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1324
1325 /* apply delay is not available */
1326 if (applyDelay == -1)
1327 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1328 sendtime,
1331 else
1332 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1333 sendtime,
1335 applyDelay,
1337
1338 pfree(sendtime);
1340 }
1341}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1639
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1856
bool message_level_is_interesting(int elevel)
Definition elog.c:285
#define DEBUG2
Definition elog.h:30
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog, fb(), GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), message_level_is_interesting(), pfree(), pstrdup(), SpinLockAcquire(), SpinLockRelease(), timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvComputeNextWakeup()

static void WalRcvComputeNextWakeup ( WalRcvWakeupReason  reason,
TimestampTz  now 
)
static

Definition at line 1352 of file walreceiver.c.

1353{
1354 switch (reason)
1355 {
1357 if (wal_receiver_timeout <= 0)
1358 wakeup[reason] = TIMESTAMP_INFINITY;
1359 else
1361 break;
1362 case WALRCV_WAKEUP_PING:
1363 if (wal_receiver_timeout <= 0)
1364 wakeup[reason] = TIMESTAMP_INFINITY;
1365 else
1367 break;
1370 wakeup[reason] = TIMESTAMP_INFINITY;
1371 else
1373 break;
1376 wakeup[reason] = TIMESTAMP_INFINITY;
1377 else
1379 break;
1380 /* there's intentionally no default: here */
1381 }
1382}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1603
#define TIMESTAMP_INFINITY
Definition timestamp.h:151
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
#define TimestampTzPlusSeconds(tz, s)
Definition timestamp.h:86
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
bool hot_standby_feedback
Definition walreceiver.c:92
int wal_receiver_status_interval
Definition walreceiver.c:90
int wal_receiver_timeout
Definition walreceiver.c:91

References hot_standby_feedback, now(), TIMESTAMP_INFINITY, TimestampTzPlusMilliseconds, TimestampTzPlusSeconds, wakeup, wal_receiver_status_interval, wal_receiver_timeout, WALRCV_WAKEUP_HSFEEDBACK, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, and WALRCV_WAKEUP_TERMINATE.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 792 of file walreceiver.c.

793{
796
797 Assert(*startpointTLI_p != 0);
798
799 /* Ensure that all WAL records received are flushed to disk */
801
802 /* Mark ourselves inactive in shared memory */
803 SpinLockAcquire(&walrcv->mutex);
804 Assert(walrcv->walRcvState == WALRCV_STREAMING ||
805 walrcv->walRcvState == WALRCV_CONNECTING ||
806 walrcv->walRcvState == WALRCV_RESTARTING ||
807 walrcv->walRcvState == WALRCV_STARTING ||
808 walrcv->walRcvState == WALRCV_WAITING ||
809 walrcv->walRcvState == WALRCV_STOPPING);
810 Assert(walrcv->pid == MyProcPid);
811 walrcv->walRcvState = WALRCV_STOPPED;
812 walrcv->pid = 0;
813 walrcv->procno = INVALID_PROC_NUMBER;
814 walrcv->ready_to_display = false;
815 SpinLockRelease(&walrcv->mutex);
816
817 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
818
819 /* Terminate the connection gracefully. */
820 if (wrconn != NULL)
822
823 /* Wake up the startup process to notice promptly that we're gone */
825}
#define Assert(condition)
Definition c.h:943
void ConditionVariableBroadcast(ConditionVariable *cv)
Datum arg
Definition elog.c:1323
int MyProcPid
Definition globals.c:49
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
static WalReceiverConn * wrconn
Definition walreceiver.c:95
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
@ WALRCV_STARTING
Definition walreceiver.h:48
@ WALRCV_STOPPED
Definition walreceiver.h:47
@ WALRCV_CONNECTING
Definition walreceiver.h:50
@ WALRCV_RESTARTING
Definition walreceiver.h:53
@ WALRCV_STREAMING
Definition walreceiver.h:51
@ WALRCV_WAITING
Definition walreceiver.h:52
@ WALRCV_STOPPING
Definition walreceiver.h:54
#define walrcv_disconnect(conn)
void WakeupRecovery(void)

References arg, Assert, ConditionVariableBroadcast(), DatumGetPointer(), fb(), INVALID_PROC_NUMBER, MyProcPid, SpinLockAcquire(), SpinLockRelease(), WakeupRecovery(), WalRcv, WALRCV_CONNECTING, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 736 of file walreceiver.c.

737{
738 TimeLineID tli;
739
740 for (tli = first; tli <= last; tli++)
741 {
742 /* there's no history file for timeline 1 */
743 if (tli != 1 && !existsTimeLineHistory(tli))
744 {
745 char *fname;
746 char *content;
747 int len;
749
750 ereport(LOG,
751 (errmsg("fetching timeline history file for timeline %u from primary server",
752 tli)));
753
754 walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
755
756 /*
757 * Check that the filename on the primary matches what we
758 * calculated ourselves. This is just a sanity check, it should
759 * always match.
760 */
762 if (strcmp(fname, expectedfname) != 0)
765 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
766 tli)));
767
768 /*
769 * Write the file to pg_wal.
770 */
771 writeTimeLineHistoryFile(tli, content, len);
772
773 /*
774 * Mark the streamed history file as ready for archiving if
775 * archive_mode is always.
776 */
779 else
780 XLogArchiveNotify(fname);
781
782 pfree(fname);
783 pfree(content);
784 }
785 }
786}
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition timeline.c:464
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition timeline.c:223
int errcode(int sqlerrcode)
Definition elog.c:875
#define LOG
Definition elog.h:32
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define ereport(elevel,...)
Definition elog.h:152
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
static char * errmsg
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
int XLogArchiveMode
Definition xlog.c:126
@ ARCHIVE_MODE_ALWAYS
Definition xlog.h:69
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
void XLogArchiveNotify(const char *xlog)

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, errmsg_internal(), ERROR, existsTimeLineHistory(), fb(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName(), walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvGetStateString()

static const char * WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1411 of file walreceiver.c.

1412{
1413 switch (state)
1414 {
1415 case WALRCV_STOPPED:
1416 return "stopped";
1417 case WALRCV_STARTING:
1418 return "starting";
1419 case WALRCV_CONNECTING:
1420 return "connecting";
1421 case WALRCV_STREAMING:
1422 return "streaming";
1423 case WALRCV_WAITING:
1424 return "waiting";
1425 case WALRCV_RESTARTING:
1426 return "restarting";
1427 case WALRCV_STOPPING:
1428 return "stopping";
1429 }
1430 return "UNKNOWN";
1431}

References WALRCV_CONNECTING, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvRequestApplyReply()

void WalRcvRequestApplyReply ( void  )

Definition at line 1393 of file walreceiver.c.

1394{
1395 ProcNumber procno;
1396
1398 /* fetching the proc number is probably atomic, but don't rely on it */
1400 procno = WalRcv->procno;
1402 if (procno != INVALID_PROC_NUMBER)
1403 SetLatch(&GetPGProcByNumber(procno)->procLatch);
1404}
void SetLatch(Latch *latch)
Definition latch.c:290
#define GetPGProcByNumber(n)
Definition proc.h:504
int ProcNumber
Definition procnumber.h:24
ProcNumber procno
Definition walreceiver.h:68
sig_atomic_t apply_reply_requested

References WalRcvData::apply_reply_requested, GetPGProcByNumber, INVALID_PROC_NUMBER, WalRcvData::mutex, WalRcvData::procno, SetLatch(), SpinLockAcquire(), SpinLockRelease(), and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr * startpoint,
TimeLineID * startpointTLI 
)
static

Definition at line 656 of file walreceiver.c.

657{
659 int state;
660
661 SpinLockAcquire(&walrcv->mutex);
662 state = walrcv->walRcvState;
664 {
665 SpinLockRelease(&walrcv->mutex);
666 if (state == WALRCV_STOPPING)
667 proc_exit(0);
668 else
669 elog(FATAL, "unexpected walreceiver state");
670 }
671 walrcv->walRcvState = WALRCV_WAITING;
672 walrcv->receiveStart = InvalidXLogRecPtr;
673 walrcv->receiveStartTLI = 0;
674 SpinLockRelease(&walrcv->mutex);
675
676 set_ps_display("idle");
677
678 /*
679 * nudge startup process to notice that we've stopped streaming and are
680 * now waiting for instructions.
681 */
683 for (;;)
684 {
686
688
689 SpinLockAcquire(&walrcv->mutex);
690 Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
691 walrcv->walRcvState == WALRCV_WAITING ||
692 walrcv->walRcvState == WALRCV_STOPPING);
693 if (walrcv->walRcvState == WALRCV_RESTARTING)
694 {
695 /*
696 * No need to handle changes in primary_conninfo or
697 * primary_slot_name here. Startup process will signal us to
698 * terminate in case those change.
699 */
700 *startpoint = walrcv->receiveStart;
701 *startpointTLI = walrcv->receiveStartTLI;
702 walrcv->walRcvState = WALRCV_CONNECTING;
703 SpinLockRelease(&walrcv->mutex);
704 break;
705 }
706 if (walrcv->walRcvState == WALRCV_STOPPING)
707 {
708 /*
709 * We should've received SIGTERM if the startup process wants us
710 * to die, but might as well check it here too.
711 */
712 SpinLockRelease(&walrcv->mutex);
713 proc_exit(1);
714 }
715 SpinLockRelease(&walrcv->mutex);
716
719 }
720
722 {
723 char activitymsg[50];
724
725 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%08X",
726 LSN_FORMAT_ARGS(*startpoint));
728 }
729}
#define FATAL
Definition elog.h:42
struct Latch * MyLatch
Definition globals.c:65
void proc_exit(int code)
Definition ipc.c:105
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
#define snprintf
Definition port.h:260
bool update_process_title
Definition ps_status.c:31
static void set_ps_display(const char *activity)
Definition ps_status.h:40
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References Assert, CHECK_FOR_INTERRUPTS, elog, FATAL, fb(), InvalidXLogRecPtr, LSN_FORMAT_ARGS, MyLatch, proc_exit(), ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire(), SpinLockRelease(), update_process_title, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_CONNECTING, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( const void * startup_data,
size_t  startup_data_len 
)

Definition at line 154 of file walreceiver.c.

155{
156 char conninfo[MAXCONNINFO];
157 char *tmp_conninfo;
158 char slotname[NAMEDATALEN];
159 bool is_temp_slot;
160 XLogRecPtr startpoint;
161 TimeLineID startpointTLI;
163 bool first_stream;
166 char *err;
167 char *sender_host = NULL;
168 int sender_port = 0;
169 char *appname;
170
172
174
175 /*
176 * WalRcv should be set up already (if we are a backend, we inherit this
177 * by fork() or EXEC_BACKEND mechanism from the postmaster).
178 */
179 walrcv = WalRcv;
180 Assert(walrcv != NULL);
181
182 /*
183 * Mark walreceiver as running in shared memory.
184 *
185 * Do this as early as possible, so that if we fail later on, we'll set
186 * state to STOPPED. If we die before this, the startup process will keep
187 * waiting for us to start up, until it times out.
188 */
189 SpinLockAcquire(&walrcv->mutex);
190 Assert(walrcv->pid == 0);
191 switch (walrcv->walRcvState)
192 {
193 case WALRCV_STOPPING:
194 /* If we've already been requested to stop, don't start up. */
195 walrcv->walRcvState = WALRCV_STOPPED;
197
198 case WALRCV_STOPPED:
199 SpinLockRelease(&walrcv->mutex);
200 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
201 proc_exit(1);
202 break;
203
204 case WALRCV_STARTING:
205 /* The usual case */
206 break;
207
209 case WALRCV_WAITING:
210 case WALRCV_STREAMING:
212 default:
213 /* Shouldn't happen */
214 SpinLockRelease(&walrcv->mutex);
215 elog(PANIC, "walreceiver still running according to shared memory state");
216 }
217 /* Advertise our PID so that the startup process can kill us */
218 walrcv->pid = MyProcPid;
219 walrcv->walRcvState = WALRCV_CONNECTING;
220
221 /* Fetch information required to start streaming */
222 walrcv->ready_to_display = false;
223 strlcpy(conninfo, walrcv->conninfo, MAXCONNINFO);
224 strlcpy(slotname, walrcv->slotname, NAMEDATALEN);
225 is_temp_slot = walrcv->is_temp_slot;
226 startpoint = walrcv->receiveStart;
227 startpointTLI = walrcv->receiveStartTLI;
228
229 /*
230 * At most one of is_temp_slot and slotname can be set; otherwise,
231 * RequestXLogStreaming messed up.
232 */
233 Assert(!is_temp_slot || (slotname[0] == '\0'));
234
235 /* Initialise to a sanish value */
237 walrcv->lastMsgSendTime =
238 walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
239
240 /* Report our proc number so that others can wake us up */
241 walrcv->procno = MyProcNumber;
242
243 SpinLockRelease(&walrcv->mutex);
244
245 /* Arrange to clean up at walreceiver exit */
246 on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
247
248 /* Properly accept or ignore signals the postmaster might send us */
249 pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
250 * file */
252 pqsignal(SIGTERM, die); /* request shutdown */
253 /* SIGQUIT handler was already set up by InitPostmasterChild */
258
259 /* Reset some signals that are accepted by postmaster but not here */
261
262 /* Load the libpq-specific functions */
263 load_file("libpqwalreceiver", false);
265 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
266
267 /* Unblock signals (they were blocked when the postmaster forked us) */
269
270 /* Establish the connection to the primary for XLOG streaming */
271 appname = cluster_name[0] ? cluster_name : "walreceiver";
272 wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
273 if (!wrconn)
276 errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
277 appname, err)));
278
279 /*
280 * Save user-visible connection string. This clobbers the original
281 * conninfo, for security. Also save host and port of the sender server
282 * this walreceiver is connected to.
283 */
285 walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
286 SpinLockAcquire(&walrcv->mutex);
287 memset(walrcv->conninfo, 0, MAXCONNINFO);
288 if (tmp_conninfo)
290
291 memset(walrcv->sender_host, 0, NI_MAXHOST);
292 if (sender_host)
293 strlcpy(walrcv->sender_host, sender_host, NI_MAXHOST);
294
295 walrcv->sender_port = sender_port;
296 walrcv->ready_to_display = true;
297 SpinLockRelease(&walrcv->mutex);
298
299 if (tmp_conninfo)
301
302 if (sender_host)
303 pfree(sender_host);
304
305 first_stream = true;
306 for (;;)
307 {
308 char *primary_sysid;
309 char standby_sysid[32];
311
312 /*
313 * Check that we're connected to a valid server using the
314 * IDENTIFY_SYSTEM replication command.
315 */
317
321 {
324 errmsg("database system identifier differs between the primary and standby"),
325 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
327 }
328
329 /*
330 * Confirm that the current timeline of the primary is the same or
331 * ahead of ours.
332 */
333 if (primaryTLI < startpointTLI)
336 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
337 primaryTLI, startpointTLI)));
338
339 /*
340 * Get any missing history files. We do this always, even when we're
341 * not interested in that timeline, so that if we're promoted to
342 * become the primary later on, we don't select the same timeline that
343 * was already used in the current primary. This isn't bullet-proof -
344 * you'll need some external software to manage your cluster if you
345 * need to ensure that a unique timeline id is chosen in every case,
346 * but let's avoid the confusion of timeline id collisions where we
347 * can.
348 */
350
351 /*
352 * Create temporary replication slot if requested, and update slot
353 * name in shared memory. (Note the slot name cannot already be set
354 * in this case.)
355 */
356 if (is_temp_slot)
357 {
358 snprintf(slotname, sizeof(slotname),
359 "pg_walreceiver_%lld",
360 (long long int) walrcv_get_backend_pid(wrconn));
361
362 walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
363
364 SpinLockAcquire(&walrcv->mutex);
365 strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
366 SpinLockRelease(&walrcv->mutex);
367 }
368
369 /*
370 * Start streaming.
371 *
372 * We'll try to start at the requested starting point and timeline,
373 * even if it's different from the server's latest timeline. In case
374 * we've already reached the end of the old timeline, the server will
375 * finish the streaming immediately, and we will go back to await
376 * orders from the startup process. If recovery_target_timeline is
377 * 'latest', the startup process will scan pg_wal and find the new
378 * history file, bump recovery target timeline, and ask us to restart
379 * on the new timeline.
380 */
381 options.logical = false;
382 options.startpoint = startpoint;
383 options.slotname = slotname[0] != '\0' ? slotname : NULL;
384 options.proto.physical.startpointTLI = startpointTLI;
386 {
387 if (first_stream)
388 ereport(LOG,
389 errmsg("started streaming WAL from primary at %X/%08X on timeline %u",
390 LSN_FORMAT_ARGS(startpoint), startpointTLI));
391 else
392 ereport(LOG,
393 errmsg("restarted WAL streaming at %X/%08X on timeline %u",
394 LSN_FORMAT_ARGS(startpoint), startpointTLI));
395 first_stream = false;
396
397 /*
398 * Switch to STREAMING after a successful connection if current
399 * state is CONNECTING. This switch happens after an initial
400 * startup, or after a restart as determined by
401 * WalRcvWaitForStartPosition().
402 */
403 SpinLockAcquire(&walrcv->mutex);
404 if (walrcv->walRcvState == WALRCV_CONNECTING)
405 walrcv->walRcvState = WALRCV_STREAMING;
406 SpinLockRelease(&walrcv->mutex);
407
408 /* Initialize LogstreamResult and buffers for processing messages */
411
412 /* Initialize nap wakeup times. */
414 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
416
417 /* Send initial reply/feedback messages. */
418 XLogWalRcvSendReply(true, false, false);
420
421 /* Loop until end-of-streaming or error */
422 for (;;)
423 {
424 char *buf;
425 int len;
426 bool endofwal = false;
428 int rc;
430 long nap;
431
432 /*
433 * Exit walreceiver if we're not in recovery. This should not
434 * happen, but cross-check the status here.
435 */
436 if (!RecoveryInProgress())
439 errmsg("cannot continue WAL streaming, recovery has already ended")));
440
441 /* Process any requests or signals received recently */
443
445 {
446 ConfigReloadPending = false;
448 /* recompute wakeup times */
450 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
453 }
454
455 /* See if we can read data immediately */
457 if (len != 0)
458 {
459 /*
460 * Process the received data, and any subsequent data we
461 * can read without blocking.
462 */
463 for (;;)
464 {
465 if (len > 0)
466 {
467 /*
468 * Something was received from primary, so adjust
469 * the ping and terminate wakeup times.
470 */
473 now);
475 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
476 startpointTLI);
477 }
478 else if (len == 0)
479 break;
480 else if (len < 0)
481 {
482 ereport(LOG,
483 (errmsg("replication terminated by primary server"),
484 errdetail("End of WAL reached on timeline %u at %X/%08X.",
485 startpointTLI,
487 endofwal = true;
488 break;
489 }
491 }
492
493 /* Let the primary know that we received some data. */
494 XLogWalRcvSendReply(false, false, false);
495
496 /*
497 * If we've written some records, flush them to disk and
498 * let the startup process and primary server know about
499 * them.
500 */
501 XLogWalRcvFlush(false, startpointTLI);
502 }
503
504 /* Check if we need to exit the streaming loop. */
505 if (endofwal)
506 break;
507
508 /* Find the soonest wakeup time, to limit our nap. */
510 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
512
513 /* Calculate the nap time, clamping as necessary. */
516
517 /*
518 * Ideally we would reuse a WaitEventSet object repeatedly
519 * here to avoid the overheads of WaitLatchOrSocket on epoll
520 * systems, but we can't be sure that libpq (or any other
521 * walreceiver implementation) has the same socket (even if
522 * the fd is the same number, it may have been closed and
523 * reopened since the last time). In future, if there is a
524 * function for removing sockets from WaitEventSet, then we
525 * could add and remove just the socket each time, potentially
526 * avoiding some system calls.
527 */
532 wait_fd,
533 nap,
535 if (rc & WL_LATCH_SET)
536 {
539
540 if (walrcv->apply_reply_requested)
541 {
542 /*
543 * The recovery process has asked us to send apply
544 * feedback now. Make sure the flag is really set to
545 * false in shared memory before sending the reply, so
546 * we don't miss a new request for a reply.
547 */
548 walrcv->apply_reply_requested = false;
550 XLogWalRcvSendReply(false, false, true);
551 }
552 }
553 if (rc & WL_TIMEOUT)
554 {
555 /*
556 * We didn't receive anything new. If we haven't heard
557 * anything from the server for more than
558 * wal_receiver_timeout / 2, ping the server. Also, if
559 * it's been longer than wal_receiver_status_interval
560 * since the last update we sent, send a status update to
561 * the primary anyway, to report any progress in applying
562 * WAL.
563 */
564 bool requestReply = false;
565
566 /*
567 * Report pending statistics to the cumulative stats
568 * system. This location is useful for the report as it
569 * is not within a tight loop in the WAL receiver, to
570 * avoid bloating pgstats with requests, while also making
571 * sure that the reports happen each time a status update
572 * is sent.
573 */
574 pgstat_report_wal(false);
575
576 /*
577 * Check if time since last receive from primary has
578 * reached the configured limit.
579 */
584 errmsg("terminating walreceiver due to timeout")));
585
586 /*
587 * If we didn't receive anything new for half of receiver
588 * replication timeout, then ping the server.
589 */
591 {
592 requestReply = true;
594 }
595
598 }
599 }
600
601 /*
602 * The backend finished streaming. Exit streaming COPY-mode from
603 * our side, too.
604 */
606
607 /*
608 * If the server had switched to a new timeline that we didn't
609 * know about when we began streaming, fetch its timeline history
610 * file now.
611 */
613 }
614 else
615 ereport(LOG,
616 (errmsg("primary server contains no more WAL on requested timeline %u",
617 startpointTLI)));
618
619 /*
620 * End of WAL reached on the requested timeline. Close the last
621 * segment, and await for new orders from the startup process.
622 */
623 if (recvFile >= 0)
624 {
626
627 XLogWalRcvFlush(false, startpointTLI);
629 if (close(recvFile) != 0)
632 errmsg("could not close WAL segment %s: %m",
633 xlogfname)));
634
635 /*
636 * Create .done file forcibly to prevent the streamed segment from
637 * being archived later.
638 */
641 else
643 }
644 recvFile = -1;
645
646 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
647 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
648 }
649 /* not reached */
650}
#define pg_memory_barrier()
Definition atomics.h:141
void AuxiliaryProcessMainCommon(void)
Definition auxprocess.c:41
sigset_t UnBlockSig
Definition pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1751
#define Min(x, y)
Definition c.h:1091
#define UINT64_FORMAT
Definition c.h:635
#define pg_fallthrough
Definition c.h:161
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errcode_for_file_access(void)
Definition elog.c:898
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define PANIC
Definition elog.h:44
#define DEBUG1
Definition elog.h:31
void err(int eval, const char *fmt,...)
Definition err.c:43
ProcNumber MyProcNumber
Definition globals.c:92
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
char * cluster_name
Definition guc_tables.c:582
#define close(a)
Definition win32.h:12
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define die(msg)
void pgstat_report_wal(bool force)
Definition pgstat_wal.c:46
#define pqsignal
Definition port.h:547
#define PG_SIG_IGN
Definition port.h:551
int pgsocket
Definition port.h:29
#define PGINVALID_SOCKET
Definition port.h:31
#define PG_SIG_DFL
Definition port.h:550
#define PointerGetDatum(X)
Definition postgres.h:354
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:688
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define NUM_WALRCV_WAKEUPS
static StringInfoData reply_message
static int recvFile
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
static TimeLineID recvFileTLI
WalReceiverFunctionsType * WalReceiverFunctions
Definition walreceiver.c:96
static void XLogWalRcvSendReply(bool force, bool requestReply, bool checkApply)
static XLogSegNo recvSegNo
static void XLogWalRcvSendHSFeedback(bool immed)
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
static void WalRcvDie(int code, Datum arg)
static struct @19 LogstreamResult
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_get_conninfo(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_get_backend_pid(conn)
#define walrcv_receive(conn, buffer, wait_fd)
#define SIGCHLD
Definition win32_port.h:168
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGALRM
Definition win32_port.h:164
#define SIGUSR2
Definition win32_port.h:171
uint64 GetSystemIdentifier(void)
Definition xlog.c:4647
bool RecoveryInProgress(void)
Definition xlog.c:6830
int wal_segment_size
Definition xlog.c:150
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert, AuxiliaryProcessMainCommon(), buf, CHECK_FOR_INTERRUPTS, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, DEBUG1, die, elog, ereport, err(), errcode(), errcode_for_file_access(), errdetail(), errmsg, ERROR, FATAL, fb(), GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), i, initStringInfo(), len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, Min, MyLatch, MyProcNumber, MyProcPid, NAMEDATALEN, now(), NUM_WALRCV_WAKEUPS, on_shmem_exit(), PANIC, pfree(), pg_fallthrough, pg_memory_barrier, PG_SIG_DFL, PG_SIG_IGN, PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_wal(), PointerGetDatum, pqsignal, proc_exit(), ProcessConfigFile(), procsignal_sigusr1_handler(), RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, snprintf, SpinLockAcquire(), SpinLockRelease(), strlcpy(), TIMESTAMP_INFINITY, TimestampDifferenceMilliseconds(), UINT64_FORMAT, UnBlockSig, WaitLatchOrSocket(), wakeup, wal_segment_size, WalRcv, walrcv_connect, WALRCV_CONNECTING, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_TERMINATE, WalRcvComputeNextWakeup(), WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvWaitForStartPosition(), WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1069 of file walreceiver.c.

1070{
1071 char xlogfname[MAXFNAMELEN];
1072
1074 Assert(tli != 0);
1075
1076 /*
1077 * fsync() and close current file before we switch to next one. We would
1078 * otherwise have to reopen this file to fsync it later.
1079 */
1080 XLogWalRcvFlush(false, tli);
1081
1083
1084 /*
1085 * XLOG segment files will be re-read by recovery in startup process soon,
1086 * so we don't advise the OS to release cache pages associated with the
1087 * file like XLogFileClose() does.
1088 */
1089 if (close(recvFile) != 0)
1090 ereport(PANIC,
1092 errmsg("could not close WAL segment %s: %m",
1093 xlogfname)));
1094
1095 /*
1096 * Create .done file forcibly to prevent the streamed segment from being
1097 * archived later.
1098 */
1101 else
1103
1104 recvFile = -1;
1105}
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert, close, ereport, errcode_for_file_access(), errmsg, fb(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 1005 of file walreceiver.c.

1006{
1007 Assert(tli != 0);
1008
1009 if (LogstreamResult.Flush < LogstreamResult.Write)
1010 {
1012
1014
1015 LogstreamResult.Flush = LogstreamResult.Write;
1016
1017 /* Update shared-memory status */
1018 SpinLockAcquire(&walrcv->mutex);
1019 if (walrcv->flushedUpto < LogstreamResult.Flush)
1020 {
1021 walrcv->latestChunkStart = walrcv->flushedUpto;
1022 walrcv->flushedUpto = LogstreamResult.Flush;
1023 walrcv->receivedTLI = tli;
1024 }
1025 SpinLockRelease(&walrcv->mutex);
1026
1027 /*
1028 * If we flushed an LSN that someone was waiting for, notify the
1029 * waiters.
1030 */
1031 if (waitLSNState &&
1032 (LogstreamResult.Flush >=
1035
1036 /* Signal the startup process and walsender that new WAL has arrived */
1039 WalSndWakeup(true, false);
1040
1041 /* Report XLOG streaming progress in PS display */
1043 {
1044 char activitymsg[50];
1045
1046 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
1049 }
1050
1051 /* Also let the primary know that we made some progress */
1052 if (!dying)
1053 {
1054 XLogWalRcvSendReply(false, false, false);
1056 }
1057 }
1058}
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:85
#define AllowCascadeReplication()
Definition walreceiver.h:40
void WalSndWakeup(bool physical, bool logical)
Definition walsender.c:3958
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition xlog.c:9374
struct WaitLSNState * waitLSNState
Definition xlogwait.c:70
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:320
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41

References AllowCascadeReplication, Assert, fb(), issue_xlog_fsync(), LogstreamResult, LSN_FORMAT_ARGS, WaitLSNState::minWaitedLSN, pg_atomic_read_u64(), recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire(), SpinLockRelease(), update_process_title, WAIT_LSN_TYPE_STANDBY_FLUSH, waitLSNState, WaitLSNWakeup(), WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 831 of file walreceiver.c.

832{
833 int hdrlen;
837 bool replyRequested;
838
839 switch (type)
840 {
842 {
844
845 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
846 if (len < hdrlen)
849 errmsg_internal("invalid WAL message received from primary")));
850
851 /* initialize a StringInfo with the given buffer */
853
854 /* read the fields */
859
860 buf += hdrlen;
861 len -= hdrlen;
863 break;
864 }
866 {
868
869 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
870 if (len != hdrlen)
873 errmsg_internal("invalid keepalive message received from primary")));
874
875 /* initialize a StringInfo with the given buffer */
877
878 /* read the fields */
882
884
885 /* If the primary requested a reply, send one immediately */
886 if (replyRequested)
887 XLogWalRcvSendReply(true, false, false);
888 break;
889 }
890 default:
893 errmsg_internal("invalid replication message type %d",
894 type)));
895 }
896}
int64_t int64
Definition c.h:621
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
#define PqReplMsg_WALData
Definition protocol.h:77
#define PqReplMsg_Keepalive
Definition protocol.h:75
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition stringinfo.h:157
const char * type
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)

References buf, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, fb(), initReadOnlyStringInfo(), len, pq_getmsgbyte(), pq_getmsgint64(), PqReplMsg_Keepalive, PqReplMsg_WALData, ProcessWalSndrMessage(), type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1204 of file walreceiver.c.

1205{
1208 TransactionId nextXid;
1211 TransactionId xmin,
1212 catalog_xmin;
1213
1214 /* initially true so we always send at least one feedback message */
1215 static bool primary_has_standby_xmin = true;
1216
1217 /*
1218 * If the user doesn't want status to be reported to the primary, be sure
1219 * to exit before doing anything at all.
1220 */
1223 return;
1224
1225 /* Get current timestamp. */
1227
1228 /* Send feedback at most once per wal_receiver_status_interval. */
1230 return;
1231
1232 /* Make sure we wake up when it's time to send feedback again. */
1234
1235 /*
1236 * If Hot Standby is not yet accepting connections there is nothing to
1237 * send. Check this after the interval has expired to reduce the number of
1238 * calls.
1239 *
1240 * Bailing out here also ensures that we don't send feedback until we've
1241 * read our own replication slot state, so we don't tell the primary to
1242 * discard needed xmin or catalog_xmin from any slots that may exist on
1243 * this replica.
1244 */
1245 if (!HotStandbyActive())
1246 return;
1247
1248 /*
1249 * Make the expensive call to get the oldest xmin once we are certain
1250 * everything else has been checked.
1251 */
1253 {
1254 GetReplicationHorizons(&xmin, &catalog_xmin);
1255 }
1256 else
1257 {
1258 xmin = InvalidTransactionId;
1259 catalog_xmin = InvalidTransactionId;
1260 }
1261
1262 /*
1263 * Get epoch and adjust if nextXid and oldestXmin are on different sides of
1264 * the epoch boundary.
1265 */
1270 if (nextXid < xmin)
1271 xmin_epoch--;
1272 if (nextXid < catalog_xmin)
1274
1275 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1276 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1277
1278 /* Construct the message and send it. */
1284 pq_sendint32(&reply_message, catalog_xmin);
1287 if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1289 else
1291}
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition procarray.c:1986
#define PqReplMsg_HotStandbyFeedback
Definition protocol.h:82
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
#define InvalidTransactionId
Definition transam.h:31
#define EpochFromFullTransactionId(x)
Definition transam.h:47
#define XidFromFullTransactionId(x)
Definition transam.h:48
#define TransactionIdIsValid(xid)
Definition transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition varsup.c:283
#define walrcv_send(conn, buffer, nbytes)
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, fb(), GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), PqReplMsg_HotStandbyFeedback, ReadNextFullTransactionId(), reply_message, resetStringInfo(), TransactionIdIsValid, wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_HSFEEDBACK, WalRcvComputeNextWakeup(), wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply,
bool  checkApply 
)
static

Definition at line 1131 of file walreceiver.c.

1132{
1138
1139 /*
1140 * If the user doesn't want status to be reported to the primary, be sure
1141 * to exit before doing anything at all.
1142 */
1143 if (!force && wal_receiver_status_interval <= 0)
1144 return;
1145
1146 /* Get current timestamp. */
1148
1149 /*
1150 * We can compare the write and flush positions to the last message we
1151 * sent without taking any lock, but the apply position requires a spin
1152 * lock, so we don't check that unless it is expected to advance since the
1153 * previous update, i.e., when 'checkApply' is true.
1154 */
1155 if (!force && now < wakeup[WALRCV_WAKEUP_REPLY])
1156 {
1157 if (checkApply)
1159
1160 if (writePtr == LogstreamResult.Write
1161 && flushPtr == LogstreamResult.Flush
1163 return;
1164 }
1165
1166 /* Make sure we wake up when it's time to send another reply. */
1168
1169 /* Construct a new message */
1170 writePtr = LogstreamResult.Write;
1171 flushPtr = LogstreamResult.Flush;
1174
1182
1183 /* Send it */
1184 elog(DEBUG2, "sending write %X/%08X flush %X/%08X apply %X/%08X%s",
1188 requestReply ? " (reply requested)" : "");
1189
1191}
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84

References StringInfoData::data, DEBUG2, elog, fb(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), InvalidXLogRecPtr, StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), PqReplMsg_StandbyStatusUpdate, reply_message, resetStringInfo(), wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_REPLY, WalRcvComputeNextWakeup(), wrconn, and XLogRecPtrIsValid.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 902 of file walreceiver.c.

903{
904 int startoff;
905 int byteswritten;
907
908 Assert(tli != 0);
909
910 while (nbytes > 0)
911 {
912 int segbytes;
913
914 /* Close the current segment if it's completed */
917
918 if (recvFile < 0)
919 {
920 /* Create/use new log file */
923 recvFileTLI = tli;
924 }
925
926 /* Calculate the start offset of the received logs */
928
929 if (startoff + nbytes > wal_segment_size)
931 else
932 segbytes = nbytes;
933
934 /* OK to write the logs */
935 errno = 0;
936
937 /*
938 * Measure I/O timing to write WAL data, for pg_stat_io.
939 */
941
945
948
949 if (byteswritten <= 0)
950 {
952 int save_errno;
953
954 /* if write didn't set errno, assume no disk space */
955 if (errno == 0)
956 errno = ENOSPC;
957
963 errmsg("could not write to WAL segment %s "
964 "at offset %d, length %d: %m",
966 }
967
968 /* Update state for write */
970
971 nbytes -= byteswritten;
972 buf += byteswritten;
973
974 LogstreamResult.Write = recptr;
975 }
976
977 /* Update shared-memory status */
979
980 /*
981 * If we wrote an LSN that someone was waiting for, notify the waiters.
982 */
983 if (waitLSNState &&
984 (LogstreamResult.Write >=
987
988 /*
989 * Close the current segment if it was completely written in the last cycle of
990 * the loop, to create its archive notification file soon. Otherwise WAL
991 * archiving of the segment will be delayed until any data in the next
992 * segment is received and written.
993 */
996}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
return str start
@ IOOBJECT_WAL
Definition pgstat.h:283
@ IOCONTEXT_NORMAL
Definition pgstat.h:293
@ IOOP_WRITE
Definition pgstat.h:320
instr_time pgstat_prepare_io_time(bool track_io_guc)
Definition pgstat_io.c:91
void pgstat_count_io_op_time(IOObject io_object, IOContext io_context, IOOp io_op, instr_time start_time, uint32 cnt, uint64 bytes)
Definition pgstat_io.c:122
#define pg_pwrite
Definition port.h:248
off_t pgoff_t
Definition port.h:421
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:67
static void pgstat_report_wait_end(void)
Definition wait_event.h:83
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition xlog.c:3435
bool track_wal_io_timing
Definition xlog.c:144
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40

References Assert, buf, ereport, errcode_for_file_access(), errmsg, fb(), IOCONTEXT_NORMAL, IOOBJECT_WAL, IOOP_WRITE, LogstreamResult, MAXFNAMELEN, WaitLSNState::minWaitedLSN, PANIC, pg_atomic_read_u64(), pg_atomic_write_u64(), pg_pwrite, pgstat_count_io_op_time(), pgstat_prepare_io_time(), pgstat_report_wait_end(), pgstat_report_wait_start(), recvFile, recvFileTLI, recvSegNo, start, track_wal_io_timing, WAIT_LSN_TYPE_STANDBY_WRITE, waitLSNState, WaitLSNWakeup(), wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName(), XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 114 of file walreceiver.c.

Referenced by XLogWrite().

◆ hot_standby_feedback

bool hot_standby_feedback

◆ [struct]

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 103 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 105 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

◆ wakeup

◆ wal_receiver_status_interval

◆ wal_receiver_timeout

int wal_receiver_timeout

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 96 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 113 of file walreceiver.c.

Referenced by XLogWrite().