PostgreSQL Source Code  git master
logicalworker.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Function Documentation

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 3329 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AllTablesyncsReady(), am_tablesync_worker(), BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), Subscription::binary, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die, elog, Subscription::enabled, ereport, errcode(), errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, logicalrep_worker_attach(), LogicalRepApplyLoop(), LogicalRepSyncTableStart(), MemoryContextStrdup(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, pfree(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, Subscription::publications, LogicalRepWorker::relid, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, server_version, SetConfigOption(), SIGHUP, SignalHandlerForConfigReload(), Subscription::slotname, WalRcvStreamOptions::slotname, snprintf, WalRcvStreamOptions::startpoint, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, Subscription::twophasestate, UpdateTwoPhaseState(), LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, walrcv_server_version, and walrcv_startstreaming.

3330 {
3331  int worker_slot = DatumGetInt32(main_arg);
3332  MemoryContext oldctx;
3333  char originname[NAMEDATALEN];
3334  XLogRecPtr origin_startpos;
3335  char *myslotname;
3337  int server_version;
3338 
3339  /* Attach to slot */
3340  logicalrep_worker_attach(worker_slot);
3341 
3342  /* Setup signal handling */
3344  pqsignal(SIGTERM, die);
3346 
3347  /*
3348  * We don't currently need any ResourceOwner in a walreceiver process, but
3349  * if we did, we could call CreateAuxProcessResourceOwner here.
3350  */
3351 
3352  /* Initialise stats to a sanish value */
3355 
3356  /* Load the libpq-specific functions */
3357  load_file("libpqwalreceiver", false);
3358 
3359  /* Run as replica session replication role. */
3360  SetConfigOption("session_replication_role", "replica",
3362 
3363  /* Connect to our database. */
3366  0);
3367 
3368  /*
3369  * Set always-secure search path, so malicious users can't redirect user
3370  * code (e.g. pg_index.indexprs).
3371  */
3372  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
3373 
3374  /* Load the subscription into persistent memory context. */
3376  "ApplyContext",
3380 
3382  if (!MySubscription)
3383  {
3384  ereport(LOG,
3385  (errmsg("logical replication apply worker for subscription %u will not "
3386  "start because the subscription was removed during startup",
3388  proc_exit(0);
3389  }
3390 
3391  MySubscriptionValid = true;
3392  MemoryContextSwitchTo(oldctx);
3393 
3394  if (!MySubscription->enabled)
3395  {
3396  ereport(LOG,
3397  (errmsg("logical replication apply worker for subscription \"%s\" will not "
3398  "start because the subscription was disabled during startup",
3399  MySubscription->name)));
3400 
3401  proc_exit(0);
3402  }
3403 
3404  /* Setup synchronous commit according to the user's wishes */
3405  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3407 
3408  /* Keep us informed about subscription changes. */
3411  (Datum) 0);
3412 
3413  if (am_tablesync_worker())
3414  ereport(LOG,
3415  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3417  else
3418  ereport(LOG,
3419  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3420  MySubscription->name)));
3421 
3423 
3424  /* Connect to the origin and start the replication. */
3425  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3427 
3428  if (am_tablesync_worker())
3429  {
3430  char *syncslotname;
3431 
3432  /* This is table synchronization worker, call initial sync. */
3433  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
3434 
3435  /* allocate slot name in long-lived context */
3436  myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3437 
3438  pfree(syncslotname);
3439  }
3440  else
3441  {
3442  /* This is main apply worker */
3443  RepOriginId originid;
3444  TimeLineID startpointTLI;
3445  char *err;
3446 
3447  myslotname = MySubscription->slotname;
3448 
3449  /*
3450  * This shouldn't happen if the subscription is enabled, but guard
3451  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3452  * crash if slot is NULL.)
3453  */
3454  if (!myslotname)
3455  ereport(ERROR,
3456  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3457  errmsg("subscription has no replication slot set")));
3458 
3459  /* Setup replication origin tracking. */
3461  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3462  originid = replorigin_by_name(originname, true);
3463  if (!OidIsValid(originid))
3464  originid = replorigin_create(originname);
3465  replorigin_session_setup(originid);
3466  replorigin_session_origin = originid;
3467  origin_startpos = replorigin_session_get_progress(false);
3469 
3471  MySubscription->name, &err);
3472  if (LogRepWorkerWalRcvConn == NULL)
3473  ereport(ERROR,
3474  (errcode(ERRCODE_CONNECTION_FAILURE),
3475  errmsg("could not connect to the publisher: %s", err)));
3476 
3477  /*
3478  * We don't really use the output of identify_system for anything, but it
3479  * does some initialization on the upstream, so let's still call it.
3480  */
3481  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
3482  }
3483 
3484  /*
3485  * Setup callback for syscache so that we know when something changes in
3486  * the subscription relation state.
3487  */
3490  (Datum) 0);
3491 
3492  /* Build logical replication streaming options. */
3493  options.logical = true;
3494  options.startpoint = origin_startpos;
3495  options.slotname = myslotname;
3496 
3498  options.proto.logical.proto_version =
3499  server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
3500  server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
3502 
3503  options.proto.logical.publication_names = MySubscription->publications;
3504  options.proto.logical.binary = MySubscription->binary;
3505  options.proto.logical.streaming = MySubscription->stream;
3506  options.proto.logical.twophase = false;
3507 
3508  if (!am_tablesync_worker())
3509  {
3510  /*
3511  * Even when the two_phase mode is requested by the user, it remains
3512  * as the tri-state PENDING until all tablesyncs have reached READY
3513  * state. Only then can it become ENABLED.
3514  *
3515  * Note: If the subscription has no tables then leave the state as
3516  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
3517  * work.
3518  */
3521  {
3522  /* Start streaming with two_phase enabled */
3523  options.proto.logical.twophase = true;
3525 
3530  }
3531  else
3532  {
3534  }
3535 
3536  ereport(DEBUG1,
3537  (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.",
3542  "?")));
3543  }
3544  else
3545  {
3546  /* Start normal logical streaming replication. */
3548  }
3549 
3550  /* Run the main loop. */
3551  LogicalRepApplyLoop(origin_startpos);
3552 
3553  proc_exit(0);
3554 }
Subscription * MySubscription
Definition: worker.c:247
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:36
#define AllocSetContextCreate
Definition: memutils.h:173
#define DEBUG1
Definition: elog.h:25
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:245
uint32 TimeLineID
Definition: xlogdefs.h:59
#define DatumGetInt32(X)
Definition: postgres.h:516
#define LOGICALREP_TWOPHASE_STATE_DISABLED
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:412
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
void CommitTransactionCommand(void)
Definition: xact.c:2953
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
#define walrcv_server_version(conn)
Definition: walreceiver.h:414
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:65
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:418
union WalRcvStreamOptions::@104 proto
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1206
int errcode(int sqlerrcode)
Definition: elog.c:698
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:240
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1071
#define LOG
Definition: elog.h:26
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:920
#define OidIsValid(objectId)
Definition: c.h:710
#define LOGICALREP_TWOPHASE_STATE_ENABLED
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:2951
void pfree(void *pointer)
Definition: mcxt.c:1169
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:46
Definition: guc.h:75
static bool am_tablesync_worker(void)
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
void logicalrep_worker_attach(int slot)
Definition: launcher.c:565
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:8110
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define SIGHUP
Definition: win32_port.h:167
XLogRecPtr startpoint
Definition: walreceiver.h:170
MemoryContext TopMemoryContext
Definition: mcxt.c:48
MemoryContext ApplyContext
Definition: worker.c:240
static char ** options
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1498
uintptr_t Datum
Definition: postgres.h:411
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1255
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5752
#define ereport(elevel,...)
Definition: elog.h:157
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:37
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:38
TimestampTz last_recv_time
bool AllTablesyncsReady(void)
Definition: tablesync.c:1230
uint64 XLogRecPtr
Definition: xlogdefs.h:21
RepOriginId replorigin_session_origin
Definition: origin.c:154
void StartTransactionCommand(void)
Definition: xact.c:2852
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
static int server_version
Definition: pg_dumpall.c:83
bool MySubscriptionValid
Definition: worker.c:248
int errmsg(const char *fmt,...)
Definition: elog.c:909
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1286
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:2513
#define elog(elevel,...)
Definition: elog.h:232
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1899
#define snprintf
Definition: port.h:217
#define die(msg)
Definition: pg_test_fsync.c:97
TimestampTz reply_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:268
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5781
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 3560 of file worker.c.

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().

3561 {
3562  return MyLogicalRepWorker != NULL;
3563 }
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57