76 const char *prefix,
Size message_size,
const char *message);
93 const char *prefix,
Size message_size,
const char *message);
120 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
121 errmsg(
"logical decoding requires \"wal_level\" >= \"logical\"")));
125 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
126 errmsg(
"logical decoding requires a database connection")));
140 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
141 errmsg(
"logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
153 bool need_full_snapshot,
170 "Logical decoding context",
209 (
errcode(ERRCODE_OUT_OF_MEMORY),
211 errdetail(
"Failed while allocating a WAL reading processor.")));
294 ctx->
write = do_write;
331 List *output_plugin_options,
332 bool need_full_snapshot,
356 elog(
ERROR,
"cannot perform logical decoding without an acquired slot");
359 elog(
ERROR,
"cannot initialize logical decoding without a specified plugin");
364 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
365 errmsg(
"cannot use physical replication slot for logical decoding")));
369 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
370 errmsg(
"replication slot \"%s\" was not created in this database",
376 (
errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
377 errmsg(
"cannot create logical replication slot in transaction that has performed writes")));
430 if (need_full_snapshot)
442 need_full_snapshot,
false,
true,
443 xl_routine, prepare_write, do_write,
497 List *output_plugin_options,
513 elog(
ERROR,
"cannot perform logical decoding without an acquired slot");
518 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
519 errmsg(
"cannot use physical replication slot for logical decoding")));
528 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
529 errmsg(
"replication slot \"%s\" was not created in this database",
539 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
540 errmsg(
"cannot use replication slot \"%s\" for logical decoding",
542 errdetail(
"This replication slot is being synchronized from the primary server."),
543 errhint(
"Specify another replication slot."));
553 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
554 errmsg(
"can no longer get changes from replication slot \"%s\"",
556 errdetail(
"This slot has been invalidated because it exceeded the maximum reserved size.")));
560 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
561 errmsg(
"can no longer get changes from replication slot \"%s\"",
563 errdetail(
"This slot has been invalidated because it was conflicting with recovery.")));
573 else if (start_lsn < slot->
data.confirmed_flush)
587 elog(
LOG,
"%X/%X has been already streamed, forwarding to %X/%X",
596 fast_forward,
false, xl_routine, prepare_write,
597 do_write, update_progress);
628 (
errmsg(
"starting logical decoding for slot \"%s\"",
630 errdetail(
"Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
657 elog(
DEBUG1,
"searching for logical decoding starting point, starting at %X/%X",
669 elog(
ERROR,
"could not find logical decoding starting point: %s",
err);
671 elog(
ERROR,
"could not find logical decoding starting point");
712 elog(
ERROR,
"writes are only accepted in commit, begin and change callbacks");
725 elog(
ERROR,
"OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
757 if (plugin_init == NULL)
758 elog(
ERROR,
"output plugins have to declare the _PG_output_plugin_init symbol");
761 plugin_init(callbacks);
764 elog(
ERROR,
"output plugins have to register a begin callback");
766 elog(
ERROR,
"output plugins have to register a change callback");
768 elog(
ERROR,
"output plugins have to register a commit callback");
778 errcontext(
"slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
781 state->callback_name,
784 errcontext(
"slot \"%s\", output plugin \"%s\", in the %s callback",
787 state->callback_name);
800 state.callback_name =
"startup";
828 state.callback_name =
"shutdown";
862 state.callback_name =
"begin";
894 state.callback_name =
"commit";
935 state.callback_name =
"begin_prepare";
954 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
955 errmsg(
"logical replication at prepare time requires a %s callback",
956 "begin_prepare_cb")));
980 state.callback_name =
"prepare";
999 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1000 errmsg(
"logical replication at prepare time requires a %s callback",
1025 state.callback_name =
"commit_prepared";
1044 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1045 errmsg(
"logical replication at prepare time requires a %s callback",
1046 "commit_prepared_cb")));
1071 state.callback_name =
"rollback_prepared";
1090 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1091 errmsg(
"logical replication at prepare time requires a %s callback",
1092 "rollback_prepared_cb")));
1114 state.callback_name =
"change";
1115 state.report_location = change->
lsn;
1156 state.callback_name =
"truncate";
1157 state.report_location = change->
lsn;
1195 state.callback_name =
"filter_prepare";
1226 state.callback_name =
"filter_by_origin";
1249 const char *prefix,
Size message_size,
const char *message)
1262 state.callback_name =
"message";
1263 state.report_location = message_lsn;
1277 message_size, message);
1298 state.callback_name =
"stream_start";
1299 state.report_location = first_lsn;
1322 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1323 errmsg(
"logical streaming requires a %s callback",
1324 "stream_start_cb")));
1347 state.callback_name =
"stream_stop";
1348 state.report_location = last_lsn;
1371 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1372 errmsg(
"logical streaming requires a %s callback",
1373 "stream_stop_cb")));
1396 state.callback_name =
"stream_abort";
1397 state.report_location = abort_lsn;
1412 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1413 errmsg(
"logical streaming requires a %s callback",
1414 "stream_abort_cb")));
1441 state.callback_name =
"stream_prepare";
1457 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1458 errmsg(
"logical streaming at prepare time requires a %s callback",
1459 "stream_prepare_cb")));
1482 state.callback_name =
"stream_commit";
1498 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1499 errmsg(
"logical streaming requires a %s callback",
1500 "stream_commit_cb")));
1523 state.callback_name =
"stream_change";
1524 state.report_location = change->
lsn;
1547 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1548 errmsg(
"logical streaming requires a %s callback",
1549 "stream_change_cb")));
1560 const char *prefix,
Size message_size,
const char *message)
1577 state.callback_name =
"stream_message";
1578 state.report_location = message_lsn;
1592 message_size, message);
1600 int nrelations,
Relation relations[],
1618 state.callback_name =
"stream_truncate";
1619 state.report_location = change->
lsn;
1657 state.callback_name =
"update_progress_txn";
1658 state.report_location = lsn;
1695 bool updated_xmin =
false;
1697 bool got_new_xmin =
false;
1718 else if (current_lsn <= slot->
data.confirmed_flush)
1724 updated_xmin =
true;
1740 got_new_xmin =
true;
1745 elog(
DEBUG1,
"got new catalog xmin %u at %X/%X", xmin,
1763 bool updated_lsn =
false;
1775 if (restart_lsn <= slot->
data.restart_lsn)
1784 else if (current_lsn <= slot->
data.confirmed_flush)
1805 elog(
DEBUG1,
"got new restart lsn %X/%X at %X/%X",
1820 elog(
DEBUG1,
"failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
1845 bool updated_xmin =
false;
1846 bool updated_restart =
false;
1871 updated_xmin =
true;
1883 updated_restart =
true;
1889 if (updated_xmin || updated_restart)
1893 elog(
DEBUG1,
"updated xmin: %u restart: %u", updated_xmin, updated_restart);
1943 elog(
DEBUG2,
"UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
1983 bool has_pending_wal =
false;
2022 elog(
ERROR,
"could not find record for logical decoding: %s", errm);
2045 return has_pending_wal;
2064 bool *found_consistent_snapshot)
2072 if (found_consistent_snapshot)
2073 *found_consistent_snapshot =
false;
2117 elog(
ERROR,
"could not find record while advancing replication slot: %s",
2132 *found_consistent_snapshot =
true;
#define Assert(condition)
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
void InvalidateSystemCaches(void)
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr lsn)
XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot)
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
void FreeDecodingContext(LogicalDecodingContext *ctx)
bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void output_plugin_error_callback(void *arg)
static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static LogicalDecodingContext * StartupDecodingContext(List *output_plugin_options, XLogRecPtr start_lsn, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, bool in_create, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
bool DecodingContextReady(LogicalDecodingContext *ctx)
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact)
static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
void UpdateDecodingStats(LogicalDecodingContext *ctx)
void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
void ResetLogicalStreamingState(void)
void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
struct LogicalErrorCallbackState LogicalErrorCallbackState
static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message)
static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx)
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
void CheckLogicalDecodingRequirements(void)
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message)
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
void(* LogicalOutputPluginWriterUpdateProgress)(struct LogicalDecodingContext *lr, XLogRecPtr Ptr, TransactionId xid, bool skipped_xact)
void(* LogicalOutputPluginWriterWrite)(struct LogicalDecodingContext *lr, XLogRecPtr Ptr, TransactionId xid, bool last_write)
LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
void namestrcpy(Name name, const char *str)
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
static const char * plugin
void pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat)
#define PROC_IN_LOGICAL_DECODING
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
MemoryContextSwitchTo(old_ctx)
ReorderBuffer * ReorderBufferAllocate(void)
void ReorderBufferFree(ReorderBuffer *rb)
ResourceOwner CurrentResourceOwner
void ReplicationSlotMarkDirty(void)
void ReplicationSlotReserveWal(void)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
ReplicationSlot * MyReplicationSlot
void ReplicationSlotSave(void)
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
void ReplicationSlotsComputeRequiredLSN(void)
void CheckSlotRequirements(void)
#define SlotIsPhysical(slot)
bool IsSyncingReplicationSlots(void)
void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
void FreeSnapshotBuilder(SnapBuild *builder)
SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, bool in_slot_creation, XLogRecPtr two_phase_at)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
StringInfo makeStringInfo(void)
struct ErrorContextCallback * previous
void(* callback)(void *arg)
OutputPluginOptions options
struct SnapBuild * snapshot_builder
XLogRecPtr write_location
LogicalOutputPluginWriterPrepareWrite prepare_write
OutputPluginCallbacks callbacks
List * output_plugin_options
LogicalOutputPluginWriterWrite write
struct ReorderBuffer * reorder
LogicalOutputPluginWriterUpdateProgress update_progress
XLogRecPtr report_location
LogicalDecodingContext * ctx
const char * callback_name
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
PgStat_Counter stream_count
PgStat_Counter total_txns
PgStat_Counter total_bytes
PgStat_Counter spill_txns
PgStat_Counter stream_txns
PgStat_Counter spill_count
PgStat_Counter stream_bytes
PgStat_Counter spill_bytes
ReorderBufferStreamMessageCB stream_message
ReorderBufferStreamChangeCB stream_change
ReorderBufferBeginCB begin_prepare
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferMessageCB message
ReorderBufferRollbackPreparedCB rollback_prepared
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop
ReorderBufferApplyChangeCB apply_change
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamAbortCB stream_abort
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferStreamCommitCB stream_commit
ReorderBufferApplyTruncateCB apply_truncate
ReorderBufferBeginCB begin
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
ReplicationSlotInvalidationCause invalidated
XLogRecPtr candidate_xmin_lsn
TransactionId effective_catalog_xmin
XLogRecPtr candidate_restart_valid
TransactionId effective_xmin
XLogRecPtr candidate_restart_lsn
TransactionId candidate_catalog_xmin
ReplicationSlotPersistentData data
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
bool IsTransactionOrTransactionBlock(void)
TransactionId CheckXidAlive
bool IsTransactionState(void)
TransactionId GetTopTransactionIdIfAny(void)
bool RecoveryInProgress(void)
WalLevel GetActiveWalLevelOnStandby(void)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
void XLogReaderFree(XLogReaderState *state)
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
void wal_segment_close(XLogReaderState *state)
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)