113 #define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
116 "array length mismatch");
119 #define ReplicationSlotOnDiskConstantSize \
120 offsetof(ReplicationSlotOnDisk, slotdata)
122 #define ReplicationSlotOnDiskNotChecksummedSize \
123 offsetof(ReplicationSlotOnDisk, version)
125 #define ReplicationSlotOnDiskChecksummedSize \
126 sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
128 #define ReplicationSlotOnDiskV2Size \
129 sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
131 #define SLOT_MAGIC 0x1051CA1
132 #define SLOT_VERSION 5
256 if (strlen(
name) == 0)
259 (
errcode(ERRCODE_INVALID_NAME),
260 errmsg(
"replication slot name \"%s\" is too short",
268 (
errcode(ERRCODE_NAME_TOO_LONG),
269 errmsg(
"replication slot name \"%s\" is too long",
274 for (cp =
name; *cp; cp++)
276 if (!((*cp >=
'a' && *cp <=
'z')
277 || (*cp >=
'0' && *cp <=
'9')
281 (
errcode(ERRCODE_INVALID_NAME),
282 errmsg(
"replication slot name \"%s\" contains invalid character",
284 errhint(
"Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
311 bool two_phase,
bool failover,
bool synced)
332 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
333 errmsg(
"cannot enable failover for a replication slot created on the standby"));
344 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
345 errmsg(
"cannot enable failover for a temporary replication slot"));
370 errmsg(
"replication slot \"%s\" already exists",
name)));
371 if (!s->
in_use && slot == NULL)
379 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
380 errmsg(
"all replication slots are in use"),
381 errhint(
"Free one or increase max_replication_slots.")));
554 if (s == NULL || !s->
in_use)
559 (
errcode(ERRCODE_UNDEFINED_OBJECT),
560 errmsg(
"replication slot \"%s\" does not exist",
599 WAIT_EVENT_REPLICATION_SLOT_DROP);
605 (
errcode(ERRCODE_OBJECT_IN_USE),
606 errmsg(
"replication slot \"%s\" is active for PID %d",
638 ?
errmsg(
"acquired logical replication slot \"%s\"",
640 :
errmsg(
"acquired physical replication slot \"%s\"",
655 char *slotname = NULL;
656 bool is_logical =
false;
732 ?
errmsg(
"released logical replication slot \"%s\"",
734 :
errmsg(
"released physical replication slot \"%s\"",
795 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
796 errmsg(
"cannot drop replication slot \"%s\"",
name),
797 errdetail(
"This slot is being synced from the primary server."));
814 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
815 errmsg(
"cannot use %s with a physical replication slot",
816 "ALTER_REPLICATION_SLOT"));
826 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
827 errmsg(
"cannot alter replication slot \"%s\"",
name),
828 errdetail(
"This slot is being synced from the primary server."));
836 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
837 errmsg(
"cannot enable failover for a replication slot"
847 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
848 errmsg(
"cannot enable failover for a temporary replication slot"));
907 if (rename(path, tmppath) == 0)
935 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
966 if (!
rmtree(tmppath,
true))
968 (
errmsg(
"could not remove directory \"%s\"", tmppath)));
1082 agg_xmin = effective_xmin;
1088 agg_catalog_xmin = effective_catalog_xmin;
1132 restart_lsn < min_required))
1133 min_required = restart_lsn;
1193 restart_lsn < result)
1194 result = restart_lsn;
1215 *nslots = *nactive = 0;
1305 if (active_pid == 0)
1335 (
errcode(ERRCODE_OBJECT_IN_USE),
1336 errmsg(
"replication slot \"%s\" is active for PID %d",
1337 slotname, active_pid)));
1370 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1371 errmsg(
"replication slots can only be used if max_replication_slots > 0")));
1375 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1376 errmsg(
"replication slots can only be used if wal_level >= replica")));
1387 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1388 errmsg(
"permission denied to use replication slots"),
1389 errdetail(
"Only roles with the %s attribute may use replication slots.",
1493 unsigned long long ex = oldestLSN - restart_lsn;
1497 ngettext(
"The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
1498 "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1506 snapshotConflictHorizon);
1510 appendStringInfoString(&err_detail,
_(
"Logical decoding on standby requires wal_level >= logical on the primary server."));
1518 errmsg(
"terminating process %d to release replication slot \"%s\"",
1520 errmsg(
"invalidating obsolete replication slot \"%s\"",
1523 hint ?
errhint(
"You might need to increase %s.",
"max_slot_wal_keep_size") : 0);
1548 int last_signaled_pid = 0;
1549 bool released_lock =
false;
1550 bool terminated =
false;
1603 initial_restart_lsn < oldestLSN)
1604 invalidation_cause = cause;
1614 snapshotConflictHorizon))
1615 invalidation_cause = cause;
1618 snapshotConflictHorizon))
1619 invalidation_cause = cause;
1623 invalidation_cause = cause;
1635 invalidation_cause_prev != invalidation_cause));
1654 if (active_pid == 0)
1668 *invalidated =
true;
1681 if (active_pid != 0)
1691 released_lock =
true;
1705 if (last_signaled_pid != active_pid)
1708 slotname, restart_lsn,
1709 oldestLSN, snapshotConflictHorizon);
1716 (
void)
kill(active_pid, SIGTERM);
1718 last_signaled_pid = active_pid;
1720 invalidation_cause_prev = invalidation_cause;
1725 WAIT_EVENT_REPLICATION_SLOT_DROP);
1746 released_lock =
true;
1754 slotname, restart_lsn,
1755 oldestLSN, snapshotConflictHorizon);
1764 return released_lock;
1787 bool invalidated =
false;
1808 snapshotConflictHorizon,
1842 elog(
DEBUG1,
"performing replication slot checkpoint");
1899 DIR *replication_dir;
1900 struct dirent *replication_de;
1902 elog(
DEBUG1,
"starting up replication slots");
1906 while ((replication_de =
ReadDir(replication_dir,
"pg_replslot")) != NULL)
1911 if (strcmp(replication_de->
d_name,
".") == 0 ||
1912 strcmp(replication_de->
d_name,
"..") == 0)
1915 snprintf(path,
sizeof(path),
"pg_replslot/%s", replication_de->
d_name);
1928 (
errmsg(
"could not remove directory \"%s\"",
1987 errmsg(
"could not create directory \"%s\": %m",
1996 if (rename(tmppath, path) != 0)
1999 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
2029 was_dirty = slot->
dirty;
2042 sprintf(tmppath,
"%s/state.tmp", dir);
2043 sprintf(path,
"%s/state", dir);
2054 int save_errno = errno;
2060 errmsg(
"could not create file \"%s\": %m",
2083 if ((
write(
fd, &cp,
sizeof(cp))) !=
sizeof(cp))
2085 int save_errno = errno;
2092 errno = save_errno ? save_errno : ENOSPC;
2095 errmsg(
"could not write to file \"%s\": %m",
2105 int save_errno = errno;
2113 errmsg(
"could not fsync file \"%s\": %m",
2121 int save_errno = errno;
2127 errmsg(
"could not close file \"%s\": %m",
2133 if (rename(tmppath, path) != 0)
2135 int save_errno = errno;
2141 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
2163 slot->
dirty =
false;
2181 bool restored =
false;
2189 sprintf(path,
"%s/state.tmp", slotdir);
2190 if (unlink(path) < 0 && errno != ENOENT)
2193 errmsg(
"could not remove file \"%s\": %m", path)));
2195 sprintf(path,
"%s/state", slotdir);
2197 elog(
DEBUG1,
"restoring replication slot from \"%s\"", path);
2209 errmsg(
"could not open file \"%s\": %m", path)));
2219 errmsg(
"could not fsync file \"%s\": %m",
2237 errmsg(
"could not read file \"%s\": %m", path)));
2241 errmsg(
"could not read file \"%s\": read %d of %zu",
2250 errmsg(
"replication slot file \"%s\" has wrong magic number: %u instead of %u",
2257 errmsg(
"replication slot file \"%s\" has unsupported version %u",
2264 errmsg(
"replication slot file \"%s\" has corrupted length %u",
2273 if (readBytes != cp.
length)
2278 errmsg(
"could not read file \"%s\": %m", path)));
2282 errmsg(
"could not read file \"%s\": read %d of %zu",
2289 errmsg(
"could not close file \"%s\": %m", path)));
2300 (
errmsg(
"checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2309 if (!
rmtree(slotdir,
true))
2312 (
errmsg(
"could not remove directory \"%s\"",
2333 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2334 errmsg(
"logical replication slot \"%s\" exists, but wal_level < logical",
2336 errhint(
"Change wal_level to be logical or higher.")));
2339 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2340 errmsg(
"physical replication slot \"%s\" exists, but wal_level < replica",
2342 errhint(
"Change wal_level to be replica or higher.")));
2389 (
errmsg(
"too many replication slots active before shutdown"),
2390 errhint(
"Increase max_replication_slots and try again.")));
2404 Assert(invalidation_reason);
2493 if ((*
newval)[0] ==
'\0')
2502 if (!ok || elemlist ==
NIL)
2512 size += strlen(slot_name) + 1;
2523 strcpy(ptr, slot_name);
2524 ptr += strlen(slot_name) + 1;
2527 *extra = (
void *) config;
2555 const char *standby_slot_name;
2569 if (strcmp(standby_slot_name, slot_name) == 0)
2572 standby_slot_name += strlen(standby_slot_name) + 1;
2589 int caught_up_slot_num = 0;
2642 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2643 errmsg(
"replication slot \"%s\" specified in parameter %s does not exist",
2644 name,
"standby_slot_names"),
2645 errdetail(
"Logical replication is waiting on the standby associated with \"%s\".",
2647 errhint(
"Consider creating the slot \"%s\" or amend parameter %s.",
2648 name,
"standby_slot_names"));
2663 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2664 errmsg(
"cannot have logical replication slot \"%s\" in parameter %s",
2665 name,
"standby_slot_names"),
2666 errdetail(
"Logical replication is waiting for correction on \"%s\".",
2668 errhint(
"Consider removing logical slot \"%s\" from parameter %s.",
2669 name,
"standby_slot_names"));
2683 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2684 errmsg(
"physical slot \"%s\" specified in parameter %s has been invalidated",
2685 name,
"standby_slot_names"),
2686 errdetail(
"Logical replication is waiting on the standby associated with \"%s\".",
2688 errhint(
"Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
2689 name,
"standby_slot_names"));
2698 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2699 errmsg(
"replication slot \"%s\" specified in parameter %s does not have active_pid",
2700 name,
"standby_slot_names"),
2701 errdetail(
"Logical replication is waiting on the standby associated with \"%s\".",
2703 errhint(
"Consider starting standby associated with \"%s\" or amend parameter %s.",
2704 name,
"standby_slot_names"));
2710 Assert(restart_lsn >= wait_for_lsn);
2713 min_restart_lsn > restart_lsn)
2714 min_restart_lsn = restart_lsn;
2716 caught_up_slot_num++;
2778 WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
#define ngettext(s, p, n)
#define PG_USED_FOR_ASSERTS_ONLY
#define FLEXIBLE_ARRAY_MEMBER
#define MemSet(start, val, len)
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int errdetail_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int MakePGDirectory(const char *directoryName)
int CloseTransientFile(int fd)
void fsync_fname(const char *fname, bool isdir)
int OpenTransientFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
void * guc_malloc(int elevel, size_t size)
#define GUC_check_errdetail
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Assert(fmt[strlen(fmt) - 1] !='\n')
void list_free(List *list)
bool LWLockHeldByMe(LWLock *lock)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockInitialize(LWLock *lock, int tranche_id)
@ LWTRANCHE_REPLICATION_SLOT_IO
char * pstrdup(const char *in)
void pfree(void *pointer)
#define START_CRIT_SECTION()
#define CHECK_FOR_INTERRUPTS()
#define END_CRIT_SECTION()
BackendType MyBackendType
bool has_rolreplication(Oid roleid)
void namestrcpy(Name name, const char *str)
#define ERRCODE_DATA_CORRUPTED
#define COMP_CRC32C(crc, data, len)
#define EQ_CRC32C(c1, c2)
static int list_length(const List *l)
#define foreach_ptr(type, var, lst)
static rewind_source * source
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
static int fd(const char *x, int i)
#define PROC_IN_LOGICAL_DECODING
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
#define INVALID_PROC_NUMBER
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
@ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT
bool rmtree(const char *path, bool rmtopdir)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Size mul_size(Size s1, Size s2)
static pg_noinline void Size size
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
void ReplicationSlotAlter(const char *name, bool failover)
int ReplicationSlotIndex(ReplicationSlot *slot)
#define ReplicationSlotOnDiskChecksummedSize
void CheckPointReplicationSlots(bool is_shutdown)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
void ReplicationSlotCleanup(void)
void ReplicationSlotDropAcquired(void)
void ReplicationSlotMarkDirty(void)
void ReplicationSlotReserveWal(void)
char * standby_slot_names
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
void ReplicationSlotAcquire(const char *name, bool nowait)
bool SlotExistsInStandbySlotNames(const char *slot_name)
bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
static bool validate_standby_slots(char *rawname, List **elemlist)
static XLogRecPtr ss_oldest_flush_lsn
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *invalidation_reason)
void ReplicationSlotsDropDBSlots(Oid dboid)
#define ReplicationSlotOnDiskNotChecksummedSize
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
static void RestoreSlotFromDisk(const char *name)
#define RS_INVAL_MAX_CAUSES
void ReplicationSlotPersist(void)
ReplicationSlot * MyReplicationSlot
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
void ReplicationSlotDrop(const char *name, bool nowait)
void ReplicationSlotSave(void)
static void CreateSlotOnDisk(ReplicationSlot *slot)
#define ReplicationSlotOnDiskV2Size
void CheckSlotPermissions(void)
bool ReplicationSlotName(int index, Name name)
void ReplicationSlotsShmemInit(void)
const char *const SlotInvalidationCauses[]
static StandbySlotNamesConfigData * standby_slot_names_config
void ReplicationSlotRelease(void)
int max_replication_slots
StaticAssertDecl(lengthof(SlotInvalidationCauses)==(RS_INVAL_MAX_CAUSES+1), "array length mismatch")
ReplicationSlotCtlData * ReplicationSlotCtl
struct ReplicationSlotOnDisk ReplicationSlotOnDisk
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
void ReplicationSlotsComputeRequiredLSN(void)
void ReplicationSlotInitialize(void)
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated)
void StartupReplicationSlots(void)
void CheckSlotRequirements(void)
void assign_standby_slot_names(const char *newval, void *extra)
bool check_standby_slot_names(char **newval, void **extra, GucSource source)
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon)
#define ReplicationSlotOnDiskConstantSize
Size ReplicationSlotsShmemSize(void)
bool ReplicationSlotValidateName(const char *name, int elevel)
static void ReplicationSlotShmemExit(int code, Datum arg)
ReplicationSlotPersistency
#define SlotIsPhysical(slot)
ReplicationSlotInvalidationCause
#define SlotIsLogical(slot)
bool IsSyncingReplicationSlots(void)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
XLogRecPtr LogStandbySnapshot(void)
#define ERRCODE_DUPLICATE_OBJECT
bool pg_str_endswith(const char *str, const char *end)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
ReplicationSlot replication_slots[1]
ReplicationSlotPersistentData slotdata
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
ReplicationSlotPersistency persistency
ReplicationSlotInvalidationCause invalidated
XLogRecPtr candidate_xmin_lsn
TransactionId effective_catalog_xmin
XLogRecPtr candidate_restart_valid
XLogRecPtr last_saved_confirmed_flush
TransactionId effective_xmin
XLogRecPtr candidate_restart_lsn
LWLock io_in_progress_lock
ConditionVariable active_cv
TransactionId candidate_catalog_xmin
ReplicationSlotPersistentData data
TimestampTz inactive_since
char slot_names[FLEXIBLE_ARRAY_MEMBER]
ConditionVariable wal_confirm_rcv_cv
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
bool log_replication_commands
WalSndCtlData * WalSndCtl
bool RecoveryInProgress(void)
XLogSegNo XLogGetLastRemovedSegno(void)
XLogRecPtr GetRedoRecPtr(void)
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
XLogRecPtr GetXLogInsertRecPtr(void)
void XLogFlush(XLogRecPtr record)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)