PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
launcher.c File Reference
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "libpq/pqsignal.h"
#include "postmaster/bgworker.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/slot.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/snapmgr.h"
Include dependency graph for launcher.c:

Go to the source code of this file.

Data Structures

struct  LogicalRepCtxStruct
 

Macros

#define DEFAULT_NAPTIME_PER_CYCLE   180000L
 
#define PG_STAT_GET_SUBSCRIPTION_COLS   8
 

Typedefs

typedef struct LogicalRepCtxStruct LogicalRepCtxStruct
 

Functions

static void logicalrep_worker_onexit (int code, Datum arg)
 
static void logicalrep_worker_detach (void)
 
Datum pg_stat_get_subscription (PG_FUNCTION_ARGS)
 
static List * get_subscription_list (void)
 
static bool WaitForReplicationWorkerAttach (LogicalRepWorker *worker, BackgroundWorkerHandle *handle)
 
LogicalRepWorker * logicalrep_worker_find (Oid subid, Oid relid, bool only_running)
 
void logicalrep_worker_launch (Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
 
void logicalrep_worker_stop (Oid subid, Oid relid)
 
void logicalrep_worker_wakeup (Oid subid, Oid relid)
 
void logicalrep_worker_wakeup_ptr (LogicalRepWorker *worker)
 
void logicalrep_worker_attach (int slot)
 
void logicalrep_worker_sigterm (SIGNAL_ARGS)
 
int logicalrep_sync_worker_count (Oid subid)
 
Size ApplyLauncherShmemSize (void)
 
void ApplyLauncherRegister (void)
 
void ApplyLauncherShmemInit (void)
 
void AtCommit_ApplyLauncher (void)
 
void ApplyLauncherWakeupAtCommit (void)
 
void ApplyLauncherWakeup (void)
 
void ApplyLauncherMain (Datum main_arg)
 

Variables

int max_logical_replication_workers = 4
 
int max_sync_workers_per_subscription = 2
 
LogicalRepWorker * MyLogicalRepWorker = NULL
 
LogicalRepCtxStruct * LogicalRepCtx
 
bool got_SIGTERM = false
 
static bool on_commit_launcher_wakeup = false
 

Macro Definition Documentation

#define DEFAULT_NAPTIME_PER_CYCLE   180000L

Definition at line 57 of file launcher.c.

Referenced by ApplyLauncherMain().

#define PG_STAT_GET_SUBSCRIPTION_COLS   8

Typedef Documentation

Function Documentation

void ApplyLauncherMain ( Datum  main_arg)

Definition at line 632 of file launcher.c.

References ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, ALLOCSET_DEFAULT_MINSIZE, AllocSetContextCreate(), BackgroundWorkerInitializeConnection(), BackgroundWorkerUnblockSignals(), BackgroundWorker::bgw_name, Subscription::dbid, DEBUG1, DEFAULT_NAPTIME_PER_CYCLE, Subscription::enabled, ereport, errmsg(), get_subscription_list(), GetCurrentTimestamp(), got_SIGTERM, InvalidOid, LogicalRepCtxStruct::launcher_pid, lfirst, logicalrep_worker_find(), logicalrep_worker_launch(), logicalrep_worker_sigterm(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MemoryContextDelete(), MemoryContextSwitchTo(), MyBgworkerEntry, MyProc, MyProcPid, Subscription::name, now(), NULL, Subscription::oid, Subscription::owner, PGC_S_SESSION, PGC_USERSET, pqsignal(), proc_exit(), PGPROC::procLatch, ResetLatch(), SetConfigOption(), TimestampDifferenceExceeds(), TopMemoryContext, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN, WaitLatch(), wal_retrieve_retry_interval, WL_LATCH_SET, WL_POSTMASTER_DEATH, and WL_TIMEOUT.

Referenced by ApplyLauncherRegister().

633 {
634  ereport(DEBUG1,
635  (errmsg("logical replication launcher started")));
636 
637  /* Establish signal handlers. */
640 
641  /* Make it easy to identify our processes. */
642  SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
644 
646 
647  /*
648  * Establish connection to nailed catalogs (we only ever access
649  * pg_subscription).
650  */
652 
653  /* Enter main loop */
654  while (!got_SIGTERM)
655  {
656  int rc;
657  List *sublist;
658  ListCell *lc;
659  MemoryContext subctx;
660  MemoryContext oldctx;
662  TimestampTz last_start_time = 0;
663  long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
664 
665  now = GetCurrentTimestamp();
666 
667  /* Limit the start retry to once a wal_retrieve_retry_interval */
668  if (TimestampDifferenceExceeds(last_start_time, now,
670  {
671  /* Use temporary context for the subscription list and worker info. */
673  "Logical Replication Launcher sublist",
677  oldctx = MemoryContextSwitchTo(subctx);
678 
679  /* search for subscriptions to start or stop. */
680  sublist = get_subscription_list();
681 
682  /* Start the missing workers for enabled subscriptions. */
683  foreach(lc, sublist)
684  {
685  Subscription *sub = (Subscription *) lfirst(lc);
686  LogicalRepWorker *w;
687 
688  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
689  w = logicalrep_worker_find(sub->oid, InvalidOid, false);
690  LWLockRelease(LogicalRepWorkerLock);
691 
692  if (sub->enabled && w == NULL)
693  {
694  logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
695  sub->owner, InvalidOid);
696  last_start_time = now;
697  wait_time = wal_retrieve_retry_interval;
698  /* Limit to one worker per mainloop cycle. */
699  break;
700  }
701  }
702 
703  /* Switch back to original memory context. */
704  MemoryContextSwitchTo(oldctx);
705  /* Clean the temporary memory. */
706  MemoryContextDelete(subctx);
707  }
708  else
709  {
710  /*
711  * The wait in previous cycle was interrupted in less than
712  * wal_retrieve_retry_interval since last worker was started,
713  * this usually means crash of the worker, so we should retry
714  * in wal_retrieve_retry_interval again.
715  */
716  wait_time = wal_retrieve_retry_interval;
717  }
718 
719  /* Wait for more work. */
720  rc = WaitLatch(&MyProc->procLatch,
722  wait_time,
724 
725  /* emergency bailout if postmaster has died */
726  if (rc & WL_POSTMASTER_DEATH)
727  proc_exit(1);
728 
730  }
731 
733 
734  /* ... and if it returns, we're done */
735  ereport(DEBUG1,
736  (errmsg("logical replication launcher shutting down")));
737 
738  proc_exit(0);
739 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:38
#define WL_TIMEOUT
Definition: latch.h:127
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void proc_exit(int code)
Definition: ipc.c:99
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:189
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1648
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:162
Latch procLatch
Definition: proc.h:103
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:300
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6651
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext TopMemoryContext
Definition: mcxt.c:43
int wal_retrieve_retry_interval
Definition: xlog.c:107
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
#define DEFAULT_NAPTIME_PER_CYCLE
Definition: launcher.c:57
void logicalrep_worker_sigterm(SIGNAL_ARGS)
Definition: launcher.c:495
#define InvalidOid
Definition: postgres_ext.h:36
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
static List * get_subscription_list(void)
Definition: launcher.c:91
#define NULL
Definition: c.h:229
void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
Definition: launcher.c:233
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define lfirst(lc)
Definition: pg_list.h:106
bool got_SIGTERM
Definition: launcher.c:78
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:207
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
int errmsg(const char *fmt,...)
Definition: elog.c:797
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:73
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:163
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:164
Definition: pg_list.h:45
#define WL_LATCH_SET
Definition: latch.h:124
void BackgroundWorkerInitializeConnection(char *dbname, char *username)
Definition: postmaster.c:5452
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5504
void ApplyLauncherRegister ( void  )

Definition at line 546 of file launcher.c.

References ApplyLauncherMain(), BackgroundWorker::bgw_flags, BackgroundWorker::bgw_main, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, max_logical_replication_workers, RegisterBackgroundWorker(), and snprintf().

Referenced by PostmasterMain().

547 {
548  BackgroundWorker bgw;
549 
551  return;
552 
558  "logical replication launcher");
559  bgw.bgw_restart_time = 5;
560  bgw.bgw_notify_pid = 0;
561  bgw.bgw_main_arg = (Datum) 0;
562 
564 }
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:805
void ApplyLauncherMain(Datum main_arg)
Definition: launcher.c:632
int bgw_restart_time
Definition: bgworker.h:93
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
Datum bgw_main_arg
Definition: bgworker.h:97
bgworker_main_type bgw_main
Definition: bgworker.h:94
uintptr_t Datum
Definition: postgres.h:372
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
int max_logical_replication_workers
Definition: launcher.c:59
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:92
pid_t bgw_notify_pid
Definition: bgworker.h:99
void ApplyLauncherShmemInit ( void  )

Definition at line 571 of file launcher.c.

References ApplyLauncherShmemSize(), max_logical_replication_workers, LogicalRepWorker::relmutex, ShmemInitStruct(), SpinLockInit, and LogicalRepCtxStruct::workers.

Referenced by CreateSharedMemoryAndSemaphores().

572 {
573  bool found;
574 
576  ShmemInitStruct("Logical Replication Launcher Data",
578  &found);
579 
580  if (!found)
581  {
582  int slot;
583 
585 
586  /* Initialize memory and spin locks for each worker slot. */
587  for (slot = 0; slot < max_logical_replication_workers; slot++)
588  {
589  LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
590 
591  memset(worker, 0, sizeof(LogicalRepWorker));
592  SpinLockInit(&worker->relmutex);
593  }
594  }
595 }
#define SpinLockInit(lock)
Definition: spin.h:60
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:70
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
Size ApplyLauncherShmemSize(void)
Definition: launcher.c:531
int max_logical_replication_workers
Definition: launcher.c:59
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:73
Size ApplyLauncherShmemSize ( void  )

Definition at line 531 of file launcher.c.

References add_size(), max_logical_replication_workers, MAXALIGN, and mul_size().

Referenced by ApplyLauncherShmemInit(), and CreateSharedMemoryAndSemaphores().

532 {
533  Size size;
534 
535  /*
536  * Need the fixed struct and the array of LogicalRepWorker.
537  */
538  size = sizeof(LogicalRepCtxStruct);
539  size = MAXALIGN(size);
541  sizeof(LogicalRepWorker)));
542  return size;
543 }
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
int max_logical_replication_workers
Definition: launcher.c:59
size_t Size
Definition: c.h:356
#define MAXALIGN(LEN)
Definition: c.h:588
struct LogicalRepCtxStruct LogicalRepCtxStruct
void ApplyLauncherWakeup ( void  )

Definition at line 622 of file launcher.c.

References IsBackendPid(), LogicalRepCtxStruct::launcher_pid, and SIGUSR1.

Referenced by AtCommit_ApplyLauncher().

623 {
626 }
#define SIGUSR1
Definition: win32.h:211
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:73
bool IsBackendPid(int pid)
Definition: procarray.c:2425
void ApplyLauncherWakeupAtCommit ( void  )

Definition at line 615 of file launcher.c.

References on_commit_launcher_wakeup.

Referenced by CreateSubscription().

616 {
619 }
static bool on_commit_launcher_wakeup
Definition: launcher.c:79
void AtCommit_ApplyLauncher ( void  )

Definition at line 601 of file launcher.c.

References ApplyLauncherWakeup(), and on_commit_launcher_wakeup.

Referenced by CommitTransaction().

602 {
605 }
void ApplyLauncherWakeup(void)
Definition: launcher.c:622
static bool on_commit_launcher_wakeup
Definition: launcher.c:79
static List* get_subscription_list ( void  )
static

Definition at line 91 of file launcher.c.

References AccessShareLock, CommitTransactionCommand(), Subscription::conninfo, CurrentMemoryContext, Subscription::dbid, Subscription::enabled, ForwardScanDirection, GETSTRUCT, GetTransactionSnapshot(), heap_beginscan_catalog(), heap_close, heap_endscan(), heap_getnext(), heap_open(), HeapTupleGetOid, HeapTupleIsValid, lappend(), MemoryContextSwitchTo(), Subscription::name, NameStr, NIL, NULL, Subscription::oid, Subscription::owner, palloc(), pstrdup(), Subscription::publications, Subscription::slotname, StartTransactionCommand(), and SubscriptionRelationId.

Referenced by ApplyLauncherMain().

92 {
93  List *res = NIL;
94  Relation rel;
95  HeapScanDesc scan;
96  HeapTuple tup;
97  MemoryContext resultcxt;
98 
99  /* This is the context that we will allocate our output data in */
100  resultcxt = CurrentMemoryContext;
101 
102  /*
103  * Start a transaction so we can access pg_subscription, and get a snapshot.
104  * We don't have a use for the snapshot itself, but we're interested in
105  * the secondary effect that it sets RecentGlobalXmin. (This is critical
106  * for anything that reads heap pages, because HOT may decide to prune
107  * them even if the process doesn't attempt to modify any tuples.)
108  */
110  (void) GetTransactionSnapshot();
111 
113  scan = heap_beginscan_catalog(rel, 0, NULL);
114 
116  {
118  Subscription *sub;
119  MemoryContext oldcxt;
120 
121  /*
122  * Allocate our results in the caller's context, not the
123  * transaction's. We do this inside the loop, and restore the original
124  * context at the end, so that leaky things like heap_getnext() are
125  * not called in a potentially long-lived context.
126  */
127  oldcxt = MemoryContextSwitchTo(resultcxt);
128 
129  sub = (Subscription *) palloc(sizeof(Subscription));
130  sub->oid = HeapTupleGetOid(tup);
131  sub->dbid = subform->subdbid;
132  sub->owner = subform->subowner;
133  sub->enabled = subform->subenabled;
134  sub->name = pstrdup(NameStr(subform->subname));
135 
136  /* We don't fill fields we are not interested in. */
137  sub->conninfo = NULL;
138  sub->slotname = NULL;
139  sub->publications = NIL;
140 
141  res = lappend(res, sub);
142  MemoryContextSwitchTo(oldcxt);
143  }
144 
145  heap_endscan(scan);
147 
149 
150  return res;
151 }
#define NIL
Definition: pg_list.h:69
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
void heap_endscan(HeapScanDesc scan)
Definition: heapam.c:1581
char * pstrdup(const char *in)
Definition: mcxt.c:1077
void CommitTransactionCommand(void)
Definition: xact.c:2747
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
#define heap_close(r, l)
Definition: heapam.h:97
FormData_pg_subscription * Form_pg_subscription
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
#define SubscriptionRelationId
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
HeapScanDesc heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
Definition: heapam.c:1402
List * lappend(List *list, void *datum)
Definition: list.c:128
HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction)
Definition: heapam.c:1797
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1287
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:229
void StartTransactionCommand(void)
Definition: xact.c:2677
void * palloc(Size size)
Definition: mcxt.c:849
#define NameStr(name)
Definition: c.h:499
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
Definition: pg_list.h:45
int logicalrep_sync_worker_count ( Oid  subid)

Definition at line 508 of file launcher.c.

References Assert, i, LWLockHeldByMe(), max_logical_replication_workers, OidIsValid, LogicalRepWorker::relid, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by process_syncing_tables_for_apply().

509 {
510  int i;
511  int res = 0;
512 
513  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
514 
515  /* Search for attached worker for a given subscription id. */
516  for (i = 0; i < max_logical_replication_workers; i++)
517  {
519  if (w->subid == subid && OidIsValid(w->relid))
520  res++;
521  }
522 
523  return res;
524 }
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1831
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:70
#define OidIsValid(objectId)
Definition: c.h:538
#define Assert(condition)
Definition: c.h:675
int max_logical_replication_workers
Definition: launcher.c:59
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:73
int i
void logicalrep_worker_attach ( int  slot)

Definition at line 445 of file launcher.c.

References Assert, before_shmem_exit(), ereport, errcode(), errmsg(), ERROR, logicalrep_worker_onexit(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by ApplyWorkerMain().

446 {
447  /* Block concurrent access. */
448  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
449 
450  Assert(slot >= 0 && slot < max_logical_replication_workers);
452 
454  ereport(ERROR,
455  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
456  errmsg("logical replication worker slot %d already used by "
457  "another worker", slot)));
458 
461 
462  LWLockRelease(LogicalRepWorkerLock);
463 }
PGPROC * MyProc
Definition: proc.c:67
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:70
int errcode(int sqlerrcode)
Definition: elog.c:575
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:62
#define ERROR
Definition: elog.h:43
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:320
#define ereport(elevel, rest)
Definition: elog.h:122
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:488
uintptr_t Datum
Definition: postgres.h:372
#define Assert(condition)
Definition: c.h:675
int max_logical_replication_workers
Definition: launcher.c:59
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
int errmsg(const char *fmt,...)
Definition: elog.c:797
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:73
static void logicalrep_worker_detach ( void  )
static

Definition at line 469 of file launcher.c.

References LogicalRepWorker::dbid, InvalidOid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NULL, LogicalRepWorker::proc, LogicalRepWorker::subid, and LogicalRepWorker::userid.

Referenced by logicalrep_worker_onexit().

470 {
471  /* Block concurrent access. */
472  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
473 
478 
479  LWLockRelease(LogicalRepWorkerLock);
480 }
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:62
#define InvalidOid
Definition: postgres_ext.h:36
#define NULL
Definition: c.h:229
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
LogicalRepWorker* logicalrep_worker_find ( Oid  subid,
Oid  relid,
bool  only_running 
)

Definition at line 207 of file launcher.c.

References Assert, i, IsBackendPid(), LWLockHeldByMe(), max_logical_replication_workers, NULL, PGPROC::pid, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), logicalrep_worker_stop(), logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), and wait_for_sync_status_change().

208 {
209  int i;
210  LogicalRepWorker *res = NULL;
211 
212  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
213 
214  /* Search for attached worker for a given subscription id. */
215  for (i = 0; i < max_logical_replication_workers; i++)
216  {
218  if (w->subid == subid && w->relid == relid &&
219  (!only_running || (w->proc && IsBackendPid(w->proc->pid))))
220  {
221  res = w;
222  break;
223  }
224  }
225 
226  return res;
227 }
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1831
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:70
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
int max_logical_replication_workers
Definition: launcher.c:59
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:73
int i
bool IsBackendPid(int pid)
Definition: procarray.c:2425
int pid
Definition: proc.h:108
void logicalrep_worker_launch ( Oid  dbid,
Oid  subid,
const char *  subname,
Oid  userid,
Oid  relid 
)

Definition at line 233 of file launcher.c.

References ApplyWorkerMain(), BackgroundWorker::bgw_flags, BackgroundWorker::bgw_main, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, LogicalRepWorker::dbid, ereport, errcode(), errhint(), errmsg(), ERROR, InvalidXLogRecPtr, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LOG, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, max_replication_slots, MyProcPid, NULL, OidIsValid, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), LogicalRepWorker::relid, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, snprintf(), LogicalRepWorker::subid, SUBREL_STATE_UNKNOWN, TIMESTAMP_NOBEGIN, LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), WARNING, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), and process_syncing_tables_for_apply().

235 {
236  BackgroundWorker bgw;
237  BackgroundWorkerHandle *bgw_handle;
238  int slot;
239  LogicalRepWorker *worker = NULL;
240 
241  ereport(LOG,
242  (errmsg("starting logical replication worker for subscription \"%s\"",
243  subname)));
244 
245  /* Report this after the initial starting message for consistency. */
246  if (max_replication_slots == 0)
247  ereport(ERROR,
248  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
249  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
250 
251  /*
252  * We need to do the modification of the shared memory under lock so that
253  * we have consistent view.
254  */
255  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
256 
257  /* Find unused worker slot. */
258  for (slot = 0; slot < max_logical_replication_workers; slot++)
259  {
260  if (!LogicalRepCtx->workers[slot].proc)
261  {
262  worker = &LogicalRepCtx->workers[slot];
263  break;
264  }
265  }
266 
267  /* Bail if not found */
268  if (worker == NULL)
269  {
270  LWLockRelease(LogicalRepWorkerLock);
272  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
273  errmsg("out of logical replication workers slots"),
274  errhint("You might need to increase max_logical_replication_workers.")));
275  return;
276  }
277 
278  /* Prepare the worker info. */
279  worker->proc = NULL;
280  worker->dbid = dbid;
281  worker->userid = userid;
282  worker->subid = subid;
283  worker->relid = relid;
284  worker->relstate = SUBREL_STATE_UNKNOWN;
286  worker->last_lsn = InvalidXLogRecPtr;
289  worker->reply_lsn = InvalidXLogRecPtr;
290  TIMESTAMP_NOBEGIN(worker->reply_time);
291 
292  LWLockRelease(LogicalRepWorkerLock);
293 
294  /* Register the new dynamic worker. */
299  if (OidIsValid(relid))
301  "logical replication worker for subscription %u sync %u", subid, relid);
302  else
304  "logical replication worker for subscription %u", subid);
305 
308  bgw.bgw_main_arg = slot;
309 
310  if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
311  {
313  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
314  errmsg("out of background workers slots"),
315  errhint("You might need to increase max_worker_processes.")));
316  return;
317  }
318 
319  /* Now wait until it attaches. */
320  WaitForReplicationWorkerAttach(worker, bgw_handle);
321 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:38
int errhint(const char *fmt,...)
Definition: elog.c:987
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:70
TimestampTz last_send_time
XLogRecPtr last_lsn
int bgw_restart_time
Definition: bgworker.h:93
int errcode(int sqlerrcode)
Definition: elog.c:575
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define LOG
Definition: elog.h:26
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
#define OidIsValid(objectId)
Definition: c.h:538
XLogRecPtr relstate_lsn
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
Datum bgw_main_arg
Definition: bgworker.h:97
#define ERROR
Definition: elog.h:43
bgworker_main_type bgw_main
Definition: bgworker.h:94
XLogRecPtr reply_lsn
#define SUBREL_STATE_UNKNOWN
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:112
#define ereport(elevel, rest)
Definition: elog.h:122
#define WARNING
Definition: elog.h:40
int max_replication_slots
Definition: slot.c:99
TimestampTz last_recv_time
#define NULL
Definition: c.h:229
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
int max_logical_replication_workers
Definition: launcher.c:59
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:92
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:899
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t bgw_notify_pid
Definition: bgworker.h:99
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:73
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:1416
static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, BackgroundWorkerHandle *handle)
Definition: launcher.c:160
TimestampTz reply_time
static void logicalrep_worker_onexit ( int  code,
Datum  arg 
)
static

Definition at line 488 of file launcher.c.

References logicalrep_worker_detach().

Referenced by logicalrep_worker_attach().

489 {
491 }
static void logicalrep_worker_detach(void)
Definition: launcher.c:469
void logicalrep_worker_sigterm ( SIGNAL_ARGS  )

Definition at line 495 of file launcher.c.

References got_SIGTERM, MyLatch, and SetLatch().

Referenced by ApplyLauncherMain(), and ApplyWorkerMain().

496 {
497  got_SIGTERM = true;
498 
499  /* Waken anything waiting on the process latch */
500  SetLatch(MyLatch);
501 }
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
bool got_SIGTERM
Definition: launcher.c:78
struct Latch * MyLatch
Definition: globals.c:51
void logicalrep_worker_stop ( Oid  subid,
Oid  relid 
)

Definition at line 328 of file launcher.c.

References CHECK_FOR_INTERRUPTS, InvalidOid, logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProc, PGPROC::pid, LogicalRepWorker::proc, proc_exit(), PGPROC::procLatch, ResetLatch(), LogicalRepWorker::subid, WAIT_EVENT_BGWORKER_SHUTDOWN, WAIT_EVENT_BGWORKER_STARTUP, WaitLatch(), WL_LATCH_SET, WL_POSTMASTER_DEATH, and WL_TIMEOUT.

Referenced by DropSubscription().

329 {
330  LogicalRepWorker *worker;
331 
332  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
333 
334  worker = logicalrep_worker_find(subid, relid, false);
335 
336  /* No worker, nothing to do. */
337  if (!worker)
338  {
339  LWLockRelease(LogicalRepWorkerLock);
340  return;
341  }
342 
343  /*
344  * If we found worker but it does not have proc set it is starting up,
345  * wait for it to finish and then kill it.
346  */
347  while (worker && !worker->proc)
348  {
349  int rc;
350 
351  LWLockRelease(LogicalRepWorkerLock);
352 
354 
355  /* Wait for signal. */
356  rc = WaitLatch(&MyProc->procLatch,
359 
360  /* emergency bailout if postmaster has died */
361  if (rc & WL_POSTMASTER_DEATH)
362  proc_exit(1);
363 
365 
366  /* Check worker status. */
367  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
368 
369  /*
370  * Worker is no longer associated with subscription. It must have
371  * exited, nothing more for us to do.
372  */
373  if (worker->subid == InvalidOid)
374  {
375  LWLockRelease(LogicalRepWorkerLock);
376  return;
377  }
378 
379  /* Worker has assigned proc, so it has started. */
380  if (worker->proc)
381  break;
382  }
383 
384  /* Now terminate the worker ... */
385  kill(worker->proc->pid, SIGTERM);
386  LWLockRelease(LogicalRepWorkerLock);
387 
388  /* ... and wait for it to die. */
389  for (;;)
390  {
391  int rc;
392 
393  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
394  if (!worker->proc)
395  {
396  LWLockRelease(LogicalRepWorkerLock);
397  break;
398  }
399  LWLockRelease(LogicalRepWorkerLock);
400 
402 
403  /* Wait for more work. */
404  rc = WaitLatch(&MyProc->procLatch,
405  WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
407 
408  /* emergency bailout if postmaster has died */
409  if (rc & WL_POSTMASTER_DEATH)
410  proc_exit(1);
411 
413  }
414 }
#define WL_TIMEOUT
Definition: latch.h:127
PGPROC * MyProc
Definition: proc.c:67
void proc_exit(int code)
Definition: ipc.c:99
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
Latch procLatch
Definition: proc.h:103
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:300
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
#define InvalidOid
Definition: postgres_ext.h:36
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:207
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
int pid
Definition: proc.h:108
#define WL_LATCH_SET
Definition: latch.h:124
void logicalrep_worker_wakeup ( Oid  subid,
Oid  relid 
)

Definition at line 420 of file launcher.c.

References logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), and LWLockRelease().

Referenced by pg_attribute_noreturn().

421 {
422  LogicalRepWorker *worker;
423 
424  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
425  worker = logicalrep_worker_find(subid, relid, true);
426  LWLockRelease(LogicalRepWorkerLock);
427 
428  if (worker)
430 }
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:436
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:207
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
void logicalrep_worker_wakeup_ptr ( LogicalRepWorker *worker )

Definition at line 436 of file launcher.c.

References LogicalRepWorker::proc, PGPROC::procLatch, and SetLatch().

Referenced by logicalrep_worker_wakeup(), and process_syncing_tables_for_apply().

437 {
438  SetLatch(&worker->proc->procLatch);
439 }
Latch procLatch
Definition: proc.h:103
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
Datum pg_stat_get_subscription ( PG_FUNCTION_ARGS  )

Definition at line 745 of file launcher.c.

References ReturnSetInfo::allowedModes, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), i, Int32GetDatum, InvalidOid, IsA, IsBackendPid(), LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MemoryContextSwitchTo(), MemSet, NULL, ObjectIdGetDatum, OidIsValid, PG_ARGISNULL, PG_GETARG_OID, PG_STAT_GET_SUBSCRIPTION_COLS, PGPROC::pid, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, ReturnSetInfo::returnMode, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, LogicalRepWorker::subid, TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, work_mem, LogicalRepCtxStruct::workers, and XLogRecPtrIsInvalid.

746 {
747 #define PG_STAT_GET_SUBSCRIPTION_COLS 8
748  Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
749  int i;
750  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
751  TupleDesc tupdesc;
752  Tuplestorestate *tupstore;
753  MemoryContext per_query_ctx;
754  MemoryContext oldcontext;
755 
756  /* check to see if caller supports us returning a tuplestore */
757  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
758  ereport(ERROR,
759  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
760  errmsg("set-valued function called in context that cannot accept a set")));
761  if (!(rsinfo->allowedModes & SFRM_Materialize))
762  ereport(ERROR,
763  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
764  errmsg("materialize mode required, but it is not " \
765  "allowed in this context")));
766 
767  /* Build a tuple descriptor for our result type */
768  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
769  elog(ERROR, "return type must be a row type");
770 
771  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
772  oldcontext = MemoryContextSwitchTo(per_query_ctx);
773 
774  tupstore = tuplestore_begin_heap(true, false, work_mem);
775  rsinfo->returnMode = SFRM_Materialize;
776  rsinfo->setResult = tupstore;
777  rsinfo->setDesc = tupdesc;
778 
779  MemoryContextSwitchTo(oldcontext);
780 
781  /* Make sure we get consistent view of the workers. */
782  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
783 
784  for (i = 0; i <= max_logical_replication_workers; i++)
785  {
786  /* for each row */
788  bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
789  int worker_pid;
790  LogicalRepWorker worker;
791 
792  memcpy(&worker, &LogicalRepCtx->workers[i],
793  sizeof(LogicalRepWorker));
794  if (!worker.proc || !IsBackendPid(worker.proc->pid))
795  continue;
796 
797  if (OidIsValid(subid) && worker.subid != subid)
798  continue;
799 
800  worker_pid = worker.proc->pid;
801 
802  MemSet(values, 0, sizeof(values));
803  MemSet(nulls, 0, sizeof(nulls));
804 
805  values[0] = ObjectIdGetDatum(worker.subid);
806  if (OidIsValid(worker.relid))
807  values[1] = ObjectIdGetDatum(worker.relid);
808  else
809  nulls[1] = true;
810  values[2] = Int32GetDatum(worker_pid);
811  if (XLogRecPtrIsInvalid(worker.last_lsn))
812  nulls[3] = true;
813  else
814  values[3] = LSNGetDatum(worker.last_lsn);
815  if (worker.last_send_time == 0)
816  nulls[4] = true;
817  else
818  values[4] = TimestampTzGetDatum(worker.last_send_time);
819  if (worker.last_recv_time == 0)
820  nulls[5] = true;
821  else
822  values[5] = TimestampTzGetDatum(worker.last_recv_time);
823  if (XLogRecPtrIsInvalid(worker.reply_lsn))
824  nulls[6] = true;
825  else
826  values[6] = LSNGetDatum(worker.reply_lsn);
827  if (worker.reply_time == 0)
828  nulls[7] = true;
829  else
830  values[7] = TimestampTzGetDatum(worker.reply_time);
831 
832  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
833 
834  /* If only a single subscription was requested, and we found it, break. */
835  if (OidIsValid(subid))
836  break;
837  }
838 
839  LWLockRelease(LogicalRepWorkerLock);
840 
841  /* clean up and return the tuplestore */
842  tuplestore_donestoring(tupstore);
843 
844  return (Datum) 0;
845 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:735
#define IsA(nodeptr, _type_)
Definition: nodes.h:557
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:70
TimestampTz last_send_time
XLogRecPtr last_lsn
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:857
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:538
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
XLogRecPtr reply_lsn
#define PG_GETARG_OID(n)
Definition: fmgr.h:240
#define ereport(elevel, rest)
Definition: elog.h:122
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:316
uintptr_t Datum
Definition: postgres.h:372
#define PG_STAT_GET_SUBSCRIPTION_COLS
int work_mem
Definition: globals.c:112
#define InvalidOid
Definition: postgres_ext.h:36
int allowedModes
Definition: execnodes.h:267
SetFunctionReturnMode returnMode
Definition: execnodes.h:269
TimestampTz last_recv_time
#define PG_ARGISNULL(n)
Definition: fmgr.h:174
#define NULL
Definition: c.h:229
int max_logical_replication_workers
Definition: launcher.c:59
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:201
Tuplestorestate * setResult
Definition: execnodes.h:272
static Datum values[MAXATTR]
Definition: bootstrap.c:162
ExprContext * econtext
Definition: execnodes.h:265
#define Int32GetDatum(X)
Definition: postgres.h:485
TupleDesc setDesc
Definition: execnodes.h:273
int errmsg(const char *fmt,...)
Definition: elog.c:797
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:73
int i
bool IsBackendPid(int pid)
Definition: procarray.c:2425
#define elog
Definition: elog.h:219
int pid
Definition: proc.h:108
TimestampTz reply_time
static bool WaitForReplicationWorkerAttach ( LogicalRepWorker *worker,
BackgroundWorkerHandle *handle
)
static

Definition at line 160 of file launcher.c.

References BGWH_STARTED, BGWH_STOPPED, CHECK_FOR_INTERRUPTS, GetBackgroundWorkerPid(), MyLatch, LogicalRepWorker::proc, proc_exit(), ResetLatch(), status(), WAIT_EVENT_BGWORKER_STARTUP, WaitLatch(), WL_LATCH_SET, WL_POSTMASTER_DEATH, and WL_TIMEOUT.

Referenced by logicalrep_worker_launch().

162 {
164  int rc;
165 
166  for (;;)
167  {
168  pid_t pid;
169 
171 
172  status = GetBackgroundWorkerPid(handle, &pid);
173 
174  /*
175  * Worker started and attached to our shmem. This check is safe
176  * because only launcher ever starts the workers, so nobody can steal
177  * the worker slot.
178  */
179  if (status == BGWH_STARTED && worker->proc)
180  return true;
181  /* Worker didn't start or died before attaching to our shmem. */
182  if (status == BGWH_STOPPED)
183  return false;
184 
185  /*
186  * We need timeout because we generally don't get notified via latch
187  * about the worker attach.
188  */
189  rc = WaitLatch(MyLatch,
192 
193  if (rc & WL_POSTMASTER_DEATH)
194  proc_exit(1);
195 
197  }
198 
199  return false;
200 }
#define WL_TIMEOUT
Definition: latch.h:127
void proc_exit(int code)
Definition: ipc.c:99
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:300
BgwHandleStatus
Definition: bgworker.h:102
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:224
#define WL_LATCH_SET
Definition: latch.h:124
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1004

Variable Documentation

bool got_SIGTERM = false

Definition at line 78 of file launcher.c.

Referenced by ApplyLauncherMain(), and logicalrep_worker_sigterm().

LogicalRepCtxStruct* LogicalRepCtx

Definition at line 73 of file launcher.c.

int max_sync_workers_per_subscription = 2

Definition at line 60 of file launcher.c.

Referenced by process_syncing_tables_for_apply().

bool on_commit_launcher_wakeup = false
static

Definition at line 79 of file launcher.c.

Referenced by ApplyLauncherWakeupAtCommit(), and AtCommit_ApplyLauncher().