PostgreSQL Source Code  git master
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2024, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from remote logical replication stream.
13  *
14  * The main worker (apply) is started by logical replication worker
15  * launcher for every enabled subscription in a database. It uses
16  * walsender protocol to communicate with publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *
22  * STREAMED TRANSACTIONS
23  * ---------------------
24  * Streamed transactions (large transactions exceeding a memory limit on the
25  * upstream) are applied using one of two approaches:
26  *
27  * 1) Write to temporary files and apply when the final commit arrives
28  *
29  * This approach is used when the user has set the subscription's streaming
30  * option as on.
31  *
32  * Unlike the regular (non-streamed) case, handling streamed transactions has
33  * to handle aborts of both the toplevel transaction and subtransactions. This
34  * is achieved by tracking offsets for subtransactions, which is then used
35  * to truncate the file with serialized changes.
36  *
37  * The files are placed in tmp file directory by default, and the filenames
38  * include both the XID of the toplevel transaction and OID of the
39  * subscription. This is necessary so that different workers processing a
40  * remote transaction with the same XID don't interfere.
41  *
42  * We use BufFiles instead of using normal temporary files because (a) the
43  * BufFile infrastructure supports temporary files that exceed the OS file size
44  * limit, (b) provides a way for automatic clean up on the error and (c) provides
45  * a way to survive these files across local transactions and allows them to
46  * be opened and closed at stream start and stop. We decided to use FileSet
47  * infrastructure as without that it deletes the files on the closure of the
48  * file and if we decide to keep stream files open across the start/stop stream
49  * then it will consume a lot of memory (more than 8K for each BufFile and
50  * there could be multiple such BufFiles as the subscriber could receive
51  * multiple start/stop streams for different transactions before getting the
52  * commit). Moreover, if we don't use FileSet then we also need to invent
53  * a new way to pass filenames to BufFile APIs so that we are allowed to open
54  * the file we desired across multiple stream-open calls for the same
55  * transaction.
56  *
57  * 2) Parallel apply workers.
58  *
59  * This approach is used when the user has set the subscription's streaming
60  * option as parallel. See logical/applyparallelworker.c for information about
61  * this approach.
62  *
63  * TWO_PHASE TRANSACTIONS
64  * ----------------------
65  * Two phase transactions are replayed at prepare and then committed or
66  * rolled back at commit prepared and rollback prepared respectively. It is
67  * possible to have a prepared transaction that arrives at the apply worker
68  * when the tablesync is busy doing the initial copy. In this case, the apply
69  * worker skips all the prepared operations [e.g. inserts] while the tablesync
70  * is still busy (see the condition of should_apply_changes_for_rel). The
71  * tablesync worker might not get such a prepared transaction because say it
72  * was prior to the initial consistent point but might have got some later
73  * commits. Now, the tablesync worker will exit without doing anything for the
74  * prepared transaction skipped by the apply worker as the sync location for it
75  * will be already ahead of the apply worker's current location. This would lead
76  * to an "empty prepare", because later when the apply worker does the commit
77  * prepare, there is nothing in it (the inserts were skipped earlier).
78  *
79  * To avoid this, and similar prepare confusions the subscription's two_phase
80  * commit is enabled only after the initial sync is over. The two_phase option
81  * has been implemented as a tri-state with values DISABLED, PENDING, and
82  * ENABLED.
83  *
84  * Even if the user specifies they want a subscription with two_phase = on,
85  * internally it will start with a tri-state of PENDING which only becomes
86  * ENABLED after all tablesync initializations are completed - i.e. when all
87  * tablesync workers have reached their READY state. In other words, the value
88  * PENDING is only a temporary state for subscription start-up.
89  *
90  * Until the two_phase is properly available (ENABLED) the subscription will
91  * behave as if two_phase = off. When the apply worker detects that all
92  * tablesyncs have become READY (while the tri-state was PENDING) it will
93  * restart the apply worker process. This happens in
94  * process_syncing_tables_for_apply.
95  *
96  * When the (re-started) apply worker finds that all tablesyncs are READY for a
97  * two_phase tri-state of PENDING it starts streaming messages with the
98  * two_phase option which in turn enables the decoding of two-phase commits at
99  * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100  * Now, it is possible that during the time we have not enabled two_phase, the
101  * publisher (replication server) would have skipped some prepares but we
102  * ensure that such prepares are sent along with commit prepare, see
103  * ReorderBufferFinishPrepared.
104  *
105  * If the subscription has no tables then a two_phase tri-state PENDING is
106  * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107  * PUBLICATION which might otherwise be disallowed (see below).
108  *
109  * If ever a user needs to be aware of the tri-state value, they can fetch it
110  * from the pg_subscription catalog (see column subtwophasestate).
111  *
112  * We don't allow toggling the two_phase option of a subscription because it can
113  * lead to an inconsistent replica. Consider, initially, it was on and we have
114  * received some prepare then we turn it off, now at commit time the server
115  * will send the entire transaction data along with the commit. With some more
116  * analysis, we can allow changing this option from off to on but not sure if
117  * that alone would be useful.
118  *
119  * Finally, to avoid problems mentioned in previous paragraphs from any
120  * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
121  * to 'off' and then again back to 'on') there is a restriction for
122  * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
123  * the two_phase tri-state is ENABLED, except when copy_data = false.
124  *
125  * We can get prepare of the same GID more than once for the genuine cases
126  * where we have defined multiple subscriptions for publications on the same
127  * server and prepared transaction has operations on tables subscribed to those
128  * subscriptions. For such cases, if we use the GID sent by publisher one of
129  * the prepares will be successful and others will fail, in which case the
130  * server will send them again. Now, this can lead to a deadlock if user has
131  * set synchronous_standby_names for all the subscriptions on subscriber. To
132  * avoid such deadlocks, we generate a unique GID (consisting of the
133  * subscription oid and the xid of the prepared transaction) for each prepare
134  * transaction on the subscriber.
135  *
136  * FAILOVER
137  * ----------------------
138  * The logical slot on the primary can be synced to the standby by specifying
139  * failover = true when creating the subscription. Enabling failover allows us
140  * to smoothly transition to the promoted standby, ensuring that we can
141  * subscribe to the new primary without losing any data.
142  *-------------------------------------------------------------------------
143  */
144 
145 #include "postgres.h"
146 
147 #include <sys/stat.h>
148 #include <unistd.h>
149 
150 #include "access/genam.h"
151 #include "access/table.h"
152 #include "access/tableam.h"
153 #include "access/twophase.h"
154 #include "access/xact.h"
155 #include "access/xlog_internal.h"
156 #include "catalog/catalog.h"
157 #include "catalog/indexing.h"
158 #include "catalog/namespace.h"
159 #include "catalog/partition.h"
160 #include "catalog/pg_inherits.h"
161 #include "catalog/pg_subscription.h"
163 #include "catalog/pg_tablespace.h"
164 #include "commands/tablecmds.h"
165 #include "commands/tablespace.h"
166 #include "commands/trigger.h"
167 #include "executor/executor.h"
168 #include "executor/execPartition.h"
170 #include "funcapi.h"
171 #include "libpq/pqformat.h"
172 #include "libpq/pqsignal.h"
173 #include "mb/pg_wchar.h"
174 #include "miscadmin.h"
175 #include "nodes/makefuncs.h"
176 #include "optimizer/optimizer.h"
177 #include "parser/parse_relation.h"
178 #include "pgstat.h"
179 #include "postmaster/bgworker.h"
180 #include "postmaster/interrupt.h"
181 #include "postmaster/postmaster.h"
182 #include "postmaster/walwriter.h"
183 #include "replication/decode.h"
184 #include "replication/logical.h"
189 #include "replication/origin.h"
191 #include "replication/snapbuild.h"
192 #include "replication/walreceiver.h"
194 #include "rewrite/rewriteHandler.h"
195 #include "storage/buffile.h"
196 #include "storage/bufmgr.h"
197 #include "storage/fd.h"
198 #include "storage/ipc.h"
199 #include "storage/lmgr.h"
200 #include "storage/proc.h"
201 #include "storage/procarray.h"
202 #include "tcop/tcopprot.h"
203 #include "utils/acl.h"
204 #include "utils/builtins.h"
205 #include "utils/catcache.h"
206 #include "utils/dynahash.h"
207 #include "utils/datum.h"
208 #include "utils/fmgroids.h"
209 #include "utils/guc.h"
210 #include "utils/inval.h"
211 #include "utils/lsyscache.h"
212 #include "utils/memutils.h"
213 #include "utils/pg_lsn.h"
214 #include "utils/rel.h"
215 #include "utils/rls.h"
216 #include "utils/syscache.h"
217 #include "utils/timeout.h"
218 #include "utils/usercontext.h"
219 
220 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
221 
222 typedef struct FlushPosition
223 {
228 
230 
231 typedef struct ApplyExecutionData
232 {
233  EState *estate; /* executor state, used to track resources */
234 
235  LogicalRepRelMapEntry *targetRel; /* replication target rel */
236  ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
237 
238  /* These fields are used when the target relation is partitioned: */
239  ModifyTableState *mtstate; /* dummy ModifyTable state */
240  PartitionTupleRouting *proute; /* partition routing info */
242 
243 /* Struct for saving and restoring apply errcontext information */
244 typedef struct ApplyErrorCallbackArg
245 {
246  LogicalRepMsgType command; /* 0 if invalid */
248 
249  /* Remote node information */
250  int remote_attnum; /* -1 if invalid */
253  char *origin_name;
255 
256 /*
257  * The action to be taken for the changes in the transaction.
258  *
259  * TRANS_LEADER_APPLY:
260  * This action means that we are in the leader apply worker or table sync
261  * worker. The changes of the transaction are either directly applied or
262  * are read from temporary files (for streaming transactions) and then
263  * applied by the worker.
264  *
265  * TRANS_LEADER_SERIALIZE:
266  * This action means that we are in the leader apply worker or table sync
267  * worker. Changes are written to temporary files and then applied when the
268  * final commit arrives.
269  *
270  * TRANS_LEADER_SEND_TO_PARALLEL:
271  * This action means that we are in the leader apply worker and need to send
272  * the changes to the parallel apply worker.
273  *
274  * TRANS_LEADER_PARTIAL_SERIALIZE:
275  * This action means that we are in the leader apply worker and have sent some
276  * changes directly to the parallel apply worker and the remaining changes are
277  * serialized to a file, due to timeout while sending data. The parallel apply
278  * worker will apply these serialized changes when the final commit arrives.
279  *
280  * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
281  * serializing changes, the leader worker also needs to serialize the
282  * STREAM_XXX message to a file, and wait for the parallel apply worker to
283  * finish the transaction when processing the transaction finish command. So
284  * this new action was introduced to keep the code and logic clear.
285  *
286  * TRANS_PARALLEL_APPLY:
287  * This action means that we are in the parallel apply worker and changes of
288  * the transaction are applied directly by the worker.
289  */
290 typedef enum
291 {
292  /* The action for non-streaming transactions. */
294 
295  /* Actions for streaming transactions. */
301 
302 /* errcontext tracker */
304 {
305  .command = 0,
306  .rel = NULL,
307  .remote_attnum = -1,
308  .remote_xid = InvalidTransactionId,
309  .finish_lsn = InvalidXLogRecPtr,
310  .origin_name = NULL,
311 };
312 
314 
317 
318 /* per stream context for streaming transactions */
320 
322 
324 static bool MySubscriptionValid = false;
325 
327 
330 
331 /* fields valid only when processing streamed transaction */
332 static bool in_streamed_transaction = false;
333 
335 
336 /*
337  * The number of changes applied by parallel apply worker during one streaming
338  * block.
339  */
341 
342 /* Are we initializing an apply worker? */
344 
345 /*
346  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
347  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
348  * Once we start skipping changes, we don't stop it until we skip all changes of
349  * the transaction even if pg_subscription is updated and MySubscription->skiplsn
350  * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
351  * we don't skip receiving and spooling the changes since we decide whether or not
352  * to skip applying the changes when starting to apply changes. The subskiplsn is
353  * cleared after successfully skipping the transaction or applying non-empty
354  * transaction. The latter prevents the mistakenly specified subskiplsn from
355  * being left. Note that we cannot skip the streaming transactions when using
356  * parallel apply workers because we cannot get the finish LSN before applying
357  * the changes. So, we don't start parallel apply worker when finish LSN is set
358  * by the user.
359  */
361 #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
362 
363 /* BufFile handle of the current streaming file */
364 static BufFile *stream_fd = NULL;
365 
366 typedef struct SubXactInfo
367 {
368  TransactionId xid; /* XID of the subxact */
369  int fileno; /* file number in the buffile */
370  off_t offset; /* offset in the file */
372 
373 /* Sub-transaction data for the current streaming transaction */
374 typedef struct ApplySubXactData
375 {
376  uint32 nsubxacts; /* number of sub-transactions */
377  uint32 nsubxacts_max; /* current capacity of subxacts */
378  TransactionId subxact_last; /* xid of the last sub-transaction */
379  SubXactInfo *subxacts; /* sub-xact offset in changes file */
381 
383 
384 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
385 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
386 
387 /*
388  * Information about subtransactions of a given toplevel transaction.
389  */
390 static void subxact_info_write(Oid subid, TransactionId xid);
391 static void subxact_info_read(Oid subid, TransactionId xid);
392 static void subxact_info_add(TransactionId xid);
393 static inline void cleanup_subxact_info(void);
394 
395 /*
396  * Serialize and deserialize changes for a toplevel transaction.
397  */
398 static void stream_open_file(Oid subid, TransactionId xid,
399  bool first_segment);
400 static void stream_write_change(char action, StringInfo s);
402 static void stream_close_file(void);
403 
404 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
405 
406 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
408  ResultRelInfo *relinfo,
409  TupleTableSlot *remoteslot);
411  ResultRelInfo *relinfo,
412  TupleTableSlot *remoteslot,
413  LogicalRepTupleData *newtup,
414  Oid localindexoid);
416  ResultRelInfo *relinfo,
417  TupleTableSlot *remoteslot,
418  Oid localindexoid);
419 static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
420  LogicalRepRelation *remoterel,
421  Oid localidxoid,
422  TupleTableSlot *remoteslot,
423  TupleTableSlot **localslot);
425  TupleTableSlot *remoteslot,
426  LogicalRepTupleData *newtup,
427  CmdType operation);
428 
429 /* Compute GID for two_phase transactions */
430 static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
431 
432 /* Functions for skipping changes */
433 static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
434 static void stop_skipping_changes(void);
435 static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
436 
437 /* Functions for apply error callback */
438 static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
439 static inline void reset_apply_error_context_info(void);
440 
442  ParallelApplyWorkerInfo **winfo);
443 
444 /*
445  * Form the origin name for the subscription.
446  *
447  * This is a common function for tablesync and other workers. Tablesync workers
448  * must pass a valid relid. Other callers must pass relid = InvalidOid.
449  *
450  * Return the name in the supplied buffer.
451  */
452 void
454  char *originname, Size szoriginname)
455 {
456  if (OidIsValid(relid))
457  {
458  /* Replication origin name for tablesync workers. */
459  snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
460  }
461  else
462  {
463  /* Replication origin name for non-tablesync workers. */
464  snprintf(originname, szoriginname, "pg_%u", suboid);
465  }
466 }
467 
468 /*
469  * Should this worker apply changes for given relation.
470  *
471  * This is mainly needed for initial relation data sync as that runs in
472  * separate worker process running in parallel and we need some way to skip
473  * changes coming to the leader apply worker during the sync of a table.
474  *
475  * Note we need to do smaller or equals comparison for SYNCDONE state because
476  * it might hold position of end of initial slot consistent point WAL
477  * record + 1 (ie start of next record) and next record can be COMMIT of
478  * transaction we are now processing (which is what we set remote_final_lsn
479  * to in apply_handle_begin).
480  *
481  * Note that for streaming transactions that are being applied in the parallel
482  * apply worker, we disallow applying changes if the target table in the
483  * subscription is not in the READY state, because we cannot decide whether to
484  * apply the change as we won't know remote_final_lsn by that time.
485  *
486  * We already checked this in pa_can_start() before assigning the
487  * streaming transaction to the parallel worker, but it also needs to be
488  * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
489  * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
490  * while applying this transaction.
491  */
492 static bool
494 {
495  switch (MyLogicalRepWorker->type)
496  {
498  return MyLogicalRepWorker->relid == rel->localreloid;
499 
501  /* We don't synchronize rel's that are in unknown state. */
502  if (rel->state != SUBREL_STATE_READY &&
503  rel->state != SUBREL_STATE_UNKNOWN)
504  ereport(ERROR,
505  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
506  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
508  errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
509 
510  return rel->state == SUBREL_STATE_READY;
511 
512  case WORKERTYPE_APPLY:
513  return (rel->state == SUBREL_STATE_READY ||
514  (rel->state == SUBREL_STATE_SYNCDONE &&
515  rel->statelsn <= remote_final_lsn));
516 
517  case WORKERTYPE_UNKNOWN:
518  /* Should never happen. */
519  elog(ERROR, "Unknown worker type");
520  }
521 
522  return false; /* dummy for compiler */
523 }
524 
525 /*
526  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
527  *
528  * Start a transaction, if this is the first step (else we keep using the
529  * existing transaction).
530  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
531  */
532 static void
534 {
536 
537  if (!IsTransactionState())
538  {
541  }
542 
544 
546 }
547 
548 /*
549  * Finish up one step of a replication transaction.
550  * Callers of begin_replication_step() must also call this.
551  *
552  * We don't close out the transaction here, but we should increment
553  * the command counter to make the effects of this step visible.
554  */
555 static void
557 {
559 
561 }
562 
563 /*
564  * Handle streamed transactions for both the leader apply worker and the
565  * parallel apply workers.
566  *
567  * In the streaming case (receiving a block of the streamed transaction), for
568  * serialize mode, simply redirect it to a file for the proper toplevel
569  * transaction, and for parallel mode, the leader apply worker will send the
570  * changes to parallel apply workers and the parallel apply worker will define
571  * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
572  * messages will be applied by both leader apply worker and parallel apply
573  * workers).
574  *
575  * Returns true for streamed transactions (when the change is either serialized
576  * to file or sent to parallel apply worker), false otherwise (regular mode or
577  * needs to be processed by parallel apply worker).
578  *
579  * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
580  * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
581  * to a parallel apply worker.
582  */
583 static bool
585 {
586  TransactionId current_xid;
588  TransApplyAction apply_action;
589  StringInfoData original_msg;
590 
591  apply_action = get_transaction_apply_action(stream_xid, &winfo);
592 
593  /* not in streaming mode */
594  if (apply_action == TRANS_LEADER_APPLY)
595  return false;
596 
598 
599  /*
600  * The parallel apply worker needs the xid in this message to decide
601  * whether to define a savepoint, so save the original message that has
602  * not moved the cursor after the xid. We will serialize this message to a
603  * file in PARTIAL_SERIALIZE mode.
604  */
605  original_msg = *s;
606 
607  /*
608  * We should have received XID of the subxact as the first part of the
609  * message, so extract it.
610  */
611  current_xid = pq_getmsgint(s, 4);
612 
613  if (!TransactionIdIsValid(current_xid))
614  ereport(ERROR,
615  (errcode(ERRCODE_PROTOCOL_VIOLATION),
616  errmsg_internal("invalid transaction ID in streamed replication transaction")));
617 
618  switch (apply_action)
619  {
621  Assert(stream_fd);
622 
623  /* Add the new subxact to the array (unless already there). */
624  subxact_info_add(current_xid);
625 
626  /* Write the change to the current file */
628  return true;
629 
631  Assert(winfo);
632 
633  /*
634  * XXX The publisher side doesn't always send relation/type update
635  * messages after the streaming transaction, so also update the
636  * relation/type in leader apply worker. See function
637  * cleanup_rel_sync_cache.
638  */
639  if (pa_send_data(winfo, s->len, s->data))
640  return (action != LOGICAL_REP_MSG_RELATION &&
642 
643  /*
644  * Switch to serialize mode when we are not able to send the
645  * change to parallel apply worker.
646  */
647  pa_switch_to_partial_serialize(winfo, false);
648 
649  /* fall through */
651  stream_write_change(action, &original_msg);
652 
653  /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
654  return (action != LOGICAL_REP_MSG_RELATION &&
656 
659 
660  /* Define a savepoint for a subxact if needed. */
661  pa_start_subtrans(current_xid, stream_xid);
662  return false;
663 
664  default:
665  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
666  return false; /* silence compiler warning */
667  }
668 }
669 
670 /*
671  * Executor state preparation for evaluation of constraint expressions,
672  * indexes and triggers for the specified relation.
673  *
674  * Note that the caller must open and close any indexes to be updated.
675  */
676 static ApplyExecutionData *
678 {
679  ApplyExecutionData *edata;
680  EState *estate;
681  RangeTblEntry *rte;
682  List *perminfos = NIL;
683  ResultRelInfo *resultRelInfo;
684 
685  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
686  edata->targetRel = rel;
687 
688  edata->estate = estate = CreateExecutorState();
689 
690  rte = makeNode(RangeTblEntry);
691  rte->rtekind = RTE_RELATION;
692  rte->relid = RelationGetRelid(rel->localrel);
693  rte->relkind = rel->localrel->rd_rel->relkind;
695 
696  addRTEPermissionInfo(&perminfos, rte);
697 
698  ExecInitRangeTable(estate, list_make1(rte), perminfos);
699 
700  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
701 
702  /*
703  * Use Relation opened by logicalrep_rel_open() instead of opening it
704  * again.
705  */
706  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
707 
708  /*
709  * We put the ResultRelInfo in the es_opened_result_relations list, even
710  * though we don't populate the es_result_relations array. That's a bit
711  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
712  *
713  * ExecOpenIndices() is not called here either, each execution path doing
714  * an apply operation being responsible for that.
715  */
717  lappend(estate->es_opened_result_relations, resultRelInfo);
718 
719  estate->es_output_cid = GetCurrentCommandId(true);
720 
721  /* Prepare to catch AFTER triggers. */
723 
724  /* other fields of edata remain NULL for now */
725 
726  return edata;
727 }
728 
729 /*
730  * Finish any operations related to the executor state created by
731  * create_edata_for_relation().
732  */
733 static void
735 {
736  EState *estate = edata->estate;
737 
738  /* Handle any queued AFTER triggers. */
739  AfterTriggerEndQuery(estate);
740 
741  /* Shut down tuple routing, if any was done. */
742  if (edata->proute)
743  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
744 
745  /*
746  * Cleanup. It might seem that we should call ExecCloseResultRelations()
747  * here, but we intentionally don't. It would close the rel we added to
748  * es_opened_result_relations above, which is wrong because we took no
749  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
750  * any other relations opened during execution.
751  */
752  ExecResetTupleTable(estate->es_tupleTable, false);
753  FreeExecutorState(estate);
754  pfree(edata);
755 }
756 
757 /*
758  * Executes default values for columns for which we can't map to remote
759  * relation columns.
760  *
761  * This allows us to support tables which have more columns on the downstream
762  * than on the upstream.
763  */
764 static void
766  TupleTableSlot *slot)
767 {
768  TupleDesc desc = RelationGetDescr(rel->localrel);
769  int num_phys_attrs = desc->natts;
770  int i;
771  int attnum,
772  num_defaults = 0;
773  int *defmap;
774  ExprState **defexprs;
775  ExprContext *econtext;
776 
777  econtext = GetPerTupleExprContext(estate);
778 
779  /* We got all the data via replication, no need to evaluate anything. */
780  if (num_phys_attrs == rel->remoterel.natts)
781  return;
782 
783  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
784  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
785 
786  Assert(rel->attrmap->maplen == num_phys_attrs);
787  for (attnum = 0; attnum < num_phys_attrs; attnum++)
788  {
789  Expr *defexpr;
790 
791  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
792  continue;
793 
794  if (rel->attrmap->attnums[attnum] >= 0)
795  continue;
796 
797  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
798 
799  if (defexpr != NULL)
800  {
801  /* Run the expression through planner */
802  defexpr = expression_planner(defexpr);
803 
804  /* Initialize executable expression in copycontext */
805  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
806  defmap[num_defaults] = attnum;
807  num_defaults++;
808  }
809  }
810 
811  for (i = 0; i < num_defaults; i++)
812  slot->tts_values[defmap[i]] =
813  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
814 }
815 
816 /*
817  * Store tuple data into slot.
818  *
819  * Incoming data can be either text or binary format.
820  */
821 static void
823  LogicalRepTupleData *tupleData)
824 {
825  int natts = slot->tts_tupleDescriptor->natts;
826  int i;
827 
828  ExecClearTuple(slot);
829 
830  /* Call the "in" function for each non-dropped, non-null attribute */
831  Assert(natts == rel->attrmap->maplen);
832  for (i = 0; i < natts; i++)
833  {
835  int remoteattnum = rel->attrmap->attnums[i];
836 
837  if (!att->attisdropped && remoteattnum >= 0)
838  {
839  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
840 
841  Assert(remoteattnum < tupleData->ncols);
842 
843  /* Set attnum for error callback */
845 
846  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
847  {
848  Oid typinput;
849  Oid typioparam;
850 
851  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
852  slot->tts_values[i] =
853  OidInputFunctionCall(typinput, colvalue->data,
854  typioparam, att->atttypmod);
855  slot->tts_isnull[i] = false;
856  }
857  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
858  {
859  Oid typreceive;
860  Oid typioparam;
861 
862  /*
863  * In some code paths we may be asked to re-parse the same
864  * tuple data. Reset the StringInfo's cursor so that works.
865  */
866  colvalue->cursor = 0;
867 
868  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
869  slot->tts_values[i] =
870  OidReceiveFunctionCall(typreceive, colvalue,
871  typioparam, att->atttypmod);
872 
873  /* Trouble if it didn't eat the whole buffer */
874  if (colvalue->cursor != colvalue->len)
875  ereport(ERROR,
876  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
877  errmsg("incorrect binary data format in logical replication column %d",
878  remoteattnum + 1)));
879  slot->tts_isnull[i] = false;
880  }
881  else
882  {
883  /*
884  * NULL value from remote. (We don't expect to see
885  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
886  * NULL.)
887  */
888  slot->tts_values[i] = (Datum) 0;
889  slot->tts_isnull[i] = true;
890  }
891 
892  /* Reset attnum for error callback */
894  }
895  else
896  {
897  /*
898  * We assign NULL to dropped attributes and missing values
899  * (missing values should be later filled using
900  * slot_fill_defaults).
901  */
902  slot->tts_values[i] = (Datum) 0;
903  slot->tts_isnull[i] = true;
904  }
905  }
906 
907  ExecStoreVirtualTuple(slot);
908 }
909 
910 /*
911  * Replace updated columns with data from the LogicalRepTupleData struct.
912  * This is somewhat similar to heap_modify_tuple but also calls the type
913  * input functions on the user data.
914  *
915  * "slot" is filled with a copy of the tuple in "srcslot", replacing
916  * columns provided in "tupleData" and leaving others as-is.
917  *
918  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
919  * storage for "srcslot". This is OK for current usage, but someday we may
920  * need to materialize "slot" at the end to make it independent of "srcslot".
921  */
/*
 * NOTE(review): this doxygen-derived listing is missing several embedded
 * source lines (923-924, the function name and leading signature -- presumably
 * slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, ...) -- and
 * 946, 959, 1004, which likely held the per-column attribute lookup and the
 * error-callback attnum bookkeeping).  Confirm against upstream worker.c
 * before editing.
 */
922 static void
925  LogicalRepTupleData *tupleData)
926 {
927  int natts = slot->tts_tupleDescriptor->natts;
928  int i;
929 
930  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
931  ExecClearTuple(slot);
932 
933  /*
934  * Copy all the column data from srcslot, so that we'll have valid values
935  * for unreplaced columns.
936  */
937  Assert(natts == srcslot->tts_tupleDescriptor->natts);
938  slot_getallattrs(srcslot);
939  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
940  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
941 
942  /* Call the "in" function for each replaced attribute */
943  Assert(natts == rel->attrmap->maplen)
944  for (i = 0; i < natts; i++)
945  {
/* NOTE(review): dropped line 946 presumably initialized "att" (the
 * Form_pg_attribute for column i) used below -- confirm upstream. */
947  int remoteattnum = rel->attrmap->attnums[i];
948 
949  if (remoteattnum < 0)
950  continue;
951 
952  Assert(remoteattnum < tupleData->ncols);
953 
954  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
955  {
956  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
957 
958  /* Set attnum for error callback */
960 
961  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
962  {
963  Oid typinput;
964  Oid typioparam;
965 
966  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
967  slot->tts_values[i] =
968  OidInputFunctionCall(typinput, colvalue->data,
969  typioparam, att->atttypmod);
970  slot->tts_isnull[i] = false;
971  }
972  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
973  {
974  Oid typreceive;
975  Oid typioparam;
976 
977  /*
978  * In some code paths we may be asked to re-parse the same
979  * tuple data. Reset the StringInfo's cursor so that works.
980  */
981  colvalue->cursor = 0;
982 
983  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
984  slot->tts_values[i] =
985  OidReceiveFunctionCall(typreceive, colvalue,
986  typioparam, att->atttypmod);
987 
988  /* Trouble if it didn't eat the whole buffer */
989  if (colvalue->cursor != colvalue->len)
990  ereport(ERROR,
991  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
992  errmsg("incorrect binary data format in logical replication column %d",
993  remoteattnum + 1)));
994  slot->tts_isnull[i] = false;
995  }
996  else
997  {
998  /* must be LOGICALREP_COLUMN_NULL */
999  slot->tts_values[i] = (Datum) 0;
1000  slot->tts_isnull[i] = true;
1001  }
1002 
1003  /* Reset attnum for error callback */
1005  }
1006  }
1007 
1008  /* And finally, declare that "slot" contains a valid virtual tuple */
1009  ExecStoreVirtualTuple(slot);
1010 }
1011 
1012 /*
1013  * Handle BEGIN message.
1014  */
/*
 * NOTE(review): dropped embedded lines 1016 (the function name -- presumably
 * apply_handle_begin(StringInfo s)), 1021 (an assertion that no streaming
 * transaction is active), 1028 and 1032 (two statements, likely skip-check
 * and activity reporting).  Confirm against upstream worker.c.
 */
1015 static void
1017 {
1018  LogicalRepBeginData begin_data;
1019 
1020  /* There must not be an active streaming transaction. */
1022 
1023  logicalrep_read_begin(s, &begin_data);
1024  set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1025 
1026  remote_final_lsn = begin_data.final_lsn;
1027 
1029 
1030  in_remote_transaction = true;
1031 
1033 }
1034 
1035 /*
1036  * Handle COMMIT message.
1037  *
1038  * TODO, support tracking of multiple origins
1039  */
/*
 * NOTE(review): dropped embedded lines 1041 (function name -- presumably
 * apply_handle_commit(StringInfo s)), 1052 (the second LSN_FORMAT_ARGS
 * argument of the errmsg_internal call) and 1059-1060 (trailing statements).
 * Confirm against upstream worker.c.
 */
1040 static void
1042 {
1043  LogicalRepCommitData commit_data;
1044 
1045  logicalrep_read_commit(s, &commit_data);
1046 
1047  if (commit_data.commit_lsn != remote_final_lsn)
1048  ereport(ERROR,
1049  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1050  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1051  LSN_FORMAT_ARGS(commit_data.commit_lsn),
1053 
1054  apply_handle_commit_internal(&commit_data);
1055 
1056  /* Process any tables that are being synchronized in parallel. */
1057  process_syncing_tables(commit_data.end_lsn);
1058 
1061 }
1062 
1063 /*
1064  * Handle BEGIN PREPARE message.
1065  */
/*
 * NOTE(review): dropped embedded lines 1067 (function name -- presumably
 * apply_handle_begin_prepare(StringInfo s)), 1078 (streaming-transaction
 * assertion), 1085 and 1089 (statements).  Confirm against upstream worker.c.
 */
1066 static void
1068 {
1069  LogicalRepPreparedTxnData begin_data;
1070 
1071  /* Tablesync should never receive prepare. */
1072  if (am_tablesync_worker())
1073  ereport(ERROR,
1074  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1075  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1076 
1077  /* There must not be an active streaming transaction. */
1079 
1080  logicalrep_read_begin_prepare(s, &begin_data);
1081  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1082 
1083  remote_final_lsn = begin_data.prepare_lsn;
1084 
1086 
1087  in_remote_transaction = true;
1088 
1090 }
1091 
1092 /*
1093  * Common function to prepare the GID.
1094  */
/*
 * NOTE(review): dropped embedded lines 1096 (function name -- presumably
 * apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)),
 * 1115 (the BeginTransactionBlock() call referenced by the comment above it),
 * 1124 and 1126 (likely origin-timestamp assignment and the
 * PrepareTransactionBlock(gid) call).  Confirm against upstream worker.c.
 */
1095 static void
1097 {
1098  char gid[GIDSIZE];
1099 
1100  /*
1101  * Compute unique GID for two_phase transactions. We don't use GID of
1102  * prepared transaction sent by server as that can lead to deadlock when
1103  * we have multiple subscriptions from same node point to publications on
1104  * the same node. See comments atop worker.c
1105  */
1106  TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1107  gid, sizeof(gid));
1108 
1109  /*
1110  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1111  * called within the PrepareTransactionBlock below.
1112  */
1113  if (!IsTransactionBlock())
1114  {
1116  CommitTransactionCommand(); /* Completes the preceding Begin command. */
1117  }
1118 
1119  /*
1120  * Update origin state so we can restart streaming from correct position
1121  * in case of crash.
1122  */
1123  replorigin_session_origin_lsn = prepare_data->end_lsn;
1125 
1127 }
1128 
1129 /*
1130  * Handle PREPARE message.
1131  */
/*
 * NOTE(review): dropped embedded lines 1133 (function name -- presumably
 * apply_handle_prepare(StringInfo s)), 1144 (second LSN format argument),
 * 1156, 1160-1161, 1164, 1177-1178 and 1180-1181 (statements around the
 * prepare, stats flush, origin/activity reporting and skip-LSN handling).
 * Confirm against upstream worker.c.
 */
1132 static void
1134 {
1135  LogicalRepPreparedTxnData prepare_data;
1136 
1137  logicalrep_read_prepare(s, &prepare_data);
1138 
1139  if (prepare_data.prepare_lsn != remote_final_lsn)
1140  ereport(ERROR,
1141  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1142  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1143  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1145 
1146  /*
1147  * Unlike commit, here, we always prepare the transaction even though no
1148  * change has happened in this transaction or all changes are skipped. It
1149  * is done this way because at commit prepared time, we won't know whether
1150  * we have skipped preparing a transaction because of those reasons.
1151  *
1152  * XXX, We can optimize such that at commit prepared time, we first check
1153  * whether we have prepared the transaction or not but that doesn't seem
1154  * worthwhile because such cases shouldn't be common.
1155  */
1157 
1158  apply_handle_prepare_internal(&prepare_data);
1159 
1162  pgstat_report_stat(false);
1163 
1165 
1166  in_remote_transaction = false;
1167 
1168  /* Process any tables that are being synchronized in parallel. */
1169  process_syncing_tables(prepare_data.end_lsn);
1170 
1171  /*
1172  * Since we have already prepared the transaction, in a case where the
1173  * server crashes before clearing the subskiplsn, it will be left but the
1174  * transaction won't be resent. But that's okay because it's a rare case
1175  * and the subskiplsn will be cleared when finishing the next transaction.
1176  */
1179 
1182 }
1183 
1184 /*
1185  * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1186  *
1187  * Note that we don't need to wait here if the transaction was prepared in a
1188  * parallel apply worker. In that case, we have already waited for the prepare
1189  * to finish in apply_handle_stream_prepare() which will ensure all the
1190  * operations in that transaction have happened in the subscriber, so no
1191  * concurrent transaction can cause deadlock or transaction dependency issues.
1192  */
/*
 * NOTE(review): dropped embedded lines 1194 (function name -- presumably
 * apply_handle_commit_prepared(StringInfo s)), 1203 (first line of the
 * TwoPhaseTransactionGid call), 1207, 1214, 1217-1218, 1221 and 1229-1230
 * (transaction begin/commit, origin timestamp, store-flush-position and
 * activity-reporting statements).  Confirm against upstream worker.c.
 */
1193 static void
1195 {
1196  LogicalRepCommitPreparedTxnData prepare_data;
1197  char gid[GIDSIZE];
1198 
1199  logicalrep_read_commit_prepared(s, &prepare_data);
1200  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1201 
1202  /* Compute GID for two_phase transactions. */
1204  gid, sizeof(gid));
1205 
1206  /* There is no transaction when COMMIT PREPARED is called */
1208 
1209  /*
1210  * Update origin state so we can restart streaming from correct position
1211  * in case of crash.
1212  */
1213  replorigin_session_origin_lsn = prepare_data.end_lsn;
1215 
1216  FinishPreparedTransaction(gid, true);
1219  pgstat_report_stat(false);
1220 
1222  in_remote_transaction = false;
1223 
1224  /* Process any tables that are being synchronized in parallel. */
1225  process_syncing_tables(prepare_data.end_lsn);
1226 
1227  clear_subscription_skip_lsn(prepare_data.end_lsn);
1228 
1231 }
1232 
1233 /*
1234  * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1235  *
1236  * Note that we don't need to wait here if the transaction was prepared in a
1237  * parallel apply worker. In that case, we have already waited for the prepare
1238  * to finish in apply_handle_stream_prepare() which will ensure all the
1239  * operations in that transaction have happened in the subscriber, so no
1240  * concurrent transaction can cause deadlock or transaction dependency issues.
1241  */
/*
 * NOTE(review): dropped embedded lines 1243 (function name -- presumably
 * apply_handle_rollback_prepared(StringInfo s)), 1268-1269, 1272, 1274-1275,
 * 1277, 1282, 1286 and 1288-1289 (origin-state updates, the transaction
 * begin/commit around FinishPreparedTransaction, store-flush-position,
 * process_syncing_tables and activity-reporting statements).  Confirm
 * against upstream worker.c.
 */
1242 static void
1244 {
1245  LogicalRepRollbackPreparedTxnData rollback_data;
1246  char gid[GIDSIZE];
1247 
1248  logicalrep_read_rollback_prepared(s, &rollback_data);
1249  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1250 
1251  /* Compute GID for two_phase transactions. */
1252  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1253  gid, sizeof(gid));
1254 
1255  /*
1256  * It is possible that we haven't received prepare because it occurred
1257  * before walsender reached a consistent point or the two_phase was still
1258  * not enabled by that time, so in such cases, we need to skip rollback
1259  * prepared.
1260  */
1261  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1262  rollback_data.prepare_time))
1263  {
1264  /*
1265  * Update origin state so we can restart streaming from correct
1266  * position in case of crash.
1267  */
1270 
1271  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1273  FinishPreparedTransaction(gid, false);
1276 
1278  }
1279 
1280  pgstat_report_stat(false);
1281 
1283  in_remote_transaction = false;
1284 
1285  /* Process any tables that are being synchronized in parallel. */
1287 
1290 }
1291 
1292 /*
1293  * Handle STREAM PREPARE.
1294  */
/*
 * NOTE(review): dropped embedded lines include 1296 (function name --
 * presumably apply_handle_stream_prepare(StringInfo s)), 1305 (the
 * in_streamed_transaction check guarding the first ereport), 1329
 * (apply_spooled_messages call), several case labels (1347, 1364 -- the
 * TRANS_LEADER_SEND_TO_PARALLEL / TRANS_LEADER_PARTIAL_SERIALIZE labels) and
 * assorted statements (1335, 1337, 1342, 1368, 1371, 1384, 1386, 1391,
 * 1393, 1395, 1397-1398, 1400, 1419-1420, 1422, 1424).  Confirm against
 * upstream worker.c before editing.
 */
1295 static void
1297 {
1298  LogicalRepPreparedTxnData prepare_data;
1299  ParallelApplyWorkerInfo *winfo;
1300  TransApplyAction apply_action;
1301 
1302  /* Save the message before it is consumed. */
1303  StringInfoData original_msg = *s;
1304 
1306  ereport(ERROR,
1307  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1308  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1309 
1310  /* Tablesync should never receive prepare. */
1311  if (am_tablesync_worker())
1312  ereport(ERROR,
1313  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1314  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1315 
1316  logicalrep_read_stream_prepare(s, &prepare_data);
1317  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1318 
1319  apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1320 
1321  switch (apply_action)
1322  {
1323  case TRANS_LEADER_APPLY:
1324 
1325  /*
1326  * The transaction has been serialized to file, so replay all the
1327  * spooled operations.
1328  */
1330  prepare_data.xid, prepare_data.prepare_lsn);
1331 
1332  /* Mark the transaction as prepared. */
1333  apply_handle_prepare_internal(&prepare_data);
1334 
1336 
1338 
1339  in_remote_transaction = false;
1340 
1341  /* Unlink the files with serialized changes and subxact info. */
1343 
1344  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1345  break;
1346 
1348  Assert(winfo);
1349 
1350  if (pa_send_data(winfo, s->len, s->data))
1351  {
1352  /* Finish processing the streaming transaction. */
1353  pa_xact_finish(winfo, prepare_data.end_lsn);
1354  break;
1355  }
1356 
1357  /*
1358  * Switch to serialize mode when we are not able to send the
1359  * change to parallel apply worker.
1360  */
1361  pa_switch_to_partial_serialize(winfo, true);
1362 
1363  /* fall through */
1365  Assert(winfo);
1366 
1367  stream_open_and_write_change(prepare_data.xid,
1369  &original_msg);
1370 
1372 
1373  /* Finish processing the streaming transaction. */
1374  pa_xact_finish(winfo, prepare_data.end_lsn);
1375  break;
1376 
1377  case TRANS_PARALLEL_APPLY:
1378 
1379  /*
1380  * If the parallel apply worker is applying spooled messages then
1381  * close the file before preparing.
1382  */
1383  if (stream_fd)
1385 
1387 
1388  /* Mark the transaction as prepared. */
1389  apply_handle_prepare_internal(&prepare_data);
1390 
1392 
1394 
1396 
1399 
1401 
1402  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1403  break;
1404 
1405  default:
1406  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1407  break;
1408  }
1409 
1410  pgstat_report_stat(false);
1411 
1412  /* Process any tables that are being synchronized in parallel. */
1413  process_syncing_tables(prepare_data.end_lsn);
1414 
1415  /*
1416  * Similar to prepare case, the subskiplsn could be left in a case of
1417  * server crash but it's okay. See the comments in apply_handle_prepare().
1418  */
1421 
1423 
1425 }
1426 
1427 /*
1428  * Handle ORIGIN message.
1429  *
1430  * TODO, support tracking of multiple origins
1431  */
/*
 * NOTE(review): dropped embedded lines 1433 (function name -- presumably
 * apply_handle_origin(StringInfo s)) and 1440-1441 (the remainder of the
 * condition, likely the in-remote-transaction / before-any-writes checks
 * described by the comment).  Confirm against upstream worker.c.
 */
1432 static void
1434 {
1435  /*
1436  * ORIGIN message can only come inside streaming transaction or inside
1437  * remote transaction and before any actual writes.
1438  */
1439  if (!in_streamed_transaction &&
1442  ereport(ERROR,
1443  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1444  errmsg_internal("ORIGIN message sent out of order")));
1445 }
1446 
1447 /*
1448  * Initialize fileset (if not already done).
1449  *
1450  * Create a new file when first_segment is true, otherwise open the existing
1451  * file.
1452  */
/*
 * NOTE(review): dropped embedded lines 1456, 1466, 1470, 1472-1473, 1483 and
 * 1485 (likely an assertion, the stream_fileset NULL check, the memory
 * context switch and fileset initialization, and the subxact-file open).
 * Confirm against upstream worker.c.
 */
1453 void
1454 stream_start_internal(TransactionId xid, bool first_segment)
1455 {
1457 
1458  /*
1459  * Initialize the worker's stream_fileset if we haven't yet. This will be
1460  * used for the entire duration of the worker so create it in a permanent
1461  * context. We create this on the very first streaming message from any
1462  * transaction and then use it for this and other streaming transactions.
1463  * Now, we could create a fileset at the start of the worker as well but
1464  * then we won't be sure that it will ever be used.
1465  */
1467  {
1468  MemoryContext oldctx;
1469 
1471 
1474 
1475  MemoryContextSwitchTo(oldctx);
1476  }
1477 
1478  /* Open the spool file for this transaction. */
1479  stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1480 
1481  /* If this is not the first segment, open existing subxact file. */
1482  if (!first_segment)
1484 
1486 }
1487 
1488 /*
1489  * Handle STREAM START message.
1490  */
/*
 * NOTE(review): dropped embedded lines include 1492 (function name --
 * presumably apply_handle_stream_start(StringInfo s)), 1501 (the
 * in_streamed_transaction check), 1507, 1515 (XID validity check), 1520,
 * 1524, the case labels 1530, 1543, 1579 (TRANS_LEADER_* labels), and
 * statements 1559, 1565, 1568, 1590, 1593, 1600-1601, 1607, 1610, 1618.
 * Confirm against upstream worker.c before editing.
 */
1491 static void
1493 {
1494  bool first_segment;
1495  ParallelApplyWorkerInfo *winfo;
1496  TransApplyAction apply_action;
1497 
1498  /* Save the message before it is consumed. */
1499  StringInfoData original_msg = *s;
1500 
1502  ereport(ERROR,
1503  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1504  errmsg_internal("duplicate STREAM START message")));
1505 
1506  /* There must not be an active streaming transaction. */
1508 
1509  /* notify handle methods we're processing a remote transaction */
1510  in_streamed_transaction = true;
1511 
1512  /* extract XID of the top-level transaction */
1513  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1514 
1516  ereport(ERROR,
1517  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1518  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1519 
1521 
1522  /* Try to allocate a worker for the streaming transaction. */
1523  if (first_segment)
1525 
1526  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1527 
1528  switch (apply_action)
1529  {
1531 
1532  /*
1533  * Function stream_start_internal starts a transaction. This
1534  * transaction will be committed on the stream stop unless it is a
1535  * tablesync worker in which case it will be committed after
1536  * processing all the messages. We need this transaction for
1537  * handling the BufFile, used for serializing the streaming data
1538  * and subxact info.
1539  */
1540  stream_start_internal(stream_xid, first_segment);
1541  break;
1542 
1544  Assert(winfo);
1545 
1546  /*
1547  * Once we start serializing the changes, the parallel apply
1548  * worker will wait for the leader to release the stream lock
1549  * until the end of the transaction. So, we don't need to release
1550  * the lock or increment the stream count in that case.
1551  */
1552  if (pa_send_data(winfo, s->len, s->data))
1553  {
1554  /*
1555  * Unlock the shared object lock so that the parallel apply
1556  * worker can continue to receive changes.
1557  */
1558  if (!first_segment)
1560 
1561  /*
1562  * Increment the number of streaming blocks waiting to be
1563  * processed by parallel apply worker.
1564  */
1566 
1567  /* Cache the parallel apply worker for this transaction. */
1569  break;
1570  }
1571 
1572  /*
1573  * Switch to serialize mode when we are not able to send the
1574  * change to parallel apply worker.
1575  */
1576  pa_switch_to_partial_serialize(winfo, !first_segment);
1577 
1578  /* fall through */
1580  Assert(winfo);
1581 
1582  /*
1583  * Open the spool file unless it was already opened when switching
1584  * to serialize mode. The transaction started in
1585  * stream_start_internal will be committed on the stream stop.
1586  */
1587  if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1588  stream_start_internal(stream_xid, first_segment);
1589 
1591 
1592  /* Cache the parallel apply worker for this transaction. */
1594  break;
1595 
1596  case TRANS_PARALLEL_APPLY:
1597  if (first_segment)
1598  {
1599  /* Hold the lock until the end of the transaction. */
1602 
1603  /*
1604  * Signal the leader apply worker, as it may be waiting for
1605  * us.
1606  */
1608  }
1609 
1611  break;
1612 
1613  default:
1614  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1615  break;
1616  }
1617 
1619 }
1620 
1621 /*
1622  * Update the information about subxacts and close the file.
1623  *
1624  * This function should be called when the stream_start_internal function has
1625  * been called.
1626  */
/*
 * NOTE(review): dropped embedded lines 1628 (the signature line --
 * presumably stream_stop_internal(TransactionId xid)), 1634-1635, 1638,
 * 1641 and 1644 (the subxact-info write / stream_close_file, transaction
 * state assertion, CommitTransactionCommand and context reset called out by
 * the surviving comments).  Confirm against upstream worker.c.
 */
1627 void
1629 {
1630  /*
1631  * Serialize information about subxacts for the toplevel transaction, then
1632  * close the stream messages spool file.
1633  */
1636 
1637  /* We must be in a valid transaction state */
1639 
1640  /* Commit the per-stream transaction */
1642 
1643  /* Reset per-stream context */
1645 }
1646 
1647 /*
1648  * Handle STREAM STOP message.
1649  */
/*
 * NOTE(review): dropped embedded lines include 1651 (function name --
 * presumably apply_handle_stream_stop(StringInfo s)), 1656 (the
 * in_streamed_transaction check), the case labels 1665-1666, 1669 and
 * 1693-1696 (TRANS_LEADER_* labels and their statements), plus statements
 * 1678, 1682, 1701, 1726, 1735, 1741-1742, 1744 and 1746.  Confirm against
 * upstream worker.c before editing.
 */
1650 static void
1652 {
1653  ParallelApplyWorkerInfo *winfo;
1654  TransApplyAction apply_action;
1655 
1657  ereport(ERROR,
1658  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1659  errmsg_internal("STREAM STOP message without STREAM START")));
1660 
1661  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1662 
1663  switch (apply_action)
1664  {
1667  break;
1668 
1670  Assert(winfo);
1671 
1672  /*
1673  * Lock before sending the STREAM_STOP message so that the leader
1674  * can hold the lock first and the parallel apply worker will wait
1675  * for leader to release the lock. See Locking Considerations atop
1676  * applyparallelworker.c.
1677  */
1679 
1680  if (pa_send_data(winfo, s->len, s->data))
1681  {
1683  break;
1684  }
1685 
1686  /*
1687  * Switch to serialize mode when we are not able to send the
1688  * change to parallel apply worker.
1689  */
1690  pa_switch_to_partial_serialize(winfo, true);
1691 
1692  /* fall through */
1697  break;
1698 
1699  case TRANS_PARALLEL_APPLY:
1700  elog(DEBUG1, "applied %u changes in the streaming chunk",
1702 
1703  /*
1704  * By the time parallel apply worker is processing the changes in
1705  * the current streaming block, the leader apply worker may have
1706  * sent multiple streaming blocks. This can lead to parallel apply
1707  * worker start waiting even when there are more chunk of streams
1708  * in the queue. So, try to lock only if there is no message left
1709  * in the queue. See Locking Considerations atop
1710  * applyparallelworker.c.
1711  *
1712  * Note that here we have a race condition where we can start
1713  * waiting even when there are pending streaming chunks. This can
1714  * happen if the leader sends another streaming block and acquires
1715  * the stream lock again after the parallel apply worker checks
1716  * that there is no pending streaming block and before it actually
1717  * starts waiting on a lock. We can handle this case by not
1718  * allowing the leader to increment the stream block count during
1719  * the time parallel apply worker acquires the lock but it is not
1720  * clear whether that is worth the complexity.
1721  *
1722  * Now, if this missed chunk contains rollback to savepoint, then
1723  * there is a risk of deadlock which probably shouldn't happen
1724  * after restart.
1725  */
1727  break;
1728 
1729  default:
1730  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1731  break;
1732  }
1733 
1734  in_streamed_transaction = false;
1736 
1737  /*
1738  * The parallel apply worker could be in a transaction in which case we
1739  * need to report the state as STATE_IDLEINTRANSACTION.
1740  */
1743  else
1745 
1747 }
1748 
1749 /*
1750  * Helper function to handle STREAM ABORT message when the transaction was
1751  * serialized to file.
1752  */
/*
 * NOTE(review): dropped embedded lines include 1754 (the signature line --
 * presumably stream_abort_internal(TransactionId xid, TransactionId subxid)),
 * 1761 (the stream_cleanup_files call for a toplevel abort), 1786-1787
 * (subxact info load), 1805-1808 (subxact cleanup statements), 1813-1814
 * (changes_filename / BufFileOpenFileSet), 1818 (BufFileTruncateFileSet)
 * and 1826-1829 (subxact-info write and cleanup).  Confirm against upstream
 * worker.c before editing.
 */
1753 static void
1755 {
1756  /*
1757  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1758  * just delete the files with serialized info.
1759  */
1760  if (xid == subxid)
1762  else
1763  {
1764  /*
1765  * OK, so it's a subxact. We need to read the subxact file for the
1766  * toplevel transaction, determine the offset tracked for the subxact,
1767  * and truncate the file with changes. We also remove the subxacts
1768  * with higher offsets (or rather higher XIDs).
1769  *
1770  * We intentionally scan the array from the tail, because we're likely
1771  * aborting a change for the most recent subtransactions.
1772  *
1773  * We can't use the binary search here as subxact XIDs won't
1774  * necessarily arrive in sorted order, consider the case where we have
1775  * released the savepoint for multiple subtransactions and then
1776  * performed rollback to savepoint for one of the earlier
1777  * sub-transaction.
1778  */
1779  int64 i;
1780  int64 subidx;
1781  BufFile *fd;
1782  bool found = false;
1783  char path[MAXPGPATH];
1784 
1785  subidx = -1;
1788 
1789  for (i = subxact_data.nsubxacts; i > 0; i--)
1790  {
1791  if (subxact_data.subxacts[i - 1].xid == subxid)
1792  {
1793  subidx = (i - 1);
1794  found = true;
1795  break;
1796  }
1797  }
1798 
1799  /*
1800  * If it's an empty sub-transaction then we will not find the subxid
1801  * here so just cleanup the subxact info and return.
1802  */
1803  if (!found)
1804  {
1805  /* Cleanup the subxact info */
1809  return;
1810  }
1811 
1812  /* open the changes file */
1815  O_RDWR, false);
1816 
1817  /* OK, truncate the file at the right offset */
1819  subxact_data.subxacts[subidx].offset);
1820  BufFileClose(fd);
1821 
1822  /* discard the subxacts added later */
1823  subxact_data.nsubxacts = subidx;
1824 
1825  /* write the updated subxact list */
1827 
1830  }
1831 }
1832 
1833 /*
1834  * Handle STREAM ABORT message.
1835  */
/*
 * NOTE(review): dropped embedded lines include 1837 (function name --
 * presumably apply_handle_stream_abort(StringInfo s)), 1849 (the
 * in_streamed_transaction check), 1856 (second argument of
 * logicalrep_read_stream_abort), the case labels 1879 and 1939
 * (TRANS_LEADER_SEND_TO_PARALLEL / TRANS_LEADER_PARTIAL_SERIALIZE), and
 * statements 1899-1901, 1927, 1947, 1952-1953, 1964, 1977 and 1987.
 * Confirm against upstream worker.c before editing.
 */
1836 static void
1838 {
1839  TransactionId xid;
1840  TransactionId subxid;
1841  LogicalRepStreamAbortData abort_data;
1842  ParallelApplyWorkerInfo *winfo;
1843  TransApplyAction apply_action;
1844 
1845  /* Save the message before it is consumed. */
1846  StringInfoData original_msg = *s;
1847  bool toplevel_xact;
1848 
1850  ereport(ERROR,
1851  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1852  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1853 
1854  /* We receive abort information only when we can apply in parallel. */
1855  logicalrep_read_stream_abort(s, &abort_data,
1857 
1858  xid = abort_data.xid;
1859  subxid = abort_data.subxid;
1860  toplevel_xact = (xid == subxid);
1861 
1862  set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1863 
1864  apply_action = get_transaction_apply_action(xid, &winfo);
1865 
1866  switch (apply_action)
1867  {
1868  case TRANS_LEADER_APPLY:
1869 
1870  /*
1871  * We are in the leader apply worker and the transaction has been
1872  * serialized to file.
1873  */
1874  stream_abort_internal(xid, subxid);
1875 
1876  elog(DEBUG1, "finished processing the STREAM ABORT command");
1877  break;
1878 
1880  Assert(winfo);
1881 
1882  /*
1883  * For the case of aborting the subtransaction, we increment the
1884  * number of streaming blocks and take the lock again before
1885  * sending the STREAM_ABORT to ensure that the parallel apply
1886  * worker will wait on the lock for the next set of changes after
1887  * processing the STREAM_ABORT message if it is not already
1888  * waiting for STREAM_STOP message.
1889  *
1890  * It is important to perform this locking before sending the
1891  * STREAM_ABORT message so that the leader can hold the lock first
1892  * and the parallel apply worker will wait for the leader to
1893  * release the lock. This is the same as what we do in
1894  * apply_handle_stream_stop. See Locking Considerations atop
1895  * applyparallelworker.c.
1896  */
1897  if (!toplevel_xact)
1898  {
1902  }
1903 
1904  if (pa_send_data(winfo, s->len, s->data))
1905  {
1906  /*
1907  * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1908  * wait here for the parallel apply worker to finish as that
1909  * is not required to maintain the commit order and won't have
1910  * the risk of failures due to transaction dependencies and
1911  * deadlocks. However, it is possible that before the parallel
1912  * worker finishes and we clear the worker info, the xid
1913  * wraparound happens on the upstream and a new transaction
1914  * with the same xid can appear and that can lead to duplicate
1915  * entries in ParallelApplyTxnHash. Yet another problem could
1916  * be that we may have serialized the changes in partial
1917  * serialize mode and the file containing xact changes may
1918  * already exist, and after xid wraparound trying to create
1919  * the file for the same xid can lead to an error. To avoid
1920  * these problems, we decide to wait for the aborts to finish.
1921  *
1922  * Note, it is okay to not update the flush location position
1923  * for aborts as in worst case that means such a transaction
1924  * won't be sent again after restart.
1925  */
1926  if (toplevel_xact)
1928 
1929  break;
1930  }
1931 
1932  /*
1933  * Switch to serialize mode when we are not able to send the
1934  * change to parallel apply worker.
1935  */
1936  pa_switch_to_partial_serialize(winfo, true);
1937 
1938  /* fall through */
1940  Assert(winfo);
1941 
1942  /*
1943  * Parallel apply worker might have applied some changes, so write
1944  * the STREAM_ABORT message so that it can rollback the
1945  * subtransaction if needed.
1946  */
1948  &original_msg);
1949 
1950  if (toplevel_xact)
1951  {
1954  }
1955  break;
1956 
1957  case TRANS_PARALLEL_APPLY:
1958 
1959  /*
1960  * If the parallel apply worker is applying spooled messages then
1961  * close the file before aborting.
1962  */
1963  if (toplevel_xact && stream_fd)
1965 
1966  pa_stream_abort(&abort_data);
1967 
1968  /*
1969  * We need to wait after processing rollback to savepoint for the
1970  * next set of changes.
1971  *
1972  * We have a race condition here due to which we can start waiting
1973  * here when there are more chunk of streams in the queue. See
1974  * apply_handle_stream_stop.
1975  */
1976  if (!toplevel_xact)
1978 
1979  elog(DEBUG1, "finished processing the STREAM ABORT command")
1980  break;
1981 
1982  default:
1983  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1984  break;
1985  }
1986 
1988 }
1989 
1990 /*
1991  * Ensure that the passed location is fileset's end.
1992  */
/*
 * NOTE(review): dropped embedded lines 2002, 2004, 2006 and 2015 (likely an
 * assertion, the CHECK_FOR_INTERRUPTS / memory housekeeping, the
 * changes_filename() call that fills "path" before it is used in
 * BufFileOpenFileSet, and a statement after BufFileClose).  Confirm against
 * upstream worker.c.
 */
1993 static void
1994 ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1995  off_t offset)
1996 {
1997  char path[MAXPGPATH];
1998  BufFile *fd;
1999  int last_fileno;
2000  off_t last_offset;
2001 
2003 
2005 
2007 
2008  fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2009 
2010  BufFileSeek(fd, 0, 0, SEEK_END);
2011  BufFileTell(fd, &last_fileno, &last_offset);
2012 
2013  BufFileClose(fd);
2014 
2016 
2017  if (last_fileno != fileno || last_offset != offset)
2018  elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2019  path);
2020 }
2021 
2022 /*
2023  * Common spoolfile processing.
2024  */
/*
 * NOTE(review): dropped embedded lines include 2026 (the signature line --
 * presumably apply_spooled_messages(FileSet *stream_fileset, TransactionId
 * xid, ...)), 2038, 2041 (transaction start), 2048 (the
 * MemoryContextSwitchTo(TopTransactionContext) assigning "oldcxt"), 2051
 * (the changes_filename() call that fills "path"), 2059, 2076, 2078, 2087
 * (the StringInfoData s2 declaration), 2091, 2117, 2121 and 2144.  Confirm
 * against upstream worker.c before editing.
 */
2025 void
2027  XLogRecPtr lsn)
2028 {
2029  int nchanges;
2030  char path[MAXPGPATH];
2031  char *buffer = NULL;
2032  MemoryContext oldcxt;
2033  ResourceOwner oldowner;
2034  int fileno;
2035  off_t offset;
2036 
2037  if (!am_parallel_apply_worker())
2039 
2040  /* Make sure we have an open transaction */
2042 
2043  /*
2044  * Allocate file handle and memory required to process all the messages in
2045  * TopTransactionContext to avoid them getting reset after each message is
2046  * processed.
2047  */
2049 
2050  /* Open the spool file for the committed/prepared transaction */
2052  elog(DEBUG1, "replaying changes from file \"%s\"", path);
2053 
2054  /*
2055  * Make sure the file is owned by the toplevel transaction so that the
2056  * file will not be accidentally closed when aborting a subtransaction.
2057  */
2058  oldowner = CurrentResourceOwner;
2060 
2061  stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2062 
2063  CurrentResourceOwner = oldowner;
2064 
2065  buffer = palloc(BLCKSZ);
2066 
2067  MemoryContextSwitchTo(oldcxt);
2068 
2069  remote_final_lsn = lsn;
2070 
2071  /*
2072  * Make sure the handle apply_dispatch methods are aware we're in a remote
2073  * transaction.
2074  */
2075  in_remote_transaction = true;
2077 
2079 
2080  /*
2081  * Read the entries one by one and pass them through the same logic as in
2082  * apply_dispatch.
2083  */
2084  nchanges = 0;
2085  while (true)
2086  {
2088  size_t nbytes;
2089  int len;
2090 
2092 
2093  /* read length of the on-disk record */
2094  nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2095 
2096  /* have we reached end of the file? */
2097  if (nbytes == 0)
2098  break;
2099 
2100  /* do we have a correct length? */
2101  if (len <= 0)
2102  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2103  len, path);
2104 
2105  /* make sure we have sufficiently large buffer */
2106  buffer = repalloc(buffer, len);
2107 
2108  /* and finally read the data into the buffer */
2109  BufFileReadExact(stream_fd, buffer, len);
2110 
2111  BufFileTell(stream_fd, &fileno, &offset);
2112 
2113  /* init a stringinfo using the buffer and call apply_dispatch */
2114  initReadOnlyStringInfo(&s2, buffer, len);
2115 
2116  /* Ensure we are reading the data into our memory context. */
2118 
2119  apply_dispatch(&s2);
2120 
2122 
2123  MemoryContextSwitchTo(oldcxt);
2124 
2125  nchanges++;
2126 
2127  /*
2128  * It is possible the file has been closed because we have processed
2129  * the transaction end message like stream_commit in which case that
2130  * must be the last message.
2131  */
2132  if (!stream_fd)
2133  {
2134  ensure_last_message(stream_fileset, xid, fileno, offset);
2135  break;
2136  }
2137 
2138  if (nchanges % 1000 == 0)
2139  elog(DEBUG1, "replayed %d changes from file \"%s\"",
2140  nchanges, path);
2141  }
2142 
2143  if (stream_fd)
2145 
2146  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2147  nchanges, path);
2148 
2149  return;
2150 }
2151 
2152 /*
2153  * Handle STREAM COMMIT message.
2154  */
2155 static void
2157 {
2158  TransactionId xid;
2159  LogicalRepCommitData commit_data;
2160  ParallelApplyWorkerInfo *winfo;
2161  TransApplyAction apply_action;
2162 
2163  /* Save the message before it is consumed. */
2164  StringInfoData original_msg = *s;
2165 
2167  ereport(ERROR,
2168  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2169  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2170 
2171  xid = logicalrep_read_stream_commit(s, &commit_data);
2172  set_apply_error_context_xact(xid, commit_data.commit_lsn);
2173 
2174  apply_action = get_transaction_apply_action(xid, &winfo);
2175 
2176  switch (apply_action)
2177  {
2178  case TRANS_LEADER_APPLY:
2179 
2180  /*
2181  * The transaction has been serialized to file, so replay all the
2182  * spooled operations.
2183  */
2185  commit_data.commit_lsn);
2186 
2187  apply_handle_commit_internal(&commit_data);
2188 
2189  /* Unlink the files with serialized changes and subxact info. */
2191 
2192  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2193  break;
2194 
2196  Assert(winfo);
2197 
2198  if (pa_send_data(winfo, s->len, s->data))
2199  {
2200  /* Finish processing the streaming transaction. */
2201  pa_xact_finish(winfo, commit_data.end_lsn);
2202  break;
2203  }
2204 
2205  /*
2206  * Switch to serialize mode when we are not able to send the
2207  * change to parallel apply worker.
2208  */
2209  pa_switch_to_partial_serialize(winfo, true);
2210 
2211  /* fall through */
2213  Assert(winfo);
2214 
2216  &original_msg);
2217 
2219 
2220  /* Finish processing the streaming transaction. */
2221  pa_xact_finish(winfo, commit_data.end_lsn);
2222  break;
2223 
2224  case TRANS_PARALLEL_APPLY:
2225 
2226  /*
2227  * If the parallel apply worker is applying spooled messages then
2228  * close the file before committing.
2229  */
2230  if (stream_fd)
2232 
2233  apply_handle_commit_internal(&commit_data);
2234 
2236 
2237  /*
2238  * It is important to set the transaction state as finished before
2239  * releasing the lock. See pa_wait_for_xact_finish.
2240  */
2243 
2245 
2246  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2247  break;
2248 
2249  default:
2250  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2251  break;
2252  }
2253 
2254  /* Process any tables that are being synchronized in parallel. */
2255  process_syncing_tables(commit_data.end_lsn);
2256 
2258 
2260 }
2261 
2262 /*
2263  * Helper function for apply_handle_commit and apply_handle_stream_commit.
2264  */
2265 static void
2267 {
2268  if (is_skipping_changes())
2269  {
2271 
2272  /*
2273  * Start a new transaction to clear the subskiplsn, if not started
2274  * yet.
2275  */
2276  if (!IsTransactionState())
2278  }
2279 
2280  if (IsTransactionState())
2281  {
2282  /*
2283  * The transaction is either non-empty or skipped, so we clear the
2284  * subskiplsn.
2285  */
2287 
2288  /*
2289  * Update origin state so we can restart streaming from correct
2290  * position in case of crash.
2291  */
2292  replorigin_session_origin_lsn = commit_data->end_lsn;
2294 
2296 
2297  if (IsTransactionBlock())
2298  {
2299  EndTransactionBlock(false);
2301  }
2302 
2303  pgstat_report_stat(false);
2304 
2306  }
2307  else
2308  {
2309  /* Process any invalidation messages that might have accumulated. */
2312  }
2313 
2314  in_remote_transaction = false;
2315 }
2316 
2317 /*
2318  * Handle RELATION message.
2319  *
2320  * Note we don't do validation against local schema here. The validation
2321  * against local schema is postponed until first change for given relation
2322  * comes as we only care about it when applying changes for it anyway and we
2323  * do less locking this way.
2324  */
2325 static void
2327 {
2328  LogicalRepRelation *rel;
2329 
2331  return;
2332 
2333  rel = logicalrep_read_rel(s);
2335 
2336  /* Also reset all entries in the partition map that refer to remoterel. */
2338 }
2339 
2340 /*
2341  * Handle TYPE message.
2342  *
2343  * This implementation pays no attention to TYPE messages; we expect the user
2344  * to have set things up so that the incoming data is acceptable to the input
2345  * functions for the locally subscribed tables. Hence, we just read and
2346  * discard the message.
2347  */
2348 static void
2350 {
2351  LogicalRepTyp typ;
2352 
2354  return;
2355 
2356  logicalrep_read_typ(s, &typ);
2357 }
2358 
2359 /*
2360  * Check that we (the subscription owner) have sufficient privileges on the
2361  * target relation to perform the given operation.
2362  */
2363 static void
2365 {
2366  Oid relid;
2367  AclResult aclresult;
2368 
2369  relid = RelationGetRelid(rel);
2370  aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2371  if (aclresult != ACLCHECK_OK)
2372  aclcheck_error(aclresult,
2373  get_relkind_objtype(rel->rd_rel->relkind),
2374  get_rel_name(relid));
2375 
2376  /*
2377  * We lack the infrastructure to honor RLS policies. It might be possible
2378  * to add such infrastructure here, but tablesync workers lack it, too, so
2379  * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2380  * but it seems dangerous to replicate a TRUNCATE and then refuse to
2381  * replicate subsequent INSERTs, so we forbid all commands the same.
2382  */
2383  if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2384  ereport(ERROR,
2385  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2386  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2387  GetUserNameFromId(GetUserId(), true),
2388  RelationGetRelationName(rel))));
2389 }
2390 
2391 /*
2392  * Handle INSERT message.
2393  */
2394 
2395 static void
2397 {
2398  LogicalRepRelMapEntry *rel;
2399  LogicalRepTupleData newtup;
2400  LogicalRepRelId relid;
2401  UserContext ucxt;
2402  ApplyExecutionData *edata;
2403  EState *estate;
2404  TupleTableSlot *remoteslot;
2405  MemoryContext oldctx;
2406  bool run_as_owner;
2407 
2408  /*
2409  * Quick return if we are skipping data modification changes or handling
2410  * streamed transactions.
2411  */
2412  if (is_skipping_changes() ||
2414  return;
2415 
2417 
2418  relid = logicalrep_read_insert(s, &newtup);
2419  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2420  if (!should_apply_changes_for_rel(rel))
2421  {
2422  /*
2423  * The relation can't become interesting in the middle of the
2424  * transaction so it's safe to unlock it.
2425  */
2428  return;
2429  }
2430 
2431  /*
2432  * Make sure that any user-supplied code runs as the table owner, unless
2433  * the user has opted out of that behavior.
2434  */
2435  run_as_owner = MySubscription->runasowner;
2436  if (!run_as_owner)
2437  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2438 
2439  /* Set relation for error callback */
2441 
2442  /* Initialize the executor state. */
2443  edata = create_edata_for_relation(rel);
2444  estate = edata->estate;
2445  remoteslot = ExecInitExtraTupleSlot(estate,
2446  RelationGetDescr(rel->localrel),
2447  &TTSOpsVirtual);
2448 
2449  /* Process and store remote tuple in the slot */
2451  slot_store_data(remoteslot, rel, &newtup);
2452  slot_fill_defaults(rel, estate, remoteslot);
2453  MemoryContextSwitchTo(oldctx);
2454 
2455  /* For a partitioned table, insert the tuple into a partition. */
2456  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2458  remoteslot, NULL, CMD_INSERT);
2459  else
2461  remoteslot);
2462 
2463  finish_edata(edata);
2464 
2465  /* Reset relation for error callback */
2467 
2468  if (!run_as_owner)
2469  RestoreUserContext(&ucxt);
2470 
2472 
2474 }
2475 
2476 /*
2477  * Workhorse for apply_handle_insert()
2478  * relinfo is for the relation we're actually inserting into
2479  * (could be a child partition of edata->targetRelInfo)
2480  */
2481 static void
2483  ResultRelInfo *relinfo,
2484  TupleTableSlot *remoteslot)
2485 {
2486  EState *estate = edata->estate;
2487 
2488  /* We must open indexes here. */
2489  ExecOpenIndices(relinfo, false);
2490 
2491  /* Do the insert. */
2493  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2494 
2495  /* Cleanup. */
2496  ExecCloseIndices(relinfo);
2497 }
2498 
2499 /*
2500  * Check if the logical replication relation is updatable and throw
2501  * appropriate error if it isn't.
2502  */
2503 static void
2505 {
2506  /*
2507  * For partitioned tables, we only need to care if the target partition is
2508  * updatable (aka has PK or RI defined for it).
2509  */
2510  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2511  return;
2512 
2513  /* Updatable, no error. */
2514  if (rel->updatable)
2515  return;
2516 
2517  /*
2518  * We are in error mode so it's fine this is somewhat slow. It's better to
2519  * give user correct error.
2520  */
2522  {
2523  ereport(ERROR,
2524  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2525  errmsg("publisher did not send replica identity column "
2526  "expected by the logical replication target relation \"%s.%s\"",
2527  rel->remoterel.nspname, rel->remoterel.relname)));
2528  }
2529 
2530  ereport(ERROR,
2531  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2532  errmsg("logical replication target relation \"%s.%s\" has "
2533  "neither REPLICA IDENTITY index nor PRIMARY "
2534  "KEY and published relation does not have "
2535  "REPLICA IDENTITY FULL",
2536  rel->remoterel.nspname, rel->remoterel.relname)));
2537 }
2538 
2539 /*
2540  * Handle UPDATE message.
2541  *
2542  * TODO: FDW support
2543  */
2544 static void
2546 {
2547  LogicalRepRelMapEntry *rel;
2548  LogicalRepRelId relid;
2549  UserContext ucxt;
2550  ApplyExecutionData *edata;
2551  EState *estate;
2552  LogicalRepTupleData oldtup;
2553  LogicalRepTupleData newtup;
2554  bool has_oldtup;
2555  TupleTableSlot *remoteslot;
2556  RTEPermissionInfo *target_perminfo;
2557  MemoryContext oldctx;
2558  bool run_as_owner;
2559 
2560  /*
2561  * Quick return if we are skipping data modification changes or handling
2562  * streamed transactions.
2563  */
2564  if (is_skipping_changes() ||
2566  return;
2567 
2569 
2570  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2571  &newtup);
2572  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2573  if (!should_apply_changes_for_rel(rel))
2574  {
2575  /*
2576  * The relation can't become interesting in the middle of the
2577  * transaction so it's safe to unlock it.
2578  */
2581  return;
2582  }
2583 
2584  /* Set relation for error callback */
2586 
2587  /* Check if we can do the update. */
2589 
2590  /*
2591  * Make sure that any user-supplied code runs as the table owner, unless
2592  * the user has opted out of that behavior.
2593  */
2594  run_as_owner = MySubscription->runasowner;
2595  if (!run_as_owner)
2596  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2597 
2598  /* Initialize the executor state. */
2599  edata = create_edata_for_relation(rel);
2600  estate = edata->estate;
2601  remoteslot = ExecInitExtraTupleSlot(estate,
2602  RelationGetDescr(rel->localrel),
2603  &TTSOpsVirtual);
2604 
2605  /*
2606  * Populate updatedCols so that per-column triggers can fire, and so
2607  * executor can correctly pass down indexUnchanged hint. This could
2608  * include more columns than were actually changed on the publisher
2609  * because the logical replication protocol doesn't contain that
2610  * information. But it would for example exclude columns that only exist
2611  * on the subscriber, since we are not touching those.
2612  */
2613  target_perminfo = list_nth(estate->es_rteperminfos, 0);
2614  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2615  {
2617  int remoteattnum = rel->attrmap->attnums[i];
2618 
2619  if (!att->attisdropped && remoteattnum >= 0)
2620  {
2621  Assert(remoteattnum < newtup.ncols);
2622  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2623  target_perminfo->updatedCols =
2624  bms_add_member(target_perminfo->updatedCols,
2626  }
2627  }
2628 
2629  /* Build the search tuple. */
2631  slot_store_data(remoteslot, rel,
2632  has_oldtup ? &oldtup : &newtup);
2633  MemoryContextSwitchTo(oldctx);
2634 
2635  /* For a partitioned table, apply update to correct partition. */
2636  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2638  remoteslot, &newtup, CMD_UPDATE);
2639  else
2641  remoteslot, &newtup, rel->localindexoid);
2642 
2643  finish_edata(edata);
2644 
2645  /* Reset relation for error callback */
2647 
2648  if (!run_as_owner)
2649  RestoreUserContext(&ucxt);
2650 
2652 
2654 }
2655 
2656 /*
2657  * Workhorse for apply_handle_update()
2658  * relinfo is for the relation we're actually updating in
2659  * (could be a child partition of edata->targetRelInfo)
2660  */
2661 static void
2663  ResultRelInfo *relinfo,
2664  TupleTableSlot *remoteslot,
2665  LogicalRepTupleData *newtup,
2666  Oid localindexoid)
2667 {
2668  EState *estate = edata->estate;
2669  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2670  Relation localrel = relinfo->ri_RelationDesc;
2671  EPQState epqstate;
2672  TupleTableSlot *localslot;
2673  bool found;
2674  MemoryContext oldctx;
2675 
2676  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2677  ExecOpenIndices(relinfo, false);
2678 
2679  found = FindReplTupleInLocalRel(edata, localrel,
2680  &relmapentry->remoterel,
2681  localindexoid,
2682  remoteslot, &localslot);
2683  ExecClearTuple(remoteslot);
2684 
2685  /*
2686  * Tuple found.
2687  *
2688  * Note this will fail if there are other conflicting unique indexes.
2689  */
2690  if (found)
2691  {
2692  /* Process and store remote tuple in the slot */
2694  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2695  MemoryContextSwitchTo(oldctx);
2696 
2697  EvalPlanQualSetSlot(&epqstate, remoteslot);
2698 
2699  /* Do the actual update. */
2701  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2702  remoteslot);
2703  }
2704  else
2705  {
2706  /*
2707  * The tuple to be updated could not be found. Do nothing except for
2708  * emitting a log message.
2709  *
2710  * XXX should this be promoted to ereport(LOG) perhaps?
2711  */
2712  elog(DEBUG1,
2713  "logical replication did not find row to be updated "
2714  "in replication target relation \"%s\"",
2715  RelationGetRelationName(localrel));
2716  }
2717 
2718  /* Cleanup. */
2719  ExecCloseIndices(relinfo);
2720  EvalPlanQualEnd(&epqstate);
2721 }
2722 
2723 /*
2724  * Handle DELETE message.
2725  *
2726  * TODO: FDW support
2727  */
2728 static void
2730 {
2731  LogicalRepRelMapEntry *rel;
2732  LogicalRepTupleData oldtup;
2733  LogicalRepRelId relid;
2734  UserContext ucxt;
2735  ApplyExecutionData *edata;
2736  EState *estate;
2737  TupleTableSlot *remoteslot;
2738  MemoryContext oldctx;
2739  bool run_as_owner;
2740 
2741  /*
2742  * Quick return if we are skipping data modification changes or handling
2743  * streamed transactions.
2744  */
2745  if (is_skipping_changes() ||
2747  return;
2748 
2750 
2751  relid = logicalrep_read_delete(s, &oldtup);
2752  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2753  if (!should_apply_changes_for_rel(rel))
2754  {
2755  /*
2756  * The relation can't become interesting in the middle of the
2757  * transaction so it's safe to unlock it.
2758  */
2761  return;
2762  }
2763 
2764  /* Set relation for error callback */
2766 
2767  /* Check if we can do the delete. */
2769 
2770  /*
2771  * Make sure that any user-supplied code runs as the table owner, unless
2772  * the user has opted out of that behavior.
2773  */
2774  run_as_owner = MySubscription->runasowner;
2775  if (!run_as_owner)
2776  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2777 
2778  /* Initialize the executor state. */
2779  edata = create_edata_for_relation(rel);
2780  estate = edata->estate;
2781  remoteslot = ExecInitExtraTupleSlot(estate,
2782  RelationGetDescr(rel->localrel),
2783  &TTSOpsVirtual);
2784 
2785  /* Build the search tuple. */
2787  slot_store_data(remoteslot, rel, &oldtup);
2788  MemoryContextSwitchTo(oldctx);
2789 
2790  /* For a partitioned table, apply delete to correct partition. */
2791  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2793  remoteslot, NULL, CMD_DELETE);
2794  else
2796  remoteslot, rel->localindexoid);
2797 
2798  finish_edata(edata);
2799 
2800  /* Reset relation for error callback */
2802 
2803  if (!run_as_owner)
2804  RestoreUserContext(&ucxt);
2805 
2807 
2809 }
2810 
2811 /*
2812  * Workhorse for apply_handle_delete()
2813  * relinfo is for the relation we're actually deleting from
2814  * (could be a child partition of edata->targetRelInfo)
2815  */
2816 static void
2818  ResultRelInfo *relinfo,
2819  TupleTableSlot *remoteslot,
2820  Oid localindexoid)
2821 {
2822  EState *estate = edata->estate;
2823  Relation localrel = relinfo->ri_RelationDesc;
2824  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2825  EPQState epqstate;
2826  TupleTableSlot *localslot;
2827  bool found;
2828 
2829  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2830  ExecOpenIndices(relinfo, false);
2831 
2832  found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2833  remoteslot, &localslot);
2834 
2835  /* If found delete it. */
2836  if (found)
2837  {
2838  EvalPlanQualSetSlot(&epqstate, localslot);
2839 
2840  /* Do the actual delete. */
2842  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2843  }
2844  else
2845  {
2846  /*
2847  * The tuple to be deleted could not be found. Do nothing except for
2848  * emitting a log message.
2849  *
2850  * XXX should this be promoted to ereport(LOG) perhaps?
2851  */
2852  elog(DEBUG1,
2853  "logical replication did not find row to be deleted "
2854  "in replication target relation \"%s\"",
2855  RelationGetRelationName(localrel));
2856  }
2857 
2858  /* Cleanup. */
2859  ExecCloseIndices(relinfo);
2860  EvalPlanQualEnd(&epqstate);
2861 }
2862 
2863 /*
2864  * Try to find a tuple received from the publication side (in 'remoteslot') in
2865  * the corresponding local relation using either replica identity index,
2866  * primary key, index or if needed, sequential scan.
2867  *
2868  * Local tuple, if found, is returned in '*localslot'.
2869  */
2870 static bool
2872  LogicalRepRelation *remoterel,
2873  Oid localidxoid,
2874  TupleTableSlot *remoteslot,
2875  TupleTableSlot **localslot)
2876 {
2877  EState *estate = edata->estate;
2878  bool found;
2879 
2880  /*
2881  * Regardless of the top-level operation, we're performing a read here, so
2882  * check for SELECT privileges.
2883  */
2884  TargetPrivilegesCheck(localrel, ACL_SELECT);
2885 
2886  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2887 
2888  Assert(OidIsValid(localidxoid) ||
2889  (remoterel->replident == REPLICA_IDENTITY_FULL));
2890 
2891  if (OidIsValid(localidxoid))
2892  {
2893 #ifdef USE_ASSERT_CHECKING
2894  Relation idxrel = index_open(localidxoid, AccessShareLock);
2895 
2896  /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2897  Assert(GetRelationIdentityOrPK(idxrel) == localidxoid ||
2899  edata->targetRel->attrmap));
2900  index_close(idxrel, AccessShareLock);
2901 #endif
2902 
2903  found = RelationFindReplTupleByIndex(localrel, localidxoid,
2905  remoteslot, *localslot);
2906  }
2907  else
2908  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2909  remoteslot, *localslot);
2910 
2911  return found;
2912 }
2913 
2914 /*
2915  * This handles insert, update, delete on a partitioned table.
2916  */
2917 static void
2919  TupleTableSlot *remoteslot,
2920  LogicalRepTupleData *newtup,
2921  CmdType operation)
2922 {
2923  EState *estate = edata->estate;
2924  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2925  ResultRelInfo *relinfo = edata->targetRelInfo;
2926  Relation parentrel = relinfo->ri_RelationDesc;
2927  ModifyTableState *mtstate;
2928  PartitionTupleRouting *proute;
2929  ResultRelInfo *partrelinfo;
2930  Relation partrel;
2931  TupleTableSlot *remoteslot_part;
2932  TupleConversionMap *map;
2933  MemoryContext oldctx;
2934  LogicalRepRelMapEntry *part_entry = NULL;
2935  AttrMap *attrmap = NULL;
2936 
2937  /* ModifyTableState is needed for ExecFindPartition(). */
2938  edata->mtstate = mtstate = makeNode(ModifyTableState);
2939  mtstate->ps.plan = NULL;
2940  mtstate->ps.state = estate;
2941  mtstate->operation = operation;
2942  mtstate->resultRelInfo = relinfo;
2943 
2944  /* ... as is PartitionTupleRouting. */
2945  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2946 
2947  /*
2948  * Find the partition to which the "search tuple" belongs.
2949  */
2950  Assert(remoteslot != NULL);
2952  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2953  remoteslot, estate);
2954  Assert(partrelinfo != NULL);
2955  partrel = partrelinfo->ri_RelationDesc;
2956 
2957  /*
2958  * Check for supported relkind. We need this since partitions might be of
2959  * unsupported relkinds; and the set of partitions can change, so checking
2960  * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2961  */
2962  CheckSubscriptionRelkind(partrel->rd_rel->relkind,
2964  RelationGetRelationName(partrel));
2965 
2966  /*
2967  * To perform any of the operations below, the tuple must match the
2968  * partition's rowtype. Convert if needed or just copy, using a dedicated
2969  * slot to store the tuple in any case.
2970  */
2971  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2972  if (remoteslot_part == NULL)
2973  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2974  map = ExecGetRootToChildMap(partrelinfo, estate);
2975  if (map != NULL)
2976  {
2977  attrmap = map->attrMap;
2978  remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
2979  remoteslot_part);
2980  }
2981  else
2982  {
2983  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2984  slot_getallattrs(remoteslot_part);
2985  }
2986  MemoryContextSwitchTo(oldctx);
2987 
2988  /* Check if we can do the update or delete on the leaf partition. */
2989  if (operation == CMD_UPDATE || operation == CMD_DELETE)
2990  {
2991  part_entry = logicalrep_partition_open(relmapentry, partrel,
2992  attrmap);
2993  check_relation_updatable(part_entry);
2994  }
2995 
2996  switch (operation)
2997  {
2998  case CMD_INSERT:
2999  apply_handle_insert_internal(edata, partrelinfo,
3000  remoteslot_part);
3001  break;
3002 
3003  case CMD_DELETE:
3004  apply_handle_delete_internal(edata, partrelinfo,
3005  remoteslot_part,
3006  part_entry->localindexoid);
3007  break;
3008 
3009  case CMD_UPDATE:
3010 
3011  /*
3012  * For UPDATE, depending on whether or not the updated tuple
3013  * satisfies the partition's constraint, perform a simple UPDATE
3014  * of the partition or move the updated tuple into a different
3015  * suitable partition.
3016  */
3017  {
3018  TupleTableSlot *localslot;
3019  ResultRelInfo *partrelinfo_new;
3020  Relation partrel_new;
3021  bool found;
3022 
3023  /* Get the matching local tuple from the partition. */
3024  found = FindReplTupleInLocalRel(edata, partrel,
3025  &part_entry->remoterel,
3026  part_entry->localindexoid,
3027  remoteslot_part, &localslot);
3028  if (!found)
3029  {
3030  /*
3031  * The tuple to be updated could not be found. Do nothing
3032  * except for emitting a log message.
3033  *
3034  * XXX should this be promoted to ereport(LOG) perhaps?
3035  */
3036  elog(DEBUG1,
3037  "logical replication did not find row to be updated "
3038  "in replication target relation's partition \"%s\"",
3039  RelationGetRelationName(partrel));
3040  return;
3041  }
3042 
3043  /*
3044  * Apply the update to the local tuple, putting the result in
3045  * remoteslot_part.
3046  */
3048  slot_modify_data(remoteslot_part, localslot, part_entry,
3049  newtup);
3050  MemoryContextSwitchTo(oldctx);
3051 
3052  /*
3053  * Does the updated tuple still satisfy the current
3054  * partition's constraint?
3055  */
3056  if (!partrel->rd_rel->relispartition ||
3057  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3058  false))
3059  {
3060  /*
3061  * Yes, so simply UPDATE the partition. We don't call
3062  * apply_handle_update_internal() here, which would
3063  * normally do the following work, to avoid repeating some
3064  * work already done above to find the local tuple in the
3065  * partition.
3066  */
3067  EPQState epqstate;
3068 
3069  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3070  ExecOpenIndices(partrelinfo, false);
3071 
3072  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3074  ACL_UPDATE);
3075  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3076  localslot, remoteslot_part);
3077  ExecCloseIndices(partrelinfo);
3078  EvalPlanQualEnd(&epqstate);
3079  }
3080  else
3081  {
3082  /* Move the tuple into the new partition. */
3083 
3084  /*
3085  * New partition will be found using tuple routing, which
3086  * can only occur via the parent table. We might need to
3087  * convert the tuple to the parent's rowtype. Note that
3088  * this is the tuple found in the partition, not the
3089  * original search tuple received by this function.
3090  */
3091  if (map)
3092  {
3093  TupleConversionMap *PartitionToRootMap =
3095  RelationGetDescr(parentrel));
3096 
3097  remoteslot =
3098  execute_attr_map_slot(PartitionToRootMap->attrMap,
3099  remoteslot_part, remoteslot);
3100  }
3101  else
3102  {
3103  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3104  slot_getallattrs(remoteslot);
3105  }
3106 
3107  /* Find the new partition. */
3109  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3110  proute, remoteslot,
3111  estate);
3112  MemoryContextSwitchTo(oldctx);
3113  Assert(partrelinfo_new != partrelinfo);
3114  partrel_new = partrelinfo_new->ri_RelationDesc;
3115 
3116  /* Check that new partition also has supported relkind. */
3117  CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3119  RelationGetRelationName(partrel_new));
3120 
3121  /* DELETE old tuple found in the old partition. */
3122  apply_handle_delete_internal(edata, partrelinfo,
3123  localslot,
3124  part_entry->localindexoid);
3125 
3126  /* INSERT new tuple into the new partition. */
3127 
3128  /*
3129  * Convert the replacement tuple to match the destination
3130  * partition rowtype.
3131  */
3133  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3134  if (remoteslot_part == NULL)
3135  remoteslot_part = table_slot_create(partrel_new,
3136  &estate->es_tupleTable);
3137  map = ExecGetRootToChildMap(partrelinfo_new, estate);
3138  if (map != NULL)
3139  {
3140  remoteslot_part = execute_attr_map_slot(map->attrMap,
3141  remoteslot,
3142  remoteslot_part);
3143  }
3144  else
3145  {
3146  remoteslot_part = ExecCopySlot(remoteslot_part,
3147  remoteslot);
3148  slot_getallattrs(remoteslot);
3149  }
3150  MemoryContextSwitchTo(oldctx);
3151  apply_handle_insert_internal(edata, partrelinfo_new,
3152  remoteslot_part);
3153  }
3154  }
3155  break;
3156 
3157  default:
3158  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3159  break;
3160  }
3161 }
3162 
3163 /*
3164  * Handle TRUNCATE message.
3165  *
3166  * TODO: FDW support
3167  */
3168 static void
3170 {
3171  bool cascade = false;
3172  bool restart_seqs = false;
3173  List *remote_relids = NIL;
3174  List *remote_rels = NIL;
3175  List *rels = NIL;
3176  List *part_rels = NIL;
3177  List *relids = NIL;
3178  List *relids_logged = NIL;
3179  ListCell *lc;
3180  LOCKMODE lockmode = AccessExclusiveLock;
3181 
3182  /*
3183  * Quick return if we are skipping data modification changes or handling
3184  * streamed transactions.
3185  */
3186  if (is_skipping_changes() ||
3188  return;
3189 
3191 
3192  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3193 
3194  foreach(lc, remote_relids)
3195  {
3196  LogicalRepRelId relid = lfirst_oid(lc);
3197  LogicalRepRelMapEntry *rel;
3198 
3199  rel = logicalrep_rel_open(relid, lockmode);
3200  if (!should_apply_changes_for_rel(rel))
3201  {
3202  /*
3203  * The relation can't become interesting in the middle of the
3204  * transaction so it's safe to unlock it.
3205  */
3206  logicalrep_rel_close(rel, lockmode);
3207  continue;
3208  }
3209 
3210  remote_rels = lappend(remote_rels, rel);
3212  rels = lappend(rels, rel->localrel);
3213  relids = lappend_oid(relids, rel->localreloid);
3215  relids_logged = lappend_oid(relids_logged, rel->localreloid);
3216 
3217  /*
3218  * Truncate partitions if we got a message to truncate a partitioned
3219  * table.
3220  */
3221  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3222  {
3223  ListCell *child;
3224  List *children = find_all_inheritors(rel->localreloid,
3225  lockmode,
3226  NULL);
3227 
3228  foreach(child, children)
3229  {
3230  Oid childrelid = lfirst_oid(child);
3231  Relation childrel;
3232 
3233  if (list_member_oid(relids, childrelid))
3234  continue;
3235 
3236  /* find_all_inheritors already got lock */
3237  childrel = table_open(childrelid, NoLock);
3238 
3239  /*
3240  * Ignore temp tables of other backends. See similar code in
3241  * ExecuteTruncate().
3242  */
3243  if (RELATION_IS_OTHER_TEMP(childrel))
3244  {
3245  table_close(childrel, lockmode);
3246  continue;
3247  }
3248 
3250  rels = lappend(rels, childrel);
3251  part_rels = lappend(part_rels, childrel);
3252  relids = lappend_oid(relids, childrelid);
3253  /* Log this relation only if needed for logical decoding */
3254  if (RelationIsLogicallyLogged(childrel))
3255  relids_logged = lappend_oid(relids_logged, childrelid);
3256  }
3257  }
3258  }
3259 
3260  /*
3261  * Even if we used CASCADE on the upstream primary we explicitly default
3262  * to replaying changes without further cascading. This might be later
3263  * changeable with a user specified option.
3264  *
3265  * MySubscription->runasowner tells us whether we want to execute
3266  * replication actions as the subscription owner; the last argument to
3267  * TruncateGuts tells it whether we want to switch to the table owner.
3268  * Those are exactly opposite conditions.
3269  */
3270  ExecuteTruncateGuts(rels,
3271  relids,
3272  relids_logged,
3273  DROP_RESTRICT,
3274  restart_seqs,
3276  foreach(lc, remote_rels)
3277  {
3278  LogicalRepRelMapEntry *rel = lfirst(lc);
3279 
3281  }
3282  foreach(lc, part_rels)
3283  {
3284  Relation rel = lfirst(lc);
3285 
3286  table_close(rel, NoLock);
3287  }
3288 
3290 }
3291 
3292 
3293 /*
3294  * Logical replication protocol message dispatcher.
3295  */
3296 void
3298 {
3300  LogicalRepMsgType saved_command;
3301 
3302  /*
3303  * Set the current command being applied. Since this function can be
3304  * called recursively when applying spooled changes, save the current
3305  * command.
3306  */
3307  saved_command = apply_error_callback_arg.command;
3309 
3310  switch (action)
3311  {
3312  case LOGICAL_REP_MSG_BEGIN:
3313  apply_handle_begin(s);
3314  break;
3315 
3318  break;
3319 
3322  break;
3323 
3326  break;
3327 
3330  break;
3331 
3334  break;
3335 
3338  break;
3339 
3340  case LOGICAL_REP_MSG_TYPE:
3341  apply_handle_type(s);
3342  break;
3343 
3346  break;
3347 
3349 
3350  /*
3351  * Logical replication does not use generic logical messages yet.
3352  * Although, it could be used by other applications that use this
3353  * output plugin.
3354  */
3355  break;
3356 
3359  break;
3360 
3363  break;
3364 
3367  break;
3368 
3371  break;
3372 
3375  break;
3376 
3379  break;
3380 
3383  break;
3384 
3387  break;
3388 
3391  break;
3392 
3393  default:
3394  ereport(ERROR,
3395  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3396  errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3397  }
3398 
3399  /* Reset the current command */
3400  apply_error_callback_arg.command = saved_command;
3401 }
3402 
3403 /*
3404  * Figure out which write/flush positions to report to the walsender process.
3405  *
3406  * We can't simply report back the last LSN the walsender sent us because the
3407  * local transaction might not yet be flushed to disk locally. Instead we
3408  * build a list that associates local with remote LSNs for every commit. When
3409  * reporting back the flush position to the sender we iterate that list and
3410  * check which entries on it are already locally flushed. Those we can report
3411  * as having been flushed.
3412  *
3413  * The have_pending_txes is true if there are outstanding transactions that
3414  * need to be flushed.
3415  */
3416 static void
/*
 * NOTE(review): scrape gap — the first parameter line (presumably
 * "get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,"), the
 * initialization of *write, and the loop header (presumably a
 * dlist_foreach_modify over lsn_mapping) are elided from this extract.
 */
3418  bool *have_pending_txes)
3419 {
3420  dlist_mutable_iter iter;
3421  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3422 
3424  *flush = InvalidXLogRecPtr;
3425 
3427  {
3428  FlushPosition *pos =
3429  dlist_container(FlushPosition, node, iter.cur);
3430 
3431  *write = pos->remote_end;
3432 
/* Entries whose local commit is already flushed can be reported and dropped. */
3433  if (pos->local_end <= local_flush)
3434  {
3435  *flush = pos->remote_end;
3436  dlist_delete(iter.cur);
3437  pfree(pos);
3438  }
3439  else
3440  {
3441  /*
3442  * Don't want to uselessly iterate over the rest of the list which
3443  * could potentially be long. Instead get the last element and
3444  * grab the write position from there.
3445  */
3446  pos = dlist_tail_element(FlushPosition, node,
3447  &lsn_mapping);
3448  *write = pos->remote_end;
3449  *have_pending_txes = true;
3450  return;
3451  }
3452  }
3453 
3454  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3455 }
3456 
3457 /*
3458  * Store current remote/local lsn pair in the tracking list.
3459  */
3460 void
/*
 * NOTE(review): scrape gap — the signature line (presumably
 * "store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)"),
 * the parallel-apply-worker guard condition at the elided line before the
 * early return, and the MemoryContextSwitchTo() calls around the
 * allocation are missing from this extract.
 */
3462 {
3463  FlushPosition *flushpos;
3464 
3465  /*
3466  * Skip for parallel apply workers, because the lsn_mapping is maintained
3467  * by the leader apply worker.
3468  */
3470  return;
3471 
3472  /* Need to do this in permanent context */
3474 
3475  /* Track commit lsn */
3476  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3477  flushpos->local_end = local_lsn;
3478  flushpos->remote_end = remote_lsn;
3479 
3480  dlist_push_tail(&lsn_mapping, &flushpos->node);
3482 }
3483 
3484 
3485 /* Update statistics of the worker. */
3486 static void
3487 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3488 {
/* Record the most recent LSN and the publisher's send timestamp. */
3489  MyLogicalRepWorker->last_lsn = last_lsn;
3490  MyLogicalRepWorker->last_send_time = send_time;
/* NOTE(review): one line is elided here by the scrape (presumably the
 * update of the worker's last-receive timestamp — confirm against the
 * original worker.c). */
3492  if (reply)
3493  {
/* 'reply' is set for keepalive ('k') messages; track them separately. */
3494  MyLogicalRepWorker->reply_lsn = last_lsn;
3495  MyLogicalRepWorker->reply_time = send_time;
3496  }
3497 }
3498 
3499 /*
3500  * Apply main loop.
3501  */
3502 static void
/*
 * NOTE(review): scrape gaps throughout this function — the signature line
 * (presumably "LogicalRepApplyLoop(XLogRecPtr last_received)"), the
 * memory-context creation calls, the walrcv_receive() calls that assign
 * 'len'/'buf'/'fd', the s-buffer initialization, the WaitLatchOrSocket()
 * call that assigns 'rc', and several other statements are elided.  The
 * visible control flow is authentic; the gaps are not empty statements.
 */
3504 {
3505  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3506  bool ping_sent = false;
3507  TimeLineID tli;
3508  ErrorContextCallback errcallback;
3509 
3510  /*
3511  * Init the ApplyMessageContext which we clean up after each replication
3512  * protocol message.
3513  */
3515  "ApplyMessageContext",
3517 
3518  /*
3519  * This memory context is used for per-stream data when the streaming mode
3520  * is enabled. This context is reset on each stream stop.
3521  */
3523  "LogicalStreamingContext",
3525 
3526  /* mark as idle, before starting to loop */
3528 
3529  /*
3530  * Push apply error context callback. Fields will be filled while applying
3531  * a change.
3532  */
3533  errcallback.callback = apply_error_callback;
3534  errcallback.previous = error_context_stack;
3535  error_context_stack = &errcallback;
3537 
3538  /* This outer loop iterates once per wait. */
3539  for (;;)
3540  {
3542  int rc;
3543  int len;
3544  char *buf = NULL;
3545  bool endofstream = false;
3546  long wait_time;
3547 
3549 
3551 
3553 
3554  if (len != 0)
3555  {
3556  /* Loop to process all available data (without blocking). */
3557  for (;;)
3558  {
3560 
3561  if (len == 0)
3562  {
3563  break;
3564  }
3565  else if (len < 0)
3566  {
/* Negative length from the receive call means the upstream closed the stream. */
3567  ereport(LOG,
3568  (errmsg("data stream from publisher has ended")));
3569  endofstream = true;
3570  break;
3571  }
3572  else
3573  {
3574  int c;
3575  StringInfoData s;
3576 
3577  if (ConfigReloadPending)
3578  {
3579  ConfigReloadPending = false;
3581  }
3582 
3583  /* Reset timeout. */
3584  last_recv_timestamp = GetCurrentTimestamp();
3585  ping_sent = false;
3586 
3587  /* Ensure we are reading the data into our memory context. */
3589 
3591 
3592  c = pq_getmsgbyte(&s);
3593 
/* 'w' = XLogData message: parse the header LSNs, then dispatch the payload. */
3594  if (c == 'w')
3595  {
3596  XLogRecPtr start_lsn;
3597  XLogRecPtr end_lsn;
3598  TimestampTz send_time;
3599 
3600  start_lsn = pq_getmsgint64(&s);
3601  end_lsn = pq_getmsgint64(&s);
3602  send_time = pq_getmsgint64(&s);
3603 
3604  if (last_received < start_lsn)
3605  last_received = start_lsn;
3606 
3607  if (last_received < end_lsn)
3608  last_received = end_lsn;
3609 
3610  UpdateWorkerStats(last_received, send_time, false);
3611 
3612  apply_dispatch(&s);
3613  }
/* 'k' = primary keepalive: maybe reply, and record the reply stats. */
3614  else if (c == 'k')
3615  {
3616  XLogRecPtr end_lsn;
3618  bool reply_requested;
3619 
3620  end_lsn = pq_getmsgint64(&s);
3621  timestamp = pq_getmsgint64(&s);
3622  reply_requested = pq_getmsgbyte(&s);
3623 
3624  if (last_received < end_lsn)
3625  last_received = end_lsn;
3626 
3627  send_feedback(last_received, reply_requested, false);
3628  UpdateWorkerStats(last_received, timestamp, true);
3629  }
3630  /* other message types are purposefully ignored */
3631 
3633  }
3634 
3636  }
3637  }
3638 
3639  /* confirm all writes so far */
3640  send_feedback(last_received, false, false);
3641 
3643  {
3644  /*
3645  * If we didn't get any transactions for a while there might be
3646  * unconsumed invalidation messages in the queue, consume them
3647  * now.
3648  */
3651 
3652  /* Process any table synchronization changes. */
3653  process_syncing_tables(last_received);
3654  }
3655 
3656  /* Cleanup the memory. */
3659 
3660  /* Check if we need to exit the streaming loop. */
3661  if (endofstream)
3662  break;
3663 
3664  /*
3665  * Wait for more data or latch. If we have unflushed transactions,
3666  * wake up after WalWriterDelay to see if they've been flushed yet (in
3667  * which case we should send a feedback message). Otherwise, there's
3668  * no particular urgency about waking up unless we get data or a
3669  * signal.
3670  */
3671  if (!dlist_is_empty(&lsn_mapping))
3672  wait_time = WalWriterDelay;
3673  else
3674  wait_time = NAPTIME_PER_CYCLE;
3675 
3679  fd, wait_time,
3680  WAIT_EVENT_LOGICAL_APPLY_MAIN);
3681 
3682  if (rc & WL_LATCH_SET)
3683  {
3686  }
3687 
3688  if (ConfigReloadPending)
3689  {
3690  ConfigReloadPending = false;
3692  }
3693 
3694  if (rc & WL_TIMEOUT)
3695  {
3696  /*
3697  * We didn't receive anything new. If we haven't heard anything
3698  * from the server for more than wal_receiver_timeout / 2, ping
3699  * the server. Also, if it's been longer than
3700  * wal_receiver_status_interval since the last update we sent,
3701  * send a status update to the primary anyway, to report any
3702  * progress in applying WAL.
3703  */
3704  bool requestReply = false;
3705 
3706  /*
3707  * Check if time since last receive from primary has reached the
3708  * configured limit.
3709  */
3710  if (wal_receiver_timeout > 0)
3711  {
3713  TimestampTz timeout;
3714 
3715  timeout =
3716  TimestampTzPlusMilliseconds(last_recv_timestamp,
3718 
3719  if (now >= timeout)
3720  ereport(ERROR,
3721  (errcode(ERRCODE_CONNECTION_FAILURE),
3722  errmsg("terminating logical replication worker due to timeout")));
3723 
3724  /* Check to see if it's time for a ping. */
3725  if (!ping_sent)
3726  {
3727  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3728  (wal_receiver_timeout / 2));
3729  if (now >= timeout)
3730  {
3731  requestReply = true;
3732  ping_sent = true;
3733  }
3734  }
3735  }
3736 
3737  send_feedback(last_received, requestReply, requestReply);
3738 
3739  /*
3740  * Force reporting to ensure long idle periods don't lead to
3741  * arbitrarily delayed stats. Stats can only be reported outside
3742  * of (implicit or explicit) transactions. That shouldn't lead to
3743  * stats being delayed for long, because transactions are either
3744  * sent as a whole on commit or streamed. Streamed transactions
3745  * are spilled to disk and applied on commit.
3746  */
3747  if (!IsTransactionState())
3748  pgstat_report_stat(true);
3749  }
3750  }
3751 
3752  /* Pop the error context stack */
3753  error_context_stack = errcallback.previous;
3755 
3756  /* All done */
3758 }
3759 
3760 /*
3761  * Send a Standby Status Update message to server.
3762  *
3763  * 'recvpos' is the latest LSN we've received data to, force is set if we need
3764  * to send a response to avoid timeouts.
3765  */
3766 static void
3767 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3768 {
/*
 * NOTE(review): scrape gaps — the assignment of 'now' (presumably
 * GetCurrentTimestamp()), the first-time allocation of reply_message, the
 * resetStringInfo() in the else branch, and the call that actually
 * transmits the buffer (presumably walrcv_send) are elided from this
 * extract.
 */
3769  static StringInfo reply_message = NULL;
3770  static TimestampTz send_time = 0;
3771 
/* Previously reported positions; reports must be monotonically increasing. */
3772  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3773  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3774  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3775 
3776  XLogRecPtr writepos;
3777  XLogRecPtr flushpos;
3778  TimestampTz now;
3779  bool have_pending_txes;
3780 
3781  /*
3782  * If the user doesn't want status to be reported to the publisher, be
3783  * sure to exit before doing anything at all.
3784  */
3785  if (!force && wal_receiver_status_interval <= 0)
3786  return;
3787 
3788  /* It's legal to not pass a recvpos */
3789  if (recvpos < last_recvpos)
3790  recvpos = last_recvpos;
3791 
3792  get_flush_position(&writepos, &flushpos, &have_pending_txes);
3793 
3794  /*
3795  * No outstanding transactions to flush, we can report the latest received
3796  * position. This is important for synchronous replication.
3797  */
3798  if (!have_pending_txes)
3799  flushpos = writepos = recvpos;
3800 
3801  if (writepos < last_writepos)
3802  writepos = last_writepos;
3803 
3804  if (flushpos < last_flushpos)
3805  flushpos = last_flushpos;
3806 
3808 
3809  /* if we've already reported everything we're good */
3810  if (!force &&
3811  writepos == last_writepos &&
3812  flushpos == last_flushpos &&
3813  !TimestampDifferenceExceeds(send_time, now,
3815  return;
3816  send_time = now;
3817 
3818  if (!reply_message)
3819  {
3821 
3823  MemoryContextSwitchTo(oldctx);
3824  }
3825  else
3827 
/* Standby Status Update ('r') message body, per the replication protocol. */
3828  pq_sendbyte(reply_message, 'r');
3829  pq_sendint64(reply_message, recvpos); /* write */
3830  pq_sendint64(reply_message, flushpos); /* flush */
3831  pq_sendint64(reply_message, writepos); /* apply */
3832  pq_sendint64(reply_message, now); /* sendTime */
3833  pq_sendbyte(reply_message, requestReply); /* replyRequested */
3834 
3835  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3836  force,
3837  LSN_FORMAT_ARGS(recvpos),
3838  LSN_FORMAT_ARGS(writepos),
3839  LSN_FORMAT_ARGS(flushpos));
3840 
3843 
3844  if (recvpos > last_recvpos)
3845  last_recvpos = recvpos;
3846  if (writepos > last_writepos)
3847  last_writepos = writepos;
3848  if (flushpos > last_flushpos)
3849  last_flushpos = flushpos;
3850 }
3851 
3852 /*
3853  * Exit routine for apply workers due to subscription parameter changes.
3854  */
3855 static void
/*
 * NOTE(review): scrape gaps — the signature line (presumably
 * "apply_worker_exit(void)"), the parallel-apply-worker guard condition,
 * and the reset-last-start-time call under am_leader_apply_worker() are
 * elided from this extract.
 */
3857 {
3859  {
3860  /*
3861  * Don't stop the parallel apply worker as the leader will detect the
3862  * subscription parameter change and restart logical replication later
3863  * anyway. This also prevents the leader from reporting errors when
3864  * trying to communicate with a stopped parallel apply worker, which
3865  * would accidentally disable subscriptions if disable_on_error was
3866  * set.
3867  */
3868  return;
3869  }
3870 
3871  /*
3872  * Reset the last-start time for this apply worker so that the launcher
3873  * will restart it without waiting for wal_retrieve_retry_interval if the
3874  * subscription is still active, and so that we won't leak that hash table
3875  * entry if it isn't.
3876  */
3877  if (am_leader_apply_worker())
3879 
3880  proc_exit(0);
3881 }
3882 
3883 /*
3884  * Reread subscription info if needed. Most changes will be exit.
3885  */
3886 void
/*
 * NOTE(review): scrape gaps — the signature line (presumably
 * "maybe_reread_subscription(void)"), the 'newsub' declaration, the
 * transaction start/commit calls, the MemoryContextSwitchTo and
 * GetSubscription calls, the apply_worker_exit() calls in the
 * exit branches, and the FreeSubscription/assignment lines are elided
 * from this extract.
 */
3888 {
3889  MemoryContext oldctx;
3891  bool started_tx = false;
3892 
3893  /* When cache state is valid there is nothing to do here. */
3894  if (MySubscriptionValid)
3895  return;
3896 
3897  /* This function might be called inside or outside of transaction. */
3898  if (!IsTransactionState())
3899  {
3901  started_tx = true;
3902  }
3903 
3904  /* Ensure allocations in permanent context. */
3906 
3908 
3909  /*
3910  * Exit if the subscription was removed. This normally should not happen
3911  * as the worker gets killed during DROP SUBSCRIPTION.
3912  */
3913  if (!newsub)
3914  {
3915  ereport(LOG,
3916  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3917  MySubscription->name)));
3918 
3919  /* Ensure we remove no-longer-useful entry for worker's start time */
3920  if (am_leader_apply_worker())
3922 
3923  proc_exit(0);
3924  }
3925 
3926  /* Exit if the subscription was disabled. */
3927  if (!newsub->enabled)
3928  {
3929  ereport(LOG,
3930  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
3931  MySubscription->name)));
3932 
3934  }
3935 
3936  /* !slotname should never happen when enabled is true. */
3937  Assert(newsub->slotname);
3938 
3939  /* two-phase should not be altered */
3940  Assert(newsub->twophasestate == MySubscription->twophasestate);
3941 
3942  /*
3943  * Exit if any parameter that affects the remote connection was changed.
3944  * The launcher will start a new worker but note that the parallel apply
3945  * worker won't restart if the streaming option's value is changed from
3946  * 'parallel' to any other value or the server decides not to stream the
3947  * in-progress transaction.
3948  */
3949  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3950  strcmp(newsub->name, MySubscription->name) != 0 ||
3951  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3952  newsub->binary != MySubscription->binary ||
3953  newsub->stream != MySubscription->stream ||
3954  newsub->passwordrequired != MySubscription->passwordrequired ||
3955  strcmp(newsub->origin, MySubscription->origin) != 0 ||
3956  newsub->owner != MySubscription->owner ||
3957  !equal(newsub->publications, MySubscription->publications))
3958  {
3960  ereport(LOG,
3961  (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
3962  MySubscription->name)));
3963  else
3964  ereport(LOG,
3965  (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
3966  MySubscription->name)));
3967 
3969  }
3970 
3971  /*
3972  * Exit if the subscription owner's superuser privileges have been
3973  * revoked.
3974  */
3975  if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
3976  {
3978  ereport(LOG,
3979  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
3980  MySubscription->name));
3981  else
3982  ereport(LOG,
3983  errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
3984  MySubscription->name));
3985 
3987  }
3988 
3989  /* Check for other changes that should never happen too. */
3990  if (newsub->dbid != MySubscription->dbid)
3991  {
3992  elog(ERROR, "subscription %u changed unexpectedly",
3994  }
3995 
3996  /* Clean old subscription info and switch to new one. */
3999 
4000  MemoryContextSwitchTo(oldctx);
4001 
4002  /* Change synchronous commit according to the user's wishes */
4003  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4005 
4006  if (started_tx)
4008 
4009  MySubscriptionValid = true;
4010 }
4011 
4012 /*
4013  * Callback from subscription syscache invalidation.
4014  */
4015 static void
4016 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
4017 {
4018  MySubscriptionValid = false;
4019 }
4020 
4021 /*
4022  * subxact_info_write
4023  * Store information about subxacts for a toplevel transaction.
4024  *
4025  * For each subxact we store offset of it's first change in the main file.
4026  * The file is always over-written as a whole.
4027  *
4028  * XXX We should only store subxacts that were not aborted yet.
4029  */
4030 static void
/*
 * NOTE(review): scrape gaps — the signature line (presumably
 * "subxact_info_write(Oid subid, TransactionId xid)"), an Assert on the
 * xid, the file-delete call in the nsubxacts == 0 branch, the
 * BufFileOpenFileSet/Create call, the error path when fd is NULL, the
 * BufFileWrite calls for count and array, and the final
 * cleanup_subxact_info() call are elided from this extract.
 */
4032 {
4033  char path[MAXPGPATH];
4034  Size len;
4035  BufFile *fd;
4036 
4038 
4039  /* construct the subxact filename */
4040  subxact_filename(path, subid, xid);
4041 
4042  /* Delete the subxacts file, if exists. */
4043  if (subxact_data.nsubxacts == 0)
4044  {
4047 
4048  return;
4049  }
4050 
4051  /*
4052  * Create the subxact file if it not already created, otherwise open the
4053  * existing file.
4054  */
4056  true);
4057  if (fd == NULL)
4059 
4060  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4061 
4062  /* Write the subxact count and subxact info */
4065 
4066  BufFileClose(fd);
4067 
4068  /* free the memory allocated for subxact info */
4070 }
4071 
4072 /*
4073  * subxact_info_read
4074  * Restore information about subxacts of a streamed transaction.
4075  *
4076  * Read information about subxacts into the structure subxact_data that can be
4077  * used later.
4078  */
4079 static void
/*
 * NOTE(review): scrape gaps — the signature line (presumably
 * "subxact_info_read(Oid subid, TransactionId xid)"), Asserts on the
 * current subxact_data state, the BufFileOpenFileSet call, the
 * BufFileReadExact of the subxact count, the nsubxacts_max computation,
 * the context switch + palloc of the subxacts array, and the
 * BufFileReadExact of the array payload are elided from this extract.
 */
4081 {
4082  char path[MAXPGPATH];
4083  Size len;
4084  BufFile *fd;
4085  MemoryContext oldctx;
4086 
4090 
4091  /*
4092  * If the subxact file doesn't exist that means we don't have any subxact
4093  * info.
4094  */
4095  subxact_filename(path, subid, xid);
4097  true);
4098  if (fd == NULL)
4099  return;
4100 
4101  /* read number of subxact items */
4103 
4104  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4105 
4106  /* we keep the maximum as a power of 2 */
4108 
4109  /*
4110  * Allocate subxact information in the logical streaming context. We need
4111  * this information during the complete stream so that we can add the sub
4112  * transaction info to this. On stream stop we will flush this information
4113  * to the subxact file and reset the logical streaming context.
4114  */
4117  sizeof(SubXactInfo));
4118  MemoryContextSwitchTo(oldctx);
4119 
4120  if (len > 0)
4122 
4123  BufFileClose(fd);
4124 }
4125 
4126 /*
4127  * subxact_info_add
4128  * Add information about a subxact (offset in the main file).
4129  */
4130 static void
/*
 * NOTE(review): scrape gaps — the signature line (presumably
 * "subxact_info_add(TransactionId xid)"), the Assert on stream_xid, the
 * array-full condition and doubling of nsubxacts_max before the repalloc,
 * the BufFileTell call that captures the current position, and the
 * nsubxacts++ increment are elided from this extract.
 */
4132 {
4133  SubXactInfo *subxacts = subxact_data.subxacts;
4134  int64 i;
4135 
4136  /* We must have a valid top level stream xid and a stream fd. */
4138  Assert(stream_fd != NULL);
4139 
4140  /*
4141  * If the XID matches the toplevel transaction, we don't want to add it.
4142  */
4143  if (stream_xid == xid)
4144  return;
4145 
4146  /*
4147  * In most cases we're checking the same subxact as we've already seen in
4148  * the last call, so make sure to ignore it (this change comes later).
4149  */
4150  if (subxact_data.subxact_last == xid)
4151  return;
4152 
4153  /* OK, remember we're processing this XID. */
4154  subxact_data.subxact_last = xid;
4155 
4156  /*
4157  * Check if the transaction is already present in the array of subxact. We
4158  * intentionally scan the array from the tail, because we're likely adding
4159  * a change for the most recent subtransactions.
4160  *
4161  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4162  * would allow us to use binary search here.
4163  */
4164  for (i = subxact_data.nsubxacts; i > 0; i--)
4165  {
4166  /* found, so we're done */
4167  if (subxacts[i - 1].xid == xid)
4168  return;
4169  }
4170 
4171  /* This is a new subxact, so we need to add it to the array. */
4172  if (subxact_data.nsubxacts == 0)
4173  {
4174  MemoryContext oldctx;
4175 
4177 
4178  /*
4179  * Allocate this memory for subxacts in per-stream context, see
4180  * subxact_info_read.
4181  */
4183  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4184  MemoryContextSwitchTo(oldctx);
4185  }
4187  {
4189  subxacts = repalloc(subxacts,
4191  }
4192 
4193  subxacts[subxact_data.nsubxacts].xid = xid;
4194 
4195  /*
4196  * Get the current offset of the stream file and store it as offset of
4197  * this subxact.
4198  */
4200  &subxacts[subxact_data.nsubxacts].fileno,
4201  &subxacts[subxact_data.nsubxacts].offset);
4202 
4204  subxact_data.subxacts = subxacts;
4205 }
4206 
4207 /* format filename for file containing the info about subxacts */
4208 static inline void
4209 subxact_filename(char *path, Oid subid, TransactionId xid)
4210 {
4211  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4212 }
4213 
4214 /* format filename for file containing serialized changes */
4215 static inline void
4216 changes_filename(char *path, Oid subid, TransactionId xid)
4217 {
4218  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4219 }
4220 
4221 /*
4222  * stream_cleanup_files
4223  * Cleanup files for a subscription / toplevel transaction.
4224  *
4225  * Remove files with serialized changes and subxact info for a particular
4226  * toplevel transaction. Each subscription has a separate set of files
4227  * for any toplevel transaction.
4228  */
4229 void
/*
 * NOTE(review): scrape gaps — the signature line (presumably
 * "stream_cleanup_files(Oid subid, TransactionId xid)") and the two
 * file-deletion calls following each *_filename() call are elided from
 * this extract.
 */
4231 {
4232  char path[MAXPGPATH];
4233 
4234  /* Delete the changes file. */
4235  changes_filename(path, subid, xid);
4237 
4238  /* Delete the subxact file, if it exists. */
4239  subxact_filename(path, subid, xid);
4241 }
4242 
4243 /*
4244  * stream_open_file
4245  * Open a file that we'll use to serialize changes for a toplevel
4246  * transaction.
4247  *
4248  * Open a file for streamed changes from a toplevel transaction identified
4249  * by stream_xid (global variable). If it's the first chunk of streamed
4250  * changes for this transaction, create the buffile, otherwise open the
4251  * previously created file.
4252  */
4253 static void
4254 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4255 {
/*
 * NOTE(review): scrape gaps — an Assert between the two visible Asserts,
 * the MemoryContextSwitchTo that assigns 'oldcxt', and the
 * BufFileCreateFileSet / BufFileOpenFileSet calls that assign stream_fd
 * are elided from this extract.
 */
4256  char path[MAXPGPATH];
4257  MemoryContext oldcxt;
4258 
4259  Assert(OidIsValid(subid));
4261  Assert(stream_fd == NULL);
4262 
4263 
4264  changes_filename(path, subid, xid);
4265  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4266 
4267  /*
4268  * Create/open the buffiles under the logical streaming context so that we
4269  * have those files until stream stop.
4270  */
4272 
4273  /*
4274  * If this is the first streamed segment, create the changes file.
4275  * Otherwise, just open the file for writing, in append mode.
4276  */
4277  if (first_segment)
4279  path);
4280  else
4281  {
4282  /*
4283  * Open the file and seek to the end of the file because we always
4284  * append the changes file.
4285  */
4287  path, O_RDWR, false);
4288  BufFileSeek(stream_fd, 0, 0, SEEK_END);
4289  }
4290 
4291  MemoryContextSwitchTo(oldcxt);
4292 }
4293 
4294 /*
4295  * stream_close_file
4296  * Close the currently open file with streamed changes.
4297  */
4298 static void
/*
 * NOTE(review): scrape gaps — the signature line (presumably
 * "stream_close_file(void)") and the BufFileClose(stream_fd) call are
 * elided from this extract; only the Assert and the pointer reset are
 * visible.
 */
4300 {
4301  Assert(stream_fd != NULL);
4302 
4304 
4305  stream_fd = NULL;
4306 }
4307 
4308 /*
4309  * stream_write_change
4310  * Serialize a change to a file for the current toplevel transaction.
4311  *
4312  * The change is serialized in a simple format, with length (not including
4313  * the length), action code (identifying the message type) and message
4314  * contents (without the subxact TransactionId value).
4315  */
4316 static void
/*
 * NOTE(review): scrape gap — the signature line (presumably
 * "stream_write_change(char action, StringInfo s)") is elided from this
 * extract; the body below is complete.
 */
4318 {
4319  int len;
4320 
4321  Assert(stream_fd != NULL);
4322 
4323  /* total on-disk size, including the action type character */
4324  len = (s->len - s->cursor) + sizeof(char);
4325 
4326  /* first write the size */
4327  BufFileWrite(stream_fd, &len, sizeof(len));
4328 
4329  /* then the action */
4330  BufFileWrite(stream_fd, &action, sizeof(action));
4331 
4332  /* and finally the remaining part of the buffer (after the XID) */
4333  len = (s->len - s->cursor);
4334 
4335  BufFileWrite(stream_fd, &s->data[s->cursor], len);
4336 }
4337 
4338 /*
4339  * stream_open_and_write_change
4340  * Serialize a message to a file for the given transaction.
4341  *
4342  * This function is similar to stream_write_change except that it will open the
4343  * target file if not already before writing the message and close the file at
4344  * the end.
4345  */
4346 static void
/*
 * NOTE(review): scrape gaps — the signature line (presumably
 * "stream_open_and_write_change(TransactionId xid, char action,
 * StringInfo s)"), an Assert, and the stream_write_change() call before
 * stream_stop_internal() are elided from this extract.
 */
4348 {
4350 
4351  if (!stream_fd)
4352  stream_start_internal(xid, false);
4353 
4355  stream_stop_internal(xid);
4356 }
4357 
4358 /*
4359  * Sets streaming options including replication slot name and origin start
4360  * position. Workers need these options for logical replication.
4361  */
4362 void
/*
 * NOTE(review): scrape gaps — the first parameter line (presumably
 * "set_stream_options(WalRcvStreamOptions *options,"), the
 * walrcv_server_version() assignment to server_version, the protocol
 * version selection expression, the streaming-capability conditions in the
 * if/else-if headers, and the MyLogicalRepWorker->parallel_apply
 * assignments in each branch are elided from this extract.
 */
4364  char *slotname,
4365  XLogRecPtr *origin_startpos)
4366 {
4367  int server_version;
4368 
4369  options->logical = true;
4370  options->startpoint = *origin_startpos;
4371  options->slotname = slotname;
4372 
4374  options->proto.logical.proto_version =
4379 
4380  options->proto.logical.publication_names = MySubscription->publications;
4381  options->proto.logical.binary = MySubscription->binary;
4382 
4383  /*
4384  * Assign the appropriate option value for streaming option according to
4385  * the 'streaming' mode and the publisher's ability to support that mode.
4386  */
4387  if (server_version >= 160000 &&
4389  {
4390  options->proto.logical.streaming_str = "parallel";
4392  }
4393  else if (server_version >= 140000 &&
4395  {
4396  options->proto.logical.streaming_str = "on";
4398  }
4399  else
4400  {
4401  options->proto.logical.streaming_str = NULL;
4403  }
4404 
4405  options->proto.logical.twophase = false;
4406  options->proto.logical.origin = pstrdup(MySubscription->origin);
4407 }
4408 
4409 /*
4410  * Cleanup the memory for subxacts and reset the related variables.
4411  */
4412 static inline void
/*
 * NOTE(review): scrape gaps — the signature line (presumably
 * "cleanup_subxact_info(void)"), the pfree of subxact_data.subxacts, and
 * the resets of subxact_last / nsubxacts_max are elided from this
 * extract.
 */
4414 {
4415  if (subxact_data.subxacts)
4417 
4418  subxact_data.subxacts = NULL;
4420  subxact_data.nsubxacts = 0;
4422 }
4423 
4424 /*
4425  * Form the prepared transaction GID for two_phase transactions.
4426  *
4427  * Return the GID in the supplied buffer.
4428  */
4429 static void
4430 TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
4431 {
4432  Assert(subid != InvalidRepOriginId);
4433 
4434  if (!TransactionIdIsValid(xid))
4435  ereport(ERROR,
4436  (errcode(ERRCODE_PROTOCOL_VIOLATION),
4437  errmsg_internal("invalid two-phase transaction ID")));
4438 
4439  snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
4440 }
4441 
4442 /*
4443  * Common function to run the apply loop with error handling. Disable the
4444  * subscription, if necessary.
4445  *
4446  * Note that we don't handle FATAL errors which are probably because
4447  * of system resource error and are not repeatable.
4448  */
4449 void
4450 start_apply(XLogRecPtr origin_startpos)
4451 {
/*
 * NOTE(review): scrape gaps — the condition inside PG_CATCH() that pairs
 * with the visible "else" (presumably checking disable_on_error and
 * calling DisableSubscriptionAndExit()), and the error-reporting/abort
 * statements before PG_RE_THROW(), are elided from this extract.
 */
4452  PG_TRY();
4453  {
4454  LogicalRepApplyLoop(origin_startpos);
4455  }
4456  PG_CATCH();
4457  {
4460  else
4461  {
4462  /*
4463  * Report the worker failed while applying changes. Abort the
4464  * current transaction so that the stats message is sent in an
4465  * idle state.
4466  */
4469 
4470  PG_RE_THROW();
4471  }
4472  }
4473  PG_END_TRY();
4474 }
4475 
4476 /*
4477  * Runs the leader apply worker.
4478  *
4479  * It sets up replication origin, streaming options and then starts streaming.
4480  */
4481 static void
/*
 * NOTE(review): scrape gaps — the signature line (presumably
 * "run_apply_worker(void)"), the WalRcvStreamOptions declaration, the
 * origin-name construction call, transaction begin/commit around origin
 * setup, the walrcv_connect call that assigns LogRepWorkerWalRcvConn, the
 * two_phase tri-state condition and its branch bodies, and parts of the
 * DEBUG1 ereport argument list are elided from this extract.
 */
4483 {
4484  char originname[NAMEDATALEN];
4485  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4486  char *slotname = NULL;
4488  RepOriginId originid;
4489  TimeLineID startpointTLI;
4490  char *err;
4491  bool must_use_password;
4492 
4493  slotname = MySubscription->slotname;
4494 
4495  /*
4496  * This shouldn't happen if the subscription is enabled, but guard against
4497  * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4498  * slot is NULL.)
4499  */
4500  if (!slotname)
4501  ereport(ERROR,
4502  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4503  errmsg("subscription has no replication slot set")));
4504 
4505  /* Setup replication origin tracking. */
4507  originname, sizeof(originname));
4509  originid = replorigin_by_name(originname, true);
4510  if (!OidIsValid(originid))
4511  originid = replorigin_create(originname);
4512  replorigin_session_setup(originid, 0);
4513  replorigin_session_origin = originid;
4514  origin_startpos = replorigin_session_get_progress(false);
4516 
4517  /* Is the use of a password mandatory? */
4518  must_use_password = MySubscription->passwordrequired &&
4520 
4522  true, must_use_password,
4523  MySubscription->name, &err);
4524 
4525  if (LogRepWorkerWalRcvConn == NULL)
4526  ereport(ERROR,
4527  (errcode(ERRCODE_CONNECTION_FAILURE),
4528  errmsg("could not connect to the publisher: %s", err)));
4529 
4530  /*
4531  * We don't really use the output identify_system for anything but it does
4532  * some initializations on the upstream so let's still call it.
4533  */
4534  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4535 
4536  set_apply_error_context_origin(originname);
4537 
4538  set_stream_options(&options, slotname, &origin_startpos);
4539 
4540  /*
4541  * Even when the two_phase mode is requested by the user, it remains as
4542  * the tri-state PENDING until all tablesyncs have reached READY state.
4543  * Only then, can it become ENABLED.
4544  *
4545  * Note: If the subscription has no tables then leave the state as
4546  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4547  * work.
4548  */
4551  {
4552  /* Start streaming with two_phase enabled */
4553  options.proto.logical.twophase = true;
4555 
4560  }
4561  else
4562  {
4564  }
4565 
4566  ereport(DEBUG1,
4567  (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4572  "?")));
4573 
4574  /* Run the main loop. */
4575  start_apply(origin_startpos);
4576 }
4577 
4578 /*
4579  * Common initialization for leader apply worker, parallel apply worker and
4580  * tablesync worker.
4581  *
4582  * Initialize the database connection, in-memory subscription and necessary
4583  * config options.
4584  */
4585 void
/*
 * NOTE(review): scrape gaps — the signature line (presumably
 * "InitializeLogRepWorker(void)"), the database-connection call, the
 * ApplyContext allocation, the transaction begin/commit, the
 * GetSubscription call that assigns MySubscription, the exit call in the
 * disabled-subscription branch, the callback-function arguments to
 * CacheRegisterSyscacheCallback, and the tablesync relation-name argument
 * are elided from this extract.
 */
4587 {
4588  MemoryContext oldctx;
4589 
4590  /* Run as replica session replication role. */
4591  SetConfigOption("session_replication_role", "replica",
4593 
4594  /* Connect to our database. */
4597  0);
4598 
4599  /*
4600  * Set always-secure search path, so malicious users can't redirect user
4601  * code (e.g. pg_index.indexprs).
4602  */
4603  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4604 
4605  /* Load the subscription into persistent memory context. */
4607  "ApplyContext",
4611 
4613  if (!MySubscription)
4614  {
4615  ereport(LOG,
4616  (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4618 
4619  /* Ensure we remove no-longer-useful entry for worker's start time */
4620  if (am_leader_apply_worker())
4622 
4623  proc_exit(0);
4624  }
4625 
4626  MySubscriptionValid = true;
4627  MemoryContextSwitchTo(oldctx);
4628 
4629  if (!MySubscription->enabled)
4630  {
4631  ereport(LOG,
4632  (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4633  MySubscription->name)));
4634 
4636  }
4637 
4638  /* Setup synchronous commit according to the user's wishes */
4639  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4641 
4642  /*
4643  * Keep us informed about subscription or role changes. Note that the
4644  * role's superuser privilege can be revoked.
4645  */
4646  CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4648  (Datum) 0);
4649 
4652  (Datum) 0);
4653 
4654  if (am_tablesync_worker())
4655  ereport(LOG,
4656  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4659  else
4660  ereport(LOG,
4661  (errmsg("logical replication apply worker for subscription \"%s\" has started",
4662  MySubscription->name)));
4663 
4665 }
4666 
4667 /* Common function to setup the leader apply or tablesync worker. */
4668 void
4669 SetupApplyOrSyncWorker(int worker_slot)
4670 {
/*
 * NOTE(review): scrape gaps — an Assert after the attach, the SIGHUP
 * signal handler registration, BackgroundWorkerUnblockSignals, the
 * initial-stats assignments, the InitializeLogRepWorker() call, the
 * conninfo argument of the DEBUG1 elog, and the callback-function
 * argument to CacheRegisterSyscacheCallback are elided from this
 * extract.
 */
4671  /* Attach to slot */
4672  logicalrep_worker_attach(worker_slot);
4673 
4675 
4676  /* Setup signal handling */
4678  pqsignal(SIGTERM, die);
4680 
4681  /*
4682  * We don't currently need any ResourceOwner in a walreceiver process, but
4683  * if we did, we could call CreateAuxProcessResourceOwner here.
4684  */
4685 
4686  /* Initialise stats to a sanish value */
4689 
4690  /* Load the libpq-specific functions */
4691  load_file("libpqwalreceiver", false);
4692 
4694 
4695  /* Connect to the origin and start the replication. */
4696  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4698 
4699  /*
4700  * Setup callback for syscache so that we know when something changes in
4701  * the subscription relation state.
4702  */
4703  CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4705  (Datum) 0);
4706 }
4707 
4708 /* Logical Replication Apply worker entry point */
4709 void
4711 {
4712  int worker_slot = DatumGetInt32(main_arg);
4713 
4714  InitializingApplyWorker = true;
4715 
4716  SetupApplyOrSyncWorker(worker_slot);
4717 
4718  InitializingApplyWorker = false;
4719 
4720  run_apply_worker();
4721 
4722  proc_exit(0);
4723 }
4724 
/*
 * NOTE(review): this extraction dropped original worker.c lines 4730
 * (the function's signature line), 4739, 4742, 4745, 4749-4751 and 4755,
 * so the error-reporting, subscription-disable and start-time-cleanup
 * statements are only partially visible below. The index on this page
 * records the signature as "void DisableSubscriptionAndExit(void)".
 * Recover the missing statements from the upstream worker.c.
 */
4725 /*
4726  * After error recovery, disable the subscription in a new transaction
4727  * and exit cleanly.
4728  */
4729 void
4731 {
4732  /*
4733  * Emit the error message, and recover from the error state to an idle
4734  * state
4735  */
4736  HOLD_INTERRUPTS();
4737 
4738  EmitErrorReport();
4740  FlushErrorState();
4741 
4743 
4744  /* Report the worker failed during either table synchronization or apply */
4746  !am_tablesync_worker());
4747 
4748  /* Disable the subscription */
4752 
4753  /* Ensure we remove no-longer-useful entry for worker's start time */
4754  if (am_leader_apply_worker())
4756 
4757  /* Notify the subscription has been disabled and exit */
4758  ereport(LOG,
4759  errmsg("subscription \"%s\" has been disabled because of an error",
4760  MySubscription->name));
4761 
4762  proc_exit(0);
4763 }
4764 
4765 /*
4766  * Is current process a logical replication worker?
4767  */
4768 bool
4770 {
4771  return MyLogicalRepWorker != NULL;
4772 }
4773 
/*
 * NOTE(review): this extraction dropped original worker.c lines 4778 (the
 * signature line — the index on this page records it as
 * "bool IsLogicalParallelApplyWorker(void)") and 4780 (the function's lone
 * return statement, which is therefore unknown from this view). Recover
 * the body from the upstream worker.c; do not guess it.
 */
4774 /*
4775  * Is current process a logical replication parallel apply worker?
4776  */
4777 bool
4779 {
4781 }
4782 
/*
 * NOTE(review): this extraction dropped original worker.c lines 4788 (the
 * signature line — the index records "static void
 * maybe_start_skipping_changes(XLogRecPtr finish_lsn)"), 4790-4792
 * (presumably local declarations/assertions), 4799 (the first half of the
 * quick-return condition) and 4808 (the LSN format argument of the LOG
 * message). Recover them from the upstream worker.c.
 */
4783 /*
4784  * Start skipping changes of the transaction if the given LSN matches the
4785  * LSN specified by subscription's skiplsn.
4786  */
4787 static void
4789 {
4793 
4794  /*
4795  * Quick return if it's not requested to skip this transaction. This
4796  * function is called for every remote transaction and we assume that
4797  * skipping the transaction is not used often.
4798  */
4800  MySubscription->skiplsn != finish_lsn))
4801  return;
4802 
4803  /* Start skipping all changes of this transaction */
4804  skip_xact_finish_lsn = finish_lsn;
4805 
4806  ereport(LOG,
4807  errmsg("logical replication starts skipping transaction at LSN %X/%X",
4809 }
4810 
/*
 * NOTE(review): this extraction dropped original worker.c lines 4815 (the
 * signature line — the index records "static void stop_skipping_changes(void)"),
 * 4822 (the LSN format argument of the LOG message) and 4825 (presumably the
 * statement resetting skip_xact_finish_lsn). Recover them from the upstream
 * worker.c.
 */
4811 /*
4812  * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4813  */
4814 static void
4816 {
4817  if (!is_skipping_changes())
4818  return;
4819 
4820  ereport(LOG,
4821  (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4823 
4824  /* Stop skipping changes */
4826 }
4827 
/*
 * NOTE(review): this extraction dropped original worker.c lines 4837 (the
 * signature line — the index records "static void
 * clear_subscription_skip_lsn(XLogRecPtr finish_lsn)"), 4845 (the first half
 * of the early-return condition), 4850 (presumably the transaction-start
 * call paired with "started_tx = true"), 4865 (the second argument of the
 * SearchSysCacheCopy1 call) and 4913 (presumably the commit paired with
 * "if (started_tx)"). Recover them from the upstream worker.c.
 */
4828 /*
4829  * Clear subskiplsn of pg_subscription catalog.
4830  *
4831  * finish_lsn is the transaction's finish LSN that is used to check if the
4832  * subskiplsn matches it. If not matched, we raise a warning when clearing the
4833  * subskiplsn in order to inform users for cases e.g., where the user mistakenly
4834  * specified the wrong subskiplsn.
4835  */
4836 static void
4838 {
4839  Relation rel;
4840  Form_pg_subscription subform;
4841  HeapTuple tup;
4842  XLogRecPtr myskiplsn = MySubscription->skiplsn;
4843  bool started_tx = false;
4844 
4846  return;
4847 
4848  if (!IsTransactionState())
4849  {
4851  started_tx = true;
4852  }
4853 
4854  /*
4855  * Protect subskiplsn of pg_subscription from being concurrently updated
4856  * while clearing it.
4857  */
4858  LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4859  AccessShareLock);
4860 
4861  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4862 
4863  /* Fetch the existing tuple. */
4864  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4866 
4867  if (!HeapTupleIsValid(tup))
4868  elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4869 
4870  subform = (Form_pg_subscription) GETSTRUCT(tup);
4871 
4872  /*
4873  * Clear the subskiplsn. If the user has already changed subskiplsn before
4874  * clearing it we don't update the catalog and the replication origin
4875  * state won't get advanced. So in the worst case, if the server crashes
4876  * before sending an acknowledgment of the flush position the transaction
4877  * will be sent again and the user needs to set subskiplsn again. We can
4878  * reduce the possibility by logging a replication origin WAL record to
4879  * advance the origin LSN instead but there is no way to advance the
4880  * origin timestamp and it doesn't seem to be worth doing anything about
4881  * it since it's a very rare case.
4882  */
4883  if (subform->subskiplsn == myskiplsn)
4884  {
4885  bool nulls[Natts_pg_subscription];
4886  bool replaces[Natts_pg_subscription];
4887  Datum values[Natts_pg_subscription];
4888 
4889  memset(values, 0, sizeof(values));
4890  memset(nulls, false, sizeof(nulls));
4891  memset(replaces, false, sizeof(replaces));
4892 
4893  /* reset subskiplsn */
4894  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4895  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4896 
4897  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4898  replaces);
4899  CatalogTupleUpdate(rel, &tup->t_self, tup);
4900 
4901  if (myskiplsn != finish_lsn)
4902  ereport(WARNING,
4903  errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4904  errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4905  LSN_FORMAT_ARGS(finish_lsn),
4906  LSN_FORMAT_ARGS(myskiplsn)));
4907  }
4908 
4909  heap_freetuple(tup);
4910  table_close(rel, NoLock);
4911 
4912  if (started_tx)
4914 }
4915 
/*
 * NOTE(review): this extraction dropped original worker.c lines 4918 (the
 * signature line — the index records "void apply_error_callback(void *arg)"),
 * 4920 and 4922 (the local errarg setup and early-return condition), and
 * 4936, 4941, 4952, 4959, 4970 and 4978 (one argument line inside each of
 * the remaining errcontext branches — presumably the
 * logicalrep_message_type(errarg->command) argument, mirroring the branch
 * that survived at line 4932). Recover them from the upstream worker.c.
 */
4916 /* Error callback to give more context info about the change being applied */
4917 void
4919 {
4921 
4923  return;
4924 
4925  Assert(errarg->origin_name);
4926 
4927  if (errarg->rel == NULL)
4928  {
4929  if (!TransactionIdIsValid(errarg->remote_xid))
4930  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4931  errarg->origin_name,
4932  logicalrep_message_type(errarg->command));
4933  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4934  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4935  errarg->origin_name,
4937  errarg->remote_xid);
4938  else
4939  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4940  errarg->origin_name,
4942  errarg->remote_xid,
4943  LSN_FORMAT_ARGS(errarg->finish_lsn));
4944  }
4945  else
4946  {
4947  if (errarg->remote_attnum < 0)
4948  {
4949  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4950  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4951  errarg->origin_name,
4953  errarg->rel->remoterel.nspname,
4954  errarg->rel->remoterel.relname,
4955  errarg->remote_xid);
4956  else
4957  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4958  errarg->origin_name,
4960  errarg->rel->remoterel.nspname,
4961  errarg->rel->remoterel.relname,
4962  errarg->remote_xid,
4963  LSN_FORMAT_ARGS(errarg->finish_lsn));
4964  }
4965  else
4966  {
4967  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4968  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4969  errarg->origin_name,
4971  errarg->rel->remoterel.nspname,
4972  errarg->rel->remoterel.relname,
4973  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4974  errarg->remote_xid);
4975  else
4976  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4977  errarg->origin_name,
4979  errarg->rel->remoterel.nspname,
4980  errarg->rel->remoterel.relname,
4981  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4982  errarg->remote_xid,
4983  LSN_FORMAT_ARGS(errarg->finish_lsn));
4984  }
4985  }
4986 }
4987 
/*
 * NOTE(review): this extraction dropped original worker.c lines 4990 (the
 * signature line — the index records "static inline void
 * set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)") and
 * 4992-4993 (the entire function body, presumably assignments into
 * apply_error_callback_arg). Recover them from the upstream worker.c.
 */
4988 /* Set transaction information of apply error callback */
4989 static inline void
4991 {
4994 }
4995 
/*
 * NOTE(review): this extraction dropped original worker.c lines 4998 (the
 * signature line — the index records "static void
 * reset_apply_error_context_info(void)") and 5000-5003 (the entire function
 * body). Recover them from the upstream worker.c.
 */
4996 /* Reset all information of apply error callback */
4997 static inline void
4999 {
5004 }
5005 
/*
 * NOTE(review): this extraction dropped original worker.c lines 5014 (the
 * signature line — the index records "void
 * LogicalRepWorkersWakeupAtCommit(Oid subid)") and 5018-5020 (the statements
 * between the oldcxt declaration and MemoryContextSwitchTo(oldcxt) —
 * presumably the context switch and the list append recording subid).
 * Recover them from the upstream worker.c.
 */
5006 /*
5007  * Request wakeup of the workers for the given subscription OID
5008  * at commit of the current transaction.
5009  *
5010  * This is used to ensure that the workers process assorted changes
5011  * as soon as possible.
5012  */
5013 void
5015 {
5016  MemoryContext oldcxt;
5017 
5021  MemoryContextSwitchTo(oldcxt);
5022 }
5023 
/*
 * NOTE(review): this extraction dropped original worker.c lines 5028 (the
 * signature line — the index records "void
 * AtEOXact_LogicalRepWorkers(bool isCommit)"), 5046 (the statement acting on
 * each found worker — presumably the wakeup call) and 5053 (presumably the
 * reset of on_commit_wakeup_workers_subids noted by the trailing comment).
 * Recover them from the upstream worker.c.
 */
5024 /*
5025  * Wake up the workers of any subscriptions that were changed in this xact.
5026  */
5027 void
5029 {
5030  if (isCommit && on_commit_wakeup_workers_subids != NIL)
5031  {
5032  ListCell *lc;
5033 
5034  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5035  foreach(lc, on_commit_wakeup_workers_subids)
5036  {
5037  Oid subid = lfirst_oid(lc);
5038  List *workers;
5039  ListCell *lc2;
5040 
5041  workers = logicalrep_workers_find(subid, true);
5042  foreach(lc2, workers)
5043  {
5044  LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5045 
5047  }
5048  }
5049  LWLockRelease(LogicalRepWorkerLock);
5050  }
5051 
5052  /* The List storage will be reclaimed automatically in xact cleanup. */
5054 }
5055 
/*
 * NOTE(review): this extraction dropped original worker.c lines 5060 (the
 * signature line — the index records "void
 * set_apply_error_context_origin(char *originname)") and 5062 (the first
 * half of the statement whose trailing "originname);" argument survives
 * below — presumably a long-lived-context string copy, given this block's
 * header comment). Recover them from the upstream worker.c.
 */
5056 /*
5057  * Allocate the origin name in long-lived context for error context message.
5058  */
5059 void
5061 {
5063  originname);
5064 }
5065 
/*
 * NOTE(review): this extraction dropped original worker.c lines 5075 (the
 * signature line — the index records "static TransApplyAction
 * get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo
 * **winfo)"), 5079 (the condition guarding the TRANS_PARALLEL_APPLY return),
 * and 5094 and 5098 (the return statements of the two *winfo branches —
 * presumably TRANS_LEADER_PARTIAL_SERIALIZE and
 * TRANS_LEADER_SEND_TO_PARALLEL per the surrounding comments, but confirm
 * against the upstream worker.c).
 */
5066 /*
5067  * Return the action to be taken for the given transaction. See
5068  * TransApplyAction for information on each of the actions.
5069  *
5070  * *winfo is assigned to the destination parallel worker info when the leader
5071  * apply worker has to pass all the transaction's changes to the parallel
5072  * apply worker.
5073  */
5074 static TransApplyAction
5076 {
5077  *winfo = NULL;
5078 
5080  {
5081  return TRANS_PARALLEL_APPLY;
5082  }
5083 
5084  /*
5085  * If we are processing this transaction using a parallel apply worker
5086  * then either we send the changes to the parallel worker or if the worker
5087  * is busy then serialize the changes to the file which will later be
5088  * processed by the parallel worker.
5089  */
5090  *winfo = pa_find_worker(xid);
5091 
5092  if (*winfo && (*winfo)->serialize_changes)
5093  {
5095  }
5096  else if (*winfo)
5097  {
5099  }
5100 
5101  /*
5102  * If there is no parallel worker involved to process this transaction
5103  * then we either directly apply the change or serialize it to a file
5104  * which will later be applied when the transaction finish message is
5105  * processed.
5106  */
5107  else if (in_streamed_transaction)
5108  {
5109  return TRANS_LEADER_SERIALIZE;
5110  }
5111  else
5112  {
5113  return TRANS_LEADER_APPLY;
5114  }
5115 }
AclResult
Definition: acl.h:181
@ ACLCHECK_OK
Definition: acl.h:182
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2701
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4081
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:381
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:2504
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4209
static void begin_replication_step(void)
Definition: worker.c:533
static void end_replication_step(void)
Definition: worker.c:556
static void cleanup_subxact_info(void)
Definition: worker.c:4413
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:4363
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1296
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:2482
static void subxact_info_add(TransactionId xid)
Definition: worker.c:4131
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:677
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:4230
MemoryContext ApplyMessageContext
Definition: worker.c:315
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:493
static void apply_handle_type(StringInfo s)
Definition: worker.c:2349
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:3169
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:3487
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
Definition: worker.c:4430
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:4016
static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
Definition: worker.c:5075
TransApplyAction
Definition: worker.c:291
@ TRANS_LEADER_SERIALIZE
Definition: worker.c:296
@ TRANS_PARALLEL_APPLY
Definition: worker.c:299
@ TRANS_LEADER_SEND_TO_PARALLEL
Definition: worker.c:297
@ TRANS_LEADER_APPLY
Definition: worker.c:293
@ TRANS_LEADER_PARTIAL_SERIALIZE
Definition: worker.c:298
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:584
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
Definition: worker.c:4347
struct ApplyExecutionData ApplyExecutionData
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4216
bool InitializingApplyWorker
Definition: worker.c:343
static void apply_worker_exit(void)
Definition: worker.c:3856
static BufFile * stream_fd
Definition: worker.c:364
static void apply_handle_update(StringInfo s)
Definition: worker.c:2545
void stream_stop_internal(TransactionId xid)
Definition: worker.c:1628
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:2156
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:4450
static void stop_skipping_changes(void)
Definition: worker.c:4815
struct ApplySubXactData ApplySubXactData
#define NAPTIME_PER_CYCLE
Definition: worker.c:220
static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:2871
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:3417
static uint32 parallel_stream_nchanges
Definition: worker.c:340
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:1194
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:3503
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition: worker.c:5014
bool IsLogicalWorker(void)
Definition: worker.c:4769
static ApplySubXactData subxact_data
Definition: worker.c:382
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:2918
ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:303
bool in_remote_transaction
Definition: worker.c:328
static XLogRecPtr skip_xact_finish_lsn
Definition: worker.c:360
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
Definition: worker.c:4254
static void apply_handle_delete(StringInfo s)
Definition: worker.c:2729
void apply_dispatch(StringInfo s)
Definition: worker.c:3297
#define is_skipping_changes()
Definition: worker.c:361
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:4317
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition: worker.c:4837
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
Definition: worker.c:2662
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
Definition: worker.c:1994
static void apply_handle_begin(StringInfo s)
Definition: worker.c:1016
void DisableSubscriptionAndExit(void)
Definition: worker.c:4730
static dlist_head lsn_mapping
Definition: worker.c:229
bool IsLogicalParallelApplyWorker(void)
Definition: worker.c:4778
void AtEOXact_LogicalRepWorkers(bool isCommit)
Definition: worker.c:5028
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:822
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:453
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:734
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:923
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:4990
ErrorContextCallback * apply_error_context_stack
Definition: worker.c:313
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
Definition: worker.c:1754
static void apply_handle_commit(StringInfo s)
Definition: worker.c:1041
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1454
static List * on_commit_wakeup_workers_subids
Definition: worker.c:326
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1837
static void apply_handle_relation(StringInfo s)
Definition: worker.c:2326
void set_apply_error_context_origin(char *originname)
Definition: worker.c:5060
struct ApplyErrorCallbackArg ApplyErrorCallbackArg
MemoryContext ApplyContext
Definition: worker.c:316
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:4031
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:2364
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:1133
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1243
static void run_apply_worker()
Definition: worker.c:4482
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:4669
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1651
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1433
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:3767
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:321
static XLogRecPtr remote_final_lsn
Definition: worker.c:329
static bool MySubscriptionValid
Definition: worker.c:324
void apply_error_callback(void *arg)
Definition: worker.c:4918
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3461
static MemoryContext LogicalStreamingContext
Definition: worker.c:319
void maybe_reread_subscription(void)
Definition: worker.c:3887
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:2266
void InitializeLogRepWorker(void)
Definition: worker.c:4586
static bool in_streamed_transaction
Definition: worker.c:332
struct SubXactInfo SubXactInfo
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:1067
struct FlushPosition FlushPosition
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:4710
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2026
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1492
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:4788
Subscription * MySubscription
Definition: worker.c:323
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:1096
static void stream_close_file(void)
Definition: worker.c:4299
static TransactionId stream_xid
Definition: worker.c:334
static void apply_handle_insert(StringInfo s)
Definition: worker.c:2396
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:765
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:4080
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition: worker.c:2817
static void reset_apply_error_context_info(void)
Definition: worker.c:4998
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1791
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1655
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1619
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
@ STATE_IDLEINTRANSACTION
@ STATE_RUNNING
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:828
static Datum values[MAXATTR]
Definition: bootstrap.c:156
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:833
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition: buffile.c:676
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition: buffile.c:664
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:933
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:740
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:267
void BufFileClose(BufFile *file)
Definition: buffile.c:412
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:364
unsigned int uint32
Definition: c.h:495
#define likely(x)
Definition: c.h:299
uint32 TransactionId
Definition: c.h:641
#define OidIsValid(objectId)
Definition: c.h:764
size_t Size
Definition: c.h:594
int64 TimestampTz
Definition: timestamp.h:39
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
int my_log2(long num)
Definition: dynahash.c:1760
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1162
void EmitErrorReport(void)
Definition: elog.c:1675
int errdetail(const char *fmt,...)
Definition: elog.c:1208
ErrorContextCallback * error_context_stack
Definition: elog.c:95
void FlushErrorState(void)
Definition: elog.c:1831
int errcode(int sqlerrcode)
Definition: elog.c:860
int errmsg(const char *fmt,...)
Definition: elog.c:1075