107 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
208 #define LAG_TRACKER_BUFFER_SIZE 8192
480 #define READ_REPLICATION_SLOT_COLS 3
501 if (slot == NULL || !slot->
in_use)
512 slot_contents = *slot;
518 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
519 errmsg(
"cannot use %s with a logical replication slot",
520 "READ_REPLICATION_SLOT"));
532 snprintf(xloc,
sizeof(xloc),
"%X/%X",
608 len = strlen(histfname);
616 errmsg(
"could not open file \"%s\": %m", path)));
619 histfilelen = lseek(
fd, 0, SEEK_END);
623 errmsg(
"could not seek to end of file \"%s\": %m", path)));
624 if (lseek(
fd, 0, SEEK_SET) != 0)
627 errmsg(
"could not seek to beginning of file \"%s\": %m", path)));
631 bytesleft = histfilelen;
632 while (bytesleft > 0)
643 errmsg(
"could not read file \"%s\": %m",
648 errmsg(
"could not read file \"%s\": read %d of %zu",
649 path, nread, (
Size) bytesleft)));
658 errmsg(
"could not close file \"%s\": %m", path)));
685 (
errcode(ERRCODE_OUT_OF_MEMORY),
687 errdetail(
"Failed while allocating a WAL reading processor.")));
703 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
704 errmsg(
"cannot use a logical replication slot for physical replication")));
735 List *timeLineHistory;
767 switchpoint < cmd->startpoint)
770 (
errmsg(
"requested starting point %X/%X on timeline %u is not in this server's history",
773 errdetail(
"This server's history forked from timeline %u at %X/%X.",
814 if (FlushPtr < cmd->startpoint)
817 (
errmsg(
"requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
854 char startpos_str[8 + 1 + 8 + 1];
861 snprintf(startpos_str,
sizeof(startpos_str),
"%X/%X",
927 if (flushptr < targetPagePtr + reqLen)
930 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
933 count = flushptr - targetPagePtr;
969 bool snapshot_action_given =
false;
970 bool reserve_wal_given =
false;
971 bool two_phase_given =
false;
978 if (strcmp(defel->
defname,
"snapshot") == 0)
984 (
errcode(ERRCODE_SYNTAX_ERROR),
985 errmsg(
"conflicting or redundant options")));
988 snapshot_action_given =
true;
990 if (strcmp(
action,
"export") == 0)
992 else if (strcmp(
action,
"nothing") == 0)
994 else if (strcmp(
action,
"use") == 0)
998 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
999 errmsg(
"unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1002 else if (strcmp(defel->
defname,
"reserve_wal") == 0)
1006 (
errcode(ERRCODE_SYNTAX_ERROR),
1007 errmsg(
"conflicting or redundant options")));
1009 reserve_wal_given =
true;
1012 else if (strcmp(defel->
defname,
"two_phase") == 0)
1016 (
errcode(ERRCODE_SYNTAX_ERROR),
1017 errmsg(
"conflicting or redundant options")));
1018 two_phase_given =
true;
1032 const char *snapshot_name = NULL;
1035 bool reserve_wal =
false;
1042 bool nulls[4] = {0};
1073 bool need_full_snapshot =
false;
1084 (
errmsg(
"%s must not be called inside a transaction",
1085 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1087 need_full_snapshot =
true;
1094 (
errmsg(
"%s must be called inside a transaction",
1095 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1100 (
errmsg(
"%s must be called in REPEATABLE READ isolation mode transaction",
1101 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1105 (
errmsg(
"%s must be called in a read only transaction",
1106 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1111 (
errmsg(
"%s must be called before any query",
1112 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1117 (
errmsg(
"%s must not be called in a subtransaction",
1118 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1120 need_full_snapshot =
true;
1178 snprintf(xloc,
sizeof(xloc),
"%X/%X",
1212 if (snapshot_name != NULL)
1258 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1259 errmsg(
"cannot read from logical replication slot \"%s\"",
1261 errdetail(
"This slot has been invalidated because it exceeded the maximum reserved size.")));
1271 (
errmsg(
"terminating walsender process after promotion")));
1384 memcpy(&ctx->
out->
data[1 +
sizeof(int64) +
sizeof(int64)],
1472 bool pending_writes =
false;
1483 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1511 pending_writes =
true;
1521 if (pending_writes || (!end_xact &&
1546 loc <= RecentFlushPtr)
1547 return RecentFlushPtr;
1613 if (loc <= RecentFlushPtr)
1659 return RecentFlushPtr;
1691 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1692 errmsg(
"cannot execute new commands while WAL sender is in stopping mode")));
1706 "Replication command context",
1726 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1727 errmsg(
"cannot execute SQL commands in WAL sender for physical replication")));
1739 (
errcode(ERRCODE_SYNTAX_ERROR),
1760 (
errmsg(
"received replication command: %s", cmd_string)));
1767 (
errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1768 errmsg(
"current transaction is aborted, "
1769 "commands ignored until end of transaction block")));
1781 switch (cmd_node->
type)
1783 case T_IdentifySystemCmd:
1784 cmdtag =
"IDENTIFY_SYSTEM";
1790 case T_ReadReplicationSlotCmd:
1791 cmdtag =
"READ_REPLICATION_SLOT";
1797 case T_BaseBackupCmd:
1798 cmdtag =
"BASE_BACKUP";
1805 case T_CreateReplicationSlotCmd:
1806 cmdtag =
"CREATE_REPLICATION_SLOT";
1812 case T_DropReplicationSlotCmd:
1813 cmdtag =
"DROP_REPLICATION_SLOT";
1819 case T_StartReplicationCmd:
1823 cmdtag =
"START_REPLICATION";
1839 case T_TimeLineHistoryCmd:
1840 cmdtag =
"TIMELINE_HISTORY";
1847 case T_VariableShowStmt:
1864 elog(
ERROR,
"unrecognized replication command node tag: %u",
1889 unsigned char firstchar;
1892 bool received =
false;
1909 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1910 errmsg(
"unexpected EOF on standby connection")));
1932 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1933 errmsg(
"invalid standby message type \"%c\"",
1944 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1945 errmsg(
"unexpected EOF on standby connection")));
2021 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2022 errmsg(
"unexpected message type \"%c\"", msgtype)));
2033 bool changed =
false;
2068 bool replyRequested;
2076 static bool fullyAppliedLastTime =
false;
2092 elog(
DEBUG2,
"write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2096 replyRequested ?
" (reply requested)" :
"",
2099 pfree(replyTimeStr);
2116 clearLagTimes =
false;
2119 if (fullyAppliedLastTime)
2120 clearLagTimes =
true;
2121 fullyAppliedLastTime =
true;
2124 fullyAppliedLastTime =
false;
2138 walsnd->
write = writePtr;
2139 walsnd->
flush = flushPtr;
2140 walsnd->
apply = applyPtr;
2141 if (writeLag != -1 || clearLagTimes)
2143 if (flushLag != -1 || clearLagTimes)
2145 if (applyLag != -1 || clearLagTimes)
2170 bool changed =
false;
2229 if (
epoch != nextEpoch)
2234 if (
epoch + 1 != nextEpoch)
2253 uint32 feedbackCatalogEpoch;
2274 elog(
DEBUG2,
"hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2277 feedbackCatalogXmin,
2278 feedbackCatalogEpoch,
2281 pfree(replyTimeStr);
2374 long sleeptime = 10000;
2435 (
errmsg(
"terminating walsender process due to replication timeout")));
2590 if (walsnd->
pid != 0)
2639 walsnd->
latch = NULL;
2682 if (nextSegNo == endSegNo)
2688 if (
state->seg.ws_file >= 0)
2695 if (errno == ENOENT)
2698 int save_errno = errno;
2704 errmsg(
"requested WAL segment %s has already been removed",
2710 errmsg(
"could not open file \"%s\": %m",
2775 bool becameHistoric =
false;
2784 becameHistoric =
true;
2793 becameHistoric =
true;
2883 elog(
DEBUG1,
"walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2913 if (SendRqstPtr <= endptr)
2915 endptr = SendRqstPtr;
2924 endptr -= (endptr % XLOG_BLCKSZ);
2928 nbytes = endptr - startptr;
3013 char activitymsg[50];
3015 snprintf(activitymsg,
sizeof(activitymsg),
"streaming %X/%X",
3050 elog(
ERROR,
"could not find record while sending logically-decoded data: %s",
3166 if (
receiveTLI == replayTLI && receivePtr > replayPtr)
3167 result = receivePtr;
3185 if (walsnd->
pid == 0)
3223 int save_errno = errno;
3310 latch = walsnd->
latch;
3372 bool all_stopped =
true;
3380 if (walsnd->
pid == 0)
3388 all_stopped =
false;
3449 result->
time = offset;
3461 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3489 bool is_sync_standby;
3496 if (walsnd->
pid == 0)
3505 flush = walsnd->
flush;
3506 apply = walsnd->
apply;
3519 is_sync_standby =
false;
3520 for (
j = 0;
j < num_standbys;
j++)
3522 if (sync_standbys[
j].walsnd_index ==
i &&
3523 sync_standbys[
j].pid == pid)
3525 is_sync_standby =
true;
3598 else if (is_sync_standby)
3632 elog(
DEBUG2,
"sending replication keepalive");
3715 buffer_full =
false;
3829 (double) (lsn - prev.
lsn) / (double) (
next.lsn - prev.
lsn);
3833 ((
double) prev.
time + (
next.time - prev.
time) * fraction);
bool has_privs_of_role(Oid member, Oid role)
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
List * readTimeLineHistory(TimeLineID targetTLI)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
const char * timestamptz_to_str(TimestampTz t)
void pgstat_report_activity(BackendState state, const char *cmd_str)
void SendBaseBackup(BaseBackupCmd *cmd)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define pg_attribute_noreturn()
#define MemSet(start, val, len)
#define OidIsValid(objectId)
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
void ConditionVariableCancelSleep(void)
char * get_database_name(Oid dbid)
elog(ERROR, "%s: %s", p2, msg)
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
bool defGetBoolean(DefElem *def)
char * defGetString(DefElem *def)
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
void EndReplicationCommand(const char *commandTag)
DestReceiver * CreateDestReceiver(CommandDest dest)
int errmsg_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
bool message_level_is_interesting(int elevel)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
const TupleTableSlotOps TTSOpsVirtual
void end_tup_output(TupOutputState *tstate)
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
int CloseTransientFile(int fd)
int BasicOpenFile(const char *fileName, int fileFlags)
int OpenTransientFile(const char *fileName, int fileFlags)
Datum Int64GetDatum(int64 X)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
void ProcessConfigFile(GucContext context)
void GetPGVariable(const char *name, DestReceiver *dest)
static void dlist_init(dlist_head *head)
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
void SetLatch(Latch *latch)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
void ResetLatch(Latch *latch)
#define WL_SOCKET_READABLE
#define WL_POSTMASTER_DEATH
#define WL_SOCKET_WRITEABLE
#define PQ_SMALL_MESSAGE_LIMIT
#define pq_flush_if_writable()
#define pq_is_send_pending()
#define PQ_LARGE_MESSAGE_LIMIT
#define pq_putmessage_noblock(msgtype, s, len)
#define FeBeWaitSetSocketPos
Assert(fmt[strlen(fmt) - 1] !='\n')
void list_free_deep(List *list)
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
void FreeDecodingContext(LogicalDecodingContext *ctx)
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
void CheckLogicalDecodingRequirements(void)
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockReleaseAll(void)
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext TopMemoryContext
void * MemoryContextAllocZero(MemoryContext context, Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define ERRCODE_DATA_CORRUPTED
static Datum LSNGetDatum(XLogRecPtr X)
void SendPostmasterSignal(PMSignalReason reason)
void MarkPostmasterChildWalSender(void)
@ PMSIGNAL_ADVANCE_STATE_MACHINE
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
CommandDest whereToSendOutput
const char * debug_query_string
static Datum Int32GetDatum(int32 X)
int pq_getbyte_if_available(unsigned char *c)
int pq_getmessage(StringInfo s, int maxlen)
WaitEventSet * FeBeWaitSet
void pq_startmsgread(void)
static int fd(const char *x, int i)
#define PROC_AFFECTS_ALL_HORIZONS
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
@ PROCSIG_WALSND_INIT_STOPPING
bool update_process_title
static void set_ps_display(const char *activity)
@ REPLICATION_KIND_PHYSICAL
@ REPLICATION_KIND_LOGICAL
ResourceOwner CurrentResourceOwner
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
void ResourceOwnerDelete(ResourceOwner owner)
@ RESOURCE_RELEASE_BEFORE_LOCKS
@ RESOURCE_RELEASE_AFTER_LOCKS
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Size mul_size(Size s1, Size s2)
void pg_usleep(long microsec)
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
void ReplicationSlotCleanup(void)
void ReplicationSlotMarkDirty(void)
void ReplicationSlotReserveWal(void)
void ReplicationSlotAcquire(const char *name, bool nowait)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
void ReplicationSlotPersist(void)
ReplicationSlot * MyReplicationSlot
void ReplicationSlotDrop(const char *name, bool nowait)
void ReplicationSlotSave(void)
void ReplicationSlotRelease(void)
void ReplicationSlotsComputeRequiredLSN(void)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
#define SlotIsLogical(slot)
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
void SnapBuildClearExportedSnapshot(void)
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void resetStringInfo(StringInfo str)
void enlargeStringInfo(StringInfo str, int needed)
void initStringInfo(StringInfo str)
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
int read_heads[NUM_SYNC_REP_WAIT_MODE]
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
struct SnapBuild * snapshot_builder
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
TransactionId effective_catalog_xmin
TransactionId effective_xmin
ReplicationSlotPersistentData data
Tuplestorestate * setResult
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int sync_standby_priority
void SyncRepInitConfig(void)
SyncRepConfigData * SyncRepConfig
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
void SyncRepReleaseWaiters(void)
#define SYNC_REP_PRIORITY
#define NUM_SYNC_REP_WAIT_MODE
#define SyncRepRequested()
#define SYNC_REP_WAIT_WRITE
#define SYNC_REP_WAIT_FLUSH
#define SYNC_REP_WAIT_APPLY
void InitializeTimeouts(void)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define EpochFromFullTransactionId(x)
#define XidFromFullTransactionId(x)
#define TransactionIdIsNormal(xid)
TupleDesc CreateTemplateTupleDesc(int natts)
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
static Datum TimestampTzGetDatum(TimestampTz X)
static Datum IntervalPGetDatum(const Interval *X)
#define TimestampTzPlusMilliseconds(tz, ms)
FullTransactionId ReadNextFullTransactionId(void)
@ WAIT_EVENT_WAL_SENDER_MAIN
@ WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ
@ WAIT_EVENT_WAL_SENDER_WRITE_DATA
@ WAIT_EVENT_WAL_SENDER_WAIT_WAL
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
static void ProcessPendingWrites(void)
static XLogRecPtr sentPtr
#define READ_REPLICATION_SLOT_COLS
static Interval * offset_to_interval(TimeOffset offset)
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
static void WalSndLastCycleHandler(SIGNAL_ARGS)
static volatile sig_atomic_t got_SIGUSR2
static void WalSndCheckTimeOut(void)
static void XLogSendPhysical(void)
static void ProcessRepliesIfAny(void)
static bool waiting_for_ping_response
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
void WalSndErrorCleanup(void)
static void InitWalSenderSlot(void)
static void ProcessStandbyHSFeedbackMessage(void)
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
static StringInfoData tmpbuf
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
static LagTracker * lag_tracker
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
static void IdentifySystem(void)
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
static StringInfoData reply_message
static void WalSndKeepaliveIfNecessary(void)
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
void WalSndSetState(WalSndState state)
static StringInfoData output_message
static TimeLineID sendTimeLine
static void WalSndLoop(WalSndSendDataCallback send_data)
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
static LogicalDecodingContext * logical_decoding_ctx
static void XLogSendLogical(void)
void WalSndShmemInit(void)
static volatile sig_atomic_t replication_active
static volatile sig_atomic_t got_STOPPING
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
bool exec_replication_command(const char *cmd_string)
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
#define PG_STAT_GET_WAL_SENDERS_COLS
void(* WalSndSendDataCallback)(void)
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
void WalSndInitStopping(void)
void WalSndWaitStopping(void)
static bool sendTimeLineIsHistoric
void WalSndResourceCleanup(bool isCommit)
void WalSndRqstFileReload(void)
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
bool am_cascading_walsender
static TimestampTz last_processing
Size WalSndShmemSize(void)
bool log_replication_commands
void HandleWalSndInitStopping(void)
static TimeLineID sendTimeLineNextTLI
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
static void ProcessStandbyReplyMessage(void)
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
static bool streamingDoneSending
static void StartLogicalReplication(StartReplicationCmd *cmd)
static void WalSndShutdown(void)
static void WalSndKill(int code, Datum arg)
static bool WalSndCaughtUp
static XLogRecPtr sendTimeLineValidUpto
static void ProcessStandbyMessage(void)
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
#define LAG_TRACKER_BUFFER_SIZE
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
static long WalSndComputeSleeptime(TimestampTz now)
static bool streamingDoneReceiving
static void StartReplication(StartReplicationCmd *cmd)
static void WalSndDone(WalSndSendDataCallback send_data)
static const char * WalSndGetStateString(WalSndState state)
static XLogReaderState * xlogreader
static TimestampTz last_reply_timestamp
WalSndCtlData * WalSndCtl
PGDLLIMPORT Node * replication_parse_result
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
static const unsigned __int64 epoch
bool IsTransactionOrTransactionBlock(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
void StartTransactionCommand(void)
bool IsAbortedTransactionBlockState(void)
bool IsSubTransaction(void)
bool IsTransactionBlock(void)
void CommitTransactionCommand(void)
#define XACT_REPEATABLE_READ
uint64 GetSystemIdentifier(void)
bool RecoveryInProgress(void)
TimeLineID GetWALInsertionTimeLine(void)
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
bool XLogBackgroundFlush(void)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
static TimeLineID receiveTLI
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void wal_segment_close(XLogReaderState *state)
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
void WALReadRaiseError(WALReadError *errinfo)