PostgreSQL Source Code  git master
slotfuncs.c File Reference
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "access/htup_details.h"
#include "replication/decode.h"
#include "replication/slot.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/pg_lsn.h"
#include "utils/resowner.h"
Include dependency graph for slotfuncs.c:

Go to the source code of this file.

Macros

#define PG_GET_REPLICATION_SLOTS_COLS   11
 

Functions

static void check_permissions (void)
 
Datum pg_create_physical_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_create_logical_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_drop_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_get_replication_slots (PG_FUNCTION_ARGS)
 
static XLogRecPtr pg_physical_replication_slot_advance (XLogRecPtr startlsn, XLogRecPtr moveto)
 
static XLogRecPtr pg_logical_replication_slot_advance (XLogRecPtr startlsn, XLogRecPtr moveto)
 
Datum pg_replication_slot_advance (PG_FUNCTION_ARGS)
 

Macro Definition Documentation

◆ PG_GET_REPLICATION_SLOTS_COLS

#define PG_GET_REPLICATION_SLOTS_COLS   11

Function Documentation

◆ check_permissions()

static void check_permissions ( void  )
static

Definition at line 30 of file slotfuncs.c.

References ereport, errcode(), errmsg(), ERROR, GetUserId(), has_rolreplication(), and superuser().

Referenced by pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_drop_replication_slot(), and pg_replication_slot_advance().

31 {
33  ereport(ERROR,
34  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
35  (errmsg("must be superuser or replication role to use replication slots"))));
36 }
Oid GetUserId(void)
Definition: miscinit.c:379
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:560
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ pg_create_logical_replication_slot()

Datum pg_create_logical_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 100 of file slotfuncs.c.

References Assert, check_permissions(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateInitDecodingContext(), CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), elog, ERROR, FreeDecodingContext(), get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, logical_read_local_xlog_page(), LSNGetDatum, MyReplicationSlot, ReplicationSlotPersistentData::name, name, NameStr, NIL, PG_GETARG_BOOL, PG_GETARG_NAME, PG_RETURN_DATUM, plugin, ReplicationSlotCreate(), ReplicationSlotPersist(), ReplicationSlotRelease(), RS_EPHEMERAL, RS_TEMPORARY, TYPEFUNC_COMPOSITE, and values.

101 {
102  Name name = PG_GETARG_NAME(0);
104  bool temporary = PG_GETARG_BOOL(2);
105 
106  LogicalDecodingContext *ctx = NULL;
107 
108  TupleDesc tupdesc;
109  HeapTuple tuple;
110  Datum result;
111  Datum values[2];
112  bool nulls[2];
113 
115 
116  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
117  elog(ERROR, "return type must be a row type");
118 
120 
122 
123  /*
124  * Acquire a logical decoding slot; this will check for conflicting names.
125  * A persistent slot is initially created as ephemeral, which lets us
126  * handle errors during initialization cleanly because the slot gets dropped
127  * if this transaction fails. We'll make it persistent at the end. Temporary
128  * slots can be created as temporary from the beginning, as they get dropped
129  * on error as well.
130  */
131  ReplicationSlotCreate(NameStr(*name), true,
132  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
133 
134  /*
135  * Create logical decoding context, to build the initial snapshot.
136  */
137  ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
138  false, /* do not build snapshot */
139  logical_read_local_xlog_page, NULL, NULL,
140  NULL);
141 
142  /* build initial snapshot, might take a while */
144 
147 
148  /* don't need the decoding context anymore */
149  FreeDecodingContext(ctx);
150 
151  memset(nulls, 0, sizeof(nulls));
152 
153  tuple = heap_form_tuple(tupdesc, values, nulls);
154  result = HeapTupleGetDatum(tuple);
155 
156  /* ok, slot is now fully created, mark it as persistent if needed */
157  if (!temporary)
160 
161  PG_RETURN_DATUM(result);
162 }
#define NIL
Definition: pg_list.h:69
static const char * plugin
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: logicalfuncs.c:118
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:223
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1074
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:244
ReplicationSlotPersistentData data
Definition: slot.h:120
XLogRecPtr confirmed_flush
Definition: slot.h:81
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:445
#define ERROR
Definition: elog.h:43
void ReplicationSlotPersist(void)
Definition: slot.c:673
Definition: c.h:570
void ReplicationSlotRelease(void)
Definition: slot.c:416
static void check_permissions(void)
Definition: slotfuncs.c:30
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:318
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:699
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:488
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:231
const char * name
Definition: encode.c:521
static Datum values[MAXATTR]
Definition: bootstrap.c:164
#define NameStr(name)
Definition: c.h:576
#define CStringGetTextDatum(s)
Definition: builtins.h:95
#define elog
Definition: elog.h:219
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:78
#define PG_GETARG_NAME(n)
Definition: fmgr.h:248

◆ pg_create_physical_replication_slot()

Datum pg_create_physical_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 43 of file slotfuncs.c.

References Assert, check_permissions(), CheckSlotRequirements(), ReplicationSlot::data, elog, ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, LSNGetDatum, MyReplicationSlot, ReplicationSlotPersistentData::name, name, NameGetDatum, NameStr, PG_GETARG_BOOL, PG_GETARG_NAME, PG_RETURN_DATUM, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotPersistentData::restart_lsn, RS_PERSISTENT, RS_TEMPORARY, TYPEFUNC_COMPOSITE, and values.

44 {
46  bool immediately_reserve = PG_GETARG_BOOL(1);
47  bool temporary = PG_GETARG_BOOL(2);
48  Datum values[2];
49  bool nulls[2];
50  TupleDesc tupdesc;
51  HeapTuple tuple;
52  Datum result;
53 
55 
56  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
57  elog(ERROR, "return type must be a row type");
58 
60 
62 
63  /* acquire replication slot, this will check for conflicting names */
64  ReplicationSlotCreate(NameStr(*name), false,
65  temporary ? RS_TEMPORARY : RS_PERSISTENT);
66 
67  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
68  nulls[0] = false;
69 
70  if (immediately_reserve)
71  {
72  /* Reserve WAL as the user asked for it */
74 
75  /* Write this slot to disk */
78 
80  nulls[1] = false;
81  }
82  else
83  {
84  nulls[1] = true;
85  }
86 
87  tuple = heap_form_tuple(tupdesc, values, nulls);
88  result = HeapTupleGetDatum(tuple);
89 
91 
92  PG_RETURN_DATUM(result);
93 }
void CheckSlotRequirements(void)
Definition: slot.c:965
#define NameGetDatum(X)
Definition: postgres.h:580
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
void ReplicationSlotSave(void)
Definition: slot.c:638
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1074
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:244
ReplicationSlotPersistentData data
Definition: slot.h:120
void ReplicationSlotReserveWal(void)
Definition: slot.c:985
#define ERROR
Definition: elog.h:43
Definition: c.h:570
void ReplicationSlotRelease(void)
Definition: slot.c:416
static void check_permissions(void)
Definition: slotfuncs.c:30
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:318
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:699
XLogRecPtr restart_lsn
Definition: slot.h:73
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:231
const char * name
Definition: encode.c:521
static Datum values[MAXATTR]
Definition: bootstrap.c:164
#define NameStr(name)
Definition: c.h:576
#define elog
Definition: elog.h:219
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656
#define PG_GETARG_NAME(n)
Definition: fmgr.h:248

◆ pg_drop_replication_slot()

Datum pg_drop_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 169 of file slotfuncs.c.

References check_permissions(), CheckSlotRequirements(), name, NameStr, PG_GETARG_NAME, PG_RETURN_VOID, and ReplicationSlotDrop().

170 {
171  Name name = PG_GETARG_NAME(0);
172 
174 
176 
177  ReplicationSlotDrop(NameStr(*name), true);
178 
179  PG_RETURN_VOID();
180 }
void CheckSlotRequirements(void)
Definition: slot.c:965
Definition: c.h:570
static void check_permissions(void)
Definition: slotfuncs.c:30
#define PG_RETURN_VOID()
Definition: fmgr.h:314
const char * name
Definition: encode.c:521
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:509
#define NameStr(name)
Definition: c.h:576
#define PG_GETARG_NAME(n)
Definition: fmgr.h:248

◆ pg_get_replication_slots()

Datum pg_get_replication_slots ( PG_FUNCTION_ARGS  )

Definition at line 186 of file slotfuncs.c.

References ReplicationSlot::active_pid, ReturnSetInfo::allowedModes, BoolGetDatum, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotPersistentData::confirmed_flush, CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), i, ReplicationSlot::in_use, Int32GetDatum, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsA, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MemoryContextSwitchTo(), ReplicationSlot::mutex, ReplicationSlotPersistentData::name, namecpy(), NameGetDatum, ReplicationSlotPersistentData::persistency, PG_GET_REPLICATION_SLOTS_COLS, plugin, ReplicationSlotPersistentData::plugin, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, ReturnSetInfo::returnMode, RS_TEMPORARY, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, TransactionIdGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, work_mem, and ReplicationSlotPersistentData::xmin.

187 {
188 #define PG_GET_REPLICATION_SLOTS_COLS 11
189  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
190  TupleDesc tupdesc;
191  Tuplestorestate *tupstore;
192  MemoryContext per_query_ctx;
193  MemoryContext oldcontext;
194  int slotno;
195 
196  /* check to see if caller supports us returning a tuplestore */
197  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
198  ereport(ERROR,
199  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
200  errmsg("set-valued function called in context that cannot accept a set")));
201  if (!(rsinfo->allowedModes & SFRM_Materialize))
202  ereport(ERROR,
203  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
204  errmsg("materialize mode required, but it is not " \
205  "allowed in this context")));
206 
207  /* Build a tuple descriptor for our result type */
208  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
209  elog(ERROR, "return type must be a row type");
210 
211  /*
212  * We don't require any special permission to see this function's data
213  * because nothing should be sensitive. The most critical item is the slot
214  * name, which shouldn't contain anything particularly sensitive.
215  */
216 
217  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
218  oldcontext = MemoryContextSwitchTo(per_query_ctx);
219 
220  tupstore = tuplestore_begin_heap(true, false, work_mem);
221  rsinfo->returnMode = SFRM_Materialize;
222  rsinfo->setResult = tupstore;
223  rsinfo->setDesc = tupdesc;
224 
225  MemoryContextSwitchTo(oldcontext);
226 
227  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
228  for (slotno = 0; slotno < max_replication_slots; slotno++)
229  {
232  bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
233 
234  ReplicationSlotPersistency persistency;
235  TransactionId xmin;
236  TransactionId catalog_xmin;
237  XLogRecPtr restart_lsn;
238  XLogRecPtr confirmed_flush_lsn;
239  pid_t active_pid;
240  Oid database;
241  NameData slot_name;
243  int i;
244 
245  if (!slot->in_use)
246  continue;
247 
248  SpinLockAcquire(&slot->mutex);
249 
250  xmin = slot->data.xmin;
251  catalog_xmin = slot->data.catalog_xmin;
252  database = slot->data.database;
253  restart_lsn = slot->data.restart_lsn;
254  confirmed_flush_lsn = slot->data.confirmed_flush;
255  namecpy(&slot_name, &slot->data.name);
256  namecpy(&plugin, &slot->data.plugin);
257  active_pid = slot->active_pid;
258  persistency = slot->data.persistency;
259 
260  SpinLockRelease(&slot->mutex);
261 
262  memset(nulls, 0, sizeof(nulls));
263 
264  i = 0;
265  values[i++] = NameGetDatum(&slot_name);
266 
267  if (database == InvalidOid)
268  nulls[i++] = true;
269  else
270  values[i++] = NameGetDatum(&plugin);
271 
272  if (database == InvalidOid)
273  values[i++] = CStringGetTextDatum("physical");
274  else
275  values[i++] = CStringGetTextDatum("logical");
276 
277  if (database == InvalidOid)
278  nulls[i++] = true;
279  else
280  values[i++] = database;
281 
282  values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
283  values[i++] = BoolGetDatum(active_pid != 0);
284 
285  if (active_pid != 0)
286  values[i++] = Int32GetDatum(active_pid);
287  else
288  nulls[i++] = true;
289 
290  if (xmin != InvalidTransactionId)
291  values[i++] = TransactionIdGetDatum(xmin);
292  else
293  nulls[i++] = true;
294 
295  if (catalog_xmin != InvalidTransactionId)
296  values[i++] = TransactionIdGetDatum(catalog_xmin);
297  else
298  nulls[i++] = true;
299 
300  if (restart_lsn != InvalidXLogRecPtr)
301  values[i++] = LSNGetDatum(restart_lsn);
302  else
303  nulls[i++] = true;
304 
305  if (confirmed_flush_lsn != InvalidXLogRecPtr)
306  values[i++] = LSNGetDatum(confirmed_flush_lsn);
307  else
308  nulls[i++] = true;
309 
310  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
311  }
312  LWLockRelease(ReplicationSlotControlLock);
313 
314  tuplestore_donestoring(tupstore);
315 
316  return (Datum) 0;
317 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
int namecpy(Name n1, Name n2)
Definition: name.c:191
static const char * plugin
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:568
#define NameGetDatum(X)
Definition: postgres.h:580
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define PG_GET_REPLICATION_SLOTS_COLS
uint32 TransactionId
Definition: c.h:474
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ReplicationSlotPersistency persistency
Definition: slot.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
ReplicationSlotPersistentData data
Definition: slot.h:120
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr confirmed_flush
Definition: slot.h:81
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1725
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
Definition: c.h:570
TransactionId catalog_xmin
Definition: slot.h:70
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:62
#define ereport(elevel, rest)
Definition: elog.h:122
bool in_use
Definition: slot.h:96
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
#define TransactionIdGetDatum(X)
Definition: postgres.h:506
uintptr_t Datum
Definition: postgres.h:367
int work_mem
Definition: globals.c:122
#define BoolGetDatum(X)
Definition: postgres.h:387
#define InvalidOid
Definition: postgres_ext.h:36
int allowedModes
Definition: execnodes.h:297
SetFunctionReturnMode returnMode
Definition: execnodes.h:299
int max_replication_slots
Definition: slot.c:99
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr restart_lsn
Definition: slot.h:73
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1121
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:225
Tuplestorestate * setResult
Definition: execnodes.h:302
static Datum values[MAXATTR]
Definition: bootstrap.c:164
ExprContext * econtext
Definition: execnodes.h:295
#define Int32GetDatum(X)
Definition: postgres.h:464
TupleDesc setDesc
Definition: execnodes.h:303
ReplicationSlotPersistency
Definition: slot.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t active_pid
Definition: slot.h:99
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:95
ReplicationSlot replication_slots[1]
Definition: slot.h:153
#define elog
Definition: elog.h:219
slock_t mutex
Definition: slot.h:93

◆ pg_logical_replication_slot_advance()

static XLogRecPtr pg_logical_replication_slot_advance ( XLogRecPtr  startlsn,
XLogRecPtr  moveto 
)
static

Definition at line 342 of file slotfuncs.c.

References CHECK_FOR_INTERRUPTS, ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), CurrentResourceOwner, ReplicationSlot::data, elog, XLogReaderState::EndRecPtr, ERROR, FreeDecodingContext(), InvalidateSystemCaches(), InvalidXLogRecPtr, logical_read_local_xlog_page(), LogicalConfirmReceivedLocation(), LogicalDecodingProcessRecord(), MyReplicationSlot, NIL, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, LogicalDecodingContext::reader, ReplicationSlotMarkDirty(), ResourceOwnerCreate(), and XLogReadRecord().

Referenced by pg_replication_slot_advance().

343 {
345  ResourceOwner old_resowner = CurrentResourceOwner;
346  XLogRecPtr retlsn = InvalidXLogRecPtr;
347 
348  PG_TRY();
349  {
350  /* restart at slot's confirmed_flush */
352  NIL,
353  true,
355  NULL, NULL, NULL);
356 
358  "logical decoding");
359 
360  /* invalidate non-timetravel entries */
362 
363  /* Decode until we run out of records */
364  while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
365  (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
366  {
367  XLogRecord *record;
368  char *errm = NULL;
369 
370  record = XLogReadRecord(ctx->reader, startlsn, &errm);
371  if (errm)
372  elog(ERROR, "%s", errm);
373 
374  /*
375  * Now that we've set up the xlog reader state, subsequent calls
376  * pass InvalidXLogRecPtr to say "continue from last record"
377  */
378  startlsn = InvalidXLogRecPtr;
379 
380  /*
381  * Process the record. The decoding context here is created with
382  * fast_forward set and NULL writer callbacks, so no tuplestore is involved.
383  */
384  if (record != NULL)
386 
387  /* check limits */
388  if (moveto <= ctx->reader->EndRecPtr)
389  break;
390 
392  }
393 
394  CurrentResourceOwner = old_resowner;
395 
396  if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
397  {
399 
400  /*
401  * If only the confirmed_flush_lsn has changed the slot won't get
402  * marked as dirty by the above. Callers on the walsender
403  * interface are expected to keep track of their own progress and
404  * don't need it written out. But SQL-interface users cannot
405  * specify their own start positions and it's harder for them to
406  * keep track of their progress, so we should make more of an
407  * effort to save it for them.
408  *
409  * Dirty the slot so it's written out at the next checkpoint.
410  * We'll still lose its position on crash, as documented, but it's
411  * better than always losing the position even on clean restart.
412  */
414  }
415 
417 
418  /* free context, call shutdown callback */
419  FreeDecodingContext(ctx);
420 
422  }
423  PG_CATCH();
424  {
425  /* clear all timetravel entries */
427 
428  PG_RE_THROW();
429  }
430  PG_END_TRY();
431 
432  return retlsn;
433 }
#define NIL
Definition: pg_list.h:69
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: logicalfuncs.c:118
ResourceOwner CurrentResourceOwner
Definition: resowner.c:140
ReplicationSlotPersistentData data
Definition: slot.h:120
void InvalidateSystemCaches(void)
Definition: inval.c:641
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:193
XLogRecPtr confirmed_flush
Definition: slot.h:81
XLogRecPtr EndRecPtr
Definition: xlogreader.h:120
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:97
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:353
#define PG_CATCH()
Definition: elog.h:293
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:488
#define PG_RE_THROW()
Definition: elog.h:314
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:981
XLogReaderState * reader
Definition: logical.h:44
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define elog
Definition: elog.h:219
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:418

◆ pg_physical_replication_slot_advance()

static XLogRecPtr pg_physical_replication_slot_advance ( XLogRecPtr  startlsn,
XLogRecPtr  moveto 
)
static

Definition at line 323 of file slotfuncs.c.

References ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by pg_replication_slot_advance().

324 {
325  XLogRecPtr retlsn = InvalidXLogRecPtr;
326 
328  if (MyReplicationSlot->data.restart_lsn < moveto)
329  {
331  retlsn = moveto;
332  }
334 
335  return retlsn;
336 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:120
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr restart_lsn
Definition: slot.h:73
slock_t mutex
Definition: slot.h:93

◆ pg_replication_slot_advance()

Datum pg_replication_slot_advance ( PG_FUNCTION_ARGS  )

Definition at line 439 of file slotfuncs.c.

References Assert, check_permissions(), ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, ReplicationSlotPersistentData::database, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), GetFlushRecPtr(), GetXLogReplayRecPtr(), heap_form_tuple(), HeapTupleGetDatum, LSNGetDatum, Min, MyReplicationSlot, ReplicationSlotPersistentData::name, NameGetDatum, NameStr, OidIsValid, PG_GETARG_LSN, PG_GETARG_NAME, pg_logical_replication_slot_advance(), pg_physical_replication_slot_advance(), PG_RETURN_DATUM, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), ThisTimeLineID, TYPEFUNC_COMPOSITE, values, and XLogRecPtrIsInvalid.

440 {
441  Name slotname = PG_GETARG_NAME(0);
442  XLogRecPtr moveto = PG_GETARG_LSN(1);
443  XLogRecPtr endlsn;
444  XLogRecPtr startlsn;
445  TupleDesc tupdesc;
446  Datum values[2];
447  bool nulls[2];
448  HeapTuple tuple;
449  Datum result;
450 
452 
454 
455  if (XLogRecPtrIsInvalid(moveto))
456  ereport(ERROR,
457  (errmsg("invalid target wal lsn")));
458 
459  /* Build a tuple descriptor for our result type */
460  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
461  elog(ERROR, "return type must be a row type");
462 
463  /*
464  * We can't move slot past what's been flushed/replayed so clamp the
465  * target position accordingly.
466  */
467  if (!RecoveryInProgress())
468  moveto = Min(moveto, GetFlushRecPtr());
469  else
470  moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
471 
472  /* Acquire the slot so we "own" it */
473  ReplicationSlotAcquire(NameStr(*slotname), true);
474 
476  if (moveto < startlsn)
477  {
479  ereport(ERROR,
480  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
481  errmsg("cannot move slot to %X/%X, minimum is %X/%X",
482  (uint32) (moveto >> 32), (uint32) moveto,
483  (uint32) (startlsn >> 32), (uint32) startlsn)));
484  }
485 
487  endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
488  else
489  endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
490 
491  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
492  nulls[0] = false;
493 
494  /* Update the on disk state when lsn was updated. */
495  if (XLogRecPtrIsInvalid(endlsn))
496  {
501  }
502 
504 
505  /* Return the reached position. */
506  values[1] = LSNGetDatum(endlsn);
507  nulls[1] = false;
508 
509  tuple = heap_form_tuple(tupdesc, values, nulls);
510  result = HeapTupleGetDatum(tuple);
511 
512  PG_RETURN_DATUM(result);
513 }
#define NameGetDatum(X)
Definition: postgres.h:580
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
#define Min(x, y)
Definition: c.h:857
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
void ReplicationSlotSave(void)
Definition: slot.c:638
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8281
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1074
ReplicationSlotPersistentData data
Definition: slot.h:120
bool RecoveryInProgress(void)
Definition: xlog.c:7949
#define OidIsValid(objectId)
Definition: c.h:605
XLogRecPtr confirmed_flush
Definition: slot.h:81
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:741
#define ERROR
Definition: elog.h:43
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11194
Definition: c.h:570
unsigned int uint32
Definition: c.h:325
void ReplicationSlotRelease(void)
Definition: slot.c:416
static void check_permissions(void)
Definition: slotfuncs.c:30
#define ereport(elevel, rest)
Definition: elog.h:122
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
uintptr_t Datum
Definition: postgres.h:367
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:318
TimeLineID ThisTimeLineID
Definition: xlog.c:181
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:699
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:231
static Datum values[MAXATTR]
Definition: bootstrap.c:164
static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
Definition: slotfuncs.c:342
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:576
#define elog
Definition: elog.h:219
static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
Definition: slotfuncs.c:323
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:695
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656
#define PG_GETARG_NAME(n)
Definition: fmgr.h:248