PostgreSQL Source Code git master
slot.h
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * slot.h
3 * Replication slot management.
4 *
5 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6 *
7 *-------------------------------------------------------------------------
8 */
9#ifndef SLOT_H
10#define SLOT_H
11
12#include "access/xlog.h"
13#include "access/xlogreader.h"
15#include "storage/lwlock.h"
16#include "storage/shmem.h"
17#include "storage/spin.h"
19
20/* directory to store replication slot data in */
21#define PG_REPLSLOT_DIR "pg_replslot"
22
23/*
24 * Behaviour of replication slots, upon release or crash.
25 *
26 * Slots marked as PERSISTENT are crash-safe and will not be dropped when
27 * released. Slots marked as EPHEMERAL will be dropped when released or after
28 * restarts. Slots marked TEMPORARY will be dropped at the end of a session
29 * or on error.
30 *
31 * EPHEMERAL is used as a not-quite-ready state when creating persistent
32 * slots. EPHEMERAL slots can be made PERSISTENT by calling
33 * ReplicationSlotPersist(). For a slot that goes away at the end of a
34 * session, TEMPORARY is the appropriate choice.
35 */
37{
42
43/*
44 * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
45 * 'invalidated' field is set to a value other than _NONE.
46 *
47 * When adding a new invalidation cause here, its value must be a power of 2
48 * (e.g., 1, 2, 4...) for proper bitwise operations. Also, remember to update
49 * RS_INVAL_MAX_CAUSES below, and SlotInvalidationCauses in slot.c.
50 */
52{
54 /* required WAL has been removed */
56 /* required rows have been removed */
57 RS_INVAL_HORIZON = (1 << 1),
58 /* wal_level insufficient for slot */
60 /* idle slot timeout has occurred */
63
64/* Maximum number of invalidation causes */
65#define RS_INVAL_MAX_CAUSES 4
66
67/*
68 * On-Disk data of a replication slot, preserved across restarts.
69 */
71{
72 /* The slot's identifier */
74
75 /* database the slot is active on */
77
78 /*
79 * The slot's behaviour when being dropped (or restored after a crash).
80 */
82
83 /*
84 * xmin horizon for data
85 *
86 * NB: This may represent a value that hasn't been written to disk yet;
87 * see notes for effective_xmin, below.
88 */
90
91 /*
92 * xmin horizon for catalog tuples
93 *
94 * NB: This may represent a value that hasn't been written to disk yet;
95 * see notes for effective_xmin, below.
96 */
98
99 /* oldest LSN that might be required by this replication slot */
101
102 /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
104
105 /*
106 * Oldest LSN that the client has acked receipt for. This is used as the
107 * start_lsn point in case the client doesn't specify one, and also as a
108 * safety measure to jump forwards in case the client specifies a
109 * start_lsn that's further in the past than this value.
110 */
112
113 /*
114 * LSN at which we enabled two_phase commit for this slot or LSN at which
115 * we found a consistent point at the time of slot creation.
116 */
118
119 /*
120 * Allow decoding of prepared transactions?
121 */
123
124 /* plugin name */
126
127 /*
128 * Was this slot synchronized from the primary server?
129 */
130 char synced;
131
132 /*
133 * Is this a failover slot (sync candidate for standbys)? Only relevant
134 * for logical slots on the primary server.
135 */
138
139/*
140 * Shared memory state of a single replication slot.
141 *
142 * The in-memory data of replication slots follows a locking model based
143 * on two linked concepts:
144 * - A replication slot's in_use flag is switched when added or discarded using
145 * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive
146 * mode when updating the flag by the backend owning the slot and doing the
147 * operation, while readers (concurrent backends not owning the slot) need
148 * to hold it in shared mode when looking at replication slot data.
149 * - Individual fields are protected by mutex where only the backend owning
150 * the slot is authorized to update the fields from its own slot. The
151 * backend owning the slot does not need to take this lock when reading its
152 * own fields, while concurrent backends not owning this slot should take the
153 * lock when reading this slot's data.
154 */
155typedef struct ReplicationSlot
156{
157 /* lock, on same cacheline as effective_xmin */
158 slock_t mutex;
159
160 /* is this slot defined */
161 bool in_use;
162
163 /* Who is streaming out changes for this slot? 0 in unused slots. */
165
166 /* any outstanding modifications? */
168 bool dirty;
169
170 /*
171 * For logical decoding, it's extremely important that we never remove any
172 * data that's still needed for decoding purposes, even after a crash;
173 * otherwise, decoding will produce wrong answers. Ordinary streaming
174 * replication also needs to prevent old row versions from being removed
175 * too soon, but the worst consequence we might encounter there is
176 * unwanted query cancellations on the standby. Thus, for logical
177 * decoding, this value represents the latest xmin that has actually been
178 * written to disk, whereas for streaming replication, it's just the same
179 * as the persistent value (data.xmin).
180 */
183
184 /* data surviving shutdowns and crashes */
186
187 /* is somebody performing io on this slot? */
189
190 /* Condition variable signaled when active_pid changes */
192
193 /* all the remaining data is only used for logical slots */
194
195 /*
196 * When the client has confirmed flushes >= candidate_xmin_lsn we can
197 * advance the catalog xmin. When restart_valid has been passed,
198 * restart_lsn can be increased.
199 */
204
205 /*
206 * This value tracks the last confirmed_flush LSN flushed which is used
207 * during a shutdown checkpoint to decide if a logical slot's data should be
208 * forcibly flushed or not.
209 */
211
212 /*
213 * The time when the slot became inactive. For synced slots on a standby
214 * server, it represents the time when slot synchronization was most
215 * recently stopped.
216 */
219
/*
 * Classify a slot by its persistent data.database field: a slot whose
 * database is InvalidOid is treated as physical, any other value as logical.
 * Both macros dereference their argument, so 'slot' must be a valid pointer.
 */
220#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
221#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
222
223/*
224 * Shared memory control area for all replication slots.
225 */
227{
228 /*
229 * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
230 * reason you can't do that in an otherwise-empty struct.
231 */
234
235/*
236 * Set slot's inactive_since property unless it was previously invalidated.
237 */
238static inline void
240 bool acquire_lock)
241{
242 if (acquire_lock)
244
246 s->inactive_since = ts;
247
248 if (acquire_lock)
250}
251
252/*
253 * Pointers to shared memory
254 */
257
258/* GUCs */
262
263/* shmem initialization functions */
265extern void ReplicationSlotsShmemInit(void);
266
267/* management of individual slots */
268extern void ReplicationSlotCreate(const char *name, bool db_specific,
269 ReplicationSlotPersistency persistency,
270 bool two_phase, bool failover,
271 bool synced);
272extern void ReplicationSlotPersist(void);
273extern void ReplicationSlotDrop(const char *name, bool nowait);
274extern void ReplicationSlotDropAcquired(void);
275extern void ReplicationSlotAlter(const char *name, const bool *failover,
276 const bool *two_phase);
277
278extern void ReplicationSlotAcquire(const char *name, bool nowait,
279 bool error_if_invalid);
280extern void ReplicationSlotRelease(void);
281extern void ReplicationSlotCleanup(bool synced_only);
282extern void ReplicationSlotSave(void);
283extern void ReplicationSlotMarkDirty(void);
284
285/* misc stuff */
286extern void ReplicationSlotInitialize(void);
287extern bool ReplicationSlotValidateName(const char *name, int elevel);
288extern void ReplicationSlotReserveWal(void);
289extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
290extern void ReplicationSlotsComputeRequiredLSN(void);
292extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
293extern void ReplicationSlotsDropDBSlots(Oid dboid);
294extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes,
295 XLogSegNo oldestSegno,
296 Oid dboid,
297 TransactionId snapshotConflictHorizon);
298extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
299extern int ReplicationSlotIndex(ReplicationSlot *slot);
300extern bool ReplicationSlotName(int index, Name name);
301extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
302extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
303
304extern void StartupReplicationSlots(void);
305extern void CheckPointReplicationSlots(bool is_shutdown);
306
307extern void CheckSlotRequirements(void);
308extern void CheckSlotPermissions(void);
310 GetSlotInvalidationCause(const char *invalidation_reason);
312
313extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
314extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
315extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
316
317#endif /* SLOT_H */
#define PGDLLIMPORT
Definition: c.h:1291
uint32_t uint32
Definition: c.h:502
uint32 TransactionId
Definition: c.h:623
size_t Size
Definition: c.h:576
int64 TimestampTz
Definition: timestamp.h:39
static bool two_phase
unsigned int Oid
Definition: postgres_ext.h:32
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition: slot.c:512
struct ReplicationSlotCtlData ReplicationSlotCtlData
PGDLLIMPORT char * synchronized_standby_slots
Definition: slot.c:163
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:559
PGDLLIMPORT ReplicationSlot * MyReplicationSlot
Definition: slot.c:147
void CheckPointReplicationSlots(bool is_shutdown)
Definition: slot.c:2032
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:324
void ReplicationSlotDropAcquired(void)
Definition: slot.c:919
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1061
void ReplicationSlotReserveWal(void)
Definition: slot.c:1452
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:1263
PGDLLIMPORT int idle_replication_slot_timeout_mins
Definition: slot.c:157
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *invalidation_reason)
Definition: slot.c:2607
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:1321
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:1205
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1100
void ReplicationSlotPersist(void)
Definition: slot.c:1078
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:814
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2770
struct ReplicationSlotPersistentData ReplicationSlotPersistentData
ReplicationSlotPersistency
Definition: slot.h:37
@ RS_PERSISTENT
Definition: slot.h:38
@ RS_EPHEMERAL
Definition: slot.h:39
@ RS_TEMPORARY
Definition: slot.h:40
void ReplicationSlotSave(void)
Definition: slot.c:1043
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:479
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1273
void CheckSlotPermissions(void)
Definition: slot.c:1435
bool ReplicationSlotName(int index, Name name)
Definition: slot.c:528
void ReplicationSlotsShmemInit(void)
Definition: slot.c:204
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition: slot.c:837
void ReplicationSlotRelease(void)
Definition: slot.c:686
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition: slot.c:2957
PGDLLIMPORT ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:144
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2803
ReplicationSlotInvalidationCause
Definition: slot.h:52
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:55
@ RS_INVAL_IDLE_TIMEOUT
Definition: slot.h:61
@ RS_INVAL_HORIZON
Definition: slot.h:57
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:59
@ RS_INVAL_NONE
Definition: slot.h:53
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1156
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:775
void ReplicationSlotInitialize(void)
Definition: slot.c:239
PGDLLIMPORT int max_replication_slots
Definition: slot.c:150
struct ReplicationSlot ReplicationSlot
void StartupReplicationSlots(void)
Definition: slot.c:2089
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void CheckSlotRequirements(void)
Definition: slot.c:1413
bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition: slot.c:1976
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition: slot.h:239
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:186
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition: slot.c:2627
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:267
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
Definition: lwlock.h:42
ReplicationSlot replication_slots[1]
Definition: slot.h:232
TransactionId xmin
Definition: slot.h:89
TransactionId catalog_xmin
Definition: slot.h:97
XLogRecPtr confirmed_flush
Definition: slot.h:111
ReplicationSlotPersistency persistency
Definition: slot.h:81
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:103
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:201
TransactionId effective_catalog_xmin
Definition: slot.h:182
slock_t mutex
Definition: slot.h:158
XLogRecPtr candidate_restart_valid
Definition: slot.h:202
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:210
pid_t active_pid
Definition: slot.h:164
bool in_use
Definition: slot.h:161
TransactionId effective_xmin
Definition: slot.h:181
bool just_dirtied
Definition: slot.h:167
XLogRecPtr candidate_restart_lsn
Definition: slot.h:203
LWLock io_in_progress_lock
Definition: slot.h:188
ConditionVariable active_cv
Definition: slot.h:191
TransactionId candidate_catalog_xmin
Definition: slot.h:200
bool dirty
Definition: slot.h:168
ReplicationSlotPersistentData data
Definition: slot.h:185
TimestampTz inactive_since
Definition: slot.h:217
Definition: type.h:96
Definition: c.h:712
const char * name
static WalReceiverConn * wrconn
Definition: walreceiver.c:92
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint64 XLogSegNo
Definition: xlogdefs.h:48