114#define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200
115#define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000
120#define SLOTSYNC_RESTART_INTERVAL_SEC 10
168 bool *found_consistent_snapshot,
169 bool *remote_slot_precedes)
172 bool updated_xmin_or_lsn =
false;
173 bool updated_config =
false;
177 if (found_consistent_snapshot)
178 *found_consistent_snapshot =
false;
180 if (remote_slot_precedes)
181 *remote_slot_precedes =
false;
214 errmsg(
"could not synchronize replication slot \"%s\" because remote slot precedes local slot",
216 errdetail(
"The remote slot has LSN %X/%X and catalog xmin %u, but the local slot has LSN %X/%X and catalog xmin %u.",
222 if (remote_slot_precedes)
223 *remote_slot_precedes =
true;
256 if (found_consistent_snapshot)
257 *found_consistent_snapshot =
true;
262 found_consistent_snapshot);
267 errmsg_internal(
"synchronized confirmed_flush for slot \"%s\" differs from remote slot",
274 updated_xmin_or_lsn =
true;
296 updated_config =
true;
304 if (updated_config || updated_xmin_or_lsn)
316 if (updated_xmin_or_lsn)
326 return updated_config || updated_xmin_or_lsn;
348 local_slots =
lappend(local_slots, s);
367 bool remote_exists =
false;
368 bool locally_invalidated =
false;
374 remote_exists =
true;
381 locally_invalidated =
390 return (remote_exists && !locally_invalidated);
447 synced_slot = local_slot->in_use && local_slot->data.synced;
460 errmsg(
"dropped replication slot \"%s\" of database with OID %u",
461 NameStr(local_slot->data.name),
462 local_slot->data.database));
511 if (oldest_segno == 1)
520 segno, oldest_segno);
529 if (segno >= oldest_segno)
549 bool found_consistent_snapshot =
false;
550 bool remote_slot_precedes =
false;
553 &found_consistent_snapshot,
554 &remote_slot_precedes);
560 if (remote_slot_precedes)
577 if (!found_consistent_snapshot)
580 errmsg(
"could not synchronize replication slot \"%s\"", remote_slot->
name),
581 errdetail(
"Logical decoding could not find consistent point from local slot's LSN %X/%X.",
590 errmsg(
"newly created replication slot \"%s\" is sync-ready now",
614 bool slot_updated =
false;
628 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
629 errmsg(
"skipping slot synchronization because the received slot sync"
630 " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
650 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
651 errmsg(
"exiting from slot synchronization because same"
652 " name slot \"%s\" already exists on the standby",
721 errdetail_internal(
"Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
794#define SLOTSYNC_COLUMN_COUNT 10
796 LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
801 bool some_slot_updated =
false;
802 bool started_tx =
false;
803 const char *query =
"SELECT slot_name, plugin, confirmed_flush_lsn,"
804 " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
805 " database, invalidation_reason"
806 " FROM pg_catalog.pg_replication_slots"
807 " WHERE failover and NOT temporary";
820 errmsg(
"could not fetch failover logical slots info from the primary server: %s",
895 remote_slot_list =
lappend(remote_slot_list, remote_slot);
928 return some_slot_updated;
940#define PRIMARY_INFO_OUTPUT_COL_COUNT 2
946 bool remote_in_recovery;
947 bool primary_slot_valid;
948 bool started_tx =
false;
952 "SELECT pg_is_in_recovery(), count(*) = 1"
953 " FROM pg_catalog.pg_replication_slots"
954 " WHERE slot_type='physical' AND slot_name=%s",
969 errmsg(
"could not fetch primary slot name \"%s\" info from the primary server: %s",
971 errhint(
"Check if \"primary_slot_name\" is configured correctly."));
976 "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\"");
988 if (remote_in_recovery)
990 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
991 errmsg(
"cannot synchronize replication slots from a standby server"));
996 if (!primary_slot_valid)
998 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1000 errmsg(
"replication slot \"%s\" specified by \"%s\" does not exist on primary server",
1027 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1033 errmsg(
"replication slot synchronization requires \"%s\" to be specified in \"%s\"",
1034 "dbname",
"primary_conninfo"));
1053 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1054 errmsg(
"replication slot synchronization requires \"wal_level\" >= \"logical\""));
1065 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1067 errmsg(
"replication slot synchronization requires \"%s\" to be set",
"primary_slot_name"));
1079 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1081 errmsg(
"replication slot synchronization requires \"%s\" to be enabled",
1082 "hot_standby_feedback"));
1093 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1095 errmsg(
"replication slot synchronization requires \"%s\" to be set",
1096 "primary_conninfo"));
1116 bool conninfo_changed;
1117 bool primary_slotname_changed;
1124 conninfo_changed = strcmp(old_primary_conninfo,
PrimaryConnInfo) != 0;
1125 primary_slotname_changed = strcmp(old_primary_slotname,
PrimarySlotName) != 0;
1126 pfree(old_primary_conninfo);
1127 pfree(old_primary_slotname);
1133 errmsg(
"replication slot synchronization worker will shut down because \"%s\" is disabled",
"sync_replication_slots"));
1137 if (conninfo_changed ||
1138 primary_slotname_changed ||
1142 errmsg(
"replication slot synchronization worker will restart because of a parameter change"));
1166 errmsg(
"replication slot synchronization worker is shutting down on receiving SIGINT"));
1244 if (!some_slot_updated)
1264 WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
1290 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1291 errmsg(
"cannot synchronize replication slots when standby promotion is ongoing"));
1298 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1299 errmsg(
"cannot synchronize replication slots concurrently"));
1340 sigjmp_buf local_sigjmp_buf;
1343 Assert(startup_data_len == 0);
1373 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
1469 errcode(ERRCODE_CONNECTION_FAILURE),
1470 errmsg(
"synchronization worker \"%s\" could not connect to the primary server: %s",
1492 bool some_slot_updated =
false;
1589 kill(worker_pid, SIGINT);
1599 10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
1635 time_t curtime = time(NULL);
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
#define TextDatumGetCString(d)
Oid get_database_oid(const char *dbname, bool missing_ok)
void load_file(const char *filename, bool restricted)
int errmsg_internal(const char *fmt,...)
void EmitErrorReport(void)
int errdetail_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
sigjmp_buf * PG_exception_stack
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
void ProcessConfigFile(GucContext context)
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
#define GetProcessingMode()
#define CHECK_FOR_INTERRUPTS()
#define AmLogicalSlotSyncWorkerProcess()
#define HOLD_INTERRUPTS()
#define SetProcessingMode(mode)
BackendType MyBackendType
void namestrcpy(Name name, const char *str)
#define foreach_ptr(type, var, lst)
static XLogRecPtr DatumGetLSN(Datum X)
void FloatExceptionHandler(SIGNAL_ARGS)
static bool DatumGetBool(Datum X)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static TransactionId DatumGetTransactionId(Datum X)
void InitPostgres(const char *in_dbname, Oid dboid, const char *username, Oid useroid, bits32 flags, char *out_dbname)
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
void init_ps_display(const char *fixed_part)
char * quote_literal_cstr(const char *rawstr)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
void ReplicationSlotDropAcquired(void)
void ReplicationSlotMarkDirty(void)
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
void ReplicationSlotPersist(void)
ReplicationSlot * MyReplicationSlot
void ReplicationSlotSave(void)
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
void ReplicationSlotRelease(void)
int max_replication_slots
ReplicationSlotCtlData * ReplicationSlotCtl
void ReplicationSlotsComputeRequiredLSN(void)
void ReplicationSlotCleanup(bool synced_only)
ReplicationSlotInvalidationCause
#define SlotIsLogical(slot)
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
static List * get_local_synced_slots(void)
#define MIN_SLOTSYNC_WORKER_NAPTIME_MS
#define PRIMARY_INFO_OUTPUT_COL_COUNT
static void slotsync_worker_disconnect(int code, Datum arg)
void SyncReplicationSlots(WalReceiverConn *wrconn)
static bool local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
static void drop_local_obsolete_slots(List *remote_slot_list)
static void reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
void ShutDownSlotSync(void)
static bool update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
bool sync_replication_slots
static SlotSyncCtxStruct * SlotSyncCtx
static void slotsync_failure_callback(int code, Datum arg)
#define SLOTSYNC_COLUMN_COUNT
#define SLOTSYNC_RESTART_INTERVAL_SEC
static void reset_syncing_flag()
char * CheckAndGetDbnameFromConninfo(void)
static bool syncing_slots
struct RemoteSlot RemoteSlot
struct SlotSyncCtxStruct SlotSyncCtxStruct
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS
static bool synchronize_slots(WalReceiverConn *wrconn)
bool SlotSyncWorkerCanRestart(void)
static bool synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
static void wait_for_slot_activity(bool some_slot_updated)
static void slotsync_reread_config(void)
void SlotSyncShmemInit(void)
static bool update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *found_consistent_snapshot, bool *remote_slot_precedes)
static void slotsync_worker_onexit(int code, Datum arg)
static void check_and_set_sync_info(pid_t worker_pid)
static void update_synced_slots_inactive_since(void)
bool ValidateSlotSyncParams(int elevel)
static void validate_remote_info(WalReceiverConn *wrconn)
bool IsSyncingReplicationSlots(void)
void ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
Size SlotSyncShmemSize(void)
static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
bool SnapBuildSnapshotExists(XLogRecPtr lsn)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
ReplicationSlotInvalidationCause invalidated
TransactionId catalog_xmin
ReplicationSlot replication_slots[1]
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
ReplicationSlotPersistency persistency
ReplicationSlotInvalidationCause invalidated
TransactionId effective_catalog_xmin
ReplicationSlotPersistentData data
Tuplestorestate * tuplestore
void InitializeTimeouts(void)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
#define WL_EXIT_ON_PM_DEATH
static WalReceiverConn * wrconn
bool hot_standby_feedback
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_get_dbname_from_conninfo(conninfo)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
bool IsTransactionState(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
XLogSegNo XLogGetLastRemovedSegno(void)
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr