PostgreSQL Source Code  git master
slot.h
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * slot.h
3  * Replication slot management.
4  *
5  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
6  *
7  *-------------------------------------------------------------------------
8  */
9 #ifndef SLOT_H
10 #define SLOT_H
11 
12 #include "access/xlog.h"
13 #include "access/xlogreader.h"
15 #include "storage/lwlock.h"
16 #include "storage/shmem.h"
17 #include "storage/spin.h"
19 
20 /* directory to store replication slot data in */
21 #define PG_REPLSLOT_DIR "pg_replslot"
22 
23 /*
24  * Behaviour of replication slots, upon release or crash.
25  *
26  * Slots marked as PERSISTENT are crash-safe and will not be dropped when
27  * released. Slots marked as EPHEMERAL will be dropped when released or after
28  * restarts. Slots marked TEMPORARY will be dropped at the end of a session
29  * or on error.
30  *
31  * EPHEMERAL is used as a not-quite-ready state when creating persistent
32  * slots. EPHEMERAL slots can be made PERSISTENT by calling
33  * ReplicationSlotPersist(). For a slot that goes away at the end of a
34  * session, TEMPORARY is the appropriate choice.
35  */
37 {
42 
43 /*
44  * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
45  * 'invalidated' field is set to a value other than _NONE.
46  *
47  * When adding a new invalidation cause here, remember to update
48  * SlotInvalidationCauses and RS_INVAL_MAX_CAUSES.
49  */
51 {
53  /* required WAL has been removed */
55  /* required rows have been removed */
57  /* wal_level insufficient for slot */
60 
61 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
62 
63 /*
64  * On-Disk data of a replication slot, preserved across restarts.
65  */
67 {
68  /* The slot's identifier */
70 
71  /* database the slot is active on */
73 
74  /*
75  * The slot's behaviour when being dropped (or restored after a crash).
76  */
78 
79  /*
80  * xmin horizon for data
81  *
82  * NB: This may represent a value that hasn't been written to disk yet;
83  * see notes for effective_xmin, below.
84  */
86 
87  /*
88  * xmin horizon for catalog tuples
89  *
90  * NB: This may represent a value that hasn't been written to disk yet;
91  * see notes for effective_xmin, below.
92  */
94 
95  /* oldest LSN that might be required by this replication slot */
97 
98  /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
100 
101  /*
102  * Oldest LSN that the client has acked receipt for. This is used as the
103  * start_lsn point in case the client doesn't specify one, and also as a
104  * safety measure to jump forwards in case the client specifies a
105  * start_lsn that's further in the past than this value.
106  */
108 
109  /*
110  * LSN at which we enabled two_phase commit for this slot or LSN at which
111  * we found a consistent point at the time of slot creation.
112  */
114 
115  /*
116  * Allow decoding of prepared transactions?
117  */
118  bool two_phase;
119 
120  /* plugin name */
122 
123  /*
124  * Was this slot synchronized from the primary server?
125  */
126  char synced;
127 
128  /*
129  * Is this a failover slot (sync candidate for standbys)? Only relevant
130  * for logical slots on the primary server.
131  */
132  bool failover;
134 
135 /*
136  * Shared memory state of a single replication slot.
137  *
138  * The in-memory data of replication slots follows a locking model based
139  * on two linked concepts:
140  * - A replication slot's in_use flag is switched when added or discarded using
141  * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive
142  * mode when updating the flag by the backend owning the slot and doing the
143  * operation, while readers (concurrent backends not owning the slot) need
144  * to hold it in shared mode when looking at replication slot data.
145  * - Individual fields are protected by mutex where only the backend owning
146  * the slot is authorized to update the fields from its own slot. The
147  * backend owning the slot does not need to take this lock when reading its
148  * own fields, while concurrent backends not owning this slot should take the
149  * lock when reading this slot's data.
150  */
151 typedef struct ReplicationSlot
152 {
153  /* lock, on same cacheline as effective_xmin */
154  slock_t mutex;
155 
156  /* is this slot defined */
157  bool in_use;
158 
159  /* Who is streaming out changes for this slot? 0 in unused slots. */
160  pid_t active_pid;
161 
162  /* any outstanding modifications? */
164  bool dirty;
165 
166  /*
167  * For logical decoding, it's extremely important that we never remove any
168  * data that's still needed for decoding purposes, even after a crash;
169  * otherwise, decoding will produce wrong answers. Ordinary streaming
170  * replication also needs to prevent old row versions from being removed
171  * too soon, but the worst consequence we might encounter there is
172  * unwanted query cancellations on the standby. Thus, for logical
173  * decoding, this value represents the latest xmin that has actually been
174  * written to disk, whereas for streaming replication, it's just the same
175  * as the persistent value (data.xmin).
176  */
179 
180  /* data surviving shutdowns and crashes */
182 
183  /* is somebody performing io on this slot? */
185 
186  /* Condition variable signaled when active_pid changes */
188 
189  /* all the remaining data is only used for logical slots */
190 
191  /*
192  * When the client has confirmed flushes >= candidate_xmin_lsn we can
193  * advance the catalog xmin. When restart_valid has been passed,
194  * restart_lsn can be increased.
195  */
200 
201  /*
202  * This value tracks the last confirmed_flush LSN that was flushed; it is
203  * used during a shutdown checkpoint to decide whether a logical slot's
204  * data should be forcibly flushed.
205  */
207 
208  /* The time at which the slot became inactive */
211 
/*
 * Classify a slot by the database field of its persistent data: a slot whose
 * data.database is InvalidOid is physical, any other value makes it logical.
 * The argument must be a pointer whose target has a data.database member
 * (e.g. a ReplicationSlot *); it is expanded exactly once.
 */
212 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
213 #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
214 
215 /*
216  * Shared memory control area for all replication slots.
217  */
219 {
220  /*
221  * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
222  * reason you can't do that in an otherwise-empty struct.
223  */
226 
227 /*
228  * Pointers to shared memory
229  */
232 
233 /* GUCs */
236 
237 /* shmem initialization functions */
/*
 * Both functions take no arguments; ReplicationSlotsShmemSize() returns a
 * Size.
 * NOTE(review): presumably the size covers ReplicationSlotCtlData for
 * max_replication_slots entries -- confirm in slot.c.
 */
238 extern Size ReplicationSlotsShmemSize(void);
239 extern void ReplicationSlotsShmemInit(void);
240 
241 /* management of individual slots */
/*
 * persistency is a ReplicationSlotPersistency value (RS_PERSISTENT,
 * RS_EPHEMERAL or RS_TEMPORARY).
 * NOTE(review): two_phase, failover and synced presumably seed the
 * same-named fields of ReplicationSlotPersistentData, and db_specific
 * presumably selects a logical (database-bound) slot -- confirm in slot.c.
 */
242 extern void ReplicationSlotCreate(const char *name, bool db_specific,
243  ReplicationSlotPersistency persistency,
244  bool two_phase, bool failover,
245  bool synced);
/* Makes an EPHEMERAL slot PERSISTENT; see the persistency notes above. */
246 extern void ReplicationSlotPersist(void);
247 extern void ReplicationSlotDrop(const char *name, bool nowait);
248 extern void ReplicationSlotDropAcquired(void);
/*
 * failover and two_phase are passed by pointer.
 * NOTE(review): presumably a NULL pointer means "leave this property
 * unchanged" -- confirm in slot.c.
 */
249 extern void ReplicationSlotAlter(const char *name, const bool *failover,
250  const bool *two_phase);
251 
/*
 * Apart from ReplicationSlotAcquire(), which is given a slot name, these
 * take no slot argument.
 * NOTE(review): presumably they operate on the slot currently held by this
 * backend (MyReplicationSlot), and ReplicationSlotMarkDirty() /
 * ReplicationSlotSave() relate to the slot's "dirty" flag -- confirm in
 * slot.c.
 */
252 extern void ReplicationSlotAcquire(const char *name, bool nowait);
253 extern void ReplicationSlotRelease(void);
/* NOTE(review): synced_only presumably limits cleanup to synced slots. */
254 extern void ReplicationSlotCleanup(bool synced_only);
255 extern void ReplicationSlotSave(void);
256 extern void ReplicationSlotMarkDirty(void);
257 
258 /* misc stuff */
259 extern void ReplicationSlotInitialize(void);
260 extern bool ReplicationSlotValidateName(const char *name, int elevel);
261 extern void ReplicationSlotReserveWal(void);
262 extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
263 extern void ReplicationSlotsComputeRequiredLSN(void);
264 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
265 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
266 extern void ReplicationSlotsDropDBSlots(Oid dboid);
/*
 * cause is a ReplicationSlotInvalidationCause value; the slot's
 * 'invalidated' field records the reason a slot was invalidated.
 */
267 extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
268  XLogSegNo oldestSegno,
269  Oid dboid,
270  TransactionId snapshotConflictHorizon);
271 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
272 extern int ReplicationSlotIndex(ReplicationSlot *slot);
273 extern bool ReplicationSlotName(int index, Name name);
274 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
275 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
276 
/*
 * NOTE(review): is_shutdown presumably marks a shutdown checkpoint, the
 * case in which last_saved_confirmed_flush is consulted (see the comment
 * on that field) -- confirm in slot.c.
 */
277 extern void StartupReplicationSlots(void);
278 extern void CheckPointReplicationSlots(bool is_shutdown);
279 
280 extern void CheckSlotRequirements(void);
281 extern void CheckSlotPermissions(void);
/*
 * Returns the ReplicationSlotInvalidationCause for a reason string.
 * NOTE(review): the accepted strings are presumably the entries of
 * SlotInvalidationCauses[] -- confirm in slot.c.
 */
282 extern ReplicationSlotInvalidationCause
283  GetSlotInvalidationCause(const char *invalidation_reason);
284 
/*
 * NOTE(review): these presumably work against the slots named in the
 * synchronized_standby_slots GUC; wait_for_lsn is the WAL position the
 * standbys are compared against or waited for -- confirm in slot.c.
 */
285 extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
286 extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
287 extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
288 
289 #endif /* SLOT_H */
#define PGDLLIMPORT
Definition: c.h:1307
uint32 TransactionId
Definition: c.h:643
size_t Size
Definition: c.h:596
int64 TimestampTz
Definition: timestamp.h:39
static bool two_phase
unsigned int Oid
Definition: postgres_ext.h:31
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:464
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition: slot.c:497
struct ReplicationSlotCtlData ReplicationSlotCtlData
PGDLLIMPORT char * synchronized_standby_slots
Definition: slot.c:148
PGDLLIMPORT ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
void CheckPointReplicationSlots(bool is_shutdown)
Definition: slot.c:1867
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:309
void ReplicationSlotDropAcquired(void)
Definition: slot.c:896
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1038
void ReplicationSlotReserveWal(void)
Definition: slot.c:1429
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:1240
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:540
bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition: slot.c:1811
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *invalidation_reason)
Definition: slot.c:2420
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:1298
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:1182
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1077
void ReplicationSlotPersist(void)
Definition: slot.c:1055
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:784
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2575
struct ReplicationSlotPersistentData ReplicationSlotPersistentData
ReplicationSlotPersistency
Definition: slot.h:37
@ RS_PERSISTENT
Definition: slot.h:38
@ RS_EPHEMERAL
Definition: slot.h:39
@ RS_TEMPORARY
Definition: slot.h:40
void ReplicationSlotSave(void)
Definition: slot.c:1020
PGDLLIMPORT const char *const SlotInvalidationCauses[]
Definition: slot.c:105
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1267
void CheckSlotPermissions(void)
Definition: slot.c:1412
bool ReplicationSlotName(int index, Name name)
Definition: slot.c:513
void ReplicationSlotsShmemInit(void)
Definition: slot.c:189
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition: slot.c:807
void ReplicationSlotRelease(void)
Definition: slot.c:652
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition: slot.c:2769
PGDLLIMPORT ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:135
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2608
ReplicationSlotInvalidationCause
Definition: slot.h:51
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:54
@ RS_INVAL_HORIZON
Definition: slot.h:56
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:58
@ RS_INVAL_NONE
Definition: slot.h:52
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1133
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:745
void ReplicationSlotInitialize(void)
Definition: slot.c:224
PGDLLIMPORT int max_replication_slots
Definition: slot.c:141
struct ReplicationSlot ReplicationSlot
void StartupReplicationSlots(void)
Definition: slot.c:1924
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void CheckSlotRequirements(void)
Definition: slot.c:1390
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:171
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:252
Definition: lwlock.h:42
ReplicationSlot replication_slots[1]
Definition: slot.h:224
TransactionId xmin
Definition: slot.h:85
TransactionId catalog_xmin
Definition: slot.h:93
XLogRecPtr restart_lsn
Definition: slot.h:96
XLogRecPtr confirmed_flush
Definition: slot.h:107
ReplicationSlotPersistency persistency
Definition: slot.h:77
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:99
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:197
TransactionId effective_catalog_xmin
Definition: slot.h:178
slock_t mutex
Definition: slot.h:154
XLogRecPtr candidate_restart_valid
Definition: slot.h:198
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:206
pid_t active_pid
Definition: slot.h:160
bool in_use
Definition: slot.h:157
TransactionId effective_xmin
Definition: slot.h:177
bool just_dirtied
Definition: slot.h:163
XLogRecPtr candidate_restart_lsn
Definition: slot.h:199
LWLock io_in_progress_lock
Definition: slot.h:184
ConditionVariable active_cv
Definition: slot.h:187
TransactionId candidate_catalog_xmin
Definition: slot.h:196
bool dirty
Definition: slot.h:164
ReplicationSlotPersistentData data
Definition: slot.h:181
TimestampTz inactive_since
Definition: slot.h:209
Definition: type.h:95
Definition: c.h:732
const char * name
static WalReceiverConn * wrconn
Definition: walreceiver.c:92
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint64 XLogSegNo
Definition: xlogdefs.h:48