PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walreceiver.c File Reference
#include "postgres.h"
#include <unistd.h>
#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "catalog/pg_authid.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Macros

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)
 

Typedefs

typedef enum WalRcvWakeupReason WalRcvWakeupReason
 

Enumerations

enum  WalRcvWakeupReason { WALRCV_WAKEUP_TERMINATE , WALRCV_WAKEUP_PING , WALRCV_WAKEUP_REPLY , WALRCV_WAKEUP_HSFEEDBACK }
 

Functions

static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
 
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 
static void WalRcvDie (int code, Datum arg)
 
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len, TimeLineID tli)
 
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvFlush (bool dying, TimeLineID tli)
 
static void XLogWalRcvClose (XLogRecPtr recptr, TimeLineID tli)
 
static void XLogWalRcvSendReply (bool force, bool requestReply)
 
static void XLogWalRcvSendHSFeedback (bool immed)
 
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
 
static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason, TimestampTz now)
 
void WalReceiverMain (const void *startup_data, size_t startup_data_len)
 
void WalRcvForceReply (void)
 
static const char * WalRcvGetStateString (WalRcvState state)
 
Datum pg_stat_get_wal_receiver (PG_FUNCTION_ARGS)
 

Variables

int wal_receiver_status_interval
 
int wal_receiver_timeout
 
bool hot_standby_feedback
 
static WalReceiverConn * wrconn = NULL
 
WalReceiverFunctionsType * WalReceiverFunctions = NULL
 
static int recvFile = -1
 
static TimeLineID recvFileTLI = 0
 
static XLogSegNo recvSegNo = 0
 
struct { 
 
   XLogRecPtr   Write 
 
   XLogRecPtr   Flush 
 
} LogstreamResult 
 
static TimestampTz wakeup [NUM_WALRCV_WAKEUPS]
 
static StringInfoData reply_message
 

Macro Definition Documentation

◆ NUM_WALRCV_WAKEUPS

#define NUM_WALRCV_WAKEUPS   (WALRCV_WAKEUP_HSFEEDBACK + 1)

Definition at line 125 of file walreceiver.c.

Typedef Documentation

◆ WalRcvWakeupReason

Enumeration Type Documentation

◆ WalRcvWakeupReason

Enumerator
WALRCV_WAKEUP_TERMINATE 
WALRCV_WAKEUP_PING 
WALRCV_WAKEUP_REPLY 
WALRCV_WAKEUP_HSFEEDBACK 

Definition at line 119 of file walreceiver.c.

120{
125#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
WalRcvWakeupReason
@ WALRCV_WAKEUP_TERMINATE
@ WALRCV_WAKEUP_REPLY
@ WALRCV_WAKEUP_PING
@ WALRCV_WAKEUP_HSFEEDBACK

Function Documentation

◆ pg_stat_get_wal_receiver()

Datum pg_stat_get_wal_receiver ( PG_FUNCTION_ARGS  )

Definition at line 1426 of file walreceiver.c.

1427{
1428 TupleDesc tupdesc;
1429 Datum *values;
1430 bool *nulls;
1431 int pid;
1432 bool ready_to_display;
1439 TimestampTz last_send_time;
1443 char sender_host[NI_MAXHOST];
1444 int sender_port = 0;
1445 char slotname[NAMEDATALEN];
1446 char conninfo[MAXCONNINFO];
1447
1448 /* Take a lock to ensure value consistency */
1450 pid = (int) WalRcv->pid;
1451 ready_to_display = WalRcv->ready_to_display;
1457 last_send_time = WalRcv->lastMsgSendTime;
1461 strlcpy(slotname, WalRcv->slotname, sizeof(slotname));
1462 strlcpy(sender_host, WalRcv->sender_host, sizeof(sender_host));
1463 sender_port = WalRcv->sender_port;
1464 strlcpy(conninfo, WalRcv->conninfo, sizeof(conninfo));
1466
1467 /*
1468 * No WAL receiver (or not ready yet), just return a tuple with NULL
1469 * values
1470 */
1471 if (pid == 0 || !ready_to_display)
1473
1474 /*
1475 * Read "writtenUpto" without holding a spinlock. Note that it may not be
1476 * consistent with the other shared variables of the WAL receiver
1477 * protected by a spinlock, but this should not be used for data integrity
1478 * checks.
1479 */
1481
1482 /* determine result type */
1483 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1484 elog(ERROR, "return type must be a row type");
1485
1486 values = palloc0_array(Datum, tupdesc->natts);
1487 nulls = palloc0_array(bool, tupdesc->natts);
1488
1489 /* Fetch values */
1490 values[0] = Int32GetDatum(pid);
1491
1493 {
1494 /*
1495 * Only superusers and roles with privileges of pg_read_all_stats can
1496 * see details. Other users only get the pid value to know whether it
1497 * is a WAL receiver, but no details.
1498 */
1499 memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1500 }
1501 else
1502 {
1504
1506 nulls[2] = true;
1507 else
1511 nulls[4] = true;
1512 else
1515 nulls[5] = true;
1516 else
1519 if (last_send_time == 0)
1520 nulls[7] = true;
1521 else
1522 values[7] = TimestampTzGetDatum(last_send_time);
1523 if (last_receipt_time == 0)
1524 nulls[8] = true;
1525 else
1528 nulls[9] = true;
1529 else
1531 if (latest_end_time == 0)
1532 nulls[10] = true;
1533 else
1535 if (*slotname == '\0')
1536 nulls[11] = true;
1537 else
1538 values[11] = CStringGetTextDatum(slotname);
1539 if (*sender_host == '\0')
1540 nulls[12] = true;
1541 else
1542 values[12] = CStringGetTextDatum(sender_host);
1543 if (sender_port == 0)
1544 nulls[13] = true;
1545 else
1546 values[13] = Int32GetDatum(sender_port);
1547 if (*conninfo == '\0')
1548 nulls[14] = true;
1549 else
1550 values[14] = CStringGetTextDatum(conninfo);
1551 }
1552
1553 /* Returns the record as Datum */
1555}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5284
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define CStringGetTextDatum(s)
Definition builtins.h:97
int64 TimestampTz
Definition timestamp.h:39
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define palloc0_array(type, count)
Definition fe_memutils.h:77
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_RETURN_DATUM(x)
Definition fmgr.h:354
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1117
Oid GetUserId(void)
Definition miscinit.c:469
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
uint64_t Datum
Definition postgres.h:70
static Datum Int32GetDatum(int32 X)
Definition postgres.h:222
static int fb(int x)
#define SpinLockRelease(lock)
Definition spin.h:61
#define SpinLockAcquire(lock)
Definition spin.h:59
TimestampTz lastMsgReceiptTime
XLogRecPtr latestWalEnd
TimeLineID receiveStartTLI
Definition walreceiver.h:88
TimeLineID receivedTLI
Definition walreceiver.h:98
char slotname[NAMEDATALEN]
char sender_host[NI_MAXHOST]
XLogRecPtr receiveStart
Definition walreceiver.h:87
XLogRecPtr flushedUpto
Definition walreceiver.h:97
pg_atomic_uint64 writtenUpto
TimestampTz lastMsgSendTime
WalRcvState walRcvState
Definition walreceiver.h:72
TimestampTz latestWalEndTime
bool ready_to_display
slock_t mutex
char conninfo[MAXCONNINFO]
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
static const char * WalRcvGetStateString(WalRcvState state)
#define MAXCONNINFO
Definition walreceiver.h:37
WalRcvState
Definition walreceiver.h:46
WalRcvData * WalRcv
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63

References WalRcvData::conninfo, CStringGetTextDatum, elog, ERROR, fb(), WalRcvData::flushedUpto, get_call_result_type(), GetUserId(), has_privs_of_role(), heap_form_tuple(), HeapTupleGetDatum(), Int32GetDatum(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, LSNGetDatum(), MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, TupleDescData::natts, palloc0_array, pg_atomic_read_u64(), PG_RETURN_DATUM, PG_RETURN_NULL, WalRcvData::pid, WalRcvData::ready_to_display, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, WalRcvData::sender_host, WalRcvData::sender_port, WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, strlcpy(), TimestampTzGetDatum(), TYPEFUNC_COMPOSITE, values, WalRcv, WalRcvGetStateString(), WalRcvData::walRcvState, WalRcvData::writtenUpto, and XLogRecPtrIsValid.

◆ ProcessWalSndrMessage()

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
)
static

Definition at line 1288 of file walreceiver.c.

1289{
1291 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1292
1293 /* Update shared-memory status */
1294 SpinLockAcquire(&walrcv->mutex);
1295 if (walrcv->latestWalEnd < walEnd)
1296 walrcv->latestWalEndTime = sendTime;
1297 walrcv->latestWalEnd = walEnd;
1298 walrcv->lastMsgSendTime = sendTime;
1299 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1300 SpinLockRelease(&walrcv->mutex);
1301
1303 {
1304 char *sendtime;
1305 char *receipttime;
1306 int applyDelay;
1307
1308 /* Copy because timestamptz_to_str returns a static buffer */
1310 receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1312
1313 /* apply delay is not available */
1314 if (applyDelay == -1)
1315 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1316 sendtime,
1319 else
1320 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1321 sendtime,
1323 applyDelay,
1325
1326 pfree(sendtime);
1328 }
1329}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1862
bool message_level_is_interesting(int elevel)
Definition elog.c:273
#define DEBUG2
Definition elog.h:29
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)

References DEBUG2, elog, fb(), GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), message_level_is_interesting(), pfree(), pstrdup(), SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

◆ WalRcvComputeNextWakeup()

static void WalRcvComputeNextWakeup ( WalRcvWakeupReason  reason,
TimestampTz  now 
)
static

Definition at line 1340 of file walreceiver.c.

1341{
1342 switch (reason)
1343 {
1345 if (wal_receiver_timeout <= 0)
1346 wakeup[reason] = TIMESTAMP_INFINITY;
1347 else
1349 break;
1350 case WALRCV_WAKEUP_PING:
1351 if (wal_receiver_timeout <= 0)
1352 wakeup[reason] = TIMESTAMP_INFINITY;
1353 else
1355 break;
1358 wakeup[reason] = TIMESTAMP_INFINITY;
1359 else
1361 break;
1364 wakeup[reason] = TIMESTAMP_INFINITY;
1365 else
1367 break;
1368 /* there's intentionally no default: here */
1369 }
1370}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1609
#define TIMESTAMP_INFINITY
Definition timestamp.h:151
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
#define TimestampTzPlusSeconds(tz, s)
Definition timestamp.h:86
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
bool hot_standby_feedback
Definition walreceiver.c:91
int wal_receiver_status_interval
Definition walreceiver.c:89
int wal_receiver_timeout
Definition walreceiver.c:90

References hot_standby_feedback, now(), TIMESTAMP_INFINITY, TimestampTzPlusMilliseconds, TimestampTzPlusSeconds, wakeup, wal_receiver_status_interval, wal_receiver_timeout, WALRCV_WAKEUP_HSFEEDBACK, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_REPLY, and WALRCV_WAKEUP_TERMINATE.

Referenced by WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ WalRcvDie()

static void WalRcvDie ( int  code,
Datum  arg 
)
static

Definition at line 794 of file walreceiver.c.

795{
798
799 Assert(*startpointTLI_p != 0);
800
801 /* Ensure that all WAL records received are flushed to disk */
803
804 /* Mark ourselves inactive in shared memory */
805 SpinLockAcquire(&walrcv->mutex);
806 Assert(walrcv->walRcvState == WALRCV_STREAMING ||
807 walrcv->walRcvState == WALRCV_CONNECTING ||
808 walrcv->walRcvState == WALRCV_RESTARTING ||
809 walrcv->walRcvState == WALRCV_STARTING ||
810 walrcv->walRcvState == WALRCV_WAITING ||
811 walrcv->walRcvState == WALRCV_STOPPING);
812 Assert(walrcv->pid == MyProcPid);
813 walrcv->walRcvState = WALRCV_STOPPED;
814 walrcv->pid = 0;
815 walrcv->procno = INVALID_PROC_NUMBER;
816 walrcv->ready_to_display = false;
817 SpinLockRelease(&walrcv->mutex);
818
819 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
820
821 /* Terminate the connection gracefully. */
822 if (wrconn != NULL)
824
825 /* Wake up the startup process to notice promptly that we're gone */
827}
#define Assert(condition)
Definition c.h:873
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition globals.c:47
void * arg
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:342
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
static WalReceiverConn * wrconn
Definition walreceiver.c:94
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
@ WALRCV_STARTING
Definition walreceiver.h:48
@ WALRCV_STOPPED
Definition walreceiver.h:47
@ WALRCV_CONNECTING
Definition walreceiver.h:50
@ WALRCV_RESTARTING
Definition walreceiver.h:53
@ WALRCV_STREAMING
Definition walreceiver.h:51
@ WALRCV_WAITING
Definition walreceiver.h:52
@ WALRCV_STOPPING
Definition walreceiver.h:54
#define walrcv_disconnect(conn)
void WakeupRecovery(void)

References arg, Assert, ConditionVariableBroadcast(), DatumGetPointer(), fb(), INVALID_PROC_NUMBER, MyProcPid, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, WALRCV_CONNECTING, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, wrconn, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

◆ WalRcvFetchTimeLineHistoryFiles()

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
)
static

Definition at line 738 of file walreceiver.c.

739{
740 TimeLineID tli;
741
742 for (tli = first; tli <= last; tli++)
743 {
744 /* there's no history file for timeline 1 */
745 if (tli != 1 && !existsTimeLineHistory(tli))
746 {
747 char *fname;
748 char *content;
749 int len;
751
752 ereport(LOG,
753 (errmsg("fetching timeline history file for timeline %u from primary server",
754 tli)));
755
756 walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
757
758 /*
759 * Check that the filename on the primary matches what we
760 * calculated ourselves. This is just a sanity check, it should
761 * always match.
762 */
764 if (strcmp(fname, expectedfname) != 0)
767 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
768 tli)));
769
770 /*
771 * Write the file to pg_wal.
772 */
773 writeTimeLineHistoryFile(tli, content, len);
774
775 /*
776 * Mark the streamed history file as ready for archiving if
777 * archive_mode is always.
778 */
781 else
782 XLogArchiveNotify(fname);
783
784 pfree(fname);
785 pfree(content);
786 }
787 }
788}
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
Definition timeline.c:463
bool existsTimeLineHistory(TimeLineID probeTLI)
Definition timeline.c:222
int errmsg_internal(const char *fmt,...)
Definition elog.c:1170
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define LOG
Definition elog.h:31
#define ereport(elevel,...)
Definition elog.h:150
const void size_t len
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
int XLogArchiveMode
Definition xlog.c:122
@ ARCHIVE_MODE_ALWAYS
Definition xlog.h:68
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
void XLogArchiveNotify(const char *xlog)

References ARCHIVE_MODE_ALWAYS, ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), fb(), len, LOG, MAXFNAMELEN, pfree(), TLHistoryFileName(), walrcv_readtimelinehistoryfile, wrconn, writeTimeLineHistoryFile(), XLogArchiveForceDone(), XLogArchiveMode, and XLogArchiveNotify().

Referenced by WalReceiverMain().

◆ WalRcvForceReply()

void WalRcvForceReply ( void  )

Definition at line 1381 of file walreceiver.c.

1382{
1383 ProcNumber procno;
1384
1385 WalRcv->force_reply = true;
1386 /* fetching the proc number is probably atomic, but don't rely on it */
1388 procno = WalRcv->procno;
1390 if (procno != INVALID_PROC_NUMBER)
1391 SetLatch(&GetPGProcByNumber(procno)->procLatch);
1392}
void SetLatch(Latch *latch)
Definition latch.c:290
#define GetPGProcByNumber(n)
Definition proc.h:446
int ProcNumber
Definition procnumber.h:24
sig_atomic_t force_reply
ProcNumber procno
Definition walreceiver.h:68

References WalRcvData::force_reply, GetPGProcByNumber, INVALID_PROC_NUMBER, WalRcvData::mutex, WalRcvData::procno, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by ApplyWalRecord(), and WaitForWALToBecomeAvailable().

◆ WalRcvGetStateString()

static const char * WalRcvGetStateString ( WalRcvState  state)
static

Definition at line 1399 of file walreceiver.c.

1400{
1401 switch (state)
1402 {
1403 case WALRCV_STOPPED:
1404 return "stopped";
1405 case WALRCV_STARTING:
1406 return "starting";
1407 case WALRCV_CONNECTING:
1408 return "connecting";
1409 case WALRCV_STREAMING:
1410 return "streaming";
1411 case WALRCV_WAITING:
1412 return "waiting";
1413 case WALRCV_RESTARTING:
1414 return "restarting";
1415 case WALRCV_STOPPING:
1416 return "stopping";
1417 }
1418 return "UNKNOWN";
1419}

References WALRCV_CONNECTING, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, and WALRCV_WAITING.

Referenced by pg_stat_get_wal_receiver().

◆ WalRcvWaitForStartPosition()

static void WalRcvWaitForStartPosition ( XLogRecPtr * startpoint,
TimeLineID * startpointTLI 
)
static

Definition at line 658 of file walreceiver.c.

659{
661 int state;
662
663 SpinLockAcquire(&walrcv->mutex);
664 state = walrcv->walRcvState;
666 {
667 SpinLockRelease(&walrcv->mutex);
668 if (state == WALRCV_STOPPING)
669 proc_exit(0);
670 else
671 elog(FATAL, "unexpected walreceiver state");
672 }
673 walrcv->walRcvState = WALRCV_WAITING;
674 walrcv->receiveStart = InvalidXLogRecPtr;
675 walrcv->receiveStartTLI = 0;
676 SpinLockRelease(&walrcv->mutex);
677
678 set_ps_display("idle");
679
680 /*
681 * nudge startup process to notice that we've stopped streaming and are
682 * now waiting for instructions.
683 */
685 for (;;)
686 {
688
690
691 SpinLockAcquire(&walrcv->mutex);
692 Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
693 walrcv->walRcvState == WALRCV_WAITING ||
694 walrcv->walRcvState == WALRCV_STOPPING);
695 if (walrcv->walRcvState == WALRCV_RESTARTING)
696 {
697 /*
698 * No need to handle changes in primary_conninfo or
699 * primary_slot_name here. Startup process will signal us to
700 * terminate in case those change.
701 */
702 *startpoint = walrcv->receiveStart;
703 *startpointTLI = walrcv->receiveStartTLI;
704 walrcv->walRcvState = WALRCV_CONNECTING;
705 SpinLockRelease(&walrcv->mutex);
706 break;
707 }
708 if (walrcv->walRcvState == WALRCV_STOPPING)
709 {
710 /*
711 * We should've received SIGTERM if the startup process wants us
712 * to die, but might as well check it here too.
713 */
714 SpinLockRelease(&walrcv->mutex);
715 exit(1);
716 }
717 SpinLockRelease(&walrcv->mutex);
718
721 }
722
724 {
725 char activitymsg[50];
726
727 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%08X",
728 LSN_FORMAT_ARGS(*startpoint));
730 }
731}
#define FATAL
Definition elog.h:41
struct Latch * MyLatch
Definition globals.c:63
void proc_exit(int code)
Definition ipc.c:105
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define snprintf
Definition port.h:260
bool update_process_title
Definition ps_status.c:31
static void set_ps_display(const char *activity)
Definition ps_status.h:40
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References Assert, CHECK_FOR_INTERRUPTS, elog, FATAL, fb(), InvalidXLogRecPtr, LSN_FORMAT_ARGS, MyLatch, proc_exit(), ResetLatch(), set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_CONNECTING, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by WalReceiverMain().

◆ WalReceiverMain()

void WalReceiverMain ( const void * startup_data,
size_t  startup_data_len 
)

Definition at line 153 of file walreceiver.c.

154{
155 char conninfo[MAXCONNINFO];
156 char *tmp_conninfo;
157 char slotname[NAMEDATALEN];
158 bool is_temp_slot;
159 XLogRecPtr startpoint;
160 TimeLineID startpointTLI;
162 bool first_stream;
165 char *err;
166 char *sender_host = NULL;
167 int sender_port = 0;
168 char *appname;
169
171
174
175 /*
176 * WalRcv should be set up already (if we are a backend, we inherit this
177 * by fork() or EXEC_BACKEND mechanism from the postmaster).
178 */
179 walrcv = WalRcv;
180 Assert(walrcv != NULL);
181
182 /*
183 * Mark walreceiver as running in shared memory.
184 *
185 * Do this as early as possible, so that if we fail later on, we'll set
186 * state to STOPPED. If we die before this, the startup process will keep
187 * waiting for us to start up, until it times out.
188 */
189 SpinLockAcquire(&walrcv->mutex);
190 Assert(walrcv->pid == 0);
191 switch (walrcv->walRcvState)
192 {
193 case WALRCV_STOPPING:
194 /* If we've already been requested to stop, don't start up. */
195 walrcv->walRcvState = WALRCV_STOPPED;
196 /* fall through */
197
198 case WALRCV_STOPPED:
199 SpinLockRelease(&walrcv->mutex);
200 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
201 proc_exit(1);
202 break;
203
204 case WALRCV_STARTING:
205 /* The usual case */
206 break;
207
209 case WALRCV_WAITING:
210 case WALRCV_STREAMING:
212 default:
213 /* Shouldn't happen */
214 SpinLockRelease(&walrcv->mutex);
215 elog(PANIC, "walreceiver still running according to shared memory state");
216 }
217 /* Advertise our PID so that the startup process can kill us */
218 walrcv->pid = MyProcPid;
219 walrcv->walRcvState = WALRCV_CONNECTING;
220
221 /* Fetch information required to start streaming */
222 walrcv->ready_to_display = false;
223 strlcpy(conninfo, walrcv->conninfo, MAXCONNINFO);
224 strlcpy(slotname, walrcv->slotname, NAMEDATALEN);
225 is_temp_slot = walrcv->is_temp_slot;
226 startpoint = walrcv->receiveStart;
227 startpointTLI = walrcv->receiveStartTLI;
228
229 /*
230 * At most one of is_temp_slot and slotname can be set; otherwise,
231 * RequestXLogStreaming messed up.
232 */
233 Assert(!is_temp_slot || (slotname[0] == '\0'));
234
235 /* Initialise to a sanish value */
237 walrcv->lastMsgSendTime =
238 walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
239
240 /* Report our proc number so that others can wake us up */
241 walrcv->procno = MyProcNumber;
242
243 SpinLockRelease(&walrcv->mutex);
244
246
247 /* Arrange to clean up at walreceiver exit */
248 on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
249
250 /* Properly accept or ignore signals the postmaster might send us */
251 pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
252 * file */
254 pqsignal(SIGTERM, die); /* request shutdown */
255 /* SIGQUIT handler was already set up by InitPostmasterChild */
260
261 /* Reset some signals that are accepted by postmaster but not here */
263
264 /* Load the libpq-specific functions */
265 load_file("libpqwalreceiver", false);
267 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
268
269 /* Unblock signals (they were blocked when the postmaster forked us) */
271
272 /* Establish the connection to the primary for XLOG streaming */
273 appname = cluster_name[0] ? cluster_name : "walreceiver";
274 wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
275 if (!wrconn)
278 errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
279 appname, err)));
280
281 /*
282 * Save user-visible connection string. This clobbers the original
283 * conninfo, for security. Also save host and port of the sender server
284 * this walreceiver is connected to.
285 */
287 walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
288 SpinLockAcquire(&walrcv->mutex);
289 memset(walrcv->conninfo, 0, MAXCONNINFO);
290 if (tmp_conninfo)
292
293 memset(walrcv->sender_host, 0, NI_MAXHOST);
294 if (sender_host)
295 strlcpy(walrcv->sender_host, sender_host, NI_MAXHOST);
296
297 walrcv->sender_port = sender_port;
298 walrcv->ready_to_display = true;
299 SpinLockRelease(&walrcv->mutex);
300
301 if (tmp_conninfo)
303
304 if (sender_host)
305 pfree(sender_host);
306
307 first_stream = true;
308 for (;;)
309 {
310 char *primary_sysid;
311 char standby_sysid[32];
313
314 /*
315 * Check that we're connected to a valid server using the
316 * IDENTIFY_SYSTEM replication command.
317 */
319
323 {
326 errmsg("database system identifier differs between the primary and standby"),
327 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
329 }
330
331 /*
332 * Confirm that the current timeline of the primary is the same or
333 * ahead of ours.
334 */
335 if (primaryTLI < startpointTLI)
338 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
339 primaryTLI, startpointTLI)));
340
341 /*
342 * Get any missing history files. We do this always, even when we're
343 * not interested in that timeline, so that if we're promoted to
344 * become the primary later on, we don't select the same timeline that
345 * was already used in the current primary. This isn't bullet-proof -
346 * you'll need some external software to manage your cluster if you
347 * need to ensure that a unique timeline id is chosen in every case,
348 * but let's avoid the confusion of timeline id collisions where we
349 * can.
350 */
352
353 /*
354 * Create temporary replication slot if requested, and update slot
355 * name in shared memory. (Note the slot name cannot already be set
356 * in this case.)
357 */
358 if (is_temp_slot)
359 {
360 snprintf(slotname, sizeof(slotname),
361 "pg_walreceiver_%lld",
362 (long long int) walrcv_get_backend_pid(wrconn));
363
364 walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
365
366 SpinLockAcquire(&walrcv->mutex);
367 strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
368 SpinLockRelease(&walrcv->mutex);
369 }
370
371 /*
372 * Start streaming.
373 *
374 * We'll try to start at the requested starting point and timeline,
375 * even if it's different from the server's latest timeline. In case
376 * we've already reached the end of the old timeline, the server will
377 * finish the streaming immediately, and we will go back to await
378 * orders from the startup process. If recovery_target_timeline is
379 * 'latest', the startup process will scan pg_wal and find the new
380 * history file, bump recovery target timeline, and ask us to restart
381 * on the new timeline.
382 */
383 options.logical = false;
384 options.startpoint = startpoint;
385 options.slotname = slotname[0] != '\0' ? slotname : NULL;
386 options.proto.physical.startpointTLI = startpointTLI;
388 {
389 if (first_stream)
390 ereport(LOG,
391 errmsg("started streaming WAL from primary at %X/%08X on timeline %u",
392 LSN_FORMAT_ARGS(startpoint), startpointTLI));
393 else
394 ereport(LOG,
395 errmsg("restarted WAL streaming at %X/%08X on timeline %u",
396 LSN_FORMAT_ARGS(startpoint), startpointTLI));
397 first_stream = false;
398
399 /*
400 * Switch to STREAMING after a successful connection if current
401 * state is CONNECTING. This switch happens after an initial
402 * startup, or after a restart as determined by
403 * WalRcvWaitForStartPosition().
404 */
405 SpinLockAcquire(&walrcv->mutex);
406 if (walrcv->walRcvState == WALRCV_CONNECTING)
407 walrcv->walRcvState = WALRCV_STREAMING;
408 SpinLockRelease(&walrcv->mutex);
409
410 /* Initialize LogstreamResult and buffers for processing messages */
413
414 /* Initialize nap wakeup times. */
416 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
418
419 /* Send initial reply/feedback messages. */
420 XLogWalRcvSendReply(true, false);
422
423 /* Loop until end-of-streaming or error */
424 for (;;)
425 {
426 char *buf;
427 int len;
428 bool endofwal = false;
430 int rc;
432 long nap;
433
434 /*
435 * Exit walreceiver if we're not in recovery. This should not
436 * happen, but cross-check the status here.
437 */
438 if (!RecoveryInProgress())
441 errmsg("cannot continue WAL streaming, recovery has already ended")));
442
443 /* Process any requests or signals received recently */
445
447 {
448 ConfigReloadPending = false;
450 /* recompute wakeup times */
452 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
455 }
456
457 /* See if we can read data immediately */
459 if (len != 0)
460 {
461 /*
462 * Process the received data, and any subsequent data we
463 * can read without blocking.
464 */
465 for (;;)
466 {
467 if (len > 0)
468 {
469 /*
470 * Something was received from primary, so adjust
471 * the ping and terminate wakeup times.
472 */
475 now);
477 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
478 startpointTLI);
479 }
480 else if (len == 0)
481 break;
482 else if (len < 0)
483 {
484 ereport(LOG,
485 (errmsg("replication terminated by primary server"),
486 errdetail("End of WAL reached on timeline %u at %X/%08X.",
487 startpointTLI,
489 endofwal = true;
490 break;
491 }
493 }
494
495 /* Let the primary know that we received some data. */
496 XLogWalRcvSendReply(false, false);
497
498 /*
499 * If we've written some records, flush them to disk and
500 * let the startup process and primary server know about
501 * them.
502 */
503 XLogWalRcvFlush(false, startpointTLI);
504 }
505
506 /* Check if we need to exit the streaming loop. */
507 if (endofwal)
508 break;
509
510 /* Find the soonest wakeup time, to limit our nap. */
512 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
514
515 /* Calculate the nap time, clamping as necessary. */
518
519 /*
520 * Ideally we would reuse a WaitEventSet object repeatedly
521 * here to avoid the overheads of WaitLatchOrSocket on epoll
522 * systems, but we can't be sure that libpq (or any other
523 * walreceiver implementation) has the same socket (even if
524 * the fd is the same number, it may have been closed and
525 * reopened since the last time). In future, if there is a
526 * function for removing sockets from WaitEventSet, then we
527 * could add and remove just the socket each time, potentially
528 * avoiding some system calls.
529 */
534 wait_fd,
535 nap,
537 if (rc & WL_LATCH_SET)
538 {
541
542 if (walrcv->force_reply)
543 {
544 /*
545 * The recovery process has asked us to send apply
546 * feedback now. Make sure the flag is really set to
547 * false in shared memory before sending the reply, so
548 * we don't miss a new request for a reply.
549 */
550 walrcv->force_reply = false;
552 XLogWalRcvSendReply(true, false);
553 }
554 }
555 if (rc & WL_TIMEOUT)
556 {
557 /*
558 * We didn't receive anything new. If we haven't heard
559 * anything from the server for more than
560 * wal_receiver_timeout / 2, ping the server. Also, if
561 * it's been longer than wal_receiver_status_interval
562 * since the last update we sent, send a status update to
563 * the primary anyway, to report any progress in applying
564 * WAL.
565 */
566 bool requestReply = false;
567
568 /*
569 * Report pending statistics to the cumulative stats
570 * system. This location is useful for the report as it
571 * is not within a tight loop in the WAL receiver, to
572 * avoid bloating pgstats with requests, while also making
573 * sure that the reports happen each time a status update
574 * is sent.
575 */
576 pgstat_report_wal(false);
577
578 /*
579 * Check if time since last receive from primary has
580 * reached the configured limit.
581 */
586 errmsg("terminating walreceiver due to timeout")));
587
588 /*
589 * If we didn't receive anything new for half of receiver
590 * replication timeout, then ping the server.
591 */
593 {
594 requestReply = true;
596 }
597
600 }
601 }
602
603 /*
604 * The backend finished streaming. Exit streaming COPY-mode from
605 * our side, too.
606 */
608
609 /*
610 * If the server had switched to a new timeline that we didn't
611 * know about when we began streaming, fetch its timeline history
612 * file now.
613 */
615 }
616 else
617 ereport(LOG,
618 (errmsg("primary server contains no more WAL on requested timeline %u",
619 startpointTLI)));
620
621 /*
622 * End of WAL reached on the requested timeline. Close the last
623 * segment, and await for new orders from the startup process.
624 */
625 if (recvFile >= 0)
626 {
628
629 XLogWalRcvFlush(false, startpointTLI);
631 if (close(recvFile) != 0)
634 errmsg("could not close WAL segment %s: %m",
635 xlogfname)));
636
637 /*
638 * Create .done file forcibly to prevent the streamed segment from
639 * being archived later.
640 */
643 else
645 }
646 recvFile = -1;
647
648 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
649 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
650 }
651 /* not reached */
652}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
#define pg_memory_barrier()
Definition atomics.h:141
void AuxiliaryProcessMainCommon(void)
Definition auxprocess.c:39
sigset_t UnBlockSig
Definition pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1757
#define Min(x, y)
Definition c.h:997
#define UINT64_FORMAT
Definition c.h:565
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errcode_for_file_access(void)
Definition elog.c:886
int errdetail(const char *fmt,...)
Definition elog.c:1216
#define PANIC
Definition elog.h:42
#define DEBUG1
Definition elog.h:30
void err(int eval, const char *fmt,...)
Definition err.c:43
ProcNumber MyProcNumber
Definition globals.c:90
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
char * cluster_name
Definition guc_tables.c:555
#define close(a)
Definition win32.h:12
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
@ B_WAL_RECEIVER
Definition miscadmin.h:366
BackendType MyBackendType
Definition miscinit.c:64
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define die(msg)
void pgstat_report_wal(bool force)
Definition pgstat_wal.c:46
#define pqsignal
Definition port.h:547
int pgsocket
Definition port.h:29
#define PGINVALID_SOCKET
Definition port.h:31
static Datum PointerGetDatum(const void *X)
Definition postgres.h:352
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:677
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define NUM_WALRCV_WAKEUPS
static StringInfoData reply_message
static int recvFile
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
static TimeLineID recvFileTLI
WalReceiverFunctionsType * WalReceiverFunctions
Definition walreceiver.c:95
static XLogSegNo recvSegNo
static void XLogWalRcvSendHSFeedback(bool immed)
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
static void WalRcvDie(int code, Datum arg)
static void XLogWalRcvSendReply(bool force, bool requestReply)
static struct @19 LogstreamResult
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_get_conninfo(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_get_backend_pid(conn)
#define walrcv_receive(conn, buffer, wait_fd)
#define SIGCHLD
Definition win32_port.h:168
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGALRM
Definition win32_port.h:164
#define SIGUSR2
Definition win32_port.h:171
uint64 GetSystemIdentifier(void)
Definition xlog.c:4627
bool RecoveryInProgress(void)
Definition xlog.c:6460
int wal_segment_size
Definition xlog.c:146
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References ARCHIVE_MODE_ALWAYS, Assert, AuxiliaryProcessMainCommon(), B_WAL_RECEIVER, buf, CHECK_FOR_INTERRUPTS, close, cluster_name, ConditionVariableBroadcast(), ConfigReloadPending, DEBUG1, die, elog, ereport, err(), errcode(), errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, fb(), GetCurrentTimestamp(), GetSystemIdentifier(), GetXLogReplayRecPtr(), i, initStringInfo(), len, load_file(), LOG, LogstreamResult, LSN_FORMAT_ARGS, MAXCONNINFO, MAXFNAMELEN, Min, MyBackendType, MyLatch, MyProcNumber, MyProcPid, NAMEDATALEN, now(), NUM_WALRCV_WAKEUPS, on_shmem_exit(), PANIC, pfree(), pg_atomic_write_u64(), pg_memory_barrier, PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_wal(), PointerGetDatum(), pqsignal, proc_exit(), ProcessConfigFile(), procsignal_sigusr1_handler(), RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, reply_message, ResetLatch(), SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, snprintf, SpinLockAcquire, SpinLockRelease, strlcpy(), TIMESTAMP_INFINITY, TimestampDifferenceMilliseconds(), UINT64_FORMAT, UnBlockSig, WaitLatchOrSocket(), wakeup, wal_segment_size, WalRcv, walrcv_connect, WALRCV_CONNECTING, walrcv_create_slot, walrcv_endstreaming, walrcv_get_backend_pid, walrcv_get_conninfo, walrcv_get_senderinfo, walrcv_identify_system, walrcv_receive, WALRCV_RESTARTING, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WALRCV_WAKEUP_PING, WALRCV_WAKEUP_TERMINATE, WalRcvComputeNextWakeup(), WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvWaitForStartPosition(), WalReceiverFunctions, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, wrconn, WalRcvData::writtenUpto, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

◆ XLogWalRcvClose()

static void XLogWalRcvClose ( XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 1071 of file walreceiver.c.

1072{
1073 char xlogfname[MAXFNAMELEN];
1074
1076 Assert(tli != 0);
1077
1078 /*
1079 * fsync() and close current file before we switch to next one. We would
1080 * otherwise have to reopen this file to fsync it later.
1081 */
1082 XLogWalRcvFlush(false, tli);
1083
1085
1086 /*
1087 * XLOG segment files will be re-read by recovery in startup process soon,
1088 * so we don't advise the OS to release cache pages associated with the
1089 * file like XLogFileClose() does.
1090 */
1091 if (close(recvFile) != 0)
1092 ereport(PANIC,
1094 errmsg("could not close WAL segment %s: %m",
1095 xlogfname)));
1096
1097 /*
1098 * Create .done file forcibly to prevent the streamed segment from being
1099 * archived later.
1100 */
1103 else
1105
1106 recvFile = -1;
1107}
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References ARCHIVE_MODE_ALWAYS, Assert, close, ereport, errcode_for_file_access(), errmsg(), fb(), MAXFNAMELEN, PANIC, recvFile, recvFileTLI, recvSegNo, wal_segment_size, XLByteInSeg, XLogArchiveForceDone(), XLogArchiveMode, XLogArchiveNotify(), XLogFileName(), and XLogWalRcvFlush().

Referenced by XLogWalRcvWrite().

◆ XLogWalRcvFlush()

static void XLogWalRcvFlush ( bool  dying,
TimeLineID  tli 
)
static

Definition at line 1007 of file walreceiver.c.

1008{
1009 Assert(tli != 0);
1010
1011 if (LogstreamResult.Flush < LogstreamResult.Write)
1012 {
1014
1016
1017 LogstreamResult.Flush = LogstreamResult.Write;
1018
1019 /* Update shared-memory status */
1020 SpinLockAcquire(&walrcv->mutex);
1021 if (walrcv->flushedUpto < LogstreamResult.Flush)
1022 {
1023 walrcv->latestChunkStart = walrcv->flushedUpto;
1024 walrcv->flushedUpto = LogstreamResult.Flush;
1025 walrcv->receivedTLI = tli;
1026 }
1027 SpinLockRelease(&walrcv->mutex);
1028
1029 /*
1030 * If we flushed an LSN that someone was waiting for, notify the
1031 * waiters.
1032 */
1033 if (waitLSNState &&
1034 (LogstreamResult.Flush >=
1037
1038 /* Signal the startup process and walsender that new WAL has arrived */
1041 WalSndWakeup(true, false);
1042
1043 /* Report XLOG streaming progress in PS display */
1045 {
1046 char activitymsg[50];
1047
1048 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
1051 }
1052
1053 /* Also let the primary know that we made some progress */
1054 if (!dying)
1055 {
1056 XLogWalRcvSendReply(false, false);
1058 }
1059 }
1060}
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition xlogwait.h:85
#define AllowCascadeReplication()
Definition walreceiver.h:40
void WalSndWakeup(bool physical, bool logical)
Definition walsender.c:3799
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
Definition xlog.c:8858
struct WaitLSNState * waitLSNState
Definition xlogwait.c:68
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition xlogwait.c:317
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition xlogwait.h:41

References AllowCascadeReplication, Assert, fb(), issue_xlog_fsync(), LogstreamResult, LSN_FORMAT_ARGS, WaitLSNState::minWaitedLSN, pg_atomic_read_u64(), recvFile, recvSegNo, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, update_process_title, WAIT_LSN_TYPE_STANDBY_FLUSH, waitLSNState, WaitLSNWakeup(), WakeupRecovery(), WalRcv, WalSndWakeup(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvClose().

◆ XLogWalRcvProcessMsg()

static void XLogWalRcvProcessMsg ( unsigned char  type,
char buf,
Size  len,
TimeLineID  tli 
)
static

Definition at line 833 of file walreceiver.c.

834{
835 int hdrlen;
839 bool replyRequested;
840
841 switch (type)
842 {
844 {
846
847 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
848 if (len < hdrlen)
851 errmsg_internal("invalid WAL message received from primary")));
852
853 /* initialize a StringInfo with the given buffer */
855
856 /* read the fields */
861
862 buf += hdrlen;
863 len -= hdrlen;
865 break;
866 }
868 {
870
871 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
872 if (len != hdrlen)
875 errmsg_internal("invalid keepalive message received from primary")));
876
877 /* initialize a StringInfo with the given buffer */
879
880 /* read the fields */
884
886
887 /* If the primary requested a reply, send one immediately */
888 if (replyRequested)
889 XLogWalRcvSendReply(true, false);
890 break;
891 }
892 default:
895 errmsg_internal("invalid replication message type %d",
896 type)));
897 }
898}
int64_t int64
Definition c.h:543
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
#define PqReplMsg_WALData
Definition protocol.h:77
#define PqReplMsg_Keepalive
Definition protocol.h:75
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition stringinfo.h:157
const char * type
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)

References buf, ereport, errcode(), errmsg_internal(), ERROR, fb(), initReadOnlyStringInfo(), len, pq_getmsgbyte(), pq_getmsgint64(), PqReplMsg_Keepalive, PqReplMsg_WALData, ProcessWalSndrMessage(), type, XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

◆ XLogWalRcvSendHSFeedback()

static void XLogWalRcvSendHSFeedback ( bool  immed)
static

Definition at line 1192 of file walreceiver.c.

1193{
1196 TransactionId nextXid;
1199 TransactionId xmin,
1200 catalog_xmin;
1201
1202 /* initially true so we always send at least one feedback message */
1203 static bool primary_has_standby_xmin = true;
1204
1205 /*
1206 * If the user doesn't want status to be reported to the primary, be sure
1207 * to exit before doing anything at all.
1208 */
1211 return;
1212
1213 /* Get current timestamp. */
1215
1216 /* Send feedback at most once per wal_receiver_status_interval. */
1218 return;
1219
1220 /* Make sure we wake up when it's time to send feedback again. */
1222
1223 /*
1224 * If Hot Standby is not yet accepting connections there is nothing to
1225 * send. Check this after the interval has expired to reduce number of
1226 * calls.
1227 *
1228 * Bailing out here also ensures that we don't send feedback until we've
1229 * read our own replication slot state, so we don't tell the primary to
1230 * discard needed xmin or catalog_xmin from any slots that may exist on
1231 * this replica.
1232 */
1233 if (!HotStandbyActive())
1234 return;
1235
1236 /*
1237 * Make the expensive call to get the oldest xmin once we are certain
1238 * everything else has been checked.
1239 */
1241 {
1242 GetReplicationHorizons(&xmin, &catalog_xmin);
1243 }
1244 else
1245 {
1246 xmin = InvalidTransactionId;
1247 catalog_xmin = InvalidTransactionId;
1248 }
1249
1250 /*
1251 * Get epoch and adjust if nextXid and oldestXmin are on different sides of
1252 * the epoch boundary.
1253 */
1258 if (nextXid < xmin)
1259 xmin_epoch--;
1260 if (nextXid < catalog_xmin)
1262
1263 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1264 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1265
1266 /* Construct the message and send it. */
1272 pq_sendint32(&reply_message, catalog_xmin);
1275 if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1277 else
1279}
uint32_t uint32
Definition c.h:546
uint32 TransactionId
Definition c.h:666
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
Definition procarray.c:1997
#define PqReplMsg_HotStandbyFeedback
Definition protocol.h:82
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
#define InvalidTransactionId
Definition transam.h:31
#define EpochFromFullTransactionId(x)
Definition transam.h:47
#define XidFromFullTransactionId(x)
Definition transam.h:48
#define TransactionIdIsValid(xid)
Definition transam.h:41
FullTransactionId ReadNextFullTransactionId(void)
Definition varsup.c:288
#define walrcv_send(conn, buffer, nbytes)
bool HotStandbyActive(void)

References StringInfoData::data, DEBUG2, elog, EpochFromFullTransactionId, fb(), GetCurrentTimestamp(), GetReplicationHorizons(), hot_standby_feedback, HotStandbyActive(), InvalidTransactionId, StringInfoData::len, now(), pq_sendbyte(), pq_sendint32(), pq_sendint64(), PqReplMsg_HotStandbyFeedback, ReadNextFullTransactionId(), reply_message, resetStringInfo(), TransactionIdIsValid, wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_HSFEEDBACK, WalRcvComputeNextWakeup(), wrconn, and XidFromFullTransactionId.

Referenced by WalReceiverMain(), and XLogWalRcvFlush().

◆ XLogWalRcvSendReply()

static void XLogWalRcvSendReply ( bool  force,
bool  requestReply 
)
static

Definition at line 1123 of file walreceiver.c.

1124{
1125 static XLogRecPtr writePtr = 0;
1126 static XLogRecPtr flushPtr = 0;
1129
1130 /*
1131 * If the user doesn't want status to be reported to the primary, be sure
1132 * to exit before doing anything at all.
1133 */
1134 if (!force && wal_receiver_status_interval <= 0)
1135 return;
1136
1137 /* Get current timestamp. */
1139
1140 /*
1141 * We can compare the write and flush positions to the last message we
1142 * sent without taking any lock, but the apply position requires a spin
1143 * lock, so we don't check that unless something else has changed or 10
1144 * seconds have passed. This means that the apply WAL location will
1145 * appear, from the primary's point of view, to lag slightly, but since
1146 * this is only for reporting purposes and only on idle systems, that's
1147 * probably OK.
1148 */
1149 if (!force
1150 && writePtr == LogstreamResult.Write
1151 && flushPtr == LogstreamResult.Flush
1153 return;
1154
1155 /* Make sure we wake up when it's time to send another reply. */
1157
1158 /* Construct a new message */
1159 writePtr = LogstreamResult.Write;
1160 flushPtr = LogstreamResult.Flush;
1162
1170
1171 /* Send it */
1172 elog(DEBUG2, "sending write %X/%08X flush %X/%08X apply %X/%08X%s",
1176 requestReply ? " (reply requested)" : "");
1177
1179}
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84

References StringInfoData::data, DEBUG2, elog, fb(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, LSN_FORMAT_ARGS, now(), pq_sendbyte(), pq_sendint64(), PqReplMsg_StandbyStatusUpdate, reply_message, resetStringInfo(), wakeup, wal_receiver_status_interval, walrcv_send, WALRCV_WAKEUP_REPLY, WalRcvComputeNextWakeup(), and wrconn.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

◆ XLogWalRcvWrite()

static void XLogWalRcvWrite ( char buf,
Size  nbytes,
XLogRecPtr  recptr,
TimeLineID  tli 
)
static

Definition at line 904 of file walreceiver.c.

905{
906 int startoff;
907 int byteswritten;
909
910 Assert(tli != 0);
911
912 while (nbytes > 0)
913 {
914 int segbytes;
915
916 /* Close the current segment if it's completed */
919
920 if (recvFile < 0)
921 {
922 /* Create/use new log file */
925 recvFileTLI = tli;
926 }
927
928 /* Calculate the start offset of the received logs */
930
931 if (startoff + nbytes > wal_segment_size)
933 else
934 segbytes = nbytes;
935
936 /* OK to write the logs */
937 errno = 0;
938
939 /*
940 * Measure I/O timing to write WAL data, for pg_stat_io.
941 */
943
947
950
951 if (byteswritten <= 0)
952 {
954 int save_errno;
955
956 /* if write didn't set errno, assume no disk space */
957 if (errno == 0)
958 errno = ENOSPC;
959
965 errmsg("could not write to WAL segment %s "
966 "at offset %d, length %d: %m",
968 }
969
970 /* Update state for write */
972
973 nbytes -= byteswritten;
974 buf += byteswritten;
975
976 LogstreamResult.Write = recptr;
977 }
978
979 /* Update shared-memory status */
981
982 /*
983 * If we wrote an LSN that someone was waiting for, notify the waiters.
984 */
985 if (waitLSNState &&
986 (LogstreamResult.Write >=
989
990 /*
991 * Close the current segment if it's fully written up in the last cycle of
992 * the loop, to create its archive notification file soon. Otherwise WAL
993 * archiving of the segment will be delayed until any data in the next
994 * segment is received and written.
995 */
998}
return str start
@ IOOBJECT_WAL
Definition pgstat.h:279
@ IOCONTEXT_NORMAL
Definition pgstat.h:289
@ IOOP_WRITE
Definition pgstat.h:316
instr_time pgstat_prepare_io_time(bool track_io_guc)
Definition pgstat_io.c:91
void pgstat_count_io_op_time(IOObject io_object, IOContext io_context, IOOp io_op, instr_time start_time, uint32 cnt, uint64 bytes)
Definition pgstat_io.c:122
#define pg_pwrite
Definition port.h:248
off_t pgoff_t
Definition port.h:421
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:69
static void pgstat_report_wait_end(void)
Definition wait_event.h:85
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
Definition xlog.c:3417
bool track_wal_io_timing
Definition xlog.c:140
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition xlogwait.h:40

References Assert, buf, ereport, errcode_for_file_access(), errmsg(), fb(), IOCONTEXT_NORMAL, IOOBJECT_WAL, IOOP_WRITE, LogstreamResult, MAXFNAMELEN, WaitLSNState::minWaitedLSN, PANIC, pg_atomic_read_u64(), pg_atomic_write_u64(), pg_pwrite, pgstat_count_io_op_time(), pgstat_prepare_io_time(), pgstat_report_wait_end(), pgstat_report_wait_start(), recvFile, recvFileTLI, recvSegNo, start, track_wal_io_timing, WAIT_LSN_TYPE_STANDBY_WRITE, waitLSNState, WaitLSNWakeup(), wal_segment_size, WalRcv, WalRcvData::writtenUpto, XLByteInSeg, XLByteToSeg, XLogFileInit(), XLogFileName(), XLogSegmentOffset, and XLogWalRcvClose().

Referenced by XLogWalRcvProcessMsg().

Variable Documentation

◆ Flush

XLogRecPtr Flush

Definition at line 113 of file walreceiver.c.

Referenced by XLogWrite().

◆ hot_standby_feedback

bool hot_standby_feedback

◆ [struct]

struct { ... } LogstreamResult

◆ recvFile

int recvFile = -1
static

Definition at line 102 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ recvFileTLI

TimeLineID recvFileTLI = 0
static

Definition at line 103 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), and XLogWalRcvWrite().

◆ recvSegNo

XLogSegNo recvSegNo = 0
static

Definition at line 104 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvClose(), XLogWalRcvFlush(), and XLogWalRcvWrite().

◆ reply_message

StringInfoData reply_message
static

◆ wakeup

◆ wal_receiver_status_interval

◆ wal_receiver_timeout

int wal_receiver_timeout

◆ WalReceiverFunctions

WalReceiverFunctionsType* WalReceiverFunctions = NULL

Definition at line 95 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

◆ wrconn

◆ Write

XLogRecPtr Write

Definition at line 112 of file walreceiver.c.

Referenced by XLogWrite().