PostgreSQL Source Code  git master
slot.h
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * slot.h
3  * Replication slot management.
4  *
5  * Copyright (c) 2012-2022, PostgreSQL Global Development Group
6  *
7  *-------------------------------------------------------------------------
8  */
9 #ifndef SLOT_H
10 #define SLOT_H
11 
12 #include "access/xlog.h"
13 #include "access/xlogreader.h"
15 #include "storage/lwlock.h"
16 #include "storage/shmem.h"
17 #include "storage/spin.h"
19 
20 /*
21  * Behaviour of replication slots, upon release or crash.
22  *
23  * Slots marked as PERSISTENT are crash-safe and will not be dropped when
24  * released. Slots marked as EPHEMERAL will be dropped when released or after
25  * restarts. Slots marked TEMPORARY will be dropped at the end of a session
26  * or on error.
27  *
28  * EPHEMERAL is used as a not-quite-ready state when creating persistent
29  * slots. EPHEMERAL slots can be made PERSISTENT by calling
30  * ReplicationSlotPersist(). For a slot that goes away at the end of a
31  * session, TEMPORARY is the appropriate choice.
32  */
34 {
39 
40 /*
41  * On-Disk data of a replication slot, preserved across restarts.
42  */
44 {
45  /* The slot's identifier */
47 
48  /* database the slot is active on */
50 
51  /*
52  * The slot's behaviour when being dropped (or restored after a crash).
53  */
55 
56  /*
57  * xmin horizon for data
58  *
59  * NB: This may represent a value that hasn't been written to disk yet;
60  * see notes for effective_xmin, below.
61  */
63 
64  /*
65  * xmin horizon for catalog tuples
66  *
67  * NB: This may represent a value that hasn't been written to disk yet;
68  * see notes for effective_xmin, below.
69  */
71 
72  /* oldest LSN that might be required by this replication slot */
74 
75  /* restart_lsn is copied here when the slot is invalidated */
77 
78  /*
79  * Oldest LSN that the client has acked receipt for. This is used as the
80  * start_lsn point in case the client doesn't specify one, and also as a
81  * safety measure to jump forwards in case the client specifies a
82  * start_lsn that's further in the past than this value.
83  */
85 
86  /*
87  * LSN at which we enabled two_phase commit for this slot or LSN at which
88  * we found a consistent point at the time of slot creation.
89  */
91 
92  /*
93  * Allow decoding of prepared transactions?
94  */
95  bool two_phase;
96 
97  /* plugin name */
100 
101 /*
102  * Shared memory state of a single replication slot.
103  *
104  * The in-memory data of replication slots follows a locking model based
105  * on two linked concepts:
106  * - A replication slot's in_use flag is switched when added or discarded using
107  * the LWLock ReplicationSlotControlLock, which needs to be hold in exclusive
108  * mode when updating the flag by the backend owning the slot and doing the
109  * operation, while readers (concurrent backends not owning the slot) need
110  * to hold it in shared mode when looking at replication slot data.
111  * - Individual fields are protected by mutex where only the backend owning
112  * the slot is authorized to update the fields from its own slot. The
113  * backend owning the slot does not need to take this lock when reading its
114  * own fields, while concurrent backends not owning this slot should take the
115  * lock when reading this slot's data.
116  */
117 typedef struct ReplicationSlot
118 {
119  /* lock, on same cacheline as effective_xmin */
121 
122  /* is this slot defined */
123  bool in_use;
124 
125  /* Who is streaming out changes for this slot? 0 in unused slots. */
126  pid_t active_pid;
127 
128  /* any outstanding modifications? */
130  bool dirty;
131 
132  /*
133  * For logical decoding, it's extremely important that we never remove any
134  * data that's still needed for decoding purposes, even after a crash;
135  * otherwise, decoding will produce wrong answers. Ordinary streaming
136  * replication also needs to prevent old row versions from being removed
137  * too soon, but the worst consequence we might encounter there is
138  * unwanted query cancellations on the standby. Thus, for logical
139  * decoding, this value represents the latest xmin that has actually been
140  * written to disk, whereas for streaming replication, it's just the same
141  * as the persistent value (data.xmin).
142  */
145 
146  /* data surviving shutdowns and crashes */
148 
149  /* is somebody performing io on this slot? */
151 
152  /* Condition variable signaled when active_pid changes */
154 
155  /* all the remaining data is only used for logical slots */
156 
157  /*
158  * When the client has confirmed flushes >= candidate_xmin_lsn we can
159  * advance the catalog xmin. When restart_valid has been passed,
160  * restart_lsn can be increased.
161  */
167 
168 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
169 #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
170 
171 /*
172  * Shared memory control area for all of replication slots.
173  */
175 {
176  /*
177  * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
178  * reason you can't do that in an otherwise-empty struct.
179  */
182 
183 /*
184  * Pointers to shared memory
185  */
188 
189 /* GUCs */
191 
192 /* shmem initialization functions */
193 extern Size ReplicationSlotsShmemSize(void);
194 extern void ReplicationSlotsShmemInit(void);
195 
196 /* management of individual slots */
197 extern void ReplicationSlotCreate(const char *name, bool db_specific,
199 extern void ReplicationSlotPersist(void);
200 extern void ReplicationSlotDrop(const char *name, bool nowait);
201 
202 extern void ReplicationSlotAcquire(const char *name, bool nowait);
203 extern void ReplicationSlotRelease(void);
204 extern void ReplicationSlotCleanup(void);
205 extern void ReplicationSlotSave(void);
206 extern void ReplicationSlotMarkDirty(void);
207 
208 /* misc stuff */
209 extern void ReplicationSlotInitialize(void);
210 extern bool ReplicationSlotValidateName(const char *name, int elevel);
211 extern void ReplicationSlotReserveWal(void);
212 extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
213 extern void ReplicationSlotsComputeRequiredLSN(void);
215 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
216 extern void ReplicationSlotsDropDBSlots(Oid dboid);
217 extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
218 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
219 extern int ReplicationSlotIndex(ReplicationSlot *slot);
220 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
221 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
222 
223 extern void StartupReplicationSlots(void);
224 extern void CheckPointReplicationSlots(void);
225 
226 extern void CheckSlotRequirements(void);
227 extern void CheckSlotPermissions(void);
228 
229 #endif /* SLOT_H */
#define PGDLLIMPORT
Definition: c.h:1342
uint32 TransactionId
Definition: c.h:598
size_t Size
Definition: c.h:551
const char * name
Definition: encode.c:561
static bool two_phase
unsigned int Oid
Definition: postgres_ext.h:31
int slock_t
Definition: s_lock.h:975
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:377
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition: slot.c:410
struct ReplicationSlotCtlData ReplicationSlotCtlData
PGDLLIMPORT ReplicationSlot * MyReplicationSlot
Definition: slot.c:97
void ReplicationSlotCleanup(void)
Definition: slot.c:578
void ReplicationSlotMarkDirty(void)
Definition: slot.c:798
void ReplicationSlotReserveWal(void)
Definition: slot.c:1151
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:982
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:425
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:1038
bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
Definition: slot.c:1393
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:930
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:837
void ReplicationSlotPersist(void)
Definition: slot.c:815
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:625
struct ReplicationSlotPersistentData ReplicationSlotPersistentData
ReplicationSlotPersistency
Definition: slot.h:34
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
void ReplicationSlotSave(void)
Definition: slot.c:780
void CheckSlotPermissions(void)
Definition: slot.c:1136
void ReplicationSlotsShmemInit(void)
Definition: slot.c:134
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency p, bool two_phase)
Definition: slot.c:255
void ReplicationSlotRelease(void)
Definition: slot.c:522
void CheckPointReplicationSlots(void)
Definition: slot.c:1442
PGDLLIMPORT ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:94
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:887
void ReplicationSlotInitialize(void)
Definition: slot.c:169
PGDLLIMPORT int max_replication_slots
Definition: slot.c:100
struct ReplicationSlot ReplicationSlot
void StartupReplicationSlots(void)
Definition: slot.c:1477
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void CheckSlotRequirements(void)
Definition: slot.c:1114
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot)
Definition: tablesync.c:1155
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:116
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:201
Definition: lwlock.h:32
ReplicationSlot replication_slots[1]
Definition: slot.h:180
TransactionId xmin
Definition: slot.h:62
XLogRecPtr two_phase_at
Definition: slot.h:90
TransactionId catalog_xmin
Definition: slot.h:70
XLogRecPtr restart_lsn
Definition: slot.h:73
XLogRecPtr confirmed_flush
Definition: slot.h:84
XLogRecPtr invalidated_at
Definition: slot.h:76
ReplicationSlotPersistency persistency
Definition: slot.h:54
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:163
TransactionId effective_catalog_xmin
Definition: slot.h:144
slock_t mutex
Definition: slot.h:120
XLogRecPtr candidate_restart_valid
Definition: slot.h:164
pid_t active_pid
Definition: slot.h:126
bool in_use
Definition: slot.h:123
TransactionId effective_xmin
Definition: slot.h:143
bool just_dirtied
Definition: slot.h:129
XLogRecPtr candidate_restart_lsn
Definition: slot.h:165
LWLock io_in_progress_lock
Definition: slot.h:150
ConditionVariable active_cv
Definition: slot.h:153
TransactionId candidate_catalog_xmin
Definition: slot.h:162
bool dirty
Definition: slot.h:130
ReplicationSlotPersistentData data
Definition: slot.h:147
Definition: c.h:687
static WalReceiverConn * wrconn
Definition: walreceiver.c:95
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint64 XLogSegNo
Definition: xlogdefs.h:48