PostgreSQL Source Code git master
worker.c
1/*-------------------------------------------------------------------------
2 * worker.c
3 * PostgreSQL logical replication worker (apply)
4 *
5 * Copyright (c) 2016-2026, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/worker.c
9 *
10 * NOTES
11 * This file contains the worker which applies logical changes as they come
12 * from remote logical replication stream.
13 *
14 * The main worker (apply) is started by logical replication worker
15 * launcher for every enabled subscription in a database. It uses
16 * walsender protocol to communicate with publisher.
17 *
18 * This module includes server facing code and shares libpqwalreceiver
19 * module with walreceiver for providing the libpq specific functionality.
20 *
21 *
22 * STREAMED TRANSACTIONS
23 * ---------------------
24 * Streamed transactions (large transactions exceeding a memory limit on the
25 * upstream) are applied using one of two approaches:
26 *
27 * 1) Write to temporary files and apply when the final commit arrives
28 *
29 * This approach is used when the user has set the subscription's streaming
30 * option as on.
31 *
32 * Unlike the regular (non-streamed) case, streamed transactions must also
33 * handle aborts of both the toplevel transaction and subtransactions. This
34 * is achieved by tracking offsets for subtransactions, which are then used
35 * to truncate the file with serialized changes.
36 *
37 * The files are placed in the temporary file directory by default, and the
38 * filenames include both the XID of the toplevel transaction and the OID of
39 * the subscription. This is necessary so that different workers processing a
40 * remote transaction with the same XID don't interfere with each other.
41 *
42 * We use BufFiles instead of normal temporary files because the BufFile
43 * infrastructure (a) supports temporary files that exceed the OS file size
44 * limit, (b) provides automatic cleanup on error, and (c) lets these files
45 * survive across local transactions so that they can be opened and closed at
46 * stream start and stop. We decided to use the FileSet infrastructure
47 * because without it the files are deleted when they are closed, and if we
48 * instead kept stream files open across the start/stop stream then they
49 * would consume a lot of memory (more than 8K for each BufFile, and
50 * there could be multiple such BufFiles as the subscriber could receive
51 * multiple start/stop streams for different transactions before getting the
52 * commit). Moreover, if we didn't use FileSet then we would also need to
53 * invent a new way to pass filenames to the BufFile APIs so that we could
54 * open the desired file across multiple stream-open calls for the same
55 * transaction.
56 *
57 * 2) Parallel apply workers.
58 *
59 * This approach is used when the user has set the subscription's streaming
60 * option as parallel. See logical/applyparallelworker.c for information about
61 * this approach.
62 *
63 * TWO_PHASE TRANSACTIONS
64 * ----------------------
65 * Two phase transactions are replayed at prepare and then committed or
66 * rolled back at commit prepared and rollback prepared respectively. It is
67 * possible to have a prepared transaction that arrives at the apply worker
68 * when the tablesync is busy doing the initial copy. In this case, the apply
69 * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 * is still busy (see the condition of should_apply_changes_for_rel). The
71 * tablesync worker might not get such a prepared transaction because say it
72 * was prior to the initial consistent point but might have got some later
73 * commits. Now, the tablesync worker will exit without doing anything for the
74 * prepared transaction skipped by the apply worker as the sync location for it
75 * will be already ahead of the apply worker's current location. This would lead
76 * to an "empty prepare", because later when the apply worker does the commit
77 * prepare, there is nothing in it (the inserts were skipped earlier).
78 *
79 * To avoid this and similar prepare confusions, the subscription's two_phase
80 * commit is enabled only after the initial sync is over. The two_phase option
81 * has been implemented as a tri-state with values DISABLED, PENDING, and
82 * ENABLED.
83 *
84 * Even if the user specifies they want a subscription with two_phase = on,
85 * internally it will start with a tri-state of PENDING which only becomes
86 * ENABLED after all tablesync initializations are completed - i.e. when all
87 * tablesync workers have reached their READY state. In other words, the value
88 * PENDING is only a temporary state for subscription start-up.
89 *
90 * Until the two_phase is properly available (ENABLED) the subscription will
91 * behave as if two_phase = off. When the apply worker detects that all
92 * tablesyncs have become READY (while the tri-state was PENDING) it will
93 * restart the apply worker process. This happens in
94 * ProcessSyncingTablesForApply.
95 *
96 * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 * two_phase tri-state of PENDING, it starts streaming messages with the
98 * two_phase option, which in turn enables the decoding of two-phase commits at
99 * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 * Now, it is possible that during the time we have not enabled two_phase, the
101 * publisher (replication server) would have skipped some prepares but we
102 * ensure that such prepares are sent along with commit prepare, see
103 * ReorderBufferFinishPrepared.
104 *
105 * If the subscription has no tables then a two_phase tri-state PENDING is
106 * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 * PUBLICATION which might otherwise be disallowed (see below).
108 *
109 * If ever a user needs to be aware of the tri-state value, they can fetch it
110 * from the pg_subscription catalog (see column subtwophasestate).
111 *
112 * Finally, to avoid problems mentioned in previous paragraphs from any
113 * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
114 * to 'off' and then again back to 'on') there is a restriction for
115 * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 * the two_phase tri-state is ENABLED, except when copy_data = false.
117 *
118 * We can get a prepare of the same GID more than once in genuine cases
119 * where we have defined multiple subscriptions for publications on the same
120 * server and the prepared transaction has operations on tables subscribed to
121 * by those subscriptions. In such cases, if we used the GID sent by the
122 * publisher, one of the prepares would succeed and the others would fail, in
123 * which case the server would send them again. This can lead to a deadlock if
124 * the user has set synchronous_standby_names for all the subscriptions on the
125 * subscriber. To avoid such deadlocks, we generate a unique GID (consisting
126 * of the subscription oid and the xid of the prepared transaction) for each
127 * prepared transaction on the subscriber.
128 *
129 * FAILOVER
130 * ----------------------
131 * The logical slot on the primary can be synced to the standby by specifying
132 * failover = true when creating the subscription. Enabling failover allows us
133 * to smoothly transition to the promoted standby, ensuring that we can
134 * subscribe to the new primary without losing any data.
135 *
136 * RETAIN DEAD TUPLES
137 * ----------------------
138 * Each apply worker that has the retain_dead_tuples option enabled maintains a
139 * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
140 * prevent dead rows from being removed prematurely when the apply worker still
141 * needs them to detect update_deleted conflicts. Additionally, this helps to
142 * retain the required commit_ts module information, which further helps to
143 * detect update_origin_differs and delete_origin_differs conflicts reliably, as
144 * otherwise, vacuum freeze could remove the required information.
145 *
146 * The logical replication launcher manages an internal replication slot named
147 * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 * transaction ID from all apply workers to determine the appropriate xmin for
149 * the slot, thereby retaining necessary tuples.
150 *
151 * The non-removable transaction ID in the apply worker is advanced to the
152 * oldest running transaction ID once all concurrent transactions on the
153 * publisher have been applied and flushed locally. The process involves:
154 *
155 * - RDT_GET_CANDIDATE_XID:
156 * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 * candidate xid.
158 *
159 * - RDT_REQUEST_PUBLISHER_STATUS:
160 * Send a message to the walsender requesting the publisher status, which
161 * includes the latest WAL write position and information about transactions
162 * that are in the commit phase.
163 *
164 * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 * Wait for the status from the walsender. After receiving the first status,
166 * do not proceed if there are concurrent remote transactions that are still
167 * in the commit phase. These transactions might have been assigned an
168 * earlier commit timestamp but have not yet written the commit WAL record.
169 * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 * until all these transactions have completed.
171 *
172 * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 * Advance the non-removable transaction ID if the current flush location has
174 * reached or surpassed the last received WAL position.
175 *
176 * - RDT_STOP_CONFLICT_INFO_RETENTION:
177 * This phase is required only when max_retention_duration is defined. We
178 * enter this phase if the wait time in either the
179 * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
180 * configured max_retention_duration. In this phase,
181 * pg_subscription.subretentionactive is updated to false within a new
182 * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
183 *
184 * - RDT_RESUME_CONFLICT_INFO_RETENTION:
185 * This phase is required only when max_retention_duration is defined. We
186 * enter this phase if the retention was previously stopped, and the time
187 * required to advance the non-removable transaction ID in the
188 * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
189 * (or if max_retention_duration is set to 0). During this phase,
190 * pg_subscription.subretentionactive is updated to true within a new
191 * transaction, and the worker will be restarted.
192 *
193 * The overall state progression is: GET_CANDIDATE_XID ->
194 * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
195 * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
196 * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
197 *
198 * Retaining the dead tuples for this period is sufficient for ensuring
199 * eventual consistency using last-update-wins strategy, as dead tuples are
200 * useful for detecting conflicts only during the application of concurrent
201 * transactions from remote nodes. After applying and flushing all remote
202 * transactions that occurred concurrently with the tuple DELETE, any
203 * subsequent UPDATE from a remote node should have a later timestamp. In such
204 * cases, it is acceptable to detect an update_missing scenario and convert the
205 * UPDATE to an INSERT when applying it. But, for concurrent remote
206 * transactions with earlier timestamps than the DELETE, detecting
207 * update_deleted is necessary, as the UPDATEs in remote transactions should be
208 * ignored if their timestamp is earlier than that of the dead tuples.
209 *
210 * Note that advancing the non-removable transaction ID is not supported if the
211 * publisher is also a physical standby. This is because the logical walsender
212 * on the standby can only get the WAL replay position but there may be more
213 * WALs that are being replicated from the primary and those WALs could have
214 * earlier commit timestamp.
215 *
216 * Similarly, when the publisher has subscribed to another publisher,
217 * information necessary for conflict detection cannot be retained for
218 * changes from origins other than the publisher. This is because publisher
219 * lacks the information on concurrent transactions of other publishers to
220 * which it subscribes. As the information on concurrent transactions is
221 * unavailable beyond subscriber's immediate publishers, the non-removable
222 * transaction ID might be advanced prematurely before changes from other
223 * origins have been fully applied.
224 *
225 * XXX Retaining information for changes from other origins might be possible
226 * by requesting the subscription on that origin to enable retain_dead_tuples
227 * and fetching the conflict detection slot.xmin along with the publisher's
228 * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
229 * wait for the remote slot's xmin to reach the oldest active transaction ID,
230 * ensuring that all transactions from other origins have been applied on the
231 * publisher, thereby getting the latest WAL position that includes all
232 * concurrent changes. However, this approach may impact performance, so it
233 * might not be worth the effort.
234 *
235 * XXX It seems feasible to get the latest commit's WAL location from the
236 * publisher and wait till that is applied. However, we can't do that
237 * because commit timestamps can regress as a commit with a later LSN is not
238 * guaranteed to have a later timestamp than those with earlier LSNs. Having
239 * said that, even if that were possible, it wouldn't improve performance much,
240 * as the apply always lags and moves slowly compared with the transactions
241 * on the publisher.
242 *-------------------------------------------------------------------------
243 */
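The phase progression described in the RETAIN DEAD TUPLES notes above can be sketched as a small transition function. This is only an illustrative sketch, not the code in this file: the `Sketch*` type, the `sketch_next_phase` function, and its two boolean inputs are hypothetical names introduced here, and the optional stop/resume retention phases (tied to max_retention_duration) are deliberately left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the four core phases named in the notes above. */
typedef enum
{
	SKETCH_GET_CANDIDATE_XID,
	SKETCH_REQUEST_PUBLISHER_STATUS,
	SKETCH_WAIT_FOR_PUBLISHER_STATUS,
	SKETCH_WAIT_FOR_LOCAL_FLUSH
} SketchRdtPhase;

/*
 * Return the phase that follows 'phase'.
 *
 * remote_commits_pending: the publisher reported transactions that are still
 * in the commit phase.
 * local_flush_caught_up: the local flush location has reached or passed the
 * last received WAL position.
 */
static SketchRdtPhase
sketch_next_phase(SketchRdtPhase phase,
				  bool remote_commits_pending,
				  bool local_flush_caught_up)
{
	switch (phase)
	{
		case SKETCH_GET_CANDIDATE_XID:
			return SKETCH_REQUEST_PUBLISHER_STATUS;

		case SKETCH_REQUEST_PUBLISHER_STATUS:
			return SKETCH_WAIT_FOR_PUBLISHER_STATUS;

		case SKETCH_WAIT_FOR_PUBLISHER_STATUS:
			/* Keep asking until no remote transaction is mid-commit. */
			return remote_commits_pending ?
				SKETCH_REQUEST_PUBLISHER_STATUS :
				SKETCH_WAIT_FOR_LOCAL_FLUSH;

		case SKETCH_WAIT_FOR_LOCAL_FLUSH:
			/* Once flushed, the xid is advanced and a new round starts. */
			return local_flush_caught_up ?
				SKETCH_GET_CANDIDATE_XID :
				SKETCH_WAIT_FOR_LOCAL_FLUSH;
	}

	assert(false && "unknown phase");
	return phase;
}
```

The sketch only models which phase comes next; the real worker also sends and receives messages and updates shared state at each step.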
244
245#include "postgres.h"
246
247#include <sys/stat.h>
248#include <unistd.h>
249
250#include "access/commit_ts.h"
251#include "access/table.h"
252#include "access/tableam.h"
253#include "access/twophase.h"
254#include "access/xact.h"
255#include "catalog/indexing.h"
256#include "catalog/pg_inherits.h"
260#include "commands/tablecmds.h"
261#include "commands/trigger.h"
262#include "executor/executor.h"
264#include "libpq/pqformat.h"
265#include "miscadmin.h"
266#include "optimizer/optimizer.h"
268#include "pgstat.h"
269#include "postmaster/bgworker.h"
270#include "postmaster/interrupt.h"
271#include "postmaster/walwriter.h"
272#include "replication/conflict.h"
277#include "replication/origin.h"
278#include "replication/slot.h"
282#include "storage/buffile.h"
283#include "storage/ipc.h"
284#include "storage/latch.h"
285#include "storage/lmgr.h"
286#include "storage/procarray.h"
287#include "tcop/tcopprot.h"
288#include "utils/acl.h"
289#include "utils/guc.h"
290#include "utils/inval.h"
291#include "utils/lsyscache.h"
292#include "utils/memutils.h"
293#include "utils/pg_lsn.h"
294#include "utils/rel.h"
295#include "utils/rls.h"
296#include "utils/snapmgr.h"
297#include "utils/syscache.h"
298#include "utils/usercontext.h"
299#include "utils/wait_event.h"
300
301#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
302
309
311
312typedef struct ApplyExecutionData
313{
314 EState *estate; /* executor state, used to track resources */
315
316 LogicalRepRelMapEntry *targetRel; /* replication target rel */
317 ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
318
319 /* These fields are used when the target relation is partitioned: */
320 ModifyTableState *mtstate; /* dummy ModifyTable state */
321 PartitionTupleRouting *proute; /* partition routing info */
323
324/* Struct for saving and restoring apply errcontext information */
326{
327 LogicalRepMsgType command; /* 0 if invalid */
329
330 /* Remote node information */
331 int remote_attnum; /* -1 if invalid */
336
337/*
338 * The action to be taken for the changes in the transaction.
339 *
340 * TRANS_LEADER_APPLY:
341 * This action means that we are in the leader apply worker or table sync
342 * worker. The changes of the transaction are either directly applied or
343 * are read from temporary files (for streaming transactions) and then
344 * applied by the worker.
345 *
346 * TRANS_LEADER_SERIALIZE:
347 * This action means that we are in the leader apply worker or table sync
348 * worker. Changes are written to temporary files and then applied when the
349 * final commit arrives.
350 *
351 * TRANS_LEADER_SEND_TO_PARALLEL:
352 * This action means that we are in the leader apply worker and need to send
353 * the changes to the parallel apply worker.
354 *
355 * TRANS_LEADER_PARTIAL_SERIALIZE:
356 * This action means that we are in the leader apply worker and have sent some
357 * changes directly to the parallel apply worker and the remaining changes are
358 * serialized to a file, due to timeout while sending data. The parallel apply
359 * worker will apply these serialized changes when the final commit arrives.
360 *
361 * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
362 * serializing changes, the leader worker also needs to serialize the
363 * STREAM_XXX message to a file, and wait for the parallel apply worker to
364 * finish the transaction when processing the transaction finish command. So
365 * this new action was introduced to keep the code and logic clear.
366 *
367 * TRANS_PARALLEL_APPLY:
368 * This action means that we are in the parallel apply worker and changes of
369 * the transaction are applied directly by the worker.
370 */
371typedef enum
372{
373 /* The action for non-streaming transactions. */
375
376 /* Actions for streaming transactions. */
382
383/*
384 * The phases involved in advancing the non-removable transaction ID.
385 *
386 * See comments atop worker.c for details of the transition between these
387 * phases.
388 */
398
399/*
400 * Critical information for managing phase transitions within the
401 * RetainDeadTuplesPhase.
402 */
404{
405 RetainDeadTuplesPhase phase; /* current phase */
406 XLogRecPtr remote_lsn; /* WAL write position on the publisher */
407
408 /*
409 * Oldest transaction ID that was in the commit phase on the publisher.
410 * Use FullTransactionId to prevent issues with transaction ID wraparound,
411 * where a new remote_oldestxid could falsely appear to originate from the
412 * past and block advancement.
413 */
415
416 /*
417 * Next transaction ID to be assigned on the publisher. Use
418 * FullTransactionId for consistency and to allow straightforward
419 * comparisons with remote_oldestxid.
420 */
422
423 TimestampTz reply_time; /* when the publisher responds with status */
424
425 /*
426 * Publisher transaction ID that must be awaited to complete before
427 * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
428 * FullTransactionId for the same reason as remote_nextxid.
429 */
431
432 TransactionId candidate_xid; /* candidate for the non-removable
433 * transaction ID */
434 TimestampTz flushpos_update_time; /* when the remote flush position was
435 * updated in final phase
436 * (RDT_WAIT_FOR_LOCAL_FLUSH) */
437
438 long table_sync_wait_time; /* time spent waiting for table sync
439 * to finish */
440
441 /*
442 * The following fields are used to determine the timing for the next
443 * round of transaction ID advancement.
444 */
445 TimestampTz last_recv_time; /* when the last message was received */
446 TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
447 int xid_advance_interval; /* how much time (ms) to wait before
448 * attempting to advance the
449 * non-removable transaction ID */
451
452/*
453 * The minimum (100ms) and maximum (3 minutes) intervals for advancing
454 * non-removable transaction IDs. The maximum interval is a bit arbitrary but
455 * is sufficient to not cause any undue network traffic.
456 */
457#define MIN_XID_ADVANCE_INTERVAL 100
458#define MAX_XID_ADVANCE_INTERVAL 180000
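To illustrate how an interval bounded by these two constants might evolve, here is a minimal sketch. The comment above only states the bounds; the doubling-on-idle and reset-on-progress policy shown here is an assumption made for illustration, and `sketch_next_interval` and the `SKETCH_*` macros are hypothetical names, not part of this file.

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_MIN_XID_ADVANCE_INTERVAL 100		/* 100 ms */
#define SKETCH_MAX_XID_ADVANCE_INTERVAL 180000	/* 3 minutes */

/*
 * Compute the next wait interval in milliseconds.
 *
 * ASSUMED policy: fall back to the minimum when a new candidate xid was
 * found, otherwise double the current interval, clamped to [min, max].
 */
static int
sketch_next_interval(int current_ms, bool new_xid_found)
{
	int			next_ms;

	assert(current_ms >= 0);

	if (new_xid_found)
		return SKETCH_MIN_XID_ADVANCE_INTERVAL;

	/* Avoid doubling past the cap (and any chance of integer overflow). */
	if (current_ms >= SKETCH_MAX_XID_ADVANCE_INTERVAL / 2)
		return SKETCH_MAX_XID_ADVANCE_INTERVAL;

	next_ms = current_ms * 2;
	if (next_ms < SKETCH_MIN_XID_ADVANCE_INTERVAL)
		next_ms = SKETCH_MIN_XID_ADVANCE_INTERVAL;

	return next_ms;
}
```

With this policy an idle system backs off from 100 ms toward the 3 minute cap, while any progress resets the interval to the minimum.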
459
460/* errcontext tracker */
462{
463 .command = 0,
464 .rel = NULL,
465 .remote_attnum = -1,
466 .remote_xid = InvalidTransactionId,
467 .finish_lsn = InvalidXLogRecPtr,
468 .origin_name = NULL,
469};
470
472
475
476/* per stream context for streaming transactions */
478
480
482static bool MySubscriptionValid = false;
483
485
488
489/* fields valid only when processing streamed transaction */
490static bool in_streamed_transaction = false;
491
493
494/*
495 * The number of changes applied by parallel apply worker during one streaming
496 * block.
497 */
499
500/* Are we initializing an apply worker? */
502
503/*
504 * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
505 * the subscription if the remote transaction's finish LSN matches the subskiplsn.
506 * Once we start skipping changes, we don't stop it until we skip all changes of
507 * the transaction even if pg_subscription is updated and MySubscription->skiplsn
508 * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
509 * we don't skip receiving and spooling the changes since we decide whether or not
510 * to skip applying the changes when starting to apply changes. The subskiplsn is
511 * cleared after successfully skipping the transaction or applying a non-empty
512 * transaction. The latter prevents a mistakenly specified subskiplsn from
513 * being left behind. Note that we cannot skip the streaming transactions when using
514 * parallel apply workers because we cannot get the finish LSN before applying
515 * the changes. So, we don't start parallel apply worker when finish LSN is set
516 * by the user.
517 */
519#define is_skipping_changes() (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))
520
521/* BufFile handle of the current streaming file */
523
524/*
525 * The remote WAL position that has been applied and flushed locally. We record
526 * and use this information both while sending feedback to the server and
527 * advancing oldest_nonremovable_xid.
528 */
530
531typedef struct SubXactInfo
532{
533 TransactionId xid; /* XID of the subxact */
534 int fileno; /* file number in the buffile */
535 pgoff_t offset; /* offset in the file */
537
538/* Sub-transaction data for the current streaming transaction */
539typedef struct ApplySubXactData
540{
541 uint32 nsubxacts; /* number of sub-transactions */
542 uint32 nsubxacts_max; /* current capacity of subxacts */
543 TransactionId subxact_last; /* xid of the last sub-transaction */
544 SubXactInfo *subxacts; /* sub-xact offset in changes file */
546
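The header notes say that subtransaction aborts are handled by tracking per-subtransaction offsets and truncating the changes file. A minimal sketch of that lookup follows, assuming an array of (xid, fileno, offset) entries like the one above. `SketchSubXact` and `sketch_abort_subxact` are hypothetical names, and the actual file truncation is left to the caller.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for one tracked subtransaction. */
typedef struct SketchSubXact
{
	uint32_t	xid;		/* XID of the subxact */
	int			fileno;		/* file number where its first change starts */
	int64_t		offset;		/* offset of its first change in that file */
} SketchSubXact;

/*
 * Find the aborted subxact, report where the changes file should be
 * truncated, and forget that subxact and every later one.
 *
 * Returns false if the subxact was never recorded, in which case nothing
 * needs to be truncated.
 */
static bool
sketch_abort_subxact(SketchSubXact *subxacts, uint32_t *nsubxacts,
					 uint32_t subxid, int *trunc_fileno, int64_t *trunc_offset)
{
	assert(nsubxacts != NULL && (subxacts != NULL || *nsubxacts == 0));

	/* Search from the end, since recent subxacts are the likely targets. */
	for (uint32_t i = *nsubxacts; i > 0; i--)
	{
		if (subxacts[i - 1].xid == subxid)
		{
			*trunc_fileno = subxacts[i - 1].fileno;
			*trunc_offset = subxacts[i - 1].offset;
			*nsubxacts = i - 1; /* drop this subxact and all later ones */
			return true;
		}
	}

	return false;
}
```

Because changes are appended in arrival order, everything written at or after the aborted subtransaction's first offset belongs to it or to later subtransactions, which is why a single truncation point is sufficient in this sketch.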
548
549static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
550static inline void changes_filename(char *path, Oid subid, TransactionId xid);
551
552/*
553 * Information about subtransactions of a given toplevel transaction.
554 */
555static void subxact_info_write(Oid subid, TransactionId xid);
556static void subxact_info_read(Oid subid, TransactionId xid);
557static void subxact_info_add(TransactionId xid);
558static inline void cleanup_subxact_info(void);
559
560/*
561 * Serialize and deserialize changes for a toplevel transaction.
562 */
563static void stream_open_file(Oid subid, TransactionId xid,
564 bool first_segment);
565static void stream_write_change(char action, StringInfo s);
566static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
567static void stream_close_file(void);
568
569static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
570
572 bool status_received);
575 bool status_received);
579 bool status_received);
584static bool update_retention_status(bool active);
587 bool new_xid_found);
588
589static void apply_worker_exit(void);
590
599 Oid localindexoid);
603 Oid localindexoid);
605 LogicalRepRelation *remoterel,
609static bool FindDeletedTupleInLocalRel(Relation localrel,
618 CmdType operation);
619
620/* Functions for skipping changes */
621static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
622static void stop_skipping_changes(void);
623static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
624
625/* Functions for apply error callback */
626static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
627static inline void reset_apply_error_context_info(void);
628
631
632static void set_wal_receiver_timeout(void);
633
634static void on_exit_clear_xact_state(int code, Datum arg);
635
636/*
637 * Form the origin name for the subscription.
638 *
639 * This is a common function for tablesync and other workers. Tablesync workers
640 * must pass a valid relid. Other callers must pass relid = InvalidOid.
641 *
642 * Return the name in the supplied buffer.
643 */
644void
647{
648 if (OidIsValid(relid))
649 {
650 /* Replication origin name for tablesync workers. */
651 snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
652 }
653 else
654 {
655 /* Replication origin name for non-tablesync workers. */
657 }
658}
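The naming rule above can be exercised in isolation with a small standalone sketch. The tablesync format `"pg_%u_%u"` is taken from the listing; the non-tablesync format `"pg_%u"` is an assumption, since that line is not visible in this listing. `SketchOid`, `SKETCH_INVALID_OID`, and `sketch_origin_name` are hypothetical stand-ins for the server's own types and function.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef unsigned int SketchOid;		/* hypothetical stand-in for Oid */
#define SKETCH_INVALID_OID ((SketchOid) 0)

/*
 * Write the origin name for a subscription into the supplied buffer.
 * Tablesync callers pass a valid relid; other callers pass
 * SKETCH_INVALID_OID.
 */
static void
sketch_origin_name(SketchOid suboid, SketchOid relid,
				   char *originname, size_t szoriginname)
{
	assert(originname != NULL && szoriginname > 0);

	if (relid != SKETCH_INVALID_OID)
	{
		/* Format shown in the listing for tablesync workers. */
		snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
	}
	else
	{
		/* ASSUMED format for non-tablesync workers. */
		snprintf(originname, szoriginname, "pg_%u", suboid);
	}
}
```

Passing the buffer size lets `snprintf` truncate rather than overflow if a caller supplies a buffer that is too small.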
659
660/*
661 * Should this worker apply changes for the given relation?
662 *
663 * This is mainly needed for the initial relation data sync, as that runs in a
664 * separate worker process in parallel and we need some way to skip
665 * changes coming to the leader apply worker during the sync of a table.
666 *
667 * Note that we need a less-than-or-equal comparison for the SYNCDONE state
668 * because it might hold the position of the end of the initial slot
669 * consistent point WAL record + 1 (i.e. the start of the next record), and the
670 * next record can be the COMMIT of the transaction we are now processing
671 * (which is what we set remote_final_lsn to in apply_handle_begin).
672 *
673 * Note that for streaming transactions that are being applied in the parallel
674 * apply worker, we disallow applying changes if the target table in the
675 * subscription is not in the READY state, because we cannot decide whether to
676 * apply the change as we won't know remote_final_lsn by that time.
677 *
678 * We already checked this in pa_can_start() before assigning the
679 * streaming transaction to the parallel worker, but it also needs to be
680 * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
681 * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
682 * while applying this transaction.
683 */
684static bool
686{
687 switch (MyLogicalRepWorker->type)
688 {
690 return MyLogicalRepWorker->relid == rel->localreloid;
691
693 /* We don't synchronize rel's that are in unknown state. */
694 if (rel->state != SUBREL_STATE_READY &&
698 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
700 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
701
702 return rel->state == SUBREL_STATE_READY;
703
704 case WORKERTYPE_APPLY:
705 return (rel->state == SUBREL_STATE_READY ||
706 (rel->state == SUBREL_STATE_SYNCDONE &&
707 rel->statelsn <= remote_final_lsn));
708
710 /* Should never happen. */
711 elog(ERROR, "sequence synchronization worker is not expected to apply changes");
712 break;
713
715 /* Should never happen. */
716 elog(ERROR, "Unknown worker type");
717 }
718
719 return false; /* dummy for compiler */
720}
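The per-worker-type decision above can be summarized in a simplified, standalone sketch. The enum and function names below are hypothetical, LSNs are modelled as plain 64-bit integers, and the error reporting done by the real function (for example, when a parallel apply worker meets a table that is not READY) is reduced to returning false.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, reduced versions of the worker types and relation states. */
typedef enum
{
	SKETCH_WORKER_TABLESYNC,
	SKETCH_WORKER_PARALLEL_APPLY,
	SKETCH_WORKER_APPLY
} SketchWorkerType;

typedef enum
{
	SKETCH_REL_INIT,
	SKETCH_REL_SYNCDONE,
	SKETCH_REL_READY
} SketchRelState;

static bool
sketch_should_apply(SketchWorkerType type,
					uint32_t worker_relid, uint32_t rel_oid,
					SketchRelState state,
					uint64_t statelsn, uint64_t remote_final_lsn)
{
	switch (type)
	{
		case SKETCH_WORKER_TABLESYNC:
			/* A tablesync worker only touches the table it is syncing. */
			return worker_relid == rel_oid;

		case SKETCH_WORKER_PARALLEL_APPLY:
			/* remote_final_lsn is unknown here, so only READY is safe. */
			return state == SKETCH_REL_READY;

		case SKETCH_WORKER_APPLY:
			/* SYNCDONE tables apply once the transaction ends past statelsn. */
			return state == SKETCH_REL_READY ||
				(state == SKETCH_REL_SYNCDONE &&
				 statelsn <= remote_final_lsn);
	}

	assert(false && "unknown worker type");
	return false;
}
```

The `<=` in the SYNCDONE branch matters: with a strict `<`, a transaction whose COMMIT record starts exactly at `statelsn` would be skipped even though the sync did not include it.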
721
722/*
723 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
724 *
725 * Start a transaction, if this is the first step (else we keep using the
726 * existing transaction).
727 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
728 */
729static void
744
745/*
746 * Finish up one step of a replication transaction.
747 * Callers of begin_replication_step() must also call this.
748 *
749 * We don't close out the transaction here, but we should increment
750 * the command counter to make the effects of this step visible.
751 */
752static void
759
760/*
761 * Handle streamed transactions for both the leader apply worker and the
762 * parallel apply workers.
763 *
764 * In the streaming case (receiving a block of the streamed transaction), for
765 * serialize mode, simply redirect it to a file for the proper toplevel
766 * transaction, and for parallel mode, the leader apply worker will send the
767 * changes to parallel apply workers and the parallel apply worker will define
768 * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
769 * messages will be applied by both leader apply worker and parallel apply
770 * workers).
771 *
772 * Returns true for streamed transactions (when the change is either serialized
773 * to file or sent to parallel apply worker), false otherwise (regular mode or
774 * needs to be processed by parallel apply worker).
775 *
776 * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
777 * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
778 * to a parallel apply worker.
779 */
780static bool
782{
787
789
790 /* not in streaming mode */
792 return false;
793
795
796 /*
797 * The parallel apply worker needs the xid in this message to decide
798 * whether to define a savepoint, so save the original message that has
799 * not moved the cursor after the xid. We will serialize this message to a
800 * file in PARTIAL_SERIALIZE mode.
801 */
802 original_msg = *s;
803
804 /*
805 * We should have received XID of the subxact as the first part of the
806 * message, so extract it.
807 */
809
813 errmsg_internal("invalid transaction ID in streamed replication transaction")));
814
815 switch (apply_action)
816 {
819
820 /* Add the new subxact to the array (unless already there). */
822
823 /* Write the change to the current file */
824 stream_write_change(action, s);
825 return true;
826
828 Assert(winfo);
829
830 /*
831 * XXX The publisher side doesn't always send relation/type update
832 * messages after the streaming transaction, so also update the
833 * relation/type in leader apply worker. See function
834 * cleanup_rel_sync_cache.
835 */
836 if (pa_send_data(winfo, s->len, s->data))
837 return (action != LOGICAL_REP_MSG_RELATION &&
838 action != LOGICAL_REP_MSG_TYPE);
839
840 /*
841 * Switch to serialize mode when we are not able to send the
842 * change to parallel apply worker.
843 */
844 pa_switch_to_partial_serialize(winfo, false);
845
849
850 /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
851 return (action != LOGICAL_REP_MSG_RELATION &&
852 action != LOGICAL_REP_MSG_TYPE);
853
856
857 /* Define a savepoint for a subxact if needed. */
859 return false;
860
861 default:
862 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
863 return false; /* silence compiler warning */
864 }
865}
866
867/*
868 * Executor state preparation for evaluation of constraint expressions,
869 * indexes and triggers for the specified relation.
870 *
871 * Note that the caller must open and close any indexes to be updated.
872 */
873static ApplyExecutionData *
875{
877 EState *estate;
879 List *perminfos = NIL;
880 ResultRelInfo *resultRelInfo;
881
883 edata->targetRel = rel;
884
885 edata->estate = estate = CreateExecutorState();
886
888 rte->rtekind = RTE_RELATION;
889 rte->relid = RelationGetRelid(rel->localrel);
890 rte->relkind = rel->localrel->rd_rel->relkind;
891 rte->rellockmode = AccessShareLock;
892
894
897
898 edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
899
900 /*
901 * Use Relation opened by logicalrep_rel_open() instead of opening it
902 * again.
903 */
904 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
905
906 /*
907 * We put the ResultRelInfo in the es_opened_result_relations list, even
908 * though we don't populate the es_result_relations array. That's a bit
909 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
910 *
911 * ExecOpenIndices() is not called here either; each execution path that
912 * performs an apply operation is responsible for that.
913 */
915 lappend(estate->es_opened_result_relations, resultRelInfo);
916
917 estate->es_output_cid = GetCurrentCommandId(true);
918
919 /* Prepare to catch AFTER triggers. */
921
922 /* other fields of edata remain NULL for now */
923
924 return edata;
925}
926
927/*
928 * Finish any operations related to the executor state created by
929 * create_edata_for_relation().
930 */
931static void
933{
934 EState *estate = edata->estate;
935
936 /* Handle any queued AFTER triggers. */
937 AfterTriggerEndQuery(estate);
938
939 /* Shut down tuple routing, if any was done. */
940 if (edata->proute)
941 ExecCleanupTupleRouting(edata->mtstate, edata->proute);
942
943 /*
944 * Cleanup. It might seem that we should call ExecCloseResultRelations()
945 * here, but we intentionally don't. It would close the rel we added to
946 * es_opened_result_relations above, which is wrong because we took no
947 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
948 * any other relations opened during execution.
949 */
950 ExecResetTupleTable(estate->es_tupleTable, false);
951 FreeExecutorState(estate);
952 pfree(edata);
953}
954
955/*
956 * Evaluates default expressions for local columns that we can't map to
957 * remote relation columns.
958 *
959 * This allows us to support tables which have more columns on the downstream
960 * than on the upstream.
961 */
962static void
964 TupleTableSlot *slot)
965{
967 int num_phys_attrs = desc->natts;
968 int i;
969 int attnum,
970 num_defaults = 0;
971 int *defmap;
972 ExprState **defexprs;
973 ExprContext *econtext;
974
975 econtext = GetPerTupleExprContext(estate);
976
977 /* We got all the data via replication, no need to evaluate anything. */
978 if (num_phys_attrs == rel->remoterel.natts)
979 return;
980
981 defmap = palloc_array(int, num_phys_attrs);
983
985 for (attnum = 0; attnum < num_phys_attrs; attnum++)
986 {
988 Expr *defexpr;
989
990 if (cattr->attisdropped || cattr->attgenerated)
991 continue;
992
993 if (rel->attrmap->attnums[attnum] >= 0)
994 continue;
995
996 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
997
998 if (defexpr != NULL)
999 {
1000 /* Run the expression through planner */
1001 defexpr = expression_planner(defexpr);
1002
1003 /* Initialize executable expression in copycontext */
1004 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1005 defmap[num_defaults] = attnum;
1006 num_defaults++;
1007 }
1008 }
1009
1010 for (i = 0; i < num_defaults; i++)
1011 slot->tts_values[defmap[i]] =
1012 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1013}
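/*
 * Illustrative sketch, not part of worker.c: the selection rule described
 * above (skip dropped and generated columns, skip columns that have a remote
 * counterpart, evaluate a default for the rest) shown on plain arrays.
 * DemoColumn and demo_fill_defaults are hypothetical names, and an int stands
 * in for the planned and evaluated default expression.
 */

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the catalog flags of one local column. */
typedef struct DemoColumn
{
	bool		dropped;		/* column was dropped locally */
	bool		generated;		/* generated column, never defaulted here */
	bool		has_default;	/* a default expression exists */
	int			default_value;	/* stands in for the evaluated expression */
} DemoColumn;

/*
 * Fill values[]/isnull[] for every local column that has no remote
 * counterpart (attrmap[i] < 0) and has a default.  Columns without a default
 * are left untouched, i.e. they keep whatever NULL marker the caller stored.
 * Returns the number of defaults applied.
 */
int
demo_fill_defaults(const DemoColumn *cols, const int *attrmap, int natts,
				   int *values, bool *isnull)
{
	int			applied = 0;

	assert(natts >= 0);

	for (int i = 0; i < natts; i++)
	{
		if (cols[i].dropped || cols[i].generated)
			continue;
		if (attrmap[i] >= 0)
			continue;			/* value was sent by the publisher */
		if (!cols[i].has_default)
			continue;

		values[i] = cols[i].default_value;
		isnull[i] = false;
		applied++;
	}

	return applied;
}
```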
1014
1015/*
1016 * Store tuple data into slot.
1017 *
1018 * Incoming data can be either text or binary format.
1019 */
1020static void
1023{
1024 int natts = slot->tts_tupleDescriptor->natts;
1025 int i;
1026
1027 ExecClearTuple(slot);
1028
1029 /* Call the "in" function for each non-dropped, non-null attribute */
1030 Assert(natts == rel->attrmap->maplen);
1031 for (i = 0; i < natts; i++)
1032 {
1034 int remoteattnum = rel->attrmap->attnums[i];
1035
1036 if (!att->attisdropped && remoteattnum >= 0)
1037 {
1039
1041
1042 /* Set attnum for error callback */
1044
1046 {
1047 Oid typinput;
1048 Oid typioparam;
1049
1050 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1051 slot->tts_values[i] =
1053 typioparam, att->atttypmod);
1054 slot->tts_isnull[i] = false;
1055 }
1056 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1057 {
1058 Oid typreceive;
1059 Oid typioparam;
1060
1061 /*
1062 * In some code paths we may be asked to re-parse the same
1063 * tuple data. Reset the StringInfo's cursor so that works.
1064 */
1065 colvalue->cursor = 0;
1066
1067 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1068 slot->tts_values[i] =
1069 OidReceiveFunctionCall(typreceive, colvalue,
1070 typioparam, att->atttypmod);
1071
1072 /* Trouble if it didn't eat the whole buffer */
1073 if (colvalue->cursor != colvalue->len)
1074 ereport(ERROR,
1076 errmsg("incorrect binary data format in logical replication column %d",
1077 remoteattnum + 1)));
1078 slot->tts_isnull[i] = false;
1079 }
1080 else
1081 {
1082 /*
1083 * NULL value from remote. (We don't expect to see
1084 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1085 * NULL.)
1086 */
1087 slot->tts_values[i] = (Datum) 0;
1088 slot->tts_isnull[i] = true;
1089 }
1090
1091 /* Reset attnum for error callback */
1093 }
1094 else
1095 {
1096 /*
1097 * We assign NULL to dropped attributes and missing values
1098 * (missing values should be later filled using
1099 * slot_fill_defaults).
1100 */
1101 slot->tts_values[i] = (Datum) 0;
1102 slot->tts_isnull[i] = true;
1103 }
1104 }
1105
1107}
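/*
 * Illustrative sketch, not part of worker.c: the binary branch above resets
 * the buffer cursor, calls the type's receive function, and then treats any
 * unconsumed bytes as a format error.  The same pattern is shown here for a
 * single 4-byte big-endian integer.  DemoBuf, demo_recv_int32 and
 * demo_store_binary_int32 are hypothetical names; the real code works on a
 * StringInfo and reports the failure with ereport(ERROR).
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical read-only buffer with a read cursor. */
typedef struct DemoBuf
{
	const unsigned char *data;
	size_t		len;
	size_t		cursor;
} DemoBuf;

/* Decode one big-endian int32; fails if fewer than 4 bytes remain. */
bool
demo_recv_int32(DemoBuf *buf, int32_t *out)
{
	const unsigned char *p;
	uint32_t	v;

	assert(buf->cursor <= buf->len);
	if (buf->len - buf->cursor < 4)
		return false;

	p = buf->data + buf->cursor;
	v = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
		((uint32_t) p[2] << 8) | (uint32_t) p[3];
	buf->cursor += 4;
	*out = (int32_t) v;
	return true;
}

/*
 * Parse a column value and insist that the whole buffer was consumed.
 * Resetting the cursor first makes it safe to parse the same data twice.
 */
bool
demo_store_binary_int32(DemoBuf *buf, int32_t *out)
{
	buf->cursor = 0;

	if (!demo_recv_int32(buf, out))
		return false;

	/* Trouble if the receive step did not eat the whole buffer. */
	return buf->cursor == buf->len;
}
```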
1108
1109/*
1110 * Replace updated columns with data from the LogicalRepTupleData struct.
1111 * This is somewhat similar to heap_modify_tuple but also calls the type
1112 * input functions on the user data.
1113 *
1114 * "slot" is filled with a copy of the tuple in "srcslot", replacing
1115 * columns provided in "tupleData" and leaving others as-is.
1116 *
1117 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
1118 * storage for "srcslot". This is OK for current usage, but someday we may
1119 * need to materialize "slot" at the end to make it independent of "srcslot".
1120 */
1121static void
1125{
1126 int natts = slot->tts_tupleDescriptor->natts;
1127 int i;
1128
1129 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1130 ExecClearTuple(slot);
1131
1132 /*
1133 * Copy all the column data from srcslot, so that we'll have valid values
1134 * for unreplaced columns.
1135 */
1136 Assert(natts == srcslot->tts_tupleDescriptor->natts);
1138 memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1139 memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1140
1141 /* Call the "in" function for each replaced attribute */
1142 Assert(natts == rel->attrmap->maplen);
1143 for (i = 0; i < natts; i++)
1144 {
1146 int remoteattnum = rel->attrmap->attnums[i];
1147
1148 if (remoteattnum < 0)
1149 continue;
1150
1152
1154 {
1156
1157 /* Set attnum for error callback */
1159
1161 {
1162 Oid typinput;
1163 Oid typioparam;
1164
1165 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1166 slot->tts_values[i] =
1168 typioparam, att->atttypmod);
1169 slot->tts_isnull[i] = false;
1170 }
1171 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1172 {
1173 Oid typreceive;
1174 Oid typioparam;
1175
1176 /*
1177 * In some code paths we may be asked to re-parse the same
1178 * tuple data. Reset the StringInfo's cursor so that works.
1179 */
1180 colvalue->cursor = 0;
1181
1182 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1183 slot->tts_values[i] =
1184 OidReceiveFunctionCall(typreceive, colvalue,
1185 typioparam, att->atttypmod);
1186
1187 /* Trouble if it didn't eat the whole buffer */
1188 if (colvalue->cursor != colvalue->len)
1189 ereport(ERROR,
1191 errmsg("incorrect binary data format in logical replication column %d",
1192 remoteattnum + 1)));
1193 slot->tts_isnull[i] = false;
1194 }
1195 else
1196 {
1197 /* must be LOGICALREP_COLUMN_NULL */
1198 slot->tts_values[i] = (Datum) 0;
1199 slot->tts_isnull[i] = true;
1200 }
1201
1202 /* Reset attnum for error callback */
1204 }
1205 }
1206
1207 /* And finally, declare that "slot" contains a valid virtual tuple */
1209}
1210
1211/*
1212 * Handle BEGIN message.
1213 */
1214static void
1216{
1218
1219 /* There must not be an active streaming transaction. */
1221
1224
1225 remote_final_lsn = begin_data.final_lsn;
1226
1228
1229 in_remote_transaction = true;
1230
1232}
1233
1234/*
1235 * Handle COMMIT message.
1236 *
1237 * TODO, support tracking of multiple origins
1238 */
1239static void
1241{
1243
1245
1246 if (commit_data.commit_lsn != remote_final_lsn)
1247 ereport(ERROR,
1249 errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1250 LSN_FORMAT_ARGS(commit_data.commit_lsn),
1252
1254
1255 /*
1256 * Process any tables that are being synchronized in parallel, as well as
1257 * any newly added tables or sequences.
1258 */
1260
1263}
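/*
 * Illustrative sketch, not part of worker.c: the check above compares the
 * LSN carried by the COMMIT message with the final LSN announced in BEGIN
 * and reports a mismatch in "%X/%08X" form, i.e. the high and low 32 bits of
 * the 64-bit position.  demo_format_lsn and demo_check_commit_lsn are
 * hypothetical helpers that write into a caller-supplied buffer instead of
 * raising an error.
 */

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Render a 64-bit LSN as high-half/low-half, low half zero-padded to 8. */
void
demo_format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
	assert(buflen > 0);
	snprintf(buf, buflen, "%" PRIX32 "/%08" PRIX32,
			 (uint32_t) (lsn >> 32), (uint32_t) lsn);
}

/*
 * Return true if the commit LSN matches the expected one.  On mismatch,
 * store a message modelled on the one in the source and return false.
 */
bool
demo_check_commit_lsn(uint64_t commit_lsn, uint64_t expected_lsn,
					  char *errbuf, size_t errlen)
{
	char		got[32];
	char		want[32];

	assert(errlen > 0);
	errbuf[0] = '\0';

	if (commit_lsn == expected_lsn)
		return true;

	demo_format_lsn(commit_lsn, got, sizeof(got));
	demo_format_lsn(expected_lsn, want, sizeof(want));
	snprintf(errbuf, errlen,
			 "incorrect commit LSN %s in commit message (expected %s)",
			 got, want);
	return false;
}
```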
1264
1265/*
1266 * Handle BEGIN PREPARE message.
1267 */
1268static void
1270{
1272
1273 /* Tablesync should never receive prepare. */
1274 if (am_tablesync_worker())
1275 ereport(ERROR,
1277 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1278
1279 /* There must not be an active streaming transaction. */
1281
1284
1285 remote_final_lsn = begin_data.prepare_lsn;
1286
1288
1289 in_remote_transaction = true;
1290
1292}
1293
1294/*
1295 * Common function to prepare the GID.
1296 */
1297static void
1299{
1300 char gid[GIDSIZE];
1301
1302 /*
1303 * Compute a unique GID for two_phase transactions. We don't use the GID
1304 * of the prepared transaction sent by the server, as that can lead to
1305 * deadlock when multiple subscriptions from the same node point to
1306 * publications on the same node. See the comments atop worker.c.
1307 */
1309 gid, sizeof(gid));
1310
1311 /*
1312 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1313 * called within the PrepareTransactionBlock below.
1314 */
1315 if (!IsTransactionBlock())
1316 {
1318 CommitTransactionCommand(); /* Completes the preceding Begin command. */
1319 }
1320
1321 /*
1322 * Update origin state so we can restart streaming from correct position
1323 * in case of crash.
1324 */
1327
1329}
1330
1331/*
1332 * Handle PREPARE message.
1333 */
1334static void
1336{
1338
1340
1341 if (prepare_data.prepare_lsn != remote_final_lsn)
1342 ereport(ERROR,
1344 errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1345 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1347
1348 /*
1349 * Unlike commit, here, we always prepare the transaction even though no
1350 * change has happened in this transaction or all changes are skipped. It
1351 * is done this way because at commit prepared time, we won't know whether
1352 * we have skipped preparing a transaction because of those reasons.
1353 *
1354 * XXX, We can optimize such that at commit prepared time, we first check
1355 * whether we have prepared the transaction or not but that doesn't seem
1356 * worthwhile because such cases shouldn't be common.
1357 */
1359
1361
1364 pgstat_report_stat(false);
1365
1366 /*
1367 * It is okay not to set the local_end LSN for the prepare because we
1368 * always flush the prepare record. So, we can send the acknowledgment of
1369 * the remote_end LSN as soon as prepare is finished.
1370 *
1371 * XXX For the sake of consistency with commit, we could have set it with
1372 * the LSN of prepare but as of now we don't track that value similar to
1373 * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
1374 * it.
1375 */
1377
1378 in_remote_transaction = false;
1379
1380 /*
1381 * Process any tables that are being synchronized in parallel, as well as
1382 * any newly added tables or sequences.
1383 */
1385
1386 /*
1387 * Since we have already prepared the transaction, in a case where the
1388 * server crashes before clearing the subskiplsn, it will be left but the
1389 * transaction won't be resent. But that's okay because it's a rare case
1390 * and the subskiplsn will be cleared when finishing the next transaction.
1391 */
1394
1397}
1398
1399/*
1400 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1401 *
1402 * Note that we don't need to wait here if the transaction was prepared in a
1403 * parallel apply worker. In that case, we have already waited for the prepare
1404 * to finish in apply_handle_stream_prepare() which will ensure all the
1405 * operations in that transaction have happened in the subscriber, so no
1406 * concurrent transaction can cause deadlock or transaction dependency issues.
1407 */
1408static void
1410{
1412 char gid[GIDSIZE];
1413
1416
1417 /* Compute GID for two_phase transactions. */
1419 gid, sizeof(gid));
1420
1421 /* There is no transaction when COMMIT PREPARED is called */
1423
1424 /*
1425 * Update origin state so we can restart streaming from correct position
1426 * in case of crash.
1427 */
1430
1431 FinishPreparedTransaction(gid, true);
1434 pgstat_report_stat(false);
1435
1437 in_remote_transaction = false;
1438
1439 /*
1440 * Process any tables that are being synchronized in parallel, as well as
1441 * any newly added tables or sequences.
1442 */
1444
1446
1449}
1450
1451/*
1452 * Handle a ROLLBACK PREPARED of a previously PREPARED transaction.
1453 *
1454 * Note that we don't need to wait here if the transaction was prepared in a
1455 * parallel apply worker. In that case, we have already waited for the prepare
1456 * to finish in apply_handle_stream_prepare() which will ensure all the
1457 * operations in that transaction have happened in the subscriber, so no
1458 * concurrent transaction can cause deadlock or transaction dependency issues.
1459 */
1460static void
1462{
1464 char gid[GIDSIZE];
1465
1468
1469 /* Compute GID for two_phase transactions. */
1471 gid, sizeof(gid));
1472
1473 /*
1474 * It is possible that we haven't received prepare because it occurred
1475 * before walsender reached a consistent point or the two_phase was still
1476 * not enabled by that time, so in such cases, we need to skip rollback
1477 * prepared.
1478 */
1479 if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1480 rollback_data.prepare_time))
1481 {
1482 /*
1483 * Update origin state so we can restart streaming from correct
1484 * position in case of crash.
1485 */
1488
1489 /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1491 FinishPreparedTransaction(gid, false);
1494
1496 }
1497
1498 pgstat_report_stat(false);
1499
1500 /*
1501 * It is okay not to set the local_end LSN for the rollback of prepared
1502 * transaction because we always flush the WAL record for it. See
1503 * apply_handle_prepare.
1504 */
1506 in_remote_transaction = false;
1507
1508 /*
1509 * Process any tables that are being synchronized in parallel, as well as
1510 * any newly added tables or sequences.
1511 */
1512 ProcessSyncingRelations(rollback_data.rollback_end_lsn);
1513
1516}
1517
1518/*
1519 * Handle STREAM PREPARE.
1520 */
1521static void
1523{
1527
1528 /* Save the message before it is consumed. */
1530
1532 ereport(ERROR,
1534 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1535
1536 /* Tablesync should never receive prepare. */
1537 if (am_tablesync_worker())
1538 ereport(ERROR,
1540 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1541
1544
1546
1547 switch (apply_action)
1548 {
1549 case TRANS_LEADER_APPLY:
1550
1551 /*
1552 * The transaction has been serialized to file, so replay all the
1553 * spooled operations.
1554 */
1556 prepare_data.xid, prepare_data.prepare_lsn);
1557
1558 /* Mark the transaction as prepared. */
1560
1562
1563 /*
1564 * It is okay not to set the local_end LSN for the prepare because
1565 * we always flush the prepare record. See apply_handle_prepare.
1566 */
1568
1569 in_remote_transaction = false;
1570
1571 /* Unlink the files with serialized changes and subxact info. */
1573
1574 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1575 break;
1576
1578 Assert(winfo);
1579
1580 if (pa_send_data(winfo, s->len, s->data))
1581 {
1582 /* Finish processing the streaming transaction. */
1583 pa_xact_finish(winfo, prepare_data.end_lsn);
1584 break;
1585 }
1586
1587 /*
1588 * Switch to serialize mode when we are not able to send the
1589 * change to parallel apply worker.
1590 */
1591 pa_switch_to_partial_serialize(winfo, true);
1592
1595 Assert(winfo);
1596
1599 &original_msg);
1600
1602
1603 /* Finish processing the streaming transaction. */
1604 pa_xact_finish(winfo, prepare_data.end_lsn);
1605 break;
1606
1608
1609 /*
1610 * If the parallel apply worker is applying spooled messages then
1611 * close the file before preparing.
1612 */
1613 if (stream_fd)
1615
1617
1618 /* Mark the transaction as prepared. */
1620
1622
1624
1625 /*
1626 * It is okay not to set the local_end LSN for the prepare because
1627 * we always flush the prepare record. See apply_handle_prepare.
1628 */
1630
1633
1635
1636 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1637 break;
1638
1639 default:
1640 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1641 break;
1642 }
1643
1644 pgstat_report_stat(false);
1645
1646 /*
1647 * Process any tables that are being synchronized in parallel, as well as
1648 * any newly added tables or sequences.
1649 */
1651
1652 /*
1653 * Similar to prepare case, the subskiplsn could be left in a case of
1654 * server crash but it's okay. See the comments in apply_handle_prepare().
1655 */
1658
1660
1662}
1663
1664/*
1665 * Handle ORIGIN message.
1666 *
1667 * TODO, support tracking of multiple origins
1668 */
1669static void
1671{
1672 /*
1673 * ORIGIN message can only come inside streaming transaction or inside
1674 * remote transaction and before any actual writes.
1675 */
1679 ereport(ERROR,
1681 errmsg_internal("ORIGIN message sent out of order")));
1682}
1683
1684/*
1685 * Initialize fileset (if not already done).
1686 *
1687 * Create a new file when first_segment is true, otherwise open the existing
1688 * file.
1689 */
1690void
1692{
1694
1695 /*
1696 * Initialize the worker's stream_fileset if we haven't yet. This will be
1697 * used for the entire duration of the worker so create it in a permanent
1698 * context. We create this on the very first streaming message from any
1699 * transaction and then use it for this and other streaming transactions.
1700 * Now, we could create a fileset at the start of the worker as well but
1701 * then we won't be sure that it will ever be used.
1702 */
1704 {
1706
1708
1711
1713 }
1714
1715 /* Open the spool file for this transaction. */
1717
1718 /* If this is not the first segment, open existing subxact file. */
1719 if (!first_segment)
1721
1723}
1724
1725/*
1726 * Handle STREAM START message.
1727 */
1728static void
1730{
1731 bool first_segment;
1734
1735 /* Save the message before it is consumed. */
1737
1739 ereport(ERROR,
1741 errmsg_internal("duplicate STREAM START message")));
1742
1743 /* There must not be an active streaming transaction. */
1745
1746 /* notify handle methods we're processing a remote transaction */
1748
1749 /* extract XID of the top-level transaction */
1751
1753 ereport(ERROR,
1755 errmsg_internal("invalid transaction ID in streamed replication transaction")));
1756
1758
1759 /* Try to allocate a worker for the streaming transaction. */
1760 if (first_segment)
1762
1764
1765 switch (apply_action)
1766 {
1768
1769 /*
1770 * Function stream_start_internal starts a transaction. This
1771 * transaction will be committed on the stream stop unless it is a
1772 * tablesync worker in which case it will be committed after
1773 * processing all the messages. We need this transaction for
1774 * handling the BufFile, used for serializing the streaming data
1775 * and subxact info.
1776 */
1778 break;
1779
1781 Assert(winfo);
1782
1783 /*
1784 * Once we start serializing the changes, the parallel apply
1785 * worker will wait for the leader to release the stream lock
1786 * until the end of the transaction. So, we don't need to release
1787 * the lock or increment the stream count in that case.
1788 */
1789 if (pa_send_data(winfo, s->len, s->data))
1790 {
1791 /*
1792 * Unlock the shared object lock so that the parallel apply
1793 * worker can continue to receive changes.
1794 */
1795 if (!first_segment)
1797
1798 /*
1799 * Increment the number of streaming blocks waiting to be
1800 * processed by parallel apply worker.
1801 */
1803
1804 /* Cache the parallel apply worker for this transaction. */
1806 break;
1807 }
1808
1809 /*
1810 * Switch to serialize mode when we are not able to send the
1811 * change to parallel apply worker.
1812 */
1814
1817 Assert(winfo);
1818
1819 /*
1820 * Open the spool file unless it was already opened when switching
1821 * to serialize mode. The transaction started in
1822 * stream_start_internal will be committed on the stream stop.
1823 */
1826
1828
1829 /* Cache the parallel apply worker for this transaction. */
1831 break;
1832
1834 if (first_segment)
1835 {
1836 /* Hold the lock until the end of the transaction. */
1839
1840 /*
1841 * Signal the leader apply worker, as it may be waiting for
1842 * us.
1843 */
1846 }
1847
1849 break;
1850
1851 default:
1852 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1853 break;
1854 }
1855
1857}
1858
1859/*
1860 * Update the information about subxacts and close the file.
1861 *
1862 * This function should be called when the stream_start_internal function has
1863 * been called.
1864 */
1865void
1867{
1868 /*
1869 * Serialize information about subxacts for the toplevel transaction, then
1870 * close the stream messages spool file.
1871 */
1874
1875 /* We must be in a valid transaction state */
1877
1878 /* Commit the per-stream transaction */
1880
1881 /* Reset per-stream context */
1883}
1884
1885/*
1886 * Handle STREAM STOP message.
1887 */
1888static void
1890{
1893
1895 ereport(ERROR,
1897 errmsg_internal("STREAM STOP message without STREAM START")));
1898
1900
1901 switch (apply_action)
1902 {
1905 break;
1906
1908 Assert(winfo);
1909
1910 /*
1911 * Lock before sending the STREAM_STOP message so that the leader
1912 * can hold the lock first and the parallel apply worker will wait
1913 * for the leader to release the lock. See Locking Considerations atop
1914 * applyparallelworker.c.
1915 */
1917
1918 if (pa_send_data(winfo, s->len, s->data))
1919 {
1921 break;
1922 }
1923
1924 /*
1925 * Switch to serialize mode when we are not able to send the
1926 * change to parallel apply worker.
1927 */
1928 pa_switch_to_partial_serialize(winfo, true);
1929
1935 break;
1936
1938 elog(DEBUG1, "applied %u changes in the streaming chunk",
1940
1941 /*
1942 * By the time the parallel apply worker is processing the changes
1943 * in the current streaming block, the leader apply worker may have
1944 * sent multiple streaming blocks. This can lead to the parallel
1945 * apply worker waiting even when there are more streaming chunks
1946 * in the queue. So, try to lock only if there is no message left
1947 * in the queue. See Locking Considerations atop
1948 * applyparallelworker.c.
1949 *
1950 * Note that here we have a race condition where we can start
1951 * waiting even when there are pending streaming chunks. This can
1952 * happen if the leader sends another streaming block and acquires
1953 * the stream lock again after the parallel apply worker checks
1954 * that there is no pending streaming block and before it actually
1955 * starts waiting on a lock. We can handle this case by not
1956 * allowing the leader to increment the stream block count during
1957 * the time parallel apply worker acquires the lock but it is not
1958 * clear whether that is worth the complexity.
1959 *
1960 * Now, if this missed chunk contains rollback to savepoint, then
1961 * there is a risk of deadlock which probably shouldn't happen
1962 * after restart.
1963 */
1965 break;
1966
1967 default:
1968 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1969 break;
1970 }
1971
1974
1975 /*
1976 * The parallel apply worker could be in a transaction in which case we
1977 * need to report the state as STATE_IDLEINTRANSACTION.
1978 */
1981 else
1983
1985}
1986
1987/*
1988 * Helper function to handle STREAM ABORT message when the transaction was
1989 * serialized to file.
1990 */
1991static void
1993{
1994 /*
1995 * If the two XIDs are the same, it's in fact an abort of the toplevel xact, so
1996 * just delete the files with serialized info.
1997 */
1998 if (xid == subxid)
2000 else
2001 {
2002 /*
2003 * OK, so it's a subxact. We need to read the subxact file for the
2004 * toplevel transaction, determine the offset tracked for the subxact,
2005 * and truncate the file with changes. We also remove the subxacts
2006 * with higher offsets (or rather higher XIDs).
2007 *
2008 * We intentionally scan the array from the tail, because we're likely
2009 * aborting one of the most recent subtransactions.
2010 *
2011 * We can't use binary search here, as subxact XIDs won't
2012 * necessarily arrive in sorted order. Consider the case where we have
2013 * released the savepoints for multiple subtransactions and then
2014 * performed a rollback to savepoint for one of the earlier
2015 * subtransactions.
2016 */
2017 int64 i;
2018 int64 subidx;
2019 BufFile *fd;
2020 bool found = false;
2021 char path[MAXPGPATH];
2022
2023 subidx = -1;
2026
2027 for (i = subxact_data.nsubxacts; i > 0; i--)
2028 {
2029 if (subxact_data.subxacts[i - 1].xid == subxid)
2030 {
2031 subidx = (i - 1);
2032 found = true;
2033 break;
2034 }
2035 }
2036
2037 /*
2038 * If it's an empty sub-transaction then we will not find the subxid
2039 * here so just cleanup the subxact info and return.
2040 */
2041 if (!found)
2042 {
2043 /* Cleanup the subxact info */
2047 return;
2048 }
2049
2050 /* open the changes file */
2053 O_RDWR, false);
2054
2055 /* OK, truncate the file at the right offset */
2059
2060 /* discard the subxacts added later */
2062
2063 /* write the updated subxact list */
2065
2068 }
2069}
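/*
 * Illustrative sketch, not part of worker.c: the subtransaction branch above
 * scans the tracked subxacts from the tail, takes the offset recorded for
 * the aborted one as the truncation point, and discards that entry together
 * with everything added after it.  DemoSubXact and demo_abort_subxact are
 * hypothetical names.  Two simplifications are assumed: a single long offset
 * replaces the file number plus offset pair, and cutting the entry count
 * back to the matching index is my reading of "discard the subxacts added
 * later".
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical record of where one subtransaction's changes begin. */
typedef struct DemoSubXact
{
	uint32_t	xid;			/* subtransaction XID */
	long		offset;			/* start of its changes in the spool file */
} DemoSubXact;

/*
 * Look for subxid starting at the newest entry.  XIDs are not guaranteed to
 * be sorted, so a linear scan is used rather than binary search.
 *
 * On success, *truncate_at receives the offset to truncate the changes file
 * to, and *nsubxacts shrinks so that the matching entry and all later ones
 * are dropped.  Returns false (leaving both untouched) if subxid is unknown,
 * which corresponds to an empty subtransaction.
 */
bool
demo_abort_subxact(const DemoSubXact *subxacts, int *nsubxacts,
				   uint32_t subxid, long *truncate_at)
{
	assert(*nsubxacts >= 0);

	for (int i = *nsubxacts; i > 0; i--)
	{
		if (subxacts[i - 1].xid == subxid)
		{
			*truncate_at = subxacts[i - 1].offset;
			*nsubxacts = i - 1;
			return true;
		}
	}

	return false;
}
```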
2070
2071/*
2072 * Handle STREAM ABORT message.
2073 */
2074static void
2076{
2077 TransactionId xid;
2078 TransactionId subxid;
2082
2083 /* Save the message before it is consumed. */
2085 bool toplevel_xact;
2086
2088 ereport(ERROR,
2090 errmsg_internal("STREAM ABORT message without STREAM STOP")));
2091
2092 /* We receive abort information only when we can apply in parallel. */
2095
2096 xid = abort_data.xid;
2097 subxid = abort_data.subxid;
2098 toplevel_xact = (xid == subxid);
2099
2100 set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2101
2103
2104 switch (apply_action)
2105 {
2106 case TRANS_LEADER_APPLY:
2107
2108 /*
2109 * We are in the leader apply worker and the transaction has been
2110 * serialized to file.
2111 */
2112 stream_abort_internal(xid, subxid);
2113
2114 elog(DEBUG1, "finished processing the STREAM ABORT command");
2115 break;
2116
2118 Assert(winfo);
2119
2120 /*
2121 * For the case of aborting the subtransaction, we increment the
2122 * number of streaming blocks and take the lock again before
2123 * sending the STREAM_ABORT to ensure that the parallel apply
2124 * worker will wait on the lock for the next set of changes after
2125 * processing the STREAM_ABORT message if it is not already
2126 * waiting for STREAM_STOP message.
2127 *
2128 * It is important to perform this locking before sending the
2129 * STREAM_ABORT message so that the leader can hold the lock first
2130 * and the parallel apply worker will wait for the leader to
2131 * release the lock. This is the same as what we do in
2132 * apply_handle_stream_stop. See Locking Considerations atop
2133 * applyparallelworker.c.
2134 */
2135 if (!toplevel_xact)
2136 {
2140 }
2141
2142 if (pa_send_data(winfo, s->len, s->data))
2143 {
2144 /*
2145 * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2146 * wait here for the parallel apply worker to finish as that
2147 * is not required to maintain the commit order and won't have
2148 * the risk of failures due to transaction dependencies and
2149 * deadlocks. However, it is possible that before the parallel
2150 * worker finishes and we clear the worker info, the xid
2151 * wraparound happens on the upstream and a new transaction
2152 * with the same xid can appear and that can lead to duplicate
2153 * entries in ParallelApplyTxnHash. Yet another problem could
2154 * be that we may have serialized the changes in partial
2155 * serialize mode and the file containing xact changes may
2156 * already exist, and after xid wraparound trying to create
2157 * the file for the same xid can lead to an error. To avoid
2158 * these problems, we decide to wait for the aborts to finish.
2159 *
2160 * Note, it is okay to not update the flush location position
2161 * for aborts as in worst case that means such a transaction
2162 * won't be sent again after restart.
2163 */
2164 if (toplevel_xact)
2166
2167 break;
2168 }
2169
2170 /*
2171 * Switch to serialize mode when we are not able to send the
2172 * change to parallel apply worker.
2173 */
2174 pa_switch_to_partial_serialize(winfo, true);
2175
2178 Assert(winfo);
2179
2180 /*
2181 * Parallel apply worker might have applied some changes, so write
2182 * the STREAM_ABORT message so that it can rollback the
2183 * subtransaction if needed.
2184 */
2186 &original_msg);
2187
2188 if (toplevel_xact)
2189 {
2192 }
2193 break;
2194
2196
2197 /*
2198 * If the parallel apply worker is applying spooled messages then
2199 * close the file before aborting.
2200 */
2201 if (toplevel_xact && stream_fd)
2203
2205
2206 /*
2207 * We need to wait after processing rollback to savepoint for the
2208 * next set of changes.
2209 *
2210 * We have a race condition here, due to which we can start waiting
2211 * even when there are more streaming chunks in the queue. See
2212 * apply_handle_stream_stop.
2213 */
2214 if (!toplevel_xact)
2216
2217 elog(DEBUG1, "finished processing the STREAM ABORT command");
2218 break;
2219
2220 default:
2221 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2222 break;
2223 }
2224
2226}
2227
2228/*
2229 * Ensure that the passed location is the fileset's end.
2230 */
2231static void
2232ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2233 pgoff_t offset)
2234{
2235 char path[MAXPGPATH];
2236 BufFile *fd;
2237 int last_fileno;
2239
2241
2243
2245
2246 fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2247
2248 BufFileSeek(fd, 0, 0, SEEK_END);
2250
2252
2254
2255 if (last_fileno != fileno || last_offset != offset)
2256 elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2257 path);
2258}
2259
2260/*
2261 * Common spoolfile processing.
2262 */
2263void
2265 XLogRecPtr lsn)
2266{
2267 int nchanges;
2268 char path[MAXPGPATH];
2269 char *buffer = NULL;
2271 ResourceOwner oldowner;
2272 int fileno;
2273 pgoff_t offset;
2274
2277
2278 /* Make sure we have an open transaction */
2280
2281 /*
2282 * Allocate file handle and memory required to process all the messages in
2283 * TopTransactionContext to avoid them getting reset after each message is
2284 * processed.
2285 */
2287
2288 /* Open the spool file for the committed/prepared transaction */
2290 elog(DEBUG1, "replaying changes from file \"%s\"", path);
2291
2292 /*
2293 * Make sure the file is owned by the toplevel transaction so that the
2294 * file will not be accidentally closed when aborting a subtransaction.
2295 */
2296 oldowner = CurrentResourceOwner;
2298
2299 stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2300
2301 CurrentResourceOwner = oldowner;
2302
2303 buffer = palloc(BLCKSZ);
2304
2306
2307 remote_final_lsn = lsn;
2308
2309 /*
2310 * Make sure the handler methods called by apply_dispatch are aware we're
2311 * in a remote transaction.
2312 */
2313 in_remote_transaction = true;
2315
2317
2318 /*
2319 * Read the entries one by one and pass them through the same logic as in
2320 * apply_dispatch.
2321 */
2322 nchanges = 0;
2323 while (true)
2324 {
2326 size_t nbytes;
2327 int len;
2328
2330
2331 /* read length of the on-disk record */
2332 nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2333
2334 /* have we reached end of the file? */
2335 if (nbytes == 0)
2336 break;
2337
2338 /* do we have a correct length? */
2339 if (len <= 0)
2340 elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2341 len, path);
2342
2343 /* make sure we have sufficiently large buffer */
2344 buffer = repalloc(buffer, len);
2345
2346 /* and finally read the data into the buffer */
2347 BufFileReadExact(stream_fd, buffer, len);
2348
2349 BufFileTell(stream_fd, &fileno, &offset);
2350
2351 /* init a stringinfo using the buffer and call apply_dispatch */
2352 initReadOnlyStringInfo(&s2, buffer, len);
2353
2354 /* Ensure we are reading the data into our memory context. */
2356
2358
2360
2362
2363 nchanges++;
2364
2365 /*
2366 * The file may already have been closed because we processed a
2367 * transaction end message such as stream_commit, in which case that
2368 * must be the last message.
2369 */
2370 if (!stream_fd)
2371 {
2372 ensure_last_message(stream_fileset, xid, fileno, offset);
2373 break;
2374 }
2375
2376 if (nchanges % 1000 == 0)
2377 elog(DEBUG1, "replayed %d changes from file \"%s\"",
2378 nchanges, path);
2379 }
2380
2381 if (stream_fd)
2383
2384 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2385 nchanges, path);
2386
2387 return;
2388}
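/*
 * Illustration only, not part of worker.c: a minimal sketch of the
 * length-prefixed record loop used above, working on an in-memory buffer
 * instead of a BufFile. The names count_spooled_records and
 * append_spooled_record are hypothetical. The assumed layout is an int
 * length followed by that many payload bytes, repeated until end of data.
 */

```c
#include <stddef.h>
#include <string.h>

/* Append one [int length][payload] record at 'pos'; returns the new end. */
static size_t
append_spooled_record(unsigned char *data, size_t pos,
					  const void *payload, int len)
{
	memcpy(data + pos, &len, sizeof(len));
	memcpy(data + pos + sizeof(len), payload, (size_t) len);
	return pos + sizeof(len) + (size_t) len;
}

/*
 * Walk the records and count them. Returns -1 if a length is not positive
 * or a record is truncated, in the spirit of the "incorrect length" check.
 */
static int
count_spooled_records(const unsigned char *data, size_t size)
{
	size_t		pos = 0;
	int			nchanges = 0;

	while (pos < size)			/* reaching the end of data ends the loop */
	{
		int			len;

		/* read the length of the next record */
		if (size - pos < sizeof(len))
			return -1;
		memcpy(&len, data + pos, sizeof(len));
		pos += sizeof(len);

		/* do we have a correct length? */
		if (len <= 0)
			return -1;

		/* the payload must be fully present before we consume it */
		if ((size_t) len > size - pos)
			return -1;
		pos += (size_t) len;

		nchanges++;
	}

	return nchanges;
}
```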
2389
2390/*
2391 * Handle STREAM COMMIT message.
2392 */
2393static void
2395{
2396 TransactionId xid;
2400
2401 /* Save the message before it is consumed. */
2403
2405 ereport(ERROR,
2407 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2408
2411
2413
2414 switch (apply_action)
2415 {
2416 case TRANS_LEADER_APPLY:
2417
2418 /*
2419 * The transaction has been serialized to file, so replay all the
2420 * spooled operations.
2421 */
2423 commit_data.commit_lsn);
2424
2426
2427 /* Unlink the files with serialized changes and subxact info. */
2429
2430 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2431 break;
2432
2434 Assert(winfo);
2435
2436 if (pa_send_data(winfo, s->len, s->data))
2437 {
2438 /* Finish processing the streaming transaction. */
2439 pa_xact_finish(winfo, commit_data.end_lsn);
2440 break;
2441 }
2442
2443 /*
2444 * Switch to serialize mode when we are not able to send the
2445 * change to parallel apply worker.
2446 */
2447 pa_switch_to_partial_serialize(winfo, true);
2448
2451 Assert(winfo);
2452
2454 &original_msg);
2455
2457
2458 /* Finish processing the streaming transaction. */
2459 pa_xact_finish(winfo, commit_data.end_lsn);
2460 break;
2461
2463
2464 /*
2465 * If the parallel apply worker is applying spooled messages then
2466 * close the file before committing.
2467 */
2468 if (stream_fd)
2470
2472
2474
2475 /*
2476 * It is important to set the transaction state as finished before
2477 * releasing the lock. See pa_wait_for_xact_finish.
2478 */
2481
2483
2484 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2485 break;
2486
2487 default:
2488 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2489 break;
2490 }
2491
2492 /*
2493 * Process any tables that are being synchronized in parallel, as well as
2494 * any newly added tables or sequences.
2495 */
2497
2499
2501}
2502
2503/*
2504 * Helper function for apply_handle_commit and apply_handle_stream_commit.
2505 */
2506static void
2508{
2509 if (is_skipping_changes())
2510 {
2512
2513 /*
2514 * Start a new transaction to clear the subskiplsn, if not started
2515 * yet.
2516 */
2517 if (!IsTransactionState())
2519 }
2520
2521 if (IsTransactionState())
2522 {
2523 /*
2524 * The transaction is either non-empty or skipped, so we clear the
2525 * subskiplsn.
2526 */
2528
2529 /*
2530 * Update origin state so we can restart streaming from correct
2531 * position in case of crash.
2532 */
2535
2537
2538 if (IsTransactionBlock())
2539 {
2540 EndTransactionBlock(false);
2542 }
2543
2544 pgstat_report_stat(false);
2545
2547 }
2548 else
2549 {
2550 /* Process any invalidation messages that might have accumulated. */
2553 }
2554
2555 in_remote_transaction = false;
2556}
2557
2558/*
2559 * Handle RELATION message.
2560 *
2561 * Note that we don't validate against the local schema here. Validation is
2562 * postponed until the first change for the given relation arrives, since we
2563 * only care about it when applying changes to that relation anyway, and we
2564 * do less locking this way.
2565 */
2566static void
2568{
2569 LogicalRepRelation *rel;
2570
2572 return;
2573
2574 rel = logicalrep_read_rel(s);
2576
2577 /* Also reset all entries in the partition map that refer to remoterel. */
2579}
2580
2581/*
2582 * Handle TYPE message.
2583 *
2584 * This implementation pays no attention to TYPE messages; we expect the user
2585 * to have set things up so that the incoming data is acceptable to the input
2586 * functions for the locally subscribed tables. Hence, we just read and
2587 * discard the message.
2588 */
2589static void
2599
2600/*
2601 * Check that we (the subscription owner) have sufficient privileges on the
2602 * target relation to perform the given operation.
2603 */
2604static void
2606{
2607 Oid relid;
2609
2610 relid = RelationGetRelid(rel);
2612 if (aclresult != ACLCHECK_OK)
2614 get_relkind_objtype(rel->rd_rel->relkind),
2615 get_rel_name(relid));
2616
2617 /*
2618 * We lack the infrastructure to honor RLS policies. It might be possible
2619 * to add such infrastructure here, but tablesync workers lack it, too, so
2620 * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2621 * but it seems dangerous to replicate a TRUNCATE and then refuse to
2622 * replicate subsequent INSERTs, so we forbid all commands the same.
2623 */
2624 if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2625 ereport(ERROR,
2627 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2630}
2631
2632/*
2633 * Handle INSERT message.
2634 */
2635
2636static void
2638{
2641 LogicalRepRelId relid;
2644 EState *estate;
2647 bool run_as_owner;
2648
2649 /*
2650 * Quick return if we are skipping data modification changes or handling
2651 * streamed transactions.
2652 */
2653 if (is_skipping_changes() ||
2655 return;
2656
2658
2659 relid = logicalrep_read_insert(s, &newtup);
2662 {
2663 /*
2664 * The relation can't become interesting in the middle of the
2665 * transaction so it's safe to unlock it.
2666 */
2669 return;
2670 }
2671
2672 /*
2673 * Make sure that any user-supplied code runs as the table owner, unless
2674 * the user has opted out of that behavior.
2675 */
2677 if (!run_as_owner)
2678 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2679
2680 /* Set relation for error callback */
2682
2683 /* Initialize the executor state. */
2685 estate = edata->estate;
2688 &TTSOpsVirtual);
2689
2690 /* Process and store remote tuple in the slot */
2693 slot_fill_defaults(rel, estate, remoteslot);
2695
2696 /* For a partitioned table, insert the tuple into a partition. */
2697 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2700 else
2701 {
2702 ResultRelInfo *relinfo = edata->targetRelInfo;
2703
2704 ExecOpenIndices(relinfo, false);
2707 }
2708
2710
2711 /* Reset relation for error callback */
2713
2714 if (!run_as_owner)
2716
2718
2720}
2721
2722/*
2723 * Workhorse for apply_handle_insert()
2724 * relinfo is for the relation we're actually inserting into
2725 * (could be a child partition of edata->targetRelInfo)
2726 */
2727static void
2731{
2732 EState *estate = edata->estate;
2733
2734 /* Caller should have opened indexes already. */
2735 Assert(relinfo->ri_IndexRelationDescs != NULL ||
2736 !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2737 RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2738
2739 /* Caller will not have done this bit. */
2740 Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2742
2743 /* Do the insert. */
2744 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2746}
2747
2748/*
2749 * Check if the logical replication relation is updatable and throw
2750 * appropriate error if it isn't.
2751 */
2752static void
2754{
2755 /*
2756 * For partitioned tables, we only need to care if the target partition is
2757 * updatable (aka has PK or RI defined for it).
2758 */
2759 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2760 return;
2761
2762 /* Updatable, no error. */
2763 if (rel->updatable)
2764 return;
2765
2766 /*
2767 * We are on the error path, so it's fine that this is somewhat slow. It's
2768 * better to give the user an accurate error.
2769 */
2771 {
2772 ereport(ERROR,
2774 errmsg("publisher did not send replica identity column "
2775 "expected by the logical replication target relation \"%s.%s\"",
2776 rel->remoterel.nspname, rel->remoterel.relname)));
2777 }
2778
2779 ereport(ERROR,
2781 errmsg("logical replication target relation \"%s.%s\" has "
2782 "neither REPLICA IDENTITY index nor PRIMARY "
2783 "KEY and published relation does not have "
2784 "REPLICA IDENTITY FULL",
2785 rel->remoterel.nspname, rel->remoterel.relname)));
2786}
2787
2788/*
2789 * Handle UPDATE message.
2790 *
2791 * TODO: FDW support
2792 */
2793static void
2795{
2797 LogicalRepRelId relid;
2800 EState *estate;
2803 bool has_oldtup;
2807 bool run_as_owner;
2808
2809 /*
2810 * Quick return if we are skipping data modification changes or handling
2811 * streamed transactions.
2812 */
2813 if (is_skipping_changes() ||
2815 return;
2816
2818
2820 &newtup);
2823 {
2824 /*
2825 * The relation can't become interesting in the middle of the
2826 * transaction so it's safe to unlock it.
2827 */
2830 return;
2831 }
2832
2833 /* Set relation for error callback */
2835
2836 /* Check if we can do the update. */
2838
2839 /*
2840 * Make sure that any user-supplied code runs as the table owner, unless
2841 * the user has opted out of that behavior.
2842 */
2844 if (!run_as_owner)
2845 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2846
2847 /* Initialize the executor state. */
2849 estate = edata->estate;
2852 &TTSOpsVirtual);
2853
2854 /*
2855 * Populate updatedCols so that per-column triggers can fire, and so
2856 * executor can correctly pass down indexUnchanged hint. This could
2857 * include more columns than were actually changed on the publisher
2858 * because the logical replication protocol doesn't contain that
2859 * information. But it would for example exclude columns that only exist
2860 * on the subscriber, since we are not touching those.
2861 */
2863 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2864 {
2865 CompactAttribute *att = TupleDescCompactAttr(remoteslot->tts_tupleDescriptor, i);
2866 int remoteattnum = rel->attrmap->attnums[i];
2867
2868 if (!att->attisdropped && remoteattnum >= 0)
2869 {
2870 Assert(remoteattnum < newtup.ncols);
2872 target_perminfo->updatedCols =
2873 bms_add_member(target_perminfo->updatedCols,
2875 }
2876 }
2877
2878 /* Build the search tuple. */
2881 has_oldtup ? &oldtup : &newtup);
2883
2884 /* For a partitioned table, apply update to correct partition. */
2885 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2888 else
2891
2893
2894 /* Reset relation for error callback */
2896
2897 if (!run_as_owner)
2899
2901
2903}
2904
2905/*
2906 * Workhorse for apply_handle_update()
2907 * relinfo is for the relation we're actually updating in
2908 * (could be a child partition of edata->targetRelInfo)
2909 */
2910static void
2915 Oid localindexoid)
2916{
2917 EState *estate = edata->estate;
2918 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2919 Relation localrel = relinfo->ri_RelationDesc;
2920 EPQState epqstate;
2923 bool found;
2925
2926 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2927 ExecOpenIndices(relinfo, false);
2928
2929 found = FindReplTupleInLocalRel(edata, localrel,
2930 &relmapentry->remoterel,
2931 localindexoid,
2933
2934 /*
2935 * Tuple found.
2936 *
2937 * Note this will fail if there are other conflicting unique indexes.
2938 */
2939 if (found)
2940 {
2941 /*
2942 * Report the conflict if the tuple was modified by a different
2943 * origin.
2944 */
2946 &conflicttuple.origin, &conflicttuple.ts) &&
2948 {
2950
2951 /* Store the new tuple for conflict reporting */
2952 newslot = table_slot_create(localrel, &estate->es_tupleTable);
2953 slot_store_data(newslot, relmapentry, newtup);
2954
2955 conflicttuple.slot = localslot;
2956
2960 }
2961
2962 /* Process and store remote tuple in the slot */
2966
2967 EvalPlanQualSetSlot(&epqstate, remoteslot);
2968
2970
2971 /* Do the actual update. */
2972 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2973 ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2974 remoteslot);
2975 }
2976 else
2977 {
2980
2981 /*
2982 * Detecting whether the tuple was recently deleted or never existed
2983 * is crucial to avoid misleading the user during conflict handling.
2984 */
2985 if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
2986 &conflicttuple.xmin,
2987 &conflicttuple.origin,
2988 &conflicttuple.ts) &&
2991 else
2993
2994 /* Store the new tuple for conflict reporting */
2995 slot_store_data(newslot, relmapentry, newtup);
2996
2997 /*
2998 * The tuple to be updated could not be found or was deleted. Do
2999 * nothing except for emitting a log message.
3000 */
3003 }
3004
3005 /* Cleanup. */
3007 EvalPlanQualEnd(&epqstate);
3008}
3009
3010/*
3011 * Handle DELETE message.
3012 *
3013 * TODO: FDW support
3014 */
3015static void
3017{
3020 LogicalRepRelId relid;
3023 EState *estate;
3026 bool run_as_owner;
3027
3028 /*
3029 * Quick return if we are skipping data modification changes or handling
3030 * streamed transactions.
3031 */
3032 if (is_skipping_changes() ||
3034 return;
3035
3037
3038 relid = logicalrep_read_delete(s, &oldtup);
3041 {
3042 /*
3043 * The relation can't become interesting in the middle of the
3044 * transaction so it's safe to unlock it.
3045 */
3048 return;
3049 }
3050
3051 /* Set relation for error callback */
3053
3054 /* Check if we can do the delete. */
3056
3057 /*
3058 * Make sure that any user-supplied code runs as the table owner, unless
3059 * the user has opted out of that behavior.
3060 */
3062 if (!run_as_owner)
3063 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3064
3065 /* Initialize the executor state. */
3067 estate = edata->estate;
3070 &TTSOpsVirtual);
3071
3072 /* Build the search tuple. */
3076
3077 /* For a partitioned table, apply delete to correct partition. */
3078 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3081 else
3082 {
3083 ResultRelInfo *relinfo = edata->targetRelInfo;
3084
3085 ExecOpenIndices(relinfo, false);
3089 }
3090
3092
3093 /* Reset relation for error callback */
3095
3096 if (!run_as_owner)
3098
3100
3102}
3103
3104/*
3105 * Workhorse for apply_handle_delete()
3106 * relinfo is for the relation we're actually deleting from
3107 * (could be a child partition of edata->targetRelInfo)
3108 */
3109static void
3113 Oid localindexoid)
3114{
3115 EState *estate = edata->estate;
3116 Relation localrel = relinfo->ri_RelationDesc;
3117 LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3118 EPQState epqstate;
3121 bool found;
3122
3123 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3124
3125 /* Caller should have opened indexes already. */
3126 Assert(relinfo->ri_IndexRelationDescs != NULL ||
3127 !localrel->rd_rel->relhasindex ||
3128 RelationGetIndexList(localrel) == NIL);
3129
3130 found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3132
3133 /* If found delete it. */
3134 if (found)
3135 {
3136 /*
3137 * Report the conflict if the tuple was modified by a different
3138 * origin.
3139 */
3141 &conflicttuple.origin, &conflicttuple.ts) &&
3143 {
3144 conflicttuple.slot = localslot;
3148 }
3149
3150 EvalPlanQualSetSlot(&epqstate, localslot);
3151
3152 /* Do the actual delete. */
3153 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3154 ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3155 }
3156 else
3157 {
3158 /*
3159 * The tuple to be deleted could not be found. Do nothing except for
3160 * emitting a log message.
3161 */
3164 }
3165
3166 /* Cleanup. */
3167 EvalPlanQualEnd(&epqstate);
3168}
3169
3170/*
3171 * Try to find a tuple received from the publication side (in 'remoteslot') in
3172 * the corresponding local relation using the replica identity index, the
3173 * primary key, another usable index or, if needed, a sequential scan.
3174 *
3175 * Local tuple, if found, is returned in '*localslot'.
3176 */
3177static bool
3179 LogicalRepRelation *remoterel,
3183{
3184 EState *estate = edata->estate;
3185 bool found;
3186
3187 /*
3188 * Regardless of the top-level operation, we're performing a read here, so
3189 * check for SELECT privileges.
3190 */
3192
3193 *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3194
3196 (remoterel->replident == REPLICA_IDENTITY_FULL));
3197
3199 {
3200#ifdef USE_ASSERT_CHECKING
3202
3203 /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3205 (remoterel->replident == REPLICA_IDENTITY_FULL &&
3207 edata->targetRel->attrmap)));
3209#endif
3210
3211 found = RelationFindReplTupleByIndex(localrel, localidxoid,
3214 }
3215 else
3218
3219 return found;
3220}
3221
3222/*
3223 * Determine whether the index can reliably locate the deleted tuple in the
3224 * local relation.
3225 *
3226 * An index may exclude deleted tuples if it was re-indexed or re-created during
3227 * change application. Therefore, an index is considered usable only if the
3228 * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3229 * index tuple's xmin. This ensures that any tuples deleted prior to the index
3230 * creation or re-indexing are not relevant for conflict detection in the
3231 * current apply worker.
3232 *
3233 * Note that indexes may also be excluded if they were modified by other DDL
3234 * operations, such as ALTER INDEX. However, this is acceptable, as the
3235 * likelihood of such DDL changes coinciding with the need to scan dead
3236 * tuples for update_deleted conflict detection is low.
3237 */
3238static bool
3241{
3244
3246
3247 if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3248 elog(ERROR, "cache lookup failed for index %u", localindexoid);
3249
3250 /*
3251 * No need to check for a frozen transaction ID, as
3252 * TransactionIdPrecedes() manages it internally, treating it as falling
3253 * behind the conflict_detection_xmin.
3254 */
3256
3258
3260}
3261
3262/*
3263 * Attempts to locate a deleted tuple in the local relation that matches the
3264 * values of the tuple received from the publication side (in 'remoteslot').
3265 * The search is performed using either the replica identity index, primary
3266 * key, other available index, or a sequential scan if necessary.
3267 *
3268 * Returns true if the deleted tuple is found. If found, the transaction ID,
3269 * origin, and commit timestamp of the deletion are stored in '*delete_xid',
3270 * '*delete_origin', and '*delete_time' respectively.
3271 */
3272static bool
3277{
3279
3280 /*
3281 * Return false if either dead tuples are not retained or commit timestamp
3282 * data is not available.
3283 */
3285 return false;
3286
3287 /*
3288 * For conflict detection, we use the leader worker's
3289 * oldest_nonremovable_xid value instead of invoking
3290 * GetOldestNonRemovableTransactionId() or using the conflict detection
3291 * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3292 * identify tuples that were recently deleted. These deleted tuples are no
3293 * longer visible to concurrent transactions. However, if a remote update
3294 * matches such a tuple, we log an update_deleted conflict.
3295 *
3296 * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3297 * transaction IDs older than oldest_nonremovable_xid, for our current
3298 * purpose, it is acceptable to treat tuples deleted by transactions prior
3299 * to oldest_nonremovable_xid as update_missing conflicts.
3300 */
3302 {
3304 }
3305 else
3306 {
3307 LogicalRepWorker *leader;
3308
3309 /*
3310 * Obtain the information from the leader apply worker as only the
3311 * leader manages oldest_nonremovable_xid (see
3312 * maybe_advance_nonremovable_xid() for details).
3313 */
3317 false);
3318 if (!leader)
3319 {
3320 ereport(ERROR,
3322 errmsg("could not detect conflict as the leader apply worker has exited")));
3323 }
3324
3325 SpinLockAcquire(&leader->relmutex);
3327 SpinLockRelease(&leader->relmutex);
3329 }
3330
3331 /*
3332 * Return false if the leader apply worker has stopped retaining
3333 * information for detecting conflicts. This implies that update_deleted
3334 * can no longer be reliably detected.
3335 */
3337 return false;
3338
3339 if (OidIsValid(localidxoid) &&
3344 delete_time);
3345 else
3349}
3350
3351/*
3352 * This handles insert, update, delete on a partitioned table.
3353 */
3354static void
3358 CmdType operation)
3359{
3360 EState *estate = edata->estate;
3361 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3362 ResultRelInfo *relinfo = edata->targetRelInfo;
3363 Relation parentrel = relinfo->ri_RelationDesc;
3364 ModifyTableState *mtstate;
3365 PartitionTupleRouting *proute;
3367 Relation partrel;
3369 TupleConversionMap *map;
3372 AttrMap *attrmap = NULL;
3373
3374 /* ModifyTableState is needed for ExecFindPartition(). */
3375 edata->mtstate = mtstate = makeNode(ModifyTableState);
3376 mtstate->ps.plan = NULL;
3377 mtstate->ps.state = estate;
3378 mtstate->operation = operation;
3379 mtstate->resultRelInfo = relinfo;
3380
3381 /* ... as is PartitionTupleRouting. */
3382 edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3383
3384 /*
3385 * Find the partition to which the "search tuple" belongs.
3386 */
3389 partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3390 remoteslot, estate);
3392 partrel = partrelinfo->ri_RelationDesc;
3393
3394 /*
3395 * Check for supported relkind. We need this since partitions might be of
3396 * unsupported relkinds; and the set of partitions can change, so checking
3397 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3398 */
3399 CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3400 relmapentry->remoterel.relkind,
3402 RelationGetRelationName(partrel));
3403
3404 /*
3405 * To perform any of the operations below, the tuple must match the
3406 * partition's rowtype. Convert if needed or just copy, using a dedicated
3407 * slot to store the tuple in any case.
3408 */
3409 remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3410 if (remoteslot_part == NULL)
3411 remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3412 map = ExecGetRootToChildMap(partrelinfo, estate);
3413 if (map != NULL)
3414 {
3415 attrmap = map->attrMap;
3418 }
3419 else
3420 {
3423 }
3425
3426 /* Check if we can do the update or delete on the leaf partition. */
3427 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3428 {
3429 part_entry = logicalrep_partition_open(relmapentry, partrel,
3430 attrmap);
3432 }
3433
3434 switch (operation)
3435 {
3436 case CMD_INSERT:
3439 break;
3440
3441 case CMD_DELETE:
3444 part_entry->localindexoid);
3445 break;
3446
3447 case CMD_UPDATE:
3448
3449 /*
3450 * For UPDATE, depending on whether or not the updated tuple
3451 * satisfies the partition's constraint, perform a simple UPDATE
3452 * of the partition or move the updated tuple into a different
3453 * suitable partition.
3454 */
3455 {
3459 bool found;
3460 EPQState epqstate;
3462
3463 /* Get the matching local tuple from the partition. */
3464 found = FindReplTupleInLocalRel(edata, partrel,
3465 &part_entry->remoterel,
3466 part_entry->localindexoid,
3468 if (!found)
3469 {
3472
3473 /*
3474 * Detecting whether the tuple was recently deleted or
3475 * never existed is crucial to avoid misleading the user
3476 * during conflict handling.
3477 */
3478 if (FindDeletedTupleInLocalRel(partrel,
3479 part_entry->localindexoid,
3481 &conflicttuple.xmin,
3482 &conflicttuple.origin,
3483 &conflicttuple.ts) &&
3486 else
3488
3489 /* Store the new tuple for conflict reporting */
3491
3492 /*
3493 * The tuple to be updated could not be found or was
3494 * deleted. Do nothing except for emitting a log message.
3495 */
3499
3500 return;
3501 }
3502
3503 /*
3504 * Report the conflict if the tuple was modified by a
3505 * different origin.
3506 */
3508 &conflicttuple.origin,
3509 &conflicttuple.ts) &&
3511 {
3513
3514 /* Store the new tuple for conflict reporting */
3515 newslot = table_slot_create(partrel, &estate->es_tupleTable);
3517
3518 conflicttuple.slot = localslot;
3519
3523 }
3524
3525 /*
3526 * Apply the update to the local tuple, putting the result in
3527 * remoteslot_part.
3528 */
3531 newtup);
3533
3534 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3535
3536 /*
3537 * Does the updated tuple still satisfy the current
3538 * partition's constraint?
3539 */
3540 if (!partrel->rd_rel->relispartition ||
3542 false))
3543 {
3544 /*
3545 * Yes, so simply UPDATE the partition. We don't call
3546 * apply_handle_update_internal() here, which would
3547 * normally do the following work, to avoid repeating some
3548 * work already done above to find the local tuple in the
3549 * partition.
3550 */
3552
3554 TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3555 ACL_UPDATE);
3556 ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3558 }
3559 else
3560 {
3561 /* Move the tuple into the new partition. */
3562
3563 /*
3564 * New partition will be found using tuple routing, which
3565 * can only occur via the parent table. We might need to
3566 * convert the tuple to the parent's rowtype. Note that
3567 * this is the tuple found in the partition, not the
3568 * original search tuple received by this function.
3569 */
3570 if (map)
3571 {
3575
3576 remoteslot =
3579 }
3580 else
3581 {
3584 }
3585
3586 /* Find the new partition. */
3589 proute, remoteslot,
3590 estate);
3593 partrel_new = partrelinfo_new->ri_RelationDesc;
3594
3595 /* Check that new partition also has supported relkind. */
3596 CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3597 relmapentry->remoterel.relkind,
3600
3601 /* DELETE old tuple found in the old partition. */
3602 EvalPlanQualSetSlot(&epqstate, localslot);
3603 TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3604 ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3605
3606 /* INSERT new tuple into the new partition. */
3607
3608 /*
3609 * Convert the replacement tuple to match the destination
3610 * partition rowtype.
3611 */
3613 remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3614 if (remoteslot_part == NULL)
3616 &estate->es_tupleTable);
3618 if (map != NULL)
3619 {
3621 remoteslot,
3623 }
3624 else
3625 {
3627 remoteslot);
3629 }
3633 }
3634
3635 EvalPlanQualEnd(&epqstate);
3636 }
3637 break;
3638
3639 default:
3640 elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3641 break;
3642 }
3643}
3644
3645/*
3646 * Handle TRUNCATE message.
3647 *
3648 * TODO: FDW support
3649 */
3650static void
3652{
3653 bool cascade = false;
3654 bool restart_seqs = false;
3656 List *remote_rels = NIL;
3657 List *rels = NIL;
3658 List *part_rels = NIL;
3659 List *relids = NIL;
3661 ListCell *lc;
3662 LOCKMODE lockmode = AccessExclusiveLock;
3663
3664 /*
3665 * Quick return if we are skipping data modification changes or handling
3666 * streamed transactions.
3667 */
3668 if (is_skipping_changes() ||
3670 return;
3671
3673
3674 remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3675
3676 foreach(lc, remote_relids)
3677 {
3678 LogicalRepRelId relid = lfirst_oid(lc);
3680
3681 rel = logicalrep_rel_open(relid, lockmode);
3683 {
3684 /*
3685 * The relation can't become interesting in the middle of the
3686 * transaction so it's safe to unlock it.
3687 */
3688 logicalrep_rel_close(rel, lockmode);
3689 continue;
3690 }
3691
3694 rels = lappend(rels, rel->localrel);
3695 relids = lappend_oid(relids, rel->localreloid);
3698
3699 /*
3700 * Truncate partitions if we got a message to truncate a partitioned
3701 * table.
3702 */
3703 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3704 {
3705 ListCell *child;
3706 List *children = find_all_inheritors(rel->localreloid,
3707 lockmode,
3708 NULL);
3709
3710 foreach(child, children)
3711 {
3712 Oid childrelid = lfirst_oid(child);
3714
3715 if (list_member_oid(relids, childrelid))
3716 continue;
3717
3718 /* find_all_inheritors already got lock */
3720
3721 /*
3722 * Ignore temp tables of other backends. See similar code in
3723 * ExecuteTruncate().
3724 */
3726 {
3727 table_close(childrel, lockmode);
3728 continue;
3729 }
3730
3732 rels = lappend(rels, childrel);
3734 relids = lappend_oid(relids, childrelid);
3735 /* Log this relation only if needed for logical decoding */
3738 }
3739 }
3740 }
3741
3742 /*
3743 * Even if we used CASCADE on the upstream primary we explicitly default
3744 * to replaying changes without further cascading. This might later become
3745 * configurable through a user-specified option.
3746 *
3747 * MySubscription->runasowner tells us whether we want to execute
3748 * replication actions as the subscription owner; the last argument to
3749 * TruncateGuts tells it whether we want to switch to the table owner.
3750 * Those are exactly opposite conditions.
3751 */
3753 relids,
3756 restart_seqs,
3758 foreach(lc, remote_rels)
3759 {
3761
3763 }
3764 foreach(lc, part_rels)
3765 {
3766 Relation rel = lfirst(lc);
3767
3768 table_close(rel, NoLock);
3769 }
3770
3772}
3773
3774
3775/*
3776 * Logical replication protocol message dispatcher.
3777 */
3778void
3780{
3781 LogicalRepMsgType action = pq_getmsgbyte(s);
3783
3784 /*
3785 * Set the current command being applied. Since this function can be
3786 * called recursively when applying spooled changes, save the current
3787 * command.
3788 */
3791
3792 switch (action)
3793 {
3796 break;
3797
3800 break;
3801
3804 break;
3805
3808 break;
3809
3812 break;
3813
3816 break;
3817
3820 break;
3821
3824 break;
3825
3828 break;
3829
3831
3832 /*
3833 * Logical replication does not use generic logical messages yet.
3834 * However, they can still be used by other applications that use this
3835 * output plugin.
3836 */
3837 break;
3838
3841 break;
3842
3845 break;
3846
3849 break;
3850
3853 break;
3854
3857 break;
3858
3861 break;
3862
3865 break;
3866
3869 break;
3870
3873 break;
3874
3875 default:
3876 ereport(ERROR,
3878 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3879 }
3880
3881 /* Reset the current command */
3883}
3884
3885/*
3886 * Figure out which write/flush positions to report to the walsender process.
3887 *
3888 * We can't simply report back the last LSN the walsender sent us because the
3889 * local transaction might not yet be flushed to disk locally. Instead we
3890 * build a list that associates local with remote LSNs for every commit. When
3891 * reporting back the flush position to the sender we iterate that list and
3892 * check which entries on it are already locally flushed. Those we can report
3893 * as having been flushed.
3894 *
3895 * *have_pending_txes is set to true if there are outstanding transactions
3896 * that still need to be flushed.
3897 */
3898static void
3900 bool *have_pending_txes)
3901{
3902 dlist_mutable_iter iter;
3904
3906 *flush = InvalidXLogRecPtr;
3907
3909 {
3910 FlushPosition *pos =
3911 dlist_container(FlushPosition, node, iter.cur);
3912
3913 *write = pos->remote_end;
3914
3915 if (pos->local_end <= local_flush)
3916 {
3917 *flush = pos->remote_end;
3918 dlist_delete(iter.cur);
3919 pfree(pos);
3920 }
3921 else
3922 {
3923 /*
3924 * Avoid uselessly iterating over the rest of the list, which could
3925 * potentially be long. Instead, get the last element and take the
3926 * write position from there.
3927 */
3929 &lsn_mapping);
3930 *write = pos->remote_end;
3931 *have_pending_txes = true;
3932 return;
3933 }
3934 }
3935
3937}
3938
3939/*
3940 * Store current remote/local lsn pair in the tracking list.
3941 */
3942void
3944{
3946
3947 /*
3948 * Skip for parallel apply workers, because the lsn_mapping is maintained
3949 * by the leader apply worker.
3950 */
3952 return;
3953
3954 /* Need to do this in permanent context */
3956
3957 /* Track commit lsn */
3959 flushpos->local_end = local_lsn;
3960 flushpos->remote_end = remote_lsn;
3961
3964}
3965
3966
3967/* Update statistics of the worker. */
3968static void
3980
3981/*
3982 * Apply main loop.
3983 */
3984static void
3986{
3988 bool ping_sent = false;
3989 TimeLineID tli;
3990 ErrorContextCallback errcallback;
3992
3993 /*
3994 * Init the ApplyMessageContext which we clean up after each replication
3995 * protocol message.
3996 */
3998 "ApplyMessageContext",
4000
4001 /*
4002 * This memory context is used for per-stream data when the streaming mode
4003 * is enabled. This context is reset on each stream stop.
4004 */
4006 "LogicalStreamingContext",
4008
4009 /* mark as idle, before starting to loop */
4011
4012 /*
4013 * Push apply error context callback. Fields will be filled while applying
4014 * a change.
4015 */
4016 errcallback.callback = apply_error_callback;
4017 errcallback.previous = error_context_stack;
4018 error_context_stack = &errcallback;
4020
4021 /* This outer loop iterates once per wait. */
4022 for (;;)
4023 {
4025 int rc;
4026 int len;
4027 char *buf = NULL;
4028 bool endofstream = false;
4029 long wait_time;
4030
4032
4034
4036
4037 if (len != 0)
4038 {
4039 /* Loop to process all available data (without blocking). */
4040 for (;;)
4041 {
4043
4044 if (len == 0)
4045 {
4046 break;
4047 }
4048 else if (len < 0)
4049 {
4050 ereport(LOG,
4051 (errmsg("data stream from publisher has ended")));
4052 endofstream = true;
4053 break;
4054 }
4055 else
4056 {
4057 int c;
4059
4061 {
4062 ConfigReloadPending = false;
4064 }
4065
4066 /* Reset timeout. */
4068 ping_sent = false;
4069
4070 rdt_data.last_recv_time = last_recv_timestamp;
4071
4072 /* Ensure we are reading the data into our memory context. */
4074
4076
4077 c = pq_getmsgbyte(&s);
4078
4079 if (c == PqReplMsg_WALData)
4080 {
4081 XLogRecPtr start_lsn;
4082 XLogRecPtr end_lsn;
4084
4085 start_lsn = pq_getmsgint64(&s);
4086 end_lsn = pq_getmsgint64(&s);
4088
4089 if (last_received < start_lsn)
4090 last_received = start_lsn;
4091
4092 if (last_received < end_lsn)
4093 last_received = end_lsn;
4094
4096
4097 apply_dispatch(&s);
4098
4100 }
4101 else if (c == PqReplMsg_Keepalive)
4102 {
4103 XLogRecPtr end_lsn;
4105 bool reply_requested;
4106
4107 end_lsn = pq_getmsgint64(&s);
4110
4111 if (last_received < end_lsn)
4112 last_received = end_lsn;
4113
4115
4117
4119 }
4120 else if (c == PqReplMsg_PrimaryStatusUpdate)
4121 {
4122 rdt_data.remote_lsn = pq_getmsgint64(&s);
4123 rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4125 rdt_data.reply_time = pq_getmsgint64(&s);
4126
4127 /*
4128 * This should never happen, see
4129 * ProcessStandbyPSRequestMessage. But if it happens
4130 * due to a bug, we don't want to proceed as it can
4131 * incorrectly advance oldest_nonremovable_xid.
4132 */
4133 if (!XLogRecPtrIsValid(rdt_data.remote_lsn))
4134 elog(ERROR, "cannot get the latest WAL position from the publisher");
4135
4137
4138 UpdateWorkerStats(last_received, rdt_data.reply_time, false);
4139 }
4140 /* other message types are purposefully ignored */
4141
4143 }
4144
4146 }
4147 }
4148
4149 /* confirm all writes so far */
4150 send_feedback(last_received, false, false);
4151
4152 /* Reset the timestamp if no message was received */
4153 rdt_data.last_recv_time = 0;
4154
4156
4158 {
4159 /*
4160 * If we didn't get any transactions for a while there might be
4161 * unconsumed invalidation messages in the queue, consume them
4162 * now.
4163 */
4166
4167 /*
4168 * Process any relations that are being synchronized in parallel
4169 * and any newly added tables or sequences.
4170 */
4172 }
4173
4174 /* Cleanup the memory. */
4177
4178 /* Check if we need to exit the streaming loop. */
4179 if (endofstream)
4180 break;
4181
4182 /*
4183 * Wait for more data or latch. If we have unflushed transactions,
4184 * wake up after WalWriterDelay to see if they've been flushed yet (in
4185 * which case we should send a feedback message). Otherwise, there's
4186 * no particular urgency about waking up unless we get data or a
4187 * signal.
4188 */
4191 else
4193
4194 /*
4195 * Make sure to wake up when it's possible to advance the non-removable
4196 * transaction ID, or when the retention duration may have exceeded
4197 * max_retention_duration.
4198 */
4200 {
4201 if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
4202 rdt_data.xid_advance_interval)
4203 wait_time = Min(wait_time, rdt_data.xid_advance_interval);
4204 else if (MySubscription->maxretention > 0)
4206 }
4207
4211 fd, wait_time,
4213
4214 if (rc & WL_LATCH_SET)
4215 {
4218 }
4219
4221 {
4222 ConfigReloadPending = false;
4224 }
4225
4226 if (rc & WL_TIMEOUT)
4227 {
4228 /*
4229 * We didn't receive anything new. If we haven't heard anything
4230 * from the server for more than wal_receiver_timeout / 2, ping
4231 * the server. Also, if it's been longer than
4232 * wal_receiver_status_interval since the last update we sent,
4233 * send a status update to the primary anyway, to report any
4234 * progress in applying WAL.
4235 */
4236 bool requestReply = false;
4237
4238 /*
4239 * Check if time since last receive from primary has reached the
4240 * configured limit.
4241 */
4242 if (wal_receiver_timeout > 0)
4243 {
4246
4247 timeout =
4250
4251 if (now >= timeout)
4252 ereport(ERROR,
4254 errmsg("terminating logical replication worker due to timeout")));
4255
4256 /* Check to see if it's time for a ping. */
4257 if (!ping_sent)
4258 {
4260 (wal_receiver_timeout / 2));
4261 if (now >= timeout)
4262 {
4263 requestReply = true;
4264 ping_sent = true;
4265 }
4266 }
4267 }
4268
4270
4272
4273 /*
4274 * Force reporting to ensure long idle periods don't lead to
4275 * arbitrarily delayed stats. Stats can only be reported outside
4276 * of (implicit or explicit) transactions. That shouldn't lead to
4277 * stats being delayed for long, because transactions are either
4278 * sent as a whole on commit or streamed. Streamed transactions
4279 * are spilled to disk and applied on commit.
4280 */
4281 if (!IsTransactionState())
4282 pgstat_report_stat(true);
4283 }
4284 }
4285
4286 /* Pop the error context stack */
4287 error_context_stack = errcallback.previous;
4289
4290 /* All done */
4292}
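/*
 * Illustrative sketch, not part of worker.c: the WL_TIMEOUT handling above
 * can be read as a three-way decision on how long the publisher has been
 * silent. The sketch assumes millisecond timestamps held in plain integers
 * and a timeout of 0 meaning "disabled"; the Sketch* and sketch_* names are
 * hypothetical.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef enum
{
	SKETCH_IDLE_OK,				/* nothing to do yet */
	SKETCH_IDLE_PING,			/* ask the publisher for a reply */
	SKETCH_IDLE_TIMEOUT			/* give up on the connection */
} SketchIdleAction;

/*
 * Decide what to do after a wait timed out without new data.
 * *ping_sent remembers whether a ping was already sent since the last
 * message arrived, so that only one ping is sent per silent period.
 */
static SketchIdleAction
sketch_check_idle(int64_t now_ms, int64_t last_recv_ms, int timeout_ms,
				  bool *ping_sent)
{
	assert(ping_sent != NULL);

	if (timeout_ms <= 0)
		return SKETCH_IDLE_OK;	/* timeout checking disabled */

	/* Silent for the whole timeout: the caller should raise an error. */
	if (now_ms >= last_recv_ms + timeout_ms)
		return SKETCH_IDLE_TIMEOUT;

	/* Silent for half the timeout: request a reply, but only once. */
	if (!*ping_sent && now_ms >= last_recv_ms + timeout_ms / 2)
	{
		*ping_sent = true;
		return SKETCH_IDLE_PING;
	}

	return SKETCH_IDLE_OK;
}
```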
4293
4294/*
4295 * Send a Standby Status Update message to server.
4296 *
4297 * 'recvpos' is the latest LSN we've received data to; 'force' is set if we need
4298 * to send a response to avoid timeouts.
4299 */
4300static void
4302{
4303 static StringInfo reply_message = NULL;
4304 static TimestampTz send_time = 0;
4305
4308
4312 bool have_pending_txes;
4313
4314 /*
4315 * If the user doesn't want status to be reported to the publisher, be
4316 * sure to exit before doing anything at all.
4317 */
4318 if (!force && wal_receiver_status_interval <= 0)
4319 return;
4320
4321 /* It's legal to not pass a recvpos */
4322 if (recvpos < last_recvpos)
4324
4326
4327 /*
4328 * No outstanding transactions to flush, we can report the latest received
4329 * position. This is important for synchronous replication.
4330 */
4331 if (!have_pending_txes)
4333
4334 if (writepos < last_writepos)
4336
4337 if (flushpos < last_flushpos)
4339
4341
4342 /* if we've already reported everything we're good */
4343 if (!force &&
4348 return;
4349 send_time = now;
4350
4351 if (!reply_message)
4352 {
4354
4357 }
4358 else
4360
4362 pq_sendint64(reply_message, recvpos); /* write */
4363 pq_sendint64(reply_message, flushpos); /* flush */
4364 pq_sendint64(reply_message, writepos); /* apply */
4365 pq_sendint64(reply_message, now); /* sendTime */
4366 pq_sendbyte(reply_message, requestReply); /* replyRequested */
4367
4368 elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4369 force,
4373
4376
4377 if (recvpos > last_recvpos)
4379 if (writepos > last_writepos)
4381 if (flushpos > last_flushpos)
4383}
4384
4385/*
4386 * Attempt to advance the non-removable transaction ID.
4387 *
4388 * See comments atop worker.c for details.
4389 */
4390static void
4399
4400/*
4401 * Preliminary check to determine if advancing the non-removable transaction ID
4402 * is allowed.
4403 */
4404static bool
4406{
4407 /*
4408 * It is sufficient to manage non-removable transaction ID for a
4409 * subscription by the main apply worker to detect update_deleted reliably
4410 * even for table sync or parallel apply workers.
4411 */
4413 return false;
4414
4415 /* No need to advance if retaining dead tuples is not required */
4417 return false;
4418
4419 return true;
4420}
4421
4422/*
4423 * Process phase transitions during the non-removable transaction ID
4424 * advancement. See comments atop worker.c for details of the transition.
4425 */
4426static void
4452
4453/*
4454 * Workhorse for the RDT_GET_CANDIDATE_XID phase.
4455 */
4456static void
4458{
4461
4462 /*
4463 * Use last_recv_time when applying changes in the loop to avoid
4464 * unnecessary system time retrieval. If last_recv_time is not available,
4465 * obtain the current timestamp.
4466 */
4467 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4468
4469 /*
4470 * Compute the candidate_xid and request the publisher status at most once
4471 * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4472 * details on how this value is dynamically adjusted. This is to avoid
4473 * using CPU and network resources without making much progress.
4474 */
4475 if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4476 rdt_data->xid_advance_interval))
4477 return;
4478
4479 /*
4480 * Immediately update the timer, even if the function returns later
4481 * without setting candidate_xid due to inactivity on the subscriber. This
4482 * avoids frequent calls to GetOldestActiveTransactionId.
4483 */
4484 rdt_data->candidate_xid_time = now;
4485
4486 /*
4487 * Consider transactions in the current database, as only dead tuples from
4488 * this database are required for conflict detection.
4489 */
4491
4492 /*
4493 * Oldest active transaction ID (oldest_running_xid) can't be behind any
4494 * of its previously computed values.
4495 */
4498
4499 /* Return if the oldest_nonremovable_xid cannot be advanced */
4502 {
4504 return;
4505 }
4506
4508
4509 rdt_data->candidate_xid = oldest_running_xid;
4511
4512 /* process the next phase */
4514}
4515
4516/*
4517 * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4518 */
4519static void
4521{
4523
4524 if (!request_message)
4525 {
4527
4530 }
4531 else
4533
4534 /*
4535 * Send the current time to update the remote walsender's latest reply
4536 * message received time.
4537 */
4540
4541 elog(DEBUG2, "sending publisher status request message");
4542
4543 /* Send a request for the publisher status */
4545 request_message->data, request_message->len);
4546
4548
4549 /*
4550 * Skip calling maybe_advance_nonremovable_xid() since further transition
4551 * is possible only once we receive the publisher status message.
4552 */
4553}
4554
4555/*
4556 * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
4557 */
4558static void
4560 bool status_received)
4561{
4562 /*
4563 * Return if we have requested but not yet received the publisher status.
4564 */
4565 if (!status_received)
4566 return;
4567
4568 /*
4569 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4570 * retaining conflict information for this worker.
4571 */
4573 {
4575 return;
4576 }
4577
4578 if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
4579 rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4580
4581 /*
4582 * Check if all remote concurrent transactions that were active at the
4583 * first status request have now completed. If completed, proceed to the
4584 * next phase; otherwise, continue checking the publisher status until
4585 * these transactions finish.
4586 *
4587 * It's possible that transactions in the commit phase during the last
4588 * cycle have now finished committing, but remote_oldestxid remains older
4589 * than remote_wait_for. This can happen if some old transaction entered
4590 * the commit phase when we requested status in this cycle. We do not
4591 * handle this case explicitly as it's rare and the benefit doesn't
4592 * justify the required complexity. Tracking would require either caching
4593 * all xids at the publisher or sending them to subscribers. The condition
4594 * will resolve naturally once the remaining transactions are finished.
4595 *
4596 * Directly advancing the non-removable transaction ID is possible if
4597 * there are no activities on the publisher since the last advancement
4598 * cycle. However, it requires maintaining two fields, last_remote_nextxid
4599 * and last_remote_lsn, within the structure for comparison with the
4600 * current cycle's values. Considering the minimal cost of continuing in
4601 * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4602 * advance the transaction ID here.
4603 */
4604 if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
4605 rdt_data->remote_oldestxid))
4607 else
4609
4610 /* process the next phase */
4612}
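/*
 * Illustrative sketch, not part of worker.c: the transition decided in
 * wait_for_publisher_status(), reduced to plain 64-bit integers. It assumes
 * that 0 stands for an invalid full transaction ID and that the two
 * branches elided from this listing select the local-flush phase and the
 * re-request phase respectively, as the comment above suggests. The Sketch*
 * and sketch_* names are hypothetical.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef enum
{
	SKETCH_REQUEST_PUBLISHER_STATUS,	/* ask the publisher again */
	SKETCH_WAIT_FOR_LOCAL_FLUSH			/* remote transactions are done */
} SketchPhase;

/*
 * remote_wait_for is latched from the first status reply and then kept,
 * so later replies are compared against the same target.
 */
static SketchPhase
sketch_next_phase(uint64_t *remote_wait_for, uint64_t remote_nextxid,
				  uint64_t remote_oldestxid)
{
	assert(remote_wait_for != NULL);

	/* First reply of this cycle: remember the publisher's next XID. */
	if (*remote_wait_for == 0)
		*remote_wait_for = remote_nextxid;

	/*
	 * Every transaction that was running at the first request has finished
	 * once the publisher's oldest running XID reaches the latched target.
	 */
	if (*remote_wait_for <= remote_oldestxid)
		return SKETCH_WAIT_FOR_LOCAL_FLUSH;

	return SKETCH_REQUEST_PUBLISHER_STATUS;
}
```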
4613
4614/*
4615 * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
4616 */
4617static void
4619{
4620 Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) &&
4621 TransactionIdIsValid(rdt_data->candidate_xid));
4622
4623 /*
4624 * We expect the publisher and subscriber clocks to be in sync using a time
4625 * sync service like NTP. Otherwise, we will advance this worker's
4626 * oldest_nonremovable_xid prematurely, leading to the removal of rows
4627 * required to detect update_deleted reliably. This check primarily
4628 * addresses scenarios where the publisher's clock falls behind; if the
4629 * publisher's clock is ahead, subsequent transactions will naturally bear
4630 * later commit timestamps, conforming to the design outlined atop
4631 * worker.c.
4632 *
4633 * XXX Consider waiting for the publisher's clock to catch up with the
4634 * subscriber's before proceeding to the next phase.
4635 */
4636 if (TimestampDifferenceExceeds(rdt_data->reply_time,
4637 rdt_data->candidate_xid_time, 0))
4638 ereport(ERROR,
4639 errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4640 errdetail_internal("The clock on the publisher is behind that of the subscriber."));
4641
4642 /*
4643 * Do not attempt to advance the non-removable transaction ID when table
4644 * sync is in progress. During this time, changes from a single
4645 * transaction may be applied by multiple table sync workers corresponding
4646 * to the target tables. So, it's necessary for all table sync workers to
4647 * apply and flush the corresponding changes before advancing the
4648 * transaction ID, otherwise, dead tuples that are still needed for
4649 * conflict detection in table sync workers could be removed prematurely.
4650 * However, confirming the apply and flush progress across all table sync
4651 * workers is complex and not worth the effort, so we simply return if not
4652 * all tables are in the READY state.
4653 *
4654 * Advancing the transaction ID is necessary even when no tables are
4655 * currently subscribed, to avoid retaining dead tuples unnecessarily.
4656 * While it might seem safe to skip all phases and directly assign
4657 * candidate_xid to oldest_nonremovable_xid during the
4658 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
4659 * concurrently add tables to the subscription, the apply worker may not
4660 * process invalidations in time. Consequently,
4661 * HasSubscriptionTablesCached() might miss the new tables, leading to
4662 * premature advancement of oldest_nonremovable_xid.
4663 *
4664 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
4665 * invalidations are guaranteed to be processed before applying changes
4666 * from newly added tables while waiting for the local flush to reach
4667 * remote_lsn.
4668 *
4669 * Additionally, even if we check for subscription tables during
4670 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
4671 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
4672 * subscription tables at this stage to prevent unnecessary tuple
4673 * retention.
4674 */
4676 {
4678
4679 now = rdt_data->last_recv_time
4680 ? rdt_data->last_recv_time : GetCurrentTimestamp();
4681
4682 /*
4683 * Record the time spent waiting for table sync; it is needed for the
4684 * timeout check in should_stop_conflict_info_retention().
4685 */
4686 rdt_data->table_sync_wait_time =
4687 TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);
4688
4689 return;
4690 }
4691
4692 /*
4693 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4694 * retaining conflict information for this worker.
4695 */
4697 {
4699 return;
4700 }
4701
4702 /*
4703 * Update and check the remote flush position if we are applying changes
4704 * in a loop. This is done at most once per WalWriterDelay to avoid
4705 * performing costly operations in get_flush_position() too frequently
4706 * during change application.
4707 */
4708 if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
4709 TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
4710 rdt_data->last_recv_time, WalWriterDelay))
4711 {
4714 bool have_pending_txes;
4715
4716 /* Fetch the latest remote flush position */
4718
4719 if (flushpos > last_flushpos)
4721
4722 rdt_data->flushpos_update_time = rdt_data->last_recv_time;
4723 }
4724
4725 /* Return to wait for the changes to be applied */
4726 if (last_flushpos < rdt_data->remote_lsn)
4727 return;
4728
4729 /*
4730 * Reaching this point implies should_stop_conflict_info_retention()
4731 * returned false earlier, meaning that the most recent duration for
4732 * advancing the non-removable transaction ID is within the
4733 * max_retention_duration or max_retention_duration is set to 0.
4734 *
4735 * Therefore, if conflict info retention was previously stopped due to a
4736 * timeout, it is now safe to resume retention.
4737 */
4739 {
4741 return;
4742 }
4743
4744 /*
4745 * Reaching here means the remote WAL position has been received, and all
4746 * transactions up to that position on the publisher have been applied and
4747 * flushed locally. So, we can advance the non-removable transaction ID.
4748 */
4752
4753 elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
4754 LSN_FORMAT_ARGS(rdt_data->remote_lsn),
4755 rdt_data->candidate_xid);
4756
4757 /* Notify launcher to update the xmin of the conflict slot */
4759
4761
4762 /* process the next phase */
4764}
4765
4766/*
4767 * Check whether conflict information retention should be stopped due to
4768 * exceeding the maximum wait time (max_retention_duration).
4769 *
4770 * If retention should be stopped, return true. Otherwise, return false.
4771 */
4772static bool
4774{
4776
4777 Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4780
4782 return false;
4783
4784 /*
4785 * Use last_recv_time when applying changes in the loop to avoid
4786 * unnecessary system time retrieval. If last_recv_time is not available,
4787 * obtain the current timestamp.
4788 */
4789 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4790
4791 /*
4792 * Return early if the wait time has not exceeded the configured maximum
4793 * (max_retention_duration). Time spent waiting for table synchronization
4794 * is excluded from this calculation, as it occurs infrequently.
4795 */
4796 if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4798 rdt_data->table_sync_wait_time))
4799 return false;
4800
4801 return true;
4802}
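/*
 * Illustrative sketch, not part of worker.c: the retention timeout check,
 * with the table sync wait time added to the allowed budget so that it is
 * excluded from the measured wait. The sketch assumes millisecond integer
 * timestamps, that a max_retention_duration of 0 means "unlimited", and
 * that the limit counts as exceeded once the elapsed time reaches it. The
 * sketch_* name is hypothetical.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Returns true when the time spent on the current candidate XID, not
 * counting time spent waiting for table sync, has reached the limit.
 */
static bool
sketch_should_stop_retention(int64_t candidate_time_ms, int64_t now_ms,
							 int max_retention_ms,
							 int64_t table_sync_wait_ms)
{
	int64_t		elapsed_ms;
	int64_t		budget_ms;

	assert(now_ms >= candidate_time_ms);

	/* Assumption: 0 disables the limit entirely. */
	if (max_retention_ms <= 0)
		return false;

	elapsed_ms = now_ms - candidate_time_ms;
	budget_ms = (int64_t) max_retention_ms + table_sync_wait_ms;

	return elapsed_ms >= budget_ms;
}
```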
4803
4804/*
4805 * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
4806 */
4807static void
4809{
4810 /* Stop retention if it has not been stopped yet */
4812 {
4813 /*
4814 * If the retention status cannot be updated (e.g., due to active
4815 * transaction), skip further processing to avoid inconsistent
4816 * retention behavior.
4817 */
4818 if (!update_retention_status(false))
4819 return;
4820
4824
4825 ereport(LOG,
4826 errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
4828 errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
4829 }
4830
4832
4833 /*
4834 * If retention has been stopped, reset to the initial phase to retry
4835 * resuming retention. This reset is required to recalculate the current
4836 * wait time and resume retention if the time falls within
4837 * max_retention_duration.
4838 */
4840}
4841
4842/*
4843 * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
4844 */
4845static void
4847{
4848 /* We can't resume retention without updating retention status. */
4849 if (!update_retention_status(true))
4850 return;
4851
4852 ereport(LOG,
4853 errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
4856 ? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
4857 : errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));
4858
4859 /*
4860 * Restart the worker to let the launcher initialize
4861 * oldest_nonremovable_xid at startup.
4862 *
4863 * While it's technically possible to derive this value on-the-fly using
4864 * the conflict detection slot's xmin, doing so risks a race condition:
4865 * the launcher might clean slot.xmin just after retention resumes. This
4866 * would make oldest_nonremovable_xid unreliable, especially during xid
4867 * wraparound.
4868 *
4869 * Although this can be prevented by introducing heavy weight locking, the
4870 * complexity it will bring doesn't seem worthwhile given how rarely
4871 * retention is resumed.
4872 */
4874}
4875
4876/*
4877 * Updates pg_subscription.subretentionactive to the given value within a
4878 * new transaction.
4879 *
4880 * If already inside an active transaction, skips the update and returns
4881 * false.
4882 *
4883 * Returns true if the update is successfully performed.
4884 */
4885static bool
4887{
4888 /*
4889 * Do not update the catalog during an active transaction. The transaction
4890 * may be started during change application, leading to a possible
4891 * rollback of catalog updates if the application fails subsequently.
4892 */
4893 if (IsTransactionState())
4894 return false;
4895
4897
4898 /*
4899 * Updating pg_subscription might involve TOAST table access, so ensure we
4900 * have a valid snapshot.
4901 */
4903
4904 /* Update pg_subscription.subretentionactive */
4906
4909
4910 /* Notify launcher to update the conflict slot */
4912
4914
4915 return true;
4916}
4917
4918/*
4919 * Reset all data fields of RetainDeadTuplesData except those used to
4920 * determine the timing for the next round of transaction ID advancement. We
4921 * can even use flushpos_update_time in the next round to decide whether to get
4922 * the latest flush position.
4923 */
4924static void
4926{
4928 rdt_data->remote_lsn = InvalidXLogRecPtr;
4929 rdt_data->remote_oldestxid = InvalidFullTransactionId;
4930 rdt_data->remote_nextxid = InvalidFullTransactionId;
4931 rdt_data->reply_time = 0;
4932 rdt_data->remote_wait_for = InvalidFullTransactionId;
4933 rdt_data->candidate_xid = InvalidTransactionId;
4934 rdt_data->table_sync_wait_time = 0;
4935}
4936
4937/*
4938 * Adjust the interval for advancing non-removable transaction IDs.
4939 *
4940 * If there is no activity on the node or retention has been stopped, we
4941 * progressively double the interval used to advance non-removable transaction
4942 * ID. This helps conserve CPU and network resources when there's little benefit
4943 * to frequent updates.
4944 *
4945 * The interval is capped by the lowest of the following:
4946 * - wal_receiver_status_interval (if set and retention is active),
4947 * - a default maximum of 3 minutes,
4948 * - max_retention_duration (if retention is active).
4949 *
4950 * This ensures the interval never exceeds the retention boundary, even if other
4951 * limits are higher. Once activity resumes on the node and the retention is
4952 * active, the interval is reset to the lesser of 100ms and max_retention_duration,
4953 * allowing timely advancement of non-removable transaction ID.
4954 *
4955 * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4956 * consider the other interval or a separate GUC if the need arises.
4957 */
4958static void
4960{
4961 if (rdt_data->xid_advance_interval && !new_xid_found)
4962 {
4966
4967 /*
4968 * No new transaction ID has been assigned since the last check, so
4969 * double the interval, but not beyond the maximum allowable value.
4970 */
4971 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4972 max_interval);
4973 }
4974 else if (rdt_data->xid_advance_interval &&
4976 {
4977 /*
4978 * Retention has been stopped, so double the interval, capped at a
4979 * maximum of 3 minutes. The wal_receiver_status_interval is
4980 * intentionally not used as an upper bound, since the likelihood of
4981 * retention resuming is lower than that of general activity resuming.
4982 */
4983 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4985 }
4986 else
4987 {
4988 /*
4989 * A new transaction ID was found or the interval is not yet
4990 * initialized, so set the interval to the minimum value.
4991 */
4992 rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
4993 }
4994
4995 /*
4996 * Ensure the wait time remains within the maximum retention time limit
4997 * when retention is active.
4998 */
5000 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
5002}
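/*
 * Illustrative sketch, not part of worker.c: the back-off policy described
 * above adjust_xid_advance_interval(), written as a pure function over
 * milliseconds. The 100ms minimum and 3 minute maximum come from the
 * comment; taking the lower of the status interval and the 3 minute
 * maximum as the idle cap, and treating 0 as "not set", are assumptions of
 * this sketch. The SKETCH_* and sketch_* names are hypothetical.
 */

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_MIN_INTERVAL_MS 100		/* starting interval */
#define SKETCH_MAX_INTERVAL_MS 180000	/* default maximum: 3 minutes */

static int
sketch_min(int a, int b)
{
	return (a < b) ? a : b;
}

/* Compute the next interval from the current one (0 = not initialized). */
static int
sketch_adjust_interval(int current_ms, bool new_xid_found,
					   bool retention_active, int status_interval_ms,
					   int max_retention_ms)
{
	int			next_ms;

	assert(current_ms >= 0 && current_ms <= SKETCH_MAX_INTERVAL_MS);

	if (current_ms > 0 && !new_xid_found)
	{
		/* No activity: double, capped by the status interval if set. */
		int			cap_ms = SKETCH_MAX_INTERVAL_MS;

		if (status_interval_ms > 0)
			cap_ms = sketch_min(cap_ms, status_interval_ms);
		next_ms = sketch_min(current_ms * 2, cap_ms);
	}
	else if (current_ms > 0 && !retention_active)
	{
		/* Retention stopped: double, capped only by the 3 minute maximum. */
		next_ms = sketch_min(current_ms * 2, SKETCH_MAX_INTERVAL_MS);
	}
	else
	{
		/* Activity resumed or first call: start from the minimum. */
		next_ms = SKETCH_MIN_INTERVAL_MS;
	}

	/* Never wait longer than the retention limit while retention is active. */
	if (retention_active && max_retention_ms > 0)
		next_ms = sketch_min(next_ms, max_retention_ms);

	return next_ms;
}
```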
5003
5004/*
5005 * Exit routine for apply workers due to subscription parameter changes.
5006 */
5007static void
5009{
5011 {
5012 /*
5013 * Don't stop the parallel apply worker as the leader will detect the
5014 * subscription parameter change and restart logical replication later
5015 * anyway. This also prevents the leader from reporting errors when
5016 * trying to communicate with a stopped parallel apply worker, which
5017 * would accidentally disable subscriptions if disable_on_error was
5018 * set.
5019 */
5020 return;
5021 }
5022
5023 /*
5024 * Reset the last-start time for this apply worker so that the launcher
5025 * will restart it without waiting for wal_retrieve_retry_interval if the
5026 * subscription is still active, and so that we won't leak that hash table
5027 * entry if it isn't.
5028 */
5031
5032 proc_exit(0);
5033}
5034
5035/*
5036 * Reread subscription info if needed.
5037 *
5038 * For significant changes, we react by exiting the current process; a new
5039 * one will be launched afterwards if needed.
5040 */
5041void
5043{
5046 bool started_tx = false;
5047
5048 /* When cache state is valid there is nothing to do here. */
5050 return;
5051
5052 /* This function might be called inside or outside of transaction. */
5053 if (!IsTransactionState())
5054 {
5056 started_tx = true;
5057 }
5058
5059 /* Ensure allocations in permanent context. */
5061
5063
5064 /*
5065 * Exit if the subscription was removed. This normally should not happen
5066 * as the worker gets killed during DROP SUBSCRIPTION.
5067 */
5068 if (!newsub)
5069 {
5070 ereport(LOG,
5071 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5072 MySubscription->name)));
5073
5074 /* Ensure we remove no-longer-useful entry for worker's start time */
5077
5078 proc_exit(0);
5079 }
5080
5081 /* Exit if the subscription was disabled. */
5082 if (!newsub->enabled)
5083 {
5084 ereport(LOG,
5085 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5086 MySubscription->name)));
5087
5089 }
5090
5091 /* !slotname should never happen when enabled is true. */
5092 Assert(newsub->slotname);
5093
5094 /* two-phase cannot be altered while the worker is running */
5095 Assert(newsub->twophasestate == MySubscription->twophasestate);
5096
5097 /*
5098 * Exit if any parameter that affects the remote connection was changed.
5099 * The launcher will start a new worker but note that the parallel apply
5100 * worker won't restart if the streaming option's value is changed from
5101 * 'parallel' to any other value or the server decides not to stream the
5102 * in-progress transaction.
5103 */
5104 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
5105 strcmp(newsub->name, MySubscription->name) != 0 ||
5106 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
5107 newsub->binary != MySubscription->binary ||
5108 newsub->stream != MySubscription->stream ||
5109 newsub->passwordrequired != MySubscription->passwordrequired ||
5110 strcmp(newsub->origin, MySubscription->origin) != 0 ||
5111 newsub->owner != MySubscription->owner ||
5112 !equal(newsub->publications, MySubscription->publications))
5113 {
5115 ereport(LOG,
5116 (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5117 MySubscription->name)));
5118 else
5119 ereport(LOG,
5120 (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
5121 MySubscription->name)));
5122
5124 }
5125
5126 /*
5127 * Exit if the subscription owner's superuser privileges have been
5128 * revoked.
5129 */
5130 if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
5131 {
5133 ereport(LOG,
5134 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5136 else
5137 ereport(LOG,
5138 errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5140
5142 }
5143
5144 /* Check for other changes that should never happen too. */
5145 if (newsub->dbid != MySubscription->dbid)
5146 {
5147 elog(ERROR, "subscription %u changed unexpectedly",
5149 }
5150
5151 /* Clean old subscription info and switch to new one. */
5154
5156
5157 /* Change synchronous commit according to the user's wishes */
5158 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5160
5161 /* Change wal_receiver_timeout according to the user's wishes */
5163
5164 if (started_tx)
5166
5167 MySubscriptionValid = true;
5168}
5169
5170/*
5171 * Change wal_receiver_timeout to MySubscription->walrcvtimeout.
5172 */
5173static void
5175{
5176 bool parsed;
5177 int val;
5179
5180 /*
5181 * Set the wal_receiver_timeout GUC to MySubscription->walrcvtimeout,
5182 * which comes from the subscription's wal_receiver_timeout option. If the
5183 * value is -1, reset the GUC to its default, meaning it will inherit from
5184 * the server config, command line, or role/database settings.
5185 */
5187 if (parsed && val == -1)
5188 SetConfigOption("wal_receiver_timeout", NULL,
5190 else
5191 SetConfigOption("wal_receiver_timeout", MySubscription->walrcvtimeout,
5193
5194 /*
5195 * Log the wal_receiver_timeout setting (in milliseconds) as a debug
5196 * message when it changes, to verify it was set correctly.
5197 */
5199 elog(DEBUG1, "logical replication worker for subscription \"%s\" wal_receiver_timeout: %d ms",
5201}
5202
5203/*
5204 * Callback from subscription syscache invalidation. Also needed for server or
5205 * user mapping invalidation, which can change the connection information for
5206 * subscriptions that connect using a server object.
5207 */
5208static void
5213
5214/*
5215 * subxact_info_write
5216 * Store information about subxacts for a toplevel transaction.
5217 *
5218 * For each subxact we store offset of its first change in the main file.
5219 * The file is always over-written as a whole.
5220 *
5221 * XXX We should only store subxacts that were not aborted yet.
5222 */
5223static void
5225{
5226 char path[MAXPGPATH];
5227 Size len;
5228 BufFile *fd;
5229
5231
5232 /* construct the subxact filename */
5233 subxact_filename(path, subid, xid);
5234
5235 /* Delete the subxacts file, if it exists. */
5236 if (subxact_data.nsubxacts == 0)
5237 {
5240
5241 return;
5242 }
5243
5244 /*
5245 * Create the subxact file if it is not already created, otherwise open the
5246 * existing file.
5247 */
5249 true);
5250 if (fd == NULL)
5252
5254
5255 /* Write the subxact count and subxact info */
5258
5260
5261 /* free the memory allocated for subxact info */
5263}
5264
5265/*
5266 * subxact_info_read
5267 * Restore information about subxacts of a streamed transaction.
5268 *
5269 * Read information about subxacts into the structure subxact_data that can be
5270 * used later.
5271 */
5272static void
5274{
5275 char path[MAXPGPATH];
5276 Size len;
5277 BufFile *fd;
5279
5283
5284 /*
5285 * If the subxact file doesn't exist that means we don't have any subxact
5286 * info.
5287 */
5288 subxact_filename(path, subid, xid);
5290 true);
5291 if (fd == NULL)
5292 return;
5293
5294 /* read number of subxact items */
5296
5298
5299 /* we keep the maximum as a power of 2 */
5301
5302 /*
5303 * Allocate subxact information in the logical streaming context. We need
5304 * this information during the complete stream so that we can add the sub
5305 * transaction info to this. On stream stop we will flush this information
5306 * to the subxact file and reset the logical streaming context.
5307 */
5312
5313 if (len > 0)
5315
5317}
5318
5319/*
5320 * subxact_info_add
5321 * Add information about a subxact (offset in the main file).
5322 */
5323static void
5325{
5326 SubXactInfo *subxacts = subxact_data.subxacts;
5327 int64 i;
5328
5329 /* We must have a valid top level stream xid and a stream fd. */
5331 Assert(stream_fd != NULL);
5332
5333 /*
5334 * If the XID matches the toplevel transaction, we don't want to add it.
5335 */
5336 if (stream_xid == xid)
5337 return;
5338
5339 /*
5340 * In most cases we're checking the same subxact as we've already seen in
5341 * the last call, so make sure to ignore it (this change comes later).
5342 */
5343 if (subxact_data.subxact_last == xid)
5344 return;
5345
5346 /* OK, remember we're processing this XID. */
5348
5349 /*
5350 * Check if the transaction is already present in the array of subxacts. We
5351 * intentionally scan the array from the tail, because we're likely adding
5352 * a change for the most recent subtransactions.
5353 *
5354 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5355 * would allow us to use binary search here.
5356 */
5357 for (i = subxact_data.nsubxacts; i > 0; i--)
5358 {
5359 /* found, so we're done */
5360 if (subxacts[i - 1].xid == xid)
5361 return;
5362 }
5363
5364 /* This is a new subxact, so we need to add it to the array. */
5365 if (subxact_data.nsubxacts == 0)
5366 {
5368
5370
5371 /*
5372 * Allocate this memory for subxacts in per-stream context, see
5373 * subxact_info_read.
5374 */
5378 }
5380 {
5382 subxacts = repalloc_array(subxacts, SubXactInfo,
5384 }
5385
5386 subxacts[subxact_data.nsubxacts].xid = xid;
5387
5388 /*
5389 * Get the current offset of the stream file and store it as offset of
5390 * this subxact.
5391 */
5393 &subxacts[subxact_data.nsubxacts].fileno,
5394 &subxacts[subxact_data.nsubxacts].offset);
5395
5397 subxact_data.subxacts = subxacts;
5398}
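The bookkeeping described in subxact_info_add (skip the toplevel XID, skip the XID seen in the previous call, scan the array from the tail, grow the array when it is full) can be sketched outside the server as follows. This is only an illustration: DemoXid, DemoSubXacts, demo_subxact_add and the initial capacity of 4 are hypothetical names and values, plain realloc stands in for the memory-context allocation, and the file offsets stored by the real function are left out.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for TransactionId; 0 plays the role of "invalid". */
typedef unsigned int DemoXid;

typedef struct DemoSubXacts
{
	DemoXid    *xids;		/* known subtransaction XIDs */
	size_t		count;		/* number of entries in use */
	size_t		capacity;	/* 0, or a power of two */
	DemoXid		last;		/* XID seen in the previous call */
} DemoSubXacts;

/* Returns 1 if xid was newly recorded, 0 if it was skipped or already known. */
static int
demo_subxact_add(DemoSubXacts *s, DemoXid top_xid, DemoXid xid)
{
	size_t		i;

	assert(s != NULL);

	/* The toplevel transaction itself is never recorded. */
	if (xid == top_xid)
		return 0;

	/* Cheap check: same subxact as in the previous call. */
	if (xid == s->last)
		return 0;
	s->last = xid;

	/* Scan from the tail, since recent subxacts are the likely matches. */
	for (i = s->count; i > 0; i--)
	{
		if (s->xids[i - 1] == xid)
			return 0;
	}

	/* Grow by doubling so the capacity stays a power of two. */
	if (s->count == s->capacity)
	{
		size_t		newcap = (s->capacity == 0) ? 4 : s->capacity * 2;
		DemoXid    *tmp = realloc(s->xids, newcap * sizeof(DemoXid));

		if (tmp == NULL)
			abort();
		s->xids = tmp;
		s->capacity = newcap;
	}

	s->xids[s->count++] = xid;
	return 1;
}
```

The "last seen" shortcut handles the common case of consecutive changes from the same subtransaction without touching the array at all.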
5399
5400/* format filename for file containing the info about subxacts */
5401static inline void
5402subxact_filename(char *path, Oid subid, TransactionId xid)
5403{
5404 snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5405}
5406
5407/* format filename for file containing serialized changes */
5408static inline void
5409changes_filename(char *path, Oid subid, TransactionId xid)
5410{
5411 snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5412}
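As a quick illustration of the naming scheme used by subxact_filename and changes_filename, the sketch below formats both names with the same "%u-%u.<suffix>" pattern. DemoOid, DemoXid, DEMO_MAXPATH and demo_stream_filename are hypothetical stand-ins for Oid, TransactionId, MAXPGPATH and the two helpers; the subscription OID and XID in the usage note are made-up values.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins for PostgreSQL's Oid, TransactionId and MAXPGPATH. */
typedef unsigned int DemoOid;
typedef unsigned int DemoXid;
#define DEMO_MAXPATH 1024

/*
 * Format "<subid>-<xid>.<suffix>", the same pattern the two helpers above
 * use with the suffixes "subxacts" and "changes".
 */
static void
demo_stream_filename(char *path, DemoOid subid, DemoXid xid,
					 const char *suffix)
{
	assert(path != NULL && suffix != NULL);
	snprintf(path, DEMO_MAXPATH, "%u-%u.%s", subid, xid, suffix);
}
```

With subid 16394 and xid 742, this yields "16394-742.changes" and "16394-742.subxacts". Because the subscription OID is part of the name, two subscriptions receiving a remote transaction with the same XID write to different files.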
5413
5414/*
5415 * stream_cleanup_files
5416 * Cleanup files for a subscription / toplevel transaction.
5417 *
5418 * Remove files with serialized changes and subxact info for a particular
5419 * toplevel transaction. Each subscription has a separate set of files
5420 * for any toplevel transaction.
5421 */
5422void
5424{
5425 char path[MAXPGPATH];
5426
5427 /* Delete the changes file. */
5428 changes_filename(path, subid, xid);
5430
5431 /* Delete the subxact file, if it exists. */
5432 subxact_filename(path, subid, xid);
5434}
5435
5436/*
5437 * stream_open_file
5438 * Open a file that we'll use to serialize changes for a toplevel
5439 * transaction.
5440 *
5441 * Open a file for streamed changes from a toplevel transaction identified
5442 * by stream_xid (global variable). If it's the first chunk of streamed
5443 * changes for this transaction, create the buffile, otherwise open the
5444 * previously created file.
5445 */
5446static void
5448{
5449 char path[MAXPGPATH];
5451
5452 Assert(OidIsValid(subid));
5454 Assert(stream_fd == NULL);
5455
5456
5457 changes_filename(path, subid, xid);
5458 elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5459
5460 /*
5461 * Create/open the buffiles under the logical streaming context so that we
5462 * have those files until stream stop.
5463 */
5465
5466 /*
5467 * If this is the first streamed segment, create the changes file.
5468 * Otherwise, just open the file for writing, in append mode.
5469 */
5470 if (first_segment)
5472 path);
5473 else
5474 {
5475 /*
5476 * Open the file and seek to the end of the file because we always
5477 * append to the changes file.
5478 */
5480 path, O_RDWR, false);
5482 }
5483
5485}
5486
5487/*
5488 * stream_close_file
5489 * Close the currently open file with streamed changes.
5490 */
5491static void
5493{
5494 Assert(stream_fd != NULL);
5495
5497
5498 stream_fd = NULL;
5499}
5500
5501/*
5502 * stream_write_change
5503 * Serialize a change to a file for the current toplevel transaction.
5504 *
5505 * The change is serialized in a simple format, with length (not including
5506 * the length), action code (identifying the message type) and message
5507 * contents (without the subxact TransactionId value).
5508 */
5509static void
5511{
5512 int len;
5513
5514 Assert(stream_fd != NULL);
5515
5516 /* total on-disk size, including the action type character */
5517 len = (s->len - s->cursor) + sizeof(char);
5518
5519 /* first write the size */
5520 BufFileWrite(stream_fd, &len, sizeof(len));
5521
5522 /* then the action */
5523 BufFileWrite(stream_fd, &action, sizeof(action));
5524
5525 /* and finally the remaining part of the buffer (after the XID) */
5526 len = (s->len - s->cursor);
5527
5529}
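The on-disk record layout described for stream_write_change (a length that excludes the length field itself, then a one-byte action code, then the message contents) can be sketched against a plain memory buffer. demo_write_change and demo_read_change are hypothetical helpers written for this illustration; the real code writes through the BufFile API to stream_fd.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Append one record laid out as [int len][char action][payload], where len
 * counts the action byte plus the payload but not the length field itself.
 * Returns the number of bytes written to out.
 */
static size_t
demo_write_change(unsigned char *out, char action,
				  const char *payload, int payload_len)
{
	int			len = payload_len + (int) sizeof(char);
	size_t		off = 0;

	assert(out != NULL && payload_len >= 0);

	/* first the size */
	memcpy(out + off, &len, sizeof(len));
	off += sizeof(len);

	/* then the action */
	memcpy(out + off, &action, sizeof(action));
	off += sizeof(action);

	/* and finally the payload */
	memcpy(out + off, payload, (size_t) payload_len);
	off += (size_t) payload_len;

	return off;
}

/* Read one record back; returns the payload length and sets *action. */
static int
demo_read_change(const unsigned char *in, char *action, char *payload)
{
	int			len;

	memcpy(&len, in, sizeof(len));
	memcpy(action, in + sizeof(len), sizeof(char));
	memcpy(payload, in + sizeof(len) + sizeof(char), (size_t) (len - 1));

	return len - 1;
}
```

A reader of such a file can therefore fetch the fixed-size length first, then read exactly that many bytes to obtain the action code and the message body.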
5530
5531/*
5532 * stream_open_and_write_change
5533 * Serialize a message to a file for the given transaction.
5534 *
5535 * This function is similar to stream_write_change except that it opens the
5536 * target file, if it is not already open, before writing the message and
5537 * closes the file at the end.
5538 */
5539static void
5541{
5543
5544 if (!stream_fd)
5545 stream_start_internal(xid, false);
5546
5547 stream_write_change(action, s);
5549}
5550
5551/*
5552 * Sets streaming options including replication slot name and origin start
5553 * position. Workers need these options for logical replication.
5554 */
5555void
5557 char *slotname,
5559{
5560 int server_version;
5561
5562 options->logical = true;
5563 options->startpoint = *origin_startpos;
5564 options->slotname = slotname;
5565
5567 options->proto.logical.proto_version =
5572
5573 options->proto.logical.publication_names = MySubscription->publications;
5574 options->proto.logical.binary = MySubscription->binary;
5575
5576 /*
5577 * Assign the appropriate option value for streaming option according to
5578 * the 'streaming' mode and the publisher's ability to support that mode.
5579 */
5580 if (server_version >= 160000 &&
5582 {
5583 options->proto.logical.streaming_str = "parallel";
5585 }
5586 else if (server_version >= 140000 &&
5588 {
5589 options->proto.logical.streaming_str = "on";
5591 }
5592 else
5593 {
5594 options->proto.logical.streaming_str = NULL;
5596 }
5597
5598 options->proto.logical.twophase = false;
5599 options->proto.logical.origin = pstrdup(MySubscription->origin);
5600}
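The streaming-mode choice in set_stream_options depends on both the publisher's version and the mode the subscription asked for. The listing above shows only the version half of each condition, so the sketch below assumes the other half compares the subscription's requested mode. DemoStreamMode and demo_streaming_str are hypothetical names, and the requested-mode tests are assumptions rather than a copy of the elided lines.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the subscription's streaming setting. */
typedef enum DemoStreamMode
{
	DEMO_STREAM_OFF,
	DEMO_STREAM_ON,
	DEMO_STREAM_PARALLEL
} DemoStreamMode;

/*
 * Pick the streaming option string to request from the publisher.
 * The version thresholds come from the listing; the requested-mode
 * checks are assumed.
 */
static const char *
demo_streaming_str(int server_version, DemoStreamMode requested)
{
	assert(server_version > 0);

	if (server_version >= 160000 && requested == DEMO_STREAM_PARALLEL)
		return "parallel";
	else if (server_version >= 140000 && requested >= DEMO_STREAM_ON)
		return "on";

	/* Publisher too old, or streaming not requested. */
	return NULL;
}
```

Under these assumptions, a subscription asking for parallel streaming from a version 15 publisher falls back to "on", and any request against a publisher older than version 14 results in no streaming option at all.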
5601
5602/*
5603 * Cleanup the memory for subxacts and reset the related variables.
5604 */
5605static inline void
5616
5617/*
5618 * Common function to run the apply loop with error handling. Disable the
5619 * subscription, if necessary.
5620 *
5621 * Note that we don't handle FATAL errors, which are probably caused by
5622 * system resource errors and are not repeatable.
5623 */
5624void
5626{
5627 PG_TRY();
5628 {
5630 }
5631 PG_CATCH();
5632 {
5633 /*
5634 * Reset the origin state to prevent the advancement of origin
5635 * progress if we fail to apply. Otherwise, this will result in
5636 * transaction loss as that transaction won't be sent again by the
5637 * server.
5638 */
5640
5643 else
5644 {
5645 /*
5646 * Report the worker failed while applying changes. Abort the
5647 * current transaction so that the stats message is sent in an
5648 * idle state.
5649 */
5652
5653 PG_RE_THROW();
5654 }
5655 }
5656 PG_END_TRY();
5657}
5658
5659/*
5660 * Runs the leader apply worker.
5661 *
5662 * It sets up replication origin, streaming options and then starts streaming.
5663 */
5664static void
5666{
5667 char originname[NAMEDATALEN];
5669 char *slotname = NULL;
5672 TimeLineID startpointTLI;
5673 char *err;
5674 bool must_use_password;
5675
5676 slotname = MySubscription->slotname;
5677
5678 /*
5679 * This shouldn't happen if the subscription is enabled, but guard against
5680 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5681 * slot is NULL.)
5682 */
5683 if (!slotname)
5684 ereport(ERROR,
5686 errmsg("subscription has no replication slot set")));
5687
5688 /* Setup replication origin tracking. */
5690 originname, sizeof(originname));
5693 if (!OidIsValid(originid))
5699
5700 /* Is the use of a password mandatory? */
5703
5705 true, must_use_password,
5707
5709 ereport(ERROR,
5711 errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5712 MySubscription->name, err)));
5713
5714 /*
5715 * We don't really use the output of identify_system for anything but it does
5716 * some initializations on the upstream so let's still call it.
5717 */
5719
5721
5723
5724 /*
5725 * Even when the two_phase mode is requested by the user, it remains as
5726 * the tri-state PENDING until all tablesyncs have reached READY state.
5727 * Only then can it become ENABLED.
5728 *
5729 * Note: If the subscription has no tables then leave the state as
5730 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5731 * work.
5732 */
5735 {
5736 /* Start streaming with two_phase enabled */
5737 options.proto.logical.twophase = true;
5739
5741
5742 /*
5743 * Updating pg_subscription might involve TOAST table access, so
5744 * ensure we have a valid snapshot.
5745 */
5747
5752 }
5753 else
5754 {
5756 }
5757
5759 (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5764 "?")));
5765
5766 /* Run the main loop. */
5768}
5769
5770/*
5771 * Common initialization for leader apply worker, parallel apply worker,
5772 * tablesync worker and sequencesync worker.
5773 *
5774 * Initialize the database connection, in-memory subscription and necessary
5775 * config options.
5776 */
5777void
5779{
5781
5782 /* Run as replica session replication role. */
5783 SetConfigOption("session_replication_role", "replica",
5785
5786 /* Connect to our database. */
5789 0);
5790
5791 /*
5792 * Set always-secure search path, so malicious users can't redirect user
5793 * code (e.g. pg_index.indexprs).
5794 */
5795 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5796
5797 /* Load the subscription into persistent memory context. */
5799 "ApplyContext",
5803
5804 /*
5805 * Lock the subscription to prevent it from being concurrently dropped,
5806 * then re-verify its existence. After the initialization, the worker will
5807 * be terminated gracefully if the subscription is dropped.
5808 */
5812 if (!MySubscription)
5813 {
5814 ereport(LOG,
5815 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5817
5818 /* Ensure we remove no-longer-useful entry for worker's start time */
5821
5822 proc_exit(0);
5823 }
5824
5825 MySubscriptionValid = true;
5827
5828 if (!MySubscription->enabled)
5829 {
5830 ereport(LOG,
5831 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5832 MySubscription->name)));
5833
5835 }
5836
5837 /*
5838 * Restart the worker if retain_dead_tuples was enabled during startup.
5839 *
5840 * At this point, the replication slot used for conflict detection might
5841 * not exist yet, or could be dropped soon if the launcher perceives
5842 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5843 * oldest_nonremovable_xid when the slot is absent or at risk of being
5844 * dropped, a restart is initiated.
5845 *
5846 * The oldest_nonremovable_xid should be initialized only when the
5847 * subscription's retention is active before launching the worker. See
5848 * logicalrep_worker_launch.
5849 */
5850 if (am_leader_apply_worker() &&
5854 {
5855 ereport(LOG,
5856 errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5857 MySubscription->name, "retain_dead_tuples"));
5858
5860 }
5861
5862 /* Setup synchronous commit according to the user's wishes */
5863 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5865
5866 /* Change wal_receiver_timeout according to the user's wishes */
5868
5869 /*
5870 * Keep us informed about subscription or role changes. Note that the
5871 * role's superuser privilege can be revoked.
5872 */
5875 (Datum) 0);
5876 /* Changes to foreign servers may affect subscriptions using SERVER. */
5879 (Datum) 0);
5880 /* Changes to user mappings may affect subscriptions using SERVER. */
5883 (Datum) 0);
5884
5885 /*
5886 * Changes to FDW connection_function may affect subscriptions using
5887 * SERVER.
5888 */
5891 (Datum) 0);
5892
5895 (Datum) 0);
5896
5897 if (am_tablesync_worker())
5898 ereport(LOG,
5899 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5902 else if (am_sequencesync_worker())
5903 ereport(LOG,
5904 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
5906 else
5907 ereport(LOG,
5908 errmsg("logical replication apply worker for subscription \"%s\" has started",
5910
5912
5913 /*
5914 * Register a callback to reset the origin state before aborting any
5915 * pending transaction during shutdown (see ShutdownPostgres()). This will
5916 * avoid origin advancement for an incomplete transaction which could
5917 * otherwise lead to its loss as such a transaction won't be sent by the
5918 * server again.
5919 *
5920 * Note that even a LOG or DEBUG statement placed after setting the origin
5921 * state may process a shutdown signal before committing the current apply
5922 * operation. So, it is important to register such a callback here.
5923 *
5924 * Register this callback here to ensure that all types of logical
5925 * replication workers that set up origins and apply remote transactions
5926 * are protected.
5927 */
5929}
5930
5931/*
5932 * Callback on exit to clear transaction-level replication origin state.
5933 */
5934static void
5936{
5938}
5939
5940/*
5941 * Common function to setup the leader apply, tablesync and sequencesync worker.
5942 */
5943void
5945{
5946 /* Attach to slot */
5948
5950
5951 /* Setup signal handling */
5954
5955 /*
5956 * We don't currently need any ResourceOwner in a walreceiver process, but
5957 * if we did, we could call CreateAuxProcessResourceOwner here.
5958 */
5959
5960 /* Initialise stats to a sane value */
5963
5964 /* Load the libpq-specific functions */
5965 load_file("libpqwalreceiver", false);
5966
5968
5969 /* Connect to the origin and start the replication. */
5970 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5972
5973 /*
5974 * Setup callback for syscache so that we know when something changes in
5975 * the subscription relation state.
5976 */
5979 (Datum) 0);
5980}
5981
5982/* Logical Replication Apply worker entry point */
5983void
5985{
5987
5989
5991
5993
5995
5996 proc_exit(0);
5997}
5998
5999/*
6000 * After error recovery, disable the subscription in a new transaction
6001 * and exit cleanly.
6002 */
6003void
6005{
6006 /*
6007 * Emit the error message, and recover from the error state to an idle
6008 * state
6009 */
6011
6015
6017
6018 /*
6019 * Report the worker failed during sequence synchronization, table
6020 * synchronization, or apply.
6021 */
6023
6024 /* Disable the subscription */
6026
6027 /*
6028 * Updating pg_subscription might involve TOAST table access, so ensure we
6029 * have a valid snapshot.
6030 */
6032
6036
6037 /* Ensure we remove no-longer-useful entry for worker's start time */
6040
6041 /* Notify the subscription has been disabled and exit */
6042 ereport(LOG,
6043 errmsg("subscription \"%s\" has been disabled because of an error",
6045
6046 /*
6047 * Skip the track_commit_timestamp check when disabling the worker due to
6048 * an error, as verifying commit timestamps is unnecessary in this
6049 * context.
6050 */
6054
6055 proc_exit(0);
6056}
6057
6058/*
6059 * Is current process a logical replication worker?
6060 */
6061bool
6063{
6064 return MyLogicalRepWorker != NULL;
6065}
6066
6067/*
6068 * Is current process a logical replication parallel apply worker?
6069 */
6070bool
6075
6076/*
6077 * Start skipping changes of the transaction if the given LSN matches the
6078 * LSN specified by subscription's skiplsn.
6079 */
6080static void
6082{
6086
6087 /*
6088 * Quick return if it's not requested to skip this transaction. This
6089 * function is called for every remote transaction and we assume that
6090 * skipping the transaction is not used often.
6091 */
6093 MySubscription->skiplsn != finish_lsn))
6094 return;
6095
6096 /* Start skipping all changes of this transaction */
6097 skip_xact_finish_lsn = finish_lsn;
6098
6099 ereport(LOG,
6100 errmsg("logical replication starts skipping transaction at LSN %X/%08X",
6102}
6103
6104/*
6105 * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
6106 */
6107static void
6109{
6110 if (!is_skipping_changes())
6111 return;
6112
6113 ereport(LOG,
6114 errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6116
6117 /* Stop skipping changes */
6119}
6120
6121/*
6122 * Clear subskiplsn of pg_subscription catalog.
6123 *
6124 * finish_lsn is the transaction's finish LSN, which is compared against
6125 * subskiplsn. If they do not match, we raise a warning when clearing the
6126 * subskiplsn, to inform users of cases where, e.g., the user mistakenly
6127 * specified the wrong subskiplsn.
6128 */
6129static void
6131{
6132 Relation rel;
6134 HeapTuple tup;
6136 bool started_tx = false;
6137
6139 return;
6140
6141 if (!IsTransactionState())
6142 {
6144 started_tx = true;
6145 }
6146
6147 /*
6148 * Updating pg_subscription might involve TOAST table access, so ensure we
6149 * have a valid snapshot.
6150 */
6152
6153 /*
6154 * Protect subskiplsn of pg_subscription from being concurrently updated
6155 * while clearing it.
6156 */
6159
6161
6162 /* Fetch the existing tuple. */
6165
6166 if (!HeapTupleIsValid(tup))
6167 elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6168
6170
6171 /*
6172 * Clear the subskiplsn. If the user has already changed subskiplsn before
6173 * clearing it we don't update the catalog and the replication origin
6174 * state won't get advanced. So in the worst case, if the server crashes
6175 * before sending an acknowledgment of the flush position the transaction
6176 * will be sent again and the user needs to set subskiplsn again. We can
6177 * reduce the possibility by logging a replication origin WAL record to
6178 * advance the origin LSN instead but there is no way to advance the
6179 * origin timestamp and it doesn't seem to be worth doing anything about
6180 * it since it's a very rare case.
6181 */
6182 if (subform->subskiplsn == myskiplsn)
6183 {
6184 bool nulls[Natts_pg_subscription];
6187
6188 memset(values, 0, sizeof(values));
6189 memset(nulls, false, sizeof(nulls));
6190 memset(replaces, false, sizeof(replaces));
6191
6192 /* reset subskiplsn */
6195
6197 replaces);
6198 CatalogTupleUpdate(rel, &tup->t_self, tup);
6199
6200 if (myskiplsn != finish_lsn)
6202 errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6203 errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6204 LSN_FORMAT_ARGS(finish_lsn),
6206 }
6207
6209 table_close(rel, NoLock);
6210
6212
6213 if (started_tx)
6215}
6216
6217/* Error callback to give more context info about the change being applied */
6218void
6220{
6222
6224 return;
6225
6226 Assert(errarg->origin_name);
6227
6228 if (errarg->rel == NULL)
6229 {
6230 if (!TransactionIdIsValid(errarg->remote_xid))
6231 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6232 errarg->origin_name,
6234 else if (!XLogRecPtrIsValid(errarg->finish_lsn))
6235 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6236 errarg->origin_name,
6238 errarg->remote_xid);
6239 else
6240 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6241 errarg->origin_name,
6243 errarg->remote_xid,
6244 LSN_FORMAT_ARGS(errarg->finish_lsn));
6245 }
6246 else
6247 {
6248 if (errarg->remote_attnum < 0)
6249 {
6250 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6251 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6252 errarg->origin_name,
6254 errarg->rel->remoterel.nspname,
6255 errarg->rel->remoterel.relname,
6256 errarg->remote_xid);
6257 else
6258 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6259 errarg->origin_name,
6261 errarg->rel->remoterel.nspname,
6262 errarg->rel->remoterel.relname,
6263 errarg->remote_xid,
6264 LSN_FORMAT_ARGS(errarg->finish_lsn));
6265 }
6266 else
6267 {
6268 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6269 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6270 errarg->origin_name,
6272 errarg->rel->remoterel.nspname,
6273 errarg->rel->remoterel.relname,
6274 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6275 errarg->remote_xid);
6276 else
6277 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6278 errarg->origin_name,
6280 errarg->rel->remoterel.nspname,
6281 errarg->rel->remoterel.relname,
6282 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6283 errarg->remote_xid,
6284 LSN_FORMAT_ARGS(errarg->finish_lsn));
6285 }
6286 }
6287}
6288
6289/* Set transaction information of apply error callback */
6290static inline void
6296
6297/* Reset all information of apply error callback */
6298static inline void
6306
6307/*
6308 * Request wakeup of the workers for the given subscription OID
6309 * at commit of the current transaction.
6310 *
6311 * This is used to ensure that the workers process assorted changes
6312 * as soon as possible.
6313 */
6314void
6324
6325/*
6326 * Wake up the workers of any subscriptions that were changed in this xact.
6327 */
6328void
6330{
6332 {
6333 ListCell *lc;
6334
6337 {
6338 Oid subid = lfirst_oid(lc);
6339 List *workers;
6340 ListCell *lc2;
6341
6342 workers = logicalrep_workers_find(subid, true, false);
6343 foreach(lc2, workers)
6344 {
6346
6348 }
6349 }
6351 }
6352
6353 /* The List storage will be reclaimed automatically in xact cleanup. */
6355}
6356
6357/*
6358 * Allocate the origin name in long-lived context for error context message.
6359 */
6360void
6366
6367/*
6368 * Return the action to be taken for the given transaction. See
6369 * TransApplyAction for information on each of the actions.
6370 *
6371 * *winfo is assigned to the destination parallel worker info when the leader
6372 * apply worker has to pass all the transaction's changes to the parallel
6373 * apply worker.
6374 */
6375static TransApplyAction
6377{
6378 *winfo = NULL;
6379
6381 {
6382 return TRANS_PARALLEL_APPLY;
6383 }
6384
6385 /*
6386 * If we are processing this transaction using a parallel apply worker
6387 * then either we send the changes to the parallel worker or if the worker
6388 * is busy then serialize the changes to the file which will later be
6389 * processed by the parallel worker.
6390 */
6391 *winfo = pa_find_worker(xid);
6392
6393 if (*winfo && (*winfo)->serialize_changes)
6394 {
6396 }
6397 else if (*winfo)
6398 {
6400 }
6401
6402 /*
6403 * If there is no parallel worker involved to process this transaction
6404 * then we either directly apply the change or serialize it to a file
6405 * which will later be applied when the transaction finish message is
6406 * processed.
6407 */
6408 else if (in_streamed_transaction)
6409 {
6411 }
6412 else
6413 {
6414 return TRANS_LEADER_APPLY;
6415 }
6416}
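The decision order in get_transaction_apply_action can be sketched as a pure function over four flags. Several lines of the listing are missing, so this is a reconstruction under assumptions: the first test is taken to mean "this process is itself a parallel apply worker", and the mapping of the middle branches to action values is inferred from the comments. DemoApplyAction and demo_apply_action are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the TransApplyAction values. */
typedef enum DemoApplyAction
{
	DEMO_LEADER_APPLY,
	DEMO_LEADER_SERIALIZE,
	DEMO_LEADER_SEND_TO_PARALLEL,
	DEMO_LEADER_PARTIAL_SERIALIZE,
	DEMO_PARALLEL_APPLY
} DemoApplyAction;

static DemoApplyAction
demo_apply_action(bool is_parallel_worker,	/* assumed first condition */
				  bool has_parallel_worker, /* a worker was found for the xid */
				  bool worker_serializing,	/* that worker's serialize flag */
				  bool in_streamed_xact)
{
	/* The serialize flag only makes sense when a worker was found. */
	assert(!(worker_serializing && !has_parallel_worker));

	if (is_parallel_worker)
		return DEMO_PARALLEL_APPLY;

	/* A parallel worker owns this transaction: send, or spill if busy. */
	if (has_parallel_worker && worker_serializing)
		return DEMO_LEADER_PARTIAL_SERIALIZE;
	else if (has_parallel_worker)
		return DEMO_LEADER_SEND_TO_PARALLEL;

	/* No parallel worker: serialize streamed changes, else apply directly. */
	else if (in_streamed_xact)
		return DEMO_LEADER_SERIALIZE;
	else
		return DEMO_LEADER_APPLY;
}
```

Writing the logic as a function of plain flags makes the priority of the branches explicit: the parallel-worker checks come before the streamed-transaction check, so a streamed transaction that has a parallel worker assigned is never serialized by the leader in full.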
static bool IsIndexUsableForFindingDeletedTuple(Oid localindexoid, TransactionId conflict_detection_xmin)
Definition worker.c:3239
void stream_start_internal(TransactionId xid, bool first_segment)
Definition worker.c:1691
static List * on_commit_wakeup_workers_subids
Definition worker.c:484
static void apply_handle_stream_abort(StringInfo s)
Definition worker.c:2075
static void apply_handle_relation(StringInfo s)
Definition worker.c:2567
void set_apply_error_context_origin(char *originname)
Definition worker.c:6361
static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4559
MemoryContext ApplyContext
Definition worker.c:474
static void subxact_info_write(Oid subid, TransactionId xid)
Definition worker.c:5224
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition worker.c:2605
static void apply_handle_prepare(StringInfo s)
Definition worker.c:1335
static void apply_handle_rollback_prepared(StringInfo s)
Definition worker.c:1461
void SetupApplyOrSyncWorker(int worker_slot)
Definition worker.c:5944
static void apply_handle_stream_stop(StringInfo s)
Definition worker.c:1889
static void apply_handle_origin(StringInfo s)
Definition worker.c:1670
static void request_publisher_status(RetainDeadTuplesData *rdt_data)
Definition worker.c:4520
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition worker.c:4301
static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
Definition worker.c:4925
static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4427
static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4391
WalReceiverConn * LogRepWorkerWalRcvConn
Definition worker.c:479
static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition worker.c:4846
static XLogRecPtr remote_final_lsn
Definition worker.c:487
static bool MySubscriptionValid
Definition worker.c:482
void apply_error_callback(void *arg)
Definition worker.c:6219
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition worker.c:3943
static MemoryContext LogicalStreamingContext
Definition worker.c:477
void maybe_reread_subscription(void)
Definition worker.c:5042
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition worker.c:2507
void InitializeLogRepWorker(void)
Definition worker.c:5778
static void set_wal_receiver_timeout(void)
Definition worker.c:5174
static bool in_streamed_transaction
Definition worker.c:490
static void apply_handle_begin_prepare(StringInfo s)
Definition worker.c:1269
void ApplyWorkerMain(Datum main_arg)
Definition worker.c:5984
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition worker.c:2264
static void apply_handle_stream_start(StringInfo s)
Definition worker.c:1729
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition worker.c:6081
static void on_exit_clear_xact_state(int code, Datum arg)
Definition worker.c:5935
static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition worker.c:4808
Subscription * MySubscription
Definition worker.c:481
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition worker.c:1298
static void stream_close_file(void)
Definition worker.c:5492
static TransactionId stream_xid
Definition worker.c:492
static void apply_handle_insert(StringInfo s)
Definition worker.c:2637
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition worker.c:963
static bool FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
Definition worker.c:3273
static void subxact_info_read(Oid subid, TransactionId xid)
Definition worker.c:5273
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition worker.c:3110
static void reset_apply_error_context_info(void)
Definition worker.c:6299
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1755
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1779
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1643
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1607
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
@ STATE_IDLEINTRANSACTION
@ STATE_RUNNING
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:934
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:894
Bitmapset * bms_make_singleton(int x)
Definition bitmapset.c:216
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
static Datum values[MAXATTR]
Definition bootstrap.c:187
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition buffile.c:292
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition buffile.c:655
int BufFileSeek(BufFile *file, int fileno, pgoff_t offset, int whence)
Definition buffile.c:741
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition buffile.c:677
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition buffile.c:665
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition buffile.c:268
void BufFileTruncateFileSet(BufFile *file, int fileno, pgoff_t offset)
Definition buffile.c:928
void BufFileTell(BufFile *file, int *fileno, pgoff_t *offset)
Definition buffile.c:833
void BufFileClose(BufFile *file)
Definition buffile.c:413
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition buffile.c:365
#define Min(x, y)
Definition c.h:1054
#define likely(x)
Definition c.h:423
#define Assert(condition)
Definition c.h:906
int64_t int64
Definition c.h:576
uint64_t uint64
Definition c.h:580
uint32_t uint32
Definition c.h:579
#define pg_fallthrough
Definition c.h:144
uint32 TransactionId
Definition c.h:699
#define OidIsValid(objectId)
Definition c.h:821
size_t Size
Definition c.h:652
bool track_commit_timestamp
Definition commit_ts.c:109
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, ReplOriginId *localorigin, TimestampTz *localts)
Definition conflict.c:63
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition conflict.c:104
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition conflict.c:139
ConflictType
Definition conflict.h:32
@ CT_UPDATE_DELETED
Definition conflict.h:43
@ CT_DELETE_MISSING
Definition conflict.h:52
@ CT_UPDATE_ORIGIN_DIFFERS
Definition conflict.h:37
@ CT_UPDATE_MISSING
Definition conflict.h:46
@ CT_DELETE_ORIGIN_DIFFERS
Definition conflict.h:49
int64 TimestampTz
Definition timestamp.h:39
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
Datum arg
Definition elog.c:1322
void EmitErrorReport(void)
Definition elog.c:1882
ErrorContextCallback * error_context_stack
Definition elog.c:99
void FlushErrorState(void)
Definition elog.c:2062
int errcode(int sqlerrcode)
Definition elog.c:874
#define LOG
Definition elog.h:31
#define PG_RE_THROW()
Definition elog.h:405
int errdetail_internal(const char *fmt,...) pg_attribute_printf(1, 2)
#define errcontext
Definition elog.h:198
int errdetail(const char *fmt,...) pg_attribute_printf(1, 2)
int errmsg_internal(const char *fmt,...) pg_attribute_printf(1, 2)
#define PG_TRY(...)
Definition elog.h:372
#define WARNING
Definition elog.h:36
#define DEBUG2
Definition elog.h:29
#define PG_END_TRY(...)
Definition elog.h:397
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define PG_CATCH(...)
Definition elog.h:382
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223
void err(int eval, const char *fmt,...)
Definition err.c:43
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition execExpr.c:143
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition execMain.c:1860
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
Definition execMain.c:2722
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition execMain.c:1247
void EvalPlanQualEnd(EPQState *epqstate)
Definition execMain.c:3183
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition execUtils.c:1326
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
Definition execUtils.c:773
void FreeExecutorState(EState *estate)
Definition execUtils.c:192
EState * CreateExecutorState(void)
Definition execUtils.c:88
#define GetPerTupleExprContext(estate)
Definition executor.h:656
#define GetPerTupleMemoryContext(estate)
Definition executor.h:661
#define EvalPlanQualSetSlot(epqstate, slot)
Definition executor.h:289
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition executor.h:393
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
#define palloc_object(type)
Definition fe_memutils.h:74
#define repalloc_array(pointer, type, count)
Definition fe_memutils.h:78
#define palloc_array(type, count)
Definition fe_memutils.h:76
#define palloc0_object(type)
Definition fe_memutils.h:75
void FileSetInit(FileSet *fileset)
Definition fileset.c:52
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition fmgr.c:1772
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition fmgr.c:1754
struct Latch * MyLatch
Definition globals.c:63
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
bool parse_int(const char *value, int *result, int flags, const char **hintmsg)
Definition guc.c:2743
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition guc.c:4196
@ PGC_S_OVERRIDE
Definition guc.h:123
@ PGC_S_SESSION
Definition guc.h:126
@ PGC_SUSET
Definition guc.h:78
@ PGC_SIGHUP
Definition guc.h:75
@ PGC_BACKEND
Definition guc.h:77
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
static void * GETSTRUCT(const HeapTupleData *tuple)
static void dlist_delete(dlist_node *node)
Definition ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition ilist.h:336
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition ilist.h:364
#define DLIST_STATIC_INIT(name)
Definition ilist.h:281
#define dlist_container(type, membername, ptr)
Definition ilist.h:593
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:133
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
long val
Definition informix.c:689
#define write(a, b, c)
Definition win32.h:14
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void AcceptInvalidationMessages(void)
Definition inval.c:930
void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)
Definition inval.c:1816
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
void proc_exit(int code)
Definition ipc.c:105
int i
Definition isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
void ResetLatch(Latch *latch)
Definition latch.c:374
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:294
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition launcher.c:747
void logicalrep_worker_attach(int slot)
Definition launcher.c:758
void ApplyLauncherWakeup(void)
Definition launcher.c:1195
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition launcher.c:259
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:724
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:57
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1155
List * lappend(List *list, void *datum)
Definition list.c:339
List * lappend_oid(List *list, Oid datum)
Definition list.c:375
List * list_append_unique_oid(List *list, Oid datum)
Definition list.c:1380
bool list_member_oid(const List *list, Oid datum)
Definition list.c:722
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
int LOCKMODE
Definition lockdefs.h:26
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define AccessShareLock
Definition lockdefs.h:36
#define RowExclusiveLock
Definition lockdefs.h:38
@ LockTupleExclusive
Definition lockoptions.h:59
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
#define LOGICALREP_COLUMN_UNCHANGED
LogicalRepMsgType
@ LOGICAL_REP_MSG_INSERT
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_BEGIN
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_COMMIT
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_TYPE
@ LOGICAL_REP_MSG_DELETE
@ LOGICAL_REP_MSG_STREAM_COMMIT
@ LOGICAL_REP_MSG_ORIGIN
@ LOGICAL_REP_MSG_UPDATE
uint32 LogicalRepRelId
#define LOGICALREP_PROTO_VERSION_NUM
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
char * get_rel_name(Oid relid)
Definition lsyscache.c:2078
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition lsyscache.c:3026
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3518
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition lsyscache.c:3092
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
@ LW_SHARED
Definition lwlock.h:113
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1768
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
MemoryContext TopTransactionContext
Definition mcxt.c:171
char * pstrdup(const char *in)
Definition mcxt.c:1781
void * repalloc(void *pointer, Size size)
Definition mcxt.c:1632
void pfree(void *pointer)
Definition mcxt.c:1616
MemoryContext TopMemoryContext
Definition mcxt.c:166
void * palloc(Size size)
Definition mcxt.c:1387
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define RESUME_INTERRUPTS()
Definition miscadmin.h:136
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define HOLD_INTERRUPTS()
Definition miscadmin.h:134
Oid GetUserId(void)
Definition miscinit.c:470
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:989
CmdType
Definition nodes.h:273
@ CMD_INSERT
Definition nodes.h:277
@ CMD_DELETE
Definition nodes.h:278
@ CMD_UPDATE
Definition nodes.h:276
#define makeNode(_type_)
Definition nodes.h:161
static char * errmsg
ObjectType get_relkind_objtype(char relkind)
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:263
ReplOriginXactState replorigin_xact_state
Definition origin.c:167
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:232
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1329
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1353
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1147
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
#define ACL_DELETE
Definition parsenodes.h:79
uint64 AclMode
Definition parsenodes.h:74
#define ACL_INSERT
Definition parsenodes.h:76
#define ACL_UPDATE
Definition parsenodes.h:78
@ RTE_RELATION
@ DROP_RESTRICT
#define ACL_SELECT
Definition parsenodes.h:77
#define ACL_TRUNCATE
Definition parsenodes.h:80
int16 attnum
FormData_pg_attribute * Form_pg_attribute
static uint32 pg_ceil_log2_32(uint32 num)
static PgChecksumMode mode
#define NAMEDATALEN
#define MAXPGPATH
size_t len
static int server_version
Definition pg_dumpall.c:122
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
#define lfirst(lc)
Definition pg_list.h:172
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:212
static void * list_nth(const List *list, int n)
Definition pg_list.h:299
#define lfirst_oid(lc)
Definition pg_list.h:174
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
void FreeSubscription(Subscription *sub)
Subscription * GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
void DisableSubscription(Oid subid)
void UpdateDeadTupleRetentionStatus(Oid subid, bool active)
typedef FormData_pg_subscription * Form_pg_subscription
static char buf[DEFAULT_XLOG_SEG_SIZE]
long pgstat_report_stat(bool force)
Definition pgstat.c:704
void pgstat_report_subscription_error(Oid subid)
int64 timestamp
Expr * expression_planner(Expr *expr)
Definition planner.c:6819
#define pqsignal
Definition port.h:547
int pgsocket
Definition port.h:29
#define snprintf
Definition port.h:260
#define PGINVALID_SOCKET
Definition port.h:31
off_t pgoff_t
Definition port.h:421
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
static int32 DatumGetInt32(Datum X)
Definition postgres.h:212
#define InvalidOid
unsigned int Oid
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition pqformat.c:414
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition procarray.c:2837
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition proto.c:98
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition proto.c:561
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition proto.c:325
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition proto.c:134
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition proto.c:757
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition proto.c:487
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition proto.c:615
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition proto.c:1187
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition proto.c:63
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition proto.c:267
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition proto.c:698
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition proto.c:1212
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition proto.c:365
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition proto.c:1132
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition proto.c:428
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition proto.c:228
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition proto.c:1082
#define PqReplMsg_WALData
Definition protocol.h:77
#define PqReplMsg_PrimaryStatusRequest
Definition protocol.h:83
#define PqReplMsg_Keepalive
Definition protocol.h:75
#define PqReplMsg_PrimaryStatusUpdate
Definition protocol.h:76
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84
static color newsub(struct colormap *cm, color co)
Definition regc_color.c:389
#define RelationGetRelid(relation)
Definition rel.h:514
#define RelationIsLogicallyLogged(relation)
Definition rel.h:710
#define RelationGetDescr(relation)
Definition rel.h:540
#define RelationGetRelationName(relation)
Definition rel.h:548
#define RELATION_IS_OTHER_TEMP(relation)
Definition rel.h:667
#define RelationGetNamespace(relation)
Definition rel.h:555
List * RelationGetIndexList(Relation relation)
Definition relcache.c:4831
ResourceOwner TopTransactionResourceOwner
Definition resowner.c:175
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
Node * build_column_default(Relation rel, int attrno)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition rls.c:52
@ RLS_ENABLED
Definition rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition relation.c:585
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition relation.c:647
bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
Definition relation.c:835
Oid GetRelationIdentityOrPK(Relation rel)
Definition relation.c:905
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition relation.c:165
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition relation.c:518
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition relation.c:362
StringInfo makeStringInfo(void)
Definition stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition stringinfo.h:157
TransactionId remote_xid
Definition worker.c:332
LogicalRepMsgType command
Definition worker.c:327
XLogRecPtr finish_lsn
Definition worker.c:333
LogicalRepRelMapEntry * rel
Definition worker.c:328
ResultRelInfo * targetRelInfo
Definition worker.c:317
EState * estate
Definition worker.c:314
PartitionTupleRouting * proute
Definition worker.c:321
ModifyTableState * mtstate
Definition worker.c:320
LogicalRepRelMapEntry * targetRel
Definition worker.c:316
uint32 nsubxacts
Definition worker.c:541
uint32 nsubxacts_max
Definition worker.c:542
SubXactInfo * subxacts
Definition worker.c:544
TransactionId subxact_last
Definition worker.c:543
int maplen
Definition attmap.h:37
AttrNumber * attnums
Definition attmap.h:36
List * es_rteperminfos
Definition execnodes.h:671
List * es_tupleTable
Definition execnodes.h:715
List * es_opened_result_relations
Definition execnodes.h:691
CommandId es_output_cid
Definition execnodes.h:685
struct ErrorContextCallback * previous
Definition elog.h:297
void(* callback)(void *arg)
Definition elog.h:298
dlist_node node
Definition worker.c:305
XLogRecPtr remote_end
Definition worker.c:307
XLogRecPtr local_end
Definition worker.c:306
List
Definition pg_list.h:54
LogicalRepRelation remoterel
TimestampTz last_recv_time
LogicalRepWorkerType type
TimestampTz reply_time
FileSet * stream_fileset
TransactionId oldest_nonremovable_xid
TimestampTz last_send_time
ResultRelInfo * resultRelInfo
Definition execnodes.h:1411
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
Plan * plan
Definition execnodes.h:1168
EState * state
Definition execnodes.h:1170
Form_pg_class rd_rel
Definition rel.h:111
ReplOriginId origin
Definition origin.h:45
XLogRecPtr origin_lsn
Definition origin.h:46
TimestampTz origin_timestamp
Definition origin.h:47
TimestampTz flushpos_update_time
Definition worker.c:434
FullTransactionId remote_oldestxid
Definition worker.c:414
FullTransactionId remote_wait_for
Definition worker.c:430
TimestampTz last_recv_time
Definition worker.c:445
TimestampTz candidate_xid_time
Definition worker.c:446
long table_sync_wait_time
Definition worker.c:438
FullTransactionId remote_nextxid
Definition worker.c:421
RetainDeadTuplesPhase phase
Definition worker.c:405
XLogRecPtr remote_lsn
Definition worker.c:406
TimestampTz reply_time
Definition worker.c:423
TransactionId candidate_xid
Definition worker.c:432
TransactionId xid
Definition worker.c:533
pgoff_t offset
Definition worker.c:535
int fileno
Definition worker.c:534
XLogRecPtr skiplsn
AttrMap * attrMap
Definition tupconvert.h:28
TupleDesc tts_tupleDescriptor
Definition tuptable.h:122
bool * tts_isnull
Definition tuptable.h:126
Datum * tts_values
Definition tuptable.h:124
dlist_node * cur
Definition ilist.h:200
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
void ProcessSyncingRelations(XLogRecPtr current_lsn)
Definition syncutils.c:156
void InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
Definition syncutils.c:101
#define FirstLowInvalidHeapAttributeNumber
Definition sysattr.h:27
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:220
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition tableam.c:92
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
Definition tablecmds.c:1991
bool AllTablesyncsReady(void)
Definition tablesync.c:1597
bool HasSubscriptionTablesCached(void)
Definition tablesync.c:1627
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition tablesync.c:1648
#define InvalidTransactionId
Definition transam.h:31
#define FullTransactionIdPrecedesOrEquals(a, b)
Definition transam.h:52
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
static FullTransactionId FullTransactionIdFromU64(uint64 value)
Definition transam.h:81
#define TransactionIdEquals(id1, id2)
Definition transam.h:43
#define TransactionIdIsValid(xid)
Definition transam.h:41
#define InvalidFullTransactionId
Definition transam.h:56
#define FullTransactionIdIsValid(x)
Definition transam.h:55
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
void AfterTriggerEndQuery(EState *estate)
Definition trigger.c:5135
void AfterTriggerBeginQuery(void)
Definition trigger.c:5115
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition tupconvert.c:103
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition tupconvert.c:193
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:160
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:175
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:457
static void slot_getallattrs(TupleTableSlot *slot)
Definition tuptable.h:371
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition tuptable.h:524
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
Definition twophase.c:2749
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition twophase.c:2690
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition twophase.c:1499
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition usercontext.c:87
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
const char * type
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
static StringInfoData reply_message
int wal_receiver_status_interval
Definition walreceiver.c:90
int wal_receiver_timeout
Definition walreceiver.c:91
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_send(conn, buffer, nbytes)
#define walrcv_server_version(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_identify_system(conn, primary_tli)
#define walrcv_receive(conn, buffer, wait_fd)
int WalWriterDelay
Definition walwriter.c:71
#define SIGHUP
Definition win32_port.h:158
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED
static bool am_parallel_apply_worker(void)
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_SEQUENCESYNC
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
@ FS_SERIALIZE_DONE
static bool am_sequencesync_worker(void)
static bool am_tablesync_worker(void)
static bool am_leader_apply_worker(void)
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5012
bool PrepareTransactionBlock(const char *gid)
Definition xact.c:4015
bool IsTransactionState(void)
Definition xact.c:389
void CommandCounterIncrement(void)
Definition xact.c:1102
void StartTransactionCommand(void)
Definition xact.c:3081
void SetCurrentStatementStartTimestamp(void)
Definition xact.c:916
bool IsTransactionBlock(void)
Definition xact.c:4994
void BeginTransactionBlock(void)
Definition xact.c:3947
void CommitTransactionCommand(void)
Definition xact.c:3179
bool EndTransactionBlock(bool chain)
Definition xact.c:4067
void AbortOutOfAnyTransaction(void)
Definition xact.c:4885
CommandId GetCurrentCommandId(bool used)
Definition xact.c:831
#define GIDSIZE
Definition xact.h:31
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6609
XLogRecPtr XactLastCommitEnd
Definition xlog.c:259
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
uint32 TimeLineID
Definition xlogdefs.h:63