130 #include <sys/stat.h>
188 #include "utils/fmgroids.h"
199 #define NAPTIME_PER_CYCLE 1000
277 #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
385 return (rel->
state == SUBREL_STATE_READY ||
386 (rel->
state == SUBREL_STATE_SYNCDONE &&
456 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
457 errmsg_internal(
"invalid transaction ID in streamed replication transaction")));
563 int num_phys_attrs = desc->
natts;
577 defmap = (
int *)
palloc(num_phys_attrs *
sizeof(
int));
600 defmap[num_defaults] =
attnum;
605 for (
i = 0;
i < num_defaults;
i++)
626 for (
i = 0;
i < natts;
i++)
631 if (!att->attisdropped && remoteattnum >= 0)
635 Assert(remoteattnum < tupleData->ncols);
648 typioparam, att->atttypmod);
665 typioparam, att->atttypmod);
670 (
errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
671 errmsg(
"incorrect binary data format in logical replication column %d",
738 for (
i = 0;
i < natts;
i++)
743 if (remoteattnum < 0)
746 Assert(remoteattnum < tupleData->ncols);
763 typioparam, att->atttypmod);
780 typioparam, att->atttypmod);
785 (
errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
786 errmsg(
"incorrect binary data format in logical replication column %d",
840 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
841 errmsg_internal(
"incorrect commit LSN %X/%X in commit message (expected %X/%X)",
865 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
866 errmsg_internal(
"tablesync worker received a BEGIN PREPARE message")));
926 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
927 errmsg_internal(
"incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1079 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1085 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1086 errmsg_internal(
"tablesync worker received a STREAM PREPARE message")));
1091 elog(
DEBUG1,
"received prepare for streamed transaction %u", prepare_data.
xid);
1141 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1155 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1175 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1176 errmsg_internal(
"invalid transaction ID in streamed replication transaction")));
1220 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1256 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1355 char *buffer = NULL;
1373 elog(
DEBUG1,
"replaying changes from file \"%s\"", path);
1414 if (nbytes !=
sizeof(
len))
1417 errmsg(
"could not read from streaming transaction's changes file \"%s\": %m",
1421 elog(
ERROR,
"incorrect length %d in streaming transaction's changes file \"%s\"",
1431 errmsg(
"could not read from streaming transaction's changes file \"%s\": %m",
1449 if (nchanges % 1000 == 0)
1450 elog(
DEBUG1,
"replayed %d changes from file \"%s\"",
1459 elog(
DEBUG1,
"replayed %d (all) changes from file \"%s\"",
1476 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1482 elog(
DEBUG1,
"received commit for streamed transaction %u", xid);
1633 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1634 errmsg(
"\"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1759 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1760 errmsg(
"publisher did not send replica identity column "
1761 "expected by the logical replication target relation \"%s.%s\"",
1766 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1767 errmsg(
"logical replication target relation \"%s.%s\" has "
1768 "neither REPLICA IDENTITY index nor PRIMARY "
1769 "KEY and published relation does not have "
1770 "REPLICA IDENTITY FULL",
1844 if (!att->attisdropped && remoteattnum >= 0)
1860 has_oldtup ? &oldtup : &newtup);
1869 remoteslot, &newtup);
1905 remoteslot, &localslot);
1936 "logical replication did not find row to be updated "
1937 "in replication target relation \"%s\"",
2042 remoteslot, &localslot);
2062 "logical replication did not find row to be deleted "
2063 "in replication target relation \"%s\"",
2098 (remoterel->
replident == REPLICA_IDENTITY_FULL));
2103 remoteslot, *localslot);
2106 remoteslot, *localslot);
2147 Assert(remoteslot != NULL);
2150 remoteslot, estate);
2151 Assert(partrelinfo != NULL);
2160 if (remoteslot_part == NULL)
2171 remoteslot_part =
ExecCopySlot(remoteslot_part, remoteslot);
2212 remoteslot_part, &localslot);
2222 "logical replication did not find row to be updated "
2223 "in replication target relation's partition \"%s\"",
2241 if (!partrel->
rd_rel->relispartition ||
2261 localslot, remoteslot_part);
2284 remoteslot_part, remoteslot);
2288 remoteslot =
ExecCopySlot(remoteslot, remoteslot_part);
2299 Assert(partrelinfo_new != partrelinfo);
2314 if (remoteslot_part == NULL)
2338 elog(
ERROR,
"unrecognized CmdType: %d", (
int) operation);
2351 bool cascade =
false;
2352 bool restart_seqs =
false;
2374 foreach(lc, remote_relids)
2390 remote_rels =
lappend(remote_rels, rel);
2408 foreach(child, children)
2430 rels =
lappend(rels, childrel);
2431 part_rels =
lappend(part_rels, childrel);
2435 relids_logged =
lappend_oid(relids_logged, childrelid);
2450 foreach(lc, remote_rels)
2456 foreach(lc, part_rels)
2569 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2570 errmsg(
"invalid logical replication message type \"%c\"",
action)));
2592 bool *have_pending_txes)
2623 *have_pending_txes =
true;
2673 bool ping_sent =
false;
2682 "ApplyMessageContext",
2690 "LogicalStreamingContext",
2711 bool endofstream =
false;
2734 (
errmsg(
"data stream from publisher has ended")));
2767 if (last_received < start_lsn)
2768 last_received = start_lsn;
2770 if (last_received < end_lsn)
2771 last_received = end_lsn;
2781 bool reply_requested;
2787 if (last_received < end_lsn)
2788 last_received = end_lsn;
2867 bool requestReply =
false;
2884 (
errcode(ERRCODE_CONNECTION_FAILURE),
2885 errmsg(
"terminating logical replication worker due to timeout")));
2894 requestReply =
true;
2941 bool have_pending_txes;
2951 if (recvpos < last_recvpos)
2952 recvpos = last_recvpos;
2960 if (!have_pending_txes)
2961 flushpos = writepos = recvpos;
2963 if (writepos < last_writepos)
2964 writepos = last_writepos;
2966 if (flushpos < last_flushpos)
2967 flushpos = last_flushpos;
2973 writepos == last_writepos &&
2974 flushpos == last_flushpos &&
2997 elog(
DEBUG2,
"sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3006 if (recvpos > last_recvpos)
3007 last_recvpos = recvpos;
3008 if (writepos > last_writepos)
3009 last_writepos = writepos;
3010 if (flushpos > last_flushpos)
3011 last_flushpos = flushpos;
3022 bool started_tx =
false;
3047 (
errmsg(
"logical replication apply worker for subscription \"%s\" will "
3048 "stop because the subscription was removed",
3058 (
errmsg(
"logical replication apply worker for subscription \"%s\" will "
3059 "stop because the subscription was disabled",
3084 (
errmsg(
"logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
3093 elog(
ERROR,
"subscription %u changed unexpectedly",
3208 errmsg(
"could not read from streaming transaction's subxact file \"%s\": %m",
3230 errmsg(
"could not read from streaming transaction's subxact file \"%s\": %m",
3277 if (subxacts[
i - 1].xid == xid)
3379 elog(
DEBUG1,
"opening file \"%s\" for streamed changes", path);
3401 path, O_RDWR,
false);
3487 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
3490 snprintf(gid, szgid,
"pg_gid_%u_%u", subid, xid);
3504 char *syncslotname = NULL;
3534 pfree(syncslotname);
3579 char *myslotname = NULL;
3629 (
errmsg(
"logical replication apply worker for subscription %u will not "
3630 "start because the subscription was removed during startup",
3641 (
errmsg(
"logical replication apply worker for subscription \"%s\" will not "
3642 "start because the subscription was disabled during startup",
3659 (
errmsg(
"logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3663 (
errmsg(
"logical replication apply worker for subscription \"%s\" has started",
3669 elog(
DEBUG1,
"connecting to publisher using connection string \"%s\"",
3683 sizeof(originname));
3703 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3704 errmsg(
"subscription has no replication slot set")));
3721 (
errcode(ERRCODE_CONNECTION_FAILURE),
3722 errmsg(
"could not connect to the publisher: %s", err)));
3748 options.startpoint = origin_startpos;
3749 options.slotname = myslotname;
3752 options.proto.logical.proto_version =
3760 options.proto.logical.twophase =
false;
3777 options.proto.logical.twophase =
true;
3791 (
errmsg(
"logical replication apply worker for subscription \"%s\" two_phase is %s",
3840 errmsg(
"logical replication subscription \"%s\" has been disabled due to an error",
3879 errmsg(
"start skipping logical replication transaction finished at %X/%X",
3893 (
errmsg(
"done skipping logical replication transaction finished at %X/%X",
3915 bool started_tx =
false;
3955 if (subform->subskiplsn == myskiplsn)
3957 bool nulls[Natts_pg_subscription];
3958 bool replaces[Natts_pg_subscription];
3962 memset(nulls,
false,
sizeof(nulls));
3963 memset(replaces,
false,
sizeof(replaces));
3967 replaces[Anum_pg_subscription_subskiplsn - 1] =
true;
3973 if (myskiplsn != finish_lsn)
3976 errdetail(
"Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X",
3999 if (errarg->
rel == NULL)
4002 errcontext(
"processing remote data for replication origin \"%s\" during \"%s\"",
4006 errcontext(
"processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
4011 errcontext(
"processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
4018 errcontext(
"processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
4026 errcontext(
"processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
static void subxact_filename(char *path, Oid subid, TransactionId xid)
static void begin_replication_step(void)
static void end_replication_step(void)
static void cleanup_subxact_info(void)
static void apply_handle_stream_prepare(StringInfo s)
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
static void maybe_reread_subscription(void)
static void subxact_info_add(TransactionId xid)
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
static MemoryContext ApplyMessageContext
static void stream_cleanup_files(Oid subid, TransactionId xid)
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
static void apply_handle_type(StringInfo s)
static void apply_handle_truncate(StringInfo s)
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup)
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
static void start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
struct ApplyExecutionData ApplyExecutionData
static void changes_filename(char *path, Oid subid, TransactionId xid)
static Oid GetRelationIdentityOrPK(Relation rel)
static BufFile * stream_fd
static void apply_dispatch(StringInfo s)
static void apply_handle_update(StringInfo s)
static void apply_handle_stream_commit(StringInfo s)
static void stop_skipping_changes(void)
struct ApplySubXactData ApplySubXactData
#define NAPTIME_PER_CYCLE
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
static void apply_handle_commit_prepared(StringInfo s)
static void LogicalRepApplyLoop(XLogRecPtr last_received)
bool IsLogicalWorker(void)
static ApplySubXactData subxact_data
static void DisableSubscriptionAndExit(void)
static void store_flush_position(XLogRecPtr remote_lsn)
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
static ApplyErrorCallbackArg apply_error_callback_arg
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
bool in_remote_transaction
static XLogRecPtr skip_xact_finish_lsn
static void apply_handle_delete(StringInfo s)
#define is_skipping_changes()
static void stream_write_change(char action, StringInfo s)
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
static void apply_handle_begin(StringInfo s)
static dlist_head lsn_mapping
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
static void finish_edata(ApplyExecutionData *edata)
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
static void start_apply(XLogRecPtr origin_startpos)
static void apply_handle_commit(StringInfo s)
static void apply_handle_stream_abort(StringInfo s)
static void apply_handle_relation(StringInfo s)
struct ApplyErrorCallbackArg ApplyErrorCallbackArg
MemoryContext ApplyContext
static void subxact_info_write(Oid subid, TransactionId xid)
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
static void apply_handle_prepare(StringInfo s)
static void apply_handle_rollback_prepared(StringInfo s)
static void apply_handle_stream_stop(StringInfo s)
static void apply_handle_origin(StringInfo s)
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
WalReceiverConn * LogRepWorkerWalRcvConn
static XLogRecPtr remote_final_lsn
static bool MySubscriptionValid
static MemoryContext LogicalStreamingContext
static void stream_open_file(Oid subid, TransactionId xid, bool first)
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
static bool in_streamed_transaction
struct SubXactInfo SubXactInfo
static void apply_handle_begin_prepare(StringInfo s)
struct FlushPosition FlushPosition
void ApplyWorkerMain(Datum main_arg)
static void apply_handle_stream_start(StringInfo s)
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
static void apply_error_callback(void *arg)
Subscription * MySubscription
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
static void stream_close_file(void)
static TransactionId stream_xid
static void apply_handle_insert(StringInfo s)
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
static void subxact_info_read(Oid subid, TransactionId xid)
static void reset_apply_error_context_info(void)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
void pgstat_report_activity(BackendState state, const char *cmd_str)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
void BufFileWrite(BufFile *file, void *ptr, size_t size)
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
void BufFileClose(BufFile *file)
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
#define OidIsValid(objectId)
void load_file(const char *filename, bool restricted)
int errmsg_internal(const char *fmt,...)
void EmitErrorReport(void)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
void FlushErrorState(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
bool equal(const void *a, const void *b)
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
void EvalPlanQualEnd(EPQState *epqstate)
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
const TupleTableSlotOps TTSOpsVirtual
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
void ExecInitRangeTable(EState *estate, List *rangeTable)
EState * CreateExecutorState(void)
void FreeExecutorState(EState *estate)
#define GetPerTupleExprContext(estate)
#define GetPerTupleMemoryContext(estate)
#define EvalPlanQualSetSlot(epqstate, slot)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
void FileSetInit(FileSet *fileset)
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
void ProcessConfigFile(GucContext context)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
static bool dlist_is_empty(dlist_head *head)
static void dlist_delete(dlist_node *node)
#define dlist_tail_element(type, membername, lhead)
#define dlist_foreach_modify(iter, lhead)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
#define DLIST_STATIC_INIT(name)
#define dlist_container(type, membername, ptr)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void AcceptInvalidationMessages(void)
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void ResetLatch(Latch *latch)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
void logicalrep_worker_attach(int slot)
LogicalRepWorker * MyLogicalRepWorker
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
List * lappend_oid(List *list, Oid datum)
bool list_member_oid(const List *list, Oid datum)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
#define LOGICALREP_COLUMN_UNCHANGED
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_STREAM_COMMIT
#define LOGICALREP_PROTO_VERSION_NUM
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
char * get_rel_name(Oid relid)
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
void MemoryContextReset(MemoryContext context)
MemoryContext TopTransactionContext
void pfree(void *pointer)
MemoryContext TopMemoryContext
void * palloc0(Size size)
void * repalloc(void *pointer, Size size)
char * MemoryContextStrdup(MemoryContext context, const char *string)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define MemoryContextResetAndDeleteChildren(ctx)
#define RESUME_INTERRUPTS()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
char * GetUserNameFromId(Oid roleid, bool noerr)
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
void replorigin_session_setup(RepOriginId node)
RepOriginId replorigin_session_origin
XLogRecPtr replorigin_session_get_progress(bool flush)
XLogRecPtr replorigin_session_origin_lsn
#define InvalidRepOriginId
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
FormData_pg_attribute * Form_pg_attribute
static PgChecksumMode mode
static int server_version
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
static void * list_nth(const List *list, int n)
void FreeSubscription(Subscription *sub)
void DisableSubscription(Oid subid)
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
FormData_pg_subscription * Form_pg_subscription
long pgstat_report_stat(bool force)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
Expr * expression_planner(Expr *expr)
#define ObjectIdGetDatum(X)
void BackgroundWorkerUnblockSignals(void)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
static int fd(const char *x, int i)
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
char * logicalrep_message_type(LogicalRepMsgType action)
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
static color newsub(struct colormap *cm, color co)
#define RelationGetRelid(relation)
#define RelationIsLogicallyLogged(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RELATION_IS_OTHER_TEMP(relation)
Oid RelationGetPrimaryKeyIndex(Relation relation)
Oid RelationGetReplicaIndex(Relation relation)
void fill_extraUpdatedCols(RangeTblEntry *target_rte, Relation target_relation)
Node * build_column_default(Relation rel, int attrno)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Snapshot GetTransactionSnapshot(void)
void PopActiveSnapshot(void)
void PushActiveSnapshot(Snapshot snap)
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
StringInfo makeStringInfo(void)
void resetStringInfo(StringInfo str)
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
void initStringInfo(StringInfo str)
LogicalRepMsgType command
LogicalRepRelMapEntry * rel
ResultRelInfo * targetRelInfo
PartitionTupleRouting * proute
ModifyTableState * mtstate
LogicalRepRelMapEntry * targetRel
TransactionId subxact_last
List * es_opened_result_relations
struct ErrorContextCallback * previous
void(* callback)(void *arg)
LogicalRepRelation remoterel
XLogRecPtr prepare_end_lsn
XLogRecPtr rollback_end_lsn
TimestampTz rollback_time
StringInfoData * colvalues
TimestampTz last_recv_time
TimestampTz last_send_time
ResultRelInfo * resultRelInfo
TupleTableSlot * ri_PartitionTupleSlot
TupleConversionMap * ri_RootToPartitionMap
TupleDesc tts_tupleDescriptor
#define FirstLowInvalidHeapAttributeNumber
#define SearchSysCacheCopy1(cacheId, key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
bool AllTablesyncsReady(void)
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname, int szorgname)
void process_syncing_tables(XLogRecPtr current_lsn)
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
void UpdateTwoPhaseState(Oid suboid, char new_state)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
void AfterTriggerEndQuery(EState *estate)
void AfterTriggerBeginQuery(void)
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
#define TupleDescAttr(tupdesc, i)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
static void slot_getallattrs(TupleTableSlot *slot)
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
void FinishPreparedTransaction(const char *gid, bool isCommit)
#define TimestampTzPlusMilliseconds(tz, ms)
@ WAIT_EVENT_LOGICAL_APPLY_MAIN
static StringInfoData reply_message
int wal_receiver_status_interval
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, logical, appname, err)
#define walrcv_send(conn, buffer, nbytes)
#define walrcv_server_version(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_receive(conn, buffer, wait_fd)
static bool am_tablesync_worker(void)
bool PrepareTransactionBlock(const char *gid)
bool IsTransactionState(void)
void CommandCounterIncrement(void)
void StartTransactionCommand(void)
void SetCurrentStatementStartTimestamp(void)
void BeginTransactionBlock(void)
void CommitTransactionCommand(void)
void AbortOutOfAnyTransaction(void)
CommandId GetCurrentCommandId(bool used)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
XLogRecPtr XactLastCommitEnd
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr