PostgreSQL Source Code git master
worker.c
/*-------------------------------------------------------------------------
 * worker.c
 *    PostgreSQL logical replication worker (apply)
 *
 * Copyright (c) 2016-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *    src/backend/replication/logical/worker.c
 *
 * NOTES
 *    This file contains the worker which applies logical changes as they come
 *    from the remote logical replication stream.
 *
 *    The main worker (apply) is started by the logical replication worker
 *    launcher for every enabled subscription in a database. It uses the
 *    walsender protocol to communicate with the publisher.
 *
 *    This module includes server-facing code and shares the libpqwalreceiver
 *    module with walreceiver for providing the libpq-specific functionality.
 *
 *
 * STREAMED TRANSACTIONS
 * ---------------------
 * Streamed transactions (large transactions exceeding a memory limit on the
 * upstream) are applied using one of two approaches:
 *
 * 1) Write to temporary files and apply when the final commit arrives
 *
 * This approach is used when the user has set the subscription's streaming
 * option to on.
 *
 * Unlike the regular (non-streamed) case, handling streamed transactions has
 * to handle aborts of both the toplevel transaction and subtransactions. This
 * is achieved by tracking offsets for subtransactions, which are then used
 * to truncate the file with serialized changes.
 *
 * The files are placed in the tmp file directory by default, and the filenames
 * include both the XID of the toplevel transaction and the OID of the
 * subscription. This is necessary so that different workers processing a
 * remote transaction with the same XID don't interfere with each other.
 *
 * We use BufFiles instead of normal temporary files because the BufFile
 * infrastructure (a) supports temporary files that exceed the OS file size
 * limit, (b) provides automatic cleanup on error, and (c) lets these files
 * survive across local transactions, so that they can be opened at stream
 * start and closed at stream stop. We use the FileSet infrastructure because
 * without it a BufFile is deleted as soon as it is closed, and keeping the
 * stream files open across stream start/stop would consume a lot of memory
 * (more than 8K for each BufFile, and there could be many such BufFiles,
 * since the subscriber can receive multiple start/stop streams for different
 * transactions before getting the commit). Moreover, without FileSet we would
 * need to invent a new way to pass filenames to the BufFile APIs so that we
 * could open the desired file across multiple stream-open calls for the same
 * transaction.
 *
 * 2) Parallel apply workers.
 *
 * This approach is used when the user has set the subscription's streaming
 * option to parallel. See logical/applyparallelworker.c for information about
 * this approach.
 *
 * TWO_PHASE TRANSACTIONS
 * ----------------------
 * Two-phase transactions are replayed at prepare and then committed or
 * rolled back at commit prepared and rollback prepared, respectively. It is
 * possible for a prepared transaction to arrive at the apply worker while
 * the tablesync worker is busy doing the initial copy. In this case, the apply
 * worker skips all the prepared operations [e.g. inserts] while the tablesync
 * is still busy (see the condition of should_apply_changes_for_rel). The
 * tablesync worker might not get such a prepared transaction because, say, it
 * was prior to the initial consistent point, but it might have got some later
 * commits. The tablesync worker will then exit without doing anything for the
 * prepared transaction skipped by the apply worker, as its sync location
 * will already be ahead of the apply worker's current location. This would
 * lead to an "empty prepare", because later, when the apply worker does the
 * commit prepared, there is nothing in it (the inserts were skipped earlier).
 *
 * To avoid this and similar prepare confusions, the subscription's two_phase
 * commit is enabled only after the initial sync is over. The two_phase option
 * has been implemented as a tri-state with values DISABLED, PENDING, and
 * ENABLED.
 *
 * Even if the user specifies they want a subscription with two_phase = on,
 * internally it will start with a tri-state of PENDING, which only becomes
 * ENABLED after all tablesync initializations are completed, i.e. when all
 * tablesync workers have reached their READY state. In other words, the value
 * PENDING is only a temporary state for subscription start-up.
 *
 * Until two_phase is properly available (ENABLED), the subscription will
 * behave as if two_phase = off. When the apply worker detects that all
 * tablesyncs have become READY (while the tri-state was PENDING), it will
 * restart the apply worker process. This happens in
 * process_syncing_tables_for_apply.
 *
 * When the (re-started) apply worker finds that all tablesyncs are READY for a
 * two_phase tri-state of PENDING, it starts streaming messages with the
 * two_phase option, which in turn enables the decoding of two-phase commits at
 * the publisher. Then it updates the tri-state value from PENDING to ENABLED.
 * It is possible that, while two_phase was not yet enabled, the publisher
 * (replication server) skipped some prepares, but we ensure that such
 * prepares are sent along with commit prepared; see
 * ReorderBufferFinishPrepared.
 *
 * If the subscription has no tables, then a two_phase tri-state of PENDING is
 * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
 * PUBLICATION, which might otherwise be disallowed (see below).
 *
 * If a user ever needs to be aware of the tri-state value, they can fetch it
 * from the pg_subscription catalog (see column subtwophasestate).
 *
 * We don't allow toggling the two_phase option of a subscription because it
 * can lead to an inconsistent replica. Consider the case where it was
 * initially on and we received some prepare, and then we turn it off: at
 * commit time the server will send the entire transaction data along with the
 * commit. With some more analysis, we could allow changing this option from
 * off to on, but it is not clear that this alone would be useful.
 *
 * Finally, to avoid the problems mentioned in the previous paragraphs arising
 * from any subsequent (not READY) tablesyncs (which would need the two_phase
 * option toggled from 'on' to 'off' and then back to 'on'), there is a
 * restriction on ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not
 * permitted when the two_phase tri-state is ENABLED, except when
 * copy_data = false.
 *
 * We can legitimately receive a prepare with the same GID more than once when
 * multiple subscriptions are defined for publications on the same server and
 * the prepared transaction has operations on tables subscribed to by those
 * subscriptions. In such cases, if we used the GID sent by the publisher, one
 * of the prepares would succeed and the others would fail, in which case the
 * server would send them again. This can lead to a deadlock if the user has
 * set synchronous_standby_names for all the subscriptions on the subscriber.
 * To avoid such deadlocks, we generate a unique GID (consisting of the
 * subscription oid and the xid of the prepared transaction) for each prepared
 * transaction on the subscriber.
 *
 * FAILOVER
 * --------
 * The logical slot on the primary can be synced to the standby by specifying
 * failover = true when creating the subscription. Enabling failover allows us
 * to smoothly transition to the promoted standby, ensuring that we can
 * subscribe to the new primary without losing any data.
 *-------------------------------------------------------------------------
 */
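
/*
 * Illustration (not part of worker.c): a minimal sketch of how a
 * subscriber-local GID could be built from the subscription OID and the
 * XID of the prepared transaction, as the TWO_PHASE notes above describe.
 * DemoOid, DemoXid, demo_make_gid and the "pg_gid_" prefix are assumptions
 * made for this example; they are not taken from this file.
 */

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins for the PostgreSQL Oid and TransactionId types. */
typedef unsigned int DemoOid;
typedef unsigned int DemoXid;

/*
 * Build a GID that is unique per (subscription, remote transaction) pair.
 * Returns 0 on success, or -1 if the result did not fit into the buffer.
 */
int
demo_make_gid(DemoOid subid, DemoXid xid, char *gid, size_t szgid)
{
	int			n;

	assert(gid != NULL);

	n = snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);

	return (n < 0 || (size_t) n >= szgid) ? -1 : 0;
}
```

/*
 * Two subscriptions that receive the same remote XID get different GIDs
 * under this scheme, so their local prepares cannot collide with each other.
 */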
144
145#include "postgres.h"
146
147#include <sys/stat.h>
148#include <unistd.h>
149
150#include "access/table.h"
151#include "access/tableam.h"
152#include "access/twophase.h"
153#include "access/xact.h"
154#include "catalog/indexing.h"
155#include "catalog/pg_inherits.h"
158#include "commands/tablecmds.h"
159#include "commands/trigger.h"
160#include "executor/executor.h"
162#include "libpq/pqformat.h"
163#include "miscadmin.h"
164#include "optimizer/optimizer.h"
166#include "pgstat.h"
167#include "postmaster/bgworker.h"
168#include "postmaster/interrupt.h"
169#include "postmaster/walwriter.h"
170#include "replication/conflict.h"
175#include "replication/origin.h"
179#include "storage/buffile.h"
180#include "storage/ipc.h"
181#include "storage/lmgr.h"
182#include "tcop/tcopprot.h"
183#include "utils/acl.h"
184#include "utils/dynahash.h"
185#include "utils/guc.h"
186#include "utils/inval.h"
187#include "utils/lsyscache.h"
188#include "utils/memutils.h"
189#include "utils/pg_lsn.h"
190#include "utils/rel.h"
191#include "utils/rls.h"
192#include "utils/snapmgr.h"
193#include "utils/syscache.h"
194#include "utils/usercontext.h"
195
196#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
197
198typedef struct FlushPosition
199{
204
206
207typedef struct ApplyExecutionData
208{
209 EState *estate; /* executor state, used to track resources */
210
211 LogicalRepRelMapEntry *targetRel; /* replication target rel */
212 ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
213
214 /* These fields are used when the target relation is partitioned: */
215 ModifyTableState *mtstate; /* dummy ModifyTable state */
216 PartitionTupleRouting *proute; /* partition routing info */
218
219/* Struct for saving and restoring apply errcontext information */
221{
222 LogicalRepMsgType command; /* 0 if invalid */
224
225 /* Remote node information */
226 int remote_attnum; /* -1 if invalid */
231
232/*
233 * The action to be taken for the changes in the transaction.
234 *
235 * TRANS_LEADER_APPLY:
236 * This action means that we are in the leader apply worker or table sync
237 * worker. The changes of the transaction are either directly applied or
238 * are read from temporary files (for streaming transactions) and then
239 * applied by the worker.
240 *
241 * TRANS_LEADER_SERIALIZE:
242 * This action means that we are in the leader apply worker or table sync
243 * worker. Changes are written to temporary files and then applied when the
244 * final commit arrives.
245 *
246 * TRANS_LEADER_SEND_TO_PARALLEL:
247 * This action means that we are in the leader apply worker and need to send
248 * the changes to the parallel apply worker.
249 *
250 * TRANS_LEADER_PARTIAL_SERIALIZE:
251 * This action means that we are in the leader apply worker and have sent some
252 * changes directly to the parallel apply worker and the remaining changes are
253 * serialized to a file, due to timeout while sending data. The parallel apply
254 * worker will apply these serialized changes when the final commit arrives.
255 *
256 * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
257 * serializing changes, the leader worker also needs to serialize the
258 * STREAM_XXX message to a file, and wait for the parallel apply worker to
259 * finish the transaction when processing the transaction finish command. So
260 * this new action was introduced to keep the code and logic clear.
261 *
262 * TRANS_PARALLEL_APPLY:
263 * This action means that we are in the parallel apply worker and changes of
264 * the transaction are applied directly by the worker.
265 */
266typedef enum
267{
268 /* The action for non-streaming transactions. */
270
271 /* Actions for streaming transactions. */
277
278/* errcontext tracker */
280{
281 .command = 0,
282 .rel = NULL,
283 .remote_attnum = -1,
284 .remote_xid = InvalidTransactionId,
285 .finish_lsn = InvalidXLogRecPtr,
286 .origin_name = NULL,
287};
288
290
293
294/* per stream context for streaming transactions */
296
298
300static bool MySubscriptionValid = false;
301
303
306
307/* fields valid only when processing streamed transaction */
308static bool in_streamed_transaction = false;
309
311
312/*
313 * The number of changes applied by parallel apply worker during one streaming
314 * block.
315 */
317
318/* Are we initializing an apply worker? */
320
/*
 * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
 * the subscription if the remote transaction's finish LSN matches the
 * subskiplsn. Once we start skipping changes, we don't stop until we have
 * skipped all changes of the transaction, even if pg_subscription is updated
 * and MySubscription->skiplsn gets changed or reset in the meantime. Also, in
 * streaming transaction cases (streaming = on), we don't skip receiving and
 * spooling the changes, since we decide whether to skip applying them only
 * when we start to apply them. The subskiplsn is cleared after successfully
 * skipping the transaction or applying a non-empty transaction. The latter
 * prevents a mistakenly specified subskiplsn from being left in place. Note
 * that we cannot skip streaming transactions when using parallel apply
 * workers, because we cannot get the finish LSN before applying the changes.
 * So, we don't start a parallel apply worker when the finish LSN is set by
 * the user.
 */
337#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
338
339/* BufFile handle of the current streaming file */
340static BufFile *stream_fd = NULL;
341
342typedef struct SubXactInfo
343{
344 TransactionId xid; /* XID of the subxact */
345 int fileno; /* file number in the buffile */
346 off_t offset; /* offset in the file */
348
349/* Sub-transaction data for the current streaming transaction */
350typedef struct ApplySubXactData
351{
352 uint32 nsubxacts; /* number of sub-transactions */
353 uint32 nsubxacts_max; /* current capacity of subxacts */
354 TransactionId subxact_last; /* xid of the last sub-transaction */
355 SubXactInfo *subxacts; /* sub-xact offset in changes file */
357
359
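
/*
 * Illustration (not part of worker.c): a minimal sketch of the idea behind
 * SubXactInfo, i.e. remembering the offset at which each subtransaction's
 * first change was spooled so that an abort can truncate the spool back to
 * that point. All Demo* names are hypothetical, and the sketch tracks a
 * single in-memory length instead of the (fileno, offset) pair of a real
 * BufFile. As a further simplification it records every XID it sees,
 * including the toplevel one.
 */

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_MAX_SUBXACTS 16

typedef struct DemoSubXact
{
	unsigned int xid;			/* XID of the subtransaction */
	long		offset;			/* spool offset of its first change */
} DemoSubXact;

typedef struct DemoSpool
{
	long		length;			/* current length of the spooled changes */
	size_t		nsubxacts;		/* number of entries used in subxacts[] */
	DemoSubXact subxacts[DEMO_MAX_SUBXACTS];
} DemoSpool;

/*
 * Append a change of 'len' bytes made by 'xid'.  The start offset is
 * remembered only the first time an XID is seen.  Returns 0 on success,
 * or -1 if the tracking array is full.
 */
int
demo_spool_add(DemoSpool *sp, unsigned int xid, long len)
{
	size_t		i;

	assert(sp != NULL);

	for (i = 0; i < sp->nsubxacts; i++)
		if (sp->subxacts[i].xid == xid)
			break;

	if (i == sp->nsubxacts)
	{
		if (sp->nsubxacts == DEMO_MAX_SUBXACTS)
			return -1;
		sp->subxacts[i].xid = xid;
		sp->subxacts[i].offset = sp->length;
		sp->nsubxacts++;
	}

	sp->length += len;
	return 0;
}

/*
 * Abort 'xid': truncate the spool to where that subtransaction started and
 * forget it together with every subtransaction recorded after it.
 * Returns 1 if the XID was known, 0 otherwise.
 */
int
demo_spool_abort(DemoSpool *sp, unsigned int xid)
{
	size_t		i;

	assert(sp != NULL);

	for (i = 0; i < sp->nsubxacts; i++)
	{
		if (sp->subxacts[i].xid == xid)
		{
			sp->length = sp->subxacts[i].offset;
			sp->nsubxacts = i;
			return 1;
		}
	}
	return 0;
}
```

/*
 * Aborting a subtransaction discards its own changes and those of any
 * subtransaction that started later, which is what truncating at the
 * recorded offset achieves.
 */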
360static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
361static inline void changes_filename(char *path, Oid subid, TransactionId xid);
362
363/*
364 * Information about subtransactions of a given toplevel transaction.
365 */
366static void subxact_info_write(Oid subid, TransactionId xid);
367static void subxact_info_read(Oid subid, TransactionId xid);
368static void subxact_info_add(TransactionId xid);
369static inline void cleanup_subxact_info(void);
370
371/*
372 * Serialize and deserialize changes for a toplevel transaction.
373 */
374static void stream_open_file(Oid subid, TransactionId xid,
375 bool first_segment);
376static void stream_write_change(char action, StringInfo s);
378static void stream_close_file(void);
379
380static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
381
382static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
384 ResultRelInfo *relinfo,
385 TupleTableSlot *remoteslot);
387 ResultRelInfo *relinfo,
388 TupleTableSlot *remoteslot,
389 LogicalRepTupleData *newtup,
390 Oid localindexoid);
392 ResultRelInfo *relinfo,
393 TupleTableSlot *remoteslot,
394 Oid localindexoid);
395static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
396 LogicalRepRelation *remoterel,
397 Oid localidxoid,
398 TupleTableSlot *remoteslot,
399 TupleTableSlot **localslot);
401 TupleTableSlot *remoteslot,
402 LogicalRepTupleData *newtup,
403 CmdType operation);
404
405/* Functions for skipping changes */
406static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
407static void stop_skipping_changes(void);
408static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
409
410/* Functions for apply error callback */
411static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
412static inline void reset_apply_error_context_info(void);
413
416
417/*
418 * Form the origin name for the subscription.
419 *
420 * This is a common function for tablesync and other workers. Tablesync workers
421 * must pass a valid relid. Other callers must pass relid = InvalidOid.
422 *
423 * Return the name in the supplied buffer.
424 */
425void
427 char *originname, Size szoriginname)
428{
429 if (OidIsValid(relid))
430 {
431 /* Replication origin name for tablesync workers. */
432 snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
433 }
434 else
435 {
436 /* Replication origin name for non-tablesync workers. */
437 snprintf(originname, szoriginname, "pg_%u", suboid);
438 }
439}
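
/*
 * Illustration (not part of worker.c): a standalone sketch of the naming
 * rule used above, so the two output formats can be tried outside the
 * server. DemoOid, DEMO_INVALID_OID and demo_origin_name are hypothetical
 * names; only the "pg_%u_%u" and "pg_%u" formats come from the function
 * above.
 */

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the PostgreSQL Oid type. */
typedef unsigned int DemoOid;

#define DEMO_INVALID_OID ((DemoOid) 0)

/*
 * Write the origin name into 'buf'.  A valid relid selects the per-table
 * (tablesync) form, an invalid one the per-subscription form.
 */
void
demo_origin_name(DemoOid suboid, DemoOid relid, char *buf, size_t bufsz)
{
	assert(buf != NULL);

	if (relid != DEMO_INVALID_OID)
		snprintf(buf, bufsz, "pg_%u_%u", suboid, relid);
	else
		snprintf(buf, bufsz, "pg_%u", suboid);
}
```

/*
 * With suboid = 16394, a tablesync worker for relid 16385 would get
 * "pg_16394_16385", while every other caller would get "pg_16394".
 */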
440
/*
 * Should this worker apply changes for the given relation?
 *
 * This is mainly needed for the initial relation data sync, as that runs in
 * a separate worker process running in parallel, and we need some way to skip
 * changes coming to the leader apply worker during the sync of a table.
 *
 * Note that we need a less-than-or-equal comparison for the SYNCDONE state,
 * because it might hold the position of the end of the initial slot
 * consistent point WAL record + 1 (i.e. the start of the next record), and
 * the next record can be the COMMIT of the transaction we are now processing
 * (which is what we set remote_final_lsn to in apply_handle_begin).
 *
 * Note that for streaming transactions that are being applied in the parallel
 * apply worker, we disallow applying changes if the target table in the
 * subscription is not in the READY state, because we cannot decide whether to
 * apply the change as we won't know remote_final_lsn by that time.
 *
 * We already checked this in pa_can_start() before assigning the
 * streaming transaction to the parallel worker, but it also needs to be
 * checked here, because if the user executes ALTER SUBSCRIPTION ... REFRESH
 * PUBLICATION in parallel, a new table can be added to pg_subscription_rel
 * while this transaction is being applied.
 */
465static bool
467{
468 switch (MyLogicalRepWorker->type)
469 {
471 return MyLogicalRepWorker->relid == rel->localreloid;
472
474 /* We don't synchronize rel's that are in unknown state. */
475 if (rel->state != SUBREL_STATE_READY &&
476 rel->state != SUBREL_STATE_UNKNOWN)
478 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
479 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
481 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
482
483 return rel->state == SUBREL_STATE_READY;
484
485 case WORKERTYPE_APPLY:
486 return (rel->state == SUBREL_STATE_READY ||
487 (rel->state == SUBREL_STATE_SYNCDONE &&
488 rel->statelsn <= remote_final_lsn));
489
491 /* Should never happen. */
492 elog(ERROR, "Unknown worker type");
493 }
494
495 return false; /* dummy for compiler */
496}
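
/*
 * Illustration (not part of worker.c): a minimal sketch of the rule used in
 * the WORKERTYPE_APPLY branch above, showing why the SYNCDONE comparison is
 * "<=" rather than "<". The Demo* types and the numeric LSN values are
 * hypothetical stand-ins chosen for this example.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for XLogRecPtr and the relation sync states. */
typedef uint64_t DemoLsn;

typedef enum DemoRelState
{
	DEMO_STATE_INIT,
	DEMO_STATE_SYNCDONE,
	DEMO_STATE_READY
} DemoRelState;

/*
 * Leader apply worker rule: apply if the table is READY, or if it is
 * SYNCDONE and the sync ended at or before the end of the transaction
 * currently being applied.
 */
bool
demo_apply_worker_should_apply(DemoRelState state, DemoLsn statelsn,
							   DemoLsn remote_final_lsn)
{
	return state == DEMO_STATE_READY ||
		(state == DEMO_STATE_SYNCDONE && statelsn <= remote_final_lsn);
}
```

/*
 * A SYNCDONE table whose statelsn equals remote_final_lsn is applied;
 * with a strict "<" the change would be skipped by both workers.
 */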
497
498/*
499 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
500 *
501 * Start a transaction, if this is the first step (else we keep using the
502 * existing transaction).
503 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
504 */
505static void
507{
509
510 if (!IsTransactionState())
511 {
514 }
515
517
519}
520
521/*
522 * Finish up one step of a replication transaction.
523 * Callers of begin_replication_step() must also call this.
524 *
525 * We don't close out the transaction here, but we should increment
526 * the command counter to make the effects of this step visible.
527 */
528static void
530{
532
534}
535
536/*
537 * Handle streamed transactions for both the leader apply worker and the
538 * parallel apply workers.
539 *
540 * In the streaming case (receiving a block of the streamed transaction), for
541 * serialize mode, simply redirect it to a file for the proper toplevel
542 * transaction, and for parallel mode, the leader apply worker will send the
543 * changes to parallel apply workers and the parallel apply worker will define
544 * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
545 * messages will be applied by both leader apply worker and parallel apply
546 * workers).
547 *
548 * Returns true for streamed transactions (when the change is either serialized
549 * to file or sent to parallel apply worker), false otherwise (regular mode or
550 * needs to be processed by parallel apply worker).
551 *
552 * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
553 * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
554 * to a parallel apply worker.
555 */
556static bool
558{
559 TransactionId current_xid;
561 TransApplyAction apply_action;
562 StringInfoData original_msg;
563
564 apply_action = get_transaction_apply_action(stream_xid, &winfo);
565
566 /* not in streaming mode */
567 if (apply_action == TRANS_LEADER_APPLY)
568 return false;
569
571
572 /*
573 * The parallel apply worker needs the xid in this message to decide
574 * whether to define a savepoint, so save the original message that has
575 * not moved the cursor after the xid. We will serialize this message to a
576 * file in PARTIAL_SERIALIZE mode.
577 */
578 original_msg = *s;
579
580 /*
581 * We should have received XID of the subxact as the first part of the
582 * message, so extract it.
583 */
584 current_xid = pq_getmsgint(s, 4);
585
586 if (!TransactionIdIsValid(current_xid))
588 (errcode(ERRCODE_PROTOCOL_VIOLATION),
589 errmsg_internal("invalid transaction ID in streamed replication transaction")));
590
591 switch (apply_action)
592 {
595
596 /* Add the new subxact to the array (unless already there). */
597 subxact_info_add(current_xid);
598
599 /* Write the change to the current file */
601 return true;
602
604 Assert(winfo);
605
606 /*
607 * XXX The publisher side doesn't always send relation/type update
608 * messages after the streaming transaction, so also update the
609 * relation/type in leader apply worker. See function
610 * cleanup_rel_sync_cache.
611 */
612 if (pa_send_data(winfo, s->len, s->data))
613 return (action != LOGICAL_REP_MSG_RELATION &&
615
616 /*
617 * Switch to serialize mode when we are not able to send the
618 * change to parallel apply worker.
619 */
620 pa_switch_to_partial_serialize(winfo, false);
621
622 /* fall through */
624 stream_write_change(action, &original_msg);
625
626 /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
627 return (action != LOGICAL_REP_MSG_RELATION &&
629
632
633 /* Define a savepoint for a subxact if needed. */
634 pa_start_subtrans(current_xid, stream_xid);
635 return false;
636
637 default:
638 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
639 return false; /* silence compiler warning */
640 }
641}
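
/*
 * Illustration (not part of worker.c): a minimal sketch of the return-value
 * rule documented for handle_streamed_transaction(), with all side effects
 * (file writes, sends to the parallel worker, savepoints) left out. The
 * Demo* names and the message type characters 'R', 'Y' and 'I' are
 * assumptions made for this example.
 */

```c
#include <assert.h>
#include <stdbool.h>

typedef enum DemoApplyAction
{
	DEMO_LEADER_APPLY,
	DEMO_LEADER_SERIALIZE,
	DEMO_LEADER_SEND_TO_PARALLEL,
	DEMO_LEADER_PARTIAL_SERIALIZE,
	DEMO_PARALLEL_APPLY
} DemoApplyAction;

/* Hypothetical message type codes. */
#define DEMO_MSG_RELATION	'R'
#define DEMO_MSG_TYPE		'Y'
#define DEMO_MSG_INSERT		'I'

/*
 * Returns true if the caller has nothing more to do with the message,
 * false if the caller must still apply it locally.
 */
bool
demo_change_was_streamed(DemoApplyAction apply_action, char msgtype)
{
	switch (apply_action)
	{
		case DEMO_LEADER_SERIALIZE:
			/* Written to the spool file; nothing to apply now. */
			return true;

		case DEMO_LEADER_SEND_TO_PARALLEL:
		case DEMO_LEADER_PARTIAL_SERIALIZE:
			/* Relation and type messages are also applied by the leader. */
			return msgtype != DEMO_MSG_RELATION && msgtype != DEMO_MSG_TYPE;

		case DEMO_LEADER_APPLY:
		case DEMO_PARALLEL_APPLY:
		default:
			/* Not streamed, or this worker applies the change itself. */
			return false;
	}
}
```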
642
643/*
644 * Executor state preparation for evaluation of constraint expressions,
645 * indexes and triggers for the specified relation.
646 *
647 * Note that the caller must open and close any indexes to be updated.
648 */
649static ApplyExecutionData *
651{
652 ApplyExecutionData *edata;
653 EState *estate;
654 RangeTblEntry *rte;
655 List *perminfos = NIL;
656 ResultRelInfo *resultRelInfo;
657
658 edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
659 edata->targetRel = rel;
660
661 edata->estate = estate = CreateExecutorState();
662
663 rte = makeNode(RangeTblEntry);
664 rte->rtekind = RTE_RELATION;
665 rte->relid = RelationGetRelid(rel->localrel);
666 rte->relkind = rel->localrel->rd_rel->relkind;
667 rte->rellockmode = AccessShareLock;
668
669 addRTEPermissionInfo(&perminfos, rte);
670
671 ExecInitRangeTable(estate, list_make1(rte), perminfos);
672
673 edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
674
675 /*
676 * Use Relation opened by logicalrep_rel_open() instead of opening it
677 * again.
678 */
679 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
680
681 /*
682 * We put the ResultRelInfo in the es_opened_result_relations list, even
683 * though we don't populate the es_result_relations array. That's a bit
684 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
685 *
686 * ExecOpenIndices() is not called here either, each execution path doing
687 * an apply operation being responsible for that.
688 */
690 lappend(estate->es_opened_result_relations, resultRelInfo);
691
692 estate->es_output_cid = GetCurrentCommandId(true);
693
694 /* Prepare to catch AFTER triggers. */
696
697 /* other fields of edata remain NULL for now */
698
699 return edata;
700}
701
702/*
703 * Finish any operations related to the executor state created by
704 * create_edata_for_relation().
705 */
706static void
708{
709 EState *estate = edata->estate;
710
711 /* Handle any queued AFTER triggers. */
712 AfterTriggerEndQuery(estate);
713
714 /* Shut down tuple routing, if any was done. */
715 if (edata->proute)
716 ExecCleanupTupleRouting(edata->mtstate, edata->proute);
717
718 /*
719 * Cleanup. It might seem that we should call ExecCloseResultRelations()
720 * here, but we intentionally don't. It would close the rel we added to
721 * es_opened_result_relations above, which is wrong because we took no
722 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
723 * any other relations opened during execution.
724 */
725 ExecResetTupleTable(estate->es_tupleTable, false);
726 FreeExecutorState(estate);
727 pfree(edata);
728}
729
/*
 * Evaluates default values for local columns that cannot be mapped to
 * remote relation columns.
 *
 * This allows us to support tables which have more columns on the downstream
 * than on the upstream.
 */
737static void
739 TupleTableSlot *slot)
740{
742 int num_phys_attrs = desc->natts;
743 int i;
744 int attnum,
745 num_defaults = 0;
746 int *defmap;
747 ExprState **defexprs;
748 ExprContext *econtext;
749
750 econtext = GetPerTupleExprContext(estate);
751
752 /* We got all the data via replication, no need to evaluate anything. */
753 if (num_phys_attrs == rel->remoterel.natts)
754 return;
755
756 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
757 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
758
759 Assert(rel->attrmap->maplen == num_phys_attrs);
760 for (attnum = 0; attnum < num_phys_attrs; attnum++)
761 {
762 Expr *defexpr;
763
764 if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
765 continue;
766
767 if (rel->attrmap->attnums[attnum] >= 0)
768 continue;
769
770 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
771
772 if (defexpr != NULL)
773 {
774 /* Run the expression through planner */
775 defexpr = expression_planner(defexpr);
776
777 /* Initialize executable expression in copycontext */
778 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
779 defmap[num_defaults] = attnum;
780 num_defaults++;
781 }
782 }
783
784 for (i = 0; i < num_defaults; i++)
785 slot->tts_values[defmap[i]] =
786 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
787}
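
/*
 * Illustration (not part of worker.c): a minimal sketch of the default
 * filling idea above, using plain ints instead of Datums and precomputed
 * constants instead of planned expressions. A negative attrmap entry means
 * "no remote column", as in the function above; the Demo* names are
 * hypothetical.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct DemoColumn
{
	bool		dropped;		/* column was dropped locally */
	bool		has_default;	/* a default is defined for the column */
	int			default_value;	/* the default, if has_default */
} DemoColumn;

/*
 * Fill values[]/isnull[] for every live local column that has no remote
 * counterpart and has a default.  Columns without a default are left as
 * they were.  Returns the number of defaults stored.
 */
int
demo_fill_defaults(const DemoColumn *cols, const int *attrmap, size_t natts,
				   int *values, bool *isnull)
{
	int			nfilled = 0;
	size_t		i;

	assert(cols != NULL && attrmap != NULL);
	assert(values != NULL && isnull != NULL);

	for (i = 0; i < natts; i++)
	{
		if (cols[i].dropped)
			continue;			/* nothing to store in a dropped column */
		if (attrmap[i] >= 0)
			continue;			/* value came from the remote side */
		if (!cols[i].has_default)
			continue;			/* stays NULL */

		values[i] = cols[i].default_value;
		isnull[i] = false;
		nfilled++;
	}

	return nfilled;
}
```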
788
789/*
790 * Store tuple data into slot.
791 *
792 * Incoming data can be either text or binary format.
793 */
794static void
796 LogicalRepTupleData *tupleData)
797{
798 int natts = slot->tts_tupleDescriptor->natts;
799 int i;
800
801 ExecClearTuple(slot);
802
803 /* Call the "in" function for each non-dropped, non-null attribute */
804 Assert(natts == rel->attrmap->maplen);
805 for (i = 0; i < natts; i++)
806 {
808 int remoteattnum = rel->attrmap->attnums[i];
809
810 if (!att->attisdropped && remoteattnum >= 0)
811 {
812 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
813
814 Assert(remoteattnum < tupleData->ncols);
815
816 /* Set attnum for error callback */
818
819 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
820 {
821 Oid typinput;
822 Oid typioparam;
823
824 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
825 slot->tts_values[i] =
826 OidInputFunctionCall(typinput, colvalue->data,
827 typioparam, att->atttypmod);
828 slot->tts_isnull[i] = false;
829 }
830 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
831 {
832 Oid typreceive;
833 Oid typioparam;
834
835 /*
836 * In some code paths we may be asked to re-parse the same
837 * tuple data. Reset the StringInfo's cursor so that works.
838 */
839 colvalue->cursor = 0;
840
841 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
842 slot->tts_values[i] =
843 OidReceiveFunctionCall(typreceive, colvalue,
844 typioparam, att->atttypmod);
845
846 /* Trouble if it didn't eat the whole buffer */
847 if (colvalue->cursor != colvalue->len)
849 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
850 errmsg("incorrect binary data format in logical replication column %d",
851 remoteattnum + 1)));
852 slot->tts_isnull[i] = false;
853 }
854 else
855 {
856 /*
857 * NULL value from remote. (We don't expect to see
858 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
859 * NULL.)
860 */
861 slot->tts_values[i] = (Datum) 0;
862 slot->tts_isnull[i] = true;
863 }
864
865 /* Reset attnum for error callback */
867 }
868 else
869 {
870 /*
871 * We assign NULL to dropped attributes and missing values
872 * (missing values should be later filled using
873 * slot_fill_defaults).
874 */
875 slot->tts_values[i] = (Datum) 0;
876 slot->tts_isnull[i] = true;
877 }
878 }
879
881}
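
/*
 * Illustration (not part of worker.c): a minimal sketch of the binary
 * column handling above, i.e. reset the cursor, run a receive function, and
 * then reject the value if the function did not consume the whole buffer.
 * DemoBuf and the demo_* functions are hypothetical; a fixed big-endian
 * int32 reader stands in for a type's receive function.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct DemoBuf
{
	const unsigned char *data;	/* column bytes as received */
	size_t		len;			/* number of bytes in data */
	size_t		cursor;			/* read position */
} DemoBuf;

/* Read one big-endian int32.  Returns 0 on success, -1 on short input. */
int
demo_recv_int32(DemoBuf *buf, int32_t *out)
{
	const unsigned char *p;
	uint32_t	v;

	assert(buf != NULL && out != NULL);

	if (buf->cursor > buf->len || buf->len - buf->cursor < 4)
		return -1;

	p = buf->data + buf->cursor;
	v = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
		((uint32_t) p[2] << 8) | (uint32_t) p[3];
	*out = (int32_t) v;
	buf->cursor += 4;
	return 0;
}

/*
 * Parse a binary int32 column.  Returns 0 on success, or -1 if the data is
 * too short or has trailing bytes.
 */
int
demo_parse_binary_int32(DemoBuf *col, int32_t *out)
{
	/* The same buffer may be parsed more than once, so rewind first. */
	col->cursor = 0;

	if (demo_recv_int32(col, out) != 0)
		return -1;

	/* Trouble if the receive step did not eat the whole buffer. */
	if (col->cursor != col->len)
		return -1;

	return 0;
}
```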
882
883/*
884 * Replace updated columns with data from the LogicalRepTupleData struct.
885 * This is somewhat similar to heap_modify_tuple but also calls the type
886 * input functions on the user data.
887 *
888 * "slot" is filled with a copy of the tuple in "srcslot", replacing
889 * columns provided in "tupleData" and leaving others as-is.
890 *
891 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
892 * storage for "srcslot". This is OK for current usage, but someday we may
893 * need to materialize "slot" at the end to make it independent of "srcslot".
894 */
895static void
898 LogicalRepTupleData *tupleData)
899{
900 int natts = slot->tts_tupleDescriptor->natts;
901 int i;
902
903 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
904 ExecClearTuple(slot);
905
906 /*
907 * Copy all the column data from srcslot, so that we'll have valid values
908 * for unreplaced columns.
909 */
910 Assert(natts == srcslot->tts_tupleDescriptor->natts);
911 slot_getallattrs(srcslot);
912 memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
913 memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
914
915 /* Call the "in" function for each replaced attribute */
916 Assert(natts == rel->attrmap->maplen);
917 for (i = 0; i < natts; i++)
918 {
920 int remoteattnum = rel->attrmap->attnums[i];
921
922 if (remoteattnum < 0)
923 continue;
924
925 Assert(remoteattnum < tupleData->ncols);
926
927 if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
928 {
929 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
930
931 /* Set attnum for error callback */
933
934 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
935 {
936 Oid typinput;
937 Oid typioparam;
938
939 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
940 slot->tts_values[i] =
941 OidInputFunctionCall(typinput, colvalue->data,
942 typioparam, att->atttypmod);
943 slot->tts_isnull[i] = false;
944 }
945 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
946 {
947 Oid typreceive;
948 Oid typioparam;
949
950 /*
951 * In some code paths we may be asked to re-parse the same
952 * tuple data. Reset the StringInfo's cursor so that works.
953 */
954 colvalue->cursor = 0;
955
956 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
957 slot->tts_values[i] =
958 OidReceiveFunctionCall(typreceive, colvalue,
959 typioparam, att->atttypmod);
960
961 /* Trouble if it didn't eat the whole buffer */
962 if (colvalue->cursor != colvalue->len)
964 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
965 errmsg("incorrect binary data format in logical replication column %d",
966 remoteattnum + 1)));
967 slot->tts_isnull[i] = false;
968 }
969 else
970 {
971 /* must be LOGICALREP_COLUMN_NULL */
972 slot->tts_values[i] = (Datum) 0;
973 slot->tts_isnull[i] = true;
974 }
975
976 /* Reset attnum for error callback */
978 }
979 }
980
981 /* And finally, declare that "slot" contains a valid virtual tuple */
983}
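
/*
 * Illustrative sketch, not part of worker.c: the copy-then-overwrite pattern
 * used above, reduced to plain C. The types (struct row, struct remote_tuple),
 * the status codes and the use of strtol() as the "input function" are all
 * hypothetical stand-ins for TupleTableSlot, LogicalRepTupleData, the
 * LOGICALREP_COLUMN_* codes and the type input functions. Binary columns are
 * left out for brevity.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for the per-column status codes. */
enum col_status { COL_UNCHANGED, COL_NULL, COL_TEXT };

/* Hypothetical, simplified row: plain longs instead of Datums. */
#define NATTS 3
struct row
{
    long values[NATTS];
    bool isnull[NATTS];
};

/* Hypothetical remote tuple: one status and one text value per remote column. */
struct remote_tuple
{
    int ncols;
    const enum col_status *status;
    const char *const *text;
};

static void
modify_row(struct row *dst, const struct row *src,
           const int *attrmap, const struct remote_tuple *tup)
{
    /* Start from a full copy so unreplaced columns keep valid values. */
    memcpy(dst->values, src->values, sizeof(src->values));
    memcpy(dst->isnull, src->isnull, sizeof(src->isnull));

    for (int i = 0; i < NATTS; i++)
    {
        int remote = attrmap[i];

        if (remote < 0)
            continue;           /* local column has no remote counterpart */
        assert(remote < tup->ncols);

        switch (tup->status[remote])
        {
            case COL_UNCHANGED:
                break;          /* keep the value copied from src */
            case COL_TEXT:
                dst->values[i] = strtol(tup->text[remote], NULL, 10);
                dst->isnull[i] = false;
                break;
            case COL_NULL:
                dst->values[i] = 0;
                dst->isnull[i] = true;
                break;
        }
    }
}
```

/*
 * The point of the design is that only columns the publisher actually sent
 * are re-parsed; everything else is inherited from the source row.
 */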
984
985/*
986 * Handle BEGIN message.
987 */
988static void
990{
991 LogicalRepBeginData begin_data;
992
993 /* There must not be an active streaming transaction. */
995
996 logicalrep_read_begin(s, &begin_data);
997 set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
998
999 remote_final_lsn = begin_data.final_lsn;
1000
1002
1003 in_remote_transaction = true;
1004
1006}
1007
1008/*
1009 * Handle COMMIT message.
1010 *
1011 * TODO, support tracking of multiple origins
1012 */
1013static void
1015{
1016 LogicalRepCommitData commit_data;
1017
1018 logicalrep_read_commit(s, &commit_data);
1019
1020 if (commit_data.commit_lsn != remote_final_lsn)
1021 ereport(ERROR,
1022 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1023 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1024 LSN_FORMAT_ARGS(commit_data.commit_lsn),
1026
1027 apply_handle_commit_internal(&commit_data);
1028
1029 /* Process any tables that are being synchronized in parallel. */
1030 process_syncing_tables(commit_data.end_lsn);
1031
1034}
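
/*
 * Illustrative sketch, not part of worker.c: how an LSN is rendered with the
 * "%X/%X" pattern seen in the error message above, and the consistency check
 * between the LSN announced by BEGIN and the one carried by COMMIT. The
 * function names are hypothetical; the only assumption about the real code is
 * that an LSN is a 64-bit value printed as its high and low 32-bit halves.
 */

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Format a 64-bit LSN as two hex halves, e.g. "1/6B374D48". */
static int
format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
    assert(buf != NULL && buflen > 0);
    return snprintf(buf, buflen, "%" PRIX32 "/%" PRIX32,
                    (uint32_t) (lsn >> 32), (uint32_t) lsn);
}

/* True when COMMIT carries the final LSN that BEGIN announced. */
static bool
commit_lsn_matches(uint64_t begin_final_lsn, uint64_t commit_lsn)
{
    return begin_final_lsn == commit_lsn;
}
```

/*
 * A mismatch means the stream is out of step with what BEGIN promised, which
 * is why the handler above reports it as a protocol violation.
 */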
1035
1036/*
1037 * Handle BEGIN PREPARE message.
1038 */
1039static void
1041{
1042 LogicalRepPreparedTxnData begin_data;
1043
1044 /* Tablesync should never receive prepare. */
1045 if (am_tablesync_worker())
1046 ereport(ERROR,
1047 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1048 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1049
1050 /* There must not be an active streaming transaction. */
1052
1053 logicalrep_read_begin_prepare(s, &begin_data);
1054 set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1055
1056 remote_final_lsn = begin_data.prepare_lsn;
1057
1059
1060 in_remote_transaction = true;
1061
1063}
1064
1065/*
1066 * Common function to prepare the GID.
1067 */
1068static void
1070{
1071 char gid[GIDSIZE];
1072
1073 /*
1074 * Compute unique GID for two_phase transactions. We don't use GID of
1075 * prepared transaction sent by server as that can lead to deadlock when
1076 * we have multiple subscriptions from the same node pointing to
1077 * publications on the same node. See comments atop worker.c.
1078 */
1080 gid, sizeof(gid));
1081
1082 /*
1083 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1084 * called within the PrepareTransactionBlock below.
1085 */
1086 if (!IsTransactionBlock())
1087 {
1089 CommitTransactionCommand(); /* Completes the preceding Begin command. */
1090 }
1091
1092 /*
1093 * Update origin state so we can restart streaming from correct position
1094 * in case of crash.
1095 */
1096 replorigin_session_origin_lsn = prepare_data->end_lsn;
1098
1100}
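
/*
 * Illustrative sketch, not part of worker.c: why a locally computed GID
 * avoids clashes. Two subscriptions that receive the same remote XID still
 * get different identifiers when the subscription OID is part of the GID.
 * The format string, GID_BUFSIZE and the function names are hypothetical;
 * the listing above does not show the real format.
 */

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define GID_BUFSIZE 200         /* hypothetical stand-in for GIDSIZE */

/* Build an identifier from subscription OID and remote XID; 0 on success. */
static int
make_gid(uint32_t subid, uint32_t xid, char *gid, size_t szgid)
{
    int n;

    assert(gid != NULL && szgid > 0);
    n = snprintf(gid, szgid, "sub_%" PRIu32 "_xid_%" PRIu32, subid, xid);

    /* Treat truncation as failure: a clipped GID may no longer be unique. */
    return (n >= 0 && (size_t) n < szgid) ? 0 : -1;
}

static bool
same_gid(const char *a, const char *b)
{
    return strcmp(a, b) == 0;
}
```

/*
 * Because the same inputs always give the same string, the identifier can be
 * recomputed later (at COMMIT PREPARED or ROLLBACK PREPARED time) without
 * storing it anywhere.
 */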
1101
1102/*
1103 * Handle PREPARE message.
1104 */
1105static void
1107{
1108 LogicalRepPreparedTxnData prepare_data;
1109
1110 logicalrep_read_prepare(s, &prepare_data);
1111
1112 if (prepare_data.prepare_lsn != remote_final_lsn)
1113 ereport(ERROR,
1114 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1115 errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1116 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1118
1119 /*
1120 * Unlike commit, here, we always prepare the transaction even though no
1121 * change has happened in this transaction or all changes are skipped. It
1122 * is done this way because at commit prepared time, we won't know whether
1123 * we have skipped preparing a transaction because of those reasons.
1124 *
1125 * XXX, We can optimize such that at commit prepared time, we first check
1126 * whether we have prepared the transaction or not but that doesn't seem
1127 * worthwhile because such cases shouldn't be common.
1128 */
1130
1131 apply_handle_prepare_internal(&prepare_data);
1132
1135 pgstat_report_stat(false);
1136
1137 /*
1138 * It is okay not to set the local_end LSN for the prepare because we
1139 * always flush the prepare record. So, we can send the acknowledgment of
1140 * the remote_end LSN as soon as prepare is finished.
1141 *
1142 * XXX For the sake of consistency with commit, we could have set it with
1143 * the LSN of prepare but as of now we don't track that value similar to
1144 * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
1145 * it.
1146 */
1148
1149 in_remote_transaction = false;
1150
1151 /* Process any tables that are being synchronized in parallel. */
1152 process_syncing_tables(prepare_data.end_lsn);
1153
1154 /*
1155 * Since we have already prepared the transaction, in a case where the
1156 * server crashes before clearing the subskiplsn, it will be left but the
1157 * transaction won't be resent. But that's okay because it's a rare case
1158 * and the subskiplsn will be cleared when finishing the next transaction.
1159 */
1162
1165}
1166
1167/*
1168 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1169 *
1170 * Note that we don't need to wait here if the transaction was prepared in a
1171 * parallel apply worker. In that case, we have already waited for the prepare
1172 * to finish in apply_handle_stream_prepare() which will ensure all the
1173 * operations in that transaction have happened in the subscriber, so no
1174 * concurrent transaction can cause deadlock or transaction dependency issues.
1175 */
1176static void
1178{
1180 char gid[GIDSIZE];
1181
1182 logicalrep_read_commit_prepared(s, &prepare_data);
1183 set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1184
1185 /* Compute GID for two_phase transactions. */
1187 gid, sizeof(gid));
1188
1189 /* There is no transaction when COMMIT PREPARED is called */
1191
1192 /*
1193 * Update origin state so we can restart streaming from correct position
1194 * in case of crash.
1195 */
1198
1199 FinishPreparedTransaction(gid, true);
1202 pgstat_report_stat(false);
1203
1205 in_remote_transaction = false;
1206
1207 /* Process any tables that are being synchronized in parallel. */
1208 process_syncing_tables(prepare_data.end_lsn);
1209
1211
1214}
1215
1216/*
1217 * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1218 *
1219 * Note that we don't need to wait here if the transaction was prepared in a
1220 * parallel apply worker. In that case, we have already waited for the prepare
1221 * to finish in apply_handle_stream_prepare() which will ensure all the
1222 * operations in that transaction have happened in the subscriber, so no
1223 * concurrent transaction can cause deadlock or transaction dependency issues.
1224 */
1225static void
1227{
1229 char gid[GIDSIZE];
1230
1231 logicalrep_read_rollback_prepared(s, &rollback_data);
1232 set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1233
1234 /* Compute GID for two_phase transactions. */
1236 gid, sizeof(gid));
1237
1238 /*
1239 * It is possible that we haven't received prepare because it occurred
1240 * before walsender reached a consistent point or the two_phase was still
1241 * not enabled by that time, so in such cases, we need to skip rollback
1242 * prepared.
1243 */
1244 if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1245 rollback_data.prepare_time))
1246 {
1247 /*
1248 * Update origin state so we can restart streaming from correct
1249 * position in case of crash.
1250 */
1253
1254 /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1256 FinishPreparedTransaction(gid, false);
1259
1261 }
1262
1263 pgstat_report_stat(false);
1264
1265 /*
1266 * It is okay not to set the local_end LSN for the rollback of prepared
1267 * transaction because we always flush the WAL record for it. See
1268 * apply_handle_prepare.
1269 */
1271 in_remote_transaction = false;
1272
1273 /* Process any tables that are being synchronized in parallel. */
1275
1278}
1279
1280/*
1281 * Handle STREAM PREPARE.
1282 */
1283static void
1285{
1286 LogicalRepPreparedTxnData prepare_data;
1288 TransApplyAction apply_action;
1289
1290 /* Save the message before it is consumed. */
1291 StringInfoData original_msg = *s;
1292
1294 ereport(ERROR,
1295 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1296 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1297
1298 /* Tablesync should never receive prepare. */
1299 if (am_tablesync_worker())
1300 ereport(ERROR,
1301 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1302 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1303
1304 logicalrep_read_stream_prepare(s, &prepare_data);
1305 set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1306
1307 apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1308
1309 switch (apply_action)
1310 {
1311 case TRANS_LEADER_APPLY:
1312
1313 /*
1314 * The transaction has been serialized to file, so replay all the
1315 * spooled operations.
1316 */
1318 prepare_data.xid, prepare_data.prepare_lsn);
1319
1320 /* Mark the transaction as prepared. */
1321 apply_handle_prepare_internal(&prepare_data);
1322
1324
1325 /*
1326 * It is okay not to set the local_end LSN for the prepare because
1327 * we always flush the prepare record. See apply_handle_prepare.
1328 */
1330
1331 in_remote_transaction = false;
1332
1333 /* Unlink the files with serialized changes and subxact info. */
1335
1336 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1337 break;
1338
1340 Assert(winfo);
1341
1342 if (pa_send_data(winfo, s->len, s->data))
1343 {
1344 /* Finish processing the streaming transaction. */
1345 pa_xact_finish(winfo, prepare_data.end_lsn);
1346 break;
1347 }
1348
1349 /*
1350 * Switch to serialize mode when we are not able to send the
1351 * change to parallel apply worker.
1352 */
1353 pa_switch_to_partial_serialize(winfo, true);
1354
1355 /* fall through */
1357 Assert(winfo);
1358
1359 stream_open_and_write_change(prepare_data.xid,
1361 &original_msg);
1362
1364
1365 /* Finish processing the streaming transaction. */
1366 pa_xact_finish(winfo, prepare_data.end_lsn);
1367 break;
1368
1370
1371 /*
1372 * If the parallel apply worker is applying spooled messages then
1373 * close the file before preparing.
1374 */
1375 if (stream_fd)
1377
1379
1380 /* Mark the transaction as prepared. */
1381 apply_handle_prepare_internal(&prepare_data);
1382
1384
1386
1387 /*
1388 * It is okay not to set the local_end LSN for the prepare because
1389 * we always flush the prepare record. See apply_handle_prepare.
1390 */
1392
1395
1397
1398 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1399 break;
1400
1401 default:
1402 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1403 break;
1404 }
1405
1406 pgstat_report_stat(false);
1407
1408 /* Process any tables that are being synchronized in parallel. */
1409 process_syncing_tables(prepare_data.end_lsn);
1410
1411 /*
1412 * Similar to prepare case, the subskiplsn could be left in a case of
1413 * server crash but it's okay. See the comments in apply_handle_prepare().
1414 */
1417
1419
1421}
1422
1423/*
1424 * Handle ORIGIN message.
1425 *
1426 * TODO, support tracking of multiple origins
1427 */
1428static void
1430{
1431 /*
1432 * ORIGIN message can only come inside streaming transaction or inside
1433 * remote transaction and before any actual writes.
1434 */
1438 ereport(ERROR,
1439 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1440 errmsg_internal("ORIGIN message sent out of order")));
1441}
1442
1443/*
1444 * Initialize fileset (if not already done).
1445 *
1446 * Create a new file when first_segment is true, otherwise open the existing
1447 * file.
1448 */
1449void
1450stream_start_internal(TransactionId xid, bool first_segment)
1451{
1453
1454 /*
1455 * Initialize the worker's stream_fileset if we haven't yet. This will be
1456 * used for the entire duration of the worker so create it in a permanent
1457 * context. We create this on the very first streaming message from any
1458 * transaction and then use it for this and other streaming transactions.
1459 * Now, we could create a fileset at the start of the worker as well but
1460 * then we won't be sure that it will ever be used.
1461 */
1463 {
1464 MemoryContext oldctx;
1465
1467
1470
1471 MemoryContextSwitchTo(oldctx);
1472 }
1473
1474 /* Open the spool file for this transaction. */
1475 stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1476
1477 /* If this is not the first segment, open existing subxact file. */
1478 if (!first_segment)
1480
1482}
1483
1484/*
1485 * Handle STREAM START message.
1486 */
1487static void
1489{
1490 bool first_segment;
1492 TransApplyAction apply_action;
1493
1494 /* Save the message before it is consumed. */
1495 StringInfoData original_msg = *s;
1496
1498 ereport(ERROR,
1499 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1500 errmsg_internal("duplicate STREAM START message")));
1501
1502 /* There must not be an active streaming transaction. */
1504
1505 /* notify handle methods we're processing a remote transaction */
1507
1508 /* extract XID of the top-level transaction */
1509 stream_xid = logicalrep_read_stream_start(s, &first_segment);
1510
1512 ereport(ERROR,
1513 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1514 errmsg_internal("invalid transaction ID in streamed replication transaction")));
1515
1517
1518 /* Try to allocate a worker for the streaming transaction. */
1519 if (first_segment)
1521
1522 apply_action = get_transaction_apply_action(stream_xid, &winfo);
1523
1524 switch (apply_action)
1525 {
1527
1528 /*
1529 * Function stream_start_internal starts a transaction. This
1530 * transaction will be committed on the stream stop unless it is a
1531 * tablesync worker in which case it will be committed after
1532 * processing all the messages. We need this transaction for
1533 * handling the BufFile, used for serializing the streaming data
1534 * and subxact info.
1535 */
1536 stream_start_internal(stream_xid, first_segment);
1537 break;
1538
1540 Assert(winfo);
1541
1542 /*
1543 * Once we start serializing the changes, the parallel apply
1544 * worker will wait for the leader to release the stream lock
1545 * until the end of the transaction. So, we don't need to release
1546 * the lock or increment the stream count in that case.
1547 */
1548 if (pa_send_data(winfo, s->len, s->data))
1549 {
1550 /*
1551 * Unlock the shared object lock so that the parallel apply
1552 * worker can continue to receive changes.
1553 */
1554 if (!first_segment)
1556
1557 /*
1558 * Increment the number of streaming blocks waiting to be
1559 * processed by parallel apply worker.
1560 */
1562
1563 /* Cache the parallel apply worker for this transaction. */
1565 break;
1566 }
1567
1568 /*
1569 * Switch to serialize mode when we are not able to send the
1570 * change to parallel apply worker.
1571 */
1572 pa_switch_to_partial_serialize(winfo, !first_segment);
1573
1574 /* fall through */
1576 Assert(winfo);
1577
1578 /*
1579 * Open the spool file unless it was already opened when switching
1580 * to serialize mode. The transaction started in
1581 * stream_start_internal will be committed on the stream stop.
1582 */
1583 if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1584 stream_start_internal(stream_xid, first_segment);
1585
1587
1588 /* Cache the parallel apply worker for this transaction. */
1590 break;
1591
1593 if (first_segment)
1594 {
1595 /* Hold the lock until the end of the transaction. */
1598
1599 /*
1600 * Signal the leader apply worker, as it may be waiting for
1601 * us.
1602 */
1604 }
1605
1607 break;
1608
1609 default:
1610 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1611 break;
1612 }
1613
1615}
1616
1617/*
1618 * Update the information about subxacts and close the file.
1619 *
1620 * This function should be called when the stream_start_internal function has
1621 * been called.
1622 */
1623void
1625{
1626 /*
1627 * Serialize information about subxacts for the toplevel transaction, then
1628 * close the stream messages spool file.
1629 */
1632
1633 /* We must be in a valid transaction state */
1635
1636 /* Commit the per-stream transaction */
1638
1639 /* Reset per-stream context */
1641}
1642
1643/*
1644 * Handle STREAM STOP message.
1645 */
1646static void
1648{
1650 TransApplyAction apply_action;
1651
1653 ereport(ERROR,
1654 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1655 errmsg_internal("STREAM STOP message without STREAM START")));
1656
1657 apply_action = get_transaction_apply_action(stream_xid, &winfo);
1658
1659 switch (apply_action)
1660 {
1663 break;
1664
1666 Assert(winfo);
1667
1668 /*
1669 * Lock before sending the STREAM_STOP message so that the leader
1670 * can hold the lock first and the parallel apply worker will wait
1671 * for leader to release the lock. See Locking Considerations atop
1672 * applyparallelworker.c.
1673 */
1675
1676 if (pa_send_data(winfo, s->len, s->data))
1677 {
1679 break;
1680 }
1681
1682 /*
1683 * Switch to serialize mode when we are not able to send the
1684 * change to parallel apply worker.
1685 */
1686 pa_switch_to_partial_serialize(winfo, true);
1687
1688 /* fall through */
1693 break;
1694
1696 elog(DEBUG1, "applied %u changes in the streaming chunk",
1698
1699 /*
1700 * By the time parallel apply worker is processing the changes in
1701 * the current streaming block, the leader apply worker may have
1702 * sent multiple streaming blocks. This can lead to the parallel apply
1703 * worker starting to wait even when there are more chunks of streams
1704 * in the queue. So, try to lock only if there is no message left
1705 * in the queue. See Locking Considerations atop
1706 * applyparallelworker.c.
1707 *
1708 * Note that here we have a race condition where we can start
1709 * waiting even when there are pending streaming chunks. This can
1710 * happen if the leader sends another streaming block and acquires
1711 * the stream lock again after the parallel apply worker checks
1712 * that there is no pending streaming block and before it actually
1713 * starts waiting on a lock. We can handle this case by not
1714 * allowing the leader to increment the stream block count during
1715 * the time parallel apply worker acquires the lock but it is not
1716 * clear whether that is worth the complexity.
1717 *
1718 * Now, if this missed chunk contains rollback to savepoint, then
1719 * there is a risk of deadlock which probably shouldn't happen
1720 * after restart.
1721 */
1723 break;
1724
1725 default:
1726 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1727 break;
1728 }
1729
1732
1733 /*
1734 * The parallel apply worker could be in a transaction in which case we
1735 * need to report the state as STATE_IDLEINTRANSACTION.
1736 */
1739 else
1741
1743}
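
/*
 * Illustrative sketch, not part of worker.c: the ordering rules implied by
 * the protocol-violation messages in the STREAM handlers. STREAM START may
 * not arrive while a chunk is open, STREAM STOP needs an open chunk, and
 * STREAM COMMIT / ABORT / PREPARE may only arrive between chunks. The enum,
 * the flag and the function name are hypothetical; the conditions are
 * inferred from the error texts, since the listing omits the actual tests.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum stream_msg
{
    STREAM_START, STREAM_STOP, STREAM_COMMIT, STREAM_ABORT, STREAM_PREPARE
};

/* Returns NULL if the message is acceptable now, else an error text. */
static const char *
check_stream_order(bool *in_chunk, enum stream_msg msg)
{
    assert(in_chunk != NULL);

    switch (msg)
    {
        case STREAM_START:
            if (*in_chunk)
                return "duplicate STREAM START message";
            *in_chunk = true;
            return NULL;
        case STREAM_STOP:
            if (!*in_chunk)
                return "STREAM STOP message without STREAM START";
            *in_chunk = false;
            return NULL;
        case STREAM_COMMIT:
            return *in_chunk ? "STREAM COMMIT message without STREAM STOP" : NULL;
        case STREAM_ABORT:
            return *in_chunk ? "STREAM ABORT message without STREAM STOP" : NULL;
        case STREAM_PREPARE:
            return *in_chunk ? "STREAM PREPARE message without STREAM STOP" : NULL;
    }
    return "unknown stream message";
}
```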
1744
1745/*
1746 * Helper function to handle STREAM ABORT message when the transaction was
1747 * serialized to file.
1748 */
1749static void
1751{
1752 /*
1753 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1754 * just delete the files with serialized info.
1755 */
1756 if (xid == subxid)
1758 else
1759 {
1760 /*
1761 * OK, so it's a subxact. We need to read the subxact file for the
1762 * toplevel transaction, determine the offset tracked for the subxact,
1763 * and truncate the file with changes. We also remove the subxacts
1764 * with higher offsets (or rather higher XIDs).
1765 *
1766 * We intentionally scan the array from the tail, because we're likely
1767 * aborting a change for one of the most recent subtransactions.
1768 *
1769 * We can't use binary search here because subxact XIDs won't
1770 * necessarily arrive in sorted order. Consider the case where we have
1771 * released the savepoint for multiple subtransactions and then
1772 * performed rollback to savepoint for one of the earlier
1773 * subtransactions.
1774 */
1775 int64 i;
1776 int64 subidx;
1777 BufFile *fd;
1778 bool found = false;
1779 char path[MAXPGPATH];
1780
1781 subidx = -1;
1784
1785 for (i = subxact_data.nsubxacts; i > 0; i--)
1786 {
1787 if (subxact_data.subxacts[i - 1].xid == subxid)
1788 {
1789 subidx = (i - 1);
1790 found = true;
1791 break;
1792 }
1793 }
1794
1795 /*
1796 * If it's an empty sub-transaction then we will not find the subxid
1797 * here so just clean up the subxact info and return.
1798 */
1799 if (!found)
1800 {
1801 /* Clean up the subxact info */
1805 return;
1806 }
1807
1808 /* open the changes file */
1811 O_RDWR, false);
1812
1813 /* OK, truncate the file at the right offset */
1815 subxact_data.subxacts[subidx].offset);
1817
1818 /* discard the subxacts added later */
1819 subxact_data.nsubxacts = subidx;
1820
1821 /* write the updated subxact list */
1823
1826 }
1827}
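
/*
 * Illustrative sketch, not part of worker.c: the tail scan used above to
 * abort a subtransaction. The entry is searched from the most recent one
 * backwards, its recorded offset becomes the truncation point for the changes
 * file, and that entry plus everything added after it is discarded. The
 * struct layout, MAX_SUBXACTS and the function name are hypothetical
 * simplifications; the real code also truncates a BufFile and rewrites the
 * subxact file.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_SUBXACTS 8          /* hypothetical fixed capacity */

struct subxact
{
    uint32_t xid;               /* subtransaction XID */
    long     offset;            /* where its first change starts in the file */
};

struct subxact_list
{
    int            n;
    struct subxact items[MAX_SUBXACTS];
};

/*
 * Returns the offset to truncate the changes file at, or -1 when the
 * subtransaction is unknown (for example because it was empty).
 */
static long
abort_subxact(struct subxact_list *list, uint32_t subxid)
{
    assert(list != NULL && list->n <= MAX_SUBXACTS);

    /* Linear scan from the tail: XIDs need not be in sorted order. */
    for (int i = list->n; i > 0; i--)
    {
        if (list->items[i - 1].xid == subxid)
        {
            long offset = list->items[i - 1].offset;

            /* Drop this subxact and all that were added later. */
            list->n = i - 1;
            return offset;
        }
    }
    return -1;
}
```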
1828
1829/*
1830 * Handle STREAM ABORT message.
1831 */
1832static void
1834{
1835 TransactionId xid;
1836 TransactionId subxid;
1837 LogicalRepStreamAbortData abort_data;
1839 TransApplyAction apply_action;
1840
1841 /* Save the message before it is consumed. */
1842 StringInfoData original_msg = *s;
1843 bool toplevel_xact;
1844
1846 ereport(ERROR,
1847 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1848 errmsg_internal("STREAM ABORT message without STREAM STOP")));
1849
1850 /* We receive abort information only when we can apply in parallel. */
1851 logicalrep_read_stream_abort(s, &abort_data,
1853
1854 xid = abort_data.xid;
1855 subxid = abort_data.subxid;
1856 toplevel_xact = (xid == subxid);
1857
1858 set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1859
1860 apply_action = get_transaction_apply_action(xid, &winfo);
1861
1862 switch (apply_action)
1863 {
1864 case TRANS_LEADER_APPLY:
1865
1866 /*
1867 * We are in the leader apply worker and the transaction has been
1868 * serialized to file.
1869 */
1870 stream_abort_internal(xid, subxid);
1871
1872 elog(DEBUG1, "finished processing the STREAM ABORT command");
1873 break;
1874
1876 Assert(winfo);
1877
1878 /*
1879 * For the case of aborting the subtransaction, we increment the
1880 * number of streaming blocks and take the lock again before
1881 * sending the STREAM_ABORT to ensure that the parallel apply
1882 * worker will wait on the lock for the next set of changes after
1883 * processing the STREAM_ABORT message if it is not already
1884 * waiting for STREAM_STOP message.
1885 *
1886 * It is important to perform this locking before sending the
1887 * STREAM_ABORT message so that the leader can hold the lock first
1888 * and the parallel apply worker will wait for the leader to
1889 * release the lock. This is the same as what we do in
1890 * apply_handle_stream_stop. See Locking Considerations atop
1891 * applyparallelworker.c.
1892 */
1893 if (!toplevel_xact)
1894 {
1898 }
1899
1900 if (pa_send_data(winfo, s->len, s->data))
1901 {
1902 /*
1903 * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1904 * wait here for the parallel apply worker to finish as that
1905 * is not required to maintain the commit order and won't have
1906 * the risk of failures due to transaction dependencies and
1907 * deadlocks. However, it is possible that before the parallel
1908 * worker finishes and we clear the worker info, the xid
1909 * wraparound happens on the upstream and a new transaction
1910 * with the same xid can appear and that can lead to duplicate
1911 * entries in ParallelApplyTxnHash. Yet another problem could
1912 * be that we may have serialized the changes in partial
1913 * serialize mode and the file containing xact changes may
1914 * already exist, and after xid wraparound trying to create
1915 * the file for the same xid can lead to an error. To avoid
1916 * these problems, we decide to wait for the aborts to finish.
1917 *
1918 * Note, it is okay to not update the flush location position
1919 * for aborts as in worst case that means such a transaction
1920 * won't be sent again after restart.
1921 */
1922 if (toplevel_xact)
1924
1925 break;
1926 }
1927
1928 /*
1929 * Switch to serialize mode when we are not able to send the
1930 * change to parallel apply worker.
1931 */
1932 pa_switch_to_partial_serialize(winfo, true);
1933
1934 /* fall through */
1936 Assert(winfo);
1937
1938 /*
1939 * Parallel apply worker might have applied some changes, so write
1940 * the STREAM_ABORT message so that it can roll back the
1941 * subtransaction if needed.
1942 */
1944 &original_msg);
1945
1946 if (toplevel_xact)
1947 {
1950 }
1951 break;
1952
1954
1955 /*
1956 * If the parallel apply worker is applying spooled messages then
1957 * close the file before aborting.
1958 */
1959 if (toplevel_xact && stream_fd)
1961
1962 pa_stream_abort(&abort_data);
1963
1964 /*
1965 * We need to wait after processing rollback to savepoint for the
1966 * next set of changes.
1967 *
1968 * We have a race condition here due to which we can start waiting
1969 * here when there are more chunks of streams in the queue. See
1970 * apply_handle_stream_stop.
1971 */
1972 if (!toplevel_xact)
1974
1975 elog(DEBUG1, "finished processing the STREAM ABORT command");
1976 break;
1977
1978 default:
1979 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1980 break;
1981 }
1982
1984}
1985
1986/*
1987 * Ensure that the passed location is fileset's end.
1988 */
1989static void
1990ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1991 off_t offset)
1992{
1993 char path[MAXPGPATH];
1994 BufFile *fd;
1995 int last_fileno;
1996 off_t last_offset;
1997
1999
2001
2003
2004 fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2005
2006 BufFileSeek(fd, 0, 0, SEEK_END);
2007 BufFileTell(fd, &last_fileno, &last_offset);
2008
2010
2012
2013 if (last_fileno != fileno || last_offset != offset)
2014 elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2015 path);
2016}
2017
2018/*
2019 * Common spoolfile processing.
2020 */
2021void
2023 XLogRecPtr lsn)
2024{
2025 int nchanges;
2026 char path[MAXPGPATH];
2027 char *buffer = NULL;
2028 MemoryContext oldcxt;
2029 ResourceOwner oldowner;
2030 int fileno;
2031 off_t offset;
2032
2035
2036 /* Make sure we have an open transaction */
2038
2039 /*
2040 * Allocate file handle and memory required to process all the messages in
2041 * TopTransactionContext to avoid them getting reset after each message is
2042 * processed.
2043 */
2045
2046 /* Open the spool file for the committed/prepared transaction */
2048 elog(DEBUG1, "replaying changes from file \"%s\"", path);
2049
2050 /*
2051 * Make sure the file is owned by the toplevel transaction so that the
2052 * file will not be accidentally closed when aborting a subtransaction.
2053 */
2054 oldowner = CurrentResourceOwner;
2056
2057 stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2058
2059 CurrentResourceOwner = oldowner;
2060
2061 buffer = palloc(BLCKSZ);
2062
2063 MemoryContextSwitchTo(oldcxt);
2064
2065 remote_final_lsn = lsn;
2066
2067 /*
2068 * Make sure the handle apply_dispatch methods are aware we're in a remote
2069 * transaction.
2070 */
2071 in_remote_transaction = true;
2073
2075
2076 /*
2077 * Read the entries one by one and pass them through the same logic as in
2078 * apply_dispatch.
2079 */
2080 nchanges = 0;
2081 while (true)
2082 {
2084 size_t nbytes;
2085 int len;
2086
2088
2089 /* read length of the on-disk record */
2090 nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2091
2092 /* have we reached end of the file? */
2093 if (nbytes == 0)
2094 break;
2095
2096 /* do we have a correct length? */
2097 if (len <= 0)
2098 elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2099 len, path);
2100
2101 /* make sure we have a sufficiently large buffer */
2102 buffer = repalloc(buffer, len);
2103
2104 /* and finally read the data into the buffer */
2105 BufFileReadExact(stream_fd, buffer, len);
2106
2107 BufFileTell(stream_fd, &fileno, &offset);
2108
2109 /* init a stringinfo using the buffer and call apply_dispatch */
2110 initReadOnlyStringInfo(&s2, buffer, len);
2111
2112 /* Ensure we are reading the data into our memory context. */
2114
2116
2118
2119 MemoryContextSwitchTo(oldcxt);
2120
2121 nchanges++;
2122
2123 /*
2124 * It is possible the file has been closed because we have processed
2125 * the transaction end message like stream_commit in which case that
2126 * must be the last message.
2127 */
2128 if (!stream_fd)
2129 {
2130 ensure_last_message(stream_fileset, xid, fileno, offset);
2131 break;
2132 }
2133
2134 if (nchanges % 1000 == 0)
2135 elog(DEBUG1, "replayed %d changes from file \"%s\"",
2136 nchanges, path);
2137 }
2138
2139 if (stream_fd)
2141
2142 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2143 nchanges, path);
2144
2145 return;
2146}
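
/*
 * Illustrative sketch, not part of worker.c: the length-prefixed record loop
 * used above when replaying a spool file, shown on an in-memory buffer
 * instead of a BufFile. Each record is an int length followed by that many
 * payload bytes; a non-positive or overlong length is treated as corruption.
 * All names here (replay_records, append_record, record_cb, sum_lengths) are
 * hypothetical.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef void (*record_cb) (const char *data, int len, void *arg);

/* Returns the number of records replayed, or -1 on malformed input. */
static int
replay_records(const char *buf, size_t buflen, record_cb cb, void *arg)
{
    size_t pos = 0;
    int    nchanges = 0;

    assert(cb != NULL);

    while (pos < buflen)        /* reaching the end is the normal exit */
    {
        int len;

        if (buflen - pos < sizeof(len))
            return -1;          /* truncated length header */
        memcpy(&len, buf + pos, sizeof(len));
        pos += sizeof(len);

        if (len <= 0 || (size_t) len > buflen - pos)
            return -1;          /* incorrect length */

        cb(buf + pos, len, arg);    /* stands in for apply_dispatch */
        pos += (size_t) len;
        nchanges++;
    }
    return nchanges;
}

/* Append one record to buf at pos; returns the new write position. */
static size_t
append_record(char *buf, size_t pos, const char *data, int len)
{
    memcpy(buf + pos, &len, sizeof(len));
    memcpy(buf + pos + sizeof(len), data, (size_t) len);
    return pos + sizeof(len) + (size_t) len;
}

/* Example callback: accumulate payload sizes. */
static void
sum_lengths(const char *data, int len, void *arg)
{
    (void) data;
    *(int *) arg += len;
}
```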
2147
2148/*
2149 * Handle STREAM COMMIT message.
2150 */
2151static void
2153{
2154 TransactionId xid;
2155 LogicalRepCommitData commit_data;
2157 TransApplyAction apply_action;
2158
2159 /* Save the message before it is consumed. */
2160 StringInfoData original_msg = *s;
2161
2163 ereport(ERROR,
2164 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2165 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2166
2167 xid = logicalrep_read_stream_commit(s, &commit_data);
2168 set_apply_error_context_xact(xid, commit_data.commit_lsn);
2169
2170 apply_action = get_transaction_apply_action(xid, &winfo);
2171
2172 switch (apply_action)
2173 {
2174 case TRANS_LEADER_APPLY:
2175
2176 /*
2177 * The transaction has been serialized to file, so replay all the
2178 * spooled operations.
2179 */
2181 commit_data.commit_lsn);
2182
2183 apply_handle_commit_internal(&commit_data);
2184
2185 /* Unlink the files with serialized changes and subxact info. */
2187
2188 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2189 break;
2190
2192 Assert(winfo);
2193
2194 if (pa_send_data(winfo, s->len, s->data))
2195 {
2196 /* Finish processing the streaming transaction. */
2197 pa_xact_finish(winfo, commit_data.end_lsn);
2198 break;
2199 }
2200
2201 /*
2202 * Switch to serialize mode when we are not able to send the
2203 * change to parallel apply worker.
2204 */
2205 pa_switch_to_partial_serialize(winfo, true);
2206
2207 /* fall through */
2209 Assert(winfo);
2210
2212 &original_msg);
2213
2215
2216 /* Finish processing the streaming transaction. */
2217 pa_xact_finish(winfo, commit_data.end_lsn);
2218 break;
2219
2221
2222 /*
2223 * If the parallel apply worker is applying spooled messages then
2224 * close the file before committing.
2225 */
2226 if (stream_fd)
2228
2229 apply_handle_commit_internal(&commit_data);
2230
2232
2233 /*
2234 * It is important to set the transaction state as finished before
2235 * releasing the lock. See pa_wait_for_xact_finish.
2236 */
2239
2241
2242 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2243 break;
2244
2245 default:
2246 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2247 break;
2248 }
2249
2250 /* Process any tables that are being synchronized in parallel. */
2251 process_syncing_tables(commit_data.end_lsn);
2252
2254
2256}
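
/*
 * Illustrative sketch, not part of worker.c: the dispatch shape shared by the
 * STREAM COMMIT, STREAM PREPARE and STREAM ABORT handlers. The leader first
 * tries to hand the message to a parallel apply worker and, if that fails,
 * falls through to the branch that spools it to a file for that worker. The
 * enum values, struct worker and the outcome codes are hypothetical names;
 * only the control flow mirrors the code above.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum apply_action
{
    LEADER_APPLY,               /* changes were serialized; leader replays */
    LEADER_SEND_TO_PARALLEL,    /* forward the message to a worker */
    LEADER_PARTIAL_SERIALIZE,   /* spool the message to a file for the worker */
    PARALLEL_APPLY              /* we are the parallel worker ourselves */
};

enum outcome
{
    REPLAYED_FROM_FILE, SENT_TO_WORKER, SPOOLED_FOR_WORKER,
    APPLIED_IN_WORKER, UNEXPECTED_ACTION
};

/* Hypothetical worker handle: only records whether a send would succeed. */
struct worker
{
    bool queue_has_room;
};

static enum outcome
handle_stream_end(enum apply_action action, const struct worker *w)
{
    switch (action)
    {
        case LEADER_APPLY:
            return REPLAYED_FROM_FILE;

        case LEADER_SEND_TO_PARALLEL:
            assert(w != NULL);
            if (w->queue_has_room)
                return SENT_TO_WORKER;
            /* Could not send: switch to serializing instead. */
            /* fall through */
        case LEADER_PARTIAL_SERIALIZE:
            assert(w != NULL);
            return SPOOLED_FOR_WORKER;

        case PARALLEL_APPLY:
            return APPLIED_IN_WORKER;

        default:
            return UNEXPECTED_ACTION;
    }
}
```

/*
 * Sharing the serialize branch through the fall-through keeps the fallback
 * path identical whether it is entered directly or after a failed send.
 */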
2257
2258/*
2259 * Helper function for apply_handle_commit and apply_handle_stream_commit.
2260 */
2261static void
2263{
2264 if (is_skipping_changes())
2265 {
2267
2268 /*
2269 * Start a new transaction to clear the subskiplsn, if not started
2270 * yet.
2271 */
2272 if (!IsTransactionState())
2274 }
2275
2276 if (IsTransactionState())
2277 {
2278 /*
2279 * The transaction is either non-empty or skipped, so we clear the
2280 * subskiplsn.
2281 */
2283
2284 /*
2285 * Update origin state so we can restart streaming from correct
2286 * position in case of crash.
2287 */
2290
2292
2293 if (IsTransactionBlock())
2294 {
2295 EndTransactionBlock(false);
2297 }
2298
2299 pgstat_report_stat(false);
2300
2302 }
2303 else
2304 {
2305 /* Process any invalidation messages that might have accumulated. */
2308 }
2309
2310 in_remote_transaction = false;
2311}
2312
2313/*
2314 * Handle RELATION message.
2315 *
2316 * Note that we don't validate against the local schema here. Validation is
2317 * postponed until the first change for the given relation arrives, because we
2318 * only care about it when applying changes to that relation anyway, and we do
2319 * less locking this way.
2320 */
2321static void
2323{
2324 LogicalRepRelation *rel;
2325
2327 return;
2328
2329 rel = logicalrep_read_rel(s);
2331
2332 /* Also reset all entries in the partition map that refer to remoterel. */
2334}
2335
2336/*
2337 * Handle TYPE message.
2338 *
2339 * This implementation pays no attention to TYPE messages; we expect the user
2340 * to have set things up so that the incoming data is acceptable to the input
2341 * functions for the locally subscribed tables. Hence, we just read and
2342 * discard the message.
2343 */
2344static void
2346{
2347 LogicalRepTyp typ;
2348
2350 return;
2351
2352 logicalrep_read_typ(s, &typ);
2353}
2354
2355/*
2356 * Check that we (the subscription owner) have sufficient privileges on the
2357 * target relation to perform the given operation.
2358 */
2359static void
2361{
2362 Oid relid;
2363 AclResult aclresult;
2364
2365 relid = RelationGetRelid(rel);
2366 aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2367 if (aclresult != ACLCHECK_OK)
2368 aclcheck_error(aclresult,
2369 get_relkind_objtype(rel->rd_rel->relkind),
2370 get_rel_name(relid));
2371
2372 /*
2373 * We lack the infrastructure to honor RLS policies. It might be possible
2374 * to add such infrastructure here, but tablesync workers lack it, too, so
2375 * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2376 * but it seems dangerous to replicate a TRUNCATE and then refuse to
2377 * replicate subsequent INSERTs, so we forbid all commands the same.
2378 */
2379 if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2380 ereport(ERROR,
2381 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2382 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2385}
2386
2387/*
2388 * Handle INSERT message.
2389 */
2390
2391static void
2393{
2395 LogicalRepTupleData newtup;
2396 LogicalRepRelId relid;
2397 UserContext ucxt;
2398 ApplyExecutionData *edata;
2399 EState *estate;
2400 TupleTableSlot *remoteslot;
2401 MemoryContext oldctx;
2402 bool run_as_owner;
2403
2404 /*
2405 * Quick return if we are skipping data modification changes or handling
2406 * streamed transactions.
2407 */
2408 if (is_skipping_changes() ||
2410 return;
2411
2413
2414 relid = logicalrep_read_insert(s, &newtup);
2417 {
2418 /*
2419 * The relation can't become interesting in the middle of the
2420 * transaction so it's safe to unlock it.
2421 */
2424 return;
2425 }
2426
2427 /*
2428 * Make sure that any user-supplied code runs as the table owner, unless
2429 * the user has opted out of that behavior.
2430 */
2431 run_as_owner = MySubscription->runasowner;
2432 if (!run_as_owner)
2433 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2434
2435 /* Set relation for error callback */
2437
2438 /* Initialize the executor state. */
2439 edata = create_edata_for_relation(rel);
2440 estate = edata->estate;
2441 remoteslot = ExecInitExtraTupleSlot(estate,
2443 &TTSOpsVirtual);
2444
2445 /* Process and store remote tuple in the slot */
2447 slot_store_data(remoteslot, rel, &newtup);
2448 slot_fill_defaults(rel, estate, remoteslot);
2449 MemoryContextSwitchTo(oldctx);
2450
2451 /* For a partitioned table, insert the tuple into a partition. */
2452 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2454 remoteslot, NULL, CMD_INSERT);
2455 else
2457 remoteslot);
2458
2459 finish_edata(edata);
2460
2461 /* Reset relation for error callback */
2463
2464 if (!run_as_owner)
2465 RestoreUserContext(&ucxt);
2466
2468
2470}
2471
2472/*
2473 * Workhorse for apply_handle_insert()
2474 * relinfo is for the relation we're actually inserting into
2475 * (could be a child partition of edata->targetRelInfo)
2476 */
2477static void
2479 ResultRelInfo *relinfo,
2480 TupleTableSlot *remoteslot)
2481{
2482 EState *estate = edata->estate;
2483
2484 /* We must open indexes here. */
2485 ExecOpenIndices(relinfo, true);
2486 InitConflictIndexes(relinfo);
2487
2488 /* Do the insert. */
2490 ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2491
2492 /* Cleanup. */
2493 ExecCloseIndices(relinfo);
2494}
2495
2496/*
2497 * Check if the logical replication relation is updatable and throw
2498 * appropriate error if it isn't.
2499 */
2500static void
2502{
2503 /*
2504 * For partitioned tables, we only need to care if the target partition is
2505 * updatable (aka has PK or RI defined for it).
2506 */
2507 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2508 return;
2509
2510 /* Updatable, no error. */
2511 if (rel->updatable)
2512 return;
2513
2514 /*
2515 * We are in error mode, so it's fine that this is somewhat slow. It's better
2516 * to give the user an accurate error.
2517 */
2519 {
2520 ereport(ERROR,
2521 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2522 errmsg("publisher did not send replica identity column "
2523 "expected by the logical replication target relation \"%s.%s\"",
2524 rel->remoterel.nspname, rel->remoterel.relname)));
2525 }
2526
2527 ereport(ERROR,
2528 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2529 errmsg("logical replication target relation \"%s.%s\" has "
2530 "neither REPLICA IDENTITY index nor PRIMARY "
2531 "KEY and published relation does not have "
2532 "REPLICA IDENTITY FULL",
2533 rel->remoterel.nspname, rel->remoterel.relname)));
2534}
2535
2536/*
2537 * Handle UPDATE message.
2538 *
2539 * TODO: FDW support
2540 */
2541static void
2543{
2545 LogicalRepRelId relid;
2546 UserContext ucxt;
2547 ApplyExecutionData *edata;
2548 EState *estate;
2549 LogicalRepTupleData oldtup;
2550 LogicalRepTupleData newtup;
2551 bool has_oldtup;
2552 TupleTableSlot *remoteslot;
2553 RTEPermissionInfo *target_perminfo;
2554 MemoryContext oldctx;
2555 bool run_as_owner;
2556
2557 /*
2558 * Quick return if we are skipping data modification changes or handling
2559 * streamed transactions.
2560 */
2561 if (is_skipping_changes() ||
2563 return;
2564
2566
2567 relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2568 &newtup);
2571 {
2572 /*
2573 * The relation can't become interesting in the middle of the
2574 * transaction so it's safe to unlock it.
2575 */
2578 return;
2579 }
2580
2581 /* Set relation for error callback */
2583
2584 /* Check if we can do the update. */
2586
2587 /*
2588 * Make sure that any user-supplied code runs as the table owner, unless
2589 * the user has opted out of that behavior.
2590 */
2591 run_as_owner = MySubscription->runasowner;
2592 if (!run_as_owner)
2593 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2594
2595 /* Initialize the executor state. */
2596 edata = create_edata_for_relation(rel);
2597 estate = edata->estate;
2598 remoteslot = ExecInitExtraTupleSlot(estate,
2600 &TTSOpsVirtual);
2601
2602 /*
2603 * Populate updatedCols so that per-column triggers can fire, and so the
2604 * executor can correctly pass down the indexUnchanged hint. This could
2605 * include more columns than were actually changed on the publisher
2606 * because the logical replication protocol doesn't contain that
2607 * information. But it would for example exclude columns that only exist
2608 * on the subscriber, since we are not touching those.
2609 */
2610 target_perminfo = list_nth(estate->es_rteperminfos, 0);
2611 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2612 {
2614 int remoteattnum = rel->attrmap->attnums[i];
2615
2616 if (!att->attisdropped && remoteattnum >= 0)
2617 {
2618 Assert(remoteattnum < newtup.ncols);
2619 if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2620 target_perminfo->updatedCols =
2621 bms_add_member(target_perminfo->updatedCols,
2623 }
2624 }
2625
2626 /* Build the search tuple. */
2628 slot_store_data(remoteslot, rel,
2629 has_oldtup ? &oldtup : &newtup);
2630 MemoryContextSwitchTo(oldctx);
2631
2632 /* For a partitioned table, apply update to correct partition. */
2633 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2635 remoteslot, &newtup, CMD_UPDATE);
2636 else
2638 remoteslot, &newtup, rel->localindexoid);
2639
2640 finish_edata(edata);
2641
2642 /* Reset relation for error callback */
2644
2645 if (!run_as_owner)
2646 RestoreUserContext(&ucxt);
2647
2649
2651}
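
/*
 * Illustrative sketch (not part of worker.c): the updatedCols loop in
 * apply_handle_update() can be modelled with a plain bitmask. This is a
 * minimal sketch under stated assumptions: collect_updated_cols, the
 * "dropped" array and COL_UNCHANGED are hypothetical names, and a 64-bit
 * mask stands in for the Bitmapset used by the real code.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for LOGICALREP_COLUMN_UNCHANGED. */
#define COL_UNCHANGED 'u'

/*
 * attrmap[i] is the remote column feeding local column i, or -1 if the
 * column exists only on the subscriber. Returns a mask with bit i set for
 * every live local column whose remote value was sent as changed.
 */
static uint64_t
collect_updated_cols(const int *attrmap, const bool *dropped, int natts,
					 const char *colstatus, int ncols)
{
	uint64_t	updated = 0;

	assert(natts <= 64);		/* limit of this sketch's bitmask */

	for (int i = 0; i < natts; i++)
	{
		int			remoteattnum = attrmap[i];

		/* Skip dropped columns and columns with no remote counterpart. */
		if (dropped[i] || remoteattnum < 0)
			continue;

		assert(remoteattnum < ncols);
		if (colstatus[remoteattnum] != COL_UNCHANGED)
			updated |= UINT64_C(1) << i;
	}

	return updated;
}
```

/*
 * As in the comment inside apply_handle_update(), the mask may cover more
 * columns than actually changed on the publisher, but it never includes
 * subscriber-only columns.
 */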
2652
2653/*
2654 * Workhorse for apply_handle_update()
2655 * relinfo is for the relation we're actually updating in
2656 * (could be a child partition of edata->targetRelInfo)
2657 */
2658static void
2660 ResultRelInfo *relinfo,
2661 TupleTableSlot *remoteslot,
2662 LogicalRepTupleData *newtup,
2663 Oid localindexoid)
2664{
2665 EState *estate = edata->estate;
2666 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2667 Relation localrel = relinfo->ri_RelationDesc;
2668 EPQState epqstate;
2669 TupleTableSlot *localslot;
2670 bool found;
2671 MemoryContext oldctx;
2672
2673 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2674 ExecOpenIndices(relinfo, true);
2675
2676 found = FindReplTupleInLocalRel(edata, localrel,
2677 &relmapentry->remoterel,
2678 localindexoid,
2679 remoteslot, &localslot);
2680
2681 /*
2682 * Tuple found.
2683 *
2684 * Note this will fail if there are other conflicting unique indexes.
2685 */
2686 if (found)
2687 {
2688 RepOriginId localorigin;
2689 TransactionId localxmin;
2690 TimestampTz localts;
2691
2692 /*
2693 * Report the conflict if the tuple was modified by a different
2694 * origin.
2695 */
2696 if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
2697 localorigin != replorigin_session_origin)
2698 {
2699 TupleTableSlot *newslot;
2700
2701 /* Store the new tuple for conflict reporting */
2702 newslot = table_slot_create(localrel, &estate->es_tupleTable);
2703 slot_store_data(newslot, relmapentry, newtup);
2704
2706 remoteslot, localslot, newslot,
2707 InvalidOid, localxmin, localorigin, localts);
2708 }
2709
2710 /* Process and store remote tuple in the slot */
2712 slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2713 MemoryContextSwitchTo(oldctx);
2714
2715 EvalPlanQualSetSlot(&epqstate, remoteslot);
2716
2717 InitConflictIndexes(relinfo);
2718
2719 /* Do the actual update. */
2721 ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2722 remoteslot);
2723 }
2724 else
2725 {
2726 TupleTableSlot *newslot = localslot;
2727
2728 /* Store the new tuple for conflict reporting */
2729 slot_store_data(newslot, relmapentry, newtup);
2730
2731 /*
2732 * The tuple to be updated could not be found. Do nothing except for
2733 * emitting a log message.
2734 */
2735 ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
2736 remoteslot, NULL, newslot,
2739 }
2740
2741 /* Cleanup. */
2742 ExecCloseIndices(relinfo);
2743 EvalPlanQualEnd(&epqstate);
2744}
2745
2746/*
2747 * Handle DELETE message.
2748 *
2749 * TODO: FDW support
2750 */
2751static void
2753{
2755 LogicalRepTupleData oldtup;
2756 LogicalRepRelId relid;
2757 UserContext ucxt;
2758 ApplyExecutionData *edata;
2759 EState *estate;
2760 TupleTableSlot *remoteslot;
2761 MemoryContext oldctx;
2762 bool run_as_owner;
2763
2764 /*
2765 * Quick return if we are skipping data modification changes or handling
2766 * streamed transactions.
2767 */
2768 if (is_skipping_changes() ||
2770 return;
2771
2773
2774 relid = logicalrep_read_delete(s, &oldtup);
2777 {
2778 /*
2779 * The relation can't become interesting in the middle of the
2780 * transaction so it's safe to unlock it.
2781 */
2784 return;
2785 }
2786
2787 /* Set relation for error callback */
2789
2790 /* Check if we can do the delete. */
2792
2793 /*
2794 * Make sure that any user-supplied code runs as the table owner, unless
2795 * the user has opted out of that behavior.
2796 */
2797 run_as_owner = MySubscription->runasowner;
2798 if (!run_as_owner)
2799 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2800
2801 /* Initialize the executor state. */
2802 edata = create_edata_for_relation(rel);
2803 estate = edata->estate;
2804 remoteslot = ExecInitExtraTupleSlot(estate,
2806 &TTSOpsVirtual);
2807
2808 /* Build the search tuple. */
2810 slot_store_data(remoteslot, rel, &oldtup);
2811 MemoryContextSwitchTo(oldctx);
2812
2813 /* For a partitioned table, apply delete to correct partition. */
2814 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2816 remoteslot, NULL, CMD_DELETE);
2817 else
2819 remoteslot, rel->localindexoid);
2820
2821 finish_edata(edata);
2822
2823 /* Reset relation for error callback */
2825
2826 if (!run_as_owner)
2827 RestoreUserContext(&ucxt);
2828
2830
2832}
2833
2834/*
2835 * Workhorse for apply_handle_delete()
2836 * relinfo is for the relation we're actually deleting from
2837 * (could be a child partition of edata->targetRelInfo)
2838 */
2839static void
2841 ResultRelInfo *relinfo,
2842 TupleTableSlot *remoteslot,
2843 Oid localindexoid)
2844{
2845 EState *estate = edata->estate;
2846 Relation localrel = relinfo->ri_RelationDesc;
2847 LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2848 EPQState epqstate;
2849 TupleTableSlot *localslot;
2850 bool found;
2851
2852 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2853 ExecOpenIndices(relinfo, false);
2854
2855 found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2856 remoteslot, &localslot);
2857
2858 /* If found delete it. */
2859 if (found)
2860 {
2861 RepOriginId localorigin;
2862 TransactionId localxmin;
2863 TimestampTz localts;
2864
2865 /*
2866 * Report the conflict if the tuple was modified by a different
2867 * origin.
2868 */
2869 if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
2870 localorigin != replorigin_session_origin)
2872 remoteslot, localslot, NULL,
2873 InvalidOid, localxmin, localorigin, localts);
2874
2875 EvalPlanQualSetSlot(&epqstate, localslot);
2876
2877 /* Do the actual delete. */
2879 ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2880 }
2881 else
2882 {
2883 /*
2884 * The tuple to be deleted could not be found. Do nothing except for
2885 * emitting a log message.
2886 */
2887 ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
2888 remoteslot, NULL, NULL,
2891 }
2892
2893 /* Cleanup. */
2894 ExecCloseIndices(relinfo);
2895 EvalPlanQualEnd(&epqstate);
2896}
2897
2898/*
2899 * Try to find a tuple received from the publication side (in 'remoteslot') in
2900 * the corresponding local relation using the replica identity index, the
2901 * primary key, another usable index or, if needed, a sequential scan.
2902 *
2903 * Local tuple, if found, is returned in '*localslot'.
2904 */
2905static bool
2907 LogicalRepRelation *remoterel,
2908 Oid localidxoid,
2909 TupleTableSlot *remoteslot,
2910 TupleTableSlot **localslot)
2911{
2912 EState *estate = edata->estate;
2913 bool found;
2914
2915 /*
2916 * Regardless of the top-level operation, we're performing a read here, so
2917 * check for SELECT privileges.
2918 */
2920
2921 *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2922
2923 Assert(OidIsValid(localidxoid) ||
2924 (remoterel->replident == REPLICA_IDENTITY_FULL));
2925
2926 if (OidIsValid(localidxoid))
2927 {
2928#ifdef USE_ASSERT_CHECKING
2929 Relation idxrel = index_open(localidxoid, AccessShareLock);
2930
2931 /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2932 Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
2933 (remoterel->replident == REPLICA_IDENTITY_FULL &&
2935 edata->targetRel->attrmap)));
2937#endif
2938
2939 found = RelationFindReplTupleByIndex(localrel, localidxoid,
2941 remoteslot, *localslot);
2942 }
2943 else
2945 remoteslot, *localslot);
2946
2947 return found;
2948}
2949
2950/*
2951 * This handles insert, update, delete on a partitioned table.
2952 */
2953static void
2955 TupleTableSlot *remoteslot,
2956 LogicalRepTupleData *newtup,
2957 CmdType operation)
2958{
2959 EState *estate = edata->estate;
2960 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2961 ResultRelInfo *relinfo = edata->targetRelInfo;
2962 Relation parentrel = relinfo->ri_RelationDesc;
2963 ModifyTableState *mtstate;
2964 PartitionTupleRouting *proute;
2965 ResultRelInfo *partrelinfo;
2966 Relation partrel;
2967 TupleTableSlot *remoteslot_part;
2968 TupleConversionMap *map;
2969 MemoryContext oldctx;
2970 LogicalRepRelMapEntry *part_entry = NULL;
2971 AttrMap *attrmap = NULL;
2972
2973 /* ModifyTableState is needed for ExecFindPartition(). */
2974 edata->mtstate = mtstate = makeNode(ModifyTableState);
2975 mtstate->ps.plan = NULL;
2976 mtstate->ps.state = estate;
2977 mtstate->operation = operation;
2978 mtstate->resultRelInfo = relinfo;
2979
2980 /* ... as is PartitionTupleRouting. */
2981 edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2982
2983 /*
2984 * Find the partition to which the "search tuple" belongs.
2985 */
2986 Assert(remoteslot != NULL);
2988 partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2989 remoteslot, estate);
2990 Assert(partrelinfo != NULL);
2991 partrel = partrelinfo->ri_RelationDesc;
2992
2993 /*
2994 * Check for supported relkind. We need this since partitions might be of
2995 * unsupported relkinds; and the set of partitions can change, so checking
2996 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2997 */
2998 CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3000 RelationGetRelationName(partrel));
3001
3002 /*
3003 * To perform any of the operations below, the tuple must match the
3004 * partition's rowtype. Convert if needed or just copy, using a dedicated
3005 * slot to store the tuple in any case.
3006 */
3007 remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3008 if (remoteslot_part == NULL)
3009 remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3010 map = ExecGetRootToChildMap(partrelinfo, estate);
3011 if (map != NULL)
3012 {
3013 attrmap = map->attrMap;
3014 remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3015 remoteslot_part);
3016 }
3017 else
3018 {
3019 remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3020 slot_getallattrs(remoteslot_part);
3021 }
3022 MemoryContextSwitchTo(oldctx);
3023
3024 /* Check if we can do the update or delete on the leaf partition. */
3025 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3026 {
3027 part_entry = logicalrep_partition_open(relmapentry, partrel,
3028 attrmap);
3029 check_relation_updatable(part_entry);
3030 }
3031
3032 switch (operation)
3033 {
3034 case CMD_INSERT:
3035 apply_handle_insert_internal(edata, partrelinfo,
3036 remoteslot_part);
3037 break;
3038
3039 case CMD_DELETE:
3040 apply_handle_delete_internal(edata, partrelinfo,
3041 remoteslot_part,
3042 part_entry->localindexoid);
3043 break;
3044
3045 case CMD_UPDATE:
3046
3047 /*
3048 * For UPDATE, depending on whether or not the updated tuple
3049 * satisfies the partition's constraint, perform a simple UPDATE
3050 * of the partition or move the updated tuple into a different
3051 * suitable partition.
3052 */
3053 {
3054 TupleTableSlot *localslot;
3055 ResultRelInfo *partrelinfo_new;
3056 Relation partrel_new;
3057 bool found;
3058 EPQState epqstate;
3059 RepOriginId localorigin;
3060 TransactionId localxmin;
3061 TimestampTz localts;
3062
3063 /* Get the matching local tuple from the partition. */
3064 found = FindReplTupleInLocalRel(edata, partrel,
3065 &part_entry->remoterel,
3066 part_entry->localindexoid,
3067 remoteslot_part, &localslot);
3068 if (!found)
3069 {
3070 TupleTableSlot *newslot = localslot;
3071
3072 /* Store the new tuple for conflict reporting */
3073 slot_store_data(newslot, part_entry, newtup);
3074
3075 /*
3076 * The tuple to be updated could not be found. Do nothing
3077 * except for emitting a log message.
3078 */
3079 ReportApplyConflict(estate, partrelinfo,
3081 remoteslot_part, NULL, newslot,
3084
3085 return;
3086 }
3087
3088 /*
3089 * Report the conflict if the tuple was modified by a
3090 * different origin.
3091 */
3092 if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
3093 localorigin != replorigin_session_origin)
3094 {
3095 TupleTableSlot *newslot;
3096
3097 /* Store the new tuple for conflict reporting */
3098 newslot = table_slot_create(partrel, &estate->es_tupleTable);
3099 slot_store_data(newslot, part_entry, newtup);
3100
3101 ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3102 remoteslot_part, localslot, newslot,
3103 InvalidOid, localxmin, localorigin,
3104 localts);
3105 }
3106
3107 /*
3108 * Apply the update to the local tuple, putting the result in
3109 * remoteslot_part.
3110 */
3112 slot_modify_data(remoteslot_part, localslot, part_entry,
3113 newtup);
3114 MemoryContextSwitchTo(oldctx);
3115
3116 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3117
3118 /*
3119 * Does the updated tuple still satisfy the current
3120 * partition's constraint?
3121 */
3122 if (!partrel->rd_rel->relispartition ||
3123 ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3124 false))
3125 {
3126 /*
3127 * Yes, so simply UPDATE the partition. We don't call
3128 * apply_handle_update_internal() here, which would
3129 * normally do the following work, to avoid repeating some
3130 * work already done above to find the local tuple in the
3131 * partition.
3132 */
3133 ExecOpenIndices(partrelinfo, true);
3134 InitConflictIndexes(partrelinfo);
3135
3136 EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3138 ACL_UPDATE);
3139 ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3140 localslot, remoteslot_part);
3141 }
3142 else
3143 {
3144 /* Move the tuple into the new partition. */
3145
3146 /*
3147 * New partition will be found using tuple routing, which
3148 * can only occur via the parent table. We might need to
3149 * convert the tuple to the parent's rowtype. Note that
3150 * this is the tuple found in the partition, not the
3151 * original search tuple received by this function.
3152 */
3153 if (map)
3154 {
3155 TupleConversionMap *PartitionToRootMap =
3157 RelationGetDescr(parentrel));
3158
3159 remoteslot =
3160 execute_attr_map_slot(PartitionToRootMap->attrMap,
3161 remoteslot_part, remoteslot);
3162 }
3163 else
3164 {
3165 remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3166 slot_getallattrs(remoteslot);
3167 }
3168
3169 /* Find the new partition. */
3171 partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3172 proute, remoteslot,
3173 estate);
3174 MemoryContextSwitchTo(oldctx);
3175 Assert(partrelinfo_new != partrelinfo);
3176 partrel_new = partrelinfo_new->ri_RelationDesc;
3177
3178 /* Check that new partition also has supported relkind. */
3179 CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3181 RelationGetRelationName(partrel_new));
3182
3183 ExecOpenIndices(partrelinfo, false);
3184
3185 /* DELETE old tuple found in the old partition. */
3186 EvalPlanQualSetSlot(&epqstate, localslot);
3188 ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3189
3190 /* INSERT new tuple into the new partition. */
3191
3192 /*
3193 * Convert the replacement tuple to match the destination
3194 * partition rowtype.
3195 */
3197 remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3198 if (remoteslot_part == NULL)
3199 remoteslot_part = table_slot_create(partrel_new,
3200 &estate->es_tupleTable);
3201 map = ExecGetRootToChildMap(partrelinfo_new, estate);
3202 if (map != NULL)
3203 {
3204 remoteslot_part = execute_attr_map_slot(map->attrMap,
3205 remoteslot,
3206 remoteslot_part);
3207 }
3208 else
3209 {
3210 remoteslot_part = ExecCopySlot(remoteslot_part,
3211 remoteslot);
3212 slot_getallattrs(remoteslot);
3213 }
3214 MemoryContextSwitchTo(oldctx);
3215 apply_handle_insert_internal(edata, partrelinfo_new,
3216 remoteslot_part);
3217 }
3218
3219 ExecCloseIndices(partrelinfo);
3220 EvalPlanQualEnd(&epqstate);
3221 }
3222 break;
3223
3224 default:
3225 elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3226 break;
3227 }
3228}
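
/*
 * Illustrative sketch (not part of worker.c): the CMD_UPDATE branch of
 * apply_handle_tuple_routing() either updates the tuple in place or, when
 * the new tuple no longer satisfies the partition constraint, deletes it
 * from the old partition and inserts it into the new one. The toy model
 * below assumes integer keys and half-open range partitions; Partition,
 * find_partition, find_key and apply_update are hypothetical names. Where
 * the real code raises an error (no matching partition), this sketch just
 * returns false.
 */

```c
#include <assert.h>
#include <stdbool.h>

#define PART_CAP 8

typedef struct Partition
{
	int			lo;				/* inclusive lower bound */
	int			hi;				/* exclusive upper bound */
	int			keys[PART_CAP];
	int			nkeys;
} Partition;

/* Route a key to its partition; returns the index or -1 if none accepts it. */
static int
find_partition(const Partition *parts, int nparts, int key)
{
	for (int i = 0; i < nparts; i++)
		if (key >= parts[i].lo && key < parts[i].hi)
			return i;
	return -1;
}

/* Locate a key inside one partition; returns its position or -1. */
static int
find_key(const Partition *p, int key)
{
	for (int i = 0; i < p->nkeys; i++)
		if (p->keys[i] == key)
			return i;
	return -1;
}

/* Returns true if the update was applied, false if it was skipped. */
static bool
apply_update(Partition *parts, int nparts, int oldkey, int newkey)
{
	int			src = find_partition(parts, nparts, oldkey);
	int			pos;
	int			dst;

	if (src < 0)
		return false;
	pos = find_key(&parts[src], oldkey);
	if (pos < 0)
		return false;			/* "update missing": nothing to do */

	/* The new tuple still satisfies the constraint: simple in-place update. */
	if (newkey >= parts[src].lo && newkey < parts[src].hi)
	{
		parts[src].keys[pos] = newkey;
		return true;
	}

	/* Otherwise route again from the parent, then DELETE and INSERT. */
	dst = find_partition(parts, nparts, newkey);
	if (dst < 0 || parts[dst].nkeys == PART_CAP)
		return false;
	parts[src].keys[pos] = parts[src].keys[--parts[src].nkeys];
	parts[dst].keys[parts[dst].nkeys++] = newkey;
	return true;
}
```

/*
 * The sketch looks the old tuple up once and reuses that result for both
 * paths, which mirrors why the real code avoids calling
 * apply_handle_update_internal() after it has already found the local tuple.
 */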
3229
3230/*
3231 * Handle TRUNCATE message.
3232 *
3233 * TODO: FDW support
3234 */
3235static void
3237{
3238 bool cascade = false;
3239 bool restart_seqs = false;
3240 List *remote_relids = NIL;
3241 List *remote_rels = NIL;
3242 List *rels = NIL;
3243 List *part_rels = NIL;
3244 List *relids = NIL;
3245 List *relids_logged = NIL;
3246 ListCell *lc;
3247 LOCKMODE lockmode = AccessExclusiveLock;
3248
3249 /*
3250 * Quick return if we are skipping data modification changes or handling
3251 * streamed transactions.
3252 */
3253 if (is_skipping_changes() ||
3255 return;
3256
3258
3259 remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3260
3261 foreach(lc, remote_relids)
3262 {
3263 LogicalRepRelId relid = lfirst_oid(lc);
3265
3266 rel = logicalrep_rel_open(relid, lockmode);
3268 {
3269 /*
3270 * The relation can't become interesting in the middle of the
3271 * transaction so it's safe to unlock it.
3272 */
3273 logicalrep_rel_close(rel, lockmode);
3274 continue;
3275 }
3276
3277 remote_rels = lappend(remote_rels, rel);
3279 rels = lappend(rels, rel->localrel);
3280 relids = lappend_oid(relids, rel->localreloid);
3282 relids_logged = lappend_oid(relids_logged, rel->localreloid);
3283
3284 /*
3285 * Truncate partitions if we got a message to truncate a partitioned
3286 * table.
3287 */
3288 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3289 {
3290 ListCell *child;
3291 List *children = find_all_inheritors(rel->localreloid,
3292 lockmode,
3293 NULL);
3294
3295 foreach(child, children)
3296 {
3297 Oid childrelid = lfirst_oid(child);
3298 Relation childrel;
3299
3300 if (list_member_oid(relids, childrelid))
3301 continue;
3302
3303 /* find_all_inheritors already got lock */
3304 childrel = table_open(childrelid, NoLock);
3305
3306 /*
3307 * Ignore temp tables of other backends. See similar code in
3308 * ExecuteTruncate().
3309 */
3310 if (RELATION_IS_OTHER_TEMP(childrel))
3311 {
3312 table_close(childrel, lockmode);
3313 continue;
3314 }
3315
3317 rels = lappend(rels, childrel);
3318 part_rels = lappend(part_rels, childrel);
3319 relids = lappend_oid(relids, childrelid);
3320 /* Log this relation only if needed for logical decoding */
3321 if (RelationIsLogicallyLogged(childrel))
3322 relids_logged = lappend_oid(relids_logged, childrelid);
3323 }
3324 }
3325 }
3326
3327 /*
3328 * Even if CASCADE was used on the upstream primary, we explicitly default
3329 * to replaying changes without further cascading. This might later become
3330 * changeable with a user-specified option.
3331 *
3332 * MySubscription->runasowner tells us whether we want to execute
3333 * replication actions as the subscription owner; the last argument to
3334 * TruncateGuts tells it whether we want to switch to the table owner.
3335 * Those are exactly opposite conditions.
3336 */
3338 relids,
3339 relids_logged,
3341 restart_seqs,
3343 foreach(lc, remote_rels)
3344 {
3345 LogicalRepRelMapEntry *rel = lfirst(lc);
3346
3348 }
3349 foreach(lc, part_rels)
3350 {
3351 Relation rel = lfirst(lc);
3352
3353 table_close(rel, NoLock);
3354 }
3355
3357}
3358
3359
3360/*
3361 * Logical replication protocol message dispatcher.
3362 */
3363void
3365{
3367 LogicalRepMsgType saved_command;
3368
3369 /*
3370 * Set the current command being applied. Since this function can be
3371 * called recursively when applying spooled changes, save the current
3372 * command.
3373 */
3374 saved_command = apply_error_callback_arg.command;
3376
3377 switch (action)
3378 {
3381 break;
3382
3385 break;
3386
3389 break;
3390
3393 break;
3394
3397 break;
3398
3401 break;
3402
3405 break;
3406
3409 break;
3410
3413 break;
3414
3416
3417 /*
3418 * Logical replication does not use generic logical messages yet.
3419 * However, they could be used by other applications that use this
3420 * output plugin.
3421 */
3422 break;
3423
3426 break;
3427
3430 break;
3431
3434 break;
3435
3438 break;
3439
3442 break;
3443
3446 break;
3447
3450 break;
3451
3454 break;
3455
3458 break;
3459
3460 default:
3461 ereport(ERROR,
3462 (errcode(ERRCODE_PROTOCOL_VIOLATION),
3463 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3464 }
3465
3466 /* Reset the current command */
3467 apply_error_callback_arg.command = saved_command;
3468}
3469
3470/*
3471 * Figure out which write/flush positions to report to the walsender process.
3472 *
3473 * We can't simply report back the last LSN the walsender sent us because the
3474 * local transaction might not yet be flushed to disk locally. Instead we
3475 * build a list that associates local with remote LSNs for every commit. When
3476 * reporting back the flush position to the sender we iterate that list and
3477 * check which entries on it are already locally flushed. Those we can report
3478 * as having been flushed.
3479 *
3480 * *have_pending_txes is set to true if there are outstanding transactions
3481 * that need to be flushed.
3482 */
3483static void
3485 bool *have_pending_txes)
3486{
3487 dlist_mutable_iter iter;
3488 XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3489
3491 *flush = InvalidXLogRecPtr;
3492
3494 {
3495 FlushPosition *pos =
3496 dlist_container(FlushPosition, node, iter.cur);
3497
3498 *write = pos->remote_end;
3499
3500 if (pos->local_end <= local_flush)
3501 {
3502 *flush = pos->remote_end;
3503 dlist_delete(iter.cur);
3504 pfree(pos);
3505 }
3506 else
3507 {
3508 /*
3509 * We don't want to iterate uselessly over the rest of the list, which
3510 * could potentially be long. Instead, get the last element and
3511 * grab the write position from there.
3512 */
3514 &lsn_mapping);
3515 *write = pos->remote_end;
3516 *have_pending_txes = true;
3517 return;
3518 }
3519 }
3520
3521 *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3522}
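
/*
 * Illustrative sketch (not part of worker.c): the local/remote LSN
 * bookkeeping described above get_flush_position() can be modelled with an
 * array kept in commit order. This is a minimal sketch under stated
 * assumptions: Lsn, LsnPair and report_positions are hypothetical names, an
 * array replaces the dlist, and 0 stands in for InvalidXLogRecPtr.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t Lsn;			/* stand-in for XLogRecPtr */

typedef struct LsnPair
{
	Lsn			local_end;		/* local WAL position of the commit */
	Lsn			remote_end;		/* publisher's end LSN of that commit */
} LsnPair;

/*
 * pairs[0 .. *npairs) is in commit order. Entries already flushed locally
 * are consumed from the front; the rest stay queued for a later call.
 */
static void
report_positions(LsnPair *pairs, size_t *npairs, Lsn local_flush,
				 Lsn *write, Lsn *flush, bool *have_pending)
{
	size_t		done = 0;

	*write = 0;
	*flush = 0;

	/* Everything flushed locally can be reported as flushed remotely. */
	while (done < *npairs && pairs[done].local_end <= local_flush)
	{
		*write = pairs[done].remote_end;
		*flush = pairs[done].remote_end;
		done++;
	}

	/* Stop at the first unflushed entry; take "write" from the tail. */
	if (done < *npairs)
		*write = pairs[*npairs - 1].remote_end;

	/* Drop the consumed entries. */
	for (size_t i = done; i < *npairs; i++)
		pairs[i - done] = pairs[i];
	*npairs -= done;

	*have_pending = (*npairs > 0);
}
```

/*
 * With commits (local 100, remote 10), (200, 20) and (300, 30) queued and a
 * local flush position of 200, the sketch reports write = 30 and flush = 20,
 * and leaves one transaction pending.
 */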
3523
3524/*
3525 * Store current remote/local lsn pair in the tracking list.
3526 */
3527void
3529{
3530 FlushPosition *flushpos;
3531
3532 /*
3533 * Skip for parallel apply workers, because the lsn_mapping is maintained
3534 * by the leader apply worker.
3535 */
3537 return;
3538
3539 /* Need to do this in permanent context */
3541
3542 /* Track commit lsn */
3543 flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3544 flushpos->local_end = local_lsn;
3545 flushpos->remote_end = remote_lsn;
3546
3547 dlist_push_tail(&lsn_mapping, &flushpos->node);
3549}
3550
3551
3552/* Update statistics of the worker. */
3553static void
3554UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3555{
3556 MyLogicalRepWorker->last_lsn = last_lsn;
3559 if (reply)
3560 {
3561 MyLogicalRepWorker->reply_lsn = last_lsn;
3562 MyLogicalRepWorker->reply_time = send_time;
3563 }
3564}
3565
3566/*
3567 * Apply main loop.
3568 */
3569static void
3571{
3572 TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3573 bool ping_sent = false;
3574 TimeLineID tli;
3575 ErrorContextCallback errcallback;
3576
3577 /*
3578 * Init the ApplyMessageContext which we clean up after each replication
3579 * protocol message.
3580 */
3582 "ApplyMessageContext",
3584
3585 /*
3586 * This memory context is used for per-stream data when the streaming mode
3587 * is enabled. This context is reset on each stream stop.
3588 */
3590 "LogicalStreamingContext",
3592
3593 /* mark as idle, before starting to loop */
3595
3596 /*
3597 * Push apply error context callback. Fields will be filled while applying
3598 * a change.
3599 */
3600 errcallback.callback = apply_error_callback;
3601 errcallback.previous = error_context_stack;
3602 error_context_stack = &errcallback;
3604
3605 /* This outer loop iterates once per wait. */
3606 for (;;)
3607 {
3609 int rc;
3610 int len;
3611 char *buf = NULL;
3612 bool endofstream = false;
3613 long wait_time;
3614
3616
3618
3620
3621 if (len != 0)
3622 {
3623 /* Loop to process all available data (without blocking). */
3624 for (;;)
3625 {
3627
3628 if (len == 0)
3629 {
3630 break;
3631 }
3632 else if (len < 0)
3633 {
3634 ereport(LOG,
3635 (errmsg("data stream from publisher has ended")));
3636 endofstream = true;
3637 break;
3638 }
3639 else
3640 {
3641 int c;
3643
3645 {
3646 ConfigReloadPending = false;
3648 }
3649
3650 /* Reset timeout. */
3651 last_recv_timestamp = GetCurrentTimestamp();
3652 ping_sent = false;
3653
3654 /* Ensure we are reading the data into our memory context. */
3656
3658
3659 c = pq_getmsgbyte(&s);
3660
3661 if (c == 'w')
3662 {
3663 XLogRecPtr start_lsn;
3664 XLogRecPtr end_lsn;
3665 TimestampTz send_time;
3666
3667 start_lsn = pq_getmsgint64(&s);
3668 end_lsn = pq_getmsgint64(&s);
3669 send_time = pq_getmsgint64(&s);
3670
3671 if (last_received < start_lsn)
3672 last_received = start_lsn;
3673
3674 if (last_received < end_lsn)
3675 last_received = end_lsn;
3676
3677 UpdateWorkerStats(last_received, send_time, false);
3678
3679 apply_dispatch(&s);
3680 }
3681 else if (c == 'k')
3682 {
3683 XLogRecPtr end_lsn;
3685 bool reply_requested;
3686
3687 end_lsn = pq_getmsgint64(&s);
3689 reply_requested = pq_getmsgbyte(&s);
3690
3691 if (last_received < end_lsn)
3692 last_received = end_lsn;
3693
3694 send_feedback(last_received, reply_requested, false);
3695 UpdateWorkerStats(last_received, timestamp, true);
3696 }
3697 /* other message types are purposefully ignored */
3698
3700 }
3701
3703 }
3704 }
3705
3706 /* confirm all writes so far */
3707 send_feedback(last_received, false, false);
3708
3710 {
3711 /*
3712 * If we didn't get any transactions for a while there might be
3713 * unconsumed invalidation messages in the queue, consume them
3714 * now.
3715 */
3718
3719 /* Process any table synchronization changes. */
3720 process_syncing_tables(last_received);
3721 }
3722
3723 /* Cleanup the memory. */
3726
3727 /* Check if we need to exit the streaming loop. */
3728 if (endofstream)
3729 break;
3730
3731 /*
3732 * Wait for more data or latch. If we have unflushed transactions,
3733 * wake up after WalWriterDelay to see if they've been flushed yet (in
3734 * which case we should send a feedback message). Otherwise, there's
3735 * no particular urgency about waking up unless we get data or a
3736 * signal.
3737 */
3739 wait_time = WalWriterDelay;
3740 else
3741 wait_time = NAPTIME_PER_CYCLE;
3742
3746 fd, wait_time,
3747 WAIT_EVENT_LOGICAL_APPLY_MAIN);
3748
3749 if (rc & WL_LATCH_SET)
3750 {
3753 }
3754
3756 {
3757 ConfigReloadPending = false;
3759 }
3760
3761 if (rc & WL_TIMEOUT)
3762 {
3763 /*
3764 * We didn't receive anything new. If we haven't heard anything
3765 * from the server for more than wal_receiver_timeout / 2, ping
3766 * the server. Also, if it's been longer than
3767 * wal_receiver_status_interval since the last update we sent,
3768 * send a status update to the primary anyway, to report any
3769 * progress in applying WAL.
3770 */
3771 bool requestReply = false;
3772
3773 /*
3774 * Check if time since last receive from primary has reached the
3775 * configured limit.
3776 */
3777 if (wal_receiver_timeout > 0)
3778 {
3780 TimestampTz timeout;
3781
3782 timeout =
3783 TimestampTzPlusMilliseconds(last_recv_timestamp,
3785
3786 if (now >= timeout)
3787 ereport(ERROR,
3788 (errcode(ERRCODE_CONNECTION_FAILURE),
3789 errmsg("terminating logical replication worker due to timeout")));
3790
3791 /* Check to see if it's time for a ping. */
3792 if (!ping_sent)
3793 {
3794 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3795 (wal_receiver_timeout / 2));
3796 if (now >= timeout)
3797 {
3798 requestReply = true;
3799 ping_sent = true;
3800 }
3801 }
3802 }
3803
3804 send_feedback(last_received, requestReply, requestReply);
3805
3806 /*
3807 * Force reporting to ensure long idle periods don't lead to
3808 * arbitrarily delayed stats. Stats can only be reported outside
3809 * of (implicit or explicit) transactions. That shouldn't lead to
3810 * stats being delayed for long, because transactions are either
3811 * sent as a whole on commit or streamed. Streamed transactions
3812 * are spilled to disk and applied on commit.
3813 */
3814 if (!IsTransactionState())
3815 pgstat_report_stat(true);
3816 }
3817 }
3818
3819 /* Pop the error context stack */
3820 error_context_stack = errcallback.previous;
3822
3823 /* All done */
3825}
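
The timeout handling in the WL_TIMEOUT branch of the loop above can be looked at in isolation. The following is a minimal sketch, not code from worker.c: `TimeoutAction` and `check_receive_timeout` are hypothetical names, and times are plain milliseconds rather than TimestampTz values. It shows the two thresholds the comments describe: a ping once half of the timeout has elapsed without data, and termination once the full timeout has elapsed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical result type; not part of worker.c. */
typedef enum
{
	TIMEOUT_NONE,				/* nothing to do yet */
	TIMEOUT_SEND_PING,			/* half the timeout elapsed: request a reply */
	TIMEOUT_TERMINATE			/* full timeout elapsed: give up */
} TimeoutAction;

/*
 * Decide what to do after a wait timed out without new data.
 *
 * All times are milliseconds on an arbitrary clock (an assumption of this
 * sketch).  *ping_sent remembers whether a ping was already requested since
 * the last received message, so that only one ping is sent per idle period.
 */
TimeoutAction
check_receive_timeout(int64_t now_ms, int64_t last_recv_ms,
					  int timeout_ms, bool *ping_sent)
{
	assert(ping_sent != NULL);

	/* A non-positive timeout disables the check entirely. */
	if (timeout_ms <= 0)
		return TIMEOUT_NONE;

	/* Nothing heard for the whole timeout: the connection is considered dead. */
	if (now_ms >= last_recv_ms + timeout_ms)
		return TIMEOUT_TERMINATE;

	/* Nothing heard for half the timeout: ask the other side to reply. */
	if (!*ping_sent && now_ms >= last_recv_ms + timeout_ms / 2)
	{
		*ping_sent = true;
		return TIMEOUT_SEND_PING;
	}

	return TIMEOUT_NONE;
}
```

In the loop above, receiving any message resets both last_recv_timestamp and ping_sent, which corresponds to the caller of this sketch clearing `*ping_sent` and updating `last_recv_ms`.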
3826
3827/*
3828 * Send a Standby Status Update message to server.
3829 *
3830 * 'recvpos' is the latest LSN up to which we've received data; 'force' is
3831 * set if we need to send a response to avoid timeouts.
3832 */
3833static void
3834send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3835{
3836 static StringInfo reply_message = NULL;
3837 static TimestampTz send_time = 0;
3838
3839 static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3840 static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3841 static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3842
3843 XLogRecPtr writepos;
3844 XLogRecPtr flushpos;
3846 bool have_pending_txes;
3847
3848 /*
3849 * If the user doesn't want status to be reported to the publisher, be
3850 * sure to exit before doing anything at all.
3851 */
3852 if (!force && wal_receiver_status_interval <= 0)
3853 return;
3854
3855 /* It's legal to not pass a recvpos */
3856 if (recvpos < last_recvpos)
3857 recvpos = last_recvpos;
3858
3859 get_flush_position(&writepos, &flushpos, &have_pending_txes);
3860
3861 /*
3862 * No outstanding transactions to flush, we can report the latest received
3863 * position. This is important for synchronous replication.
3864 */
3865 if (!have_pending_txes)
3866 flushpos = writepos = recvpos;
3867
3868 if (writepos < last_writepos)
3869 writepos = last_writepos;
3870
3871 if (flushpos < last_flushpos)
3872 flushpos = last_flushpos;
3873
3875
3876 /* if we've already reported everything we're good */
3877 if (!force &&
3878 writepos == last_writepos &&
3879 flushpos == last_flushpos &&
3880 !TimestampDifferenceExceeds(send_time, now,
3882 return;
3883 send_time = now;
3884
3885 if (!reply_message)
3886 {
3888
3890 MemoryContextSwitchTo(oldctx);
3891 }
3892 else
3894
3896 pq_sendint64(reply_message, recvpos); /* write */
3897 pq_sendint64(reply_message, flushpos); /* flush */
3898 pq_sendint64(reply_message, writepos); /* apply */
3899 pq_sendint64(reply_message, now); /* sendTime */
3900 pq_sendbyte(reply_message, requestReply); /* replyRequested */
3901
3902 elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3903 force,
3904 LSN_FORMAT_ARGS(recvpos),
3905 LSN_FORMAT_ARGS(writepos),
3906 LSN_FORMAT_ARGS(flushpos));
3907
3910
3911 if (recvpos > last_recvpos)
3912 last_recvpos = recvpos;
3913 if (writepos > last_writepos)
3914 last_writepos = writepos;
3915 if (flushpos > last_flushpos)
3916 last_flushpos = flushpos;
3917}
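
The message assembled by send_feedback follows the Standby Status Update layout of the streaming replication protocol: a type byte 'r', four 64-bit fields (write, flush, apply, send time) in network byte order, and a one-byte reply-requested flag. The sketch below builds that layout in a caller-supplied buffer. `build_status_update` and `put_uint64_be` are hypothetical helpers written for illustration; the worker itself uses the pq_send* routines on a StringInfo.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* 1 type byte + four 8-byte fields + 1 flag byte */
#define STATUS_UPDATE_LEN 34

/* Store a 64-bit value at buf[off..off+7] in big-endian (network) order. */
static size_t
put_uint64_be(uint8_t *buf, size_t off, uint64_t v)
{
	for (int shift = 56; shift >= 0; shift -= 8)
		buf[off++] = (uint8_t) (v >> shift);
	return off;
}

/*
 * Fill buf with a Standby Status Update message and return its length.
 * The caller must supply at least STATUS_UPDATE_LEN bytes.
 */
size_t
build_status_update(uint8_t *buf, size_t buflen,
					uint64_t write_lsn, uint64_t flush_lsn,
					uint64_t apply_lsn, int64_t send_time,
					int reply_requested)
{
	size_t		off = 0;

	assert(buf != NULL && buflen >= STATUS_UPDATE_LEN);

	buf[off++] = 'r';			/* message type */
	off = put_uint64_be(buf, off, write_lsn);
	off = put_uint64_be(buf, off, flush_lsn);
	off = put_uint64_be(buf, off, apply_lsn);
	off = put_uint64_be(buf, off, (uint64_t) send_time);
	buf[off++] = reply_requested ? 1 : 0;

	return off;
}
```

Note that send_feedback above fills the write field with recvpos, the flush field with flushpos, and the apply field with writepos.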
3918
3919/*
3920 * Exit routine for apply workers due to subscription parameter changes.
3921 */
3922static void
3924{
3926 {
3927 /*
3928 * Don't stop the parallel apply worker as the leader will detect the
3929 * subscription parameter change and restart logical replication later
3930 * anyway. This also prevents the leader from reporting errors when
3931 * trying to communicate with a stopped parallel apply worker, which
3932 * would accidentally disable subscriptions if disable_on_error was
3933 * set.
3934 */
3935 return;
3936 }
3937
3938 /*
3939 * Reset the last-start time for this apply worker so that the launcher
3940 * will restart it without waiting for wal_retrieve_retry_interval if the
3941 * subscription is still active, and so that we won't leak that hash table
3942 * entry if it isn't.
3943 */
3946
3947 proc_exit(0);
3948}
3949
3950/*
3951 * Reread subscription info if needed.
3952 *
3953 * For significant changes, we react by exiting the current process; a new
3954 * one will be launched afterwards if needed.
3955 */
3956void
3958{
3959 MemoryContext oldctx;
3961 bool started_tx = false;
3962
3963 /* When cache state is valid there is nothing to do here. */
3965 return;
3966
3967 /* This function might be called inside or outside of transaction. */
3968 if (!IsTransactionState())
3969 {
3971 started_tx = true;
3972 }
3973
3974 /* Ensure allocations in permanent context. */
3976
3978
3979 /*
3980 * Exit if the subscription was removed. This normally should not happen
3981 * as the worker gets killed during DROP SUBSCRIPTION.
3982 */
3983 if (!newsub)
3984 {
3985 ereport(LOG,
3986 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3987 MySubscription->name)));
3988
3989 /* Ensure we remove no-longer-useful entry for worker's start time */
3992
3993 proc_exit(0);
3994 }
3995
3996 /* Exit if the subscription was disabled. */
3997 if (!newsub->enabled)
3998 {
3999 ereport(LOG,
4000 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
4001 MySubscription->name)));
4002
4004 }
4005
4006 /* !slotname should never happen when enabled is true. */
4007 Assert(newsub->slotname);
4008
4009 /* two-phase cannot be altered while the worker is running */
4010 Assert(newsub->twophasestate == MySubscription->twophasestate);
4011
4012 /*
4013 * Exit if any parameter that affects the remote connection was changed.
4014 * The launcher will start a new worker but note that the parallel apply
4015 * worker won't restart if the streaming option's value is changed from
4016 * 'parallel' to any other value or the server decides not to stream the
4017 * in-progress transaction.
4018 */
4019 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
4020 strcmp(newsub->name, MySubscription->name) != 0 ||
4021 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
4022 newsub->binary != MySubscription->binary ||
4023 newsub->stream != MySubscription->stream ||
4024 newsub->passwordrequired != MySubscription->passwordrequired ||
4025 strcmp(newsub->origin, MySubscription->origin) != 0 ||
4026 newsub->owner != MySubscription->owner ||
4027 !equal(newsub->publications, MySubscription->publications))
4028 {
4030 ereport(LOG,
4031 (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
4032 MySubscription->name)));
4033 else
4034 ereport(LOG,
4035 (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
4036 MySubscription->name)));
4037
4039 }
4040
4041 /*
4042 * Exit if the subscription owner's superuser privileges have been
4043 * revoked.
4044 */
4045 if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
4046 {
4048 ereport(LOG,
4049 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
4051 else
4052 ereport(LOG,
4053 errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
4055
4057 }
4058
4059 /* Check for other changes that should never happen too. */
4060 if (newsub->dbid != MySubscription->dbid)
4061 {
4062 elog(ERROR, "subscription %u changed unexpectedly",
4064 }
4065
4066 /* Clean old subscription info and switch to new one. */
4069
4070 MemoryContextSwitchTo(oldctx);
4071
4072 /* Change synchronous commit according to the user's wishes */
4073 SetConfigOption("synchronous_commit", MySubscription->synccommit,
4075
4076 if (started_tx)
4078
4079 MySubscriptionValid = true;
4080}
4081
4082/*
4083 * Callback from subscription syscache invalidation.
4084 */
4085static void
4086subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
4087{
4088 MySubscriptionValid = false;
4089}
4090
4091/*
4092 * subxact_info_write
4093 * Store information about subxacts for a toplevel transaction.
4094 *
4095 * For each subxact we store the offset of its first change in the main file.
4096 * The file is always over-written as a whole.
4097 *
4098 * XXX We should only store subxacts that were not aborted yet.
4099 */
4100static void
4102{
4103 char path[MAXPGPATH];
4104 Size len;
4105 BufFile *fd;
4106
4108
4109 /* construct the subxact filename */
4110 subxact_filename(path, subid, xid);
4111
4112 /* Delete the subxacts file, if it exists. */
4113 if (subxact_data.nsubxacts == 0)
4114 {
4117
4118 return;
4119 }
4120
4121 /*
4122 * Create the subxact file if it is not already created, otherwise open the
4123 * existing file.
4124 */
4126 true);
4127 if (fd == NULL)
4129
4131
4132 /* Write the subxact count and subxact info */
4135
4137
4138 /* free the memory allocated for subxact info */
4140}
4141
4142/*
4143 * subxact_info_read
4144 * Restore information about subxacts of a streamed transaction.
4145 *
4146 * Read information about subxacts into the structure subxact_data that can be
4147 * used later.
4148 */
4149static void
4151{
4152 char path[MAXPGPATH];
4153 Size len;
4154 BufFile *fd;
4155 MemoryContext oldctx;
4156
4160
4161 /*
4162 * If the subxact file doesn't exist that means we don't have any subxact
4163 * info.
4164 */
4165 subxact_filename(path, subid, xid);
4167 true);
4168 if (fd == NULL)
4169 return;
4170
4171 /* read number of subxact items */
4173
4175
4176 /* we keep the maximum as a power of 2 */
4178
4179 /*
4180 * Allocate subxact information in the logical streaming context. We need
4181 * this information during the complete stream so that we can add the
4182 * subtransaction info to it. On stream stop we will flush this information
4183 * to the subxact file and reset the logical streaming context.
4184 */
4187 sizeof(SubXactInfo));
4188 MemoryContextSwitchTo(oldctx);
4189
4190 if (len > 0)
4192
4194}
4195
4196/*
4197 * subxact_info_add
4198 * Add information about a subxact (offset in the main file).
4199 */
4200static void
4202{
4203 SubXactInfo *subxacts = subxact_data.subxacts;
4204 int64 i;
4205
4206 /* We must have a valid top level stream xid and a stream fd. */
4208 Assert(stream_fd != NULL);
4209
4210 /*
4211 * If the XID matches the toplevel transaction, we don't want to add it.
4212 */
4213 if (stream_xid == xid)
4214 return;
4215
4216 /*
4217 * In most cases we're checking the same subxact as we've already seen in
4218 * the last call, so make sure to ignore it (this change comes later).
4219 */
4220 if (subxact_data.subxact_last == xid)
4221 return;
4222
4223 /* OK, remember we're processing this XID. */
4225
4226 /*
4227 * Check if the transaction is already present in the array of subxacts. We
4228 * intentionally scan the array from the tail, because we're likely adding
4229 * a change for the most recent subtransactions.
4230 *
4231 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4232 * would allow us to use binary search here.
4233 */
4234 for (i = subxact_data.nsubxacts; i > 0; i--)
4235 {
4236 /* found, so we're done */
4237 if (subxacts[i - 1].xid == xid)
4238 return;
4239 }
4240
4241 /* This is a new subxact, so we need to add it to the array. */
4242 if (subxact_data.nsubxacts == 0)
4243 {
4244 MemoryContext oldctx;
4245
4247
4248 /*
4249 * Allocate this memory for subxacts in per-stream context, see
4250 * subxact_info_read.
4251 */
4253 subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4254 MemoryContextSwitchTo(oldctx);
4255 }
4257 {
4259 subxacts = repalloc(subxacts,
4261 }
4262
4263 subxacts[subxact_data.nsubxacts].xid = xid;
4264
4265 /*
4266 * Get the current offset of the stream file and store it as offset of
4267 * this subxact.
4268 */
4270 &subxacts[subxact_data.nsubxacts].fileno,
4271 &subxacts[subxact_data.nsubxacts].offset);
4272
4274 subxact_data.subxacts = subxacts;
4275}
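
The bookkeeping in subxact_info_add (skip the toplevel XID, skip the XID seen in the previous call, scan the array from the tail, then grow by doubling) can be sketched outside the server. Everything here is hypothetical illustration: `SubXactEntry`, `SubXactArray`, `subxact_array_add` and the initial capacity of 4 are assumptions of the sketch, and it uses realloc where the worker uses palloc/repalloc in the logical streaming context.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the worker's subxact bookkeeping. */
typedef struct
{
	uint32_t	xid;			/* subtransaction XID */
	int			fileno;			/* file number of its first change */
	int64_t		offset;			/* offset of its first change */
} SubXactEntry;

typedef struct
{
	uint32_t	nsubxacts;		/* entries in use */
	uint32_t	nsubxacts_max;	/* allocated entries (0 or a power of 2) */
	uint32_t	last;			/* XID seen in the previous call, 0 if none */
	SubXactEntry *subxacts;
} SubXactArray;

#define INITIAL_CAPACITY 4		/* assumption of this sketch */

/* Returns 1 if xid was added, 0 if nothing had to be done, -1 on OOM. */
int
subxact_array_add(SubXactArray *a, uint32_t toplevel_xid, uint32_t xid,
				  int fileno, int64_t offset)
{
	assert(a != NULL);

	/* The toplevel transaction itself is never recorded. */
	if (xid == toplevel_xid)
		return 0;

	/* Fast path: same subxact as in the previous call. */
	if (a->last == xid)
		return 0;
	a->last = xid;

	/* Scan from the tail; recent subxacts are the most likely match. */
	for (uint32_t i = a->nsubxacts; i > 0; i--)
	{
		if (a->subxacts[i - 1].xid == xid)
			return 0;
	}

	/* New subxact: make room, doubling the capacity when full. */
	if (a->nsubxacts == a->nsubxacts_max)
	{
		uint32_t	newmax = a->nsubxacts_max ? a->nsubxacts_max * 2
											  : INITIAL_CAPACITY;
		SubXactEntry *p = realloc(a->subxacts, newmax * sizeof(SubXactEntry));

		if (p == NULL)
		{
			a->last = 0;		/* forget the XID so a retry is not skipped */
			return -1;
		}
		a->subxacts = p;
		a->nsubxacts_max = newmax;
	}

	a->subxacts[a->nsubxacts].xid = xid;
	a->subxacts[a->nsubxacts].fileno = fileno;
	a->subxacts[a->nsubxacts].offset = offset;
	a->nsubxacts++;
	return 1;
}
```

Only the first change of each subxact records an offset, so a later abort of that subxact can truncate the changes file back to that position.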
4276
4277/* format filename for file containing the info about subxacts */
4278static inline void
4279subxact_filename(char *path, Oid subid, TransactionId xid)
4280{
4281 snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4282}
4283
4284/* format filename for file containing serialized changes */
4285static inline void
4286changes_filename(char *path, Oid subid, TransactionId xid)
4287{
4288 snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4289}
4290
4291/*
4292 * stream_cleanup_files
4293 * Cleanup files for a subscription / toplevel transaction.
4294 *
4295 * Remove files with serialized changes and subxact info for a particular
4296 * toplevel transaction. Each subscription has a separate set of files
4297 * for any toplevel transaction.
4298 */
4299void
4301{
4302 char path[MAXPGPATH];
4303
4304 /* Delete the changes file. */
4305 changes_filename(path, subid, xid);
4307
4308 /* Delete the subxact file, if it exists. */
4309 subxact_filename(path, subid, xid);
4311}
4312
4313/*
4314 * stream_open_file
4315 * Open a file that we'll use to serialize changes for a toplevel
4316 * transaction.
4317 *
4318 * Open a file for streamed changes from a toplevel transaction identified
4319 * by stream_xid (global variable). If it's the first chunk of streamed
4320 * changes for this transaction, create the buffile, otherwise open the
4321 * previously created file.
4322 */
4323static void
4324stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4325{
4326 char path[MAXPGPATH];
4327 MemoryContext oldcxt;
4328
4329 Assert(OidIsValid(subid));
4331 Assert(stream_fd == NULL);
4332
4333
4334 changes_filename(path, subid, xid);
4335 elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4336
4337 /*
4338 * Create/open the buffiles under the logical streaming context so that we
4339 * have those files until stream stop.
4340 */
4342
4343 /*
4344 * If this is the first streamed segment, create the changes file.
4345 * Otherwise, just open the file for writing, in append mode.
4346 */
4347 if (first_segment)
4349 path);
4350 else
4351 {
4352 /*
4353 * Open the file and seek to the end of the file because we always
4354 * append to the changes file.
4355 */
4357 path, O_RDWR, false);
4358 BufFileSeek(stream_fd, 0, 0, SEEK_END);
4359 }
4360
4361 MemoryContextSwitchTo(oldcxt);
4362}
4363
4364/*
4365 * stream_close_file
4366 * Close the currently open file with streamed changes.
4367 */
4368static void
4370{
4371 Assert(stream_fd != NULL);
4372
4374
4375 stream_fd = NULL;
4376}
4377
4378/*
4379 * stream_write_change
4380 * Serialize a change to a file for the current toplevel transaction.
4381 *
4382 * The change is serialized in a simple format: length (not including the
4383 * length field itself), action code (identifying the message type) and
4384 * message contents (without the subxact TransactionId value).
4385 */
4386static void
4388{
4389 int len;
4390
4391 Assert(stream_fd != NULL);
4392
4393 /* total on-disk size, including the action type character */
4394 len = (s->len - s->cursor) + sizeof(char);
4395
4396 /* first write the size */
4397 BufFileWrite(stream_fd, &len, sizeof(len));
4398
4399 /* then the action */
4400 BufFileWrite(stream_fd, &action, sizeof(action));
4401
4402 /* and finally the remaining part of the buffer (after the XID) */
4403 len = (s->len - s->cursor);
4404
4406}
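
The on-disk record written by stream_write_change is: an int length that counts the action byte plus the payload but not the length field itself, then the action byte, then the payload. Below is a minimal sketch of that framing against an in-memory buffer instead of a BufFile. `write_change` and `read_change` are hypothetical helpers, and the caller is assumed to provide buffers that are large enough.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Append one record at file[off] and return the offset just past it.
 * The length is stored in native byte order, as a raw int.
 */
size_t
write_change(char *file, size_t off, char action,
			 const char *payload, int payload_len)
{
	int			len;

	assert(file != NULL && payload_len >= 0);

	/* total size after the length field: action byte + payload */
	len = payload_len + (int) sizeof(char);

	memcpy(file + off, &len, sizeof(len));
	off += sizeof(len);
	memcpy(file + off, &action, sizeof(action));
	off += sizeof(action);
	memcpy(file + off, payload, (size_t) payload_len);
	off += (size_t) payload_len;

	return off;
}

/*
 * Read the record starting at file[off]; return the offset of the next one.
 * 'payload' must have room for the record's payload.
 */
size_t
read_change(const char *file, size_t off, char *action,
			char *payload, int *payload_len)
{
	int			len;

	memcpy(&len, file + off, sizeof(len));
	off += sizeof(len);
	assert(len >= (int) sizeof(char));	/* at least the action byte */

	memcpy(action, file + off, sizeof(char));
	off += sizeof(char);

	*payload_len = len - (int) sizeof(char);
	memcpy(payload, file + off, (size_t) *payload_len);
	off += (size_t) *payload_len;

	return off;
}
```

Because every record carries its own length, a reader can walk the file sequentially without knowing anything about the payload format.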
4407
4408/*
4409 * stream_open_and_write_change
4410 * Serialize a message to a file for the given transaction.
4411 *
4412 * This function is similar to stream_write_change except that it will open the
4413 * target file, if it is not already open, before writing the message and close
4414 * the file at the end.
4415 */
4416static void
4418{
4420
4421 if (!stream_fd)
4422 stream_start_internal(xid, false);
4423
4426}
4427
4428/*
4429 * Sets streaming options including replication slot name and origin start
4430 * position. Workers need these options for logical replication.
4431 */
4432void
4434 char *slotname,
4435 XLogRecPtr *origin_startpos)
4436{
4437 int server_version;
4438
4439 options->logical = true;
4440 options->startpoint = *origin_startpos;
4441 options->slotname = slotname;
4442
4444 options->proto.logical.proto_version =
4449
4450 options->proto.logical.publication_names = MySubscription->publications;
4451 options->proto.logical.binary = MySubscription->binary;
4452
4453 /*
4454 * Assign the appropriate option value for streaming option according to
4455 * the 'streaming' mode and the publisher's ability to support that mode.
4456 */
4457 if (server_version >= 160000 &&
4458 MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
4459 {
4460 options->proto.logical.streaming_str = "parallel";
4462 }
4463 else if (server_version >= 140000 &&
4464 MySubscription->stream != LOGICALREP_STREAM_OFF)
4465 {
4466 options->proto.logical.streaming_str = "on";
4468 }
4469 else
4470 {
4471 options->proto.logical.streaming_str = NULL;
4473 }
4474
4475 options->proto.logical.twophase = false;
4476 options->proto.logical.origin = pstrdup(MySubscription->origin);
4477}
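
The choice of the streaming option in set_stream_options depends on two inputs only: the publisher's version number and the mode requested by the subscription. A minimal sketch of that decision follows. `StreamMode` and `choose_streaming_str` are hypothetical stand-ins for the LOGICALREP_STREAM_* values and the inline logic above; the version thresholds are the ones that appear in the function.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the subscription's streaming setting. */
typedef enum
{
	STREAM_OFF,
	STREAM_ON,
	STREAM_PARALLEL
} StreamMode;

/*
 * Return the value to send for the streaming option, or NULL when
 * streaming of in-progress transactions should not be requested.
 */
const char *
choose_streaming_str(int server_version, StreamMode requested)
{
	assert(server_version > 0);

	/* "parallel" is only requested from publishers at version 16 or later. */
	if (server_version >= 160000 && requested == STREAM_PARALLEL)
		return "parallel";

	/* Otherwise fall back to plain streaming on version 14 or later. */
	if (server_version >= 140000 && requested != STREAM_OFF)
		return "on";

	/* Older publisher, or streaming switched off. */
	return NULL;
}
```

A subscription configured for parallel streaming therefore degrades to "on" against a version 14 or 15 publisher and to no streaming against anything older.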
4478
4479/*
4480 * Cleanup the memory for subxacts and reset the related variables.
4481 */
4482static inline void
4484{
4487
4488 subxact_data.subxacts = NULL;
4492}
4493
4494/*
4495 * Common function to run the apply loop with error handling. Disable the
4496 * subscription, if necessary.
4497 *
4498 * Note that we don't handle FATAL errors, which are probably caused by
4499 * system resource errors and are not repeatable.
4500 */
4501void
4502start_apply(XLogRecPtr origin_startpos)
4503{
4504 PG_TRY();
4505 {
4506 LogicalRepApplyLoop(origin_startpos);
4507 }
4508 PG_CATCH();
4509 {
4512 else
4513 {
4514 /*
4515 * Report the worker failed while applying changes. Abort the
4516 * current transaction so that the stats message is sent in an
4517 * idle state.
4518 */
4521
4522 PG_RE_THROW();
4523 }
4524 }
4525 PG_END_TRY();
4526}
4527
4528/*
4529 * Runs the leader apply worker.
4530 *
4531 * It sets up replication origin, streaming options and then starts streaming.
4532 */
4533static void
4535{
4536 char originname[NAMEDATALEN];
4537 XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4538 char *slotname = NULL;
4540 RepOriginId originid;
4541 TimeLineID startpointTLI;
4542 char *err;
4543 bool must_use_password;
4544
4545 slotname = MySubscription->slotname;
4546
4547 /*
4548 * This shouldn't happen if the subscription is enabled, but guard against
4549 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4550 * slot is NULL.)
4551 */
4552 if (!slotname)
4553 ereport(ERROR,
4554 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4555 errmsg("subscription has no replication slot set")));
4556
4557 /* Setup replication origin tracking. */
4559 originname, sizeof(originname));
4561 originid = replorigin_by_name(originname, true);
4562 if (!OidIsValid(originid))
4563 originid = replorigin_create(originname);
4564 replorigin_session_setup(originid, 0);
4565 replorigin_session_origin = originid;
4566 origin_startpos = replorigin_session_get_progress(false);
4568
4569 /* Is the use of a password mandatory? */
4570 must_use_password = MySubscription->passwordrequired &&
4572
4574 true, must_use_password,
4576
4577 if (LogRepWorkerWalRcvConn == NULL)
4578 ereport(ERROR,
4579 (errcode(ERRCODE_CONNECTION_FAILURE),
4580 errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
4581 MySubscription->name, err)));
4582
4583 /*
4584 * We don't really use the output of identify_system for anything but it does
4585 * some initializations on the upstream so let's still call it.
4586 */
4587 (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4588
4590
4591 set_stream_options(&options, slotname, &origin_startpos);
4592
4593 /*
4594 * Even when the two_phase mode is requested by the user, it remains as
4595 * the tri-state PENDING until all tablesyncs have reached READY state.
4596 * Only then can it become ENABLED.
4597 *
4598 * Note: If the subscription has no tables then leave the state as
4599 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4600 * work.
4601 */
4602 if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
4604 {
4605 /* Start streaming with two_phase enabled */
4606 options.proto.logical.twophase = true;
4608
4610 UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
4611 MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
4613 }
4614 else
4615 {
4617 }
4618
4620 (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4622 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
4623 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
4624 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
4625 "?")));
4626
4627 /* Run the main loop. */
4628 start_apply(origin_startpos);
4629}
4630
4631/*
4632 * Common initialization for leader apply worker, parallel apply worker and
4633 * tablesync worker.
4634 *
4635 * Initialize the database connection, in-memory subscription and necessary
4636 * config options.
4637 */
4638void
4640{
4641 MemoryContext oldctx;
4642
4643 /* Run as replica session replication role. */
4644 SetConfigOption("session_replication_role", "replica",
4646
4647 /* Connect to our database. */
4650 0);
4651
4652 /*
4653 * Set always-secure search path, so malicious users can't redirect user
4654 * code (e.g. pg_index.indexprs).
4655 */
4656 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4657
4658 /* Load the subscription into persistent memory context. */
4660 "ApplyContext",
4664
4666 if (!MySubscription)
4667 {
4668 ereport(LOG,
4669 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4671
4672 /* Ensure we remove no-longer-useful entry for worker's start time */
4675
4676 proc_exit(0);
4677 }
4678
4679 MySubscriptionValid = true;
4680 MemoryContextSwitchTo(oldctx);
4681
4682 if (!MySubscription->enabled)
4683 {
4684 ereport(LOG,
4685 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4686 MySubscription->name)));
4687
4689 }
4690
4691 /* Setup synchronous commit according to the user's wishes */
4692 SetConfigOption("synchronous_commit", MySubscription->synccommit,
4694
4695 /*
4696 * Keep us informed about subscription or role changes. Note that the
4697 * role's superuser privilege can be revoked.
4698 */
4699 CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4701 (Datum) 0);
4702
4705 (Datum) 0);
4706
4707 if (am_tablesync_worker())
4708 ereport(LOG,
4709 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4712 else
4713 ereport(LOG,
4714 (errmsg("logical replication apply worker for subscription \"%s\" has started",
4715 MySubscription->name)));
4716
4718}
4719
4720/*
4721 * Reset the origin state.
4722 */
4723static void
4725{
4729}
4730
4731/* Common function to setup the leader apply or tablesync worker. */
4732void
4734{
4735 /* Attach to slot */
4736 logicalrep_worker_attach(worker_slot);
4737
4739
4740 /* Setup signal handling */
4742 pqsignal(SIGTERM, die);
4744
4745 /*
4746 * We don't currently need any ResourceOwner in a walreceiver process, but
4747 * if we did, we could call CreateAuxProcessResourceOwner here.
4748 */
4749
4750 /* Initialise stats to a sanish value */
4753
4754 /* Load the libpq-specific functions */
4755 load_file("libpqwalreceiver", false);
4756
4758
4759 /*
4760 * Register a callback to reset the origin state before aborting any
4761 * pending transaction during shutdown (see ShutdownPostgres()). This will
4762 * avoid origin advancement for an incomplete transaction which could
4763 * otherwise lead to its loss as such a transaction won't be sent by the
4764 * server again.
4765 *
4766 * Note that even a LOG or DEBUG statement placed after setting the origin
4767 * state may process a shutdown signal before committing the current apply
4768 * operation. So, it is important to register such a callback here.
4769 */
4771
4772 /* Connect to the origin and start the replication. */
4773 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4775
4776 /*
4777 * Setup callback for syscache so that we know when something changes in
4778 * the subscription relation state.
4779 */
4780 CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4782 (Datum) 0);
4783}
4784
4785/* Logical Replication Apply worker entry point */
4786void
4788{
4789 int worker_slot = DatumGetInt32(main_arg);
4790
4792
4793 SetupApplyOrSyncWorker(worker_slot);
4794
4796
4798
4799 proc_exit(0);
4800}
4801
4802/*
4803 * After error recovery, disable the subscription in a new transaction
4804 * and exit cleanly.
4805 */
4806void
4808{
4809 /*
4810 * Emit the error message, and recover from the error state to an idle
4811 * state
4812 */
4814
4818
4820
4821 /* Report the worker failed during either table synchronization or apply */
4824
4825 /* Disable the subscription */
4829
4830 /* Ensure we remove no-longer-useful entry for worker's start time */
4833
4834 /* Notify the subscription has been disabled and exit */
4835 ereport(LOG,
4836 errmsg("subscription \"%s\" has been disabled because of an error",
4838
4839 proc_exit(0);
4840}
4841
4842/*
4843 * Is current process a logical replication worker?
4844 */
4845bool
4847{
4848 return MyLogicalRepWorker != NULL;
4849}
4850
4851/*
4852 * Is current process a logical replication parallel apply worker?
4853 */
4854bool
4856{
4858}
4859
4860/*
4861 * Start skipping changes of the transaction if the given LSN matches the
4862 * LSN specified by subscription's skiplsn.
4863 */
4864static void
4866{
4870
4871 /*
4872 * Quick return if skipping this transaction was not requested. This
4873 * function is called for every remote transaction and we assume that
4874 * skipping the transaction is not used often.
4875 */
4877 MySubscription->skiplsn != finish_lsn))
4878 return;
4879
4880 /* Start skipping all changes of this transaction */
4881 skip_xact_finish_lsn = finish_lsn;
4882
4883 ereport(LOG,
4884 errmsg("logical replication starts skipping transaction at LSN %X/%X",
4886}
4887
4888/*
4889 * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4890 */
4891static void
4893{
4894 if (!is_skipping_changes())
4895 return;
4896
4897 ereport(LOG,
4898 (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4900
4901 /* Stop skipping changes */
4903}
4904
4905/*
4906 * Clear subskiplsn of pg_subscription catalog.
4907 *
4908 * finish_lsn is the transaction's finish LSN that is used to check if the
4909 * subskiplsn matches it. If not matched, we raise a warning when clearing the
4910 * subskiplsn to inform the user, e.g., in cases where the user mistakenly
4911 * specified the wrong subskiplsn.
4912 */
4913static void
4915{
4916 Relation rel;
4917 Form_pg_subscription subform;
4918 HeapTuple tup;
4919 XLogRecPtr myskiplsn = MySubscription->skiplsn;
4920 bool started_tx = false;
4921
4923 return;
4924
4925 if (!IsTransactionState())
4926 {
4928 started_tx = true;
4929 }
4930
4931 /*
4932 * Protect subskiplsn of pg_subscription from being concurrently updated
4933 * while clearing it.
4934 */
4935 LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4937
4938 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4939
4940 /* Fetch the existing tuple. */
4941 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4943
4944 if (!HeapTupleIsValid(tup))
4945 elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4946
4947 subform = (Form_pg_subscription) GETSTRUCT(tup);
4948
4949 /*
4950 * Clear the subskiplsn. If the user has already changed subskiplsn before
4951 * we clear it, we don't update the catalog and the replication origin
4952 * state won't get advanced. So in the worst case, if the server crashes
4953 * before sending an acknowledgment of the flush position, the transaction
4954 * will be sent again and the user needs to set subskiplsn again. We could
4955 * reduce the possibility by logging a replication origin WAL record to
4956 * advance the origin LSN instead, but there is no way to advance the
4957 * origin timestamp, and it doesn't seem to be worth doing anything about
4958 * it since it's a very rare case.
4959 */
4960 if (subform->subskiplsn == myskiplsn)
4961 {
4962 bool nulls[Natts_pg_subscription];
4963 bool replaces[Natts_pg_subscription];
4964 Datum values[Natts_pg_subscription];
4965
4966 memset(values, 0, sizeof(values));
4967 memset(nulls, false, sizeof(nulls));
4968 memset(replaces, false, sizeof(replaces));
4969
4970 /* reset subskiplsn */
4971 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4972 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4973
4974 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4975 replaces);
4976 CatalogTupleUpdate(rel, &tup->t_self, tup);
4977
4978 if (myskiplsn != finish_lsn)
4980 errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4981 errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4982 LSN_FORMAT_ARGS(finish_lsn),
4983 LSN_FORMAT_ARGS(myskiplsn)));
4984 }
4985
4986 heap_freetuple(tup);
4987 table_close(rel, NoLock);
4988
4989 if (started_tx)
4991}
4992
4993/* Error callback to give more context info about the change being applied */
4994void
4996{
4998 int elevel;
4999
5001 return;
5002
5003 Assert(errarg->origin_name);
5004
5005 elevel = geterrlevel();
5006
5007 /*
5008 * Reset the origin state to prevent the advancement of origin progress if
5009 * we fail to apply. Otherwise, this will result in transaction loss as
5010 * that transaction won't be sent again by the server.
5011 */
5012 if (elevel >= ERROR)
5013 replorigin_reset(0, (Datum) 0);
5014
5015 if (errarg->rel == NULL)
5016 {
5017 if (!TransactionIdIsValid(errarg->remote_xid))
5018 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
5019 errarg->origin_name,
5021 else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5022 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
5023 errarg->origin_name,
5025 errarg->remote_xid);
5026 else
5027 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
5028 errarg->origin_name,
5030 errarg->remote_xid,
5031 LSN_FORMAT_ARGS(errarg->finish_lsn));
5032 }
5033 else
5034 {
5035 if (errarg->remote_attnum < 0)
5036 {
5037 if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5038 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
5039 errarg->origin_name,
5041 errarg->rel->remoterel.nspname,
5042 errarg->rel->remoterel.relname,
5043 errarg->remote_xid);
5044 else
5045 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
5046 errarg->origin_name,
5048 errarg->rel->remoterel.nspname,
5049 errarg->rel->remoterel.relname,
5050 errarg->remote_xid,
5051 LSN_FORMAT_ARGS(errarg->finish_lsn));
5052 }
5053 else
5054 {
5055 if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5056 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
5057 errarg->origin_name,
5059 errarg->rel->remoterel.nspname,
5060 errarg->rel->remoterel.relname,
5061 errarg->rel->remoterel.attnames[errarg->remote_attnum],
5062 errarg->remote_xid);
5063 else
5064 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
5065 errarg->origin_name,
5067 errarg->rel->remoterel.nspname,
5068 errarg->rel->remoterel.relname,
5069 errarg->rel->remoterel.attnames[errarg->remote_attnum],
5070 errarg->remote_xid,
5071 LSN_FORMAT_ARGS(errarg->finish_lsn));
5072 }
5073 }
5074}
5075
5076/* Set transaction information of apply error callback */
5077static inline void
5079{
5082}
5083
5084/* Reset all information of apply error callback */
5085static inline void
5087{
5092}
5093
5094/*
5095 * Request wakeup of the workers for the given subscription OID
5096 * at commit of the current transaction.
5097 *
5098 * This is used to ensure that the workers process assorted changes
5099 * as soon as possible.
5100 */
5101void
5103{
5104 MemoryContext oldcxt;
5105
5109 MemoryContextSwitchTo(oldcxt);
5110}
5111
5112/*
5113 * Wake up the workers of any subscriptions that were changed in this xact.
5114 */
5115void
5117{
5118 if (isCommit && on_commit_wakeup_workers_subids != NIL)
5119 {
5120 ListCell *lc;
5121
5122 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5124 {
5125 Oid subid = lfirst_oid(lc);
5126 List *workers;
5127 ListCell *lc2;
5128
5129 workers = logicalrep_workers_find(subid, true, false);
5130 foreach(lc2, workers)
5131 {
5132 LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5133
5135 }
5136 }
5137 LWLockRelease(LogicalRepWorkerLock);
5138 }
5139
5140 /* The List storage will be reclaimed automatically in xact cleanup. */
5142}
5143
5144/*
5145 * Allocate the origin name in a long-lived context for the error context message.
5146 */
5147void
5149{
5151 originname);
5152}
5153
5154/*
5155 * Return the action to be taken for the given transaction. See
5156 * TransApplyAction for information on each of the actions.
5157 *
5158 * *winfo is assigned to the destination parallel worker info when the leader
5159 * apply worker has to pass all the transaction's changes to the parallel
5160 * apply worker.
5161 */
5162static TransApplyAction
5164{
5165 *winfo = NULL;
5166
5168 {
5169 return TRANS_PARALLEL_APPLY;
5170 }
5171
5172 /*
5173 * If we are processing this transaction using a parallel apply worker
5174 * then either we send the changes to the parallel worker or if the worker