PostgreSQL Source Code git master
Loading...
Searching...
No Matches
applyparallelworker.c File Reference
#include "postgres.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/wait_event.h"
Include dependency graph for applyparallelworker.c:

Go to the source code of this file.

Data Structures

struct  ParallelApplyWorkerEntry
 

Macros

#define PG_LOGICAL_APPLY_SHM_MAGIC   0x787ca067
 
#define PARALLEL_APPLY_KEY_SHARED   1
 
#define PARALLEL_APPLY_KEY_MQ   2
 
#define PARALLEL_APPLY_KEY_ERROR_QUEUE   3
 
#define DSM_QUEUE_SIZE   (16 * 1024 * 1024)
 
#define DSM_ERROR_QUEUE_SIZE   (16 * 1024)
 
#define SIZE_STATS_MESSAGE   (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
 
#define PARALLEL_APPLY_LOCK_STREAM   0
 
#define PARALLEL_APPLY_LOCK_XACT   1
 
#define SHM_SEND_RETRY_INTERVAL_MS   1000
 
#define SHM_SEND_TIMEOUT_MS   (10000 - SHM_SEND_RETRY_INTERVAL_MS)
 

Typedefs

typedef struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
 

Functions

static void pa_free_worker_info (ParallelApplyWorkerInfo *winfo)
 
static ParallelTransState pa_get_xact_state (ParallelApplyWorkerShared *wshared)
 
static PartialFileSetState pa_get_fileset_state (void)
 
static bool pa_can_start (void)
 
static bool pa_setup_dsm (ParallelApplyWorkerInfo *winfo)
 
static ParallelApplyWorkerInfo * pa_launch_parallel_worker (void)
 
void pa_allocate_worker (TransactionId xid)
 
ParallelApplyWorkerInfo * pa_find_worker (TransactionId xid)
 
static void pa_free_worker (ParallelApplyWorkerInfo *winfo)
 
void pa_detach_all_error_mq (void)
 
static bool pa_has_spooled_message_pending (void)
 
static bool pa_process_spooled_messages_if_required (void)
 
static void ProcessParallelApplyInterrupts (void)
 
static void LogicalParallelApplyLoop (shm_mq_handle *mqh)
 
static void pa_shutdown (int code, Datum arg)
 
void ParallelApplyWorkerMain (Datum main_arg)
 
void HandleParallelApplyMessageInterrupt (void)
 
static void ProcessParallelApplyMessage (StringInfo msg)
 
void ProcessParallelApplyMessages (void)
 
bool pa_send_data (ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 
void pa_switch_to_partial_serialize (ParallelApplyWorkerInfo *winfo, bool stream_locked)
 
static void pa_wait_for_xact_state (ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
 
static void pa_wait_for_xact_finish (ParallelApplyWorkerInfo *winfo)
 
void pa_set_xact_state (ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
 
void pa_set_stream_apply_worker (ParallelApplyWorkerInfo *winfo)
 
static void pa_savepoint_name (Oid suboid, TransactionId xid, char *spname, Size szsp)
 
void pa_start_subtrans (TransactionId current_xid, TransactionId top_xid)
 
void pa_reset_subtrans (void)
 
void pa_stream_abort (LogicalRepStreamAbortData *abort_data)
 
void pa_set_fileset_state (ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
 
void pa_lock_stream (TransactionId xid, LOCKMODE lockmode)
 
void pa_unlock_stream (TransactionId xid, LOCKMODE lockmode)
 
void pa_lock_transaction (TransactionId xid, LOCKMODE lockmode)
 
void pa_unlock_transaction (TransactionId xid, LOCKMODE lockmode)
 
void pa_decr_and_wait_stream_block (void)
 
void pa_xact_finish (ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 

Variables

static HTAB * ParallelApplyTxnHash = NULL
 
static List * ParallelApplyWorkerPool = NIL
 
ParallelApplyWorkerShared * MyParallelShared = NULL
 
volatile sig_atomic_t ParallelApplyMessagePending = false
 
static ParallelApplyWorkerInfo * stream_apply_worker = NULL
 
static List * subxactlist = NIL
 

Macro Definition Documentation

◆ DSM_ERROR_QUEUE_SIZE

#define DSM_ERROR_QUEUE_SIZE   (16 * 1024)

Definition at line 198 of file applyparallelworker.c.

◆ DSM_QUEUE_SIZE

#define DSM_QUEUE_SIZE   (16 * 1024 * 1024)

Definition at line 190 of file applyparallelworker.c.

◆ PARALLEL_APPLY_KEY_ERROR_QUEUE

#define PARALLEL_APPLY_KEY_ERROR_QUEUE   3

Definition at line 187 of file applyparallelworker.c.

◆ PARALLEL_APPLY_KEY_MQ

#define PARALLEL_APPLY_KEY_MQ   2

Definition at line 186 of file applyparallelworker.c.

◆ PARALLEL_APPLY_KEY_SHARED

#define PARALLEL_APPLY_KEY_SHARED   1

Definition at line 185 of file applyparallelworker.c.

◆ PARALLEL_APPLY_LOCK_STREAM

#define PARALLEL_APPLY_LOCK_STREAM   0

Definition at line 212 of file applyparallelworker.c.

◆ PARALLEL_APPLY_LOCK_XACT

#define PARALLEL_APPLY_LOCK_XACT   1

Definition at line 213 of file applyparallelworker.c.

◆ PG_LOGICAL_APPLY_SHM_MAGIC

#define PG_LOGICAL_APPLY_SHM_MAGIC   0x787ca067

Definition at line 178 of file applyparallelworker.c.

◆ SHM_SEND_RETRY_INTERVAL_MS

#define SHM_SEND_RETRY_INTERVAL_MS   1000

◆ SHM_SEND_TIMEOUT_MS

#define SHM_SEND_TIMEOUT_MS   (10000 - SHM_SEND_RETRY_INTERVAL_MS)

◆ SIZE_STATS_MESSAGE

#define SIZE_STATS_MESSAGE   (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

Definition at line 206 of file applyparallelworker.c.

Typedef Documentation

◆ ParallelApplyWorkerEntry

Function Documentation

◆ HandleParallelApplyMessageInterrupt()

void HandleParallelApplyMessageInterrupt ( void  )

Definition at line 999 of file applyparallelworker.c.

1000{
1001 InterruptPending = true;
1002 ParallelApplyMessagePending = true;
1003 SetLatch(MyLatch);
1004}
volatile sig_atomic_t ParallelApplyMessagePending
volatile sig_atomic_t InterruptPending
Definition globals.c:32
struct Latch * MyLatch
Definition globals.c:63
void SetLatch(Latch *latch)
Definition latch.c:290

References InterruptPending, MyLatch, ParallelApplyMessagePending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ LogicalParallelApplyLoop()

static void LogicalParallelApplyLoop ( shm_mq_handle * mqh)
static

Definition at line 738 of file applyparallelworker.c.

739{
741 ErrorContextCallback errcallback;
743
744 /*
745 * Init the ApplyMessageContext which we clean up after each replication
746 * protocol message.
747 */
749 "ApplyMessageContext",
751
752 /*
753 * Push apply error context callback. Fields will be filled while applying
754 * a change.
755 */
756 errcallback.callback = apply_error_callback;
757 errcallback.previous = error_context_stack;
758 error_context_stack = &errcallback;
759
760 for (;;)
761 {
762 void *data;
763 Size len;
764
766
767 /* Ensure we are reading the data into our memory context. */
769
770 shmq_res = shm_mq_receive(mqh, &len, &data, true);
771
773 {
775 int c;
776
777 if (len == 0)
778 elog(ERROR, "invalid message length");
779
781
782 /*
783 * The first byte of messages sent from leader apply worker to
784 * parallel apply workers can only be PqReplMsg_WALData.
785 */
786 c = pq_getmsgbyte(&s);
787 if (c != PqReplMsg_WALData)
788 elog(ERROR, "unexpected message \"%c\"", c);
789
790 /*
791 * Ignore statistics fields that have been updated by the leader
792 * apply worker.
793 *
794 * XXX We can avoid sending the statistics fields from the leader
795 * apply worker but for that, it needs to rebuild the entire
796 * message by removing these fields which could be more work than
797 * simply ignoring these fields in the parallel apply worker.
798 */
800
801 apply_dispatch(&s);
802 }
803 else if (shmq_res == SHM_MQ_WOULD_BLOCK)
804 {
805 /* Replay the changes from the file, if any. */
807 {
808 int rc;
809
810 /* Wait for more work. */
811 rc = WaitLatch(MyLatch,
813 1000L,
815
816 if (rc & WL_LATCH_SET)
818 }
819 }
820 else
821 {
823
826 errmsg("lost connection to the logical replication apply worker")));
827 }
828
831 }
832
833 /* Pop the error context stack. */
834 error_context_stack = errcallback.previous;
835
837}
static void ProcessParallelApplyInterrupts(void)
#define SIZE_STATS_MESSAGE
static bool pa_process_spooled_messages_if_required(void)
MemoryContext ApplyMessageContext
Definition worker.c:475
void apply_dispatch(StringInfo s)
Definition worker.c:3781
MemoryContext ApplyContext
Definition worker.c:476
void apply_error_callback(void *arg)
Definition worker.c:6221
#define Assert(condition)
Definition c.h:945
size_t Size
Definition c.h:691
ErrorContextCallback * error_context_stack
Definition elog.c:99
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
static char * errmsg
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
const void size_t len
const void * data
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
char * c
static int fb(int x)
#define PqReplMsg_WALData
Definition protocol.h:77
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:574
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_WOULD_BLOCK
Definition shm_mq.h:41
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition stringinfo.h:157
struct ErrorContextCallback * previous
Definition elog.h:297
void(* callback)(void *arg)
Definition elog.h:298
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), ApplyContext, ApplyMessageContext, Assert, ErrorContextCallback::callback, CurrentMemoryContext, StringInfoData::cursor, data, elog, ereport, errcode(), errmsg, ERROR, error_context_stack, fb(), initReadOnlyStringInfo(), len, MemoryContextReset(), MemoryContextSwitchTo(), MyLatch, pa_process_spooled_messages_if_required(), pq_getmsgbyte(), PqReplMsg_WALData, ErrorContextCallback::previous, ProcessParallelApplyInterrupts(), ResetLatch(), SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SIZE_STATS_MESSAGE, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by ParallelApplyWorkerMain().

◆ pa_allocate_worker()

void pa_allocate_worker ( TransactionId  xid)

Definition at line 474 of file applyparallelworker.c.

475{
476 bool found;
479
480 if (!pa_can_start())
481 return;
482
484 if (!winfo)
485 return;
486
487 /* First time through, initialize parallel apply worker state hashtable. */
489 {
490 HASHCTL ctl;
491
492 MemSet(&ctl, 0, sizeof(ctl));
493 ctl.keysize = sizeof(TransactionId);
494 ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
495 ctl.hcxt = ApplyContext;
496
497 ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
498 16, &ctl,
500 }
501
502 /* Create an entry for the requested transaction. */
503 entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
504 if (found)
505 elog(ERROR, "hash table corrupted");
506
507 /* Update the transaction information in shared memory. */
508 SpinLockAcquire(&winfo->shared->mutex);
510 winfo->shared->xid = xid;
511 SpinLockRelease(&winfo->shared->mutex);
512
513 winfo->in_use = true;
514 winfo->serialize_changes = false;
515 entry->winfo = winfo;
516}
static bool pa_can_start(void)
static HTAB * ParallelApplyTxnHash
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
#define MemSet(start, val, len)
Definition c.h:1109
uint32 TransactionId
Definition c.h:738
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:358
@ HASH_ENTER
Definition hsearch.h:114
#define HASH_CONTEXT
Definition hsearch.h:102
#define HASH_ELEM
Definition hsearch.h:95
#define HASH_BLOBS
Definition hsearch.h:97
tree ctl
Definition radixtree.h:1838
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
ParallelApplyWorkerInfo * winfo
ParallelApplyWorkerShared * shared
ParallelTransState xact_state
@ PARALLEL_TRANS_UNKNOWN

References ApplyContext, ctl, elog, ERROR, fb(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ParallelApplyWorkerInfo::in_use, MemSet, ParallelApplyWorkerShared::mutex, pa_can_start(), pa_launch_parallel_worker(), PARALLEL_TRANS_UNKNOWN, ParallelApplyTxnHash, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, SpinLockAcquire(), SpinLockRelease(), ParallelApplyWorkerEntry::winfo, ParallelApplyWorkerShared::xact_state, and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_start().

◆ pa_can_start()

static bool pa_can_start ( void  )
static

Definition at line 268 of file applyparallelworker.c.

269{
270 /* Only leader apply workers can start parallel apply workers. */
272 return false;
273
274 /*
275 * It is good to check for any change in the subscription parameter to
276 * avoid the case where for a very long time the change doesn't get
277 * reflected. This can happen when there is a constant flow of streaming
278 * transactions that are handled by parallel apply workers.
279 *
280 * It is better to do it before the below checks so that the latest values
281 * of subscription can be used for the checks.
282 */
284
285 /*
286 * Don't start a new parallel apply worker if the subscription is not
287 * using parallel streaming mode, or if the publisher does not support
288 * parallel apply.
289 */
291 return false;
292
293 /*
294 * Don't start a new parallel worker if user has set skiplsn as it's
295 * possible that they want to skip the streaming transaction. For
296 * streaming transactions, we need to serialize the transaction to a file
297 * so that we can get the last LSN of the transaction to judge whether to
298 * skip before starting to apply the change.
299 *
300 * One might think that we could allow parallelism if the first lsn of the
301 * transaction is greater than skiplsn, but we don't send it with the
302 * STREAM START message, and it doesn't seem worth sending the extra eight
303 * bytes with the STREAM START to enable parallelism for this case.
304 */
306 return false;
307
308 /*
309 * For streaming transactions that are being applied using a parallel
310 * apply worker, we cannot decide whether to apply the change for a
311 * relation that is not in the READY state (see
312 * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
313 * time. So, we don't start the new parallel apply worker in this case.
314 */
315 if (!AllTablesyncsReady())
316 return false;
317
318 return true;
319}
void maybe_reread_subscription(void)
Definition worker.c:5044
Subscription * MySubscription
Definition worker.c:483
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:57
XLogRecPtr skiplsn
bool AllTablesyncsReady(void)
Definition tablesync.c:1597
static bool am_leader_apply_worker(void)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References AllTablesyncsReady(), am_leader_apply_worker(), maybe_reread_subscription(), MyLogicalRepWorker, MySubscription, LogicalRepWorker::parallel_apply, Subscription::skiplsn, and XLogRecPtrIsValid.

Referenced by pa_allocate_worker().

◆ pa_decr_and_wait_stream_block()

void pa_decr_and_wait_stream_block ( void  )

Definition at line 1601 of file applyparallelworker.c.

1602{
1604
1605 /*
1606 * It is only possible to not have any pending stream chunks when we are
1607 * applying spooled messages.
1608 */
1610 {
1612 return;
1613
1614 elog(ERROR, "invalid pending streaming chunk 0");
1615 }
1616
1618 {
1621 }
1622}
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
static bool pa_has_spooled_message_pending(void)
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition atomics.h:439
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition atomics.h:237
#define AccessShareLock
Definition lockdefs.h:36
pg_atomic_uint32 pending_stream_count
static bool am_parallel_apply_worker(void)

References AccessShareLock, am_parallel_apply_worker(), Assert, elog, ERROR, MyParallelShared, pa_has_spooled_message_pending(), pa_lock_stream(), pa_unlock_stream(), ParallelApplyWorkerShared::pending_stream_count, pg_atomic_read_u32(), pg_atomic_sub_fetch_u32(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), and apply_handle_stream_stop().

◆ pa_detach_all_error_mq()

void pa_detach_all_error_mq ( void  )

Definition at line 626 of file applyparallelworker.c.

627{
628 ListCell *lc;
629
631 {
633
634 if (winfo->error_mq_handle)
635 {
637 winfo->error_mq_handle = NULL;
638 }
639 }
640}
static List * ParallelApplyWorkerPool
#define lfirst(lc)
Definition pg_list.h:172
void shm_mq_detach(shm_mq_handle *mqh)
Definition shm_mq.c:845
shm_mq_handle * error_mq_handle

References ParallelApplyWorkerInfo::error_mq_handle, fb(), lfirst, ParallelApplyWorkerPool, and shm_mq_detach().

Referenced by logicalrep_worker_detach().

◆ pa_find_worker()

ParallelApplyWorkerInfo * pa_find_worker ( TransactionId  xid)

Definition at line 522 of file applyparallelworker.c.

523{
524 bool found;
526
527 if (!TransactionIdIsValid(xid))
528 return NULL;
529
531 return NULL;
532
533 /* Return the cached parallel apply worker if valid. */
535 return stream_apply_worker;
536
537 /* Find an entry for the requested transaction. */
538 entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
539 if (found)
540 {
541 /* The worker must not have exited. */
542 Assert(entry->winfo->in_use);
543 return entry->winfo;
544 }
545
546 return NULL;
547}
static ParallelApplyWorkerInfo * stream_apply_worker
@ HASH_FIND
Definition hsearch.h:113
#define TransactionIdIsValid(xid)
Definition transam.h:41

References Assert, fb(), HASH_FIND, hash_search(), ParallelApplyWorkerInfo::in_use, ParallelApplyTxnHash, stream_apply_worker, TransactionIdIsValid, and ParallelApplyWorkerEntry::winfo.

Referenced by get_transaction_apply_action().

◆ pa_free_worker()

static void pa_free_worker ( ParallelApplyWorkerInfo * winfo)
static

Definition at line 560 of file applyparallelworker.c.

561{
563 Assert(winfo->in_use);
565
567 elog(ERROR, "hash table corrupted");
568
569 /*
570 * Stop the worker if there are enough workers in the pool.
571 *
572 * XXX Additionally, we also stop the worker if the leader apply worker
573 * serialize part of the transaction data due to a send timeout. This is
574 * because the message could be partially written to the queue and there
575 * is no way to clean the queue other than resending the message until it
576 * succeeds. Instead of trying to send the data which anyway would have
577 * been serialized and then letting the parallel apply worker deal with
578 * the spurious message, we stop the worker.
579 */
580 if (winfo->serialize_changes ||
583 {
585 pa_free_worker_info(winfo);
586
587 return;
588 }
589
590 winfo->in_use = false;
591 winfo->serialize_changes = false;
592}
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)
@ HASH_REMOVE
Definition hsearch.h:115
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
Definition launcher.c:680
int max_parallel_apply_workers_per_subscription
Definition launcher.c:55
static int list_length(const List *l)
Definition pg_list.h:152
@ PARALLEL_TRANS_FINISHED

References am_parallel_apply_worker(), Assert, elog, ERROR, fb(), HASH_REMOVE, hash_search(), ParallelApplyWorkerInfo::in_use, list_length(), logicalrep_pa_worker_stop(), max_parallel_apply_workers_per_subscription, pa_free_worker_info(), pa_get_xact_state(), PARALLEL_TRANS_FINISHED, ParallelApplyTxnHash, ParallelApplyWorkerPool, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, and ParallelApplyWorkerShared::xid.

Referenced by pa_xact_finish().

◆ pa_free_worker_info()

static void pa_free_worker_info ( ParallelApplyWorkerInfo * winfo)
static

Definition at line 599 of file applyparallelworker.c.

600{
601 Assert(winfo);
602
603 if (winfo->mq_handle)
604 shm_mq_detach(winfo->mq_handle);
605
606 if (winfo->error_mq_handle)
608
609 /* Unlink the files with serialized changes. */
610 if (winfo->serialize_changes)
612
613 if (winfo->dsm_seg)
614 dsm_detach(winfo->dsm_seg);
615
616 /* Remove from the worker pool. */
618
619 pfree(winfo);
620}
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition worker.c:5425
void dsm_detach(dsm_segment *seg)
Definition dsm.c:803
List * list_delete_ptr(List *list, void *datum)
Definition list.c:872
void pfree(void *pointer)
Definition mcxt.c:1616

References Assert, dsm_detach(), ParallelApplyWorkerInfo::dsm_seg, ParallelApplyWorkerInfo::error_mq_handle, list_delete_ptr(), ParallelApplyWorkerInfo::mq_handle, MyLogicalRepWorker, ParallelApplyWorkerPool, pfree(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, shm_mq_detach(), stream_cleanup_files(), LogicalRepWorker::subid, and ParallelApplyWorkerShared::xid.

Referenced by pa_free_worker(), and pa_launch_parallel_worker().

◆ pa_get_fileset_state()

◆ pa_get_xact_state()

static ParallelTransState pa_get_xact_state ( ParallelApplyWorkerShared * wshared)
static

Definition at line 1329 of file applyparallelworker.c.

1330{
1331 ParallelTransState xact_state;
1332
1333 SpinLockAcquire(&wshared->mutex);
1334 xact_state = wshared->xact_state;
1335 SpinLockRelease(&wshared->mutex);
1336
1337 return xact_state;
1338}
ParallelTransState

References fb(), SpinLockAcquire(), and SpinLockRelease().

Referenced by pa_free_worker(), pa_wait_for_xact_finish(), and pa_wait_for_xact_state().

◆ pa_has_spooled_message_pending()

static bool pa_has_spooled_message_pending ( void  )
static

Definition at line 646 of file applyparallelworker.c.

647{
648 PartialFileSetState fileset_state;
649
650 fileset_state = pa_get_fileset_state();
651
652 return (fileset_state != FS_EMPTY);
653}
static PartialFileSetState pa_get_fileset_state(void)
@ FS_EMPTY

References FS_EMPTY, and pa_get_fileset_state().

Referenced by pa_decr_and_wait_stream_block().

◆ pa_launch_parallel_worker()

static ParallelApplyWorkerInfo * pa_launch_parallel_worker ( void  )
static

Definition at line 407 of file applyparallelworker.c.

408{
409 MemoryContext oldcontext;
410 bool launched;
412 ListCell *lc;
413
414 /* Try to get an available parallel apply worker from the worker pool. */
416 {
417 winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
418
419 if (!winfo->in_use)
420 return winfo;
421 }
422
423 /*
424 * Start a new parallel apply worker.
425 *
426 * The worker info can be used for the lifetime of the worker process, so
427 * create it in a permanent context.
428 */
430
432
433 /* Setup shared memory. */
434 if (!pa_setup_dsm(winfo))
435 {
436 MemoryContextSwitchTo(oldcontext);
437 pfree(winfo);
438 return NULL;
439 }
440
448 false);
449
450 if (launched)
451 {
453 }
454 else
455 {
456 pa_free_worker_info(winfo);
457 winfo = NULL;
458 }
459
460 MemoryContextSwitchTo(oldcontext);
461
462 return winfo;
463}
static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
dsm_handle dsm_segment_handle(dsm_segment *seg)
Definition dsm.c:1123
#define palloc0_object(type)
Definition fe_memutils.h:75
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition launcher.c:325
List * lappend(List *list, void *datum)
Definition list.c:339
#define InvalidOid
@ WORKERTYPE_PARALLEL_APPLY

References ApplyContext, LogicalRepWorker::dbid, ParallelApplyWorkerInfo::dsm_seg, dsm_segment_handle(), fb(), ParallelApplyWorkerInfo::in_use, InvalidOid, lappend(), lfirst, logicalrep_worker_launch(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pa_free_worker_info(), pa_setup_dsm(), palloc0_object, ParallelApplyWorkerPool, pfree(), LogicalRepWorker::userid, and WORKERTYPE_PARALLEL_APPLY.

Referenced by pa_allocate_worker().

◆ pa_lock_stream()

void pa_lock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_lock_transaction()

◆ pa_process_spooled_messages_if_required()

static bool pa_process_spooled_messages_if_required ( void  )
static

Definition at line 662 of file applyparallelworker.c.

663{
664 PartialFileSetState fileset_state;
665
666 fileset_state = pa_get_fileset_state();
667
668 if (fileset_state == FS_EMPTY)
669 return false;
670
671 /*
672 * If the leader apply worker is busy serializing the partial changes then
673 * acquire the stream lock now and wait for the leader worker to finish
674 * serializing the changes. Otherwise, the parallel apply worker won't get
675 * a chance to receive a STREAM_STOP (and acquire the stream lock) until
676 * the leader had serialized all changes which can lead to undetected
677 * deadlock.
678 *
679 * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
680 * worker has finished serializing the changes.
681 */
682 if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
683 {
686
687 fileset_state = pa_get_fileset_state();
688 }
689
690 /*
691 * We cannot read the file immediately after the leader has serialized all
692 * changes to the file because there may still be messages in the memory
693 * queue. We will apply all spooled messages the next time we call this
694 * function and that will ensure there are no messages left in the memory
695 * queue.
696 */
697 if (fileset_state == FS_SERIALIZE_DONE)
698 {
700 }
701 else if (fileset_state == FS_READY)
702 {
707 }
708
709 return true;
710}
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition worker.c:2266
@ FS_SERIALIZE_DONE
@ FS_READY
@ FS_SERIALIZE_IN_PROGRESS
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References AccessShareLock, apply_spooled_messages(), ParallelApplyWorkerShared::fileset, FS_EMPTY, FS_READY, FS_SERIALIZE_DONE, FS_SERIALIZE_IN_PROGRESS, InvalidXLogRecPtr, MyParallelShared, pa_get_fileset_state(), pa_lock_stream(), pa_set_fileset_state(), pa_unlock_stream(), and ParallelApplyWorkerShared::xid.

Referenced by LogicalParallelApplyLoop().

◆ pa_reset_subtrans()

void pa_reset_subtrans ( void  )

Definition at line 1412 of file applyparallelworker.c.

1413{
1414 /*
1415 * We don't need to free this explicitly as the allocated memory will be
1416 * freed at the transaction end.
1417 */
1418 subxactlist = NIL;
1419}
static List * subxactlist
#define NIL
Definition pg_list.h:68

References NIL, and subxactlist.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_stream_abort().

◆ pa_savepoint_name()

static void pa_savepoint_name ( Oid  suboid,
TransactionId  xid,
char * spname,
Size  szsp 
)
static

Definition at line 1358 of file applyparallelworker.c.

1359{
1360 snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
1361}
#define snprintf
Definition port.h:260

References fb(), and snprintf.

Referenced by pa_start_subtrans(), and pa_stream_abort().

◆ pa_send_data()

bool pa_send_data ( ParallelApplyWorkerInfo * winfo,
Size  nbytes,
const void * data 
)

Definition at line 1156 of file applyparallelworker.c.

1157{
1158 int rc;
1159 shm_mq_result result;
1160 TimestampTz startTime = 0;
1161
1163 Assert(!winfo->serialize_changes);
1164
1165 /*
1166 * We don't try to send data to parallel worker for 'immediate' mode. This
1167 * is primarily used for testing purposes.
1168 */
1170 return false;
1171
1172/*
1173 * This timeout is a bit arbitrary but testing revealed that it is sufficient
1174 * to send the message unless the parallel apply worker is waiting on some
1175 * lock or there is a serious resource crunch. See the comments atop this file
1176 * to know why we are using a non-blocking way to send the message.
1177 */
1178#define SHM_SEND_RETRY_INTERVAL_MS 1000
1179#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1180
1181 for (;;)
1182 {
1183 result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
1184
1185 if (result == SHM_MQ_SUCCESS)
1186 return true;
1187 else if (result == SHM_MQ_DETACHED)
1188 ereport(ERROR,
1190 errmsg("could not send data to shared-memory queue")));
1191
1192 Assert(result == SHM_MQ_WOULD_BLOCK);
1193
1194 /* Wait before retrying. */
1195 rc = WaitLatch(MyLatch,
1199
1200 if (rc & WL_LATCH_SET)
1201 {
1204 }
1205
1206 if (startTime == 0)
1207 startTime = GetCurrentTimestamp();
1208 else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
1210 return false;
1211 }
1212}
#define SHM_SEND_TIMEOUT_MS
#define SHM_SEND_RETRY_INTERVAL_MS
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1772
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1636
#define unlikely(x)
Definition c.h:432
int64 TimestampTz
Definition timestamp.h:39
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
int debug_logical_replication_streaming
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition shm_mq.c:331
bool IsTransactionState(void)
Definition xact.c:389

References Assert, CHECK_FOR_INTERRUPTS, data, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, ereport, errcode(), errmsg, ERROR, fb(), GetCurrentTimestamp(), IsTransactionState(), ParallelApplyWorkerInfo::mq_handle, MyLatch, ResetLatch(), ParallelApplyWorkerInfo::serialize_changes, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_SEND_RETRY_INTERVAL_MS, SHM_SEND_TIMEOUT_MS, TimestampDifferenceExceeds(), unlikely, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ pa_set_fileset_state()

◆ pa_set_stream_apply_worker()

void pa_set_stream_apply_worker ( ParallelApplyWorkerInfo * winfo)

Definition at line 1344 of file applyparallelworker.c.

1345{
1346 stream_apply_worker = winfo;
1347}

References stream_apply_worker.

Referenced by apply_handle_stream_start(), and apply_handle_stream_stop().

◆ pa_set_xact_state()

void pa_set_xact_state ( ParallelApplyWorkerShared * wshared,
ParallelTransState  xact_state 
)

Definition at line 1317 of file applyparallelworker.c.

1319{
1320 SpinLockAcquire(&wshared->mutex);
1321 wshared->xact_state = xact_state;
1322 SpinLockRelease(&wshared->mutex);
1323}

References fb(), SpinLockAcquire(), and SpinLockRelease().

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), and pa_stream_abort().

◆ pa_setup_dsm()

static bool pa_setup_dsm ( ParallelApplyWorkerInfo winfo)
static

Definition at line 330 of file applyparallelworker.c.

331{
333 Size segsize;
334 dsm_segment *seg;
335 shm_toc *toc;
337 shm_mq *mq;
338 Size queue_size = DSM_QUEUE_SIZE;
340
341 /*
342 * Estimate how much shared memory we need.
343 *
344 * Because the TOC machinery may choose to insert padding of oddly-sized
345 * requests, we must estimate each chunk separately.
346 *
347 * We need one key to register the location of the header, and two other
348 * keys to track the locations of the message queue and the error message
349 * queue.
350 */
353 shm_toc_estimate_chunk(&e, queue_size);
355
357 segsize = shm_toc_estimate(&e);
358
359 /* Create the shared memory segment and establish a table of contents. */
360 seg = dsm_create(shm_toc_estimate(&e), 0);
361 if (!seg)
362 return false;
363
365 segsize);
366
367 /* Set up the header region. */
368 shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
369 SpinLockInit(&shared->mutex);
370
374 shared->fileset_state = FS_EMPTY;
375
377
378 /* Set up message queue for the worker. */
379 mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
382
383 /* Attach the queue. */
384 winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
385
386 /* Set up error queue for the worker. */
391
392 /* Attach the queue. */
393 winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
394
395 /* Return results to caller. */
396 winfo->dsm_seg = seg;
397 winfo->shared = shared;
398
399 return true;
400}
#define DSM_ERROR_QUEUE_SIZE
#define DSM_QUEUE_SIZE
#define PARALLEL_APPLY_KEY_SHARED
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
#define PARALLEL_APPLY_KEY_MQ
#define PG_LOGICAL_APPLY_SHM_MAGIC
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition atomics.h:219
void * dsm_segment_address(dsm_segment *seg)
Definition dsm.c:1095
dsm_segment * dsm_create(Size size, int flags)
Definition dsm.c:516
e
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:226
shm_mq * shm_mq_create(void *address, Size size)
Definition shm_mq.c:179
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:208
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition shm_mq.c:292
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition shm_toc.c:88
Size shm_toc_estimate(shm_toc_estimator *e)
Definition shm_toc.c:263
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
Definition shm_toc.c:40
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition shm_toc.c:171
#define shm_toc_estimate_chunk(e, sz)
Definition shm_toc.h:51
#define shm_toc_initialize_estimator(e)
Definition shm_toc.h:49
#define shm_toc_estimate_keys(e, cnt)
Definition shm_toc.h:53
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
PGPROC * MyProc
Definition proc.c:68

References dsm_create(), DSM_ERROR_QUEUE_SIZE, DSM_QUEUE_SIZE, ParallelApplyWorkerInfo::dsm_seg, dsm_segment_address(), ParallelApplyWorkerInfo::error_mq_handle, fb(), ParallelApplyWorkerShared::fileset_state, FS_EMPTY, InvalidXLogRecPtr, ParallelApplyWorkerShared::last_commit_end, ParallelApplyWorkerInfo::mq_handle, ParallelApplyWorkerShared::mutex, MyProc, PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PARALLEL_TRANS_UNKNOWN, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_init_u32(), PG_LOGICAL_APPLY_SHM_MAGIC, ParallelApplyWorkerInfo::shared, shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_allocate(), shm_toc_create(), shm_toc_estimate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_initialize_estimator, shm_toc_insert(), SpinLockInit(), and ParallelApplyWorkerShared::xact_state.

Referenced by pa_launch_parallel_worker().

◆ pa_shutdown()

static void pa_shutdown ( int  code,
Datum  arg 
)
static

Definition at line 848 of file applyparallelworker.c.

849{
853
855}
Datum arg
Definition elog.c:1322
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:287
@ PROCSIG_PARALLEL_APPLY_MESSAGE
Definition procsignal.h:38

References arg, DatumGetPointer(), dsm_detach(), INVALID_PROC_NUMBER, LogicalRepWorker::leader_pid, MyLogicalRepWorker, PROCSIG_PARALLEL_APPLY_MESSAGE, and SendProcSignal().

Referenced by ParallelApplyWorkerMain().

◆ pa_start_subtrans()

void pa_start_subtrans ( TransactionId  current_xid,
TransactionId  top_xid 
)

Definition at line 1372 of file applyparallelworker.c.

1373{
1374 if (current_xid != top_xid &&
1376 {
1378 char spname[NAMEDATALEN];
1379
1381 spname, sizeof(spname));
1382
1383 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1384
1385 /* We must be in transaction block to define the SAVEPOINT. */
1386 if (!IsTransactionBlock())
1387 {
1388 if (!IsTransactionState())
1390
1393 }
1394
1396
1397 /*
1398 * CommitTransactionCommand is needed to start a subtransaction after
1399 * issuing a SAVEPOINT inside a transaction block (see
1400 * StartSubTransaction()).
1401 */
1403
1407 }
1408}
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
#define DEBUG1
Definition elog.h:30
List * lappend_xid(List *list, TransactionId datum)
Definition list.c:393
bool list_member_xid(const List *list, TransactionId datum)
Definition list.c:742
MemoryContext TopTransactionContext
Definition mcxt.c:171
#define NAMEDATALEN
void DefineSavepoint(const char *name)
Definition xact.c:4396
void StartTransactionCommand(void)
Definition xact.c:3081
bool IsTransactionBlock(void)
Definition xact.c:4994
void BeginTransactionBlock(void)
Definition xact.c:3947
void CommitTransactionCommand(void)
Definition xact.c:3179

References BeginTransactionBlock(), CommitTransactionCommand(), DEBUG1, DefineSavepoint(), elog, fb(), IsTransactionBlock(), IsTransactionState(), lappend_xid(), list_member_xid(), MemoryContextSwitchTo(), MySubscription, NAMEDATALEN, Subscription::oid, pa_savepoint_name(), StartTransactionCommand(), subxactlist, and TopTransactionContext.

Referenced by handle_streamed_transaction().

◆ pa_stream_abort()

void pa_stream_abort ( LogicalRepStreamAbortData abort_data)

Definition at line 1426 of file applyparallelworker.c.

1427{
1428 TransactionId xid = abort_data->xid;
1429 TransactionId subxid = abort_data->subxid;
1430
1431 /*
1432 * Update origin state so we can restart streaming from correct position
1433 * in case of crash.
1434 */
1437
1438 /*
1439 * If the two XIDs are the same, this is in fact an abort of the
1440 * top-level transaction, so just free the subxactlist.
1441 */
1442 if (subxid == xid)
1443 {
1445
1446 /*
1447 * Release the lock as we might be processing an empty streaming
1448 * transaction in which case the lock won't be released during
1449 * transaction rollback.
1450 *
1451 * Note that it's ok to release the transaction lock before aborting
1452 * the transaction because even if the parallel apply worker dies due
1453 * to crash or some other reason, such a transaction would still be
1454 * considered aborted.
1455 */
1457
1459
1460 if (IsTransactionBlock())
1461 {
1462 EndTransactionBlock(false);
1464 }
1465
1467
1469 }
1470 else
1471 {
1472 /* OK, so it's a subxact. Rollback to the savepoint. */
1473 int i;
1474 char spname[NAMEDATALEN];
1475
1476 pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
1477
1478 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1479
1480 /*
1481 * Search the subxactlist, determine the offset tracked for the
1482 * subxact, and truncate the list.
1483 *
1484 * Note that for an empty sub-transaction we won't find the subxid
1485 * here.
1486 */
1487 for (i = list_length(subxactlist) - 1; i >= 0; i--)
1488 {
1490
1491 if (xid_tmp == subxid)
1492 {
1496 break;
1497 }
1498 }
1499 }
1500}
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
int i
Definition isn.c:77
List * list_truncate(List *list, int new_size)
Definition list.c:631
#define AccessExclusiveLock
Definition lockdefs.h:43
ReplOriginXactState replorigin_xact_state
Definition origin.c:167
static ListCell * list_nth_cell(const List *list, int n)
Definition pg_list.h:277
#define lfirst_xid(lc)
Definition pg_list.h:175
XLogRecPtr origin_lsn
Definition origin.h:46
TimestampTz origin_timestamp
Definition origin.h:47
void RollbackToSavepoint(const char *name)
Definition xact.c:4590
bool EndTransactionBlock(bool chain)
Definition xact.c:4067
void AbortCurrentTransaction(void)
Definition xact.c:3473

References AbortCurrentTransaction(), AccessExclusiveLock, CommitTransactionCommand(), DEBUG1, elog, EndTransactionBlock(), fb(), i, IsTransactionBlock(), lfirst_xid, list_length(), list_nth_cell(), list_truncate(), MyParallelShared, MySubscription, NAMEDATALEN, Subscription::oid, ReplOriginXactState::origin_lsn, ReplOriginXactState::origin_timestamp, pa_reset_subtrans(), pa_savepoint_name(), pa_set_xact_state(), pa_unlock_transaction(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), replorigin_xact_state, RollbackToSavepoint(), STATE_IDLE, and subxactlist.

Referenced by apply_handle_stream_abort().

◆ pa_switch_to_partial_serialize()

void pa_switch_to_partial_serialize ( ParallelApplyWorkerInfo winfo,
bool  stream_locked 
)

Definition at line 1221 of file applyparallelworker.c.

1223{
1224 ereport(LOG,
1225 (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1226 winfo->shared->xid)));
1227
1228 /*
1229 * The parallel apply worker could be stuck for some reason (say, waiting
1230 * on a lock held by another backend), so stop trying to send data directly
1231 * to it and start serializing data to a file instead.
1232 */
1233 winfo->serialize_changes = true;
1234
1235 /* Initialize the stream fileset. */
1236 stream_start_internal(winfo->shared->xid, true);
1237
1238 /*
1239 * Acquire the stream lock, if not already held, to make sure that the
1240 * parallel apply worker will wait for the leader to release the stream
1241 * lock until the end of the transaction.
1242 */
1243 if (!stream_locked)
1245
1247}
void stream_start_internal(TransactionId xid, bool first_segment)
Definition worker.c:1693
#define LOG
Definition elog.h:31

References AccessExclusiveLock, ereport, errmsg, fb(), FS_SERIALIZE_IN_PROGRESS, LOG, pa_lock_stream(), pa_set_fileset_state(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, stream_start_internal(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ pa_unlock_stream()

void pa_unlock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_unlock_transaction()

◆ pa_wait_for_xact_finish()

static void pa_wait_for_xact_finish ( ParallelApplyWorkerInfo winfo)
static

Definition at line 1284 of file applyparallelworker.c.

1285{
1286 /*
1287 * Wait until the parallel apply worker sets the state to
1288 * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
1289 * lock. This is to prevent the leader apply worker from acquiring the
1290 * transaction lock earlier than the parallel apply worker.
1291 */
1293
1294 /*
1295 * Wait for the transaction lock to be released. This is required to
1296 * detect deadlock among leader and parallel apply workers. Refer to the
1297 * comments atop this file.
1298 */
1301
1302 /*
1303 * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
1304 * apply worker failed while applying changes causing the lock to be
1305 * released.
1306 */
1308 ereport(ERROR,
1310 errmsg("lost connection to the logical replication parallel apply worker")));
1311}
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
@ PARALLEL_TRANS_STARTED

References AccessShareLock, ereport, errcode(), errmsg, ERROR, fb(), pa_get_xact_state(), pa_lock_transaction(), pa_unlock_transaction(), pa_wait_for_xact_state(), PARALLEL_TRANS_FINISHED, PARALLEL_TRANS_STARTED, ParallelApplyWorkerInfo::shared, and ParallelApplyWorkerShared::xid.

Referenced by pa_xact_finish().

◆ pa_wait_for_xact_state()

static void pa_wait_for_xact_state ( ParallelApplyWorkerInfo winfo,
ParallelTransState  xact_state 
)
static

Definition at line 1254 of file applyparallelworker.c.

1256{
1257 for (;;)
1258 {
1259 /*
1260 * Stop if the transaction state has reached or exceeded the given
1261 * xact_state.
1262 */
1263 if (pa_get_xact_state(winfo->shared) >= xact_state)
1264 break;
1265
1266 /* Wait to be signalled. */
1269 10L,
1271
1272 /* Reset the latch so we don't spin. */
1274
1275 /* An interrupt may have occurred while we were waiting. */
1277 }
1278}

References CHECK_FOR_INTERRUPTS, fb(), MyLatch, pa_get_xact_state(), ResetLatch(), ParallelApplyWorkerInfo::shared, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by pa_wait_for_xact_finish().

◆ pa_xact_finish()

void pa_xact_finish ( ParallelApplyWorkerInfo winfo,
XLogRecPtr  remote_lsn 
)

Definition at line 1628 of file applyparallelworker.c.

1629{
1631
1632 /*
1633 * Unlock the shared object lock so that parallel apply worker can
1634 * continue to receive and apply changes.
1635 */
1637
1638 /*
1639 * Wait for that worker to finish. This is necessary to maintain commit
1640 * order which avoids failures due to transaction dependencies and
1641 * deadlocks.
1642 */
1644
1645 if (XLogRecPtrIsValid(remote_lsn))
1646 store_flush_position(remote_lsn, winfo->shared->last_commit_end);
1647
1648 pa_free_worker(winfo);
1649}
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition worker.c:3945

References AccessExclusiveLock, am_leader_apply_worker(), Assert, ParallelApplyWorkerShared::last_commit_end, pa_free_worker(), pa_unlock_stream(), pa_wait_for_xact_finish(), ParallelApplyWorkerInfo::shared, store_flush_position(), ParallelApplyWorkerShared::xid, and XLogRecPtrIsValid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), and apply_handle_stream_prepare().

◆ ParallelApplyWorkerMain()

void ParallelApplyWorkerMain ( Datum  main_arg)

Definition at line 861 of file applyparallelworker.c.

862{
864 dsm_handle handle;
865 dsm_segment *seg;
866 shm_toc *toc;
867 shm_mq *mq;
869 shm_mq_handle *error_mqh;
873
875
876 /*
877 * Setup signal handling.
878 *
879 * Note: We intentionally used SIGUSR2 to trigger a graceful shutdown
880 * initiated by the leader apply worker. This helps to differentiate it
881 * from the case where we abort the current transaction and exit on
882 * receiving SIGTERM.
883 */
887
888 /*
889 * Attach to the dynamic shared memory segment for the parallel apply, and
890 * find its table of contents.
891 *
892 * As with parallel query, we don't need a resource owner at this point.
893 * See ParallelWorkerMain.
894 */
895 memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
896 seg = dsm_attach(handle);
897 if (!seg)
900 errmsg("could not map dynamic shared memory segment")));
901
903 if (!toc)
906 errmsg("invalid magic number in dynamic shared memory segment")));
907
908 /* Look up the shared information. */
909 shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
910 MyParallelShared = shared;
911
912 /*
913 * Attach to the message queue.
914 */
917 mqh = shm_mq_attach(mq, seg, NULL);
918
919 /*
920 * Primary initialization is complete. Now, we can attach to our slot.
921 * This is to ensure that the leader apply worker does not write data to
922 * the uninitialized memory queue.
923 */
925
926 /*
927 * Register the shutdown callback after we are attached to the worker
928 * slot. This is to ensure that MyLogicalRepWorker remains valid when this
929 * callback is invoked.
930 */
932
937
938 /*
939 * Attach to the error queue.
940 */
943 error_mqh = shm_mq_attach(mq, seg, NULL);
944
945 pq_redirect_to_shm_mq(seg, error_mqh);
948
951
953
955
956 /* Setup replication origin tracking. */
959 originname, sizeof(originname));
961
962 /*
963 * The parallel apply worker doesn't need to monopolize this replication
964 * origin which was already acquired by its leader process.
965 */
969
970 /*
971 * Setup callback for syscache so that we know when something changes in
972 * the subscription relation state.
973 */
976 (Datum) 0);
977
979
981
982 /*
983 * The parallel apply worker must not get here because the parallel apply
984 * worker will only stop when it receives a SIGTERM or SIGUSR2 from the
985 * leader, or SIGINT from itself, or when there is an error. None of these
986 * cases will allow the code to reach here.
987 */
988 Assert(false);
989}
static void pa_shutdown(int code, Datum arg)
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
bool InitializingApplyWorker
Definition worker.c:503
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:647
void set_apply_error_context_origin(char *originname)
Definition worker.c:6363
void InitializeLogRepWorker(void)
Definition worker.c:5780
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:934
dsm_segment * dsm_attach(dsm_handle h)
Definition dsm.c:665
uint32 dsm_handle
Definition dsm_impl.h:55
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition interrupt.c:104
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)
Definition inval.c:1816
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
void logicalrep_worker_attach(int slot)
Definition launcher.c:758
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:232
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1147
#define pqsignal
Definition port.h:547
static Datum PointerGetDatum(const void *X)
Definition postgres.h:342
uint64_t Datum
Definition postgres.h:70
static int32 DatumGetInt32(Datum X)
Definition postgres.h:202
BackgroundWorker * MyBgworkerEntry
Definition postmaster.c:200
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
Definition pqmq.c:84
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition pqmq.c:55
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition shm_toc.c:232
shm_toc * shm_toc_attach(uint64 magic, void *address)
Definition shm_toc.c:64
char bgw_extra[BGW_EXTRALEN]
Definition bgworker.h:106
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
ReplOriginId origin
Definition origin.h:45
void InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
Definition syncutils.c:101
#define SIGHUP
Definition win32_port.h:158
#define SIGUSR2
Definition win32_port.h:171
uint16 ReplOriginId
Definition xlogdefs.h:69

References Assert, BackgroundWorkerUnblockSignals(), before_shmem_exit(), BackgroundWorker::bgw_extra, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), DatumGetInt32(), dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg, ERROR, fb(), LogicalRepWorker::generation, InitializeLogRepWorker(), InitializingApplyWorker, INVALID_PROC_NUMBER, InvalidateSyncingRelStates(), InvalidOid, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::leader_pid, LogicalParallelApplyLoop(), logicalrep_worker_attach(), ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, ParallelApplyWorkerShared::mutex, MyBgworkerEntry, MyLogicalRepWorker, MyParallelShared, MyProc, MySubscription, NAMEDATALEN, Subscription::oid, ReplOriginXactState::origin, pa_shutdown(), PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PG_LOGICAL_APPLY_SHM_MAGIC, PointerGetDatum(), pq_redirect_to_shm_mq(), pq_set_parallel_leader(), pqsignal, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_session_setup(), replorigin_xact_state, LogicalRepWorker::reply_time, set_apply_error_context_origin(), shm_mq_attach(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_attach(), shm_toc_lookup(), SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGUSR2, SpinLockAcquire(), SpinLockRelease(), and StartTransactionCommand().

◆ ProcessParallelApplyInterrupts()

static void ProcessParallelApplyInterrupts ( void  )
static

Definition at line 716 of file applyparallelworker.c.

717{
719
721 {
722 ereport(LOG,
723 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
725
726 proc_exit(0);
727 }
728
730 {
731 ConfigReloadPending = false;
733 }
734}
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
volatile sig_atomic_t ShutdownRequestPending
Definition interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void proc_exit(int code)
Definition ipc.c:105

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, ereport, errmsg, LOG, MySubscription, Subscription::name, PGC_SIGHUP, proc_exit(), ProcessConfigFile(), and ShutdownRequestPending.

Referenced by LogicalParallelApplyLoop().

◆ ProcessParallelApplyMessage()

static void ProcessParallelApplyMessage ( StringInfo  msg)
static

Definition at line 1011 of file applyparallelworker.c.

1012{
1013 char msgtype;
1014
1015 msgtype = pq_getmsgbyte(msg);
1016
1017 switch (msgtype)
1018 {
1020 {
1022
1023 /* Parse ErrorResponse. */
1025
1026 /*
1027 * If desired, add a context line to show that this is a
1028 * message propagated from a parallel apply worker. Otherwise,
1029 * it can sometimes be confusing to understand what actually
1030 * happened.
1031 */
1032 if (edata.context)
1033 edata.context = psprintf("%s\n%s", edata.context,
1034 _("logical replication parallel apply worker"));
1035 else
1036 edata.context = pstrdup(_("logical replication parallel apply worker"));
1037
1038 /*
1039 * Context beyond that should use the error context callbacks
1040 * that were in effect in LogicalRepApplyLoop().
1041 */
1043
1044 /*
1045 * The actual error must have been reported by the parallel
1046 * apply worker.
1047 */
1048 ereport(ERROR,
1050 errmsg("logical replication parallel apply worker exited due to error"),
1051 errcontext("%s", edata.context)));
1052 }
1053
1054 /*
1055 * Don't need to do anything about NoticeResponse and
1056 * NotificationResponse as the logical replication worker doesn't
1057 * need to send messages to the client.
1058 */
1061 break;
1062
1063 default:
1064 elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1065 msgtype, msg->len);
1066 }
1067}
ErrorContextCallback * apply_error_context_stack
Definition worker.c:473
#define _(x)
Definition elog.c:95
#define errcontext
Definition elog.h:198
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
Definition pqmq.c:224
#define PqMsg_NotificationResponse
Definition protocol.h:41
#define PqMsg_ErrorResponse
Definition protocol.h:44
#define PqMsg_NoticeResponse
Definition protocol.h:49
char * psprintf(const char *fmt,...)
Definition psprintf.c:43

References _, apply_error_context_stack, elog, ereport, errcode(), errcontext, errmsg, ERROR, error_context_stack, fb(), StringInfoData::len, pq_getmsgbyte(), pq_parse_errornotice(), PqMsg_ErrorResponse, PqMsg_NoticeResponse, PqMsg_NotificationResponse, psprintf(), and pstrdup().

Referenced by ProcessParallelApplyMessages().

◆ ProcessParallelApplyMessages()

void ProcessParallelApplyMessages ( void  )

Definition at line 1073 of file applyparallelworker.c.

1074{
1075 ListCell *lc;
1076 MemoryContext oldcontext;
1077
1079
1080 /*
1081 * This is invoked from ProcessInterrupts(), and since some of the
1082 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1083 * for recursive calls if more signals are received while this runs. It's
1084 * unclear that recursive entry would be safe, and it doesn't seem useful
1085 * even if it is safe, so let's block interrupts until done.
1086 */
1088
1089 /*
1090 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1091 * don't want to risk leaking data into long-lived contexts, so let's do
1092 * our work here in a private context that we can reset on each use.
1093 */
1094 if (!hpam_context) /* first time through? */
1096 "ProcessParallelApplyMessages",
1098 else
1100
1101 oldcontext = MemoryContextSwitchTo(hpam_context);
1102
1104
1105 foreach(lc, ParallelApplyWorkerPool)
1106 {
1107 shm_mq_result res;
1108 Size nbytes;
1109 void *data;
1111
1112 /*
1113 * The leader will detach from the error queue and set it to NULL
1114 * before preparing to stop all parallel apply workers, so we don't
1115 * need to handle error messages anymore. See
1116 * logicalrep_worker_detach.
1117 */
1118 if (!winfo->error_mq_handle)
1119 continue;
1120
1121 res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
1122
1123 if (res == SHM_MQ_WOULD_BLOCK)
1124 continue;
1125 else if (res == SHM_MQ_SUCCESS)
1126 {
1127 StringInfoData msg;
1128
1129 initStringInfo(&msg);
1130 appendBinaryStringInfo(&msg, data, nbytes);
1132 pfree(msg.data);
1133 }
1134 else
1135 ereport(ERROR,
1137 errmsg("lost connection to the logical replication parallel apply worker")));
1138 }
1139
1140 MemoryContextSwitchTo(oldcontext);
1141
1142 /* Might as well clear the context on our way out */
1144
1146}
static void ProcessParallelApplyMessage(StringInfo msg)
MemoryContext TopMemoryContext
Definition mcxt.c:166
#define RESUME_INTERRUPTS()
Definition miscadmin.h:136
#define HOLD_INTERRUPTS()
Definition miscadmin.h:134
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition stringinfo.c:281
void initStringInfo(StringInfo str)
Definition stringinfo.c:97

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), StringInfoData::data, data, ereport, errcode(), errmsg, ERROR, ParallelApplyWorkerInfo::error_mq_handle, fb(), HOLD_INTERRUPTS, initStringInfo(), lfirst, MemoryContextReset(), MemoryContextSwitchTo(), ParallelApplyMessagePending, ParallelApplyWorkerPool, pfree(), ProcessParallelApplyMessage(), RESUME_INTERRUPTS, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.

Referenced by ProcessInterrupts().

Variable Documentation

◆ MyParallelShared

◆ ParallelApplyMessagePending

◆ ParallelApplyTxnHash

HTAB* ParallelApplyTxnHash = NULL
static

Definition at line 228 of file applyparallelworker.c.

Referenced by pa_allocate_worker(), pa_find_worker(), and pa_free_worker().

◆ ParallelApplyWorkerPool

◆ stream_apply_worker

ParallelApplyWorkerInfo* stream_apply_worker = NULL
static

Definition at line 255 of file applyparallelworker.c.

Referenced by pa_find_worker(), and pa_set_stream_apply_worker().

◆ subxactlist

List* subxactlist = NIL
static

Definition at line 258 of file applyparallelworker.c.

Referenced by pa_reset_subtrans(), pa_start_subtrans(), and pa_stream_abort().