PostgreSQL Source Code  git master
worker_internal.h
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * worker_internal.h
4  * Internal headers shared by logical replication workers.
5  *
6  * Portions Copyright (c) 2016-2023, PostgreSQL Global Development Group
7  *
8  * src/include/replication/worker_internal.h
9  *
10  *-------------------------------------------------------------------------
11  */
12 #ifndef WORKER_INTERNAL_H
13 #define WORKER_INTERNAL_H
14 
15 #include <signal.h>
16 
17 #include "access/xlogdefs.h"
19 #include "datatype/timestamp.h"
20 #include "miscadmin.h"
23 #include "storage/buffile.h"
24 #include "storage/fileset.h"
25 #include "storage/lock.h"
26 #include "storage/shm_mq.h"
27 #include "storage/shm_toc.h"
28 #include "storage/spin.h"
29 
30 /* Different types of worker */
32 {
38 
39 typedef struct LogicalRepWorker
40 {
41  /* What type of worker is this? */
43 
44  /* Time at which this worker was launched. */
46 
47  /* Indicates if this slot is used or free. */
48  bool in_use;
49 
50  /* Increased every time the slot is taken by new worker. */
52 
53  /* Pointer to proc array. NULL if not running. */
55 
56  /* Database id to connect to. */
58 
59  /* User to use for connection (will be same as owner of subscription). */
61 
62  /* Subscription id for the worker. */
64 
65  /* Used for initial table synchronization. */
67  char relstate;
70 
71  /*
72  * Used to create the changes and subxact files for streaming
73  * transactions. The fileset is initialized upon the arrival of the first
74  * streaming transaction, or the first time the leader apply worker times
75  * out while sending changes to the parallel apply worker, and it is
76  * deleted when the worker exits. Within this fileset, separate buffiles
77  * are created for each transaction and are deleted after the
78  * transaction is finished.
79  */
81 
82  /*
83  * PID of leader apply worker if this slot is used for a parallel apply
84  * worker, InvalidPid otherwise.
85  */
86  pid_t leader_pid;
87 
88  /* Indicates whether apply can be performed in parallel. */
90 
91  /* Stats. */
98 
99 /*
100  * State of the transaction in parallel apply worker.
101  *
102  * The enum values must have the same order as the transaction state
103  * transitions.
104  */
105 typedef enum ParallelTransState
106 {
111 
112 /*
113  * State of fileset used to communicate changes from leader to parallel
114  * apply worker.
115  *
116  * FS_EMPTY indicates an initial state where the leader doesn't need to use
117  * the file to communicate with the parallel apply worker.
118  *
119  * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
120  * to the file.
121  *
122  * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
123  * the file.
124  *
125  * FS_READY indicates that it is now ok for a parallel apply worker to
126  * read the file.
127  */
129 {
135 
136 /*
137  * Struct for sharing information between leader apply worker and parallel
138  * apply workers.
139  */
141 {
143 
145 
146  /*
147  * State used to ensure commit ordering.
148  *
149  * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
150  * handling the transaction finish commands while the apply leader will
151  * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
152  * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
153  * STREAM_ABORT).
154  */
156 
157  /* Information from the corresponding LogicalRepWorker slot. */
160 
161  /*
162  * Indicates whether there are pending streaming blocks in the queue. The
163  * parallel apply worker will check it before starting to wait.
164  */
166 
167  /*
168  * XactLastCommitEnd from the parallel apply worker. This is required by
169  * the leader worker so it can update the lsn_mappings.
170  */
172 
173  /*
174  * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
175  * serialize changes to the file, and share the fileset with the parallel
176  * apply worker when processing the transaction finish command. Then the
177  * parallel apply worker will apply all the spooled messages.
178  *
179  * FileSet is used here instead of SharedFileSet because we need it to
180  * survive after releasing the shared memory so that the leader apply
181  * worker can re-use the same fileset for the next streaming transaction.
182  */
186 
187 /*
188  * Information which is used to manage the parallel apply worker.
189  */
191 {
192  /*
193  * This queue is used to send changes from the leader apply worker to the
194  * parallel apply worker.
195  */
197 
198  /*
199  * This queue is used to transfer error messages from the parallel apply
200  * worker to the leader apply worker.
201  */
203 
205 
206  /*
207  * Indicates whether the leader apply worker needs to serialize the
208  * remaining changes to a file due to timeout when attempting to send data
209  * to the parallel apply worker via shared memory.
210  */
212 
213  /*
214  * True if the worker is being used to process a parallel apply
215  * transaction. False indicates this worker is available for re-use.
216  */
217  bool in_use;
218 
221 
222 /* Main memory context for apply worker. Permanent during worker lifetime. */
224 
226 
228 
230 
231 /* libpqreceiver connection */
233 
234 /* Worker and subscription objects. */
237 
239 
241 
/*
 * Worker slot attach and lookup (defined in launcher.c).
 *
 * logicalrep_worker_attach() takes a slot number.  logicalrep_worker_find()
 * returns a single LogicalRepWorker pointer for a (subid, relid) pair,
 * whereas logicalrep_workers_find() takes only subid and returns a List.
 *
 * NOTE(review): only_running presumably restricts the result to workers
 * that are currently running -- confirm against the definitions.
 */
242 extern void logicalrep_worker_attach(int slot);
243 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
244  bool only_running);
245 extern List *logicalrep_workers_find(Oid subid, bool only_running);
247  Oid dbid, Oid subid, const char *subname,
248  Oid userid, Oid relid,
249  dsm_handle subworker_dsm);
250 extern void logicalrep_worker_stop(Oid subid, Oid relid);
252 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
254 
255 extern int logicalrep_sync_worker_count(Oid subid);
256 
/*
 * Defined in worker.c.  Takes a subscription OID and a relation OID plus a
 * caller-supplied character buffer (originname) and a Size (szoriginname).
 *
 * NOTE(review): presumably writes the replication origin name into
 * originname, with szoriginname being that buffer's capacity -- inferred
 * from the signature only; confirm against the definition.
 */
257 extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
258  char *originname, Size szoriginname);
259 
260 extern bool AllTablesyncsReady(void);
261 extern void UpdateTwoPhaseState(Oid suboid, char new_state);
262 
263 extern void process_syncing_tables(XLogRecPtr current_lsn);
264 extern void invalidate_syncing_table_states(Datum arg, int cacheid,
265  uint32 hashvalue);
266 
267 extern void stream_start_internal(TransactionId xid, bool first_segment);
268 extern void stream_stop_internal(TransactionId xid);
269 
270 /* Common streaming function to apply all the spooled messages */
271 extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
272  XLogRecPtr lsn);
273 
274 extern void apply_dispatch(StringInfo s);
275 
276 extern void maybe_reread_subscription(void);
277 
278 extern void stream_cleanup_files(Oid subid, TransactionId xid);
279 
281  char *slotname,
282  XLogRecPtr *origin_startpos);
283 
284 extern void start_apply(XLogRecPtr origin_startpos);
285 
286 extern void InitializeLogRepWorker(void);
287 
288 extern void SetupApplyOrSyncWorker(int worker_slot);
289 
290 extern void DisableSubscriptionAndExit(void);
291 
292 extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
293 
294 /* Function for apply error callback */
295 extern void apply_error_callback(void *arg);
296 extern void set_apply_error_context_origin(char *originname);
297 
298 /* Parallel apply worker setup and interactions */
299 extern void pa_allocate_worker(TransactionId xid);
301 extern void pa_detach_all_error_mq(void);
302 
303 extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
304  const void *data);
306  bool stream_locked);
307 
308 extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
309  ParallelTransState xact_state);
311 
312 extern void pa_start_subtrans(TransactionId current_xid,
313  TransactionId top_xid);
314 extern void pa_reset_subtrans(void);
315 extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
317  PartialFileSetState fileset_state);
318 
319 extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
320 extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
321 
322 extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
323 extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
324 
325 extern void pa_decr_and_wait_stream_block(void);
326 
327 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
328  XLogRecPtr remote_lsn);
329 
/*
 * isParallelApplyWorker
 *		True when the pointed-to worker slot has in_use set and its type
 *		field equals WORKERTYPE_PARALLEL_APPLY.  The type field is only
 *		inspected when in_use is true, because && short-circuits.
 *
 * "worker" must be a pointer expression.  It is expanded twice, so do not
 * pass an argument with side effects.
 */
330 #define isParallelApplyWorker(worker) ((worker)->in_use && \
331  (worker)->type == WORKERTYPE_PARALLEL_APPLY)
/*
 * isTablesyncWorker
 *		True when the pointed-to worker slot has in_use set and its type
 *		field equals WORKERTYPE_TABLESYNC.  The type field is only inspected
 *		when in_use is true, because && short-circuits.
 *
 * "worker" must be a pointer expression.  It is expanded twice, so do not
 * pass an argument with side effects.
 */
332 #define isTablesyncWorker(worker) ((worker)->in_use && \
333  (worker)->type == WORKERTYPE_TABLESYNC)
334 
335 static inline bool
337 {
339 }
340 
341 static inline bool
343 {
346 }
347 
348 static inline bool
350 {
353 }
354 
355 #endif /* WORKER_INTERNAL_H */
unsigned short uint16
Definition: c.h:494
unsigned int uint32
Definition: c.h:495
#define PGDLLIMPORT
Definition: c.h:1326
uint32 TransactionId
Definition: c.h:641
size_t Size
Definition: c.h:594
int64 TimestampTz
Definition: timestamp.h:39
uint32 dsm_handle
Definition: dsm_impl.h:55
Assert(fmt[strlen(fmt) - 1] !='\n')
int LOCKMODE
Definition: lockdefs.h:26
void * arg
const void * data
NameData subname
uintptr_t Datum
Definition: postgres.h:64
unsigned int Oid
Definition: postgres_ext.h:31
int slock_t
Definition: s_lock.h:754
Definition: pg_list.h:54
XLogRecPtr relstate_lsn
TimestampTz last_recv_time
LogicalRepWorkerType type
TimestampTz launch_time
TimestampTz reply_time
FileSet * stream_fileset
XLogRecPtr reply_lsn
XLogRecPtr last_lsn
TimestampTz last_send_time
Definition: proc.h:162
shm_mq_handle * error_mq_handle
shm_mq_handle * mq_handle
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
PartialFileSetState fileset_state
ParallelTransState xact_state
bool AllTablesyncsReady(void)
Definition: tablesync.c:1703
ParallelTransState
@ PARALLEL_TRANS_UNKNOWN
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED
#define isParallelApplyWorker(worker)
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:4356
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
Definition: launcher.c:306
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:249
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:4223
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
PGDLLIMPORT ErrorContextCallback * apply_error_context_stack
Definition: worker.c:306
PGDLLIMPORT bool in_remote_transaction
Definition: worker.c:321
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:702
PGDLLIMPORT MemoryContext ApplyMessageContext
Definition: worker.c:308
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:274
static bool am_parallel_apply_worker(void)
PGDLLIMPORT LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:61
struct ParallelApplyWorkerShared ParallelApplyWorkerShared
void logicalrep_worker_attach(int slot)
Definition: launcher.c:713
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void stream_stop_internal(TransactionId xid)
Definition: worker.c:1621
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:4443
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
struct ParallelApplyWorkerInfo ParallelApplyWorkerInfo
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void apply_dispatch(StringInfo s)
Definition: worker.c:3290
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
Definition: launcher.c:639
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:281
void DisableSubscriptionAndExit(void)
Definition: worker.c:4723
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:650
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:446
void pa_detach_all_error_mq(void)
PGDLLIMPORT struct WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:314
PGDLLIMPORT bool InitializingApplyWorker
Definition: worker.c:336
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1447
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:682
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:615
void set_apply_error_context_origin(char *originname)
Definition: worker.c:5053
LogicalRepWorkerType
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
struct LogicalRepWorker LogicalRepWorker
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
PartialFileSetState
@ FS_EMPTY
@ FS_SERIALIZE_DONE
@ FS_READY
@ FS_SERIALIZE_IN_PROGRESS
#define isTablesyncWorker(worker)
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:4662
static bool am_tablesync_worker(void)
static bool am_leader_apply_worker(void)
PGDLLIMPORT Subscription * MySubscription
Definition: worker.c:316
void apply_error_callback(void *arg)
Definition: worker.c:4911
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3454
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:854
void maybe_reread_subscription(void)
Definition: worker.c:3880
void InitializeLogRepWorker(void)
Definition: worker.c:4579
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
PGDLLIMPORT ParallelApplyWorkerShared * MyParallelShared
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2019
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1728
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_decr_and_wait_stream_block(void)
PGDLLIMPORT MemoryContext ApplyContext
Definition: worker.c:309
uint64 XLogRecPtr
Definition: xlogdefs.h:21