124#define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200
125#define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000
130#define SLOTSYNC_RESTART_INTERVAL_SEC 10
257 errmsg(
"could not synchronize replication slot \"%s\"",
259 errdetail(
"Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
319 errmsg_internal(
"synchronized confirmed_flush for slot \"%s\" differs from remote slot",
528 errmsg(
"dropped replication slot \"%s\" of database with OID %u",
660 errdetail(
"Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
673 errmsg(
"newly created replication slot \"%s\" is sync-ready now",
716 errmsg(
"exiting from slot synchronization because same"
717 " name slot \"%s\" already exists on the standby",
784 errmsg(
"skipping slot synchronization because the received slot sync"
785 " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
817 errdetail_internal(
"Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
888 errmsg(
"skipping slot synchronization because the received slot sync"
889 " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
922#define SLOTSYNC_COLUMN_COUNT 10
933 "SELECT slot_name, plugin, confirmed_flush_lsn,"
934 " restart_lsn, catalog_xmin, two_phase,"
935 " two_phase_at, failover,"
936 " database, invalidation_reason"
937 " FROM pg_catalog.pg_replication_slots"
938 " WHERE failover and NOT temporary");
940 if (slot_names !=
NIL)
965 errmsg(
"could not fetch failover logical slots info from the primary server: %s",
1100#define PRIMARY_INFO_OUTPUT_COL_COUNT 2
1112 "SELECT pg_is_in_recovery(), count(*) = 1"
1113 " FROM pg_catalog.pg_replication_slots"
1114 " WHERE slot_type='physical' AND slot_name=%s",
1129 errmsg(
"could not fetch primary slot name \"%s\" info from the primary server: %s",
1131 errhint(
"Check if \"primary_slot_name\" is configured correctly."));
1136 "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\"");
1151 errmsg(
"cannot synchronize replication slots from a standby server"));
1160 errmsg(
"replication slot \"%s\" specified by \"%s\" does not exist on primary server",
1193 errmsg(
"replication slot synchronization requires \"%s\" to be specified in \"%s\"",
1194 "dbname",
"primary_conninfo"));
1212 errmsg(
"replication slot synchronization requires \"effective_wal_level\" >= \"logical\" on the primary"),
1213 errhint(
"To enable logical decoding on primary, set \"wal_level\" >= \"logical\" or create at least one logical slot when \"wal_level\" = \"replica\"."));
1229 errmsg(
"replication slot synchronization requires \"%s\" to be set",
"primary_slot_name"));
1243 errmsg(
"replication slot synchronization requires \"%s\" to be enabled",
1244 "hot_standby_feedback"));
1257 errmsg(
"replication slot synchronization requires \"%s\" to be set",
1258 "primary_conninfo"));
1300 errmsg(
"replication slot synchronization worker will stop because \"%s\" is disabled",
1301 "sync_replication_slots"));
1318 errmsg(
"replication slot synchronization worker will restart because of a parameter change"));
1343 errmsg(
"replication slot synchronization will stop because of a parameter change"));
1361 errmsg(
"replication slot synchronization worker will stop because promotion is triggered"));
1373 errmsg(
"replication slot synchronization will stop because promotion is triggered"));
1490 errmsg(
"cannot synchronize replication slots concurrently"));
1667 errmsg(
"synchronization worker \"%s\" could not connect to the primary server: %s",
1965 slot_names =
lappend(slot_names, slot_name);
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
#define TextDatumGetCString(d)
#define Assert(condition)
Oid get_database_oid(const char *dbname, bool missing_ok)
void load_file(const char *filename, bool restricted)
int errmsg_internal(const char *fmt,...)
void EmitErrorReport(void)
int errdetail_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
sigjmp_buf * PG_exception_stack
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
#define palloc0_object(type)
void ProcessConfigFile(GucContext context)
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot)
bool IsLogicalDecodingEnabled(void)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
char * pstrdup(const char *in)
void pfree(void *pointer)
#define GetProcessingMode()
#define CHECK_FOR_INTERRUPTS()
#define AmLogicalSlotSyncWorkerProcess()
#define HOLD_INTERRUPTS()
#define SetProcessingMode(mode)
BackendType MyBackendType
void namestrcpy(Name name, const char *str)
#define foreach_ptr(type, var, lst)
static XLogRecPtr DatumGetLSN(Datum X)
void pgstat_report_replslotsync(ReplicationSlot *slot)
void FloatExceptionHandler(SIGNAL_ARGS)
void StatementCancelHandler(SIGNAL_ARGS)
static bool DatumGetBool(Datum X)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static TransactionId DatumGetTransactionId(Datum X)
void InitPostgres(const char *in_dbname, Oid dboid, const char *username, Oid useroid, bits32 flags, char *out_dbname)
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
void init_ps_display(const char *fixed_part)
char * quote_literal_cstr(const char *rawstr)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
void ReplicationSlotDropAcquired(void)
void ReplicationSlotMarkDirty(void)
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
void ReplicationSlotPersist(void)
ReplicationSlot * MyReplicationSlot
void ReplicationSlotSave(void)
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
void ReplicationSlotRelease(void)
int max_replication_slots
ReplicationSlotCtlData * ReplicationSlotCtl
void ReplicationSlotsComputeRequiredLSN(void)
void ReplicationSlotCleanup(bool synced_only)
ReplicationSlotInvalidationCause
#define SlotIsLogical(slot)
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
@ SS_SKIP_WAL_NOT_FLUSHED
@ SS_SKIP_NO_CONSISTENT_SNAPSHOT
@ SS_SKIP_WAL_OR_ROWS_REMOVED
static List * get_local_synced_slots(void)
#define MIN_SLOTSYNC_WORKER_NAPTIME_MS
#define PRIMARY_INFO_OUTPUT_COL_COUNT
static void slotsync_worker_disconnect(int code, Datum arg)
void SyncReplicationSlots(WalReceiverConn *wrconn)
static bool local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
static void drop_local_obsolete_slots(List *remote_slot_list)
static void reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
static void update_slotsync_skip_stats(SlotSyncSkipReason skip_reason)
void ShutDownSlotSync(void)
bool sync_replication_slots
static bool synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *slot_persistence_pending)
static SlotSyncCtxStruct * SlotSyncCtx
static void slotsync_failure_callback(int code, Datum arg)
#define SLOTSYNC_COLUMN_COUNT
static List * extract_slot_names(List *remote_slots)
#define SLOTSYNC_RESTART_INTERVAL_SEC
char * CheckAndGetDbnameFromConninfo(void)
static bool syncing_slots
static void ProcessSlotSyncInterrupts(void)
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS
static bool update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *slot_persistence_pending)
bool SlotSyncWorkerCanRestart(void)
static void wait_for_slot_activity(bool some_slot_updated)
static void slotsync_reread_config(void)
static void reset_syncing_flag(void)
void SlotSyncShmemInit(void)
static bool update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *found_consistent_snapshot, bool *remote_slot_precedes)
static void slotsync_worker_onexit(int code, Datum arg)
static void update_synced_slots_inactive_since(void)
bool ValidateSlotSyncParams(int elevel)
static void validate_remote_info(WalReceiverConn *wrconn)
static void check_and_set_sync_info(pid_t sync_process_pid)
bool IsSyncingReplicationSlots(void)
void ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
static List * fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names)
Size SlotSyncShmemSize(void)
static bool synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list, bool *slot_persistence_pending)
bool SnapBuildSnapshotExists(XLogRecPtr lsn)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
ReplicationSlotInvalidationCause invalidated
TransactionId catalog_xmin
ReplicationSlot replication_slots[1]
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
ReplicationSlotPersistency persistency
ReplicationSlotInvalidationCause invalidated
TransactionId effective_catalog_xmin
SlotSyncSkipReason slotsync_skip_reason
ReplicationSlotPersistentData data
Tuplestorestate * tuplestore
void InitializeTimeouts(void)
static bool TransactionIdFollows(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
#define WL_EXIT_ON_PM_DEATH
static WalReceiverConn * wrconn
bool hot_standby_feedback
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_get_dbname_from_conninfo(conninfo)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
bool IsTransactionState(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
XLogSegNo XLogGetLastRemovedSegno(void)
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr