PostgreSQL Source Code  git master
slot.h
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * slot.h
3  * Replication slot management.
4  *
5  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
6  *
7  *-------------------------------------------------------------------------
8  */
9 #ifndef SLOT_H
10 #define SLOT_H
11 
12 #include "access/xlog.h"
13 #include "access/xlogreader.h"
15 #include "storage/lwlock.h"
16 #include "storage/shmem.h"
17 #include "storage/spin.h"
18 
19 /*
20  * Behaviour of replication slots, upon release or crash.
21  *
22  * Slots marked as PERSISTENT are crash-safe and will not be dropped when
23  * released. Slots marked as EPHEMERAL will be dropped when released or after
24  * restarts. Slots marked TEMPORARY will be dropped at the end of a session
25  * or on error.
26  *
27  * EPHEMERAL is used as a not-quite-ready state when creating persistent
28  * slots. EPHEMERAL slots can be made PERSISTENT by calling
29  * ReplicationSlotPersist(). For a slot that goes away at the end of a
30  * session, TEMPORARY is the appropriate choice.
31  */
33 {
38 
39 /* For ReplicationSlotAcquire, q.v. */
40 typedef enum SlotAcquireBehavior
41 {
46 
47 /*
48  * On-Disk data of a replication slot, preserved across restarts.
49  */
51 {
52  /* The slot's identifier */
54 
55  /* database the slot is active on */
57 
58  /*
59  * The slot's behaviour when being dropped (or restored after a crash).
60  */
62 
63  /*
64  * xmin horizon for data
65  *
66  * NB: This may represent a value that hasn't been written to disk yet;
67  * see notes for effective_xmin, below.
68  */
70 
71  /*
72  * xmin horizon for catalog tuples
73  *
74  * NB: This may represent a value that hasn't been written to disk yet;
75  * see notes for effective_xmin, below.
76  */
78 
79  /* oldest LSN that might be required by this replication slot */
81 
82  /* restart_lsn is copied here when the slot is invalidated */
84 
85  /*
86  * Oldest LSN that the client has acked receipt for. This is used as the
87  * start_lsn point in case the client doesn't specify one, and also as a
88  * safety measure to jump forwards in case the client specifies a
89  * start_lsn that's further in the past than this value.
90  */
92 
93  /* plugin name */
96 
97 /*
98  * Shared memory state of a single replication slot.
99  *
100  * The in-memory data of replication slots follows a locking model based
101  * on two linked concepts:
102  * - A replication slot's in_use flag is switched when added or discarded using
103  * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive
104  * mode when updating the flag by the backend owning the slot and doing the
105  * operation, while readers (concurrent backends not owning the slot) need
106  * to hold it in shared mode when looking at replication slot data.
107  * - Individual fields are protected by mutex where only the backend owning
108  * the slot is authorized to update the fields from its own slot. The
109  * backend owning the slot does not need to take this lock when reading its
110  * own fields, while concurrent backends not owning this slot should take the
111  * lock when reading this slot's data.
112  */
113 typedef struct ReplicationSlot
114 {
115  /* lock, on same cacheline as effective_xmin */
117 
118  /* is this slot defined */
119  bool in_use;
120 
121  /* Who is streaming out changes for this slot? 0 in unused slots. */
122  pid_t active_pid;
123 
124  /* any outstanding modifications? */
126  bool dirty;
127 
128  /*
129  * For logical decoding, it's extremely important that we never remove any
130  * data that's still needed for decoding purposes, even after a crash;
131  * otherwise, decoding will produce wrong answers. Ordinary streaming
132  * replication also needs to prevent old row versions from being removed
133  * too soon, but the worst consequence we might encounter there is
134  * unwanted query cancellations on the standby. Thus, for logical
135  * decoding, this value represents the latest xmin that has actually been
136  * written to disk, whereas for streaming replication, it's just the same
137  * as the persistent value (data.xmin).
138  */
141 
142  /* data surviving shutdowns and crashes */
144 
145  /* is somebody performing io on this slot? */
147 
148  /* Condition variable signaled when active_pid changes */
150 
151  /* all the remaining data is only used for logical slots */
152 
153  /*
154  * When the client has confirmed flushes >= candidate_xmin_lsn we can
155  * advance the catalog xmin. When candidate_restart_valid has been passed,
156  * restart_lsn can be increased.
157  */
163 
164 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
165 #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
166 
167 /*
168  * Shared memory control area for all replication slots.
169  */
171 {
172  /*
173  * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
174  * reason you can't do that in an otherwise-empty struct.
175  */
176  ReplicationSlot replication_slots[1];
178 
179 /*
180  * Pointers to shared memory
181  */
184 
185 /* GUCs */
187 
188 /* shmem initialization functions */
189 extern Size ReplicationSlotsShmemSize(void);
190 extern void ReplicationSlotsShmemInit(void);
191 
192 /* management of individual slots */
193 extern void ReplicationSlotCreate(const char *name, bool db_specific,
195 extern void ReplicationSlotPersist(void);
196 extern void ReplicationSlotDrop(const char *name, bool nowait);
197 
198 extern int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior);
199 extern void ReplicationSlotRelease(void);
200 extern void ReplicationSlotCleanup(void);
201 extern void ReplicationSlotSave(void);
202 extern void ReplicationSlotMarkDirty(void);
203 
204 /* misc stuff */
205 extern bool ReplicationSlotValidateName(const char *name, int elevel);
206 extern void ReplicationSlotReserveWal(void);
207 extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
208 extern void ReplicationSlotsComputeRequiredLSN(void);
210 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
211 extern void ReplicationSlotsDropDBSlots(Oid dboid);
212 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
213 
214 extern void StartupReplicationSlots(void);
215 extern void CheckPointReplicationSlots(void);
216 
217 extern void CheckSlotRequirements(void);
218 
219 #endif /* SLOT_H */
void ReplicationSlotRelease(void)
Definition: slot.c:476
int slock_t
Definition: s_lock.h:934
Definition: lwlock.h:31
TransactionId candidate_catalog_xmin
Definition: slot.h:158
void ReplicationSlotReserveWal(void)
Definition: slot.c:1057
void CheckPointReplicationSlots(void)
Definition: slot.c:1250
uint32 TransactionId
Definition: c.h:520
ReplicationSlotPersistency persistency
Definition: slot.h:61
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency p)
Definition: slot.c:222
ReplicationSlotPersistentData data
Definition: slot.h:143
unsigned int Oid
Definition: postgres_ext.h:31
#define PGDLLIMPORT
Definition: c.h:1257
XLogRecPtr confirmed_flush
Definition: slot.h:91
void ReplicationSlotsShmemInit(void)
Definition: slot.c:135
void CheckSlotRequirements(void)
Definition: slot.c:1032
void ReplicationSlotCleanup(void)
Definition: slot.c:532
PGDLLIMPORT int max_replication_slots
Definition: slot.c:99
void ReplicationSlotPersist(void)
Definition: slot.c:733
Definition: slot.h:43
TransactionId effective_xmin
Definition: slot.h:139
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:755
Definition: c.h:616
struct ReplicationSlot ReplicationSlot
XLogRecPtr candidate_restart_valid
Definition: slot.h:160
uint64 XLogSegNo
Definition: xlogdefs.h:41
SlotAcquireBehavior
Definition: slot.h:40
TransactionId catalog_xmin
Definition: slot.h:77
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:805
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:570
struct ReplicationSlotPersistentData ReplicationSlotPersistentData
TransactionId xmin
Definition: slot.h:69
bool in_use
Definition: slot.h:119
static int elevel
Definition: vacuumlazy.c:333
void StartupReplicationSlots(void)
Definition: slot.c:1285
bool just_dirtied
Definition: slot.h:125
TransactionId effective_catalog_xmin
Definition: slot.h:140
void ReplicationSlotMarkDirty(void)
Definition: slot.c:716
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr restart_lsn
Definition: slot.h:80
size_t Size
Definition: c.h:473
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:117
ConditionVariable active_cv
Definition: slot.h:149
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:367
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:159
struct ReplicationSlotCtlData ReplicationSlotCtlData
PGDLLIMPORT ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
PGDLLIMPORT ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
ReplicationSlotPersistency
Definition: slot.h:32
pid_t active_pid
Definition: slot.h:122
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:175
Definition: slot.h:42
void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
Definition: slot.c:1134
XLogRecPtr invalidated_at
Definition: slot.h:83
slock_t mutex
Definition: slot.h:116
void ReplicationSlotSave(void)
Definition: slot.c:698
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:900
bool dirty
Definition: slot.h:126
XLogRecPtr candidate_restart_lsn
Definition: slot.h:161
LWLock io_in_progress_lock
Definition: slot.h:146
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:956
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:848