PostgreSQL Source Code git master
worker.c
/*-------------------------------------------------------------------------
 * worker.c
 * PostgreSQL logical replication worker (apply)
 *
 * Copyright (c) 2016-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 * src/backend/replication/logical/worker.c
 *
 * NOTES
 * This file contains the worker which applies logical changes as they come
 * from the remote logical replication stream.
 *
 * The main worker (apply) is started by the logical replication worker
 * launcher for every enabled subscription in a database. It uses the
 * walsender protocol to communicate with the publisher.
 *
 * This module includes server-facing code and shares the libpqwalreceiver
 * module with walreceiver for providing the libpq-specific functionality.
 *
 *
 * STREAMED TRANSACTIONS
 * ---------------------
 * Streamed transactions (large transactions exceeding a memory limit on the
 * upstream) are applied using one of two approaches:
 *
 * 1) Write to temporary files and apply when the final commit arrives
 *
 * This approach is used when the user has set the subscription's streaming
 * option to on.
 *
 * Unlike the regular (non-streamed) case, handling streamed transactions has
 * to handle aborts of both the toplevel transaction and subtransactions. This
 * is achieved by tracking offsets for subtransactions, which are then used
 * to truncate the file with serialized changes.
 *
 * The files are placed in the temporary-file directory by default, and the
 * filenames include both the XID of the toplevel transaction and the OID of
 * the subscription. This is necessary so that different workers processing a
 * remote transaction with the same XID don't interfere with each other.
 *
 * We use BufFiles instead of normal temporary files because the BufFile
 * infrastructure (a) supports temporary files that exceed the OS file size
 * limit, (b) provides automatic cleanup on error, and (c) lets these files
 * survive across local transactions, so that they can be opened and closed
 * at stream start and stop. We decided to use the FileSet infrastructure
 * because without it the files are deleted when they are closed, and keeping
 * the stream files open across stream start/stop would consume a lot of
 * memory (more than 8K for each BufFile, and there could be multiple such
 * BufFiles as the subscriber could receive multiple start/stop streams for
 * different transactions before getting the commit). Moreover, without
 * FileSet we would also need to invent a new way to pass filenames to the
 * BufFile APIs so that we can open the desired file across multiple
 * stream-open calls for the same transaction.
 *
 * 2) Parallel apply workers.
 *
 * This approach is used when the user has set the subscription's streaming
 * option to parallel. See logical/applyparallelworker.c for information about
 * this approach.
 *
 * TWO_PHASE TRANSACTIONS
 * ----------------------
 * Two phase transactions are replayed at prepare and then committed or
 * rolled back at commit prepared and rollback prepared respectively. It is
 * possible to have a prepared transaction that arrives at the apply worker
 * when the tablesync is busy doing the initial copy. In this case, the apply
 * worker skips all the prepared operations [e.g. inserts] while the tablesync
 * is still busy (see the condition of should_apply_changes_for_rel). The
 * tablesync worker might not get such a prepared transaction because, say, it
 * was prior to the initial consistent point, but might have got some later
 * commits. Now, the tablesync worker will exit without doing anything for the
 * prepared transaction skipped by the apply worker as the sync location for it
 * will be already ahead of the apply worker's current location. This would lead
 * to an "empty prepare", because later when the apply worker does the commit
 * prepared, there is nothing in it (the inserts were skipped earlier).
 *
 * To avoid this and similar prepare confusions, the subscription's two_phase
 * commit is enabled only after the initial sync is over. The two_phase option
 * has been implemented as a tri-state with values DISABLED, PENDING, and
 * ENABLED.
 *
 * Even if the user specifies they want a subscription with two_phase = on,
 * internally it will start with a tri-state of PENDING which only becomes
 * ENABLED after all tablesync initializations are completed - i.e. when all
 * tablesync workers have reached their READY state. In other words, the value
 * PENDING is only a temporary state for subscription start-up.
 *
 * Until two_phase is properly available (ENABLED) the subscription will
 * behave as if two_phase = off. When the apply worker detects that all
 * tablesyncs have become READY (while the tri-state was PENDING) it will
 * restart the apply worker process. This happens in
 * process_syncing_tables_for_apply.
 *
 * When the (re-started) apply worker finds that all tablesyncs are READY for a
 * two_phase tri-state of PENDING it starts streaming messages with the
 * two_phase option, which in turn enables the decoding of two-phase commits at
 * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
 * Now, it is possible that during the time we have not enabled two_phase, the
 * publisher (replication server) would have skipped some prepares, but we
 * ensure that such prepares are sent along with commit prepared, see
 * ReorderBufferFinishPrepared.
 *
 * If the subscription has no tables then a two_phase tri-state PENDING is
 * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
 * PUBLICATION which might otherwise be disallowed (see below).
 *
 * If ever a user needs to be aware of the tri-state value, they can fetch it
 * from the pg_subscription catalog (see column subtwophasestate).
 *
 * We don't allow toggling the two_phase option of a subscription because it
 * can lead to an inconsistent replica. Consider the case where it was
 * initially on and we have received some prepare, and then we turn it off: at
 * commit time the server will send the entire transaction data along with the
 * commit. With some more analysis, we could allow changing this option from
 * off to on, but it is not clear that that alone would be useful.
 *
 * Finally, to avoid problems mentioned in previous paragraphs from any
 * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
 * to 'off' and then again back to 'on') there is a restriction for
 * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
 * the two_phase tri-state is ENABLED, except when copy_data = false.
 *
 * We can legitimately get a prepare with the same GID more than once when
 * multiple subscriptions are defined for publications on the same server and
 * the prepared transaction has operations on tables subscribed to by those
 * subscriptions. For such cases, if we use the GID sent by the publisher, one
 * of the prepares will be successful and the others will fail, in which case
 * the server will send them again. Now, this can lead to a deadlock if the
 * user has set synchronous_standby_names for all the subscriptions on the
 * subscriber. To avoid such deadlocks, we generate a unique GID (consisting of
 * the subscription oid and the xid of the prepared transaction) for each
 * prepared transaction on the subscriber.
 *
 * FAILOVER
 * --------
 * The logical slot on the primary can be synced to the standby by specifying
 * failover = true when creating the subscription. Enabling failover allows us
 * to smoothly transition to the promoted standby, ensuring that we can
 * subscribe to the new primary without losing any data.
 *-------------------------------------------------------------------------
 */
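The two_phase tri-state described in the header comment can be pictured as a small state machine. The following is only a sketch under stated assumptions: the names `TwoPhaseState`, `twophase_initial_state`, `twophase_after_sync_check` and `refresh_publication_allowed` are hypothetical and do not appear in worker.c, and the worker restart that precedes the PENDING to ENABLED switch is collapsed into a single step.

```c
#include <stdbool.h>

/* Hypothetical names; the real value lives in pg_subscription.subtwophasestate. */
typedef enum
{
    TWOPHASE_DISABLED,
    TWOPHASE_PENDING,
    TWOPHASE_ENABLED
} TwoPhaseState;

/* A subscription created with two_phase = on starts as PENDING, never ENABLED. */
static TwoPhaseState
twophase_initial_state(bool two_phase_requested)
{
    return two_phase_requested ? TWOPHASE_PENDING : TWOPHASE_DISABLED;
}

/*
 * State after the apply worker has checked the tablesync status.
 * PENDING becomes ENABLED only once every table is READY; a subscription
 * with no tables stays PENDING.
 */
static TwoPhaseState
twophase_after_sync_check(TwoPhaseState cur, int ntables, int nready)
{
    if (cur == TWOPHASE_PENDING && ntables > 0 && nready == ntables)
        return TWOPHASE_ENABLED;
    return cur;
}

/* REFRESH PUBLICATION is refused while ENABLED, unless copy_data = false. */
static bool
refresh_publication_allowed(TwoPhaseState state, bool copy_data)
{
    return state != TWOPHASE_ENABLED || !copy_data;
}
```

Keeping PENDING as its own state is what lets a subscription with no tables still run REFRESH PUBLICATION with copy_data enabled.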

#include "postgres.h"

#include <sys/stat.h>
#include <unistd.h>

#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_inherits.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/walwriter.h"
#include "replication/conflict.h"
#include "replication/origin.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"

#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */

198typedef struct FlushPosition
199{
204
206
207typedef struct ApplyExecutionData
208{
209 EState *estate; /* executor state, used to track resources */
210
211 LogicalRepRelMapEntry *targetRel; /* replication target rel */
212 ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
213
214 /* These fields are used when the target relation is partitioned: */
215 ModifyTableState *mtstate; /* dummy ModifyTable state */
216 PartitionTupleRouting *proute; /* partition routing info */
218
219/* Struct for saving and restoring apply errcontext information */
221{
222 LogicalRepMsgType command; /* 0 if invalid */
224
225 /* Remote node information */
226 int remote_attnum; /* -1 if invalid */
231
232/*
233 * The action to be taken for the changes in the transaction.
234 *
235 * TRANS_LEADER_APPLY:
236 * This action means that we are in the leader apply worker or table sync
237 * worker. The changes of the transaction are either directly applied or
238 * are read from temporary files (for streaming transactions) and then
239 * applied by the worker.
240 *
241 * TRANS_LEADER_SERIALIZE:
242 * This action means that we are in the leader apply worker or table sync
243 * worker. Changes are written to temporary files and then applied when the
244 * final commit arrives.
245 *
246 * TRANS_LEADER_SEND_TO_PARALLEL:
247 * This action means that we are in the leader apply worker and need to send
248 * the changes to the parallel apply worker.
249 *
250 * TRANS_LEADER_PARTIAL_SERIALIZE:
251 * This action means that we are in the leader apply worker and have sent some
252 * changes directly to the parallel apply worker and the remaining changes are
253 * serialized to a file, due to timeout while sending data. The parallel apply
254 * worker will apply these serialized changes when the final commit arrives.
255 *
256 * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
257 * serializing changes, the leader worker also needs to serialize the
258 * STREAM_XXX message to a file, and wait for the parallel apply worker to
259 * finish the transaction when processing the transaction finish command. So
260 * this new action was introduced to keep the code and logic clear.
261 *
262 * TRANS_PARALLEL_APPLY:
263 * This action means that we are in the parallel apply worker and changes of
264 * the transaction are applied directly by the worker.
265 */
266typedef enum
267{
268 /* The action for non-streaming transactions. */
270
271 /* Actions for streaming transactions. */
277
278/* errcontext tracker */
280{
281 .command = 0,
282 .rel = NULL,
283 .remote_attnum = -1,
284 .remote_xid = InvalidTransactionId,
285 .finish_lsn = InvalidXLogRecPtr,
286 .origin_name = NULL,
287};
288
290
293
294/* per stream context for streaming transactions */
296
298
300static bool MySubscriptionValid = false;
301
303
306
307/* fields valid only when processing streamed transaction */
308static bool in_streamed_transaction = false;
309
311
312/*
313 * The number of changes applied by parallel apply worker during one streaming
314 * block.
315 */
317
318/* Are we initializing an apply worker? */
320
/*
 * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
 * the subscription if the remote transaction's finish LSN matches the subskiplsn.
 * Once we start skipping changes, we don't stop until we have skipped all
 * changes of the transaction, even if pg_subscription is updated and
 * MySubscription->skiplsn gets changed or reset in the meantime. Also, in
 * streaming transaction cases (streaming = on), we don't skip receiving and
 * spooling the changes, since we decide whether or not to skip applying the
 * changes when starting to apply them. The subskiplsn is cleared after
 * successfully skipping the transaction or applying a non-empty transaction.
 * The latter prevents a mistakenly specified subskiplsn from being left in
 * place. Note that we cannot skip streaming transactions when using parallel
 * apply workers because we cannot get the finish LSN before applying the
 * changes. So, we don't start a parallel apply worker when the finish LSN is
 * set by the user.
 */
337#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
338
339/* BufFile handle of the current streaming file */
340static BufFile *stream_fd = NULL;
341
342typedef struct SubXactInfo
343{
344 TransactionId xid; /* XID of the subxact */
345 int fileno; /* file number in the buffile */
346 off_t offset; /* offset in the file */
348
349/* Sub-transaction data for the current streaming transaction */
350typedef struct ApplySubXactData
351{
352 uint32 nsubxacts; /* number of sub-transactions */
353 uint32 nsubxacts_max; /* current capacity of subxacts */
354 TransactionId subxact_last; /* xid of the last sub-transaction */
355 SubXactInfo *subxacts; /* sub-xact offset in changes file */
357
359
360static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
361static inline void changes_filename(char *path, Oid subid, TransactionId xid);
362
363/*
364 * Information about subtransactions of a given toplevel transaction.
365 */
366static void subxact_info_write(Oid subid, TransactionId xid);
367static void subxact_info_read(Oid subid, TransactionId xid);
368static void subxact_info_add(TransactionId xid);
369static inline void cleanup_subxact_info(void);
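The subxact bookkeeping declared above supports the abort handling described in the file header: the offset at which each subtransaction first wrote to the changes file is remembered, and an abort truncates the file back to that offset. A minimal sketch of that idea follows, assuming a fixed-size in-memory array; `SubxactLog`, `subxact_log_add` and `subxact_log_abort` are hypothetical names, and the real code also tracks a BufFile segment number and persists the array to a separate file.

```c
#include <stddef.h>
#include <stdint.h>

#define MAX_SUBXACTS 64         /* arbitrary limit for this sketch */

typedef struct
{
    uint32_t xid;               /* subtransaction XID */
    long     offset;            /* where its first change starts in the file */
} SubxactMark;

typedef struct
{
    size_t      n;
    SubxactMark marks[MAX_SUBXACTS];
} SubxactLog;

/* Remember the first offset of a subxact; later calls for the same XID are no-ops. */
static int
subxact_log_add(SubxactLog *log, uint32_t xid, long offset)
{
    for (size_t i = log->n; i > 0; i--)
        if (log->marks[i - 1].xid == xid)
            return 0;
    if (log->n == MAX_SUBXACTS)
        return -1;
    log->marks[log->n].xid = xid;
    log->marks[log->n].offset = offset;
    log->n++;
    return 0;
}

/*
 * Abort a subxact: return the offset the changes file should be truncated
 * to, and forget this subxact and every one recorded after it. Returns -1
 * if the subxact never wrote anything, in which case nothing is truncated.
 */
static long
subxact_log_abort(SubxactLog *log, uint32_t xid)
{
    for (size_t i = log->n; i > 0; i--)
    {
        if (log->marks[i - 1].xid == xid)
        {
            long offset = log->marks[i - 1].offset;

            log->n = i - 1;
            return offset;
        }
    }
    return -1;
}
```

Searching from the end is a deliberate choice in this sketch: the most recently started subtransactions are the ones most likely to be aborted.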
370
371/*
372 * Serialize and deserialize changes for a toplevel transaction.
373 */
374static void stream_open_file(Oid subid, TransactionId xid,
375 bool first_segment);
376static void stream_write_change(char action, StringInfo s);
378static void stream_close_file(void);
379
380static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
381
382static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
384 ResultRelInfo *relinfo,
385 TupleTableSlot *remoteslot);
387 ResultRelInfo *relinfo,
388 TupleTableSlot *remoteslot,
389 LogicalRepTupleData *newtup,
390 Oid localindexoid);
392 ResultRelInfo *relinfo,
393 TupleTableSlot *remoteslot,
394 Oid localindexoid);
395static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
396 LogicalRepRelation *remoterel,
397 Oid localidxoid,
398 TupleTableSlot *remoteslot,
399 TupleTableSlot **localslot);
401 TupleTableSlot *remoteslot,
402 LogicalRepTupleData *newtup,
403 CmdType operation);
404
405/* Functions for skipping changes */
406static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
407static void stop_skipping_changes(void);
408static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
409
410/* Functions for apply error callback */
411static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
412static inline void reset_apply_error_context_info(void);
413
416
417/*
418 * Form the origin name for the subscription.
419 *
420 * This is a common function for tablesync and other workers. Tablesync workers
421 * must pass a valid relid. Other callers must pass relid = InvalidOid.
422 *
423 * Return the name in the supplied buffer.
424 */
425void
427 char *originname, Size szoriginname)
428{
429 if (OidIsValid(relid))
430 {
431 /* Replication origin name for tablesync workers. */
432 snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
433 }
434 else
435 {
436 /* Replication origin name for non-tablesync workers. */
437 snprintf(originname, szoriginname, "pg_%u", suboid);
438 }
439}
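To make the two name formats above concrete, here is a standalone sketch that does not depend on PostgreSQL headers. The `Oid` typedef, the `InvalidOid` definition and the function name `origin_name_sketch` are stand-ins introduced for this example only.

```c
#include <stdio.h>

typedef unsigned int Oid;       /* stand-in for PostgreSQL's Oid */
#define InvalidOid ((Oid) 0)    /* stand-in for the real definition */

/* Same formatting rule as above: tablesync workers get a relid suffix. */
static void
origin_name_sketch(Oid suboid, Oid relid, char *buf, size_t buflen)
{
    if (relid != InvalidOid)
        snprintf(buf, buflen, "pg_%u_%u", suboid, relid);
    else
        snprintf(buf, buflen, "pg_%u", suboid);
}
```

With a subscription OID of 16394, a tablesync worker for relation 16385 would use the origin name `pg_16394_16385`, while the apply worker would use `pg_16394`.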
440
/*
 * Should this worker apply changes for the given relation?
 *
 * This is mainly needed for the initial relation data sync, as that runs in a
 * separate worker process running in parallel and we need some way to skip
 * changes coming to the leader apply worker during the sync of a table.
 *
 * Note we need to do a less-than-or-equal comparison for the SYNCDONE state
 * because it might hold the position of the end of the initial slot consistent
 * point WAL record + 1 (i.e. the start of the next record) and the next record
 * can be the COMMIT of the transaction we are now processing (which is what we
 * set remote_final_lsn to in apply_handle_begin).
 *
 * Note that for streaming transactions that are being applied in the parallel
 * apply worker, we disallow applying changes if the target table in the
 * subscription is not in the READY state, because we cannot decide whether to
 * apply the change as we won't know remote_final_lsn by that time.
 *
 * We already checked this in pa_can_start() before assigning the
 * streaming transaction to the parallel worker, but it also needs to be
 * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
 * PUBLICATION in parallel, a new table can be added to pg_subscription_rel
 * while applying this transaction.
 */
465static bool
467{
468 switch (MyLogicalRepWorker->type)
469 {
471 return MyLogicalRepWorker->relid == rel->localreloid;
472
474 /* We don't synchronize rel's that are in unknown state. */
475 if (rel->state != SUBREL_STATE_READY &&
476 rel->state != SUBREL_STATE_UNKNOWN)
478 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
479 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
481 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
482
483 return rel->state == SUBREL_STATE_READY;
484
485 case WORKERTYPE_APPLY:
486 return (rel->state == SUBREL_STATE_READY ||
487 (rel->state == SUBREL_STATE_SYNCDONE &&
488 rel->statelsn <= remote_final_lsn));
489
491 /* Should never happen. */
492 elog(ERROR, "Unknown worker type");
493 }
494
495 return false; /* dummy for compiler */
496}
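The leader apply worker's rule in the WORKERTYPE_APPLY case can be isolated as a pure predicate. This is a sketch only: `RelSyncState` and `leader_should_apply` are hypothetical names, and LSNs are modelled as plain 64-bit integers.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for the SUBREL_STATE_* values used above. */
typedef enum
{
    REL_STATE_SYNCING,
    REL_STATE_SYNCDONE,
    REL_STATE_READY
} RelSyncState;

/*
 * Apply the change if the table is READY, or if tablesync has finished
 * (SYNCDONE) at or before the commit LSN of the transaction being applied.
 */
static bool
leader_should_apply(RelSyncState state, uint64_t statelsn,
                    uint64_t remote_final_lsn)
{
    return state == REL_STATE_READY ||
        (state == REL_STATE_SYNCDONE && statelsn <= remote_final_lsn);
}
```

The equality case matters: when the sync position is exactly the commit LSN of the current transaction, the change has not been copied by tablesync and must be applied by the leader.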
497
498/*
499 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
500 *
501 * Start a transaction, if this is the first step (else we keep using the
502 * existing transaction).
503 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
504 */
505static void
507{
509
510 if (!IsTransactionState())
511 {
514 }
515
517
519}
520
521/*
522 * Finish up one step of a replication transaction.
523 * Callers of begin_replication_step() must also call this.
524 *
525 * We don't close out the transaction here, but we should increment
526 * the command counter to make the effects of this step visible.
527 */
528static void
530{
532
534}
535
536/*
537 * Handle streamed transactions for both the leader apply worker and the
538 * parallel apply workers.
539 *
540 * In the streaming case (receiving a block of the streamed transaction), for
541 * serialize mode, simply redirect it to a file for the proper toplevel
542 * transaction, and for parallel mode, the leader apply worker will send the
543 * changes to parallel apply workers and the parallel apply worker will define
544 * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
545 * messages will be applied by both leader apply worker and parallel apply
546 * workers).
547 *
548 * Returns true for streamed transactions (when the change is either serialized
549 * to file or sent to parallel apply worker), false otherwise (regular mode or
550 * needs to be processed by parallel apply worker).
551 *
552 * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
553 * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
554 * to a parallel apply worker.
555 */
556static bool
558{
559 TransactionId current_xid;
561 TransApplyAction apply_action;
562 StringInfoData original_msg;
563
564 apply_action = get_transaction_apply_action(stream_xid, &winfo);
565
566 /* not in streaming mode */
567 if (apply_action == TRANS_LEADER_APPLY)
568 return false;
569
571
572 /*
573 * The parallel apply worker needs the xid in this message to decide
574 * whether to define a savepoint, so save the original message that has
575 * not moved the cursor after the xid. We will serialize this message to a
576 * file in PARTIAL_SERIALIZE mode.
577 */
578 original_msg = *s;
579
580 /*
581 * We should have received XID of the subxact as the first part of the
582 * message, so extract it.
583 */
584 current_xid = pq_getmsgint(s, 4);
585
586 if (!TransactionIdIsValid(current_xid))
588 (errcode(ERRCODE_PROTOCOL_VIOLATION),
589 errmsg_internal("invalid transaction ID in streamed replication transaction")));
590
591 switch (apply_action)
592 {
595
596 /* Add the new subxact to the array (unless already there). */
597 subxact_info_add(current_xid);
598
599 /* Write the change to the current file */
601 return true;
602
604 Assert(winfo);
605
606 /*
607 * XXX The publisher side doesn't always send relation/type update
608 * messages after the streaming transaction, so also update the
609 * relation/type in leader apply worker. See function
610 * cleanup_rel_sync_cache.
611 */
612 if (pa_send_data(winfo, s->len, s->data))
613 return (action != LOGICAL_REP_MSG_RELATION &&
615
616 /*
617 * Switch to serialize mode when we are not able to send the
618 * change to parallel apply worker.
619 */
620 pa_switch_to_partial_serialize(winfo, false);
621
622 /* fall through */
624 stream_write_change(action, &original_msg);
625
626 /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
627 return (action != LOGICAL_REP_MSG_RELATION &&
629
632
633 /* Define a savepoint for a subxact if needed. */
634 pa_start_subtrans(current_xid, stream_xid);
635 return false;
636
637 default:
638 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
639 return false; /* silence compiler warning */
640 }
641}
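The function above copies the message (`original_msg = *s`) before extracting the XID, so that the copy still has its cursor positioned before the XID. The sketch below illustrates that pattern with a hypothetical `MsgBuf` type and `msg_get_uint32` helper; it assumes the XID is a 4-byte integer in network byte order, and it returns -1 on a short message where the real code would raise an error.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a StringInfo with a read cursor. */
typedef struct
{
    const unsigned char *data;
    size_t               len;
    size_t               cursor;
} MsgBuf;

/* Read a big-endian 32-bit integer and advance the cursor. */
static int
msg_get_uint32(MsgBuf *msg, uint32_t *out)
{
    const unsigned char *p;

    if (msg->cursor > msg->len || msg->len - msg->cursor < 4)
        return -1;

    p = msg->data + msg->cursor;
    *out = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
        ((uint32_t) p[2] << 8) | (uint32_t) p[3];
    msg->cursor += 4;
    return 0;
}
```

Because the struct is copied by value, the copy keeps its own cursor and can later be serialized in full, XID included, while the original continues to be parsed.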
642
643/*
644 * Executor state preparation for evaluation of constraint expressions,
645 * indexes and triggers for the specified relation.
646 *
647 * Note that the caller must open and close any indexes to be updated.
648 */
649static ApplyExecutionData *
651{
652 ApplyExecutionData *edata;
653 EState *estate;
654 RangeTblEntry *rte;
655 List *perminfos = NIL;
656 ResultRelInfo *resultRelInfo;
657
658 edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
659 edata->targetRel = rel;
660
661 edata->estate = estate = CreateExecutorState();
662
663 rte = makeNode(RangeTblEntry);
664 rte->rtekind = RTE_RELATION;
665 rte->relid = RelationGetRelid(rel->localrel);
666 rte->relkind = rel->localrel->rd_rel->relkind;
667 rte->rellockmode = AccessShareLock;
668
669 addRTEPermissionInfo(&perminfos, rte);
670
671 ExecInitRangeTable(estate, list_make1(rte), perminfos,
673
674 edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
675
676 /*
677 * Use Relation opened by logicalrep_rel_open() instead of opening it
678 * again.
679 */
680 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
681
682 /*
683 * We put the ResultRelInfo in the es_opened_result_relations list, even
684 * though we don't populate the es_result_relations array. That's a bit
685 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
686 *
687 * ExecOpenIndices() is not called here either, each execution path doing
688 * an apply operation being responsible for that.
689 */
691 lappend(estate->es_opened_result_relations, resultRelInfo);
692
693 estate->es_output_cid = GetCurrentCommandId(true);
694
695 /* Prepare to catch AFTER triggers. */
697
698 /* other fields of edata remain NULL for now */
699
700 return edata;
701}
702
703/*
704 * Finish any operations related to the executor state created by
705 * create_edata_for_relation().
706 */
707static void
709{
710 EState *estate = edata->estate;
711
712 /* Handle any queued AFTER triggers. */
713 AfterTriggerEndQuery(estate);
714
715 /* Shut down tuple routing, if any was done. */
716 if (edata->proute)
717 ExecCleanupTupleRouting(edata->mtstate, edata->proute);
718
719 /*
720 * Cleanup. It might seem that we should call ExecCloseResultRelations()
721 * here, but we intentionally don't. It would close the rel we added to
722 * es_opened_result_relations above, which is wrong because we took no
723 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
724 * any other relations opened during execution.
725 */
726 ExecResetTupleTable(estate->es_tupleTable, false);
727 FreeExecutorState(estate);
728 pfree(edata);
729}
730
/*
 * Evaluates default values for local columns that cannot be mapped to remote
 * relation columns.
 *
 * This allows us to support tables which have more columns on the downstream
 * than on the upstream.
 */
738static void
740 TupleTableSlot *slot)
741{
743 int num_phys_attrs = desc->natts;
744 int i;
745 int attnum,
746 num_defaults = 0;
747 int *defmap;
748 ExprState **defexprs;
749 ExprContext *econtext;
750
751 econtext = GetPerTupleExprContext(estate);
752
753 /* We got all the data via replication, no need to evaluate anything. */
754 if (num_phys_attrs == rel->remoterel.natts)
755 return;
756
757 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
758 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
759
760 Assert(rel->attrmap->maplen == num_phys_attrs);
761 for (attnum = 0; attnum < num_phys_attrs; attnum++)
762 {
763 Expr *defexpr;
764
765 if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
766 continue;
767
768 if (rel->attrmap->attnums[attnum] >= 0)
769 continue;
770
771 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
772
773 if (defexpr != NULL)
774 {
775 /* Run the expression through planner */
776 defexpr = expression_planner(defexpr);
777
778 /* Initialize executable expression in copycontext */
779 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
780 defmap[num_defaults] = attnum;
781 num_defaults++;
782 }
783 }
784
785 for (i = 0; i < num_defaults; i++)
786 slot->tts_values[defmap[i]] =
787 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
788}
789
790/*
791 * Store tuple data into slot.
792 *
793 * Incoming data can be either text or binary format.
794 */
795static void
797 LogicalRepTupleData *tupleData)
798{
799 int natts = slot->tts_tupleDescriptor->natts;
800 int i;
801
802 ExecClearTuple(slot);
803
804 /* Call the "in" function for each non-dropped, non-null attribute */
805 Assert(natts == rel->attrmap->maplen);
806 for (i = 0; i < natts; i++)
807 {
809 int remoteattnum = rel->attrmap->attnums[i];
810
811 if (!att->attisdropped && remoteattnum >= 0)
812 {
813 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
814
815 Assert(remoteattnum < tupleData->ncols);
816
817 /* Set attnum for error callback */
819
820 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
821 {
822 Oid typinput;
823 Oid typioparam;
824
825 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
826 slot->tts_values[i] =
827 OidInputFunctionCall(typinput, colvalue->data,
828 typioparam, att->atttypmod);
829 slot->tts_isnull[i] = false;
830 }
831 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
832 {
833 Oid typreceive;
834 Oid typioparam;
835
836 /*
837 * In some code paths we may be asked to re-parse the same
838 * tuple data. Reset the StringInfo's cursor so that works.
839 */
840 colvalue->cursor = 0;
841
842 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
843 slot->tts_values[i] =
844 OidReceiveFunctionCall(typreceive, colvalue,
845 typioparam, att->atttypmod);
846
847 /* Trouble if it didn't eat the whole buffer */
848 if (colvalue->cursor != colvalue->len)
850 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
851 errmsg("incorrect binary data format in logical replication column %d",
852 remoteattnum + 1)));
853 slot->tts_isnull[i] = false;
854 }
855 else
856 {
857 /*
858 * NULL value from remote. (We don't expect to see
859 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
860 * NULL.)
861 */
862 slot->tts_values[i] = (Datum) 0;
863 slot->tts_isnull[i] = true;
864 }
865
866 /* Reset attnum for error callback */
868 }
869 else
870 {
871 /*
872 * We assign NULL to dropped attributes and missing values
873 * (missing values should be later filled using
874 * slot_fill_defaults).
875 */
876 slot->tts_values[i] = (Datum) 0;
877 slot->tts_isnull[i] = true;
878 }
879 }
880
882}
883
884/*
885 * Replace updated columns with data from the LogicalRepTupleData struct.
886 * This is somewhat similar to heap_modify_tuple but also calls the type
887 * input functions on the user data.
888 *
889 * "slot" is filled with a copy of the tuple in "srcslot", replacing
890 * columns provided in "tupleData" and leaving others as-is.
891 *
892 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
893 * storage for "srcslot". This is OK for current usage, but someday we may
894 * need to materialize "slot" at the end to make it independent of "srcslot".
895 */
896static void
899 LogicalRepTupleData *tupleData)
900{
901 int natts = slot->tts_tupleDescriptor->natts;
902 int i;
903
904 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
905 ExecClearTuple(slot);
906
907 /*
908 * Copy all the column data from srcslot, so that we'll have valid values
909 * for unreplaced columns.
910 */
911 Assert(natts == srcslot->tts_tupleDescriptor->natts);
912 slot_getallattrs(srcslot);
913 memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
914 memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
915
916 /* Call the "in" function for each replaced attribute */
917 Assert(natts == rel->attrmap->maplen);
918 for (i = 0; i < natts; i++)
919 {
921 int remoteattnum = rel->attrmap->attnums[i];
922
923 if (remoteattnum < 0)
924 continue;
925
926 Assert(remoteattnum < tupleData->ncols);
927
928 if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
929 {
930 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
931
932 /* Set attnum for error callback */
934
935 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
936 {
937 Oid typinput;
938 Oid typioparam;
939
940 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
941 slot->tts_values[i] =
942 OidInputFunctionCall(typinput, colvalue->data,
943 typioparam, att->atttypmod);
944 slot->tts_isnull[i] = false;
945 }
946 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
947 {
948 Oid typreceive;
949 Oid typioparam;
950
951 /*
952 * In some code paths we may be asked to re-parse the same
953 * tuple data. Reset the StringInfo's cursor so that works.
954 */
955 colvalue->cursor = 0;
956
957 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
958 slot->tts_values[i] =
959 OidReceiveFunctionCall(typreceive, colvalue,
960 typioparam, att->atttypmod);
961
962 /* Trouble if it didn't eat the whole buffer */
963 if (colvalue->cursor != colvalue->len)
965 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
966 errmsg("incorrect binary data format in logical replication column %d",
967 remoteattnum + 1)));
968 slot->tts_isnull[i] = false;
969 }
970 else
971 {
972 /* must be LOGICALREP_COLUMN_NULL */
973 slot->tts_values[i] = (Datum) 0;
974 slot->tts_isnull[i] = true;
975 }
976
977 /* Reset attnum for error callback */
979 }
980 }
981
982 /* And finally, declare that "slot" contains a valid virtual tuple */
984}
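
The binary branch above rejects a column when the receive function does not consume the whole buffer. The following is a minimal standalone sketch of that cursor check. `ByteBuf`, `recv_int32`, and `parse_int32_column` are hypothetical stand-ins for StringInfo and a type's receive function, not PostgreSQL APIs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for StringInfo: data, length, and a read cursor. */
typedef struct ByteBuf
{
    const unsigned char *data;
    size_t      len;
    size_t      cursor;
} ByteBuf;

/* Hypothetical receive function: read a big-endian int32, advancing the cursor. */
static bool
recv_int32(ByteBuf *buf, int32_t *out)
{
    uint32_t    v = 0;

    assert(buf->cursor <= buf->len);
    if (buf->len - buf->cursor < 4)
        return false;           /* not enough bytes for the value */
    for (int i = 0; i < 4; i++)
        v = (v << 8) | buf->data[buf->cursor++];
    *out = (int32_t) v;
    return true;
}

/* Parse one column; succeed only if exactly the whole buffer was consumed. */
static bool
parse_int32_column(ByteBuf *buf, int32_t *out)
{
    buf->cursor = 0;            /* reset so the same data can be re-parsed */
    if (!recv_int32(buf, out))
        return false;
    return buf->cursor == buf->len; /* leftover bytes mean a bad format */
}
```

Resetting the cursor before parsing mirrors the comment in the listing about re-parsing the same tuple data. The final comparison plays the role of the "didn't eat the whole buffer" test.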
985
986/*
987 * Handle BEGIN message.
988 */
989static void
991{
992 LogicalRepBeginData begin_data;
993
994 /* There must not be an active streaming transaction. */
996
997 logicalrep_read_begin(s, &begin_data);
998 set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
999
1000 remote_final_lsn = begin_data.final_lsn;
1001
1003
1004 in_remote_transaction = true;
1005
1007}
1008
1009/*
1010 * Handle COMMIT message.
1011 *
1012 * TODO, support tracking of multiple origins
1013 */
1014static void
1016{
1017 LogicalRepCommitData commit_data;
1018
1019 logicalrep_read_commit(s, &commit_data);
1020
1021 if (commit_data.commit_lsn != remote_final_lsn)
1022 ereport(ERROR,
1023 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1024 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1025 LSN_FORMAT_ARGS(commit_data.commit_lsn),
1027
1028 apply_handle_commit_internal(&commit_data);
1029
1030 /* Process any tables that are being synchronized in parallel. */
1031 process_syncing_tables(commit_data.end_lsn);
1032
1035}
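
The BEGIN and COMMIT handlers above cooperate: BEGIN records the transaction's final LSN, and COMMIT must carry the same value or it is treated as a protocol violation. The sketch below shows that check along with the `%X/%X` formatting used in the error message. `LsnSketch`, `format_lsn`, and `check_commit_lsn` are illustrative names assumed for this example only.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t LsnSketch;     /* hypothetical stand-in for a 64-bit LSN */

/* Format an LSN as "high/low" hex halves; returns what snprintf returns. */
static int
format_lsn(LsnSketch lsn, char *buf, size_t buflen)
{
    assert(buf != NULL);
    return snprintf(buf, buflen, "%X/%X",
                    (unsigned int) (lsn >> 32),
                    (unsigned int) (lsn & 0xFFFFFFFFu));
}

/*
 * Return 0 if the COMMIT's LSN matches the one remembered from BEGIN.
 * Otherwise write a diagnostic into msg and return -1.
 */
static int
check_commit_lsn(LsnSketch commit_lsn, LsnSketch expected,
                 char *msg, size_t msglen)
{
    char        got[32];
    char        want[32];

    if (commit_lsn == expected)
        return 0;

    format_lsn(commit_lsn, got, sizeof(got));
    format_lsn(expected, want, sizeof(want));
    snprintf(msg, msglen,
             "incorrect commit LSN %s in commit message (expected %s)",
             got, want);
    return -1;
}
```

A caller would store the LSN announced at BEGIN and pass it as `expected` when the COMMIT arrives.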
1036
1037/*
1038 * Handle BEGIN PREPARE message.
1039 */
1040static void
1042{
1043 LogicalRepPreparedTxnData begin_data;
1044
1045 /* Tablesync should never receive prepare. */
1046 if (am_tablesync_worker())
1047 ereport(ERROR,
1048 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1049 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1050
1051 /* There must not be an active streaming transaction. */
1053
1054 logicalrep_read_begin_prepare(s, &begin_data);
1055 set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1056
1057 remote_final_lsn = begin_data.prepare_lsn;
1058
1060
1061 in_remote_transaction = true;
1062
1064}
1065
1066/*
1067 * Common function to prepare the GID.
1068 */
1069static void
1071{
1072 char gid[GIDSIZE];
1073
1074 /*
1075 * Compute unique GID for two_phase transactions. We don't use the GID of
1076 * the prepared transaction sent by the server, as that can lead to deadlock
1077 * when multiple subscriptions on the same node point to publications on
1078 * the same node. See comments atop worker.c.
1079 */
1081 gid, sizeof(gid));
1082
1083 /*
1084 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1085 * called within the PrepareTransactionBlock below.
1086 */
1087 if (!IsTransactionBlock())
1088 {
1090 CommitTransactionCommand(); /* Completes the preceding Begin command. */
1091 }
1092
1093 /*
1094 * Update origin state so we can restart streaming from correct position
1095 * in case of crash.
1096 */
1097 replorigin_session_origin_lsn = prepare_data->end_lsn;
1099
1101}
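
The comment above says the GID is computed locally instead of reusing the one sent by the server. The call that builds it is not visible in this listing, so the sketch below only illustrates the idea. The inputs (a subscription identifier and the remote transaction ID), the `pg_gid_%u_%u` layout, and the names `make_local_gid` and `GID_SKETCH_SIZE` are all assumptions made for this example.

```c
#include <assert.h>
#include <stdio.h>

#define GID_SKETCH_SIZE 200     /* assumed buffer size, for illustration only */

/*
 * Build a GID that is unique per (subscription, remote transaction) pair.
 * Returns 0 on success, -1 if the result did not fit in the buffer.
 */
static int
make_local_gid(unsigned int subid, unsigned int xid, char *gid, size_t szgid)
{
    int         n;

    assert(gid != NULL && szgid > 0);
    n = snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
    return (n > 0 && (size_t) n < szgid) ? 0 : -1;
}
```

Because the subscription identifier is part of the name, two subscriptions that receive the same remote transaction would produce different GIDs under this scheme.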
1102
1103/*
1104 * Handle PREPARE message.
1105 */
1106static void
1108{
1109 LogicalRepPreparedTxnData prepare_data;
1110
1111 logicalrep_read_prepare(s, &prepare_data);
1112
1113 if (prepare_data.prepare_lsn != remote_final_lsn)
1114 ereport(ERROR,
1115 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1116 errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1117 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1119
1120 /*
1121 * Unlike commit, here we always prepare the transaction, even if no
1122 * change has happened in this transaction or all changes are skipped. It
1123 * is done this way because at commit prepared time, we won't know whether
1124 * we have skipped preparing a transaction because of those reasons.
1125 *
1126 * XXX We could optimize this so that at commit prepared time, we first check
1127 * whether we have prepared the transaction, but that doesn't seem
1128 * worthwhile because such cases shouldn't be common.
1129 */
1131
1132 apply_handle_prepare_internal(&prepare_data);
1133
1136 pgstat_report_stat(false);
1137
1138 /*
1139 * It is okay not to set the local_end LSN for the prepare because we
1140 * always flush the prepare record. So, we can send the acknowledgment of
1141 * the remote_end LSN as soon as prepare is finished.
1142 *
1143 * XXX For the sake of consistency with commit, we could have set it with
1144 * the LSN of prepare but as of now we don't track that value similar to
1145 * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
1146 * it.
1147 */
1149
1150 in_remote_transaction = false;
1151
1152 /* Process any tables that are being synchronized in parallel. */
1153 process_syncing_tables(prepare_data.end_lsn);
1154
1155 /*
1156 * Since we have already prepared the transaction, if the server crashes
1157 * before clearing the subskiplsn, it will be left set but the
1158 * transaction won't be resent. That's okay because it's a rare case
1159 * and the subskiplsn will be cleared when finishing the next transaction.
1160 */
1163
1166}
1167
1168/*
1169 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1170 *
1171 * Note that we don't need to wait here if the transaction was prepared in a
1172 * parallel apply worker. In that case, we have already waited for the prepare
1173 * to finish in apply_handle_stream_prepare() which will ensure all the
1174 * operations in that transaction have happened in the subscriber, so no
1175 * concurrent transaction can cause deadlock or transaction dependency issues.
1176 */
1177static void
1179{
1181 char gid[GIDSIZE];
1182
1183 logicalrep_read_commit_prepared(s, &prepare_data);
1184 set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1185
1186 /* Compute GID for two_phase transactions. */
1188 gid, sizeof(gid));
1189
1190 /* There is no transaction when COMMIT PREPARED is called */
1192
1193 /*
1194 * Update origin state so we can restart streaming from correct position
1195 * in case of crash.
1196 */
1199
1200 FinishPreparedTransaction(gid, true);
1203 pgstat_report_stat(false);
1204
1206 in_remote_transaction = false;
1207
1208 /* Process any tables that are being synchronized in parallel. */
1209 process_syncing_tables(prepare_data.end_lsn);
1210
1212
1215}
1216
1217/*
1218 * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1219 *
1220 * Note that we don't need to wait here if the transaction was prepared in a
1221 * parallel apply worker. In that case, we have already waited for the prepare
1222 * to finish in apply_handle_stream_prepare() which will ensure all the
1223 * operations in that transaction have happened in the subscriber, so no
1224 * concurrent transaction can cause deadlock or transaction dependency issues.
1225 */
1226static void
1228{
1230 char gid[GIDSIZE];
1231
1232 logicalrep_read_rollback_prepared(s, &rollback_data);
1233 set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1234
1235 /* Compute GID for two_phase transactions. */
1237 gid, sizeof(gid));
1238
1239 /*
1240 * It is possible that we haven't received the prepare because it occurred
1241 * before the walsender reached a consistent point or because two_phase was
1242 * not yet enabled at that time. In such cases, we need to skip the rollback
1243 * prepared.
1244 */
1245 if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1246 rollback_data.prepare_time))
1247 {
1248 /*
1249 * Update origin state so we can restart streaming from correct
1250 * position in case of crash.
1251 */
1254
1255 /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1257 FinishPreparedTransaction(gid, false);
1260
1262 }
1263
1264 pgstat_report_stat(false);
1265
1266 /*
1267 * It is okay not to set the local_end LSN for the rollback of prepared
1268 * transaction because we always flush the WAL record for it. See
1269 * apply_handle_prepare.
1270 */
1272 in_remote_transaction = false;
1273
1274 /* Process any tables that are being synchronized in parallel. */
1276
1279}
1280
1281/*
1282 * Handle STREAM PREPARE.
1283 */
1284static void
1286{
1287 LogicalRepPreparedTxnData prepare_data;
1289 TransApplyAction apply_action;
1290
1291 /* Save the message before it is consumed. */
1292 StringInfoData original_msg = *s;
1293
1295 ereport(ERROR,
1296 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1297 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1298
1299 /* Tablesync should never receive prepare. */
1300 if (am_tablesync_worker())
1301 ereport(ERROR,
1302 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1303 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1304
1305 logicalrep_read_stream_prepare(s, &prepare_data);
1306 set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1307
1308 apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1309
1310 switch (apply_action)
1311 {
1312 case TRANS_LEADER_APPLY:
1313
1314 /*
1315 * The transaction has been serialized to file, so replay all the
1316 * spooled operations.
1317 */
1319 prepare_data.xid, prepare_data.prepare_lsn);
1320
1321 /* Mark the transaction as prepared. */
1322 apply_handle_prepare_internal(&prepare_data);
1323
1325
1326 /*
1327 * It is okay not to set the local_end LSN for the prepare because
1328 * we always flush the prepare record. See apply_handle_prepare.
1329 */
1331
1332 in_remote_transaction = false;
1333
1334 /* Unlink the files with serialized changes and subxact info. */
1336
1337 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1338 break;
1339
1341 Assert(winfo);
1342
1343 if (pa_send_data(winfo, s->len, s->data))
1344 {
1345 /* Finish processing the streaming transaction. */
1346 pa_xact_finish(winfo, prepare_data.end_lsn);
1347 break;
1348 }
1349
1350 /*
1351 * Switch to serialize mode when we are not able to send the
1352 * change to parallel apply worker.
1353 */
1354 pa_switch_to_partial_serialize(winfo, true);
1355
1356 /* fall through */
1358 Assert(winfo);
1359
1360 stream_open_and_write_change(prepare_data.xid,
1362 &original_msg);
1363
1365
1366 /* Finish processing the streaming transaction. */
1367 pa_xact_finish(winfo, prepare_data.end_lsn);
1368 break;
1369
1371
1372 /*
1373 * If the parallel apply worker is applying spooled messages then
1374 * close the file before preparing.
1375 */
1376 if (stream_fd)
1378
1380
1381 /* Mark the transaction as prepared. */
1382 apply_handle_prepare_internal(&prepare_data);
1383
1385
1387
1388 /*
1389 * It is okay not to set the local_end LSN for the prepare because
1390 * we always flush the prepare record. See apply_handle_prepare.
1391 */
1393
1396
1398
1399 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1400 break;
1401
1402 default:
1403 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1404 break;
1405 }
1406
1407 pgstat_report_stat(false);
1408
1409 /* Process any tables that are being synchronized in parallel. */
1410 process_syncing_tables(prepare_data.end_lsn);
1411
1412 /*
1413 * Similar to the prepare case, the subskiplsn could be left set if the
1414 * server crashes, but that's okay. See the comments in apply_handle_prepare().
1415 */
1418
1420
1422}
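
The STREAM PREPARE handler above follows a pattern that recurs in the other stream handlers: try to hand the message to a parallel apply worker, and if that fails, switch to serialize mode and fall through to the spooling branch. The sketch below shows only that control flow. The enum values and `handle_end_of_stream` are hypothetical names, and the `send_ok` flag stands in for the result of the send attempt.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical apply actions, loosely modeled on the cases in the listing. */
typedef enum ApplyActionSketch
{
    ACT_LEADER_APPLY,
    ACT_SEND_TO_PARALLEL,
    ACT_PARTIAL_SERIALIZE
} ApplyActionSketch;

typedef enum PathTaken
{
    PATH_REPLAYED_SPOOL,
    PATH_SENT,
    PATH_SERIALIZED,
    PATH_ERROR
} PathTaken;

/* Decide how a stream-ending message is handled; send_ok models the send result. */
static PathTaken
handle_end_of_stream(ApplyActionSketch action, bool send_ok)
{
    switch (action)
    {
        case ACT_LEADER_APPLY:
            /* changes were spooled to a file, so replay them locally */
            return PATH_REPLAYED_SPOOL;

        case ACT_SEND_TO_PARALLEL:
            if (send_ok)
                return PATH_SENT;
            /* could not hand off: switch to serialize mode */
            /* fall through */

        case ACT_PARTIAL_SERIALIZE:
            /* write the message to the spool file for the worker to read */
            return PATH_SERIALIZED;

        default:
            assert(!"unexpected apply action");
            return PATH_ERROR;
    }
}
```

The fall-through keeps the serialize logic in one place, so a failed send and an already-serializing transaction end up on the same code path.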
1423
1424/*
1425 * Handle ORIGIN message.
1426 *
1427 * TODO, support tracking of multiple origins
1428 */
1429static void
1431{
1432 /*
1433 * An ORIGIN message can only come inside a streaming transaction or inside
1434 * a remote transaction, and before any actual writes.
1435 */
1439 ereport(ERROR,
1440 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1441 errmsg_internal("ORIGIN message sent out of order")));
1442}
1443
1444/*
1445 * Initialize fileset (if not already done).
1446 *
1447 * Create a new file when first_segment is true, otherwise open the existing
1448 * file.
1449 */
1450void
1451stream_start_internal(TransactionId xid, bool first_segment)
1452{
1454
1455 /*
1456 * Initialize the worker's stream_fileset if we haven't yet. This will be
1457 * used for the entire duration of the worker so create it in a permanent
1458 * context. We create this on the very first streaming message from any
1459 * transaction and then use it for this and other streaming transactions.
1460 * Now, we could create a fileset at the start of the worker as well but
1461 * then we won't be sure that it will ever be used.
1462 */
1464 {
1465 MemoryContext oldctx;
1466
1468
1471
1472 MemoryContextSwitchTo(oldctx);
1473 }
1474
1475 /* Open the spool file for this transaction. */
1476 stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1477
1478 /* If this is not the first segment, open existing subxact file. */
1479 if (!first_segment)
1481
1483}
1484
1485/*
1486 * Handle STREAM START message.
1487 */
1488static void
1490{
1491 bool first_segment;
1493 TransApplyAction apply_action;
1494
1495 /* Save the message before it is consumed. */
1496 StringInfoData original_msg = *s;
1497
1499 ereport(ERROR,
1500 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1501 errmsg_internal("duplicate STREAM START message")));
1502
1503 /* There must not be an active streaming transaction. */
1505
1506 /* notify handle methods we're processing a remote transaction */
1508
1509 /* extract XID of the top-level transaction */
1510 stream_xid = logicalrep_read_stream_start(s, &first_segment);
1511
1513 ereport(ERROR,
1514 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1515 errmsg_internal("invalid transaction ID in streamed replication transaction")));
1516
1518
1519 /* Try to allocate a worker for the streaming transaction. */
1520 if (first_segment)
1522
1523 apply_action = get_transaction_apply_action(stream_xid, &winfo);
1524
1525 switch (apply_action)
1526 {
1528
1529 /*
1530 * Function stream_start_internal starts a transaction. This
1531 * transaction will be committed on the stream stop unless it is a
1532 * tablesync worker in which case it will be committed after
1533 * processing all the messages. We need this transaction for
1534 * handling the BufFile, used for serializing the streaming data
1535 * and subxact info.
1536 */
1537 stream_start_internal(stream_xid, first_segment);
1538 break;
1539
1541 Assert(winfo);
1542
1543 /*
1544 * Once we start serializing the changes, the parallel apply
1545 * worker will wait for the leader to release the stream lock
1546 * until the end of the transaction. So, we don't need to release
1547 * the lock or increment the stream count in that case.
1548 */
1549 if (pa_send_data(winfo, s->len, s->data))
1550 {
1551 /*
1552 * Unlock the shared object lock so that the parallel apply
1553 * worker can continue to receive changes.
1554 */
1555 if (!first_segment)
1557
1558 /*
1559 * Increment the number of streaming blocks waiting to be
1560 * processed by parallel apply worker.
1561 */
1563
1564 /* Cache the parallel apply worker for this transaction. */
1566 break;
1567 }
1568
1569 /*
1570 * Switch to serialize mode when we are not able to send the
1571 * change to parallel apply worker.
1572 */
1573 pa_switch_to_partial_serialize(winfo, !first_segment);
1574
1575 /* fall through */
1577 Assert(winfo);
1578
1579 /*
1580 * Open the spool file unless it was already opened when switching
1581 * to serialize mode. The transaction started in
1582 * stream_start_internal will be committed on the stream stop.
1583 */
1584 if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1585 stream_start_internal(stream_xid, first_segment);
1586
1588
1589 /* Cache the parallel apply worker for this transaction. */
1591 break;
1592
1594 if (first_segment)
1595 {
1596 /* Hold the lock until the end of the transaction. */
1599
1600 /*
1601 * Signal the leader apply worker, as it may be waiting for
1602 * us.
1603 */
1605 }
1606
1608 break;
1609
1610 default:
1611 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1612 break;
1613 }
1614
1616}
1617
1618/*
1619 * Update the information about subxacts and close the file.
1620 *
1621 * This function should be called when the stream_start_internal function has
1622 * been called.
1623 */
1624void
1626{
1627 /*
1628 * Serialize information about subxacts for the toplevel transaction, then
1629 * close the stream messages spool file.
1630 */
1633
1634 /* We must be in a valid transaction state */
1636
1637 /* Commit the per-stream transaction */
1639
1640 /* Reset per-stream context */
1642}
1643
1644/*
1645 * Handle STREAM STOP message.
1646 */
1647static void
1649{
1651 TransApplyAction apply_action;
1652
1654 ereport(ERROR,
1655 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1656 errmsg_internal("STREAM STOP message without STREAM START")));
1657
1658 apply_action = get_transaction_apply_action(stream_xid, &winfo);
1659
1660 switch (apply_action)
1661 {
1664 break;
1665
1667 Assert(winfo);
1668
1669 /*
1670 * Lock before sending the STREAM_STOP message so that the leader
1671 * can hold the lock first and the parallel apply worker will wait
1672 * for the leader to release the lock. See Locking Considerations atop
1673 * applyparallelworker.c.
1674 */
1676
1677 if (pa_send_data(winfo, s->len, s->data))
1678 {
1680 break;
1681 }
1682
1683 /*
1684 * Switch to serialize mode when we are not able to send the
1685 * change to parallel apply worker.
1686 */
1687 pa_switch_to_partial_serialize(winfo, true);
1688
1689 /* fall through */
1694 break;
1695
1697 elog(DEBUG1, "applied %u changes in the streaming chunk",
1699
1700 /*
1701 * By the time the parallel apply worker is processing the changes in
1702 * the current streaming block, the leader apply worker may have
1703 * sent multiple streaming blocks. This can cause the parallel apply
1704 * worker to start waiting even when there are more stream chunks
1705 * in the queue. So, try to lock only if there is no message left
1706 * in the queue. See Locking Considerations atop
1707 * applyparallelworker.c.
1708 *
1709 * Note that here we have a race condition where we can start
1710 * waiting even when there are pending streaming chunks. This can
1711 * happen if the leader sends another streaming block and acquires
1712 * the stream lock again after the parallel apply worker checks
1713 * that there is no pending streaming block and before it actually
1714 * starts waiting on a lock. We can handle this case by not
1715 * allowing the leader to increment the stream block count during
1716 * the time parallel apply worker acquires the lock but it is not
1717 * clear whether that is worth the complexity.
1718 *
1719 * Now, if this missed chunk contains rollback to savepoint, then
1720 * there is a risk of deadlock which probably shouldn't happen
1721 * after restart.
1722 */
1724 break;
1725
1726 default:
1727 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1728 break;
1729 }
1730
1733
1734 /*
1735 * The parallel apply worker could be in a transaction in which case we
1736 * need to report the state as STATE_IDLEINTRANSACTION.
1737 */
1740 else
1742
1744}
1745
1746/*
1747 * Helper function to handle STREAM ABORT message when the transaction was
1748 * serialized to file.
1749 */
1750static void
1752{
1753 /*
1754 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1755 * just delete the files with serialized info.
1756 */
1757 if (xid == subxid)
1759 else
1760 {
1761 /*
1762 * OK, so it's a subxact. We need to read the subxact file for the
1763 * toplevel transaction, determine the offset tracked for the subxact,
1764 * and truncate the file with changes. We also remove the subxacts
1765 * with higher offsets (or rather higher XIDs).
1766 *
1767 * We intentionally scan the array from the tail, because we're likely
1768 * aborting a change for one of the most recent subtransactions.
1769 *
1770 * We can't use binary search here, as subxact XIDs won't
1771 * necessarily arrive in sorted order. Consider the case where we have
1772 * released the savepoint for multiple subtransactions and then
1773 * performed rollback to savepoint for one of the earlier
1774 * subtransactions.
1775 */
1776 int64 i;
1777 int64 subidx;
1778 BufFile *fd;
1779 bool found = false;
1780 char path[MAXPGPATH];
1781
1782 subidx = -1;
1785
1786 for (i = subxact_data.nsubxacts; i > 0; i--)
1787 {
1788 if (subxact_data.subxacts[i - 1].xid == subxid)
1789 {
1790 subidx = (i - 1);
1791 found = true;
1792 break;
1793 }
1794 }
1795
1796 /*
1797 * If it's an empty sub-transaction then we will not find the subxid
1798 * here so just cleanup the subxact info and return.
1799 */
1800 if (!found)
1801 {
1802 /* Cleanup the subxact info */
1806 return;
1807 }
1808
1809 /* open the changes file */
1812 O_RDWR, false);
1813
1814 /* OK, truncate the file at the right offset */
1816 subxact_data.subxacts[subidx].offset);
1818
1819 /* discard the subxacts added later */
1820 subxact_data.nsubxacts = subidx;
1821
1822 /* write the updated subxact list */
1824
1827 }
1828}
1829
1830/*
1831 * Handle STREAM ABORT message.
1832 */
1833static void
1835{
1836 TransactionId xid;
1837 TransactionId subxid;
1838 LogicalRepStreamAbortData abort_data;
1840 TransApplyAction apply_action;
1841
1842 /* Save the message before it is consumed. */
1843 StringInfoData original_msg = *s;
1844 bool toplevel_xact;
1845
1847 ereport(ERROR,
1848 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1849 errmsg_internal("STREAM ABORT message without STREAM STOP")));
1850
1851 /* We receive abort information only when we can apply in parallel. */
1852 logicalrep_read_stream_abort(s, &abort_data,
1854
1855 xid = abort_data.xid;
1856 subxid = abort_data.subxid;
1857 toplevel_xact = (xid == subxid);
1858
1859 set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1860
1861 apply_action = get_transaction_apply_action(xid, &winfo);
1862
1863 switch (apply_action)
1864 {
1865 case TRANS_LEADER_APPLY:
1866
1867 /*
1868 * We are in the leader apply worker and the transaction has been
1869 * serialized to file.
1870 */
1871 stream_abort_internal(xid, subxid);
1872
1873 elog(DEBUG1, "finished processing the STREAM ABORT command");
1874 break;
1875
1877 Assert(winfo);
1878
1879 /*
1880 * For the case of aborting the subtransaction, we increment the
1881 * number of streaming blocks and take the lock again before
1882 * sending the STREAM_ABORT to ensure that the parallel apply
1883 * worker will wait on the lock for the next set of changes after
1884 * processing the STREAM_ABORT message if it is not already
1885 * waiting for STREAM_STOP message.
1886 *
1887 * It is important to perform this locking before sending the
1888 * STREAM_ABORT message so that the leader can hold the lock first
1889 * and the parallel apply worker will wait for the leader to
1890 * release the lock. This is the same as what we do in
1891 * apply_handle_stream_stop. See Locking Considerations atop
1892 * applyparallelworker.c.
1893 */
1894 if (!toplevel_xact)
1895 {
1899 }
1900
1901 if (pa_send_data(winfo, s->len, s->data))
1902 {
1903 /*
1904 * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1905 * wait here for the parallel apply worker to finish as that
1906 * is not required to maintain the commit order and won't have
1907 * the risk of failures due to transaction dependencies and
1908 * deadlocks. However, it is possible that before the parallel
1909 * worker finishes and we clear the worker info, the xid
1910 * wraparound happens on the upstream and a new transaction
1911 * with the same xid can appear and that can lead to duplicate
1912 * entries in ParallelApplyTxnHash. Yet another problem could
1913 * be that we may have serialized the changes in partial
1914 * serialize mode and the file containing xact changes may
1915 * already exist, and after xid wraparound trying to create
1916 * the file for the same xid can lead to an error. To avoid
1917 * these problems, we decide to wait for the aborts to finish.
1918 *
1919 * Note, it is okay to not update the flush location position
1920 * for aborts as in worst case that means such a transaction
1921 * won't be sent again after restart.
1922 */
1923 if (toplevel_xact)
1925
1926 break;
1927 }
1928
1929 /*
1930 * Switch to serialize mode when we are not able to send the
1931 * change to parallel apply worker.
1932 */
1933 pa_switch_to_partial_serialize(winfo, true);
1934
1935 /* fall through */
1937 Assert(winfo);
1938
1939 /*
1940 * Parallel apply worker might have applied some changes, so write
1941 * the STREAM_ABORT message so that it can rollback the
1942 * subtransaction if needed.
1943 */
1945 &original_msg);
1946
1947 if (toplevel_xact)
1948 {
1951 }
1952 break;
1953
1955
1956 /*
1957 * If the parallel apply worker is applying spooled messages then
1958 * close the file before aborting.
1959 */
1960 if (toplevel_xact && stream_fd)
1962
1963 pa_stream_abort(&abort_data);
1964
1965 /*
1966 * We need to wait after processing rollback to savepoint for the
1967 * next set of changes.
1968 *
1969 * We have a race condition here, due to which we can start waiting
1970 * even when there are more stream chunks in the queue. See
1971 * apply_handle_stream_stop.
1972 */
1973 if (!toplevel_xact)
1975
1976 elog(DEBUG1, "finished processing the STREAM ABORT command");
1977 break;
1978
1979 default:
1980 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1981 break;
1982 }
1983
1985}
1986
1987/*
1988 * Ensure that the passed location is fileset's end.
1989 */
1990static void
1991ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1992 off_t offset)
1993{
1994 char path[MAXPGPATH];
1995 BufFile *fd;
1996 int last_fileno;
1997 off_t last_offset;
1998
2000
2002
2004
2005 fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2006
2007 BufFileSeek(fd, 0, 0, SEEK_END);
2008 BufFileTell(fd, &last_fileno, &last_offset);
2009
2011
2013
2014 if (last_fileno != fileno || last_offset != offset)
2015 elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2016 path);
2017}
2018
2019/*
2020 * Common spoolfile processing.
2021 */
2022void
2024 XLogRecPtr lsn)
2025{
2026 int nchanges;
2027 char path[MAXPGPATH];
2028 char *buffer = NULL;
2029 MemoryContext oldcxt;
2030 ResourceOwner oldowner;
2031 int fileno;
2032 off_t offset;
2033
2036
2037 /* Make sure we have an open transaction */
2039
2040 /*
2041 * Allocate file handle and memory required to process all the messages in
2042 * TopTransactionContext to avoid them getting reset after each message is
2043 * processed.
2044 */
2046
2047 /* Open the spool file for the committed/prepared transaction */
2049 elog(DEBUG1, "replaying changes from file \"%s\"", path);
2050
2051 /*
2052 * Make sure the file is owned by the toplevel transaction so that the
2053 * file will not be accidentally closed when aborting a subtransaction.
2054 */
2055 oldowner = CurrentResourceOwner;
2057
2058 stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2059
2060 CurrentResourceOwner = oldowner;
2061
2062 buffer = palloc(BLCKSZ);
2063
2064 MemoryContextSwitchTo(oldcxt);
2065
2066 remote_final_lsn = lsn;
2067
2068 /*
2069 * Make sure the apply_dispatch handler methods are aware we're in a remote
2070 * transaction.
2071 */
2072 in_remote_transaction = true;
2074
2076
2077 /*
2078 * Read the entries one by one and pass them through the same logic as in
2079 * apply_dispatch.
2080 */
2081 nchanges = 0;
2082 while (true)
2083 {
2085 size_t nbytes;
2086 int len;
2087
2089
2090 /* read length of the on-disk record */
2091 nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2092
2093 /* have we reached end of the file? */
2094 if (nbytes == 0)
2095 break;
2096
2097 /* do we have a correct length? */
2098 if (len <= 0)
2099 elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2100 len, path);
2101
2102 /* make sure we have a sufficiently large buffer */
2103 buffer = repalloc(buffer, len);
2104
2105 /* and finally read the data into the buffer */
2106 BufFileReadExact(stream_fd, buffer, len);
2107
2108 BufFileTell(stream_fd, &fileno, &offset);
2109
2110 /* init a stringinfo using the buffer and call apply_dispatch */
2111 initReadOnlyStringInfo(&s2, buffer, len);
2112
2113 /* Ensure we are reading the data into our memory context. */
2115
2117
2119
2120 MemoryContextSwitchTo(oldcxt);
2121
2122 nchanges++;
2123
2124 /*
2125 * It is possible that the file has been closed because we have processed
2126 * a transaction end message such as stream_commit, in which case that
2127 * must be the last message.
2128 */
2129 if (!stream_fd)
2130 {
2131 ensure_last_message(stream_fileset, xid, fileno, offset);
2132 break;
2133 }
2134
2135 if (nchanges % 1000 == 0)
2136 elog(DEBUG1, "replayed %d changes from file \"%s\"",
2137 nchanges, path);
2138 }
2139
2140 if (stream_fd)
2142
2143 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2144 nchanges, path);
2145
2146 return;
2147}
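
The replay loop above reads length-prefixed records: an `int` length, a sanity check on it, then exactly that many bytes of payload. The sketch below walks the same layout in an in-memory buffer instead of a BufFile. `count_spooled_records` is a hypothetical helper, and the dispatch step is left as a comment.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Walk a buffer laid out as repeated [int length][payload bytes] records.
 * Returns the number of records, or -1 on a non-positive length, a truncated
 * record, or an allocation failure.
 */
static int
count_spooled_records(const char *data, size_t size)
{
    size_t      pos = 0;
    int         nchanges = 0;
    char       *buffer = NULL;

    assert(data != NULL || size == 0);

    while (pos < size)
    {
        int         len;
        char       *tmp;

        /* read the length of the record */
        if (size - pos < sizeof(len))
            goto fail;
        memcpy(&len, data + pos, sizeof(len));
        pos += sizeof(len);

        /* do we have a correct length, and is the payload all there? */
        if (len <= 0 || (size_t) len > size - pos)
            goto fail;

        /* make sure the buffer is large enough, then read the payload */
        tmp = realloc(buffer, (size_t) len);
        if (tmp == NULL)
            goto fail;
        buffer = tmp;
        memcpy(buffer, data + pos, (size_t) len);
        pos += (size_t) len;

        /* a real consumer would hand buffer/len to its dispatcher here */
        nchanges++;
    }

    free(buffer);
    return nchanges;

fail:
    free(buffer);
    return -1;
}
```

Reaching the end of the buffer exactly at a record boundary corresponds to the end-of-file test in the listing. Anything else is reported as an error instead of being silently ignored.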
2148
2149/*
2150 * Handle STREAM COMMIT message.
2151 */
2152static void
2154{
2155 TransactionId xid;
2156 LogicalRepCommitData commit_data;
2158 TransApplyAction apply_action;
2159
2160 /* Save the message before it is consumed. */
2161 StringInfoData original_msg = *s;
2162
2164 ereport(ERROR,
2165 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2166 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2167
2168 xid = logicalrep_read_stream_commit(s, &commit_data);
2169 set_apply_error_context_xact(xid, commit_data.commit_lsn);
2170
2171 apply_action = get_transaction_apply_action(xid, &winfo);
2172
2173 switch (apply_action)
2174 {
2175 case TRANS_LEADER_APPLY:
2176
2177 /*
2178 * The transaction has been serialized to file, so replay all the
2179 * spooled operations.
2180 */
2182 commit_data.commit_lsn);
2183
2184 apply_handle_commit_internal(&commit_data);
2185
2186 /* Unlink the files with serialized changes and subxact info. */
2188
2189 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2190 break;
2191
2193 Assert(winfo);
2194
2195 if (pa_send_data(winfo, s->len, s->data))
2196 {
2197 /* Finish processing the streaming transaction. */
2198 pa_xact_finish(winfo, commit_data.end_lsn);
2199 break;
2200 }
2201
2202 /*
2203 * Switch to serialize mode when we are not able to send the
2204 * change to parallel apply worker.
2205 */
2206 pa_switch_to_partial_serialize(winfo, true);
2207
2208 /* fall through */
2210 Assert(winfo);
2211
2213 &original_msg);
2214
2216
2217 /* Finish processing the streaming transaction. */
2218 pa_xact_finish(winfo, commit_data.end_lsn);
2219 break;
2220
2222
2223 /*
2224 * If the parallel apply worker is applying spooled messages then
2225 * close the file before committing.
2226 */
2227 if (stream_fd)
2229
2230 apply_handle_commit_internal(&commit_data);
2231
2233
2234 /*
2235 * It is important to set the transaction state as finished before
2236 * releasing the lock. See pa_wait_for_xact_finish.
2237 */
2240
2242
2243 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2244 break;
2245
2246 default:
2247 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2248 break;
2249 }
2250
2251 /* Process any tables that are being synchronized in parallel. */
2252 process_syncing_tables(commit_data.end_lsn);
2253
2255
2257}
2258
2259/*
2260 * Helper function for apply_handle_commit and apply_handle_stream_commit.
2261 */
2262static void
2264{
2265 if (is_skipping_changes())
2266 {
2268
2269 /*
2270 * Start a new transaction to clear the subskiplsn, if not started
2271 * yet.
2272 */
2273 if (!IsTransactionState())
2275 }
2276
2277 if (IsTransactionState())
2278 {
2279 /*
2280 * The transaction is either non-empty or skipped, so we clear the
2281 * subskiplsn.
2282 */
2284
2285 /*
2286 * Update origin state so we can restart streaming from correct
2287 * position in case of crash.
2288 */
2291
2293
2294 if (IsTransactionBlock())
2295 {
2296 EndTransactionBlock(false);
2298 }
2299
2300 pgstat_report_stat(false);
2301
2303 }
2304 else
2305 {
2306 /* Process any invalidation messages that might have accumulated. */
2309 }
2310
2311 in_remote_transaction = false;
2312}
2313
2314/*
2315 * Handle RELATION message.
2316 *
2317 * Note that we don't validate against the local schema here. Validation is
2318 * postponed until the first change for the given relation arrives, because
2319 * we only care about it when applying changes to that relation, and
2320 * deferring it means less locking.
2321 */
2322static void
2324{
2325 LogicalRepRelation *rel;
2326
2328 return;
2329
2330 rel = logicalrep_read_rel(s);
2332
2333 /* Also reset all entries in the partition map that refer to remoterel. */
2335}
2336
2337/*
2338 * Handle TYPE message.
2339 *
2340 * This implementation pays no attention to TYPE messages; we expect the user
2341 * to have set things up so that the incoming data is acceptable to the input
2342 * functions for the locally subscribed tables. Hence, we just read and
2343 * discard the message.
2344 */
2345static void
2347{
2348 LogicalRepTyp typ;
2349
2351 return;
2352
2353 logicalrep_read_typ(s, &typ);
2354}
2355
2356/*
2357 * Check that we (the subscription owner) have sufficient privileges on the
2358 * target relation to perform the given operation.
2359 */
2360static void
2362{
2363 Oid relid;
2364 AclResult aclresult;
2365
2366 relid = RelationGetRelid(rel);
2367 aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2368 if (aclresult != ACLCHECK_OK)
2369 aclcheck_error(aclresult,
2370 get_relkind_objtype(rel->rd_rel->relkind),
2371 get_rel_name(relid));
2372
2373 /*
2374 * We lack the infrastructure to honor RLS policies. It might be possible
2375 * to add such infrastructure here, but tablesync workers lack it, too, so
2376 * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2377 * but it seems dangerous to replicate a TRUNCATE and then refuse to
2378 * replicate subsequent INSERTs, so we forbid all commands the same.
2379 */
2380 if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2381 ereport(ERROR,
2382 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2383 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2386}
2387
2388/*
2389 * Handle INSERT message.
2390 */
2391
2392static void
2394{
2396 LogicalRepTupleData newtup;
2397 LogicalRepRelId relid;
2398 UserContext ucxt;
2399 ApplyExecutionData *edata;
2400 EState *estate;
2401 TupleTableSlot *remoteslot;
2402 MemoryContext oldctx;
2403 bool run_as_owner;
2404
2405 /*
2406 * Quick return if we are skipping data modification changes or handling
2407 * streamed transactions.
2408 */
2409 if (is_skipping_changes() ||
2411 return;
2412
2414
2415 relid = logicalrep_read_insert(s, &newtup);
2418 {
2419 /*
2420 * The relation can't become interesting in the middle of the
2421 * transaction so it's safe to unlock it.
2422 */
2425 return;
2426 }
2427
2428 /*
2429 * Make sure that any user-supplied code runs as the table owner, unless
2430 * the user has opted out of that behavior.
2431 */
2432 run_as_owner = MySubscription->runasowner;
2433 if (!run_as_owner)
2434 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2435
2436 /* Set relation for error callback */
2438
2439 /* Initialize the executor state. */
2440 edata = create_edata_for_relation(rel);
2441 estate = edata->estate;
2442 remoteslot = ExecInitExtraTupleSlot(estate,
2444 &TTSOpsVirtual);
2445
2446 /* Process and store remote tuple in the slot */
2448 slot_store_data(remoteslot, rel, &newtup);
2449 slot_fill_defaults(rel, estate, remoteslot);
2450 MemoryContextSwitchTo(oldctx);
2451
2452 /* For a partitioned table, insert the tuple into a partition. */
2453 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2455 remoteslot, NULL, CMD_INSERT);
2456 else
2457 {
2458 ResultRelInfo *relinfo = edata->targetRelInfo;
2459
2460 ExecOpenIndices(relinfo, false);
2461 apply_handle_insert_internal(edata, relinfo, remoteslot);
2462 ExecCloseIndices(relinfo);
2463 }
2464
2465 finish_edata(edata);
2466
2467 /* Reset relation for error callback */
2469
2470 if (!run_as_owner)
2471 RestoreUserContext(&ucxt);
2472
2474
2476}
2477
2478/*
2479 * Workhorse for apply_handle_insert()
2480 * relinfo is for the relation we're actually inserting into
2481 * (could be a child partition of edata->targetRelInfo)
2482 */
2483static void
2485 ResultRelInfo *relinfo,
2486 TupleTableSlot *remoteslot)
2487{
2488 EState *estate = edata->estate;
2489
2490 /* Caller should have opened indexes already. */
2491 Assert(relinfo->ri_IndexRelationDescs != NULL ||
2492 !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2494
2495 /* Caller will not have done this bit. */
2497 InitConflictIndexes(relinfo);
2498
2499 /* Do the insert. */
2501 ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2502}
2503
2504/*
2505 * Check if the logical replication relation is updatable and throw
2506 * appropriate error if it isn't.
2507 */
2508static void
2510{
2511 /*
2512 * For partitioned tables, we only need to check whether the target partition
2513 * is updatable (i.e., has a primary key or replica identity defined for it).
2514 */
2515 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2516 return;
2517
2518 /* Updatable, no error. */
2519 if (rel->updatable)
2520 return;
2521
2522 /*
2523 * We are already on an error path, so it's fine if this is somewhat slow.
2524 * It's better to give the user an accurate error message.
2525 */
2527 {
2528 ereport(ERROR,
2529 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2530 errmsg("publisher did not send replica identity column "
2531 "expected by the logical replication target relation \"%s.%s\"",
2532 rel->remoterel.nspname, rel->remoterel.relname)));
2533 }
2534
2535 ereport(ERROR,
2536 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2537 errmsg("logical replication target relation \"%s.%s\" has "
2538 "neither REPLICA IDENTITY index nor PRIMARY "
2539 "KEY and published relation does not have "
2540 "REPLICA IDENTITY FULL",
2541 rel->remoterel.nspname, rel->remoterel.relname)));
2542}
2543
2544/*
2545 * Handle UPDATE message.
2546 *
2547 * TODO: FDW support
2548 */
2549static void
2551{
2553 LogicalRepRelId relid;
2554 UserContext ucxt;
2555 ApplyExecutionData *edata;
2556 EState *estate;
2557 LogicalRepTupleData oldtup;
2558 LogicalRepTupleData newtup;
2559 bool has_oldtup;
2560 TupleTableSlot *remoteslot;
2561 RTEPermissionInfo *target_perminfo;
2562 MemoryContext oldctx;
2563 bool run_as_owner;
2564
2565 /*
2566 * Quick return if we are skipping data modification changes or handling
2567 * streamed transactions.
2568 */
2569 if (is_skipping_changes() ||
2571 return;
2572
2574
2575 relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2576 &newtup);
2579 {
2580 /*
2581 * The relation can't become interesting in the middle of the
2582 * transaction so it's safe to unlock it.
2583 */
2586 return;
2587 }
2588
2589 /* Set relation for error callback */
2591
2592 /* Check if we can do the update. */
2594
2595 /*
2596 * Make sure that any user-supplied code runs as the table owner, unless
2597 * the user has opted out of that behavior.
2598 */
2599 run_as_owner = MySubscription->runasowner;
2600 if (!run_as_owner)
2601 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2602
2603 /* Initialize the executor state. */
2604 edata = create_edata_for_relation(rel);
2605 estate = edata->estate;
2606 remoteslot = ExecInitExtraTupleSlot(estate,
2608 &TTSOpsVirtual);
2609
2610 /*
2611 * Populate updatedCols so that per-column triggers can fire, and so that the
2612 * executor can correctly pass down the indexUnchanged hint. This could
2613 * include more columns than were actually changed on the publisher
2614 * because the logical replication protocol doesn't contain that
2615 * information. But it would for example exclude columns that only exist
2616 * on the subscriber, since we are not touching those.
2617 */
2618 target_perminfo = list_nth(estate->es_rteperminfos, 0);
2619 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2620 {
2622 int remoteattnum = rel->attrmap->attnums[i];
2623
2624 if (!att->attisdropped && remoteattnum >= 0)
2625 {
2626 Assert(remoteattnum < newtup.ncols);
2627 if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2628 target_perminfo->updatedCols =
2629 bms_add_member(target_perminfo->updatedCols,
2631 }
2632 }
2633
2634 /* Build the search tuple. */
2636 slot_store_data(remoteslot, rel,
2637 has_oldtup ? &oldtup : &newtup);
2638 MemoryContextSwitchTo(oldctx);
2639
2640 /* For a partitioned table, apply update to correct partition. */
2641 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2643 remoteslot, &newtup, CMD_UPDATE);
2644 else
2646 remoteslot, &newtup, rel->localindexoid);
2647
2648 finish_edata(edata);
2649
2650 /* Reset relation for error callback */
2652
2653 if (!run_as_owner)
2654 RestoreUserContext(&ucxt);
2655
2657
2659}
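
The updatedCols loop in the function above can be tried out in isolation. The following is a minimal sketch, not the worker's code: `toy_updated_cols`, `TOY_COL_UNCHANGED`, and the plain 64-bit mask are hypothetical stand-ins for the attribute map, the column-status array, and the bitmapset used in the listing. Bit `i` of the result simply means "local column `i` is treated as updated"; the bit numbering that the real bitmapset uses is not reproduced here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define TOY_COL_UNCHANGED 'u'	/* hypothetical marker: value was not sent */

/*
 * Build a bitmask of local columns that count as updated.
 *
 * attrmap[i] is the remote column index for local column i, or -1 when the
 * column exists only on the subscriber. dropped[i] marks dropped columns.
 */
static uint64_t
toy_updated_cols(const int *attrmap, const bool *dropped, int natts,
				 const char *colstatus, int ncols)
{
	uint64_t	updated = 0;

	assert(natts <= 64);
	for (int i = 0; i < natts; i++)
	{
		int			remoteattnum = attrmap[i];

		/* Skip dropped columns and subscriber-only columns. */
		if (dropped[i] || remoteattnum < 0)
			continue;

		assert(remoteattnum < ncols);

		/* Anything the publisher actually sent counts as updated. */
		if (colstatus[remoteattnum] != TOY_COL_UNCHANGED)
			updated |= UINT64_C(1) << i;
	}
	return updated;
}
```

As the comment in the listing notes, the result may include columns whose values did not really change on the publisher, but it never includes subscriber-only columns.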
2660
2661/*
2662 * Workhorse for apply_handle_update()
2663 * relinfo is for the relation we're actually updating in
2664 * (could be a child partition of edata->targetRelInfo)
2665 */
2666static void
2668 ResultRelInfo *relinfo,
2669 TupleTableSlot *remoteslot,
2670 LogicalRepTupleData *newtup,
2671 Oid localindexoid)
2672{
2673 EState *estate = edata->estate;
2674 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2675 Relation localrel = relinfo->ri_RelationDesc;
2676 EPQState epqstate;
2677 TupleTableSlot *localslot = NULL;
2678 ConflictTupleInfo conflicttuple = {0};
2679 bool found;
2680 MemoryContext oldctx;
2681
2682 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2683 ExecOpenIndices(relinfo, false);
2684
2685 found = FindReplTupleInLocalRel(edata, localrel,
2686 &relmapentry->remoterel,
2687 localindexoid,
2688 remoteslot, &localslot);
2689
2690 /*
2691 * Tuple found.
2692 *
2693 * Note this will fail if there are other conflicting unique indexes.
2694 */
2695 if (found)
2696 {
2697 /*
2698 * Report the conflict if the tuple was modified by a different
2699 * origin.
2700 */
2701 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2702 &conflicttuple.origin, &conflicttuple.ts) &&
2703 conflicttuple.origin != replorigin_session_origin)
2704 {
2705 TupleTableSlot *newslot;
2706
2707 /* Store the new tuple for conflict reporting */
2708 newslot = table_slot_create(localrel, &estate->es_tupleTable);
2709 slot_store_data(newslot, relmapentry, newtup);
2710
2711 conflicttuple.slot = localslot;
2712
2714 remoteslot, newslot,
2715 list_make1(&conflicttuple));
2716 }
2717
2718 /* Process and store remote tuple in the slot */
2720 slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2721 MemoryContextSwitchTo(oldctx);
2722
2723 EvalPlanQualSetSlot(&epqstate, remoteslot);
2724
2725 InitConflictIndexes(relinfo);
2726
2727 /* Do the actual update. */
2729 ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2730 remoteslot);
2731 }
2732 else
2733 {
2734 TupleTableSlot *newslot = localslot;
2735
2736 /* Store the new tuple for conflict reporting */
2737 slot_store_data(newslot, relmapentry, newtup);
2738
2739 /*
2740 * The tuple to be updated could not be found. Do nothing except for
2741 * emitting a log message.
2742 */
2743 ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
2744 remoteslot, newslot, list_make1(&conflicttuple));
2745 }
2746
2747 /* Cleanup. */
2748 ExecCloseIndices(relinfo);
2749 EvalPlanQualEnd(&epqstate);
2750}
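
The branch structure of the update workhorse above comes down to a small decision table: a missing local tuple is only reported, while a tuple last modified by a different origin is reported and then updated anyway. The sketch below models just that decision. `ToyLocalTuple`, `ToyConflict`, and `toy_classify_update` are hypothetical names, and `has_commit_info` stands in for whatever the transaction-info lookup returns in the listing.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum ToyConflict
{
	TOY_NO_CONFLICT,
	TOY_UPDATE_ORIGIN_DIFFERS,
	TOY_UPDATE_MISSING
} ToyConflict;

/* Hypothetical summary of what is known about the local tuple. */
typedef struct ToyLocalTuple
{
	bool		has_commit_info;	/* was origin information available? */
	unsigned	origin;				/* origin that last modified the tuple */
} ToyLocalTuple;

/*
 * Classify an incoming UPDATE. "local" is NULL when no matching tuple was
 * found. *apply tells the caller whether to go on and perform the update.
 */
static ToyConflict
toy_classify_update(const ToyLocalTuple *local, unsigned session_origin,
					bool *apply)
{
	assert(apply != NULL);

	if (local == NULL)
	{
		/* Nothing to update: report only. */
		*apply = false;
		return TOY_UPDATE_MISSING;
	}

	/* A found tuple is always updated, conflict or not. */
	*apply = true;
	if (local->has_commit_info && local->origin != session_origin)
		return TOY_UPDATE_ORIGIN_DIFFERS;

	return TOY_NO_CONFLICT;
}
```

Note that when no origin information is available the sketch reports no conflict, mirroring the `&&` in the condition shown in the listing.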
2751
2752/*
2753 * Handle DELETE message.
2754 *
2755 * TODO: FDW support
2756 */
2757static void
2759{
2761 LogicalRepTupleData oldtup;
2762 LogicalRepRelId relid;
2763 UserContext ucxt;
2764 ApplyExecutionData *edata;
2765 EState *estate;
2766 TupleTableSlot *remoteslot;
2767 MemoryContext oldctx;
2768 bool run_as_owner;
2769
2770 /*
2771 * Quick return if we are skipping data modification changes or handling
2772 * streamed transactions.
2773 */
2774 if (is_skipping_changes() ||
2776 return;
2777
2779
2780 relid = logicalrep_read_delete(s, &oldtup);
2783 {
2784 /*
2785 * The relation can't become interesting in the middle of the
2786 * transaction so it's safe to unlock it.
2787 */
2790 return;
2791 }
2792
2793 /* Set relation for error callback */
2795
2796 /* Check if we can do the delete. */
2798
2799 /*
2800 * Make sure that any user-supplied code runs as the table owner, unless
2801 * the user has opted out of that behavior.
2802 */
2803 run_as_owner = MySubscription->runasowner;
2804 if (!run_as_owner)
2805 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2806
2807 /* Initialize the executor state. */
2808 edata = create_edata_for_relation(rel);
2809 estate = edata->estate;
2810 remoteslot = ExecInitExtraTupleSlot(estate,
2812 &TTSOpsVirtual);
2813
2814 /* Build the search tuple. */
2816 slot_store_data(remoteslot, rel, &oldtup);
2817 MemoryContextSwitchTo(oldctx);
2818
2819 /* For a partitioned table, apply delete to correct partition. */
2820 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2822 remoteslot, NULL, CMD_DELETE);
2823 else
2824 {
2825 ResultRelInfo *relinfo = edata->targetRelInfo;
2826
2827 ExecOpenIndices(relinfo, false);
2828 apply_handle_delete_internal(edata, relinfo,
2829 remoteslot, rel->localindexoid);
2830 ExecCloseIndices(relinfo);
2831 }
2832
2833 finish_edata(edata);
2834
2835 /* Reset relation for error callback */
2837
2838 if (!run_as_owner)
2839 RestoreUserContext(&ucxt);
2840
2842
2844}
2845
2846/*
2847 * Workhorse for apply_handle_delete()
2848 * relinfo is for the relation we're actually deleting from
2849 * (could be a child partition of edata->targetRelInfo)
2850 */
2851static void
2853 ResultRelInfo *relinfo,
2854 TupleTableSlot *remoteslot,
2855 Oid localindexoid)
2856{
2857 EState *estate = edata->estate;
2858 Relation localrel = relinfo->ri_RelationDesc;
2859 LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2860 EPQState epqstate;
2861 TupleTableSlot *localslot;
2862 ConflictTupleInfo conflicttuple = {0};
2863 bool found;
2864
2865 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2866
2867 /* Caller should have opened indexes already. */
2868 Assert(relinfo->ri_IndexRelationDescs != NULL ||
2869 !localrel->rd_rel->relhasindex ||
2870 RelationGetIndexList(localrel) == NIL);
2871
2872 found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2873 remoteslot, &localslot);
2874
2875 /* If found delete it. */
2876 if (found)
2877 {
2878 /*
2879 * Report the conflict if the tuple was modified by a different
2880 * origin.
2881 */
2882 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2883 &conflicttuple.origin, &conflicttuple.ts) &&
2884 conflicttuple.origin != replorigin_session_origin)
2885 {
2886 conflicttuple.slot = localslot;
2888 remoteslot, NULL,
2889 list_make1(&conflicttuple));
2890 }
2891
2892 EvalPlanQualSetSlot(&epqstate, localslot);
2893
2894 /* Do the actual delete. */
2896 ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2897 }
2898 else
2899 {
2900 /*
2901 * The tuple to be deleted could not be found. Do nothing except for
2902 * emitting a log message.
2903 */
2904 ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
2905 remoteslot, NULL, list_make1(&conflicttuple));
2906 }
2907
2908 /* Cleanup. */
2909 EvalPlanQualEnd(&epqstate);
2910}
2911
2912/*
2913 * Try to find a tuple received from the publisher (in 'remoteslot') in the
2914 * corresponding local relation, using the replica identity index, the
2915 * primary key, another usable index or, if needed, a sequential scan.
2916 *
2917 * Local tuple, if found, is returned in '*localslot'.
2918 */
2919static bool
2921 LogicalRepRelation *remoterel,
2922 Oid localidxoid,
2923 TupleTableSlot *remoteslot,
2924 TupleTableSlot **localslot)
2925{
2926 EState *estate = edata->estate;
2927 bool found;
2928
2929 /*
2930 * Regardless of the top-level operation, we're performing a read here, so
2931 * check for SELECT privileges.
2932 */
2934
2935 *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2936
2937 Assert(OidIsValid(localidxoid) ||
2938 (remoterel->replident == REPLICA_IDENTITY_FULL));
2939
2940 if (OidIsValid(localidxoid))
2941 {
2942#ifdef USE_ASSERT_CHECKING
2943 Relation idxrel = index_open(localidxoid, AccessShareLock);
2944
2945 /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2946 Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
2947 (remoterel->replident == REPLICA_IDENTITY_FULL &&
2949 edata->targetRel->attrmap)));
2951#endif
2952
2953 found = RelationFindReplTupleByIndex(localrel, localidxoid,
2955 remoteslot, *localslot);
2956 }
2957 else
2959 remoteslot, *localslot);
2960
2961 return found;
2962}
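
The choice between the two lookup paths above can be illustrated with a toy table. This is a sketch under simplifying assumptions: `ToyRow`, `ToyLookup`, and `toy_find_tuple` are hypothetical, rows have one key column and one payload column, and the "index" path is modelled as a linear scan that compares key columns only (a real index lookup would not scan every row).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical two-column row: one key column, one payload column. */
typedef struct ToyRow
{
	int			key;
	int			payload;
} ToyRow;

typedef struct ToyLookup
{
	bool		has_index;		/* models a valid local index OID */
	bool		identity_full;	/* models REPLICA IDENTITY FULL upstream */
} ToyLookup;

/* Return the position of the matching row, or -1 if there is none. */
static int
toy_find_tuple(const ToyRow *rows, size_t nrows, ToyLookup how, ToyRow remote)
{
	/* Without an index, the whole row is the identity, so FULL is needed. */
	assert(how.has_index || how.identity_full);

	for (size_t i = 0; i < nrows; i++)
	{
		if (how.has_index)
		{
			/* Index path: only the key columns are compared. */
			if (rows[i].key == remote.key)
				return (int) i;
		}
		else if (rows[i].key == remote.key &&
				 rows[i].payload == remote.payload)
		{
			/* Sequential path: every column has to match. */
			return (int) i;
		}
	}
	return -1;
}
```

The practical difference shows up when the local row has drifted from the publisher's old image: the key-based path still finds it, while the full-row comparison does not.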
2963
2964/*
2965 * Handle an INSERT, UPDATE, or DELETE on a partitioned table.
2966 */
2967static void
2969 TupleTableSlot *remoteslot,
2970 LogicalRepTupleData *newtup,
2971 CmdType operation)
2972{
2973 EState *estate = edata->estate;
2974 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2975 ResultRelInfo *relinfo = edata->targetRelInfo;
2976 Relation parentrel = relinfo->ri_RelationDesc;
2977 ModifyTableState *mtstate;
2978 PartitionTupleRouting *proute;
2979 ResultRelInfo *partrelinfo;
2980 Relation partrel;
2981 TupleTableSlot *remoteslot_part;
2982 TupleConversionMap *map;
2983 MemoryContext oldctx;
2984 LogicalRepRelMapEntry *part_entry = NULL;
2985 AttrMap *attrmap = NULL;
2986
2987 /* ModifyTableState is needed for ExecFindPartition(). */
2988 edata->mtstate = mtstate = makeNode(ModifyTableState);
2989 mtstate->ps.plan = NULL;
2990 mtstate->ps.state = estate;
2991 mtstate->operation = operation;
2992 mtstate->resultRelInfo = relinfo;
2993
2994 /* ... as is PartitionTupleRouting. */
2995 edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2996
2997 /*
2998 * Find the partition to which the "search tuple" belongs.
2999 */
3000 Assert(remoteslot != NULL);
3002 partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3003 remoteslot, estate);
3004 Assert(partrelinfo != NULL);
3005 partrel = partrelinfo->ri_RelationDesc;
3006
3007 /*
3008 * Check for supported relkind. We need this since partitions might be of
3009 * unsupported relkinds; and the set of partitions can change, so checking
3010 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3011 */
3012 CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3014 RelationGetRelationName(partrel));
3015
3016 /*
3017 * To perform any of the operations below, the tuple must match the
3018 * partition's rowtype. Convert if needed or just copy, using a dedicated
3019 * slot to store the tuple in any case.
3020 */
3021 remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3022 if (remoteslot_part == NULL)
3023 remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3024 map = ExecGetRootToChildMap(partrelinfo, estate);
3025 if (map != NULL)
3026 {
3027 attrmap = map->attrMap;
3028 remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3029 remoteslot_part);
3030 }
3031 else
3032 {
3033 remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3034 slot_getallattrs(remoteslot_part);
3035 }
3036 MemoryContextSwitchTo(oldctx);
3037
3038 /* Check if we can do the update or delete on the leaf partition. */
3039 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3040 {
3041 part_entry = logicalrep_partition_open(relmapentry, partrel,
3042 attrmap);
3043 check_relation_updatable(part_entry);
3044 }
3045
3046 switch (operation)
3047 {
3048 case CMD_INSERT:
3049 apply_handle_insert_internal(edata, partrelinfo,
3050 remoteslot_part);
3051 break;
3052
3053 case CMD_DELETE:
3054 apply_handle_delete_internal(edata, partrelinfo,
3055 remoteslot_part,
3056 part_entry->localindexoid);
3057 break;
3058
3059 case CMD_UPDATE:
3060
3061 /*
3062 * For UPDATE, depending on whether or not the updated tuple
3063 * satisfies the partition's constraint, perform a simple UPDATE
3064 * of the partition or move the updated tuple into a different
3065 * suitable partition.
3066 */
3067 {
3068 TupleTableSlot *localslot;
3069 ResultRelInfo *partrelinfo_new;
3070 Relation partrel_new;
3071 bool found;
3072 EPQState epqstate;
3073 ConflictTupleInfo conflicttuple = {0};
3074
3075 /* Get the matching local tuple from the partition. */
3076 found = FindReplTupleInLocalRel(edata, partrel,
3077 &part_entry->remoterel,
3078 part_entry->localindexoid,
3079 remoteslot_part, &localslot);
3080 if (!found)
3081 {
3082 TupleTableSlot *newslot = localslot;
3083
3084 /* Store the new tuple for conflict reporting */
3085 slot_store_data(newslot, part_entry, newtup);
3086
3087 /*
3088 * The tuple to be updated could not be found. Do nothing
3089 * except for emitting a log message.
3090 */
3091 ReportApplyConflict(estate, partrelinfo, LOG,
3092 CT_UPDATE_MISSING, remoteslot_part,
3093 newslot, list_make1(&conflicttuple));
3094
3095 return;
3096 }
3097
3098 /*
3099 * Report the conflict if the tuple was modified by a
3100 * different origin.
3101 */
3102 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3103 &conflicttuple.origin,
3104 &conflicttuple.ts) &&
3105 conflicttuple.origin != replorigin_session_origin)
3106 {
3107 TupleTableSlot *newslot;
3108
3109 /* Store the new tuple for conflict reporting */
3110 newslot = table_slot_create(partrel, &estate->es_tupleTable);
3111 slot_store_data(newslot, part_entry, newtup);
3112
3113 conflicttuple.slot = localslot;
3114
3115 ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3116 remoteslot_part, newslot,
3117 list_make1(&conflicttuple));
3118 }
3119
3120 /*
3121 * Apply the update to the local tuple, putting the result in
3122 * remoteslot_part.
3123 */
3125 slot_modify_data(remoteslot_part, localslot, part_entry,
3126 newtup);
3127 MemoryContextSwitchTo(oldctx);
3128
3129 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3130
3131 /*
3132 * Does the updated tuple still satisfy the current
3133 * partition's constraint?
3134 */
3135 if (!partrel->rd_rel->relispartition ||
3136 ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3137 false))
3138 {
3139 /*
3140 * Yes, so simply UPDATE the partition. We don't call
3141 * apply_handle_update_internal() here, which would
3142 * normally do the following work, to avoid repeating some
3143 * work already done above to find the local tuple in the
3144 * partition.
3145 */
3146 InitConflictIndexes(partrelinfo);
3147
3148 EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3150 ACL_UPDATE);
3151 ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3152 localslot, remoteslot_part);
3153 }
3154 else
3155 {
3156 /* Move the tuple into the new partition. */
3157
3158 /*
3159 * New partition will be found using tuple routing, which
3160 * can only occur via the parent table. We might need to
3161 * convert the tuple to the parent's rowtype. Note that
3162 * this is the tuple found in the partition, not the
3163 * original search tuple received by this function.
3164 */
3165 if (map)
3166 {
3167 TupleConversionMap *PartitionToRootMap =
3169 RelationGetDescr(parentrel));
3170
3171 remoteslot =
3172 execute_attr_map_slot(PartitionToRootMap->attrMap,
3173 remoteslot_part, remoteslot);
3174 }
3175 else
3176 {
3177 remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3178 slot_getallattrs(remoteslot);
3179 }
3180
3181 /* Find the new partition. */
3183 partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3184 proute, remoteslot,
3185 estate);
3186 MemoryContextSwitchTo(oldctx);
3187 Assert(partrelinfo_new != partrelinfo);
3188 partrel_new = partrelinfo_new->ri_RelationDesc;
3189
3190 /* Check that new partition also has supported relkind. */
3191 CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3193 RelationGetRelationName(partrel_new));
3194
3195 /* DELETE old tuple found in the old partition. */
3196 EvalPlanQualSetSlot(&epqstate, localslot);
3198 ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3199
3200 /* INSERT new tuple into the new partition. */
3201
3202 /*
3203 * Convert the replacement tuple to match the destination
3204 * partition rowtype.
3205 */
3207 remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3208 if (remoteslot_part == NULL)
3209 remoteslot_part = table_slot_create(partrel_new,
3210 &estate->es_tupleTable);
3211 map = ExecGetRootToChildMap(partrelinfo_new, estate);
3212 if (map != NULL)
3213 {
3214 remoteslot_part = execute_attr_map_slot(map->attrMap,
3215 remoteslot,
3216 remoteslot_part);
3217 }
3218 else
3219 {
3220 remoteslot_part = ExecCopySlot(remoteslot_part,
3221 remoteslot);
3222 slot_getallattrs(remoteslot);
3223 }
3224 MemoryContextSwitchTo(oldctx);
3225 apply_handle_insert_internal(edata, partrelinfo_new,
3226 remoteslot_part);
3227 }
3228
3229 EvalPlanQualEnd(&epqstate);
3230 }
3231 break;
3232
3233 default:
3234 elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3235 break;
3236 }
3237}
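
The CMD_UPDATE branch above either updates the row in place or, when the new row no longer satisfies the partition's constraint, deletes it from the old partition and inserts it into the one chosen by tuple routing. Here is a minimal sketch of that control flow, assuming range partitions over a single integer key. All names (`ToyPartition`, `toy_route`, `toy_update_key`) are hypothetical, and the sketch asserts where the real code would raise an error for a row that fits no partition.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_NPARTS 3
#define TOY_CAP 8

/* Hypothetical range partition holding keys in [lo, hi). */
typedef struct ToyPartition
{
	int			lo;
	int			hi;
	int			keys[TOY_CAP];
	int			nkeys;
} ToyPartition;

typedef enum ToyUpdateResult
{
	TOY_UPD_MISSING,
	TOY_UPD_IN_PLACE,
	TOY_UPD_MOVED
} ToyUpdateResult;

/* Tuple routing: find the partition whose range contains key, or -1. */
static int
toy_route(const ToyPartition *parts, int key)
{
	for (int i = 0; i < TOY_NPARTS; i++)
		if (key >= parts[i].lo && key < parts[i].hi)
			return i;
	return -1;
}

/* Find key inside one partition; return its slot or -1. */
static int
toy_find(const ToyPartition *p, int key)
{
	for (int i = 0; i < p->nkeys; i++)
		if (p->keys[i] == key)
			return i;
	return -1;
}

static ToyUpdateResult
toy_update_key(ToyPartition *parts, int oldkey, int newkey)
{
	int			src = toy_route(parts, oldkey);
	int			pos = (src >= 0) ? toy_find(&parts[src], oldkey) : -1;
	int			dst;

	if (pos < 0)
		return TOY_UPD_MISSING;	/* only reported, nothing applied */

	/* Does the updated row still satisfy the partition constraint? */
	if (newkey >= parts[src].lo && newkey < parts[src].hi)
	{
		parts[src].keys[pos] = newkey;
		return TOY_UPD_IN_PLACE;
	}

	/* Route the new row via the parent to find its new home. */
	dst = toy_route(parts, newkey);
	assert(dst >= 0 && dst != src);
	assert(parts[dst].nkeys < TOY_CAP);

	/* DELETE from the old partition (move the last key into the hole). */
	parts[src].nkeys--;
	parts[src].keys[pos] = parts[src].keys[parts[src].nkeys];

	/* INSERT into the new partition. */
	parts[dst].keys[parts[dst].nkeys] = newkey;
	parts[dst].nkeys++;
	return TOY_UPD_MOVED;
}
```

The sketch leaves out the rowtype conversions between parent and partition that the listing performs before and after routing.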
3238
3239/*
3240 * Handle TRUNCATE message.
3241 *
3242 * TODO: FDW support
3243 */
3244static void
3246{
3247 bool cascade = false;
3248 bool restart_seqs = false;
3249 List *remote_relids = NIL;
3250 List *remote_rels = NIL;
3251 List *rels = NIL;
3252 List *part_rels = NIL;
3253 List *relids = NIL;
3254 List *relids_logged = NIL;
3255 ListCell *lc;
3256 LOCKMODE lockmode = AccessExclusiveLock;
3257
3258 /*
3259 * Quick return if we are skipping data modification changes or handling
3260 * streamed transactions.
3261 */
3262 if (is_skipping_changes() ||
3264 return;
3265
3267
3268 remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3269
3270 foreach(lc, remote_relids)
3271 {
3272 LogicalRepRelId relid = lfirst_oid(lc);
3274
3275 rel = logicalrep_rel_open(relid, lockmode);
3277 {
3278 /*
3279 * The relation can't become interesting in the middle of the
3280 * transaction so it's safe to unlock it.
3281 */
3282 logicalrep_rel_close(rel, lockmode);
3283 continue;
3284 }
3285
3286 remote_rels = lappend(remote_rels, rel);
3288 rels = lappend(rels, rel->localrel);
3289 relids = lappend_oid(relids, rel->localreloid);
3291 relids_logged = lappend_oid(relids_logged, rel->localreloid);
3292
3293 /*
3294 * Truncate partitions if we got a message to truncate a partitioned
3295 * table.
3296 */
3297 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3298 {
3299 ListCell *child;
3300 List *children = find_all_inheritors(rel->localreloid,
3301 lockmode,
3302 NULL);
3303
3304 foreach(child, children)
3305 {
3306 Oid childrelid = lfirst_oid(child);
3307 Relation childrel;
3308
3309 if (list_member_oid(relids, childrelid))
3310 continue;
3311
3312 /* find_all_inheritors already got lock */
3313 childrel = table_open(childrelid, NoLock);
3314
3315 /*
3316 * Ignore temp tables of other backends. See similar code in
3317 * ExecuteTruncate().
3318 */
3319 if (RELATION_IS_OTHER_TEMP(childrel))
3320 {
3321 table_close(childrel, lockmode);
3322 continue;
3323 }
3324
3326 rels = lappend(rels, childrel);
3327 part_rels = lappend(part_rels, childrel);
3328 relids = lappend_oid(relids, childrelid);
3329 /* Log this relation only if needed for logical decoding */
3330 if (RelationIsLogicallyLogged(childrel))
3331 relids_logged = lappend_oid(relids_logged, childrelid);
3332 }
3333 }
3334 }
3335
3336 /*
3337 * Even if CASCADE was used on the upstream primary, we explicitly default
3338 * to replaying changes without further cascading. A user-specified option
3339 * might make this configurable later.
3340 *
3341 * MySubscription->runasowner tells us whether we want to execute
3342 * replication actions as the subscription owner; the last argument to
3343 * TruncateGuts tells it whether we want to switch to the table owner.
3344 * Those are exactly opposite conditions.
3345 */
3347 relids,
3348 relids_logged,
3350 restart_seqs,
3352 foreach(lc, remote_rels)
3353 {
3354 LogicalRepRelMapEntry *rel = lfirst(lc);
3355
3357 }
3358 foreach(lc, part_rels)
3359 {
3360 Relation rel = lfirst(lc);
3361
3362 table_close(rel, NoLock);
3363 }
3364
3366}
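
The way the truncate handler above expands a partitioned table into its children, skipping any relation it has already collected, can be sketched with plain arrays. `ToyRelList` and the `toy_*` helpers are hypothetical. The inheritor list is assumed to include the parent itself, which is why the membership check matters even for a single table.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_MAX_RELS 16

/* Hypothetical fixed-size list of relation identifiers. */
typedef struct ToyRelList
{
	unsigned	ids[TOY_MAX_RELS];
	int			n;
} ToyRelList;

static bool
toy_member(const ToyRelList *list, unsigned id)
{
	for (int i = 0; i < list->n; i++)
		if (list->ids[i] == id)
			return true;
	return false;
}

static void
toy_append(ToyRelList *list, unsigned id)
{
	assert(list->n < TOY_MAX_RELS);
	list->ids[list->n++] = id;
}

/*
 * Add a partitioned parent and then each inheritor that is not already in
 * the list. As in the listing, the parent itself is appended without a
 * membership check.
 */
static void
toy_collect_for_truncate(ToyRelList *relids, unsigned parent,
						 const unsigned *inheritors, int ninheritors)
{
	toy_append(relids, parent);

	for (int i = 0; i < ninheritors; i++)
	{
		if (toy_member(relids, inheritors[i]))
			continue;			/* already collected, e.g. the parent */
		toy_append(relids, inheritors[i]);
	}
}
```

The sketch leaves out the per-child temp-table check and the separate list of relations that are logged for logical decoding.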
3367
3368
3369/*
3370 * Logical replication protocol message dispatcher.
3371 */
3372void
3374{
3376 LogicalRepMsgType saved_command;
3377
3378 /*
3379 * Set the current command being applied. Since this function can be
3380 * called recursively when applying spooled changes, save the current
3381 * command.
3382 */
3383 saved_command = apply_error_callback_arg.command;
3385
3386 switch (action)
3387 {
3390 break;
3391
3394 break;
3395
3398 break;
3399
3402 break;
3403
3406 break;
3407
3410 break;
3411
3414 break;
3415
3418 break;
3419
3422 break;
3423
3425
3426 /*
3427 * Logical replication does not use generic logical messages yet.
3428 * However, they could be used by other applications that use this
3429 * output plugin.
3430 */
3431 break;
3432
3435 break;
3436
3439 break;
3440
3443 break;
3444
3447 break;
3448
3451 break;
3452
3455 break;
3456
3459 break;
3460
3463 break;
3464
3467 break;
3468
3469 default:
3470 ereport(ERROR,
3471 (errcode(ERRCODE_PROTOCOL_VIOLATION),
3472 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3473 }
3474
3475 /* Reset the current command */
3476 apply_error_callback_arg.command = saved_command;
3477}
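
The dispatcher above saves the current command before the switch and restores it afterwards because it can be re-entered while spooled changes are replayed. A minimal sketch of that save/restore pattern follows. The message types, their numeric values, and the `toy_*` names are hypothetical, and the sketch returns -1 for an unknown type where the real dispatcher raises an error.

```c
#include <assert.h>

/* Hypothetical message types; the values are arbitrary. */
typedef enum ToyMsgType
{
	TOY_MSG_NONE = 0,
	TOY_MSG_INSERT = 1,
	TOY_MSG_STREAM_COMMIT = 2
} ToyMsgType;

/* Models the "current command" field used by the error callback. */
static ToyMsgType toy_current_command = TOY_MSG_NONE;

/* Records what the INSERT handler saw, for inspection by the caller. */
static ToyMsgType toy_seen_during_insert = TOY_MSG_NONE;

/* Returns the number of changes applied, or -1 for an unknown type. */
static int
toy_dispatch(ToyMsgType action)
{
	ToyMsgType	saved_command = toy_current_command;
	int			applied;

	toy_current_command = action;

	switch (action)
	{
		case TOY_MSG_INSERT:
			toy_seen_during_insert = toy_current_command;
			applied = 1;
			break;

		case TOY_MSG_STREAM_COMMIT:
			/* Replaying a spooled change re-enters the dispatcher. */
			applied = toy_dispatch(TOY_MSG_INSERT);
			/* The nested call restored our command on its way out. */
			assert(toy_current_command == TOY_MSG_STREAM_COMMIT);
			break;

		default:
			applied = -1;
			break;
	}

	/* Reset the current command for whoever called us. */
	toy_current_command = saved_command;
	return applied;
}
```

Using a local variable for the saved value gives each nesting level its own copy, so no explicit stack is needed.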
3478
3479/*
3480 * Figure out which write/flush positions to report to the walsender process.
3481 *
3482 * We can't simply report back the last LSN the walsender sent us because the
3483 * local transaction might not yet be flushed to disk locally. Instead we
3484 * build a list that associates local with remote LSNs for every commit. When
3485 * reporting back the flush position to the sender we iterate that list and
3486 * check which entries on it are already locally flushed. Those we can report
3487 * as having been flushed.
3488 *
3489 * *have_pending_txes is set to true if there are outstanding transactions
3490 * that still need to be flushed.
3491 */
3492static void
3494 bool *have_pending_txes)
3495{
3496 dlist_mutable_iter iter;
3497 XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3498
3500 *flush = InvalidXLogRecPtr;
3501
3503 {
3504 FlushPosition *pos =
3505 dlist_container(FlushPosition, node, iter.cur);
3506
3507 *write = pos->remote_end;
3508
3509 if (pos->local_end <= local_flush)
3510 {
3511 *flush = pos->remote_end;
3512 dlist_delete(iter.cur);
3513 pfree(pos);
3514 }
3515 else
3516 {
3517 /*
3518 * We don't want to iterate needlessly over the rest of the list, which
3519 * could be long. Instead, get the last element and
3520 * grab the write position from there.
3521 */
3523 &lsn_mapping);
3524 *write = pos->remote_end;
3525 *have_pending_txes = true;
3526 return;
3527 }
3528 }
3529
3530 *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3531}
3532
3533/*
3534 * Store current remote/local lsn pair in the tracking list.
3535 */
3536void
3538{
3539 FlushPosition *flushpos;
3540
3541 /*
3542 * Skip for parallel apply workers, because the lsn_mapping is maintained
3543 * by the leader apply worker.
3544 */
3546 return;
3547
3548 /* Need to do this in permanent context */
3550
3551 /* Track commit lsn */
3552 flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3553 flushpos->local_end = local_lsn;
3554 flushpos->remote_end = remote_lsn;
3555
3556 dlist_push_tail(&lsn_mapping, &flushpos->node);
3558}
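
The two functions above cooperate through a list of (local LSN, remote LSN) pairs kept in commit order. The sketch below models that list with a small array so the reporting rule can be tested on its own. `ToyLsnMap` and the `toy_*` functions are hypothetical, 0 plays the role of an invalid LSN, and initialising `*write` to 0 is an assumption of this sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_MAP_CAP 16

typedef uint64_t ToyLsn;		/* 0 stands for "invalid" in this sketch */

typedef struct ToyFlushPos
{
	ToyLsn		local_end;		/* where the commit ended locally */
	ToyLsn		remote_end;		/* where the commit ended upstream */
} ToyFlushPos;

typedef struct ToyLsnMap
{
	ToyFlushPos items[TOY_MAP_CAP]; /* oldest commit first */
	int			n;
} ToyLsnMap;

/* Remember one remote/local pair at the tail of the list. */
static void
toy_store_flush_position(ToyLsnMap *map, ToyLsn remote_lsn, ToyLsn local_lsn)
{
	assert(map->n < TOY_MAP_CAP);
	map->items[map->n].local_end = local_lsn;
	map->items[map->n].remote_end = remote_lsn;
	map->n++;
}

/* Work out which remote write/flush positions can be reported. */
static void
toy_get_flush_position(ToyLsnMap *map, ToyLsn local_flush,
					   ToyLsn *write, ToyLsn *flush, bool *have_pending)
{
	*write = 0;
	*flush = 0;

	while (map->n > 0)
	{
		ToyFlushPos *pos = &map->items[0];

		*write = pos->remote_end;

		if (pos->local_end <= local_flush)
		{
			/* Locally durable: report it as flushed and drop the entry. */
			*flush = pos->remote_end;
			memmove(&map->items[0], &map->items[1],
					(size_t) (map->n - 1) * sizeof(ToyFlushPos));
			map->n--;
		}
		else
		{
			/* Stop early; the tail entry gives the write position. */
			*write = map->items[map->n - 1].remote_end;
			*have_pending = true;
			return;
		}
	}

	*have_pending = (map->n > 0);
}
```

For example, with three commits stored and only the first two flushed locally, the sketch reports the second commit's remote LSN as flushed, the third's as written, and leaves one entry pending.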
3559
3560
3561/* Update statistics of the worker. */
3562static void
3563UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3564{
3565 MyLogicalRepWorker->last_lsn = last_lsn;
3568 if (reply)
3569 {
3570 MyLogicalRepWorker->reply_lsn = last_lsn;
3571 MyLogicalRepWorker->reply_time = send_time;
3572 }
3573}
3574
3575/*
3576 * Apply main loop.
3577 */
3578static void
3580{
3581 TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3582 bool ping_sent = false;
3583 TimeLineID tli;
3584 ErrorContextCallback errcallback;
3585
3586 /*
3587 * Init the ApplyMessageContext which we clean up after each replication
3588 * protocol message.
3589 */
3591 "ApplyMessageContext",
3593
3594 /*
3595 * This memory context is used for per-stream data when the streaming mode
3596 * is enabled. This context is reset on each stream stop.
3597 */
3599 "LogicalStreamingContext",
3601
3602 /* mark as idle, before starting to loop */
3604
3605 /*
3606 * Push apply error context callback. Fields will be filled while applying
3607 * a change.
3608 */
3609 errcallback.callback = apply_error_callback;
3610 errcallback.previous = error_context_stack;
3611 error_context_stack = &errcallback;
3613
3614 /* This outer loop iterates once per wait. */
3615 for (;;)
3616 {
3618 int rc;
3619 int len;
3620 char *buf = NULL;
3621 bool endofstream = false;
3622 long wait_time;
3623
3625
3627
3629
3630 if (len != 0)
3631 {
3632 /* Loop to process all available data (without blocking). */
3633 for (;;)
3634 {
3636
3637 if (len == 0)
3638 {
3639 break;
3640 }
3641 else if (len < 0)
3642 {
3643 ereport(LOG,
3644 (errmsg("data stream from publisher has ended")));
3645 endofstream = true;
3646 break;
3647 }
3648 else
3649 {
3650 int c;
3652
3654 {
3655 ConfigReloadPending = false;
3657 }
3658
3659 /* Reset timeout. */
3660 last_recv_timestamp = GetCurrentTimestamp();
3661 ping_sent = false;
3662
3663 /* Ensure we are reading the data into our memory context. */
3665
3667
3668 c = pq_getmsgbyte(&s);
3669
3670 if (c == 'w')
3671 {
3672 XLogRecPtr start_lsn;
3673 XLogRecPtr end_lsn;
3674 TimestampTz send_time;
3675
3676 start_lsn = pq_getmsgint64(&s);
3677 end_lsn = pq_getmsgint64(&s);
3678 send_time = pq_getmsgint64(&s);
3679
3680 if (last_received < start_lsn)
3681 last_received = start_lsn;
3682
3683 if (last_received < end_lsn)
3684 last_received = end_lsn;
3685
3686 UpdateWorkerStats(last_received, send_time, false);
3687
3688 apply_dispatch(&s);
3689 }
3690 else if (c == 'k')
3691 {
3692 XLogRecPtr end_lsn;
3694 bool reply_requested;
3695
3696 end_lsn = pq_getmsgint64(&s);
3698 reply_requested = pq_getmsgbyte(&s);
3699
3700 if (last_received < end_lsn)
3701 last_received = end_lsn;
3702
3703 send_feedback(last_received, reply_requested, false);
3704 UpdateWorkerStats(last_received, timestamp, true);
3705 }
3706 /* other message types are purposefully ignored */
3707
3709 }
3710
3712 }
3713 }
3714
3715 /* confirm all writes so far */
3716 send_feedback(last_received, false, false);
3717
3719 {
3720 /*
3721 * If we didn't get any transactions for a while there might be
3722 * unconsumed invalidation messages in the queue, consume them
3723 * now.
3724 */
3727
3728 /* Process any table synchronization changes. */
3729 process_syncing_tables(last_received);
3730 }
3731
3732 /* Cleanup the memory. */
3735
3736 /* Check if we need to exit the streaming loop. */
3737 if (endofstream)
3738 break;
3739
3740 /*
3741 * Wait for more data or latch. If we have unflushed transactions,
3742 * wake up after WalWriterDelay to see if they've been flushed yet (in
3743 * which case we should send a feedback message). Otherwise, there's
3744 * no particular urgency about waking up unless we get data or a
3745 * signal.
3746 */
3748 wait_time = WalWriterDelay;
3749 else
3750 wait_time = NAPTIME_PER_CYCLE;
3751
3755 fd, wait_time,
3756 WAIT_EVENT_LOGICAL_APPLY_MAIN);
3757
3758 if (rc & WL_LATCH_SET)
3759 {
3762 }
3763
3765 {
3766 ConfigReloadPending = false;
3768 }
3769
3770 if (rc & WL_TIMEOUT)
3771 {
3772 /*
3773 * We didn't receive anything new. If we haven't heard anything
3774 * from the server for more than wal_receiver_timeout / 2, ping
3775 * the server. Also, if it's been longer than
3776 * wal_receiver_status_interval since the last update we sent,
3777 * send a status update to the primary anyway, to report any
3778 * progress in applying WAL.
3779 */
3780 bool requestReply = false;
3781
3782 /*
3783 * Check if time since last receive from primary has reached the
3784 * configured limit.
3785 */
3786 if (wal_receiver_timeout > 0)
3787 {
3789 TimestampTz timeout;
3790
3791 timeout =
3792 TimestampTzPlusMilliseconds(last_recv_timestamp,
3794
3795 if (now >= timeout)
3796 ereport(ERROR,
3797 (errcode(ERRCODE_CONNECTION_FAILURE),
3798 errmsg("terminating logical replication worker due to timeout")));
3799
3800 /* Check to see if it's time for a ping. */
3801 if (!ping_sent)
3802 {
3803 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3804 (wal_receiver_timeout / 2));
3805 if (now >= timeout)
3806 {
3807 requestReply = true;
3808 ping_sent = true;
3809 }
3810 }
3811 }
3812
3813 send_feedback(last_received, requestReply, requestReply);
3814
3815 /*
3816 * Force reporting to ensure long idle periods don't lead to
3817 * arbitrarily delayed stats. Stats can only be reported outside
3818 * of (implicit or explicit) transactions. That shouldn't lead to
3819 * stats being delayed for long, because transactions are either
3820 * sent as a whole on commit or streamed. Streamed transactions
3821 * are spilled to disk and applied on commit.
3822 */
3823 if (!IsTransactionState())
3824 pgstat_report_stat(true);
3825 }
3826 }
3827
3828 /* Pop the error context stack */
3829 error_context_stack = errcallback.previous;
3831
3832 /* All done */
3834}
3835
3836/*
3837 * Send a Standby Status Update message to server.
3838 *
3839 * 'recvpos' is the latest LSN up to which we've received data; 'force' is set
3840 * if we need to send a response to avoid timeouts.
3841 */
3842static void
3843send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3844{
3845 static StringInfo reply_message = NULL;
3846 static TimestampTz send_time = 0;
3847
3848 static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3849 static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3850 static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3851
3852 XLogRecPtr writepos;
3853 XLogRecPtr flushpos;
3855 bool have_pending_txes;
3856
3857 /*
3858 * If the user doesn't want status to be reported to the publisher, be
3859 * sure to exit before doing anything at all.
3860 */
3861 if (!force && wal_receiver_status_interval <= 0)
3862 return;
3863
3864 /* It's legal to not pass a recvpos */
3865 if (recvpos < last_recvpos)
3866 recvpos = last_recvpos;
3867
3868 get_flush_position(&writepos, &flushpos, &have_pending_txes);
3869
3870 /*
3871 * No outstanding transactions to flush, we can report the latest received
3872 * position. This is important for synchronous replication.
3873 */
3874 if (!have_pending_txes)
3875 flushpos = writepos = recvpos;
3876
3877 if (writepos < last_writepos)
3878 writepos = last_writepos;
3879
3880 if (flushpos < last_flushpos)
3881 flushpos = last_flushpos;
3882
3884
3885 /* if we've already reported everything we're good */
3886 if (!force &&
3887 writepos == last_writepos &&
3888 flushpos == last_flushpos &&
3889 !TimestampDifferenceExceeds(send_time, now,
3891 return;
3892 send_time = now;
3893
3894 if (!reply_message)
3895 {
3897
3899 MemoryContextSwitchTo(oldctx);
3900 }
3901 else
3903
3905 pq_sendint64(reply_message, recvpos); /* write */
3906 pq_sendint64(reply_message, flushpos); /* flush */
3907 pq_sendint64(reply_message, writepos); /* apply */
3908 pq_sendint64(reply_message, now); /* sendTime */
3909 pq_sendbyte(reply_message, requestReply); /* replyRequested */
3910
3911 elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3912 force,
3913 LSN_FORMAT_ARGS(recvpos),
3914 LSN_FORMAT_ARGS(writepos),
3915 LSN_FORMAT_ARGS(flushpos));
3916
3919
3920 if (recvpos > last_recvpos)
3921 last_recvpos = recvpos;
3922 if (writepos > last_writepos)
3923 last_writepos = writepos;
3924 if (flushpos > last_flushpos)
3925 last_flushpos = flushpos;
3926}
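
The message assembled by send_feedback has a fixed layout. As a sketch only, the function below serializes it into a caller-supplied buffer, assuming the Standby Status Update format from the streaming replication protocol: a leading 'r' byte, four 64-bit big-endian integers (write, flush, apply, send time) and a one-byte reply-requested flag. `sketch_build_status_update` and `put_int64_be` are hypothetical helpers, not part of the pq_* API used in the listing.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define STATUS_UPDATE_LEN 34	/* 1 + 4 * 8 + 1 bytes */

/* Store a 64-bit value in network (big-endian) byte order. */
static size_t
put_int64_be(unsigned char *dst, uint64_t v)
{
	for (int i = 0; i < 8; i++)
		dst[i] = (unsigned char) (v >> (56 - 8 * i));
	return 8;
}

/*
 * Build a status update in 'buf' and return the number of bytes written.
 * Following the listing, the caller would pass recvpos as write_lsn,
 * flushpos as flush_lsn and writepos as apply_lsn.
 */
static size_t
sketch_build_status_update(unsigned char *buf, size_t buflen,
						   uint64_t write_lsn, uint64_t flush_lsn,
						   uint64_t apply_lsn, int64_t send_time,
						   int reply_requested)
{
	size_t		off = 0;

	assert(buf != NULL && buflen >= STATUS_UPDATE_LEN);

	buf[off++] = 'r';			/* message type byte */
	off += put_int64_be(buf + off, write_lsn);
	off += put_int64_be(buf + off, flush_lsn);
	off += put_int64_be(buf + off, apply_lsn);
	off += put_int64_be(buf + off, (uint64_t) send_time);
	buf[off++] = reply_requested ? 1 : 0;

	return off;
}
```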
3927
3928/*
3929 * Exit routine for apply workers due to subscription parameter changes.
3930 */
3931static void
3933{
3935 {
3936 /*
3937 * Don't stop the parallel apply worker as the leader will detect the
3938 * subscription parameter change and restart logical replication later
3939 * anyway. This also prevents the leader from reporting errors when
3940 * trying to communicate with a stopped parallel apply worker, which
3941 * would accidentally disable subscriptions if disable_on_error was
3942 * set.
3943 */
3944 return;
3945 }
3946
3947 /*
3948 * Reset the last-start time for this apply worker so that the launcher
3949 * will restart it without waiting for wal_retrieve_retry_interval if the
3950 * subscription is still active, and so that we won't leak that hash table
3951 * entry if it isn't.
3952 */
3955
3956 proc_exit(0);
3957}
3958
3959/*
3960 * Reread subscription info if needed.
3961 *
3962 * For significant changes, we react by exiting the current process; a new
3963 * one will be launched afterwards if needed.
3964 */
3965void
3967{
3968 MemoryContext oldctx;
3970 bool started_tx = false;
3971
3972 /* When cache state is valid there is nothing to do here. */
3974 return;
3975
3976 /* This function might be called inside or outside of transaction. */
3977 if (!IsTransactionState())
3978 {
3980 started_tx = true;
3981 }
3982
3983 /* Ensure allocations in permanent context. */
3985
3987
3988 /*
3989 * Exit if the subscription was removed. This normally should not happen
3990 * as the worker gets killed during DROP SUBSCRIPTION.
3991 */
3992 if (!newsub)
3993 {
3994 ereport(LOG,
3995 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3996 MySubscription->name)));
3997
3998 /* Ensure we remove no-longer-useful entry for worker's start time */
4001
4002 proc_exit(0);
4003 }
4004
4005 /* Exit if the subscription was disabled. */
4006 if (!newsub->enabled)
4007 {
4008 ereport(LOG,
4009 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
4010 MySubscription->name)));
4011
4013 }
4014
4015 /* !slotname should never happen when enabled is true. */
4016 Assert(newsub->slotname);
4017
4018 /* two-phase cannot be altered while the worker is running */
4019 Assert(newsub->twophasestate == MySubscription->twophasestate);
4020
4021 /*
4022 * Exit if any parameter that affects the remote connection was changed.
4023 * The launcher will start a new worker but note that the parallel apply
4024 * worker won't restart if the streaming option's value is changed from
4025 * 'parallel' to any other value or the server decides not to stream the
4026 * in-progress transaction.
4027 */
4028 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
4029 strcmp(newsub->name, MySubscription->name) != 0 ||
4030 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
4031 newsub->binary != MySubscription->binary ||
4032 newsub->stream != MySubscription->stream ||
4033 newsub->passwordrequired != MySubscription->passwordrequired ||
4034 strcmp(newsub->origin, MySubscription->origin) != 0 ||
4035 newsub->owner != MySubscription->owner ||
4036 !equal(newsub->publications, MySubscription->publications))
4037 {
4039 ereport(LOG,
4040 (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
4041 MySubscription->name)));
4042 else
4043 ereport(LOG,
4044 (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
4045 MySubscription->name)));
4046
4048 }
4049
4050 /*
4051 * Exit if the subscription owner's superuser privileges have been
4052 * revoked.
4053 */
4054 if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
4055 {
4057 ereport(LOG,
4058 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
4060 else
4061 ereport(LOG,
4062 errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
4064
4066 }
4067
4068 /* Check for other changes that should never happen too. */
4069 if (newsub->dbid != MySubscription->dbid)
4070 {
4071 elog(ERROR, "subscription %u changed unexpectedly",
4073 }
4074
4075 /* Clean old subscription info and switch to new one. */
4078
4079 MemoryContextSwitchTo(oldctx);
4080
4081 /* Change synchronous commit according to the user's wishes */
4082 SetConfigOption("synchronous_commit", MySubscription->synccommit,
4084
4085 if (started_tx)
4087
4088 MySubscriptionValid = true;
4089}
4090
4091/*
4092 * Callback from subscription syscache invalidation.
4093 */
4094static void
4095subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
4096{
4097 MySubscriptionValid = false;
4098}
4099
4100/*
4101 * subxact_info_write
4102 * Store information about subxacts for a toplevel transaction.
4103 *
4104 * For each subxact we store offset of its first change in the main file.
4105 * The file is always over-written as a whole.
4106 *
4107 * XXX We should only store subxacts that were not aborted yet.
4108 */
4109static void
4111{
4112 char path[MAXPGPATH];
4113 Size len;
4114 BufFile *fd;
4115
4117
4118 /* construct the subxact filename */
4119 subxact_filename(path, subid, xid);
4120
4121 /* Delete the subxacts file, if exists. */
4122 if (subxact_data.nsubxacts == 0)
4123 {
4126
4127 return;
4128 }
4129
4130 /*
4131 * Create the subxact file if it is not already created, otherwise open the
4132 * existing file.
4133 */
4135 true);
4136 if (fd == NULL)
4138
4140
4141 /* Write the subxact count and subxact info */
4144
4146
4147 /* free the memory allocated for subxact info */
4149}
4150
4151/*
4152 * subxact_info_read
4153 * Restore information about subxacts of a streamed transaction.
4154 *
4155 * Read information about subxacts into the structure subxact_data that can be
4156 * used later.
4157 */
4158static void
4160{
4161 char path[MAXPGPATH];
4162 Size len;
4163 BufFile *fd;
4164 MemoryContext oldctx;
4165
4169
4170 /*
4171 * If the subxact file doesn't exist that means we don't have any subxact
4172 * info.
4173 */
4174 subxact_filename(path, subid, xid);
4176 true);
4177 if (fd == NULL)
4178 return;
4179
4180 /* read number of subxact items */
4182
4184
4185 /* we keep the maximum as a power of 2 */
4187
4188 /*
4189 * Allocate subxact information in the logical streaming context. We need
4190 * this information during the complete stream so that we can add the sub
4191 * transaction info to this. On stream stop we will flush this information
4192 * to the subxact file and reset the logical streaming context.
4193 */
4196 sizeof(SubXactInfo));
4197 MemoryContextSwitchTo(oldctx);
4198
4199 if (len > 0)
4201
4203}
4204
4205/*
4206 * subxact_info_add
4207 * Add information about a subxact (offset in the main file).
4208 */
4209static void
4211{
4212 SubXactInfo *subxacts = subxact_data.subxacts;
4213 int64 i;
4214
4215 /* We must have a valid top level stream xid and a stream fd. */
4217 Assert(stream_fd != NULL);
4218
4219 /*
4220 * If the XID matches the toplevel transaction, we don't want to add it.
4221 */
4222 if (stream_xid == xid)
4223 return;
4224
4225 /*
4226 * In most cases we're checking the same subxact as we've already seen in
4227 * the last call, so make sure to ignore it (this change comes later).
4228 */
4229 if (subxact_data.subxact_last == xid)
4230 return;
4231
4232 /* OK, remember we're processing this XID. */
4234
4235 /*
4236 * Check if the transaction is already present in the array of subxacts. We
4237 * intentionally scan the array from the tail, because we're likely adding
4238 * a change for the most recent subtransactions.
4239 *
4240 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4241 * would allow us to use binary search here.
4242 */
4243 for (i = subxact_data.nsubxacts; i > 0; i--)
4244 {
4245 /* found, so we're done */
4246 if (subxacts[i - 1].xid == xid)
4247 return;
4248 }
4249
4250 /* This is a new subxact, so we need to add it to the array. */
4251 if (subxact_data.nsubxacts == 0)
4252 {
4253 MemoryContext oldctx;
4254
4256
4257 /*
4258 * Allocate this memory for subxacts in per-stream context, see
4259 * subxact_info_read.
4260 */
4262 subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4263 MemoryContextSwitchTo(oldctx);
4264 }
4266 {
4268 subxacts = repalloc(subxacts,
4270 }
4271
4272 subxacts[subxact_data.nsubxacts].xid = xid;
4273
4274 /*
4275 * Get the current offset of the stream file and store it as offset of
4276 * this subxact.
4277 */
4279 &subxacts[subxact_data.nsubxacts].fileno,
4280 &subxacts[subxact_data.nsubxacts].offset);
4281
4283 subxact_data.subxacts = subxacts;
4284}
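
The lookup-then-append pattern of subxact_info_add can be tried outside the server. This is a rough sketch under stated assumptions: realloc replaces the memory-context allocation, only XIDs are stored (no file offsets), and the initial capacity of 8 is arbitrary. `SubxactArray` and `sketch_subxact_add` are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t Xid;			/* stand-in for TransactionId (assumption) */

typedef struct SubxactArray
{
	Xid		   *xids;			/* known subtransaction XIDs */
	uint32_t	n;				/* number of entries in use */
	uint32_t	max;			/* allocated entries, kept as a power of 2 */
	Xid			last;			/* XID seen in the previous call, 0 if none */
} SubxactArray;

/* Returns 1 if xid was added, 0 if nothing had to be done, -1 on failure. */
static int
sketch_subxact_add(SubxactArray *a, Xid top_xid, Xid xid)
{
	assert(a != NULL && top_xid != 0);

	/* The toplevel transaction itself is never recorded. */
	if (xid == top_xid)
		return 0;

	/* Fast path: same subxact as in the previous call. */
	if (a->last == xid)
		return 0;
	a->last = xid;

	/* Scan from the tail; recent subxacts are the likely matches. */
	for (uint32_t i = a->n; i > 0; i--)
	{
		if (a->xids[i - 1] == xid)
			return 0;
	}

	/* New subxact: grow the array by doubling when it is full. */
	if (a->n == a->max)
	{
		uint32_t	newmax = (a->max == 0) ? 8 : a->max * 2;
		Xid		   *p = realloc(a->xids, newmax * sizeof(Xid));

		if (p == NULL)
			return -1;
		a->xids = p;
		a->max = newmax;
	}

	a->xids[a->n++] = xid;
	return 1;
}
```

Doubling keeps the amortized cost of appends constant, while the tail-first scan stays linear in the worst case, which is the concern raised by the XXX comment about binary search.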
4285
4286/* format filename for file containing the info about subxacts */
4287static inline void
4288subxact_filename(char *path, Oid subid, TransactionId xid)
4289{
4290 snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4291}
4292
4293/* format filename for file containing serialized changes */
4294static inline void
4295changes_filename(char *path, Oid subid, TransactionId xid)
4296{
4297 snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4298}
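
To make the naming scheme concrete, here is a small standalone sketch of the two formatters. It assumes unsigned int stands in for both Oid and TransactionId, and the `sketch_` names and the explicit buffer length parameter are illustrative additions rather than the signatures used above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Format "<subid>-<xid>.changes"; returns snprintf's result. */
static int
sketch_changes_filename(char *path, size_t len, unsigned int subid,
						unsigned int xid)
{
	assert(path != NULL && len > 0);
	return snprintf(path, len, "%u-%u.changes", subid, xid);
}

/* Format "<subid>-<xid>.subxacts"; returns snprintf's result. */
static int
sketch_subxact_filename(char *path, size_t len, unsigned int subid,
						unsigned int xid)
{
	assert(path != NULL && len > 0);
	return snprintf(path, len, "%u-%u.subxacts", subid, xid);
}
```

For subscription 16394 and XID 742 this yields 16394-742.changes and 16394-742.subxacts. Because the subscription OID is part of the name, two subscriptions that receive the same remote XID write to different files.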
4299
4300/*
4301 * stream_cleanup_files
4302 * Cleanup files for a subscription / toplevel transaction.
4303 *
4304 * Remove files with serialized changes and subxact info for a particular
4305 * toplevel transaction. Each subscription has a separate set of files
4306 * for any toplevel transaction.
4307 */
4308void
4310{
4311 char path[MAXPGPATH];
4312
4313 /* Delete the changes file. */
4314 changes_filename(path, subid, xid);
4316
4317 /* Delete the subxact file, if it exists. */
4318 subxact_filename(path, subid, xid);
4320}
4321
4322/*
4323 * stream_open_file
4324 * Open a file that we'll use to serialize changes for a toplevel
4325 * transaction.
4326 *
4327 * Open a file for streamed changes from a toplevel transaction identified
4328 * by stream_xid (global variable). If it's the first chunk of streamed
4329 * changes for this transaction, create the buffile, otherwise open the
4330 * previously created file.
4331 */
4332static void
4333stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4334{
4335 char path[MAXPGPATH];
4336 MemoryContext oldcxt;
4337
4338 Assert(OidIsValid(subid));
4340 Assert(stream_fd == NULL);
4341
4342
4343 changes_filename(path, subid, xid);
4344 elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4345
4346 /*
4347 * Create/open the buffiles under the logical streaming context so that we
4348 * have those files until stream stop.
4349 */
4351
4352 /*
4353 * If this is the first streamed segment, create the changes file.
4354 * Otherwise, just open the file for writing, in append mode.
4355 */
4356 if (first_segment)
4358 path);
4359 else
4360 {
4361 /*
4362 * Open the file and seek to the end of the file because we always
4363 * append to the changes file.
4364 */
4366 path, O_RDWR, false);
4367 BufFileSeek(stream_fd, 0, 0, SEEK_END);
4368 }
4369
4370 MemoryContextSwitchTo(oldcxt);
4371}
4372
4373/*
4374 * stream_close_file
4375 * Close the currently open file with streamed changes.
4376 */
4377static void
4379{
4380 Assert(stream_fd != NULL);
4381
4383
4384 stream_fd = NULL;
4385}
4386
4387/*
4388 * stream_write_change
4389 * Serialize a change to a file for the current toplevel transaction.
4390 *
4391 * The change is serialized in a simple format, with length (not including
4392 * the length), action code (identifying the message type) and message
4393 * contents (without the subxact TransactionId value).
4394 */
4395static void
4397{
4398 int len;
4399
4400 Assert(stream_fd != NULL);
4401
4402 /* total on-disk size, including the action type character */
4403 len = (s->len - s->cursor) + sizeof(char);
4404
4405 /* first write the size */
4406 BufFileWrite(stream_fd, &len, sizeof(len));
4407
4408 /* then the action */
4409 BufFileWrite(stream_fd, &action, sizeof(action));
4410
4411 /* and finally the remaining part of the buffer (after the XID) */
4412 len = (s->len - s->cursor);
4413
4415}
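
The on-disk record described above (length, then action code, then message contents) can be sketched against a memory buffer instead of a BufFile. The helpers `sketch_write_change` and `sketch_read_change` are hypothetical. As in the listing, the length is stored as a native int, so the format is only meant to be read back by the same build on the same machine.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Append one record to 'out': an int length (action byte plus payload, not
 * counting the length field itself), the action byte, then the payload.
 * Returns the number of bytes written, or 0 if the buffer is too small.
 */
static size_t
sketch_write_change(unsigned char *out, size_t outlen, char action,
					const void *payload, int payload_len)
{
	int			len = payload_len + (int) sizeof(char);
	size_t		need = sizeof(len) + (size_t) len;

	assert(out != NULL && payload_len >= 0);
	assert(payload != NULL || payload_len == 0);

	if (need > outlen)
		return 0;

	memcpy(out, &len, sizeof(len));
	out[sizeof(len)] = (unsigned char) action;
	if (payload_len > 0)
		memcpy(out + sizeof(len) + 1, payload, (size_t) payload_len);

	return need;
}

/*
 * Read one record back.  On success returns the payload length and sets
 * *action and *payload (which points into 'in'); returns -1 if the input
 * is truncated or malformed.
 */
static int
sketch_read_change(const unsigned char *in, size_t inlen,
				   char *action, const unsigned char **payload)
{
	int			len;

	assert(in != NULL && action != NULL && payload != NULL);

	if (inlen < sizeof(len))
		return -1;
	memcpy(&len, in, sizeof(len));
	if (len < 1 || (size_t) len > inlen - sizeof(len))
		return -1;

	*action = (char) in[sizeof(len)];
	*payload = in + sizeof(len) + 1;
	return len - 1;
}
```

Prefixing each record with its length lets a reader skip or replay changes without understanding the payload, which is what makes truncating the file at a recorded subxact offset safe.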
4416
4417/*
4418 * stream_open_and_write_change
4419 * Serialize a message to a file for the given transaction.
4420 *
4421 * This function is similar to stream_write_change except that it opens the
4422 * target file if it is not already open before writing the message, and closes
4423 * the file at the end.
4424 */
4425static void
4427{
4429
4430 if (!stream_fd)
4431 stream_start_internal(xid, false);
4432
4435}
4436
4437/*
4438 * Sets streaming options including replication slot name and origin start
4439 * position. Workers need these options for logical replication.
4440 */
4441void
4443 char *slotname,
4444 XLogRecPtr *origin_startpos)
4445{
4446 int server_version;
4447
4448 options->logical = true;
4449 options->startpoint = *origin_startpos;
4450 options->slotname = slotname;
4451
4453 options->proto.logical.proto_version =
4458
4459 options->proto.logical.publication_names = MySubscription->publications;
4460 options->proto.logical.binary = MySubscription->binary;
4461
4462 /*
4463 * Assign the appropriate option value for streaming option according to
4464 * the 'streaming' mode and the publisher's ability to support that mode.
4465 */
4466 if (server_version >= 160000 &&
4467 MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
4468 {
4469 options->proto.logical.streaming_str = "parallel";
4471 }
4472 else if (server_version >= 140000 &&
4473 MySubscription->stream != LOGICALREP_STREAM_OFF)
4474 {
4475 options->proto.logical.streaming_str = "on";
4477 }
4478 else
4479 {
4480 options->proto.logical.streaming_str = NULL;
4482 }
4483
4484 options->proto.logical.twophase = false;
4485 options->proto.logical.origin = pstrdup(MySubscription->origin);
4486}
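
The version gating in set_stream_options reduces to a small pure function. The sketch below mirrors the two thresholds visible in the listing (160000 for "parallel", 140000 for "on"); the enum `SketchStreamMode` and the function `sketch_streaming_str` are hypothetical stand-ins for the LOGICALREP_STREAM_* values and the option assignment.

```c
#include <assert.h>
#include <stddef.h>

typedef enum SketchStreamMode
{
	SKETCH_STREAM_OFF,
	SKETCH_STREAM_ON,
	SKETCH_STREAM_PARALLEL
} SketchStreamMode;

/*
 * Pick the streaming option string to send to the publisher, or NULL when
 * streaming must not be requested.  'server_version' uses the numeric
 * format in which version 16 is 160000.
 */
static const char *
sketch_streaming_str(int server_version, SketchStreamMode requested)
{
	assert(server_version > 0);

	if (server_version >= 160000 && requested == SKETCH_STREAM_PARALLEL)
		return "parallel";

	/* An older publisher downgrades a 'parallel' request to plain 'on'. */
	if (server_version >= 140000 && requested != SKETCH_STREAM_OFF)
		return "on";

	return NULL;
}
```

Note the fallback order: a subscription configured for parallel streaming still streams in "on" mode when the publisher is version 14 or 15.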
4487
4488/*
4489 * Cleanup the memory for subxacts and reset the related variables.
4490 */
4491static inline void
4493{
4496
4497 subxact_data.subxacts = NULL;
4501}
4502
4503/*
4504 * Common function to run the apply loop with error handling. Disable the
4505 * subscription, if necessary.
4506 *
4507 * Note that we don't handle FATAL errors, which are probably caused by
4508 * system resource errors and are not repeatable.
4509 */
4510void
4511start_apply(XLogRecPtr origin_startpos)
4512{
4513 PG_TRY();
4514 {
4515 LogicalRepApplyLoop(origin_startpos);
4516 }
4517 PG_CATCH();
4518 {
4521 else
4522 {
4523 /*
4524 * Report the worker failed while applying changes. Abort the
4525 * current transaction so that the stats message is sent in an
4526 * idle state.
4527 */
4530
4531 PG_RE_THROW();
4532 }
4533 }
4534 PG_END_TRY();
4535}
4536
4537/*
4538 * Runs the leader apply worker.
4539 *
4540 * It sets up replication origin, streaming options and then starts streaming.
4541 */
4542static void
4544{
4545 char originname[NAMEDATALEN];
4546 XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4547 char *slotname = NULL;
4549 RepOriginId originid;
4550 TimeLineID startpointTLI;
4551 char *err;
4552 bool must_use_password;
4553
4554 slotname = MySubscription->slotname;
4555
4556 /*
4557 * This shouldn't happen if the subscription is enabled, but guard against
4558 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4559 * slot is NULL.)
4560 */
4561 if (!slotname)
4562 ereport(ERROR,
4563 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4564 errmsg("subscription has no replication slot set")));
4565
4566 /* Setup replication origin tracking. */
4568 originname, sizeof(originname));
4570 originid = replorigin_by_name(originname, true);
4571 if (!OidIsValid(originid))
4572 originid = replorigin_create(originname);
4573 replorigin_session_setup(originid, 0);
4574 replorigin_session_origin = originid;
4575 origin_startpos = replorigin_session_get_progress(false);
4577
4578 /* Is the use of a password mandatory? */
4579 must_use_password = MySubscription->passwordrequired &&
4581
4583 true, must_use_password,
4585
4586 if (LogRepWorkerWalRcvConn == NULL)
4587 ereport(ERROR,
4588 (errcode(ERRCODE_CONNECTION_FAILURE),
4589 errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
4590 MySubscription->name, err)));
4591
4592 /*
4593 * We don't really use the output of identify_system for anything but it does
4594 * some initializations on the upstream so let's still call it.
4595 */
4596 (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4597
4599
4600 set_stream_options(&options, slotname, &origin_startpos);
4601
4602 /*
4603 * Even when the two_phase mode is requested by the user, it remains as
4604 * the tri-state PENDING until all tablesyncs have reached READY state.
4605 * Only then, can it become ENABLED.
4606 *
4607 * Note: If the subscription has no tables then leave the state as
4608 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4609 * work.
4610 */
4611 if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
4613 {
4614 /* Start streaming with two_phase enabled */
4615 options.proto.logical.twophase = true;
4617
4619 UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
4620 MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
4622 }
4623 else
4624 {
4626 }
4627
4629 (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4631 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
4632 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
4633 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
4634 "?")));
4635
4636 /* Run the main loop. */
4637 start_apply(origin_startpos);
4638}
4639
4640/*
4641 * Common initialization for leader apply worker, parallel apply worker and
4642 * tablesync worker.
4643 *
4644 * Initialize the database connection, in-memory subscription and necessary
4645 * config options.
4646 */
4647void
4649{
4650 MemoryContext oldctx;
4651
4652 /* Run as replica session replication role. */
4653 SetConfigOption("session_replication_role", "replica",
4655
4656 /* Connect to our database. */
4659 0);
4660
4661 /*
4662 * Set always-secure search path, so malicious users can't redirect user
4663 * code (e.g. pg_index.indexprs).
4664 */
4665 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4666
4667 /* Load the subscription into persistent memory context. */
4669 "ApplyContext",
4673
4675 if (!MySubscription)
4676 {
4677 ereport(LOG,
4678 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4680
4681 /* Ensure we remove no-longer-useful entry for worker's start time */
4684
4685 proc_exit(0);
4686 }
4687
4688 MySubscriptionValid = true;
4689 MemoryContextSwitchTo(oldctx);
4690
4691 if (!MySubscription->enabled)
4692 {
4693 ereport(LOG,
4694 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4695 MySubscription->name)));
4696
4698 }
4699
4700 /* Setup synchronous commit according to the user's wishes */
4701 SetConfigOption("synchronous_commit", MySubscription->synccommit,
4703
4704 /*
4705 * Keep us informed about subscription or role changes. Note that the
4706 * role's superuser privilege can be revoked.
4707 */
4708 CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4710 (Datum) 0);
4711
4714 (Datum) 0);
4715
4716 if (am_tablesync_worker())
4717 ereport(LOG,
4718 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4721 else
4722 ereport(LOG,
4723 (errmsg("logical replication apply worker for subscription \"%s\" has started",
4724 MySubscription->name)));
4725
4727}
4728
4729/*
4730 * Reset the origin state.
4731 */
4732static void
4734{
4738}
4739
4740/* Common function to setup the leader apply or tablesync worker. */
4741void
4743{
4744 /* Attach to slot */
4745 logicalrep_worker_attach(worker_slot);
4746
4748
4749 /* Setup signal handling */
4751 pqsignal(SIGTERM, die);
4753
4754 /*
4755 * We don't currently need any ResourceOwner in a walreceiver process, but
4756 * if we did, we could call CreateAuxProcessResourceOwner here.
4757 */
4758
4759 /* Initialise stats to a sanish value */
4762
4763 /* Load the libpq-specific functions */
4764 load_file("libpqwalreceiver", false);
4765
4767
4768 /*
4769 * Register a callback to reset the origin state before aborting any
4770 * pending transaction during shutdown (see ShutdownPostgres()). This will
4771 * avoid origin advancement for an incomplete transaction which could
4772 * otherwise lead to its loss as such a transaction won't be sent by the
4773 * server again.
4774 *
4775 * Note that even a LOG or DEBUG statement placed after setting the origin
4776 * state may process a shutdown signal before committing the current apply
4777 * operation. So, it is important to register such a callback here.
4778 */
4780
4781 /* Connect to the origin and start the replication. */
4782 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4784
4785 /*
4786 * Setup callback for syscache so that we know when something changes in
4787 * the subscription relation state.
4788 */
4789 CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4791 (Datum) 0);
4792}
4793
4794/* Logical Replication Apply worker entry point */
4795void
4797{
4798 int worker_slot = DatumGetInt32(main_arg);
4799
4801
4802 SetupApplyOrSyncWorker(worker_slot);
4803
4805
4807
4808 proc_exit(0);
4809}
4810
4811/*
4812 * After error recovery, disable the subscription in a new transaction
4813 * and exit cleanly.
4814 */
4815void
4817{
4818 /*
4819 * Emit the error message, and recover from the error state to an idle
4820 * state
4821 */
4823
4827
4829
4830 /* Report the worker failed during either table synchronization or apply */
4833
4834 /* Disable the subscription */
4838
4839 /* Ensure we remove no-longer-useful entry for worker's start time */
4842
4843 /* Notify the subscription has been disabled and exit */
4844 ereport(LOG,
4845 errmsg("subscription \"%s\" has been disabled because of an error",
4847
4848 proc_exit(0);
4849}
4850
4851/*
4852 * Is current process a logical replication worker?
4853 */
4854bool
4856{
4857 return MyLogicalRepWorker != NULL;
4858}
4859
4860/*
4861 * Is current process a logical replication parallel apply worker?
4862 */
4863bool
4865{
4867}
4868
4869/*
4870 * Start skipping changes of the transaction if the given LSN matches the
4871 * LSN specified by subscription's skiplsn.
4872 */
4873static void
4875{
4879
4880 /*
4881 * Quick return if it's not requested to skip this transaction. This
4882 * function is called for every remote transaction and we assume that
4883 * skipping the transaction is not used often.
4884 */
4886 MySubscription->skiplsn != finish_lsn))
4887 return;
4888
4889 /* Start skipping all changes of this transaction */
4890 skip_xact_finish_lsn = finish_lsn;
4891
4892 ereport(LOG,
4893 errmsg("logical replication starts skipping transaction at LSN %X/%X",
4895}
4896
4897/*
4898 * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4899 */
4900static void
4902{
4903 if (!is_skipping_changes())
4904 return;
4905
4906 ereport(LOG,
4907 (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4909
4910 /* Stop skipping changes */
4912}
4913
4914/*
4915 * Clear subskiplsn of pg_subscription catalog.
4916 *
4917 * finish_lsn is the transaction's finish LSN that is used to check if the
4918 * subskiplsn matches it. If not matched, we raise a warning when clearing the
4919 * subskiplsn to inform users of cases where, e.g., the user mistakenly
4920 * specified the wrong subskiplsn.
4921 */
4922static void
4924{
4925 Relation rel;
4926 Form_pg_subscription subform;
4927 HeapTuple tup;
4928 XLogRecPtr myskiplsn = MySubscription->skiplsn;
4929 bool started_tx = false;
4930
4932 return;
4933
4934 if (!IsTransactionState())
4935 {
4937 started_tx = true;
4938 }
4939
4940 /*
4941 * Protect subskiplsn of pg_subscription from being concurrently updated
4942 * while clearing it.
4943 */
4944 LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4946
4947 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4948
4949 /* Fetch the existing tuple. */
4950 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4952
4953 if (!HeapTupleIsValid(tup))
4954 elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4955
4956 subform = (Form_pg_subscription) GETSTRUCT(tup);
4957
4958 /*
4959 * Clear the subskiplsn. If the user has already changed subskiplsn before
4960 * clearing it we don't update the catalog and the replication origin
4961 * state won't get advanced. So in the worst case, if the server crashes
4962 * before sending an acknowledgment of the flush position the transaction
4963 * will be sent again and the user needs to set subskiplsn again. We can
4964 * reduce the possibility by logging a replication origin WAL record to
4965 * advance the origin LSN instead but there is no way to advance the
4966 * origin timestamp and it doesn't seem to be worth doing anything about
4967 * it since it's a very rare case.
4968 */
4969 if (subform->subskiplsn == myskiplsn)
4970 {
4971 bool nulls[Natts_pg_subscription];
4972 bool replaces[Natts_pg_subscription];
4973 Datum values[Natts_pg_subscription];
4974
4975 memset(values, 0, sizeof(values));
4976 memset(nulls, false, sizeof(nulls));
4977 memset(replaces, false, sizeof(replaces));
4978
4979 /* reset subskiplsn */
4980 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4981 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4982
4983 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4984 replaces);
4985 CatalogTupleUpdate(rel, &tup->t_self, tup);
4986
4987 if (myskiplsn != finish_lsn)
4989 errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4990 errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4991 LSN_FORMAT_ARGS(finish_lsn),
4992 LSN_FORMAT_ARGS(myskiplsn)));
4993 }
4994
4995 heap_freetuple(tup);
4996 table_close(rel, NoLock);
4997
4998 if (started_tx)
5000}
5001
/* Error callback to give more context info about the change being applied */
void
apply_error_callback(void *arg)
{
	ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
	int			elevel;

	if (apply_error_callback_arg.command == 0)
		return;

	Assert(errarg->origin_name);

	elevel = geterrlevel();

	/*
	 * Reset the origin state to prevent the advancement of origin progress if
	 * we fail to apply. Otherwise, this will result in transaction loss as
	 * that transaction won't be sent again by the server.
	 */
	if (elevel >= ERROR)
		replorigin_reset(0, (Datum) 0);

	if (errarg->rel == NULL)
	{
		if (!TransactionIdIsValid(errarg->remote_xid))
			errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
					   errarg->origin_name,
					   logicalrep_message_type(errarg->command));
		else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
			errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
					   errarg->origin_name,
					   logicalrep_message_type(errarg->command),
					   errarg->remote_xid);
		else
			errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
					   errarg->origin_name,
					   logicalrep_message_type(errarg->command),
					   errarg->remote_xid,
					   LSN_FORMAT_ARGS(errarg->finish_lsn));
	}
	else
	{
		if (errarg->remote_attnum < 0)
		{
			if (XLogRecPtrIsInvalid(errarg->finish_lsn))
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->remote_xid);
			else
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->remote_xid,
						   LSN_FORMAT_ARGS(errarg->finish_lsn));
		}
		else
		{
			if (XLogRecPtrIsInvalid(errarg->finish_lsn))
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->rel->remoterel.attnames[errarg->remote_attnum],
						   errarg->remote_xid);
			else
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->rel->remoterel.attnames[errarg->remote_attnum],
						   errarg->remote_xid,
						   LSN_FORMAT_ARGS(errarg->finish_lsn));
		}
	}
}

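/* Set transaction information of apply error callback */
static inline void
set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
{
	apply_error_callback_arg.remote_xid = xid;
	apply_error_callback_arg.finish_lsn = lsn;
}

/* Reset all information of apply error callback */
static inline void
reset_apply_error_context_info(void)
{
	apply_error_callback_arg.command = 0;
	apply_error_callback_arg.rel = NULL;
	apply_error_callback_arg.remote_attnum = -1;
	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}
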
/*
 * Request wakeup of the workers for the given subscription OID
 * at commit of the current transaction.
 *
 * This is used to ensure that the workers process assorted changes
 * as soon as possible.
 */
void
LogicalRepWorkersWakeupAtCommit(Oid subid)
{
	MemoryContext oldcxt;

	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
	on_commit_wakeup_workers_subids =
		list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
	MemoryContextSwitchTo(oldcxt);
}

/*
 * Wake up the workers of any subscriptions that were changed in this xact.
 */
void
AtEOXact_LogicalRepWorkers(bool isCommit)
{
	if (isCommit && on_commit_wakeup_workers_subids != NIL)
	{
		ListCell   *lc;

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
		foreach(lc, on_commit_wakeup_workers_subids)
		{
			Oid			subid = lfirst_oid(lc);
			List	   *workers;
			ListCell   *lc2;

			workers = logicalrep_workers_find(subid, true, false);
			foreach(lc2, workers)
			{
				LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);

				logicalrep_worker_wakeup_ptr(worker);
			}
		}
		LWLockRelease(LogicalRepWorkerLock);
	}

	/* The List storage will be reclaimed automatically in xact cleanup. */
	on_commit_wakeup_workers_subids = NIL;
}

/*
 * Allocate the origin name in long-lived context for error context message.
 */
void
set_apply_error_context_origin(char *originname)
{
	apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
															   originname);
}

/*
 * Return the action to be taken for the given transaction. See
 * TransApplyAction for information on each of the actions.
 *
 * *winfo is assigned to the destination parallel worker info when the leader
 * apply worker has to pass all the transaction's changes to the parallel
 * apply worker.
 */
static TransApplyAction
get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
{
	*winfo = NULL;

	if (am_parallel_apply_worker())
	{
		return TRANS_PARALLEL_APPLY;
	}

	/*
	 * If we are processing this transaction using a parallel apply worker
	 * then either we send the changes to the parallel worker or if the worker
	 * is busy then serialize the changes to the file which will later be
	 * processed by the parallel worker.
	 */
	*winfo = pa_find_worker(xid);

	if (*winfo && (*winfo)->serialize_changes)
	{
		return TRANS_LEADER_PARTIAL_SERIALIZE;
	}
	else if (*winfo)
	{
		return TRANS_LEADER_SEND_TO_PARALLEL;
	}

	/*
	 * If there is no parallel worker involved to process this transaction
	 * then we either directly apply the change or serialize it to a file
	 * which will later be applied when the transaction finish message is
	 * processed.
	 */
	else if (in_streamed_transaction)
	{
		return TRANS_LEADER_SERIALIZE;
	}
	else
	{
		return TRANS_LEADER_APPLY;
	}
}
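
The decision order in get_transaction_apply_action() can be written as a pure function of four booleans, which makes the precedence easy to test in isolation. The sketch below is an illustration under that simplification, not the worker's own code. `SketchApplyAction` and `sketch_choose_action` are hypothetical names, and the boolean parameters stand in for the real checks (whether this process is a parallel apply worker, whether a parallel worker was found for the XID, whether that worker is serializing changes, and whether a streamed transaction is in progress).

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the five TransApplyAction outcomes. */
typedef enum SketchApplyAction
{
	SKETCH_LEADER_APPLY,
	SKETCH_LEADER_SERIALIZE,
	SKETCH_LEADER_SEND_TO_PARALLEL,
	SKETCH_LEADER_PARTIAL_SERIALIZE,
	SKETCH_PARALLEL_APPLY
} SketchApplyAction;

SketchApplyAction
sketch_choose_action(bool is_parallel_worker, bool has_parallel_worker,
					 bool worker_serializing, bool in_streamed_xact)
{
	/* A worker can only be serializing if it exists. */
	assert(has_parallel_worker || !worker_serializing);

	/* A parallel apply worker always applies the changes itself. */
	if (is_parallel_worker)
		return SKETCH_PARALLEL_APPLY;

	/* Leader with an assigned parallel worker: hand over or spill to file. */
	if (has_parallel_worker)
		return worker_serializing ? SKETCH_LEADER_PARTIAL_SERIALIZE
			: SKETCH_LEADER_SEND_TO_PARALLEL;

	/* Leader alone: spool streamed changes, apply everything else directly. */
	return in_streamed_xact ? SKETCH_LEADER_SERIALIZE : SKETCH_LEADER_APPLY;
}
```

For example, a leader with no parallel worker that is inside a streamed transaction gets `SKETCH_LEADER_SERIALIZE`, while the same leader outside a streamed transaction gets `SKETCH_LEADER_APPLY`.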
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:239
#define PGINVALID_SOCKET
Definition: port.h:31
uintptr_t Datum
Definition: postgres.h:69
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:207
#define InvalidOid
Definition: postgres_ext.h:35
unsigned int Oid
Definition: postgres_ext.h:30
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
char * c
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * s2
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:98
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:561
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:325
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:134
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:754
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:487
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:615
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition: proto.c:1184
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:63
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:267
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:698
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1209
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:365
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1129
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:428
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:228
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1079
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
#define RelationGetRelid(relation)
Definition: rel.h:516
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:712
#define RelationGetDescr(relation)
Definition: rel.h:542
#define RelationGetRelationName(relation)
Definition: rel.h:550
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:669
#define RelationGetNamespace(relation)
Definition: rel.h:557
List * RelationGetIndexList(Relation relation)
Definition: relcache.c:4819
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:175
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
Node * build_column_default(Relation rel, int attrno)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:271
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:669
void PopActiveSnapshot(void)
Definition: snapmgr.c:762
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:571
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:633
bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
Definition: relation.c:821
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:891
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:164
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:504
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:349
StringInfo makeStringInfo(void)
Definition: stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:157
TransactionId remote_xid
Definition: worker.c:227
LogicalRepMsgType command
Definition: worker.c:222
XLogRecPtr finish_lsn
Definition: worker.c:228
LogicalRepRelMapEntry * rel
Definition: worker.c:223
ResultRelInfo * targetRelInfo
Definition: worker.c:212
EState * estate
Definition: worker.c:209
PartitionTupleRouting * proute
Definition: worker.c:216
ModifyTableState * mtstate
Definition: worker.c:215
LogicalRepRelMapEntry * targetRel
Definition: worker.c:211
uint32 nsubxacts
Definition: worker.c:352
uint32 nsubxacts_max
Definition: worker.c:353
SubXactInfo * subxacts
Definition: worker.c:355
TransactionId subxact_last
Definition: worker.c:354
Definition: attmap.h:35
int maplen
Definition: attmap.h:37
AttrNumber * attnums
Definition: attmap.h:36
TimestampTz ts
Definition: conflict.h:68
RepOriginId origin
Definition: conflict.h:67
TransactionId xmin
Definition: conflict.h:65
TupleTableSlot * slot
Definition: conflict.h:61
List * es_rteperminfos
Definition: execnodes.h:665
List * es_tupleTable
Definition: execnodes.h:710
List * es_opened_result_relations
Definition: execnodes.h:686
CommandId es_output_cid
Definition: execnodes.h:680
struct ErrorContextCallback * previous
Definition: elog.h:297
void(* callback)(void *arg)
Definition: elog.h:298
dlist_node node
Definition: worker.c:200
XLogRecPtr remote_end
Definition: worker.c:202
XLogRecPtr local_end
Definition: worker.c:201
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131
TimestampTz committime
Definition: logicalproto.h:138
LogicalRepRelation remoterel
StringInfoData * colvalues
Definition: logicalproto.h:87
TimestampTz last_recv_time
LogicalRepWorkerType type
TimestampTz reply_time
FileSet * stream_fileset
XLogRecPtr reply_lsn
XLogRecPtr last_lsn
TimestampTz last_send_time
CmdType operation
Definition: execnodes.h:1398
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1402
PlanState ps
Definition: execnodes.h:1397
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
Plan * plan
Definition: execnodes.h:1159
EState * state
Definition: execnodes.h:1161
Bitmapset * updatedCols
Definition: parsenodes.h:1309
RTEKind rtekind
Definition: parsenodes.h:1061
Form_pg_class rd_rel
Definition: rel.h:111
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:616
List * ri_onConflictArbiterIndexes
Definition: execnodes.h:575
Relation ri_RelationDesc
Definition: execnodes.h:475
RelationPtr ri_IndexRelationDescs
Definition: execnodes.h:481
off_t offset
Definition: worker.c:346
TransactionId xid
Definition: worker.c:344
int fileno
Definition: worker.c:345
XLogRecPtr skiplsn
AttrMap * attrMap
Definition: tupconvert.h:28
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
dlist_node * cur
Definition: ilist.h:200
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:92
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
Definition: tablecmds.c:1974
bool AllTablesyncsReady(void)
Definition: tablesync.c:1738
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:280
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:666
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1763
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5088
void AfterTriggerBeginQuery(void)
Definition: trigger.c:5053
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:458
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:372
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:525
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
Definition: twophase.c:2682
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2623
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1487
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
static StringInfoData reply_message
Definition: walreceiver.c:132
int wal_receiver_status_interval
Definition: walreceiver.c:88
int wal_receiver_timeout
Definition: walreceiver.c:89
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:451
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:457
#define walrcv_server_version(conn)
Definition: walreceiver.h:447
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:443
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:455
int WalWriterDelay
Definition: walwriter.c:70
#define SIGHUP
Definition: win32_port.h:158
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED
static bool am_parallel_apply_worker(void)
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
@ FS_SERIALIZE_DONE
static bool am_tablesync_worker(void)
static bool am_leader_apply_worker(void)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4989
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3992
bool IsTransactionState(void)
Definition: xact.c:387
void CommandCounterIncrement(void)
Definition: xact.c:1100
void StartTransactionCommand(void)
Definition: xact.c:3059
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:914
bool IsTransactionBlock(void)
Definition: xact.c:4971
void BeginTransactionBlock(void)
Definition: xact.c:3924
void CommitTransactionCommand(void)
Definition: xact.c:3157
bool EndTransactionBlock(bool chain)
Definition: xact.c:4044
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4862
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:829
#define GIDSIZE
Definition: xact.h:31
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6687
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:255
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59