299#define NAPTIME_PER_CYCLE 1000
455#define MIN_XID_ADVANCE_INTERVAL 100
456#define MAX_XID_ADVANCE_INTERVAL 180000
517#define is_skipping_changes() (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))
694 errmsg(
"logical replication parallel apply worker for subscription \"%s\" will stop",
696 errdetail(
"Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
707 elog(
ERROR,
"sequence synchronization worker is not expected to apply changes");
809 errmsg_internal(
"invalid transaction ID in streamed replication transaction")));
879 edata->targetRel = rel;
986 if (
cattr->attisdropped ||
cattr->attgenerated)
1001 defmap[num_defaults] =
attnum;
1006 for (
i = 0;
i < num_defaults;
i++)
1027 for (
i = 0;
i < natts;
i++)
1049 typioparam,
att->atttypmod);
1066 typioparam,
att->atttypmod);
1072 errmsg(
"incorrect binary data format in logical replication column %d",
1139 for (
i = 0;
i < natts;
i++)
1164 typioparam,
att->atttypmod);
1181 typioparam,
att->atttypmod);
1187 errmsg(
"incorrect binary data format in logical replication column %d",
1245 errmsg_internal(
"incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1273 errmsg_internal(
"tablesync worker received a BEGIN PREPARE message")));
1340 errmsg_internal(
"incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1536 errmsg_internal(
"tablesync worker received a STREAM PREPARE message")));
1570 elog(
DEBUG1,
"finished processing the STREAM PREPARE command");
1632 elog(
DEBUG1,
"finished processing the STREAM PREPARE command");
1751 errmsg_internal(
"invalid transaction ID in streamed replication transaction")));
1934 elog(
DEBUG1,
"applied %u changes in the streaming chunk",
2110 elog(
DEBUG1,
"finished processing the STREAM ABORT command");
2213 elog(
DEBUG1,
"finished processing the STREAM ABORT command");
2252 elog(
ERROR,
"unexpected message left in streaming transaction's changes file \"%s\"",
2265 char *buffer =
NULL;
2286 elog(
DEBUG1,
"replaying changes from file \"%s\"", path);
2336 elog(
ERROR,
"incorrect length %d in streaming transaction's changes file \"%s\"",
2373 elog(
DEBUG1,
"replayed %d changes from file \"%s\"",
2380 elog(
DEBUG1,
"replayed %d (all) changes from file \"%s\"",
2426 elog(
DEBUG1,
"finished processing the STREAM COMMIT command");
2480 elog(
DEBUG1,
"finished processing the STREAM COMMIT command");
2623 errmsg(
"user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2681 estate =
edata->estate;
2732 !
relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2770 errmsg(
"publisher did not send replica identity column "
2771 "expected by the logical replication target relation \"%s.%s\"",
2777 errmsg(
"logical replication target relation \"%s.%s\" has "
2778 "neither REPLICA IDENTITY index nor PRIMARY "
2779 "KEY and published relation does not have "
2780 "REPLICA IDENTITY FULL",
2845 estate =
edata->estate;
2859 for (
int i = 0;
i <
remoteslot->tts_tupleDescriptor->natts;
i++)
3063 estate =
edata->estate;
3123 !localrel->
rd_rel->relhasindex ||
3196#ifdef USE_ASSERT_CHECKING
3203 edata->targetRel->attrmap)));
3244 elog(
ERROR,
"cache lookup failed for index %u", localindexoid);
3318 errmsg(
"could not detect conflict as the leader apply worker has exited")));
3536 if (!partrel->
rd_rel->relispartition ||
3636 elog(
ERROR,
"unrecognized CmdType: %d", (
int) operation);
3649 bool cascade =
false;
3650 bool restart_seqs =
false;
3706 foreach(child, children)
3874 errmsg(
"invalid logical replication message type \"??? (%d)\"", action)));
3994 "ApplyMessageContext",
4002 "LogicalStreamingContext",
4047 (
errmsg(
"data stream from publisher has ended")));
4130 elog(
ERROR,
"cannot get the latest WAL position from the publisher");
4250 errmsg(
"terminating logical replication worker due to timeout")));
4364 elog(
DEBUG2,
"sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4537 elog(
DEBUG2,
"sending publisher status request message");
4635 errmsg_internal(
"oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4749 elog(
DEBUG2,
"confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
4822 errmsg(
"logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
4824 errdetail(
"Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
4849 errmsg(
"logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
4852 ?
errdetail(
"Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
4853 :
errdetail(
"Retention is re-enabled because max_retention_duration has been set to unlimited."));
4930 rdt_data->table_sync_wait_time = 0;
4970 else if (
rdt_data->xid_advance_interval &&
5067 (
errmsg(
"logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5081 (
errmsg(
"logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5112 (
errmsg(
"logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5116 (
errmsg(
"logical replication worker for subscription \"%s\" will restart because of a parameter change",
5130 errmsg(
"logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5134 errmsg(
"logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5143 elog(
ERROR,
"subscription %u changed unexpectedly",
5318 if (subxacts[
i - 1].xid == xid)
5416 elog(
DEBUG1,
"opening file \"%s\" for streamed changes", path);
5525 options->proto.logical.proto_version =
5541 options->proto.logical.streaming_str =
"parallel";
5547 options->proto.logical.streaming_str =
"on";
5556 options->proto.logical.twophase =
false;
5628 char *slotname =
NULL;
5645 errmsg(
"subscription has no replication slot set")));
5670 errmsg(
"apply worker for subscription \"%s\" could not connect to the publisher: %s",
5696 options.proto.logical.twophase =
true;
5718 (
errmsg_internal(
"logical replication apply worker for subscription \"%s\" two_phase is %s",
5774 (
errmsg(
"logical replication worker for subscription %u will not start because the subscription was removed during startup",
5790 (
errmsg(
"logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5815 errmsg(
"logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5839 errmsg(
"logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5844 errmsg(
"logical replication sequence synchronization worker for subscription \"%s\" has started",
5848 errmsg(
"logical replication apply worker for subscription \"%s\" has started",
5911 elog(
DEBUG1,
"connecting to publisher using connection string \"%s\"",
5985 errmsg(
"subscription \"%s\" has been disabled because of an error",
6042 errmsg(
"logical replication starts skipping transaction at LSN %X/%08X",
6056 errmsg(
"logical replication completed skipping transaction at LSN %X/%08X",
6131 memset(nulls,
false,
sizeof(nulls));
6145 errdetail(
"Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6173 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\"",
6177 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6182 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6190 if (
errarg->remote_attnum < 0)
6193 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6196 errarg->rel->remoterel.nspname,
6197 errarg->rel->remoterel.relname,
6200 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6203 errarg->rel->remoterel.nspname,
6204 errarg->rel->remoterel.relname,
6211 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6214 errarg->rel->remoterel.nspname,
6215 errarg->rel->remoterel.relname,
6216 errarg->rel->remoterel.attnames[
errarg->remote_attnum],
6219 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6222 errarg->rel->remoterel.nspname,
6223 errarg->rel->remoterel.relname,
6224 errarg->rel->remoterel.attnames[
errarg->remote_attnum],
6285 foreach(
lc2, workers)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
static void subxact_filename(char *path, Oid subid, TransactionId xid)
static void begin_replication_step(void)
static void end_replication_step(void)
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
static void cleanup_subxact_info(void)
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
static void apply_handle_stream_prepare(StringInfo s)
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
static void subxact_info_add(TransactionId xid)
static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
static XLogRecPtr last_flushpos
void stream_cleanup_files(Oid subid, TransactionId xid)
MemoryContext ApplyMessageContext
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
static void apply_handle_type(StringInfo s)
static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
static void wait_for_local_flush(RetainDeadTuplesData *rdt_data)
static void apply_handle_truncate(StringInfo s)
@ RDT_WAIT_FOR_PUBLISHER_STATUS
@ RDT_RESUME_CONFLICT_INFO_RETENTION
@ RDT_REQUEST_PUBLISHER_STATUS
@ RDT_WAIT_FOR_LOCAL_FLUSH
@ RDT_STOP_CONFLICT_INFO_RETENTION
static void run_apply_worker(void)
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
static void get_candidate_xid(RetainDeadTuplesData *rdt_data)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
@ TRANS_LEADER_SEND_TO_PARALLEL
@ TRANS_LEADER_PARTIAL_SERIALIZE
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
static void changes_filename(char *path, Oid subid, TransactionId xid)
bool InitializingApplyWorker
static void apply_worker_exit(void)
static BufFile * stream_fd
static void apply_handle_update(StringInfo s)
void stream_stop_internal(TransactionId xid)
static void apply_handle_stream_commit(StringInfo s)
void start_apply(XLogRecPtr origin_startpos)
static void stop_skipping_changes(void)
#define NAPTIME_PER_CYCLE
static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
static bool update_retention_status(bool active)
static uint32 parallel_stream_nchanges
static void apply_handle_commit_prepared(StringInfo s)
static void LogicalRepApplyLoop(XLogRecPtr last_received)
void LogicalRepWorkersWakeupAtCommit(Oid subid)
#define MAX_XID_ADVANCE_INTERVAL
bool IsLogicalWorker(void)
static ApplySubXactData subxact_data
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, pgoff_t offset)
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
static ApplyErrorCallbackArg apply_error_callback_arg
bool in_remote_transaction
static XLogRecPtr skip_xact_finish_lsn
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
static void apply_handle_delete(StringInfo s)
void apply_dispatch(StringInfo s)
static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
#define is_skipping_changes()
static void stream_write_change(char action, StringInfo s)
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
#define MIN_XID_ADVANCE_INTERVAL
static void apply_handle_begin(StringInfo s)
void DisableSubscriptionAndExit(void)
static dlist_head lsn_mapping
bool IsLogicalParallelApplyWorker(void)
void AtEOXact_LogicalRepWorkers(bool isCommit)
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
static void finish_edata(ApplyExecutionData *edata)
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
ErrorContextCallback * apply_error_context_stack
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
static void apply_handle_commit(StringInfo s)
static bool IsIndexUsableForFindingDeletedTuple(Oid localindexoid, TransactionId conflict_detection_xmin)
void stream_start_internal(TransactionId xid, bool first_segment)
static List * on_commit_wakeup_workers_subids
static void apply_handle_stream_abort(StringInfo s)
static void apply_handle_relation(StringInfo s)
void set_apply_error_context_origin(char *originname)
static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received)
MemoryContext ApplyContext
static void subxact_info_write(Oid subid, TransactionId xid)
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
static void apply_handle_prepare(StringInfo s)
static void apply_handle_rollback_prepared(StringInfo s)
void SetupApplyOrSyncWorker(int worker_slot)
static void apply_handle_stream_stop(StringInfo s)
static void apply_handle_origin(StringInfo s)
static void request_publisher_status(RetainDeadTuplesData *rdt_data)
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, bool status_received)
static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, bool status_received)
WalReceiverConn * LogRepWorkerWalRcvConn
static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
static XLogRecPtr remote_final_lsn
static bool MySubscriptionValid
void apply_error_callback(void *arg)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
static MemoryContext LogicalStreamingContext
void maybe_reread_subscription(void)
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
void InitializeLogRepWorker(void)
static bool in_streamed_transaction
static void apply_handle_begin_prepare(StringInfo s)
void ApplyWorkerMain(Datum main_arg)
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
static void apply_handle_stream_start(StringInfo s)
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
static void on_exit_clear_xact_state(int code, Datum arg)
static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Subscription * MySubscription
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
static void stream_close_file(void)
static TransactionId stream_xid
static void apply_handle_insert(StringInfo s)
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
static bool FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
static void subxact_info_read(Oid subid, TransactionId xid)
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
static void reset_apply_error_context_info(void)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLEINTRANSACTION
void BackgroundWorkerUnblockSignals(void)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Bitmapset * bms_make_singleton(int x)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
int BufFileSeek(BufFile *file, int fileno, pgoff_t offset, int whence)
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
void BufFileTruncateFileSet(BufFile *file, int fileno, pgoff_t offset)
void BufFileTell(BufFile *file, int *fileno, pgoff_t *offset)
void BufFileClose(BufFile *file)
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
#define Assert(condition)
#define OidIsValid(objectId)
bool track_commit_timestamp
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, ReplOriginId *localorigin, TimestampTz *localts)
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
void InitConflictIndexes(ResultRelInfo *relInfo)
@ CT_UPDATE_ORIGIN_DIFFERS
@ CT_DELETE_ORIGIN_DIFFERS
void load_file(const char *filename, bool restricted)
int errmsg_internal(const char *fmt,...)
void EmitErrorReport(void)
int errdetail_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
void FlushErrorState(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
bool equal(const void *a, const void *b)
void err(int eval, const char *fmt,...)
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
void EvalPlanQualEnd(EPQState *epqstate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
const TupleTableSlotOps TTSOpsVirtual
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
void FreeExecutorState(EState *estate)
EState * CreateExecutorState(void)
#define GetPerTupleExprContext(estate)
#define GetPerTupleMemoryContext(estate)
#define EvalPlanQualSetSlot(epqstate, slot)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
#define palloc_object(type)
#define palloc0_object(type)
void FileSetInit(FileSet *fileset)
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
void ProcessConfigFile(GucContext context)
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
static void * GETSTRUCT(const HeapTupleData *tuple)
static void dlist_delete(dlist_node *node)
#define dlist_tail_element(type, membername, lhead)
#define dlist_foreach_modify(iter, lhead)
static bool dlist_is_empty(const dlist_head *head)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
#define DLIST_STATIC_INIT(name)
#define dlist_container(type, membername, ptr)
void index_close(Relation relation, LOCKMODE lockmode)
Relation index_open(Oid relationId, LOCKMODE lockmode)
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void AcceptInvalidationMessages(void)
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void ResetLatch(Latch *latch)
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
void logicalrep_worker_attach(int slot)
void ApplyLauncherWakeup(void)
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
LogicalRepWorker * MyLogicalRepWorker
void ApplyLauncherForgetWorkerStartTime(Oid subid)
List * lappend(List *list, void *datum)
List * lappend_oid(List *list, Oid datum)
List * list_append_unique_oid(List *list, Oid datum)
bool list_member_oid(const List *list, Oid datum)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
#define LOGICALREP_COLUMN_UNCHANGED
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_STREAM_COMMIT
#define LOGICALREP_PROTO_VERSION_NUM
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
char * get_rel_name(Oid relid)
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
char * get_namespace_name(Oid nspid)
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
char * MemoryContextStrdup(MemoryContext context, const char *string)
void MemoryContextReset(MemoryContext context)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void * repalloc(void *pointer, Size size)
void pfree(void *pointer)
MemoryContext TopMemoryContext
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
char * GetUserNameFromId(Oid roleid, bool noerr)
ObjectType get_relkind_objtype(char relkind)
ReplOriginId replorigin_create(const char *roname)
ReplOriginXactState replorigin_xact_state
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
XLogRecPtr replorigin_session_get_progress(bool flush)
void replorigin_xact_clear(bool clear_origin)
void replorigin_session_setup(ReplOriginId node, int acquired_by)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
FormData_pg_attribute * Form_pg_attribute
static uint32 pg_ceil_log2_32(uint32 num)
static PgChecksumMode mode
static int server_version
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
static void * list_nth(const List *list, int n)
static Datum LSNGetDatum(XLogRecPtr X)
void FreeSubscription(Subscription *sub)
void DisableSubscription(Oid subid)
void UpdateDeadTupleRetentionStatus(Oid subid, bool active)
Subscription * GetSubscription(Oid subid, bool missing_ok)
FormData_pg_subscription * Form_pg_subscription
static char buf[DEFAULT_XLOG_SEG_SIZE]
long pgstat_report_stat(bool force)
void pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
Expr * expression_planner(Expr *expr)
static Datum ObjectIdGetDatum(Oid X)
static int32 DatumGetInt32(Datum X)
static int fd(const char *x, int i)
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
const char * logicalrep_message_type(LogicalRepMsgType action)
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
#define PqReplMsg_WALData
#define PqReplMsg_PrimaryStatusRequest
#define PqReplMsg_Keepalive
#define PqReplMsg_PrimaryStatusUpdate
#define PqReplMsg_StandbyStatusUpdate
static color newsub(struct colormap *cm, color co)
#define RelationGetRelid(relation)
#define RelationIsLogicallyLogged(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RELATION_IS_OTHER_TEMP(relation)
#define RelationGetNamespace(relation)
List * RelationGetIndexList(Relation relation)
ResourceOwner TopTransactionResourceOwner
ResourceOwner CurrentResourceOwner
Node * build_column_default(Relation rel, int attrno)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Snapshot GetTransactionSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
void PopActiveSnapshot(void)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
Oid GetRelationIdentityOrPK(Relation rel)
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
StringInfo makeStringInfo(void)
void resetStringInfo(StringInfo str)
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
LogicalRepMsgType command
LogicalRepRelMapEntry * rel
ResultRelInfo * targetRelInfo
PartitionTupleRouting * proute
ModifyTableState * mtstate
LogicalRepRelMapEntry * targetRel
TransactionId subxact_last
List * es_opened_result_relations
struct ErrorContextCallback * previous
void(* callback)(void *arg)
LogicalRepRelation remoterel
TimestampTz last_recv_time
LogicalRepWorkerType type
TransactionId oldest_nonremovable_xid
TimestampTz last_send_time
ResultRelInfo * resultRelInfo
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
XLogRecPtr last_commit_end
TimestampTz origin_timestamp
TimestampTz flushpos_update_time
FullTransactionId remote_oldestxid
FullTransactionId remote_wait_for
TimestampTz last_recv_time
TimestampTz candidate_xid_time
long table_sync_wait_time
FullTransactionId remote_nextxid
RetainDeadTuplesPhase phase
TransactionId candidate_xid
TupleDesc tts_tupleDescriptor
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
void ProcessSyncingRelations(XLogRecPtr current_lsn)
void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
#define FirstLowInvalidHeapAttributeNumber
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
#define SearchSysCacheCopy1(cacheId, key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
bool AllTablesyncsReady(void)
bool HasSubscriptionTablesCached(void)
void UpdateTwoPhaseState(Oid suboid, char new_state)
#define InvalidTransactionId
#define FullTransactionIdPrecedesOrEquals(a, b)
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
static FullTransactionId FullTransactionIdFromU64(uint64 value)
#define TransactionIdEquals(id1, id2)
#define TransactionIdIsValid(xid)
#define InvalidFullTransactionId
#define FullTransactionIdIsValid(x)
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
void AfterTriggerEndQuery(EState *estate)
void AfterTriggerBeginQuery(void)
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static void slot_getallattrs(TupleTableSlot *slot)
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
void FinishPreparedTransaction(const char *gid, bool isCommit)
void SwitchToUntrustedUser(Oid userid, UserContext *context)
void RestoreUserContext(UserContext *context)
#define TimestampTzPlusMilliseconds(tz, ms)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
static StringInfoData reply_message
int wal_receiver_status_interval
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_send(conn, buffer, nbytes)
#define walrcv_server_version(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_receive(conn, buffer, wait_fd)
@ PARALLEL_TRANS_FINISHED
static bool am_parallel_apply_worker(void)
@ WORKERTYPE_SEQUENCESYNC
@ WORKERTYPE_PARALLEL_APPLY
static bool am_sequencesync_worker(void)
static bool am_tablesync_worker(void)
static bool am_leader_apply_worker(void)
bool IsTransactionOrTransactionBlock(void)
bool PrepareTransactionBlock(const char *gid)
bool IsTransactionState(void)
void CommandCounterIncrement(void)
void StartTransactionCommand(void)
void SetCurrentStatementStartTimestamp(void)
bool IsTransactionBlock(void)
void BeginTransactionBlock(void)
void CommitTransactionCommand(void)
bool EndTransactionBlock(bool chain)
void AbortOutOfAnyTransaction(void)
CommandId GetCurrentCommandId(bool used)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
XLogRecPtr XactLastCommitEnd
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr