PostgreSQL Source Code  git master
slotfuncs.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xlog_internal.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "replication/slot.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/pg_lsn.h"
#include "utils/resowner.h"
Include dependency graph for slotfuncs.c:

Go to the source code of this file.

Macros

#define PG_GET_REPLICATION_SLOTS_COLS   11
 

Functions

static void check_permissions (void)
 
static void create_physical_replication_slot (char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
 
Datum pg_create_physical_replication_slot (PG_FUNCTION_ARGS)
 
static void create_logical_replication_slot (char *name, char *plugin, bool temporary, XLogRecPtr restart_lsn)
 
Datum pg_create_logical_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_drop_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_get_replication_slots (PG_FUNCTION_ARGS)
 
static XLogRecPtr pg_physical_replication_slot_advance (XLogRecPtr moveto)
 
static XLogRecPtr pg_logical_replication_slot_advance (XLogRecPtr moveto)
 
Datum pg_replication_slot_advance (PG_FUNCTION_ARGS)
 
static Datum copy_replication_slot (FunctionCallInfo fcinfo, bool logical_slot)
 
Datum pg_copy_logical_replication_slot_a (PG_FUNCTION_ARGS)
 
Datum pg_copy_logical_replication_slot_b (PG_FUNCTION_ARGS)
 
Datum pg_copy_logical_replication_slot_c (PG_FUNCTION_ARGS)
 
Datum pg_copy_physical_replication_slot_a (PG_FUNCTION_ARGS)
 
Datum pg_copy_physical_replication_slot_b (PG_FUNCTION_ARGS)
 

Macro Definition Documentation

◆ PG_GET_REPLICATION_SLOTS_COLS

#define PG_GET_REPLICATION_SLOTS_COLS   11

Function Documentation

◆ check_permissions()

static void check_permissions ( void  )
static

Definition at line 29 of file slotfuncs.c.

References ereport, errcode(), errmsg(), ERROR, GetUserId(), has_rolreplication(), and superuser().

Referenced by copy_replication_slot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_drop_replication_slot(), and pg_replication_slot_advance().

30 {
31  if (!superuser() && !has_rolreplication(GetUserId()))
32  ereport(ERROR,
33  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
34  (errmsg("must be superuser or replication role to use replication slots"))));
35 }
Oid GetUserId(void)
Definition: miscinit.c:380
int errcode(int sqlerrcode)
Definition: elog.c:608
bool superuser(void)
Definition: superuser.c:46
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:141
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:561
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ copy_replication_slot()

static Datum copy_replication_slot ( FunctionCallInfo  fcinfo,
bool  logical_slot 
)
static

Definition at line 602 of file slotfuncs.c.

References Assert, ReplicationSlotPersistentData::catalog_xmin, check_permissions(), CheckLogicalDecodingRequirements(), CheckSlotRequirements(), ReplicationSlotPersistentData::confirmed_flush, create_logical_replication_slot(), create_physical_replication_slot(), ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, elog, ereport, errcode(), errdetail(), errmsg(), ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, i, ReplicationSlot::in_use, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotPersistentData::name, NameGetDatum, NameStr, ReplicationSlotPersistentData::persistency, PG_GETARG_BOOL, PG_GETARG_NAME, PG_NARGS, PG_RETURN_DATUM, plugin, ReplicationSlotPersistentData::plugin, pstrdup(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotPersistentData::restart_lsn, RS_TEMPORARY, SlotIsLogical, SpinLockAcquire, SpinLockRelease, TYPEFUNC_COMPOSITE, values, wal_segment_size, XLByteToSeg, XLogGetLastRemovedSegno(), XLogRecPtrIsInvalid, and ReplicationSlotPersistentData::xmin.

Referenced by pg_copy_logical_replication_slot_a(), pg_copy_logical_replication_slot_b(), pg_copy_logical_replication_slot_c(), pg_copy_physical_replication_slot_a(), and pg_copy_physical_replication_slot_b().

603 {
604  Name src_name = PG_GETARG_NAME(0);
605  Name dst_name = PG_GETARG_NAME(1);
606  ReplicationSlot *src = NULL;
607  XLogRecPtr src_restart_lsn;
608  bool src_islogical;
609  bool temporary;
610  char *plugin;
611  Datum values[2];
612  bool nulls[2];
613  Datum result;
614  TupleDesc tupdesc;
615  HeapTuple tuple;
616 
617  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
618  elog(ERROR, "return type must be a row type");
619 
620  check_permissions();
621 
622  if (logical_slot)
623  CheckLogicalDecodingRequirements();
624  else
625  CheckSlotRequirements();
626 
627  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
628 
629  /*
630  * We need to prevent the source slot's reserved WAL from being removed,
631  * but we don't want to lock that slot for very long, and it can advance
632  * in the meantime. So obtain the source slot's data, and create a new
633  * slot using its restart_lsn. Afterwards we lock the source slot again
634  * and verify that the data we copied (name, type) has not changed
635  * incompatibly. No inconvenient WAL removal can occur once the new slot
636  * is created -- but since WAL removal could have occurred before we
637  * managed to create the new slot, we advance the new slot's restart_lsn
638  * to the source slot's updated restart_lsn the second time we lock it.
639  */
640  for (int i = 0; i < max_replication_slots; i++)
641  {
643 
644  if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
645  {
646  SpinLockAcquire(&s->mutex);
647  src_islogical = SlotIsLogical(s);
648  src_restart_lsn = s->data.restart_lsn;
649  temporary = s->data.persistency == RS_TEMPORARY;
650  plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
651  SpinLockRelease(&s->mutex);
652 
653  src = s;
654  break;
655  }
656  }
657 
658  LWLockRelease(ReplicationSlotControlLock);
659 
660  if (src == NULL)
661  ereport(ERROR,
662  (errcode(ERRCODE_UNDEFINED_OBJECT),
663  errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
664 
665  /* Check type of replication slot */
666  if (src_islogical != logical_slot)
667  ereport(ERROR,
668  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
669  src_islogical ?
670  errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
671  NameStr(*src_name)) :
672  errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
673  NameStr(*src_name))));
674 
675  /* Copying non-reserved slot doesn't make sense */
676  if (XLogRecPtrIsInvalid(src_restart_lsn))
677  {
678  Assert(!logical_slot);
679  ereport(ERROR,
680  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
681  (errmsg("cannot copy a replication slot that doesn't reserve WAL"))));
682  }
683 
684  /* Overwrite params from optional arguments */
685  if (PG_NARGS() >= 3)
686  temporary = PG_GETARG_BOOL(2);
687  if (PG_NARGS() >= 4)
688  {
689  Assert(logical_slot);
690  plugin = NameStr(*(PG_GETARG_NAME(3)));
691  }
692 
693  /* Create new slot and acquire it */
694  if (logical_slot)
696  plugin,
697  temporary,
698  src_restart_lsn);
699  else
701  true,
702  temporary,
703  src_restart_lsn);
704 
705  /*
706  * Update the destination slot to current values of the source slot;
707  * recheck that the source slot is still the one we saw previously.
708  */
709  {
710  TransactionId copy_effective_xmin;
711  TransactionId copy_effective_catalog_xmin;
712  TransactionId copy_xmin;
713  TransactionId copy_catalog_xmin;
714  XLogRecPtr copy_restart_lsn;
715  bool copy_islogical;
716  char *copy_name;
717 
718  /* Copy data of source slot again */
719  SpinLockAcquire(&src->mutex);
720  copy_effective_xmin = src->effective_xmin;
721  copy_effective_catalog_xmin = src->effective_catalog_xmin;
722 
723  copy_xmin = src->data.xmin;
724  copy_catalog_xmin = src->data.catalog_xmin;
725  copy_restart_lsn = src->data.restart_lsn;
726 
727  /* for existence check */
728  copy_name = pstrdup(NameStr(src->data.name));
729  copy_islogical = SlotIsLogical(src);
730  SpinLockRelease(&src->mutex);
731 
732  /*
733  * Check if the source slot still exists and is valid. We regard it as
734  * invalid if the type of replication slot or name has been changed,
735  * or the restart_lsn either is invalid or has gone backward. (The
736  * restart_lsn could go backwards if the source slot is dropped and
737  * copied from an older slot during installation.)
738  *
739  * Since erroring out will release and drop the destination slot we
740  * don't need to release it here.
741  */
742  if (copy_restart_lsn < src_restart_lsn ||
743  src_islogical != copy_islogical ||
744  strcmp(copy_name, NameStr(*src_name)) != 0)
745  ereport(ERROR,
746  (errmsg("could not copy replication slot \"%s\"",
747  NameStr(*src_name)),
748  errdetail("The source replication slot was modified incompatibly during the copy operation.")));
749 
750  /* Install copied values again */
752  MyReplicationSlot->effective_xmin = copy_effective_xmin;
753  MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
754 
755  MyReplicationSlot->data.xmin = copy_xmin;
756  MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
757  MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
759 
764 
765 #ifdef USE_ASSERT_CHECKING
766  /* Check that the restart_lsn is available */
767  {
768  XLogSegNo segno;
769 
770  XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
771  Assert(XLogGetLastRemovedSegno() < segno);
772  }
773 #endif
774  }
775 
776  /* target slot fully created, mark as persistent if needed */
777  if (logical_slot && !temporary)
778  ReplicationSlotPersist();
779 
780  /* All done. Set up the return values */
781  values[0] = NameGetDatum(dst_name);
782  nulls[0] = false;
784  {
786  nulls[1] = false;
787  }
788  else
789  nulls[1] = true;
790 
791  tuple = heap_form_tuple(tupdesc, values, nulls);
792  result = HeapTupleGetDatum(tuple);
793 
795 
796  PG_RETURN_DATUM(result);
797 }
static const char * plugin
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
void CheckSlotRequirements(void)
Definition: slot.c:972
#define NameGetDatum(X)
Definition: postgres.h:595
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
static void create_physical_replication_slot(char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
Definition: slotfuncs.c:46
int wal_segment_size
Definition: xlog.c:112
uint32 TransactionId
Definition: c.h:514
char * pstrdup(const char *in)
Definition: mcxt.c:1186
ReplicationSlotPersistency persistency
Definition: slot.h:53
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
void ReplicationSlotSave(void)
Definition: slot.c:645
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:269
ReplicationSlotPersistentData data
Definition: slot.h:132
XLogRecPtr confirmed_flush
Definition: slot.h:80
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3891
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:748
#define ERROR
Definition: elog.h:43
void ReplicationSlotPersist(void)
Definition: slot.c:680
TransactionId effective_xmin
Definition: slot.h:128
Definition: c.h:610
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errdetail(const char *fmt,...)
Definition: elog.c:955
TransactionId catalog_xmin
Definition: slot.h:69
void ReplicationSlotRelease(void)
Definition: slot.c:424
TransactionId xmin
Definition: slot.h:61
#define SlotIsLogical(slot)
Definition: slot.h:154
static void check_permissions(void)
Definition: slotfuncs.c:29
#define ereport(elevel, rest)
Definition: elog.h:141
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
bool in_use
Definition: slot.h:108
#define SpinLockRelease(lock)
Definition: spin.h:64
static void create_logical_replication_slot(char *name, char *plugin, bool temporary, XLogRecPtr restart_lsn)
Definition: slotfuncs.c:123
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:343
TransactionId effective_catalog_xmin
Definition: slot.h:129
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
int max_replication_slots
Definition: slot.c:99
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
XLogRecPtr restart_lsn
Definition: slot.h:72
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define PG_NARGS()
Definition: fmgr.h:198
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
int i
#define NameStr(name)
Definition: c.h:616
ReplicationSlot replication_slots[1]
Definition: slot.h:165
slock_t mutex
Definition: slot.h:105
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:702
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663
#define PG_GETARG_NAME(n)
Definition: fmgr.h:273
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ create_logical_replication_slot()

static void create_logical_replication_slot ( char *  name,
char *  plugin,
bool  temporary,
XLogRecPtr  restart_lsn 
)
static

Definition at line 123 of file slotfuncs.c.

References Assert, CreateInitDecodingContext(), DecodingContextFindStartpoint(), FreeDecodingContext(), logical_read_local_xlog_page(), MyReplicationSlot, NIL, ReplicationSlotCreate(), RS_EPHEMERAL, and RS_TEMPORARY.

Referenced by copy_replication_slot(), and pg_create_logical_replication_slot().

125 {
126  LogicalDecodingContext *ctx = NULL;
127 
129 
130  /*
131  * Acquire a logical decoding slot, this will check for conflicting names.
132  * Initially create persistent slot as ephemeral - that allows us to
133  * nicely handle errors during initialization because it'll get dropped if
134  * this transaction fails. We'll make it persistent at the end. Temporary
135  * slots can be created as temporary from beginning as they get dropped on
136  * error as well.
137  */
139  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
140 
141  /*
142  * Create logical decoding context, to build the initial snapshot.
143  */
145  false, /* do not build snapshot */
146  restart_lsn,
147  logical_read_local_xlog_page, NULL, NULL,
148  NULL);
149 
150  /* build initial snapshot, might take a while */
152 
153  /* don't need the decoding context anymore */
154  FreeDecodingContext(ctx);
155 }
#define NIL
Definition: pg_list.h:65
static const char * plugin
int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: logicalfuncs.c:110
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:462
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:227
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:508
const char * name
Definition: encode.c:521

◆ create_physical_replication_slot()

static void create_physical_replication_slot ( char *  name,
bool  immediately_reserve,
bool  temporary,
XLogRecPtr  restart_lsn 
)
static

Definition at line 46 of file slotfuncs.c.

References Assert, ReplicationSlot::data, MyReplicationSlot, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotPersistentData::restart_lsn, RS_PERSISTENT, RS_TEMPORARY, and XLogRecPtrIsInvalid.

Referenced by copy_replication_slot(), and pg_create_physical_replication_slot().

48 {
50 
51  /* acquire replication slot, this will check for conflicting names */
53  temporary ? RS_TEMPORARY : RS_PERSISTENT);
54 
55  if (immediately_reserve)
56  {
57  /* Reserve WAL as the user asked for it */
58  if (XLogRecPtrIsInvalid(restart_lsn))
59  ReplicationSlotReserveWal();
60  else
61  MyReplicationSlot->data.restart_lsn = restart_lsn;
62 
63  /* Write this slot to disk */
66  }
67 }
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
void ReplicationSlotSave(void)
Definition: slot.c:645
ReplicationSlotPersistentData data
Definition: slot.h:132
void ReplicationSlotReserveWal(void)
Definition: slot.c:997
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:739
XLogRecPtr restart_lsn
Definition: slot.h:72
const char * name
Definition: encode.c:521
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ pg_copy_logical_replication_slot_a()

Datum pg_copy_logical_replication_slot_a ( PG_FUNCTION_ARGS  )

Definition at line 801 of file slotfuncs.c.

References copy_replication_slot().

802 {
803  return copy_replication_slot(fcinfo, true);
804 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:602

◆ pg_copy_logical_replication_slot_b()

Datum pg_copy_logical_replication_slot_b ( PG_FUNCTION_ARGS  )

Definition at line 807 of file slotfuncs.c.

References copy_replication_slot().

808 {
809  return copy_replication_slot(fcinfo, true);
810 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:602

◆ pg_copy_logical_replication_slot_c()

Datum pg_copy_logical_replication_slot_c ( PG_FUNCTION_ARGS  )

Definition at line 813 of file slotfuncs.c.

References copy_replication_slot().

814 {
815  return copy_replication_slot(fcinfo, true);
816 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:602

◆ pg_copy_physical_replication_slot_a()

Datum pg_copy_physical_replication_slot_a ( PG_FUNCTION_ARGS  )

Definition at line 819 of file slotfuncs.c.

References copy_replication_slot().

820 {
821  return copy_replication_slot(fcinfo, false);
822 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:602

◆ pg_copy_physical_replication_slot_b()

Datum pg_copy_physical_replication_slot_b ( PG_FUNCTION_ARGS  )

Definition at line 825 of file slotfuncs.c.

References copy_replication_slot().

826 {
827  return copy_replication_slot(fcinfo, false);
828 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:602

◆ pg_create_logical_replication_slot()

Datum pg_create_logical_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 161 of file slotfuncs.c.

References check_permissions(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, create_logical_replication_slot(), ReplicationSlot::data, elog, ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, InvalidXLogRecPtr, LSNGetDatum, MyReplicationSlot, ReplicationSlotPersistentData::name, name, NameGetDatum, NameStr, PG_GETARG_BOOL, PG_GETARG_NAME, PG_RETURN_DATUM, plugin, ReplicationSlotPersist(), ReplicationSlotRelease(), TYPEFUNC_COMPOSITE, and values.

162 {
163  Name name = PG_GETARG_NAME(0);
165  bool temporary = PG_GETARG_BOOL(2);
166  Datum result;
167  TupleDesc tupdesc;
168  HeapTuple tuple;
169  Datum values[2];
170  bool nulls[2];
171 
172  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
173  elog(ERROR, "return type must be a row type");
174 
176 
178 
180  NameStr(*plugin),
181  temporary,
183 
184  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
186 
187  memset(nulls, 0, sizeof(nulls));
188 
189  tuple = heap_form_tuple(tupdesc, values, nulls);
190  result = HeapTupleGetDatum(tuple);
191 
192  /* ok, slot is now fully created, mark it as persistent if needed */
193  if (!temporary)
196 
197  PG_RETURN_DATUM(result);
198 }
static const char * plugin
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define NameGetDatum(X)
Definition: postgres.h:595
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:269
ReplicationSlotPersistentData data
Definition: slot.h:132
XLogRecPtr confirmed_flush
Definition: slot.h:80
#define ERROR
Definition: elog.h:43
void ReplicationSlotPersist(void)
Definition: slot.c:680
Definition: c.h:610
void ReplicationSlotRelease(void)
Definition: slot.c:424
static void check_permissions(void)
Definition: slotfuncs.c:29
static void create_logical_replication_slot(char *name, char *plugin, bool temporary, XLogRecPtr restart_lsn)
Definition: slotfuncs.c:123
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:343
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
const char * name
Definition: encode.c:521
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define elog(elevel,...)
Definition: elog.h:228
#define NameStr(name)
Definition: c.h:616
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
#define PG_GETARG_NAME(n)
Definition: fmgr.h:273

◆ pg_create_physical_replication_slot()

Datum pg_create_physical_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 74 of file slotfuncs.c.

References check_permissions(), CheckSlotRequirements(), create_physical_replication_slot(), ReplicationSlot::data, elog, ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, InvalidXLogRecPtr, LSNGetDatum, MyReplicationSlot, ReplicationSlotPersistentData::name, name, NameGetDatum, NameStr, PG_GETARG_BOOL, PG_GETARG_NAME, PG_RETURN_DATUM, ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, TYPEFUNC_COMPOSITE, and values.

75 {
77  bool immediately_reserve = PG_GETARG_BOOL(1);
78  bool temporary = PG_GETARG_BOOL(2);
79  Datum values[2];
80  bool nulls[2];
81  TupleDesc tupdesc;
82  HeapTuple tuple;
83  Datum result;
84 
85  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
86  elog(ERROR, "return type must be a row type");
87 
89 
91 
93  immediately_reserve,
94  temporary,
96 
97  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
98  nulls[0] = false;
99 
100  if (immediately_reserve)
101  {
103  nulls[1] = false;
104  }
105  else
106  nulls[1] = true;
107 
108  tuple = heap_form_tuple(tupdesc, values, nulls);
109  result = HeapTupleGetDatum(tuple);
110 
112 
113  PG_RETURN_DATUM(result);
114 }
void CheckSlotRequirements(void)
Definition: slot.c:972
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define NameGetDatum(X)
Definition: postgres.h:595
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
static void create_physical_replication_slot(char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
Definition: slotfuncs.c:46
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:269
ReplicationSlotPersistentData data
Definition: slot.h:132
#define ERROR
Definition: elog.h:43
Definition: c.h:610
void ReplicationSlotRelease(void)
Definition: slot.c:424
static void check_permissions(void)
Definition: slotfuncs.c:29
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:343
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
XLogRecPtr restart_lsn
Definition: slot.h:72
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
const char * name
Definition: encode.c:521
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define elog(elevel,...)
Definition: elog.h:228
#define NameStr(name)
Definition: c.h:616
#define PG_GETARG_NAME(n)
Definition: fmgr.h:273

◆ pg_drop_replication_slot()

Datum pg_drop_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 205 of file slotfuncs.c.

References check_permissions(), CheckSlotRequirements(), name, NameStr, PG_GETARG_NAME, PG_RETURN_VOID, and ReplicationSlotDrop().

206 {
207  Name name = PG_GETARG_NAME(0);
208 
210 
212 
213  ReplicationSlotDrop(NameStr(*name), true);
214 
215  PG_RETURN_VOID();
216 }
void CheckSlotRequirements(void)
Definition: slot.c:972
Definition: c.h:610
static void check_permissions(void)
Definition: slotfuncs.c:29
#define PG_RETURN_VOID()
Definition: fmgr.h:339
const char * name
Definition: encode.c:521
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:517
#define NameStr(name)
Definition: c.h:616
#define PG_GETARG_NAME(n)
Definition: fmgr.h:273

◆ pg_get_replication_slots()

Datum pg_get_replication_slots ( PG_FUNCTION_ARGS  )

Definition at line 222 of file slotfuncs.c.

References ReplicationSlot::active_pid, ReturnSetInfo::allowedModes, BoolGetDatum, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotPersistentData::confirmed_flush, CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), i, ReplicationSlot::in_use, Int32GetDatum, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsA, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MemoryContextSwitchTo(), ReplicationSlot::mutex, ReplicationSlotPersistentData::name, namecpy(), NameGetDatum, ReplicationSlotPersistentData::persistency, PG_GET_REPLICATION_SLOTS_COLS, plugin, ReplicationSlotPersistentData::plugin, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, ReturnSetInfo::returnMode, RS_TEMPORARY, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, TransactionIdGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, work_mem, and ReplicationSlotPersistentData::xmin.

223 {
224 #define PG_GET_REPLICATION_SLOTS_COLS 11
225  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
226  TupleDesc tupdesc;
227  Tuplestorestate *tupstore;
228  MemoryContext per_query_ctx;
229  MemoryContext oldcontext;
230  int slotno;
231 
232  /* check to see if caller supports us returning a tuplestore */
233  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
234  ereport(ERROR,
235  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
236  errmsg("set-valued function called in context that cannot accept a set")));
237  if (!(rsinfo->allowedModes & SFRM_Materialize))
238  ereport(ERROR,
239  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
240  errmsg("materialize mode required, but it is not " \
241  "allowed in this context")));
242 
243  /* Build a tuple descriptor for our result type */
244  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
245  elog(ERROR, "return type must be a row type");
246 
247  /*
248  * We don't require any special permission to see this function's data
249  * because nothing should be sensitive. The most critical being the slot
250  * name, which shouldn't contain anything particularly sensitive.
251  */
252 
253  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
254  oldcontext = MemoryContextSwitchTo(per_query_ctx);
255 
256  tupstore = tuplestore_begin_heap(true, false, work_mem);
257  rsinfo->returnMode = SFRM_Materialize;
258  rsinfo->setResult = tupstore;
259  rsinfo->setDesc = tupdesc;
260 
261  MemoryContextSwitchTo(oldcontext);
262 
263  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
264  for (slotno = 0; slotno < max_replication_slots; slotno++)
265  {
268  bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
269 
270  ReplicationSlotPersistency persistency;
271  TransactionId xmin;
272  TransactionId catalog_xmin;
273  XLogRecPtr restart_lsn;
274  XLogRecPtr confirmed_flush_lsn;
275  pid_t active_pid;
276  Oid database;
277  NameData slot_name;
279  int i;
280 
281  if (!slot->in_use)
282  continue;
283 
284  SpinLockAcquire(&slot->mutex);
285 
286  xmin = slot->data.xmin;
287  catalog_xmin = slot->data.catalog_xmin;
288  database = slot->data.database;
289  restart_lsn = slot->data.restart_lsn;
290  confirmed_flush_lsn = slot->data.confirmed_flush;
291  namecpy(&slot_name, &slot->data.name);
292  namecpy(&plugin, &slot->data.plugin);
293  active_pid = slot->active_pid;
294  persistency = slot->data.persistency;
295 
296  SpinLockRelease(&slot->mutex);
297 
298  memset(nulls, 0, sizeof(nulls));
299 
300  i = 0;
301  values[i++] = NameGetDatum(&slot_name);
302 
303  if (database == InvalidOid)
304  nulls[i++] = true;
305  else
306  values[i++] = NameGetDatum(&plugin);
307 
308  if (database == InvalidOid)
309  values[i++] = CStringGetTextDatum("physical");
310  else
311  values[i++] = CStringGetTextDatum("logical");
312 
313  if (database == InvalidOid)
314  nulls[i++] = true;
315  else
316  values[i++] = database;
317 
318  values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
319  values[i++] = BoolGetDatum(active_pid != 0);
320 
321  if (active_pid != 0)
322  values[i++] = Int32GetDatum(active_pid);
323  else
324  nulls[i++] = true;
325 
326  if (xmin != InvalidTransactionId)
327  values[i++] = TransactionIdGetDatum(xmin);
328  else
329  nulls[i++] = true;
330 
331  if (catalog_xmin != InvalidTransactionId)
332  values[i++] = TransactionIdGetDatum(catalog_xmin);
333  else
334  nulls[i++] = true;
335 
336  if (restart_lsn != InvalidXLogRecPtr)
337  values[i++] = LSNGetDatum(restart_lsn);
338  else
339  nulls[i++] = true;
340 
341  if (confirmed_flush_lsn != InvalidXLogRecPtr)
342  values[i++] = LSNGetDatum(confirmed_flush_lsn);
343  else
344  nulls[i++] = true;
345 
346  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
347  }
348  LWLockRelease(ReplicationSlotControlLock);
349 
350  tuplestore_donestoring(tupstore);
351 
352  return (Datum) 0;
353 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static const char * plugin
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
#define NameGetDatum(X)
Definition: postgres.h:595
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
#define PG_GET_REPLICATION_SLOTS_COLS
uint32 TransactionId
Definition: c.h:514
int namecpy(Name n1, const NameData *n2)
Definition: name.c:233
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ReplicationSlotPersistency persistency
Definition: slot.h:53
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
ReplicationSlotPersistentData data
Definition: slot.h:132
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr confirmed_flush
Definition: slot.h:80
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
Definition: c.h:610
TransactionId catalog_xmin
Definition: slot.h:69
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:61
#define ereport(elevel, rest)
Definition: elog.h:141
bool in_use
Definition: slot.h:108
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
#define TransactionIdGetDatum(X)
Definition: postgres.h:521
uintptr_t Datum
Definition: postgres.h:367
int work_mem
Definition: globals.c:121
#define BoolGetDatum(X)
Definition: postgres.h:402
#define InvalidOid
Definition: postgres_ext.h:36
int allowedModes
Definition: execnodes.h:302
SetFunctionReturnMode returnMode
Definition: execnodes.h:304
int max_replication_slots
Definition: slot.c:99
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr restart_lsn
Definition: slot.h:72
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:230
Tuplestorestate * setResult
Definition: execnodes.h:307
static Datum values[MAXATTR]
Definition: bootstrap.c:167
ExprContext * econtext
Definition: execnodes.h:300
#define Int32GetDatum(X)
Definition: postgres.h:479
TupleDesc setDesc
Definition: execnodes.h:308
ReplicationSlotPersistency
Definition: slot.h:32
int errmsg(const char *fmt,...)
Definition: elog.c:822
pid_t active_pid
Definition: slot.h:111
#define elog(elevel,...)
Definition: elog.h:228
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:83
ReplicationSlot replication_slots[1]
Definition: slot.h:165
slock_t mutex
Definition: slot.h:105

◆ pg_logical_replication_slot_advance()

static XLogRecPtr pg_logical_replication_slot_advance ( XLogRecPtr  moveto)
static

Definition at line 391 of file slotfuncs.c.

References CHECK_FOR_INTERRUPTS, ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), CurrentResourceOwner, ReplicationSlot::data, elog, XLogReaderState::EndRecPtr, ERROR, FreeDecodingContext(), InvalidateSystemCaches(), InvalidXLogRecPtr, logical_read_local_xlog_page(), LogicalConfirmReceivedLocation(), LogicalDecodingProcessRecord(), MyReplicationSlot, NIL, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, LogicalDecodingContext::reader, ReplicationSlotMarkDirty(), ReplicationSlotPersistentData::restart_lsn, XLogReadRecord(), and XLogRecPtrIsInvalid.

Referenced by pg_replication_slot_advance().

/*
 * Body listing of pg_logical_replication_slot_advance(moveto).
 *
 * Reads WAL records through ctx->reader, starting at the slot's
 * restart_lsn, and keeps looping until the reader's EndRecPtr has reached
 * 'moveto'.  The function's result is retlsn.
 *
 * NOTE(review): this listing omits source lines 393, 405, 408, 418, 421,
 * 426, 449, 455, 467, 482, 485, 490 and 495, so the declaration and
 * creation of ctx, the assignments to retlsn and several calls are not
 * visible here; consult the full file before changing anything.
 */
392 {
394  ResourceOwner old_resowner = CurrentResourceOwner;
395  XLogRecPtr startlsn;
396  XLogRecPtr retlsn;
397 
398  PG_TRY();
399  {
400  /*
401  * Create our decoding context in fast_forward mode, passing start_lsn
402  * as InvalidXLogRecPtr, so that we start processing from my slot's
403  * confirmed_flush.
404  */
406  NIL,
407  true, /* fast_forward */
409  NULL, NULL, NULL);
410 
411  /*
412  * Start reading at the slot's restart_lsn, which we know to point to
413  * a valid record.
414  */
415  startlsn = MyReplicationSlot->data.restart_lsn;
416 
417  /* Initialize our return value in case we don't do anything */
419 
420  /* invalidate non-timetravel entries */
422 
423  /* Decode at least one record, until we run out of records */
 /*
  * startlsn is only valid for the first iteration (it is reset to
  * InvalidXLogRecPtr inside the loop); afterwards the reader's EndRecPtr
  * drives the loop condition.
  */
424  while ((!XLogRecPtrIsInvalid(startlsn) &&
425  startlsn < moveto) ||
427  ctx->reader->EndRecPtr < moveto))
428  {
429  char *errm = NULL;
430  XLogRecord *record;
431 
432  /*
433  * Read records. No changes are generated in fast_forward mode,
434  * but snapbuilder/slot statuses are updated properly.
435  */
436  record = XLogReadRecord(ctx->reader, startlsn, &errm);
437  if (errm)
438  elog(ERROR, "%s", errm);
439 
440  /* Read sequentially from now on */
441  startlsn = InvalidXLogRecPtr;
442 
443  /*
444  * Process the record. Storage-level changes are ignored in
445  * fast_forward mode, but other modules (such as snapbuilder)
446  * might still have critical updates to do.
447  */
448  if (record)
450 
451  /* Stop once the requested target has been reached */
452  if (moveto <= ctx->reader->EndRecPtr)
453  break;
454 
456  }
457 
458  /*
459  * Logical decoding could have clobbered CurrentResourceOwner during
460  * transaction management, so restore the executor's value. (This is
461  * a kluge, but it's not worth cleaning up right now.)
462  */
463  CurrentResourceOwner = old_resowner;
464 
465  if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
466  {
468 
469  /*
470  * If only the confirmed_flush LSN has changed the slot won't get
471  * marked as dirty by the above. Callers on the walsender
472  * interface are expected to keep track of their own progress and
473  * don't need it written out. But SQL-interface users cannot
474  * specify their own start positions and it's harder for them to
475  * keep track of their progress, so we should make more of an
476  * effort to save it for them.
477  *
478  * Dirty the slot so it's written out at the next checkpoint.
479  * We'll still lose its position on crash, as documented, but it's
480  * better than always losing the position even on clean restart.
481  */
483  }
484 
486 
487  /* free context, call shutdown callback */
488  FreeDecodingContext(ctx);
489 
491  }
492  PG_CATCH();
493  {
494  /* clear all timetravel entries */
496 
 /* Propagate the error to the caller after the cleanup above */
497  PG_RE_THROW();
498  }
499  PG_END_TRY();
500 
501  return retlsn;
502 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ResourceOwner CurrentResourceOwner
Definition: resowner.c:142
int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: logicalfuncs.c:110
ReplicationSlotPersistentData data
Definition: slot.h:132
void InvalidateSystemCaches(void)
Definition: inval.c:643
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:238
XLogRecPtr confirmed_flush
Definition: slot.h:80
XLogRecPtr EndRecPtr
Definition: xlogreader.h:132
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:94
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:370
#define PG_CATCH()
Definition: elog.h:332
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:508
XLogRecPtr restart_lsn
Definition: slot.h:72
#define PG_RE_THROW()
Definition: elog.h:363
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1001
XLogReaderState * reader
Definition: logical.h:41
#define elog(elevel,...)
Definition: elog.h:228
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define PG_TRY()
Definition: elog.h:322
#define PG_END_TRY()
Definition: elog.h:347
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663

◆ pg_physical_replication_slot_advance()

static XLogRecPtr pg_physical_replication_slot_advance ( XLogRecPtr  moveto)
static

Definition at line 363 of file slotfuncs.c.

References ReplicationSlot::data, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by pg_replication_slot_advance().

/*
 * Body listing of pg_physical_replication_slot_advance(moveto).
 *
 * Returns 'moveto' when it lies ahead of startlsn, otherwise returns
 * startlsn unchanged (the slot is never reported as moving backwards).
 *
 * NOTE(review): source lines 365 and 370-372 are omitted from this listing,
 * so the declaration of startlsn and the statements executed inside the
 * if-block before retlsn is set are not visible here.  The reference list
 * for this function mentions SpinLockAcquire/SpinLockRelease, the slot
 * mutex and restart_lsn -- presumably those are the omitted statements;
 * confirm against the full file.
 */
364 {
366  XLogRecPtr retlsn = startlsn;
367 
368  if (startlsn < moveto)
369  {
373  retlsn = moveto;
374  }
375 
376  return retlsn;
377 }
ReplicationSlotPersistentData data
Definition: slot.h:132
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr restart_lsn
Definition: slot.h:72
slock_t mutex
Definition: slot.h:105

◆ pg_replication_slot_advance()

Datum pg_replication_slot_advance ( PG_FUNCTION_ARGS  )

Definition at line 508 of file slotfuncs.c.

References Assert, check_permissions(), ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, ReplicationSlotPersistentData::database, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), GetFlushRecPtr(), GetXLogReplayRecPtr(), heap_form_tuple(), HeapTupleGetDatum, LSNGetDatum, Min, MyReplicationSlot, ReplicationSlotPersistentData::name, NameGetDatum, NameStr, OidIsValid, PG_GETARG_LSN, PG_GETARG_NAME, pg_logical_replication_slot_advance(), pg_physical_replication_slot_advance(), PG_RETURN_DATUM, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotPersistentData::restart_lsn, ThisTimeLineID, TYPEFUNC_COMPOSITE, values, and XLogRecPtrIsInvalid.

/*
 * Body listing of pg_replication_slot_advance(slot_name, upto_lsn).
 *
 * Takes the slot name (argument 0) and the target LSN (argument 1), clamps
 * the target to the flushed (or, during recovery, replayed) WAL position,
 * rejects targets below the slot's current minimum, dispatches to the
 * logical or physical advance helper, and returns a two-column row
 * (slot name, reached LSN).
 *
 * NOTE(review): source lines 520, 522, 545, 556-557, 559, 569, 580-583 and
 * 586 are omitted from this listing, so several conditions and calls
 * (including the bodies that set minlsn and the on-disk update) are not
 * visible here.
 */
509 {
510  Name slotname = PG_GETARG_NAME(0);
511  XLogRecPtr moveto = PG_GETARG_LSN(1);
512  XLogRecPtr endlsn;
513  XLogRecPtr minlsn;
514  TupleDesc tupdesc;
515  Datum values[2];
516  bool nulls[2];
517  HeapTuple tuple;
518  Datum result;
519 
521 
523 
524  if (XLogRecPtrIsInvalid(moveto))
525  ereport(ERROR,
526  (errmsg("invalid target WAL LSN")));
527 
528  /* Build a tuple descriptor for our result type */
529  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
530  elog(ERROR, "return type must be a row type");
531 
532  /*
533  * We can't move slot past what's been flushed/replayed so clamp the
534  * target position accordingly.
535  */
536  if (!RecoveryInProgress())
537  moveto = Min(moveto, GetFlushRecPtr());
538  else
539  moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
540 
541  /* Acquire the slot so we "own" it */
542  ReplicationSlotAcquire(NameStr(*slotname), true);
543 
544  /* A slot whose restart_lsn has never been reserved cannot be advanced */
546  ereport(ERROR,
547  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548  errmsg("cannot advance replication slot that has not previously reserved WAL")));
549 
550  /*
551  * Check if the slot is not moving backwards. Physical slots rely simply
552  * on restart_lsn as a minimum point, while logical slots have confirmed
553  * consumption up to confirmed_flush, meaning that in both cases data older
554  * than that is not available anymore.
555  */
558  else
560 
561  if (moveto < minlsn)
562  ereport(ERROR,
563  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
564  errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
565  (uint32) (moveto >> 32), (uint32) moveto,
566  (uint32) (minlsn >> 32), (uint32) minlsn)));
567 
568  /* Do the actual slot update, depending on the slot type */
570  endlsn = pg_logical_replication_slot_advance(moveto);
571  else
572  endlsn = pg_physical_replication_slot_advance(moveto);
573 
 /* First result column: the slot's name */
574  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
575  nulls[0] = false;
576 
577  /* Update the on disk state when lsn was updated. */
 /*
  * NOTE(review): the comment above says the on-disk state is updated when
  * the LSN was updated, yet the condition below is true only when endlsn
  * is invalid.  Confirm which is intended; the body of this block (source
  * lines 580-583) is omitted from this listing.
  */
578  if (XLogRecPtrIsInvalid(endlsn))
579  {
584  }
585 
587 
588  /* Return the reached position. */
589  values[1] = LSNGetDatum(endlsn);
590  nulls[1] = false;
591 
592  tuple = heap_form_tuple(tupdesc, values, nulls);
593  result = HeapTupleGetDatum(tuple);
594 
595  PG_RETURN_DATUM(result);
596 }
static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto)
Definition: slotfuncs.c:391
#define NameGetDatum(X)
Definition: postgres.h:595
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
#define Min(x, y)
Definition: c.h:911
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
void ReplicationSlotSave(void)
Definition: slot.c:645
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8267
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
ReplicationSlotPersistentData data
Definition: slot.h:132
bool RecoveryInProgress(void)
Definition: xlog.c:7935
#define OidIsValid(objectId)
Definition: c.h:645
XLogRecPtr confirmed_flush
Definition: slot.h:80
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:748
#define ERROR
Definition: elog.h:43
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11172
Definition: c.h:610
unsigned int uint32
Definition: c.h:359
void ReplicationSlotRelease(void)
Definition: slot.c:424
static void check_permissions(void)
Definition: slotfuncs.c:29
#define ereport(elevel, rest)
Definition: elog.h:141
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:343
TimeLineID ThisTimeLineID
Definition: xlog.c:187
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
XLogRecPtr restart_lsn
Definition: slot.h:72
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:220
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
#define NameStr(name)
Definition: c.h:616
static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr moveto)
Definition: slotfuncs.c:363
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:702
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663
#define PG_GETARG_NAME(n)
Definition: fmgr.h:273