106#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
218#define LAG_TRACKER_BUFFER_SIZE 8192
460#define READ_REPLICATION_SLOT_COLS 3
481 if (slot == NULL || !slot->
in_use)
492 slot_contents = *slot;
498 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
499 errmsg(
"cannot use %s with a logical replication slot",
500 "READ_REPLICATION_SLOT"));
512 snprintf(xloc,
sizeof(xloc),
"%X/%X",
588 len = strlen(histfname);
596 errmsg(
"could not open file \"%s\": %m", path)));
599 histfilelen = lseek(
fd, 0, SEEK_END);
603 errmsg(
"could not seek to end of file \"%s\": %m", path)));
604 if (lseek(
fd, 0, SEEK_SET) != 0)
607 errmsg(
"could not seek to beginning of file \"%s\": %m", path)));
611 bytesleft = histfilelen;
612 while (bytesleft > 0)
623 errmsg(
"could not read file \"%s\": %m",
628 errmsg(
"could not read file \"%s\": read %d of %zu",
629 path, nread, (
Size) bytesleft)));
638 errmsg(
"could not close file \"%s\": %m", path)));
665 "incremental backup information",
725 (
errcode(ERRCODE_CONNECTION_FAILURE),
726 errmsg(
"unexpected EOF on client connection with an open transaction")));
741 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
742 errmsg(
"unexpected message type 0x%02X during COPY from stdin",
751 (
errcode(ERRCODE_CONNECTION_FAILURE),
752 errmsg(
"unexpected EOF on client connection with an open transaction")));
772 (
errcode(ERRCODE_QUERY_CANCELED),
773 errmsg(
"COPY from stdin failed: %s",
804 (
errcode(ERRCODE_OUT_OF_MEMORY),
806 errdetail(
"Failed while allocating a WAL reading processor.")));
822 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
823 errmsg(
"cannot use a logical replication slot for physical replication")));
854 List *timeLineHistory;
886 switchpoint < cmd->startpoint)
889 (
errmsg(
"requested starting point %X/%X on timeline %u is not in this server's history",
892 errdetail(
"This server's history forked from timeline %u at %X/%X.",
933 if (FlushPtr < cmd->startpoint)
936 (
errmsg(
"requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
973 char startpos_str[8 + 1 + 8 + 1];
980 snprintf(startpos_str,
sizeof(startpos_str),
"%X/%X",
1037 if (flushptr < targetPagePtr + reqLen)
1061 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
1062 count = XLOG_BLCKSZ;
1064 count = flushptr - targetPagePtr;
1100 bool snapshot_action_given =
false;
1101 bool reserve_wal_given =
false;
1102 bool two_phase_given =
false;
1103 bool failover_given =
false;
1110 if (strcmp(defel->
defname,
"snapshot") == 0)
1116 (
errcode(ERRCODE_SYNTAX_ERROR),
1117 errmsg(
"conflicting or redundant options")));
1120 snapshot_action_given =
true;
1122 if (strcmp(
action,
"export") == 0)
1124 else if (strcmp(
action,
"nothing") == 0)
1126 else if (strcmp(
action,
"use") == 0)
1130 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1131 errmsg(
"unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1134 else if (strcmp(defel->
defname,
"reserve_wal") == 0)
1138 (
errcode(ERRCODE_SYNTAX_ERROR),
1139 errmsg(
"conflicting or redundant options")));
1141 reserve_wal_given =
true;
1144 else if (strcmp(defel->
defname,
"two_phase") == 0)
1148 (
errcode(ERRCODE_SYNTAX_ERROR),
1149 errmsg(
"conflicting or redundant options")));
1150 two_phase_given =
true;
1153 else if (strcmp(defel->
defname,
"failover") == 0)
1157 (
errcode(ERRCODE_SYNTAX_ERROR),
1158 errmsg(
"conflicting or redundant options")));
1159 failover_given =
true;
1173 const char *snapshot_name = NULL;
1176 bool reserve_wal =
false;
1178 bool failover =
false;
1184 bool nulls[4] = {0};
1195 false,
false,
false);
1211 bool need_full_snapshot =
false;
1237 (
errmsg(
"%s must not be called inside a transaction",
1238 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1240 need_full_snapshot =
true;
1247 (
errmsg(
"%s must be called inside a transaction",
1248 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1253 (
errmsg(
"%s must be called in REPEATABLE READ isolation mode transaction",
1254 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1258 (
errmsg(
"%s must be called in a read-only transaction",
1259 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1264 (
errmsg(
"%s must be called before any query",
1265 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1270 (
errmsg(
"%s must not be called in a subtransaction",
1271 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1273 need_full_snapshot =
true;
1321 snprintf(xloc,
sizeof(xloc),
"%X/%X",
1354 if (snapshot_name != NULL)
1387 bool failover_given =
false;
1388 bool two_phase_given =
false;
1395 if (strcmp(defel->defname,
"failover") == 0)
1399 (
errcode(ERRCODE_SYNTAX_ERROR),
1400 errmsg(
"conflicting or redundant options")));
1401 failover_given =
true;
1404 else if (strcmp(defel->defname,
"two_phase") == 0)
1406 if (two_phase_given)
1408 (
errcode(ERRCODE_SYNTAX_ERROR),
1409 errmsg(
"conflicting or redundant options")));
1410 two_phase_given =
true;
1414 elog(
ERROR,
"unrecognized option: %s", defel->defname);
1418 failover_given ? &failover : NULL,
1447 (
errmsg(
"terminating walsender process after promotion")));
1611 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1648 bool pending_writes =
false;
1659#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1687 pending_writes =
true;
1697 if (pending_writes || (!end_xact &&
1747 *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
1769 if (target_lsn > flushed_lsn)
1771 *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
1807 return RecentFlushPtr;
1816 bool wait_for_standby_at_stop =
false;
1849 if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
1868 wait_for_standby_at_stop =
true;
1890 if (!wait_for_standby_at_stop &&
1937 WalSndWait(wakeEvents, sleeptime, wait_event);
1942 return RecentFlushPtr;
1975 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1976 errmsg(
"cannot execute new commands while WAL sender is in stopping mode")));
1990 "Replication command context",
2010 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2011 errmsg(
"cannot execute SQL commands in WAL sender for physical replication")));
2023 (
errcode(ERRCODE_SYNTAX_ERROR),
2044 (
errmsg(
"received replication command: %s", cmd_string)));
2051 (
errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2052 errmsg(
"current transaction is aborted, "
2053 "commands ignored until end of transaction block")));
2065 switch (cmd_node->
type)
2067 case T_IdentifySystemCmd:
2068 cmdtag =
"IDENTIFY_SYSTEM";
2074 case T_ReadReplicationSlotCmd:
2075 cmdtag =
"READ_REPLICATION_SLOT";
2081 case T_BaseBackupCmd:
2082 cmdtag =
"BASE_BACKUP";
2089 case T_CreateReplicationSlotCmd:
2090 cmdtag =
"CREATE_REPLICATION_SLOT";
2096 case T_DropReplicationSlotCmd:
2097 cmdtag =
"DROP_REPLICATION_SLOT";
2103 case T_AlterReplicationSlotCmd:
2104 cmdtag =
"ALTER_REPLICATION_SLOT";
2110 case T_StartReplicationCmd:
2114 cmdtag =
"START_REPLICATION";
2130 case T_TimeLineHistoryCmd:
2131 cmdtag =
"TIMELINE_HISTORY";
2138 case T_VariableShowStmt:
2154 case T_UploadManifestCmd:
2155 cmdtag =
"UPLOAD_MANIFEST";
2163 elog(
ERROR,
"unrecognized replication command node tag: %u",
2188 unsigned char firstchar;
2191 bool received =
false;
2208 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2209 errmsg(
"unexpected EOF on standby connection")));
2231 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2232 errmsg(
"invalid standby message type \"%c\"",
2243 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2244 errmsg(
"unexpected EOF on standby connection")));
2320 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2321 errmsg(
"unexpected message type \"%c\"", msgtype)));
2332 bool changed =
false;
2368 bool replyRequested;
2376 static bool fullyAppliedLastTime =
false;
2392 elog(
DEBUG2,
"write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2396 replyRequested ?
" (reply requested)" :
"",
2399 pfree(replyTimeStr);
2416 clearLagTimes =
false;
2419 if (fullyAppliedLastTime)
2420 clearLagTimes =
true;
2421 fullyAppliedLastTime =
true;
2424 fullyAppliedLastTime =
false;
2438 walsnd->
write = writePtr;
2439 walsnd->
flush = flushPtr;
2440 walsnd->
apply = applyPtr;
2441 if (writeLag != -1 || clearLagTimes)
2443 if (flushLag != -1 || clearLagTimes)
2445 if (applyLag != -1 || clearLagTimes)
2470 bool changed =
false;
2529 if (
epoch != nextEpoch)
2534 if (
epoch + 1 != nextEpoch)
2553 uint32 feedbackCatalogEpoch;
2574 elog(
DEBUG2,
"hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2577 feedbackCatalogXmin,
2578 feedbackCatalogEpoch,
2581 pfree(replyTimeStr);
2674 long sleeptime = 10000;
2735 (
errmsg(
"terminating walsender process due to replication timeout")));
2862 WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2890 if (walsnd->
pid != 0)
2996 if (nextSegNo == endSegNo)
3002 if (
state->seg.ws_file >= 0)
3009 if (errno == ENOENT)
3012 int save_errno = errno;
3018 errmsg(
"requested WAL segment %s has already been removed",
3024 errmsg(
"could not open file \"%s\": %m",
3090 bool becameHistoric =
false;
3099 becameHistoric =
true;
3108 becameHistoric =
true;
3198 elog(
DEBUG1,
"walsender reached end of timeline at %X/%X (sent up to %X/%X)",
3228 if (SendRqstPtr <= endptr)
3230 endptr = SendRqstPtr;
3239 endptr -= (endptr % XLOG_BLCKSZ);
3243 nbytes = endptr - startptr;
3337 char activitymsg[50];
3339 snprintf(activitymsg,
sizeof(activitymsg),
"streaming %X/%X",
3374 elog(
ERROR,
"could not find record while sending logically-decoded data: %s",
3500 if (
receiveTLI == replayTLI && receivePtr > replayPtr)
3501 result = receivePtr;
3519 if (walsnd->
pid == 0)
3694 if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
3749 bool all_stopped =
true;
3757 if (walsnd->
pid == 0)
3765 all_stopped =
false;
3826 result->
time = offset;
3838#define PG_STAT_GET_WAL_SENDERS_COLS 12
3866 bool is_sync_standby;
3873 if (walsnd->
pid == 0)
3882 flush = walsnd->
flush;
3883 apply = walsnd->
apply;
3896 is_sync_standby =
false;
3897 for (
j = 0;
j < num_standbys;
j++)
3899 if (sync_standbys[
j].walsnd_index ==
i &&
3900 sync_standbys[
j].pid == pid)
3902 is_sync_standby =
true;
3975 else if (is_sync_standby)
4009 elog(
DEBUG2,
"sending replication keepalive");
4092 buffer_full =
false;
4206 (double) (lsn - prev.
lsn) / (double) (
next.lsn - prev.
lsn);
4210 ((
double) prev.
time + (
next.time - prev.
time) * fraction);
bool has_privs_of_role(Oid member, Oid role)
List * readTimeLineHistory(TimeLineID targetTLI)
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
const char * timestamptz_to_str(TimestampTz t)
Datum now(PG_FUNCTION_ARGS)
void pgstat_report_activity(BackendState state, const char *cmd_str)
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define Assert(condition)
#define pg_attribute_noreturn()
#define MemSet(start, val, len)
#define OidIsValid(objectId)
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
char * get_database_name(Oid dbid)
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
char * defGetString(DefElem *def)
bool defGetBoolean(DefElem *def)
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
DestReceiver * CreateDestReceiver(CommandDest dest)
void EndReplicationCommand(const char *commandTag)
int errmsg_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
bool message_level_is_interesting(int elevel)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
const TupleTableSlotOps TTSOpsVirtual
void end_tup_output(TupOutputState *tstate)
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
int CloseTransientFile(int fd)
int BasicOpenFile(const char *fileName, int fileFlags)
int OpenTransientFile(const char *fileName, int fileFlags)
Datum Int64GetDatum(int64 X)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
void ProcessConfigFile(GucContext context)
void GetPGVariable(const char *name, DestReceiver *dest)
static void dlist_init(dlist_head *head)
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
void SetLatch(Latch *latch)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
void ResetLatch(Latch *latch)
#define WL_SOCKET_READABLE
#define WL_POSTMASTER_DEATH
#define WL_SOCKET_WRITEABLE
#define PQ_SMALL_MESSAGE_LIMIT
#define pq_flush_if_writable()
#define pq_is_send_pending()
#define PQ_LARGE_MESSAGE_LIMIT
#define pq_putmessage_noblock(msgtype, s, len)
#define FeBeWaitSetSocketPos
void list_free_deep(List *list)
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
void FreeDecodingContext(LogicalDecodingContext *ctx)
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
void CheckLogicalDecodingRequirements(void)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockReleaseAll(void)
char * MemoryContextStrdup(MemoryContext context, const char *string)
void * MemoryContextAllocZero(MemoryContext context, Size size)
char * pstrdup(const char *in)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
void pfree(void *pointer)
MemoryContext TopMemoryContext
MemoryContext CurrentMemoryContext
MemoryContext CacheMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define HOLD_CANCEL_INTERRUPTS()
#define RESUME_CANCEL_INTERRUPTS()
#define CHECK_FOR_INTERRUPTS()
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define ERRCODE_DATA_CORRUPTED
#define foreach_ptr(type, var, lst)
static Datum LSNGetDatum(XLogRecPtr X)
void SendPostmasterSignal(PMSignalReason reason)
void MarkPostmasterChildWalSender(void)
@ PMSIGNAL_ADVANCE_STATE_MACHINE
void StatementCancelHandler(SIGNAL_ARGS)
CommandDest whereToSendOutput
const char * debug_query_string
static Datum Int32GetDatum(int32 X)
int pq_getbyte_if_available(unsigned char *c)
int pq_getmessage(StringInfo s, int maxlen)
WaitEventSet * FeBeWaitSet
void pq_startmsgread(void)
static int fd(const char *x, int i)
#define PROC_AFFECTS_ALL_HORIZONS
#define INVALID_PROC_NUMBER
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
@ PROCSIG_WALSND_INIT_STOPPING
#define PqMsg_CopyInResponse
#define PqMsg_CopyBothResponse
bool update_process_title
static void set_ps_display(const char *activity)
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
void replication_scanner_finish(yyscan_t yyscanner)
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
@ REPLICATION_KIND_PHYSICAL
@ REPLICATION_KIND_LOGICAL
void ReleaseAuxProcessResources(bool isCommit)
ResourceOwner CurrentResourceOwner
void CreateAuxProcessResourceOwner(void)
ResourceOwner AuxProcessResourceOwner
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
void pg_usleep(long microsec)
static pg_noinline void Size size
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
void ReplicationSlotMarkDirty(void)
void ReplicationSlotReserveWal(void)
void ReplicationSlotAcquire(const char *name, bool nowait)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
void ReplicationSlotPersist(void)
ReplicationSlot * MyReplicationSlot
void ReplicationSlotDrop(const char *name, bool nowait)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
void ReplicationSlotSave(void)
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
void ReplicationSlotRelease(void)
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
void ReplicationSlotsComputeRequiredLSN(void)
void ReplicationSlotCleanup(bool synced_only)
#define SlotIsPhysical(slot)
#define SlotIsLogical(slot)
bool IsSyncingReplicationSlots(void)
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
const char * SnapBuildExportSnapshot(SnapBuild *builder)
void SnapBuildClearExportedSnapshot(void)
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void resetStringInfo(StringInfo str)
void enlargeStringInfo(StringInfo str, int needed)
void initStringInfo(StringInfo str)
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
int read_heads[NUM_SYNC_REP_WAIT_MODE]
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
struct SnapBuild * snapshot_builder
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
TransactionId effective_catalog_xmin
TransactionId effective_xmin
ReplicationSlotPersistentData data
Tuplestorestate * setResult
ConditionVariable wal_confirm_rcv_cv
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
int sync_standby_priority
void SyncRepInitConfig(void)
SyncRepConfigData * SyncRepConfig
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
void SyncRepReleaseWaiters(void)
#define SYNC_REP_PRIORITY
#define NUM_SYNC_REP_WAIT_MODE
#define SyncRepRequested()
#define SYNC_REP_WAIT_WRITE
#define SYNC_REP_WAIT_FLUSH
#define SYNC_REP_WAIT_APPLY
void InitializeTimeouts(void)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define EpochFromFullTransactionId(x)
#define XidFromFullTransactionId(x)
#define TransactionIdIsNormal(xid)
TupleDesc CreateTemplateTupleDesc(int natts)
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
static Datum TimestampTzGetDatum(TimestampTz X)
static Datum IntervalPGetDatum(const Interval *X)
#define TimestampTzPlusMilliseconds(tz, ms)
FullTransactionId ReadNextFullTransactionId(void)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
static void ProcessPendingWrites(void)
static XLogRecPtr sentPtr
#define READ_REPLICATION_SLOT_COLS
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
static void WalSndLastCycleHandler(SIGNAL_ARGS)
static volatile sig_atomic_t got_SIGUSR2
static void WalSndCheckTimeOut(void)
static void XLogSendPhysical(void)
static void ProcessRepliesIfAny(void)
static bool waiting_for_ping_response
void PhysicalWakeupLogicalWalSnd(void)
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
void WalSndErrorCleanup(void)
static void InitWalSenderSlot(void)
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
static void ProcessStandbyHSFeedbackMessage(void)
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
static StringInfoData tmpbuf
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
static LagTracker * lag_tracker
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
static void IdentifySystem(void)
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
static StringInfoData reply_message
static void WalSndKeepaliveIfNecessary(void)
void WalSndSetState(WalSndState state)
static StringInfoData output_message
static TimeLineID sendTimeLine
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
static void WalSndLoop(WalSndSendDataCallback send_data)
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
void WalSndWakeup(bool physical, bool logical)
static LogicalDecodingContext * logical_decoding_ctx
static void XLogSendLogical(void)
void WalSndShmemInit(void)
static volatile sig_atomic_t replication_active
static void UploadManifest(void)
static volatile sig_atomic_t got_STOPPING
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
bool exec_replication_command(const char *cmd_string)
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
#define PG_STAT_GET_WAL_SENDERS_COLS
void(* WalSndSendDataCallback)(void)
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
void WalSndInitStopping(void)
void WalSndWaitStopping(void)
static bool sendTimeLineIsHistoric
void WalSndRqstFileReload(void)
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
bool am_cascading_walsender
static TimestampTz last_processing
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Size WalSndShmemSize(void)
bool log_replication_commands
void HandleWalSndInitStopping(void)
static TimeLineID sendTimeLineNextTLI
static MemoryContext uploaded_manifest_mcxt
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
static void ProcessStandbyReplyMessage(void)
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
static bool streamingDoneSending
static void StartLogicalReplication(StartReplicationCmd *cmd)
static IncrementalBackupInfo * uploaded_manifest
static void WalSndShutdown(void)
static void WalSndKill(int code, Datum arg)
static Interval * offset_to_interval(TimeOffset offset)
static bool WalSndCaughtUp
static XLogRecPtr sendTimeLineValidUpto
static void ProcessStandbyMessage(void)
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
#define LAG_TRACKER_BUFFER_SIZE
static const char * WalSndGetStateString(WalSndState state)
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
static long WalSndComputeSleeptime(TimestampTz now)
static bool streamingDoneReceiving
static void StartReplication(StartReplicationCmd *cmd)
static void WalSndDone(WalSndSendDataCallback send_data)
static XLogReaderState * xlogreader
static TimestampTz last_reply_timestamp
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
WalSndCtlData * WalSndCtl
PGDLLIMPORT Node * replication_parse_result
int replication_yyparse(yyscan_t yyscanner)
static const unsigned __int64 epoch
bool IsTransactionOrTransactionBlock(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
void StartTransactionCommand(void)
bool IsAbortedTransactionBlockState(void)
bool IsSubTransaction(void)
bool IsTransactionBlock(void)
void CommitTransactionCommand(void)
#define XACT_REPEATABLE_READ
uint64 GetSystemIdentifier(void)
bool RecoveryInProgress(void)
TimeLineID GetWALInsertionTimeLine(void)
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
bool XLogBackgroundFlush(void)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
static TimeLineID receiveTLI
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void wal_segment_close(XLogReaderState *state)
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
void WALReadRaiseError(WALReadError *errinfo)