PostgreSQL Source Code (git master)
applyparallelworker.c
/*-------------------------------------------------------------------------
 * applyparallelworker.c
 *     Support routines for applying xacts by a parallel apply worker
 *
 * Copyright (c) 2023-2026, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *    src/backend/replication/logical/applyparallelworker.c
 *
 * This file contains the code to launch, set up, and tear down a parallel
 * apply worker, which receives the changes from the leader apply worker and
 * invokes routines to apply them on the subscriber database. Additionally,
 * this file contains routines that are intended to support setting up, using,
 * and tearing down a ParallelApplyWorkerInfo, which is required so that the
 * leader apply worker and parallel apply workers can communicate with each
 * other.
 *
 * A parallel apply worker is assigned (if available) as soon as the xact's
 * first stream is received for subscriptions that have set their 'streaming'
 * option to parallel. The leader apply worker will send changes to this new
 * worker via shared memory. We keep this worker assigned until the
 * transaction commit is received and also wait for the worker to finish at
 * commit. This preserves commit ordering and avoids file I/O in most cases,
 * although we still need to spill to a file if no worker is available. See
 * the comments atop logical/worker.c to know more about streamed xacts whose
 * changes are spilled to disk. It is important to maintain commit order to
 * avoid failures due to: (a) transaction dependencies - say, if we insert a
 * row in the first transaction and update it in the second transaction on
 * the publisher, then allowing the subscriber to apply both in parallel can
 * lead to a failure in the update; (b) deadlocks - allowing transactions
 * that update the same set of rows/tables in the opposite order to be
 * applied in parallel can lead to deadlocks.
 *
 * A worker pool is used to avoid restarting workers for each streaming
 * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
 * in the ParallelApplyWorkerPool. After successfully launching a new worker,
 * its information is added to the ParallelApplyWorkerPool. Once the worker
 * finishes applying the transaction, it is marked as available for reuse.
 * Before starting a new worker to apply a streaming transaction, we check the
 * pool for any available worker. Note that we retain a maximum of half of
 * max_parallel_apply_workers_per_subscription workers in the pool; beyond
 * that, we simply exit the worker after it applies the transaction.
 *
 * XXX This worker pool threshold is arbitrary and we can provide a GUC
 * variable for this in the future if required.
 *
 * The leader apply worker creates a separate dynamic shared memory segment
 * when each parallel apply worker starts. The reason for this design is that
 * we cannot predict how many workers will be needed. It may be possible to
 * allocate enough shared memory in one segment based on the maximum number of
 * parallel apply workers (max_parallel_apply_workers_per_subscription), but
 * this would waste memory if no process is actually started.
 *
 * The dynamic shared memory segment contains: (a) a shm_mq that is used to
 * send changes in the transaction from the leader apply worker to the
 * parallel apply worker; (b) another shm_mq that is used to send errors (and
 * other messages reported via elog/ereport) from the parallel apply worker to
 * the leader apply worker; (c) necessary information to be shared among
 * parallel apply workers and the leader apply worker (i.e. members of
 * ParallelApplyWorkerShared).
 *
 * Locking Considerations
 * ----------------------
 * We have a risk of deadlock due to concurrently applying, in parallel mode,
 * transactions that were independent on the publisher side but became
 * dependent on the subscriber side due to the different database structures
 * (like the schema of subscription tables, constraints, etc.) on each side.
 * This can happen even without parallel mode when there are concurrent
 * operations on the subscriber. In order to detect deadlocks among the leader
 * (LA) and parallel apply (PA) workers, we use lmgr locks when a PA waits for
 * the next stream (set of changes) and the LA waits for a PA to finish its
 * transaction. An alternative approach could be to not allow parallelism when
 * the schema of tables differs between the publisher and subscriber, but that
 * would be too restrictive and would require the publisher to send much more
 * information than it currently sends.
 *
 * Consider a case where the subscribed table does not have a unique key on
 * the publisher but has one on the subscriber. A deadlock can happen in the
 * following ways:
 *
 * 1) Deadlock between the leader apply worker and a parallel apply worker
 *
 * Consider that the parallel apply worker (PA) is executing TX-1 and the
 * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
 * Now, LA is waiting for PA because of the unique key constraint of the
 * subscribed table, while PA is waiting for LA to send the next stream of
 * changes or the transaction finish command message.
 *
 * In order for lmgr to detect this, we have LA acquire a session lock on the
 * remote transaction (by pa_lock_stream()) and have PA wait on that lock
 * before trying to receive the next stream of changes. Specifically, LA will
 * acquire the lock in AccessExclusive mode before sending STREAM_STOP and
 * will release it, if already acquired, after sending STREAM_START,
 * STREAM_ABORT (for a toplevel transaction), STREAM_PREPARE, and
 * STREAM_COMMIT. PA will acquire the lock in AccessShare mode after
 * processing STREAM_STOP and STREAM_ABORT (for a subtransaction) and then
 * release the lock immediately after acquiring it.
 *
 * The lock graph for the above example looks as follows:
 * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
 * acquire the stream lock) -> LA
 *
 * This way, when PA is waiting for LA for the next stream of changes, we have
 * a wait-edge from PA to LA in lmgr, which lets us detect the deadlock
 * between LA and PA.
 *
 * 2) Deadlock between the leader apply worker and parallel apply workers
 *
 * This scenario is similar to the first case, but TX-1 and TX-2 are executed
 * by two parallel apply workers (PA-1 and PA-2, respectively). In this
 * scenario, PA-2 is waiting for PA-1 to complete its transaction while PA-1
 * is waiting for subsequent input from LA. Also, LA is waiting for PA-2 to
 * complete its transaction in order to preserve the commit order. There is a
 * deadlock among the three processes.
 *
 * In order for lmgr to detect this, we have PA acquire a session lock (this
 * is a different lock than the one referred to in the previous case; see
 * pa_lock_transaction()) on the transaction being applied and have LA wait on
 * that lock before proceeding with the transaction finish commands.
 * Specifically, PA will acquire this lock in AccessExclusive mode before
 * executing the first message of the transaction and release it at the xact
 * end. LA will acquire this lock in AccessShare mode at the transaction
 * finish commands and release it immediately.
 *
 * The lock graph for the above example looks as follows:
 * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire
 * the lock due to the unique index constraint) -> PA-1 (waiting to acquire
 * the stream lock) -> LA
 *
 * This way, when LA is waiting to finish the transaction end command to
 * preserve the commit order, we will be able to detect a deadlock, if any.
 *
 * One might think we could use XactLockTableWait(), but XactLockTableWait()
 * considers a PREPARED TRANSACTION as still in progress, which means the lock
 * won't be released even after the parallel apply worker has prepared the
 * transaction.
 *
 * 3) Deadlock when the shm_mq buffer is full
 *
 * In the previous scenario (i.e. PA-1 and PA-2 are executing transactions
 * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
 * wait to send messages, and this wait doesn't appear in lmgr.
 *
 * To avoid this wait, we use a non-blocking write and wait with a timeout. If
 * the timeout is exceeded, LA will serialize all the pending messages to a
 * file and indicate to PA-2 that it needs to read that file for the remaining
 * messages. Then LA will start waiting for commit as in the previous case,
 * which will detect a deadlock if there is one. See pa_send_data() and
 * enum TransApplyAction.
 *
 * Lock types
 * ----------
 * Both the stream lock and the transaction lock mentioned above are
 * session-level locks because both locks could be acquired outside the
 * transaction, and the stream lock in the leader needs to persist across
 * transaction boundaries, i.e. until the end of the streaming transaction.
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/wait_event.h"

#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067

/*
 * DSM keys for the parallel apply worker. Unlike other parallel execution
 * code, since we don't need to worry about DSM keys conflicting with
 * plan_node_id, we can use small integers.
 */
#define PARALLEL_APPLY_KEY_SHARED 1
#define PARALLEL_APPLY_KEY_MQ 2
#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3

/* Queue size of DSM, 16 MB for now. */
#define DSM_QUEUE_SIZE (16 * 1024 * 1024)

/*
 * Error queue size of DSM. It is desirable to make it large enough that a
 * typical ErrorResponse can be sent without blocking. That way, a worker that
 * errors out can write the whole message into the queue and terminate without
 * waiting for the user backend.
 */
#define DSM_ERROR_QUEUE_SIZE (16 * 1024)

/*
 * There are three fields in each message received by the parallel apply
 * worker: start_lsn, end_lsn and send_time. Because the leader apply worker
 * has already updated these statistics, we can ignore these fields in the
 * parallel apply worker (see function LogicalRepApplyLoop).
 */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

/*
 * The types of session-level locks on a transaction being applied on a
 * logical replication subscriber.
 */
#define PARALLEL_APPLY_LOCK_STREAM 0
#define PARALLEL_APPLY_LOCK_XACT 1

/*
 * Hash table entry to map xid to the parallel apply worker state.
 */
typedef struct ParallelApplyWorkerEntry
{
	TransactionId xid;			/* Hash key -- must be first */
	ParallelApplyWorkerInfo *winfo;
} ParallelApplyWorkerEntry;

/*
 * A hash table used to cache the state of streaming transactions being
 * applied by the parallel apply workers.
 */
static HTAB *ParallelApplyTxnHash = NULL;

/*
 * A list (pool) of active parallel apply workers. The information for the new
 * worker is added to the list after successfully launching it. The list entry
 * is removed if there are already enough workers in the worker pool at the
 * end of the transaction. For more information about the worker pool, see
 * comments atop this file.
 */
static List *ParallelApplyWorkerPool = NIL;

/*
 * Information shared between leader apply worker and parallel apply worker.
 */
ParallelApplyWorkerShared *MyParallelShared = NULL;

/*
 * Is there a message sent by a parallel apply worker that the leader apply
 * worker needs to receive?
 */
volatile sig_atomic_t ParallelApplyMessagePending = false;

/*
 * Cache the parallel apply worker information required for applying the
 * current streaming transaction. It is used to save the cost of searching the
 * hash table when applying the changes between STREAM_START and STREAM_STOP.
 */
static ParallelApplyWorkerInfo *stream_apply_worker = NULL;

/* A list to maintain subtransactions, if any. */
static List *subxactlist = NIL;

static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
static PartialFileSetState pa_get_fileset_state(void);

/*
 * Returns true if it is OK to start a parallel apply worker, false otherwise.
 */
static bool
pa_can_start(void)
{
	/* Only leader apply workers can start parallel apply workers. */
	if (!am_leader_apply_worker())
		return false;

	/*
	 * It is good to check for any change in the subscription parameters to
	 * avoid the case where for a very long time the change doesn't get
	 * reflected. This can happen when there is a constant flow of streaming
	 * transactions that are handled by parallel apply workers.
	 *
	 * It is better to do it before the below checks so that the latest values
	 * of the subscription can be used for the checks.
	 */
	maybe_reread_subscription();

	/*
	 * Don't start a new parallel apply worker if the subscription is not
	 * using parallel streaming mode, or if the publisher does not support
	 * parallel apply.
	 */
	if (!MyLogicalRepWorker->parallel_apply)
		return false;

	/*
	 * Don't start a new parallel worker if the user has set skiplsn, as it's
	 * possible that they want to skip the streaming transaction. For
	 * streaming transactions, we need to serialize the transaction to a file
	 * so that we can get the last LSN of the transaction to judge whether to
	 * skip it before starting to apply the changes.
	 *
	 * One might think that we could allow parallelism if the first lsn of the
	 * transaction is greater than skiplsn, but we don't send it with the
	 * STREAM START message, and it doesn't seem worth sending the extra eight
	 * bytes with STREAM START to enable parallelism for this case.
	 */
	if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
		return false;

	/*
	 * For streaming transactions that are being applied using a parallel
	 * apply worker, we cannot decide whether to apply the change for a
	 * relation that is not in the READY state (see
	 * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
	 * time. So, we don't start a new parallel apply worker in this case.
	 */
	if (!AllTablesyncsReady())
		return false;

	return true;
}

/*
 * Set up a dynamic shared memory segment.
 *
 * We set up a control region that contains a fixed-size worker info
 * (ParallelApplyWorkerShared), a message queue, and an error queue.
 *
 * Returns true on success, false on failure.
 */
static bool
pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
{
	shm_toc_estimator e;
	Size		segsize;
	dsm_segment *seg;
	shm_toc    *toc;
	ParallelApplyWorkerShared *shared;
	shm_mq	   *mq;
	Size		queue_size = DSM_QUEUE_SIZE;
	Size		error_queue_size = DSM_ERROR_QUEUE_SIZE;

	/*
	 * Estimate how much shared memory we need.
	 *
	 * Because the TOC machinery may choose to insert padding of oddly-sized
	 * requests, we must estimate each chunk separately.
	 *
	 * We need one key to register the location of the header, and two other
	 * keys to track the locations of the message queue and the error message
	 * queue.
	 */
	shm_toc_initialize_estimator(&e);
	shm_toc_estimate_chunk(&e, BUFFERALIGN(sizeof(ParallelApplyWorkerShared)));
	shm_toc_estimate_chunk(&e, queue_size);
	shm_toc_estimate_chunk(&e, error_queue_size);

	shm_toc_estimate_keys(&e, 3);
	segsize = shm_toc_estimate(&e);

	/* Create the shared memory segment and establish a table of contents. */
	seg = dsm_create(shm_toc_estimate(&e), 0);
	if (!seg)
		return false;

	toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
						 segsize);

	/* Set up the header region. */
	shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
	SpinLockInit(&shared->mutex);

	shared->xact_state = PARALLEL_TRANS_UNKNOWN;
	pg_atomic_init_u32(&(shared->pending_stream_count), 0);
	shared->last_commit_end = InvalidXLogRecPtr;
	shared->fileset_state = FS_EMPTY;

	shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);

	/* Set up message queue for the worker. */
	mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
	shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
	shm_mq_set_sender(mq, MyProc);

	/* Attach the queue. */
	winfo->mq_handle = shm_mq_attach(mq, seg, NULL);

	/* Set up error queue for the worker. */
	mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
					   error_queue_size);
	shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
	shm_mq_set_receiver(mq, MyProc);

	/* Attach the queue. */
	winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);

	/* Return results to caller. */
	winfo->dsm_seg = seg;
	winfo->shared = shared;

	return true;
}

/*
 * Try to get a parallel apply worker from the pool. If none is available then
 * start a new one.
 */
static ParallelApplyWorkerInfo *
pa_launch_parallel_worker(void)
{
	MemoryContext oldcontext;
	bool		launched;
	ParallelApplyWorkerInfo *winfo;
	ListCell   *lc;

	/* Try to get an available parallel apply worker from the worker pool. */
	foreach(lc, ParallelApplyWorkerPool)
	{
		winfo = (ParallelApplyWorkerInfo *) lfirst(lc);

		if (!winfo->in_use)
			return winfo;
	}

	/*
	 * Start a new parallel apply worker.
	 *
	 * The worker info can be used for the lifetime of the worker process, so
	 * create it in a permanent context.
	 */
	oldcontext = MemoryContextSwitchTo(ApplyContext);

	winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));

	/* Setup shared memory. */
	if (!pa_setup_dsm(winfo))
	{
		MemoryContextSwitchTo(oldcontext);
		pfree(winfo);
		return NULL;
	}

	launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
										MyLogicalRepWorker->dbid,
										MySubscription->oid,
										MySubscription->name,
										MyLogicalRepWorker->userid,
										InvalidOid,
										dsm_segment_handle(winfo->dsm_seg),
										false);

	if (launched)
	{
		ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
	}
	else
	{
		pa_free_worker_info(winfo);
		winfo = NULL;
	}

	MemoryContextSwitchTo(oldcontext);

	return winfo;
}

/*
 * Allocate a parallel apply worker that will be used for the specified xid.
 *
 * We first try to get an available worker from the pool, if any, and only
 * then try to launch a new worker. On successful allocation, remember the
 * worker information in the hash table so that we can get it later for
 * processing the streaming changes.
 */
void
pa_allocate_worker(TransactionId xid)
{
	bool		found;
	ParallelApplyWorkerInfo *winfo = NULL;
	ParallelApplyWorkerEntry *entry;

	if (!pa_can_start())
		return;

	winfo = pa_launch_parallel_worker();
	if (!winfo)
		return;

	/* First time through, initialize parallel apply worker state hashtable. */
	if (!ParallelApplyTxnHash)
	{
		HASHCTL		ctl;

		MemSet(&ctl, 0, sizeof(ctl));
		ctl.keysize = sizeof(TransactionId);
		ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
		ctl.hcxt = ApplyContext;

		ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
										   16, &ctl,
										   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
	}

	/* Create an entry for the requested transaction. */
	entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
	if (found)
		elog(ERROR, "hash table corrupted");

	/* Update the transaction information in shared memory. */
	SpinLockAcquire(&winfo->shared->mutex);
	winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
	winfo->shared->xid = xid;
	SpinLockRelease(&winfo->shared->mutex);

	winfo->in_use = true;
	winfo->serialize_changes = false;
	entry->winfo = winfo;
}

/*
 * Find the assigned worker for the given transaction, if any.
 */
ParallelApplyWorkerInfo *
pa_find_worker(TransactionId xid)
{
	bool		found;
	ParallelApplyWorkerEntry *entry;

	if (!TransactionIdIsValid(xid))
		return NULL;

	if (!ParallelApplyTxnHash)
		return NULL;

	/* Return the cached parallel apply worker if valid. */
	if (stream_apply_worker)
		return stream_apply_worker;

	/* Find an entry for the requested transaction. */
	entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
	if (found)
	{
		/* The worker must not have exited. */
		Assert(entry->winfo->in_use);
		return entry->winfo;
	}

	return NULL;
}

/*
 * Makes the worker available for reuse.
 *
 * This removes the parallel apply worker entry from the hash table so that it
 * can't be used. If there are enough workers in the pool, it stops the worker
 * and frees the corresponding info. Otherwise it just marks the worker as
 * available for reuse.
 *
 * For more information about the worker pool, see comments atop this file.
 */
static void
pa_free_worker(ParallelApplyWorkerInfo *winfo)
{
	Assert(!am_parallel_apply_worker());
	Assert(winfo->in_use);
	Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);

	if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
		elog(ERROR, "hash table corrupted");

	/*
	 * Stop the worker if there are enough workers in the pool.
	 *
	 * XXX Additionally, we also stop the worker if the leader apply worker
	 * serialized part of the transaction data due to a send timeout. This is
	 * because the message could be partially written to the queue and there
	 * is no way to clean the queue other than resending the message until it
	 * succeeds. Instead of trying to send the data, which anyway would have
	 * been serialized, and then letting the parallel apply worker deal with
	 * the spurious message, we stop the worker.
	 */
	if (winfo->serialize_changes ||
		list_length(ParallelApplyWorkerPool) >
		(max_parallel_apply_workers_per_subscription / 2))
	{
		logicalrep_pa_worker_stop(winfo);
		pa_free_worker_info(winfo);

		return;
	}

	winfo->in_use = false;
	winfo->serialize_changes = false;
}

/*
 * Free the parallel apply worker information and unlink the files with
 * serialized changes, if any.
 */
static void
pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
{
	Assert(winfo);

	if (winfo->mq_handle)
		shm_mq_detach(winfo->mq_handle);

	if (winfo->error_mq_handle)
		shm_mq_detach(winfo->error_mq_handle);

	/* Unlink the files with serialized changes. */
	if (winfo->serialize_changes)
		stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);

	if (winfo->dsm_seg)
		dsm_detach(winfo->dsm_seg);

	/* Remove from the worker pool. */
	ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);

	pfree(winfo);
}

/*
 * Detach the error queue for all parallel apply workers.
 */
void
pa_detach_all_error_mq(void)
{
	ListCell   *lc;

	foreach(lc, ParallelApplyWorkerPool)
	{
		ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);

		if (winfo->error_mq_handle)
		{
			shm_mq_detach(winfo->error_mq_handle);
			winfo->error_mq_handle = NULL;
		}
	}
}

/*
 * Check if there are any pending spooled messages.
 */
static bool
pa_has_spooled_message_pending(void)
{
	PartialFileSetState fileset_state;

	fileset_state = pa_get_fileset_state();

	return (fileset_state != FS_EMPTY);
}

/*
 * Replay the spooled messages once the leader apply worker has finished
 * serializing changes to the file.
 *
 * Returns false if there aren't any pending spooled messages, true otherwise.
 */
static bool
pa_spooled_messages(void)
{
	PartialFileSetState fileset_state;

	fileset_state = pa_get_fileset_state();

	if (fileset_state == FS_EMPTY)
		return false;

	/*
	 * If the leader apply worker is busy serializing the partial changes,
	 * then acquire the stream lock now and wait for the leader worker to
	 * finish serializing the changes. Otherwise, the parallel apply worker
	 * won't get a chance to receive a STREAM_STOP (and acquire the stream
	 * lock) until the leader has serialized all changes, which can lead to an
	 * undetected deadlock.
	 *
	 * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
	 * worker has finished serializing the changes.
	 */
	if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
	{
		pa_lock_stream(MyParallelShared->xid, AccessShareLock);
		pa_unlock_stream(MyParallelShared->xid, AccessShareLock);

		fileset_state = pa_get_fileset_state();
	}

	/*
	 * We cannot read the file immediately after the leader has serialized all
	 * changes to the file because there may still be messages in the memory
	 * queue. We will apply all spooled messages the next time we call this
	 * function, and that will ensure there are no messages left in the memory
	 * queue.
	 */
	if (fileset_state == FS_SERIALIZE_DONE)
	{
		pa_set_fileset_state(MyParallelShared, FS_READY);
	}
	else if (fileset_state == FS_READY)
	{
		apply_spooled_messages(&MyParallelShared->fileset,
							   MyParallelShared->xid,
							   InvalidXLogRecPtr);
		pa_set_fileset_state(MyParallelShared, FS_EMPTY);
	}

	return true;
}

/*
 * Interrupt handler for main loop of parallel apply worker.
 */
static void
ProcessParallelApplyInterrupts(void)
{
	CHECK_FOR_INTERRUPTS();

	if (ShutdownRequestPending)
	{
		ereport(LOG,
				(errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
						MySubscription->name)));

		proc_exit(0);
	}

	if (ConfigReloadPending)
	{
		ConfigReloadPending = false;
		ProcessConfigFile(PGC_SIGHUP);
	}
}

/* Parallel apply worker main loop. */
static void
LogicalParallelApplyLoop(shm_mq_handle *mqh)
{
	shm_mq_result shmq_res;
	ErrorContextCallback errcallback;
	MemoryContext oldcxt = CurrentMemoryContext;

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/*
	 * Push apply error context callback. Fields will be filled while applying
	 * a change.
	 */
	errcallback.callback = apply_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	for (;;)
	{
		void	   *data;
		Size		len;

		ProcessParallelApplyInterrupts();

		/* Ensure we are reading the data into our memory context. */
		MemoryContextSwitchTo(ApplyMessageContext);

		shmq_res = shm_mq_receive(mqh, &len, &data, true);

		if (shmq_res == SHM_MQ_SUCCESS)
		{
			StringInfoData s;
			int			c;

			if (len == 0)
				elog(ERROR, "invalid message length");

			initReadOnlyStringInfo(&s, data, len);

			/*
			 * The first byte of messages sent from leader apply worker to
			 * parallel apply workers can only be PqReplMsg_WALData.
			 */
			c = pq_getmsgbyte(&s);
			if (c != PqReplMsg_WALData)
				elog(ERROR, "unexpected message \"%c\"", c);

			/*
			 * Ignore statistics fields that have been updated by the leader
			 * apply worker.
			 *
			 * XXX We can avoid sending the statistics fields from the leader
			 * apply worker, but for that, it needs to rebuild the entire
			 * message by removing these fields, which could be more work than
			 * simply ignoring these fields in the parallel apply worker.
			 */
			s.cursor += SIZE_STATS_MESSAGE;

			apply_dispatch(&s);
		}
		else if (shmq_res == SHM_MQ_WOULD_BLOCK)
		{
			/* Replay the changes from the file, if any. */
			if (!pa_spooled_messages())
			{
				int			rc;

				/* Wait for more work. */
				rc = WaitLatch(MyLatch,
							   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   1000L,
							   WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);

				if (rc & WL_LATCH_SET)
					ResetLatch(MyLatch);

				/*
				 * Force stats reporting to avoid long delays. There can be
				 * long idle gaps before the leader assigns the next
				 * transaction, and the only opportunity to report stats
				 * during such gaps is here.
				 */
				if ((rc & WL_TIMEOUT) && !IsTransactionState())
					pgstat_report_stat(true);
			}
		}
		else
		{
			Assert(shmq_res == SHM_MQ_DETACHED);

			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("lost connection to the logical replication apply worker")));
		}

		MemoryContextReset(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);
	}

	/* Pop the error context stack. */
	error_context_stack = errcallback.previous;

	MemoryContextSwitchTo(oldcxt);
}

/*
 * Make sure the leader apply worker tries to read from our error queue one
 * more time. This guards against the case where we exit uncleanly without
 * sending an ErrorResponse, for example because some code calls proc_exit
 * directly.
 *
 * Also explicitly detach from the dsm segment to invoke on_dsm_detach
 * callbacks, if any. See ParallelWorkerShutdown for details.
 */
static void
pa_shutdown(int code, Datum arg)
{
	SendProcSignal(MyLogicalRepWorker->leader_pid,
				   PROCSIG_PARALLEL_APPLY_MESSAGE,
				   INVALID_PROC_NUMBER);

	dsm_detach((dsm_segment *) DatumGetPointer(arg));
}

/*
 * Parallel apply worker entry point.
 */
void
ParallelApplyWorkerMain(Datum main_arg)
{
	ParallelApplyWorkerShared *shared;
	dsm_handle	handle;
	dsm_segment *seg;
	shm_toc    *toc;
	shm_mq	   *mq;
	shm_mq_handle *mqh;
	shm_mq_handle *error_mqh;
	RepOriginId originid;
	int			worker_slot = DatumGetInt32(main_arg);
	char		originname[NAMEDATALEN];

	InitializingApplyWorker = true;

	/*
	 * Setup signal handling.
	 *
	 * Note: We intentionally used SIGUSR2 to trigger a graceful shutdown
	 * initiated by the leader apply worker. This helps to differentiate it
	 * from the case where we abort the current transaction and exit on
	 * receiving SIGTERM.
	 */
	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGTERM, die);
	pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);

	/*
	 * Attach to the dynamic shared memory segment for the parallel apply, and
	 * find its table of contents.
	 *
	 * Like parallel query, we don't need resource owner by this time. See
	 * ParallelWorkerMain.
	 */
	memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
	seg = dsm_attach(handle);
	if (!seg)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not map dynamic shared memory segment")));

	toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
	if (!toc)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("invalid magic number in dynamic shared memory segment")));

	/* Look up the shared information. */
	shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
	MyParallelShared = shared;

	/*
	 * Attach to the message queue.
	 */
	mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
	shm_mq_set_receiver(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);

	/*
	 * Primary initialization is complete. Now, we can attach to our slot.
	 * This is to ensure that the leader apply worker does not write data to
	 * the uninitialized memory queue.
	 */
	logicalrep_worker_attach(worker_slot);

	/*
	 * Register the shutdown callback after we are attached to the worker
	 * slot. This is to ensure that MyLogicalRepWorker remains valid when this
	 * callback is invoked.
	 */
	before_shmem_exit(pa_shutdown, PointerGetDatum(seg));

	SpinLockAcquire(&MyParallelShared->mutex);
	MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
	MyParallelShared->logicalrep_worker_slot_no = worker_slot;
	SpinLockRelease(&MyParallelShared->mutex);

	/*
	 * Attach to the error queue.
	 */
	mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
	shm_mq_set_sender(mq, MyProc);
	error_mqh = shm_mq_attach(mq, seg, NULL);

	pq_redirect_to_shm_mq(seg, error_mqh);
	pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
						   INVALID_PROC_NUMBER);

	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = 0;

	InitializeLogRepWorker();

	InitializingApplyWorker = false;

	/* Setup replication origin tracking. */
	StartTransactionCommand();
	ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
									   originname, sizeof(originname));
	originid = replorigin_by_name(originname, false);

	/*
	 * The parallel apply worker doesn't need to monopolize this replication
	 * origin which was already acquired by its leader process.
	 */
	replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
	replorigin_session_origin = originid;
	CommitTransactionCommand();

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  invalidate_syncing_table_states,
								  (Datum) 0);

	set_apply_error_context_origin(originname);

	LogicalParallelApplyLoop(mqh);

	/*
	 * The parallel apply worker must not get here because the parallel apply
	 * worker will only stop when it receives a SIGTERM or SIGUSR2 from the
	 * leader, or SIGINT from itself, or when there is an error. None of these
	 * cases will allow the code to reach here.
	 */
	Assert(false);
}

/*
 * Handle receipt of an interrupt indicating a parallel apply worker message.
 *
 * Note: this is called within a signal handler! All we can do is set a flag
 * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * ProcessParallelApplyMessages().
 */
void
HandleParallelApplyMessageInterrupt(void)
{
	InterruptPending = true;
	ParallelApplyMessagePending = true;
	/* latch will be set by procsignal_sigusr1_handler */
}

/*
 * Process a single protocol message received from a single parallel apply
 * worker.
 */
static void
ProcessParallelApplyMessage(StringInfo msg)
{
	char		msgtype;

	msgtype = pq_getmsgbyte(msg);

	switch (msgtype)
	{
		case PqMsg_ErrorResponse:
			{
				ErrorData	edata;

				/* Parse ErrorResponse. */
				pq_parse_errornotice(msg, &edata);

				/*
				 * If desired, add a context line to show that this is a
				 * message propagated from a parallel apply worker. Otherwise,
				 * it can sometimes be confusing to understand what actually
				 * happened.
				 */
				if (edata.context)
					edata.context = psprintf("%s\n%s", edata.context,
											 _("logical replication parallel apply worker"));
				else
					edata.context = pstrdup(_("logical replication parallel apply worker"));

				/*
				 * Context beyond that should use the error context callbacks
				 * that were in effect in LogicalRepApplyLoop().
				 */
				error_context_stack = apply_error_context_stack;

				/*
				 * The actual error must have been reported by the parallel
				 * apply worker.
				 */
				ereport(ERROR,
						(errcode(edata.sqlerrcode),
						 errmsg("logical replication parallel apply worker exited due to error"),
						 errcontext("%s", edata.context)));
			}

			/*
			 * Don't need to do anything about NoticeResponse and
			 * NotificationResponse as the logical replication worker doesn't
			 * need to send messages to the client.
			 */
		case PqMsg_NoticeResponse:
		case PqMsg_NotificationResponse:
			break;

		default:
			elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
				 msgtype, msg->len);
	}
}

/*
 * Handle any queued protocol messages received from parallel apply workers.
 */
void
ProcessParallelApplyMessages(void)
{
	ListCell   *lc;
	MemoryContext oldcontext;

	static MemoryContext hpam_context = NULL;

	/*
	 * This is invoked from ProcessInterrupts(), and since some of the
	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
	 * for recursive calls if more signals are received while this runs. It's
	 * unclear that recursive entry would be safe, and it doesn't seem useful
	 * even if it is safe, so let's block interrupts until done.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
	 * don't want to risk leaking data into long-lived contexts, so let's do
	 * our work here in a private context that we can reset on each use.
	 */
	if (!hpam_context)			/* first time through? */
		hpam_context = AllocSetContextCreate(TopMemoryContext,
											 "ProcessParallelApplyMessages",
											 ALLOCSET_DEFAULT_SIZES);
	else
		MemoryContextReset(hpam_context);

	oldcontext = MemoryContextSwitchTo(hpam_context);

	ParallelApplyMessagePending = false;

	foreach(lc, ParallelApplyWorkerPool)
	{
		shm_mq_result res;
		Size		nbytes;
		void	   *data;
		ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);

		/*
		 * The leader will detach from the error queue and set it to NULL
		 * before preparing to stop all parallel apply workers, so we don't
		 * need to handle error messages anymore. See
		 * logicalrep_worker_detach.
		 */
		if (!winfo->error_mq_handle)
			continue;

		res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);

		if (res == SHM_MQ_WOULD_BLOCK)
			continue;
		else if (res == SHM_MQ_SUCCESS)
		{
			StringInfoData msg;

			initStringInfo(&msg);
			appendBinaryStringInfo(&msg, data, nbytes);
			ProcessParallelApplyMessage(&msg);
			pfree(msg.data);
		}
		else
			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("lost connection to the logical replication parallel apply worker")));
	}

	MemoryContextSwitchTo(oldcontext);

	/* Might as well clear the context on our way out */
	MemoryContextReset(hpam_context);

	RESUME_INTERRUPTS();
}

/*
 * Send the data to the specified parallel apply worker via the shared-memory
 * queue.
 *
 * Returns false if the attempt to send data via shared memory times out, true
 * otherwise.
 */
bool
pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
{
	int			rc;
	shm_mq_result result;
	TimestampTz startTime = 0;

	Assert(!IsTransactionState());
	Assert(!winfo->serialize_changes);

	/*
	 * We don't try to send data to the parallel worker in 'immediate' mode.
	 * This is primarily used for testing purposes.
	 */
	if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
		return false;

/*
 * This timeout is a bit arbitrary, but testing revealed that it is sufficient
 * to send the message unless the parallel apply worker is waiting on some
 * lock or there is a serious resource crunch. See the comments atop this file
 * to know why we are using a non-blocking way to send the message.
 */
#define SHM_SEND_RETRY_INTERVAL_MS 1000
#define SHM_SEND_TIMEOUT_MS		(10000 - SHM_SEND_RETRY_INTERVAL_MS)

	for (;;)
	{
		result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);

		if (result == SHM_MQ_SUCCESS)
			return true;
		else if (result == SHM_MQ_DETACHED)
			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("could not send data to shared-memory queue")));

		Assert(result == SHM_MQ_WOULD_BLOCK);

		/* Wait before retrying. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   SHM_SEND_RETRY_INTERVAL_MS,
					   WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (startTime == 0)
			startTime = GetCurrentTimestamp();
		else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
											SHM_SEND_TIMEOUT_MS))
			return false;
	}
}
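The retry loop in pa_send_data() can be read in isolation as a generic pattern: attempt a non-blocking send, and on failure retry at a fixed interval until a total timeout elapses, starting the timeout clock only after the first failed attempt. The sketch below is a hypothetical standalone version of that pattern; `try_send_fn`, `clock_fn`, and the `demo_*` helpers are illustrative stand-ins, not PostgreSQL APIs (a real caller would also sleep between retries, as WaitLatch does above).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define RETRY_INTERVAL_MS 1000
#define TIMEOUT_MS (10000 - RETRY_INTERVAL_MS)

typedef bool (*try_send_fn) (void *ctx);	/* non-blocking send attempt */
typedef int64_t (*clock_fn) (void *ctx);	/* current time in ms */

/* Returns true if the send succeeded, false if the timeout expired. */
static bool
send_with_timeout(try_send_fn try_send, clock_fn now_ms, void *ctx)
{
	int64_t		start = 0;

	for (;;)
	{
		if (try_send(ctx))
			return true;

		/* (A real implementation would sleep RETRY_INTERVAL_MS here.) */

		if (start == 0)
			start = now_ms(ctx);	/* start the clock on first failure */
		else if (now_ms(ctx) - start >= TIMEOUT_MS)
			return false;
	}
}

/* Demo harness: sends fail a fixed number of times; the clock advances
 * by one retry interval every time it is read. */
struct demo_ctx
{
	int			fails_left;
	int64_t		clock;
};

static bool
demo_try_send(void *p)
{
	struct demo_ctx *c = p;

	if (c->fails_left > 0)
	{
		c->fails_left--;
		return false;
	}
	return true;
}

static int64_t
demo_now_ms(void *p)
{
	struct demo_ctx *c = p;

	c->clock += RETRY_INTERVAL_MS;
	return c->clock;
}
```

With this shape, a transient stall (a few failed attempts) still succeeds, while a stuck receiver makes the function report failure so the caller can fall back, much as the leader falls back to serializing changes.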
1222
/*
 * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
 * that the current data and any subsequent data for this transaction will be
 * serialized to a file. This is done to prevent possible deadlocks with
 * another parallel apply worker (refer to the comments atop this file).
 */
void
pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
							   bool stream_locked)
{
	ereport(LOG,
			(errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
					winfo->shared->xid)));

	/*
	 * The parallel apply worker could be stuck for some reason (say it is
	 * waiting on a lock held by another backend), so stop trying to send data
	 * directly to it and start serializing data to a file instead.
	 */
	winfo->serialize_changes = true;

	/* Initialize the stream fileset. */
	stream_start_internal(winfo->shared->xid, true);

	/*
	 * Acquire the stream lock if not already held, to make sure that the
	 * parallel apply worker waits for the leader to release the stream lock
	 * at the end of the transaction.
	 */
	if (!stream_locked)
		pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);

	pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
}
1257
/*
 * Wait until the parallel apply worker's transaction state has reached or
 * exceeded the given xact_state.
 */
static void
pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
					   ParallelTransState xact_state)
{
	for (;;)
	{
		/*
		 * Stop if the transaction state has reached or exceeded the given
		 * xact_state.
		 */
		if (pa_get_xact_state(winfo->shared) >= xact_state)
			break;

		/* Wait to be signalled. */
		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 10L,
						 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);

		/* Reset the latch so we don't spin. */
		ResetLatch(MyLatch);

		/* An interrupt may have occurred while we were waiting. */
		CHECK_FOR_INTERRUPTS();
	}
}
1288
/*
 * Wait until the parallel apply worker's transaction finishes.
 */
static void
pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
{
	/*
	 * Wait until the parallel apply worker has set the state to
	 * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
	 * lock. This is to prevent the leader apply worker from acquiring the
	 * transaction lock earlier than the parallel apply worker.
	 */
	pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);

	/*
	 * Wait for the transaction lock to be released. This is required to
	 * detect deadlock among leader and parallel apply workers. Refer to the
	 * comments atop this file.
	 */
	pa_lock_transaction(winfo->shared->xid, AccessShareLock);
	pa_unlock_transaction(winfo->shared->xid, AccessShareLock);

	/*
	 * Check whether the state became PARALLEL_TRANS_FINISHED, in case the
	 * parallel apply worker failed while applying changes, causing the lock
	 * to be released.
	 */
	if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("lost connection to the logical replication parallel apply worker")));
}
1321
/*
 * Set the transaction state for a given parallel apply worker.
 */
void
pa_set_xact_state(ParallelApplyWorkerShared *wshared,
				  ParallelTransState xact_state)
{
	SpinLockAcquire(&wshared->mutex);
	wshared->xact_state = xact_state;
	SpinLockRelease(&wshared->mutex);
}

/*
 * Get the transaction state for a given parallel apply worker.
 */
static ParallelTransState
pa_get_xact_state(ParallelApplyWorkerShared *wshared)
{
	ParallelTransState xact_state;

	SpinLockAcquire(&wshared->mutex);
	xact_state = wshared->xact_state;
	SpinLockRelease(&wshared->mutex);

	return xact_state;
}
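The two accessors above follow a simple discipline: a state word shared between processes is read and written only under the same lock, so a reader never observes a torn or half-updated value. A hypothetical userland analogue, with a pthread mutex standing in for PostgreSQL's spinlock (SpinLockAcquire/SpinLockRelease) and plain enum values standing in for ParallelTransState:

```c
#include <pthread.h>

/* Ordered states, mirroring how pa_wait_for_xact_state() compares with >=. */
typedef enum
{
	TRANS_UNKNOWN,
	TRANS_STARTED,
	TRANS_FINISHED
} trans_state;

typedef struct
{
	pthread_mutex_t mutex;
	trans_state	state;
} shared_state;

/* Write the state under the mutex. */
static void
set_state(shared_state *s, trans_state st)
{
	pthread_mutex_lock(&s->mutex);
	s->state = st;
	pthread_mutex_unlock(&s->mutex);
}

/* Read the state under the same mutex. */
static trans_state
get_state(shared_state *s)
{
	trans_state st;

	pthread_mutex_lock(&s->mutex);
	st = s->state;
	pthread_mutex_unlock(&s->mutex);

	return st;
}
```

Because the states are declared in increasing order, a waiter can poll `get_state(s) >= TRANS_STARTED` exactly as pa_wait_for_xact_state() does with pa_get_xact_state().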
1348
/*
 * Cache the parallel apply worker information.
 */
void
pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
{
	stream_apply_worker = winfo;
}

/*
 * Form a unique savepoint name for the streaming transaction.
 *
 * Note that different subscriptions for publications on different nodes can
 * receive the same remote xid, so we need to use the subscription id along
 * with it.
 *
 * Returns the name in the supplied buffer.
 */
static void
pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
{
	snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
}
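The name format matters here: because two subscriptions can receive the same remote xid, the subscription OID is baked into the savepoint name to keep it unique per subscription. A minimal standalone sketch, with plain `unsigned int` standing in for the `Oid` and `TransactionId` typedefs:

```c
#include <stdio.h>

/* Build a savepoint name of the form pg_sp_<subscription-oid>_<remote-xid>,
 * matching the format used by pa_savepoint_name(). */
static void
savepoint_name(unsigned int suboid, unsigned int xid,
			   char *spname, size_t szsp)
{
	snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
}
```

For example, subscription OID 16394 and remote xid 740 yield `pg_sp_16394_740`; the worst-case length (two 10-digit unsigned ints plus the prefix) fits comfortably in a NAMEDATALEN-sized buffer.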
1371
/*
 * Define a savepoint for a subxact in the parallel apply worker if needed.
 *
 * The parallel apply worker can figure out if a new subtransaction was
 * started by checking if the new change arrived with a different xid. In that
 * case define a named savepoint, so that we are able to roll back to it
 * if required.
 */
void
pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
{
	if (current_xid != top_xid &&
		!list_member_xid(subxactlist, current_xid))
	{
		MemoryContext oldctx;
		char		spname[NAMEDATALEN];

		pa_savepoint_name(MySubscription->oid, current_xid,
						  spname, sizeof(spname));

		elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);

		/* We must be in a transaction block to define the SAVEPOINT. */
		if (!IsTransactionBlock())
		{
			if (!IsTransactionState())
				StartTransactionCommand();

			BeginTransactionBlock();
			CommitTransactionCommand();
		}

		DefineSavepoint(spname);

		/*
		 * CommitTransactionCommand is needed to start a subtransaction after
		 * issuing a SAVEPOINT inside a transaction block (see
		 * StartSubTransaction()).
		 */
		CommitTransactionCommand();

		oldctx = MemoryContextSwitchTo(TopTransactionContext);
		subxactlist = lappend_xid(subxactlist, current_xid);
		MemoryContextSwitchTo(oldctx);
	}
}
1418
/* Reset the list that maintains subtransactions. */
void
pa_reset_subtrans(void)
{
	/*
	 * We don't need to free this explicitly as the allocated memory will be
	 * freed at the transaction end.
	 */
	subxactlist = NIL;
}
1429
/*
 * Handle a STREAM ABORT message when the transaction was applied in a
 * parallel apply worker.
 */
void
pa_stream_abort(LogicalRepStreamAbortData *abort_data)
{
	TransactionId xid = abort_data->xid;
	TransactionId subxid = abort_data->subxid;

	/*
	 * Update the origin state so we can restart streaming from the correct
	 * position in case of a crash.
	 */
	replorigin_xact_state.origin_lsn = abort_data->abort_lsn;
	replorigin_xact_state.origin_timestamp = abort_data->abort_time;

	/*
	 * If the two XIDs are the same, it's in fact an abort of the toplevel
	 * xact, so just free the subxactlist.
	 */
	if (subxid == xid)
	{
		pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);

		/*
		 * Release the lock as we might be processing an empty streaming
		 * transaction, in which case the lock won't be released during
		 * transaction rollback.
		 *
		 * Note that it's ok to release the transaction lock before aborting
		 * the transaction because even if the parallel apply worker dies due
		 * to a crash or some other reason, such a transaction would still be
		 * considered aborted.
		 */
		pa_unlock_transaction(xid, AccessExclusiveLock);

		AbortCurrentTransaction();

		if (IsTransactionBlock())
		{
			EndTransactionBlock(false);
			CommitTransactionCommand();
		}

		pa_reset_subtrans();

		pgstat_report_activity(STATE_IDLE, NULL);
	}
	else
	{
		/* OK, so it's a subxact. Roll back to the savepoint. */
		int			i;
		char		spname[NAMEDATALEN];

		pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));

		elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);

		/*
		 * Search the subxactlist, determine the offset tracked for the
		 * subxact, and truncate the list.
		 *
		 * Note that for an empty sub-transaction we won't find the subxid
		 * here.
		 */
		for (i = list_length(subxactlist) - 1; i >= 0; i--)
		{
			TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));

			if (xid_tmp == subxid)
			{
				RollbackToSavepoint(spname);
				CommitTransactionCommand();
				subxactlist = list_truncate(subxactlist, i);
				break;
			}
		}
	}
}
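The subxact branch of pa_stream_abort() performs a newest-to-oldest scan of the recorded subtransaction XIDs: on a match it rolls back to that savepoint and truncates the list so the matched subxact and everything started after it are dropped, and an empty subtransaction (never recorded) is simply not found. A hypothetical array-based sketch of that bookkeeping, with `abort_subxact()` standing in for the loop and the rollback reduced to a comment:

```c
/* Roll back the bookkeeping for subtransaction `subxid`.
 *
 * subxacts[0..*nsubxacts-1] holds recorded subxact XIDs in start order.
 * Returns the index rolled back to, or -1 if subxid was never recorded
 * (the empty-subtransaction case in pa_stream_abort()). */
static int
abort_subxact(unsigned int *subxacts, int *nsubxacts, unsigned int subxid)
{
	int			i;

	/* Scan newest-to-oldest, mirroring the list_length() - 1 loop. */
	for (i = *nsubxacts - 1; i >= 0; i--)
	{
		if (subxacts[i] == subxid)
		{
			/* ROLLBACK TO SAVEPOINT would happen here. */
			*nsubxacts = i;		/* drop this subxact and any newer ones */
			return i;
		}
	}

	return -1;					/* nothing recorded, nothing to roll back */
}
```

Truncating to index `i` (rather than `i + 1`) matches `list_truncate(subxactlist, i)` above: rolling back to a savepoint destroys that subtransaction too, so its entry must go along with its descendants.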
1510
/*
 * Set the fileset state for a particular parallel apply worker. The fileset
 * will be set once the leader worker has serialized all changes to the file,
 * so that it can be used by the parallel apply worker.
 */
void
pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
					 PartialFileSetState fileset_state)
{
	SpinLockAcquire(&wshared->mutex);
	wshared->fileset_state = fileset_state;

	if (fileset_state == FS_SERIALIZE_DONE)
	{
		Assert(am_leader_apply_worker());
		Assert(MyLogicalRepWorker->stream_fileset);
		wshared->fileset = *MyLogicalRepWorker->stream_fileset;
	}

	SpinLockRelease(&wshared->mutex);
}

/*
 * Get the fileset state for the current parallel apply worker.
 */
static PartialFileSetState
pa_get_fileset_state(void)
{
	PartialFileSetState fileset_state;

	Assert(am_parallel_apply_worker());

	SpinLockAcquire(&MyParallelShared->mutex);
	fileset_state = MyParallelShared->fileset_state;
	SpinLockRelease(&MyParallelShared->mutex);

	return fileset_state;
}
1549
/*
 * Helper functions to acquire and release a lock for each stream block.
 *
 * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
 * stream lock.
 *
 * Refer to the comments atop this file to see how the stream lock is used.
 */
void
pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
{
	LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
								   PARALLEL_APPLY_LOCK_STREAM, lockmode);
}

void
pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
{
	UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
									 PARALLEL_APPLY_LOCK_STREAM, lockmode);
}

/*
 * Helper functions to acquire and release a lock for each local transaction
 * apply.
 *
 * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
 * transaction lock.
 *
 * Note that all the callers must pass a remote transaction ID instead of a
 * local transaction ID as xid. This is because the local transaction ID will
 * only be assigned while applying the first change in the parallel apply
 * worker, but it's possible that the first change in the parallel apply
 * worker is blocked by a concurrently executing transaction in another
 * parallel apply worker. Since the local transaction id can only be
 * communicated to the leader after applying the first change, the leader
 * would not be able to wait on such a lock after sending the xact finish
 * command.
 *
 * Refer to the comments atop this file to see how the transaction lock is
 * used.
 */
void
pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
{
	LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
								   PARALLEL_APPLY_LOCK_XACT, lockmode);
}

void
pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
{
	UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
									 PARALLEL_APPLY_LOCK_XACT, lockmode);
}
/*
 * Decrement the number of pending streaming blocks and wait on the stream
 * lock if there is no pending block available.
 */
void
pa_decr_and_wait_stream_block(void)
{
	Assert(am_parallel_apply_worker());

	/*
	 * It is only possible to not have any pending stream chunks when we are
	 * applying spooled messages.
	 */
	if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
	{
		if (pa_has_spooled_message_pending())
			return;

		elog(ERROR, "invalid pending streaming chunk 0");
	}

	if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
	{
		pa_lock_stream(MyParallelShared->xid, AccessShareLock);
		pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
	}
}
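The counter protocol above is: the leader bumps pending_stream_count once per streamed chunk it sends, the worker decrements it once per chunk applied, and only when the count hits zero does the worker block on the stream lock to wait for more data. A hypothetical sketch of that protocol using C11 atomics, with `wait_on_stream_lock()` as a stub for the pa_lock_stream()/pa_unlock_stream() pair:

```c
#include <stdatomic.h>
#include <stdbool.h>

static int	wait_calls = 0;		/* counts simulated blocking waits */

static void
wait_on_stream_lock(void)
{
	wait_calls++;				/* a real worker would block here until the
								 * leader releases the stream lock */
}

/* Decrement the pending-chunk count; block (via the stub) when it reaches
 * zero. Returns false for the error case of an already-empty counter. */
static bool
decr_and_maybe_wait(atomic_uint *pending)
{
	if (atomic_load(pending) == 0)
		return false;			/* nothing pending: caller must handle */

	/* atomic_fetch_sub returns the old value; old - 1 is the new count. */
	if (atomic_fetch_sub(pending, 1) - 1 == 0)
		wait_on_stream_lock();

	return true;
}
```

Taking the lock and immediately releasing it is the waiting trick used throughout this file: the worker parks on a lock the leader holds, and the leader's release is the wake-up signal, with no separate condition variable needed.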
1632
/*
 * Finish processing the streaming transaction in the leader apply worker.
 */
void
pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
{
	Assert(am_leader_apply_worker());

	/*
	 * Unlock the shared object lock so that the parallel apply worker can
	 * continue to receive and apply changes.
	 */
	pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);

	/*
	 * Wait for that worker to finish. This is necessary to maintain commit
	 * order, which avoids failures due to transaction dependencies and
	 * deadlocks.
	 */
	pa_wait_for_xact_finish(winfo);

	if (XLogRecPtrIsValid(remote_lsn))
		store_flush_position(remote_lsn, winfo->shared->last_commit_end);

	pa_free_worker(winfo);
}