125#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
166 char *sender_host = NULL;
170 Assert(startup_data_len == 0);
214 elog(
PANIC,
"walreceiver still running according to shared memory state");
232 Assert(!is_temp_slot || (slotname[0] ==
'\0'));
266 elog(
ERROR,
"libpqwalreceiver didn't initialize correctly");
276 (
errcode(ERRCODE_CONNECTION_FAILURE),
277 errmsg(
"streaming replication receiver \"%s\" could not connect to the primary server: %s",
310 char standby_sysid[32];
321 if (strcmp(primary_sysid, standby_sysid) != 0)
324 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
325 errmsg(
"database system identifier differs between the primary and standby"),
326 errdetail(
"The primary's identifier is %s, the standby's identifier is %s.",
327 primary_sysid, standby_sysid)));
334 if (primaryTLI < startpointTLI)
336 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
337 errmsg(
"highest timeline %u of the primary is behind recovery timeline %u",
338 primaryTLI, startpointTLI)));
359 snprintf(slotname,
sizeof(slotname),
360 "pg_walreceiver_%lld",
383 options.startpoint = startpoint;
384 options.slotname = slotname[0] !=
'\0' ? slotname : NULL;
385 options.proto.physical.startpointTLI = startpointTLI;
390 errmsg(
"started streaming WAL from primary at %X/%08X on timeline %u",
394 errmsg(
"restarted WAL streaming at %X/%08X on timeline %u",
396 first_stream =
false;
416 bool endofwal =
false;
428 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
429 errmsg(
"cannot continue WAL streaming, recovery has already ended")));
473 (
errmsg(
"replication terminated by primary server"),
474 errdetail(
"End of WAL reached on timeline %u at %X/%08X.",
524 WAIT_EVENT_WAL_RECEIVER_MAIN);
554 bool requestReply =
false;
573 (
errcode(ERRCODE_CONNECTION_FAILURE),
574 errmsg(
"terminating walreceiver due to timeout")));
606 (
errmsg(
"primary server contains no more WAL on requested timeline %u",
622 errmsg(
"could not close WAL segment %s: %m",
636 elog(
DEBUG1,
"walreceiver ended streaming and awaits new instructions");
659 elog(
FATAL,
"unexpected walreceiver state");
708 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
713 char activitymsg[50];
715 snprintf(activitymsg,
sizeof(activitymsg),
"restarting at %X/%08X",
730 for (tli = first; tli <= last; tli++)
741 (
errmsg(
"fetching timeline history file for timeline %u from primary server",
752 if (strcmp(fname, expectedfname) != 0)
754 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
755 errmsg_internal(
"primary reported unexpected file name for timeline history file of timeline %u",
787 Assert(*startpointTLI_p != 0);
837 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
858 hdrlen =
sizeof(
int64) +
sizeof(
int64) +
sizeof(char);
861 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
881 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
938 if (byteswritten <= 0)
952 errmsg(
"could not write to WAL segment %s "
953 "at offset %d, length %d: %m",
954 xlogfname, startoff, segbytes)));
958 recptr += byteswritten;
960 nbytes -= byteswritten;
1033 char activitymsg[50];
1035 snprintf(activitymsg,
sizeof(activitymsg),
"streaming %X/%08X",
1081 errmsg(
"could not close WAL segment %s: %m",
1159 elog(
DEBUG2,
"sending write %X/%08X flush %X/%08X apply %X/%08X%s",
1163 requestReply ?
" (reply requested)" :
"");
1190 static bool primary_has_standby_xmin =
true;
1197 !primary_has_standby_xmin)
1244 catalog_xmin_epoch = xmin_epoch;
1247 if (nextXid < catalog_xmin)
1248 catalog_xmin_epoch--;
1250 elog(
DEBUG2,
"sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1251 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1263 primary_has_standby_xmin =
true;
1265 primary_has_standby_xmin =
false;
1301 if (applyDelay == -1)
1302 elog(
DEBUG2,
"sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1307 elog(
DEBUG2,
"sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1399 return "restarting";
1417 bool ready_to_display;
1428 char sender_host[NI_MAXHOST];
1429 int sender_port = 0;
1456 if (pid == 0 || !ready_to_display)
1469 elog(
ERROR,
"return type must be a row type");
1484 memset(&nulls[1],
true,
sizeof(
bool) * (tupdesc->
natts - 1));
1504 if (last_send_time == 0)
1508 if (last_receipt_time == 0)
1516 if (latest_end_time == 0)
1520 if (*slotname ==
'\0')
1524 if (*sender_host ==
'\0')
1528 if (sender_port == 0)
1532 if (*conninfo ==
'\0')
bool has_privs_of_role(Oid member, Oid role)
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
#define pg_memory_barrier()
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
void AuxiliaryProcessMainCommon(void)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
bool existsTimeLineHistory(TimeLineID probeTLI)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
TimestampTz GetCurrentTimestamp(void)
const char * timestamptz_to_str(TimestampTz t)
Datum now(PG_FUNCTION_ARGS)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
void ConditionVariableBroadcast(ConditionVariable *cv)
#define TIMESTAMP_INFINITY
void load_file(const char *filename, bool restricted)
int errmsg_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
bool message_level_is_interesting(int elevel)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
#define palloc0_array(type, count)
#define PG_RETURN_DATUM(x)
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
void ProcessConfigFile(GucContext context)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
char * pstrdup(const char *in)
void pfree(void *pointer)
#define CHECK_FOR_INTERRUPTS()
BackendType MyBackendType
static Datum LSNGetDatum(XLogRecPtr X)
static char buf[DEFAULT_XLOG_SEG_SIZE]
instr_time pgstat_prepare_io_time(bool track_io_guc)
void pgstat_count_io_op_time(IOObject io_object, IOContext io_context, IOOp io_op, instr_time start_time, uint32 cnt, uint64 bytes)
void pgstat_report_wal(bool force)
size_t strlcpy(char *dst, const char *src, size_t siz)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static Datum Int32GetDatum(int32 X)
#define GetPGProcByNumber(n)
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
#define INVALID_PROC_NUMBER
void procsignal_sigusr1_handler(SIGNAL_ARGS)
#define PqReplMsg_WALData
#define PqReplMsg_Keepalive
#define PqReplMsg_HotStandbyFeedback
#define PqReplMsg_StandbyStatusUpdate
bool update_process_title
static void set_ps_display(const char *activity)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void resetStringInfo(StringInfo str)
void initStringInfo(StringInfo str)
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
TimestampTz lastMsgReceiptTime
TimeLineID receiveStartTLI
char slotname[NAMEDATALEN]
char sender_host[NI_MAXHOST]
XLogRecPtr latestChunkStart
ConditionVariable walRcvStoppedCV
pg_atomic_uint64 writtenUpto
TimestampTz lastMsgSendTime
TimestampTz latestWalEndTime
char conninfo[MAXCONNINFO]
#define InvalidTransactionId
#define EpochFromFullTransactionId(x)
#define XidFromFullTransactionId(x)
#define TransactionIdIsValid(xid)
static Datum TimestampTzGetDatum(TimestampTz X)
#define TimestampTzPlusMilliseconds(tz, ms)
#define TimestampTzPlusSeconds(tz, s)
FullTransactionId ReadNextFullTransactionId(void)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
#define NUM_WALRCV_WAKEUPS
static WalReceiverConn * wrconn
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
void WalReceiverMain(const void *startup_data, size_t startup_data_len)
static StringInfoData reply_message
bool hot_standby_feedback
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
int wal_receiver_status_interval
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
static TimeLineID recvFileTLI
WalReceiverFunctionsType * WalReceiverFunctions
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
static XLogSegNo recvSegNo
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
static void XLogWalRcvSendHSFeedback(bool immed)
@ WALRCV_WAKEUP_TERMINATE
@ WALRCV_WAKEUP_HSFEEDBACK
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
static void WalRcvDie(int code, Datum arg)
static void XLogWalRcvSendReply(bool force, bool requestReply)
static struct @19 LogstreamResult
static const char * WalRcvGetStateString(WalRcvState state)
void WalRcvForceReply(void)
#define AllowCascadeReplication()
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_send(conn, buffer, nbytes)
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_get_conninfo(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_disconnect(conn)
#define walrcv_get_backend_pid(conn)
#define walrcv_receive(conn, buffer, wait_fd)
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)
void WalSndWakeup(bool physical, bool logical)
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
uint64 GetSystemIdentifier(void)
bool RecoveryInProgress(void)
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
void XLogArchiveNotify(const char *xlog)
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
bool HotStandbyActive(void)
void WakeupRecovery(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
struct WaitLSNState * waitLSNState
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
@ WAIT_LSN_TYPE_STANDBY_FLUSH
@ WAIT_LSN_TYPE_STANDBY_WRITE