PostgreSQL Source Code git master
worker_internal.h File Reference
Include dependency graph for worker_internal.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepWorker
 
struct  ParallelApplyWorkerShared
 
struct  ParallelApplyWorkerInfo
 

Macros

#define isParallelApplyWorker(worker)
 
#define isTableSyncWorker(worker)
 
#define isSequenceSyncWorker(worker)
 

Typedefs

typedef enum LogicalRepWorkerType LogicalRepWorkerType
 
typedef struct LogicalRepWorker LogicalRepWorker
 
typedef enum ParallelTransState ParallelTransState
 
typedef enum PartialFileSetState PartialFileSetState
 
typedef struct ParallelApplyWorkerShared ParallelApplyWorkerShared
 
typedef struct ParallelApplyWorkerInfo ParallelApplyWorkerInfo
 

Enumerations

enum  LogicalRepWorkerType {
  WORKERTYPE_UNKNOWN = 0 , WORKERTYPE_TABLESYNC , WORKERTYPE_SEQUENCESYNC , WORKERTYPE_APPLY ,
  WORKERTYPE_PARALLEL_APPLY
}
 
enum  ParallelTransState { PARALLEL_TRANS_UNKNOWN , PARALLEL_TRANS_STARTED , PARALLEL_TRANS_FINISHED }
 
enum  PartialFileSetState { FS_EMPTY , FS_SERIALIZE_IN_PROGRESS , FS_SERIALIZE_DONE , FS_READY }
 

Functions

void logicalrep_worker_attach (int slot)
 
LogicalRepWorkerlogicalrep_worker_find (LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
 
Listlogicalrep_workers_find (Oid subid, bool only_running, bool acquire_lock)
 
bool logicalrep_worker_launch (LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
 
void logicalrep_worker_stop (LogicalRepWorkerType wtype, Oid subid, Oid relid)
 
void logicalrep_pa_worker_stop (ParallelApplyWorkerInfo *winfo)
 
void logicalrep_worker_wakeup (LogicalRepWorkerType wtype, Oid subid, Oid relid)
 
void logicalrep_worker_wakeup_ptr (LogicalRepWorker *worker)
 
void logicalrep_reset_seqsync_start_time (void)
 
int logicalrep_sync_worker_count (Oid subid)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
bool AllTablesyncsReady (void)
 
bool HasSubscriptionTablesCached (void)
 
void UpdateTwoPhaseState (Oid suboid, char new_state)
 
void ProcessSyncingTablesForSync (XLogRecPtr current_lsn)
 
void ProcessSyncingTablesForApply (XLogRecPtr current_lsn)
 
void ProcessSequencesForSync (void)
 
pg_noreturn void FinishSyncWorker (void)
 
void InvalidateSyncingRelStates (Datum arg, int cacheid, uint32 hashvalue)
 
void launch_sync_worker (LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
 
void ProcessSyncingRelations (XLogRecPtr current_lsn)
 
void FetchRelationStates (bool *has_pending_subtables, bool *has_pending_sequences, bool *started_tx)
 
void stream_start_internal (TransactionId xid, bool first_segment)
 
void stream_stop_internal (TransactionId xid)
 
void apply_spooled_messages (FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
 
void apply_dispatch (StringInfo s)
 
void maybe_reread_subscription (void)
 
void stream_cleanup_files (Oid subid, TransactionId xid)
 
void set_stream_options (WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
 
void start_apply (XLogRecPtr origin_startpos)
 
void InitializeLogRepWorker (void)
 
void SetupApplyOrSyncWorker (int worker_slot)
 
void DisableSubscriptionAndExit (void)
 
void store_flush_position (XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 
void apply_error_callback (void *arg)
 
void set_apply_error_context_origin (char *originname)
 
void pa_allocate_worker (TransactionId xid)
 
ParallelApplyWorkerInfopa_find_worker (TransactionId xid)
 
void pa_detach_all_error_mq (void)
 
bool pa_send_data (ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 
void pa_switch_to_partial_serialize (ParallelApplyWorkerInfo *winfo, bool stream_locked)
 
void pa_set_xact_state (ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
 
void pa_set_stream_apply_worker (ParallelApplyWorkerInfo *winfo)
 
void pa_start_subtrans (TransactionId current_xid, TransactionId top_xid)
 
void pa_reset_subtrans (void)
 
void pa_stream_abort (LogicalRepStreamAbortData *abort_data)
 
void pa_set_fileset_state (ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
 
void pa_lock_stream (TransactionId xid, LOCKMODE lockmode)
 
void pa_unlock_stream (TransactionId xid, LOCKMODE lockmode)
 
void pa_lock_transaction (TransactionId xid, LOCKMODE lockmode)
 
void pa_unlock_transaction (TransactionId xid, LOCKMODE lockmode)
 
void pa_decr_and_wait_stream_block (void)
 
void pa_xact_finish (ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 
static bool am_tablesync_worker (void)
 
static bool am_sequencesync_worker (void)
 
static bool am_leader_apply_worker (void)
 
static bool am_parallel_apply_worker (void)
 

Variables

PGDLLIMPORT MemoryContext ApplyContext
 
PGDLLIMPORT MemoryContext ApplyMessageContext
 
PGDLLIMPORT ErrorContextCallbackapply_error_context_stack
 
PGDLLIMPORT ParallelApplyWorkerSharedMyParallelShared
 
PGDLLIMPORT struct WalReceiverConnLogRepWorkerWalRcvConn
 
PGDLLIMPORT SubscriptionMySubscription
 
PGDLLIMPORT LogicalRepWorkerMyLogicalRepWorker
 
PGDLLIMPORT bool in_remote_transaction
 
PGDLLIMPORT bool InitializingApplyWorker
 
PGDLLIMPORT Listtable_states_not_ready
 

Macro Definition Documentation

◆ isParallelApplyWorker

#define isParallelApplyWorker (   worker)
Value:
((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
@ WORKERTYPE_PARALLEL_APPLY

Definition at line 362 of file worker_internal.h.

◆ isSequenceSyncWorker

#define isSequenceSyncWorker (   worker)
Value:
((worker)->in_use && \
(worker)->type == WORKERTYPE_SEQUENCESYNC)
@ WORKERTYPE_SEQUENCESYNC

Definition at line 366 of file worker_internal.h.

◆ isTableSyncWorker

#define isTableSyncWorker (   worker)
Value:
((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
@ WORKERTYPE_TABLESYNC

Definition at line 364 of file worker_internal.h.

Typedef Documentation

◆ LogicalRepWorker

◆ LogicalRepWorkerType

◆ ParallelApplyWorkerInfo

◆ ParallelApplyWorkerShared

◆ ParallelTransState

◆ PartialFileSetState

Enumeration Type Documentation

◆ LogicalRepWorkerType

Enumerator
WORKERTYPE_UNKNOWN 
WORKERTYPE_TABLESYNC 
WORKERTYPE_SEQUENCESYNC 
WORKERTYPE_APPLY 
WORKERTYPE_PARALLEL_APPLY 

Definition at line 29 of file worker_internal.h.

◆ ParallelTransState

Enumerator
PARALLEL_TRANS_UNKNOWN 
PARALLEL_TRANS_STARTED 
PARALLEL_TRANS_FINISHED 

Definition at line 120 of file worker_internal.h.

121{
ParallelTransState
@ PARALLEL_TRANS_UNKNOWN
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED

◆ PartialFileSetState

Enumerator
FS_EMPTY 
FS_SERIALIZE_IN_PROGRESS 
FS_SERIALIZE_DONE 
FS_READY 

Definition at line 143 of file worker_internal.h.

144{
145 FS_EMPTY,
148 FS_READY,
PartialFileSetState
@ FS_EMPTY
@ FS_SERIALIZE_DONE
@ FS_READY
@ FS_SERIALIZE_IN_PROGRESS

Function Documentation

◆ AllTablesyncsReady()

bool AllTablesyncsReady ( void  )

Definition at line 1600 of file tablesync.c.

1601{
1602 bool started_tx;
1603 bool has_tables;
1604
1605 /* We need up-to-date sync state info for subscription tables here. */
1606 FetchRelationStates(&has_tables, NULL, &started_tx);
1607
1608 if (started_tx)
1609 {
1611 pgstat_report_stat(true);
1612 }
1613
1614 /*
1615 * Return false when there are no tables in subscription or not all tables
1616 * are in ready state; true otherwise.
1617 */
1618 return has_tables && (table_states_not_ready == NIL);
1619}
#define NIL
Definition: pg_list.h:68
long pgstat_report_stat(bool force)
Definition: pgstat.c:694
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
Definition: syncutils.c:202
List * table_states_not_ready
Definition: tablesync.c:125
void CommitTransactionCommand(void)
Definition: xact.c:3175

References CommitTransactionCommand(), FetchRelationStates(), NIL, pgstat_report_stat(), and table_states_not_ready.

Referenced by pa_can_start(), ProcessSyncingTablesForApply(), run_apply_worker(), and wait_for_local_flush().

◆ am_leader_apply_worker()

◆ am_parallel_apply_worker()

◆ am_sequencesync_worker()

static bool am_sequencesync_worker ( void  )
inlinestatic

Definition at line 376 of file worker_internal.h.

377{
379}
#define isSequenceSyncWorker(worker)

References isSequenceSyncWorker, and MyLogicalRepWorker.

Referenced by FinishSyncWorker(), InitializeLogRepWorker(), SetupApplyOrSyncWorker(), and start_sequence_sync().

◆ am_tablesync_worker()

static bool am_tablesync_worker ( void  )
inlinestatic

◆ apply_dispatch()

void apply_dispatch ( StringInfo  s)

Definition at line 3775 of file worker.c.

3776{
3778 LogicalRepMsgType saved_command;
3779
3780 /*
3781 * Set the current command being applied. Since this function can be
3782 * called recursively when applying spooled changes, save the current
3783 * command.
3784 */
3785 saved_command = apply_error_callback_arg.command;
3787
3788 switch (action)
3789 {
3792 break;
3793
3796 break;
3797
3800 break;
3801
3804 break;
3805
3808 break;
3809
3812 break;
3813
3816 break;
3817
3820 break;
3821
3824 break;
3825
3827
3828 /*
3829 * Logical replication does not use generic logical messages yet.
3830 * Although, it could be used by other applications that use this
3831 * output plugin.
3832 */
3833 break;
3834
3837 break;
3838
3841 break;
3842
3845 break;
3846
3849 break;
3850
3853 break;
3854
3857 break;
3858
3861 break;
3862
3865 break;
3866
3869 break;
3870
3871 default:
3872 ereport(ERROR,
3873 (errcode(ERRCODE_PROTOCOL_VIOLATION),
3874 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3875 }
3876
3877 /* Reset the current command */
3878 apply_error_callback_arg.command = saved_command;
3879}
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1518
static void apply_handle_type(StringInfo s)
Definition: worker.c:2586
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:3647
static void apply_handle_update(StringInfo s)
Definition: worker.c:2790
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:2390
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:1405
static ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:459
static void apply_handle_delete(StringInfo s)
Definition: worker.c:3012
static void apply_handle_begin(StringInfo s)
Definition: worker.c:1211
static void apply_handle_commit(StringInfo s)
Definition: worker.c:1236
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:2071
static void apply_handle_relation(StringInfo s)
Definition: worker.c:2563
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:1331
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1457
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1885
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1666
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:1265
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1725
static void apply_handle_insert(StringInfo s)
Definition: worker.c:2633
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
LogicalRepMsgType command
Definition: worker.c:325

References generate_unaccent_rules::action, apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_error_callback()

void apply_error_callback ( void *  arg)

Definition at line 6159 of file worker.c.

6160{
6162
6164 return;
6165
6166 Assert(errarg->origin_name);
6167
6168 if (errarg->rel == NULL)
6169 {
6170 if (!TransactionIdIsValid(errarg->remote_xid))
6171 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6172 errarg->origin_name,
6174 else if (!XLogRecPtrIsValid(errarg->finish_lsn))
6175 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6176 errarg->origin_name,
6178 errarg->remote_xid);
6179 else
6180 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6181 errarg->origin_name,
6183 errarg->remote_xid,
6184 LSN_FORMAT_ARGS(errarg->finish_lsn));
6185 }
6186 else
6187 {
6188 if (errarg->remote_attnum < 0)
6189 {
6190 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6191 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6192 errarg->origin_name,
6194 errarg->rel->remoterel.nspname,
6195 errarg->rel->remoterel.relname,
6196 errarg->remote_xid);
6197 else
6198 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6199 errarg->origin_name,
6201 errarg->rel->remoterel.nspname,
6202 errarg->rel->remoterel.relname,
6203 errarg->remote_xid,
6204 LSN_FORMAT_ARGS(errarg->finish_lsn));
6205 }
6206 else
6207 {
6208 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6209 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6210 errarg->origin_name,
6212 errarg->rel->remoterel.nspname,
6213 errarg->rel->remoterel.relname,
6214 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6215 errarg->remote_xid);
6216 else
6217 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6218 errarg->origin_name,
6220 errarg->rel->remoterel.nspname,
6221 errarg->rel->remoterel.relname,
6222 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6223 errarg->remote_xid,
6224 LSN_FORMAT_ARGS(errarg->finish_lsn));
6225 }
6226 }
6227}
#define errcontext
Definition: elog.h:198
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1212
TransactionId remote_xid
Definition: worker.c:330
XLogRecPtr finish_lsn
Definition: worker.c:331
LogicalRepRelMapEntry * rel
Definition: worker.c:326
LogicalRepRelation remoterel
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47

References apply_error_callback_arg, Assert(), LogicalRepRelation::attnames, ApplyErrorCallbackArg::command, errcontext, ApplyErrorCallbackArg::finish_lsn, logicalrep_message_type(), LSN_FORMAT_ARGS, LogicalRepRelation::nspname, ApplyErrorCallbackArg::origin_name, ApplyErrorCallbackArg::rel, LogicalRepRelation::relname, ApplyErrorCallbackArg::remote_attnum, ApplyErrorCallbackArg::remote_xid, LogicalRepRelMapEntry::remoterel, TransactionIdIsValid, and XLogRecPtrIsValid.

Referenced by LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_spooled_messages()

void apply_spooled_messages ( FileSet stream_fileset,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2260 of file worker.c.

2262{
2263 int nchanges;
2264 char path[MAXPGPATH];
2265 char *buffer = NULL;
2266 MemoryContext oldcxt;
2267 ResourceOwner oldowner;
2268 int fileno;
2269 off_t offset;
2270
2273
2274 /* Make sure we have an open transaction */
2276
2277 /*
2278 * Allocate file handle and memory required to process all the messages in
2279 * TopTransactionContext to avoid them getting reset after each message is
2280 * processed.
2281 */
2283
2284 /* Open the spool file for the committed/prepared transaction */
2286 elog(DEBUG1, "replaying changes from file \"%s\"", path);
2287
2288 /*
2289 * Make sure the file is owned by the toplevel transaction so that the
2290 * file will not be accidentally closed when aborting a subtransaction.
2291 */
2292 oldowner = CurrentResourceOwner;
2294
2295 stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2296
2297 CurrentResourceOwner = oldowner;
2298
2299 buffer = palloc(BLCKSZ);
2300
2301 MemoryContextSwitchTo(oldcxt);
2302
2303 remote_final_lsn = lsn;
2304
2305 /*
2306 * Make sure the handle apply_dispatch methods are aware we're in a remote
2307 * transaction.
2308 */
2309 in_remote_transaction = true;
2311
2313
2314 /*
2315 * Read the entries one by one and pass them through the same logic as in
2316 * apply_dispatch.
2317 */
2318 nchanges = 0;
2319 while (true)
2320 {
2322 size_t nbytes;
2323 int len;
2324
2326
2327 /* read length of the on-disk record */
2328 nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2329
2330 /* have we reached end of the file? */
2331 if (nbytes == 0)
2332 break;
2333
2334 /* do we have a correct length? */
2335 if (len <= 0)
2336 elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2337 len, path);
2338
2339 /* make sure we have sufficiently large buffer */
2340 buffer = repalloc(buffer, len);
2341
2342 /* and finally read the data into the buffer */
2343 BufFileReadExact(stream_fd, buffer, len);
2344
2345 BufFileTell(stream_fd, &fileno, &offset);
2346
2347 /* init a stringinfo using the buffer and call apply_dispatch */
2348 initReadOnlyStringInfo(&s2, buffer, len);
2349
2350 /* Ensure we are reading the data into our memory context. */
2352
2354
2356
2357 MemoryContextSwitchTo(oldcxt);
2358
2359 nchanges++;
2360
2361 /*
2362 * It is possible the file has been closed because we have processed
2363 * the transaction end message like stream_commit in which case that
2364 * must be the last message.
2365 */
2366 if (!stream_fd)
2367 {
2368 ensure_last_message(stream_fileset, xid, fileno, offset);
2369 break;
2370 }
2371
2372 if (nchanges % 1000 == 0)
2373 elog(DEBUG1, "replayed %d changes from file \"%s\"",
2374 nchanges, path);
2375 }
2376
2377 if (stream_fd)
2379
2380 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2381 nchanges, path);
2382
2383 return;
2384}
static void begin_replication_step(void)
Definition: worker.c:726
static void end_replication_step(void)
Definition: worker.c:749
MemoryContext ApplyMessageContext
Definition: worker.c:471
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:5367
static BufFile * stream_fd
Definition: worker.c:520
bool in_remote_transaction
Definition: worker.c:484
void apply_dispatch(StringInfo s)
Definition: worker.c:3775
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
Definition: worker.c:2228
static XLogRecPtr remote_final_lsn
Definition: worker.c:485
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:6021
static void stream_close_file(void)
Definition: worker.c:5450
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:833
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition: buffile.c:664
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:226
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:400
MemoryContext TopTransactionContext
Definition: mcxt.c:171
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1610
void * palloc(Size size)
Definition: mcxt.c:1365
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
#define MAXPGPATH
const void size_t len
char * s2
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:175
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:157
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileOpenFileSet(), BufFileReadExact(), BufFileReadMaybeEOF(), BufFileTell(), changes_filename(), CHECK_FOR_INTERRUPTS, CurrentResourceOwner, DEBUG1, elog, end_replication_step(), ensure_last_message(), ERROR, in_remote_transaction, initReadOnlyStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), remote_final_lsn, repalloc(), s2, STATE_RUNNING, stream_close_file(), stream_fd, LogicalRepWorker::subid, TopTransactionContext, and TopTransactionResourceOwner.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_process_spooled_messages_if_required().

◆ DisableSubscriptionAndExit()

void DisableSubscriptionAndExit ( void  )

Definition at line 5943 of file worker.c.

5944{
5945 /*
5946 * Emit the error message, and recover from the error state to an idle
5947 * state
5948 */
5950
5954
5956
5957 /*
5958 * Report the worker failed during sequence synchronization, table
5959 * synchronization, or apply.
5960 */
5963
5964 /* Disable the subscription */
5966
5967 /*
5968 * Updating pg_subscription might involve TOAST table access, so ensure we
5969 * have a valid snapshot.
5970 */
5972
5976
5977 /* Ensure we remove no-longer-useful entry for worker's start time */
5980
5981 /* Notify the subscription has been disabled and exit */
5982 ereport(LOG,
5983 errmsg("subscription \"%s\" has been disabled because of an error",
5985
5986 /*
5987 * Skip the track_commit_timestamp check when disabling the worker due to
5988 * an error, as verifying commit timestamps is unnecessary in this
5989 * context.
5990 */
5994
5995 proc_exit(0);
5996}
Subscription * MySubscription
Definition: worker.c:479
void EmitErrorReport(void)
Definition: elog.c:1704
void FlushErrorState(void)
Definition: elog.c:1884
#define LOG
Definition: elog.h:31
#define WARNING
Definition: elog.h:36
void proc_exit(int code)
Definition: ipc.c:104
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1154
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:136
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:134
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:682
void PopActiveSnapshot(void)
Definition: snapmgr.c:775
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static bool am_leader_apply_worker(void)
void StartTransactionCommand(void)
Definition: xact.c:3077
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4880

References AbortOutOfAnyTransaction(), am_leader_apply_worker(), ApplyLauncherForgetWorkerStartTime(), CheckSubDeadTupleRetention(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg(), FlushErrorState(), GetTransactionSnapshot(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), PopActiveSnapshot(), proc_exit(), PushActiveSnapshot(), RESUME_INTERRUPTS, Subscription::retaindeadtuples, Subscription::retentionactive, StartTransactionCommand(), LogicalRepWorker::subid, LogicalRepWorker::type, and WARNING.

Referenced by start_apply(), start_sequence_sync(), and start_table_sync().

◆ FetchRelationStates()

void FetchRelationStates ( bool *  has_pending_subtables,
bool *  has_pending_sequences,
bool *  started_tx 
)

Definition at line 202 of file syncutils.c.

205{
206 /*
207 * has_subtables and has_subsequences_non_ready are declared as static,
208 * since the same value can be used until the system table is invalidated.
209 */
210 static bool has_subtables = false;
211 static bool has_subsequences_non_ready = false;
212
213 *started_tx = false;
214
216 {
217 MemoryContext oldctx;
218 List *rstates;
219 SubscriptionRelState *rstate;
220
222 has_subsequences_non_ready = false;
223
224 /* Clean the old lists. */
227
228 if (!IsTransactionState())
229 {
231 *started_tx = true;
232 }
233
234 /* Fetch tables and sequences that are in non-READY state. */
235 rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
236 true);
237
238 /* Allocate the tracking info in a permanent memory context. */
240 foreach_ptr(SubscriptionRelState, subrel, rstates)
241 {
242 if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
243 has_subsequences_non_ready = true;
244 else
245 {
246 rstate = palloc(sizeof(SubscriptionRelState));
247 memcpy(rstate, subrel, sizeof(SubscriptionRelState));
249 rstate);
250 }
251 }
252 MemoryContextSwitchTo(oldctx);
253
254 /*
255 * Does the subscription have tables?
256 *
257 * If there were not-READY tables found then we know it does. But if
258 * table_states_not_ready was empty we still need to check again to
259 * see if there are 0 tables.
260 */
261 has_subtables = (table_states_not_ready != NIL) ||
263
264 /*
265 * If the subscription relation cache has been invalidated since we
266 * entered this routine, we still use and return the relations we just
267 * finished constructing, to avoid infinite loops, but we leave the
268 * table states marked as stale so that we'll rebuild it again on next
269 * access. Otherwise, we mark the table states as valid.
270 */
273 }
274
275 if (has_pending_subtables)
276 *has_pending_subtables = has_subtables;
277
278 if (has_pending_subsequences)
279 *has_pending_subsequences = has_subsequences_non_ready;
280}
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_free_deep(List *list)
Definition: list.c:1560
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2170
MemoryContext CacheMemoryContext
Definition: mcxt.c:169
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
bool HasSubscriptionTables(Oid subid)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
Definition: pg_list.h:54
@ SYNC_RELATIONS_STATE_VALID
Definition: syncutils.c:41
@ SYNC_RELATIONS_STATE_REBUILD_STARTED
Definition: syncutils.c:40
static SyncingRelationsState relation_states_validity
Definition: syncutils.c:44
bool IsTransactionState(void)
Definition: xact.c:388

References CacheMemoryContext, foreach_ptr, get_rel_relkind(), GetSubscriptionRelations(), HasSubscriptionTables(), IsTransactionState(), lappend(), list_free_deep(), MemoryContextSwitchTo(), MySubscription, NIL, Subscription::oid, palloc(), relation_states_validity, StartTransactionCommand(), SYNC_RELATIONS_STATE_REBUILD_STARTED, SYNC_RELATIONS_STATE_VALID, and table_states_not_ready.

Referenced by AllTablesyncsReady(), HasSubscriptionTablesCached(), ProcessSequencesForSync(), and ProcessSyncingTablesForApply().

◆ FinishSyncWorker()

pg_noreturn void FinishSyncWorker ( void  )

Definition at line 50 of file syncutils.c.

51{
53
54 /*
55 * Commit any outstanding transaction. This is the usual case, unless
56 * there was nothing to do for the table.
57 */
59 {
62 }
63
64 /* And flush all writes. */
66
68 {
70 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
72
73 /*
74 * Reset last_seqsync_start_time, so that next time a sequencesync
75 * worker is needed it can be started promptly.
76 */
78 }
79 else
80 {
83 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
87
88 /* Find the leader apply worker and signal it. */
91 }
92
93 /* Stop gracefully */
94 proc_exit(0);
95}
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition: launcher.c:723
void logicalrep_reset_seqsync_start_time(void)
Definition: launcher.c:872
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2095
#define InvalidOid
Definition: postgres_ext.h:37
static bool am_sequencesync_worker(void)
static bool am_tablesync_worker(void)
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:9515
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2783

References am_sequencesync_worker(), am_tablesync_worker(), Assert(), CommitTransactionCommand(), ereport, errmsg(), get_rel_name(), GetXLogWriteRecPtr(), InvalidOid, IsTransactionState(), LOG, logicalrep_reset_seqsync_start_time(), logicalrep_worker_wakeup(), MyLogicalRepWorker, MySubscription, Subscription::name, pgstat_report_stat(), proc_exit(), LogicalRepWorker::relid, StartTransactionCommand(), LogicalRepWorker::subid, WORKERTYPE_APPLY, and XLogFlush().

Referenced by LogicalRepSyncTableStart(), ProcessSyncingTablesForSync(), SequenceSyncWorkerMain(), and TableSyncWorkerMain().

◆ HasSubscriptionTablesCached()

bool HasSubscriptionTablesCached ( void  )

Definition at line 1630 of file tablesync.c.

1631{
1632 bool started_tx;
1633 bool has_tables;
1634
1635 /* We need up-to-date subscription tables info here */
1636 FetchRelationStates(&has_tables, NULL, &started_tx);
1637
1638 if (started_tx)
1639 {
1641 pgstat_report_stat(true);
1642 }
1643
1644 return has_tables;
1645}

References CommitTransactionCommand(), FetchRelationStates(), and pgstat_report_stat().

Referenced by wait_for_local_flush().

◆ InitializeLogRepWorker()

void InitializeLogRepWorker ( void  )

Definition at line 5737 of file worker.c.

5738{
5739 MemoryContext oldctx;
5740
5741 /* Run as replica session replication role. */
5742 SetConfigOption("session_replication_role", "replica",
5744
5745 /* Connect to our database. */
5748 0);
5749
5750 /*
5751 * Set always-secure search path, so malicious users can't redirect user
5752 * code (e.g. pg_index.indexprs).
5753 */
5754 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5755
5756 /* Load the subscription into persistent memory context. */
5758 "ApplyContext",
5762
5763 /*
5764 * Lock the subscription to prevent it from being concurrently dropped,
5765 * then re-verify its existence. After the initialization, the worker will
5766 * be terminated gracefully if the subscription is dropped.
5767 */
5768 LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
5771 if (!MySubscription)
5772 {
5773 ereport(LOG,
5774 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5776
5777 /* Ensure we remove no-longer-useful entry for worker's start time */
5780
5781 proc_exit(0);
5782 }
5783
5784 MySubscriptionValid = true;
5785 MemoryContextSwitchTo(oldctx);
5786
5787 if (!MySubscription->enabled)
5788 {
5789 ereport(LOG,
5790 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5791 MySubscription->name)));
5792
5794 }
5795
5796 /*
5797 * Restart the worker if retain_dead_tuples was enabled during startup.
5798 *
5799 * At this point, the replication slot used for conflict detection might
5800 * not exist yet, or could be dropped soon if the launcher perceives
5801 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5802 * oldest_nonremovable_xid when the slot is absent or at risk of being
5803 * dropped, a restart is initiated.
5804 *
5805 * The oldest_nonremovable_xid should be initialized only when the
5806 * subscription's retention is active before launching the worker. See
5807 * logicalrep_worker_launch.
5808 */
5809 if (am_leader_apply_worker() &&
5813 {
5814 ereport(LOG,
5815 errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5816 MySubscription->name, "retain_dead_tuples"));
5817
5819 }
5820
5821 /* Setup synchronous commit according to the user's wishes */
5822 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5824
5825 /*
5826 * Keep us informed about subscription or role changes. Note that the
5827 * role's superuser privilege can be revoked.
5828 */
5829 CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
5831 (Datum) 0);
5832
5835 (Datum) 0);
5836
5837 if (am_tablesync_worker())
5838 ereport(LOG,
5839 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5842 else if (am_sequencesync_worker())
5843 ereport(LOG,
5844 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
5846 else
5847 ereport(LOG,
5848 errmsg("logical replication apply worker for subscription \"%s\" has started",
5850
5852}
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:5167
static void apply_worker_exit(void)
Definition: worker.c:5004
MemoryContext ApplyContext
Definition: worker.c:472
static bool MySubscriptionValid
Definition: worker.c:480
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: bgworker.c:890
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4196
@ PGC_S_OVERRIDE
Definition: guc.h:123
@ PGC_SUSET
Definition: guc.h:78
@ PGC_BACKEND
Definition: guc.h:77
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1812
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1088
#define AccessShareLock
Definition: lockdefs.h:36
MemoryContext TopMemoryContext
Definition: mcxt.c:166
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
Subscription * GetSubscription(Oid subid, bool missing_ok)
uint64_t Datum
Definition: postgres.h:70
TransactionId oldest_nonremovable_xid

References AccessShareLock, ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_leader_apply_worker(), am_sequencesync_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), BackgroundWorkerInitializeConnectionByOid(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), LogicalRepWorker::dbid, Subscription::enabled, ereport, errmsg(), get_rel_name(), GetSubscription(), LockSharedObject(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, LogicalRepWorker::oldest_nonremovable_xid, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, proc_exit(), LogicalRepWorker::relid, Subscription::retaindeadtuples, Subscription::retentionactive, SetConfigOption(), StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), Subscription::synccommit, TopMemoryContext, TransactionIdIsValid, and LogicalRepWorker::userid.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ InvalidateSyncingRelStates()

void InvalidateSyncingRelStates ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)

◆ launch_sync_worker()

void launch_sync_worker ( LogicalRepWorkerType  wtype,
int  nsyncworkers,
Oid  relid,
TimestampTz last_start_time 
)

Definition at line 117 of file syncutils.c.

119{
121
122 Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
123 (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
124
125 /* If there is a free sync worker slot, start a new sync worker */
126 if (nsyncworkers >= max_sync_workers_per_subscription)
127 return;
128
130
131 if (!(*last_start_time) ||
132 TimestampDifferenceExceeds(*last_start_time, now,
134 {
135 /*
136 * Set the last_start_time even if we fail to start the worker, so
137 * that we won't retry until wal_retrieve_retry_interval has elapsed.
138 */
139 *last_start_time = now;
140 (void) logicalrep_worker_launch(wtype,
145 relid, DSM_HANDLE_INVALID, false);
146 }
147}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
#define OidIsValid(objectId)
Definition: c.h:777
int64 TimestampTz
Definition: timestamp.h:39
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition: launcher.c:324
int max_sync_workers_per_subscription
Definition: launcher.c:53
int wal_retrieve_retry_interval
Definition: xlog.c:136

References Assert(), LogicalRepWorker::dbid, DSM_HANDLE_INVALID, GetCurrentTimestamp(), logicalrep_worker_launch(), max_sync_workers_per_subscription, MyLogicalRepWorker, MySubscription, Subscription::name, now(), Subscription::oid, OidIsValid, TimestampDifferenceExceeds(), LogicalRepWorker::userid, wal_retrieve_retry_interval, WORKERTYPE_SEQUENCESYNC, and WORKERTYPE_TABLESYNC.

Referenced by ProcessSequencesForSync(), and ProcessSyncingTablesForApply().

◆ logicalrep_pa_worker_stop()

void logicalrep_pa_worker_stop ( ParallelApplyWorkerInfo winfo)

Definition at line 679 of file launcher.c.

680{
681 int slot_no;
682 uint16 generation;
683 LogicalRepWorker *worker;
684
685 SpinLockAcquire(&winfo->shared->mutex);
686 generation = winfo->shared->logicalrep_worker_generation;
687 slot_no = winfo->shared->logicalrep_worker_slot_no;
688 SpinLockRelease(&winfo->shared->mutex);
689
690 Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
691
692 /*
693 * Detach from the error_mq_handle for the parallel apply worker before
694 * stopping it. This prevents the leader apply worker from trying to
695 * receive the message from the error queue that might already be detached
696 * by the parallel apply worker.
697 */
698 if (winfo->error_mq_handle)
699 {
701 winfo->error_mq_handle = NULL;
702 }
703
704 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
705
706 worker = &LogicalRepCtx->workers[slot_no];
708
709 /*
710 * Only stop the worker if the generation matches and the worker is alive.
711 */
712 if (worker->generation == generation && worker->proc)
714
715 LWLockRelease(LogicalRepWorkerLock);
716}
uint16_t uint16
Definition: c.h:540
int max_logical_replication_workers
Definition: launcher.c:52
static void logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
Definition: launcher.c:569
static LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:71
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:843
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:68
shm_mq_handle * error_mq_handle
ParallelApplyWorkerShared * shared
#define SIGUSR2
Definition: win32_port.h:171

References Assert(), ParallelApplyWorkerInfo::error_mq_handle, LogicalRepWorker::generation, isParallelApplyWorker, ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, logicalrep_worker_stop_internal(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, ParallelApplyWorkerShared::mutex, LogicalRepWorker::proc, ParallelApplyWorkerInfo::shared, shm_mq_detach(), SIGUSR2, SpinLockAcquire, SpinLockRelease, and LogicalRepCtxStruct::workers.

Referenced by pa_free_worker().

◆ logicalrep_reset_seqsync_start_time()

void logicalrep_reset_seqsync_start_time ( void  )

Definition at line 872 of file launcher.c.

873{
874 LogicalRepWorker *worker;
875
876 /*
877 * The apply worker can't access last_seqsync_start_time concurrently, so
878 * it is okay to use SHARED lock here. See ProcessSequencesForSync().
879 */
880 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
881
884 true);
885 if (worker)
886 worker->last_seqsync_start_time = 0;
887
888 LWLockRelease(LogicalRepWorkerLock);
889}
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition: launcher.c:258
TimestampTz last_seqsync_start_time

References InvalidOid, LogicalRepWorker::last_seqsync_start_time, logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLogicalRepWorker, LogicalRepWorker::subid, and WORKERTYPE_APPLY.

Referenced by FinishSyncWorker().

◆ logicalrep_sync_worker_count()

int logicalrep_sync_worker_count ( Oid  subid)

Definition at line 927 of file launcher.c.

928{
929 int i;
930 int res = 0;
931
932 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
933
934 /* Search for attached worker for a given subscription id. */
935 for (i = 0; i < max_logical_replication_workers; i++)
936 {
938
939 if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
940 res++;
941 }
942
943 return res;
944}
int i
Definition: isn.c:77
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1977

References Assert(), i, isSequenceSyncWorker, isTableSyncWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by logicalrep_worker_launch(), ProcessSequencesForSync(), and ProcessSyncingTablesForApply().

◆ logicalrep_worker_attach()

void logicalrep_worker_attach ( int  slot)

Definition at line 757 of file launcher.c.

758{
759 /* Block concurrent access. */
760 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
761
762 Assert(slot >= 0 && slot < max_logical_replication_workers);
764
766 {
767 LWLockRelease(LogicalRepWorkerLock);
769 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
770 errmsg("logical replication worker slot %d is empty, cannot attach",
771 slot)));
772 }
773
775 {
776 LWLockRelease(LogicalRepWorkerLock);
778 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
779 errmsg("logical replication worker slot %d is already used by "
780 "another worker, cannot attach", slot)));
781 }
782
785
786 LWLockRelease(LogicalRepWorkerLock);
787}
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:897
@ LW_EXCLUSIVE
Definition: lwlock.h:112
PGPROC * MyProc
Definition: proc.c:67

References Assert(), before_shmem_exit(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::in_use, logicalrep_worker_onexit(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyLogicalRepWorker, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ logicalrep_worker_find()

LogicalRepWorker * logicalrep_worker_find ( LogicalRepWorkerType  wtype,
Oid  subid,
Oid  relid,
bool  only_running 
)

Definition at line 258 of file launcher.c.

260{
261 int i;
262 LogicalRepWorker *res = NULL;
263
264 /* relid must be valid only for table sync workers */
265 Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
266 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
267
268 /* Search for an attached worker that matches the specified criteria. */
269 for (i = 0; i < max_logical_replication_workers; i++)
270 {
272
273 /* Skip parallel apply workers. */
275 continue;
276
277 if (w->in_use && w->subid == subid && w->relid == relid &&
278 w->type == wtype && (!only_running || w->proc))
279 {
280 res = w;
281 break;
282 }
283 }
284
285 return res;
286}

References Assert(), i, LogicalRepWorker::in_use, isParallelApplyWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, OidIsValid, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::subid, LogicalRepWorker::type, LogicalRepCtxStruct::workers, and WORKERTYPE_TABLESYNC.

Referenced by ApplyLauncherMain(), FindDeletedTupleInLocalRel(), logicalrep_reset_seqsync_start_time(), logicalrep_worker_stop(), logicalrep_worker_wakeup(), ProcessSequencesForSync(), ProcessSyncingTablesForApply(), wait_for_table_state_change(), and wait_for_worker_state_change().

◆ logicalrep_worker_launch()

bool logicalrep_worker_launch ( LogicalRepWorkerType  wtype,
Oid  dbid,
Oid  subid,
const char *  subname,
Oid  userid,
Oid  relid,
dsm_handle  subworker_dsm,
bool  retain_dead_tuples 
)

Definition at line 324 of file launcher.c.

328{
330 BackgroundWorkerHandle *bgw_handle;
331 uint16 generation;
332 int i;
333 int slot = 0;
334 LogicalRepWorker *worker = NULL;
335 int nsyncworkers;
336 int nparallelapplyworkers;
338 bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
339 bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
340 bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
341
342 /*----------
343 * Sanity checks:
344 * - must be valid worker type
345 * - tablesync workers are only ones to have relid
346 * - parallel apply worker is the only kind of subworker
347 * - The replication slot used in conflict detection is created when
348 * retain_dead_tuples is enabled
349 */
350 Assert(wtype != WORKERTYPE_UNKNOWN);
351 Assert(is_tablesync_worker == OidIsValid(relid));
352 Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
353 Assert(!retain_dead_tuples || MyReplicationSlot);
354
356 (errmsg_internal("starting logical replication worker for subscription \"%s\"",
357 subname)));
358
359 /* Report this after the initial starting message for consistency. */
362 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
363 errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
364
365 /*
366 * We need to do the modification of the shared memory under lock so that
367 * we have consistent view.
368 */
369 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
370
371retry:
372 /* Find unused worker slot. */
373 for (i = 0; i < max_logical_replication_workers; i++)
374 {
376
377 if (!w->in_use)
378 {
379 worker = w;
380 slot = i;
381 break;
382 }
383 }
384
385 nsyncworkers = logicalrep_sync_worker_count(subid);
386
388
389 /*
390 * If we didn't find a free slot, try to do garbage collection. The
391 * reason we do this is because if some worker failed to start up and its
392 * parent has crashed while waiting, the in_use state was never cleared.
393 */
394 if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
395 {
396 bool did_cleanup = false;
397
398 for (i = 0; i < max_logical_replication_workers; i++)
399 {
401
402 /*
403 * If the worker was marked in use but didn't manage to attach in
404 * time, clean it up.
405 */
406 if (w->in_use && !w->proc &&
409 {
411 "logical replication worker for subscription %u took too long to start; canceled",
412 w->subid);
413
415 did_cleanup = true;
416 }
417 }
418
419 if (did_cleanup)
420 goto retry;
421 }
422
423 /*
424 * We don't allow to invoke more sync workers once we have reached the
425 * sync worker limit per subscription. So, just return silently as we
426 * might get here because of an otherwise harmless race condition.
427 */
428 if ((is_tablesync_worker || is_sequencesync_worker) &&
429 nsyncworkers >= max_sync_workers_per_subscription)
430 {
431 LWLockRelease(LogicalRepWorkerLock);
432 return false;
433 }
434
435 nparallelapplyworkers = logicalrep_pa_worker_count(subid);
436
437 /*
438 * Return false if the number of parallel apply workers reached the limit
439 * per subscription.
440 */
441 if (is_parallel_apply_worker &&
442 nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
443 {
444 LWLockRelease(LogicalRepWorkerLock);
445 return false;
446 }
447
448 /*
449 * However if there are no more free worker slots, inform user about it
450 * before exiting.
451 */
452 if (worker == NULL)
453 {
454 LWLockRelease(LogicalRepWorkerLock);
456 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
457 errmsg("out of logical replication worker slots"),
458 errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
459 return false;
460 }
461
462 /* Prepare the worker slot. */
463 worker->type = wtype;
464 worker->launch_time = now;
465 worker->in_use = true;
466 worker->generation++;
467 worker->proc = NULL;
468 worker->dbid = dbid;
469 worker->userid = userid;
470 worker->subid = subid;
471 worker->relid = relid;
472 worker->relstate = SUBREL_STATE_UNKNOWN;
474 worker->stream_fileset = NULL;
475 worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
476 worker->parallel_apply = is_parallel_apply_worker;
477 worker->oldest_nonremovable_xid = retain_dead_tuples
480 worker->last_lsn = InvalidXLogRecPtr;
485 worker->last_seqsync_start_time = 0;
486
487 /* Before releasing lock, remember generation for future identification. */
488 generation = worker->generation;
489
490 LWLockRelease(LogicalRepWorkerLock);
491
492 /* Register the new dynamic worker. */
493 memset(&bgw, 0, sizeof(bgw));
497 snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
498
499 switch (worker->type)
500 {
501 case WORKERTYPE_APPLY:
502 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
504 "logical replication apply worker for subscription %u",
505 subid);
506 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
507 break;
508
510 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
512 "logical replication parallel apply worker for subscription %u",
513 subid);
514 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
515
516 memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
517 break;
518
520 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
522 "logical replication sequencesync worker for subscription %u",
523 subid);
524 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
525 break;
526
528 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
530 "logical replication tablesync worker for subscription %u sync %u",
531 subid,
532 relid);
533 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
534 break;
535
537 /* Should never happen. */
538 elog(ERROR, "unknown worker type");
539 }
540
543 bgw.bgw_main_arg = Int32GetDatum(slot);
544
545 if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
546 {
547 /* Failed to start worker, so clean up the worker slot. */
548 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
549 Assert(generation == worker->generation);
551 LWLockRelease(LogicalRepWorkerLock);
552
554 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
555 errmsg("out of background worker slots"),
556 errhint("You might need to increase \"%s\".", "max_worker_processes")));
557 return false;
558 }
559
560 /* Now wait until it attaches. */
561 return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
562}
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:1049
#define BGW_NEVER_RESTART
Definition: bgworker.h:85
@ BgWorkerStart_RecoveryFinished
Definition: bgworker.h:81
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:60
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:53
#define BGW_MAXLEN
Definition: bgworker.h:86
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:159
uint32 dsm_handle
Definition: dsm_impl.h:55
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
int errhint(const char *fmt,...)
Definition: elog.c:1330
int MyProcPid
Definition: globals.c:47
static int logicalrep_pa_worker_count(Oid subid)
Definition: launcher.c:951
static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
Definition: launcher.c:181
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:927
int max_parallel_apply_workers_per_subscription
Definition: launcher.c:54
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
Definition: launcher.c:838
#define InvalidPid
Definition: miscadmin.h:32
int max_active_replication_origins
Definition: origin.c:104
NameData subname
#define snprintf
Definition: port.h:260
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:222
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:97
Datum bgw_main_arg
Definition: bgworker.h:98
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:91
int bgw_restart_time
Definition: bgworker.h:95
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:92
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:94
char bgw_extra[BGW_EXTRALEN]
Definition: bgworker.h:99
pid_t bgw_notify_pid
Definition: bgworker.h:100
char bgw_library_name[MAXPGPATH]
Definition: bgworker.h:96
XLogRecPtr relstate_lsn
TimestampTz last_recv_time
TimestampTz launch_time
TimestampTz reply_time
FileSet * stream_fileset
XLogRecPtr reply_lsn
TimestampTz last_send_time
TransactionId xmin
Definition: slot.h:114
ReplicationSlotPersistentData data
Definition: slot.h:210
#define InvalidTransactionId
Definition: transam.h:31
int wal_receiver_timeout
Definition: walreceiver.c:89
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), BackgroundWorker::bgw_extra, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, ReplicationSlot::data, LogicalRepWorker::dbid, DEBUG1, DSM_HANDLE_INVALID, elog, ereport, errcode(), errhint(), errmsg(), errmsg_internal(), ERROR, LogicalRepWorker::generation, GetCurrentTimestamp(), i, LogicalRepWorker::in_use, Int32GetDatum(), InvalidPid, InvalidTransactionId, InvalidXLogRecPtr, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::last_seqsync_start_time, LogicalRepWorker::launch_time, LogicalRepWorker::leader_pid, logicalrep_pa_worker_count(), logicalrep_sync_worker_count(), logicalrep_worker_cleanup(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, max_logical_replication_workers, max_parallel_apply_workers_per_subscription, max_sync_workers_per_subscription, MAXPGPATH, MyProcPid, MyReplicationSlot, now(), OidIsValid, LogicalRepWorker::oldest_nonremovable_xid, LogicalRepWorker::parallel_apply, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), LogicalRepWorker::relid, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, snprintf, LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subname, TIMESTAMP_NOBEGIN, TimestampDifferenceExceeds(), LogicalRepWorker::type, LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), wal_receiver_timeout, WARNING, LogicalRepCtxStruct::workers, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_SEQUENCESYNC, WORKERTYPE_TABLESYNC, WORKERTYPE_UNKNOWN, and ReplicationSlotPersistentData::xmin.

Referenced by ApplyLauncherMain(), launch_sync_worker(), and pa_launch_parallel_worker().

◆ logicalrep_worker_stop()

void logicalrep_worker_stop ( LogicalRepWorkerType  wtype,
Oid  subid,
Oid  relid 
)

Definition at line 652 of file launcher.c.

653{
654 LogicalRepWorker *worker;
655
656 /* relid must be valid only for table sync workers */
657 Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
658
659 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
660
661 worker = logicalrep_worker_find(wtype, subid, relid, false);
662
663 if (worker)
664 {
666 logicalrep_worker_stop_internal(worker, SIGTERM);
667 }
668
669 LWLockRelease(LogicalRepWorkerLock);
670}

References Assert(), isParallelApplyWorker, logicalrep_worker_find(), logicalrep_worker_stop_internal(), LW_SHARED, LWLockAcquire(), LWLockRelease(), OidIsValid, and WORKERTYPE_TABLESYNC.

Referenced by AlterSubscription_refresh(), and DropSubscription().

◆ logicalrep_worker_wakeup()

void logicalrep_worker_wakeup ( LogicalRepWorkerType  wtype,
Oid  subid,
Oid  relid 
)

Definition at line 723 of file launcher.c.

724{
725 LogicalRepWorker *worker;
726
727 /* relid must be valid only for table sync workers */
728 Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
729
730 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
731
732 worker = logicalrep_worker_find(wtype, subid, relid, true);
733
734 if (worker)
736
737 LWLockRelease(LogicalRepWorkerLock);
738}
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:746

References Assert(), logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), LWLockRelease(), OidIsValid, and WORKERTYPE_TABLESYNC.

Referenced by apply_handle_stream_start(), and FinishSyncWorker().

◆ logicalrep_worker_wakeup_ptr()

void logicalrep_worker_wakeup_ptr ( LogicalRepWorker worker)

Definition at line 746 of file launcher.c.

747{
748 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
749
750 SetLatch(&worker->proc->procLatch);
751}
void SetLatch(Latch *latch)
Definition: latch.c:290
Latch procLatch
Definition: proc.h:186

References Assert(), LWLockHeldByMe(), LogicalRepWorker::proc, PGPROC::procLatch, and SetLatch().

Referenced by AtEOXact_LogicalRepWorkers(), logicalrep_worker_wakeup(), ProcessSyncingTablesForApply(), and wait_for_worker_state_change().

◆ logicalrep_workers_find()

List * logicalrep_workers_find ( Oid  subid,
bool  only_running,
bool  acquire_lock 
)

Definition at line 293 of file launcher.c.

294{
295 int i;
296 List *res = NIL;
297
298 if (acquire_lock)
299 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
300
301 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
302
303 /* Search for attached worker for a given subscription id. */
304 for (i = 0; i < max_logical_replication_workers; i++)
305 {
307
308 if (w->in_use && w->subid == subid && (!only_running || w->proc))
309 res = lappend(res, w);
310 }
311
312 if (acquire_lock)
313 LWLockRelease(LogicalRepWorkerLock);
314
315 return res;
316}

References Assert(), i, LogicalRepWorker::in_use, lappend(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockHeldByMe(), LWLockRelease(), max_logical_replication_workers, NIL, LogicalRepWorker::proc, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by AlterSubscription(), AtEOXact_LogicalRepWorkers(), DropSubscription(), and logicalrep_worker_detach().

◆ maybe_reread_subscription()

void maybe_reread_subscription ( void  )

Definition at line 5038 of file worker.c.

5039{
5040 MemoryContext oldctx;
5042 bool started_tx = false;
5043
5044 /* When cache state is valid there is nothing to do here. */
5046 return;
5047
5048 /* This function might be called inside or outside of transaction. */
5049 if (!IsTransactionState())
5050 {
5052 started_tx = true;
5053 }
5054
5055 /* Ensure allocations in permanent context. */
5057
5059
5060 /*
5061 * Exit if the subscription was removed. This normally should not happen
5062 * as the worker gets killed during DROP SUBSCRIPTION.
5063 */
5064 if (!newsub)
5065 {
5066 ereport(LOG,
5067 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5068 MySubscription->name)));
5069
5070 /* Ensure we remove no-longer-useful entry for worker's start time */
5073
5074 proc_exit(0);
5075 }
5076
5077 /* Exit if the subscription was disabled. */
5078 if (!newsub->enabled)
5079 {
5080 ereport(LOG,
5081 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5082 MySubscription->name)));
5083
5085 }
5086
5087 /* !slotname should never happen when enabled is true. */
5088 Assert(newsub->slotname);
5089
5090 /* two-phase cannot be altered while the worker is running */
5091 Assert(newsub->twophasestate == MySubscription->twophasestate);
5092
5093 /*
5094 * Exit if any parameter that affects the remote connection was changed.
5095 * The launcher will start a new worker but note that the parallel apply
5096 * worker won't restart if the streaming option's value is changed from
5097 * 'parallel' to any other value or the server decides not to stream the
5098 * in-progress transaction.
5099 */
5100 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
5101 strcmp(newsub->name, MySubscription->name) != 0 ||
5102 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
5103 newsub->binary != MySubscription->binary ||
5104 newsub->stream != MySubscription->stream ||
5105 newsub->passwordrequired != MySubscription->passwordrequired ||
5106 strcmp(newsub->origin, MySubscription->origin) != 0 ||
5107 newsub->owner != MySubscription->owner ||
5108 !equal(newsub->publications, MySubscription->publications))
5109 {
5111 ereport(LOG,
5112 (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5113 MySubscription->name)));
5114 else
5115 ereport(LOG,
5116 (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
5117 MySubscription->name)));
5118
5120 }
5121
5122 /*
5123 * Exit if the subscription owner's superuser privileges have been
5124 * revoked.
5125 */
5126 if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
5127 {
5129 ereport(LOG,
5130 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5132 else
5133 ereport(LOG,
5134 errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5136
5138 }
5139
5140 /* Check for other changes that should never happen too. */
5141 if (newsub->dbid != MySubscription->dbid)
5142 {
5143 elog(ERROR, "subscription %u changed unexpectedly",
5145 }
5146
5147 /* Clean old subscription info and switch to new one. */
5150
5151 MemoryContextSwitchTo(oldctx);
5152
5153 /* Change synchronous commit according to the user's wishes */
5154 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5156
5157 if (started_tx)
5159
5160 MySubscriptionValid = true;
5161}
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void FreeSubscription(Subscription *sub)
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

References am_leader_apply_worker(), am_parallel_apply_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), Assert(), Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::origin, Subscription::owner, Subscription::ownersuperuser, Subscription::passwordrequired, PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, Subscription::synccommit, and Subscription::twophasestate.

Referenced by apply_handle_commit_internal(), begin_replication_step(), LogicalRepApplyLoop(), and pa_can_start().

◆ pa_allocate_worker()

void pa_allocate_worker ( TransactionId  xid)

Definition at line 471 of file applyparallelworker.c.

472{
473 bool found;
474 ParallelApplyWorkerInfo *winfo = NULL;
476
477 if (!pa_can_start())
478 return;
479
481 if (!winfo)
482 return;
483
484 /* First time through, initialize parallel apply worker state hashtable. */
486 {
487 HASHCTL ctl;
488
489 MemSet(&ctl, 0, sizeof(ctl));
490 ctl.keysize = sizeof(TransactionId);
491 ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
492 ctl.hcxt = ApplyContext;
493
494 ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
495 16, &ctl,
497 }
498
499 /* Create an entry for the requested transaction. */
500 entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
501 if (found)
502 elog(ERROR, "hash table corrupted");
503
504 /* Update the transaction information in shared memory. */
505 SpinLockAcquire(&winfo->shared->mutex);
507 winfo->shared->xid = xid;
508 SpinLockRelease(&winfo->shared->mutex);
509
510 winfo->in_use = true;
511 winfo->serialize_changes = false;
512 entry->winfo = winfo;
513}
struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
static bool pa_can_start(void)
static HTAB * ParallelApplyTxnHash
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
#define MemSet(start, val, len)
Definition: c.h:1022
uint32 TransactionId
Definition: c.h:660
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:358
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
tree ctl
Definition: radixtree.h:1838
ParallelApplyWorkerInfo * winfo
ParallelTransState xact_state

References ApplyContext, ctl, elog, ERROR, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ParallelApplyWorkerInfo::in_use, MemSet, ParallelApplyWorkerShared::mutex, pa_can_start(), pa_launch_parallel_worker(), PARALLEL_TRANS_UNKNOWN, ParallelApplyTxnHash, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, SpinLockAcquire, SpinLockRelease, ParallelApplyWorkerEntry::winfo, ParallelApplyWorkerShared::xact_state, and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_start().

◆ pa_decr_and_wait_stream_block()

void pa_decr_and_wait_stream_block ( void  )

Definition at line 1599 of file applyparallelworker.c.

1600{
1602
1603 /*
1604 * It is only possible to not have any pending stream chunks when we are
1605 * applying spooled messages.
1606 */
1608 {
1610 return;
1611
1612 elog(ERROR, "invalid pending streaming chunk 0");
1613 }
1614
1616 {
1619 }
1620}
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
static bool pa_has_spooled_message_pending(void)
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition: atomics.h:437
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition: atomics.h:237
pg_atomic_uint32 pending_stream_count

References AccessShareLock, am_parallel_apply_worker(), Assert(), elog, ERROR, MyParallelShared, pa_has_spooled_message_pending(), pa_lock_stream(), pa_unlock_stream(), ParallelApplyWorkerShared::pending_stream_count, pg_atomic_read_u32(), pg_atomic_sub_fetch_u32(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), and apply_handle_stream_stop().

◆ pa_detach_all_error_mq()

void pa_detach_all_error_mq ( void  )

Definition at line 623 of file applyparallelworker.c.

624{
625 ListCell *lc;
626
627 foreach(lc, ParallelApplyWorkerPool)
628 {
630
631 if (winfo->error_mq_handle)
632 {
634 winfo->error_mq_handle = NULL;
635 }
636 }
637}
static List * ParallelApplyWorkerPool
#define lfirst(lc)
Definition: pg_list.h:172

References ParallelApplyWorkerInfo::error_mq_handle, lfirst, ParallelApplyWorkerPool, and shm_mq_detach().

Referenced by logicalrep_worker_detach().

◆ pa_find_worker()

ParallelApplyWorkerInfo * pa_find_worker ( TransactionId  xid)

Definition at line 519 of file applyparallelworker.c.

520{
521 bool found;
523
524 if (!TransactionIdIsValid(xid))
525 return NULL;
526
528 return NULL;
529
530 /* Return the cached parallel apply worker if valid. */
532 return stream_apply_worker;
533
534 /* Find an entry for the requested transaction. */
535 entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
536 if (found)
537 {
538 /* The worker must not have exited. */
539 Assert(entry->winfo->in_use);
540 return entry->winfo;
541 }
542
543 return NULL;
544}
static ParallelApplyWorkerInfo * stream_apply_worker
@ HASH_FIND
Definition: hsearch.h:113

References Assert(), HASH_FIND, hash_search(), ParallelApplyWorkerInfo::in_use, ParallelApplyTxnHash, stream_apply_worker, TransactionIdIsValid, and ParallelApplyWorkerEntry::winfo.

Referenced by get_transaction_apply_action().

◆ pa_lock_stream()

void pa_lock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_lock_transaction()

void pa_lock_transaction ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_reset_subtrans()

void pa_reset_subtrans ( void  )

Definition at line 1410 of file applyparallelworker.c.

1411{
1412 /*
1413 * We don't need to free this explicitly as the allocated memory will be
1414 * freed at the transaction end.
1415 */
1416 subxactlist = NIL;
1417}
static List * subxactlist

References NIL, and subxactlist.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_stream_abort().

◆ pa_send_data()

bool pa_send_data ( ParallelApplyWorkerInfo winfo,
Size  nbytes,
const void *  data 
)

Definition at line 1154 of file applyparallelworker.c.

1155{
1156 int rc;
1157 shm_mq_result result;
1158 TimestampTz startTime = 0;
1159
1161 Assert(!winfo->serialize_changes);
1162
1163 /*
1164 * We don't try to send data to parallel worker for 'immediate' mode. This
1165 * is primarily used for testing purposes.
1166 */
1168 return false;
1169
1170/*
1171 * This timeout is a bit arbitrary but testing revealed that it is sufficient
1172 * to send the message unless the parallel apply worker is waiting on some
1173 * lock or there is a serious resource crunch. See the comments atop this file
1174 * to know why we are using a non-blocking way to send the message.
1175 */
1176#define SHM_SEND_RETRY_INTERVAL_MS 1000
1177#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1178
1179 for (;;)
1180 {
1181 result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
1182
1183 if (result == SHM_MQ_SUCCESS)
1184 return true;
1185 else if (result == SHM_MQ_DETACHED)
1186 ereport(ERROR,
1187 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1188 errmsg("could not send data to shared-memory queue")));
1189
1190 Assert(result == SHM_MQ_WOULD_BLOCK);
1191
1192 /* Wait before retrying. */
1193 rc = WaitLatch(MyLatch,
1196 WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
1197
1198 if (rc & WL_LATCH_SET)
1199 {
1202 }
1203
1204 if (startTime == 0)
1205 startTime = GetCurrentTimestamp();
1206 else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
1208 return false;
1209 }
1210}
#define SHM_SEND_TIMEOUT_MS
#define SHM_SEND_RETRY_INTERVAL_MS
#define unlikely(x)
Definition: c.h:407
struct Latch * MyLatch
Definition: globals.c:63
void ResetLatch(Latch *latch)
Definition: latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
const void * data
int debug_logical_replication_streaming
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:34
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:329
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40
shm_mq_handle * mq_handle
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34

References Assert(), CHECK_FOR_INTERRUPTS, data, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, ereport, errcode(), errmsg(), ERROR, GetCurrentTimestamp(), IsTransactionState(), ParallelApplyWorkerInfo::mq_handle, MyLatch, ResetLatch(), ParallelApplyWorkerInfo::serialize_changes, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_SEND_RETRY_INTERVAL_MS, SHM_SEND_TIMEOUT_MS, TimestampDifferenceExceeds(), unlikely, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ pa_set_fileset_state()

◆ pa_set_stream_apply_worker()

void pa_set_stream_apply_worker ( ParallelApplyWorkerInfo winfo)

Definition at line 1342 of file applyparallelworker.c.

1343{
1344 stream_apply_worker = winfo;
1345}

References stream_apply_worker.

Referenced by apply_handle_stream_start(), and apply_handle_stream_stop().

◆ pa_set_xact_state()

void pa_set_xact_state ( ParallelApplyWorkerShared wshared,
ParallelTransState  xact_state 
)

◆ pa_start_subtrans()

void pa_start_subtrans ( TransactionId  current_xid,
TransactionId  top_xid 
)

Definition at line 1370 of file applyparallelworker.c.

1371{
1372 if (current_xid != top_xid &&
1373 !list_member_xid(subxactlist, current_xid))
1374 {
1375 MemoryContext oldctx;
1376 char spname[NAMEDATALEN];
1377
1378 pa_savepoint_name(MySubscription->oid, current_xid,
1379 spname, sizeof(spname));
1380
1381 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1382
1383 /* We must be in transaction block to define the SAVEPOINT. */
1384 if (!IsTransactionBlock())
1385 {
1386 if (!IsTransactionState())
1388
1391 }
1392
1393 DefineSavepoint(spname);
1394
1395 /*
1396 * CommitTransactionCommand is needed to start a subtransaction after
1397 * issuing a SAVEPOINT inside a transaction block (see
1398 * StartSubTransaction()).
1399 */
1401
1403 subxactlist = lappend_xid(subxactlist, current_xid);
1404 MemoryContextSwitchTo(oldctx);
1405 }
1406}
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:393
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:742
#define NAMEDATALEN
void DefineSavepoint(const char *name)
Definition: xact.c:4391
bool IsTransactionBlock(void)
Definition: xact.c:4989
void BeginTransactionBlock(void)
Definition: xact.c:3942

References BeginTransactionBlock(), CommitTransactionCommand(), DEBUG1, DefineSavepoint(), elog, IsTransactionBlock(), IsTransactionState(), lappend_xid(), list_member_xid(), MemoryContextSwitchTo(), MySubscription, NAMEDATALEN, Subscription::oid, pa_savepoint_name(), StartTransactionCommand(), subxactlist, and TopTransactionContext.

Referenced by handle_streamed_transaction().

◆ pa_stream_abort()

void pa_stream_abort ( LogicalRepStreamAbortData abort_data)

Definition at line 1424 of file applyparallelworker.c.

1425{
1426 TransactionId xid = abort_data->xid;
1427 TransactionId subxid = abort_data->subxid;
1428
1429 /*
1430 * Update origin state so we can restart streaming from correct position
1431 * in case of crash.
1432 */
1435
1436 /*
1437 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1438 * just free the subxactlist.
1439 */
1440 if (subxid == xid)
1441 {
1443
1444 /*
1445 * Release the lock as we might be processing an empty streaming
1446 * transaction in which case the lock won't be released during
1447 * transaction rollback.
1448 *
1449 * Note that it's ok to release the transaction lock before aborting
1450 * the transaction because even if the parallel apply worker dies due
1451 * to crash or some other reason, such a transaction would still be
1452 * considered aborted.
1453 */
1455
1457
1458 if (IsTransactionBlock())
1459 {
1460 EndTransactionBlock(false);
1462 }
1463
1465
1467 }
1468 else
1469 {
1470 /* OK, so it's a subxact. Rollback to the savepoint. */
1471 int i;
1472 char spname[NAMEDATALEN];
1473
1474 pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
1475
1476 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1477
1478 /*
1479 * Search the subxactlist, determine the offset tracked for the
1480 * subxact, and truncate the list.
1481 *
1482 * Note that for an empty sub-transaction we won't find the subxid
1483 * here.
1484 */
1485 for (i = list_length(subxactlist) - 1; i >= 0; i--)
1486 {
1488
1489 if (xid_tmp == subxid)
1490 {
1491 RollbackToSavepoint(spname);
1494 break;
1495 }
1496 }
1497 }
1498}
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
@ STATE_IDLE
List * list_truncate(List *list, int new_size)
Definition: list.c:631
#define AccessExclusiveLock
Definition: lockdefs.h:43
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:165
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:164
static int list_length(const List *l)
Definition: pg_list.h:152
static ListCell * list_nth_cell(const List *list, int n)
Definition: pg_list.h:277
#define lfirst_xid(lc)
Definition: pg_list.h:175
void RollbackToSavepoint(const char *name)
Definition: xact.c:4585
bool EndTransactionBlock(bool chain)
Definition: xact.c:4062
void AbortCurrentTransaction(void)
Definition: xact.c:3469

References LogicalRepStreamAbortData::abort_lsn, LogicalRepStreamAbortData::abort_time, AbortCurrentTransaction(), AccessExclusiveLock, CommitTransactionCommand(), DEBUG1, elog, EndTransactionBlock(), i, IsTransactionBlock(), lfirst_xid, list_length(), list_nth_cell(), list_truncate(), MyParallelShared, MySubscription, NAMEDATALEN, Subscription::oid, pa_reset_subtrans(), pa_savepoint_name(), pa_set_xact_state(), pa_unlock_transaction(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, RollbackToSavepoint(), STATE_IDLE, subxactlist, LogicalRepStreamAbortData::subxid, and LogicalRepStreamAbortData::xid.

Referenced by apply_handle_stream_abort().

◆ pa_switch_to_partial_serialize()

void pa_switch_to_partial_serialize ( ParallelApplyWorkerInfo winfo,
bool  stream_locked 
)

Definition at line 1219 of file applyparallelworker.c.

1221{
1222 ereport(LOG,
1223 (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1224 winfo->shared->xid)));
1225
1226 /*
1227 * The parallel apply worker could be stuck for some reason (say waiting
1228 * on some lock by other backend), so stop trying to send data directly to
1229 * it and start serializing data to the file instead.
1230 */
1231 winfo->serialize_changes = true;
1232
1233 /* Initialize the stream fileset. */
1234 stream_start_internal(winfo->shared->xid, true);
1235
1236 /*
1237 * Acquires the stream lock if not already to make sure that the parallel
1238 * apply worker will wait for the leader to release the stream lock until
1239 * the end of the transaction.
1240 */
1241 if (!stream_locked)
1243
1245}
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1687

References AccessExclusiveLock, ereport, errmsg(), FS_SERIALIZE_IN_PROGRESS, LOG, pa_lock_stream(), pa_set_fileset_state(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, stream_start_internal(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ pa_unlock_stream()

void pa_unlock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_unlock_transaction()

◆ pa_xact_finish()

void pa_xact_finish ( ParallelApplyWorkerInfo winfo,
XLogRecPtr  remote_lsn 
)

Definition at line 1626 of file applyparallelworker.c.

1627{
1629
1630 /*
1631 * Unlock the shared object lock so that parallel apply worker can
1632 * continue to receive and apply changes.
1633 */
1635
1636 /*
1637 * Wait for that worker to finish. This is necessary to maintain commit
1638 * order which avoids failures due to transaction dependencies and
1639 * deadlocks.
1640 */
1642
1643 if (XLogRecPtrIsValid(remote_lsn))
1644 store_flush_position(remote_lsn, winfo->shared->last_commit_end);
1645
1646 pa_free_worker(winfo);
1647}
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3939

References AccessExclusiveLock, am_leader_apply_worker(), Assert(), ParallelApplyWorkerShared::last_commit_end, pa_free_worker(), pa_unlock_stream(), pa_wait_for_xact_finish(), ParallelApplyWorkerInfo::shared, store_flush_position(), ParallelApplyWorkerShared::xid, and XLogRecPtrIsValid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), and apply_handle_stream_prepare().

◆ ProcessSequencesForSync()

void ProcessSequencesForSync ( void  )

Definition at line 94 of file sequencesync.c.

95{
96 LogicalRepWorker *sequencesync_worker;
97 int nsyncworkers;
98 bool has_pending_sequences;
99 bool started_tx;
100
101 FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
102
103 if (started_tx)
104 {
106 pgstat_report_stat(true);
107 }
108
109 if (!has_pending_sequences)
110 return;
111
112 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
113
114 /* Check if there is a sequencesync worker already running? */
117 InvalidOid, true);
118 if (sequencesync_worker)
119 {
120 LWLockRelease(LogicalRepWorkerLock);
121 return;
122 }
123
124 /*
125 * Count running sync workers for this subscription, while we have the
126 * lock.
127 */
129 LWLockRelease(LogicalRepWorkerLock);
130
131 /*
132 * It is okay to read/update last_seqsync_start_time here in apply worker
133 * as we have already ensured that sync worker doesn't exist.
134 */
137}
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
Definition: syncutils.c:117

References CommitTransactionCommand(), FetchRelationStates(), InvalidOid, LogicalRepWorker::last_seqsync_start_time, launch_sync_worker(), logicalrep_sync_worker_count(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLogicalRepWorker, pgstat_report_stat(), LogicalRepWorker::subid, and WORKERTYPE_SEQUENCESYNC.

Referenced by ProcessSyncingRelations().

◆ ProcessSyncingRelations()

void ProcessSyncingRelations ( XLogRecPtr  current_lsn)

Definition at line 155 of file syncutils.c.

156{
157 switch (MyLogicalRepWorker->type)
158 {
160
161 /*
162 * Skip for parallel apply workers because they only operate on
163 * tables that are in a READY state. See pa_can_start() and
164 * should_apply_changes_for_rel().
165 */
166 break;
167
169 ProcessSyncingTablesForSync(current_lsn);
170 break;
171
172 case WORKERTYPE_APPLY:
173 ProcessSyncingTablesForApply(current_lsn);
175 break;
176
178 /* Should never happen. */
179 elog(ERROR, "sequence synchronization worker is not expected to process relations");
180 break;
181
183 /* Should never happen. */
184 elog(ERROR, "Unknown worker type");
185 }
186}
void ProcessSequencesForSync(void)
Definition: sequencesync.c:94
void ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
Definition: tablesync.c:244
void ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
Definition: tablesync.c:368

References elog, ERROR, MyLogicalRepWorker, ProcessSequencesForSync(), ProcessSyncingTablesForApply(), ProcessSyncingTablesForSync(), LogicalRepWorker::type, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_SEQUENCESYNC, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.

Referenced by apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_commit(), apply_handle_stream_prepare(), and LogicalRepApplyLoop().

◆ ProcessSyncingTablesForApply()

void ProcessSyncingTablesForApply ( XLogRecPtr  current_lsn)

Definition at line 368 of file tablesync.c.

369{
370 struct tablesync_start_time_mapping
371 {
372 Oid relid;
373 TimestampTz last_start_time;
374 };
375 static HTAB *last_start_times = NULL;
376 ListCell *lc;
377 bool started_tx;
378 bool should_exit = false;
379 Relation rel = NULL;
380
382
383 /* We need up-to-date sync state info for subscription tables here. */
384 FetchRelationStates(NULL, NULL, &started_tx);
385
386 /*
387 * Prepare a hash table for tracking last start times of workers, to avoid
388 * immediate restarts. We don't need it if there are no tables that need
389 * syncing.
390 */
392 {
393 HASHCTL ctl;
394
395 ctl.keysize = sizeof(Oid);
396 ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
397 last_start_times = hash_create("Logical replication table sync worker start times",
398 256, &ctl, HASH_ELEM | HASH_BLOBS);
399 }
400
401 /*
402 * Clean up the hash table when we're done with all tables (just to
403 * release the bit of memory).
404 */
406 {
408 last_start_times = NULL;
409 }
410
411 /*
412 * Process all tables that are being synchronized.
413 */
414 foreach(lc, table_states_not_ready)
415 {
417
418 if (!started_tx)
419 {
421 started_tx = true;
422 }
423
424 Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);
425
426 if (rstate->state == SUBREL_STATE_SYNCDONE)
427 {
428 /*
429 * Apply has caught up to the position where the table sync has
430 * finished. Mark the table as ready so that the apply will just
431 * continue to replicate it normally.
432 */
433 if (current_lsn >= rstate->lsn)
434 {
435 char originname[NAMEDATALEN];
436
437 rstate->state = SUBREL_STATE_READY;
438 rstate->lsn = current_lsn;
439
440 /*
441 * Remove the tablesync origin tracking if exists.
442 *
443 * There is a chance that the user is concurrently performing
444 * refresh for the subscription where we remove the table
445 * state and its origin or the tablesync worker would have
446 * already removed this origin. We can't rely on tablesync
447 * worker to remove the origin tracking as if there is any
448 * error while dropping we won't restart it to drop the
449 * origin. So passing missing_ok = true.
450 *
451 * Lock the subscription and origin in the same order as we
452 * are doing during DDL commands to avoid deadlocks. See
453 * AlterSubscription_refresh.
454 */
455 LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
456 0, AccessShareLock);
457
458 if (!rel)
459 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
460
462 rstate->relid,
463 originname,
464 sizeof(originname));
465 replorigin_drop_by_name(originname, true, false);
466
467 /*
468 * Update the state to READY only after the origin cleanup.
469 */
471 rstate->relid, rstate->state,
472 rstate->lsn, true);
473 }
474 }
475 else
476 {
477 LogicalRepWorker *syncworker;
478
479 /*
480 * Look for a sync worker for this relation.
481 */
482 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
483
486 rstate->relid, false);
487
488 if (syncworker)
489 {
490 /* Found one, update our copy of its state */
491 SpinLockAcquire(&syncworker->relmutex);
492 rstate->state = syncworker->relstate;
493 rstate->lsn = syncworker->relstate_lsn;
494 if (rstate->state == SUBREL_STATE_SYNCWAIT)
495 {
496 /*
497 * Sync worker is waiting for apply. Tell sync worker it
498 * can catchup now.
499 */
500 syncworker->relstate = SUBREL_STATE_CATCHUP;
501 syncworker->relstate_lsn =
502 Max(syncworker->relstate_lsn, current_lsn);
503 }
504 SpinLockRelease(&syncworker->relmutex);
505
506 /* If we told worker to catch up, wait for it. */
507 if (rstate->state == SUBREL_STATE_SYNCWAIT)
508 {
509 /* Signal the sync worker, as it may be waiting for us. */
510 if (syncworker->proc)
512
513 /* Now safe to release the LWLock */
514 LWLockRelease(LogicalRepWorkerLock);
515
516 if (started_tx)
517 {
518 /*
519 * We must commit the existing transaction to release
520 * the existing locks before entering a busy loop.
521 * This is required to avoid any undetected deadlocks
522 * due to any existing lock as deadlock detector won't
523 * be able to detect the waits on the latch.
524 *
525 * Also close any tables prior to the commit.
526 */
527 if (rel)
528 {
529 table_close(rel, NoLock);
530 rel = NULL;
531 }
533 pgstat_report_stat(false);
534 }
535
536 /*
537 * Enter busy loop and wait for synchronization worker to
538 * reach expected state (or die trying).
539 */
541 started_tx = true;
542
544 SUBREL_STATE_SYNCDONE);
545 }
546 else
547 LWLockRelease(LogicalRepWorkerLock);
548 }
549 else
550 {
551 /*
552 * If there is no sync worker for this table yet, count
553 * running sync workers for this subscription, while we have
554 * the lock.
555 */
556 int nsyncworkers =
558 struct tablesync_start_time_mapping *hentry;
559 bool found;
560
561 /* Now safe to release the LWLock */
562 LWLockRelease(LogicalRepWorkerLock);
563
564 hentry = hash_search(last_start_times, &rstate->relid,
565 HASH_ENTER, &found);
566 if (!found)
567 hentry->last_start_time = 0;
568
570 rstate->relid, &hentry->last_start_time);
571 }
572 }
573 }
574
575 /* Close table if opened */
576 if (rel)
577 table_close(rel, NoLock);
578
579
580 if (started_tx)
581 {
582 /*
583 * Even when the two_phase mode is requested by the user, it remains
584 * as 'pending' until all tablesyncs have reached READY state.
585 *
586 * When this happens, we restart the apply worker and (if the
587 * conditions are still ok) then the two_phase tri-state will become
588 * 'enabled' at that time.
589 *
590 * Note: If the subscription has no tables then leave the state as
591 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
592 * work.
593 */
594 if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
595 {
596 CommandCounterIncrement(); /* make updates visible */
597 if (AllTablesyncsReady())
598 {
599 ereport(LOG,
600 (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
602 should_exit = true;
603 }
604 }
605
607 pgstat_report_stat(true);
608 }
609
610 if (should_exit)
611 {
612 /*
613 * Reset the last-start time for this worker so that the launcher will
614 * restart it without waiting for wal_retrieve_retry_interval.
615 */
617
618 proc_exit(0);
619 }
620}
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:641
#define Max(x, y)
Definition: c.h:1000
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
static dshash_table * last_start_times
Definition: launcher.c:91
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:439
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
unsigned int Oid
Definition: postgres_ext.h:32
Definition: dynahash.c:222
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
bool AllTablesyncsReady(void)
Definition: tablesync.c:1600
static bool wait_for_table_state_change(Oid relid, char expected_state)
Definition: tablesync.c:140
void CommandCounterIncrement(void)
Definition: xact.c:1101

References AccessShareLock, AllTablesyncsReady(), ApplyLauncherForgetWorkerStartTime(), Assert(), CommandCounterIncrement(), CommitTransactionCommand(), ctl, ereport, errmsg(), FetchRelationStates(), get_rel_relkind(), HASH_BLOBS, hash_create(), hash_destroy(), HASH_ELEM, HASH_ENTER, hash_search(), IsTransactionState(), last_start_times, launch_sync_worker(), lfirst, LockSharedObject(), LOG, logicalrep_sync_worker_count(), logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), SubscriptionRelState::lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), Max, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NIL, NoLock, Subscription::oid, pgstat_report_stat(), LogicalRepWorker::proc, proc_exit(), SubscriptionRelState::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), replorigin_drop_by_name(), RowExclusiveLock, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), SubscriptionRelState::state, LogicalRepWorker::subid, table_close(), table_open(), table_states_not_ready, Subscription::twophasestate, UpdateSubscriptionRelState(), wait_for_table_state_change(), and WORKERTYPE_TABLESYNC.

Referenced by ProcessSyncingRelations().

◆ ProcessSyncingTablesForSync()

void ProcessSyncingTablesForSync ( XLogRecPtr  current_lsn)

Definition at line 244 of file tablesync.c.

245{
247
248 if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
249 current_lsn >= MyLogicalRepWorker->relstate_lsn)
250 {
251 TimeLineID tli;
252 char syncslotname[NAMEDATALEN] = {0};
253 char originname[NAMEDATALEN] = {0};
254
255 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
256 MyLogicalRepWorker->relstate_lsn = current_lsn;
257
259
260 /*
261 * UpdateSubscriptionRelState must be called within a transaction.
262 */
263 if (!IsTransactionState())
265
270 false);
271
272 /*
273 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
274 * the slot.
275 */
277
278 /*
279 * Cleanup the tablesync slot.
280 *
281 * This has to be done after updating the state because otherwise if
282 * there is an error while doing the database operations we won't be
283 * able to rollback dropped slot.
284 */
287 syncslotname,
288 sizeof(syncslotname));
289
290 /*
291 * It is important to give an error if we are unable to drop the slot,
292 * otherwise, it won't be dropped till the corresponding subscription
293 * is dropped. So passing missing_ok = false.
294 */
296
298 pgstat_report_stat(false);
299
300 /*
301 * Start a new transaction to clean up the tablesync origin tracking.
302 * This transaction will be ended within the FinishSyncWorker(). Now,
303 * even, if we fail to remove this here, the apply worker will ensure
304 * to clean it up afterward.
305 *
306 * We need to do this after the table state is set to SYNCDONE.
307 * Otherwise, if an error occurs while performing the database
308 * operation, the worker will be restarted and the in-memory state of
309 * replication progress (remote_lsn) won't be rolled-back which would
310 * have been cleared before restart. So, the restarted worker will use
311 * invalid replication progress state resulting in replay of
312 * transactions that have already been applied.
313 */
315
318 originname,
319 sizeof(originname));
320
321 /*
322 * Resetting the origin session removes the ownership of the slot.
323 * This is needed to allow the origin to be dropped.
324 */
329
330 /*
331 * Drop the tablesync's origin tracking if exists.
332 *
333 * There is a chance that the user is concurrently performing refresh
334 * for the subscription where we remove the table state and its origin
335 * or the apply worker would have removed this origin. So passing
336 * missing_ok = true.
337 */
338 replorigin_drop_by_name(originname, true, false);
339
341 }
342 else
344}
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:477
void replorigin_session_reset(void)
Definition: origin.c:1225
RepOriginId replorigin_session_origin
Definition: origin.c:163
#define InvalidRepOriginId
Definition: origin.h:33
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
pg_noreturn void FinishSyncWorker(void)
Definition: syncutils.c:50
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1203
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
uint32 TimeLineID
Definition: xlogdefs.h:63

References CommitTransactionCommand(), FinishSyncWorker(), InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, NAMEDATALEN, pgstat_report_stat(), LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, replorigin_session_reset(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, UpdateSubscriptionRelState(), and walrcv_endstreaming.

Referenced by ProcessSyncingRelations().

◆ ReplicationOriginNameForLogicalRep()

void ReplicationOriginNameForLogicalRep ( Oid  suboid,
Oid  relid,
char *  originname,
Size  szoriginname 
)

Definition at line 641 of file worker.c.

643{
644 if (OidIsValid(relid))
645 {
646 /* Replication origin name for tablesync workers. */
647 snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
648 }
649 else
650 {
651 /* Replication origin name for non-tablesync workers. */
652 snprintf(originname, szoriginname, "pg_%u", suboid);
653 }
654}

References OidIsValid, and snprintf.

Referenced by AlterSubscription(), AlterSubscription_refresh(), binary_upgrade_replorigin_advance(), CreateSubscription(), DropSubscription(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), ProcessSyncingTablesForApply(), ProcessSyncingTablesForSync(), run_apply_worker(), and run_tablesync_worker().

◆ set_apply_error_context_origin()

void set_apply_error_context_origin ( char *  originname)

Definition at line 6301 of file worker.c.

6302{
6304 originname);
6305}
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1746

References apply_error_callback_arg, ApplyContext, MemoryContextStrdup(), and ApplyErrorCallbackArg::origin_name.

Referenced by ParallelApplyWorkerMain(), run_apply_worker(), and run_tablesync_worker().

◆ set_stream_options()

void set_stream_options ( WalRcvStreamOptions options,
char *  slotname,
XLogRecPtr origin_startpos 
)

Definition at line 5514 of file worker.c.

5517{
5518 int server_version;
5519
5520 options->logical = true;
5521 options->startpoint = *origin_startpos;
5522 options->slotname = slotname;
5523
5525 options->proto.logical.proto_version =
5530
5531 options->proto.logical.publication_names = MySubscription->publications;
5532 options->proto.logical.binary = MySubscription->binary;
5533
5534 /*
5535 * Assign the appropriate option value for streaming option according to
5536 * the 'streaming' mode and the publisher's ability to support that mode.
5537 */
5538 if (server_version >= 160000 &&
5539 MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
5540 {
5541 options->proto.logical.streaming_str = "parallel";
5543 }
5544 else if (server_version >= 140000 &&
5545 MySubscription->stream != LOGICALREP_STREAM_OFF)
5546 {
5547 options->proto.logical.streaming_str = "on";
5549 }
5550 else
5551 {
5552 options->proto.logical.streaming_str = NULL;
5554 }
5555
5556 options->proto.logical.twophase = false;
5557 options->proto.logical.origin = pstrdup(MySubscription->origin);
5558}
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:41
char * pstrdup(const char *in)
Definition: mcxt.c:1759
static int server_version
Definition: pg_dumpall.c:109
#define walrcv_server_version(conn)
Definition: walreceiver.h:447

References Subscription::binary, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, Subscription::origin, LogicalRepWorker::parallel_apply, pstrdup(), Subscription::publications, server_version, Subscription::stream, and walrcv_server_version.

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ SetupApplyOrSyncWorker()

void SetupApplyOrSyncWorker ( int  worker_slot)

Definition at line 5869 of file worker.c.

5870{
5871 /* Attach to slot */
5872 logicalrep_worker_attach(worker_slot);
5873
5875
5876 /* Setup signal handling */
5878 pqsignal(SIGTERM, die);
5880
5881 /*
5882 * We don't currently need any ResourceOwner in a walreceiver process, but
5883 * if we did, we could call CreateAuxProcessResourceOwner here.
5884 */
5885
5886 /* Initialise stats to a sanish value */
5889
5890 /* Load the libpq-specific functions */
5891 load_file("libpqwalreceiver", false);
5892
5894
5895 /*
5896 * Register a callback to reset the origin state before aborting any
5897 * pending transaction during shutdown (see ShutdownPostgres()). This will
5898 * avoid origin advancement for an in-complete transaction which could
5899 * otherwise lead to its loss as such a transaction won't be sent by the
5900 * server again.
5901 *
5902 * Note that even a LOG or DEBUG statement placed after setting the origin
5903 * state may process a shutdown signal before committing the current apply
5904 * operation. So, it is important to register such a callback here.
5905 */
5907
5908 /* Connect to the origin and start the replication. */
5909 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5911
5912 /*
5913 * Setup callback for syscache so that we know when something changes in
5914 * the subscription relation state.
5915 */
5916 CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
5918 (Datum) 0);
5919}
static void replorigin_reset(int code, Datum arg)
Definition: worker.c:5858
void InitializeLogRepWorker(void)
Definition: worker.c:5737
void BackgroundWorkerUnblockSignals(void)
Definition: bgworker.c:930
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:149
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void logicalrep_worker_attach(int slot)
Definition: launcher.c:757
#define die(msg)
#define pqsignal
Definition: port.h:552
void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
Definition: syncutils.c:101
#define SIGHUP
Definition: win32_port.h:158

References am_leader_apply_worker(), am_sequencesync_worker(), am_tablesync_worker(), Assert(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), CacheRegisterSyscacheCallback(), Subscription::conninfo, DEBUG1, die, elog, GetCurrentTimestamp(), InitializeLogRepWorker(), InvalidateSyncingRelStates(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), logicalrep_worker_attach(), MyLogicalRepWorker, MySubscription, pqsignal, replorigin_reset(), LogicalRepWorker::reply_time, SIGHUP, and SignalHandlerForConfigReload().

Referenced by ApplyWorkerMain(), SequenceSyncWorkerMain(), and TableSyncWorkerMain().

◆ start_apply()

void start_apply ( XLogRecPtr  origin_startpos)

Definition at line 5583 of file worker.c.

5584{
5585 PG_TRY();
5586 {
5587 LogicalRepApplyLoop(origin_startpos);
5588 }
5589 PG_CATCH();
5590 {
5591 /*
5592 * Reset the origin state to prevent the advancement of origin
5593 * progress if we fail to apply. Otherwise, this will result in
5594 * transaction loss as that transaction won't be sent again by the
5595 * server.
5596 */
5597 replorigin_reset(0, (Datum) 0);
5598
5601 else
5602 {
5603 /*
5604 * Report the worker failed while applying changes. Abort the
5605 * current transaction so that the stats message is sent in an
5606 * idle state.
5607 */
5611
5612 PG_RE_THROW();
5613 }
5614 }
5615 PG_END_TRY();
5616}
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:3981
void DisableSubscriptionAndExit(void)
Definition: worker.c:5943
#define PG_RE_THROW()
Definition: elog.h:405
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define PG_CATCH(...)
Definition: elog.h:382

References AbortOutOfAnyTransaction(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepApplyLoop(), MyLogicalRepWorker, MySubscription, Subscription::oid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgstat_report_subscription_error(), replorigin_reset(), and LogicalRepWorker::type.

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ store_flush_position()

void store_flush_position ( XLogRecPtr  remote_lsn,
XLogRecPtr  local_lsn 
)

Definition at line 3939 of file worker.c.

3940{
3941 FlushPosition *flushpos;
3942
3943 /*
3944 * Skip for parallel apply workers, because the lsn_mapping is maintained
3945 * by the leader apply worker.
3946 */
3948 return;
3949
3950 /* Need to do this in permanent context */
3952
3953 /* Track commit lsn */
3954 flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3955 flushpos->local_end = local_lsn;
3956 flushpos->remote_end = remote_lsn;
3957
3958 dlist_push_tail(&lsn_mapping, &flushpos->node);
3960}
static dlist_head lsn_mapping
Definition: worker.c:308
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
dlist_node node
Definition: worker.c:303
XLogRecPtr remote_end
Definition: worker.c:305
XLogRecPtr local_end
Definition: worker.c:304

References am_parallel_apply_worker(), ApplyContext, ApplyMessageContext, dlist_push_tail(), FlushPosition::local_end, lsn_mapping, MemoryContextSwitchTo(), FlushPosition::node, palloc(), and FlushPosition::remote_end.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_prepare(), and pa_xact_finish().

◆ stream_cleanup_files()

void stream_cleanup_files ( Oid  subid,
TransactionId  xid 
)

Definition at line 5381 of file worker.c.

5382{
5383 char path[MAXPGPATH];
5384
5385 /* Delete the changes file. */
5386 changes_filename(path, subid, xid);
5388
5389 /* Delete the subxact file, if it exists. */
5390 subxact_filename(path, subid, xid);
5392}
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:5360
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:364

References BufFileDeleteFileSet(), changes_filename(), MAXPGPATH, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and subxact_filename().

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_free_worker_info(), and stream_abort_internal().

◆ stream_start_internal()

void stream_start_internal ( TransactionId  xid,
bool  first_segment 
)

Definition at line 1687 of file worker.c.

1688{
1690
1691 /*
1692 * Initialize the worker's stream_fileset if we haven't yet. This will be
1693 * used for the entire duration of the worker so create it in a permanent
1694 * context. We create this on the very first streaming message from any
1695 * transaction and then use it for this and other streaming transactions.
1696 * Now, we could create a fileset at the start of the worker as well but
1697 * then we won't be sure that it will ever be used.
1698 */
1700 {
1701 MemoryContext oldctx;
1702
1704
1707
1708 MemoryContextSwitchTo(oldctx);
1709 }
1710
1711 /* Open the spool file for this transaction. */
1712 stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1713
1714 /* If this is not the first segment, open existing subxact file. */
1715 if (!first_segment)
1717
1719}
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
Definition: worker.c:5405
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:5231
void FileSetInit(FileSet *fileset)
Definition: fileset.c:52

References ApplyContext, begin_replication_step(), end_replication_step(), FileSetInit(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), LogicalRepWorker::stream_fileset, stream_open_file(), LogicalRepWorker::subid, and subxact_info_read().

Referenced by apply_handle_stream_start(), pa_switch_to_partial_serialize(), and stream_open_and_write_change().

◆ stream_stop_internal()

void stream_stop_internal ( TransactionId  xid)

Definition at line 1862 of file worker.c.

1863{
1864 /*
1865 * Serialize information about subxacts for the toplevel transaction, then
1866 * close the stream messages spool file.
1867 */
1870
1871 /* We must be in a valid transaction state */
1873
1874 /* Commit the per-stream transaction */
1876
1877 /* Reset per-stream context */
1879}
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:5182
static MemoryContext LogicalStreamingContext
Definition: worker.c:475

References Assert(), CommitTransactionCommand(), IsTransactionState(), LogicalStreamingContext, MemoryContextReset(), MyLogicalRepWorker, stream_close_file(), LogicalRepWorker::subid, and subxact_info_write().

Referenced by apply_handle_stream_stop(), and stream_open_and_write_change().

◆ UpdateTwoPhaseState()

void UpdateTwoPhaseState ( Oid  suboid,
char  new_state 
)

Definition at line 1651 of file tablesync.c.

1652{
1653 Relation rel;
1654 HeapTuple tup;
1655 bool nulls[Natts_pg_subscription];
1656 bool replaces[Natts_pg_subscription];
1657 Datum values[Natts_pg_subscription];
1658
1659 Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1660 new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1661 new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1662
1663 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1664 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1665 if (!HeapTupleIsValid(tup))
1666 elog(ERROR,
1667 "cache lookup failed for subscription oid %u",
1668 suboid);
1669
1670 /* Form a new tuple. */
1671 memset(values, 0, sizeof(values));
1672 memset(nulls, false, sizeof(nulls));
1673 memset(replaces, false, sizeof(replaces));
1674
1675 /* And update/set two_phase state */
1676 values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1677 replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1678
1679 tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1680 values, nulls, replaces);
1681 CatalogTupleUpdate(rel, &tup->t_self, tup);
1682
1683 heap_freetuple(tup);
1685}
static Datum values[MAXATTR]
Definition: bootstrap.c:153
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition: indexing.c:313
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
static Datum CharGetDatum(char X)
Definition: postgres.h:132
#define RelationGetDescr(relation)
Definition: rel.h:541
ItemPointerData t_self
Definition: htup.h:65
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91

References Assert(), CatalogTupleUpdate(), CharGetDatum(), elog, ERROR, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, HeapTupleData::t_self, table_close(), table_open(), and values.

Referenced by CreateSubscription(), and run_apply_worker().

Variable Documentation

◆ apply_error_context_stack

PGDLLIMPORT ErrorContextCallback* apply_error_context_stack
extern

Definition at line 469 of file worker.c.

Referenced by LogicalRepApplyLoop(), and ProcessParallelApplyMessage().

◆ ApplyContext

◆ ApplyMessageContext

◆ in_remote_transaction

◆ InitializingApplyWorker

PGDLLIMPORT bool InitializingApplyWorker
extern

Definition at line 499 of file worker.c.

Referenced by ApplyWorkerMain(), logicalrep_worker_onexit(), and ParallelApplyWorkerMain().

◆ LogRepWorkerWalRcvConn

◆ MyLogicalRepWorker

PGDLLIMPORT LogicalRepWorker* MyLogicalRepWorker
extern

◆ MyParallelShared

◆ MySubscription

◆ table_states_not_ready

PGDLLIMPORT List* table_states_not_ready
extern