107 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
208 #define LAG_TRACKER_BUFFER_SIZE 8192
480 #define READ_REPLICATION_SLOT_COLS 3
501 if (slot == NULL || !slot->
in_use)
512 slot_contents = *slot;
518 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
519 errmsg(
"cannot use %s with a logical replication slot",
520 "READ_REPLICATION_SLOT"));
532 snprintf(xloc,
sizeof(xloc),
"%X/%X",
608 len = strlen(histfname);
616 errmsg(
"could not open file \"%s\": %m", path)));
619 histfilelen = lseek(
fd, 0, SEEK_END);
623 errmsg(
"could not seek to end of file \"%s\": %m", path)));
624 if (lseek(
fd, 0, SEEK_SET) != 0)
627 errmsg(
"could not seek to beginning of file \"%s\": %m", path)));
631 bytesleft = histfilelen;
632 while (bytesleft > 0)
643 errmsg(
"could not read file \"%s\": %m",
648 errmsg(
"could not read file \"%s\": read %d of %zu",
649 path, nread, (
Size) bytesleft)));
658 errmsg(
"could not close file \"%s\": %m", path)));
685 (
errcode(ERRCODE_OUT_OF_MEMORY),
687 errdetail(
"Failed while allocating a WAL reading processor.")));
703 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
704 errmsg(
"cannot use a logical replication slot for physical replication")));
735 List *timeLineHistory;
767 switchpoint < cmd->startpoint)
770 (
errmsg(
"requested starting point %X/%X on timeline %u is not in this server's history",
773 errdetail(
"This server's history forked from timeline %u at %X/%X.",
814 if (FlushPtr < cmd->startpoint)
817 (
errmsg(
"requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
854 char startpos_str[8 + 1 + 8 + 1];
861 snprintf(startpos_str,
sizeof(startpos_str),
"%X/%X",
938 if (flushptr < targetPagePtr + reqLen)
941 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
944 count = flushptr - targetPagePtr;
980 bool snapshot_action_given =
false;
981 bool reserve_wal_given =
false;
982 bool two_phase_given =
false;
989 if (strcmp(defel->
defname,
"snapshot") == 0)
995 (
errcode(ERRCODE_SYNTAX_ERROR),
996 errmsg(
"conflicting or redundant options")));
999 snapshot_action_given =
true;
1001 if (strcmp(
action,
"export") == 0)
1003 else if (strcmp(
action,
"nothing") == 0)
1005 else if (strcmp(
action,
"use") == 0)
1009 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1010 errmsg(
"unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1013 else if (strcmp(defel->
defname,
"reserve_wal") == 0)
1017 (
errcode(ERRCODE_SYNTAX_ERROR),
1018 errmsg(
"conflicting or redundant options")));
1020 reserve_wal_given =
true;
1023 else if (strcmp(defel->
defname,
"two_phase") == 0)
1027 (
errcode(ERRCODE_SYNTAX_ERROR),
1028 errmsg(
"conflicting or redundant options")));
1029 two_phase_given =
true;
1043 const char *snapshot_name = NULL;
1046 bool reserve_wal =
false;
1053 bool nulls[4] = {0};
1084 bool need_full_snapshot =
false;
1095 (
errmsg(
"%s must not be called inside a transaction",
1096 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1098 need_full_snapshot =
true;
1105 (
errmsg(
"%s must be called inside a transaction",
1106 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1111 (
errmsg(
"%s must be called in REPEATABLE READ isolation mode transaction",
1112 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1116 (
errmsg(
"%s must be called in a read-only transaction",
1117 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1122 (
errmsg(
"%s must be called before any query",
1123 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1128 (
errmsg(
"%s must not be called in a subtransaction",
1129 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1131 need_full_snapshot =
true;
1189 snprintf(xloc,
sizeof(xloc),
"%X/%X",
1222 if (snapshot_name != NULL)
1274 (
errmsg(
"terminating walsender process after promotion")));
1387 memcpy(&ctx->
out->
data[1 +
sizeof(int64) +
sizeof(int64)],
1438 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1475 bool pending_writes =
false;
1486 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1514 pending_writes =
true;
1524 if (pending_writes || (!end_xact &&
1549 loc <= RecentFlushPtr)
1550 return RecentFlushPtr;
1616 if (loc <= RecentFlushPtr)
1657 WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL);
1662 return RecentFlushPtr;
1694 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1695 errmsg(
"cannot execute new commands while WAL sender is in stopping mode")));
1709 "Replication command context",
1729 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1730 errmsg(
"cannot execute SQL commands in WAL sender for physical replication")));
1742 (
errcode(ERRCODE_SYNTAX_ERROR),
1763 (
errmsg(
"received replication command: %s", cmd_string)));
1770 (
errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1771 errmsg(
"current transaction is aborted, "
1772 "commands ignored until end of transaction block")));
1784 switch (cmd_node->
type)
1786 case T_IdentifySystemCmd:
1787 cmdtag =
"IDENTIFY_SYSTEM";
1793 case T_ReadReplicationSlotCmd:
1794 cmdtag =
"READ_REPLICATION_SLOT";
1800 case T_BaseBackupCmd:
1801 cmdtag =
"BASE_BACKUP";
1808 case T_CreateReplicationSlotCmd:
1809 cmdtag =
"CREATE_REPLICATION_SLOT";
1815 case T_DropReplicationSlotCmd:
1816 cmdtag =
"DROP_REPLICATION_SLOT";
1822 case T_StartReplicationCmd:
1826 cmdtag =
"START_REPLICATION";
1842 case T_TimeLineHistoryCmd:
1843 cmdtag =
"TIMELINE_HISTORY";
1850 case T_VariableShowStmt:
1867 elog(
ERROR,
"unrecognized replication command node tag: %u",
1892 unsigned char firstchar;
1895 bool received =
false;
1912 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1913 errmsg(
"unexpected EOF on standby connection")));
1935 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1936 errmsg(
"invalid standby message type \"%c\"",
1947 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1948 errmsg(
"unexpected EOF on standby connection")));
2024 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2025 errmsg(
"unexpected message type \"%c\"", msgtype)));
2036 bool changed =
false;
2071 bool replyRequested;
2079 static bool fullyAppliedLastTime =
false;
2095 elog(
DEBUG2,
"write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2099 replyRequested ?
" (reply requested)" :
"",
2102 pfree(replyTimeStr);
2119 clearLagTimes =
false;
2122 if (fullyAppliedLastTime)
2123 clearLagTimes =
true;
2124 fullyAppliedLastTime =
true;
2127 fullyAppliedLastTime =
false;
2141 walsnd->
write = writePtr;
2142 walsnd->
flush = flushPtr;
2143 walsnd->
apply = applyPtr;
2144 if (writeLag != -1 || clearLagTimes)
2146 if (flushLag != -1 || clearLagTimes)
2148 if (applyLag != -1 || clearLagTimes)
2173 bool changed =
false;
2232 if (
epoch != nextEpoch)
2237 if (
epoch + 1 != nextEpoch)
2256 uint32 feedbackCatalogEpoch;
2277 elog(
DEBUG2,
"hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2280 feedbackCatalogXmin,
2281 feedbackCatalogEpoch,
2284 pfree(replyTimeStr);
2377 long sleeptime = 10000;
2438 (
errmsg(
"terminating walsender process due to replication timeout")));
2565 WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2593 if (walsnd->
pid != 0)
2659 walsnd->
latch = NULL;
2702 if (nextSegNo == endSegNo)
2708 if (
state->seg.ws_file >= 0)
2715 if (errno == ENOENT)
2718 int save_errno = errno;
2724 errmsg(
"requested WAL segment %s has already been removed",
2730 errmsg(
"could not open file \"%s\": %m",
2795 bool becameHistoric =
false;
2804 becameHistoric =
true;
2813 becameHistoric =
true;
2903 elog(
DEBUG1,
"walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2933 if (SendRqstPtr <= endptr)
2935 endptr = SendRqstPtr;
2944 endptr -= (endptr % XLOG_BLCKSZ);
2948 nbytes = endptr - startptr;
3033 char activitymsg[50];
3035 snprintf(activitymsg,
sizeof(activitymsg),
"streaming %X/%X",
3070 elog(
ERROR,
"could not find record while sending logically-decoded data: %s",
3191 if (
receiveTLI == replayTLI && receivePtr > replayPtr)
3192 result = receivePtr;
3210 if (walsnd->
pid == 0)
3248 int save_errno = errno;
3437 bool all_stopped =
true;
3445 if (walsnd->
pid == 0)
3453 all_stopped =
false;
3514 result->
time = offset;
3526 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3554 bool is_sync_standby;
3561 if (walsnd->
pid == 0)
3570 flush = walsnd->
flush;
3571 apply = walsnd->
apply;
3584 is_sync_standby =
false;
3585 for (
j = 0;
j < num_standbys;
j++)
3587 if (sync_standbys[
j].walsnd_index ==
i &&
3588 sync_standbys[
j].pid == pid)
3590 is_sync_standby =
true;
3663 else if (is_sync_standby)
3697 elog(
DEBUG2,
"sending replication keepalive");
3780 buffer_full =
false;
3894 (double) (lsn - prev.
lsn) / (double) (
next.lsn - prev.
lsn);
3898 ((
double) prev.
time + (
next.time - prev.
time) * fraction);
bool has_privs_of_role(Oid member, Oid role)
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
List * readTimeLineHistory(TimeLineID targetTLI)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
const char * timestamptz_to_str(TimestampTz t)
void pgstat_report_activity(BackendState state, const char *cmd_str)
void SendBaseBackup(BaseBackupCmd *cmd)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define pg_attribute_noreturn()
#define MemSet(start, val, len)
#define OidIsValid(objectId)
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
char * get_database_name(Oid dbid)
elog(ERROR, "%s: %s", p2, msg)
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
bool defGetBoolean(DefElem *def)
char * defGetString(DefElem *def)
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
void EndReplicationCommand(const char *commandTag)
DestReceiver * CreateDestReceiver(CommandDest dest)
int errmsg_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
bool message_level_is_interesting(int elevel)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
const TupleTableSlotOps TTSOpsVirtual
void end_tup_output(TupOutputState *tstate)
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
int CloseTransientFile(int fd)
int BasicOpenFile(const char *fileName, int fileFlags)
int OpenTransientFile(const char *fileName, int fileFlags)
Datum Int64GetDatum(int64 X)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
void ProcessConfigFile(GucContext context)
void GetPGVariable(const char *name, DestReceiver *dest)
static void dlist_init(dlist_head *head)
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
void SetLatch(Latch *latch)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
void ResetLatch(Latch *latch)
#define WL_SOCKET_READABLE
#define WL_POSTMASTER_DEATH
#define WL_SOCKET_WRITEABLE
#define PQ_SMALL_MESSAGE_LIMIT
#define pq_flush_if_writable()
#define pq_is_send_pending()
#define PQ_LARGE_MESSAGE_LIMIT
#define pq_putmessage_noblock(msgtype, s, len)
#define FeBeWaitSetSocketPos
Assert(fmt[strlen(fmt) - 1] !='\n')
void list_free_deep(List *list)
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
void FreeDecodingContext(LogicalDecodingContext *ctx)
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
void CheckLogicalDecodingRequirements(void)
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockReleaseAll(void)
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext TopMemoryContext
void * MemoryContextAllocZero(MemoryContext context, Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define ERRCODE_DATA_CORRUPTED
static Datum LSNGetDatum(XLogRecPtr X)
void SendPostmasterSignal(PMSignalReason reason)
void MarkPostmasterChildWalSender(void)
@ PMSIGNAL_ADVANCE_STATE_MACHINE
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
CommandDest whereToSendOutput
const char * debug_query_string
static Datum Int32GetDatum(int32 X)
int pq_getbyte_if_available(unsigned char *c)
int pq_getmessage(StringInfo s, int maxlen)
WaitEventSet * FeBeWaitSet
void pq_startmsgread(void)
static int fd(const char *x, int i)
#define PROC_AFFECTS_ALL_HORIZONS
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
@ PROCSIG_WALSND_INIT_STOPPING
#define PqMsg_CopyBothResponse
bool update_process_title
static void set_ps_display(const char *activity)
@ REPLICATION_KIND_PHYSICAL
@ REPLICATION_KIND_LOGICAL
ResourceOwner CurrentResourceOwner
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
void ResourceOwnerDelete(ResourceOwner owner)
@ RESOURCE_RELEASE_BEFORE_LOCKS
@ RESOURCE_RELEASE_AFTER_LOCKS
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Size mul_size(Size s1, Size s2)
void pg_usleep(long microsec)
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
void ReplicationSlotCleanup(void)
void ReplicationSlotMarkDirty(void)
void ReplicationSlotReserveWal(void)
void ReplicationSlotAcquire(const char *name, bool nowait)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
void ReplicationSlotPersist(void)
ReplicationSlot * MyReplicationSlot
void ReplicationSlotDrop(const char *name, bool nowait)
void ReplicationSlotSave(void)
void ReplicationSlotRelease(void)
void ReplicationSlotsComputeRequiredLSN(void)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
#define SlotIsLogical(slot)
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
void SnapBuildClearExportedSnapshot(void)
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void resetStringInfo(StringInfo str)
void enlargeStringInfo(StringInfo str, int needed)
void initStringInfo(StringInfo str)
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
int read_heads[NUM_SYNC_REP_WAIT_MODE]
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
struct SnapBuild * snapshot_builder
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
TransactionId effective_catalog_xmin
TransactionId effective_xmin
ReplicationSlotPersistentData data
Tuplestorestate * setResult
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
int sync_standby_priority
void SyncRepInitConfig(void)
SyncRepConfigData * SyncRepConfig
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
void SyncRepReleaseWaiters(void)
#define SYNC_REP_PRIORITY
#define NUM_SYNC_REP_WAIT_MODE
#define SyncRepRequested()
#define SYNC_REP_WAIT_WRITE
#define SYNC_REP_WAIT_FLUSH
#define SYNC_REP_WAIT_APPLY
void InitializeTimeouts(void)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define EpochFromFullTransactionId(x)
#define XidFromFullTransactionId(x)
#define TransactionIdIsNormal(xid)
TupleDesc CreateTemplateTupleDesc(int natts)
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
static Datum TimestampTzGetDatum(TimestampTz X)
static Datum IntervalPGetDatum(const Interval *X)
#define TimestampTzPlusMilliseconds(tz, ms)
FullTransactionId ReadNextFullTransactionId(void)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
static void ProcessPendingWrites(void)
static XLogRecPtr sentPtr
#define READ_REPLICATION_SLOT_COLS
static Interval * offset_to_interval(TimeOffset offset)
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
static void WalSndLastCycleHandler(SIGNAL_ARGS)
static volatile sig_atomic_t got_SIGUSR2
static void WalSndCheckTimeOut(void)
static void XLogSendPhysical(void)
static void ProcessRepliesIfAny(void)
static bool waiting_for_ping_response
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
void WalSndErrorCleanup(void)
static void InitWalSenderSlot(void)
static void ProcessStandbyHSFeedbackMessage(void)
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
static StringInfoData tmpbuf
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
static LagTracker * lag_tracker
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
static void IdentifySystem(void)
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
static StringInfoData reply_message
static void WalSndKeepaliveIfNecessary(void)
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
void WalSndSetState(WalSndState state)
static StringInfoData output_message
static TimeLineID sendTimeLine
static void WalSndLoop(WalSndSendDataCallback send_data)
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
void WalSndWakeup(bool physical, bool logical)
static LogicalDecodingContext * logical_decoding_ctx
static void XLogSendLogical(void)
void WalSndShmemInit(void)
static volatile sig_atomic_t replication_active
static volatile sig_atomic_t got_STOPPING
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
bool exec_replication_command(const char *cmd_string)
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
#define PG_STAT_GET_WAL_SENDERS_COLS
void(* WalSndSendDataCallback)(void)
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
void WalSndInitStopping(void)
void WalSndWaitStopping(void)
static bool sendTimeLineIsHistoric
void WalSndResourceCleanup(bool isCommit)
void WalSndRqstFileReload(void)
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
bool am_cascading_walsender
static TimestampTz last_processing
Size WalSndShmemSize(void)
bool log_replication_commands
void HandleWalSndInitStopping(void)
static TimeLineID sendTimeLineNextTLI
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
static void ProcessStandbyReplyMessage(void)
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
static bool streamingDoneSending
static void StartLogicalReplication(StartReplicationCmd *cmd)
static void WalSndShutdown(void)
static void WalSndKill(int code, Datum arg)
static bool WalSndCaughtUp
static XLogRecPtr sendTimeLineValidUpto
static void ProcessStandbyMessage(void)
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
#define LAG_TRACKER_BUFFER_SIZE
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
static long WalSndComputeSleeptime(TimestampTz now)
static bool streamingDoneReceiving
static void StartReplication(StartReplicationCmd *cmd)
static void WalSndDone(WalSndSendDataCallback send_data)
static const char * WalSndGetStateString(WalSndState state)
static XLogReaderState * xlogreader
static TimestampTz last_reply_timestamp
WalSndCtlData * WalSndCtl
PGDLLIMPORT Node * replication_parse_result
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
static const unsigned __int64 epoch
bool IsTransactionOrTransactionBlock(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
void StartTransactionCommand(void)
bool IsAbortedTransactionBlockState(void)
bool IsSubTransaction(void)
bool IsTransactionBlock(void)
void CommitTransactionCommand(void)
#define XACT_REPEATABLE_READ
uint64 GetSystemIdentifier(void)
bool RecoveryInProgress(void)
TimeLineID GetWALInsertionTimeLine(void)
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
bool XLogBackgroundFlush(void)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
static TimeLineID receiveTLI
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void wal_segment_close(XLogReaderState *state)
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
void WALReadRaiseError(WALReadError *errinfo)