PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
slotfuncs.c File Reference
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "access/htup_details.h"
#include "replication/slot.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
Include dependency graph for slotfuncs.c:

Go to the source code of this file.

Macros

#define PG_GET_REPLICATION_SLOTS_COLS   11
 

Functions

static void check_permissions (void)
 
Datum pg_create_physical_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_create_logical_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_drop_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_get_replication_slots (PG_FUNCTION_ARGS)
 

Macro Definition Documentation

#define PG_GET_REPLICATION_SLOTS_COLS   11

Function Documentation

static void check_permissions ( void  )
static

Definition at line 27 of file slotfuncs.c.

References ereport, errcode(), errmsg(), ERROR, GetUserId(), has_rolreplication(), and superuser().

Referenced by pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), and pg_drop_replication_slot().

28 {
30  ereport(ERROR,
31  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
32  (errmsg("must be superuser or replication role to use replication slots"))));
33 }
Oid GetUserId(void)
Definition: miscinit.c:284
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:465
int errmsg(const char *fmt,...)
Definition: elog.c:797
Datum pg_create_logical_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 97 of file slotfuncs.c.

References Assert, check_permissions(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateInitDecodingContext(), CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), elog, ERROR, FreeDecodingContext(), get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, logical_read_local_xlog_page(), LSNGetDatum, MyReplicationSlot, ReplicationSlotPersistentData::name, name, NameStr, NIL, NULL, PG_GETARG_BOOL, PG_GETARG_NAME, PG_RETURN_DATUM, plugin, ReplicationSlotCreate(), ReplicationSlotPersist(), ReplicationSlotRelease(), result, RS_EPHEMERAL, RS_TEMPORARY, TYPEFUNC_COMPOSITE, and values.

98 {
101  bool temporary = PG_GETARG_BOOL(2);
102 
104 
105  TupleDesc tupdesc;
106  HeapTuple tuple;
107  Datum result;
108  Datum values[2];
109  bool nulls[2];
110 
112 
113  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
114  elog(ERROR, "return type must be a row type");
115 
117 
119 
120  /*
121  * Acquire a logical decoding slot, this will check for conflicting names.
122  * Initially create persistent slot as ephemeral - that allows us to
123  * nicely handle errors during initialization because it'll get dropped if
124  * this transaction fails. We'll make it persistent at the end. Temporary
125  * slots can be created as temporary from beginning as they get dropped on
126  * error as well.
127  */
128  ReplicationSlotCreate(NameStr(*name), true,
129  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
130 
131  /*
132  * Create logical decoding context, to build the initial snapshot.
133  */
134  ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
135  false, /* do not build snapshot */
137  NULL);
138 
139  /* build initial snapshot, might take a while */
141 
144 
145  /* don't need the decoding context anymore */
146  FreeDecodingContext(ctx);
147 
148  memset(nulls, 0, sizeof(nulls));
149 
150  tuple = heap_form_tuple(tupdesc, values, nulls);
151  result = HeapTupleGetDatum(tuple);
152 
153  /* ok, slot is now fully created, mark it as persistent if needed */
154  if (!temporary)
157 
158  PG_RETURN_DATUM(result);
159 }
#define NIL
Definition: pg_list.h:69
static const char * plugin
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: logicalfuncs.c:118
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
return result
Definition: formatting.c:1633
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:216
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:239
ReplicationSlotPersistentData data
Definition: slot.h:116
XLogRecPtr confirmed_flush
Definition: slot.h:77
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:432
#define ERROR
Definition: elog.h:43
void ReplicationSlotPersist(void)
Definition: slot.c:673
Definition: c.h:493
void ReplicationSlotRelease(void)
Definition: slot.c:416
static void check_permissions(void)
Definition: slotfuncs.c:27
uintptr_t Datum
Definition: postgres.h:372
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:313
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:475
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:222
const char * name
Definition: encode.c:521
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define NameStr(name)
Definition: c.h:499
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
#define PG_GETARG_NAME(n)
Definition: fmgr.h:243
Datum pg_create_physical_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 40 of file slotfuncs.c.

References Assert, check_permissions(), CheckSlotRequirements(), ReplicationSlot::data, elog, ERROR, get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum, LSNGetDatum, MyReplicationSlot, ReplicationSlotPersistentData::name, name, NameGetDatum, NameStr, NULL, PG_GETARG_BOOL, PG_GETARG_NAME, PG_RETURN_DATUM, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotPersistentData::restart_lsn, result, RS_PERSISTENT, RS_TEMPORARY, TYPEFUNC_COMPOSITE, and values.

41 {
43  bool immediately_reserve = PG_GETARG_BOOL(1);
44  bool temporary = PG_GETARG_BOOL(2);
45  Datum values[2];
46  bool nulls[2];
47  TupleDesc tupdesc;
48  HeapTuple tuple;
49  Datum result;
50 
52 
53  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
54  elog(ERROR, "return type must be a row type");
55 
57 
59 
60  /* acquire replication slot, this will check for conflicting names */
61  ReplicationSlotCreate(NameStr(*name), false,
62  temporary ? RS_TEMPORARY : RS_PERSISTENT);
63 
64  values[0] = NameGetDatum(&MyReplicationSlot->data.name);
65  nulls[0] = false;
66 
67  if (immediately_reserve)
68  {
69  /* Reserve WAL as the user asked for it */
71 
72  /* Write this slot to disk */
75 
77  nulls[1] = false;
78  }
79  else
80  {
81  nulls[1] = true;
82  }
83 
84  tuple = heap_form_tuple(tupdesc, values, nulls);
85  result = HeapTupleGetDatum(tuple);
86 
88 
89  PG_RETURN_DATUM(result);
90 }
void CheckSlotRequirements(void)
Definition: slot.c:965
#define NameGetDatum(X)
Definition: postgres.h:601
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
return result
Definition: formatting.c:1633
void ReplicationSlotSave(void)
Definition: slot.c:638
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:239
ReplicationSlotPersistentData data
Definition: slot.h:116
void ReplicationSlotReserveWal(void)
Definition: slot.c:985
#define ERROR
Definition: elog.h:43
Definition: c.h:493
void ReplicationSlotRelease(void)
Definition: slot.c:416
static void check_permissions(void)
Definition: slotfuncs.c:27
uintptr_t Datum
Definition: postgres.h:372
#define PG_RETURN_DATUM(x)
Definition: fmgr.h:313
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:676
XLogRecPtr restart_lsn
Definition: slot.h:69
#define HeapTupleGetDatum(tuple)
Definition: funcapi.h:222
const char * name
Definition: encode.c:521
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define NameStr(name)
Definition: c.h:499
#define elog
Definition: elog.h:219
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656
#define PG_GETARG_NAME(n)
Definition: fmgr.h:243
Datum pg_drop_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 166 of file slotfuncs.c.

References check_permissions(), CheckSlotRequirements(), name, NameStr, PG_GETARG_NAME, PG_RETURN_VOID, and ReplicationSlotDrop().

167 {
168  Name name = PG_GETARG_NAME(0);
169 
171 
173 
174  ReplicationSlotDrop(NameStr(*name), false);
175 
176  PG_RETURN_VOID();
177 }
void CheckSlotRequirements(void)
Definition: slot.c:965
Definition: c.h:493
static void check_permissions(void)
Definition: slotfuncs.c:27
#define PG_RETURN_VOID()
Definition: fmgr.h:309
const char * name
Definition: encode.c:521
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:509
#define NameStr(name)
Definition: c.h:499
#define PG_GETARG_NAME(n)
Definition: fmgr.h:243
Datum pg_get_replication_slots ( PG_FUNCTION_ARGS  )

Definition at line 183 of file slotfuncs.c.

References ReplicationSlot::active_pid, ReturnSetInfo::allowedModes, BoolGetDatum, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotPersistentData::confirmed_flush, CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), i, ReplicationSlot::in_use, Int32GetDatum, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsA, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MemoryContextSwitchTo(), ReplicationSlot::mutex, ReplicationSlotPersistentData::name, namecpy(), NameGetDatum, NULL, ReplicationSlotPersistentData::persistency, PG_GET_REPLICATION_SLOTS_COLS, plugin, ReplicationSlotPersistentData::plugin, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, ReturnSetInfo::returnMode, RS_TEMPORARY, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, TransactionIdGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, work_mem, and ReplicationSlotPersistentData::xmin.

184 {
185 #define PG_GET_REPLICATION_SLOTS_COLS 11
186  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
187  TupleDesc tupdesc;
188  Tuplestorestate *tupstore;
189  MemoryContext per_query_ctx;
190  MemoryContext oldcontext;
191  int slotno;
192 
193  /* check to see if caller supports us returning a tuplestore */
194  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
195  ereport(ERROR,
196  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
197  errmsg("set-valued function called in context that cannot accept a set")));
198  if (!(rsinfo->allowedModes & SFRM_Materialize))
199  ereport(ERROR,
200  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
201  errmsg("materialize mode required, but it is not " \
202  "allowed in this context")));
203 
204  /* Build a tuple descriptor for our result type */
205  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
206  elog(ERROR, "return type must be a row type");
207 
208  /*
209  * We don't require any special permission to see this function's data
210  * because nothing should be sensitive. The most critical being the slot
211  * name, which shouldn't contain anything particularly sensitive.
212  */
213 
214  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
215  oldcontext = MemoryContextSwitchTo(per_query_ctx);
216 
217  tupstore = tuplestore_begin_heap(true, false, work_mem);
218  rsinfo->returnMode = SFRM_Materialize;
219  rsinfo->setResult = tupstore;
220  rsinfo->setDesc = tupdesc;
221 
222  MemoryContextSwitchTo(oldcontext);
223 
224  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
225  for (slotno = 0; slotno < max_replication_slots; slotno++)
226  {
229  bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
230 
231  ReplicationSlotPersistency persistency;
232  TransactionId xmin;
233  TransactionId catalog_xmin;
234  XLogRecPtr restart_lsn;
235  XLogRecPtr confirmed_flush_lsn;
236  pid_t active_pid;
237  Oid database;
238  NameData slot_name;
240  int i;
241 
242  if (!slot->in_use)
243  continue;
244 
245  SpinLockAcquire(&slot->mutex);
246 
247  xmin = slot->data.xmin;
248  catalog_xmin = slot->data.catalog_xmin;
249  database = slot->data.database;
250  restart_lsn = slot->data.restart_lsn;
251  confirmed_flush_lsn = slot->data.confirmed_flush;
252  namecpy(&slot_name, &slot->data.name);
253  namecpy(&plugin, &slot->data.plugin);
254  active_pid = slot->active_pid;
255  persistency = slot->data.persistency;
256 
257  SpinLockRelease(&slot->mutex);
258 
259  memset(nulls, 0, sizeof(nulls));
260 
261  i = 0;
262  values[i++] = NameGetDatum(&slot_name);
263 
264  if (database == InvalidOid)
265  nulls[i++] = true;
266  else
267  values[i++] = NameGetDatum(&plugin);
268 
269  if (database == InvalidOid)
270  values[i++] = CStringGetTextDatum("physical");
271  else
272  values[i++] = CStringGetTextDatum("logical");
273 
274  if (database == InvalidOid)
275  nulls[i++] = true;
276  else
277  values[i++] = database;
278 
279  values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
280  values[i++] = BoolGetDatum(active_pid != 0);
281 
282  if (active_pid != 0)
283  values[i++] = Int32GetDatum(active_pid);
284  else
285  nulls[i++] = true;
286 
287  if (xmin != InvalidTransactionId)
288  values[i++] = TransactionIdGetDatum(xmin);
289  else
290  nulls[i++] = true;
291 
292  if (catalog_xmin != InvalidTransactionId)
293  values[i++] = TransactionIdGetDatum(catalog_xmin);
294  else
295  nulls[i++] = true;
296 
297  if (restart_lsn != InvalidXLogRecPtr)
298  values[i++] = LSNGetDatum(restart_lsn);
299  else
300  nulls[i++] = true;
301 
302  if (confirmed_flush_lsn != InvalidXLogRecPtr)
303  values[i++] = LSNGetDatum(confirmed_flush_lsn);
304  else
305  nulls[i++] = true;
306 
307  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
308  }
309  LWLockRelease(ReplicationSlotControlLock);
310 
311  tuplestore_donestoring(tupstore);
312 
313  return (Datum) 0;
314 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
int namecpy(Name n1, Name n2)
Definition: name.c:191
static const char * plugin
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:93
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
#define NameGetDatum(X)
Definition: postgres.h:601
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define PG_GET_REPLICATION_SLOTS_COLS
uint32 TransactionId
Definition: c.h:397
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ReplicationSlotPersistency persistency
Definition: slot.h:50
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
ReplicationSlotPersistentData data
Definition: slot.h:116
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr confirmed_flush
Definition: slot.h:77
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
Definition: c.h:493
TransactionId catalog_xmin
Definition: slot.h:66
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:58
#define ereport(elevel, rest)
Definition: elog.h:122
bool in_use
Definition: slot.h:92
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
#define TransactionIdGetDatum(X)
Definition: postgres.h:527
uintptr_t Datum
Definition: postgres.h:372
int work_mem
Definition: globals.c:113
#define BoolGetDatum(X)
Definition: postgres.h:408
#define InvalidOid
Definition: postgres_ext.h:36
int allowedModes
Definition: execnodes.h:268
SetFunctionReturnMode returnMode
Definition: execnodes.h:270
int max_replication_slots
Definition: slot.c:99
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr restart_lsn
Definition: slot.h:69
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:202
Tuplestorestate * setResult
Definition: execnodes.h:273
static Datum values[MAXATTR]
Definition: bootstrap.c:163
ExprContext * econtext
Definition: execnodes.h:266
#define Int32GetDatum(X)
Definition: postgres.h:485
TupleDesc setDesc
Definition: execnodes.h:274
ReplicationSlotPersistency
Definition: slot.h:29
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t active_pid
Definition: slot.h:95
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:91
ReplicationSlot replication_slots[1]
Definition: slot.h:149
#define elog
Definition: elog.h:219
slock_t mutex
Definition: slot.h:89