PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
logicalworker.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

void ApplyWorkerMain (Datum main_arg)
 

Function Documentation

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 1308 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), ApplyLoop(), Assert, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), BackgroundWorker::bgw_name, CacheMemoryContext, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, CreateCacheMemoryContext(), CurrentResourceOwner, DatumGetObjectId, LogicalRepWorker::dbid, DEBUG1, elog, Subscription::enabled, ereport, errmsg(), ERROR, GetCurrentTimestamp(), GetSubscription(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), logicalrep_worker_sigterm(), MemoryContextSwitchTo(), MyBgworkerEntry, MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, NULL, Subscription::oid, OidIsValid, options, PGC_S_OVERRIDE, PGC_S_SESSION, PGC_SUSET, PGC_USERSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, Subscription::publications, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, ResourceOwnerCreate(), server_version, SetConfigOption(), Subscription::slotname, WalRcvStreamOptions::slotname, snprintf(), WalRcvStreamOptions::startpoint, StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, LogicalRepWorker::userid, walrcv_connect, walrcv_disconnect, walrcv_identify_system, and walrcv_startstreaming.

Referenced by logicalrep_worker_launch().

1309 {
1310  int worker_slot = DatumGetObjectId(main_arg);
1311  MemoryContext oldctx;
1312  char originname[NAMEDATALEN];
1313  RepOriginId originid;
1314  XLogRecPtr origin_startpos;
1315  char *err;
1316  int server_version;
1317  TimeLineID startpointTLI;
1319 
1320  /* Attach to slot */
1321  logicalrep_worker_attach(worker_slot);
1322 
1323  /* Setup signal handling */
1326 
1327  /* Initialise stats to a sanish value */
1330 
1331  /* Make it easy to identify our processes. */
1332  SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
1334 
1335  /* Load the libpq-specific functions */
1336  load_file("libpqwalreceiver", false);
1337 
1340  "logical replication apply");
1341 
1342  /* Run as replica session replication role. */
1343  SetConfigOption("session_replication_role", "replica",
1345 
1346  /* Connect to our database. */
1349 
1350  /* Load the subscription into persistent memory context. */
1353  "ApplyCacheContext",
1358  MySubscriptionValid = true;
1359  MemoryContextSwitchTo(oldctx);
1360 
1361  if (!MySubscription->enabled)
1362  {
1363  ereport(LOG,
1364  (errmsg("logical replication worker for subscription \"%s\" will not "
1365  "start because the subscription was disabled during startup",
1366  MySubscription->name)));
1367 
1368  proc_exit(0);
1369  }
1370 
1371  /* Keep us informed about subscription changes. */
1374  (Datum) 0);
1375 
1376  ereport(LOG,
1377  (errmsg("logical replication apply for subscription \"%s\" has started",
1378  MySubscription->name)));
1379 
1380  /* Setup replication origin tracking. */
1381  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1382  originid = replorigin_by_name(originname, true);
1383  if (!OidIsValid(originid))
1384  originid = replorigin_create(originname);
1385  replorigin_session_setup(originid);
1386  replorigin_session_origin = originid;
1387  origin_startpos = replorigin_session_get_progress(false);
1388 
1390 
1391  /* Connect to the origin and start the replication. */
1392  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1395  MySubscription->name, &err);
1396  if (wrconn == NULL)
1397  ereport(ERROR,
1398  (errmsg("could not connect to the publisher: %s", err)));
1399 
1400  /*
1401  * We don't really use the output of identify_system for anything
1402  * but it does some initializations on the upstream so let's still
1403  * call it.
1404  */
1405  (void) walrcv_identify_system(wrconn, &startpointTLI, &server_version);
1406 
1407  /* Build logical replication streaming options. */
1408  options.logical = true;
1409  options.startpoint = origin_startpos;
1410  options.slotname = MySubscription->slotname;
1411  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1412  options.proto.logical.publication_names = MySubscription->publications;
1413 
1414  /* Start streaming from the slot. */
1415  walrcv_startstreaming(wrconn, &options);
1416 
1417  /* Run the main loop. */
1418  ApplyLoop();
1419 
1421 
1422  /* We should only get here if we received SIGTERM */
1423  proc_exit(0);
1424 }
Subscription * MySubscription
Definition: worker.c:108
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
WalReceiverConn * wrconn
Definition: worker.c:106
#define DEBUG1
Definition: elog.h:25
static MemoryContext ApplyCacheContext
Definition: worker.c:104
uint32 TimeLineID
Definition: xlogdefs.h:45
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
#define DatumGetObjectId(X)
Definition: postgres.h:508
void CommitTransactionCommand(void)
Definition: xact.c:2745
union WalRcvStreamOptions::@53 proto
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:51
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:219
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1107
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:189
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:980
#define LOG
Definition: elog.h:26
#define OidIsValid(objectId)
Definition: c.h:534
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1300
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:59
#define ERROR
Definition: elog.h:43
Definition: guc.h:75
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:145
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:215
void logicalrep_worker_attach(int slot)
Definition: launcher.c:407
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6629
XLogRecPtr startpoint
Definition: walreceiver.h:144
List * publications
RepOriginId replorigin_create(char *roname)
Definition: origin.c:235
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
Definition: postmaster.c:5468
#define ereport(elevel, rest)
Definition: elog.h:122
static char ** options
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1381
uintptr_t Datum
Definition: postgres.h:374
void logicalrep_worker_sigterm(SIGNAL_ARGS)
Definition: launcher.c:457
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
TimestampTz last_recv_time
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:671
RepOriginId replorigin_session_origin
Definition: origin.c:150
void StartTransactionCommand(void)
Definition: xact.c:2675
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:133
static int server_version
Definition: pg_dumpall.c:77
void CreateCacheMemoryContext(void)
Definition: catcache.c:525
#define walrcv_disconnect(conn)
Definition: walreceiver.h:231
bool MySubscriptionValid
Definition: worker.c:109
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define elog
Definition: elog.h:219
static void ApplyLoop(void)
Definition: worker.c:915
TimestampTz reply_time
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
MemoryContext CacheMemoryContext
Definition: mcxt.c:46
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5497
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:209