PostgreSQL Source Code git master
Loading...
Searching...
No Matches
worker_internal.h File Reference
Include dependency graph for worker_internal.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepWorker
 
struct  ParallelApplyWorkerShared
 
struct  ParallelApplyWorkerInfo
 

Macros

#define isParallelApplyWorker(worker)
 
#define isTableSyncWorker(worker)
 
#define isSequenceSyncWorker(worker)
 

Typedefs

typedef enum LogicalRepWorkerType LogicalRepWorkerType
 
typedef struct LogicalRepWorker LogicalRepWorker
 
typedef enum ParallelTransState ParallelTransState
 
typedef enum PartialFileSetState PartialFileSetState
 
typedef struct ParallelApplyWorkerShared ParallelApplyWorkerShared
 
typedef struct ParallelApplyWorkerInfo ParallelApplyWorkerInfo
 

Enumerations

enum  LogicalRepWorkerType {
  WORKERTYPE_UNKNOWN = 0 , WORKERTYPE_TABLESYNC , WORKERTYPE_SEQUENCESYNC , WORKERTYPE_APPLY ,
  WORKERTYPE_PARALLEL_APPLY
}
 
enum  ParallelTransState { PARALLEL_TRANS_UNKNOWN , PARALLEL_TRANS_STARTED , PARALLEL_TRANS_FINISHED }
 
enum  PartialFileSetState { FS_EMPTY , FS_SERIALIZE_IN_PROGRESS , FS_SERIALIZE_DONE , FS_READY }
 

Functions

void logicalrep_worker_attach (int slot)
 
LogicalRepWorkerlogicalrep_worker_find (LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
 
Listlogicalrep_workers_find (Oid subid, bool only_running, bool acquire_lock)
 
bool logicalrep_worker_launch (LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
 
void logicalrep_worker_stop (LogicalRepWorkerType wtype, Oid subid, Oid relid)
 
void logicalrep_pa_worker_stop (ParallelApplyWorkerInfo *winfo)
 
void logicalrep_worker_wakeup (LogicalRepWorkerType wtype, Oid subid, Oid relid)
 
void logicalrep_worker_wakeup_ptr (LogicalRepWorker *worker)
 
void logicalrep_reset_seqsync_start_time (void)
 
int logicalrep_sync_worker_count (Oid subid)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
bool AllTablesyncsReady (void)
 
bool HasSubscriptionTablesCached (void)
 
void UpdateTwoPhaseState (Oid suboid, char new_state)
 
void ProcessSyncingTablesForSync (XLogRecPtr current_lsn)
 
void ProcessSyncingTablesForApply (XLogRecPtr current_lsn)
 
void ProcessSequencesForSync (void)
 
pg_noreturn void FinishSyncWorker (void)
 
void InvalidateSyncingRelStates (Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
 
void launch_sync_worker (LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
 
void ProcessSyncingRelations (XLogRecPtr current_lsn)
 
void FetchRelationStates (bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
 
void stream_start_internal (TransactionId xid, bool first_segment)
 
void stream_stop_internal (TransactionId xid)
 
void apply_spooled_messages (FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
 
void apply_dispatch (StringInfo s)
 
void maybe_reread_subscription (void)
 
void stream_cleanup_files (Oid subid, TransactionId xid)
 
void set_stream_options (WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
 
void start_apply (XLogRecPtr origin_startpos)
 
void InitializeLogRepWorker (void)
 
void SetupApplyOrSyncWorker (int worker_slot)
 
void DisableSubscriptionAndExit (void)
 
void store_flush_position (XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 
void apply_error_callback (void *arg)
 
void set_apply_error_context_origin (char *originname)
 
void pa_allocate_worker (TransactionId xid)
 
ParallelApplyWorkerInfopa_find_worker (TransactionId xid)
 
void pa_detach_all_error_mq (void)
 
bool pa_send_data (ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 
void pa_switch_to_partial_serialize (ParallelApplyWorkerInfo *winfo, bool stream_locked)
 
void pa_set_xact_state (ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
 
void pa_set_stream_apply_worker (ParallelApplyWorkerInfo *winfo)
 
void pa_start_subtrans (TransactionId current_xid, TransactionId top_xid)
 
void pa_reset_subtrans (void)
 
void pa_stream_abort (LogicalRepStreamAbortData *abort_data)
 
void pa_set_fileset_state (ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
 
void pa_lock_stream (TransactionId xid, LOCKMODE lockmode)
 
void pa_unlock_stream (TransactionId xid, LOCKMODE lockmode)
 
void pa_lock_transaction (TransactionId xid, LOCKMODE lockmode)
 
void pa_unlock_transaction (TransactionId xid, LOCKMODE lockmode)
 
void pa_decr_and_wait_stream_block (void)
 
void pa_xact_finish (ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 
static bool am_tablesync_worker (void)
 
static bool am_sequencesync_worker (void)
 
static bool am_leader_apply_worker (void)
 
static bool am_parallel_apply_worker (void)
 
static LogicalRepWorkerType get_logical_worker_type (void)
 

Variables

PGDLLIMPORT MemoryContext ApplyContext
 
PGDLLIMPORT MemoryContext ApplyMessageContext
 
PGDLLIMPORT ErrorContextCallbackapply_error_context_stack
 
PGDLLIMPORT ParallelApplyWorkerSharedMyParallelShared
 
PGDLLIMPORT struct WalReceiverConnLogRepWorkerWalRcvConn
 
PGDLLIMPORT SubscriptionMySubscription
 
PGDLLIMPORT LogicalRepWorkerMyLogicalRepWorker
 
PGDLLIMPORT bool in_remote_transaction
 
PGDLLIMPORT bool InitializingApplyWorker
 
PGDLLIMPORT Listtable_states_not_ready
 

Macro Definition Documentation

◆ isParallelApplyWorker

#define isParallelApplyWorker (   worker)
Value:
((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
@ WORKERTYPE_PARALLEL_APPLY

Definition at line 362 of file worker_internal.h.

371{
373}
374
375static inline bool
377{
379}
380
381static inline bool
383{
386}
387
388static inline bool
390{
393}
394
395static inline LogicalRepWorkerType
397{
399 return MyLogicalRepWorker->type;
400}
401
402#endif /* WORKER_INTERNAL_H */
#define Assert(condition)
Definition c.h:943
LogicalRepWorkerType type
#define isParallelApplyWorker(worker)
static bool am_parallel_apply_worker(void)
PGDLLIMPORT LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:58
#define isSequenceSyncWorker(worker)
LogicalRepWorkerType
@ WORKERTYPE_APPLY
static bool am_sequencesync_worker(void)
static LogicalRepWorkerType get_logical_worker_type(void)
static bool am_leader_apply_worker(void)
#define isTableSyncWorker(worker)

◆ isSequenceSyncWorker

#define isSequenceSyncWorker (   worker)
Value:
((worker)->in_use && \
(worker)->type == WORKERTYPE_SEQUENCESYNC)
@ WORKERTYPE_SEQUENCESYNC

Definition at line 366 of file worker_internal.h.

◆ isTableSyncWorker

#define isTableSyncWorker (   worker)
Value:
((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
@ WORKERTYPE_TABLESYNC

Definition at line 364 of file worker_internal.h.

Typedef Documentation

◆ LogicalRepWorker

◆ LogicalRepWorkerType

◆ ParallelApplyWorkerInfo

◆ ParallelApplyWorkerShared

◆ ParallelTransState

◆ PartialFileSetState

Enumeration Type Documentation

◆ LogicalRepWorkerType

Enumerator
WORKERTYPE_UNKNOWN 
WORKERTYPE_TABLESYNC 
WORKERTYPE_SEQUENCESYNC 
WORKERTYPE_APPLY 
WORKERTYPE_PARALLEL_APPLY 

Definition at line 28 of file worker_internal.h.

◆ ParallelTransState

Enumerator
PARALLEL_TRANS_UNKNOWN 
PARALLEL_TRANS_STARTED 
PARALLEL_TRANS_FINISHED 

Definition at line 119 of file worker_internal.h.

120{
ParallelTransState
@ PARALLEL_TRANS_UNKNOWN
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED

◆ PartialFileSetState

Enumerator
FS_EMPTY 
FS_SERIALIZE_IN_PROGRESS 
FS_SERIALIZE_DONE 
FS_READY 

Definition at line 142 of file worker_internal.h.

143{
144 FS_EMPTY,
147 FS_READY,
PartialFileSetState
@ FS_EMPTY
@ FS_SERIALIZE_DONE
@ FS_READY
@ FS_SERIALIZE_IN_PROGRESS

Function Documentation

◆ AllTablesyncsReady()

bool AllTablesyncsReady ( void  )
extern

Definition at line 1629 of file tablesync.c.

1630{
1631 bool started_tx;
1632 bool has_tables;
1633
1634 /* We need up-to-date sync state info for subscription tables here. */
1636
1637 if (started_tx)
1638 {
1640 pgstat_report_stat(true);
1641 }
1642
1643 /*
1644 * Return false when there are no tables in subscription or not all tables
1645 * are in ready state; true otherwise.
1646 */
1647 return has_tables && (table_states_not_ready == NIL);
1648}
#define NIL
Definition pg_list.h:68
long pgstat_report_stat(bool force)
Definition pgstat.c:722
static int fb(int x)
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
Definition syncutils.c:203
List * table_states_not_ready
Definition tablesync.c:127
void CommitTransactionCommand(void)
Definition xact.c:3207

References CommitTransactionCommand(), fb(), FetchRelationStates(), NIL, pgstat_report_stat(), and table_states_not_ready.

Referenced by pa_can_start(), ProcessSyncingTablesForApply(), run_apply_worker(), and wait_for_local_flush().

◆ am_leader_apply_worker()

◆ am_parallel_apply_worker()

◆ am_sequencesync_worker()

static bool am_sequencesync_worker ( void  )
inlinestatic

◆ am_tablesync_worker()

◆ apply_dispatch()

void apply_dispatch ( StringInfo  s)
extern

Definition at line 3782 of file worker.c.

3783{
3786
3787 /*
3788 * Set the current command being applied. Since this function can be
3789 * called recursively when applying spooled changes, save the current
3790 * command.
3791 */
3794
3795 switch (action)
3796 {
3799 break;
3800
3803 break;
3804
3807 break;
3808
3811 break;
3812
3815 break;
3816
3819 break;
3820
3823 break;
3824
3827 break;
3828
3831 break;
3832
3834
3835 /*
3836 * Logical replication does not use generic logical messages yet.
3837 * Although, it could be used by other applications that use this
3838 * output plugin.
3839 */
3840 break;
3841
3844 break;
3845
3848 break;
3849
3852 break;
3853
3856 break;
3857
3860 break;
3861
3864 break;
3865
3868 break;
3869
3872 break;
3873
3876 break;
3877
3878 default:
3879 ereport(ERROR,
3881 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3882 }
3883
3884 /* Reset the current command */
3886}
static void apply_handle_stream_prepare(StringInfo s)
Definition worker.c:1525
static void apply_handle_type(StringInfo s)
Definition worker.c:2593
static void apply_handle_truncate(StringInfo s)
Definition worker.c:3654
static void apply_handle_update(StringInfo s)
Definition worker.c:2797
static void apply_handle_stream_commit(StringInfo s)
Definition worker.c:2397
static void apply_handle_commit_prepared(StringInfo s)
Definition worker.c:1412
static ApplyErrorCallbackArg apply_error_callback_arg
Definition worker.c:464
static void apply_handle_delete(StringInfo s)
Definition worker.c:3019
static void apply_handle_begin(StringInfo s)
Definition worker.c:1218
static void apply_handle_commit(StringInfo s)
Definition worker.c:1243
static void apply_handle_stream_abort(StringInfo s)
Definition worker.c:2078
static void apply_handle_relation(StringInfo s)
Definition worker.c:2570
static void apply_handle_prepare(StringInfo s)
Definition worker.c:1338
static void apply_handle_rollback_prepared(StringInfo s)
Definition worker.c:1464
static void apply_handle_stream_stop(StringInfo s)
Definition worker.c:1892
static void apply_handle_origin(StringInfo s)
Definition worker.c:1673
static void apply_handle_begin_prepare(StringInfo s)
Definition worker.c:1272
static void apply_handle_stream_start(StringInfo s)
Definition worker.c:1732
static void apply_handle_insert(StringInfo s)
Definition worker.c:2640
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:40
#define ereport(elevel,...)
Definition elog.h:152
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
LogicalRepMsgType
@ LOGICAL_REP_MSG_INSERT
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_BEGIN
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_COMMIT
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_TYPE
@ LOGICAL_REP_MSG_DELETE
@ LOGICAL_REP_MSG_STREAM_COMMIT
@ LOGICAL_REP_MSG_ORIGIN
@ LOGICAL_REP_MSG_UPDATE
static char * errmsg
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
LogicalRepMsgType command
Definition worker.c:330

References apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, ERROR, fb(), LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_error_callback()

void apply_error_callback ( void arg)
extern

Definition at line 6222 of file worker.c.

6223{
6225
6227 return;
6228
6229 Assert(errarg->origin_name);
6230
6231 if (errarg->rel == NULL)
6232 {
6233 if (!TransactionIdIsValid(errarg->remote_xid))
6234 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6235 errarg->origin_name,
6237 else if (!XLogRecPtrIsValid(errarg->finish_lsn))
6238 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6239 errarg->origin_name,
6241 errarg->remote_xid);
6242 else
6243 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6244 errarg->origin_name,
6246 errarg->remote_xid,
6247 LSN_FORMAT_ARGS(errarg->finish_lsn));
6248 }
6249 else
6250 {
6251 if (errarg->remote_attnum < 0)
6252 {
6253 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6254 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6255 errarg->origin_name,
6257 errarg->rel->remoterel.nspname,
6258 errarg->rel->remoterel.relname,
6259 errarg->remote_xid);
6260 else
6261 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6262 errarg->origin_name,
6264 errarg->rel->remoterel.nspname,
6265 errarg->rel->remoterel.relname,
6266 errarg->remote_xid,
6267 LSN_FORMAT_ARGS(errarg->finish_lsn));
6268 }
6269 else
6270 {
6271 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6272 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6273 errarg->origin_name,
6275 errarg->rel->remoterel.nspname,
6276 errarg->rel->remoterel.relname,
6277 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6278 errarg->remote_xid);
6279 else
6280 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6281 errarg->origin_name,
6283 errarg->rel->remoterel.nspname,
6284 errarg->rel->remoterel.relname,
6285 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6286 errarg->remote_xid,
6287 LSN_FORMAT_ARGS(errarg->finish_lsn));
6288 }
6289 }
6290}
#define errcontext
Definition elog.h:200
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition proto.c:1212
#define TransactionIdIsValid(xid)
Definition transam.h:41
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47

References apply_error_callback_arg, Assert, ApplyErrorCallbackArg::command, errcontext, fb(), logicalrep_message_type(), LSN_FORMAT_ARGS, TransactionIdIsValid, and XLogRecPtrIsValid.

Referenced by LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_spooled_messages()

void apply_spooled_messages ( FileSet stream_fileset,
TransactionId  xid,
XLogRecPtr  lsn 
)
extern

Definition at line 2267 of file worker.c.

2269{
2270 int nchanges;
2271 char path[MAXPGPATH];
2272 char *buffer = NULL;
2274 ResourceOwner oldowner;
2275 int fileno;
2276 pgoff_t offset;
2277
2280
2281 /* Make sure we have an open transaction */
2283
2284 /*
2285 * Allocate file handle and memory required to process all the messages in
2286 * TopTransactionContext to avoid them getting reset after each message is
2287 * processed.
2288 */
2290
2291 /* Open the spool file for the committed/prepared transaction */
2293 elog(DEBUG1, "replaying changes from file \"%s\"", path);
2294
2295 /*
2296 * Make sure the file is owned by the toplevel transaction so that the
2297 * file will not be accidentally closed when aborting a subtransaction.
2298 */
2299 oldowner = CurrentResourceOwner;
2301
2302 stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2303
2304 CurrentResourceOwner = oldowner;
2305
2306 buffer = palloc(BLCKSZ);
2307
2309
2310 remote_final_lsn = lsn;
2311
2312 /*
2313 * Make sure the handle apply_dispatch methods are aware we're in a remote
2314 * transaction.
2315 */
2316 in_remote_transaction = true;
2318
2320
2321 /*
2322 * Read the entries one by one and pass them through the same logic as in
2323 * apply_dispatch.
2324 */
2325 nchanges = 0;
2326 while (true)
2327 {
2329 size_t nbytes;
2330 int len;
2331
2333
2334 /* read length of the on-disk record */
2335 nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2336
2337 /* have we reached end of the file? */
2338 if (nbytes == 0)
2339 break;
2340
2341 /* do we have a correct length? */
2342 if (len <= 0)
2343 elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2344 len, path);
2345
2346 /* make sure we have sufficiently large buffer */
2347 buffer = repalloc(buffer, len);
2348
2349 /* and finally read the data into the buffer */
2350 BufFileReadExact(stream_fd, buffer, len);
2351
2352 BufFileTell(stream_fd, &fileno, &offset);
2353
2354 /* init a stringinfo using the buffer and call apply_dispatch */
2355 initReadOnlyStringInfo(&s2, buffer, len);
2356
2357 /* Ensure we are reading the data into our memory context. */
2359
2361
2363
2365
2366 nchanges++;
2367
2368 /*
2369 * It is possible the file has been closed because we have processed
2370 * the transaction end message like stream_commit in which case that
2371 * must be the last message.
2372 */
2373 if (!stream_fd)
2374 {
2375 ensure_last_message(stream_fileset, xid, fileno, offset);
2376 break;
2377 }
2378
2379 if (nchanges % 1000 == 0)
2380 elog(DEBUG1, "replayed %d changes from file \"%s\"",
2381 nchanges, path);
2382 }
2383
2384 if (stream_fd)
2386
2387 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2388 nchanges, path);
2389
2390 return;
2391}
static void begin_replication_step(void)
Definition worker.c:733
static void end_replication_step(void)
Definition worker.c:756
MemoryContext ApplyMessageContext
Definition worker.c:476
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition worker.c:5410
static BufFile * stream_fd
Definition worker.c:525
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, pgoff_t offset)
Definition worker.c:2235
bool in_remote_transaction
Definition worker.c:489
void apply_dispatch(StringInfo s)
Definition worker.c:3782
static XLogRecPtr remote_final_lsn
Definition worker.c:490
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition worker.c:6084
static void stream_close_file(void)
Definition worker.c:5493
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition buffile.c:292
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition buffile.c:655
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition buffile.c:665
void BufFileTell(BufFile *file, int *fileno, pgoff_t *offset)
Definition buffile.c:833
#define DEBUG1
Definition elog.h:31
#define elog(elevel,...)
Definition elog.h:228
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:58
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
MemoryContext TopTransactionContext
Definition mcxt.c:171
void * repalloc(void *pointer, Size size)
Definition mcxt.c:1632
void * palloc(Size size)
Definition mcxt.c:1387
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
#define MAXPGPATH
const void size_t len
off_t pgoff_t
Definition port.h:421
char * s2
ResourceOwner TopTransactionResourceOwner
Definition resowner.c:175
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition stringinfo.h:157

References am_parallel_apply_worker(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileOpenFileSet(), BufFileReadExact(), BufFileReadMaybeEOF(), BufFileTell(), changes_filename(), CHECK_FOR_INTERRUPTS, CurrentResourceOwner, DEBUG1, elog, end_replication_step(), ensure_last_message(), ERROR, fb(), in_remote_transaction, initReadOnlyStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), remote_final_lsn, repalloc(), s2, STATE_RUNNING, stream_close_file(), stream_fd, LogicalRepWorker::subid, TopTransactionContext, and TopTransactionResourceOwner.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_process_spooled_messages_if_required().

◆ DisableSubscriptionAndExit()

void DisableSubscriptionAndExit ( void  )
extern

Definition at line 6007 of file worker.c.

6008{
6009 /*
6010 * Emit the error message, and recover from the error state to an idle
6011 * state
6012 */
6014
6018
6020
6021 /*
6022 * Report the worker failed during sequence synchronization, table
6023 * synchronization, or apply.
6024 */
6026
6027 /* Disable the subscription */
6029
6030 /*
6031 * Updating pg_subscription might involve TOAST table access, so ensure we
6032 * have a valid snapshot.
6033 */
6035
6039
6040 /* Ensure we remove no-longer-useful entry for worker's start time */
6043
6044 /* Notify the subscription has been disabled and exit */
6045 ereport(LOG,
6046 errmsg("subscription \"%s\" has been disabled because of an error",
6048
6049 /*
6050 * Skip the track_commit_timestamp check when disabling the worker due to
6051 * an error, as verifying commit timestamps is unnecessary in this
6052 * context.
6053 */
6057
6058 proc_exit(0);
6059}
Subscription * MySubscription
Definition worker.c:484
void EmitErrorReport(void)
Definition elog.c:1882
void FlushErrorState(void)
Definition elog.c:2062
#define LOG
Definition elog.h:32
#define WARNING
Definition elog.h:37
void proc_exit(int code)
Definition ipc.c:105
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1155
#define RESUME_INTERRUPTS()
Definition miscadmin.h:138
#define HOLD_INTERRUPTS()
Definition miscadmin.h:136
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid)
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
void StartTransactionCommand(void)
Definition xact.c:3109
void AbortOutOfAnyTransaction(void)
Definition xact.c:4913

References AbortOutOfAnyTransaction(), am_leader_apply_worker(), ApplyLauncherForgetWorkerStartTime(), CheckSubDeadTupleRetention(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg, FlushErrorState(), GetTransactionSnapshot(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), PopActiveSnapshot(), proc_exit(), PushActiveSnapshot(), RESUME_INTERRUPTS, Subscription::retaindeadtuples, Subscription::retentionactive, StartTransactionCommand(), LogicalRepWorker::subid, and WARNING.

Referenced by start_apply(), start_sequence_sync(), and start_table_sync().

◆ FetchRelationStates()

void FetchRelationStates ( bool has_pending_subtables,
bool has_pending_subsequences,
bool started_tx 
)
extern

Definition at line 203 of file syncutils.c.

206{
207 /*
208 * has_subtables and has_subsequences_non_ready are declared as static,
209 * since the same value can be used until the system table is invalidated.
210 */
211 static bool has_subtables = false;
212 static bool has_subsequences_non_ready = false;
213
214 *started_tx = false;
215
217 {
219 List *rstates;
220 SubscriptionRelState *rstate;
221
224
225 /* Clean the old lists. */
228
229 if (!IsTransactionState())
230 {
232 *started_tx = true;
233 }
234
235 /* Fetch tables and sequences that are in non-READY state. */
237 true);
238
239 /* Allocate the tracking info in a permanent memory context. */
242 {
245 else
246 {
248 memcpy(rstate, subrel, sizeof(SubscriptionRelState));
250 rstate);
251 }
252 }
254
255 /*
256 * Does the subscription have tables?
257 *
258 * If there were not-READY tables found then we know it does. But if
259 * table_states_not_ready was empty we still need to check again to
260 * see if there are 0 tables.
261 */
264
265 /*
266 * If the subscription relation cache has been invalidated since we
267 * entered this routine, we still use and return the relations we just
268 * finished constructing, to avoid infinite loops, but we leave the
269 * table states marked as stale so that we'll rebuild it again on next
270 * access. Otherwise, we mark the table states as valid.
271 */
274 }
275
278
281}
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
#define palloc_object(type)
Definition fe_memutils.h:74
List * lappend(List *list, void *datum)
Definition list.c:339
void list_free_deep(List *list)
Definition list.c:1560
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2223
MemoryContext CacheMemoryContext
Definition mcxt.c:169
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
bool HasSubscriptionTables(Oid subid)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
Definition pg_list.h:54
@ SYNC_RELATIONS_STATE_VALID
Definition syncutils.c:41
@ SYNC_RELATIONS_STATE_REBUILD_STARTED
Definition syncutils.c:40
static SyncingRelationsState relation_states_validity
Definition syncutils.c:44
bool IsTransactionState(void)
Definition xact.c:389

References CacheMemoryContext, fb(), foreach_ptr, get_rel_relkind(), GetSubscriptionRelations(), HasSubscriptionTables(), IsTransactionState(), lappend(), list_free_deep(), memcpy(), MemoryContextSwitchTo(), MySubscription, NIL, Subscription::oid, palloc_object, relation_states_validity, StartTransactionCommand(), SYNC_RELATIONS_STATE_REBUILD_STARTED, SYNC_RELATIONS_STATE_VALID, and table_states_not_ready.

Referenced by AllTablesyncsReady(), HasSubscriptionTablesCached(), ProcessSequencesForSync(), and ProcessSyncingTablesForApply().

◆ FinishSyncWorker()

pg_noreturn void FinishSyncWorker ( void  )
extern

Definition at line 50 of file syncutils.c.

51{
53
54 /*
55 * Commit any outstanding transaction. This is the usual case, unless
56 * there was nothing to do for the table.
57 */
59 {
62 }
63
64 /* And flush all writes. */
66
68 {
70 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
72
73 /*
74 * Reset last_seqsync_start_time, so that next time a sequencesync
75 * worker is needed it can be started promptly.
76 */
78 }
79 else
80 {
83 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
87
88 /* Find the leader apply worker and signal it. */
91 }
92
93 /* Stop gracefully */
94 proc_exit(0);
95}
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:733
void logicalrep_reset_seqsync_start_time(void)
Definition launcher.c:882
char * get_rel_name(Oid relid)
Definition lsyscache.c:2148
#define InvalidOid
static bool am_tablesync_worker(void)
XLogRecPtr GetXLogWriteRecPtr(void)
Definition xlog.c:10140
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801

References am_sequencesync_worker(), am_tablesync_worker(), Assert, CommitTransactionCommand(), ereport, errmsg, get_rel_name(), GetXLogWriteRecPtr(), InvalidOid, IsTransactionState(), LOG, logicalrep_reset_seqsync_start_time(), logicalrep_worker_wakeup(), MyLogicalRepWorker, MySubscription, Subscription::name, pgstat_report_stat(), proc_exit(), LogicalRepWorker::relid, StartTransactionCommand(), LogicalRepWorker::subid, WORKERTYPE_APPLY, and XLogFlush().

Referenced by LogicalRepSyncTableStart(), ProcessSyncingTablesForSync(), SequenceSyncWorkerMain(), and TableSyncWorkerMain().

◆ get_logical_worker_type()

static LogicalRepWorkerType get_logical_worker_type ( void  )
inlinestatic

◆ HasSubscriptionTablesCached()

bool HasSubscriptionTablesCached ( void  )
extern

Definition at line 1659 of file tablesync.c.

1660{
1661 bool started_tx;
1662 bool has_tables;
1663
1664 /* We need up-to-date subscription tables info here */
1666
1667 if (started_tx)
1668 {
1670 pgstat_report_stat(true);
1671 }
1672
1673 return has_tables;
1674}

References CommitTransactionCommand(), fb(), FetchRelationStates(), and pgstat_report_stat().

Referenced by wait_for_local_flush().

◆ InitializeLogRepWorker()

void InitializeLogRepWorker ( void  )
extern

Definition at line 5779 of file worker.c.

5780{
5781 /* Run as replica session replication role. */
5782 SetConfigOption("session_replication_role", "replica",
5784
5785 /* Connect to our database. */
5788 0);
5789
5790 /*
5791 * Set always-secure search path, so malicious users can't redirect user
5792 * code (e.g. pg_index.indexprs).
5793 */
5794 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5795
5797 "ApplyContext",
5799
5801
5802 /*
5803 * Lock the subscription to prevent it from being concurrently dropped,
5804 * then re-verify its existence. After the initialization, the worker will
5805 * be terminated gracefully if the subscription is dropped.
5806 */
5809
5811
5812 if (MySubscription)
5813 {
5815 }
5816 else
5817 {
5818 ereport(LOG,
5819 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5821
5822 /* Ensure we remove no-longer-useful entry for worker's start time */
5825
5826 proc_exit(0);
5827 }
5828
5829 MySubscriptionValid = true;
5830
5831 if (!MySubscription->enabled)
5832 {
5833 ereport(LOG,
5834 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5835 MySubscription->name)));
5836
5838 }
5839
5840 /*
5841 * Restart the worker if retain_dead_tuples was enabled during startup.
5842 *
5843 * At this point, the replication slot used for conflict detection might
5844 * not exist yet, or could be dropped soon if the launcher perceives
5845 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5846 * oldest_nonremovable_xid when the slot is absent or at risk of being
5847 * dropped, a restart is initiated.
5848 *
5849 * The oldest_nonremovable_xid should be initialized only when the
5850 * subscription's retention is active before launching the worker. See
5851 * logicalrep_worker_launch.
5852 */
5853 if (am_leader_apply_worker() &&
5857 {
5858 ereport(LOG,
5859 errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5860 MySubscription->name, "retain_dead_tuples"));
5861
5863 }
5864
5865 /* Setup synchronous commit according to the user's wishes */
5866 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5868
5869 /* Change wal_receiver_timeout according to the user's wishes */
5871
5872 /*
5873 * Keep us informed about subscription or role changes. Note that the
5874 * role's superuser privilege can be revoked.
5875 */
5878 (Datum) 0);
5879 /* Changes to foreign servers may affect subscriptions using SERVER. */
5882 (Datum) 0);
5883 /* Changes to user mappings may affect subscriptions using SERVER. */
5886 (Datum) 0);
5887
5888 /*
5889 * Changes to FDW connection_function may affect subscriptions using
5890 * SERVER.
5891 */
5894 (Datum) 0);
5895
5898 (Datum) 0);
5899
5900 if (am_tablesync_worker())
5901 ereport(LOG,
5902 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5905 else if (am_sequencesync_worker())
5906 ereport(LOG,
5907 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
5909 else
5910 ereport(LOG,
5911 errmsg("logical replication apply worker for subscription \"%s\" has started",
5913
5915
5916 /*
5917 * Register a callback to reset the origin state before aborting any
5918 * pending transaction during shutdown (see ShutdownPostgres()). This will
5919 * avoid origin advancement for an incomplete transaction which could
5920 * otherwise lead to its loss as such a transaction won't be sent by the
5921 * server again.
5922 *
5923 * Note that even a LOG or DEBUG statement placed after setting the origin
5924 * state may process a shutdown signal before committing the current apply
5925 * operation. So, it is important to register such a callback here.
5926 *
5927 * Register this callback here to ensure that all types of logical
5928 * replication workers that set up origins and apply remote transactions
5929 * are protected.
5930 */
5932}
static void apply_worker_exit(void)
Definition worker.c:5011
static void subscription_change_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
Definition worker.c:5210
MemoryContext ApplyContext
Definition worker.c:477
static bool MySubscriptionValid
Definition worker.c:485
static void set_wal_receiver_timeout(void)
Definition worker.c:5175
static void on_exit_clear_xact_state(int code, Datum arg)
Definition worker.c:5938
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:909
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition guc.c:4234
@ PGC_S_OVERRIDE
Definition guc.h:123
@ PGC_SUSET
Definition guc.h:78
@ PGC_BACKEND
Definition guc.h:77
void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)
Definition inval.c:1816
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessShareLock
Definition lockdefs.h:36
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition mcxt.c:686
MemoryContext TopMemoryContext
Definition mcxt.c:166
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
Subscription * GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
uint64_t Datum
Definition postgres.h:70
TransactionId oldest_nonremovable_xid
MemoryContext cxt

References AccessShareLock, ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_leader_apply_worker(), am_sequencesync_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), BackgroundWorkerInitializeConnectionByOid(), before_shmem_exit(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::cxt, LogicalRepWorker::dbid, Subscription::enabled, ereport, errmsg, fb(), get_rel_name(), GetSubscription(), LockSharedObject(), LOG, MemoryContextSetParent(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, LogicalRepWorker::oldest_nonremovable_xid, on_exit_clear_xact_state(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, proc_exit(), LogicalRepWorker::relid, Subscription::retaindeadtuples, Subscription::retentionactive, set_wal_receiver_timeout(), SetConfigOption(), StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), Subscription::synccommit, TopMemoryContext, TransactionIdIsValid, and LogicalRepWorker::userid.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ InvalidateSyncingRelStates()

void InvalidateSyncingRelStates ( Datum  arg,
SysCacheIdentifier  cacheid,
uint32  hashvalue 
)
extern

◆ launch_sync_worker()

void launch_sync_worker ( LogicalRepWorkerType  wtype,
int  nsyncworkers,
Oid  relid,
TimestampTz last_start_time 
)
extern

Definition at line 118 of file syncutils.c.

120{
122
125
126 /* If there is a free sync worker slot, start a new sync worker */
128 return;
129
131
132 if (!(*last_start_time) ||
133 TimestampDifferenceExceeds(*last_start_time, now,
135 {
136 /*
137 * Set the last_start_time even if we fail to start the worker, so
138 * that we won't retry until wal_retrieve_retry_interval has elapsed.
139 */
140 *last_start_time = now;
146 relid, DSM_HANDLE_INVALID, false);
147 }
148}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1775
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1639
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1603
#define OidIsValid(objectId)
Definition c.h:858
int64 TimestampTz
Definition timestamp.h:39
#define DSM_HANDLE_INVALID
Definition dsm_impl.h:58
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition launcher.c:334
int max_sync_workers_per_subscription
Definition launcher.c:55
int wal_retrieve_retry_interval
Definition xlog.c:141

References Assert, LogicalRepWorker::dbid, DSM_HANDLE_INVALID, fb(), GetCurrentTimestamp(), logicalrep_worker_launch(), max_sync_workers_per_subscription, MyLogicalRepWorker, MySubscription, Subscription::name, now(), Subscription::oid, OidIsValid, TimestampDifferenceExceeds(), LogicalRepWorker::userid, wal_retrieve_retry_interval, WORKERTYPE_SEQUENCESYNC, and WORKERTYPE_TABLESYNC.

Referenced by ProcessSequencesForSync(), and ProcessSyncingTablesForApply().

◆ logicalrep_pa_worker_stop()

void logicalrep_pa_worker_stop ( ParallelApplyWorkerInfo winfo)
extern

Definition at line 689 of file launcher.c.

690{
691 int slot_no;
692 uint16 generation;
693 LogicalRepWorker *worker;
694
695 SpinLockAcquire(&winfo->shared->mutex);
696 generation = winfo->shared->logicalrep_worker_generation;
698 SpinLockRelease(&winfo->shared->mutex);
699
701
702 /*
703 * Detach from the error_mq_handle for the parallel apply worker before
704 * stopping it. This prevents the leader apply worker from trying to
705 * receive the message from the error queue that might already be detached
706 * by the parallel apply worker.
707 */
708 if (winfo->error_mq_handle)
709 {
711 winfo->error_mq_handle = NULL;
712 }
713
715
716 worker = &LogicalRepCtx->workers[slot_no];
718
719 /*
720 * Only stop the worker if the generation matches and the worker is alive.
721 */
722 if (worker->generation == generation && worker->proc)
724
726}
uint16_t uint16
Definition c.h:623
int max_logical_replication_workers
Definition launcher.c:54
static void logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
Definition launcher.c:579
static LogicalRepCtxStruct * LogicalRepCtx
Definition launcher.c:73
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
void shm_mq_detach(shm_mq_handle *mqh)
Definition shm_mq.c:845
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition launcher.c:70
shm_mq_handle * error_mq_handle
ParallelApplyWorkerShared * shared
#define SIGUSR2
Definition win32_port.h:171

References Assert, ParallelApplyWorkerInfo::error_mq_handle, fb(), LogicalRepWorker::generation, isParallelApplyWorker, ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, logicalrep_worker_stop_internal(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, ParallelApplyWorkerShared::mutex, LogicalRepWorker::proc, ParallelApplyWorkerInfo::shared, shm_mq_detach(), SIGUSR2, SpinLockAcquire(), SpinLockRelease(), and LogicalRepCtxStruct::workers.

Referenced by pa_free_worker().

◆ logicalrep_reset_seqsync_start_time()

void logicalrep_reset_seqsync_start_time ( void  )
extern

Definition at line 882 of file launcher.c.

883{
884 LogicalRepWorker *worker;
885
886 /*
887 * The apply worker can't access last_seqsync_start_time concurrently, so
888 * it is okay to use SHARED lock here. See ProcessSequencesForSync().
889 */
891
894 true);
895 if (worker)
896 worker->last_seqsync_start_time = 0;
897
899}
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition launcher.c:268
TimestampTz last_seqsync_start_time

References fb(), InvalidOid, LogicalRepWorker::last_seqsync_start_time, logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLogicalRepWorker, LogicalRepWorker::subid, and WORKERTYPE_APPLY.

Referenced by FinishSyncWorker().

◆ logicalrep_sync_worker_count()

int logicalrep_sync_worker_count ( Oid  subid)
extern

Definition at line 937 of file launcher.c.

938{
939 int i;
940 int res = 0;
941
943
944 /* Search for attached worker for a given subscription id. */
945 for (i = 0; i < max_logical_replication_workers; i++)
946 {
948
949 if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
950 res++;
951 }
952
953 return res;
954}
int i
Definition isn.c:77
bool LWLockHeldByMe(LWLock *lock)
Definition lwlock.c:1885

References Assert, fb(), i, isSequenceSyncWorker, isTableSyncWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by logicalrep_worker_launch(), ProcessSequencesForSync(), and ProcessSyncingTablesForApply().

◆ logicalrep_worker_attach()

void logicalrep_worker_attach ( int  slot)
extern

Definition at line 767 of file launcher.c.

768{
769 /* Block concurrent access. */
771
772 Assert(slot >= 0 && slot < max_logical_replication_workers);
774
776 {
780 errmsg("logical replication worker slot %d is empty, cannot attach",
781 slot)));
782 }
783
785 {
789 errmsg("logical replication worker slot %d is already used by "
790 "another worker, cannot attach", slot)));
791 }
792
795
797}
static void logicalrep_worker_onexit(int code, Datum arg)
Definition launcher.c:907
@ LW_EXCLUSIVE
Definition lwlock.h:104
PGPROC * MyProc
Definition proc.c:71

References Assert, before_shmem_exit(), ereport, errcode(), errmsg, ERROR, fb(), LogicalRepWorker::in_use, logicalrep_worker_onexit(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyLogicalRepWorker, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ logicalrep_worker_find()

LogicalRepWorker * logicalrep_worker_find ( LogicalRepWorkerType  wtype,
Oid  subid,
Oid  relid,
bool  only_running 
)
extern

Definition at line 268 of file launcher.c.

270{
271 int i;
272 LogicalRepWorker *res = NULL;
273
274 /* relid must be valid only for table sync workers */
277
278 /* Search for an attached worker that matches the specified criteria. */
279 for (i = 0; i < max_logical_replication_workers; i++)
280 {
282
283 /* Skip parallel apply workers. */
285 continue;
286
287 if (w->in_use && w->subid == subid && w->relid == relid &&
288 w->type == wtype && (!only_running || w->proc))
289 {
290 res = w;
291 break;
292 }
293 }
294
295 return res;
296}

References Assert, fb(), i, LogicalRepWorker::in_use, isParallelApplyWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, OidIsValid, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::subid, LogicalRepWorker::type, LogicalRepCtxStruct::workers, and WORKERTYPE_TABLESYNC.

Referenced by ApplyLauncherMain(), FindDeletedTupleInLocalRel(), logicalrep_reset_seqsync_start_time(), logicalrep_worker_stop(), logicalrep_worker_wakeup(), ProcessSequencesForSync(), ProcessSyncingTablesForApply(), wait_for_table_state_change(), and wait_for_worker_state_change().

◆ logicalrep_worker_launch()

bool logicalrep_worker_launch ( LogicalRepWorkerType  wtype,
Oid  dbid,
Oid  subid,
const char subname,
Oid  userid,
Oid  relid,
dsm_handle  subworker_dsm,
bool  retain_dead_tuples 
)
extern

Definition at line 334 of file launcher.c.

338{
341 uint16 generation;
342 int i;
343 int slot = 0;
344 LogicalRepWorker *worker = NULL;
345 int nsyncworkers;
351
352 /*----------
353 * Sanity checks:
354 * - must be valid worker type
355 * - tablesync workers are only ones to have relid
356 * - parallel apply worker is the only kind of subworker
357 * - The replication slot used in conflict detection is created when
358 * retain_dead_tuples is enabled
359 */
364
366 (errmsg_internal("starting logical replication worker for subscription \"%s\"",
367 subname)));
368
369 /* Report this after the initial starting message for consistency. */
373 errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
374
375 /*
376 * We need to do the modification of the shared memory under lock so that
377 * we have consistent view.
378 */
380
381retry:
382 /* Find unused worker slot. */
383 for (i = 0; i < max_logical_replication_workers; i++)
384 {
386
387 if (!w->in_use)
388 {
389 worker = w;
390 slot = i;
391 break;
392 }
393 }
394
396
398
399 /*
400 * If we didn't find a free slot, try to do garbage collection. The
401 * reason we do this is because if some worker failed to start up and its
402 * parent has crashed while waiting, the in_use state was never cleared.
403 */
405 {
406 bool did_cleanup = false;
407
408 for (i = 0; i < max_logical_replication_workers; i++)
409 {
411
412 /*
413 * If the worker was marked in use but didn't manage to attach in
414 * time, clean it up.
415 */
416 if (w->in_use && !w->proc &&
419 {
421 "logical replication worker for subscription %u took too long to start; canceled",
422 w->subid);
423
425 did_cleanup = true;
426 }
427 }
428
429 if (did_cleanup)
430 goto retry;
431 }
432
433 /*
434 * We don't allow to invoke more sync workers once we have reached the
435 * sync worker limit per subscription. So, just return silently as we
436 * might get here because of an otherwise harmless race condition.
437 */
440 {
442 return false;
443 }
444
446
447 /*
448 * Return false if the number of parallel apply workers reached the limit
449 * per subscription.
450 */
453 {
455 return false;
456 }
457
458 /*
459 * However if there are no more free worker slots, inform user about it
460 * before exiting.
461 */
462 if (worker == NULL)
463 {
467 errmsg("out of logical replication worker slots"),
468 errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
469 return false;
470 }
471
472 /* Prepare the worker slot. */
473 worker->type = wtype;
474 worker->launch_time = now;
475 worker->in_use = true;
476 worker->generation++;
477 worker->proc = NULL;
478 worker->dbid = dbid;
479 worker->userid = userid;
480 worker->subid = subid;
481 worker->relid = relid;
484 worker->stream_fileset = NULL;
490 worker->last_lsn = InvalidXLogRecPtr;
495 worker->last_seqsync_start_time = 0;
496
497 /* Before releasing lock, remember generation for future identification. */
498 generation = worker->generation;
499
501
502 /* Register the new dynamic worker. */
503 memset(&bgw, 0, sizeof(bgw));
504 bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
506 bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
507 snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
508
509 switch (worker->type)
510 {
511 case WORKERTYPE_APPLY:
512 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
513 snprintf(bgw.bgw_name, BGW_MAXLEN,
514 "logical replication apply worker for subscription %u",
515 subid);
516 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
517 break;
518
520 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
521 snprintf(bgw.bgw_name, BGW_MAXLEN,
522 "logical replication parallel apply worker for subscription %u",
523 subid);
524 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
525
526 memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
527 break;
528
530 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
531 snprintf(bgw.bgw_name, BGW_MAXLEN,
532 "logical replication sequencesync worker for subscription %u",
533 subid);
534 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
535 break;
536
538 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
539 snprintf(bgw.bgw_name, BGW_MAXLEN,
540 "logical replication tablesync worker for subscription %u sync %u",
541 subid,
542 relid);
543 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
544 break;
545
547 /* Should never happen. */
548 elog(ERROR, "unknown worker type");
549 }
550
551 bgw.bgw_restart_time = BGW_NEVER_RESTART;
552 bgw.bgw_notify_pid = MyProcPid;
553 bgw.bgw_main_arg = Int32GetDatum(slot);
554
556 {
557 /* Failed to start worker, so clean up the worker slot. */
559 Assert(generation == worker->generation);
562
565 errmsg("out of background worker slots"),
566 errhint("You might need to increase \"%s\".", "max_worker_processes")));
567 return false;
568 }
569
570 /* Now wait until it attaches. */
571 return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
572}
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition bgworker.c:1068
#define BGW_NEVER_RESTART
Definition bgworker.h:92
@ BgWorkerStart_RecoveryFinished
Definition bgworker.h:88
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition bgworker.h:60
#define BGWORKER_SHMEM_ACCESS
Definition bgworker.h:53
#define BGW_MAXLEN
Definition bgworker.h:93
#define TIMESTAMP_NOBEGIN(j)
Definition timestamp.h:159
uint32 dsm_handle
Definition dsm_impl.h:55
int errhint(const char *fmt,...) pg_attribute_printf(1
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
int MyProcPid
Definition globals.c:49
static int logicalrep_pa_worker_count(Oid subid)
Definition launcher.c:961
static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
Definition launcher.c:191
int logicalrep_sync_worker_count(Oid subid)
Definition launcher.c:937
int max_parallel_apply_workers_per_subscription
Definition launcher.c:56
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
Definition launcher.c:848
#define InvalidPid
Definition miscadmin.h:32
int max_active_replication_origins
Definition origin.c:106
NameData subname
#define snprintf
Definition port.h:260
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
ReplicationSlot * MyReplicationSlot
Definition slot.c:158
XLogRecPtr relstate_lsn
TimestampTz last_recv_time
TimestampTz launch_time
TimestampTz reply_time
FileSet * stream_fileset
TimestampTz last_send_time
ReplicationSlotPersistentData data
Definition slot.h:213
#define InvalidTransactionId
Definition transam.h:31
int wal_receiver_timeout
Definition walreceiver.c:91
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References Assert, BGW_MAXLEN, BGW_NEVER_RESTART, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, ReplicationSlot::data, LogicalRepWorker::dbid, DEBUG1, DSM_HANDLE_INVALID, elog, ereport, errcode(), errhint(), errmsg, errmsg_internal(), ERROR, fb(), LogicalRepWorker::generation, GetCurrentTimestamp(), i, LogicalRepWorker::in_use, Int32GetDatum(), InvalidPid, InvalidTransactionId, InvalidXLogRecPtr, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::last_seqsync_start_time, LogicalRepWorker::launch_time, LogicalRepWorker::leader_pid, logicalrep_pa_worker_count(), logicalrep_sync_worker_count(), logicalrep_worker_cleanup(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, max_logical_replication_workers, max_parallel_apply_workers_per_subscription, max_sync_workers_per_subscription, MAXPGPATH, memcpy(), MyProcPid, MyReplicationSlot, now(), OidIsValid, LogicalRepWorker::oldest_nonremovable_xid, LogicalRepWorker::parallel_apply, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), LogicalRepWorker::relid, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, snprintf, LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subname, TIMESTAMP_NOBEGIN, TimestampDifferenceExceeds(), LogicalRepWorker::type, LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), wal_receiver_timeout, WARNING, LogicalRepCtxStruct::workers, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_SEQUENCESYNC, WORKERTYPE_TABLESYNC, WORKERTYPE_UNKNOWN, and ReplicationSlotPersistentData::xmin.

Referenced by ApplyLauncherMain(), launch_sync_worker(), and pa_launch_parallel_worker().

◆ logicalrep_worker_stop()

void logicalrep_worker_stop ( LogicalRepWorkerType  wtype,
Oid  subid,
Oid  relid 
)
extern

Definition at line 662 of file launcher.c.

663{
664 LogicalRepWorker *worker;
665
666 /* relid must be valid only for table sync workers */
668
670
671 worker = logicalrep_worker_find(wtype, subid, relid, false);
672
673 if (worker)
674 {
677 }
678
680}

References Assert, fb(), isParallelApplyWorker, logicalrep_worker_find(), logicalrep_worker_stop_internal(), LW_SHARED, LWLockAcquire(), LWLockRelease(), OidIsValid, and WORKERTYPE_TABLESYNC.

Referenced by AlterSubscription_refresh(), and DropSubscription().

◆ logicalrep_worker_wakeup()

void logicalrep_worker_wakeup ( LogicalRepWorkerType  wtype,
Oid  subid,
Oid  relid 
)
extern

Definition at line 733 of file launcher.c.

734{
735 LogicalRepWorker *worker;
736
737 /* relid must be valid only for table sync workers */
739
741
742 worker = logicalrep_worker_find(wtype, subid, relid, true);
743
744 if (worker)
746
748}
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition launcher.c:756

References Assert, fb(), logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), LWLockRelease(), OidIsValid, and WORKERTYPE_TABLESYNC.

Referenced by apply_handle_stream_start(), and FinishSyncWorker().

◆ logicalrep_worker_wakeup_ptr()

void logicalrep_worker_wakeup_ptr ( LogicalRepWorker worker)
extern

◆ logicalrep_workers_find()

List * logicalrep_workers_find ( Oid  subid,
bool  only_running,
bool  acquire_lock 
)
extern

Definition at line 303 of file launcher.c.

304{
305 int i;
306 List *res = NIL;
307
308 if (acquire_lock)
310
312
313 /* Search for attached worker for a given subscription id. */
314 for (i = 0; i < max_logical_replication_workers; i++)
315 {
317
318 if (w->in_use && w->subid == subid && (!only_running || w->proc))
319 res = lappend(res, w);
320 }
321
322 if (acquire_lock)
324
325 return res;
326}

References Assert, fb(), i, LogicalRepWorker::in_use, lappend(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockHeldByMe(), LWLockRelease(), max_logical_replication_workers, NIL, LogicalRepWorker::proc, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by AlterSubscription(), AtEOXact_LogicalRepWorkers(), DropSubscription(), and logicalrep_worker_detach().

◆ maybe_reread_subscription()

void maybe_reread_subscription ( void  )
extern

Definition at line 5045 of file worker.c.

5046{
5048 bool started_tx = false;
5049
5050 /* When cache state is valid there is nothing to do here. */
5052 return;
5053
5054 /* This function might be called inside or outside of transaction. */
5055 if (!IsTransactionState())
5056 {
5058 started_tx = true;
5059 }
5060
5062
5063 if (newsub)
5064 {
5066 }
5067 else
5068 {
5069 /*
5070 * Exit if the subscription was removed. This normally should not
5071 * happen as the worker gets killed during DROP SUBSCRIPTION.
5072 */
5073 ereport(LOG,
5074 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5075 MySubscription->name)));
5076
5077 /* Ensure we remove no-longer-useful entry for worker's start time */
5080
5081 proc_exit(0);
5082 }
5083
5084 /* Exit if the subscription was disabled. */
5085 if (!newsub->enabled)
5086 {
5087 ereport(LOG,
5088 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5089 MySubscription->name)));
5090
5092 }
5093
5094 /* !slotname should never happen when enabled is true. */
5095 Assert(newsub->slotname);
5096
5097 /* two-phase cannot be altered while the worker is running */
5098 Assert(newsub->twophasestate == MySubscription->twophasestate);
5099
5100 /*
5101 * Exit if any parameter that affects the remote connection was changed.
5102 * The launcher will start a new worker but note that the parallel apply
5103 * worker won't restart if the streaming option's value is changed from
5104 * 'parallel' to any other value or the server decides not to stream the
5105 * in-progress transaction.
5106 */
5107 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
5108 strcmp(newsub->name, MySubscription->name) != 0 ||
5109 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
5110 newsub->binary != MySubscription->binary ||
5111 newsub->stream != MySubscription->stream ||
5112 newsub->passwordrequired != MySubscription->passwordrequired ||
5113 strcmp(newsub->origin, MySubscription->origin) != 0 ||
5114 newsub->owner != MySubscription->owner ||
5115 !equal(newsub->publications, MySubscription->publications))
5116 {
5118 ereport(LOG,
5119 (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5120 MySubscription->name)));
5121 else
5122 ereport(LOG,
5123 (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
5124 MySubscription->name)));
5125
5127 }
5128
5129 /*
5130 * Exit if the subscription owner's superuser privileges have been
5131 * revoked.
5132 */
5133 if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
5134 {
5136 ereport(LOG,
5137 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5139 else
5140 ereport(LOG,
5141 errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5143
5145 }
5146
5147 /* Check for other changes that should never happen too. */
5148 if (newsub->dbid != MySubscription->dbid)
5149 {
5150 elog(ERROR, "subscription %u changed unexpectedly",
5152 }
5153
5154 /* Clean old subscription info and switch to new one. */
5157
5158 /* Change synchronous commit according to the user's wishes */
5159 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5161
5162 /* Change wal_receiver_timeout according to the user's wishes */
5164
5165 if (started_tx)
5167
5168 MySubscriptionValid = true;
5169}
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
static color newsub(struct colormap *cm, color co)
Definition regc_color.c:389

References am_leader_apply_worker(), am_parallel_apply_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), Assert, Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::cxt, Subscription::dbid, elog, equal(), ereport, errmsg, ERROR, fb(), GetSubscription(), IsTransactionState(), LOG, MemoryContextDelete(), MemoryContextSetParent(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::origin, Subscription::owner, Subscription::ownersuperuser, Subscription::passwordrequired, PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, set_wal_receiver_timeout(), SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, Subscription::synccommit, and Subscription::twophasestate.

Referenced by apply_handle_commit_internal(), begin_replication_step(), LogicalRepApplyLoop(), and pa_can_start().

◆ pa_allocate_worker()

void pa_allocate_worker ( TransactionId  xid)
extern

Definition at line 474 of file applyparallelworker.c.

475{
476 bool found;
479
480 if (!pa_can_start())
481 return;
482
484 if (!winfo)
485 return;
486
487 /* First time through, initialize parallel apply worker state hashtable. */
489 {
490 HASHCTL ctl;
491
492 MemSet(&ctl, 0, sizeof(ctl));
493 ctl.keysize = sizeof(TransactionId);
494 ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
495 ctl.hcxt = ApplyContext;
496
497 ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
498 16, &ctl,
500 }
501
502 /* Create an entry for the requested transaction. */
503 entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
504 if (found)
505 elog(ERROR, "hash table corrupted");
506
507 /* Update the transaction information in shared memory. */
508 SpinLockAcquire(&winfo->shared->mutex);
510 winfo->shared->xid = xid;
511 SpinLockRelease(&winfo->shared->mutex);
512
513 winfo->in_use = true;
514 winfo->serialize_changes = false;
515 entry->winfo = winfo;
516}
static bool pa_can_start(void)
static HTAB * ParallelApplyTxnHash
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
#define MemSet(start, val, len)
Definition c.h:1107
uint32 TransactionId
Definition c.h:736
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:889
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:360
@ HASH_ENTER
Definition hsearch.h:109
#define HASH_CONTEXT
Definition hsearch.h:97
#define HASH_ELEM
Definition hsearch.h:90
#define HASH_BLOBS
Definition hsearch.h:92
tree ctl
Definition radixtree.h:1838
ParallelApplyWorkerInfo * winfo
ParallelTransState xact_state

References ApplyContext, ctl, elog, ERROR, fb(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ParallelApplyWorkerInfo::in_use, MemSet, ParallelApplyWorkerShared::mutex, pa_can_start(), pa_launch_parallel_worker(), PARALLEL_TRANS_UNKNOWN, ParallelApplyTxnHash, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, SpinLockAcquire(), SpinLockRelease(), ParallelApplyWorkerEntry::winfo, ParallelApplyWorkerShared::xact_state, and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_start().

◆ pa_decr_and_wait_stream_block()

void pa_decr_and_wait_stream_block ( void  )
extern

Definition at line 1601 of file applyparallelworker.c.

1602{
1604
1605 /*
1606 * It is only possible to not have any pending stream chunks when we are
1607 * applying spooled messages.
1608 */
1610 {
1612 return;
1613
1614 elog(ERROR, "invalid pending streaming chunk 0");
1615 }
1616
1618 {
1621 }
1622}
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
static bool pa_has_spooled_message_pending(void)
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition atomics.h:439
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition atomics.h:237
pg_atomic_uint32 pending_stream_count

References AccessShareLock, am_parallel_apply_worker(), Assert, elog, ERROR, MyParallelShared, pa_has_spooled_message_pending(), pa_lock_stream(), pa_unlock_stream(), ParallelApplyWorkerShared::pending_stream_count, pg_atomic_read_u32(), pg_atomic_sub_fetch_u32(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), and apply_handle_stream_stop().

◆ pa_detach_all_error_mq()

void pa_detach_all_error_mq ( void  )
extern

Definition at line 626 of file applyparallelworker.c.

627{
628 ListCell *lc;
629
631 {
633
634 if (winfo->error_mq_handle)
635 {
637 winfo->error_mq_handle = NULL;
638 }
639 }
640}
static List * ParallelApplyWorkerPool
#define lfirst(lc)
Definition pg_list.h:172

References ParallelApplyWorkerInfo::error_mq_handle, fb(), lfirst, ParallelApplyWorkerPool, and shm_mq_detach().

Referenced by logicalrep_worker_detach().

◆ pa_find_worker()

ParallelApplyWorkerInfo * pa_find_worker ( TransactionId  xid)
extern

Definition at line 522 of file applyparallelworker.c.

523{
524 bool found;
526
527 if (!TransactionIdIsValid(xid))
528 return NULL;
529
531 return NULL;
532
533 /* Return the cached parallel apply worker if valid. */
535 return stream_apply_worker;
536
537 /* Find an entry for the requested transaction. */
538 entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
539 if (found)
540 {
541 /* The worker must not have exited. */
542 Assert(entry->winfo->in_use);
543 return entry->winfo;
544 }
545
546 return NULL;
547}
static ParallelApplyWorkerInfo * stream_apply_worker
@ HASH_FIND
Definition hsearch.h:108

References Assert, fb(), HASH_FIND, hash_search(), ParallelApplyWorkerInfo::in_use, ParallelApplyTxnHash, stream_apply_worker, TransactionIdIsValid, and ParallelApplyWorkerEntry::winfo.

Referenced by get_transaction_apply_action().

◆ pa_lock_stream()

void pa_lock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)
extern

◆ pa_lock_transaction()

void pa_lock_transaction ( TransactionId  xid,
LOCKMODE  lockmode 
)
extern

◆ pa_reset_subtrans()

void pa_reset_subtrans ( void  )
extern

Definition at line 1412 of file applyparallelworker.c.

1413{
1414 /*
1415 * We don't need to free this explicitly as the allocated memory will be
1416 * freed at the transaction end.
1417 */
1418 subxactlist = NIL;
1419}
static List * subxactlist

References NIL, and subxactlist.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_stream_abort().

◆ pa_send_data()

bool pa_send_data ( ParallelApplyWorkerInfo winfo,
Size  nbytes,
const void data 
)
extern

Definition at line 1156 of file applyparallelworker.c.

1157{
1158 int rc;
1160 TimestampTz startTime = 0;
1161
1163 Assert(!winfo->serialize_changes);
1164
1165 /*
1166 * We don't try to send data to parallel worker for 'immediate' mode. This
1167 * is primarily used for testing purposes.
1168 */
1170 return false;
1171
1172/*
1173 * This timeout is a bit arbitrary but testing revealed that it is sufficient
1174 * to send the message unless the parallel apply worker is waiting on some
1175 * lock or there is a serious resource crunch. See the comments atop this file
1176 * to know why we are using a non-blocking way to send the message.
1177 */
1178#define SHM_SEND_RETRY_INTERVAL_MS 1000
1179#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1180
1181 for (;;)
1182 {
1183 result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
1184
1185 if (result == SHM_MQ_SUCCESS)
1186 return true;
1187 else if (result == SHM_MQ_DETACHED)
1188 ereport(ERROR,
1190 errmsg("could not send data to shared-memory queue")));
1191
1193
1194 /* Wait before retrying. */
1195 rc = WaitLatch(MyLatch,
1199
1200 if (rc & WL_LATCH_SET)
1201 {
1204 }
1205
1206 if (startTime == 0)
1207 startTime = GetCurrentTimestamp();
1208 else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
1210 return false;
1211 }
1212}
#define SHM_SEND_TIMEOUT_MS
#define SHM_SEND_RETRY_INTERVAL_MS
#define unlikely(x)
Definition c.h:438
uint32 result
struct Latch * MyLatch
Definition globals.c:65
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
const void * data
int debug_logical_replication_streaming
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition shm_mq.c:331
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_WOULD_BLOCK
Definition shm_mq.h:41
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET

References Assert, CHECK_FOR_INTERRUPTS, data, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, ereport, errcode(), errmsg, ERROR, fb(), GetCurrentTimestamp(), IsTransactionState(), ParallelApplyWorkerInfo::mq_handle, MyLatch, ResetLatch(), result, ParallelApplyWorkerInfo::serialize_changes, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_SEND_RETRY_INTERVAL_MS, SHM_SEND_TIMEOUT_MS, TimestampDifferenceExceeds(), unlikely, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ pa_set_fileset_state()

◆ pa_set_stream_apply_worker()

void pa_set_stream_apply_worker ( ParallelApplyWorkerInfo winfo)
extern

Definition at line 1344 of file applyparallelworker.c.

1345{
1346 stream_apply_worker = winfo;
1347}

References stream_apply_worker.

Referenced by apply_handle_stream_start(), and apply_handle_stream_stop().

◆ pa_set_xact_state()

void pa_set_xact_state ( ParallelApplyWorkerShared wshared,
ParallelTransState  xact_state 
)
extern

Definition at line 1317 of file applyparallelworker.c.

1319{
1320 SpinLockAcquire(&wshared->mutex);
1321 wshared->xact_state = xact_state;
1322 SpinLockRelease(&wshared->mutex);
1323}

References fb(), SpinLockAcquire(), and SpinLockRelease().

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), and pa_stream_abort().

◆ pa_start_subtrans()

void pa_start_subtrans ( TransactionId  current_xid,
TransactionId  top_xid 
)
extern

Definition at line 1372 of file applyparallelworker.c.

1373{
1374 if (current_xid != top_xid &&
1376 {
1378 char spname[NAMEDATALEN];
1379
1381 spname, sizeof(spname));
1382
1383 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1384
1385 /* We must be in transaction block to define the SAVEPOINT. */
1386 if (!IsTransactionBlock())
1387 {
1388 if (!IsTransactionState())
1390
1393 }
1394
1396
1397 /*
1398 * CommitTransactionCommand is needed to start a subtransaction after
1399 * issuing a SAVEPOINT inside a transaction block (see
1400 * StartSubTransaction()).
1401 */
1403
1407 }
1408}
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
List * lappend_xid(List *list, TransactionId datum)
Definition list.c:393
bool list_member_xid(const List *list, TransactionId datum)
Definition list.c:742
#define NAMEDATALEN
void DefineSavepoint(const char *name)
Definition xact.c:4424
bool IsTransactionBlock(void)
Definition xact.c:5022
void BeginTransactionBlock(void)
Definition xact.c:3975

References BeginTransactionBlock(), CommitTransactionCommand(), DEBUG1, DefineSavepoint(), elog, fb(), IsTransactionBlock(), IsTransactionState(), lappend_xid(), list_member_xid(), MemoryContextSwitchTo(), MySubscription, NAMEDATALEN, Subscription::oid, pa_savepoint_name(), StartTransactionCommand(), subxactlist, and TopTransactionContext.

Referenced by handle_streamed_transaction().

◆ pa_stream_abort()

void pa_stream_abort ( LogicalRepStreamAbortData abort_data)
extern

Definition at line 1426 of file applyparallelworker.c.

1427{
1428 TransactionId xid = abort_data->xid;
1429 TransactionId subxid = abort_data->subxid;
1430
1431 /*
1432 * Update origin state so we can restart streaming from correct position
1433 * in case of crash.
1434 */
1437
1438 /*
1439 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1440 * just free the subxactlist.
1441 */
1442 if (subxid == xid)
1443 {
1445
1446 /*
1447 * Release the lock as we might be processing an empty streaming
1448 * transaction in which case the lock won't be released during
1449 * transaction rollback.
1450 *
1451 * Note that it's ok to release the transaction lock before aborting
1452 * the transaction because even if the parallel apply worker dies due
1453 * to crash or some other reason, such a transaction would still be
1454 * considered aborted.
1455 */
1457
1459
1460 if (IsTransactionBlock())
1461 {
1462 EndTransactionBlock(false);
1464 }
1465
1467
1469 }
1470 else
1471 {
1472 /* OK, so it's a subxact. Rollback to the savepoint. */
1473 int i;
1474 char spname[NAMEDATALEN];
1475
1476 pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
1477
1478 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1479
1480 /*
1481 * Search the subxactlist, determine the offset tracked for the
1482 * subxact, and truncate the list.
1483 *
1484 * Note that for an empty sub-transaction we won't find the subxid
1485 * here.
1486 */
1487 for (i = list_length(subxactlist) - 1; i >= 0; i--)
1488 {
1490
1491 if (xid_tmp == subxid)
1492 {
1496 break;
1497 }
1498 }
1499 }
1500}
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
@ STATE_IDLE
List * list_truncate(List *list, int new_size)
Definition list.c:631
#define AccessExclusiveLock
Definition lockdefs.h:43
ReplOriginXactState replorigin_xact_state
Definition origin.c:168
static int list_length(const List *l)
Definition pg_list.h:152
static ListCell * list_nth_cell(const List *list, int n)
Definition pg_list.h:309
#define lfirst_xid(lc)
Definition pg_list.h:175
XLogRecPtr origin_lsn
Definition origin.h:46
TimestampTz origin_timestamp
Definition origin.h:47
void RollbackToSavepoint(const char *name)
Definition xact.c:4618
bool EndTransactionBlock(bool chain)
Definition xact.c:4095
void AbortCurrentTransaction(void)
Definition xact.c:3501

References AbortCurrentTransaction(), AccessExclusiveLock, CommitTransactionCommand(), DEBUG1, elog, EndTransactionBlock(), fb(), i, IsTransactionBlock(), lfirst_xid, list_length(), list_nth_cell(), list_truncate(), MyParallelShared, MySubscription, NAMEDATALEN, Subscription::oid, ReplOriginXactState::origin_lsn, ReplOriginXactState::origin_timestamp, pa_reset_subtrans(), pa_savepoint_name(), pa_set_xact_state(), pa_unlock_transaction(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), replorigin_xact_state, RollbackToSavepoint(), STATE_IDLE, and subxactlist.

Referenced by apply_handle_stream_abort().

◆ pa_switch_to_partial_serialize()

void pa_switch_to_partial_serialize ( ParallelApplyWorkerInfo winfo,
bool  stream_locked 
)
extern

Definition at line 1221 of file applyparallelworker.c.

1223{
1224 ereport(LOG,
1225 (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1226 winfo->shared->xid)));
1227
1228 /*
1229 * The parallel apply worker could be stuck for some reason (say waiting
1230 * on some lock by other backend), so stop trying to send data directly to
1231 * it and start serializing data to the file instead.
1232 */
1233 winfo->serialize_changes = true;
1234
1235 /* Initialize the stream fileset. */
1236 stream_start_internal(winfo->shared->xid, true);
1237
1238 /*
1239 * Acquires the stream lock if not already to make sure that the parallel
1240 * apply worker will wait for the leader to release the stream lock until
1241 * the end of the transaction.
1242 */
1243 if (!stream_locked)
1245
1247}
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void stream_start_internal(TransactionId xid, bool first_segment)
Definition worker.c:1694

References AccessExclusiveLock, ereport, errmsg, fb(), FS_SERIALIZE_IN_PROGRESS, LOG, pa_lock_stream(), pa_set_fileset_state(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, stream_start_internal(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ pa_unlock_stream()

void pa_unlock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)
extern

◆ pa_unlock_transaction()

◆ pa_xact_finish()

void pa_xact_finish ( ParallelApplyWorkerInfo winfo,
XLogRecPtr  remote_lsn 
)
extern

Definition at line 1628 of file applyparallelworker.c.

1629{
1631
1632 /*
1633 * Unlock the shared object lock so that parallel apply worker can
1634 * continue to receive and apply changes.
1635 */
1637
1638 /*
1639 * Wait for that worker to finish. This is necessary to maintain commit
1640 * order which avoids failures due to transaction dependencies and
1641 * deadlocks.
1642 */
1644
1645 if (XLogRecPtrIsValid(remote_lsn))
1646 store_flush_position(remote_lsn, winfo->shared->last_commit_end);
1647
1648 pa_free_worker(winfo);
1649}
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition worker.c:3946

References AccessExclusiveLock, am_leader_apply_worker(), Assert, ParallelApplyWorkerShared::last_commit_end, pa_free_worker(), pa_unlock_stream(), pa_wait_for_xact_finish(), ParallelApplyWorkerInfo::shared, store_flush_position(), ParallelApplyWorkerShared::xid, and XLogRecPtrIsValid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), and apply_handle_stream_prepare().

◆ ProcessSequencesForSync()

void ProcessSequencesForSync ( void  )
extern

Definition at line 96 of file sequencesync.c.

97{
99 int nsyncworkers;
101 bool started_tx;
102
104
105 if (started_tx)
106 {
108 pgstat_report_stat(true);
109 }
110
112 return;
113
115
116 /* Check if there is a sequencesync worker already running? */
119 InvalidOid, true);
121 {
123 return;
124 }
125
126 /*
127 * Count running sync workers for this subscription, while we have the
128 * lock.
129 */
132
133 /*
134 * It is okay to read/update last_seqsync_start_time here in apply worker
135 * as we have already ensured that sync worker doesn't exist.
136 */
139}
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
Definition syncutils.c:118

References CommitTransactionCommand(), fb(), FetchRelationStates(), InvalidOid, LogicalRepWorker::last_seqsync_start_time, launch_sync_worker(), logicalrep_sync_worker_count(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLogicalRepWorker, pgstat_report_stat(), LogicalRepWorker::subid, and WORKERTYPE_SEQUENCESYNC.

Referenced by ProcessSyncingRelations().

◆ ProcessSyncingRelations()

void ProcessSyncingRelations ( XLogRecPtr  current_lsn)
extern

Definition at line 156 of file syncutils.c.

157{
158 switch (MyLogicalRepWorker->type)
159 {
161
162 /*
163 * Skip for parallel apply workers because they only operate on
164 * tables that are in a READY state. See pa_can_start() and
165 * should_apply_changes_for_rel().
166 */
167 break;
168
171 break;
172
173 case WORKERTYPE_APPLY:
176 break;
177
179 /* Should never happen. */
180 elog(ERROR, "sequence synchronization worker is not expected to process relations");
181 break;
182
184 /* Should never happen. */
185 elog(ERROR, "Unknown worker type");
186 }
187}
void ProcessSequencesForSync(void)
void ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
Definition tablesync.c:246
void ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
Definition tablesync.c:368

References elog, ERROR, fb(), MyLogicalRepWorker, ProcessSequencesForSync(), ProcessSyncingTablesForApply(), ProcessSyncingTablesForSync(), LogicalRepWorker::type, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_SEQUENCESYNC, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.

Referenced by apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_commit(), apply_handle_stream_prepare(), and LogicalRepApplyLoop().

◆ ProcessSyncingTablesForApply()

void ProcessSyncingTablesForApply ( XLogRecPtr  current_lsn)
extern

Definition at line 368 of file tablesync.c.

369{
371 {
372 Oid relid;
373 TimestampTz last_start_time;
374 };
375 static HTAB *last_start_times = NULL;
376 ListCell *lc;
377 bool started_tx;
378 bool should_exit = false;
379 Relation rel = NULL;
380
382
383 /* We need up-to-date sync state info for subscription tables here. */
385
386 /*
387 * Prepare a hash table for tracking last start times of workers, to avoid
388 * immediate restarts. We don't need it if there are no tables that need
389 * syncing.
390 */
392 {
393 HASHCTL ctl;
394
395 ctl.keysize = sizeof(Oid);
396 ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
397 last_start_times = hash_create("Logical replication table sync worker start times",
398 256, &ctl, HASH_ELEM | HASH_BLOBS);
399 }
400
401 /*
402 * Clean up the hash table when we're done with all tables (just to
403 * release the bit of memory).
404 */
406 {
409 }
410
411 /*
412 * Process all tables that are being synchronized.
413 */
414 foreach(lc, table_states_not_ready)
415 {
417
418 if (!started_tx)
419 {
421 started_tx = true;
422 }
423
425
426 if (rstate->state == SUBREL_STATE_SYNCDONE)
427 {
428 /*
429 * Apply has caught up to the position where the table sync has
430 * finished. Mark the table as ready so that the apply will just
431 * continue to replicate it normally.
432 */
433 if (current_lsn >= rstate->lsn)
434 {
436
437 rstate->state = SUBREL_STATE_READY;
438 rstate->lsn = current_lsn;
439
440 /*
441 * Remove the tablesync origin tracking if exists.
442 *
443 * There is a chance that the user is concurrently performing
444 * refresh for the subscription where we remove the table
445 * state and its origin or the tablesync worker would have
446 * already removed this origin. We can't rely on tablesync
447 * worker to remove the origin tracking as if there is any
448 * error while dropping we won't restart it to drop the
449 * origin. So passing missing_ok = true.
450 *
451 * Lock the subscription and origin in the same order as we
452 * are doing during DDL commands to avoid deadlocks. See
453 * AlterSubscription_refresh.
454 */
456 0, AccessShareLock);
457
458 if (!rel)
460
462 rstate->relid,
464 sizeof(originname));
466
467 /*
468 * Update the state to READY only after the origin cleanup.
469 */
471 rstate->relid, rstate->state,
472 rstate->lsn, true);
473 }
474 }
475 else
476 {
478
479 /*
480 * Look for a sync worker for this relation.
481 */
483
486 rstate->relid, false);
487
488 if (syncworker)
489 {
490 /* Found one, update our copy of its state */
491 SpinLockAcquire(&syncworker->relmutex);
492 rstate->state = syncworker->relstate;
493 rstate->lsn = syncworker->relstate_lsn;
494 if (rstate->state == SUBREL_STATE_SYNCWAIT)
495 {
496 /*
497 * Sync worker is waiting for apply. Tell sync worker it
498 * can catchup now.
499 */
501 syncworker->relstate_lsn =
502 Max(syncworker->relstate_lsn, current_lsn);
503 }
504 SpinLockRelease(&syncworker->relmutex);
505
506 /* If we told worker to catch up, wait for it. */
507 if (rstate->state == SUBREL_STATE_SYNCWAIT)
508 {
509 /* Signal the sync worker, as it may be waiting for us. */
510 if (syncworker->proc)
512
513 /* Now safe to release the LWLock */
515
516 if (started_tx)
517 {
518 /*
519 * We must commit the existing transaction to release
520 * the existing locks before entering a busy loop.
521 * This is required to avoid any undetected deadlocks
522 * due to any existing lock as deadlock detector won't
523 * be able to detect the waits on the latch.
524 *
525 * Also close any tables prior to the commit.
526 */
527 if (rel)
528 {
529 table_close(rel, NoLock);
530 rel = NULL;
531 }
533 pgstat_report_stat(false);
534 }
535
536 /*
537 * Enter busy loop and wait for synchronization worker to
538 * reach expected state (or die trying).
539 */
541 started_tx = true;
542
545 }
546 else
548 }
549 else
550 {
551 /*
552 * If there is no sync worker for this table yet, count
553 * running sync workers for this subscription, while we have
554 * the lock.
555 */
556 int nsyncworkers =
559 bool found;
560
561 /* Now safe to release the LWLock */
563
565 HASH_ENTER, &found);
566 if (!found)
567 hentry->last_start_time = 0;
568
570 rstate->relid, &hentry->last_start_time);
571 }
572 }
573 }
574
575 /* Close table if opened */
576 if (rel)
577 table_close(rel, NoLock);
578
579
580 if (started_tx)
581 {
582 /*
583 * Even when the two_phase mode is requested by the user, it remains
584 * as 'pending' until all tablesyncs have reached READY state.
585 *
586 * When this happens, we restart the apply worker and (if the
587 * conditions are still ok) then the two_phase tri-state will become
588 * 'enabled' at that time.
589 *
590 * Note: If the subscription has no tables then leave the state as
591 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
592 * work.
593 */
595 {
596 CommandCounterIncrement(); /* make updates visible */
597 if (AllTablesyncsReady())
598 {
599 ereport(LOG,
600 (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
602 should_exit = true;
603 }
604 }
605
607 pgstat_report_stat(true);
608 }
609
610 if (should_exit)
611 {
612 /*
613 * Reset the last-start time for this worker so that the launcher will
614 * restart it without waiting for wal_retrieve_retry_interval.
615 */
617
618 proc_exit(0);
619 }
620}
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:648
#define Max(x, y)
Definition c.h:1085
void hash_destroy(HTAB *hashp)
Definition dynahash.c:802
static dshash_table * last_start_times
Definition launcher.c:101
#define NoLock
Definition lockdefs.h:34
#define RowExclusiveLock
Definition lockdefs.h:38
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:459
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
unsigned int Oid
Size keysize
Definition hsearch.h:69
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool AllTablesyncsReady(void)
Definition tablesync.c:1629
static bool wait_for_table_state_change(Oid relid, char expected_state)
Definition tablesync.c:142
void CommandCounterIncrement(void)
Definition xact.c:1130

References AccessShareLock, AllTablesyncsReady(), ApplyLauncherForgetWorkerStartTime(), Assert, CommandCounterIncrement(), CommitTransactionCommand(), ctl, ereport, errmsg, fb(), FetchRelationStates(), get_rel_relkind(), HASH_BLOBS, hash_create(), hash_destroy(), HASH_ELEM, HASH_ENTER, hash_search(), IsTransactionState(), HASHCTL::keysize, last_start_times, launch_sync_worker(), lfirst, LockSharedObject(), LOG, logicalrep_sync_worker_count(), logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), SubscriptionRelState::lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), Max, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NIL, NoLock, Subscription::oid, pgstat_report_stat(), proc_exit(), SubscriptionRelState::relid, ReplicationOriginNameForLogicalRep(), replorigin_drop_by_name(), RowExclusiveLock, SpinLockAcquire(), SpinLockRelease(), StartTransactionCommand(), SubscriptionRelState::state, LogicalRepWorker::subid, table_close(), table_open(), table_states_not_ready, Subscription::twophasestate, UpdateSubscriptionRelState(), wait_for_table_state_change(), and WORKERTYPE_TABLESYNC.

Referenced by ProcessSyncingRelations().

◆ ProcessSyncingTablesForSync()

void ProcessSyncingTablesForSync ( XLogRecPtr  current_lsn)
extern

Definition at line 246 of file tablesync.c.

247{
249
252 {
253 TimeLineID tli;
254 char syncslotname[NAMEDATALEN] = {0};
255 char originname[NAMEDATALEN] = {0};
256
259
261
262 /*
263 * UpdateSubscriptionRelState must be called within a transaction.
264 */
265 if (!IsTransactionState())
267
272 false);
273
274 /*
275 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
276 * the slot.
277 */
279
280 /*
281 * Cleanup the tablesync slot.
282 *
283 * This has to be done after updating the state because otherwise if
284 * there is an error while doing the database operations we won't be
285 * able to rollback dropped slot.
286 */
290 sizeof(syncslotname));
291
292 /*
293 * It is important to give an error if we are unable to drop the slot,
294 * otherwise, it won't be dropped till the corresponding subscription
295 * is dropped. So passing missing_ok = false.
296 */
298
300 pgstat_report_stat(false);
301
302 /*
303 * Start a new transaction to clean up the tablesync origin tracking.
304 * This transaction will be ended within the FinishSyncWorker(). Now,
305 * even, if we fail to remove this here, the apply worker will ensure
306 * to clean it up afterward.
307 *
308 * We need to do this after the table state is set to SYNCDONE.
309 * Otherwise, if an error occurs while performing the database
310 * operation, the worker will be restarted and the in-memory state of
311 * replication progress (remote_lsn) won't be rolled-back which would
312 * have been cleared before restart. So, the restarted worker will use
313 * invalid replication progress state resulting in replay of
314 * transactions that have already been applied.
315 */
317
321 sizeof(originname));
322
323 /*
324 * Resetting the origin session removes the ownership of the slot.
325 * This is needed to allow the origin to be dropped.
326 */
329
330 /*
331 * Drop the tablesync's origin tracking if exists.
332 *
333 * There is a chance that the user is concurrently performing refresh
334 * for the subscription where we remove the table state and its origin
335 * or the apply worker would have removed this origin. So passing
336 * missing_ok = true.
337 */
339
341 }
342 else
344}
WalReceiverConn * LogRepWorkerWalRcvConn
Definition worker.c:482
void replorigin_session_reset(void)
Definition origin.c:1301
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1377
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
pg_noreturn void FinishSyncWorker(void)
Definition syncutils.c:50
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition tablesync.c:1235
#define walrcv_endstreaming(conn, next_tli)
uint32 TimeLineID
Definition xlogdefs.h:63

References CommitTransactionCommand(), fb(), FinishSyncWorker(), IsTransactionState(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, NAMEDATALEN, pgstat_report_stat(), LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), replorigin_session_reset(), replorigin_xact_clear(), SpinLockAcquire(), SpinLockRelease(), StartTransactionCommand(), LogicalRepWorker::subid, UpdateSubscriptionRelState(), and walrcv_endstreaming.

Referenced by ProcessSyncingRelations().

◆ ReplicationOriginNameForLogicalRep()

void ReplicationOriginNameForLogicalRep ( Oid  suboid,
Oid  relid,
char originname,
Size  szoriginname 
)
extern

Definition at line 648 of file worker.c.

650{
651 if (OidIsValid(relid))
652 {
653 /* Replication origin name for tablesync workers. */
654 snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
655 }
656 else
657 {
658 /* Replication origin name for non-tablesync workers. */
660 }
661}

References fb(), OidIsValid, and snprintf.

Referenced by AlterSubscription(), AlterSubscription_refresh(), binary_upgrade_replorigin_advance(), CreateSubscription(), DropSubscription(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), ProcessSyncingTablesForApply(), ProcessSyncingTablesForSync(), run_apply_worker(), and run_tablesync_worker().

◆ set_apply_error_context_origin()

void set_apply_error_context_origin ( char originname)
extern

Definition at line 6364 of file worker.c.

6365{
6367 originname);
6368}
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1768

References apply_error_callback_arg, ApplyContext, fb(), MemoryContextStrdup(), and ApplyErrorCallbackArg::origin_name.

Referenced by ParallelApplyWorkerMain(), run_apply_worker(), and run_tablesync_worker().

◆ set_stream_options()

void set_stream_options ( WalRcvStreamOptions options,
char slotname,
XLogRecPtr origin_startpos 
)
extern

Definition at line 5557 of file worker.c.

5560{
5561 int server_version;
5562
5563 options->logical = true;
5564 options->startpoint = *origin_startpos;
5565 options->slotname = slotname;
5566
5568 options->proto.logical.proto_version =
5573
5574 options->proto.logical.publication_names = MySubscription->publications;
5575 options->proto.logical.binary = MySubscription->binary;
5576
5577 /*
5578 * Assign the appropriate option value for streaming option according to
5579 * the 'streaming' mode and the publisher's ability to support that mode.
5580 */
5581 if (server_version >= 160000 &&
5583 {
5584 options->proto.logical.streaming_str = "parallel";
5586 }
5587 else if (server_version >= 140000 &&
5589 {
5590 options->proto.logical.streaming_str = "on";
5592 }
5593 else
5594 {
5595 options->proto.logical.streaming_str = NULL;
5597 }
5598
5599 options->proto.logical.twophase = false;
5600 options->proto.logical.origin = pstrdup(MySubscription->origin);
5601}
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
#define LOGICALREP_PROTO_VERSION_NUM
char * pstrdup(const char *in)
Definition mcxt.c:1781
static int server_version
Definition pg_dumpall.c:122
#define walrcv_server_version(conn)

References Subscription::binary, fb(), LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, Subscription::origin, LogicalRepWorker::parallel_apply, pstrdup(), Subscription::publications, server_version, Subscription::stream, and walrcv_server_version.

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ SetupApplyOrSyncWorker()

void SetupApplyOrSyncWorker ( int  worker_slot)
extern

Definition at line 5947 of file worker.c.

5948{
5949 /* Attach to slot */
5951
5953
5954 /* Setup signal handling */
5957
5958 /*
5959 * We don't currently need any ResourceOwner in a walreceiver process, but
5960 * if we did, we could call CreateAuxProcessResourceOwner here.
5961 */
5962
5963 /* Initialise stats to a sanish value */
5966
5967 /* Load the libpq-specific functions */
5968 load_file("libpqwalreceiver", false);
5969
5971
5972 /* Connect to the origin and start the replication. */
5973 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5975
5976 /*
5977 * Setup callback for syscache so that we know when something changes in
5978 * the subscription relation state.
5979 */
5982 (Datum) 0);
5983}
void InitializeLogRepWorker(void)
Definition worker.c:5779
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:949
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void logicalrep_worker_attach(int slot)
Definition launcher.c:767
#define pqsignal
Definition port.h:547
void InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
Definition syncutils.c:101
#define SIGHUP
Definition win32_port.h:158

References am_leader_apply_worker(), am_sequencesync_worker(), am_tablesync_worker(), Assert, BackgroundWorkerUnblockSignals(), CacheRegisterSyscacheCallback(), Subscription::conninfo, DEBUG1, elog, fb(), GetCurrentTimestamp(), InitializeLogRepWorker(), InvalidateSyncingRelStates(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), logicalrep_worker_attach(), MyLogicalRepWorker, MySubscription, pqsignal, LogicalRepWorker::reply_time, SIGHUP, and SignalHandlerForConfigReload().

Referenced by ApplyWorkerMain(), SequenceSyncWorkerMain(), and TableSyncWorkerMain().

◆ start_apply()

void start_apply ( XLogRecPtr  origin_startpos)
extern

Definition at line 5626 of file worker.c.

5627{
5628 PG_TRY();
5629 {
5631 }
5632 PG_CATCH();
5633 {
5634 /*
5635 * Reset the origin state to prevent the advancement of origin
5636 * progress if we fail to apply. Otherwise, this will result in
5637 * transaction loss as that transaction won't be sent again by the
5638 * server.
5639 */
5641
5644 else
5645 {
5646 /*
5647 * Report the worker failed while applying changes. Abort the
5648 * current transaction so that the stats message is sent in an
5649 * idle state.
5650 */
5653
5654 PG_RE_THROW();
5655 }
5656 }
5657 PG_END_TRY();
5658}
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition worker.c:3988
void DisableSubscriptionAndExit(void)
Definition worker.c:6007
#define PG_RE_THROW()
Definition elog.h:407
#define PG_TRY(...)
Definition elog.h:374
#define PG_END_TRY(...)
Definition elog.h:399
#define PG_CATCH(...)
Definition elog.h:384

References AbortOutOfAnyTransaction(), Subscription::disableonerr, DisableSubscriptionAndExit(), fb(), LogicalRepApplyLoop(), MySubscription, Subscription::oid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgstat_report_subscription_error(), and replorigin_xact_clear().

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ store_flush_position()

void store_flush_position ( XLogRecPtr  remote_lsn,
XLogRecPtr  local_lsn 
)
extern

Definition at line 3946 of file worker.c.

3947{
3949
3950 /*
3951 * Skip for parallel apply workers, because the lsn_mapping is maintained
3952 * by the leader apply worker.
3953 */
3955 return;
3956
3957 /* Need to do this in permanent context */
3959
3960 /* Track commit lsn */
3962 flushpos->local_end = local_lsn;
3963 flushpos->remote_end = remote_lsn;
3964
3967}
static dlist_head lsn_mapping
Definition worker.c:313
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition ilist.h:364

References am_parallel_apply_worker(), ApplyContext, ApplyMessageContext, dlist_push_tail(), fb(), lsn_mapping, MemoryContextSwitchTo(), and palloc_object.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_prepare(), and pa_xact_finish().

◆ stream_cleanup_files()

void stream_cleanup_files ( Oid  subid,
TransactionId  xid 
)
extern

Definition at line 5424 of file worker.c.

5425{
5426 char path[MAXPGPATH];
5427
5428 /* Delete the changes file. */
5429 changes_filename(path, subid, xid);
5431
5432 /* Delete the subxact file, if it exists. */
5433 subxact_filename(path, subid, xid);
5435}
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition worker.c:5403
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition buffile.c:365

References BufFileDeleteFileSet(), changes_filename(), MAXPGPATH, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and subxact_filename().

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_free_worker_info(), and stream_abort_internal().

◆ stream_start_internal()

void stream_start_internal ( TransactionId  xid,
bool  first_segment 
)
extern

Definition at line 1694 of file worker.c.

1695{
1697
1698 /*
1699 * Initialize the worker's stream_fileset if we haven't yet. This will be
1700 * used for the entire duration of the worker so create it in a permanent
1701 * context. We create this on the very first streaming message from any
1702 * transaction and then use it for this and other streaming transactions.
1703 * Now, we could create a fileset at the start of the worker as well but
1704 * then we won't be sure that it will ever be used.
1705 */
1707 {
1709
1711
1714
1716 }
1717
1718 /* Open the spool file for this transaction. */
1720
1721 /* If this is not the first segment, open existing subxact file. */
1722 if (!first_segment)
1724
1726}
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
Definition worker.c:5448
static void subxact_info_read(Oid subid, TransactionId xid)
Definition worker.c:5274
void FileSetInit(FileSet *fileset)
Definition fileset.c:52

References ApplyContext, begin_replication_step(), end_replication_step(), fb(), FileSetInit(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc_object, LogicalRepWorker::stream_fileset, stream_open_file(), LogicalRepWorker::subid, and subxact_info_read().

Referenced by apply_handle_stream_start(), pa_switch_to_partial_serialize(), and stream_open_and_write_change().

◆ stream_stop_internal()

void stream_stop_internal ( TransactionId  xid)
extern

Definition at line 1869 of file worker.c.

1870{
1871 /*
1872 * Serialize information about subxacts for the toplevel transaction, then
1873 * close the stream messages spool file.
1874 */
1877
1878 /* We must be in a valid transaction state */
1880
1881 /* Commit the per-stream transaction */
1883
1884 /* Reset per-stream context */
1886}
static void subxact_info_write(Oid subid, TransactionId xid)
Definition worker.c:5225
static MemoryContext LogicalStreamingContext
Definition worker.c:480

References Assert, CommitTransactionCommand(), IsTransactionState(), LogicalStreamingContext, MemoryContextReset(), MyLogicalRepWorker, stream_close_file(), LogicalRepWorker::subid, and subxact_info_write().

Referenced by apply_handle_stream_stop(), and stream_open_and_write_change().

◆ UpdateTwoPhaseState()

void UpdateTwoPhaseState ( Oid  suboid,
char  new_state 
)
extern

Definition at line 1680 of file tablesync.c.

1681{
1682 Relation rel;
1683 HeapTuple tup;
1684 bool nulls[Natts_pg_subscription];
1687
1691
1694 if (!HeapTupleIsValid(tup))
1695 elog(ERROR,
1696 "cache lookup failed for subscription oid %u",
1697 suboid);
1698
1699 /* Form a new tuple. */
1700 memset(values, 0, sizeof(values));
1701 memset(nulls, false, sizeof(nulls));
1702 memset(replaces, false, sizeof(replaces));
1703
1704 /* And update/set two_phase state */
1707
1709 values, nulls, replaces);
1710 CatalogTupleUpdate(rel, &tup->t_self, tup);
1711
1714}
static Datum values[MAXATTR]
Definition bootstrap.c:190
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1118
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
#define HeapTupleIsValid(tuple)
Definition htup.h:78
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define RelationGetDescr(relation)
Definition rel.h:542
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91

References Assert, CatalogTupleUpdate(), CharGetDatum(), elog, ERROR, fb(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, table_close(), table_open(), and values.

Referenced by CreateSubscription(), and run_apply_worker().

Variable Documentation

◆ apply_error_context_stack

PGDLLIMPORT ErrorContextCallback* apply_error_context_stack
extern

Definition at line 474 of file worker.c.

Referenced by LogicalRepApplyLoop(), and ProcessParallelApplyMessage().

◆ ApplyContext

◆ ApplyMessageContext

◆ in_remote_transaction

◆ InitializingApplyWorker

PGDLLIMPORT bool InitializingApplyWorker
extern

Definition at line 504 of file worker.c.

Referenced by ApplyWorkerMain(), logicalrep_worker_onexit(), and ParallelApplyWorkerMain().

◆ LogRepWorkerWalRcvConn

◆ MyLogicalRepWorker

PGDLLIMPORT LogicalRepWorker* MyLogicalRepWorker
extern

◆ MyParallelShared

◆ MySubscription

◆ table_states_not_ready

PGDLLIMPORT List* table_states_not_ready
extern