PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
launcher.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * launcher.c
3  * PostgreSQL logical replication worker launcher process
4  *
5  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/launcher.c
9  *
10  * NOTES
11  * This module contains the logical replication worker launcher which
12  * uses the background worker infrastructure to start the logical
13  * replication workers for every enabled subscription.
14  *
15  *-------------------------------------------------------------------------
16  */
17 
18 #include "postgres.h"
19 
20 #include "funcapi.h"
21 #include "miscadmin.h"
22 #include "pgstat.h"
23 
24 #include "access/heapam.h"
25 #include "access/htup.h"
26 #include "access/htup_details.h"
27 #include "access/xact.h"
28 
31 
32 #include "libpq/pqsignal.h"
33 
34 #include "postmaster/bgworker.h"
36 #include "postmaster/postmaster.h"
37 
40 #include "replication/slot.h"
43 
44 #include "storage/ipc.h"
45 #include "storage/proc.h"
46 #include "storage/procarray.h"
47 #include "storage/procsignal.h"
48 
49 #include "tcop/tcopprot.h"
50 
51 #include "utils/memutils.h"
52 #include "utils/pg_lsn.h"
53 #include "utils/ps_status.h"
54 #include "utils/timeout.h"
55 #include "utils/snapmgr.h"
56 
57 /* max sleep time between cycles (3min) */
58 #define DEFAULT_NAPTIME_PER_CYCLE 180000L
59 
62 
64 
65 typedef struct LogicalRepCtxStruct
66 {
67  /* Supervisor process. */
68  pid_t launcher_pid;
69 
70  /* Background workers. */
71  LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
73 
75 
76 static void ApplyLauncherWakeup(void);
77 static void logicalrep_launcher_onexit(int code, Datum arg);
78 static void logicalrep_worker_onexit(int code, Datum arg);
79 static void logicalrep_worker_detach(void);
80 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
81 
82 /* Flags set by signal handlers */
83 static volatile sig_atomic_t got_SIGHUP = false;
84 
85 static bool on_commit_launcher_wakeup = false;
86 
88 
89 
90 /*
91  * Load the list of subscriptions.
92  *
93  * Only the fields interesting for worker start/stop functions are filled for
94  * each subscription.
95  */
96 static List *
98 {
99  List *res = NIL;
100  Relation rel;
101  HeapScanDesc scan;
102  HeapTuple tup;
103  MemoryContext resultcxt;
104 
105  /* This is the context that we will allocate our output data in */
106  resultcxt = CurrentMemoryContext;
107 
108  /*
109  * Start a transaction so we can access pg_subscription, and get a snapshot.
110  * We don't have a use for the snapshot itself, but we're interested in
111  * the secondary effect that it sets RecentGlobalXmin. (This is critical
112  * for anything that reads heap pages, because HOT may decide to prune
113  * them even if the process doesn't attempt to modify any tuples.)
114  */
116  (void) GetTransactionSnapshot();
117 
119  scan = heap_beginscan_catalog(rel, 0, NULL);
120 
122  {
124  Subscription *sub;
125  MemoryContext oldcxt;
126 
127  /*
128  * Allocate our results in the caller's context, not the
129  * transaction's. We do this inside the loop, and restore the original
130  * context at the end, so that leaky things like heap_getnext() are
131  * not called in a potentially long-lived context.
132  */
133  oldcxt = MemoryContextSwitchTo(resultcxt);
134 
135  sub = (Subscription *) palloc0(sizeof(Subscription));
136  sub->oid = HeapTupleGetOid(tup);
137  sub->dbid = subform->subdbid;
138  sub->owner = subform->subowner;
139  sub->enabled = subform->subenabled;
140  sub->name = pstrdup(NameStr(subform->subname));
141  /* We don't fill fields we are not interested in. */
142 
143  res = lappend(res, sub);
144  MemoryContextSwitchTo(oldcxt);
145  }
146 
147  heap_endscan(scan);
149 
151 
152  return res;
153 }
154 
155 /*
156  * Wait for a background worker to start up and attach to the shmem context.
157  *
158  * This is only needed for cleaning up the shared memory in case the worker
159  * fails to attach.
160  */
161 static void
163  BackgroundWorkerHandle *handle)
164 {
166  int rc;
167  uint16 generation;
168 
169  /* Remember generation for future identification. */
170  generation = worker->generation;
171 
172  for (;;)
173  {
174  pid_t pid;
175 
177 
178  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
179 
180  /* Worker either died or has started; no need to do anything. */
181  if (!worker->in_use || worker->proc)
182  {
183  LWLockRelease(LogicalRepWorkerLock);
184  return;
185  }
186 
187  LWLockRelease(LogicalRepWorkerLock);
188 
189  /* Check if worker has died before attaching, and clean up after it. */
190  status = GetBackgroundWorkerPid(handle, &pid);
191 
192  if (status == BGWH_STOPPED)
193  {
194  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
195  /* Ensure that this was indeed the worker we waited for. */
196  if (generation == worker->generation)
198  LWLockRelease(LogicalRepWorkerLock);
199  return;
200  }
201 
202  /*
203  * We need timeout because we generally don't get notified via latch
204  * about the worker attach.
205  */
206  rc = WaitLatch(MyLatch,
209 
210  /* emergency bailout if postmaster has died */
211  if (rc & WL_POSTMASTER_DEATH)
212  proc_exit(1);
213 
214  if (rc & WL_LATCH_SET)
215  {
218  }
219  }
220 
221  return;
222 }
223 
224 /*
225  * Walks the workers array and searches for one that matches given
226  * subscription id and relid.
227  */
229 logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
230 {
231  int i;
232  LogicalRepWorker *res = NULL;
233 
234  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
235 
236  /* Search for attached worker for a given subscription id. */
237  for (i = 0; i < max_logical_replication_workers; i++)
238  {
239  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
240 
241  if (w->in_use && w->subid == subid && w->relid == relid &&
242  (!only_running || w->proc))
243  {
244  res = w;
245  break;
246  }
247  }
248 
249  return res;
250 }
251 
252 /*
253  * Start new apply background worker.
254  */
255 void
256 logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
257  Oid relid)
258 {
259  BackgroundWorker bgw;
260  BackgroundWorkerHandle *bgw_handle;
261  int i;
262  int slot = 0;
263  LogicalRepWorker *worker = NULL;
264  int nsyncworkers;
266 
267  ereport(DEBUG1,
268  (errmsg("starting logical replication worker for subscription \"%s\"",
269  subname)));
270 
271  /* Report this after the initial starting message for consistency. */
272  if (max_replication_slots == 0)
273  ereport(ERROR,
274  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
275  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
276 
277  /*
278  * We need to do the modification of the shared memory under lock so that
279  * we have consistent view.
280  */
281  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
282 
283 retry:
284  /* Find unused worker slot. */
285  for (i = 0; i < max_logical_replication_workers; i++)
286  {
287  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
288 
289  if (!w->in_use)
290  {
291  worker = w;
292  slot = i;
293  break;
294  }
295  }
296 
297  nsyncworkers = logicalrep_sync_worker_count(subid);
298 
299  now = GetCurrentTimestamp();
300 
301  /*
302  * If we didn't find a free slot, try to do garbage collection. The
303  * reason we do this is because if some worker failed to start up and its
304  * parent has crashed while waiting, the in_use state was never cleared.
305  */
306  if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
307  {
308  bool did_cleanup = false;
309 
310  for (i = 0; i < max_logical_replication_workers; i++)
311  {
312  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
313 
314  /*
315  * If the worker was marked in use but didn't manage to attach in
316  * time, clean it up.
317  */
318  if (w->in_use && !w->proc &&
321  {
322  elog(WARNING,
323  "logical replication worker for subscription %u took too long to start; canceled",
324  w->subid);
325 
327  did_cleanup = true;
328  }
329  }
330 
331  if (did_cleanup)
332  goto retry;
333  }
334 
335  /*
336  * If we reached the sync worker limit per subscription, just exit
337  * silently as we might get here because of an otherwise harmless race
338  * condition.
339  */
340  if (nsyncworkers >= max_sync_workers_per_subscription)
341  {
342  LWLockRelease(LogicalRepWorkerLock);
343  return;
344  }
345 
346  /*
347  * However if there are no more free worker slots, inform user about it
348  * before exiting.
349  */
350  if (worker == NULL)
351  {
352  LWLockRelease(LogicalRepWorkerLock);
354  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
355  errmsg("out of logical replication worker slots"),
356  errhint("You might need to increase max_logical_replication_workers.")));
357  return;
358  }
359 
360  /* Prepare the worker slot. */
361  worker->launch_time = now;
362  worker->in_use = true;
363  worker->generation++;
364  worker->proc = NULL;
365  worker->dbid = dbid;
366  worker->userid = userid;
367  worker->subid = subid;
368  worker->relid = relid;
369  worker->relstate = SUBREL_STATE_UNKNOWN;
371  worker->last_lsn = InvalidXLogRecPtr;
374  worker->reply_lsn = InvalidXLogRecPtr;
375  TIMESTAMP_NOBEGIN(worker->reply_time);
376 
377  LWLockRelease(LogicalRepWorkerLock);
378 
379  /* Register the new dynamic worker. */
380  memset(&bgw, 0, sizeof(bgw));
384  snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
385  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
386  if (OidIsValid(relid))
388  "logical replication worker for subscription %u sync %u", subid, relid);
389  else
391  "logical replication worker for subscription %u", subid);
392 
395  bgw.bgw_main_arg = Int32GetDatum(slot);
396 
397  if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
398  {
400  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
401  errmsg("out of background worker slots"),
402  errhint("You might need to increase max_worker_processes.")));
403  return;
404  }
405 
406  /* Now wait until it attaches. */
407  WaitForReplicationWorkerAttach(worker, bgw_handle);
408 }
409 
410 /*
411  * Stop the logical replication worker and wait until it detaches from the
412  * slot.
413  */
414 void
416 {
417  LogicalRepWorker *worker;
418  uint16 generation;
419 
420  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
421 
422  worker = logicalrep_worker_find(subid, relid, false);
423 
424  /* No worker, nothing to do. */
425  if (!worker)
426  {
427  LWLockRelease(LogicalRepWorkerLock);
428  return;
429  }
430 
431  /*
432  * Remember which generation was our worker so we can check if what we see
433  * is still the same one.
434  */
435  generation = worker->generation;
436 
437  /*
438  * If we found worker but it does not have proc set it is starting up,
439  * wait for it to finish and then kill it.
440  */
441  while (worker->in_use && !worker->proc)
442  {
443  int rc;
444 
445  LWLockRelease(LogicalRepWorkerLock);
446 
447  /* Wait for signal. */
448  rc = WaitLatch(MyLatch,
451 
452  /* emergency bailout if postmaster has died */
453  if (rc & WL_POSTMASTER_DEATH)
454  proc_exit(1);
455 
456  if (rc & WL_LATCH_SET)
457  {
460  }
461 
462  /* Check worker status. */
463  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
464 
465  /*
466  * Check whether the worker slot is no longer used, which would mean
467  * that the worker has exited, or whether the worker generation is
468  * different, meaning that a different worker has taken the slot.
469  */
470  if (!worker->in_use || worker->generation != generation)
471  {
472  LWLockRelease(LogicalRepWorkerLock);
473  return;
474  }
475 
476  /* Worker has assigned proc, so it has started. */
477  if (worker->proc)
478  break;
479  }
480 
481  /* Now terminate the worker ... */
482  kill(worker->proc->pid, SIGTERM);
483  LWLockRelease(LogicalRepWorkerLock);
484 
485  /* ... and wait for it to die. */
486  for (;;)
487  {
488  int rc;
489 
490  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
491  if (!worker->proc || worker->generation != generation)
492  {
493  LWLockRelease(LogicalRepWorkerLock);
494  break;
495  }
496  LWLockRelease(LogicalRepWorkerLock);
497 
499 
500  /* Wait for more work. */
501  rc = WaitLatch(MyLatch,
504 
505  /* emergency bailout if postmaster has died */
506  if (rc & WL_POSTMASTER_DEATH)
507  proc_exit(1);
508 
509  if (rc & WL_LATCH_SET)
510  {
513  }
514  }
515 }
516 
517 /*
518  * Wake up (using latch) the logical replication worker.
519  */
520 void
522 {
523  LogicalRepWorker *worker;
524 
525  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
526  worker = logicalrep_worker_find(subid, relid, true);
527  LWLockRelease(LogicalRepWorkerLock);
528 
529  if (worker)
531 }
532 
533 /*
534  * Wake up (using latch) the logical replication worker.
535  */
536 void
538 {
539  SetLatch(&worker->proc->procLatch);
540 }
541 
542 /*
543  * Attach to a slot.
544  */
545 void
547 {
548  /* Block concurrent access. */
549  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
550 
551  Assert(slot >= 0 && slot < max_logical_replication_workers);
552  MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
553 
554  if (!MyLogicalRepWorker->in_use)
555  {
556  LWLockRelease(LogicalRepWorkerLock);
557  ereport(ERROR,
558  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
559  errmsg("logical replication worker slot %d is empty, cannot attach",
560  slot)));
561  }
562 
563  if (MyLogicalRepWorker->proc)
564  {
565  LWLockRelease(LogicalRepWorkerLock);
566  ereport(ERROR,
567  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
568  errmsg("logical replication worker slot %d is already used by "
569  "another worker, cannot attach", slot)));
570  }
571 
572  MyLogicalRepWorker->proc = MyProc;
574 
575  LWLockRelease(LogicalRepWorkerLock);
576 }
577 
578 /*
579  * Detach the worker (cleans up the worker info).
580  */
581 static void
583 {
584  /* Block concurrent access. */
585  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
586 
587  logicalrep_worker_cleanup(MyLogicalRepWorker);
588 
589  LWLockRelease(LogicalRepWorkerLock);
590 }
591 
592 /*
593  * Clean up worker info.
594  */
595 static void
597 {
598  Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
599 
600  worker->in_use = false;
601  worker->proc = NULL;
602  worker->dbid = InvalidOid;
603  worker->userid = InvalidOid;
604  worker->subid = InvalidOid;
605  worker->relid = InvalidOid;
606 }
607 
608 /*
609  * Cleanup function for logical replication launcher.
610  *
611  * Called on logical replication launcher exit.
612  */
613 static void
615 {
616  LogicalRepCtx->launcher_pid = 0;
617 }
618 
619 /*
620  * Cleanup function.
621  *
622  * Called on logical replication worker exit.
623  */
624 static void
626 {
627  /* Disconnect gracefully from the remote side. */
628  if (wrconn)
630 
632 
634 }
635 
636 /* SIGHUP: set flag to reload configuration at next convenient time */
637 static void
639 {
640  int save_errno = errno;
641 
642  got_SIGHUP = true;
643 
644  /* Waken anything waiting on the process latch */
645  SetLatch(MyLatch);
646 
647  errno = save_errno;
648 }
649 
650 /*
651  * Count the number of registered (not necessarily running) sync workers
652  * for a subscription.
653  */
654 int
656 {
657  int i;
658  int res = 0;
659 
660  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
661 
662  /* Search for attached worker for a given subscription id. */
663  for (i = 0; i < max_logical_replication_workers; i++)
664  {
665  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
666 
667  if (w->subid == subid && OidIsValid(w->relid))
668  res++;
669  }
670 
671  return res;
672 }
673 
674 /*
675  * ApplyLauncherShmemSize
676  * Compute space needed for replication launcher shared memory
677  */
678 Size
680 {
681  Size size;
682 
683  /*
684  * Need the fixed struct and the array of LogicalRepWorker.
685  */
686  size = sizeof(LogicalRepCtxStruct);
687  size = MAXALIGN(size);
689  sizeof(LogicalRepWorker)));
690  return size;
691 }
692 
693 /*
694  * ApplyLauncherRegister
695  * Register a background worker running the logical replication launcher.
696  */
697 void
699 {
700  BackgroundWorker bgw;
701 
703  return;
704 
705  memset(&bgw, 0, sizeof(bgw));
709  snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
710  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
712  "logical replication launcher");
713  bgw.bgw_restart_time = 5;
714  bgw.bgw_notify_pid = 0;
715  bgw.bgw_main_arg = (Datum) 0;
716 
718 }
719 
720 /*
721  * ApplyLauncherShmemInit
722  * Allocate and initialize replication launcher shared memory
723  */
724 void
726 {
727  bool found;
728 
729  LogicalRepCtx = (LogicalRepCtxStruct *)
730  ShmemInitStruct("Logical Replication Launcher Data",
732  &found);
733 
734  if (!found)
735  {
736  int slot;
737 
738  memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
739 
740  /* Initialize memory and spin locks for each worker slot. */
741  for (slot = 0; slot < max_logical_replication_workers; slot++)
742  {
743  LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
744 
745  memset(worker, 0, sizeof(LogicalRepWorker));
746  SpinLockInit(&worker->relmutex);
747  }
748  }
749 }
750 
751 /*
752  * Wakeup the launcher on commit if requested.
753  */
754 void
756 {
757  if (isCommit && on_commit_launcher_wakeup)
759 
761 }
762 
763 /*
764  * Request wakeup of the launcher on commit of the transaction.
765  *
766  * This is used to send launcher signal to stop sleeping and process the
767  * subscriptions when current transaction commits. Should be used when new
768  * tuple was added to the pg_subscription catalog.
769 */
770 void
772 {
775 }
776 
777 static void
779 {
780  if (LogicalRepCtx->launcher_pid != 0)
781  kill(LogicalRepCtx->launcher_pid, SIGUSR1);
782 }
783 
784 /*
785  * Main loop for the apply launcher process.
786  */
787 void
789 {
790  TimestampTz last_start_time = 0;
791 
792  ereport(DEBUG1,
793  (errmsg("logical replication launcher started")));
794 
796 
797  Assert(LogicalRepCtx->launcher_pid == 0);
798  LogicalRepCtx->launcher_pid = MyProcPid;
799 
800  /* Establish signal handlers. */
802  pqsignal(SIGTERM, die);
804 
805  /*
806  * Establish connection to nailed catalogs (we only ever access
807  * pg_subscription).
808  */
810 
811  /* Enter main loop */
812  for (;;)
813  {
814  int rc;
815  List *sublist;
816  ListCell *lc;
817  MemoryContext subctx;
818  MemoryContext oldctx;
820  long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
821 
823 
824  now = GetCurrentTimestamp();
825 
826  /* Limit the start retry to once a wal_retrieve_retry_interval */
827  if (TimestampDifferenceExceeds(last_start_time, now,
829  {
830  /* Use temporary context for the database list and worker info. */
832  "Logical Replication Launcher sublist",
836  oldctx = MemoryContextSwitchTo(subctx);
837 
838  /* search for subscriptions to start or stop. */
839  sublist = get_subscription_list();
840 
841  /* Start the missing workers for enabled subscriptions. */
842  foreach(lc, sublist)
843  {
844  Subscription *sub = (Subscription *) lfirst(lc);
845  LogicalRepWorker *w;
846 
847  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
848  w = logicalrep_worker_find(sub->oid, InvalidOid, false);
849  LWLockRelease(LogicalRepWorkerLock);
850 
851  if (sub->enabled && w == NULL)
852  {
853  last_start_time = now;
854  wait_time = wal_retrieve_retry_interval;
855 
856  logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
857  sub->owner, InvalidOid);
858  }
859  }
860 
861  /* Switch back to original memory context. */
862  MemoryContextSwitchTo(oldctx);
863  /* Clean the temporary memory. */
864  MemoryContextDelete(subctx);
865  }
866  else
867  {
868  /*
869  * The wait in previous cycle was interrupted in less than
870  * wal_retrieve_retry_interval since last worker was started, this
871  * usually means crash of the worker, so we should retry in
872  * wal_retrieve_retry_interval again.
873  */
874  wait_time = wal_retrieve_retry_interval;
875  }
876 
877  /* Wait for more work. */
878  rc = WaitLatch(MyLatch,
880  wait_time,
882 
883  /* emergency bailout if postmaster has died */
884  if (rc & WL_POSTMASTER_DEATH)
885  proc_exit(1);
886 
887  if (rc & WL_LATCH_SET)
888  {
891  }
892 
893  if (got_SIGHUP)
894  {
895  got_SIGHUP = false;
897  }
898  }
899 
900  /* Not reachable */
901 }
902 
903 /*
904  * Is current process the logical replication launcher?
905  */
906 bool
908 {
909  return LogicalRepCtx->launcher_pid == MyProcPid;
910 }
911 
912 /*
913  * Returns state of the subscriptions.
914  */
915 Datum
917 {
918 #define PG_STAT_GET_SUBSCRIPTION_COLS 8
919  Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
920  int i;
921  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
922  TupleDesc tupdesc;
923  Tuplestorestate *tupstore;
924  MemoryContext per_query_ctx;
925  MemoryContext oldcontext;
926 
927  /* check to see if caller supports us returning a tuplestore */
928  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
929  ereport(ERROR,
930  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
931  errmsg("set-valued function called in context that cannot accept a set")));
932  if (!(rsinfo->allowedModes & SFRM_Materialize))
933  ereport(ERROR,
934  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
935  errmsg("materialize mode required, but it is not " \
936  "allowed in this context")));
937 
938  /* Build a tuple descriptor for our result type */
939  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
940  elog(ERROR, "return type must be a row type");
941 
942  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
943  oldcontext = MemoryContextSwitchTo(per_query_ctx);
944 
945  tupstore = tuplestore_begin_heap(true, false, work_mem);
946  rsinfo->returnMode = SFRM_Materialize;
947  rsinfo->setResult = tupstore;
948  rsinfo->setDesc = tupdesc;
949 
950  MemoryContextSwitchTo(oldcontext);
951 
952  /* Make sure we get consistent view of the workers. */
953  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
954 
955  for (i = 0; i <= max_logical_replication_workers; i++)
956  {
957  /* for each row */
959  bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
960  int worker_pid;
961  LogicalRepWorker worker;
962 
963  memcpy(&worker, &LogicalRepCtx->workers[i],
964  sizeof(LogicalRepWorker));
965  if (!worker.proc || !IsBackendPid(worker.proc->pid))
966  continue;
967 
968  if (OidIsValid(subid) && worker.subid != subid)
969  continue;
970 
971  worker_pid = worker.proc->pid;
972 
973  MemSet(values, 0, sizeof(values));
974  MemSet(nulls, 0, sizeof(nulls));
975 
976  values[0] = ObjectIdGetDatum(worker.subid);
977  if (OidIsValid(worker.relid))
978  values[1] = ObjectIdGetDatum(worker.relid);
979  else
980  nulls[1] = true;
981  values[2] = Int32GetDatum(worker_pid);
982  if (XLogRecPtrIsInvalid(worker.last_lsn))
983  nulls[3] = true;
984  else
985  values[3] = LSNGetDatum(worker.last_lsn);
986  if (worker.last_send_time == 0)
987  nulls[4] = true;
988  else
989  values[4] = TimestampTzGetDatum(worker.last_send_time);
990  if (worker.last_recv_time == 0)
991  nulls[5] = true;
992  else
993  values[5] = TimestampTzGetDatum(worker.last_recv_time);
994  if (XLogRecPtrIsInvalid(worker.reply_lsn))
995  nulls[6] = true;
996  else
997  values[6] = LSNGetDatum(worker.reply_lsn);
998  if (worker.reply_time == 0)
999  nulls[7] = true;
1000  else
1001  values[7] = TimestampTzGetDatum(worker.reply_time);
1002 
1003  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1004 
1005  /*
1006  * If only a single subscription was requested, and we found it,
1007  * break.
1008  */
1009  if (OidIsValid(subid))
1010  break;
1011  }
1012 
1013  LWLockRelease(LogicalRepWorkerLock);
1014 
1015  /* clean up and return the tuplestore */
1016  tuplestore_donestoring(tupstore);
1017 
1018  return (Datum) 0;
1019 }
void AtEOXact_ApplyLauncher(bool isCommit)
Definition: launcher.c:755
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static volatile sig_atomic_t got_SIGHUP
Definition: launcher.c:83
#define NIL
Definition: pg_list.h:69
#define SIGUSR1
Definition: win32.h:202
WalReceiverConn * wrconn
Definition: worker.c:109
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:849
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
bool LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
Definition: lwlock.c:1849
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:39
int errhint(const char *fmt,...)
Definition: elog.c:987
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
void heap_endscan(HeapScanDesc scan)
Definition: heapam.c:1578
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
void ApplyLauncherMain(Datum main_arg)
Definition: launcher.c:788
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1831
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:521
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1077
void CommitTransactionCommand(void)
Definition: xact.c:2748
#define SpinLockInit(lock)
Definition: spin.h:60
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:71
TimestampTz last_send_time
XLogRecPtr last_lsn
int bgw_restart_time
Definition: bgworker.h:93
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define heap_close(r, l)
Definition: heapam.h:97
FormData_pg_subscription * Form_pg_subscription
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
unsigned int Oid
Definition: postgres_ext.h:31
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:304
#define OidIsValid(objectId)
Definition: c.h:538
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:95
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:162
int wal_receiver_timeout
Definition: walreceiver.c:75
Latch procLatch
Definition: proc.h:103
XLogRecPtr relstate_lsn
static void logicalrep_worker_detach(void)
Definition: launcher.c:582
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:537
Datum bgw_main_arg
Definition: bgworker.h:96
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:336
unsigned short uint16
Definition: c.h:267
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
int max_sync_workers_per_subscription
Definition: launcher.c:61
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
XLogRecPtr reply_lsn
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
Definition: launcher.c:596
#define SUBREL_STATE_UNKNOWN
#define SubscriptionRelationId
void logicalrep_worker_attach(int slot)
Definition: launcher.c:546
#define PG_GETARG_OID(n)
Definition: fmgr.h:240
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:415
Size ApplyLauncherShmemSize(void)
Definition: launcher.c:679
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:320
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
static void logicalrep_launcher_onexit(int code, Datum arg)
Definition: launcher.c:614
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:112
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
HeapScanDesc heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
Definition: heapam.c:1399
BgwHandleStatus
Definition: bgworker.h:101
static bool on_commit_launcher_wakeup
Definition: launcher.c:85
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext TopMemoryContext
Definition: mcxt.c:43
Definition: guc.h:72
List * lappend(List *list, void *datum)
Definition: list.c:128
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define WARNING
Definition: elog.h:40
int wal_retrieve_retry_interval
Definition: xlog.c:107
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:625
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void * palloc0(Size size)
Definition: mcxt.c:878
#define DEFAULT_NAPTIME_PER_CYCLE
Definition: launcher.c:58
uintptr_t Datum
Definition: postgres.h:372
void ApplyLauncherRegister(void)
Definition: launcher.c:698
static void logicalrep_launcher_sighup(SIGNAL_ARGS)
Definition: launcher.c:638
#define PG_STAT_GET_SUBSCRIPTION_COLS
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction)
Definition: heapam.c:1794
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1284
int work_mem
Definition: globals.c:113
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:655
#define SIGHUP
Definition: win32.h:188
#define InvalidOid
Definition: postgres_ext.h:36
int allowedModes
Definition: execnodes.h:268
TimestampTz launch_time
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
static List * get_subscription_list(void)
Definition: launcher.c:97
SetFunctionReturnMode returnMode
Definition: execnodes.h:270
int max_replication_slots
Definition: slot.c:99
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
TimestampTz last_recv_time
#define PG_ARGISNULL(n)
Definition: fmgr.h:174
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define SIGNAL_ARGS
Definition: c.h:1079
#define NULL
Definition: c.h:229
void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
Definition: launcher.c:256
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:675
#define lfirst(lc)
Definition: pg_list.h:106
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:229
void StartTransactionCommand(void)
Definition: xact.c:2678
int max_logical_replication_workers
Definition: launcher.c:60
#define BGW_MAXLEN
Definition: bgworker.h:85
size_t Size
Definition: c.h:356
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:92
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:933
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
#define MAXALIGN(LEN)
Definition: c.h:588
void ApplyLauncherShmemInit(void)
Definition: launcher.c:725
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:202
static void WaitForReplicationWorkerAttach(LogicalRepWorker *worker, BackgroundWorkerHandle *handle)
Definition: launcher.c:162
struct LogicalRepCtxStruct LogicalRepCtxStruct
Tuplestorestate * setResult
Definition: execnodes.h:273
static Datum values[MAXATTR]
Definition: bootstrap.c:163
ExprContext * econtext
Definition: execnodes.h:266
#define Int32GetDatum(X)
Definition: postgres.h:485
TupleDesc setDesc
Definition: execnodes.h:274
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t bgw_notify_pid
Definition: bgworker.h:98
void die(SIGNAL_ARGS)
Definition: postgres.c:2617
bool IsLogicalLauncher(void)
Definition: launcher.c:907
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:74
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:163
int i
#define NameStr(name)
Definition: c.h:499
bool IsBackendPid(int pid)
Definition: procarray.c:2440
void * arg
struct Latch * MyLatch
Definition: globals.c:52
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:164
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS)
Definition: launcher.c:916
#define elog
Definition: elog.h:219
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:224
static void ApplyLauncherWakeup(void)
Definition: launcher.c:778
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:771
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:94
Definition: pg_list.h:45
int pid
Definition: proc.h:108
#define WL_LATCH_SET
Definition: latch.h:124
void BackgroundWorkerInitializeConnection(char *dbname, char *username)
Definition: postmaster.c:5466
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1041
TimestampTz reply_time
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5518