106 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
218 #define LAG_TRACKER_BUFFER_SIZE 8192
490 #define READ_REPLICATION_SLOT_COLS 3
511 if (slot == NULL || !slot->
in_use)
522 slot_contents = *slot;
528 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
529 errmsg(
"cannot use %s with a logical replication slot",
530 "READ_REPLICATION_SLOT"));
542 snprintf(xloc,
sizeof(xloc),
"%X/%X",
618 len = strlen(histfname);
626 errmsg(
"could not open file \"%s\": %m", path)));
629 histfilelen = lseek(
fd, 0, SEEK_END);
633 errmsg(
"could not seek to end of file \"%s\": %m", path)));
634 if (lseek(
fd, 0, SEEK_SET) != 0)
637 errmsg(
"could not seek to beginning of file \"%s\": %m", path)));
641 bytesleft = histfilelen;
642 while (bytesleft > 0)
653 errmsg(
"could not read file \"%s\": %m",
658 errmsg(
"could not read file \"%s\": read %d of %zu",
659 path, nread, (
Size) bytesleft)));
668 errmsg(
"could not close file \"%s\": %m", path)));
693 "incremental backup information",
753 (
errcode(ERRCODE_CONNECTION_FAILURE),
754 errmsg(
"unexpected EOF on client connection with an open transaction")));
769 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
770 errmsg(
"unexpected message type 0x%02X during COPY from stdin",
779 (
errcode(ERRCODE_CONNECTION_FAILURE),
780 errmsg(
"unexpected EOF on client connection with an open transaction")));
800 (
errcode(ERRCODE_QUERY_CANCELED),
801 errmsg(
"COPY from stdin failed: %s",
832 (
errcode(ERRCODE_OUT_OF_MEMORY),
834 errdetail(
"Failed while allocating a WAL reading processor.")));
850 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
851 errmsg(
"cannot use a logical replication slot for physical replication")));
882 List *timeLineHistory;
914 switchpoint < cmd->startpoint)
917 (
errmsg(
"requested starting point %X/%X on timeline %u is not in this server's history",
920 errdetail(
"This server's history forked from timeline %u at %X/%X.",
961 if (FlushPtr < cmd->startpoint)
964 (
errmsg(
"requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
1001 char startpos_str[8 + 1 + 8 + 1];
1006 bool nulls[2] = {0};
1008 snprintf(startpos_str,
sizeof(startpos_str),
"%X/%X",
1065 if (flushptr < targetPagePtr + reqLen)
1089 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
1090 count = XLOG_BLCKSZ;
1092 count = flushptr - targetPagePtr;
1128 bool snapshot_action_given =
false;
1129 bool reserve_wal_given =
false;
1130 bool two_phase_given =
false;
1131 bool failover_given =
false;
1138 if (strcmp(defel->
defname,
"snapshot") == 0)
1144 (
errcode(ERRCODE_SYNTAX_ERROR),
1145 errmsg(
"conflicting or redundant options")));
1148 snapshot_action_given =
true;
1150 if (strcmp(
action,
"export") == 0)
1152 else if (strcmp(
action,
"nothing") == 0)
1154 else if (strcmp(
action,
"use") == 0)
1158 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1159 errmsg(
"unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1162 else if (strcmp(defel->
defname,
"reserve_wal") == 0)
1166 (
errcode(ERRCODE_SYNTAX_ERROR),
1167 errmsg(
"conflicting or redundant options")));
1169 reserve_wal_given =
true;
1172 else if (strcmp(defel->
defname,
"two_phase") == 0)
1176 (
errcode(ERRCODE_SYNTAX_ERROR),
1177 errmsg(
"conflicting or redundant options")));
1178 two_phase_given =
true;
1181 else if (strcmp(defel->
defname,
"failover") == 0)
1185 (
errcode(ERRCODE_SYNTAX_ERROR),
1186 errmsg(
"conflicting or redundant options")));
1187 failover_given =
true;
1201 const char *snapshot_name = NULL;
1204 bool reserve_wal =
false;
1206 bool failover =
false;
1212 bool nulls[4] = {0};
1223 false,
false,
false);
1239 bool need_full_snapshot =
false;
1265 (
errmsg(
"%s must not be called inside a transaction",
1266 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1268 need_full_snapshot =
true;
1275 (
errmsg(
"%s must be called inside a transaction",
1276 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1281 (
errmsg(
"%s must be called in REPEATABLE READ isolation mode transaction",
1282 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1286 (
errmsg(
"%s must be called in a read-only transaction",
1287 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1292 (
errmsg(
"%s must be called before any query",
1293 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1298 (
errmsg(
"%s must not be called in a subtransaction",
1299 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1301 need_full_snapshot =
true;
1349 snprintf(xloc,
sizeof(xloc),
"%X/%X",
1382 if (snapshot_name != NULL)
1415 bool failover_given =
false;
1416 bool two_phase_given =
false;
1423 if (strcmp(defel->defname,
"failover") == 0)
1427 (
errcode(ERRCODE_SYNTAX_ERROR),
1428 errmsg(
"conflicting or redundant options")));
1429 failover_given =
true;
1432 else if (strcmp(defel->defname,
"two_phase") == 0)
1434 if (two_phase_given)
1436 (
errcode(ERRCODE_SYNTAX_ERROR),
1437 errmsg(
"conflicting or redundant options")));
1438 two_phase_given =
true;
1442 elog(
ERROR,
"unrecognized option: %s", defel->defname);
1446 failover_given ? &failover : NULL,
1475 (
errmsg(
"terminating walsender process after promotion")));
1588 memcpy(&ctx->
out->
data[1 +
sizeof(int64) +
sizeof(int64)],
1639 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1676 bool pending_writes =
false;
1687 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1715 pending_writes =
true;
1725 if (pending_writes || (!end_xact &&
1775 *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
1797 if (target_lsn > flushed_lsn)
1799 *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
1835 return RecentFlushPtr;
1844 bool wait_for_standby_at_stop =
false;
1877 if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
1896 wait_for_standby_at_stop =
true;
1918 if (!wait_for_standby_at_stop &&
1965 WalSndWait(wakeEvents, sleeptime, wait_event);
1970 return RecentFlushPtr;
2002 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2003 errmsg(
"cannot execute new commands while WAL sender is in stopping mode")));
2017 "Replication command context",
2037 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2038 errmsg(
"cannot execute SQL commands in WAL sender for physical replication")));
2050 (
errcode(ERRCODE_SYNTAX_ERROR),
2071 (
errmsg(
"received replication command: %s", cmd_string)));
2078 (
errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2079 errmsg(
"current transaction is aborted, "
2080 "commands ignored until end of transaction block")));
2092 switch (cmd_node->
type)
2094 case T_IdentifySystemCmd:
2095 cmdtag =
"IDENTIFY_SYSTEM";
2101 case T_ReadReplicationSlotCmd:
2102 cmdtag =
"READ_REPLICATION_SLOT";
2108 case T_BaseBackupCmd:
2109 cmdtag =
"BASE_BACKUP";
2116 case T_CreateReplicationSlotCmd:
2117 cmdtag =
"CREATE_REPLICATION_SLOT";
2123 case T_DropReplicationSlotCmd:
2124 cmdtag =
"DROP_REPLICATION_SLOT";
2130 case T_AlterReplicationSlotCmd:
2131 cmdtag =
"ALTER_REPLICATION_SLOT";
2137 case T_StartReplicationCmd:
2141 cmdtag =
"START_REPLICATION";
2157 case T_TimeLineHistoryCmd:
2158 cmdtag =
"TIMELINE_HISTORY";
2165 case T_VariableShowStmt:
2181 case T_UploadManifestCmd:
2182 cmdtag =
"UPLOAD_MANIFEST";
2190 elog(
ERROR,
"unrecognized replication command node tag: %u",
2215 unsigned char firstchar;
2218 bool received =
false;
2235 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2236 errmsg(
"unexpected EOF on standby connection")));
2258 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2259 errmsg(
"invalid standby message type \"%c\"",
2270 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2271 errmsg(
"unexpected EOF on standby connection")));
2347 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2348 errmsg(
"unexpected message type \"%c\"", msgtype)));
2359 bool changed =
false;
2395 bool replyRequested;
2403 static bool fullyAppliedLastTime =
false;
2419 elog(
DEBUG2,
"write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2423 replyRequested ?
" (reply requested)" :
"",
2426 pfree(replyTimeStr);
2443 clearLagTimes =
false;
2446 if (fullyAppliedLastTime)
2447 clearLagTimes =
true;
2448 fullyAppliedLastTime =
true;
2451 fullyAppliedLastTime =
false;
2465 walsnd->
write = writePtr;
2466 walsnd->
flush = flushPtr;
2467 walsnd->
apply = applyPtr;
2468 if (writeLag != -1 || clearLagTimes)
2470 if (flushLag != -1 || clearLagTimes)
2472 if (applyLag != -1 || clearLagTimes)
2497 bool changed =
false;
2556 if (
epoch != nextEpoch)
2561 if (
epoch + 1 != nextEpoch)
2580 uint32 feedbackCatalogEpoch;
2601 elog(
DEBUG2,
"hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2604 feedbackCatalogXmin,
2605 feedbackCatalogEpoch,
2608 pfree(replyTimeStr);
2701 long sleeptime = 10000;
2762 (
errmsg(
"terminating walsender process due to replication timeout")));
2889 WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2917 if (walsnd->
pid != 0)
2983 walsnd->
latch = NULL;
3026 if (nextSegNo == endSegNo)
3032 if (
state->seg.ws_file >= 0)
3039 if (errno == ENOENT)
3042 int save_errno = errno;
3048 errmsg(
"requested WAL segment %s has already been removed",
3054 errmsg(
"could not open file \"%s\": %m",
3120 bool becameHistoric =
false;
3129 becameHistoric =
true;
3138 becameHistoric =
true;
3228 elog(
DEBUG1,
"walsender reached end of timeline at %X/%X (sent up to %X/%X)",
3258 if (SendRqstPtr <= endptr)
3260 endptr = SendRqstPtr;
3269 endptr -= (endptr % XLOG_BLCKSZ);
3273 nbytes = endptr - startptr;
3367 char activitymsg[50];
3369 snprintf(activitymsg,
sizeof(activitymsg),
"streaming %X/%X",
3404 elog(
ERROR,
"could not find record while sending logically-decoded data: %s",
3530 if (
receiveTLI == replayTLI && receivePtr > replayPtr)
3531 result = receivePtr;
3549 if (walsnd->
pid == 0)
3724 if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
3779 bool all_stopped =
true;
3787 if (walsnd->
pid == 0)
3795 all_stopped =
false;
3856 result->
time = offset;
3868 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3896 bool is_sync_standby;
3903 if (walsnd->
pid == 0)
3912 flush = walsnd->
flush;
3913 apply = walsnd->
apply;
3926 is_sync_standby =
false;
3927 for (
j = 0;
j < num_standbys;
j++)
3929 if (sync_standbys[
j].walsnd_index ==
i &&
3930 sync_standbys[
j].pid == pid)
3932 is_sync_standby =
true;
4005 else if (is_sync_standby)
4039 elog(
DEBUG2,
"sending replication keepalive");
4122 buffer_full =
false;
4236 (double) (lsn - prev.
lsn) / (double) (
next.lsn - prev.
lsn);
4240 ((
double) prev.
time + (
next.time - prev.
time) * fraction);
bool has_privs_of_role(Oid member, Oid role)
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
List * readTimeLineHistory(TimeLineID targetTLI)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
const char * timestamptz_to_str(TimestampTz t)
void pgstat_report_activity(BackendState state, const char *cmd_str)
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define Assert(condition)
#define pg_attribute_noreturn()
#define MemSet(start, val, len)
#define OidIsValid(objectId)
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
char * get_database_name(Oid dbid)
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
bool defGetBoolean(DefElem *def)
char * defGetString(DefElem *def)
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
void EndReplicationCommand(const char *commandTag)
DestReceiver * CreateDestReceiver(CommandDest dest)
int errmsg_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
bool message_level_is_interesting(int elevel)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
const TupleTableSlotOps TTSOpsVirtual
void end_tup_output(TupOutputState *tstate)
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
int CloseTransientFile(int fd)
int BasicOpenFile(const char *fileName, int fileFlags)
int OpenTransientFile(const char *fileName, int fileFlags)
Datum Int64GetDatum(int64 X)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
void ProcessConfigFile(GucContext context)
void GetPGVariable(const char *name, DestReceiver *dest)
static void dlist_init(dlist_head *head)
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
void SetLatch(Latch *latch)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
void ResetLatch(Latch *latch)
#define WL_SOCKET_READABLE
#define WL_POSTMASTER_DEATH
#define WL_SOCKET_WRITEABLE
#define PQ_SMALL_MESSAGE_LIMIT
#define pq_flush_if_writable()
#define pq_is_send_pending()
#define PQ_LARGE_MESSAGE_LIMIT
#define pq_putmessage_noblock(msgtype, s, len)
#define FeBeWaitSetSocketPos
void list_free_deep(List *list)
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
void FreeDecodingContext(LogicalDecodingContext *ctx)
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
void CheckLogicalDecodingRequirements(void)
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockReleaseAll(void)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext TopMemoryContext
void * MemoryContextAllocZero(MemoryContext context, Size size)
MemoryContext CurrentMemoryContext
char * MemoryContextStrdup(MemoryContext context, const char *string)
MemoryContext CacheMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define HOLD_CANCEL_INTERRUPTS()
#define RESUME_CANCEL_INTERRUPTS()
#define CHECK_FOR_INTERRUPTS()
#define ERRCODE_DATA_CORRUPTED
#define foreach_ptr(type, var, lst)
static Datum LSNGetDatum(XLogRecPtr X)
void SendPostmasterSignal(PMSignalReason reason)
void MarkPostmasterChildWalSender(void)
@ PMSIGNAL_ADVANCE_STATE_MACHINE
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
CommandDest whereToSendOutput
const char * debug_query_string
static Datum Int32GetDatum(int32 X)
int pq_getbyte_if_available(unsigned char *c)
int pq_getmessage(StringInfo s, int maxlen)
WaitEventSet * FeBeWaitSet
void pq_startmsgread(void)
static int fd(const char *x, int i)
#define PROC_AFFECTS_ALL_HORIZONS
#define INVALID_PROC_NUMBER
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
@ PROCSIG_WALSND_INIT_STOPPING
#define PqMsg_CopyInResponse
#define PqMsg_CopyBothResponse
bool update_process_title
static void set_ps_display(const char *activity)
MemoryContextSwitchTo(old_ctx)
@ REPLICATION_KIND_PHYSICAL
@ REPLICATION_KIND_LOGICAL
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
ResourceOwner CurrentResourceOwner
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
void ResourceOwnerDelete(ResourceOwner owner)
@ RESOURCE_RELEASE_BEFORE_LOCKS
@ RESOURCE_RELEASE_AFTER_LOCKS
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Size mul_size(Size s1, Size s2)
void pg_usleep(long microsec)
static pg_noinline void Size size
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
void ReplicationSlotMarkDirty(void)
void ReplicationSlotReserveWal(void)
void ReplicationSlotAcquire(const char *name, bool nowait)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
void ReplicationSlotPersist(void)
ReplicationSlot * MyReplicationSlot
void ReplicationSlotDrop(const char *name, bool nowait)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
void ReplicationSlotSave(void)
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
void ReplicationSlotRelease(void)
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
void ReplicationSlotsComputeRequiredLSN(void)
void ReplicationSlotCleanup(bool synced_only)
#define SlotIsPhysical(slot)
#define SlotIsLogical(slot)
bool IsSyncingReplicationSlots(void)
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
void SnapBuildClearExportedSnapshot(void)
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void resetStringInfo(StringInfo str)
void enlargeStringInfo(StringInfo str, int needed)
void initStringInfo(StringInfo str)
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
int read_heads[NUM_SYNC_REP_WAIT_MODE]
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
struct SnapBuild * snapshot_builder
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
TransactionId effective_catalog_xmin
TransactionId effective_xmin
ReplicationSlotPersistentData data
Tuplestorestate * setResult
ConditionVariable wal_confirm_rcv_cv
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
int sync_standby_priority
void SyncRepInitConfig(void)
SyncRepConfigData * SyncRepConfig
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
void SyncRepReleaseWaiters(void)
#define SYNC_REP_PRIORITY
#define NUM_SYNC_REP_WAIT_MODE
#define SyncRepRequested()
#define SYNC_REP_WAIT_WRITE
#define SYNC_REP_WAIT_FLUSH
#define SYNC_REP_WAIT_APPLY
void InitializeTimeouts(void)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define EpochFromFullTransactionId(x)
#define XidFromFullTransactionId(x)
#define TransactionIdIsNormal(xid)
TupleDesc CreateTemplateTupleDesc(int natts)
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
static Datum TimestampTzGetDatum(TimestampTz X)
static Datum IntervalPGetDatum(const Interval *X)
#define TimestampTzPlusMilliseconds(tz, ms)
FullTransactionId ReadNextFullTransactionId(void)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
static void ProcessPendingWrites(void)
static XLogRecPtr sentPtr
#define READ_REPLICATION_SLOT_COLS
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
static Interval * offset_to_interval(TimeOffset offset)
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
static void WalSndLastCycleHandler(SIGNAL_ARGS)
static volatile sig_atomic_t got_SIGUSR2
static void WalSndCheckTimeOut(void)
static void XLogSendPhysical(void)
static void ProcessRepliesIfAny(void)
static bool waiting_for_ping_response
void PhysicalWakeupLogicalWalSnd(void)
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
void WalSndErrorCleanup(void)
static void InitWalSenderSlot(void)
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
static void ProcessStandbyHSFeedbackMessage(void)
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
static StringInfoData tmpbuf
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
static LagTracker * lag_tracker
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
static void IdentifySystem(void)
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
static StringInfoData reply_message
static void WalSndKeepaliveIfNecessary(void)
void WalSndSetState(WalSndState state)
static StringInfoData output_message
static TimeLineID sendTimeLine
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
static void WalSndLoop(WalSndSendDataCallback send_data)
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
void WalSndWakeup(bool physical, bool logical)
static LogicalDecodingContext * logical_decoding_ctx
static void XLogSendLogical(void)
void WalSndShmemInit(void)
static volatile sig_atomic_t replication_active
static void UploadManifest(void)
static volatile sig_atomic_t got_STOPPING
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
bool exec_replication_command(const char *cmd_string)
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
#define PG_STAT_GET_WAL_SENDERS_COLS
void(* WalSndSendDataCallback)(void)
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
void WalSndInitStopping(void)
void WalSndWaitStopping(void)
static bool sendTimeLineIsHistoric
void WalSndResourceCleanup(bool isCommit)
void WalSndRqstFileReload(void)
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
bool am_cascading_walsender
static TimestampTz last_processing
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Size WalSndShmemSize(void)
bool log_replication_commands
void HandleWalSndInitStopping(void)
static TimeLineID sendTimeLineNextTLI
static MemoryContext uploaded_manifest_mcxt
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
static void ProcessStandbyReplyMessage(void)
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
static bool streamingDoneSending
static void StartLogicalReplication(StartReplicationCmd *cmd)
static IncrementalBackupInfo * uploaded_manifest
static void WalSndShutdown(void)
static void WalSndKill(int code, Datum arg)
static bool WalSndCaughtUp
static XLogRecPtr sendTimeLineValidUpto
static void ProcessStandbyMessage(void)
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
#define LAG_TRACKER_BUFFER_SIZE
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
static long WalSndComputeSleeptime(TimestampTz now)
static bool streamingDoneReceiving
static void StartReplication(StartReplicationCmd *cmd)
static void WalSndDone(WalSndSendDataCallback send_data)
static const char * WalSndGetStateString(WalSndState state)
static XLogReaderState * xlogreader
static TimestampTz last_reply_timestamp
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
WalSndCtlData * WalSndCtl
PGDLLIMPORT Node * replication_parse_result
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
static const unsigned __int64 epoch
bool IsTransactionOrTransactionBlock(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
void StartTransactionCommand(void)
bool IsAbortedTransactionBlockState(void)
bool IsSubTransaction(void)
bool IsTransactionBlock(void)
void CommitTransactionCommand(void)
#define XACT_REPEATABLE_READ
uint64 GetSystemIdentifier(void)
bool RecoveryInProgress(void)
TimeLineID GetWALInsertionTimeLine(void)
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
bool XLogBackgroundFlush(void)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
static TimeLineID receiveTLI
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void wal_segment_close(XLogReaderState *state)
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
void WALReadRaiseError(WALReadError *errinfo)