175 #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
182 #define PARALLEL_APPLY_KEY_SHARED 1
183 #define PARALLEL_APPLY_KEY_MQ 2
184 #define PARALLEL_APPLY_KEY_ERROR_QUEUE 3
187 #define DSM_QUEUE_SIZE (16 * 1024 * 1024)
195 #define DSM_ERROR_QUEUE_SIZE (16 * 1024)
203 #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
209 #define PARALLEL_APPLY_LOCK_STREAM 0
210 #define PARALLEL_APPLY_LOCK_XACT 1
511 entry->
winfo = winfo;
719 (
errmsg(
"logical replication parallel apply worker for subscription \"%s\" has finished",
745 "ApplyMessageContext",
810 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
821 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
822 errmsg(
"lost connection to the logical replication apply worker")));
889 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
890 errmsg(
"could not map dynamic shared memory segment")));
895 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
896 errmsg(
"invalid magic number in dynamic shared memory segment")));
949 originname,
sizeof(originname));
1024 _(
"logical replication parallel apply worker"));
1026 edata.
context =
pstrdup(
_(
"logical replication parallel apply worker"));
1039 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1040 errmsg(
"logical replication parallel apply worker exited due to error"),
1054 elog(
ERROR,
"unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1086 "HandleParallelApplyMessages",
1126 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1127 errmsg(
"lost connection to the logical replication parallel apply worker")));
1168 #define SHM_SEND_RETRY_INTERVAL_MS 1000
1169 #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1179 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1180 errmsg(
"could not send data to shared-memory queue")));
1188 WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
1215 (
errmsg(
"logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1260 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
1299 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1300 errmsg(
"lost connection to the logical replication parallel apply worker")));
1350 snprintf(spname, szsp,
"pg_sp_%u_%u", suboid, xid);
1364 if (current_xid != top_xid &&
1371 spname,
sizeof(spname));
1373 elog(
DEBUG1,
"defining savepoint %s in logical replication parallel apply worker", spname);
1468 elog(
DEBUG1,
"rolling back to savepoint %s in logical replication parallel apply worker", spname);
1481 if (xid_tmp == subxid)
1528 return fileset_state;
1604 elog(
ERROR,
"invalid pending streaming chunk 0");
struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
static ParallelApplyWorkerInfo * stream_apply_worker
static List * ParallelApplyWorkerPool
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
#define DSM_ERROR_QUEUE_SIZE
volatile sig_atomic_t ParallelApplyMessagePending
static bool pa_can_start(void)
void HandleParallelApplyMessageInterrupt(void)
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
#define SHM_SEND_TIMEOUT_MS
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
static void ProcessParallelApplyInterrupts(void)
static PartialFileSetState pa_get_fileset_state(void)
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
#define PARALLEL_APPLY_LOCK_XACT
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
static List * subxactlist
static bool pa_has_spooled_message_pending()
static void pa_shutdown(int code, Datum arg)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)
#define PARALLEL_APPLY_KEY_SHARED
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
void pa_detach_all_error_mq(void)
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
#define PARALLEL_APPLY_KEY_MQ
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
#define SIZE_STATS_MESSAGE
#define SHM_SEND_RETRY_INTERVAL_MS
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
static bool pa_process_spooled_messages_if_required(void)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static HTAB * ParallelApplyTxnHash
#define PARALLEL_APPLY_LOCK_STREAM
static void HandleParallelApplyMessage(StringInfo msg)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void ParallelApplyWorkerMain(Datum main_arg)
#define PG_LOGICAL_APPLY_SHM_MAGIC
void pa_decr_and_wait_stream_block(void)
void HandleParallelApplyMessages(void)
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
void stream_cleanup_files(Oid subid, TransactionId xid)
MemoryContext ApplyMessageContext
bool InitializingApplyWorker
void apply_dispatch(StringInfo s)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
ErrorContextCallback * apply_error_context_stack
void stream_start_internal(TransactionId xid, bool first_segment)
void set_apply_error_context_origin(char *originname)
MemoryContext ApplyContext
void apply_error_callback(void *arg)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
void maybe_reread_subscription(void)
void InitializeLogRepWorker(void)
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Subscription * MySubscription
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
void pgstat_report_activity(BackendState state, const char *cmd_str)
void BackgroundWorkerUnblockSignals(void)
#define Assert(condition)
#define MemSet(start, val, len)
static void PGresult * res
dsm_handle dsm_segment_handle(dsm_segment *seg)
void * dsm_segment_address(dsm_segment *seg)
void dsm_detach(dsm_segment *seg)
dsm_segment * dsm_attach(dsm_handle h)
dsm_segment * dsm_create(Size size, int flags)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
ErrorContextCallback * error_context_stack
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
volatile sig_atomic_t InterruptPending
void ProcessConfigFile(GucContext context)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
void logicalrep_worker_attach(int slot)
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
LogicalRepWorker * MyLogicalRepWorker
int max_parallel_apply_workers_per_subscription
List * list_delete_ptr(List *list, void *datum)
List * list_truncate(List *list, int new_size)
List * lappend_xid(List *list, TransactionId datum)
bool list_member_xid(const List *list, TransactionId datum)
List * lappend(List *list, void *datum)
void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
#define AccessExclusiveLock
void MemoryContextReset(MemoryContext context)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext TopMemoryContext
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
TimestampTz replorigin_session_origin_timestamp
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_session_origin
void replorigin_session_setup(RepOriginId node, int acquired_by)
XLogRecPtr replorigin_session_origin_lsn
static int list_length(const List *l)
static ListCell * list_nth_cell(const List *list, int n)
pqsigfunc pqsignal(int signo, pqsigfunc func)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static int32 DatumGetInt32(Datum X)
BackgroundWorker * MyBgworkerEntry
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
#define INVALID_PROC_NUMBER
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
@ PROCSIG_PARALLEL_APPLY_MESSAGE
char * psprintf(const char *fmt,...)
MemoryContextSwitchTo(old_ctx)
int debug_logical_replication_streaming
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_detach(shm_mq_handle *mqh)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
shm_toc * shm_toc_attach(uint64 magic, void *address)
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
Size shm_toc_estimate(shm_toc_estimator *e)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_initialize_estimator(e)
#define shm_toc_estimate_keys(e, cnt)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void initStringInfo(StringInfo str)
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
char bgw_extra[BGW_EXTRALEN]
struct ErrorContextCallback * previous
void(* callback)(void *arg)
TimestampTz last_recv_time
TimestampTz last_send_time
ParallelApplyWorkerInfo * winfo
shm_mq_handle * error_mq_handle
shm_mq_handle * mq_handle
ParallelApplyWorkerShared * shared
int logicalrep_worker_slot_no
pg_atomic_uint32 pending_stream_count
PartialFileSetState fileset_state
uint16 logicalrep_worker_generation
ParallelTransState xact_state
XLogRecPtr last_commit_end
bool AllTablesyncsReady(void)
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
#define TransactionIdIsValid(xid)
@ PARALLEL_TRANS_FINISHED
static bool am_parallel_apply_worker(void)
@ WORKERTYPE_PARALLEL_APPLY
@ FS_SERIALIZE_IN_PROGRESS
static bool am_leader_apply_worker(void)
void DefineSavepoint(const char *name)
bool IsTransactionState(void)
void StartTransactionCommand(void)
bool IsTransactionBlock(void)
void BeginTransactionBlock(void)
void CommitTransactionCommand(void)
void RollbackToSavepoint(const char *name)
bool EndTransactionBlock(bool chain)
void AbortCurrentTransaction(void)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr