PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
launcher.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * launcher.c
3  * PostgreSQL logical replication worker launcher process
4  *
5  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/launcher.c
9  *
10  * NOTES
11  * This module contains the logical replication worker launcher which
12  * uses the background worker infrastructure to start the logical
13  * replication workers for every enabled subscription.
14  *
15  *-------------------------------------------------------------------------
16  */
17 
18 #include "postgres.h"
19 
20 #include "funcapi.h"
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 
24 #include "access/heapam.h"
25 #include "access/htup.h"
26 #include "access/htup_details.h"
27 #include "access/xact.h"
28 
31 
32 #include "libpq/pqsignal.h"
33 
34 #include "postmaster/bgworker.h"
36 #include "postmaster/postmaster.h"
37 
40 #include "replication/slot.h"
42 
43 #include "storage/ipc.h"
44 #include "storage/proc.h"
45 #include "storage/procarray.h"
46 #include "storage/procsignal.h"
47 
48 #include "tcop/tcopprot.h"
49 
50 #include "utils/memutils.h"
51 #include "utils/pg_lsn.h"
52 #include "utils/ps_status.h"
53 #include "utils/timeout.h"
54 #include "utils/snapmgr.h"
55 
56 /* max sleep time between cycles (3min) */
57 #define DEFAULT_NAPTIME_PER_CYCLE 180000L
58 
61 
63 
64 typedef struct LogicalRepCtxStruct
65 {
66  /* Supervisor process. */
67  pid_t launcher_pid;
68 
69  /* Background workers. */
70  LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
72 
74 
75 static void ApplyLauncherWakeup(void);
76 static void logicalrep_launcher_onexit(int code, Datum arg);
77 static void logicalrep_worker_onexit(int code, Datum arg);
78 static void logicalrep_worker_detach(void);
79 
80 /* Flags set by signal handlers */
81 volatile sig_atomic_t got_SIGHUP = false;
82 volatile sig_atomic_t got_SIGTERM = false;
83 
84 static bool on_commit_launcher_wakeup = false;
85 
87 
88 
89 /*
90  * Load the list of subscriptions.
91  *
92  * Only the fields interesting for worker start/stop functions are filled for
93  * each subscription.
94  */
95 static List *
97 {
98  List *res = NIL;
99  Relation rel;
100  HeapScanDesc scan;
101  HeapTuple tup;
102  MemoryContext resultcxt;
103 
104  /* This is the context that we will allocate our output data in */
105  resultcxt = CurrentMemoryContext;
106 
107  /*
108  * Start a transaction so we can access pg_subscription, and get a snapshot.
109  * We don't have a use for the snapshot itself, but we're interested in
110  * the secondary effect that it sets RecentGlobalXmin. (This is critical
111  * for anything that reads heap pages, because HOT may decide to prune
112  * them even if the process doesn't attempt to modify any tuples.)
113  */
115  (void) GetTransactionSnapshot();
116 
118  scan = heap_beginscan_catalog(rel, 0, NULL);
119 
121  {
123  Subscription *sub;
124  MemoryContext oldcxt;
125 
126  /*
127  * Allocate our results in the caller's context, not the
128  * transaction's. We do this inside the loop, and restore the original
129  * context at the end, so that leaky things like heap_getnext() are
130  * not called in a potentially long-lived context.
131  */
132  oldcxt = MemoryContextSwitchTo(resultcxt);
133 
134  sub = (Subscription *) palloc0(sizeof(Subscription));
135  sub->oid = HeapTupleGetOid(tup);
136  sub->dbid = subform->subdbid;
137  sub->owner = subform->subowner;
138  sub->enabled = subform->subenabled;
139  sub->name = pstrdup(NameStr(subform->subname));
140  /* We don't fill fields we are not interested in. */
141 
142  res = lappend(res, sub);
143  MemoryContextSwitchTo(oldcxt);
144  }
145 
146  heap_endscan(scan);
148 
150 
151  return res;
152 }
153 
154 /*
155  * Wait for a background worker to start up and attach to the shmem context.
156  *
157  * This is like WaitForBackgroundWorkerStartup(), except that we wait for the
158  * worker to attach, not just to start, and we simply exit if the postmaster dies.
159  */
160 static bool
162  BackgroundWorkerHandle *handle)
163 {
165  int rc;
166 
167  for (;;)
168  {
169  pid_t pid;
170 
172 
173  status = GetBackgroundWorkerPid(handle, &pid);
174 
175  /*
176  * Worker started and attached to our shmem. This check is safe
177  * because only launcher ever starts the workers, so nobody can steal
178  * the worker slot.
179  */
180  if (status == BGWH_STARTED && worker->proc)
181  return true;
182  /* Worker didn't start or died before attaching to our shmem. */
183  if (status == BGWH_STOPPED)
184  return false;
185 
186  /*
187  * We need timeout because we generally don't get notified via latch
188  * about the worker attach.
189  */
190  rc = WaitLatch(MyLatch,
193 
194  if (rc & WL_POSTMASTER_DEATH)
195  proc_exit(1);
196 
198  }
199 
200  return false;
201 }
202 
203 /*
204  * Walk the workers array and return the first slot matching the given subscription
205  * id and relid (NULL if none); with only_running the slot must also have a live backend.
206  */
208 logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
209 {
210  int i;
211  LogicalRepWorker *res = NULL;
212 
213  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
214 
215  /* Caller must hold LogicalRepWorkerLock (asserted above); stop at the first matching slot. */
216  for (i = 0; i < max_logical_replication_workers; i++)
217  {
218  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
219  if (w->subid == subid && w->relid == relid &&
220  (!only_running || (w->proc && IsBackendPid(w->proc->pid))))
221  {
222  res = w;
223  break;
224  }
225  }
226 
227  return res;
228 }
229 
230 /*
231  * Start a new logical replication worker: an apply worker, or a sync worker when relid is valid.
232  */
233 void
234 logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
235  Oid relid)
236 {
237  BackgroundWorker bgw;
238  BackgroundWorkerHandle *bgw_handle;
239  int slot;
240  LogicalRepWorker *worker = NULL;
241 
242  ereport(LOG,
243  (errmsg("starting logical replication worker for subscription \"%s\"",
244  subname)));
245 
246  /* Report this after the initial starting message for consistency. */
247  if (max_replication_slots == 0)
248  ereport(ERROR,
249  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
250  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
251 
252  /*
253  * We need to do the modification of the shared memory under lock so that
254  * we have consistent view.
255  */
256  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
257 
258  /* Find unused worker slot. */
259  for (slot = 0; slot < max_logical_replication_workers; slot++)
260  {
261  if (!LogicalRepCtx->workers[slot].proc)
262  {
263  worker = &LogicalRepCtx->workers[slot];
264  break;
265  }
266  }
267 
268  /* Bail if not found */
269  if (worker == NULL)
270  {
271  LWLockRelease(LogicalRepWorkerLock);
273  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
274  errmsg("out of logical replication workers slots"),
275  errhint("You might need to increase max_logical_replication_workers.")));
276  return;
277  }
278 
279  /* Prepare the worker info. */
280  worker->proc = NULL;
281  worker->dbid = dbid;
282  worker->userid = userid;
283  worker->subid = subid;
284  worker->relid = relid;
285  worker->relstate = SUBREL_STATE_UNKNOWN;
287  worker->last_lsn = InvalidXLogRecPtr;
290  worker->reply_lsn = InvalidXLogRecPtr;
291  TIMESTAMP_NOBEGIN(worker->reply_time);
292 
293  LWLockRelease(LogicalRepWorkerLock);
294 
295  /* Register the new dynamic worker. */
296  memset(&bgw, 0, sizeof(bgw));
300  snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
301  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
302  if (OidIsValid(relid))
304  "logical replication worker for subscription %u sync %u", subid, relid);
305  else
307  "logical replication worker for subscription %u", subid);
308 
311  bgw.bgw_main_arg = Int32GetDatum(slot);
312 
313  if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
314  {
316  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
317  errmsg("out of background workers slots"),
318  errhint("You might need to increase max_worker_processes.")));
319  return;
320  }
321 
322  /* Now wait until it attaches. */
323  WaitForReplicationWorkerAttach(worker, bgw_handle);
324 }
325 
326 /*
327  * Stop the logical replication worker and wait until it detaches from the
328  * slot.
329  */
330 void
332 {
333  LogicalRepWorker *worker;
334 
335  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
336 
337  worker = logicalrep_worker_find(subid, relid, false);
338 
339  /* No worker, nothing to do. */
340  if (!worker)
341  {
342  LWLockRelease(LogicalRepWorkerLock);
343  return;
344  }
345 
346  /*
347  * If we found worker but it does not have proc set it is starting up,
348  * wait for it to finish and then kill it.
349  */
350  while (worker && !worker->proc)
351  {
352  int rc;
353 
354  LWLockRelease(LogicalRepWorkerLock);
355 
357 
358  /* Wait for signal. */
359  rc = WaitLatch(&MyProc->procLatch,
362 
363  /* emergency bailout if postmaster has died */
364  if (rc & WL_POSTMASTER_DEATH)
365  proc_exit(1);
366 
368 
369  /* Check worker status. */
370  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
371 
372  /*
373  * Worker is no longer associated with subscription. It must have
374  * exited, nothing more for us to do.
375  */
376  if (worker->subid == InvalidOid)
377  {
378  LWLockRelease(LogicalRepWorkerLock);
379  return;
380  }
381 
382  /* Worker has assigned proc, so it has started. */
383  if (worker->proc)
384  break;
385  }
386 
387  /* Now terminate the worker ... */
388  kill(worker->proc->pid, SIGTERM);
389  LWLockRelease(LogicalRepWorkerLock);
390 
391  /* ... and wait for it to die. */
392  for (;;)
393  {
394  int rc;
395 
396  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
397  if (!worker->proc)
398  {
399  LWLockRelease(LogicalRepWorkerLock);
400  break;
401  }
402  LWLockRelease(LogicalRepWorkerLock);
403 
405 
406  /* Wait for the worker to detach (its proc becomes NULL). */
407  rc = WaitLatch(&MyProc->procLatch,
410 
411  /* emergency bailout if postmaster has died */
412  if (rc & WL_POSTMASTER_DEATH)
413  proc_exit(1);
414 
416  }
417 }
418 
419 /*
420  * Wake up (using latch) the logical replication worker.
421  */
422 void
424 {
425  LogicalRepWorker *worker;
426 
427  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
428  worker = logicalrep_worker_find(subid, relid, true);
429  LWLockRelease(LogicalRepWorkerLock);
430 
431  if (worker)
433 }
434 
435 /*
436  * Wake up (using latch) the given worker; worker->proc is dereferenced unchecked, so it must be attached.
437  */
438 void
440 {
441  SetLatch(&worker->proc->procLatch);
442 }
443 
444 /*
445  * Attach to a slot.
446  */
447 void
449 {
450  /* Block concurrent access. */
451  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
452 
453  Assert(slot >= 0 && slot < max_logical_replication_workers);
454  MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
455 
456  if (MyLogicalRepWorker->proc)
457  ereport(ERROR,
458  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
459  errmsg("logical replication worker slot %d already used by "
460  "another worker", slot)));
461 
462  MyLogicalRepWorker->proc = MyProc;
464 
465  LWLockRelease(LogicalRepWorkerLock);
466 }
467 
468 /*
469  * Detach the worker (cleans up the worker info).
470  */
471 static void
473 {
474  /* Block concurrent access. */
475  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
476 
477  MyLogicalRepWorker->dbid = InvalidOid;
478  MyLogicalRepWorker->userid = InvalidOid;
479  MyLogicalRepWorker->subid = InvalidOid;
480  MyLogicalRepWorker->proc = NULL;
481 
482  LWLockRelease(LogicalRepWorkerLock);
483 }
484 
485 /*
486  * Cleanup function for logical replication launcher.
487  *
488  * Called on logical replication launcher exit.
489  */
490 static void
492 {
493  LogicalRepCtx->launcher_pid = 0;
494 }
495 
496 /*
497  * Cleanup function.
498  *
499  * Called on logical replication worker exit.
500  */
501 static void
503 {
505 }
506 
507 /* SIGTERM: set flag to exit at next convenient time */
508 void
510 {
511  int save_errno = errno;
512 
513  got_SIGTERM = true;
514 
515  /* Waken anything waiting on the process latch */
516  SetLatch(MyLatch);
517 
518  errno = save_errno;
519 }
520 
521 /* SIGHUP: set flag to reload configuration at next convenient time */
522 void
524 {
525  int save_errno = errno;
526 
527  got_SIGHUP = true;
528 
529  /* Waken anything waiting on the process latch */
530  SetLatch(MyLatch);
531 
532  errno = save_errno;
533 }
534 
535 /*
536  * Count the number of registered (not necessarily running) sync workers
537  * for a subscription. The caller must hold LogicalRepWorkerLock.
538  */
539 int
541 {
542  int i;
543  int res = 0;
544 
545  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
546 
547  /* Count every slot of this subscription that has a valid relid (a sync worker). */
548  for (i = 0; i < max_logical_replication_workers; i++)
549  {
550  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
551  if (w->subid == subid && OidIsValid(w->relid))
552  res++;
553  }
554 
555  return res;
556 }
557 
558 /*
559  * ApplyLauncherShmemSize
560  * Compute space needed for replication launcher shared memory
561  */
562 Size
564 {
565  Size size;
566 
567  /*
568  * Need the fixed struct and the array of LogicalRepWorker.
569  */
570  size = sizeof(LogicalRepCtxStruct);
571  size = MAXALIGN(size);
573  sizeof(LogicalRepWorker)));
574  return size;
575 }
576 
577 /*
578  * ApplyLauncherRegister
579  * Register a background worker running the logical replication launcher.
580  */
581 void
583 {
584  BackgroundWorker bgw;
585 
587  return;
588 
589  memset(&bgw, 0, sizeof(bgw));
593  snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
594  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
596  "logical replication launcher");
597  bgw.bgw_restart_time = 5;
598  bgw.bgw_notify_pid = 0;
599  bgw.bgw_main_arg = (Datum) 0;
600 
602 }
603 
604 /*
605  * ApplyLauncherShmemInit
606  * Allocate and initialize replication launcher shared memory
607  */
608 void
610 {
611  bool found;
612 
613  LogicalRepCtx = (LogicalRepCtxStruct *)
614  ShmemInitStruct("Logical Replication Launcher Data",
616  &found);
617 
618  if (!found)
619  {
620  int slot;
621 
622  memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
623 
624  /* Initialize memory and spin locks for each worker slot. */
625  for (slot = 0; slot < max_logical_replication_workers; slot++)
626  {
627  LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
628 
629  memset(worker, 0, sizeof(LogicalRepWorker));
630  SpinLockInit(&worker->relmutex);
631  }
632  }
633 }
634 
635 /*
636  * Wakeup the launcher on commit if requested.
637  */
638 void
640 {
643 }
644 
645 /*
646  * Request wakeup of the launcher on commit of the transaction.
647  *
648  * This is used to signal the launcher to stop sleeping and process the
649  * subscriptions when the current transaction commits. Should be used when a new
650  * tuple was added to the pg_subscription catalog.
651  */
652 void
654 {
657 }
658 
659 static void
661 {
662  if (LogicalRepCtx->launcher_pid != 0)
663  kill(LogicalRepCtx->launcher_pid, SIGUSR1);
664 }
665 
666 /*
667  * Main loop for the apply launcher process: until SIGTERM, start at most one missing worker per cycle, then sleep.
668  */
669 void
671 {
672  ereport(DEBUG1,
673  (errmsg("logical replication launcher started")));
674 
676 
677  /* Establish signal handlers. */
681 
682  /* Make it easy to identify our processes. */
683  SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
685 
686  LogicalRepCtx->launcher_pid = MyProcPid;
687 
688  /*
689  * Establish connection to nailed catalogs (we only ever access
690  * pg_subscription).
691  */
693 
694  /* Enter main loop */
695  while (!got_SIGTERM)
696  {
697  int rc;
698  List *sublist;
699  ListCell *lc;
700  MemoryContext subctx;
701  MemoryContext oldctx;
703  static TimestampTz last_start_time = 0; /* static: must survive across cycles, else the rate limit below never applies */
704  long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
705 
706  now = GetCurrentTimestamp();
707 
708  /* Limit the start retry to once a wal_retrieve_retry_interval */
709  if (TimestampDifferenceExceeds(last_start_time, now,
711  {
712  /* Use temporary context for the subscription list and worker info. */
714  "Logical Replication Launcher sublist",
718  oldctx = MemoryContextSwitchTo(subctx);
719 
720  /* Load the current list of subscriptions. */
721  sublist = get_subscription_list();
722 
723  /* Start the missing workers for enabled subscriptions. */
724  foreach(lc, sublist)
725  {
726  Subscription *sub = (Subscription *) lfirst(lc);
727  LogicalRepWorker *w;
728 
729  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
730  w = logicalrep_worker_find(sub->oid, InvalidOid, false);
731  LWLockRelease(LogicalRepWorkerLock);
732 
733  if (sub->enabled && w == NULL)
734  {
735  logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
736  sub->owner, InvalidOid);
737  last_start_time = now;
738  wait_time = wal_retrieve_retry_interval;
739  /* Limit to one worker per mainloop cycle. */
740  break;
741  }
742  }
743 
744  /* Switch back to original memory context. */
745  MemoryContextSwitchTo(oldctx);
746  /* Clean the temporary memory. */
747  MemoryContextDelete(subctx);
748  }
749  else
750  {
751  /*
752  * The wait in previous cycle was interrupted in less than
753  * wal_retrieve_retry_interval since last worker was started,
754  * this usually means crash of the worker, so we should retry
755  * in wal_retrieve_retry_interval again.
756  */
757  wait_time = wal_retrieve_retry_interval;
758  }
759 
760  /* Wait for more work. */
761  rc = WaitLatch(&MyProc->procLatch,
763  wait_time,
765 
766  /* emergency bailout if postmaster has died */
767  if (rc & WL_POSTMASTER_DEATH)
768  proc_exit(1);
769 
770  if (got_SIGHUP)
771  {
772  got_SIGHUP = false;
774  }
775 
777  }
778 
779  LogicalRepCtx->launcher_pid = 0;
780 
781  /* The main loop ended because got_SIGTERM was set; report and exit. */
782  ereport(DEBUG1,
783  (errmsg("logical replication launcher shutting down")));
784 
785  proc_exit(0);
786 }
787 
788 /*
789  * Returns state of the subscriptions: one row per worker slot that has a live backend attached.
790  */
791 Datum
793 {
794 #define PG_STAT_GET_SUBSCRIPTION_COLS 8
795  Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
796  int i;
797  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
798  TupleDesc tupdesc;
799  Tuplestorestate *tupstore;
800  MemoryContext per_query_ctx;
801  MemoryContext oldcontext;
802 
803  /* check to see if caller supports us returning a tuplestore */
804  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
805  ereport(ERROR,
806  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
807  errmsg("set-valued function called in context that cannot accept a set")));
808  if (!(rsinfo->allowedModes & SFRM_Materialize))
809  ereport(ERROR,
810  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
811  errmsg("materialize mode required, but it is not " \
812  "allowed in this context")));
813 
814  /* Build a tuple descriptor for our result type */
815  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
816  elog(ERROR, "return type must be a row type");
817 
818  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
819  oldcontext = MemoryContextSwitchTo(per_query_ctx);
820 
821  tupstore = tuplestore_begin_heap(true, false, work_mem);
822  rsinfo->returnMode = SFRM_Materialize;
823  rsinfo->setResult = tupstore;
824  rsinfo->setDesc = tupdesc;
825 
826  MemoryContextSwitchTo(oldcontext);
827 
828  /* Make sure we get a consistent view of the workers. */
829  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
830 
831  for (i = 0; i < max_logical_replication_workers; i++)
832  {
833  /* for each row; valid slots are 0 .. max_logical_replication_workers - 1 */
835  bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
836  int worker_pid;
837  LogicalRepWorker worker;
838 
839  memcpy(&worker, &LogicalRepCtx->workers[i],
840  sizeof(LogicalRepWorker));
841  if (!worker.proc || !IsBackendPid(worker.proc->pid))
842  continue;
843 
844  if (OidIsValid(subid) && worker.subid != subid)
845  continue;
846 
847  worker_pid = worker.proc->pid;
848 
849  MemSet(values, 0, sizeof(values));
850  MemSet(nulls, 0, sizeof(nulls));
851 
852  values[0] = ObjectIdGetDatum(worker.subid);
853  if (OidIsValid(worker.relid))
854  values[1] = ObjectIdGetDatum(worker.relid);
855  else
856  nulls[1] = true;
857  values[2] = Int32GetDatum(worker_pid);
858  if (XLogRecPtrIsInvalid(worker.last_lsn))
859  nulls[3] = true;
860  else
861  values[3] = LSNGetDatum(worker.last_lsn);
862  if (worker.last_send_time == 0)
863  nulls[4] = true;
864  else
865  values[4] = TimestampTzGetDatum(worker.last_send_time);
866  if (worker.last_recv_time == 0)
867  nulls[5] = true;
868  else
869  values[5] = TimestampTzGetDatum(worker.last_recv_time);
870  if (XLogRecPtrIsInvalid(worker.reply_lsn))
871  nulls[6] = true;
872  else
873  values[6] = LSNGetDatum(worker.reply_lsn);
874  if (worker.reply_time == 0)
875  nulls[7] = true;
876  else
877  values[7] = TimestampTzGetDatum(worker.reply_time);
878 
879  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
880 
881  /* If only a single subscription was requested, and we found it, break. */
882  if (OidIsValid(subid))
883  break;
884  }
885 
886  LWLockRelease(LogicalRepWorkerLock);
887 
888  /* clean up and return the tuplestore */
889  tuplestore_donestoring(tupstore);
890 
891  return (Datum) 0;
892 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
volatile sig_atomic_t got_SIGHUP
Definition: launcher.c:81
#define NIL
Definition: pg_list.h:69
void logicalrep_worker_sighup(SIGNAL_ARGS)
Definition: launcher.c:523
#define SIGUSR1
Definition: win32.h:202
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:849
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:38
int errhint(const char *fmt,...)
Definition: elog.c:987
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
void heap_endscan(HeapScanDesc scan)
Definition: heapam.c:1578
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
void ApplyLauncherMain(Datum main_arg)
Definition: launcher.c:670
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1831
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:423
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1077
void CommitTransactionCommand(void)
Definition: xact.c:2747
#define SpinLockInit(lock)
Definition: spin.h:60
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:70
TimestampTz last_send_time
XLogRecPtr last_lsn
int bgw_restart_time
Definition: bgworker.h:93
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:857
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:189
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define heap_close(r, l)
Definition: heapam.h:97
FormData_pg_subscription * Form_pg_subscription
void ResetLatch(volatile Latch *latch)
Definition: latch.c:450
#define LOG
Definition: elog.h:26
unsigned int Oid
Definition: postgres_ext.h:31
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
#define OidIsValid(objectId)
Definition: c.h:538
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:95
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:162
Latch procLatch
Definition: proc.h:103
XLogRecPtr relstate_lsn
static void logicalrep_worker_detach(void)
Definition: launcher.c:472
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:439
Datum bgw_main_arg
Definition: bgworker.h:96
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:288
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:62
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
int max_sync_workers_per_subscription
Definition: launcher.c:60
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
XLogRecPtr reply_lsn
#define SUBREL_STATE_UNKNOWN
#define SubscriptionRelationId
void logicalrep_worker_attach(int slot)
Definition: launcher.c:448
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6676
#define PG_GETARG_OID(n)
Definition: fmgr.h:240
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:331
Size ApplyLauncherShmemSize(void)
Definition: launcher.c:563
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:320
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
static void logicalrep_launcher_onexit(int code, Datum arg)
Definition: launcher.c:491
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:112
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
HeapScanDesc heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
Definition: heapam.c:1399
BgwHandleStatus
Definition: bgworker.h:101
static bool on_commit_launcher_wakeup
Definition: launcher.c:84
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext TopMemoryContext
Definition: mcxt.c:43
Definition: guc.h:72
List * lappend(List *list, void *datum)
Definition: list.c:128
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define WARNING
Definition: elog.h:40
void AtCommit_ApplyLauncher(void)
Definition: launcher.c:639
int wal_retrieve_retry_interval
Definition: xlog.c:107
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:502
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void * palloc0(Size size)
Definition: mcxt.c:878
#define DEFAULT_NAPTIME_PER_CYCLE
Definition: launcher.c:57
uintptr_t Datum
Definition: postgres.h:372
void ApplyLauncherRegister(void)
Definition: launcher.c:582
#define PG_STAT_GET_SUBSCRIPTION_COLS
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction)
Definition: heapam.c:1794
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1284
void logicalrep_worker_sigterm(SIGNAL_ARGS)
Definition: launcher.c:509
int work_mem
Definition: globals.c:112
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:540
#define SIGHUP
Definition: win32.h:188
#define InvalidOid
Definition: postgres_ext.h:36
int allowedModes
Definition: execnodes.h:268
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
static List * get_subscription_list(void)
Definition: launcher.c:96
SetFunctionReturnMode returnMode
Definition: execnodes.h:270
int max_replication_slots
Definition: slot.c:99
void SetLatch(volatile Latch *latch)
Definition: latch.c:367
TimestampTz last_recv_time
#define PG_ARGISNULL(n)
Definition: fmgr.h:174
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define SIGNAL_ARGS
Definition: c.h:1079
#define NULL
Definition: c.h:229
volatile sig_atomic_t got_SIGTERM
Definition: launcher.c:82
void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
Definition: launcher.c:234
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:675
#define lfirst(lc)
Definition: pg_list.h:106
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:208
void StartTransactionCommand(void)
Definition: xact.c:2677
int max_logical_replication_workers
Definition: launcher.c:59
#define BGW_MAXLEN
Definition: bgworker.h:85
size_t Size
Definition: c.h:356
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:92
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:933
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
#define MAXALIGN(LEN)
Definition: c.h:588
void ApplyLauncherShmemInit(void)
Definition: launcher.c:609
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:202
struct LogicalRepCtxStruct LogicalRepCtxStruct
Tuplestorestate * setResult
Definition: execnodes.h:273
static Datum values[MAXATTR]
Definition: bootstrap.c:163
ExprContext * econtext
Definition: execnodes.h:266
#define Int32GetDatum(X)
Definition: postgres.h:485
TupleDesc setDesc
Definition: execnodes.h:274
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t bgw_notify_pid
Definition: bgworker.h:98
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:73
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:163
int i
#define NameStr(name)
Definition: c.h:499
bool IsBackendPid(int pid)
Definition: procarray.c:2429
void * arg
struct Latch * MyLatch
Definition: globals.c:51
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:164
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS)
Definition: launcher.c:792
#define elog
Definition: elog.h:219
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:224
static void ApplyLauncherWakeup(void)
Definition: launcher.c:660
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:653
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:94
Definition: pg_list.h:45
int pid
Definition: proc.h:108
#define WL_LATCH_SET
Definition: latch.h:124
void BackgroundWorkerInitializeConnection(char *dbname, char *username)
Definition: postmaster.c:5495
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1041
static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, BackgroundWorkerHandle *handle)
Definition: launcher.c:161
TimestampTz reply_time
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5547