PostgreSQL Source Code  git master
slotfuncs.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * slotfuncs.c
4  * Support functions for replication slots
5  *
6  * Copyright (c) 2012-2021, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/slotfuncs.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/htup_details.h"
16 #include "access/xlog_internal.h"
17 #include "access/xlogutils.h"
18 #include "funcapi.h"
19 #include "miscadmin.h"
20 #include "replication/decode.h"
21 #include "replication/logical.h"
22 #include "replication/slot.h"
23 #include "utils/builtins.h"
24 #include "utils/inval.h"
25 #include "utils/pg_lsn.h"
26 #include "utils/resowner.h"
27 
/*
 * check_permissions - refuse callers that may not use replication slots.
 *
 * Reports ERROR with ERRCODE_INSUFFICIENT_PRIVILEGE.
 *
 * NOTE(review): listing lines 29 (function name) and 31 (the condition that
 * guards the ereport) are absent from this extract.  Judging by the message
 * text the guard is presumably "neither superuser nor replication role" --
 * confirm against the real file.
 */
28 static void
30 {
32  ereport(ERROR,
33  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
34  errmsg("must be superuser or replication role to use replication slots")));
35 }
36 
37 /*
38  * Helper function for creating a new physical replication slot with
39  * given arguments. Note that this function doesn't release the created
40  * slot.
41  *
42  * If restart_lsn is a valid value, it is installed as the slot's
43  * restart_lsn directly, without going through the WAL reservation routine.
 * So the caller must guarantee that WAL is available.
 *
 * name: name of the slot to create.
 * immediately_reserve: when true, the slot's restart_lsn is set up right
 *		away; when false, the slot is only created.
 * temporary: create the slot as RS_TEMPORARY instead of RS_PERSISTENT.
 * restart_lsn: position to install when immediately_reserve is true and
 *		the value is valid.
44  */
45 static void
46 create_physical_replication_slot(char *name, bool immediately_reserve,
47  bool temporary, XLogRecPtr restart_lsn)
48 {
 /* NOTE(review): listing line 49 is absent from this extract. */
50 
51  /* acquire replication slot, this will check for conflicting names */
52  ReplicationSlotCreate(name, false,
53  temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
54 
55  if (immediately_reserve)
56  {
57  /* Reserve WAL as the user asked for it */
58  if (XLogRecPtrIsInvalid(restart_lsn))
 /*
  * NOTE(review): listing line 59 (the statement executed when no
  * restart_lsn was supplied) is absent from this extract; presumably
  * the WAL reservation call -- confirm.
  */
60  else
61  MyReplicationSlot->data.restart_lsn = restart_lsn;
62 
63  /* Write this slot to disk */
 /* NOTE(review): listing lines 64-65 (the actual write) are absent here. */
66  }
67 }
68 
69 /*
70  * SQL function for creating a new physical (streaming replication)
71  * replication slot.
 *
 * SQL argument 1 is immediately_reserve and argument 2 is temporary.
 * Returns a two-column row: column 0 is the slot name, column 1 is NULL
 * unless immediately_reserve was requested.
 *
 * NOTE(review): listing lines 74 (function signature), 76, 88, 90, 92, 95,
 * 102 and 111 are absent from this extract, so the slot-name argument, the
 * permission/requirement checks, the head and tail of the helper call, the
 * values[1] assignment and the final slot release cannot be seen here.
72  */
73 Datum
75 {
77  bool immediately_reserve = PG_GETARG_BOOL(1);
78  bool temporary = PG_GETARG_BOOL(2);
79  Datum values[2];
80  bool nulls[2];
81  TupleDesc tupdesc;
82  HeapTuple tuple;
83  Datum result;
84 
 /* The declared result type must be composite so we can build a row. */
85  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
86  elog(ERROR, "return type must be a row type");
87 
89 
91 
 /* Remaining arguments of a call whose first line (92) is missing. */
93  immediately_reserve,
94  temporary,
96 
 /* Column 0: name of the slot this backend now holds. */
97  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
98  nulls[0] = false;
99 
 /* Column 1 is only populated when WAL was reserved at creation. */
100  if (immediately_reserve)
101  {
103  nulls[1] = false;
104  }
105  else
106  nulls[1] = true;
107 
108  tuple = heap_form_tuple(tupdesc, values, nulls);
109  result = HeapTupleGetDatum(tuple);
110 
112 
113  PG_RETURN_DATUM(result);
114 }
115 
116 
117 /*
118  * Helper function for creating a new logical replication slot with
119  * given arguments. Note that this function doesn't release the created
120  * slot.
121  *
122  * When find_startpoint is false, the slot's confirmed_flush is not set; it's
123  * caller's responsibility to ensure it's set to something sensible.
 *
 * A non-temporary slot is created as RS_EPHEMERAL here; making it
 * persistent is left to the caller.
 *
 * NOTE(review): listing lines 126 (function name and leading parameters),
 * 133, 156-157 and 165 are absent from this extract.
124  */
125 static void
127  bool temporary, bool two_phase,
128  XLogRecPtr restart_lsn,
129  bool find_startpoint)
130 {
131  LogicalDecodingContext *ctx = NULL;
132 
134 
135  /*
136  * Acquire a logical decoding slot, this will check for conflicting names.
137  * Initially create persistent slot as ephemeral - that allows us to
138  * nicely handle errors during initialization because it'll get dropped if
139  * this transaction fails. We'll make it persistent at the end. Temporary
140  * slots can be created as temporary from beginning as they get dropped on
141  * error as well.
142  */
143  ReplicationSlotCreate(name, true,
144  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
145 
146  /*
147  * Create logical decoding context to find start point or, if we don't
148  * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
149  *
150  * Note: when !find_startpoint this is still important, because it's at
151  * this point that the output plugin is validated.
152  */
153  ctx = CreateInitDecodingContext(plugin, NIL,
154  false, /* just catalogs is OK */
155  restart_lsn,
 /* NOTE(review): two argument lines (156-157) are missing from this call. */
158  NULL, NULL, NULL);
159 
160  /*
161  * If caller needs us to determine the decoding start point, do so now.
162  * This might take a while.
163  */
164  if (find_startpoint)
 /* NOTE(review): the statement controlled by this "if" (line 165) is missing. */
166 
167  /* don't need the decoding context anymore */
168  FreeDecodingContext(ctx);
169 }
170 
171 /*
172  * SQL function for creating a new logical replication slot.
 *
 * SQL argument 0 is the slot name, argument 2 is temporary and argument 3
 * is two_phase.  Returns a two-column row whose first column is the slot
 * name; neither column is NULL (nulls[] is zeroed).
 *
 * NOTE(review): listing lines 175 (function signature), 178 (declaration of
 * "plugin"), 190, 192, 194, 198, 202 and 211-212 are absent from this
 * extract.
173  */
174 Datum
176 {
177  Name name = PG_GETARG_NAME(0);
179  bool temporary = PG_GETARG_BOOL(2);
180  bool two_phase = PG_GETARG_BOOL(3);
181  Datum result;
182  TupleDesc tupdesc;
183  HeapTuple tuple;
184  Datum values[2];
185  bool nulls[2];
186 
 /* The declared result type must be composite so we can build a row. */
187  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
188  elog(ERROR, "return type must be a row type");
189 
191 
193 
 /*
  * Remaining arguments of a call whose first line (194) is missing; the
  * trailing "true" is the last argument of that call.
  */
195  NameStr(*plugin),
196  temporary,
197  two_phase,
199  true);
200 
 /* Column 0: name of the slot this backend now holds. */
201  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 /* NOTE(review): the values[1] assignment (line 202) is missing here. */
203 
204  memset(nulls, 0, sizeof(nulls));
205 
206  tuple = heap_form_tuple(tupdesc, values, nulls);
207  result = HeapTupleGetDatum(tuple);
208 
209  /* ok, slot is now fully created, mark it as persistent if needed */
210  if (!temporary)
 /* NOTE(review): the statements following this "if" (211-212) are missing. */
213 
214  PG_RETURN_DATUM(result);
215 }
216 
217 
218 /*
219  * SQL function for dropping a replication slot.
 *
 * SQL argument 0 is the name of the slot to drop.  Returns void.
 *
 * NOTE(review): listing lines 222 (function signature), 226 and 228 are
 * absent from this extract; whatever checks precede the drop cannot be seen
 * here.
220  */
221 Datum
223 {
224  Name name = PG_GETARG_NAME(0);
225 
227 
229 
 /* Second argument is passed as true; its meaning is defined by the callee. */
230  ReplicationSlotDrop(NameStr(*name), true);
231 
232  PG_RETURN_VOID();
233 }
234 
235 /*
236  * pg_get_replication_slots - SQL SRF showing every in-use replication slot.
 *
 * The result is materialized: one row per slot whose in_use flag is set is
 * appended to a tuplestore allocated in the per-query memory context, and
 * the function itself returns (Datum) 0.
 *
 * The slot array is scanned under ReplicationSlotControlLock held in shared
 * mode; each slot is copied under its spinlock and the copy is examined
 * afterwards.
 *
 * NOTE(review): listing lines 239 (function signature), 285 (declaration
 * of "slot"), 287 (declaration of "values"), 362 (first case label of the
 * switch), 426, 428 (computation of slotKeepSegs / keepSegs) and 439 are
 * absent from this extract.
237  */
238 Datum
240 {
241 #define PG_GET_REPLICATION_SLOTS_COLS 14
242  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
243  TupleDesc tupdesc;
244  Tuplestorestate *tupstore;
245  MemoryContext per_query_ctx;
246  MemoryContext oldcontext;
247  XLogRecPtr currlsn;
248  int slotno;
249 
250  /* check to see if caller supports us returning a tuplestore */
251  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
252  ereport(ERROR,
253  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
254  errmsg("set-valued function called in context that cannot accept a set")));
255  if (!(rsinfo->allowedModes & SFRM_Materialize))
256  ereport(ERROR,
257  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
258  errmsg("materialize mode required, but it is not allowed in this context")));
259 
260  /* Build a tuple descriptor for our result type */
261  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
262  elog(ERROR, "return type must be a row type");
263 
264  /*
265  * We don't require any special permission to see this function's data
266  * because nothing should be sensitive. The most critical being the slot
267  * name, which shouldn't contain anything particularly sensitive.
268  */
269 
 /* The tuplestore must be created in the per-query context. */
270  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
271  oldcontext = MemoryContextSwitchTo(per_query_ctx);
272 
273  tupstore = tuplestore_begin_heap(true, false, work_mem);
274  rsinfo->returnMode = SFRM_Materialize;
275  rsinfo->setResult = tupstore;
276  rsinfo->setDesc = tupdesc;
277 
278  MemoryContextSwitchTo(oldcontext);
279 
 /* Sampled once, before the scan; used for safe_wal_size of every row. */
280  currlsn = GetXLogWriteRecPtr();
281 
282  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
283  for (slotno = 0; slotno < max_replication_slots; slotno++)
284  {
286  ReplicationSlot slot_contents;
288  bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
289  WALAvailability walstate;
290  int i;
291 
292  if (!slot->in_use)
293  continue;
294 
295  /* Copy slot contents while holding spinlock, then examine at leisure */
296  SpinLockAcquire(&slot->mutex);
297  slot_contents = *slot;
298  SpinLockRelease(&slot->mutex);
299 
300  memset(values, 0, sizeof(values));
301  memset(nulls, 0, sizeof(nulls));
302 
 /* "i" is the output column cursor; every branch below advances it once. */
303  i = 0;
304  values[i++] = NameGetDatum(&slot_contents.data.name);
305 
 /* A slot with no database is physical: plugin and database are NULL. */
306  if (slot_contents.data.database == InvalidOid)
307  nulls[i++] = true;
308  else
309  values[i++] = NameGetDatum(&slot_contents.data.plugin);
310 
311  if (slot_contents.data.database == InvalidOid)
312  values[i++] = CStringGetTextDatum("physical");
313  else
314  values[i++] = CStringGetTextDatum("logical");
315 
316  if (slot_contents.data.database == InvalidOid)
317  nulls[i++] = true;
318  else
319  values[i++] = ObjectIdGetDatum(slot_contents.data.database);
320 
 /* temporary flag, active flag, then the active pid (NULL when 0) */
321  values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
322  values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
323 
324  if (slot_contents.active_pid != 0)
325  values[i++] = Int32GetDatum(slot_contents.active_pid);
326  else
327  nulls[i++] = true;
328 
 /* xmin, catalog_xmin, restart_lsn, confirmed_flush: NULL when invalid */
329  if (slot_contents.data.xmin != InvalidTransactionId)
330  values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
331  else
332  nulls[i++] = true;
333 
334  if (slot_contents.data.catalog_xmin != InvalidTransactionId)
335  values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
336  else
337  nulls[i++] = true;
338 
339  if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
340  values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
341  else
342  nulls[i++] = true;
343 
344  if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
345  values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
346  else
347  nulls[i++] = true;
348 
349  /*
350  * If invalidated_at is valid and restart_lsn is invalid, we know for
351  * certain that the slot has been invalidated. Otherwise, test
352  * availability from restart_lsn.
353  */
354  if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
355  !XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
356  walstate = WALAVAIL_REMOVED;
357  else
358  walstate = GetWALAvailability(slot_contents.data.restart_lsn);
359 
360  switch (walstate)
361  {
 /*
  * NOTE(review): the case label for this first arm (line 362) is
  * missing from the extract; the arm emits NULL for the WAL status.
  */
363  nulls[i++] = true;
364  break;
365 
366  case WALAVAIL_RESERVED:
367  values[i++] = CStringGetTextDatum("reserved");
368  break;
369 
370  case WALAVAIL_EXTENDED:
371  values[i++] = CStringGetTextDatum("extended");
372  break;
373 
374  case WALAVAIL_UNRESERVED:
375  values[i++] = CStringGetTextDatum("unreserved");
376  break;
377 
378  case WALAVAIL_REMOVED:
379 
380  /*
381  * If we read the restart_lsn long enough ago, maybe that file
382  * has been removed by now. However, the walsender could have
383  * moved forward enough that it jumped to another file after
384  * we looked. If checkpointer signalled the process to
385  * termination, then it's definitely lost; but if a process is
386  * still alive, then "unreserved" seems more appropriate.
387  *
388  * If we do change it, save the state for safe_wal_size below.
389  */
390  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
391  {
392  int pid;
393 
 /* Re-read the live slot, not our earlier copy. */
394  SpinLockAcquire(&slot->mutex);
395  pid = slot->active_pid;
396  slot_contents.data.restart_lsn = slot->data.restart_lsn;
397  SpinLockRelease(&slot->mutex);
398  if (pid != 0)
399  {
400  values[i++] = CStringGetTextDatum("unreserved");
401  walstate = WALAVAIL_UNRESERVED;
402  break;
403  }
404  }
405  values[i++] = CStringGetTextDatum("lost");
406  break;
407  }
408 
409  /*
410  * safe_wal_size is only computed for slots that have not been lost,
411  * and only if there's a configured maximum size.
412  */
413  if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
414  nulls[i++] = true;
415  else
416  {
417  XLogSegNo targetSeg;
418  uint64 slotKeepSegs;
419  uint64 keepSegs;
420  XLogSegNo failSeg;
421  XLogRecPtr failLSN;
422 
423  XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
424 
425  /* determine how many segments can be kept by slots */
427  /* ditto for wal_keep_size */
 /*
  * NOTE(review): the assignments to slotKeepSegs and keepSegs (lines
  * 426 and 428) are missing from this extract.
  */
429 
430  /* if currpos reaches failLSN, we lose our segment */
431  failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
432  XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
433 
 /* Bytes of WAL that can still be written before reaching failLSN. */
434  values[i++] = Int64GetDatum(failLSN - currlsn);
435  }
436 
437  values[i++] = BoolGetDatum(slot_contents.data.two_phase);
438 
440 
441  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
442  }
443 
444  LWLockRelease(ReplicationSlotControlLock);
445 
446  tuplestore_donestoring(tupstore);
447 
 /* Rows were delivered through rsinfo->setResult, so return a dummy Datum. */
448  return (Datum) 0;
449 }
450 
451 /*
452  * Helper function for advancing our physical replication slot forward.
453  *
454  * The LSN position to move to is compared simply to the slot's restart_lsn,
455  * knowing that any position older than that would be removed by successive
456  * checkpoints.
 *
 * Returns moveto when it lies beyond startlsn, otherwise startlsn
 * unchanged.  moveto must not be InvalidXLogRecPtr (asserted).
 *
 * NOTE(review): listing lines 459 (function signature), 461 (declaration
 * of startlsn), 468-470 (the slot update itself) and 478 (the call that
 * dirties the slot) are absent from this extract.
457  */
458 static XLogRecPtr
460 {
462  XLogRecPtr retlsn = startlsn;
463 
464  Assert(moveto != InvalidXLogRecPtr);
465 
 /* Only ever move forward; a target at or before startlsn is a no-op. */
466  if (startlsn < moveto)
467  {
471  retlsn = moveto;
472 
473  /*
474  * Dirty the slot so that it is written out at the next checkpoint. Note
475  * that the LSN position advanced may still be lost in the event of a
476  * crash, but this makes the data consistent after a clean shutdown.
477  */
479  }
480 
481  return retlsn;
482 }
483 
484 /*
485  * Helper function for advancing our logical replication slot forward.
486  *
487  * The slot's restart_lsn is used as start point for reading records, while
488  * confirmed_flush is used as base point for the decoding context.
489  *
490  * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
491  * because we need to digest WAL to advance restart_lsn allowing to recycle
492  * WAL and removal of old catalog tuples. As decoding is done in fast_forward
493  * mode, no changes are generated anyway.
 *
 * moveto must not be InvalidXLogRecPtr (asserted).  Any error raised while
 * decoding is re-thrown after the PG_CATCH cleanup.
 *
 * NOTE(review): listing lines 496 (function signature), 498 (declaration
 * of ctx), 511, 514-515, 522, 525, 538, 553, 559, 571, 586, 589, 594 and
 * 599 are absent from this extract, including the assignment of retlsn.
494  */
495 static XLogRecPtr
497 {
499  ResourceOwner old_resowner = CurrentResourceOwner;
500  XLogRecPtr retlsn;
501 
502  Assert(moveto != InvalidXLogRecPtr);
503 
504  PG_TRY();
505  {
506  /*
507  * Create our decoding context in fast_forward mode, passing start_lsn
508  * as InvalidXLogRecPtr, so that we start processing from my slot's
509  * confirmed_flush.
510  */
 /* NOTE(review): the head of this call (line 511) and lines 514-515 are missing. */
512  NIL,
513  true, /* fast_forward */
516  NULL, NULL, NULL);
517 
518  /*
519  * Start reading at the slot's restart_lsn, which we know to point to
520  * a valid record.
521  */
523 
524  /* invalidate non-timetravel entries */
526 
527  /* Decode at least one record, until we run out of records */
528  while (ctx->reader->EndRecPtr < moveto)
529  {
530  char *errm = NULL;
531  XLogRecord *record;
532 
533  /*
534  * Read records. No changes are generated in fast_forward mode,
535  * but snapbuilder/slot statuses are updated properly.
536  */
 /*
  * NOTE(review): the right-hand side of this comparison (line 538) is
  * missing; the loop keeps invoking the page-read callback while
  * XLogReadRecord returns that value, and stops if the callback fails.
  */
537  while (XLogReadRecord(ctx->reader, &record, &errm) ==
539  {
540  if (!ctx->page_read(ctx->reader))
541  break;
542  }
543 
544  if (errm)
545  elog(ERROR, "%s", errm);
546 
547  /*
548  * Process the record. Storage-level changes are ignored in
549  * fast_forward mode, but other modules (such as snapbuilder)
550  * might still have critical updates to do.
551  */
552  if (record)
554 
555  /* Stop once the requested target has been reached */
556  if (moveto <= ctx->reader->EndRecPtr)
557  break;
558 
560  }
561 
562  /*
563  * Logical decoding could have clobbered CurrentResourceOwner during
564  * transaction management, so restore the executor's value. (This is
565  * a kluge, but it's not worth cleaning up right now.)
566  */
567  CurrentResourceOwner = old_resowner;
568 
 /* Only confirm a position if the reader actually consumed something. */
569  if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
570  {
572 
573  /*
574  * If only the confirmed_flush LSN has changed the slot won't get
575  * marked as dirty by the above. Callers on the walsender
576  * interface are expected to keep track of their own progress and
577  * don't need it written out. But SQL-interface users cannot
578  * specify their own start positions and it's harder for them to
579  * keep track of their progress, so we should make more of an
580  * effort to save it for them.
581  *
582  * Dirty the slot so it is written out at the next checkpoint. The
583  * LSN position advanced to may still be lost on a crash but this
584  * makes the data consistent after a clean shutdown.
585  */
587  }
588 
590 
591  /* free context, call shutdown callback */
592  FreeDecodingContext(ctx);
593 
595  }
596  PG_CATCH();
597  {
598  /* clear all timetravel entries */
600 
601  PG_RE_THROW();
602  }
603  PG_END_TRY();
604 
605  return retlsn;
606 }
607 
608 /*
609  * SQL function for moving the position in a replication slot.
 *
 * SQL argument 0 is the slot name and argument 1 the target LSN.  The
 * target is clamped to the flush position (or, during recovery, the replay
 * position) before use.  Returns a two-column row: the slot name and the
 * LSN actually reached.
 *
 * Errors are raised for an invalid target LSN, for a slot that cannot be
 * advanced, and for a target below the slot's current minimum.
 *
 * NOTE(review): listing lines 612 (function signature), 624, 626, 649
 * (condition of the "cannot be advanced" check), 662-663 and 665
 * (assignments to minlsn), 674 (condition choosing logical vs. physical),
 * 686-687 and 689 are absent from this extract.
610  */
611 Datum
613 {
614  Name slotname = PG_GETARG_NAME(0);
615  XLogRecPtr moveto = PG_GETARG_LSN(1);
616  XLogRecPtr endlsn;
617  XLogRecPtr minlsn;
618  TupleDesc tupdesc;
619  Datum values[2];
620  bool nulls[2];
621  HeapTuple tuple;
622  Datum result;
623 
625 
627 
628  if (XLogRecPtrIsInvalid(moveto))
629  ereport(ERROR,
630  (errmsg("invalid target WAL LSN")));
631 
632  /* Build a tuple descriptor for our result type */
633  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
634  elog(ERROR, "return type must be a row type");
635 
636  /*
637  * We can't move slot past what's been flushed/replayed so clamp the
638  * target position accordingly.
639  */
640  if (!RecoveryInProgress())
641  moveto = Min(moveto, GetFlushRecPtr());
642  else
643  moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
644 
645  /* Acquire the slot so we "own" it */
646  (void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error);
647 
648  /* A slot whose restart_lsn has never been reserved cannot be advanced */
 /* NOTE(review): the "if" condition for this ereport (line 649) is missing. */
650  ereport(ERROR,
651  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
652  errmsg("replication slot \"%s\" cannot be advanced",
653  NameStr(*slotname)),
654  errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
655 
656  /*
657  * Check if the slot is not moving backwards. Physical slots rely simply
658  * on restart_lsn as a minimum point, while logical slots have confirmed
659  * consumption up to confirmed_flush, meaning that in both cases data
660  * older than that is not available anymore.
661  */
 /* NOTE(review): both arms that set minlsn (lines 662-663, 665) are missing. */
664  else
666 
667  if (moveto < minlsn)
668  ereport(ERROR,
669  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
670  errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
671  LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
672 
673  /* Do the actual slot update, depending on the slot type */
 /* NOTE(review): the "if" that selects the logical path (line 674) is missing. */
675  endlsn = pg_logical_replication_slot_advance(moveto);
676  else
677  endlsn = pg_physical_replication_slot_advance(moveto);
678 
679  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
680  nulls[0] = false;
681 
682  /*
683  * Recompute the minimum LSN and xmin across all slots to adjust with the
684  * advancing potentially done.
685  */
688 
690 
691  /* Return the reached position. */
692  values[1] = LSNGetDatum(endlsn);
693  nulls[1] = false;
694 
695  tuple = heap_form_tuple(tupdesc, values, nulls);
696  result = HeapTupleGetDatum(tuple);
697 
698  PG_RETURN_DATUM(result);
699 }
700 
701 /*
702  * Helper function of copying a replication slot.
703  */
704 static Datum
705 copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
706 {
707  Name src_name = PG_GETARG_NAME(0);
708  Name dst_name = PG_GETARG_NAME(1);
709  ReplicationSlot *src = NULL;
710  ReplicationSlot first_slot_contents;
711  ReplicationSlot second_slot_contents;
712  XLogRecPtr src_restart_lsn;
713  bool src_islogical;
714  bool temporary;
715  char *plugin;
716  Datum values[2];
717  bool nulls[2];
718  Datum result;
719  TupleDesc tupdesc;
720  HeapTuple tuple;
721 
722  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
723  elog(ERROR, "return type must be a row type");
724 
726 
727  if (logical_slot)
729  else
731 
732  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
733 
734  /*
735  * We need to prevent the source slot's reserved WAL from being removed,
736  * but we don't want to lock that slot for very long, and it can advance
737  * in the meantime. So obtain the source slot's data, and create a new
738  * slot using its restart_lsn. Afterwards we lock the source slot again
739  * and verify that the data we copied (name, type) has not changed
740  * incompatibly. No inconvenient WAL removal can occur once the new slot
741  * is created -- but since WAL removal could have occurred before we
742  * managed to create the new slot, we advance the new slot's restart_lsn
743  * to the source slot's updated restart_lsn the second time we lock it.
744  */
745  for (int i = 0; i < max_replication_slots; i++)
746  {
748 
749  if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
750  {
751  /* Copy the slot contents while holding spinlock */
752  SpinLockAcquire(&s->mutex);
753  first_slot_contents = *s;
754  SpinLockRelease(&s->mutex);
755  src = s;
756  break;
757  }
758  }
759 
760  LWLockRelease(ReplicationSlotControlLock);
761 
762  if (src == NULL)
763  ereport(ERROR,
764  (errcode(ERRCODE_UNDEFINED_OBJECT),
765  errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
766 
767  src_islogical = SlotIsLogical(&first_slot_contents);
768  src_restart_lsn = first_slot_contents.data.restart_lsn;
769  temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
770  plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
771 
772  /* Check type of replication slot */
773  if (src_islogical != logical_slot)
774  ereport(ERROR,
775  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
776  src_islogical ?
777  errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
778  NameStr(*src_name)) :
779  errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
780  NameStr(*src_name))));
781 
782  /* Copying non-reserved slot doesn't make sense */
783  if (XLogRecPtrIsInvalid(src_restart_lsn))
784  ereport(ERROR,
785  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
786  errmsg("cannot copy a replication slot that doesn't reserve WAL")));
787 
788  /* Overwrite params from optional arguments */
789  if (PG_NARGS() >= 3)
790  temporary = PG_GETARG_BOOL(2);
791  if (PG_NARGS() >= 4)
792  {
793  Assert(logical_slot);
794  plugin = NameStr(*(PG_GETARG_NAME(3)));
795  }
796 
797  /* Create new slot and acquire it */
798  if (logical_slot)
799  {
800  /*
801  * We must not try to read WAL, since we haven't reserved it yet --
802  * hence pass find_startpoint false. confirmed_flush will be set
803  * below, by copying from the source slot.
804  */
806  plugin,
807  temporary,
808  false,
809  src_restart_lsn,
810  false);
811  }
812  else
814  true,
815  temporary,
816  src_restart_lsn);
817 
818  /*
819  * Update the destination slot to current values of the source slot;
820  * recheck that the source slot is still the one we saw previously.
821  */
822  {
823  TransactionId copy_effective_xmin;
824  TransactionId copy_effective_catalog_xmin;
825  TransactionId copy_xmin;
826  TransactionId copy_catalog_xmin;
827  XLogRecPtr copy_restart_lsn;
828  XLogRecPtr copy_confirmed_flush;
829  bool copy_islogical;
830  char *copy_name;
831 
832  /* Copy data of source slot again */
833  SpinLockAcquire(&src->mutex);
834  second_slot_contents = *src;
835  SpinLockRelease(&src->mutex);
836 
837  copy_effective_xmin = second_slot_contents.effective_xmin;
838  copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
839 
840  copy_xmin = second_slot_contents.data.xmin;
841  copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
842  copy_restart_lsn = second_slot_contents.data.restart_lsn;
843  copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
844 
845  /* for existence check */
846  copy_name = NameStr(second_slot_contents.data.name);
847  copy_islogical = SlotIsLogical(&second_slot_contents);
848 
849  /*
850  * Check if the source slot still exists and is valid. We regard it as
851  * invalid if the type of replication slot or name has been changed,
852  * or the restart_lsn either is invalid or has gone backward. (The
853  * restart_lsn could go backwards if the source slot is dropped and
854  * copied from an older slot during installation.)
855  *
856  * Since erroring out will release and drop the destination slot we
857  * don't need to release it here.
858  */
859  if (copy_restart_lsn < src_restart_lsn ||
860  src_islogical != copy_islogical ||
861  strcmp(copy_name, NameStr(*src_name)) != 0)
862  ereport(ERROR,
863  (errmsg("could not copy replication slot \"%s\"",
864  NameStr(*src_name)),
865  errdetail("The source replication slot was modified incompatibly during the copy operation.")));
866 
867  /* The source slot must have a consistent snapshot */
868  if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
869  ereport(ERROR,
870  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
871  errmsg("cannot copy unfinished logical replication slot \"%s\"",
872  NameStr(*src_name)),
873  errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
874 
875  /* Install copied values again */
877  MyReplicationSlot->effective_xmin = copy_effective_xmin;
878  MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
879 
880  MyReplicationSlot->data.xmin = copy_xmin;
881  MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
882  MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
883  MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
885 
890 
891 #ifdef USE_ASSERT_CHECKING
892  /* Check that the restart_lsn is available */
893  {
894  XLogSegNo segno;
895 
896  XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
897  Assert(XLogGetLastRemovedSegno() < segno);
898  }
899 #endif
900  }
901 
902  /* target slot fully created, mark as persistent if needed */
903  if (logical_slot && !temporary)
905 
906  /* All done. Set up the return values */
907  values[0] = NameGetDatum(dst_name);
908  nulls[0] = false;
910  {
912  nulls[1] = false;
913  }
914  else
915  nulls[1] = true;
916 
917  tuple = heap_form_tuple(tupdesc, values, nulls);
918  result = HeapTupleGetDatum(tuple);
919 
921 
922  PG_RETURN_DATUM(result);
923 }
924 
925 /* The wrappers below are all to appease opr_sanity */
/*
 * SQL-callable entry point for a logical slot copy; forwards to
 * copy_replication_slot() with logical_slot = true.
 * NOTE(review): the signature line (927) is absent from this listing.
 */
926 Datum
928 {
929  return copy_replication_slot(fcinfo, true);
930 }
931 
/*
 * SQL-callable entry point for a logical slot copy; forwards to
 * copy_replication_slot() with logical_slot = true.
 * NOTE(review): the signature line (933) is absent from this listing.
 */
932 Datum
934 {
935  return copy_replication_slot(fcinfo, true);
936 }
937 
/*
 * SQL-callable entry point for a logical slot copy; forwards to
 * copy_replication_slot() with logical_slot = true.
 * NOTE(review): the signature line (939) is absent from this listing.
 */
938 Datum
940 {
941  return copy_replication_slot(fcinfo, true);
942 }
943 
/*
 * SQL-callable entry point for a physical slot copy; forwards to
 * copy_replication_slot() with logical_slot = false.
 * NOTE(review): the signature line (945) is absent from this listing.
 */
944 Datum
946 {
947  return copy_replication_slot(fcinfo, false);
948 }
949 
/*
 * SQL-callable entry point for a physical slot copy; forwards to
 * copy_replication_slot() with logical_slot = false.
 * NOTE(review): the signature line (951) is absent from this listing.
 */
950 Datum
952 {
953  return copy_replication_slot(fcinfo, false);
954 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto)
Definition: slotfuncs.c:496
#define NIL
Definition: pg_list.h:65
static const char * plugin
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
void CheckSlotRequirements(void)
Definition: slot.c:1066
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
#define NameGetDatum(X)
Definition: postgres.h:639
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
int errhint(const char *fmt,...)
Definition: elog.c:1156
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:823
static void create_physical_replication_slot(char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
Definition: slotfuncs.c:46
int wal_segment_size
Definition: xlog.c:121
#define PG_GET_REPLICATION_SLOTS_COLS
uint32 TransactionId
Definition: c.h:587
Datum pg_get_replication_slots(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:239
Oid GetUserId(void)
Definition: miscinit.c:478
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
Datum pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:175
WALAvailability
Definition: xlog.h:281
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, LogicalDecodingXLogPageReadCB page_read, WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:479
#define Min(x, y)
Definition: c.h:986
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ReplicationSlotPersistency persistency
Definition: slot.h:62
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:46
int wal_keep_size_mb
Definition: xlog.c:94
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:228
void ReplicationSlotSave(void)
Definition: slot.c:732
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8589
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
Datum pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:945
ReplicationSlotPersistentData data
Definition: slot.h:156
Datum pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:939
bool RecoveryInProgress(void)
Definition: xlog.c:8237
XLogReadRecordResult XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
Definition: xlogreader.c:346
#define OidIsValid(objectId)
Definition: c.h:710
void InvalidateSystemCaches(void)
Definition: inval.c:649
XLogRecPtr confirmed_flush
Definition: slot.h:92
XLogRecPtr EndRecPtr
Definition: xlogreader.h:179
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1805
Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:74
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:4007
void ReplicationSlotReserveWal(void)
Definition: slot.c:1091
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:575
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:839
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:106
Datum pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:927
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:705
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11728
void ReplicationSlotPersist(void)
Definition: slot.c:767
TransactionId effective_xmin
Definition: slot.h:152
Definition: c.h:675
static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, XLogRecPtr restart_lsn, bool find_startpoint)
Definition: slotfuncs.c:126
Datum pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:951
uint64 XLogSegNo
Definition: xlogdefs.h:48
int errdetail(const char *fmt,...)
Definition: elog.c:1042
Datum pg_replication_slot_advance(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:612
TransactionId catalog_xmin
Definition: slot.h:78
#define InvalidTransactionId
Definition: transam.h:31
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:252
void ReplicationSlotRelease(void)
Definition: slot.c:497
TransactionId xmin
Definition: slot.h:70
#define SlotIsLogical(slot)
Definition: slot.h:178
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1700
static void check_permissions(void)
Definition: slotfuncs.c:29
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
bool in_use
Definition: slot.h:132
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
#define TransactionIdGetDatum(X)
Definition: postgres.h:565
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
Datum pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:933
uintptr_t Datum
Definition: postgres.h:411
Datum pg_drop_replication_slot(PG_FUNCTION_ARGS)
Definition: slotfuncs.c:222
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
TransactionId effective_catalog_xmin
Definition: slot.h:153
int work_mem
Definition: globals.c:124
#define BoolGetDatum(X)
Definition: postgres.h:446
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:196
#define XLogMBVarToSegs(mbvar, wal_segsz_bytes)
#define ereport(elevel,...)
Definition: elog.h:157
int allowedModes
Definition: execnodes.h:305
#define PG_RETURN_VOID()
Definition: fmgr.h:349
int max_slot_wal_keep_size_mb
Definition: xlog.c:113
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:11763
SetFunctionReturnMode returnMode
Definition: execnodes.h:307
#define PG_CATCH()
Definition: elog.h:323
#define Max(x, y)
Definition: c.h:980
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
int max_replication_slots
Definition: slot.c:99
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:624
XLogRecPtr restart_lsn
Definition: slot.h:81
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:659
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1203
#define PG_NARGS()
Definition: fmgr.h:203
#define PG_RE_THROW()
Definition: elog.h:354
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:221
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1676
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:233
const char * name
Definition: encode.c:515
Tuplestorestate * setResult
Definition: execnodes.h:310
static Datum values[MAXATTR]
Definition: bootstrap.c:166
ExprContext * econtext
Definition: execnodes.h:303
#define Int32GetDatum(X)
Definition: postgres.h:523
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:591
TupleDesc setDesc
Definition: execnodes.h:311
int errmsg(const char *fmt,...)
Definition: elog.c:909
XLogReaderState * reader
Definition: logical.h:43
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:388
pid_t active_pid
Definition: slot.h:135
WALAvailability GetWALAvailability(XLogRecPtr targetLSN)
Definition: xlog.c:9725
#define elog(elevel,...)
Definition: elog.h:232
int i
Definition: slot.h:43
#define NameStr(name)
Definition: c.h:681
#define CStringGetTextDatum(s)
Definition: builtins.h:82
LogicalDecodingXLogPageReadCB page_read
Definition: logical.h:44
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102
ReplicationSlot replication_slots[1]
Definition: slot.h:189
static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr moveto)
Definition: slotfuncs.c:459
XLogRecPtr invalidated_at
Definition: slot.h:84
slock_t mutex
Definition: slot.h:129
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, LogicalDecodingXLogPageReadCB page_read, WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:320
#define PG_TRY()
Definition: elog.h:313
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:789
#define PG_END_TRY()
Definition: elog.h:338
void ReplicationSlotMarkDirty(void)
Definition: slot.c:750
#define PG_GETARG_NAME(n)
Definition: fmgr.h:278
bool read_local_xlog_page(XLogReaderState *state)
Definition: xlogutils.c:842
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)