128 "array length mismatch");
131#define ReplicationSlotOnDiskConstantSize \
132 offsetof(ReplicationSlotOnDisk, slotdata)
134#define ReplicationSlotOnDiskNotChecksummedSize \
135 offsetof(ReplicationSlotOnDisk, version)
137#define ReplicationSlotOnDiskChecksummedSize \
138 sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
140#define ReplicationSlotOnDiskV2Size \
141 sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
143#define SLOT_MAGIC 0x1051CA1
144#define SLOT_VERSION 5
269 char *err_msg =
NULL;
318 *err_msg =
psprintf(
_(
"replication slot name \"%s\" is too short"),
name);
326 *err_msg =
psprintf(
_(
"replication slot name \"%s\" is too long"),
name);
333 if (!((*
cp >=
'a' && *
cp <=
'z')
334 || (*
cp >=
'0' && *
cp <=
'9')
338 *err_msg =
psprintf(
_(
"replication slot name \"%s\" contains invalid character"),
name);
339 *
err_hint =
psprintf(
_(
"Replication slot names may only contain lower case letters, numbers, and the underscore character."));
347 *err_msg =
psprintf(
_(
"replication slot name \"%s\" is reserved"),
name);
348 *
err_hint =
psprintf(
_(
"The name \"%s\" is reserved for the conflict detection slot."),
408 errmsg(
"cannot enable failover for a replication slot created on the standby"));
420 errmsg(
"cannot enable failover for a temporary replication slot"));
449 errmsg(
"replication slot \"%s\" already exists",
name)));
461 errmsg(
"all replication slots are in use"),
462 errhint(
"Free one or increase \"%s\".",
463 repack ?
"max_repack_replication_slots" :
"max_replication_slots")));
650 errmsg(
"replication slot \"%s\" does not exist",
662 errmsg(
"cannot acquire replication slot \"%s\"",
name),
663 errdetail(
"The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
717 errmsg(
"replication slot \"%s\" is active for PID %d",
734 errmsg(
"can no longer access replication slot \"%s\"",
736 errdetail(
"This replication slot has been invalidated due to \"%s\".",
755 ?
errmsg(
"acquired logical replication slot \"%s\"",
757 :
errmsg(
"acquired physical replication slot \"%s\"",
772 char *slotname =
NULL;
849 ?
errmsg(
"released logical replication slot \"%s\"",
851 :
errmsg(
"released physical replication slot \"%s\"",
935 errmsg(
"cannot drop replication slot \"%s\"",
name),
936 errdetail(
"This replication slot is being synchronized from the primary server."));
973 errmsg(
"cannot use %s with a physical replication slot",
974 "ALTER_REPLICATION_SLOT"));
985 errmsg(
"cannot alter replication slot \"%s\"",
name),
986 errdetail(
"This replication slot is being synchronized from the primary server."));
995 errmsg(
"cannot enable failover for a replication slot"
1008 errmsg(
"cannot enable failover for a temporary replication slot"));
1110 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
1349 restart_lsn > last_saved_restart_lsn)
1351 restart_lsn = last_saved_restart_lsn;
1429 restart_lsn > last_saved_restart_lsn)
1431 restart_lsn = last_saved_restart_lsn;
1522 bool dropped =
false;
1596 errmsg(
"replication slot \"%s\" is active for PID %d",
1675 errmsg(
"replication slots can only be used if \"%s\" > 0",
1676 "max_replication_slots"));
1681 errmsg(
"REPACK can only be used if \"%s\" > 0",
1682 "max_repack_replication_slots"));
1687 errmsg(
"replication slots can only be used if \"wal_level\" >= \"replica\"")));
1699 errmsg(
"permission denied to use replication slots"),
1700 errdetail(
"Only roles with the %s attribute may use replication slots.",
1769 elog(
ERROR,
"WAL required by replication slot %s has been removed concurrently",
1812 ngettext(
"The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1813 "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1819 "max_slot_wal_keep_size");
1824 snapshotConflictHorizon);
1828 appendStringInfoString(&
err_detail,
_(
"Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1839 "idle_replication_slot_timeout");
1848 errmsg(
"terminating process %d to release replication slot \"%s\"",
1850 errmsg(
"invalidating obsolete replication slot \"%s\"",
1915 snapshotConflictHorizon))
1919 snapshotConflictHorizon))
1941#ifdef USE_INJECTION_POINTS
1944 *inactive_since = 0;
1988 bool invalidated =
false;
2035 snapshotConflictHorizon,
2127 slotname, restart_lsn,
2180 slotname, restart_lsn,
2225 bool invalidated =
false;
2260 dboid, snapshotConflictHorizon,
2329 elog(
DEBUG1,
"performing replication slot checkpoint");
2407 elog(
DEBUG1,
"starting up replication slots");
2433 (
errmsg(
"could not remove directory \"%s\"",
2492 errmsg(
"could not create directory \"%s\": %m",
2504 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
2548 sprintf(path,
"%s/state", dir);
2565 errmsg(
"could not create file \"%s\": %m",
2601 errmsg(
"could not write to file \"%s\": %m",
2621 errmsg(
"could not fsync file \"%s\": %m",
2637 errmsg(
"could not close file \"%s\": %m",
2653 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
2675 slot->
dirty =
false;
2707 errmsg(
"could not remove file \"%s\": %m", path)));
2711 elog(
DEBUG1,
"restoring replication slot from \"%s\"", path);
2723 errmsg(
"could not open file \"%s\": %m", path)));
2733 errmsg(
"could not fsync file \"%s\": %m",
2751 errmsg(
"could not read file \"%s\": %m", path)));
2755 errmsg(
"could not read file \"%s\": read %d of %zu",
2764 errmsg(
"replication slot file \"%s\" has wrong magic number: %u instead of %u",
2771 errmsg(
"replication slot file \"%s\" has unsupported version %u",
2772 path,
cp.version)));
2778 errmsg(
"replication slot file \"%s\" has corrupted length %u",
2792 errmsg(
"could not read file \"%s\": %m", path)));
2796 errmsg(
"could not read file \"%s\": read %d of %zu",
2803 errmsg(
"could not close file \"%s\": %m", path)));
2814 (
errmsg(
"checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2815 path, checksum,
cp.checksum)));
2826 (
errmsg(
"could not remove directory \"%s\"",
2850 errmsg(
"logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2852 errhint(
"Change \"wal_level\" to be \"replica\" or higher.")));
2865 errmsg(
"logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2867 errhint(
"Change \"hot_standby\" to be \"on\".")));
2872 errmsg(
"physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2874 errhint(
"Change \"wal_level\" to be \"replica\" or higher.")));
2927 (
errmsg(
"too many replication slots active before shutdown"),
2928 errhint(
"Increase \"max_replication_slots\" and try again.")));
2988 char *err_msg =
NULL;
3018 if ((*
newval)[0] ==
'\0')
3037 size +=
strlen(slot_name) + 1;
3051 ptr +=
strlen(slot_name) + 1;
3165 errmsg(
"replication slot \"%s\" specified in parameter \"%s\" does not exist",
3166 name,
"synchronized_standby_slots"),
3167 errdetail(
"Logical replication is waiting on the standby associated with replication slot \"%s\".",
3169 errhint(
"Create the replication slot \"%s\" or amend parameter \"%s\".",
3170 name,
"synchronized_standby_slots"));
3179 errmsg(
"cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3180 name,
"synchronized_standby_slots"),
3181 errdetail(
"Logical replication is waiting for correction on replication slot \"%s\".",
3183 errhint(
"Remove the logical replication slot \"%s\" from parameter \"%s\".",
3184 name,
"synchronized_standby_slots"));
3199 errmsg(
"physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3200 name,
"synchronized_standby_slots"),
3201 errdetail(
"Logical replication is waiting on the standby associated with replication slot \"%s\".",
3203 errhint(
"Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3204 name,
"synchronized_standby_slots"));
3214 errmsg(
"replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3215 name,
"synchronized_standby_slots"),
3216 errdetail(
"Logical replication is waiting on the standby associated with replication slot \"%s\".",
3218 errhint(
"Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3219 name,
"synchronized_standby_slots"));
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, TimestampTz stop_time, int threshold_sec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
#define ngettext(s, p, n)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
#define StaticAssertDecl(condition, errmessage)
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int errcode_for_file_access(void)
int errcode(int sqlerrcode)
int int errdetail_internal(const char *fmt,...) pg_attribute_printf(1
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define ereport(elevel,...)
int int errhint_internal(const char *fmt,...) pg_attribute_printf(1
int MakePGDirectory(const char *directoryName)
int CloseTransientFile(int fd)
void fsync_fname(const char *fname, bool isdir)
DIR * AllocateDir(const char *dirname)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int OpenTransientFile(const char *fileName, int fileFlags)
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
void ProcessConfigFile(GucContext context)
void GUC_check_errcode(int sqlerrcode)
void * guc_malloc(int elevel, size_t size)
#define GUC_check_errdetail
#define GUC_check_errhint
#define IS_INJECTION_POINT_ATTACHED(name)
volatile sig_atomic_t ConfigReloadPending
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
bool IsLogicalLauncher(void)
void list_free(List *list)
void RequestDisableLogicalDecoding(void)
bool LWLockHeldByMe(LWLock *lock)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockInitialize(LWLock *lock, int tranche_id)
char * pstrdup(const char *in)
void pfree(void *pointer)
#define START_CRIT_SECTION()
#define CHECK_FOR_INTERRUPTS()
#define END_CRIT_SECTION()
BackendType MyBackendType
bool has_rolreplication(Oid roleid)
void namestrcpy(Name name, const char *str)
#define ERRCODE_DATA_CORRUPTED
#define COMP_CRC32C(crc, data, len)
#define EQ_CRC32C(c1, c2)
static int list_length(const List *l)
#define foreach_ptr(type, var, lst)
static rewind_source * source
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
static int fd(const char *x, int i)
#define GetPGProcByNumber(n)
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
bool SignalRecoveryConflict(PGPROC *proc, pid_t pid, RecoveryConflictReason reason)
#define INVALID_PROC_NUMBER
char * psprintf(const char *fmt,...)
bool rmtree(const char *path, bool rmtopdir)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
#define ShmemRequestStruct(...)
int ReplicationSlotIndex(ReplicationSlot *slot)
const ShmemCallbacks ReplicationSlotsShmemCallbacks
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
static void ReplicationSlotsShmemInit(void *arg)
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
char * synchronized_standby_slots
void assign_synchronized_standby_slots(const char *newval, void *extra)
#define ReplicationSlotOnDiskChecksummedSize
void CheckPointReplicationSlots(bool is_shutdown)
int idle_replication_slot_timeout_secs
void ReplicationSlotDropAcquired(void)
void ReplicationSlotMarkDirty(void)
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
void ReplicationSlotReserveWal(void)
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
static XLogRecPtr ss_oldest_flush_lsn
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
void ReplicationSlotsDropDBSlots(Oid dboid)
#define ReplicationSlotOnDiskNotChecksummedSize
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, bool synced)
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
static void RestoreSlotFromDisk(const char *name)
void ReplicationSlotPersist(void)
bool CheckLogicalSlotExists(void)
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
ReplicationSlot * MyReplicationSlot
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
void ReplicationSlotDrop(const char *name, bool nowait)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
static bool validate_sync_standby_slots(char *rawname, List **elemlist)
void ReplicationSlotSave(void)
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
static void CreateSlotOnDisk(ReplicationSlot *slot)
#define ReplicationSlotOnDiskV2Size
void CheckSlotPermissions(void)
bool ReplicationSlotName(int index, Name name)
bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
void ReplicationSlotRelease(void)
int max_replication_slots
ReplicationSlotCtlData * ReplicationSlotCtl
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
void ReplicationSlotsComputeRequiredLSN(void)
void ReplicationSlotCleanup(bool synced_only)
int max_repack_replication_slots
void ReplicationSlotInitialize(void)
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
void StartupReplicationSlots(void)
static bool CanInvalidateIdleSlot(ReplicationSlot *s)
void CheckSlotRequirements(bool repack)
bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
static void ReplicationSlotsShmemRequest(void *arg)
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
#define ReplicationSlotOnDiskConstantSize
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
static void ReplicationSlotShmemExit(int code, Datum arg)
static bool IsSlotForConflictCheck(const char *name)
#define CONFLICT_DETECTION_SLOT
#define RS_INVAL_MAX_CAUSES
ReplicationSlotPersistency
#define SlotIsPhysical(slot)
ReplicationSlotInvalidationCause
#define SlotIsLogical(slot)
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
bool IsSyncingReplicationSlots(void)
static void SpinLockRelease(volatile slock_t *lock)
static void SpinLockAcquire(volatile slock_t *lock)
static void SpinLockInit(volatile slock_t *lock)
XLogRecPtr LogStandbySnapshot(Oid dbid)
@ RECOVERY_CONFLICT_LOGICALSLOT
#define ERRCODE_DUPLICATE_OBJECT
bool pg_str_endswith(const char *str, const char *end)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
ReplicationSlot replication_slots[1]
ReplicationSlotPersistentData slotdata
XLogRecPtr confirmed_flush
ReplicationSlotPersistency persistency
ReplicationSlotInvalidationCause invalidated
XLogRecPtr candidate_xmin_lsn
TransactionId effective_catalog_xmin
XLogRecPtr candidate_restart_valid
XLogRecPtr last_saved_confirmed_flush
SlotSyncSkipReason slotsync_skip_reason
TransactionId effective_xmin
XLogRecPtr last_saved_restart_lsn
XLogRecPtr candidate_restart_lsn
LWLock io_in_progress_lock
ConditionVariable active_cv
TransactionId candidate_catalog_xmin
ReplicationSlotPersistentData data
TimestampTz inactive_since
ShmemRequestCallback request_fn
ReplicationSlotInvalidationCause cause
char slot_names[FLEXIBLE_ARRAY_MEMBER]
ConditionVariable wal_confirm_rcv_cv
#define InvalidTransactionId
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
#define TransactionIdIsValid(xid)
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
bool log_replication_commands
WalSndCtlData * WalSndCtl
bool RecoveryInProgress(void)
XLogSegNo XLogGetLastRemovedSegno(void)
XLogRecPtr GetRedoRecPtr(void)
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
XLogRecPtr GetXLogInsertRecPtr(void)
void XLogFlush(XLogRecPtr record)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)