PostgreSQL Source Code  git master
slotfuncs.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * slotfuncs.c
4  * Support functions for replication slots
5  *
6  * Copyright (c) 2012-2018, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/slotfuncs.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 
#include "postgres.h"

#include "funcapi.h"
#include "miscadmin.h"

#include "access/htup_details.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/pg_lsn.h"
#include "utils/resowner.h"
28 
29 static void
31 {
33  ereport(ERROR,
34  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
35  (errmsg("must be superuser or replication role to use replication slots"))));
36 }
37 
38 /*
39  * SQL function for creating a new physical (streaming replication)
40  * replication slot.
41  */
42 Datum
44 {
46  bool immediately_reserve = PG_GETARG_BOOL(1);
47  bool temporary = PG_GETARG_BOOL(2);
48  Datum values[2];
49  bool nulls[2];
50  TupleDesc tupdesc;
51  HeapTuple tuple;
52  Datum result;
53 
55 
56  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
57  elog(ERROR, "return type must be a row type");
58 
60 
62 
63  /* acquire replication slot, this will check for conflicting names */
64  ReplicationSlotCreate(NameStr(*name), false,
65  temporary ? RS_TEMPORARY : RS_PERSISTENT);
66 
67  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
68  nulls[0] = false;
69 
70  if (immediately_reserve)
71  {
72  /* Reserve WAL as the user asked for it */
74 
75  /* Write this slot to disk */
78 
80  nulls[1] = false;
81  }
82  else
83  {
84  nulls[1] = true;
85  }
86 
87  tuple = heap_form_tuple(tupdesc, values, nulls);
88  result = HeapTupleGetDatum(tuple);
89 
91 
92  PG_RETURN_DATUM(result);
93 }
94 
95 
96 /*
97  * SQL function for creating a new logical replication slot.
98  */
99 Datum
101 {
102  Name name = PG_GETARG_NAME(0);
104  bool temporary = PG_GETARG_BOOL(2);
105 
106  LogicalDecodingContext *ctx = NULL;
107 
108  TupleDesc tupdesc;
109  HeapTuple tuple;
110  Datum result;
111  Datum values[2];
112  bool nulls[2];
113 
115 
116  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
117  elog(ERROR, "return type must be a row type");
118 
120 
122 
123  /*
124  * Acquire a logical decoding slot, this will check for conflicting names.
125  * Initially create persistent slot as ephemeral - that allows us to
126  * nicely handle errors during initialization because it'll get dropped if
127  * this transaction fails. We'll make it persistent at the end. Temporary
128  * slots can be created as temporary from beginning as they get dropped on
129  * error as well.
130  */
131  ReplicationSlotCreate(NameStr(*name), true,
132  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
133 
134  /*
135  * Create logical decoding context, to build the initial snapshot.
136  */
137  ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
138  false, /* do not build snapshot */
139  logical_read_local_xlog_page, NULL, NULL,
140  NULL);
141 
142  /* build initial snapshot, might take a while */
144 
147 
148  /* don't need the decoding context anymore */
149  FreeDecodingContext(ctx);
150 
151  memset(nulls, 0, sizeof(nulls));
152 
153  tuple = heap_form_tuple(tupdesc, values, nulls);
154  result = HeapTupleGetDatum(tuple);
155 
156  /* ok, slot is now fully created, mark it as persistent if needed */
157  if (!temporary)
160 
161  PG_RETURN_DATUM(result);
162 }
163 
164 
165 /*
166  * SQL function for dropping a replication slot.
167  */
168 Datum
170 {
171  Name name = PG_GETARG_NAME(0);
172 
174 
176 
177  ReplicationSlotDrop(NameStr(*name), true);
178 
179  PG_RETURN_VOID();
180 }
181 
182 /*
183  * pg_get_replication_slots - SQL SRF showing active replication slots.
184  */
185 Datum
187 {
188 #define PG_GET_REPLICATION_SLOTS_COLS 11
189  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
190  TupleDesc tupdesc;
191  Tuplestorestate *tupstore;
192  MemoryContext per_query_ctx;
193  MemoryContext oldcontext;
194  int slotno;
195 
196  /* check to see if caller supports us returning a tuplestore */
197  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
198  ereport(ERROR,
199  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
200  errmsg("set-valued function called in context that cannot accept a set")));
201  if (!(rsinfo->allowedModes & SFRM_Materialize))
202  ereport(ERROR,
203  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
204  errmsg("materialize mode required, but it is not " \
205  "allowed in this context")));
206 
207  /* Build a tuple descriptor for our result type */
208  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
209  elog(ERROR, "return type must be a row type");
210 
211  /*
212  * We don't require any special permission to see this function's data
213  * because nothing should be sensitive. The most critical being the slot
214  * name, which shouldn't contain anything particularly sensitive.
215  */
216 
217  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
218  oldcontext = MemoryContextSwitchTo(per_query_ctx);
219 
220  tupstore = tuplestore_begin_heap(true, false, work_mem);
221  rsinfo->returnMode = SFRM_Materialize;
222  rsinfo->setResult = tupstore;
223  rsinfo->setDesc = tupdesc;
224 
225  MemoryContextSwitchTo(oldcontext);
226 
227  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
228  for (slotno = 0; slotno < max_replication_slots; slotno++)
229  {
232  bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
233 
234  ReplicationSlotPersistency persistency;
235  TransactionId xmin;
236  TransactionId catalog_xmin;
237  XLogRecPtr restart_lsn;
238  XLogRecPtr confirmed_flush_lsn;
239  pid_t active_pid;
240  Oid database;
241  NameData slot_name;
243  int i;
244 
245  if (!slot->in_use)
246  continue;
247 
248  SpinLockAcquire(&slot->mutex);
249 
250  xmin = slot->data.xmin;
251  catalog_xmin = slot->data.catalog_xmin;
252  database = slot->data.database;
253  restart_lsn = slot->data.restart_lsn;
254  confirmed_flush_lsn = slot->data.confirmed_flush;
255  namecpy(&slot_name, &slot->data.name);
256  namecpy(&plugin, &slot->data.plugin);
257  active_pid = slot->active_pid;
258  persistency = slot->data.persistency;
259 
260  SpinLockRelease(&slot->mutex);
261 
262  memset(nulls, 0, sizeof(nulls));
263 
264  i = 0;
265  values[i++] = NameGetDatum(&slot_name);
266 
267  if (database == InvalidOid)
268  nulls[i++] = true;
269  else
270  values[i++] = NameGetDatum(&plugin);
271 
272  if (database == InvalidOid)
273  values[i++] = CStringGetTextDatum("physical");
274  else
275  values[i++] = CStringGetTextDatum("logical");
276 
277  if (database == InvalidOid)
278  nulls[i++] = true;
279  else
280  values[i++] = database;
281 
282  values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
283  values[i++] = BoolGetDatum(active_pid != 0);
284 
285  if (active_pid != 0)
286  values[i++] = Int32GetDatum(active_pid);
287  else
288  nulls[i++] = true;
289 
290  if (xmin != InvalidTransactionId)
291  values[i++] = TransactionIdGetDatum(xmin);
292  else
293  nulls[i++] = true;
294 
295  if (catalog_xmin != InvalidTransactionId)
296  values[i++] = TransactionIdGetDatum(catalog_xmin);
297  else
298  nulls[i++] = true;
299 
300  if (restart_lsn != InvalidXLogRecPtr)
301  values[i++] = LSNGetDatum(restart_lsn);
302  else
303  nulls[i++] = true;
304 
305  if (confirmed_flush_lsn != InvalidXLogRecPtr)
306  values[i++] = LSNGetDatum(confirmed_flush_lsn);
307  else
308  nulls[i++] = true;
309 
310  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
311  }
312  LWLockRelease(ReplicationSlotControlLock);
313 
314  tuplestore_donestoring(tupstore);
315 
316  return (Datum) 0;
317 }
318 
319 /*
320  * Helper function for advancing physical replication slot forward.
321  * The LSN position to move to is compared simply to the slot's
322  * restart_lsn, knowing that any position older than that would be
323  * removed by successive checkpoints.
324  */
325 static XLogRecPtr
327 {
329  XLogRecPtr retlsn = startlsn;
330 
331  if (startlsn < moveto)
332  {
336  retlsn = moveto;
337  }
338 
339  return retlsn;
340 }
341 
342 /*
343  * Helper function for advancing logical replication slot forward.
344  * The slot's restart_lsn is used as start point for reading records,
345  * while confirmed_lsn is used as base point for the decoding context.
346  * The LSN position to move to is checked by doing a per-record scan and
347  * logical decoding which makes sure that confirmed_lsn is updated to a
348  * LSN which allows the future slot consumer to get consistent logical
349  * changes.
350  */
351 static XLogRecPtr
353 {
355  ResourceOwner old_resowner = CurrentResourceOwner;
358 
359  PG_TRY();
360  {
361  /* restart at slot's confirmed_flush */
363  NIL,
364  true,
366  NULL, NULL, NULL);
367 
369  "logical decoding");
370 
371  /* invalidate non-timetravel entries */
373 
374  /* Decode until we run out of records */
375  while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
376  (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
377  {
378  XLogRecord *record;
379  char *errm = NULL;
380 
381  record = XLogReadRecord(ctx->reader, startlsn, &errm);
382  if (errm)
383  elog(ERROR, "%s", errm);
384 
385  /*
386  * Now that we've set up the xlog reader state, subsequent calls
387  * pass InvalidXLogRecPtr to say "continue from last record"
388  */
389  startlsn = InvalidXLogRecPtr;
390 
391  /*
392  * The {begin_txn,change,commit_txn}_wrapper callbacks above will
393  * store the description into our tuplestore.
394  */
395  if (record != NULL)
397 
398  /* Stop once the moving point wanted by caller has been reached */
399  if (moveto <= ctx->reader->EndRecPtr)
400  break;
401 
403  }
404 
405  CurrentResourceOwner = old_resowner;
406 
407  if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
408  {
410 
411  /*
412  * If only the confirmed_flush_lsn has changed the slot won't get
413  * marked as dirty by the above. Callers on the walsender
414  * interface are expected to keep track of their own progress and
415  * don't need it written out. But SQL-interface users cannot
416  * specify their own start positions and it's harder for them to
417  * keep track of their progress, so we should make more of an
418  * effort to save it for them.
419  *
420  * Dirty the slot so it's written out at the next checkpoint.
421  * We'll still lose its position on crash, as documented, but it's
422  * better than always losing the position even on clean restart.
423  */
425  }
426 
428 
429  /* free context, call shutdown callback */
430  FreeDecodingContext(ctx);
431 
433  }
434  PG_CATCH();
435  {
436  /* clear all timetravel entries */
438 
439  PG_RE_THROW();
440  }
441  PG_END_TRY();
442 
443  return retlsn;
444 }
445 
446 /*
447  * SQL function for moving the position in a replication slot.
448  */
449 Datum
451 {
452  Name slotname = PG_GETARG_NAME(0);
453  XLogRecPtr moveto = PG_GETARG_LSN(1);
454  XLogRecPtr endlsn;
455  XLogRecPtr minlsn;
456  TupleDesc tupdesc;
457  Datum values[2];
458  bool nulls[2];
459  HeapTuple tuple;
460  Datum result;
461 
463 
465 
466  if (XLogRecPtrIsInvalid(moveto))
467  ereport(ERROR,
468  (errmsg("invalid target wal lsn")));
469 
470  /* Build a tuple descriptor for our result type */
471  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
472  elog(ERROR, "return type must be a row type");
473 
474  /*
475  * We can't move slot past what's been flushed/replayed so clamp the
476  * target position accordingly.
477  */
478  if (!RecoveryInProgress())
479  moveto = Min(moveto, GetFlushRecPtr());
480  else
481  moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
482 
483  /* Acquire the slot so we "own" it */
484  ReplicationSlotAcquire(NameStr(*slotname), true);
485 
486  /*
487  * Check if the slot is not moving backwards. Physical slots rely simply
488  * on restart_lsn as a minimum point, while logical slots have confirmed
489  * consumption up to confirmed_lsn, meaning that in both cases data older
490  * than that is not available anymore.
491  */
494  else
496 
497  if (moveto < minlsn)
498  {
500  ereport(ERROR,
501  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
502  errmsg("cannot move slot to %X/%X, minimum is %X/%X",
503  (uint32) (moveto >> 32), (uint32) moveto,
504  (uint32) (minlsn >> 32), (uint32) minlsn)));
505  }
506 
507  /* Do the actual slot update, depending on the slot type */
509  endlsn = pg_logical_replication_slot_advance(moveto);
510  else
511  endlsn = pg_physical_replication_slot_advance(moveto);
512 
513  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
514  nulls[0] = false;
515 
516  /* Update the on disk state when lsn was updated. */
517  if (XLogRecPtrIsInvalid(endlsn))
518  {
523  }
524 
526 
527  /* Return the reached position. */
528  values[1] = LSNGetDatum(endlsn);
529  nulls[1] = false;
530 
531  tuple = heap_form_tuple(tupdesc, values, nulls);
532  result = HeapTupleGetDatum(tuple);
533 
534  PG_RETURN_DATUM(result);
535 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto)
Definition: slotfuncs.c:352
int namecpy(Name n1, Name n2)
Definition: name.c:191
#define NIL
Definition: pg_list.h:69
static const char * plugin
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
void CheckSlotRequirements(void)
Definition: slot.c:965
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:567
#define NameGetDatum(X)
Definition: postgres.h:580
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define PG_GET_REPLICATION_SLOTS_COLS
uint32 TransactionId
Definition: c.h:474
Datum pg_get_replication_slots(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:186
int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: logicalfuncs.c:118
Oid GetUserId(void)
Definition: miscinit.c:379
ResourceOwner CurrentResourceOwner
Definition: resowner.c:140
Datum pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:100
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
#define Min(x, y)
Definition: c.h:857
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
ReplicationSlotPersistency persistency
Definition: slot.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
void ReplicationSlotSave(void)
Definition: slot.c:638
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8281
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:223
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1074
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:244
ReplicationSlotPersistentData data
Definition: slot.h:133
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:7949
#define OidIsValid(objectId)
Definition: c.h:605
void InvalidateSystemCaches(void)
Definition: inval.c:641
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:216
XLogRecPtr confirmed_flush
Definition: slot.h:81
XLogRecPtr EndRecPtr
Definition: xlogreader.h:120
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:43
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotReserveWal(void)
Definition: slot.c:985
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:447
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:741
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:97
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11194
void ReplicationSlotPersist(void)
Definition: slot.c:673
Definition: c.h:570
Datum pg_replication_slot_advance(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:450
TransactionId catalog_xmin
Definition: slot.h:70
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:325
void ReplicationSlotRelease(void)
Definition: slot.c:416
TransactionId xmin
Definition: slot.h:62
static void check_permissions(void)
Definition: slotfuncs.c:30
#define ereport(elevel, rest)
Definition: elog.h:122
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
bool in_use
Definition: slot.h:109
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
#define TransactionIdGetDatum(X)
Definition: postgres.h:506
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
uintptr_t Datum
Definition: postgres.h:367
Datum pg_drop_replication_slot(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:169
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:318
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:355
int work_mem
Definition: globals.c:120
#define BoolGetDatum(X)
Definition: postgres.h:387
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:181
int allowedModes
Definition: execnodes.h:297
#define PG_RETURN_VOID()
Definition: fmgr.h:314
SetFunctionReturnMode returnMode
Definition: execnodes.h:299
#define PG_CATCH()
Definition: elog.h:293
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
int max_replication_slots
Definition: slot.c:99
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:699
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:493
XLogRecPtr restart_lsn
Definition: slot.h:73
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:560
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121
#define PG_RE_THROW()
Definition: elog.h:314
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:231
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:986
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:225
const char * name
Definition: encode.c:521
Tuplestorestate * setResult
Definition: execnodes.h:302
static Datum values[MAXATTR]
Definition: bootstrap.c:164
ExprContext * econtext
Definition: execnodes.h:295
#define Int32GetDatum(X)
Definition: postgres.h:464
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:509
TupleDesc setDesc
Definition: execnodes.h:303
ReplicationSlotPersistency
Definition: slot.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:797
XLogReaderState * reader
Definition: logical.h:44
pid_t active_pid
Definition: slot.h:112
int i
#define NameStr(name)
Definition: c.h:576
#define CStringGetTextDatum(s)
Definition: builtins.h:95
#define PG_FUNCTION_ARGS
Definition: fmgr.h:163
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
ReplicationSlot replication_slots[1]
Definition: slot.h:166
#define elog
Definition: elog.h:219
static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr moveto)
Definition: slotfuncs.c:326
slock_t mutex
Definition: slot.h:106
#define PG_TRY()
Definition: elog.h:284
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:78
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:695
#define PG_END_TRY()
Definition: elog.h:300
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656
#define PG_GETARG_NAME(n)
Definition: fmgr.h:248
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:418