124#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
165 char *sender_host = NULL;
169 Assert(startup_data_len == 0);
213 elog(
PANIC,
"walreceiver still running according to shared memory state");
231 Assert(!is_temp_slot || (slotname[0] ==
'\0'));
265 elog(
ERROR,
"libpqwalreceiver didn't initialize correctly");
275 (
errcode(ERRCODE_CONNECTION_FAILURE),
276 errmsg(
"streaming replication receiver \"%s\" could not connect to the primary server: %s",
309 char standby_sysid[32];
320 if (strcmp(primary_sysid, standby_sysid) != 0)
323 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
324 errmsg(
"database system identifier differs between the primary and standby"),
325 errdetail(
"The primary's identifier is %s, the standby's identifier is %s.",
326 primary_sysid, standby_sysid)));
333 if (primaryTLI < startpointTLI)
335 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
336 errmsg(
"highest timeline %u of the primary is behind recovery timeline %u",
337 primaryTLI, startpointTLI)));
358 snprintf(slotname,
sizeof(slotname),
359 "pg_walreceiver_%lld",
382 options.startpoint = startpoint;
383 options.slotname = slotname[0] !=
'\0' ? slotname : NULL;
384 options.proto.physical.startpointTLI = startpointTLI;
389 (
errmsg(
"started streaming WAL from primary at %X/%X on timeline %u",
393 (
errmsg(
"restarted WAL streaming at %X/%X on timeline %u",
395 first_stream =
false;
415 bool endofwal =
false;
427 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
428 errmsg(
"cannot continue WAL streaming, recovery has already ended")));
472 (
errmsg(
"replication terminated by primary server"),
473 errdetail(
"End of WAL reached on timeline %u at %X/%X.",
523 WAIT_EVENT_WAL_RECEIVER_MAIN);
553 bool requestReply =
false;
572 (
errcode(ERRCODE_CONNECTION_FAILURE),
573 errmsg(
"terminating walreceiver due to timeout")));
605 (
errmsg(
"primary server contains no more WAL on requested timeline %u",
621 errmsg(
"could not close WAL segment %s: %m",
635 elog(
DEBUG1,
"walreceiver ended streaming and awaits new instructions");
658 elog(
FATAL,
"unexpected walreceiver state");
707 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
712 char activitymsg[50];
714 snprintf(activitymsg,
sizeof(activitymsg),
"restarting at %X/%X",
729 for (tli = first; tli <= last; tli++)
740 (
errmsg(
"fetching timeline history file for timeline %u from primary server",
751 if (strcmp(fname, expectedfname) != 0)
753 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
754 errmsg_internal(
"primary reported unexpected file name for timeline history file of timeline %u",
786 Assert(*startpointTLI_p != 0);
836 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
857 hdrlen =
sizeof(
int64) +
sizeof(
int64) +
sizeof(char);
860 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
880 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
937 if (byteswritten <= 0)
951 errmsg(
"could not write to WAL segment %s "
952 "at offset %d, length %lu: %m",
953 xlogfname, startoff, (
unsigned long) segbytes)));
957 recptr += byteswritten;
959 nbytes -= byteswritten;
1015 char activitymsg[50];
1017 snprintf(activitymsg,
sizeof(activitymsg),
"streaming %X/%X",
1063 errmsg(
"could not close WAL segment %s: %m",
1141 elog(
DEBUG2,
"sending write %X/%X flush %X/%X apply %X/%X%s",
1145 requestReply ?
" (reply requested)" :
"");
1172 static bool primary_has_standby_xmin =
true;
1179 !primary_has_standby_xmin)
1226 catalog_xmin_epoch = xmin_epoch;
1229 if (nextXid < catalog_xmin)
1230 catalog_xmin_epoch--;
1232 elog(
DEBUG2,
"sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1233 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1245 primary_has_standby_xmin =
true;
1247 primary_has_standby_xmin =
false;
1283 if (applyDelay == -1)
1284 elog(
DEBUG2,
"sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1289 elog(
DEBUG2,
"sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1381 return "restarting";
1399 bool ready_to_display;
1410 char sender_host[NI_MAXHOST];
1411 int sender_port = 0;
1438 if (pid == 0 || !ready_to_display)
1451 elog(
ERROR,
"return type must be a row type");
1466 memset(&nulls[1],
true,
sizeof(
bool) * (tupdesc->
natts - 1));
1486 if (last_send_time == 0)
1490 if (last_receipt_time == 0)
1498 if (latest_end_time == 0)
1502 if (*slotname ==
'\0')
1506 if (*sender_host ==
'\0')
1510 if (sender_port == 0)
1514 if (*conninfo ==
'\0')
bool has_privs_of_role(Oid member, Oid role)
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
#define pg_memory_barrier()
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
void AuxiliaryProcessMainCommon(void)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
bool existsTimeLineHistory(TimeLineID probeTLI)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
TimestampTz GetCurrentTimestamp(void)
const char * timestamptz_to_str(TimestampTz t)
Datum now(PG_FUNCTION_ARGS)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
void ConditionVariableBroadcast(ConditionVariable *cv)
#define TIMESTAMP_INFINITY
void load_file(const char *filename, bool restricted)
int errmsg_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
bool message_level_is_interesting(int elevel)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
#define PG_RETURN_DATUM(x)
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
void ProcessConfigFile(GucContext context)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
#define CHECK_FOR_INTERRUPTS()
BackendType MyBackendType
static Datum LSNGetDatum(XLogRecPtr X)
instr_time pgstat_prepare_io_time(bool track_io_guc)
void pgstat_count_io_op_time(IOObject io_object, IOContext io_context, IOOp io_op, instr_time start_time, uint32 cnt, uint64 bytes)
void pgstat_report_wal(bool force)
size_t strlcpy(char *dst, const char *src, size_t siz)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static Datum Int32GetDatum(int32 X)
#define GetPGProcByNumber(n)
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
#define INVALID_PROC_NUMBER
void procsignal_sigusr1_handler(SIGNAL_ARGS)
bool update_process_title
static void set_ps_display(const char *activity)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void resetStringInfo(StringInfo str)
void initStringInfo(StringInfo str)
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
TimestampTz lastMsgReceiptTime
TimeLineID receiveStartTLI
char slotname[NAMEDATALEN]
char sender_host[NI_MAXHOST]
XLogRecPtr latestChunkStart
ConditionVariable walRcvStoppedCV
pg_atomic_uint64 writtenUpto
TimestampTz lastMsgSendTime
TimestampTz latestWalEndTime
char conninfo[MAXCONNINFO]
#define InvalidTransactionId
#define EpochFromFullTransactionId(x)
#define XidFromFullTransactionId(x)
#define TransactionIdIsValid(xid)
static Datum TimestampTzGetDatum(TimestampTz X)
#define TimestampTzPlusMilliseconds(tz, ms)
#define TimestampTzPlusSeconds(tz, s)
FullTransactionId ReadNextFullTransactionId(void)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
#define NUM_WALRCV_WAKEUPS
static WalReceiverConn * wrconn
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
void WalReceiverMain(const void *startup_data, size_t startup_data_len)
static StringInfoData reply_message
bool hot_standby_feedback
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
int wal_receiver_status_interval
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
static TimeLineID recvFileTLI
WalReceiverFunctionsType * WalReceiverFunctions
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
static XLogSegNo recvSegNo
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
static void XLogWalRcvSendHSFeedback(bool immed)
@ WALRCV_WAKEUP_TERMINATE
@ WALRCV_WAKEUP_HSFEEDBACK
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
static void WalRcvDie(int code, Datum arg)
static void XLogWalRcvSendReply(bool force, bool requestReply)
static struct @19 LogstreamResult
static const char * WalRcvGetStateString(WalRcvState state)
void WalRcvForceReply(void)
#define AllowCascadeReplication()
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_send(conn, buffer, nbytes)
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_get_conninfo(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_disconnect(conn)
#define walrcv_get_backend_pid(conn)
#define walrcv_receive(conn, buffer, wait_fd)
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)
void WalSndWakeup(bool physical, bool logical)
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
uint64 GetSystemIdentifier(void)
bool RecoveryInProgress(void)
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
void XLogArchiveNotify(const char *xlog)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
bool HotStandbyActive(void)
void WakeupRecovery(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)