PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
slotfuncs.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * slotfuncs.c
4  * Support functions for replication slots
5  *
6  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/slotfuncs.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #include "postgres.h"
15 
16 #include "funcapi.h"
17 #include "miscadmin.h"
18 
19 #include "access/htup_details.h"
20 #include "replication/slot.h"
21 #include "replication/logical.h"
23 #include "utils/builtins.h"
24 #include "utils/pg_lsn.h"
25 
26 static void
28 {
30  ereport(ERROR,
31  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
32  (errmsg("must be superuser or replication role to use replication slots"))));
33 }
34 
35 /*
36  * SQL function for creating a new physical (streaming replication)
37  * replication slot.
38  */
39 Datum
41 {
43  bool immediately_reserve = PG_GETARG_BOOL(1);
44  bool temporary = PG_GETARG_BOOL(2);
45  Datum values[2];
46  bool nulls[2];
47  TupleDesc tupdesc;
48  HeapTuple tuple;
49  Datum result;
50 
52 
53  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
54  elog(ERROR, "return type must be a row type");
55 
57 
59 
60  /* acquire replication slot, this will check for conflicting names */
61  ReplicationSlotCreate(NameStr(*name), false,
62  temporary ? RS_TEMPORARY : RS_PERSISTENT);
63 
64  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
65  nulls[0] = false;
66 
67  if (immediately_reserve)
68  {
69  /* Reserve WAL as the user asked for it */
71 
72  /* Write this slot to disk */
75 
77  nulls[1] = false;
78  }
79  else
80  {
81  nulls[1] = true;
82  }
83 
84  tuple = heap_form_tuple(tupdesc, values, nulls);
85  result = HeapTupleGetDatum(tuple);
86 
88 
89  PG_RETURN_DATUM(result);
90 }
91 
92 
93 /*
94  * SQL function for creating a new logical replication slot.
95  */
96 Datum
98 {
101  bool temporary = PG_GETARG_BOOL(2);
102 
104 
105  TupleDesc tupdesc;
106  HeapTuple tuple;
107  Datum result;
108  Datum values[2];
109  bool nulls[2];
110 
112 
113  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
114  elog(ERROR, "return type must be a row type");
115 
117 
119 
120  /*
121  * Acquire a logical decoding slot, this will check for conflicting names.
122  * Initially create persistent slot as ephemeral - that allows us to nicely
123  * handle errors during initialization because it'll get dropped if this
124  * transaction fails. We'll make it persistent at the end.
125  * Temporary slots can be created as temporary from beginning as they get
126  * dropped on error as well.
127  */
128  ReplicationSlotCreate(NameStr(*name), true,
129  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
130 
131  /*
132  * Create logical decoding context, to build the initial snapshot.
133  */
135  NameStr(*plugin), NIL,
137 
138  /* build initial snapshot, might take a while */
140 
143 
144  /* don't need the decoding context anymore */
145  FreeDecodingContext(ctx);
146 
147  memset(nulls, 0, sizeof(nulls));
148 
149  tuple = heap_form_tuple(tupdesc, values, nulls);
150  result = HeapTupleGetDatum(tuple);
151 
152  /* ok, slot is now fully created, mark it as persistent if needed */
153  if (!temporary)
156 
157  PG_RETURN_DATUM(result);
158 }
159 
160 
161 /*
162  * SQL function for dropping a replication slot.
163  */
164 Datum
166 {
167  Name name = PG_GETARG_NAME(0);
168 
170 
172 
174 
175  PG_RETURN_VOID();
176 }
177 
178 /*
179  * pg_get_replication_slots - SQL SRF showing active replication slots.
180  */
181 Datum
183 {
184 #define PG_GET_REPLICATION_SLOTS_COLS 11
185  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
186  TupleDesc tupdesc;
187  Tuplestorestate *tupstore;
188  MemoryContext per_query_ctx;
189  MemoryContext oldcontext;
190  int slotno;
191 
192  /* check to see if caller supports us returning a tuplestore */
193  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
194  ereport(ERROR,
195  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
196  errmsg("set-valued function called in context that cannot accept a set")));
197  if (!(rsinfo->allowedModes & SFRM_Materialize))
198  ereport(ERROR,
199  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
200  errmsg("materialize mode required, but it is not " \
201  "allowed in this context")));
202 
203  /* Build a tuple descriptor for our result type */
204  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
205  elog(ERROR, "return type must be a row type");
206 
207  /*
208  * We don't require any special permission to see this function's data
209  * because nothing should be sensitive. The most critical being the slot
210  * name, which shouldn't contain anything particularly sensitive.
211  */
212 
213  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
214  oldcontext = MemoryContextSwitchTo(per_query_ctx);
215 
216  tupstore = tuplestore_begin_heap(true, false, work_mem);
217  rsinfo->returnMode = SFRM_Materialize;
218  rsinfo->setResult = tupstore;
219  rsinfo->setDesc = tupdesc;
220 
221  MemoryContextSwitchTo(oldcontext);
222 
223  for (slotno = 0; slotno < max_replication_slots; slotno++)
224  {
227  bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
228 
229  ReplicationSlotPersistency persistency;
230  TransactionId xmin;
231  TransactionId catalog_xmin;
232  XLogRecPtr restart_lsn;
233  XLogRecPtr confirmed_flush_lsn;
234  pid_t active_pid;
235  Oid database;
236  NameData slot_name;
238  int i;
239 
240  SpinLockAcquire(&slot->mutex);
241  if (!slot->in_use)
242  {
243  SpinLockRelease(&slot->mutex);
244  continue;
245  }
246  else
247  {
248  xmin = slot->data.xmin;
249  catalog_xmin = slot->data.catalog_xmin;
250  database = slot->data.database;
251  restart_lsn = slot->data.restart_lsn;
252  confirmed_flush_lsn = slot->data.confirmed_flush;
253  namecpy(&slot_name, &slot->data.name);
254  namecpy(&plugin, &slot->data.plugin);
255 
256  active_pid = slot->active_pid;
257  persistency = slot->data.persistency;
258  }
259  SpinLockRelease(&slot->mutex);
260 
261  memset(nulls, 0, sizeof(nulls));
262 
263  i = 0;
264  values[i++] = NameGetDatum(&slot_name);
265 
266  if (database == InvalidOid)
267  nulls[i++] = true;
268  else
269  values[i++] = NameGetDatum(&plugin);
270 
271  if (database == InvalidOid)
272  values[i++] = CStringGetTextDatum("physical");
273  else
274  values[i++] = CStringGetTextDatum("logical");
275 
276  if (database == InvalidOid)
277  nulls[i++] = true;
278  else
279  values[i++] = database;
280 
281  values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
282  values[i++] = BoolGetDatum(active_pid != 0);
283 
284  if (active_pid != 0)
285  values[i++] = Int32GetDatum(active_pid);
286  else
287  nulls[i++] = true;
288 
289  if (xmin != InvalidTransactionId)
290  values[i++] = TransactionIdGetDatum(xmin);
291  else
292  nulls[i++] = true;
293 
294  if (catalog_xmin != InvalidTransactionId)
295  values[i++] = TransactionIdGetDatum(catalog_xmin);
296  else
297  nulls[i++] = true;
298 
299  if (restart_lsn != InvalidXLogRecPtr)
300  values[i++] = LSNGetDatum(restart_lsn);
301  else
302  nulls[i++] = true;
303 
304  if (confirmed_flush_lsn != InvalidXLogRecPtr)
305  values[i++] = LSNGetDatum(confirmed_flush_lsn);
306  else
307  nulls[i++] = true;
308 
309  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
310  }
311 
312  tuplestore_donestoring(tupstore);
313 
314  return (Datum) 0;
315 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:735
int namecpy(Name n1, Name n2)
Definition: name.c:191
#define NIL
Definition: pg_list.h:69
static const char * plugin
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
void CheckSlotRequirements(void)
Definition: slot.c:805
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:557
#define NameGetDatum(X)
Definition: postgres.h:601
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define PG_GET_REPLICATION_SLOTS_COLS
uint32 TransactionId
Definition: c.h:397
Datum pg_get_replication_slots(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:182
int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: logicalfuncs.c:118
Oid GetUserId(void)
Definition: miscinit.c:283
Datum pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:97
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:220
ReplicationSlotPersistency persistency
Definition: slot.h:49
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
return result
Definition: formatting.c:1618
void ReplicationSlotSave(void)
Definition: slot.c:563
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:239
ReplicationSlotPersistentData data
Definition: slot.h:115
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr confirmed_flush
Definition: slot.h:76
Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:40
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotReserveWal(void)
Definition: slot.c:825
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:414
#define ERROR
Definition: elog.h:43
void ReplicationSlotPersist(void)
Definition: slot.c:598
Definition: c.h:493
TransactionId catalog_xmin
Definition: slot.h:65
#define InvalidTransactionId
Definition: transam.h:31
void ReplicationSlotRelease(void)
Definition: slot.c:375
TransactionId xmin
Definition: slot.h:57
static void check_permissions(void)
Definition: slotfuncs.c:27
#define ereport(elevel, rest)
Definition: elog.h:122
bool in_use
Definition: slot.h:91
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:316
#define TransactionIdGetDatum(X)
Definition: postgres.h:527
uintptr_t Datum
Definition: postgres.h:372
Datum pg_drop_replication_slot(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:165
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:313
int work_mem
Definition: globals.c:112
#define BoolGetDatum(X)
Definition: postgres.h:408
void ReplicationSlotDrop(const char *name)
Definition: slot.c:440
#define InvalidOid
Definition: postgres_ext.h:36
int allowedModes
Definition: execnodes.h:267
#define PG_RETURN_VOID()
Definition: fmgr.h:309
SetFunctionReturnMode returnMode
Definition: execnodes.h:269
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
int max_replication_slots
Definition: slot.c:99
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:457
XLogRecPtr restart_lsn
Definition: slot.h:68
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:464
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:222
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:201
const char * name
Definition: encode.c:521
Tuplestorestate * setResult
Definition: execnodes.h:272
static Datum values[MAXATTR]
Definition: bootstrap.c:162
ExprContext * econtext
Definition: execnodes.h:265
#define Int32GetDatum(X)
Definition: postgres.h:485
TupleDesc setDesc
Definition: execnodes.h:273
ReplicationSlotPersistency
Definition: slot.h:28
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t active_pid
Definition: slot.h:94
int i
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:211
#define NameStr(name)
Definition: c.h:499
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
ReplicationSlot replication_slots[1]
Definition: slot.h:145
#define elog
Definition: elog.h:219
slock_t mutex
Definition: slot.h:88
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
void ReplicationSlotMarkDirty(void)
Definition: slot.c:581
#define PG_GETARG_NAME(n)
Definition: fmgr.h:243