PostgreSQL Source Code  git master
logicalworker.h File Reference
The include-dependency graph for this file (an image on the original page, not reproduced here) shows which files directly or indirectly include it.

Go to the source code of this file.

Functions

void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Function Documentation

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

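Definition at line 3555 of file worker.c. The listing below is abridged: gaps in the line numbering (for example 3562 and 3569) mark source lines that are not reproduced here.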

3556 {
3557  int worker_slot = DatumGetInt32(main_arg);
3558  MemoryContext oldctx;
3559  char originname[NAMEDATALEN];
3560  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
3561  char *myslotname = NULL;
3563  int server_version;
3564 
3565  /* Attach to slot */
3566  logicalrep_worker_attach(worker_slot);
3567 
3568  /* Setup signal handling */
3570  pqsignal(SIGTERM, die);
3572 
3573  /*
3574  * We don't currently need any ResourceOwner in a walreceiver process, but
3575  * if we did, we could call CreateAuxProcessResourceOwner here.
3576  */
3577 
3578  /* Initialise stats to a sanish value */
3581 
3582  /* Load the libpq-specific functions */
3583  load_file("libpqwalreceiver", false);
3584 
3585  /* Run as replica session replication role. */
3586  SetConfigOption("session_replication_role", "replica",
3588 
3589  /* Connect to our database. */
3592  0);
3593 
3594  /*
3595  * Set always-secure search path, so malicious users can't redirect user
3596  * code (e.g. pg_index.indexprs).
3597  */
3598  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
3599 
3600  /* Load the subscription into persistent memory context. */
3602  "ApplyContext",
3606 
3608  if (!MySubscription)
3609  {
3610  ereport(LOG,
3611  (errmsg("logical replication apply worker for subscription %u will not "
3612  "start because the subscription was removed during startup",
3614  proc_exit(0);
3615  }
3616 
3617  MySubscriptionValid = true;
3618  MemoryContextSwitchTo(oldctx);
3619 
3620  if (!MySubscription->enabled)
3621  {
3622  ereport(LOG,
3623  (errmsg("logical replication apply worker for subscription \"%s\" will not "
3624  "start because the subscription was disabled during startup",
3625  MySubscription->name)));
3626 
3627  proc_exit(0);
3628  }
3629 
3630  /* Setup synchronous commit according to the user's wishes */
3631  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3633 
3634  /* Keep us informed about subscription changes. */
3637  (Datum) 0);
3638 
3639  if (am_tablesync_worker())
3640  ereport(LOG,
3641  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3643  else
3644  ereport(LOG,
3645  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3646  MySubscription->name)));
3647 
3649 
3650  /* Connect to the origin and start the replication. */
3651  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3653 
3654  if (am_tablesync_worker())
3655  {
3656  start_table_sync(&origin_startpos, &myslotname);
3657 
3658  /*
3659  * Allocate the origin name in long-lived context for error context
3660  * message.
3661  */
3664  originname,
3665  sizeof(originname));
3667  originname);
3668  }
3669  else
3670  {
3671  /* This is main apply worker */
3672  RepOriginId originid;
3673  TimeLineID startpointTLI;
3674  char *err;
3675 
3676  myslotname = MySubscription->slotname;
3677 
3678  /*
3679  * This shouldn't happen if the subscription is enabled, but guard
3680  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3681  * crash if slot is NULL.)
3682  */
3683  if (!myslotname)
3684  ereport(ERROR,
3685  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3686  errmsg("subscription has no replication slot set")));
3687 
3688  /* Setup replication origin tracking. */
3690  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3691  originid = replorigin_by_name(originname, true);
3692  if (!OidIsValid(originid))
3693  originid = replorigin_create(originname);
3694  replorigin_session_setup(originid);
3695  replorigin_session_origin = originid;
3696  origin_startpos = replorigin_session_get_progress(false);
3698 
3700  MySubscription->name, &err);
3701  if (LogRepWorkerWalRcvConn == NULL)
3702  ereport(ERROR,
3703  (errcode(ERRCODE_CONNECTION_FAILURE),
3704  errmsg("could not connect to the publisher: %s", err)));
3705 
3706  /*
3707  * We don't really use the output identify_system for anything but it
3708  * does some initializations on the upstream so let's still call it.
3709  */
3710  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
3711 
3712  /*
3713  * Allocate the origin name in long-lived context for error context
3714  * message.
3715  */
3717  originname);
3718  }
3719 
3720  /*
3721  * Setup callback for syscache so that we know when something changes in
3722  * the subscription relation state.
3723  */
3726  (Datum) 0);
3727 
3728  /* Build logical replication streaming options. */
3729  options.logical = true;
3730  options.startpoint = origin_startpos;
3731  options.slotname = myslotname;
3732 
3734  options.proto.logical.proto_version =
3738 
3739  options.proto.logical.publication_names = MySubscription->publications;
3740  options.proto.logical.binary = MySubscription->binary;
3741  options.proto.logical.streaming = MySubscription->stream;
3742  options.proto.logical.twophase = false;
3743 
3744  if (!am_tablesync_worker())
3745  {
3746  /*
3747  * Even when the two_phase mode is requested by the user, it remains
3748  * as the tri-state PENDING until all tablesyncs have reached READY
3749  * state. Only then, can it become ENABLED.
3750  *
3751  * Note: If the subscription has no tables then leave the state as
3752  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
3753  * work.
3754  */
3757  {
3758  /* Start streaming with two_phase enabled */
3759  options.proto.logical.twophase = true;
3761 
3766  }
3767  else
3768  {
3770  }
3771 
3772  ereport(DEBUG1,
3773  (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s",
3778  "?")));
3779  }
3780  else
3781  {
3782  /* Start normal logical streaming replication. */
3784  }
3785 
3786  /* Run the main loop. */
3787  start_apply(origin_startpos);
3788 
3789  proc_exit(0);
3790 }
static void start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
Definition: worker.c:3484
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:3099
static ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:235
static void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:3527
MemoryContext ApplyContext
Definition: worker.c:246
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:251
static bool MySubscriptionValid
Definition: worker.c:254
Subscription * MySubscription
Definition: worker.c:253
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
#define OidIsValid(objectId)
Definition: c.h:710
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define LOG
Definition: elog.h:25
#define DEBUG1
Definition: elog.h:24
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:8370
@ PGC_S_OVERRIDE
Definition: guc.h:116
@ PGC_SUSET
Definition: guc.h:75
@ PGC_BACKEND
Definition: guc.h:74
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
void proc_exit(int code)
Definition: ipc.c:104
void logicalrep_worker_attach(int slot)
Definition: launcher.c:564
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:58
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:38
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:39
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:37
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1909
MemoryContext TopMemoryContext
Definition: mcxt.c:48
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1292
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:240
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1071
RepOriginId replorigin_session_origin
Definition: origin.c:155
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1206
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define NAMEDATALEN
static int server_version
Definition: pg_dumpall.c:85
static char ** options
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
#define die(msg)
Definition: pg_test_fsync.c:95
#define snprintf
Definition: port.h:225
uintptr_t Datum
Definition: postgres.h:411
#define DatumGetInt32(X)
Definition: postgres.h:516
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5713
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5684
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:180
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:100
@ SUBSCRIPTIONOID
Definition: syscache.h:99
bool AllTablesyncsReady(void)
Definition: tablesync.c:1515
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:271
void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname, int szorgname)
Definition: tablesync.c:1164
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1540
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:418
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404
#define walrcv_server_version(conn)
Definition: walreceiver.h:414
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:412
#define SIGHUP
Definition: win32_port.h:167
static bool am_tablesync_worker(void)
void StartTransactionCommand(void)
Definition: xact.c:2925
void CommitTransactionCommand(void)
Definition: xact.c:3022
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AllTablesyncsReady(), am_tablesync_worker(), apply_error_callback_arg, ApplyContext, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), Subscription::binary, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die, elog, Subscription::enabled, ereport, errcode(), errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), InvalidXLogRecPtr, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, logicalrep_worker_attach(), LogRepWorkerWalRcvConn, MemoryContextStrdup(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, ApplyErrorCallbackArg::origin_name, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), Subscription::publications, LogicalRepWorker::relid, ReplicationOriginNameForTablesync(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, server_version, SetConfigOption(), SIGHUP, SignalHandlerForConfigReload(), Subscription::slotname, snprintf, start_apply(), start_table_sync(), StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, Subscription::twophasestate, UpdateTwoPhaseState(), LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, walrcv_server_version, and walrcv_startstreaming.

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 3832 of file worker.c.

3833 {
3834  return MyLogicalRepWorker != NULL;
3835 }

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().