PostgreSQL Source Code git master
worker.c
/*-------------------------------------------------------------------------
 * worker.c
 * PostgreSQL logical replication worker (apply)
 *
 * Copyright (c) 2016-2026, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 * src/backend/replication/logical/worker.c
 *
 * NOTES
 * This file contains the worker which applies logical changes as they come
 * from the remote logical replication stream.
 *
 * The main worker (apply) is started by the logical replication worker
 * launcher for every enabled subscription in a database. It uses the
 * walsender protocol to communicate with the publisher.
 *
 * This module includes server-facing code and shares the libpqwalreceiver
 * module with walreceiver for providing the libpq-specific functionality.
 *
 *
 * STREAMED TRANSACTIONS
 * ---------------------
 * Streamed transactions (large transactions exceeding a memory limit on the
 * upstream) are applied using one of two approaches:
 *
 * 1) Write to temporary files and apply when the final commit arrives
 *
 * This approach is used when the user has set the subscription's streaming
 * option to on.
 *
 * Unlike the regular (non-streamed) case, streamed transactions require
 * handling aborts of both the toplevel transaction and subtransactions. This
 * is achieved by tracking offsets for subtransactions, which are then used
 * to truncate the file with serialized changes.
 *
 * The files are placed in the tmp file directory by default, and the
 * filenames include both the XID of the toplevel transaction and the OID of
 * the subscription. This is necessary so that different workers processing
 * a remote transaction with the same XID don't interfere.
 *
 * We use BufFiles instead of normal temporary files because the BufFile
 * infrastructure (a) supports temporary files that exceed the OS file size
 * limit, (b) provides automatic cleanup on error, and (c) lets these files
 * survive across local transactions, so they can be opened and closed at
 * stream start and stop. We decided to use the FileSet infrastructure
 * because without it the files are deleted when they are closed, and keeping
 * the stream files open across stream start/stop instead would consume a lot
 * of memory (more than 8K for each BufFile, and there could be multiple such
 * BufFiles, as the subscriber could receive multiple start/stop streams for
 * different transactions before getting the commit). Moreover, without
 * FileSet we would also need to invent a new way to pass filenames to the
 * BufFile APIs so that we can open the desired file across multiple
 * stream-open calls for the same transaction.
 *
 * 2) Parallel apply workers.
 *
 * This approach is used when the user has set the subscription's streaming
 * option to parallel. See logical/applyparallelworker.c for information
 * about this approach.
 *
 * TWO_PHASE TRANSACTIONS
 * ----------------------
 * Two-phase transactions are replayed at prepare and then committed or
 * rolled back at commit prepared and rollback prepared respectively. It is
 * possible to have a prepared transaction that arrives at the apply worker
 * when the tablesync is busy doing the initial copy. In this case, the apply
 * worker skips all the prepared operations [e.g. inserts] while the tablesync
 * is still busy (see the condition of should_apply_changes_for_rel). The
 * tablesync worker might not get such a prepared transaction because, say, it
 * was prior to the initial consistent point, but it might have got some later
 * commits. Now, the tablesync worker will exit without doing anything for the
 * prepared transaction skipped by the apply worker, as the sync location for
 * it will already be ahead of the apply worker's current location. This would
 * lead to an "empty prepare", because later when the apply worker does the
 * commit prepared, there is nothing in it (the inserts were skipped earlier).
 *
 * To avoid this and similar prepare confusions, the subscription's two_phase
 * commit is enabled only after the initial sync is over. The two_phase option
 * has been implemented as a tri-state with values DISABLED, PENDING, and
 * ENABLED.
 *
 * Even if the user specifies they want a subscription with two_phase = on,
 * internally it will start with a tri-state of PENDING, which only becomes
 * ENABLED after all tablesync initializations are completed, i.e. when all
 * tablesync workers have reached their READY state. In other words, the value
 * PENDING is only a temporary state for subscription start-up.
 *
 * Until two_phase is properly available (ENABLED), the subscription will
 * behave as if two_phase = off. When the apply worker detects that all
 * tablesyncs have become READY (while the tri-state was PENDING), it will
 * restart the apply worker process. This happens in
 * ProcessSyncingTablesForApply.
 *
 * When the (restarted) apply worker finds that all tablesyncs are READY for a
 * two_phase tri-state of PENDING, it starts streaming messages with the
 * two_phase option, which in turn enables the decoding of two-phase commits
 * at the publisher. Then, it updates the tri-state value from PENDING to
 * ENABLED. It is possible that, while two_phase was not yet enabled, the
 * publisher (replication server) skipped some prepares, but we ensure that
 * such prepares are sent along with the commit prepared; see
 * ReorderBufferFinishPrepared.
 *
 * If the subscription has no tables, then a two_phase tri-state of PENDING is
 * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
 * PUBLICATION, which might otherwise be disallowed (see below).
 *
 * If a user ever needs to be aware of the tri-state value, they can fetch it
 * from the pg_subscription catalog (see column subtwophasestate).
 *
 * Finally, to avoid the problems mentioned in the previous paragraphs for any
 * subsequent (not READY) tablesyncs (which would need the two_phase option
 * toggled from 'on' to 'off' and then back to 'on'), there is a restriction
 * on ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted
 * when the two_phase tri-state is ENABLED, except when copy_data = false.
 *
 * We can get a prepare of the same GID more than once in genuine cases where
 * multiple subscriptions are defined for publications on the same server and
 * the prepared transaction has operations on tables subscribed to by those
 * subscriptions. In such cases, if we used the GID sent by the publisher, one
 * of the prepares would succeed and the others would fail, in which case the
 * server would send them again. This can lead to a deadlock if the user has
 * set synchronous_standby_names for all the subscriptions on the subscriber.
 * To avoid such deadlocks, we generate a unique GID (consisting of the
 * subscription OID and the XID of the prepared transaction) for each prepared
 * transaction on the subscriber.
 *
 * FAILOVER
 * ----------------------
 * The logical slot on the primary can be synced to the standby by specifying
 * failover = true when creating the subscription. Enabling failover allows us
 * to smoothly transition to the promoted standby, ensuring that we can
 * subscribe to the new primary without losing any data.
 *
 * RETAIN DEAD TUPLES
 * ----------------------
 * Each apply worker that has the retain_dead_tuples option enabled maintains
 * a non-removable transaction ID (oldest_nonremovable_xid) in shared memory
 * to prevent dead rows from being removed prematurely while the apply worker
 * still needs them to detect update_deleted conflicts. Additionally, this
 * helps to retain the required commit_ts module information, which in turn
 * helps to detect update_origin_differs and delete_origin_differs conflicts
 * reliably; otherwise, vacuum freeze could remove the required information.
 *
 * The logical replication launcher manages an internal replication slot named
 * "pg_conflict_detection". It asynchronously aggregates the non-removable
 * transaction ID from all apply workers to determine the appropriate xmin for
 * the slot, thereby retaining the necessary tuples.
 *
 * The non-removable transaction ID in the apply worker is advanced to the
 * oldest running transaction ID once all concurrent transactions on the
 * publisher have been applied and flushed locally. The process involves:
 *
 * - RDT_GET_CANDIDATE_XID:
 *   Call GetOldestActiveTransactionId() to take oldestRunningXid as the
 *   candidate xid.
 *
 * - RDT_REQUEST_PUBLISHER_STATUS:
 *   Send a message to the walsender requesting the publisher status, which
 *   includes the latest WAL write position and information about transactions
 *   that are in the commit phase.
 *
 * - RDT_WAIT_FOR_PUBLISHER_STATUS:
 *   Wait for the status from the walsender. After receiving the first status,
 *   do not proceed if there are concurrent remote transactions that are still
 *   in the commit phase. These transactions might have been assigned an
 *   earlier commit timestamp but have not yet written the commit WAL record.
 *   Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
 *   until all these transactions have completed.
 *
 * - RDT_WAIT_FOR_LOCAL_FLUSH:
 *   Advance the non-removable transaction ID if the current flush location
 *   has reached or surpassed the last received WAL position.
 *
 * - RDT_STOP_CONFLICT_INFO_RETENTION:
 *   This phase is required only when max_retention_duration is defined. We
 *   enter this phase if the wait time in either the
 *   RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
 *   the configured max_retention_duration. In this phase,
 *   pg_subscription.subretentionactive is updated to false within a new
 *   transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
 *
 * - RDT_RESUME_CONFLICT_INFO_RETENTION:
 *   This phase is required only when max_retention_duration is defined. We
 *   enter this phase if the retention was previously stopped, and the time
 *   required to advance the non-removable transaction ID in the
 *   RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
 *   (or if max_retention_duration is set to 0). During this phase,
 *   pg_subscription.subretentionactive is updated to true within a new
 *   transaction, and the worker will be restarted.
 *
 * The overall state progression is: GET_CANDIDATE_XID ->
 * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
 * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
 * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
 *
 * Retaining the dead tuples for this period is sufficient for ensuring
 * eventual consistency using a last-update-wins strategy, as dead tuples are
 * useful for detecting conflicts only during the application of concurrent
 * transactions from remote nodes. After applying and flushing all remote
 * transactions that occurred concurrently with the tuple DELETE, any
 * subsequent UPDATE from a remote node should have a later timestamp. In such
 * cases, it is acceptable to detect an update_missing scenario and convert the
 * UPDATE to an INSERT when applying it. But, for concurrent remote
 * transactions with earlier timestamps than the DELETE, detecting
 * update_deleted is necessary, as the UPDATEs in remote transactions should be
 * ignored if their timestamp is earlier than that of the dead tuples.
 *
 * Note that advancing the non-removable transaction ID is not supported if the
 * publisher is also a physical standby. This is because the logical walsender
 * on the standby can only get the WAL replay position, but there may be more
 * WAL being replicated from the primary, and that WAL could contain earlier
 * commit timestamps.
 *
 * Similarly, when the publisher has subscribed to another publisher,
 * information necessary for conflict detection cannot be retained for
 * changes from origins other than the publisher. This is because the
 * publisher lacks information on concurrent transactions of the other
 * publishers to which it subscribes. As the information on concurrent
 * transactions is unavailable beyond the subscriber's immediate publishers,
 * the non-removable transaction ID might be advanced prematurely before
 * changes from other origins have been fully applied.
 *
 * XXX Retaining information for changes from other origins might be possible
 * by requesting the subscription on that origin to enable retain_dead_tuples
 * and fetching the conflict detection slot.xmin along with the publisher's
 * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
 * wait for the remote slot's xmin to reach the oldest active transaction ID,
 * ensuring that all transactions from other origins have been applied on the
 * publisher, thereby getting the latest WAL position that includes all
 * concurrent changes. However, this approach may impact performance, so it
 * might not be worth the effort.
 *
 * XXX It seems feasible to get the latest commit's WAL location from the
 * publisher and wait till that is applied. However, we can't do that
 * because commit timestamps can regress, as a commit with a later LSN is not
 * guaranteed to have a later timestamp than those with earlier LSNs. Having
 * said that, even if that were possible, it wouldn't improve performance
 * much, as apply always lags and moves slowly compared with the transactions
 * on the publisher.
 *-------------------------------------------------------------------------
 */
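The two_phase start-up rule described above can be condensed into one decision: a PENDING tri-state moves to ENABLED only when the subscription has at least one table and every tablesync is READY. This is a minimal sketch, not the worker's code; the enum, the function name two_phase_should_enable() and the table counts passed in are hypothetical stand-ins for subtwophasestate and the per-table sync states.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the two_phase tri-state described above. */
typedef enum
{
	TWOPHASE_DISABLED,
	TWOPHASE_PENDING,
	TWOPHASE_ENABLED
} TwoPhaseState;

/*
 * Hypothetical helper: should a PENDING two_phase state become ENABLED?
 * ntables is the number of subscribed tables, nready how many are READY.
 */
static bool
two_phase_should_enable(TwoPhaseState state, int ntables, int nready)
{
	assert(nready >= 0 && nready <= ntables);

	/* Only PENDING is transitional; DISABLED and ENABLED stay as they are. */
	if (state != TWOPHASE_PENDING)
		return false;

	/* With no tables, PENDING is left alone so REFRESH PUBLICATION works. */
	if (ntables == 0)
		return false;

	/* Enable only once every tablesync has reached READY. */
	return nready == ntables;
}
```

In the worker itself this is not a single call: the apply worker restarts once all tablesyncs are READY, and the restarted worker streams with two_phase and then updates the catalog from PENDING to ENABLED.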

#include "postgres.h"

#include <sys/stat.h>
#include <unistd.h>

#include "access/commit_ts.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/tupconvert.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_inherits.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "port/pg_bitutils.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/walwriter.h"
#include "replication/conflict.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
#include "utils/wait_event.h"

#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */

typedef struct ApplyExecutionData
{
	EState *estate;	/* executor state, used to track resources */

	LogicalRepRelMapEntry *targetRel;	/* replication target rel */
	ResultRelInfo *targetRelInfo;	/* ResultRelInfo for same */

	/* These fields are used when the target relation is partitioned: */
	ModifyTableState *mtstate;	/* dummy ModifyTable state */
	PartitionTupleRouting *proute;	/* partition routing info */

/* Struct for saving and restoring apply errcontext information */
{
	LogicalRepMsgType command;	/* 0 if invalid */

	/* Remote node information */
	int remote_attnum;	/* -1 if invalid */

/*
 * The action to be taken for the changes in the transaction.
 *
 * TRANS_LEADER_APPLY:
 * This action means that we are in the leader apply worker or table sync
 * worker. The changes of the transaction are either directly applied or
 * are read from temporary files (for streaming transactions) and then
 * applied by the worker.
 *
 * TRANS_LEADER_SERIALIZE:
 * This action means that we are in the leader apply worker or table sync
 * worker. Changes are written to temporary files and then applied when the
 * final commit arrives.
 *
 * TRANS_LEADER_SEND_TO_PARALLEL:
 * This action means that we are in the leader apply worker and need to send
 * the changes to the parallel apply worker.
 *
 * TRANS_LEADER_PARTIAL_SERIALIZE:
 * This action means that we are in the leader apply worker and have sent some
 * changes directly to the parallel apply worker, and the remaining changes
 * are serialized to a file due to a timeout while sending data. The parallel
 * apply worker will apply these serialized changes when the final commit
 * arrives.
 *
 * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
 * serializing changes, the leader worker also needs to serialize the
 * STREAM_XXX message to a file, and wait for the parallel apply worker to
 * finish the transaction when processing the transaction finish command. So
 * this new action was introduced to keep the code and logic clear.
 *
 * TRANS_PARALLEL_APPLY:
 * This action means that we are in the parallel apply worker and changes of
 * the transaction are applied directly by the worker.
 */
typedef enum
{
	/* The action for non-streaming transactions. */

	/* Actions for streaming transactions. */

/*
 * The phases involved in advancing the non-removable transaction ID.
 *
 * See comments atop worker.c for details of the transition between these
 * phases.
 */

/*
 * Critical information for managing phase transitions within the
 * RetainDeadTuplesPhase.
 */
{
	RetainDeadTuplesPhase phase;	/* current phase */
	XLogRecPtr remote_lsn;	/* WAL write position on the publisher */

	/*
	 * Oldest transaction ID that was in the commit phase on the publisher.
	 * Use FullTransactionId to prevent issues with transaction ID wraparound,
	 * where a new remote_oldestxid could falsely appear to originate from the
	 * past and block advancement.
	 */

	/*
	 * Next transaction ID to be assigned on the publisher. Use
	 * FullTransactionId for consistency and to allow straightforward
	 * comparisons with remote_oldestxid.
	 */

	TimestampTz reply_time;	/* when the publisher responds with status */

	/*
	 * Publisher transaction ID that must be awaited to complete before
	 * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
	 * FullTransactionId for the same reason as remote_nextxid.
	 */

	TransactionId candidate_xid;	/* candidate for the non-removable
									 * transaction ID */
	TimestampTz flushpos_update_time;	/* when the remote flush position was
										 * updated in final phase
										 * (RDT_WAIT_FOR_LOCAL_FLUSH) */

	long table_sync_wait_time;	/* time spent waiting for table sync
								 * to finish */

	/*
	 * The following fields are used to determine the timing for the next
	 * round of transaction ID advancement.
	 */
	TimestampTz last_recv_time;	/* when the last message was received */
	TimestampTz candidate_xid_time;	/* when the candidate_xid is decided */
	int xid_advance_interval;	/* how much time (ms) to wait before
								 * attempting to advance the
								 * non-removable transaction ID */

/*
 * The minimum (100ms) and maximum (3 minutes) intervals for advancing
 * non-removable transaction IDs. The maximum interval is a bit arbitrary but
 * is sufficient to not cause any undue network traffic.
 */
#define MIN_XID_ADVANCE_INTERVAL 100
#define MAX_XID_ADVANCE_INTERVAL 180000
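The two bounds suggest a backoff that stays between 100 ms and 3 minutes. The sketch below assumes an exponential policy (double the wait while no new transaction ID shows up, reset to the minimum when one does); that policy and the name next_xid_advance_interval() are illustrative assumptions, loosely hinted at by the new_xid_found flag in the prototypes below rather than spelled out in this listing.

```c
#include <assert.h>
#include <stdbool.h>

#define MIN_XID_ADVANCE_INTERVAL 100	/* ms, as above */
#define MAX_XID_ADVANCE_INTERVAL 180000	/* ms, as above */

/*
 * Hypothetical backoff: reset to the minimum when a new XID was seen (or
 * nothing is set yet), otherwise double the previous wait up to the cap.
 */
static int
next_xid_advance_interval(int current_ms, bool new_xid_found)
{
	assert(current_ms >= 0);

	if (new_xid_found || current_ms == 0)
		return MIN_XID_ADVANCE_INTERVAL;

	/* Compare before doubling so the multiplication cannot overflow. */
	if (current_ms > MAX_XID_ADVANCE_INTERVAL / 2)
		return MAX_XID_ADVANCE_INTERVAL;
	return current_ms * 2;
}
```

Doubling keeps an idle publisher from being polled every 100 ms, while the reset lets a busy one be tracked closely again as soon as new transactions appear.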

/* errcontext tracker */
{
	.command = 0,
	.rel = NULL,
	.remote_attnum = -1,
	.remote_xid = InvalidTransactionId,
	.finish_lsn = InvalidXLogRecPtr,
	.origin_name = NULL,
};

/* per stream context for streaming transactions */

static bool MySubscriptionValid = false;

/* fields valid only when processing streamed transaction */
static bool in_streamed_transaction = false;

/*
 * The number of changes applied by the parallel apply worker during one
 * streaming block.
 */

/* Are we initializing an apply worker? */

/*
 * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
 * the subscription if the remote transaction's finish LSN matches the
 * subskiplsn. Once we start skipping changes, we don't stop until we have
 * skipped all changes of the transaction, even if pg_subscription is updated
 * and MySubscription->skiplsn gets changed or reset in the meantime. Also, in
 * streaming transaction cases (streaming = on), we don't skip receiving and
 * spooling the changes, since we decide whether or not to skip applying the
 * changes when starting to apply them. The subskiplsn is cleared after
 * successfully skipping the transaction or applying a non-empty transaction;
 * the latter prevents a mistakenly specified subskiplsn from being left
 * behind. Note that we cannot skip streaming transactions when using
 * parallel apply workers, because we cannot get the finish LSN before
 * applying the changes. So, we don't start a parallel apply worker when the
 * finish LSN is set by the user.
 */
#define is_skipping_changes() (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))
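The skip rule needs only one piece of state: the finish LSN captured when skipping starts. Because that value is copied, later changes to subskiplsn cannot stop a skip midway. This is a minimal sketch with hypothetical names (LsnSketch, INVALID_LSN and the sketch_* functions stand in for XLogRecPtr, InvalidXLogRecPtr, maybe_start_skipping_changes() and stop_skipping_changes()); the invalid LSN is assumed to be 0 here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t LsnSketch;				/* stand-in for XLogRecPtr */
#define INVALID_LSN ((LsnSketch) 0)		/* assumed invalid value */

/* Hypothetical per-worker state: finish LSN of the xact being skipped. */
static LsnSketch skipping_finish_lsn = INVALID_LSN;

/* At transaction start: skip if the user named exactly this finish LSN. */
static void
sketch_start_skipping(LsnSketch subskiplsn, LsnSketch finish_lsn)
{
	assert(skipping_finish_lsn == INVALID_LSN);	/* not already skipping */

	if (subskiplsn != INVALID_LSN && subskiplsn == finish_lsn)
		skipping_finish_lsn = finish_lsn;
}

/* Mirrors is_skipping_changes(): skipping is on while the LSN is valid. */
static bool
sketch_is_skipping(void)
{
	return skipping_finish_lsn != INVALID_LSN;
}

/* Called once every change of the skipped transaction has been dropped. */
static void
sketch_stop_skipping(void)
{
	skipping_finish_lsn = INVALID_LSN;
}
```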

/* BufFile handle of the current streaming file */

/*
 * The remote WAL position that has been applied and flushed locally. We record
 * and use this information both while sending feedback to the server and
 * advancing oldest_nonremovable_xid.
 */

typedef struct SubXactInfo
{
	TransactionId xid;	/* XID of the subxact */
	int fileno;	/* file number in the buffile */
	pgoff_t offset;	/* offset in the file */

/* Sub-transaction data for the current streaming transaction */
typedef struct ApplySubXactData
{
	uint32 nsubxacts;	/* number of sub-transactions */
	uint32 nsubxacts_max;	/* current capacity of subxacts */
	TransactionId subxact_last;	/* xid of the last sub-transaction */
	SubXactInfo *subxacts;	/* sub-xact offset in changes file */

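The offset bookkeeping behind these two structs (remember where each subtransaction's changes start, then truncate the changes file there if it aborts) can be sketched with a growable array. This is a simplified, hypothetical model: the type and function names are illustrative, `long` stands in for pgoff_t, and the caller is assumed to perform the actual file truncation at the returned (fileno, offset). On abort the sketch also discards entries recorded after the aborted subtransaction, on the assumption that they were nested inside it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical simplified mirror of SubXactInfo / ApplySubXactData. */
typedef struct
{
	uint32_t	xid;		/* subtransaction XID */
	int			fileno;		/* file number within the BufFile */
	long		offset;		/* where this subxact's changes begin */
} SubXactSketch;

typedef struct
{
	uint32_t	nsubxacts;
	uint32_t	nsubxacts_max;
	SubXactSketch *subxacts;
} SubXactArraySketch;

/* Record the start position of xid unless it is already tracked. */
static bool
subxact_add(SubXactArraySketch *d, uint32_t xid, int fileno, long offset)
{
	for (uint32_t i = 0; i < d->nsubxacts; i++)
		if (d->subxacts[i].xid == xid)
			return true;	/* only the first change marks the start */

	if (d->nsubxacts == d->nsubxacts_max)
	{
		uint32_t	newmax = d->nsubxacts_max ? d->nsubxacts_max * 2 : 16;
		SubXactSketch *p = realloc(d->subxacts, newmax * sizeof(*p));

		if (p == NULL)
			return false;	/* allocation failure */
		d->subxacts = p;
		d->nsubxacts_max = newmax;
	}
	d->subxacts[d->nsubxacts++] = (SubXactSketch) {xid, fileno, offset};
	return true;
}

/* On abort, report the truncation point and drop xid and later entries. */
static bool
subxact_abort(SubXactArraySketch *d, uint32_t xid, int *fileno, long *offset)
{
	for (uint32_t i = d->nsubxacts; i > 0; i--)
	{
		if (d->subxacts[i - 1].xid == xid)
		{
			*fileno = d->subxacts[i - 1].fileno;
			*offset = d->subxacts[i - 1].offset;
			d->nsubxacts = i - 1;
			return true;
		}
	}
	return false;	/* no changes were written for this subxact */
}
```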

static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
static inline void changes_filename(char *path, Oid subid, TransactionId xid);

/*
 * Information about subtransactions of a given toplevel transaction.
 */
static void subxact_info_write(Oid subid, TransactionId xid);
static void subxact_info_read(Oid subid, TransactionId xid);
static void subxact_info_add(TransactionId xid);
static inline void cleanup_subxact_info(void);

/*
 * Serialize and deserialize changes for a toplevel transaction.
 */
static void stream_open_file(Oid subid, TransactionId xid,
							 bool first_segment);
static void stream_write_change(char action, StringInfo s);
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
static void stream_close_file(void);

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

								 bool status_received);
								 bool status_received);
								 bool status_received);
static bool update_retention_status(bool active);
								 bool new_xid_found);

static void apply_worker_exit(void);

									Oid localindexoid);
									Oid localindexoid);
									LogicalRepRelation *remoterel,
static bool FindDeletedTupleInLocalRel(Relation localrel,
									   CmdType operation);

/* Functions for skipping changes */
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
static void stop_skipping_changes(void);
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);

/* Functions for apply error callback */
static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);

static void set_wal_receiver_timeout(void);

static void on_exit_clear_xact_state(int code, Datum arg);

/*
 * Form the origin name for the subscription.
 *
 * This is a common function for tablesync and other workers. Tablesync workers
 * must pass a valid relid. Other callers must pass relid = InvalidOid.
 *
 * Return the name in the supplied buffer.
 */
void
{
	if (OidIsValid(relid))
	{
		/* Replication origin name for tablesync workers. */
		snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
	}
	else
	{
		/* Replication origin name for non-tablesync workers. */
	}
}
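For tablesync workers the origin name is just the subscription OID and relation OID joined by the "pg_%u_%u" pattern above. A minimal sketch with an added truncation check; the check, the OidSketch typedef and the function name are illustrative assumptions, and since the format used for non-tablesync workers is not reproduced in this listing, that branch is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

typedef unsigned int OidSketch;		/* stand-in for Oid (InvalidOid is 0) */

/*
 * Build a tablesync origin name; returns false if the buffer was too small
 * and snprintf() had to truncate the result.
 */
static bool
tablesync_origin_name(OidSketch suboid, OidSketch relid,
					  char *originname, size_t szoriginname)
{
	int			n;

	assert(relid != 0);		/* tablesync workers pass a valid relid */
	n = snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
	return n >= 0 && (size_t) n < szoriginname;
}
```

For example, subscription 16394 syncing relation 16385 gets the origin "pg_16394_16385", which needs a buffer of at least 15 bytes.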

/*
 * Should this worker apply changes for the given relation?
 *
 * This is mainly needed for the initial relation data sync, as that runs in a
 * separate worker process in parallel, and we need some way to skip changes
 * coming to the leader apply worker during the sync of a table.
 *
 * Note that we need to do a less-than-or-equal comparison for the SYNCDONE
 * state, because it might hold the position of the end of the initial slot's
 * consistent point WAL record + 1 (i.e., the start of the next record), and
 * the next record can be the COMMIT of the transaction we are now processing
 * (which is what we set remote_final_lsn to in apply_handle_begin).
 *
 * Note that for streaming transactions that are being applied in the parallel
 * apply worker, we disallow applying changes if the target table in the
 * subscription is not in the READY state, because we cannot decide whether to
 * apply the change as we won't know remote_final_lsn by that time.
 *
 * We already checked this in pa_can_start() before assigning the
 * streaming transaction to the parallel worker, but it also needs to be
 * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
 * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
 * while applying this transaction.
 */
static bool
{
	switch (MyLogicalRepWorker->type)
	{
			return MyLogicalRepWorker->relid == rel->localreloid;

			/* We don't synchronize relations that are in an unknown state. */
			if (rel->state != SUBREL_STATE_READY &&
						 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
						 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));

			return rel->state == SUBREL_STATE_READY;

		case WORKERTYPE_APPLY:
			return (rel->state == SUBREL_STATE_READY ||
					(rel->state == SUBREL_STATE_SYNCDONE &&
					 rel->statelsn <= remote_final_lsn));

			/* Should never happen. */
			elog(ERROR, "sequence synchronization worker is not expected to apply changes");
			break;

			/* Should never happen. */
			elog(ERROR, "Unknown worker type");
	}

	return false;	/* dummy for compiler */
}
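The WORKERTYPE_APPLY rule above is small enough to check in isolation. A minimal sketch with hypothetical stand-ins for the sync states and LSN type; only the boolean condition is taken from the code above, and it shows why `<=` matters: a SYNCDONE table whose statelsn equals the current transaction's final LSN is already eligible.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for the SUBREL_STATE_* values and XLogRecPtr. */
typedef enum
{
	SYNC_STATE_INIT,
	SYNC_STATE_SYNCDONE,
	SYNC_STATE_READY
} SyncStateSketch;

typedef uint64_t LsnSketch;

/*
 * Leader apply worker rule: apply if the table is READY, or if its tablesync
 * finished (SYNCDONE) at or before the final LSN of the current remote
 * transaction. statelsn may equal the start of that transaction's COMMIT.
 */
static bool
leader_should_apply(SyncStateSketch state, LsnSketch statelsn,
					LsnSketch remote_final_lsn)
{
	return state == SYNC_STATE_READY ||
		(state == SYNC_STATE_SYNCDONE && statelsn <= remote_final_lsn);
}
```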

/*
 * Begin one step (one INSERT, UPDATE, etc.) of a replication transaction.
 *
 * Start a transaction, if this is the first step (else we keep using the
 * existing transaction).
 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
 */
static void

/*
 * Finish up one step of a replication transaction.
 * Callers of begin_replication_step() must also call this.
 *
 * We don't close out the transaction here, but we should increment
 * the command counter to make the effects of this step visible.
 */
static void

/*
 * Handle streamed transactions for both the leader apply worker and the
 * parallel apply workers.
 *
 * In the streaming case (receiving a block of the streamed transaction), for
 * serialize mode, simply redirect it to a file for the proper toplevel
 * transaction, and for parallel mode, the leader apply worker will send the
 * changes to parallel apply workers and the parallel apply worker will define
 * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
 * messages will be applied by both the leader apply worker and the parallel
 * apply workers.)
 *
 * Returns true for streamed transactions (when the change is either serialized
 * to file or sent to a parallel apply worker), false otherwise (regular mode
 * or needs to be processed by the parallel apply worker).
 *
 * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
 * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
 * to a parallel apply worker.
 */
static bool
{

	/* not in streaming mode */
		return false;

	/*
	 * The parallel apply worker needs the xid in this message to decide
	 * whether to define a savepoint, so save the original message that has
	 * not moved the cursor after the xid. We will serialize this message to a
	 * file in PARTIAL_SERIALIZE mode.
	 */
	original_msg = *s;

	/*
	 * We should have received the XID of the subxact as the first part of the
	 * message, so extract it.
	 */

				 errmsg_internal("invalid transaction ID in streamed replication transaction")));

	switch (apply_action)
	{

			/* Add the new subxact to the array (unless already there). */

			/* Write the change to the current file */
			stream_write_change(action, s);
			return true;

			Assert(winfo);

			/*
			 * XXX The publisher side doesn't always send relation/type update
			 * messages after the streaming transaction, so also update the
			 * relation/type in the leader apply worker. See function
			 * cleanup_rel_sync_cache.
			 */
			if (pa_send_data(winfo, s->len, s->data))
				return (action != LOGICAL_REP_MSG_RELATION &&
						action != LOGICAL_REP_MSG_TYPE);

			/*
			 * Switch to serialize mode when we are not able to send the
			 * change to the parallel apply worker.
			 */
			pa_switch_to_partial_serialize(winfo, false);

			/* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
			return (action != LOGICAL_REP_MSG_RELATION &&
					action != LOGICAL_REP_MSG_TYPE);

			/* Define a savepoint for a subxact if needed. */
			return false;

		default:
			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
			return false;	/* silence compiler warning */
	}
}
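The return-value contract documented above reduces to a pure function of the apply action and the message kind. This sketch deliberately ignores the side effects (writing to the stream file, pa_send_data(), switching to partial serialization, defining savepoints); the enums and change_handed_off() are hypothetical names, not the worker's types.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirrors of the apply actions and a few message kinds. */
typedef enum
{
	ACTION_LEADER_APPLY,
	ACTION_LEADER_SERIALIZE,
	ACTION_LEADER_SEND_TO_PARALLEL,
	ACTION_LEADER_PARTIAL_SERIALIZE,
	ACTION_PARALLEL_APPLY
} ApplyActionSketch;

typedef enum
{
	MSG_RELATION,
	MSG_TYPE,
	MSG_INSERT,
	MSG_UPDATE,
	MSG_DELETE
} MsgKindSketch;

/* True when the change was handed off and the caller must not apply it. */
static bool
change_handed_off(ApplyActionSketch action, MsgKindSketch msg)
{
	switch (action)
	{
		case ACTION_LEADER_APPLY:
		case ACTION_PARALLEL_APPLY:
			/* Not streaming, or we are the parallel worker: apply here. */
			return false;
		case ACTION_LEADER_SERIALIZE:
			/* Spooled to the changes file; applied at final commit. */
			return true;
		case ACTION_LEADER_SEND_TO_PARALLEL:
		case ACTION_LEADER_PARTIAL_SERIALIZE:
			/* Relation/type metadata is applied by the leader as well. */
			return msg != MSG_RELATION && msg != MSG_TYPE;
	}
	return false;
}
```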
868
869/*
870 * Executor state preparation for evaluation of constraint expressions,
871 * indexes and triggers for the specified relation.
872 *
873 * Note that the caller must open and close any indexes to be updated.
874 */
875static ApplyExecutionData *
877{
879 EState *estate;
881 List *perminfos = NIL;
882 ResultRelInfo *resultRelInfo;
883
885 edata->targetRel = rel;
886
887 edata->estate = estate = CreateExecutorState();
888
890 rte->rtekind = RTE_RELATION;
891 rte->relid = RelationGetRelid(rel->localrel);
892 rte->relkind = rel->localrel->rd_rel->relkind;
893 rte->rellockmode = AccessShareLock;
894
896
899
900 edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
901
902 /*
903 * Use Relation opened by logicalrep_rel_open() instead of opening it
904 * again.
905 */
906 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
907
908 /*
909 * We put the ResultRelInfo in the es_opened_result_relations list, even
910 * though we don't populate the es_result_relations array. That's a bit
911 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
912 *
913 * ExecOpenIndices() is not called here either; each execution path that
914 * performs an apply operation is responsible for that.
915 */
917 lappend(estate->es_opened_result_relations, resultRelInfo);
918
919 estate->es_output_cid = GetCurrentCommandId(true);
920
921 /* Prepare to catch AFTER triggers. */
923
924 /* other fields of edata remain NULL for now */
925
926 return edata;
927}
928
929/*
930 * Finish any operations related to the executor state created by
931 * create_edata_for_relation().
932 */
933static void
935{
936 EState *estate = edata->estate;
937
938 /* Handle any queued AFTER triggers. */
939 AfterTriggerEndQuery(estate);
940
941 /* Shut down tuple routing, if any was done. */
942 if (edata->proute)
943 ExecCleanupTupleRouting(edata->mtstate, edata->proute);
944
945 /*
946 * Cleanup. It might seem that we should call ExecCloseResultRelations()
947 * here, but we intentionally don't. It would close the rel we added to
948 * es_opened_result_relations above, which is wrong because we took no
949 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
950 * any other relations opened during execution.
951 */
952 ExecResetTupleTable(estate->es_tupleTable, false);
953 FreeExecutorState(estate);
954 pfree(edata);
955}
956
957/*
958 * Evaluates default values for columns that we can't map to remote
959 * relation columns.
960 *
961 * This allows us to support tables which have more columns on the downstream
962 * than on the upstream.
963 */
964static void
966 TupleTableSlot *slot)
967{
969 int num_phys_attrs = desc->natts;
970 int i;
971 int attnum,
972 num_defaults = 0;
973 int *defmap;
974 ExprState **defexprs;
975 ExprContext *econtext;
976
977 econtext = GetPerTupleExprContext(estate);
978
979 /* We got all the data via replication, no need to evaluate anything. */
980 if (num_phys_attrs == rel->remoterel.natts)
981 return;
982
983 defmap = palloc_array(int, num_phys_attrs);
985
987 for (attnum = 0; attnum < num_phys_attrs; attnum++)
988 {
990 Expr *defexpr;
991
992 if (cattr->attisdropped || cattr->attgenerated)
993 continue;
994
995 if (rel->attrmap->attnums[attnum] >= 0)
996 continue;
997
998 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
999
1000 if (defexpr != NULL)
1001 {
1002 /* Run the expression through planner */
1003 defexpr = expression_planner(defexpr);
1004
1005 /* Initialize executable expression */
1006 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1007 defmap[num_defaults] = attnum;
1008 num_defaults++;
1009 }
1010 }
1011
1012 for (i = 0; i < num_defaults; i++)
1013 slot->tts_values[defmap[i]] =
1014 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1015}
1016
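The idea behind slot_fill_defaults() can be shown without executor machinery. The sketch below is a simplified, hypothetical model:

- `DemoColumn` stands in for the tuple descriptor.
- A precomputed `default_value` stands in for an evaluated DEFAULT expression.
- `attnums[i]` is -1 when local column `i` has no publisher counterpart, mirroring how `rel->attrmap->attnums` is tested above.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified column descriptor (not pg_attribute). */
typedef struct DemoColumn
{
    bool is_dropped;
    bool is_generated;
    bool has_default;
    long default_value;     /* stands in for an evaluated DEFAULT expression */
} DemoColumn;

/*
 * Fill values[] for local columns that have no remote counterpart.
 * attnums[i] is the remote column index for local column i, or -1 if the
 * publisher does not have that column. Returns how many defaults were set.
 */
static int
demo_fill_defaults(const DemoColumn *cols, const int *attnums, int nlocal,
                   int nremote, long *values, bool *isnull)
{
    int num_defaults = 0;

    assert(nlocal >= 0);

    /* Every local column came from the publisher: nothing to evaluate. */
    if (nlocal == nremote)
        return 0;

    for (int i = 0; i < nlocal; i++)
    {
        if (cols[i].is_dropped || cols[i].is_generated)
            continue;               /* skipped, as in the real function */
        if (attnums[i] >= 0)
            continue;               /* value was supplied by the publisher */
        if (cols[i].has_default)
        {
            values[i] = cols[i].default_value;
            isnull[i] = false;
            num_defaults++;
        }
    }
    return num_defaults;
}
```

Columns without a default keep whatever the caller stored. The comment in the store function below says unmapped columns are first set to NULL, so those columns remain NULL.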
1017/*
1018 * Store tuple data into slot.
1019 *
1020 * Incoming data can be either text or binary format.
1021 */
1022static void
1025{
1026 int natts = slot->tts_tupleDescriptor->natts;
1027 int i;
1028
1029 ExecClearTuple(slot);
1030
1031 /* Call the "in" function for each non-dropped, non-null attribute */
1032 Assert(natts == rel->attrmap->maplen);
1033 for (i = 0; i < natts; i++)
1034 {
1036 int remoteattnum = rel->attrmap->attnums[i];
1037
1038 if (!att->attisdropped && remoteattnum >= 0)
1039 {
1041
1043
1044 /* Set attnum for error callback */
1046
1048 {
1049 Oid typinput;
1050 Oid typioparam;
1051
1052 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1053 slot->tts_values[i] =
1055 typioparam, att->atttypmod);
1056 slot->tts_isnull[i] = false;
1057 }
1058 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1059 {
1060 Oid typreceive;
1061 Oid typioparam;
1062
1063 /*
1064 * In some code paths we may be asked to re-parse the same
1065 * tuple data. Reset the StringInfo's cursor so that works.
1066 */
1067 colvalue->cursor = 0;
1068
1069 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1070 slot->tts_values[i] =
1071 OidReceiveFunctionCall(typreceive, colvalue,
1072 typioparam, att->atttypmod);
1073
1074 /* Trouble if it didn't eat the whole buffer */
1075 if (colvalue->cursor != colvalue->len)
1076 ereport(ERROR,
1078 errmsg("incorrect binary data format in logical replication column %d",
1079 remoteattnum + 1)));
1080 slot->tts_isnull[i] = false;
1081 }
1082 else
1083 {
1084 /*
1085 * NULL value from remote. (We don't expect to see
1086 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1087 * NULL.)
1088 */
1089 slot->tts_values[i] = (Datum) 0;
1090 slot->tts_isnull[i] = true;
1091 }
1092
1093 /* Reset attnum for error callback */
1095 }
1096 else
1097 {
1098 /*
1099 * We assign NULL to dropped attributes and missing values
1100 * (missing values should be later filled using
1101 * slot_fill_defaults).
1102 */
1103 slot->tts_values[i] = (Datum) 0;
1104 slot->tts_isnull[i] = true;
1105 }
1106 }
1107
1109}
1110
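The binary branch above resets the column buffer's cursor before decoding, because the same tuple data may be parsed twice. It then rejects the value unless the type's receive function consumed every byte. Below is a self-contained sketch of that check. `DemoBuf` is a hypothetical stand-in for `StringInfo`, and a hand-written big-endian int4 decoder stands in for a real `typreceive` function.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical read cursor over one column's bytes (loosely like StringInfo). */
typedef struct DemoBuf
{
    const unsigned char *data;
    int len;
    int cursor;
} DemoBuf;

/* Hypothetical receive function: 4-byte big-endian integer. */
static bool
demo_recv_int4(DemoBuf *buf, int32_t *out)
{
    if (buf->len - buf->cursor < 4)
        return false;                       /* not enough bytes */

    const unsigned char *p = buf->data + buf->cursor;
    uint32_t v = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
                 ((uint32_t) p[2] << 8) | (uint32_t) p[3];

    *out = (int32_t) v;
    buf->cursor += 4;
    return true;
}

/*
 * Decode one binary column: reset the cursor so re-parsing works, then
 * accept the value only if the whole buffer was consumed.
 */
static bool
demo_decode_binary_column(DemoBuf *buf, int32_t *out)
{
    assert(buf->len >= 0);
    buf->cursor = 0;
    if (!demo_recv_int4(buf, out))
        return false;
    return buf->cursor == buf->len;         /* trailing bytes: bad format */
}
```

In the real code, a leftover byte leads to the "incorrect binary data format" error rather than a false return.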
1111/*
1112 * Replace updated columns with data from the LogicalRepTupleData struct.
1113 * This is somewhat similar to heap_modify_tuple but also calls the type
1114 * input functions on the user data.
1115 *
1116 * "slot" is filled with a copy of the tuple in "srcslot", replacing
1117 * columns provided in "tupleData" and leaving others as-is.
1118 *
1119 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
1120 * storage for "srcslot". This is OK for current usage, but someday we may
1121 * need to materialize "slot" at the end to make it independent of "srcslot".
1122 */
1123static void
1127{
1128 int natts = slot->tts_tupleDescriptor->natts;
1129 int i;
1130
1131 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1132 ExecClearTuple(slot);
1133
1134 /*
1135 * Copy all the column data from srcslot, so that we'll have valid values
1136 * for unreplaced columns.
1137 */
1138 Assert(natts == srcslot->tts_tupleDescriptor->natts);
1140 memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1141 memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1142
1143 /* Call the "in" function for each replaced attribute */
1144 Assert(natts == rel->attrmap->maplen);
1145 for (i = 0; i < natts; i++)
1146 {
1148 int remoteattnum = rel->attrmap->attnums[i];
1149
1150 if (remoteattnum < 0)
1151 continue;
1152
1154
1156 {
1158
1159 /* Set attnum for error callback */
1161
1163 {
1164 Oid typinput;
1165 Oid typioparam;
1166
1167 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1168 slot->tts_values[i] =
1170 typioparam, att->atttypmod);
1171 slot->tts_isnull[i] = false;
1172 }
1173 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1174 {
1175 Oid typreceive;
1176 Oid typioparam;
1177
1178 /*
1179 * In some code paths we may be asked to re-parse the same
1180 * tuple data. Reset the StringInfo's cursor so that works.
1181 */
1182 colvalue->cursor = 0;
1183
1184 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1185 slot->tts_values[i] =
1186 OidReceiveFunctionCall(typreceive, colvalue,
1187 typioparam, att->atttypmod);
1188
1189 /* Trouble if it didn't eat the whole buffer */
1190 if (colvalue->cursor != colvalue->len)
1191 ereport(ERROR,
1193 errmsg("incorrect binary data format in logical replication column %d",
1194 remoteattnum + 1)));
1195 slot->tts_isnull[i] = false;
1196 }
1197 else
1198 {
1199 /* must be LOGICALREP_COLUMN_NULL */
1200 slot->tts_values[i] = (Datum) 0;
1201 slot->tts_isnull[i] = true;
1202 }
1203
1204 /* Reset attnum for error callback */
1206 }
1207 }
1208
1209 /* And finally, declare that "slot" contains a valid virtual tuple */
1211}
1212
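The column-replacement logic above amounts to "copy the old row, then overwrite what was sent". Below is a minimal sketch with plain arrays. `DemoColStatus` is a hypothetical stand-in for the LOGICALREP_COLUMN_* codes. Treating an "unchanged" column as not sent, so that the old value is kept, is an assumption of this sketch. It is consistent with the header comment's "leaving others as-is".

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical per-column status, modelled on the LOGICALREP_COLUMN_* idea. */
typedef enum DemoColStatus
{
    DEMO_COL_NULL,
    DEMO_COL_VALUE,
    DEMO_COL_UNCHANGED
} DemoColStatus;

/*
 * Start from a copy of the old row, then overwrite each local column whose
 * remote counterpart (attnums[i] >= 0) carries a value or an explicit NULL.
 */
static void
demo_modify_row(int natts, const long *old_values, const bool *old_isnull,
                const int *attnums, const DemoColStatus *status,
                const long *remote_values, long *new_values, bool *new_isnull)
{
    assert(natts >= 0);
    memcpy(new_values, old_values, (size_t) natts * sizeof(long));
    memcpy(new_isnull, old_isnull, (size_t) natts * sizeof(bool));

    for (int i = 0; i < natts; i++)
    {
        int r = attnums[i];

        if (r < 0 || status[r] == DEMO_COL_UNCHANGED)
            continue;               /* keep the old value */
        if (status[r] == DEMO_COL_VALUE)
        {
            new_values[i] = remote_values[r];
            new_isnull[i] = false;
        }
        else
        {
            new_values[i] = 0;      /* DEMO_COL_NULL */
            new_isnull[i] = true;
        }
    }
}
```

Plain `long` values sidestep the pass-by-reference caution in the header comment, because nothing points back into the old row.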
1213/*
1214 * Handle BEGIN message.
1215 */
1216static void
1218{
1220
1221 /* There must not be an active streaming transaction. */
1223
1226
1227 remote_final_lsn = begin_data.final_lsn;
1228
1230
1231 in_remote_transaction = true;
1232
1234}
1235
1236/*
1237 * Handle COMMIT message.
1238 *
1239 * TODO, support tracking of multiple origins
1240 */
1241static void
1243{
1245
1247
1248 if (commit_data.commit_lsn != remote_final_lsn)
1249 ereport(ERROR,
1251 errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1252 LSN_FORMAT_ARGS(commit_data.commit_lsn),
1254
1256
1257 /*
1258 * Process any tables that are being synchronized in parallel, as well as
1259 * any newly added tables or sequences.
1260 */
1262
1265}
1266
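The commit handler refuses a COMMIT whose LSN differs from the final LSN recorded at BEGIN. It prints both values with `%X/%08X` and `LSN_FORMAT_ARGS`: the high 32 bits, a slash, then the low 32 bits zero-padded to eight hex digits. Below is a standalone sketch of the check and the formatting. `DemoLSN` is a stand-in for `XLogRecPtr`, a 64-bit unsigned integer, and both functions are hypothetical.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t DemoLSN;       /* stands in for XLogRecPtr */

/* Format as "%X/%08X": high 32 bits, slash, zero-padded low 32 bits. */
static void
demo_format_lsn(DemoLSN lsn, char *buf, size_t buflen)
{
    assert(buflen > 0);
    snprintf(buf, buflen, "%" PRIX32 "/%08" PRIX32,
             (uint32_t) (lsn >> 32), (uint32_t) lsn);
}

/* Return true if the COMMIT LSN matches the final LSN announced by BEGIN. */
static bool
demo_check_commit_lsn(DemoLSN commit_lsn, DemoLSN expected_final_lsn)
{
    if (commit_lsn == expected_final_lsn)
        return true;

    char got[32];
    char want[32];

    demo_format_lsn(commit_lsn, got, sizeof(got));
    demo_format_lsn(expected_final_lsn, want, sizeof(want));
    fprintf(stderr, "incorrect commit LSN %s in commit message (expected %s)\n",
            got, want);
    return false;
}
```

For example, an LSN whose high word is 1 and low word is 0xAB prints as `1/000000AB`.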
1267/*
1268 * Handle BEGIN PREPARE message.
1269 */
1270static void
1272{
1274
1275 /* Tablesync should never receive prepare. */
1276 if (am_tablesync_worker())
1277 ereport(ERROR,
1279 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1280
1281 /* There must not be an active streaming transaction. */
1283
1286
1287 remote_final_lsn = begin_data.prepare_lsn;
1288
1290
1291 in_remote_transaction = true;
1292
1294}
1295
1296/*
1297 * Common function to prepare the GID.
1298 */
1299static void
1301{
1302 char gid[GIDSIZE];
1303
1304 /*
1305 * Compute a unique GID for two_phase transactions. We don't use the GID of
1306 * the prepared transaction sent by the server, as that can lead to deadlock when
1307 * we have multiple subscriptions from the same node pointing to publications on
1308 * the same node. See comments atop worker.c.
1311 gid, sizeof(gid));
1312
1313 /*
1314 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1315 * called within the PrepareTransactionBlock below.
1316 */
1317 if (!IsTransactionBlock())
1318 {
1320 CommitTransactionCommand(); /* Completes the preceding Begin command. */
1321 }
1322
1323 /*
1324 * Update origin state so we can restart streaming from correct position
1325 * in case of crash.
1326 */
1329
1331}
1332
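Consider two subscriptions on one subscriber that receive the same remote prepared transaction. A GID taken only from the publisher would be identical in both and clash, which is the deadlock the comment above warns about. A GID that also embeds the subscription OID does not clash. The sketch below is hypothetical. The `pg_gid_<subid>_<xid>` pattern and the 200-byte buffer size are assumptions for illustration, not a description of the real helper.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

#define DEMO_GIDSIZE 200        /* assumed buffer size for illustration */

/*
 * Hypothetical helper: build a subscriber-local GID from the subscription
 * OID and the remote top-level XID. Returns the snprintf() result, so the
 * caller can detect truncation.
 */
static int
demo_two_phase_gid(uint32_t subid, uint32_t xid, char *gid, size_t szgid)
{
    assert(szgid > 0);
    return snprintf(gid, szgid, "pg_gid_%u_%u", (unsigned) subid, (unsigned) xid);
}
```

With the same remote XID 750, subscriptions 16384 and 16385 get `pg_gid_16384_750` and `pg_gid_16385_750`.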
1333/*
1334 * Handle PREPARE message.
1335 */
1336static void
1338{
1340
1342
1343 if (prepare_data.prepare_lsn != remote_final_lsn)
1344 ereport(ERROR,
1346 errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1347 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1349
1350 /*
1351 * Unlike commit, here we always prepare the transaction even if no
1352 * change has happened in this transaction or all changes are skipped. It
1353 * is done this way because at commit prepared time, we won't know whether
1354 * we have skipped preparing a transaction because of those reasons.
1355 *
1356 * XXX We could optimize this so that at commit prepared time, we first check
1357 * whether we have prepared the transaction or not, but that doesn't seem
1358 * worthwhile because such cases shouldn't be common.
1359 */
1361
1363
1366 pgstat_report_stat(false);
1367
1368 /*
1369 * It is okay not to set the local_end LSN for the prepare because we
1370 * always flush the prepare record. So, we can send the acknowledgment of
1371 * the remote_end LSN as soon as prepare is finished.
1372 *
1373 * XXX For the sake of consistency with commit, we could have set it with
1374 * the LSN of prepare but as of now we don't track that value similar to
1375 * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
1376 * it.
1377 */
1379
1380 in_remote_transaction = false;
1381
1382 /*
1383 * Process any tables that are being synchronized in parallel, as well as
1384 * any newly added tables or sequences.
1385 */
1387
1388 /*
1389 * Since we have already prepared the transaction, if the server crashes
1390 * before clearing the subskiplsn, it will be left set, but the
1391 * transaction won't be resent. That's okay because it's a rare case
1392 * and the subskiplsn will be cleared when finishing the next transaction.
1393 */
1396
1399}
1400
1401/*
1402 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1403 *
1404 * Note that we don't need to wait here if the transaction was prepared in a
1405 * parallel apply worker. In that case, we have already waited for the prepare
1406 * to finish in apply_handle_stream_prepare() which will ensure all the
1407 * operations in that transaction have happened in the subscriber, so no
1408 * concurrent transaction can cause deadlock or transaction dependency issues.
1409 */
1410static void
1412{
1414 char gid[GIDSIZE];
1415
1418
1419 /* Compute GID for two_phase transactions. */
1421 gid, sizeof(gid));
1422
1423 /* There is no transaction when COMMIT PREPARED is called */
1425
1426 /*
1427 * Update origin state so we can restart streaming from correct position
1428 * in case of crash.
1429 */
1432
1433 FinishPreparedTransaction(gid, true);
1436 pgstat_report_stat(false);
1437
1439 in_remote_transaction = false;
1440
1441 /*
1442 * Process any tables that are being synchronized in parallel, as well as
1443 * any newly added tables or sequences.
1444 */
1446
1448
1451}
1452
1453/*
1454 * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1455 *
1456 * Note that we don't need to wait here if the transaction was prepared in a
1457 * parallel apply worker. In that case, we have already waited for the prepare
1458 * to finish in apply_handle_stream_prepare() which will ensure all the
1459 * operations in that transaction have happened in the subscriber, so no
1460 * concurrent transaction can cause deadlock or transaction dependency issues.
1461 */
1462static void
1464{
1466 char gid[GIDSIZE];
1467
1470
1471 /* Compute GID for two_phase transactions. */
1473 gid, sizeof(gid));
1474
1475 /*
1476 * It is possible that we haven't received the prepare because it occurred
1477 * before the walsender reached a consistent point, or two_phase was not yet
1478 * enabled at that time. In such cases, we need to skip the rollback
1479 * prepared.
1480 */
1481 if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1482 rollback_data.prepare_time))
1483 {
1484 /*
1485 * Update origin state so we can restart streaming from correct
1486 * position in case of crash.
1487 */
1490
1491 /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1493 FinishPreparedTransaction(gid, false);
1496
1498 }
1499
1500 pgstat_report_stat(false);
1501
1502 /*
1503 * It is okay not to set the local_end LSN for the rollback of prepared
1504 * transaction because we always flush the WAL record for it. See
1505 * apply_handle_prepare.
1506 */
1508 in_remote_transaction = false;
1509
1510 /*
1511 * Process any tables that are being synchronized in parallel, as well as
1512 * any newly added tables or sequences.
1513 */
1514 ProcessSyncingRelations(rollback_data.rollback_end_lsn);
1515
1518}
1519
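The rollback handler only finishes a prepared transaction it can actually find. As in the `LookupGXact()` call above, it matches on GID, prepare end LSN and prepare time. If there is no match, the ROLLBACK PREPARED is skipped. Below is a minimal in-memory sketch of that rule. `DemoPreparedXact` and both functions are hypothetical stand-ins for the shared two-phase state.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical record of a transaction prepared on the subscriber. */
typedef struct DemoPreparedXact
{
    const char *gid;
    uint64_t    prepare_end_lsn;
    int64_t     prepare_time;
} DemoPreparedXact;

/* Index of the entry matching all three keys, or -1 if there is none. */
static int
demo_find_gxact(const DemoPreparedXact *xacts, int n, const char *gid,
                uint64_t prepare_end_lsn, int64_t prepare_time)
{
    for (int i = 0; i < n; i++)
    {
        if (strcmp(xacts[i].gid, gid) == 0 &&
            xacts[i].prepare_end_lsn == prepare_end_lsn &&
            xacts[i].prepare_time == prepare_time)
            return i;
    }
    return -1;
}

/*
 * ROLLBACK PREPARED: skip the message if the PREPARE was never applied
 * locally; otherwise remove the entry (swap-remove, order not preserved).
 * Returns true if something was rolled back.
 */
static bool
demo_rollback_prepared(DemoPreparedXact *xacts, int *n, const char *gid,
                       uint64_t prepare_end_lsn, int64_t prepare_time)
{
    assert(*n >= 0);

    int idx = demo_find_gxact(xacts, *n, gid, prepare_end_lsn, prepare_time);

    if (idx < 0)
        return false;               /* nothing prepared here: skip */

    xacts[idx] = xacts[*n - 1];
    (*n)--;
    return true;
}
```

A GID match alone is not enough in this sketch. The LSN and time must also agree before anything is rolled back.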
1520/*
1521 * Handle STREAM PREPARE.
1522 */
1523static void
1525{
1529
1530 /* Save the message before it is consumed. */
1532
1534 ereport(ERROR,
1536 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1537
1538 /* Tablesync should never receive prepare. */
1539 if (am_tablesync_worker())
1540 ereport(ERROR,
1542 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1543
1546
1548
1549 switch (apply_action)
1550 {
1551 case TRANS_LEADER_APPLY:
1552
1553 /*
1554 * The transaction has been serialized to file, so replay all the
1555 * spooled operations.
1556 */
1558 prepare_data.xid, prepare_data.prepare_lsn);
1559
1560 /* Mark the transaction as prepared. */
1562
1564
1565 /*
1566 * It is okay not to set the local_end LSN for the prepare because
1567 * we always flush the prepare record. See apply_handle_prepare.
1568 */
1570
1571 in_remote_transaction = false;
1572
1573 /* Unlink the files with serialized changes and subxact info. */
1575
1576 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1577 break;
1578
1580 Assert(winfo);
1581
1582 if (pa_send_data(winfo, s->len, s->data))
1583 {
1584 /* Finish processing the streaming transaction. */
1585 pa_xact_finish(winfo, prepare_data.end_lsn);
1586 break;
1587 }
1588
1589 /*
1590 * Switch to serialize mode when we are not able to send the
1591 * change to parallel apply worker.
1592 */
1593 pa_switch_to_partial_serialize(winfo, true);
1594
1597 Assert(winfo);
1598
1601 &original_msg);
1602
1604
1605 /* Finish processing the streaming transaction. */
1606 pa_xact_finish(winfo, prepare_data.end_lsn);
1607 break;
1608
1610
1611 /*
1612 * If the parallel apply worker is applying spooled messages then
1613 * close the file before preparing.
1614 */
1615 if (stream_fd)
1617
1619
1620 /* Mark the transaction as prepared. */
1622
1624
1626
1627 /*
1628 * It is okay not to set the local_end LSN for the prepare because
1629 * we always flush the prepare record. See apply_handle_prepare.
1630 */
1632
1635
1637
1638 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1639 break;
1640
1641 default:
1642 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1643 break;
1644 }
1645
1646 pgstat_report_stat(false);
1647
1648 /*
1649 * Process any tables that are being synchronized in parallel, as well as
1650 * any newly added tables or sequences.
1651 */
1653
1654 /*
1655 * As in the prepare case, the subskiplsn could be left set after a server
1656 * crash, but that's okay. See the comments in apply_handle_prepare().
1657 */
1660
1662
1664}
1665
1666/*
1667 * Handle ORIGIN message.
1668 *
1669 * TODO, support tracking of multiple origins
1670 */
1671static void
1673{
1674 /*
1675 * An ORIGIN message can only come inside a streaming transaction, or inside
1676 * a remote transaction before any actual writes.
1677 */
1681 ereport(ERROR,
1683 errmsg_internal("ORIGIN message sent out of order")));
1684}
1685
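The ordering rule in the ORIGIN comment can be written as a small predicate. The condition the handler actually tests is not visible in this listing. The sketch below therefore encodes only what the comment states, and its parameter names, in particular `wrote_changes`, are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical check: ORIGIN is acceptable inside a streamed transaction,
 * or inside a remote transaction before any change has been written.
 */
static bool
demo_origin_message_allowed(bool in_streamed_transaction,
                            bool in_remote_transaction,
                            bool wrote_changes)
{
    if (in_streamed_transaction)
        return true;
    return in_remote_transaction && !wrote_changes;
}
```

Any other combination corresponds to the "ORIGIN message sent out of order" error above.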
1686/*
1687 * Initialize fileset (if not already done).
1688 *
1689 * Create a new file when first_segment is true, otherwise open the existing
1690 * file.
1691 */
1692void
1694{
1696
1697 /*
1698 * Initialize the worker's stream_fileset if we haven't yet. This will be
1699 * used for the entire duration of the worker so create it in a permanent
1700 * context. We create this on the very first streaming message from any
1701 * transaction and then use it for this and other streaming transactions.
1702 * Now, we could create a fileset at the start of the worker as well but
1703 * then we won't be sure that it will ever be used.
1704 */
1706 {
1708
1710
1713
1715 }
1716
1717 /* Open the spool file for this transaction. */
1719
1720 /* If this is not the first segment, open existing subxact file. */
1721 if (!first_segment)
1723
1725}
1726
1727/*
1728 * Handle STREAM START message.
1729 */
1730static void
1732{
1733 bool first_segment;
1736
1737 /* Save the message before it is consumed. */
1739
1741 ereport(ERROR,
1743 errmsg_internal("duplicate STREAM START message")));
1744
1745 /* There must not be an active streaming transaction. */
1747
1748 /* notify handle methods we're processing a remote transaction */
1750
1751 /* extract XID of the top-level transaction */
1753
1755 ereport(ERROR,
1757 errmsg_internal("invalid transaction ID in streamed replication transaction")));
1758
1760
1761 /* Try to allocate a worker for the streaming transaction. */
1762 if (first_segment)
1764
1766
1767 switch (apply_action)
1768 {
1770
1771 /*
1772 * Function stream_start_internal starts a transaction. This
1773 * transaction will be committed on the stream stop unless it is a
1774 * tablesync worker, in which case it will be committed after
1775 * processing all the messages. We need this transaction for
1776 * handling the BufFile, used for serializing the streaming data
1777 * and subxact info.
1778 */
1780 break;
1781
1783 Assert(winfo);
1784
1785 /*
1786 * Once we start serializing the changes, the parallel apply
1787 * worker will wait for the leader to release the stream lock
1788 * until the end of the transaction. So, we don't need to release
1789 * the lock or increment the stream count in that case.
1790 */
1791 if (pa_send_data(winfo, s->len, s->data))
1792 {
1793 /*
1794 * Unlock the shared object lock so that the parallel apply
1795 * worker can continue to receive changes.
1796 */
1797 if (!first_segment)
1799
1800 /*
1801 * Increment the number of streaming blocks waiting to be
1802 * processed by parallel apply worker.
1803 */
1805
1806 /* Cache the parallel apply worker for this transaction. */
1808 break;
1809 }
1810
1811 /*
1812 * Switch to serialize mode when we are not able to send the
1813 * change to parallel apply worker.
1814 */
1816
1819 Assert(winfo);
1820
1821 /*
1822 * Open the spool file unless it was already opened when switching
1823 * to serialize mode. The transaction started in
1824 * stream_start_internal will be committed on the stream stop.
1825 */
1828
1830
1831 /* Cache the parallel apply worker for this transaction. */
1833 break;
1834
1836 if (first_segment)
1837 {
1838 /* Hold the lock until the end of the transaction. */
1841
1842 /*
1843 * Signal the leader apply worker, as it may be waiting for
1844 * us.
1845 */
1848 }
1849
1851 break;
1852
1853 default:
1854 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1855 break;
1856 }
1857
1859}
1860
1861/*
1862 * Update the information about subxacts and close the file.
1863 *
1864 * This function should be called when the stream_start_internal function has
1865 * been called.
1866 */
1867void
1869{
1870 /*
1871 * Serialize information about subxacts for the toplevel transaction, then
1872 * close the stream messages spool file.
1873 */
1876
1877 /* We must be in a valid transaction state */
1879
1880 /* Commit the per-stream transaction */
1882
1883 /* Reset per-stream context */
1885}
1886
1887/*
1888 * Handle STREAM STOP message.
1889 */
1890static void
1892{
1895
1897 ereport(ERROR,
1899 errmsg_internal("STREAM STOP message without STREAM START")));
1900
1902
1903 switch (apply_action)
1904 {
1907 break;
1908
1910 Assert(winfo);
1911
1912 /*
1913 * Lock before sending the STREAM_STOP message so that the leader
1914 * can hold the lock first and the parallel apply worker will wait
1915 * for the leader to release the lock. See Locking Considerations atop
1916 * applyparallelworker.c.
1917 */
1919
1920 if (pa_send_data(winfo, s->len, s->data))
1921 {
1923 break;
1924 }
1925
1926 /*
1927 * Switch to serialize mode when we are not able to send the
1928 * change to parallel apply worker.
1929 */
1930 pa_switch_to_partial_serialize(winfo, true);
1931
1937 break;
1938
1940 elog(DEBUG1, "applied %u changes in the streaming chunk",
1942
1943 /*
1944 * By the time parallel apply worker is processing the changes in
1945 * the current streaming block, the leader apply worker may have
1946 * sent multiple streaming blocks. This can lead to parallel apply
1947 * worker start waiting even when there are more chunk of streams
1948 * in the queue. So, try to lock only if there is no message left
1949 * in the queue. See Locking Considerations atop
1950 * applyparallelworker.c.
1951 *
1952 * Note that here we have a race condition where we can start
1953 * waiting even when there are pending streaming chunks. This can
1954 * happen if the leader sends another streaming block and acquires
1955 * the stream lock again after the parallel apply worker checks
1956 * that there is no pending streaming block and before it actually
1957 * starts waiting on a lock. We can handle this case by not
1958 * allowing the leader to increment the stream block count during
1959 * the time the parallel apply worker acquires the lock, but it is not
1960 * clear whether that is worth the complexity.
1961 *
1962 * Now, if this missed chunk contains rollback to savepoint, then
1963 * there is a risk of deadlock which probably shouldn't happen
1964 * after restart.
1965 */
1967 break;
1968
1969 default:
1970 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1971 break;
1972 }
1973
1976
1977 /*
1978 * The parallel apply worker could be in a transaction, in which case we
1979 * need to report the state as STATE_IDLEINTRANSACTION.
1980 */
1983 else
1985
1987}
1988
1989/*
1990 * Helper function to handle STREAM ABORT message when the transaction was
1991 * serialized to file.
1992 */
1993static void
1995{
1996 /*
1997 * If the two XIDs are the same, it's in fact an abort of the toplevel xact, so
1998 * just delete the files with serialized info.
1999 */
2000 if (xid == subxid)
2002 else
2003 {
2004 /*
2005 * OK, so it's a subxact. We need to read the subxact file for the
2006 * toplevel transaction, determine the offset tracked for the subxact,
2007 * and truncate the file with changes. We also remove the subxacts
2008 * with higher offsets (or rather higher XIDs).
2009 *
2010 * We intentionally scan the array from the tail, because we're likely
2011 * aborting a change for the most recent subtransactions.
2012 *
2013 * We can't use binary search here, as subxact XIDs won't
2014 * necessarily arrive in sorted order; consider the case where we have
2015 * released the savepoint for multiple subtransactions and then
2016 * performed a rollback to savepoint for one of the earlier
2017 * subtransactions.
2018 */
2019 int64 i;
2020 int64 subidx;
2021 BufFile *fd;
2022 bool found = false;
2023 char path[MAXPGPATH];
2024
2025 subidx = -1;
2028
2029 for (i = subxact_data.nsubxacts; i > 0; i--)
2030 {
2031 if (subxact_data.subxacts[i - 1].xid == subxid)
2032 {
2033 subidx = (i - 1);
2034 found = true;
2035 break;
2036 }
2037 }
2038
2039 /*
2040 * If it's an empty sub-transaction then we will not find the subxid
2041 * here so just cleanup the subxact info and return.
2042 */
2043 if (!found)
2044 {
2045 /* Cleanup the subxact info */
2049 return;
2050 }
2051
2052 /* open the changes file */
2055 O_RDWR, false);
2056
2057 /* OK, truncate the file at the right offset */
2061
2062 /* discard the subxacts added later */
2064
2065 /* write the updated subxact list */
2067
2070 }
2071}
2072
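The abort bookkeeping above can be sketched with a plain array. Each entry records where a subxact's first change was written. Aborting that subxact means truncating the changes file at that point and dropping the entry, together with every entry added after it. A toplevel abort (equal XIDs) discards everything. `DemoSubXact`, `DemoAbortResult` and `demo_stream_abort()` are hypothetical. The sketch only computes the truncation point and does not touch real files.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical per-subxact bookkeeping: where its first change was spooled. */
typedef struct DemoSubXact
{
    uint32_t xid;           /* subtransaction XID */
    int      fileno;        /* file segment holding its first change */
    long     offset;        /* offset of that change within the segment */
} DemoSubXact;

typedef enum DemoAbortResult
{
    DEMO_ABORT_NOTHING,     /* empty subxact: nothing was spooled for it */
    DEMO_ABORT_TRUNCATE,    /* truncate the changes file at (fileno, offset) */
    DEMO_ABORT_TOPLEVEL     /* toplevel abort: delete the files outright */
} DemoAbortResult;

static DemoAbortResult
demo_stream_abort(uint32_t xid, uint32_t subxid,
                  DemoSubXact *subxacts, int *nsubxacts,
                  int *trunc_fileno, long *trunc_offset)
{
    assert(*nsubxacts >= 0);

    if (xid == subxid)
    {
        *nsubxacts = 0;
        return DEMO_ABORT_TOPLEVEL;
    }

    /*
     * Scan from the tail: recent subxacts are the likeliest to abort, and
     * the array is in arrival order, not XID order.
     */
    for (int i = *nsubxacts; i > 0; i--)
    {
        if (subxacts[i - 1].xid == subxid)
        {
            *trunc_fileno = subxacts[i - 1].fileno;
            *trunc_offset = subxacts[i - 1].offset;
            *nsubxacts = i - 1;     /* drop it and every later subxact */
            return DEMO_ABORT_TRUNCATE;
        }
    }
    return DEMO_ABORT_NOTHING;
}
```

In this sketch's test, XID 103 arrives before XID 102, which is why a sorted-order binary search would be wrong.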
2073/*
2074 * Handle STREAM ABORT message.
2075 */
2076static void
2078{
2079 TransactionId xid;
2080 TransactionId subxid;
2084
2085 /* Save the message before it is consumed. */
2087 bool toplevel_xact;
2088
2090 ereport(ERROR,
2092 errmsg_internal("STREAM ABORT message without STREAM STOP")));
2093
2094 /* We receive abort information only when we can apply in parallel. */
2097
2098 xid = abort_data.xid;
2099 subxid = abort_data.subxid;
2100 toplevel_xact = (xid == subxid);
2101
2102 set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2103
2105
2106 switch (apply_action)
2107 {
2108 case TRANS_LEADER_APPLY:
2109
2110 /*
2111 * We are in the leader apply worker and the transaction has been
2112 * serialized to file.
2113 */
2114 stream_abort_internal(xid, subxid);
2115
2116 elog(DEBUG1, "finished processing the STREAM ABORT command");
2117 break;
2118
2120 Assert(winfo);
2121
2122 /*
2123 * For the case of aborting the subtransaction, we increment the
2124 * number of streaming blocks and take the lock again before
2125 * sending the STREAM_ABORT to ensure that the parallel apply
2126 * worker will wait on the lock for the next set of changes after
2127 * processing the STREAM_ABORT message if it is not already
2128 * waiting for STREAM_STOP message.
2129 *
2130 * It is important to perform this locking before sending the
2131 * STREAM_ABORT message so that the leader can hold the lock first
2132 * and the parallel apply worker will wait for the leader to
2133 * release the lock. This is the same as what we do in
2134 * apply_handle_stream_stop. See Locking Considerations atop
2135 * applyparallelworker.c.
2136 */
2137 if (!toplevel_xact)
2138 {
2142 }
2143
2144 if (pa_send_data(winfo, s->len, s->data))
2145 {
2146 /*
2147 * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2148 * wait here for the parallel apply worker to finish as that
2149 * is not required to maintain the commit order and won't have
2150 * the risk of failures due to transaction dependencies and
2151 * deadlocks. However, it is possible that before the parallel
2152 * worker finishes and we clear the worker info, the xid
2153 * wraparound happens on the upstream and a new transaction
2154 * with the same xid can appear and that can lead to duplicate
2155 * entries in ParallelApplyTxnHash. Yet another problem could
2156 * be that we may have serialized the changes in partial
2157 * serialize mode and the file containing xact changes may
2158 * already exist, and after xid wraparound trying to create
2159 * the file for the same xid can lead to an error. To avoid
2160 * these problems, we decide to wait for the aborts to finish.
2161 *
2162 * Note, it is okay to not update the flush location position
2163 * for aborts, as in the worst case that means such a transaction
2164 * won't be sent again after restart.
2165 */
2166 if (toplevel_xact)
2168
2169 break;
2170 }
2171
2172 /*
2173 * Switch to serialize mode when we are not able to send the
2174 * change to parallel apply worker.
2175 */
2176 pa_switch_to_partial_serialize(winfo, true);
2177
2180 Assert(winfo);
2181
2182 /*
2183 * Parallel apply worker might have applied some changes, so write
2184 * the STREAM_ABORT message so that it can roll back the
2185 * subtransaction if needed.
2186 */
2188 &original_msg);
2189
2190 if (toplevel_xact)
2191 {
2194 }
2195 break;
2196
2198
2199 /*
2200 * If the parallel apply worker is applying spooled messages then
2201 * close the file before aborting.
2202 */
2203 if (toplevel_xact && stream_fd)
2205
2207
2208 /*
2209 * We need to wait after processing rollback to savepoint for the
2210 * next set of changes.
2211 *
2212 * Due to a race condition, we can start waiting here even when
2213 * there are more stream chunks in the queue. See
2214 * apply_handle_stream_stop.
2215 */
2216 if (!toplevel_xact)
2218
2219 elog(DEBUG1, "finished processing the STREAM ABORT command");
2220 break;
2221
2222 default:
2223 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2224 break;
2225 }
2226
2228}
2229
2230/*
2231 * Ensure that the passed location is the end of the fileset.
2232 */
2233static void
2234ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2235 pgoff_t offset)
2236{
2237 char path[MAXPGPATH];
2238 BufFile *fd;
2239 int last_fileno;
2241
2243
2245
2247
2248 fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2249
2250 BufFileSeek(fd, 0, 0, SEEK_END);
2252
2254
2256
2257 if (last_fileno != fileno || last_offset != offset)
2258 elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2259 path);
2260}
2261
2262/*
2263 * Common spoolfile processing.
2264 */
2265void
2267 XLogRecPtr lsn)
2268{
2269 int nchanges;
2270 char path[MAXPGPATH];
2271 char *buffer = NULL;
2273 ResourceOwner oldowner;
2274 int fileno;
2275 pgoff_t offset;
2276
2279
2280 /* Make sure we have an open transaction */
2282
2283 /*
2284 * Allocate file handle and memory required to process all the messages in
2285 * TopTransactionContext to avoid them getting reset after each message is
2286 * processed.
2287 */
2289
2290 /* Open the spool file for the committed/prepared transaction */
2292 elog(DEBUG1, "replaying changes from file \"%s\"", path);
2293
2294 /*
2295 * Make sure the file is owned by the toplevel transaction so that the
2296 * file will not be accidentally closed when aborting a subtransaction.
2297 */
2298 oldowner = CurrentResourceOwner;
2300
2301 stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2302
2303 CurrentResourceOwner = oldowner;
2304
2305 buffer = palloc(BLCKSZ);
2306
2308
2309 remote_final_lsn = lsn;
2310
2311 /*
2312 * Make sure the apply_dispatch handler methods are aware that we're in a
2313 * remote transaction.
2314 */
2315 in_remote_transaction = true;
2317
2319
2320 /*
2321 * Read the entries one by one and pass them through the same logic as in
2322 * apply_dispatch.
2323 */
2324 nchanges = 0;
2325 while (true)
2326 {
2328 size_t nbytes;
2329 int len;
2330
2332
2333 /* read length of the on-disk record */
2334 nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2335
2336 /* have we reached end of the file? */
2337 if (nbytes == 0)
2338 break;
2339
2340 /* do we have a correct length? */
2341 if (len <= 0)
2342 elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2343 len, path);
2344
2345 /* make sure we have sufficiently large buffer */
2346 buffer = repalloc(buffer, len);
2347
2348 /* and finally read the data into the buffer */
2349 BufFileReadExact(stream_fd, buffer, len);
2350
2351 BufFileTell(stream_fd, &fileno, &offset);
2352
2353 /* init a stringinfo using the buffer and call apply_dispatch */
2354 initReadOnlyStringInfo(&s2, buffer, len);
2355
2356 /* Ensure we are reading the data into our memory context. */
2358
2360
2362
2364
2365 nchanges++;
2366
2367 /*
2368 * It is possible that the file has been closed because we have processed
2369 * a transaction-end message such as STREAM COMMIT, in which case that
2370 * must be the last message.
2371 */
2372 if (!stream_fd)
2373 {
2374 ensure_last_message(stream_fileset, xid, fileno, offset);
2375 break;
2376 }
2377
2378 if (nchanges % 1000 == 0)
2379 elog(DEBUG1, "replayed %d changes from file \"%s\"",
2380 nchanges, path);
2381 }
2382
2383 if (stream_fd)
2385
2386 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2387 nchanges, path);
2388
2389 return;
2390}
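The replay loop in apply_spooled_messages() relies on a simple on-disk framing. Each change is stored as an `int` length followed by that many bytes, and a zero-byte read of the length word marks a clean end of file. The following sketch shows that framing with standard C stdio (`tmpfile()`, `fread()`, `fwrite()`) instead of PostgreSQL's BufFile API. The function names and the `record_cb` callback (a stand-in for apply_dispatch()) are hypothetical, and errors return -1 rather than calling elog(ERROR).

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical callback standing in for apply_dispatch(). */
typedef void (*record_cb) (const char *data, int len, void *arg);

/* Append one length-prefixed record: [int len][len bytes]. */
static int
spool_write_record(FILE *fp, const char *data, int len)
{
    if (fwrite(&len, sizeof(len), 1, fp) != 1)
        return -1;
    if (fwrite(data, 1, (size_t) len, fp) != (size_t) len)
        return -1;
    return 0;
}

/* Replay records to EOF; returns the count, or -1 on a corrupt file. */
static int
spool_replay(FILE *fp, record_cb cb, void *arg)
{
    char       *buffer = NULL;
    int         nchanges = 0;

    for (;;)
    {
        int         len;
        size_t      nbytes = fread(&len, 1, sizeof(len), fp);
        char       *tmp;

        if (nbytes == 0)
            break;              /* clean end of file */
        if (nbytes != sizeof(len) || len <= 0)
        {
            free(buffer);
            return -1;          /* truncated length word or bad length */
        }
        tmp = realloc(buffer, (size_t) len);    /* grow buffer as needed */
        if (tmp == NULL)
        {
            free(buffer);
            return -1;
        }
        buffer = tmp;
        if (fread(buffer, 1, (size_t) len, fp) != (size_t) len)
        {
            free(buffer);
            return -1;          /* record shorter than advertised */
        }
        cb(buffer, len, arg);
        nchanges++;
    }
    free(buffer);
    return nchanges;
}

/* Example callback: sum the payload sizes seen. */
static void
count_bytes(const char *data, int len, void *arg)
{
    (void) data;
    *(int *) arg += len;
}
```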
2391
2392/*
2393 * Handle STREAM COMMIT message.
2394 */
2395static void
2397{
2398 TransactionId xid;
2402
2403 /* Save the message before it is consumed. */
2405
2407 ereport(ERROR,
2409 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2410
2413
2415
2416 switch (apply_action)
2417 {
2418 case TRANS_LEADER_APPLY:
2419
2420 /*
2421 * The transaction has been serialized to file, so replay all the
2422 * spooled operations.
2423 */
2425 commit_data.commit_lsn);
2426
2428
2429 /* Unlink the files with serialized changes and subxact info. */
2431
2432 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2433 break;
2434
2436 Assert(winfo);
2437
2438 if (pa_send_data(winfo, s->len, s->data))
2439 {
2440 /* Finish processing the streaming transaction. */
2441 pa_xact_finish(winfo, commit_data.end_lsn);
2442 break;
2443 }
2444
2445 /*
2446 * Switch to serialize mode when we are not able to send the
2447 * change to parallel apply worker.
2448 */
2449 pa_switch_to_partial_serialize(winfo, true);
2450
2453 Assert(winfo);
2454
2456 &original_msg);
2457
2459
2460 /* Finish processing the streaming transaction. */
2461 pa_xact_finish(winfo, commit_data.end_lsn);
2462 break;
2463
2465
2466 /*
2467 * If the parallel apply worker is applying spooled messages then
2468 * close the file before committing.
2469 */
2470 if (stream_fd)
2472
2474
2476
2477 /*
2478 * It is important to set the transaction state as finished before
2479 * releasing the lock. See pa_wait_for_xact_finish.
2480 */
2483
2485
2486 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2487 break;
2488
2489 default:
2490 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2491 break;
2492 }
2493
2494 /*
2495 * Process any tables that are being synchronized in parallel, as well as
2496 * any newly added tables or sequences.
2497 */
2499
2501
2503}
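The switch in apply_handle_stream_commit() appears to let the send-to-parallel-worker path fall through into the partial-serialize path when the message cannot be sent (the case labels themselves are not shown in this listing). A hypothetical model of that control flow follows. The enum names are made up, and `send_ok` stands in for the result of pa_send_data().

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the apply actions handled in the switch. */
typedef enum
{
    ACT_LEADER_APPLY,               /* replay spooled changes ourselves */
    ACT_LEADER_SEND_TO_PARALLEL,    /* hand the message to a parallel worker */
    ACT_LEADER_PARTIAL_SERIALIZE,   /* write the message to a file instead */
    ACT_PARALLEL_APPLY              /* we are the parallel worker */
} apply_action;

typedef enum
{
    DONE_REPLAYED_SPOOL,
    DONE_SENT_TO_WORKER,
    DONE_SERIALIZED_FOR_WORKER,
    DONE_COMMITTED_AS_WORKER
} commit_outcome;

static commit_outcome
finish_stream_commit(apply_action action, bool send_ok)
{
    switch (action)
    {
        case ACT_LEADER_APPLY:
            return DONE_REPLAYED_SPOOL;

        case ACT_LEADER_SEND_TO_PARALLEL:
            if (send_ok)
                return DONE_SENT_TO_WORKER;
            /* Could not send: switch to serialize mode. */
            /* FALLTHROUGH */

        case ACT_LEADER_PARTIAL_SERIALIZE:
            return DONE_SERIALIZED_FOR_WORKER;

        case ACT_PARALLEL_APPLY:
            return DONE_COMMITTED_AS_WORKER;
    }
    return DONE_REPLAYED_SPOOL;     /* not reached for valid input */
}
```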
2504
2505/*
2506 * Helper function for apply_handle_commit and apply_handle_stream_commit.
2507 */
2508static void
2510{
2511 if (is_skipping_changes())
2512 {
2514
2515 /*
2516 * Start a new transaction to clear the subskiplsn, if not started
2517 * yet.
2518 */
2519 if (!IsTransactionState())
2521 }
2522
2523 if (IsTransactionState())
2524 {
2525 /*
2526 * The transaction is either non-empty or skipped, so we clear the
2527 * subskiplsn.
2528 */
2530
2531 /*
2532 * Update origin state so we can restart streaming from correct
2533 * position in case of crash.
2534 */
2537
2539
2540 if (IsTransactionBlock())
2541 {
2542 EndTransactionBlock(false);
2544 }
2545
2546 pgstat_report_stat(false);
2547
2549 }
2550 else
2551 {
2552 /* Process any invalidation messages that might have accumulated. */
2555 }
2556
2557 in_remote_transaction = false;
2558}
2559
2560/*
2561 * Handle RELATION message.
2562 *
2563 * Note we don't do validation against local schema here. Validation
2564 * against the local schema is postponed until the first change for a given
2565 * relation arrives, since we only care about it when applying changes to
2566 * that relation anyway, and we take fewer locks this way.
2567 */
2568static void
2570{
2571 LogicalRepRelation *rel;
2572
2574 return;
2575
2576 rel = logicalrep_read_rel(s);
2578
2579 /* Also reset all entries in the partition map that refer to remoterel. */
2581}
2582
2583/*
2584 * Handle TYPE message.
2585 *
2586 * This implementation pays no attention to TYPE messages; we expect the user
2587 * to have set things up so that the incoming data is acceptable to the input
2588 * functions for the locally subscribed tables. Hence, we just read and
2589 * discard the message.
2590 */
2591static void
2601
2602/*
2603 * Check that we (the subscription owner) have sufficient privileges on the
2604 * target relation to perform the given operation.
2605 */
2606static void
2608{
2609 Oid relid;
2611
2612 relid = RelationGetRelid(rel);
2614 if (aclresult != ACLCHECK_OK)
2616 get_relkind_objtype(rel->rd_rel->relkind),
2617 get_rel_name(relid));
2618
2619 /*
2620 * We lack the infrastructure to honor RLS policies. It might be possible
2621 * to add such infrastructure here, but tablesync workers lack it, too, so
2622 * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2623 * but it seems dangerous to replicate a TRUNCATE and then refuse to
2624 * replicate subsequent INSERTs, so we forbid all commands the same.
2625 */
2626 if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2627 ereport(ERROR,
2629 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2632}
2633
2634/*
2635 * Handle INSERT message.
2636 */
2637
2638static void
2640{
2643 LogicalRepRelId relid;
2646 EState *estate;
2649 bool run_as_owner;
2650
2651 /*
2652 * Quick return if we are skipping data modification changes or handling
2653 * streamed transactions.
2654 */
2655 if (is_skipping_changes() ||
2657 return;
2658
2660
2661 relid = logicalrep_read_insert(s, &newtup);
2664 {
2665 /*
2666 * The relation can't become interesting in the middle of the
2667 * transaction so it's safe to unlock it.
2668 */
2671 return;
2672 }
2673
2674 /*
2675 * Make sure that any user-supplied code runs as the table owner, unless
2676 * the user has opted out of that behavior.
2677 */
2679 if (!run_as_owner)
2680 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2681
2682 /* Set relation for error callback */
2684
2685 /* Initialize the executor state. */
2687 estate = edata->estate;
2690 &TTSOpsVirtual);
2691
2692 /* Process and store remote tuple in the slot */
2695 slot_fill_defaults(rel, estate, remoteslot);
2697
2698 /* For a partitioned table, insert the tuple into a partition. */
2699 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2702 else
2703 {
2704 ResultRelInfo *relinfo = edata->targetRelInfo;
2705
2706 ExecOpenIndices(relinfo, false);
2709 }
2710
2712
2713 /* Reset relation for error callback */
2715
2716 if (!run_as_owner)
2718
2720
2722}
2723
2724/*
2725 * Workhorse for apply_handle_insert()
2726 * relinfo is for the relation we're actually inserting into
2727 * (could be a child partition of edata->targetRelInfo)
2728 */
2729static void
2733{
2734 EState *estate = edata->estate;
2735
2736 /* Caller should have opened indexes already. */
2737 Assert(relinfo->ri_IndexRelationDescs != NULL ||
2738 !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2739 RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2740
2741 /* Caller will not have done this bit. */
2742 Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2744
2745 /* Do the insert. */
2746 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2748}
2749
2750/*
2751 * Check if the logical replication relation is updatable and throw an
2752 * appropriate error if it isn't.
2753 */
2754static void
2756{
2757 /*
2758 * For partitioned tables, we only need to care if the target partition is
2759 * updatable (aka has PK or RI defined for it).
2760 */
2761 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2762 return;
2763
2764 /* Updatable, no error. */
2765 if (rel->updatable)
2766 return;
2767
2768 /*
2769 * We are already on the error path, so it's fine that this is somewhat
2770 * slow. It's better to give the user the correct error.
2771 */
2773 {
2774 ereport(ERROR,
2776 errmsg("publisher did not send replica identity column "
2777 "expected by the logical replication target relation \"%s.%s\"",
2778 rel->remoterel.nspname, rel->remoterel.relname)));
2779 }
2780
2781 ereport(ERROR,
2783 errmsg("logical replication target relation \"%s.%s\" has "
2784 "neither REPLICA IDENTITY index nor PRIMARY "
2785 "KEY and published relation does not have "
2786 "REPLICA IDENTITY FULL",
2787 rel->remoterel.nspname, rel->remoterel.relname)));
2788}
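check_relation_updatable() chooses between its two error messages only after the cheap checks pass. The sketch below models that decision order with a hypothetical struct. The listing omits the condition on the line before the first ereport(), so `local_has_ri_or_pk` is an assumption: it supposes that when the local table does have a replica identity index or primary key, the remaining explanation is a column the publisher did not send.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified view of a relation map entry. */
typedef struct
{
    bool        is_partitioned;     /* local table is a partitioned parent */
    bool        updatable;          /* usable for UPDATE/DELETE lookups */
    bool        local_has_ri_or_pk; /* assumed condition, see lead-in */
} rel_entry;

typedef enum
{
    UPD_OK,
    UPD_ERR_MISSING_RI_COLUMN,      /* "publisher did not send ..." */
    UPD_ERR_NO_IDENTITY             /* "... has neither REPLICA IDENTITY ..." */
} updatable_result;

static updatable_result
check_updatable(const rel_entry *rel)
{
    /* Partitioned parents are checked per leaf partition instead. */
    if (rel->is_partitioned)
        return UPD_OK;
    if (rel->updatable)
        return UPD_OK;
    /* Error path: report the more specific problem first. */
    if (rel->local_has_ri_or_pk)
        return UPD_ERR_MISSING_RI_COLUMN;
    return UPD_ERR_NO_IDENTITY;
}
```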
2789
2790/*
2791 * Handle UPDATE message.
2792 *
2793 * TODO: FDW support
2794 */
2795static void
2797{
2799 LogicalRepRelId relid;
2802 EState *estate;
2805 bool has_oldtup;
2809 bool run_as_owner;
2810
2811 /*
2812 * Quick return if we are skipping data modification changes or handling
2813 * streamed transactions.
2814 */
2815 if (is_skipping_changes() ||
2817 return;
2818
2820
2822 &newtup);
2825 {
2826 /*
2827 * The relation can't become interesting in the middle of the
2828 * transaction so it's safe to unlock it.
2829 */
2832 return;
2833 }
2834
2835 /* Set relation for error callback */
2837
2838 /* Check if we can do the update. */
2840
2841 /*
2842 * Make sure that any user-supplied code runs as the table owner, unless
2843 * the user has opted out of that behavior.
2844 */
2846 if (!run_as_owner)
2847 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2848
2849 /* Initialize the executor state. */
2851 estate = edata->estate;
2854 &TTSOpsVirtual);
2855
2856 /*
2857 * Populate updatedCols so that per-column triggers can fire, and so
2858 * the executor can correctly pass down the indexUnchanged hint. This could
2859 * include more columns than were actually changed on the publisher
2860 * because the logical replication protocol doesn't contain that
2861 * information. But it would for example exclude columns that only exist
2862 * on the subscriber, since we are not touching those.
2863 */
2865 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2866 {
2867 CompactAttribute *att = TupleDescCompactAttr(remoteslot->tts_tupleDescriptor, i);
2868 int remoteattnum = rel->attrmap->attnums[i];
2869
2870 if (!att->attisdropped && remoteattnum >= 0)
2871 {
2872 Assert(remoteattnum < newtup.ncols);
2874 target_perminfo->updatedCols =
2875 bms_add_member(target_perminfo->updatedCols,
2877 }
2878 }
2879
2880 /* Build the search tuple. */
2883 has_oldtup ? &oldtup : &newtup);
2885
2886 /* For a partitioned table, apply update to correct partition. */
2887 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2890 else
2893
2895
2896 /* Reset relation for error callback */
2898
2899 if (!run_as_owner)
2901
2903
2905}
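The updatedCols loop in apply_handle_update() marks every local column that maps to a remote column, skipping dropped columns and subscriber-only columns (remote attnum < 0). A simplified sketch follows, assuming 0-based attribute numbers and a 64-bit mask in place of the Bitmapset; the real member numbering is not visible in this listing, and the listing also omits line 2873, so any additional per-column filter applied there is not modeled. `compute_updated_cols` is a hypothetical name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * attnums[i] is the remote column feeding local column i, or -1 if the
 * column exists only on the subscriber.
 */
static uint64_t
compute_updated_cols(int natts, const bool *attisdropped, const int *attnums)
{
    uint64_t    updated = 0;

    for (int i = 0; i < natts && i < 64; i++)
    {
        /* Skip dropped columns and subscriber-only columns. */
        if (!attisdropped[i] && attnums[i] >= 0)
            updated |= UINT64_C(1) << i;
    }
    return updated;
}
```

With columns {mapped, dropped, subscriber-only, mapped}, only bits 0 and 3 are set.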
2906
2907/*
2908 * Workhorse for apply_handle_update()
2909 * relinfo is for the relation we're actually updating in
2910 * (could be a child partition of edata->targetRelInfo)
2911 */
2912static void
2917 Oid localindexoid)
2918{
2919 EState *estate = edata->estate;
2920 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2921 Relation localrel = relinfo->ri_RelationDesc;
2922 EPQState epqstate;
2925 bool found;
2927
2928 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2929 ExecOpenIndices(relinfo, false);
2930
2931 found = FindReplTupleInLocalRel(edata, localrel,
2932 &relmapentry->remoterel,
2933 localindexoid,
2935
2936 /*
2937 * Tuple found.
2938 *
2939 * Note this will fail if there are other conflicting unique indexes.
2940 */
2941 if (found)
2942 {
2943 /*
2944 * Report the conflict if the tuple was modified by a different
2945 * origin.
2946 */
2948 &conflicttuple.origin, &conflicttuple.ts) &&
2950 {
2952
2953 /* Store the new tuple for conflict reporting */
2954 newslot = table_slot_create(localrel, &estate->es_tupleTable);
2955 slot_store_data(newslot, relmapentry, newtup);
2956
2957 conflicttuple.slot = localslot;
2958
2962 }
2963
2964 /* Process and store remote tuple in the slot */
2968
2969 EvalPlanQualSetSlot(&epqstate, remoteslot);
2970
2972
2973 /* Do the actual update. */
2974 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2975 ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2976 remoteslot);
2977 }
2978 else
2979 {
2982
2983 /*
2984 * Detecting whether the tuple was recently deleted or never existed
2985 * is crucial to avoid misleading the user during conflict handling.
2986 */
2987 if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
2988 &conflicttuple.xmin,
2989 &conflicttuple.origin,
2990 &conflicttuple.ts) &&
2993 else
2995
2996 /* Store the new tuple for conflict reporting */
2997 slot_store_data(newslot, relmapentry, newtup);
2998
2999 /*
3000 * The tuple to be updated could not be found or was deleted. Do
3001 * nothing except for emitting a log message.
3002 */
3005 }
3006
3007 /* Cleanup. */
3009 EvalPlanQualEnd(&epqstate);
3010}
3011
3012/*
3013 * Handle DELETE message.
3014 *
3015 * TODO: FDW support
3016 */
3017static void
3019{
3022 LogicalRepRelId relid;
3025 EState *estate;
3028 bool run_as_owner;
3029
3030 /*
3031 * Quick return if we are skipping data modification changes or handling
3032 * streamed transactions.
3033 */
3034 if (is_skipping_changes() ||
3036 return;
3037
3039
3040 relid = logicalrep_read_delete(s, &oldtup);
3043 {
3044 /*
3045 * The relation can't become interesting in the middle of the
3046 * transaction so it's safe to unlock it.
3047 */
3050 return;
3051 }
3052
3053 /* Set relation for error callback */
3055
3056 /* Check if we can do the delete. */
3058
3059 /*
3060 * Make sure that any user-supplied code runs as the table owner, unless
3061 * the user has opted out of that behavior.
3062 */
3064 if (!run_as_owner)
3065 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3066
3067 /* Initialize the executor state. */
3069 estate = edata->estate;
3072 &TTSOpsVirtual);
3073
3074 /* Build the search tuple. */
3078
3079 /* For a partitioned table, apply delete to correct partition. */
3080 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3083 else
3084 {
3085 ResultRelInfo *relinfo = edata->targetRelInfo;
3086
3087 ExecOpenIndices(relinfo, false);
3091 }
3092
3094
3095 /* Reset relation for error callback */
3097
3098 if (!run_as_owner)
3100
3102
3104}
3105
3106/*
3107 * Workhorse for apply_handle_delete()
3108 * relinfo is for the relation we're actually deleting from
3109 * (could be a child partition of edata->targetRelInfo)
3110 */
3111static void
3115 Oid localindexoid)
3116{
3117 EState *estate = edata->estate;
3118 Relation localrel = relinfo->ri_RelationDesc;
3119 LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3120 EPQState epqstate;
3123 bool found;
3124
3125 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3126
3127 /* Caller should have opened indexes already. */
3128 Assert(relinfo->ri_IndexRelationDescs != NULL ||
3129 !localrel->rd_rel->relhasindex ||
3130 RelationGetIndexList(localrel) == NIL);
3131
3132 found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3134
3135 /* If found delete it. */
3136 if (found)
3137 {
3138 /*
3139 * Report the conflict if the tuple was modified by a different
3140 * origin.
3141 */
3143 &conflicttuple.origin, &conflicttuple.ts) &&
3145 {
3146 conflicttuple.slot = localslot;
3150 }
3151
3152 EvalPlanQualSetSlot(&epqstate, localslot);
3153
3154 /* Do the actual delete. */
3155 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3156 ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3157 }
3158 else
3159 {
3160 /*
3161 * The tuple to be deleted could not be found. Do nothing except for
3162 * emitting a log message.
3163 */
3166 }
3167
3168 /* Cleanup. */
3169 EvalPlanQualEnd(&epqstate);
3170}
3171
3172/*
3173 * Try to find a tuple received from the publication side (in 'remoteslot') in
3174 * the corresponding local relation using the replica identity index, the
3175 * primary key, another usable index or, if needed, a sequential scan.
3176 *
3177 * Local tuple, if found, is returned in '*localslot'.
3178 */
3179static bool
3181 LogicalRepRelation *remoterel,
3185{
3186 EState *estate = edata->estate;
3187 bool found;
3188
3189 /*
3190 * Regardless of the top-level operation, we're performing a read here, so
3191 * check for SELECT privileges.
3192 */
3194
3195 *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3196
3198 (remoterel->replident == REPLICA_IDENTITY_FULL));
3199
3201 {
3202#ifdef USE_ASSERT_CHECKING
3204
3205 /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3207 (remoterel->replident == REPLICA_IDENTITY_FULL &&
3209 edata->targetRel->attrmap)));
3211#endif
3212
3213 found = RelationFindReplTupleByIndex(localrel, localidxoid,
3216 }
3217 else
3220
3221 return found;
3222}
3223
3224/*
3225 * Determine whether the index can reliably locate the deleted tuple in the
3226 * local relation.
3227 *
3228 * An index may exclude deleted tuples if it was re-indexed or re-created during
3229 * change application. Therefore, an index is considered usable only if the
3230 * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3231 * index tuple's xmin. This ensures that any tuples deleted prior to the index
3232 * creation or re-indexing are not relevant for conflict detection in the
3233 * current apply worker.
3234 *
3235 * Note that indexes may also be excluded if they were modified by other DDL
3236 * operations, such as ALTER INDEX. However, this is acceptable, as the
3237 * likelihood of such DDL changes coinciding with the need to scan dead
3238 * tuples for update_deleted conflict detection is low.
3239 */
3240static bool
3243{
3246
3248
3249 if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3250 elog(ERROR, "cache lookup failed for index %u", localindexoid);
3251
3252 /*
3253 * No need to check for a frozen transaction ID, as
3254 * TransactionIdPrecedes() manages it internally, treating it as falling
3255 * behind the conflict_detection_xmin.
3256 */
3258
3260
3262}
3263
3264/*
3265 * Attempts to locate a deleted tuple in the local relation that matches the
3266 * values of the tuple received from the publication side (in 'remoteslot').
3267 * The search is performed using either the replica identity index, primary
3268 * key, other available index, or a sequential scan if necessary.
3269 *
3270 * Returns true if the deleted tuple is found. If found, the transaction ID,
3271 * origin, and commit timestamp of the deletion are stored in '*delete_xid',
3272 * '*delete_origin', and '*delete_time' respectively.
3273 */
3274static bool
3279{
3281
3282 /*
3283 * Return false if either dead tuples are not retained or commit timestamp
3284 * data is not available.
3285 */
3287 return false;
3288
3289 /*
3290 * For conflict detection, we use the leader worker's
3291 * oldest_nonremovable_xid value instead of invoking
3292 * GetOldestNonRemovableTransactionId() or using the conflict detection
3293 * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3294 * identify tuples that were recently deleted. These deleted tuples are no
3295 * longer visible to concurrent transactions. However, if a remote update
3296 * matches such a tuple, we log an update_deleted conflict.
3297 *
3298 * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3299 * transaction IDs older than oldest_nonremovable_xid, for our current
3300 * purpose, it is acceptable to treat tuples deleted by transactions prior
3301 * to oldest_nonremovable_xid as update_missing conflicts.
3302 */
3304 {
3306 }
3307 else
3308 {
3309 LogicalRepWorker *leader;
3310
3311 /*
3312 * Obtain the information from the leader apply worker as only the
3313 * leader manages oldest_nonremovable_xid (see
3314 * maybe_advance_nonremovable_xid() for details).
3315 */
3319 false);
3320 if (!leader)
3321 {
3322 ereport(ERROR,
3324 errmsg("could not detect conflict as the leader apply worker has exited")));
3325 }
3326
3327 SpinLockAcquire(&leader->relmutex);
3329 SpinLockRelease(&leader->relmutex);
3331 }
3332
3333 /*
3334 * Return false if the leader apply worker has stopped retaining
3335 * information for detecting conflicts. This implies that update_deleted
3336 * can no longer be reliably detected.
3337 */
3339 return false;
3340
3341 if (OidIsValid(localidxoid) &&
3346 delete_time);
3347 else
3351}
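Both the index-usability check and FindDeletedTupleInLocalRel() depend on wraparound-aware XID ordering. The sketch below reimplements the comparison rule of PostgreSQL's TransactionIdPrecedes(): special XIDs below FirstNormalTransactionId (3) compare as plain unsigned values, and normal XIDs compare modulo 2^32. It then applies that rule to the two decisions described in the comments: an index is usable only if its xmin precedes conflict_detection_xmin, and a matching dead tuple deleted before oldest_nonremovable_xid is treated as update_missing rather than update_deleted. All function names here are hypothetical, and the classification helper is a simplification of the stated rule, not the actual search code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t xid_t;

#define FIRST_NORMAL_XID ((xid_t) 3)

/* Wraparound-aware "id1 is older than id2". */
static bool
xid_precedes(xid_t id1, xid_t id2)
{
    /* Special XIDs (e.g. frozen) compare as plain unsigned values. */
    if (id1 < FIRST_NORMAL_XID || id2 < FIRST_NORMAL_XID)
        return id1 < id2;
    /* Normal XIDs: modulo-2^32 comparison. */
    return (int32_t) (id1 - id2) < 0;
}

/* Index built before the horizon can still see the dead tuples we need. */
static bool
index_usable_for_deleted(xid_t index_xmin, xid_t conflict_detection_xmin)
{
    return xid_precedes(index_xmin, conflict_detection_xmin);
}

typedef enum
{
    CONFLICT_UPDATE_MISSING,
    CONFLICT_UPDATE_DELETED
} update_conflict;

static update_conflict
classify_missing_update(bool retaining, bool found_dead, xid_t delete_xid,
                        xid_t oldest_nonremovable_xid)
{
    if (!retaining || !found_dead)
        return CONFLICT_UPDATE_MISSING;
    /* Deleted before the threshold: too old to report as update_deleted. */
    if (xid_precedes(delete_xid, oldest_nonremovable_xid))
        return CONFLICT_UPDATE_MISSING;
    return CONFLICT_UPDATE_DELETED;
}
```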
3352
3353/*
3354 * This handles insert, update, and delete on a partitioned table.
3355 */
3356static void
3360 CmdType operation)
3361{
3362 EState *estate = edata->estate;
3363 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3364 ResultRelInfo *relinfo = edata->targetRelInfo;
3365 Relation parentrel = relinfo->ri_RelationDesc;
3366 ModifyTableState *mtstate;
3367 PartitionTupleRouting *proute;
3369 Relation partrel;
3371 TupleConversionMap *map;
3374 AttrMap *attrmap = NULL;
3375
3376 /* ModifyTableState is needed for ExecFindPartition(). */
3377 edata->mtstate = mtstate = makeNode(ModifyTableState);
3378 mtstate->ps.plan = NULL;
3379 mtstate->ps.state = estate;
3380 mtstate->operation = operation;
3381 mtstate->resultRelInfo = relinfo;
3382
3383 /* ... as is PartitionTupleRouting. */
3384 edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3385
3386 /*
3387 * Find the partition to which the "search tuple" belongs.
3388 */
3391 partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3392 remoteslot, estate);
3394 partrel = partrelinfo->ri_RelationDesc;
3395
3396 /*
3397 * Check for supported relkind. We need this since partitions might be of
3398 * unsupported relkinds; and the set of partitions can change, so checking
3399 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3400 */
3401 CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3402 relmapentry->remoterel.relkind,
3404 RelationGetRelationName(partrel));
3405
3406 /*
3407 * To perform any of the operations below, the tuple must match the
3408 * partition's rowtype. Convert if needed or just copy, using a dedicated
3409 * slot to store the tuple in any case.
3410 */
3411 remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3412 if (remoteslot_part == NULL)
3413 remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3414 map = ExecGetRootToChildMap(partrelinfo, estate);
3415 if (map != NULL)
3416 {
3417 attrmap = map->attrMap;
3420 }
3421 else
3422 {
3425 }
3427
3428 /* Check if we can do the update or delete on the leaf partition. */
3429 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3430 {
3431 part_entry = logicalrep_partition_open(relmapentry, partrel,
3432 attrmap);
3434 }
3435
3436 switch (operation)
3437 {
3438 case CMD_INSERT:
3441 break;
3442
3443 case CMD_DELETE:
3446 part_entry->localindexoid);
3447 break;
3448
3449 case CMD_UPDATE:
3450
3451 /*
3452 * For UPDATE, depending on whether or not the updated tuple
3453 * satisfies the partition's constraint, perform a simple UPDATE
3454 * of the partition or move the updated tuple into a different
3455 * suitable partition.
3456 */
3457 {
3461 bool found;
3462 EPQState epqstate;
3464
3465 /* Get the matching local tuple from the partition. */
3466 found = FindReplTupleInLocalRel(edata, partrel,
3467 &part_entry->remoterel,
3468 part_entry->localindexoid,
3470 if (!found)
3471 {
3474
3475 /*
3476 * Detecting whether the tuple was recently deleted or
3477 * never existed is crucial to avoid misleading the user
3478 * during conflict handling.
3479 */
3480 if (FindDeletedTupleInLocalRel(partrel,
3481 part_entry->localindexoid,
3483 &conflicttuple.xmin,
3484 &conflicttuple.origin,
3485 &conflicttuple.ts) &&
3488 else
3490
3491 /* Store the new tuple for conflict reporting */
3493
3494 /*
3495 * The tuple to be updated could not be found or was
3496 * deleted. Do nothing except for emitting a log message.
3497 */
3501
3502 return;
3503 }
3504
3505 /*
3506 * Report the conflict if the tuple was modified by a
3507 * different origin.
3508 */
3510 &conflicttuple.origin,
3511 &conflicttuple.ts) &&
3513 {
3515
3516 /* Store the new tuple for conflict reporting */
3517 newslot = table_slot_create(partrel, &estate->es_tupleTable);
3519
3520 conflicttuple.slot = localslot;
3521
3525 }
3526
3527 /*
3528 * Apply the update to the local tuple, putting the result in
3529 * remoteslot_part.
3530 */
3533 newtup);
3535
3536 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3537
3538 /*
3539 * Does the updated tuple still satisfy the current
3540 * partition's constraint?
3541 */
3542 if (!partrel->rd_rel->relispartition ||
3544 false))
3545 {
3546 /*
3547 * Yes, so simply UPDATE the partition. We don't call
3548 * apply_handle_update_internal() here, which would
3549 * normally do the following work, to avoid repeating some
3550 * work already done above to find the local tuple in the
3551 * partition.
3552 */
3554
3556 TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3557 ACL_UPDATE);
3558 ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3560 }
3561 else
3562 {
3563 /* Move the tuple into the new partition. */
3564
3565 /*
3566 * New partition will be found using tuple routing, which
3567 * can only occur via the parent table. We might need to
3568 * convert the tuple to the parent's rowtype. Note that
3569 * this is the tuple found in the partition, not the
3570 * original search tuple received by this function.
3571 */
3572 if (map)
3573 {
3577
3578 remoteslot =
3581 }
3582 else
3583 {
3586 }
3587
3588 /* Find the new partition. */
3591 proute, remoteslot,
3592 estate);
3595 partrel_new = partrelinfo_new->ri_RelationDesc;
3596
3597 /* Check that new partition also has supported relkind. */
3598 CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3599 relmapentry->remoterel.relkind,
3602
3603 /* DELETE old tuple found in the old partition. */
3604 EvalPlanQualSetSlot(&epqstate, localslot);
3605 TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3606 ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3607
3608 /* INSERT new tuple into the new partition. */
3609
3610 /*
3611 * Convert the replacement tuple to match the destination
3612 * partition rowtype.
3613 */
3615 remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3616 if (remoteslot_part == NULL)
3618 &estate->es_tupleTable);
3620 if (map != NULL)
3621 {
3623 remoteslot,
3625 }
3626 else
3627 {
3629 remoteslot);
3631 }
3635 }
3636
3637 EvalPlanQualEnd(&epqstate);
3638 }
3639 break;
3640
3641 default:
3642 elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3643 break;
3644 }
3645}
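For an UPDATE, apply_handle_tuple_routing() keeps the row in its current partition when the new tuple still satisfies that partition's constraint. Otherwise it deletes the row there and re-inserts it through the parent's tuple routing. A minimal sketch of that decision for hypothetical integer range partitions follows. `UPDATE_NO_TARGET` models the case where no partition accepts the row, which ExecFindPartition() reports as an error in the real code.

```c
#include <assert.h>

/* Hypothetical range partitioning: partition p holds [bounds[p], bounds[p+1]). */
static int
find_partition(const int *bounds, int nparts, int key)
{
    for (int p = 0; p < nparts; p++)
        if (key >= bounds[p] && key < bounds[p + 1])
            return p;
    return -1;                  /* no partition accepts the key */
}

typedef enum
{
    UPDATE_IN_PLACE,            /* simple UPDATE of the current partition */
    UPDATE_MOVE,                /* DELETE from old, INSERT into new */
    UPDATE_NO_TARGET            /* routing failed */
} update_kind;

static update_kind
plan_partition_update(const int *bounds, int nparts, int cur_part,
                      int new_key, int *dest)
{
    /* Does the updated row still satisfy the current partition's bounds? */
    if (new_key >= bounds[cur_part] && new_key < bounds[cur_part + 1])
    {
        *dest = cur_part;
        return UPDATE_IN_PLACE;
    }
    /* Otherwise route through the parent to find the new partition. */
    *dest = find_partition(bounds, nparts, new_key);
    return (*dest < 0) ? UPDATE_NO_TARGET : UPDATE_MOVE;
}
```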
3646
3647/*
3648 * Handle TRUNCATE message.
3649 *
3650 * TODO: FDW support
3651 */
3652static void
3654{
3655 bool cascade = false;
3656 bool restart_seqs = false;
3658 List *remote_rels = NIL;
3659 List *rels = NIL;
3660 List *part_rels = NIL;
3661 List *relids = NIL;
3663 ListCell *lc;
3664 LOCKMODE lockmode = AccessExclusiveLock;
3665
3666 /*
3667 * Quick return if we are skipping data modification changes or handling
3668 * streamed transactions.
3669 */
3670 if (is_skipping_changes() ||
3672 return;
3673
3675
3676 remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3677
3678 foreach(lc, remote_relids)
3679 {
3680 LogicalRepRelId relid = lfirst_oid(lc);
3682
3683 rel = logicalrep_rel_open(relid, lockmode);
3685 {
3686 /*
3687 * The relation can't become interesting in the middle of the
3688 * transaction so it's safe to unlock it.
3689 */
3690 logicalrep_rel_close(rel, lockmode);
3691 continue;
3692 }
3693
3696 rels = lappend(rels, rel->localrel);
3697 relids = lappend_oid(relids, rel->localreloid);
3700
3701 /*
3702 * Truncate partitions if we got a message to truncate a partitioned
3703 * table.
3704 */
3705 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3706 {
3707 ListCell *child;
3708 List *children = find_all_inheritors(rel->localreloid,
3709 lockmode,
3710 NULL);
3711
3712 foreach(child, children)
3713 {
3714 Oid childrelid = lfirst_oid(child);
3716
3717 if (list_member_oid(relids, childrelid))
3718 continue;
3719
3720 /* find_all_inheritors already got lock */
3722
3723 /*
3724 * Ignore temp tables of other backends. See similar code in
3725 * ExecuteTruncate().
3726 */
3728 {
3729 table_close(childrel, lockmode);
3730 continue;
3731 }
3732
3734 rels = lappend(rels, childrel);
3736 relids = lappend_oid(relids, childrelid);
3737 /* Log this relation only if needed for logical decoding */
3740 }
3741 }
3742 }
3743
3744 /*
3745 * Even if we used CASCADE on the upstream primary we explicitly default
3746 * to replaying changes without further cascading. This might later be
3747 * made configurable with a user-specified option.
3748 *
3749 * MySubscription->runasowner tells us whether we want to execute
3750 * replication actions as the subscription owner; the last argument to
3751 * TruncateGuts tells it whether we want to switch to the table owner.
3752 * Those are exactly opposite conditions.
3753 */
3755 relids,
3758 restart_seqs,
3760 foreach(lc, remote_rels)
3761 {
3763
3765 }
3766 foreach(lc, part_rels)
3767 {
3768 Relation rel = lfirst(lc);
3769
3770 table_close(rel, NoLock);
3771 }
3772
3774}
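apply_handle_truncate() expands a partitioned table into its partitions, skipping relations already collected and temporary tables belonging to other sessions. A hypothetical array-based sketch of that collection step follows, with a made-up `child_rel` struct in place of opened relations and a fixed-capacity OID array in place of a List.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int oid_t;

/* Hypothetical per-child info for the sketch. */
typedef struct
{
    oid_t       relid;
    bool        other_backend_temp; /* temp table of another session */
} child_rel;

static bool
oid_member(const oid_t *list, int n, oid_t relid)
{
    for (int i = 0; i < n; i++)
        if (list[i] == relid)
            return true;
    return false;
}

/* Append children to relids (capacity cap); returns the new length. */
static int
add_children(oid_t *relids, int n, int cap,
             const child_rel *children, int nchildren)
{
    for (int i = 0; i < nchildren && n < cap; i++)
    {
        /* Already collected (e.g. the parent itself, or a repeat). */
        if (oid_member(relids, n, children[i].relid))
            continue;
        /* Other sessions' temp tables are ignored. */
        if (children[i].other_backend_temp)
            continue;
        relids[n++] = children[i].relid;
    }
    return n;
}
```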
3775
3776
3777/*
3778 * Logical replication protocol message dispatcher.
3779 */
3780void
3782{
3783 LogicalRepMsgType action = pq_getmsgbyte(s);
3785
3786 /*
3787 * Set the current command being applied. Since this function can be
3788 * called recursively when applying spooled changes, save the current
3789 * command.
3790 */
3793
3794 switch (action)
3795 {
3798 break;
3799
3802 break;
3803
3806 break;
3807
3810 break;
3811
3814 break;
3815
3818 break;
3819
3822 break;
3823
3826 break;
3827
3830 break;
3831
3833
3834 /*
3835 * Logical replication does not use generic logical messages yet,
3836 * although they could be used by other applications that use this
3837 * output plugin.
3838 */
3839 break;
3840
3843 break;
3844
3847 break;
3848
3851 break;
3852
3855 break;
3856
3859 break;
3860
3863 break;
3864
3867 break;
3868
3871 break;
3872
3875 break;
3876
3877 default:
3878 ereport(ERROR,
3880 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3881 }
3882
3883 /* Reset the current command */
3885}
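apply_dispatch() switches on the leading message-type byte and saves the current command before overwriting it, because it can be re-entered while spooled changes are replayed. A minimal sketch of that save/restore pattern follows. The type bytes 'B', 'C', 'I', 'U' and 'D' are the documented Begin, Commit, Insert, Update and Delete codes of PostgreSQL's logical replication protocol; the handler IDs and the global `current_command` are hypothetical.

```c
#include <assert.h>

typedef enum
{
    H_NONE = -1, H_BEGIN, H_COMMIT, H_INSERT, H_UPDATE, H_DELETE
} handler_id;

static int  current_command = 0;    /* type byte being applied, 0 = none */

static handler_id
dispatch(const unsigned char *msg)
{
    int         saved = current_command;    /* we may be nested */
    handler_id  h;

    current_command = msg[0];
    switch (msg[0])
    {
        case 'B': h = H_BEGIN; break;
        case 'C': h = H_COMMIT; break;
        case 'I': h = H_INSERT; break;
        case 'U': h = H_UPDATE; break;
        case 'D': h = H_DELETE; break;
        default: h = H_NONE; break;     /* real code raises an error */
    }
    current_command = saved;            /* restore the caller's command */
    return h;
}
```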
3886
3887/*
3888 * Figure out which write/flush positions to report to the walsender process.
3889 *
3890 * We can't simply report back the last LSN the walsender sent us because the
3891 * local transaction might not yet be flushed to disk locally. Instead we
3892 * build a list that associates local with remote LSNs for every commit. When
3893 * reporting back the flush position to the sender we iterate that list and
3894 * check which entries on it are already locally flushed. Those we can report
3895 * as having been flushed.
3896 *
3897 * The output flag have_pending_txes is set to true if there are outstanding
3898 * transactions that still need to be flushed.
3899 */
3900static void
3902 bool *have_pending_txes)
3903{
3904 dlist_mutable_iter iter;
3906
3908 *flush = InvalidXLogRecPtr;
3909
3911 {
3912 FlushPosition *pos =
3913 dlist_container(FlushPosition, node, iter.cur);
3914
3915 *write = pos->remote_end;
3916
3917 if (pos->local_end <= local_flush)
3918 {
3919 *flush = pos->remote_end;
3920 dlist_delete(iter.cur);
3921 pfree(pos);
3922 }
3923 else
3924 {
3925 /*
3926 * Don't want to uselessly iterate over the rest of the list which
3927 * could potentially be long. Instead get the last element and
3928 * grab the write position from there.
3929 */
3931 &lsn_mapping);
3932 *write = pos->remote_end;
3933 *have_pending_txes = true;
3934 return;
3935 }
3936 }
3937
3939}
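The bookkeeping behind get_flush_position() and store_flush_location() can be modeled with an array of (local_end, remote_end) pairs kept in commit order. Every entry whose local end is already flushed is reported upstream as flushed and dropped, and the newest remaining remote end becomes the write position. The hypothetical sketch below uses 0 in place of InvalidXLogRecPtr and an array in place of the dlist.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t lsn_t;

typedef struct
{
    lsn_t       local_end;      /* local WAL end of the applied commit */
    lsn_t       remote_end;     /* matching end LSN on the publisher */
} flush_pos;

/* Counterpart of store_flush_location(): append one commit's LSN pair. */
static bool
record_commit(flush_pos *map, int *n, int cap, lsn_t local_lsn, lsn_t remote_lsn)
{
    if (*n >= cap)
        return false;
    map[*n].local_end = local_lsn;
    map[*n].remote_end = remote_lsn;
    (*n)++;
    return true;
}

static void
compute_flush_position(flush_pos *map, int *n, lsn_t local_flush,
                       lsn_t *write, lsn_t *flush, bool *have_pending)
{
    int         i = 0;

    *write = 0;
    *flush = 0;
    *have_pending = false;

    /* Entries already flushed locally can be reported as flushed. */
    while (i < *n && map[i].local_end <= local_flush)
    {
        *write = *flush = map[i].remote_end;
        i++;
    }

    /* Drop the reported entries from the front. */
    for (int j = i; j < *n; j++)
        map[j - i] = map[j];
    *n -= i;

    /* Remaining entries are written but not flushed; report the newest. */
    if (*n > 0)
    {
        *write = map[*n - 1].remote_end;
        *have_pending = true;
    }
}
```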
3940
3941/*
3942 * Store current remote/local lsn pair in the tracking list.
3943 */
3944void
3946{
3948
3949 /*
3950 * Skip for parallel apply workers, because the lsn_mapping is maintained
3951 * by the leader apply worker.
3952 */
3954 return;
3955
3956 /* Need to do this in permanent context */
3958
3959 /* Track commit lsn */
3961 flushpos->local_end = local_lsn;
3962 flushpos->remote_end = remote_lsn;
3963
3966}
3967
3968
3969/* Update statistics of the worker. */
3970static void
3982
3983/*
3984 * Apply main loop.
3985 */
3986static void
3988{
3990 bool ping_sent = false;
3991 TimeLineID tli;
3992 ErrorContextCallback errcallback;
3994
3995 /*
3996 * Init the ApplyMessageContext which we clean up after each replication
3997 * protocol message.
3998 */
4000 "ApplyMessageContext",
4002
4003 /*
4004 * This memory context is used for per-stream data when the streaming mode
4005 * is enabled. This context is reset on each stream stop.
4006 */
4008 "LogicalStreamingContext",
4010
4011 /* mark as idle, before starting to loop */
4013
4014 /*
4015 * Push apply error context callback. Fields will be filled while applying
4016 * a change.
4017 */
4018 errcallback.callback = apply_error_callback;
4019 errcallback.previous = error_context_stack;
4020 error_context_stack = &errcallback;
4022
4023 /* This outer loop iterates once per wait. */
4024 for (;;)
4025 {
4027 int rc;
4028 int len;
4029 char *buf = NULL;
4030 bool endofstream = false;
4031 long wait_time;
4032
4034
4036
4038
4039 if (len != 0)
4040 {
4041 /* Loop to process all available data (without blocking). */
4042 for (;;)
4043 {
4045
4046 if (len == 0)
4047 {
4048 break;
4049 }
4050 else if (len < 0)
4051 {
4052 ereport(LOG,
4053 (errmsg("data stream from publisher has ended")));
4054 endofstream = true;
4055 break;
4056 }
4057 else
4058 {
4059 int c;
4061
4063 {
4064 ConfigReloadPending = false;
4066 }
4067
4068 /* Reset timeout. */
4070 ping_sent = false;
4071
4072 rdt_data.last_recv_time = last_recv_timestamp;
4073
4074 /* Ensure we are reading the data into our memory context. */
4076
4078
4079 c = pq_getmsgbyte(&s);
4080
4081 if (c == PqReplMsg_WALData)
4082 {
4083 XLogRecPtr start_lsn;
4084 XLogRecPtr end_lsn;
4086
4087 start_lsn = pq_getmsgint64(&s);
4088 end_lsn = pq_getmsgint64(&s);
4090
4091 if (last_received < start_lsn)
4092 last_received = start_lsn;
4093
4094 if (last_received < end_lsn)
4095 last_received = end_lsn;
4096
4098
4099 apply_dispatch(&s);
4100
4102 }
4103 else if (c == PqReplMsg_Keepalive)
4104 {
4105 XLogRecPtr end_lsn;
4107 bool reply_requested;
4108
4109 end_lsn = pq_getmsgint64(&s);
4112
4113 if (last_received < end_lsn)
4114 last_received = end_lsn;
4115
4117
4119
4121 }
4122 else if (c == PqReplMsg_PrimaryStatusUpdate)
4123 {
4124 rdt_data.remote_lsn = pq_getmsgint64(&s);
4125 rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4127 rdt_data.reply_time = pq_getmsgint64(&s);
4128
4129 /*
4130 * This should never happen, see
4131 * ProcessStandbyPSRequestMessage. But if it happens
4132 * due to a bug, we don't want to proceed as it can
4133 * incorrectly advance oldest_nonremovable_xid.
4134 */
4135 if (!XLogRecPtrIsValid(rdt_data.remote_lsn))
4136 elog(ERROR, "cannot get the latest WAL position from the publisher");
4137
4139
4140 UpdateWorkerStats(last_received, rdt_data.reply_time, false);
4141 }
4142 /* other message types are purposefully ignored */
4143
4145 }
4146
4148 }
4149 }
4150
4151 /* confirm all writes so far */
4152 send_feedback(last_received, false, false);
4153
4154 /* Reset the timestamp if no message was received */
4155 rdt_data.last_recv_time = 0;
4156
4158
4160 {
4161 /*
4162 * If we didn't get any transactions for a while, there might be
4163 * unconsumed invalidation messages in the queue; consume them
4164 * now.
4165 */
4168
4169 /*
4170 * Process any relations that are being synchronized in parallel
4171 * and any newly added tables or sequences.
4172 */
4174 }
4175
4176 /* Cleanup the memory. */
4179
4180 /* Check if we need to exit the streaming loop. */
4181 if (endofstream)
4182 break;
4183
4184 /*
4185 * Wait for more data or latch. If we have unflushed transactions,
4186 * wake up after WalWriterDelay to see if they've been flushed yet (in
4187 * which case we should send a feedback message). Otherwise, there's
4188 * no particular urgency about waking up unless we get data or a
4189 * signal.
4190 */
4193 else
4195
4196 /*
4197 * Ensure we wake up when it's possible to advance the non-removable
4198 * transaction ID, or when the retention duration may have exceeded
4199 * max_retention_duration.
4200 */
4202 {
4203 if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
4204 rdt_data.xid_advance_interval)
4205 wait_time = Min(wait_time, rdt_data.xid_advance_interval);
4206 else if (MySubscription->maxretention > 0)
4208 }
4209
4213 fd, wait_time,
4215
4216 if (rc & WL_LATCH_SET)
4217 {
4220 }
4221
4223 {
4224 ConfigReloadPending = false;
4226 }
4227
4228 if (rc & WL_TIMEOUT)
4229 {
4230 /*
4231 * We didn't receive anything new. If we haven't heard anything
4232 * from the server for more than wal_receiver_timeout / 2, ping
4233 * the server. Also, if it's been longer than
4234 * wal_receiver_status_interval since the last update we sent,
4235 * send a status update to the primary anyway, to report any
4236 * progress in applying WAL.
4237 */
4238 bool requestReply = false;
4239
4240 /*
4241 * Check if time since last receive from primary has reached the
4242 * configured limit.
4243 */
4244 if (wal_receiver_timeout > 0)
4245 {
4248
4249 timeout =
4252
4253 if (now >= timeout)
4254 ereport(ERROR,
4256 errmsg("terminating logical replication worker due to timeout")));
4257
4258 /* Check to see if it's time for a ping. */
4259 if (!ping_sent)
4260 {
4262 (wal_receiver_timeout / 2));
4263 if (now >= timeout)
4264 {
4265 requestReply = true;
4266 ping_sent = true;
4267 }
4268 }
4269 }
4270
4272
4274
4275 /*
4276 * Force reporting to ensure long idle periods don't lead to
4277 * arbitrarily delayed stats. Stats can only be reported outside
4278 * of (implicit or explicit) transactions. That shouldn't lead to
4279 * stats being delayed for long, because transactions are either
4280 * sent as a whole on commit or streamed. Streamed transactions
4281 * are spilled to disk and applied on commit.
4282 */
4283 if (!IsTransactionState())
4284 pgstat_report_stat(true);
4285 }
4286 }
4287
4288 /* Pop the error context stack */
4289 error_context_stack = errcallback.previous;
4291
4292 /* All done */
4294}
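The two timing decisions in the apply loop, how long to wait and what to do when a wait times out, can be sketched as pure functions. This is a hedged model rather than the server's code: timestamps are hypothetical millisecond integers, compute_wait_time() and check_idle() are invented names, and the max_retention_duration cap assumes the elided line in the wait-time block does what its comment describes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef int64_t Millis;         /* hypothetical millisecond timestamps */

/*
 * How long to sleep: briefly if commits still await a local flush (so
 * feedback can be sent soon), otherwise a longer idle nap. When dead-tuple
 * retention applies, also wake in time for the next xid advancement attempt
 * or, assumed here, for the max_retention_duration check.
 */
static long
compute_wait_time(bool have_pending_txes, long flush_delay_ms, long idle_ms,
                  bool retaining, bool candidate_phase,
                  long xid_advance_interval_ms, long max_retention_ms)
{
    long wait = have_pending_txes ? flush_delay_ms : idle_ms;

    if (retaining)
    {
        if (candidate_phase && xid_advance_interval_ms > 0)
            wait = wait < xid_advance_interval_ms ? wait : xid_advance_interval_ms;
        else if (max_retention_ms > 0)
            wait = wait < max_retention_ms ? wait : max_retention_ms;
    }
    return wait;
}

typedef enum
{
    IDLE_OK,                    /* nothing to do yet */
    IDLE_SEND_PING,             /* ask the publisher for a reply */
    IDLE_TIMED_OUT              /* publisher silent for too long */
} IdleAction;

/*
 * After a wait times out with no data: fail once timeout_ms has passed
 * since the last receive, and ping once after timeout_ms / 2.
 * A timeout_ms of 0 or less disables both checks.
 */
static IdleAction
check_idle(Millis now, Millis last_recv, long timeout_ms, bool *ping_sent)
{
    if (timeout_ms <= 0)
        return IDLE_OK;

    if (now >= last_recv + timeout_ms)
        return IDLE_TIMED_OUT;

    if (!*ping_sent && now >= last_recv + timeout_ms / 2)
    {
        *ping_sent = true;
        return IDLE_SEND_PING;
    }
    return IDLE_OK;
}
```

In the loop above, ping_sent is cleared whenever data arrives, so a fresh ping can be sent after the next quiet period.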
4295
4296/*
4297 * Send a Standby Status Update message to server.
4298 *
4299 * 'recvpos' is the latest LSN up to which we've received data. 'force' is set
4300 * if we need to send a response to avoid timeouts.
4301 */
4302static void
4304{
4305 static StringInfo reply_message = NULL;
4306 static TimestampTz send_time = 0;
4307
4310
4314 bool have_pending_txes;
4315
4316 /*
4317 * If the user doesn't want status to be reported to the publisher, be
4318 * sure to exit before doing anything at all.
4319 */
4320 if (!force && wal_receiver_status_interval <= 0)
4321 return;
4322
4323 /* It's legal to not pass a recvpos */
4324 if (recvpos < last_recvpos)
4326
4328
4329 /*
4330 * If there are no outstanding transactions to flush, we can report the
4331 * latest received position. This is important for synchronous replication.
4332 */
4333 if (!have_pending_txes)
4335
4336 if (writepos < last_writepos)
4338
4339 if (flushpos < last_flushpos)
4341
4343
4344 /* if we've already reported everything we're good */
4345 if (!force &&
4350 return;
4351 send_time = now;
4352
4353 if (!reply_message)
4354 {
4356
4359 }
4360 else
4362
4364 pq_sendint64(reply_message, recvpos); /* write */
4365 pq_sendint64(reply_message, flushpos); /* flush */
4366 pq_sendint64(reply_message, writepos); /* apply */
4367 pq_sendint64(reply_message, now); /* sendTime */
4368 pq_sendbyte(reply_message, requestReply); /* replyRequested */
4369
4370 elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4371 force,
4375
4378
4379 if (recvpos > last_recvpos)
4381 if (writepos > last_writepos)
4383 if (flushpos > last_flushpos)
4385}
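The rules send_feedback() applies before building a Standby Status Update can be isolated into a small function. In this sketch, the names are hypothetical, a FeedbackState struct replaces the function's static variables, and the status interval is given in milliseconds. Reported positions never move backwards, and an unforced report is suppressed when nothing has changed and the status interval has not elapsed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* hypothetical stand-in for XLogRecPtr */
typedef int64_t Millis;         /* hypothetical millisecond timestamps */

typedef struct
{
    Lsn recv;
    Lsn write;
    Lsn flush;
} Feedback;

/* What was last reported, kept across calls. */
typedef struct
{
    Feedback last;
    Millis last_send;
} FeedbackState;

/* Returns true and fills *out if a status update should be sent now. */
static bool
prepare_feedback(FeedbackState *st, Feedback pos, bool have_pending_txes,
                 bool force, Millis now, long status_interval_ms,
                 Feedback *out)
{
    /* Reporting disabled and not forced: do nothing at all. */
    if (!force && status_interval_ms <= 0)
        return false;

    /* Never report a position older than one already reported. */
    if (pos.recv < st->last.recv)
        pos.recv = st->last.recv;
    if (!have_pending_txes)
        pos.write = pos.flush = pos.recv;
    if (pos.write < st->last.write)
        pos.write = st->last.write;
    if (pos.flush < st->last.flush)
        pos.flush = st->last.flush;

    /* Nothing new and the status interval has not elapsed: stay quiet. */
    if (!force && pos.write == st->last.write && pos.flush == st->last.flush &&
        now - st->last_send < status_interval_ms)
        return false;

    st->last_send = now;
    st->last = pos;             /* every field is >= the previous value */
    *out = pos;
    return true;
}
```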
4386
4387/*
4388 * Attempt to advance the non-removable transaction ID.
4389 *
4390 * See comments atop worker.c for details.
4391 */
4392static void
4401
4402/*
4403 * Preliminary check to determine if advancing the non-removable transaction ID
4404 * is allowed.
4405 */
4406static bool
4408{
4409 /*
4410 * It is sufficient for the main apply worker alone to manage the
4411 * subscription's non-removable transaction ID; this detects update_deleted
4412 * reliably even for table sync or parallel apply workers.
4413 */
4415 return false;
4416
4417 /* No need to advance if retaining dead tuples is not required */
4419 return false;
4420
4421 return true;
4422}
4423
4424/*
4425 * Process phase transitions during the non-removable transaction ID
4426 * advancement. See comments atop worker.c for details of the transition.
4427 */
4428static void
4454
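The phases handled by the workhorse functions below form a small state machine. This sketch models only the transitions visible in this file. next_phase() and the PhaseInputs flags are hypothetical stand-ins for the checks the real functions perform (for example, remote_txns_done stands for the FullTransactionIdPrecedesOrEquals() test). The resume phase simply stays put because the real worker restarts instead.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum
{
    PHASE_GET_CANDIDATE_XID,
    PHASE_REQUEST_PUBLISHER_STATUS,
    PHASE_WAIT_FOR_PUBLISHER_STATUS,
    PHASE_WAIT_FOR_LOCAL_FLUSH,
    PHASE_STOP_RETENTION,
    PHASE_RESUME_RETENTION
} Phase;

/* Hypothetical summary of the conditions the real workhorses evaluate. */
typedef struct
{
    bool candidate_found;       /* a new candidate xid was computed */
    bool status_received;       /* publisher status message arrived */
    bool remote_txns_done;      /* remote_wait_for <= remote_oldestxid */
    bool tables_ready;          /* all subscribed tables are READY */
    bool flushed_to_remote_lsn; /* local flush reached remote_lsn */
    bool retention_timed_out;   /* max_retention_duration exceeded */
    bool retention_stopped;     /* retention was previously stopped */
} PhaseInputs;

static Phase
next_phase(Phase p, const PhaseInputs *in)
{
    switch (p)
    {
        case PHASE_GET_CANDIDATE_XID:
            return in->candidate_found ? PHASE_REQUEST_PUBLISHER_STATUS : p;
        case PHASE_REQUEST_PUBLISHER_STATUS:
            return PHASE_WAIT_FOR_PUBLISHER_STATUS;
        case PHASE_WAIT_FOR_PUBLISHER_STATUS:
            if (!in->status_received)
                return p;
            if (in->retention_timed_out)
                return PHASE_STOP_RETENTION;
            /* Keep polling the publisher until old transactions finish. */
            return in->remote_txns_done ? PHASE_WAIT_FOR_LOCAL_FLUSH
                                        : PHASE_REQUEST_PUBLISHER_STATUS;
        case PHASE_WAIT_FOR_LOCAL_FLUSH:
            if (!in->tables_ready)
                return p;
            if (in->retention_timed_out)
                return PHASE_STOP_RETENTION;
            if (!in->flushed_to_remote_lsn)
                return p;
            if (in->retention_stopped)
                return PHASE_RESUME_RETENTION;
            return PHASE_GET_CANDIDATE_XID; /* xid advanced; new round */
        case PHASE_STOP_RETENTION:
            return PHASE_GET_CANDIDATE_XID;
        case PHASE_RESUME_RETENTION:
            return p;           /* the real worker restarts here */
    }
    return p;
}
```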
4455/*
4456 * Workhorse for the RDT_GET_CANDIDATE_XID phase.
4457 */
4458static void
4460{
4463
4464 /*
4465 * Use last_recv_time when applying changes in the loop to avoid
4466 * unnecessary system time retrieval. If last_recv_time is not available,
4467 * obtain the current timestamp.
4468 */
4469 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4470
4471 /*
4472 * Compute the candidate_xid and request the publisher status at most once
4473 * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4474 * details on how this value is dynamically adjusted. This is to avoid
4475 * using CPU and network resources without making much progress.
4476 */
4477 if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4478 rdt_data->xid_advance_interval))
4479 return;
4480
4481 /*
4482 * Immediately update the timer, even if the function returns later
4483 * without setting candidate_xid due to inactivity on the subscriber. This
4484 * avoids frequent calls to GetOldestActiveTransactionId.
4485 */
4486 rdt_data->candidate_xid_time = now;
4487
4488 /*
4489 * Consider transactions in the current database, as only dead tuples from
4490 * this database are required for conflict detection.
4491 */
4493
4494 /*
4495 * The oldest active transaction ID (oldest_running_xid) can't be behind
4496 * any of its previously computed values.
4497 */
4500
4501 /* Return if the oldest_nonremovable_xid cannot be advanced */
4504 {
4506 return;
4507 }
4508
4510
4511 rdt_data->candidate_xid = oldest_running_xid;
4513
4514 /* process the next phase */
4516}
4517
4518/*
4519 * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4520 */
4521static void
4523{
4525
4526 if (!request_message)
4527 {
4529
4532 }
4533 else
4535
4536 /*
4537 * Send the current time to update the remote walsender's latest reply
4538 * message received time.
4539 */
4542
4543 elog(DEBUG2, "sending publisher status request message");
4544
4545 /* Send a request for the publisher status */
4547 request_message->data, request_message->len);
4548
4550
4551 /*
4552 * Skip calling maybe_advance_nonremovable_xid() since further transition
4553 * is possible only once we receive the publisher status message.
4554 */
4555}
4556
4557/*
4558 * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
4559 */
4560static void
4562 bool status_received)
4563{
4564 /*
4565 * Return if we have requested but not yet received the publisher status.
4566 */
4567 if (!status_received)
4568 return;
4569
4570 /*
4571 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4572 * retaining conflict information for this worker.
4573 */
4575 {
4577 return;
4578 }
4579
4580 if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
4581 rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4582
4583 /*
4584 * Check if all remote concurrent transactions that were active at the
4585 * first status request have now completed. If completed, proceed to the
4586 * next phase; otherwise, continue checking the publisher status until
4587 * these transactions finish.
4588 *
4589 * It's possible that transactions in the commit phase during the last
4590 * cycle have now finished committing, but remote_oldestxid remains older
4591 * than remote_wait_for. This can happen if some old transaction entered
4592 * the commit phase when we requested status in this cycle. We do not
4593 * handle this case explicitly as it's rare and the benefit doesn't
4594 * justify the required complexity. Tracking would require either caching
4595 * all xids at the publisher or sending them to subscribers. The condition
4596 * will resolve naturally once the remaining transactions are finished.
4597 *
4598 * Directly advancing the non-removable transaction ID is possible if
4599 * there has been no activity on the publisher since the last advancement
4600 * cycle. However, it requires maintaining two fields, last_remote_nextxid
4601 * and last_remote_lsn, within the structure for comparison with the
4602 * current cycle's values. Considering the minimal cost of continuing in
4603 * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4604 * advance the transaction ID here.
4605 */
4606 if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
4607 rdt_data->remote_oldestxid))
4609 else
4611
4612 /* process the next phase */
4614}
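The completion test above compares 64-bit FullTransactionIds. These can be ordered with a plain integer comparison because the epoch occupies the high 32 bits. The sketch contrasts that with a wraparound-aware comparison of 32-bit xids. FullXid, full_xid_make() and the other helpers are illustrative stand-ins, not the server's definitions. xid32_precedes() also ignores the special (non-normal) xids that PostgreSQL's TransactionIdPrecedes() handles separately.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for FullTransactionId: epoch in the high 32 bits. */
typedef struct
{
    uint64_t value;
} FullXid;

static FullXid
full_xid_make(uint32_t epoch, uint32_t xid)
{
    FullXid f = {((uint64_t) epoch << 32) | xid};

    return f;
}

/* 64-bit ids do not wrap in practice, so ordering is a plain comparison. */
static bool
full_xid_precedes_or_equals(FullXid a, FullXid b)
{
    return a.value <= b.value;
}

/*
 * 32-bit xids wrap around, so "a precedes b" is decided by the sign of the
 * modular difference; this is only meaningful within 2^31 of each other.
 */
static bool
xid32_precedes(uint32_t a, uint32_t b)
{
    return (int32_t) (a - b) < 0;
}

/*
 * Every xid below remote_wait_for has finished once the publisher's oldest
 * running xid has caught up to it.
 */
static bool
remote_concurrent_txns_done(FullXid remote_wait_for, FullXid remote_oldestxid)
{
    return full_xid_precedes_or_equals(remote_wait_for, remote_oldestxid);
}
```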
4615
4616/*
4617 * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
4618 */
4619static void
4621{
4622 Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) &&
4623 TransactionIdIsValid(rdt_data->candidate_xid));
4624
4625 /*
4626 * We expect the publisher and subscriber clocks to be kept in sync by a
4627 * time sync service such as NTP. Otherwise, we could advance this worker's
4628 * oldest_nonremovable_xid prematurely, leading to the removal of rows
4629 * required to detect update_deleted reliably. This check primarily
4630 * addresses scenarios where the publisher's clock falls behind; if the
4631 * publisher's clock is ahead, subsequent transactions will naturally bear
4632 * later commit timestamps, conforming to the design outlined atop
4633 * worker.c.
4634 *
4635 * XXX Consider waiting for the publisher's clock to catch up with the
4636 * subscriber's before proceeding to the next phase.
4637 */
4638 if (TimestampDifferenceExceeds(rdt_data->reply_time,
4639 rdt_data->candidate_xid_time, 0))
4640 ereport(ERROR,
4641 errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4642 errdetail_internal("The clock on the publisher is behind that of the subscriber."));
4643
4644 /*
4645 * Do not attempt to advance the non-removable transaction ID when table
4646 * sync is in progress. During this time, changes from a single
4647 * transaction may be applied by multiple table sync workers corresponding
4648 * to the target tables. So, it's necessary for all table sync workers to
4649 * apply and flush the corresponding changes before advancing the
4650 * transaction ID, otherwise, dead tuples that are still needed for
4651 * conflict detection in table sync workers could be removed prematurely.
4652 * However, confirming the apply and flush progress across all table sync
4653 * workers is complex and not worth the effort, so we simply return if not
4654 * all tables are in the READY state.
4655 *
4656 * Advancing the transaction ID is necessary even when no tables are
4657 * currently subscribed, to avoid retaining dead tuples unnecessarily.
4658 * While it might seem safe to skip all phases and directly assign
4659 * candidate_xid to oldest_nonremovable_xid during the
4660 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
4661 * concurrently add tables to the subscription, the apply worker may not
4662 * process invalidations in time. Consequently,
4663 * HasSubscriptionTablesCached() might miss the new tables, leading to
4664 * premature advancement of oldest_nonremovable_xid.
4665 *
4666 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
4667 * invalidations are guaranteed to be processed before applying changes
4668 * from newly added tables while waiting for the local flush to reach
4669 * remote_lsn.
4670 *
4671 * Additionally, even if we check for subscription tables during
4672 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
4673 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
4674 * subscription tables at this stage to prevent unnecessary tuple
4675 * retention.
4676 */
4678 {
4680
4681 now = rdt_data->last_recv_time
4682 ? rdt_data->last_recv_time : GetCurrentTimestamp();
4683
4684 /*
4685 * Record the time spent waiting for table sync; it is needed for the
4686 * timeout check in should_stop_conflict_info_retention().
4687 */
4688 rdt_data->table_sync_wait_time =
4689 TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);
4690
4691 return;
4692 }
4693
4694 /*
4695 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4696 * retaining conflict information for this worker.
4697 */
4699 {
4701 return;
4702 }
4703
4704 /*
4705 * Update and check the remote flush position if we are applying changes
4706 * in a loop. This is done at most once per WalWriterDelay to avoid
4707 * performing costly operations in get_flush_position() too frequently
4708 * during change application.
4709 */
4710 if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
4711 TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
4712 rdt_data->last_recv_time, WalWriterDelay))
4713 {
4716 bool have_pending_txes;
4717
4718 /* Fetch the latest remote flush position */
4720
4721 if (flushpos > last_flushpos)
4723
4724 rdt_data->flushpos_update_time = rdt_data->last_recv_time;
4725 }
4726
4727 /* Return to wait for the changes to be applied */
4728 if (last_flushpos < rdt_data->remote_lsn)
4729 return;
4730
4731 /*
4732 * Reaching this point implies should_stop_conflict_info_retention()
4733 * returned false earlier, meaning that the most recent duration for
4734 * advancing the non-removable transaction ID is within the
4735 * max_retention_duration or max_retention_duration is set to 0.
4736 *
4737 * Therefore, if conflict info retention was previously stopped due to a
4738 * timeout, it is now safe to resume retention.
4739 */
4741 {
4743 return;
4744 }
4745
4746 /*
4747 * Reaching here means the remote WAL position has been received, and all
4748 * transactions up to that position on the publisher have been applied and
4749 * flushed locally. So, we can advance the non-removable transaction ID.
4750 */
4754
4755 elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
4756 LSN_FORMAT_ARGS(rdt_data->remote_lsn),
4757 rdt_data->candidate_xid);
4758
4759 /* Notify launcher to update the xmin of the conflict slot */
4761
4763
4764 /* process the next phase */
4766}
4767
4768/*
4769 * Check whether conflict information retention should be stopped due to
4770 * exceeding the maximum wait time (max_retention_duration).
4771 *
4772 * If retention should be stopped, return true. Otherwise, return false.
4773 */
4774static bool
4776{
4778
4779 Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4782
4784 return false;
4785
4786 /*
4787 * Use last_recv_time when applying changes in the loop to avoid
4788 * unnecessary system time retrieval. If last_recv_time is not available,
4789 * obtain the current timestamp.
4790 */
4791 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4792
4793 /*
4794 * Return early if the wait time has not exceeded the configured maximum
4795 * (max_retention_duration). Time spent waiting for table synchronization
4796 * is excluded from this calculation, as it occurs infrequently.
4797 */
4798 if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4800 rdt_data->table_sync_wait_time))
4801 return false;
4802
4803 return true;
4804}
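The timeout test in should_stop_conflict_info_retention() reduces to simple arithmetic. Retention stops once the time since candidate_xid_time reaches max_retention_duration plus any time spent waiting for table sync. Below is a sketch with hypothetical millisecond inputs and a hypothetical function name. It treats a max_retention_duration of 0 as unlimited, as the comments in this file describe.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef int64_t Millis;         /* hypothetical millisecond timestamps */

static bool
should_stop_retention(Millis candidate_xid_time, Millis now,
                      long max_retention_ms, long table_sync_wait_ms)
{
    /* 0 means retention is never stopped for taking too long. */
    if (max_retention_ms <= 0)
        return false;

    /* Table sync waits are excluded by extending the allowed duration. */
    return now - candidate_xid_time >= max_retention_ms + table_sync_wait_ms;
}
```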
4805
4806/*
4807 * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
4808 */
4809static void
4811{
4812 /* Stop retention if it has not been stopped yet */
4814 {
4815 /*
4816 * If the retention status cannot be updated (e.g., due to active
4817 * transaction), skip further processing to avoid inconsistent
4818 * retention behavior.
4819 */
4820 if (!update_retention_status(false))
4821 return;
4822
4826
4827 ereport(LOG,
4828 errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
4830 errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
4831 }
4832
4834
4835 /*
4836 * If retention has been stopped, reset to the initial phase to retry
4837 * resuming retention. This reset is required to recalculate the current
4838 * wait time and resume retention if the time falls within
4839 * max_retention_duration.
4840 */
4842}
4843
4844/*
4845 * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
4846 */
4847static void
4849{
4850 /* We can't resume retention without updating retention status. */
4851 if (!update_retention_status(true))
4852 return;
4853
4854 ereport(LOG,
4855 errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
4858 ? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
4859 : errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));
4860
4861 /*
4862 * Restart the worker to let the launcher initialize
4863 * oldest_nonremovable_xid at startup.
4864 *
4865 * While it's technically possible to derive this value on-the-fly using
4866 * the conflict detection slot's xmin, doing so risks a race condition:
4867 * the launcher might clean slot.xmin just after retention resumes. This
4868 * would make oldest_nonremovable_xid unreliable, especially during xid
4869 * wraparound.
4870 *
4871 * Although this could be prevented by introducing heavyweight locking, the
4872 * complexity it will bring doesn't seem worthwhile given how rarely
4873 * retention is resumed.
4874 */
4876}
4877
4878/*
4879 * Updates pg_subscription.subretentionactive to the given value within a
4880 * new transaction.
4881 *
4882 * If already inside an active transaction, skips the update and returns
4883 * false.
4884 *
4885 * Returns true if the update is successfully performed.
4886 */
4887static bool
4889{
4890 /*
4891 * Do not update the catalog during an active transaction. The transaction
4892 * may be started during change application, leading to a possible
4893 * rollback of catalog updates if the application fails subsequently.
4894 */
4895 if (IsTransactionState())
4896 return false;
4897
4899
4900 /*
4901 * Updating pg_subscription might involve TOAST table access, so ensure we
4902 * have a valid snapshot.
4903 */
4905
4906 /* Update pg_subscription.subretentionactive */
4908
4911
4912 /* Notify launcher to update the conflict slot */
4914
4916
4917 return true;
4918}
4919
4920/*
4921 * Reset all data fields of RetainDeadTuplesData except those used to
4922 * determine the timing for the next round of transaction ID advancement. We
4923 * can even use flushpos_update_time in the next round to decide whether to get
4924 * the latest flush position.
4925 */
4926static void
4928{
4930 rdt_data->remote_lsn = InvalidXLogRecPtr;
4931 rdt_data->remote_oldestxid = InvalidFullTransactionId;
4932 rdt_data->remote_nextxid = InvalidFullTransactionId;
4933 rdt_data->reply_time = 0;
4934 rdt_data->remote_wait_for = InvalidFullTransactionId;
4935 rdt_data->candidate_xid = InvalidTransactionId;
4936 rdt_data->table_sync_wait_time = 0;
4937}
4938
4939/*
4940 * Adjust the interval for advancing non-removable transaction IDs.
4941 *
4942 * If there is no activity on the node or retention has been stopped, we
4943 * progressively double the interval used to advance non-removable transaction
4944 * ID. This helps conserve CPU and network resources when there's little benefit
4945 * to frequent updates.
4946 *
4947 * The interval is capped by the lowest of the following:
4948 * - wal_receiver_status_interval (if set and retention is active),
4949 * - a default maximum of 3 minutes,
4950 * - max_retention_duration (if retention is active).
4951 *
4952 * This ensures the interval never exceeds the retention boundary, even if other
4953 * limits are higher. Once activity resumes on the node and the retention is
4954 * active, the interval is reset to the lesser of 100ms and max_retention_duration,
4955 * allowing timely advancement of non-removable transaction ID.
4956 *
4957 * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4958 * consider the other interval or a separate GUC if the need arises.
4959 */
4960static void
4962{
4963 if (rdt_data->xid_advance_interval && !new_xid_found)
4964 {
4968
4969 /*
4970 * No new transaction ID has been assigned since the last check, so
4971 * double the interval, but not beyond the maximum allowable value.
4972 */
4973 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4974 max_interval);
4975 }
4976 else if (rdt_data->xid_advance_interval &&
4978 {
4979 /*
4980 * Retention has been stopped, so double the interval, capped at a
4981 * maximum of 3 minutes. wal_receiver_status_interval is
4982 * intentionally not used as an upper bound, since the likelihood of
4983 * retention resuming is lower than that of general activity resuming.
4984 */
4985 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4987 }
4988 else
4989 {
4990 /*
4991 * A new transaction ID was found or the interval is not yet
4992 * initialized, so set the interval to the minimum value.
4993 */
4994 rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
4995 }
4996
4997 /*
4998 * Ensure the wait time remains within the maximum retention time limit
4999 * when retention is active.
5000 */
5002 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
5004}
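The backoff rules in the comment above adjust_xid_advance_interval() can be written as a small pure function. This sketch follows the documented rules rather than the elided lines of the real function. adjust_interval() and its parameters are hypothetical, and intervals are in milliseconds. MIN_INTERVAL_MS and MAX_INTERVAL_MS take the 100ms and 3-minute values stated in the comment.

```c
#include <assert.h>
#include <stdbool.h>

#define MIN_INTERVAL_MS 100             /* reset value, per the comment */
#define MAX_INTERVAL_MS (3 * 60 * 1000) /* default cap of 3 minutes */

static int
min_int(int a, int b)
{
    return a < b ? a : b;
}

static int
adjust_interval(int interval_ms, bool new_xid_found, bool retention_active,
                int status_interval_ms, int max_retention_ms)
{
    if (interval_ms > 0 && !new_xid_found)
    {
        /*
         * No activity: double, capped at 3 minutes and, while retention is
         * active, at wal_receiver_status_interval if that is set.
         */
        int cap = MAX_INTERVAL_MS;

        if (retention_active && status_interval_ms > 0)
            cap = min_int(cap, status_interval_ms);
        interval_ms = min_int(interval_ms * 2, cap);
    }
    else if (interval_ms > 0 && !retention_active)
    {
        /* Retention stopped: keep backing off, capped at 3 minutes. */
        interval_ms = min_int(interval_ms * 2, MAX_INTERVAL_MS);
    }
    else
    {
        /* New xid found, or first call: restart from the minimum. */
        interval_ms = MIN_INTERVAL_MS;
    }

    /* Never wait longer than the retention limit while retention is active. */
    if (retention_active && max_retention_ms > 0)
        interval_ms = min_int(interval_ms, max_retention_ms);

    return interval_ms;
}
```

Doubling keeps the number of fruitless checks logarithmic in the length of an idle period, while the reset to the minimum restores quick advancement as soon as new transactions appear.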
5005
5006/*
5007 * Exit routine for apply workers due to subscription parameter changes.
5008 */
5009static void
5011{
5013 {
5014 /*
5015 * Don't stop the parallel apply worker as the leader will detect the
5016 * subscription parameter change and restart logical replication later
5017 * anyway. This also prevents the leader from reporting errors when
5018 * trying to communicate with a stopped parallel apply worker, which
5019 * would accidentally disable subscriptions if disable_on_error was
5020 * set.
5021 */
5022 return;
5023 }
5024
5025 /*
5026 * Reset the last-start time for this apply worker so that the launcher
5027 * will restart it without waiting for wal_retrieve_retry_interval if the
5028 * subscription is still active, and so that we won't leak that hash table
5029 * entry if it isn't.
5030 */
5033
5034 proc_exit(0);
5035}
5036
5037/*
5038 * Reread subscription info if needed.
5039 *
5040 * For significant changes, we react by exiting the current process; a new
5041 * one will be launched afterwards if needed.
5042 */
5043void
5045{
5048 bool started_tx = false;
5049
5050 /* When cache state is valid there is nothing to do here. */
5052 return;
5053
5054 /* This function might be called inside or outside of transaction. */
5055 if (!IsTransactionState())
5056 {
5058 started_tx = true;
5059 }
5060
5061 /* Ensure allocations in permanent context. */
5063
5065
5066 /*
5067 * Exit if the subscription was removed. This normally should not happen
5068 * as the worker gets killed during DROP SUBSCRIPTION.
5069 */
5070 if (!newsub)
5071 {
5072 ereport(LOG,
5073 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5074 MySubscription->name)));
5075
5076 /* Ensure we remove no-longer-useful entry for worker's start time */
5079
5080 proc_exit(0);
5081 }
5082
5083 /* Exit if the subscription was disabled. */
5084 if (!newsub->enabled)
5085 {
5086 ereport(LOG,
5087 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5088 MySubscription->name)));
5089
5091 }
5092
5093 /* !slotname should never happen when enabled is true. */
5094 Assert(newsub->slotname);
5095
5096 /* two-phase cannot be altered while the worker is running */
5097 Assert(newsub->twophasestate == MySubscription->twophasestate);
5098
5099 /*
5100 * Exit if any parameter that affects the remote connection was changed.
5101 * The launcher will start a new worker but note that the parallel apply
5102 * worker won't restart if the streaming option's value is changed from
5103 * 'parallel' to any other value or the server decides not to stream the
5104 * in-progress transaction.
5105 */
5106 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
5107 strcmp(newsub->name, MySubscription->name) != 0 ||
5108 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
5109 newsub->binary != MySubscription->binary ||
5110 newsub->stream != MySubscription->stream ||
5111 newsub->passwordrequired != MySubscription->passwordrequired ||
5112 strcmp(newsub->origin, MySubscription->origin) != 0 ||
5113 newsub->owner != MySubscription->owner ||
5114 !equal(newsub->publications, MySubscription->publications))
5115 {
5117 ereport(LOG,
5118 (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5119 MySubscription->name)));
5120 else
5121 ereport(LOG,
5122 (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
5123 MySubscription->name)));
5124
5126 }
5127
5128 /*
5129 * Exit if the subscription owner's superuser privileges have been
5130 * revoked.
5131 */
5132 if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
5133 {
5135 ereport(LOG,
5136 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5138 else
5139 ereport(LOG,
5140 errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5142
5144 }
5145
5146 /* Also check for other changes that should never happen. */
5147 if (newsub->dbid != MySubscription->dbid)
5148 {
5149 elog(ERROR, "subscription %u changed unexpectedly",
5151 }
5152
5153 /* Clean old subscription info and switch to new one. */
5156
5158
5159 /* Change synchronous commit according to the user's wishes */
5160 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5162
5163 /* Change wal_receiver_timeout according to the user's wishes */
5165
5166 if (started_tx)
5168
5169 MySubscriptionValid = true;
5170}
5171
5172/*
5173 * Change wal_receiver_timeout to MySubscription->walrcvtimeout.
5174 */
5175static void
5177{
5178 bool parsed;
5179 int val;
5181
5182 /*
5183 * Set the wal_receiver_timeout GUC to MySubscription->walrcvtimeout,
5184 * which comes from the subscription's wal_receiver_timeout option. If the
5185 * value is -1, reset the GUC to its default, meaning it will inherit from
5186 * the server config, command line, or role/database settings.
5187 */
5189 if (parsed && val == -1)
5190 SetConfigOption("wal_receiver_timeout", NULL,
5192 else
5193 SetConfigOption("wal_receiver_timeout", MySubscription->walrcvtimeout,
5195
5196 /*
5197 * Log the wal_receiver_timeout setting (in milliseconds) as a debug
5198 * message when it changes, to verify it was set correctly.
5199 */
5201 elog(DEBUG1, "logical replication worker for subscription \"%s\" wal_receiver_timeout: %d ms",
5203}
5204
5205/*
5206 * Callback from subscription syscache invalidation. Also needed for server or
5207 * user mapping invalidation, which can change the connection information for
5208 * subscriptions that connect using a server object.
5209 */
5210static void
5215
5216/*
5217 * subxact_info_write
5218 * Store information about subxacts for a toplevel transaction.
5219 *
5220 * For each subxact we store the offset of its first change in the main file.
5221 * The file is always over-written as a whole.
5222 *
5223 * XXX We should only store subxacts that were not aborted yet.
5224 */
5225static void
5227{
5228 char path[MAXPGPATH];
5229 Size len;
5230 BufFile *fd;
5231
5233
5234 /* construct the subxact filename */
5235 subxact_filename(path, subid, xid);
5236
5237 /* Delete the subxacts file, if it exists. */
5238 if (subxact_data.nsubxacts == 0)
5239 {
5242
5243 return;
5244 }
5245
5246 /*
5247 * Create the subxact file if it does not already exist, otherwise open the
5248 * existing file.
5249 */
5251 true);
5252 if (fd == NULL)
5254
5256
5257 /* Write the subxact count and subxact info */
5260
5262
5263 /* free the memory allocated for subxact info */
5265}
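The layout written above (a count, then the subxact array, with the whole file rewritten each time) can be sketched with an in-memory buffer standing in for the BufFile. `SubXactRecord`, its field types, and both helpers are illustrative assumptions; the actual write calls sit on lines missing from this listing. In this sketch the reader trusts the count header, so a shorter array rewritten from offset 0 is read correctly even if stale bytes follow it. Note that the listing deletes the file instead of writing a zero count.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Illustrative per-subxact record; the field types are assumptions. */
typedef struct
{
    uint32_t xid;      /* subtransaction XID */
    int      fileno;   /* segment of the changes file holding its first change */
    int64_t  offset;   /* offset of that first change within the segment */
} SubXactRecord;

/*
 * Serialize the whole subxact array: a count header followed by the raw
 * records. Returns the number of bytes used, or 0 if 'buf' is too small.
 */
static size_t
serialize_subxacts(unsigned char *buf, size_t bufsize,
                   const SubXactRecord *subxacts, uint32_t nsubxacts)
{
    size_t len = sizeof(nsubxacts) + (size_t) nsubxacts * sizeof(SubXactRecord);

    assert(subxacts != NULL || nsubxacts == 0);
    if (len > bufsize)
        return 0;
    memcpy(buf, &nsubxacts, sizeof(nsubxacts));
    if (nsubxacts > 0)
        memcpy(buf + sizeof(nsubxacts), subxacts,
               (size_t) nsubxacts * sizeof(SubXactRecord));
    return len;
}

/*
 * Inverse of serialize_subxacts(): read the count, then that many records.
 * Returns false if the stored count exceeds the caller's capacity 'max'.
 */
static bool
deserialize_subxacts(const unsigned char *buf, SubXactRecord *out,
                     uint32_t max, uint32_t *nsubxacts)
{
    memcpy(nsubxacts, buf, sizeof(*nsubxacts));
    if (*nsubxacts > max)
        return false;
    if (*nsubxacts > 0)
        memcpy(out, buf + sizeof(*nsubxacts),
               (size_t) *nsubxacts * sizeof(SubXactRecord));
    return true;
}
```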
5266
5267/*
5268 * subxact_info_read
5269 * Restore information about subxacts of a streamed transaction.
5270 *
5271 * Read information about subxacts into the structure subxact_data that can be
5272 * used later.
5273 */
5274static void
5276{
5277 char path[MAXPGPATH];
5278 Size len;
5279 BufFile *fd;
5281
5285
5286 /*
5287 * If the subxact file doesn't exist, that means we don't have any subxact
5288 * info.
5289 */
5290 subxact_filename(path, subid, xid);
5292 true);
5293 if (fd == NULL)
5294 return;
5295
5296 /* read number of subxact items */
5298
5300
5301 /* we keep the maximum as a power of 2 */
5303
5304 /*
5305 * Allocate subxact information in the logical streaming context. We need
5306 * this information for the whole stream so that we can add subtransaction
5307 * info to it. On stream stop we will flush this information to the subxact
5308 * file and reset the logical streaming context.
5309 */
5314
5315 if (len > 0)
5317
5319}
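Keeping the maximum as a power of 2 fits the doubling growth used by subxact_info_add. A minimal sketch of the rounding, assuming a 32-bit count; `capacity_pow2()` is a hypothetical helper, and the listing's own computation is on a line not shown here.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Smallest power of two that is >= n, treating n == 0 as 1.
 * Precondition: n <= 2^31, otherwise the shift would overflow.
 */
static uint32_t
capacity_pow2(uint32_t n)
{
    uint32_t p = 1;

    assert(n <= (UINT32_C(1) << 31));
    while (p < n)
        p <<= 1;
    return p;
}
```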
5320
5321/*
5322 * subxact_info_add
5323 * Add information about a subxact (offset in the main file).
5324 */
5325static void
5327{
5328 SubXactInfo *subxacts = subxact_data.subxacts;
5329 int64 i;
5330
5331 /* We must have a valid toplevel stream xid and a stream fd. */
5333 Assert(stream_fd != NULL);
5334
5335 /*
5336 * If the XID matches the toplevel transaction, we don't want to add it.
5337 */
5338 if (stream_xid == xid)
5339 return;
5340
5341 /*
5342 * In most cases we're checking the same subxact we saw in the last call,
5343 * so ignore it: its first change, and thus its offset, was already recorded.
5344 */
5345 if (subxact_data.subxact_last == xid)
5346 return;
5347
5348 /* OK, remember we're processing this XID. */
5350
5351 /*
5352 * Check if the transaction is already present in the array of subxacts. We
5353 * intentionally scan the array from the tail, because we're likely adding
5354 * a change for the most recent subtransactions.
5355 *
5356 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5357 * would allow us to use binary search here.
5358 */
5359 for (i = subxact_data.nsubxacts; i > 0; i--)
5360 {
5361 /* found, so we're done */
5362 if (subxacts[i - 1].xid == xid)
5363 return;
5364 }
5365
5366 /* This is a new subxact, so we need to add it to the array. */
5367 if (subxact_data.nsubxacts == 0)
5368 {
5370
5372
5373 /*
5374 * Allocate this memory for subxacts in per-stream context, see
5375 * subxact_info_read.
5376 */
5380 }
5382 {
5384 subxacts = repalloc_array(subxacts, SubXactInfo,
5386 }
5387
5388 subxacts[subxact_data.nsubxacts].xid = xid;
5389
5390 /*
5391 * Get the current offset of the stream file and store it as offset of
5392 * this subxact.
5393 */
5395 &subxacts[subxact_data.nsubxacts].fileno,
5396 &subxacts[subxact_data.nsubxacts].offset);
5397
5399 subxact_data.subxacts = subxacts;
5400}
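The checks above (toplevel XID, last-seen XID, tail-first scan) plus doubling growth can be exercised in isolation. This is a simplified sketch, not the function itself: plain `realloc()` replaces palloc/repalloc in the streaming memory context, the stream file offset is passed in instead of being read from the BufFile, and the initial capacity and all names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

#define SKETCH_INITIAL_SUBXACTS 4   /* hypothetical; kept small to show growth */

typedef struct
{
    uint32_t xid;
    int64_t  offset;   /* offset of the subxact's first change */
} SubXactEntry;

typedef struct
{
    uint32_t      toplevel_xid;   /* XID of the streamed toplevel transaction */
    uint32_t      last_xid;       /* last XID passed in; 0 means none yet */
    uint32_t      nsubxacts;
    uint32_t      nsubxacts_max;
    SubXactEntry *subxacts;
} SubXactTracker;

/*
 * Record 'xid' with the current write offset if it is a subxact not seen
 * before. Returns 1 if added, 0 if ignored, -1 on allocation failure.
 */
static int
tracker_add(SubXactTracker *t, uint32_t xid, int64_t current_offset)
{
    /* Changes of the toplevel transaction itself are not subxacts. */
    if (xid == t->toplevel_xid)
        return 0;

    /* Fast path: same subxact as the previous change. */
    if (xid == t->last_xid)
        return 0;
    t->last_xid = xid;

    /* Scan from the tail: recent subxacts are the likeliest matches. */
    for (uint32_t i = t->nsubxacts; i > 0; i--)
    {
        if (t->subxacts[i - 1].xid == xid)
            return 0;
    }

    /* New subxact: grow by doubling when the array is full. */
    if (t->nsubxacts == t->nsubxacts_max)
    {
        uint32_t      newmax = t->nsubxacts_max ? t->nsubxacts_max * 2
                                                : SKETCH_INITIAL_SUBXACTS;
        SubXactEntry *grown = realloc(t->subxacts, newmax * sizeof(SubXactEntry));

        if (grown == NULL)
        {
            t->last_xid = 0;    /* not recorded, so don't fast-path it next time */
            return -1;
        }
        t->subxacts = grown;
        t->nsubxacts_max = newmax;
    }

    assert(t->nsubxacts < t->nsubxacts_max);
    t->subxacts[t->nsubxacts].xid = xid;
    t->subxacts[t->nsubxacts].offset = current_offset;
    t->nsubxacts++;
    return 1;
}
```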
5401
5402/* format filename for file containing the info about subxacts */
5403static inline void
5404subxact_filename(char *path, Oid subid, TransactionId xid)
5405{
5406 snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5407}
5408
5409/* format filename for file containing serialized changes */
5410static inline void
5411changes_filename(char *path, Oid subid, TransactionId xid)
5412{
5413 snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5414}
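Because both names embed the subscription OID and the toplevel XID, two workers that stream the same remote XID for different subscriptions never share a file. A small standalone check of the two formats; `unsigned int` stands in for `Oid` and `TransactionId` (both 32-bit unsigned in PostgreSQL), and `SKETCH_MAXPATH` is a hypothetical stand-in for MAXPGPATH.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define SKETCH_MAXPATH 1024   /* stand-in for MAXPGPATH */

/* Same format as subxact_filename() above. */
static void
sketch_subxact_filename(char *path, unsigned int subid, unsigned int xid)
{
    snprintf(path, SKETCH_MAXPATH, "%u-%u.subxacts", subid, xid);
}

/* Same format as changes_filename() above. */
static void
sketch_changes_filename(char *path, unsigned int subid, unsigned int xid)
{
    snprintf(path, SKETCH_MAXPATH, "%u-%u.changes", subid, xid);
}
```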
5415
5416/*
5417 * stream_cleanup_files
5418 * Cleanup files for a subscription / toplevel transaction.
5419 *
5420 * Remove files with serialized changes and subxact info for a particular
5421 * toplevel transaction. Each subscription has a separate set of files
5422 * for any toplevel transaction.
5423 */
5424void
5426{
5427 char path[MAXPGPATH];
5428
5429 /* Delete the changes file. */
5430 changes_filename(path, subid, xid);
5432
5433 /* Delete the subxact file, if it exists. */
5434 subxact_filename(path, subid, xid);
5436}
5437
5438/*
5439 * stream_open_file
5440 * Open a file that we'll use to serialize changes for a toplevel
5441 * transaction.
5442 *
5443 * Open a file for streamed changes from a toplevel transaction identified
5444 * by stream_xid (global variable). If it's the first chunk of streamed
5445 * changes for this transaction, create the buffile, otherwise open the
5446 * previously created file.
5447 */
5448static void
5450{
5451 char path[MAXPGPATH];
5453
5454 Assert(OidIsValid(subid));
5456 Assert(stream_fd == NULL);
5457
5458
5459 changes_filename(path, subid, xid);
5460 elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5461
5462 /*
5463 * Create/open the buffiles under the logical streaming context so that we
5464 * have those files until stream stop.
5465 */
5467
5468 /*
5469 * If this is the first streamed segment, create the changes file.
5470 * Otherwise, just open the file for writing, in append mode.
5471 */
5472 if (first_segment)
5474 path);
5475 else
5476 {
5477 /*
5478 * Open the file and seek to the end of the file because we always
5479 * append to the changes file.
5480 */
5482 path, O_RDWR, false);
5484 }
5485
5487}
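The create-versus-append choice above maps onto ordinary stdio as sketched below. This is an analogy only: BufFileCreateFileSet/BufFileOpenFileSet manage segmented files inside a shared FileSet, whereas the sketch uses a single plain file, and `sketch_open_changes_file()` is hypothetical. Opening with "r+b" and seeking to the end mirrors the read-write-plus-seek approach in the listing rather than using append mode.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/*
 * First segment: create (or truncate) the file. Later segments: reopen it
 * read-write and position at EOF so new changes follow the existing ones.
 */
static FILE *
sketch_open_changes_file(const char *path, bool first_segment)
{
    FILE *fp;

    assert(path != NULL);
    if (first_segment)
        return fopen(path, "w+b");

    fp = fopen(path, "r+b");
    if (fp != NULL && fseek(fp, 0, SEEK_END) != 0)
    {
        fclose(fp);
        fp = NULL;
    }
    return fp;
}
```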
5488
5489/*
5490 * stream_close_file
5491 * Close the currently open file with streamed changes.
5492 */
5493static void
5495{
5496 Assert(stream_fd != NULL);
5497
5499
5500 stream_fd = NULL;
5501}
5502
5503/*
5504 * stream_write_change
5505 * Serialize a change to a file for the current toplevel transaction.
5506 *
5507 * The change is serialized in a simple format: the length (which does not
5508 * count the length field itself), the action code (identifying the message
5509 * type) and the message contents (without the subxact TransactionId value).
5510 */
5511static void
5513{
5514 int len;
5515
5516 Assert(stream_fd != NULL);
5517
5518 /* total on-disk size, including the action type character */
5519 len = (s->len - s->cursor) + sizeof(char);
5520
5521 /* first write the size */
5522 BufFileWrite(stream_fd, &len, sizeof(len));
5523
5524 /* then the action */
5525 BufFileWrite(stream_fd, &action, sizeof(action));
5526
5527 /* and finally the remaining part of the buffer (after the XID) */
5528 len = (s->len - s->cursor);
5529
5531}
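Round-tripping one record in memory makes the length convention concrete: the stored length covers the action byte plus the payload, but not the length field itself. A minimal sketch under that reading; the buffer-based helpers are hypothetical and stand in for `BufFileWrite()` on `stream_fd`.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Append one change record at offset 'pos':
 *   int  len     bytes that follow (action byte + payload)
 *   char action  message type
 *   ...          remaining message bytes
 * Returns the offset just past the record, or 0 if the buffer is too small.
 */
static size_t
sketch_write_change(unsigned char *buf, size_t bufsize, size_t pos,
                    char action, const char *payload, int payload_len)
{
    int    len = payload_len + (int) sizeof(char);
    size_t need = sizeof(len) + (size_t) len;

    assert(payload_len >= 0);
    if (pos + need > bufsize)
        return 0;
    memcpy(buf + pos, &len, sizeof(len));
    pos += sizeof(len);
    memcpy(buf + pos, &action, sizeof(action));
    pos += sizeof(action);
    memcpy(buf + pos, payload, (size_t) payload_len);
    return pos + (size_t) payload_len;
}

/* Decode the record at 'pos'; returns the offset of the next record. */
static size_t
sketch_read_change(const unsigned char *buf, size_t pos, char *action,
                   const unsigned char **payload, int *payload_len)
{
    int len;

    memcpy(&len, buf + pos, sizeof(len));
    pos += sizeof(len);
    *action = (char) buf[pos];
    *payload = buf + pos + 1;
    *payload_len = len - 1;
    return pos + (size_t) len;
}
```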
5532
5533/*
5534 * stream_open_and_write_change
5535 * Serialize a message to a file for the given transaction.
5536 *
5537 * This function is similar to stream_write_change except that it will open the
5538 * target file if it is not already open before writing the message, and close
5539 * the file at the end.
5540 */
5541static void
5543{
5545
5546 if (!stream_fd)
5547 stream_start_internal(xid, false);
5548
5549 stream_write_change(action, s);
5551}
5552
5553/*
5554 * Sets streaming options including replication slot name and origin start
5555 * position. Workers need these options for logical replication.
5556 */
5557void
5559 char *slotname,
5561{
5562 int server_version;
5563
5564 options->logical = true;
5565 options->startpoint = *origin_startpos;
5566 options->slotname = slotname;
5567
5569 options->proto.logical.proto_version =
5574
5575 options->proto.logical.publication_names = MySubscription->publications;
5576 options->proto.logical.binary = MySubscription->binary;
5577
5578 /*
5579 * Assign the appropriate option value for streaming option according to
5580 * the 'streaming' mode and the publisher's ability to support that mode.
5581 */
5582 if (server_version >= 160000 &&
5584 {
5585 options->proto.logical.streaming_str = "parallel";
5587 }
5588 else if (server_version >= 140000 &&
5590 {
5591 options->proto.logical.streaming_str = "on";
5593 }
5594 else
5595 {
5596 options->proto.logical.streaming_str = NULL;
5598 }
5599
5600 options->proto.logical.twophase = false;
5601 options->proto.logical.origin = pstrdup(MySubscription->origin);
5602}
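The version-dependent choice of the `streaming` protocol option can be condensed into a pure function. The comparison against the subscription's mode and the assignments that accompany each branch are on lines missing from this listing; the sketch assumes they test for the "parallel" mode in the first branch, for "any mode other than off" in the second, and record whether parallel apply is in use. `SketchStreamMode` and `choose_streaming_option()` are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the subscription's 'streaming' setting. */
typedef enum
{
    SKETCH_STREAM_OFF,
    SKETCH_STREAM_ON,
    SKETCH_STREAM_PARALLEL
} SketchStreamMode;

/*
 * Return the protocol value to request, or NULL to not request streaming.
 * *parallel_apply reports whether changes may go to parallel apply workers.
 */
static const char *
choose_streaming_option(int server_version, SketchStreamMode mode,
                        bool *parallel_apply)
{
    if (server_version >= 160000 && mode == SKETCH_STREAM_PARALLEL)
    {
        *parallel_apply = true;
        return "parallel";
    }
    if (server_version >= 140000 && mode != SKETCH_STREAM_OFF)
    {
        *parallel_apply = false;   /* older publisher: fall back to "on" */
        return "on";
    }
    *parallel_apply = false;
    return NULL;
}
```

Under these assumptions, a "parallel" subscription connected to a version 14 or 15 publisher still streams, but every streamed transaction is handled by the leader.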
5603
5604/*
5605 * Clean up the memory for subxacts and reset the related variables.
5606 */
5607static inline void
5618
5619/*
5620 * Common function to run the apply loop with error handling. Disable the
5621 * subscription, if necessary.
5622 *
5623 * Note that we don't handle FATAL errors, which are probably caused by
5624 * system resource errors and are not repeatable.
5625 */
5626void
5628{
5629 PG_TRY();
5630 {
5632 }
5633 PG_CATCH();
5634 {
5635 /*
5636 * Reset the origin state to prevent the advancement of origin
5637 * progress if we fail to apply. Otherwise, this will result in
5638 * transaction loss as that transaction won't be sent again by the
5639 * server.
5640 */
5642
5645 else
5646 {
5647 /*
5648 * Report the worker failed while applying changes. Abort the
5649 * current transaction so that the stats message is sent in an
5650 * idle state.
5651 */
5654
5655 PG_RE_THROW();
5656 }
5657 }
5658 PG_END_TRY();
5659}
5660
5661/*
5662 * Runs the leader apply worker.
5663 *
5664 * It sets up replication origin, streaming options and then starts streaming.
5665 */
5666static void
5668{
5669 char originname[NAMEDATALEN];
5671 char *slotname = NULL;
5674 TimeLineID startpointTLI;
5675 char *err;
5676 bool must_use_password;
5677
5678 slotname = MySubscription->slotname;
5679
5680 /*
5681 * This shouldn't happen if the subscription is enabled, but guard against
5682 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5683 * slot is NULL.)
5684 */
5685 if (!slotname)
5686 ereport(ERROR,
5688 errmsg("subscription has no replication slot set")));
5689
5690 /* Setup replication origin tracking. */
5692 originname, sizeof(originname));
5695 if (!OidIsValid(originid))
5701
5702 /* Is the use of a password mandatory? */
5705
5707 true, must_use_password,
5709
5711 ereport(ERROR,
5713 errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5714 MySubscription->name, err)));
5715
5716 /*
5717 * We don't really use the output of identify_system for anything, but it
5718 * does some initialization on the upstream, so let's still call it.
5719 */
5721
5723
5725
5726 /*
5727 * Even when the two_phase mode is requested by the user, it remains as
5728 * the tri-state PENDING until all tablesyncs have reached READY state.
5729 * Only then can it become ENABLED.
5730 *
5731 * Note: If the subscription has no tables then leave the state as
5732 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5733 * work.
5734 */
5737 {
5738 /* Start streaming with two_phase enabled */
5739 options.proto.logical.twophase = true;
5741
5743
5744 /*
5745 * Updating pg_subscription might involve TOAST table access, so
5746 * ensure we have a valid snapshot.
5747 */
5749
5754 }
5755 else
5756 {
5758 }
5759
5761 (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5766 "?")));
5767
5768 /* Run the main loop. */
5770}
5771
5772/*
5773 * Common initialization for leader apply worker, parallel apply worker,
5774 * tablesync worker and sequencesync worker.
5775 *
5776 * Initialize the database connection, in-memory subscription and necessary
5777 * config options.
5778 */
5779void
5781{
5783
5784 /* Run as replica session replication role. */
5785 SetConfigOption("session_replication_role", "replica",
5787
5788 /* Connect to our database. */
5791 0);
5792
5793 /*
5794 * Set always-secure search path, so malicious users can't redirect user
5795 * code (e.g. pg_index.indexprs).
5796 */
5797 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5798
5799 /* Load the subscription into persistent memory context. */
5801 "ApplyContext",
5805
5806 /*
5807 * Lock the subscription to prevent it from being concurrently dropped,
5808 * then re-verify its existence. After the initialization, the worker will
5809 * be terminated gracefully if the subscription is dropped.
5810 */
5814 if (!MySubscription)
5815 {
5816 ereport(LOG,
5817 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5819
5820 /* Ensure we remove no-longer-useful entry for worker's start time */
5823
5824 proc_exit(0);
5825 }
5826
5827 MySubscriptionValid = true;
5829
5830 if (!MySubscription->enabled)
5831 {
5832 ereport(LOG,
5833 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5834 MySubscription->name)));
5835
5837 }
5838
5839 /*
5840 * Restart the worker if retain_dead_tuples was enabled during startup.
5841 *
5842 * At this point, the replication slot used for conflict detection might
5843 * not exist yet, or could be dropped soon if the launcher perceives
5844 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5845 * oldest_nonremovable_xid when the slot is absent or at risk of being
5846 * dropped, a restart is initiated.
5847 *
5848 * The oldest_nonremovable_xid should be initialized only when the
5849 * subscription's retention is active before launching the worker. See
5850 * logicalrep_worker_launch.
5851 */
5852 if (am_leader_apply_worker() &&
5856 {
5857 ereport(LOG,
5858 errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5859 MySubscription->name, "retain_dead_tuples"));
5860
5862 }
5863
5864 /* Set up synchronous commit according to the user's wishes */
5865 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5867
5868 /* Change wal_receiver_timeout according to the user's wishes */
5870
5871 /*
5872 * Keep us informed about subscription or role changes. Note that the
5873 * role's superuser privilege can be revoked.
5874 */
5877 (Datum) 0);
5878 /* Changes to foreign servers may affect subscriptions using SERVER. */
5881 (Datum) 0);
5882 /* Changes to user mappings may affect subscriptions using SERVER. */
5885 (Datum) 0);
5886
5887 /*
5888 * Changes to FDW connection_function may affect subscriptions using
5889 * SERVER.
5890 */
5893 (Datum) 0);
5894
5897 (Datum) 0);
5898
5899 if (am_tablesync_worker())
5900 ereport(LOG,
5901 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5904 else if (am_sequencesync_worker())
5905 ereport(LOG,
5906 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
5908 else
5909 ereport(LOG,
5910 errmsg("logical replication apply worker for subscription \"%s\" has started",
5912
5914
5915 /*
5916 * Register a callback to reset the origin state before aborting any
5917 * pending transaction during shutdown (see ShutdownPostgres()). This will
5918 * avoid origin advancement for an incomplete transaction which could
5919 * otherwise lead to its loss as such a transaction won't be sent by the
5920 * server again.
5921 *
5922 * Note that even a LOG or DEBUG statement placed after setting the origin
5923 * state may process a shutdown signal before committing the current apply
5924 * operation. So, it is important to register such a callback here.
5925 *
5926 * Register this callback here to ensure that all types of logical
5927 * replication workers that set up origins and apply remote transactions
5928 * are protected.
5929 */
5931}
5932
5933/*
5934 * Callback on exit to clear transaction-level replication origin state.
5935 */
5936static void
5938{
5940}
5941
5942/*
5943 * Common function to set up the leader apply, tablesync and sequencesync workers.
5944 */
5945void
5947{
5948 /* Attach to slot */
5950
5952
5953 /* Set up signal handling */
5956
5957 /*
5958 * We don't currently need any ResourceOwner in a walreceiver process, but
5959 * if we did, we could call CreateAuxProcessResourceOwner here.
5960 */
5961
5962 /* Initialise stats to a sanish value */
5965
5966 /* Load the libpq-specific functions */
5967 load_file("libpqwalreceiver", false);
5968
5970
5971 /* Connect to the origin and start the replication. */
5972 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5974
5975 /*
5976 * Set up a syscache callback so that we know when something changes in
5977 * the subscription relation state.
5978 */
5981 (Datum) 0);
5982}
5983
5984/* Logical Replication Apply worker entry point */
5985void
5987{
5989
5991
5993
5995
5997
5998 proc_exit(0);
5999}
6000
6001/*
6002 * After error recovery, disable the subscription in a new transaction
6003 * and exit cleanly.
6004 */
6005void
6007{
6008 /*
6009 * Emit the error message, and recover from the error state to an idle
6010 * state
6011 */
6013
6017
6019
6020 /*
6021 * Report the worker failed during sequence synchronization, table
6022 * synchronization, or apply.
6023 */
6025
6026 /* Disable the subscription */
6028
6029 /*
6030 * Updating pg_subscription might involve TOAST table access, so ensure we
6031 * have a valid snapshot.
6032 */
6034
6038
6039 /* Ensure we remove no-longer-useful entry for worker's start time */
6042
6043 /* Notify that the subscription has been disabled, and exit */
6044 ereport(LOG,
6045 errmsg("subscription \"%s\" has been disabled because of an error",
6047
6048 /*
6049 * Skip the track_commit_timestamp check when disabling the worker due to
6050 * an error, as verifying commit timestamps is unnecessary in this
6051 * context.
6052 */
6056
6057 proc_exit(0);
6058}
6059
6060/*
6061 * Is current process a logical replication worker?
6062 */
6063bool
6065{
6066 return MyLogicalRepWorker != NULL;
6067}
6068
6069/*
6070 * Is current process a logical replication parallel apply worker?
6071 */
6072bool
6077
6078/*
6079 * Start skipping changes of the transaction if the given LSN matches the
6080 * LSN specified by subscription's skiplsn.
6081 */
6082static void
6084{
6088
6089 /*
6090 * Quick return if it's not requested to skip this transaction. This
6091 * function is called for every remote transaction and we assume that
6092 * skipping the transaction is not used often.
6093 */
6095 MySubscription->skiplsn != finish_lsn))
6096 return;
6097
6098 /* Start skipping all changes of this transaction */
6099 skip_xact_finish_lsn = finish_lsn;
6100
6101 ereport(LOG,
6102 errmsg("logical replication starts skipping transaction at LSN %X/%08X",
6104}
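The early-return test and the `%X/%08X` notation used in these log messages can be sketched as follows. Part of the condition is on a line missing from this listing; the sketch assumes it means "a skip LSN is set and equals finish_lsn", with 0 as the invalid LSN (as InvalidXLogRecPtr is). `SketchLSN` and both helpers are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t SketchLSN;                  /* stand-in for XLogRecPtr */
#define SKETCH_INVALID_LSN ((SketchLSN) 0)   /* like InvalidXLogRecPtr */

/* Skip only when a skip LSN is set and it equals the finish LSN. */
static bool
should_start_skipping(SketchLSN skiplsn, SketchLSN finish_lsn)
{
    return skiplsn != SKETCH_INVALID_LSN && skiplsn == finish_lsn;
}

/* High 32 bits in hex, '/', low 32 bits as exactly 8 hex digits. */
static void
format_lsn(char *buf, size_t bufsize, SketchLSN lsn)
{
    snprintf(buf, bufsize, "%X/%08X",
             (unsigned int) (lsn >> 32), (unsigned int) (lsn & 0xFFFFFFFFu));
}
```

The explicit invalid-LSN check matters: without it, an unset skip LSN (0) would compare equal to an invalid finish LSN.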
6105
6106/*
6107 * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
6108 */
6109static void
6111{
6112 if (!is_skipping_changes())
6113 return;
6114
6115 ereport(LOG,
6116 errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6118
6119 /* Stop skipping changes */
6121}
6122
6123/*
6124 * Clear subskiplsn of pg_subscription catalog.
6125 *
6126 * finish_lsn is the transaction's finish LSN that is used to check if the
6127 * subskiplsn matches it. If not matched, we raise a warning when clearing the
6128 * subskiplsn to inform users of cases where, e.g., the user mistakenly
6129 * specified the wrong subskiplsn.
6130 */
6131static void
6133{
6134 Relation rel;
6136 HeapTuple tup;
6138 bool started_tx = false;
6139
6141 return;
6142
6143 if (!IsTransactionState())
6144 {
6146 started_tx = true;
6147 }
6148
6149 /*
6150 * Updating pg_subscription might involve TOAST table access, so ensure we
6151 * have a valid snapshot.
6152 */
6154
6155 /*
6156 * Protect subskiplsn of pg_subscription from being concurrently updated
6157 * while clearing it.
6158 */
6161
6163
6164 /* Fetch the existing tuple. */
6167
6168 if (!HeapTupleIsValid(tup))
6169 elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6170
6172
6173 /*
6174 * Clear the subskiplsn. If the user has already changed subskiplsn before
6175 * clearing it, we don't update the catalog and the replication origin
6176 * state won't get advanced. So in the worst case, if the server crashes
6177 * before sending an acknowledgment of the flush position, the transaction
6178 * will be sent again and the user needs to set subskiplsn again. We can
6179 * reduce the possibility by logging a replication origin WAL record to
6180 * advance the origin LSN instead but there is no way to advance the
6181 * origin timestamp and it doesn't seem to be worth doing anything about
6182 * it since it's a very rare case.
6183 */
6184 if (subform->subskiplsn == myskiplsn)
6185 {
6186 bool nulls[Natts_pg_subscription];
6189
6190 memset(values, 0, sizeof(values));
6191 memset(nulls, false, sizeof(nulls));
6192 memset(replaces, false, sizeof(replaces));
6193
6194 /* reset subskiplsn */
6197
6199 replaces);
6200 CatalogTupleUpdate(rel, &tup->t_self, tup);
6201
6202 if (myskiplsn != finish_lsn)
6204 errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6205 errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6206 LSN_FORMAT_ARGS(finish_lsn),
6208 }
6209
6211 table_close(rel, NoLock);
6212
6214
6215 if (started_tx)
6217}
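The three outcomes of clear_subscription_skip_lsn() can be separated from the catalog access. In this hypothetical sketch, `catalog_skiplsn` is the value read from the fetched tuple and `myskiplsn` is the value the worker started with (assumed to be MySubscription->skiplsn; its declaration is on a line missing from this listing).

```c
#include <assert.h>
#include <stdint.h>

typedef enum
{
    SKIPLSN_LEAVE,            /* catalog value changed meanwhile: do not touch it */
    SKIPLSN_CLEAR,            /* clear silently */
    SKIPLSN_CLEAR_AND_WARN    /* clear, but report the finish/skip LSN mismatch */
} SkipLsnAction;

static SkipLsnAction
decide_skiplsn_clear(uint64_t catalog_skiplsn, uint64_t myskiplsn,
                     uint64_t finish_lsn)
{
    if (catalog_skiplsn != myskiplsn)
        return SKIPLSN_LEAVE;
    return myskiplsn == finish_lsn ? SKIPLSN_CLEAR : SKIPLSN_CLEAR_AND_WARN;
}
```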
6218
6219/* Error callback to give more context info about the change being applied */
6220void
6222{
6224
6226 return;
6227
6228 Assert(errarg->origin_name);
6229
6230 if (errarg->rel == NULL)
6231 {
6232 if (!TransactionIdIsValid(errarg->remote_xid))
6233 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6234 errarg->origin_name,
6236 else if (!XLogRecPtrIsValid(errarg->finish_lsn))
6237 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6238 errarg->origin_name,
6240 errarg->remote_xid);
6241 else
6242 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6243 errarg->origin_name,
6245 errarg->remote_xid,
6246 LSN_FORMAT_ARGS(errarg->finish_lsn));
6247 }
6248 else
6249 {
6250 if (errarg->remote_attnum < 0)
6251 {
6252 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6253 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6254 errarg->origin_name,
6256 errarg->rel->remoterel.nspname,
6257 errarg->rel->remoterel.relname,
6258 errarg->remote_xid);
6259 else
6260 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6261 errarg->origin_name,
6263 errarg->rel->remoterel.nspname,
6264 errarg->rel->remoterel.relname,
6265 errarg->remote_xid,
6266 LSN_FORMAT_ARGS(errarg->finish_lsn));
6267 }
6268 else
6269 {
6270 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6271 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6272 errarg->origin_name,
6274 errarg->rel->remoterel.nspname,
6275 errarg->rel->remoterel.relname,
6276 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6277 errarg->remote_xid);
6278 else
6279 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6280 errarg->origin_name,
6282 errarg->rel->remoterel.nspname,
6283 errarg->rel->remoterel.relname,
6284 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6285 errarg->remote_xid,
6286 LSN_FORMAT_ARGS(errarg->finish_lsn));
6287 }
6288 }
6289}
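The six message variants above follow one composition rule: the relation and column parts are added when known, and the transaction part is added whenever a relation is known or the remote XID is valid, with the finish LSN appended only when it is valid. The sketch below composes the same strings from that rule. `SketchErrArg` flattens the callback argument into plain fields, treats 0 as invalid for both XID and LSN, and is entirely hypothetical, as is `appendf()`.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef struct
{
    const char *origin_name;
    const char *command;      /* message type name */
    const char *nspname;      /* relation schema; NULL when no relation */
    const char *relname;      /* relation name; NULL when no relation */
    const char *attname;      /* column name; NULL when no column is known */
    uint32_t    remote_xid;   /* 0 means invalid */
    uint64_t    finish_lsn;   /* 0 means invalid */
} SketchErrArg;

/* Bounded append; once the buffer is full, further text is dropped. */
static void
appendf(char *buf, size_t size, size_t *len, const char *fmt, ...)
{
    va_list ap;
    int     n;

    if (*len >= size)
        return;
    va_start(ap, fmt);
    n = vsnprintf(buf + *len, size - *len, fmt, ap);
    va_end(ap);
    if (n > 0)
        *len += (size_t) n;
}

static void
sketch_apply_errcontext(char *buf, size_t size, const SketchErrArg *a)
{
    size_t len = 0;

    assert(size > 0 && a->origin_name != NULL);
    buf[0] = '\0';
    appendf(buf, size, &len,
            "processing remote data for replication origin \"%s\" during message type \"%s\"",
            a->origin_name, a->command);

    if (a->relname != NULL)
    {
        appendf(buf, size, &len, " for replication target relation \"%s.%s\"",
                a->nspname, a->relname);
        if (a->attname != NULL)
            appendf(buf, size, &len, " column \"%s\"", a->attname);
    }

    /* Without a relation, the transaction is mentioned only if the XID is valid. */
    if (a->relname != NULL || a->remote_xid != 0)
    {
        appendf(buf, size, &len, " in transaction %u", (unsigned int) a->remote_xid);
        if (a->finish_lsn != 0)
            appendf(buf, size, &len, ", finished at %X/%08X",
                    (unsigned int) (a->finish_lsn >> 32),
                    (unsigned int) (a->finish_lsn & 0xFFFFFFFFu));
    }
}
```

Writing the variants out separately, as the listing does, keeps each message a complete string literal for translation, which a composed string like this one would not be.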
6290
6291/* Set transaction information of apply error callback */
6292static inline void
6298
6299/* Reset all information of apply error callback */
6300static inline void
6308
6309/*
6310 * Request wakeup of the workers for the given subscription OID
6311 * at commit of the current transaction.
6312 *
6313 * This is used to ensure that the workers process assorted changes
6314 * as soon as possible.
6315 */
6316void
6326
6327/*
6328 * Wake up the workers of any subscriptions that were changed in this xact.
6329 */
6330void
6332{
6334 {
6335 ListCell *lc;
6336
6339 {
6340 Oid subid = lfirst_oid(lc);
6341 List *workers;
6342 ListCell *lc2;
6343
6344 workers = logicalrep_workers_find(subid, true, false);
6345 foreach(lc2, workers)
6346 {
6348
6350 }
6351 }
6353 }
6354
6355 /* The List storage will be reclaimed automatically in xact cleanup. */
6357}
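The register-then-wake pattern above can be sketched without the shared-memory worker array. Assumptions, since the relevant lines are not shown: LogicalRepWorkersWakeupAtCommit() records each subscription OID at most once, and the pending list is discarded at both commit and abort. A fixed-size array replaces the List, and all names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_MAX_PENDING 32   /* hypothetical fixed capacity */

typedef struct
{
    unsigned int subids[SKETCH_MAX_PENDING];
    int          n;
} PendingWakeups;

/* Remember 'subid' once; returns false only if the array is full. */
static bool
request_wakeup_at_commit(PendingWakeups *p, unsigned int subid)
{
    for (int i = 0; i < p->n; i++)
    {
        if (p->subids[i] == subid)
            return true;        /* already registered */
    }
    if (p->n == SKETCH_MAX_PENDING)
        return false;
    p->subids[p->n++] = subid;
    return true;
}

/*
 * End-of-transaction hook: on commit, copy out the subscriptions whose
 * workers should be woken; on abort, wake nobody. The pending list is
 * emptied in both cases. Returns the number of OIDs copied to 'out'.
 */
static int
at_end_of_xact(PendingWakeups *p, bool is_commit, unsigned int *out)
{
    int nwake = 0;

    if (is_commit)
    {
        for (int i = 0; i < p->n; i++)
            out[nwake++] = p->subids[i];
    }
    p->n = 0;
    return nwake;
}
```

Deferring the wakeup to commit means workers never react to a catalog change that is later rolled back.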
6358
6359/*
6360 * Allocate the origin name in long-lived context for error context message.
6361 */
6362void
6368
6369/*
6370 * Return the action to be taken for the given transaction. See
6371 * TransApplyAction for information on each of the actions.
6372 *
6373 * *winfo is assigned to the destination parallel worker info when the leader
6374 * apply worker has to pass all the transaction's changes to the parallel
6375 * apply worker.
6376 */
6377static TransApplyAction
6379{
6380 *winfo = NULL;
6381
6383 {
6384 return TRANS_PARALLEL_APPLY;
6385 }
6386
6387 /*
6388 * If we are processing this transaction using a parallel apply worker
6389 * then either we send the changes to the parallel worker or, if the worker
6390 * is busy, serialize the changes to a file which will later be processed
6391 * by the parallel worker.
6392 */
6393 *winfo = pa_find_worker(xid);
6394
6395 if (*winfo && (*winfo)->serialize_changes)
6396 {
6398 }
6399 else if (*winfo)
6400 {
6402 }
6403
6404 /*
6405 * If there is no parallel worker involved to process this transaction
6406 * then we either directly apply the change or serialize it to a file
6407 * which will later be applied when the transaction finish message is
6408 * processed.
6409 */
6410 else if (in_streamed_transaction)
6411 {
6413 }
6414 else
6415 {
6416 return TRANS_LEADER_APPLY;
6417 }
6418}
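The decision order above can be condensed into a pure function. The first branch's condition and the TRANS_* values returned by three branches are on lines missing from this listing; the sketch assumes they are, in order, "this process is a parallel apply worker", TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL and TRANS_LEADER_SERIALIZE, which matches the surrounding comments. The enum and function are hypothetical stand-ins.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the TRANS_* actions. */
typedef enum
{
    SK_LEADER_APPLY,
    SK_LEADER_SERIALIZE,
    SK_LEADER_SEND_TO_PARALLEL,
    SK_LEADER_PARTIAL_SERIALIZE,
    SK_PARALLEL_APPLY
} SketchApplyAction;

static SketchApplyAction
sketch_apply_action(bool am_parallel_worker, bool have_parallel_worker,
                    bool worker_serializes, bool in_streamed_xact)
{
    /* A parallel apply worker always applies what it receives. */
    if (am_parallel_worker)
        return SK_PARALLEL_APPLY;

    /* Leader with an assigned parallel worker: send, or spill if it is busy. */
    if (have_parallel_worker)
        return worker_serializes ? SK_LEADER_PARTIAL_SERIALIZE
                                 : SK_LEADER_SEND_TO_PARALLEL;

    /* No parallel worker: spill streamed changes, apply the rest directly. */
    return in_streamed_xact ? SK_LEADER_SERIALIZE : SK_LEADER_APPLY;
}
```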
Definition worker.c:1994
static void apply_handle_commit(StringInfo s)
Definition worker.c:1242
static bool IsIndexUsableForFindingDeletedTuple(Oid localindexoid, TransactionId conflict_detection_xmin)
Definition worker.c:3241
void stream_start_internal(TransactionId xid, bool first_segment)
Definition worker.c:1693
static List * on_commit_wakeup_workers_subids
Definition worker.c:486
static void apply_handle_stream_abort(StringInfo s)
Definition worker.c:2077
static void apply_handle_relation(StringInfo s)
Definition worker.c:2569
void set_apply_error_context_origin(char *originname)
Definition worker.c:6363
static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4561
MemoryContext ApplyContext
Definition worker.c:476
static void subxact_info_write(Oid subid, TransactionId xid)
Definition worker.c:5226
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition worker.c:2607
static void apply_handle_prepare(StringInfo s)
Definition worker.c:1337
static void apply_handle_rollback_prepared(StringInfo s)
Definition worker.c:1463
void SetupApplyOrSyncWorker(int worker_slot)
Definition worker.c:5946
static void apply_handle_stream_stop(StringInfo s)
Definition worker.c:1891
static void apply_handle_origin(StringInfo s)
Definition worker.c:1672
static void request_publisher_status(RetainDeadTuplesData *rdt_data)
Definition worker.c:4522
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition worker.c:4303
static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
Definition worker.c:4927
static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4429
static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4393
WalReceiverConn * LogRepWorkerWalRcvConn
Definition worker.c:481
static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition worker.c:4848
static XLogRecPtr remote_final_lsn
Definition worker.c:489
static bool MySubscriptionValid
Definition worker.c:484
void apply_error_callback(void *arg)
Definition worker.c:6221
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition worker.c:3945
static MemoryContext LogicalStreamingContext
Definition worker.c:479
void maybe_reread_subscription(void)
Definition worker.c:5044
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition worker.c:2509
void InitializeLogRepWorker(void)
Definition worker.c:5780
static void set_wal_receiver_timeout(void)
Definition worker.c:5176
static bool in_streamed_transaction
Definition worker.c:492
static void apply_handle_begin_prepare(StringInfo s)
Definition worker.c:1271
void ApplyWorkerMain(Datum main_arg)
Definition worker.c:5986
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition worker.c:2266
static void apply_handle_stream_start(StringInfo s)
Definition worker.c:1731
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition worker.c:6083
static void on_exit_clear_xact_state(int code, Datum arg)
Definition worker.c:5937
static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition worker.c:4810
Subscription * MySubscription
Definition worker.c:483
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition worker.c:1300
static void stream_close_file(void)
Definition worker.c:5494
static TransactionId stream_xid
Definition worker.c:494
static void apply_handle_insert(StringInfo s)
Definition worker.c:2639
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition worker.c:965
static bool FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
Definition worker.c:3275
static void subxact_info_read(Oid subid, TransactionId xid)
Definition worker.c:5275
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition worker.c:3112
static void reset_apply_error_context_info(void)
Definition worker.c:6301
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1748
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1772
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1636
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
@ STATE_IDLEINTRANSACTION
@ STATE_RUNNING
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:934
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:894
Bitmapset * bms_make_singleton(int x)
Definition bitmapset.c:216
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition buffile.c:292
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition buffile.c:655
int BufFileSeek(BufFile *file, int fileno, pgoff_t offset, int whence)
Definition buffile.c:741
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition buffile.c:677
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition buffile.c:665
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition buffile.c:268
void BufFileTruncateFileSet(BufFile *file, int fileno, pgoff_t offset)
Definition buffile.c:928
void BufFileTell(BufFile *file, int *fileno, pgoff_t *offset)
Definition buffile.c:833
void BufFileClose(BufFile *file)
Definition buffile.c:413
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition buffile.c:365
#define Min(x, y)
Definition c.h:1093
#define likely(x)
Definition c.h:431
#define Assert(condition)
Definition c.h:945
int64_t int64
Definition c.h:615
uint64_t uint64
Definition c.h:619
uint32_t uint32
Definition c.h:618
#define pg_fallthrough
Definition c.h:152
uint32 TransactionId
Definition c.h:738
#define OidIsValid(objectId)
Definition c.h:860
size_t Size
Definition c.h:691
bool track_commit_timestamp
Definition commit_ts.c:109
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, ReplOriginId *localorigin, TimestampTz *localts)
Definition conflict.c:63
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition conflict.c:104
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition conflict.c:139
ConflictType
Definition conflict.h:32
@ CT_UPDATE_DELETED
Definition conflict.h:43
@ CT_DELETE_MISSING
Definition conflict.h:52
@ CT_UPDATE_ORIGIN_DIFFERS
Definition conflict.h:37
@ CT_UPDATE_MISSING
Definition conflict.h:46
@ CT_DELETE_ORIGIN_DIFFERS
Definition conflict.h:49
int64 TimestampTz
Definition timestamp.h:39
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
void EmitErrorReport(void)
Definition elog.c:1882
ErrorContextCallback * error_context_stack
Definition elog.c:99
void FlushErrorState(void)
Definition elog.c:2062
int errcode(int sqlerrcode)
Definition elog.c:874
#define LOG
Definition elog.h:31
#define PG_RE_THROW()
Definition elog.h:405
int errdetail_internal(const char *fmt,...)
#define errcontext
Definition elog.h:198
int errdetail(const char *fmt,...)
int errmsg_internal(const char *fmt,...)
#define PG_TRY(...)
Definition elog.h:372
#define WARNING
Definition elog.h:36
#define DEBUG2
Definition elog.h:29
#define PG_END_TRY(...)
Definition elog.h:397
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define PG_CATCH(...)
Definition elog.h:382
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition execExpr.c:143
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition execMain.c:1875
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
Definition execMain.c:2737
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition execMain.c:1262
void EvalPlanQualEnd(EPQState *epqstate)
Definition execMain.c:3198
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition execUtils.c:1331
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
Definition execUtils.c:778
void FreeExecutorState(EState *estate)
Definition execUtils.c:197
EState * CreateExecutorState(void)
Definition execUtils.c:90
#define GetPerTupleExprContext(estate)
Definition executor.h:660
#define GetPerTupleMemoryContext(estate)
Definition executor.h:665
#define EvalPlanQualSetSlot(epqstate, slot)
Definition executor.h:290
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition executor.h:396
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
#define palloc_object(type)
Definition fe_memutils.h:74
#define repalloc_array(pointer, type, count)
Definition fe_memutils.h:78
#define palloc_array(type, count)
Definition fe_memutils.h:76
#define palloc0_object(type)
Definition fe_memutils.h:75
void FileSetInit(FileSet *fileset)
Definition fileset.c:52
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition fmgr.c:1773
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition fmgr.c:1755
struct Latch * MyLatch
Definition globals.c:63
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
bool parse_int(const char *value, int *result, int flags, const char **hintmsg)
Definition guc.c:2775
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition guc.c:4228
@ PGC_S_OVERRIDE
Definition guc.h:123
@ PGC_S_SESSION
Definition guc.h:126
@ PGC_SUSET
Definition guc.h:78
@ PGC_SIGHUP
Definition guc.h:75
@ PGC_BACKEND
Definition guc.h:77
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1130
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1384
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
static void * GETSTRUCT(const HeapTupleData *tuple)
static void dlist_delete(dlist_node *node)
Definition ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition ilist.h:336
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition ilist.h:364
#define DLIST_STATIC_INIT(name)
Definition ilist.h:281
#define dlist_container(type, membername, ptr)
Definition ilist.h:593
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:133
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void AcceptInvalidationMessages(void)
Definition inval.c:930
void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)
Definition inval.c:1816
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
void proc_exit(int code)
Definition ipc.c:105
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
void ResetLatch(Latch *latch)
Definition latch.c:374
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:294
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition launcher.c:747
void logicalrep_worker_attach(int slot)
Definition launcher.c:758
void ApplyLauncherWakeup(void)
Definition launcher.c:1195
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition launcher.c:259
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:724
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:57
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1155
List * lappend(List *list, void *datum)
Definition list.c:339
List * lappend_oid(List *list, Oid datum)
Definition list.c:375
List * list_append_unique_oid(List *list, Oid datum)
Definition list.c:1380
bool list_member_oid(const List *list, Oid datum)
Definition list.c:722
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
int LOCKMODE
Definition lockdefs.h:26
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define AccessShareLock
Definition lockdefs.h:36
#define RowExclusiveLock
Definition lockdefs.h:38
@ LockTupleExclusive
Definition lockoptions.h:59
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
#define LOGICALREP_COLUMN_UNCHANGED
LogicalRepMsgType
@ LOGICAL_REP_MSG_INSERT
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_BEGIN
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_COMMIT
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_TYPE
@ LOGICAL_REP_MSG_DELETE
@ LOGICAL_REP_MSG_STREAM_COMMIT
@ LOGICAL_REP_MSG_ORIGIN
@ LOGICAL_REP_MSG_UPDATE
uint32 LogicalRepRelId
#define LOGICALREP_PROTO_VERSION_NUM
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
char * get_rel_name(Oid relid)
Definition lsyscache.c:2148
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition lsyscache.c:3096
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3588
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition lsyscache.c:3162
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
@ LW_SHARED
Definition lwlock.h:113
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1768
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
MemoryContext TopTransactionContext
Definition mcxt.c:171
char * pstrdup(const char *in)
Definition mcxt.c:1781
void * repalloc(void *pointer, Size size)
Definition mcxt.c:1632
void pfree(void *pointer)
Definition mcxt.c:1616
MemoryContext TopMemoryContext
Definition mcxt.c:166
void * palloc(Size size)
Definition mcxt.c:1387
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define RESUME_INTERRUPTS()
Definition miscadmin.h:136
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define HOLD_INTERRUPTS()
Definition miscadmin.h:134
Oid GetUserId(void)
Definition miscinit.c:470
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:989
CmdType
Definition nodes.h:273
@ CMD_INSERT
Definition nodes.h:277
@ CMD_DELETE
Definition nodes.h:278
@ CMD_UPDATE
Definition nodes.h:276
#define makeNode(_type_)
Definition nodes.h:161
ObjectType get_relkind_objtype(char relkind)
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:263
ReplOriginXactState replorigin_xact_state
Definition origin.c:167
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:232
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1329
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1353
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1147
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
#define ACL_DELETE
Definition parsenodes.h:79
uint64 AclMode
Definition parsenodes.h:74
#define ACL_INSERT
Definition parsenodes.h:76
#define ACL_UPDATE
Definition parsenodes.h:78
@ RTE_RELATION
@ DROP_RESTRICT
#define ACL_SELECT
Definition parsenodes.h:77
#define ACL_TRUNCATE
Definition parsenodes.h:80
int16 attnum
FormData_pg_attribute * Form_pg_attribute
static uint32 pg_ceil_log2_32(uint32 num)
#define NAMEDATALEN
#define MAXPGPATH
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
#define lfirst(lc)
Definition pg_list.h:172
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:212
static void * list_nth(const List *list, int n)
Definition pg_list.h:299
#define lfirst_oid(lc)
Definition pg_list.h:174
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
void FreeSubscription(Subscription *sub)
Subscription * GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
void DisableSubscription(Oid subid)
void UpdateDeadTupleRetentionStatus(Oid subid, bool active)
typedef FormData_pg_subscription * Form_pg_subscription
long pgstat_report_stat(bool force)
Definition pgstat.c:704
void pgstat_report_subscription_error(Oid subid)
int64 timestamp
Expr * expression_planner(Expr *expr)
Definition planner.c:6819
#define pqsignal
Definition port.h:547
int pgsocket
Definition port.h:29
#define snprintf
Definition port.h:260
#define PGINVALID_SOCKET
Definition port.h:31
off_t pgoff_t
Definition port.h:421
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static int32 DatumGetInt32(Datum X)
Definition postgres.h:202
#define InvalidOid
unsigned int Oid
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition pqformat.c:414
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition procarray.c:2832
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition proto.c:98
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition proto.c:561
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition proto.c:325
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition proto.c:134
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition proto.c:757
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition proto.c:487
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition proto.c:615
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition proto.c:1187
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition proto.c:63
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition proto.c:267
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition proto.c:698
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition proto.c:1212
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition proto.c:365
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition proto.c:1132
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition proto.c:428
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition proto.c:228
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition proto.c:1082
#define PqReplMsg_WALData
Definition protocol.h:77
#define PqReplMsg_PrimaryStatusRequest
Definition protocol.h:83
#define PqReplMsg_Keepalive
Definition protocol.h:75
#define PqReplMsg_PrimaryStatusUpdate
Definition protocol.h:76
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84
#define RelationGetRelid(relation)
Definition rel.h:514
#define RelationIsLogicallyLogged(relation)
Definition rel.h:710
#define RelationGetDescr(relation)
Definition rel.h:540
#define RelationGetRelationName(relation)
Definition rel.h:548
#define RELATION_IS_OTHER_TEMP(relation)
Definition rel.h:667
#define RelationGetNamespace(relation)
Definition rel.h:555
List * RelationGetIndexList(Relation relation)
Definition relcache.c:4826
ResourceOwner TopTransactionResourceOwner
Definition resowner.c:175
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
Node * build_column_default(Relation rel, int attrno)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition rls.c:52
@ RLS_ENABLED
Definition rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition relation.c:585
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition relation.c:647
bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
Definition relation.c:835
Oid GetRelationIdentityOrPK(Relation rel)
Definition relation.c:905
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition relation.c:165
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition relation.c:518
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition relation.c:362
StringInfo makeStringInfo(void)
Definition stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition stringinfo.h:157
TransactionId remote_xid
Definition worker.c:334
LogicalRepMsgType command
Definition worker.c:329
XLogRecPtr finish_lsn
Definition worker.c:335
LogicalRepRelMapEntry * rel
Definition worker.c:330
ResultRelInfo * targetRelInfo
Definition worker.c:319
EState * estate
Definition worker.c:316
PartitionTupleRouting * proute
Definition worker.c:323
ModifyTableState * mtstate
Definition worker.c:322
LogicalRepRelMapEntry * targetRel
Definition worker.c:318
uint32 nsubxacts
Definition worker.c:543
uint32 nsubxacts_max
Definition worker.c:544
SubXactInfo * subxacts
Definition worker.c:546
TransactionId subxact_last
Definition worker.c:545
int maplen
Definition attmap.h:37
AttrNumber * attnums
Definition attmap.h:36
List * es_rteperminfos
Definition execnodes.h:680
List * es_tupleTable
Definition execnodes.h:724
List * es_opened_result_relations
Definition execnodes.h:700
CommandId es_output_cid
Definition execnodes.h:694
struct ErrorContextCallback * previous
Definition elog.h:297
void(* callback)(void *arg)
Definition elog.h:298
dlist_node node
Definition worker.c:307
XLogRecPtr remote_end
Definition worker.c:309
XLogRecPtr local_end
Definition worker.c:308
LogicalRepRelation remoterel
TimestampTz last_recv_time
LogicalRepWorkerType type
TimestampTz reply_time
FileSet * stream_fileset
TransactionId oldest_nonremovable_xid
TimestampTz last_send_time
ResultRelInfo * resultRelInfo
Definition execnodes.h:1420
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
Plan * plan
Definition execnodes.h:1177
EState * state
Definition execnodes.h:1179
Form_pg_class rd_rel
Definition rel.h:111
ReplOriginId origin
Definition origin.h:45
XLogRecPtr origin_lsn
Definition origin.h:46
TimestampTz origin_timestamp
Definition origin.h:47
TimestampTz flushpos_update_time
Definition worker.c:436
FullTransactionId remote_oldestxid
Definition worker.c:416
FullTransactionId remote_wait_for
Definition worker.c:432
TimestampTz last_recv_time
Definition worker.c:447
TimestampTz candidate_xid_time
Definition worker.c:448
long table_sync_wait_time
Definition worker.c:440
FullTransactionId remote_nextxid
Definition worker.c:423
RetainDeadTuplesPhase phase
Definition worker.c:407