PostgreSQL Source Code  git master
logicalworker.h File Reference
This graph shows which files directly or indirectly include this file (the graph image is not reproduced in this text rendering):

Go to the source code of this file.

Functions

void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Function Documentation

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 1984 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_tablesync_worker(), BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), Subscription::binary, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die, elog, Subscription::enabled, ereport, errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), LogicalRepApplyLoop(), LogicalRepSyncTableStart(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, pfree(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, pstrdup(), Subscription::publications, LogicalRepWorker::relid, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, SetConfigOption(), SIGHUP, SignalHandlerForConfigReload(), Subscription::slotname, WalRcvStreamOptions::slotname, snprintf, WalRcvStreamOptions::startpoint, StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.

1985 {
1986  int worker_slot = DatumGetInt32(main_arg);
1987  MemoryContext oldctx;
1988  char originname[NAMEDATALEN];
1989  XLogRecPtr origin_startpos;
1990  char *myslotname;
1992 
1993  /* Attach to slot */
1994  logicalrep_worker_attach(worker_slot);
1995 
1996  /* Setup signal handling */
1998  pqsignal(SIGTERM, die);
2000 
2001  /*
2002  * We don't currently need any ResourceOwner in a walreceiver process, but
2003  * if we did, we could call CreateAuxProcessResourceOwner here.
2004  */
2005 
2006  /* Initialise stats to a sanish value */
2009 
2010  /* Load the libpq-specific functions */
2011  load_file("libpqwalreceiver", false);
2012 
2013  /* Run as replica session replication role. */
2014  SetConfigOption("session_replication_role", "replica",
2016 
2017  /* Connect to our database. */
2020  0);
2021 
2022  /* Load the subscription into persistent memory context. */
2024  "ApplyContext",
2028 
2030  if (!MySubscription)
2031  {
2032  ereport(LOG,
2033  (errmsg("logical replication apply worker for subscription %u will not "
2034  "start because the subscription was removed during startup",
2036  proc_exit(0);
2037  }
2038 
2039  MySubscriptionValid = true;
2040  MemoryContextSwitchTo(oldctx);
2041 
2042  if (!MySubscription->enabled)
2043  {
2044  ereport(LOG,
2045  (errmsg("logical replication apply worker for subscription \"%s\" will not "
2046  "start because the subscription was disabled during startup",
2047  MySubscription->name)));
2048 
2049  proc_exit(0);
2050  }
2051 
2052  /* Setup synchronous commit according to the user's wishes */
2053  SetConfigOption("synchronous_commit", MySubscription->synccommit,
2055 
2056  /* Keep us informed about subscription changes. */
2059  (Datum) 0);
2060 
2061  if (am_tablesync_worker())
2062  ereport(LOG,
2063  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
2065  else
2066  ereport(LOG,
2067  (errmsg("logical replication apply worker for subscription \"%s\" has started",
2068  MySubscription->name)));
2069 
2071 
2072  /* Connect to the origin and start the replication. */
2073  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
2075 
2076  if (am_tablesync_worker())
2077  {
2078  char *syncslotname;
2079 
2080  /* This is table synchronization worker, call initial sync. */
2081  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
2082 
2083  /* The slot name needs to be allocated in permanent memory context. */
2085  myslotname = pstrdup(syncslotname);
2086  MemoryContextSwitchTo(oldctx);
2087 
2088  pfree(syncslotname);
2089  }
2090  else
2091  {
2092  /* This is main apply worker */
2093  RepOriginId originid;
2094  TimeLineID startpointTLI;
2095  char *err;
2096 
2097  myslotname = MySubscription->slotname;
2098 
2099  /*
2100  * This shouldn't happen if the subscription is enabled, but guard
2101  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
2102  * crash if slot is NULL.)
2103  */
2104  if (!myslotname)
2105  ereport(ERROR,
2106  (errmsg("subscription has no replication slot set")));
2107 
2108  /* Setup replication origin tracking. */
2110  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
2111  originid = replorigin_by_name(originname, true);
2112  if (!OidIsValid(originid))
2113  originid = replorigin_create(originname);
2114  replorigin_session_setup(originid);
2115  replorigin_session_origin = originid;
2116  origin_startpos = replorigin_session_get_progress(false);
2118 
2120  &err);
2121  if (wrconn == NULL)
2122  ereport(ERROR,
2123  (errmsg("could not connect to the publisher: %s", err)));
2124 
2125  /*
2126  * We don't really use the output identify_system for anything but it
2127  * does some initializations on the upstream so let's still call it.
2128  */
2129  (void) walrcv_identify_system(wrconn, &startpointTLI);
2130 
2131  }
2132 
2133  /*
2134  * Setup callback for syscache so that we know when something changes in
2135  * the subscription relation state.
2136  */
2139  (Datum) 0);
2140 
2141  /* Build logical replication streaming options. */
2142  options.logical = true;
2143  options.startpoint = origin_startpos;
2144  options.slotname = myslotname;
2145  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
2146  options.proto.logical.publication_names = MySubscription->publications;
2147  options.proto.logical.binary = MySubscription->binary;
2148 
2149  /* Start normal logical streaming replication. */
2150  walrcv_startstreaming(wrconn, &options);
2151 
2152  /* Run the main loop. */
2153  LogicalRepApplyLoop(origin_startpos);
2154 
2155  proc_exit(0);
2156 }
Subscription * MySubscription
Definition: worker.c:107
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
WalReceiverConn * wrconn
Definition: worker.c:105
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
uint32 TimeLineID
Definition: xlogdefs.h:52
#define DatumGetInt32(X)
Definition: postgres.h:472
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:405
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void CommitTransactionCommand(void)
Definition: xact.c:2947
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:58
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:411
union WalRcvStreamOptions::@104 proto
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1186
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1051
#define LOG
Definition: elog.h:26
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:816
#define OidIsValid(objectId)
Definition: c.h:651
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:209
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1977
void pfree(void *pointer)
Definition: mcxt.c:1056
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:43
Definition: guc.h:75
static bool am_tablesync_worker(void)
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
void logicalrep_worker_attach(int slot)
Definition: launcher.c:625
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7714
#define SIGHUP
Definition: win32_port.h:153
XLogRecPtr startpoint
Definition: walreceiver.h:168
List * publications
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
MemoryContext TopMemoryContext
Definition: mcxt.c:44
MemoryContext ApplyContext
Definition: worker.c:103
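static char ** options (note: this cross-reference appears mis-resolved — the listing above accesses options.logical, options.startpoint, options.slotname and options.proto, i.e. a WalRcvStreamOptions structure rather than a char ** array)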
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
uintptr_t Datum
Definition: postgres.h:367
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5739
#define ereport(elevel,...)
Definition: elog.h:144
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
TimestampTz last_recv_time
uint64 XLogRecPtr
Definition: xlogdefs.h:21
RepOriginId replorigin_session_origin
Definition: origin.c:154
void StartTransactionCommand(void)
Definition: xact.c:2846
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
bool MySubscriptionValid
Definition: worker.c:108
int errmsg(const char *fmt,...)
Definition: elog.c:824
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1561
#define elog(elevel,...)
Definition: elog.h:214
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1840
#define snprintf
Definition: port.h:193
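#define die(msg)
Definition: pg_test_fsync.c:96 (note: this cross-reference appears mis-resolved — at line 1998 the listing passes die, without arguments, to pqsignal() as the SIGTERM handler, which a function-like macro taking msg cannot be)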
TimestampTz reply_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:256
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5768
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:397

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 2162 of file worker.c.

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().

2163 {
2164  return MyLogicalRepWorker != NULL;
2165 }
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57