84 #define NAPTIME_PER_CYCLE 100 159 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
160 errmsg(
"terminating walreceiver process due to administrator command")));
181 char *sender_host = NULL;
223 elog(
PANIC,
"walreceiver still running according to shared memory state");
266 if (WalReceiverFunctions == NULL)
267 elog(
ERROR,
"libpqwalreceiver didn't initialize correctly");
276 (
errmsg(
"could not connect to the primary server: %s", err)));
308 char standby_sysid[32];
319 if (strcmp(primary_sysid, standby_sysid) != 0)
322 (
errmsg(
"database system identifier differs between the primary and standby"),
323 errdetail(
"The primary's identifier is %s, the standby's identifier is %s.",
324 primary_sysid, standby_sysid)));
331 if (primaryTLI < startpointTLI)
333 (
errmsg(
"highest timeline %u of the primary is behind recovery timeline %u",
334 primaryTLI, startpointTLI)));
362 options.
slotname = slotname[0] !=
'\0' ? slotname : NULL;
369 (
errmsg(
"started streaming WAL from primary at %X/%X on timeline %u",
374 (
errmsg(
"restarted WAL streaming at %X/%X on timeline %u",
377 first_stream =
false;
393 bool endofwal =
false;
403 (
errmsg(
"cannot continue WAL streaming, recovery has already ended")));
440 (
errmsg(
"replication terminated by primary server"),
441 errdetail(
"End of WAL reached on timeline %u at %X/%X.",
512 bool requestReply =
false;
529 (
errmsg(
"terminating walreceiver due to timeout")));
567 (
errmsg(
"primary server contains no more WAL on requested timeline %u",
583 errmsg(
"could not close log segment %s: %m",
597 elog(
DEBUG1,
"walreceiver ended streaming and awaits new instructions");
620 elog(
FATAL,
"unexpected walreceiver state");
671 char activitymsg[50];
673 snprintf(activitymsg,
sizeof(activitymsg),
"restarting at %X/%X",
674 (
uint32) (*startpoint >> 32),
689 for (tli = first; tli <= last; tli++)
700 (
errmsg(
"fetching timeline history file for timeline %u from primary server",
711 if (strcmp(fname, expectedfname) != 0)
713 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
714 errmsg_internal(
"primary reported unexpected file name for timeline history file of timeline %u",
750 walrcv->
latch = NULL;
773 int save_errno = errno;
828 hdrlen =
sizeof(int64) +
sizeof(int64) +
sizeof(int64);
831 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
849 hdrlen =
sizeof(int64) +
sizeof(int64) +
sizeof(char);
852 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
870 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
913 errmsg(
"could not close log segment %s: %m",
946 if (lseek(
recvFile, (off_t) startoff, SEEK_SET) < 0)
949 int save_errno = errno;
955 errmsg(
"could not seek in log segment %s to offset %u: %m",
956 xlogfname, startoff)));
966 if (byteswritten <= 0)
980 errmsg(
"could not write to log segment %s " 981 "at offset %u, length %lu: %m",
982 xlogfname,
recvOff, (
unsigned long) segbytes)));
986 recptr += byteswritten;
989 nbytes -= byteswritten;
1031 char activitymsg[50];
1033 snprintf(activitymsg,
sizeof(activitymsg),
"streaming %X/%X",
1108 pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1111 elog(
DEBUG2,
"sending write %X/%X flush %X/%X apply %X/%X%s",
1115 requestReply ?
" (reply requested)" :
"");
1143 static bool master_has_standby_xmin =
true;
1150 !master_has_standby_xmin)
1216 catalog_xmin_epoch = xmin_epoch;
1219 if (nextXid < catalog_xmin)
1220 catalog_xmin_epoch--;
1222 elog(
DEBUG2,
"sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1223 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1235 master_has_standby_xmin =
true;
1237 master_has_standby_xmin =
false;
1274 if (applyDelay == -1)
1275 elog(
DEBUG2,
"sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1280 elog(
DEBUG2,
"sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1331 return "restarting";
1349 bool ready_to_display;
1360 int sender_port = 0;
1387 if (pid == 0 || !ready_to_display)
1392 elog(
ERROR,
"return type must be a row type");
1407 MemSet(&nulls[1],
true,
sizeof(
bool) * (tupdesc->
natts - 1));
1423 if (last_send_time == 0)
1427 if (last_receipt_time == 0)
1435 if (latest_end_time == 0)
1439 if (*slotname ==
'\0')
1443 if (*sender_host ==
'\0')
1447 if (sender_port == 0)
1451 if (*conninfo ==
'\0')
static struct @25 LogstreamResult
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
#define InvalidXLogRecPtr
#define walrcv_get_conninfo(conn)
static const char * WalRcvGetStateString(WalRcvState state)
#define walrcv_endstreaming(conn, next_tli)
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
bool hot_standby_feedback
void ProcessConfigFile(GucContext context)
bool update_process_title
int GetReplicationTransferLatency(void)
TimestampTz GetCurrentTimestamp(void)
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
void issue_xlog_fsync(int fd, XLogSegNo segno)
#define walrcv_identify_system(conn, primary_tli)
static StringInfoData incoming_message
char * pstrdup(const char *in)
bool HotStandbyActive(void)
static void XLogWalRcvSendHSFeedback(bool immed)
int XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
#define walrcv_receive(conn, buffer, wait_fd)
void ProcessWalRcvInterrupts(void)
union WalRcvStreamOptions::@106 proto
static void WalRcvSigHupHandler(SIGNAL_ARGS)
void set_ps_display(const char *activity, bool force)
#define walrcv_startstreaming(conn, options)
int errcode(int sqlerrcode)
#define MemSet(start, val, len)
#define WL_SOCKET_READABLE
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
bool RecoveryInProgress(void)
void SetLatch(Latch *latch)
int wal_receiver_status_interval
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
#define NAPTIME_PER_CYCLE
TimestampTz lastMsgReceiptTime
Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
void ResetLatch(Latch *latch)
TimestampTz lastMsgSendTime
static WalReceiverConn * wrconn
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define XidFromFullTransactionId(x)
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define SpinLockAcquire(lock)
static StringInfoData reply_message
void XLogArchiveNotify(const char *xlog)
void WalReceiverMain(void)
void pfree(void *pointer)
#define PROCARRAY_SLOTS_XMIN
void WakeupRecovery(void)
#define TimestampTzGetDatum(X)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
XLogRecPtr latestChunkStart
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
int errdetail(const char *fmt,...)
int errcode_for_file_access(void)
#define InvalidTransactionId
#define AllowCascadeReplication()
void resetStringInfo(StringInfo str)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
bool existsTimeLineHistory(TimeLineID probeTLI)
struct WalRcvStreamOptions::@106::@107 physical
static volatile sig_atomic_t got_SIGHUP
#define ereport(elevel, rest)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
TimeLineID receiveStartTLI
static TimeLineID recvFileTLI
FullTransactionId ReadNextFullTransactionId(void)
int GetReplicationApplyDelay(void)
void initStringInfo(StringInfo str)
#define XLogRecPtrIsInvalid(r)
static XLogSegNo recvSegNo
#define SpinLockRelease(lock)
void * palloc0(Size size)
void XLogArchiveForceDone(const char *xlog)
#define PG_RETURN_DATUM(x)
WalReceiverFunctionsType * WalReceiverFunctions
#define PROCARRAY_FLAGS_DEFAULT
static void XLogWalRcvSendReply(bool force, bool requestReply)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define EpochFromFullTransactionId(x)
TimestampTz latestWalEndTime
static void WalRcvShutdownHandler(SIGNAL_ARGS)
TimeLineID ThisTimeLineID
#define TimestampTzPlusMilliseconds(tz, ms)
TransactionId GetOldestXmin(Relation rel, int flags)
pqsigfunc pqsignal(int signum, pqsigfunc handler)
bool is_member_of_role(Oid member, Oid role)
size_t strlcpy(char *dst, const char *src, size_t siz)
int errmsg_internal(const char *fmt,...)
#define pg_memory_barrier()
#define Assert(condition)
void load_file(const char *filename, bool restricted)
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define walrcv_disconnect(conn)
#define HeapTupleGetDatum(tuple)
#define walrcv_send(conn, buffer, nbytes)
static void WalRcvQuickDieHandler(SIGNAL_ARGS)
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
static void XLogWalRcvFlush(bool dying)
static Datum values[MAXATTR]
uint64 GetSystemIdentifier(void)
char sender_host[NI_MAXHOST]
int errmsg(const char *fmt,...)
static void WalRcvDie(int code, Datum arg)
#define walrcv_get_senderinfo(conn, sender_host, sender_port)
#define CStringGetTextDatum(s)
#define CHECK_FOR_INTERRUPTS()
#define TransactionIdIsValid(xid)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
static volatile sig_atomic_t got_SIGTERM
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Datum now(PG_FUNCTION_ARGS)
char slotname[NAMEDATALEN]
#define WL_EXIT_ON_PM_DEATH
char conninfo[MAXCONNINFO]
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
const char * timestamptz_to_str(TimestampTz t)
void WalRcvForceReply(void)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define walrcv_connect(conninfo, logical, appname, err)