PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
launcher.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * launcher.c
3  * PostgreSQL logical replication worker launcher process
4  *
5  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/launcher.c
9  *
10  * NOTES
11  * This module contains the logical replication worker launcher which
12  * uses the background worker infrastructure to start the logical
13  * replication workers for every enabled subscription.
14  *
15  *-------------------------------------------------------------------------
16  */
17 
18 #include "postgres.h"
19 
20 #include "funcapi.h"
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 
24 #include "access/heapam.h"
25 #include "access/htup.h"
26 #include "access/htup_details.h"
27 #include "access/xact.h"
28 
30 
31 #include "libpq/pqsignal.h"
32 
33 #include "postmaster/bgworker.h"
35 #include "postmaster/postmaster.h"
36 
39 #include "replication/slot.h"
41 
42 #include "storage/ipc.h"
43 #include "storage/proc.h"
44 #include "storage/procarray.h"
45 #include "storage/procsignal.h"
46 
47 #include "tcop/tcopprot.h"
48 
49 #include "utils/memutils.h"
50 #include "utils/pg_lsn.h"
51 #include "utils/ps_status.h"
52 #include "utils/timeout.h"
53 #include "utils/snapmgr.h"
54 
55 /* max sleep time between cycles (3min) */
56 #define DEFAULT_NAPTIME_PER_CYCLE 180000L
57 
60 
61 typedef struct LogicalRepCtxStruct
62 {
63  /* Supervisor process. */
64  pid_t launcher_pid;
65 
66  /* Background workers. */
67  LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
69 
71 
72 static void logicalrep_worker_onexit(int code, Datum arg);
73 static void logicalrep_worker_detach(void);
74 
75 bool got_SIGTERM = false;
76 static bool on_commit_launcher_wakeup = false;
77 
79 
80 
81 /*
82  * Load the list of subscriptions.
83  *
84  * Only the fields interesting for worker start/stop functions are filled for
85  * each subscription.
86  */
87 static List *
89 {
90  List *res = NIL;
91  Relation rel;
92  HeapScanDesc scan;
93  HeapTuple tup;
94  MemoryContext resultcxt;
95 
96  /* This is the context that we will allocate our output data in */
97  resultcxt = CurrentMemoryContext;
98 
99  /*
100  * Start a transaction so we can access pg_subscription, and get a snapshot.
101  * We don't have a use for the snapshot itself, but we're interested in
102  * the secondary effect that it sets RecentGlobalXmin. (This is critical
103  * for anything that reads heap pages, because HOT may decide to prune
104  * them even if the process doesn't attempt to modify any tuples.)
105  */
107  (void) GetTransactionSnapshot();
108 
110  scan = heap_beginscan_catalog(rel, 0, NULL);
111 
113  {
115  Subscription *sub;
116  MemoryContext oldcxt;
117 
118  /*
119  * Allocate our results in the caller's context, not the
120  * transaction's. We do this inside the loop, and restore the original
121  * context at the end, so that leaky things like heap_getnext() are
122  * not called in a potentially long-lived context.
123  */
124  oldcxt = MemoryContextSwitchTo(resultcxt);
125 
126  sub = (Subscription *) palloc(sizeof(Subscription));
127  sub->oid = HeapTupleGetOid(tup);
128  sub->dbid = subform->subdbid;
129  sub->owner = subform->subowner;
130  sub->enabled = subform->subenabled;
131  sub->name = pstrdup(NameStr(subform->subname));
132 
133  /* We don't fill fields we are not interested in. */
134  sub->conninfo = NULL;
135  sub->slotname = NULL;
136  sub->publications = NIL;
137 
138  res = lappend(res, sub);
139  MemoryContextSwitchTo(oldcxt);
140  }
141 
142  heap_endscan(scan);
144 
146 
147  return res;
148 }
149 
150 /*
151  * Wait for a background worker to start up and attach to the shmem context.
152  *
153  * This is like WaitForBackgroundWorkerStartup(), except that we wait for the
154  * worker to attach, not just to start, and we simply exit if the postmaster died.
155  */
156 static bool
158  BackgroundWorkerHandle *handle)
159 {
161  int rc;
162 
163  for (;;)
164  {
165  pid_t pid;
166 
168 
169  status = GetBackgroundWorkerPid(handle, &pid);
170 
171  /*
172  * Worker started and attached to our shmem. This check is safe
173  * because only launcher ever starts the workers, so nobody can steal
174  * the worker slot.
175  */
176  if (status == BGWH_STARTED && worker->proc)
177  return true;
178  /* Worker didn't start or died before attaching to our shmem. */
179  if (status == BGWH_STOPPED)
180  return false;
181 
182  /*
183  * We need timeout because we generally don't get notified via latch
184  * about the worker attach.
185  */
186  rc = WaitLatch(MyLatch,
189 
190  if (rc & WL_POSTMASTER_DEATH)
191  proc_exit(1);
192 
194  }
195 
196  return false;
197 }
198 
199 /*
200  * Walks the workers array and searches for one that matches given
201  * subscription id.
202  */
205 {
206  int i;
207  LogicalRepWorker *res = NULL;
208 
209  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
210  /* Search for attached worker for a given subscription id. */
211  for (i = 0; i < max_logical_replication_workers; i++)
212  {
213  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
214  if (w->subid == subid && w->proc && IsBackendPid(w->proc->pid))
215  {
216  res = w;
217  break;
218  }
219  }
220 
221  return res;
222 }
223 
224 /*
225  * Start new apply background worker.
226  */
227 void
228 logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
229 {
230  BackgroundWorker bgw;
231  BackgroundWorkerHandle *bgw_handle;
232  int slot;
233  LogicalRepWorker *worker = NULL;
234 
235  ereport(LOG,
236  (errmsg("starting logical replication worker for subscription \"%s\"",
237  subname)));
238 
239  /* Report this after the initial starting message for consistency. */
240  if (max_replication_slots == 0)
241  ereport(ERROR,
242  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
243  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
244 
245  /*
246  * We need to do the modification of the shared memory under lock so that
247  * we have consistent view.
248  */
249  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
250 
251  /* Find unused worker slot. */
252  for (slot = 0; slot < max_logical_replication_workers; slot++)
253  {
254  if (!LogicalRepCtx->workers[slot].proc)
255  {
256  worker = &LogicalRepCtx->workers[slot];
257  break;
258  }
259  }
260 
261  /* Bail if not found */
262  if (worker == NULL)
263  {
264  LWLockRelease(LogicalRepWorkerLock);
266  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
267  errmsg("out of logical replication workers slots"),
268  errhint("You might need to increase max_logical_replication_workers.")));
269  return;
270  }
271 
272  /* Prepare the worker info. */
273  memset(worker, 0, sizeof(LogicalRepWorker));
274  worker->dbid = dbid;
275  worker->userid = userid;
276  worker->subid = subid;
277 
278  LWLockRelease(LogicalRepWorkerLock);
279 
280  /* Register the new dynamic worker. */
286  "logical replication worker for subscription %u", subid);
287 
290  bgw.bgw_main_arg = slot;
291 
292  if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
293  {
295  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
296  errmsg("out of background workers slots"),
297  errhint("You might need to increase max_worker_processes.")));
298  return;
299  }
300 
301  /* Now wait until it attaches. */
302  WaitForReplicationWorkerAttach(worker, bgw_handle);
303 }
304 
305 /*
306  * Stop the logical replication worker and wait until it detaches from the
307  * slot.
308  *
309  * The caller must hold LogicalRepLauncherLock to ensure that new workers are
310  * not being started during this function call.
311  */
312 void
314 {
315  LogicalRepWorker *worker;
316 
317  Assert(LWLockHeldByMe(LogicalRepLauncherLock));
318 
319  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
320 
321  worker = logicalrep_worker_find(subid);
322 
323  /* No worker, nothing to do. */
324  if (!worker)
325  {
326  LWLockRelease(LogicalRepWorkerLock);
327  return;
328  }
329 
330  /*
331  * If we found worker but it does not have proc set it is starting up,
332  * wait for it to finish and then kill it.
333  */
334  while (worker && !worker->proc)
335  {
336  int rc;
337 
338  LWLockRelease(LogicalRepWorkerLock);
339 
341 
342  /* Wait for signal. */
343  rc = WaitLatch(&MyProc->procLatch,
346 
347  /* emergency bailout if postmaster has died */
348  if (rc & WL_POSTMASTER_DEATH)
349  proc_exit(1);
350 
352 
353  /* Check worker status. */
354  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
355 
356  /*
357  * Worker is no longer associated with subscription. It must have
358  * exited, nothing more for us to do.
359  */
360  if (worker->subid == InvalidOid)
361  {
362  LWLockRelease(LogicalRepWorkerLock);
363  return;
364  }
365 
366  /* Worker has assigned proc, so it has started. */
367  if (worker->proc)
368  break;
369  }
370 
371  /* Now terminate the worker ... */
372  kill(worker->proc->pid, SIGTERM);
373  LWLockRelease(LogicalRepWorkerLock);
374 
375  /* ... and wait for it to die. */
376  for (;;)
377  {
378  int rc;
379 
380  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
381  if (!worker->proc)
382  {
383  LWLockRelease(LogicalRepWorkerLock);
384  break;
385  }
386  LWLockRelease(LogicalRepWorkerLock);
387 
389 
390  /* Wait for the worker to exit. */
391  rc = WaitLatch(&MyProc->procLatch,
394 
395  /* emergency bailout if postmaster has died */
396  if (rc & WL_POSTMASTER_DEATH)
397  proc_exit(1);
398 
400  }
401 }
402 
403 /*
404  * Attach to a slot.
405  */
406 void
408 {
409  /* Block concurrent access. */
410  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
411 
412  Assert(slot >= 0 && slot < max_logical_replication_workers);
413  MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
414 
415  if (MyLogicalRepWorker->proc)
416  ereport(ERROR,
417  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
418  errmsg("logical replication worker slot %d already used by "
419  "another worker", slot)));
420 
421  MyLogicalRepWorker->proc = MyProc;
423 
424  LWLockRelease(LogicalRepWorkerLock);
425 }
426 
427 /*
428  * Detach the worker (cleans up the worker info).
429  */
430 static void
432 {
433  /* Block concurrent access. */
434  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
435 
436  MyLogicalRepWorker->dbid = InvalidOid;
437  MyLogicalRepWorker->userid = InvalidOid;
438  MyLogicalRepWorker->subid = InvalidOid;
439  MyLogicalRepWorker->proc = NULL;
440 
441  LWLockRelease(LogicalRepWorkerLock);
442 }
443 
444 /*
445  * Cleanup function.
446  *
447  * Called on logical replication worker exit.
448  */
449 static void
451 {
453 }
454 
455 /* SIGTERM: set flag to exit at next convenient time */
456 void
458 {
459  got_SIGTERM = true;
460 
461  /* Waken anything waiting on the process latch */
462  SetLatch(MyLatch);
463 }
464 
465 /*
466  * ApplyLauncherShmemSize
467  * Compute space needed for replication launcher shared memory
468  */
469 Size
471 {
472  Size size;
473 
474  /*
475  * Need the fixed struct and the array of LogicalRepWorker.
476  */
477  size = sizeof(LogicalRepCtxStruct);
478  size = MAXALIGN(size);
480  sizeof(LogicalRepWorker)));
481  return size;
482 }
483 
484 void
486 {
487  BackgroundWorker bgw;
488 
490  return;
491 
497  "logical replication launcher");
498  bgw.bgw_restart_time = 5;
499  bgw.bgw_notify_pid = 0;
500  bgw.bgw_main_arg = (Datum) 0;
501 
503 }
504 
505 /*
506  * ApplyLauncherShmemInit
507  * Allocate and initialize replication launcher shared memory
508  */
509 void
511 {
512  bool found;
513 
514  LogicalRepCtx = (LogicalRepCtxStruct *)
515  ShmemInitStruct("Logical Replication Launcher Data",
517  &found);
518 
519  if (!found)
520  memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
521 }
522 
523 /*
524  * Wakeup the launcher on commit if requested.
525  */
526 void
528 {
531 }
532 
533 /*
534  * Request wakeup of the launcher on commit of the transaction.
535  *
536  * This is used to signal the launcher to stop sleeping and process the
537  * subscriptions when the current transaction commits. It should be used when a
538  * new tuple has been added to the pg_subscription catalog.
539 */
540 void
542 {
545 }
546 
547 void
549 {
550  if (IsBackendPid(LogicalRepCtx->launcher_pid))
551  kill(LogicalRepCtx->launcher_pid, SIGUSR1);
552 }
553 
554 /*
555  * Main loop for the apply launcher process: until SIGTERM is received, scan the subscription list and launch a worker for an enabled subscription that has none (at most one per cycle), then sleep on the process latch.
556  */
557 void
559 {
560  ereport(LOG,
561  (errmsg("logical replication launcher started")));
562 
563  /* Establish signal handlers. */
566 
567  /* Make it easy to identify our processes. */
568  SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
570 
571  LogicalRepCtx->launcher_pid = MyProcPid;
572 
573  /*
574  * Establish connection to nailed catalogs (we only ever access
575  * pg_subscription).
576  */
578 
579  /* Enter main loop */
580  while (!got_SIGTERM)
581  {
582  int rc;
583  List *sublist;
584  ListCell *lc;
585  MemoryContext subctx;
586  MemoryContext oldctx;
588  static TimestampTz last_start_time = 0; /* static: must survive across cycles, otherwise the retry limit below never triggers */
589  long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
590 
591  now = GetCurrentTimestamp();
592 
593  /* Limit the start retry to once a wal_retrieve_retry_interval */
594  if (TimestampDifferenceExceeds(last_start_time, now,
596  {
597  /* Use temporary context for the subscription list and worker info. */
599  "Logical Replication Launcher sublist",
603  oldctx = MemoryContextSwitchTo(subctx);
604 
605  /* Block any concurrent DROP SUBSCRIPTION. */
606  LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
607 
608  /* search for subscriptions to start or stop. */
609  sublist = get_subscription_list();
610 
611  /* Start the missing workers for enabled subscriptions. */
612  foreach(lc, sublist)
613  {
614  Subscription *sub = (Subscription *) lfirst(lc);
615  LogicalRepWorker *w;
616 
617  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
618  w = logicalrep_worker_find(sub->oid);
619  LWLockRelease(LogicalRepWorkerLock);
620 
621  if (sub->enabled && w == NULL)
622  {
623  logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner);
624  last_start_time = now;
625  wait_time = wal_retrieve_retry_interval;
626  /* Limit to one worker per mainloop cycle. */
627  break;
628  }
629  }
630 
631  LWLockRelease(LogicalRepLauncherLock);
632 
633  /* Switch back to original memory context. */
634  MemoryContextSwitchTo(oldctx);
635  /* Clean the temporary memory. */
636  MemoryContextDelete(subctx);
637  }
638  else
639  {
640  /*
641  * The wait in previous cycle was interrupted in less than
642  * wal_retrieve_retry_interval since last worker was started,
643  * this usually means crash of the worker, so we should retry
644  * in wal_retrieve_retry_interval again.
645  */
646  wait_time = wal_retrieve_retry_interval;
647  }
648 
649  /* Wait for more work. */
650  rc = WaitLatch(&MyProc->procLatch,
652  wait_time,
654 
655  /* emergency bailout if postmaster has died */
656  if (rc & WL_POSTMASTER_DEATH)
657  proc_exit(1);
658 
660  }
661 
662  LogicalRepCtx->launcher_pid = 0;
663 
664  /* The main loop exited because SIGTERM was received; shut down. */
665  ereport(LOG,
666  (errmsg("logical replication launcher shutting down")));
667 
668  proc_exit(0);
669 }
670 
671 /*
672  * Returns state of the subscriptions: one tuplestore row per attached worker, or only the first worker matching the subscription OID passed as argument 0 (all workers when that argument is NULL).
673  */
674 Datum
676 {
677 #define PG_STAT_GET_SUBSCRIPTION_COLS 7
678  Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
679  int i;
680  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
681  TupleDesc tupdesc;
682  Tuplestorestate *tupstore;
683  MemoryContext per_query_ctx;
684  MemoryContext oldcontext;
685 
686  /* check to see if caller supports us returning a tuplestore */
687  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
688  ereport(ERROR,
689  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
690  errmsg("set-valued function called in context that cannot accept a set")));
691  if (!(rsinfo->allowedModes & SFRM_Materialize))
692  ereport(ERROR,
693  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
694  errmsg("materialize mode required, but it is not " \
695  "allowed in this context")));
696 
697  /* Build a tuple descriptor for our result type */
698  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
699  elog(ERROR, "return type must be a row type");
700 
701  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
702  oldcontext = MemoryContextSwitchTo(per_query_ctx);
703 
704  tupstore = tuplestore_begin_heap(true, false, work_mem);
705  rsinfo->returnMode = SFRM_Materialize;
706  rsinfo->setResult = tupstore;
707  rsinfo->setDesc = tupdesc;
708 
709  MemoryContextSwitchTo(oldcontext);
710 
711  /* Make sure we get consistent view of the workers. */
712  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
713 
714  for (i = 0; i < max_logical_replication_workers; i++)
715  {
716  /* for each row; i must stay below max_logical_replication_workers, the bound used for workers[] everywhere else in this file */
718  bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
719  int worker_pid;
720  LogicalRepWorker worker;
721 
722  memcpy(&worker, &LogicalRepCtx->workers[i],
723  sizeof(LogicalRepWorker));
724  if (!worker.proc || !IsBackendPid(worker.proc->pid))
725  continue;
726 
727  if (OidIsValid(subid) && worker.subid != subid)
728  continue;
729 
730  worker_pid = worker.proc->pid;
731 
732  MemSet(values, 0, sizeof(values));
733  MemSet(nulls, 0, sizeof(nulls));
734 
735  values[0] = ObjectIdGetDatum(worker.subid);
736  values[1] = Int32GetDatum(worker_pid);
737  if (XLogRecPtrIsInvalid(worker.last_lsn))
738  nulls[2] = true;
739  else
740  values[2] = LSNGetDatum(worker.last_lsn);
741  if (worker.last_send_time == 0)
742  nulls[3] = true;
743  else
744  values[3] = TimestampTzGetDatum(worker.last_send_time);
745  if (worker.last_recv_time == 0)
746  nulls[4] = true;
747  else
748  values[4] = TimestampTzGetDatum(worker.last_recv_time);
749  if (XLogRecPtrIsInvalid(worker.reply_lsn))
750  nulls[5] = true;
751  else
752  values[5] = LSNGetDatum(worker.reply_lsn);
753  if (worker.reply_time == 0)
754  nulls[6] = true;
755  else
756  values[6] = TimestampTzGetDatum(worker.reply_time);
757 
758  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
759 
760  /* If only a single subscription was requested, and we found it, break. */
761  if (OidIsValid(subid))
762  break;
763  }
764 
765  LWLockRelease(LogicalRepWorkerLock);
766 
767  /* clean up and return the tuplestore */
768  tuplestore_donestoring(tupstore);
769 
770  return (Datum) 0;
771 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:735
#define NIL
Definition: pg_list.h:69
void ApplyLauncherWakeup(void)
Definition: launcher.c:548
#define SIGUSR1
Definition: win32.h:211
#define IsA(nodeptr, _type_)
Definition: nodes.h:559
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:770
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:38
int errhint(const char *fmt,...)
Definition: elog.c:987
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
void heap_endscan(HeapScanDesc scan)
Definition: heapam.c:1581
#define WL_TIMEOUT
Definition: latch.h:127
void ApplyLauncherMain(Datum main_arg)
Definition: launcher.c:558
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1830
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1165
void CommitTransactionCommand(void)
Definition: xact.c:2745
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:67
TimestampTz last_send_time
XLogRecPtr last_lsn
int bgw_restart_time
Definition: bgworker.h:93
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:853
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:189
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define heap_close(r, l)
Definition: heapam.h:97
FormData_pg_subscription * Form_pg_subscription
void ResetLatch(volatile Latch *latch)
Definition: latch.c:462
#define LOG
Definition: elog.h:26
unsigned int Oid
Definition: postgres_ext.h:31
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1648
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
#define OidIsValid(objectId)
Definition: c.h:534
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:142
Latch procLatch
Definition: proc.h:93
static void logicalrep_worker_detach(void)
Definition: launcher.c:431
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1714
Datum bgw_main_arg
Definition: bgworker.h:97
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:301
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:59
#define ObjectIdGetDatum(X)
Definition: postgres.h:515
#define ERROR
Definition: elog.h:43
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
bgworker_main_type bgw_main
Definition: bgworker.h:94
XLogRecPtr reply_lsn
#define SubscriptionRelationId
void logicalrep_worker_attach(int slot)
Definition: launcher.c:407
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6629
#define PG_GETARG_OID(n)
Definition: fmgr.h:231
Size ApplyLauncherShmemSize(void)
Definition: launcher.c:470
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:320
LogicalRepWorker * logicalrep_worker_find(Oid subid)
Definition: launcher.c:204
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
HeapScanDesc heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
Definition: heapam.c:1402
BgwHandleStatus
Definition: bgworker.h:102
static bool on_commit_launcher_wakeup
Definition: launcher.c:76
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext TopMemoryContext
Definition: mcxt.c:43
List * lappend(List *list, void *datum)
Definition: list.c:128
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define WARNING
Definition: elog.h:40
void AtCommit_ApplyLauncher(void)
Definition: launcher.c:527
int wal_retrieve_retry_interval
Definition: xlog.c:106
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:316
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:450
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
#define DEFAULT_NAPTIME_PER_CYCLE
Definition: launcher.c:56
uintptr_t Datum
Definition: postgres.h:374
void ApplyLauncherRegister(void)
Definition: launcher.c:485
#define PG_STAT_GET_SUBSCRIPTION_COLS
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction)
Definition: heapam.c:1781
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1287
void logicalrep_worker_sigterm(SIGNAL_ARGS)
Definition: launcher.c:457
int work_mem
Definition: globals.c:112
#define InvalidOid
Definition: postgres_ext.h:36
int allowedModes
Definition: execnodes.h:199
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
static List * get_subscription_list(void)
Definition: launcher.c:88
SetFunctionReturnMode returnMode
Definition: execnodes.h:201
int max_replication_slots
Definition: slot.c:98
void SetLatch(volatile Latch *latch)
Definition: latch.c:380
TimestampTz last_recv_time
#define PG_ARGISNULL(n)
Definition: fmgr.h:166
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define SIGNAL_ARGS
Definition: c.h:1079
#define NULL
Definition: c.h:226
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:671
#define lfirst(lc)
Definition: pg_list.h:106
bool got_SIGTERM
Definition: launcher.c:75
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
void StartTransactionCommand(void)
Definition: xact.c:2675
int max_logical_replication_workers
Definition: launcher.c:58
#define BGW_MAXLEN
Definition: bgworker.h:85
size_t Size
Definition: c.h:353
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:92
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:864
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1110
#define MAXALIGN(LEN)
Definition: c.h:584
void ApplyLauncherShmemInit(void)
Definition: launcher.c:510
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:133
struct LogicalRepCtxStruct LogicalRepCtxStruct
Tuplestorestate * setResult
Definition: execnodes.h:204
static Datum values[MAXATTR]
Definition: bootstrap.c:162
ExprContext * econtext
Definition: execnodes.h:197
#define Int32GetDatum(X)
Definition: postgres.h:487
TupleDesc setDesc
Definition: execnodes.h:205
void * palloc(Size size)
Definition: mcxt.c:891
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t bgw_notify_pid
Definition: bgworker.h:99
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:70
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:143
int i
#define NameStr(name)
Definition: c.h:495
bool IsBackendPid(int pid)
Definition: procarray.c:2424
void logicalrep_worker_stop(Oid subid)
Definition: launcher.c:313
void * arg
struct Latch * MyLatch
Definition: globals.c:51
#define PG_FUNCTION_ARGS
Definition: fmgr.h:150
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:144
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS)
Definition: launcher.c:675
#define elog
Definition: elog.h:219
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:222
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:541
Definition: pg_list.h:45
int pid
Definition: proc.h:98
#define WL_LATCH_SET
Definition: latch.h:124
void BackgroundWorkerInitializeConnection(char *dbname, char *username)
Definition: postmaster.c:5445
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:969
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:1308
static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, BackgroundWorkerHandle *handle)
Definition: launcher.c:157
TimestampTz reply_time
void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
Definition: launcher.c:228
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5497