/*
 * Byte-size helpers for the ReplicationSlotOnDisk layout.
 *
 * Every replacement list is fully parenthesized so that each macro behaves
 * as a single operand wherever it is used.  Without the parentheses, the
 * subtraction-based sizes below would mis-associate inside larger
 * expressions such as "2 * M" or "n - M".
 */

/* Number of bytes that precede the "slotdata" member. */
#define ReplicationSlotOnDiskConstantSize \
	(offsetof(ReplicationSlotOnDisk, slotdata))

/*
 * Number of bytes that precede the "version" member; going by the name,
 * this leading part is presumably excluded from the checksum.
 */
#define ReplicationSlotOnDiskNotChecksummedSize \
	(offsetof(ReplicationSlotOnDisk, version))

/* Number of bytes from the "version" member to the end of the struct. */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)

/* Number of bytes from the "slotdata" member to the end of the struct. */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

/* Magic number for slot state files (presumably written to and checked in the file header). */
#define SLOT_MAGIC		0x1051CA1
/* Slot state file format version. */
#define SLOT_VERSION	3
203 if (strlen(
name) == 0)
206 (
errcode(ERRCODE_INVALID_NAME),
207 errmsg(
"replication slot name \"%s\" is too short",
215 (
errcode(ERRCODE_NAME_TOO_LONG),
216 errmsg(
"replication slot name \"%s\" is too long",
221 for (cp =
name; *cp; cp++)
223 if (!((*cp >=
'a' && *cp <=
'z')
224 || (*cp >=
'0' && *cp <=
'9')
228 (
errcode(ERRCODE_INVALID_NAME),
229 errmsg(
"replication slot name \"%s\" contains invalid character",
231 errhint(
"Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
285 errmsg(
"replication slot \"%s\" already exists",
name)));
286 if (!s->
in_use && slot == NULL)
294 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
295 errmsg(
"all replication slots are in use"),
296 errhint(
"Free one or increase max_replication_slots.")));
468 if (s == NULL || !s->
in_use)
473 (
errcode(ERRCODE_UNDEFINED_OBJECT),
474 errmsg(
"replication slot \"%s\" does not exist",
519 (
errcode(ERRCODE_OBJECT_IN_USE),
520 errmsg(
"replication slot \"%s\" is active for PID %d",
695 if (rename(path, tmppath) == 0)
723 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
754 if (!
rmtree(tmppath,
true))
756 (
errmsg(
"could not remove directory \"%s\"", tmppath)));
870 agg_xmin = effective_xmin;
876 agg_catalog_xmin = effective_catalog_xmin;
920 restart_lsn < min_required))
921 min_required = restart_lsn;
981 restart_lsn < result)
982 result = restart_lsn;
1003 *nslots = *nactive = 0;
1093 if (active_pid == 0)
1109 (
errcode(ERRCODE_OBJECT_IN_USE),
1110 errmsg(
"replication slot \"%s\" is active for PID %d",
1111 slotname, active_pid)));
1144 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1145 errmsg(
"replication slots can only be used if max_replication_slots > 0")));
1149 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1150 errmsg(
"replication slots can only be used if wal_level >= replica")));
1161 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1162 errmsg(
"permission denied to use replication slots"),
1163 errdetail(
"Only roles with the %s attribute may use replication slots.",
1267 appendStringInfo(&err_detail,
_(
"The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
1269 (
unsigned long long) (oldestLSN - restart_lsn));
1273 snapshotConflictHorizon);
1277 appendStringInfo(&err_detail,
_(
"Logical decoding on standby requires wal_level >= logical on the primary server."));
1285 errmsg(
"terminating process %d to release replication slot \"%s\"",
1287 errmsg(
"invalidating obsolete replication slot \"%s\"",
1290 hint ?
errhint(
"You might need to increase max_slot_wal_keep_size.") : 0);
1315 int last_signaled_pid = 0;
1316 bool released_lock =
false;
1366 snapshotConflictHorizon))
1370 snapshotConflictHorizon))
1399 if (active_pid == 0)
1413 *invalidated =
true;
1418 if (active_pid != 0)
1428 released_lock =
true;
1442 if (last_signaled_pid != active_pid)
1445 slotname, restart_lsn,
1446 oldestLSN, snapshotConflictHorizon);
1453 (
void)
kill(active_pid, SIGTERM);
1455 last_signaled_pid = active_pid;
1481 released_lock =
true;
1490 slotname, restart_lsn,
1491 oldestLSN, snapshotConflictHorizon);
1500 return released_lock;
1523 bool invalidated =
false;
1544 snapshotConflictHorizon,
1576 elog(
DEBUG1,
"performing replication slot checkpoint");
1609 DIR *replication_dir;
1610 struct dirent *replication_de;
1612 elog(
DEBUG1,
"starting up replication slots");
1616 while ((replication_de =
ReadDir(replication_dir,
"pg_replslot")) != NULL)
1621 if (strcmp(replication_de->
d_name,
".") == 0 ||
1622 strcmp(replication_de->
d_name,
"..") == 0)
1625 snprintf(path,
sizeof(path),
"pg_replslot/%s", replication_de->
d_name);
1638 (
errmsg(
"could not remove directory \"%s\"",
1697 errmsg(
"could not create directory \"%s\": %m",
1706 if (rename(tmppath, path) != 0)
1709 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
1739 was_dirty = slot->
dirty;
1752 sprintf(tmppath,
"%s/state.tmp", dir);
1753 sprintf(path,
"%s/state", dir);
1764 int save_errno = errno;
1770 errmsg(
"could not create file \"%s\": %m",
1793 if ((
write(
fd, &cp,
sizeof(cp))) !=
sizeof(cp))
1795 int save_errno = errno;
1802 errno = save_errno ? save_errno : ENOSPC;
1805 errmsg(
"could not write to file \"%s\": %m",
1815 int save_errno = errno;
1823 errmsg(
"could not fsync file \"%s\": %m",
1831 int save_errno = errno;
1837 errmsg(
"could not close file \"%s\": %m",
1843 if (rename(tmppath, path) != 0)
1845 int save_errno = errno;
1851 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
1873 slot->
dirty =
false;
1890 bool restored =
false;
1898 sprintf(path,
"%s/state.tmp", slotdir);
1899 if (unlink(path) < 0 && errno != ENOENT)
1902 errmsg(
"could not remove file \"%s\": %m", path)));
1904 sprintf(path,
"%s/state", slotdir);
1906 elog(
DEBUG1,
"restoring replication slot from \"%s\"", path);
1918 errmsg(
"could not open file \"%s\": %m", path)));
1928 errmsg(
"could not fsync file \"%s\": %m",
1946 errmsg(
"could not read file \"%s\": %m", path)));
1950 errmsg(
"could not read file \"%s\": read %d of %zu",
1959 errmsg(
"replication slot file \"%s\" has wrong magic number: %u instead of %u",
1966 errmsg(
"replication slot file \"%s\" has unsupported version %u",
1973 errmsg(
"replication slot file \"%s\" has corrupted length %u",
1982 if (readBytes != cp.
length)
1987 errmsg(
"could not read file \"%s\": %m", path)));
1991 errmsg(
"could not read file \"%s\": read %d of %zu",
1998 errmsg(
"could not close file \"%s\": %m", path)));
2009 (
errmsg(
"checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2018 if (!
rmtree(slotdir,
true))
2021 (
errmsg(
"could not remove directory \"%s\"",
2042 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2043 errmsg(
"logical replication slot \"%s\" exists, but wal_level < logical",
2045 errhint(
"Change wal_level to be logical or higher.")));
2048 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2049 errmsg(
"physical replication slot \"%s\" exists, but wal_level < replica",
2051 errhint(
"Change wal_level to be replica or higher.")));
2085 (
errmsg(
"too many replication slots active before shutdown"),
2086 errhint(
"Increase max_replication_slots and try again.")));
#define MemSet(start, val, len)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
void ConditionVariableCancelSleep(void)
elog(ERROR, "%s: %s", p2, msg)
int errdetail_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int MakePGDirectory(const char *directoryName)
int CloseTransientFile(int fd)
void fsync_fname(const char *fname, bool isdir)
int OpenTransientFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockHeldByMe(LWLock *lock)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockInitialize(LWLock *lock, int tranche_id)
@ LWTRANCHE_REPLICATION_SLOT_IO
void pfree(void *pointer)
#define START_CRIT_SECTION()
#define END_CRIT_SECTION()
BackendType MyBackendType
bool has_rolreplication(Oid roleid)
void namestrcpy(Name name, const char *str)
#define ERRCODE_DATA_CORRUPTED
#define COMP_CRC32C(crc, data, len)
#define EQ_CRC32C(c1, c2)
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
static int fd(const char *x, int i)
#define PROC_IN_LOGICAL_DECODING
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
@ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT
bool rmtree(const char *path, bool rmtopdir)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Size mul_size(Size s1, Size s2)
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
int ReplicationSlotIndex(ReplicationSlot *slot)
#define ReplicationSlotOnDiskChecksummedSize
void ReplicationSlotCleanup(void)
void ReplicationSlotMarkDirty(void)
void ReplicationSlotReserveWal(void)
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
void ReplicationSlotAcquire(const char *name, bool nowait)
bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
void ReplicationSlotsDropDBSlots(Oid dboid)
#define ReplicationSlotOnDiskNotChecksummedSize
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
static void RestoreSlotFromDisk(const char *name)
void ReplicationSlotPersist(void)
ReplicationSlot * MyReplicationSlot
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
void ReplicationSlotDrop(const char *name, bool nowait)
void ReplicationSlotSave(void)
static void CreateSlotOnDisk(ReplicationSlot *slot)
#define ReplicationSlotOnDiskV2Size
void CheckSlotPermissions(void)
bool ReplicationSlotName(int index, Name name)
void ReplicationSlotsShmemInit(void)
void ReplicationSlotRelease(void)
int max_replication_slots
ReplicationSlotCtlData * ReplicationSlotCtl
struct ReplicationSlotOnDisk ReplicationSlotOnDisk
void CheckPointReplicationSlots(void)
void ReplicationSlotsComputeRequiredLSN(void)
void ReplicationSlotInitialize(void)
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated)
void StartupReplicationSlots(void)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
void CheckSlotRequirements(void)
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon)
static void ReplicationSlotDropAcquired(void)
#define ReplicationSlotOnDiskConstantSize
Size ReplicationSlotsShmemSize(void)
bool ReplicationSlotValidateName(const char *name, int elevel)
static void ReplicationSlotShmemExit(int code, Datum arg)
ReplicationSlotPersistency
#define SlotIsPhysical(slot)
ReplicationSlotInvalidationCause
#define SlotIsLogical(slot)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
XLogRecPtr LogStandbySnapshot(void)
#define ERRCODE_DUPLICATE_OBJECT
bool pg_str_endswith(const char *str, const char *end)
void appendStringInfo(StringInfo str, const char *fmt,...)
void initStringInfo(StringInfo str)
ReplicationSlot replication_slots[1]
ReplicationSlotPersistentData slotdata
TransactionId catalog_xmin
ReplicationSlotPersistency persistency
ReplicationSlotInvalidationCause invalidated
XLogRecPtr candidate_xmin_lsn
TransactionId effective_catalog_xmin
XLogRecPtr candidate_restart_valid
TransactionId effective_xmin
XLogRecPtr candidate_restart_lsn
LWLock io_in_progress_lock
ConditionVariable active_cv
TransactionId candidate_catalog_xmin
ReplicationSlotPersistentData data
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
@ WAIT_EVENT_REPLICATION_SLOT_READ
@ WAIT_EVENT_REPLICATION_SLOT_WRITE
@ WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC
@ WAIT_EVENT_REPLICATION_SLOT_SYNC
@ WAIT_EVENT_REPLICATION_SLOT_DROP
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
bool RecoveryInProgress(void)
XLogSegNo XLogGetLastRemovedSegno(void)
XLogRecPtr GetRedoRecPtr(void)
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
XLogRecPtr GetXLogInsertRecPtr(void)
void XLogFlush(XLogRecPtr record)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)