PostgreSQL Source Code git master
worker.c
1/*-------------------------------------------------------------------------
2 * worker.c
3 * PostgreSQL logical replication worker (apply)
4 *
5 * Copyright (c) 2016-2026, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/worker.c
9 *
10 * NOTES
11 * This file contains the worker which applies logical changes as they come
12 * from the remote logical replication stream.
13 *
14 * The main worker (apply) is started by logical replication worker
15 * launcher for every enabled subscription in a database. It uses
16 * walsender protocol to communicate with publisher.
17 *
18 * This module includes server facing code and shares libpqwalreceiver
19 * module with walreceiver for providing the libpq specific functionality.
20 *
21 *
22 * STREAMED TRANSACTIONS
23 * ---------------------
24 * Streamed transactions (large transactions exceeding a memory limit on the
25 * upstream) are applied using one of two approaches:
26 *
27 * 1) Write to temporary files and apply when the final commit arrives
28 *
29 * This approach is used when the user has set the subscription's streaming
30 * option as on.
31 *
32 * Unlike the regular (non-streamed) case, handling streamed transactions has
33 * to handle aborts of both the toplevel transaction and subtransactions. This
34 * is achieved by tracking offsets for subtransactions, which is then used
35 * to truncate the file with serialized changes.
36 *
37 * The files are placed in tmp file directory by default, and the filenames
38 * include both the XID of the toplevel transaction and OID of the
39 * subscription. This is necessary so that different workers processing a
40 * remote transaction with the same XID don't interfere.
41 *
42 * We use BufFiles instead of using normal temporary files because (a) the
43 * BufFile infrastructure supports temporary files that exceed the OS file size
44 * limit, (b) it provides automatic cleanup on error, and (c) it lets these
45 * files survive across local transactions, so that they can be opened and
46 * closed at stream start and stop. We decided to use FileSet
47 * infrastructure as without that it deletes the files on the closure of the
48 * file and if we decide to keep stream files open across the start/stop stream
49 * then it will consume a lot of memory (more than 8K for each BufFile and
50 * there could be multiple such BufFiles as the subscriber could receive
51 * multiple start/stop streams for different transactions before getting the
52 * commit). Moreover, if we don't use FileSet then we also need to invent
53 * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 * the desired file across multiple stream-open calls for the same
55 * transaction.
56 *
57 * 2) Parallel apply workers.
58 *
59 * This approach is used when the user has set the subscription's streaming
60 * option as parallel. See logical/applyparallelworker.c for information about
61 * this approach.
62 *
63 * TWO_PHASE TRANSACTIONS
64 * ----------------------
65 * Two phase transactions are replayed at prepare and then committed or
66 * rolled back at commit prepared and rollback prepared respectively. It is
67 * possible to have a prepared transaction that arrives at the apply worker
68 * when the tablesync is busy doing the initial copy. In this case, the apply
69 * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 * is still busy (see the condition of should_apply_changes_for_rel). The
71 * tablesync worker might not get such a prepared transaction because say it
72 * was prior to the initial consistent point but might have got some later
73 * commits. Now, the tablesync worker will exit without doing anything for the
74 * prepared transaction skipped by the apply worker as the sync location for it
75 * will be already ahead of the apply worker's current location. This would lead
76 * to an "empty prepare", because later when the apply worker does the commit
77 * prepare, there is nothing in it (the inserts were skipped earlier).
78 *
79 * To avoid this and similar prepare confusions, the subscription's two_phase
80 * commit is enabled only after the initial sync is over. The two_phase option
81 * has been implemented as a tri-state with values DISABLED, PENDING, and
82 * ENABLED.
83 *
84 * Even if the user specifies they want a subscription with two_phase = on,
85 * internally it will start with a tri-state of PENDING which only becomes
86 * ENABLED after all tablesync initializations are completed - i.e. when all
87 * tablesync workers have reached their READY state. In other words, the value
88 * PENDING is only a temporary state for subscription start-up.
89 *
90 * Until the two_phase is properly available (ENABLED) the subscription will
91 * behave as if two_phase = off. When the apply worker detects that all
92 * tablesyncs have become READY (while the tri-state was PENDING) it will
93 * restart the apply worker process. This happens in
94 * ProcessSyncingTablesForApply.
95 *
96 * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 * two_phase tri-state of PENDING it starts streaming messages with the
98 * two_phase option which in turn enables the decoding of two-phase commits at
99 * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 * Now, it is possible that during the time we have not enabled two_phase, the
101 * publisher (replication server) would have skipped some prepares but we
102 * ensure that such prepares are sent along with commit prepare, see
103 * ReorderBufferFinishPrepared.
104 *
105 * If the subscription has no tables then a two_phase tri-state PENDING is
106 * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 * PUBLICATION which might otherwise be disallowed (see below).
108 *
109 * If ever a user needs to be aware of the tri-state value, they can fetch it
110 * from the pg_subscription catalog (see column subtwophasestate).
111 *
112 * Finally, to avoid problems mentioned in previous paragraphs from any
113 * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
114 * to 'off' and then again back to 'on') there is a restriction for
115 * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 * the two_phase tri-state is ENABLED, except when copy_data = false.
117 *
118 * We can get prepare of the same GID more than once for the genuine cases
119 * where we have defined multiple subscriptions for publications on the same
120 * server and prepared transaction has operations on tables subscribed to those
121 * subscriptions. For such cases, if we use the GID sent by publisher one of
122 * the prepares will be successful and others will fail, in which case the
123 * server will send them again. Now, this can lead to a deadlock if user has
124 * set synchronous_standby_names for all the subscriptions on subscriber. To
125 * avoid such deadlocks, we generate a unique GID (consisting of the
126 * subscription oid and the xid of the prepared transaction) for each prepare
127 * transaction on the subscriber.
128 *
129 * FAILOVER
130 * --------
131 * The logical slot on the primary can be synced to the standby by specifying
132 * failover = true when creating the subscription. Enabling failover allows us
133 * to smoothly transition to the promoted standby, ensuring that we can
134 * subscribe to the new primary without losing any data.
135 *
136 * RETAIN DEAD TUPLES
137 * ------------------
138 * Each apply worker that has enabled the retain_dead_tuples option maintains a
139 * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
140 * prevent dead rows from being removed prematurely when the apply worker still
141 * needs them to detect update_deleted conflicts. Additionally, this helps to
142 * retain the required commit_ts module information, which further helps to
143 * detect update_origin_differs and delete_origin_differs conflicts reliably, as
144 * otherwise, vacuum freeze could remove the required information.
145 *
146 * The logical replication launcher manages an internal replication slot named
147 * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 * transaction ID from all apply workers to determine the appropriate xmin for
149 * the slot, thereby retaining necessary tuples.
150 *
151 * The non-removable transaction ID in the apply worker is advanced to the
152 * oldest running transaction ID once all concurrent transactions on the
153 * publisher have been applied and flushed locally. The process involves:
154 *
155 * - RDT_GET_CANDIDATE_XID:
156 * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 * candidate xid.
158 *
159 * - RDT_REQUEST_PUBLISHER_STATUS:
160 * Send a message to the walsender requesting the publisher status, which
161 * includes the latest WAL write position and information about transactions
162 * that are in the commit phase.
163 *
164 * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 * Wait for the status from the walsender. After receiving the first status,
166 * do not proceed if there are concurrent remote transactions that are still
167 * in the commit phase. These transactions might have been assigned an
168 * earlier commit timestamp but have not yet written the commit WAL record.
169 * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 * until all these transactions have completed.
171 *
172 * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 * Advance the non-removable transaction ID if the current flush location has
174 * reached or surpassed the last received WAL position.
175 *
176 * - RDT_STOP_CONFLICT_INFO_RETENTION:
177 * This phase is required only when max_retention_duration is defined. We
178 * enter this phase if the wait time in either the
179 * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
180 * configured max_retention_duration. In this phase,
181 * pg_subscription.subretentionactive is updated to false within a new
182 * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
183 *
184 * - RDT_RESUME_CONFLICT_INFO_RETENTION:
185 * This phase is required only when max_retention_duration is defined. We
186 * enter this phase if the retention was previously stopped, and the time
187 * required to advance the non-removable transaction ID in the
188 * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
189 * (or if max_retention_duration is set to 0). During this phase,
190 * pg_subscription.subretentionactive is updated to true within a new
191 * transaction, and the worker will be restarted.
192 *
193 * The overall state progression is: GET_CANDIDATE_XID ->
194 * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
195 * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
196 * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
197 *
198 * Retaining the dead tuples for this period is sufficient for ensuring
199 * eventual consistency using last-update-wins strategy, as dead tuples are
200 * useful for detecting conflicts only during the application of concurrent
201 * transactions from remote nodes. After applying and flushing all remote
202 * transactions that occurred concurrently with the tuple DELETE, any
203 * subsequent UPDATE from a remote node should have a later timestamp. In such
204 * cases, it is acceptable to detect an update_missing scenario and convert the
205 * UPDATE to an INSERT when applying it. But, for concurrent remote
206 * transactions with earlier timestamps than the DELETE, detecting
207 * update_deleted is necessary, as the UPDATEs in remote transactions should be
208 * ignored if their timestamp is earlier than that of the dead tuples.
209 *
210 * Note that advancing the non-removable transaction ID is not supported if the
211 * publisher is also a physical standby. This is because the logical walsender
212 * on the standby can only get the WAL replay position but there may be more
213 * WALs that are being replicated from the primary and those WALs could have
214 * earlier commit timestamp.
215 *
216 * Similarly, when the publisher has subscribed to another publisher,
217 * information necessary for conflict detection cannot be retained for
218 * changes from origins other than the publisher. This is because publisher
219 * lacks the information on concurrent transactions of other publishers to
220 * which it subscribes. As the information on concurrent transactions is
221 * unavailable beyond subscriber's immediate publishers, the non-removable
222 * transaction ID might be advanced prematurely before changes from other
223 * origins have been fully applied.
224 *
225 * XXX Retaining information for changes from other origins might be possible
226 * by requesting the subscription on that origin to enable retain_dead_tuples
227 * and fetching the conflict detection slot.xmin along with the publisher's
228 * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
229 * wait for the remote slot's xmin to reach the oldest active transaction ID,
230 * ensuring that all transactions from other origins have been applied on the
231 * publisher, thereby getting the latest WAL position that includes all
232 * concurrent changes. However, this approach may impact performance, so it
233 * might not be worth the effort.
234 *
235 * XXX It seems feasible to get the latest commit's WAL location from the
236 * publisher and wait till that is applied. However, we can't do that
237 * because commit timestamps can regress as a commit with a later LSN is not
238 * guaranteed to have a later timestamp than those with earlier LSNs. Having
239 * said that, even if that is possible, it won't improve performance much as
240 * the apply always lags and moves slowly as compared with the transactions
241 * on the publisher.
242 *-------------------------------------------------------------------------
243 */
244
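The unique-GID scheme described in the TWO_PHASE TRANSACTIONS notes above can be sketched as follows. This is a minimal standalone illustration, not the code from this file: the helper name make_subscriber_gid and the "pg_gid_%u_%u" format string are assumptions made for the example. Only the idea of combining the subscription OID with the XID of the prepared transaction comes from the comment.

```c
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: build a subscriber-side GID from the subscription OID
 * and the XID of the prepared transaction. Returns 0 on success, -1 if the
 * XID is invalid (0) or the buffer is too small.
 */
static int
make_subscriber_gid(char *gid, size_t szgid, uint32_t subid, uint32_t xid)
{
	int			n;

	/* XID 0 is treated as invalid in this sketch. */
	if (xid == 0)
		return -1;

	n = snprintf(gid, szgid, "pg_gid_%u_%u",
				 (unsigned int) subid, (unsigned int) xid);

	/* snprintf reports the length it wanted; detect truncation. */
	return (n < 0 || (size_t) n >= szgid) ? -1 : 0;
}
```

Two subscriptions that receive the same remote prepared transaction get different GIDs because their OIDs differ, so neither PREPARE collides with the other.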
245#include "postgres.h"
246
247#include <sys/stat.h>
248#include <unistd.h>
249
250#include "access/genam.h"
251#include "access/commit_ts.h"
252#include "access/table.h"
253#include "access/tableam.h"
254#include "access/tupconvert.h"
255#include "access/twophase.h"
256#include "access/xact.h"
257#include "catalog/indexing.h"
258#include "catalog/pg_inherits.h"
262#include "commands/tablecmds.h"
263#include "commands/trigger.h"
264#include "executor/executor.h"
266#include "libpq/pqformat.h"
267#include "miscadmin.h"
268#include "optimizer/optimizer.h"
270#include "pgstat.h"
271#include "port/pg_bitutils.h"
272#include "postmaster/bgworker.h"
273#include "postmaster/interrupt.h"
274#include "postmaster/walwriter.h"
275#include "replication/conflict.h"
280#include "replication/origin.h"
281#include "replication/slot.h"
285#include "storage/buffile.h"
286#include "storage/ipc.h"
287#include "storage/latch.h"
288#include "storage/lmgr.h"
289#include "storage/procarray.h"
290#include "tcop/tcopprot.h"
291#include "utils/acl.h"
292#include "utils/guc.h"
293#include "utils/inval.h"
294#include "utils/lsyscache.h"
295#include "utils/memutils.h"
296#include "utils/pg_lsn.h"
297#include "utils/rel.h"
298#include "utils/rls.h"
299#include "utils/snapmgr.h"
300#include "utils/syscache.h"
301#include "utils/usercontext.h"
302#include "utils/wait_event.h"
303
304#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
305
312
314
315typedef struct ApplyExecutionData
316{
317 EState *estate; /* executor state, used to track resources */
318
319 LogicalRepRelMapEntry *targetRel; /* replication target rel */
320 ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
321
322 /* These fields are used when the target relation is partitioned: */
323 ModifyTableState *mtstate; /* dummy ModifyTable state */
324 PartitionTupleRouting *proute; /* partition routing info */
326
327/* Struct for saving and restoring apply errcontext information */
329{
330 LogicalRepMsgType command; /* 0 if invalid */
332
333 /* Remote node information */
334 int remote_attnum; /* -1 if invalid */
339
340/*
341 * The action to be taken for the changes in the transaction.
342 *
343 * TRANS_LEADER_APPLY:
344 * This action means that we are in the leader apply worker or table sync
345 * worker. The changes of the transaction are either directly applied or
346 * are read from temporary files (for streaming transactions) and then
347 * applied by the worker.
348 *
349 * TRANS_LEADER_SERIALIZE:
350 * This action means that we are in the leader apply worker or table sync
351 * worker. Changes are written to temporary files and then applied when the
352 * final commit arrives.
353 *
354 * TRANS_LEADER_SEND_TO_PARALLEL:
355 * This action means that we are in the leader apply worker and need to send
356 * the changes to the parallel apply worker.
357 *
358 * TRANS_LEADER_PARTIAL_SERIALIZE:
359 * This action means that we are in the leader apply worker and have sent some
360 * changes directly to the parallel apply worker and the remaining changes are
361 * serialized to a file, due to timeout while sending data. The parallel apply
362 * worker will apply these serialized changes when the final commit arrives.
363 *
364 * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
365 * serializing changes, the leader worker also needs to serialize the
366 * STREAM_XXX message to a file, and wait for the parallel apply worker to
367 * finish the transaction when processing the transaction finish command. So
368 * this new action was introduced to keep the code and logic clear.
369 *
370 * TRANS_PARALLEL_APPLY:
371 * This action means that we are in the parallel apply worker and changes of
372 * the transaction are applied directly by the worker.
373 */
374typedef enum
375{
376 /* The action for non-streaming transactions. */
378
379 /* Actions for streaming transactions. */
385
386/*
387 * The phases involved in advancing the non-removable transaction ID.
388 *
389 * See comments atop worker.c for details of the transition between these
390 * phases.
391 */
401
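The phase progression described in the RETAIN DEAD TUPLES notes atop this file can be sketched as a small transition function. This is a simplified standalone model: the type SketchPhase, the function next_phase, and its boolean inputs are hypothetical names for the example, and the two max_retention_duration phases (stop and resume retention) are deliberately left out.

```c
#include <stdbool.h>

/* Hypothetical mirror of the four main phases named in the file header. */
typedef enum
{
	RDT_GET_CANDIDATE_XID,
	RDT_REQUEST_PUBLISHER_STATUS,
	RDT_WAIT_FOR_PUBLISHER_STATUS,
	RDT_WAIT_FOR_LOCAL_FLUSH
} SketchPhase;

/*
 * Compute the next phase.
 *
 * status_received:        a publisher status message has arrived
 * remote_xacts_in_commit: the publisher still has transactions in commit phase
 * local_flush_caught_up:  local flush position reached the remote WAL position
 */
static SketchPhase
next_phase(SketchPhase phase, bool status_received,
		   bool remote_xacts_in_commit, bool local_flush_caught_up)
{
	switch (phase)
	{
		case RDT_GET_CANDIDATE_XID:
			return RDT_REQUEST_PUBLISHER_STATUS;

		case RDT_REQUEST_PUBLISHER_STATUS:
			return RDT_WAIT_FOR_PUBLISHER_STATUS;

		case RDT_WAIT_FOR_PUBLISHER_STATUS:
			if (!status_received)
				return RDT_WAIT_FOR_PUBLISHER_STATUS;	/* keep waiting */
			if (remote_xacts_in_commit)
				return RDT_REQUEST_PUBLISHER_STATUS;	/* ask again */
			return RDT_WAIT_FOR_LOCAL_FLUSH;

		case RDT_WAIT_FOR_LOCAL_FLUSH:
			/* Once caught up, the XID can advance and a new round starts. */
			return local_flush_caught_up ? RDT_GET_CANDIDATE_XID
				: RDT_WAIT_FOR_LOCAL_FLUSH;
	}

	return phase;				/* not reached; keeps the compiler quiet */
}
```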
402/*
403 * Critical information for managing phase transitions within the
404 * RetainDeadTuplesPhase.
405 */
407{
408 RetainDeadTuplesPhase phase; /* current phase */
409 XLogRecPtr remote_lsn; /* WAL write position on the publisher */
410
411 /*
412 * Oldest transaction ID that was in the commit phase on the publisher.
413 * Use FullTransactionId to prevent issues with transaction ID wraparound,
414 * where a new remote_oldestxid could falsely appear to originate from the
415 * past and block advancement.
416 */
418
419 /*
420 * Next transaction ID to be assigned on the publisher. Use
421 * FullTransactionId for consistency and to allow straightforward
422 * comparisons with remote_oldestxid.
423 */
425
426 TimestampTz reply_time; /* when the publisher responds with status */
427
428 /*
429 * Publisher transaction ID that must be awaited to complete before
430 * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
431 * FullTransactionId for the same reason as remote_nextxid.
432 */
434
435 TransactionId candidate_xid; /* candidate for the non-removable
436 * transaction ID */
437 TimestampTz flushpos_update_time; /* when the remote flush position was
438 * updated in final phase
439 * (RDT_WAIT_FOR_LOCAL_FLUSH) */
440
441 long table_sync_wait_time; /* time spent waiting for table sync
442 * to finish */
443
444 /*
445 * The following fields are used to determine the timing for the next
446 * round of transaction ID advancement.
447 */
448 TimestampTz last_recv_time; /* when the last message was received */
449 TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
450 int xid_advance_interval; /* how much time (ms) to wait before
451 * attempting to advance the
452 * non-removable transaction ID */
454
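The comments in the struct above explain that 64-bit transaction IDs are used so that a wrapped-around 32-bit XID cannot appear to come from the past. A rough standalone sketch of the idea follows. The helper widen_xid is hypothetical, and it assumes the 32-bit XID was assigned before the reference "next XID" and fewer than 2^32 transactions earlier.

```c
#include <stdint.h>

/*
 * Hypothetical helper: widen a 32-bit XID to 64 bits, using a 64-bit
 * "next XID" as the reference point. The upper 32 bits act as an epoch
 * that counts wraparounds.
 */
static uint64_t
widen_xid(uint64_t next_full, uint32_t xid)
{
	uint32_t	next_xid = (uint32_t) next_full;
	uint32_t	epoch = (uint32_t) (next_full >> 32);

	/* A value numerically above next_xid must belong to the previous epoch. */
	if (xid > next_xid)
		epoch--;

	return ((uint64_t) epoch << 32) | xid;
}
```

After widening, plain integer comparison orders the values correctly even when the 32-bit counter has wrapped between them.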
455/*
456 * The minimum (100ms) and maximum (3 minutes) intervals for advancing
457 * non-removable transaction IDs. The maximum interval is a bit arbitrary but
458 * is sufficient to not cause any undue network traffic.
459 */
460#define MIN_XID_ADVANCE_INTERVAL 100
461#define MAX_XID_ADVANCE_INTERVAL 180000
462
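One way to use the two bounds above is an exponential backoff: wait longer between attempts while no new candidate XID shows up, and go back to the minimum as soon as one does. The sketch below is only an illustration of that policy. The function next_xid_advance_interval and the doubling rule are assumptions for the example; this listing shows only the bounds themselves.

```c
#include <stdbool.h>

#define MIN_XID_ADVANCE_INTERVAL 100	/* ms */
#define MAX_XID_ADVANCE_INTERVAL 180000 /* ms, i.e. 3 minutes */

/*
 * Hypothetical policy: double the interval while nothing new is found,
 * capped at the maximum; reset to the minimum when a new XID is found.
 */
static int
next_xid_advance_interval(int current_ms, bool new_xid_found)
{
	if (new_xid_found || current_ms <= 0)
		return MIN_XID_ADVANCE_INTERVAL;

	/* Compare against half the cap first so the doubling cannot overflow. */
	if (current_ms >= MAX_XID_ADVANCE_INTERVAL / 2)
		return MAX_XID_ADVANCE_INTERVAL;

	return current_ms * 2;
}
```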
463/* errcontext tracker */
465{
466 .command = 0,
467 .rel = NULL,
468 .remote_attnum = -1,
469 .remote_xid = InvalidTransactionId,
470 .finish_lsn = InvalidXLogRecPtr,
471 .origin_name = NULL,
472};
473
475
478
479/* per stream context for streaming transactions */
481
483
485static bool MySubscriptionValid = false;
486
488
491
492/* fields valid only when processing streamed transaction */
493static bool in_streamed_transaction = false;
494
496
497/*
498 * The number of changes applied by parallel apply worker during one streaming
499 * block.
500 */
502
503/* Are we initializing an apply worker? */
505
506/*
507 * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
508 * the subscription if the remote transaction's finish LSN matches the subskiplsn.
509 * Once we start skipping changes, we don't stop it until we skip all changes of
510 * the transaction even if pg_subscription is updated and MySubscription->skiplsn
511 * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
512 * we don't skip receiving and spooling the changes since we decide whether or not
513 * to skip applying the changes when starting to apply changes. The subskiplsn is
514 * cleared after successfully skipping the transaction or applying non-empty
515 * transaction. The latter prevents the mistakenly specified subskiplsn from
516 * being left. Note that we cannot skip the streaming transactions when using
517 * parallel apply workers because we cannot get the finish LSN before applying
518 * the changes. So, we don't start parallel apply worker when finish LSN is set
519 * by the user.
520 */
522#define is_skipping_changes() (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))
523
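The skip logic described in the comment above can be modeled in a few lines. This is a simplified standalone sketch: SkipState and the skip_* functions are hypothetical names, and LSNs are plain 64-bit integers with 0 meaning "invalid". It shows only the rule that skipping starts when the transaction's finish LSN equals the configured skip LSN, and stays on until explicitly stopped.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical state: finish LSN of the transaction being skipped, 0 if none. */
typedef struct SkipState
{
	uint64_t	skip_xact_finish_lsn;
} SkipState;

/* True while changes of the current transaction are being skipped. */
static bool
skip_is_active(const SkipState *st)
{
	return st->skip_xact_finish_lsn != 0;
}

/* Start skipping only if a skip LSN is configured and matches exactly. */
static void
skip_maybe_start(SkipState *st, uint64_t subskiplsn, uint64_t finish_lsn)
{
	if (subskiplsn != 0 && subskiplsn == finish_lsn)
		st->skip_xact_finish_lsn = finish_lsn;
}

/* Stop skipping at the end of the transaction. */
static void
skip_stop(SkipState *st)
{
	st->skip_xact_finish_lsn = 0;
}
```

Because the state holds its own copy of the finish LSN, a later change to the configured skip LSN does not interrupt a skip that is already in progress, which matches the behavior the comment describes.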
524/* BufFile handle of the current streaming file */
526
527/*
528 * The remote WAL position that has been applied and flushed locally. We record
529 * and use this information both while sending feedback to the server and
530 * advancing oldest_nonremovable_xid.
531 */
533
534typedef struct SubXactInfo
535{
536 TransactionId xid; /* XID of the subxact */
537 int fileno; /* file number in the buffile */
538 pgoff_t offset; /* offset in the file */
540
541/* Sub-transaction data for the current streaming transaction */
542typedef struct ApplySubXactData
543{
544 uint32 nsubxacts; /* number of sub-transactions */
545 uint32 nsubxacts_max; /* current capacity of subxacts */
546 TransactionId subxact_last; /* xid of the last sub-transaction */
547 SubXactInfo *subxacts; /* sub-xact offset in changes file */
549
551
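The file header notes that subtransaction aborts are handled by tracking offsets and truncating the changes file. A minimal standalone sketch of the bookkeeping side is shown below. SubXactSketch and subxact_abort_lookup are hypothetical names, and the actual file truncation is left to the caller. The sketch assumes entries are stored in the order the subtransactions first appeared.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical mirror of the per-subxact bookkeeping shown above. */
typedef struct SubXactSketch
{
	uint32_t	xid;			/* XID of the subxact */
	int			fileno;			/* file number in the buffile */
	int64_t		offset;			/* offset in the file */
} SubXactSketch;

/*
 * Look up an aborted subxact, searching from the newest entry backwards.
 * On success, report where the changes file should be truncated and forget
 * the aborted subxact together with every later one.
 */
static bool
subxact_abort_lookup(const SubXactSketch *subxacts, uint32_t *nsubxacts,
					 uint32_t xid, int *fileno, int64_t *offset)
{
	for (uint32_t i = *nsubxacts; i > 0; i--)
	{
		if (subxacts[i - 1].xid == xid)
		{
			*fileno = subxacts[i - 1].fileno;
			*offset = subxacts[i - 1].offset;
			*nsubxacts = i - 1; /* drop this entry and all later ones */
			return true;
		}
	}

	return false;				/* unknown subxact: nothing was written */
}
```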
552static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
553static inline void changes_filename(char *path, Oid subid, TransactionId xid);
554
555/*
556 * Information about subtransactions of a given toplevel transaction.
557 */
558static void subxact_info_write(Oid subid, TransactionId xid);
559static void subxact_info_read(Oid subid, TransactionId xid);
560static void subxact_info_add(TransactionId xid);
561static inline void cleanup_subxact_info(void);
562
563/*
564 * Serialize and deserialize changes for a toplevel transaction.
565 */
566static void stream_open_file(Oid subid, TransactionId xid,
567 bool first_segment);
568static void stream_write_change(char action, StringInfo s);
569static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
570static void stream_close_file(void);
571
572static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
573
575 bool status_received);
578 bool status_received);
582 bool status_received);
587static bool update_retention_status(bool active);
590 bool new_xid_found);
591
592static void apply_worker_exit(void);
593
602 Oid localindexoid);
606 Oid localindexoid);
608 LogicalRepRelation *remoterel,
612static bool FindDeletedTupleInLocalRel(Relation localrel,
622
623/* Functions for skipping changes */
624static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
625static void stop_skipping_changes(void);
626static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
627
628/* Functions for apply error callback */
629static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
630static inline void reset_apply_error_context_info(void);
631
634
635static void set_wal_receiver_timeout(void);
636
637static void on_exit_clear_xact_state(int code, Datum arg);
638
639/*
640 * Form the origin name for the subscription.
641 *
642 * This is a common function for tablesync and other workers. Tablesync workers
643 * must pass a valid relid. Other callers must pass relid = InvalidOid.
644 *
645 * Return the name in the supplied buffer.
646 */
647void
650{
651 if (OidIsValid(relid))
652 {
653 /* Replication origin name for tablesync workers. */
654 snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
655 }
656 else
657 {
658 /* Replication origin name for non-tablesync workers. */
660 }
661}
662
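The naming rule above can be tried in isolation with the following sketch. It is a standalone approximation, not the function from this file: origin_name_sketch is a hypothetical name, OIDs are plain 32-bit integers with 0 standing in for InvalidOid, and the "pg_%u" format used for non-tablesync workers is an assumption, since that line is not visible in this listing.

```c
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical stand-in: form an origin name from the subscription OID and,
 * for tablesync workers, the relation OID (0 means "no relation").
 */
static void
origin_name_sketch(uint32_t suboid, uint32_t relid,
				   char *originname, size_t szoriginname)
{
	if (relid != 0)
	{
		/* Tablesync worker: one origin per (subscription, relation). */
		snprintf(originname, szoriginname, "pg_%u_%u",
				 (unsigned int) suboid, (unsigned int) relid);
	}
	else
	{
		/* Other workers: assumed to use one origin per subscription. */
		snprintf(originname, szoriginname, "pg_%u", (unsigned int) suboid);
	}
}
```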
663/*
664 * Should this worker apply changes for given relation.
665 *
666 * This is mainly needed for initial relation data sync as that runs in
667 * separate worker process running in parallel and we need some way to skip
668 * changes coming to the leader apply worker during the sync of a table.
669 *
670 * Note that we need a less-than-or-equal comparison for SYNCDONE state because
671 * it might hold position of end of initial slot consistent point WAL
672 * record + 1 (ie start of next record) and next record can be COMMIT of
673 * transaction we are now processing (which is what we set remote_final_lsn
674 * to in apply_handle_begin).
675 *
676 * Note that for streaming transactions that are being applied in the parallel
677 * apply worker, we disallow applying changes if the target table in the
678 * subscription is not in the READY state, because we cannot decide whether to
679 * apply the change as we won't know remote_final_lsn by that time.
680 *
681 * We already checked this in pa_can_start() before assigning the
682 * streaming transaction to the parallel worker, but it also needs to be
683 * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
684 * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
685 * while applying this transaction.
686 */
687static bool
689{
690 switch (MyLogicalRepWorker->type)
691 {
693 return MyLogicalRepWorker->relid == rel->localreloid;
694
696 /* We don't synchronize rel's that are in unknown state. */
697 if (rel->state != SUBREL_STATE_READY &&
701 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
703 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
704
705 return rel->state == SUBREL_STATE_READY;
706
707 case WORKERTYPE_APPLY:
708 return (rel->state == SUBREL_STATE_READY ||
709 (rel->state == SUBREL_STATE_SYNCDONE &&
710 rel->statelsn <= remote_final_lsn));
711
713 /* Should never happen. */
714 elog(ERROR, "sequence synchronization worker is not expected to apply changes");
715 break;
716
718 /* Should never happen. */
719 elog(ERROR, "Unknown worker type");
720 }
721
722 return false; /* dummy for compiler */
723}
724
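The decision made for the leader apply worker in the function above comes down to a single predicate, sketched here in standalone form. The enum RelSyncSketch and the function leader_should_apply are hypothetical names, and LSNs are plain 64-bit integers. The tablesync and parallel apply cases are not modeled.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical subset of the per-relation sync states. */
typedef enum
{
	REL_SKETCH_INIT,
	REL_SKETCH_SYNCDONE,
	REL_SKETCH_READY
} RelSyncSketch;

/*
 * Leader apply worker rule: apply if the relation is READY, or if it is
 * SYNCDONE and the sync ended at or before the commit LSN of the transaction
 * being applied. The comparison is <=, not <, because the recorded sync LSN
 * can be the start of the very commit record being processed.
 */
static bool
leader_should_apply(RelSyncSketch state, uint64_t statelsn,
					uint64_t remote_final_lsn)
{
	return state == REL_SKETCH_READY ||
		(state == REL_SKETCH_SYNCDONE && statelsn <= remote_final_lsn);
}
```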
725/*
726 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
727 *
728 * Start a transaction, if this is the first step (else we keep using the
729 * existing transaction).
730 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
731 */
732static void
747
748/*
749 * Finish up one step of a replication transaction.
750 * Callers of begin_replication_step() must also call this.
751 *
752 * We don't close out the transaction here, but we should increment
753 * the command counter to make the effects of this step visible.
754 */
755static void
762
763/*
764 * Handle streamed transactions for both the leader apply worker and the
765 * parallel apply workers.
766 *
767 * In the streaming case (receiving a block of the streamed transaction), for
768 * serialize mode, simply redirect it to a file for the proper toplevel
769 * transaction, and for parallel mode, the leader apply worker will send the
770 * changes to parallel apply workers and the parallel apply worker will define
771 * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
772 * messages will be applied by both leader apply worker and parallel apply
773 * workers).
774 *
775 * Returns true for streamed transactions (when the change is either serialized
776 * to file or sent to parallel apply worker), false otherwise (regular mode or
777 * needs to be processed by parallel apply worker).
778 *
779 * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
780 * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
781 * to a parallel apply worker.
782 */
783static bool
785{
790
792
793 /* not in streaming mode */
795 return false;
796
798
799 /*
800 * The parallel apply worker needs the xid in this message to decide
801 * whether to define a savepoint, so save the original message that has
802 * not moved the cursor after the xid. We will serialize this message to a
803 * file in PARTIAL_SERIALIZE mode.
804 */
805 original_msg = *s;
806
807 /*
808 * We should have received XID of the subxact as the first part of the
809 * message, so extract it.
810 */
812
816 errmsg_internal("invalid transaction ID in streamed replication transaction")));
817
818 switch (apply_action)
819 {
822
823 /* Add the new subxact to the array (unless already there). */
825
826 /* Write the change to the current file */
827 stream_write_change(action, s);
828 return true;
829
831 Assert(winfo);
832
833 /*
834 * XXX The publisher side doesn't always send relation/type update
835 * messages after the streaming transaction, so also update the
836 * relation/type in leader apply worker. See function
837 * cleanup_rel_sync_cache.
838 */
839 if (pa_send_data(winfo, s->len, s->data))
840 return (action != LOGICAL_REP_MSG_RELATION &&
841 action != LOGICAL_REP_MSG_TYPE);
842
843 /*
844 * Switch to serialize mode when we are not able to send the
845 * change to parallel apply worker.
846 */
847 pa_switch_to_partial_serialize(winfo, false);
848
852
853 /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
854 return (action != LOGICAL_REP_MSG_RELATION &&
855 action != LOGICAL_REP_MSG_TYPE);
856
859
860 /* Define a savepoint for a subxact if needed. */
862 return false;
863
864 default:
865 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
866 return false; /* silence compiler warning */
867 }
868}
869
870/*
871 * Executor state preparation for evaluation of constraint expressions,
872 * indexes and triggers for the specified relation.
873 *
874 * Note that the caller must open and close any indexes to be updated.
875 */
876static ApplyExecutionData *
878{
880 EState *estate;
882 List *perminfos = NIL;
883 ResultRelInfo *resultRelInfo;
884
886 edata->targetRel = rel;
887
888 edata->estate = estate = CreateExecutorState();
889
891 rte->rtekind = RTE_RELATION;
892 rte->relid = RelationGetRelid(rel->localrel);
893 rte->relkind = rel->localrel->rd_rel->relkind;
894 rte->rellockmode = AccessShareLock;
895
897
900
901 edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
902
903 /*
904 * Use Relation opened by logicalrep_rel_open() instead of opening it
905 * again.
906 */
907 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
908
909 /*
910 * We put the ResultRelInfo in the es_opened_result_relations list, even
911 * though we don't populate the es_result_relations array. That's a bit
912 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
913 *
914 * ExecOpenIndices() is not called here either, each execution path doing
915 * an apply operation being responsible for that.
916 */
918 lappend(estate->es_opened_result_relations, resultRelInfo);
919
920 estate->es_output_cid = GetCurrentCommandId(true);
921
922 /* Prepare to catch AFTER triggers. */
924
925 /* other fields of edata remain NULL for now */
926
927 return edata;
928}
929
930/*
931 * Finish any operations related to the executor state created by
932 * create_edata_for_relation().
933 */
934static void
936{
937 EState *estate = edata->estate;
938
939 /* Handle any queued AFTER triggers. */
940 AfterTriggerEndQuery(estate);
941
942 /* Shut down tuple routing, if any was done. */
943 if (edata->proute)
944 ExecCleanupTupleRouting(edata->mtstate, edata->proute);
945
946 /*
947 * Cleanup. It might seem that we should call ExecCloseResultRelations()
948 * here, but we intentionally don't. It would close the rel we added to
949 * es_opened_result_relations above, which is wrong because we took no
950 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
951 * any other relations opened during execution.
952 */
953 ExecResetTupleTable(estate->es_tupleTable, false);
954 FreeExecutorState(estate);
955 pfree(edata);
956}
957
958/*
959 * Evaluates default values for local columns that can't be mapped to remote
960 * relation columns.
961 *
962 * This allows us to support tables which have more columns on the downstream
963 * than on the upstream.
964 */
965static void
967 TupleTableSlot *slot)
968{
970 int num_phys_attrs = desc->natts;
971 int i;
972 int attnum,
973 num_defaults = 0;
974 int *defmap;
975 ExprState **defexprs;
976 ExprContext *econtext;
977
978 econtext = GetPerTupleExprContext(estate);
979
980 /* We got all the data via replication, no need to evaluate anything. */
981 if (num_phys_attrs == rel->remoterel.natts)
982 return;
983
984 defmap = palloc_array(int, num_phys_attrs);
986
988 for (attnum = 0; attnum < num_phys_attrs; attnum++)
989 {
991 Expr *defexpr;
992
993 if (cattr->attisdropped || cattr->attgenerated)
994 continue;
995
996 if (rel->attrmap->attnums[attnum] >= 0)
997 continue;
998
999 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
1000
1001 if (defexpr != NULL)
1002 {
1003 /* Run the expression through planner */
1004 defexpr = expression_planner(defexpr);
1005
1006 /* Initialize executable expression in copycontext */
1007 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1008 defmap[num_defaults] = attnum;
1009 num_defaults++;
1010 }
1011 }
1012
1013 for (i = 0; i < num_defaults; i++)
1014 slot->tts_values[defmap[i]] =
1015 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1016}
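
/*
 * Illustrative sketch, not part of worker.c: the column-selection rule used
 * above can be modelled without any PostgreSQL headers. LocalColumn and
 * collect_default_columns are hypothetical names invented for this example;
 * has_default stands in for "build_column_default() returned an expression".
 * A column gets a default only if it is live, not generated, has no remote
 * counterpart, and actually has a default expression.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the per-column facts the loop consults. */
typedef struct LocalColumn
{
    bool        is_dropped;     /* column was dropped locally */
    bool        is_generated;   /* column is a generated column */
    bool        has_default;    /* a default expression exists */
    int         remote_attnum;  /* index in the remote tuple, or -1 */
} LocalColumn;

/*
 * Fill defmap[] with the indexes of local columns that need a default
 * evaluated and return how many were found. defmap must have room for
 * ncols entries.
 */
int
collect_default_columns(const LocalColumn *cols, int ncols, int *defmap)
{
    int         num_defaults = 0;

    assert(cols != NULL && defmap != NULL);

    for (int attnum = 0; attnum < ncols; attnum++)
    {
        /* Dropped and generated columns never receive a default here. */
        if (cols[attnum].is_dropped || cols[attnum].is_generated)
            continue;

        /* The remote side supplies this column, so nothing to evaluate. */
        if (cols[attnum].remote_attnum >= 0)
            continue;

        if (cols[attnum].has_default)
            defmap[num_defaults++] = attnum;
    }

    return num_defaults;
}
```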
1017
1018/*
1019 * Store tuple data into slot.
1020 *
1021 * Incoming data can be either text or binary format.
1022 */
1023static void
1026{
1027 int natts = slot->tts_tupleDescriptor->natts;
1028 int i;
1029
1030 ExecClearTuple(slot);
1031
1032 /* Call the "in" function for each non-dropped, non-null attribute */
1033 Assert(natts == rel->attrmap->maplen);
1034 for (i = 0; i < natts; i++)
1035 {
1037 int remoteattnum = rel->attrmap->attnums[i];
1038
1039 if (!att->attisdropped && remoteattnum >= 0)
1040 {
1042
1043 if (remoteattnum >= tupleData->ncols)
1044 ereport(ERROR,
1046 errmsg("logical replication column %d not found in tuple: only %d column(s) received",
1047 remoteattnum + 1, tupleData->ncols)));
1048
1049 colvalue = &tupleData->colvalues[remoteattnum];
1050
1051 /* Set attnum for error callback */
1053
1055 {
1056 Oid typinput;
1057 Oid typioparam;
1058
1059 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1060 slot->tts_values[i] =
1062 typioparam, att->atttypmod);
1063 slot->tts_isnull[i] = false;
1064 }
1065 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1066 {
1067 Oid typreceive;
1068 Oid typioparam;
1069
1070 /*
1071 * In some code paths we may be asked to re-parse the same
1072 * tuple data. Reset the StringInfo's cursor so that works.
1073 */
1074 colvalue->cursor = 0;
1075
1076 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1077 slot->tts_values[i] =
1078 OidReceiveFunctionCall(typreceive, colvalue,
1079 typioparam, att->atttypmod);
1080
1081 /* Trouble if it didn't eat the whole buffer */
1082 if (colvalue->cursor != colvalue->len)
1083 ereport(ERROR,
1085 errmsg("incorrect binary data format in logical replication column %d",
1086 remoteattnum + 1)));
1087 slot->tts_isnull[i] = false;
1088 }
1089 else
1090 {
1091 /*
1092 * NULL value from remote. (We don't expect to see
1093 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1094 * NULL.)
1095 */
1096 slot->tts_values[i] = (Datum) 0;
1097 slot->tts_isnull[i] = true;
1098 }
1099
1100 /* Reset attnum for error callback */
1102 }
1103 else
1104 {
1105 /*
1106 * We assign NULL to dropped attributes and missing values
1107 * (missing values should be later filled using
1108 * slot_fill_defaults).
1109 */
1110 slot->tts_values[i] = (Datum) 0;
1111 slot->tts_isnull[i] = true;
1112 }
1113 }
1114
1116}
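
/*
 * Illustrative sketch, not part of worker.c: the binary branch above resets
 * the read cursor, calls the type's receive function, and rejects the value
 * unless the whole buffer was consumed. ColumnBuffer, receive_int32 and
 * store_binary_int32 are hypothetical names; ColumnBuffer is only a
 * simplified stand-in for a StringInfo with a cursor, and a big-endian
 * 32-bit integer is assumed as the wire format for the example.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for a StringInfo with a read cursor. */
typedef struct ColumnBuffer
{
    const unsigned char *data;
    size_t      len;
    size_t      cursor;
} ColumnBuffer;

/*
 * Hypothetical receive function: decode a big-endian 32-bit integer and
 * advance the cursor. Returns false if fewer than four bytes remain.
 */
bool
receive_int32(ColumnBuffer *buf, int32_t *result)
{
    uint32_t    v = 0;

    if (buf->len - buf->cursor < 4)
        return false;

    for (int i = 0; i < 4; i++)
        v = (v << 8) | buf->data[buf->cursor++];

    *result = (int32_t) v;
    return true;
}

/*
 * Decode one binary column. The cursor is reset first so the same buffer
 * can be parsed again, and the value is accepted only if the receive
 * function consumed the whole buffer.
 */
bool
store_binary_int32(ColumnBuffer *buf, int32_t *value)
{
    assert(buf != NULL && value != NULL);

    buf->cursor = 0;
    if (!receive_int32(buf, value))
        return false;

    /* Trailing bytes mean the sender and receiver disagree on the format. */
    return buf->cursor == buf->len;
}
```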
1117
1118/*
1119 * Replace updated columns with data from the LogicalRepTupleData struct.
1120 * This is somewhat similar to heap_modify_tuple but also calls the type
1121 * input functions on the user data.
1122 *
1123 * "slot" is filled with a copy of the tuple in "srcslot", replacing
1124 * columns provided in "tupleData" and leaving others as-is.
1125 *
1126 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
1127 * storage for "srcslot". This is OK for current usage, but someday we may
1128 * need to materialize "slot" at the end to make it independent of "srcslot".
1129 */
1130static void
1134{
1135 int natts = slot->tts_tupleDescriptor->natts;
1136 int i;
1137
1138 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1139 ExecClearTuple(slot);
1140
1141 /*
1142 * Copy all the column data from srcslot, so that we'll have valid values
1143 * for unreplaced columns.
1144 */
1145 Assert(natts == srcslot->tts_tupleDescriptor->natts);
1147 memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1148 memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1149
1150 /* Call the "in" function for each replaced attribute */
1151 Assert(natts == rel->attrmap->maplen);
1152 for (i = 0; i < natts; i++)
1153 {
1155 int remoteattnum = rel->attrmap->attnums[i];
1156
1157 if (remoteattnum < 0)
1158 continue;
1159
1160 if (remoteattnum >= tupleData->ncols)
1161 ereport(ERROR,
1163 errmsg("logical replication column %d not found in tuple: only %d column(s) received",
1164 remoteattnum + 1, tupleData->ncols)));
1165
1167 {
1169
1170 /* Set attnum for error callback */
1172
1174 {
1175 Oid typinput;
1176 Oid typioparam;
1177
1178 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1179 slot->tts_values[i] =
1181 typioparam, att->atttypmod);
1182 slot->tts_isnull[i] = false;
1183 }
1184 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1185 {
1186 Oid typreceive;
1187 Oid typioparam;
1188
1189 /*
1190 * In some code paths we may be asked to re-parse the same
1191 * tuple data. Reset the StringInfo's cursor so that works.
1192 */
1193 colvalue->cursor = 0;
1194
1195 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1196 slot->tts_values[i] =
1197 OidReceiveFunctionCall(typreceive, colvalue,
1198 typioparam, att->atttypmod);
1199
1200 /* Trouble if it didn't eat the whole buffer */
1201 if (colvalue->cursor != colvalue->len)
1202 ereport(ERROR,
1204 errmsg("incorrect binary data format in logical replication column %d",
1205 remoteattnum + 1)));
1206 slot->tts_isnull[i] = false;
1207 }
1208 else
1209 {
1210 /* must be LOGICALREP_COLUMN_NULL */
1211 slot->tts_values[i] = (Datum) 0;
1212 slot->tts_isnull[i] = true;
1213 }
1214
1215 /* Reset attnum for error callback */
1217 }
1218 }
1219
1220 /* And finally, declare that "slot" contains a valid virtual tuple */
1222}
1223
1224/*
1225 * Handle BEGIN message.
1226 */
1227static void
1229{
1231
1232 /* There must not be an active streaming transaction. */
1234
1237
1238 remote_final_lsn = begin_data.final_lsn;
1239
1241
1242 in_remote_transaction = true;
1243
1245}
1246
1247/*
1248 * Handle COMMIT message.
1249 *
1250 * TODO, support tracking of multiple origins
1251 */
1252static void
1254{
1256
1258
1259 if (commit_data.commit_lsn != remote_final_lsn)
1260 ereport(ERROR,
1262 errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1263 LSN_FORMAT_ARGS(commit_data.commit_lsn),
1265
1267
1268 /*
1269 * Process any tables that are being synchronized in parallel, as well as
1270 * any newly added tables or sequences.
1271 */
1273
1276}
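
/*
 * Illustrative sketch, not part of worker.c: the commit handler above
 * compares the LSN carried by the COMMIT message with the one remembered
 * from BEGIN and reports both in "%X/%08X" form. WalPosition,
 * format_wal_position and check_commit_position are hypothetical names;
 * WalPosition is assumed to be a plain 64-bit integer split into high and
 * low 32-bit halves for printing.
 */

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for a 64-bit WAL position. */
typedef uint64_t WalPosition;

/* Print a position as its high and low 32-bit halves, "%X/%08X" style. */
int
format_wal_position(WalPosition pos, char *buf, size_t buflen)
{
    assert(buf != NULL && buflen > 0);

    return snprintf(buf, buflen, "%" PRIX32 "/%08" PRIX32,
                    (uint32_t) (pos >> 32), (uint32_t) pos);
}

/*
 * Compare the position carried by a commit message with the one recorded
 * at BEGIN. On mismatch, describe both positions in errbuf and return
 * false; errbuf is left untouched when the positions agree.
 */
bool
check_commit_position(WalPosition expected, WalPosition received,
                      char *errbuf, size_t errlen)
{
    char        got[32];
    char        want[32];

    if (received == expected)
        return true;

    format_wal_position(received, got, sizeof(got));
    format_wal_position(expected, want, sizeof(want));
    snprintf(errbuf, errlen,
             "incorrect commit LSN %s in commit message (expected %s)",
             got, want);
    return false;
}
```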
1277
1278/*
1279 * Handle BEGIN PREPARE message.
1280 */
1281static void
1283{
1285
1286 /* Tablesync should never receive prepare. */
1287 if (am_tablesync_worker())
1288 ereport(ERROR,
1290 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1291
1292 /* There must not be an active streaming transaction. */
1294
1297
1298 remote_final_lsn = begin_data.prepare_lsn;
1299
1301
1302 in_remote_transaction = true;
1303
1305}
1306
1307/*
1308 * Common function to prepare the GID.
1309 */
1310static void
1312{
1313 char gid[GIDSIZE];
1314
1315 /*
1316 * Compute unique GID for two_phase transactions. We don't use GID of
1317 * prepared transaction sent by server as that can lead to deadlock when
1318 * we have multiple subscriptions from the same node pointing to
1319 * publications on the same node. See comments atop worker.c.
1320 */
1322 gid, sizeof(gid));
1323
1324 /*
1325 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1326 * called within the PrepareTransactionBlock below.
1327 */
1328 if (!IsTransactionBlock())
1329 {
1331 CommitTransactionCommand(); /* Completes the preceding Begin command. */
1332 }
1333
1334 /*
1335 * Update origin state so we can restart streaming from correct position
1336 * in case of crash.
1337 */
1340
1342}
1343
1344/*
1345 * Handle PREPARE message.
1346 */
1347static void
1349{
1351
1353
1354 if (prepare_data.prepare_lsn != remote_final_lsn)
1355 ereport(ERROR,
1357 errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1358 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1360
1361 /*
1362 * Unlike commit, here, we always prepare the transaction even though no
1363 * change has happened in this transaction or all changes are skipped. It
1364 * is done this way because at commit prepared time, we won't know whether
1365 * we have skipped preparing a transaction because of those reasons.
1366 *
1367 * XXX, We can optimize such that at commit prepared time, we first check
1368 * whether we have prepared the transaction or not but that doesn't seem
1369 * worthwhile because such cases shouldn't be common.
1370 */
1372
1374
1377 pgstat_report_stat(false);
1378
1379 /*
1380 * It is okay not to set the local_end LSN for the prepare because we
1381 * always flush the prepare record. So, we can send the acknowledgment of
1382 * the remote_end LSN as soon as prepare is finished.
1383 *
1384 * XXX For the sake of consistency with commit, we could have set it with
1385 * the LSN of prepare but as of now we don't track that value similar to
1386 * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
1387 * it.
1388 */
1390
1391 in_remote_transaction = false;
1392
1393 /*
1394 * Process any tables that are being synchronized in parallel, as well as
1395 * any newly added tables or sequences.
1396 */
1398
1399 /*
1400 * Since we have already prepared the transaction, in a case where the
1401 * server crashes before clearing the subskiplsn, it will be left but the
1402 * transaction won't be resent. But that's okay because it's a rare case
1403 * and the subskiplsn will be cleared when finishing the next transaction.
1404 */
1407
1410}
1411
1412/*
1413 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1414 *
1415 * Note that we don't need to wait here if the transaction was prepared in a
1416 * parallel apply worker. In that case, we have already waited for the prepare
1417 * to finish in apply_handle_stream_prepare() which will ensure all the
1418 * operations in that transaction have happened in the subscriber, so no
1419 * concurrent transaction can cause deadlock or transaction dependency issues.
1420 */
1421static void
1423{
1425 char gid[GIDSIZE];
1426
1429
1430 /* Compute GID for two_phase transactions. */
1432 gid, sizeof(gid));
1433
1434 /* There is no transaction when COMMIT PREPARED is called */
1436
1437 /*
1438 * Update origin state so we can restart streaming from correct position
1439 * in case of crash.
1440 */
1443
1444 FinishPreparedTransaction(gid, true);
1447 pgstat_report_stat(false);
1448
1450 in_remote_transaction = false;
1451
1452 /*
1453 * Process any tables that are being synchronized in parallel, as well as
1454 * any newly added tables or sequences.
1455 */
1457
1459
1462}
1463
1464/*
1465 * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1466 *
1467 * Note that we don't need to wait here if the transaction was prepared in a
1468 * parallel apply worker. In that case, we have already waited for the prepare
1469 * to finish in apply_handle_stream_prepare() which will ensure all the
1470 * operations in that transaction have happened in the subscriber, so no
1471 * concurrent transaction can cause deadlock or transaction dependency issues.
1472 */
1473static void
1475{
1477 char gid[GIDSIZE];
1478
1481
1482 /* Compute GID for two_phase transactions. */
1484 gid, sizeof(gid));
1485
1486 /*
1487 * It is possible that we haven't received prepare because it occurred
1488 * before walsender reached a consistent point or the two_phase was still
1489 * not enabled by that time, so in such cases, we need to skip rollback
1490 * prepared.
1491 */
1492 if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1493 rollback_data.prepare_time))
1494 {
1495 /*
1496 * Update origin state so we can restart streaming from correct
1497 * position in case of crash.
1498 */
1501
1502 /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1504 FinishPreparedTransaction(gid, false);
1507
1509 }
1510
1511 pgstat_report_stat(false);
1512
1513 /*
1514 * It is okay not to set the local_end LSN for the rollback of prepared
1515 * transaction because we always flush the WAL record for it. See
1516 * apply_handle_prepare.
1517 */
1519 in_remote_transaction = false;
1520
1521 /*
1522 * Process any tables that are being synchronized in parallel, as well as
1523 * any newly added tables or sequences.
1524 */
1525 ProcessSyncingRelations(rollback_data.rollback_end_lsn);
1526
1529}
1530
1531/*
1532 * Handle STREAM PREPARE.
1533 */
1534static void
1536{
1540
1541 /* Save the message before it is consumed. */
1543
1545 ereport(ERROR,
1547 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1548
1549 /* Tablesync should never receive prepare. */
1550 if (am_tablesync_worker())
1551 ereport(ERROR,
1553 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1554
1557
1559
1560 switch (apply_action)
1561 {
1562 case TRANS_LEADER_APPLY:
1563
1564 /*
1565 * The transaction has been serialized to file, so replay all the
1566 * spooled operations.
1567 */
1569 prepare_data.xid, prepare_data.prepare_lsn);
1570
1571 /* Mark the transaction as prepared. */
1573
1575
1576 /*
1577 * It is okay not to set the local_end LSN for the prepare because
1578 * we always flush the prepare record. See apply_handle_prepare.
1579 */
1581
1582 in_remote_transaction = false;
1583
1584 /* Unlink the files with serialized changes and subxact info. */
1586
1587 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1588 break;
1589
1591 Assert(winfo);
1592
1593 if (pa_send_data(winfo, s->len, s->data))
1594 {
1595 /* Finish processing the streaming transaction. */
1596 pa_xact_finish(winfo, prepare_data.end_lsn);
1597 break;
1598 }
1599
1600 /*
1601 * Switch to serialize mode when we are not able to send the
1602 * change to parallel apply worker.
1603 */
1604 pa_switch_to_partial_serialize(winfo, true);
1605
1608 Assert(winfo);
1609
1612 &original_msg);
1613
1615
1616 /* Finish processing the streaming transaction. */
1617 pa_xact_finish(winfo, prepare_data.end_lsn);
1618 break;
1619
1621
1622 /*
1623 * If the parallel apply worker is applying spooled messages then
1624 * close the file before preparing.
1625 */
1626 if (stream_fd)
1628
1630
1631 /* Mark the transaction as prepared. */
1633
1635
1637
1638 /*
1639 * It is okay not to set the local_end LSN for the prepare because
1640 * we always flush the prepare record. See apply_handle_prepare.
1641 */
1643
1646
1648
1649 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1650 break;
1651
1652 default:
1653 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1654 break;
1655 }
1656
1657 pgstat_report_stat(false);
1658
1659 /*
1660 * Process any tables that are being synchronized in parallel, as well as
1661 * any newly added tables or sequences.
1662 */
1664
1665 /*
1666 * Similar to prepare case, the subskiplsn could be left in a case of
1667 * server crash but it's okay. See the comments in apply_handle_prepare().
1668 */
1671
1673
1675}
1676
1677/*
1678 * Handle ORIGIN message.
1679 *
1680 * TODO, support tracking of multiple origins
1681 */
1682static void
1684{
1685 /*
1686 * ORIGIN message can only come inside streaming transaction or inside
1687 * remote transaction and before any actual writes.
1688 */
1692 ereport(ERROR,
1694 errmsg_internal("ORIGIN message sent out of order")));
1695}
1696
1697/*
1698 * Initialize fileset (if not already done).
1699 *
1700 * Create a new file when first_segment is true, otherwise open the existing
1701 * file.
1702 */
1703void
1705{
1707
1708 /*
1709 * Initialize the worker's stream_fileset if we haven't yet. This will be
1710 * used for the entire duration of the worker so create it in a permanent
1711 * context. We create this on the very first streaming message from any
1712 * transaction and then use it for this and other streaming transactions.
1713 * Now, we could create a fileset at the start of the worker as well but
1714 * then we won't be sure that it will ever be used.
1715 */
1717 {
1719
1721
1724
1726 }
1727
1728 /* Open the spool file for this transaction. */
1730
1731 /* If this is not the first segment, open existing subxact file. */
1732 if (!first_segment)
1734
1736}
1737
1738/*
1739 * Handle STREAM START message.
1740 */
1741static void
1743{
1744 bool first_segment;
1747
1748 /* Save the message before it is consumed. */
1750
1752 ereport(ERROR,
1754 errmsg_internal("duplicate STREAM START message")));
1755
1756 /* There must not be an active streaming transaction. */
1758
1759 /* notify handle methods we're processing a remote transaction */
1761
1762 /* extract XID of the top-level transaction */
1764
1766 ereport(ERROR,
1768 errmsg_internal("invalid transaction ID in streamed replication transaction")));
1769
1771
1772 /* Try to allocate a worker for the streaming transaction. */
1773 if (first_segment)
1775
1777
1778 switch (apply_action)
1779 {
1781
1782 /*
1783 * Function stream_start_internal starts a transaction. This
1784 * transaction will be committed on the stream stop unless it is a
1785 * tablesync worker in which case it will be committed after
1786 * processing all the messages. We need this transaction for
1787 * handling the BufFile, used for serializing the streaming data
1788 * and subxact info.
1789 */
1791 break;
1792
1794 Assert(winfo);
1795
1796 /*
1797 * Once we start serializing the changes, the parallel apply
1798 * worker will wait for the leader to release the stream lock
1799 * until the end of the transaction. So, we don't need to release
1800 * the lock or increment the stream count in that case.
1801 */
1802 if (pa_send_data(winfo, s->len, s->data))
1803 {
1804 /*
1805 * Unlock the shared object lock so that the parallel apply
1806 * worker can continue to receive changes.
1807 */
1808 if (!first_segment)
1810
1811 /*
1812 * Increment the number of streaming blocks waiting to be
1813 * processed by parallel apply worker.
1814 */
1816
1817 /* Cache the parallel apply worker for this transaction. */
1819 break;
1820 }
1821
1822 /*
1823 * Switch to serialize mode when we are not able to send the
1824 * change to parallel apply worker.
1825 */
1827
1830 Assert(winfo);
1831
1832 /*
1833 * Open the spool file unless it was already opened when switching
1834 * to serialize mode. The transaction started in
1835 * stream_start_internal will be committed on the stream stop.
1836 */
1839
1841
1842 /* Cache the parallel apply worker for this transaction. */
1844 break;
1845
1847 if (first_segment)
1848 {
1849 /* Hold the lock until the end of the transaction. */
1852
1853 /*
1854 * Signal the leader apply worker, as it may be waiting for
1855 * us.
1856 */
1859 }
1860
1862 break;
1863
1864 default:
1865 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1866 break;
1867 }
1868
1870}
1871
1872/*
1873 * Update the information about subxacts and close the file.
1874 *
1875 * This function should be called when the stream_start_internal function has
1876 * been called.
1877 */
1878void
1880{
1881 /*
1882 * Serialize information about subxacts for the toplevel transaction, then
1883 * close the stream messages spool file.
1884 */
1887
1888 /* We must be in a valid transaction state */
1890
1891 /* Commit the per-stream transaction */
1893
1894 /* Reset per-stream context */
1896}
1897
1898/*
1899 * Handle STREAM STOP message.
1900 */
1901static void
1903{
1906
1908 ereport(ERROR,
1910 errmsg_internal("STREAM STOP message without STREAM START")));
1911
1913
1914 switch (apply_action)
1915 {
1918 break;
1919
1921 Assert(winfo);
1922
1923 /*
1924 * Lock before sending the STREAM_STOP message so that the leader
1925 * can hold the lock first and the parallel apply worker will wait
1926 * for leader to release the lock. See Locking Considerations atop
1927 * applyparallelworker.c.
1928 */
1930
1931 if (pa_send_data(winfo, s->len, s->data))
1932 {
1934 break;
1935 }
1936
1937 /*
1938 * Switch to serialize mode when we are not able to send the
1939 * change to parallel apply worker.
1940 */
1941 pa_switch_to_partial_serialize(winfo, true);
1942
1948 break;
1949
1951 elog(DEBUG1, "applied %u changes in the streaming chunk",
1953
1954 /*
1955 * By the time parallel apply worker is processing the changes in
1956 * the current streaming block, the leader apply worker may have
1957 * sent multiple streaming blocks. This can lead to the parallel
1958 * apply worker starting to wait even when there are more chunks of
1959 * streams in the queue. So, try to lock only if there is no message left
1960 * in the queue. See Locking Considerations atop
1961 * applyparallelworker.c.
1962 *
1963 * Note that here we have a race condition where we can start
1964 * waiting even when there are pending streaming chunks. This can
1965 * happen if the leader sends another streaming block and acquires
1966 * the stream lock again after the parallel apply worker checks
1967 * that there is no pending streaming block and before it actually
1968 * starts waiting on a lock. We can handle this case by not
1969 * allowing the leader to increment the stream block count during
1970 * the time parallel apply worker acquires the lock but it is not
1971 * clear whether that is worth the complexity.
1972 *
1973 * Now, if this missed chunk contains rollback to savepoint, then
1974 * there is a risk of deadlock which probably shouldn't happen
1975 * after restart.
1976 */
1978 break;
1979
1980 default:
1981 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1982 break;
1983 }
1984
1987
1988 /*
1989 * The parallel apply worker could be in a transaction in which case we
1990 * need to report the state as STATE_IDLEINTRANSACTION.
1991 */
1994 else
1996
1998}
1999
2000/*
2001 * Helper function to handle STREAM ABORT message when the transaction was
2002 * serialized to file.
2003 */
2004static void
2006{
2007 /*
2008 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
2009 * just delete the files with serialized info.
2010 */
2011 if (xid == subxid)
2013 else
2014 {
2015 /*
2016 * OK, so it's a subxact. We need to read the subxact file for the
2017 * toplevel transaction, determine the offset tracked for the subxact,
2018 * and truncate the file with changes. We also remove the subxacts
2019 * with higher offsets (or rather higher XIDs).
2020 *
2021 * We intentionally scan the array from the tail, because we're likely
2022 * aborting a change for the most recent subtransactions.
2023 *
2024 * We can't use binary search here as subxact XIDs won't
2025 * necessarily arrive in sorted order; consider the case where we have
2026 * released the savepoint for multiple subtransactions and then
2027 * performed rollback to savepoint for one of the earlier
2028 * subtransactions.
2029 */
2030 int64 i;
2031 int64 subidx;
2032 BufFile *fd;
2033 bool found = false;
2034 char path[MAXPGPATH];
2035
2036 subidx = -1;
2039
2040 for (i = subxact_data.nsubxacts; i > 0; i--)
2041 {
2042 if (subxact_data.subxacts[i - 1].xid == subxid)
2043 {
2044 subidx = (i - 1);
2045 found = true;
2046 break;
2047 }
2048 }
2049
2050 /*
2051 * If it's an empty sub-transaction then we will not find the subxid
2052 * here so just cleanup the subxact info and return.
2053 */
2054 if (!found)
2055 {
2056 /* Cleanup the subxact info */
2060 return;
2061 }
2062
2063 /* open the changes file */
2066 O_RDWR, false);
2067
2068 /* OK, truncate the file at the right offset */
2072
2073 /* discard the subxacts added later */
2075
2076 /* write the updated subxact list */
2078
2081 }
2082}
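
/*
 * Illustrative sketch, not part of worker.c: the subtransaction branch above
 * scans the tracked subxacts from the tail, truncates the changes file at
 * the matching entry's offset, and discards the later entries. SubXactEntry
 * and abort_subxact are hypothetical names. Two simplifications are assumed
 * here: a position is a single 64-bit offset, and the matched entry itself
 * is dropped along with the ones after it.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical record of where a subtransaction's first change was spooled. */
typedef struct SubXactEntry
{
    uint32_t    xid;
    int64_t     offset;
} SubXactEntry;

/*
 * Look for subxid, scanning from the tail because the most recent
 * subtransactions are the likeliest to be aborted. A linear scan is used
 * since the XIDs are not assumed to be sorted.
 *
 * On a match, store the truncation offset in *truncate_at, shrink
 * *nsubxacts so that the matched and all later entries are forgotten, and
 * return true. Return false if the subtransaction never spooled a change.
 */
bool
abort_subxact(const SubXactEntry *subxacts, int *nsubxacts,
              uint32_t subxid, int64_t *truncate_at)
{
    assert(subxacts != NULL && nsubxacts != NULL && truncate_at != NULL);

    for (int i = *nsubxacts; i > 0; i--)
    {
        if (subxacts[i - 1].xid == subxid)
        {
            *truncate_at = subxacts[i - 1].offset;
            *nsubxacts = i - 1;
            return true;
        }
    }

    return false;
}
```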
2083
2084/*
2085 * Handle STREAM ABORT message.
2086 */
2087static void
2089{
2090 TransactionId xid;
2091 TransactionId subxid;
2095
2096 /* Save the message before it is consumed. */
2098 bool toplevel_xact;
2099
2101 ereport(ERROR,
2103 errmsg_internal("STREAM ABORT message without STREAM STOP")));
2104
2105 /* We receive abort information only when we can apply in parallel. */
2108
2109 xid = abort_data.xid;
2110 subxid = abort_data.subxid;
2111 toplevel_xact = (xid == subxid);
2112
2113 set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2114
2116
2117 switch (apply_action)
2118 {
2119 case TRANS_LEADER_APPLY:
2120
2121 /*
2122 * We are in the leader apply worker and the transaction has been
2123 * serialized to file.
2124 */
2125 stream_abort_internal(xid, subxid);
2126
2127 elog(DEBUG1, "finished processing the STREAM ABORT command");
2128 break;
2129
2131 Assert(winfo);
2132
2133 /*
2134 * For the case of aborting the subtransaction, we increment the
2135 * number of streaming blocks and take the lock again before
2136 * sending the STREAM_ABORT to ensure that the parallel apply
2137 * worker will wait on the lock for the next set of changes after
2138 * processing the STREAM_ABORT message if it is not already
2139 * waiting for STREAM_STOP message.
2140 *
2141 * It is important to perform this locking before sending the
2142 * STREAM_ABORT message so that the leader can hold the lock first
2143 * and the parallel apply worker will wait for the leader to
2144 * release the lock. This is the same as what we do in
2145 * apply_handle_stream_stop. See Locking Considerations atop
2146 * applyparallelworker.c.
2147 */
2148 if (!toplevel_xact)
2149 {
2153 }
2154
2155 if (pa_send_data(winfo, s->len, s->data))
2156 {
2157 /*
2158 * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2159 * wait here for the parallel apply worker to finish as that
2160 * is not required to maintain the commit order and won't have
2161 * the risk of failures due to transaction dependencies and
2162 * deadlocks. However, it is possible that before the parallel
2163 * worker finishes and we clear the worker info, the xid
2164 * wraparound happens on the upstream and a new transaction
2165 * with the same xid can appear and that can lead to duplicate
2166 * entries in ParallelApplyTxnHash. Yet another problem could
2167 * be that we may have serialized the changes in partial
2168 * serialize mode and the file containing xact changes may
2169 * already exist, and after xid wraparound trying to create
2170 * the file for the same xid can lead to an error. To avoid
2171 * these problems, we decide to wait for the aborts to finish.
2172 *
2173 * Note, it is okay to not update the flush location position
2174 * for aborts as in worst case that means such a transaction
2175 * won't be sent again after restart.
2176 */
2177 if (toplevel_xact)
2179
2180 break;
2181 }
2182
2183 /*
2184 * Switch to serialize mode when we are not able to send the
2185 * change to parallel apply worker.
2186 */
2187 pa_switch_to_partial_serialize(winfo, true);
2188
2191 Assert(winfo);
2192
2193 /*
2194 * Parallel apply worker might have applied some changes, so write
2195 * the STREAM_ABORT message so that it can rollback the
2196 * subtransaction if needed.
2197 */
2199 &original_msg);
2200
2201 if (toplevel_xact)
2202 {
2205 }
2206 break;
2207
2209
2210 /*
2211 * If the parallel apply worker is applying spooled messages then
2212 * close the file before aborting.
2213 */
2214 if (toplevel_xact && stream_fd)
2216
2218
2219 /*
2220 * We need to wait after processing rollback to savepoint for the
2221 * next set of changes.
2222 *
2223 * We have a race condition here due to which we can start waiting
2224 * here when there are more chunks of streams in the queue. See
2225 * apply_handle_stream_stop.
2226 */
2227 if (!toplevel_xact)
2229
2230 elog(DEBUG1, "finished processing the STREAM ABORT command");
2231 break;
2232
2233 default:
2234 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2235 break;
2236 }
2237
2239}
2240
2241/*
2242 * Ensure that the passed location is fileset's end.
2243 */
2244static void
2245ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2246 pgoff_t offset)
2247{
2248 char path[MAXPGPATH];
2249 BufFile *fd;
2250 int last_fileno;
2252
2254
2256
2258
2259 fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2260
2261 BufFileSeek(fd, 0, 0, SEEK_END);
2263
2265
2267
2268 if (last_fileno != fileno || last_offset != offset)
2269 elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2270 path);
2271}
2272
2273/*
2274 * Common spoolfile processing.
2275 */
2276void
2278 XLogRecPtr lsn)
2279{
2280 int nchanges;
2281 char path[MAXPGPATH];
2282 char *buffer = NULL;
2284 ResourceOwner oldowner;
2285 int fileno;
2286 pgoff_t offset;
2287
2290
2291 /* Make sure we have an open transaction */
2293
2294 /*
2295 * Allocate file handle and memory required to process all the messages in
2296 * TopTransactionContext to avoid them getting reset after each message is
2297 * processed.
2298 */
2300
2301 /* Open the spool file for the committed/prepared transaction */
2303 elog(DEBUG1, "replaying changes from file \"%s\"", path);
2304
2305 /*
2306 * Make sure the file is owned by the toplevel transaction so that the
2307 * file will not be accidentally closed when aborting a subtransaction.
2308 */
2309 oldowner = CurrentResourceOwner;
2311
2312 stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2313
2314 CurrentResourceOwner = oldowner;
2315
2316 buffer = palloc(BLCKSZ);
2317
2319
2320 remote_final_lsn = lsn;
2321
2322 /*
2323 * Make sure the handle apply_dispatch methods are aware we're in a remote
2324 * transaction.
2325 */
2326 in_remote_transaction = true;
2328
2330
2331 /*
2332 * Read the entries one by one and pass them through the same logic as in
2333 * apply_dispatch.
2334 */
2335 nchanges = 0;
2336 while (true)
2337 {
2339 size_t nbytes;
2340 int len;
2341
2343
2344 /* read length of the on-disk record */
2345 nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2346
2347 /* have we reached end of the file? */
2348 if (nbytes == 0)
2349 break;
2350
2351 /* do we have a correct length? */
2352 if (len <= 0)
2353 elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2354 len, path);
2355
2356 /* make sure we have sufficiently large buffer */
2357 buffer = repalloc(buffer, len);
2358
2359 /* and finally read the data into the buffer */
2360 BufFileReadExact(stream_fd, buffer, len);
2361
2362 BufFileTell(stream_fd, &fileno, &offset);
2363
2364 /* init a stringinfo using the buffer and call apply_dispatch */
2365 initReadOnlyStringInfo(&s2, buffer, len);
2366
2367 /* Ensure we are reading the data into our memory context. */
2369
2371
2373
2375
2376 nchanges++;
2377
2378 /*
2379 * It is possible that the file has been closed because we have
2380 * processed a transaction end message such as stream_commit, in which
2381 * case that must be the last message.
2382 */
2383 if (!stream_fd)
2384 {
2385 ensure_last_message(stream_fileset, xid, fileno, offset);
2386 break;
2387 }
2388
2389 if (nchanges % 1000 == 0)
2390 elog(DEBUG1, "replayed %d changes from file \"%s\"",
2391 nchanges, path);
2392 }
2393
2394 if (stream_fd)
2396
2397 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2398 nchanges, path);
2399
2400 return;
2401}
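/*
 * Illustrative sketch (not part of worker.c): the replay loop above reads
 * length-prefixed records until end of file. The standalone code below shows
 * the same pattern with plain stdio in place of BufFile. The names
 * spool_write, spool_replay, spool_cb and sum_lengths are hypothetical and
 * exist only for this example; error handling is reduced to a -1 return.
 */

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Callback invoked once per replayed record (hypothetical signature). */
typedef void (*spool_cb) (const char *data, int len, void *arg);

/* Append one record as <int length><payload>; returns 0 on success. */
static int
spool_write(FILE *fp, const void *data, int len)
{
    if (len <= 0 || fwrite(&len, sizeof(len), 1, fp) != 1)
        return -1;
    return fwrite(data, 1, (size_t) len, fp) == (size_t) len ? 0 : -1;
}

/*
 * Replay all records from the start of the file.  Returns the number of
 * records passed to cb, or -1 on a bad length, short read, or out of memory.
 */
static int
spool_replay(FILE *fp, spool_cb cb, void *arg)
{
    char       *buffer = NULL;
    int         nchanges = 0;

    rewind(fp);
    for (;;)
    {
        int         len;
        char       *tmp;
        size_t      nbytes = fread(&len, 1, sizeof(len), fp);

        if (nbytes == 0)
            break;              /* clean end of file */
        if (nbytes != sizeof(len) || len <= 0)
            goto fail;          /* truncated or corrupt length word */

        /* make sure the buffer is large enough for this record */
        tmp = realloc(buffer, (size_t) len);
        if (tmp == NULL)
            goto fail;
        buffer = tmp;

        if (fread(buffer, 1, (size_t) len, fp) != (size_t) len)
            goto fail;          /* payload shorter than announced */

        cb(buffer, len, arg);
        nchanges++;
    }
    free(buffer);
    return nchanges;

fail:
    free(buffer);
    return -1;
}

/* Example callback: add each record's length to the int pointed to by arg. */
static void
sum_lengths(const char *data, int len, void *arg)
{
    (void) data;
    *(int *) arg += len;
}
```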
2402
2403/*
2404 * Handle STREAM COMMIT message.
2405 */
2406static void
2408{
2409 TransactionId xid;
2413
2414 /* Save the message before it is consumed. */
2416
2418 ereport(ERROR,
2420 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2421
2424
2426
2427 switch (apply_action)
2428 {
2429 case TRANS_LEADER_APPLY:
2430
2431 /*
2432 * The transaction has been serialized to file, so replay all the
2433 * spooled operations.
2434 */
2436 commit_data.commit_lsn);
2437
2439
2440 /* Unlink the files with serialized changes and subxact info. */
2442
2443 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2444 break;
2445
2447 Assert(winfo);
2448
2449 if (pa_send_data(winfo, s->len, s->data))
2450 {
2451 /* Finish processing the streaming transaction. */
2452 pa_xact_finish(winfo, commit_data.end_lsn);
2453 break;
2454 }
2455
2456 /*
2457 * Switch to serialize mode when we are not able to send the
2458 * change to parallel apply worker.
2459 */
2460 pa_switch_to_partial_serialize(winfo, true);
2461
2464 Assert(winfo);
2465
2467 &original_msg);
2468
2470
2471 /* Finish processing the streaming transaction. */
2472 pa_xact_finish(winfo, commit_data.end_lsn);
2473 break;
2474
2476
2477 /*
2478 * If the parallel apply worker is applying spooled messages then
2479 * close the file before committing.
2480 */
2481 if (stream_fd)
2483
2485
2487
2488 /*
2489 * It is important to set the transaction state as finished before
2490 * releasing the lock. See pa_wait_for_xact_finish.
2491 */
2494
2496
2497 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2498 break;
2499
2500 default:
2501 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2502 break;
2503 }
2504
2505 /*
2506 * Process any tables that are being synchronized in parallel, as well as
2507 * any newly added tables or sequences.
2508 */
2510
2512
2514}
2515
2516/*
2517 * Helper function for apply_handle_commit and apply_handle_stream_commit.
2518 */
2519static void
2521{
2522 if (is_skipping_changes())
2523 {
2525
2526 /*
2527 * Start a new transaction to clear the subskiplsn, if not started
2528 * yet.
2529 */
2530 if (!IsTransactionState())
2532 }
2533
2534 if (IsTransactionState())
2535 {
2536 /*
2537 * The transaction is either non-empty or skipped, so we clear the
2538 * subskiplsn.
2539 */
2541
2542 /*
2543 * Update origin state so we can restart streaming from correct
2544 * position in case of crash.
2545 */
2548
2550
2551 if (IsTransactionBlock())
2552 {
2553 EndTransactionBlock(false);
2555 }
2556
2557 pgstat_report_stat(false);
2558
2560 }
2561 else
2562 {
2563 /* Process any invalidation messages that might have accumulated. */
2566 }
2567
2568 in_remote_transaction = false;
2569}
2570
2571/*
2572 * Handle RELATION message.
2573 *
2574 * Note that we don't validate against the local schema here. That
2575 * validation is postponed until the first change for the given relation
2576 * arrives, since we only care about it when applying changes to that
2577 * relation, and we do less locking this way.
2578 */
2579static void
2581{
2582 LogicalRepRelation *rel;
2583
2585 return;
2586
2587 rel = logicalrep_read_rel(s);
2589
2590 /* Also reset all entries in the partition map that refer to remoterel. */
2592}
2593
2594/*
2595 * Handle TYPE message.
2596 *
2597 * This implementation pays no attention to TYPE messages; we expect the user
2598 * to have set things up so that the incoming data is acceptable to the input
2599 * functions for the locally subscribed tables. Hence, we just read and
2600 * discard the message.
2601 */
2602static void
2612
2613/*
2614 * Check that we (the subscription owner) have sufficient privileges on the
2615 * target relation to perform the given operation.
2616 */
2617static void
2619{
2620 Oid relid;
2622
2623 relid = RelationGetRelid(rel);
2625 if (aclresult != ACLCHECK_OK)
2627 get_relkind_objtype(rel->rd_rel->relkind),
2628 get_rel_name(relid));
2629
2630 /*
2631 * We lack the infrastructure to honor RLS policies. It might be possible
2632 * to add such infrastructure here, but tablesync workers lack it, too, so
2633 * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2634 * but it seems dangerous to replicate a TRUNCATE and then refuse to
2635 * replicate subsequent INSERTs, so we forbid all commands the same.
2636 */
2637 if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2638 ereport(ERROR,
2640 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2643}
2644
2645/*
2646 * Handle INSERT message.
2647 */
2648
2649static void
2651{
2654 LogicalRepRelId relid;
2657 EState *estate;
2660 bool run_as_owner;
2661
2662 /*
2663 * Quick return if we are skipping data modification changes or handling
2664 * streamed transactions.
2665 */
2666 if (is_skipping_changes() ||
2668 return;
2669
2671
2672 relid = logicalrep_read_insert(s, &newtup);
2675 {
2676 /*
2677 * The relation can't become interesting in the middle of the
2678 * transaction so it's safe to unlock it.
2679 */
2682 return;
2683 }
2684
2685 /*
2686 * Make sure that any user-supplied code runs as the table owner, unless
2687 * the user has opted out of that behavior.
2688 */
2690 if (!run_as_owner)
2691 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2692
2693 /* Set relation for error callback */
2695
2696 /* Initialize the executor state. */
2698 estate = edata->estate;
2701 &TTSOpsVirtual);
2702
2703 /* Process and store remote tuple in the slot */
2706 slot_fill_defaults(rel, estate, remoteslot);
2708
2709 /* For a partitioned table, insert the tuple into a partition. */
2710 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2713 else
2714 {
2715 ResultRelInfo *relinfo = edata->targetRelInfo;
2716
2717 ExecOpenIndices(relinfo, false);
2720 }
2721
2723
2724 /* Reset relation for error callback */
2726
2727 if (!run_as_owner)
2729
2731
2733}
2734
2735/*
2736 * Workhorse for apply_handle_insert()
2737 * relinfo is for the relation we're actually inserting into
2738 * (could be a child partition of edata->targetRelInfo)
2739 */
2740static void
2744{
2745 EState *estate = edata->estate;
2746
2747 /* Caller should have opened indexes already. */
2748 Assert(relinfo->ri_IndexRelationDescs != NULL ||
2749 !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2750 RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2751
2752 /* Caller will not have done this bit. */
2753 Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2755
2756 /* Do the insert. */
2757 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2759}
2760
2761/*
2762 * Check if the logical replication relation is updatable and throw
2763 * appropriate error if it isn't.
2764 */
2765static void
2767{
2768 /*
2769 * For partitioned tables, we only need to care if the target partition is
2770 * updatable (aka has PK or RI defined for it).
2771 */
2772 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2773 return;
2774
2775 /* Updatable, no error. */
2776 if (rel->updatable)
2777 return;
2778
2779 /*
2780 * We are on the error path, so it's fine that this is somewhat slow. It's
2781 * better to give the user an accurate error.
2782 */
2784 {
2785 ereport(ERROR,
2787 errmsg("publisher did not send replica identity column "
2788 "expected by the logical replication target relation \"%s.%s\"",
2789 rel->remoterel.nspname, rel->remoterel.relname)));
2790 }
2791
2792 ereport(ERROR,
2794 errmsg("logical replication target relation \"%s.%s\" has "
2795 "neither REPLICA IDENTITY index nor PRIMARY "
2796 "KEY and published relation does not have "
2797 "REPLICA IDENTITY FULL",
2798 rel->remoterel.nspname, rel->remoterel.relname)));
2799}
2800
2801/*
2802 * Handle UPDATE message.
2803 *
2804 * TODO: FDW support
2805 */
2806static void
2808{
2810 LogicalRepRelId relid;
2813 EState *estate;
2816 bool has_oldtup;
2820 bool run_as_owner;
2821
2822 /*
2823 * Quick return if we are skipping data modification changes or handling
2824 * streamed transactions.
2825 */
2826 if (is_skipping_changes() ||
2828 return;
2829
2831
2833 &newtup);
2836 {
2837 /*
2838 * The relation can't become interesting in the middle of the
2839 * transaction so it's safe to unlock it.
2840 */
2843 return;
2844 }
2845
2846 /* Set relation for error callback */
2848
2849 /* Check if we can do the update. */
2851
2852 /*
2853 * Make sure that any user-supplied code runs as the table owner, unless
2854 * the user has opted out of that behavior.
2855 */
2857 if (!run_as_owner)
2858 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2859
2860 /* Initialize the executor state. */
2862 estate = edata->estate;
2865 &TTSOpsVirtual);
2866
2867 /*
2868 * Populate updatedCols so that per-column triggers can fire, and so
2869 * executor can correctly pass down indexUnchanged hint. This could
2870 * include more columns than were actually changed on the publisher
2871 * because the logical replication protocol doesn't contain that
2872 * information. But it would for example exclude columns that only exist
2873 * on the subscriber, since we are not touching those.
2874 */
2876 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2877 {
2878 CompactAttribute *att = TupleDescCompactAttr(remoteslot->tts_tupleDescriptor, i);
2879 int remoteattnum = rel->attrmap->attnums[i];
2880
2881 if (!att->attisdropped && remoteattnum >= 0)
2882 {
2883 if (remoteattnum >= newtup.ncols)
2884 ereport(ERROR,
2886 errmsg("logical replication column %d not found in tuple: only %d column(s) received",
2887 remoteattnum + 1, newtup.ncols)));
2888
2890 target_perminfo->updatedCols =
2891 bms_add_member(target_perminfo->updatedCols,
2893 }
2894 }
2895
2896 /* Build the search tuple. */
2899 has_oldtup ? &oldtup : &newtup);
2901
2902 /* For a partitioned table, apply update to correct partition. */
2903 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2906 else
2909
2911
2912 /* Reset relation for error callback */
2914
2915 if (!run_as_owner)
2917
2919
2921}
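/*
 * Illustrative sketch (not part of worker.c): the updatedCols comment in
 * apply_handle_update() describes marking every local column that maps to a
 * remote column, while skipping dropped and subscriber-only columns. The
 * standalone code below models only that mapping rule, using a 64-bit mask
 * in place of a Bitmapset. collect_updated_cols and its parameters are
 * hypothetical names, and the sketch assumes at most 64 local columns.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * remote_attnum[i] is the remote column index feeding local column i, or -1
 * if the column exists only on the subscriber.  On success, stores a mask
 * with bit i set for every local column that receives remote data and
 * returns true.  Returns false if a mapped remote column is missing from the
 * received tuple (remote_ncols columns wide).
 */
static bool
collect_updated_cols(const int *remote_attnum, const bool *dropped,
                     int natts, int remote_ncols, uint64_t *updated)
{
    uint64_t    mask = 0;

    for (int i = 0; i < natts; i++)
    {
        if (dropped[i] || remote_attnum[i] < 0)
            continue;           /* not touched by replication */

        if (remote_attnum[i] >= remote_ncols)
            return false;       /* publisher sent too few columns */

        mask |= UINT64_C(1) << i;
    }

    *updated = mask;
    return true;
}
```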
2922
2923/*
2924 * Workhorse for apply_handle_update()
2925 * relinfo is for the relation we're actually updating in
2926 * (could be a child partition of edata->targetRelInfo)
2927 */
2928static void
2933 Oid localindexoid)
2934{
2935 EState *estate = edata->estate;
2936 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2937 Relation localrel = relinfo->ri_RelationDesc;
2938 EPQState epqstate;
2941 bool found;
2943
2944 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2945 ExecOpenIndices(relinfo, false);
2946
2947 found = FindReplTupleInLocalRel(edata, localrel,
2948 &relmapentry->remoterel,
2949 localindexoid,
2951
2952 /*
2953 * Tuple found.
2954 *
2955 * Note this will fail if there are other conflicting unique indexes.
2956 */
2957 if (found)
2958 {
2959 /*
2960 * Report the conflict if the tuple was modified by a different
2961 * origin.
2962 */
2964 &conflicttuple.origin, &conflicttuple.ts) &&
2966 {
2968
2969 /* Store the new tuple for conflict reporting */
2970 newslot = table_slot_create(localrel, &estate->es_tupleTable);
2971 slot_store_data(newslot, relmapentry, newtup);
2972
2973 conflicttuple.slot = localslot;
2974
2978 }
2979
2980 /* Process and store remote tuple in the slot */
2984
2985 EvalPlanQualSetSlot(&epqstate, remoteslot);
2986
2988
2989 /* Do the actual update. */
2990 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2991 ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2992 remoteslot);
2993 }
2994 else
2995 {
2998
2999 /*
3000 * Detecting whether the tuple was recently deleted or never existed
3001 * is crucial to avoid misleading the user during conflict handling.
3002 */
3003 if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
3004 &conflicttuple.xmin,
3005 &conflicttuple.origin,
3006 &conflicttuple.ts) &&
3009 else
3011
3012 /* Store the new tuple for conflict reporting */
3013 slot_store_data(newslot, relmapentry, newtup);
3014
3015 /*
3016 * The tuple to be updated could not be found or was deleted. Do
3017 * nothing except for emitting a log message.
3018 */
3021 }
3022
3023 /* Cleanup. */
3025 EvalPlanQualEnd(&epqstate);
3026}
3027
3028/*
3029 * Handle DELETE message.
3030 *
3031 * TODO: FDW support
3032 */
3033static void
3035{
3038 LogicalRepRelId relid;
3041 EState *estate;
3044 bool run_as_owner;
3045
3046 /*
3047 * Quick return if we are skipping data modification changes or handling
3048 * streamed transactions.
3049 */
3050 if (is_skipping_changes() ||
3052 return;
3053
3055
3056 relid = logicalrep_read_delete(s, &oldtup);
3059 {
3060 /*
3061 * The relation can't become interesting in the middle of the
3062 * transaction so it's safe to unlock it.
3063 */
3066 return;
3067 }
3068
3069 /* Set relation for error callback */
3071
3072 /* Check if we can do the delete. */
3074
3075 /*
3076 * Make sure that any user-supplied code runs as the table owner, unless
3077 * the user has opted out of that behavior.
3078 */
3080 if (!run_as_owner)
3081 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3082
3083 /* Initialize the executor state. */
3085 estate = edata->estate;
3088 &TTSOpsVirtual);
3089
3090 /* Build the search tuple. */
3094
3095 /* For a partitioned table, apply delete to correct partition. */
3096 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3099 else
3100 {
3101 ResultRelInfo *relinfo = edata->targetRelInfo;
3102
3103 ExecOpenIndices(relinfo, false);
3107 }
3108
3110
3111 /* Reset relation for error callback */
3113
3114 if (!run_as_owner)
3116
3118
3120}
3121
3122/*
3123 * Workhorse for apply_handle_delete()
3124 * relinfo is for the relation we're actually deleting from
3125 * (could be a child partition of edata->targetRelInfo)
3126 */
3127static void
3131 Oid localindexoid)
3132{
3133 EState *estate = edata->estate;
3134 Relation localrel = relinfo->ri_RelationDesc;
3135 LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3136 EPQState epqstate;
3139 bool found;
3140
3141 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3142
3143 /* Caller should have opened indexes already. */
3144 Assert(relinfo->ri_IndexRelationDescs != NULL ||
3145 !localrel->rd_rel->relhasindex ||
3146 RelationGetIndexList(localrel) == NIL);
3147
3148 found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3150
3151 /* If found delete it. */
3152 if (found)
3153 {
3154 /*
3155 * Report the conflict if the tuple was modified by a different
3156 * origin.
3157 */
3159 &conflicttuple.origin, &conflicttuple.ts) &&
3161 {
3162 conflicttuple.slot = localslot;
3166 }
3167
3168 EvalPlanQualSetSlot(&epqstate, localslot);
3169
3170 /* Do the actual delete. */
3171 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3172 ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3173 }
3174 else
3175 {
3176 /*
3177 * The tuple to be deleted could not be found. Do nothing except for
3178 * emitting a log message.
3179 */
3182 }
3183
3184 /* Cleanup. */
3185 EvalPlanQualEnd(&epqstate);
3186}
3187
3188/*
3189 * Try to find a tuple received from the publication side (in 'remoteslot') in
3190 * the corresponding local relation using either the replica identity index,
3191 * the primary key, another usable index or, if needed, a sequential scan.
3192 *
3193 * Local tuple, if found, is returned in '*localslot'.
3194 */
3195static bool
3197 LogicalRepRelation *remoterel,
3201{
3202 EState *estate = edata->estate;
3203 bool found;
3204
3205 /*
3206 * Regardless of the top-level operation, we're performing a read here, so
3207 * check for SELECT privileges.
3208 */
3210
3211 *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3212
3214 (remoterel->replident == REPLICA_IDENTITY_FULL));
3215
3217 {
3218#ifdef USE_ASSERT_CHECKING
3220
3221 /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3223 (remoterel->replident == REPLICA_IDENTITY_FULL &&
3225 edata->targetRel->attrmap)));
3227#endif
3228
3229 found = RelationFindReplTupleByIndex(localrel, localidxoid,
3232 }
3233 else
3236
3237 return found;
3238}
3239
3240/*
3241 * Determine whether the index can reliably locate the deleted tuple in the
3242 * local relation.
3243 *
3244 * An index may exclude deleted tuples if it was re-indexed or re-created during
3245 * change application. Therefore, an index is considered usable only if the
3246 * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3247 * index tuple's xmin. This ensures that any tuples deleted prior to the index
3248 * creation or re-indexing are not relevant for conflict detection in the
3249 * current apply worker.
3250 *
3251 * Note that indexes may also be excluded if they were modified by other DDL
3252 * operations, such as ALTER INDEX. However, this is acceptable, as the
3253 * likelihood of such DDL changes coinciding with the need to scan dead
3254 * tuples for update_deleted detection is low.
3255 */
3256static bool
3259{
3262
3264
3265 if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3266 elog(ERROR, "cache lookup failed for index %u", localindexoid);
3267
3268 /*
3269 * No need to check for a frozen transaction ID, as
3270 * TransactionIdPrecedes() manages it internally, treating it as falling
3271 * behind the conflict_detection_xmin.
3272 */
3274
3276
3278}
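/*
 * Illustrative sketch (not part of worker.c): the comment above relies on
 * the transaction ID comparison ordering a frozen XID before any normal XID.
 * The standalone code below shows a wraparound-aware comparison of 32-bit
 * IDs in which special (non-normal) IDs are compared as plain integers and
 * therefore sort first. xid_precedes, FROZEN_XID and FIRST_NORMAL_XID are
 * names chosen for this example. The cast of an out-of-range unsigned value
 * to int32_t is implementation-defined in C; the sketch assumes the usual
 * two's-complement behavior.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define FROZEN_XID        ((uint32_t) 2)
#define FIRST_NORMAL_XID  ((uint32_t) 3)

/* Does id1 logically precede id2, taking wraparound into account? */
static bool
xid_precedes(uint32_t id1, uint32_t id2)
{
    int32_t     diff;

    /* Special IDs (below the first normal one) use unsigned comparison. */
    if (id1 < FIRST_NORMAL_XID || id2 < FIRST_NORMAL_XID)
        return id1 < id2;

    /* Normal IDs are compared modulo 2^32. */
    diff = (int32_t) (id1 - id2);
    return diff < 0;
}
```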
3279
3280/*
3281 * Attempts to locate a deleted tuple in the local relation that matches the
3282 * values of the tuple received from the publication side (in 'remoteslot').
3283 * The search is performed using either the replica identity index, primary
3284 * key, other available index, or a sequential scan if necessary.
3285 *
3286 * Returns true if the deleted tuple is found. If found, the transaction ID,
3287 * origin, and commit timestamp of the deletion are stored in '*delete_xid',
3288 * '*delete_origin', and '*delete_time' respectively.
3289 */
3290static bool
3295{
3297
3298 /*
3299 * Return false if either dead tuples are not retained or commit timestamp
3300 * data is not available.
3301 */
3303 return false;
3304
3305 /*
3306 * For conflict detection, we use the leader worker's
3307 * oldest_nonremovable_xid value instead of invoking
3308 * GetOldestNonRemovableTransactionId() or using the conflict detection
3309 * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3310 * identify tuples that were recently deleted. These deleted tuples are no
3311 * longer visible to concurrent transactions. However, if a remote update
3312 * matches such a tuple, we log an update_deleted conflict.
3313 *
3314 * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3315 * transaction IDs older than oldest_nonremovable_xid, for our current
3316 * purpose, it is acceptable to treat tuples deleted by transactions prior
3317 * to oldest_nonremovable_xid as update_missing conflicts.
3318 */
3320 {
3322 }
3323 else
3324 {
3325 LogicalRepWorker *leader;
3326
3327 /*
3328 * Obtain the information from the leader apply worker as only the
3329 * leader manages oldest_nonremovable_xid (see
3330 * maybe_advance_nonremovable_xid() for details).
3331 */
3335 false);
3336 if (!leader)
3337 {
3338 ereport(ERROR,
3340 errmsg("could not detect conflict as the leader apply worker has exited")));
3341 }
3342
3343 SpinLockAcquire(&leader->relmutex);
3345 SpinLockRelease(&leader->relmutex);
3347 }
3348
3349 /*
3350 * Return false if the leader apply worker has stopped retaining
3351 * information for detecting conflicts. This implies that update_deleted
3352 * can no longer be reliably detected.
3353 */
3355 return false;
3356
3357 if (OidIsValid(localidxoid) &&
3362 delete_time);
3363 else
3367}
3368
3369/*
3370 * This handles insert, update, delete on a partitioned table.
3371 */
3372static void
3377{
3378 EState *estate = edata->estate;
3379 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3380 ResultRelInfo *relinfo = edata->targetRelInfo;
3381 Relation parentrel = relinfo->ri_RelationDesc;
3382 ModifyTableState *mtstate;
3383 PartitionTupleRouting *proute;
3385 Relation partrel;
3387 TupleConversionMap *map;
3390 AttrMap *attrmap = NULL;
3391
3392 /* ModifyTableState is needed for ExecFindPartition(). */
3393 edata->mtstate = mtstate = makeNode(ModifyTableState);
3394 mtstate->ps.plan = NULL;
3395 mtstate->ps.state = estate;
3396 mtstate->operation = operation;
3397 mtstate->resultRelInfo = relinfo;
3398
3399 /* ... as is PartitionTupleRouting. */
3400 edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3401
3402 /*
3403 * Find the partition to which the "search tuple" belongs.
3404 */
3407 partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3408 remoteslot, estate);
3410 partrel = partrelinfo->ri_RelationDesc;
3411
3412 /*
3413 * Check for supported relkind. We need this since partitions might be of
3414 * unsupported relkinds; and the set of partitions can change, so checking
3415 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3416 */
3417 CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3418 relmapentry->remoterel.relkind,
3420 RelationGetRelationName(partrel));
3421
3422 /*
3423 * To perform any of the operations below, the tuple must match the
3424 * partition's rowtype. Convert if needed or just copy, using a dedicated
3425 * slot to store the tuple in any case.
3426 */
3427 remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3428 if (remoteslot_part == NULL)
3429 remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3430 map = ExecGetRootToChildMap(partrelinfo, estate);
3431 if (map != NULL)
3432 {
3433 attrmap = map->attrMap;
3436 }
3437 else
3438 {
3441 }
3443
3444 /* Check if we can do the update or delete on the leaf partition. */
3446 {
3447 part_entry = logicalrep_partition_open(relmapentry, partrel,
3448 attrmap);
3450 }
3451
3452 switch (operation)
3453 {
3454 case CMD_INSERT:
3457 break;
3458
3459 case CMD_DELETE:
3462 part_entry->localindexoid);
3463 break;
3464
3465 case CMD_UPDATE:
3466
3467 /*
3468 * For UPDATE, depending on whether or not the updated tuple
3469 * satisfies the partition's constraint, perform a simple UPDATE
3470 * of the partition or move the updated tuple into a different
3471 * suitable partition.
3472 */
3473 {
3477 bool found;
3478 EPQState epqstate;
3480
3481 /* Get the matching local tuple from the partition. */
3482 found = FindReplTupleInLocalRel(edata, partrel,
3483 &part_entry->remoterel,
3484 part_entry->localindexoid,
3486 if (!found)
3487 {
3490
3491 /*
3492 * Detecting whether the tuple was recently deleted or
3493 * never existed is crucial to avoid misleading the user
3494 * during conflict handling.
3495 */
3496 if (FindDeletedTupleInLocalRel(partrel,
3497 part_entry->localindexoid,
3499 &conflicttuple.xmin,
3500 &conflicttuple.origin,
3501 &conflicttuple.ts) &&
3504 else
3506
3507 /* Store the new tuple for conflict reporting */
3509
3510 /*
3511 * The tuple to be updated could not be found or was
3512 * deleted. Do nothing except for emitting a log message.
3513 */
3517
3518 return;
3519 }
3520
3521 /*
3522 * Report the conflict if the tuple was modified by a
3523 * different origin.
3524 */
3526 &conflicttuple.origin,
3527 &conflicttuple.ts) &&
3529 {
3531
3532 /* Store the new tuple for conflict reporting */
3533 newslot = table_slot_create(partrel, &estate->es_tupleTable);
3535
3536 conflicttuple.slot = localslot;
3537
3541 }
3542
3543 /*
3544 * Apply the update to the local tuple, putting the result in
3545 * remoteslot_part.
3546 */
3549 newtup);
3551
3552 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3553
3554 /*
3555 * Does the updated tuple still satisfy the current
3556 * partition's constraint?
3557 */
3558 if (!partrel->rd_rel->relispartition ||
3560 false))
3561 {
3562 /*
3563 * Yes, so simply UPDATE the partition. We don't call
3564 * apply_handle_update_internal() here, which would
3565 * normally do the following work, to avoid repeating some
3566 * work already done above to find the local tuple in the
3567 * partition.
3568 */
3570
3572 TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3573 ACL_UPDATE);
3574 ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3576 }
3577 else
3578 {
3579 /* Move the tuple into the new partition. */
3580
3581 /*
3582 * New partition will be found using tuple routing, which
3583 * can only occur via the parent table. We might need to
3584 * convert the tuple to the parent's rowtype. Note that
3585 * this is the tuple found in the partition, not the
3586 * original search tuple received by this function.
3587 */
3588 if (map)
3589 {
3593
3594 remoteslot =
3597 }
3598 else
3599 {
3602 }
3603
3604 /* Find the new partition. */
3607 proute, remoteslot,
3608 estate);
3611 partrel_new = partrelinfo_new->ri_RelationDesc;
3612
3613 /* Check that new partition also has supported relkind. */
3614 CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3615 relmapentry->remoterel.relkind,
3618
3619 /* DELETE old tuple found in the old partition. */
3620 EvalPlanQualSetSlot(&epqstate, localslot);
3621 TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3622 ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3623
3624 /* INSERT new tuple into the new partition. */
3625
3626 /*
3627 * Convert the replacement tuple to match the destination
3628 * partition rowtype.
3629 */
3631 remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3632 if (remoteslot_part == NULL)
3634 &estate->es_tupleTable);
3636 if (map != NULL)
3637 {
3639 remoteslot,
3641 }
3642 else
3643 {
3645 remoteslot);
3647 }
3651 }
3652
3653 EvalPlanQualEnd(&epqstate);
3654 }
3655 break;
3656
3657 default:
3658 elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3659 break;
3660 }
3661}
3662
3663/*
3664 * Handle TRUNCATE message.
3665 *
3666 * TODO: FDW support
3667 */
3668static void
3670{
3671 bool cascade = false;
3672 bool restart_seqs = false;
3674 List *remote_rels = NIL;
3675 List *rels = NIL;
3676 List *part_rels = NIL;
3677 List *relids = NIL;
3679 ListCell *lc;
3680 LOCKMODE lockmode = AccessExclusiveLock;
3681
3682 /*
3683 * Quick return if we are skipping data modification changes or handling
3684 * streamed transactions.
3685 */
3686 if (is_skipping_changes() ||
3688 return;
3689
3691
3692 remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3693
3694 foreach(lc, remote_relids)
3695 {
3696 LogicalRepRelId relid = lfirst_oid(lc);
3698
3699 rel = logicalrep_rel_open(relid, lockmode);
3701 {
3702 /*
3703 * The relation can't become interesting in the middle of the
3704 * transaction so it's safe to unlock it.
3705 */
3706 logicalrep_rel_close(rel, lockmode);
3707 continue;
3708 }
3709
3712 rels = lappend(rels, rel->localrel);
3713 relids = lappend_oid(relids, rel->localreloid);
3716
3717 /*
3718 * Truncate partitions if we got a message to truncate a partitioned
3719 * table.
3720 */
3721 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3722 {
3723 ListCell *child;
3724 List *children = find_all_inheritors(rel->localreloid,
3725 lockmode,
3726 NULL);
3727
3728 foreach(child, children)
3729 {
3730 Oid childrelid = lfirst_oid(child);
3732
3733 if (list_member_oid(relids, childrelid))
3734 continue;
3735
3736 /* find_all_inheritors already got lock */
3738
3739 /*
3740 * Ignore temp tables of other backends. See similar code in
3741 * ExecuteTruncate().
3742 */
3744 {
3745 table_close(childrel, lockmode);
3746 continue;
3747 }
3748
3750 rels = lappend(rels, childrel);
3752 relids = lappend_oid(relids, childrelid);
3753 /* Log this relation only if needed for logical decoding */
3756 }
3757 }
3758 }
3759
3760 /*
3761 * Even if we used CASCADE on the upstream primary we explicitly default
2762 * to replaying changes without further cascading. This might later be
2763 * changeable with a user-specified option.
3764 *
3765 * MySubscription->runasowner tells us whether we want to execute
3766 * replication actions as the subscription owner; the last argument to
3767 * TruncateGuts tells it whether we want to switch to the table owner.
3768 * Those are exactly opposite conditions.
3769 */
3771 relids,
3774 restart_seqs,
3776 foreach(lc, remote_rels)
3777 {
3779
3781 }
3782 foreach(lc, part_rels)
3783 {
3784 Relation rel = lfirst(lc);
3785
3786 table_close(rel, NoLock);
3787 }
3788
3790}
3791
3792
3793/*
3794 * Logical replication protocol message dispatcher.
3795 */
3796void
3798{
3799 LogicalRepMsgType action = pq_getmsgbyte(s);
3801
3802 /*
3803 * Set the current command being applied. Since this function can be
3804 * called recursively when applying spooled changes, save the current
3805 * command.
3806 */
3809
3810 switch (action)
3811 {
3814 break;
3815
3818 break;
3819
3822 break;
3823
3826 break;
3827
3830 break;
3831
3834 break;
3835
3838 break;
3839
3842 break;
3843
3846 break;
3847
3849
3850 /*
3851 * Logical replication does not use generic logical messages yet.
3852 * However, they could be used by other applications that use this
3853 * output plugin.
3854 */
3855 break;
3856
3859 break;
3860
3863 break;
3864
3867 break;
3868
3871 break;
3872
3875 break;
3876
3879 break;
3880
3883 break;
3884
3887 break;
3888
3891 break;
3892
3893 default:
3894 ereport(ERROR,
3896 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3897 }
3898
3899 /* Reset the current command */
3901}
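/*
 * Illustrative sketch (not part of worker.c): apply_dispatch() saves the
 * current command because it can be re-entered while spooled changes are
 * replayed. The standalone code below shows that save/restore pattern with
 * a file-level variable. dispatch, current_command and the single-character
 * action codes are hypothetical and exist only for this example.
 */

```c
#include <assert.h>
#include <stddef.h>

/* Command currently being applied; 0 means "none". */
static char current_command = 0;

/*
 * Dispatch one action.  'nested' optionally lists actions replayed while
 * handling this one, as happens when a commit replays a spool file.
 * Returns the command that is current once the nested calls are done.
 */
static char
dispatch(char action, const char *nested)
{
    char        saved = current_command;
    char        seen;

    current_command = action;

    /* Each nested call overwrites current_command and then restores it. */
    for (; nested != NULL && *nested != '\0'; nested++)
        (void) dispatch(*nested, NULL);

    seen = current_command;     /* back to 'action' thanks to the restore */

    /* Reset to whatever the caller was applying. */
    current_command = saved;
    return seen;
}
```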
3902
3903/*
3904 * Figure out which write/flush positions to report to the walsender process.
3905 *
3906 * We can't simply report back the last LSN the walsender sent us because the
3907 * local transaction might not yet be flushed to disk locally. Instead we
3908 * build a list that associates local with remote LSNs for every commit. When
3909 * reporting back the flush position to the sender we iterate that list and
3910 * check which entries on it are already locally flushed. Those we can report
3911 * as having been flushed.
3912 *
3913 * *have_pending_txes is set to true if there are outstanding transactions
3914 * that need to be flushed.
3915 */
3916static void
3918 bool *have_pending_txes)
3919{
3920 dlist_mutable_iter iter;
3922
3924 *flush = InvalidXLogRecPtr;
3925
3927 {
3928 FlushPosition *pos =
3929 dlist_container(FlushPosition, node, iter.cur);
3930
3931 *write = pos->remote_end;
3932
3933 if (pos->local_end <= local_flush)
3934 {
3935 *flush = pos->remote_end;
3936 dlist_delete(iter.cur);
3937 pfree(pos);
3938 }
3939 else
3940 {
3941 /*
3942 * Don't want to uselessly iterate over the rest of the list which
3943 * could potentially be long. Instead get the last element and
3944 * grab the write position from there.
3945 */
3947 &lsn_mapping);
3948 *write = pos->remote_end;
3949 *have_pending_txes = true;
3950 return;
3951 }
3952 }
3953
3955}
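
The flush-position bookkeeping described above can be illustrated with a small standalone sketch. It is not the code from this file: it swaps the dlist for a fixed-size ring buffer, and the names LsnPair, LsnQueue, queue_push, and report_positions are hypothetical. It only shows the idea of pairing local and remote commit positions and reporting the remote ones whose local counterpart is already flushed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define QUEUE_CAP 16

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr (assumption) */

typedef struct LsnPair
{
    Lsn     local_end;          /* end of the commit record written locally */
    Lsn     remote_end;         /* end of the matching remote commit */
} LsnPair;

typedef struct LsnQueue
{
    LsnPair items[QUEUE_CAP];
    size_t  head;               /* index of the oldest unreported pair */
    size_t  count;              /* number of pairs still queued */
} LsnQueue;

/* Remember one local/remote pair, oldest first. */
static void
queue_push(LsnQueue *q, Lsn local_end, Lsn remote_end)
{
    assert(q->count < QUEUE_CAP);
    q->items[(q->head + q->count) % QUEUE_CAP] = (LsnPair) {local_end, remote_end};
    q->count++;
}

/*
 * Compute the write and flush positions to report, given how far the local
 * WAL has been flushed. Returns true if some commits are still unflushed.
 */
static bool
report_positions(LsnQueue *q, Lsn local_flush, Lsn *write, Lsn *flush)
{
    *write = 0;
    *flush = 0;

    while (q->count > 0)
    {
        LsnPair *pos = &q->items[q->head];

        if (pos->local_end <= local_flush)
        {
            /* Locally durable: report it and drop the entry. */
            *write = pos->remote_end;
            *flush = pos->remote_end;
            q->head = (q->head + 1) % QUEUE_CAP;
            q->count--;
        }
        else
        {
            /* Skip the rest; the newest entry gives the write position. */
            LsnPair *last = &q->items[(q->head + q->count - 1) % QUEUE_CAP];

            *write = last->remote_end;
            return true;
        }
    }

    return false;
}
```

With three queued commits and a local flush position that covers only the first two, the sketch reports the second remote position as flushed, the third as written, and signals that work is still pending.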
3956
3957/*
3958 * Store current remote/local lsn pair in the tracking list.
3959 */
3960void
3962{
3964
3965 /*
3966 * Skip for parallel apply workers, because the lsn_mapping is maintained
3967 * by the leader apply worker.
3968 */
3970 return;
3971
3972 /* Need to do this in permanent context */
3974
3975 /* Track commit lsn */
3977 flushpos->local_end = local_lsn;
3978 flushpos->remote_end = remote_lsn;
3979
3982}
3983
3984
3985/* Update statistics of the worker. */
3986static void
3998
3999/*
4000 * Apply main loop.
4001 */
4002static void
4004{
4006 bool ping_sent = false;
4007 TimeLineID tli;
4008 ErrorContextCallback errcallback;
4010
4011 /*
4012 * Init the ApplyMessageContext which we clean up after each replication
4013 * protocol message.
4014 */
4016 "ApplyMessageContext",
4018
4019 /*
4020 * This memory context is used for per-stream data when the streaming mode
4021 * is enabled. This context is reset on each stream stop.
4022 */
4024 "LogicalStreamingContext",
4026
4027 /* mark as idle, before starting to loop */
4029
4030 /*
4031 * Push apply error context callback. Fields will be filled while applying
4032 * a change.
4033 */
4034 errcallback.callback = apply_error_callback;
4035 errcallback.previous = error_context_stack;
4036 error_context_stack = &errcallback;
4038
4039 /* This outer loop iterates once per wait. */
4040 for (;;)
4041 {
4043 int rc;
4044 int len;
4045 char *buf = NULL;
4046 bool endofstream = false;
4047 long wait_time;
4048
4050
4052
4054
4055 if (len != 0)
4056 {
4057 /* Loop to process all available data (without blocking). */
4058 for (;;)
4059 {
4061
4062 if (len == 0)
4063 {
4064 break;
4065 }
4066 else if (len < 0)
4067 {
4068 ereport(LOG,
4069 (errmsg("data stream from publisher has ended")));
4070 endofstream = true;
4071 break;
4072 }
4073 else
4074 {
4075 int c;
4077
4079 {
4080 ConfigReloadPending = false;
4082 }
4083
4084 /* Reset timeout. */
4086 ping_sent = false;
4087
4088 rdt_data.last_recv_time = last_recv_timestamp;
4089
4090 /* Ensure we are reading the data into our memory context. */
4092
4094
4095 c = pq_getmsgbyte(&s);
4096
4097 if (c == PqReplMsg_WALData)
4098 {
4099 XLogRecPtr start_lsn;
4100 XLogRecPtr end_lsn;
4102
4103 start_lsn = pq_getmsgint64(&s);
4104 end_lsn = pq_getmsgint64(&s);
4106
4107 if (last_received < start_lsn)
4108 last_received = start_lsn;
4109
4110 if (last_received < end_lsn)
4111 last_received = end_lsn;
4112
4114
4115 apply_dispatch(&s);
4116
4118 }
4119 else if (c == PqReplMsg_Keepalive)
4120 {
4121 XLogRecPtr end_lsn;
4123 bool reply_requested;
4124
4125 end_lsn = pq_getmsgint64(&s);
4128
4129 if (last_received < end_lsn)
4130 last_received = end_lsn;
4131
4133
4135
4137 }
4138 else if (c == PqReplMsg_PrimaryStatusUpdate)
4139 {
4140 rdt_data.remote_lsn = pq_getmsgint64(&s);
4141 rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4143 rdt_data.reply_time = pq_getmsgint64(&s);
4144
4145 /*
4146 * This should never happen, see
4147 * ProcessStandbyPSRequestMessage. But if it happens
4148 * due to a bug, we don't want to proceed as it can
4149 * incorrectly advance oldest_nonremovable_xid.
4150 */
4151 if (!XLogRecPtrIsValid(rdt_data.remote_lsn))
4152 elog(ERROR, "cannot get the latest WAL position from the publisher");
4153
4155
4156 UpdateWorkerStats(last_received, rdt_data.reply_time, false);
4157 }
4158 /* other message types are purposefully ignored */
4159
4161 }
4162
4164 }
4165 }
4166
4167 /* confirm all writes so far */
4168 send_feedback(last_received, false, false);
4169
4170 /* Reset the timestamp if no message was received */
4171 rdt_data.last_recv_time = 0;
4172
4174
4176 {
4177 /*
4178 * If we didn't get any transactions for a while there might be
4179 * unconsumed invalidation messages in the queue, consume them
4180 * now.
4181 */
4184
4185 /*
4186 * Process any relations that are being synchronized in parallel
4187 * and any newly added tables or sequences.
4188 */
4190 }
4191
4192 /* Cleanup the memory. */
4195
4196 /* Check if we need to exit the streaming loop. */
4197 if (endofstream)
4198 break;
4199
4200 /*
4201 * Wait for more data or latch. If we have unflushed transactions,
4202 * wake up after WalWriterDelay to see if they've been flushed yet (in
4203 * which case we should send a feedback message). Otherwise, there's
4204 * no particular urgency about waking up unless we get data or a
4205 * signal.
4206 */
4208 wait_time = WalWriterDelay;
4209 else
4210 wait_time = NAPTIME_PER_CYCLE;
4211
4212 /*
4213 * Make sure to wake up when it's possible to advance the non-removable
4214 * transaction ID, or when the retention duration may have exceeded
4215 * max_retention_duration.
4216 */
4218 {
4219 if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
4220 rdt_data.xid_advance_interval)
4221 wait_time = Min(wait_time, rdt_data.xid_advance_interval);
4222 else if (MySubscription->maxretention > 0)
4223 wait_time = Min(wait_time, MySubscription->maxretention);
4224 }
4225
4229 fd, wait_time,
4231
4232 if (rc & WL_LATCH_SET)
4233 {
4236 }
4237
4239 {
4240 ConfigReloadPending = false;
4242 }
4243
4244 if (rc & WL_TIMEOUT)
4245 {
4246 /*
4247 * We didn't receive anything new. If we haven't heard anything
4248 * from the server for more than wal_receiver_timeout / 2, ping
4249 * the server. Also, if it's been longer than
4250 * wal_receiver_status_interval since the last update we sent,
4251 * send a status update to the primary anyway, to report any
4252 * progress in applying WAL.
4253 */
4254 bool requestReply = false;
4255
4256 /*
4257 * Check if time since last receive from primary has reached the
4258 * configured limit.
4259 */
4260 if (wal_receiver_timeout > 0)
4261 {
4264
4265 timeout =
4268
4269 if (now >= timeout)
4270 ereport(ERROR,
4272 errmsg("terminating logical replication worker due to timeout")));
4273
4274 /* Check to see if it's time for a ping. */
4275 if (!ping_sent)
4276 {
4278 (wal_receiver_timeout / 2));
4279 if (now >= timeout)
4280 {
4281 requestReply = true;
4282 ping_sent = true;
4283 }
4284 }
4285 }
4286
4288
4290
4291 /*
4292 * Force reporting to ensure long idle periods don't lead to
4293 * arbitrarily delayed stats. Stats can only be reported outside
4294 * of (implicit or explicit) transactions. That shouldn't lead to
4295 * stats being delayed for long, because transactions are either
4296 * sent as a whole on commit or streamed. Streamed transactions
4297 * are spilled to disk and applied on commit.
4298 */
4299 if (!IsTransactionState())
4300 pgstat_report_stat(true);
4301 }
4302 }
4303
4304 /* Pop the error context stack */
4305 error_context_stack = errcallback.previous;
4307
4308 /* All done */
4310}
4311
4312/*
4313 * Send a Standby Status Update message to server.
4314 *
4315 * 'recvpos' is the latest LSN we've received data to, force is set if we need
4316 * to send a response to avoid timeouts.
4317 */
4318static void
4320{
4321 static StringInfo reply_message = NULL;
4322 static TimestampTz send_time = 0;
4323
4326
4330 bool have_pending_txes;
4331
4332 /*
4333 * If the user doesn't want status to be reported to the publisher, be
4334 * sure to exit before doing anything at all.
4335 */
4336 if (!force && wal_receiver_status_interval <= 0)
4337 return;
4338
4339 /* It's legal to not pass a recvpos */
4340 if (recvpos < last_recvpos)
4342
4344
4345 /*
4346 * No outstanding transactions to flush, we can report the latest received
4347 * position. This is important for synchronous replication.
4348 */
4349 if (!have_pending_txes)
4351
4352 if (writepos < last_writepos)
4354
4355 if (flushpos < last_flushpos)
4357
4359
4360 /* if we've already reported everything we're good */
4361 if (!force &&
4366 return;
4367 send_time = now;
4368
4369 if (!reply_message)
4370 {
4372
4375 }
4376 else
4378
4380 pq_sendint64(reply_message, recvpos); /* write */
4381 pq_sendint64(reply_message, flushpos); /* flush */
4382 pq_sendint64(reply_message, writepos); /* apply */
4383 pq_sendint64(reply_message, now); /* sendTime */
4384 pq_sendbyte(reply_message, requestReply); /* replyRequested */
4385
4386 elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4387 force,
4391
4394
4395 if (recvpos > last_recvpos)
4397 if (writepos > last_writepos)
4399 if (flushpos > last_flushpos)
4401}
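
For readers unfamiliar with the wire format, the feedback message assembled above can be sketched as a flat byte buffer. The layout assumed here (a type byte 'r', three 64-bit positions, a 64-bit send time, and a one-byte reply flag, all in network byte order) follows the Standby Status Update description in the PostgreSQL protocol documentation; the type byte is not visible in this listing. The helper names put_int64 and build_status_update are hypothetical and stand in for the pq_send* calls.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define STATUS_UPDATE_LEN 34    /* 1 + 4 * 8 + 1 bytes */

/* Append a 64-bit value in network (big-endian) byte order. */
static size_t
put_int64(uint8_t *buf, uint64_t value)
{
    for (int i = 0; i < 8; i++)
        buf[i] = (uint8_t) (value >> (56 - 8 * i));
    return 8;
}

/*
 * Fill buf (at least STATUS_UPDATE_LEN bytes) with a status update and
 * return the number of bytes written.
 */
static size_t
build_status_update(uint8_t *buf, uint64_t write_lsn, uint64_t flush_lsn,
                    uint64_t apply_lsn, int64_t send_time,
                    bool reply_requested)
{
    size_t  off = 0;

    assert(buf != NULL);

    buf[off++] = 'r';           /* message type (assumed from protocol docs) */
    off += put_int64(buf + off, write_lsn);
    off += put_int64(buf + off, flush_lsn);
    off += put_int64(buf + off, apply_lsn);
    off += put_int64(buf + off, (uint64_t) send_time);
    buf[off++] = reply_requested ? 1 : 0;

    return off;
}
```

In the listing above, the three position slots are filled with recvpos, flushpos, and writepos respectively.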
4402
4403/*
4404 * Attempt to advance the non-removable transaction ID.
4405 *
4406 * See comments atop worker.c for details.
4407 */
4408static void
4417
4418/*
4419 * Preliminary check to determine if advancing the non-removable transaction ID
4420 * is allowed.
4421 */
4422static bool
4424{
4425 /*
4426 * It is sufficient to manage non-removable transaction ID for a
4427 * subscription by the main apply worker to detect update_deleted reliably
4428 * even for table sync or parallel apply workers.
4429 */
4431 return false;
4432
4433 /* No need to advance if retaining dead tuples is not required */
4435 return false;
4436
4437 return true;
4438}
4439
4440/*
4441 * Process phase transitions during the non-removable transaction ID
4442 * advancement. See comments atop worker.c for details of the transition.
4443 */
4444static void
4470
4471/*
4472 * Workhorse for the RDT_GET_CANDIDATE_XID phase.
4473 */
4474static void
4476{
4479
4480 /*
4481 * Use last_recv_time when applying changes in the loop to avoid
4482 * unnecessary system time retrieval. If last_recv_time is not available,
4483 * obtain the current timestamp.
4484 */
4485 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4486
4487 /*
4488 * Compute the candidate_xid and request the publisher status at most once
4489 * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4490 * details on how this value is dynamically adjusted. This is to avoid
4491 * using CPU and network resources without making much progress.
4492 */
4493 if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4494 rdt_data->xid_advance_interval))
4495 return;
4496
4497 /*
4498 * Immediately update the timer, even if the function returns later
4499 * without setting candidate_xid due to inactivity on the subscriber. This
4500 * avoids frequent calls to GetOldestActiveTransactionId.
4501 */
4502 rdt_data->candidate_xid_time = now;
4503
4504 /*
4505 * Consider transactions in the current database, as only dead tuples from
4506 * this database are required for conflict detection.
4507 */
4509
4510 /*
4511 * Oldest active transaction ID (oldest_running_xid) can't be behind any
4512 * of its previously computed values.
4513 */
4516
4517 /* Return if the oldest_nonremovable_xid cannot be advanced */
4520 {
4522 return;
4523 }
4524
4526
4527 rdt_data->candidate_xid = oldest_running_xid;
4529
4530 /* process the next phase */
4532}
4533
4534/*
4535 * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4536 */
4537static void
4539{
4541
4542 if (!request_message)
4543 {
4545
4548 }
4549 else
4551
4552 /*
4553 * Send the current time to update the remote walsender's latest reply
4554 * message received time.
4555 */
4558
4559 elog(DEBUG2, "sending publisher status request message");
4560
4561 /* Send a request for the publisher status */
4563 request_message->data, request_message->len);
4564
4566
4567 /*
4568 * Skip calling maybe_advance_nonremovable_xid() since further transition
4569 * is possible only once we receive the publisher status message.
4570 */
4571}
4572
4573/*
4574 * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
4575 */
4576static void
4578 bool status_received)
4579{
4580 /*
4581 * Return if we have requested but not yet received the publisher status.
4582 */
4583 if (!status_received)
4584 return;
4585
4586 /*
4587 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4588 * retaining conflict information for this worker.
4589 */
4591 {
4593 return;
4594 }
4595
4596 if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
4597 rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4598
4599 /*
4600 * Check if all remote concurrent transactions that were active at the
4601 * first status request have now completed. If completed, proceed to the
4602 * next phase; otherwise, continue checking the publisher status until
4603 * these transactions finish.
4604 *
4605 * It's possible that transactions in the commit phase during the last
4606 * cycle have now finished committing, but remote_oldestxid remains older
4607 * than remote_wait_for. This can happen if some old transaction entered
4608 * the commit phase when we requested status in this cycle. We do not
4609 * handle this case explicitly as it's rare and the benefit doesn't
4610 * justify the required complexity. Tracking would require either caching
4611 * all xids at the publisher or sending them to subscribers. The condition
4612 * will resolve naturally once the remaining transactions are finished.
4613 *
4614 * Directly advancing the non-removable transaction ID is possible if
4615 * there are no activities on the publisher since the last advancement
4616 * cycle. However, it requires maintaining two fields, last_remote_nextxid
4617 * and last_remote_lsn, within the structure for comparison with the
4618 * current cycle's values. Considering the minimal cost of continuing in
4619 * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4620 * advance the transaction ID here.
4621 */
4622 if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
4623 rdt_data->remote_oldestxid))
4625 else
4627
4628 /* process the next phase */
4630}
4631
4632/*
4633 * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
4634 */
4635static void
4637{
4638 Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) &&
4639 TransactionIdIsValid(rdt_data->candidate_xid));
4640
4641 /*
4642 * We expect the publisher and subscriber clocks to be in sync using a time
4643 * sync service like NTP. Otherwise, we will advance this worker's
4644 * oldest_nonremovable_xid prematurely, leading to the removal of rows
4645 * required to detect update_deleted reliably. This check primarily
4646 * addresses scenarios where the publisher's clock falls behind; if the
4647 * publisher's clock is ahead, subsequent transactions will naturally bear
4648 * later commit timestamps, conforming to the design outlined atop
4649 * worker.c.
4650 *
4651 * XXX Consider waiting for the publisher's clock to catch up with the
4652 * subscriber's before proceeding to the next phase.
4653 */
4654 if (TimestampDifferenceExceeds(rdt_data->reply_time,
4655 rdt_data->candidate_xid_time, 0))
4656 ereport(ERROR,
4657 errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4658 errdetail_internal("The clock on the publisher is behind that of the subscriber."));
4659
4660 /*
4661 * Do not attempt to advance the non-removable transaction ID when table
4662 * sync is in progress. During this time, changes from a single
4663 * transaction may be applied by multiple table sync workers corresponding
4664 * to the target tables. So, it's necessary for all table sync workers to
4665 * apply and flush the corresponding changes before advancing the
4666 * transaction ID; otherwise, dead tuples that are still needed for
4667 * conflict detection in table sync workers could be removed prematurely.
4668 * However, confirming the apply and flush progress across all table sync
4669 * workers is complex and not worth the effort, so we simply return if not
4670 * all tables are in the READY state.
4671 *
4672 * Advancing the transaction ID is necessary even when no tables are
4673 * currently subscribed, to avoid retaining dead tuples unnecessarily.
4674 * While it might seem safe to skip all phases and directly assign
4675 * candidate_xid to oldest_nonremovable_xid during the
4676 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
4677 * concurrently add tables to the subscription, the apply worker may not
4678 * process invalidations in time. Consequently,
4679 * HasSubscriptionTablesCached() might miss the new tables, leading to
4680 * premature advancement of oldest_nonremovable_xid.
4681 *
4682 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
4683 * invalidations are guaranteed to be processed before applying changes
4684 * from newly added tables while waiting for the local flush to reach
4685 * remote_lsn.
4686 *
4687 * Additionally, even if we check for subscription tables during
4688 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
4689 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
4690 * subscription tables at this stage to prevent unnecessary tuple
4691 * retention.
4692 */
4694 {
4696
4697 now = rdt_data->last_recv_time
4698 ? rdt_data->last_recv_time : GetCurrentTimestamp();
4699
4700 /*
4701 * Record the time spent waiting for table sync; it is needed for the
4702 * timeout check in should_stop_conflict_info_retention().
4703 */
4704 rdt_data->table_sync_wait_time =
4705 TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);
4706
4707 return;
4708 }
4709
4710 /*
4711 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4712 * retaining conflict information for this worker.
4713 */
4715 {
4717 return;
4718 }
4719
4720 /*
4721 * Update and check the remote flush position if we are applying changes
4722 * in a loop. This is done at most once per WalWriterDelay to avoid
4723 * performing costly operations in get_flush_position() too frequently
4724 * during change application.
4725 */
4726 if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
4727 TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
4728 rdt_data->last_recv_time, WalWriterDelay))
4729 {
4732 bool have_pending_txes;
4733
4734 /* Fetch the latest remote flush position */
4736
4737 if (flushpos > last_flushpos)
4739
4740 rdt_data->flushpos_update_time = rdt_data->last_recv_time;
4741 }
4742
4743 /* Return to wait for the changes to be applied */
4744 if (last_flushpos < rdt_data->remote_lsn)
4745 return;
4746
4747 /*
4748 * Reaching this point implies should_stop_conflict_info_retention()
4749 * returned false earlier, meaning that the most recent duration for
4750 * advancing the non-removable transaction ID is within the
4751 * max_retention_duration or max_retention_duration is set to 0.
4752 *
4753 * Therefore, if conflict info retention was previously stopped due to a
4754 * timeout, it is now safe to resume retention.
4755 */
4757 {
4759 return;
4760 }
4761
4762 /*
4763 * Reaching here means the remote WAL position has been received, and all
4764 * transactions up to that position on the publisher have been applied and
4765 * flushed locally. So, we can advance the non-removable transaction ID.
4766 */
4770
4771 elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
4772 LSN_FORMAT_ARGS(rdt_data->remote_lsn),
4773 rdt_data->candidate_xid);
4774
4775 /* Notify launcher to update the xmin of the conflict slot */
4777
4779
4780 /* process the next phase */
4782}
4783
4784/*
4785 * Check whether conflict information retention should be stopped due to
4786 * exceeding the maximum wait time (max_retention_duration).
4787 *
4788 * If retention should be stopped, return true. Otherwise, return false.
4789 */
4790static bool
4792{
4794
4795 Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4798
4800 return false;
4801
4802 /*
4803 * Use last_recv_time when applying changes in the loop to avoid
4804 * unnecessary system time retrieval. If last_recv_time is not available,
4805 * obtain the current timestamp.
4806 */
4807 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4808
4809 /*
4810 * Return early if the wait time has not exceeded the configured maximum
4811 * (max_retention_duration). Time spent waiting for table synchronization
4812 * is excluded from this calculation, as it occurs infrequently.
4813 */
4814 if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4816 rdt_data->table_sync_wait_time))
4817 return false;
4818
4819 return true;
4820}
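
The retention timeout test above boils down to one comparison: the time elapsed since the candidate XID was chosen against max_retention_duration plus the time spent waiting for table sync. A sketch under simplifying assumptions follows: all values are plain milliseconds, a limit of zero means unlimited retention, and reaching the limit exactly counts as exceeding it. The function name retention_timed_out is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Return true if retention should stop because advancing the
 * non-removable transaction ID has taken too long.
 *
 * candidate_time_ms: when the candidate XID was computed
 * now_ms:            current time
 * max_retention_ms:  configured limit, 0 meaning "no limit" (assumption)
 * sync_wait_ms:      time spent waiting for table sync, which is not
 *                    counted against the limit
 */
static bool
retention_timed_out(int64_t candidate_time_ms, int64_t now_ms,
                    int64_t max_retention_ms, int64_t sync_wait_ms)
{
    assert(now_ms >= candidate_time_ms);

    if (max_retention_ms == 0)
        return false;

    return (now_ms - candidate_time_ms) >= (max_retention_ms + sync_wait_ms);
}
```

Adding the table sync wait to the limit, rather than subtracting it from the elapsed time, keeps the comparison in the same shape as the two-argument difference check used in the listing.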
4821
4822/*
4823 * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
4824 */
4825static void
4827{
4828 /* Stop retention if not yet stopped */
4830 {
4831 /*
4832 * If the retention status cannot be updated (e.g., due to active
4833 * transaction), skip further processing to avoid inconsistent
4834 * retention behavior.
4835 */
4836 if (!update_retention_status(false))
4837 return;
4838
4842
4843 ereport(LOG,
4844 errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
4846 errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
4847 }
4848
4850
4851 /*
4852 * If retention has been stopped, reset to the initial phase to retry
4853 * resuming retention. This reset is required to recalculate the current
4854 * wait time and resume retention if the time falls within
4855 * max_retention_duration.
4856 */
4858}
4859
4860/*
4861 * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
4862 */
4863static void
4865{
4866 /* We can't resume retention without updating retention status. */
4867 if (!update_retention_status(true))
4868 return;
4869
4870 ereport(LOG,
4871 errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
4874 ? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
4875 : errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));
4876
4877 /*
4878 * Restart the worker to let the launcher initialize
4879 * oldest_nonremovable_xid at startup.
4880 *
4881 * While it's technically possible to derive this value on-the-fly using
4882 * the conflict detection slot's xmin, doing so risks a race condition:
4883 * the launcher might clean slot.xmin just after retention resumes. This
4884 * would make oldest_nonremovable_xid unreliable, especially during xid
4885 * wraparound.
4886 *
4887 * Although this can be prevented by introducing heavyweight locking, the
4888 * complexity it will bring doesn't seem worthwhile given how rarely
4889 * retention is resumed.
4890 */
4892}
4893
4894/*
4895 * Updates pg_subscription.subretentionactive to the given value within a
4896 * new transaction.
4897 *
4898 * If already inside an active transaction, skips the update and returns
4899 * false.
4900 *
4901 * Returns true if the update is successfully performed.
4902 */
4903static bool
4905{
4906 /*
4907 * Do not update the catalog during an active transaction. The transaction
4908 * may be started during change application, leading to a possible
4909 * rollback of catalog updates if the application fails subsequently.
4910 */
4911 if (IsTransactionState())
4912 return false;
4913
4915
4916 /*
4917 * Updating pg_subscription might involve TOAST table access, so ensure we
4918 * have a valid snapshot.
4919 */
4921
4922 /* Update pg_subscription.subretentionactive */
4924
4927
4928 /* Notify launcher to update the conflict slot */
4930
4932
4933 return true;
4934}
4935
4936/*
4937 * Reset all data fields of RetainDeadTuplesData except those used to
4938 * determine the timing for the next round of transaction ID advancement. We
4939 * can even use flushpos_update_time in the next round to decide whether to get
4940 * the latest flush position.
4941 */
4942static void
4944{
4946 rdt_data->remote_lsn = InvalidXLogRecPtr;
4947 rdt_data->remote_oldestxid = InvalidFullTransactionId;
4948 rdt_data->remote_nextxid = InvalidFullTransactionId;
4949 rdt_data->reply_time = 0;
4950 rdt_data->remote_wait_for = InvalidFullTransactionId;
4951 rdt_data->candidate_xid = InvalidTransactionId;
4952 rdt_data->table_sync_wait_time = 0;
4953}
4954
4955/*
4956 * Adjust the interval for advancing non-removable transaction IDs.
4957 *
4958 * If there is no activity on the node or retention has been stopped, we
4959 * progressively double the interval used to advance non-removable transaction
4960 * ID. This helps conserve CPU and network resources when there's little benefit
4961 * to frequent updates.
4962 *
4963 * The interval is capped by the lowest of the following:
4964 * - wal_receiver_status_interval (if set and retention is active),
4965 * - a default maximum of 3 minutes,
4966 * - max_retention_duration (if retention is active).
4967 *
4968 * This ensures the interval never exceeds the retention boundary, even if other
4969 * limits are higher. Once activity resumes on the node and the retention is
4970 * active, the interval is reset to the lesser of 100ms and max_retention_duration,
4971 * allowing timely advancement of non-removable transaction ID.
4972 *
4973 * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4974 * consider the other interval or a separate GUC if the need arises.
4975 */
4976static void
4978{
4979 if (rdt_data->xid_advance_interval && !new_xid_found)
4980 {
4984
4985 /*
4986 * No new transaction ID has been assigned since the last check, so
4987 * double the interval, but not beyond the maximum allowable value.
4988 */
4989 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4990 max_interval);
4991 }
4992 else if (rdt_data->xid_advance_interval &&
4994 {
4995 /*
4996 * Retention has been stopped, so double the interval, capped at a
4997 * maximum of 3 minutes. The wal_receiver_status_interval is
4998 * intentionally not used as an upper bound, since the likelihood of
4999 * retention resuming is lower than that of general activity resuming.
5000 */
5001 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
5003 }
5004 else
5005 {
5006 /*
5007 * A new transaction ID was found or the interval is not yet
5008 * initialized, so set the interval to the minimum value.
5009 */
5010 rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
5011 }
5012
5013 /*
5014 * Ensure the wait time remains within the maximum retention time limit
5015 * when retention is active. Skip this cap when maxretention is zero,
5016 * which means unlimited retention (no timeout).
5017 */
5019 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
5021}
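
The back-off policy described in the header comment of this function can be written as a pure function of its inputs. The sketch below follows that comment rather than the elided lines of the listing, so the exact cap computation is an assumption; next_interval, min_int, and the two constants are hypothetical names, and all values are in milliseconds.

```c
#include <assert.h>
#include <stdbool.h>

#define MIN_INTERVAL_MS 100                 /* starting interval */
#define MAX_INTERVAL_MS (3 * 60 * 1000)     /* default cap: 3 minutes */

static int
min_int(int a, int b)
{
    return (a < b) ? a : b;
}

/*
 * Compute the next interval between attempts to advance the
 * non-removable transaction ID.
 *
 * cur_ms:             current interval, 0 if not yet initialized
 * new_xid_found:      whether a new transaction ID appeared since last time
 * retention_active:   whether conflict info is currently being retained
 * status_interval_ms: status reporting interval, 0 if disabled
 * max_retention_ms:   retention limit, 0 meaning unlimited
 */
static int
next_interval(int cur_ms, bool new_xid_found, bool retention_active,
              int status_interval_ms, int max_retention_ms)
{
    int     next;

    assert(cur_ms >= 0);

    if (cur_ms > 0 && !new_xid_found)
    {
        /* Idle: double, capped by the lower of the two limits. */
        int     cap = MAX_INTERVAL_MS;

        if (status_interval_ms > 0)
            cap = min_int(cap, status_interval_ms);
        next = min_int(cur_ms * 2, cap);
    }
    else if (cur_ms > 0 && !retention_active)
    {
        /* Retention stopped: double, capped only at the default maximum. */
        next = min_int(cur_ms * 2, MAX_INTERVAL_MS);
    }
    else
    {
        /* Activity seen or first call: start from the minimum. */
        next = MIN_INTERVAL_MS;
    }

    /* Never wait longer than the retention limit while retaining. */
    if (retention_active && max_retention_ms > 0)
        next = min_int(next, max_retention_ms);

    return next;
}
```

Starting from an uninitialized interval on an idle node, repeated calls yield 100, 200, 400, and so on until the cap is reached; any new transaction ID drops the interval back to 100 ms while retention is active.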
5022
5023/*
5024 * Exit routine for apply workers due to subscription parameter changes.
5025 */
5026static void
5028{
5030 {
5031 /*
5032 * Don't stop the parallel apply worker as the leader will detect the
5033 * subscription parameter change and restart logical replication later
5034 * anyway. This also prevents the leader from reporting errors when
5035 * trying to communicate with a stopped parallel apply worker, which
5036 * would accidentally disable subscriptions if disable_on_error was
5037 * set.
5038 */
5039 return;
5040 }
5041
5042 /*
5043 * Reset the last-start time for this apply worker so that the launcher
5044 * will restart it without waiting for wal_retrieve_retry_interval if the
5045 * subscription is still active, and so that we won't leak that hash table
5046 * entry if it isn't.
5047 */
5050
5051 proc_exit(0);
5052}
5053
5054/*
5055 * Reread subscription info if needed.
5056 *
5057 * For significant changes, we react by exiting the current process; a new
5058 * one will be launched afterwards if needed.
5059 */
5060void
5062{
5064 bool started_tx = false;
5065
5066 /* When cache state is valid there is nothing to do here. */
5068 return;
5069
5070 /* This function might be called inside or outside of transaction. */
5071 if (!IsTransactionState())
5072 {
5074 started_tx = true;
5075 }
5076
5078
5079 if (newsub)
5080 {
5082 }
5083 else
5084 {
5085 /*
5086 * Exit if the subscription was removed. This normally should not
5087 * happen as the worker gets killed during DROP SUBSCRIPTION.
5088 */
5089 ereport(LOG,
5090 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5091 MySubscription->name)));
5092
5093 /* Ensure we remove no-longer-useful entry for worker's start time */
5096
5097 proc_exit(0);
5098 }
5099
5100 /* Exit if the subscription was disabled. */
5101 if (!newsub->enabled)
5102 {
5103 ereport(LOG,
5104 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5105 MySubscription->name)));
5106
5108 }
5109
5110 /* !slotname should never happen when enabled is true. */
5111 Assert(newsub->slotname);
5112
5113 /* two-phase cannot be altered while the worker is running */
5114 Assert(newsub->twophasestate == MySubscription->twophasestate);
5115
5116 /*
5117 * Exit if any parameter that affects the remote connection was changed.
5118 * The launcher will start a new worker but note that the parallel apply
5119 * worker won't restart if the streaming option's value is changed from
5120 * 'parallel' to any other value or the server decides not to stream the
5121 * in-progress transaction.
5122 */
5123 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
5124 strcmp(newsub->name, MySubscription->name) != 0 ||
5125 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
5126 newsub->binary != MySubscription->binary ||
5127 newsub->stream != MySubscription->stream ||
5128 newsub->passwordrequired != MySubscription->passwordrequired ||
5129 strcmp(newsub->origin, MySubscription->origin) != 0 ||
5130 newsub->owner != MySubscription->owner ||
5131 !equal(newsub->publications, MySubscription->publications))
5132 {
5134 ereport(LOG,
5135 (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5136 MySubscription->name)));
5137 else
5138 ereport(LOG,
5139 (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
5140 MySubscription->name)));
5141
5143 }
5144
5145 /*
5146 * Exit if the subscription owner's superuser privileges have been
5147 * revoked.
5148 */
5149 if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
5150 {
5152 ereport(LOG,
5153 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5155 else
5156 ereport(LOG,
5157 errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5159
5161 }
5162
5163 /* Check for other changes that should never happen too. */
5164 if (newsub->dbid != MySubscription->dbid)
5165 {
5166 elog(ERROR, "subscription %u changed unexpectedly",
5168 }
5169
5170 /* Clean old subscription info and switch to new one. */
5173
5174 /* Change synchronous commit according to the user's wishes */
5175 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5177
5178 /* Change wal_receiver_timeout according to the user's wishes */
5180
5181 if (started_tx)
5183
5184 MySubscriptionValid = true;
5185}
5186
5187/*
5188 * Change wal_receiver_timeout to MySubscription->walrcvtimeout.
5189 */
5190static void
5192{
5193 bool parsed;
5194 int val;
5196
5197 /*
5198 * Set the wal_receiver_timeout GUC to MySubscription->walrcvtimeout,
5199 * which comes from the subscription's wal_receiver_timeout option. If the
5200 * value is -1, reset the GUC to its default, meaning it will inherit from
5201 * the server config, command line, or role/database settings.
5202 */
5204 if (parsed && val == -1)
5205 SetConfigOption("wal_receiver_timeout", NULL,
5207 else
5208 SetConfigOption("wal_receiver_timeout", MySubscription->walrcvtimeout,
5210
5211 /*
5212 * Log the wal_receiver_timeout setting (in milliseconds) as a debug
5213 * message when it changes, to verify it was set correctly.
5214 */
5216 elog(DEBUG1, "logical replication worker for subscription \"%s\" wal_receiver_timeout: %d ms",
5218}
5219
5220/*
5221 * Callback from subscription syscache invalidation. Also needed for server or
5222 * user mapping invalidation, which can change the connection information for
5223 * subscriptions that connect using a server object.
5224 */
5225static void
5230
5231/*
5232 * subxact_info_write
5233 * Store information about subxacts for a toplevel transaction.
5234 *
5235 * For each subxact we store offset of its first change in the main file.
5236 * The file is always over-written as a whole.
5237 *
5238 * XXX We should only store subxacts that were not aborted yet.
5239 */
5240static void
5242{
5243 char path[MAXPGPATH];
5244 Size len;
5245 BufFile *fd;
5246
5248
5249 /* construct the subxact filename */
5250 subxact_filename(path, subid, xid);
5251
5252 /* Delete the subxacts file, if it exists. */
5253 if (subxact_data.nsubxacts == 0)
5254 {
5257
5258 return;
5259 }
5260
5261 /*
5262 * Create the subxact file if it is not already created, otherwise open the
5263 * existing file.
5264 */
5266 true);
5267 if (fd == NULL)
5269
5271
5272 /* Write the subxact count and subxact info */
5275
5277
5278 /* free the memory allocated for subxact info */
5280}
5281
5282/*
5283 * subxact_info_read
5284 * Restore information about subxacts of a streamed transaction.
5285 *
5286 * Read information about subxacts into the structure subxact_data that can be
5287 * used later.
5288 */
5289static void
5291{
5292 char path[MAXPGPATH];
5293 Size len;
5294 BufFile *fd;
5296
5300
5301 /*
5302 * If the subxact file doesn't exist that means we don't have any subxact
5303 * info.
5304 */
5305 subxact_filename(path, subid, xid);
5307 true);
5308 if (fd == NULL)
5309 return;
5310
5311 /* read number of subxact items */
5313
5315
5316 /* we keep the maximum as a power of 2 */
5318
5319 /*
5320 * Allocate subxact information in the logical streaming context. We need
5321 * this information during the complete stream so that we can add the sub
5322 * transaction info to this. On stream stop we will flush this information
5323 * to the subxact file and reset the logical streaming context.
5324 */
5329
5330 if (len > 0)
5332
5334}
5335
5336/*
5337 * subxact_info_add
5338 * Add information about a subxact (offset in the main file).
5339 */
5340static void
5341subxact_info_add(TransactionId xid)
5342{
5343 SubXactInfo *subxacts = subxact_data.subxacts;
5344 int64 i;
5345
5346 /* We must have a valid top level stream xid and a stream fd. */
5348 Assert(stream_fd != NULL);
5349
5350 /*
5351 * If the XID matches the toplevel transaction, we don't want to add it.
5352 */
5353 if (stream_xid == xid)
5354 return;
5355
5356 /*
5357 * In most cases we're checking the same subxact as we've already seen in
5358 * the last call, so make sure to ignore it (this change comes later).
5359 */
5360 if (subxact_data.subxact_last == xid)
5361 return;
5362
5363 /* OK, remember we're processing this XID. */
5365
5366 /*
5367 * Check if the transaction is already present in the array of subxacts. We
5368 * intentionally scan the array from the tail, because we're likely adding
5369 * a change for the most recent subtransactions.
5370 *
5371 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5372 * would allow us to use binary search here.
5373 */
5374 for (i = subxact_data.nsubxacts; i > 0; i--)
5375 {
5376 /* found, so we're done */
5377 if (subxacts[i - 1].xid == xid)
5378 return;
5379 }
5380
5381 /* This is a new subxact, so we need to add it to the array. */
5382 if (subxact_data.nsubxacts == 0)
5383 {
5385
5387
5388 /*
5389 * Allocate this memory for subxacts in per-stream context, see
5390 * subxact_info_read.
5391 */
5395 }
5397 {
5399 subxacts = repalloc_array(subxacts, SubXactInfo,
5401 }
5402
5403 subxacts[subxact_data.nsubxacts].xid = xid;
5404
5405 /*
5406 * Get the current offset of the stream file and store it as offset of
5407 * this subxact.
5408 */
5410 &subxacts[subxact_data.nsubxacts].fileno,
5411 &subxacts[subxact_data.nsubxacts].offset);
5412
5414 subxact_data.subxacts = subxacts;
5415}
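
The lookup-then-append logic described in the comments of subxact_info_add can be illustrated outside the server. The following is only a minimal sketch under stated assumptions: DemoSubXact, DemoSubXactData and demo_subxact_add are hypothetical stand-ins (not PostgreSQL names), realloc replaces the memory-context allocation, and the initial capacity of 4 is an arbitrary choice made for the demonstration.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-ins for SubXactInfo and ApplySubXactData. */
typedef struct DemoSubXact
{
	uint32_t	xid;			/* subtransaction id */
	int64_t		offset;			/* offset of its first change */
} DemoSubXact;

typedef struct DemoSubXactData
{
	uint32_t	nsubxacts;		/* entries in use */
	uint32_t	nsubxacts_max;	/* capacity, kept as a power of 2 */
	uint32_t	subxact_last;	/* xid seen by the previous call, 0 = none */
	DemoSubXact *subxacts;
} DemoSubXactData;

/* Returns 1 if xid was appended, 0 if already known, -1 if out of memory. */
static int
demo_subxact_add(DemoSubXactData *data, uint32_t xid, int64_t offset)
{
	uint32_t	i;

	assert(xid != 0);			/* 0 plays the role of an invalid xid here */

	/* Fast path: same subxact as in the previous call. */
	if (data->subxact_last == xid)
		return 0;
	data->subxact_last = xid;

	/* Scan from the tail, since recent subxacts are the likely match. */
	for (i = data->nsubxacts; i > 0; i--)
	{
		if (data->subxacts[i - 1].xid == xid)
			return 0;
	}

	/* Grow by doubling so the capacity stays a power of 2. */
	if (data->nsubxacts == data->nsubxacts_max)
	{
		uint32_t	newmax = (data->nsubxacts_max == 0) ? 4 : data->nsubxacts_max * 2;
		DemoSubXact *tmp = realloc(data->subxacts, newmax * sizeof(DemoSubXact));

		if (tmp == NULL)
			return -1;
		data->subxacts = tmp;
		data->nsubxacts_max = newmax;
	}

	data->subxacts[data->nsubxacts].xid = xid;
	data->subxacts[data->nsubxacts].offset = offset;
	data->nsubxacts++;
	return 1;
}
```

Doubling keeps appends amortized constant time, while the tail scan remains linear in the worst case, which is the cost the XXX note about binary search refers to.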
5416
5417/* format filename for file containing the info about subxacts */
5418static inline void
5419subxact_filename(char *path, Oid subid, TransactionId xid)
5420{
5421 snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5422}
5423
5424/* format filename for file containing serialized changes */
5425static inline void
5426changes_filename(char *path, Oid subid, TransactionId xid)
5427{
5428 snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5429}
5430
5431/*
5432 * stream_cleanup_files
5433 * Cleanup files for a subscription / toplevel transaction.
5434 *
5435 * Remove files with serialized changes and subxact info for a particular
5436 * toplevel transaction. Each subscription has a separate set of files
5437 * for any toplevel transaction.
5438 */
5439void
5440stream_cleanup_files(Oid subid, TransactionId xid)
5441{
5442 char path[MAXPGPATH];
5443
5444 /* Delete the changes file. */
5445 changes_filename(path, subid, xid);
5447
5448 /* Delete the subxact file, if it exists. */
5449 subxact_filename(path, subid, xid);
5451}
5452
5453/*
5454 * stream_open_file
5455 * Open a file that we'll use to serialize changes for a toplevel
5456 * transaction.
5457 *
5458 * Open a file for streamed changes from a toplevel transaction identified
5459 * by stream_xid (global variable). If it's the first chunk of streamed
5460 * changes for this transaction, create the buffile, otherwise open the
5461 * previously created file.
5462 */
5463static void
5464stream_open_file(Oid subid, TransactionId xid, bool first_segment)
5465{
5466 char path[MAXPGPATH];
5468
5469 Assert(OidIsValid(subid));
5471 Assert(stream_fd == NULL);
5472
5473
5474 changes_filename(path, subid, xid);
5475 elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5476
5477 /*
5478 * Create/open the buffiles under the logical streaming context so that we
5479 * have those files until stream stop.
5480 */
5482
5483 /*
5484 * If this is the first streamed segment, create the changes file.
5485 * Otherwise, just open the file for writing, in append mode.
5486 */
5487 if (first_segment)
5489 path);
5490 else
5491 {
5492 /*
5493 * Open the file and seek to the end of the file because we always
5494 * append to the changes file.
5495 */
5497 path, O_RDWR, false);
5499 }
5500
5502}
5503
5504/*
5505 * stream_close_file
5506 * Close the currently open file with streamed changes.
5507 */
5508static void
5510{
5511 Assert(stream_fd != NULL);
5512
5514
5515 stream_fd = NULL;
5516}
5517
5518/*
5519 * stream_write_change
5520 * Serialize a change to a file for the current toplevel transaction.
5521 *
5522 * The change is serialized in a simple format, with length (not including
5523 * the length), action code (identifying the message type) and message
5524 * contents (without the subxact TransactionId value).
5525 */
5526static void
5527stream_write_change(char action, StringInfo s)
5528{
5529 int len;
5530
5531 Assert(stream_fd != NULL);
5532
5533 /* total on-disk size, including the action type character */
5534 len = (s->len - s->cursor) + sizeof(char);
5535
5536 /* first write the size */
5537 BufFileWrite(stream_fd, &len, sizeof(len));
5538
5539 /* then the action */
5540 BufFileWrite(stream_fd, &action, sizeof(action));
5541
5542 /* and finally the remaining part of the buffer (after the XID) */
5543 len = (s->len - s->cursor);
5544
5546}
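
The on-disk record layout described for stream_write_change (length, then action code, then message contents) can be sketched against a plain memory buffer. This is an illustration only: demo_write_change and demo_read_change are hypothetical helpers, and a char array stands in for the BufFile that the worker actually writes to.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Append one record to buf: an int length, a one-byte action code, then the
 * payload. The stored length covers the action byte and the payload, but not
 * the length field itself. Returns bytes written, or 0 if buf is too small.
 */
static size_t
demo_write_change(char *buf, size_t bufsize, char action,
				  const char *payload, int payload_len)
{
	int			len;
	size_t		total;

	assert(payload_len >= 0);

	len = payload_len + (int) sizeof(char);
	total = sizeof(len) + (size_t) len;
	if (total > bufsize)
		return 0;

	memcpy(buf, &len, sizeof(len));
	buf[sizeof(len)] = action;
	if (payload_len > 0)
		memcpy(buf + sizeof(len) + sizeof(char), payload, (size_t) payload_len);
	return total;
}

/* Decode one record; returns bytes consumed, or 0 if the data is truncated. */
static size_t
demo_read_change(const char *buf, size_t avail, char *action,
				 const char **payload, int *payload_len)
{
	int			len;

	if (avail < sizeof(len))
		return 0;
	memcpy(&len, buf, sizeof(len));
	if (len < 1 || avail - sizeof(len) < (size_t) len)
		return 0;

	*action = buf[sizeof(len)];
	*payload = buf + sizeof(len) + sizeof(char);
	*payload_len = len - (int) sizeof(char);
	return sizeof(len) + (size_t) len;
}
```

Because each record carries its own length, a reader can walk the file record by record without knowing anything about the message types it contains.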
5547
5548/*
5549 * stream_open_and_write_change
5550 * Serialize a message to a file for the given transaction.
5551 *
5552 * This function is similar to stream_write_change except that it opens the
5553 * target file first if it is not already open, and closes the file at the
5554 * end.
5555 */
5556static void
5557stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
5558{
5560
5561 if (!stream_fd)
5562 stream_start_internal(xid, false);
5563
5564 stream_write_change(action, s);
5566}
5567
5568/*
5569 * Sets streaming options including replication slot name and origin start
5570 * position. Workers need these options for logical replication.
5571 */
5572void
5573set_stream_options(WalRcvStreamOptions *options,
5574 char *slotname,
5575 XLogRecPtr *origin_startpos)
5576{
5577 int server_version;
5578
5579 options->logical = true;
5580 options->startpoint = *origin_startpos;
5581 options->slotname = slotname;
5582
5584 options->proto.logical.proto_version =
5589
5590 options->proto.logical.publication_names = MySubscription->publications;
5591 options->proto.logical.binary = MySubscription->binary;
5592
5593 /*
5594 * Assign the appropriate option value for streaming option according to
5595 * the 'streaming' mode and the publisher's ability to support that mode.
5596 */
5597 if (server_version >= 160000 &&
5599 {
5600 options->proto.logical.streaming_str = "parallel";
5602 }
5603 else if (server_version >= 140000 &&
5605 {
5606 options->proto.logical.streaming_str = "on";
5608 }
5609 else
5610 {
5611 options->proto.logical.streaming_str = NULL;
5613 }
5614
5615 options->proto.logical.twophase = false;
5616 options->proto.logical.origin = pstrdup(MySubscription->origin);
5617}
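
The streaming-option selection in set_stream_options depends on both the publisher version and the subscription's requested mode. The sketch below is a guess at that decision, not the server code: the mode tests are elided in the listing above, so the DemoStreamMode enum and the exact conditions in demo_streaming_str are assumptions; only the version thresholds and the three resulting values ("parallel", "on", NULL) are taken from the listing.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the subscription's streaming mode. */
typedef enum DemoStreamMode
{
	DEMO_STREAM_OFF,
	DEMO_STREAM_ON,
	DEMO_STREAM_PARALLEL
} DemoStreamMode;

/*
 * Pick the streaming option string to send to the publisher, or NULL when
 * streaming is not requested or not supported. The conditions on "requested"
 * are assumptions made for this sketch.
 */
static const char *
demo_streaming_str(int server_version, DemoStreamMode requested)
{
	assert(server_version > 0);

	if (server_version >= 160000 && requested == DEMO_STREAM_PARALLEL)
		return "parallel";
	else if (server_version >= 140000 && requested != DEMO_STREAM_OFF)
		return "on";

	return NULL;
}
```

Under these assumptions, a subscription asking for parallel streaming from a version 14 or 15 publisher would fall back to "on" rather than disabling streaming.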
5618
5619/*
5620 * Cleanup the memory for subxacts and reset the related variables.
5621 */
5622static inline void
5633
5634/*
5635 * Common function to run the apply loop with error handling. Disable the
5636 * subscription, if necessary.
5637 *
5638 * Note that we don't handle FATAL errors, which are probably caused by
5639 * system resource errors and are not repeatable.
5640 */
5641void
5642start_apply(XLogRecPtr origin_startpos)
5643{
5644 PG_TRY();
5645 {
5647 }
5648 PG_CATCH();
5649 {
5650 /*
5651 * Reset the origin state to prevent the advancement of origin
5652 * progress if we fail to apply. Otherwise, this will result in
5653 * transaction loss as that transaction won't be sent again by the
5654 * server.
5655 */
5657
5660 else
5661 {
5662 /*
5663 * Report the worker failed while applying changes. Abort the
5664 * current transaction so that the stats message is sent in an
5665 * idle state.
5666 */
5669
5670 PG_RE_THROW();
5671 }
5672 }
5673 PG_END_TRY();
5674}
5675
5676/*
5677 * Runs the leader apply worker.
5678 *
5679 * It sets up replication origin, streaming options and then starts streaming.
5680 */
5681static void
5682run_apply_worker(void)
5683{
5684 char originname[NAMEDATALEN];
5686 char *slotname = NULL;
5689 TimeLineID startpointTLI;
5690 char *err;
5691 bool must_use_password;
5692
5693 slotname = MySubscription->slotname;
5694
5695 /*
5696 * This shouldn't happen if the subscription is enabled, but guard against
5697 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5698 * slot is NULL.)
5699 */
5700 if (!slotname)
5701 ereport(ERROR,
5703 errmsg("subscription has no replication slot set")));
5704
5705 /* Setup replication origin tracking. */
5707 originname, sizeof(originname));
5710 if (!OidIsValid(originid))
5716
5717 /* Is the use of a password mandatory? */
5720
5722 true, must_use_password,
5724
5726 ereport(ERROR,
5728 errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5729 MySubscription->name, err)));
5730
5731 /*
5732 * We don't really use the output of identify_system for anything, but it
5733 * does some initializations on the upstream, so let's still call it.
5734 */
5736
5738
5740
5741 /*
5742 * Even when the two_phase mode is requested by the user, it remains as
5743 * the tri-state PENDING until all tablesyncs have reached READY state.
5744 * Only then can it become ENABLED.
5745 *
5746 * Note: If the subscription has no tables then leave the state as
5747 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5748 * work.
5749 */
5752 {
5753 /* Start streaming with two_phase enabled */
5754 options.proto.logical.twophase = true;
5756
5758
5759 /*
5760 * Updating pg_subscription might involve TOAST table access, so
5761 * ensure we have a valid snapshot.
5762 */
5764
5769 }
5770 else
5771 {
5773 }
5774
5776 (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5781 "?")));
5782
5783 /* Run the main loop. */
5785}
5786
5787/*
5788 * Common initialization for leader apply worker, parallel apply worker,
5789 * tablesync worker and sequencesync worker.
5790 *
5791 * Initialize the database connection, in-memory subscription and necessary
5792 * config options.
5793 */
5794void
5796{
5797 /* Run as replica session replication role. */
5798 SetConfigOption("session_replication_role", "replica",
5800
5801 /* Connect to our database. */
5804 0);
5805
5806 /*
5807 * Set always-secure search path, so malicious users can't redirect user
5808 * code (e.g. pg_index.indexprs).
5809 */
5810 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5811
5813 "ApplyContext",
5815
5817
5818 /*
5819 * Lock the subscription to prevent it from being concurrently dropped,
5820 * then re-verify its existence. After the initialization, the worker will
5821 * be terminated gracefully if the subscription is dropped.
5822 */
5825
5827
5828 if (MySubscription)
5829 {
5831 }
5832 else
5833 {
5834 ereport(LOG,
5835 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5837
5838 /* Ensure we remove no-longer-useful entry for worker's start time */
5841
5842 proc_exit(0);
5843 }
5844
5845 MySubscriptionValid = true;
5846
5847 if (!MySubscription->enabled)
5848 {
5849 ereport(LOG,
5850 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5851 MySubscription->name)));
5852
5854 }
5855
5856 /*
5857 * Restart the worker if retain_dead_tuples was enabled during startup.
5858 *
5859 * At this point, the replication slot used for conflict detection might
5860 * not exist yet, or could be dropped soon if the launcher perceives
5861 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5862 * oldest_nonremovable_xid when the slot is absent or at risk of being
5863 * dropped, a restart is initiated.
5864 *
5865 * The oldest_nonremovable_xid should be initialized only when the
5866 * subscription's retention is active before launching the worker. See
5867 * logicalrep_worker_launch.
5868 */
5869 if (am_leader_apply_worker() &&
5873 {
5874 ereport(LOG,
5875 errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5876 MySubscription->name, "retain_dead_tuples"));
5877
5879 }
5880
5881 /* Setup synchronous commit according to the user's wishes */
5882 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5884
5885 /* Change wal_receiver_timeout according to the user's wishes */
5887
5888 /*
5889 * Keep us informed about subscription or role changes. Note that the
5890 * role's superuser privilege can be revoked.
5891 */
5894 (Datum) 0);
5895 /* Changes to foreign servers may affect subscriptions using SERVER. */
5898 (Datum) 0);
5899 /* Changes to user mappings may affect subscriptions using SERVER. */
5902 (Datum) 0);
5903
5904 /*
5905 * Changes to FDW connection_function may affect subscriptions using
5906 * SERVER.
5907 */
5910 (Datum) 0);
5911
5914 (Datum) 0);
5915
5916 if (am_tablesync_worker())
5917 ereport(LOG,
5918 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5921 else if (am_sequencesync_worker())
5922 ereport(LOG,
5923 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
5925 else
5926 ereport(LOG,
5927 errmsg("logical replication apply worker for subscription \"%s\" has started",
5929
5931
5932 /*
5933 * Register a callback to reset the origin state before aborting any
5934 * pending transaction during shutdown (see ShutdownPostgres()). This will
5935 * avoid origin advancement for an incomplete transaction which could
5936 * otherwise lead to its loss as such a transaction won't be sent by the
5937 * server again.
5938 *
5939 * Note that even a LOG or DEBUG statement placed after setting the origin
5940 * state may process a shutdown signal before committing the current apply
5941 * operation. So, it is important to register such a callback here.
5942 *
5943 * Register this callback here to ensure that all types of logical
5944 * replication workers that set up origins and apply remote transactions
5945 * are protected.
5946 */
5948}
5949
5950/*
5951 * Callback on exit to clear transaction-level replication origin state.
5952 */
5953static void
5955{
5957}
5958
5959/*
5960 * Common function to setup the leader apply, tablesync and sequencesync worker.
5961 */
5962void
5964{
5965 /* Attach to slot */
5967
5969
5970 /* Setup signal handling */
5973
5974 /*
5975 * We don't currently need any ResourceOwner in a walreceiver process, but
5976 * if we did, we could call CreateAuxProcessResourceOwner here.
5977 */
5978
5979 /* Initialise stats to a sanish value */
5982
5983 /* Load the libpq-specific functions */
5984 load_file("libpqwalreceiver", false);
5985
5987
5988 /* Connect to the origin and start the replication. */
5989 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5991
5992 /*
5993 * Setup callback for syscache so that we know when something changes in
5994 * the subscription relation state.
5995 */
5998 (Datum) 0);
5999}
6000
6001/* Logical Replication Apply worker entry point */
6002void
6004{
6006
6008
6010
6012
6014
6015 proc_exit(0);
6016}
6017
6018/*
6019 * After error recovery, disable the subscription in a new transaction
6020 * and exit cleanly.
6021 */
6022void
6023DisableSubscriptionAndExit(void)
6024{
6025 /*
6026 * Emit the error message, and recover from the error state to an idle
6027 * state
6028 */
6030
6034
6036
6037 /*
6038 * Report the worker failed during sequence synchronization, table
6039 * synchronization, or apply.
6040 */
6042
6043 /* Disable the subscription */
6045
6046 /*
6047 * Updating pg_subscription might involve TOAST table access, so ensure we
6048 * have a valid snapshot.
6049 */
6051
6055
6056 /* Ensure we remove no-longer-useful entry for worker's start time */
6059
6060 /* Notify the subscription has been disabled and exit */
6061 ereport(LOG,
6062 errmsg("subscription \"%s\" has been disabled because of an error",
6064
6065 /*
6066 * Skip the track_commit_timestamp check when disabling the worker due to
6067 * an error, as verifying commit timestamps is unnecessary in this
6068 * context.
6069 */
6073
6074 proc_exit(0);
6075}
6076
6077/*
6078 * Is current process a logical replication worker?
6079 */
6080bool
6081IsLogicalWorker(void)
6082{
6083 return MyLogicalRepWorker != NULL;
6084}
6085
6086/*
6087 * Is current process a logical replication parallel apply worker?
6088 */
6089bool
6094
6095/*
6096 * Start skipping changes of the transaction if the given LSN matches the
6097 * LSN specified by subscription's skiplsn.
6098 */
6099static void
6101{
6105
6106 /*
6107 * Quick return if it's not requested to skip this transaction. This
6108 * function is called for every remote transaction and we assume that
6109 * skipping the transaction is not used often.
6110 */
6112 MySubscription->skiplsn != finish_lsn))
6113 return;
6114
6115 /* Start skipping all changes of this transaction */
6116 skip_xact_finish_lsn = finish_lsn;
6117
6118 ereport(LOG,
6119 errmsg("logical replication starts skipping transaction at LSN %X/%08X",
6121}
6122
6123/*
6124 * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
6125 */
6126static void
6127stop_skipping_changes(void)
6128{
6129 if (!is_skipping_changes())
6130 return;
6131
6132 ereport(LOG,
6133 errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6135
6136 /* Stop skipping changes */
6138}
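
The skip check and the "%X/%08X" LSN format used in the log messages above can be tried in isolation. This is a minimal sketch under assumptions: demo_should_skip and demo_format_lsn are hypothetical helpers, a uint64_t stands in for XLogRecPtr, and 0 is treated as the invalid (unset) LSN. The validity half of the skip condition is elided in the listing, so it is assumed here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Skipping starts only when a skip LSN is set (non-zero in this sketch) and
 * it equals the finish LSN of the incoming remote transaction.
 */
static bool
demo_should_skip(uint64_t skiplsn, uint64_t finish_lsn)
{
	return skiplsn != 0 && skiplsn == finish_lsn;
}

/* Format an LSN as high 32 bits, a slash, then zero-padded low 32 bits. */
static void
demo_format_lsn(char *buf, size_t bufsize, uint64_t lsn)
{
	/* 8 hex digits + '/' + 8 hex digits + terminating NUL */
	assert(bufsize >= 18);

	snprintf(buf, bufsize, "%X/%08X",
			 (unsigned int) (lsn >> 32),
			 (unsigned int) (lsn & 0xFFFFFFFF));
}
```

Since the common case is that no skip LSN is set, the check reduces to a single comparison for almost every transaction.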
6139
6140/*
6141 * Clear subskiplsn of pg_subscription catalog.
6142 *
6143 * finish_lsn is the transaction's finish LSN, which is compared against
6144 * subskiplsn. If the two do not match, we raise a warning when clearing
6145 * subskiplsn to inform the user, e.g., when the wrong subskiplsn was
6146 * specified by mistake.
6147 */
6148static void
6149clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
6150{
6151 Relation rel;
6153 HeapTuple tup;
6155 bool started_tx = false;
6156
6158 return;
6159
6160 if (!IsTransactionState())
6161 {
6163 started_tx = true;
6164 }
6165
6166 /*
6167 * Updating pg_subscription might involve TOAST table access, so ensure we
6168 * have a valid snapshot.
6169 */
6171
6172 /*
6173 * Protect subskiplsn of pg_subscription from being concurrently updated
6174 * while clearing it.
6175 */
6178
6180
6181 /* Fetch the existing tuple. */
6184
6185 if (!HeapTupleIsValid(tup))
6186 elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6187
6189
6190 /*
6191 * Clear the subskiplsn. If the user has already changed subskiplsn before
6192 * clearing it we don't update the catalog and the replication origin
6193 * state won't get advanced. So in the worst case, if the server crashes
6194 * before sending an acknowledgment of the flush position the transaction
6195 * will be sent again and the user needs to set subskiplsn again. We can
6196 * reduce the possibility by logging a replication origin WAL record to
6197 * advance the origin LSN instead but there is no way to advance the
6198 * origin timestamp and it doesn't seem to be worth doing anything about
6199 * it since it's a very rare case.
6200 */
6201 if (subform->subskiplsn == myskiplsn)
6202 {
6203 bool nulls[Natts_pg_subscription];
6206
6207 memset(values, 0, sizeof(values));
6208 memset(nulls, false, sizeof(nulls));
6209 memset(replaces, false, sizeof(replaces));
6210
6211 /* reset subskiplsn */
6214
6216 replaces);
6217 CatalogTupleUpdate(rel, &tup->t_self, tup);
6218
6219 if (myskiplsn != finish_lsn)
6221 errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6222 errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6223 LSN_FORMAT_ARGS(finish_lsn),
6225 }
6226
6228 table_close(rel, NoLock);
6229
6231
6232 if (started_tx)
6234}
6235
6236/* Error callback to give more context info about the change being applied */
6237void
6239{
6241
6243 return;
6244
6245 Assert(errarg->origin_name);
6246
6247 if (errarg->rel == NULL)
6248 {
6249 if (!TransactionIdIsValid(errarg->remote_xid))
6250 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6251 errarg->origin_name,
6253 else if (!XLogRecPtrIsValid(errarg->finish_lsn))
6254 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6255 errarg->origin_name,
6257 errarg->remote_xid);
6258 else
6259 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6260 errarg->origin_name,
6262 errarg->remote_xid,
6263 LSN_FORMAT_ARGS(errarg->finish_lsn));
6264 }
6265 else
6266 {
6267 if (errarg->remote_attnum < 0)
6268 {
6269 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6270 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6271 errarg->origin_name,
6273 errarg->rel->remoterel.nspname,
6274 errarg->rel->remoterel.relname,
6275 errarg->remote_xid);
6276 else
6277 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6278 errarg->origin_name,
6280 errarg->rel->remoterel.nspname,
6281 errarg->rel->remoterel.relname,
6282 errarg->remote_xid,
6283 LSN_FORMAT_ARGS(errarg->finish_lsn));
6284 }
6285 else
6286 {
6287 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6288 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6289 errarg->origin_name,
6291 errarg->rel->remoterel.nspname,
6292 errarg->rel->remoterel.relname,
6293 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6294 errarg->remote_xid);
6295 else
6296 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6297 errarg->origin_name,
6299 errarg->rel->remoterel.nspname,
6300 errarg->rel->remoterel.relname,
6301 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6302 errarg->remote_xid,
6303 LSN_FORMAT_ARGS(errarg->finish_lsn));
6304 }
6305 }
6306}
6307
6308/* Set transaction information of apply error callback */
6309static inline void
6315
6316/* Reset all information of apply error callback */
6317static inline void
6325
6326/*
6327 * Request wakeup of the workers for the given subscription OID
6328 * at commit of the current transaction.
6329 *
6330 * This is used to ensure that the workers process assorted changes
6331 * as soon as possible.
6332 */
6333void
6343
6344/*
6345 * Wake up the workers of any subscriptions that were changed in this xact.
6346 */
6347void
6348AtEOXact_LogicalRepWorkers(bool isCommit)
6349{
6351 {
6352 ListCell *lc;
6353
6356 {
6357 Oid subid = lfirst_oid(lc);
6358 List *workers;
6359 ListCell *lc2;
6360
6361 workers = logicalrep_workers_find(subid, true, false);
6362 foreach(lc2, workers)
6363 {
6365
6367 }
6368 }
6370 }
6371
6372 /* The List storage will be reclaimed automatically in xact cleanup. */
6374}
6375
6376/*
6377 * Allocate the origin name in long-lived context for error context message.
6378 */
6379void
6385
6386/*
6387 * Return the action to be taken for the given transaction. See
6388 * TransApplyAction for information on each of the actions.
6389 *
6390 * *winfo is assigned to the destination parallel worker info when the leader
6391 * apply worker has to pass all the transaction's changes to the parallel
6392 * apply worker.
6393 */
6394static TransApplyAction
6395get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
6396{
6397 *winfo = NULL;
6398
6400 {
6401 return TRANS_PARALLEL_APPLY;
6402 }
6403
6404 /*
6405 * If we are processing this transaction using a parallel apply worker
6406 * then either we send the changes to the parallel worker or if the worker
6407 * is busy then serialize the changes to the file which will later be
6408 * processed by the parallel worker.
6409 */
6410 *winfo = pa_find_worker(xid);
6411
6412 if (*winfo && (*winfo)->serialize_changes)
6413 {
6415 }
6416 else if (*winfo)
6417 {
6419 }
6420
6421 /*
6422 * If there is no parallel worker involved to process this transaction
6423 * then we either directly apply the change or serialize it to a file
6424 * which will later be applied when the transaction finish message is
6425 * processed.
6426 */
6427 else if (in_streamed_transaction)
6428 {
6430 }
6431 else
6432 {
6433 return TRANS_LEADER_APPLY;
6434 }
6435}
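
The decision made by get_transaction_apply_action can be summarized as a small pure function. What follows is a sketch under assumptions, not the server code: DemoApplyAction and demo_apply_action are hypothetical, the boolean parameters replace the worker-state lookups, and the mapping of the three elided branches to actions is inferred from the comments in the function.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the TransApplyAction values. */
typedef enum DemoApplyAction
{
	DEMO_LEADER_APPLY,
	DEMO_LEADER_SERIALIZE,
	DEMO_LEADER_SEND_TO_PARALLEL,
	DEMO_LEADER_PARTIAL_SERIALIZE,
	DEMO_PARALLEL_APPLY
} DemoApplyAction;

static DemoApplyAction
demo_apply_action(bool is_parallel_worker, bool has_parallel_worker,
				  bool worker_serializes, bool in_streamed_xact)
{
	/* A worker can only be serializing if one was found at all. */
	assert(has_parallel_worker || !worker_serializes);

	/* A parallel apply worker always applies the changes itself. */
	if (is_parallel_worker)
		return DEMO_PARALLEL_APPLY;

	/* Leader with a busy parallel worker: spill the changes to a file. */
	if (has_parallel_worker && worker_serializes)
		return DEMO_LEADER_PARTIAL_SERIALIZE;
	else if (has_parallel_worker)
		return DEMO_LEADER_SEND_TO_PARALLEL;

	/* No parallel worker: serialize streamed changes, else apply directly. */
	else if (in_streamed_xact)
		return DEMO_LEADER_SERIALIZE;

	return DEMO_LEADER_APPLY;
}
```

Writing the decision this way makes the priority order explicit: the parallel-worker checks come before the streamed-transaction check, so a transaction with an assigned parallel worker is never serialized by the leader alone.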
Definition worker.c:1024
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:648
static void finish_edata(ApplyExecutionData *edata)
Definition worker.c:935
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition worker.c:1131
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition worker.c:6310
ErrorContextCallback * apply_error_context_stack
Definition worker.c:474
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
Definition worker.c:2005
static void apply_handle_commit(StringInfo s)
Definition worker.c:1253
static bool IsIndexUsableForFindingDeletedTuple(Oid localindexoid, TransactionId conflict_detection_xmin)
Definition worker.c:3257
void stream_start_internal(TransactionId xid, bool first_segment)
Definition worker.c:1704
static List * on_commit_wakeup_workers_subids
Definition worker.c:487
static void apply_handle_stream_abort(StringInfo s)
Definition worker.c:2088
static void apply_handle_relation(StringInfo s)
Definition worker.c:2580
void set_apply_error_context_origin(char *originname)
Definition worker.c:6380
static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4577
MemoryContext ApplyContext
Definition worker.c:477
static void subxact_info_write(Oid subid, TransactionId xid)
Definition worker.c:5241
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition worker.c:2618
static void apply_handle_prepare(StringInfo s)
Definition worker.c:1348
static void apply_handle_rollback_prepared(StringInfo s)
Definition worker.c:1474
void SetupApplyOrSyncWorker(int worker_slot)
Definition worker.c:5963
static void apply_handle_stream_stop(StringInfo s)
Definition worker.c:1902
static void apply_handle_origin(StringInfo s)
Definition worker.c:1683
static void request_publisher_status(RetainDeadTuplesData *rdt_data)
Definition worker.c:4538
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition worker.c:4319
static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
Definition worker.c:4943
static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4445
static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4409
WalReceiverConn * LogRepWorkerWalRcvConn
Definition worker.c:482
static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition worker.c:4864
static XLogRecPtr remote_final_lsn
Definition worker.c:490
static bool MySubscriptionValid
Definition worker.c:485
void apply_error_callback(void *arg)
Definition worker.c:6238
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition worker.c:3961
static MemoryContext LogicalStreamingContext
Definition worker.c:480
void maybe_reread_subscription(void)
Definition worker.c:5061
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition worker.c:2520
void InitializeLogRepWorker(void)
Definition worker.c:5795
static void set_wal_receiver_timeout(void)
Definition worker.c:5191
static bool in_streamed_transaction
Definition worker.c:493
static void apply_handle_begin_prepare(StringInfo s)
Definition worker.c:1282
void ApplyWorkerMain(Datum main_arg)
Definition worker.c:6003
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition worker.c:2277
static void apply_handle_stream_start(StringInfo s)
Definition worker.c:1742
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition worker.c:6100
static void on_exit_clear_xact_state(int code, Datum arg)
Definition worker.c:5954
static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition worker.c:4826
Subscription * MySubscription
Definition worker.c:484
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition worker.c:1311
static void stream_close_file(void)
Definition worker.c:5509
static TransactionId stream_xid
Definition worker.c:495
static void apply_handle_insert(StringInfo s)
Definition worker.c:2650
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition worker.c:966
static bool FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
Definition worker.c:3291
static void subxact_info_read(Oid subid, TransactionId xid)
Definition worker.c:5290
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition worker.c:3128
static void reset_apply_error_context_info(void)
Definition worker.c:6318
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1765
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1789
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1613
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
@ STATE_IDLEINTRANSACTION
@ STATE_RUNNING
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:949
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:909
Bitmapset * bms_make_singleton(int x)
Definition bitmapset.c:216
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
static Datum values[MAXATTR]
Definition bootstrap.c:190
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition buffile.c:292
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition buffile.c:655
int BufFileSeek(BufFile *file, int fileno, pgoff_t offset, int whence)
Definition buffile.c:741
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition buffile.c:677
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition buffile.c:665
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition buffile.c:268
void BufFileTruncateFileSet(BufFile *file, int fileno, pgoff_t offset)
Definition buffile.c:928
void BufFileTell(BufFile *file, int *fileno, pgoff_t *offset)
Definition buffile.c:833
void BufFileClose(BufFile *file)
Definition buffile.c:413
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition buffile.c:365
#define Min(x, y)
Definition c.h:1091
#define likely(x)
Definition c.h:437
#define Assert(condition)
Definition c.h:943
int64_t int64
Definition c.h:621
uint64_t uint64
Definition c.h:625
uint32_t uint32
Definition c.h:624
#define pg_fallthrough
Definition c.h:161
uint32 TransactionId
Definition c.h:736
#define OidIsValid(objectId)
Definition c.h:858
size_t Size
Definition c.h:689
bool track_commit_timestamp
Definition commit_ts.c:121
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, ReplOriginId *localorigin, TimestampTz *localts)
Definition conflict.c:64
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition conflict.c:105
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition conflict.c:140
ConflictType
Definition conflict.h:32
@ CT_UPDATE_DELETED
Definition conflict.h:43
@ CT_DELETE_MISSING
Definition conflict.h:52
@ CT_UPDATE_ORIGIN_DIFFERS
Definition conflict.h:37
@ CT_UPDATE_MISSING
Definition conflict.h:46
@ CT_DELETE_ORIGIN_DIFFERS
Definition conflict.h:49
static DataChecksumsWorkerOperation operation
int64 TimestampTz
Definition timestamp.h:39
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
Datum arg
Definition elog.c:1323
void EmitErrorReport(void)
Definition elog.c:1883
ErrorContextCallback * error_context_stack
Definition elog.c:100
void FlushErrorState(void)
Definition elog.c:2063
int errcode(int sqlerrcode)
Definition elog.c:875
#define LOG
Definition elog.h:32
#define PG_RE_THROW()
Definition elog.h:407
int errdetail_internal(const char *fmt,...) pg_attribute_printf(1, 2)
#define errcontext
Definition elog.h:200
int errdetail(const char *fmt,...) pg_attribute_printf(1, 2)
int errmsg_internal(const char *fmt,...) pg_attribute_printf(1, 2)
#define PG_TRY(...)
Definition elog.h:374
#define WARNING
Definition elog.h:37
#define DEBUG2
Definition elog.h:30
#define PG_END_TRY(...)
Definition elog.h:399
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define PG_CATCH(...)
Definition elog.h:384
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223
void err(int eval, const char *fmt,...)
Definition err.c:43
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition execExpr.c:143
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition execMain.c:1885
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
Definition execMain.c:2747
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition execMain.c:1271
void EvalPlanQualEnd(EPQState *epqstate)
Definition execMain.c:3208
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition execUtils.c:1352
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
Definition execUtils.c:799
void FreeExecutorState(EState *estate)
Definition execUtils.c:197
EState * CreateExecutorState(void)
Definition execUtils.c:90
#define GetPerTupleExprContext(estate)
Definition executor.h:667
#define GetPerTupleMemoryContext(estate)
Definition executor.h:672
#define EvalPlanQualSetSlot(epqstate, slot)
Definition executor.h:290
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition executor.h:403
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
#define palloc_object(type)
Definition fe_memutils.h:89
#define repalloc_array(pointer, type, count)
Definition fe_memutils.h:94
#define palloc_array(type, count)
Definition fe_memutils.h:91
#define palloc0_object(type)
Definition fe_memutils.h:90
void FileSetInit(FileSet *fileset)
Definition fileset.c:52
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition fmgr.c:1773
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition fmgr.c:1755
struct Latch * MyLatch
Definition globals.c:65
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
bool parse_int(const char *value, int *result, int flags, const char **hintmsg)
Definition guc.c:2775
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition guc.c:4234
@ PGC_S_OVERRIDE
Definition guc.h:123
@ PGC_S_SESSION
Definition guc.h:126
@ PGC_SUSET
Definition guc.h:78
@ PGC_SIGHUP
Definition guc.h:75
@ PGC_BACKEND
Definition guc.h:77
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1118
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
static void * GETSTRUCT(const HeapTupleData *tuple)
static void dlist_delete(dlist_node *node)
Definition ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition ilist.h:336
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition ilist.h:364
#define DLIST_STATIC_INIT(name)
Definition ilist.h:281
#define dlist_container(type, membername, ptr)
Definition ilist.h:593
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:178
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:134
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
long val
Definition informix.c:689
#define write(a, b, c)
Definition win32.h:14
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void AcceptInvalidationMessages(void)
Definition inval.c:930
void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)
Definition inval.c:1813
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
void proc_exit(int code)
Definition ipc.c:105
int i
Definition isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
void ResetLatch(Latch *latch)
Definition latch.c:374
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:303
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition launcher.c:756
void logicalrep_worker_attach(int slot)
Definition launcher.c:767
void ApplyLauncherWakeup(void)
Definition launcher.c:1195
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition launcher.c:268
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:733
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:58
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1155
List * lappend(List *list, void *datum)
Definition list.c:339
List * lappend_oid(List *list, Oid datum)
Definition list.c:375
List * list_append_unique_oid(List *list, Oid datum)
Definition list.c:1380
bool list_member_oid(const List *list, Oid datum)
Definition list.c:722
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
int LOCKMODE
Definition lockdefs.h:26
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define AccessShareLock
Definition lockdefs.h:36
#define RowExclusiveLock
Definition lockdefs.h:38
@ LockTupleExclusive
Definition lockoptions.h:59
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
#define LOGICALREP_COLUMN_UNCHANGED
LogicalRepMsgType
@ LOGICAL_REP_MSG_INSERT
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_BEGIN
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_COMMIT
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_TYPE
@ LOGICAL_REP_MSG_DELETE
@ LOGICAL_REP_MSG_STREAM_COMMIT
@ LOGICAL_REP_MSG_ORIGIN
@ LOGICAL_REP_MSG_UPDATE
uint32 LogicalRepRelId
#define LOGICALREP_PROTO_VERSION_NUM
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
char * get_rel_name(Oid relid)
Definition lsyscache.c:2159
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition lsyscache.c:3107
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3599
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition lsyscache.c:3173
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1897
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:406
MemoryContext TopTransactionContext
Definition mcxt.c:172
char * pstrdup(const char *in)
Definition mcxt.c:1910
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition mcxt.c:689
void * repalloc(void *pointer, Size size)
Definition mcxt.c:1635
void pfree(void *pointer)
Definition mcxt.c:1619
MemoryContext TopMemoryContext
Definition mcxt.c:167
void * palloc(Size size)
Definition mcxt.c:1390
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:475
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define RESUME_INTERRUPTS()
Definition miscadmin.h:138
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
#define HOLD_INTERRUPTS()
Definition miscadmin.h:136
Oid GetUserId(void)
Definition miscinit.c:470
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:990
CmdType
Definition nodes.h:273
@ CMD_INSERT
Definition nodes.h:277
@ CMD_DELETE
Definition nodes.h:278
@ CMD_UPDATE
Definition nodes.h:276
#define makeNode(_type_)
Definition nodes.h:161
static char * errmsg
ObjectType get_relkind_objtype(char relkind)
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:274
ReplOriginXactState replorigin_xact_state
Definition origin.c:168
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:243
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1353
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1377
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1156
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
#define ACL_DELETE
Definition parsenodes.h:79
uint64 AclMode
Definition parsenodes.h:74
#define ACL_INSERT
Definition parsenodes.h:76
#define ACL_UPDATE
Definition parsenodes.h:78
@ RTE_RELATION
@ DROP_RESTRICT
#define ACL_SELECT
Definition parsenodes.h:77
#define ACL_TRUNCATE
Definition parsenodes.h:80
int16 attnum
FormData_pg_attribute * Form_pg_attribute
static uint32 pg_ceil_log2_32(uint32 num)
static PgChecksumMode mode
#define NAMEDATALEN
#define MAXPGPATH
const void size_t len
static int server_version
Definition pg_dumpall.c:122
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
#define lfirst(lc)
Definition pg_list.h:172
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:244
static void * list_nth(const List *list, int n)
Definition pg_list.h:331
#define lfirst_oid(lc)
Definition pg_list.h:174
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
void DisableSubscription(Oid subid)
void UpdateDeadTupleRetentionStatus(Oid subid, bool active)
FormData_pg_subscription * Form_pg_subscription
static char buf[DEFAULT_XLOG_SEG_SIZE]
long pgstat_report_stat(bool force)
Definition pgstat.c:722
void pgstat_report_subscription_error(Oid subid)
int64 timestamp
Expr * expression_planner(Expr *expr)
Definition planner.c:7081
#define pqsignal
Definition port.h:548
int pgsocket
Definition port.h:29
#define snprintf
Definition port.h:261
#define PGINVALID_SOCKET
Definition port.h:31
off_t pgoff_t
Definition port.h:422
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static int32 DatumGetInt32(Datum X)
Definition postgres.h:202
#define InvalidOid
unsigned int Oid
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition pqformat.c:414
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition procarray.c:2845
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition proto.c:98
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition proto.c:561
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition proto.c:325
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition proto.c:134
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition proto.c:757
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition proto.c:487
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition proto.c:615
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition proto.c:1187
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition proto.c:63
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition proto.c:267
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition proto.c:698
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition proto.c:1212
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition proto.c:365
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition proto.c:1132
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition proto.c:428
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition proto.c:228
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition proto.c:1082
#define PqReplMsg_WALData
Definition protocol.h:77
#define PqReplMsg_PrimaryStatusRequest
Definition protocol.h:83
#define PqReplMsg_Keepalive
Definition protocol.h:75
#define PqReplMsg_PrimaryStatusUpdate
Definition protocol.h:76
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84
static color newsub(struct colormap *cm, color co)
Definition regc_color.c:390
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationIsLogicallyLogged(relation)
Definition rel.h:721
#define RelationGetDescr(relation)
Definition rel.h:542
#define RelationGetRelationName(relation)
Definition rel.h:550
#define RELATION_IS_OTHER_TEMP(relation)
Definition rel.h:678
#define RelationGetNamespace(relation)
Definition rel.h:557
List * RelationGetIndexList(Relation relation)
Definition relcache.c:4837
ResourceOwner TopTransactionResourceOwner
Definition resowner.c:175
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
Node * build_column_default(Relation rel, int attrno)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition rls.c:52
@ RLS_ENABLED
Definition rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition relation.c:585
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition relation.c:647
bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
Definition relation.c:835
Oid GetRelationIdentityOrPK(Relation rel)
Definition relation.c:905
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition relation.c:165
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition relation.c:518
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition relation.c:362
StringInfo makeStringInfo(void)
Definition stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition stringinfo.h:157
TransactionId remote_xid
Definition worker.c:335
LogicalRepMsgType command
Definition worker.c:330
XLogRecPtr finish_lsn
Definition worker.c:336
LogicalRepRelMapEntry * rel
Definition worker.c:331
ResultRelInfo * targetRelInfo
Definition worker.c:320
EState * estate
Definition worker.c:317
PartitionTupleRouting * proute
Definition worker.c:324
ModifyTableState * mtstate
Definition worker.c:323
LogicalRepRelMapEntry * targetRel
Definition worker.c:319
uint32 nsubxacts
Definition worker.c:544
uint32 nsubxacts_max
Definition worker.c:545
SubXactInfo * subxacts
Definition worker.c:547
TransactionId subxact_last
Definition worker.c:546
int maplen
Definition attmap.h:37
AttrNumber * attnums
Definition attmap.h:36
bool attisdropped
Definition tupdesc.h:78
List * es_rteperminfos
Definition execnodes.h:704
List * es_tupleTable
Definition execnodes.h:748
List * es_opened_result_relations
Definition execnodes.h:724
CommandId es_output_cid
Definition execnodes.h:718
struct ErrorContextCallback * previous
Definition elog.h:299
void(* callback)(void *arg)
Definition elog.h:300
dlist_node node
Definition worker.c:308
XLogRecPtr remote_end
Definition worker.c:310
XLogRecPtr local_end
Definition worker.c:309
List
Definition pg_list.h:54
LogicalRepRelation remoterel
TimestampTz last_recv_time
LogicalRepWorkerType type
TimestampTz reply_time