PostgreSQL Source Code  git master
slot.h
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * slot.h
3  * Replication slot management.
4  *
5  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
6  *
7  *-------------------------------------------------------------------------
8  */
9 #ifndef SLOT_H
10 #define SLOT_H
11 
12 #include "access/xlog.h"
13 #include "access/xlogreader.h"
15 #include "storage/lwlock.h"
16 #include "storage/shmem.h"
17 #include "storage/spin.h"
19 
20 /*
21  * Behaviour of replication slots, upon release or crash.
22  *
23  * Slots marked as PERSISTENT are crash-safe and will not be dropped when
24  * released. Slots marked as EPHEMERAL will be dropped when released or after
25  * restarts. Slots marked TEMPORARY will be dropped at the end of a session
26  * or on error.
27  *
28  * EPHEMERAL is used as a not-quite-ready state when creating persistent
29  * slots. EPHEMERAL slots can be made PERSISTENT by calling
30  * ReplicationSlotPersist(). For a slot that goes away at the end of a
31  * session, TEMPORARY is the appropriate choice.
32  */
34 {
39 
40 /*
41  * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
42  * 'invalidated' field is set to a value other than _NONE.
43  *
44  * When adding a new invalidation cause here, remember to update
45  * SlotInvalidationCauses and RS_INVAL_MAX_CAUSES.
46  */
48 {
50  /* required WAL has been removed */
52  /* required rows have been removed */
54  /* wal_level insufficient for slot */
57 
58 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
59 
60 /*
61  * On-Disk data of a replication slot, preserved across restarts.
62  */
64 {
65  /* The slot's identifier */
67 
68  /* database the slot is active on */
70 
71  /*
72  * The slot's behaviour when being dropped (or restored after a crash).
73  */
75 
76  /*
77  * xmin horizon for data
78  *
79  * NB: This may represent a value that hasn't been written to disk yet;
80  * see notes for effective_xmin, below.
81  */
83 
84  /*
85  * xmin horizon for catalog tuples
86  *
87  * NB: This may represent a value that hasn't been written to disk yet;
88  * see notes for effective_xmin, below.
89  */
91 
92  /* oldest LSN that might be required by this replication slot */
94 
95  /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
97 
98  /*
99  * Oldest LSN that the client has acked receipt for. This is used as the
100  * start_lsn point in case the client doesn't specify one, and also as a
101  * safety measure to jump forwards in case the client specifies a
102  * start_lsn that's further in the past than this value.
103  */
105 
106  /*
107  * LSN at which we enabled two_phase commit for this slot or LSN at which
108  * we found a consistent point at the time of slot creation.
109  */
111 
112  /*
113  * Allow decoding of prepared transactions?
114  */
115  bool two_phase;
116 
117  /* plugin name */
119 
120  /*
121  * Was this slot synchronized from the primary server?
122  */
123  char synced;
124 
125  /*
126  * Is this a failover slot (sync candidate for standbys)? Only relevant
127  * for logical slots on the primary server.
128  */
129  bool failover;
131 
132 /*
133  * Shared memory state of a single replication slot.
134  *
135  * The in-memory data of replication slots follows a locking model based
136  * on two linked concepts:
137  * - A replication slot's in_use flag is switched when added or discarded using
138  * the LWLock ReplicationSlotControlLock, which needs to be hold in exclusive
139  * mode when updating the flag by the backend owning the slot and doing the
140  * operation, while readers (concurrent backends not owning the slot) need
141  * to hold it in shared mode when looking at replication slot data.
142  * - Individual fields are protected by mutex where only the backend owning
143  * the slot is authorized to update the fields from its own slot. The
144  * backend owning the slot does not need to take this lock when reading its
145  * own fields, while concurrent backends not owning this slot should take the
146  * lock when reading this slot's data.
147  */
148 typedef struct ReplicationSlot
149 {
150  /* lock, on same cacheline as effective_xmin */
152 
153  /* is this slot defined */
154  bool in_use;
155 
156  /* Who is streaming out changes for this slot? 0 in unused slots. */
157  pid_t active_pid;
158 
159  /* any outstanding modifications? */
161  bool dirty;
162 
163  /*
164  * For logical decoding, it's extremely important that we never remove any
165  * data that's still needed for decoding purposes, even after a crash;
166  * otherwise, decoding will produce wrong answers. Ordinary streaming
167  * replication also needs to prevent old row versions from being removed
168  * too soon, but the worst consequence we might encounter there is
169  * unwanted query cancellations on the standby. Thus, for logical
170  * decoding, this value represents the latest xmin that has actually been
171  * written to disk, whereas for streaming replication, it's just the same
172  * as the persistent value (data.xmin).
173  */
176 
177  /* data surviving shutdowns and crashes */
179 
180  /* is somebody performing io on this slot? */
182 
183  /* Condition variable signaled when active_pid changes */
185 
186  /* all the remaining data is only used for logical slots */
187 
188  /*
189  * When the client has confirmed flushes >= candidate_xmin_lsn we can
190  * advance the catalog xmin. When restart_valid has been passed,
191  * restart_lsn can be increased.
192  */
197 
198  /*
199  * This value tracks the last confirmed_flush LSN flushed which is used
200  * during a shutdown checkpoint to decide if logical's slot data should be
201  * forcibly flushed or not.
202  */
204 
205  /* The time since the slot has become inactive */
208 
209 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
210 #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
211 
212 /*
213  * Shared memory control area for all of replication slots.
214  */
216 {
217  /*
218  * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
219  * reason you can't do that in an otherwise-empty struct.
220  */
223 
224 /*
225  * Pointers to shared memory
226  */
229 
230 /* GUCs */
232 extern PGDLLIMPORT char *standby_slot_names;
233 
234 /* shmem initialization functions */
235 extern Size ReplicationSlotsShmemSize(void);
236 extern void ReplicationSlotsShmemInit(void);
237 
238 /* management of individual slots */
239 extern void ReplicationSlotCreate(const char *name, bool db_specific,
240  ReplicationSlotPersistency persistency,
241  bool two_phase, bool failover,
242  bool synced);
243 extern void ReplicationSlotPersist(void);
244 extern void ReplicationSlotDrop(const char *name, bool nowait);
245 extern void ReplicationSlotDropAcquired(void);
246 extern void ReplicationSlotAlter(const char *name, bool failover);
247 
248 extern void ReplicationSlotAcquire(const char *name, bool nowait);
249 extern void ReplicationSlotRelease(void);
250 extern void ReplicationSlotCleanup(bool synced_only);
251 extern void ReplicationSlotSave(void);
252 extern void ReplicationSlotMarkDirty(void);
253 
254 /* misc stuff */
255 extern void ReplicationSlotInitialize(void);
256 extern bool ReplicationSlotValidateName(const char *name, int elevel);
257 extern void ReplicationSlotReserveWal(void);
258 extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
259 extern void ReplicationSlotsComputeRequiredLSN(void);
261 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
262 extern void ReplicationSlotsDropDBSlots(Oid dboid);
264  XLogSegNo oldestSegno,
265  Oid dboid,
266  TransactionId snapshotConflictHorizon);
267 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
268 extern int ReplicationSlotIndex(ReplicationSlot *slot);
269 extern bool ReplicationSlotName(int index, Name name);
270 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
271 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
272 
273 extern void StartupReplicationSlots(void);
274 extern void CheckPointReplicationSlots(bool is_shutdown);
275 
276 extern void CheckSlotRequirements(void);
277 extern void CheckSlotPermissions(void);
279  GetSlotInvalidationCause(const char *invalidation_reason);
280 
281 extern bool SlotExistsInStandbySlotNames(const char *slot_name);
282 extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
283 extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
284 
285 #endif /* SLOT_H */
#define PGDLLIMPORT
Definition: c.h:1316
uint32 TransactionId
Definition: c.h:652
size_t Size
Definition: c.h:605
int64 TimestampTz
Definition: timestamp.h:39
static bool two_phase
unsigned int Oid
Definition: postgres_ext.h:31
int slock_t
Definition: s_lock.h:735
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:464
void ReplicationSlotAlter(const char *name, bool failover)
Definition: slot.c:807
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition: slot.c:497
struct ReplicationSlotCtlData ReplicationSlotCtlData
PGDLLIMPORT ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
void CheckPointReplicationSlots(bool is_shutdown)
Definition: slot.c:1839
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:309
void ReplicationSlotDropAcquired(void)
Definition: slot.c:868
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1010
void ReplicationSlotReserveWal(void)
Definition: slot.c:1401
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:1212
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:540
bool SlotExistsInStandbySlotNames(const char *slot_name)
Definition: slot.c:2549
bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition: slot.c:1783
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *invalidation_reason)
Definition: slot.c:2394
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:1270
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:1154
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1049
void ReplicationSlotPersist(void)
Definition: slot.c:1027
PGDLLIMPORT char * standby_slot_names
Definition: slot.c:148
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:784
struct ReplicationSlotPersistentData ReplicationSlotPersistentData
ReplicationSlotPersistency
Definition: slot.h:34
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
void ReplicationSlotSave(void)
Definition: slot.c:992
PGDLLIMPORT const char *const SlotInvalidationCauses[]
Definition: slot.c:105
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1267
void CheckSlotPermissions(void)
Definition: slot.c:1384
bool ReplicationSlotName(int index, Name name)
Definition: slot.c:513
void ReplicationSlotsShmemInit(void)
Definition: slot.c:189
void ReplicationSlotRelease(void)
Definition: slot.c:652
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition: slot.c:2742
PGDLLIMPORT ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:135
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2582
ReplicationSlotInvalidationCause
Definition: slot.h:48
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:51
@ RS_INVAL_HORIZON
Definition: slot.h:53
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:55
@ RS_INVAL_NONE
Definition: slot.h:49
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1105
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:745
void ReplicationSlotInitialize(void)
Definition: slot.c:224
PGDLLIMPORT int max_replication_slots
Definition: slot.c:141
struct ReplicationSlot ReplicationSlot
void StartupReplicationSlots(void)
Definition: slot.c:1898
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void CheckSlotRequirements(void)
Definition: slot.c:1362
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:171
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:252
Definition: lwlock.h:42
ReplicationSlot replication_slots[1]
Definition: slot.h:221
TransactionId xmin
Definition: slot.h:82
TransactionId catalog_xmin
Definition: slot.h:90
XLogRecPtr restart_lsn
Definition: slot.h:93
XLogRecPtr confirmed_flush
Definition: slot.h:104
ReplicationSlotPersistency persistency
Definition: slot.h:74
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:96
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:194
TransactionId effective_catalog_xmin
Definition: slot.h:175
slock_t mutex
Definition: slot.h:151
XLogRecPtr candidate_restart_valid
Definition: slot.h:195
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:203
pid_t active_pid
Definition: slot.h:157
bool in_use
Definition: slot.h:154
TransactionId effective_xmin
Definition: slot.h:174
bool just_dirtied
Definition: slot.h:160
XLogRecPtr candidate_restart_lsn
Definition: slot.h:196
LWLock io_in_progress_lock
Definition: slot.h:181
ConditionVariable active_cv
Definition: slot.h:184
TransactionId candidate_catalog_xmin
Definition: slot.h:193
bool dirty
Definition: slot.h:161
ReplicationSlotPersistentData data
Definition: slot.h:178
TimestampTz inactive_since
Definition: slot.h:206
Definition: type.h:95
Definition: c.h:741
const char * name
static WalReceiverConn * wrconn
Definition: walreceiver.c:92
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint64 XLogSegNo
Definition: xlogdefs.h:48