140 #include <sys/stat.h>
200 #include "utils/fmgroids.h"
211 #define NAPTIME_PER_CYCLE 1000
349 #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
441 return _(
"logical replication table synchronization worker");
443 return _(
"logical replication parallel apply worker");
445 return _(
"logical replication apply worker");
458 char *originname,
Size szoriginname)
463 snprintf(originname, szoriginname,
"pg_%u_%u", suboid, relid);
468 snprintf(originname, szoriginname,
"pg_%u", suboid);
504 if (rel->
state != SUBREL_STATE_READY &&
505 rel->
state != SUBREL_STATE_UNKNOWN)
507 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
508 errmsg(
"logical replication parallel apply worker for subscription \"%s\" will stop",
510 errdetail(
"Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
512 return rel->
state == SUBREL_STATE_READY;
515 return (rel->
state == SUBREL_STATE_READY ||
516 (rel->
state == SUBREL_STATE_SYNCDONE &&
610 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
611 errmsg_internal(
"invalid transaction ID in streamed replication transaction")));
613 switch (apply_action)
764 int num_phys_attrs = desc->
natts;
778 defmap = (
int *)
palloc(num_phys_attrs *
sizeof(
int));
801 defmap[num_defaults] =
attnum;
806 for (
i = 0;
i < num_defaults;
i++)
827 for (
i = 0;
i < natts;
i++)
832 if (!att->attisdropped && remoteattnum >= 0)
836 Assert(remoteattnum < tupleData->ncols);
849 typioparam, att->atttypmod);
866 typioparam, att->atttypmod);
871 (
errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
872 errmsg(
"incorrect binary data format in logical replication column %d",
939 for (
i = 0;
i < natts;
i++)
944 if (remoteattnum < 0)
947 Assert(remoteattnum < tupleData->ncols);
964 typioparam, att->atttypmod);
981 typioparam, att->atttypmod);
986 (
errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
987 errmsg(
"incorrect binary data format in logical replication column %d",
1044 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1045 errmsg_internal(
"incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1069 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1070 errmsg_internal(
"tablesync worker received a BEGIN PREPARE message")));
1136 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1137 errmsg_internal(
"incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1302 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1308 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1309 errmsg_internal(
"tablesync worker received a STREAM PREPARE message")));
1316 switch (apply_action)
1339 elog(
DEBUG1,
"finished processing the STREAM PREPARE command");
1397 elog(
DEBUG1,
"finished processing the STREAM PREPARE command");
1401 elog(
ERROR,
"unexpected apply action: %d", (
int) apply_action);
1438 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1498 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1512 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1513 errmsg_internal(
"invalid transaction ID in streamed replication transaction")));
1523 switch (apply_action)
1609 elog(
ERROR,
"unexpected apply action: %d", (
int) apply_action);
1653 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1658 switch (apply_action)
1695 elog(
DEBUG1,
"applied %u changes in the streaming chunk",
1725 elog(
ERROR,
"unexpected apply action: %d", (
int) apply_action);
1846 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1853 xid = abort_data.
xid;
1854 subxid = abort_data.
subxid;
1855 toplevel_xact = (xid == subxid);
1861 switch (apply_action)
1871 elog(
DEBUG1,
"finished processing the STREAM ABORT command");
1974 elog(
DEBUG1,
"finished processing the STREAM ABORT command");
1978 elog(
ERROR,
"unexpected apply action: %d", (
int) apply_action);
2012 if (last_fileno != fileno || last_offset != offset)
2013 elog(
ERROR,
"unexpected message left in streaming transaction's changes file \"%s\"",
2027 char *buffer = NULL;
2048 elog(
DEBUG1,
"replaying changes from file \"%s\"", path);
2098 elog(
ERROR,
"incorrect length %d in streaming transaction's changes file \"%s\"",
2135 if (nchanges % 1000 == 0)
2136 elog(
DEBUG1,
"replayed %d changes from file \"%s\"",
2143 elog(
DEBUG1,
"replayed %d (all) changes from file \"%s\"",
2165 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2173 switch (apply_action)
2189 elog(
DEBUG1,
"finished processing the STREAM COMMIT command");
2243 elog(
DEBUG1,
"finished processing the STREAM COMMIT command");
2247 elog(
ERROR,
"unexpected apply action: %d", (
int) apply_action);
2382 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2383 errmsg(
"user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2508 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2509 errmsg(
"publisher did not send replica identity column "
2510 "expected by the logical replication target relation \"%s.%s\"",
2515 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2516 errmsg(
"logical replication target relation \"%s.%s\" has "
2517 "neither REPLICA IDENTITY index nor PRIMARY "
2518 "KEY and published relation does not have "
2519 "REPLICA IDENTITY FULL",
2593 if (!att->attisdropped && remoteattnum >= 0)
2606 has_oldtup ? &oldtup : &newtup);
2653 remoteslot, &localslot);
2684 "logical replication did not find row to be updated "
2685 "in replication target relation \"%s\"",
2791 remoteslot, &localslot);
2811 "logical replication did not find row to be deleted "
2812 "in replication target relation \"%s\"",
2846 (remoterel->
replident == REPLICA_IDENTITY_FULL));
2851 remoteslot, *localslot);
2854 remoteslot, *localslot);
2895 Assert(remoteslot != NULL);
2898 remoteslot, estate);
2899 Assert(partrelinfo != NULL);
2917 if (remoteslot_part == NULL)
2928 remoteslot_part =
ExecCopySlot(remoteslot_part, remoteslot);
2972 remoteslot_part, &localslot);
2982 "logical replication did not find row to be updated "
2983 "in replication target relation's partition \"%s\"",
3001 if (!partrel->
rd_rel->relispartition ||
3021 localslot, remoteslot_part);
3044 remoteslot_part, remoteslot);
3048 remoteslot =
ExecCopySlot(remoteslot, remoteslot_part);
3058 Assert(partrelinfo_new != partrelinfo);
3079 if (remoteslot_part == NULL)
3103 elog(
ERROR,
"unrecognized CmdType: %d", (
int) operation);
3116 bool cascade =
false;
3117 bool restart_seqs =
false;
3139 foreach(lc, remote_relids)
3155 remote_rels =
lappend(remote_rels, rel);
3173 foreach(child, children)
3195 rels =
lappend(rels, childrel);
3196 part_rels =
lappend(part_rels, childrel);
3200 relids_logged =
lappend_oid(relids_logged, childrelid);
3215 foreach(lc, remote_rels)
3221 foreach(lc, part_rels)
3334 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
3335 errmsg(
"invalid logical replication message type \"%c\"",
action)));
3357 bool *have_pending_txes)
3388 *have_pending_txes =
true;
3445 bool ping_sent =
false;
3454 "ApplyMessageContext",
3462 "LogicalStreamingContext",
3484 bool endofstream =
false;
3507 (
errmsg(
"data stream from publisher has ended")));
3540 if (last_received < start_lsn)
3541 last_received = start_lsn;
3543 if (last_received < end_lsn)
3544 last_received = end_lsn;
3554 bool reply_requested;
3560 if (last_received < end_lsn)
3561 last_received = end_lsn;
3640 bool requestReply =
false;
3657 (
errcode(ERRCODE_CONNECTION_FAILURE),
3658 errmsg(
"terminating logical replication worker due to timeout")));
3667 requestReply =
true;
3715 bool have_pending_txes;
3725 if (recvpos < last_recvpos)
3726 recvpos = last_recvpos;
3734 if (!have_pending_txes)
3735 flushpos = writepos = recvpos;
3737 if (writepos < last_writepos)
3738 writepos = last_writepos;
3740 if (flushpos < last_flushpos)
3741 flushpos = last_flushpos;
3747 writepos == last_writepos &&
3748 flushpos == last_flushpos &&
3771 elog(
DEBUG2,
"sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3780 if (recvpos > last_recvpos)
3781 last_recvpos = recvpos;
3782 if (writepos > last_writepos)
3783 last_writepos = writepos;
3784 if (flushpos > last_flushpos)
3785 last_flushpos = flushpos;
3827 bool started_tx =
false;
3853 (
errmsg(
"%s for subscription \"%s\" will stop because the subscription was removed",
3867 (
errmsg(
"%s for subscription \"%s\" will stop because the subscription was disabled",
3897 (
errmsg(
"logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
3902 (
errmsg(
"%s for subscription \"%s\" will restart because of a parameter change",
3911 elog(
ERROR,
"subscription %u changed unexpectedly",
4086 if (subxacts[
i - 1].xid == xid)
4184 elog(
DEBUG1,
"opening file \"%s\" for streamed changes", path);
4206 path, O_RDWR,
false);
4304 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
4307 snprintf(gid, szgid,
"pg_gid_%u_%u", subid, xid);
4321 char *syncslotname = NULL;
4351 pfree(syncslotname);
4426 (
errmsg(
"%s for subscription %u will not start because the subscription was removed during startup",
4442 (
errmsg(
"%s for subscription \"%s\" will not start because the subscription was disabled during startup",
4459 (
errmsg(
"logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4465 (
errmsg(
"%s for subscription \"%s\" has started",
4478 char *myslotname = NULL;
4505 elog(
DEBUG1,
"connecting to publisher using connection string \"%s\"",
4515 sizeof(originname));
4534 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4535 errmsg(
"subscription has no replication slot set")));
4540 originname,
sizeof(originname));
4553 (
errcode(ERRCODE_CONNECTION_FAILURE),
4554 errmsg(
"could not connect to the publisher: %s",
err)));
4575 options.startpoint = origin_startpos;
4576 options.slotname = myslotname;
4579 options.proto.logical.proto_version =
4595 options.proto.logical.streaming_str =
"parallel";
4601 options.proto.logical.streaming_str =
"on";
4606 options.proto.logical.streaming_str = NULL;
4610 options.proto.logical.twophase =
false;
4628 options.proto.logical.twophase =
true;
4642 (
errmsg_internal(
"logical replication apply worker for subscription \"%s\" two_phase is %s",
4695 errmsg(
"subscription \"%s\" has been disabled because of an error",
4743 errmsg(
"logical replication starts skipping transaction at LSN %X/%X",
4757 (
errmsg(
"logical replication completed skipping transaction at LSN %X/%X",
4779 bool started_tx =
false;
4819 if (subform->subskiplsn == myskiplsn)
4821 bool nulls[Natts_pg_subscription];
4822 bool replaces[Natts_pg_subscription];
4826 memset(nulls,
false,
sizeof(nulls));
4827 memset(replaces,
false,
sizeof(replaces));
4831 replaces[Anum_pg_subscription_subskiplsn - 1] =
true;
4837 if (myskiplsn != finish_lsn)
4840 errdetail(
"Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4863 if (errarg->
rel == NULL)
4866 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\"",
4870 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4875 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4886 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4893 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4904 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4912 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4978 foreach(lc2, workers)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
static void subxact_filename(char *path, Oid subid, TransactionId xid)
static void begin_replication_step(void)
static void end_replication_step(void)
static void cleanup_subxact_info(void)
static void apply_handle_stream_prepare(StringInfo s)
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
static void subxact_info_add(TransactionId xid)
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
void stream_cleanup_files(Oid subid, TransactionId xid)
MemoryContext ApplyMessageContext
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
static void apply_handle_type(StringInfo s)
static void apply_handle_truncate(StringInfo s)
void InitializeApplyWorker(void)
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
static void start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
@ TRANS_LEADER_SEND_TO_PARALLEL
@ TRANS_LEADER_PARTIAL_SERIALIZE
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
struct ApplyExecutionData ApplyExecutionData
static void changes_filename(char *path, Oid subid, TransactionId xid)
static void apply_worker_exit(void)
static BufFile * stream_fd
static void apply_handle_update(StringInfo s)
void stream_stop_internal(TransactionId xid)
static void apply_handle_stream_commit(StringInfo s)
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
static void stop_skipping_changes(void)
struct ApplySubXactData ApplySubXactData
#define NAPTIME_PER_CYCLE
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
static uint32 parallel_stream_nchanges
static void apply_handle_commit_prepared(StringInfo s)
static void LogicalRepApplyLoop(XLogRecPtr last_received)
void LogicalRepWorkersWakeupAtCommit(Oid subid)
bool IsLogicalWorker(void)
static ApplySubXactData subxact_data
static void DisableSubscriptionAndExit(void)
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
ApplyErrorCallbackArg apply_error_callback_arg
bool in_remote_transaction
static XLogRecPtr skip_xact_finish_lsn
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
static void apply_handle_delete(StringInfo s)
void apply_dispatch(StringInfo s)
#define is_skipping_changes()
static void stream_write_change(char action, StringInfo s)
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
static void apply_handle_begin(StringInfo s)
static dlist_head lsn_mapping
bool IsLogicalParallelApplyWorker(void)
void AtEOXact_LogicalRepWorkers(bool isCommit)
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
static void finish_edata(ApplyExecutionData *edata)
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
static void start_apply(XLogRecPtr origin_startpos)
ErrorContextCallback * apply_error_context_stack
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
static void apply_handle_commit(StringInfo s)
void stream_start_internal(TransactionId xid, bool first_segment)
static List * on_commit_wakeup_workers_subids
static void apply_handle_stream_abort(StringInfo s)
static void apply_handle_relation(StringInfo s)
void set_apply_error_context_origin(char *originname)
struct ApplyErrorCallbackArg ApplyErrorCallbackArg
MemoryContext ApplyContext
static void subxact_info_write(Oid subid, TransactionId xid)
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
static void apply_handle_prepare(StringInfo s)
static void apply_handle_rollback_prepared(StringInfo s)
static void apply_handle_stream_stop(StringInfo s)
static void apply_handle_origin(StringInfo s)
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
WalReceiverConn * LogRepWorkerWalRcvConn
static XLogRecPtr remote_final_lsn
static bool MySubscriptionValid
void apply_error_callback(void *arg)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
static MemoryContext LogicalStreamingContext
void maybe_reread_subscription(void)
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
static bool in_streamed_transaction
struct SubXactInfo SubXactInfo
static void apply_handle_begin_prepare(StringInfo s)
struct FlushPosition FlushPosition
void ApplyWorkerMain(Datum main_arg)
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
static void apply_handle_stream_start(StringInfo s)
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Subscription * MySubscription
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
static void stream_close_file(void)
static TransactionId stream_xid
static void apply_handle_insert(StringInfo s)
static const char * get_worker_name(void)
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
static void subxact_info_read(Oid subid, TransactionId xid)
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
static void reset_apply_error_context_info(void)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLEINTRANSACTION
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
void BufFileClose(BufFile *file)
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
#define OidIsValid(objectId)
elog(ERROR, "%s: %s", p2, msg)
void load_file(const char *filename, bool restricted)
int errmsg_internal(const char *fmt,...)
void EmitErrorReport(void)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
void FlushErrorState(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
bool equal(const void *a, const void *b)
void err(int eval, const char *fmt,...)
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
void EvalPlanQualEnd(EPQState *epqstate)
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
const TupleTableSlotOps TTSOpsVirtual