PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
logicalworker.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Function Documentation

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 1489 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), am_tablesync_worker(), Assert, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, CurrentResourceOwner, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die(), elog, Subscription::enabled, ereport, errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), logicalrep_worker_sighup(), LogicalRepApplyLoop(), LogicalRepSyncTableStart(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, pfree(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, pstrdup(), Subscription::publications, LogicalRepWorker::relid, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, ResourceOwnerCreate(), server_version, SetConfigOption(), SIGHUP, Subscription::slotname, WalRcvStreamOptions::slotname, snprintf(), WalRcvStreamOptions::startpoint, StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.

1490 {
1491  int worker_slot = DatumGetInt32(main_arg);
1492  MemoryContext oldctx;
1493  char originname[NAMEDATALEN];
1494  XLogRecPtr origin_startpos;
1495  char *myslotname;
1497 
1498  /* Attach to slot */
1499  logicalrep_worker_attach(worker_slot);
1500 
1501  /* Setup signal handling */
1503  pqsignal(SIGTERM, die);
1505 
1506  /* Initialise stats to a sanish value */
1509 
1510  /* Load the libpq-specific functions */
1511  load_file("libpqwalreceiver", false);
1512 
1513  Assert(CurrentResourceOwner == NULL);
1515  "logical replication apply");
1516 
1517  /* Run as replica session replication role. */
1518  SetConfigOption("session_replication_role", "replica",
1520 
1521  /* Connect to our database. */
1524 
1525  /* Load the subscription into persistent memory context. */
1527  "ApplyContext",
1532  MySubscriptionValid = true;
1533  MemoryContextSwitchTo(oldctx);
1534 
1535  /* Setup synchronous commit according to the user's wishes */
1536  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1538 
1539  if (!MySubscription->enabled)
1540  {
1541  ereport(LOG,
1542  (errmsg("logical replication apply worker for subscription \"%s\" will not "
1543  "start because the subscription was disabled during startup",
1544  MySubscription->name)));
1545 
1546  proc_exit(0);
1547  }
1548 
1549  /* Keep us informed about subscription changes. */
1552  (Datum) 0);
1553 
1554  if (am_tablesync_worker())
1555  ereport(LOG,
1556  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1558  else
1559  ereport(LOG,
1560  (errmsg("logical replication apply worker for subscription \"%s\" has started",
1561  MySubscription->name)));
1562 
1564 
1565  /* Connect to the origin and start the replication. */
1566  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1568 
1569  if (am_tablesync_worker())
1570  {
1571  char *syncslotname;
1572 
1573  /* This is a table synchronization worker, call initial sync. */
1574  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1575 
1576  /* The slot name needs to be allocated in permanent memory context. */
1578  myslotname = pstrdup(syncslotname);
1579  MemoryContextSwitchTo(oldctx);
1580 
1581  pfree(syncslotname);
1582  }
1583  else
1584  {
1585  /* This is the main apply worker */
1586  RepOriginId originid;
1587  TimeLineID startpointTLI;
1588  char *err;
1589  int server_version;
1590 
1591  myslotname = MySubscription->slotname;
1592 
1593  /*
1594  * This shouldn't happen if the subscription is enabled, but guard
1595  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1596  * crash if slot is NULL.)
1597  */
1598  if (!myslotname)
1599  ereport(ERROR,
1600  (errmsg("subscription has no replication slot set")));
1601 
1602  /* Setup replication origin tracking. */
1604  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1605  originid = replorigin_by_name(originname, true);
1606  if (!OidIsValid(originid))
1607  originid = replorigin_create(originname);
1608  replorigin_session_setup(originid);
1609  replorigin_session_origin = originid;
1610  origin_startpos = replorigin_session_get_progress(false);
1612 
1614  &err);
1615  if (wrconn == NULL)
1616  ereport(ERROR,
1617  (errmsg("could not connect to the publisher: %s", err)));
1618 
1619  /*
1620  * We don't really use the output of identify_system for anything, but
1621  * it does some initializations on the upstream so let's still call it.
1622  */
1623  (void) walrcv_identify_system(wrconn, &startpointTLI,
1624  &server_version);
1625 
1626  }
1627 
1628  /*
1629  * Setup callback for syscache so that we know when something changes in
1630  * the subscription relation state.
1631  */
1634  (Datum) 0);
1635 
1636  /* Build logical replication streaming options. */
1637  options.logical = true;
1638  options.startpoint = origin_startpos;
1639  options.slotname = myslotname;
1640  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1641  options.proto.logical.publication_names = MySubscription->publications;
1642 
1643  /* Start normal logical streaming replication. */
1644  walrcv_startstreaming(wrconn, &options);
1645 
1646  /* Run the main loop. */
1647  LogicalRepApplyLoop(origin_startpos);
1648 
1649  proc_exit(0);
1650 }
Subscription * MySubscription
Definition: worker.c:112
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
WalReceiverConn * wrconn
Definition: worker.c:110
#define DEBUG1
Definition: elog.h:25
uint32 TimeLineID
Definition: xlogdefs.h:45
#define DatumGetInt32(X)
Definition: postgres.h:478
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
char * pstrdup(const char *in)
Definition: mcxt.c:1076
void CommitTransactionCommand(void)
Definition: xact.c:2744
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:51
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:253
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1148
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1013
#define LOG
Definition: elog.h:26
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:815
#define OidIsValid(objectId)
Definition: c.h:532
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1468
void pfree(void *pointer)
Definition: mcxt.c:949
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
#define ERROR
Definition: elog.h:43
Definition: guc.h:75
static bool am_tablesync_worker(void)
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:249
void logicalrep_worker_attach(int slot)
Definition: launcher.c:612
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6681
XLogRecPtr startpoint
Definition: walreceiver.h:146
List * publications
RepOriginId replorigin_create(char *roname)
Definition: origin.c:242
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
Definition: postmaster.c:5563
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext TopMemoryContext
Definition: mcxt.c:43
MemoryContext ApplyContext
Definition: worker.c:108
static char ** options
union WalRcvStreamOptions::@97 proto
static void logicalrep_worker_sighup(SIGNAL_ARGS)
Definition: worker.c:1475
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1389
uintptr_t Datum
Definition: postgres.h:372
#define SIGHUP
Definition: win32.h:188
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
TimestampTz last_recv_time
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:681
RepOriginId replorigin_session_origin
Definition: origin.c:155
void StartTransactionCommand(void)
Definition: xact.c:2673
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static int server_version
Definition: pg_dumpall.c:82
bool MySubscriptionValid
Definition: worker.c:113
int errmsg(const char *fmt,...)
Definition: elog.c:797
void die(SIGNAL_ARGS)
Definition: postgres.c:2652
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1010
#define elog
Definition: elog.h:219
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1726
TimestampTz reply_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:271
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5592
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:243
bool IsLogicalWorker ( void  )

Definition at line 1656 of file worker.c.

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().

1657 {
1658  return MyLogicalRepWorker != NULL;
1659 }
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63