123 #define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
175 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
176 errmsg(
"terminating walreceiver process due to administrator command")));
196 char *sender_host = NULL;
199 Assert(startup_data_len == 0);
243 elog(
PANIC,
"walreceiver still running according to shared memory state");
261 Assert(!is_temp_slot || (slotname[0] ==
'\0'));
295 elog(
ERROR,
"libpqwalreceiver didn't initialize correctly");
306 (
errcode(ERRCODE_CONNECTION_FAILURE),
307 errmsg(
"could not connect to the primary server: %s",
err)));
339 char standby_sysid[32];
350 if (strcmp(primary_sysid, standby_sysid) != 0)
353 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
354 errmsg(
"database system identifier differs between the primary and standby"),
355 errdetail(
"The primary's identifier is %s, the standby's identifier is %s.",
356 primary_sysid, standby_sysid)));
363 if (primaryTLI < startpointTLI)
365 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
366 errmsg(
"highest timeline %u of the primary is behind recovery timeline %u",
367 primaryTLI, startpointTLI)));
388 snprintf(slotname,
sizeof(slotname),
389 "pg_walreceiver_%lld",
412 options.startpoint = startpoint;
413 options.slotname = slotname[0] !=
'\0' ? slotname : NULL;
414 options.proto.physical.startpointTLI = startpointTLI;
419 (
errmsg(
"started streaming WAL from primary at %X/%X on timeline %u",
423 (
errmsg(
"restarted WAL streaming at %X/%X on timeline %u",
425 first_stream =
false;
445 bool endofwal =
false;
457 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
458 errmsg(
"cannot continue WAL streaming, recovery has already ended")));
502 (
errmsg(
"replication terminated by primary server"),
503 errdetail(
"End of WAL reached on timeline %u at %X/%X.",
553 WAIT_EVENT_WAL_RECEIVER_MAIN);
583 bool requestReply =
false;
592 (
errcode(ERRCODE_CONNECTION_FAILURE),
593 errmsg(
"terminating walreceiver due to timeout")));
625 (
errmsg(
"primary server contains no more WAL on requested timeline %u",
641 errmsg(
"could not close WAL segment %s: %m",
655 elog(
DEBUG1,
"walreceiver ended streaming and awaits new instructions");
678 elog(
FATAL,
"unexpected walreceiver state");
727 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
732 char activitymsg[50];
734 snprintf(activitymsg,
sizeof(activitymsg),
"restarting at %X/%X",
749 for (tli = first; tli <= last; tli++)
760 (
errmsg(
"fetching timeline history file for timeline %u from primary server",
771 if (strcmp(fname, expectedfname) != 0)
773 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
774 errmsg_internal(
"primary reported unexpected file name for timeline history file of timeline %u",
806 Assert(*startpointTLI_p != 0);
822 walrcv->
latch = NULL;
853 hdrlen =
sizeof(int64) +
sizeof(int64) +
sizeof(int64);
856 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
877 hdrlen =
sizeof(int64) +
sizeof(int64) +
sizeof(char);
880 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
900 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
945 if (byteswritten <= 0)
959 errmsg(
"could not write to WAL segment %s "
960 "at offset %d, length %lu: %m",
961 xlogfname, startoff, (
unsigned long) segbytes)));
965 recptr += byteswritten;
967 nbytes -= byteswritten;
1023 char activitymsg[50];
1025 snprintf(activitymsg,
sizeof(activitymsg),
"streaming %X/%X",
1071 errmsg(
"could not close WAL segment %s: %m",
1149 elog(
DEBUG2,
"sending write %X/%X flush %X/%X apply %X/%X%s",
1153 requestReply ?
" (reply requested)" :
"");
1180 static bool primary_has_standby_xmin =
true;
1187 !primary_has_standby_xmin)
1234 catalog_xmin_epoch = xmin_epoch;
1237 if (nextXid < catalog_xmin)
1238 catalog_xmin_epoch--;
1240 elog(
DEBUG2,
"sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1241 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1253 primary_has_standby_xmin =
true;
1255 primary_has_standby_xmin =
false;
1291 if (applyDelay == -1)
1292 elog(
DEBUG2,
"sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1297 elog(
DEBUG2,
"sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1389 return "restarting";
1407 bool ready_to_display;
1418 char sender_host[NI_MAXHOST];
1419 int sender_port = 0;
1446 if (pid == 0 || !ready_to_display)
1459 elog(
ERROR,
"return type must be a row type");
1474 memset(&nulls[1],
true,
sizeof(
bool) * (tupdesc->
natts - 1));
1494 if (last_send_time == 0)
1498 if (last_receipt_time == 0)
1506 if (latest_end_time == 0)
1510 if (*slotname ==
'\0')
1514 if (*sender_host ==
'\0')
1518 if (sender_port == 0)
1522 if (*conninfo ==
'\0')
bool has_privs_of_role(Oid member, Oid role)
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
#define pg_memory_barrier()
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
void AuxiliaryProcessMainCommon(void)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
bool existsTimeLineHistory(TimeLineID probeTLI)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
const char * timestamptz_to_str(TimestampTz t)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
void ConditionVariableBroadcast(ConditionVariable *cv)
#define TIMESTAMP_INFINITY
void load_file(const char *filename, bool restricted)
int errmsg_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
bool message_level_is_interesting(int elevel)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
#define PG_RETURN_DATUM(x)
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
void ProcessConfigFile(GucContext context)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
Assert(fmt[strlen(fmt) - 1] !='\n')
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
#define CHECK_FOR_INTERRUPTS()
BackendType MyBackendType
static Datum LSNGetDatum(XLogRecPtr X)
pqsigfunc pqsignal(int signo, pqsigfunc func)
size_t strlcpy(char *dst, const char *src, size_t siz)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static Datum Int32GetDatum(int32 X)
void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
bool update_process_title
static void set_ps_display(const char *activity)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void resetStringInfo(StringInfo str)
void initStringInfo(StringInfo str)
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
TimestampTz lastMsgReceiptTime
TimeLineID receiveStartTLI
char slotname[NAMEDATALEN]
char sender_host[NI_MAXHOST]
XLogRecPtr latestChunkStart
ConditionVariable walRcvStoppedCV
pg_atomic_uint64 writtenUpto
TimestampTz lastMsgSendTime
TimestampTz latestWalEndTime
char conninfo[MAXCONNINFO]
#define InvalidTransactionId
#define EpochFromFullTransactionId(x)
#define XidFromFullTransactionId(x)
#define TransactionIdIsValid(xid)
static Datum TimestampTzGetDatum(TimestampTz X)
#define TimestampTzPlusMilliseconds(tz, ms)
#define TimestampTzPlusSeconds(tz, s)
FullTransactionId ReadNextFullTransactionId(void)
void ProcessWalRcvInterrupts(void)
#define NUM_WALRCV_WAKEUPS
static WalReceiverConn * wrconn
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
static StringInfoData reply_message
bool hot_standby_feedback
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
int wal_receiver_status_interval
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
static void XLogWalRcvFlush(bool dying, TimeLineID tli)
static struct @18 LogstreamResult
static TimeLineID recvFileTLI
WalReceiverFunctionsType * WalReceiverFunctions
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
void WalReceiverMain(char *startup_data, size_t startup_data_len)
Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
static XLogSegNo recvSegNo
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
static void XLogWalRcvSendHSFeedback(bool immed)
@ WALRCV_WAKEUP_TERMINATE
@ WALRCV_WAKEUP_HSFEEDBACK
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
static const char * WalRcvGetStateString(WalRcvState state)
static void WalRcvDie(int code, Datum arg)
static void XLogWalRcvSendReply(bool force, bool requestReply)
void WalRcvForceReply(void)
#define AllowCascadeReplication()
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_send(conn, buffer, nbytes)
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_get_conninfo(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_disconnect(conn)
#define walrcv_get_backend_pid(conn)
#define walrcv_receive(conn, buffer, wait_fd)
int GetReplicationApplyDelay(void)
int GetReplicationTransferLatency(void)
void WalSndWakeup(bool physical, bool logical)
int XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
uint64 GetSystemIdentifier(void)
bool RecoveryInProgress(void)
void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
static void TLHistoryFileName(char *fname, TimeLineID tli)
void XLogArchiveForceDone(const char *xlog)
void XLogArchiveNotify(const char *xlog)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
bool HotStandbyActive(void)
void WakeupRecovery(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)