PostgreSQL Source Code git master
slot.h File Reference
Include dependency graph for slot.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReplicationSlotPersistentData
 
struct  ReplicationSlot
 
struct  ReplicationSlotCtlData
 

Macros

#define PG_REPLSLOT_DIR   "pg_replslot"
 
#define CONFLICT_DETECTION_SLOT   "pg_conflict_detection"
 
#define RS_INVAL_MAX_CAUSES   4
 
#define SlotIsPhysical(slot)   ((slot)->data.database == InvalidOid)
 
#define SlotIsLogical(slot)   ((slot)->data.database != InvalidOid)
 

Typedefs

typedef enum ReplicationSlotPersistency ReplicationSlotPersistency
 
typedef enum ReplicationSlotInvalidationCause ReplicationSlotInvalidationCause
 
typedef enum SlotSyncSkipReason SlotSyncSkipReason
 
typedef struct ReplicationSlotPersistentData ReplicationSlotPersistentData
 
typedef struct ReplicationSlot ReplicationSlot
 
typedef struct ReplicationSlotCtlData ReplicationSlotCtlData
 

Enumerations

enum  ReplicationSlotPersistency { RS_PERSISTENT , RS_EPHEMERAL , RS_TEMPORARY }
 
enum  ReplicationSlotInvalidationCause {
  RS_INVAL_NONE = 0 , RS_INVAL_WAL_REMOVED = (1 << 0) , RS_INVAL_HORIZON = (1 << 1) , RS_INVAL_WAL_LEVEL = (1 << 2) ,
  RS_INVAL_IDLE_TIMEOUT = (1 << 3)
}
 
enum  SlotSyncSkipReason {
  SS_SKIP_NONE , SS_SKIP_WAL_NOT_FLUSHED , SS_SKIP_WAL_OR_ROWS_REMOVED , SS_SKIP_NO_CONSISTENT_SNAPSHOT ,
  SS_SKIP_INVALID
}
 

Functions

static void ReplicationSlotSetInactiveSince (ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
 
Size ReplicationSlotsShmemSize (void)
 
void ReplicationSlotsShmemInit (void)
 
void ReplicationSlotCreate (const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
 
void ReplicationSlotPersist (void)
 
void ReplicationSlotDrop (const char *name, bool nowait)
 
void ReplicationSlotDropAcquired (void)
 
void ReplicationSlotAlter (const char *name, const bool *failover, const bool *two_phase)
 
void ReplicationSlotAcquire (const char *name, bool nowait, bool error_if_invalid)
 
void ReplicationSlotRelease (void)
 
void ReplicationSlotCleanup (bool synced_only)
 
void ReplicationSlotSave (void)
 
void ReplicationSlotMarkDirty (void)
 
void ReplicationSlotInitialize (void)
 
bool ReplicationSlotValidateName (const char *name, bool allow_reserved_name, int elevel)
 
bool ReplicationSlotValidateNameInternal (const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
 
void ReplicationSlotReserveWal (void)
 
void ReplicationSlotsComputeRequiredXmin (bool already_locked)
 
void ReplicationSlotsComputeRequiredLSN (void)
 
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN (void)
 
bool ReplicationSlotsCountDBSlots (Oid dboid, int *nslots, int *nactive)
 
bool CheckLogicalSlotExists (void)
 
void ReplicationSlotsDropDBSlots (Oid dboid)
 
bool InvalidateObsoleteReplicationSlots (uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
 
ReplicationSlot * SearchNamedReplicationSlot (const char *name, bool need_lock)
 
int ReplicationSlotIndex (ReplicationSlot *slot)
 
bool ReplicationSlotName (int index, Name name)
 
void ReplicationSlotNameForTablesync (Oid suboid, Oid relid, char *syncslotname, Size szslot)
 
void ReplicationSlotDropAtPubNode (WalReceiverConn *wrconn, char *slotname, bool missing_ok)
 
void StartupReplicationSlots (void)
 
void CheckPointReplicationSlots (bool is_shutdown)
 
void CheckSlotRequirements (void)
 
void CheckSlotPermissions (void)
 
ReplicationSlotInvalidationCause GetSlotInvalidationCause (const char *cause_name)
 
const char * GetSlotInvalidationCauseName (ReplicationSlotInvalidationCause cause)
 
bool SlotExistsInSyncStandbySlots (const char *slot_name)
 
bool StandbySlotsHaveCaughtup (XLogRecPtr wait_for_lsn, int elevel)
 
void WaitForStandbyConfirmation (XLogRecPtr wait_for_lsn)
 

Variables

PGDLLIMPORT ReplicationSlotCtlData * ReplicationSlotCtl
 
PGDLLIMPORT ReplicationSlot * MyReplicationSlot
 
PGDLLIMPORT int max_replication_slots
 
PGDLLIMPORT char * synchronized_standby_slots
 
PGDLLIMPORT int idle_replication_slot_timeout_secs
 

Macro Definition Documentation

◆ CONFLICT_DETECTION_SLOT

#define CONFLICT_DETECTION_SLOT   "pg_conflict_detection"

Definition at line 28 of file slot.h.

◆ PG_REPLSLOT_DIR

#define PG_REPLSLOT_DIR   "pg_replslot"

Definition at line 21 of file slot.h.

◆ RS_INVAL_MAX_CAUSES

#define RS_INVAL_MAX_CAUSES   4

Definition at line 72 of file slot.h.

◆ SlotIsLogical

#define SlotIsLogical (   slot)    ((slot)->data.database != InvalidOid)

Definition at line 285 of file slot.h.

◆ SlotIsPhysical

#define SlotIsPhysical (   slot)    ((slot)->data.database == InvalidOid)

Definition at line 284 of file slot.h.

Typedef Documentation

◆ ReplicationSlot

◆ ReplicationSlotCtlData

◆ ReplicationSlotInvalidationCause

◆ ReplicationSlotPersistency

◆ ReplicationSlotPersistentData

◆ SlotSyncSkipReason

Enumeration Type Documentation

◆ ReplicationSlotInvalidationCause

Enumerator
RS_INVAL_NONE 
RS_INVAL_WAL_REMOVED 
RS_INVAL_HORIZON 
RS_INVAL_WAL_LEVEL 
RS_INVAL_IDLE_TIMEOUT 

Definition at line 58 of file slot.h.

59{
60 RS_INVAL_NONE = 0,
61 /* required WAL has been removed */
62 RS_INVAL_WAL_REMOVED = (1 << 0),
63 /* required rows have been removed */
64 RS_INVAL_HORIZON = (1 << 1),
65 /* wal_level insufficient for slot */
66 RS_INVAL_WAL_LEVEL = (1 << 2),
67 /* idle slot timeout has occurred */
68 RS_INVAL_IDLE_TIMEOUT = (1 << 3),
ReplicationSlotInvalidationCause
Definition: slot.h:59
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition: slot.h:68
@ RS_INVAL_HORIZON
Definition: slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:66
@ RS_INVAL_NONE
Definition: slot.h:60

◆ ReplicationSlotPersistency

Enumerator
RS_PERSISTENT 
RS_EPHEMERAL 
RS_TEMPORARY 

Definition at line 43 of file slot.h.

44{
ReplicationSlotPersistency
Definition: slot.h:44
@ RS_PERSISTENT
Definition: slot.h:45
@ RS_EPHEMERAL
Definition: slot.h:46
@ RS_TEMPORARY
Definition: slot.h:47

◆ SlotSyncSkipReason

Enumerator
SS_SKIP_NONE 
SS_SKIP_WAL_NOT_FLUSHED 
SS_SKIP_WAL_OR_ROWS_REMOVED 
SS_SKIP_NO_CONSISTENT_SNAPSHOT 
SS_SKIP_INVALID 

Definition at line 80 of file slot.h.

81{
82 SS_SKIP_NONE, /* No skip */
83 SS_SKIP_WAL_NOT_FLUSHED, /* Standby did not flush the wal corresponding
84 * to confirmed flush of remote slot */
85 SS_SKIP_WAL_OR_ROWS_REMOVED, /* Remote slot is behind; required WAL or
86 * rows may be removed or at risk */
87 SS_SKIP_NO_CONSISTENT_SNAPSHOT, /* Standby could not build a consistent
88 * snapshot */
89 SS_SKIP_INVALID /* Local slot is invalid */
SlotSyncSkipReason
Definition: slot.h:81
@ SS_SKIP_WAL_NOT_FLUSHED
Definition: slot.h:83
@ SS_SKIP_NO_CONSISTENT_SNAPSHOT
Definition: slot.h:87
@ SS_SKIP_NONE
Definition: slot.h:82
@ SS_SKIP_INVALID
Definition: slot.h:89
@ SS_SKIP_WAL_OR_ROWS_REMOVED
Definition: slot.h:85

Function Documentation

◆ CheckLogicalSlotExists()

bool CheckLogicalSlotExists ( void  )

Definition at line 1612 of file slot.c.

1613{
1614 bool found = false;
1615
1616 if (max_replication_slots <= 0)
1617 return false;
1618
1619 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1620 for (int i = 0; i < max_replication_slots; i++)
1621 {
1622 ReplicationSlot *s;
1623 bool invalidated;
1624
1625 s = &ReplicationSlotCtl->replication_slots[i];
1626
1627 /* cannot change while ReplicationSlotCtlLock is held */
1628 if (!s->in_use)
1629 continue;
1630
1631 if (SlotIsPhysical(s))
1632 continue;
1633
1634 SpinLockAcquire(&s->mutex);
1635 invalidated = s->data.invalidated != RS_INVAL_NONE;
1636 SpinLockRelease(&s->mutex);
1637
1638 if (invalidated)
1639 continue;
1640
1641 found = true;
1642 break;
1643 }
1644 LWLockRelease(ReplicationSlotControlLock);
1645
1646 return found;
1647}
int i
Definition: isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1178
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1898
@ LW_SHARED
Definition: lwlock.h:113
int max_replication_slots
Definition: slot.c:151
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:145
#define SlotIsPhysical(slot)
Definition: slot.h:284
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
ReplicationSlot replication_slots[1]
Definition: slot.h:296
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:128
slock_t mutex
Definition: slot.h:183
bool in_use
Definition: slot.h:186
ReplicationSlotPersistentData data
Definition: slot.h:210

References ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SlotIsPhysical, SpinLockAcquire, and SpinLockRelease.

Referenced by DisableLogicalDecoding(), and UpdateLogicalDecodingStatusEndOfRecovery().

◆ CheckPointReplicationSlots()

void CheckPointReplicationSlots ( bool  is_shutdown)

Definition at line 2300 of file slot.c.

2301{
2302 int i;
2303 bool last_saved_restart_lsn_updated = false;
2304
2305 elog(DEBUG1, "performing replication slot checkpoint");
2306
2307 /*
2308 * Prevent any slot from being created/dropped while we're active. As we
2309 * explicitly do *not* want to block iterating over replication_slots or
2310 * acquiring a slot we cannot take the control lock - but that's OK,
2311 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2312 * enough to guarantee that nobody can change the in_use bits on us.
2313 *
2314 * Additionally, acquiring the Allocation lock is necessary to serialize
2315 * the slot flush process with concurrent slot WAL reservation. This
2316 * ensures that the WAL position being reserved is either flushed to disk
2317 * or is beyond or equal to the redo pointer of the current checkpoint
2318 * (See ReplicationSlotReserveWal for details).
2319 */
2320 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2321
2322 for (i = 0; i < max_replication_slots; i++)
2323 {
2325 char path[MAXPGPATH];
2326
2327 if (!s->in_use)
2328 continue;
2329
2330 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2331 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2332
2333 /*
2334 * Slot's data is not flushed each time the confirmed_flush LSN is
2335 * updated as that could lead to frequent writes. However, we decide
2336 * to force a flush of all logical slot's data at the time of shutdown
2337 * if the confirmed_flush LSN is changed since we last flushed it to
2338 * disk. This helps in avoiding an unnecessary retreat of the
2339 * confirmed_flush LSN after restart.
2340 */
2341 if (is_shutdown && SlotIsLogical(s))
2342 {
2344
2345 if (s->data.invalidated == RS_INVAL_NONE &&
2347 {
2348 s->just_dirtied = true;
2349 s->dirty = true;
2350 }
2352 }
2353
2354 /*
2355 * Track if we're going to update slot's last_saved_restart_lsn. We
2356 * need this to know if we need to recompute the required LSN.
2357 */
2359 last_saved_restart_lsn_updated = true;
2360
2361 SaveSlotToPath(s, path, LOG);
2362 }
2363 LWLockRelease(ReplicationSlotAllocationLock);
2364
2365 /*
2366 * Recompute the required LSN if SaveSlotToPath() updated
2367 * last_saved_restart_lsn for any slot.
2368 */
2369 if (last_saved_restart_lsn_updated)
2370 ReplicationSlotsComputeRequiredLSN();
2371}
#define NameStr(name)
Definition: c.h:771
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:226
#define MAXPGPATH
#define sprintf
Definition: port.h:262
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition: slot.c:2500
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1297
#define PG_REPLSLOT_DIR
Definition: slot.h:21
#define SlotIsLogical(slot)
Definition: slot.h:285
XLogRecPtr confirmed_flush
Definition: slot.h:136
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:235
bool just_dirtied
Definition: slot.h:192
XLogRecPtr last_saved_restart_lsn
Definition: slot.h:268
bool dirty
Definition: slot.h:193

References ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, DEBUG1, ReplicationSlot::dirty, elog, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LOG, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MAXPGPATH, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SaveSlotToPath(), SlotIsLogical, SpinLockAcquire, SpinLockRelease, and sprintf.

Referenced by CheckPointGuts().

◆ CheckSlotPermissions()

void CheckSlotPermissions ( void  )

Definition at line 1676 of file slot.c.

1677{
1678 if (!has_rolreplication(GetUserId()))
1679 ereport(ERROR,
1680 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1681 errmsg("permission denied to use replication slots"),
1682 errdetail("Only roles with the %s attribute may use replication slots.",
1683 "REPLICATION")));
1684}
int errdetail(const char *fmt,...)
Definition: elog.c:1216
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
Oid GetUserId(void)
Definition: miscinit.c:469
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:688

References ereport, errcode(), errdetail(), errmsg(), ERROR, GetUserId(), and has_rolreplication().

Referenced by copy_replication_slot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_drop_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), and pg_sync_replication_slots().

◆ CheckSlotRequirements()

void CheckSlotRequirements ( void  )

Definition at line 1654 of file slot.c.

1655{
1656 /*
1657 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1658 * needs the same check.
1659 */
1660
1661 if (max_replication_slots == 0)
1662 ereport(ERROR,
1663 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1664 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1665
1666 if (wal_level < WAL_LEVEL_REPLICA)
1667 ereport(ERROR,
1668 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1669 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1670}
int wal_level
Definition: xlog.c:134
@ WAL_LEVEL_REPLICA
Definition: xlog.h:76

References ereport, errcode(), errmsg(), ERROR, max_replication_slots, wal_level, and WAL_LEVEL_REPLICA.

Referenced by CheckLogicalDecodingRequirements(), copy_replication_slot(), pg_create_physical_replication_slot(), and pg_drop_replication_slot().

◆ GetSlotInvalidationCause()

ReplicationSlotInvalidationCause GetSlotInvalidationCause ( const char *  cause_name)

Definition at line 2906 of file slot.c.

2907{
2908 Assert(cause_name);
2909
2910 /* Search lookup table for the cause having this name */
2911 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2912 {
2913 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2914 return SlotInvalidationCauses[i].cause;
2915 }
2916
2917 Assert(false);
2918 return RS_INVAL_NONE; /* to keep compiler quiet */
2919}
Assert(PointerIsAligned(start, uint64))
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition: slot.c:113
#define RS_INVAL_MAX_CAUSES
Definition: slot.h:72
ReplicationSlotInvalidationCause cause
Definition: slot.c:109

References Assert(), SlotInvalidationCauseMap::cause, i, RS_INVAL_MAX_CAUSES, RS_INVAL_NONE, and SlotInvalidationCauses.

Referenced by fetch_remote_slots().

◆ GetSlotInvalidationCauseName()

const char * GetSlotInvalidationCauseName ( ReplicationSlotInvalidationCause  cause)

Definition at line 2926 of file slot.c.

2927{
2928 /* Search lookup table for the name of this cause */
2929 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2930 {
2931 if (SlotInvalidationCauses[i].cause == cause)
2932 return SlotInvalidationCauses[i].cause_name;
2933 }
2934
2935 Assert(false);
2936 return "none"; /* to keep compiler quiet */
2937}
const char * cause_name
Definition: slot.c:110

References Assert(), SlotInvalidationCauseMap::cause_name, i, RS_INVAL_MAX_CAUSES, and SlotInvalidationCauses.

Referenced by pg_get_replication_slots(), and ReplicationSlotAcquire().

◆ InvalidateObsoleteReplicationSlots()

bool InvalidateObsoleteReplicationSlots ( uint32  possible_causes,
XLogSegNo  oldestSegno,
Oid  dboid,
TransactionId  snapshotConflictHorizon 
)

Definition at line 2196 of file slot.c.

2199{
2200 XLogRecPtr oldestLSN;
2201 bool invalidated = false;
2202 bool invalidated_logical = false;
2203 bool found_valid_logicalslot;
2204
2205 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2206 Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2207 Assert(possible_causes != RS_INVAL_NONE);
2208
2209 if (max_replication_slots == 0)
2210 return invalidated;
2211
2212 XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2213
2214restart:
2215 found_valid_logicalslot = false;
2216 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2217 for (int i = 0; i < max_replication_slots; i++)
2218 {
2220 bool released_lock = false;
2221
2222 if (!s->in_use)
2223 continue;
2224
2225 /* Prevent invalidation of logical slots during binary upgrade */
2227 {
2229 found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
2231
2232 continue;
2233 }
2234
2235 if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
2236 dboid, snapshotConflictHorizon,
2237 &released_lock))
2238 {
2239 Assert(released_lock);
2240
2241 /* Remember we have invalidated a physical or logical slot */
2242 invalidated = true;
2243
2244 /*
2245 * Additionally, remember we have invalidated a logical slot as we
2246 * can request disabling logical decoding later.
2247 */
2248 if (SlotIsLogical(s))
2249 invalidated_logical = true;
2250 }
2251 else
2252 {
2253 /*
2254 * We need to check if the slot is invalidated here since
2255 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2256 * is already invalidated.
2257 */
2259 found_valid_logicalslot |=
2262 }
2263
2264 /* if the lock was released, start from scratch */
2265 if (released_lock)
2266 goto restart;
2267 }
2268 LWLockRelease(ReplicationSlotControlLock);
2269
2270 /*
2271 * If any slots have been invalidated, recalculate the resource limits.
2272 */
2273 if (invalidated)
2274 {
2277 }
2278
2279 /*
2280 * Request the checkpointer to disable logical decoding if no valid
2281 * logical slots remain. If called by the checkpointer during a
2282 * checkpoint, only the request is initiated; actual deactivation is
2283 * deferred until after the checkpoint completes.
2284 */
2285 if (invalidated_logical && !found_valid_logicalslot)
2287
2288 return invalidated;
2289}
bool IsBinaryUpgrade
Definition: globals.c:121
void RequestDisableLogicalDecoding(void)
Definition: logicalctl.c:433
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
Definition: slot.c:1962
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1215
#define TransactionIdIsValid(xid)
Definition: transam.h:41
int wal_segment_size
Definition: xlog.c:146
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References Assert(), ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidatePossiblyObsoleteSlot(), IsBinaryUpgrade, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RequestDisableLogicalDecoding(), RS_INVAL_HORIZON, RS_INVAL_NONE, RS_INVAL_WAL_REMOVED, SlotIsLogical, SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by CreateCheckPoint(), CreateRestartPoint(), ResolveRecoveryConflictWithSnapshot(), and xlog_redo().

◆ ReplicationSlotAcquire()

void ReplicationSlotAcquire ( const char *  name,
bool  nowait,
bool  error_if_invalid 
)

Definition at line 620 of file slot.c.

621{
623 int active_pid;
624
625 Assert(name != NULL);
626
627retry:
628 Assert(MyReplicationSlot == NULL);
629
630 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
631
632 /* Check if the slot exists with the given name. */
634 if (s == NULL || !s->in_use)
635 {
636 LWLockRelease(ReplicationSlotControlLock);
637
639 (errcode(ERRCODE_UNDEFINED_OBJECT),
640 errmsg("replication slot \"%s\" does not exist",
641 name)));
642 }
643
644 /*
645 * Do not allow users to acquire the reserved slot. This scenario may
646 * occur if the launcher that owns the slot has terminated unexpectedly
647 * due to an error, and a backend process attempts to reuse the slot.
648 */
651 errcode(ERRCODE_UNDEFINED_OBJECT),
652 errmsg("cannot acquire replication slot \"%s\"", name),
653 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
654
655 /*
656 * This is the slot we want; check if it's active under some other
657 * process. In single user mode, we don't need this check.
658 */
660 {
661 /*
662 * Get ready to sleep on the slot in case it is active. (We may end
663 * up not sleeping, but we don't want to do this while holding the
664 * spinlock.)
665 */
666 if (!nowait)
668
669 /*
670 * It is important to reset the inactive_since under spinlock here to
671 * avoid race conditions with slot invalidation. See comments related
672 * to inactive_since in InvalidatePossiblyObsoleteSlot.
673 */
675 if (s->active_pid == 0)
677 active_pid = s->active_pid;
680 }
681 else
682 {
683 s->active_pid = active_pid = MyProcPid;
685 }
686 LWLockRelease(ReplicationSlotControlLock);
687
688 /*
689 * If we found the slot but it's already active in another process, we
690 * wait until the owning process signals us that it's been released, or
691 * error out.
692 */
693 if (active_pid != MyProcPid)
694 {
695 if (!nowait)
696 {
697 /* Wait here until we get signaled, and then restart */
699 WAIT_EVENT_REPLICATION_SLOT_DROP);
701 goto retry;
702 }
703
705 (errcode(ERRCODE_OBJECT_IN_USE),
706 errmsg("replication slot \"%s\" is active for PID %d",
707 NameStr(s->data.name), active_pid)));
708 }
709 else if (!nowait)
710 ConditionVariableCancelSleep(); /* no sleep needed after all */
711
712 /* We made this slot active, so it's ours now. */
714
715 /*
716 * We need to check for invalidation after making the slot ours to avoid
717 * the possible race condition with the checkpointer that can otherwise
718 * invalidate the slot immediately after the check.
719 */
720 if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
722 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
723 errmsg("can no longer access replication slot \"%s\"",
724 NameStr(s->data.name)),
725 errdetail("This replication slot has been invalidated due to \"%s\".",
727
728 /* Let everybody know we've modified this slot */
730
731 /*
732 * The call to pgstat_acquire_replslot() protects against stats for a
733 * different slot, from before a restart or such, being present during
734 * pgstat_report_replslot().
735 */
736 if (SlotIsLogical(s))
738
739
740 if (am_walsender)
741 {
744 ? errmsg("acquired logical replication slot \"%s\"",
745 NameStr(s->data.name))
746 : errmsg("acquired physical replication slot \"%s\"",
747 NameStr(s->data.name)));
748 }
749}
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int MyProcPid
Definition: globals.c:47
bool IsUnderPostmaster
Definition: globals.c:120
bool IsLogicalLauncher(void)
Definition: launcher.c:1587
void pgstat_acquire_replslot(ReplicationSlot *slot)
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:540
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition: slot.c:2926
static bool IsSlotForConflictCheck(const char *name)
Definition: slot.c:361
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition: slot.h:303
pid_t active_pid
Definition: slot.h:189
ConditionVariable active_cv
Definition: slot.h:216
const char * name
bool am_walsender
Definition: walsender.c:123
bool log_replication_commands
Definition: walsender.c:133

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, am_walsender, Assert(), ConditionVariableBroadcast(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), ReplicationSlot::data, DEBUG1, ereport, errcode(), errdetail(), errmsg(), ERROR, GetSlotInvalidationCauseName(), ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, IsLogicalLauncher(), IsSlotForConflictCheck(), IsUnderPostmaster, LOG, log_replication_commands, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProcPid, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, pgstat_acquire_replslot(), ReplicationSlotSetInactiveSince(), RS_INVAL_NONE, SearchNamedReplicationSlot(), SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by acquire_conflict_slot_if_exists(), binary_upgrade_logical_slot_has_caught_up(), drop_local_obsolete_slots(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), ReplicationSlotAlter(), ReplicationSlotDrop(), StartLogicalReplication(), StartReplication(), and synchronize_one_slot().

◆ ReplicationSlotAlter()

void ReplicationSlotAlter ( const char *  name,
const bool *  failover,
const bool *  two_phase 
)

Definition at line 949 of file slot.c.

951{
952 bool update_slot = false;
953
954 Assert(MyReplicationSlot == NULL);
956
957 ReplicationSlotAcquire(name, false, true);
958
961 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
962 errmsg("cannot use %s with a physical replication slot",
963 "ALTER_REPLICATION_SLOT"));
964
965 if (RecoveryInProgress())
966 {
967 /*
968 * Do not allow users to alter the slots which are currently being
969 * synced from the primary to the standby.
970 */
973 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
974 errmsg("cannot alter replication slot \"%s\"", name),
975 errdetail("This replication slot is being synchronized from the primary server."));
976
977 /*
978 * Do not allow users to enable failover on the standby as we do not
979 * support sync to the cascading standby.
980 */
981 if (failover && *failover)
983 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
984 errmsg("cannot enable failover for a replication slot"
985 " on the standby"));
986 }
987
988 if (failover)
989 {
990 /*
991 * Do not allow users to enable failover for temporary slots as we do
992 * not support syncing temporary slots to the standby.
993 */
996 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
997 errmsg("cannot enable failover for a temporary replication slot"));
998
1000 {
1004
1005 update_slot = true;
1006 }
1007 }
1008
1010 {
1014
1015 update_slot = true;
1016 }
1017
1018 if (update_slot)
1019 {
1022 }
1023
1025}
static bool two_phase
static bool failover
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:620
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1173
void ReplicationSlotSave(void)
Definition: slot.c:1155
void ReplicationSlotRelease(void)
Definition: slot.c:758
ReplicationSlotPersistency persistency
Definition: slot.h:106
bool RecoveryInProgress(void)
Definition: xlog.c:6461

References Assert(), ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg(), ERROR, failover, ReplicationSlotPersistentData::failover, ReplicationSlot::mutex, MyReplicationSlot, name, ReplicationSlotPersistentData::persistency, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), RS_TEMPORARY, SlotIsPhysical, SpinLockAcquire, SpinLockRelease, ReplicationSlotPersistentData::synced, two_phase, and ReplicationSlotPersistentData::two_phase.

Referenced by AlterReplicationSlot().

◆ ReplicationSlotCleanup()

void ReplicationSlotCleanup ( bool  synced_only)

Definition at line 857 of file slot.c.

858{
859 int i;
860 bool found_valid_logicalslot;
861 bool dropped_logical = false;
862
863 Assert(MyReplicationSlot == NULL);
864
865restart:
866 found_valid_logicalslot = false;
867 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
868 for (i = 0; i < max_replication_slots; i++)
869 {
871
872 if (!s->in_use)
873 continue;
874
876
877 found_valid_logicalslot |=
879
880 if ((s->active_pid == MyProcPid &&
881 (!synced_only || s->data.synced)))
882 {
885 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
886
887 if (SlotIsLogical(s))
888 dropped_logical = true;
889
891
893 goto restart;
894 }
895 else
897 }
898
899 LWLockRelease(ReplicationSlotControlLock);
900
901 if (dropped_logical && !found_valid_logicalslot)
903}
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition: slot.c:1048

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyProcPid, MyReplicationSlot, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropPtr(), RequestDisableLogicalDecoding(), RS_INVAL_NONE, RS_TEMPORARY, SlotIsLogical, SpinLockAcquire, SpinLockRelease, and ReplicationSlotPersistentData::synced.

Referenced by PostgresMain(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), SyncReplicationSlots(), and WalSndErrorCleanup().

◆ ReplicationSlotCreate()

void ReplicationSlotCreate ( const char *  name,
bool  db_specific,
ReplicationSlotPersistency  persistency,
bool  two_phase,
bool  failover,
bool  synced 
)

Definition at line 378 of file slot.c.

381{
382 ReplicationSlot *slot = NULL;
383 int i;
384
385 Assert(MyReplicationSlot == NULL);
386
387 /*
388 * The logical launcher or pg_upgrade may create or migrate an internal
389 * slot, so using a reserved name is allowed in these cases.
390 */
392 ERROR);
393
394 if (failover)
395 {
396 /*
397 * Do not allow users to create the failover enabled slots on the
398 * standby as we do not support sync to the cascading standby.
399 *
400 * However, failover enabled slots can be created during slot
401 * synchronization because we need to retain the same values as the
402 * remote slot.
403 */
406 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
407 errmsg("cannot enable failover for a replication slot created on the standby"));
408
409 /*
410 * Do not allow users to create failover enabled temporary slots,
411 * because temporary slots will not be synced to the standby.
412 *
413 * However, failover enabled temporary slots can be created during
414 * slot synchronization. See the comments atop slotsync.c for details.
415 */
416 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
418 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
419 errmsg("cannot enable failover for a temporary replication slot"));
420 }
421
422 /*
423 * If some other backend ran this code concurrently with us, we'd likely
424 * both allocate the same slot, and that would be bad. We'd also be at
425 * risk of missing a name collision. Also, we don't want to try to create
426 * a new slot while somebody's busy cleaning up an old one, because we
427 * might both be monkeying with the same directory.
428 */
429 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
430
431 /*
432 * Check for name collision, and identify an allocatable slot. We need to
433 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
434 * else can change the in_use flags while we're looking at them.
435 */
436 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
437 for (i = 0; i < max_replication_slots; i++)
438 {
440
441 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
444 errmsg("replication slot \"%s\" already exists", name)));
445 if (!s->in_use && slot == NULL)
446 slot = s;
447 }
448 LWLockRelease(ReplicationSlotControlLock);
449
450 /* If all slots are in use, we're out of luck. */
451 if (slot == NULL)
453 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
454 errmsg("all replication slots are in use"),
455 errhint("Free one or increase \"max_replication_slots\".")));
456
457 /*
458 * Since this slot is not in use, nobody should be looking at any part of
459 * it other than the in_use field unless they're trying to allocate it.
460 * And since we hold ReplicationSlotAllocationLock, nobody except us can
461 * be doing that. So it's safe to initialize the slot.
462 */
463 Assert(!slot->in_use);
464 Assert(slot->active_pid == 0);
465
466 /* first initialize persistent data */
467 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
468 namestrcpy(&slot->data.name, name);
469 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
470 slot->data.persistency = persistency;
471 slot->data.two_phase = two_phase;
473 slot->data.failover = failover;
474 slot->data.synced = synced;
475
476 /* and then data only present in shared memory */
477 slot->just_dirtied = false;
478 slot->dirty = false;
487 slot->inactive_since = 0;
489
490 /*
491 * Create the slot on disk. We haven't actually marked the slot allocated
492 * yet, so no special cleanup is required if this errors out.
493 */
494 CreateSlotOnDisk(slot);
495
496 /*
497 * We need to briefly prevent any other backend from iterating over the
498 * slots while we flip the in_use flag. We also need to set the active
499 * flag while holding the ControlLock as otherwise a concurrent
500 * ReplicationSlotAcquire() could acquire the slot as well.
501 */
502 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
503
504 slot->in_use = true;
505
506 /* We can now mark the slot active, and that makes it our slot. */
507 SpinLockAcquire(&slot->mutex);
508 Assert(slot->active_pid == 0);
509 slot->active_pid = MyProcPid;
510 SpinLockRelease(&slot->mutex);
511 MyReplicationSlot = slot;
512
513 LWLockRelease(ReplicationSlotControlLock);
514
515 /*
516 * Create statistics entry for the new logical slot. We don't collect any
517 * stats for physical slots, so no need to create an entry for the same.
518 * See ReplicationSlotDropPtr for why we need to do this before releasing
519 * ReplicationSlotAllocationLock.
520 */
521 if (SlotIsLogical(slot))
523
524 /*
525 * Now that the slot has been marked as in_use and active, it's safe to
526 * let somebody else try to allocate a slot.
527 */
528 LWLockRelease(ReplicationSlotAllocationLock);
529
530 /* Let everybody know we've modified this slot */
532}
int errhint(const char *fmt,...)
Definition: elog.c:1330
Oid MyDatabaseId
Definition: globals.c:94
@ LW_EXCLUSIVE
Definition: lwlock.h:112
void namestrcpy(Name name, const char *str)
Definition: name.c:233
void pgstat_create_replslot(ReplicationSlot *slot)
#define InvalidOid
Definition: postgres_ext.h:37
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition: slot.c:2439
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition: slot.c:266
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1882
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:226
TransactionId effective_catalog_xmin
Definition: slot.h:207
XLogRecPtr candidate_restart_valid
Definition: slot.h:227
SlotSyncSkipReason slotsync_skip_reason
Definition: slot.h:281
TransactionId effective_xmin
Definition: slot.h:206
XLogRecPtr candidate_restart_lsn
Definition: slot.h:228
TransactionId candidate_catalog_xmin
Definition: slot.h:225
TimestampTz inactive_since
Definition: slot.h:242
#define InvalidTransactionId
Definition: transam.h:31
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, Assert(), ReplicationSlot::candidate_catalog_xmin, ReplicationSlot::candidate_restart_lsn, ReplicationSlot::candidate_restart_valid, ReplicationSlot::candidate_xmin_lsn, ConditionVariableBroadcast(), CreateSlotOnDisk(), ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::dirty, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, failover, ReplicationSlotPersistentData::failover, i, ReplicationSlot::in_use, ReplicationSlot::inactive_since, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsBinaryUpgrade, IsLogicalLauncher(), IsSyncingReplicationSlots(), ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyDatabaseId, MyProcPid, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotPersistentData::persistency, pgstat_create_replslot(), RecoveryInProgress(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotValidateName(), RS_TEMPORARY, SlotIsLogical, ReplicationSlot::slotsync_skip_reason, SpinLockAcquire, SpinLockRelease, SS_SKIP_NONE, ReplicationSlotPersistentData::synced, two_phase, ReplicationSlotPersistentData::two_phase, and ReplicationSlotPersistentData::two_phase_at.

Referenced by create_logical_replication_slot(), create_physical_replication_slot(), CreateConflictDetectionSlot(), CreateReplicationSlot(), and synchronize_one_slot().

◆ ReplicationSlotDrop()

void ReplicationSlotDrop ( const char *  name,
bool  nowait 
)

Definition at line 909 of file slot.c.

910{
911 bool is_logical;
912
913 Assert(MyReplicationSlot == NULL);
914
915 ReplicationSlotAcquire(name, nowait, false);
916
917 /*
918 * Do not allow users to drop the slots which are currently being synced
919 * from the primary to the standby.
920 */
923 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
924 errmsg("cannot drop replication slot \"%s\"", name),
925 errdetail("This replication slot is being synchronized from the primary server."));
926
927 is_logical = SlotIsLogical(MyReplicationSlot);
928
930
931 if (is_logical)
933}
void ReplicationSlotDropAcquired(void)
Definition: slot.c:1031

References Assert(), ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg(), ERROR, MyReplicationSlot, name, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotDropAcquired(), RequestDisableLogicalDecoding(), SlotIsLogical, and ReplicationSlotPersistentData::synced.

Referenced by DropReplicationSlot(), and pg_drop_replication_slot().

◆ ReplicationSlotDropAcquired()

void ReplicationSlotDropAcquired ( void  )

Definition at line 1031 of file slot.c.

1032{
1033 ReplicationSlot *slot = MyReplicationSlot;
1034
1035 Assert(MyReplicationSlot != NULL);
1036
1037 /* slot isn't acquired anymore */
1038 MyReplicationSlot = NULL;
1039
1040 ReplicationSlotDropPtr(slot);
1041}

References Assert(), MyReplicationSlot, and ReplicationSlotDropPtr().

Referenced by ApplyLauncherMain(), drop_local_obsolete_slots(), ReplicationSlotDrop(), ReplicationSlotRelease(), and ReplicationSlotsDropDBSlots().

◆ ReplicationSlotDropAtPubNode()

void ReplicationSlotDropAtPubNode ( WalReceiverConn wrconn,
char *  slotname,
bool  missing_ok 
)

Definition at line 2299 of file subscriptioncmds.c.

2300{
2301 StringInfoData cmd;
2302
2303 Assert(wrconn);
2304
2305 load_file("libpqwalreceiver", false);
2306
2307 initStringInfo(&cmd);
2308 appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2309
2310 PG_TRY();
2311 {
2312 WalRcvExecResult *res;
2313
2314 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2315
2316 if (res->status == WALRCV_OK_COMMAND)
2317 {
2318 /* NOTICE. Success. */
2319 ereport(NOTICE,
2320 (errmsg("dropped replication slot \"%s\" on publisher",
2321 slotname)));
2322 }
2323 else if (res->status == WALRCV_ERROR &&
2324 missing_ok &&
2325 res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
2326 {
2327 /* LOG. Error, but missing_ok = true. */
2328 ereport(LOG,
2329 (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2330 slotname, res->err)));
2331 }
2332 else
2333 {
2334 /* ERROR. */
2335 ereport(ERROR,
2336 (errcode(ERRCODE_CONNECTION_FAILURE),
2337 errmsg("could not drop replication slot \"%s\" on publisher: %s",
2338 slotname, res->err)));
2339 }
2340
2341 walrcv_clear_result(res);
2342 }
2343 PG_FINALLY();
2344 {
2345 pfree(cmd.data);
2346 }
2347 PG_END_TRY();
2348}
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:149
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define NOTICE
Definition: elog.h:35
#define PG_FINALLY(...)
Definition: elog.h:389
void pfree(void *pointer)
Definition: mcxt.c:1616
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:13062
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
WalRcvExecStatus status
Definition: walreceiver.h:220
static WalReceiverConn * wrconn
Definition: walreceiver.c:94
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ WALRCV_ERROR
Definition: walreceiver.h:204
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:471
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:465

References appendStringInfo(), Assert(), StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, initStringInfo(), load_file(), LOG, NOTICE, pfree(), PG_END_TRY, PG_FINALLY, PG_TRY, quote_identifier(), WalRcvExecResult::sqlstate, WalRcvExecResult::status, walrcv_clear_result(), WALRCV_ERROR, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), and ProcessSyncingTablesForSync().

◆ ReplicationSlotIndex()

◆ ReplicationSlotInitialize()

void ReplicationSlotInitialize ( void  )

Definition at line 241 of file slot.c.

242{
243 before_shmem_exit(ReplicationSlotShmemExit, 0);
244}
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition: slot.c:250

References before_shmem_exit(), and ReplicationSlotShmemExit().

Referenced by BaseInit().

◆ ReplicationSlotMarkDirty()

◆ ReplicationSlotName()

bool ReplicationSlotName ( int  index,
Name  name 
)

Definition at line 589 of file slot.c.

590{
591 ReplicationSlot *slot;
592 bool found;
593
594 slot = &ReplicationSlotCtl->replication_slots[index];
595
596 /*
597 * Ensure that the slot cannot be dropped while we copy the name. Don't
598 * need the spinlock as the name of an existing slot cannot change.
599 */
600 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
601 found = slot->in_use;
602 if (slot->in_use)
603 namestrcpy(name, NameStr(slot->data.name));
604 LWLockRelease(ReplicationSlotControlLock);
605
606 return found;
607}
Definition: type.h:96

References ReplicationSlot::data, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by pgstat_replslot_to_serialized_name_cb().

◆ ReplicationSlotNameForTablesync()

void ReplicationSlotNameForTablesync ( Oid  suboid,
Oid  relid,
char *  syncslotname,
Size  szslot 
)

Definition at line 1203 of file tablesync.c.

1205{
1206 snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1207 relid, GetSystemIdentifier());
1208}
#define UINT64_FORMAT
Definition: c.h:571
#define snprintf
Definition: port.h:260
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4628

References GetSystemIdentifier(), snprintf, and UINT64_FORMAT.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), ProcessSyncingTablesForSync(), and ReportSlotConnectionError().

◆ ReplicationSlotPersist()

◆ ReplicationSlotRelease()

void ReplicationSlotRelease ( void  )

Definition at line 758 of file slot.c.

759{
761 char *slotname = NULL; /* keep compiler quiet */
762 bool is_logical;
763 TimestampTz now = 0;
764
765 Assert(slot != NULL && slot->active_pid != 0);
766
767 is_logical = SlotIsLogical(slot);
768
769 if (am_walsender)
770 slotname = pstrdup(NameStr(slot->data.name));
771
772 if (slot->data.persistency == RS_EPHEMERAL)
773 {
774 /*
775 * Delete the slot. There is no !PANIC case where this is allowed to
776 * fail, all that may happen is an incomplete cleanup of the on-disk
777 * data.
778 */
780
781 /*
782 * Request to disable logical decoding, even though this slot may not
783 * have been the last logical slot. The checkpointer will verify if
784 * logical decoding should actually be disabled.
785 */
786 if (is_logical)
788 }
789
790 /*
791 * If slot needed to temporarily restrain both data and catalog xmin to
792 * create the catalog snapshot, remove that temporary constraint.
793 * Snapshots can only be exported while the initial snapshot is still
794 * acquired.
795 */
796 if (!TransactionIdIsValid(slot->data.xmin) &&
798 {
799 SpinLockAcquire(&slot->mutex);
801 SpinLockRelease(&slot->mutex);
803 }
804
805 /*
806 * Set the time since the slot has become inactive. We get the current
807 * time beforehand to avoid system call while holding the spinlock.
808 */
810
811 if (slot->data.persistency == RS_PERSISTENT)
812 {
813 /*
814 * Mark persistent slot inactive. We're not freeing it, just
815 * disconnecting, but wake up others that may be waiting for it.
816 */
817 SpinLockAcquire(&slot->mutex);
818 slot->active_pid = 0;
820 SpinLockRelease(&slot->mutex);
822 }
823 else
825
826 MyReplicationSlot = NULL;
827
828 /* might not have been set when we've been a plain slot */
829 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
830 MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
832 LWLockRelease(ProcArrayLock);
833
834 if (am_walsender)
835 {
837 is_logical
838 ? errmsg("released logical replication slot \"%s\"",
839 slotname)
840 : errmsg("released physical replication slot \"%s\"",
841 slotname));
842
843 pfree(slotname);
844 }
845}
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1781
PGPROC * MyProc
Definition: proc.c:67
PROC_HDR * ProcGlobal
Definition: proc.c:79
uint8 statusFlags
Definition: proc.h:259
int pgxactoff
Definition: proc.h:201
uint8 * statusFlags
Definition: proc.h:403
TransactionId xmin
Definition: slot.h:114

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, am_walsender, Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, DEBUG1, ReplicationSlot::effective_xmin, ereport, errmsg(), GetCurrentTimestamp(), InvalidTransactionId, LOG, log_replication_commands, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, now(), ReplicationSlotPersistentData::persistency, pfree(), PGPROC::pgxactoff, ProcGlobal, pstrdup(), ReplicationSlotDropAcquired(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotSetInactiveSince(), RequestDisableLogicalDecoding(), RS_EPHEMERAL, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire, SpinLockRelease, PGPROC::statusFlags, PROC_HDR::statusFlags, TransactionIdIsValid, and ReplicationSlotPersistentData::xmin.

Referenced by binary_upgrade_create_conflict_detection_slot(), binary_upgrade_logical_slot_has_caught_up(), copy_replication_slot(), CreateReplicationSlot(), InvalidatePossiblyObsoleteSlot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), PostgresMain(), ReplicationSlotAlter(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), synchronize_one_slot(), and WalSndErrorCleanup().

◆ ReplicationSlotReserveWal()

void ReplicationSlotReserveWal ( void  )

Definition at line 1693 of file slot.c.

1694{
1696 XLogSegNo segno;
1697 XLogRecPtr restart_lsn;
1698
1699 Assert(slot != NULL);
1702
1703 /*
1704 * The replication slot mechanism is used to prevent the removal of
1705 * required WAL.
1706 *
1707 * Acquire an exclusive lock to prevent the checkpoint process from
1708 * concurrently computing the minimum slot LSN (see
1709 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1710 * replication cannot be removed during a checkpoint.
1711 *
1712 * The mechanism is reliable because if WAL reservation occurs first, the
1713 * checkpoint must wait for the restart_lsn update before determining the
1714 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1715 * first, subsequent WAL reservations will select positions at or beyond
1716 * the redo pointer of that checkpoint.
1717 */
1718 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1719
1720 /*
1721 * For logical slots log a standby snapshot and start logical decoding at
1722 * exactly that position. That allows the slot to start up more quickly.
1723 * But on a standby we cannot do WAL writes, so just use the replay
1724 * pointer; effectively, an attempt to create a logical slot on standby
1725 * will cause it to wait for an xl_running_xact record to be logged
1726 * independently on the primary, so that a snapshot can be built using the
1727 * record.
1728 *
1729 * None of this is needed (or indeed helpful) for physical slots as
1730 * they'll start replay at the last logged checkpoint anyway. Instead,
1731 * return the location of the last redo LSN, where a base backup has to
1732 * start replay at.
1733 */
1734 if (SlotIsPhysical(slot))
1735 restart_lsn = GetRedoRecPtr();
1736 else if (RecoveryInProgress())
1737 restart_lsn = GetXLogReplayRecPtr(NULL);
1738 else
1739 restart_lsn = GetXLogInsertRecPtr();
1740
1741 SpinLockAcquire(&slot->mutex);
1742 slot->data.restart_lsn = restart_lsn;
1743 SpinLockRelease(&slot->mutex);
1744
1745 /* prevent WAL removal as fast as possible */
1747
1748 /* Checkpoint shouldn't remove the required WAL. */
1750 if (XLogGetLastRemovedSegno() >= segno)
1751 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1752 NameStr(slot->data.name));
1753
1754 LWLockRelease(ReplicationSlotAllocationLock);
1755
1756 if (!RecoveryInProgress() && SlotIsLogical(slot))
1757 {
1758 XLogRecPtr flushptr;
1759
1760 /* make sure we have enough information to start */
1761 flushptr = LogStandbySnapshot();
1762
1763 /* and make sure it's fsynced to disk */
1764 XLogFlush(flushptr);
1765 }
1766}
XLogRecPtr LogStandbySnapshot(void)
Definition: standby.c:1282
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3796
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6564
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:9596
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2784
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
uint64 XLogSegNo
Definition: xlogdefs.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References Assert(), ReplicationSlot::data, elog, ERROR, GetRedoRecPtr(), GetXLogInsertRecPtr(), GetXLogReplayRecPtr(), ReplicationSlot::last_saved_restart_lsn, LogStandbySnapshot(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SlotIsLogical, SlotIsPhysical, SpinLockAcquire, SpinLockRelease, wal_segment_size, XLByteToSeg, XLogFlush(), XLogGetLastRemovedSegno(), and XLogRecPtrIsValid.

Referenced by create_physical_replication_slot(), CreateInitDecodingContext(), and CreateReplicationSlot().

◆ ReplicationSlotSave()

◆ ReplicationSlotsComputeLogicalRestartLSN()

XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN ( void  )

Definition at line 1367 of file slot.c.

1368{
1369 XLogRecPtr result = InvalidXLogRecPtr;
1370 int i;
1371
1372 if (max_replication_slots <= 0)
1373 return InvalidXLogRecPtr;
1374
1375 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1376
1377 for (i = 0; i < max_replication_slots; i++)
1378 {
1379 ReplicationSlot *s;
1380 XLogRecPtr restart_lsn;
1381 XLogRecPtr last_saved_restart_lsn;
1382 bool invalidated;
1383 ReplicationSlotPersistency persistency;
1384
1385 s = &ReplicationSlotCtl->replication_slots[i];
1386
1387 /* cannot change while ReplicationSlotControlLock is held */
1388 if (!s->in_use)
1389 continue;
1390
1391 /* we're only interested in logical slots */
1392 if (!SlotIsLogical(s))
1393 continue;
1394
1395 /* read once, it's ok if it increases while we're checking */
1396 SpinLockAcquire(&s->mutex);
1397 persistency = s->data.persistency;
1398 restart_lsn = s->data.restart_lsn;
1399 invalidated = s->data.invalidated != RS_INVAL_NONE;
1400 last_saved_restart_lsn = s->last_saved_restart_lsn;
1401 SpinLockRelease(&s->mutex);
1402
1403 /* invalidated slots need not apply */
1404 if (invalidated)
1405 continue;
1406
1407 /*
1408 * For persistent slot use last_saved_restart_lsn to compute the
1409 * oldest LSN for removal of WAL segments. The segments between
1410 * last_saved_restart_lsn and restart_lsn might be needed by a
1411 * persistent slot in the case of database crash. Non-persistent
1412 * slots can't survive the database crash, so we don't care about
1413 * last_saved_restart_lsn for them.
1414 */
1415 if (persistency == RS_PERSISTENT)
1416 {
1417 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1418 restart_lsn > last_saved_restart_lsn)
1419 {
1420 restart_lsn = last_saved_restart_lsn;
1421 }
1422 }
1423
1424 if (!XLogRecPtrIsValid(restart_lsn))
1425 continue;
1426
1427 if (!XLogRecPtrIsValid(result) ||
1428 restart_lsn < result)
1429 result = restart_lsn;
1430 }
1431
1432 LWLockRelease(ReplicationSlotControlLock);
1433
1434 return result;
1435}

References ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire, SpinLockRelease, and XLogRecPtrIsValid.

Referenced by CheckPointLogicalRewriteHeap(), and CheckPointSnapBuild().

◆ ReplicationSlotsComputeRequiredLSN()

void ReplicationSlotsComputeRequiredLSN ( void  )

Definition at line 1297 of file slot.c.

1298{
1299 int i;
1300 XLogRecPtr min_required = InvalidXLogRecPtr;
1301
1302 Assert(ReplicationSlotCtl != NULL);
1303
1304 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1305 for (i = 0; i < max_replication_slots; i++)
1306 {
1307 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1308 XLogRecPtr restart_lsn;
1309 XLogRecPtr last_saved_restart_lsn;
1310 bool invalidated;
1311 ReplicationSlotPersistency persistency;
1312
1313 if (!s->in_use)
1314 continue;
1315
1316 SpinLockAcquire(&s->mutex);
1317 persistency = s->data.persistency;
1318 restart_lsn = s->data.restart_lsn;
1319 invalidated = s->data.invalidated != RS_INVAL_NONE;
1320 last_saved_restart_lsn = s->last_saved_restart_lsn;
1321 SpinLockRelease(&s->mutex);
1322
1323 /* invalidated slots need not apply */
1324 if (invalidated)
1325 continue;
1326
1327 /*
1328 * For persistent slot use last_saved_restart_lsn to compute the
1329 * oldest LSN for removal of WAL segments. The segments between
1330 * last_saved_restart_lsn and restart_lsn might be needed by a
1331 * persistent slot in the case of database crash. Non-persistent
1332 * slots can't survive the database crash, so we don't care about
1333 * last_saved_restart_lsn for them.
1334 */
1335 if (persistency == RS_PERSISTENT)
1336 {
1337 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1338 restart_lsn > last_saved_restart_lsn)
1339 {
1340 restart_lsn = last_saved_restart_lsn;
1341 }
1342 }
1343
1344 if (XLogRecPtrIsValid(restart_lsn) &&
1345 (!XLogRecPtrIsValid(min_required) ||
1346 restart_lsn < min_required))
1347 min_required = restart_lsn;
1348 }
1349 LWLockRelease(ReplicationSlotControlLock);
1350
1351 XLogSetReplicationSlotMinimumLSN(min_required);
1352}
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition: xlog.c:2670

References Assert(), ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_PERSISTENT, SpinLockAcquire, SpinLockRelease, XLogRecPtrIsValid, and XLogSetReplicationSlotMinimumLSN().

Referenced by CheckPointReplicationSlots(), copy_replication_slot(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), ReplicationSlotDropPtr(), ReplicationSlotReserveWal(), reserve_wal_for_local_slot(), StartupReplicationSlots(), and update_local_synced_slot().

◆ ReplicationSlotsComputeRequiredXmin()

void ReplicationSlotsComputeRequiredXmin ( bool  already_locked)

Definition at line 1215 of file slot.c.

1216{
1217 int i;
1218 TransactionId agg_xmin = InvalidTransactionId;
1219 TransactionId agg_catalog_xmin = InvalidTransactionId;
1220
1221 Assert(ReplicationSlotCtl != NULL);
1222 Assert(!already_locked ||
1223 (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
1224 LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
1225
1226 /*
1227 * Hold the ReplicationSlotControlLock until after updating the slot xmin
1228 * values, so no backend updates the initial xmin for newly created slot
1229 * concurrently. A shared lock is used here to minimize lock contention,
1230 * especially when many slots exist and advancements occur frequently.
1231 * This is safe since an exclusive lock is taken during initial slot xmin
1232 * update in slot creation.
1233 *
1234 * One might think that we can hold the ProcArrayLock exclusively and
1235 * update the slot xmin values, but it could increase lock contention on
1236 * the ProcArrayLock, which is not great since this function can be called
1237 * at non-negligible frequency.
1238 *
1239 * Concurrent invocation of this function may cause the computed slot xmin
1240 * to regress. However, this is harmless because tuples prior to the most
1241 * recent xmin are no longer useful once advancement occurs (see
1242 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1243 * before updating the effective_xmin). Thus, such regression merely
1244 * prevents VACUUM from prematurely removing tuples without causing the
1245 * early deletion of required data.
1246 */
1247 if (!already_locked)
1248 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1249
1250 for (i = 0; i < max_replication_slots; i++)
1251 {
1252 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1253 TransactionId effective_xmin;
1254 TransactionId effective_catalog_xmin;
1255 bool invalidated;
1256
1257 if (!s->in_use)
1258 continue;
1259
1260 SpinLockAcquire(&s->mutex);
1261 effective_xmin = s->effective_xmin;
1262 effective_catalog_xmin = s->effective_catalog_xmin;
1263 invalidated = s->data.invalidated != RS_INVAL_NONE;
1264 SpinLockRelease(&s->mutex);
1265
1266 /* invalidated slots need not apply */
1267 if (invalidated)
1268 continue;
1269
1270 /* check the data xmin */
1271 if (TransactionIdIsValid(effective_xmin) &&
1272 (!TransactionIdIsValid(agg_xmin) ||
1273 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1274 agg_xmin = effective_xmin;
1275
1276 /* check the catalog xmin */
1277 if (TransactionIdIsValid(effective_catalog_xmin) &&
1278 (!TransactionIdIsValid(agg_catalog_xmin) ||
1279 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1280 agg_catalog_xmin = effective_catalog_xmin;
1281 }
1282
1283 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1284
1285 if (!already_locked)
1286 LWLockRelease(ReplicationSlotControlLock);
1287}
uint32 TransactionId
Definition: c.h:672
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:2025
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition: procarray.c:3922
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263

References Assert(), ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidTransactionId, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockHeldByMeInMode(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ProcArraySetReplicationSlotXmin(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, and TransactionIdPrecedes().

Referenced by copy_replication_slot(), CreateInitDecodingContext(), init_conflict_slot_xmin(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalReplicationSlotNewXmin(), ReplicationSlotDropPtr(), ReplicationSlotRelease(), StartupReplicationSlots(), synchronize_one_slot(), update_conflict_slot_xmin(), and update_local_synced_slot().

◆ ReplicationSlotsCountDBSlots()

bool ReplicationSlotsCountDBSlots ( Oid  dboid,
int *  nslots,
int *  nactive 
)

Definition at line 1446 of file slot.c.

1447{
1448 int i;
1449
1450 *nslots = *nactive = 0;
1451
1452 if (max_replication_slots <= 0)
1453 return false;
1454
1455 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1456 for (i = 0; i < max_replication_slots; i++)
1457 {
1458 ReplicationSlot *s;
1459
1460 s = &ReplicationSlotCtl->replication_slots[i];
1461
1462 /* cannot change while ReplicationSlotCtlLock is held */
1463 if (!s->in_use)
1464 continue;
1465
1466 /* only logical slots are database specific, skip */
1467 if (!SlotIsLogical(s))
1468 continue;
1469
1470 /* not our database, skip */
1471 if (s->data.database != dboid)
1472 continue;
1473
1474 /* NB: intentionally counting invalidated slots */
1475
1476 /* count slots with spinlock held */
1477 SpinLockAcquire(&s->mutex);
1478 (*nslots)++;
1479 if (s->active_pid != 0)
1480 (*nactive)++;
1481 SpinLockRelease(&s->mutex);
1482 }
1483 LWLockRelease(ReplicationSlotControlLock);
1484
1485 if (*nslots > 0)
1486 return true;
1487 return false;
1488}

References ReplicationSlot::active_pid, ReplicationSlot::data, ReplicationSlotPersistentData::database, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by dropdb().

◆ ReplicationSlotsDropDBSlots()

void ReplicationSlotsDropDBSlots ( Oid  dboid)

Definition at line 1507 of file slot.c.

1508{
1509 int i;
1510 bool found_valid_logicalslot;
1511 bool dropped = false;
1512
1513 if (max_replication_slots <= 0)
1514 return;
1515
1516restart:
1517 found_valid_logicalslot = false;
1518 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1519 for (i = 0; i < max_replication_slots; i++)
1520 {
1521 ReplicationSlot *s;
1522 char *slotname;
1523 int active_pid;
1524
1525 s = &ReplicationSlotCtl->replication_slots[i];
1526
1527 /* cannot change while ReplicationSlotCtlLock is held */
1528 if (!s->in_use)
1529 continue;
1530
1531 /* only logical slots are database specific, skip */
1532 if (!SlotIsLogical(s))
1533 continue;
1534
1535 /*
1536 * Check logical slots on other databases too so we can disable
1537 * logical decoding only if no slots in the cluster.
1538 */
1539 SpinLockAcquire(&s->mutex);
1540 found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
1541 SpinLockRelease(&s->mutex);
1542
1543 /* not our database, skip */
1544 if (s->data.database != dboid)
1545 continue;
1546
1547 /* NB: intentionally including invalidated slots to drop */
1548
1549 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1550 SpinLockAcquire(&s->mutex);
1551 /* can't change while ReplicationSlotControlLock is held */
1552 slotname = NameStr(s->data.name);
1553 active_pid = s->active_pid;
1554 if (active_pid == 0)
1555 {
1556 MyReplicationSlot = s;
1557 s->active_pid = MyProcPid;
1558 }
1559 SpinLockRelease(&s->mutex);
1560
1561 /*
1562 * Even though we hold an exclusive lock on the database object a
1563 * logical slot for that DB can still be active, e.g. if it's
1564 * concurrently being dropped by a backend connected to another DB.
1565 *
1566 * That's fairly unlikely in practice, so we'll just bail out.
1567 *
1568 * The slot sync worker holds a shared lock on the database before
1569 * operating on synced logical slots to avoid conflict with the drop
1570 * happening here. The persistent synced slots are thus safe but there
1571 * is a possibility that the slot sync worker has created a temporary
1572 * slot (which stays active even on release) and we are trying to drop
1573 * that here. In practice, the chances of hitting this scenario are
1574 * less as during slot synchronization, the temporary slot is
1575 * immediately converted to persistent and thus is safe due to the
1576 * shared lock taken on the database. So, we'll just bail out in such
1577 * a case.
1578 *
1579 * XXX: We can consider shutting down the slot sync worker before
1580 * trying to drop synced temporary slots here.
1581 */
1582 if (active_pid)
1583 ereport(ERROR,
1584 (errcode(ERRCODE_OBJECT_IN_USE),
1585 errmsg("replication slot \"%s\" is active for PID %d",
1586 slotname, active_pid)));
1587
1588 /*
1589 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1590 * holding ReplicationSlotControlLock over filesystem operations,
1591 * release ReplicationSlotControlLock and use
1592 * ReplicationSlotDropAcquired.
1593 *
1594 * As that means the set of slots could change, restart scan from the
1595 * beginning each time we release the lock.
1596 */
1597 LWLockRelease(ReplicationSlotControlLock);
1598 ReplicationSlotDropAcquired();
1599 dropped = true;
1600 goto restart;
1601 }
1602 LWLockRelease(ReplicationSlotControlLock);
1603
1604 if (dropped && !found_valid_logicalslot)
1605 RequestDisableLogicalDecoding();
1606}

References ReplicationSlot::active_pid, ReplicationSlot::data, ReplicationSlotPersistentData::database, ereport, errcode(), errmsg(), ERROR, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyProcPid, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropAcquired(), RequestDisableLogicalDecoding(), RS_INVAL_NONE, SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by dbase_redo(), and dropdb().

◆ ReplicationSlotSetInactiveSince()

static void ReplicationSlotSetInactiveSince ( ReplicationSlot *  s,
TimestampTz  ts,
bool  acquire_lock 
)
inline static

◆ ReplicationSlotsShmemInit()

void ReplicationSlotsShmemInit ( void  )

Definition at line 206 of file slot.c.

207{
208 bool found;
209
210 if (max_replication_slots == 0)
211 return;
212
213 ReplicationSlotCtl = (ReplicationSlotCtlData *)
214 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 &found);
216
217 if (!found)
218 {
219 int i;
220
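221 /* First time through, so initialize */
222 MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
223
224 for (i = 0; i < max_replication_slots; i++)
225 {
226 ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
227
228 /* everything else is zeroed by the memset above */
229 SpinLockInit(&slot->mutex);
230 LWLockInitialize(&slot->io_in_progress_lock,
231 LWTRANCHE_REPLICATION_SLOT_IO);
232 ConditionVariableInit(&slot->active_cv);
233 }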
234 }
235}
#define MemSet(start, val, len)
Definition: c.h:1019
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:698
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:188
#define SpinLockInit(lock)
Definition: spin.h:57
LWLock io_in_progress_lock
Definition: slot.h:213

References ReplicationSlot::active_cv, ConditionVariableInit(), i, ReplicationSlot::io_in_progress_lock, LWLockInitialize(), max_replication_slots, MemSet, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsShmemSize(), ShmemInitStruct(), and SpinLockInit.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationSlotsShmemSize()

Size ReplicationSlotsShmemSize ( void  )

Definition at line 188 of file slot.c.

189{
190 Size size = 0;
191
192 if (max_replication_slots == 0)
193 return size;
194
195 size = offsetof(ReplicationSlotCtlData, replication_slots);
196 size = add_size(size,
197 mul_size(max_replication_slots, sizeof(ReplicationSlot)));
198
199 return size;
200}
size_t Size
Definition: c.h:625
Size add_size(Size s1, Size s2)
Definition: shmem.c:495
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_replication_slots, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationSlotsShmemInit().

◆ ReplicationSlotValidateName()

bool ReplicationSlotValidateName ( const char *  name,
bool  allow_reserved_name,
int  elevel 
)

Definition at line 266 of file slot.c.

268{
269 int err_code;
270 char *err_msg = NULL;
271 char *err_hint = NULL;
272
273 if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
274 &err_code, &err_msg, &err_hint))
275 {
276 /*
277 * Use errmsg_internal() and errhint_internal() instead of errmsg()
278 * and errhint(), since the messages from
279 * ReplicationSlotValidateNameInternal() are already translated. This
280 * avoids double translation.
281 */
282 ereport(elevel,
283 errcode(err_code),
284 errmsg_internal("%s", err_msg),
285 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
286
287 pfree(err_msg);
288 if (err_hint != NULL)
289 pfree(err_hint);
290 return false;
291 }
292
293 return true;
294}
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
int errhint_internal(const char *fmt,...)
Definition: elog.c:1352
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition: slot.c:311

References ereport, errcode(), errhint_internal(), errmsg_internal(), name, pfree(), and ReplicationSlotValidateNameInternal().

Referenced by parse_subscription_options(), ReplicationSlotCreate(), and StartupReorderBuffer().

◆ ReplicationSlotValidateNameInternal()

bool ReplicationSlotValidateNameInternal ( const char *  name,
bool  allow_reserved_name,
int *  err_code,
char **  err_msg,
char **  err_hint 
)

Definition at line 311 of file slot.c.

313{
314 const char *cp;
315
316 if (strlen(name) == 0)
317 {
318 *err_code = ERRCODE_INVALID_NAME;
319 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
320 *err_hint = NULL;
321 return false;
322 }
323
324 if (strlen(name) >= NAMEDATALEN)
325 {
326 *err_code = ERRCODE_NAME_TOO_LONG;
327 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
328 *err_hint = NULL;
329 return false;
330 }
331
332 for (cp = name; *cp; cp++)
333 {
334 if (!((*cp >= 'a' && *cp <= 'z')
335 || (*cp >= '0' && *cp <= '9')
336 || (*cp == '_')))
337 {
338 *err_code = ERRCODE_INVALID_NAME;
339 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
340 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
341 return false;
342 }
343 }
344
345 if (!allow_reserved_name && IsSlotForConflictCheck(name))
346 {
347 *err_code = ERRCODE_RESERVED_NAME;
348 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
349 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
350 CONFLICT_DETECTION_SLOT);
351 return false;
352 }
353
354 return true;
355}
#define _(x)
Definition: elog.c:91
#define NAMEDATALEN
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
#define CONFLICT_DETECTION_SLOT
Definition: slot.h:28

References _, CONFLICT_DETECTION_SLOT, IsSlotForConflictCheck(), name, NAMEDATALEN, and psprintf().

Referenced by check_primary_slot_name(), ReplicationSlotValidateName(), and validate_sync_standby_slots().

◆ SearchNamedReplicationSlot()

ReplicationSlot * SearchNamedReplicationSlot ( const char *  name,
bool  need_lock 
)

Definition at line 540 of file slot.c.

541{
542 int i;
543 ReplicationSlot *slot = NULL;
544
545 if (need_lock)
546 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
547
548 for (i = 0; i < max_replication_slots; i++)
549 {
550 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
551
552 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
553 {
554 slot = s;
555 break;
556 }
557 }
558
559 if (need_lock)
560 LWLockRelease(ReplicationSlotControlLock);
561
562 return slot;
563}

References ReplicationSlot::data, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, name, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by acquire_conflict_slot_if_exists(), get_replslot_index(), pg_ls_replslotdir(), pgstat_reset_replslot(), ReadReplicationSlot(), ReplicationSlotAcquire(), StandbySlotsHaveCaughtup(), and synchronize_one_slot().

◆ SlotExistsInSyncStandbySlots()

bool SlotExistsInSyncStandbySlots ( const char *  slot_name)

Definition at line 3050 of file slot.c.

3051{
3052 const char *standby_slot_name;
3053
3054 /* Return false if there is no value in synchronized_standby_slots */
3055 if (synchronized_standby_slots_config == NULL)
3056 return false;
3057
3058 /*
3059 * XXX: We are not expecting this list to be long so a linear search
3060 * shouldn't hurt but if that turns out not to be true then we can cache
3061 * this information for each WalSender as well.
3062 */
3063 standby_slot_name = synchronized_standby_slots_config->slot_names;
3064 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3065 {
3066 if (strcmp(standby_slot_name, slot_name) == 0)
3067 return true;
3068
3069 standby_slot_name += strlen(standby_slot_name) + 1;
3070 }
3071
3072 return false;
3073}
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition: slot.c:167
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition: slot.c:101

References i, SyncStandbySlotsConfigData::nslotnames, SyncStandbySlotsConfigData::slot_names, and synchronized_standby_slots_config.

Referenced by PhysicalWakeupLogicalWalSnd().

◆ StandbySlotsHaveCaughtup()

bool StandbySlotsHaveCaughtup ( XLogRecPtr  wait_for_lsn,
int  elevel 
)

Definition at line 3083 of file slot.c.

3084{
3085 const char *name;
3086 int caught_up_slot_num = 0;
3087 XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
3088
3089 /*
3090 * Don't need to wait for the standbys to catch up if there is no value in
3091 * synchronized_standby_slots.
3092 */
3093 if (synchronized_standby_slots_config == NULL)
3094 return true;
3095
3096 /*
3097 * Don't need to wait for the standbys to catch up if we are on a standby
3098 * server, since we do not support syncing slots to cascading standbys.
3099 */
3100 if (RecoveryInProgress())
3101 return true;
3102
3103 /*
3104 * Don't need to wait for the standbys to catch up if they are already
3105 * beyond the specified WAL location.
3106 */
3107 if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
3108 ss_oldest_flush_lsn >= wait_for_lsn)
3109 return true;
3110
3111 /*
3112 * To prevent concurrent slot dropping and creation while filtering the
3113 * slots, take the ReplicationSlotControlLock outside of the loop.
3114 */
3115 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3116
3117 name = synchronized_standby_slots_config->slot_names;
3118 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3119 {
3120 XLogRecPtr restart_lsn;
3121 bool invalidated;
3122 bool inactive;
3123 ReplicationSlot *slot;
3124
3125 slot = SearchNamedReplicationSlot(name, false);
3126
3127 /*
3128 * If a slot name provided in synchronized_standby_slots does not
3129 * exist, report a message and exit the loop.
3130 */
3131 if (!slot)
3132 {
3133 ereport(elevel,
3134 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3135 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3136 name, "synchronized_standby_slots"),
3137 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3138 name),
3139 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3140 name, "synchronized_standby_slots"));
3141 break;
3142 }
3143
3144 /* Same as above: if a slot is not physical, exit the loop. */
3145 if (SlotIsLogical(slot))
3146 {
3147 ereport(elevel,
3148 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3149 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3150 name, "synchronized_standby_slots"),
3151 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3152 name),
3153 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3154 name, "synchronized_standby_slots"));
3155 break;
3156 }
3157
3158 SpinLockAcquire(&slot->mutex);
3159 restart_lsn = slot->data.restart_lsn;
3160 invalidated = slot->data.invalidated != RS_INVAL_NONE;
3161 inactive = slot->active_pid == 0;
3162 SpinLockRelease(&slot->mutex);
3163
3164 if (invalidated)
3165 {
3166 /* Specified physical slot has been invalidated */
3167 ereport(elevel,
3168 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3169 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3170 name, "synchronized_standby_slots"),
3171 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3172 name),
3173 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3174 name, "synchronized_standby_slots"));
3175 break;
3176 }
3177
3178 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3179 {
3180 /* Log a message if no active_pid for this physical slot */
3181 if (inactive)
3182 ereport(elevel,
3183 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3184 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3185 name, "synchronized_standby_slots"),
3186 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3187 name),
3188 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3189 name, "synchronized_standby_slots"));
3190
3191 /* Continue if the current slot hasn't caught up. */
3192 break;
3193 }
3194
3195 Assert(restart_lsn >= wait_for_lsn);
3196
3197 if (!XLogRecPtrIsValid(min_restart_lsn) ||
3198 min_restart_lsn > restart_lsn)
3199 min_restart_lsn = restart_lsn;
3200
3201 caught_up_slot_num++;
3202
3203 name += strlen(name) + 1;
3204 }
3205
3206 LWLockRelease(ReplicationSlotControlLock);
3207
3208 /*
3209 * Return false if not all the standbys have caught up to the specified
3210 * WAL location.
3211 */
3212 if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
3213 return false;
3214
3215 /* The ss_oldest_flush_lsn must not retreat. */
3216 Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
3217 min_restart_lsn >= ss_oldest_flush_lsn);
3218
3219 ss_oldest_flush_lsn = min_restart_lsn;
3220
3221 return true;
3222}
static XLogRecPtr ss_oldest_flush_lsn
Definition: slot.c:173

References ReplicationSlot::active_pid, Assert(), ReplicationSlot::data, ereport, errcode(), errdetail(), errhint(), errmsg(), i, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, name, SyncStandbySlotsConfigData::nslotnames, RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SearchNamedReplicationSlot(), SyncStandbySlotsConfigData::slot_names, SlotIsLogical, SpinLockAcquire, SpinLockRelease, ss_oldest_flush_lsn, synchronized_standby_slots_config, and XLogRecPtrIsValid.

Referenced by NeedToWaitForStandbys(), and WaitForStandbyConfirmation().

◆ StartupReplicationSlots()

void StartupReplicationSlots ( void  )

Definition at line 2378 of file slot.c.

2379{
2380 DIR *replication_dir;
2381 struct dirent *replication_de;
2382
2383 elog(DEBUG1, "starting up replication slots");
2384
2385 /* restore all slots by iterating over all on-disk entries */
2386 replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2387 while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2388 {
2389 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2390 PGFileType de_type;
2391
2392 if (strcmp(replication_de->d_name, ".") == 0 ||
2393 strcmp(replication_de->d_name, "..") == 0)
2394 continue;
2395
2396 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2397 de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2398
2399 /* we're only creating directories here, skip if it's not our's */
2400 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2401 continue;
2402
2403 /* we crashed while a slot was being setup or deleted, clean up */
2404 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2405 {
2406 if (!rmtree(path, true))
2407 {
2408 ereport(WARNING,
2409 (errmsg("could not remove directory \"%s\"",
2410 path)));
2411 continue;
2412 }
2413 fsync_fname(PG_REPLSLOT_DIR, true);
2414 continue;
2415 }
2416
2417 /* looks like a slot in a normal state, restore */
2418 RestoreSlotFromDisk(replication_de->d_name);
2419 }
2420 FreeDir(replication_dir);
2421
2422 /* currently no slots exist, we're done. */
2423 if (max_replication_slots <= 0)
2424 return;
2425
2426 /* Now that we have recovered all the data, compute replication xmin */
2427 ReplicationSlotsComputeRequiredXmin(false);
2428 ReplicationSlotsComputeRequiredLSN();
2429}
#define WARNING
Definition: elog.h:36
int FreeDir(DIR *dir)
Definition: fd.c:3005
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:753
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2887
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2953
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:547
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_DIR
Definition: file_utils.h:23
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
bool rmtree(const char *path, bool rmtopdir)
Definition: rmtree.c:50
static void RestoreSlotFromDisk(const char *name)
Definition: slot.c:2663
bool pg_str_endswith(const char *str, const char *end)
Definition: string.c:31
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15

References AllocateDir(), dirent::d_name, DEBUG1, elog, ereport, errmsg(), FreeDir(), fsync_fname(), get_dirent_type(), max_replication_slots, MAXPGPATH, PG_REPLSLOT_DIR, pg_str_endswith(), PGFILETYPE_DIR, PGFILETYPE_ERROR, ReadDir(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RestoreSlotFromDisk(), rmtree(), snprintf, and WARNING.

Referenced by StartupXLOG().

◆ WaitForStandbyConfirmation()

void WaitForStandbyConfirmation ( XLogRecPtr  wait_for_lsn)

Definition at line 3231 of file slot.c.

3232{
3233 /*
3234 * Don't need to wait for the standby to catch up if the current acquired
3235 * slot is not a logical failover slot, or there is no value in
3236 * synchronized_standby_slots.
3237 */
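3238 if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
3239 return;
3240
3241 ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3242
3243 for (;;)
3244 {
3245 CHECK_FOR_INTERRUPTS();
3246
3247 if (ConfigReloadPending)
3248 {
3249 ConfigReloadPending = false;
3250 ProcessConfigFile(PGC_SIGHUP);
3251 }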
3252
3253 /* Exit if done waiting for every slot. */
3254 if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3255 break;
3256
3257 /*
3258 * Wait for the slots in the synchronized_standby_slots to catch up,
3259 * but use a timeout (1s) so we can also check if the
3260 * synchronized_standby_slots has been changed.
3261 */
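3262 ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
3263 WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3264 }
3265
3266 ConditionVariableCancelSleep();
3267}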
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:3083
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:117

References CHECK_FOR_INTERRUPTS, ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableTimedSleep(), ConfigReloadPending, ReplicationSlot::data, ReplicationSlotPersistentData::failover, MyReplicationSlot, PGC_SIGHUP, ProcessConfigFile(), StandbySlotsHaveCaughtup(), synchronized_standby_slots_config, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtl, and WARNING.

Referenced by LogicalSlotAdvanceAndCheckSnapState(), and pg_logical_slot_get_changes_guts().

Variable Documentation

◆ idle_replication_slot_timeout_secs

PGDLLIMPORT int idle_replication_slot_timeout_secs
extern

◆ max_replication_slots

◆ MyReplicationSlot

PGDLLIMPORT ReplicationSlot* MyReplicationSlot
extern

Definition at line 148 of file slot.c.

Referenced by abort_logical_decoding_activation(), ApplyLauncherMain(), binary_upgrade_logical_slot_has_caught_up(), compute_min_nonremovable_xid(), copy_replication_slot(), create_logical_replication_slot(), create_physical_replication_slot(), CreateConflictDetectionSlot(), CreateDecodingContext(), CreateInitDecodingContext(), CreateReplicationSlot(), DisableLogicalDecodingIfNecessary(), EnsureLogicalDecodingEnabled(), init_conflict_slot_xmin(), InvalidatePossiblyObsoleteSlot(), LogicalConfirmReceivedLocation(), LogicalIncreaseRestartDecodingForSlot(), LogicalIncreaseXminForSlot(), logicalrep_worker_launch(), LogicalReplicationSlotHasPendingWal(), LogicalSlotAdvanceAndCheckSnapState(), NeedToWaitForStandbys(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_physical_replication_slot_advance(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), PhysicalReplicationSlotNewXmin(), PhysicalWakeupLogicalWalSnd(), PostgresMain(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), ReorderBufferAllocate(), ReorderBufferFree(), ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), ReorderBufferSerializedPath(), ReorderBufferSerializeTXN(), ReplicationSlotAcquire(), ReplicationSlotAlter(), ReplicationSlotCleanup(), ReplicationSlotCreate(), ReplicationSlotDrop(), ReplicationSlotDropAcquired(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotsDropDBSlots(), ReplicationSlotShmemExit(), reserve_wal_for_local_slot(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), StartupDecodingContext(), synchronize_one_slot(), update_and_persist_local_synced_slot(), update_conflict_slot_xmin(), update_local_synced_slot(), update_slotsync_skip_stats(), WaitForStandbyConfirmation(), and WalSndErrorCleanup().

◆ ReplicationSlotCtl

◆ synchronized_standby_slots

PGDLLIMPORT char* synchronized_standby_slots
extern

Definition at line 164 of file slot.c.