PostgreSQL Source Code git master
worker.c
1/*-------------------------------------------------------------------------
2 * worker.c
3 * PostgreSQL logical replication worker (apply)
4 *
5 * Copyright (c) 2016-2026, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/worker.c
9 *
10 * NOTES
11 * This file contains the worker which applies logical changes as they come
12 * from remote logical replication stream.
13 *
14 * The main worker (apply) is started by logical replication worker
15 * launcher for every enabled subscription in a database. It uses
16 * walsender protocol to communicate with publisher.
17 *
18 * This module includes server facing code and shares libpqwalreceiver
19 * module with walreceiver for providing the libpq specific functionality.
20 *
21 *
22 * STREAMED TRANSACTIONS
23 * ---------------------
24 * Streamed transactions (large transactions exceeding a memory limit on the
25 * upstream) are applied using one of two approaches:
26 *
27 * 1) Write to temporary files and apply when the final commit arrives
28 *
29 * This approach is used when the user has set the subscription's streaming
30 * option as on.
31 *
32 * Unlike the regular (non-streamed) case, handling streamed transactions has
33 * to handle aborts of both the toplevel transaction and subtransactions. This
34 * is achieved by tracking offsets for subtransactions, which are then used
35 * to truncate the file with serialized changes.
36 *
37 * The files are placed in tmp file directory by default, and the filenames
38 * include both the XID of the toplevel transaction and OID of the
39 * subscription. This is necessary so that different workers processing a
40 * remote transaction with the same XID don't interfere.
41 *
42 * We use BufFiles instead of using normal temporary files because (a) the
43 * BufFile infrastructure supports temporary files that exceed the OS file size
44 * limit, (b) provides automatic cleanup on error, and (c) lets these files
45 * survive across local transactions, so they can be opened at stream start
46 * and closed at stream stop. We decided to use FileSet
47 * infrastructure as without that it deletes the files on the closure of the
48 * file and if we decide to keep stream files open across the start/stop stream
49 * then it will consume a lot of memory (more than 8K for each BufFile and
50 * there could be multiple such BufFiles as the subscriber could receive
51 * multiple start/stop streams for different transactions before getting the
52 * commit). Moreover, if we don't use FileSet then we also need to invent
53 * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 * the file we desired across multiple stream-open calls for the same
55 * transaction.
56 *
57 * 2) Parallel apply workers.
58 *
59 * This approach is used when the user has set the subscription's streaming
60 * option as parallel. See logical/applyparallelworker.c for information about
61 * this approach.
62 *
63 * TWO_PHASE TRANSACTIONS
64 * ----------------------
65 * Two phase transactions are replayed at prepare and then committed or
66 * rolled back at commit prepared and rollback prepared respectively. It is
67 * possible to have a prepared transaction that arrives at the apply worker
68 * when the tablesync is busy doing the initial copy. In this case, the apply
69 * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 * is still busy (see the condition of should_apply_changes_for_rel). The
71 * tablesync worker might not get such a prepared transaction because say it
72 * was prior to the initial consistent point but might have got some later
73 * commits. Now, the tablesync worker will exit without doing anything for the
74 * prepared transaction skipped by the apply worker as the sync location for it
75 * will be already ahead of the apply worker's current location. This would lead
76 * to an "empty prepare", because later when the apply worker does the commit
77 * prepare, there is nothing in it (the inserts were skipped earlier).
78 *
79 * To avoid this, and similar prepare confusions the subscription's two_phase
80 * commit is enabled only after the initial sync is over. The two_phase option
81 * has been implemented as a tri-state with values DISABLED, PENDING, and
82 * ENABLED.
83 *
84 * Even if the user specifies they want a subscription with two_phase = on,
85 * internally it will start with a tri-state of PENDING which only becomes
86 * ENABLED after all tablesync initializations are completed - i.e. when all
87 * tablesync workers have reached their READY state. In other words, the value
88 * PENDING is only a temporary state for subscription start-up.
89 *
90 * Until the two_phase is properly available (ENABLED) the subscription will
91 * behave as if two_phase = off. When the apply worker detects that all
92 * tablesyncs have become READY (while the tri-state was PENDING) it will
93 * restart the apply worker process. This happens in
94 * ProcessSyncingTablesForApply.
95 *
96 * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 * two_phase tri-state of PENDING it starts streaming messages with the
98 * two_phase option which in turn enables the decoding of two-phase commits at
99 * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 * Now, it is possible that during the time we have not enabled two_phase, the
101 * publisher (replication server) would have skipped some prepares but we
102 * ensure that such prepares are sent along with commit prepare, see
103 * ReorderBufferFinishPrepared.
104 *
105 * If the subscription has no tables then a two_phase tri-state PENDING is
106 * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 * PUBLICATION which might otherwise be disallowed (see below).
108 *
109 * If ever a user needs to be aware of the tri-state value, they can fetch it
110 * from the pg_subscription catalog (see column subtwophasestate).
111 *
112 * Finally, to avoid problems mentioned in previous paragraphs from any
113 * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
114 * to 'off' and then again back to 'on') there is a restriction for
115 * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 * the two_phase tri-state is ENABLED, except when copy_data = false.
117 *
118 * We can get prepare of the same GID more than once for the genuine cases
119 * where we have defined multiple subscriptions for publications on the same
120 * server and prepared transaction has operations on tables subscribed to those
121 * subscriptions. For such cases, if we use the GID sent by publisher one of
122 * the prepares will be successful and others will fail, in which case the
123 * server will send them again. Now, this can lead to a deadlock if the user has
124 * set synchronous_standby_names for all the subscriptions on the subscriber. To
125 * avoid such deadlocks, we generate a unique GID (consisting of the
126 * subscription oid and the xid of the prepared transaction) for each prepared
127 * transaction on the subscriber.
128 *
129 * FAILOVER
130 * --------
131 * The logical slot on the primary can be synced to the standby by specifying
132 * failover = true when creating the subscription. Enabling failover allows us
133 * to smoothly transition to the promoted standby, ensuring that we can
134 * subscribe to the new primary without losing any data.
135 *
136 * RETAIN DEAD TUPLES
137 * ------------------
138 * Each apply worker with the retain_dead_tuples option enabled maintains a
139 * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
140 * prevent dead rows from being removed prematurely when the apply worker still
141 * needs them to detect update_deleted conflicts. Additionally, this helps to
142 * retain the required commit_ts module information, which further helps to
143 * detect update_origin_differs and delete_origin_differs conflicts reliably, as
144 * otherwise, vacuum freeze could remove the required information.
145 *
146 * The logical replication launcher manages an internal replication slot named
147 * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 * transaction ID from all apply workers to determine the appropriate xmin for
149 * the slot, thereby retaining necessary tuples.
150 *
151 * The non-removable transaction ID in the apply worker is advanced to the
152 * oldest running transaction ID once all concurrent transactions on the
153 * publisher have been applied and flushed locally. The process involves:
154 *
155 * - RDT_GET_CANDIDATE_XID:
156 * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 * candidate xid.
158 *
159 * - RDT_REQUEST_PUBLISHER_STATUS:
160 * Send a message to the walsender requesting the publisher status, which
161 * includes the latest WAL write position and information about transactions
162 * that are in the commit phase.
163 *
164 * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 * Wait for the status from the walsender. After receiving the first status,
166 * do not proceed if there are concurrent remote transactions that are still
167 * in the commit phase. These transactions might have been assigned an
168 * earlier commit timestamp but have not yet written the commit WAL record.
169 * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 * until all these transactions have completed.
171 *
172 * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 * Advance the non-removable transaction ID if the current flush location has
174 * reached or surpassed the last received WAL position.
175 *
176 * - RDT_STOP_CONFLICT_INFO_RETENTION:
177 * This phase is required only when max_retention_duration is defined. We
178 * enter this phase if the wait time in either the
179 * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
180 * configured max_retention_duration. In this phase,
181 * pg_subscription.subretentionactive is updated to false within a new
182 * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
183 *
184 * - RDT_RESUME_CONFLICT_INFO_RETENTION:
185 * This phase is required only when max_retention_duration is defined. We
186 * enter this phase if the retention was previously stopped, and the time
187 * required to advance the non-removable transaction ID in the
188 * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
189 * (or if max_retention_duration is set to 0). During this phase,
190 * pg_subscription.subretentionactive is updated to true within a new
191 * transaction, and the worker will be restarted.
192 *
193 * The overall state progression is: GET_CANDIDATE_XID ->
194 * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
195 * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
196 * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
197 *
198 * Retaining the dead tuples for this period is sufficient for ensuring
199 * eventual consistency using last-update-wins strategy, as dead tuples are
200 * useful for detecting conflicts only during the application of concurrent
201 * transactions from remote nodes. After applying and flushing all remote
202 * transactions that occurred concurrently with the tuple DELETE, any
203 * subsequent UPDATE from a remote node should have a later timestamp. In such
204 * cases, it is acceptable to detect an update_missing scenario and convert the
205 * UPDATE to an INSERT when applying it. But, for concurrent remote
206 * transactions with earlier timestamps than the DELETE, detecting
207 * update_deleted is necessary, as the UPDATEs in remote transactions should be
208 * ignored if their timestamp is earlier than that of the dead tuples.
209 *
210 * Note that advancing the non-removable transaction ID is not supported if the
211 * publisher is also a physical standby. This is because the logical walsender
212 * on the standby can only get the WAL replay position but there may be more
213 * WALs that are being replicated from the primary and those WALs could have
214 * earlier commit timestamps.
215 *
216 * Similarly, when the publisher has subscribed to another publisher,
217 * information necessary for conflict detection cannot be retained for
218 * changes from origins other than the publisher. This is because the publisher
219 * lacks the information on concurrent transactions of other publishers to
220 * which it subscribes. As the information on concurrent transactions is
221 * unavailable beyond the subscriber's immediate publishers, the non-removable
222 * transaction ID might be advanced prematurely before changes from other
223 * origins have been fully applied.
224 *
225 * XXX Retaining information for changes from other origins might be possible
226 * by requesting the subscription on that origin to enable retain_dead_tuples
227 * and fetching the conflict detection slot.xmin along with the publisher's
228 * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
229 * wait for the remote slot's xmin to reach the oldest active transaction ID,
230 * ensuring that all transactions from other origins have been applied on the
231 * publisher, thereby getting the latest WAL position that includes all
232 * concurrent changes. However, this approach may impact performance, so it
233 * might not be worth the effort.
234 *
235 * XXX It seems feasible to get the latest commit's WAL location from the
236 * publisher and wait till that is applied. However, we can't do that
237 * because commit timestamps can regress as a commit with a later LSN is not
238 * guaranteed to have a later timestamp than those with earlier LSNs. Having
239 * said that, even if that is possible, it won't improve performance much as
240 * the apply always lags and moves slowly as compared with the transactions
241 * on the publisher.
242 *-------------------------------------------------------------------------
243 */
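/*
 * Illustrative sketch (not part of worker.c): the RETAIN DEAD TUPLES phase
 * progression described in the header comment, written as a small transition
 * function. The phase names mirror the comment, but the SKETCH_ prefix, the
 * function name, and the two boolean inputs are assumptions made for this
 * example. The RDT_STOP_CONFLICT_INFO_RETENTION and
 * RDT_RESUME_CONFLICT_INFO_RETENTION phases are deliberately left out.
 */

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified stand-in for the phases named in the header comment. */
typedef enum SketchRdtPhase
{
    SKETCH_RDT_GET_CANDIDATE_XID,
    SKETCH_RDT_REQUEST_PUBLISHER_STATUS,
    SKETCH_RDT_WAIT_FOR_PUBLISHER_STATUS,
    SKETCH_RDT_WAIT_FOR_LOCAL_FLUSH
} SketchRdtPhase;

/*
 * Return the phase that follows "phase".
 *
 * remote_xacts_committing (hypothetical input): the publisher still reports
 * transactions that are in the commit phase.
 * local_flush_caught_up (hypothetical input): the local flush position has
 * reached or passed the last received remote WAL position.
 */
static SketchRdtPhase
sketch_rdt_next_phase(SketchRdtPhase phase,
                      bool remote_xacts_committing,
                      bool local_flush_caught_up)
{
    switch (phase)
    {
        case SKETCH_RDT_GET_CANDIDATE_XID:
            return SKETCH_RDT_REQUEST_PUBLISHER_STATUS;

        case SKETCH_RDT_REQUEST_PUBLISHER_STATUS:
            return SKETCH_RDT_WAIT_FOR_PUBLISHER_STATUS;

        case SKETCH_RDT_WAIT_FOR_PUBLISHER_STATUS:
            /* Keep asking until the concurrent remote commits are done. */
            return remote_xacts_committing
                ? SKETCH_RDT_REQUEST_PUBLISHER_STATUS
                : SKETCH_RDT_WAIT_FOR_LOCAL_FLUSH;

        case SKETCH_RDT_WAIT_FOR_LOCAL_FLUSH:
            /* Once flushed, the xid can advance and a new round starts. */
            return local_flush_caught_up
                ? SKETCH_RDT_GET_CANDIDATE_XID
                : SKETCH_RDT_WAIT_FOR_LOCAL_FLUSH;
    }

    assert(false);              /* unreachable for valid phases */
    return SKETCH_RDT_GET_CANDIDATE_XID;
}
```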
244
245#include "postgres.h"
246
247#include <sys/stat.h>
248#include <unistd.h>
249
250#include "access/commit_ts.h"
251#include "access/table.h"
252#include "access/tableam.h"
253#include "access/twophase.h"
254#include "access/xact.h"
255#include "catalog/indexing.h"
256#include "catalog/pg_inherits.h"
260#include "commands/tablecmds.h"
261#include "commands/trigger.h"
262#include "executor/executor.h"
264#include "libpq/pqformat.h"
265#include "miscadmin.h"
266#include "optimizer/optimizer.h"
268#include "pgstat.h"
269#include "postmaster/bgworker.h"
270#include "postmaster/interrupt.h"
271#include "postmaster/walwriter.h"
272#include "replication/conflict.h"
277#include "replication/origin.h"
278#include "replication/slot.h"
282#include "storage/buffile.h"
283#include "storage/ipc.h"
284#include "storage/latch.h"
285#include "storage/lmgr.h"
286#include "storage/procarray.h"
287#include "tcop/tcopprot.h"
288#include "utils/acl.h"
289#include "utils/guc.h"
290#include "utils/inval.h"
291#include "utils/lsyscache.h"
292#include "utils/memutils.h"
293#include "utils/pg_lsn.h"
294#include "utils/rel.h"
295#include "utils/rls.h"
296#include "utils/snapmgr.h"
297#include "utils/syscache.h"
298#include "utils/usercontext.h"
299
300#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
301
308
310
311typedef struct ApplyExecutionData
312{
313 EState *estate; /* executor state, used to track resources */
314
315 LogicalRepRelMapEntry *targetRel; /* replication target rel */
316 ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
317
318 /* These fields are used when the target relation is partitioned: */
319 ModifyTableState *mtstate; /* dummy ModifyTable state */
320 PartitionTupleRouting *proute; /* partition routing info */
322
323/* Struct for saving and restoring apply errcontext information */
325{
326 LogicalRepMsgType command; /* 0 if invalid */
328
329 /* Remote node information */
330 int remote_attnum; /* -1 if invalid */
335
336/*
337 * The action to be taken for the changes in the transaction.
338 *
339 * TRANS_LEADER_APPLY:
340 * This action means that we are in the leader apply worker or table sync
341 * worker. The changes of the transaction are either directly applied or
342 * are read from temporary files (for streaming transactions) and then
343 * applied by the worker.
344 *
345 * TRANS_LEADER_SERIALIZE:
346 * This action means that we are in the leader apply worker or table sync
347 * worker. Changes are written to temporary files and then applied when the
348 * final commit arrives.
349 *
350 * TRANS_LEADER_SEND_TO_PARALLEL:
351 * This action means that we are in the leader apply worker and need to send
352 * the changes to the parallel apply worker.
353 *
354 * TRANS_LEADER_PARTIAL_SERIALIZE:
355 * This action means that we are in the leader apply worker and have sent some
356 * changes directly to the parallel apply worker and the remaining changes are
357 * serialized to a file, due to timeout while sending data. The parallel apply
358 * worker will apply these serialized changes when the final commit arrives.
359 *
360 * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
361 * serializing changes, the leader worker also needs to serialize the
362 * STREAM_XXX message to a file, and wait for the parallel apply worker to
363 * finish the transaction when processing the transaction finish command. So
364 * this new action was introduced to keep the code and logic clear.
365 *
366 * TRANS_PARALLEL_APPLY:
367 * This action means that we are in the parallel apply worker and changes of
368 * the transaction are applied directly by the worker.
369 */
370typedef enum
371{
372 /* The action for non-streaming transactions. */
374
375 /* Actions for streaming transactions. */
381
382/*
383 * The phases involved in advancing the non-removable transaction ID.
384 *
385 * See comments atop worker.c for details of the transition between these
386 * phases.
387 */
397
398/*
399 * Critical information for managing phase transitions within the
400 * RetainDeadTuplesPhase.
401 */
403{
404 RetainDeadTuplesPhase phase; /* current phase */
405 XLogRecPtr remote_lsn; /* WAL write position on the publisher */
406
407 /*
408 * Oldest transaction ID that was in the commit phase on the publisher.
409 * Use FullTransactionId to prevent issues with transaction ID wraparound,
410 * where a new remote_oldestxid could falsely appear to originate from the
411 * past and block advancement.
412 */
414
415 /*
416 * Next transaction ID to be assigned on the publisher. Use
417 * FullTransactionId for consistency and to allow straightforward
418 * comparisons with remote_oldestxid.
419 */
421
422 TimestampTz reply_time; /* when the publisher responds with status */
423
424 /*
425 * Publisher transaction ID that must be awaited to complete before
426 * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
427 * FullTransactionId for the same reason as remote_nextxid.
428 */
430
431 TransactionId candidate_xid; /* candidate for the non-removable
432 * transaction ID */
433 TimestampTz flushpos_update_time; /* when the remote flush position was
434 * updated in final phase
435 * (RDT_WAIT_FOR_LOCAL_FLUSH) */
436
437 long table_sync_wait_time; /* time spent waiting for table sync
438 * to finish */
439
440 /*
441 * The following fields are used to determine the timing for the next
442 * round of transaction ID advancement.
443 */
444 TimestampTz last_recv_time; /* when the last message was received */
445 TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
446 int xid_advance_interval; /* how much time (ms) to wait before
447 * attempting to advance the
448 * non-removable transaction ID */
450
451/*
452 * The minimum (100ms) and maximum (3 minutes) intervals for advancing
453 * non-removable transaction IDs. The maximum interval is a bit arbitrary but
454 * is sufficient to not cause any undue network traffic.
455 */
456#define MIN_XID_ADVANCE_INTERVAL 100
457#define MAX_XID_ADVANCE_INTERVAL 180000
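
/*
 * Illustrative sketch (not part of worker.c): one plausible way to keep an
 * advancement interval between the two bounds above. The doubling policy,
 * the reset-on-progress rule, and all SKETCH_/sketch_ names are assumptions
 * made for this example; the excerpt only states the minimum and maximum.
 */

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_MIN_XID_ADVANCE_INTERVAL 100     /* ms, as in the excerpt */
#define SKETCH_MAX_XID_ADVANCE_INTERVAL 180000  /* ms, as in the excerpt */

/*
 * Return the next wait interval in milliseconds.
 *
 * Assumed policy: reset to the minimum whenever a new candidate xid was
 * found (or no interval is set yet); otherwise double the interval, capped
 * at the maximum.
 */
static int
sketch_next_xid_advance_interval(int current_ms, bool new_xid_found)
{
    int next_ms;

    assert(current_ms >= 0);

    if (new_xid_found || current_ms == 0)
        return SKETCH_MIN_XID_ADVANCE_INTERVAL;

    /* Avoid overflow and overshoot by checking before doubling. */
    if (current_ms >= SKETCH_MAX_XID_ADVANCE_INTERVAL / 2)
        return SKETCH_MAX_XID_ADVANCE_INTERVAL;

    next_ms = current_ms * 2;
    if (next_ms < SKETCH_MIN_XID_ADVANCE_INTERVAL)
        next_ms = SKETCH_MIN_XID_ADVANCE_INTERVAL;

    return next_ms;
}
```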
458
459/* errcontext tracker */
461{
462 .command = 0,
463 .rel = NULL,
464 .remote_attnum = -1,
465 .remote_xid = InvalidTransactionId,
466 .finish_lsn = InvalidXLogRecPtr,
467 .origin_name = NULL,
468};
469
471
474
475/* per stream context for streaming transactions */
477
479
481static bool MySubscriptionValid = false;
482
484
487
488/* fields valid only when processing streamed transaction */
489static bool in_streamed_transaction = false;
490
492
493/*
494 * The number of changes applied by parallel apply worker during one streaming
495 * block.
496 */
498
499/* Are we initializing an apply worker? */
501
502/*
503 * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
504 * the subscription if the remote transaction's finish LSN matches the subskiplsn.
505 * Once we start skipping changes, we don't stop it until we skip all changes of
506 * the transaction even if pg_subscription is updated and MySubscription->skiplsn
507 * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
508 * we don't skip receiving and spooling the changes since we decide whether or not
509 * to skip applying the changes when starting to apply changes. The subskiplsn is
510 * cleared after successfully skipping the transaction or applying non-empty
511 * transaction. The latter prevents the mistakenly specified subskiplsn from
512 * being left. Note that we cannot skip the streaming transactions when using
513 * parallel apply workers because we cannot get the finish LSN before applying
514 * the changes. So, we don't start parallel apply worker when finish LSN is set
515 * by the user.
516 */
518#define is_skipping_changes() (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))
519
520/* BufFile handle of the current streaming file */
522
523/*
524 * The remote WAL position that has been applied and flushed locally. We record
525 * and use this information both while sending feedback to the server and
526 * advancing oldest_nonremovable_xid.
527 */
529
530typedef struct SubXactInfo
531{
532 TransactionId xid; /* XID of the subxact */
533 int fileno; /* file number in the buffile */
534 pgoff_t offset; /* offset in the file */
536
537/* Sub-transaction data for the current streaming transaction */
538typedef struct ApplySubXactData
539{
540 uint32 nsubxacts; /* number of sub-transactions */
541 uint32 nsubxacts_max; /* current capacity of subxacts */
542 TransactionId subxact_last; /* xid of the last sub-transaction */
543 SubXactInfo *subxacts; /* sub-xact offset in changes file */
545
547
548static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
549static inline void changes_filename(char *path, Oid subid, TransactionId xid);
550
551/*
552 * Information about subtransactions of a given toplevel transaction.
553 */
554static void subxact_info_write(Oid subid, TransactionId xid);
555static void subxact_info_read(Oid subid, TransactionId xid);
556static void subxact_info_add(TransactionId xid);
557static inline void cleanup_subxact_info(void);
558
559/*
560 * Serialize and deserialize changes for a toplevel transaction.
561 */
562static void stream_open_file(Oid subid, TransactionId xid,
563 bool first_segment);
564static void stream_write_change(char action, StringInfo s);
565static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
566static void stream_close_file(void);
567
568static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
569
571 bool status_received);
574 bool status_received);
578 bool status_received);
583static bool update_retention_status(bool active);
586 bool new_xid_found);
587
588static void apply_worker_exit(void);
589
598 Oid localindexoid);
602 Oid localindexoid);
604 LogicalRepRelation *remoterel,
608static bool FindDeletedTupleInLocalRel(Relation localrel,
617 CmdType operation);
618
619/* Functions for skipping changes */
620static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
621static void stop_skipping_changes(void);
622static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
623
624/* Functions for apply error callback */
625static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
626static inline void reset_apply_error_context_info(void);
627
630
631static void set_wal_receiver_timeout(void);
632
633static void on_exit_clear_xact_state(int code, Datum arg);
634
635/*
636 * Form the origin name for the subscription.
637 *
638 * This is a common function for tablesync and other workers. Tablesync workers
639 * must pass a valid relid. Other callers must pass relid = InvalidOid.
640 *
641 * Return the name in the supplied buffer.
642 */
643void
646{
647 if (OidIsValid(relid))
648 {
649 /* Replication origin name for tablesync workers. */
650 snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
651 }
652 else
653 {
654 /* Replication origin name for non-tablesync workers. */
656 }
657}
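
/*
 * Illustrative sketch (not part of worker.c): building an origin name outside
 * the server. The "pg_%u_%u" format for tablesync workers is taken from the
 * function above. The "pg_%u" format used for other workers is an assumption
 * here, because that line is not visible in this listing. SketchOid and
 * sketch_origin_name are hypothetical stand-ins for the server's types.
 */

```c
#include <assert.h>
#include <stdio.h>

typedef unsigned int SketchOid;             /* stand-in for Oid */
#define SKETCH_INVALID_OID ((SketchOid) 0)  /* stand-in for InvalidOid */

/*
 * Write the origin name into buf and return the number of characters that
 * the full name needs (the snprintf result), so callers can detect
 * truncation by comparing it with bufsz.
 */
static int
sketch_origin_name(SketchOid suboid, SketchOid relid, char *buf, size_t bufsz)
{
    assert(buf != NULL && bufsz > 0);

    if (relid != SKETCH_INVALID_OID)
        return snprintf(buf, bufsz, "pg_%u_%u", suboid, relid);

    /* Assumed format for non-tablesync workers. */
    return snprintf(buf, bufsz, "pg_%u", suboid);
}
```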
658
659/*
660 * Should this worker apply changes for given relation.
661 *
662 * This is mainly needed for initial relation data sync as that runs in
663 * separate worker process running in parallel and we need some way to skip
664 * changes coming to the leader apply worker during the sync of a table.
665 *
666 * Note that we need a less-than-or-equal comparison for the SYNCDONE state
667 * because it might hold the position of the end of the initial slot's
668 * consistent-point WAL record + 1 (i.e. the start of the next record), and the
669 * next record can be the COMMIT of the transaction we are now processing
670 * (which is what we set remote_final_lsn to in apply_handle_begin).
671 *
672 * Note that for streaming transactions that are being applied in the parallel
673 * apply worker, we disallow applying changes if the target table in the
674 * subscription is not in the READY state, because we cannot decide whether to
675 * apply the change as we won't know remote_final_lsn by that time.
676 *
677 * We already checked this in pa_can_start() before assigning the
678 * streaming transaction to the parallel worker, but it also needs to be
679 * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
680 * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
681 * while applying this transaction.
682 */
683static bool
685{
686 switch (MyLogicalRepWorker->type)
687 {
689 return MyLogicalRepWorker->relid == rel->localreloid;
690
692 /* We don't synchronize rel's that are in unknown state. */
693 if (rel->state != SUBREL_STATE_READY &&
697 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
699 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
700
701 return rel->state == SUBREL_STATE_READY;
702
703 case WORKERTYPE_APPLY:
704 return (rel->state == SUBREL_STATE_READY ||
705 (rel->state == SUBREL_STATE_SYNCDONE &&
706 rel->statelsn <= remote_final_lsn));
707
709 /* Should never happen. */
710 elog(ERROR, "sequence synchronization worker is not expected to apply changes");
711 break;
712
714 /* Should never happen. */
715 elog(ERROR, "Unknown worker type");
716 }
717
718 return false; /* dummy for compiler */
719}
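
/*
 * Illustrative sketch (not part of worker.c): the WORKERTYPE_APPLY branch of
 * the function above, isolated so the SYNCDONE boundary can be checked by
 * hand. SketchLsn, SketchRelState, and sketch_leader_should_apply are
 * hypothetical stand-ins for XLogRecPtr, the SUBREL_STATE_* values, and the
 * real function.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t SketchLsn;     /* stand-in for XLogRecPtr */

typedef enum SketchRelState
{
    SKETCH_STATE_INIT,          /* any state before SYNCDONE */
    SKETCH_STATE_SYNCDONE,
    SKETCH_STATE_READY
} SketchRelState;

/*
 * A leader apply worker applies a change when the table is READY, or when
 * it is SYNCDONE and the sync stopped at or before the commit LSN of the
 * transaction being applied.
 */
static bool
sketch_leader_should_apply(SketchRelState state,
                           SketchLsn statelsn,
                           SketchLsn remote_final_lsn)
{
    assert(state >= SKETCH_STATE_INIT && state <= SKETCH_STATE_READY);

    return state == SKETCH_STATE_READY ||
        (state == SKETCH_STATE_SYNCDONE && statelsn <= remote_final_lsn);
}
```

/*
 * With statelsn equal to remote_final_lsn the change is applied, which is the
 * case the less-than-or-equal comparison exists for.
 */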
720
721/*
722 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
723 *
724 * Start a transaction, if this is the first step (else we keep using the
725 * existing transaction).
726 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
727 */
728static void
743
744/*
745 * Finish up one step of a replication transaction.
746 * Callers of begin_replication_step() must also call this.
747 *
748 * We don't close out the transaction here, but we should increment
749 * the command counter to make the effects of this step visible.
750 */
751static void
758
759/*
760 * Handle streamed transactions for both the leader apply worker and the
761 * parallel apply workers.
762 *
763 * In the streaming case (receiving a block of the streamed transaction), for
764 * serialize mode, simply redirect it to a file for the proper toplevel
765 * transaction, and for parallel mode, the leader apply worker will send the
766 * changes to parallel apply workers and the parallel apply worker will define
767 * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
768 * messages will be applied by both leader apply worker and parallel apply
769 * workers).
770 *
771 * Returns true for streamed transactions (when the change is either serialized
772 * to file or sent to parallel apply worker), false otherwise (regular mode or
773 * needs to be processed by parallel apply worker).
774 *
775 * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
776 * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
777 * to a parallel apply worker.
778 */
779static bool
781{
786
788
789 /* not in streaming mode */
791 return false;
792
794
795 /*
796 * The parallel apply worker needs the xid in this message to decide
797 * whether to define a savepoint, so save the original message that has
798 * not moved the cursor after the xid. We will serialize this message to a
799 * file in PARTIAL_SERIALIZE mode.
800 */
801 original_msg = *s;
802
803 /*
804 * We should have received XID of the subxact as the first part of the
805 * message, so extract it.
806 */
808
812 errmsg_internal("invalid transaction ID in streamed replication transaction")));
813
814 switch (apply_action)
815 {
818
819 /* Add the new subxact to the array (unless already there). */
821
822 /* Write the change to the current file */
823 stream_write_change(action, s);
824 return true;
825
827 Assert(winfo);
828
829 /*
830 * XXX The publisher side doesn't always send relation/type update
831 * messages after the streaming transaction, so also update the
832 * relation/type in leader apply worker. See function
833 * cleanup_rel_sync_cache.
834 */
835 if (pa_send_data(winfo, s->len, s->data))
836 return (action != LOGICAL_REP_MSG_RELATION &&
837 action != LOGICAL_REP_MSG_TYPE);
838
839 /*
840 * Switch to serialize mode when we are not able to send the
841 * change to parallel apply worker.
842 */
843 pa_switch_to_partial_serialize(winfo, false);
844
848
849 /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
850 return (action != LOGICAL_REP_MSG_RELATION &&
851 action != LOGICAL_REP_MSG_TYPE);
852
855
856 /* Define a savepoint for a subxact if needed. */
858 return false;
859
860 default:
861 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
862 return false; /* silence compiler warning */
863 }
864}
865
866/*
867 * Executor state preparation for evaluation of constraint expressions,
868 * indexes and triggers for the specified relation.
869 *
870 * Note that the caller must open and close any indexes to be updated.
871 */
872static ApplyExecutionData *
874{
876 EState *estate;
878 List *perminfos = NIL;
879 ResultRelInfo *resultRelInfo;
880
882 edata->targetRel = rel;
883
884 edata->estate = estate = CreateExecutorState();
885
887 rte->rtekind = RTE_RELATION;
888 rte->relid = RelationGetRelid(rel->localrel);
889 rte->relkind = rel->localrel->rd_rel->relkind;
890 rte->rellockmode = AccessShareLock;
891
893
896
897 edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
898
899 /*
900 * Use Relation opened by logicalrep_rel_open() instead of opening it
901 * again.
902 */
903 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
904
905 /*
906 * We put the ResultRelInfo in the es_opened_result_relations list, even
907 * though we don't populate the es_result_relations array. That's a bit
908 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
909 *
910 * ExecOpenIndices() is not called here either, each execution path doing
911 * an apply operation being responsible for that.
912 */
914 lappend(estate->es_opened_result_relations, resultRelInfo);
915
916 estate->es_output_cid = GetCurrentCommandId(true);
917
918 /* Prepare to catch AFTER triggers. */
920
921 /* other fields of edata remain NULL for now */
922
923 return edata;
924}
925
926/*
927 * Finish any operations related to the executor state created by
928 * create_edata_for_relation().
929 */
930static void
932{
933 EState *estate = edata->estate;
934
935 /* Handle any queued AFTER triggers. */
936 AfterTriggerEndQuery(estate);
937
938 /* Shut down tuple routing, if any was done. */
939 if (edata->proute)
940 ExecCleanupTupleRouting(edata->mtstate, edata->proute);
941
942 /*
943 * Cleanup. It might seem that we should call ExecCloseResultRelations()
944 * here, but we intentionally don't. It would close the rel we added to
945 * es_opened_result_relations above, which is wrong because we took no
946 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
947 * any other relations opened during execution.
948 */
949 ExecResetTupleTable(estate->es_tupleTable, false);
950 FreeExecutorState(estate);
951 pfree(edata);
952}
953
954/*
955 * Evaluate default values for columns that we can't map to remote
956 * relation columns.
957 *
958 * This allows us to support tables which have more columns on the downstream
959 * than on the upstream.
960 */
961static void
963 TupleTableSlot *slot)
964{
966 int num_phys_attrs = desc->natts;
967 int i;
968 int attnum,
969 num_defaults = 0;
970 int *defmap;
971 ExprState **defexprs;
972 ExprContext *econtext;
973
974 econtext = GetPerTupleExprContext(estate);
975
976 /* We got all the data via replication, no need to evaluate anything. */
977 if (num_phys_attrs == rel->remoterel.natts)
978 return;
979
980 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
981 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
982
984 for (attnum = 0; attnum < num_phys_attrs; attnum++)
985 {
987 Expr *defexpr;
988
989 if (cattr->attisdropped || cattr->attgenerated)
990 continue;
991
992 if (rel->attrmap->attnums[attnum] >= 0)
993 continue;
994
995 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
996
997 if (defexpr != NULL)
998 {
999 /* Run the expression through planner */
1000 defexpr = expression_planner(defexpr);
1001
1002 /* Initialize executable expression in copycontext */
1003 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1004 defmap[num_defaults] = attnum;
1005 num_defaults++;
1006 }
1007 }
1008
1009 for (i = 0; i < num_defaults; i++)
1010 slot->tts_values[defmap[i]] =
1011 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1012}
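
/*
 * Illustration only (not part of worker.c): a minimal sketch of the two-pass
 * pattern described above, assuming plain C arrays in place of the tuple
 * slot. DemoColumn and demo_fill_defaults are hypothetical names, and
 * default_value stands in for an evaluated default expression.
 */

```c
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical description of one local column (not a PostgreSQL type). */
typedef struct DemoColumn
{
	bool		dropped;		/* column was dropped locally */
	bool		has_default;	/* column has a default expression */
	long		default_value;	/* stand-in for the evaluated default */
} DemoColumn;

/*
 * Fill in defaults for local columns that have no remote counterpart
 * (attrmap[i] < 0). Returns the number of defaults applied, or -1 if the
 * scratch array cannot be allocated.
 */
static int
demo_fill_defaults(const DemoColumn *cols, const int *attrmap, int natts,
				   long *values, bool *isnull)
{
	int		   *defmap;
	int			num_defaults = 0;

	if (natts <= 0)
		return 0;

	defmap = malloc((size_t) natts * sizeof(int));
	if (defmap == NULL)
		return -1;

	/* Pass 1: remember which columns need a default. */
	for (int attnum = 0; attnum < natts; attnum++)
	{
		if (cols[attnum].dropped)
			continue;
		if (attrmap[attnum] >= 0)
			continue;			/* value arrives via replication */
		if (cols[attnum].has_default)
			defmap[num_defaults++] = attnum;
	}

	/* Pass 2: store the default into each remembered column. */
	for (int i = 0; i < num_defaults; i++)
	{
		values[defmap[i]] = cols[defmap[i]].default_value;
		isnull[defmap[i]] = false;
	}

	free(defmap);
	return num_defaults;
}
```

/*
 * Columns that are mapped, dropped, or have no default are left untouched,
 * so the caller's earlier NULL assignment for them stays in effect.
 */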
1013
1014/*
1015 * Store tuple data into slot.
1016 *
1017 * Incoming data can be either text or binary format.
1018 */
1019static void
1022{
1023 int natts = slot->tts_tupleDescriptor->natts;
1024 int i;
1025
1026 ExecClearTuple(slot);
1027
1028 /* Call the "in" function for each non-dropped, non-null attribute */
1029 Assert(natts == rel->attrmap->maplen);
1030 for (i = 0; i < natts; i++)
1031 {
1033 int remoteattnum = rel->attrmap->attnums[i];
1034
1035 if (!att->attisdropped && remoteattnum >= 0)
1036 {
1038
1040
1041 /* Set attnum for error callback */
1043
1045 {
1046 Oid typinput;
1047 Oid typioparam;
1048
1049 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1050 slot->tts_values[i] =
1052 typioparam, att->atttypmod);
1053 slot->tts_isnull[i] = false;
1054 }
1055 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1056 {
1057 Oid typreceive;
1058 Oid typioparam;
1059
1060 /*
1061 * In some code paths we may be asked to re-parse the same
1062 * tuple data. Reset the StringInfo's cursor so that works.
1063 */
1064 colvalue->cursor = 0;
1065
1066 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1067 slot->tts_values[i] =
1068 OidReceiveFunctionCall(typreceive, colvalue,
1069 typioparam, att->atttypmod);
1070
1071 /* Trouble if it didn't eat the whole buffer */
1072 if (colvalue->cursor != colvalue->len)
1073 ereport(ERROR,
1075 errmsg("incorrect binary data format in logical replication column %d",
1076 remoteattnum + 1)));
1077 slot->tts_isnull[i] = false;
1078 }
1079 else
1080 {
1081 /*
1082 * NULL value from remote. (We don't expect to see
1083 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1084 * NULL.)
1085 */
1086 slot->tts_values[i] = (Datum) 0;
1087 slot->tts_isnull[i] = true;
1088 }
1089
1090 /* Reset attnum for error callback */
1092 }
1093 else
1094 {
1095 /*
1096 * We assign NULL to dropped attributes and missing values
1097 * (missing values should be later filled using
1098 * slot_fill_defaults).
1099 */
1100 slot->tts_values[i] = (Datum) 0;
1101 slot->tts_isnull[i] = true;
1102 }
1103 }
1104
1106}
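
/*
 * Illustration only (not part of worker.c): a simplified sketch of how a
 * local-to-remote attribute map drives the per-column conversion. It assumes
 * every column is an integer sent as text; DemoColStatus, DemoRemoteTuple and
 * demo_store_data are hypothetical names, and strtol() stands in for the
 * type input function.
 */

```c
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical per-column status of the incoming tuple. */
typedef enum DemoColStatus
{
	DEMO_COL_NULL,
	DEMO_COL_TEXT
} DemoColStatus;

/* Hypothetical remote tuple: one text value and one status per column. */
typedef struct DemoRemoteTuple
{
	const char *const *colvalues;
	const DemoColStatus *colstatus;
} DemoRemoteTuple;

/*
 * For each local column i, attrmap[i] is the remote column index, or a
 * negative number when the remote side does not send that column.
 */
static void
demo_store_data(const int *attrmap, const bool *dropped, int natts,
				const DemoRemoteTuple *remote, long *values, bool *isnull)
{
	for (int i = 0; i < natts; i++)
	{
		int			remoteattnum = attrmap[i];

		if (!dropped[i] && remoteattnum >= 0 &&
			remote->colstatus[remoteattnum] == DEMO_COL_TEXT)
		{
			/* Convert the text representation into the local value. */
			values[i] = strtol(remote->colvalues[remoteattnum], NULL, 10);
			isnull[i] = false;
		}
		else
		{
			/* Remote NULL, dropped column, or column missing remotely. */
			values[i] = 0;
			isnull[i] = true;
		}
	}
}
```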
1107
1108/*
1109 * Replace updated columns with data from the LogicalRepTupleData struct.
1110 * This is somewhat similar to heap_modify_tuple but also calls the type
1111 * input functions on the user data.
1112 *
1113 * "slot" is filled with a copy of the tuple in "srcslot", replacing
1114 * columns provided in "tupleData" and leaving others as-is.
1115 *
1116 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
1117 * storage for "srcslot". This is OK for current usage, but someday we may
1118 * need to materialize "slot" at the end to make it independent of "srcslot".
1119 */
1120static void
1124{
1125 int natts = slot->tts_tupleDescriptor->natts;
1126 int i;
1127
1128 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1129 ExecClearTuple(slot);
1130
1131 /*
1132 * Copy all the column data from srcslot, so that we'll have valid values
1133 * for unreplaced columns.
1134 */
1135 Assert(natts == srcslot->tts_tupleDescriptor->natts);
1137 memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1138 memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1139
1140 /* Call the "in" function for each replaced attribute */
1141 Assert(natts == rel->attrmap->maplen);
1142 for (i = 0; i < natts; i++)
1143 {
1145 int remoteattnum = rel->attrmap->attnums[i];
1146
1147 if (remoteattnum < 0)
1148 continue;
1149
1151
1153 {
1155
1156 /* Set attnum for error callback */
1158
1160 {
1161 Oid typinput;
1162 Oid typioparam;
1163
1164 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1165 slot->tts_values[i] =
1167 typioparam, att->atttypmod);
1168 slot->tts_isnull[i] = false;
1169 }
1170 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1171 {
1172 Oid typreceive;
1173 Oid typioparam;
1174
1175 /*
1176 * In some code paths we may be asked to re-parse the same
1177 * tuple data. Reset the StringInfo's cursor so that works.
1178 */
1179 colvalue->cursor = 0;
1180
1181 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1182 slot->tts_values[i] =
1183 OidReceiveFunctionCall(typreceive, colvalue,
1184 typioparam, att->atttypmod);
1185
1186 /* Trouble if it didn't eat the whole buffer */
1187 if (colvalue->cursor != colvalue->len)
1188 ereport(ERROR,
1190 errmsg("incorrect binary data format in logical replication column %d",
1191 remoteattnum + 1)));
1192 slot->tts_isnull[i] = false;
1193 }
1194 else
1195 {
1196 /* must be LOGICALREP_COLUMN_NULL */
1197 slot->tts_values[i] = (Datum) 0;
1198 slot->tts_isnull[i] = true;
1199 }
1200
1201 /* Reset attnum for error callback */
1203 }
1204 }
1205
1206 /* And finally, declare that "slot" contains a valid virtual tuple */
1208}
1209
1210/*
1211 * Handle BEGIN message.
1212 */
1213static void
1215{
1217
1218 /* There must not be an active streaming transaction. */
1220
1223
1224 remote_final_lsn = begin_data.final_lsn;
1225
1227
1228 in_remote_transaction = true;
1229
1231}
1232
1233/*
1234 * Handle COMMIT message.
1235 *
1236 * TODO, support tracking of multiple origins
1237 */
1238static void
1240{
1242
1244
1245 if (commit_data.commit_lsn != remote_final_lsn)
1246 ereport(ERROR,
1248 errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1249 LSN_FORMAT_ARGS(commit_data.commit_lsn),
1251
1253
1254 /*
1255 * Process any tables that are being synchronized in parallel, as well as
1256 * any newly added tables or sequences.
1257 */
1259
1262}
1263
1264/*
1265 * Handle BEGIN PREPARE message.
1266 */
1267static void
1269{
1271
1272 /* Tablesync should never receive prepare. */
1273 if (am_tablesync_worker())
1274 ereport(ERROR,
1276 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1277
1278 /* There must not be an active streaming transaction. */
1280
1283
1284 remote_final_lsn = begin_data.prepare_lsn;
1285
1287
1288 in_remote_transaction = true;
1289
1291}
1292
1293/*
1294 * Common function to prepare the GID.
1295 */
1296static void
1298{
1299 char gid[GIDSIZE];
1300
1301 /*
1302 * Compute a unique GID for two_phase transactions. We don't use the GID
1303 * of the prepared transaction sent by the server, as that can lead to
1304 * deadlock when multiple subscriptions from the same node point to
1305 * publications on the same node. See comments atop worker.c.
1306 */
1308 gid, sizeof(gid));
1309
1310 /*
1311 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1312 * called within the PrepareTransactionBlock below.
1313 */
1314 if (!IsTransactionBlock())
1315 {
1317 CommitTransactionCommand(); /* Completes the preceding Begin command. */
1318 }
1319
1320 /*
1321 * Update origin state so we can restart streaming from correct position
1322 * in case of crash.
1323 */
1326
1328}
1329
1330/*
1331 * Handle PREPARE message.
1332 */
1333static void
1335{
1337
1339
1340 if (prepare_data.prepare_lsn != remote_final_lsn)
1341 ereport(ERROR,
1343 errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1344 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1346
1347 /*
1348 * Unlike commit, here, we always prepare the transaction even though no
1349 * change has happened in this transaction or all changes are skipped. It
1350 * is done this way because at commit prepared time, we won't know whether
1351 * we have skipped preparing a transaction because of those reasons.
1352 *
1353 * XXX, We can optimize such that at commit prepared time, we first check
1354 * whether we have prepared the transaction or not but that doesn't seem
1355 * worthwhile because such cases shouldn't be common.
1356 */
1358
1360
1363 pgstat_report_stat(false);
1364
1365 /*
1366 * It is okay not to set the local_end LSN for the prepare because we
1367 * always flush the prepare record. So, we can send the acknowledgment of
1368 * the remote_end LSN as soon as prepare is finished.
1369 *
1370 * XXX For the sake of consistency with commit, we could have set it with
1371 * the LSN of prepare but as of now we don't track that value similar to
1372 * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
1373 * it.
1374 */
1376
1377 in_remote_transaction = false;
1378
1379 /*
1380 * Process any tables that are being synchronized in parallel, as well as
1381 * any newly added tables or sequences.
1382 */
1384
1385 /*
1386 * Since we have already prepared the transaction, in a case where the
1387 * server crashes before clearing the subskiplsn, it will be left but the
1388 * transaction won't be resent. But that's okay because it's a rare case
1389 * and the subskiplsn will be cleared when finishing the next transaction.
1390 */
1393
1396}
1397
1398/*
1399 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1400 *
1401 * Note that we don't need to wait here if the transaction was prepared in a
1402 * parallel apply worker. In that case, we have already waited for the prepare
1403 * to finish in apply_handle_stream_prepare() which will ensure all the
1404 * operations in that transaction have happened in the subscriber, so no
1405 * concurrent transaction can cause deadlock or transaction dependency issues.
1406 */
1407static void
1409{
1411 char gid[GIDSIZE];
1412
1415
1416 /* Compute GID for two_phase transactions. */
1418 gid, sizeof(gid));
1419
1420 /* There is no transaction when COMMIT PREPARED is called */
1422
1423 /*
1424 * Update origin state so we can restart streaming from correct position
1425 * in case of crash.
1426 */
1429
1430 FinishPreparedTransaction(gid, true);
1433 pgstat_report_stat(false);
1434
1436 in_remote_transaction = false;
1437
1438 /*
1439 * Process any tables that are being synchronized in parallel, as well as
1440 * any newly added tables or sequences.
1441 */
1443
1445
1448}
1449
1450/*
1451 * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1452 *
1453 * Note that we don't need to wait here if the transaction was prepared in a
1454 * parallel apply worker. In that case, we have already waited for the prepare
1455 * to finish in apply_handle_stream_prepare() which will ensure all the
1456 * operations in that transaction have happened in the subscriber, so no
1457 * concurrent transaction can cause deadlock or transaction dependency issues.
1458 */
1459static void
1461{
1463 char gid[GIDSIZE];
1464
1467
1468 /* Compute GID for two_phase transactions. */
1470 gid, sizeof(gid));
1471
1472 /*
1473 * It is possible that we haven't received prepare because it occurred
1474 * before walsender reached a consistent point or the two_phase was still
1475 * not enabled by that time, so in such cases, we need to skip rollback
1476 * prepared.
1477 */
1478 if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1479 rollback_data.prepare_time))
1480 {
1481 /*
1482 * Update origin state so we can restart streaming from correct
1483 * position in case of crash.
1484 */
1487
1488 /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1490 FinishPreparedTransaction(gid, false);
1493
1495 }
1496
1497 pgstat_report_stat(false);
1498
1499 /*
1500 * It is okay not to set the local_end LSN for the rollback of prepared
1501 * transaction because we always flush the WAL record for it. See
1502 * apply_handle_prepare.
1503 */
1505 in_remote_transaction = false;
1506
1507 /*
1508 * Process any tables that are being synchronized in parallel, as well as
1509 * any newly added tables or sequences.
1510 */
1511 ProcessSyncingRelations(rollback_data.rollback_end_lsn);
1512
1515}
1516
1517/*
1518 * Handle STREAM PREPARE.
1519 */
1520static void
1522{
1526
1527 /* Save the message before it is consumed. */
1529
1531 ereport(ERROR,
1533 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1534
1535 /* Tablesync should never receive prepare. */
1536 if (am_tablesync_worker())
1537 ereport(ERROR,
1539 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1540
1543
1545
1546 switch (apply_action)
1547 {
1548 case TRANS_LEADER_APPLY:
1549
1550 /*
1551 * The transaction has been serialized to file, so replay all the
1552 * spooled operations.
1553 */
1555 prepare_data.xid, prepare_data.prepare_lsn);
1556
1557 /* Mark the transaction as prepared. */
1559
1561
1562 /*
1563 * It is okay not to set the local_end LSN for the prepare because
1564 * we always flush the prepare record. See apply_handle_prepare.
1565 */
1567
1568 in_remote_transaction = false;
1569
1570 /* Unlink the files with serialized changes and subxact info. */
1572
1573 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1574 break;
1575
1577 Assert(winfo);
1578
1579 if (pa_send_data(winfo, s->len, s->data))
1580 {
1581 /* Finish processing the streaming transaction. */
1582 pa_xact_finish(winfo, prepare_data.end_lsn);
1583 break;
1584 }
1585
1586 /*
1587 * Switch to serialize mode when we are not able to send the
1588 * change to parallel apply worker.
1589 */
1590 pa_switch_to_partial_serialize(winfo, true);
1591
1594 Assert(winfo);
1595
1598 &original_msg);
1599
1601
1602 /* Finish processing the streaming transaction. */
1603 pa_xact_finish(winfo, prepare_data.end_lsn);
1604 break;
1605
1607
1608 /*
1609 * If the parallel apply worker is applying spooled messages then
1610 * close the file before preparing.
1611 */
1612 if (stream_fd)
1614
1616
1617 /* Mark the transaction as prepared. */
1619
1621
1623
1624 /*
1625 * It is okay not to set the local_end LSN for the prepare because
1626 * we always flush the prepare record. See apply_handle_prepare.
1627 */
1629
1632
1634
1635 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1636 break;
1637
1638 default:
1639 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1640 break;
1641 }
1642
1643 pgstat_report_stat(false);
1644
1645 /*
1646 * Process any tables that are being synchronized in parallel, as well as
1647 * any newly added tables or sequences.
1648 */
1650
1651 /*
1652 * Similar to prepare case, the subskiplsn could be left in a case of
1653 * server crash but it's okay. See the comments in apply_handle_prepare().
1654 */
1657
1659
1661}
1662
1663/*
1664 * Handle ORIGIN message.
1665 *
1666 * TODO, support tracking of multiple origins
1667 */
1668static void
1670{
1671 /*
1672 * ORIGIN message can only come inside streaming transaction or inside
1673 * remote transaction and before any actual writes.
1674 */
1678 ereport(ERROR,
1680 errmsg_internal("ORIGIN message sent out of order")));
1681}
1682
1683/*
1684 * Initialize fileset (if not already done).
1685 *
1686 * Create a new file when first_segment is true, otherwise open the existing
1687 * file.
1688 */
1689void
1691{
1693
1694 /*
1695 * Initialize the worker's stream_fileset if we haven't yet. This will be
1696 * used for the entire duration of the worker so create it in a permanent
1697 * context. We create this on the very first streaming message from any
1698 * transaction and then use it for this and other streaming transactions.
1699 * Now, we could create a fileset at the start of the worker as well but
1700 * then we won't be sure that it will ever be used.
1701 */
1703 {
1705
1707
1710
1712 }
1713
1714 /* Open the spool file for this transaction. */
1716
1717 /* If this is not the first segment, open existing subxact file. */
1718 if (!first_segment)
1720
1722}
1723
1724/*
1725 * Handle STREAM START message.
1726 */
1727static void
1729{
1730 bool first_segment;
1733
1734 /* Save the message before it is consumed. */
1736
1738 ereport(ERROR,
1740 errmsg_internal("duplicate STREAM START message")));
1741
1742 /* There must not be an active streaming transaction. */
1744
1745 /* notify handle methods we're processing a remote transaction */
1747
1748 /* extract XID of the top-level transaction */
1750
1752 ereport(ERROR,
1754 errmsg_internal("invalid transaction ID in streamed replication transaction")));
1755
1757
1758 /* Try to allocate a worker for the streaming transaction. */
1759 if (first_segment)
1761
1763
1764 switch (apply_action)
1765 {
1767
1768 /*
1769 * Function stream_start_internal starts a transaction. This
1770 * transaction will be committed on the stream stop unless it is a
1771 * tablesync worker in which case it will be committed after
1772 * processing all the messages. We need this transaction for
1773 * handling the BufFile, used for serializing the streaming data
1774 * and subxact info.
1775 */
1777 break;
1778
1780 Assert(winfo);
1781
1782 /*
1783 * Once we start serializing the changes, the parallel apply
1784 * worker will wait for the leader to release the stream lock
1785 * until the end of the transaction. So, we don't need to release
1786 * the lock or increment the stream count in that case.
1787 */
1788 if (pa_send_data(winfo, s->len, s->data))
1789 {
1790 /*
1791 * Unlock the shared object lock so that the parallel apply
1792 * worker can continue to receive changes.
1793 */
1794 if (!first_segment)
1796
1797 /*
1798 * Increment the number of streaming blocks waiting to be
1799 * processed by parallel apply worker.
1800 */
1802
1803 /* Cache the parallel apply worker for this transaction. */
1805 break;
1806 }
1807
1808 /*
1809 * Switch to serialize mode when we are not able to send the
1810 * change to parallel apply worker.
1811 */
1813
1816 Assert(winfo);
1817
1818 /*
1819 * Open the spool file unless it was already opened when switching
1820 * to serialize mode. The transaction started in
1821 * stream_start_internal will be committed on the stream stop.
1822 */
1825
1827
1828 /* Cache the parallel apply worker for this transaction. */
1830 break;
1831
1833 if (first_segment)
1834 {
1835 /* Hold the lock until the end of the transaction. */
1838
1839 /*
1840 * Signal the leader apply worker, as it may be waiting for
1841 * us.
1842 */
1845 }
1846
1848 break;
1849
1850 default:
1851 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1852 break;
1853 }
1854
1856}
1857
1858/*
1859 * Update the information about subxacts and close the file.
1860 *
1861 * This function should be called when the stream_start_internal function has
1862 * been called.
1863 */
1864void
1866{
1867 /*
1868 * Serialize information about subxacts for the toplevel transaction, then
1869 * close the stream messages spool file.
1870 */
1873
1874 /* We must be in a valid transaction state */
1876
1877 /* Commit the per-stream transaction */
1879
1880 /* Reset per-stream context */
1882}
1883
1884/*
1885 * Handle STREAM STOP message.
1886 */
1887static void
1889{
1892
1894 ereport(ERROR,
1896 errmsg_internal("STREAM STOP message without STREAM START")));
1897
1899
1900 switch (apply_action)
1901 {
1904 break;
1905
1907 Assert(winfo);
1908
1909 /*
1910 * Lock before sending the STREAM_STOP message so that the leader
1911 * can hold the lock first and the parallel apply worker will wait
1912 * for leader to release the lock. See Locking Considerations atop
1913 * applyparallelworker.c.
1914 */
1916
1917 if (pa_send_data(winfo, s->len, s->data))
1918 {
1920 break;
1921 }
1922
1923 /*
1924 * Switch to serialize mode when we are not able to send the
1925 * change to parallel apply worker.
1926 */
1927 pa_switch_to_partial_serialize(winfo, true);
1928
1934 break;
1935
1937 elog(DEBUG1, "applied %u changes in the streaming chunk",
1939
1940 /*
1941 * By the time parallel apply worker is processing the changes in
1942 * the current streaming block, the leader apply worker may have
1943 * sent multiple streaming blocks. This can lead to the parallel apply
1944 * worker waiting even when there are more streaming chunks in the
1945 * queue. So, try to lock only if there is no message left
1946 * in the queue. See Locking Considerations atop
1947 * applyparallelworker.c.
1948 *
1949 * Note that here we have a race condition where we can start
1950 * waiting even when there are pending streaming chunks. This can
1951 * happen if the leader sends another streaming block and acquires
1952 * the stream lock again after the parallel apply worker checks
1953 * that there is no pending streaming block and before it actually
1954 * starts waiting on a lock. We can handle this case by not
1955 * allowing the leader to increment the stream block count during
1956 * the time parallel apply worker acquires the lock but it is not
1957 * clear whether that is worth the complexity.
1958 *
1959 * Now, if this missed chunk contains rollback to savepoint, then
1960 * there is a risk of deadlock which probably shouldn't happen
1961 * after restart.
1962 */
1964 break;
1965
1966 default:
1967 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1968 break;
1969 }
1970
1973
1974 /*
1975 * The parallel apply worker could be in a transaction in which case we
1976 * need to report the state as STATE_IDLEINTRANSACTION.
1977 */
1980 else
1982
1984}
1985
1986/*
1987 * Helper function to handle STREAM ABORT message when the transaction was
1988 * serialized to file.
1989 */
1990static void
1992{
1993 /*
1994 * If the two XIDs are the same, it's in fact an abort of the toplevel
1995 * xact, so just delete the files with serialized info.
1996 */
1997 if (xid == subxid)
1999 else
2000 {
2001 /*
2002 * OK, so it's a subxact. We need to read the subxact file for the
2003 * toplevel transaction, determine the offset tracked for the subxact,
2004 * and truncate the file with changes. We also remove the subxacts
2005 * with higher offsets (or rather higher XIDs).
2006 *
2007 * We intentionally scan the array from the tail, because we're likely
2008 * aborting one of the most recent subtransactions.
2009 *
2010 * We can't use binary search here, as subxact XIDs won't necessarily
2011 * arrive in sorted order. Consider the case where we have released
2012 * the savepoints for multiple subtransactions and then performed a
2013 * rollback to savepoint for one of the earlier
2014 * subtransactions.
2015 */
2016 int64 i;
2017 int64 subidx;
2018 BufFile *fd;
2019 bool found = false;
2020 char path[MAXPGPATH];
2021
2022 subidx = -1;
2025
2026 for (i = subxact_data.nsubxacts; i > 0; i--)
2027 {
2028 if (subxact_data.subxacts[i - 1].xid == subxid)
2029 {
2030 subidx = (i - 1);
2031 found = true;
2032 break;
2033 }
2034 }
2035
2036 /*
2037 * If it's an empty subtransaction then we will not find the subxid
2038 * here, so just clean up the subxact info and return.
2039 */
2040 if (!found)
2041 {
2042 /* Cleanup the subxact info */
2046 return;
2047 }
2048
2049 /* open the changes file */
2052 O_RDWR, false);
2053
2054 /* OK, truncate the file at the right offset */
2058
2059 /* discard the subxacts added later */
2061
2062 /* write the updated subxact list */
2064
2067 }
2068}
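
/*
 * Illustration only (not part of worker.c): a minimal sketch of the tail
 * scan and truncation described above, assuming an in-memory array of
 * (xid, offset) pairs. DemoSubXact, demo_find_subxact and demo_abort_subxact
 * are hypothetical names; the real code truncates a BufFile and rewrites the
 * subxact file, neither of which is modeled here.
 */

```c
#include <stdint.h>

/* Hypothetical record: where a subtransaction's changes start in the spool. */
typedef struct DemoSubXact
{
	uint32_t	xid;
	long		offset;
} DemoSubXact;

/* Scan from the tail; return the index of subxid, or -1 if it is absent. */
static int
demo_find_subxact(const DemoSubXact *subxacts, int nsubxacts, uint32_t subxid)
{
	for (int i = nsubxacts; i > 0; i--)
	{
		if (subxacts[i - 1].xid == subxid)
			return i - 1;
	}
	return -1;
}

/*
 * Abort subxid: report the offset at which the changes file should be
 * truncated and return the new number of tracked subxacts. The aborted
 * entry and everything recorded after it are discarded. If subxid is not
 * found (an empty subtransaction), nothing changes.
 */
static int
demo_abort_subxact(const DemoSubXact *subxacts, int nsubxacts,
				   uint32_t subxid, long *truncate_to)
{
	int			subidx = demo_find_subxact(subxacts, nsubxacts, subxid);

	if (subidx < 0)
		return nsubxacts;

	*truncate_to = subxacts[subidx].offset;
	return subidx;
}
```

/*
 * A linear scan is used because the XIDs are not necessarily sorted, as in
 * an array holding 100, 103, 101, 102.
 */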
2069
2070/*
2071 * Handle STREAM ABORT message.
2072 */
2073static void
2075{
2076 TransactionId xid;
2077 TransactionId subxid;
2081
2082 /* Save the message before it is consumed. */
2084 bool toplevel_xact;
2085
2087 ereport(ERROR,
2089 errmsg_internal("STREAM ABORT message without STREAM STOP")));
2090
2091 /* We receive abort information only when we can apply in parallel. */
2094
2095 xid = abort_data.xid;
2096 subxid = abort_data.subxid;
2097 toplevel_xact = (xid == subxid);
2098
2099 set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2100
2102
2103 switch (apply_action)
2104 {
2105 case TRANS_LEADER_APPLY:
2106
2107 /*
2108 * We are in the leader apply worker and the transaction has been
2109 * serialized to file.
2110 */
2111 stream_abort_internal(xid, subxid);
2112
2113 elog(DEBUG1, "finished processing the STREAM ABORT command");
2114 break;
2115
2117 Assert(winfo);
2118
2119 /*
2120 * For the case of aborting the subtransaction, we increment the
2121 * number of streaming blocks and take the lock again before
2122 * sending the STREAM_ABORT to ensure that the parallel apply
2123 * worker will wait on the lock for the next set of changes after
2124 * processing the STREAM_ABORT message if it is not already
2125 * waiting for STREAM_STOP message.
2126 *
2127 * It is important to perform this locking before sending the
2128 * STREAM_ABORT message so that the leader can hold the lock first
2129 * and the parallel apply worker will wait for the leader to
2130 * release the lock. This is the same as what we do in
2131 * apply_handle_stream_stop. See Locking Considerations atop
2132 * applyparallelworker.c.
2133 */
2134 if (!toplevel_xact)
2135 {
2139 }
2140
2141 if (pa_send_data(winfo, s->len, s->data))
2142 {
2143 /*
2144 * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2145 * wait here for the parallel apply worker to finish as that
2146 * is not required to maintain the commit order and won't have
2147 * the risk of failures due to transaction dependencies and
2148 * deadlocks. However, it is possible that before the parallel
2149 * worker finishes and we clear the worker info, the xid
2150 * wraparound happens on the upstream and a new transaction
2151 * with the same xid can appear and that can lead to duplicate
2152 * entries in ParallelApplyTxnHash. Yet another problem could
2153 * be that we may have serialized the changes in partial
2154 * serialize mode and the file containing xact changes may
2155 * already exist, and after xid wraparound trying to create
2156 * the file for the same xid can lead to an error. To avoid
2157 * these problems, we decide to wait for the aborts to finish.
2158 *
2159 * Note, it is okay to not update the flush location position
2160 * for aborts as in worst case that means such a transaction
2161 * won't be sent again after restart.
2162 */
2163 if (toplevel_xact)
2165
2166 break;
2167 }
2168
2169 /*
2170 * Switch to serialize mode when we are not able to send the
2171 * change to parallel apply worker.
2172 */
2173 pa_switch_to_partial_serialize(winfo, true);
2174
2177 Assert(winfo);
2178
2179 /*
2180 * Parallel apply worker might have applied some changes, so write
2181 * the STREAM_ABORT message so that it can rollback the
2182 * subtransaction if needed.
2183 */
2185 &original_msg);
2186
2187 if (toplevel_xact)
2188 {
2191 }
2192 break;
2193
2195
2196 /*
2197 * If the parallel apply worker is applying spooled messages then
2198 * close the file before aborting.
2199 */
2200 if (toplevel_xact && stream_fd)
2202
2204
2205 /*
2206 * We need to wait after processing rollback to savepoint for the
2207 * next set of changes.
2208 *
2209 * We have a race condition here, due to which we can start waiting
2210 * here even when there are more streaming chunks in the queue. See
2211 * apply_handle_stream_stop.
2212 */
2213 if (!toplevel_xact)
2215
2216 elog(DEBUG1, "finished processing the STREAM ABORT command");
2217 break;
2218
2219 default:
2220 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2221 break;
2222 }
2223
2225}
2226
2227/*
2228 * Ensure that the passed location is the fileset's end.
2229 */
2230static void
2231ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2232 pgoff_t offset)
2233{
2234 char path[MAXPGPATH];
2235 BufFile *fd;
2236 int last_fileno;
2238
2240
2242
2244
2245 fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2246
2247 BufFileSeek(fd, 0, 0, SEEK_END);
2249
2251
2253
2254 if (last_fileno != fileno || last_offset != offset)
2255 elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2256 path);
2257}
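
/*
 * Illustration only (not part of worker.c): a minimal sketch of the "is this
 * position the end of the file?" check, assuming a single stdio stream
 * instead of a segmented BufFile (so there is an offset but no fileno).
 * demo_is_last_message is a hypothetical name.
 */

```c
#include <stdbool.h>
#include <stdio.h>

/*
 * Return true when "offset" (the position just after the last record read)
 * is exactly the end of the file, meaning that no further record follows.
 * The stream position is left at the end of the file.
 */
static bool
demo_is_last_message(FILE *fp, long offset)
{
	if (fseek(fp, 0L, SEEK_END) != 0)
		return false;

	return ftell(fp) == offset;
}
```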
2258
2259/*
2260 * Common spoolfile processing.
2261 */
2262void
2264 XLogRecPtr lsn)
2265{
2266 int nchanges;
2267 char path[MAXPGPATH];
2268 char *buffer = NULL;
2270 ResourceOwner oldowner;
2271 int fileno;
2272 pgoff_t offset;
2273
2276
2277 /* Make sure we have an open transaction */
2279
2280 /*
2281 * Allocate file handle and memory required to process all the messages in
2282 * TopTransactionContext to avoid them getting reset after each message is
2283 * processed.
2284 */
2286
2287 /* Open the spool file for the committed/prepared transaction */
2289 elog(DEBUG1, "replaying changes from file \"%s\"", path);
2290
2291 /*
2292 * Make sure the file is owned by the toplevel transaction so that the
2293 * file will not be accidentally closed when aborting a subtransaction.
2294 */
2295 oldowner = CurrentResourceOwner;
2297
2298 stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2299
2300 CurrentResourceOwner = oldowner;
2301
2302 buffer = palloc(BLCKSZ);
2303
2305
2306 remote_final_lsn = lsn;
2307
2308 /*
2309 * Make sure the handler methods called from apply_dispatch are aware
2310 * we're in a remote transaction.
2311 */
2312 in_remote_transaction = true;
2314
2316
2317 /*
2318 * Read the entries one by one and pass them through the same logic as in
2319 * apply_dispatch.
2320 */
2321 nchanges = 0;
2322 while (true)
2323 {
2325 size_t nbytes;
2326 int len;
2327
2329
        /* read length of the on-disk record */
        nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);

        /* have we reached end of the file? */
        if (nbytes == 0)
            break;

        /* do we have a correct length? */
        if (len <= 0)
            elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
                 len, path);

        /* make sure we have sufficiently large buffer */
        buffer = repalloc(buffer, len);

        /* and finally read the data into the buffer */
        BufFileReadExact(stream_fd, buffer, len);

        BufFileTell(stream_fd, &fileno, &offset);

        /* init a stringinfo using the buffer and call apply_dispatch */
        initReadOnlyStringInfo(&s2, buffer, len);
2352
2353 /* Ensure we are reading the data into our memory context. */
2355
2357
2359
2361
2362 nchanges++;
2363
        /*
         * It is possible that the file has been closed because we have
         * processed a transaction end message such as stream_commit, in
         * which case that must have been the last message.
         */
2369 if (!stream_fd)
2370 {
2371 ensure_last_message(stream_fileset, xid, fileno, offset);
2372 break;
2373 }
2374
2375 if (nchanges % 1000 == 0)
2376 elog(DEBUG1, "replayed %d changes from file \"%s\"",
2377 nchanges, path);
2378 }
2379
2380 if (stream_fd)
2382
2383 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2384 nchanges, path);
2385
2386 return;
2387}
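
/*
 * Illustrative sketch (not part of worker.c): the replay loop above reads
 * length-prefixed records until a clean end of file. The stand-alone code
 * below shows the same on-disk framing with plain stdio instead of BufFile.
 * The names spool_write_record, spool_replay and sum_lengths are
 * hypothetical helpers invented for this example, and error handling is
 * reduced to return codes rather than elog(ERROR).
 */

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical helper: append one record as [int len][len bytes]. */
static int
spool_write_record(FILE *fp, const char *data, int len)
{
    assert(len > 0);
    if (fwrite(&len, sizeof(len), 1, fp) != 1)
        return -1;
    if (fwrite(data, 1, (size_t) len, fp) != (size_t) len)
        return -1;
    return 0;
}

/* Hypothetical callback: add each record's length to the int behind arg. */
static void
sum_lengths(const char *data, int len, void *arg)
{
    (void) data;
    *(int *) arg += len;
}

/*
 * Hypothetical helper: replay every record from the start of the file,
 * calling cb once per record. Returns the number of records replayed, or
 * -1 on a bad length, a short read, or an allocation failure.
 */
static int
spool_replay(FILE *fp,
             void (*cb) (const char *data, int len, void *arg),
             void *arg)
{
    char       *buffer = NULL;
    int         nchanges = 0;

    rewind(fp);
    for (;;)
    {
        int         len;
        size_t      nread = fread(&len, 1, sizeof(len), fp);
        char       *tmp;

        /* nothing left to read: this is the normal loop exit */
        if (nread == 0)
            break;

        /* a partial length word or a non-positive length is corruption */
        if (nread != sizeof(len) || len <= 0)
        {
            nchanges = -1;
            break;
        }

        /* make sure the buffer is large enough for this record */
        tmp = realloc(buffer, (size_t) len);
        if (tmp == NULL)
        {
            nchanges = -1;
            break;
        }
        buffer = tmp;

        /* the payload must be present in full */
        if (fread(buffer, 1, (size_t) len, fp) != (size_t) len)
        {
            nchanges = -1;
            break;
        }

        cb(buffer, len, arg);
        nchanges++;
    }

    free(buffer);
    return nchanges;
}
```

/*
 * Usage idea: write a few records to a tmpfile() stream, then call
 * spool_replay() with sum_lengths to confirm the record count and the
 * total payload size.
 */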
2388
2389/*
2390 * Handle STREAM COMMIT message.
2391 */
2392static void
2394{
2395 TransactionId xid;
2399
2400 /* Save the message before it is consumed. */
2402
2404 ereport(ERROR,
2406 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2407
2410
2412
2413 switch (apply_action)
2414 {
2415 case TRANS_LEADER_APPLY:
2416
2417 /*
2418 * The transaction has been serialized to file, so replay all the
2419 * spooled operations.
2420 */
2422 commit_data.commit_lsn);
2423
2425
2426 /* Unlink the files with serialized changes and subxact info. */
2428
2429 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2430 break;
2431
2433 Assert(winfo);
2434
2435 if (pa_send_data(winfo, s->len, s->data))
2436 {
2437 /* Finish processing the streaming transaction. */
2438 pa_xact_finish(winfo, commit_data.end_lsn);
2439 break;
2440 }
2441
2442 /*
2443 * Switch to serialize mode when we are not able to send the
2444 * change to parallel apply worker.
2445 */
2446 pa_switch_to_partial_serialize(winfo, true);
2447
2450 Assert(winfo);
2451
2453 &original_msg);
2454
2456
2457 /* Finish processing the streaming transaction. */
2458 pa_xact_finish(winfo, commit_data.end_lsn);
2459 break;
2460
2462
2463 /*
2464 * If the parallel apply worker is applying spooled messages then
2465 * close the file before committing.
2466 */
2467 if (stream_fd)
2469
2471
2473
2474 /*
2475 * It is important to set the transaction state as finished before
2476 * releasing the lock. See pa_wait_for_xact_finish.
2477 */
2480
2482
2483 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2484 break;
2485
2486 default:
2487 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2488 break;
2489 }
2490
2491 /*
2492 * Process any tables that are being synchronized in parallel, as well as
2493 * any newly added tables or sequences.
2494 */
2496
2498
2500}
2501
2502/*
2503 * Helper function for apply_handle_commit and apply_handle_stream_commit.
2504 */
2505static void
2507{
2508 if (is_skipping_changes())
2509 {
2511
2512 /*
2513 * Start a new transaction to clear the subskiplsn, if not started
2514 * yet.
2515 */
2516 if (!IsTransactionState())
2518 }
2519
2520 if (IsTransactionState())
2521 {
2522 /*
2523 * The transaction is either non-empty or skipped, so we clear the
2524 * subskiplsn.
2525 */
2527
2528 /*
2529 * Update origin state so we can restart streaming from correct
2530 * position in case of crash.
2531 */
2534
2536
2537 if (IsTransactionBlock())
2538 {
2539 EndTransactionBlock(false);
2541 }
2542
2543 pgstat_report_stat(false);
2544
2546 }
2547 else
2548 {
2549 /* Process any invalidation messages that might have accumulated. */
2552 }
2553
2554 in_remote_transaction = false;
2555}
2556
2557/*
2558 * Handle RELATION message.
2559 *
2560 * Note we don't do validation against local schema here. The validation
2561 * against local schema is postponed until first change for given relation
2562 * comes as we only care about it when applying changes for it anyway and we
2563 * do less locking this way.
2564 */
2565static void
2567{
2568 LogicalRepRelation *rel;
2569
2571 return;
2572
2573 rel = logicalrep_read_rel(s);
2575
2576 /* Also reset all entries in the partition map that refer to remoterel. */
2578}
2579
2580/*
2581 * Handle TYPE message.
2582 *
2583 * This implementation pays no attention to TYPE messages; we expect the user
2584 * to have set things up so that the incoming data is acceptable to the input
2585 * functions for the locally subscribed tables. Hence, we just read and
2586 * discard the message.
2587 */
2588static void
2598
2599/*
2600 * Check that we (the subscription owner) have sufficient privileges on the
2601 * target relation to perform the given operation.
2602 */
2603static void
2605{
2606 Oid relid;
2608
2609 relid = RelationGetRelid(rel);
2611 if (aclresult != ACLCHECK_OK)
2613 get_relkind_objtype(rel->rd_rel->relkind),
2614 get_rel_name(relid));
2615
2616 /*
2617 * We lack the infrastructure to honor RLS policies. It might be possible
2618 * to add such infrastructure here, but tablesync workers lack it, too, so
2619 * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2620 * but it seems dangerous to replicate a TRUNCATE and then refuse to
2621 * replicate subsequent INSERTs, so we forbid all commands the same.
2622 */
2623 if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2624 ereport(ERROR,
2626 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2629}
2630
2631/*
2632 * Handle INSERT message.
2633 */
2634
2635static void
2637{
2640 LogicalRepRelId relid;
2643 EState *estate;
2646 bool run_as_owner;
2647
2648 /*
2649 * Quick return if we are skipping data modification changes or handling
2650 * streamed transactions.
2651 */
2652 if (is_skipping_changes() ||
2654 return;
2655
2657
2658 relid = logicalrep_read_insert(s, &newtup);
2661 {
2662 /*
2663 * The relation can't become interesting in the middle of the
2664 * transaction so it's safe to unlock it.
2665 */
2668 return;
2669 }
2670
2671 /*
2672 * Make sure that any user-supplied code runs as the table owner, unless
2673 * the user has opted out of that behavior.
2674 */
2676 if (!run_as_owner)
2677 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2678
2679 /* Set relation for error callback */
2681
2682 /* Initialize the executor state. */
2684 estate = edata->estate;
2687 &TTSOpsVirtual);
2688
2689 /* Process and store remote tuple in the slot */
2692 slot_fill_defaults(rel, estate, remoteslot);
2694
2695 /* For a partitioned table, insert the tuple into a partition. */
2696 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2699 else
2700 {
2701 ResultRelInfo *relinfo = edata->targetRelInfo;
2702
2703 ExecOpenIndices(relinfo, false);
2706 }
2707
2709
2710 /* Reset relation for error callback */
2712
2713 if (!run_as_owner)
2715
2717
2719}
2720
2721/*
2722 * Workhorse for apply_handle_insert()
2723 * relinfo is for the relation we're actually inserting into
2724 * (could be a child partition of edata->targetRelInfo)
2725 */
2726static void
2730{
2731 EState *estate = edata->estate;
2732
2733 /* Caller should have opened indexes already. */
2734 Assert(relinfo->ri_IndexRelationDescs != NULL ||
2735 !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2736 RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2737
2738 /* Caller will not have done this bit. */
2739 Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2741
2742 /* Do the insert. */
2743 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2745}
2746
2747/*
2748 * Check if the logical replication relation is updatable and throw
2749 * appropriate error if it isn't.
2750 */
2751static void
2753{
    /*
     * For partitioned tables, we only need to care whether the target
     * partition is updatable (i.e., has a primary key or replica identity
     * defined for it).
     */
2758 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2759 return;
2760
2761 /* Updatable, no error. */
2762 if (rel->updatable)
2763 return;
2764
    /*
     * We are in error mode, so it's fine that this is somewhat slow. It's
     * better to give the user the correct error.
     */
2770 {
2771 ereport(ERROR,
2773 errmsg("publisher did not send replica identity column "
2774 "expected by the logical replication target relation \"%s.%s\"",
2775 rel->remoterel.nspname, rel->remoterel.relname)));
2776 }
2777
2778 ereport(ERROR,
2780 errmsg("logical replication target relation \"%s.%s\" has "
2781 "neither REPLICA IDENTITY index nor PRIMARY "
2782 "KEY and published relation does not have "
2783 "REPLICA IDENTITY FULL",
2784 rel->remoterel.nspname, rel->remoterel.relname)));
2785}
2786
2787/*
2788 * Handle UPDATE message.
2789 *
2790 * TODO: FDW support
2791 */
2792static void
2794{
2796 LogicalRepRelId relid;
2799 EState *estate;
2802 bool has_oldtup;
2806 bool run_as_owner;
2807
2808 /*
2809 * Quick return if we are skipping data modification changes or handling
2810 * streamed transactions.
2811 */
2812 if (is_skipping_changes() ||
2814 return;
2815
2817
2819 &newtup);
2822 {
2823 /*
2824 * The relation can't become interesting in the middle of the
2825 * transaction so it's safe to unlock it.
2826 */
2829 return;
2830 }
2831
2832 /* Set relation for error callback */
2834
2835 /* Check if we can do the update. */
2837
2838 /*
2839 * Make sure that any user-supplied code runs as the table owner, unless
2840 * the user has opted out of that behavior.
2841 */
2843 if (!run_as_owner)
2844 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2845
2846 /* Initialize the executor state. */
2848 estate = edata->estate;
2851 &TTSOpsVirtual);
2852
2853 /*
2854 * Populate updatedCols so that per-column triggers can fire, and so
2855 * executor can correctly pass down indexUnchanged hint. This could
2856 * include more columns than were actually changed on the publisher
2857 * because the logical replication protocol doesn't contain that
2858 * information. But it would for example exclude columns that only exist
2859 * on the subscriber, since we are not touching those.
2860 */
2862 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2863 {
2864 CompactAttribute *att = TupleDescCompactAttr(remoteslot->tts_tupleDescriptor, i);
2865 int remoteattnum = rel->attrmap->attnums[i];
2866
2867 if (!att->attisdropped && remoteattnum >= 0)
2868 {
2869 Assert(remoteattnum < newtup.ncols);
2871 target_perminfo->updatedCols =
2872 bms_add_member(target_perminfo->updatedCols,
2874 }
2875 }
2876
2877 /* Build the search tuple. */
2880 has_oldtup ? &oldtup : &newtup);
2882
2883 /* For a partitioned table, apply update to correct partition. */
2884 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2887 else
2890
2892
2893 /* Reset relation for error callback */
2895
2896 if (!run_as_owner)
2898
2900
2902}
2903
2904/*
2905 * Workhorse for apply_handle_update()
2906 * relinfo is for the relation we're actually updating in
2907 * (could be a child partition of edata->targetRelInfo)
2908 */
2909static void
2914 Oid localindexoid)
2915{
2916 EState *estate = edata->estate;
2917 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2918 Relation localrel = relinfo->ri_RelationDesc;
2919 EPQState epqstate;
2922 bool found;
2924
2925 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2926 ExecOpenIndices(relinfo, false);
2927
2928 found = FindReplTupleInLocalRel(edata, localrel,
2929 &relmapentry->remoterel,
2930 localindexoid,
2932
2933 /*
2934 * Tuple found.
2935 *
2936 * Note this will fail if there are other conflicting unique indexes.
2937 */
2938 if (found)
2939 {
2940 /*
2941 * Report the conflict if the tuple was modified by a different
2942 * origin.
2943 */
2945 &conflicttuple.origin, &conflicttuple.ts) &&
2947 {
2949
2950 /* Store the new tuple for conflict reporting */
2951 newslot = table_slot_create(localrel, &estate->es_tupleTable);
2952 slot_store_data(newslot, relmapentry, newtup);
2953
2954 conflicttuple.slot = localslot;
2955
2959 }
2960
2961 /* Process and store remote tuple in the slot */
2965
2966 EvalPlanQualSetSlot(&epqstate, remoteslot);
2967
2969
2970 /* Do the actual update. */
2971 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2972 ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2973 remoteslot);
2974 }
2975 else
2976 {
2979
2980 /*
2981 * Detecting whether the tuple was recently deleted or never existed
2982 * is crucial to avoid misleading the user during conflict handling.
2983 */
2984 if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
2985 &conflicttuple.xmin,
2986 &conflicttuple.origin,
2987 &conflicttuple.ts) &&
2990 else
2992
2993 /* Store the new tuple for conflict reporting */
2994 slot_store_data(newslot, relmapentry, newtup);
2995
2996 /*
2997 * The tuple to be updated could not be found or was deleted. Do
2998 * nothing except for emitting a log message.
2999 */
3002 }
3003
3004 /* Cleanup. */
3006 EvalPlanQualEnd(&epqstate);
3007}
3008
3009/*
3010 * Handle DELETE message.
3011 *
3012 * TODO: FDW support
3013 */
3014static void
3016{
3019 LogicalRepRelId relid;
3022 EState *estate;
3025 bool run_as_owner;
3026
3027 /*
3028 * Quick return if we are skipping data modification changes or handling
3029 * streamed transactions.
3030 */
3031 if (is_skipping_changes() ||
3033 return;
3034
3036
3037 relid = logicalrep_read_delete(s, &oldtup);
3040 {
3041 /*
3042 * The relation can't become interesting in the middle of the
3043 * transaction so it's safe to unlock it.
3044 */
3047 return;
3048 }
3049
3050 /* Set relation for error callback */
3052
3053 /* Check if we can do the delete. */
3055
3056 /*
3057 * Make sure that any user-supplied code runs as the table owner, unless
3058 * the user has opted out of that behavior.
3059 */
3061 if (!run_as_owner)
3062 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3063
3064 /* Initialize the executor state. */
3066 estate = edata->estate;
3069 &TTSOpsVirtual);
3070
3071 /* Build the search tuple. */
3075
3076 /* For a partitioned table, apply delete to correct partition. */
3077 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3080 else
3081 {
3082 ResultRelInfo *relinfo = edata->targetRelInfo;
3083
3084 ExecOpenIndices(relinfo, false);
3088 }
3089
3091
3092 /* Reset relation for error callback */
3094
3095 if (!run_as_owner)
3097
3099
3101}
3102
3103/*
3104 * Workhorse for apply_handle_delete()
3105 * relinfo is for the relation we're actually deleting from
3106 * (could be a child partition of edata->targetRelInfo)
3107 */
3108static void
3112 Oid localindexoid)
3113{
3114 EState *estate = edata->estate;
3115 Relation localrel = relinfo->ri_RelationDesc;
3116 LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3117 EPQState epqstate;
3120 bool found;
3121
3122 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3123
3124 /* Caller should have opened indexes already. */
3125 Assert(relinfo->ri_IndexRelationDescs != NULL ||
3126 !localrel->rd_rel->relhasindex ||
3127 RelationGetIndexList(localrel) == NIL);
3128
3129 found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3131
3132 /* If found delete it. */
3133 if (found)
3134 {
3135 /*
3136 * Report the conflict if the tuple was modified by a different
3137 * origin.
3138 */
3140 &conflicttuple.origin, &conflicttuple.ts) &&
3142 {
3143 conflicttuple.slot = localslot;
3147 }
3148
3149 EvalPlanQualSetSlot(&epqstate, localslot);
3150
3151 /* Do the actual delete. */
3152 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3153 ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3154 }
3155 else
3156 {
3157 /*
3158 * The tuple to be deleted could not be found. Do nothing except for
3159 * emitting a log message.
3160 */
3163 }
3164
3165 /* Cleanup. */
3166 EvalPlanQualEnd(&epqstate);
3167}
3168
3169/*
3170 * Try to find a tuple received from the publication side (in 'remoteslot') in
3171 * the corresponding local relation using either replica identity index,
3172 * primary key, index or if needed, sequential scan.
3173 *
3174 * Local tuple, if found, is returned in '*localslot'.
3175 */
3176static bool
3178 LogicalRepRelation *remoterel,
3182{
3183 EState *estate = edata->estate;
3184 bool found;
3185
3186 /*
3187 * Regardless of the top-level operation, we're performing a read here, so
3188 * check for SELECT privileges.
3189 */
3191
3192 *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3193
3195 (remoterel->replident == REPLICA_IDENTITY_FULL));
3196
3198 {
3199#ifdef USE_ASSERT_CHECKING
3201
3202 /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3204 (remoterel->replident == REPLICA_IDENTITY_FULL &&
3206 edata->targetRel->attrmap)));
3208#endif
3209
3210 found = RelationFindReplTupleByIndex(localrel, localidxoid,
3213 }
3214 else
3217
3218 return found;
3219}
3220
3221/*
3222 * Determine whether the index can reliably locate the deleted tuple in the
3223 * local relation.
3224 *
3225 * An index may exclude deleted tuples if it was re-indexed or re-created during
3226 * change application. Therefore, an index is considered usable only if the
3227 * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3228 * index tuple's xmin. This ensures that any tuples deleted prior to the index
3229 * creation or re-indexing are not relevant for conflict detection in the
3230 * current apply worker.
3231 *
3232 * Note that indexes may also be excluded if they were modified by other DDL
3233 * operations, such as ALTER INDEX. However, this is acceptable, as the
3234 * likelihood of such DDL changes coinciding with the need to scan dead
3235 * tuples for the update_deleted is low.
3236 */
3237static bool
3240{
3243
3245
3246 if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3247 elog(ERROR, "cache lookup failed for index %u", localindexoid);
3248
3249 /*
3250 * No need to check for a frozen transaction ID, as
3251 * TransactionIdPrecedes() manages it internally, treating it as falling
3252 * behind the conflict_detection_xmin.
3253 */
3255
3257
3259}
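
/*
 * Illustrative sketch (not part of worker.c): the comment above relies on
 * TransactionIdPrecedes() ordering a frozen XID before any normal XID. The
 * stand-alone code below models that kind of wraparound-aware comparison.
 * The names xid_precedes, SKETCH_FROZEN_XID and SKETCH_FIRST_NORMAL_XID are
 * hypothetical, and the constant values are assumptions based on my
 * understanding of PostgreSQL's special XIDs; consult access/transam.h for
 * the authoritative definitions.
 */

```c
#include <stdbool.h>
#include <stdint.h>

/* Assumed values for this sketch only; see the lead-in comment. */
#define SKETCH_FROZEN_XID        ((uint32_t) 2)
#define SKETCH_FIRST_NORMAL_XID  ((uint32_t) 3)

/*
 * Hypothetical helper: does id1 logically precede id2?
 *
 * Special (non-normal) XIDs are compared as plain unsigned integers, so a
 * frozen XID sorts before every normal XID. Normal XIDs are compared
 * modulo 2^32 by looking at the sign of their 32-bit difference.
 */
static bool
xid_precedes(uint32_t id1, uint32_t id2)
{
    int32_t     diff;

    if (id1 < SKETCH_FIRST_NORMAL_XID || id2 < SKETCH_FIRST_NORMAL_XID)
        return id1 < id2;

    /*
     * The unsigned subtraction wraps around; converting the result to a
     * signed 32-bit value relies on two's-complement behavior, which holds
     * on all common platforms.
     */
    diff = (int32_t) (id1 - id2);
    return diff < 0;
}
```

/*
 * With this ordering, xid_precedes(SKETCH_FROZEN_XID, xmin) is true for any
 * normal xmin, which is why no separate check for a frozen XID is needed.
 */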
3260
3261/*
3262 * Attempts to locate a deleted tuple in the local relation that matches the
3263 * values of the tuple received from the publication side (in 'remoteslot').
3264 * The search is performed using either the replica identity index, primary
3265 * key, other available index, or a sequential scan if necessary.
3266 *
3267 * Returns true if the deleted tuple is found. If found, the transaction ID,
3268 * origin, and commit timestamp of the deletion are stored in '*delete_xid',
3269 * '*delete_origin', and '*delete_time' respectively.
3270 */
3271static bool
3276{
3278
3279 /*
3280 * Return false if either dead tuples are not retained or commit timestamp
3281 * data is not available.
3282 */
3284 return false;
3285
3286 /*
3287 * For conflict detection, we use the leader worker's
3288 * oldest_nonremovable_xid value instead of invoking
3289 * GetOldestNonRemovableTransactionId() or using the conflict detection
3290 * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3291 * identify tuples that were recently deleted. These deleted tuples are no
3292 * longer visible to concurrent transactions. However, if a remote update
3293 * matches such a tuple, we log an update_deleted conflict.
3294 *
3295 * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3296 * transaction IDs older than oldest_nonremovable_xid, for our current
3297 * purpose, it is acceptable to treat tuples deleted by transactions prior
3298 * to oldest_nonremovable_xid as update_missing conflicts.
3299 */
3301 {
3303 }
3304 else
3305 {
3306 LogicalRepWorker *leader;
3307
3308 /*
3309 * Obtain the information from the leader apply worker as only the
3310 * leader manages oldest_nonremovable_xid (see
3311 * maybe_advance_nonremovable_xid() for details).
3312 */
3316 false);
3317 if (!leader)
3318 {
3319 ereport(ERROR,
3321 errmsg("could not detect conflict as the leader apply worker has exited")));
3322 }
3323
3324 SpinLockAcquire(&leader->relmutex);
3326 SpinLockRelease(&leader->relmutex);
3328 }
3329
3330 /*
3331 * Return false if the leader apply worker has stopped retaining
3332 * information for detecting conflicts. This implies that update_deleted
3333 * can no longer be reliably detected.
3334 */
3336 return false;
3337
3338 if (OidIsValid(localidxoid) &&
3343 delete_time);
3344 else
3348}
3349
3350/*
3351 * This handles insert, update, delete on a partitioned table.
3352 */
3353static void
3357 CmdType operation)
3358{
3359 EState *estate = edata->estate;
3360 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3361 ResultRelInfo *relinfo = edata->targetRelInfo;
3362 Relation parentrel = relinfo->ri_RelationDesc;
3363 ModifyTableState *mtstate;
3364 PartitionTupleRouting *proute;
3366 Relation partrel;
3368 TupleConversionMap *map;
3371 AttrMap *attrmap = NULL;
3372
3373 /* ModifyTableState is needed for ExecFindPartition(). */
3374 edata->mtstate = mtstate = makeNode(ModifyTableState);
3375 mtstate->ps.plan = NULL;
3376 mtstate->ps.state = estate;
3377 mtstate->operation = operation;
3378 mtstate->resultRelInfo = relinfo;
3379
3380 /* ... as is PartitionTupleRouting. */
3381 edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3382
3383 /*
3384 * Find the partition to which the "search tuple" belongs.
3385 */
3388 partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3389 remoteslot, estate);
3391 partrel = partrelinfo->ri_RelationDesc;
3392
3393 /*
3394 * Check for supported relkind. We need this since partitions might be of
3395 * unsupported relkinds; and the set of partitions can change, so checking
3396 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3397 */
3398 CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3399 relmapentry->remoterel.relkind,
3401 RelationGetRelationName(partrel));
3402
3403 /*
3404 * To perform any of the operations below, the tuple must match the
3405 * partition's rowtype. Convert if needed or just copy, using a dedicated
3406 * slot to store the tuple in any case.
3407 */
3408 remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3409 if (remoteslot_part == NULL)
3410 remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3411 map = ExecGetRootToChildMap(partrelinfo, estate);
3412 if (map != NULL)
3413 {
3414 attrmap = map->attrMap;
3417 }
3418 else
3419 {
3422 }
3424
3425 /* Check if we can do the update or delete on the leaf partition. */
3426 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3427 {
3428 part_entry = logicalrep_partition_open(relmapentry, partrel,
3429 attrmap);
3431 }
3432
3433 switch (operation)
3434 {
3435 case CMD_INSERT:
3438 break;
3439
3440 case CMD_DELETE:
3443 part_entry->localindexoid);
3444 break;
3445
3446 case CMD_UPDATE:
3447
3448 /*
3449 * For UPDATE, depending on whether or not the updated tuple
3450 * satisfies the partition's constraint, perform a simple UPDATE
3451 * of the partition or move the updated tuple into a different
3452 * suitable partition.
3453 */
3454 {
3458 bool found;
3459 EPQState epqstate;
3461
3462 /* Get the matching local tuple from the partition. */
3463 found = FindReplTupleInLocalRel(edata, partrel,
3464 &part_entry->remoterel,
3465 part_entry->localindexoid,
3467 if (!found)
3468 {
3471
3472 /*
3473 * Detecting whether the tuple was recently deleted or
3474 * never existed is crucial to avoid misleading the user
3475 * during conflict handling.
3476 */
3477 if (FindDeletedTupleInLocalRel(partrel,
3478 part_entry->localindexoid,
3480 &conflicttuple.xmin,
3481 &conflicttuple.origin,
3482 &conflicttuple.ts) &&
3485 else
3487
3488 /* Store the new tuple for conflict reporting */
3490
3491 /*
3492 * The tuple to be updated could not be found or was
3493 * deleted. Do nothing except for emitting a log message.
3494 */
3498
3499 return;
3500 }
3501
3502 /*
3503 * Report the conflict if the tuple was modified by a
3504 * different origin.
3505 */
3507 &conflicttuple.origin,
3508 &conflicttuple.ts) &&
3510 {
3512
3513 /* Store the new tuple for conflict reporting */
3514 newslot = table_slot_create(partrel, &estate->es_tupleTable);
3516
3517 conflicttuple.slot = localslot;
3518
3522 }
3523
3524 /*
3525 * Apply the update to the local tuple, putting the result in
3526 * remoteslot_part.
3527 */
3530 newtup);
3532
3533 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3534
3535 /*
3536 * Does the updated tuple still satisfy the current
3537 * partition's constraint?
3538 */
3539 if (!partrel->rd_rel->relispartition ||
3541 false))
3542 {
3543 /*
3544 * Yes, so simply UPDATE the partition. We don't call
3545 * apply_handle_update_internal() here, which would
3546 * normally do the following work, to avoid repeating some
3547 * work already done above to find the local tuple in the
3548 * partition.
3549 */
3551
3553 TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3554 ACL_UPDATE);
3555 ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3557 }
3558 else
3559 {
3560 /* Move the tuple into the new partition. */
3561
3562 /*
3563 * New partition will be found using tuple routing, which
3564 * can only occur via the parent table. We might need to
3565 * convert the tuple to the parent's rowtype. Note that
3566 * this is the tuple found in the partition, not the
3567 * original search tuple received by this function.
3568 */
3569 if (map)
3570 {
3574
3575 remoteslot =
3578 }
3579 else
3580 {
3583 }
3584
3585 /* Find the new partition. */
3588 proute, remoteslot,
3589 estate);
3592 partrel_new = partrelinfo_new->ri_RelationDesc;
3593
3594 /* Check that new partition also has supported relkind. */
3595 CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3596 relmapentry->remoterel.relkind,
3599
3600 /* DELETE old tuple found in the old partition. */
3601 EvalPlanQualSetSlot(&epqstate, localslot);
3602 TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3603 ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3604
3605 /* INSERT new tuple into the new partition. */
3606
3607 /*
3608 * Convert the replacement tuple to match the destination
3609 * partition rowtype.
3610 */
3612 remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3613 if (remoteslot_part == NULL)
3615 &estate->es_tupleTable);
3617 if (map != NULL)
3618 {
3620 remoteslot,
3622 }
3623 else
3624 {
3626 remoteslot);
3628 }
3632 }
3633
3634 EvalPlanQualEnd(&epqstate);
3635 }
3636 break;
3637
3638 default:
3639 elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3640 break;
3641 }
3642}
3643
3644/*
3645 * Handle TRUNCATE message.
3646 *
3647 * TODO: FDW support
3648 */
3649static void
3651{
3652 bool cascade = false;
3653 bool restart_seqs = false;
3655 List *remote_rels = NIL;
3656 List *rels = NIL;
3657 List *part_rels = NIL;
3658 List *relids = NIL;
3660 ListCell *lc;
3661 LOCKMODE lockmode = AccessExclusiveLock;
3662
3663 /*
3664 * Quick return if we are skipping data modification changes or handling
3665 * streamed transactions.
3666 */
3667 if (is_skipping_changes() ||
3669 return;
3670
3672
3673 remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3674
3675 foreach(lc, remote_relids)
3676 {
3677 LogicalRepRelId relid = lfirst_oid(lc);
3679
3680 rel = logicalrep_rel_open(relid, lockmode);
3682 {
3683 /*
3684 * The relation can't become interesting in the middle of the
3685 * transaction so it's safe to unlock it.
3686 */
3687 logicalrep_rel_close(rel, lockmode);
3688 continue;
3689 }
3690
3693 rels = lappend(rels, rel->localrel);
3694 relids = lappend_oid(relids, rel->localreloid);
3697
3698 /*
3699 * Truncate partitions if we got a message to truncate a partitioned
3700 * table.
3701 */
3702 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3703 {
3704 ListCell *child;
3705 List *children = find_all_inheritors(rel->localreloid,
3706 lockmode,
3707 NULL);
3708
3709 foreach(child, children)
3710 {
3711 Oid childrelid = lfirst_oid(child);
3713
3714 if (list_member_oid(relids, childrelid))
3715 continue;
3716
3717 /* find_all_inheritors already got lock */
3719
3720 /*
3721 * Ignore temp tables of other backends. See similar code in
3722 * ExecuteTruncate().
3723 */
3725 {
3726 table_close(childrel, lockmode);
3727 continue;
3728 }
3729
3731 rels = lappend(rels, childrel);
3733 relids = lappend_oid(relids, childrelid);
3734 /* Log this relation only if needed for logical decoding */
3737 }
3738 }
3739 }
3740
    /*
     * Even if CASCADE was used on the publisher, we explicitly default to
     * replaying changes without further cascading. This might later become
     * changeable with a user-specified option.
     *
     * MySubscription->runasowner tells us whether we want to execute
     * replication actions as the subscription owner; the last argument to
     * TruncateGuts tells it whether we want to switch to the table owner.
     * Those are exactly opposite conditions.
     */
3752 relids,
3755 restart_seqs,
3757 foreach(lc, remote_rels)
3758 {
3760
3762 }
3763 foreach(lc, part_rels)
3764 {
3765 Relation rel = lfirst(lc);
3766
3767 table_close(rel, NoLock);
3768 }
3769
3771}
3772
3773
3774/*
3775 * Logical replication protocol message dispatcher.
3776 */
3777void
3779{
3780 LogicalRepMsgType action = pq_getmsgbyte(s);
3782
3783 /*
3784 * Set the current command being applied. Since this function can be
3785 * called recursively when applying spooled changes, save the current
3786 * command.
3787 */
3790
3791 switch (action)
3792 {
3795 break;
3796
3799 break;
3800
3803 break;
3804
3807 break;
3808
3811 break;
3812
3815 break;
3816
3819 break;
3820
3823 break;
3824
3827 break;
3828
3830
            /*
             * Logical replication does not use generic logical messages yet.
             * However, they could be used by other applications that use
             * this output plugin.
             */
3836 break;
3837
3840 break;
3841
3844 break;
3845
3848 break;
3849
3852 break;
3853
3856 break;
3857
3860 break;
3861
3864 break;
3865
3868 break;
3869
3872 break;
3873
3874 default:
3875 ereport(ERROR,
3877 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3878 }
3879
3880 /* Reset the current command */
3882}
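
/*
 * Illustrative sketch (not part of worker.c): the dispatcher above saves the
 * current command before the switch and resets it afterwards because it can
 * be re-entered while spooled changes are replayed. The stand-alone code
 * below shows that save/restore pattern. All names (sketch_dispatch,
 * sketch_current_command, sketch_trace) are hypothetical, and the message
 * bytes 'I' and 'c' are used only as placeholders for "insert" and "stream
 * commit".
 */

```c
#include <assert.h>
#include <stddef.h>

/* Command currently being applied; 0 means "none". */
static char sketch_current_command = 0;

/* Records which command was current each time a handler finished. */
static char sketch_trace[8];
static size_t sketch_ntrace = 0;

static void
sketch_record(void)
{
    assert(sketch_ntrace < sizeof(sketch_trace));
    sketch_trace[sketch_ntrace++] = sketch_current_command;
}

static void
sketch_dispatch(char action)
{
    /* Save the caller's command so a nested call cannot clobber it. */
    char        saved_command = sketch_current_command;

    sketch_current_command = action;

    switch (action)
    {
        case 'I':
            sketch_record();
            break;

        case 'c':
            /* Replaying a spooled change re-enters the dispatcher. */
            sketch_dispatch('I');
            /* The nested call restored 'c' as the current command. */
            sketch_record();
            break;

        default:
            break;
    }

    /* Reset the current command to whatever the caller had. */
    sketch_current_command = saved_command;
}
```

/*
 * Calling sketch_dispatch('c') records 'I' during the nested call and 'c'
 * once control returns to the outer handler, and leaves the current command
 * at 0 afterwards.
 */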
3883
3884/*
3885 * Figure out which write/flush positions to report to the walsender process.
3886 *
3887 * We can't simply report back the last LSN the walsender sent us because the
3888 * local transaction might not yet be flushed to disk locally. Instead we
3889 * build a list that associates local with remote LSNs for every commit. When
3890 * reporting back the flush position to the sender we iterate that list and
3891 * check which entries on it are already locally flushed. Those we can report
3892 * as having been flushed.
3893 *
3894 * The have_pending_txes is true if there are outstanding transactions that
3895 * need to be flushed.
3896 */
3897static void
3899 bool *have_pending_txes)
3900{
3901 dlist_mutable_iter iter;
3903
3905 *flush = InvalidXLogRecPtr;
3906
3908 {
3909 FlushPosition *pos =
3910 dlist_container(FlushPosition, node, iter.cur);
3911
3912 *write = pos->remote_end;
3913
3914 if (pos->local_end <= local_flush)
3915 {
3916 *flush = pos->remote_end;
3917 dlist_delete(iter.cur);
3918 pfree(pos);
3919 }
3920 else
3921 {
3922 /*
3923 * Don't want to uselessly iterate over the rest of the list which
3924 * could potentially be long. Instead get the last element and
3925 * grab the write position from there.
3926 */
3928 &lsn_mapping);
3929 *write = pos->remote_end;
3930 *have_pending_txes = true;
3931 return;
3932 }
3933 }
3934
3936}
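
/*
 * Illustrative sketch (not part of worker.c): the function above walks a
 * list of (local LSN, remote LSN) pairs and reports the newest remote LSN
 * whose local counterpart is already flushed. The stand-alone code below
 * models the same bookkeeping with a plain array instead of a dlist. The
 * names SketchFlushPos and sketch_flush_position are hypothetical, LSNs are
 * plain uint64_t values, and 0 stands in for an invalid LSN.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef struct SketchFlushPos
{
    uint64_t    local_end;      /* local commit end position */
    uint64_t    remote_end;     /* matching remote end position */
} SketchFlushPos;

/*
 * Hypothetical helper: given n entries ordered by commit, compute the write
 * and flush positions to report for the given local flush pointer. Entries
 * that are already flushed locally are removed from the front of the array.
 * Returns the number of entries that remain.
 */
static size_t
sketch_flush_position(SketchFlushPos *map, size_t n, uint64_t local_flush,
                      uint64_t *write, uint64_t *flush,
                      bool *have_pending_txes)
{
    size_t      done = 0;

    assert(map != NULL || n == 0);

    *write = 0;
    *flush = 0;
    *have_pending_txes = false;

    /* Consume every leading entry that is already flushed locally. */
    while (done < n && map[done].local_end <= local_flush)
    {
        *write = map[done].remote_end;
        *flush = map[done].remote_end;
        done++;
    }

    if (done < n)
    {
        /*
         * Don't walk the rest; the last entry carries the newest write
         * position, and something is still waiting to be flushed.
         */
        *write = map[n - 1].remote_end;
        *have_pending_txes = true;
        memmove(map, map + done, (n - done) * sizeof(SketchFlushPos));
    }

    return n - done;
}
```

/*
 * Usage idea: with entries {10,100}, {20,200}, {30,300} and a local flush
 * pointer of 20, the first two entries are consumed, the flush position is
 * the second entry's remote end, and the write position comes from the last
 * entry.
 */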
3937
3938/*
3939 * Store current remote/local lsn pair in the tracking list.
3940 */
3941void
3943{
3945
3946 /*
3947 * Skip for parallel apply workers, because the lsn_mapping is maintained
3948 * by the leader apply worker.
3949 */
3951 return;
3952
3953 /* Need to do this in permanent context */
3955
3956 /* Track commit lsn */
3958 flushpos->local_end = local_lsn;
3959 flushpos->remote_end = remote_lsn;
3960
3963}
3964
3965
3966/* Update statistics of the worker. */
3967static void
3979
3980/*
3981 * Apply main loop.
3982 */
3983static void
3985{
3987 bool ping_sent = false;
3988 TimeLineID tli;
3989 ErrorContextCallback errcallback;
3991
3992 /*
3993 * Init the ApplyMessageContext which we clean up after each replication
3994 * protocol message.
3995 */
3997 "ApplyMessageContext",
3999
4000 /*
4001 * This memory context is used for per-stream data when the streaming mode
4002 * is enabled. This context is reset on each stream stop.
4003 */
4005 "LogicalStreamingContext",
4007
4008 /* mark as idle, before starting to loop */
4010
4011 /*
4012 * Push apply error context callback. Fields will be filled while applying
4013 * a change.
4014 */
4015 errcallback.callback = apply_error_callback;
4016 errcallback.previous = error_context_stack;
4017 error_context_stack = &errcallback;
4019
4020 /* This outer loop iterates once per wait. */
4021 for (;;)
4022 {
4024 int rc;
4025 int len;
4026 char *buf = NULL;
4027 bool endofstream = false;
4028 long wait_time;
4029
4031
4033
4035
4036 if (len != 0)
4037 {
4038 /* Loop to process all available data (without blocking). */
4039 for (;;)
4040 {
4042
4043 if (len == 0)
4044 {
4045 break;
4046 }
4047 else if (len < 0)
4048 {
4049 ereport(LOG,
4050 (errmsg("data stream from publisher has ended")));
4051 endofstream = true;
4052 break;
4053 }
4054 else
4055 {
4056 int c;
4058
4060 {
4061 ConfigReloadPending = false;
4063 }
4064
4065 /* Reset timeout. */
4067 ping_sent = false;
4068
4069 rdt_data.last_recv_time = last_recv_timestamp;
4070
4071 /* Ensure we are reading the data into our memory context. */
4073
4075
4076 c = pq_getmsgbyte(&s);
4077
4078 if (c == PqReplMsg_WALData)
4079 {
4080 XLogRecPtr start_lsn;
4081 XLogRecPtr end_lsn;
4083
4084 start_lsn = pq_getmsgint64(&s);
4085 end_lsn = pq_getmsgint64(&s);
4087
4088 if (last_received < start_lsn)
4089 last_received = start_lsn;
4090
4091 if (last_received < end_lsn)
4092 last_received = end_lsn;
4093
4095
4096 apply_dispatch(&s);
4097
4099 }
4100 else if (c == PqReplMsg_Keepalive)
4101 {
4102 XLogRecPtr end_lsn;
4104 bool reply_requested;
4105
4106 end_lsn = pq_getmsgint64(&s);
4109
4110 if (last_received < end_lsn)
4111 last_received = end_lsn;
4112
4114
4116
4118 }
4119 else if (c == PqReplMsg_PrimaryStatusUpdate)
4120 {
4121 rdt_data.remote_lsn = pq_getmsgint64(&s);
4122 rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4124 rdt_data.reply_time = pq_getmsgint64(&s);
4125
4126 /*
4127 * This should never happen, see
4128 * ProcessStandbyPSRequestMessage. But if it happens
4129 * due to a bug, we don't want to proceed as it can
4130 * incorrectly advance oldest_nonremovable_xid.
4131 */
4132 if (!XLogRecPtrIsValid(rdt_data.remote_lsn))
4133 elog(ERROR, "cannot get the latest WAL position from the publisher");
4134
4136
4137 UpdateWorkerStats(last_received, rdt_data.reply_time, false);
4138 }
4139 /* other message types are purposefully ignored */
4140
4142 }
4143
4145 }
4146 }
4147
4148 /* confirm all writes so far */
4149 send_feedback(last_received, false, false);
4150
4151 /* Reset the timestamp if no message was received */
4152 rdt_data.last_recv_time = 0;
4153
4155
4157 {
4158 /*
4159 * If we didn't get any transactions for a while there might be
4160 * unconsumed invalidation messages in the queue, consume them
4161 * now.
4162 */
4165
4166 /*
4167 * Process any relations that are being synchronized in parallel
4168 * and any newly added tables or sequences.
4169 */
4171 }
4172
4173 /* Cleanup the memory. */
4176
4177 /* Check if we need to exit the streaming loop. */
4178 if (endofstream)
4179 break;
4180
4181 /*
4182 * Wait for more data or latch. If we have unflushed transactions,
4183 * wake up after WalWriterDelay to see if they've been flushed yet (in
4184 * which case we should send a feedback message). Otherwise, there's
4185 * no particular urgency about waking up unless we get data or a
4186 * signal.
4187 */
4190 else
4192
4193 /*
4194 * Make sure to wake up when it's possible to advance the non-removable
4195 * transaction ID, or when the retention duration may have exceeded
4196 * max_retention_duration.
4197 */
4199 {
4200 if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
4201 rdt_data.xid_advance_interval)
4202 wait_time = Min(wait_time, rdt_data.xid_advance_interval);
4203 else if (MySubscription->maxretention > 0)
4205 }
4206
4210 fd, wait_time,
4212
4213 if (rc & WL_LATCH_SET)
4214 {
4217 }
4218
4220 {
4221 ConfigReloadPending = false;
4223 }
4224
4225 if (rc & WL_TIMEOUT)
4226 {
4227 /*
4228 * We didn't receive anything new. If we haven't heard anything
4229 * from the server for more than wal_receiver_timeout / 2, ping
4230 * the server. Also, if it's been longer than
4231 * wal_receiver_status_interval since the last update we sent,
4232 * send a status update to the primary anyway, to report any
4233 * progress in applying WAL.
4234 */
4235 bool requestReply = false;
4236
4237 /*
4238 * Check if time since last receive from primary has reached the
4239 * configured limit.
4240 */
4241 if (wal_receiver_timeout > 0)
4242 {
4245
4246 timeout =
4249
4250 if (now >= timeout)
4251 ereport(ERROR,
4253 errmsg("terminating logical replication worker due to timeout")));
4254
4255 /* Check to see if it's time for a ping. */
4256 if (!ping_sent)
4257 {
4259 (wal_receiver_timeout / 2));
4260 if (now >= timeout)
4261 {
4262 requestReply = true;
4263 ping_sent = true;
4264 }
4265 }
4266 }
4267
4269
4271
4272 /*
4273 * Force reporting to ensure long idle periods don't lead to
4274 * arbitrarily delayed stats. Stats can only be reported outside
4275 * of (implicit or explicit) transactions. That shouldn't lead to
4276 * stats being delayed for long, because transactions are either
4277 * sent as a whole on commit or streamed. Streamed transactions
4278 * are spilled to disk and applied on commit.
4279 */
4280 if (!IsTransactionState())
4281 pgstat_report_stat(true);
4282 }
4283 }
4284
4285 /* Pop the error context stack */
4286 error_context_stack = errcallback.previous;
4288
4289 /* All done */
4291}
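
/*
 * Illustrative sketch, not part of worker.c: the WL_TIMEOUT handling in the
 * apply loop above, reduced to a pure function. It terminates once nothing
 * has been received for wal_receiver_timeout, and requests a reply (a
 * "ping") once half of that time has passed. All names are hypothetical and
 * times are plain milliseconds rather than TimestampTz values.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum SketchTimeoutAction
{
    SKETCH_KEEP_WAITING,
    SKETCH_SEND_PING,
    SKETCH_TERMINATE
} SketchTimeoutAction;

/*
 * Decide what to do after a wait timed out. *ping_sent is set when a ping is
 * requested so that at most one ping is sent per silent period; the caller
 * clears it again whenever data arrives.
 */
SketchTimeoutAction
sketch_check_timeout(int64_t last_recv_ms, int64_t now_ms,
                     int timeout_ms, bool *ping_sent)
{
    if (timeout_ms <= 0)
        return SKETCH_KEEP_WAITING;     /* timeout checking is disabled */

    if (now_ms >= last_recv_ms + timeout_ms)
        return SKETCH_TERMINATE;

    if (!*ping_sent && now_ms >= last_recv_ms + timeout_ms / 2)
    {
        *ping_sent = true;
        return SKETCH_SEND_PING;
    }

    return SKETCH_KEEP_WAITING;
}
```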
4292
4293/*
4294 * Send a Standby Status Update message to server.
4295 *
4296 * 'recvpos' is the latest LSN up to which we've received data; 'force' is set
4297 * if we need to send a response to avoid timeouts.
4298 */
4299static void
4301{
4302 static StringInfo reply_message = NULL;
4303 static TimestampTz send_time = 0;
4304
4307
4311 bool have_pending_txes;
4312
4313 /*
4314 * If the user doesn't want status to be reported to the publisher, be
4315 * sure to exit before doing anything at all.
4316 */
4317 if (!force && wal_receiver_status_interval <= 0)
4318 return;
4319
4320 /* It's legal to not pass a recvpos */
4321 if (recvpos < last_recvpos)
4323
4325
4326 /*
4327 * No outstanding transactions to flush, we can report the latest received
4328 * position. This is important for synchronous replication.
4329 */
4330 if (!have_pending_txes)
4332
4333 if (writepos < last_writepos)
4335
4336 if (flushpos < last_flushpos)
4338
4340
4341 /* if we've already reported everything we're good */
4342 if (!force &&
4347 return;
4348 send_time = now;
4349
4350 if (!reply_message)
4351 {
4353
4356 }
4357 else
4359
4361 pq_sendint64(reply_message, recvpos); /* write */
4362 pq_sendint64(reply_message, flushpos); /* flush */
4363 pq_sendint64(reply_message, writepos); /* apply */
4364 pq_sendint64(reply_message, now); /* sendTime */
4365 pq_sendbyte(reply_message, requestReply); /* replyRequested */
4366
4367 elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4368 force,
4372
4375
4376 if (recvpos > last_recvpos)
4378 if (writepos > last_writepos)
4380 if (flushpos > last_flushpos)
4382}
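
/*
 * Illustrative sketch, not part of worker.c: the wire layout of the Standby
 * Status Update message assembled above, written into a plain byte buffer
 * instead of a StringInfo. The field order follows the pq_sendint64() calls
 * in send_feedback(); the leading 'r' type byte is taken from the streaming
 * replication protocol. The helper names are hypothetical.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Append a 64-bit value in network (big-endian) byte order. */
size_t
sketch_put_int64(uint8_t *buf, size_t off, uint64_t v)
{
    for (int i = 0; i < 8; i++)
        buf[off + i] = (uint8_t) (v >> (56 - 8 * i));
    return off + 8;
}

/*
 * Build a status update into buf, which must hold at least 34 bytes:
 * 1 type byte, four 8-byte fields, and 1 reply-requested byte.
 * Returns the message length.
 */
size_t
sketch_build_status_update(uint8_t *buf, uint64_t write_lsn,
                           uint64_t flush_lsn, uint64_t apply_lsn,
                           int64_t send_time, int reply_requested)
{
    size_t off = 0;

    buf[off++] = 'r';
    off = sketch_put_int64(buf, off, write_lsn);
    off = sketch_put_int64(buf, off, flush_lsn);
    off = sketch_put_int64(buf, off, apply_lsn);
    off = sketch_put_int64(buf, off, (uint64_t) send_time);
    buf[off++] = reply_requested ? 1 : 0;

    return off;
}
```

/*
 * Note that send_feedback() passes recvpos in the "write" slot, flushpos in
 * the "flush" slot, and writepos in the "apply" slot.
 */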
4383
4384/*
4385 * Attempt to advance the non-removable transaction ID.
4386 *
4387 * See comments atop worker.c for details.
4388 */
4389static void
4398
4399/*
4400 * Preliminary check to determine if advancing the non-removable transaction ID
4401 * is allowed.
4402 */
4403static bool
4405{
4406 /*
4407 * It is sufficient to manage non-removable transaction ID for a
4408 * subscription by the main apply worker to detect update_deleted reliably
4409 * even for table sync or parallel apply workers.
4410 */
4412 return false;
4413
4414 /* No need to advance if retaining dead tuples is not required */
4416 return false;
4417
4418 return true;
4419}
4420
4421/*
4422 * Process phase transitions during the non-removable transaction ID
4423 * advancement. See comments atop worker.c for details of the transition.
4424 */
4425static void
4451
4452/*
4453 * Workhorse for the RDT_GET_CANDIDATE_XID phase.
4454 */
4455static void
4457{
4460
4461 /*
4462 * Use last_recv_time when applying changes in the loop to avoid
4463 * unnecessary system time retrieval. If last_recv_time is not available,
4464 * obtain the current timestamp.
4465 */
4466 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4467
4468 /*
4469 * Compute the candidate_xid and request the publisher status at most once
4470 * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4471 * details on how this value is dynamically adjusted. This is to avoid
4472 * using CPU and network resources without making much progress.
4473 */
4474 if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4475 rdt_data->xid_advance_interval))
4476 return;
4477
4478 /*
4479 * Immediately update the timer, even if the function returns later
4480 * without setting candidate_xid due to inactivity on the subscriber. This
4481 * avoids frequent calls to GetOldestActiveTransactionId.
4482 */
4483 rdt_data->candidate_xid_time = now;
4484
4485 /*
4486 * Consider transactions in the current database, as only dead tuples from
4487 * this database are required for conflict detection.
4488 */
4490
4491 /*
4492 * Oldest active transaction ID (oldest_running_xid) can't be behind any
4493 * of its previously computed values.
4494 */
4497
4498 /* Return if the oldest_nonremovable_xid cannot be advanced */
4501 {
4503 return;
4504 }
4505
4507
4508 rdt_data->candidate_xid = oldest_running_xid;
4510
4511 /* process the next phase */
4513}
4514
4515/*
4516 * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4517 */
4518static void
4520{
4522
4523 if (!request_message)
4524 {
4526
4529 }
4530 else
4532
4533 /*
4534 * Send the current time to update the remote walsender's latest reply
4535 * message received time.
4536 */
4539
4540 elog(DEBUG2, "sending publisher status request message");
4541
4542 /* Send a request for the publisher status */
4544 request_message->data, request_message->len);
4545
4547
4548 /*
4549 * Skip calling maybe_advance_nonremovable_xid() since further transition
4550 * is possible only once we receive the publisher status message.
4551 */
4552}
4553
4554/*
4555 * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
4556 */
4557static void
4559 bool status_received)
4560{
4561 /*
4562 * Return if we have requested but not yet received the publisher status.
4563 */
4564 if (!status_received)
4565 return;
4566
4567 /*
4568 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4569 * retaining conflict information for this worker.
4570 */
4572 {
4574 return;
4575 }
4576
4577 if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
4578 rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4579
4580 /*
4581 * Check if all remote concurrent transactions that were active at the
4582 * first status request have now completed. If completed, proceed to the
4583 * next phase; otherwise, continue checking the publisher status until
4584 * these transactions finish.
4585 *
4586 * It's possible that transactions in the commit phase during the last
4587 * cycle have now finished committing, but remote_oldestxid remains older
4588 * than remote_wait_for. This can happen if some old transaction entered
4589 * the commit phase when we requested status in this cycle. We do not
4590 * handle this case explicitly as it's rare and the benefit doesn't
4591 * justify the required complexity. Tracking would require either caching
4592 * all xids at the publisher or sending them to subscribers. The condition
4593 * will resolve naturally once the remaining transactions are finished.
4594 *
4595 * Directly advancing the non-removable transaction ID is possible if
4596 * there are no activities on the publisher since the last advancement
4597 * cycle. However, it requires maintaining two fields, last_remote_nextxid
4598 * and last_remote_lsn, within the structure for comparison with the
4599 * current cycle's values. Considering the minimal cost of continuing in
4600 * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4601 * advance the transaction ID here.
4602 */
4603 if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
4604 rdt_data->remote_oldestxid))
4606 else
4608
4609 /* process the next phase */
4611}
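
/*
 * Illustrative sketch, not part of worker.c: the phase decision described in
 * the comment above, once a publisher status message has arrived. Full
 * transaction IDs are modelled as plain 64-bit integers with 0 meaning
 * "invalid", and the enum and function names are hypothetical. Returning to
 * the request phase when the check fails is an assumption based on the
 * comment "continue checking the publisher status".
 */

```c
#include <assert.h>
#include <stdint.h>

typedef enum SketchPhase
{
    SKETCH_REQUEST_PUBLISHER_STATUS,
    SKETCH_WAIT_FOR_LOCAL_FLUSH
} SketchPhase;

/*
 * On the first status reply of a cycle, remember the publisher's next XID as
 * the point to wait for. Proceed only when every transaction older than that
 * point has finished, i.e. the publisher's oldest running XID has reached it.
 */
SketchPhase
sketch_after_publisher_status(uint64_t *remote_wait_for,
                              uint64_t remote_nextxid,
                              uint64_t remote_oldestxid)
{
    if (*remote_wait_for == 0)
        *remote_wait_for = remote_nextxid;

    if (*remote_wait_for <= remote_oldestxid)
        return SKETCH_WAIT_FOR_LOCAL_FLUSH;

    return SKETCH_REQUEST_PUBLISHER_STATUS;
}
```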
4612
4613/*
4614 * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
4615 */
4616static void
4618{
4619 Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) &&
4620 TransactionIdIsValid(rdt_data->candidate_xid));
4621
4622 /*
4623 * We expect the publisher and subscriber clocks to be in sync using a time
4624 * sync service such as NTP. Otherwise, we will advance this worker's
4625 * oldest_nonremovable_xid prematurely, leading to the removal of rows
4626 * required to detect update_deleted reliably. This check primarily
4627 * addresses scenarios where the publisher's clock falls behind; if the
4628 * publisher's clock is ahead, subsequent transactions will naturally bear
4629 * later commit timestamps, conforming to the design outlined atop
4630 * worker.c.
4631 *
4632 * XXX Consider waiting for the publisher's clock to catch up with the
4633 * subscriber's before proceeding to the next phase.
4634 */
4635 if (TimestampDifferenceExceeds(rdt_data->reply_time,
4636 rdt_data->candidate_xid_time, 0))
4637 ereport(ERROR,
4638 errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4639 errdetail_internal("The clock on the publisher is behind that of the subscriber."));
4640
4641 /*
4642 * Do not attempt to advance the non-removable transaction ID when table
4643 * sync is in progress. During this time, changes from a single
4644 * transaction may be applied by multiple table sync workers corresponding
4645 * to the target tables. So, it's necessary for all table sync workers to
4646 * apply and flush the corresponding changes before advancing the
4647 * transaction ID, otherwise, dead tuples that are still needed for
4648 * conflict detection in table sync workers could be removed prematurely.
4649 * However, confirming the apply and flush progress across all table sync
4650 * workers is complex and not worth the effort, so we simply return if not
4651 * all tables are in the READY state.
4652 *
4653 * Advancing the transaction ID is necessary even when no tables are
4654 * currently subscribed, to avoid retaining dead tuples unnecessarily.
4655 * While it might seem safe to skip all phases and directly assign
4656 * candidate_xid to oldest_nonremovable_xid during the
4657 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
4658 * concurrently add tables to the subscription, the apply worker may not
4659 * process invalidations in time. Consequently,
4660 * HasSubscriptionTablesCached() might miss the new tables, leading to
4661 * premature advancement of oldest_nonremovable_xid.
4662 *
4663 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
4664 * invalidations are guaranteed to be processed before applying changes
4665 * from newly added tables while waiting for the local flush to reach
4666 * remote_lsn.
4667 *
4668 * Additionally, even if we check for subscription tables during
4669 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
4670 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
4671 * subscription tables at this stage to prevent unnecessary tuple
4672 * retention.
4673 */
4675 {
4677
4678 now = rdt_data->last_recv_time
4679 ? rdt_data->last_recv_time : GetCurrentTimestamp();
4680
4681 /*
4682 * Record the time spent waiting for table sync; it is needed for the
4683 * timeout check in should_stop_conflict_info_retention().
4684 */
4685 rdt_data->table_sync_wait_time =
4686 TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);
4687
4688 return;
4689 }
4690
4691 /*
4692 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4693 * retaining conflict information for this worker.
4694 */
4696 {
4698 return;
4699 }
4700
4701 /*
4702 * Update and check the remote flush position if we are applying changes
4703 * in a loop. This is done at most once per WalWriterDelay to avoid
4704 * performing costly operations in get_flush_position() too frequently
4705 * during change application.
4706 */
4707 if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
4708 TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
4709 rdt_data->last_recv_time, WalWriterDelay))
4710 {
4713 bool have_pending_txes;
4714
4715 /* Fetch the latest remote flush position */
4717
4718 if (flushpos > last_flushpos)
4720
4721 rdt_data->flushpos_update_time = rdt_data->last_recv_time;
4722 }
4723
4724 /* Return to wait for the changes to be applied */
4725 if (last_flushpos < rdt_data->remote_lsn)
4726 return;
4727
4728 /*
4729 * Reaching this point implies should_stop_conflict_info_retention()
4730 * returned false earlier, meaning that the most recent duration for
4731 * advancing the non-removable transaction ID is within the
4732 * max_retention_duration or max_retention_duration is set to 0.
4733 *
4734 * Therefore, if conflict info retention was previously stopped due to a
4735 * timeout, it is now safe to resume retention.
4736 */
4738 {
4740 return;
4741 }
4742
4743 /*
4744 * Reaching here means the remote WAL position has been received, and all
4745 * transactions up to that position on the publisher have been applied and
4746 * flushed locally. So, we can advance the non-removable transaction ID.
4747 */
4751
4752 elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
4753 LSN_FORMAT_ARGS(rdt_data->remote_lsn),
4754 rdt_data->candidate_xid);
4755
4756 /* Notify launcher to update the xmin of the conflict slot */
4758
4760
4761 /* process the next phase */
4763}
4764
4765/*
4766 * Check whether conflict information retention should be stopped due to
4767 * exceeding the maximum wait time (max_retention_duration).
4768 *
4769 * If retention should be stopped, return true. Otherwise, return false.
4770 */
4771static bool
4773{
4775
4776 Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4779
4781 return false;
4782
4783 /*
4784 * Use last_recv_time when applying changes in the loop to avoid
4785 * unnecessary system time retrieval. If last_recv_time is not available,
4786 * obtain the current timestamp.
4787 */
4788 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4789
4790 /*
4791 * Return early if the wait time has not exceeded the configured maximum
4792 * (max_retention_duration). Time spent waiting for table synchronization
4793 * is excluded from this calculation, as it occurs infrequently.
4794 */
4795 if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4797 rdt_data->table_sync_wait_time))
4798 return false;
4799
4800 return true;
4801}
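
/*
 * Illustrative sketch, not part of worker.c: the timeout check above as a
 * pure function. Retention is stopped once the time since the candidate XID
 * was chosen reaches max_retention_duration plus the time spent waiting for
 * table synchronization. A limit of 0 is treated as "no limit". The function
 * name is hypothetical and all times are plain milliseconds; the ">="
 * comparison assumes TimestampDifferenceExceeds() treats an exactly equal
 * difference as exceeded.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

bool
sketch_should_stop_retention(int64_t candidate_time_ms, int64_t now_ms,
                             int64_t max_retention_ms,
                             int64_t table_sync_wait_ms)
{
    /* No configured limit: never stop because of elapsed time. */
    if (max_retention_ms <= 0)
        return false;

    /* Table sync wait time extends the allowance instead of counting against it. */
    return (now_ms - candidate_time_ms) >=
        (max_retention_ms + table_sync_wait_ms);
}
```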
4802
4803/*
4804 * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
4805 */
4806static void
4808{
4809 /* Stop retention if it has not been stopped yet */
4811 {
4812 /*
4813 * If the retention status cannot be updated (e.g., due to active
4814 * transaction), skip further processing to avoid inconsistent
4815 * retention behavior.
4816 */
4817 if (!update_retention_status(false))
4818 return;
4819
4823
4824 ereport(LOG,
4825 errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
4827 errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
4828 }
4829
4831
4832 /*
4833 * If retention has been stopped, reset to the initial phase to retry
4834 * resuming retention. This reset is required to recalculate the current
4835 * wait time and resume retention if the time falls within
4836 * max_retention_duration.
4837 */
4839}
4840
4841/*
4842 * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
4843 */
4844static void
4846{
4847 /* We can't resume retention without updating retention status. */
4848 if (!update_retention_status(true))
4849 return;
4850
4851 ereport(LOG,
4852 errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
4855 ? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
4856 : errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));
4857
4858 /*
4859 * Restart the worker to let the launcher initialize
4860 * oldest_nonremovable_xid at startup.
4861 *
4862 * While it's technically possible to derive this value on-the-fly using
4863 * the conflict detection slot's xmin, doing so risks a race condition:
4864 * the launcher might clean slot.xmin just after retention resumes. This
4865 * would make oldest_nonremovable_xid unreliable, especially during xid
4866 * wraparound.
4867 *
4868 * Although this can be prevented by introducing heavyweight locking, the
4869 * complexity it will bring doesn't seem worthwhile given how rarely
4870 * retention is resumed.
4871 */
4873}
4874
4875/*
4876 * Updates pg_subscription.subretentionactive to the given value within a
4877 * new transaction.
4878 *
4879 * If already inside an active transaction, skips the update and returns
4880 * false.
4881 *
4882 * Returns true if the update is successfully performed.
4883 */
4884static bool
4886{
4887 /*
4888 * Do not update the catalog during an active transaction. The transaction
4889 * may be started during change application, leading to a possible
4890 * rollback of catalog updates if the application fails subsequently.
4891 */
4892 if (IsTransactionState())
4893 return false;
4894
4896
4897 /*
4898 * Updating pg_subscription might involve TOAST table access, so ensure we
4899 * have a valid snapshot.
4900 */
4902
4903 /* Update pg_subscription.subretentionactive */
4905
4908
4909 /* Notify launcher to update the conflict slot */
4911
4913
4914 return true;
4915}
4916
4917/*
4918 * Reset all data fields of RetainDeadTuplesData except those used to
4919 * determine the timing for the next round of transaction ID advancement. We
4920 * can even use flushpos_update_time in the next round to decide whether to get
4921 * the latest flush position.
4922 */
4923static void
4925{
4927 rdt_data->remote_lsn = InvalidXLogRecPtr;
4928 rdt_data->remote_oldestxid = InvalidFullTransactionId;
4929 rdt_data->remote_nextxid = InvalidFullTransactionId;
4930 rdt_data->reply_time = 0;
4931 rdt_data->remote_wait_for = InvalidFullTransactionId;
4932 rdt_data->candidate_xid = InvalidTransactionId;
4933 rdt_data->table_sync_wait_time = 0;
4934}
4935
4936/*
4937 * Adjust the interval for advancing non-removable transaction IDs.
4938 *
4939 * If there is no activity on the node or retention has been stopped, we
4940 * progressively double the interval used to advance non-removable transaction
4941 * ID. This helps conserve CPU and network resources when there's little benefit
4942 * to frequent updates.
4943 *
4944 * The interval is capped by the lowest of the following:
4945 * - wal_receiver_status_interval (if set and retention is active),
4946 * - a default maximum of 3 minutes,
4947 * - max_retention_duration (if retention is active).
4948 *
4949 * This ensures the interval never exceeds the retention boundary, even if other
4950 * limits are higher. Once activity resumes on the node and the retention is
4951 * active, the interval is reset to the lesser of 100ms and max_retention_duration,
4952 * allowing timely advancement of non-removable transaction ID.
4953 *
4954 * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4955 * consider the other interval or a separate GUC if the need arises.
4956 */
4957static void
4959{
4960 if (rdt_data->xid_advance_interval && !new_xid_found)
4961 {
4965
4966 /*
4967 * No new transaction ID has been assigned since the last check, so
4968 * double the interval, but not beyond the maximum allowable value.
4969 */
4970 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4971 max_interval);
4972 }
4973 else if (rdt_data->xid_advance_interval &&
4975 {
4976 /*
4977 * Retention has been stopped, so double the interval, capped at a
4978 * maximum of 3 minutes. The wal_receiver_status_interval is
4979 * intentionally not used as an upper bound, since the likelihood of
4980 * retention resuming is lower than that of general activity resuming.
4981 */
4982 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4984 }
4985 else
4986 {
4987 /*
4988 * A new transaction ID was found or the interval is not yet
4989 * initialized, so set the interval to the minimum value.
4990 */
4991 rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
4992 }
4993
4994 /*
4995 * Ensure the wait time remains within the maximum retention time limit
4996 * when retention is active.
4997 */
4999 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
5001}
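
/*
 * Illustrative sketch, not part of worker.c: the backoff policy described in
 * the header comment of adjust_xid_advance_interval(), as a pure function.
 * The 100ms minimum and 3-minute maximum come from that comment; the
 * function and macro names are hypothetical. The cap used in the first
 * branch follows the comment's "lowest of" wording, which is an assumption
 * because the corresponding source lines are not shown in this listing.
 */

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_MIN_INTERVAL_MS 100
#define SKETCH_MAX_INTERVAL_MS 180000   /* 3 minutes */

int
sketch_min_int(int a, int b)
{
    return a < b ? a : b;
}

/* Return the next interval in milliseconds; cur_ms == 0 means "not yet set". */
int
sketch_adjust_interval(int cur_ms, bool new_xid_found, bool retention_active,
                       int status_interval_ms, int max_retention_ms)
{
    int next;

    if (cur_ms > 0 && !new_xid_found)
    {
        /* Idle node: double, bounded by the status interval if it is set. */
        int cap = SKETCH_MAX_INTERVAL_MS;

        if (status_interval_ms > 0)
            cap = sketch_min_int(cap, status_interval_ms);
        next = sketch_min_int(cur_ms * 2, cap);
    }
    else if (cur_ms > 0 && !retention_active)
    {
        /* Retention stopped: double, bounded only by the 3-minute maximum. */
        next = sketch_min_int(cur_ms * 2, SKETCH_MAX_INTERVAL_MS);
    }
    else
    {
        /* New XID found or interval uninitialized: start from the minimum. */
        next = SKETCH_MIN_INTERVAL_MS;
    }

    /* Never wait longer than the retention limit while retention is active. */
    if (retention_active && max_retention_ms > 0)
        next = sketch_min_int(next, max_retention_ms);

    return next;
}
```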
5002
5003/*
5004 * Exit routine for apply workers due to subscription parameter changes.
5005 */
5006static void
5008{
5010 {
5011 /*
5012 * Don't stop the parallel apply worker as the leader will detect the
5013 * subscription parameter change and restart logical replication later
5014 * anyway. This also prevents the leader from reporting errors when
5015 * trying to communicate with a stopped parallel apply worker, which
5016 * would accidentally disable subscriptions if disable_on_error was
5017 * set.
5018 */
5019 return;
5020 }
5021
5022 /*
5023 * Reset the last-start time for this apply worker so that the launcher
5024 * will restart it without waiting for wal_retrieve_retry_interval if the
5025 * subscription is still active, and so that we won't leak that hash table
5026 * entry if it isn't.
5027 */
5030
5031 proc_exit(0);
5032}
5033
5034/*
5035 * Reread subscription info if needed.
5036 *
5037 * For significant changes, we react by exiting the current process; a new
5038 * one will be launched afterwards if needed.
5039 */
5040void
5042{
5045 bool started_tx = false;
5046
5047 /* When cache state is valid there is nothing to do here. */
5049 return;
5050
5051 /* This function might be called inside or outside of transaction. */
5052 if (!IsTransactionState())
5053 {
5055 started_tx = true;
5056 }
5057
5058 /* Ensure allocations in permanent context. */
5060
5062
5063 /*
5064 * Exit if the subscription was removed. This normally should not happen
5065 * as the worker gets killed during DROP SUBSCRIPTION.
5066 */
5067 if (!newsub)
5068 {
5069 ereport(LOG,
5070 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5071 MySubscription->name)));
5072
5073 /* Ensure we remove no-longer-useful entry for worker's start time */
5076
5077 proc_exit(0);
5078 }
5079
5080 /* Exit if the subscription was disabled. */
5081 if (!newsub->enabled)
5082 {
5083 ereport(LOG,
5084 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5085 MySubscription->name)));
5086
5088 }
5089
5090 /* !slotname should never happen when enabled is true. */
5091 Assert(newsub->slotname);
5092
5093 /* two-phase cannot be altered while the worker is running */
5094 Assert(newsub->twophasestate == MySubscription->twophasestate);
5095
5096 /*
5097 * Exit if any parameter that affects the remote connection was changed.
5098 * The launcher will start a new worker but note that the parallel apply
5099 * worker won't restart if the streaming option's value is changed from
5100 * 'parallel' to any other value or the server decides not to stream the
5101 * in-progress transaction.
5102 */
5103 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
5104 strcmp(newsub->name, MySubscription->name) != 0 ||
5105 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
5106 newsub->binary != MySubscription->binary ||
5107 newsub->stream != MySubscription->stream ||
5108 newsub->passwordrequired != MySubscription->passwordrequired ||
5109 strcmp(newsub->origin, MySubscription->origin) != 0 ||
5110 newsub->owner != MySubscription->owner ||
5111 !equal(newsub->publications, MySubscription->publications))
5112 {
5114 ereport(LOG,
5115 (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5116 MySubscription->name)));
5117 else
5118 ereport(LOG,
5119 (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
5120 MySubscription->name)));
5121
5123 }
5124
5125 /*
5126 * Exit if the subscription owner's superuser privileges have been
5127 * revoked.
5128 */
5129 if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
5130 {
5132 ereport(LOG,
5133 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5135 else
5136 ereport(LOG,
5137 errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5139
5141 }
5142
5143 /* Check for other changes that should never happen too. */
5144 if (newsub->dbid != MySubscription->dbid)
5145 {
5146 elog(ERROR, "subscription %u changed unexpectedly",
5148 }
5149
5150 /* Clean old subscription info and switch to new one. */
5153
5155
5156 /* Change synchronous commit according to the user's wishes */
5157 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5159
5160 /* Change wal_receiver_timeout according to the user's wishes */
5162
5163 if (started_tx)
5165
5166 MySubscriptionValid = true;
5167}
5168
5169/*
5170 * Change wal_receiver_timeout to MySubscription->walrcvtimeout.
5171 */
5172static void
5174{
5175 bool parsed;
5176 int val;
5178
5179 /*
5180 * Set the wal_receiver_timeout GUC to MySubscription->walrcvtimeout,
5181 * which comes from the subscription's wal_receiver_timeout option. If the
5182 * value is -1, reset the GUC to its default, meaning it will inherit from
5183 * the server config, command line, or role/database settings.
5184 */
5186 if (parsed && val == -1)
5187 SetConfigOption("wal_receiver_timeout", NULL,
5189 else
5190 SetConfigOption("wal_receiver_timeout", MySubscription->walrcvtimeout,
5192
5193 /*
5194 * Log the wal_receiver_timeout setting (in milliseconds) as a debug
5195 * message when it changes, to verify it was set correctly.
5196 */
5198 elog(DEBUG1, "logical replication worker for subscription \"%s\" wal_receiver_timeout: %d ms",
5200}
5201
5202/*
5203 * Callback from subscription syscache invalidation.
5204 */
5205static void
5210
5211/*
5212 * subxact_info_write
5213 * Store information about subxacts for a toplevel transaction.
5214 *
5215 * For each subxact we store offset of its first change in the main file.
5216 * The file is always over-written as a whole.
5217 *
5218 * XXX We should only store subxacts that were not aborted yet.
5219 */
5220static void
5222{
5223 char path[MAXPGPATH];
5224 Size len;
5225 BufFile *fd;
5226
5228
5229 /* construct the subxact filename */
5230 subxact_filename(path, subid, xid);
5231
5232 /* Delete the subxacts file, if exists. */
5233 if (subxact_data.nsubxacts == 0)
5234 {
5237
5238 return;
5239 }
5240
5241 /*
5242 * Create the subxact file if it is not already created, otherwise open the
5243 * existing file.
5244 */
5246 true);
5247 if (fd == NULL)
5249
5251
5252 /* Write the subxact count and subxact info */
5255
5257
5258 /* free the memory allocated for subxact info */
5260}
5261
5262/*
5263 * subxact_info_read
5264 * Restore information about subxacts of a streamed transaction.
5265 *
5266 * Read information about subxacts into the structure subxact_data that can be
5267 * used later.
5268 */
5269static void
5271{
5272 char path[MAXPGPATH];
5273 Size len;
5274 BufFile *fd;
5276
5280
5281 /*
5282 * If the subxact file doesn't exist that means we don't have any subxact
5283 * info.
5284 */
5285 subxact_filename(path, subid, xid);
5287 true);
5288 if (fd == NULL)
5289 return;
5290
5291 /* read number of subxact items */
5293
5295
5296 /* we keep the maximum as a power of 2 */
5298
5299 /*
5300 * Allocate subxact information in the logical streaming context. We need
5301 * this information during the complete stream so that we can add the sub
5302 * transaction info to this. On stream stop we will flush this information
5303 * to the subxact file and reset the logical streaming context.
5304 */
5307 sizeof(SubXactInfo));
5309
5310 if (len > 0)
5312
5314}
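The comment in subxact_info_read says the array capacity is kept as a power of 2. One way to round an element count up to such a capacity is sketched here. `demo_next_pow2` is a hypothetical helper, not the call the worker makes (that line is not visible in this listing), and returning 1 for a count of 0 is an assumption of the sketch.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Return the smallest power of 2 that is >= n, with a minimum of 1.
 * The caller must keep n within range so the shift cannot overflow.
 */
static uint32_t
demo_next_pow2(uint32_t n)
{
	uint32_t	p = 1;

	assert(n <= UINT32_C(0x80000000));

	while (p < n)
		p <<= 1;
	return p;
}
```

Keeping the capacity a power of 2 means a later doubling step always lands on another power of 2.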
5315
5316/*
5317 * subxact_info_add
5318 * Add information about a subxact (offset in the main file).
5319 */
5320static void
5321subxact_info_add(TransactionId xid)
5322{
5323 SubXactInfo *subxacts = subxact_data.subxacts;
5324 int64 i;
5325
5326 /* We must have a valid top level stream xid and a stream fd. */
5328 Assert(stream_fd != NULL);
5329
5330 /*
5331 * If the XID matches the toplevel transaction, we don't want to add it.
5332 */
5333 if (stream_xid == xid)
5334 return;
5335
5336 /*
5337 * In most cases we're checking the same subxact as we've already seen in
5338 * the last call, so make sure to ignore it (this change comes later).
5339 */
5340 if (subxact_data.subxact_last == xid)
5341 return;
5342
5343 /* OK, remember we're processing this XID. */
5345
5346 /*
5347 * Check if the transaction is already present in the array of subxacts. We
5348 * intentionally scan the array from the tail, because we're likely adding
5349 * a change for the most recent subtransactions.
5350 *
5351 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5352 * would allow us to use binary search here.
5353 */
5354 for (i = subxact_data.nsubxacts; i > 0; i--)
5355 {
5356 /* found, so we're done */
5357 if (subxacts[i - 1].xid == xid)
5358 return;
5359 }
5360
5361 /* This is a new subxact, so we need to add it to the array. */
5362 if (subxact_data.nsubxacts == 0)
5363 {
5365
5367
5368 /*
5369 * Allocate this memory for subxacts in per-stream context, see
5370 * subxact_info_read.
5371 */
5373 subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
5375 }
5377 {
5379 subxacts = repalloc(subxacts,
5381 }
5382
5383 subxacts[subxact_data.nsubxacts].xid = xid;
5384
5385 /*
5386 * Get the current offset of the stream file and store it as offset of
5387 * this subxact.
5388 */
5390 &subxacts[subxact_data.nsubxacts].fileno,
5391 &subxacts[subxact_data.nsubxacts].offset);
5392
5394 subxact_data.subxacts = subxacts;
5395}
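The bookkeeping in subxact_info_add (skip the toplevel XID, skip a repeat of the last XID, scan from the tail, then grow by doubling) can be sketched outside the server. All names below are hypothetical. The sketch uses `malloc`/`realloc` instead of `palloc`/`repalloc`, stores a caller-supplied offset instead of querying the stream file, and assumes an initial capacity of 4 purely to keep the example small.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct DemoSubXact
{
	uint32_t	xid;
	long		offset;		/* offset of the first change in the changes file */
} DemoSubXact;

typedef struct DemoSubXactData
{
	uint32_t	nsubxacts;
	uint32_t	nsubxacts_max;
	uint32_t	last;		/* most recently seen subxact XID, 0 if none */
	DemoSubXact *subxacts;
} DemoSubXactData;

#define DEMO_INITIAL_MAX 4	/* assumed starting capacity, a power of 2 */

/* Returns 1 if a new entry was added, 0 if the XID needed no new entry. */
static int
demo_subxact_add(DemoSubXactData *d, uint32_t top_xid, uint32_t xid, long offset)
{
	uint32_t	i;

	if (xid == top_xid)
		return 0;			/* toplevel transaction is never recorded */
	if (d->last == xid)
		return 0;			/* same subxact as the previous call */
	d->last = xid;

	/* Scan from the tail: recent subxacts are the likeliest match. */
	for (i = d->nsubxacts; i > 0; i--)
	{
		if (d->subxacts[i - 1].xid == xid)
			return 0;
	}

	if (d->nsubxacts_max == 0)
	{
		d->nsubxacts_max = DEMO_INITIAL_MAX;
		d->subxacts = malloc(d->nsubxacts_max * sizeof(DemoSubXact));
		assert(d->subxacts != NULL);
	}
	else if (d->nsubxacts == d->nsubxacts_max)
	{
		DemoSubXact *grown;

		grown = realloc(d->subxacts, 2 * d->nsubxacts_max * sizeof(DemoSubXact));
		assert(grown != NULL);
		d->subxacts = grown;
		d->nsubxacts_max *= 2;
	}

	d->subxacts[d->nsubxacts].xid = xid;
	d->subxacts[d->nsubxacts].offset = offset;
	d->nsubxacts++;
	return 1;
}
```

A zero-initialized `DemoSubXactData` is a valid empty state, and only the first change of each subxact contributes an offset.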
5396
5397/* format filename for file containing the info about subxacts */
5398static inline void
5399subxact_filename(char *path, Oid subid, TransactionId xid)
5400{
5401 snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5402}
5403
5404/* format filename for file containing serialized changes */
5405static inline void
5406changes_filename(char *path, Oid subid, TransactionId xid)
5407{
5408 snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5409}
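The two helpers above derive each file name from the subscription OID and the toplevel XID. A standalone sketch of the same formatting follows. The `demo_` names and `DEMO_MAXPATH` are stand-ins introduced for illustration, with 32-bit unsigned integers assumed for both identifiers.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define DEMO_MAXPATH 1024	/* stand-in for MAXPGPATH */

/* Stand-ins for Oid and TransactionId, assumed to be 32-bit unsigned. */
typedef uint32_t demo_oid;
typedef uint32_t demo_xid;

/* Name of the file holding subxact offsets for one toplevel transaction. */
static void
demo_subxact_filename(char *path, demo_oid subid, demo_xid xid)
{
	assert(path != NULL);
	snprintf(path, DEMO_MAXPATH, "%u-%u.subxacts",
			 (unsigned int) subid, (unsigned int) xid);
}

/* Name of the file holding the serialized changes. */
static void
demo_changes_filename(char *path, demo_oid subid, demo_xid xid)
{
	assert(path != NULL);
	snprintf(path, DEMO_MAXPATH, "%u-%u.changes",
			 (unsigned int) subid, (unsigned int) xid);
}
```

Because the subscription OID is part of the name, two subscriptions receiving the same remote XID end up with distinct files.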
5410
5411/*
5412 * stream_cleanup_files
5413 * Cleanup files for a subscription / toplevel transaction.
5414 *
5415 * Remove files with serialized changes and subxact info for a particular
5416 * toplevel transaction. Each subscription has a separate set of files
5417 * for any toplevel transaction.
5418 */
5419void
5420stream_cleanup_files(Oid subid, TransactionId xid)
5421{
5422 char path[MAXPGPATH];
5423
5424 /* Delete the changes file. */
5425 changes_filename(path, subid, xid);
5427
5428 /* Delete the subxact file, if it exists. */
5429 subxact_filename(path, subid, xid);
5431}
5432
5433/*
5434 * stream_open_file
5435 * Open a file that we'll use to serialize changes for a toplevel
5436 * transaction.
5437 *
5438 * Open a file for streamed changes from a toplevel transaction identified
5439 * by stream_xid (global variable). If it's the first chunk of streamed
5440 * changes for this transaction, create the buffile, otherwise open the
5441 * previously created file.
5442 */
5443static void
5444stream_open_file(Oid subid, TransactionId xid, bool first_segment)
5445{
5446 char path[MAXPGPATH];
5448
5449 Assert(OidIsValid(subid));
5451 Assert(stream_fd == NULL);
5452
5453
5454 changes_filename(path, subid, xid);
5455 elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5456
5457 /*
5458 * Create/open the buffiles under the logical streaming context so that we
5459 * have those files until stream stop.
5460 */
5462
5463 /*
5464 * If this is the first streamed segment, create the changes file.
5465 * Otherwise, just open the file for writing, in append mode.
5466 */
5467 if (first_segment)
5469 path);
5470 else
5471 {
5472 /*
5473 * Open the file and seek to the end of the file because we always
5474 * append to the changes file.
5475 */
5477 path, O_RDWR, false);
5479 }
5480
5482}
5483
5484/*
5485 * stream_close_file
5486 * Close the currently open file with streamed changes.
5487 */
5488static void
5490{
5491 Assert(stream_fd != NULL);
5492
5494
5495 stream_fd = NULL;
5496}
5497
5498/*
5499 * stream_write_change
5500 * Serialize a change to a file for the current toplevel transaction.
5501 *
5502 * The change is serialized in a simple format: a length (not counting the
5503 * length field itself), an action code (identifying the message type) and the
5504 * message contents (without the subxact TransactionId value).
5505 */
5506static void
5507stream_write_change(char action, StringInfo s)
5508{
5509 int len;
5510
5511 Assert(stream_fd != NULL);
5512
5513 /* total on-disk size, including the action type character */
5514 len = (s->len - s->cursor) + sizeof(char);
5515
5516 /* first write the size */
5517 BufFileWrite(stream_fd, &len, sizeof(len));
5518
5519 /* then the action */
5520 BufFileWrite(stream_fd, &action, sizeof(action));
5521
5522 /* and finally the remaining part of the buffer (after the XID) */
5523 len = (s->len - s->cursor);
5524
5526}
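The record layout written by stream_write_change (length, then action byte, then the remaining payload) can be sketched against an in-memory buffer. `DemoBuf`, `demo_buf_write` and `demo_write_change` are hypothetical stand-ins for the BufFile calls. The `cursor` argument plays the role of `s->cursor`, marking where the already-consumed XID ends.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical in-memory stand-in for the BufFile used by the worker. */
typedef struct DemoBuf
{
	unsigned char data[256];
	size_t		used;
} DemoBuf;

static void
demo_buf_write(DemoBuf *buf, const void *src, size_t n)
{
	assert(buf->used + n <= sizeof(buf->data));
	memcpy(buf->data + buf->used, src, n);
	buf->used += n;
}

/*
 * Append one record: an int length covering the action byte plus the
 * payload (but not the length field itself), the action byte, and then
 * the part of the message that follows the cursor.
 */
static void
demo_write_change(DemoBuf *buf, char action,
				  const char *msg, int msglen, int cursor)
{
	int			len;

	assert(cursor >= 0 && cursor <= msglen);

	len = (msglen - cursor) + (int) sizeof(char);
	demo_buf_write(buf, &len, sizeof(len));
	demo_buf_write(buf, &action, sizeof(action));
	demo_buf_write(buf, msg + cursor, (size_t) (msglen - cursor));
}
```

A reader of this format can consume the length first and then read exactly that many bytes to obtain the action code and payload together.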
5527
5528/*
5529 * stream_open_and_write_change
5530 * Serialize a message to a file for the given transaction.
5531 *
5532 * This function is similar to stream_write_change, except that it opens the
5533 * target file if it is not already open before writing the message, and closes
5534 * the file at the end.
5535 */
5536static void
5537stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
5538{
5540
5541 if (!stream_fd)
5542 stream_start_internal(xid, false);
5543
5544 stream_write_change(action, s);
5546}
5547
5548/*
5549 * Sets streaming options including replication slot name and origin start
5550 * position. Workers need these options for logical replication.
5551 */
5552void
5553set_stream_options(WalRcvStreamOptions *options,
5554 char *slotname,
5555 XLogRecPtr *origin_startpos)
5556{
5557 int server_version;
5558
5559 options->logical = true;
5560 options->startpoint = *origin_startpos;
5561 options->slotname = slotname;
5562
5564 options->proto.logical.proto_version =
5569
5570 options->proto.logical.publication_names = MySubscription->publications;
5571 options->proto.logical.binary = MySubscription->binary;
5572
5573 /*
5574 * Assign the appropriate option value for streaming option according to
5575 * the 'streaming' mode and the publisher's ability to support that mode.
5576 */
5577 if (server_version >= 160000 &&
5579 {
5580 options->proto.logical.streaming_str = "parallel";
5582 }
5583 else if (server_version >= 140000 &&
5585 {
5586 options->proto.logical.streaming_str = "on";
5588 }
5589 else
5590 {
5591 options->proto.logical.streaming_str = NULL;
5593 }
5594
5595 options->proto.logical.twophase = false;
5596 options->proto.logical.origin = pstrdup(MySubscription->origin);
5597}
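The version-gated choice of the streaming option string can be sketched as a pure function. The second half of each condition is not visible in this listing, so the sketch assumes it compares the subscription's requested streaming mode. `DemoStreamMode` and `demo_streaming_option` are hypothetical names.

```c
#include <stddef.h>

/* Hypothetical stand-in for the subscription's requested streaming mode. */
typedef enum DemoStreamMode
{
	DEMO_STREAM_OFF,
	DEMO_STREAM_ON,
	DEMO_STREAM_PARALLEL
} DemoStreamMode;

/*
 * Pick the option string to send to the publisher, downgrading when the
 * publisher is too old for the requested mode. NULL means no streaming.
 */
static const char *
demo_streaming_option(int server_version, DemoStreamMode requested)
{
	if (server_version >= 160000 && requested == DEMO_STREAM_PARALLEL)
		return "parallel";
	else if (server_version >= 140000 && requested != DEMO_STREAM_OFF)
		return "on";
	else
		return NULL;
}
```

Under this reading, a subscription asking for parallel streaming from a version 15 publisher falls back to "on", and any publisher older than version 14 gets no streaming at all.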
5598
5599/*
5600 * Cleanup the memory for subxacts and reset the related variables.
5601 */
5602static inline void
5603cleanup_subxact_info(void)
5613
5614/*
5615 * Common function to run the apply loop with error handling. Disable the
5616 * subscription, if necessary.
5617 *
5618 * Note that we don't handle FATAL errors, which are probably caused by
5619 * system resource errors and are not repeatable.
5620 */
5621void
5622start_apply(XLogRecPtr origin_startpos)
5623{
5624 PG_TRY();
5625 {
5627 }
5628 PG_CATCH();
5629 {
5630 /*
5631 * Reset the origin state to prevent the advancement of origin
5632 * progress if we fail to apply. Otherwise, this will result in
5633 * transaction loss as that transaction won't be sent again by the
5634 * server.
5635 */
5637
5640 else
5641 {
5642 /*
5643 * Report that the worker failed while applying changes. Abort the
5644 * current transaction so that the stats message is sent in an
5645 * idle state.
5646 */
5649
5650 PG_RE_THROW();
5651 }
5652 }
5653 PG_END_TRY();
5654}
5655
5656/*
5657 * Runs the leader apply worker.
5658 *
5659 * It sets up replication origin, streaming options and then starts streaming.
5660 */
5661static void
5662run_apply_worker(void)
5663{
5664 char originname[NAMEDATALEN];
5666 char *slotname = NULL;
5669 TimeLineID startpointTLI;
5670 char *err;
5671 bool must_use_password;
5672
5673 slotname = MySubscription->slotname;
5674
5675 /*
5676 * This shouldn't happen if the subscription is enabled, but guard against
5677 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5678 * slot is NULL.)
5679 */
5680 if (!slotname)
5681 ereport(ERROR,
5683 errmsg("subscription has no replication slot set")));
5684
5685 /* Setup replication origin tracking. */
5687 originname, sizeof(originname));
5690 if (!OidIsValid(originid))
5696
5697 /* Is the use of a password mandatory? */
5700
5702 true, must_use_password,
5704
5706 ereport(ERROR,
5708 errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5709 MySubscription->name, err)));
5710
5711 /*
5712 * We don't really use the output of identify_system for anything, but it does
5713 * some initializations on the upstream so let's still call it.
5714 */
5716
5718
5720
5721 /*
5722 * Even when the two_phase mode is requested by the user, it remains as
5723 * the tri-state PENDING until all tablesyncs have reached READY state.
5724 * Only then can it become ENABLED.
5725 *
5726 * Note: If the subscription has no tables then leave the state as
5727 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5728 * work.
5729 */
5732 {
5733 /* Start streaming with two_phase enabled */
5734 options.proto.logical.twophase = true;
5736
5738
5739 /*
5740 * Updating pg_subscription might involve TOAST table access, so
5741 * ensure we have a valid snapshot.
5742 */
5744
5749 }
5750 else
5751 {
5753 }
5754
5756 (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5761 "?")));
5762
5763 /* Run the main loop. */
5765}
5766
5767/*
5768 * Common initialization for leader apply worker, parallel apply worker,
5769 * tablesync worker and sequencesync worker.
5770 *
5771 * Initialize the database connection, in-memory subscription and necessary
5772 * config options.
5773 */
5774void
5776{
5778
5779 /* Run as replica session replication role. */
5780 SetConfigOption("session_replication_role", "replica",
5782
5783 /* Connect to our database. */
5786 0);
5787
5788 /*
5789 * Set always-secure search path, so malicious users can't redirect user
5790 * code (e.g. pg_index.indexprs).
5791 */
5792 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5793
5794 /* Load the subscription into persistent memory context. */
5796 "ApplyContext",
5800
5801 /*
5802 * Lock the subscription to prevent it from being concurrently dropped,
5803 * then re-verify its existence. After the initialization, the worker will
5804 * be terminated gracefully if the subscription is dropped.
5805 */
5809 if (!MySubscription)
5810 {
5811 ereport(LOG,
5812 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5814
5815 /* Ensure we remove no-longer-useful entry for worker's start time */
5818
5819 proc_exit(0);
5820 }
5821
5822 MySubscriptionValid = true;
5824
5825 if (!MySubscription->enabled)
5826 {
5827 ereport(LOG,
5828 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5829 MySubscription->name)));
5830
5832 }
5833
5834 /*
5835 * Restart the worker if retain_dead_tuples was enabled during startup.
5836 *
5837 * At this point, the replication slot used for conflict detection might
5838 * not exist yet, or could be dropped soon if the launcher perceives
5839 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5840 * oldest_nonremovable_xid when the slot is absent or at risk of being
5841 * dropped, a restart is initiated.
5842 *
5843 * The oldest_nonremovable_xid should be initialized only when the
5844 * subscription's retention is active before launching the worker. See
5845 * logicalrep_worker_launch.
5846 */
5847 if (am_leader_apply_worker() &&
5851 {
5852 ereport(LOG,
5853 errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5854 MySubscription->name, "retain_dead_tuples"));
5855
5857 }
5858
5859 /* Setup synchronous commit according to the user's wishes */
5860 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5862
5863 /* Change wal_receiver_timeout according to the user's wishes */
5865
5866 /*
5867 * Keep us informed about subscription or role changes. Note that the
5868 * role's superuser privilege can be revoked.
5869 */
5872 (Datum) 0);
5873
5876 (Datum) 0);
5877
5878 if (am_tablesync_worker())
5879 ereport(LOG,
5880 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5883 else if (am_sequencesync_worker())
5884 ereport(LOG,
5885 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
5887 else
5888 ereport(LOG,
5889 errmsg("logical replication apply worker for subscription \"%s\" has started",
5891
5893
5894 /*
5895 * Register a callback to reset the origin state before aborting any
5896 * pending transaction during shutdown (see ShutdownPostgres()). This will
5897 * avoid origin advancement for an incomplete transaction which could
5898 * otherwise lead to its loss as such a transaction won't be sent by the
5899 * server again.
5900 *
5901 * Note that even a LOG or DEBUG statement placed after setting the origin
5902 * state may process a shutdown signal before committing the current apply
5903 * operation. So, it is important to register such a callback here.
5904 *
5905 * Register this callback here to ensure that all types of logical
5906 * replication workers that set up origins and apply remote transactions
5907 * are protected.
5908 */
5910}
5911
5912/*
5913 * Callback on exit to clear transaction-level replication origin state.
5914 */
5915static void
5917{
5919}
5920
5921/*
5922 * Common function to set up the leader apply, tablesync and sequencesync worker.
5923 */
5924void
5926{
5927 /* Attach to slot */
5929
5931
5932 /* Setup signal handling */
5935
5936 /*
5937 * We don't currently need any ResourceOwner in a walreceiver process, but
5938 * if we did, we could call CreateAuxProcessResourceOwner here.
5939 */
5940
5941 /* Initialise stats to a sanish value */
5944
5945 /* Load the libpq-specific functions */
5946 load_file("libpqwalreceiver", false);
5947
5949
5950 /* Connect to the origin and start the replication. */
5951 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5953
5954 /*
5955 * Setup callback for syscache so that we know when something changes in
5956 * the subscription relation state.
5957 */
5960 (Datum) 0);
5961}
5962
5963/* Logical Replication Apply worker entry point */
5964void
5966{
5968
5970
5972
5974
5976
5977 proc_exit(0);
5978}
5979
5980/*
5981 * After error recovery, disable the subscription in a new transaction
5982 * and exit cleanly.
5983 */
5984void
5985DisableSubscriptionAndExit(void)
5986{
5987 /*
5988 * Emit the error message, and recover from the error state to an idle
5989 * state
5990 */
5992
5996
5998
5999 /*
6000 * Report that the worker failed during sequence synchronization, table
6001 * synchronization, or apply.
6002 */
6004
6005 /* Disable the subscription */
6007
6008 /*
6009 * Updating pg_subscription might involve TOAST table access, so ensure we
6010 * have a valid snapshot.
6011 */
6013
6017
6018 /* Ensure we remove no-longer-useful entry for worker's start time */
6021
6022 /* Report that the subscription has been disabled and exit */
6023 ereport(LOG,
6024 errmsg("subscription \"%s\" has been disabled because of an error",
6026
6027 /*
6028 * Skip the track_commit_timestamp check when disabling the worker due to
6029 * an error, as verifying commit timestamps is unnecessary in this
6030 * context.
6031 */
6035
6036 proc_exit(0);
6037}
6038
6039/*
6040 * Is current process a logical replication worker?
6041 */
6042bool
6043IsLogicalWorker(void)
6044{
6045 return MyLogicalRepWorker != NULL;
6046}
6047
6048/*
6049 * Is current process a logical replication parallel apply worker?
6050 */
6051bool
6052IsLogicalParallelApplyWorker(void)
6056
6057/*
6058 * Start skipping changes of the transaction if the given LSN matches the
6059 * LSN specified by subscription's skiplsn.
6060 */
6061static void
6063{
6067
6068 /*
6069 * Quick return if it's not requested to skip this transaction. This
6070 * function is called for every remote transaction and we assume that
6071 * skipping the transaction is not used often.
6072 */
6074 MySubscription->skiplsn != finish_lsn))
6075 return;
6076
6077 /* Start skipping all changes of this transaction */
6078 skip_xact_finish_lsn = finish_lsn;
6079
6080 ereport(LOG,
6081 errmsg("logical replication starts skipping transaction at LSN %X/%08X",
6083}
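The skip messages print an LSN with the `%X/%08X` pattern, which shows the upper and lower 32 bits of the position separately. A minimal sketch of that split follows, assuming the LSN is a 64-bit unsigned integer. `demo_format_lsn` is a hypothetical helper, not a PostgreSQL function.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Format a 64-bit LSN as "HIGH/LOW": the high half in plain hex and the
 * low half zero-padded to eight hex digits.
 */
static void
demo_format_lsn(uint64_t lsn, char *out, size_t outlen)
{
	snprintf(out, outlen, "%X/%08X",
			 (unsigned int) (lsn >> 32),
			 (unsigned int) (lsn & UINT64_C(0xFFFFFFFF)));
}
```

Zero-padding the low half keeps positions within the same high half aligned, so they are easy to compare by eye in a log.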
6084
6085/*
6086 * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
6087 */
6088static void
6089stop_skipping_changes(void)
6090{
6091 if (!is_skipping_changes())
6092 return;
6093
6094 ereport(LOG,
6095 errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6097
6098 /* Stop skipping changes */
6100}
6101
6102/*
6103 * Clear subskiplsn of pg_subscription catalog.
6104 *
6105 * finish_lsn is the transaction's finish LSN, which is checked against
6106 * subskiplsn. If they do not match, we raise a warning when clearing
6107 * subskiplsn, to alert users to cases such as a mistakenly specified
6108 * subskiplsn.
6109 */
6110static void
6111clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
6112{
6113 Relation rel;
6115 HeapTuple tup;
6117 bool started_tx = false;
6118
6120 return;
6121
6122 if (!IsTransactionState())
6123 {
6125 started_tx = true;
6126 }
6127
6128 /*
6129 * Updating pg_subscription might involve TOAST table access, so ensure we
6130 * have a valid snapshot.
6131 */
6133
6134 /*
6135 * Protect subskiplsn of pg_subscription from being concurrently updated
6136 * while clearing it.
6137 */
6140
6142
6143 /* Fetch the existing tuple. */
6146
6147 if (!HeapTupleIsValid(tup))
6148 elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6149
6151
6152 /*
6153 * Clear the subskiplsn. If the user has already changed subskiplsn before
6154 * clearing it we don't update the catalog and the replication origin
6155 * state won't get advanced. So in the worst case, if the server crashes
6156 * before sending an acknowledgment of the flush position the transaction
6157 * will be sent again and the user needs to set subskiplsn again. We can
6158 * reduce the possibility by logging a replication origin WAL record to
6159 * advance the origin LSN instead but there is no way to advance the
6160 * origin timestamp and it doesn't seem to be worth doing anything about
6161 * it since it's a very rare case.
6162 */
6163 if (subform->subskiplsn == myskiplsn)
6164 {
6165 bool nulls[Natts_pg_subscription];
6168
6169 memset(values, 0, sizeof(values));
6170 memset(nulls, false, sizeof(nulls));
6171 memset(replaces, false, sizeof(replaces));
6172
6173 /* reset subskiplsn */
6176
6178 replaces);
6179 CatalogTupleUpdate(rel, &tup->t_self, tup);
6180
6181 if (myskiplsn != finish_lsn)
6183 errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6184 errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6185 LSN_FORMAT_ARGS(finish_lsn),
6187 }
6188
6190 table_close(rel, NoLock);
6191
6193
6194 if (started_tx)
6196}
6197
6198/* Error callback to give more context info about the change being applied */
6199void
6201{
6203
6205 return;
6206
6207 Assert(errarg->origin_name);
6208
6209 if (errarg->rel == NULL)
6210 {
6211 if (!TransactionIdIsValid(errarg->remote_xid))
6212 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6213 errarg->origin_name,
6215 else if (!XLogRecPtrIsValid(errarg->finish_lsn))
6216 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6217 errarg->origin_name,
6219 errarg->remote_xid);
6220 else
6221 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6222 errarg->origin_name,
6224 errarg->remote_xid,
6225 LSN_FORMAT_ARGS(errarg->finish_lsn));
6226 }
6227 else
6228 {
6229 if (errarg->remote_attnum < 0)
6230 {
6231 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6232 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6233 errarg->origin_name,
6235 errarg->rel->remoterel.nspname,
6236 errarg->rel->remoterel.relname,
6237 errarg->remote_xid);
6238 else
6239 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6240 errarg->origin_name,
6242 errarg->rel->remoterel.nspname,
6243 errarg->rel->remoterel.relname,
6244 errarg->remote_xid,
6245 LSN_FORMAT_ARGS(errarg->finish_lsn));
6246 }
6247 else
6248 {
6249 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6250 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6251 errarg->origin_name,
6253 errarg->rel->remoterel.nspname,
6254 errarg->rel->remoterel.relname,
6255 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6256 errarg->remote_xid);
6257 else
6258 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6259 errarg->origin_name,
6261 errarg->rel->remoterel.nspname,
6262 errarg->rel->remoterel.relname,
6263 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6264 errarg->remote_xid,
6265 LSN_FORMAT_ARGS(errarg->finish_lsn));
6266 }
6267 }
6268}
6269
6270/* Set transaction information of apply error callback */
6271static inline void
6272set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
6277
6278/* Reset all information of apply error callback */
6279static inline void
6287
6288/*
6289 * Request wakeup of the workers for the given subscription OID
6290 * at commit of the current transaction.
6291 *
6292 * This is used to ensure that the workers process assorted changes
6293 * as soon as possible.
6294 */
6295void
6296LogicalRepWorkersWakeupAtCommit(Oid subid)
6305
6306/*
6307 * Wake up the workers of any subscriptions that were changed in this xact.
6308 */
6309void
6310AtEOXact_LogicalRepWorkers(bool isCommit)
6311{
6313 {
6314 ListCell *lc;
6315
6318 {
6319 Oid subid = lfirst_oid(lc);
6320 List *workers;
6321 ListCell *lc2;
6322
6323 workers = logicalrep_workers_find(subid, true, false);
6324 foreach(lc2, workers)
6325 {
6327
6329 }
6330 }
6332 }
6333
6334 /* The List storage will be reclaimed automatically in xact cleanup. */
6336}
6337
6338/*
6339 * Allocate the origin name in long-lived context for error context message.
6340 */
6341void
6347
6348/*
6349 * Return the action to be taken for the given transaction. See
6350 * TransApplyAction for information on each of the actions.
6351 *
6352 * *winfo is assigned to the destination parallel worker info when the leader
6353 * apply worker has to pass all the transaction's changes to the parallel
6354 * apply worker.
6355 */
6356static TransApplyAction
6357get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
6358{
6359 *winfo = NULL;
6360
6362 {
6363 return TRANS_PARALLEL_APPLY;
6364 }
6365
6366 /*
6367 * If we are processing this transaction using a parallel apply worker
6368 * then either we send the changes to the parallel worker or if the worker
6369 * is busy then serialize the changes to the file which will later be
6370 * processed by the parallel worker.
6371 */
6372 *winfo = pa_find_worker(xid);
6373
6374 if (*winfo && (*winfo)->serialize_changes)
6375 {
6377 }
6378 else if (*winfo)
6379 {
6381 }
6382
6383 /*
6384 * If there is no parallel worker involved to process this transaction
6385 * then we either directly apply the change or serialize it to a file
6386 * which will later be applied when the transaction finish message is
6387 * processed.
6388 */
6389 else if (in_streamed_transaction)
6390 {
6392 }
6393 else
6394 {
6395 return TRANS_LEADER_APPLY;
6396 }
6397}
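The decision chain in get_transaction_apply_action can be sketched as a pure function over four booleans. Several conditions and return statements are not visible in this listing, so the mapping below is an assumption drawn from the surrounding comments and the action names. `DemoApplyAction` and `demo_apply_action` are hypothetical.

```c
#include <stdbool.h>

typedef enum DemoApplyAction
{
	DEMO_LEADER_APPLY,				/* apply directly in the leader */
	DEMO_LEADER_SERIALIZE,			/* leader writes changes to a file */
	DEMO_LEADER_SEND_TO_PARALLEL,	/* leader forwards to a parallel worker */
	DEMO_LEADER_PARTIAL_SERIALIZE,	/* worker busy: spill to a file for it */
	DEMO_PARALLEL_APPLY				/* we are the parallel worker */
} DemoApplyAction;

/* Checks are ordered from most specific to the plain-apply fallback. */
static DemoApplyAction
demo_apply_action(bool am_parallel_worker,
				  bool has_parallel_worker,
				  bool worker_serializes,
				  bool in_streamed_xact)
{
	if (am_parallel_worker)
		return DEMO_PARALLEL_APPLY;
	if (has_parallel_worker && worker_serializes)
		return DEMO_LEADER_PARTIAL_SERIALIZE;
	if (has_parallel_worker)
		return DEMO_LEADER_SEND_TO_PARALLEL;
	if (in_streamed_xact)
		return DEMO_LEADER_SERIALIZE;
	return DEMO_LEADER_APPLY;
}
```

The order matters: the serialize-for-worker case must be tested before the plain send case, since both require that a parallel worker was found for the transaction.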
Definition worker.c:483
static void apply_handle_stream_abort(StringInfo s)
Definition worker.c:2074
static void apply_handle_relation(StringInfo s)
Definition worker.c:2566
void set_apply_error_context_origin(char *originname)
Definition worker.c:6342
static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4558
MemoryContext ApplyContext
Definition worker.c:473
static void subxact_info_write(Oid subid, TransactionId xid)
Definition worker.c:5221
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition worker.c:2604
static void apply_handle_prepare(StringInfo s)
Definition worker.c:1334
static void apply_handle_rollback_prepared(StringInfo s)
Definition worker.c:1460
void SetupApplyOrSyncWorker(int worker_slot)
Definition worker.c:5925
static void apply_handle_stream_stop(StringInfo s)
Definition worker.c:1888
static void apply_handle_origin(StringInfo s)
Definition worker.c:1669
static void request_publisher_status(RetainDeadTuplesData *rdt_data)
Definition worker.c:4519
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition worker.c:4300
static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
Definition worker.c:4924
static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4426
static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4390
WalReceiverConn * LogRepWorkerWalRcvConn
Definition worker.c:478
static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition worker.c:4845
static XLogRecPtr remote_final_lsn
Definition worker.c:486
static bool MySubscriptionValid
Definition worker.c:481
void apply_error_callback(void *arg)
Definition worker.c:6200
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition worker.c:3942
static MemoryContext LogicalStreamingContext
Definition worker.c:476
void maybe_reread_subscription(void)
Definition worker.c:5041
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition worker.c:2506
void InitializeLogRepWorker(void)
Definition worker.c:5775
static void set_wal_receiver_timeout(void)
Definition worker.c:5173
static bool in_streamed_transaction
Definition worker.c:489
static void apply_handle_begin_prepare(StringInfo s)
Definition worker.c:1268
void ApplyWorkerMain(Datum main_arg)
Definition worker.c:5965
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition worker.c:2263
static void apply_handle_stream_start(StringInfo s)
Definition worker.c:1728
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition worker.c:6062
static void on_exit_clear_xact_state(int code, Datum arg)
Definition worker.c:5916
static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition worker.c:4807
Subscription * MySubscription
Definition worker.c:480
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition worker.c:1297
static void stream_close_file(void)
Definition worker.c:5489
static TransactionId stream_xid
Definition worker.c:491
static void apply_handle_insert(StringInfo s)
Definition worker.c:2636
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition worker.c:962
static bool FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
Definition worker.c:3272
static void subxact_info_read(Oid subid, TransactionId xid)
Definition worker.c:5270
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition worker.c:3109
static void reset_apply_error_context_info(void)
Definition worker.c:6280
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1757
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1609
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
@ STATE_IDLEINTRANSACTION
@ STATE_RUNNING
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:933
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:893
Bitmapset * bms_make_singleton(int x)
Definition bitmapset.c:216
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
static Datum values[MAXATTR]
Definition bootstrap.c:147
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition buffile.c:291
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition buffile.c:654
int BufFileSeek(BufFile *file, int fileno, pgoff_t offset, int whence)
Definition buffile.c:740
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition buffile.c:676
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition buffile.c:664
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition buffile.c:267
void BufFileTruncateFileSet(BufFile *file, int fileno, pgoff_t offset)
Definition buffile.c:927
void BufFileTell(BufFile *file, int *fileno, pgoff_t *offset)
Definition buffile.c:832
void BufFileClose(BufFile *file)
Definition buffile.c:412
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition buffile.c:364
#define Min(x, y)
Definition c.h:1019
#define likely(x)
Definition c.h:423
#define Assert(condition)
Definition c.h:885
int64_t int64
Definition c.h:555
uint64_t uint64
Definition c.h:559
uint32_t uint32
Definition c.h:558
#define pg_fallthrough
Definition c.h:144
uint32 TransactionId
Definition c.h:678
#define OidIsValid(objectId)
Definition c.h:800
size_t Size
Definition c.h:631
bool track_commit_timestamp
Definition commit_ts.c:109
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, ReplOriginId *localorigin, TimestampTz *localts)
Definition conflict.c:63
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition conflict.c:104
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition conflict.c:139
ConflictType
Definition conflict.h:32
@ CT_UPDATE_DELETED
Definition conflict.h:43
@ CT_DELETE_MISSING
Definition conflict.h:52
@ CT_UPDATE_ORIGIN_DIFFERS
Definition conflict.h:37
@ CT_UPDATE_MISSING
Definition conflict.h:46
@ CT_DELETE_ORIGIN_DIFFERS
Definition conflict.h:49
int64 TimestampTz
Definition timestamp.h:39
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
Datum arg
Definition elog.c:1322
void EmitErrorReport(void)
Definition elog.c:1882
ErrorContextCallback * error_context_stack
Definition elog.c:99
void FlushErrorState(void)
Definition elog.c:2062
int errcode(int sqlerrcode)
Definition elog.c:874
int errmsg(const char *fmt,...)
Definition elog.c:1093
#define LOG
Definition elog.h:31
#define PG_RE_THROW()
Definition elog.h:405
int errdetail_internal(const char *fmt,...)
#define errcontext
Definition elog.h:198
int errdetail(const char *fmt,...)
int errmsg_internal(const char *fmt,...)
#define PG_TRY(...)
Definition elog.h:372
#define WARNING
Definition elog.h:36
#define DEBUG2
Definition elog.h:29
#define PG_END_TRY(...)
Definition elog.h:397
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define PG_CATCH(...)
Definition elog.h:382
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223
void err(int eval, const char *fmt,...)
Definition err.c:43
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition execExpr.c:143
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition execMain.c:1860
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
Definition execMain.c:2722
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition execMain.c:1247
void EvalPlanQualEnd(EPQState *epqstate)
Definition execMain.c:3183
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition execUtils.c:1326
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
Definition execUtils.c:773
void FreeExecutorState(EState *estate)
Definition execUtils.c:192
EState * CreateExecutorState(void)
Definition execUtils.c:88
#define GetPerTupleExprContext(estate)
Definition executor.h:656
#define GetPerTupleMemoryContext(estate)
Definition executor.h:661
#define EvalPlanQualSetSlot(epqstate, slot)
Definition executor.h:289
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition executor.h:393
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
#define palloc_object(type)
Definition fe_memutils.h:74
#define palloc0_object(type)
Definition fe_memutils.h:75
void FileSetInit(FileSet *fileset)
Definition fileset.c:52
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition fmgr.c:1772
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition fmgr.c:1754
struct Latch * MyLatch
Definition globals.c:63
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
bool parse_int(const char *value, int *result, int flags, const char **hintmsg)
Definition guc.c:2743
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition guc.c:4196
@ PGC_S_OVERRIDE
Definition guc.h:123
@ PGC_S_SESSION
Definition guc.h:126
@ PGC_SUSET
Definition guc.h:78
@ PGC_SIGHUP
Definition guc.h:75
@ PGC_BACKEND
Definition guc.h:77
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
static void * GETSTRUCT(const HeapTupleData *tuple)
static void dlist_delete(dlist_node *node)
Definition ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition ilist.h:336
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition ilist.h:364
#define DLIST_STATIC_INIT(name)
Definition ilist.h:281
#define dlist_container(type, membername, ptr)
Definition ilist.h:593
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:133
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
long val
Definition informix.c:689
#define write(a, b, c)
Definition win32.h:14
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void AcceptInvalidationMessages(void)
Definition inval.c:930
void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)
Definition inval.c:1816
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
void proc_exit(int code)
Definition ipc.c:105
int i
Definition isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
void ResetLatch(Latch *latch)
Definition latch.c:374
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:293
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition launcher.c:746
void logicalrep_worker_attach(int slot)
Definition launcher.c:757
void ApplyLauncherWakeup(void)
Definition launcher.c:1194
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition launcher.c:258
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:723
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:56
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1154
List * lappend(List *list, void *datum)
Definition list.c:339
List * lappend_oid(List *list, Oid datum)
Definition list.c:375
List * list_append_unique_oid(List *list, Oid datum)
Definition list.c:1380
bool list_member_oid(const List *list, Oid datum)
Definition list.c:722
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
int LOCKMODE
Definition lockdefs.h:26
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define AccessShareLock
Definition lockdefs.h:36
#define RowExclusiveLock
Definition lockdefs.h:38
@ LockTupleExclusive
Definition lockoptions.h:59
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
#define LOGICALREP_COLUMN_UNCHANGED
LogicalRepMsgType
@ LOGICAL_REP_MSG_INSERT
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_BEGIN
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_COMMIT
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_TYPE
@ LOGICAL_REP_MSG_DELETE
@ LOGICAL_REP_MSG_STREAM_COMMIT
@ LOGICAL_REP_MSG_ORIGIN
@ LOGICAL_REP_MSG_UPDATE
uint32 LogicalRepRelId
#define LOGICALREP_PROTO_VERSION_NUM
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
char * get_rel_name(Oid relid)
Definition lsyscache.c:2078
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition lsyscache.c:3026
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3518
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition lsyscache.c:3092
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_SHARED
Definition lwlock.h:113
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1768
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
MemoryContext TopTransactionContext
Definition mcxt.c:171
char * pstrdup(const char *in)
Definition mcxt.c:1781
void * repalloc(void *pointer, Size size)
Definition mcxt.c:1632
void pfree(void *pointer)
Definition mcxt.c:1616
MemoryContext TopMemoryContext
Definition mcxt.c:166
void * palloc(Size size)
Definition mcxt.c:1387
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define RESUME_INTERRUPTS()
Definition miscadmin.h:136
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define HOLD_INTERRUPTS()
Definition miscadmin.h:134
Oid GetUserId(void)
Definition miscinit.c:469
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:988
CmdType
Definition nodes.h:273
@ CMD_INSERT
Definition nodes.h:277
@ CMD_DELETE
Definition nodes.h:278
@ CMD_UPDATE
Definition nodes.h:276
#define makeNode(_type_)
Definition nodes.h:161
ObjectType get_relkind_objtype(char relkind)
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:262
ReplOriginXactState replorigin_xact_state
Definition origin.c:166
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:231
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1328
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1352
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1146
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
#define ACL_DELETE
Definition parsenodes.h:79
uint64 AclMode
Definition parsenodes.h:74
#define ACL_INSERT
Definition parsenodes.h:76
#define ACL_UPDATE
Definition parsenodes.h:78
@ RTE_RELATION
@ DROP_RESTRICT
#define ACL_SELECT
Definition parsenodes.h:77
#define ACL_TRUNCATE
Definition parsenodes.h:80
int16 attnum
FormData_pg_attribute * Form_pg_attribute
static uint32 pg_ceil_log2_32(uint32 num)
static PgChecksumMode mode
#define NAMEDATALEN
#define MAXPGPATH
const void size_t len
static int server_version
Definition pg_dumpall.c:121
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
#define lfirst(lc)
Definition pg_list.h:172
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:212
static void * list_nth(const List *list, int n)
Definition pg_list.h:299
#define lfirst_oid(lc)
Definition pg_list.h:174
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
void FreeSubscription(Subscription *sub)
void DisableSubscription(Oid subid)
void UpdateDeadTupleRetentionStatus(Oid subid, bool active)
Subscription * GetSubscription(Oid subid, bool missing_ok)
FormData_pg_subscription * Form_pg_subscription
static char buf[DEFAULT_XLOG_SEG_SIZE]
long pgstat_report_stat(bool force)
Definition pgstat.c:704
void pgstat_report_subscription_error(Oid subid)
int64 timestamp
Expr * expression_planner(Expr *expr)
Definition planner.c:6819
#define pqsignal
Definition port.h:547
int pgsocket
Definition port.h:29
#define snprintf
Definition port.h:260
#define PGINVALID_SOCKET
Definition port.h:31
off_t pgoff_t
Definition port.h:421
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
static int32 DatumGetInt32(Datum X)
Definition postgres.h:212
#define InvalidOid
unsigned int Oid
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition pqformat.c:414
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
char * c
static int fd(const char *x, int i)
static int fb(int x)
char * s2
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition procarray.c:2836
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition proto.c:98
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition proto.c:561
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition proto.c:325
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition proto.c:134
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition proto.c:757
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition proto.c:487
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition proto.c:615
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition proto.c:1187
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition proto.c:63
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition proto.c:267
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition proto.c:698
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition proto.c:1212
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition proto.c:365
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition proto.c:1132
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition proto.c:428
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition proto.c:228
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition proto.c:1082
#define PqReplMsg_WALData
Definition protocol.h:77
#define PqReplMsg_PrimaryStatusRequest
Definition protocol.h:83
#define PqReplMsg_Keepalive
Definition protocol.h:75
#define PqReplMsg_PrimaryStatusUpdate
Definition protocol.h:76
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84
static color newsub(struct colormap *cm, color co)
Definition regc_color.c:389
#define RelationGetRelid(relation)
Definition rel.h:514
#define RelationIsLogicallyLogged(relation)
Definition rel.h:710
#define RelationGetDescr(relation)
Definition rel.h:540
#define RelationGetRelationName(relation)
Definition rel.h:548
#define RELATION_IS_OTHER_TEMP(relation)
Definition rel.h:667
#define RelationGetNamespace(relation)
Definition rel.h:555
List * RelationGetIndexList(Relation relation)
Definition relcache.c:4831
ResourceOwner TopTransactionResourceOwner
Definition resowner.c:175
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
Node * build_column_default(Relation rel, int attrno)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition rls.c:52
@ RLS_ENABLED
Definition rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition relation.c:585
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition relation.c:647
bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
Definition relation.c:835
Oid GetRelationIdentityOrPK(Relation rel)
Definition relation.c:905
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition relation.c:165
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition relation.c:518
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition relation.c:362
StringInfo makeStringInfo(void)
Definition stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition stringinfo.h:157
TransactionId remote_xid
Definition worker.c:331
LogicalRepMsgType command
Definition worker.c:326
XLogRecPtr finish_lsn
Definition worker.c:332
LogicalRepRelMapEntry * rel
Definition worker.c:327
ResultRelInfo * targetRelInfo
Definition worker.c:316
EState * estate
Definition worker.c:313
PartitionTupleRouting * proute
Definition worker.c:320
ModifyTableState * mtstate
Definition worker.c:319
LogicalRepRelMapEntry * targetRel
Definition worker.c:315
uint32 nsubxacts
Definition worker.c:540
uint32 nsubxacts_max
Definition worker.c:541
SubXactInfo * subxacts
Definition worker.c:543
TransactionId subxact_last
Definition worker.c:542
int maplen
Definition attmap.h:37
AttrNumber * attnums
Definition attmap.h:36
List * es_rteperminfos
Definition execnodes.h:671
List * es_tupleTable
Definition execnodes.h:715
List * es_opened_result_relations
Definition execnodes.h:691
CommandId es_output_cid
Definition execnodes.h:685
struct ErrorContextCallback * previous
Definition elog.h:297
void(* callback)(void *arg)
Definition elog.h:298
dlist_node node
Definition worker.c:304
XLogRecPtr remote_end
Definition worker.c:306
XLogRecPtr local_end
Definition worker.c:305
List
Definition pg_list.h:54
LogicalRepRelation remoterel
TimestampTz last_recv_time
LogicalRepWorkerType type
TimestampTz reply_time
FileSet * stream_fileset
TransactionId oldest_nonremovable_xid
TimestampTz last_send_time
ResultRelInfo * resultRelInfo
Definition execnodes.h:1411
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
Plan * plan
Definition execnodes.h:1168
EState * state
Definition execnodes.h:1170
Form_pg_class rd_rel
Definition rel.h:111
ReplOriginId origin
Definition origin.h:45
XLogRecPtr origin_lsn
Definition origin.h:46
TimestampTz origin_timestamp
Definition origin.h:47
TimestampTz flushpos_update_time
Definition worker.c:433
FullTransactionId remote_oldestxid
Definition worker.c:413
FullTransactionId remote_wait_for
Definition worker.c:429
TimestampTz last_recv_time
Definition worker.c:444
TimestampTz candidate_xid_time
Definition worker.c:445
long table_sync_wait_time
Definition worker.c:437
FullTransactionId remote_nextxid
Definition worker.c:420
RetainDeadTuplesPhase phase
Definition worker.c:404
XLogRecPtr remote_lsn
Definition worker.c:405
TimestampTz reply_time
Definition worker.c:422
TransactionId candidate_xid
Definition worker.c:431
TransactionId xid
Definition worker.c:532
pgoff_t offset
Definition worker.c:534
int fileno
Definition worker.c:533
XLogRecPtr skiplsn
AttrMap * attrMap
Definition tupconvert.h:28
TupleDesc tts_tupleDescriptor
Definition tuptable.h:122
bool * tts_isnull
Definition tuptable.h:126
Datum * tts_values
Definition tuptable.h:124
dlist_node * cur
Definition ilist.h:200
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
void ProcessSyncingRelations(XLogRecPtr current_lsn)
Definition syncutils.c:156
void InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
Definition syncutils.c:101
#define FirstLowInvalidHeapAttributeNumber
Definition sysattr.h:27
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:220
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition tableam.c:92
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
Definition tablecmds.c:1991
bool AllTablesyncsReady(void)
Definition tablesync.c:1596
bool HasSubscriptionTablesCached(void)
Definition tablesync.c:1626
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition tablesync.c:1647
#define InvalidTransactionId
Definition transam.h:31
#define FullTransactionIdPrecedesOrEquals(a, b)
Definition transam.h:52
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
static FullTransactionId FullTransactionIdFromU64(uint64 value)
Definition transam.h:81
#define TransactionIdEquals(id1, id2)
Definition transam.h:43
#define TransactionIdIsValid(xid)
Definition transam.h:41
#define InvalidFullTransactionId
Definition transam.h:56
#define FullTransactionIdIsValid(x)
Definition transam.h:55
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
void AfterTriggerEndQuery(EState *estate)
Definition trigger.c:5135
void AfterTriggerBeginQuery(void)
Definition trigger.c:5115
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition tupconvert.c:103
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition tupconvert.c:193
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:160
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:175
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:457
static void slot_getallattrs(TupleTableSlot *slot)
Definition tuptable.h:371
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition tuptable.h:524
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
Definition twophase.c:2747
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition twophase.c:2688
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition twophase.c:1497
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition usercontext.c:87
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
const char * type
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
static StringInfoData reply_message
int wal_receiver_status_interval
Definition walreceiver.c:89
int wal_receiver_timeout
Definition walreceiver.c:90
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_send(conn, buffer, nbytes)
#define walrcv_server_version(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_receive(conn, buffer, wait_fd)
int WalWriterDelay
Definition walwriter.c:70
#define SIGHUP
Definition win32_port.h:158
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED
static bool am_parallel_apply_worker(void)
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_SEQUENCESYNC
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
@ FS_SERIALIZE_DONE
static bool am_sequencesync_worker(void)
static bool am_tablesync_worker(void)
static bool am_leader_apply_worker(void)
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5011
bool PrepareTransactionBlock(const char *gid)
Definition xact.c:4014
bool IsTransactionState(void)
Definition xact.c:388
void CommandCounterIncrement(void)
Definition xact.c:1101
void StartTransactionCommand(void)
Definition xact.c:3080
void SetCurrentStatementStartTimestamp(void)
Definition xact.c:915
bool IsTransactionBlock(void)
Definition xact.c:4993
void BeginTransactionBlock(void)
Definition xact.c:3946
void CommitTransactionCommand(void)
Definition xact.c:3178
bool EndTransactionBlock(bool chain)
Definition xact.c:4066
void AbortOutOfAnyTransaction(void)
Definition xact.c:4884
CommandId GetCurrentCommandId(bool used)
Definition xact.c:830
#define GIDSIZE
Definition xact.h:31
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6625
XLogRecPtr XactLastCommitEnd
Definition xlog.c:258
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
uint32 TimeLineID
Definition xlogdefs.h:63