PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
launcher.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * launcher.c
3  * PostgreSQL logical replication worker launcher process
4  *
5  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/launcher.c
9  *
10  * NOTES
11  * This module contains the logical replication worker launcher which
12  * uses the background worker infrastructure to start the logical
13  * replication workers for every enabled subscription.
14  *
15  *-------------------------------------------------------------------------
16  */
17 
18 #include "postgres.h"
19 
20 #include "funcapi.h"
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 
24 #include "access/heapam.h"
25 #include "access/htup.h"
26 #include "access/htup_details.h"
27 #include "access/xact.h"
28 
30 
31 #include "libpq/pqsignal.h"
32 
33 #include "postmaster/bgworker.h"
35 #include "postmaster/postmaster.h"
36 
39 #include "replication/slot.h"
41 
42 #include "storage/ipc.h"
43 #include "storage/proc.h"
44 #include "storage/procarray.h"
45 #include "storage/procsignal.h"
46 
47 #include "tcop/tcopprot.h"
48 
49 #include "utils/memutils.h"
50 #include "utils/pg_lsn.h"
51 #include "utils/ps_status.h"
52 #include "utils/timeout.h"
53 #include "utils/snapmgr.h"
54 
55 /* max sleep time between cycles (3min) */
56 #define DEFAULT_NAPTIME_PER_CYCLE 180000L
57 
60 
61 typedef struct LogicalRepCtxStruct
62 {
63  /* Supervisor process. */
64  pid_t launcher_pid;
65 
66  /* Background workers. */
67  LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
69 
71 
72 static void logicalrep_worker_onexit(int code, Datum arg);
73 static void logicalrep_worker_detach(void);
74 
75 bool got_SIGTERM = false;
76 static bool on_commit_launcher_wakeup = false;
77 
79 
80 
81 /*
82  * Load the list of subscriptions.
83  *
84  * Only the fields interesting for worker start/stop functions are filled for
85  * each subscription.
86  */
87 static List *
89 {
90  List *res = NIL;
91  Relation rel;
92  HeapScanDesc scan;
93  HeapTuple tup;
94  MemoryContext resultcxt;
95 
96  /* This is the context that we will allocate our output data in */
97  resultcxt = CurrentMemoryContext;
98 
99  /*
100  * Start a transaction so we can access pg_subscription, and get a snapshot.
101  * We don't have a use for the snapshot itself, but we're interested in
102  * the secondary effect that it sets RecentGlobalXmin. (This is critical
103  * for anything that reads heap pages, because HOT may decide to prune
104  * them even if the process doesn't attempt to modify any tuples.)
105  */
107  (void) GetTransactionSnapshot();
108 
110  scan = heap_beginscan_catalog(rel, 0, NULL);
111 
113  {
115  Subscription *sub;
116  MemoryContext oldcxt;
117 
118  /*
119  * Allocate our results in the caller's context, not the
120  * transaction's. We do this inside the loop, and restore the original
121  * context at the end, so that leaky things like heap_getnext() are
122  * not called in a potentially long-lived context.
123  */
124  oldcxt = MemoryContextSwitchTo(resultcxt);
125 
126  sub = (Subscription *) palloc(sizeof(Subscription));
127  sub->oid = HeapTupleGetOid(tup);
128  sub->dbid = subform->subdbid;
129  sub->owner = subform->subowner;
130  sub->enabled = subform->subenabled;
131  sub->name = pstrdup(NameStr(subform->subname));
132 
133  /* We don't fill fields we are not interested in. */
134  sub->conninfo = NULL;
135  sub->slotname = NULL;
136  sub->publications = NIL;
137 
138  res = lappend(res, sub);
139  MemoryContextSwitchTo(oldcxt);
140  }
141 
142  heap_endscan(scan);
144 
146 
147  return res;
148 }
149 
150 /*
151  * Wait for a background worker to start up and attach to the shmem context.
152  *
153  * This is like WaitForBackgroundWorkerStartup(), except that we wait for
154  * attaching, not just start and we also just exit if postmaster died.
155  */
156 static bool
158  BackgroundWorkerHandle *handle)
159 {
161  int rc;
162 
163  for (;;)
164  {
165  pid_t pid;
166 
168 
169  status = GetBackgroundWorkerPid(handle, &pid);
170 
171  /*
172  * Worker started and attached to our shmem. This check is safe
173  * because only launcher ever starts the workers, so nobody can steal
174  * the worker slot.
175  */
176  if (status == BGWH_STARTED && worker->proc)
177  return true;
178  /* Worker didn't start or died before attaching to our shmem. */
179  if (status == BGWH_STOPPED)
180  return false;
181 
182  /*
183  * We need timeout because we generally don't get notified via latch
184  * about the worker attach.
185  */
186  rc = WaitLatch(MyLatch,
189 
190  if (rc & WL_POSTMASTER_DEATH)
191  proc_exit(1);
192 
194  }
195 
196  return false;
197 }
198 
199 /*
200  * Walks the workers array and searches for one that matches given
201  * subscription id.
202  */
205 {
206  int i;
207  LogicalRepWorker *res = NULL;
208 
209  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
210  /* Search for attached worker for a given subscription id. */
211  for (i = 0; i < max_logical_replication_workers; i++)
212  {
213  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
214  if (w->subid == subid && w->proc && IsBackendPid(w->proc->pid))
215  {
216  res = w;
217  break;
218  }
219  }
220 
221  return res;
222 }
223 
224 /*
225  * Start new apply background worker.
226  */
227 void
228 logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
229 {
230  BackgroundWorker bgw;
231  BackgroundWorkerHandle *bgw_handle;
232  int slot;
233  LogicalRepWorker *worker = NULL;
234 
235  ereport(LOG,
236  (errmsg("starting logical replication worker for subscription \"%s\"",
237  subname)));
238 
239  /* Report this after the initial starting message for consistency. */
240  if (max_replication_slots == 0)
241  ereport(ERROR,
242  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
243  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
244 
245  /*
246  * We need to do the modification of the shared memory under lock so that
247  * we have consistent view.
248  */
249  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
250 
251  /* Find unused worker slot. */
252  for (slot = 0; slot < max_logical_replication_workers; slot++)
253  {
254  if (!LogicalRepCtx->workers[slot].proc)
255  {
256  worker = &LogicalRepCtx->workers[slot];
257  break;
258  }
259  }
260 
261  /* Bail if not found */
262  if (worker == NULL)
263  {
264  LWLockRelease(LogicalRepWorkerLock);
266  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
267  errmsg("out of logical replication workers slots"),
268  errhint("You might need to increase max_logical_replication_workers.")));
269  return;
270  }
271 
272  /* Prepare the worker info. */
273  memset(worker, 0, sizeof(LogicalRepWorker));
274  worker->dbid = dbid;
275  worker->userid = userid;
276  worker->subid = subid;
277 
278  LWLockRelease(LogicalRepWorkerLock);
279 
280  /* Register the new dynamic worker. */
286  "logical replication worker for subscription %u", subid);
287 
290  bgw.bgw_main_arg = slot;
291 
292  if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
293  {
295  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
296  errmsg("out of background workers slots"),
297  errhint("You might need to increase max_worker_processes.")));
298  return;
299  }
300 
301  /* Now wait until it attaches. */
302  WaitForReplicationWorkerAttach(worker, bgw_handle);
303 }
304 
305 /*
306  * Stop the logical replication worker and wait until it detaches from the
307  * slot.
308  */
309 void
311 {
312  LogicalRepWorker *worker;
313 
314  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
315 
316  worker = logicalrep_worker_find(subid);
317 
318  /* No worker, nothing to do. */
319  if (!worker)
320  {
321  LWLockRelease(LogicalRepWorkerLock);
322  return;
323  }
324 
325  /*
326  * If we found worker but it does not have proc set it is starting up,
327  * wait for it to finish and then kill it.
328  */
329  while (worker && !worker->proc)
330  {
331  int rc;
332 
333  LWLockRelease(LogicalRepWorkerLock);
334 
336 
337  /* Wait for signal. */
338  rc = WaitLatch(&MyProc->procLatch,
341 
342  /* emergency bailout if postmaster has died */
343  if (rc & WL_POSTMASTER_DEATH)
344  proc_exit(1);
345 
347 
348  /* Check worker status. */
349  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
350 
351  /*
352  * Worker is no longer associated with subscription. It must have
353  * exited, nothing more for us to do.
354  */
355  if (worker->subid == InvalidOid)
356  {
357  LWLockRelease(LogicalRepWorkerLock);
358  return;
359  }
360 
361  /* Worker has assigned proc, so it has started. */
362  if (worker->proc)
363  break;
364  }
365 
366  /* Now terminate the worker ... */
367  kill(worker->proc->pid, SIGTERM);
368  LWLockRelease(LogicalRepWorkerLock);
369 
370  /* ... and wait for it to die. */
371  for (;;)
372  {
373  int rc;
374 
375  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
376  if (!worker->proc)
377  {
378  LWLockRelease(LogicalRepWorkerLock);
379  break;
380  }
381  LWLockRelease(LogicalRepWorkerLock);
382 
384 
385  /* Wait for more work. */
386  rc = WaitLatch(&MyProc->procLatch,
389 
390  /* emergency bailout if postmaster has died */
391  if (rc & WL_POSTMASTER_DEATH)
392  proc_exit(1);
393 
395  }
396 }
397 
398 /*
399  * Attach to a slot.
400  */
401 void
403 {
404  /* Block concurrent access. */
405  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
406 
407  Assert(slot >= 0 && slot < max_logical_replication_workers);
408  MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
409 
410  if (MyLogicalRepWorker->proc)
411  ereport(ERROR,
412  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
413  errmsg("logical replication worker slot %d already used by "
414  "another worker", slot)));
415 
416  MyLogicalRepWorker->proc = MyProc;
418 
419  LWLockRelease(LogicalRepWorkerLock);
420 }
421 
422 /*
423  * Detach the worker (cleans up the worker info).
424  */
425 static void
427 {
428  /* Block concurrent access. */
429  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
430 
431  MyLogicalRepWorker->dbid = InvalidOid;
432  MyLogicalRepWorker->userid = InvalidOid;
433  MyLogicalRepWorker->subid = InvalidOid;
434  MyLogicalRepWorker->proc = NULL;
435 
436  LWLockRelease(LogicalRepWorkerLock);
437 }
438 
439 /*
440  * Cleanup function.
441  *
442  * Called on logical replication worker exit.
443  */
444 static void
446 {
448 }
449 
450 /* SIGTERM: set flag to exit at next convenient time */
451 void
453 {
454  got_SIGTERM = true;
455 
456  /* Waken anything waiting on the process latch */
457  SetLatch(MyLatch);
458 }
459 
460 /*
461  * ApplyLauncherShmemSize
462  * Compute space needed for replication launcher shared memory
463  */
464 Size
466 {
467  Size size;
468 
469  /*
470  * Need the fixed struct and the array of LogicalRepWorker.
471  */
472  size = sizeof(LogicalRepCtxStruct);
473  size = MAXALIGN(size);
475  sizeof(LogicalRepWorker)));
476  return size;
477 }
478 
479 void
481 {
482  BackgroundWorker bgw;
483 
485  return;
486 
492  "logical replication launcher");
493  bgw.bgw_restart_time = 5;
494  bgw.bgw_notify_pid = 0;
495  bgw.bgw_main_arg = (Datum) 0;
496 
498 }
499 
500 /*
501  * ApplyLauncherShmemInit
502  * Allocate and initialize replication launcher shared memory
503  */
504 void
506 {
507  bool found;
508 
509  LogicalRepCtx = (LogicalRepCtxStruct *)
510  ShmemInitStruct("Logical Replication Launcher Data",
512  &found);
513 
514  if (!found)
515  memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
516 }
517 
518 /*
519  * Wakeup the launcher on commit if requested.
520  */
521 void
523 {
526 }
527 
528 /*
529  * Request wakeup of the launcher on commit of the transaction.
530  *
531  * This is used to send launcher signal to stop sleeping and process the
532  * subscriptions when current transaction commits. Should be used when new
533  * tuple was added to the pg_subscription catalog.
534 */
535 void
537 {
540 }
541 
542 void
544 {
545  if (IsBackendPid(LogicalRepCtx->launcher_pid))
546  kill(LogicalRepCtx->launcher_pid, SIGUSR1);
547 }
548 
549 /*
550  * Main loop for the apply launcher process.
551  */
552 void
554 {
555  ereport(DEBUG1,
556  (errmsg("logical replication launcher started")));
557 
558  /* Establish signal handlers. */
561 
562  /* Make it easy to identify our processes. */
563  SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
565 
566  LogicalRepCtx->launcher_pid = MyProcPid;
567 
568  /*
569  * Establish connection to nailed catalogs (we only ever access
570  * pg_subscription).
571  */
573 
574  /* Enter main loop */
575  while (!got_SIGTERM)
576  {
577  int rc;
578  List *sublist;
579  ListCell *lc;
580  MemoryContext subctx;
581  MemoryContext oldctx;
583  TimestampTz last_start_time = 0;
584  long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
585 
586  now = GetCurrentTimestamp();
587 
588  /* Limit the start retry to once a wal_retrieve_retry_interval */
589  if (TimestampDifferenceExceeds(last_start_time, now,
591  {
592  /* Use temporary context for the database list and worker info. */
594  "Logical Replication Launcher sublist",
598  oldctx = MemoryContextSwitchTo(subctx);
599 
600  /* search for subscriptions to start or stop. */
601  sublist = get_subscription_list();
602 
603  /* Start the missing workers for enabled subscriptions. */
604  foreach(lc, sublist)
605  {
606  Subscription *sub = (Subscription *) lfirst(lc);
607  LogicalRepWorker *w;
608 
609  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
610  w = logicalrep_worker_find(sub->oid);
611  LWLockRelease(LogicalRepWorkerLock);
612 
613  if (sub->enabled && w == NULL)
614  {
615  logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner);
616  last_start_time = now;
617  wait_time = wal_retrieve_retry_interval;
618  /* Limit to one worker per mainloop cycle. */
619  break;
620  }
621  }
622 
623  /* Switch back to original memory context. */
624  MemoryContextSwitchTo(oldctx);
625  /* Clean the temporary memory. */
626  MemoryContextDelete(subctx);
627  }
628  else
629  {
630  /*
631  * The wait in previous cycle was interrupted in less than
632  * wal_retrieve_retry_interval since last worker was started,
633  * this usually means crash of the worker, so we should retry
634  * in wal_retrieve_retry_interval again.
635  */
636  wait_time = wal_retrieve_retry_interval;
637  }
638 
639  /* Wait for more work. */
640  rc = WaitLatch(&MyProc->procLatch,
642  wait_time,
644 
645  /* emergency bailout if postmaster has died */
646  if (rc & WL_POSTMASTER_DEATH)
647  proc_exit(1);
648 
650  }
651 
652  LogicalRepCtx->launcher_pid = 0;
653 
654  /* ... and if it returns, we're done */
655  ereport(DEBUG1,
656  (errmsg("logical replication launcher shutting down")));
657 
658  proc_exit(0);
659 }
660 
661 /*
662  * Returns state of the subscriptions.
663  */
664 Datum
666 {
667 #define PG_STAT_GET_SUBSCRIPTION_COLS 7
668  Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
669  int i;
670  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
671  TupleDesc tupdesc;
672  Tuplestorestate *tupstore;
673  MemoryContext per_query_ctx;
674  MemoryContext oldcontext;
675 
676  /* check to see if caller supports us returning a tuplestore */
677  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
678  ereport(ERROR,
679  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
680  errmsg("set-valued function called in context that cannot accept a set")));
681  if (!(rsinfo->allowedModes & SFRM_Materialize))
682  ereport(ERROR,
683  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
684  errmsg("materialize mode required, but it is not " \
685  "allowed in this context")));
686 
687  /* Build a tuple descriptor for our result type */
688  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
689  elog(ERROR, "return type must be a row type");
690 
691  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
692  oldcontext = MemoryContextSwitchTo(per_query_ctx);
693 
694  tupstore = tuplestore_begin_heap(true, false, work_mem);
695  rsinfo->returnMode = SFRM_Materialize;
696  rsinfo->setResult = tupstore;
697  rsinfo->setDesc = tupdesc;
698 
699  MemoryContextSwitchTo(oldcontext);
700 
701  /* Make sure we get consistent view of the workers. */
702  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
703 
704  for (i = 0; i <= max_logical_replication_workers; i++)
705  {
706  /* for each row */
708  bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
709  int worker_pid;
710  LogicalRepWorker worker;
711 
712  memcpy(&worker, &LogicalRepCtx->workers[i],
713  sizeof(LogicalRepWorker));
714  if (!worker.proc || !IsBackendPid(worker.proc->pid))
715  continue;
716 
717  if (OidIsValid(subid) && worker.subid != subid)
718  continue;
719 
720  worker_pid = worker.proc->pid;
721 
722  MemSet(values, 0, sizeof(values));
723  MemSet(nulls, 0, sizeof(nulls));
724 
725  values[0] = ObjectIdGetDatum(worker.subid);
726  values[1] = Int32GetDatum(worker_pid);
727  if (XLogRecPtrIsInvalid(worker.last_lsn))
728  nulls[2] = true;
729  else
730  values[2] = LSNGetDatum(worker.last_lsn);
731  if (worker.last_send_time == 0)
732  nulls[3] = true;
733  else
734  values[3] = TimestampTzGetDatum(worker.last_send_time);
735  if (worker.last_recv_time == 0)
736  nulls[4] = true;
737  else
738  values[4] = TimestampTzGetDatum(worker.last_recv_time);
739  if (XLogRecPtrIsInvalid(worker.reply_lsn))
740  nulls[5] = true;
741  else
742  values[5] = LSNGetDatum(worker.reply_lsn);
743  if (worker.reply_time == 0)
744  nulls[6] = true;
745  else
746  values[6] = TimestampTzGetDatum(worker.reply_time);
747 
748  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
749 
750  /* If only a single subscription was requested, and we found it, break. */
751  if (OidIsValid(subid))
752  break;
753  }
754 
755  LWLockRelease(LogicalRepWorkerLock);
756 
757  /* clean up and return the tuplestore */
758  tuplestore_donestoring(tupstore);
759 
760  return (Datum) 0;
761 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:735
#define NIL
Definition: pg_list.h:69
void ApplyLauncherWakeup(void)
Definition: launcher.c:543
#define SIGUSR1
Definition: win32.h:211
#define IsA(nodeptr, _type_)
Definition: nodes.h:569
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:805
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:38
int errhint(const char *fmt,...)
Definition: elog.c:987
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
void heap_endscan(HeapScanDesc scan)
Definition: heapam.c:1581
#define WL_TIMEOUT
Definition: latch.h:127
void ApplyLauncherMain(Datum main_arg)
Definition: launcher.c:553
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1831
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1077
void CommitTransactionCommand(void)
Definition: xact.c:2747
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:67
TimestampTz last_send_time
XLogRecPtr last_lsn
int bgw_restart_time
Definition: bgworker.h:93
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:857
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:189
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define heap_close(r, l)
Definition: heapam.h:97
FormData_pg_subscription * Form_pg_subscription
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
#define LOG
Definition: elog.h:26
unsigned int Oid
Definition: postgres_ext.h:31
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1648
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
#define OidIsValid(objectId)
Definition: c.h:538
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:162
Latch procLatch
Definition: proc.h:98
static void logicalrep_worker_detach(void)
Definition: launcher.c:426
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
Datum bgw_main_arg
Definition: bgworker.h:97
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:300
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:59
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
bgworker_main_type bgw_main
Definition: bgworker.h:94
XLogRecPtr reply_lsn
#define SubscriptionRelationId
void logicalrep_worker_attach(int slot)
Definition: launcher.c:402
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6639
#define PG_GETARG_OID(n)
Definition: fmgr.h:232
Size ApplyLauncherShmemSize(void)
Definition: launcher.c:465
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:320
LogicalRepWorker * logicalrep_worker_find(Oid subid)
Definition: launcher.c:204
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
HeapScanDesc heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
Definition: heapam.c:1402
BgwHandleStatus
Definition: bgworker.h:102
static bool on_commit_launcher_wakeup
Definition: launcher.c:76
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext TopMemoryContext
Definition: mcxt.c:43
List * lappend(List *list, void *datum)
Definition: list.c:128
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define WARNING
Definition: elog.h:40
void AtCommit_ApplyLauncher(void)
Definition: launcher.c:522
int wal_retrieve_retry_interval
Definition: xlog.c:107
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:316
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:445
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
#define DEFAULT_NAPTIME_PER_CYCLE
Definition: launcher.c:56
uintptr_t Datum
Definition: postgres.h:372
void ApplyLauncherRegister(void)
Definition: launcher.c:480
#define PG_STAT_GET_SUBSCRIPTION_COLS
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction)
Definition: heapam.c:1797
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1287
void logicalrep_worker_sigterm(SIGNAL_ARGS)
Definition: launcher.c:452
int work_mem
Definition: globals.c:112
#define InvalidOid
Definition: postgres_ext.h:36
int allowedModes
Definition: execnodes.h:201
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
static List * get_subscription_list(void)
Definition: launcher.c:88
SetFunctionReturnMode returnMode
Definition: execnodes.h:203
int max_replication_slots
Definition: slot.c:99
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
TimestampTz last_recv_time
#define PG_ARGISNULL(n)
Definition: fmgr.h:166
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define SIGNAL_ARGS
Definition: c.h:1079
#define NULL
Definition: c.h:229
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:675
#define lfirst(lc)
Definition: pg_list.h:106
bool got_SIGTERM
Definition: launcher.c:75
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
void StartTransactionCommand(void)
Definition: xact.c:2677
int max_logical_replication_workers
Definition: launcher.c:58
#define BGW_MAXLEN
Definition: bgworker.h:85
size_t Size
Definition: c.h:356
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:92
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:899
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
#define MAXALIGN(LEN)
Definition: c.h:588
void ApplyLauncherShmemInit(void)
Definition: launcher.c:505
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:135
struct LogicalRepCtxStruct LogicalRepCtxStruct
Tuplestorestate * setResult
Definition: execnodes.h:206
static Datum values[MAXATTR]
Definition: bootstrap.c:162
ExprContext * econtext
Definition: execnodes.h:199
#define Int32GetDatum(X)
Definition: postgres.h:485
TupleDesc setDesc
Definition: execnodes.h:207
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t bgw_notify_pid
Definition: bgworker.h:99
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:70
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:163
int i
#define NameStr(name)
Definition: c.h:499
bool IsBackendPid(int pid)
Definition: procarray.c:2419
void logicalrep_worker_stop(Oid subid)
Definition: launcher.c:310
void * arg
struct Latch * MyLatch
Definition: globals.c:51
#define PG_FUNCTION_ARGS
Definition: fmgr.h:150
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:164
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS)
Definition: launcher.c:665
#define elog
Definition: elog.h:219
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:224
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:536
Definition: pg_list.h:45
int pid
Definition: proc.h:103
#define WL_LATCH_SET
Definition: latch.h:124
void BackgroundWorkerInitializeConnection(char *dbname, char *username)
Definition: postmaster.c:5452
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1004
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:1337
static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, BackgroundWorkerHandle *handle)
Definition: launcher.c:157
TimestampTz reply_time
void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
Definition: launcher.c:228
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5504