Definition at line 1582 of file worker.c.
References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_tablesync_worker(), BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die, elog, Subscription::enabled, ereport, errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), logicalrep_worker_sighup(), LogicalRepApplyLoop(), LogicalRepSyncTableStart(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, pfree(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, pstrdup(), Subscription::publications, LogicalRepWorker::relid, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, SetConfigOption(), SIGHUP, Subscription::slotname, WalRcvStreamOptions::slotname, snprintf, WalRcvStreamOptions::startpoint, StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.
1631 (
errmsg(
"logical replication apply worker for subscription %u will not "
1632 "start because the subscription was removed during startup",
1643 (
errmsg(
"logical replication apply worker for subscription \"%s\" will not "
1644 "start because the subscription was disabled during startup",
1661 (
errmsg(
"logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1665 (
errmsg(
"logical replication apply worker for subscription \"%s\" has started",
1671 elog(
DEBUG1,
"connecting to publisher using connection string \"%s\"",
1683 myslotname =
pstrdup(syncslotname);
1686 pfree(syncslotname);
1704 (
errmsg(
"subscription has no replication slot set")));
1721 (
errmsg(
"could not connect to the publisher: %s", err)));
Subscription * MySubscription
#define LOGICALREP_PROTO_VERSION_NUM
#define AllocSetContextCreate
TimestampTz GetCurrentTimestamp(void)
#define walrcv_identify_system(conn, primary_tli)
char * pstrdup(const char *in)
void CommitTransactionCommand(void)
union WalRcvStreamOptions::@106 proto
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
TimestampTz last_send_time
#define walrcv_startstreaming(conn, options)
XLogRecPtr replorigin_session_get_progress(bool flush)
void replorigin_session_setup(RepOriginId node)
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
#define OidIsValid(objectId)
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
void pfree(void *pointer)
LogicalRepWorker * MyLogicalRepWorker
static bool am_tablesync_worker(void)
#define ALLOCSET_DEFAULT_SIZES
void logicalrep_worker_attach(int slot)
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
RepOriginId replorigin_create(char *roname)
#define ereport(elevel, rest)
MemoryContext TopMemoryContext
MemoryContext ApplyContext
static void logicalrep_worker_sighup(SIGNAL_ARGS)
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
pqsigfunc pqsignal(int signum, pqsigfunc handler)
TimestampTz last_recv_time
RepOriginId replorigin_session_origin
void StartTransactionCommand(void)
void load_file(const char *filename, bool restricted)
int errmsg(const char *fmt,...)
static void LogicalRepApplyLoop(XLogRecPtr last_received)
char * get_rel_name(Oid relid)
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
void BackgroundWorkerUnblockSignals(void)
#define walrcv_connect(conninfo, logical, appname, err)