PostgreSQL Source Code git master
Loading...
Searching...
No Matches
slot.h File Reference
Include dependency graph for slot.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReplicationSlotPersistentData
 
struct  ReplicationSlot
 
struct  ReplicationSlotCtlData
 

Macros

#define PG_REPLSLOT_DIR   "pg_replslot"
 
#define CONFLICT_DETECTION_SLOT   "pg_conflict_detection"
 
#define RS_INVAL_MAX_CAUSES   4
 
#define SlotIsPhysical(slot)   ((slot)->data.database == InvalidOid)
 
#define SlotIsLogical(slot)   ((slot)->data.database != InvalidOid)
 

Typedefs

typedef enum ReplicationSlotPersistency ReplicationSlotPersistency
 
typedef enum ReplicationSlotInvalidationCause ReplicationSlotInvalidationCause
 
typedef enum SlotSyncSkipReason SlotSyncSkipReason
 
typedef struct ReplicationSlotPersistentData ReplicationSlotPersistentData
 
typedef struct ReplicationSlot ReplicationSlot
 
typedef struct ReplicationSlotCtlData ReplicationSlotCtlData
 

Enumerations

enum  ReplicationSlotPersistency { RS_PERSISTENT , RS_EPHEMERAL , RS_TEMPORARY }
 
enum  ReplicationSlotInvalidationCause {
  RS_INVAL_NONE = 0 , RS_INVAL_WAL_REMOVED = (1 << 0) , RS_INVAL_HORIZON = (1 << 1) , RS_INVAL_WAL_LEVEL = (1 << 2) ,
  RS_INVAL_IDLE_TIMEOUT = (1 << 3)
}
 
enum  SlotSyncSkipReason {
  SS_SKIP_NONE , SS_SKIP_WAL_NOT_FLUSHED , SS_SKIP_WAL_OR_ROWS_REMOVED , SS_SKIP_NO_CONSISTENT_SNAPSHOT ,
  SS_SKIP_INVALID
}
 

Functions

static void ReplicationSlotSetInactiveSince (ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
 
Size ReplicationSlotsShmemSize (void)
 
void ReplicationSlotsShmemInit (void)
 
void ReplicationSlotCreate (const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
 
void ReplicationSlotPersist (void)
 
void ReplicationSlotDrop (const char *name, bool nowait)
 
void ReplicationSlotDropAcquired (void)
 
void ReplicationSlotAlter (const char *name, const bool *failover, const bool *two_phase)
 
void ReplicationSlotAcquire (const char *name, bool nowait, bool error_if_invalid)
 
void ReplicationSlotRelease (void)
 
void ReplicationSlotCleanup (bool synced_only)
 
void ReplicationSlotSave (void)
 
void ReplicationSlotMarkDirty (void)
 
void ReplicationSlotInitialize (void)
 
bool ReplicationSlotValidateName (const char *name, bool allow_reserved_name, int elevel)
 
bool ReplicationSlotValidateNameInternal (const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
 
void ReplicationSlotReserveWal (void)
 
void ReplicationSlotsComputeRequiredXmin (bool already_locked)
 
void ReplicationSlotsComputeRequiredLSN (void)
 
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN (void)
 
bool ReplicationSlotsCountDBSlots (Oid dboid, int *nslots, int *nactive)
 
bool CheckLogicalSlotExists (void)
 
void ReplicationSlotsDropDBSlots (Oid dboid)
 
bool InvalidateObsoleteReplicationSlots (uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
 
ReplicationSlotSearchNamedReplicationSlot (const char *name, bool need_lock)
 
int ReplicationSlotIndex (ReplicationSlot *slot)
 
bool ReplicationSlotName (int index, Name name)
 
void ReplicationSlotNameForTablesync (Oid suboid, Oid relid, char *syncslotname, Size szslot)
 
void ReplicationSlotDropAtPubNode (WalReceiverConn *wrconn, char *slotname, bool missing_ok)
 
void StartupReplicationSlots (void)
 
void CheckPointReplicationSlots (bool is_shutdown)
 
void CheckSlotRequirements (void)
 
void CheckSlotPermissions (void)
 
ReplicationSlotInvalidationCause GetSlotInvalidationCause (const char *cause_name)
 
const charGetSlotInvalidationCauseName (ReplicationSlotInvalidationCause cause)
 
bool SlotExistsInSyncStandbySlots (const char *slot_name)
 
bool StandbySlotsHaveCaughtup (XLogRecPtr wait_for_lsn, int elevel)
 
void WaitForStandbyConfirmation (XLogRecPtr wait_for_lsn)
 

Variables

PGDLLIMPORT ReplicationSlotCtlDataReplicationSlotCtl
 
PGDLLIMPORT ReplicationSlotMyReplicationSlot
 
PGDLLIMPORT int max_replication_slots
 
PGDLLIMPORT charsynchronized_standby_slots
 
PGDLLIMPORT int idle_replication_slot_timeout_secs
 

Macro Definition Documentation

◆ CONFLICT_DETECTION_SLOT

#define CONFLICT_DETECTION_SLOT   "pg_conflict_detection"

Definition at line 28 of file slot.h.

◆ PG_REPLSLOT_DIR

#define PG_REPLSLOT_DIR   "pg_replslot"

Definition at line 21 of file slot.h.

◆ RS_INVAL_MAX_CAUSES

#define RS_INVAL_MAX_CAUSES   4

Definition at line 72 of file slot.h.

◆ SlotIsLogical

#define SlotIsLogical (   slot)    ((slot)->data.database != InvalidOid)

Definition at line 288 of file slot.h.

◆ SlotIsPhysical

#define SlotIsPhysical (   slot)    ((slot)->data.database == InvalidOid)

Definition at line 287 of file slot.h.

Typedef Documentation

◆ ReplicationSlot

◆ ReplicationSlotCtlData

◆ ReplicationSlotInvalidationCause

◆ ReplicationSlotPersistency

◆ ReplicationSlotPersistentData

◆ SlotSyncSkipReason

Enumeration Type Documentation

◆ ReplicationSlotInvalidationCause

Enumerator
RS_INVAL_NONE 
RS_INVAL_WAL_REMOVED 
RS_INVAL_HORIZON 
RS_INVAL_WAL_LEVEL 
RS_INVAL_IDLE_TIMEOUT 

Definition at line 58 of file slot.h.

59{
60 RS_INVAL_NONE = 0,
61 /* required WAL has been removed */
62 RS_INVAL_WAL_REMOVED = (1 << 0),
63 /* required rows have been removed */
64 RS_INVAL_HORIZON = (1 << 1),
65 /* wal_level insufficient for slot */
66 RS_INVAL_WAL_LEVEL = (1 << 2),
67 /* idle slot timeout has occurred */
68 RS_INVAL_IDLE_TIMEOUT = (1 << 3),
ReplicationSlotInvalidationCause
Definition slot.h:59
@ RS_INVAL_WAL_REMOVED
Definition slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition slot.h:68
@ RS_INVAL_HORIZON
Definition slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition slot.h:66
@ RS_INVAL_NONE
Definition slot.h:60

◆ ReplicationSlotPersistency

Enumerator
RS_PERSISTENT 
RS_EPHEMERAL 
RS_TEMPORARY 

Definition at line 43 of file slot.h.

44{
ReplicationSlotPersistency
Definition slot.h:44
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47

◆ SlotSyncSkipReason

Enumerator
SS_SKIP_NONE 
SS_SKIP_WAL_NOT_FLUSHED 
SS_SKIP_WAL_OR_ROWS_REMOVED 
SS_SKIP_NO_CONSISTENT_SNAPSHOT 
SS_SKIP_INVALID 

Definition at line 80 of file slot.h.

81{
82 SS_SKIP_NONE, /* No skip */
83 SS_SKIP_WAL_NOT_FLUSHED, /* Standby did not flush the wal corresponding
84 * to confirmed flush of remote slot */
85 SS_SKIP_WAL_OR_ROWS_REMOVED, /* Remote slot is behind; required WAL or
86 * rows may be removed or at risk */
87 SS_SKIP_NO_CONSISTENT_SNAPSHOT, /* Standby could not build a consistent
88 * snapshot */
89 SS_SKIP_INVALID /* Local slot is invalid */
SlotSyncSkipReason
Definition slot.h:81
@ SS_SKIP_WAL_NOT_FLUSHED
Definition slot.h:83
@ SS_SKIP_NO_CONSISTENT_SNAPSHOT
Definition slot.h:87
@ SS_SKIP_NONE
Definition slot.h:82
@ SS_SKIP_INVALID
Definition slot.h:89
@ SS_SKIP_WAL_OR_ROWS_REMOVED
Definition slot.h:85

Function Documentation

◆ CheckLogicalSlotExists()

bool CheckLogicalSlotExists ( void  )
extern

Definition at line 1615 of file slot.c.

1616{
1617 bool found = false;
1618
1619 if (max_replication_slots <= 0)
1620 return false;
1621
1623 for (int i = 0; i < max_replication_slots; i++)
1624 {
1625 ReplicationSlot *s;
1626 bool invalidated;
1627
1629
1630 /* cannot change while ReplicationSlotCtlLock is held */
1631 if (!s->in_use)
1632 continue;
1633
1634 if (SlotIsPhysical(s))
1635 continue;
1636
1638 invalidated = s->data.invalidated != RS_INVAL_NONE;
1640
1641 if (invalidated)
1642 continue;
1643
1644 found = true;
1645 break;
1646 }
1648
1649 return found;
1650}
int i
Definition isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_SHARED
Definition lwlock.h:113
static int fb(int x)
int max_replication_slots
Definition slot.c:151
ReplicationSlotCtlData * ReplicationSlotCtl
Definition slot.c:145
#define SlotIsPhysical(slot)
Definition slot.h:287
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
ReplicationSlot replication_slots[1]
Definition slot.h:299
ReplicationSlotInvalidationCause invalidated
Definition slot.h:128
slock_t mutex
Definition slot.h:183
bool in_use
Definition slot.h:186
ReplicationSlotPersistentData data
Definition slot.h:213

References ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SlotIsPhysical, SpinLockAcquire(), and SpinLockRelease().

Referenced by DisableLogicalDecoding(), and UpdateLogicalDecodingStatusEndOfRecovery().

◆ CheckPointReplicationSlots()

void CheckPointReplicationSlots ( bool  is_shutdown)
extern

Definition at line 2309 of file slot.c.

2310{
2311 int i;
2312 bool last_saved_restart_lsn_updated = false;
2313
2314 elog(DEBUG1, "performing replication slot checkpoint");
2315
2316 /*
2317 * Prevent any slot from being created/dropped while we're active. As we
2318 * explicitly do *not* want to block iterating over replication_slots or
2319 * acquiring a slot we cannot take the control lock - but that's OK,
2320 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2321 * enough to guarantee that nobody can change the in_use bits on us.
2322 *
2323 * Additionally, acquiring the Allocation lock is necessary to serialize
2324 * the slot flush process with concurrent slot WAL reservation. This
2325 * ensures that the WAL position being reserved is either flushed to disk
2326 * or is beyond or equal to the redo pointer of the current checkpoint
2327 * (See ReplicationSlotReserveWal for details).
2328 */
2330
2331 for (i = 0; i < max_replication_slots; i++)
2332 {
2334 char path[MAXPGPATH];
2335
2336 if (!s->in_use)
2337 continue;
2338
2339 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2340 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2341
2342 /*
2343 * Slot's data is not flushed each time the confirmed_flush LSN is
2344 * updated as that could lead to frequent writes. However, we decide
2345 * to force a flush of all logical slot's data at the time of shutdown
2346 * if the confirmed_flush LSN is changed since we last flushed it to
2347 * disk. This helps in avoiding an unnecessary retreat of the
2348 * confirmed_flush LSN after restart.
2349 */
2350 if (is_shutdown && SlotIsLogical(s))
2351 {
2353
2354 if (s->data.invalidated == RS_INVAL_NONE &&
2356 {
2357 s->just_dirtied = true;
2358 s->dirty = true;
2359 }
2361 }
2362
2363 /*
2364 * Track if we're going to update slot's last_saved_restart_lsn. We
2365 * need this to know if we need to recompute the required LSN.
2366 */
2369
2370 SaveSlotToPath(s, path, LOG);
2371 }
2373
2374 /*
2375 * Recompute the required LSN if SaveSlotToPath() updated
2376 * last_saved_restart_lsn for any slot.
2377 */
2380}
#define NameStr(name)
Definition c.h:777
#define LOG
Definition elog.h:31
#define DEBUG1
Definition elog.h:30
#define elog(elevel,...)
Definition elog.h:226
#define MAXPGPATH
#define sprintf
Definition port.h:262
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition slot.c:2509
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1300
#define PG_REPLSLOT_DIR
Definition slot.h:21
#define SlotIsLogical(slot)
Definition slot.h:288
XLogRecPtr last_saved_confirmed_flush
Definition slot.h:238
bool just_dirtied
Definition slot.h:195
XLogRecPtr last_saved_restart_lsn
Definition slot.h:271

References ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, DEBUG1, ReplicationSlot::dirty, elog, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LOG, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MAXPGPATH, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SaveSlotToPath(), SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), and sprintf.

Referenced by CheckPointGuts().

◆ CheckSlotPermissions()

void CheckSlotPermissions ( void  )
extern

Definition at line 1679 of file slot.c.

1680{
1682 ereport(ERROR,
1684 errmsg("permission denied to use replication slots"),
1685 errdetail("Only roles with the %s attribute may use replication slots.",
1686 "REPLICATION")));
1687}
int errcode(int sqlerrcode)
Definition elog.c:874
int errmsg(const char *fmt,...)
Definition elog.c:1093
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
Oid GetUserId(void)
Definition miscinit.c:469
bool has_rolreplication(Oid roleid)
Definition miscinit.c:688

References ereport, errcode(), errdetail(), errmsg(), ERROR, fb(), GetUserId(), and has_rolreplication().

Referenced by copy_replication_slot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_drop_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), and pg_sync_replication_slots().

◆ CheckSlotRequirements()

void CheckSlotRequirements ( void  )
extern

Definition at line 1657 of file slot.c.

1658{
1659 /*
1660 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1661 * needs the same check.
1662 */
1663
1664 if (max_replication_slots == 0)
1665 ereport(ERROR,
1667 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1668
1670 ereport(ERROR,
1672 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1673}
int wal_level
Definition xlog.c:134
@ WAL_LEVEL_REPLICA
Definition xlog.h:76

References ereport, errcode(), errmsg(), ERROR, fb(), max_replication_slots, wal_level, and WAL_LEVEL_REPLICA.

Referenced by CheckLogicalDecodingRequirements(), copy_replication_slot(), pg_create_physical_replication_slot(), and pg_drop_replication_slot().

◆ GetSlotInvalidationCause()

ReplicationSlotInvalidationCause GetSlotInvalidationCause ( const char cause_name)
extern

Definition at line 2915 of file slot.c.

2916{
2917 Assert(cause_name);
2918
2919 /* Search lookup table for the cause having this name */
2920 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2921 {
2922 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2924 }
2925
2926 Assert(false);
2927 return RS_INVAL_NONE; /* to keep compiler quiet */
2928}
#define Assert(condition)
Definition c.h:885
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition slot.c:113
#define RS_INVAL_MAX_CAUSES
Definition slot.h:72
ReplicationSlotInvalidationCause cause
Definition slot.c:109

References Assert, SlotInvalidationCauseMap::cause, fb(), i, RS_INVAL_MAX_CAUSES, RS_INVAL_NONE, and SlotInvalidationCauses.

Referenced by fetch_remote_slots().

◆ GetSlotInvalidationCauseName()

const char * GetSlotInvalidationCauseName ( ReplicationSlotInvalidationCause  cause)
extern

Definition at line 2935 of file slot.c.

2936{
2937 /* Search lookup table for the name of this cause */
2938 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2939 {
2940 if (SlotInvalidationCauses[i].cause == cause)
2942 }
2943
2944 Assert(false);
2945 return "none"; /* to keep compiler quiet */
2946}
const char * cause_name
Definition slot.c:110

References Assert, SlotInvalidationCauseMap::cause_name, i, RS_INVAL_MAX_CAUSES, and SlotInvalidationCauses.

Referenced by pg_get_replication_slots(), and ReplicationSlotAcquire().

◆ InvalidateObsoleteReplicationSlots()

bool InvalidateObsoleteReplicationSlots ( uint32  possible_causes,
XLogSegNo  oldestSegno,
Oid  dboid,
TransactionId  snapshotConflictHorizon 
)
extern

Definition at line 2205 of file slot.c.

2208{
2210 bool invalidated = false;
2211 bool invalidated_logical = false;
2213
2214 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2217
2218 if (max_replication_slots == 0)
2219 return invalidated;
2220
2222
2223restart:
2226 for (int i = 0; i < max_replication_slots; i++)
2227 {
2229 bool released_lock = false;
2230
2231 if (!s->in_use)
2232 continue;
2233
2234 /* Prevent invalidation of logical slots during binary upgrade */
2236 {
2240
2241 continue;
2242 }
2243
2245 dboid, snapshotConflictHorizon,
2246 &released_lock))
2247 {
2249
2250 /* Remember we have invalidated a physical or logical slot */
2251 invalidated = true;
2252
2253 /*
2254 * Additionally, remember we have invalidated a logical slot as we
2255 * can request disabling logical decoding later.
2256 */
2257 if (SlotIsLogical(s))
2258 invalidated_logical = true;
2259 }
2260 else
2261 {
2262 /*
2263 * We need to check if the slot is invalidated here since
2264 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2265 * is already invalidated.
2266 */
2271 }
2272
2273 /* if the lock was released, start from scratch */
2274 if (released_lock)
2275 goto restart;
2276 }
2278
2279 /*
2280 * If any slots have been invalidated, recalculate the resource limits.
2281 */
2282 if (invalidated)
2283 {
2286 }
2287
2288 /*
2289 * Request the checkpointer to disable logical decoding if no valid
2290 * logical slots remain. If called by the checkpointer during a
2291 * checkpoint, only the request is initiated; actual deactivation is
2292 * deferred until after the checkpoint completes.
2293 */
2296
2297 return invalidated;
2298}
bool IsBinaryUpgrade
Definition globals.c:121
void RequestDisableLogicalDecoding(void)
Definition logicalctl.c:434
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
Definition slot.c:1965
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1218
#define TransactionIdIsValid(xid)
Definition transam.h:41
int wal_segment_size
Definition xlog.c:146
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition xlogdefs.h:21

References Assert, ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidatePossiblyObsoleteSlot(), IsBinaryUpgrade, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RequestDisableLogicalDecoding(), RS_INVAL_HORIZON, RS_INVAL_NONE, RS_INVAL_WAL_REMOVED, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by CreateCheckPoint(), CreateRestartPoint(), ResolveRecoveryConflictWithSnapshot(), and xlog_redo().

◆ ReplicationSlotAcquire()

void ReplicationSlotAcquire ( const char name,
bool  nowait,
bool  error_if_invalid 
)
extern

Definition at line 621 of file slot.c.

622{
624 ProcNumber active_proc;
625 int active_pid;
626
627 Assert(name != NULL);
628
629retry:
631
633
634 /* Check if the slot exists with the given name. */
636 if (s == NULL || !s->in_use)
637 {
639
642 errmsg("replication slot \"%s\" does not exist",
643 name)));
644 }
645
646 /*
647 * Do not allow users to acquire the reserved slot. This scenario may
648 * occur if the launcher that owns the slot has terminated unexpectedly
649 * due to an error, and a backend process attempts to reuse the slot.
650 */
654 errmsg("cannot acquire replication slot \"%s\"", name),
655 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
656
657 /*
658 * This is the slot we want; check if it's active under some other
659 * process. In single user mode, we don't need this check.
660 */
662 {
663 /*
664 * Get ready to sleep on the slot in case it is active. (We may end
665 * up not sleeping, but we don't want to do this while holding the
666 * spinlock.)
667 */
668 if (!nowait)
670
671 /*
672 * It is important to reset the inactive_since under spinlock here to
673 * avoid race conditions with slot invalidation. See comments related
674 * to inactive_since in InvalidatePossiblyObsoleteSlot.
675 */
679 active_proc = s->active_proc;
682 }
683 else
684 {
685 s->active_proc = active_proc = MyProcNumber;
687 }
688 active_pid = GetPGProcByNumber(active_proc)->pid;
690
691 /*
692 * If we found the slot but it's already active in another process, we
693 * wait until the owning process signals us that it's been released, or
694 * error out.
695 */
696 if (active_proc != MyProcNumber)
697 {
698 if (!nowait)
699 {
700 /* Wait here until we get signaled, and then restart */
704 goto retry;
705 }
706
709 errmsg("replication slot \"%s\" is active for PID %d",
710 NameStr(s->data.name), active_pid)));
711 }
712 else if (!nowait)
713 ConditionVariableCancelSleep(); /* no sleep needed after all */
714
715 /* We made this slot active, so it's ours now. */
717
718 /*
719 * We need to check for invalidation after making the slot ours to avoid
720 * the possible race condition with the checkpointer that can otherwise
721 * invalidate the slot immediately after the check.
722 */
726 errmsg("can no longer access replication slot \"%s\"",
727 NameStr(s->data.name)),
728 errdetail("This replication slot has been invalidated due to \"%s\".",
730
731 /* Let everybody know we've modified this slot */
733
734 /*
735 * The call to pgstat_acquire_replslot() protects against stats for a
736 * different slot, from before a restart or such, being present during
737 * pgstat_report_replslot().
738 */
739 if (SlotIsLogical(s))
741
742
743 if (am_walsender)
744 {
747 ? errmsg("acquired logical replication slot \"%s\"",
748 NameStr(s->data.name))
749 : errmsg("acquired physical replication slot \"%s\"",
750 NameStr(s->data.name)));
751 }
752}
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
ProcNumber MyProcNumber
Definition globals.c:90
bool IsUnderPostmaster
Definition globals.c:120
bool IsLogicalLauncher(void)
Definition launcher.c:1586
const void * data
void pgstat_acquire_replslot(ReplicationSlot *slot)
#define GetPGProcByNumber(n)
Definition proc.h:504
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
ReplicationSlot * MyReplicationSlot
Definition slot.c:148
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:541
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition slot.c:2935
static bool IsSlotForConflictCheck(const char *name)
Definition slot.c:362
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition slot.h:306
ConditionVariable active_cv
Definition slot.h:219
ProcNumber active_proc
Definition slot.h:192
const char * name
bool am_walsender
Definition walsender.c:123
bool log_replication_commands
Definition walsender.c:133

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, am_walsender, Assert, ConditionVariableBroadcast(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), ReplicationSlot::data, DEBUG1, ereport, errcode(), errdetail(), errmsg(), ERROR, fb(), GetPGProcByNumber, GetSlotInvalidationCauseName(), ReplicationSlot::in_use, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, IsLogicalLauncher(), IsSlotForConflictCheck(), IsUnderPostmaster, LOG, log_replication_commands, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProcNumber, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, pgstat_acquire_replslot(), ReplicationSlotSetInactiveSince(), RS_INVAL_NONE, SearchNamedReplicationSlot(), SlotIsLogical, SpinLockAcquire(), and SpinLockRelease().

Referenced by acquire_conflict_slot_if_exists(), binary_upgrade_check_logical_slot_pending_wal(), drop_local_obsolete_slots(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), ReplicationSlotAlter(), ReplicationSlotDrop(), StartLogicalReplication(), StartReplication(), and synchronize_one_slot().

◆ ReplicationSlotAlter()

void ReplicationSlotAlter ( const char name,
const bool failover,
const bool two_phase 
)
extern

Definition at line 952 of file slot.c.

954{
955 bool update_slot = false;
956
959
960 ReplicationSlotAcquire(name, false, true);
961
965 errmsg("cannot use %s with a physical replication slot",
966 "ALTER_REPLICATION_SLOT"));
967
968 if (RecoveryInProgress())
969 {
970 /*
971 * Do not allow users to alter the slots which are currently being
972 * synced from the primary to the standby.
973 */
977 errmsg("cannot alter replication slot \"%s\"", name),
978 errdetail("This replication slot is being synchronized from the primary server."));
979
980 /*
981 * Do not allow users to enable failover on the standby as we do not
982 * support sync to the cascading standby.
983 */
984 if (failover && *failover)
987 errmsg("cannot enable failover for a replication slot"
988 " on the standby"));
989 }
990
991 if (failover)
992 {
993 /*
994 * Do not allow users to enable failover for temporary slots as we do
995 * not support syncing temporary slots to the standby.
996 */
1000 errmsg("cannot enable failover for a temporary replication slot"));
1001
1003 {
1007
1008 update_slot = true;
1009 }
1010 }
1011
1013 {
1017
1018 update_slot = true;
1019 }
1020
1021 if (update_slot)
1022 {
1025 }
1026
1028}
static bool two_phase
static bool failover
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:621
void ReplicationSlotMarkDirty(void)
Definition slot.c:1176
void ReplicationSlotSave(void)
Definition slot.c:1158
void ReplicationSlotRelease(void)
Definition slot.c:761
ReplicationSlotPersistency persistency
Definition slot.h:106
bool RecoveryInProgress(void)
Definition xlog.c:6460

References Assert, ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg(), ERROR, failover, ReplicationSlotPersistentData::failover, fb(), ReplicationSlot::mutex, MyReplicationSlot, name, ReplicationSlotPersistentData::persistency, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), RS_TEMPORARY, SlotIsPhysical, SpinLockAcquire(), SpinLockRelease(), ReplicationSlotPersistentData::synced, two_phase, and ReplicationSlotPersistentData::two_phase.

Referenced by AlterReplicationSlot().

◆ ReplicationSlotCleanup()

void ReplicationSlotCleanup ( bool  synced_only)
extern

Definition at line 860 of file slot.c.

861{
862 int i;
864 bool dropped_logical = false;
865
867
868restart:
871 for (i = 0; i < max_replication_slots; i++)
872 {
874
875 if (!s->in_use)
876 continue;
877
879
882
883 if ((s->active_proc == MyProcNumber &&
884 (!synced_only || s->data.synced)))
885 {
888 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
889
890 if (SlotIsLogical(s))
891 dropped_logical = true;
892
894
896 goto restart;
897 }
898 else
900 }
901
903
906}
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition slot.c:1051

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, Assert, ConditionVariableBroadcast(), ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyProcNumber, MyReplicationSlot, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropPtr(), RequestDisableLogicalDecoding(), RS_INVAL_NONE, RS_TEMPORARY, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), and ReplicationSlotPersistentData::synced.

Referenced by PostgresMain(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), SyncReplicationSlots(), and WalSndErrorCleanup().

◆ ReplicationSlotCreate()

void ReplicationSlotCreate ( const char name,
bool  db_specific,
ReplicationSlotPersistency  persistency,
bool  two_phase,
bool  failover,
bool  synced 
)
extern

Definition at line 379 of file slot.c.

382{
383 ReplicationSlot *slot = NULL;
384 int i;
385
387
388 /*
389 * The logical launcher or pg_upgrade may create or migrate an internal
390 * slot, so using a reserved name is allowed in these cases.
391 */
393 ERROR);
394
395 if (failover)
396 {
397 /*
398 * Do not allow users to create the failover enabled slots on the
399 * standby as we do not support sync to the cascading standby.
400 *
401 * However, failover enabled slots can be created during slot
402 * synchronization because we need to retain the same values as the
403 * remote slot.
404 */
408 errmsg("cannot enable failover for a replication slot created on the standby"));
409
410 /*
411 * Do not allow users to create failover enabled temporary slots,
412 * because temporary slots will not be synced to the standby.
413 *
414 * However, failover enabled temporary slots can be created during
415 * slot synchronization. See the comments atop slotsync.c for details.
416 */
417 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
420 errmsg("cannot enable failover for a temporary replication slot"));
421 }
422
423 /*
424 * If some other backend ran this code concurrently with us, we'd likely
425 * both allocate the same slot, and that would be bad. We'd also be at
426 * risk of missing a name collision. Also, we don't want to try to create
427 * a new slot while somebody's busy cleaning up an old one, because we
428 * might both be monkeying with the same directory.
429 */
431
432 /*
433 * Check for name collision, and identify an allocatable slot. We need to
434 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
435 * else can change the in_use flags while we're looking at them.
436 */
438 for (i = 0; i < max_replication_slots; i++)
439 {
441
442 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
445 errmsg("replication slot \"%s\" already exists", name)));
446 if (!s->in_use && slot == NULL)
447 slot = s;
448 }
450
451 /* If all slots are in use, we're out of luck. */
452 if (slot == NULL)
455 errmsg("all replication slots are in use"),
456 errhint("Free one or increase \"max_replication_slots\".")));
457
458 /*
459 * Since this slot is not in use, nobody should be looking at any part of
460 * it other than the in_use field unless they're trying to allocate it.
461 * And since we hold ReplicationSlotAllocationLock, nobody except us can
462 * be doing that. So it's safe to initialize the slot.
463 */
464 Assert(!slot->in_use);
466
467 /* first initialize persistent data */
468 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
469 namestrcpy(&slot->data.name, name);
471 slot->data.persistency = persistency;
472 slot->data.two_phase = two_phase;
474 slot->data.failover = failover;
475 slot->data.synced = synced;
476
477 /* and then data only present in shared memory */
478 slot->just_dirtied = false;
479 slot->dirty = false;
488 slot->inactive_since = 0;
490
491 /*
492 * Create the slot on disk. We haven't actually marked the slot allocated
493 * yet, so no special cleanup is required if this errors out.
494 */
495 CreateSlotOnDisk(slot);
496
497 /*
498 * We need to briefly prevent any other backend from iterating over the
499 * slots while we flip the in_use flag. We also need to set the active
500 * flag while holding the ControlLock as otherwise a concurrent
501 * ReplicationSlotAcquire() could acquire the slot as well.
502 */
504
505 slot->in_use = true;
506
507 /* We can now mark the slot active, and that makes it our slot. */
508 SpinLockAcquire(&slot->mutex);
511 SpinLockRelease(&slot->mutex);
512 MyReplicationSlot = slot;
513
515
516 /*
517 * Create statistics entry for the new logical slot. We don't collect any
518 * stats for physical slots, so no need to create an entry for the same.
519 * See ReplicationSlotDropPtr for why we need to do this before releasing
520 * ReplicationSlotAllocationLock.
521 */
522 if (SlotIsLogical(slot))
524
525 /*
526 * Now that the slot has been marked as in_use and active, it's safe to
527 * let somebody else try to allocate a slot.
528 */
530
531 /* Let everybody know we've modified this slot */
533}
int errhint(const char *fmt,...) pg_attribute_printf(1
Oid MyDatabaseId
Definition globals.c:94
@ LW_EXCLUSIVE
Definition lwlock.h:112
void namestrcpy(Name name, const char *str)
Definition name.c:233
void pgstat_create_replslot(ReplicationSlot *slot)
#define InvalidOid
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition slot.c:2448
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:267
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1820
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
XLogRecPtr candidate_xmin_lsn
Definition slot.h:229
TransactionId effective_catalog_xmin
Definition slot.h:210
XLogRecPtr candidate_restart_valid
Definition slot.h:230
SlotSyncSkipReason slotsync_skip_reason
Definition slot.h:284
TransactionId effective_xmin
Definition slot.h:209
XLogRecPtr candidate_restart_lsn
Definition slot.h:231
TransactionId candidate_catalog_xmin
Definition slot.h:228
TimestampTz inactive_since
Definition slot.h:245
#define InvalidTransactionId
Definition transam.h:31
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, Assert, ReplicationSlot::candidate_catalog_xmin, ReplicationSlot::candidate_restart_lsn, ReplicationSlot::candidate_restart_valid, ReplicationSlot::candidate_xmin_lsn, ConditionVariableBroadcast(), CreateSlotOnDisk(), ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::dirty, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, failover, ReplicationSlotPersistentData::failover, fb(), i, ReplicationSlot::in_use, ReplicationSlot::inactive_since, INVALID_PROC_NUMBER, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsBinaryUpgrade, IsLogicalLauncher(), IsSyncingReplicationSlots(), ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyDatabaseId, MyProcNumber, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotPersistentData::persistency, pgstat_create_replslot(), RecoveryInProgress(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotValidateName(), RS_TEMPORARY, SlotIsLogical, ReplicationSlot::slotsync_skip_reason, SpinLockAcquire(), SpinLockRelease(), SS_SKIP_NONE, ReplicationSlotPersistentData::synced, two_phase, ReplicationSlotPersistentData::two_phase, and ReplicationSlotPersistentData::two_phase_at.

Referenced by create_logical_replication_slot(), create_physical_replication_slot(), CreateConflictDetectionSlot(), CreateReplicationSlot(), and synchronize_one_slot().

◆ ReplicationSlotDrop()

void ReplicationSlotDrop ( const char name,
bool  nowait 
)
extern

Definition at line 912 of file slot.c.

913{
914 bool is_logical;
915
917
918 ReplicationSlotAcquire(name, nowait, false);
919
920 /*
921 * Do not allow users to drop the slots which are currently being synced
922 * from the primary to the standby.
923 */
927 errmsg("cannot drop replication slot \"%s\"", name),
928 errdetail("This replication slot is being synchronized from the primary server."));
929
931
933
934 if (is_logical)
936}
void ReplicationSlotDropAcquired(void)
Definition slot.c:1034

References Assert, ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg(), ERROR, fb(), MyReplicationSlot, name, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotDropAcquired(), RequestDisableLogicalDecoding(), SlotIsLogical, and ReplicationSlotPersistentData::synced.

Referenced by DropReplicationSlot(), and pg_drop_replication_slot().

◆ ReplicationSlotDropAcquired()

void ReplicationSlotDropAcquired ( void  )
extern

Definition at line 1034 of file slot.c.

1035{
1037
1039
1040 /* slot isn't acquired anymore */
1042
1044}

References Assert, fb(), MyReplicationSlot, and ReplicationSlotDropPtr().

Referenced by ApplyLauncherMain(), drop_local_obsolete_slots(), ReplicationSlotDrop(), ReplicationSlotRelease(), and ReplicationSlotsDropDBSlots().

◆ ReplicationSlotDropAtPubNode()

void ReplicationSlotDropAtPubNode ( WalReceiverConn wrconn,
char slotname,
bool  missing_ok 
)
extern

Definition at line 2344 of file subscriptioncmds.c.

2345{
2346 StringInfoData cmd;
2347
2348 Assert(wrconn);
2349
2350 load_file("libpqwalreceiver", false);
2351
2352 initStringInfo(&cmd);
2353 appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2354
2355 PG_TRY();
2356 {
2357 WalRcvExecResult *res;
2358
2359 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2360
2361 if (res->status == WALRCV_OK_COMMAND)
2362 {
2363 /* NOTICE. Success. */
2365 (errmsg("dropped replication slot \"%s\" on publisher",
2366 slotname)));
2367 }
2368 else if (res->status == WALRCV_ERROR &&
2369 missing_ok &&
2371 {
2372 /* LOG. Error, but missing_ok = true. */
2373 ereport(LOG,
2374 (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2375 slotname, res->err)));
2376 }
2377 else
2378 {
2379 /* ERROR. */
2380 ereport(ERROR,
2382 errmsg("could not drop replication slot \"%s\" on publisher: %s",
2383 slotname, res->err)));
2384 }
2385
2387 }
2388 PG_FINALLY();
2389 {
2390 pfree(cmd.data);
2391 }
2392 PG_END_TRY();
2393}
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
#define PG_TRY(...)
Definition elog.h:372
#define PG_END_TRY(...)
Definition elog.h:397
#define NOTICE
Definition elog.h:35
#define PG_FINALLY(...)
Definition elog.h:389
void pfree(void *pointer)
Definition mcxt.c:1616
const char * quote_identifier(const char *ident)
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
WalRcvExecStatus status
static WalReceiverConn * wrconn
Definition walreceiver.c:94
@ WALRCV_OK_COMMAND
@ WALRCV_ERROR
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)

References appendStringInfo(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, fb(), initStringInfo(), load_file(), LOG, NOTICE, pfree(), PG_END_TRY, PG_FINALLY, PG_TRY, quote_identifier(), WalRcvExecResult::sqlstate, WalRcvExecResult::status, walrcv_clear_result(), WALRCV_ERROR, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), and ProcessSyncingTablesForSync().

◆ ReplicationSlotIndex()

◆ ReplicationSlotInitialize()

void ReplicationSlotInitialize ( void  )
extern

Definition at line 242 of file slot.c.

243{
245}
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition slot.c:251

References before_shmem_exit(), and ReplicationSlotShmemExit().

Referenced by BaseInit().

◆ ReplicationSlotMarkDirty()

◆ ReplicationSlotName()

bool ReplicationSlotName ( int  index,
Name  name 
)
extern

Definition at line 590 of file slot.c.

591{
592 ReplicationSlot *slot;
593 bool found;
594
596
597 /*
598 * Ensure that the slot cannot be dropped while we copy the name. Don't
599 * need the spinlock as the name of an existing slot cannot change.
600 */
602 found = slot->in_use;
603 if (slot->in_use)
606
607 return found;
608}
Definition type.h:96

References ReplicationSlot::data, fb(), ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by pgstat_replslot_to_serialized_name_cb().

◆ ReplicationSlotNameForTablesync()

void ReplicationSlotNameForTablesync ( Oid  suboid,
Oid  relid,
char syncslotname,
Size  szslot 
)
extern

Definition at line 1202 of file tablesync.c.

1204{
1205 snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1206 relid, GetSystemIdentifier());
1207}
#define UINT64_FORMAT
Definition c.h:577
#define snprintf
Definition port.h:260
uint64 GetSystemIdentifier(void)
Definition xlog.c:4627

References fb(), GetSystemIdentifier(), snprintf, and UINT64_FORMAT.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), ProcessSyncingTablesForSync(), and ReportSlotConnectionError().

◆ ReplicationSlotPersist()

◆ ReplicationSlotRelease()

void ReplicationSlotRelease ( void  )
extern

Definition at line 761 of file slot.c.

762{
764 char *slotname = NULL; /* keep compiler quiet */
765 bool is_logical;
766 TimestampTz now = 0;
767
768 Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
769
771
772 if (am_walsender)
773 slotname = pstrdup(NameStr(slot->data.name));
774
775 if (slot->data.persistency == RS_EPHEMERAL)
776 {
777 /*
778 * Delete the slot. There is no !PANIC case where this is allowed to
779 * fail, all that may happen is an incomplete cleanup of the on-disk
780 * data.
781 */
783
784 /*
785 * Request to disable logical decoding, even though this slot may not
786 * have been the last logical slot. The checkpointer will verify if
787 * logical decoding should actually be disabled.
788 */
789 if (is_logical)
791 }
792
793 /*
794 * If slot needed to temporarily restrain both data and catalog xmin to
795 * create the catalog snapshot, remove that temporary constraint.
796 * Snapshots can only be exported while the initial snapshot is still
797 * acquired.
798 */
799 if (!TransactionIdIsValid(slot->data.xmin) &&
801 {
802 SpinLockAcquire(&slot->mutex);
804 SpinLockRelease(&slot->mutex);
806 }
807
808 /*
809 * Set the time since the slot has become inactive. We get the current
810 * time beforehand to avoid system call while holding the spinlock.
811 */
813
814 if (slot->data.persistency == RS_PERSISTENT)
815 {
816 /*
817 * Mark persistent slot inactive. We're not freeing it, just
818 * disconnecting, but wake up others that may be waiting for it.
819 */
820 SpinLockAcquire(&slot->mutex);
823 SpinLockRelease(&slot->mutex);
825 }
826 else
828
830
831 /* might not have been set when we've been a plain slot */
836
837 if (am_walsender)
838 {
841 ? errmsg("released logical replication slot \"%s\"",
842 slotname)
843 : errmsg("released physical replication slot \"%s\"",
844 slotname));
845
846 pfree(slotname);
847 }
848}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1609
int64 TimestampTz
Definition timestamp.h:39
char * pstrdup(const char *in)
Definition mcxt.c:1781
PGPROC * MyProc
Definition proc.c:67
PROC_HDR * ProcGlobal
Definition proc.c:70
uint8 statusFlags
Definition proc.h:202
int pgxactoff
Definition proc.h:199
uint8 * statusFlags
Definition proc.h:456

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, am_walsender, Assert, ConditionVariableBroadcast(), ReplicationSlot::data, DEBUG1, ReplicationSlot::effective_xmin, ereport, errmsg(), fb(), GetCurrentTimestamp(), INVALID_PROC_NUMBER, InvalidTransactionId, LOG, log_replication_commands, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, now(), ReplicationSlotPersistentData::persistency, pfree(), PGPROC::pgxactoff, ProcGlobal, pstrdup(), ReplicationSlotDropAcquired(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotSetInactiveSince(), RequestDisableLogicalDecoding(), RS_EPHEMERAL, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), PGPROC::statusFlags, PROC_HDR::statusFlags, TransactionIdIsValid, and ReplicationSlotPersistentData::xmin.

Referenced by binary_upgrade_check_logical_slot_pending_wal(), binary_upgrade_create_conflict_detection_slot(), copy_replication_slot(), CreateReplicationSlot(), InvalidatePossiblyObsoleteSlot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), PostgresMain(), ReplicationSlotAlter(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), synchronize_one_slot(), and WalSndErrorCleanup().

◆ ReplicationSlotReserveWal()

void ReplicationSlotReserveWal ( void  )
extern

Definition at line 1696 of file slot.c.

1697{
1699 XLogSegNo segno;
1700 XLogRecPtr restart_lsn;
1701
1702 Assert(slot != NULL);
1705
1706 /*
1707 * The replication slot mechanism is used to prevent the removal of
1708 * required WAL.
1709 *
1710 * Acquire an exclusive lock to prevent the checkpoint process from
1711 * concurrently computing the minimum slot LSN (see
1712 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1713 * replication cannot be removed during a checkpoint.
1714 *
1715 * The mechanism is reliable because if WAL reservation occurs first, the
1716 * checkpoint must wait for the restart_lsn update before determining the
1717 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1718 * first, subsequent WAL reservations will select positions at or beyond
1719 * the redo pointer of that checkpoint.
1720 */
1722
1723 /*
1724 * For logical slots log a standby snapshot and start logical decoding at
1725 * exactly that position. That allows the slot to start up more quickly.
1726 * But on a standby we cannot do WAL writes, so just use the replay
1727 * pointer; effectively, an attempt to create a logical slot on standby
1728 * will cause it to wait for an xl_running_xact record to be logged
1729 * independently on the primary, so that a snapshot can be built using the
1730 * record.
1731 *
1732 * None of this is needed (or indeed helpful) for physical slots as
1733 * they'll start replay at the last logged checkpoint anyway. Instead,
1734 * return the location of the last redo LSN, where a base backup has to
1735 * start replay at.
1736 */
1737 if (SlotIsPhysical(slot))
1738 restart_lsn = GetRedoRecPtr();
1739 else if (RecoveryInProgress())
1740 restart_lsn = GetXLogReplayRecPtr(NULL);
1741 else
1742 restart_lsn = GetXLogInsertRecPtr();
1743
1744 SpinLockAcquire(&slot->mutex);
1745 slot->data.restart_lsn = restart_lsn;
1746 SpinLockRelease(&slot->mutex);
1747
1748 /* prevent WAL removal as fast as possible */
1750
1751 /* Checkpoint shouldn't remove the required WAL. */
1753 if (XLogGetLastRemovedSegno() >= segno)
1754 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1755 NameStr(slot->data.name));
1756
1758
1759 if (!RecoveryInProgress() && SlotIsLogical(slot))
1760 {
1762
1763 /* make sure we have enough information to start */
1765
1766 /* and make sure it's fsynced to disk */
1768 }
1769}
XLogRecPtr LogStandbySnapshot(void)
Definition standby.c:1282
XLogSegNo XLogGetLastRemovedSegno(void)
Definition xlog.c:3795
XLogRecPtr GetRedoRecPtr(void)
Definition xlog.c:6563
XLogRecPtr GetXLogInsertRecPtr(void)
Definition xlog.c:9608
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2783
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogSegNo
Definition xlogdefs.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References Assert, ReplicationSlot::data, elog, ERROR, fb(), GetRedoRecPtr(), GetXLogInsertRecPtr(), GetXLogReplayRecPtr(), ReplicationSlot::last_saved_restart_lsn, LogStandbySnapshot(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SlotIsLogical, SlotIsPhysical, SpinLockAcquire(), SpinLockRelease(), wal_segment_size, XLByteToSeg, XLogFlush(), XLogGetLastRemovedSegno(), and XLogRecPtrIsValid.

Referenced by create_physical_replication_slot(), CreateInitDecodingContext(), and CreateReplicationSlot().

◆ ReplicationSlotSave()

◆ ReplicationSlotsComputeLogicalRestartLSN()

XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN ( void  )
extern

Definition at line 1370 of file slot.c.

1371{
1373 int i;
1374
1375 if (max_replication_slots <= 0)
1376 return InvalidXLogRecPtr;
1377
1379
1380 for (i = 0; i < max_replication_slots; i++)
1381 {
1382 ReplicationSlot *s;
1383 XLogRecPtr restart_lsn;
1384 XLogRecPtr last_saved_restart_lsn;
1385 bool invalidated;
1386 ReplicationSlotPersistency persistency;
1387
1389
1390 /* cannot change while ReplicationSlotCtlLock is held */
1391 if (!s->in_use)
1392 continue;
1393
1394 /* we're only interested in logical slots */
1395 if (!SlotIsLogical(s))
1396 continue;
1397
1398 /* read once, it's ok if it increases while we're checking */
1400 persistency = s->data.persistency;
1401 restart_lsn = s->data.restart_lsn;
1402 invalidated = s->data.invalidated != RS_INVAL_NONE;
1403 last_saved_restart_lsn = s->last_saved_restart_lsn;
1405
1406 /* invalidated slots need not apply */
1407 if (invalidated)
1408 continue;
1409
1410 /*
1411 * For persistent slot use last_saved_restart_lsn to compute the
1412 * oldest LSN for removal of WAL segments. The segments between
1413 * last_saved_restart_lsn and restart_lsn might be needed by a
1414 * persistent slot in the case of database crash. Non-persistent
1415 * slots can't survive the database crash, so we don't care about
1416 * last_saved_restart_lsn for them.
1417 */
1418 if (persistency == RS_PERSISTENT)
1419 {
1420 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1421 restart_lsn > last_saved_restart_lsn)
1422 {
1423 restart_lsn = last_saved_restart_lsn;
1424 }
1425 }
1426
1427 if (!XLogRecPtrIsValid(restart_lsn))
1428 continue;
1429
1430 if (!XLogRecPtrIsValid(result) ||
1431 restart_lsn < result)
1432 result = restart_lsn;
1433 }
1434
1436
1437 return result;
1438}

References ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), and XLogRecPtrIsValid.

Referenced by CheckPointLogicalRewriteHeap(), and CheckPointSnapBuild().

◆ ReplicationSlotsComputeRequiredLSN()

void ReplicationSlotsComputeRequiredLSN ( void  )
extern

Definition at line 1300 of file slot.c.

1301{
1302 int i;
1304
1306
1308 for (i = 0; i < max_replication_slots; i++)
1309 {
1311 XLogRecPtr restart_lsn;
1312 XLogRecPtr last_saved_restart_lsn;
1313 bool invalidated;
1314 ReplicationSlotPersistency persistency;
1315
1316 if (!s->in_use)
1317 continue;
1318
1320 persistency = s->data.persistency;
1321 restart_lsn = s->data.restart_lsn;
1322 invalidated = s->data.invalidated != RS_INVAL_NONE;
1323 last_saved_restart_lsn = s->last_saved_restart_lsn;
1325
1326 /* invalidated slots need not apply */
1327 if (invalidated)
1328 continue;
1329
1330 /*
1331 * For persistent slot use last_saved_restart_lsn to compute the
1332 * oldest LSN for removal of WAL segments. The segments between
1333 * last_saved_restart_lsn and restart_lsn might be needed by a
1334 * persistent slot in the case of database crash. Non-persistent
1335 * slots can't survive the database crash, so we don't care about
1336 * last_saved_restart_lsn for them.
1337 */
1338 if (persistency == RS_PERSISTENT)
1339 {
1340 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1341 restart_lsn > last_saved_restart_lsn)
1342 {
1343 restart_lsn = last_saved_restart_lsn;
1344 }
1345 }
1346
1347 if (XLogRecPtrIsValid(restart_lsn) &&
1349 restart_lsn < min_required))
1350 min_required = restart_lsn;
1351 }
1353
1355}
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition xlog.c:2669

References Assert, ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_PERSISTENT, SpinLockAcquire(), SpinLockRelease(), XLogRecPtrIsValid, and XLogSetReplicationSlotMinimumLSN().

Referenced by CheckPointReplicationSlots(), copy_replication_slot(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), ReplicationSlotDropPtr(), ReplicationSlotReserveWal(), reserve_wal_for_local_slot(), StartupReplicationSlots(), and update_local_synced_slot().

◆ ReplicationSlotsComputeRequiredXmin()

void ReplicationSlotsComputeRequiredXmin ( bool  already_locked)
extern

Definition at line 1218 of file slot.c.

1219{
1220 int i;
1223
1228
1229 /*
1230 * Hold the ReplicationSlotControlLock until after updating the slot xmin
1231 * values, so no backend updates the initial xmin for newly created slot
1232 * concurrently. A shared lock is used here to minimize lock contention,
1233 * especially when many slots exist and advancements occur frequently.
1234 * This is safe since an exclusive lock is taken during initial slot xmin
1235 * update in slot creation.
1236 *
1237 * One might think that we can hold the ProcArrayLock exclusively and
1238 * update the slot xmin values, but it could increase lock contention on
1239 * the ProcArrayLock, which is not great since this function can be called
1240 * at non-negligible frequency.
1241 *
1242 * Concurrent invocation of this function may cause the computed slot xmin
1243 * to regress. However, this is harmless because tuples prior to the most
1244 * recent xmin are no longer useful once advancement occurs (see
1245 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1246 * before updating the effective_xmin). Thus, such regression merely
1247 * prevents VACUUM from prematurely removing tuples without causing the
1248 * early deletion of required data.
1249 */
1250 if (!already_locked)
1252
1253 for (i = 0; i < max_replication_slots; i++)
1254 {
1256 TransactionId effective_xmin;
1257 TransactionId effective_catalog_xmin;
1258 bool invalidated;
1259
1260 if (!s->in_use)
1261 continue;
1262
1264 effective_xmin = s->effective_xmin;
1265 effective_catalog_xmin = s->effective_catalog_xmin;
1266 invalidated = s->data.invalidated != RS_INVAL_NONE;
1268
1269 /* invalidated slots need not apply */
1270 if (invalidated)
1271 continue;
1272
1273 /* check the data xmin */
1274 if (TransactionIdIsValid(effective_xmin) &&
1276 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1277 agg_xmin = effective_xmin;
1278
1279 /* check the catalog xmin */
1280 if (TransactionIdIsValid(effective_catalog_xmin) &&
1282 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1283 agg_catalog_xmin = effective_catalog_xmin;
1284 }
1285
1287
1288 if (!already_locked)
1290}
uint32 TransactionId
Definition c.h:678
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1955
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition procarray.c:3954
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References Assert, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidTransactionId, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockHeldByMeInMode(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ProcArraySetReplicationSlotXmin(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, and TransactionIdPrecedes().

Referenced by copy_replication_slot(), CreateInitDecodingContext(), init_conflict_slot_xmin(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalReplicationSlotNewXmin(), ReplicationSlotDropPtr(), ReplicationSlotRelease(), StartupReplicationSlots(), synchronize_one_slot(), update_conflict_slot_xmin(), and update_local_synced_slot().

◆ ReplicationSlotsCountDBSlots()

bool ReplicationSlotsCountDBSlots ( Oid  dboid,
int nslots,
int nactive 
)
extern

Definition at line 1449 of file slot.c.

1450{
1451 int i;
1452
1453 *nslots = *nactive = 0;
1454
1455 if (max_replication_slots <= 0)
1456 return false;
1457
1459 for (i = 0; i < max_replication_slots; i++)
1460 {
1461 ReplicationSlot *s;
1462
1464
1465 /* cannot change while ReplicationSlotCtlLock is held */
1466 if (!s->in_use)
1467 continue;
1468
1469 /* only logical slots are database specific, skip */
1470 if (!SlotIsLogical(s))
1471 continue;
1472
1473 /* not our database, skip */
1474 if (s->data.database != dboid)
1475 continue;
1476
1477 /* NB: intentionally counting invalidated slots */
1478
1479 /* count slots with spinlock held */
1481 (*nslots)++;
1483 (*nactive)++;
1485 }
1487
1488 if (*nslots > 0)
1489 return true;
1490 return false;
1491}

References ReplicationSlot::active_proc, ReplicationSlot::data, ReplicationSlotPersistentData::database, fb(), i, ReplicationSlot::in_use, INVALID_PROC_NUMBER, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, SpinLockAcquire(), and SpinLockRelease().

Referenced by dropdb().

◆ ReplicationSlotsDropDBSlots()

void ReplicationSlotsDropDBSlots ( Oid  dboid)
extern

Definition at line 1510 of file slot.c.

1511{
1512 int i;
1514 bool dropped = false;
1515
1516 if (max_replication_slots <= 0)
1517 return;
1518
1519restart:
1522 for (i = 0; i < max_replication_slots; i++)
1523 {
1524 ReplicationSlot *s;
1525 char *slotname;
1526 ProcNumber active_proc;
1527
1529
1530 /* cannot change while ReplicationSlotCtlLock is held */
1531 if (!s->in_use)
1532 continue;
1533
1534 /* only logical slots are database specific, skip */
1535 if (!SlotIsLogical(s))
1536 continue;
1537
1538 /*
1539 * Check logical slots on other databases too so we can disable
1540 * logical decoding only if no slots in the cluster.
1541 */
1545
1546 /* not our database, skip */
1547 if (s->data.database != dboid)
1548 continue;
1549
1550 /* NB: intentionally including invalidated slots to drop */
1551
1552 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1554 /* can't change while ReplicationSlotControlLock is held */
1555 slotname = NameStr(s->data.name);
1556 active_proc = s->active_proc;
1557 if (active_proc == INVALID_PROC_NUMBER)
1558 {
1561 }
1563
1564 /*
1565 * Even though we hold an exclusive lock on the database object a
1566 * logical slot for that DB can still be active, e.g. if it's
1567 * concurrently being dropped by a backend connected to another DB.
1568 *
1569 * That's fairly unlikely in practice, so we'll just bail out.
1570 *
1571 * The slot sync worker holds a shared lock on the database before
1572 * operating on synced logical slots to avoid conflict with the drop
1573 * happening here. The persistent synced slots are thus safe but there
1574 * is a possibility that the slot sync worker has created a temporary
1575 * slot (which stays active even on release) and we are trying to drop
1576 * that here. In practice, the chances of hitting this scenario are
1577 * less as during slot synchronization, the temporary slot is
1578 * immediately converted to persistent and thus is safe due to the
1579 * shared lock taken on the database. So, we'll just bail out in such
1580 * a case.
1581 *
1582 * XXX: We can consider shutting down the slot sync worker before
1583 * trying to drop synced temporary slots here.
1584 */
1585 if (active_proc != INVALID_PROC_NUMBER)
1586 ereport(ERROR,
1588 errmsg("replication slot \"%s\" is active for PID %d",
1589 slotname, GetPGProcByNumber(active_proc)->pid)));
1590
1591 /*
1592 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1593 * holding ReplicationSlotControlLock over filesystem operations,
1594 * release ReplicationSlotControlLock and use
1595 * ReplicationSlotDropAcquired.
1596 *
1597 * As that means the set of slots could change, restart scan from the
1598 * beginning each time we release the lock.
1599 */
1602 dropped = true;
1603 goto restart;
1604 }
1606
1607 if (dropped && !found_valid_logicalslot)
1609}

References ReplicationSlot::active_proc, ReplicationSlot::data, ReplicationSlotPersistentData::database, ereport, errcode(), errmsg(), ERROR, fb(), GetPGProcByNumber, i, ReplicationSlot::in_use, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyProcNumber, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropAcquired(), RequestDisableLogicalDecoding(), RS_INVAL_NONE, SlotIsLogical, SpinLockAcquire(), and SpinLockRelease().

Referenced by dbase_redo(), and dropdb().

◆ ReplicationSlotSetInactiveSince()

◆ ReplicationSlotsShmemInit()

void ReplicationSlotsShmemInit ( void  )
extern

Definition at line 206 of file slot.c.

207{
208 bool found;
209
210 if (max_replication_slots == 0)
211 return;
212
214 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 &found);
216
217 if (!found)
218 {
219 int i;
220
221 /* First time through, so initialize */
223
224 for (i = 0; i < max_replication_slots; i++)
225 {
227
228 /* everything else is zeroed by the memset above */
230 SpinLockInit(&slot->mutex);
234 }
235 }
236}
#define MemSet(start, val, len)
Definition c.h:1035
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:698
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:378
Size ReplicationSlotsShmemSize(void)
Definition slot.c:188
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
LWLock io_in_progress_lock
Definition slot.h:216

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, ConditionVariableInit(), fb(), i, INVALID_PROC_NUMBER, ReplicationSlot::io_in_progress_lock, LWLockInitialize(), max_replication_slots, MemSet, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsShmemSize(), ShmemInitStruct(), and SpinLockInit().

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationSlotsShmemSize()

Size ReplicationSlotsShmemSize ( void  )
extern

Definition at line 188 of file slot.c.

189{
190 Size size = 0;
191
192 if (max_replication_slots == 0)
193 return size;
194
195 size = offsetof(ReplicationSlotCtlData, replication_slots);
196 size = add_size(size,
198
199 return size;
200}
size_t Size
Definition c.h:631
Size add_size(Size s1, Size s2)
Definition shmem.c:482
Size mul_size(Size s1, Size s2)
Definition shmem.c:497

References add_size(), fb(), max_replication_slots, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationSlotsShmemInit().

◆ ReplicationSlotValidateName()

bool ReplicationSlotValidateName ( const char name,
bool  allow_reserved_name,
int  elevel 
)
extern

Definition at line 267 of file slot.c.

269{
270 int err_code;
271 char *err_msg = NULL;
272 char *err_hint = NULL;
273
275 &err_code, &err_msg, &err_hint))
276 {
277 /*
278 * Use errmsg_internal() and errhint_internal() instead of errmsg()
279 * and errhint(), since the messages from
280 * ReplicationSlotValidateNameInternal() are already translated. This
281 * avoids double translation.
282 */
283 ereport(elevel,
285 errmsg_internal("%s", err_msg),
286 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
287
288 pfree(err_msg);
289 if (err_hint != NULL)
291 return false;
292 }
293
294 return true;
295}
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
int int errhint_internal(const char *fmt,...) pg_attribute_printf(1
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition slot.c:312

References ereport, errcode(), errhint_internal(), errmsg_internal(), fb(), name, pfree(), and ReplicationSlotValidateNameInternal().

Referenced by parse_subscription_options(), ReplicationSlotCreate(), and StartupReorderBuffer().

◆ ReplicationSlotValidateNameInternal()

bool ReplicationSlotValidateNameInternal ( const char name,
bool  allow_reserved_name,
int err_code,
char **  err_msg,
char **  err_hint 
)
extern

Definition at line 312 of file slot.c.

314{
315 const char *cp;
316
317 if (strlen(name) == 0)
318 {
320 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
321 *err_hint = NULL;
322 return false;
323 }
324
325 if (strlen(name) >= NAMEDATALEN)
326 {
328 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
329 *err_hint = NULL;
330 return false;
331 }
332
333 for (cp = name; *cp; cp++)
334 {
335 if (!((*cp >= 'a' && *cp <= 'z')
336 || (*cp >= '0' && *cp <= '9')
337 || (*cp == '_')))
338 {
340 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
341 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
342 return false;
343 }
344 }
345
347 {
349 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
350 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
352 return false;
353 }
354
355 return true;
356}
#define _(x)
Definition elog.c:95
#define NAMEDATALEN
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
#define CONFLICT_DETECTION_SLOT
Definition slot.h:28

References _, CONFLICT_DETECTION_SLOT, fb(), IsSlotForConflictCheck(), name, NAMEDATALEN, and psprintf().

Referenced by check_primary_slot_name(), ReplicationSlotValidateName(), and validate_sync_standby_slots().

◆ SearchNamedReplicationSlot()

◆ SlotExistsInSyncStandbySlots()

bool SlotExistsInSyncStandbySlots ( const char slot_name)
extern

Definition at line 3059 of file slot.c.

3060{
3061 const char *standby_slot_name;
3062
3063 /* Return false if there is no value in synchronized_standby_slots */
3065 return false;
3066
3067 /*
3068 * XXX: We are not expecting this list to be long so a linear search
3069 * shouldn't hurt but if that turns out not to be true then we can cache
3070 * this information for each WalSender as well.
3071 */
3073 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3074 {
3075 if (strcmp(standby_slot_name, slot_name) == 0)
3076 return true;
3077
3079 }
3080
3081 return false;
3082}
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition slot.c:167
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition slot.c:101

References fb(), i, SyncStandbySlotsConfigData::nslotnames, SyncStandbySlotsConfigData::slot_names, and synchronized_standby_slots_config.

Referenced by PhysicalWakeupLogicalWalSnd().

◆ StandbySlotsHaveCaughtup()

bool StandbySlotsHaveCaughtup ( XLogRecPtr  wait_for_lsn,
int  elevel 
)
extern

Definition at line 3092 of file slot.c.

3093{
3094 const char *name;
3095 int caught_up_slot_num = 0;
3097
3098 /*
3099 * Don't need to wait for the standbys to catch up if there is no value in
3100 * synchronized_standby_slots.
3101 */
3103 return true;
3104
3105 /*
3106 * Don't need to wait for the standbys to catch up if we are on a standby
3107 * server, since we do not support syncing slots to cascading standbys.
3108 */
3109 if (RecoveryInProgress())
3110 return true;
3111
3112 /*
3113 * Don't need to wait for the standbys to catch up if they are already
3114 * beyond the specified WAL location.
3115 */
3118 return true;
3119
3120 /*
3121 * To prevent concurrent slot dropping and creation while filtering the
3122 * slots, take the ReplicationSlotControlLock outside of the loop.
3123 */
3125
3127 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3128 {
3129 XLogRecPtr restart_lsn;
3130 bool invalidated;
3131 bool inactive;
3132 ReplicationSlot *slot;
3133
3134 slot = SearchNamedReplicationSlot(name, false);
3135
3136 /*
3137 * If a slot name provided in synchronized_standby_slots does not
3138 * exist, report a message and exit the loop.
3139 */
3140 if (!slot)
3141 {
3142 ereport(elevel,
3144 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3145 name, "synchronized_standby_slots"),
3146 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3147 name),
3148 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3149 name, "synchronized_standby_slots"));
3150 break;
3151 }
3152
3153 /* Same as above: if a slot is not physical, exit the loop. */
3154 if (SlotIsLogical(slot))
3155 {
3156 ereport(elevel,
3158 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3159 name, "synchronized_standby_slots"),
3160 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3161 name),
3162 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3163 name, "synchronized_standby_slots"));
3164 break;
3165 }
3166
3167 SpinLockAcquire(&slot->mutex);
3168 restart_lsn = slot->data.restart_lsn;
3169 invalidated = slot->data.invalidated != RS_INVAL_NONE;
3171 SpinLockRelease(&slot->mutex);
3172
3173 if (invalidated)
3174 {
3175 /* Specified physical slot has been invalidated */
3176 ereport(elevel,
3178 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3179 name, "synchronized_standby_slots"),
3180 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3181 name),
3182 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3183 name, "synchronized_standby_slots"));
3184 break;
3185 }
3186
3187 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3188 {
3189 /* Log a message if no active_pid for this physical slot */
3190 if (inactive)
3191 ereport(elevel,
3193 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3194 name, "synchronized_standby_slots"),
3195 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3196 name),
3197 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3198 name, "synchronized_standby_slots"));
3199
3200 /* Continue if the current slot hasn't caught up. */
3201 break;
3202 }
3203
3204 Assert(restart_lsn >= wait_for_lsn);
3205
3207 min_restart_lsn > restart_lsn)
3208 min_restart_lsn = restart_lsn;
3209
3211
3212 name += strlen(name) + 1;
3213 }
3214
3216
3217 /*
3218 * Return false if not all the standbys have caught up to the specified
3219 * WAL location.
3220 */
3222 return false;
3223
3224 /* The ss_oldest_flush_lsn must not retreat. */
3227
3229
3230 return true;
3231}
static XLogRecPtr ss_oldest_flush_lsn
Definition slot.c:173

References ReplicationSlot::active_proc, Assert, ReplicationSlot::data, ereport, errcode(), errdetail(), errhint(), errmsg(), fb(), i, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, name, SyncStandbySlotsConfigData::nslotnames, RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SearchNamedReplicationSlot(), SyncStandbySlotsConfigData::slot_names, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), ss_oldest_flush_lsn, synchronized_standby_slots_config, and XLogRecPtrIsValid.

Referenced by NeedToWaitForStandbys(), and WaitForStandbyConfirmation().

◆ StartupReplicationSlots()

void StartupReplicationSlots ( void  )
extern

Definition at line 2387 of file slot.c.

2388{
2390 struct dirent *replication_de;
2391
2392 elog(DEBUG1, "starting up replication slots");
2393
2394 /* restore all slots by iterating over all on-disk entries */
2397 {
2398 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2400
2401 if (strcmp(replication_de->d_name, ".") == 0 ||
2402 strcmp(replication_de->d_name, "..") == 0)
2403 continue;
2404
2405 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2407
2408 /* we're only creating directories here, skip if it's not our's */
2410 continue;
2411
2412 /* we crashed while a slot was being setup or deleted, clean up */
2413 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2414 {
2415 if (!rmtree(path, true))
2416 {
2418 (errmsg("could not remove directory \"%s\"",
2419 path)));
2420 continue;
2421 }
2423 continue;
2424 }
2425
2426 /* looks like a slot in a normal state, restore */
2428 }
2430
2431 /* currently no slots exist, we're done. */
2432 if (max_replication_slots <= 0)
2433 return;
2434
2435 /* Now that we have recovered all the data, compute replication xmin */
2438}
#define WARNING
Definition elog.h:36
int FreeDir(DIR *dir)
Definition fd.c:3008
void fsync_fname(const char *fname, bool isdir)
Definition fd.c:756
DIR * AllocateDir(const char *dirname)
Definition fd.c:2890
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition fd.c:2956
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition file_utils.c:547
PGFileType
Definition file_utils.h:19
@ PGFILETYPE_DIR
Definition file_utils.h:23
@ PGFILETYPE_ERROR
Definition file_utils.h:20
bool rmtree(const char *path, bool rmtopdir)
Definition rmtree.c:50
static void RestoreSlotFromDisk(const char *name)
Definition slot.c:2672
bool pg_str_endswith(const char *str, const char *end)
Definition string.c:31
Definition dirent.c:26

References AllocateDir(), DEBUG1, elog, ereport, errmsg(), fb(), FreeDir(), fsync_fname(), get_dirent_type(), max_replication_slots, MAXPGPATH, PG_REPLSLOT_DIR, pg_str_endswith(), PGFILETYPE_DIR, PGFILETYPE_ERROR, ReadDir(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RestoreSlotFromDisk(), rmtree(), snprintf, and WARNING.

Referenced by StartupXLOG().

◆ WaitForStandbyConfirmation()

void WaitForStandbyConfirmation ( XLogRecPtr  wait_for_lsn)
extern

Definition at line 3240 of file slot.c.

3241{
3242 /*
3243 * Don't need to wait for the standby to catch up if the current acquired
3244 * slot is not a logical failover slot, or there is no value in
3245 * synchronized_standby_slots.
3246 */
3248 return;
3249
3251
3252 for (;;)
3253 {
3255
3257 {
3258 ConfigReloadPending = false;
3260 }
3261
3262 /* Exit if done waiting for every slot. */
3264 break;
3265
3266 /*
3267 * Wait for the slots in the synchronized_standby_slots to catch up,
3268 * but use a timeout (1s) so we can also check if the
3269 * synchronized_standby_slots has been changed.
3270 */
3273 }
3274
3276}
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3092
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition walsender.c:117

References CHECK_FOR_INTERRUPTS, ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableTimedSleep(), ConfigReloadPending, ReplicationSlot::data, ReplicationSlotPersistentData::failover, fb(), MyReplicationSlot, PGC_SIGHUP, ProcessConfigFile(), StandbySlotsHaveCaughtup(), synchronized_standby_slots_config, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtl, and WARNING.

Referenced by LogicalSlotAdvanceAndCheckSnapState(), and pg_logical_slot_get_changes_guts().

Variable Documentation

◆ idle_replication_slot_timeout_secs

PGDLLIMPORT int idle_replication_slot_timeout_secs
extern

◆ max_replication_slots

◆ MyReplicationSlot

PGDLLIMPORT ReplicationSlot* MyReplicationSlot
extern

Definition at line 148 of file slot.c.

Referenced by abort_logical_decoding_activation(), ApplyLauncherMain(), binary_upgrade_check_logical_slot_pending_wal(), compute_min_nonremovable_xid(), copy_replication_slot(), create_logical_replication_slot(), create_physical_replication_slot(), CreateConflictDetectionSlot(), CreateDecodingContext(), CreateInitDecodingContext(), CreateReplicationSlot(), DisableLogicalDecodingIfNecessary(), EnsureLogicalDecodingEnabled(), init_conflict_slot_xmin(), InvalidatePossiblyObsoleteSlot(), LogicalConfirmReceivedLocation(), LogicalIncreaseRestartDecodingForSlot(), LogicalIncreaseXminForSlot(), logicalrep_worker_launch(), LogicalReplicationSlotCheckPendingWal(), LogicalSlotAdvanceAndCheckSnapState(), NeedToWaitForStandbys(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_physical_replication_slot_advance(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), PhysicalReplicationSlotNewXmin(), PhysicalWakeupLogicalWalSnd(), PostgresMain(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), ReorderBufferAllocate(), ReorderBufferFree(), ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), ReorderBufferSerializedPath(), ReorderBufferSerializeTXN(), ReplicationSlotAcquire(), ReplicationSlotAlter(), ReplicationSlotCleanup(), ReplicationSlotCreate(), ReplicationSlotDrop(), ReplicationSlotDropAcquired(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotsDropDBSlots(), ReplicationSlotShmemExit(), reserve_wal_for_local_slot(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), StartupDecodingContext(), synchronize_one_slot(), update_and_persist_local_synced_slot(), update_conflict_slot_xmin(), update_local_synced_slot(), update_slotsync_skip_stats(), WaitForStandbyConfirmation(), and WalSndErrorCleanup().

◆ ReplicationSlotCtl

◆ synchronized_standby_slots

PGDLLIMPORT char* synchronized_standby_slots
extern

Definition at line 164 of file slot.c.