112 #include "utils/fmgroids.h" 121 #define NAPTIME_PER_CYCLE 1000 268 return (rel->
state == SUBREL_STATE_READY ||
269 (rel->
state == SUBREL_STATE_SYNCDONE &&
317 Assert(stream_fd != NULL);
417 int num_phys_attrs = desc->
natts;
431 defmap = (
int *)
palloc(num_phys_attrs *
sizeof(
int));
435 for (attnum = 0; attnum < num_phys_attrs; attnum++)
454 defmap[num_defaults] =
attnum;
460 for (i = 0; i < num_defaults; i++)
490 errcontext(
"processing remote data for replication target relation \"%s.%s\" column \"%s\", " 491 "remote type %s, local type %s",
519 errcallback.
arg = (
void *) &errarg;
525 for (i = 0; i < natts; i++)
530 if (!att->attisdropped && remoteattnum >= 0)
534 Assert(remoteattnum < tupleData->ncols);
547 typioparam, att->atttypmod);
564 typioparam, att->atttypmod);
569 (
errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
570 errmsg(
"incorrect binary data format in logical replication column %d",
645 errcallback.
arg = (
void *) &errarg;
651 for (i = 0; i < natts; i++)
656 if (remoteattnum < 0)
659 Assert(remoteattnum < tupleData->ncols);
676 typioparam, att->atttypmod);
693 typioparam, att->atttypmod);
698 (
errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
699 errmsg(
"incorrect binary data format in logical replication column %d",
777 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
778 errmsg(
"ORIGIN message sent out of order")));
817 xidhash =
hash_create(
"StreamXidHash", 1024, &hash_ctl,
908 for (i = subxact_data.
nsubxacts; i > 0; i--)
979 elog(
DEBUG1,
"received commit for streamed transaction %u", xid);
992 elog(
DEBUG1,
"replaying changes from file \"%s\"", path);
1034 if (nbytes !=
sizeof(len))
1037 errmsg(
"could not read from streaming transaction's changes file \"%s\": %m",
1049 errmsg(
"could not read from streaming transaction's changes file \"%s\": %m",
1067 if (nchanges % 1000 == 0)
1068 elog(
DEBUG1,
"replayed %d changes from file '%s'",
1077 elog(
DEBUG1,
"replayed %d (all) changes from file \"%s\"",
1273 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1274 errmsg(
"publisher did not send replica identity column " 1275 "expected by the logical replication target relation \"%s.%s\"",
1280 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1281 errmsg(
"logical replication target relation \"%s.%s\" has " 1282 "neither REPLICA IDENTITY index nor PRIMARY " 1283 "KEY and published relation does not have " 1284 "REPLICA IDENTITY FULL",
1348 if (!att->attisdropped && remoteattnum >= 0)
1366 has_oldtup ? &oldtup : &newtup);
1375 remoteslot, &newtup, rel);
1404 remoteslot, &localslot);
1433 "logical replication did not find row for update " 1434 "in replication target relation \"%s\"",
1524 remoteslot, &localslot);
1538 "logical replication did not find row for delete " 1539 "in replication target relation \"%s\"",
1568 (remoterel->
replident == REPLICA_IDENTITY_FULL));
1573 remoteslot, *localslot);
1576 remoteslot, *localslot);
1612 Assert(remoteslot != NULL);
1615 remoteslot, estate);
1616 Assert(partrelinfo != NULL);
1625 if (remoteslot_part == NULL)
1633 remoteslot_part =
ExecCopySlot(remoteslot_part, remoteslot);
1672 remoteslot_part, &localslot);
1692 "logical replication did not find row for update " 1693 "in replication target relation \"%s\"",
1701 if (!partrel->
rd_rel->relispartition ||
1719 localslot, remoteslot_part);
1742 remoteslot_part, remoteslot);
1746 remoteslot =
ExecCopySlot(remoteslot, remoteslot_part);
1757 Assert(partrelinfo_new != partrelinfo);
1773 if (remoteslot_part == NULL)
1797 elog(
ERROR,
"unrecognized CmdType: %d", (
int) operation);
1812 bool cascade =
false;
1813 bool restart_seqs =
false;
1830 foreach(lc, remote_relids)
1846 remote_rels =
lappend(remote_rels, rel);
1864 foreach(child, children)
1885 rels =
lappend(rels, childrel);
1886 part_rels =
lappend(part_rels, childrel);
1891 relids_logged =
lappend_oid(relids_logged, childrelid);
1907 foreach(lc, remote_rels)
1913 foreach(lc, part_rels)
1997 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1998 errmsg(
"invalid logical replication message type \"%c\"", action)));
2016 bool *have_pending_txes)
2047 *have_pending_txes =
true;
2097 bool ping_sent =
false;
2105 "ApplyMessageContext",
2113 "LogicalStreamingContext",
2126 bool endofstream =
false;
2149 (
errmsg(
"data stream from publisher has ended")));
2182 if (last_received < start_lsn)
2183 last_received = start_lsn;
2185 if (last_received < end_lsn)
2186 last_received = end_lsn;
2196 bool reply_requested;
2202 if (last_received < end_lsn)
2203 last_received = end_lsn;
2282 bool requestReply =
false;
2299 (
errmsg(
"terminating logical replication worker due to timeout")));
2308 requestReply =
true;
2341 bool have_pending_txes;
2351 if (recvpos < last_recvpos)
2352 recvpos = last_recvpos;
2360 if (!have_pending_txes)
2361 flushpos = writepos = recvpos;
2363 if (writepos < last_writepos)
2364 writepos = last_writepos;
2366 if (flushpos < last_flushpos)
2367 flushpos = last_flushpos;
2373 writepos == last_writepos &&
2374 flushpos == last_flushpos &&
2397 elog(
DEBUG2,
"sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2405 if (recvpos > last_recvpos)
2406 last_recvpos = recvpos;
2407 if (writepos > last_writepos)
2408 last_writepos = writepos;
2409 if (flushpos > last_flushpos)
2410 last_flushpos = flushpos;
2421 bool started_tx =
false;
2446 (
errmsg(
"logical replication apply worker for subscription \"%s\" will " 2447 "stop because the subscription was removed",
2448 MySubscription->
name)));
2460 (
errmsg(
"logical replication apply worker for subscription \"%s\" will " 2461 "stop because the subscription was disabled",
2462 MySubscription->
name)));
2475 strcmp(newsub->
name, MySubscription->
name) != 0 ||
2482 (
errmsg(
"logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
2483 MySubscription->
name)));
2489 if (newsub->
dbid != MySubscription->
dbid)
2491 elog(
ERROR,
"subscription %u changed unexpectedly",
2645 errmsg(
"could not read from streaming transaction's subxact file \"%s\": %m",
2667 errmsg(
"could not read from streaming transaction's subxact file \"%s\": %m",
2685 Assert(stream_fd != NULL);
2711 for (i = subxact_data.
nsubxacts; i > 0; i--)
2714 if (subxacts[i - 1].xid == xid)
2832 Assert(stream_fd == NULL);
2839 Assert(first_segment || found);
2841 elog(
DEBUG1,
"opening file \"%s\" for streamed changes", path);
2901 Assert(stream_fd != NULL);
2924 Assert(stream_fd != NULL);
2927 len = (s->
len - s->
cursor) +
sizeof(
char);
3010 if (!MySubscription)
3013 (
errmsg(
"logical replication apply worker for subscription %u will not " 3014 "start because the subscription was removed during startup",
3025 (
errmsg(
"logical replication apply worker for subscription \"%s\" will not " 3026 "start because the subscription was disabled during startup",
3027 MySubscription->
name)));
3043 (
errmsg(
"logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3047 (
errmsg(
"logical replication apply worker for subscription \"%s\" has started",
3048 MySubscription->
name)));
3053 elog(
DEBUG1,
"connecting to publisher using connection string \"%s\"",
3066 pfree(syncslotname);
3075 myslotname = MySubscription->
slotname;
3084 (
errmsg(
"subscription has no replication slot set")));
3088 snprintf(originname,
sizeof(originname),
"pg_%u", MySubscription->
oid);
3101 (
errmsg(
"could not connect to the publisher: %s", err)));
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Subscription * MySubscription
static void apply_handle_type(StringInfo s)
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
void ExecInitRangeTable(EState *estate, List *rangeTable)
#define LOGICALREP_PROTO_VERSION_NUM
static void stream_close_file(void)
static void stream_write_change(char action, StringInfo s)
static Oid GetRelationIdentityOrPK(Relation rel)
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
static void subxact_info_add(TransactionId xid)
static void subxact_info_read(Oid subid, TransactionId xid)
#define InvalidXLogRecPtr
#define AllocSetContextCreate
#define walrcv_endstreaming(conn, next_tli)
void table_close(Relation relation, LOCKMODE lockmode)
static void store_flush_position(XLogRecPtr remote_lsn)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
MemoryContext TopTransactionContext
void AcceptInvalidationMessages(void)
static XLogRecPtr remote_final_lsn
Oid RelationGetReplicaIndex(Relation relation)
void ProcessConfigFile(GucContext context)
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel)
static void apply_handle_insert(StringInfo s)
#define dlist_foreach_modify(iter, lhead)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
static dlist_head lsn_mapping
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
bool equal(const void *a, const void *b)
#define RelationGetDescr(relation)
void process_syncing_tables(XLogRecPtr current_lsn)
static void apply_handle_update_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry)
TimestampTz GetCurrentTimestamp(void)
static void changes_filename(char *path, Oid subid, TransactionId xid)
static void apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data)
static void apply_handle_stream_abort(StringInfo s)
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
ResultRelInfo * resultRelInfo
LogicalRepRelMapEntry * rel
#define TupleDescAttr(tupdesc, i)
#define walrcv_identify_system(conn, primary_tli)
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void CommitTransactionCommand(void)
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
XLogRecPtr XactLastCommitEnd
void BufFileTruncateShared(BufFile *file, int fileno, off_t offset)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
StringInfo makeStringInfo(void)
const TupleTableSlotOps TTSOpsVirtual
#define walrcv_receive(conn, buffer, wait_fd)
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Expr * expression_planner(Expr *expr)
#define walrcv_server_version(conn)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
TimestampTz last_send_time
static void apply_handle_stream_start(StringInfo s)
#define walrcv_startstreaming(conn, options)
XLogRecPtr replorigin_session_get_progress(bool flush)
int errcode(int sqlerrcode)
#define LOGICALREP_COLUMN_TEXT
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
#define WL_SOCKET_READABLE
#define FirstLowInvalidHeapAttributeNumber
void MemoryContextReset(MemoryContext context)
XLogRecPtr GetFlushRecPtr(void)
void PopActiveSnapshot(void)
#define TRUNCATE_REL_CONTEXT_CASCADING
#define RelationIsLogicallyLogged(relation)
StringInfoData * colvalues
static void subxact_filename(char *path, Oid subid, TransactionId xid)
void pgstat_report_stat(bool disconnect)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
void replorigin_session_setup(RepOriginId node)
TransactionId subxact_last
struct SlotErrCallbackArg SlotErrCallbackArg
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
bool IsLogicalWorker(void)
void(* callback)(void *arg)
int wal_receiver_status_interval
List * lappend_oid(List *list, Oid datum)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
struct ErrorContextCallback * previous
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
Snapshot GetTransactionSnapshot(void)
#define OidIsValid(objectId)
#define LOGICALREP_COLUMN_BINARY
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
void BufFileClose(BufFile *file)
void ResetLatch(Latch *latch)
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
void EvalPlanQualEnd(EPQState *epqstate)
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
ErrorContextCallback * error_context_stack
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
void FreeExecutorState(EState *estate)
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
static StringInfoData reply_message
#define LSN_FORMAT_ARGS(lsn)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
#define dlist_container(type, membername, ptr)
#define LOGICALREP_COLUMN_UNCHANGED
void pfree(void *pointer)
#define dlist_tail_element(type, membername, lhead)
static void stream_cleanup_files(Oid subid, TransactionId xid)
LogicalRepWorker * MyLogicalRepWorker
static bool ensure_transaction(void)
struct ApplySubXactData ApplySubXactData
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_extra, List *relids_logged, DropBehavior behavior, bool restart_seqs)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
static void * list_nth(const List *list, int n)
static void apply_handle_stream_stop(StringInfo s)
bool in_remote_transaction
void fill_extraUpdatedCols(RangeTblEntry *target_rte, Relation target_relation)
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
bool in_streamed_transaction
TupleTableSlot * ri_PartitionTupleSlot
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
static void slot_getallattrs(TupleTableSlot *slot)
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
static bool am_tablesync_worker(void)
static void apply_handle_delete(StringInfo s)
#define ALLOCSET_DEFAULT_SIZES
BufFile * BufFileCreateShared(SharedFileSet *fileset, const char *name)
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
XLogRecPtr replorigin_session_origin_lsn
void logicalrep_worker_attach(int slot)
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
void PushActiveSnapshot(Snapshot snap)
TimestampTz replorigin_session_origin_timestamp
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
List * es_opened_result_relations
int errcode_for_file_access(void)
#define InvalidTransactionId
SharedFileSet * stream_fileset
#define RelationGetRelationName(relation)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
FormData_pg_attribute * Form_pg_attribute
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void resetStringInfo(StringInfo str)
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
MemoryContext CurrentMemoryContext
static void apply_handle_begin(StringInfo s)
RepOriginId replorigin_create(char *roname)
static void dlist_delete(dlist_node *node)
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel, ResultRelInfo **resultRelInfo)
Oid get_atttype(Oid relid, AttrNumber attnum)
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
MemoryContext TopMemoryContext
EState * CreateExecutorState(void)
List * lappend_int(List *list, int datum)
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
List * lappend(List *list, void *datum)
MemoryContext ApplyContext
void initStringInfo(StringInfo str)
#define DLIST_STATIC_INIT(name)
static void apply_handle_commit(StringInfo s)
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
static void apply_handle_tuple_routing(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation)
#define MemoryContextResetAndDeleteChildren(ctx)
TupleDesc tts_tupleDescriptor
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
Node * build_column_default(Relation rel, int attrno)
static void apply_handle_stream_commit(StringInfo s)
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
static void apply_handle_update(StringInfo s)
void CommandCounterIncrement(void)
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
static void apply_handle_relation(StringInfo s)
#define TimestampTzPlusMilliseconds(tz, ms)
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
SharedFileSet * subxact_fileset
#define ereport(elevel,...)
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
static MemoryContext LogicalStreamingContext
struct SubXactInfo SubXactInfo
pqsigfunc pqsignal(int signum, pqsigfunc handler)
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
void AfterTriggerBeginQuery(void)
static void apply_dispatch(StringInfo s)
static void maybe_reread_subscription(void)
TimestampTz last_recv_time
static MemoryContext ApplyMessageContext
bool list_member_oid(const List *list, Oid datum)
void SharedFileSetDeleteAll(SharedFileSet *fileset)
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
#define Assert(condition)
RepOriginId replorigin_session_origin
#define RELATION_IS_OTHER_TEMP(relation)
static void apply_handle_origin(StringInfo s)
void StartTransactionCommand(void)
void load_file(const char *filename, bool restricted)
static bool dlist_is_empty(dlist_head *head)
static void slot_store_error_callback(void *arg)
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
bool IsTransactionState(void)
Bitmapset * bms_add_member(Bitmapset *a, int x)
#define walrcv_send(conn, buffer, nbytes)
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
void * repalloc(void *pointer, Size size)
#define GetPerTupleMemoryContext(estate)
void FreeSubscription(Subscription *sub)
static void finish_estate(EState *estate)
char * logicalrep_typmap_gettypname(Oid remoteid)
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
static void subxact_info_write(Oid subid, TransactionId xid)
void AfterTriggerEndQuery(EState *estate)
void SetCurrentStatementStartTimestamp(void)
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
TupleConversionMap * ri_RootToPartitionMap
int errmsg(const char *fmt,...)
void pgstat_report_activity(BackendState state, const char *cmd_str)
char * MemoryContextStrdup(MemoryContext context, const char *string)
static void stream_open_file(Oid subid, TransactionId xid, bool first)
static ApplySubXactData subxact_data
static BufFile * stream_fd
static void LogicalRepApplyLoop(XLogRecPtr last_received)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
volatile sig_atomic_t ConfigReloadPending
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
static void cleanup_subxact_info(void)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
struct FlushPosition FlushPosition
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
#define CHECK_FOR_INTERRUPTS()
union WalRcvStreamOptions::@103 proto
#define TRUNCATE_REL_CONTEXT_NORMAL
CommandId GetCurrentCommandId(bool used)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void BufFileWrite(BufFile *file, void *ptr, size_t size)
#define TransactionIdIsValid(xid)
Relation table_open(Oid relationId, LOCKMODE lockmode)
static color newsub(struct colormap *cm, color co)
char * get_rel_name(Oid relid)
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
#define RelationGetRelid(relation)
static TransactionId stream_xid
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Datum now(PG_FUNCTION_ARGS)
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Oid RelationGetPrimaryKeyIndex(Relation relation)
struct StreamXidHash StreamXidHash
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
void ApplyWorkerMain(Datum main_arg)
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
#define WL_EXIT_ON_PM_DEATH
#define EvalPlanQualSetSlot(epqstate, slot)
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot)
void BackgroundWorkerUnblockSignals(void)
static void apply_handle_truncate(StringInfo s)
#define walrcv_connect(conninfo, logical, appname, err)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)