/*-------------------------------------------------------------------------
 * launcher.c
 *	   PostgreSQL logical replication worker launcher process
 *
 * Copyright (c) 2016-2024, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/launcher.c
 *
 * NOTES
 *	  This module contains the logical replication worker launcher which
 *	  uses the background worker infrastructure to start the logical
 *	  replication workers for every enabled subscription.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"

/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L

/* GUC variables */
int			max_logical_replication_workers = 4;
int			max_sync_workers_per_subscription = 2;
int			max_parallel_apply_workers_per_subscription = 2;

LogicalRepWorker *MyLogicalRepWorker = NULL;

typedef struct LogicalRepCtxStruct
{
	/* Supervisor process. */
	pid_t		launcher_pid;

	/* Hash table holding last start times of subscriptions' apply workers. */
	dsa_handle	last_start_dsa;
	dshash_table_handle last_start_dsh;

	/* Background workers. */
	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;

static LogicalRepCtxStruct *LogicalRepCtx;

/* an entry in the last-start-times shared hash table */
typedef struct LauncherLastStartTimesEntry
{
	Oid			subid;			/* OID of logrep subscription (hash key) */
	TimestampTz last_start_time;	/* last time its apply worker was started */
} LauncherLastStartTimesEntry;

/* parameters for the last-start-times shared hash table */
static const dshash_parameters dsh_params = {
	sizeof(Oid),
	sizeof(LauncherLastStartTimesEntry),
	dshash_memcmp,
	dshash_memhash,
	dshash_memcpy,
	LWTRANCHE_LAUNCHER_HASH
};

static dsa_area *last_start_times_dsa = NULL;
static dshash_table *last_start_times = NULL;

static bool on_commit_launcher_wakeup = false;


static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
static int	logicalrep_pa_worker_count(Oid subid);
static void logicalrep_launcher_attach_dshmem(void);
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);

/*
 * Load the list of subscriptions.
 *
 * Only the fields interesting for worker start/stop functions are filled for
 * each subscription.
 */
static List *
get_subscription_list(void)
{
	List	   *res = NIL;
	Relation	rel;
	TableScanDesc scan;
	HeapTuple	tup;
	MemoryContext resultcxt;

	/* This is the context that we will allocate our output data in */
	resultcxt = CurrentMemoryContext;

	/*
	 * Start a transaction so we can access pg_database, and get a snapshot.
	 * We don't have a use for the snapshot itself, but we're interested in
	 * the secondary effect that it sets RecentGlobalXmin.  (This is critical
	 * for anything that reads heap pages, because HOT may decide to prune
	 * them even if the process doesn't attempt to modify any tuples.)
	 *
	 * FIXME: This comment is inaccurate / the code buggy. A snapshot that is
	 * not pushed/active does not reliably prevent HOT pruning (->xmin could
	 * e.g. be cleared when cache invalidations are processed).
	 */
	StartTransactionCommand();
	(void) GetTransactionSnapshot();

	rel = table_open(SubscriptionRelationId, AccessShareLock);
	scan = table_beginscan_catalog(rel, 0, NULL);

	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
	{
		Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
		Subscription *sub;
		MemoryContext oldcxt;

		/*
		 * Allocate our results in the caller's context, not the
		 * transaction's.  We do this inside the loop, and restore the
		 * original context at the end, so that leaky things like
		 * heap_getnext() are not called in a potentially long-lived context.
		 */
		oldcxt = MemoryContextSwitchTo(resultcxt);

		sub = (Subscription *) palloc0(sizeof(Subscription));
		sub->oid = subform->oid;
		sub->dbid = subform->subdbid;
		sub->owner = subform->subowner;
		sub->enabled = subform->subenabled;
		sub->name = pstrdup(NameStr(subform->subname));
		/* We don't fill fields we are not interested in. */

		res = lappend(res, sub);
		MemoryContextSwitchTo(oldcxt);
	}

	table_endscan(scan);
	table_close(rel, AccessShareLock);

	CommitTransactionCommand();

	return res;
}

/*
 * Wait for a background worker to start up and attach to the shmem context.
 *
 * This is only needed for cleaning up the shared memory in case the worker
 * fails to attach.
 *
 * Returns whether the attach was successful.
 */
static bool
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
							   uint16 generation,
							   BackgroundWorkerHandle *handle)
{
	BgwHandleStatus status;
	int			rc;

	for (;;)
	{
		pid_t		pid;

		CHECK_FOR_INTERRUPTS();

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/* Worker either died or has started.  Return false if died. */
		if (!worker->in_use || worker->proc)
		{
			LWLockRelease(LogicalRepWorkerLock);
			return worker->in_use;
		}

		LWLockRelease(LogicalRepWorkerLock);

		/* Check if worker has died before attaching, and clean up after it. */
		status = GetBackgroundWorkerPid(handle, &pid);

		if (status == BGWH_STOPPED)
		{
			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
			/* Ensure that this was indeed the worker we waited for. */
			if (generation == worker->generation)
				logicalrep_worker_cleanup(worker);
			LWLockRelease(LogicalRepWorkerLock);
			return false;
		}

		/*
		 * We need a timeout because we generally don't get notified via
		 * latch about the worker attach.  But we don't expect to have to
		 * wait long.
		 */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
	}
}

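/*
 * A minimal sketch (hypothetical caller, not code from this file) of the
 * generation-check idiom used above.  The generation counter distinguishes
 * "our" occupant of a worker slot from a later reuse of the same slot:
 *
 *	uint16		gen;
 *
 *	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 *	gen = worker->generation;
 *	LWLockRelease(LogicalRepWorkerLock);
 *	... do something that may sleep ...
 *	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 *	if (worker->in_use && worker->generation == gen)
 *		... the slot still holds the worker we were waiting for ...
 *	LWLockRelease(LogicalRepWorkerLock);
 */
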
/*
 * Walks the workers array and searches for one that matches the given
 * subscription id and relid.
 *
 * We are only interested in the leader apply worker or table sync worker.
 */
LogicalRepWorker *
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
	int			i;
	LogicalRepWorker *res = NULL;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Search for attached worker for a given subscription id. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		/* Skip parallel apply workers. */
		if (isParallelApplyWorker(w))
			continue;

		if (w->in_use && w->subid == subid && w->relid == relid &&
			(!only_running || w->proc))
		{
			res = w;
			break;
		}
	}

	return res;
}

/*
 * Similar to logicalrep_worker_find(), but returns a list of all workers for
 * the subscription, instead of just one.
 */
List *
logicalrep_workers_find(Oid subid, bool only_running)
{
	int			i;
	List	   *res = NIL;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Search for attached worker for a given subscription id. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (w->in_use && w->subid == subid && (!only_running || w->proc))
			res = lappend(res, w);
	}

	return res;
}

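/*
 * A minimal usage sketch (hypothetical caller): both find functions require
 * LogicalRepWorkerLock to be held, and the returned pointers into the shared
 * workers array stay meaningful only while the lock is held.
 *
 *	List	   *workers;
 *	ListCell   *lc;
 *
 *	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 *	workers = logicalrep_workers_find(subid, true);
 *	foreach(lc, workers)
 *	{
 *		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 *
 *		... inspect w while the lock is held ...
 *	}
 *	LWLockRelease(LogicalRepWorkerLock);
 */
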
/*
 * Start new logical replication background worker, if possible.
 *
 * Returns true on success, false on failure.
 */
bool
logicalrep_worker_launch(LogicalRepWorkerType wtype,
						 Oid dbid, Oid subid, const char *subname, Oid userid,
						 Oid relid, dsm_handle subworker_dsm)
{
	BackgroundWorker bgw;
	BackgroundWorkerHandle *bgw_handle;
	uint16		generation;
	int			i;
	int			slot = 0;
	LogicalRepWorker *worker = NULL;
	int			nsyncworkers;
	int			nparallelapplyworkers;
	TimestampTz now;
	bool		is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
	bool		is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);

	/*----------
	 * Sanity checks:
	 * - must be valid worker type
	 * - tablesync workers are only ones to have relid
	 * - parallel apply worker is the only kind of subworker
	 */
	Assert(wtype != WORKERTYPE_UNKNOWN);
	Assert(is_tablesync_worker == OidIsValid(relid));
	Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));

	ereport(DEBUG1,
			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
							 subname)));

	/* Report this after the initial starting message for consistency. */
	if (max_replication_slots == 0)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("cannot start logical replication workers when max_replication_slots = 0")));

	/*
	 * We need to do the modification of the shared memory under lock so that
	 * we have a consistent view.
	 */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

retry:
	/* Find unused worker slot. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (!w->in_use)
		{
			worker = w;
			slot = i;
			break;
		}
	}

	nsyncworkers = logicalrep_sync_worker_count(subid);

	now = GetCurrentTimestamp();

	/*
	 * If we didn't find a free slot, try to do garbage collection.  The
	 * reason we do this is because if some worker failed to start up and its
	 * parent has crashed while waiting, the in_use state was never cleared.
	 */
	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
	{
		bool		did_cleanup = false;

		for (i = 0; i < max_logical_replication_workers; i++)
		{
			LogicalRepWorker *w = &LogicalRepCtx->workers[i];

			/*
			 * If the worker was marked in use but didn't manage to attach in
			 * time, clean it up.
			 */
			if (w->in_use && !w->proc &&
				TimestampDifferenceExceeds(w->launch_time, now,
										   wal_receiver_timeout))
			{
				elog(WARNING,
					 "logical replication worker for subscription %u took too long to start; canceled",
					 w->subid);

				logicalrep_worker_cleanup(w);
				did_cleanup = true;
			}
		}

		if (did_cleanup)
			goto retry;
	}

	/*
	 * We don't allow more sync workers to be invoked once we have reached
	 * the sync worker limit per subscription.  So, just return silently, as
	 * we might get here because of an otherwise harmless race condition.
	 */
	if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return false;
	}

	nparallelapplyworkers = logicalrep_pa_worker_count(subid);

	/*
	 * Return false if the number of parallel apply workers reached the limit
	 * per subscription.
	 */
	if (is_parallel_apply_worker &&
		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return false;
	}

	/*
	 * However, if there are no more free worker slots, inform the user about
	 * it before exiting.
	 */
	if (worker == NULL)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of logical replication worker slots"),
				 errhint("You might need to increase %s.", "max_logical_replication_workers")));
		return false;
	}

	/* Prepare the worker slot. */
	worker->type = wtype;
	worker->launch_time = now;
	worker->in_use = true;
	worker->generation++;
	worker->proc = NULL;
	worker->dbid = dbid;
	worker->userid = userid;
	worker->subid = subid;
	worker->relid = relid;
	worker->relstate = SUBREL_STATE_UNKNOWN;
	worker->relstate_lsn = InvalidXLogRecPtr;
	worker->stream_fileset = NULL;
	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
	worker->parallel_apply = is_parallel_apply_worker;
	worker->last_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->last_send_time);
	TIMESTAMP_NOBEGIN(worker->last_recv_time);
	worker->reply_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->reply_time);

	/* Before releasing lock, remember generation for future identification. */
	generation = worker->generation;

	LWLockRelease(LogicalRepWorkerLock);

	/* Register the new dynamic worker. */
	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");

	switch (worker->type)
	{
		case WORKERTYPE_APPLY:
			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
			snprintf(bgw.bgw_name, BGW_MAXLEN,
					 "logical replication apply worker for subscription %u",
					 subid);
			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
			break;

		case WORKERTYPE_PARALLEL_APPLY:
			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
			snprintf(bgw.bgw_name, BGW_MAXLEN,
					 "logical replication parallel apply worker for subscription %u",
					 subid);
			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");

			memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
			break;

		case WORKERTYPE_TABLESYNC:
			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
			snprintf(bgw.bgw_name, BGW_MAXLEN,
					 "logical replication tablesync worker for subscription %u sync %u",
					 subid,
					 relid);
			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
			break;

		case WORKERTYPE_UNKNOWN:
			/* Should never happen. */
			elog(ERROR, "unknown worker type");
	}

	bgw.bgw_restart_time = BGW_NEVER_RESTART;
	bgw.bgw_notify_pid = MyProcPid;
	bgw.bgw_main_arg = Int32GetDatum(slot);

	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
	{
		/* Failed to start worker, so clean up the worker slot. */
		LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
		Assert(generation == worker->generation);
		logicalrep_worker_cleanup(worker);
		LWLockRelease(LogicalRepWorkerLock);

		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of background worker slots"),
				 errhint("You might need to increase %s.", "max_worker_processes")));
		return false;
	}

	/* Now wait until it attaches. */
	return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}

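/*
 * For reference, this is how the launcher itself invokes the function for a
 * leader apply worker (see ApplyLauncherMain below); tablesync workers pass
 * a valid relid instead of InvalidOid, and parallel apply workers pass a
 * valid DSM handle instead of DSM_HANDLE_INVALID:
 *
 *	logicalrep_worker_launch(WORKERTYPE_APPLY,
 *							 sub->dbid, sub->oid, sub->name,
 *							 sub->owner, InvalidOid,
 *							 DSM_HANDLE_INVALID);
 */
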
/*
 * Internal function to stop the worker and wait until it detaches from the
 * slot.
 */
static void
logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
{
	uint16		generation;

	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));

	/*
	 * Remember which generation was our worker so we can check if what we
	 * see is still the same one.
	 */
	generation = worker->generation;

	/*
	 * If we found a worker but it does not have proc set then it is still
	 * starting up; wait for it to finish starting and then kill it.
	 */
	while (worker->in_use && !worker->proc)
	{
		int			rc;

		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		/* Recheck worker status. */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/*
		 * Check whether the worker slot is no longer used, which would mean
		 * that the worker has exited, or whether the worker generation is
		 * different, meaning that a different worker has taken the slot.
		 */
		if (!worker->in_use || worker->generation != generation)
			return;

		/* Worker has assigned proc, so it has started. */
		if (worker->proc)
			break;
	}

	/* Now terminate the worker ... */
	kill(worker->proc->pid, signo);

	/* ... and wait for it to die. */
	for (;;)
	{
		int			rc;

		/* is it gone? */
		if (!worker->proc || worker->generation != generation)
			break;

		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_SHUTDOWN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	}
}

/*
 * Stop the logical replication worker for subid/relid, if any.
 */
void
logicalrep_worker_stop(Oid subid, Oid relid)
{
	LogicalRepWorker *worker;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	worker = logicalrep_worker_find(subid, relid, false);

	if (worker)
	{
		Assert(!isParallelApplyWorker(worker));
		logicalrep_worker_stop_internal(worker, SIGTERM);
	}

	LWLockRelease(LogicalRepWorkerLock);
}

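/*
 * A hedged sketch of a typical caller: DDL paths that need the leader apply
 * worker gone (e.g. when a subscription is dropped or disabled) would call
 *
 *	logicalrep_worker_stop(subid, InvalidOid);
 *
 * while tablesync management code passes the relation OID to stop one
 * specific sync worker.
 */
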
/*
 * Stop the given logical replication parallel apply worker.
 *
 * Note that the function sends SIGINT instead of SIGTERM to the parallel
 * apply worker so that the worker exits cleanly.
 */
void
logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
{
	int			slot_no;
	uint16		generation;
	LogicalRepWorker *worker;

	SpinLockAcquire(&winfo->shared->mutex);
	generation = winfo->shared->logicalrep_worker_generation;
	slot_no = winfo->shared->logicalrep_worker_slot_no;
	SpinLockRelease(&winfo->shared->mutex);

	Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);

	/*
	 * Detach from the error_mq_handle for the parallel apply worker before
	 * stopping it.  This prevents the leader apply worker from trying to
	 * receive the message from the error queue that might already be
	 * detached by the parallel apply worker.
	 */
	if (winfo->error_mq_handle)
	{
		shm_mq_detach(winfo->error_mq_handle);
		winfo->error_mq_handle = NULL;
	}

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	worker = &LogicalRepCtx->workers[slot_no];
	Assert(isParallelApplyWorker(worker));

	/*
	 * Only stop the worker if the generation matches and the worker is alive.
	 */
	if (worker->generation == generation && worker->proc)
		logicalrep_worker_stop_internal(worker, SIGINT);

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Wake up (using latch) any logical replication worker for specified sub/rel.
 */
void
logicalrep_worker_wakeup(Oid subid, Oid relid)
{
	LogicalRepWorker *worker;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	worker = logicalrep_worker_find(subid, relid, true);

	if (worker)
		logicalrep_worker_wakeup_ptr(worker);

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Wake up (using latch) the specified logical replication worker.
 *
 * Caller must hold lock, else worker->proc could change under us.
 */
void
logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
{
	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	SetLatch(&worker->proc->procLatch);
}

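/*
 * logicalrep_worker_wakeup() above is itself the canonical usage pattern for
 * logicalrep_worker_wakeup_ptr(): a caller that already holds
 * LogicalRepWorkerLock can skip the wrapper and do
 *
 *	worker = logicalrep_worker_find(subid, relid, true);
 *	if (worker)
 *		logicalrep_worker_wakeup_ptr(worker);
 *
 * while the lock is held.
 */
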
/*
 * Attach to a slot.
 */
void
logicalrep_worker_attach(int slot)
{
	/* Block concurrent access. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	Assert(slot >= 0 && slot < max_logical_replication_workers);
	MyLogicalRepWorker = &LogicalRepCtx->workers[slot];

	if (!MyLogicalRepWorker->in_use)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication worker slot %d is empty, cannot attach",
						slot)));
	}

	if (MyLogicalRepWorker->proc)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication worker slot %d is already used by "
						"another worker, cannot attach", slot)));
	}

	MyLogicalRepWorker->proc = MyProc;
	before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Stop the parallel apply workers if any, and detach the leader apply worker
 * (cleans up the worker info).
 */
static void
logicalrep_worker_detach(void)
{
	/* Stop the parallel apply workers. */
	if (am_leader_apply_worker())
	{
		List	   *workers;
		ListCell   *lc;

		/*
		 * Detach from the error_mq_handle for all parallel apply workers
		 * before terminating them.  This prevents the leader apply worker
		 * from receiving the worker termination message and sending it to
		 * logs when the same is already done by the parallel worker.
		 */
		pa_detach_all_error_mq();

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
		foreach(lc, workers)
		{
			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);

			if (isParallelApplyWorker(w))
				logicalrep_worker_stop_internal(w, SIGTERM);
		}

		LWLockRelease(LogicalRepWorkerLock);
	}

	/* Block concurrent access. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	logicalrep_worker_cleanup(MyLogicalRepWorker);

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Clean up worker info.
 */
static void
logicalrep_worker_cleanup(LogicalRepWorker *worker)
{
	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));

	worker->type = WORKERTYPE_UNKNOWN;
	worker->in_use = false;
	worker->proc = NULL;
	worker->dbid = InvalidOid;
	worker->userid = InvalidOid;
	worker->subid = InvalidOid;
	worker->relid = InvalidOid;
	worker->leader_pid = InvalidPid;
	worker->parallel_apply = false;
}

/*
 * Cleanup function for logical replication launcher.
 *
 * Called on logical replication launcher exit.
 */
static void
logicalrep_launcher_onexit(int code, Datum arg)
{
	LogicalRepCtx->launcher_pid = 0;
}

/*
 * Cleanup function.
 *
 * Called on logical replication worker exit.
 */
static void
logicalrep_worker_onexit(int code, Datum arg)
{
	/* Disconnect gracefully from the remote side. */
	if (LogRepWorkerWalRcvConn)
		walrcv_disconnect(LogRepWorkerWalRcvConn);

	logicalrep_worker_detach();

	/* Cleanup fileset used for streaming transactions. */
	if (MyLogicalRepWorker->stream_fileset != NULL)
		FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);

	/*
	 * Session level locks may be acquired outside of a transaction in
	 * parallel apply mode and will not be released when the worker
	 * terminates, so manually release all locks before the worker exits.
	 *
	 * The locks will be acquired once the worker is initialized.
	 */
	if (!InitializingApplyWorker)
		LockReleaseAll(DEFAULT_LOCKMETHOD, true);

	ApplyLauncherWakeup();
}

/*
 * Count the number of registered (not necessarily running) sync workers
 * for a subscription.
 */
int
logicalrep_sync_worker_count(Oid subid)
{
	int			i;
	int			res = 0;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Search for attached worker for a given subscription id. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (isTablesyncWorker(w) && w->subid == subid)
			res++;
	}

	return res;
}

/*
 * Count the number of registered (but not necessarily running) parallel apply
 * workers for a subscription.
 */
static int
logicalrep_pa_worker_count(Oid subid)
{
	int			i;
	int			res = 0;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/*
	 * Scan all attached parallel apply workers, only counting those which
	 * have the given subscription id.
	 */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (isParallelApplyWorker(w) && w->subid == subid)
			res++;
	}

	return res;
}

/*
 * ApplyLauncherShmemSize
 *		Compute space needed for replication launcher shared memory
 */
Size
ApplyLauncherShmemSize(void)
{
	Size		size;

	/*
	 * Need the fixed struct and the array of LogicalRepWorker.
	 */
	size = sizeof(LogicalRepCtxStruct);
	size = MAXALIGN(size);
	size = add_size(size, mul_size(max_logical_replication_workers,
								   sizeof(LogicalRepWorker)));
	return size;
}

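/*
 * A worked example of the computation, assuming the default
 * max_logical_replication_workers = 4 (add_size/mul_size are the
 * overflow-checked arithmetic helpers used for shared-memory sizing):
 *
 *	size = MAXALIGN(sizeof(LogicalRepCtxStruct));
 *	size = add_size(size, mul_size(4, sizeof(LogicalRepWorker)));
 */
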
/*
 * ApplyLauncherRegister
 *		Register a background worker running the logical replication launcher.
 */
void
ApplyLauncherRegister(void)
{
	BackgroundWorker bgw;

	/*
	 * The logical replication launcher is disabled during binary upgrades,
	 * to prevent logical replication workers from running on the source
	 * cluster.  That could cause replication origins to move forward after
	 * having been copied to the target cluster, potentially creating
	 * conflicts with the copied data files.
	 */
	if (IsBinaryUpgrade)
		return;

	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
	snprintf(bgw.bgw_name, BGW_MAXLEN,
			 "logical replication launcher");
	snprintf(bgw.bgw_type, BGW_MAXLEN,
			 "logical replication launcher");
	bgw.bgw_restart_time = 5;
	bgw.bgw_notify_pid = 0;
	bgw.bgw_main_arg = (Datum) 0;

	RegisterBackgroundWorker(&bgw);
}

/*
 * ApplyLauncherShmemInit
 *		Allocate and initialize replication launcher shared memory
 */
void
ApplyLauncherShmemInit(void)
{
	bool		found;

	LogicalRepCtx = (LogicalRepCtxStruct *)
		ShmemInitStruct("Logical Replication Launcher Data",
						ApplyLauncherShmemSize(),
						&found);

	if (!found)
	{
		int			slot;

		memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());

		LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
		LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;

		/* Initialize memory and spin locks for each worker slot. */
		for (slot = 0; slot < max_logical_replication_workers; slot++)
		{
			LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];

			memset(worker, 0, sizeof(LogicalRepWorker));
			SpinLockInit(&worker->relmutex);
		}
	}
}

/*
 * Initialize or attach to the dynamic shared hash table that stores the
 * last-start times, if not already done.
 * This must be called before accessing the table.
 */
static void
logicalrep_launcher_attach_dshmem(void)
{
	MemoryContext oldcontext;

	/* Quick exit if we already did this. */
	if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
		last_start_times != NULL)
		return;

	/* Otherwise, use a lock to ensure only one process creates the table. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	/* Be sure any local memory allocated by DSA routines is persistent. */
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
	{
		/* Initialize dynamic shared hash table for last-start times. */
		last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
		dsa_pin(last_start_times_dsa);
		dsa_pin_mapping(last_start_times_dsa);
		last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);

		/* Store handles in shared memory for other backends to use. */
		LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
		LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
	}
	else if (!last_start_times)
	{
		/* Attach to existing dynamic shared hash table. */
		last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
		dsa_pin_mapping(last_start_times_dsa);
		last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
										 LogicalRepCtx->last_start_dsh, 0);
	}

	MemoryContextSwitchTo(oldcontext);
	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Set the last-start time for the subscription.
 */
static void
ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
{
	LauncherLastStartTimesEntry *entry;
	bool		found;

	logicalrep_launcher_attach_dshmem();

	entry = dshash_find_or_insert(last_start_times, &subid, &found);
	entry->last_start_time = start_time;
	dshash_release_lock(last_start_times, entry);
}

/*
 * Return the last-start time for the subscription, or 0 if there isn't one.
 */
static TimestampTz
ApplyLauncherGetWorkerStartTime(Oid subid)
{
	LauncherLastStartTimesEntry *entry;
	TimestampTz ret;

	logicalrep_launcher_attach_dshmem();

	entry = dshash_find(last_start_times, &subid, false);
	if (entry == NULL)
		return 0;

	ret = entry->last_start_time;
	dshash_release_lock(last_start_times, entry);

	return ret;
}

/*
 * Remove the last-start-time entry for the subscription, if one exists.
 *
 * This has two use-cases: to remove the entry related to a subscription
 * that's been deleted or disabled (just to avoid leaking shared memory),
 * and to allow immediate restart of an apply worker that has exited
 * due to subscription parameter changes.
 */
void
ApplyLauncherForgetWorkerStartTime(Oid subid)
{
	logicalrep_launcher_attach_dshmem();

	(void) dshash_delete_key(last_start_times, &subid);
}

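/*
 * A hedged sketch of the intended interplay: when a transaction changes a
 * subscription such that its apply worker should restart immediately, the
 * DDL path can do
 *
 *	ApplyLauncherForgetWorkerStartTime(subid);
 *	ApplyLauncherWakeupAtCommit();
 *
 * so that the launcher, once woken at commit, is not throttled by
 * wal_retrieve_retry_interval for this subscription.
 */
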
/*
 * Wakeup the launcher on commit if requested.
 */
void
AtEOXact_ApplyLauncher(bool isCommit)
{
	if (isCommit)
	{
		if (on_commit_launcher_wakeup)
			ApplyLauncherWakeup();
	}

	on_commit_launcher_wakeup = false;
}

/*
 * Request wakeup of the launcher on commit of the transaction.
 *
 * This is used to signal the launcher to stop sleeping and process the
 * subscriptions when the current transaction commits.  Should be used when a
 * new tuple was added to the pg_subscription catalog.
 */
void
ApplyLauncherWakeupAtCommit(void)
{
	if (!on_commit_launcher_wakeup)
		on_commit_launcher_wakeup = true;
}

static void
ApplyLauncherWakeup(void)
{
	if (LogicalRepCtx->launcher_pid != 0)
		kill(LogicalRepCtx->launcher_pid, SIGUSR1);
}

/*
 * Main loop for the apply launcher process.
 */
void
ApplyLauncherMain(Datum main_arg)
{
	ereport(DEBUG1,
			(errmsg_internal("logical replication launcher started")));

	before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);

	Assert(LogicalRepCtx->launcher_pid == 0);
	LogicalRepCtx->launcher_pid = MyProcPid;

	/* Establish signal handlers. */
	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * Establish connection to nailed catalogs (we only ever access
	 * pg_subscription).
	 */
	BackgroundWorkerInitializeConnection(NULL, NULL, 0);

	/* Enter main loop */
	for (;;)
	{
		int			rc;
		List	   *sublist;
		ListCell   *lc;
		MemoryContext subctx;
		MemoryContext oldctx;
		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;

		CHECK_FOR_INTERRUPTS();

		/* Use temporary context to avoid leaking memory across cycles. */
		subctx = AllocSetContextCreate(TopMemoryContext,
									   "Logical Replication Launcher sublist",
									   ALLOCSET_DEFAULT_SIZES);
		oldctx = MemoryContextSwitchTo(subctx);

		/* Start any missing workers for enabled subscriptions. */
		sublist = get_subscription_list();
		foreach(lc, sublist)
		{
			Subscription *sub = (Subscription *) lfirst(lc);
			LogicalRepWorker *w;
			TimestampTz last_start;
			TimestampTz now;
			long		elapsed;

			if (!sub->enabled)
				continue;

			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
			LWLockRelease(LogicalRepWorkerLock);

			if (w != NULL)
				continue;		/* worker is running already */

			/*
			 * If the worker is eligible to start now, launch it.  Otherwise,
			 * adjust wait_time so that we'll wake up as soon as it can be
			 * started.
			 *
			 * Each subscription's apply worker can only be restarted once per
			 * wal_retrieve_retry_interval, so that errors do not cause us to
			 * repeatedly restart the worker as fast as possible.  In cases
			 * where a restart is expected (e.g., subscription parameter
			 * changes), another process should remove the last-start entry
			 * for the subscription so that the worker can be restarted
			 * without waiting for wal_retrieve_retry_interval to elapse.
			 */
			last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
			now = GetCurrentTimestamp();
			if (last_start == 0 ||
				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
			{
				ApplyLauncherSetWorkerStartTime(sub->oid, now);
				logicalrep_worker_launch(WORKERTYPE_APPLY,
										 sub->dbid, sub->oid, sub->name,
										 sub->owner, InvalidOid,
										 DSM_HANDLE_INVALID);
			}
			else
			{
				wait_time = Min(wait_time,
								wal_retrieve_retry_interval - elapsed);
			}
		}

		/* Switch back to original memory context. */
		MemoryContextSwitchTo(oldctx);
		/* Clean the temporary memory. */
		MemoryContextDelete(subctx);

		/* Wait for more work. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   wait_time,
					   WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}
	}

	/* Not reachable */
}

/*
 * Is current process the logical replication launcher?
 */
bool
IsLogicalLauncher(void)
{
	return LogicalRepCtx->launcher_pid == MyProcPid;
}

/*
 * Return the pid of the leader apply worker if the given pid is the pid of a
 * parallel apply worker, otherwise, return InvalidPid.
 */
pid_t
GetLeaderApplyWorkerPid(pid_t pid)
{
	int			leader_pid = InvalidPid;
	int			i;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
		{
			leader_pid = w->leader_pid;
			break;
		}
	}

	LWLockRelease(LogicalRepWorkerLock);

	return leader_pid;
}

/*
 * Returns state of the subscriptions.
 */
Datum
pg_stat_get_subscription(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_SUBSCRIPTION_COLS	10
	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
	int			i;
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

	InitMaterializedSRF(fcinfo, 0);

	/* Make sure we get consistent view of the workers. */
	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	for (i = 0; i < max_logical_replication_workers; i++)
	{
		/* for each row */
		Datum		values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
		bool		nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
		int			worker_pid;
		LogicalRepWorker worker;

		memcpy(&worker, &LogicalRepCtx->workers[i],
			   sizeof(LogicalRepWorker));
		if (!worker.proc || !IsBackendPid(worker.proc->pid))
			continue;

		if (OidIsValid(subid) && worker.subid != subid)
			continue;

		worker_pid = worker.proc->pid;

		values[0] = ObjectIdGetDatum(worker.subid);
		if (isTablesyncWorker(&worker))
			values[1] = ObjectIdGetDatum(worker.relid);
		else
			nulls[1] = true;
		values[2] = Int32GetDatum(worker_pid);

		if (isParallelApplyWorker(&worker))
			values[3] = Int32GetDatum(worker.leader_pid);
		else
			nulls[3] = true;

		if (XLogRecPtrIsInvalid(worker.last_lsn))
			nulls[4] = true;
		else
			values[4] = LSNGetDatum(worker.last_lsn);
		if (worker.last_send_time == 0)
			nulls[5] = true;
		else
			values[5] = TimestampTzGetDatum(worker.last_send_time);
		if (worker.last_recv_time == 0)
			nulls[6] = true;
		else
			values[6] = TimestampTzGetDatum(worker.last_recv_time);
		if (XLogRecPtrIsInvalid(worker.reply_lsn))
			nulls[7] = true;
		else
			values[7] = LSNGetDatum(worker.reply_lsn);
		if (worker.reply_time == 0)
			nulls[8] = true;
		else
			values[8] = TimestampTzGetDatum(worker.reply_time);

		switch (worker.type)
		{
			case WORKERTYPE_APPLY:
				values[9] = CStringGetTextDatum("apply");
				break;
			case WORKERTYPE_PARALLEL_APPLY:
				values[9] = CStringGetTextDatum("parallel apply");
				break;
			case WORKERTYPE_TABLESYNC:
				values[9] = CStringGetTextDatum("table synchronization");
				break;
			case WORKERTYPE_UNKNOWN:
				/* Should never happen. */
				elog(ERROR, "unknown worker type");
		}

		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
							 values, nulls);

		/*
		 * If only a single subscription was requested, and we found it,
		 * break.
		 */
		if (OidIsValid(subid))
			break;
	}

	LWLockRelease(LogicalRepWorkerLock);

	return (Datum) 0;
}
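
/*
 * This function backs the pg_stat_subscription system view, so the usual way
 * to consume it is via SQL, e.g.:
 *
 *	SELECT subid, pid, leader_pid, worker_type FROM pg_stat_subscription;
 *
 * (column names per the view definition in system_views.sql).
 */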