PostgreSQL Source Code  git master
applyparallelworker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * applyparallelworker.c
3  * Support routines for applying xact by parallel apply worker
4  *
5  * Copyright (c) 2023, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/applyparallelworker.c
9  *
10  * This file contains the code to launch, set up, and teardown a parallel apply
11  * worker which receives the changes from the leader worker and invokes routines
12  * to apply those on the subscriber database. Additionally, this file contains
13  * routines that are intended to support setting up, using, and tearing down a
14  * ParallelApplyWorkerInfo which is required so the leader worker and parallel
15  * apply workers can communicate with each other.
16  *
17  * The parallel apply workers are assigned (if available) as soon as xact's
18  * first stream is received for subscriptions that have set their 'streaming'
19  * option as parallel. The leader apply worker will send changes to this new
20  * worker via shared memory. We keep this worker assigned till the transaction
21  * commit is received and also wait for the worker to finish at commit. This
22  * preserves commit ordering and avoid file I/O in most cases, although we
23  * still need to spill to a file if there is no worker available. See comments
24  * atop logical/worker to know more about streamed xacts whose changes are
25  * spilled to disk. It is important to maintain commit order to avoid failures
26  * due to: (a) transaction dependencies - say if we insert a row in the first
27  * transaction and update it in the second transaction on publisher then
28  * allowing the subscriber to apply both in parallel can lead to failure in the
29  * update; (b) deadlocks - allowing transactions that update the same set of
30  * rows/tables in the opposite order to be applied in parallel can lead to
31  * deadlocks.
32  *
33  * A worker pool is used to avoid restarting workers for each streaming
34  * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
35  * in the ParallelApplyWorkerPool. After successfully launching a new worker,
36  * its information is added to the ParallelApplyWorkerPool. Once the worker
37  * finishes applying the transaction, it is marked as available for re-use.
38  * Now, before starting a new worker to apply the streaming transaction, we
39  * check the list for any available worker. Note that we retain a maximum of
40  * half the max_parallel_apply_workers_per_subscription workers in the pool and
41  * after that, we simply exit the worker after applying the transaction.
42  *
43  * XXX This worker pool threshold is arbitrary and we can provide a GUC
44  * variable for this in the future if required.
45  *
46  * The leader apply worker will create a separate dynamic shared memory segment
47  * when each parallel apply worker starts. The reason for this design is that
48  * we cannot predict how many workers will be needed. It may be possible to
49  * allocate enough shared memory in one segment based on the maximum number of
50  * parallel apply workers (max_parallel_apply_workers_per_subscription), but
51  * this would waste memory if no process is actually started.
52  *
53  * The dynamic shared memory segment contains: (a) a shm_mq that is used to
54  * send changes in the transaction from leader apply worker to parallel apply
55  * worker; (b) another shm_mq that is used to send errors (and other messages
56  * reported via elog/ereport) from the parallel apply worker to leader apply
57  * worker; (c) necessary information to be shared among parallel apply workers
58  * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
59  *
60  * Locking Considerations
61  * ----------------------
62  * We have a risk of deadlock due to concurrently applying the transactions in
63  * parallel mode that were independent on the publisher side but became
64  * dependent on the subscriber side due to the different database structures
65  * (like schema of subscription tables, constraints, etc.) on each side. This
66  * can happen even without parallel mode when there are concurrent operations
67  * on the subscriber. In order to detect the deadlocks among leader (LA) and
68  * parallel apply (PA) workers, we used lmgr locks when the PA waits for the
69  * next stream (set of changes) and LA waits for PA to finish the transaction.
70  * An alternative approach could be to not allow parallelism when the schema of
71  * tables is different between the publisher and subscriber but that would be
72  * too restrictive and would require the publisher to send much more
73  * information than it is currently sending.
74  *
75  * Consider a case where the subscribed table does not have a unique key on the
76  * publisher and has a unique key on the subscriber. The deadlock can happen in
77  * the following ways:
78  *
79  * 1) Deadlock between the leader apply worker and a parallel apply worker
80  *
81  * Consider that the parallel apply worker (PA) is executing TX-1 and the
82  * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
83  * Now, LA is waiting for PA because of the unique key constraint of the
84  * subscribed table while PA is waiting for LA to send the next stream of
85  * changes or transaction finish command message.
86  *
87  * In order for lmgr to detect this, we have LA acquire a session lock on the
88  * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
89  * trying to receive the next stream of changes. Specifically, LA will acquire
90  * the lock in AccessExclusive mode before sending the STREAM_STOP and will
91  * release it if already acquired after sending the STREAM_START, STREAM_ABORT
92  * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
93  * acquire the lock in AccessShare mode after processing STREAM_STOP and
94  * STREAM_ABORT (for subtransaction) and then release the lock immediately
95  * after acquiring it.
96  *
97  * The lock graph for the above example will look as follows:
98  * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
99  * acquire the stream lock) -> LA
100  *
101  * This way, when PA is waiting for LA for the next stream of changes, we can
102  * have a wait-edge from PA to LA in lmgr, which will make us detect the
103  * deadlock between LA and PA.
104  *
105  * 2) Deadlock between the leader apply worker and parallel apply workers
106  *
107  * This scenario is similar to the first case but TX-1 and TX-2 are executed by
108  * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
109  * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
110  * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
111  * transaction in order to preserve the commit order. There is a deadlock among
112  * the three processes.
113  *
114  * In order for lmgr to detect this, we have PA acquire a session lock (this is
115  * a different lock than referred in the previous case, see
116  * pa_lock_transaction()) on the transaction being applied and have LA wait on
117  * the lock before proceeding in the transaction finish commands. Specifically,
118  * PA will acquire this lock in AccessExclusive mode before executing the first
119  * message of the transaction and release it at the xact end. LA will acquire
120  * this lock in AccessShare mode at transaction finish commands and release it
121  * immediately.
122  *
123  * The lock graph for the above example will look as follows:
124  * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
125  * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
126  * lock) -> LA
127  *
128  * This way when LA is waiting to finish the transaction end command to preserve
129  * the commit order, we will be able to detect deadlock, if any.
130  *
131  * One might think we can use XactLockTableWait(), but XactLockTableWait()
132  * considers PREPARED TRANSACTION as still in progress which means the lock
133  * won't be released even after the parallel apply worker has prepared the
134  * transaction.
135  *
136  * 3) Deadlock when the shm_mq buffer is full
137  *
138  * In the previous scenario (ie. PA-1 and PA-2 are executing transactions
139  * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
140  * wait to send messages, and this wait doesn't appear in lmgr.
141  *
142  * To avoid this wait, we use a non-blocking write and wait with a timeout. If
143  * the timeout is exceeded, the LA will serialize all the pending messages to
144  * a file and indicate PA-2 that it needs to read that file for the remaining
145  * messages. Then LA will start waiting for commit as in the previous case
146  * which will detect deadlock if any. See pa_send_data() and
147  * enum TransApplyAction.
148  *
149  * Lock types
150  * ----------
151  * Both the stream lock and the transaction lock mentioned above are
152  * session-level locks because both locks could be acquired outside the
153  * transaction, and the stream lock in the leader needs to persist across
154  * transaction boundaries i.e. until the end of the streaming transaction.
155  *-------------------------------------------------------------------------
156  */
157 
158 #include "postgres.h"
159 
160 #include "libpq/pqformat.h"
161 #include "libpq/pqmq.h"
162 #include "pgstat.h"
163 #include "postmaster/interrupt.h"
166 #include "replication/origin.h"
168 #include "storage/ipc.h"
169 #include "storage/lmgr.h"
170 #include "tcop/tcopprot.h"
171 #include "utils/inval.h"
172 #include "utils/memutils.h"
173 #include "utils/syscache.h"
174 
175 #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
176 
177 /*
178  * DSM keys for parallel apply worker. Unlike other parallel execution code,
179  * since we don't need to worry about DSM keys conflicting with plan_node_id we
180  * can use small integers.
181  */
182 #define PARALLEL_APPLY_KEY_SHARED 1
183 #define PARALLEL_APPLY_KEY_MQ 2
184 #define PARALLEL_APPLY_KEY_ERROR_QUEUE 3
185 
186 /* Queue size of DSM, 16 MB for now. */
187 #define DSM_QUEUE_SIZE (16 * 1024 * 1024)
188 
189 /*
190  * Error queue size of DSM. It is desirable to make it large enough that a
191  * typical ErrorResponse can be sent without blocking. That way, a worker that
192  * errors out can write the whole message into the queue and terminate without
193  * waiting for the user backend.
194  */
195 #define DSM_ERROR_QUEUE_SIZE (16 * 1024)
196 
197 /*
198  * There are three fields in each message received by the parallel apply
199  * worker: start_lsn, end_lsn and send_time. Because we have updated these
200  * statistics in the leader apply worker, we can ignore these fields in the
201  * parallel apply worker (see function LogicalRepApplyLoop).
202  */
203 #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
204 
205 /*
206  * The type of session-level lock on a transaction being applied on a logical
207  * replication subscriber.
208  */
209 #define PARALLEL_APPLY_LOCK_STREAM 0
210 #define PARALLEL_APPLY_LOCK_XACT 1
211 
212 /*
213  * Hash table entry to map xid to the parallel apply worker state.
214  */
216 {
217  TransactionId xid; /* Hash key -- must be first */
220 
221 /*
222  * A hash table used to cache the state of streaming transactions being applied
223  * by the parallel apply workers.
224  */
225 static HTAB *ParallelApplyTxnHash = NULL;
226 
227 /*
228 * A list (pool) of active parallel apply workers. The information for
229 * the new worker is added to the list after successfully launching it. The
230 * list entry is removed if there are already enough workers in the worker
231 * pool at the end of the transaction. For more information about the worker
232 * pool, see comments atop this file.
233  */
235 
236 /*
237  * Information shared between leader apply worker and parallel apply worker.
238  */
240 
241 /*
242  * Is there a message sent by a parallel apply worker that the leader apply
243  * worker needs to receive?
244  */
245 volatile sig_atomic_t ParallelApplyMessagePending = false;
246 
247 /*
248  * Cache the parallel apply worker information required for applying the
249  * current streaming transaction. It is used to save the cost of searching the
250  * hash table when applying the changes between STREAM_START and STREAM_STOP.
251  */
253 
254 /* A list to maintain subtransactions, if any. */
255 static List *subxactlist = NIL;
256 
257 static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
260 
261 /*
262  * Returns true if it is OK to start a parallel apply worker, false otherwise.
263  */
264 static bool
266 {
267  /* Only leader apply workers can start parallel apply workers. */
268  if (!am_leader_apply_worker())
269  return false;
270 
271  /*
272  * It is good to check for any change in the subscription parameter to
273  * avoid the case where for a very long time the change doesn't get
274  * reflected. This can happen when there is a constant flow of streaming
275  * transactions that are handled by parallel apply workers.
276  *
277  * It is better to do it before the below checks so that the latest values
278  * of subscription can be used for the checks.
279  */
281 
282  /*
283  * Don't start a new parallel apply worker if the subscription is not
284  * using parallel streaming mode, or if the publisher does not support
285  * parallel apply.
286  */
288  return false;
289 
290  /*
291  * Don't start a new parallel worker if user has set skiplsn as it's
292  * possible that they want to skip the streaming transaction. For
293  * streaming transactions, we need to serialize the transaction to a file
294  * so that we can get the last LSN of the transaction to judge whether to
295  * skip before starting to apply the change.
296  *
297  * One might think that we could allow parallelism if the first lsn of the
298  * transaction is greater than skiplsn, but we don't send it with the
299  * STREAM START message, and it doesn't seem worth sending the extra eight
300  * bytes with the STREAM START to enable parallelism for this case.
301  */
303  return false;
304 
305  /*
306  * For streaming transactions that are being applied using a parallel
307  * apply worker, we cannot decide whether to apply the change for a
308  * relation that is not in the READY state (see
309  * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
310  * time. So, we don't start the new parallel apply worker in this case.
311  */
312  if (!AllTablesyncsReady())
313  return false;
314 
315  return true;
316 }
317 
318 /*
319  * Set up a dynamic shared memory segment.
320  *
321  * We set up a control region that contains a fixed-size worker info
322  * (ParallelApplyWorkerShared), a message queue, and an error queue.
323  *
324  * Returns true on success, false on failure.
325  */
326 static bool
328 {
330  Size segsize;
331  dsm_segment *seg;
332  shm_toc *toc;
334  shm_mq *mq;
335  Size queue_size = DSM_QUEUE_SIZE;
336  Size error_queue_size = DSM_ERROR_QUEUE_SIZE;
337 
338  /*
339  * Estimate how much shared memory we need.
340  *
341  * Because the TOC machinery may choose to insert padding of oddly-sized
342  * requests, we must estimate each chunk separately.
343  *
344  * We need one key to register the location of the header, and two other
345  * keys to track the locations of the message queue and the error message
346  * queue.
347  */
350  shm_toc_estimate_chunk(&e, queue_size);
351  shm_toc_estimate_chunk(&e, error_queue_size);
352 
354  segsize = shm_toc_estimate(&e);
355 
356  /* Create the shared memory segment and establish a table of contents. */
357  seg = dsm_create(shm_toc_estimate(&e), 0);
358  if (!seg)
359  return false;
360 
362  segsize);
363 
364  /* Set up the header region. */
365  shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
366  SpinLockInit(&shared->mutex);
367 
371  shared->fileset_state = FS_EMPTY;
372 
374 
375  /* Set up message queue for the worker. */
376  mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
379 
380  /* Attach the queue. */
381  winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
382 
383  /* Set up error queue for the worker. */
384  mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
385  error_queue_size);
388 
389  /* Attach the queue. */
390  winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
391 
392  /* Return results to caller. */
393  winfo->dsm_seg = seg;
394  winfo->shared = shared;
395 
396  return true;
397 }
398 
399 /*
400  * Try to get a parallel apply worker from the pool. If none is available then
401  * start a new one.
402  */
405 {
406  MemoryContext oldcontext;
407  bool launched;
409  ListCell *lc;
410 
411  /* Try to get an available parallel apply worker from the worker pool. */
412  foreach(lc, ParallelApplyWorkerPool)
413  {
414  winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
415 
416  if (!winfo->in_use)
417  return winfo;
418  }
419 
420  /*
421  * Start a new parallel apply worker.
422  *
423  * The worker info can be used for the lifetime of the worker process, so
424  * create it in a permanent context.
425  */
426  oldcontext = MemoryContextSwitchTo(ApplyContext);
427 
429 
430  /* Setup shared memory. */
431  if (!pa_setup_dsm(winfo))
432  {
433  MemoryContextSwitchTo(oldcontext);
434  pfree(winfo);
435  return NULL;
436  }
437 
442  InvalidOid,
443  dsm_segment_handle(winfo->dsm_seg));
444 
445  if (launched)
446  {
448  }
449  else
450  {
451  pa_free_worker_info(winfo);
452  winfo = NULL;
453  }
454 
455  MemoryContextSwitchTo(oldcontext);
456 
457  return winfo;
458 }
459 
460 /*
461  * Allocate a parallel apply worker that will be used for the specified xid.
462  *
463  * We first try to get an available worker from the pool, if any and then try
464  * to launch a new worker. On successful allocation, remember the worker
465  * information in the hash table so that we can get it later for processing the
466  * streaming changes.
467  */
468 void
470 {
471  bool found;
472  ParallelApplyWorkerInfo *winfo = NULL;
474 
475  if (!pa_can_start())
476  return;
477 
478  winfo = pa_launch_parallel_worker();
479  if (!winfo)
480  return;
481 
482  /* First time through, initialize parallel apply worker state hashtable. */
484  {
485  HASHCTL ctl;
486 
487  MemSet(&ctl, 0, sizeof(ctl));
488  ctl.keysize = sizeof(TransactionId);
489  ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
490  ctl.hcxt = ApplyContext;
491 
492  ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
493  16, &ctl,
495  }
496 
497  /* Create an entry for the requested transaction. */
498  entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
499  if (found)
500  elog(ERROR, "hash table corrupted");
501 
502  /* Update the transaction information in shared memory. */
503  SpinLockAcquire(&winfo->shared->mutex);
505  winfo->shared->xid = xid;
506  SpinLockRelease(&winfo->shared->mutex);
507 
508  winfo->in_use = true;
509  winfo->serialize_changes = false;
510  entry->winfo = winfo;
511  entry->xid = xid;
512 }
513 
514 /*
515  * Find the assigned worker for the given transaction, if any.
516  */
519 {
520  bool found;
522 
523  if (!TransactionIdIsValid(xid))
524  return NULL;
525 
527  return NULL;
528 
529  /* Return the cached parallel apply worker if valid. */
531  return stream_apply_worker;
532 
533  /* Find an entry for the requested transaction. */
534  entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
535  if (found)
536  {
537  /* The worker must not have exited. */
538  Assert(entry->winfo->in_use);
539  return entry->winfo;
540  }
541 
542  return NULL;
543 }
544 
545 /*
546  * Makes the worker available for reuse.
547  *
548  * This removes the parallel apply worker entry from the hash table so that it
549  * can't be used. If there are enough workers in the pool, it stops the worker
550  * and frees the corresponding info. Otherwise it just marks the worker as
551  * available for reuse.
552  *
553  * For more information about the worker pool, see comments atop this file.
554  */
555 static void
557 {
559  Assert(winfo->in_use);
561 
562  if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
563  elog(ERROR, "hash table corrupted");
564 
565  /*
566  * Stop the worker if there are enough workers in the pool.
567  *
568  * XXX Additionally, we also stop the worker if the leader apply worker
569  * serialize part of the transaction data due to a send timeout. This is
570  * because the message could be partially written to the queue and there
571  * is no way to clean the queue other than resending the message until it
572  * succeeds. Instead of trying to send the data which anyway would have
573  * been serialized and then letting the parallel apply worker deal with
574  * the spurious message, we stop the worker.
575  */
576  if (winfo->serialize_changes ||
579  {
580  int slot_no;
581  uint16 generation;
582 
583  SpinLockAcquire(&winfo->shared->mutex);
584  generation = winfo->shared->logicalrep_worker_generation;
585  slot_no = winfo->shared->logicalrep_worker_slot_no;
586  SpinLockRelease(&winfo->shared->mutex);
587 
588  logicalrep_pa_worker_stop(slot_no, generation);
589 
590  pa_free_worker_info(winfo);
591 
592  return;
593  }
594 
595  winfo->in_use = false;
596  winfo->serialize_changes = false;
597 }
598 
599 /*
600  * Free the parallel apply worker information and unlink the files with
601  * serialized changes if any.
602  */
603 static void
605 {
606  Assert(winfo);
607 
608  if (winfo->mq_handle)
609  shm_mq_detach(winfo->mq_handle);
610 
611  if (winfo->error_mq_handle)
613 
614  /* Unlink the files with serialized changes. */
615  if (winfo->serialize_changes)
617 
618  if (winfo->dsm_seg)
619  dsm_detach(winfo->dsm_seg);
620 
621  /* Remove from the worker pool. */
623 
624  pfree(winfo);
625 }
626 
627 /*
628  * Detach the error queue for all parallel apply workers.
629  */
630 void
632 {
633  ListCell *lc;
634 
635  foreach(lc, ParallelApplyWorkerPool)
636  {
638 
640  winfo->error_mq_handle = NULL;
641  }
642 }
643 
644 /*
645  * Check if there are any pending spooled messages.
646  */
647 static bool
649 {
650  PartialFileSetState fileset_state;
651 
652  fileset_state = pa_get_fileset_state();
653 
654  return (fileset_state != FS_EMPTY);
655 }
656 
657 /*
658  * Replay the spooled messages once the leader apply worker has finished
659  * serializing changes to the file.
660  *
661  * Returns false if there aren't any pending spooled messages, true otherwise.
662  */
663 static bool
665 {
666  PartialFileSetState fileset_state;
667 
668  fileset_state = pa_get_fileset_state();
669 
670  if (fileset_state == FS_EMPTY)
671  return false;
672 
673  /*
674  * If the leader apply worker is busy serializing the partial changes then
675  * acquire the stream lock now and wait for the leader worker to finish
676  * serializing the changes. Otherwise, the parallel apply worker won't get
677  * a chance to receive a STREAM_STOP (and acquire the stream lock) until
678  * the leader had serialized all changes which can lead to undetected
679  * deadlock.
680  *
681  * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
682  * worker has finished serializing the changes.
683  */
684  if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
685  {
688 
689  fileset_state = pa_get_fileset_state();
690  }
691 
692  /*
693  * We cannot read the file immediately after the leader has serialized all
694  * changes to the file because there may still be messages in the memory
695  * queue. We will apply all spooled messages the next time we call this
696  * function and that will ensure there are no messages left in the memory
697  * queue.
698  */
699  if (fileset_state == FS_SERIALIZE_DONE)
700  {
702  }
703  else if (fileset_state == FS_READY)
704  {
709  }
710 
711  return true;
712 }
713 
714 /*
715  * Interrupt handler for main loop of parallel apply worker.
716  */
717 static void
719 {
721 
723  {
724  ereport(LOG,
725  (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
726  MySubscription->name)));
727 
728  proc_exit(0);
729  }
730 
732  {
733  ConfigReloadPending = false;
735  }
736 }
737 
738 /* Parallel apply worker main loop. */
739 static void
741 {
742  shm_mq_result shmq_res;
743  ErrorContextCallback errcallback;
745 
746  /*
747  * Init the ApplyMessageContext which we clean up after each replication
748  * protocol message.
749  */
751  "ApplyMessageContext",
753 
754  /*
755  * Push apply error context callback. Fields will be filled while applying
756  * a change.
757  */
758  errcallback.callback = apply_error_callback;
759  errcallback.previous = error_context_stack;
760  error_context_stack = &errcallback;
761 
762  for (;;)
763  {
764  void *data;
765  Size len;
766 
768 
769  /* Ensure we are reading the data into our memory context. */
771 
772  shmq_res = shm_mq_receive(mqh, &len, &data, true);
773 
774  if (shmq_res == SHM_MQ_SUCCESS)
775  {
776  StringInfoData s;
777  int c;
778 
779  if (len == 0)
780  elog(ERROR, "invalid message length");
781 
782  s.cursor = 0;
783  s.maxlen = -1;
784  s.data = (char *) data;
785  s.len = len;
786 
787  /*
788  * The first byte of messages sent from leader apply worker to
789  * parallel apply workers can only be 'w'.
790  */
791  c = pq_getmsgbyte(&s);
792  if (c != 'w')
793  elog(ERROR, "unexpected message \"%c\"", c);
794 
795  /*
796  * Ignore statistics fields that have been updated by the leader
797  * apply worker.
798  *
799  * XXX We can avoid sending the statistics fields from the leader
800  * apply worker but for that, it needs to rebuild the entire
801  * message by removing these fields which could be more work than
802  * simply ignoring these fields in the parallel apply worker.
803  */
805 
806  apply_dispatch(&s);
807  }
808  else if (shmq_res == SHM_MQ_WOULD_BLOCK)
809  {
810  /* Replay the changes from the file, if any. */
812  {
813  int rc;
814 
815  /* Wait for more work. */
816  rc = WaitLatch(MyLatch,
818  1000L,
820 
821  if (rc & WL_LATCH_SET)
823  }
824  }
825  else
826  {
827  Assert(shmq_res == SHM_MQ_DETACHED);
828 
829  ereport(ERROR,
830  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
831  errmsg("lost connection to the logical replication apply worker")));
832  }
833 
835  MemoryContextSwitchTo(oldcxt);
836  }
837 
838  /* Pop the error context stack. */
839  error_context_stack = errcallback.previous;
840 
841  MemoryContextSwitchTo(oldcxt);
842 }
843 
844 /*
845  * Make sure the leader apply worker tries to read from our error queue one more
846  * time. This guards against the case where we exit uncleanly without sending
847  * an ErrorResponse, for example because some code calls proc_exit directly.
848  */
849 static void
851 {
855 
857 }
858 
859 /*
860  * Parallel apply worker entry point.
861  */
862 void
864 {
866  dsm_handle handle;
867  dsm_segment *seg;
868  shm_toc *toc;
869  shm_mq *mq;
870  shm_mq_handle *mqh;
871  shm_mq_handle *error_mqh;
872  RepOriginId originid;
873  int worker_slot = DatumGetInt32(main_arg);
874  char originname[NAMEDATALEN];
875 
876  /* Setup signal handling. */
879  pqsignal(SIGTERM, die);
881 
882  /*
883  * Attach to the dynamic shared memory segment for the parallel apply, and
884  * find its table of contents.
885  *
886  * Like parallel query, we don't need resource owner by this time. See
887  * ParallelWorkerMain.
888  */
889  memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
890  seg = dsm_attach(handle);
891  if (!seg)
892  ereport(ERROR,
893  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
894  errmsg("unable to map dynamic shared memory segment")));
895 
897  if (!toc)
898  ereport(ERROR,
899  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
900  errmsg("bad magic number in dynamic shared memory segment")));
901 
903 
904  /* Look up the shared information. */
905  shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
906  MyParallelShared = shared;
907 
908  /*
909  * Attach to the message queue.
910  */
911  mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
913  mqh = shm_mq_attach(mq, seg, NULL);
914 
915  /*
916  * Primary initialization is complete. Now, we can attach to our slot.
917  * This is to ensure that the leader apply worker does not write data to
918  * the uninitialized memory queue.
919  */
920  logicalrep_worker_attach(worker_slot);
921 
926 
927  /*
928  * Attach to the error queue.
929  */
932  error_mqh = shm_mq_attach(mq, seg, NULL);
933 
934  pq_redirect_to_shm_mq(seg, error_mqh);
937 
940 
942 
943  /* Setup replication origin tracking. */
946  originname, sizeof(originname));
947  originid = replorigin_by_name(originname, false);
948 
949  /*
950  * The parallel apply worker doesn't need to monopolize this replication
951  * origin which was already acquired by its leader process.
952  */
954  replorigin_session_origin = originid;
956 
957  /*
958  * Setup callback for syscache so that we know when something changes in
959  * the subscription relation state.
960  */
963  (Datum) 0);
964 
965  set_apply_error_context_origin(originname);
966 
968 
969  /*
970  * The parallel apply worker must not get here because the parallel apply
971  * worker will only stop when it receives a SIGTERM or SIGINT from the
972  * leader, or when there is an error. None of these cases will allow the
973  * code to reach here.
974  */
975  Assert(false);
976 }
977 
978 /*
979  * Handle receipt of an interrupt indicating a parallel apply worker message.
980  *
981  * Note: this is called within a signal handler! All we can do is set a flag
982  * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
983  * HandleParallelApplyMessages().
984  */
985 void
987 {
988  InterruptPending = true;
990  SetLatch(MyLatch);
991 }
992 
993 /*
994  * Handle a single protocol message received from a single parallel apply
995  * worker.
996  */
997 static void
999 {
1000  char msgtype;
1001 
1002  msgtype = pq_getmsgbyte(msg);
1003 
1004  switch (msgtype)
1005  {
1006  case 'E': /* ErrorResponse */
1007  {
1008  ErrorData edata;
1009 
1010  /* Parse ErrorResponse. */
1011  pq_parse_errornotice(msg, &edata);
1012 
1013  /*
1014  * If desired, add a context line to show that this is a
1015  * message propagated from a parallel apply worker. Otherwise,
1016  * it can sometimes be confusing to understand what actually
1017  * happened.
1018  */
1019  if (edata.context)
1020  edata.context = psprintf("%s\n%s", edata.context,
1021  _("logical replication parallel apply worker"));
1022  else
1023  edata.context = pstrdup(_("logical replication parallel apply worker"));
1024 
1025  /*
1026  * Context beyond that should use the error context callbacks
1027  * that were in effect in LogicalRepApplyLoop().
1028  */
1030 
1031  /*
1032  * The actual error must have been reported by the parallel
1033  * apply worker.
1034  */
1035  ereport(ERROR,
1036  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1037  errmsg("logical replication parallel apply worker exited due to error"),
1038  errcontext("%s", edata.context)));
1039  }
1040 
1041  /*
1042  * Don't need to do anything about NoticeResponse and
1043  * NotifyResponse as the logical replication worker doesn't need
1044  * to send messages to the client.
1045  */
1046  case 'N':
1047  case 'A':
1048  break;
1049 
1050  default:
1051  elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1052  msgtype, msg->len);
1053  }
1054 }
1055 
1056 /*
1057  * Handle any queued protocol messages received from parallel apply workers.
1058  */
1059 void
1061 {
1062  ListCell *lc;
1063  MemoryContext oldcontext;
1064 
1065  static MemoryContext hpam_context = NULL;
1066 
1067  /*
1068  * This is invoked from ProcessInterrupts(), and since some of the
1069  * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1070  * for recursive calls if more signals are received while this runs. It's
1071  * unclear that recursive entry would be safe, and it doesn't seem useful
1072  * even if it is safe, so let's block interrupts until done.
1073  */
1074  HOLD_INTERRUPTS();
1075 
1076  /*
1077  * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1078  * don't want to risk leaking data into long-lived contexts, so let's do
1079  * our work here in a private context that we can reset on each use.
1080  */
1081  if (!hpam_context) /* first time through? */
1082  hpam_context = AllocSetContextCreate(TopMemoryContext,
1083  "HandleParallelApplyMessages",
1085  else
1086  MemoryContextReset(hpam_context);
1087 
1088  oldcontext = MemoryContextSwitchTo(hpam_context);
1089 
1091 
1092  foreach(lc, ParallelApplyWorkerPool)
1093  {
1095  Size nbytes;
1096  void *data;
1098 
1099  /*
1100  * The leader will detach from the error queue and set it to NULL
1101  * before preparing to stop all parallel apply workers, so we don't
1102  * need to handle error messages anymore. See
1103  * logicalrep_worker_detach.
1104  */
1105  if (!winfo->error_mq_handle)
1106  continue;
1107 
1108  res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
1109 
1110  if (res == SHM_MQ_WOULD_BLOCK)
1111  continue;
1112  else if (res == SHM_MQ_SUCCESS)
1113  {
1114  StringInfoData msg;
1115 
1116  initStringInfo(&msg);
1117  appendBinaryStringInfo(&msg, data, nbytes);
1119  pfree(msg.data);
1120  }
1121  else
1122  ereport(ERROR,
1123  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1124  errmsg("lost connection to the logical replication parallel apply worker")));
1125  }
1126 
1127  MemoryContextSwitchTo(oldcontext);
1128 
1129  /* Might as well clear the context on our way out */
1130  MemoryContextReset(hpam_context);
1131 
1133 }
1134 
1135 /*
1136  * Send the data to the specified parallel apply worker via shared-memory
1137  * queue.
1138  *
1139  * Returns false if the attempt to send data via shared memory times out, true
1140  * otherwise.
1141  */
1142 bool
1143 pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
1144 {
1145  int rc;
1146  shm_mq_result result;
1147  TimestampTz startTime = 0;
1148 
1150  Assert(!winfo->serialize_changes);
1151 
1152  /*
1153  * We don't try to send data to parallel worker for 'immediate' mode. This
1154  * is primarily used for testing purposes.
1155  */
1157  return false;
1158 
1159 /*
1160  * This timeout is a bit arbitrary but testing revealed that it is sufficient
1161  * to send the message unless the parallel apply worker is waiting on some
1162  * lock or there is a serious resource crunch. See the comments atop this file
1163  * to know why we are using a non-blocking way to send the message.
1164  */
1165 #define SHM_SEND_RETRY_INTERVAL_MS 1000
1166 #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1167 
1168  for (;;)
1169  {
1170  result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
1171 
1172  if (result == SHM_MQ_SUCCESS)
1173  return true;
1174  else if (result == SHM_MQ_DETACHED)
1175  ereport(ERROR,
1176  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1177  errmsg("could not send data to shared-memory queue")));
1178 
1179  Assert(result == SHM_MQ_WOULD_BLOCK);
1180 
1181  /* Wait before retrying. */
1182  rc = WaitLatch(MyLatch,
1186 
1187  if (rc & WL_LATCH_SET)
1188  {
1191  }
1192 
1193  if (startTime == 0)
1194  startTime = GetCurrentTimestamp();
1195  else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
1197  return false;
1198  }
1199 }
1200 
1201 /*
1202  * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
1203  * that the current data and any subsequent data for this transaction will be
1204  * serialized to a file. This is done to prevent possible deadlocks with
1205  * another parallel apply worker (refer to the comments atop this file).
1206  */
1207 void
1209  bool stream_locked)
1210 {
1211  ereport(LOG,
1212  (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1213  winfo->shared->xid)));
1214 
1215  /*
1216  * The parallel apply worker could be stuck for some reason (say waiting
1217  * on some lock by other backend), so stop trying to send data directly to
1218  * it and start serializing data to the file instead.
1219  */
1220  winfo->serialize_changes = true;
1221 
1222  /* Initialize the stream fileset. */
1223  stream_start_internal(winfo->shared->xid, true);
1224 
1225  /*
1226  * Acquires the stream lock if not already to make sure that the parallel
1227  * apply worker will wait for the leader to release the stream lock until
1228  * the end of the transaction.
1229  */
1230  if (!stream_locked)
1232 
1234 }
1235 
1236 /*
1237  * Wait until the parallel apply worker's transaction state has reached or
1238  * exceeded the given xact_state.
1239  */
1240 static void
1242  ParallelTransState xact_state)
1243 {
1244  for (;;)
1245  {
1246  /*
1247  * Stop if the transaction state has reached or exceeded the given
1248  * xact_state.
1249  */
1250  if (pa_get_xact_state(winfo->shared) >= xact_state)
1251  break;
1252 
1253  /* Wait to be signalled. */
1254  (void) WaitLatch(MyLatch,
1256  10L,
1258 
1259  /* Reset the latch so we don't spin. */
1261 
1262  /* An interrupt may have occurred while we were waiting. */
1264  }
1265 }
1266 
1267 /*
1268  * Wait until the parallel apply worker's transaction finishes.
1269  */
1270 static void
1272 {
1273  /*
1274  * Wait until the parallel apply worker set the state to
1275  * PARALLEL_TRANS_STARTED which means it has acquired the transaction
1276  * lock. This is to prevent leader apply worker from acquiring the
1277  * transaction lock earlier than the parallel apply worker.
1278  */
1280 
1281  /*
1282  * Wait for the transaction lock to be released. This is required to
1283  * detect deadlock among leader and parallel apply workers. Refer to the
1284  * comments atop this file.
1285  */
1288 
1289  /*
1290  * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
1291  * apply worker failed while applying changes causing the lock to be
1292  * released.
1293  */
1295  ereport(ERROR,
1296  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1297  errmsg("lost connection to the logical replication parallel apply worker")));
1298 }
1299 
1300 /*
1301  * Set the transaction state for a given parallel apply worker.
1302  */
1303 void
1305  ParallelTransState xact_state)
1306 {
1307  SpinLockAcquire(&wshared->mutex);
1308  wshared->xact_state = xact_state;
1309  SpinLockRelease(&wshared->mutex);
1310 }
1311 
1312 /*
1313  * Get the transaction state for a given parallel apply worker.
1314  */
1315 static ParallelTransState
1317 {
1318  ParallelTransState xact_state;
1319 
1320  SpinLockAcquire(&wshared->mutex);
1321  xact_state = wshared->xact_state;
1322  SpinLockRelease(&wshared->mutex);
1323 
1324  return xact_state;
1325 }
1326 
1327 /*
1328  * Cache the parallel apply worker information.
1329  */
1330 void
1332 {
1333  stream_apply_worker = winfo;
1334 }
1335 
1336 /*
1337  * Form a unique savepoint name for the streaming transaction.
1338  *
1339  * Note that different subscriptions for publications on different nodes can
1340  * receive same remote xid, so we need to use subscription id along with it.
1341  *
1342  * Returns the name in the supplied buffer.
1343  */
1344 static void
1345 pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
1346 {
1347  snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
1348 }
1349 
1350 /*
1351  * Define a savepoint for a subxact in parallel apply worker if needed.
1352  *
1353  * The parallel apply worker can figure out if a new subtransaction was
1354  * started by checking if the new change arrived with a different xid. In that
1355  * case define a named savepoint, so that we are able to rollback to it
1356  * if required.
1357  */
1358 void
1360 {
1361  if (current_xid != top_xid &&
1362  !list_member_xid(subxactlist, current_xid))
1363  {
1364  MemoryContext oldctx;
1365  char spname[NAMEDATALEN];
1366 
1367  pa_savepoint_name(MySubscription->oid, current_xid,
1368  spname, sizeof(spname));
1369 
1370  elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1371 
1372  /* We must be in transaction block to define the SAVEPOINT. */
1373  if (!IsTransactionBlock())
1374  {
1375  if (!IsTransactionState())
1377 
1380  }
1381 
1382  DefineSavepoint(spname);
1383 
1384  /*
1385  * CommitTransactionCommand is needed to start a subtransaction after
1386  * issuing a SAVEPOINT inside a transaction block (see
1387  * StartSubTransaction()).
1388  */
1390 
1392  subxactlist = lappend_xid(subxactlist, current_xid);
1393  MemoryContextSwitchTo(oldctx);
1394  }
1395 }
1396 
1397 /* Reset the list that maintains subtransactions. */
1398 void
1400 {
1401  /*
1402  * We don't need to free this explicitly as the allocated memory will be
1403  * freed at the transaction end.
1404  */
1405  subxactlist = NIL;
1406 }
1407 
1408 /*
1409  * Handle STREAM ABORT message when the transaction was applied in a parallel
1410  * apply worker.
1411  */
1412 void
1414 {
1415  TransactionId xid = abort_data->xid;
1416  TransactionId subxid = abort_data->subxid;
1417 
1418  /*
1419  * Update origin state so we can restart streaming from correct position
1420  * in case of crash.
1421  */
1424 
1425  /*
1426  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1427  * just free the subxactlist.
1428  */
1429  if (subxid == xid)
1430  {
1432 
1433  /*
1434  * Release the lock as we might be processing an empty streaming
1435  * transaction in which case the lock won't be released during
1436  * transaction rollback.
1437  *
1438  * Note that it's ok to release the transaction lock before aborting
1439  * the transaction because even if the parallel apply worker dies due
1440  * to crash or some other reason, such a transaction would still be
1441  * considered aborted.
1442  */
1444 
1446 
1447  if (IsTransactionBlock())
1448  {
1449  EndTransactionBlock(false);
1451  }
1452 
1454 
1456  }
1457  else
1458  {
1459  /* OK, so it's a subxact. Rollback to the savepoint. */
1460  int i;
1461  char spname[NAMEDATALEN];
1462 
1463  pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
1464 
1465  elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1466 
1467  /*
1468  * Search the subxactlist, determine the offset tracked for the
1469  * subxact, and truncate the list.
1470  *
1471  * Note that for an empty sub-transaction we won't find the subxid
1472  * here.
1473  */
1474  for (i = list_length(subxactlist) - 1; i >= 0; i--)
1475  {
1477 
1478  if (xid_tmp == subxid)
1479  {
1480  RollbackToSavepoint(spname);
1483  break;
1484  }
1485  }
1486  }
1487 }
1488 
1489 /*
1490  * Set the fileset state for a particular parallel apply worker. The fileset
1491  * will be set once the leader worker serialized all changes to the file
1492  * so that it can be used by parallel apply worker.
1493  */
1494 void
1496  PartialFileSetState fileset_state)
1497 {
1498  SpinLockAcquire(&wshared->mutex);
1499  wshared->fileset_state = fileset_state;
1500 
1501  if (fileset_state == FS_SERIALIZE_DONE)
1502  {
1506  }
1507 
1508  SpinLockRelease(&wshared->mutex);
1509 }
1510 
1511 /*
1512  * Get the fileset state for the current parallel apply worker.
1513  */
1514 static PartialFileSetState
1516 {
1517  PartialFileSetState fileset_state;
1518 
1520 
1522  fileset_state = MyParallelShared->fileset_state;
1524 
1525  return fileset_state;
1526 }
1527 
1528 /*
1529  * Helper functions to acquire and release a lock for each stream block.
1530  *
1531  * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
1532  * stream lock.
1533  *
1534  * Refer to the comments atop this file to see how the stream lock is used.
1535  */
1536 void
1538 {
1540  PARALLEL_APPLY_LOCK_STREAM, lockmode);
1541 }
1542 
1543 void
1545 {
1547  PARALLEL_APPLY_LOCK_STREAM, lockmode);
1548 }
1549 
1550 /*
1551  * Helper functions to acquire and release a lock for each local transaction
1552  * apply.
1553  *
1554  * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
1555  * transaction lock.
1556  *
1557  * Note that all the callers must pass a remote transaction ID instead of a
1558  * local transaction ID as xid. This is because the local transaction ID will
1559  * only be assigned while applying the first change in the parallel apply but
1560  * it's possible that the first change in the parallel apply worker is blocked
1561  * by a concurrently executing transaction in another parallel apply worker. We
1562  * can only communicate the local transaction id to the leader after applying
1563  * the first change so it won't be able to wait after sending the xact finish
1564  * command using this lock.
1565  *
1566  * Refer to the comments atop this file to see how the transaction lock is
1567  * used.
1568  */
1569 void
1571 {
1573  PARALLEL_APPLY_LOCK_XACT, lockmode);
1574 }
1575 
1576 void
1578 {
1580  PARALLEL_APPLY_LOCK_XACT, lockmode);
1581 }
1582 
1583 /*
1584  * Decrement the number of pending streaming blocks and wait on the stream lock
1585  * if there is no pending block available.
1586  */
1587 void
1589 {
1591 
1592  /*
1593  * It is only possible to not have any pending stream chunks when we are
1594  * applying spooled messages.
1595  */
1597  {
1599  return;
1600 
1601  elog(ERROR, "invalid pending streaming chunk 0");
1602  }
1603 
1605  {
1608  }
1609 }
1610 
1611 /*
1612  * Finish processing the streaming transaction in the leader apply worker.
1613  */
1614 void
1616 {
1618 
1619  /*
1620  * Unlock the shared object lock so that parallel apply worker can
1621  * continue to receive and apply changes.
1622  */
1624 
1625  /*
1626  * Wait for that worker to finish. This is necessary to maintain commit
1627  * order which avoids failures due to transaction dependencies and
1628  * deadlocks.
1629  */
1630  pa_wait_for_xact_finish(winfo);
1631 
1632  if (!XLogRecPtrIsInvalid(remote_lsn))
1633  store_flush_position(remote_lsn, winfo->shared->last_commit_end);
1634 
1635  pa_free_worker(winfo);
1636 }
struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
static ParallelApplyWorkerInfo * stream_apply_worker
static List * ParallelApplyWorkerPool
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
#define DSM_ERROR_QUEUE_SIZE
volatile sig_atomic_t ParallelApplyMessagePending
static bool pa_can_start(void)
void HandleParallelApplyMessageInterrupt(void)
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
#define SHM_SEND_TIMEOUT_MS
#define DSM_QUEUE_SIZE
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
static void ProcessParallelApplyInterrupts(void)
static PartialFileSetState pa_get_fileset_state(void)
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
#define PARALLEL_APPLY_LOCK_XACT
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
static List * subxactlist
static bool pa_has_spooled_message_pending()
static void pa_shutdown(int code, Datum arg)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)
#define PARALLEL_APPLY_KEY_SHARED
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
void pa_detach_all_error_mq(void)
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
#define PARALLEL_APPLY_KEY_MQ
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
#define SIZE_STATS_MESSAGE
#define SHM_SEND_RETRY_INTERVAL_MS
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
static bool pa_process_spooled_messages_if_required(void)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static HTAB * ParallelApplyTxnHash
#define PARALLEL_APPLY_LOCK_STREAM
static void HandleParallelApplyMessage(StringInfo msg)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void ParallelApplyWorkerMain(Datum main_arg)
#define PG_LOGICAL_APPLY_SHM_MAGIC
void pa_decr_and_wait_stream_block(void)
void HandleParallelApplyMessages(void)
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition: atomics.h:396
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition: atomics.h:218
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition: atomics.h:236
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:4149
MemoryContext ApplyMessageContext
Definition: worker.c:306
void InitializeApplyWorker(void)
Definition: worker.c:4395
void apply_dispatch(StringInfo s)
Definition: worker.c:3236
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:457
ErrorContextCallback * apply_error_context_stack
Definition: worker.c:304
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1449
void set_apply_error_context_origin(char *originname)
Definition: worker.c:4996
MemoryContext ApplyContext
Definition: worker.c:307
void apply_error_callback(void *arg)
Definition: worker.c:4854
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3400
void maybe_reread_subscription(void)
Definition: worker.c:3823
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2021
Subscription * MySubscription
Definition: worker.c:314
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1727
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1582
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
#define InvalidBackendId
Definition: backendid.h:23
unsigned short uint16
Definition: c.h:489
#define unlikely(x)
Definition: c.h:295
#define MemSet(start, val, len)
Definition: c.h:1004
uint32 TransactionId
Definition: c.h:636
size_t Size
Definition: c.h:589
int64 TimestampTz
Definition: timestamp.h:39
dsm_handle dsm_segment_handle(dsm_segment *seg)
Definition: dsm.c:1094
void * dsm_segment_address(dsm_segment *seg)
Definition: dsm.c:1066
void dsm_detach(dsm_segment *seg)
Definition: dsm.c:776
dsm_segment * dsm_attach(dsm_handle h)
Definition: dsm.c:638
dsm_segment * dsm_create(Size size, int flags)
Definition: dsm.c:489
uint32 dsm_handle
Definition: dsm_impl.h:55
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
ErrorContextCallback * error_context_stack
Definition: elog.c:95
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define _(x)
Definition: elog.c:91
#define LOG
Definition: elog.h:31
#define errcontext
Definition: elog.h:196
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
volatile sig_atomic_t InterruptPending
Definition: globals.c:30
struct Latch * MyLatch
Definition: globals.c:58
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_REMOVE
Definition: hsearch.h:115
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:109
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:73
void SetLatch(Latch *latch)
Definition: latch.c:607
void ResetLatch(Latch *latch)
Definition: latch.c:699
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:492
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
void logicalrep_pa_worker_stop(int slot_no, uint16 generation)
Definition: launcher.c:619
void logicalrep_worker_attach(int slot)
Definition: launcher.c:674
bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
Definition: launcher.c:306
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:61
int max_parallel_apply_workers_per_subscription
Definition: launcher.c:59
Assert(fmt[strlen(fmt) - 1] !='\n')
List * list_delete_ptr(List *list, void *datum)
Definition: list.c:871
List * list_truncate(List *list, int new_size)
Definition: list.c:630
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:392
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:741
List * lappend(List *list, void *datum)
Definition: list.c:338
void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
Definition: lmgr.c:1146
void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
Definition: lmgr.c:1128
int LOCKMODE
Definition: lockdefs.h:26
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:314
MemoryContext TopTransactionContext
Definition: mcxt.c:146
char * pstrdup(const char *in)
Definition: mcxt.c:1624
void pfree(void *pointer)
Definition: mcxt.c:1436
MemoryContext TopMemoryContext
Definition: mcxt.c:141
void * palloc0(Size size)
Definition: mcxt.c:1241
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:134
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:132
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
RepOriginId replorigin_session_origin
Definition: origin.c:156
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1095
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
void * arg
#define NAMEDATALEN
const void size_t len
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
static ListCell * list_nth_cell(const List *list, int n)
Definition: pg_list.h:277
#define lfirst_xid(lc)
Definition: pg_list.h:175
#define die(msg)
Definition: pg_test_fsync.c:95
pqsigfunc pqsignal(int signo, pqsigfunc func)
#define snprintf
Definition: port.h:238
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5660
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:193
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:402
void pq_set_parallel_leader(pid_t pid, BackendId backend_id)
Definition: pqmq.c:78
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
Definition: pqmq.c:216
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition: pqmq.c:53
char * c
e
Definition: preproc-init.c:82
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
@ PROCSIG_PARALLEL_APPLY_MESSAGE
Definition: procsignal.h:38
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
int logical_replication_mode
@ LOGICAL_REP_MODE_IMMEDIATE
Definition: reorderbuffer.h:28
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:291
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:225
shm_mq * shm_mq_create(void *address, Size size)
Definition: shm_mq.c:178
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:844
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:207
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:573
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:330
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40
shm_toc * shm_toc_attach(uint64 magic, void *address)
Definition: shm_toc.c:64
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
Definition: shm_toc.c:40
Size shm_toc_estimate(shm_toc_estimator *e)
Definition: shm_toc.c:263
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_initialize_estimator(e)
Definition: shm_toc.h:49
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
PGPROC * MyProc
Definition: proc.c:66
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:227
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
char bgw_extra[BGW_EXTRALEN]
Definition: bgworker.h:99
struct ErrorContextCallback * previous
Definition: elog.h:295
void(* callback)(void *arg)
Definition: elog.h:296
char * context
Definition: elog.h:443
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
Definition: dynahash.c:220
Definition: pg_list.h:54
TimestampTz last_recv_time
TimestampTz reply_time
FileSet * stream_fileset
TimestampTz last_send_time
ParallelApplyWorkerInfo * winfo
shm_mq_handle * error_mq_handle
shm_mq_handle * mq_handle
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
PartialFileSetState fileset_state
ParallelTransState xact_state
XLogRecPtr skiplsn
Definition: shm_mq.c:73
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:100
bool AllTablesyncsReady(void)
Definition: tablesync.c:1573
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:272
#define TransactionIdIsValid(xid)
Definition: transam.h:41
@ WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN
Definition: wait_event.h:45
@ WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE
Definition: wait_event.h:110
@ WAIT_EVENT_LOGICAL_APPLY_SEND_DATA
Definition: wait_event.h:109
#define SIGHUP
Definition: win32_port.h:176
ParallelTransState
@ PARALLEL_TRANS_UNKNOWN
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED
static bool am_parallel_apply_worker(void)
PartialFileSetState
@ FS_EMPTY
@ FS_SERIALIZE_DONE
@ FS_READY
@ FS_SERIALIZE_IN_PROGRESS
static bool am_leader_apply_worker(void)
void DefineSavepoint(const char *name)
Definition: xact.c:4226
bool IsTransactionState(void)
Definition: xact.c:378
void StartTransactionCommand(void)
Definition: xact.c:2944
bool IsTransactionBlock(void)
Definition: xact.c:4823
void BeginTransactionBlock(void)
Definition: xact.c:3777
void CommitTransactionCommand(void)
Definition: xact.c:3041
void RollbackToSavepoint(const char *name)
Definition: xact.c:4420
bool EndTransactionBlock(bool chain)
Definition: xact.c:3897
void AbortCurrentTransaction(void)
Definition: xact.c:3312
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28