123 #define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
175 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
176 errmsg(
"terminating walreceiver process due to administrator command")));
196 char *sender_host = NULL;
200 Assert(startup_data_len == 0);
244 elog(
PANIC,
"walreceiver still running according to shared memory state");
262 Assert(!is_temp_slot || (slotname[0] ==
'\0'));
296 elog(
ERROR,
"libpqwalreceiver didn't initialize correctly");
306 (
errcode(ERRCODE_CONNECTION_FAILURE),
307 errmsg(
"streaming replication receiver \"%s\" could not connect to the primary server: %s",
340 char standby_sysid[32];
351 if (strcmp(primary_sysid, standby_sysid) != 0)
354 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
355 errmsg(
"database system identifier differs between the primary and standby"),
356 errdetail(
"The primary's identifier is %s, the standby's identifier is %s.",
357 primary_sysid, standby_sysid)));
364 if (primaryTLI < startpointTLI)
366 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
367 errmsg(
"highest timeline %u of the primary is behind recovery timeline %u",
368 primaryTLI, startpointTLI)));
389 snprintf(slotname,
sizeof(slotname),
390 "pg_walreceiver_%lld",
413 options.startpoint = startpoint;
414 options.slotname = slotname[0] !=
'\0' ? slotname : NULL;
415 options.proto.physical.startpointTLI = startpointTLI;
420 (
errmsg(
"started streaming WAL from primary at %X/%X on timeline %u",
424 (
errmsg(
"restarted WAL streaming at %X/%X on timeline %u",
426 first_stream =
false;
446 bool endofwal =
false;
458 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
459 errmsg(
"cannot continue WAL streaming, recovery has already ended")));
503 (
errmsg(
"replication terminated by primary server"),
504 errdetail(
"End of WAL reached on timeline %u at %X/%X.",
554 WAIT_EVENT_WAL_RECEIVER_MAIN);
584 bool requestReply =
false;
593 (
errcode(ERRCODE_CONNECTION_FAILURE),
594 errmsg(
"terminating walreceiver due to timeout")));
626 (
errmsg(
"primary server contains no more WAL on requested timeline %u",
642 errmsg(
"could not close WAL segment %s: %m",
656 elog(
DEBUG1,
"walreceiver ended streaming and awaits new instructions");
679 elog(
FATAL,
"unexpected walreceiver state");
728 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
733 char activitymsg[50];
735 snprintf(activitymsg,
sizeof(activitymsg),
"restarting at %X/%X",
750 for (tli = first; tli <= last; tli++)
761 (
errmsg(
"fetching timeline history file for timeline %u from primary server",
772 if (strcmp(fname, expectedfname) != 0)
774 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
775 errmsg_internal(
"primary reported unexpected file name for timeline history file of timeline %u",
807 Assert(*startpointTLI_p != 0);
823 walrcv->
latch = NULL;
854 hdrlen =
sizeof(int64) +
sizeof(int64) +
sizeof(int64);
857 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
878 hdrlen =
sizeof(int64) +
sizeof(int64) +
sizeof(char);
881 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
901 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
946 if (byteswritten <= 0)
960 errmsg(
"could not write to WAL segment %s "
961 "at offset %d, length %lu: %m",
962 xlogfname, startoff, (
unsigned long) segbytes)));
966 recptr += byteswritten;
968 nbytes -= byteswritten;
1024 char activitymsg[50];
1026 snprintf(activitymsg,
sizeof(activitymsg),
"streaming %X/%X",
1072 errmsg(
"could not close WAL segment %s: %m",
1150 elog(
DEBUG2,
"sending write %X/%X flush %X/%X apply %X/%X%s",
1154 requestReply ?
" (reply requested)" :
"");
1181 static bool primary_has_standby_xmin =
true;
1188 !primary_has_standby_xmin)
1235 catalog_xmin_epoch = xmin_epoch;
1238 if (nextXid < catalog_xmin)
1239 catalog_xmin_epoch--;
1241 elog(
DEBUG2,
"sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1242 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1254 primary_has_standby_xmin =
true;
1256 primary_has_standby_xmin =
false;
1292 if (applyDelay == -1)
1293 elog(
DEBUG2,
"sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1298 elog(
DEBUG2,
"sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1390 return "restarting";
1408 bool ready_to_display;
1419 char sender_host[NI_MAXHOST];
1420 int sender_port = 0;
1447 if (pid == 0 || !ready_to_display)
1460 elog(
ERROR,
"return type must be a row type");
1475 memset(&nulls[1],
true,
sizeof(
bool) * (tupdesc->
natts - 1));
1495 if (last_send_time == 0)
1499 if (last_receipt_time == 0)
1507 if (latest_end_time == 0)
1511 if (*slotname ==
'\0')
1515 if (*sender_host ==
'\0')
1519 if (sender_port == 0)
1523 if (*conninfo ==
'\0')
bool has_privs_of_role(Oid member, Oid role)
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
#define pg_memory_barrier()
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
void AuxiliaryProcessMainCommon(void)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
bool existsTimeLineHistory(TimeLineID probeTLI)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
const char * timestamptz_to_str(TimestampTz t)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define Assert(condition)
void ConditionVariableBroadcast(ConditionVariable *cv)
#define TIMESTAMP_INFINITY
void load_file(const char *filename, bool restricted)
int errmsg_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
bool message_level_is_interesting(int elevel)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
#define PG_RETURN_DATUM(x)
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
void ProcessConfigFile(GucContext context)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
#define CHECK_FOR_INTERRUPTS()
BackendType MyBackendType
static Datum LSNGetDatum(XLogRecPtr X)
pqsigfunc pqsignal(int signo, pqsigfunc func)
size_t strlcpy(char *dst, const char *src, size_t siz)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static Datum Int32GetDatum(int32 X)
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
bool update_process_title
static void set_ps_display(const char *activity)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void resetStringInfo(StringInfo str)
void initStringInfo(StringInfo str)
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
TimestampTz lastMsgReceiptTime
TimeLineID receiveStartTLI
char slotname[NAMEDATALEN]
char sender_host[NI_MAXHOST]
XLogRecPtr latestChunkStart
ConditionVariable walRcvStoppedCV
pg_atomic_uint64 writtenUpto
TimestampTz lastMsgSendTime
TimestampTz latestWalEndTime
char conninfo[MAXCONNINFO]
#define InvalidTransactionId
#define EpochFromFullTransactionId(x)
#define XidFromFullTransactionId(x)
#define TransactionIdIsValid(xid)
static Datum TimestampTzGetDatum(TimestampTz X)
#define TimestampTzPlusMilliseconds(tz, ms)
#define TimestampTzPlusSeconds(tz, s)
FullTransactionId ReadNextFullTransactionId(void)
void ProcessWalRcvInterrupts(void)
#define NUM_WALRCV_WAKEUPS
static WalReceiverConn * wrconn
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
static StringInfoData reply_message
bool hot_standby_feedback
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
int wal_receiver_status_interval
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
static TimeLineID recvFileTLI
WalReceiverFunctionsType * WalReceiverFunctions
static struct @21 LogstreamResult
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
void WalReceiverMain(char *startup_data, size_t startup_data_len)
Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
static XLogSegNo recvSegNo
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
static void XLogWalRcvSendHSFeedback(bool immed)
@ WALRCV_WAKEUP_TERMINATE
@ WALRCV_WAKEUP_HSFEEDBACK
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
static const char * WalRcvGetStateString(WalRcvState state)
static void WalRcvDie(int code, Datum arg)
static void XLogWalRcvSendReply(bool force, bool requestReply)
void WalRcvForceReply(void)
#define AllowCascadeReplication()
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_send(conn, buffer, nbytes)
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_get_conninfo(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_disconnect(conn)
#define walrcv_get_backend_pid(conn)
#define walrcv_receive(conn, buffer, wait_fd)
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)
void WalSndWakeup(bool physical, bool logical)
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
uint64 GetSystemIdentifier(void)
bool RecoveryInProgress(void)
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
void XLogArchiveNotify(const char *xlog)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
bool HotStandbyActive(void)
void WakeupRecovery(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)