33 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
34 errmsg(
"must be superuser or replication role to use replication slots")));
55 if (immediately_reserve)
86 elog(
ERROR,
"return type must be a row type");
100 if (immediately_reserve)
127 bool temporary,
bool two_phase,
129 bool find_startpoint)
188 elog(
ERROR,
"return type must be a row type");
204 memset(nulls, 0,
sizeof(nulls));
241 #define PG_GET_REPLICATION_SLOTS_COLS 14 253 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
254 errmsg(
"set-valued function called in context that cannot accept a set")));
257 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
258 errmsg(
"materialize mode required, but it is not allowed in this context")));
262 elog(
ERROR,
"return type must be a row type");
297 slot_contents = *slot;
300 memset(values, 0,
sizeof(values));
301 memset(nulls, 0,
sizeof(nulls));
431 failSeg = targetSeg +
Max(slotKeepSegs, keepSegs) + 1;
466 if (startlsn < moveto)
556 if (moveto <= ctx->reader->EndRecPtr)
630 (
errmsg(
"invalid target WAL LSN")));
634 elog(
ERROR,
"return type must be a row type");
651 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
652 errmsg(
"replication slot \"%s\" cannot be advanced",
654 errdetail(
"This slot has never previously reserved WAL, or it has been invalidated.")));
669 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
670 errmsg(
"cannot advance replication slot to %X/%X, minimum is %X/%X",
723 elog(
ERROR,
"return type must be a row type");
753 first_slot_contents = *s;
764 (
errcode(ERRCODE_UNDEFINED_OBJECT),
765 errmsg(
"replication slot \"%s\" does not exist",
NameStr(*src_name))));
773 if (src_islogical != logical_slot)
775 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
777 errmsg(
"cannot copy physical replication slot \"%s\" as a logical replication slot",
779 errmsg(
"cannot copy logical replication slot \"%s\" as a physical replication slot",
785 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
786 errmsg(
"cannot copy a replication slot that doesn't reserve WAL")));
834 second_slot_contents = *src;
840 copy_xmin = second_slot_contents.
data.
xmin;
859 if (copy_restart_lsn < src_restart_lsn ||
860 src_islogical != copy_islogical ||
861 strcmp(copy_name,
NameStr(*src_name)) != 0)
863 (
errmsg(
"could not copy replication slot \"%s\"",
865 errdetail(
"The source replication slot was modified incompatibly during the copy operation.")));
870 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
871 errmsg(
"cannot copy unfinished logical replication slot \"%s\"",
873 errhint(
"Retry when the source replication slot's confirmed_flush_lsn is valid.")));
891 #ifdef USE_ASSERT_CHECKING 903 if (logical_slot && !temporary)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto)
static const char * plugin
ReplicationSlotCtlData * ReplicationSlotCtl
void CheckSlotRequirements(void)
#define InvalidXLogRecPtr
#define IsA(nodeptr, _type_)
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
int errhint(const char *fmt,...)
void wal_segment_close(XLogReaderState *state)
static void create_physical_replication_slot(char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
#define PG_GET_REPLICATION_SLOTS_COLS
Datum pg_get_replication_slots(PG_FUNCTION_ARGS)
ResourceOwner CurrentResourceOwner
Datum pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, LogicalDecodingXLogPageReadCB page_read, WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
#define tuplestore_donestoring(state)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
ReplicationSlotPersistency persistency
int errcode(int sqlerrcode)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
void ReplicationSlotSave(void)
XLogRecPtr GetFlushRecPtr(void)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
#define PG_GETARG_BOOL(n)
Datum pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
ReplicationSlotPersistentData data
Datum pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
bool RecoveryInProgress(void)
XLogReadRecordResult XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
#define OidIsValid(objectId)
void InvalidateSystemCaches(void)
XLogRecPtr confirmed_flush
void LWLockRelease(LWLock *lock)
Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
#define SpinLockAcquire(lock)
#define LSN_FORMAT_ARGS(lsn)
XLogSegNo XLogGetLastRemovedSegno(void)
void ReplicationSlotReserveWal(void)
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
void ReplicationSlotsComputeRequiredLSN(void)
#define ObjectIdGetDatum(X)
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Datum pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void ReplicationSlotPersist(void)
TransactionId effective_xmin
static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, XLogRecPtr restart_lsn, bool find_startpoint)
Datum pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
int errdetail(const char *fmt,...)
Datum pg_replication_slot_advance(PG_FUNCTION_ARGS)
TransactionId catalog_xmin
#define InvalidTransactionId
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
void ReplicationSlotRelease(void)
#define SlotIsLogical(slot)
Datum Int64GetDatum(int64 X)
static void check_permissions(void)
#define XLogRecPtrIsInvalid(r)
#define SpinLockRelease(lock)
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
#define TransactionIdGetDatum(X)
Datum pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
Datum pg_drop_replication_slot(PG_FUNCTION_ARGS)
#define PG_RETURN_DATUM(x)
TransactionId effective_catalog_xmin
TimeLineID ThisTimeLineID
#define XLogMBVarToSegs(mbvar, wal_segsz_bytes)
#define ereport(elevel,...)
int max_slot_wal_keep_size_mb
XLogRecPtr GetXLogWriteRecPtr(void)
SetFunctionReturnMode returnMode
ReplicationSlot * MyReplicationSlot
int max_replication_slots
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define Assert(condition)
void FreeDecodingContext(LogicalDecodingContext *ctx)
bool has_rolreplication(Oid roleid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
#define HeapTupleGetDatum(tuple)
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
MemoryContext ecxt_per_query_memory
Tuplestorestate * setResult
static Datum values[MAXATTR]
void ReplicationSlotDrop(const char *name, bool nowait)
int errmsg(const char *fmt,...)
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
WALAvailability GetWALAvailability(XLogRecPtr targetLSN)
#define CStringGetTextDatum(s)
LogicalDecodingXLogPageReadCB page_read
#define CHECK_FOR_INTERRUPTS()
ReplicationSlot replication_slots[1]
static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr moveto)
XLogRecPtr invalidated_at
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, LogicalDecodingXLogPageReadCB page_read, WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
void CheckLogicalDecodingRequirements(void)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
void ReplicationSlotMarkDirty(void)
#define PG_GETARG_NAME(n)
bool read_local_xlog_page(XLogReaderState *state)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)