PostgreSQL Source Code  git master
slot.h
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * slot.h
3  * Replication slot management.
4  *
5  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
6  *
7  *-------------------------------------------------------------------------
8  */
9 #ifndef SLOT_H
10 #define SLOT_H
11 
12 #include "access/xlog.h"
13 #include "access/xlogreader.h"
15 #include "storage/lwlock.h"
16 #include "storage/shmem.h"
17 #include "storage/spin.h"
19 
20 /*
21  * Behaviour of replication slots, upon release or crash.
22  *
23  * Slots marked as PERSISTENT are crash-safe and will not be dropped when
24  * released. Slots marked as EPHEMERAL will be dropped when released or after
25  * restarts. Slots marked TEMPORARY will be dropped at the end of a session
26  * or on error.
27  *
28  * EPHEMERAL is used as a not-quite-ready state when creating persistent
29  * slots. EPHEMERAL slots can be made PERSISTENT by calling
30  * ReplicationSlotPersist(). For a slot that goes away at the end of a
31  * session, TEMPORARY is the appropriate choice.
32  */
34 {
39 
40 /*
41  * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
42  * 'invalidated' field is set to a value other than _NONE.
43  */
45 {
47  /* required WAL has been removed */
49  /* required rows have been removed */
51  /* wal_level insufficient for slot */
54 
55 /*
56  * On-Disk data of a replication slot, preserved across restarts.
57  */
59 {
60  /* The slot's identifier */
62 
63  /* database the slot is active on */
65 
66  /*
67  * The slot's behaviour when being dropped (or restored after a crash).
68  */
70 
71  /*
72  * xmin horizon for data
73  *
74  * NB: This may represent a value that hasn't been written to disk yet;
75  * see notes for effective_xmin, below.
76  */
78 
79  /*
80  * xmin horizon for catalog tuples
81  *
82  * NB: This may represent a value that hasn't been written to disk yet;
83  * see notes for effective_xmin, below.
84  */
86 
87  /* oldest LSN that might be required by this replication slot */
89 
90  /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
92 
93  /*
94  * Oldest LSN that the client has acked receipt for. This is used as the
95  * start_lsn point in case the client doesn't specify one, and also as a
96  * safety measure to jump forwards in case the client specifies a
97  * start_lsn that's further in the past than this value.
98  */
100 
101  /*
102  * LSN at which we enabled two_phase commit for this slot or LSN at which
103  * we found a consistent point at the time of slot creation.
104  */
106 
107  /*
108  * Allow decoding of prepared transactions?
109  */
110  bool two_phase;
111 
112  /* plugin name */
115 
116 /*
117  * Shared memory state of a single replication slot.
118  *
119  * The in-memory data of replication slots follows a locking model based
120  * on two linked concepts:
121  * - A replication slot's in_use flag is switched when added or discarded using
122  * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive
123  * mode when updating the flag by the backend owning the slot and doing the
124  * operation, while readers (concurrent backends not owning the slot) need
125  * to hold it in shared mode when looking at replication slot data.
126  * - Individual fields are protected by mutex where only the backend owning
127  * the slot is authorized to update the fields from its own slot. The
128  * backend owning the slot does not need to take this lock when reading its
129  * own fields, while concurrent backends not owning this slot should take the
130  * lock when reading this slot's data.
131  */
132 typedef struct ReplicationSlot
133 {
134  /* lock, on same cacheline as effective_xmin */
136 
137  /* is this slot defined */
138  bool in_use;
139 
140  /* Who is streaming out changes for this slot? 0 in unused slots. */
141  pid_t active_pid;
142 
143  /* any outstanding modifications? */
145  bool dirty;
146 
147  /*
148  * For logical decoding, it's extremely important that we never remove any
149  * data that's still needed for decoding purposes, even after a crash;
150  * otherwise, decoding will produce wrong answers. Ordinary streaming
151  * replication also needs to prevent old row versions from being removed
152  * too soon, but the worst consequence we might encounter there is
153  * unwanted query cancellations on the standby. Thus, for logical
154  * decoding, this value represents the latest xmin that has actually been
155  * written to disk, whereas for streaming replication, it's just the same
156  * as the persistent value (data.xmin).
157  */
160 
161  /* data surviving shutdowns and crashes */
163 
164  /* is somebody performing io on this slot? */
166 
167  /* Condition variable signaled when active_pid changes */
169 
170  /* all the remaining data is only used for logical slots */
171 
172  /*
173  * When the client has confirmed flushes >= candidate_xmin_lsn we can
174  * advance the catalog xmin. When restart_valid has been passed,
175  * restart_lsn can be increased.
176  */
181 
182  /*
183  * This value tracks the last confirmed_flush LSN flushed, which is used
184  * during a shutdown checkpoint to decide if the logical slot's data should
185  * be forcibly flushed or not.
186  */
189 
190 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
191 #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
192 
193 /*
194  * Shared memory control area for all replication slots.
195  */
197 {
198  /*
199  * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
200  * reason you can't do that in an otherwise-empty struct.
201  */
204 
205 /*
206  * Pointers to shared memory
207  */
210 
211 /* GUCs */
213 
214 /* shmem initialization functions */
215 extern Size ReplicationSlotsShmemSize(void);
216 extern void ReplicationSlotsShmemInit(void);
217 
218 /* management of individual slots */
219 extern void ReplicationSlotCreate(const char *name, bool db_specific,
220  ReplicationSlotPersistency persistency,
221  bool two_phase);
222 extern void ReplicationSlotPersist(void);
223 extern void ReplicationSlotDrop(const char *name, bool nowait);
224 
225 extern void ReplicationSlotAcquire(const char *name, bool nowait);
226 extern void ReplicationSlotRelease(void);
227 extern void ReplicationSlotCleanup(void);
228 extern void ReplicationSlotSave(void);
229 extern void ReplicationSlotMarkDirty(void);
230 
231 /* misc stuff */
232 extern void ReplicationSlotInitialize(void);
233 extern bool ReplicationSlotValidateName(const char *name, int elevel);
234 extern void ReplicationSlotReserveWal(void);
235 extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
236 extern void ReplicationSlotsComputeRequiredLSN(void);
238 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
239 extern void ReplicationSlotsDropDBSlots(Oid dboid);
241  XLogSegNo oldestSegno,
242  Oid dboid,
243  TransactionId snapshotConflictHorizon);
244 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
245 extern int ReplicationSlotIndex(ReplicationSlot *slot);
246 extern bool ReplicationSlotName(int index, Name name);
247 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
248 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
249 
250 extern void StartupReplicationSlots(void);
251 extern void CheckPointReplicationSlots(bool is_shutdown);
252 
253 extern void CheckSlotRequirements(void);
254 extern void CheckSlotPermissions(void);
255 
256 #endif /* SLOT_H */
#define PGDLLIMPORT
Definition: c.h:1326
uint32 TransactionId
Definition: c.h:641
size_t Size
Definition: c.h:594
static bool two_phase
unsigned int Oid
Definition: postgres_ext.h:31
int slock_t
Definition: s_lock.h:754
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:376
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition: slot.c:409
struct ReplicationSlotCtlData ReplicationSlotCtlData
PGDLLIMPORT ReplicationSlot * MyReplicationSlot
Definition: slot.c:99
void CheckPointReplicationSlots(bool is_shutdown)
Definition: slot.c:1620
void ReplicationSlotCleanup(void)
Definition: slot.c:635
void ReplicationSlotMarkDirty(void)
Definition: slot.c:828
void ReplicationSlotReserveWal(void)
Definition: slot.c:1205
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:1030
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:452
bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition: slot.c:1564
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:1088
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:972
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:867
void ReplicationSlotPersist(void)
Definition: slot.c:845
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:673
struct ReplicationSlotPersistentData ReplicationSlotPersistentData
ReplicationSlotPersistency
Definition: slot.h:34
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
void ReplicationSlotSave(void)
Definition: slot.c:810
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1250
void CheckSlotPermissions(void)
Definition: slot.c:1188
bool ReplicationSlotName(int index, Name name)
Definition: slot.c:425
void ReplicationSlotsShmemInit(void)
Definition: slot.c:136
void ReplicationSlotRelease(void)
Definition: slot.c:559
PGDLLIMPORT ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:96
ReplicationSlotInvalidationCause
Definition: slot.h:45
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:48
@ RS_INVAL_HORIZON
Definition: slot.h:50
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:52
@ RS_INVAL_NONE
Definition: slot.h:46
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:923
void ReplicationSlotInitialize(void)
Definition: slot.c:171
PGDLLIMPORT int max_replication_slots
Definition: slot.c:102
struct ReplicationSlot ReplicationSlot
void StartupReplicationSlots(void)
Definition: slot.c:1679
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:253
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void CheckSlotRequirements(void)
Definition: slot.c:1166
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:118
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:199
Definition: lwlock.h:41
ReplicationSlot replication_slots[1]
Definition: slot.h:202
TransactionId xmin
Definition: slot.h:77
TransactionId catalog_xmin
Definition: slot.h:85
XLogRecPtr restart_lsn
Definition: slot.h:88
XLogRecPtr confirmed_flush
Definition: slot.h:99
ReplicationSlotPersistency persistency
Definition: slot.h:69
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:91
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:178
TransactionId effective_catalog_xmin
Definition: slot.h:159
slock_t mutex
Definition: slot.h:135
XLogRecPtr candidate_restart_valid
Definition: slot.h:179
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:187
pid_t active_pid
Definition: slot.h:141
bool in_use
Definition: slot.h:138
TransactionId effective_xmin
Definition: slot.h:158
bool just_dirtied
Definition: slot.h:144
XLogRecPtr candidate_restart_lsn
Definition: slot.h:180
LWLock io_in_progress_lock
Definition: slot.h:165
ConditionVariable active_cv
Definition: slot.h:168
TransactionId candidate_catalog_xmin
Definition: slot.h:177
bool dirty
Definition: slot.h:145
ReplicationSlotPersistentData data
Definition: slot.h:162
Definition: type.h:95
Definition: c.h:730
const char * name
static WalReceiverConn * wrconn
Definition: walreceiver.c:95
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint64 XLogSegNo
Definition: xlogdefs.h:48