PostgreSQL Source Code  git master
logicalworker.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Function Documentation

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 2955 of file worker.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_tablesync_worker(), BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), Subscription::binary, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die, elog, Subscription::enabled, ereport, errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, WalRcvStreamOptions::logical, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, logicalrep_worker_attach(), LogicalRepApplyLoop(), LogicalRepSyncTableStart(), MemoryContextStrdup(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, pfree(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), WalRcvStreamOptions::proto, Subscription::publications, LogicalRepWorker::relid, replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, SetConfigOption(), SIGHUP, SignalHandlerForConfigReload(), Subscription::slotname, WalRcvStreamOptions::slotname, snprintf, WalRcvStreamOptions::startpoint, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, walrcv_server_version, and walrcv_startstreaming.

2956 {
2957  int worker_slot = DatumGetInt32(main_arg);
2958  MemoryContext oldctx;
2959  char originname[NAMEDATALEN];
2960  XLogRecPtr origin_startpos;
2961  char *myslotname;
2963 
2964  /* Attach to slot */
2965  logicalrep_worker_attach(worker_slot);
2966 
2967  /* Setup signal handling */
2969  pqsignal(SIGTERM, die);
2971 
2972  /*
2973  * We don't currently need any ResourceOwner in a walreceiver process, but
2974  * if we did, we could call CreateAuxProcessResourceOwner here.
2975  */
2976 
2977  /* Initialise stats to a sanish value */
2980 
2981  /* Load the libpq-specific functions */
2982  load_file("libpqwalreceiver", false);
2983 
2984  /* Run as replica session replication role. */
2985  SetConfigOption("session_replication_role", "replica",
2987 
2988  /* Connect to our database. */
2991  0);
2992 
2993  /*
2994  * Set always-secure search path, so malicious users can't redirect user
2995  * code (e.g. pg_index.indexprs).
2996  */
2997  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
2998 
2999  /* Load the subscription into persistent memory context. */
3001  "ApplyContext",
3005 
3007  if (!MySubscription)
3008  {
3009  ereport(LOG,
3010  (errmsg("logical replication apply worker for subscription %u will not "
3011  "start because the subscription was removed during startup",
3013  proc_exit(0);
3014  }
3015 
3016  MySubscriptionValid = true;
3017  MemoryContextSwitchTo(oldctx);
3018 
3019  if (!MySubscription->enabled)
3020  {
3021  ereport(LOG,
3022  (errmsg("logical replication apply worker for subscription \"%s\" will not "
3023  "start because the subscription was disabled during startup",
3024  MySubscription->name)));
3025 
3026  proc_exit(0);
3027  }
3028 
3029  /* Setup synchronous commit according to the user's wishes */
3030  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3032 
3033  /* Keep us informed about subscription changes. */
3036  (Datum) 0);
3037 
3038  if (am_tablesync_worker())
3039  ereport(LOG,
3040  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3042  else
3043  ereport(LOG,
3044  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3045  MySubscription->name)));
3046 
3048 
3049  /* Connect to the origin and start the replication. */
3050  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3052 
3053  if (am_tablesync_worker())
3054  {
3055  char *syncslotname;
3056 
3057  /* This is table synchronization worker, call initial sync. */
3058  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
3059 
3060  /* allocate slot name in long-lived context */
3061  myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3062 
3063  pfree(syncslotname);
3064  }
3065  else
3066  {
3067  /* This is main apply worker */
3068  RepOriginId originid;
3069  TimeLineID startpointTLI;
3070  char *err;
3071 
3072  myslotname = MySubscription->slotname;
3073 
3074  /*
3075  * This shouldn't happen if the subscription is enabled, but guard
3076  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3077  * crash if slot is NULL.)
3078  */
3079  if (!myslotname)
3080  ereport(ERROR,
3081  (errmsg("subscription has no replication slot set")));
3082 
3083  /* Setup replication origin tracking. */
3085  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3086  originid = replorigin_by_name(originname, true);
3087  if (!OidIsValid(originid))
3088  originid = replorigin_create(originname);
3089  replorigin_session_setup(originid);
3090  replorigin_session_origin = originid;
3091  origin_startpos = replorigin_session_get_progress(false);
3093 
3095  MySubscription->name, &err);
3096  if (LogRepWorkerWalRcvConn == NULL)
3097  ereport(ERROR,
3098  (errmsg("could not connect to the publisher: %s", err)));
3099 
3100  /*
3101  * We don't really use the output identify_system for anything but it
3102  * does some initializations on the upstream so let's still call it.
3103  */
3104  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
3105  }
3106 
3107  /*
3108  * Setup callback for syscache so that we know when something changes in
3109  * the subscription relation state.
3110  */
3113  (Datum) 0);
3114 
3115  /* Build logical replication streaming options. */
3116  options.logical = true;
3117  options.startpoint = origin_startpos;
3118  options.slotname = myslotname;
3119  options.proto.logical.proto_version =
3122  options.proto.logical.publication_names = MySubscription->publications;
3123  options.proto.logical.binary = MySubscription->binary;
3124  options.proto.logical.streaming = MySubscription->stream;
3125 
3126  /* Start normal logical streaming replication. */
3128 
3129  /* Run the main loop. */
3130  LogicalRepApplyLoop(origin_startpos);
3131 
3132  proc_exit(0);
3133 }
Subscription * MySubscription
Definition: worker.c:161
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:32
#define AllocSetContextCreate
Definition: memutils.h:173
#define DEBUG1
Definition: elog.h:25
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:159
uint32 TimeLineID
Definition: xlogdefs.h:59
#define DatumGetInt32(X)
Definition: postgres.h:516
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:409
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
void CommitTransactionCommand(void)
Definition: xact.c:2939
#define walrcv_server_version(conn)
Definition: walreceiver.h:411
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:65
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:415
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1203
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1068
#define LOG
Definition: elog.h:26
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:919
#define OidIsValid(objectId)
Definition: c.h:710
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:209
#define NAMEDATALEN
Subscription * GetSubscription(Oid subid, bool missing_ok)
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:2512
void pfree(void *pointer)
Definition: mcxt.c:1169
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:46
Definition: guc.h:75
static bool am_tablesync_worker(void)
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
void logicalrep_worker_attach(int slot)
Definition: launcher.c:564
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:8050
#define SIGHUP
Definition: win32_port.h:159
XLogRecPtr startpoint
Definition: walreceiver.h:170
List * publications
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
MemoryContext TopMemoryContext
Definition: mcxt.c:48
MemoryContext ApplyContext
Definition: worker.c:154
static char ** options
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1435
uintptr_t Datum
Definition: postgres.h:411
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5695
#define ereport(elevel,...)
Definition: elog.h:157
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:33
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
TimestampTz last_recv_time
uint64 XLogRecPtr
Definition: xlogdefs.h:21
RepOriginId replorigin_session_origin
Definition: origin.c:154
void StartTransactionCommand(void)
Definition: xact.c:2838
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
bool MySubscriptionValid
Definition: worker.c:162
int errmsg(const char *fmt,...)
Definition: elog.c:909
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1286
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:2090
#define elog(elevel,...)
Definition: elog.h:232
union WalRcvStreamOptions::@103 proto
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1899
#define snprintf
Definition: port.h:216
#define die(msg)
Definition: pg_test_fsync.c:97
TimestampTz reply_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:264
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5724
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:401

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 3139 of file worker.c.

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().

3140 {
3141  return MyLogicalRepWorker != NULL;
3142 }
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57