116 #define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200
117 #define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000
122 #define SLOTSYNC_RESTART_INTERVAL_SEC 10
169 bool *found_consistent_snapshot,
170 bool *remote_slot_precedes)
173 bool updated_xmin_or_lsn =
false;
174 bool updated_config =
false;
178 if (found_consistent_snapshot)
179 *found_consistent_snapshot =
false;
181 if (remote_slot_precedes)
182 *remote_slot_precedes =
false;
215 errmsg(
"could not synchronize replication slot \"%s\" because remote slot precedes local slot",
217 errdetail(
"The remote slot has LSN %X/%X and catalog xmin %u, but the local slot has LSN %X/%X and catalog xmin %u.",
223 if (remote_slot_precedes)
224 *remote_slot_precedes =
true;
257 if (found_consistent_snapshot)
258 *found_consistent_snapshot =
true;
263 found_consistent_snapshot);
268 errmsg_internal(
"synchronized confirmed_flush for slot \"%s\" differs from remote slot",
275 updated_xmin_or_lsn =
true;
295 updated_config =
true;
303 if (updated_config || updated_xmin_or_lsn)
315 if (updated_xmin_or_lsn)
325 return updated_config || updated_xmin_or_lsn;
347 local_slots =
lappend(local_slots, s);
366 bool remote_exists =
false;
367 bool locally_invalidated =
false;
373 remote_exists =
true;
380 locally_invalidated =
389 return (remote_exists && !locally_invalidated);
446 synced_slot = local_slot->in_use && local_slot->data.synced;
459 errmsg(
"dropped replication slot \"%s\" of database with OID %u",
460 NameStr(local_slot->data.name),
461 local_slot->data.database));
510 if (oldest_segno == 1)
519 segno, oldest_segno);
528 if (segno >= oldest_segno)
548 bool found_consistent_snapshot =
false;
549 bool remote_slot_precedes =
false;
552 &found_consistent_snapshot,
553 &remote_slot_precedes);
559 if (remote_slot_precedes)
576 if (!found_consistent_snapshot)
579 errmsg(
"could not synchronize replication slot \"%s\"", remote_slot->
name),
580 errdetail(
"Logical decoding could not find consistent point from local slot's LSN %X/%X.",
589 errmsg(
"newly created replication slot \"%s\" is sync-ready now",
613 bool slot_updated =
false;
627 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
628 errmsg(
"skipping slot synchronization because the received slot sync"
629 " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
649 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
650 errmsg(
"exiting from slot synchronization because same"
651 " name slot \"%s\" already exists on the standby",
720 errdetail_internal(
"Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
793 #define SLOTSYNC_COLUMN_COUNT 9
795 LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
800 bool some_slot_updated =
false;
801 bool started_tx =
false;
802 const char *query =
"SELECT slot_name, plugin, confirmed_flush_lsn,"
803 " restart_lsn, catalog_xmin, two_phase, failover,"
804 " database, invalidation_reason"
805 " FROM pg_catalog.pg_replication_slots"
806 " WHERE failover and NOT temporary";
819 errmsg(
"could not fetch failover logical slots info from the primary server: %s",
891 remote_slot_list =
lappend(remote_slot_list, remote_slot);
924 return some_slot_updated;
936 #define PRIMARY_INFO_OUTPUT_COL_COUNT 2
942 bool remote_in_recovery;
943 bool primary_slot_valid;
944 bool started_tx =
false;
948 "SELECT pg_is_in_recovery(), count(*) = 1"
949 " FROM pg_catalog.pg_replication_slots"
950 " WHERE slot_type='physical' AND slot_name=%s",
965 errmsg(
"could not fetch primary slot name \"%s\" info from the primary server: %s",
967 errhint(
"Check if \"primary_slot_name\" is configured correctly."));
972 "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\"");
984 if (remote_in_recovery)
986 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
987 errmsg(
"cannot synchronize replication slots from a standby server"));
992 if (!primary_slot_valid)
994 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
996 errmsg(
"replication slot \"%s\" specified by \"%s\" does not exist on primary server",
1023 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1029 errmsg(
"replication slot synchronization requires \"%s\" to be specified in \"%s\"",
1030 "dbname",
"primary_conninfo"));
1049 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1050 errmsg(
"replication slot synchronization requires \"wal_level\" >= \"logical\""));
1061 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1063 errmsg(
"replication slot synchronization requires \"%s\" to be set",
"primary_slot_name"));
1075 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1077 errmsg(
"replication slot synchronization requires \"%s\" to be enabled",
1078 "hot_standby_feedback"));
1089 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1091 errmsg(
"replication slot synchronization requires \"%s\" to be set",
1092 "primary_conninfo"));
1112 bool conninfo_changed;
1113 bool primary_slotname_changed;
1120 conninfo_changed = strcmp(old_primary_conninfo,
PrimaryConnInfo) != 0;
1121 primary_slotname_changed = strcmp(old_primary_slotname,
PrimarySlotName) != 0;
1122 pfree(old_primary_conninfo);
1123 pfree(old_primary_slotname);
1129 errmsg(
"replication slot synchronization worker will shut down because \"%s\" is disabled",
"sync_replication_slots"));
1133 if (conninfo_changed ||
1134 primary_slotname_changed ||
1138 errmsg(
"replication slot synchronization worker will restart because of a parameter change"));
1162 errmsg(
"replication slot synchronization worker is shutting down on receiving SIGINT"));
1240 if (!some_slot_updated)
1260 WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
1286 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1287 errmsg(
"cannot synchronize replication slots when standby promotion is ongoing"));
1294 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1295 errmsg(
"cannot synchronize replication slots concurrently"));
1336 sigjmp_buf local_sigjmp_buf;
1339 Assert(startup_data_len == 0);
1369 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
1465 errcode(ERRCODE_CONNECTION_FAILURE),
1466 errmsg(
"synchronization worker \"%s\" could not connect to the primary server: %s",
1488 bool some_slot_updated =
false;
1587 kill(worker_pid, SIGINT);
1597 10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
1633 time_t curtime = time(NULL);
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
#define TextDatumGetCString(d)
#define Assert(condition)
Oid get_database_oid(const char *dbname, bool missing_ok)
static void PGresult * res
void load_file(const char *filename, bool restricted)
int errmsg_internal(const char *fmt,...)
void EmitErrorReport(void)
int errdetail_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
sigjmp_buf * PG_exception_stack
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
const TupleTableSlotOps TTSOpsMinimalTuple
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
void ProcessConfigFile(GucContext context)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
#define GetProcessingMode()
#define CHECK_FOR_INTERRUPTS()
#define AmLogicalSlotSyncWorkerProcess()
#define HOLD_INTERRUPTS()
#define SetProcessingMode(mode)
BackendType MyBackendType
void namestrcpy(Name name, const char *str)
#define foreach_ptr(type, var, lst)
static XLogRecPtr DatumGetLSN(Datum X)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void FloatExceptionHandler(SIGNAL_ARGS)
static bool DatumGetBool(Datum X)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static TransactionId DatumGetTransactionId(Datum X)
void InitPostgres(const char *in_dbname, Oid dboid, const char *username, Oid useroid, bits32 flags, char *out_dbname)
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
void init_ps_display(const char *fixed_part)
char * quote_literal_cstr(const char *rawstr)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
static pg_noinline void Size size
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
void ReplicationSlotDropAcquired(void)
void ReplicationSlotMarkDirty(void)
void ReplicationSlotAcquire(const char *name, bool nowait)
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *invalidation_reason)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
void ReplicationSlotPersist(void)
ReplicationSlot * MyReplicationSlot
void ReplicationSlotSave(void)
void ReplicationSlotRelease(void)
int max_replication_slots
ReplicationSlotCtlData * ReplicationSlotCtl
void ReplicationSlotsComputeRequiredLSN(void)
void ReplicationSlotCleanup(bool synced_only)
ReplicationSlotInvalidationCause
#define SlotIsLogical(slot)
#define MIN_SLOTSYNC_WORKER_NAPTIME_MS
#define PRIMARY_INFO_OUTPUT_COL_COUNT
static void slotsync_worker_disconnect(int code, Datum arg)
void SyncReplicationSlots(WalReceiverConn *wrconn)
static bool local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
static void drop_local_obsolete_slots(List *remote_slot_list)
static void reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
void ShutDownSlotSync(void)
static bool update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
bool sync_replication_slots
static SlotSyncCtxStruct * SlotSyncCtx
static void slotsync_failure_callback(int code, Datum arg)
#define SLOTSYNC_COLUMN_COUNT
#define SLOTSYNC_RESTART_INTERVAL_SEC
static void reset_syncing_flag()
static bool syncing_slots
struct RemoteSlot RemoteSlot
struct SlotSyncCtxStruct SlotSyncCtxStruct
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS
static bool synchronize_slots(WalReceiverConn *wrconn)
void ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
bool SlotSyncWorkerCanRestart(void)
static List * get_local_synced_slots(void)
static bool synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
static void wait_for_slot_activity(bool some_slot_updated)
static void slotsync_reread_config(void)
char * CheckAndGetDbnameFromConninfo(void)
void SlotSyncShmemInit(void)
static bool update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *found_consistent_snapshot, bool *remote_slot_precedes)
static void slotsync_worker_onexit(int code, Datum arg)
static void check_and_set_sync_info(pid_t worker_pid)
static void update_synced_slots_inactive_since(void)
bool ValidateSlotSyncParams(int elevel)
static void validate_remote_info(WalReceiverConn *wrconn)
bool IsSyncingReplicationSlots(void)
Size SlotSyncShmemSize(void)
static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
bool SnapBuildSnapshotExists(XLogRecPtr lsn)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
ReplicationSlotInvalidationCause invalidated
TransactionId catalog_xmin
ReplicationSlot replication_slots[1]
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
ReplicationSlotPersistency persistency
ReplicationSlotInvalidationCause invalidated
TransactionId effective_catalog_xmin
ReplicationSlotPersistentData data
TimestampTz inactive_since
void InitializeTimeouts(void)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static WalReceiverConn * wrconn
bool hot_standby_feedback
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_get_dbname_from_conninfo(conninfo)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
bool IsTransactionState(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
XLogSegNo XLogGetLastRemovedSegno(void)
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr