PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "catalog/pg_authid.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)
 

Typedefs

typedef enum WalRcvWakeupReason WalRcvWakeupReason
 

Enumerations

enum  WalRcvWakeupReason { WALRCV_WAKEUP_TERMINATE , WALRCV_WAKEUP_PING , WALRCV_WAKEUP_REPLY , WALRCV_WAKEUP_HSFEEDBACK }
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason, TimestampTz now)
 
void WalReceiverMain (const void *startup_data, size_t startup_data_len)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct { 
 
   XLogRecPtr   Write 
 
   XLogRecPtr   Flush 
 
} LogstreamResult 
 
static TimestampTz wakeup [NUM_WALRCV_WAKEUPS]
 
static StringInfoData reply_message
 

Macro Definition Documentation

◆ NUM_WALRCV_WAKEUPS

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)

Definition at line 125 of file walreceiver.c.

Typedef Documentation

◆ WalRcvWakeupReason

Enumeration Type Documentation

◆ WalRcvWakeupReason

Enumerator
WALRCV_WAKEUP_TERMINATE 
WALRCV_WAKEUP_PING 
WALRCV_WAKEUP_REPLY 
WALRCV_WAKEUP_HSFEEDBACK 

Definition at line 119 of file walreceiver.c.

120{
125#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
WalRcvWakeupReason
@ WALRCV_WAKEUP_TERMINATE
@ WALRCV_WAKEUP_REPLY
@ WALRCV_WAKEUP_PING
@ WALRCV_WAKEUP_HSFEEDBACK

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1425 of file walreceiver.c.

1426{
1427 TupleDesc tupdesc;
1428 Datum *values;
1429 bool *nulls;
1430 int pid;
1431 bool ready_to_display;
1438 TimestampTz last_send_time;
1442 char sender_host[NI_MAXHOST];
1443 int sender_port = 0;
1444 char slotname[NAMEDATALEN];
1445 char conninfo[MAXCONNINFO];
1446
1447 /* Take a lock to ensure value consistency */
1449 pid = (int) WalRcv->pid;
1450 ready_to_display = WalRcv->ready_to_display;
1456 last_send_time = WalRcv->lastMsgSendTime;
1460 strlcpy(slotname, WalRcv->slotname, sizeof(slotname));
1461 strlcpy(sender_host, WalRcv->sender_host, sizeof(sender_host));
1462 sender_port = WalRcv->sender_port;
1463 strlcpy(conninfo, WalRcv->conninfo, sizeof(conninfo));
1465
1466 /*
1467 * No WAL receiver (or not ready yet), just return a tuple with NULL
1468 * values
1469 */
1470 if (pid == 0 || !ready_to_display)
1472
1473 /*
1474 * Read "writtenUpto" without holding a spinlock. Note that it may not be
1475 * consistent with the other shared variables of the WAL receiver
1476 * protected by a spinlock, but this should not be used for data integrity
1477 * checks.
1478 */
1480
1481 /* determine result type */
1482 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1483 elog(ERROR, "return type must be a row type");
1484
1485 values = palloc0_array(Datum, tupdesc->natts);
1486 nulls = palloc0_array(bool, tupdesc->natts);
1487
1488 /* Fetch values */
1489 values[0] = Int32GetDatum(pid);
1490
1492 {
1493 /*
1494 * Only superusers and roles with privileges of pg_read_all_stats can
1495 * see details. Other users only get the pid value to know whether it
1496 * is a WAL receiver, but no details.
1497 */
1498 memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1499 }
1500 else
1501 {
1503
1505 nulls[2] = true;
1506 else
1510 nulls[4] = true;
1511 else
1514 nulls[5] = true;
1515 else
1518 if (last_send_time == 0)
1519 nulls[7] = true;
1520 else
1521 values[7] = TimestampTzGetDatum(last_send_time);
1522 if (last_receipt_time == 0)
1523 nulls[8] = true;
1524 else
1527 nulls[9] = true;
1528 else
1530 if (latest_end_time == 0)
1531 nulls[10] = true;
1532 else
1534 if (*slotname == '\0')
1535 nulls[11] = true;
1536 else
1537 values[11] = CStringGetTextDatum(slotname);
1538 if (*sender_host == '\0')
1539 nulls[12] = true;
1540 else
1541 values[12] = CStringGetTextDatum(sender_host);
1542 if (sender_port == 0)
1543 nulls[13] = true;
1544 else
1545 values[13] = Int32GetDatum(sender_port);
1546 if (*conninfo == '\0')
1547 nulls[14] = true;
1548 else
1549 values[14] = CStringGetTextDatum(conninfo);
1550 }
1551
1552 /* Returns the record as Datum */
1554}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5284
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define CStringGetTextDatum(s)
Definition builtins.h:97
int64 TimestampTz
Definition timestamp.h:39
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define palloc0_array(type, count)
Definition fe_memutils.h:77
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_RETURN_DATUM(x)
Definition fmgr.h:354
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1117
Oid GetUserId(void)
Definition miscinit.c:469
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
uint64_t Datum
Definition postgres.h:70
static Datum Int32GetDatum(int32 X)
Definition postgres.h:222
static int fb(int x)
#define SpinLockRelease(lock)
Definition spin.h:61
#define SpinLockAcquire(lock)
Definition spin.h:59
TimestampTz lastMsgReceiptTime
XLogRecPtr latestWalEnd
TimeLineID receiveStartTLI
Definition walreceiver.h:88
TimeLineID receivedTLI
Definition walreceiver.h:98
char slotname[NAMEDATALEN]
char sender_host[NI_MAXHOST]
XLogRecPtr receiveStart
Definition walreceiver.h:87
XLogRecPtr flushedUpto
Definition walreceiver.h:97
pg_atomic_uint64 writtenUpto
TimestampTz lastMsgSendTime
WalRcvState walRcvState
Definition walreceiver.h:72
TimestampTz latestWalEndTime
bool ready_to_display
slock_t mutex
char conninfo[MAXCONNINFO]
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
static const char * WalRcvGetStateString(WalRcvState state)
#define MAXCONNINFO
Definition walreceiver.h:37
WalRcvState
Definition walreceiver.h:46
WalRcvData * WalRcv
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, fb(), WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum(), Int32GetDatum(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum(), MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, palloc0_array, pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum(), TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsValid.

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1287 of file walreceiver.c.

1288{
1290 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1291
1292 /* Update shared-memory status */
1293 SpinLockAcquire(&walrcv->mutex);
1294 if (walrcv->latestWalEnd < walEnd)
1295 walrcv->latestWalEndTime = sendTime;
1296 walrcv->latestWalEnd = walEnd;
1297 walrcv->lastMsgSendTime = sendTime;
1298 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1299 SpinLockRelease(&walrcv->mutex);
1300
1302 {
1303 char *sendtime;
1304 char *receipttime;
1305 int applyDelay;
1306
1307 /* Copy because timestamptz_to_str returns a static buffer */
1309 receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1311
1312 /* apply delay is not available */
1313 if (applyDelay == -1)
1314 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1315 sendtime,
1318 else
1319 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1320 sendtime,
1322 applyDelay,
1324
1325 pfree(sendtime);
1327 }
1328}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1862
bool message_level_is_interesting(int elevel)
Definition elog.c:273
#define DEBUG2
Definition elog.h:29
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog, fb(), GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), message_level_is_interesting(), pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvComputeNextWakeup()

static void WalRcvComputeNextWakeup ( WalRcvWakeupReason  reason,
TimestampTz  now 
)
static

Definition at line 1339 of file walreceiver.c.

1340{
1341 switch (reason)
1342 {
1344 if (wal_receiver_timeout <= 0)
1345 wakeup[reason] = TIMESTAMP_INFINITY;
1346 else
1348 break;
1349 case WALRCV_WAKEUP_PING:
1350 if (wal_receiver_timeout <= 0)
1351 wakeup[reason] = TIMESTAMP_INFINITY;
1352 else
1354 break;
1357 wakeup[reason] = TIMESTAMP_INFINITY;
1358 else
1360 break;
1363 wakeup[reason] = TIMESTAMP_INFINITY;
1364 else
1366 break;
1367 /* there's intentionally no default: here */
1368 }
1369}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1609
#define TIMESTAMP_INFINITY
Definition timestamp.h:151
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
#define TimestampTzPlusSeconds(tz, s)
Definition timestamp.h:86
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
bool hot_standby_feedback
Definition walreceiver.c:91
int wal_receiver_status_interval
Definition walreceiver.c:89
int wal_receiver_timeout
Definition walreceiver.c:90

References hot_standby_feedback, now(), TIMESTAMP_INFINITY, TimestampTzPlusMilliseconds, TimestampTzPlusSeconds, wakeup, wal_receiver_status_interval, wal_receiver_timeout, WALRCV_WAKEUP_HSFEEDBACK, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, and WALRCV_WAKEUP_TERMINATE.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 793 of file walreceiver.c.

794{
797
798 Assert(*startpointTLI_p != 0);
799
800 /* Ensure that all WAL records received are flushed to disk */
802
803 /* Mark ourselves inactive in shared memory */
804 SpinLockAcquire(&walrcv->mutex);
805 Assert(walrcv->walRcvState == WALRCV_STREAMING ||
806 walrcv->walRcvState == WALRCV_CONNECTING ||
807 walrcv->walRcvState == WALRCV_RESTARTING ||
808 walrcv->walRcvState == WALRCV_STARTING ||
809 walrcv->walRcvState == WALRCV_WAITING ||
810 walrcv->walRcvState == WALRCV_STOPPING);
811 Assert(walrcv->pid == MyProcPid);
812 walrcv->walRcvState = WALRCV_STOPPED;
813 walrcv->pid = 0;
814 walrcv->procno = INVALID_PROC_NUMBER;
815 walrcv->ready_to_display = false;
816 SpinLockRelease(&walrcv->mutex);
817
818 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
819
820 /* Terminate the connection gracefully. */
821 if (wrconn != NULL)
823
824 /* Wake up the startup process to notice promptly that we're gone */
826}
#define Assert(condition)
Definition c.h:873
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition globals.c:47
void * arg
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:342
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
static WalReceiverConn * wrconn
Definition walreceiver.c:94
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
@ WALRCV_STARTING
Definition walreceiver.h:48
@ WALRCV_STOPPED
Definition walreceiver.h:47
@ WALRCV_CONNECTING
Definition walreceiver.h:50
@ WALRCV_RESTARTING
Definition walreceiver.h:53
@ WALRCV_STREAMING
Definition walreceiver.h:51
@ WALRCV_WAITING
Definition walreceiver.h:52
@ WALRCV_STOPPING
Definition walreceiver.h:54
#define walrcv_disconnect(conn)
void WakeupRecovery(void)

References arg, Assert, ConditionVariableBroadcast(), DatumGetPointer(), fb(), INVALID_PROC_NUMBER, MyProcPid, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, WALRCV_CONNECTING, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 737 of file walreceiver.c.

738{
739 TimeLineID tli;
740
741 for (tli = first; tli <= last; tli++)
742 {
743 /* there's no history file for timeline 1 */
744 if (tli != 1 && !existsTimeLineHistory(tli))
745 {
746 char *fname;
747 char *content;
748 int len;
750
751 ereport(LOG,
752 (errmsg("fetching timeline history file for timeline %u from primary server",
753 tli)));
754
755 walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
756
757 /*
758 * Check that the filename on the primary matches what we
759 * calculated ourselves. This is just a sanity check, it should
760 * always match.
761 */
763 if (strcmp(fname, expectedfname) != 0)
766 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
767 tli)));
768
769 /*
770 * Write the file to pg_wal.
771 */
772 writeTimeLineHistoryFile(tli, content, len);
773
774 /*
775 * Mark the streamed history file as ready for archiving if
776 * archive_mode is always.
777 */
780 else
781 XLogArchiveNotify(fname);
782
783 pfree(fname);
784 pfree(content);
785 }
786 }
787}
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition timeline.c:463
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition timeline.c:222
int errmsg_internal(const char *fmt,...)
Definition elog.c:1170
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define LOG
Definition elog.h:31
#define ereport(elevel,...)
Definition elog.h:150
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
int XLogArchiveMode
Definition xlog.c:122
@ ARCHIVE_MODE_ALWAYS
Definition xlog.h:68
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
void XLogArchiveNotify(const char *xlog)

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), fb(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName(), walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1380 of file walreceiver.c.

1381{
1382 ProcNumber procno;
1383
1384 WalRcv->force_reply = true;
1385 /* fetching the proc number is probably atomic, but don't rely on it */
1387 procno = WalRcv->procno;
1389 if (procno != INVALID_PROC_NUMBER)
1390 SetLatch(&GetPGProcByNumber(procno)->procLatch);
1391}
void SetLatch(Latch *latch)
Definition latch.c:290
#define GetPGProcByNumber(n)
Definition proc.h:440
int ProcNumber
Definition procnumber.h:24
sig_atomic_t force_reply
ProcNumber procno
Definition walreceiver.h:68

References WalRcvData::force_reply, GetPGProcByNumber, INVALID_PROC_NUMBER, WalRcvData::mutex, WalRcvData::procno, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvGetStateString()

static const char * WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1398 of file walreceiver.c.

1399{
1400 switch (state)
1401 {
1402 case WALRCV_STOPPED:
1403 return "stopped";
1404 case WALRCV_STARTING:
1405 return "starting";
1406 case WALRCV_CONNECTING:
1407 return "connecting";
1408 case WALRCV_STREAMING:
1409 return "streaming";
1410 case WALRCV_WAITING:
1411 return "waiting";
1412 case WALRCV_RESTARTING:
1413 return "restarting";
1414 case WALRCV_STOPPING:
1415 return "stopping";
1416 }
1417 return "UNKNOWN";
1418}

References WALRCV_CONNECTING, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr * startpoint,
TimeLineID * startpointTLI 
)
static

Definition at line 657 of file walreceiver.c.

658{
660 int state;
661
662 SpinLockAcquire(&walrcv->mutex);
663 state = walrcv->walRcvState;
665 {
666 SpinLockRelease(&walrcv->mutex);
667 if (state == WALRCV_STOPPING)
668 proc_exit(0);
669 else
670 elog(FATAL, "unexpected walreceiver state");
671 }
672 walrcv->walRcvState = WALRCV_WAITING;
673 walrcv->receiveStart = InvalidXLogRecPtr;
674 walrcv->receiveStartTLI = 0;
675 SpinLockRelease(&walrcv->mutex);
676
677 set_ps_display("idle");
678
679 /*
680 * nudge startup process to notice that we've stopped streaming and are
681 * now waiting for instructions.
682 */
684 for (;;)
685 {
687
689
690 SpinLockAcquire(&walrcv->mutex);
691 Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
692 walrcv->walRcvState == WALRCV_WAITING ||
693 walrcv->walRcvState == WALRCV_STOPPING);
694 if (walrcv->walRcvState == WALRCV_RESTARTING)
695 {
696 /*
697 * No need to handle changes in primary_conninfo or
698 * primary_slot_name here. Startup process will signal us to
699 * terminate in case those change.
700 */
701 *startpoint = walrcv->receiveStart;
702 *startpointTLI = walrcv->receiveStartTLI;
703 walrcv->walRcvState = WALRCV_CONNECTING;
704 SpinLockRelease(&walrcv->mutex);
705 break;
706 }
707 if (walrcv->walRcvState == WALRCV_STOPPING)
708 {
709 /*
710 * We should've received SIGTERM if the startup process wants us
711 * to die, but might as well check it here too.
712 */
713 SpinLockRelease(&walrcv->mutex);
714 exit(1);
715 }
716 SpinLockRelease(&walrcv->mutex);
717
720 }
721
723 {
724 char activitymsg[50];
725
726 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%08X",
727 LSN_FORMAT_ARGS(*startpoint));
729 }
730}
#define FATAL
Definition elog.h:41
struct Latch * MyLatch
Definition globals.c:63
void proc_exit(int code)
Definition ipc.c:105
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define snprintf
Definition port.h:260
bool update_process_title
Definition ps_status.c:31
static void set_ps_display(const char *activity)
Definition ps_status.h:40
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References Assert, CHECK_FOR_INTERRUPTS, elog, FATAL, fb(), InvalidXLogRecPtr, LSN_FORMAT_ARGS, MyLatch, proc_exit(), ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_CONNECTING, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( const void * startup_data,
size_t  startup_data_len 
)

Definition at line 153 of file walreceiver.c.

154{
155 char conninfo[MAXCONNINFO];
156 char *tmp_conninfo;
157 char slotname[NAMEDATALEN];
158 bool is_temp_slot;
159 XLogRecPtr startpoint;
160 TimeLineID startpointTLI;
162 bool first_stream;
165 char *err;
166 char *sender_host = NULL;
167 int sender_port = 0;
168 char *appname;
169
171
173
174 /*
175 * WalRcv should be set up already (if we are a backend, we inherit this
176 * by fork() or EXEC_BACKEND mechanism from the postmaster).
177 */
178 walrcv = WalRcv;
179 Assert(walrcv != NULL);
180
181 /*
182 * Mark walreceiver as running in shared memory.
183 *
184 * Do this as early as possible, so that if we fail later on, we'll set
185 * state to STOPPED. If we die before this, the startup process will keep
186 * waiting for us to start up, until it times out.
187 */
188 SpinLockAcquire(&walrcv->mutex);
189 Assert(walrcv->pid == 0);
190 switch (walrcv->walRcvState)
191 {
192 case WALRCV_STOPPING:
193 /* If we've already been requested to stop, don't start up. */
194 walrcv->walRcvState = WALRCV_STOPPED;
195 /* fall through */
196
197 case WALRCV_STOPPED:
198 SpinLockRelease(&walrcv->mutex);
199 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
200 proc_exit(1);
201 break;
202
203 case WALRCV_STARTING:
204 /* The usual case */
205 break;
206
208 case WALRCV_WAITING:
209 case WALRCV_STREAMING:
211 default:
212 /* Shouldn't happen */
213 SpinLockRelease(&walrcv->mutex);
214 elog(PANIC, "walreceiver still running according to shared memory state");
215 }
216 /* Advertise our PID so that the startup process can kill us */
217 walrcv->pid = MyProcPid;
218 walrcv->walRcvState = WALRCV_CONNECTING;
219
220 /* Fetch information required to start streaming */
221 walrcv->ready_to_display = false;
222 strlcpy(conninfo, walrcv->conninfo, MAXCONNINFO);
223 strlcpy(slotname, walrcv->slotname, NAMEDATALEN);
224 is_temp_slot = walrcv->is_temp_slot;
225 startpoint = walrcv->receiveStart;
226 startpointTLI = walrcv->receiveStartTLI;
227
228 /*
229 * At most one of is_temp_slot and slotname can be set; otherwise,
230 * RequestXLogStreaming messed up.
231 */
232 Assert(!is_temp_slot || (slotname[0] == '\0'));
233
234 /* Initialise to a sanish value */
236 walrcv->lastMsgSendTime =
237 walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
238
239 /* Report our proc number so that others can wake us up */
240 walrcv->procno = MyProcNumber;
241
242 SpinLockRelease(&walrcv->mutex);
243
245
246 /* Arrange to clean up at walreceiver exit */
247 on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
248
249 /* Properly accept or ignore signals the postmaster might send us */
250 pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
251 * file */
253 pqsignal(SIGTERM, die); /* request shutdown */
254 /* SIGQUIT handler was already set up by InitPostmasterChild */
259
260 /* Reset some signals that are accepted by postmaster but not here */
262
263 /* Load the libpq-specific functions */
264 load_file("libpqwalreceiver", false);
266 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
267
268 /* Unblock signals (they were blocked when the postmaster forked us) */
270
271 /* Establish the connection to the primary for XLOG streaming */
272 appname = cluster_name[0] ? cluster_name : "walreceiver";
273 wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
274 if (!wrconn)
277 errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
278 appname, err)));
279
280 /*
281 * Save user-visible connection string. This clobbers the original
282 * conninfo, for security. Also save host and port of the sender server
283 * this walreceiver is connected to.
284 */
286 walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
287 SpinLockAcquire(&walrcv->mutex);
288 memset(walrcv->conninfo, 0, MAXCONNINFO);
289 if (tmp_conninfo)
291
292 memset(walrcv->sender_host, 0, NI_MAXHOST);
293 if (sender_host)
294 strlcpy(walrcv->sender_host, sender_host, NI_MAXHOST);
295
296 walrcv->sender_port = sender_port;
297 walrcv->ready_to_display = true;
298 SpinLockRelease(&walrcv->mutex);
299
300 if (tmp_conninfo)
302
303 if (sender_host)
304 pfree(sender_host);
305
306 first_stream = true;
307 for (;;)
308 {
309 char *primary_sysid;
310 char standby_sysid[32];
312
313 /*
314 * Check that we're connected to a valid server using the
315 * IDENTIFY_SYSTEM replication command.
316 */
318
322 {
325 errmsg("database system identifier differs between the primary and standby"),
326 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
328 }
329
330 /*
331 * Confirm that the current timeline of the primary is the same or
332 * ahead of ours.
333 */
334 if (primaryTLI < startpointTLI)
337 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
338 primaryTLI, startpointTLI)));
339
340 /*
341 * Get any missing history files. We do this always, even when we're
342 * not interested in that timeline, so that if we're promoted to
343 * become the primary later on, we don't select the same timeline that
344 * was already used in the current primary. This isn't bullet-proof -
345 * you'll need some external software to manage your cluster if you
346 * need to ensure that a unique timeline id is chosen in every case,
347 * but let's avoid the confusion of timeline id collisions where we
348 * can.
349 */
351
352 /*
353 * Create temporary replication slot if requested, and update slot
354 * name in shared memory. (Note the slot name cannot already be set
355 * in this case.)
356 */
357 if (is_temp_slot)
358 {
359 snprintf(slotname, sizeof(slotname),
360 "pg_walreceiver_%lld",
361 (long long int) walrcv_get_backend_pid(wrconn));
362
363 walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
364
365 SpinLockAcquire(&walrcv->mutex);
366 strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
367 SpinLockRelease(&walrcv->mutex);
368 }
369
370 /*
371 * Start streaming.
372 *
373 * We'll try to start at the requested starting point and timeline,
374 * even if it's different from the server's latest timeline. In case
375 * we've already reached the end of the old timeline, the server will
376 * finish the streaming immediately, and we will go back to await
377 * orders from the startup process. If recovery_target_timeline is
378 * 'latest', the startup process will scan pg_wal and find the new
379 * history file, bump recovery target timeline, and ask us to restart
380 * on the new timeline.
381 */
382 options.logical = false;
383 options.startpoint = startpoint;
384 options.slotname = slotname[0] != '\0' ? slotname : NULL;
385 options.proto.physical.startpointTLI = startpointTLI;
387 {
388 if (first_stream)
389 ereport(LOG,
390 errmsg("started streaming WAL from primary at %X/%08X on timeline %u",
391 LSN_FORMAT_ARGS(startpoint), startpointTLI));
392 else
393 ereport(LOG,
394 errmsg("restarted WAL streaming at %X/%08X on timeline %u",
395 LSN_FORMAT_ARGS(startpoint), startpointTLI));
396 first_stream = false;
397
398 /*
399 * Switch to STREAMING after a successful connection if current
400 * state is CONNECTING. This switch happens after an initial
401 * startup, or after a restart as determined by
402 * WalRcvWaitForStartPosition().
403 */
404 SpinLockAcquire(&walrcv->mutex);
405 if (walrcv->walRcvState == WALRCV_CONNECTING)
406 walrcv->walRcvState = WALRCV_STREAMING;
407 SpinLockRelease(&walrcv->mutex);
408
409 /* Initialize LogstreamResult and buffers for processing messages */
412
413 /* Initialize nap wakeup times. */
415 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
417
418 /* Send initial reply/feedback messages. */
419 XLogWalRcvSendReply(true, false);
421
422 /* Loop until end-of-streaming or error */
423 for (;;)
424 {
425 char *buf;
426 int len;
427 bool endofwal = false;
429 int rc;
431 long nap;
432
433 /*
434 * Exit walreceiver if we're not in recovery. This should not
435 * happen, but cross-check the status here.
436 */
437 if (!RecoveryInProgress())
440 errmsg("cannot continue WAL streaming, recovery has already ended")));
441
442 /* Process any requests or signals received recently */
444
446 {
447 ConfigReloadPending = false;
449 /* recompute wakeup times */
451 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
454 }
455
456 /* See if we can read data immediately */
458 if (len != 0)
459 {
460 /*
461 * Process the received data, and any subsequent data we
462 * can read without blocking.
463 */
464 for (;;)
465 {
466 if (len > 0)
467 {
468 /*
469 * Something was received from primary, so adjust
470 * the ping and terminate wakeup times.
471 */
474 now);
476 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
477 startpointTLI);
478 }
479 else if (len == 0)
480 break;
481 else if (len < 0)
482 {
483 ereport(LOG,
484 (errmsg("replication terminated by primary server"),
485 errdetail("End of WAL reached on timeline %u at %X/%08X.",
486 startpointTLI,
488 endofwal = true;
489 break;
490 }
492 }
493
494 /* Let the primary know that we received some data. */
495 XLogWalRcvSendReply(false, false);
496
497 /*
498 * If we've written some records, flush them to disk and
499 * let the startup process and primary server know about
500 * them.
501 */
502 XLogWalRcvFlush(false, startpointTLI);
503 }
504
505 /* Check if we need to exit the streaming loop. */
506 if (endofwal)
507 break;
508
509 /* Find the soonest wakeup time, to limit our nap. */
511 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
513
514 /* Calculate the nap time, clamping as necessary. */
517
518 /*
519 * Ideally we would reuse a WaitEventSet object repeatedly
520 * here to avoid the overheads of WaitLatchOrSocket on epoll
521 * systems, but we can't be sure that libpq (or any other
522 * walreceiver implementation) has the same socket (even if
523 * the fd is the same number, it may have been closed and
524 * reopened since the last time). In future, if there is a
525 * function for removing sockets from WaitEventSet, then we
526 * could add and remove just the socket each time, potentially
527 * avoiding some system calls.
528 */
533 wait_fd,
534 nap,
536 if (rc & WL_LATCH_SET)
537 {
540
541 if (walrcv->force_reply)
542 {
543 /*
544 * The recovery process has asked us to send apply
545 * feedback now. Make sure the flag is really set to
546 * false in shared memory before sending the reply, so
547 * we don't miss a new request for a reply.
548 */
549 walrcv->force_reply = false;
551 XLogWalRcvSendReply(true, false);
552 }
553 }
554 if (rc & WL_TIMEOUT)
555 {
556 /*
557 * We didn't receive anything new. If we haven't heard
558 * anything from the server for more than
559 * wal_receiver_timeout / 2, ping the server. Also, if
560 * it's been longer than wal_receiver_status_interval
561 * since the last update we sent, send a status update to
562 * the primary anyway, to report any progress in applying
563 * WAL.
564 */
565 bool requestReply = false;
566
567 /*
568 * Report pending statistics to the cumulative stats
569 * system. This location is useful for the report as it
570 * is not within a tight loop in the WAL receiver, to
571 * avoid bloating pgstats with requests, while also making
572 * sure that the reports happen each time a status update
573 * is sent.
574 */
575 pgstat_report_wal(false);
576
577 /*
578 * Check if time since last receive from primary has
579 * reached the configured limit.
580 */
585 errmsg("terminating walreceiver due to timeout")));
586
587 /*
588 * If we didn't receive anything new for half of receiver
589 * replication timeout, then ping the server.
590 */
592 {
593 requestReply = true;
595 }
596
599 }
600 }
601
602 /*
603 * The backend finished streaming. Exit streaming COPY-mode from
604 * our side, too.
605 */
607
608 /*
609 * If the server had switched to a new timeline that we didn't
610 * know about when we began streaming, fetch its timeline history
611 * file now.
612 */
614 }
615 else
616 ereport(LOG,
617 (errmsg("primary server contains no more WAL on requested timeline %u",
618 startpointTLI)));
619
620 /*
621 * End of WAL reached on the requested timeline. Close the last
622 * segment, and await for new orders from the startup process.
623 */
624 if (recvFile >= 0)
625 {
627
628 XLogWalRcvFlush(false, startpointTLI);
630 if (close(recvFile) != 0)
633 errmsg("could not close WAL segment %s: %m",
634 xlogfname)));
635
636 /*
637 * Create .done file forcibly to prevent the streamed segment from
638 * being archived later.
639 */
642 else
644 }
645 recvFile = -1;
646
647 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
648 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
649 }
650 /* not reached */
651}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
#define pg_memory_barrier()
Definition atomics.h:141
void AuxiliaryProcessMainCommon(void)
Definition auxprocess.c:39
sigset_t UnBlockSig
Definition pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1757
#define Min(x, y)
Definition c.h:997
#define UINT64_FORMAT
Definition c.h:565
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errcode_for_file_access(void)
Definition elog.c:886
int errdetail(const char *fmt,...)
Definition elog.c:1216
#define PANIC
Definition elog.h:42
#define DEBUG1
Definition elog.h:30
void err(int eval, const char *fmt,...)
Definition err.c:43
ProcNumber MyProcNumber
Definition globals.c:90
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
char * cluster_name
Definition guc_tables.c:564
#define close(a)
Definition win32.h:12
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define die(msg)
void pgstat_report_wal(bool force)
Definition pgstat_wal.c:46
#define pqsignal
Definition port.h:547
int pgsocket
Definition port.h:29
#define PGINVALID_SOCKET
Definition port.h:31
static Datum PointerGetDatum(const void *X)
Definition postgres.h:352
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:677
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define NUM_WALRCV_WAKEUPS
static StringInfoData reply_message
static int recvFile
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
static TimeLineID recvFileTLI
WalReceiverFunctionsType * WalReceiverFunctions
Definition walreceiver.c:95
static XLogSegNo recvSegNo
static void XLogWalRcvSendHSFeedback(bool immed)
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
static void WalRcvDie(int code, Datum arg)
static void XLogWalRcvSendReply(bool force, bool requestReply)
static struct @19 LogstreamResult
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_get_conninfo(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_get_backend_pid(conn)
#define walrcv_receive(conn, buffer, wait_fd)
#define SIGCHLD
Definition win32_port.h:168
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGALRM
Definition win32_port.h:164
#define SIGUSR2
Definition win32_port.h:171
uint64 GetSystemIdentifier(void)
Definition xlog.c:4627
bool RecoveryInProgress(void)
Definition xlog.c:6460
int wal_segment_size
Definition xlog.c:146
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert, AuxiliaryProcessMainCommon(), buf, CHECK_FOR_INTERRUPTS, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, DEBUG1, die, elog, ereport, err(), errcode(), errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, fb(), GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), i, initStringInfo(), len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, Min, MyLatch, MyProcNumber, MyProcPid, NAMEDATALEN, now(), NUM_WALRCV_WAKEUPS, on_shmem_exit(), PANIC, pfree(), pg_atomic_write_u64(), pg_memory_barrier, PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_wal(), PointerGetDatum(), pqsignal, proc_exit(), ProcessConfigFile(), procsignal_sigusr1_handler(), RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, snprintf, SpinLockAcquire, SpinLockRelease, strlcpy(), TIMESTAMP_INFINITY, TimestampDifferenceMilliseconds(), UINT64_FORMAT, UnBlockSig, WaitLatchOrSocket(), wakeup, wal_segment_size, WalRcv, walrcv_connect, WALRCV_CONNECTING, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_TERMINATE, WalRcvComputeNextWakeup(), WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvWaitForStartPosition(), WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1070 of file walreceiver.c.

1071{
1072 char xlogfname[MAXFNAMELEN];
1073
1075 Assert(tli != 0);
1076
1077 /*
1078 * fsync() and close current file before we switch to next one. We would
1079 * otherwise have to reopen this file to fsync it later.
1080 */
1081 XLogWalRcvFlush(false, tli);
1082
1084
1085 /*
1086 * XLOG segment files will be re-read by recovery in startup process soon,
1087 * so we don't advise the OS to release cache pages associated with the
1088 * file like XLogFileClose() does.
1089 */
1090 if (close(recvFile) != 0)
1091 ereport(PANIC,
1093 errmsg("could not close WAL segment %s: %m",
1094 xlogfname)));
1095
1096 /*
1097 * Create .done file forcibly to prevent the streamed segment from being
1098 * archived later.
1099 */
1102 else
1104
1105 recvFile = -1;
1106}
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert, close, ereport, errcode_for_file_access(), errmsg(), fb(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 1006 of file walreceiver.c.

1007{
1008 Assert(tli != 0);
1009
1010 if (LogstreamResult.Flush < LogstreamResult.Write)
1011 {
1013
1015
1016 LogstreamResult.Flush = LogstreamResult.Write;
1017
1018 /* Update shared-memory status */
1019 SpinLockAcquire(&walrcv->mutex);
1020 if (walrcv->flushedUpto < LogstreamResult.Flush)
1021 {
1022 walrcv->latestChunkStart = walrcv->flushedUpto;
1023 walrcv->flushedUpto = LogstreamResult.Flush;
1024 walrcv->receivedTLI = tli;
1025 }
1026 SpinLockRelease(&walrcv->mutex);
1027
1028 /*
1029 * If we flushed an LSN that someone was waiting for, notify the
1030 * waiters.
1031 */
1032 if (waitLSNState &&
1033 (LogstreamResult.Flush >=
1036
1037 /* Signal the startup process and walsender that new WAL has arrived */
1040 WalSndWakeup(true, false);
1041
1042 /* Report XLOG streaming progress in PS display */
1044 {
1045 char activitymsg[50];
1046
1047 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
1050 }
1051
1052 /* Also let the primary know that we made some progress */
1053 if (!dying)
1054 {
1055 XLogWalRcvSendReply(false, false);
1057 }
1058 }
1059}
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:85
#define AllowCascadeReplication()
Definition walreceiver.h:40
void WalSndWakeup(bool physical, bool logical)
Definition walsender.c:3810
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition xlog.c:8858
struct WaitLSNState * waitLSNState
Definition xlogwait.c:68
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:317
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41

References AllowCascadeReplication, Assert, fb(), issue_xlog_fsync(), LogstreamResult, LSN_FORMAT_ARGS, WaitLSNState::minWaitedLSN, pg_atomic_read_u64(), recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_LSN_TYPE_STANDBY_FLUSH, waitLSNState, WaitLSNWakeup(), WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 832 of file walreceiver.c.

833{
834 int hdrlen;
838 bool replyRequested;
839
840 switch (type)
841 {
843 {
845
846 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
847 if (len < hdrlen)
850 errmsg_internal("invalid WAL message received from primary")));
851
852 /* initialize a StringInfo with the given buffer */
854
855 /* read the fields */
860
861 buf += hdrlen;
862 len -= hdrlen;
864 break;
865 }
867 {
869
870 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
871 if (len != hdrlen)
874 errmsg_internal("invalid keepalive message received from primary")));
875
876 /* initialize a StringInfo with the given buffer */
878
879 /* read the fields */
883
885
886 /* If the primary requested a reply, send one immediately */
887 if (replyRequested)
888 XLogWalRcvSendReply(true, false);
889 break;
890 }
891 default:
894 errmsg_internal("invalid replication message type %d",
895 type)));
896 }
897}
int64_t int64
Definition c.h:543
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
#define PqReplMsg_WALData
Definition protocol.h:77
#define PqReplMsg_Keepalive
Definition protocol.h:75
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition stringinfo.h:157
const char * type
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)

References buf, ereport, errcode(), errmsg_internal(), ERROR, fb(), initReadOnlyStringInfo(), len, pq_getmsgbyte(), pq_getmsgint64(), PqReplMsg_Keepalive, PqReplMsg_WALData, ProcessWalSndrMessage(), type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1191 of file walreceiver.c.

1192{
1195 TransactionId nextXid;
1198 TransactionId xmin,
1199 catalog_xmin;
1200
1201 /* initially true so we always send at least one feedback message */
1202 static bool primary_has_standby_xmin = true;
1203
1204 /*
1205 * If the user doesn't want status to be reported to the primary, be sure
1206 * to exit before doing anything at all.
1207 */
1210 return;
1211
1212 /* Get current timestamp. */
1214
1215 /* Send feedback at most once per wal_receiver_status_interval. */
1217 return;
1218
1219 /* Make sure we wake up when it's time to send feedback again. */
1221
1222 /*
1223 * If Hot Standby is not yet accepting connections there is nothing to
1224 * send. Check this after the interval has expired to reduce number of
1225 * the number of calls.
1226 *
1227 * Bailing out here also ensures that we don't send feedback until we've
1228 * read our own replication slot state, so we don't tell the primary to
1229 * discard needed xmin or catalog_xmin from any slots that may exist on
1230 * this replica.
1231 */
1232 if (!HotStandbyActive())
1233 return;
1234
1235 /*
1236 * Make the expensive call to get the oldest xmin once we are certain
1237 * everything else has been checked.
1238 */
1240 {
1241 GetReplicationHorizons(&xmin, &catalog_xmin);
1242 }
1243 else
1244 {
1245 xmin = InvalidTransactionId;
1246 catalog_xmin = InvalidTransactionId;
1247 }
1248
1249 /*
1250 * Get epoch and adjust if nextXid and oldestXmin are on different sides
1251 * of the epoch boundary.
1252 */
1257 if (nextXid < xmin)
1258 xmin_epoch--;
1259 if (nextXid < catalog_xmin)
1261
1262 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1263 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1264
1265 /* Construct the message and send it. */
1271 pq_sendint32(&reply_message, catalog_xmin);
1274 if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1276 else
1278}
uint32_t uint32
Definition c.h:546
uint32 TransactionId
Definition c.h:666
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition procarray.c:1992
#define PqReplMsg_HotStandbyFeedback
Definition protocol.h:82
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
#define InvalidTransactionId
Definition transam.h:31
#define EpochFromFullTransactionId(x)
Definition transam.h:47
#define XidFromFullTransactionId(x)
Definition transam.h:48
#define TransactionIdIsValid(xid)
Definition transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition varsup.c:288
#define walrcv_send(conn, buffer, nbytes)
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, fb(), GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), PqReplMsg_HotStandbyFeedback, ReadNextFullTransactionId(), reply_message, resetStringInfo(), TransactionIdIsValid, wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_HSFEEDBACK, WalRcvComputeNextWakeup(), wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1122 of file walreceiver.c.

1123{
1128
1129 /*
1130 * If the user doesn't want status to be reported to the primary, be sure
1131 * to exit before doing anything at all.
1132 */
1133 if (!force && wal_receiver_status_interval <= 0)
1134 return;
1135
1136 /* Get current timestamp. */
1138
1139 /*
1140 * We can compare the write and flush positions to the last message we
1141 * sent without taking any lock, but the apply position requires a spin
1142 * lock, so we don't check that unless something else has changed or 10
1143 * seconds have passed. This means that the apply WAL location will
1144 * appear, from the primary's point of view, to lag slightly, but since
1145 * this is only for reporting purposes and only on idle systems, that's
1146 * probably OK.
1147 */
1148 if (!force
1149 && writePtr == LogstreamResult.Write
1150 && flushPtr == LogstreamResult.Flush
1152 return;
1153
1154 /* Make sure we wake up when it's time to send another reply. */
1156
1157 /* Construct a new message */
1158 writePtr = LogstreamResult.Write;
1159 flushPtr = LogstreamResult.Flush;
1161
1169
1170 /* Send it */
1171 elog(DEBUG2, "sending write %X/%08X flush %X/%08X apply %X/%08X%s",
1175 requestReply ? " (reply requested)" : "");
1176
1178}
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84

References StringInfoData::data, DEBUG2, elog, fb(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), InvalidXLogRecPtr, StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), PqReplMsg_StandbyStatusUpdate, reply_message, resetStringInfo(), wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_REPLY, WalRcvComputeNextWakeup(), and wrconn.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 903 of file walreceiver.c.

904{
905 int startoff;
906 int byteswritten;
908
909 Assert(tli != 0);
910
911 while (nbytes > 0)
912 {
913 int segbytes;
914
915 /* Close the current segment if it's completed */
918
919 if (recvFile < 0)
920 {
921 /* Create/use new log file */
924 recvFileTLI = tli;
925 }
926
927 /* Calculate the start offset of the received logs */
929
930 if (startoff + nbytes > wal_segment_size)
932 else
933 segbytes = nbytes;
934
935 /* OK to write the logs */
936 errno = 0;
937
938 /*
939 * Measure I/O timing to write WAL data, for pg_stat_io.
940 */
942
946
949
950 if (byteswritten <= 0)
951 {
953 int save_errno;
954
955 /* if write didn't set errno, assume no disk space */
956 if (errno == 0)
957 errno = ENOSPC;
958
964 errmsg("could not write to WAL segment %s "
965 "at offset %d, length %d: %m",
967 }
968
969 /* Update state for write */
971
972 nbytes -= byteswritten;
973 buf += byteswritten;
974
975 LogstreamResult.Write = recptr;
976 }
977
978 /* Update shared-memory status */
980
981 /*
982 * If we wrote an LSN that someone was waiting for, notify the waiters.
983 */
984 if (waitLSNState &&
985 (LogstreamResult.Write >=
988
989 /*
990 * Close the current segment if it was completely written in the last
991 * iteration of the loop, so that its archive notification file is created
992 * promptly. Otherwise WAL archiving of the segment would be delayed until
993 * some data in the next segment is received and written.
994 */
997}
return str start
@ IOOBJECT_WAL
Definition pgstat.h:279
@ IOCONTEXT_NORMAL
Definition pgstat.h:289
@ IOOP_WRITE
Definition pgstat.h:316
instr_time pgstat_prepare_io_time(bool track_io_guc)
Definition pgstat_io.c:91
void pgstat_count_io_op_time(IOObject io_object, IOContext io_context, IOOp io_op, instr_time start_time, uint32 cnt, uint64 bytes)
Definition pgstat_io.c:122
#define pg_pwrite
Definition port.h:248
off_t pgoff_t
Definition port.h:421
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:69
static void pgstat_report_wait_end(void)
Definition wait_event.h:85
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition xlog.c:3417
bool track_wal_io_timing
Definition xlog.c:140
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40

References Assert, buf, ereport, errcode_for_file_access(), errmsg(), fb(), IOCONTEXT_NORMAL, IOOBJECT_WAL, IOOP_WRITE, LogstreamResult, MAXFNAMELEN, WaitLSNState::minWaitedLSN, PANIC, pg_atomic_read_u64(), pg_atomic_write_u64(), pg_pwrite, pgstat_count_io_op_time(), pgstat_prepare_io_time(), pgstat_report_wait_end(), pgstat_report_wait_start(), recvFile, recvFileTLI, recvSegNo, start, track_wal_io_timing, WAIT_LSN_TYPE_STANDBY_WRITE, waitLSNState, WaitLSNWakeup(), wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName(), XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 113 of file walreceiver.c.

Referenced by XLogWrite().

◆ hot_standby_feedback

bool hot_standby_feedback

◆ [struct]

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 102 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 103 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

◆ wakeup

◆ wal_receiver_status_interval

◆ wal_receiver_timeout

int wal_receiver_timeout

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 95 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 112 of file walreceiver.c.

Referenced by XLogWrite().