PostgreSQL Source Code git master
Loading...
Searching...
No Matches
slot.h
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * slot.h
3 * Replication slot management.
4 *
5 * Copyright (c) 2012-2026, PostgreSQL Global Development Group
6 *
7 *-------------------------------------------------------------------------
8 */
9#ifndef SLOT_H
10#define SLOT_H
11
12#include "access/xlog.h"
13#include "access/xlogreader.h"
15#include "storage/lwlock.h"
16#include "storage/shmem.h"
17#include "storage/spin.h"
19
20/* directory to store replication slot data in */
21#define PG_REPLSLOT_DIR "pg_replslot"
22
23/*
24 * The reserved name for a replication slot used to retain dead tuples for
25 * conflict detection in logical replication. See
26 * maybe_advance_nonremovable_xid() for details.
27 */
28#define CONFLICT_DETECTION_SLOT "pg_conflict_detection"
29
30/*
31 * Behaviour of replication slots, upon release or crash.
32 *
33 * Slots marked as PERSISTENT are crash-safe and will not be dropped when
34 * released. Slots marked as EPHEMERAL will be dropped when released or after
35 * restarts. Slots marked TEMPORARY will be dropped at the end of a session
36 * or on error.
37 *
38 * EPHEMERAL is used as a not-quite-ready state when creating persistent
39 * slots. EPHEMERAL slots can be made PERSISTENT by calling
40 * ReplicationSlotPersist(). For a slot that goes away at the end of a
41 * session, TEMPORARY is the appropriate choice.
42 */
49
50/*
51 * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
52 * 'invalidated' field is set to a value other than _NONE.
53 *
54 * When adding a new invalidation cause here, its value must be a power of 2
55 * (e.g., 1, 2, 4...) for proper bitwise operations. Also, remember to update
56 * RS_INVAL_MAX_CAUSES below, and SlotInvalidationCauses in slot.c.
57 */
59{
61 /* required WAL has been removed */
63 /* required rows have been removed */
64 RS_INVAL_HORIZON = (1 << 1),
65 /* wal_level insufficient for slot */
67 /* idle slot timeout has occurred */
70
71/* Maximum number of invalidation causes */
72#define RS_INVAL_MAX_CAUSES 4
73
74/*
75 * When the slot synchronization worker is running, or when
76 * pg_sync_replication_slots is executed, slot synchronization may be
77 * skipped. This enum defines the possible reasons for skipping slot
78 * synchronization.
79 */
81{
82 SS_SKIP_NONE, /* No skip */
83 SS_SKIP_WAL_NOT_FLUSHED, /* Standby did not flush the wal corresponding
84 * to confirmed flush of remote slot */
85 SS_SKIP_WAL_OR_ROWS_REMOVED, /* Remote slot is behind; required WAL or
86 * rows may be removed or at risk */
87 SS_SKIP_NO_CONSISTENT_SNAPSHOT, /* Standby could not build a consistent
88 * snapshot */
89 SS_SKIP_INVALID /* Local slot is invalid */
91
92/*
93 * On-Disk data of a replication slot, preserved across restarts.
94 */
96{
97 /* The slot's identifier */
99
100 /* database the slot is active on */
102
103 /*
104 * The slot's behaviour when being dropped (or restored after a crash).
105 */
107
108 /*
109 * xmin horizon for data
110 *
111 * NB: This may represent a value that hasn't been written to disk yet;
112 * see notes for effective_xmin, below.
113 */
115
116 /*
117 * xmin horizon for catalog tuples
118 *
119 * NB: This may represent a value that hasn't been written to disk yet;
120 * see notes for effective_xmin, below.
121 */
123
124 /* oldest LSN that might be required by this replication slot */
126
127 /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
129
130 /*
131 * Oldest LSN that the client has acked receipt for. This is used as the
132 * start_lsn point in case the client doesn't specify one, and also as a
133 * safety measure to jump forwards in case the client specifies a
134 * start_lsn that's further in the past than this value.
135 */
137
138 /*
139 * LSN at which we enabled two_phase commit for this slot or LSN at which
140 * we found a consistent point at the time of slot creation.
141 */
143
144 /*
145 * Allow decoding of prepared transactions?
146 */
148
149 /* plugin name */
151
152 /*
153 * Was this slot synchronized from the primary server?
154 */
155 bool synced;
156
157 /*
158 * Is this a failover slot (sync candidate for standbys)? Only relevant
159 * for logical slots on the primary server.
160 */
163
164/*
165 * Shared memory state of a single replication slot.
166 *
167 * The in-memory data of replication slots follows a locking model based
168 * on two linked concepts:
169 * - A replication slot's in_use flag is switched when added or discarded using
170 * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive
171 * mode when updating the flag by the backend owning the slot and doing the
172 * operation, while readers (concurrent backends not owning the slot) need
173 * to hold it in shared mode when looking at replication slot data.
174 * - Individual fields are protected by mutex where only the backend owning
175 * the slot is authorized to update the fields from its own slot. The
176 * backend owning the slot does not need to take this lock when reading its
177 * own fields, while concurrent backends not owning this slot should take the
178 * lock when reading this slot's data.
179 */
180typedef struct ReplicationSlot
181{
182 /* lock, on same cacheline as effective_xmin */
184
185 /* is this slot defined */
186 bool in_use;
187
188 /*
189 * Who is streaming out changes for this slot? INVALID_PROC_NUMBER in
190 * unused slots.
191 */
193
194 /* any outstanding modifications? */
196 bool dirty;
197
198 /*
199 * For logical decoding, it's extremely important that we never remove any
200 * data that's still needed for decoding purposes, even after a crash;
201 * otherwise, decoding will produce wrong answers. Ordinary streaming
202 * replication also needs to prevent old row versions from being removed
203 * too soon, but the worst consequence we might encounter there is
204 * unwanted query cancellations on the standby. Thus, for logical
205 * decoding, this value represents the latest xmin that has actually been
206 * written to disk, whereas for streaming replication, it's just the same
207 * as the persistent value (data.xmin).
208 */
211
212 /* data surviving shutdowns and crashes */
214
215 /* is somebody performing io on this slot? */
217
218 /* Condition variable signaled when active_proc changes */
220
221 /* all the remaining data is only used for logical slots */
222
223 /*
224 * When the client has confirmed flushes >= candidate_xmin_lsn we can
225 * advance the catalog xmin. When restart_valid has been passed,
226 * restart_lsn can be increased.
227 */
232
233 /*
234 * This value tracks the last confirmed_flush LSN flushed which is used
235 * during a shutdown checkpoint to decide if a logical slot's data should be
236 * forcibly flushed or not.
237 */
239
240 /*
241 * The time when the slot became inactive. For synced slots on a standby
242 * server, it represents the time when slot synchronization was most
243 * recently stopped.
244 */
246
247 /*
248 * Latest restart_lsn that has been flushed to disk. For persistent slots
249 * the flushed LSN should be taken into account when calculating the
250 * oldest LSN for WAL segments removal.
251 *
252 * Do not assume that restart_lsn will always move forward, i.e., that the
253 * previously flushed restart_lsn is always behind data.restart_lsn. In
254 * streaming replication using a physical slot, the restart_lsn is updated
255 * based on the flushed WAL position reported by the walreceiver.
256 *
257 * This replication mode allows duplicate WAL records to be received and
258 * overwritten. If the walreceiver receives older WAL records and then
259 * reports them as flushed to the walsender, the restart_lsn may appear to
260 * move backward.
261 *
262 * This typically occurs at the beginning of replication. One reason is
263 * that streaming replication starts at the beginning of a segment, so, if
264 * restart_lsn is in the middle of a segment, it will be updated to an
265 * earlier LSN, see RequestXLogStreaming. Another reason is that the
266 * walreceiver chooses its startpoint based on the replayed LSN, so, if
267 * some records have been received but not yet applied, they will be
268 * received again, which leads to updating the restart_lsn to an earlier
269 * position.
270 */
272
273 /*
274 * Reason for the most recent slot synchronization skip.
275 *
276 * Slot sync skips can occur for both temporary and persistent replication
277 * slots. They are more common for temporary slots, but persistent slots
278 * may also skip synchronization in rare cases (e.g.,
279 * SS_SKIP_WAL_NOT_FLUSHED or SS_SKIP_WAL_OR_ROWS_REMOVED).
280 *
281 * Since temporary slots are dropped after server restart, persisting
282 * slotsync_skip_reason provides no practical benefit.
283 */
286
287#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
288#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
289
290/*
291 * Shared memory control area for all replication slots.
292 */
294{
295 /*
296 * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
297 * reason you can't do that in an otherwise-empty struct.
298 */
301
302/*
303 * Set slot's inactive_since property unless it was previously invalidated.
304 */
305static inline void
318
319/*
320 * Pointers to shared memory
321 */
324
325/* GUCs */
329
330/* shmem initialization functions */
332extern void ReplicationSlotsShmemInit(void);
333
334/* management of individual slots */
335extern void ReplicationSlotCreate(const char *name, bool db_specific,
336 ReplicationSlotPersistency persistency,
337 bool two_phase, bool failover,
338 bool synced);
339extern void ReplicationSlotPersist(void);
340extern void ReplicationSlotDrop(const char *name, bool nowait);
341extern void ReplicationSlotDropAcquired(void);
342extern void ReplicationSlotAlter(const char *name, const bool *failover,
343 const bool *two_phase);
344
345extern void ReplicationSlotAcquire(const char *name, bool nowait,
346 bool error_if_invalid);
347extern void ReplicationSlotRelease(void);
348extern void ReplicationSlotCleanup(bool synced_only);
349extern void ReplicationSlotSave(void);
350extern void ReplicationSlotMarkDirty(void);
351
352/* misc stuff */
353extern void ReplicationSlotInitialize(void);
354extern bool ReplicationSlotValidateName(const char *name,
356 int elevel);
357extern bool ReplicationSlotValidateNameInternal(const char *name,
359 int *err_code, char **err_msg, char **err_hint);
360extern void ReplicationSlotReserveWal(void);
362extern void ReplicationSlotsComputeRequiredLSN(void);
364extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
365extern bool CheckLogicalSlotExists(void);
366extern void ReplicationSlotsDropDBSlots(Oid dboid);
369 Oid dboid,
370 TransactionId snapshotConflictHorizon);
372extern int ReplicationSlotIndex(ReplicationSlot *slot);
373extern bool ReplicationSlotName(int index, Name name);
375extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
376
377extern void StartupReplicationSlots(void);
378extern void CheckPointReplicationSlots(bool is_shutdown);
379
380extern void CheckSlotRequirements(void);
381extern void CheckSlotPermissions(void);
383 GetSlotInvalidationCause(const char *cause_name);
385
386extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
387extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
389
390#endif /* SLOT_H */
#define PGDLLIMPORT
Definition c.h:1344
uint32_t uint32
Definition c.h:546
uint32 TransactionId
Definition c.h:666
size_t Size
Definition c.h:619
int64 TimestampTz
Definition timestamp.h:39
static bool two_phase
static bool failover
unsigned int Oid
static int fb(int x)
int ProcNumber
Definition procnumber.h:24
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition slot.c:574
PGDLLIMPORT char * synchronized_standby_slots
Definition slot.c:164
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:621
PGDLLIMPORT ReplicationSlot * MyReplicationSlot
Definition slot.c:148
void CheckPointReplicationSlots(bool is_shutdown)
Definition slot.c:2309
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition slot.c:379
void ReplicationSlotDropAcquired(void)
Definition slot.c:1034
void ReplicationSlotMarkDirty(void)
Definition slot.c:1176
void ReplicationSlotReserveWal(void)
Definition slot.c:1696
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition slot.c:1449
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition slot.c:312
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition slot.c:1510
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition slot.c:1370
PGDLLIMPORT int idle_replication_slot_timeout_secs
Definition slot.c:158
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
Definition slot.c:2915
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1218
void ReplicationSlotPersist(void)
Definition slot.c:1193
bool CheckLogicalSlotExists(void)
Definition slot.c:1615
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:912
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3059
ReplicationSlotPersistency
Definition slot.h:44
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
void ReplicationSlotSave(void)
Definition slot.c:1158
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:541
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition tablesync.c:1201
void CheckSlotPermissions(void)
Definition slot.c:1679
bool ReplicationSlotName(int index, Name name)
Definition slot.c:590
void ReplicationSlotsShmemInit(void)
Definition slot.c:206
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:267
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:952
void ReplicationSlotRelease(void)
Definition slot.c:761
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition slot.c:3240
PGDLLIMPORT ReplicationSlotCtlData * ReplicationSlotCtl
Definition slot.c:145
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3092
ReplicationSlotInvalidationCause
Definition slot.h:59
@ RS_INVAL_WAL_REMOVED
Definition slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition slot.h:68
@ RS_INVAL_HORIZON
Definition slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition slot.h:66
@ RS_INVAL_NONE
Definition slot.h:60
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1300
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:860
void ReplicationSlotInitialize(void)
Definition slot.c:242
PGDLLIMPORT int max_replication_slots
Definition slot.c:151
void StartupReplicationSlots(void)
Definition slot.c:2387
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void CheckSlotRequirements(void)
Definition slot.c:1657
bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition slot.c:2205
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition slot.h:306
SlotSyncSkipReason
Definition slot.h:81
@ SS_SKIP_WAL_NOT_FLUSHED
Definition slot.h:83
@ SS_SKIP_NO_CONSISTENT_SNAPSHOT
Definition slot.h:87
@ SS_SKIP_NONE
Definition slot.h:82
@ SS_SKIP_INVALID
Definition slot.h:89
@ SS_SKIP_WAL_OR_ROWS_REMOVED
Definition slot.h:85
Size ReplicationSlotsShmemSize(void)
Definition slot.c:188
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition slot.c:2935
#define SpinLockRelease(lock)
Definition spin.h:61
#define SpinLockAcquire(lock)
Definition spin.h:59
ReplicationSlot replication_slots[1]
Definition slot.h:299
TransactionId catalog_xmin
Definition slot.h:122
ReplicationSlotPersistency persistency
Definition slot.h:106
ReplicationSlotInvalidationCause invalidated
Definition slot.h:128
XLogRecPtr candidate_xmin_lsn
Definition slot.h:229
TransactionId effective_catalog_xmin
Definition slot.h:210
slock_t mutex
Definition slot.h:183
XLogRecPtr candidate_restart_valid
Definition slot.h:230
XLogRecPtr last_saved_confirmed_flush
Definition slot.h:238
SlotSyncSkipReason slotsync_skip_reason
Definition slot.h:284
bool in_use
Definition slot.h:186
TransactionId effective_xmin
Definition slot.h:209
bool just_dirtied
Definition slot.h:195
XLogRecPtr last_saved_restart_lsn
Definition slot.h:271
XLogRecPtr candidate_restart_lsn
Definition slot.h:231
LWLock io_in_progress_lock
Definition slot.h:216
ConditionVariable active_cv
Definition slot.h:219
TransactionId candidate_catalog_xmin
Definition slot.h:228
ProcNumber active_proc
Definition slot.h:192
ReplicationSlotPersistentData data
Definition slot.h:213
TimestampTz inactive_since
Definition slot.h:245
Definition type.h:96
Definition c.h:760
const char * name
static WalReceiverConn * wrconn
Definition walreceiver.c:94
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint64 XLogSegNo
Definition xlogdefs.h:52