worker.c
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2023, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from the remote logical replication stream.
13  *
14  * The main worker (apply) is started by the logical replication worker
15  * launcher for every enabled subscription in a database. It uses the
16  * walsender protocol to communicate with the publisher.
17  *
18  * This module contains the server-facing code and shares the libpqwalreceiver
19  * module with walreceiver to provide the libpq-specific functionality.
20  *
21  *
22  * STREAMED TRANSACTIONS
23  * ---------------------
24  * Streamed transactions (large transactions exceeding a memory limit on the
25  * upstream) are applied using one of two approaches:
26  *
27  * 1) Write to temporary files and apply when the final commit arrives
28  *
29  * This approach is used when the user has set the subscription's streaming
30  * option to on.
31  *
32  * Unlike the regular (non-streamed) case, handling streamed transactions
33  * requires handling aborts of both the toplevel transaction and
34  * subtransactions. This is achieved by tracking offsets for subtransactions,
35  * which are then used to truncate the file of serialized changes.
36  *
37  * The files are placed in the temporary-file directory by default, and the
38  * filenames include both the XID of the toplevel transaction and the OID of the
39  * subscription. This is necessary so that different workers processing remote
40  * transactions with the same XID don't interfere with each other.
41  *
42  * We use BufFiles instead of normal temporary files because (a) the
43  * BufFile infrastructure supports temporary files that exceed the OS file
44  * size limit, (b) it provides automatic cleanup on error, and (c) it allows
45  * the files to survive across local transactions so that they can be opened
46  * and closed at each stream start and stop. We use the FileSet
47  * infrastructure because, without it, a BufFile is deleted as soon as it is
48  * closed, and keeping the stream files open across start/stop streams would
49  * consume a lot of memory (more than 8K for each BufFile, and there could be
50  * multiple such BufFiles because the subscriber could receive start/stop
51  * streams for different transactions before getting the commit). Moreover,
52  * without FileSet we would also need to invent a new way to pass filenames
53  * to the BufFile APIs so that the desired file could be reopened across
54  * multiple stream-open calls for the same
55  * transaction.
56  *
57  * 2) Parallel apply workers.
58  *
59  * This approach is used when the user has set the subscription's streaming
60  * option to parallel. See logical/applyparallelworker.c for information about
61  * this approach.
62  *
63  * TWO_PHASE TRANSACTIONS
64  * ----------------------
65  * Two-phase transactions are replayed at prepare time and then committed or
66  * rolled back at commit prepared and rollback prepared time, respectively. It
67  * is possible for a prepared transaction to arrive at the apply worker while
68  * the tablesync is busy doing the initial copy. In this case, the apply
69  * worker skips all the prepared operations [e.g. inserts] while the tablesync
70  * is still busy (see the condition in should_apply_changes_for_rel). The
71  * tablesync worker might not get such a prepared transaction because, say, it
72  * was prior to the initial consistent point, but it might have got some later
73  * commits. The tablesync worker will then exit without doing anything for the
74  * prepared transaction skipped by the apply worker, as the sync location for it
75  * will already be ahead of the apply worker's current location. This would lead
76  * to an "empty prepare", because later, when the apply worker does the commit
77  * prepared, there is nothing in it (the inserts were skipped earlier).
78  *
79  * To avoid this and similar prepare confusions, the subscription's two_phase
80  * commit is enabled only after the initial sync is over. The two_phase option
81  * has been implemented as a tri-state with values DISABLED, PENDING, and
82  * ENABLED.
83  *
84  * Even if the user specifies they want a subscription with two_phase = on,
85  * internally it will start with a tri-state of PENDING which only becomes
86  * ENABLED after all tablesync initializations are completed - i.e. when all
87  * tablesync workers have reached their READY state. In other words, the value
88  * PENDING is only a temporary state for subscription start-up.
89  *
90  * Until two_phase is properly available (ENABLED), the subscription will
91  * behave as if two_phase = off. When the apply worker detects that all
92  * tablesyncs have become READY (while the tri-state was PENDING), it will
93  * restart the apply worker process. This happens in
94  * process_syncing_tables_for_apply.
95  *
96  * When the (re-started) apply worker finds that all tablesyncs are READY while
97  * the two_phase tri-state is PENDING, it starts streaming messages with the
98  * two_phase option, which in turn enables the decoding of two-phase commits at
99  * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100  * Now, it is possible that during the time two_phase was not enabled, the
101  * publisher (replication server) skipped some prepares, but we ensure that
102  * such prepares are sent along with the commit prepared; see
103  * ReorderBufferFinishPrepared.
104  *
105  * If the subscription has no tables then the two_phase tri-state is left as
106  * PENDING. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107  * PUBLICATION, which might otherwise be disallowed (see below).
108  *
109  * If ever a user needs to be aware of the tri-state value, they can fetch it
110  * from the pg_subscription catalog (see column subtwophasestate).
111  *
112  * We don't allow toggling the two_phase option of a subscription because it can
113  * lead to an inconsistent replica. Consider: initially it was on and we
114  * received some prepare; then we turn it off, and at commit time the server
115  * will send the entire transaction data along with the commit. With some more
116  * analysis, we could allow changing this option from off to on, but it is not
117  * clear whether that alone would be useful.
118  *
119  * Finally, to avoid the problems mentioned in the previous paragraphs from any
120  * subsequent (not-READY) tablesyncs (which would need to toggle the two_phase
121  * option from 'on' to 'off' and then back to 'on' again), there is a restriction
122  * on ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
123  * the two_phase tri-state is ENABLED, except when copy_data = false.
124  *
125  * We can get a prepare for the same GID more than once in the genuine case
126  * where we have defined multiple subscriptions for publications on the same
127  * server and the prepared transaction has operations on tables subscribed to
128  * by those subscriptions. In such cases, if we used the GID sent by the
129  * publisher, one of the prepares would succeed and the others would fail, in
130  * which case the server would send them again. This can lead to a deadlock if
131  * the user has set synchronous_standby_names for all the subscriptions on the
132  * subscriber. To avoid such deadlocks, we generate a unique GID (consisting of
133  * the subscription OID and the XID of the prepared transaction) for each
134  * prepared transaction on the subscriber.
135  *-------------------------------------------------------------------------
136  */
137 
138 #include "postgres.h"
139 
140 #include <sys/stat.h>
141 #include <unistd.h>
142 
143 #include "access/genam.h"
144 #include "access/table.h"
145 #include "access/tableam.h"
146 #include "access/twophase.h"
147 #include "access/xact.h"
148 #include "access/xlog_internal.h"
149 #include "catalog/catalog.h"
150 #include "catalog/indexing.h"
151 #include "catalog/namespace.h"
152 #include "catalog/partition.h"
153 #include "catalog/pg_inherits.h"
154 #include "catalog/pg_subscription.h"
156 #include "catalog/pg_tablespace.h"
157 #include "commands/tablecmds.h"
158 #include "commands/tablespace.h"
159 #include "commands/trigger.h"
160 #include "executor/executor.h"
161 #include "executor/execPartition.h"
163 #include "funcapi.h"
164 #include "libpq/pqformat.h"
165 #include "libpq/pqsignal.h"
166 #include "mb/pg_wchar.h"
167 #include "miscadmin.h"
168 #include "nodes/makefuncs.h"
169 #include "optimizer/optimizer.h"
170 #include "parser/parse_relation.h"
171 #include "pgstat.h"
172 #include "postmaster/bgworker.h"
173 #include "postmaster/interrupt.h"
174 #include "postmaster/postmaster.h"
175 #include "postmaster/walwriter.h"
176 #include "replication/decode.h"
177 #include "replication/logical.h"
182 #include "replication/origin.h"
184 #include "replication/snapbuild.h"
185 #include "replication/walreceiver.h"
187 #include "rewrite/rewriteHandler.h"
188 #include "storage/buffile.h"
189 #include "storage/bufmgr.h"
190 #include "storage/fd.h"
191 #include "storage/ipc.h"
192 #include "storage/lmgr.h"
193 #include "storage/proc.h"
194 #include "storage/procarray.h"
195 #include "tcop/tcopprot.h"
196 #include "utils/acl.h"
197 #include "utils/builtins.h"
198 #include "utils/catcache.h"
199 #include "utils/dynahash.h"
200 #include "utils/datum.h"
201 #include "utils/fmgroids.h"
202 #include "utils/guc.h"
203 #include "utils/inval.h"
204 #include "utils/lsyscache.h"
205 #include "utils/memutils.h"
206 #include "utils/pg_lsn.h"
207 #include "utils/rel.h"
208 #include "utils/rls.h"
209 #include "utils/syscache.h"
210 #include "utils/timeout.h"
211 #include "utils/usercontext.h"
212 
213 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
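/*
 * Editor's illustrative sketch (not part of the original file): the spool-file
 * naming convention described in the file header comment above -- one set of
 * files per (subscription OID, toplevel XID) pair so that workers handling
 * remote transactions with the same XID don't interfere.  The helper name and
 * the exact format strings below are assumptions for illustration; the real
 * helpers (changes_filename/subxact_filename) are defined later in this file.
 */
static void
example_spool_filenames(Oid subid, TransactionId xid)
{
	char		changes_path[MAXPGPATH];
	char		subxact_path[MAXPGPATH];

	/* serialized changes of the toplevel transaction */
	snprintf(changes_path, MAXPGPATH, "%u-%u.changes", subid, xid);

	/* offsets of subtransactions within the changes file */
	snprintf(subxact_path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
}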
214 
215 typedef struct FlushPosition
216 {
221 
223 
224 typedef struct ApplyExecutionData
225 {
226  EState *estate; /* executor state, used to track resources */
227 
228  LogicalRepRelMapEntry *targetRel; /* replication target rel */
229  ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
230 
231  /* These fields are used when the target relation is partitioned: */
232  ModifyTableState *mtstate; /* dummy ModifyTable state */
233  PartitionTupleRouting *proute; /* partition routing info */
234 } ApplyExecutionData;
235
236 /* Struct for saving and restoring apply errcontext information */
237 typedef struct ApplyErrorCallbackArg
238 {
239  LogicalRepMsgType command; /* 0 if invalid */
240  LogicalRepRelMapEntry *rel;
241
242  /* Remote node information */
243  int remote_attnum; /* -1 if invalid */
244  TransactionId remote_xid;
245  XLogRecPtr finish_lsn;
246  char *origin_name;
247 } ApplyErrorCallbackArg;
248
249 /*
250  * The action to be taken for the changes in the transaction.
251  *
252  * TRANS_LEADER_APPLY:
253  * This action means that we are in the leader apply worker or table sync
254  * worker. The changes of the transaction are either directly applied or
255  * are read from temporary files (for streaming transactions) and then
256  * applied by the worker.
257  *
258  * TRANS_LEADER_SERIALIZE:
259  * This action means that we are in the leader apply worker or table sync
260  * worker. Changes are written to temporary files and then applied when the
261  * final commit arrives.
262  *
263  * TRANS_LEADER_SEND_TO_PARALLEL:
264  * This action means that we are in the leader apply worker and need to send
265  * the changes to the parallel apply worker.
266  *
267  * TRANS_LEADER_PARTIAL_SERIALIZE:
268  * This action means that we are in the leader apply worker and have sent some
269  * changes directly to the parallel apply worker and the remaining changes are
270  * serialized to a file, due to timeout while sending data. The parallel apply
271  * worker will apply these serialized changes when the final commit arrives.
272  *
273  * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
274  * serializing changes, the leader worker also needs to serialize the
275  * STREAM_XXX message to a file, and wait for the parallel apply worker to
276  * finish the transaction when processing the transaction finish command. So
277  * this new action was introduced to keep the code and logic clear.
278  *
279  * TRANS_PARALLEL_APPLY:
280  * This action means that we are in the parallel apply worker and changes of
281  * the transaction are applied directly by the worker.
282  */
283 typedef enum
284 {
285  /* The action for non-streaming transactions. */
286  TRANS_LEADER_APPLY,
287
288  /* Actions for streaming transactions. */
289  TRANS_LEADER_SERIALIZE,
290  TRANS_LEADER_SEND_TO_PARALLEL,
291  TRANS_LEADER_PARTIAL_SERIALIZE,
292  TRANS_PARALLEL_APPLY
293 } TransApplyAction;
294
295 /* errcontext tracker */
296 static ApplyErrorCallbackArg apply_error_callback_arg =
297 {
298  .command = 0,
299  .rel = NULL,
300  .remote_attnum = -1,
301  .remote_xid = InvalidTransactionId,
302  .finish_lsn = InvalidXLogRecPtr,
303  .origin_name = NULL,
304 };
305 
307 
310 
311 /* per stream context for streaming transactions */
313 
315 
317 static bool MySubscriptionValid = false;
318 
320 
321 bool in_remote_transaction = false;
322 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
323
324 /* fields valid only when processing streamed transaction */
325 static bool in_streamed_transaction = false;
326 
327 static TransactionId stream_xid = InvalidTransactionId;
328
329 /*
330  * The number of changes applied by parallel apply worker during one streaming
331  * block.
332  */
334 
335 /* Are we initializing an apply worker? */
337 
338 /*
339  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
340  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
341  * Once we start skipping changes, we don't stop until we have skipped all changes
342  * of the transaction, even if pg_subscription is updated and MySubscription->skiplsn
343  * gets changed or reset in the meantime. Also, for streaming transactions (streaming = on),
344  * we don't skip receiving and spooling the changes, since we decide whether or not
345  * to skip applying the changes only when starting to apply them. The subskiplsn is
346  * cleared after successfully skipping the transaction or applying a non-empty
347  * transaction. The latter prevents a mistakenly specified subskiplsn from
348  * being left behind. Note that we cannot skip streaming transactions when using
349  * parallel apply workers because we cannot get the finish LSN before applying
350  * the changes. So, we don't start a parallel apply worker when the finish LSN is
351  * set by the user.
352  */
353 static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
354 #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
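/*
 * Editor's sketch (not part of the original file): roughly how skipping is
 * armed, per the comment above.  The real logic lives in
 * maybe_start_skipping_changes()/stop_skipping_changes(), defined later in
 * this file; the simplified body below is an assumption for illustration.
 */
static void
example_maybe_start_skipping_changes(XLogRecPtr finish_lsn)
{
	/* Arm skipping only when the remote finish LSN matches subskiplsn. */
	if (XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
		MySubscription->skiplsn != finish_lsn)
		return;

	skip_xact_finish_lsn = finish_lsn;	/* is_skipping_changes() is now true */
}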
355 
356 /* BufFile handle of the current streaming file */
357 static BufFile *stream_fd = NULL;
358 
359 typedef struct SubXactInfo
360 {
361  TransactionId xid; /* XID of the subxact */
362  int fileno; /* file number in the buffile */
363  off_t offset; /* offset in the file */
364 } SubXactInfo;
365
366 /* Sub-transaction data for the current streaming transaction */
367 typedef struct ApplySubXactData
368 {
369  uint32 nsubxacts; /* number of sub-transactions */
370  uint32 nsubxacts_max; /* current capacity of subxacts */
371  TransactionId subxact_last; /* xid of the last sub-transaction */
372  SubXactInfo *subxacts; /* sub-xact offset in changes file */
373 } ApplySubXactData;
374
375 static ApplySubXactData subxact_data = {0};
376
377 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
378 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
379 
380 /*
381  * Information about subtransactions of a given toplevel transaction.
382  */
383 static void subxact_info_write(Oid subid, TransactionId xid);
384 static void subxact_info_read(Oid subid, TransactionId xid);
385 static void subxact_info_add(TransactionId xid);
386 static inline void cleanup_subxact_info(void);
387 
388 /*
389  * Serialize and deserialize changes for a toplevel transaction.
390  */
391 static void stream_open_file(Oid subid, TransactionId xid,
392  bool first_segment);
393 static void stream_write_change(char action, StringInfo s);
395 static void stream_close_file(void);
396 
397 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
398 
399 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
401  ResultRelInfo *relinfo,
402  TupleTableSlot *remoteslot);
404  ResultRelInfo *relinfo,
405  TupleTableSlot *remoteslot,
406  LogicalRepTupleData *newtup,
407  Oid localindexoid);
409  ResultRelInfo *relinfo,
410  TupleTableSlot *remoteslot,
411  Oid localindexoid);
412 static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
413  LogicalRepRelation *remoterel,
414  Oid localidxoid,
415  TupleTableSlot *remoteslot,
416  TupleTableSlot **localslot);
418  TupleTableSlot *remoteslot,
419  LogicalRepTupleData *newtup,
420  CmdType operation);
421 
422 /* Compute GID for two_phase transactions */
423 static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
424 
425 /* Functions for skipping changes */
426 static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
427 static void stop_skipping_changes(void);
428 static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
429 
430 /* Functions for apply error callback */
431 static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
432 static inline void reset_apply_error_context_info(void);
433 
435  ParallelApplyWorkerInfo **winfo);
436 
437 /*
438  * Form the origin name for the subscription.
439  *
440  * This is a common function for tablesync and other workers. Tablesync workers
441  * must pass a valid relid. Other callers must pass relid = InvalidOid.
442  *
443  * Return the name in the supplied buffer.
444  */
445 void
446 ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
447  char *originname, Size szoriginname)
448 {
449  if (OidIsValid(relid))
450  {
451  /* Replication origin name for tablesync workers. */
452  snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
453  }
454  else
455  {
456  /* Replication origin name for non-tablesync workers. */
457  snprintf(originname, szoriginname, "pg_%u", suboid);
458  }
459 }
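/*
 * Editor's usage example (not part of the original file): for a subscription
 * with OID 16394, the resulting origin name would be "pg_16394" for the apply
 * worker and "pg_16394_16401" for a tablesync worker copying relid 16401.
 * The OIDs here are illustrative only.
 */
static void
example_origin_names(void)
{
	char		originname[NAMEDATALEN];

	/* leader apply worker: relid must be InvalidOid */
	ReplicationOriginNameForLogicalRep(16394, InvalidOid,
									   originname, sizeof(originname));

	/* tablesync worker: pass the relid being synchronized */
	ReplicationOriginNameForLogicalRep(16394, 16401,
									   originname, sizeof(originname));
}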
460 
461 /*
462  * Should this worker apply changes for the given relation?
463  *
464  * This is mainly needed for the initial relation data sync, as that runs in a
465  * separate worker process in parallel, and we need some way to skip
466  * changes coming to the leader apply worker during the sync of a table.
467  *
468  * Note that we need to do a less-than-or-equal comparison for the SYNCDONE
469  * state because it might hold the position of the end of the initial slot
470  * consistent point WAL record + 1 (i.e. the start of the next record), and the
471  * next record can be the COMMIT of the transaction we are now processing
472  * (which is what we set remote_final_lsn to in apply_handle_begin).
473  *
474  * Note that for streaming transactions that are being applied in the parallel
475  * apply worker, we disallow applying changes if the target table in the
476  * subscription is not in the READY state, because we cannot decide whether to
477  * apply the change as we won't know remote_final_lsn by that time.
478  *
479  * We already checked this in pa_can_start() before assigning the
480  * streaming transaction to the parallel worker, but it also needs to be
481  * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
482  * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
483  * while applying this transaction.
484  */
485 static bool
486 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
487 {
488  switch (MyLogicalRepWorker->type)
489  {
491  return MyLogicalRepWorker->relid == rel->localreloid;
492 
494  /* We don't synchronize rels that are in an unknown state. */
495  if (rel->state != SUBREL_STATE_READY &&
496  rel->state != SUBREL_STATE_UNKNOWN)
497  ereport(ERROR,
498  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
499  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
501  errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
502 
503  return rel->state == SUBREL_STATE_READY;
504 
505  case WORKERTYPE_APPLY:
506  return (rel->state == SUBREL_STATE_READY ||
507  (rel->state == SUBREL_STATE_SYNCDONE &&
508  rel->statelsn <= remote_final_lsn));
509 
510  case WORKERTYPE_UNKNOWN:
511  /* Should never happen. */
512  elog(ERROR, "Unknown worker type");
513  }
514 
515  return false; /* dummy for compiler */
516 }
517 
518 /*
519  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
520  *
521  * Start a transaction, if this is the first step (else we keep using the
522  * existing transaction).
523  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
524  */
525 static void
526 begin_replication_step(void)
527 {
529 
530  if (!IsTransactionState())
531  {
534  }
535 
537 
539 }
540 
541 /*
542  * Finish up one step of a replication transaction.
543  * Callers of begin_replication_step() must also call this.
544  *
545  * We don't close out the transaction here, but we should increment
546  * the command counter to make the effects of this step visible.
547  */
548 static void
549 end_replication_step(void)
550 {
552 
554 }
555 
556 /*
557  * Handle streamed transactions for both the leader apply worker and the
558  * parallel apply workers.
559  *
560  * In the streaming case (receiving a block of the streamed transaction), for
561  * serialize mode, simply redirect it to a file for the proper toplevel
562  * transaction, and for parallel mode, the leader apply worker will send the
563  * changes to parallel apply workers and the parallel apply worker will define
564  * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
565  * messages will be applied by both leader apply worker and parallel apply
566  * workers).
567  *
568  * Returns true for streamed transactions (when the change is either serialized
569  * to file or sent to parallel apply worker), false otherwise (regular mode or
570  * needs to be processed by parallel apply worker).
571  *
572  * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
573  * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
574  * to a parallel apply worker.
575  */
576 static bool
577 handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
578 {
579  TransactionId current_xid;
581  TransApplyAction apply_action;
582  StringInfoData original_msg;
583 
584  apply_action = get_transaction_apply_action(stream_xid, &winfo);
585 
586  /* not in streaming mode */
587  if (apply_action == TRANS_LEADER_APPLY)
588  return false;
589 
591 
592  /*
593  * The parallel apply worker needs the xid in this message to decide
594  * whether to define a savepoint, so save the original message that has
595  * not moved the cursor after the xid. We will serialize this message to a
596  * file in PARTIAL_SERIALIZE mode.
597  */
598  original_msg = *s;
599 
600  /*
601  * We should have received XID of the subxact as the first part of the
602  * message, so extract it.
603  */
604  current_xid = pq_getmsgint(s, 4);
605 
606  if (!TransactionIdIsValid(current_xid))
607  ereport(ERROR,
608  (errcode(ERRCODE_PROTOCOL_VIOLATION),
609  errmsg_internal("invalid transaction ID in streamed replication transaction")));
610 
611  switch (apply_action)
612  {
614  Assert(stream_fd);
615 
616  /* Add the new subxact to the array (unless already there). */
617  subxact_info_add(current_xid);
618 
619  /* Write the change to the current file */
621  return true;
622 
624  Assert(winfo);
625 
626  /*
627  * XXX The publisher side doesn't always send relation/type update
628  * messages after the streaming transaction, so also update the
629  * relation/type in leader apply worker. See function
630  * cleanup_rel_sync_cache.
631  */
632  if (pa_send_data(winfo, s->len, s->data))
633  return (action != LOGICAL_REP_MSG_RELATION &&
635 
636  /*
637  * Switch to serialize mode when we are not able to send the
638  * change to parallel apply worker.
639  */
640  pa_switch_to_partial_serialize(winfo, false);
641 
642  /* fall through */
644  stream_write_change(action, &original_msg);
645 
646  /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
647  return (action != LOGICAL_REP_MSG_RELATION &&
649 
652 
653  /* Define a savepoint for a subxact if needed. */
654  pa_start_subtrans(current_xid, stream_xid);
655  return false;
656 
657  default:
658  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
659  return false; /* silence compiler warning */
660  }
661 }
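/*
 * Editor's sketch (not part of the original file): how the data-message
 * handlers later in this file typically consult handle_streamed_transaction()
 * before applying a change.  This is a simplified assumption of the calling
 * pattern, not a verbatim excerpt.
 */
static void
example_handle_insert(StringInfo s)
{
	if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
		return;					/* change was spooled or forwarded */

	/* ... otherwise parse the tuple data and apply the INSERT locally ... */
}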
662 
663 /*
664  * Executor state preparation for evaluation of constraint expressions,
665  * indexes and triggers for the specified relation.
666  *
667  * Note that the caller must open and close any indexes to be updated.
668  */
669 static ApplyExecutionData *
670 create_edata_for_relation(LogicalRepRelMapEntry *rel)
671 {
672  ApplyExecutionData *edata;
673  EState *estate;
674  RangeTblEntry *rte;
675  List *perminfos = NIL;
676  ResultRelInfo *resultRelInfo;
677 
678  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
679  edata->targetRel = rel;
680 
681  edata->estate = estate = CreateExecutorState();
682 
683  rte = makeNode(RangeTblEntry);
684  rte->rtekind = RTE_RELATION;
685  rte->relid = RelationGetRelid(rel->localrel);
686  rte->relkind = rel->localrel->rd_rel->relkind;
688 
689  addRTEPermissionInfo(&perminfos, rte);
690 
691  ExecInitRangeTable(estate, list_make1(rte), perminfos);
692 
693  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
694 
695  /*
696  * Use Relation opened by logicalrep_rel_open() instead of opening it
697  * again.
698  */
699  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
700 
701  /*
702  * We put the ResultRelInfo in the es_opened_result_relations list, even
703  * though we don't populate the es_result_relations array. That's a bit
704  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
705  *
706  * ExecOpenIndices() is not called here either, each execution path doing
707  * an apply operation being responsible for that.
708  */
710  lappend(estate->es_opened_result_relations, resultRelInfo);
711 
712  estate->es_output_cid = GetCurrentCommandId(true);
713 
714  /* Prepare to catch AFTER triggers. */
716 
717  /* other fields of edata remain NULL for now */
718 
719  return edata;
720 }
721 
722 /*
723  * Finish any operations related to the executor state created by
724  * create_edata_for_relation().
725  */
726 static void
727 finish_edata(ApplyExecutionData *edata)
728 {
729  EState *estate = edata->estate;
730 
731  /* Handle any queued AFTER triggers. */
732  AfterTriggerEndQuery(estate);
733 
734  /* Shut down tuple routing, if any was done. */
735  if (edata->proute)
736  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
737 
738  /*
739  * Cleanup. It might seem that we should call ExecCloseResultRelations()
740  * here, but we intentionally don't. It would close the rel we added to
741  * es_opened_result_relations above, which is wrong because we took no
742  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
743  * any other relations opened during execution.
744  */
745  ExecResetTupleTable(estate->es_tupleTable, false);
746  FreeExecutorState(estate);
747  pfree(edata);
748 }
749 
750 /*
751  * Fills in default values for columns for which there is no corresponding
752  * remote relation column.
753  *
754  * This allows us to support tables which have more columns on the downstream
755  * than on the upstream.
756  */
757 static void
758 slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
759  TupleTableSlot *slot)
760 {
761  TupleDesc desc = RelationGetDescr(rel->localrel);
762  int num_phys_attrs = desc->natts;
763  int i;
764  int attnum,
765  num_defaults = 0;
766  int *defmap;
767  ExprState **defexprs;
768  ExprContext *econtext;
769 
770  econtext = GetPerTupleExprContext(estate);
771 
772  /* We got all the data via replication, no need to evaluate anything. */
773  if (num_phys_attrs == rel->remoterel.natts)
774  return;
775 
776  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
777  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
778 
779  Assert(rel->attrmap->maplen == num_phys_attrs);
780  for (attnum = 0; attnum < num_phys_attrs; attnum++)
781  {
782  Expr *defexpr;
783 
784  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
785  continue;
786 
787  if (rel->attrmap->attnums[attnum] >= 0)
788  continue;
789 
790  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
791 
792  if (defexpr != NULL)
793  {
794  /* Run the expression through planner */
795  defexpr = expression_planner(defexpr);
796 
797  /* Initialize executable expression in copycontext */
798  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
799  defmap[num_defaults] = attnum;
800  num_defaults++;
801  }
802  }
803 
804  for (i = 0; i < num_defaults; i++)
805  slot->tts_values[defmap[i]] =
806  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
807 }
808 
809 /*
810  * Store tuple data into slot.
811  *
812  * Incoming data can be either text or binary format.
813  */
814 static void
815 slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
816  LogicalRepTupleData *tupleData)
817 {
818  int natts = slot->tts_tupleDescriptor->natts;
819  int i;
820 
821  ExecClearTuple(slot);
822 
823  /* Call the "in" function for each non-dropped, non-null attribute */
824  Assert(natts == rel->attrmap->maplen);
825  for (i = 0; i < natts; i++)
826  {
828  int remoteattnum = rel->attrmap->attnums[i];
829 
830  if (!att->attisdropped && remoteattnum >= 0)
831  {
832  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
833 
834  Assert(remoteattnum < tupleData->ncols);
835 
836  /* Set attnum for error callback */
838 
839  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
840  {
841  Oid typinput;
842  Oid typioparam;
843 
844  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
845  slot->tts_values[i] =
846  OidInputFunctionCall(typinput, colvalue->data,
847  typioparam, att->atttypmod);
848  slot->tts_isnull[i] = false;
849  }
850  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
851  {
852  Oid typreceive;
853  Oid typioparam;
854 
855  /*
856  * In some code paths we may be asked to re-parse the same
857  * tuple data. Reset the StringInfo's cursor so that works.
858  */
859  colvalue->cursor = 0;
860 
861  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
862  slot->tts_values[i] =
863  OidReceiveFunctionCall(typreceive, colvalue,
864  typioparam, att->atttypmod);
865 
866  /* Trouble if it didn't eat the whole buffer */
867  if (colvalue->cursor != colvalue->len)
868  ereport(ERROR,
869  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
870  errmsg("incorrect binary data format in logical replication column %d",
871  remoteattnum + 1)));
872  slot->tts_isnull[i] = false;
873  }
874  else
875  {
876  /*
877  * NULL value from remote. (We don't expect to see
878  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
879  * NULL.)
880  */
881  slot->tts_values[i] = (Datum) 0;
882  slot->tts_isnull[i] = true;
883  }
884 
885  /* Reset attnum for error callback */
887  }
888  else
889  {
890  /*
891  * We assign NULL to dropped attributes and missing values
892  * (missing values should be later filled using
893  * slot_fill_defaults).
894  */
895  slot->tts_values[i] = (Datum) 0;
896  slot->tts_isnull[i] = true;
897  }
898  }
899 
900  ExecStoreVirtualTuple(slot);
901 }
902 
903 /*
904  * Replace updated columns with data from the LogicalRepTupleData struct.
905  * This is somewhat similar to heap_modify_tuple but also calls the type
906  * input functions on the user data.
907  *
908  * "slot" is filled with a copy of the tuple in "srcslot", replacing
909  * columns provided in "tupleData" and leaving others as-is.
910  *
911  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
912  * storage for "srcslot". This is OK for current usage, but someday we may
913  * need to materialize "slot" at the end to make it independent of "srcslot".
914  */
915 static void
916 slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
917  LogicalRepRelMapEntry *rel,
918  LogicalRepTupleData *tupleData)
919 {
920  int natts = slot->tts_tupleDescriptor->natts;
921  int i;
922 
923  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
924  ExecClearTuple(slot);
925 
926  /*
927  * Copy all the column data from srcslot, so that we'll have valid values
928  * for unreplaced columns.
929  */
930  Assert(natts == srcslot->tts_tupleDescriptor->natts);
931  slot_getallattrs(srcslot);
932  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
933  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
934 
935  /* Call the "in" function for each replaced attribute */
936  Assert(natts == rel->attrmap->maplen);
937  for (i = 0; i < natts; i++)
938  {
940  int remoteattnum = rel->attrmap->attnums[i];
941 
942  if (remoteattnum < 0)
943  continue;
944 
945  Assert(remoteattnum < tupleData->ncols);
946 
947  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
948  {
949  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
950 
951  /* Set attnum for error callback */
953 
954  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
955  {
956  Oid typinput;
957  Oid typioparam;
958 
959  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
960  slot->tts_values[i] =
961  OidInputFunctionCall(typinput, colvalue->data,
962  typioparam, att->atttypmod);
963  slot->tts_isnull[i] = false;
964  }
965  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
966  {
967  Oid typreceive;
968  Oid typioparam;
969 
970  /*
971  * In some code paths we may be asked to re-parse the same
972  * tuple data. Reset the StringInfo's cursor so that works.
973  */
974  colvalue->cursor = 0;
975 
976  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
977  slot->tts_values[i] =
978  OidReceiveFunctionCall(typreceive, colvalue,
979  typioparam, att->atttypmod);
980 
981  /* Trouble if it didn't eat the whole buffer */
982  if (colvalue->cursor != colvalue->len)
983  ereport(ERROR,
984  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
985  errmsg("incorrect binary data format in logical replication column %d",
986  remoteattnum + 1)));
987  slot->tts_isnull[i] = false;
988  }
989  else
990  {
991  /* must be LOGICALREP_COLUMN_NULL */
992  slot->tts_values[i] = (Datum) 0;
993  slot->tts_isnull[i] = true;
994  }
995 
996  /* Reset attnum for error callback */
998  }
999  }
1000 
1001  /* And finally, declare that "slot" contains a valid virtual tuple */
1002  ExecStoreVirtualTuple(slot);
1003 }
1004 
1005 /*
1006  * Handle BEGIN message.
1007  */
1008 static void
1009 apply_handle_begin(StringInfo s)
1010 {
1011  LogicalRepBeginData begin_data;
1012 
1013  /* There must not be an active streaming transaction. */
1015 
1016  logicalrep_read_begin(s, &begin_data);
1017  set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1018 
1019  remote_final_lsn = begin_data.final_lsn;
1020 
1022 
1023  in_remote_transaction = true;
1024 
1026 }
1027 
1028 /*
1029  * Handle COMMIT message.
1030  *
1031  * TODO, support tracking of multiple origins
1032  */
1033 static void
1034 apply_handle_commit(StringInfo s)
1035 {
1036  LogicalRepCommitData commit_data;
1037 
1038  logicalrep_read_commit(s, &commit_data);
1039 
1040  if (commit_data.commit_lsn != remote_final_lsn)
1041  ereport(ERROR,
1042  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1043  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1044  LSN_FORMAT_ARGS(commit_data.commit_lsn),
1046 
1047  apply_handle_commit_internal(&commit_data);
1048 
1049  /* Process any tables that are being synchronized in parallel. */
1050  process_syncing_tables(commit_data.end_lsn);
1051 
1054 }
1055 
1056 /*
1057  * Handle BEGIN PREPARE message.
1058  */
1059 static void
1060 apply_handle_begin_prepare(StringInfo s)
1061 {
1062  LogicalRepPreparedTxnData begin_data;
1063 
1064  /* Tablesync should never receive prepare. */
1065  if (am_tablesync_worker())
1066  ereport(ERROR,
1067  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1068  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1069 
1070  /* There must not be an active streaming transaction. */
1072 
1073  logicalrep_read_begin_prepare(s, &begin_data);
1074  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1075 
1076  remote_final_lsn = begin_data.prepare_lsn;
1077 
1079 
1080  in_remote_transaction = true;
1081 
1083 }
1084 
1085 /*
1086  * Common function to prepare the GID.
1087  */
1088 static void
1089 apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1090 {
1091  char gid[GIDSIZE];
1092 
1093  /*
1094  * Compute a unique GID for two_phase transactions. We don't use the GID of
1095  * the prepared transaction sent by the server, as that can lead to a deadlock
1096  * when multiple subscriptions on the same node point to publications on the
1097  * same node. See the comments atop worker.c.
1098  */
1099  TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1100  gid, sizeof(gid));
1101 
1102  /*
1103  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1104  * called within the PrepareTransactionBlock below.
1105  */
1106  if (!IsTransactionBlock())
1107  {
1108  BeginTransactionBlock();
1109  CommitTransactionCommand(); /* Completes the preceding Begin command. */
1110  }
1111 
1112  /*
1113  * Update origin state so we can restart streaming from correct position
1114  * in case of crash.
1115  */
1116  replorigin_session_origin_lsn = prepare_data->end_lsn;
1118 
1119  PrepareTransactionBlock(gid);
1120 }
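/*
 * Editor's sketch (not part of the original file): the shape of the unique
 * GID described above, built from the subscription OID and the remote XID.
 * TwoPhaseTransactionGid() is defined later in this file; the exact format
 * string below is an assumption for illustration.
 */
static void
example_two_phase_gid(Oid subid, TransactionId xid, char *gid, int szgid)
{
	/* e.g. subid 16394, remote xid 754 => "pg_gid_16394_754" */
	snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}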
1121 
1122 /*
1123  * Handle PREPARE message.
1124  */
1125 static void
1126 apply_handle_prepare(StringInfo s)
1127 {
1128  LogicalRepPreparedTxnData prepare_data;
1129 
1130  logicalrep_read_prepare(s, &prepare_data);
1131 
1132  if (prepare_data.prepare_lsn != remote_final_lsn)
1133  ereport(ERROR,
1134  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1135  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1136  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1138 
1139  /*
1140  * Unlike commit, here, we always prepare the transaction even though no
1141  * change has happened in this transaction or all changes are skipped. It
1142  * is done this way because at commit prepared time, we won't know whether
1143  * we have skipped preparing a transaction because of those reasons.
1144  *
1145  * XXX, We can optimize such that at commit prepared time, we first check
1146  * whether we have prepared the transaction or not but that doesn't seem
1147  * worthwhile because such cases shouldn't be common.
1148  */
1150 
1151  apply_handle_prepare_internal(&prepare_data);
1152 
1155  pgstat_report_stat(false);
1156 
1158 
1159  in_remote_transaction = false;
1160 
1161  /* Process any tables that are being synchronized in parallel. */
1162  process_syncing_tables(prepare_data.end_lsn);
1163 
1164  /*
1165  * Since we have already prepared the transaction, in a case where the
1166  * server crashes before clearing the subskiplsn, it will be left but the
1167  * transaction won't be resent. But that's okay because it's a rare case
1168  * and the subskiplsn will be cleared when finishing the next transaction.
1169  */
1172 
1175 }
1176 
1177 /*
1178  * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1179  *
1180  * Note that we don't need to wait here if the transaction was prepared in a
1181  * parallel apply worker. In that case, we have already waited for the prepare
1182  * to finish in apply_handle_stream_prepare() which will ensure all the
1183  * operations in that transaction have happened in the subscriber, so no
1184  * concurrent transaction can cause deadlock or transaction dependency issues.
1185  */
1186 static void
1187 apply_handle_commit_prepared(StringInfo s)
1188 {
1189  LogicalRepCommitPreparedTxnData prepare_data;
1190  char gid[GIDSIZE];
1191 
1192  logicalrep_read_commit_prepared(s, &prepare_data);
1193  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1194 
1195  /* Compute GID for two_phase transactions. */
1197  gid, sizeof(gid));
1198 
1199  /* There is no transaction when COMMIT PREPARED is called */
1201 
1202  /*
1203  * Update origin state so we can restart streaming from correct position
1204  * in case of crash.
1205  */
1206  replorigin_session_origin_lsn = prepare_data.end_lsn;
1208 
1209  FinishPreparedTransaction(gid, true);
1212  pgstat_report_stat(false);
1213 
1215  in_remote_transaction = false;
1216 
1217  /* Process any tables that are being synchronized in parallel. */
1218  process_syncing_tables(prepare_data.end_lsn);
1219 
1220  clear_subscription_skip_lsn(prepare_data.end_lsn);
1221 
1224 }
1225 
1226 /*
1227  * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1228  *
1229  * Note that we don't need to wait here if the transaction was prepared in a
1230  * parallel apply worker. In that case, we have already waited for the prepare
1231  * to finish in apply_handle_stream_prepare() which will ensure all the
1232  * operations in that transaction have happened in the subscriber, so no
1233  * concurrent transaction can cause deadlock or transaction dependency issues.
1234  */
1235 static void
1236 apply_handle_rollback_prepared(StringInfo s)
1237 {
1238  LogicalRepRollbackPreparedTxnData rollback_data;
1239  char gid[GIDSIZE];
1240 
1241  logicalrep_read_rollback_prepared(s, &rollback_data);
1242  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1243 
1244  /* Compute GID for two_phase transactions. */
1245  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1246  gid, sizeof(gid));
1247 
1248  /*
1249  * It is possible that we haven't received the prepare because it occurred
1250  * before the walsender reached a consistent point or two_phase was not yet
1251  * enabled at that time; in such cases, we need to skip the rollback
1252  * prepared.
1253  */
1254  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1255  rollback_data.prepare_time))
1256  {
1257  /*
1258  * Update origin state so we can restart streaming from correct
1259  * position in case of crash.
1260  */
1263 
1264  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1266  FinishPreparedTransaction(gid, false);
1269 
1271  }
1272 
1273  pgstat_report_stat(false);
1274 
1276  in_remote_transaction = false;
1277 
1278  /* Process any tables that are being synchronized in parallel. */
1280 
1283 }
1284 
1285 /*
1286  * Handle STREAM PREPARE.
1287  */
1288 static void
1289 apply_handle_stream_prepare(StringInfo s)
1290 {
1291  LogicalRepPreparedTxnData prepare_data;
1292  ParallelApplyWorkerInfo *winfo;
1293  TransApplyAction apply_action;
1294 
1295  /* Save the message before it is consumed. */
1296  StringInfoData original_msg = *s;
1297 
1299  ereport(ERROR,
1300  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1301  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1302 
1303  /* Tablesync should never receive prepare. */
1304  if (am_tablesync_worker())
1305  ereport(ERROR,
1306  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1307  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1308 
1309  logicalrep_read_stream_prepare(s, &prepare_data);
1310  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1311 
1312  apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1313 
1314  switch (apply_action)
1315  {
1316  case TRANS_LEADER_APPLY:
1317 
1318  /*
1319  * The transaction has been serialized to file, so replay all the
1320  * spooled operations.
1321  */
1323  prepare_data.xid, prepare_data.prepare_lsn);
1324 
1325  /* Mark the transaction as prepared. */
1326  apply_handle_prepare_internal(&prepare_data);
1327 
1329 
1331 
1332  in_remote_transaction = false;
1333 
1334  /* Unlink the files with serialized changes and subxact info. */
1336 
1337  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1338  break;
1339 
1341  Assert(winfo);
1342 
1343  if (pa_send_data(winfo, s->len, s->data))
1344  {
1345  /* Finish processing the streaming transaction. */
1346  pa_xact_finish(winfo, prepare_data.end_lsn);
1347  break;
1348  }
1349 
1350  /*
1351  * Switch to serialize mode when we are not able to send the
1352  * change to parallel apply worker.
1353  */
1354  pa_switch_to_partial_serialize(winfo, true);
1355 
1356  /* fall through */
1358  Assert(winfo);
1359 
1360  stream_open_and_write_change(prepare_data.xid,
1362  &original_msg);
1363 
1365 
1366  /* Finish processing the streaming transaction. */
1367  pa_xact_finish(winfo, prepare_data.end_lsn);
1368  break;
1369 
1370  case TRANS_PARALLEL_APPLY:
1371 
1372  /*
1373  * If the parallel apply worker is applying spooled messages then
1374  * close the file before preparing.
1375  */
1376  if (stream_fd)
1378 
1380 
1381  /* Mark the transaction as prepared. */
1382  apply_handle_prepare_internal(&prepare_data);
1383 
1385 
1387 
1389 
1392 
1394 
1395  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1396  break;
1397 
1398  default:
1399  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1400  break;
1401  }
1402 
1403  pgstat_report_stat(false);
1404 
1405  /* Process any tables that are being synchronized in parallel. */
1406  process_syncing_tables(prepare_data.end_lsn);
1407 
1408  /*
1409  * Similar to prepare case, the subskiplsn could be left in a case of
1410  * server crash but it's okay. See the comments in apply_handle_prepare().
1411  */
1414 
1416 
1418 }
1419 
1420 /*
1421  * Handle ORIGIN message.
1422  *
1423  * TODO, support tracking of multiple origins
1424  */
1425 static void
1426 apply_handle_origin(StringInfo s)
1427 {
1428  /*
1429  * An ORIGIN message can only arrive inside a streaming transaction or inside
1430  * a remote transaction, and before any actual writes.
1431  */
1432  if (!in_streamed_transaction &&
1435  ereport(ERROR,
1436  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1437  errmsg_internal("ORIGIN message sent out of order")));
1438 }
1439 
1440 /*
1441  * Initialize fileset (if not already done).
1442  *
1443  * Create a new file when first_segment is true, otherwise open the existing
1444  * file.
1445  */
1446 void
1447 stream_start_internal(TransactionId xid, bool first_segment)
1448 {
1450 
1451  /*
1452  * Initialize the worker's stream_fileset if we haven't yet. This will be
1453  * used for the entire duration of the worker so create it in a permanent
1454  * context. We create this on the very first streaming message from any
1455  * transaction and then use it for this and other streaming transactions.
1456  * Now, we could create a fileset at the start of the worker as well but
1457  * then we won't be sure that it will ever be used.
1458  */
1460  {
1461  MemoryContext oldctx;
1462 
1464 
1467 
1468  MemoryContextSwitchTo(oldctx);
1469  }
1470 
1471  /* Open the spool file for this transaction. */
1472  stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1473 
1474  /* If this is not the first segment, open existing subxact file. */
1475  if (!first_segment)
1477 
1479 }
1480 
1481 /*
1482  * Handle STREAM START message.
1483  */
1484 static void
1485 apply_handle_stream_start(StringInfo s)
1486 {
1487  bool first_segment;
1488  ParallelApplyWorkerInfo *winfo;
1489  TransApplyAction apply_action;
1490 
1491  /* Save the message before it is consumed. */
1492  StringInfoData original_msg = *s;
1493 
1495  ereport(ERROR,
1496  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1497  errmsg_internal("duplicate STREAM START message")));
1498 
1499  /* There must not be an active streaming transaction. */
1501 
1502  /* notify handle methods we're processing a remote transaction */
1503  in_streamed_transaction = true;
1504 
1505  /* extract XID of the top-level transaction */
1506  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1507 
1509  ereport(ERROR,
1510  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1511  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1512 
1514 
1515  /* Try to allocate a worker for the streaming transaction. */
1516  if (first_segment)
1518 
1519  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1520 
1521  switch (apply_action)
1522  {
1524 
1525  /*
1526  * Function stream_start_internal starts a transaction. This
1527  * transaction will be committed on the stream stop unless it is a
1528  * tablesync worker in which case it will be committed after
1529  * processing all the messages. We need this transaction for
1530  * handling the BufFile, used for serializing the streaming data
1531  * and subxact info.
1532  */
1533  stream_start_internal(stream_xid, first_segment);
1534  break;
1535 
1537  Assert(winfo);
1538 
1539  /*
1540  * Once we start serializing the changes, the parallel apply
1541  * worker will wait for the leader to release the stream lock
1542  * until the end of the transaction. So, we don't need to release
1543  * the lock or increment the stream count in that case.
1544  */
1545  if (pa_send_data(winfo, s->len, s->data))
1546  {
1547  /*
1548  * Unlock the shared object lock so that the parallel apply
1549  * worker can continue to receive changes.
1550  */
1551  if (!first_segment)
1553 
1554  /*
1555  * Increment the number of streaming blocks waiting to be
1556  * processed by parallel apply worker.
1557  */
1559 
1560  /* Cache the parallel apply worker for this transaction. */
1562  break;
1563  }
1564 
1565  /*
1566  * Switch to serialize mode when we are not able to send the
1567  * change to parallel apply worker.
1568  */
1569  pa_switch_to_partial_serialize(winfo, !first_segment);
1570 
1571  /* fall through */
1573  Assert(winfo);
1574 
1575  /*
1576  * Open the spool file unless it was already opened when switching
1577  * to serialize mode. The transaction started in
1578  * stream_start_internal will be committed on the stream stop.
1579  */
1580  if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1581  stream_start_internal(stream_xid, first_segment);
1582 
1584 
1585  /* Cache the parallel apply worker for this transaction. */
1587  break;
1588 
1589  case TRANS_PARALLEL_APPLY:
1590  if (first_segment)
1591  {
1592  /* Hold the lock until the end of the transaction. */
1595 
1596  /*
1597  * Signal the leader apply worker, as it may be waiting for
1598  * us.
1599  */
1601  }
1602 
1604  break;
1605 
1606  default:
1607  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1608  break;
1609  }
1610 
1612 }
1613 
1614 /*
1615  * Update the information about subxacts and close the file.
1616  *
1617  * This function should only be called after stream_start_internal has
1618  * been called.
1619  */
1620 void
1621 stream_stop_internal(TransactionId xid)
1622 {
1623  /*
1624  * Serialize information about subxacts for the toplevel transaction, then
1625  * close the stream messages spool file.
1626  */
1629 
1630  /* We must be in a valid transaction state */
1632 
1633  /* Commit the per-stream transaction */
1635 
1636  /* Reset per-stream context */
1638 }
1639 
1640 /*
1641  * Handle STREAM STOP message.
1642  */
1643 static void
1644 apply_handle_stream_stop(StringInfo s)
1645 {
1646  ParallelApplyWorkerInfo *winfo;
1647  TransApplyAction apply_action;
1648 
1650  ereport(ERROR,
1651  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1652  errmsg_internal("STREAM STOP message without STREAM START")));
1653 
1654  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1655 
1656  switch (apply_action)
1657  {
1660  break;
1661 
1663  Assert(winfo);
1664 
1665  /*
1666  * Lock before sending the STREAM_STOP message so that the leader
1667  * can hold the lock first and the parallel apply worker will wait
1668  * for leader to release the lock. See Locking Considerations atop
1669  * applyparallelworker.c.
1670  */
1672 
1673  if (pa_send_data(winfo, s->len, s->data))
1674  {
1676  break;
1677  }
1678 
1679  /*
1680  * Switch to serialize mode when we are not able to send the
1681  * change to parallel apply worker.
1682  */
1683  pa_switch_to_partial_serialize(winfo, true);
1684 
1685  /* fall through */
1690  break;
1691 
1692  case TRANS_PARALLEL_APPLY:
1693  elog(DEBUG1, "applied %u changes in the streaming chunk",
1695 
1696  /*
1697  * By the time parallel apply worker is processing the changes in
1698  * the current streaming block, the leader apply worker may have
1699  * sent multiple streaming blocks. This can lead to the parallel apply
1700  * worker starting to wait even when there are more chunks of streams
1701  * in the queue. So, try to lock only if there is no message left
1702  * in the queue. See Locking Considerations atop
1703  * applyparallelworker.c.
1704  *
1705  * Note that here we have a race condition where we can start
1706  * waiting even when there are pending streaming chunks. This can
1707  * happen if the leader sends another streaming block and acquires
1708  * the stream lock again after the parallel apply worker checks
1709  * that there is no pending streaming block and before it actually
1710  * starts waiting on a lock. We can handle this case by not
1711  * allowing the leader to increment the stream block count during
1712  * the time parallel apply worker acquires the lock but it is not
1713  * clear whether that is worth the complexity.
1714  *
1715  * Now, if this missed chunk contains rollback to savepoint, then
1716  * there is a risk of deadlock which probably shouldn't happen
1717  * after restart.
1718  */
1720  break;
1721 
1722  default:
1723  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1724  break;
1725  }
1726 
1727  in_streamed_transaction = false;
1729 
1730  /*
1731  * The parallel apply worker could be in a transaction in which case we
1732  * need to report the state as STATE_IDLEINTRANSACTION.
1733  */
1736  else
1738 
1740 }
1741 
1742 /*
1743  * Helper function to handle STREAM ABORT message when the transaction was
1744  * serialized to file.
1745  */
1746 static void
1747 stream_abort_internal(TransactionId xid, TransactionId subxid)
1748 {
1749  /*
1750  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1751  * just delete the files with serialized info.
1752  */
1753  if (xid == subxid)
1755  else
1756  {
1757  /*
1758  * OK, so it's a subxact. We need to read the subxact file for the
1759  * toplevel transaction, determine the offset tracked for the subxact,
1760  * and truncate the file with changes. We also remove the subxacts
1761  * with higher offsets (or rather higher XIDs).
1762  *
1763  * We intentionally scan the array from the tail, because we're likely
1764  * aborting a change for the most recent subtransactions.
1765  *
1766  * We can't use the binary search here as subxact XIDs won't
1767  * necessarily arrive in sorted order, consider the case where we have
1768  * released the savepoint for multiple subtransactions and then
1769  * performed rollback to savepoint for one of the earlier
1770  * sub-transaction.
1771  */
1772  int64 i;
1773  int64 subidx;
1774  BufFile *fd;
1775  bool found = false;
1776  char path[MAXPGPATH];
1777 
1778  subidx = -1;
1781 
1782  for (i = subxact_data.nsubxacts; i > 0; i--)
1783  {
1784  if (subxact_data.subxacts[i - 1].xid == subxid)
1785  {
1786  subidx = (i - 1);
1787  found = true;
1788  break;
1789  }
1790  }
1791 
1792  /*
1793  * If it's an empty sub-transaction then we will not find the subxid
1794  * here so just cleanup the subxact info and return.
1795  */
1796  if (!found)
1797  {
1798  /* Cleanup the subxact info */
1802  return;
1803  }
1804 
1805  /* open the changes file */
1808  O_RDWR, false);
1809 
1810  /* OK, truncate the file at the right offset */
1811  BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
1812  subxact_data.subxacts[subidx].offset);
1813  BufFileClose(fd);
1814 
1815  /* discard the subxacts added later */
1816  subxact_data.nsubxacts = subidx;
1817 
1818  /* write the updated subxact list */
1820 
1823  }
1824 }
1825 
1826 /*
1827  * Handle STREAM ABORT message.
1828  */
1829 static void
1830 apply_handle_stream_abort(StringInfo s)
1831 {
1832  TransactionId xid;
1833  TransactionId subxid;
1834  LogicalRepStreamAbortData abort_data;
1835  ParallelApplyWorkerInfo *winfo;
1836  TransApplyAction apply_action;
1837 
1838  /* Save the message before it is consumed. */
1839  StringInfoData original_msg = *s;
1840  bool toplevel_xact;
1841 
1843  ereport(ERROR,
1844  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1845  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1846 
1847  /* We receive abort information only when we can apply in parallel. */
1848  logicalrep_read_stream_abort(s, &abort_data,
1850 
1851  xid = abort_data.xid;
1852  subxid = abort_data.subxid;
1853  toplevel_xact = (xid == subxid);
1854 
1855  set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1856 
1857  apply_action = get_transaction_apply_action(xid, &winfo);
1858 
1859  switch (apply_action)
1860  {
1861  case TRANS_LEADER_APPLY:
1862 
1863  /*
1864  * We are in the leader apply worker and the transaction has been
1865  * serialized to file.
1866  */
1867  stream_abort_internal(xid, subxid);
1868 
1869  elog(DEBUG1, "finished processing the STREAM ABORT command");
1870  break;
1871 
1873  Assert(winfo);
1874 
1875  /*
1876  * For the case of aborting the subtransaction, we increment the
1877  * number of streaming blocks and take the lock again before
1878  * sending the STREAM_ABORT to ensure that the parallel apply
1879  * worker will wait on the lock for the next set of changes after
1880  * processing the STREAM_ABORT message if it is not already
1881  * waiting for STREAM_STOP message.
1882  *
1883  * It is important to perform this locking before sending the
1884  * STREAM_ABORT message so that the leader can hold the lock first
1885  * and the parallel apply worker will wait for the leader to
1886  * release the lock. This is the same as what we do in
1887  * apply_handle_stream_stop. See Locking Considerations atop
1888  * applyparallelworker.c.
1889  */
1890  if (!toplevel_xact)
1891  {
1895  }
1896 
1897  if (pa_send_data(winfo, s->len, s->data))
1898  {
1899  /*
1900  * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1901  * wait here for the parallel apply worker to finish as that
1902  * is not required to maintain the commit order and won't have
1903  * the risk of failures due to transaction dependencies and
1904  * deadlocks. However, it is possible that before the parallel
1905  * worker finishes and we clear the worker info, the xid
1906  * wraparound happens on the upstream and a new transaction
1907  * with the same xid can appear and that can lead to duplicate
1908  * entries in ParallelApplyTxnHash. Yet another problem could
1909  * be that we may have serialized the changes in partial
1910  * serialize mode and the file containing xact changes may
1911  * already exist, and after xid wraparound trying to create
1912  * the file for the same xid can lead to an error. To avoid
1913  * these problems, we decide to wait for the aborts to finish.
1914  *
1915  * Note, it is okay to not update the flush location position
1916  * for aborts as in worst case that means such a transaction
1917  * won't be sent again after restart.
1918  */
1919  if (toplevel_xact)
1921 
1922  break;
1923  }
1924 
1925  /*
1926  * Switch to serialize mode when we are not able to send the
1927  * change to parallel apply worker.
1928  */
1929  pa_switch_to_partial_serialize(winfo, true);
1930 
1931  /* fall through */
1933  Assert(winfo);
1934 
1935  /*
1936  * Parallel apply worker might have applied some changes, so write
1937  * the STREAM_ABORT message so that it can rollback the
1938  * subtransaction if needed.
1939  */
1941  &original_msg);
1942 
1943  if (toplevel_xact)
1944  {
1947  }
1948  break;
1949 
1950  case TRANS_PARALLEL_APPLY:
1951 
1952  /*
1953  * If the parallel apply worker is applying spooled messages then
1954  * close the file before aborting.
1955  */
1956  if (toplevel_xact && stream_fd)
1958 
1959  pa_stream_abort(&abort_data);
1960 
1961  /*
1962  * We need to wait after processing rollback to savepoint for the
1963  * next set of changes.
1964  *
1965  * There is a race condition here due to which we can start waiting
1966  * even when there are more chunks of the stream in the queue. See
1967  * apply_handle_stream_stop.
1968  */
1969  if (!toplevel_xact)
1971 
1972  elog(DEBUG1, "finished processing the STREAM ABORT command");
1973  break;
1974 
1975  default:
1976  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1977  break;
1978  }
1979 
1981 }
1982 
1983 /*
1984  * Ensure that the passed location is fileset's end.
1985  */
1986 static void
1987 ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1988  off_t offset)
1989 {
1990  char path[MAXPGPATH];
1991  BufFile *fd;
1992  int last_fileno;
1993  off_t last_offset;
1994 
1996 
1998 
2000 
2001  fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2002 
2003  BufFileSeek(fd, 0, 0, SEEK_END);
2004  BufFileTell(fd, &last_fileno, &last_offset);
2005 
2006  BufFileClose(fd);
2007 
2009 
2010  if (last_fileno != fileno || last_offset != offset)
2011  elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2012  path);
2013 }
2014 
2015 /*
2016  * Common spoolfile processing.
2017  */
2018 void
2019 apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2020  XLogRecPtr lsn)
2021 {
2023  int nchanges;
2024  char path[MAXPGPATH];
2025  char *buffer = NULL;
2026  MemoryContext oldcxt;
2027  ResourceOwner oldowner;
2028  int fileno;
2029  off_t offset;
2030 
2031  if (!am_parallel_apply_worker())
2033 
2034  /* Make sure we have an open transaction */
2036 
2037  /*
2038  * Allocate file handle and memory required to process all the messages in
2039  * TopTransactionContext to avoid them getting reset after each message is
2040  * processed.
2041  */
2043 
2044  /* Open the spool file for the committed/prepared transaction */
2046  elog(DEBUG1, "replaying changes from file \"%s\"", path);
2047 
2048  /*
2049  * Make sure the file is owned by the toplevel transaction so that the
2050  * file will not be accidentally closed when aborting a subtransaction.
2051  */
2052  oldowner = CurrentResourceOwner;
2054 
2055  stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2056 
2057  CurrentResourceOwner = oldowner;
2058 
2059  buffer = palloc(BLCKSZ);
2060  initStringInfo(&s2);
2061 
2062  MemoryContextSwitchTo(oldcxt);
2063 
2064  remote_final_lsn = lsn;
2065 
2066  /*
2067  * Make sure the apply_dispatch handler methods are aware we're in a
2068  * remote transaction.
2069  */
2070  in_remote_transaction = true;
2072 
2074 
2075  /*
2076  * Read the entries one by one and pass them through the same logic as in
2077  * apply_dispatch.
2078  */
2079  nchanges = 0;
2080  while (true)
2081  {
2082  size_t nbytes;
2083  int len;
2084 
2086 
2087  /* read length of the on-disk record */
2088  nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2089 
2090  /* have we reached end of the file? */
2091  if (nbytes == 0)
2092  break;
2093 
2094  /* do we have a correct length? */
2095  if (len <= 0)
2096  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2097  len, path);
2098 
2099  /* make sure we have sufficiently large buffer */
2100  buffer = repalloc(buffer, len);
2101 
2102  /* and finally read the data into the buffer */
2103  BufFileReadExact(stream_fd, buffer, len);
2104 
2105  BufFileTell(stream_fd, &fileno, &offset);
2106 
2107  /* copy the buffer to the stringinfo and call apply_dispatch */
2108  resetStringInfo(&s2);
2109  appendBinaryStringInfo(&s2, buffer, len);
2110 
2111  /* Ensure we are reading the data into our memory context. */
2113 
2114  apply_dispatch(&s2);
2115 
2117 
2118  MemoryContextSwitchTo(oldcxt);
2119 
2120  nchanges++;
2121 
2122  /*
2123  * It is possible that the file has been closed because we processed a
2124  * transaction end message such as stream_commit, in which case that
2125  * must be the last message.
2126  */
2127  if (!stream_fd)
2128  {
2129  ensure_last_message(stream_fileset, xid, fileno, offset);
2130  break;
2131  }
2132 
2133  if (nchanges % 1000 == 0)
2134  elog(DEBUG1, "replayed %d changes from file \"%s\"",
2135  nchanges, path);
2136  }
2137 
2138  if (stream_fd)
2140 
2141  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2142  nchanges, path);
2143 
2144  return;
2145 }
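/*
 * The spool file replayed above is a simple sequence of records: each record
 * is an int length followed by that many bytes containing one serialized
 * logical replication message, which is handed to apply_dispatch(). Reading
 * stops when BufFileReadMaybeEOF() returns 0 bytes for the next length.
 */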
2146 
2147 /*
2148  * Handle STREAM COMMIT message.
2149  */
2150 static void
2151 apply_handle_stream_commit(StringInfo s)
2152 {
2153  TransactionId xid;
2154  LogicalRepCommitData commit_data;
2155  ParallelApplyWorkerInfo *winfo;
2156  TransApplyAction apply_action;
2157 
2158  /* Save the message before it is consumed. */
2159  StringInfoData original_msg = *s;
2160 
2162  ereport(ERROR,
2163  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2164  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2165 
2166  xid = logicalrep_read_stream_commit(s, &commit_data);
2167  set_apply_error_context_xact(xid, commit_data.commit_lsn);
2168 
2169  apply_action = get_transaction_apply_action(xid, &winfo);
2170 
2171  switch (apply_action)
2172  {
2173  case TRANS_LEADER_APPLY:
2174 
2175  /*
2176  * The transaction has been serialized to file, so replay all the
2177  * spooled operations.
2178  */
2180  commit_data.commit_lsn);
2181 
2182  apply_handle_commit_internal(&commit_data);
2183 
2184  /* Unlink the files with serialized changes and subxact info. */
2186 
2187  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2188  break;
2189 
2191  Assert(winfo);
2192 
2193  if (pa_send_data(winfo, s->len, s->data))
2194  {
2195  /* Finish processing the streaming transaction. */
2196  pa_xact_finish(winfo, commit_data.end_lsn);
2197  break;
2198  }
2199 
2200  /*
2201  * Switch to serialize mode when we are not able to send the
2202  * change to parallel apply worker.
2203  */
2204  pa_switch_to_partial_serialize(winfo, true);
2205 
2206  /* fall through */
2208  Assert(winfo);
2209 
2211  &original_msg);
2212 
2214 
2215  /* Finish processing the streaming transaction. */
2216  pa_xact_finish(winfo, commit_data.end_lsn);
2217  break;
2218 
2219  case TRANS_PARALLEL_APPLY:
2220 
2221  /*
2222  * If the parallel apply worker is applying spooled messages then
2223  * close the file before committing.
2224  */
2225  if (stream_fd)
2227 
2228  apply_handle_commit_internal(&commit_data);
2229 
2231 
2232  /*
2233  * It is important to set the transaction state as finished before
2234  * releasing the lock. See pa_wait_for_xact_finish.
2235  */
2238 
2240 
2241  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2242  break;
2243 
2244  default:
2245  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2246  break;
2247  }
2248 
2249  /* Process any tables that are being synchronized in parallel. */
2250  process_syncing_tables(commit_data.end_lsn);
2251 
2253 
2255 }
2256 
2257 /*
2258  * Helper function for apply_handle_commit and apply_handle_stream_commit.
2259  */
2260 static void
2261 apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2262 {
2263  if (is_skipping_changes())
2264  {
2266 
2267  /*
2268  * Start a new transaction to clear the subskiplsn, if not started
2269  * yet.
2270  */
2271  if (!IsTransactionState())
2273  }
2274 
2275  if (IsTransactionState())
2276  {
2277  /*
2278  * The transaction is either non-empty or skipped, so we clear the
2279  * subskiplsn.
2280  */
2282 
2283  /*
2284  * Update origin state so we can restart streaming from correct
2285  * position in case of crash.
2286  */
2287  replorigin_session_origin_lsn = commit_data->end_lsn;
2289 
2291 
2292  if (IsTransactionBlock())
2293  {
2294  EndTransactionBlock(false);
2296  }
2297 
2298  pgstat_report_stat(false);
2299 
2301  }
2302  else
2303  {
2304  /* Process any invalidation messages that might have accumulated. */
2307  }
2308 
2309  in_remote_transaction = false;
2310 }
2311 
2312 /*
2313  * Handle RELATION message.
2314  *
2315  * Note we don't do validation against the local schema here. That validation
2316  * is postponed until the first change for the given relation arrives, as we
2317  * only care about it when applying changes for that relation anyway, and we
2318  * do less locking this way.
2319  */
2320 static void
2321 apply_handle_relation(StringInfo s)
2322 {
2323  LogicalRepRelation *rel;
2324 
2326  return;
2327 
2328  rel = logicalrep_read_rel(s);
2330 
2331  /* Also reset all entries in the partition map that refer to remoterel. */
2333 }
2334 
2335 /*
2336  * Handle TYPE message.
2337  *
2338  * This implementation pays no attention to TYPE messages; we expect the user
2339  * to have set things up so that the incoming data is acceptable to the input
2340  * functions for the locally subscribed tables. Hence, we just read and
2341  * discard the message.
2342  */
2343 static void
2344 apply_handle_type(StringInfo s)
2345 {
2346  LogicalRepTyp typ;
2347 
2349  return;
2350 
2351  logicalrep_read_typ(s, &typ);
2352 }
2353 
2354 /*
2355  * Check that we (the subscription owner) have sufficient privileges on the
2356  * target relation to perform the given operation.
2357  */
2358 static void
2359 TargetPrivilegesCheck(Relation rel, AclMode mode)
2360 {
2361  Oid relid;
2362  AclResult aclresult;
2363 
2364  relid = RelationGetRelid(rel);
2365  aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2366  if (aclresult != ACLCHECK_OK)
2367  aclcheck_error(aclresult,
2368  get_relkind_objtype(rel->rd_rel->relkind),
2369  get_rel_name(relid));
2370 
2371  /*
2372  * We lack the infrastructure to honor RLS policies. It might be possible
2373  * to add such infrastructure here, but tablesync workers lack it, too, so
2374  * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2375  * but it seems dangerous to replicate a TRUNCATE and then refuse to
2376  * replicate subsequent INSERTs, so we forbid all commands the same.
2377  */
2378  if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2379  ereport(ERROR,
2380  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2381  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2382  GetUserNameFromId(GetUserId(), true),
2383  RelationGetRelationName(rel))));
2384 }
2385 
2386 /*
2387  * Handle INSERT message.
2388  */
2389 
2390 static void
2391 apply_handle_insert(StringInfo s)
2392 {
2393  LogicalRepRelMapEntry *rel;
2394  LogicalRepTupleData newtup;
2395  LogicalRepRelId relid;
2396  UserContext ucxt;
2397  ApplyExecutionData *edata;
2398  EState *estate;
2399  TupleTableSlot *remoteslot;
2400  MemoryContext oldctx;
2401  bool run_as_owner;
2402 
2403  /*
2404  * Quick return if we are skipping data modification changes or handling
2405  * streamed transactions.
2406  */
2407  if (is_skipping_changes() ||
2409  return;
2410 
2412 
2413  relid = logicalrep_read_insert(s, &newtup);
2414  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2415  if (!should_apply_changes_for_rel(rel))
2416  {
2417  /*
2418  * The relation can't become interesting in the middle of the
2419  * transaction so it's safe to unlock it.
2420  */
2423  return;
2424  }
2425 
2426  /*
2427  * Make sure that any user-supplied code runs as the table owner, unless
2428  * the user has opted out of that behavior.
2429  */
2430  run_as_owner = MySubscription->runasowner;
2431  if (!run_as_owner)
2432  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2433 
2434  /* Set relation for error callback */
2436 
2437  /* Initialize the executor state. */
2438  edata = create_edata_for_relation(rel);
2439  estate = edata->estate;
2440  remoteslot = ExecInitExtraTupleSlot(estate,
2441  RelationGetDescr(rel->localrel),
2442  &TTSOpsVirtual);
2443 
2444  /* Process and store remote tuple in the slot */
2446  slot_store_data(remoteslot, rel, &newtup);
2447  slot_fill_defaults(rel, estate, remoteslot);
2448  MemoryContextSwitchTo(oldctx);
2449 
2450  /* For a partitioned table, insert the tuple into a partition. */
2451  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2453  remoteslot, NULL, CMD_INSERT);
2454  else
2456  remoteslot);
2457 
2458  finish_edata(edata);
2459 
2460  /* Reset relation for error callback */
2462 
2463  if (!run_as_owner)
2464  RestoreUserContext(&ucxt);
2465 
2467 
2469 }
2470 
2471 /*
2472  * Workhorse for apply_handle_insert()
2473  * relinfo is for the relation we're actually inserting into
2474  * (could be a child partition of edata->targetRelInfo)
2475  */
2476 static void
2477 apply_handle_insert_internal(ApplyExecutionData *edata,
2478  ResultRelInfo *relinfo,
2479  TupleTableSlot *remoteslot)
2480 {
2481  EState *estate = edata->estate;
2482 
2483  /* We must open indexes here. */
2484  ExecOpenIndices(relinfo, false);
2485 
2486  /* Do the insert. */
2488  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2489 
2490  /* Cleanup. */
2491  ExecCloseIndices(relinfo);
2492 }
2493 
2494 /*
2495  * Check if the logical replication relation is updatable and throw
2496  * appropriate error if it isn't.
2497  */
2498 static void
2499 check_relation_updatable(LogicalRepRelMapEntry *rel)
2500 {
2501  /*
2502  * For partitioned tables, we only need to care if the target partition is
2503  * updatable (aka has PK or RI defined for it).
2504  */
2505  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2506  return;
2507 
2508  /* Updatable, no error. */
2509  if (rel->updatable)
2510  return;
2511 
2512  /*
2513  * We are on the error path, so it's fine that this is somewhat slow. It's
2514  * better to give the user a correct error.
2515  */
2517  {
2518  ereport(ERROR,
2519  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2520  errmsg("publisher did not send replica identity column "
2521  "expected by the logical replication target relation \"%s.%s\"",
2522  rel->remoterel.nspname, rel->remoterel.relname)));
2523  }
2524 
2525  ereport(ERROR,
2526  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2527  errmsg("logical replication target relation \"%s.%s\" has "
2528  "neither REPLICA IDENTITY index nor PRIMARY "
2529  "KEY and published relation does not have "
2530  "REPLICA IDENTITY FULL",
2531  rel->remoterel.nspname, rel->remoterel.relname)));
2532 }
2533 
2534 /*
2535  * Handle UPDATE message.
2536  *
2537  * TODO: FDW support
2538  */
2539 static void
2540 apply_handle_update(StringInfo s)
2541 {
2542  LogicalRepRelMapEntry *rel;
2543  LogicalRepRelId relid;
2544  UserContext ucxt;
2545  ApplyExecutionData *edata;
2546  EState *estate;
2547  LogicalRepTupleData oldtup;
2548  LogicalRepTupleData newtup;
2549  bool has_oldtup;
2550  TupleTableSlot *remoteslot;
2551  RTEPermissionInfo *target_perminfo;
2552  MemoryContext oldctx;
2553  bool run_as_owner;
2554 
2555  /*
2556  * Quick return if we are skipping data modification changes or handling
2557  * streamed transactions.
2558  */
2559  if (is_skipping_changes() ||
2561  return;
2562 
2564 
2565  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2566  &newtup);
2567  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2568  if (!should_apply_changes_for_rel(rel))
2569  {
2570  /*
2571  * The relation can't become interesting in the middle of the
2572  * transaction so it's safe to unlock it.
2573  */
2576  return;
2577  }
2578 
2579  /* Set relation for error callback */
2581 
2582  /* Check if we can do the update. */
2584 
2585  /*
2586  * Make sure that any user-supplied code runs as the table owner, unless
2587  * the user has opted out of that behavior.
2588  */
2589  run_as_owner = MySubscription->runasowner;
2590  if (!run_as_owner)
2591  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2592 
2593  /* Initialize the executor state. */
2594  edata = create_edata_for_relation(rel);
2595  estate = edata->estate;
2596  remoteslot = ExecInitExtraTupleSlot(estate,
2597  RelationGetDescr(rel->localrel),
2598  &TTSOpsVirtual);
2599 
2600  /*
2601  * Populate updatedCols so that per-column triggers can fire, and so
2602  * executor can correctly pass down indexUnchanged hint. This could
2603  * include more columns than were actually changed on the publisher
2604  * because the logical replication protocol doesn't contain that
2605  * information. But it would for example exclude columns that only exist
2606  * on the subscriber, since we are not touching those.
2607  */
2608  target_perminfo = list_nth(estate->es_rteperminfos, 0);
2609  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2610  {
2612  int remoteattnum = rel->attrmap->attnums[i];
2613 
2614  if (!att->attisdropped && remoteattnum >= 0)
2615  {
2616  Assert(remoteattnum < newtup.ncols);
2617  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2618  target_perminfo->updatedCols =
2619  bms_add_member(target_perminfo->updatedCols,
2621  }
2622  }
2623 
2624  /* Build the search tuple. */
2626  slot_store_data(remoteslot, rel,
2627  has_oldtup ? &oldtup : &newtup);
2628  MemoryContextSwitchTo(oldctx);
2629 
2630  /* For a partitioned table, apply update to correct partition. */
2631  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2633  remoteslot, &newtup, CMD_UPDATE);
2634  else
2636  remoteslot, &newtup, rel->localindexoid);
2637 
2638  finish_edata(edata);
2639 
2640  /* Reset relation for error callback */
2642 
2643  if (!run_as_owner)
2644  RestoreUserContext(&ucxt);
2645 
2647 
2649 }
2650 
2651 /*
2652  * Workhorse for apply_handle_update()
2653  * relinfo is for the relation we're actually updating in
2654  * (could be a child partition of edata->targetRelInfo)
2655  */
2656 static void
2657 apply_handle_update_internal(ApplyExecutionData *edata,
2658  ResultRelInfo *relinfo,
2659  TupleTableSlot *remoteslot,
2660  LogicalRepTupleData *newtup,
2661  Oid localindexoid)
2662 {
2663  EState *estate = edata->estate;
2664  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2665  Relation localrel = relinfo->ri_RelationDesc;
2666  EPQState epqstate;
2667  TupleTableSlot *localslot;
2668  bool found;
2669  MemoryContext oldctx;
2670 
2671  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2672  ExecOpenIndices(relinfo, false);
2673 
2674  found = FindReplTupleInLocalRel(edata, localrel,
2675  &relmapentry->remoterel,
2676  localindexoid,
2677  remoteslot, &localslot);
2678  ExecClearTuple(remoteslot);
2679 
2680  /*
2681  * Tuple found.
2682  *
2683  * Note this will fail if there are other conflicting unique indexes.
2684  */
2685  if (found)
2686  {
2687  /* Process and store remote tuple in the slot */
2689  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2690  MemoryContextSwitchTo(oldctx);
2691 
2692  EvalPlanQualSetSlot(&epqstate, remoteslot);
2693 
2694  /* Do the actual update. */
2696  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2697  remoteslot);
2698  }
2699  else
2700  {
2701  /*
2702  * The tuple to be updated could not be found. Do nothing except for
2703  * emitting a log message.
2704  *
2705  * XXX should this be promoted to ereport(LOG) perhaps?
2706  */
2707  elog(DEBUG1,
2708  "logical replication did not find row to be updated "
2709  "in replication target relation \"%s\"",
2710  RelationGetRelationName(localrel));
2711  }
2712 
2713  /* Cleanup. */
2714  ExecCloseIndices(relinfo);
2715  EvalPlanQualEnd(&epqstate);
2716 }
2717 
2718 /*
2719  * Handle DELETE message.
2720  *
2721  * TODO: FDW support
2722  */
2723 static void
2724 apply_handle_delete(StringInfo s)
2725 {
2726  LogicalRepRelMapEntry *rel;
2727  LogicalRepTupleData oldtup;
2728  LogicalRepRelId relid;
2729  UserContext ucxt;
2730  ApplyExecutionData *edata;
2731  EState *estate;
2732  TupleTableSlot *remoteslot;
2733  MemoryContext oldctx;
2734  bool run_as_owner;
2735 
2736  /*
2737  * Quick return if we are skipping data modification changes or handling
2738  * streamed transactions.
2739  */
2740  if (is_skipping_changes() ||
2742  return;
2743 
2745 
2746  relid = logicalrep_read_delete(s, &oldtup);
2747  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2748  if (!should_apply_changes_for_rel(rel))
2749  {
2750  /*
2751  * The relation can't become interesting in the middle of the
2752  * transaction so it's safe to unlock it.
2753  */
2756  return;
2757  }
2758 
2759  /* Set relation for error callback */
2761 
2762  /* Check if we can do the delete. */
2764 
2765  /*
2766  * Make sure that any user-supplied code runs as the table owner, unless
2767  * the user has opted out of that behavior.
2768  */
2769  run_as_owner = MySubscription->runasowner;
2770  if (!run_as_owner)
2771  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2772 
2773  /* Initialize the executor state. */
2774  edata = create_edata_for_relation(rel);
2775  estate = edata->estate;
2776  remoteslot = ExecInitExtraTupleSlot(estate,
2777  RelationGetDescr(rel->localrel),
2778  &TTSOpsVirtual);
2779 
2780  /* Build the search tuple. */
2782  slot_store_data(remoteslot, rel, &oldtup);
2783  MemoryContextSwitchTo(oldctx);
2784 
2785  /* For a partitioned table, apply delete to correct partition. */
2786  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2788  remoteslot, NULL, CMD_DELETE);
2789  else
2791  remoteslot, rel->localindexoid);
2792 
2793  finish_edata(edata);
2794 
2795  /* Reset relation for error callback */
2797 
2798  if (!run_as_owner)
2799  RestoreUserContext(&ucxt);
2800 
2802 
2804 }
2805 
2806 /*
2807  * Workhorse for apply_handle_delete()
2808  * relinfo is for the relation we're actually deleting from
2809  * (could be a child partition of edata->targetRelInfo)
2810  */
2811 static void
2812 apply_handle_delete_internal(ApplyExecutionData *edata,
2813  ResultRelInfo *relinfo,
2814  TupleTableSlot *remoteslot,
2815  Oid localindexoid)
2816 {
2817  EState *estate = edata->estate;
2818  Relation localrel = relinfo->ri_RelationDesc;
2819  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2820  EPQState epqstate;
2821  TupleTableSlot *localslot;
2822  bool found;
2823 
2824  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2825  ExecOpenIndices(relinfo, false);
2826 
2827  found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2828  remoteslot, &localslot);
2829 
2830  /* If found delete it. */
2831  if (found)
2832  {
2833  EvalPlanQualSetSlot(&epqstate, localslot);
2834 
2835  /* Do the actual delete. */
2837  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2838  }
2839  else
2840  {
2841  /*
2842  * The tuple to be deleted could not be found. Do nothing except for
2843  * emitting a log message.
2844  *
2845  * XXX should this be promoted to ereport(LOG) perhaps?
2846  */
2847  elog(DEBUG1,
2848  "logical replication did not find row to be deleted "
2849  "in replication target relation \"%s\"",
2850  RelationGetRelationName(localrel));
2851  }
2852 
2853  /* Cleanup. */
2854  ExecCloseIndices(relinfo);
2855  EvalPlanQualEnd(&epqstate);
2856 }
2857 
2858 /*
2859  * Try to find a tuple received from the publication side (in 'remoteslot') in
2860  * the corresponding local relation using either the replica identity index,
2861  * the primary key, another suitable index, or, if needed, a sequential scan.
2862  *
2863  * Local tuple, if found, is returned in '*localslot'.
2864  */
2865 static bool
2866 FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
2867  LogicalRepRelation *remoterel,
2868  Oid localidxoid,
2869  TupleTableSlot *remoteslot,
2870  TupleTableSlot **localslot)
2871 {
2872  EState *estate = edata->estate;
2873  bool found;
2874 
2875  /*
2876  * Regardless of the top-level operation, we're performing a read here, so
2877  * check for SELECT privileges.
2878  */
2879  TargetPrivilegesCheck(localrel, ACL_SELECT);
2880 
2881  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2882 
2883  Assert(OidIsValid(localidxoid) ||
2884  (remoterel->replident == REPLICA_IDENTITY_FULL));
2885 
2886  if (OidIsValid(localidxoid))
2887  {
2888 #ifdef USE_ASSERT_CHECKING
2889  Relation idxrel = index_open(localidxoid, AccessShareLock);
2890 
2891  /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2892  Assert(GetRelationIdentityOrPK(idxrel) == localidxoid ||
2894  edata->targetRel->attrmap));
2895  index_close(idxrel, AccessShareLock);
2896 #endif
2897 
2898  found = RelationFindReplTupleByIndex(localrel, localidxoid,
2900  remoteslot, *localslot);
2901  }
2902  else
2903  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2904  remoteslot, *localslot);
2905 
2906  return found;
2907 }
2908 
2909 /*
2910  * This handles insert, update, delete on a partitioned table.
2911  */
2912 static void
2913 apply_handle_tuple_routing(ApplyExecutionData *edata,
2914  TupleTableSlot *remoteslot,
2915  LogicalRepTupleData *newtup,
2916  CmdType operation)
2917 {
2918  EState *estate = edata->estate;
2919  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2920  ResultRelInfo *relinfo = edata->targetRelInfo;
2921  Relation parentrel = relinfo->ri_RelationDesc;
2922  ModifyTableState *mtstate;
2923  PartitionTupleRouting *proute;
2924  ResultRelInfo *partrelinfo;
2925  Relation partrel;
2926  TupleTableSlot *remoteslot_part;
2927  TupleConversionMap *map;
2928  MemoryContext oldctx;
2929  LogicalRepRelMapEntry *part_entry = NULL;
2930  AttrMap *attrmap = NULL;
2931 
2932  /* ModifyTableState is needed for ExecFindPartition(). */
2933  edata->mtstate = mtstate = makeNode(ModifyTableState);
2934  mtstate->ps.plan = NULL;
2935  mtstate->ps.state = estate;
2936  mtstate->operation = operation;
2937  mtstate->resultRelInfo = relinfo;
2938 
2939  /* ... as is PartitionTupleRouting. */
2940  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2941 
2942  /*
2943  * Find the partition to which the "search tuple" belongs.
2944  */
2945  Assert(remoteslot != NULL);
2947  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2948  remoteslot, estate);
2949  Assert(partrelinfo != NULL);
2950  partrel = partrelinfo->ri_RelationDesc;
2951 
2952  /*
2953  * Check for supported relkind. We need this since partitions might be of
2954  * unsupported relkinds; and the set of partitions can change, so checking
2955  * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2956  */
2957  CheckSubscriptionRelkind(partrel->rd_rel->relkind,
2959  RelationGetRelationName(partrel));
2960 
2961  /*
2962  * To perform any of the operations below, the tuple must match the
2963  * partition's rowtype. Convert if needed or just copy, using a dedicated
2964  * slot to store the tuple in any case.
2965  */
2966  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2967  if (remoteslot_part == NULL)
2968  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2969  map = ExecGetRootToChildMap(partrelinfo, estate);
2970  if (map != NULL)
2971  {
2972  attrmap = map->attrMap;
2973  remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
2974  remoteslot_part);
2975  }
2976  else
2977  {
2978  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2979  slot_getallattrs(remoteslot_part);
2980  }
2981  MemoryContextSwitchTo(oldctx);
2982 
2983  /* Check if we can do the update or delete on the leaf partition. */
2984  if (operation == CMD_UPDATE || operation == CMD_DELETE)
2985  {
2986  part_entry = logicalrep_partition_open(relmapentry, partrel,
2987  attrmap);
2988  check_relation_updatable(part_entry);
2989  }
2990 
2991  switch (operation)
2992  {
2993  case CMD_INSERT:
2994  apply_handle_insert_internal(edata, partrelinfo,
2995  remoteslot_part);
2996  break;
2997 
2998  case CMD_DELETE:
2999  apply_handle_delete_internal(edata, partrelinfo,
3000  remoteslot_part,
3001  part_entry->localindexoid);
3002  break;
3003 
3004  case CMD_UPDATE:
3005 
3006  /*
3007  * For UPDATE, depending on whether or not the updated tuple
3008  * satisfies the partition's constraint, perform a simple UPDATE
3009  * of the partition or move the updated tuple into a different
3010  * suitable partition.
3011  */
3012  {
3013  TupleTableSlot *localslot;
3014  ResultRelInfo *partrelinfo_new;
3015  Relation partrel_new;
3016  bool found;
3017 
3018  /* Get the matching local tuple from the partition. */
3019  found = FindReplTupleInLocalRel(edata, partrel,
3020  &part_entry->remoterel,
3021  part_entry->localindexoid,
3022  remoteslot_part, &localslot);
3023  if (!found)
3024  {
3025  /*
3026  * The tuple to be updated could not be found. Do nothing
3027  * except for emitting a log message.
3028  *
3029  * XXX should this be promoted to ereport(LOG) perhaps?
3030  */
3031  elog(DEBUG1,
3032  "logical replication did not find row to be updated "
3033  "in replication target relation's partition \"%s\"",
3034  RelationGetRelationName(partrel));
3035  return;
3036  }
3037 
3038  /*
3039  * Apply the update to the local tuple, putting the result in
3040  * remoteslot_part.
3041  */
3043  slot_modify_data(remoteslot_part, localslot, part_entry,
3044  newtup);
3045  MemoryContextSwitchTo(oldctx);
3046 
3047  /*
3048  * Does the updated tuple still satisfy the current
3049  * partition's constraint?
3050  */
3051  if (!partrel->rd_rel->relispartition ||
3052  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3053  false))
3054  {
3055  /*
3056  * Yes, so simply UPDATE the partition. We don't call
3057  * apply_handle_update_internal() here, which would
3058  * normally do the following work, to avoid repeating some
3059  * work already done above to find the local tuple in the
3060  * partition.
3061  */
3062  EPQState epqstate;
3063 
3064  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3065  ExecOpenIndices(partrelinfo, false);
3066 
3067  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3069  ACL_UPDATE);
3070  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3071  localslot, remoteslot_part);
3072  ExecCloseIndices(partrelinfo);
3073  EvalPlanQualEnd(&epqstate);
3074  }
3075  else
3076  {
3077  /* Move the tuple into the new partition. */
3078 
3079  /*
3080  * New partition will be found using tuple routing, which
3081  * can only occur via the parent table. We might need to
3082  * convert the tuple to the parent's rowtype. Note that
3083  * this is the tuple found in the partition, not the
3084  * original search tuple received by this function.
3085  */
3086  if (map)
3087  {
3088  TupleConversionMap *PartitionToRootMap =
3090  RelationGetDescr(parentrel));
3091 
3092  remoteslot =
3093  execute_attr_map_slot(PartitionToRootMap->attrMap,
3094  remoteslot_part, remoteslot);
3095  }
3096  else
3097  {
3098  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3099  slot_getallattrs(remoteslot);
3100  }
3101 
3102  /* Find the new partition. */
3104  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3105  proute, remoteslot,
3106  estate);
3107  MemoryContextSwitchTo(oldctx);
3108  Assert(partrelinfo_new != partrelinfo);
3109  partrel_new = partrelinfo_new->ri_RelationDesc;
3110 
3111  /* Check that new partition also has supported relkind. */
3112  CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3114  RelationGetRelationName(partrel_new));
3115 
3116  /* DELETE old tuple found in the old partition. */
3117  apply_handle_delete_internal(edata, partrelinfo,
3118  localslot,
3119  part_entry->localindexoid);
3120 
3121  /* INSERT new tuple into the new partition. */
3122 
3123  /*
3124  * Convert the replacement tuple to match the destination
3125  * partition rowtype.
3126  */
3128  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3129  if (remoteslot_part == NULL)
3130  remoteslot_part = table_slot_create(partrel_new,
3131  &estate->es_tupleTable);
3132  map = ExecGetRootToChildMap(partrelinfo_new, estate);
3133  if (map != NULL)
3134  {
3135  remoteslot_part = execute_attr_map_slot(map->attrMap,
3136  remoteslot,
3137  remoteslot_part);
3138  }
3139  else
3140  {
3141  remoteslot_part = ExecCopySlot(remoteslot_part,
3142  remoteslot);
3143  slot_getallattrs(remoteslot);
3144  }
3145  MemoryContextSwitchTo(oldctx);
3146  apply_handle_insert_internal(edata, partrelinfo_new,
3147  remoteslot_part);
3148  }
3149  }
3150  break;
3151 
3152  default:
3153  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3154  break;
3155  }
3156 }
3157 
3158 /*
3159  * Handle TRUNCATE message.
3160  *
3161  * TODO: FDW support
3162  */
3163 static void
3164 apply_handle_truncate(StringInfo s)
3165 {
3166  bool cascade = false;
3167  bool restart_seqs = false;
3168  List *remote_relids = NIL;
3169  List *remote_rels = NIL;
3170  List *rels = NIL;
3171  List *part_rels = NIL;
3172  List *relids = NIL;
3173  List *relids_logged = NIL;
3174  ListCell *lc;
3175  LOCKMODE lockmode = AccessExclusiveLock;
3176 
3177  /*
3178  * Quick return if we are skipping data modification changes or handling
3179  * streamed transactions.
3180  */
3181  if (is_skipping_changes() ||
3183  return;
3184 
3186 
3187  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3188 
3189  foreach(lc, remote_relids)
3190  {
3191  LogicalRepRelId relid = lfirst_oid(lc);
3192  LogicalRepRelMapEntry *rel;
3193 
3194  rel = logicalrep_rel_open(relid, lockmode);
3195  if (!should_apply_changes_for_rel(rel))
3196  {
3197  /*
3198  * The relation can't become interesting in the middle of the
3199  * transaction so it's safe to unlock it.
3200  */
3201  logicalrep_rel_close(rel, lockmode);
3202  continue;
3203  }
3204 
3205  remote_rels = lappend(remote_rels, rel);
3207  rels = lappend(rels, rel->localrel);
3208  relids = lappend_oid(relids, rel->localreloid);
3210  relids_logged = lappend_oid(relids_logged, rel->localreloid);
3211 
3212  /*
3213  * Truncate partitions if we got a message to truncate a partitioned
3214  * table.
3215  */
3216  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3217  {
3218  ListCell *child;
3219  List *children = find_all_inheritors(rel->localreloid,
3220  lockmode,
3221  NULL);
3222 
3223  foreach(child, children)
3224  {
3225  Oid childrelid = lfirst_oid(child);
3226  Relation childrel;
3227 
3228  if (list_member_oid(relids, childrelid))
3229  continue;
3230 
3231  /* find_all_inheritors already got lock */
3232  childrel = table_open(childrelid, NoLock);
3233 
3234  /*
3235  * Ignore temp tables of other backends. See similar code in
3236  * ExecuteTruncate().
3237  */
3238  if (RELATION_IS_OTHER_TEMP(childrel))
3239  {
3240  table_close(childrel, lockmode);
3241  continue;
3242  }
3243 
3245  rels = lappend(rels, childrel);
3246  part_rels = lappend(part_rels, childrel);
3247  relids = lappend_oid(relids, childrelid);
3248  /* Log this relation only if needed for logical decoding */
3249  if (RelationIsLogicallyLogged(childrel))
3250  relids_logged = lappend_oid(relids_logged, childrelid);
3251  }
3252  }
3253  }
3254 
3255  /*
3256  * Even if we used CASCADE on the upstream primary, we explicitly default
3257  * to replaying changes without further cascading. This might later be made
3258  * configurable with a user-specified option.
3259  *
3260  * MySubscription->runasowner tells us whether we want to execute
3261  * replication actions as the subscription owner; the last argument to
3262  * TruncateGuts tells it whether we want to switch to the table owner.
3263  * Those are exactly opposite conditions.
3264  */
3265  ExecuteTruncateGuts(rels,
3266  relids,
3267  relids_logged,
3268  DROP_RESTRICT,
3269  restart_seqs,
3271  foreach(lc, remote_rels)
3272  {
3273  LogicalRepRelMapEntry *rel = lfirst(lc);
3274 
3276  }
3277  foreach(lc, part_rels)
3278  {
3279  Relation rel = lfirst(lc);
3280 
3281  table_close(rel, NoLock);
3282  }
3283 
3285 }
3286 
3287 
3288 /*
3289  * Logical replication protocol message dispatcher.
3290  */
3291 void
3292 apply_dispatch(StringInfo s)
3293 {
3294  LogicalRepMsgType action = pq_getmsgbyte(s);
3295  LogicalRepMsgType saved_command;
3296 
3297  /*
3298  * Set the current command being applied. Since this function can be
3299  * called recursively when applying spooled changes, save the current
3300  * command.
3301  */
3302  saved_command = apply_error_callback_arg.command;
3304 
3305  switch (action)
3306  {
3307  case LOGICAL_REP_MSG_BEGIN:
3308  apply_handle_begin(s);
3309  break;
3310 
3313  break;
3314 
3317  break;
3318 
3321  break;
3322 
3325  break;
3326 
3329  break;
3330 
3333  break;
3334 
3335  case LOGICAL_REP_MSG_TYPE:
3336  apply_handle_type(s);
3337  break;
3338 
3341  break;
3342 
3344 
3345  /*
3346  * Logical replication does not use generic logical messages yet.
3347  * Although, it could be used by other applications that use this
3348  * output plugin.
3349  */
3350  break;
3351 
3354  break;
3355 
3358  break;
3359 
3362  break;
3363 
3366  break;
3367 
3370  break;
3371 
3374  break;
3375 
3378  break;
3379 
3382  break;
3383 
3386  break;
3387 
3388  default:
3389  ereport(ERROR,
3390  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3391  errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3392  }
3393 
3394  /* Reset the current command */
3395  apply_error_callback_arg.command = saved_command;
3396 }
3397 
3398 /*
3399  * Figure out which write/flush positions to report to the walsender process.
3400  *
3401  * We can't simply report back the last LSN the walsender sent us because the
3402  * local transaction might not yet be flushed to disk locally. Instead we
3403  * build a list that associates local with remote LSNs for every commit. When
3404  * reporting back the flush position to the sender we iterate that list and
3405  * check which entries on it are already locally flushed. Those we can report
3406  * as having been flushed.
3407  *
3408  * *have_pending_txes is set to true if there are outstanding transactions
3409  * that still need to be flushed.
3410  */
3411 static void
3412 get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3413  bool *have_pending_txes)
3414 {
3415  dlist_mutable_iter iter;
3416  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3417 
3419  *flush = InvalidXLogRecPtr;
3420 
3422  {
3423  FlushPosition *pos =
3424  dlist_container(FlushPosition, node, iter.cur);
3425 
3426  *write = pos->remote_end;
3427 
3428  if (pos->local_end <= local_flush)
3429  {
3430  *flush = pos->remote_end;
3431  dlist_delete(iter.cur);
3432  pfree(pos);
3433  }
3434  else
3435  {
3436  /*
3437  * Don't want to uselessly iterate over the rest of the list which
3438  * could potentially be long. Instead get the last element and
3439  * grab the write position from there.
3440  */
3441  pos = dlist_tail_element(FlushPosition, node,
3442  &lsn_mapping);
3443  *write = pos->remote_end;
3444  *have_pending_txes = true;
3445  return;
3446  }
3447  }
3448 
3449  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3450 }
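/*
 * Worked example (LSNs are hypothetical): with lsn_mapping containing
 * {local 0/1500, remote 1/2000} followed by {local 0/1800, remote 1/3000}
 * and a local flush pointer of 0/1600, the first entry is reported as
 * flushed (*flush = 1/2000) and removed, *write is taken from the tail
 * entry (1/3000), and *have_pending_txes is set because the second commit
 * is not yet flushed locally.
 */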
3451 
3452 /*
3453  * Store current remote/local lsn pair in the tracking list.
3454  */
3455 void
3456 store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3457 {
3458  FlushPosition *flushpos;
3459 
3460  /*
3461  * Skip for parallel apply workers, because the lsn_mapping is maintained
3462  * by the leader apply worker.
3463  */
3465  return;
3466 
3467  /* Need to do this in permanent context */
3469 
3470  /* Track commit lsn */
3471  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3472  flushpos->local_end = local_lsn;
3473  flushpos->remote_end = remote_lsn;
3474 
3475  dlist_push_tail(&lsn_mapping, &flushpos->node);
3477 }
3478 
3479 
3480 /* Update statistics of the worker. */
3481 static void
3482 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3483 {
3484  MyLogicalRepWorker->last_lsn = last_lsn;
3485  MyLogicalRepWorker->last_send_time = send_time;
3487  if (reply)
3488  {
3489  MyLogicalRepWorker->reply_lsn = last_lsn;
3490  MyLogicalRepWorker->reply_time = send_time;
3491  }
3492 }
3493 
3494 /*
3495  * Apply main loop.
3496  */
3497 static void
3498 LogicalRepApplyLoop(XLogRecPtr last_received)
3499 {
3500  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3501  bool ping_sent = false;
3502  TimeLineID tli;
3503  ErrorContextCallback errcallback;
3504 
3505  /*
3506  * Init the ApplyMessageContext which we clean up after each replication
3507  * protocol message.
3508  */
3510  "ApplyMessageContext",
3512 
3513  /*
3514  * This memory context is used for per-stream data when the streaming mode
3515  * is enabled. This context is reset on each stream stop.
3516  */
3518  "LogicalStreamingContext",
3520 
3521  /* mark as idle, before starting to loop */
3523 
3524  /*
3525  * Push apply error context callback. Fields will be filled while applying
3526  * a change.
3527  */
3528  errcallback.callback = apply_error_callback;
3529  errcallback.previous = error_context_stack;
3530  error_context_stack = &errcallback;
3532 
3533  /* This outer loop iterates once per wait. */
3534  for (;;)
3535  {
3537  int rc;
3538  int len;
3539  char *buf = NULL;
3540  bool endofstream = false;
3541  long wait_time;
3542 
3544 
3546 
3548 
3549  if (len != 0)
3550  {
3551  /* Loop to process all available data (without blocking). */
3552  for (;;)
3553  {
3555 
3556  if (len == 0)
3557  {
3558  break;
3559  }
3560  else if (len < 0)
3561  {
3562  ereport(LOG,
3563  (errmsg("data stream from publisher has ended")));
3564  endofstream = true;
3565  break;
3566  }
3567  else
3568  {
3569  int c;
3570  StringInfoData s;
3571 
3572  if (ConfigReloadPending)
3573  {
3574  ConfigReloadPending = false;
3576  }
3577 
3578  /* Reset timeout. */
3579  last_recv_timestamp = GetCurrentTimestamp();
3580  ping_sent = false;
3581 
3582  /* Ensure we are reading the data into our memory context. */
3584 
3585  s.data = buf;
3586  s.len = len;
3587  s.cursor = 0;
3588  s.maxlen = -1;
3589 
3590  c = pq_getmsgbyte(&s);
3591 
3592  if (c == 'w')
3593  {
3594  XLogRecPtr start_lsn;
3595  XLogRecPtr end_lsn;
3596  TimestampTz send_time;
3597 
3598  start_lsn = pq_getmsgint64(&s);
3599  end_lsn = pq_getmsgint64(&s);
3600  send_time = pq_getmsgint64(&s);
3601 
3602  if (last_received < start_lsn)
3603  last_received = start_lsn;
3604 
3605  if (last_received < end_lsn)
3606  last_received = end_lsn;
3607 
3608  UpdateWorkerStats(last_received, send_time, false);
3609 
3610  apply_dispatch(&s);
3611  }
3612  else if (c == 'k')
3613  {
3614  XLogRecPtr end_lsn;
3616  bool reply_requested;
3617 
3618  end_lsn = pq_getmsgint64(&s);
3619  timestamp = pq_getmsgint64(&s);
3620  reply_requested = pq_getmsgbyte(&s);
3621 
3622  if (last_received < end_lsn)
3623  last_received = end_lsn;
3624 
3625  send_feedback(last_received, reply_requested, false);
3626  UpdateWorkerStats(last_received, timestamp, true);
3627  }
3628  /* other message types are purposefully ignored */
3629 
3631  }
3632 
3634  }
3635  }
3636 
3637  /* confirm all writes so far */
3638  send_feedback(last_received, false, false);
3639 
3641  {
3642  /*
3643  * If we didn't get any transactions for a while there might be
3644  * unconsumed invalidation messages in the queue, consume them
3645  * now.
3646  */
3649 
3650  /* Process any table synchronization changes. */
3651  process_syncing_tables(last_received);
3652  }
3653 
3654  /* Cleanup the memory. */
3657 
3658  /* Check if we need to exit the streaming loop. */
3659  if (endofstream)
3660  break;
3661 
3662  /*
3663  * Wait for more data or latch. If we have unflushed transactions,
3664  * wake up after WalWriterDelay to see if they've been flushed yet (in
3665  * which case we should send a feedback message). Otherwise, there's
3666  * no particular urgency about waking up unless we get data or a
3667  * signal.
3668  */
3669  if (!dlist_is_empty(&lsn_mapping))
3670  wait_time = WalWriterDelay;
3671  else
3672  wait_time = NAPTIME_PER_CYCLE;
3673 
3677  fd, wait_time,
3678  WAIT_EVENT_LOGICAL_APPLY_MAIN);
3679 
3680  if (rc & WL_LATCH_SET)
3681  {
3684  }
3685 
3686  if (ConfigReloadPending)
3687  {
3688  ConfigReloadPending = false;
3690  }
3691 
3692  if (rc & WL_TIMEOUT)
3693  {
3694  /*
3695  * We didn't receive anything new. If we haven't heard anything
3696  * from the server for more than wal_receiver_timeout / 2, ping
3697  * the server. Also, if it's been longer than
3698  * wal_receiver_status_interval since the last update we sent,
3699  * send a status update to the primary anyway, to report any
3700  * progress in applying WAL.
3701  */
3702  bool requestReply = false;
3703 
3704  /*
3705  * Check if time since last receive from primary has reached the
3706  * configured limit.
3707  */
3708  if (wal_receiver_timeout > 0)
3709  {
3711  TimestampTz timeout;
3712 
3713  timeout =
3714  TimestampTzPlusMilliseconds(last_recv_timestamp,
3716 
3717  if (now >= timeout)
3718  ereport(ERROR,
3719  (errcode(ERRCODE_CONNECTION_FAILURE),
3720  errmsg("terminating logical replication worker due to timeout")));
3721 
3722  /* Check to see if it's time for a ping. */
3723  if (!ping_sent)
3724  {
3725  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3726  (wal_receiver_timeout / 2));
3727  if (now >= timeout)
3728  {
3729  requestReply = true;
3730  ping_sent = true;
3731  }
3732  }
3733  }
3734 
3735  send_feedback(last_received, requestReply, requestReply);
3736 
3737  /*
3738  * Force reporting to ensure long idle periods don't lead to
3739  * arbitrarily delayed stats. Stats can only be reported outside
3740  * of (implicit or explicit) transactions. That shouldn't lead to
3741  * stats being delayed for long, because transactions are either
3742  * sent as a whole on commit or streamed. Streamed transactions
3743  * are spilled to disk and applied on commit.
3744  */
3745  if (!IsTransactionState())
3746  pgstat_report_stat(true);
3747  }
3748  }
3749 
3750  /* Pop the error context stack */
3751  error_context_stack = errcallback.previous;
3753 
3754  /* All done */
3756 }
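/*
 * Summary of the copy-data messages handled by the loop above: 'w' messages
 * carry three int64 fields (start LSN, end LSN, send time) followed by the
 * logical replication message that is passed to apply_dispatch(); 'k'
 * (keepalive) messages carry an int64 end LSN, an int64 timestamp and a
 * reply-requested byte, which may trigger an immediate send_feedback() call.
 * Other message types are purposefully ignored.
 */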
3757 
3758 /*
3759  * Send a Standby Status Update message to server.
3760  *
3761  * 'recvpos' is the latest LSN we've received data to, force is set if we need
3762  * to send a response to avoid timeouts.
3763  */
3764 static void
3765 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3766 {
3767  static StringInfo reply_message = NULL;
3768  static TimestampTz send_time = 0;
3769 
3770  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3771  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3772  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3773 
3774  XLogRecPtr writepos;
3775  XLogRecPtr flushpos;
3776  TimestampTz now;
3777  bool have_pending_txes;
3778 
3779  /*
3780  * If the user doesn't want status to be reported to the publisher, be
3781  * sure to exit before doing anything at all.
3782  */
3783  if (!force && wal_receiver_status_interval <= 0)
3784  return;
3785 
3786  /* It's legal to not pass a recvpos */
3787  if (recvpos < last_recvpos)
3788  recvpos = last_recvpos;
3789 
3790  get_flush_position(&writepos, &flushpos, &have_pending_txes);
3791 
3792  /*
3793  * No outstanding transactions to flush, we can report the latest received
3794  * position. This is important for synchronous replication.
3795  */
3796  if (!have_pending_txes)
3797  flushpos = writepos = recvpos;
3798 
3799  if (writepos < last_writepos)
3800  writepos = last_writepos;
3801 
3802  if (flushpos < last_flushpos)
3803  flushpos = last_flushpos;
3804 
3806 
3807  /* if we've already reported everything we're good */
3808  if (!force &&
3809  writepos == last_writepos &&
3810  flushpos == last_flushpos &&
3811  !TimestampDifferenceExceeds(send_time, now,
3813  return;
3814  send_time = now;
3815 
3816  if (!reply_message)
3817  {
3819 
3821  MemoryContextSwitchTo(oldctx);
3822  }
3823  else
3825 
3826  pq_sendbyte(reply_message, 'r');
3827  pq_sendint64(reply_message, recvpos); /* write */
3828  pq_sendint64(reply_message, flushpos); /* flush */
3829  pq_sendint64(reply_message, writepos); /* apply */
3830  pq_sendint64(reply_message, now); /* sendTime */
3831  pq_sendbyte(reply_message, requestReply); /* replyRequested */
3832 
3833  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3834  force,
3835  LSN_FORMAT_ARGS(recvpos),
3836  LSN_FORMAT_ARGS(writepos),
3837  LSN_FORMAT_ARGS(flushpos));
3838 
3841 
3842  if (recvpos > last_recvpos)
3843  last_recvpos = recvpos;
3844  if (writepos > last_writepos)
3845  last_writepos = writepos;
3846  if (flushpos > last_flushpos)
3847  last_flushpos = flushpos;
3848 }
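/*
 * The Standby Status Update built above is laid out as: byte 'r', three
 * int64 LSNs (the received, flush and apply positions, in that order), an
 * int64 send timestamp, and a final byte indicating whether a reply is
 * requested.
 */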
3849 
3850 /*
3851  * Exit routine for apply workers due to subscription parameter changes.
3852  */
3853 static void
3854 apply_worker_exit(void)
3855 {
3857  {
3858  /*
3859  * Don't stop the parallel apply worker as the leader will detect the
3860  * subscription parameter change and restart logical replication later
3861  * anyway. This also prevents the leader from reporting errors when
3862  * trying to communicate with a stopped parallel apply worker, which
3863  * would accidentally disable subscriptions if disable_on_error was
3864  * set.
3865  */
3866  return;
3867  }
3868 
3869  /*
3870  * Reset the last-start time for this apply worker so that the launcher
3871  * will restart it without waiting for wal_retrieve_retry_interval if the
3872  * subscription is still active, and so that we won't leak that hash table
3873  * entry if it isn't.
3874  */
3875  if (am_leader_apply_worker())
3877 
3878  proc_exit(0);
3879 }
3880 
3881 /*
3882  * Reread subscription info if needed. Most changes are handled by simply exiting the worker.
3883  */
3884 void
3885 maybe_reread_subscription(void)
3886 {
3887  MemoryContext oldctx;
3889  bool started_tx = false;
3890 
3891  /* When cache state is valid there is nothing to do here. */
3892  if (MySubscriptionValid)
3893  return;
3894 
3895  /* This function might be called inside or outside of transaction. */
3896  if (!IsTransactionState())
3897  {
3899  started_tx = true;
3900  }
3901 
3902  /* Ensure allocations in permanent context. */
3904 
3906 
3907  /*
3908  * Exit if the subscription was removed. This normally should not happen
3909  * as the worker gets killed during DROP SUBSCRIPTION.
3910  */
3911  if (!newsub)
3912  {
3913  ereport(LOG,
3914  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3915  MySubscription->name)));
3916 
3917  /* Ensure we remove no-longer-useful entry for worker's start time */
3918  if (am_leader_apply_worker())
3920 
3921  proc_exit(0);
3922  }
3923 
3924  /* Exit if the subscription was disabled. */
3925  if (!newsub->enabled)
3926  {
3927  ereport(LOG,
3928  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
3929  MySubscription->name)));
3930 
3932  }
3933 
3934  /* !slotname should never happen when enabled is true. */
3935  Assert(newsub->slotname);
3936 
3937  /* two-phase should not be altered */
3938  Assert(newsub->twophasestate == MySubscription->twophasestate);
3939 
3940  /*
3941  * Exit if any parameter that affects the remote connection was changed.
3942  * The launcher will start a new worker but note that the parallel apply
3943  * worker won't restart if the streaming option's value is changed from
3944  * 'parallel' to any other value or the server decides not to stream the
3945  * in-progress transaction.
3946  */
3947  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3948  strcmp(newsub->name, MySubscription->name) != 0 ||
3949  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3950  newsub->binary != MySubscription->binary ||
3951  newsub->stream != MySubscription->stream ||
3952  newsub->passwordrequired != MySubscription->passwordrequired ||
3953  strcmp(newsub->origin, MySubscription->origin) != 0 ||
3954  newsub->owner != MySubscription->owner ||
3955  !equal(newsub->publications, MySubscription->publications))
3956  {
3958  ereport(LOG,
3959  (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
3960  MySubscription->name)));
3961  else
3962  ereport(LOG,
3963  (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
3964  MySubscription->name)));
3965 
3967  }
3968 
3969  /* Check for other changes that should never happen too. */
3970  if (newsub->dbid != MySubscription->dbid)
3971  {
3972  elog(ERROR, "subscription %u changed unexpectedly",
3974  }
3975 
3976  /* Clean old subscription info and switch to new one. */
3979 
3980  MemoryContextSwitchTo(oldctx);
3981 
3982  /* Change synchronous commit according to the user's wishes */
3983  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3985 
3986  if (started_tx)
3988 
3989  MySubscriptionValid = true;
3990 }
3991 
3992 /*
3993  * Callback from subscription syscache invalidation.
3994  */
3995 static void
3996 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
3997 {
3998  MySubscriptionValid = false;
3999 }
4000 
4001 /*
4002  * subxact_info_write
4003  * Store information about subxacts for a toplevel transaction.
4004  *
4005  * For each subxact we store the offset of its first change in the main file.
4006  * The file is always over-written as a whole.
4007  *
4008  * XXX We should only store subxacts that were not aborted yet.
4009  */
4010 static void
4011 subxact_info_write(Oid subid, TransactionId xid)
4012 {
4013  char path[MAXPGPATH];
4014  Size len;
4015  BufFile *fd;
4016 
4018 
4019  /* construct the subxact filename */
4020  subxact_filename(path, subid, xid);
4021 
4022  /* Delete the subxacts file, if exists. */
4023  if (subxact_data.nsubxacts == 0)
4024  {
4027 
4028  return;
4029  }
4030 
4031  /*
4032  * Create the subxact file if it has not already been created, otherwise open the
4033  * existing file.
4034  */
4036  true);
4037  if (fd == NULL)
4039 
4040  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4041 
4042  /* Write the subxact count and subxact info */
4045 
4046  BufFileClose(fd);
4047 
4048  /* free the memory allocated for subxact info */
4050 }
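/*
 * On-disk layout written above: the file starts with the subxact count,
 * followed by nsubxacts SubXactInfo entries (each recording a subxact's xid
 * and the fileno/offset of its first change in the changes file). The file
 * is rewritten as a whole on each write and deleted once there are no
 * subxacts left to track.
 */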
4051 
4052 /*
4053  * subxact_info_read
4054  * Restore information about subxacts of a streamed transaction.
4055  *
4056  * Read information about subxacts into the structure subxact_data that can be
4057  * used later.
4058  */
4059 static void
4060 subxact_info_read(Oid subid, TransactionId xid)
4061 {
4062  char path[MAXPGPATH];
4063  Size len;
4064  BufFile *fd;
4065  MemoryContext oldctx;
4066 
4070 
4071  /*
4072  * If the subxact file doesn't exist that means we don't have any subxact
4073  * info.
4074  */
4075  subxact_filename(path, subid, xid);
4077  true);
4078  if (fd == NULL)
4079  return;
4080 
4081  /* read number of subxact items */
4083 
4084  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4085 
4086  /* we keep the maximum as a power of 2 */
4088 
4089  /*
4090  * Allocate subxact information in the logical streaming context. We need
4091  * this information for the entire duration of the stream so that we can
4092  * keep adding subtransaction info to it. On stream stop we will flush this
4093  * information to the subxact file and reset the logical streaming context.
4094  */
4097  sizeof(SubXactInfo));
4098  MemoryContextSwitchTo(oldctx);
4099 
4100  if (len > 0)
4102 
4103  BufFileClose(fd);
4104 }
4105 
4106 /*
4107  * subxact_info_add
4108  * Add information about a subxact (offset in the main file).
4109  */
4110 static void
4111 subxact_info_add(TransactionId xid)
4112 {
4113  SubXactInfo *subxacts = subxact_data.subxacts;
4114  int64 i;
4115 
4116  /* We must have a valid top level stream xid and a stream fd. */
4118  Assert(stream_fd != NULL);
4119 
4120  /*
4121  * If the XID matches the toplevel transaction, we don't want to add it.
4122  */
4123  if (stream_xid == xid)
4124  return;
4125 
4126  /*
4127  * In most cases we're checking the same subxact as we've already seen in
4128  * the last call, so make sure to ignore it (this change comes later).
4129  */
4130  if (subxact_data.subxact_last == xid)
4131  return;
4132 
4133  /* OK, remember we're processing this XID. */
4134  subxact_data.subxact_last = xid;
4135 
4136  /*
4137  * Check if the transaction is already present in the array of subxact. We
4138  * intentionally scan the array from the tail, because we're likely adding
4139  * a change for the most recent subtransactions.
4140  *
4141  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4142  * would allow us to use binary search here.
4143  */
4144  for (i = subxact_data.nsubxacts; i > 0; i--)
4145  {
4146  /* found, so we're done */
4147  if (subxacts[i - 1].xid == xid)
4148  return;
4149  }
4150 
4151  /* This is a new subxact, so we need to add it to the array. */
4152  if (subxact_data.nsubxacts == 0)
4153  {
4154  MemoryContext oldctx;
4155 
4156  subxact_data.nsubxacts_max = 128;
4157 
4158  /*
4159  * Allocate this memory for subxacts in per-stream context, see
4160  * subxact_info_read.
4161  */
4162  oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4163  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4164  MemoryContextSwitchTo(oldctx);
4165  }
4166  else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
4167  {
4168  subxact_data.nsubxacts_max *= 2;
4169  subxacts = repalloc(subxacts,
4170  subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4171  }
4172 
4173  subxacts[subxact_data.nsubxacts].xid = xid;
4174 
4175  /*
4176  * Get the current offset of the stream file and store it as offset of
4177  * this subxact.
4178  */
4179  BufFileTell(stream_fd,
4180  &subxacts[subxact_data.nsubxacts].fileno,
4181  &subxacts[subxact_data.nsubxacts].offset);
4182 
4183  subxact_data.nsubxacts++;
4184  subxact_data.subxacts = subxacts;
4185 }
4186 
4187 /* format filename for file containing the info about subxacts */
4188 static inline void
4189 subxact_filename(char *path, Oid subid, TransactionId xid)
4190 {
4191  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4192 }
4193 
4194 /* format filename for file containing serialized changes */
4195 static inline void
4196 changes_filename(char *path, Oid subid, TransactionId xid)
4197 {
4198  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4199 }
4200 
4201 /*
4202  * stream_cleanup_files
4203  * Cleanup files for a subscription / toplevel transaction.
4204  *
4205  * Remove files with serialized changes and subxact info for a particular
4206  * toplevel transaction. Each subscription has a separate set of files
4207  * for any toplevel transaction.
4208  */
4209 void
4210 stream_cleanup_files(Oid subid, TransactionId xid)
4211 {
4212  char path[MAXPGPATH];
4213 
4214  /* Delete the changes file. */
4215  changes_filename(path, subid, xid);
4216  BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
4217 
4218  /* Delete the subxact file, if it exists. */
4219  subxact_filename(path, subid, xid);
4220  BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4221 }
4222 
4223 /*
4224  * stream_open_file
4225  * Open a file that we'll use to serialize changes for a toplevel
4226  * transaction.
4227  *
4228  * Open a file for streamed changes from a toplevel transaction identified
4229  * by stream_xid (global variable). If it's the first chunk of streamed
4230  * changes for this transaction, create the buffile, otherwise open the
4231  * previously created file.
4232  */
4233 static void
4234 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4235 {
4236  char path[MAXPGPATH];
4237  MemoryContext oldcxt;
4238 
4239  Assert(OidIsValid(subid));
4240  Assert(TransactionIdIsValid(xid));
4241  Assert(stream_fd == NULL);
4242 
4243 
4244  changes_filename(path, subid, xid);
4245  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4246 
4247  /*
4248  * Create/open the buffiles under the logical streaming context so that we
4249  * have those files until stream stop.
4250  */
4251  oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
4252 
4253  /*
4254  * If this is the first streamed segment, create the changes file.
4255  * Otherwise, just open the file for writing, in append mode.
4256  */
4257  if (first_segment)
4258  stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
4259  path);
4260  else
4261  {
4262  /*
4263  * Open the file and seek to the end of the file because we always
4264  * append the changes file.
4265  */
4266  stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
4267  path, O_RDWR, false);
4268  BufFileSeek(stream_fd, 0, 0, SEEK_END);
4269  }
4270 
4271  MemoryContextSwitchTo(oldcxt);
4272 }
4273 
4274 /*
4275  * stream_close_file
4276  * Close the currently open file with streamed changes.
4277  */
4278 static void
4279 stream_close_file(void)
4280 {
4281  Assert(stream_fd != NULL);
4282 
4283  BufFileClose(stream_fd);
4284 
4285  stream_fd = NULL;
4286 }
4287 
4288 /*
4289  * stream_write_change
4290  * Serialize a change to a file for the current toplevel transaction.
4291  *
4292  * The change is serialized in a simple format: the length (not counting
4293  * the length field itself), the action code (identifying the message type)
4294  * and the message contents (without the subxact TransactionId value).
4295  */
4296 static void
4297 stream_write_change(char action, StringInfo s)
4298 {
4299  int len;
4300 
4301  Assert(stream_fd != NULL);
4302 
4303  /* total on-disk size, including the action type character */
4304  len = (s->len - s->cursor) + sizeof(char);
4305 
4306  /* first write the size */
4307  BufFileWrite(stream_fd, &len, sizeof(len));
4308 
4309  /* then the action */
4310  BufFileWrite(stream_fd, &action, sizeof(action));
4311 
4312  /* and finally the remaining part of the buffer (after the XID) */
4313  len = (s->len - s->cursor);
4314 
4315  BufFileWrite(stream_fd, &s->data[s->cursor], len);
4316 }
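/*
 * Illustrative sketch (not part of the original source): each record
 * appended by stream_write_change() therefore has the shape
 *
 *     [int len] [char action] [len - 1 bytes of message payload]
 *
 * where len counts the action byte plus the payload but not the length
 * field itself, and the payload is the remote message starting at
 * s->cursor, i.e. with the subxact TransactionId already consumed.
 */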
4317 
4318 /*
4319  * stream_open_and_write_change
4320  * Serialize a message to a file for the given transaction.
4321  *
4322  * This function is similar to stream_write_change except that it will open
4323  * the target file if it is not already open before writing the message, and
4324  * close the file at the end.
4325  */
4326 static void
4327 stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
4328 {
4329  Assert(!in_streamed_transaction);
4330 
4331  if (!stream_fd)
4332  stream_start_internal(xid, false);
4333 
4334  stream_write_change(action, s);
4335  stream_stop_internal(xid);
4336 }
4337 
4338 /*
4339  * Sets streaming options including replication slot name and origin start
4340  * position. Workers need these options for logical replication.
4341  */
4342 void
4343 set_stream_options(WalRcvStreamOptions *options,
4344  char *slotname,
4345  XLogRecPtr *origin_startpos)
4346 {
4347  int server_version;
4348 
4349  options->logical = true;
4350  options->startpoint = *origin_startpos;
4351  options->slotname = slotname;
4352 
4353  server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
4354  options->proto.logical.proto_version =
4355  server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
4356  server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
4357  server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
4358  LOGICALREP_PROTO_VERSION_NUM;
4359 
4360  options->proto.logical.publication_names = MySubscription->publications;
4361  options->proto.logical.binary = MySubscription->binary;
4362 
4363  /*
4364  * Assign the appropriate value for the streaming option according to the
4365  * 'streaming' mode and the publisher's ability to support that mode.
4366  */
4367  if (server_version >= 160000 &&
4368  MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
4369  {
4370  options->proto.logical.streaming_str = "parallel";
4371  MyLogicalRepWorker->parallel_apply = true;
4372  }
4373  else if (server_version >= 140000 &&
4374  MySubscription->stream != LOGICALREP_STREAM_OFF)
4375  {
4376  options->proto.logical.streaming_str = "on";
4377  MyLogicalRepWorker->parallel_apply = false;
4378  }
4379  else
4380  {
4381  options->proto.logical.streaming_str = NULL;
4382  MyLogicalRepWorker->parallel_apply = false;
4383  }
4384 
4385  options->proto.logical.twophase = false;
4386  options->proto.logical.origin = pstrdup(MySubscription->origin);
4387 }
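/*
 * Illustrative summary (not part of the original source) of the streaming
 * negotiation above, for a subscription created with streaming = parallel:
 *
 *     publisher version >= 160000:  streaming_str = "parallel"
 *     publisher version >= 140000:  streaming_str = "on"
 *     older publishers:             streaming_str = NULL (no streaming)
 *
 * so the requested mode silently degrades when the publisher is too old to
 * support it.
 */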
4388 
4389 /*
4390  * Cleanup the memory for subxacts and reset the related variables.
4391  */
4392 static inline void
4393 cleanup_subxact_info()
4394 {
4395  if (subxact_data.subxacts)
4396  pfree(subxact_data.subxacts);
4397 
4398  subxact_data.subxacts = NULL;
4399  subxact_data.subxact_last = InvalidTransactionId;
4400  subxact_data.nsubxacts = 0;
4401  subxact_data.nsubxacts_max = 0;
4402 }
4403 
4404 /*
4405  * Form the prepared transaction GID for two_phase transactions.
4406  *
4407  * Return the GID in the supplied buffer.
4408  */
4409 static void
4410 TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
4411 {
4412  Assert(subid != InvalidRepOriginId);
4413 
4414  if (!TransactionIdIsValid(xid))
4415  ereport(ERROR,
4416  (errcode(ERRCODE_PROTOCOL_VIOLATION),
4417  errmsg_internal("invalid two-phase transaction ID")));
4418 
4419  snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
4420 }
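/*
 * Example (illustrative values only): for subscription OID 16394 and remote
 * transaction 756, the snprintf() above yields the GID "pg_gid_16394_756",
 * so prepared transactions originating from different subscriptions cannot
 * collide on the subscriber.
 */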
4421 
4422 /*
4423  * Common function to run the apply loop with error handling. Disable the
4424  * subscription, if necessary.
4425  *
4426  * Note that we don't handle FATAL errors, which are probably caused by
4427  * system resource errors and are not repeatable.
4428  */
4429 void
4430 start_apply(XLogRecPtr origin_startpos)
4431 {
4432  PG_TRY();
4433  {
4434  LogicalRepApplyLoop(origin_startpos);
4435  }
4436  PG_CATCH();
4437  {
4438  if (MySubscription->disableonerr)
4439  DisableSubscriptionAndExit();
4440  else
4441  {
4442  /*
4443  * Report the worker failed while applying changes. Abort the
4444  * current transaction so that the stats message is sent in an
4445  * idle state.
4446  */
4447  AbortOutOfAnyTransaction();
4448  pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
4449 
4450  PG_RE_THROW();
4451  }
4452  }
4453  PG_END_TRY();
4454 }
4455 
4456 /*
4457  * Runs the leader apply worker.
4458  *
4459  * It sets up replication origin, streaming options and then starts streaming.
4460  */
4461 static void
4462 run_apply_worker()
4463 {
4464  char originname[NAMEDATALEN];
4465  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4466  char *slotname = NULL;
4467  WalRcvStreamOptions options;
4468  RepOriginId originid;
4469  TimeLineID startpointTLI;
4470  char *err;
4471  bool must_use_password;
4472 
4473  slotname = MySubscription->slotname;
4474 
4475  /*
4476  * This shouldn't happen if the subscription is enabled, but guard against
4477  * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4478  * slot is NULL.)
4479  */
4480  if (!slotname)
4481  ereport(ERROR,
4482  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4483  errmsg("subscription has no replication slot set")));
4484 
4485  /* Setup replication origin tracking. */
4486  ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
4487  originname, sizeof(originname));
4488  StartTransactionCommand();
4489  originid = replorigin_by_name(originname, true);
4490  if (!OidIsValid(originid))
4491  originid = replorigin_create(originname);
4492  replorigin_session_setup(originid, 0);
4493  replorigin_session_origin = originid;
4494  origin_startpos = replorigin_session_get_progress(false);
4495 
4496  /* Is the use of a password mandatory? */
4497  must_use_password = MySubscription->passwordrequired &&
4498  !superuser_arg(MySubscription->owner);
4499 
4500  /* Note that the superuser_arg call can access the DB */
4501  CommitTransactionCommand();
4502 
4503  LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
4504  must_use_password,
4505  MySubscription->name, &err);
4506 
4507  if (LogRepWorkerWalRcvConn == NULL)
4508  ereport(ERROR,
4509  (errcode(ERRCODE_CONNECTION_FAILURE),
4510  errmsg("could not connect to the publisher: %s", err)));
4511 
4512  /*
4513  * We don't really use the output of IDENTIFY_SYSTEM for anything, but it
4514  * does some initialization on the upstream, so we still call it.
4515  */
4516  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4517 
4518  set_apply_error_context_origin(originname);
4519 
4520  set_stream_options(&options, slotname, &origin_startpos);
4521 
4522  /*
4523  * Even when the two_phase mode is requested by the user, it remains as
4524  * the tri-state PENDING until all tablesyncs have reached READY state.
4525  * Only then can it become ENABLED.
4526  *
4527  * Note: If the subscription has no tables then leave the state as
4528  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4529  * work.
4530  */
4531  if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
4532  AllTablesyncsReady())
4533  {
4534  /* Start streaming with two_phase enabled */
4535  options.proto.logical.twophase = true;
4536  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4537 
4538  StartTransactionCommand();
4539  UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
4540  MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
4541  CommitTransactionCommand();
4542  }
4543  else
4544  {
4545  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4546  }
4547 
4548  ereport(DEBUG1,
4549  (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4550  MySubscription->name,
4551  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
4552  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
4553  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
4554  "?")));
4555 
4556  /* Run the main loop. */
4557  start_apply(origin_startpos);
4558 }
4559 
4560 /*
4561  * Common initialization for leader apply worker, parallel apply worker and
4562  * tablesync worker.
4563  *
4564  * Initialize the database connection, in-memory subscription and necessary
4565  * config options.
4566  */
4567 void
4568 InitializeLogRepWorker(void)
4569 {
4570  MemoryContext oldctx;
4571 
4572  /* Run as replica session replication role. */
4573  SetConfigOption("session_replication_role", "replica",
4574  PGC_SUSET, PGC_S_OVERRIDE);
4575 
4576  /* Connect to our database. */
4577  BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
4578  MyLogicalRepWorker->userid,
4579  0);
4580 
4581  /*
4582  * Set always-secure search path, so malicious users can't redirect user
4583  * code (e.g. pg_index.indexprs).
4584  */
4585  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4586 
4587  /* Load the subscription into persistent memory context. */
4588  ApplyContext = AllocSetContextCreate(TopMemoryContext,
4589  "ApplyContext",
4590  ALLOCSET_DEFAULT_SIZES);
4591  StartTransactionCommand();
4592  oldctx = MemoryContextSwitchTo(ApplyContext);
4593 
4594  MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
4595  if (!MySubscription)
4596  {
4597  ereport(LOG,
4598  (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4599  MyLogicalRepWorker->subid)));
4600 
4601  /* Ensure we remove no-longer-useful entry for worker's start time */
4602  if (am_leader_apply_worker())
4603  ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4604 
4605  proc_exit(0);
4606  }
4607 
4608  MySubscriptionValid = true;
4609  MemoryContextSwitchTo(oldctx);
4610 
4611  if (!MySubscription->enabled)
4612  {
4613  ereport(LOG,
4614  (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4615  MySubscription->name)));
4616 
4617  apply_worker_exit();
4618  }
4619 
4620  /* Setup synchronous commit according to the user's wishes */
4621  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4622  PGC_BACKEND, PGC_S_OVERRIDE);
4623 
4624  /* Keep us informed about subscription changes. */
4625  CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4626  subscription_change_cb,
4627  (Datum) 0);
4628 
4629  if (am_tablesync_worker())
4630  ereport(LOG,
4631  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4632  MySubscription->name,
4633  get_rel_name(MyLogicalRepWorker->relid))));
4634  else
4635  ereport(LOG,
4636  (errmsg("logical replication apply worker for subscription \"%s\" has started",
4637  MySubscription->name)));
4638 
4639  CommitTransactionCommand();
4640 }
4641 
4642 /* Common function to setup the leader apply or tablesync worker. */
4643 void
4644 SetupApplyOrSyncWorker(int worker_slot)
4645 {
4646  /* Attach to slot */
4647  logicalrep_worker_attach(worker_slot);
4648 
4650 
4651  /* Setup signal handling */
4652  pqsignal(SIGHUP, SignalHandlerForConfigReload);
4653  pqsignal(SIGTERM, die);
4654  BackgroundWorkerUnblockSignals();
4655 
4656  /*
4657  * We don't currently need any ResourceOwner in a walreceiver process, but
4658  * if we did, we could call CreateAuxProcessResourceOwner here.
4659  */
4660 
4661  /* Initialise stats to a sane-ish value */
4662  MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
4663  MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
4664 
4665  /* Load the libpq-specific functions */
4666  load_file("libpqwalreceiver", false);
4667 
4668  InitializeLogRepWorker();
4669 
4670  /* Connect to the origin and start the replication. */
4671  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4672  MySubscription->conninfo);
4673 
4674  /*
4675  * Setup callback for syscache so that we know when something changes in
4676  * the subscription relation state.
4677  */
4678  CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4679  invalidate_syncing_table_states,
4680  (Datum) 0);
4681 }
4682 
4683 /* Logical Replication Apply worker entry point */
4684 void
4685 ApplyWorkerMain(Datum main_arg)
4686 {
4687  int worker_slot = DatumGetInt32(main_arg);
4688 
4689  InitializingApplyWorker = true;
4690 
4691  SetupApplyOrSyncWorker(worker_slot);
4692 
4693  InitializingApplyWorker = false;
4694 
4695  run_apply_worker();
4696 
4697  proc_exit(0);
4698 }
4699 
4700 /*
4701  * After error recovery, disable the subscription in a new transaction
4702  * and exit cleanly.
4703  */
4704 void
4705 DisableSubscriptionAndExit(void)
4706 {
4707  /*
4708  * Emit the error message, and recover from the error state to an idle
4709  * state
4710  */
4711  HOLD_INTERRUPTS();
4712 
4713  EmitErrorReport();
4714  AbortOutOfAnyTransaction();
4715  FlushErrorState();
4716 
4717  RESUME_INTERRUPTS();
4718 
4719  /* Report the worker failed during either table synchronization or apply */
4720  pgstat_report_subscription_error(MyLogicalRepWorker->subid,
4721  !am_tablesync_worker());
4722 
4723  /* Disable the subscription */
4724  StartTransactionCommand();
4725  DisableSubscription(MySubscription->oid);
4726  CommitTransactionCommand();
4727 
4728  /* Ensure we remove no-longer-useful entry for worker's start time */
4729  if (am_leader_apply_worker())
4730  ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4731 
4732  /* Notify that the subscription has been disabled, and exit */
4733  ereport(LOG,
4734  errmsg("subscription \"%s\" has been disabled because of an error",
4735  MySubscription->name));
4736 
4737  proc_exit(0);
4738 }
4739 
4740 /*
4741  * Is current process a logical replication worker?
4742  */
4743 bool
4744 IsLogicalWorker(void)
4745 {
4746  return MyLogicalRepWorker != NULL;
4747 }
4748 
4749 /*
4750  * Is current process a logical replication parallel apply worker?
4751  */
4752 bool
4753 IsLogicalParallelApplyWorker(void)
4754 {
4755  return IsLogicalWorker() && am_parallel_apply_worker();
4756 }
4757 
4758 /*
4759  * Start skipping changes of the transaction if the given LSN matches the
4760  * LSN specified by subscription's skiplsn.
4761  */
4762 static void
4763 maybe_start_skipping_changes(XLogRecPtr finish_lsn)
4764 {
4765  Assert(!is_skipping_changes());
4766  Assert(!in_remote_transaction);
4767  Assert(!in_streamed_transaction);
4768 
4769  /*
4770  * Quick return if it's not requested to skip this transaction. This
4771  * function is called for every remote transaction and we assume that
4772  * skipping the transaction is not used often.
4773  */
4774  if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
4775  MySubscription->skiplsn != finish_lsn))
4776  return;
4777 
4778  /* Start skipping all changes of this transaction */
4779  skip_xact_finish_lsn = finish_lsn;
4780 
4781  ereport(LOG,
4782  errmsg("logical replication starts skipping transaction at LSN %X/%X",
4783  LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
4784 }
4785 
4786 /*
4787  * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4788  */
4789 static void
4790 stop_skipping_changes(void)
4791 {
4792  if (!is_skipping_changes())
4793  return;
4794 
4795  ereport(LOG,
4796  (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4797  LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
4798 
4799  /* Stop skipping changes */
4800  skip_xact_finish_lsn = InvalidXLogRecPtr;
4801 }
4802 
4803 /*
4804  * Clear subskiplsn of pg_subscription catalog.
4805  *
4806  * finish_lsn is the transaction's finish LSN that is used to check if the
4807  * subskiplsn matches it. If it does not match, we raise a warning when
4808  * clearing the subskiplsn, in order to inform users of cases where, e.g.,
4809  * they mistakenly specified the wrong subskiplsn.
4810  */
4811 static void
4812 clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
4813 {
4814  Relation rel;
4815  Form_pg_subscription subform;
4816  HeapTuple tup;
4817  XLogRecPtr myskiplsn = MySubscription->skiplsn;
4818  bool started_tx = false;
4819 
4820  if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
4821  return;
4822 
4823  if (!IsTransactionState())
4824  {
4825  StartTransactionCommand();
4826  started_tx = true;
4827  }
4828 
4829  /*
4830  * Protect subskiplsn of pg_subscription from being concurrently updated
4831  * while clearing it.
4832  */
4833  LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4834  AccessShareLock);
4835 
4836  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4837 
4838  /* Fetch the existing tuple. */
4839  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4840  ObjectIdGetDatum(MySubscription->oid));
4841 
4842  if (!HeapTupleIsValid(tup))
4843  elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4844 
4845  subform = (Form_pg_subscription) GETSTRUCT(tup);
4846 
4847  /*
4848  * Clear the subskiplsn. If the user has already changed subskiplsn before
4849  * clearing it we don't update the catalog and the replication origin
4850  * state won't get advanced. So in the worst case, if the server crashes
4851  * before sending an acknowledgment of the flush position the transaction
4852  * will be sent again and the user needs to set subskiplsn again. We can
4853  * reduce the possibility by logging a replication origin WAL record to
4854  * advance the origin LSN instead but there is no way to advance the
4855  * origin timestamp and it doesn't seem to be worth doing anything about
4856  * it since it's a very rare case.
4857  */
4858  if (subform->subskiplsn == myskiplsn)
4859  {
4860  bool nulls[Natts_pg_subscription];
4861  bool replaces[Natts_pg_subscription];
4862  Datum values[Natts_pg_subscription];
4863 
4864  memset(values, 0, sizeof(values));
4865  memset(nulls, false, sizeof(nulls));
4866  memset(replaces, false, sizeof(replaces));
4867 
4868  /* reset subskiplsn */
4869  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4870  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4871 
4872  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4873  replaces);
4874  CatalogTupleUpdate(rel, &tup->t_self, tup);
4875 
4876  if (myskiplsn != finish_lsn)
4877  ereport(WARNING,
4878  errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4879  errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4880  LSN_FORMAT_ARGS(finish_lsn),
4881  LSN_FORMAT_ARGS(myskiplsn)));
4882  }
4883 
4884  heap_freetuple(tup);
4885  table_close(rel, NoLock);
4886 
4887  if (started_tx)
4888  CommitTransactionCommand();
4889 }
4890 
4891 /* Error callback to give more context info about the change being applied */
4892 void
4893 apply_error_callback(void *arg)
4894 {
4895  ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
4896 
4897  if (apply_error_callback_arg.command == 0)
4898  return;
4899 
4900  Assert(errarg->origin_name);
4901 
4902  if (errarg->rel == NULL)
4903  {
4904  if (!TransactionIdIsValid(errarg->remote_xid))
4905  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4906  errarg->origin_name,
4907  logicalrep_message_type(errarg->command));
4908  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4909  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4910  errarg->origin_name,
4911  logicalrep_message_type(errarg->command),
4912  errarg->remote_xid);
4913  else
4914  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4915  errarg->origin_name,
4916  logicalrep_message_type(errarg->command),
4917  errarg->remote_xid,
4918  LSN_FORMAT_ARGS(errarg->finish_lsn));
4919  }
4920  else
4921  {
4922  if (errarg->remote_attnum < 0)
4923  {
4924  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4925  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4926  errarg->origin_name,
4927  logicalrep_message_type(errarg->command),
4928  errarg->rel->remoterel.nspname,
4929  errarg->rel->remoterel.relname,
4930  errarg->remote_xid);
4931  else
4932  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4933  errarg->origin_name,
4934  logicalrep_message_type(errarg->command),
4935  errarg->rel->remoterel.nspname,
4936  errarg->rel->remoterel.relname,
4937  errarg->remote_xid,
4938  LSN_FORMAT_ARGS(errarg->finish_lsn));
4939  }
4940  else
4941  {
4942  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4943  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4944  errarg->origin_name,
4945  logicalrep_message_type(errarg->command),
4946  errarg->rel->remoterel.nspname,
4947  errarg->rel->remoterel.relname,
4948  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4949  errarg->remote_xid);
4950  else
4951  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4952  errarg->origin_name,
4953  logicalrep_message_type(errarg->command),
4954  errarg->rel->remoterel.nspname,
4955  errarg->rel->remoterel.relname,
4956  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4957  errarg->remote_xid,
4958  LSN_FORMAT_ARGS(errarg->finish_lsn));
4959  }
4960  }
4961 }
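/*
 * Example (illustrative values only) of the context line the callback above
 * attaches to an error raised while applying a column of an UPDATE:
 *
 *     CONTEXT:  processing remote data for replication origin "pg_16394"
 *     during message type "UPDATE" for replication target relation
 *     "public.tab" column "id" in transaction 756, finished at 0/14C0378
 *
 * The origin name, relation, column, XID and LSN are made-up values.
 */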
4962 
4963 /* Set transaction information of apply error callback */
4964 static inline void
4965 set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
4966 {
4967  apply_error_callback_arg.remote_xid = xid;
4968  apply_error_callback_arg.finish_lsn = lsn;
4969 }
4970 
4971 /* Reset all information of apply error callback */
4972 static inline void
4973 reset_apply_error_context_info(void)
4974 {
4975  apply_error_callback_arg.command = 0;
4976  apply_error_callback_arg.rel = NULL;
4977  apply_error_callback_arg.remote_attnum = -1;
4978  set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
4979 }
4980 
4981 /*
4982  * Request wakeup of the workers for the given subscription OID
4983  * at commit of the current transaction.
4984  *
4985  * This is used to ensure that the workers process assorted changes
4986  * as soon as possible.
4987  */
4988 void
4989 LogicalRepWorkersWakeupAtCommit(Oid subid)
4990 {
4991  MemoryContext oldcxt;
4992 
4993  oldcxt = MemoryContextSwitchTo(TopTransactionContext);
4994  on_commit_wakeup_workers_subids =
4995  list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
4996  MemoryContextSwitchTo(oldcxt);
4997 }
4998 
4999 /*
5000  * Wake up the workers of any subscriptions that were changed in this xact.
5001  */
5002 void
5003 AtEOXact_LogicalRepWorkers(bool isCommit)
5004 {
5005  if (isCommit && on_commit_wakeup_workers_subids != NIL)
5006  {
5007  ListCell *lc;
5008 
5009  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5010  foreach(lc, on_commit_wakeup_workers_subids)
5011  {
5012  Oid subid = lfirst_oid(lc);
5013  List *workers;
5014  ListCell *lc2;
5015 
5016  workers = logicalrep_workers_find(subid, true);
5017  foreach(lc2, workers)
5018  {
5019  LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5020 
5021  logicalrep_worker_wakeup_ptr(worker);
5022  }
5023  }
5024  LWLockRelease(LogicalRepWorkerLock);
5025  }
5026 
5027  /* The List storage will be reclaimed automatically in xact cleanup. */
5028  on_commit_wakeup_workers_subids = NIL;
5029 }
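/*
 * Illustrative sketch (not part of the original source): DDL that changes a
 * subscription can call LogicalRepWorkersWakeupAtCommit(subid) while its
 * transaction is still open; the subid is remembered in
 * on_commit_wakeup_workers_subids, and when the transaction commits the
 * transaction machinery invokes AtEOXact_LogicalRepWorkers(true), which
 * wakes every worker of each recorded subscription as implemented above.
 */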
5030 
5031 /*
5032  * Allocate the origin name in long-lived context for error context message.
5033  */
5034 void
5035 set_apply_error_context_origin(char *originname)
5036 {
5037  apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
5038  originname);
5039 }
5040 
5041 /*
5042  * Return the action to be taken for the given transaction. See
5043  * TransApplyAction for information on each of the actions.
5044  *
5045  * *winfo is assigned to the destination parallel worker info when the leader
5046  * apply worker has to pass all the transaction's changes to the parallel
5047  * apply worker.
5048  */
5049 static TransApplyAction
5050 get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
5051 {
5052  *winfo = NULL;
5053 
5054  if (am_parallel_apply_worker())
5055  {
5056  return TRANS_PARALLEL_APPLY;
5057  }
5058 
5059  /*
5060  * If we are processing this transaction using a parallel apply worker,
5061  * then we either send the changes to the parallel worker or, if the worker
5062  * is busy, serialize the changes to a file that will later be processed by
5063  * the parallel worker.
5064  */
5065  *winfo = pa_find_worker(xid);
5066 
5067  if (*winfo && (*winfo)->serialize_changes)
5068  {
5069  return TRANS_LEADER_PARTIAL_SERIALIZE;
5070  }
5071  else if (*winfo)
5072  {
5073  return TRANS_LEADER_SEND_TO_PARALLEL;
5074  }
5075 
5076  /*
5077  * If there is no parallel worker involved in processing this transaction,
5078  * then we either apply the change directly or serialize it to a file
5079  * that will later be applied when the transaction finish message is
5080  * processed.
5081  */
5082  else if (in_streamed_transaction)
5083  {
5084  return TRANS_LEADER_SERIALIZE;
5085  }
5086  else
5087  {
5088  return TRANS_LEADER_APPLY;
5089  }
5090 }
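/*
 * Illustrative summary (not part of the original source) of the decision
 * implemented above:
 *
 *     running inside a parallel apply worker      -> TRANS_PARALLEL_APPLY
 *     leader, parallel worker busy (serializing)  -> TRANS_LEADER_PARTIAL_SERIALIZE
 *     leader, parallel worker available           -> TRANS_LEADER_SEND_TO_PARALLEL
 *     leader, streamed xact, no parallel worker   -> TRANS_LEADER_SERIALIZE
 *     leader, ordinary (non-streamed) transaction -> TRANS_LEADER_APPLY
 */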