75 const char *prefix,
Size message_size,
const char *message);
92 const char *prefix,
Size message_size,
const char *message);
114 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
115 errmsg(
"logical decoding requires wal_level >= logical")));
119 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
120 errmsg(
"logical decoding requires a database connection")));
137 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
138 errmsg(
"logical decoding cannot be used while in recovery")));
149 bool need_full_snapshot,
165 "Logical decoding context",
204 (
errcode(ERRCODE_OUT_OF_MEMORY),
205 errmsg(
"out of memory")));
282 ctx->
write = do_write;
319 List *output_plugin_options,
320 bool need_full_snapshot,
338 elog(
ERROR,
"cannot perform logical decoding without an acquired slot");
341 elog(
ERROR,
"cannot initialize logical decoding without a specified plugin");
346 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
347 errmsg(
"cannot use physical replication slot for logical decoding")));
351 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
352 errmsg(
"replication slot \"%s\" was not created in this database",
358 (
errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
359 errmsg(
"cannot create logical replication slot in transaction that has performed writes")));
412 if (need_full_snapshot)
424 need_full_snapshot,
false,
425 xl_routine, prepare_write, do_write,
471 List *output_plugin_options,
487 elog(
ERROR,
"cannot perform logical decoding without an acquired slot");
492 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
493 errmsg(
"cannot use physical replication slot for logical decoding")));
497 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
498 errmsg(
"replication slot \"%s\" was not created in this database",
506 else if (start_lsn < slot->data.confirmed_flush)
516 elog(
DEBUG1,
"cannot stream from %X/%X, minimum is %X/%X, forwarding",
525 fast_forward, xl_routine, prepare_write,
526 do_write, update_progress);
537 (
errmsg(
"starting logical decoding for slot \"%s\"",
539 errdetail(
"Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
566 elog(
DEBUG1,
"searching for logical decoding starting point, starting at %X/%X",
619 elog(
ERROR,
"writes are only accepted in commit, begin and change callbacks");
632 elog(
ERROR,
"OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
662 if (plugin_init == NULL)
663 elog(
ERROR,
"output plugins have to declare the _PG_output_plugin_init symbol");
666 plugin_init(callbacks);
669 elog(
ERROR,
"output plugins have to register a begin callback");
671 elog(
ERROR,
"output plugins have to register a change callback");
673 elog(
ERROR,
"output plugins have to register a commit callback");
683 errcontext(
"slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
689 errcontext(
"slot \"%s\", output plugin \"%s\", in the %s callback",
708 errcallback.
arg = (
void *) &state;
735 errcallback.
arg = (
void *) &state;
768 errcallback.
arg = (
void *) &state;
799 errcallback.
arg = (
void *) &state;
839 errcallback.
arg = (
void *) &state;
854 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
855 errmsg(
"logical replication at prepare time requires begin_prepare_cb callback")));
882 errcallback.
arg = (
void *) &state;
897 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
898 errmsg(
"logical replication at prepare time requires prepare_cb callback")));
925 errcallback.
arg = (
void *) &state;
940 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
941 errmsg(
"logical replication at prepare time requires commit_prepared_cb callback")));
969 errcallback.
arg = (
void *) &state;
984 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
985 errmsg(
"logical replication at prepare time requires rollback_prepared_cb callback")));
1010 errcallback.
arg = (
void *) &state;
1050 errcallback.
arg = (
void *) &state;
1086 errcallback.
arg = (
void *) &state;
1116 errcallback.
arg = (
void *) &state;
1135 const char *prefix,
Size message_size,
const char *message)
1151 errcallback.
arg = (
void *) &state;
1162 message_size, message);
1186 errcallback.
arg = (
void *) &state;
1205 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1206 errmsg(
"logical streaming requires a stream_start_cb callback")));
1232 errcallback.
arg = (
void *) &state;
1251 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1252 errmsg(
"logical streaming requires a stream_stop_cb callback")));
1278 errcallback.
arg = (
void *) &state;
1290 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1291 errmsg(
"logical streaming requires a stream_abort_cb callback")));
1321 errcallback.
arg = (
void *) &state;
1333 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1334 errmsg(
"logical streaming at prepare time requires a stream_prepare_cb callback")));
1360 errcallback.
arg = (
void *) &state;
1372 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1373 errmsg(
"logical streaming requires a stream_commit_cb callback")));
1399 errcallback.
arg = (
void *) &state;
1418 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1419 errmsg(
"logical streaming requires a stream_change_cb callback")));
1430 const char *prefix,
Size message_size,
const char *message)
1450 errcallback.
arg = (
void *) &state;
1461 message_size, message);
1469 int nrelations,
Relation relations[],
1490 errcallback.
arg = (
void *) &state;
1523 bool updated_xmin =
false;
1545 else if (current_lsn <= slot->data.confirmed_flush)
1551 updated_xmin =
true;
1580 bool updated_lsn =
false;
1592 if (restart_lsn <= slot->data.restart_lsn)
1600 else if (current_lsn <= slot->data.confirmed_flush)
1620 elog(
DEBUG1,
"got new restart lsn %X/%X at %X/%X",
1635 elog(
DEBUG1,
"failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
1660 bool updated_xmin =
false;
1661 bool updated_restart =
false;
1686 updated_xmin =
true;
1698 updated_restart =
true;
1704 if (updated_xmin || updated_restart)
1708 elog(
DEBUG1,
"updated xmin: %u restart: %u", updated_xmin, updated_restart);
1760 elog(
DEBUG2,
"UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
LogicalDecodeTruncateCB truncate_cb
static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
static const char * plugin
void CheckSlotRequirements(void)
TransactionId candidate_catalog_xmin
#define InvalidXLogRecPtr
static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
ReorderBufferApplyChangeCB apply_change
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
struct ReorderBuffer * reorder
#define PROC_IN_LOGICAL_DECODING
LogicalDecodeStreamPrepareCB stream_prepare_cb
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
void namestrcpy(Name name, const char *str)
bool DecodingContextReady(LogicalDecodingContext *ctx)
struct LogicalErrorCallbackState LogicalErrorCallbackState
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
OutputPluginOptions options
void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
ReorderBufferStreamAbortCB stream_abort
StringInfo makeStringInfo(void)
LogicalDecodeMessageCB message_cb
OutputPluginCallbacks callbacks
LogicalDecodeStreamMessageCB stream_message_cb
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
void ReorderBufferFree(ReorderBuffer *rb)
int errcode(int sqlerrcode)
bool IsTransactionOrTransactionBlock(void)
bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
void ReplicationSlotSave(void)
LogicalDecodeStreamAbortCB stream_abort_cb
List * output_plugin_options
ReorderBufferCommitCB commit
ReplicationSlotPersistentData data
bool RecoveryInProgress(void)
ReorderBufferStreamCommitCB stream_commit
void(* callback)(void *arg)
struct ErrorContextCallback * previous
#define SlotIsPhysical(slot)
LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite
void(* LogicalOutputPluginWriterUpdateProgress)(struct LogicalDecodingContext *lr, XLogRecPtr Ptr, TransactionId xid)
const char * callback_name
static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
XLogRecPtr confirmed_flush
XLogRecPtr write_location
static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
LogicalDecodeBeginPrepareCB begin_prepare_cb
void LWLockRelease(LWLock *lock)
ErrorContextCallback * error_context_stack
#define SpinLockAcquire(lock)
#define LSN_FORMAT_ARGS(lsn)
ReorderBufferStreamMessageCB stream_message
void ReplicationSlotReserveWal(void)
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
void ReplicationSlotsComputeRequiredLSN(void)
LogicalDecodePrepareCB prepare_cb
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
LogicalOutputPluginWriterPrepareWrite prepare_write
ReorderBufferStreamPrepareCB stream_prepare
void ResetLogicalStreamingState(void)
LogicalDecodeCommitCB commit_cb
TransactionId effective_xmin
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
#define ALLOCSET_DEFAULT_SIZES
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx)
XLogRecPtr candidate_restart_valid
void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, int spillbytes, int streamtxns, int streamcount, int streambytes)
int errdetail(const char *fmt,...)
TransactionId catalog_xmin
#define InvalidTransactionId
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
LogicalOutputPluginWriterUpdateProgress update_progress
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
ReorderBufferBeginCB begin_prepare
MemoryContext CurrentMemoryContext
bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
TransactionId GetTopTransactionIdIfAny(void)
ReorderBufferMessageCB message
LogicalDecodeCommitPreparedCB commit_prepared_cb
static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
ReorderBufferStreamChangeCB stream_change
ReorderBufferStreamTruncateCB stream_truncate
void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)
#define XLogRecPtrIsInvalid(r)
static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message)
TransactionId CheckXidAlive
void XLogReaderFree(XLogReaderState *state)
#define SpinLockRelease(lock)
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
void UpdateDecodingStats(LogicalDecodingContext *ctx)
void * palloc0(Size size)
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
LogicalDecodeChangeCB change_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
TransactionId effective_catalog_xmin
LogicalDecodeStreamTruncateCB stream_truncate_cb
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
ReorderBufferStreamStartCB stream_start
#define ereport(elevel,...)
void FreeSnapshotBuilder(SnapBuild *builder)
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
struct SnapBuild * snapshot_builder
ReplicationSlot * MyReplicationSlot
#define Assert(condition)
void FreeDecodingContext(LogicalDecodingContext *ctx)
void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
ReorderBufferRollbackPreparedCB rollback_prepared
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStreamCommitCB stream_commit_cb
static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool IsTransactionState(void)
ReorderBuffer * ReorderBufferAllocate(void)
void(* LogicalOutputPluginWriterWrite)(struct LogicalDecodingContext *lr, XLogRecPtr Ptr, TransactionId xid, bool last_write)
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
LogicalDecodeStartupCB startup_cb
XLogRecPtr candidate_xmin_lsn
XLogRecPtr report_location
LogicalDecodeStreamStartCB stream_start_cb
static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
int errmsg(const char *fmt,...)
ReorderBufferApplyTruncateCB apply_truncate
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
LogicalOutputPluginWriterWrite write
LogicalDecodeBeginCB begin_cb
#define CHECK_FOR_INTERRUPTS()
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message)
LogicalDecodeStreamStopCB stream_stop_cb
#define TransactionIdIsValid(xid)
LogicalDecodingContext * ctx
ReorderBufferBeginCB begin
static void output_plugin_error_callback(void *arg)
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
LogicalDecodeFilterByOriginCB filter_by_origin_cb
void CheckLogicalDecodingRequirements(void)
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
LogicalDecodeStreamChangeCB stream_change_cb
ReorderBufferCommitPreparedCB commit_prepared
SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
XLogRecPtr candidate_restart_lsn
void ReplicationSlotMarkDirty(void)
static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static LogicalDecodingContext * StartupDecodingContext(List *output_plugin_options, XLogRecPtr start_lsn, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop