PostgreSQL Source Code  git master
slotfuncs.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * slotfuncs.c
4  * Support functions for replication slots
5  *
6  * Copyright (c) 2012-2018, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/slotfuncs.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #include "postgres.h"
15 
16 #include "funcapi.h"
17 #include "miscadmin.h"
18 
19 #include "access/htup_details.h"
20 #include "replication/decode.h"
21 #include "replication/slot.h"
22 #include "replication/logical.h"
24 #include "utils/builtins.h"
25 #include "utils/inval.h"
26 #include "utils/pg_lsn.h"
27 #include "utils/resowner.h"
28 
/*
 * check_permissions (name taken from this listing's cross-reference index,
 * which places it at slotfuncs.c:30).
 *
 * As shown, raises ERROR with ERRCODE_INSUFFICIENT_PRIVILEGE and the message
 * below.
 *
 * NOTE(review): the embedded numbering skips 30 and 32, so the line carrying
 * the function name and, presumably, the condition guarding the ereport()
 * are absent from this listing; as shown the error is unconditional.
 * Restore the missing lines from the upstream file before building.
 */
29 static void
31 {
33  ereport(ERROR,
34  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
35  (errmsg("must be superuser or replication role to use replication slots"))));
36 }
37 
38 /*
39  * SQL function for creating a new physical (streaming replication)
40  * replication slot.
 *
 * Per this listing's cross-reference index the function is
 * pg_create_physical_replication_slot(PG_FUNCTION_ARGS), at slotfuncs.c:43.
 *
 * Arguments read in the visible code:
 *   arg 1 (bool) immediately_reserve
 *   arg 2 (bool) temporary - selects RS_TEMPORARY instead of RS_PERSISTENT
 *
 * Returns a two-column row built with heap_form_tuple(): column 0 is the
 * name of the slot just created (taken from MyReplicationSlot); column 1 is
 * NULL unless immediately_reserve is true.
 *
 * NOTE(review): the embedded numbering skips 43, 45, 54, 59, 61, 73, 76, 77,
 * 79 and 90, so several lines are absent from this listing.  In particular
 * the declaration of `name` and any assignment to values[1] are not visible;
 * restore them from the upstream file before building.
41  */
42 Datum
44 {
46  bool immediately_reserve = PG_GETARG_BOOL(1);
47  bool temporary = PG_GETARG_BOOL(2);
48  Datum values[2];
49  bool nulls[2];
50  TupleDesc tupdesc;
51  HeapTuple tuple;
52  Datum result;
53 
55 
    /* Resolve the declared result row type; anything but a composite is an internal error. */
56  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
57  elog(ERROR, "return type must be a row type");
58 
60 
62 
63  /* acquire replication slot, this will check for conflicting names */
    /* The second argument is db_specific (see ReplicationSlotCreate's signature); false here. */
64  ReplicationSlotCreate(NameStr(*name), false,
65  temporary ? RS_TEMPORARY : RS_PERSISTENT);
66 
    /* Column 0: the slot name as recorded in the slot this backend now holds. */
67  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
68  nulls[0] = false;
69 
70  if (immediately_reserve)
71  {
72  /* Reserve WAL as the user asked for it */
    /* NOTE(review): the statement this comment describes (numbered 73) is missing from the listing. */
74 
75  /* Write this slot to disk */
    /* NOTE(review): the statements numbered 76, 77 and 79 are missing; values[1] is presumably set there - confirm. */
78 
80  nulls[1] = false;
81  }
82  else
83  {
    /* Nothing was reserved, so column 1 is reported as NULL. */
84  nulls[1] = true;
85  }
86 
87  tuple = heap_form_tuple(tupdesc, values, nulls);
88  result = HeapTupleGetDatum(tuple);
89 
91 
92  PG_RETURN_DATUM(result);
93 }
94 
95 
96 /*
97  * SQL function for creating a new logical replication slot.
 *
 * Per this listing's cross-reference index the function is
 * pg_create_logical_replication_slot(PG_FUNCTION_ARGS), at slotfuncs.c:100.
 *
 * Arguments read in the visible code:
 *   arg 0 (name) name      - slot name
 *   arg 2 (bool) temporary - create the slot as RS_TEMPORARY rather than
 *                            RS_EPHEMERAL
 *
 * Returns a two-column row with neither column NULL (nulls[] is zeroed).
 *
 * NOTE(review): the embedded numbering skips 100, 103, 114, 119, 121, 143,
 * 145, 146, 158 and 159.  The declaration of `plugin`, every assignment to
 * values[], and the body of the final `if (!temporary)` are therefore not
 * visible here; restore them from the upstream file before building.
98  */
99 Datum
101 {
102  Name name = PG_GETARG_NAME(0);
104  bool temporary = PG_GETARG_BOOL(2);
105 
106  LogicalDecodingContext *ctx = NULL;
107 
108  TupleDesc tupdesc;
109  HeapTuple tuple;
110  Datum result;
111  Datum values[2];
112  bool nulls[2];
113 
115 
    /* Resolve the declared result row type; anything but a composite is an internal error. */
116  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
117  elog(ERROR, "return type must be a row type");
118 
120 
122 
123  /*
124  * Acquire a logical decoding slot, this will check for conflicting names.
125  * Initially create persistent slot as ephemeral - that allows us to
126  * nicely handle errors during initialization because it'll get dropped if
127  * this transaction fails. We'll make it persistent at the end. Temporary
128  * slots can be created as temporary from beginning as they get dropped on
129  * error as well.
130  */
    /* The second argument is db_specific (see ReplicationSlotCreate's signature); true here. */
131  ReplicationSlotCreate(NameStr(*name), true,
132  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
133 
134  /*
135  * Create logical decoding context, to build the initial snapshot.
136  */
    /*
     * Per CreateInitDecodingContext's signature the `false` below is
     * need_full_snapshot, NIL is the output plugin option list, and the three
     * writer callbacks (prepare_write, do_write, update_progress) are NULL.
     */
137  ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
138  false, /* do not build snapshot */
139  logical_read_local_xlog_page, NULL, NULL,
140  NULL);
141 
142  /* build initial snapshot, might take a while */
    /* NOTE(review): the statements numbered 143, 145 and 146 are missing; values[] is presumably filled there - confirm. */
144 
147 
148  /* don't need the decoding context anymore */
149  FreeDecodingContext(ctx);
150 
    /* Both result columns are reported as non-NULL. */
151  memset(nulls, 0, sizeof(nulls));
152 
153  tuple = heap_form_tuple(tupdesc, values, nulls);
154  result = HeapTupleGetDatum(tuple);
155 
156  /* ok, slot is now fully created, mark it as persistent if needed */
    /*
     * NOTE(review): the statement guarded by this `if` (numbered 158) is
     * missing; as shown the `if` would instead guard PG_RETURN_DATUM below.
     */
157  if (!temporary)
160 
161  PG_RETURN_DATUM(result);
162 }
163 
164 
165 /*
166  * SQL function for dropping a replication slot.
 *
 * Per this listing's cross-reference index the function is
 * pg_drop_replication_slot(PG_FUNCTION_ARGS), at slotfuncs.c:169.
 *
 * arg 0 (name) is the slot to drop.  Returns void.
 *
 * NOTE(review): the embedded numbering skips 169, 173 and 175, so the
 * function-name line and two statements before the drop are absent from
 * this listing; restore them from the upstream file before building.
167  */
168 Datum
170 {
171  Name name = PG_GETARG_NAME(0);
172 
174 
176 
    /* The second argument is nowait (see ReplicationSlotDrop's signature); true here. */
177  ReplicationSlotDrop(NameStr(*name), true);
178 
179  PG_RETURN_VOID();
180 }
181 
182 /*
183  * pg_get_replication_slots - SQL SRF showing active replication slots.
 *
 * Materialize-mode set-returning function: all rows are written into a
 * tuplestore created in the per-query memory context and handed back through
 * rsinfo; the function's own return value is (Datum) 0.
 *
 * One row is emitted for every slot entry whose in_use flag is set.  The
 * columns, in the order they are filled below, are:
 *   0  slot name
 *   1  plugin name            (NULL when database == InvalidOid)
 *   2  "physical" / "logical" (physical when database == InvalidOid)
 *   3  database OID           (NULL when database == InvalidOid)
 *   4  persistency == RS_TEMPORARY
 *   5  active_pid != 0
 *   6  active_pid             (NULL when 0)
 *   7  xmin                   (NULL when InvalidTransactionId)
 *   8  catalog_xmin           (NULL when InvalidTransactionId)
 *   9  restart_lsn            (NULL when InvalidXLogRecPtr)
 *   10 confirmed_flush_lsn    (NULL when InvalidXLogRecPtr)
 *
 * NOTE(review): the embedded numbering skips 186, 230, 231 and 242, so the
 * function-name line and the declarations of `slot`, `values` and `plugin`
 * used inside the loop are absent from this listing; restore them from the
 * upstream file before building.
184  */
185 Datum
187 {
    /* Number of result columns; must match the count of values[]/nulls[] entries filled per row. */
188 #define PG_GET_REPLICATION_SLOTS_COLS 11
189  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
190  TupleDesc tupdesc;
191  Tuplestorestate *tupstore;
192  MemoryContext per_query_ctx;
193  MemoryContext oldcontext;
194  int slotno;
195 
196  /* check to see if caller supports us returning a tuplestore */
197  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
198  ereport(ERROR,
199  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
200  errmsg("set-valued function called in context that cannot accept a set")));
201  if (!(rsinfo->allowedModes & SFRM_Materialize))
202  ereport(ERROR,
203  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
204  errmsg("materialize mode required, but it is not " \
205  "allowed in this context")));
206 
207  /* Build a tuple descriptor for our result type */
208  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
209  elog(ERROR, "return type must be a row type");
210 
211  /*
212  * We don't require any special permission to see this function's data
213  * because nothing should be sensitive. The most critical being the slot
214  * name, which shouldn't contain anything particularly sensitive.
215  */
216 
    /* Create the tuplestore while the per-query context is current, then switch back. */
217  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
218  oldcontext = MemoryContextSwitchTo(per_query_ctx);
219 
220  tupstore = tuplestore_begin_heap(true, false, work_mem);
221  rsinfo->returnMode = SFRM_Materialize;
222  rsinfo->setResult = tupstore;
223  rsinfo->setDesc = tupdesc;
224 
225  MemoryContextSwitchTo(oldcontext);
226 
    /* The control lock is held in shared mode for the whole scan of the slot array. */
227  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
228  for (slotno = 0; slotno < max_replication_slots; slotno++)
229  {
232  bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
233 
234  ReplicationSlotPersistency persistency;
235  TransactionId xmin;
236  TransactionId catalog_xmin;
237  XLogRecPtr restart_lsn;
238  XLogRecPtr confirmed_flush_lsn;
239  pid_t active_pid;
240  Oid database;
241  NameData slot_name;
243  int i;
244 
    /* Unused array entries produce no row. */
245  if (!slot->in_use)
246  continue;
247 
    /* Copy the slot's fields into locals under its spinlock; the row is built after release. */
248  SpinLockAcquire(&slot->mutex);
249 
250  xmin = slot->data.xmin;
251  catalog_xmin = slot->data.catalog_xmin;
252  database = slot->data.database;
253  restart_lsn = slot->data.restart_lsn;
254  confirmed_flush_lsn = slot->data.confirmed_flush;
255  namecpy(&slot_name, &slot->data.name);
256  namecpy(&plugin, &slot->data.plugin);
257  active_pid = slot->active_pid;
258  persistency = slot->data.persistency;
259 
260  SpinLockRelease(&slot->mutex);
261 
    /* Start with every column non-NULL; branches below flip individual entries. */
262  memset(nulls, 0, sizeof(nulls));
263 
    /* i is the next column index; every branch below advances it exactly once. */
264  i = 0;
265  values[i++] = NameGetDatum(&slot_name);
266 
    /* database == InvalidOid is the test used throughout for "physical slot". */
267  if (database == InvalidOid)
268  nulls[i++] = true;
269  else
270  values[i++] = NameGetDatum(&plugin);
271 
272  if (database == InvalidOid)
273  values[i++] = CStringGetTextDatum("physical");
274  else
275  values[i++] = CStringGetTextDatum("logical");
276 
    /* NOTE(review): unlike the other columns, the OID is stored without a *GetDatum conversion macro. */
277  if (database == InvalidOid)
278  nulls[i++] = true;
279  else
280  values[i++] = database;
281 
282  values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
283  values[i++] = BoolGetDatum(active_pid != 0);
284 
285  if (active_pid != 0)
286  values[i++] = Int32GetDatum(active_pid);
287  else
288  nulls[i++] = true;
289 
290  if (xmin != InvalidTransactionId)
291  values[i++] = TransactionIdGetDatum(xmin);
292  else
293  nulls[i++] = true;
294 
295  if (catalog_xmin != InvalidTransactionId)
296  values[i++] = TransactionIdGetDatum(catalog_xmin);
297  else
298  nulls[i++] = true;
299 
300  if (restart_lsn != InvalidXLogRecPtr)
301  values[i++] = LSNGetDatum(restart_lsn);
302  else
303  nulls[i++] = true;
304 
305  if (confirmed_flush_lsn != InvalidXLogRecPtr)
306  values[i++] = LSNGetDatum(confirmed_flush_lsn);
307  else
308  nulls[i++] = true;
309 
310  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
311  }
312  LWLockRelease(ReplicationSlotControlLock);
313 
314  tuplestore_donestoring(tupstore);
315 
    /* The rows travel via rsinfo->setResult; the direct return value carries nothing. */
316  return (Datum) 0;
317 }
318 
319 /*
320  * Helper function for advancing physical replication slot forward.
 *
 * Per this listing's cross-reference index the signature is
 * pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto),
 * at slotfuncs.c:323.
 *
 * Returns moveto when the slot's current restart_lsn is below moveto,
 * otherwise InvalidXLogRecPtr.
 *
 * NOTE(review): the embedded numbering skips 323, 327, 330 and 333, so the
 * function-name line and the statements around `retlsn = moveto` (presumably
 * the ones that actually update the slot - confirm) are absent from this
 * listing; restore them from the upstream file before building.
321  */
322 static XLogRecPtr
324 {
325  XLogRecPtr retlsn = InvalidXLogRecPtr;
326 
    /* Only a forward move is acted on; otherwise retlsn stays InvalidXLogRecPtr. */
328  if (MyReplicationSlot->data.restart_lsn < moveto)
329  {
331  retlsn = moveto;
332  }
334 
335  return retlsn;
336 }
337 
338 /*
339  * Helper function for advancing logical replication slot forward.
 *
 * Per this listing's cross-reference index the signature is
 * pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto),
 * at slotfuncs.c:342.
 *
 * Visible behavior: inside PG_TRY, reads WAL records with XLogReadRecord()
 * starting at startlsn until the reader's EndRecPtr reaches moveto, restores
 * CurrentResourceOwner, frees the decoding context, and returns retlsn.
 * On error the PG_CATCH block re-throws.
 *
 * NOTE(review): the embedded numbering skips 342, 344, 351, 354, 357, 361,
 * 385, 391, 398, 413, 416, 421 and 426.  Missing from this listing are,
 * among others, the declaration of `ctx`, the head of the context-creation
 * call, the statement guarded by `if (record != NULL)`, and every assignment
 * to retlsn after its initialization.  Restore them from the upstream file
 * before building.
340  */
341 static XLogRecPtr
343 {
    /* Saved so it can be put back after the decoding loop. */
345  ResourceOwner old_resowner = CurrentResourceOwner;
346  XLogRecPtr retlsn = InvalidXLogRecPtr;
347 
348  PG_TRY();
349  {
350  /* restart at slot's confirmed_flush */
    /*
     * NOTE(review): the next three lines are the tail of a call whose head
     * (numbered 351) is missing - presumably CreateDecodingContext(), in
     * whose signature NIL would be output_plugin_options and `true` would be
     * fast_forward.  Confirm against upstream.
     */
352  NIL,
353  true,
355  NULL, NULL, NULL);
356 
358  "logical decoding");
359 
360  /* invalidate non-timetravel entries */
362 
363  /* Decode until we run out of records */
364  while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
365  (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
366  {
367  XLogRecord *record;
368  char *errm = NULL;
369 
370  record = XLogReadRecord(ctx->reader, startlsn, &errm);
371  if (errm)
372  elog(ERROR, "%s", errm);
373 
374  /*
375  * Now that we've set up the xlog reader state, subsequent calls
376  * pass InvalidXLogRecPtr to say "continue from last record"
377  */
378  startlsn = InvalidXLogRecPtr;
379 
380  /*
381  * The {begin_txn,change,commit_txn}_wrapper callbacks above will
382  * store the description into our tuplestore.
 *
 * NOTE(review): no such callbacks or tuplestore appear in this
 * function (the writer callbacks passed above are NULL); this
 * comment looks stale - confirm and reword.
383  */
384  if (record != NULL)
386 
387  /* check limits */
388  if (moveto <= ctx->reader->EndRecPtr)
389  break;
390 
392  }
393 
394  CurrentResourceOwner = old_resowner;
395 
396  if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
397  {
399 
400  /*
401  * If only the confirmed_flush_lsn has changed the slot won't get
402  * marked as dirty by the above. Callers on the walsender
403  * interface are expected to keep track of their own progress and
404  * don't need it written out. But SQL-interface users cannot
405  * specify their own start positions and it's harder for them to
406  * keep track of their progress, so we should make more of an
407  * effort to save it for them.
408  *
409  * Dirty the slot so it's written out at the next checkpoint.
410  * We'll still lose its position on crash, as documented, but it's
411  * better than always losing the position even on clean restart.
412  */
414  }
415 
417 
418  /* free context, call shutdown callback */
419  FreeDecodingContext(ctx);
420 
422  }
423  PG_CATCH();
424  {
425  /* clear all timetravel entries */
427 
    /* Propagate the original error to the caller. */
428  PG_RE_THROW();
429  }
430  PG_END_TRY();
431 
432  return retlsn;
433 }
434 
435 /*
436  * SQL function for moving the position in a replication slot.
 *
 * Per this listing's cross-reference index the function is
 * pg_replication_slot_advance(PG_FUNCTION_ARGS), at slotfuncs.c:439.
 *
 * Arguments:
 *   arg 0 (name)   slotname - slot to advance
 *   arg 1 (pg_lsn) moveto   - requested target; an invalid LSN is rejected
 *
 * Returns a two-column row: the slot name and the LSN reported by the
 * physical or logical advance helper.
 *
 * NOTE(review): the embedded numbering skips 439, 451, 453, 475, 478, 483,
 * 484, 487, 498-501 and 504.  Missing from this listing are, among others,
 * the assignment to startlsn, the tail of the "cannot move slot" ereport,
 * the `if` that selects the logical helper, and the body of the on-disk
 * update block.  Restore them from the upstream file before building.
437  */
438 Datum
440 {
441  Name slotname = PG_GETARG_NAME(0);
442  XLogRecPtr moveto = PG_GETARG_LSN(1);
443  XLogRecPtr endlsn;
444  XLogRecPtr startlsn;
445  TupleDesc tupdesc;
446  Datum values[2];
447  bool nulls[2];
448  HeapTuple tuple;
449  Datum result;
450 
452 
454 
455  if (XLogRecPtrIsInvalid(moveto))
456  ereport(ERROR,
457  (errmsg("invalid target wal lsn")));
458 
459  /* Build a tuple descriptor for our result type */
460  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
461  elog(ERROR, "return type must be a row type");
462 
463  /*
464  * We can't move slot past what's been flushed/replayed so clamp the
465  * target position accordingly.
466  */
    /* Outside recovery the cap is the flush pointer; during recovery it is the replay pointer. */
467  if (!RecoveryInProgress())
468  moveto = Min(moveto, GetFlushRecPtr());
469  else
470  moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
471 
472  /* Acquire the slot so we "own" it */
    /* The second argument is nowait (see ReplicationSlotAcquire's signature); true here. */
473  ReplicationSlotAcquire(NameStr(*slotname), true);
474 
    /* NOTE(review): startlsn is read below but its assignment (numbered 475) is missing from the listing. */
476  if (moveto < startlsn)
477  {
479  ereport(ERROR,
480  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
481  errmsg("cannot move slot to %X/%X, minimum is %X/%X",
482  (uint32) (moveto >> 32), (uint32) moveto,
485  }
486 
    /* NOTE(review): the `if` paired with the `else` below (numbered 487) is missing from the listing. */
488  endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
489  else
490  endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
491 
492  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
493  nulls[0] = false;
494 
    /*
     * NOTE(review): the physical helper returns InvalidXLogRecPtr when it
     * did NOT move the slot, so the test below selects the "not updated"
     * case, which contradicts the comment that follows.  The condition
     * looks inverted - verify against upstream before relying on it.
     */
495  /* Update the on disk state when lsn was updated. */
496  if (XLogRecPtrIsInvalid(endlsn))
497  {
502  }
503 
505 
506  /* Return the reached position. */
507  values[1] = LSNGetDatum(endlsn);
508  nulls[1] = false;
509 
510  tuple = heap_form_tuple(tupdesc, values, nulls);
511  result = HeapTupleGetDatum(tuple);
512 
513  PG_RETURN_DATUM(result);
514 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
int namecpy(Name n1, Name n2)
Definition: name.c:191
#define NIL
Definition: pg_list.h:69
static const char * plugin
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
void CheckSlotRequirements(void)
Definition: slot.c:965
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:563
#define NameGetDatum(X)
Definition: postgres.h:601
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define PG_GET_REPLICATION_SLOTS_COLS
uint32 TransactionId
Definition: c.h:455
Datum pg_get_replication_slots(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:186
int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: logicalfuncs.c:118
Oid GetUserId(void)
Definition: miscinit.c:284
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
Datum pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:100
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
#define Min(x, y)
Definition: c.h:826
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
ReplicationSlotPersistency persistency
Definition: slot.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
void ReplicationSlotSave(void)
Definition: slot.c:638
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8254
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:220
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:239
ReplicationSlotPersistentData data
Definition: slot.h:120
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:7922
#define OidIsValid(objectId)
Definition: c.h:586
void InvalidateSystemCaches(void)
Definition: inval.c:641
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:195
XLogRecPtr confirmed_flush
Definition: slot.h:81
XLogRecPtr EndRecPtr
Definition: xlogreader.h:120
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1724
Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:43
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotReserveWal(void)
Definition: slot.c:985
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:438
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:741
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:96
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11158
void ReplicationSlotPersist(void)
Definition: slot.c:673
Definition: c.h:551
Datum pg_replication_slot_advance(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:439
TransactionId catalog_xmin
Definition: slot.h:70
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:306
void ReplicationSlotRelease(void)
Definition: slot.c:416
TransactionId xmin
Definition: slot.h:62
static void check_permissions(void)
Definition: slotfuncs.c:30
#define ereport(elevel, rest)
Definition: elog.h:122
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
bool in_use
Definition: slot.h:96
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
#define TransactionIdGetDatum(X)
Definition: postgres.h:527
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
uintptr_t Datum
Definition: postgres.h:372
Datum pg_drop_replication_slot(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:169
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:313
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:348
int work_mem
Definition: globals.c:113
#define BoolGetDatum(X)
Definition: postgres.h:408
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:181
int allowedModes
Definition: execnodes.h:279
#define PG_RETURN_VOID()
Definition: fmgr.h:309
SetFunctionReturnMode returnMode
Definition: execnodes.h:281
#define PG_CATCH()
Definition: elog.h:293
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
int max_replication_slots
Definition: slot.c:99
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:680
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:481
XLogRecPtr restart_lsn
Definition: slot.h:73
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:465
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1120
#define PG_RE_THROW()
Definition: elog.h:314
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:230
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:934
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:213
const char * name
Definition: encode.c:521
Tuplestorestate * setResult
Definition: execnodes.h:284
static Datum values[MAXATTR]
Definition: bootstrap.c:164
ExprContext * econtext
Definition: execnodes.h:277
#define Int32GetDatum(X)
Definition: postgres.h:485
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:509
TupleDesc setDesc
Definition: execnodes.h:285
static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
Definition: slotfuncs.c:342
ReplicationSlotPersistency
Definition: slot.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:797
XLogReaderState * reader
Definition: logical.h:44
pid_t active_pid
Definition: slot.h:99
int i
#define NameStr(name)
Definition: c.h:557
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
ReplicationSlot replication_slots[1]
Definition: slot.h:153
#define elog
Definition: elog.h:219
slock_t mutex
Definition: slot.h:93
static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
Definition: slotfuncs.c:323
#define PG_TRY()
Definition: elog.h:284
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:695
#define PG_END_TRY()
Definition: elog.h:300
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656
#define PG_GETARG_NAME(n)
Definition: fmgr.h:243
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416