PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
worker_internal.h File Reference
Include dependency graph for worker_internal.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepWorker
 

Typedefs

typedef struct LogicalRepWorker LogicalRepWorker
 

Functions

void logicalrep_worker_attach (int slot)
 
LogicalRepWorker * logicalrep_worker_find (Oid subid)
 
int logicalrep_worker_count (Oid subid)
 
void logicalrep_worker_launch (Oid dbid, Oid subid, const char *subname, Oid userid)
 
void logicalrep_worker_stop (Oid subid)
 
void logicalrep_worker_wakeup (Oid subid)
 
void logicalrep_worker_sigterm (SIGNAL_ARGS)
 

Variables

struct WalReceiverConn * wrconn
 
Subscription * MySubscription
 
LogicalRepWorker * MyLogicalRepWorker
 
bool in_remote_transaction
 
bool got_SIGTERM
 

Typedef Documentation

Function Documentation

void logicalrep_worker_attach ( int  slot)

Definition at line 407 of file launcher.c.

References Assert, before_shmem_exit(), ereport, errcode(), errmsg(), ERROR, logicalrep_worker_onexit(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by ApplyWorkerMain().

408 {
409  /* Block concurrent access. */
410  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
411 
412  Assert(slot >= 0 && slot < max_logical_replication_workers);
414 
416  ereport(ERROR,
417  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
418  errmsg("logical replication worker slot %d already used by "
419  "another worker", slot)));
420 
423 
424  LWLockRelease(LogicalRepWorkerLock);
425 }
PGPROC * MyProc
Definition: proc.c:67
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:67
int errcode(int sqlerrcode)
Definition: elog.c:575
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1714
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:59
#define ERROR
Definition: elog.h:43
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:320
#define ereport(elevel, rest)
Definition: elog.h:122
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:450
uintptr_t Datum
Definition: postgres.h:374
#define Assert(condition)
Definition: c.h:671
int max_logical_replication_workers
Definition: launcher.c:58
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1110
int errmsg(const char *fmt,...)
Definition: elog.c:797
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:70
int logicalrep_worker_count ( Oid  subid)
LogicalRepWorker* logicalrep_worker_find ( Oid  subid)

Definition at line 204 of file launcher.c.

References Assert, i, IsBackendPid(), LWLockHeldByMe(), max_logical_replication_workers, NULL, PGPROC::pid, LogicalRepWorker::proc, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), and logicalrep_worker_stop().

205 {
206  int i;
207  LogicalRepWorker *res = NULL;
208 
209  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
210  /* Search for attached worker for a given subscription id. */
211  for (i = 0; i < max_logical_replication_workers; i++)
212  {
213  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
214  if (w->subid == subid && w->proc && IsBackendPid(w->proc->pid))
215  {
216  res = w;
217  break;
218  }
219  }
220 
221  return res;
222 }
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1830
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:67
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
int max_logical_replication_workers
Definition: launcher.c:58
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:70
int i
bool IsBackendPid(int pid)
Definition: procarray.c:2424
int pid
Definition: proc.h:98
void logicalrep_worker_launch ( Oid  dbid,
Oid  subid,
const char *  subname,
Oid  userid 
)

Definition at line 228 of file launcher.c.

References ApplyWorkerMain(), BackgroundWorker::bgw_flags, BackgroundWorker::bgw_main, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, LogicalRepWorker::dbid, ereport, errcode(), errhint(), errmsg(), ERROR, LOG, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, max_replication_slots, MyProcPid, NULL, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), snprintf(), LogicalRepWorker::subid, LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), WARNING, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain().

229 {
230  BackgroundWorker bgw;
231  BackgroundWorkerHandle *bgw_handle;
232  int slot;
233  LogicalRepWorker *worker = NULL;
234 
235  ereport(LOG,
236  (errmsg("starting logical replication worker for subscription \"%s\"",
237  subname)));
238 
239  /* Report this after the initial starting message for consistency. */
240  if (max_replication_slots == 0)
241  ereport(ERROR,
242  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
243  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
244 
245  /*
246  * We need to do the modification of the shared memory under lock so that
247  * we have consistent view.
248  */
249  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
250 
251  /* Find unused worker slot. */
252  for (slot = 0; slot < max_logical_replication_workers; slot++)
253  {
254  if (!LogicalRepCtx->workers[slot].proc)
255  {
256  worker = &LogicalRepCtx->workers[slot];
257  break;
258  }
259  }
260 
261  /* Bail if not found */
262  if (worker == NULL)
263  {
264  LWLockRelease(LogicalRepWorkerLock);
265  ereport(WARNING,
266  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
267  errmsg("out of logical replication workers slots"),
268  errhint("You might need to increase max_logical_replication_workers.")));
269  return;
270  }
271 
272  /* Prepare the worker info. */
273  memset(worker, 0, sizeof(LogicalRepWorker));
274  worker->dbid = dbid;
275  worker->userid = userid;
276  worker->subid = subid;
277 
278  LWLockRelease(LogicalRepWorkerLock);
279 
280  /* Register the new dynamic worker. */
286  "logical replication worker for subscription %u", subid);
287 
290  bgw.bgw_main_arg = slot;
291 
292  if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
293  {
294  ereport(WARNING,
295  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
296  errmsg("out of background workers slots"),
297  errhint("You might need to increase max_worker_processes.")));
298  return;
299  }
300 
301  /* Now wait until it attaches. */
302  WaitForReplicationWorkerAttach(worker, bgw_handle);
303 }
int MyProcPid
Definition: globals.c:38
int errhint(const char *fmt,...)
Definition: elog.c:987
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:67
int bgw_restart_time
Definition: bgworker.h:93
int errcode(int sqlerrcode)
Definition: elog.c:575
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
#define LOG
Definition: elog.h:26
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1714
Datum bgw_main_arg
Definition: bgworker.h:97
#define ERROR
Definition: elog.h:43
bgworker_main_type bgw_main
Definition: bgworker.h:94
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
#define ereport(elevel, rest)
Definition: elog.h:122
#define WARNING
Definition: elog.h:40
int max_replication_slots
Definition: slot.c:98
#define NULL
Definition: c.h:226
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
int max_logical_replication_workers
Definition: launcher.c:58
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:92
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:864
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1110
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t bgw_notify_pid
Definition: bgworker.h:99
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:70
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:1308
static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, BackgroundWorkerHandle *handle)
Definition: launcher.c:157
void logicalrep_worker_sigterm ( SIGNAL_ARGS  )

Definition at line 457 of file launcher.c.

References got_SIGTERM, MyLatch, and SetLatch().

Referenced by ApplyLauncherMain(), and ApplyWorkerMain().

458 {
459  got_SIGTERM = true;
460 
461  /* Waken anything waiting on the process latch */
462  SetLatch(MyLatch);
463 }
void SetLatch(volatile Latch *latch)
Definition: latch.c:380
bool got_SIGTERM
Definition: launcher.c:75
struct Latch * MyLatch
Definition: globals.c:51
void logicalrep_worker_stop ( Oid  subid)

Definition at line 313 of file launcher.c.

References Assert, CHECK_FOR_INTERRUPTS, InvalidOid, logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockHeldByMe(), LWLockRelease(), MyProc, PGPROC::pid, LogicalRepWorker::proc, proc_exit(), PGPROC::procLatch, ResetLatch(), LogicalRepWorker::subid, WAIT_EVENT_BGWORKER_SHUTDOWN, WAIT_EVENT_BGWORKER_STARTUP, WaitLatch(), WL_LATCH_SET, WL_POSTMASTER_DEATH, and WL_TIMEOUT.

Referenced by DropSubscription().

314 {
315  LogicalRepWorker *worker;
316 
317  Assert(LWLockHeldByMe(LogicalRepLauncherLock));
318 
319  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
320 
321  worker = logicalrep_worker_find(subid);
322 
323  /* No worker, nothing to do. */
324  if (!worker)
325  {
326  LWLockRelease(LogicalRepWorkerLock);
327  return;
328  }
329 
330  /*
331  * If we found worker but it does not have proc set it is starting up,
332  * wait for it to finish and then kill it.
333  */
334  while (worker && !worker->proc)
335  {
336  int rc;
337 
338  LWLockRelease(LogicalRepWorkerLock);
339 
341 
342  /* Wait for signal. */
343  rc = WaitLatch(&MyProc->procLatch,
346 
347  /* emergency bailout if postmaster has died */
348  if (rc & WL_POSTMASTER_DEATH)
349  proc_exit(1);
350 
352 
353  /* Check worker status. */
354  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
355 
356  /*
357  * Worker is no longer associated with subscription. It must have
358  * exited, nothing more for us to do.
359  */
360  if (worker->subid == InvalidOid)
361  {
362  LWLockRelease(LogicalRepWorkerLock);
363  return;
364  }
365 
366  /* Worker has assigned proc, so it has started. */
367  if (worker->proc)
368  break;
369  }
370 
371  /* Now terminate the worker ... */
372  kill(worker->proc->pid, SIGTERM);
373  LWLockRelease(LogicalRepWorkerLock);
374 
375  /* ... and wait for it to die. */
376  for (;;)
377  {
378  int rc;
379 
380  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
381  if (!worker->proc)
382  {
383  LWLockRelease(LogicalRepWorkerLock);
384  break;
385  }
386  LWLockRelease(LogicalRepWorkerLock);
387 
389 
390  /* Wait for more work. */
391  rc = WaitLatch(&MyProc->procLatch,
392  WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
394 
395  /* emergency bailout if postmaster has died */
396  if (rc & WL_POSTMASTER_DEATH)
397  proc_exit(1);
398 
400  }
401 }
#define WL_TIMEOUT
Definition: latch.h:127
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1830
PGPROC * MyProc
Definition: proc.c:67
void proc_exit(int code)
Definition: ipc.c:99
void ResetLatch(volatile Latch *latch)
Definition: latch.c:462
Latch procLatch
Definition: proc.h:93
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1714
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:301
LogicalRepWorker * logicalrep_worker_find(Oid subid)
Definition: launcher.c:204
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
#define InvalidOid
Definition: postgres_ext.h:36
#define Assert(condition)
Definition: c.h:671
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1110
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
int pid
Definition: proc.h:98
#define WL_LATCH_SET
Definition: latch.h:124
void logicalrep_worker_wakeup ( Oid  subid)

Variable Documentation

bool got_SIGTERM

Definition at line 75 of file launcher.c.

Referenced by ApplyLauncherMain(), and logicalrep_worker_sigterm().

bool in_remote_transaction

Definition at line 111 of file worker.c.

Referenced by apply_handle_begin(), apply_handle_commit(), apply_handle_origin(), and ApplyLoop().

LogicalRepWorker* MyLogicalRepWorker

Definition at line 59 of file launcher.c.

Referenced by ApplyWorkerMain(), reread_subscription(), and UpdateWorkerStats().

Subscription* MySubscription

Definition at line 108 of file worker.c.

struct WalReceiverConn* wrconn

Definition at line 106 of file worker.c.

Referenced by CreateSubscription(), and DropSubscription().