PostgreSQL Source Code  git master
slotfuncs.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xlog_internal.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/pg_lsn.h"
#include "utils/resowner.h"
Include dependency graph for slotfuncs.c:

Go to the source code of this file.

Macros

#define PG_GET_REPLICATION_SLOTS_COLS   11
 

Functions

static void check_permissions (void)
 
static void create_physical_replication_slot (char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
 
Datum pg_create_physical_replication_slot (PG_FUNCTION_ARGS)
 
static void create_logical_replication_slot (char *name, char *plugin, bool temporary, XLogRecPtr restart_lsn)
 
Datum pg_create_logical_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_drop_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_get_replication_slots (PG_FUNCTION_ARGS)
 
static XLogRecPtr pg_physical_replication_slot_advance (XLogRecPtr moveto)
 
static XLogRecPtr pg_logical_replication_slot_advance (XLogRecPtr moveto)
 
Datum pg_replication_slot_advance (PG_FUNCTION_ARGS)
 
static Datum copy_replication_slot (FunctionCallInfo fcinfo, bool logical_slot)
 
Datum pg_copy_logical_replication_slot_a (PG_FUNCTION_ARGS)
 
Datum pg_copy_logical_replication_slot_b (PG_FUNCTION_ARGS)
 
Datum pg_copy_logical_replication_slot_c (PG_FUNCTION_ARGS)
 
Datum pg_copy_physical_replication_slot_a (PG_FUNCTION_ARGS)
 
Datum pg_copy_physical_replication_slot_b (PG_FUNCTION_ARGS)
 

Macro Definition Documentation

◆ PG_GET_REPLICATION_SLOTS_COLS

#define PG_GET_REPLICATION_SLOTS_COLS   11

Function Documentation

◆ check_permissions()

static void check_permissions ( void  )
static

Definition at line 29 of file slotfuncs.c.

References ereport, errcode(), errmsg(), ERROR, GetUserId(), has_rolreplication(), and superuser().

Referenced by copy_replication_slot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_drop_replication_slot(), and pg_replication_slot_advance().

30 {
31  if (!superuser() && !has_rolreplication(GetUserId()))
32  ereport(ERROR,
33  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
34  errmsg("must be superuser or replication role to use replication slots")));
35 }
Oid GetUserId(void)
Definition: miscinit.c:378
int errcode(int sqlerrcode)
Definition: elog.c:608
bool superuser(void)
Definition: superuser.c:46
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:559
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ copy_replication_slot()

static Datum copy_replication_slot ( FunctionCallInfo  fcinfo,
bool  logical_slot 
)
static

Definition at line 593 of file slotfuncs.c.

References Assert, ReplicationSlotPersistentData::catalog_xmin, check_permissions(), CheckLogicalDecodingRequirements(), CheckSlotRequirements(), ReplicationSlotPersistentData::confirmed_flush, create_logical_replication_slot(), create_physical_replication_slot(), ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, elog, ereport, errcode(), errdetail(), errmsg(), ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, i, ReplicationSlot::in_use, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotPersistentData::name, NameGetDatum, NameStr, ReplicationSlotPersistentData::persistency, PG_GETARG_BOOL, PG_GETARG_NAME, PG_NARGS, PG_RETURN_DATUM, plugin, ReplicationSlotPersistentData::plugin, pstrdup(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotPersistentData::restart_lsn, RS_TEMPORARY, SlotIsLogical, SpinLockAcquire, SpinLockRelease, TYPEFUNC_COMPOSITE, values, wal_segment_size, XLByteToSeg, XLogGetLastRemovedSegno(), XLogRecPtrIsInvalid, and ReplicationSlotPersistentData::xmin.

Referenced by pg_copy_logical_replication_slot_a(), pg_copy_logical_replication_slot_b(), pg_copy_logical_replication_slot_c(), pg_copy_physical_replication_slot_a(), and pg_copy_physical_replication_slot_b().

594 {
595  Name src_name = PG_GETARG_NAME(0);
596  Name dst_name = PG_GETARG_NAME(1);
597  ReplicationSlot *src = NULL;
598  XLogRecPtr src_restart_lsn;
599  bool src_islogical;
600  bool temporary;
601  char *plugin;
602  Datum values[2];
603  bool nulls[2];
604  Datum result;
605  TupleDesc tupdesc;
606  HeapTuple tuple;
607 
608  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
609  elog(ERROR, "return type must be a row type");
610 
612 
613  if (logical_slot)
615  else
617 
618  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
619 
620  /*
621  * We need to prevent the source slot's reserved WAL from being removed,
622  * but we don't want to lock that slot for very long, and it can advance
623  * in the meantime. So obtain the source slot's data, and create a new
624  * slot using its restart_lsn. Afterwards we lock the source slot again
625  * and verify that the data we copied (name, type) has not changed
626  * incompatibly. No inconvenient WAL removal can occur once the new slot
627  * is created -- but since WAL removal could have occurred before we
628  * managed to create the new slot, we advance the new slot's restart_lsn
629  * to the source slot's updated restart_lsn the second time we lock it.
630  */
631  for (int i = 0; i < max_replication_slots; i++)
632  {
634 
635  if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
636  {
637  SpinLockAcquire(&s->mutex);
638  src_islogical = SlotIsLogical(s);
639  src_restart_lsn = s->data.restart_lsn;
640  temporary = s->data.persistency == RS_TEMPORARY;
641  plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
642  SpinLockRelease(&s->mutex);
643 
644  src = s;
645  break;
646  }
647  }
648 
649  LWLockRelease(ReplicationSlotControlLock);
650 
651  if (src == NULL)
652  ereport(ERROR,
653  (errcode(ERRCODE_UNDEFINED_OBJECT),
654  errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
655 
656  /* Check type of replication slot */
657  if (src_islogical != logical_slot)
658  ereport(ERROR,
659  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
660  src_islogical ?
661  errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
662  NameStr(*src_name)) :
663  errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
664  NameStr(*src_name))));
665 
666  /* Copying non-reserved slot doesn't make sense */
667  if (XLogRecPtrIsInvalid(src_restart_lsn))
668  {
669  Assert(!logical_slot);
670  ereport(ERROR,
671  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
672  errmsg("cannot copy a replication slot that doesn't reserve WAL")));
673  }
674 
675  /* Overwrite params from optional arguments */
676  if (PG_NARGS() >= 3)
677  temporary = PG_GETARG_BOOL(2);
678  if (PG_NARGS() >= 4)
679  {
680  Assert(logical_slot);
681  plugin = NameStr(*(PG_GETARG_NAME(3)));
682  }
683 
684  /* Create new slot and acquire it */
685  if (logical_slot)
687  plugin,
688  temporary,
689  src_restart_lsn);
690  else
692  true,
693  temporary,
694  src_restart_lsn);
695 
696  /*
697  * Update the destination slot to current values of the source slot;
698  * recheck that the source slot is still the one we saw previously.
699  */
700  {
701  TransactionId copy_effective_xmin;
702  TransactionId copy_effective_catalog_xmin;
703  TransactionId copy_xmin;
704  TransactionId copy_catalog_xmin;
705  XLogRecPtr copy_restart_lsn;
706  bool copy_islogical;
707  char *copy_name;
708 
709  /* Copy data of source slot again */
710  SpinLockAcquire(&src->mutex);
711  copy_effective_xmin = src->effective_xmin;
712  copy_effective_catalog_xmin = src->effective_catalog_xmin;
713 
714  copy_xmin = src->data.xmin;
715  copy_catalog_xmin = src->data.catalog_xmin;
716  copy_restart_lsn = src->data.restart_lsn;
717 
718  /* for existence check */
719  copy_name = pstrdup(NameStr(src->data.name));
720  copy_islogical = SlotIsLogical(src);
721  SpinLockRelease(&src->mutex);
722 
723  /*
724  * Check if the source slot still exists and is valid. We regard it as
725  * invalid if the type of replication slot or name has been changed,
726  * or the restart_lsn either is invalid or has gone backward. (The
727  * restart_lsn could go backwards if the source slot is dropped and
728  * copied from an older slot during installation.)
729  *
730  * Since erroring out will release and drop the destination slot we
731  * don't need to release it here.
732  */
733  if (copy_restart_lsn < src_restart_lsn ||
734  src_islogical != copy_islogical ||
735  strcmp(copy_name, NameStr(*src_name)) != 0)
736  ereport(ERROR,
737  (errmsg("could not copy replication slot \"%s\"",
738  NameStr(*src_name)),
739  errdetail("The source replication slot was modified incompatibly during the copy operation.")));
740 
741  /* Install copied values again */
743  MyReplicationSlot->effective_xmin = copy_effective_xmin;
744  MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
745 
746  MyReplicationSlot->data.xmin = copy_xmin;
747  MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
748  MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
750 
755 
756 #ifdef USE_ASSERT_CHECKING
757  /* Check that the restart_lsn is available */
758  {
759  XLogSegNo segno;
760 
761  XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
762  Assert(XLogGetLastRemovedSegno() < segno);
763  }
764 #endif
765  }
766 
767  /* target slot fully created, mark as persistent if needed */
768  if (logical_slot && !temporary)
770 
771  /* All done. Set up the return values */
772  values[0] = NameGetDatum(dst_name);
773  nulls[0] = false;
775  {
777  nulls[1] = false;
778  }
779  else
780  nulls[1] = true;
781 
782  tuple = heap_form_tuple(tupdesc, values, nulls);
783  result = HeapTupleGetDatum(tuple);
784 
786 
787  PG_RETURN_DATUM(result);
788 }
static const char * plugin
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
void CheckSlotRequirements(void)
Definition: slot.c:972
#define NameGetDatum(X)
Definition: postgres.h:595
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
static void create_physical_replication_slot(char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
Definition: slotfuncs.c:46
int wal_segment_size
Definition: xlog.c:112
uint32 TransactionId
Definition: c.h:513
char * pstrdup(const char *in)
Definition: mcxt.c:1186
ReplicationSlotPersistency persistency
Definition: slot.h:53
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
void ReplicationSlotSave(void)
Definition: slot.c:645
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:269
ReplicationSlotPersistentData data
Definition: slot.h:132
XLogRecPtr confirmed_flush
Definition: slot.h:80
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1727
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3890
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:748
#define ERROR
Definition: elog.h:43
void ReplicationSlotPersist(void)
Definition: slot.c:680
TransactionId effective_xmin
Definition: slot.h:128
Definition: c.h:609
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errdetail(const char *fmt,...)
Definition: elog.c:955
TransactionId catalog_xmin
Definition: slot.h:69
void ReplicationSlotRelease(void)
Definition: slot.c:424
TransactionId xmin
Definition: slot.h:61
#define SlotIsLogical(slot)
Definition: slot.h:154
static void check_permissions(void)
Definition: slotfuncs.c:29
#define ereport(elevel, rest)
Definition: elog.h:141
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
bool in_use
Definition: slot.h:108
#define SpinLockRelease(lock)
Definition: spin.h:64
static void create_logical_replication_slot(char *name, char *plugin, bool temporary, XLogRecPtr restart_lsn)
Definition: slotfuncs.c:123
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:343
TransactionId effective_catalog_xmin
Definition: slot.h:129
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
int max_replication_slots
Definition: slot.c:99
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:738
XLogRecPtr restart_lsn
Definition: slot.h:72
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1123
#define PG_NARGS()
Definition: fmgr.h:198
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
int i
#define NameStr(name)
Definition: c.h:615
ReplicationSlot replication_slots[1]
Definition: slot.h:165
slock_t mutex
Definition: slot.h:105
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:702
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663
#define PG_GETARG_NAME(n)
Definition: fmgr.h:273
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ create_logical_replication_slot()

static void create_logical_replication_slot ( char *  name,
char *  plugin,
bool  temporary,
XLogRecPtr  restart_lsn 
)
static

Definition at line 123 of file slotfuncs.c.

References Assert, CreateInitDecodingContext(), DecodingContextFindStartpoint(), FreeDecodingContext(), logical_read_local_xlog_page(), MyReplicationSlot, NIL, ReplicationSlotCreate(), RS_EPHEMERAL, and RS_TEMPORARY.

Referenced by copy_replication_slot(), and pg_create_logical_replication_slot().

125 {
126  LogicalDecodingContext *ctx = NULL;
127 
128  Assert(!MyReplicationSlot);
129 
130  /*
131  * Acquire a logical decoding slot, this will check for conflicting names.
132  * Initially create persistent slot as ephemeral - that allows us to
133  * nicely handle errors during initialization because it'll get dropped if
134  * this transaction fails. We'll make it persistent at the end. Temporary
135  * slots can be created as temporary from beginning as they get dropped on
136  * error as well.
137  */
138  ReplicationSlotCreate(name, true,
139  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
140 
141  /*
142  * Create logical decoding context, to build the initial snapshot.
143  */
144  ctx = CreateInitDecodingContext(plugin, NIL,
145  false, /* do not build snapshot */
146  restart_lsn,
147  logical_read_local_xlog_page, NULL, NULL,
148  NULL);
149 
150  /* build initial snapshot, might take a while */
151  DecodingContextFindStartpoint(ctx);
152 
153  /* don't need the decoding context anymore */
154  FreeDecodingContext(ctx);
155 }
#define NIL
Definition: pg_list.h:65
static const char * plugin
int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: logicalfuncs.c:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:462
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:227
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:738
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:505
const char * name
Definition: encode.c:521

◆ create_physical_replication_slot()

static void create_physical_replication_slot ( char *  name,
bool  immediately_reserve,
bool  temporary,
XLogRecPtr  restart_lsn 
)
static

Definition at line 46 of file slotfuncs.c.

References Assert, ReplicationSlot::data, MyReplicationSlot, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotPersistentData::restart_lsn, RS_PERSISTENT, RS_TEMPORARY, and XLogRecPtrIsInvalid.

Referenced by copy_replication_slot(), and pg_create_physical_replication_slot().

48 {
49  Assert(!MyReplicationSlot);
50 
51  /* acquire replication slot, this will check for conflicting names */
52  ReplicationSlotCreate(name, false,
53  temporary ? RS_TEMPORARY : RS_PERSISTENT);
54 
55  if (immediately_reserve)
56  {
57  /* Reserve WAL as the user asked for it */
58  if (XLogRecPtrIsInvalid(restart_lsn))
59  ReplicationSlotReserveWal();
60  else
61  MyReplicationSlot->data.restart_lsn = restart_lsn;
62 
63  /* Write this slot to disk */
64  ReplicationSlotMarkDirty();
65  ReplicationSlotSave();
66  }
67 }
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
void ReplicationSlotSave(void)
Definition: slot.c:645
ReplicationSlotPersistentData data
Definition: slot.h:132
void ReplicationSlotReserveWal(void)
Definition: slot.c:997
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:738
XLogRecPtr restart_lsn
Definition: slot.h:72
const char * name
Definition: encode.c:521
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ pg_copy_logical_replication_slot_a()

Datum pg_copy_logical_replication_slot_a ( PG_FUNCTION_ARGS  )

Definition at line 792 of file slotfuncs.c.

References copy_replication_slot().

793 {
794  return copy_replication_slot(fcinfo, true);
795 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:593

◆ pg_copy_logical_replication_slot_b()

Datum pg_copy_logical_replication_slot_b ( PG_FUNCTION_ARGS  )

Definition at line 798 of file slotfuncs.c.

References copy_replication_slot().

799 {
800  return copy_replication_slot(fcinfo, true);
801 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:593

◆ pg_copy_logical_replication_slot_c()

Datum pg_copy_logical_replication_slot_c ( PG_FUNCTION_ARGS  )

Definition at line 804 of file slotfuncs.c.

References copy_replication_slot().

805 {
806  return copy_replication_slot(fcinfo, true);
807 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:593

◆ pg_copy_physical_replication_slot_a()

Datum pg_copy_physical_replication_slot_a ( PG_FUNCTION_ARGS  )

Definition at line 810 of file slotfuncs.c.

References copy_replication_slot().

811 {
812  return copy_replication_slot(fcinfo, false);
813 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:593

◆ pg_copy_physical_replication_slot_b()

Datum pg_copy_physical_replication_slot_b ( PG_FUNCTION_ARGS  )

Definition at line 816 of file slotfuncs.c.

References copy_replication_slot().

817 {
818  return copy_replication_slot(fcinfo, false);
819 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:593

◆ pg_create_logical_replication_slot()

Datum pg_create_logical_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 161 of file slotfuncs.c.

References check_permissions(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, create_logical_replication_slot(), ReplicationSlot::data, elog, ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, InvalidXLogRecPtr, LSNGetDatum, MyReplicationSlot, ReplicationSlotPersistentData::name, name, NameGetDatum, NameStr, PG_GETARG_BOOL, PG_GETARG_NAME, PG_RETURN_DATUM, plugin, ReplicationSlotPersist(), ReplicationSlotRelease(), TYPEFUNC_COMPOSITE, and values.

162 {
163  Name name = PG_GETARG_NAME(0);
165  bool temporary = PG_GETARG_BOOL(2);
166  Datum result;
167  TupleDesc tupdesc;
168  HeapTuple tuple;
169  Datum values[2];
170  bool nulls[2];
171 
172  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
173  elog(ERROR, "return type must be a row type");
174 
176 
178 
180  NameStr(*plugin),
181  temporary,
183 
184  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
186 
187  memset(nulls, 0, sizeof(nulls));
188 
189  tuple = heap_form_tuple(tupdesc, values, nulls);
190  result = HeapTupleGetDatum(tuple);
191 
192  /* ok, slot is now fully created, mark it as persistent if needed */
193  if (!temporary)
196 
197  PG_RETURN_DATUM(result);
198 }
static const char * plugin
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define NameGetDatum(X)
Definition: postgres.h:595
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:269
ReplicationSlotPersistentData data
Definition: slot.h:132
XLogRecPtr confirmed_flush
Definition: slot.h:80
#define ERROR
Definition: elog.h:43
void ReplicationSlotPersist(void)
Definition: slot.c:680
Definition: c.h:609
void ReplicationSlotRelease(void)
Definition: slot.c:424
static void check_permissions(void)
Definition: slotfuncs.c:29
static void create_logical_replication_slot(char *name, char *plugin, bool temporary, XLogRecPtr restart_lsn)
Definition: slotfuncs.c:123
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:343
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
const char * name
Definition: encode.c:521
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define elog(elevel,...)
Definition: elog.h:228
#define NameStr(name)
Definition: c.h:615
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
#define PG_GETARG_NAME(n)
Definition: fmgr.h:273

◆ pg_create_physical_replication_slot()

Datum pg_create_physical_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 74 of file slotfuncs.c.

References check_permissions(), CheckSlotRequirements(), create_physical_replication_slot(), ReplicationSlot::data, elog, ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, InvalidXLogRecPtr, LSNGetDatum, MyReplicationSlot, ReplicationSlotPersistentData::name, name, NameGetDatum, NameStr, PG_GETARG_BOOL, PG_GETARG_NAME, PG_RETURN_DATUM, ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, TYPEFUNC_COMPOSITE, and values.

75 {
77  bool immediately_reserve = PG_GETARG_BOOL(1);
78  bool temporary = PG_GETARG_BOOL(2);
79  Datum values[2];
80  bool nulls[2];
81  TupleDesc tupdesc;
82  HeapTuple tuple;
83  Datum result;
84 
85  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
86  elog(ERROR, "return type must be a row type");
87 
89 
91 
93  immediately_reserve,
94  temporary,
96 
97  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
98  nulls[0] = false;
99 
100  if (immediately_reserve)
101  {
103  nulls[1] = false;
104  }
105  else
106  nulls[1] = true;
107 
108  tuple = heap_form_tuple(tupdesc, values, nulls);
109  result = HeapTupleGetDatum(tuple);
110 
112 
113  PG_RETURN_DATUM(result);
114 }
void CheckSlotRequirements(void)
Definition: slot.c:972
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define NameGetDatum(X)
Definition: postgres.h:595
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
static void create_physical_replication_slot(char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
Definition: slotfuncs.c:46
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:269
ReplicationSlotPersistentData data
Definition: slot.h:132
#define ERROR
Definition: elog.h:43
Definition: c.h:609
void ReplicationSlotRelease(void)
Definition: slot.c:424
static void check_permissions(void)
Definition: slotfuncs.c:29
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:343
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
XLogRecPtr restart_lsn
Definition: slot.h:72
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
const char * name
Definition: encode.c:521
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define elog(elevel,...)
Definition: elog.h:228
#define NameStr(name)
Definition: c.h:615
#define PG_GETARG_NAME(n)
Definition: fmgr.h:273

◆ pg_drop_replication_slot()

Datum pg_drop_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 205 of file slotfuncs.c.

References check_permissions(), CheckSlotRequirements(), name, NameStr, PG_GETARG_NAME, PG_RETURN_VOID, and ReplicationSlotDrop().

206 {
207  Name name = PG_GETARG_NAME(0);
208 
209  check_permissions();
210 
211  CheckSlotRequirements();
212 
213  ReplicationSlotDrop(NameStr(*name), true);
214 
215  PG_RETURN_VOID();
216 }
void CheckSlotRequirements(void)
Definition: slot.c:972
Definition: c.h:609
static void check_permissions(void)
Definition: slotfuncs.c:29
#define PG_RETURN_VOID()
Definition: fmgr.h:339
const char * name
Definition: encode.c:521
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:517
#define NameStr(name)
Definition: c.h:615
#define PG_GETARG_NAME(n)
Definition: fmgr.h:273

◆ pg_get_replication_slots()

Datum pg_get_replication_slots ( PG_FUNCTION_ARGS  )

Definition at line 222 of file slotfuncs.c.

References ReplicationSlot::active_pid, ReturnSetInfo::allowedModes, BoolGetDatum, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotPersistentData::confirmed_flush, CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), i, ReplicationSlot::in_use, Int32GetDatum, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsA, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MemoryContextSwitchTo(), ReplicationSlot::mutex, ReplicationSlotPersistentData::name, namecpy(), NameGetDatum, ReplicationSlotPersistentData::persistency, PG_GET_REPLICATION_SLOTS_COLS, plugin, ReplicationSlotPersistentData::plugin, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, ReturnSetInfo::returnMode, RS_TEMPORARY, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, TransactionIdGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, work_mem, and ReplicationSlotPersistentData::xmin.

223 {
224 #define PG_GET_REPLICATION_SLOTS_COLS 11
225  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
226  TupleDesc tupdesc;
227  Tuplestorestate *tupstore;
228  MemoryContext per_query_ctx;
229  MemoryContext oldcontext;
230  int slotno;
231 
232  /* check to see if caller supports us returning a tuplestore */
233  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
234  ereport(ERROR,
235  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
236  errmsg("set-valued function called in context that cannot accept a set")));
237  if (!(rsinfo->allowedModes & SFRM_Materialize))
238  ereport(ERROR,
239  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
240  errmsg("materialize mode required, but it is not allowed in this context")));
241 
242  /* Build a tuple descriptor for our result type */
243  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
244  elog(ERROR, "return type must be a row type");
245 
246  /*
247  * We don't require any special permission to see this function's data
248  * because nothing should be sensitive. The most critical being the slot
249  * name, which shouldn't contain anything particularly sensitive.
250  */
251 
252  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
253  oldcontext = MemoryContextSwitchTo(per_query_ctx);
254 
255  tupstore = tuplestore_begin_heap(true, false, work_mem);
256  rsinfo->returnMode = SFRM_Materialize;
257  rsinfo->setResult = tupstore;
258  rsinfo->setDesc = tupdesc;
259 
260  MemoryContextSwitchTo(oldcontext);
261 
262  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
263  for (slotno = 0; slotno < max_replication_slots; slotno++)
264  {
267  bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
268 
269  ReplicationSlotPersistency persistency;
270  TransactionId xmin;
271  TransactionId catalog_xmin;
272  XLogRecPtr restart_lsn;
273  XLogRecPtr confirmed_flush_lsn;
274  pid_t active_pid;
275  Oid database;
276  NameData slot_name;
278  int i;
279 
280  if (!slot->in_use)
281  continue;
282 
283  SpinLockAcquire(&slot->mutex);
284 
285  xmin = slot->data.xmin;
286  catalog_xmin = slot->data.catalog_xmin;
287  database = slot->data.database;
288  restart_lsn = slot->data.restart_lsn;
289  confirmed_flush_lsn = slot->data.confirmed_flush;
290  namecpy(&slot_name, &slot->data.name);
291  namecpy(&plugin, &slot->data.plugin);
292  active_pid = slot->active_pid;
293  persistency = slot->data.persistency;
294 
295  SpinLockRelease(&slot->mutex);
296 
297  memset(nulls, 0, sizeof(nulls));
298 
299  i = 0;
300  values[i++] = NameGetDatum(&slot_name);
301 
302  if (database == InvalidOid)
303  nulls[i++] = true;
304  else
305  values[i++] = NameGetDatum(&plugin);
306 
307  if (database == InvalidOid)
308  values[i++] = CStringGetTextDatum("physical");
309  else
310  values[i++] = CStringGetTextDatum("logical");
311 
312  if (database == InvalidOid)
313  nulls[i++] = true;
314  else
315  values[i++] = database;
316 
317  values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
318  values[i++] = BoolGetDatum(active_pid != 0);
319 
320  if (active_pid != 0)
321  values[i++] = Int32GetDatum(active_pid);
322  else
323  nulls[i++] = true;
324 
325  if (xmin != InvalidTransactionId)
326  values[i++] = TransactionIdGetDatum(xmin);
327  else
328  nulls[i++] = true;
329 
330  if (catalog_xmin != InvalidTransactionId)
331  values[i++] = TransactionIdGetDatum(catalog_xmin);
332  else
333  nulls[i++] = true;
334 
335  if (restart_lsn != InvalidXLogRecPtr)
336  values[i++] = LSNGetDatum(restart_lsn);
337  else
338  nulls[i++] = true;
339 
340  if (confirmed_flush_lsn != InvalidXLogRecPtr)
341  values[i++] = LSNGetDatum(confirmed_flush_lsn);
342  else
343  nulls[i++] = true;
344 
345  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
346  }
347  LWLockRelease(ReplicationSlotControlLock);
348 
349  tuplestore_donestoring(tupstore);
350 
351  return (Datum) 0;
352 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static const char * plugin
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
#define NameGetDatum(X)
Definition: postgres.h:595
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
#define PG_GET_REPLICATION_SLOTS_COLS
uint32 TransactionId
Definition: c.h:513
int namecpy(Name n1, const NameData *n2)
Definition: name.c:233
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ReplicationSlotPersistency persistency
Definition: slot.h:53
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
ReplicationSlotPersistentData data
Definition: slot.h:132
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr confirmed_flush
Definition: slot.h:80
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1727
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
Definition: c.h:609
TransactionId catalog_xmin
Definition: slot.h:69
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:61
#define ereport(elevel, rest)
Definition: elog.h:141
bool in_use
Definition: slot.h:108
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
#define TransactionIdGetDatum(X)
Definition: postgres.h:521
uintptr_t Datum
Definition: postgres.h:367
int work_mem
Definition: globals.c:121
#define BoolGetDatum(X)
Definition: postgres.h:402
#define InvalidOid
Definition: postgres_ext.h:36
int allowedModes
Definition: execnodes.h:303
SetFunctionReturnMode returnMode
Definition: execnodes.h:305
int max_replication_slots
Definition: slot.c:99
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr restart_lsn
Definition: slot.h:72
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1123
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:231
Tuplestorestate * setResult
Definition: execnodes.h:308
static Datum values[MAXATTR]
Definition: bootstrap.c:167
ExprContext * econtext
Definition: execnodes.h:301
#define Int32GetDatum(X)
Definition: postgres.h:479
TupleDesc setDesc
Definition: execnodes.h:309
ReplicationSlotPersistency
Definition: slot.h:32
int errmsg(const char *fmt,...)
Definition: elog.c:822
pid_t active_pid
Definition: slot.h:111
#define elog(elevel,...)
Definition: elog.h:228
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:87
ReplicationSlot replication_slots[1]
Definition: slot.h:165
slock_t mutex
Definition: slot.h:105

◆ pg_logical_replication_slot_advance()

static XLogRecPtr pg_logical_replication_slot_advance ( XLogRecPtr  moveto)
static

Definition at line 398 of file slotfuncs.c.

References CHECK_FOR_INTERRUPTS, ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), CurrentResourceOwner, ReplicationSlot::data, elog, XLogReaderState::EndRecPtr, ERROR, FreeDecodingContext(), InvalidateSystemCaches(), InvalidXLogRecPtr, logical_read_local_xlog_page(), LogicalConfirmReceivedLocation(), LogicalDecodingProcessRecord(), MyReplicationSlot, NIL, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, LogicalDecodingContext::reader, ReplicationSlotMarkDirty(), ReplicationSlotPersistentData::restart_lsn, XLogBeginRead(), and XLogReadRecord().

Referenced by pg_replication_slot_advance().

399 {
401  ResourceOwner old_resowner = CurrentResourceOwner;
402  XLogRecPtr retlsn;
403 
404  PG_TRY();
405  {
406  /*
407  * Create our decoding context in fast_forward mode, passing start_lsn
408  * as InvalidXLogRecPtr, so that we start processing from the slot's
409  * confirmed_flush.
410  */
412  NIL,
413  true, /* fast_forward */
415  NULL, NULL, NULL);
416 
417  /*
418  * Start reading at the slot's restart_lsn, which we know to point to
419  * a valid record.
420  */
422 
423  /* Initialize our return value in case we don't do anything */
425 
426  /* invalidate non-timetravel entries */
428 
429  /* Decode records until the requested target position is reached */
430  while (ctx->reader->EndRecPtr < moveto)
431  {
432  char *errm = NULL;
433  XLogRecord *record;
434 
435  /*
436  * Read records. No changes are generated in fast_forward mode,
437  * but snapbuilder/slot statuses are updated properly.
438  */
439  record = XLogReadRecord(ctx->reader, &errm);
440  if (errm)
441  elog(ERROR, "%s", errm);
442 
443  /*
444  * Process the record. Storage-level changes are ignored in
445  * fast_forward mode, but other modules (such as snapbuilder)
446  * might still have critical updates to do.
447  */
448  if (record)
450 
451  /* Stop once the requested target has been reached */
452  if (moveto <= ctx->reader->EndRecPtr)
453  break;
454 
456  }
457 
458  /*
459  * Logical decoding could have clobbered CurrentResourceOwner during
460  * transaction management, so restore the executor's value. (This is
461  * a kluge, but it's not worth cleaning up right now.)
462  */
463  CurrentResourceOwner = old_resowner;
464 
465  if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
466  {
468 
469  /*
470  * If only the confirmed_flush LSN has changed the slot won't get
471  * marked as dirty by the above. Callers on the walsender
472  * interface are expected to keep track of their own progress and
473  * don't need it written out. But SQL-interface users cannot
474  * specify their own start positions and it's harder for them to
475  * keep track of their progress, so we should make more of an
476  * effort to save it for them.
477  *
478  * Dirty the slot so it is written out at the next checkpoint.
479  * The LSN position advanced to may still be lost on a crash
480  * but this makes the data consistent after a clean shutdown.
481  */
483  }
484 
486 
487  /* free context, call shutdown callback */
488  FreeDecodingContext(ctx);
489 
491  }
492  PG_CATCH();
493  {
494  /* clear all timetravel entries */
496 
497  PG_RE_THROW();
498  }
499  PG_END_TRY();
500 
501  return retlsn;
502 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ResourceOwner CurrentResourceOwner
Definition: resowner.c:142
int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: logicalfuncs.c:109
ReplicationSlotPersistentData data
Definition: slot.h:132
void InvalidateSystemCaches(void)
Definition: inval.c:643
XLogRecPtr confirmed_flush
Definition: slot.h:80
XLogRecPtr EndRecPtr
Definition: xlogreader.h:135
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:261
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:94
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:233
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:370
#define PG_CATCH()
Definition: elog.h:332
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:505
XLogRecPtr restart_lsn
Definition: slot.h:72
#define PG_RE_THROW()
Definition: elog.h:363
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:998
XLogReaderState * reader
Definition: logical.h:41
#define elog(elevel,...)
Definition: elog.h:228
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define PG_TRY()
Definition: elog.h:322
#define PG_END_TRY()
Definition: elog.h:347
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ pg_physical_replication_slot_advance()

static XLogRecPtr pg_physical_replication_slot_advance ( XLogRecPtr  moveto)
static

Definition at line 362 of file slotfuncs.c.

References ReplicationSlot::data, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by pg_replication_slot_advance().

363 {
365  XLogRecPtr retlsn = startlsn;
366 
367  if (startlsn < moveto)
368  {
372  retlsn = moveto;
373 
374  /*
375  * Dirty the slot so that it is written out at the next checkpoint.
376  * Note that the advanced LSN position may still be lost in the
377  * event of a crash, but this makes the data consistent after a
378  * clean shutdown.
379  */
381  }
382 
383  return retlsn;
384 }
ReplicationSlotPersistentData data
Definition: slot.h:132
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr restart_lsn
Definition: slot.h:72
slock_t mutex
Definition: slot.h:105
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ pg_replication_slot_advance()

Datum pg_replication_slot_advance ( PG_FUNCTION_ARGS  )

Definition at line 508 of file slotfuncs.c.

References Assert, check_permissions(), ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, ReplicationSlotPersistentData::database, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), GetFlushRecPtr(), GetXLogReplayRecPtr(), heap_form_tuple(), HeapTupleGetDatum, LSNGetDatum, Min, MyReplicationSlot, ReplicationSlotPersistentData::name, NameGetDatum, NameStr, OidIsValid, PG_GETARG_LSN, PG_GETARG_NAME, pg_logical_replication_slot_advance(), pg_physical_replication_slot_advance(), PG_RETURN_DATUM, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, ThisTimeLineID, TYPEFUNC_COMPOSITE, values, and XLogRecPtrIsInvalid.

509 {
510  Name slotname = PG_GETARG_NAME(0);
511  XLogRecPtr moveto = PG_GETARG_LSN(1);
512  XLogRecPtr endlsn;
513  XLogRecPtr minlsn;
514  TupleDesc tupdesc;
515  Datum values[2];
516  bool nulls[2];
517  HeapTuple tuple;
518  Datum result;
519 
521 
523 
524  if (XLogRecPtrIsInvalid(moveto))
525  ereport(ERROR,
526  (errmsg("invalid target WAL LSN")));
527 
528  /* Build a tuple descriptor for our result type */
529  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
530  elog(ERROR, "return type must be a row type");
531 
532  /*
533  * We can't move the slot past what has been flushed/replayed, so clamp
534  * the target position accordingly.
535  */
536  if (!RecoveryInProgress())
537  moveto = Min(moveto, GetFlushRecPtr());
538  else
539  moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
540 
541  /* Acquire the slot so we "own" it */
542  ReplicationSlotAcquire(NameStr(*slotname), true);
543 
544  /* A slot whose restart_lsn has never been reserved cannot be advanced */
546  ereport(ERROR,
547  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548  errmsg("cannot advance replication slot that has not previously reserved WAL")));
549 
550  /*
551  * Check that the slot is not moving backwards. Physical slots rely simply
552  * on restart_lsn as a minimum point, while logical slots have confirmed
553  * consumption up to confirmed_flush, meaning that in both cases data
554  * older than that is not available anymore.
555  */
558  else
560 
561  if (moveto < minlsn)
562  ereport(ERROR,
563  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
564  errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
565  (uint32) (moveto >> 32), (uint32) moveto,
566  (uint32) (minlsn >> 32), (uint32) minlsn)));
567 
568  /* Do the actual slot update, depending on the slot type */
570  endlsn = pg_logical_replication_slot_advance(moveto);
571  else
572  endlsn = pg_physical_replication_slot_advance(moveto);
573 
574  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
575  nulls[0] = false;
576 
578 
579  /* Return the reached position. */
580  values[1] = LSNGetDatum(endlsn);
581  nulls[1] = false;
582 
583  tuple = heap_form_tuple(tupdesc, values, nulls);
584  result = HeapTupleGetDatum(tuple);
585 
586  PG_RETURN_DATUM(result);
587 }
static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto)
Definition: slotfuncs.c:398
#define NameGetDatum(X)
Definition: postgres.h:595
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
#define Min(x, y)
Definition: c.h:920
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8275
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
ReplicationSlotPersistentData data
Definition: slot.h:132
bool RecoveryInProgress(void)
Definition: xlog.c:7942
#define OidIsValid(objectId)
Definition: c.h:644
XLogRecPtr confirmed_flush
Definition: slot.h:80
#define ERROR
Definition: elog.h:43
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11206
Definition: c.h:609
unsigned int uint32
Definition: c.h:367
void ReplicationSlotRelease(void)
Definition: slot.c:424
static void check_permissions(void)
Definition: slotfuncs.c:29
#define ereport(elevel, rest)
Definition: elog.h:141
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:343
TimeLineID ThisTimeLineID
Definition: xlog.c:187
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:738
XLogRecPtr restart_lsn
Definition: slot.h:72
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
#define NameStr(name)
Definition: c.h:615
static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr moveto)
Definition: slotfuncs.c:362
#define PG_GETARG_NAME(n)
Definition: fmgr.h:273