PostgreSQL Source Code  git master
slotfuncs.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/pg_lsn.h"
#include "utils/resowner.h"
Include dependency graph for slotfuncs.c:

Go to the source code of this file.

Macros

#define PG_GET_REPLICATION_SLOTS_COLS   14
 

Functions

static void check_permissions (void)
 
static void create_physical_replication_slot (char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
 
Datum pg_create_physical_replication_slot (PG_FUNCTION_ARGS)
 
static void create_logical_replication_slot (char *name, char *plugin, bool temporary, bool two_phase, XLogRecPtr restart_lsn, bool find_startpoint)
 
Datum pg_create_logical_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_drop_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_get_replication_slots (PG_FUNCTION_ARGS)
 
static XLogRecPtr pg_physical_replication_slot_advance (XLogRecPtr moveto)
 
static XLogRecPtr pg_logical_replication_slot_advance (XLogRecPtr moveto)
 
Datum pg_replication_slot_advance (PG_FUNCTION_ARGS)
 
static Datum copy_replication_slot (FunctionCallInfo fcinfo, bool logical_slot)
 
Datum pg_copy_logical_replication_slot_a (PG_FUNCTION_ARGS)
 
Datum pg_copy_logical_replication_slot_b (PG_FUNCTION_ARGS)
 
Datum pg_copy_logical_replication_slot_c (PG_FUNCTION_ARGS)
 
Datum pg_copy_physical_replication_slot_a (PG_FUNCTION_ARGS)
 
Datum pg_copy_physical_replication_slot_b (PG_FUNCTION_ARGS)
 

Macro Definition Documentation

◆ PG_GET_REPLICATION_SLOTS_COLS

#define PG_GET_REPLICATION_SLOTS_COLS   14

Function Documentation

◆ check_permissions()

static void check_permissions ( void  )
static

Definition at line 29 of file slotfuncs.c.

References ereport, errcode(), errmsg(), ERROR, GetUserId(), has_rolreplication(), and superuser().

Referenced by copy_replication_slot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_drop_replication_slot(), and pg_replication_slot_advance().

30 {
32  ereport(ERROR,
33  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
34  errmsg("must be superuser or replication role to use replication slots")));
35 }
Oid GetUserId(void)
Definition: miscinit.c:478
int errcode(int sqlerrcode)
Definition: elog.c:698
bool superuser(void)
Definition: superuser.c:46
#define ERROR
Definition: elog.h:46
#define ereport(elevel,...)
Definition: elog.h:157
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:659
int errmsg(const char *fmt,...)
Definition: elog.c:909

◆ copy_replication_slot()

static Datum copy_replication_slot ( FunctionCallInfo  fcinfo,
bool  logical_slot 
)
static

Definition at line 705 of file slotfuncs.c.

References Assert, ReplicationSlotPersistentData::catalog_xmin, check_permissions(), CheckLogicalDecodingRequirements(), CheckSlotRequirements(), ReplicationSlotPersistentData::confirmed_flush, create_logical_replication_slot(), create_physical_replication_slot(), ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, elog, ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, i, ReplicationSlot::in_use, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotPersistentData::name, NameGetDatum, NameStr, ReplicationSlotPersistentData::persistency, PG_GETARG_BOOL, PG_GETARG_NAME, PG_NARGS, PG_RETURN_DATUM, plugin, ReplicationSlotPersistentData::plugin, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotPersistentData::restart_lsn, RS_TEMPORARY, SlotIsLogical, SpinLockAcquire, SpinLockRelease, TYPEFUNC_COMPOSITE, values, wal_segment_size, XLByteToSeg, XLogGetLastRemovedSegno(), XLogRecPtrIsInvalid, and ReplicationSlotPersistentData::xmin.

Referenced by pg_copy_logical_replication_slot_a(), pg_copy_logical_replication_slot_b(), pg_copy_logical_replication_slot_c(), pg_copy_physical_replication_slot_a(), and pg_copy_physical_replication_slot_b().

706 {
707  Name src_name = PG_GETARG_NAME(0);
708  Name dst_name = PG_GETARG_NAME(1);
709  ReplicationSlot *src = NULL;
710  ReplicationSlot first_slot_contents;
711  ReplicationSlot second_slot_contents;
712  XLogRecPtr src_restart_lsn;
713  bool src_islogical;
714  bool temporary;
715  char *plugin;
716  Datum values[2];
717  bool nulls[2];
718  Datum result;
719  TupleDesc tupdesc;
720  HeapTuple tuple;
721 
722  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
723  elog(ERROR, "return type must be a row type");
724 
726 
727  if (logical_slot)
729  else
731 
732  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
733 
734  /*
735  * We need to prevent the source slot's reserved WAL from being removed,
736  * but we don't want to lock that slot for very long, and it can advance
737  * in the meantime. So obtain the source slot's data, and create a new
738  * slot using its restart_lsn. Afterwards we lock the source slot again
739  * and verify that the data we copied (name, type) has not changed
740  * incompatibly. No inconvenient WAL removal can occur once the new slot
741  * is created -- but since WAL removal could have occurred before we
742  * managed to create the new slot, we advance the new slot's restart_lsn
743  * to the source slot's updated restart_lsn the second time we lock it.
744  */
745  for (int i = 0; i < max_replication_slots; i++)
746  {
748 
749  if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
750  {
751  /* Copy the slot contents while holding spinlock */
752  SpinLockAcquire(&s->mutex);
753  first_slot_contents = *s;
754  SpinLockRelease(&s->mutex);
755  src = s;
756  break;
757  }
758  }
759 
760  LWLockRelease(ReplicationSlotControlLock);
761 
762  if (src == NULL)
763  ereport(ERROR,
764  (errcode(ERRCODE_UNDEFINED_OBJECT),
765  errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
766 
767  src_islogical = SlotIsLogical(&first_slot_contents);
768  src_restart_lsn = first_slot_contents.data.restart_lsn;
769  temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
770  plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
771 
772  /* Check type of replication slot */
773  if (src_islogical != logical_slot)
774  ereport(ERROR,
775  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
776  src_islogical ?
777  errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
778  NameStr(*src_name)) :
779  errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
780  NameStr(*src_name))));
781 
782  /* Copying non-reserved slot doesn't make sense */
783  if (XLogRecPtrIsInvalid(src_restart_lsn))
784  ereport(ERROR,
785  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
786  errmsg("cannot copy a replication slot that doesn't reserve WAL")));
787 
788  /* Overwrite params from optional arguments */
789  if (PG_NARGS() >= 3)
790  temporary = PG_GETARG_BOOL(2);
791  if (PG_NARGS() >= 4)
792  {
793  Assert(logical_slot);
794  plugin = NameStr(*(PG_GETARG_NAME(3)));
795  }
796 
797  /* Create new slot and acquire it */
798  if (logical_slot)
799  {
800  /*
801  * We must not try to read WAL, since we haven't reserved it yet --
802  * hence pass find_startpoint false. confirmed_flush will be set
803  * below, by copying from the source slot.
804  */
806  plugin,
807  temporary,
808  false,
809  src_restart_lsn,
810  false);
811  }
812  else
814  true,
815  temporary,
816  src_restart_lsn);
817 
818  /*
819  * Update the destination slot to current values of the source slot;
820  * recheck that the source slot is still the one we saw previously.
821  */
822  {
823  TransactionId copy_effective_xmin;
824  TransactionId copy_effective_catalog_xmin;
825  TransactionId copy_xmin;
826  TransactionId copy_catalog_xmin;
827  XLogRecPtr copy_restart_lsn;
828  XLogRecPtr copy_confirmed_flush;
829  bool copy_islogical;
830  char *copy_name;
831 
832  /* Copy data of source slot again */
833  SpinLockAcquire(&src->mutex);
834  second_slot_contents = *src;
835  SpinLockRelease(&src->mutex);
836 
837  copy_effective_xmin = second_slot_contents.effective_xmin;
838  copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
839 
840  copy_xmin = second_slot_contents.data.xmin;
841  copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
842  copy_restart_lsn = second_slot_contents.data.restart_lsn;
843  copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
844 
845  /* for existence check */
846  copy_name = NameStr(second_slot_contents.data.name);
847  copy_islogical = SlotIsLogical(&second_slot_contents);
848 
849  /*
850  * Check if the source slot still exists and is valid. We regard it as
851  * invalid if the type of replication slot or name has been changed,
852  * or the restart_lsn either is invalid or has gone backward. (The
853  * restart_lsn could go backwards if the source slot is dropped and
854  * copied from an older slot during installation.)
855  *
856  * Since erroring out will release and drop the destination slot we
857  * don't need to release it here.
858  */
859  if (copy_restart_lsn < src_restart_lsn ||
860  src_islogical != copy_islogical ||
861  strcmp(copy_name, NameStr(*src_name)) != 0)
862  ereport(ERROR,
863  (errmsg("could not copy replication slot \"%s\"",
864  NameStr(*src_name)),
865  errdetail("The source replication slot was modified incompatibly during the copy operation.")));
866 
867  /* The source slot must have a consistent snapshot */
868  if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
869  ereport(ERROR,
870  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
871  errmsg("cannot copy unfinished logical replication slot \"%s\"",
872  NameStr(*src_name)),
873  errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
874 
875  /* Install copied values again */
877  MyReplicationSlot->effective_xmin = copy_effective_xmin;
878  MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
879 
880  MyReplicationSlot->data.xmin = copy_xmin;
881  MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
882  MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
883  MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
885 
890 
891 #ifdef USE_ASSERT_CHECKING
892  /* Check that the restart_lsn is available */
893  {
894  XLogSegNo segno;
895 
896  XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
897  Assert(XLogGetLastRemovedSegno() < segno);
898  }
899 #endif
900  }
901 
902  /* target slot fully created, mark as persistent if needed */
903  if (logical_slot && !temporary)
905 
906  /* All done. Set up the return values */
907  values[0] = NameGetDatum(dst_name);
908  nulls[0] = false;
910  {
912  nulls[1] = false;
913  }
914  else
915  nulls[1] = true;
916 
917  tuple = heap_form_tuple(tupdesc, values, nulls);
918  result = HeapTupleGetDatum(tuple);
919 
921 
922  PG_RETURN_DATUM(result);
923 }
static const char * plugin
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
void CheckSlotRequirements(void)
Definition: slot.c:1066
#define NameGetDatum(X)
Definition: postgres.h:639
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
int errhint(const char *fmt,...)
Definition: elog.c:1156
static void create_physical_replication_slot(char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
Definition: slotfuncs.c:46
int wal_segment_size
Definition: xlog.c:121
uint32 TransactionId
Definition: c.h:587
ReplicationSlotPersistency persistency
Definition: slot.h:62
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
void ReplicationSlotSave(void)
Definition: slot.c:732
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
ReplicationSlotPersistentData data
Definition: slot.h:156
XLogRecPtr confirmed_flush
Definition: slot.h:92
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1805
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:4007
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:839
#define ERROR
Definition: elog.h:46
void ReplicationSlotPersist(void)
Definition: slot.c:767
TransactionId effective_xmin
Definition: slot.h:152
Definition: c.h:675
static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, XLogRecPtr restart_lsn, bool find_startpoint)
Definition: slotfuncs.c:126
uint64 XLogSegNo
Definition: xlogdefs.h:48
int errdetail(const char *fmt,...)
Definition: elog.c:1042
TransactionId catalog_xmin
Definition: slot.h:78
void ReplicationSlotRelease(void)
Definition: slot.c:497
TransactionId xmin
Definition: slot.h:70
#define SlotIsLogical(slot)
Definition: slot.h:178
static void check_permissions(void)
Definition: slotfuncs.c:29
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
bool in_use
Definition: slot.h:132
#define SpinLockRelease(lock)
Definition: spin.h:64
uintptr_t Datum
Definition: postgres.h:411
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
TransactionId effective_catalog_xmin
Definition: slot.h:153
#define ereport(elevel,...)
Definition: elog.h:157
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
int max_replication_slots
Definition: slot.c:99
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr restart_lsn
Definition: slot.h:81
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1203
#define PG_NARGS()
Definition: fmgr.h:203
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:221
static Datum values[MAXATTR]
Definition: bootstrap.c:166
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
int i
#define NameStr(name)
Definition: c.h:681
ReplicationSlot replication_slots[1]
Definition: slot.h:189
slock_t mutex
Definition: slot.h:129
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:789
void ReplicationSlotMarkDirty(void)
Definition: slot.c:750
#define PG_GETARG_NAME(n)
Definition: fmgr.h:278
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ create_logical_replication_slot()

static void create_logical_replication_slot ( char *  name,
char *  plugin,
bool  temporary,
bool  two_phase,
XLogRecPtr  restart_lsn,
bool  find_startpoint 
)
static

Definition at line 126 of file slotfuncs.c.

References Assert, CreateInitDecodingContext(), DecodingContextFindStartpoint(), FreeDecodingContext(), MyReplicationSlot, NIL, read_local_xlog_page(), ReplicationSlotCreate(), RS_EPHEMERAL, RS_TEMPORARY, and wal_segment_close().

Referenced by copy_replication_slot(), and pg_create_logical_replication_slot().

130 {
131  LogicalDecodingContext *ctx = NULL;
132 
134 
135  /*
136  * Acquire a logical decoding slot, this will check for conflicting names.
137  * Initially create persistent slot as ephemeral - that allows us to
138  * nicely handle errors during initialization because it'll get dropped if
139  * this transaction fails. We'll make it persistent at the end. Temporary
140  * slots can be created as temporary from beginning as they get dropped on
141  * error as well.
142  */
144  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
145 
146  /*
147  * Create logical decoding context to find start point or, if we don't
148  * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
149  *
150  * Note: when !find_startpoint this is still important, because it's at
151  * this point that the output plugin is validated.
152  */
154  false, /* just catalogs is OK */
155  restart_lsn,
158  NULL, NULL, NULL);
159 
160  /*
161  * If caller needs us to determine the decoding start point, do so now.
162  * This might take a while.
163  */
164  if (find_startpoint)
166 
167  /* don't need the decoding context anymore */
168  FreeDecodingContext(ctx);
169 }
#define NIL
Definition: pg_list.h:65
static const char * plugin
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:823
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:228
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:575
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:624
const char * name
Definition: encode.c:515
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, LogicalDecodingXLogPageReadCB page_read, WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:320
bool read_local_xlog_page(XLogReaderState *state)
Definition: xlogutils.c:842

◆ create_physical_replication_slot()

static void create_physical_replication_slot ( char *  name,
bool  immediately_reserve,
bool  temporary,
XLogRecPtr  restart_lsn 
)
static

Definition at line 46 of file slotfuncs.c.

References Assert, ReplicationSlot::data, MyReplicationSlot, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotPersistentData::restart_lsn, RS_PERSISTENT, RS_TEMPORARY, and XLogRecPtrIsInvalid.

Referenced by copy_replication_slot(), and pg_create_physical_replication_slot().

48 {
50 
51  /* acquire replication slot, this will check for conflicting names */
53  temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
54 
55  if (immediately_reserve)
56  {
57  /* Reserve WAL as the user asked for it */
58  if (XLogRecPtrIsInvalid(restart_lsn))
60  else
61  MyReplicationSlot->data.restart_lsn = restart_lsn;
62 
63  /* Write this slot to disk */
66  }
67 }
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:228
void ReplicationSlotSave(void)
Definition: slot.c:732
ReplicationSlotPersistentData data
Definition: slot.h:156
void ReplicationSlotReserveWal(void)
Definition: slot.c:1091
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
XLogRecPtr restart_lsn
Definition: slot.h:81
const char * name
Definition: encode.c:515
void ReplicationSlotMarkDirty(void)
Definition: slot.c:750

◆ pg_copy_logical_replication_slot_a()

Datum pg_copy_logical_replication_slot_a ( PG_FUNCTION_ARGS  )

Definition at line 927 of file slotfuncs.c.

References copy_replication_slot().

928 {
929  return copy_replication_slot(fcinfo, true);
930 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:705

◆ pg_copy_logical_replication_slot_b()

Datum pg_copy_logical_replication_slot_b ( PG_FUNCTION_ARGS  )

Definition at line 933 of file slotfuncs.c.

References copy_replication_slot().

934 {
935  return copy_replication_slot(fcinfo, true);
936 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:705

◆ pg_copy_logical_replication_slot_c()

Datum pg_copy_logical_replication_slot_c ( PG_FUNCTION_ARGS  )

Definition at line 939 of file slotfuncs.c.

References copy_replication_slot().

940 {
941  return copy_replication_slot(fcinfo, true);
942 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:705

◆ pg_copy_physical_replication_slot_a()

Datum pg_copy_physical_replication_slot_a ( PG_FUNCTION_ARGS  )

Definition at line 945 of file slotfuncs.c.

References copy_replication_slot().

946 {
947  return copy_replication_slot(fcinfo, false);
948 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:705

◆ pg_copy_physical_replication_slot_b()

Datum pg_copy_physical_replication_slot_b ( PG_FUNCTION_ARGS  )

Definition at line 951 of file slotfuncs.c.

References copy_replication_slot().

952 {
953  return copy_replication_slot(fcinfo, false);
954 }
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition: slotfuncs.c:705

◆ pg_create_logical_replication_slot()

Datum pg_create_logical_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 175 of file slotfuncs.c.

References check_permissions(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, create_logical_replication_slot(), ReplicationSlot::data, elog, ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, InvalidXLogRecPtr, LSNGetDatum, MyReplicationSlot, ReplicationSlotPersistentData::name, name, NameGetDatum, NameStr, PG_GETARG_BOOL, PG_GETARG_NAME, PG_RETURN_DATUM, plugin, ReplicationSlotPersist(), ReplicationSlotRelease(), TYPEFUNC_COMPOSITE, and values.

176 {
177  Name name = PG_GETARG_NAME(0);
179  bool temporary = PG_GETARG_BOOL(2);
180  bool two_phase = PG_GETARG_BOOL(3);
181  Datum result;
182  TupleDesc tupdesc;
183  HeapTuple tuple;
184  Datum values[2];
185  bool nulls[2];
186 
187  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
188  elog(ERROR, "return type must be a row type");
189 
191 
193 
195  NameStr(*plugin),
196  temporary,
197  two_phase,
199  true);
200 
201  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
203 
204  memset(nulls, 0, sizeof(nulls));
205 
206  tuple = heap_form_tuple(tupdesc, values, nulls);
207  result = HeapTupleGetDatum(tuple);
208 
209  /* ok, slot is now fully created, mark it as persistent if needed */
210  if (!temporary)
213 
214  PG_RETURN_DATUM(result);
215 }
static const char * plugin
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define NameGetDatum(X)
Definition: postgres.h:639
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
ReplicationSlotPersistentData data
Definition: slot.h:156
XLogRecPtr confirmed_flush
Definition: slot.h:92
#define ERROR
Definition: elog.h:46
void ReplicationSlotPersist(void)
Definition: slot.c:767
Definition: c.h:675
static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, XLogRecPtr restart_lsn, bool find_startpoint)
Definition: slotfuncs.c:126
void ReplicationSlotRelease(void)
Definition: slot.c:497
static void check_permissions(void)
Definition: slotfuncs.c:29
uintptr_t Datum
Definition: postgres.h:411
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:221
const char * name
Definition: encode.c:515
static Datum values[MAXATTR]
Definition: bootstrap.c:166
#define elog(elevel,...)
Definition: elog.h:232
#define NameStr(name)
Definition: c.h:681
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
#define PG_GETARG_NAME(n)
Definition: fmgr.h:278

◆ pg_create_physical_replication_slot()

Datum pg_create_physical_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 74 of file slotfuncs.c.

References check_permissions(), CheckSlotRequirements(), create_physical_replication_slot(), ReplicationSlot::data, elog, ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, InvalidXLogRecPtr, LSNGetDatum, MyReplicationSlot, ReplicationSlotPersistentData::name, name, NameGetDatum, NameStr, PG_GETARG_BOOL, PG_GETARG_NAME, PG_RETURN_DATUM, ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, TYPEFUNC_COMPOSITE, and values.

75 {
77  bool immediately_reserve = PG_GETARG_BOOL(1);
78  bool temporary = PG_GETARG_BOOL(2);
79  Datum values[2];
80  bool nulls[2];
81  TupleDesc tupdesc;
82  HeapTuple tuple;
83  Datum result;
84 
85  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
86  elog(ERROR, "return type must be a row type");
87 
89 
91 
93  immediately_reserve,
94  temporary,
96 
97  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
98  nulls[0] = false;
99 
100  if (immediately_reserve)
101  {
103  nulls[1] = false;
104  }
105  else
106  nulls[1] = true;
107 
108  tuple = heap_form_tuple(tupdesc, values, nulls);
109  result = HeapTupleGetDatum(tuple);
110 
112 
113  PG_RETURN_DATUM(result);
114 }
void CheckSlotRequirements(void)
Definition: slot.c:1066
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define NameGetDatum(X)
Definition: postgres.h:639
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
static void create_physical_replication_slot(char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
Definition: slotfuncs.c:46
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
ReplicationSlotPersistentData data
Definition: slot.h:156
#define ERROR
Definition: elog.h:46
Definition: c.h:675
void ReplicationSlotRelease(void)
Definition: slot.c:497
static void check_permissions(void)
Definition: slotfuncs.c:29
uintptr_t Datum
Definition: postgres.h:411
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
XLogRecPtr restart_lsn
Definition: slot.h:81
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:221
const char * name
Definition: encode.c:515
static Datum values[MAXATTR]
Definition: bootstrap.c:166
#define elog(elevel,...)
Definition: elog.h:232
#define NameStr(name)
Definition: c.h:681
#define PG_GETARG_NAME(n)
Definition: fmgr.h:278

◆ pg_drop_replication_slot()

Datum pg_drop_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 222 of file slotfuncs.c.

References check_permissions(), CheckSlotRequirements(), name, NameStr, PG_GETARG_NAME, PG_RETURN_VOID, and ReplicationSlotDrop().

223 {
224  Name name = PG_GETARG_NAME(0);
225 
227 
229 
230  ReplicationSlotDrop(NameStr(*name), true);
231 
232  PG_RETURN_VOID();
233 }
void CheckSlotRequirements(void)
Definition: slot.c:1066
Definition: c.h:675
static void check_permissions(void)
Definition: slotfuncs.c:29
#define PG_RETURN_VOID()
Definition: fmgr.h:349
const char * name
Definition: encode.c:515
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:591
#define NameStr(name)
Definition: c.h:681
#define PG_GETARG_NAME(n)
Definition: fmgr.h:278

◆ pg_get_replication_slots()

Datum pg_get_replication_slots ( PG_FUNCTION_ARGS  )

Definition at line 239 of file slotfuncs.c.

References ReplicationSlot::active_pid, ReturnSetInfo::allowedModes, Assert, BoolGetDatum, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotPersistentData::confirmed_flush, CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), GetWALAvailability(), GetXLogWriteRecPtr(), i, ReplicationSlot::in_use, Int32GetDatum, Int64GetDatum(), ReplicationSlotPersistentData::invalidated_at, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsA, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), Max, max_replication_slots, max_slot_wal_keep_size_mb, MemoryContextSwitchTo(), ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameGetDatum, ObjectIdGetDatum, ReplicationSlotPersistentData::persistency, PG_GET_REPLICATION_SLOTS_COLS, ReplicationSlotPersistentData::plugin, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, ReturnSetInfo::returnMode, RS_TEMPORARY, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, TransactionIdGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), ReplicationSlotPersistentData::two_phase, TYPEFUNC_COMPOSITE, values, wal_keep_size_mb, wal_segment_size, WALAVAIL_EXTENDED, WALAVAIL_INVALID_LSN, WALAVAIL_REMOVED, WALAVAIL_RESERVED, WALAVAIL_UNRESERVED, work_mem, XLByteToSeg, XLogMBVarToSegs, XLogRecPtrIsInvalid, XLogSegNoOffsetToRecPtr, and ReplicationSlotPersistentData::xmin.

240 {
241 #define PG_GET_REPLICATION_SLOTS_COLS 14
242  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
243  TupleDesc tupdesc;
244  Tuplestorestate *tupstore;
245  MemoryContext per_query_ctx;
246  MemoryContext oldcontext;
247  XLogRecPtr currlsn;
248  int slotno;
249 
250  /* check to see if caller supports us returning a tuplestore */
251  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
252  ereport(ERROR,
253  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
254  errmsg("set-valued function called in context that cannot accept a set")));
255  if (!(rsinfo->allowedModes & SFRM_Materialize))
256  ereport(ERROR,
257  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
258  errmsg("materialize mode required, but it is not allowed in this context")));
259 
260  /* Build a tuple descriptor for our result type */
261  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
262  elog(ERROR, "return type must be a row type");
263 
264  /*
265  * We don't require any special permission to see this function's data
266  * because nothing should be sensitive. The most critical being the slot
267  * name, which shouldn't contain anything particularly sensitive.
268  */
269 
270  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
271  oldcontext = MemoryContextSwitchTo(per_query_ctx);
272 
273  tupstore = tuplestore_begin_heap(true, false, work_mem);
274  rsinfo->returnMode = SFRM_Materialize;
275  rsinfo->setResult = tupstore;
276  rsinfo->setDesc = tupdesc;
277 
278  MemoryContextSwitchTo(oldcontext);
279 
280  currlsn = GetXLogWriteRecPtr();
281 
282  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
283  for (slotno = 0; slotno < max_replication_slots; slotno++)
284  {
286  ReplicationSlot slot_contents;
288  bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
289  WALAvailability walstate;
290  int i;
291 
292  if (!slot->in_use)
293  continue;
294 
295  /* Copy slot contents while holding spinlock, then examine at leisure */
296  SpinLockAcquire(&slot->mutex);
297  slot_contents = *slot;
298  SpinLockRelease(&slot->mutex);
299 
300  memset(values, 0, sizeof(values));
301  memset(nulls, 0, sizeof(nulls));
302 
303  i = 0;
304  values[i++] = NameGetDatum(&slot_contents.data.name);
305 
306  if (slot_contents.data.database == InvalidOid)
307  nulls[i++] = true;
308  else
309  values[i++] = NameGetDatum(&slot_contents.data.plugin);
310 
311  if (slot_contents.data.database == InvalidOid)
312  values[i++] = CStringGetTextDatum("physical");
313  else
314  values[i++] = CStringGetTextDatum("logical");
315 
316  if (slot_contents.data.database == InvalidOid)
317  nulls[i++] = true;
318  else
319  values[i++] = ObjectIdGetDatum(slot_contents.data.database);
320 
321  values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
322  values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
323 
324  if (slot_contents.active_pid != 0)
325  values[i++] = Int32GetDatum(slot_contents.active_pid);
326  else
327  nulls[i++] = true;
328 
329  if (slot_contents.data.xmin != InvalidTransactionId)
330  values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
331  else
332  nulls[i++] = true;
333 
334  if (slot_contents.data.catalog_xmin != InvalidTransactionId)
335  values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
336  else
337  nulls[i++] = true;
338 
339  if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
340  values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
341  else
342  nulls[i++] = true;
343 
344  if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
345  values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
346  else
347  nulls[i++] = true;
348 
349  /*
350  * If invalidated_at is valid and restart_lsn is invalid, we know for
351  * certain that the slot has been invalidated. Otherwise, test
352  * availability from restart_lsn.
353  */
354  if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
355  !XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
356  walstate = WALAVAIL_REMOVED;
357  else
358  walstate = GetWALAvailability(slot_contents.data.restart_lsn);
359 
360  switch (walstate)
361  {
363  nulls[i++] = true;
364  break;
365 
366  case WALAVAIL_RESERVED:
367  values[i++] = CStringGetTextDatum("reserved");
368  break;
369 
370  case WALAVAIL_EXTENDED:
371  values[i++] = CStringGetTextDatum("extended");
372  break;
373 
374  case WALAVAIL_UNRESERVED:
375  values[i++] = CStringGetTextDatum("unreserved");
376  break;
377 
378  case WALAVAIL_REMOVED:
379 
380  /*
381  * If we read the restart_lsn long enough ago, maybe that file
382  * has been removed by now. However, the walsender could have
383  * moved forward enough that it jumped to another file after
384  * we looked. If checkpointer signalled the process to
385  * termination, then it's definitely lost; but if a process is
386  * still alive, then "unreserved" seems more appropriate.
387  *
388  * If we do change it, save the state for safe_wal_size below.
389  */
390  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
391  {
392  int pid;
393 
394  SpinLockAcquire(&slot->mutex);
395  pid = slot->active_pid;
396  slot_contents.data.restart_lsn = slot->data.restart_lsn;
397  SpinLockRelease(&slot->mutex);
398  if (pid != 0)
399  {
400  values[i++] = CStringGetTextDatum("unreserved");
401  walstate = WALAVAIL_UNRESERVED;
402  break;
403  }
404  }
405  values[i++] = CStringGetTextDatum("lost");
406  break;
407  }
408 
409  /*
410  * safe_wal_size is only computed for slots that have not been lost,
411  * and only if there's a configured maximum size.
412  */
413  if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
414  nulls[i++] = true;
415  else
416  {
417  XLogSegNo targetSeg;
418  uint64 slotKeepSegs;
419  uint64 keepSegs;
420  XLogSegNo failSeg;
421  XLogRecPtr failLSN;
422 
423  XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
424 
425  /* determine how many segments slots can be kept by slots */
427  /* ditto for wal_keep_size */
429 
430  /* if currpos reaches failLSN, we lose our segment */
431  failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
432  XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
433 
434  values[i++] = Int64GetDatum(failLSN - currlsn);
435  }
436 
437  values[i++] = BoolGetDatum(slot_contents.data.two_phase);
438 
440 
441  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
442  }
443 
444  LWLockRelease(ReplicationSlotControlLock);
445 
446  tuplestore_donestoring(tupstore);
447 
448  return (Datum) 0;
449 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
#define NameGetDatum(X)
Definition: postgres.h:639
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
int wal_segment_size
Definition: xlog.c:121
#define PG_GET_REPLICATION_SLOTS_COLS
WALAvailability
Definition: xlog.h:281
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ReplicationSlotPersistency persistency
Definition: slot.h:62
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
int wal_keep_size_mb
Definition: xlog.c:94
ReplicationSlotPersistentData data
Definition: slot.h:156
XLogRecPtr confirmed_flush
Definition: slot.h:92
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1805
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
uint64 XLogSegNo
Definition: xlogdefs.h:48
TransactionId catalog_xmin
Definition: slot.h:78
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:70
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1700
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
bool in_use
Definition: slot.h:132
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
#define TransactionIdGetDatum(X)
Definition: postgres.h:565
uintptr_t Datum
Definition: postgres.h:411
int work_mem
Definition: globals.c:124
#define BoolGetDatum(X)
Definition: postgres.h:446
#define InvalidOid
Definition: postgres_ext.h:36
#define XLogMBVarToSegs(mbvar, wal_segsz_bytes)
#define ereport(elevel,...)
Definition: elog.h:157
int allowedModes
Definition: execnodes.h:305
int max_slot_wal_keep_size_mb
Definition: xlog.c:113
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:11763
SetFunctionReturnMode returnMode
Definition: execnodes.h:307
#define Max(x, y)
Definition: c.h:980
int max_replication_slots
Definition: slot.c:99
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr restart_lsn
Definition: slot.h:81
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1203
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:233
Tuplestorestate * setResult
Definition: execnodes.h:310
static Datum values[MAXATTR]
Definition: bootstrap.c:166
ExprContext * econtext
Definition: execnodes.h:303
#define Int32GetDatum(X)
Definition: postgres.h:523
TupleDesc setDesc
Definition: execnodes.h:311
int errmsg(const char *fmt,...)
Definition: elog.c:909
pid_t active_pid
Definition: slot.h:135
WALAvailability GetWALAvailability(XLogRecPtr targetLSN)
Definition: xlog.c:9725
#define elog(elevel,...)
Definition: elog.h:232
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:82
ReplicationSlot replication_slots[1]
Definition: slot.h:189
XLogRecPtr invalidated_at
Definition: slot.h:84
slock_t mutex
Definition: slot.h:129
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ pg_logical_replication_slot_advance()

static XLogRecPtr pg_logical_replication_slot_advance ( XLogRecPtr  moveto)
static

Definition at line 496 of file slotfuncs.c.

References Assert, CHECK_FOR_INTERRUPTS, ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), CurrentResourceOwner, ReplicationSlot::data, elog, XLogReaderState::EndRecPtr, ERROR, FreeDecodingContext(), InvalidateSystemCaches(), InvalidXLogRecPtr, LogicalConfirmReceivedLocation(), LogicalDecodingProcessRecord(), MyReplicationSlot, NIL, LogicalDecodingContext::page_read, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, read_local_xlog_page(), LogicalDecodingContext::reader, ReplicationSlotMarkDirty(), ReplicationSlotPersistentData::restart_lsn, wal_segment_close(), XLogBeginRead(), XLogReadRecord(), and XLREAD_NEED_DATA.

Referenced by pg_replication_slot_advance().

497 {
499  ResourceOwner old_resowner = CurrentResourceOwner;
500  XLogRecPtr retlsn;
501 
502  Assert(moveto != InvalidXLogRecPtr);
503 
504  PG_TRY();
505  {
506  /*
507  * Create our decoding context in fast_forward mode, passing start_lsn
508  * as InvalidXLogRecPtr, so that we start processing from my slot's
509  * confirmed_flush.
510  */
512  NIL,
513  true, /* fast_forward */
516  NULL, NULL, NULL);
517 
518  /*
519  * Start reading at the slot's restart_lsn, which we know to point to
520  * a valid record.
521  */
523 
524  /* invalidate non-timetravel entries */
526 
527  /* Decode at least one record, until we run out of records */
528  while (ctx->reader->EndRecPtr < moveto)
529  {
530  char *errm = NULL;
531  XLogRecord *record;
532 
533  /*
534  * Read records. No changes are generated in fast_forward mode,
535  * but snapbuilder/slot statuses are updated properly.
536  */
537  while (XLogReadRecord(ctx->reader, &record, &errm) ==
539  {
540  if (!ctx->page_read(ctx->reader))
541  break;
542  }
543 
544  if (errm)
545  elog(ERROR, "%s", errm);
546 
547  /*
548  * Process the record. Storage-level changes are ignored in
549  * fast_forward mode, but other modules (such as snapbuilder)
550  * might still have critical updates to do.
551  */
552  if (record)
554 
555  /* Stop once the requested target has been reached */
556  if (moveto <= ctx->reader->EndRecPtr)
557  break;
558 
560  }
561 
562  /*
563  * Logical decoding could have clobbered CurrentResourceOwner during
564  * transaction management, so restore the executor's value. (This is
565  * a kluge, but it's not worth cleaning up right now.)
566  */
567  CurrentResourceOwner = old_resowner;
568 
569  if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
570  {
572 
573  /*
574  * If only the confirmed_flush LSN has changed the slot won't get
575  * marked as dirty by the above. Callers on the walsender
576  * interface are expected to keep track of their own progress and
577  * don't need it written out. But SQL-interface users cannot
578  * specify their own start positions and it's harder for them to
579  * keep track of their progress, so we should make more of an
580  * effort to save it for them.
581  *
582  * Dirty the slot so it is written out at the next checkpoint. The
583  * LSN position advanced to may still be lost on a crash but this
584  * makes the data consistent after a clean shutdown.
585  */
587  }
588 
590 
591  /* free context, call shutdown callback */
592  FreeDecodingContext(ctx);
593 
595  }
596  PG_CATCH();
597  {
598  /* clear all timetravel entries */
600 
601  PG_RE_THROW();
602  }
603  PG_END_TRY();
604 
605  return retlsn;
606 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:823
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, LogicalDecodingXLogPageReadCB page_read, WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:479
ReplicationSlotPersistentData data
Definition: slot.h:156
XLogReadRecordResult XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
Definition: xlogreader.c:346
void InvalidateSystemCaches(void)
Definition: inval.c:649
XLogRecPtr confirmed_flush
Definition: slot.h:92
XLogRecPtr EndRecPtr
Definition: xlogreader.h:179
#define ERROR
Definition: elog.h:46
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:106
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:252
#define PG_CATCH()
Definition: elog.h:323
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:624
XLogRecPtr restart_lsn
Definition: slot.h:81
#define PG_RE_THROW()
Definition: elog.h:354
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1676
XLogReaderState * reader
Definition: logical.h:43
#define elog(elevel,...)
Definition: elog.h:232
LogicalDecodingXLogPageReadCB page_read
Definition: logical.h:44
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102
#define PG_TRY()
Definition: elog.h:313
#define PG_END_TRY()
Definition: elog.h:338
void ReplicationSlotMarkDirty(void)
Definition: slot.c:750
bool read_local_xlog_page(XLogReaderState *state)
Definition: xlogutils.c:842

◆ pg_physical_replication_slot_advance()

static XLogRecPtr pg_physical_replication_slot_advance ( XLogRecPtr  moveto)
static

Definition at line 459 of file slotfuncs.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by pg_replication_slot_advance().

460 {
462  XLogRecPtr retlsn = startlsn;
463 
464  Assert(moveto != InvalidXLogRecPtr);
465 
466  if (startlsn < moveto)
467  {
471  retlsn = moveto;
472 
473  /*
474  * Dirty the slot so as it is written out at the next checkpoint. Note
475  * that the LSN position advanced may still be lost in the event of a
476  * crash, but this makes the data consistent after a clean shutdown.
477  */
479  }
480 
481  return retlsn;
482 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:156
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr restart_lsn
Definition: slot.h:81
slock_t mutex
Definition: slot.h:129
void ReplicationSlotMarkDirty(void)
Definition: slot.c:750

◆ pg_replication_slot_advance()

Datum pg_replication_slot_advance ( PG_FUNCTION_ARGS  )

Definition at line 612 of file slotfuncs.c.

References Assert, check_permissions(), ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, ReplicationSlotPersistentData::database, elog, ereport, errcode(), errdetail(), errmsg(), ERROR, get_call_result_type(), GetFlushRecPtr(), GetXLogReplayRecPtr(), heap_form_tuple(), HeapTupleGetDatum, LSN_FORMAT_ARGS, LSNGetDatum, Min, MyReplicationSlot, ReplicationSlotPersistentData::name, NameGetDatum, NameStr, OidIsValid, PG_GETARG_LSN, PG_GETARG_NAME, pg_logical_replication_slot_advance(), pg_physical_replication_slot_advance(), PG_RETURN_DATUM, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotPersistentData::restart_lsn, SAB_Error, ThisTimeLineID, TYPEFUNC_COMPOSITE, values, and XLogRecPtrIsInvalid.

613 {
614  Name slotname = PG_GETARG_NAME(0);
615  XLogRecPtr moveto = PG_GETARG_LSN(1);
616  XLogRecPtr endlsn;
617  XLogRecPtr minlsn;
618  TupleDesc tupdesc;
619  Datum values[2];
620  bool nulls[2];
621  HeapTuple tuple;
622  Datum result;
623 
625 
627 
628  if (XLogRecPtrIsInvalid(moveto))
629  ereport(ERROR,
630  (errmsg("invalid target WAL LSN")));
631 
632  /* Build a tuple descriptor for our result type */
633  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
634  elog(ERROR, "return type must be a row type");
635 
636  /*
637  * We can't move slot past what's been flushed/replayed so clamp the
638  * target position accordingly.
639  */
640  if (!RecoveryInProgress())
641  moveto = Min(moveto, GetFlushRecPtr());
642  else
643  moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
644 
645  /* Acquire the slot so we "own" it */
646  (void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error);
647 
648  /* A slot whose restart_lsn has never been reserved cannot be advanced */
650  ereport(ERROR,
651  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
652  errmsg("replication slot \"%s\" cannot be advanced",
653  NameStr(*slotname)),
654  errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
655 
656  /*
657  * Check if the slot is not moving backwards. Physical slots rely simply
658  * on restart_lsn as a minimum point, while logical slots have confirmed
659  * consumption up to confirmed_flush, meaning that in both cases data
660  * older than that is not available anymore.
661  */
664  else
666 
667  if (moveto < minlsn)
668  ereport(ERROR,
669  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
670  errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
671  LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
672 
673  /* Do the actual slot update, depending on the slot type */
675  endlsn = pg_logical_replication_slot_advance(moveto);
676  else
677  endlsn = pg_physical_replication_slot_advance(moveto);
678 
679  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
680  nulls[0] = false;
681 
682  /*
683  * Recompute the minimum LSN and xmin across all slots to adjust with the
684  * advancing potentially done.
685  */
688 
690 
691  /* Return the reached position. */
692  values[1] = LSNGetDatum(endlsn);
693  nulls[1] = false;
694 
695  tuple = heap_form_tuple(tupdesc, values, nulls);
696  result = HeapTupleGetDatum(tuple);
697 
698  PG_RETURN_DATUM(result);
699 }
static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto)
Definition: slotfuncs.c:496
#define NameGetDatum(X)
Definition: postgres.h:639
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
#define Min(x, y)
Definition: c.h:986
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8589
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
ReplicationSlotPersistentData data
Definition: slot.h:156
bool RecoveryInProgress(void)
Definition: xlog.c:8237
#define OidIsValid(objectId)
Definition: c.h:710
XLogRecPtr confirmed_flush
Definition: slot.h:92
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:839
#define ERROR
Definition: elog.h:46
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11728
Definition: c.h:675
int errdetail(const char *fmt,...)
Definition: elog.c:1042
void ReplicationSlotRelease(void)
Definition: slot.c:497
static void check_permissions(void)
Definition: slotfuncs.c:29
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
uintptr_t Datum
Definition: postgres.h:411
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:353
TimeLineID ThisTimeLineID
Definition: xlog.c:196
#define ereport(elevel,...)
Definition: elog.h:157
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
XLogRecPtr restart_lsn
Definition: slot.h:81
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:221
static Datum values[MAXATTR]
Definition: bootstrap.c:166
int errmsg(const char *fmt,...)
Definition: elog.c:909
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:388
#define elog(elevel,...)
Definition: elog.h:232
Definition: slot.h:43
#define NameStr(name)
Definition: c.h:681
static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr moveto)
Definition: slotfuncs.c:459
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:789
#define PG_GETARG_NAME(n)
Definition: fmgr.h:278