PostgreSQL Source Code git master
Loading...
Searching...
No Matches
slot.h File Reference
Include dependency graph for slot.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReplicationSlotPersistentData
 
struct  ReplicationSlot
 
struct  ReplicationSlotCtlData
 

Macros

#define PG_REPLSLOT_DIR   "pg_replslot"
 
#define CONFLICT_DETECTION_SLOT   "pg_conflict_detection"
 
#define RS_INVAL_MAX_CAUSES   4
 
#define SlotIsPhysical(slot)   ((slot)->data.database == InvalidOid)
 
#define SlotIsLogical(slot)   ((slot)->data.database != InvalidOid)
 

Typedefs

typedef enum ReplicationSlotPersistency ReplicationSlotPersistency
 
typedef enum ReplicationSlotInvalidationCause ReplicationSlotInvalidationCause
 
typedef enum SlotSyncSkipReason SlotSyncSkipReason
 
typedef struct ReplicationSlotPersistentData ReplicationSlotPersistentData
 
typedef struct ReplicationSlot ReplicationSlot
 
typedef struct ReplicationSlotCtlData ReplicationSlotCtlData
 

Enumerations

enum  ReplicationSlotPersistency { RS_PERSISTENT , RS_EPHEMERAL , RS_TEMPORARY }
 
enum  ReplicationSlotInvalidationCause {
  RS_INVAL_NONE = 0 , RS_INVAL_WAL_REMOVED = (1 << 0) , RS_INVAL_HORIZON = (1 << 1) , RS_INVAL_WAL_LEVEL = (1 << 2) ,
  RS_INVAL_IDLE_TIMEOUT = (1 << 3)
}
 
enum  SlotSyncSkipReason {
  SS_SKIP_NONE , SS_SKIP_WAL_NOT_FLUSHED , SS_SKIP_WAL_OR_ROWS_REMOVED , SS_SKIP_NO_CONSISTENT_SNAPSHOT ,
  SS_SKIP_INVALID
}
 

Functions

static void ReplicationSlotSetInactiveSince (ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
 
Size ReplicationSlotsShmemSize (void)
 
void ReplicationSlotsShmemInit (void)
 
void ReplicationSlotCreate (const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
 
void ReplicationSlotPersist (void)
 
void ReplicationSlotDrop (const char *name, bool nowait)
 
void ReplicationSlotDropAcquired (void)
 
void ReplicationSlotAlter (const char *name, const bool *failover, const bool *two_phase)
 
void ReplicationSlotAcquire (const char *name, bool nowait, bool error_if_invalid)
 
void ReplicationSlotRelease (void)
 
void ReplicationSlotCleanup (bool synced_only)
 
void ReplicationSlotSave (void)
 
void ReplicationSlotMarkDirty (void)
 
void ReplicationSlotInitialize (void)
 
bool ReplicationSlotValidateName (const char *name, bool allow_reserved_name, int elevel)
 
bool ReplicationSlotValidateNameInternal (const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
 
void ReplicationSlotReserveWal (void)
 
void ReplicationSlotsComputeRequiredXmin (bool already_locked)
 
void ReplicationSlotsComputeRequiredLSN (void)
 
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN (void)
 
bool ReplicationSlotsCountDBSlots (Oid dboid, int *nslots, int *nactive)
 
bool CheckLogicalSlotExists (void)
 
void ReplicationSlotsDropDBSlots (Oid dboid)
 
bool InvalidateObsoleteReplicationSlots (uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
 
ReplicationSlot * SearchNamedReplicationSlot (const char *name, bool need_lock)
 
int ReplicationSlotIndex (ReplicationSlot *slot)
 
bool ReplicationSlotName (int index, Name name)
 
void ReplicationSlotNameForTablesync (Oid suboid, Oid relid, char *syncslotname, Size szslot)
 
void ReplicationSlotDropAtPubNode (WalReceiverConn *wrconn, char *slotname, bool missing_ok)
 
void StartupReplicationSlots (void)
 
void CheckPointReplicationSlots (bool is_shutdown)
 
void CheckSlotRequirements (void)
 
void CheckSlotPermissions (void)
 
ReplicationSlotInvalidationCause GetSlotInvalidationCause (const char *cause_name)
 
const char * GetSlotInvalidationCauseName (ReplicationSlotInvalidationCause cause)
 
bool SlotExistsInSyncStandbySlots (const char *slot_name)
 
bool StandbySlotsHaveCaughtup (XLogRecPtr wait_for_lsn, int elevel)
 
void WaitForStandbyConfirmation (XLogRecPtr wait_for_lsn)
 

Variables

PGDLLIMPORT ReplicationSlotCtlData * ReplicationSlotCtl
 
PGDLLIMPORT ReplicationSlot * MyReplicationSlot
 
PGDLLIMPORT int max_replication_slots
 
PGDLLIMPORT char * synchronized_standby_slots
 
PGDLLIMPORT int idle_replication_slot_timeout_secs
 

Macro Definition Documentation

◆ CONFLICT_DETECTION_SLOT

#define CONFLICT_DETECTION_SLOT   "pg_conflict_detection"

Definition at line 28 of file slot.h.

◆ PG_REPLSLOT_DIR

#define PG_REPLSLOT_DIR   "pg_replslot"

Definition at line 21 of file slot.h.

◆ RS_INVAL_MAX_CAUSES

#define RS_INVAL_MAX_CAUSES   4

Definition at line 72 of file slot.h.

◆ SlotIsLogical

#define SlotIsLogical (   slot)    ((slot)->data.database != InvalidOid)

Definition at line 288 of file slot.h.

◆ SlotIsPhysical

#define SlotIsPhysical (   slot)    ((slot)->data.database == InvalidOid)

Definition at line 287 of file slot.h.

Typedef Documentation

◆ ReplicationSlot

◆ ReplicationSlotCtlData

◆ ReplicationSlotInvalidationCause

◆ ReplicationSlotPersistency

◆ ReplicationSlotPersistentData

◆ SlotSyncSkipReason

Enumeration Type Documentation

◆ ReplicationSlotInvalidationCause

Enumerator
RS_INVAL_NONE 
RS_INVAL_WAL_REMOVED 
RS_INVAL_HORIZON 
RS_INVAL_WAL_LEVEL 
RS_INVAL_IDLE_TIMEOUT 

Definition at line 58 of file slot.h.

59{
60 RS_INVAL_NONE = 0,
61 /* required WAL has been removed */
62 RS_INVAL_WAL_REMOVED = (1 << 0),
63 /* required rows have been removed */
64 RS_INVAL_HORIZON = (1 << 1),
65 /* wal_level insufficient for slot */
66 RS_INVAL_WAL_LEVEL = (1 << 2),
67 /* idle slot timeout has occurred */
68 RS_INVAL_IDLE_TIMEOUT = (1 << 3),
ReplicationSlotInvalidationCause
Definition slot.h:59
@ RS_INVAL_WAL_REMOVED
Definition slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition slot.h:68
@ RS_INVAL_HORIZON
Definition slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition slot.h:66
@ RS_INVAL_NONE
Definition slot.h:60

◆ ReplicationSlotPersistency

Enumerator
RS_PERSISTENT 
RS_EPHEMERAL 
RS_TEMPORARY 

Definition at line 43 of file slot.h.

44{
ReplicationSlotPersistency
Definition slot.h:44
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47

◆ SlotSyncSkipReason

Enumerator
SS_SKIP_NONE 
SS_SKIP_WAL_NOT_FLUSHED 
SS_SKIP_WAL_OR_ROWS_REMOVED 
SS_SKIP_NO_CONSISTENT_SNAPSHOT 
SS_SKIP_INVALID 

Definition at line 80 of file slot.h.

81{
82 SS_SKIP_NONE, /* No skip */
83 SS_SKIP_WAL_NOT_FLUSHED, /* Standby did not flush the wal corresponding
84 * to confirmed flush of remote slot */
85 SS_SKIP_WAL_OR_ROWS_REMOVED, /* Remote slot is behind; required WAL or
86 * rows may be removed or at risk */
87 SS_SKIP_NO_CONSISTENT_SNAPSHOT, /* Standby could not build a consistent
88 * snapshot */
89 SS_SKIP_INVALID /* Local slot is invalid */
SlotSyncSkipReason
Definition slot.h:81
@ SS_SKIP_WAL_NOT_FLUSHED
Definition slot.h:83
@ SS_SKIP_NO_CONSISTENT_SNAPSHOT
Definition slot.h:87
@ SS_SKIP_NONE
Definition slot.h:82
@ SS_SKIP_INVALID
Definition slot.h:89
@ SS_SKIP_WAL_OR_ROWS_REMOVED
Definition slot.h:85

Function Documentation

◆ CheckLogicalSlotExists()

bool CheckLogicalSlotExists ( void  )
extern

Definition at line 1616 of file slot.c.

1617{
1618 bool found = false;
1619
1620 if (max_replication_slots <= 0)
1621 return false;
1622
1624 for (int i = 0; i < max_replication_slots; i++)
1625 {
1626 ReplicationSlot *s;
1627 bool invalidated;
1628
1630
1631 /* cannot change while ReplicationSlotCtlLock is held */
1632 if (!s->in_use)
1633 continue;
1634
1635 if (SlotIsPhysical(s))
1636 continue;
1637
1639 invalidated = s->data.invalidated != RS_INVAL_NONE;
1641
1642 if (invalidated)
1643 continue;
1644
1645 found = true;
1646 break;
1647 }
1649
1650 return found;
1651}
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
@ LW_SHARED
Definition lwlock.h:113
static int fb(int x)
int max_replication_slots
Definition slot.c:152
ReplicationSlotCtlData * ReplicationSlotCtl
Definition slot.c:146
#define SlotIsPhysical(slot)
Definition slot.h:287
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
ReplicationSlot replication_slots[1]
Definition slot.h:299
ReplicationSlotInvalidationCause invalidated
Definition slot.h:128
slock_t mutex
Definition slot.h:183
bool in_use
Definition slot.h:186
ReplicationSlotPersistentData data
Definition slot.h:213

References ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SlotIsPhysical, SpinLockAcquire(), and SpinLockRelease().

Referenced by DisableLogicalDecoding(), and UpdateLogicalDecodingStatusEndOfRecovery().

◆ CheckPointReplicationSlots()

void CheckPointReplicationSlots ( bool  is_shutdown)
extern

Definition at line 2310 of file slot.c.

2311{
2312 int i;
2313 bool last_saved_restart_lsn_updated = false;
2314
2315 elog(DEBUG1, "performing replication slot checkpoint");
2316
2317 /*
2318 * Prevent any slot from being created/dropped while we're active. As we
2319 * explicitly do *not* want to block iterating over replication_slots or
2320 * acquiring a slot we cannot take the control lock - but that's OK,
2321 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2322 * enough to guarantee that nobody can change the in_use bits on us.
2323 *
2324 * Additionally, acquiring the Allocation lock is necessary to serialize
2325 * the slot flush process with concurrent slot WAL reservation. This
2326 * ensures that the WAL position being reserved is either flushed to disk
2327 * or is beyond or equal to the redo pointer of the current checkpoint
2328 * (See ReplicationSlotReserveWal for details).
2329 */
2331
2332 for (i = 0; i < max_replication_slots; i++)
2333 {
2335 char path[MAXPGPATH];
2336
2337 if (!s->in_use)
2338 continue;
2339
2340 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2341 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2342
2343 /*
2344 * Slot's data is not flushed each time the confirmed_flush LSN is
2345 * updated as that could lead to frequent writes. However, we decide
2346 * to force a flush of all logical slot's data at the time of shutdown
2347 * if the confirmed_flush LSN is changed since we last flushed it to
2348 * disk. This helps in avoiding an unnecessary retreat of the
2349 * confirmed_flush LSN after restart.
2350 */
2351 if (is_shutdown && SlotIsLogical(s))
2352 {
2354
2355 if (s->data.invalidated == RS_INVAL_NONE &&
2357 {
2358 s->just_dirtied = true;
2359 s->dirty = true;
2360 }
2362 }
2363
2364 /*
2365 * Track if we're going to update slot's last_saved_restart_lsn. We
2366 * need this to know if we need to recompute the required LSN.
2367 */
2370
2371 SaveSlotToPath(s, path, LOG);
2372 }
2374
2375 /*
2376 * Recompute the required LSN if SaveSlotToPath() updated
2377 * last_saved_restart_lsn for any slot.
2378 */
2381}
#define NameStr(name)
Definition c.h:837
#define LOG
Definition elog.h:31
#define DEBUG1
Definition elog.h:30
#define elog(elevel,...)
Definition elog.h:226
#define MAXPGPATH
#define sprintf
Definition port.h:262
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition slot.c:2510
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1301
#define PG_REPLSLOT_DIR
Definition slot.h:21
#define SlotIsLogical(slot)
Definition slot.h:288
XLogRecPtr last_saved_confirmed_flush
Definition slot.h:238
bool just_dirtied
Definition slot.h:195
XLogRecPtr last_saved_restart_lsn
Definition slot.h:271

References ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, DEBUG1, ReplicationSlot::dirty, elog, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LOG, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MAXPGPATH, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SaveSlotToPath(), SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), and sprintf.

Referenced by CheckPointGuts().

◆ CheckSlotPermissions()

void CheckSlotPermissions ( void  )
extern

Definition at line 1680 of file slot.c.

1681{
1683 ereport(ERROR,
1685 errmsg("permission denied to use replication slots"),
1686 errdetail("Only roles with the %s attribute may use replication slots.",
1687 "REPLICATION")));
1688}
int errcode(int sqlerrcode)
Definition elog.c:874
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
Oid GetUserId(void)
Definition miscinit.c:470
bool has_rolreplication(Oid roleid)
Definition miscinit.c:689
static char * errmsg

References ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetUserId(), and has_rolreplication().

Referenced by copy_replication_slot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_drop_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), and pg_sync_replication_slots().

◆ CheckSlotRequirements()

void CheckSlotRequirements ( void  )
extern

Definition at line 1658 of file slot.c.

1659{
1660 /*
1661 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1662 * needs the same check.
1663 */
1664
1665 if (max_replication_slots == 0)
1666 ereport(ERROR,
1668 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1669
1671 ereport(ERROR,
1673 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1674}
int wal_level
Definition xlog.c:135
@ WAL_LEVEL_REPLICA
Definition xlog.h:76

References ereport, errcode(), errmsg, ERROR, fb(), max_replication_slots, wal_level, and WAL_LEVEL_REPLICA.

Referenced by CheckLogicalDecodingRequirements(), copy_replication_slot(), pg_create_physical_replication_slot(), and pg_drop_replication_slot().

◆ GetSlotInvalidationCause()

ReplicationSlotInvalidationCause GetSlotInvalidationCause ( const char * cause_name)
extern

Definition at line 2916 of file slot.c.

2917{
2918 Assert(cause_name);
2919
2920 /* Search lookup table for the cause having this name */
2921 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2922 {
2923 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2925 }
2926
2927 Assert(false);
2928 return RS_INVAL_NONE; /* to keep compiler quiet */
2929}
#define Assert(condition)
Definition c.h:945
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition slot.c:114
#define RS_INVAL_MAX_CAUSES
Definition slot.h:72
ReplicationSlotInvalidationCause cause
Definition slot.c:110

References Assert, SlotInvalidationCauseMap::cause, fb(), i, RS_INVAL_MAX_CAUSES, RS_INVAL_NONE, and SlotInvalidationCauses.

Referenced by fetch_remote_slots().

◆ GetSlotInvalidationCauseName()

const char * GetSlotInvalidationCauseName ( ReplicationSlotInvalidationCause  cause)
extern

Definition at line 2936 of file slot.c.

2937{
2938 /* Search lookup table for the name of this cause */
2939 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2940 {
2941 if (SlotInvalidationCauses[i].cause == cause)
2943 }
2944
2945 Assert(false);
2946 return "none"; /* to keep compiler quiet */
2947}
const char * cause_name
Definition slot.c:111

References Assert, SlotInvalidationCauseMap::cause_name, i, RS_INVAL_MAX_CAUSES, and SlotInvalidationCauses.

Referenced by pg_get_replication_slots(), and ReplicationSlotAcquire().

◆ InvalidateObsoleteReplicationSlots()

bool InvalidateObsoleteReplicationSlots ( uint32  possible_causes,
XLogSegNo  oldestSegno,
Oid  dboid,
TransactionId  snapshotConflictHorizon 
)
extern

Definition at line 2206 of file slot.c.

2209{
2211 bool invalidated = false;
2212 bool invalidated_logical = false;
2214
2215 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2218
2219 if (max_replication_slots == 0)
2220 return invalidated;
2221
2223
2224restart:
2227 for (int i = 0; i < max_replication_slots; i++)
2228 {
2230 bool released_lock = false;
2231
2232 if (!s->in_use)
2233 continue;
2234
2235 /* Prevent invalidation of logical slots during binary upgrade */
2237 {
2241
2242 continue;
2243 }
2244
2246 dboid, snapshotConflictHorizon,
2247 &released_lock))
2248 {
2250
2251 /* Remember we have invalidated a physical or logical slot */
2252 invalidated = true;
2253
2254 /*
2255 * Additionally, remember we have invalidated a logical slot as we
2256 * can request disabling logical decoding later.
2257 */
2258 if (SlotIsLogical(s))
2259 invalidated_logical = true;
2260 }
2261 else
2262 {
2263 /*
2264 * We need to check if the slot is invalidated here since
2265 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2266 * is already invalidated.
2267 */
2272 }
2273
2274 /* if the lock was released, start from scratch */
2275 if (released_lock)
2276 goto restart;
2277 }
2279
2280 /*
2281 * If any slots have been invalidated, recalculate the resource limits.
2282 */
2283 if (invalidated)
2284 {
2287 }
2288
2289 /*
2290 * Request the checkpointer to disable logical decoding if no valid
2291 * logical slots remain. If called by the checkpointer during a
2292 * checkpoint, only the request is initiated; actual deactivation is
2293 * deferred until after the checkpoint completes.
2294 */
2297
2298 return invalidated;
2299}
bool IsBinaryUpgrade
Definition globals.c:121
void RequestDisableLogicalDecoding(void)
Definition logicalctl.c:434
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
Definition slot.c:1966
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1219
#define TransactionIdIsValid(xid)
Definition transam.h:41
int wal_segment_size
Definition xlog.c:147
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition xlogdefs.h:21

References Assert, ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidatePossiblyObsoleteSlot(), IsBinaryUpgrade, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RequestDisableLogicalDecoding(), RS_INVAL_HORIZON, RS_INVAL_NONE, RS_INVAL_WAL_REMOVED, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by CreateCheckPoint(), CreateRestartPoint(), ResolveRecoveryConflictWithSnapshot(), and xlog_redo().

◆ ReplicationSlotAcquire()

void ReplicationSlotAcquire ( const char * name,
bool  nowait,
bool  error_if_invalid 
)
extern

Definition at line 622 of file slot.c.

623{
625 ProcNumber active_proc;
626 int active_pid;
627
628 Assert(name != NULL);
629
630retry:
632
634
635 /* Check if the slot exists with the given name. */
637 if (s == NULL || !s->in_use)
638 {
640
643 errmsg("replication slot \"%s\" does not exist",
644 name)));
645 }
646
647 /*
648 * Do not allow users to acquire the reserved slot. This scenario may
649 * occur if the launcher that owns the slot has terminated unexpectedly
650 * due to an error, and a backend process attempts to reuse the slot.
651 */
655 errmsg("cannot acquire replication slot \"%s\"", name),
656 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
657
658 /*
659 * This is the slot we want; check if it's active under some other
660 * process. In single user mode, we don't need this check.
661 */
663 {
664 /*
665 * Get ready to sleep on the slot in case it is active. (We may end
666 * up not sleeping, but we don't want to do this while holding the
667 * spinlock.)
668 */
669 if (!nowait)
671
672 /*
673 * It is important to reset the inactive_since under spinlock here to
674 * avoid race conditions with slot invalidation. See comments related
675 * to inactive_since in InvalidatePossiblyObsoleteSlot.
676 */
680 active_proc = s->active_proc;
683 }
684 else
685 {
686 s->active_proc = active_proc = MyProcNumber;
688 }
689 active_pid = GetPGProcByNumber(active_proc)->pid;
691
692 /*
693 * If we found the slot but it's already active in another process, we
694 * wait until the owning process signals us that it's been released, or
695 * error out.
696 */
697 if (active_proc != MyProcNumber)
698 {
699 if (!nowait)
700 {
701 /* Wait here until we get signaled, and then restart */
705 goto retry;
706 }
707
710 errmsg("replication slot \"%s\" is active for PID %d",
711 NameStr(s->data.name), active_pid)));
712 }
713 else if (!nowait)
714 ConditionVariableCancelSleep(); /* no sleep needed after all */
715
716 /* We made this slot active, so it's ours now. */
718
719 /*
720 * We need to check for invalidation after making the slot ours to avoid
721 * the possible race condition with the checkpointer that can otherwise
722 * invalidate the slot immediately after the check.
723 */
727 errmsg("can no longer access replication slot \"%s\"",
728 NameStr(s->data.name)),
729 errdetail("This replication slot has been invalidated due to \"%s\".",
731
732 /* Let everybody know we've modified this slot */
734
735 /*
736 * The call to pgstat_acquire_replslot() protects against stats for a
737 * different slot, from before a restart or such, being present during
738 * pgstat_report_replslot().
739 */
740 if (SlotIsLogical(s))
742
743
744 if (am_walsender)
745 {
748 ? errmsg("acquired logical replication slot \"%s\"",
749 NameStr(s->data.name))
750 : errmsg("acquired physical replication slot \"%s\"",
751 NameStr(s->data.name)));
752 }
753}
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
ProcNumber MyProcNumber
Definition globals.c:90
bool IsUnderPostmaster
Definition globals.c:120
bool IsLogicalLauncher(void)
Definition launcher.c:1587
const void * data
void pgstat_acquire_replslot(ReplicationSlot *slot)
#define GetPGProcByNumber(n)
Definition proc.h:501
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
ReplicationSlot * MyReplicationSlot
Definition slot.c:149
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:542
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition slot.c:2936
static bool IsSlotForConflictCheck(const char *name)
Definition slot.c:363
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition slot.h:306
ConditionVariable active_cv
Definition slot.h:219
ProcNumber active_proc
Definition slot.h:192
const char * name
bool am_walsender
Definition walsender.c:124
bool log_replication_commands
Definition walsender.c:134

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, am_walsender, Assert, ConditionVariableBroadcast(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), ReplicationSlot::data, DEBUG1, ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetPGProcByNumber, GetSlotInvalidationCauseName(), ReplicationSlot::in_use, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, IsLogicalLauncher(), IsSlotForConflictCheck(), IsUnderPostmaster, LOG, log_replication_commands, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProcNumber, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, pgstat_acquire_replslot(), ReplicationSlotSetInactiveSince(), RS_INVAL_NONE, SearchNamedReplicationSlot(), SlotIsLogical, SpinLockAcquire(), and SpinLockRelease().

Referenced by acquire_conflict_slot_if_exists(), binary_upgrade_check_logical_slot_pending_wal(), drop_local_obsolete_slots(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), ReplicationSlotAlter(), ReplicationSlotDrop(), StartLogicalReplication(), StartReplication(), and synchronize_one_slot().

◆ ReplicationSlotAlter()

void ReplicationSlotAlter ( const char * name,
const bool * failover,
const bool * two_phase 
)
extern

Definition at line 953 of file slot.c.

955{
956 bool update_slot = false;
957
960
961 ReplicationSlotAcquire(name, false, true);
962
966 errmsg("cannot use %s with a physical replication slot",
967 "ALTER_REPLICATION_SLOT"));
968
969 if (RecoveryInProgress())
970 {
971 /*
972 * Do not allow users to alter the slots which are currently being
973 * synced from the primary to the standby.
974 */
978 errmsg("cannot alter replication slot \"%s\"", name),
979 errdetail("This replication slot is being synchronized from the primary server."));
980
981 /*
982 * Do not allow users to enable failover on the standby as we do not
983 * support sync to the cascading standby.
984 */
985 if (failover && *failover)
988 errmsg("cannot enable failover for a replication slot"
989 " on the standby"));
990 }
991
992 if (failover)
993 {
994 /*
995 * Do not allow users to enable failover for temporary slots as we do
996 * not support syncing temporary slots to the standby.
997 */
1001 errmsg("cannot enable failover for a temporary replication slot"));
1002
1004 {
1008
1009 update_slot = true;
1010 }
1011 }
1012
1014 {
1018
1019 update_slot = true;
1020 }
1021
1022 if (update_slot)
1023 {
1026 }
1027
1029}
static bool two_phase
static bool failover
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:622
void ReplicationSlotMarkDirty(void)
Definition slot.c:1177
void ReplicationSlotSave(void)
Definition slot.c:1159
void ReplicationSlotRelease(void)
Definition slot.c:762
ReplicationSlotPersistency persistency
Definition slot.h:106
bool RecoveryInProgress(void)
Definition xlog.c:6444

References Assert, ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg, ERROR, failover, ReplicationSlotPersistentData::failover, fb(), ReplicationSlot::mutex, MyReplicationSlot, name, ReplicationSlotPersistentData::persistency, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), RS_TEMPORARY, SlotIsPhysical, SpinLockAcquire(), SpinLockRelease(), ReplicationSlotPersistentData::synced, two_phase, and ReplicationSlotPersistentData::two_phase.

Referenced by AlterReplicationSlot().

◆ ReplicationSlotCleanup()

void ReplicationSlotCleanup ( bool  synced_only)
extern

Definition at line 861 of file slot.c.

862{
863 int i;
865 bool dropped_logical = false;
866
868
869restart:
872 for (i = 0; i < max_replication_slots; i++)
873 {
875
876 if (!s->in_use)
877 continue;
878
880
883
884 if ((s->active_proc == MyProcNumber &&
885 (!synced_only || s->data.synced)))
886 {
889 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
890
891 if (SlotIsLogical(s))
892 dropped_logical = true;
893
895
897 goto restart;
898 }
899 else
901 }
902
904
907}
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition slot.c:1052

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, Assert, ConditionVariableBroadcast(), ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyProcNumber, MyReplicationSlot, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropPtr(), RequestDisableLogicalDecoding(), RS_INVAL_NONE, RS_TEMPORARY, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), and ReplicationSlotPersistentData::synced.

Referenced by PostgresMain(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), SyncReplicationSlots(), and WalSndErrorCleanup().

◆ ReplicationSlotCreate()

void ReplicationSlotCreate ( const char * name,
bool  db_specific,
ReplicationSlotPersistency  persistency,
bool  two_phase,
bool  failover,
bool  synced 
)
extern

Definition at line 380 of file slot.c.

383{
384 ReplicationSlot *slot = NULL;
385 int i;
386
388
389 /*
390 * The logical launcher or pg_upgrade may create or migrate an internal
391 * slot, so using a reserved name is allowed in these cases.
392 */
394 ERROR);
395
396 if (failover)
397 {
398 /*
399 * Do not allow users to create the failover enabled slots on the
400 * standby as we do not support sync to the cascading standby.
401 *
402 * However, failover enabled slots can be created during slot
403 * synchronization because we need to retain the same values as the
404 * remote slot.
405 */
409 errmsg("cannot enable failover for a replication slot created on the standby"));
410
411 /*
412 * Do not allow users to create failover enabled temporary slots,
413 * because temporary slots will not be synced to the standby.
414 *
415 * However, failover enabled temporary slots can be created during
416 * slot synchronization. See the comments atop slotsync.c for details.
417 */
418 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
421 errmsg("cannot enable failover for a temporary replication slot"));
422 }
423
424 /*
425 * If some other backend ran this code concurrently with us, we'd likely
426 * both allocate the same slot, and that would be bad. We'd also be at
427 * risk of missing a name collision. Also, we don't want to try to create
428 * a new slot while somebody's busy cleaning up an old one, because we
429 * might both be monkeying with the same directory.
430 */
432
433 /*
434 * Check for name collision, and identify an allocatable slot. We need to
435 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
436 * else can change the in_use flags while we're looking at them.
437 */
439 for (i = 0; i < max_replication_slots; i++)
440 {
442
443 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
446 errmsg("replication slot \"%s\" already exists", name)));
447 if (!s->in_use && slot == NULL)
448 slot = s;
449 }
451
452 /* If all slots are in use, we're out of luck. */
453 if (slot == NULL)
456 errmsg("all replication slots are in use"),
457 errhint("Free one or increase \"max_replication_slots\".")));
458
459 /*
460 * Since this slot is not in use, nobody should be looking at any part of
461 * it other than the in_use field unless they're trying to allocate it.
462 * And since we hold ReplicationSlotAllocationLock, nobody except us can
463 * be doing that. So it's safe to initialize the slot.
464 */
465 Assert(!slot->in_use);
467
468 /* first initialize persistent data */
469 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
470 namestrcpy(&slot->data.name, name);
472 slot->data.persistency = persistency;
473 slot->data.two_phase = two_phase;
475 slot->data.failover = failover;
476 slot->data.synced = synced;
477
478 /* and then data only present in shared memory */
479 slot->just_dirtied = false;
480 slot->dirty = false;
489 slot->inactive_since = 0;
491
492 /*
493 * Create the slot on disk. We haven't actually marked the slot allocated
494 * yet, so no special cleanup is required if this errors out.
495 */
496 CreateSlotOnDisk(slot);
497
498 /*
499 * We need to briefly prevent any other backend from iterating over the
500 * slots while we flip the in_use flag. We also need to set the active
501 * flag while holding the ControlLock as otherwise a concurrent
502 * ReplicationSlotAcquire() could acquire the slot as well.
503 */
505
506 slot->in_use = true;
507
508 /* We can now mark the slot active, and that makes it our slot. */
509 SpinLockAcquire(&slot->mutex);
512 SpinLockRelease(&slot->mutex);
513 MyReplicationSlot = slot;
514
516
517 /*
518 * Create statistics entry for the new logical slot. We don't collect any
519 * stats for physical slots, so no need to create an entry for the same.
520 * See ReplicationSlotDropPtr for why we need to do this before releasing
521 * ReplicationSlotAllocationLock.
522 */
523 if (SlotIsLogical(slot))
525
526 /*
527 * Now that the slot has been marked as in_use and active, it's safe to
528 * let somebody else try to allocate a slot.
529 */
531
532 /* Let everybody know we've modified this slot */
534}
int errhint(const char *fmt,...) pg_attribute_printf(1
Oid MyDatabaseId
Definition globals.c:94
@ LW_EXCLUSIVE
Definition lwlock.h:112
void namestrcpy(Name name, const char *str)
Definition name.c:233
void pgstat_create_replslot(ReplicationSlot *slot)
#define InvalidOid
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition slot.c:2449
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:268
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1825
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
XLogRecPtr candidate_xmin_lsn
Definition slot.h:229
TransactionId effective_catalog_xmin
Definition slot.h:210
XLogRecPtr candidate_restart_valid
Definition slot.h:230
SlotSyncSkipReason slotsync_skip_reason
Definition slot.h:284
TransactionId effective_xmin
Definition slot.h:209
XLogRecPtr candidate_restart_lsn
Definition slot.h:231
TransactionId candidate_catalog_xmin
Definition slot.h:228
TimestampTz inactive_since
Definition slot.h:245
#define InvalidTransactionId
Definition transam.h:31
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, Assert, ReplicationSlot::candidate_catalog_xmin, ReplicationSlot::candidate_restart_lsn, ReplicationSlot::candidate_restart_valid, ReplicationSlot::candidate_xmin_lsn, ConditionVariableBroadcast(), CreateSlotOnDisk(), ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::dirty, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg, ERROR, failover, ReplicationSlotPersistentData::failover, fb(), i, ReplicationSlot::in_use, ReplicationSlot::inactive_since, INVALID_PROC_NUMBER, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsBinaryUpgrade, IsLogicalLauncher(), IsSyncingReplicationSlots(), ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyDatabaseId, MyProcNumber, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotPersistentData::persistency, pgstat_create_replslot(), RecoveryInProgress(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotValidateName(), RS_TEMPORARY, SlotIsLogical, ReplicationSlot::slotsync_skip_reason, SpinLockAcquire(), SpinLockRelease(), SS_SKIP_NONE, ReplicationSlotPersistentData::synced, two_phase, ReplicationSlotPersistentData::two_phase, and ReplicationSlotPersistentData::two_phase_at.

Referenced by create_logical_replication_slot(), create_physical_replication_slot(), CreateConflictDetectionSlot(), CreateReplicationSlot(), and synchronize_one_slot().

◆ ReplicationSlotDrop()

void ReplicationSlotDrop ( const char * name,
bool  nowait 
)
extern

Definition at line 913 of file slot.c.

914{
915 bool is_logical;
916
918
919 ReplicationSlotAcquire(name, nowait, false);
920
921 /*
922 * Do not allow users to drop the slots which are currently being synced
923 * from the primary to the standby.
924 */
928 errmsg("cannot drop replication slot \"%s\"", name),
929 errdetail("This replication slot is being synchronized from the primary server."));
930
932
934
935 if (is_logical)
937}
void ReplicationSlotDropAcquired(void)
Definition slot.c:1035

References Assert, ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg, ERROR, fb(), MyReplicationSlot, name, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotDropAcquired(), RequestDisableLogicalDecoding(), SlotIsLogical, and ReplicationSlotPersistentData::synced.

Referenced by DropReplicationSlot(), and pg_drop_replication_slot().

◆ ReplicationSlotDropAcquired()

void ReplicationSlotDropAcquired ( void  )
extern

Definition at line 1035 of file slot.c.

1036{
1038
1040
1041 /* slot isn't acquired anymore */
1043
1045}

References Assert, fb(), MyReplicationSlot, and ReplicationSlotDropPtr().

Referenced by ApplyLauncherMain(), drop_local_obsolete_slots(), ReplicationSlotDrop(), ReplicationSlotRelease(), and ReplicationSlotsDropDBSlots().

◆ ReplicationSlotDropAtPubNode()

void ReplicationSlotDropAtPubNode ( WalReceiverConn * wrconn,
char * slotname,
bool  missing_ok 
)
extern

Definition at line 2494 of file subscriptioncmds.c.

2495{
2496 StringInfoData cmd;
2497
2498 Assert(wrconn);
2499
2500 load_file("libpqwalreceiver", false);
2501
2502 initStringInfo(&cmd);
2503 appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2504
2505 PG_TRY();
2506 {
2507 WalRcvExecResult *res;
2508
2509 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2510
2511 if (res->status == WALRCV_OK_COMMAND)
2512 {
2513 /* NOTICE. Success. */
2515 (errmsg("dropped replication slot \"%s\" on publisher",
2516 slotname)));
2517 }
2518 else if (res->status == WALRCV_ERROR &&
2519 missing_ok &&
2521 {
2522 /* LOG. Error, but missing_ok = true. */
2523 ereport(LOG,
2524 (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2525 slotname, res->err)));
2526 }
2527 else
2528 {
2529 /* ERROR. */
2530 ereport(ERROR,
2532 errmsg("could not drop replication slot \"%s\" on publisher: %s",
2533 slotname, res->err)));
2534 }
2535
2537 }
2538 PG_FINALLY();
2539 {
2540 pfree(cmd.data);
2541 }
2542 PG_END_TRY();
2543}
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
#define PG_TRY(...)
Definition elog.h:372
#define PG_END_TRY(...)
Definition elog.h:397
#define NOTICE
Definition elog.h:35
#define PG_FINALLY(...)
Definition elog.h:389
void pfree(void *pointer)
Definition mcxt.c:1616
const char * quote_identifier(const char *ident)
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
WalRcvExecStatus status
static WalReceiverConn * wrconn
Definition walreceiver.c:95
@ WALRCV_OK_COMMAND
@ WALRCV_ERROR
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)

References appendStringInfo(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errmsg, ERROR, fb(), initStringInfo(), load_file(), LOG, NOTICE, pfree(), PG_END_TRY, PG_FINALLY, PG_TRY, quote_identifier(), WalRcvExecResult::sqlstate, WalRcvExecResult::status, walrcv_clear_result(), WALRCV_ERROR, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), and ProcessSyncingTablesForSync().

◆ ReplicationSlotIndex()

◆ ReplicationSlotInitialize()

void ReplicationSlotInitialize ( void  )
extern

Definition at line 243 of file slot.c.

244{
246}
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition slot.c:252

References before_shmem_exit(), and ReplicationSlotShmemExit().

Referenced by BaseInit().

◆ ReplicationSlotMarkDirty()

◆ ReplicationSlotName()

bool ReplicationSlotName ( int  index,
Name  name 
)
extern

Definition at line 591 of file slot.c.

592{
593 ReplicationSlot *slot;
594 bool found;
595
597
598 /*
599 * Ensure that the slot cannot be dropped while we copy the name. Don't
600 * need the spinlock as the name of an existing slot cannot change.
601 */
603 found = slot->in_use;
604 if (slot->in_use)
607
608 return found;
609}
Definition type.h:96

References ReplicationSlot::data, fb(), ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by pgstat_replslot_to_serialized_name_cb().

◆ ReplicationSlotNameForTablesync()

void ReplicationSlotNameForTablesync ( Oid  suboid,
Oid  relid,
char * syncslotname,
Size  szslot 
)
extern

Definition at line 1203 of file tablesync.c.

1205{
1206 snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1207 relid, GetSystemIdentifier());
1208}
#define UINT64_FORMAT
Definition c.h:637
#define snprintf
Definition port.h:260
uint64 GetSystemIdentifier(void)
Definition xlog.c:4611

References fb(), GetSystemIdentifier(), snprintf, and UINT64_FORMAT.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), ProcessSyncingTablesForSync(), and ReportSlotConnectionError().

◆ ReplicationSlotPersist()

◆ ReplicationSlotRelease()

void ReplicationSlotRelease ( void  )
extern

Definition at line 762 of file slot.c.

763{
765 char *slotname = NULL; /* keep compiler quiet */
766 bool is_logical;
767 TimestampTz now = 0;
768
769 Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
770
772
773 if (am_walsender)
774 slotname = pstrdup(NameStr(slot->data.name));
775
776 if (slot->data.persistency == RS_EPHEMERAL)
777 {
778 /*
779 * Delete the slot. There is no !PANIC case where this is allowed to
780 * fail, all that may happen is an incomplete cleanup of the on-disk
781 * data.
782 */
784
785 /*
786 * Request to disable logical decoding, even though this slot may not
787 * have been the last logical slot. The checkpointer will verify if
788 * logical decoding should actually be disabled.
789 */
790 if (is_logical)
792 }
793
794 /*
795 * If slot needed to temporarily restrain both data and catalog xmin to
796 * create the catalog snapshot, remove that temporary constraint.
797 * Snapshots can only be exported while the initial snapshot is still
798 * acquired.
799 */
800 if (!TransactionIdIsValid(slot->data.xmin) &&
802 {
803 SpinLockAcquire(&slot->mutex);
805 SpinLockRelease(&slot->mutex);
807 }
808
809 /*
810 * Set the time since the slot has become inactive. We get the current
811 * time beforehand to avoid system call while holding the spinlock.
812 */
814
815 if (slot->data.persistency == RS_PERSISTENT)
816 {
817 /*
818 * Mark persistent slot inactive. We're not freeing it, just
819 * disconnecting, but wake up others that may be waiting for it.
820 */
821 SpinLockAcquire(&slot->mutex);
824 SpinLockRelease(&slot->mutex);
826 }
827 else
829
831
832 /* might not have been set when we've been a plain slot */
837
838 if (am_walsender)
839 {
842 ? errmsg("released logical replication slot \"%s\"",
843 slotname)
844 : errmsg("released physical replication slot \"%s\"",
845 slotname));
846
847 pfree(slotname);
848 }
849}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1636
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1600
int64 TimestampTz
Definition timestamp.h:39
char * pstrdup(const char *in)
Definition mcxt.c:1781
PGPROC * MyProc
Definition proc.c:68
PROC_HDR * ProcGlobal
Definition proc.c:71
uint8 statusFlags
Definition proc.h:207
int pgxactoff
Definition proc.h:204
uint8 * statusFlags
Definition proc.h:453

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, am_walsender, Assert, ConditionVariableBroadcast(), ReplicationSlot::data, DEBUG1, ReplicationSlot::effective_xmin, ereport, errmsg, fb(), GetCurrentTimestamp(), INVALID_PROC_NUMBER, InvalidTransactionId, LOG, log_replication_commands, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, now(), ReplicationSlotPersistentData::persistency, pfree(), PGPROC::pgxactoff, ProcGlobal, pstrdup(), ReplicationSlotDropAcquired(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotSetInactiveSince(), RequestDisableLogicalDecoding(), RS_EPHEMERAL, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), PGPROC::statusFlags, PROC_HDR::statusFlags, TransactionIdIsValid, and ReplicationSlotPersistentData::xmin.

Referenced by binary_upgrade_check_logical_slot_pending_wal(), binary_upgrade_create_conflict_detection_slot(), copy_replication_slot(), CreateReplicationSlot(), InvalidatePossiblyObsoleteSlot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), PostgresMain(), ReplicationSlotAlter(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), synchronize_one_slot(), and WalSndErrorCleanup().

◆ ReplicationSlotReserveWal()

void ReplicationSlotReserveWal ( void  )
extern

Definition at line 1697 of file slot.c.

1698{
1700 XLogSegNo segno;
1701 XLogRecPtr restart_lsn;
1702
1703 Assert(slot != NULL);
1706
1707 /*
1708 * The replication slot mechanism is used to prevent the removal of
1709 * required WAL.
1710 *
1711 * Acquire an exclusive lock to prevent the checkpoint process from
1712 * concurrently computing the minimum slot LSN (see
1713 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1714 * replication cannot be removed during a checkpoint.
1715 *
1716 * The mechanism is reliable because if WAL reservation occurs first, the
1717 * checkpoint must wait for the restart_lsn update before determining the
1718 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1719 * first, subsequent WAL reservations will select positions at or beyond
1720 * the redo pointer of that checkpoint.
1721 */
1723
1724 /*
1725 * For logical slots log a standby snapshot and start logical decoding at
1726 * exactly that position. That allows the slot to start up more quickly.
1727 * But on a standby we cannot do WAL writes, so just use the replay
1728 * pointer; effectively, an attempt to create a logical slot on standby
1729 * will cause it to wait for an xl_running_xact record to be logged
1730 * independently on the primary, so that a snapshot can be built using the
1731 * record.
1732 *
1733 * None of this is needed (or indeed helpful) for physical slots as
1734 * they'll start replay at the last logged checkpoint anyway. Instead,
1735 * return the location of the last redo LSN, where a base backup has to
1736 * start replay at.
1737 */
1738 if (SlotIsPhysical(slot))
1739 restart_lsn = GetRedoRecPtr();
1740 else if (RecoveryInProgress())
1741 restart_lsn = GetXLogReplayRecPtr(NULL);
1742 else
1743 restart_lsn = GetXLogInsertRecPtr();
1744
1745 SpinLockAcquire(&slot->mutex);
1746 slot->data.restart_lsn = restart_lsn;
1747 SpinLockRelease(&slot->mutex);
1748
1749 /* prevent WAL removal as fast as possible */
1751
1752 /* Checkpoint shouldn't remove the required WAL. */
1754 if (XLogGetLastRemovedSegno() >= segno)
1755 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1756 NameStr(slot->data.name));
1757
1759
1760 if (!RecoveryInProgress() && SlotIsLogical(slot))
1761 {
1763
1764 /* make sure we have enough information to start */
1766
1767 /* and make sure it's fsynced to disk */
1769 }
1770}
XLogRecPtr LogStandbySnapshot(void)
Definition standby.c:1283
XLogSegNo XLogGetLastRemovedSegno(void)
Definition xlog.c:3779
XLogRecPtr GetRedoRecPtr(void)
Definition xlog.c:6547
XLogRecPtr GetXLogInsertRecPtr(void)
Definition xlog.c:9614
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2767
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogSegNo
Definition xlogdefs.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References Assert, ReplicationSlot::data, elog, ERROR, fb(), GetRedoRecPtr(), GetXLogInsertRecPtr(), GetXLogReplayRecPtr(), ReplicationSlot::last_saved_restart_lsn, LogStandbySnapshot(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SlotIsLogical, SlotIsPhysical, SpinLockAcquire(), SpinLockRelease(), wal_segment_size, XLByteToSeg, XLogFlush(), XLogGetLastRemovedSegno(), and XLogRecPtrIsValid.

Referenced by create_physical_replication_slot(), CreateInitDecodingContext(), and CreateReplicationSlot().

◆ ReplicationSlotSave()

◆ ReplicationSlotsComputeLogicalRestartLSN()

XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN ( void  )
extern

Definition at line 1371 of file slot.c.

1372{
1374 int i;
1375
1376 if (max_replication_slots <= 0)
1377 return InvalidXLogRecPtr;
1378
1380
1381 for (i = 0; i < max_replication_slots; i++)
1382 {
1383 ReplicationSlot *s;
1384 XLogRecPtr restart_lsn;
1385 XLogRecPtr last_saved_restart_lsn;
1386 bool invalidated;
1387 ReplicationSlotPersistency persistency;
1388
1390
1391 /* cannot change while ReplicationSlotControlLock is held */
1392 if (!s->in_use)
1393 continue;
1394
1395 /* we're only interested in logical slots */
1396 if (!SlotIsLogical(s))
1397 continue;
1398
1399 /* read once, it's ok if it increases while we're checking */
1401 persistency = s->data.persistency;
1402 restart_lsn = s->data.restart_lsn;
1403 invalidated = s->data.invalidated != RS_INVAL_NONE;
1404 last_saved_restart_lsn = s->last_saved_restart_lsn;
1406
1407 /* invalidated slots need not apply */
1408 if (invalidated)
1409 continue;
1410
1411 /*
1412 * For persistent slot use last_saved_restart_lsn to compute the
1413 * oldest LSN for removal of WAL segments. The segments between
1414 * last_saved_restart_lsn and restart_lsn might be needed by a
1415 * persistent slot in the case of database crash. Non-persistent
1416 * slots can't survive the database crash, so we don't care about
1417 * last_saved_restart_lsn for them.
1418 */
1419 if (persistency == RS_PERSISTENT)
1420 {
1421 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1422 restart_lsn > last_saved_restart_lsn)
1423 {
1424 restart_lsn = last_saved_restart_lsn;
1425 }
1426 }
1427
1428 if (!XLogRecPtrIsValid(restart_lsn))
1429 continue;
1430
1431 if (!XLogRecPtrIsValid(result) ||
1432 restart_lsn < result)
1433 result = restart_lsn;
1434 }
1435
1437
1438 return result;
1439}

References ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), and XLogRecPtrIsValid.

Referenced by CheckPointLogicalRewriteHeap(), and CheckPointSnapBuild().

◆ ReplicationSlotsComputeRequiredLSN()

void ReplicationSlotsComputeRequiredLSN ( void  )
extern

Definition at line 1301 of file slot.c.

1302{
1303 int i;
1305
1307
1309 for (i = 0; i < max_replication_slots; i++)
1310 {
1312 XLogRecPtr restart_lsn;
1313 XLogRecPtr last_saved_restart_lsn;
1314 bool invalidated;
1315 ReplicationSlotPersistency persistency;
1316
1317 if (!s->in_use)
1318 continue;
1319
1321 persistency = s->data.persistency;
1322 restart_lsn = s->data.restart_lsn;
1323 invalidated = s->data.invalidated != RS_INVAL_NONE;
1324 last_saved_restart_lsn = s->last_saved_restart_lsn;
1326
1327 /* invalidated slots need not apply */
1328 if (invalidated)
1329 continue;
1330
1331 /*
1332 * For persistent slot use last_saved_restart_lsn to compute the
1333 * oldest LSN for removal of WAL segments. The segments between
1334 * last_saved_restart_lsn and restart_lsn might be needed by a
1335 * persistent slot in the case of database crash. Non-persistent
1336 * slots can't survive the database crash, so we don't care about
1337 * last_saved_restart_lsn for them.
1338 */
1339 if (persistency == RS_PERSISTENT)
1340 {
1341 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1342 restart_lsn > last_saved_restart_lsn)
1343 {
1344 restart_lsn = last_saved_restart_lsn;
1345 }
1346 }
1347
1348 if (XLogRecPtrIsValid(restart_lsn) &&
1350 restart_lsn < min_required))
1351 min_required = restart_lsn;
1352 }
1354
1356}
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition xlog.c:2653

References Assert, ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_PERSISTENT, SpinLockAcquire(), SpinLockRelease(), XLogRecPtrIsValid, and XLogSetReplicationSlotMinimumLSN().

Referenced by CheckPointReplicationSlots(), copy_replication_slot(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), ReplicationSlotDropPtr(), ReplicationSlotReserveWal(), reserve_wal_for_local_slot(), StartupReplicationSlots(), and update_local_synced_slot().

◆ ReplicationSlotsComputeRequiredXmin()

void ReplicationSlotsComputeRequiredXmin ( bool  already_locked)
extern

Definition at line 1219 of file slot.c.

1220{
1221 int i;
1224
1229
1230 /*
1231 * Hold the ReplicationSlotControlLock until after updating the slot xmin
1232 * values, so no backend updates the initial xmin for newly created slot
1233 * concurrently. A shared lock is used here to minimize lock contention,
1234 * especially when many slots exist and advancements occur frequently.
1235 * This is safe since an exclusive lock is taken during initial slot xmin
1236 * update in slot creation.
1237 *
1238 * One might think that we can hold the ProcArrayLock exclusively and
1239 * update the slot xmin values, but it could increase lock contention on
1240 * the ProcArrayLock, which is not great since this function can be called
1241 * at non-negligible frequency.
1242 *
1243 * Concurrent invocation of this function may cause the computed slot xmin
1244 * to regress. However, this is harmless because tuples prior to the most
1245 * recent xmin are no longer useful once advancement occurs (see
1246 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1247 * before updating the effective_xmin). Thus, such regression merely
1248 * prevents VACUUM from prematurely removing tuples without causing the
1249 * early deletion of required data.
1250 */
1251 if (!already_locked)
1253
1254 for (i = 0; i < max_replication_slots; i++)
1255 {
1257 TransactionId effective_xmin;
1258 TransactionId effective_catalog_xmin;
1259 bool invalidated;
1260
1261 if (!s->in_use)
1262 continue;
1263
1265 effective_xmin = s->effective_xmin;
1266 effective_catalog_xmin = s->effective_catalog_xmin;
1267 invalidated = s->data.invalidated != RS_INVAL_NONE;
1269
1270 /* invalidated slots need not apply */
1271 if (invalidated)
1272 continue;
1273
1274 /* check the data xmin */
1275 if (TransactionIdIsValid(effective_xmin) &&
1277 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1278 agg_xmin = effective_xmin;
1279
1280 /* check the catalog xmin */
1281 if (TransactionIdIsValid(effective_catalog_xmin) &&
1283 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1284 agg_catalog_xmin = effective_catalog_xmin;
1285 }
1286
1288
1289 if (!already_locked)
1291}
uint32 TransactionId
Definition c.h:738
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1956
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition procarray.c:3950
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References Assert, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidTransactionId, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockHeldByMeInMode(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ProcArraySetReplicationSlotXmin(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, and TransactionIdPrecedes().

Referenced by copy_replication_slot(), CreateInitDecodingContext(), init_conflict_slot_xmin(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalReplicationSlotNewXmin(), ReplicationSlotDropPtr(), ReplicationSlotRelease(), StartupReplicationSlots(), synchronize_one_slot(), update_conflict_slot_xmin(), and update_local_synced_slot().

◆ ReplicationSlotsCountDBSlots()

bool ReplicationSlotsCountDBSlots ( Oid  dboid,
int * nslots,
int * nactive 
)
extern

Definition at line 1450 of file slot.c.

1451{
1452 int i;
1453
1454 *nslots = *nactive = 0;
1455
1456 if (max_replication_slots <= 0)
1457 return false;
1458
1460 for (i = 0; i < max_replication_slots; i++)
1461 {
1462 ReplicationSlot *s;
1463
1465
1466 /* cannot change while ReplicationSlotControlLock is held */
1467 if (!s->in_use)
1468 continue;
1469
1470 /* only logical slots are database specific, skip */
1471 if (!SlotIsLogical(s))
1472 continue;
1473
1474 /* not our database, skip */
1475 if (s->data.database != dboid)
1476 continue;
1477
1478 /* NB: intentionally counting invalidated slots */
1479
1480 /* count slots with spinlock held */
1482 (*nslots)++;
1484 (*nactive)++;
1486 }
1488
1489 if (*nslots > 0)
1490 return true;
1491 return false;
1492}

References ReplicationSlot::active_proc, ReplicationSlot::data, ReplicationSlotPersistentData::database, fb(), i, ReplicationSlot::in_use, INVALID_PROC_NUMBER, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, SpinLockAcquire(), and SpinLockRelease().

Referenced by dropdb().

◆ ReplicationSlotsDropDBSlots()

void ReplicationSlotsDropDBSlots ( Oid  dboid)
extern

Definition at line 1511 of file slot.c.

1512{
1513 int i;
1515 bool dropped = false;
1516
1517 if (max_replication_slots <= 0)
1518 return;
1519
1520restart:
1523 for (i = 0; i < max_replication_slots; i++)
1524 {
1525 ReplicationSlot *s;
1526 char *slotname;
1527 ProcNumber active_proc;
1528
1530
1531 /* cannot change while ReplicationSlotControlLock is held */
1532 if (!s->in_use)
1533 continue;
1534
1535 /* only logical slots are database specific, skip */
1536 if (!SlotIsLogical(s))
1537 continue;
1538
1539 /*
1540 * Check logical slots on other databases too so we can disable
1541 * logical decoding only if no slots in the cluster.
1542 */
1546
1547 /* not our database, skip */
1548 if (s->data.database != dboid)
1549 continue;
1550
1551 /* NB: intentionally including invalidated slots to drop */
1552
1553 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1555 /* can't change while ReplicationSlotControlLock is held */
1556 slotname = NameStr(s->data.name);
1557 active_proc = s->active_proc;
1558 if (active_proc == INVALID_PROC_NUMBER)
1559 {
1562 }
1564
1565 /*
1566 * Even though we hold an exclusive lock on the database object a
1567 * logical slot for that DB can still be active, e.g. if it's
1568 * concurrently being dropped by a backend connected to another DB.
1569 *
1570 * That's fairly unlikely in practice, so we'll just bail out.
1571 *
1572 * The slot sync worker holds a shared lock on the database before
1573 * operating on synced logical slots to avoid conflict with the drop
1574 * happening here. The persistent synced slots are thus safe but there
1575 * is a possibility that the slot sync worker has created a temporary
1576 * slot (which stays active even on release) and we are trying to drop
1577 * that here. In practice, the chances of hitting this scenario are
1578 * less as during slot synchronization, the temporary slot is
1579 * immediately converted to persistent and thus is safe due to the
1580 * shared lock taken on the database. So, we'll just bail out in such
1581 * a case.
1582 *
1583 * XXX: We can consider shutting down the slot sync worker before
1584 * trying to drop synced temporary slots here.
1585 */
1586 if (active_proc != INVALID_PROC_NUMBER)
1587 ereport(ERROR,
1589 errmsg("replication slot \"%s\" is active for PID %d",
1590 slotname, GetPGProcByNumber(active_proc)->pid)));
1591
1592 /*
1593 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1594 * holding ReplicationSlotControlLock over filesystem operations,
1595 * release ReplicationSlotControlLock and use
1596 * ReplicationSlotDropAcquired.
1597 *
1598 * As that means the set of slots could change, restart scan from the
1599 * beginning each time we release the lock.
1600 */
1603 dropped = true;
1604 goto restart;
1605 }
1607
1608 if (dropped && !found_valid_logicalslot)
1610}

References ReplicationSlot::active_proc, ReplicationSlot::data, ReplicationSlotPersistentData::database, ereport, errcode(), errmsg, ERROR, fb(), GetPGProcByNumber, i, ReplicationSlot::in_use, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyProcNumber, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropAcquired(), RequestDisableLogicalDecoding(), RS_INVAL_NONE, SlotIsLogical, SpinLockAcquire(), and SpinLockRelease().

Referenced by dbase_redo(), and dropdb().

◆ ReplicationSlotSetInactiveSince()

◆ ReplicationSlotsShmemInit()

void ReplicationSlotsShmemInit ( void  )
extern

Definition at line 207 of file slot.c.

208{
209 bool found;
210
211 if (max_replication_slots == 0)
212 return;
213
215 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
216 &found);
217
218 if (!found)
219 {
220 int i;
221
222 /* First time through, so initialize */
224
225 for (i = 0; i < max_replication_slots; i++)
226 {
228
229 /* everything else is zeroed by the memset above */
231 SpinLockInit(&slot->mutex);
235 }
236 }
237}
#define MemSet(start, val, len)
Definition c.h:1109
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:699
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381
Size ReplicationSlotsShmemSize(void)
Definition slot.c:189
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
LWLock io_in_progress_lock
Definition slot.h:216

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, ConditionVariableInit(), fb(), i, INVALID_PROC_NUMBER, ReplicationSlot::io_in_progress_lock, LWLockInitialize(), max_replication_slots, MemSet, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsShmemSize(), ShmemInitStruct(), and SpinLockInit().

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationSlotsShmemSize()

Size ReplicationSlotsShmemSize ( void  )
extern

Definition at line 189 of file slot.c.

190{
191 Size size = 0;
192
193 if (max_replication_slots == 0)
194 return size;
195
196 size = offsetof(ReplicationSlotCtlData, replication_slots);
197 size = add_size(size,
199
200 return size;
201}
size_t Size
Definition c.h:691
Size add_size(Size s1, Size s2)
Definition shmem.c:485
Size mul_size(Size s1, Size s2)
Definition shmem.c:500

References add_size(), fb(), max_replication_slots, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationSlotsShmemInit().

◆ ReplicationSlotValidateName()

bool ReplicationSlotValidateName ( const char * name,
bool  allow_reserved_name,
int  elevel 
)
extern

Definition at line 268 of file slot.c.

270{
271 int err_code;
272 char *err_msg = NULL;
273 char *err_hint = NULL;
274
276 &err_code, &err_msg, &err_hint))
277 {
278 /*
279 * Use errmsg_internal() and errhint_internal() instead of errmsg()
280 * and errhint(), since the messages from
281 * ReplicationSlotValidateNameInternal() are already translated. This
282 * avoids double translation.
283 */
284 ereport(elevel,
286 errmsg_internal("%s", err_msg),
287 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
288
289 pfree(err_msg);
290 if (err_hint != NULL)
292 return false;
293 }
294
295 return true;
296}
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
int int errhint_internal(const char *fmt,...) pg_attribute_printf(1
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition slot.c:313

References ereport, errcode(), errhint_internal(), errmsg_internal(), fb(), name, pfree(), and ReplicationSlotValidateNameInternal().

Referenced by parse_subscription_options(), ReplicationSlotCreate(), and StartupReorderBuffer().

◆ ReplicationSlotValidateNameInternal()

bool ReplicationSlotValidateNameInternal ( const char * name,
bool  allow_reserved_name,
int * err_code,
char **  err_msg,
char **  err_hint 
)
extern

Definition at line 313 of file slot.c.

315{
316 const char *cp;
317
318 if (strlen(name) == 0)
319 {
321 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
322 *err_hint = NULL;
323 return false;
324 }
325
326 if (strlen(name) >= NAMEDATALEN)
327 {
329 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
330 *err_hint = NULL;
331 return false;
332 }
333
334 for (cp = name; *cp; cp++)
335 {
336 if (!((*cp >= 'a' && *cp <= 'z')
337 || (*cp >= '0' && *cp <= '9')
338 || (*cp == '_')))
339 {
341 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
342 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
343 return false;
344 }
345 }
346
347 if (!allow_reserved_name && IsSlotForConflictCheck(name))
348 {
350 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
351 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
352 CONFLICT_DETECTION_SLOT);
353 return false;
354 }
355
356 return true;
357}
#define _(x)
Definition elog.c:95
#define NAMEDATALEN
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
#define CONFLICT_DETECTION_SLOT
Definition slot.h:28

References _, CONFLICT_DETECTION_SLOT, fb(), IsSlotForConflictCheck(), name, NAMEDATALEN, and psprintf().

Referenced by check_primary_slot_name(), ReplicationSlotValidateName(), and validate_sync_standby_slots().

◆ SearchNamedReplicationSlot()

◆ SlotExistsInSyncStandbySlots()

bool SlotExistsInSyncStandbySlots ( const char * slot_name)
extern

Definition at line 3060 of file slot.c.

3061{
3062 const char *standby_slot_name;
3063
3064 /* Return false if there is no value in synchronized_standby_slots */
3065 if (synchronized_standby_slots_config == NULL)
3066 return false;
3067
3068 /*
3069 * XXX: We are not expecting this list to be long so a linear search
3070 * shouldn't hurt but if that turns out not to be true then we can cache
3071 * this information for each WalSender as well.
3072 */
3073 standby_slot_name = synchronized_standby_slots_config->slot_names;
3074 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3075 {
3076 if (strcmp(standby_slot_name, slot_name) == 0)
3077 return true;
3078
3079 standby_slot_name += strlen(standby_slot_name) + 1;
3080 }
3081
3082 return false;
3083}
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition slot.c:168
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition slot.c:102

References fb(), i, SyncStandbySlotsConfigData::nslotnames, SyncStandbySlotsConfigData::slot_names, and synchronized_standby_slots_config.

Referenced by PhysicalWakeupLogicalWalSnd().

◆ StandbySlotsHaveCaughtup()

bool StandbySlotsHaveCaughtup ( XLogRecPtr  wait_for_lsn,
int  elevel 
)
extern

Definition at line 3093 of file slot.c.

3094{
3095 const char *name;
3096 int caught_up_slot_num = 0;
3098
3099 /*
3100 * Don't need to wait for the standbys to catch up if there is no value in
3101 * synchronized_standby_slots.
3102 */
3104 return true;
3105
3106 /*
3107 * Don't need to wait for the standbys to catch up if we are on a standby
3108 * server, since we do not support syncing slots to cascading standbys.
3109 */
3110 if (RecoveryInProgress())
3111 return true;
3112
3113 /*
3114 * Don't need to wait for the standbys to catch up if they are already
3115 * beyond the specified WAL location.
3116 */
3119 return true;
3120
3121 /*
3122 * To prevent concurrent slot dropping and creation while filtering the
3123 * slots, take the ReplicationSlotControlLock outside of the loop.
3124 */
3126
3128 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3129 {
3130 XLogRecPtr restart_lsn;
3131 bool invalidated;
3132 bool inactive;
3133 ReplicationSlot *slot;
3134
3135 slot = SearchNamedReplicationSlot(name, false);
3136
3137 /*
3138 * If a slot name provided in synchronized_standby_slots does not
3139 * exist, report a message and exit the loop.
3140 */
3141 if (!slot)
3142 {
3143 ereport(elevel,
3145 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3146 name, "synchronized_standby_slots"),
3147 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3148 name),
3149 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3150 name, "synchronized_standby_slots"));
3151 break;
3152 }
3153
3154 /* Same as above: if a slot is not physical, exit the loop. */
3155 if (SlotIsLogical(slot))
3156 {
3157 ereport(elevel,
3159 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3160 name, "synchronized_standby_slots"),
3161 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3162 name),
3163 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3164 name, "synchronized_standby_slots"));
3165 break;
3166 }
3167
3168 SpinLockAcquire(&slot->mutex);
3169 restart_lsn = slot->data.restart_lsn;
3170 invalidated = slot->data.invalidated != RS_INVAL_NONE;
3172 SpinLockRelease(&slot->mutex);
3173
3174 if (invalidated)
3175 {
3176 /* Specified physical slot has been invalidated */
3177 ereport(elevel,
3179 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3180 name, "synchronized_standby_slots"),
3181 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3182 name),
3183 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3184 name, "synchronized_standby_slots"));
3185 break;
3186 }
3187
3188 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3189 {
3190 /* Log a message if no active_pid for this physical slot */
3191 if (inactive)
3192 ereport(elevel,
3194 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3195 name, "synchronized_standby_slots"),
3196 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3197 name),
3198 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3199 name, "synchronized_standby_slots"));
3200
3201 /* Stop checking the remaining slots: the current slot hasn't caught up. */
3202 break;
3203 }
3204
3205 Assert(restart_lsn >= wait_for_lsn);
3206
3208 min_restart_lsn > restart_lsn)
3209 min_restart_lsn = restart_lsn;
3210
3212
3213 name += strlen(name) + 1;
3214 }
3215
3217
3218 /*
3219 * Return false if not all the standbys have caught up to the specified
3220 * WAL location.
3221 */
3223 return false;
3224
3225 /* The ss_oldest_flush_lsn must not retreat. */
3228
3230
3231 return true;
3232}
static XLogRecPtr ss_oldest_flush_lsn
Definition slot.c:174

References ReplicationSlot::active_proc, Assert, ReplicationSlot::data, ereport, errcode(), errdetail(), errhint(), errmsg, fb(), i, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, name, SyncStandbySlotsConfigData::nslotnames, RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SearchNamedReplicationSlot(), SyncStandbySlotsConfigData::slot_names, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), ss_oldest_flush_lsn, synchronized_standby_slots_config, and XLogRecPtrIsValid.

Referenced by NeedToWaitForStandbys(), and WaitForStandbyConfirmation().

◆ StartupReplicationSlots()

void StartupReplicationSlots ( void  )
extern

Definition at line 2388 of file slot.c.

2389{
2391 struct dirent *replication_de;
2392
2393 elog(DEBUG1, "starting up replication slots");
2394
2395 /* restore all slots by iterating over all on-disk entries */
2398 {
2399 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2401
2402 if (strcmp(replication_de->d_name, ".") == 0 ||
2403 strcmp(replication_de->d_name, "..") == 0)
2404 continue;
2405
2406 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2408
2409 /* we're only creating directories here, skip if it's not ours */
2411 continue;
2412
2413 /* we crashed while a slot was being setup or deleted, clean up */
2414 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2415 {
2416 if (!rmtree(path, true))
2417 {
2419 (errmsg("could not remove directory \"%s\"",
2420 path)));
2421 continue;
2422 }
2424 continue;
2425 }
2426
2427 /* looks like a slot in a normal state, restore */
2429 }
2431
2432 /* currently no slots exist, we're done. */
2433 if (max_replication_slots <= 0)
2434 return;
2435
2436 /* Now that we have recovered all the data, compute replication xmin */
2439}
#define WARNING
Definition elog.h:36
int FreeDir(DIR *dir)
Definition fd.c:3009
void fsync_fname(const char *fname, bool isdir)
Definition fd.c:757
DIR * AllocateDir(const char *dirname)
Definition fd.c:2891
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition fd.c:2957
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition file_utils.c:547
PGFileType
Definition file_utils.h:19
@ PGFILETYPE_DIR
Definition file_utils.h:23
@ PGFILETYPE_ERROR
Definition file_utils.h:20
bool rmtree(const char *path, bool rmtopdir)
Definition rmtree.c:50
static void RestoreSlotFromDisk(const char *name)
Definition slot.c:2673
bool pg_str_endswith(const char *str, const char *end)
Definition string.c:31
Definition dirent.c:26

References AllocateDir(), DEBUG1, elog, ereport, errmsg, fb(), FreeDir(), fsync_fname(), get_dirent_type(), max_replication_slots, MAXPGPATH, PG_REPLSLOT_DIR, pg_str_endswith(), PGFILETYPE_DIR, PGFILETYPE_ERROR, ReadDir(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RestoreSlotFromDisk(), rmtree(), snprintf, and WARNING.

Referenced by StartupXLOG().

◆ WaitForStandbyConfirmation()

void WaitForStandbyConfirmation ( XLogRecPtr  wait_for_lsn)
extern

Definition at line 3241 of file slot.c.

3242{
3243 /*
3244 * Don't need to wait for the standby to catch up if the current acquired
3245 * slot is not a logical failover slot, or there is no value in
3246 * synchronized_standby_slots.
3247 */
3249 return;
3250
3252
3253 for (;;)
3254 {
3256
3258 {
3259 ConfigReloadPending = false;
3261 }
3262
3263 /* Exit if done waiting for every slot. */
3265 break;
3266
3267 /*
3268 * Wait for the slots in the synchronized_standby_slots to catch up,
3269 * but use a timeout (1s) so we can also check if the
3270 * synchronized_standby_slots has been changed.
3271 */
3274 }
3275
3277}
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3093
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition walsender.c:118

References CHECK_FOR_INTERRUPTS, ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableTimedSleep(), ConfigReloadPending, ReplicationSlot::data, ReplicationSlotPersistentData::failover, fb(), MyReplicationSlot, PGC_SIGHUP, ProcessConfigFile(), StandbySlotsHaveCaughtup(), synchronized_standby_slots_config, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtl, and WARNING.

Referenced by LogicalSlotAdvanceAndCheckSnapState(), and pg_logical_slot_get_changes_guts().

Variable Documentation

◆ idle_replication_slot_timeout_secs

PGDLLIMPORT int idle_replication_slot_timeout_secs
extern

◆ max_replication_slots

◆ MyReplicationSlot

PGDLLIMPORT ReplicationSlot* MyReplicationSlot
extern

Definition at line 149 of file slot.c.

Referenced by abort_logical_decoding_activation(), ApplyLauncherMain(), binary_upgrade_check_logical_slot_pending_wal(), compute_min_nonremovable_xid(), copy_replication_slot(), create_logical_replication_slot(), create_physical_replication_slot(), CreateConflictDetectionSlot(), CreateDecodingContext(), CreateInitDecodingContext(), CreateReplicationSlot(), DisableLogicalDecodingIfNecessary(), EnsureLogicalDecodingEnabled(), init_conflict_slot_xmin(), InvalidatePossiblyObsoleteSlot(), LogicalConfirmReceivedLocation(), LogicalIncreaseRestartDecodingForSlot(), LogicalIncreaseXminForSlot(), logicalrep_worker_launch(), LogicalReplicationSlotCheckPendingWal(), LogicalSlotAdvanceAndCheckSnapState(), NeedToWaitForStandbys(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_physical_replication_slot_advance(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), PhysicalReplicationSlotNewXmin(), PhysicalWakeupLogicalWalSnd(), PostgresMain(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), ReorderBufferAllocate(), ReorderBufferFree(), ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), ReorderBufferSerializedPath(), ReorderBufferSerializeTXN(), ReplicationSlotAcquire(), ReplicationSlotAlter(), ReplicationSlotCleanup(), ReplicationSlotCreate(), ReplicationSlotDrop(), ReplicationSlotDropAcquired(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotsDropDBSlots(), ReplicationSlotShmemExit(), reserve_wal_for_local_slot(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), StartupDecodingContext(), synchronize_one_slot(), update_and_persist_local_synced_slot(), update_conflict_slot_xmin(), update_local_synced_slot(), update_slotsync_skip_stats(), WaitForStandbyConfirmation(), and WalSndErrorCleanup().

◆ ReplicationSlotCtl

◆ synchronized_standby_slots

PGDLLIMPORT char* synchronized_standby_slots
extern

Definition at line 165 of file slot.c.