worker.c
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2023, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from the remote logical replication stream.
13  *
14  * The main worker (apply) is started by the logical replication worker
15  * launcher for every enabled subscription in a database. It uses the
16  * walsender protocol to communicate with the publisher.
17  *
18  * This module includes server-facing code and shares the libpqwalreceiver
19  * module with walreceiver to provide the libpq-specific functionality.
20  *
21  *
22  * STREAMED TRANSACTIONS
23  * ---------------------
24  * Streamed transactions (large transactions exceeding a memory limit on the
25  * upstream) are applied using one of two approaches:
26  *
27  * 1) Write to temporary files and apply when the final commit arrives
28  *
29  * This approach is used when the user has set the subscription's streaming
30  * option as on.
31  *
32  * Unlike the regular (non-streamed) case, handling streamed transactions also
33  * requires handling aborts of both the toplevel transaction and of
34  * subtransactions. This is achieved by tracking offsets for subtransactions,
35  * which are then used to truncate the file with serialized changes.
36  *
37  * The files are placed in the temporary-file directory by default, and the
38  * filenames include both the XID of the toplevel transaction and the OID of
39  * the subscription (see the filename sketch after this header comment), so
40  * that different workers processing the same remote XID don't interfere.
41  *
42  * We use BufFiles instead of normal temporary files because (a) the BufFile
43  * infrastructure supports temporary files that exceed the OS file size
44  * limit, (b) it provides a way to clean up automatically on error, and (c)
45  * it allows these files to survive across local transactions so that they
46  * can be opened and closed at stream start and stream stop. We use the
47  * FileSet infrastructure because, without it, a BufFile is deleted as soon
48  * as it is closed; if we instead kept stream files open across stream
49  * start/stop, that would consume a lot of memory (more than 8K for each
50  * BufFile, and there could be many such BufFiles, as the subscriber can
51  * receive start/stop for several different transactions before any of them
52  * commits). Moreover, without FileSet we would also need to invent a new
53  * way to pass filenames to the BufFile APIs so that the desired file could
54  * be reopened across multiple stream-open calls for the same
55  * transaction.
56  *
57  * 2) Parallel apply workers.
58  *
59  * This approach is used when the user has set the subscription's streaming
60  * option as parallel. See logical/applyparallelworker.c for information about
61  * this approach.
62  *
63  * TWO_PHASE TRANSACTIONS
64  * ----------------------
65  * Two-phase transactions are replayed at prepare time and then committed or
66  * rolled back at commit prepared and rollback prepared, respectively. It is
67  * possible for a prepared transaction to arrive at the apply worker while
68  * the tablesync is busy doing the initial copy. In this case, the apply
69  * worker skips all the prepared operations [e.g. inserts] while the tablesync
70  * is still busy (see the condition in should_apply_changes_for_rel). The
71  * tablesync worker might not get such a prepared transaction at all, say
72  * because it was prior to the initial consistent point, but might have got
73  * some later commits. The tablesync worker will then exit without doing
74  * anything for the prepared transaction skipped by the apply worker, as its
75  * sync location will already be ahead of the apply worker's current location.
76  * This would lead to an "empty prepare", because later, when the apply worker
77  * does the commit prepared, there is nothing in it (the inserts were skipped).
78  *
79  * To avoid this and similar prepare confusions, the subscription's two_phase
80  * commit is enabled only after the initial sync is over. The two_phase option
81  * has been implemented as a tri-state with values DISABLED, PENDING, and
82  * ENABLED.
83  *
84  * Even if the user specifies they want a subscription with two_phase = on,
85  * internally it will start with a tri-state of PENDING which only becomes
86  * ENABLED after all tablesync initializations are completed - i.e. when all
87  * tablesync workers have reached their READY state. In other words, the value
88  * PENDING is only a temporary state for subscription start-up.
89  *
90  * Until the two_phase is properly available (ENABLED) the subscription will
91  * behave as if two_phase = off. When the apply worker detects that all
92  * tablesyncs have become READY (while the tri-state was PENDING) it will
93  * restart the apply worker process. This happens in
94  * process_syncing_tables_for_apply.
95  *
96  * When the (re-started) apply worker finds that all tablesyncs are READY for a
97  * two_phase tri-state of PENDING, it starts streaming messages with the
98  * two_phase option, which in turn enables the decoding of two-phase commits at
99  * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100  * Now, it is possible that while two_phase was not yet enabled, the publisher
101  * (replication server) skipped some prepares, but we ensure that such prepares
102  * are sent along with the commit prepared; see
103  * ReorderBufferFinishPrepared.
104  *
105  * If the subscription has no tables then a two_phase tri-state PENDING is
106  * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107  * PUBLICATION which might otherwise be disallowed (see below).
108  *
109  * If ever a user needs to be aware of the tri-state value, they can fetch it
110  * from the pg_subscription catalog (see column subtwophasestate).
111  *
112  * We don't allow toggling the two_phase option of a subscription because it
113  * can lead to an inconsistent replica. Consider: initially it was on and we
114  * received some prepare, then we turned it off; now, at commit time, the server
115  * will send the entire transaction data along with the commit. With some more
116  * analysis we could allow changing this option from off to on, but it is not
117  * clear whether that alone would be useful.
118  *
119  * Finally, to avoid the problems mentioned in previous paragraphs from any
120  * subsequent (not READY) tablesyncs (which would require toggling the two_phase
121  * option from 'on' to 'off' and then back to 'on'), there is a restriction on
122  * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
123  * the two_phase tri-state is ENABLED, except when copy_data = false.
124  *
125  * We can get a prepare for the same GID more than once in the genuine case
126  * where we have defined multiple subscriptions for publications on the same
127  * server and a prepared transaction has operations on tables subscribed to
128  * by those subscriptions. In such cases, if we used the GID sent by the
129  * publisher, one of the prepares would succeed and the others would fail, in
130  * which case the server would send them again. This can then lead to a
131  * deadlock if the user has set synchronous_standby_names for all the
132  * subscriptions on the subscriber. To avoid such deadlocks, we generate a
133  * unique GID (consisting of the subscription oid and the xid of the prepared
134  * transaction) for each prepared transaction on the subscriber.
135  *-------------------------------------------------------------------------
136  */
137 
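/*
 * Editorial sketch (not part of the original file): a minimal, standalone
 * illustration of the per-transaction spool-file naming described in the
 * header comment above, where the name combines the subscription OID and
 * the remote toplevel XID so that workers for different subscriptions never
 * collide. The exact format string, helper name, and the OID/XID values are
 * assumptions for illustration only; the real names are produced by
 * changes_filename() and subxact_filename() later in this file.
 */
#include <stdio.h>

typedef unsigned int Oid;
typedef unsigned int TransactionId;

static void
sketch_changes_filename(char *path, size_t len, Oid subid, TransactionId xid)
{
	/* assumed layout: "<subscription OID>-<toplevel XID>.changes" */
	snprintf(path, len, "%u-%u.changes", subid, xid);
}

int
main(void)
{
	char		path[64];

	sketch_changes_filename(path, sizeof(path), 16394, 739);
	printf("%s\n", path);	/* prints "16394-739.changes" */
	return 0;
}
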
138 #include "postgres.h"
139 
140 #include <sys/stat.h>
141 #include <unistd.h>
142 
143 #include "access/table.h"
144 #include "access/tableam.h"
145 #include "access/twophase.h"
146 #include "access/xact.h"
147 #include "access/xlog_internal.h"
148 #include "catalog/catalog.h"
149 #include "catalog/indexing.h"
150 #include "catalog/namespace.h"
151 #include "catalog/partition.h"
152 #include "catalog/pg_inherits.h"
153 #include "catalog/pg_subscription.h"
155 #include "catalog/pg_tablespace.h"
156 #include "commands/tablecmds.h"
157 #include "commands/tablespace.h"
158 #include "commands/trigger.h"
159 #include "executor/executor.h"
160 #include "executor/execPartition.h"
162 #include "funcapi.h"
163 #include "libpq/pqformat.h"
164 #include "libpq/pqsignal.h"
165 #include "mb/pg_wchar.h"
166 #include "miscadmin.h"
167 #include "nodes/makefuncs.h"
168 #include "optimizer/optimizer.h"
169 #include "parser/parse_relation.h"
170 #include "pgstat.h"
171 #include "postmaster/bgworker.h"
172 #include "postmaster/interrupt.h"
173 #include "postmaster/postmaster.h"
174 #include "postmaster/walwriter.h"
175 #include "replication/decode.h"
176 #include "replication/logical.h"
181 #include "replication/origin.h"
183 #include "replication/snapbuild.h"
184 #include "replication/walreceiver.h"
186 #include "rewrite/rewriteHandler.h"
187 #include "storage/buffile.h"
188 #include "storage/bufmgr.h"
189 #include "storage/fd.h"
190 #include "storage/ipc.h"
191 #include "storage/lmgr.h"
192 #include "storage/proc.h"
193 #include "storage/procarray.h"
194 #include "tcop/tcopprot.h"
195 #include "utils/acl.h"
196 #include "utils/builtins.h"
197 #include "utils/catcache.h"
198 #include "utils/dynahash.h"
199 #include "utils/datum.h"
200 #include "utils/fmgroids.h"
201 #include "utils/guc.h"
202 #include "utils/inval.h"
203 #include "utils/lsyscache.h"
204 #include "utils/memutils.h"
205 #include "utils/pg_lsn.h"
206 #include "utils/rel.h"
207 #include "utils/rls.h"
208 #include "utils/syscache.h"
209 #include "utils/timeout.h"
210 
211 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
212 
213 typedef struct FlushPosition
214 {
219 
221 
222 typedef struct ApplyExecutionData
223 {
224  EState *estate; /* executor state, used to track resources */
225 
226  LogicalRepRelMapEntry *targetRel; /* replication target rel */
227  ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
228 
229  /* These fields are used when the target relation is partitioned: */
230  ModifyTableState *mtstate; /* dummy ModifyTable state */
231  PartitionTupleRouting *proute; /* partition routing info */
232 } ApplyExecutionData;
233 
234 /* Struct for saving and restoring apply errcontext information */
235 typedef struct ApplyErrorCallbackArg
236 {
237  LogicalRepMsgType command; /* 0 if invalid */
239 
240  /* Remote node information */
241  int remote_attnum; /* -1 if invalid */
244  char *origin_name;
246 
247 /*
248  * The action to be taken for the changes in the transaction.
249  *
250  * TRANS_LEADER_APPLY:
251  * This action means that we are in the leader apply worker or table sync
252  * worker. The changes of the transaction are either directly applied or
253  * are read from temporary files (for streaming transactions) and then
254  * applied by the worker.
255  *
256  * TRANS_LEADER_SERIALIZE:
257  * This action means that we are in the leader apply worker or table sync
258  * worker. Changes are written to temporary files and then applied when the
259  * final commit arrives.
260  *
261  * TRANS_LEADER_SEND_TO_PARALLEL:
262  * This action means that we are in the leader apply worker and need to send
263  * the changes to the parallel apply worker.
264  *
265  * TRANS_LEADER_PARTIAL_SERIALIZE:
266  * This action means that we are in the leader apply worker and have sent some
267  * changes directly to the parallel apply worker and the remaining changes are
268  * serialized to a file, due to timeout while sending data. The parallel apply
269  * worker will apply these serialized changes when the final commit arrives.
270  *
271  * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
272  * serializing changes, the leader worker also needs to serialize the
273  * STREAM_XXX message to a file, and wait for the parallel apply worker to
274  * finish the transaction when processing the transaction finish command. So
275  * this new action was introduced to keep the code and logic clear.
276  *
277  * TRANS_PARALLEL_APPLY:
278  * This action means that we are in the parallel apply worker and changes of
279  * the transaction are applied directly by the worker.
280  */
281 typedef enum
282 {
283  /* The action for non-streaming transactions. */
284  TRANS_LEADER_APPLY,
285 
286  /* Actions for streaming transactions. */
287  TRANS_LEADER_SERIALIZE,
288  TRANS_LEADER_SEND_TO_PARALLEL,
289  TRANS_LEADER_PARTIAL_SERIALIZE,
290  TRANS_PARALLEL_APPLY,
291 } TransApplyAction;
292 
293 /* errcontext tracker */
295 {
296  .command = 0,
297  .rel = NULL,
298  .remote_attnum = -1,
299  .remote_xid = InvalidTransactionId,
300  .finish_lsn = InvalidXLogRecPtr,
301  .origin_name = NULL,
302 };
303 
305 
308 
309 /* per stream context for streaming transactions */
311 
313 
315 static bool MySubscriptionValid = false;
316 
318 
321 
322 /* fields valid only when processing streamed transaction */
323 static bool in_streamed_transaction = false;
324 
326 
327 /*
328  * The number of changes applied by parallel apply worker during one streaming
329  * block.
330  */
332 
333 /*
334  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
335  * the subscription if the remote transaction's finish LSN matches the
336  * subskiplsn. Once we start skipping changes, we don't stop until we have
337  * skipped all changes of the transaction, even if pg_subscription is updated
338  * and MySubscription->skiplsn gets changed or reset meanwhile. Also, in the
339  * streaming case (streaming = on), we don't skip receiving and spooling the
340  * changes, since we only decide whether to skip applying them when we start
341  * to apply the changes. The subskiplsn is cleared after successfully skipping
342  * the transaction or after applying a non-empty transaction; the latter
343  * prevents a mistakenly specified subskiplsn from being left behind. Note
344  * that we cannot skip streaming transactions when using parallel apply
345  * workers, because we cannot get the finish LSN before applying the changes.
346  * So, we don't start a parallel apply worker when a finish LSN is set by the user.
347  */
349 #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
350 
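/*
 * Editorial sketch (not part of the original file): a standalone
 * illustration of the skip-transaction decision described above. At the
 * start of a remote transaction we remember its finish LSN if it matches
 * the subscription's skiplsn, and we keep skipping until that transaction
 * is finished, regardless of later catalog changes. The helper name, the
 * simplified types, and the LSN values are assumptions for illustration.
 */
#include <stdio.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr	((XLogRecPtr) 0)

static XLogRecPtr skip_finish_lsn = InvalidXLogRecPtr;	/* like skip_xact_finish_lsn */

static void
sketch_maybe_start_skipping(XLogRecPtr subskiplsn, XLogRecPtr finish_lsn)
{
	if (subskiplsn == finish_lsn)
		skip_finish_lsn = finish_lsn;	/* start skipping this transaction */
}

int
main(void)
{
	XLogRecPtr	subskiplsn = 0x16D68D8;	/* hypothetical ALTER SUBSCRIPTION ... SKIP value */

	sketch_maybe_start_skipping(subskiplsn, 0x16D68D8);
	printf("skipping: %s\n", skip_finish_lsn != InvalidXLogRecPtr ? "yes" : "no");
	return 0;
}
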
351 /* BufFile handle of the current streaming file */
352 static BufFile *stream_fd = NULL;
353 
354 typedef struct SubXactInfo
355 {
356  TransactionId xid; /* XID of the subxact */
357  int fileno; /* file number in the buffile */
358  off_t offset; /* offset in the file */
359 } SubXactInfo;
360 
361 /* Sub-transaction data for the current streaming transaction */
362 typedef struct ApplySubXactData
363 {
364  uint32 nsubxacts; /* number of sub-transactions */
365  uint32 nsubxacts_max; /* current capacity of subxacts */
366  TransactionId subxact_last; /* xid of the last sub-transaction */
367  SubXactInfo *subxacts; /* sub-xact offset in changes file */
368 } ApplySubXactData;
369 
371 
372 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
373 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
374 
375 /*
376  * Information about subtransactions of a given toplevel transaction.
377  */
378 static void subxact_info_write(Oid subid, TransactionId xid);
379 static void subxact_info_read(Oid subid, TransactionId xid);
380 static void subxact_info_add(TransactionId xid);
381 static inline void cleanup_subxact_info(void);
382 
383 /*
384  * Serialize and deserialize changes for a toplevel transaction.
385  */
386 static void stream_open_file(Oid subid, TransactionId xid,
387  bool first_segment);
388 static void stream_write_change(char action, StringInfo s);
390 static void stream_close_file(void);
391 
392 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
393 
394 static void DisableSubscriptionAndExit(void);
395 
396 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
398  ResultRelInfo *relinfo,
399  TupleTableSlot *remoteslot);
401  ResultRelInfo *relinfo,
402  TupleTableSlot *remoteslot,
403  LogicalRepTupleData *newtup,
404  Oid localindexoid);
406  ResultRelInfo *relinfo,
407  TupleTableSlot *remoteslot,
408  Oid localindexoid);
409 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
410  LogicalRepRelation *remoterel,
411  Oid localidxoid,
412  TupleTableSlot *remoteslot,
413  TupleTableSlot **localslot);
415  TupleTableSlot *remoteslot,
416  LogicalRepTupleData *newtup,
417  CmdType operation);
418 
419 /* Compute GID for two_phase transactions */
420 static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
421 
422 /* Functions for skipping changes */
423 static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
424 static void stop_skipping_changes(void);
425 static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
426 
427 /* Functions for apply error callback */
428 static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
429 static inline void reset_apply_error_context_info(void);
430 
432  ParallelApplyWorkerInfo **winfo);
433 
434 /*
435  * Return the name of the logical replication worker.
436  */
437 static const char *
439 {
440  if (am_tablesync_worker())
441  return _("logical replication table synchronization worker");
442  else if (am_parallel_apply_worker())
443  return _("logical replication parallel apply worker");
444  else
445  return _("logical replication apply worker");
446 }
447 
448 /*
449  * Form the origin name for the subscription.
450  *
451  * This is a common function for tablesync and other workers. Tablesync workers
452  * must pass a valid relid. Other callers must pass relid = InvalidOid.
453  *
454  * Return the name in the supplied buffer.
455  */
456 void
458  char *originname, Size szoriginname)
459 {
460  if (OidIsValid(relid))
461  {
462  /* Replication origin name for tablesync workers. */
463  snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
464  }
465  else
466  {
467  /* Replication origin name for non-tablesync workers. */
468  snprintf(originname, szoriginname, "pg_%u", suboid);
469  }
470 }
471 
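/*
 * Editorial sketch (not part of the original file): what the origin names
 * built by ReplicationOriginNameForLogicalRep() above look like, using a
 * hypothetical subscription OID 16394 and relation OID 16401.
 */
#include <stdio.h>

int
main(void)
{
	char		originname[64];

	/* non-tablesync worker: "pg_<subscription OID>" */
	snprintf(originname, sizeof(originname), "pg_%u", 16394u);
	printf("%s\n", originname);	/* pg_16394 */

	/* tablesync worker: "pg_<subscription OID>_<relation OID>" */
	snprintf(originname, sizeof(originname), "pg_%u_%u", 16394u, 16401u);
	printf("%s\n", originname);	/* pg_16394_16401 */
	return 0;
}
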
472 /*
473  * Should this worker apply changes for the given relation?
474  *
475  * This is mainly needed for the initial relation data sync, as that runs in a
476  * separate worker process running in parallel, and we need some way to skip
477  * changes coming to the leader apply worker during the sync of a table.
478  *
479  * Note that we need a less-than-or-equal comparison for the SYNCDONE state
480  * because it might hold the position of the end of the initial slot consistent
481  * point WAL record + 1 (i.e. the start of the next record), and the next record
482  * can be the COMMIT of the transaction we are now processing (which is what we
483  * set remote_final_lsn to in apply_handle_begin).
484  *
485  * Note that for streaming transactions that are being applied in the parallel
486  * apply worker, we disallow applying changes if the target table in the
487  * subscription is not in the READY state, because we cannot decide whether to
488  * apply the change as we won't know remote_final_lsn by that time.
489  *
490  * We already checked this in pa_can_start() before assigning the
491  * streaming transaction to the parallel worker, but it also needs to be
492  * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
493  * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
494  * while applying this transaction.
495  */
496 static bool
498 {
499  if (am_tablesync_worker())
500  return MyLogicalRepWorker->relid == rel->localreloid;
501  else if (am_parallel_apply_worker())
502  {
503  /* We don't synchronize rels that are in an unknown state. */
504  if (rel->state != SUBREL_STATE_READY &&
505  rel->state != SUBREL_STATE_UNKNOWN)
506  ereport(ERROR,
507  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
508  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
510  errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
511 
512  return rel->state == SUBREL_STATE_READY;
513  }
514  else
515  return (rel->state == SUBREL_STATE_READY ||
516  (rel->state == SUBREL_STATE_SYNCDONE &&
517  rel->statelsn <= remote_final_lsn));
518 }
519 
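/*
 * Editorial sketch (not part of the original file): why the SYNCDONE branch
 * above uses "statelsn <= remote_final_lsn" rather than "<". The tablesync
 * worker may record the end of the consistent-point WAL record + 1, i.e. the
 * start of the next record, and that next record can be the very COMMIT whose
 * LSN becomes remote_final_lsn. The LSN values below are hypothetical.
 */
#include <stdio.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

int
main(void)
{
	XLogRecPtr	statelsn = 0x1701230;			/* end of consistent point + 1 */
	XLogRecPtr	remote_final_lsn = 0x1701230;	/* COMMIT starting right there */

	/* With "<" we would wrongly skip this commit; "<=" applies it. */
	printf("apply change: %s\n", statelsn <= remote_final_lsn ? "yes" : "no");
	return 0;
}
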
520 /*
521  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
522  *
523  * Start a transaction, if this is the first step (else we keep using the
524  * existing transaction).
525  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
526  */
527 static void
529 {
531 
532  if (!IsTransactionState())
533  {
536  }
537 
539 
541 }
542 
543 /*
544  * Finish up one step of a replication transaction.
545  * Callers of begin_replication_step() must also call this.
546  *
547  * We don't close out the transaction here, but we should increment
548  * the command counter to make the effects of this step visible.
549  */
550 static void
552 {
554 
556 }
557 
558 /*
559  * Handle streamed transactions for both the leader apply worker and the
560  * parallel apply workers.
561  *
562  * In the streaming case (receiving a block of the streamed transaction), for
563  * serialize mode, simply redirect it to a file for the proper toplevel
564  * transaction, and for parallel mode, the leader apply worker will send the
565  * changes to parallel apply workers and the parallel apply worker will define
566  * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
567  * messages will be applied by both leader apply worker and parallel apply
568  * workers).
569  *
570  * Returns true for streamed transactions (when the change is either serialized
571  * to file or sent to parallel apply worker), false otherwise (regular mode or
572  * needs to be processed by parallel apply worker).
573  *
574  * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
575  * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
576  * to a parallel apply worker.
577  */
578 static bool
580 {
581  TransactionId current_xid;
583  TransApplyAction apply_action;
584  StringInfoData original_msg;
585 
586  apply_action = get_transaction_apply_action(stream_xid, &winfo);
587 
588  /* not in streaming mode */
589  if (apply_action == TRANS_LEADER_APPLY)
590  return false;
591 
593 
594  /*
595  * The parallel apply worker needs the xid in this message to decide
596  * whether to define a savepoint, so save the original message that has
597  * not moved the cursor after the xid. We will serialize this message to a
598  * file in PARTIAL_SERIALIZE mode.
599  */
600  original_msg = *s;
601 
602  /*
603  * We should have received XID of the subxact as the first part of the
604  * message, so extract it.
605  */
606  current_xid = pq_getmsgint(s, 4);
607 
608  if (!TransactionIdIsValid(current_xid))
609  ereport(ERROR,
610  (errcode(ERRCODE_PROTOCOL_VIOLATION),
611  errmsg_internal("invalid transaction ID in streamed replication transaction")));
612 
613  switch (apply_action)
614  {
616  Assert(stream_fd);
617 
618  /* Add the new subxact to the array (unless already there). */
619  subxact_info_add(current_xid);
620 
621  /* Write the change to the current file */
623  return true;
624 
626  Assert(winfo);
627 
628  /*
629  * XXX The publisher side doesn't always send relation/type update
630  * messages after the streaming transaction, so also update the
631  * relation/type in leader apply worker. See function
632  * cleanup_rel_sync_cache.
633  */
634  if (pa_send_data(winfo, s->len, s->data))
635  return (action != LOGICAL_REP_MSG_RELATION &&
637 
638  /*
639  * Switch to serialize mode when we are not able to send the
640  * change to parallel apply worker.
641  */
642  pa_switch_to_partial_serialize(winfo, false);
643 
644  /* fall through */
646  stream_write_change(action, &original_msg);
647 
648  /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
649  return (action != LOGICAL_REP_MSG_RELATION &&
651 
654 
655  /* Define a savepoint for a subxact if needed. */
656  pa_start_subtrans(current_xid, stream_xid);
657  return false;
658 
659  default:
660  Assert(false);
661  return false; /* silence compiler warning */
662  }
663 }
664 
665 /*
666  * Executor state preparation for evaluation of constraint expressions,
667  * indexes and triggers for the specified relation.
668  *
669  * Note that the caller must open and close any indexes to be updated.
670  */
671 static ApplyExecutionData *
673 {
674  ApplyExecutionData *edata;
675  EState *estate;
676  RangeTblEntry *rte;
677  List *perminfos = NIL;
678  ResultRelInfo *resultRelInfo;
679 
680  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
681  edata->targetRel = rel;
682 
683  edata->estate = estate = CreateExecutorState();
684 
685  rte = makeNode(RangeTblEntry);
686  rte->rtekind = RTE_RELATION;
687  rte->relid = RelationGetRelid(rel->localrel);
688  rte->relkind = rel->localrel->rd_rel->relkind;
690 
691  addRTEPermissionInfo(&perminfos, rte);
692 
693  ExecInitRangeTable(estate, list_make1(rte), perminfos);
694 
695  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
696 
697  /*
698  * Use Relation opened by logicalrep_rel_open() instead of opening it
699  * again.
700  */
701  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
702 
703  /*
704  * We put the ResultRelInfo in the es_opened_result_relations list, even
705  * though we don't populate the es_result_relations array. That's a bit
706  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
707  *
708  * ExecOpenIndices() is not called here either, each execution path doing
709  * an apply operation being responsible for that.
710  */
712  lappend(estate->es_opened_result_relations, resultRelInfo);
713 
714  estate->es_output_cid = GetCurrentCommandId(true);
715 
716  /* Prepare to catch AFTER triggers. */
718 
719  /* other fields of edata remain NULL for now */
720 
721  return edata;
722 }
723 
724 /*
725  * Finish any operations related to the executor state created by
726  * create_edata_for_relation().
727  */
728 static void
730 {
731  EState *estate = edata->estate;
732 
733  /* Handle any queued AFTER triggers. */
734  AfterTriggerEndQuery(estate);
735 
736  /* Shut down tuple routing, if any was done. */
737  if (edata->proute)
738  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
739 
740  /*
741  * Cleanup. It might seem that we should call ExecCloseResultRelations()
742  * here, but we intentionally don't. It would close the rel we added to
743  * es_opened_result_relations above, which is wrong because we took no
744  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
745  * any other relations opened during execution.
746  */
747  ExecResetTupleTable(estate->es_tupleTable, false);
748  FreeExecutorState(estate);
749  pfree(edata);
750 }
751 
752 /*
753  * Evaluate default values for columns which we can't map to remote
754  * relation columns.
755  *
756  * This allows us to support tables which have more columns on the downstream
757  * than on the upstream.
758  */
759 static void
761  TupleTableSlot *slot)
762 {
763  TupleDesc desc = RelationGetDescr(rel->localrel);
764  int num_phys_attrs = desc->natts;
765  int i;
766  int attnum,
767  num_defaults = 0;
768  int *defmap;
769  ExprState **defexprs;
770  ExprContext *econtext;
771 
772  econtext = GetPerTupleExprContext(estate);
773 
774  /* We got all the data via replication, no need to evaluate anything. */
775  if (num_phys_attrs == rel->remoterel.natts)
776  return;
777 
778  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
779  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
780 
781  Assert(rel->attrmap->maplen == num_phys_attrs);
782  for (attnum = 0; attnum < num_phys_attrs; attnum++)
783  {
784  Expr *defexpr;
785 
786  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
787  continue;
788 
789  if (rel->attrmap->attnums[attnum] >= 0)
790  continue;
791 
792  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
793 
794  if (defexpr != NULL)
795  {
796  /* Run the expression through planner */
797  defexpr = expression_planner(defexpr);
798 
799  /* Initialize executable expression in copycontext */
800  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
801  defmap[num_defaults] = attnum;
802  num_defaults++;
803  }
804  }
805 
806  for (i = 0; i < num_defaults; i++)
807  slot->tts_values[defmap[i]] =
808  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
809 }
810 
811 /*
812  * Store tuple data into slot.
813  *
814  * Incoming data can be either text or binary format.
815  */
816 static void
818  LogicalRepTupleData *tupleData)
819 {
820  int natts = slot->tts_tupleDescriptor->natts;
821  int i;
822 
823  ExecClearTuple(slot);
824 
825  /* Call the "in" function for each non-dropped, non-null attribute */
826  Assert(natts == rel->attrmap->maplen);
827  for (i = 0; i < natts; i++)
828  {
830  int remoteattnum = rel->attrmap->attnums[i];
831 
832  if (!att->attisdropped && remoteattnum >= 0)
833  {
834  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
835 
836  Assert(remoteattnum < tupleData->ncols);
837 
838  /* Set attnum for error callback */
840 
841  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
842  {
843  Oid typinput;
844  Oid typioparam;
845 
846  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
847  slot->tts_values[i] =
848  OidInputFunctionCall(typinput, colvalue->data,
849  typioparam, att->atttypmod);
850  slot->tts_isnull[i] = false;
851  }
852  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
853  {
854  Oid typreceive;
855  Oid typioparam;
856 
857  /*
858  * In some code paths we may be asked to re-parse the same
859  * tuple data. Reset the StringInfo's cursor so that works.
860  */
861  colvalue->cursor = 0;
862 
863  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
864  slot->tts_values[i] =
865  OidReceiveFunctionCall(typreceive, colvalue,
866  typioparam, att->atttypmod);
867 
868  /* Trouble if it didn't eat the whole buffer */
869  if (colvalue->cursor != colvalue->len)
870  ereport(ERROR,
871  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
872  errmsg("incorrect binary data format in logical replication column %d",
873  remoteattnum + 1)));
874  slot->tts_isnull[i] = false;
875  }
876  else
877  {
878  /*
879  * NULL value from remote. (We don't expect to see
880  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
881  * NULL.)
882  */
883  slot->tts_values[i] = (Datum) 0;
884  slot->tts_isnull[i] = true;
885  }
886 
887  /* Reset attnum for error callback */
889  }
890  else
891  {
892  /*
893  * We assign NULL to dropped attributes and missing values
894  * (missing values should be later filled using
895  * slot_fill_defaults).
896  */
897  slot->tts_values[i] = (Datum) 0;
898  slot->tts_isnull[i] = true;
899  }
900  }
901 
902  ExecStoreVirtualTuple(slot);
903 }
904 
905 /*
906  * Replace updated columns with data from the LogicalRepTupleData struct.
907  * This is somewhat similar to heap_modify_tuple but also calls the type
908  * input functions on the user data.
909  *
910  * "slot" is filled with a copy of the tuple in "srcslot", replacing
911  * columns provided in "tupleData" and leaving others as-is.
912  *
913  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
914  * storage for "srcslot". This is OK for current usage, but someday we may
915  * need to materialize "slot" at the end to make it independent of "srcslot".
916  */
917 static void
920  LogicalRepTupleData *tupleData)
921 {
922  int natts = slot->tts_tupleDescriptor->natts;
923  int i;
924 
925  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
926  ExecClearTuple(slot);
927 
928  /*
929  * Copy all the column data from srcslot, so that we'll have valid values
930  * for unreplaced columns.
931  */
932  Assert(natts == srcslot->tts_tupleDescriptor->natts);
933  slot_getallattrs(srcslot);
934  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
935  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
936 
937  /* Call the "in" function for each replaced attribute */
938  Assert(natts == rel->attrmap->maplen);
939  for (i = 0; i < natts; i++)
940  {
942  int remoteattnum = rel->attrmap->attnums[i];
943 
944  if (remoteattnum < 0)
945  continue;
946 
947  Assert(remoteattnum < tupleData->ncols);
948 
949  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
950  {
951  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
952 
953  /* Set attnum for error callback */
955 
956  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
957  {
958  Oid typinput;
959  Oid typioparam;
960 
961  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
962  slot->tts_values[i] =
963  OidInputFunctionCall(typinput, colvalue->data,
964  typioparam, att->atttypmod);
965  slot->tts_isnull[i] = false;
966  }
967  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
968  {
969  Oid typreceive;
970  Oid typioparam;
971 
972  /*
973  * In some code paths we may be asked to re-parse the same
974  * tuple data. Reset the StringInfo's cursor so that works.
975  */
976  colvalue->cursor = 0;
977 
978  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
979  slot->tts_values[i] =
980  OidReceiveFunctionCall(typreceive, colvalue,
981  typioparam, att->atttypmod);
982 
983  /* Trouble if it didn't eat the whole buffer */
984  if (colvalue->cursor != colvalue->len)
985  ereport(ERROR,
986  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
987  errmsg("incorrect binary data format in logical replication column %d",
988  remoteattnum + 1)));
989  slot->tts_isnull[i] = false;
990  }
991  else
992  {
993  /* must be LOGICALREP_COLUMN_NULL */
994  slot->tts_values[i] = (Datum) 0;
995  slot->tts_isnull[i] = true;
996  }
997 
998  /* Reset attnum for error callback */
1000  }
1001  }
1002 
1003  /* And finally, declare that "slot" contains a valid virtual tuple */
1004  ExecStoreVirtualTuple(slot);
1005 }
1006 
1007 /*
1008  * Handle BEGIN message.
1009  */
1010 static void
1012 {
1013  LogicalRepBeginData begin_data;
1014 
1015  /* There must not be an active streaming transaction. */
1017 
1018  logicalrep_read_begin(s, &begin_data);
1019  set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1020 
1021  remote_final_lsn = begin_data.final_lsn;
1022 
1024 
1025  in_remote_transaction = true;
1026 
1028 }
1029 
1030 /*
1031  * Handle COMMIT message.
1032  *
1033  * TODO, support tracking of multiple origins
1034  */
1035 static void
1037 {
1038  LogicalRepCommitData commit_data;
1039 
1040  logicalrep_read_commit(s, &commit_data);
1041 
1042  if (commit_data.commit_lsn != remote_final_lsn)
1043  ereport(ERROR,
1044  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1045  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1046  LSN_FORMAT_ARGS(commit_data.commit_lsn),
1048 
1049  apply_handle_commit_internal(&commit_data);
1050 
1051  /* Process any tables that are being synchronized in parallel. */
1052  process_syncing_tables(commit_data.end_lsn);
1053 
1056 }
1057 
1058 /*
1059  * Handle BEGIN PREPARE message.
1060  */
1061 static void
1063 {
1064  LogicalRepPreparedTxnData begin_data;
1065 
1066  /* Tablesync should never receive prepare. */
1067  if (am_tablesync_worker())
1068  ereport(ERROR,
1069  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1070  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1071 
1072  /* There must not be an active streaming transaction. */
1074 
1075  logicalrep_read_begin_prepare(s, &begin_data);
1076  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1077 
1078  remote_final_lsn = begin_data.prepare_lsn;
1079 
1081 
1082  in_remote_transaction = true;
1083 
1085 }
1086 
1087 /*
1088  * Common function to prepare the GID.
1089  */
1090 static void
1092 {
1093  char gid[GIDSIZE];
1094 
1095  /*
1096  * Compute a unique GID for two_phase transactions. We don't use the GID of
1097  * the prepared transaction sent by the server, as that can lead to a deadlock
1098  * when we have multiple subscriptions from the same node pointing to
1099  * publications on the same node. See comments atop worker.c; a GID sketch follows this function.
1100  */
1101  TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1102  gid, sizeof(gid));
1103 
1104  /*
1105  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1106  * called within the PrepareTransactionBlock below.
1107  */
1108  if (!IsTransactionBlock())
1109  {
1111  CommitTransactionCommand(); /* Completes the preceding Begin command. */
1112  }
1113 
1114  /*
1115  * Update origin state so we can restart streaming from correct position
1116  * in case of crash.
1117  */
1118  replorigin_session_origin_lsn = prepare_data->end_lsn;
1120 
1122 }
1123 
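/*
 * Editorial sketch (not part of the original file): the shape of the unique
 * GID built from the subscription OID and the remote prepared transaction's
 * XID, as described above. The "pg_gid_%u_%u" format string and the OID and
 * XID values below are assumptions for illustration only; the actual name is
 * produced by TwoPhaseTransactionGid().
 */
#include <stdio.h>

int
main(void)
{
	char		gid[200];

	snprintf(gid, sizeof(gid), "pg_gid_%u_%u", 16394u, 739u);
	printf("%s\n", gid);	/* pg_gid_16394_739 */
	return 0;
}
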
1124 /*
1125  * Handle PREPARE message.
1126  */
1127 static void
1129 {
1130  LogicalRepPreparedTxnData prepare_data;
1131 
1132  logicalrep_read_prepare(s, &prepare_data);
1133 
1134  if (prepare_data.prepare_lsn != remote_final_lsn)
1135  ereport(ERROR,
1136  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1137  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1138  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1140 
1141  /*
1142  * Unlike commit, here, we always prepare the transaction even though no
1143  * change has happened in this transaction or all changes are skipped. It
1144  * is done this way because at commit prepared time, we won't know whether
1145  * we have skipped preparing a transaction because of those reasons.
1146  *
1147  * XXX, We can optimize such that at commit prepared time, we first check
1148  * whether we have prepared the transaction or not but that doesn't seem
1149  * worthwhile because such cases shouldn't be common.
1150  */
1152 
1153  apply_handle_prepare_internal(&prepare_data);
1154 
1157  pgstat_report_stat(false);
1158 
1160 
1161  in_remote_transaction = false;
1162 
1163  /* Process any tables that are being synchronized in parallel. */
1164  process_syncing_tables(prepare_data.end_lsn);
1165 
1166  /*
1167  * Since we have already prepared the transaction, in a case where the
1168  * server crashes before clearing the subskiplsn, it will be left but the
1169  * transaction won't be resent. But that's okay because it's a rare case
1170  * and the subskiplsn will be cleared when finishing the next transaction.
1171  */
1174 
1177 }
1178 
1179 /*
1180  * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1181  *
1182  * Note that we don't need to wait here if the transaction was prepared in a
1183  * parallel apply worker. In that case, we have already waited for the prepare
1184  * to finish in apply_handle_stream_prepare() which will ensure all the
1185  * operations in that transaction have happened in the subscriber, so no
1186  * concurrent transaction can cause deadlock or transaction dependency issues.
1187  */
1188 static void
1190 {
1191  LogicalRepCommitPreparedTxnData prepare_data;
1192  char gid[GIDSIZE];
1193 
1194  logicalrep_read_commit_prepared(s, &prepare_data);
1195  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1196 
1197  /* Compute GID for two_phase transactions. */
1199  gid, sizeof(gid));
1200 
1201  /* There is no transaction when COMMIT PREPARED is called */
1203 
1204  /*
1205  * Update origin state so we can restart streaming from correct position
1206  * in case of crash.
1207  */
1208  replorigin_session_origin_lsn = prepare_data.end_lsn;
1210 
1211  FinishPreparedTransaction(gid, true);
1214  pgstat_report_stat(false);
1215 
1217  in_remote_transaction = false;
1218 
1219  /* Process any tables that are being synchronized in parallel. */
1220  process_syncing_tables(prepare_data.end_lsn);
1221 
1222  clear_subscription_skip_lsn(prepare_data.end_lsn);
1223 
1226 }
1227 
1228 /*
1229  * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1230  *
1231  * Note that we don't need to wait here if the transaction was prepared in a
1232  * parallel apply worker. In that case, we have already waited for the prepare
1233  * to finish in apply_handle_stream_prepare() which will ensure all the
1234  * operations in that transaction have happened in the subscriber, so no
1235  * concurrent transaction can cause deadlock or transaction dependency issues.
1236  */
1237 static void
1239 {
1240  LogicalRepRollbackPreparedTxnData rollback_data;
1241  char gid[GIDSIZE];
1242 
1243  logicalrep_read_rollback_prepared(s, &rollback_data);
1244  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1245 
1246  /* Compute GID for two_phase transactions. */
1247  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1248  gid, sizeof(gid));
1249 
1250  /*
1251  * It is possible that we haven't received prepare because it occurred
1252  * before walsender reached a consistent point or the two_phase was still
1253  * not enabled by that time, so in such cases, we need to skip rollback
1254  * prepared.
1255  */
1256  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1257  rollback_data.prepare_time))
1258  {
1259  /*
1260  * Update origin state so we can restart streaming from correct
1261  * position in case of crash.
1262  */
1265 
1266  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1268  FinishPreparedTransaction(gid, false);
1271 
1273  }
1274 
1275  pgstat_report_stat(false);
1276 
1278  in_remote_transaction = false;
1279 
1280  /* Process any tables that are being synchronized in parallel. */
1282 
1285 }
1286 
1287 /*
1288  * Handle STREAM PREPARE.
1289  */
1290 static void
1292 {
1293  LogicalRepPreparedTxnData prepare_data;
1294  ParallelApplyWorkerInfo *winfo;
1295  TransApplyAction apply_action;
1296 
1297  /* Save the message before it is consumed. */
1298  StringInfoData original_msg = *s;
1299 
1301  ereport(ERROR,
1302  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1303  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1304 
1305  /* Tablesync should never receive prepare. */
1306  if (am_tablesync_worker())
1307  ereport(ERROR,
1308  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1309  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1310 
1311  logicalrep_read_stream_prepare(s, &prepare_data);
1312  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1313 
1314  apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1315 
1316  switch (apply_action)
1317  {
1318  case TRANS_LEADER_APPLY:
1319 
1320  /*
1321  * The transaction has been serialized to file, so replay all the
1322  * spooled operations.
1323  */
1325  prepare_data.xid, prepare_data.prepare_lsn);
1326 
1327  /* Mark the transaction as prepared. */
1328  apply_handle_prepare_internal(&prepare_data);
1329 
1331 
1333 
1334  in_remote_transaction = false;
1335 
1336  /* Unlink the files with serialized changes and subxact info. */
1338 
1339  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1340  break;
1341 
1343  Assert(winfo);
1344 
1345  if (pa_send_data(winfo, s->len, s->data))
1346  {
1347  /* Finish processing the streaming transaction. */
1348  pa_xact_finish(winfo, prepare_data.end_lsn);
1349  break;
1350  }
1351 
1352  /*
1353  * Switch to serialize mode when we are not able to send the
1354  * change to parallel apply worker.
1355  */
1356  pa_switch_to_partial_serialize(winfo, true);
1357 
1358  /* fall through */
1360  Assert(winfo);
1361 
1362  stream_open_and_write_change(prepare_data.xid,
1364  &original_msg);
1365 
1367 
1368  /* Finish processing the streaming transaction. */
1369  pa_xact_finish(winfo, prepare_data.end_lsn);
1370  break;
1371 
1372  case TRANS_PARALLEL_APPLY:
1373 
1374  /*
1375  * If the parallel apply worker is applying spooled messages then
1376  * close the file before preparing.
1377  */
1378  if (stream_fd)
1380 
1382 
1383  /* Mark the transaction as prepared. */
1384  apply_handle_prepare_internal(&prepare_data);
1385 
1387 
1389 
1391 
1394 
1396 
1397  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1398  break;
1399 
1400  default:
1401  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1402  break;
1403  }
1404 
1405  pgstat_report_stat(false);
1406 
1407  /* Process any tables that are being synchronized in parallel. */
1408  process_syncing_tables(prepare_data.end_lsn);
1409 
1410  /*
1411  * Similar to prepare case, the subskiplsn could be left in a case of
1412  * server crash but it's okay. See the comments in apply_handle_prepare().
1413  */
1416 
1418 
1420 }
1421 
1422 /*
1423  * Handle ORIGIN message.
1424  *
1425  * TODO, support tracking of multiple origins
1426  */
1427 static void
1429 {
1430  /*
1431  * The ORIGIN message can only come inside a streaming transaction or inside
1432  * a remote transaction, and before any actual writes.
1433  */
1434  if (!in_streamed_transaction &&
1437  ereport(ERROR,
1438  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1439  errmsg_internal("ORIGIN message sent out of order")));
1440 }
1441 
1442 /*
1443  * Initialize fileset (if not already done).
1444  *
1445  * Create a new file when first_segment is true, otherwise open the existing
1446  * file.
1447  */
1448 void
1449 stream_start_internal(TransactionId xid, bool first_segment)
1450 {
1452 
1453  /*
1454  * Initialize the worker's stream_fileset if we haven't yet. This will be
1455  * used for the entire duration of the worker so create it in a permanent
1456  * context. We create this on the very first streaming message from any
1457  * transaction and then use it for this and other streaming transactions.
1458  * Now, we could create a fileset at the start of the worker as well but
1459  * then we won't be sure that it will ever be used.
1460  */
1462  {
1463  MemoryContext oldctx;
1464 
1466 
1469 
1470  MemoryContextSwitchTo(oldctx);
1471  }
1472 
1473  /* Open the spool file for this transaction. */
1474  stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1475 
1476  /* If this is not the first segment, open existing subxact file. */
1477  if (!first_segment)
1479 
1481 }
1482 
1483 /*
1484  * Handle STREAM START message.
1485  */
1486 static void
1488 {
1489  bool first_segment;
1490  ParallelApplyWorkerInfo *winfo;
1491  TransApplyAction apply_action;
1492 
1493  /* Save the message before it is consumed. */
1494  StringInfoData original_msg = *s;
1495 
1497  ereport(ERROR,
1498  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1499  errmsg_internal("duplicate STREAM START message")));
1500 
1501  /* There must not be an active streaming transaction. */
1503 
1504  /* notify handle methods we're processing a remote transaction */
1505  in_streamed_transaction = true;
1506 
1507  /* extract XID of the top-level transaction */
1508  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1509 
1511  ereport(ERROR,
1512  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1513  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1514 
1516 
1517  /* Try to allocate a worker for the streaming transaction. */
1518  if (first_segment)
1520 
1521  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1522 
1523  switch (apply_action)
1524  {
1526 
1527  /*
1528  * Function stream_start_internal starts a transaction. This
1529  * transaction will be committed on the stream stop unless it is a
1530  * tablesync worker in which case it will be committed after
1531  * processing all the messages. We need this transaction for
1532  * handling the BufFile, used for serializing the streaming data
1533  * and subxact info.
1534  */
1535  stream_start_internal(stream_xid, first_segment);
1536  break;
1537 
1539  Assert(winfo);
1540 
1541  /*
1542  * Once we start serializing the changes, the parallel apply
1543  * worker will wait for the leader to release the stream lock
1544  * until the end of the transaction. So, we don't need to release
1545  * the lock or increment the stream count in that case.
1546  */
1547  if (pa_send_data(winfo, s->len, s->data))
1548  {
1549  /*
1550  * Unlock the shared object lock so that the parallel apply
1551  * worker can continue to receive changes.
1552  */
1553  if (!first_segment)
1555 
1556  /*
1557  * Increment the number of streaming blocks waiting to be
1558  * processed by parallel apply worker.
1559  */
1561 
1562  /* Cache the parallel apply worker for this transaction. */
1564  break;
1565  }
1566 
1567  /*
1568  * Switch to serialize mode when we are not able to send the
1569  * change to parallel apply worker.
1570  */
1571  pa_switch_to_partial_serialize(winfo, !first_segment);
1572 
1573  /* fall through */
1575  Assert(winfo);
1576 
1577  /*
1578  * Open the spool file unless it was already opened when switching
1579  * to serialize mode. The transaction started in
1580  * stream_start_internal will be committed on the stream stop.
1581  */
1582  if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1583  stream_start_internal(stream_xid, first_segment);
1584 
1586 
1587  /* Cache the parallel apply worker for this transaction. */
1589  break;
1590 
1591  case TRANS_PARALLEL_APPLY:
1592  if (first_segment)
1593  {
1594  /* Hold the lock until the end of the transaction. */
1597 
1598  /*
1599  * Signal the leader apply worker, as it may be waiting for
1600  * us.
1601  */
1603  }
1604 
1606  break;
1607 
1608  default:
1609  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1610  break;
1611  }
1612 
1614 }
1615 
1616 /*
1617  * Update the information about subxacts and close the file.
1618  *
1619  * This function should be called after the stream_start_internal function
1620  * has been called.
1621  */
1622 void
1624 {
1625  /*
1626  * Serialize information about subxacts for the toplevel transaction, then
1627  * close the stream messages spool file.
1628  */
1631 
1632  /* We must be in a valid transaction state */
1634 
1635  /* Commit the per-stream transaction */
1637 
1638  /* Reset per-stream context */
1640 }
1641 
1642 /*
1643  * Handle STREAM STOP message.
1644  */
1645 static void
1647 {
1648  ParallelApplyWorkerInfo *winfo;
1649  TransApplyAction apply_action;
1650 
1652  ereport(ERROR,
1653  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1654  errmsg_internal("STREAM STOP message without STREAM START")));
1655 
1656  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1657 
1658  switch (apply_action)
1659  {
1662  break;
1663 
1665  Assert(winfo);
1666 
1667  /*
1668  * Lock before sending the STREAM_STOP message so that the leader
1669  * can hold the lock first and the parallel apply worker will wait
1670  * for leader to release the lock. See Locking Considerations atop
1671  * applyparallelworker.c.
1672  */
1674 
1675  if (pa_send_data(winfo, s->len, s->data))
1676  {
1678  break;
1679  }
1680 
1681  /*
1682  * Switch to serialize mode when we are not able to send the
1683  * change to parallel apply worker.
1684  */
1685  pa_switch_to_partial_serialize(winfo, true);
1686 
1687  /* fall through */
1692  break;
1693 
1694  case TRANS_PARALLEL_APPLY:
1695  elog(DEBUG1, "applied %u changes in the streaming chunk",
1697 
1698  /*
1699  * By the time parallel apply worker is processing the changes in
1700  * the current streaming block, the leader apply worker may have
1701  * sent multiple streaming blocks. This can lead to the parallel
1702  * apply worker starting to wait even when there are more chunks of
1703  * the stream in the queue. So, try to lock only if there is no
1704  * message left in the queue. See Locking Considerations atop
1705  * applyparallelworker.c.
1706  *
1707  * Note that here we have a race condition where we can start
1708  * waiting even when there are pending streaming chunks. This can
1709  * happen if the leader sends another streaming block and acquires
1710  * the stream lock again after the parallel apply worker checks
1711  * that there is no pending streaming block and before it actually
1712  * starts waiting on a lock. We can handle this case by not
1713  * allowing the leader to increment the stream block count during
1714  * the time parallel apply worker acquires the lock but it is not
1715  * clear whether that is worth the complexity.
1716  *
1717  * Now, if this missed chunk contains rollback to savepoint, then
1718  * there is a risk of deadlock which probably shouldn't happen
1719  * after restart.
1720  */
1722  break;
1723 
1724  default:
1725  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1726  break;
1727  }
1728 
1729  in_streamed_transaction = false;
1731 
1732  /*
1733  * The parallel apply worker could be in a transaction in which case we
1734  * need to report the state as STATE_IDLEINTRANSACTION.
1735  */
1738  else
1740 
1742 }
1743 
1744 /*
1745  * Helper function to handle STREAM ABORT message when the transaction was
1746  * serialized to file.
1747  */
1748 static void
1750 {
1751  /*
1752  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1753  * just delete the files with serialized info.
1754  */
1755  if (xid == subxid)
1757  else
1758  {
1759  /*
1760  * OK, so it's a subxact. We need to read the subxact file for the
1761  * toplevel transaction, determine the offset tracked for the subxact,
1762  * and truncate the file with changes. We also remove the subxacts
1763  * with higher offsets (or rather higher XIDs).
1764  *
1765  * We intentionally scan the array from the tail, because we're likely
1766  * aborting a change for the most recent subtransactions.
1767  *
1768  * We can't use a binary search here as subxact XIDs won't
1769  * necessarily arrive in sorted order; consider the case where we have
1770  * released the savepoint for multiple subtransactions and then
1771  * performed a rollback to savepoint for one of the earlier
1772  * sub-transactions.
1773  */
1774  int64 i;
1775  int64 subidx;
1776  BufFile *fd;
1777  bool found = false;
1778  char path[MAXPGPATH];
1779 
1780  subidx = -1;
1783 
1784  for (i = subxact_data.nsubxacts; i > 0; i--)
1785  {
1786  if (subxact_data.subxacts[i - 1].xid == subxid)
1787  {
1788  subidx = (i - 1);
1789  found = true;
1790  break;
1791  }
1792  }
1793 
1794  /*
1795  * If it's an empty sub-transaction then we will not find the subxid
1796  * here, so just clean up the subxact info and return.
1797  */
1798  if (!found)
1799  {
1800  /* Cleanup the subxact info */
1804  return;
1805  }
1806 
1807  /* open the changes file */
1810  O_RDWR, false);
1811 
1812  /* OK, truncate the file at the right offset */
1814  subxact_data.subxacts[subidx].offset);
1815  BufFileClose(fd);
1816 
1817  /* discard the subxacts added later */
1818  subxact_data.nsubxacts = subidx;
1819 
1820  /* write the updated subxact list */
1822 
1825  }
1826 }
1827 
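/*
 * Editorial sketch (not part of the original file): the tail-to-head scan
 * used above to find the aborted subtransaction's recorded offset. Scanning
 * from the end favors the common case of aborting a recent subxact, and a
 * binary search would not work because subxact XIDs need not be sorted. The
 * struct name and the data below are made up for illustration.
 */
#include <stdio.h>
#include <stdint.h>

typedef uint32_t TransactionId;

typedef struct
{
	TransactionId xid;		/* XID of the subxact */
	long		offset;		/* offset of its first change in the spool file */
} SketchSubXact;

int
main(void)
{
	SketchSubXact subxacts[] = {{740, 128}, {745, 512}, {742, 2048}};
	int			nsubxacts = 3;
	TransactionId abort_xid = 745;
	int			i;

	for (i = nsubxacts; i > 0; i--)
	{
		if (subxacts[i - 1].xid == abort_xid)
		{
			/* truncate the changes file at this offset, drop later subxacts */
			printf("truncate at offset %ld, keep %d subxact(s)\n",
				   subxacts[i - 1].offset, i - 1);
			break;
		}
	}
	return 0;
}
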
1828 /*
1829  * Handle STREAM ABORT message.
1830  */
1831 static void
1833 {
1834  TransactionId xid;
1835  TransactionId subxid;
1836  LogicalRepStreamAbortData abort_data;
1837  ParallelApplyWorkerInfo *winfo;
1838  TransApplyAction apply_action;
1839 
1840  /* Save the message before it is consumed. */
1841  StringInfoData original_msg = *s;
1842  bool toplevel_xact;
1843 
1845  ereport(ERROR,
1846  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1847  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1848 
1849  /* We receive abort information only when we can apply in parallel. */
1850  logicalrep_read_stream_abort(s, &abort_data,
1852 
1853  xid = abort_data.xid;
1854  subxid = abort_data.subxid;
1855  toplevel_xact = (xid == subxid);
1856 
1857  set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1858 
1859  apply_action = get_transaction_apply_action(xid, &winfo);
1860 
1861  switch (apply_action)
1862  {
1863  case TRANS_LEADER_APPLY:
1864 
1865  /*
1866  * We are in the leader apply worker and the transaction has been
1867  * serialized to file.
1868  */
1869  stream_abort_internal(xid, subxid);
1870 
1871  elog(DEBUG1, "finished processing the STREAM ABORT command");
1872  break;
1873 
1875  Assert(winfo);
1876 
1877  /*
1878  * For the case of aborting the subtransaction, we increment the
1879  * number of streaming blocks and take the lock again before
1880  * sending the STREAM_ABORT to ensure that the parallel apply
1881  * worker will wait on the lock for the next set of changes after
1882  * processing the STREAM_ABORT message if it is not already
1883  * waiting for STREAM_STOP message.
1884  *
1885  * It is important to perform this locking before sending the
1886  * STREAM_ABORT message so that the leader can hold the lock first
1887  * and the parallel apply worker will wait for the leader to
1888  * release the lock. This is the same as what we do in
1889  * apply_handle_stream_stop. See Locking Considerations atop
1890  * applyparallelworker.c.
1891  */
1892  if (!toplevel_xact)
1893  {
1897  }
1898 
1899  if (pa_send_data(winfo, s->len, s->data))
1900  {
1901  /*
1902  * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1903  * wait here for the parallel apply worker to finish as that
1904  * is not required to maintain the commit order and won't have
1905  * the risk of failures due to transaction dependencies and
1906  * deadlocks. However, it is possible that before the parallel
1907  * worker finishes and we clear the worker info, the xid
1908  * wraparound happens on the upstream and a new transaction
1909  * with the same xid can appear and that can lead to duplicate
1910  * entries in ParallelApplyTxnHash. Yet another problem could
1911  * be that we may have serialized the changes in partial
1912  * serialize mode and the file containing xact changes may
1913  * already exist, and after xid wraparound trying to create
1914  * the file for the same xid can lead to an error. To avoid
1915  * these problems, we decide to wait for the aborts to finish.
1916  *
1917  * Note, it is okay to not update the flush location position
1918  * for aborts as in worst case that means such a transaction
1919  * won't be sent again after restart.
1920  */
1921  if (toplevel_xact)
1923 
1924  break;
1925  }
1926 
1927  /*
1928  * Switch to serialize mode when we are not able to send the
1929  * change to parallel apply worker.
1930  */
1931  pa_switch_to_partial_serialize(winfo, true);
1932 
1933  /* fall through */
1935  Assert(winfo);
1936 
1937  /*
1938  * Parallel apply worker might have applied some changes, so write
1939  * the STREAM_ABORT message so that it can roll back the
1940  * subtransaction if needed.
1941  */
1943  &original_msg);
1944 
1945  if (toplevel_xact)
1946  {
1949  }
1950  break;
1951 
1952  case TRANS_PARALLEL_APPLY:
1953 
1954  /*
1955  * If the parallel apply worker is applying spooled messages then
1956  * close the file before aborting.
1957  */
1958  if (toplevel_xact && stream_fd)
1960 
1961  pa_stream_abort(&abort_data);
1962 
1963  /*
1964  * We need to wait after processing rollback to savepoint for the
1965  * next set of changes.
1966  *
1967  * We have a race condition here due to which we can start waiting
1968  * here when there are more chunks of the stream in the queue. See
1969  * apply_handle_stream_stop.
1970  */
1971  if (!toplevel_xact)
1973 
1974  elog(DEBUG1, "finished processing the STREAM ABORT command");
1975  break;
1976 
1977  default:
1978  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1979  break;
1980  }
1981 
1983 }
1984 
1985 /*
1986  * Ensure that the passed location is fileset's end.
1987  */
1988 static void
1989 ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1990  off_t offset)
1991 {
1992  char path[MAXPGPATH];
1993  BufFile *fd;
1994  int last_fileno;
1995  off_t last_offset;
1996 
1998 
2000 
2002 
2003  fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2004 
2005  BufFileSeek(fd, 0, 0, SEEK_END);
2006  BufFileTell(fd, &last_fileno, &last_offset);
2007 
2008  BufFileClose(fd);
2009 
2011 
2012  if (last_fileno != fileno || last_offset != offset)
2013  elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2014  path);
2015 }
2016 
2017 /*
2018  * Common spoolfile processing.
2019  */
2020 void
2022  XLogRecPtr lsn)
2023 {
2025  int nchanges;
2026  char path[MAXPGPATH];
2027  char *buffer = NULL;
2028  MemoryContext oldcxt;
2029  ResourceOwner oldowner;
2030  int fileno;
2031  off_t offset;
2032 
2033  if (!am_parallel_apply_worker())
2035 
2036  /* Make sure we have an open transaction */
2038 
2039  /*
2040  * Allocate the file handle and memory required to process all the messages
2041  * in TopTransactionContext so that they do not get reset after each message
2042  * is processed.
2043  */
2045 
2046  /* Open the spool file for the committed/prepared transaction */
2048  elog(DEBUG1, "replaying changes from file \"%s\"", path);
2049 
2050  /*
2051  * Make sure the file is owned by the toplevel transaction so that the
2052  * file will not be accidentally closed when aborting a subtransaction.
2053  */
2054  oldowner = CurrentResourceOwner;
2056 
2057  stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2058 
2059  CurrentResourceOwner = oldowner;
2060 
2061  buffer = palloc(BLCKSZ);
2062  initStringInfo(&s2);
2063 
2064  MemoryContextSwitchTo(oldcxt);
2065 
2066  remote_final_lsn = lsn;
2067 
2068  /*
2069  * Make sure the apply_dispatch handlers are aware that we're in a remote
2070  * transaction.
2071  */
2072  in_remote_transaction = true;
2074 
2076 
2077  /*
2078  * Read the entries one by one and pass them through the same logic as in
2079  * apply_dispatch.
2080  */
2081  nchanges = 0;
2082  while (true)
2083  {
2084  size_t nbytes;
2085  int len;
2086 
2088 
2089  /* read length of the on-disk record */
2090  nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2091 
2092  /* have we reached end of the file? */
2093  if (nbytes == 0)
2094  break;
2095 
2096  /* do we have a correct length? */
2097  if (len <= 0)
2098  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2099  len, path);
2100 
2101  /* make sure we have sufficiently large buffer */
2102  buffer = repalloc(buffer, len);
2103 
2104  /* and finally read the data into the buffer */
2105  BufFileReadExact(stream_fd, buffer, len);
2106 
2107  BufFileTell(stream_fd, &fileno, &offset);
2108 
2109  /* copy the buffer to the stringinfo and call apply_dispatch */
2110  resetStringInfo(&s2);
2111  appendBinaryStringInfo(&s2, buffer, len);
2112 
2113  /* Ensure we are reading the data into our memory context. */
2115 
2116  apply_dispatch(&s2);
2117 
2119 
2120  MemoryContextSwitchTo(oldcxt);
2121 
2122  nchanges++;
2123 
2124  /*
2125  * It is possible the file has been closed because we have processed
2126  * a transaction end message, such as stream_commit, in which case that
2127  * must be the last message.
2128  */
2129  if (!stream_fd)
2130  {
2131  ensure_last_message(stream_fileset, xid, fileno, offset);
2132  break;
2133  }
2134 
2135  if (nchanges % 1000 == 0)
2136  elog(DEBUG1, "replayed %d changes from file \"%s\"",
2137  nchanges, path);
2138  }
2139 
2140  if (stream_fd)
2142 
2143  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2144  nchanges, path);
2145 
2146  return;
2147 }
2148 
2149 /*
2150  * Handle STREAM COMMIT message.
2151  */
2152 static void
2154 {
2155  TransactionId xid;
2156  LogicalRepCommitData commit_data;
2157  ParallelApplyWorkerInfo *winfo;
2158  TransApplyAction apply_action;
2159 
2160  /* Save the message before it is consumed. */
2161  StringInfoData original_msg = *s;
2162 
2164  ereport(ERROR,
2165  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2166  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2167 
2168  xid = logicalrep_read_stream_commit(s, &commit_data);
2169  set_apply_error_context_xact(xid, commit_data.commit_lsn);
2170 
2171  apply_action = get_transaction_apply_action(xid, &winfo);
2172 
2173  switch (apply_action)
2174  {
2175  case TRANS_LEADER_APPLY:
2176 
2177  /*
2178  * The transaction has been serialized to file, so replay all the
2179  * spooled operations.
2180  */
2182  commit_data.commit_lsn);
2183 
2184  apply_handle_commit_internal(&commit_data);
2185 
2186  /* Unlink the files with serialized changes and subxact info. */
2188 
2189  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2190  break;
2191 
2193  Assert(winfo);
2194 
2195  if (pa_send_data(winfo, s->len, s->data))
2196  {
2197  /* Finish processing the streaming transaction. */
2198  pa_xact_finish(winfo, commit_data.end_lsn);
2199  break;
2200  }
2201 
2202  /*
2203  * Switch to serialize mode when we are not able to send the
2204  * change to parallel apply worker.
2205  */
2206  pa_switch_to_partial_serialize(winfo, true);
2207 
2208  /* fall through */
2210  Assert(winfo);
2211 
2213  &original_msg);
2214 
2216 
2217  /* Finish processing the streaming transaction. */
2218  pa_xact_finish(winfo, commit_data.end_lsn);
2219  break;
2220 
2221  case TRANS_PARALLEL_APPLY:
2222 
2223  /*
2224  * If the parallel apply worker is applying spooled messages then
2225  * close the file before committing.
2226  */
2227  if (stream_fd)
2229 
2230  apply_handle_commit_internal(&commit_data);
2231 
2233 
2234  /*
2235  * It is important to set the transaction state as finished before
2236  * releasing the lock. See pa_wait_for_xact_finish.
2237  */
2240 
2242 
2243  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2244  break;
2245 
2246  default:
2247  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2248  break;
2249  }
2250 
2251  /* Process any tables that are being synchronized in parallel. */
2252  process_syncing_tables(commit_data.end_lsn);
2253 
2255 
2257 }
2258 
2259 /*
2260  * Helper function for apply_handle_commit and apply_handle_stream_commit.
2261  */
2262 static void
2264 {
2265  if (is_skipping_changes())
2266  {
2268 
2269  /*
2270  * Start a new transaction to clear the subskiplsn, if not started
2271  * yet.
2272  */
2273  if (!IsTransactionState())
2275  }
2276 
2277  if (IsTransactionState())
2278  {
2279  /*
2280  * The transaction is either non-empty or skipped, so we clear the
2281  * subskiplsn.
2282  */
2284 
2285  /*
2286  * Update origin state so we can restart streaming from correct
2287  * position in case of crash.
2288  */
2289  replorigin_session_origin_lsn = commit_data->end_lsn;
2291 
2293 
2294  if (IsTransactionBlock())
2295  {
2296  EndTransactionBlock(false);
2298  }
2299 
2300  pgstat_report_stat(false);
2301 
2303  }
2304  else
2305  {
2306  /* Process any invalidation messages that might have accumulated. */
2309  }
2310 
2311  in_remote_transaction = false;
2312 }
2313 
2314 /*
2315  * Handle RELATION message.
2316  *
2317  * Note that we don't validate against the local schema here. That
2318  * validation is postponed until the first change for the given relation
2319  * arrives, since we only care about it when applying changes for it anyway,
2320  * and we do less locking this way.
2321  */
2322 static void
2324 {
2325  LogicalRepRelation *rel;
2326 
2328  return;
2329 
2330  rel = logicalrep_read_rel(s);
2332 
2333  /* Also reset all entries in the partition map that refer to remoterel. */
2335 }
2336 
2337 /*
2338  * Handle TYPE message.
2339  *
2340  * This implementation pays no attention to TYPE messages; we expect the user
2341  * to have set things up so that the incoming data is acceptable to the input
2342  * functions for the locally subscribed tables. Hence, we just read and
2343  * discard the message.
2344  */
2345 static void
2347 {
2348  LogicalRepTyp typ;
2349 
2351  return;
2352 
2353  logicalrep_read_typ(s, &typ);
2354 }
2355 
2356 /*
2357  * Check that we (the subscription owner) have sufficient privileges on the
2358  * target relation to perform the given operation.
2359  */
2360 static void
2362 {
2363  Oid relid;
2364  AclResult aclresult;
2365 
2366  relid = RelationGetRelid(rel);
2367  aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2368  if (aclresult != ACLCHECK_OK)
2369  aclcheck_error(aclresult,
2370  get_relkind_objtype(rel->rd_rel->relkind),
2371  get_rel_name(relid));
2372 
2373  /*
2374  * We lack the infrastructure to honor RLS policies. It might be possible
2375  * to add such infrastructure here, but tablesync workers lack it, too, so
2376  * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2377  * but it seems dangerous to replicate a TRUNCATE and then refuse to
2378  * replicate subsequent INSERTs, so we forbid all commands the same.
2379  */
2380  if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2381  ereport(ERROR,
2382  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2383  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2384  GetUserNameFromId(GetUserId(), true),
2385  RelationGetRelationName(rel))));
2386 }
2387 
2388 /*
2389  * Handle INSERT message.
2390  */
2391 
2392 static void
2394 {
2395  LogicalRepRelMapEntry *rel;
2396  LogicalRepTupleData newtup;
2397  LogicalRepRelId relid;
2398  ApplyExecutionData *edata;
2399  EState *estate;
2400  TupleTableSlot *remoteslot;
2401  MemoryContext oldctx;
2402 
2403  /*
2404  * Quick return if we are skipping data modification changes or handling
2405  * streamed transactions.
2406  */
2407  if (is_skipping_changes() ||
2409  return;
2410 
2412 
2413  relid = logicalrep_read_insert(s, &newtup);
2414  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2415  if (!should_apply_changes_for_rel(rel))
2416  {
2417  /*
2418  * The relation can't become interesting in the middle of the
2419  * transaction so it's safe to unlock it.
2420  */
2423  return;
2424  }
2425 
2426  /* Set relation for error callback */
2428 
2429  /* Initialize the executor state. */
2430  edata = create_edata_for_relation(rel);
2431  estate = edata->estate;
2432  remoteslot = ExecInitExtraTupleSlot(estate,
2433  RelationGetDescr(rel->localrel),
2434  &TTSOpsVirtual);
2435 
2436  /* Process and store remote tuple in the slot */
2438  slot_store_data(remoteslot, rel, &newtup);
2439  slot_fill_defaults(rel, estate, remoteslot);
2440  MemoryContextSwitchTo(oldctx);
2441 
2442  /* For a partitioned table, insert the tuple into a partition. */
2443  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2445  remoteslot, NULL, CMD_INSERT);
2446  else
2448  remoteslot);
2449 
2450  finish_edata(edata);
2451 
2452  /* Reset relation for error callback */
2454 
2456 
2458 }
2459 
2460 /*
2461  * Workhorse for apply_handle_insert()
2462  * relinfo is for the relation we're actually inserting into
2463  * (could be a child partition of edata->targetRelInfo)
2464  */
2465 static void
2467  ResultRelInfo *relinfo,
2468  TupleTableSlot *remoteslot)
2469 {
2470  EState *estate = edata->estate;
2471 
2472  /* We must open indexes here. */
2473  ExecOpenIndices(relinfo, false);
2474 
2475  /* Do the insert. */
2477  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2478 
2479  /* Cleanup. */
2480  ExecCloseIndices(relinfo);
2481 }
2482 
2483 /*
2484  * Check if the logical replication relation is updatable and throw
2485  * appropriate error if it isn't.
2486  */
2487 static void
2489 {
2490  /*
2491  * For partitioned tables, we only need to care if the target partition is
2492  * updatable (aka has PK or RI defined for it).
2493  */
2494  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2495  return;
2496 
2497  /* Updatable, no error. */
2498  if (rel->updatable)
2499  return;
2500 
2501  /*
2502  * We are in error mode, so it's fine that this is somewhat slow; it's
2503  * better to give the user a correct error.
2504  */
2506  {
2507  ereport(ERROR,
2508  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2509  errmsg("publisher did not send replica identity column "
2510  "expected by the logical replication target relation \"%s.%s\"",
2511  rel->remoterel.nspname, rel->remoterel.relname)));
2512  }
2513 
2514  ereport(ERROR,
2515  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2516  errmsg("logical replication target relation \"%s.%s\" has "
2517  "neither REPLICA IDENTITY index nor PRIMARY "
2518  "KEY and published relation does not have "
2519  "REPLICA IDENTITY FULL",
2520  rel->remoterel.nspname, rel->remoterel.relname)));
2521 }
2522 
2523 /*
2524  * Handle UPDATE message.
2525  *
2526  * TODO: FDW support
2527  */
2528 static void
2530 {
2531  LogicalRepRelMapEntry *rel;
2532  LogicalRepRelId relid;
2533  ApplyExecutionData *edata;
2534  EState *estate;
2535  LogicalRepTupleData oldtup;
2536  LogicalRepTupleData newtup;
2537  bool has_oldtup;
2538  TupleTableSlot *remoteslot;
2539  RTEPermissionInfo *target_perminfo;
2540  MemoryContext oldctx;
2541 
2542  /*
2543  * Quick return if we are skipping data modification changes or handling
2544  * streamed transactions.
2545  */
2546  if (is_skipping_changes() ||
2548  return;
2549 
2551 
2552  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2553  &newtup);
2554  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2555  if (!should_apply_changes_for_rel(rel))
2556  {
2557  /*
2558  * The relation can't become interesting in the middle of the
2559  * transaction so it's safe to unlock it.
2560  */
2563  return;
2564  }
2565 
2566  /* Set relation for error callback */
2568 
2569  /* Check if we can do the update. */
2571 
2572  /* Initialize the executor state. */
2573  edata = create_edata_for_relation(rel);
2574  estate = edata->estate;
2575  remoteslot = ExecInitExtraTupleSlot(estate,
2576  RelationGetDescr(rel->localrel),
2577  &TTSOpsVirtual);
2578 
2579  /*
2580  * Populate updatedCols so that per-column triggers can fire, and so the
2581  * executor can correctly pass down the indexUnchanged hint. This could
2582  * include more columns than were actually changed on the publisher
2583  * because the logical replication protocol doesn't contain that
2584  * information. But it would for example exclude columns that only exist
2585  * on the subscriber, since we are not touching those.
2586  */
2587  target_perminfo = list_nth(estate->es_rteperminfos, 0);
2588  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2589  {
2591  int remoteattnum = rel->attrmap->attnums[i];
2592 
2593  if (!att->attisdropped && remoteattnum >= 0)
2594  {
2595  Assert(remoteattnum < newtup.ncols);
2596  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2597  target_perminfo->updatedCols =
2598  bms_add_member(target_perminfo->updatedCols,
2600  }
2601  }
2602 
2603  /* Build the search tuple. */
2605  slot_store_data(remoteslot, rel,
2606  has_oldtup ? &oldtup : &newtup);
2607  MemoryContextSwitchTo(oldctx);
2608 
2609  /* For a partitioned table, apply update to correct partition. */
2610  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2612  remoteslot, &newtup, CMD_UPDATE);
2613  else
2615  remoteslot, &newtup, rel->localindexoid);
2616 
2617  finish_edata(edata);
2618 
2619  /* Reset relation for error callback */
2621 
2623 
2625 }
2626 
2627 /*
2628  * Workhorse for apply_handle_update()
2629  * relinfo is for the relation we're actually updating in
2630  * (could be a child partition of edata->targetRelInfo)
2631  */
2632 static void
2634  ResultRelInfo *relinfo,
2635  TupleTableSlot *remoteslot,
2636  LogicalRepTupleData *newtup,
2637  Oid localindexoid)
2638 {
2639  EState *estate = edata->estate;
2640  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2641  Relation localrel = relinfo->ri_RelationDesc;
2642  EPQState epqstate;
2643  TupleTableSlot *localslot;
2644  bool found;
2645  MemoryContext oldctx;
2646 
2647  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2648  ExecOpenIndices(relinfo, false);
2649 
2650  found = FindReplTupleInLocalRel(estate, localrel,
2651  &relmapentry->remoterel,
2652  localindexoid,
2653  remoteslot, &localslot);
2654  ExecClearTuple(remoteslot);
2655 
2656  /*
2657  * Tuple found.
2658  *
2659  * Note this will fail if there are other conflicting unique indexes.
2660  */
2661  if (found)
2662  {
2663  /* Process and store remote tuple in the slot */
2665  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2666  MemoryContextSwitchTo(oldctx);
2667 
2668  EvalPlanQualSetSlot(&epqstate, remoteslot);
2669 
2670  /* Do the actual update. */
2672  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2673  remoteslot);
2674  }
2675  else
2676  {
2677  /*
2678  * The tuple to be updated could not be found. Do nothing except for
2679  * emitting a log message.
2680  *
2681  * XXX should this be promoted to ereport(LOG) perhaps?
2682  */
2683  elog(DEBUG1,
2684  "logical replication did not find row to be updated "
2685  "in replication target relation \"%s\"",
2686  RelationGetRelationName(localrel));
2687  }
2688 
2689  /* Cleanup. */
2690  ExecCloseIndices(relinfo);
2691  EvalPlanQualEnd(&epqstate);
2692 }
2693 
2694 /*
2695  * Handle DELETE message.
2696  *
2697  * TODO: FDW support
2698  */
2699 static void
2701 {
2702  LogicalRepRelMapEntry *rel;
2703  LogicalRepTupleData oldtup;
2704  LogicalRepRelId relid;
2705  ApplyExecutionData *edata;
2706  EState *estate;
2707  TupleTableSlot *remoteslot;
2708  MemoryContext oldctx;
2709 
2710  /*
2711  * Quick return if we are skipping data modification changes or handling
2712  * streamed transactions.
2713  */
2714  if (is_skipping_changes() ||
2716  return;
2717 
2719 
2720  relid = logicalrep_read_delete(s, &oldtup);
2721  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2722  if (!should_apply_changes_for_rel(rel))
2723  {
2724  /*
2725  * The relation can't become interesting in the middle of the
2726  * transaction so it's safe to unlock it.
2727  */
2730  return;
2731  }
2732 
2733  /* Set relation for error callback */
2735 
2736  /* Check if we can do the delete. */
2738 
2739  /* Initialize the executor state. */
2740  edata = create_edata_for_relation(rel);
2741  estate = edata->estate;
2742  remoteslot = ExecInitExtraTupleSlot(estate,
2743  RelationGetDescr(rel->localrel),
2744  &TTSOpsVirtual);
2745 
2746  /* Build the search tuple. */
2748  slot_store_data(remoteslot, rel, &oldtup);
2749  MemoryContextSwitchTo(oldctx);
2750 
2751  /* For a partitioned table, apply delete to correct partition. */
2752  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2754  remoteslot, NULL, CMD_DELETE);
2755  else
2757  remoteslot, rel->localindexoid);
2758 
2759  finish_edata(edata);
2760 
2761  /* Reset relation for error callback */
2763 
2765 
2767 }
2768 
2769 /*
2770  * Workhorse for apply_handle_delete()
2771  * relinfo is for the relation we're actually deleting from
2772  * (could be a child partition of edata->targetRelInfo)
2773  */
2774 static void
2776  ResultRelInfo *relinfo,
2777  TupleTableSlot *remoteslot,
2778  Oid localindexoid)
2779 {
2780  EState *estate = edata->estate;
2781  Relation localrel = relinfo->ri_RelationDesc;
2782  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2783  EPQState epqstate;
2784  TupleTableSlot *localslot;
2785  bool found;
2786 
2787  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2788  ExecOpenIndices(relinfo, false);
2789 
2790  found = FindReplTupleInLocalRel(estate, localrel, remoterel, localindexoid,
2791  remoteslot, &localslot);
2792 
2793  /* If found delete it. */
2794  if (found)
2795  {
2796  EvalPlanQualSetSlot(&epqstate, localslot);
2797 
2798  /* Do the actual delete. */
2800  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2801  }
2802  else
2803  {
2804  /*
2805  * The tuple to be deleted could not be found. Do nothing except for
2806  * emitting a log message.
2807  *
2808  * XXX should this be promoted to ereport(LOG) perhaps?
2809  */
2810  elog(DEBUG1,
2811  "logical replication did not find row to be deleted "
2812  "in replication target relation \"%s\"",
2813  RelationGetRelationName(localrel));
2814  }
2815 
2816  /* Cleanup. */
2817  ExecCloseIndices(relinfo);
2818  EvalPlanQualEnd(&epqstate);
2819 }
2820 
2821 /*
2822  * Try to find a tuple received from the publication side (in 'remoteslot') in
2823  * the corresponding local relation using either the replica identity index,
2824  * the primary key, another suitable index, or, if needed, a sequential scan.
2825  *
2826  * Local tuple, if found, is returned in '*localslot'.
2827  */
2828 static bool
2830  LogicalRepRelation *remoterel,
2831  Oid localidxoid,
2832  TupleTableSlot *remoteslot,
2833  TupleTableSlot **localslot)
2834 {
2835  bool found;
2836 
2837  /*
2838  * Regardless of the top-level operation, we're performing a read here, so
2839  * check for SELECT privileges.
2840  */
2841  TargetPrivilegesCheck(localrel, ACL_SELECT);
2842 
2843  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2844 
2845  Assert(OidIsValid(localidxoid) ||
2846  (remoterel->replident == REPLICA_IDENTITY_FULL));
2847 
2848  if (OidIsValid(localidxoid))
2849  found = RelationFindReplTupleByIndex(localrel, localidxoid,
2851  remoteslot, *localslot);
2852  else
2853  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2854  remoteslot, *localslot);
2855 
2856  return found;
2857 }
2858 
2859 /*
2860  * This handles insert, update, delete on a partitioned table.
2861  */
2862 static void
2864  TupleTableSlot *remoteslot,
2865  LogicalRepTupleData *newtup,
2866  CmdType operation)
2867 {
2868  EState *estate = edata->estate;
2869  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2870  ResultRelInfo *relinfo = edata->targetRelInfo;
2871  Relation parentrel = relinfo->ri_RelationDesc;
2872  ModifyTableState *mtstate;
2873  PartitionTupleRouting *proute;
2874  ResultRelInfo *partrelinfo;
2875  Relation partrel;
2876  TupleTableSlot *remoteslot_part;
2877  TupleConversionMap *map;
2878  MemoryContext oldctx;
2879  LogicalRepRelMapEntry *part_entry = NULL;
2880  AttrMap *attrmap = NULL;
2881 
2882  /* ModifyTableState is needed for ExecFindPartition(). */
2883  edata->mtstate = mtstate = makeNode(ModifyTableState);
2884  mtstate->ps.plan = NULL;
2885  mtstate->ps.state = estate;
2886  mtstate->operation = operation;
2887  mtstate->resultRelInfo = relinfo;
2888 
2889  /* ... as is PartitionTupleRouting. */
2890  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2891 
2892  /*
2893  * Find the partition to which the "search tuple" belongs.
2894  */
2895  Assert(remoteslot != NULL);
2897  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2898  remoteslot, estate);
2899  Assert(partrelinfo != NULL);
2900  partrel = partrelinfo->ri_RelationDesc;
2901 
2902  /*
2903  * Check for supported relkind. We need this since partitions might be of
2904  * unsupported relkinds; and the set of partitions can change, so checking
2905  * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2906  */
2907  CheckSubscriptionRelkind(partrel->rd_rel->relkind,
2909  RelationGetRelationName(partrel));
2910 
2911  /*
2912  * To perform any of the operations below, the tuple must match the
2913  * partition's rowtype. Convert if needed or just copy, using a dedicated
2914  * slot to store the tuple in any case.
2915  */
2916  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2917  if (remoteslot_part == NULL)
2918  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2919  map = ExecGetRootToChildMap(partrelinfo, estate);
2920  if (map != NULL)
2921  {
2922  attrmap = map->attrMap;
2923  remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
2924  remoteslot_part);
2925  }
2926  else
2927  {
2928  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2929  slot_getallattrs(remoteslot_part);
2930  }
2931  MemoryContextSwitchTo(oldctx);
2932 
2933  /* Check if we can do the update or delete on the leaf partition. */
2934  if (operation == CMD_UPDATE || operation == CMD_DELETE)
2935  {
2936  part_entry = logicalrep_partition_open(relmapentry, partrel,
2937  attrmap);
2938  check_relation_updatable(part_entry);
2939  }
2940 
2941  switch (operation)
2942  {
2943  case CMD_INSERT:
2944  apply_handle_insert_internal(edata, partrelinfo,
2945  remoteslot_part);
2946  break;
2947 
2948  case CMD_DELETE:
2949  apply_handle_delete_internal(edata, partrelinfo,
2950  remoteslot_part,
2951  part_entry->localindexoid);
2952  break;
2953 
2954  case CMD_UPDATE:
2955 
2956  /*
2957  * For UPDATE, depending on whether or not the updated tuple
2958  * satisfies the partition's constraint, perform a simple UPDATE
2959  * of the partition or move the updated tuple into a different
2960  * suitable partition.
2961  */
2962  {
2963  TupleTableSlot *localslot;
2964  ResultRelInfo *partrelinfo_new;
2965  Relation partrel_new;
2966  bool found;
2967 
2968  /* Get the matching local tuple from the partition. */
2969  found = FindReplTupleInLocalRel(estate, partrel,
2970  &part_entry->remoterel,
2971  part_entry->localindexoid,
2972  remoteslot_part, &localslot);
2973  if (!found)
2974  {
2975  /*
2976  * The tuple to be updated could not be found. Do nothing
2977  * except for emitting a log message.
2978  *
2979  * XXX should this be promoted to ereport(LOG) perhaps?
2980  */
2981  elog(DEBUG1,
2982  "logical replication did not find row to be updated "
2983  "in replication target relation's partition \"%s\"",
2984  RelationGetRelationName(partrel));
2985  return;
2986  }
2987 
2988  /*
2989  * Apply the update to the local tuple, putting the result in
2990  * remoteslot_part.
2991  */
2993  slot_modify_data(remoteslot_part, localslot, part_entry,
2994  newtup);
2995  MemoryContextSwitchTo(oldctx);
2996 
2997  /*
2998  * Does the updated tuple still satisfy the current
2999  * partition's constraint?
3000  */
3001  if (!partrel->rd_rel->relispartition ||
3002  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3003  false))
3004  {
3005  /*
3006  * Yes, so simply UPDATE the partition. We don't call
3007  * apply_handle_update_internal() here, which would
3008  * normally do the following work, to avoid repeating some
3009  * work already done above to find the local tuple in the
3010  * partition.
3011  */
3012  EPQState epqstate;
3013 
3014  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
3015  ExecOpenIndices(partrelinfo, false);
3016 
3017  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3019  ACL_UPDATE);
3020  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3021  localslot, remoteslot_part);
3022  ExecCloseIndices(partrelinfo);
3023  EvalPlanQualEnd(&epqstate);
3024  }
3025  else
3026  {
3027  /* Move the tuple into the new partition. */
3028 
3029  /*
3030  * New partition will be found using tuple routing, which
3031  * can only occur via the parent table. We might need to
3032  * convert the tuple to the parent's rowtype. Note that
3033  * this is the tuple found in the partition, not the
3034  * original search tuple received by this function.
3035  */
3036  if (map)
3037  {
3038  TupleConversionMap *PartitionToRootMap =
3040  RelationGetDescr(parentrel));
3041 
3042  remoteslot =
3043  execute_attr_map_slot(PartitionToRootMap->attrMap,
3044  remoteslot_part, remoteslot);
3045  }
3046  else
3047  {
3048  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3049  slot_getallattrs(remoteslot);
3050  }
3051 
3052  /* Find the new partition. */
3054  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3055  proute, remoteslot,
3056  estate);
3057  MemoryContextSwitchTo(oldctx);
3058  Assert(partrelinfo_new != partrelinfo);
3059  partrel_new = partrelinfo_new->ri_RelationDesc;
3060 
3061  /* Check that new partition also has supported relkind. */
3062  CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3064  RelationGetRelationName(partrel_new));
3065 
3066  /* DELETE old tuple found in the old partition. */
3067  apply_handle_delete_internal(edata, partrelinfo,
3068  localslot,
3069  part_entry->localindexoid);
3070 
3071  /* INSERT new tuple into the new partition. */
3072 
3073  /*
3074  * Convert the replacement tuple to match the destination
3075  * partition rowtype.
3076  */
3078  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3079  if (remoteslot_part == NULL)
3080  remoteslot_part = table_slot_create(partrel_new,
3081  &estate->es_tupleTable);
3082  map = ExecGetRootToChildMap(partrelinfo_new, estate);
3083  if (map != NULL)
3084  {
3085  remoteslot_part = execute_attr_map_slot(map->attrMap,
3086  remoteslot,
3087  remoteslot_part);
3088  }
3089  else
3090  {
3091  remoteslot_part = ExecCopySlot(remoteslot_part,
3092  remoteslot);
3093  slot_getallattrs(remoteslot);
3094  }
3095  MemoryContextSwitchTo(oldctx);
3096  apply_handle_insert_internal(edata, partrelinfo_new,
3097  remoteslot_part);
3098  }
3099  }
3100  break;
3101 
3102  default:
3103  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3104  break;
3105  }
3106 }
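A hypothetical example (the table and partition names are invented) may help summarize the cross-partition UPDATE path above:

/*
 * Suppose "orders" is partitioned by status with partitions orders_new and
 * orders_done, and an UPDATE arrives that changes a row's status from 'new'
 * to 'done'.  The routine above first locates the row in orders_new; because
 * the updated tuple no longer satisfies that partition's constraint, it is
 * applied as a DELETE from orders_new followed by routing the new version
 * through the parent "orders" and an INSERT into orders_done.
 */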
3107 
3108 /*
3109  * Handle TRUNCATE message.
3110  *
3111  * TODO: FDW support
3112  */
3113 static void
3115 {
3116  bool cascade = false;
3117  bool restart_seqs = false;
3118  List *remote_relids = NIL;
3119  List *remote_rels = NIL;
3120  List *rels = NIL;
3121  List *part_rels = NIL;
3122  List *relids = NIL;
3123  List *relids_logged = NIL;
3124  ListCell *lc;
3125  LOCKMODE lockmode = AccessExclusiveLock;
3126 
3127  /*
3128  * Quick return if we are skipping data modification changes or handling
3129  * streamed transactions.
3130  */
3131  if (is_skipping_changes() ||
3133  return;
3134 
3136 
3137  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3138 
3139  foreach(lc, remote_relids)
3140  {
3141  LogicalRepRelId relid = lfirst_oid(lc);
3142  LogicalRepRelMapEntry *rel;
3143 
3144  rel = logicalrep_rel_open(relid, lockmode);
3145  if (!should_apply_changes_for_rel(rel))
3146  {
3147  /*
3148  * The relation can't become interesting in the middle of the
3149  * transaction so it's safe to unlock it.
3150  */
3151  logicalrep_rel_close(rel, lockmode);
3152  continue;
3153  }
3154 
3155  remote_rels = lappend(remote_rels, rel);
3157  rels = lappend(rels, rel->localrel);
3158  relids = lappend_oid(relids, rel->localreloid);
3160  relids_logged = lappend_oid(relids_logged, rel->localreloid);
3161 
3162  /*
3163  * Truncate partitions if we got a message to truncate a partitioned
3164  * table.
3165  */
3166  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3167  {
3168  ListCell *child;
3169  List *children = find_all_inheritors(rel->localreloid,
3170  lockmode,
3171  NULL);
3172 
3173  foreach(child, children)
3174  {
3175  Oid childrelid = lfirst_oid(child);
3176  Relation childrel;
3177 
3178  if (list_member_oid(relids, childrelid))
3179  continue;
3180 
3181  /* find_all_inheritors already got lock */
3182  childrel = table_open(childrelid, NoLock);
3183 
3184  /*
3185  * Ignore temp tables of other backends. See similar code in
3186  * ExecuteTruncate().
3187  */
3188  if (RELATION_IS_OTHER_TEMP(childrel))
3189  {
3190  table_close(childrel, lockmode);
3191  continue;
3192  }
3193 
3195  rels = lappend(rels, childrel);
3196  part_rels = lappend(part_rels, childrel);
3197  relids = lappend_oid(relids, childrelid);
3198  /* Log this relation only if needed for logical decoding */
3199  if (RelationIsLogicallyLogged(childrel))
3200  relids_logged = lappend_oid(relids_logged, childrelid);
3201  }
3202  }
3203  }
3204 
3205  /*
3206  * Even if we used CASCADE on the upstream primary we explicitly default
3207  * to replaying changes without further cascading. This might later be made
3208  * configurable with a user-specified option.
3209  */
3210  ExecuteTruncateGuts(rels,
3211  relids,
3212  relids_logged,
3213  DROP_RESTRICT,
3214  restart_seqs);
3215  foreach(lc, remote_rels)
3216  {
3217  LogicalRepRelMapEntry *rel = lfirst(lc);
3218 
3220  }
3221  foreach(lc, part_rels)
3222  {
3223  Relation rel = lfirst(lc);
3224 
3225  table_close(rel, NoLock);
3226  }
3227 
3229 }
3230 
3231 
3232 /*
3233  * Logical replication protocol message dispatcher.
3234  */
3235 void
3237 {
3239  LogicalRepMsgType saved_command;
3240 
3241  /*
3242  * Set the current command being applied. Since this function can be
3243  * called recursively when applying spooled changes, save the current
3244  * command.
3245  */
3246  saved_command = apply_error_callback_arg.command;
3248 
3249  switch (action)
3250  {
3251  case LOGICAL_REP_MSG_BEGIN:
3252  apply_handle_begin(s);
3253  break;
3254 
3257  break;
3258 
3261  break;
3262 
3265  break;
3266 
3269  break;
3270 
3273  break;
3274 
3277  break;
3278 
3279  case LOGICAL_REP_MSG_TYPE:
3280  apply_handle_type(s);
3281  break;
3282 
3285  break;
3286 
3288 
3289  /*
3290  * Logical replication does not use generic logical messages yet.
3291  * They could, however, be used by other applications that use this
3292  * output plugin.
3293  */
3294  break;
3295 
3298  break;
3299 
3302  break;
3303 
3306  break;
3307 
3310  break;
3311 
3314  break;
3315 
3318  break;
3319 
3322  break;
3323 
3326  break;
3327 
3330  break;
3331 
3332  default:
3333  ereport(ERROR,
3334  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3335  errmsg("invalid logical replication message type \"%c\"", action)));
3336  }
3337 
3338  /* Reset the current command */
3339  apply_error_callback_arg.command = saved_command;
3340 }
3341 
3342 /*
3343  * Figure out which write/flush positions to report to the walsender process.
3344  *
3345  * We can't simply report back the last LSN the walsender sent us because the
3346  * local transaction might not yet be flushed to disk locally. Instead we
3347  * build a list that associates local with remote LSNs for every commit. When
3348  * reporting back the flush position to the sender we iterate that list and
3349  * check which entries on it are already locally flushed. Those we can report
3350  * as having been flushed.
3351  *
3352  * *have_pending_txes is set to true if there are outstanding transactions that
3353  * need to be flushed.
3354  */
3355 static void
3357  bool *have_pending_txes)
3358 {
3359  dlist_mutable_iter iter;
3360  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3361 
3363  *flush = InvalidXLogRecPtr;
3364 
3366  {
3367  FlushPosition *pos =
3368  dlist_container(FlushPosition, node, iter.cur);
3369 
3370  *write = pos->remote_end;
3371 
3372  if (pos->local_end <= local_flush)
3373  {
3374  *flush = pos->remote_end;
3375  dlist_delete(iter.cur);
3376  pfree(pos);
3377  }
3378  else
3379  {
3380  /*
3381  * Don't want to uselessly iterate over the rest of the list which
3382  * could potentially be long. Instead get the last element and
3383  * grab the write position from there.
3384  */
3385  pos = dlist_tail_element(FlushPosition, node,
3386  &lsn_mapping);
3387  *write = pos->remote_end;
3388  *have_pending_txes = true;
3389  return;
3390  }
3391  }
3392 
3393  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3394 }
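The following standalone sketch (an editor's illustration, not part of worker.c) walks through the same write/flush computation with plain integers standing in for LSNs; the sample values are hypothetical, and the real code additionally removes flushed entries from the list.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* simplified stand-in for FlushPosition: local and remote commit LSNs */
typedef struct DemoPos
{
	uint64_t	local_end;
	uint64_t	remote_end;
} DemoPos;

int
main(void)
{
	DemoPos		map[] = {{100, 1000}, {200, 2000}, {300, 3000}};
	int			n = 3;
	uint64_t	local_flush = 250;	/* what is locally flushed so far */
	uint64_t	write = 0;
	uint64_t	flush = 0;
	bool		pending = false;

	for (int i = 0; i < n; i++)
	{
		write = map[i].remote_end;
		if (map[i].local_end <= local_flush)
			flush = map[i].remote_end;	/* locally durable, safe to report */
		else
		{
			/* report the newest remote position as written and stop */
			write = map[n - 1].remote_end;
			pending = true;
			break;
		}
	}

	/* prints: write=3000 flush=2000 pending=1 */
	printf("write=%llu flush=%llu pending=%d\n",
		   (unsigned long long) write, (unsigned long long) flush, pending);
	return 0;
}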
3395 
3396 /*
3397  * Store current remote/local lsn pair in the tracking list.
3398  */
3399 void
3401 {
3402  FlushPosition *flushpos;
3403 
3404  /*
3405  * Skip for parallel apply workers, because the lsn_mapping is maintained
3406  * by the leader apply worker.
3407  */
3409  return;
3410 
3411  /* Need to do this in permanent context */
3413 
3414  /* Track commit lsn */
3415  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3416  flushpos->local_end = local_lsn;
3417  flushpos->remote_end = remote_lsn;
3418 
3419  dlist_push_tail(&lsn_mapping, &flushpos->node);
3421 }
3422 
3423 
3424 /* Update statistics of the worker. */
3425 static void
3426 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3427 {
3428  MyLogicalRepWorker->last_lsn = last_lsn;
3429  MyLogicalRepWorker->last_send_time = send_time;
3431  if (reply)
3432  {
3433  MyLogicalRepWorker->reply_lsn = last_lsn;
3434  MyLogicalRepWorker->reply_time = send_time;
3435  }
3436 }
3437 
3438 /*
3439  * Apply main loop.
3440  */
3441 static void
3443 {
3444  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3445  bool ping_sent = false;
3446  TimeLineID tli;
3447  ErrorContextCallback errcallback;
3448 
3449  /*
3450  * Init the ApplyMessageContext which we clean up after each replication
3451  * protocol message.
3452  */
3454  "ApplyMessageContext",
3456 
3457  /*
3458  * This memory context is used for per-stream data when the streaming mode
3459  * is enabled. This context is reset on each stream stop.
3460  */
3462  "LogicalStreamingContext",
3464 
3465  /* mark as idle, before starting to loop */
3467 
3468  /*
3469  * Push apply error context callback. Fields will be filled while applying
3470  * a change.
3471  */
3472  errcallback.callback = apply_error_callback;
3473  errcallback.previous = error_context_stack;
3474  error_context_stack = &errcallback;
3476 
3477  /* This outer loop iterates once per wait. */
3478  for (;;)
3479  {
3481  int rc;
3482  int len;
3483  char *buf = NULL;
3484  bool endofstream = false;
3485  long wait_time;
3486 
3488 
3490 
3492 
3493  if (len != 0)
3494  {
3495  /* Loop to process all available data (without blocking). */
3496  for (;;)
3497  {
3499 
3500  if (len == 0)
3501  {
3502  break;
3503  }
3504  else if (len < 0)
3505  {
3506  ereport(LOG,
3507  (errmsg("data stream from publisher has ended")));
3508  endofstream = true;
3509  break;
3510  }
3511  else
3512  {
3513  int c;
3514  StringInfoData s;
3515 
3516  /* Reset timeout. */
3517  last_recv_timestamp = GetCurrentTimestamp();
3518  ping_sent = false;
3519 
3520  /* Ensure we are reading the data into our memory context. */
3522 
3523  s.data = buf;
3524  s.len = len;
3525  s.cursor = 0;
3526  s.maxlen = -1;
3527 
3528  c = pq_getmsgbyte(&s);
3529 
3530  if (c == 'w')
3531  {
3532  XLogRecPtr start_lsn;
3533  XLogRecPtr end_lsn;
3534  TimestampTz send_time;
3535 
3536  start_lsn = pq_getmsgint64(&s);
3537  end_lsn = pq_getmsgint64(&s);
3538  send_time = pq_getmsgint64(&s);
3539 
3540  if (last_received < start_lsn)
3541  last_received = start_lsn;
3542 
3543  if (last_received < end_lsn)
3544  last_received = end_lsn;
3545 
3546  UpdateWorkerStats(last_received, send_time, false);
3547 
3548  apply_dispatch(&s);
3549  }
3550  else if (c == 'k')
3551  {
3552  XLogRecPtr end_lsn;
3554  bool reply_requested;
3555 
3556  end_lsn = pq_getmsgint64(&s);
3557  timestamp = pq_getmsgint64(&s);
3558  reply_requested = pq_getmsgbyte(&s);
3559 
3560  if (last_received < end_lsn)
3561  last_received = end_lsn;
3562 
3563  send_feedback(last_received, reply_requested, false);
3564  UpdateWorkerStats(last_received, timestamp, true);
3565  }
3566  /* other message types are purposefully ignored */
3567 
3569  }
3570 
3572  }
3573  }
3574 
3575  /* confirm all writes so far */
3576  send_feedback(last_received, false, false);
3577 
3579  {
3580  /*
3581  * If we didn't get any transactions for a while there might be
3582  * unconsumed invalidation messages in the queue; consume them
3583  * now.
3584  */
3587 
3588  /* Process any table synchronization changes. */
3589  process_syncing_tables(last_received);
3590  }
3591 
3592  /* Cleanup the memory. */
3595 
3596  /* Check if we need to exit the streaming loop. */
3597  if (endofstream)
3598  break;
3599 
3600  /*
3601  * Wait for more data or latch. If we have unflushed transactions,
3602  * wake up after WalWriterDelay to see if they've been flushed yet (in
3603  * which case we should send a feedback message). Otherwise, there's
3604  * no particular urgency about waking up unless we get data or a
3605  * signal.
3606  */
3607  if (!dlist_is_empty(&lsn_mapping))
3608  wait_time = WalWriterDelay;
3609  else
3610  wait_time = NAPTIME_PER_CYCLE;
3611 
3615  fd, wait_time,
3617 
3618  if (rc & WL_LATCH_SET)
3619  {
3622  }
3623 
3624  if (ConfigReloadPending)
3625  {
3626  ConfigReloadPending = false;
3628  }
3629 
3630  if (rc & WL_TIMEOUT)
3631  {
3632  /*
3633  * We didn't receive anything new. If we haven't heard anything
3634  * from the server for more than wal_receiver_timeout / 2, ping
3635  * the server. Also, if it's been longer than
3636  * wal_receiver_status_interval since the last update we sent,
3637  * send a status update to the primary anyway, to report any
3638  * progress in applying WAL.
3639  */
3640  bool requestReply = false;
3641 
3642  /*
3643  * Check if time since last receive from primary has reached the
3644  * configured limit.
3645  */
3646  if (wal_receiver_timeout > 0)
3647  {
3649  TimestampTz timeout;
3650 
3651  timeout =
3652  TimestampTzPlusMilliseconds(last_recv_timestamp,
3654 
3655  if (now >= timeout)
3656  ereport(ERROR,
3657  (errcode(ERRCODE_CONNECTION_FAILURE),
3658  errmsg("terminating logical replication worker due to timeout")));
3659 
3660  /* Check to see if it's time for a ping. */
3661  if (!ping_sent)
3662  {
3663  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3664  (wal_receiver_timeout / 2));
3665  if (now >= timeout)
3666  {
3667  requestReply = true;
3668  ping_sent = true;
3669  }
3670  }
3671  }
3672 
3673  send_feedback(last_received, requestReply, requestReply);
3674 
3675  /*
3676  * Force reporting to ensure long idle periods don't lead to
3677  * arbitrarily delayed stats. Stats can only be reported outside
3678  * of (implicit or explicit) transactions. That shouldn't lead to
3679  * stats being delayed for long, because transactions are either
3680  * sent as a whole on commit or streamed. Streamed transactions
3681  * are spilled to disk and applied on commit.
3682  */
3683  if (!IsTransactionState())
3684  pgstat_report_stat(true);
3685  }
3686  }
3687 
3688  /* Pop the error context stack */
3689  error_context_stack = errcallback.previous;
3691 
3692  /* All done */
3694 }
3695 
3696 /*
3697  * Send a Standby Status Update message to server.
3698  *
3699  * 'recvpos' is the latest LSN up to which we've received data; 'force' is set
3700  * if we need to send a response to avoid timeouts.
3701  */
3702 static void
3703 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3704 {
3705  static StringInfo reply_message = NULL;
3706  static TimestampTz send_time = 0;
3707 
3708  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3709  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3710  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3711 
3712  XLogRecPtr writepos;
3713  XLogRecPtr flushpos;
3714  TimestampTz now;
3715  bool have_pending_txes;
3716 
3717  /*
3718  * If the user doesn't want status to be reported to the publisher, be
3719  * sure to exit before doing anything at all.
3720  */
3721  if (!force && wal_receiver_status_interval <= 0)
3722  return;
3723 
3724  /* It's legal to not pass a recvpos */
3725  if (recvpos < last_recvpos)
3726  recvpos = last_recvpos;
3727 
3728  get_flush_position(&writepos, &flushpos, &have_pending_txes);
3729 
3730  /*
3731  * No outstanding transactions to flush; we can report the latest received
3732  * position. This is important for synchronous replication.
3733  */
3734  if (!have_pending_txes)
3735  flushpos = writepos = recvpos;
3736 
3737  if (writepos < last_writepos)
3738  writepos = last_writepos;
3739 
3740  if (flushpos < last_flushpos)
3741  flushpos = last_flushpos;
3742 
3744 
3745  /* if we've already reported everything we're good */
3746  if (!force &&
3747  writepos == last_writepos &&
3748  flushpos == last_flushpos &&
3749  !TimestampDifferenceExceeds(send_time, now,
3751  return;
3752  send_time = now;
3753 
3754  if (!reply_message)
3755  {
3757 
3759  MemoryContextSwitchTo(oldctx);
3760  }
3761  else
3763 
3764  pq_sendbyte(reply_message, 'r');
3765  pq_sendint64(reply_message, recvpos); /* write */
3766  pq_sendint64(reply_message, flushpos); /* flush */
3767  pq_sendint64(reply_message, writepos); /* apply */
3768  pq_sendint64(reply_message, now); /* sendTime */
3769  pq_sendbyte(reply_message, requestReply); /* replyRequested */
3770 
3771  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3772  force,
3773  LSN_FORMAT_ARGS(recvpos),
3774  LSN_FORMAT_ARGS(writepos),
3775  LSN_FORMAT_ARGS(flushpos));
3776 
3779 
3780  if (recvpos > last_recvpos)
3781  last_recvpos = recvpos;
3782  if (writepos > last_writepos)
3783  last_writepos = writepos;
3784  if (flushpos > last_flushpos)
3785  last_flushpos = flushpos;
3786 }
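For reference, the reply assembled above has the byte layout below; this is an editor's summary derived from the pq_sendbyte/pq_sendint64 calls, not a comment from the original source.

/*
 *   'r'             1 byte   message type
 *   recvpos         8 bytes  last position received      (sent as "write")
 *   flushpos        8 bytes  last position flushed       (sent as "flush")
 *   writepos        8 bytes  last position applied       (sent as "apply")
 *   now             8 bytes  sender timestamp
 *   requestReply    1 byte   ask the publisher to reply immediately
 */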
3787 
3788 /*
3789  * Exit routine for apply workers due to subscription parameter changes.
3790  */
3791 static void
3793 {
3795  {
3796  /*
3797  * Don't stop the parallel apply worker as the leader will detect the
3798  * subscription parameter change and restart logical replication later
3799  * anyway. This also prevents the leader from reporting errors when
3800  * trying to communicate with a stopped parallel apply worker, which
3801  * would accidentally disable subscriptions if disable_on_error was
3802  * set.
3803  */
3804  return;
3805  }
3806 
3807  /*
3808  * Reset the last-start time for this apply worker so that the launcher
3809  * will restart it without waiting for wal_retrieve_retry_interval if the
3810  * subscription is still active, and so that we won't leak that hash table
3811  * entry if it isn't.
3812  */
3813  if (!am_tablesync_worker())
3815 
3816  proc_exit(0);
3817 }
3818 
3819 /*
3820  * Reread subscription info if needed. Most changes will cause the worker to exit.
3821  */
3822 void
3824 {
3825  MemoryContext oldctx;
3827  bool started_tx = false;
3828 
3829  /* When cache state is valid there is nothing to do here. */
3830  if (MySubscriptionValid)
3831  return;
3832 
3833  /* This function might be called inside or outside of transaction. */
3834  if (!IsTransactionState())
3835  {
3837  started_tx = true;
3838  }
3839 
3840  /* Ensure allocations in permanent context. */
3842 
3844 
3845  /*
3846  * Exit if the subscription was removed. This normally should not happen
3847  * as the worker gets killed during DROP SUBSCRIPTION.
3848  */
3849  if (!newsub)
3850  {
3851  ereport(LOG,
3852  /* translator: first %s is the name of logical replication worker */
3853  (errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
3855 
3856  /* Ensure we remove no-longer-useful entry for worker's start time */
3859  proc_exit(0);
3860  }
3861 
3862  /* Exit if the subscription was disabled. */
3863  if (!newsub->enabled)
3864  {
3865  ereport(LOG,
3866  /* translator: first %s is the name of logical replication worker */
3867  (errmsg("%s for subscription \"%s\" will stop because the subscription was disabled",
3869 
3871  }
3872 
3873  /* !slotname should never happen when enabled is true. */
3874  Assert(newsub->slotname);
3875 
3876  /* two-phase should not be altered */
3877  Assert(newsub->twophasestate == MySubscription->twophasestate);
3878 
3879  /*
3880  * Exit if any parameter that affects the remote connection was changed.
3881  * The launcher will start a new worker but note that the parallel apply
3882  * worker won't restart if the streaming option's value is changed from
3883  * 'parallel' to any other value or the server decides not to stream the
3884  * in-progress transaction.
3885  */
3886  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3887  strcmp(newsub->name, MySubscription->name) != 0 ||
3888  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3889  newsub->binary != MySubscription->binary ||
3890  newsub->stream != MySubscription->stream ||
3891  strcmp(newsub->origin, MySubscription->origin) != 0 ||
3892  newsub->owner != MySubscription->owner ||
3893  !equal(newsub->publications, MySubscription->publications))
3894  {
3896  ereport(LOG,
3897  (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
3898  MySubscription->name)));
3899  else
3900  ereport(LOG,
3901  /* translator: first %s is the name of logical replication worker */
3902  (errmsg("%s for subscription \"%s\" will restart because of a parameter change",
3904 
3906  }
3907 
3908  /* Check for other changes that should never happen too. */
3909  if (newsub->dbid != MySubscription->dbid)
3910  {
3911  elog(ERROR, "subscription %u changed unexpectedly",
3913  }
3914 
3915  /* Clean old subscription info and switch to new one. */
3918 
3919  MemoryContextSwitchTo(oldctx);
3920 
3921  /* Change synchronous commit according to the user's wishes */
3922  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3924 
3925  if (started_tx)
3927 
3928  MySubscriptionValid = true;
3929 }
3930 
3931 /*
3932  * Callback from subscription syscache invalidation.
3933  */
3934 static void
3935 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
3936 {
3937  MySubscriptionValid = false;
3938 }
3939 
3940 /*
3941  * subxact_info_write
3942  * Store information about subxacts for a toplevel transaction.
3943  *
3944  * For each subxact we store the offset of its first change in the main file.
3945  * The file is always over-written as a whole.
3946  *
3947  * XXX We should only store subxacts that were not aborted yet.
3948  */
3949 static void
3951 {
3952  char path[MAXPGPATH];
3953  Size len;
3954  BufFile *fd;
3955 
3957 
3958  /* construct the subxact filename */
3959  subxact_filename(path, subid, xid);
3960 
3961  /* Delete the subxacts file, if exists. */
3962  if (subxact_data.nsubxacts == 0)
3963  {
3966 
3967  return;
3968  }
3969 
3970  /*
3971  * Create the subxact file if it is not already created; otherwise open the
3972  * existing file.
3973  */
3975  true);
3976  if (fd == NULL)
3978 
3979  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
3980 
3981  /* Write the subxact count and subxact info */
3984 
3985  BufFileClose(fd);
3986 
3987  /* free the memory allocated for subxact info */
3989 }
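As an editor's note (not from the original source), the resulting subxact file is simply the count followed by the packed array, roughly:

/*
 *   [nsubxacts]                       number of entries that follow
 *   [SubXactInfo 0] { xid, fileno, offset of its first change }
 *   [SubXactInfo 1] ...
 *
 * subxact_info_read() restores exactly this structure into subxact_data.
 */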
3990 
3991 /*
3992  * subxact_info_read
3993  * Restore information about subxacts of a streamed transaction.
3994  *
3995  * Read information about subxacts into the structure subxact_data that can be
3996  * used later.
3997  */
3998 static void
4000 {
4001  char path[MAXPGPATH];
4002  Size len;
4003  BufFile *fd;
4004  MemoryContext oldctx;
4005 
4009 
4010  /*
4011  * If the subxact file doesn't exist, that means we don't have any subxact
4012  * info.
4013  */
4014  subxact_filename(path, subid, xid);
4016  true);
4017  if (fd == NULL)
4018  return;
4019 
4020  /* read number of subxact items */
4022 
4023  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4024 
4025  /* we keep the maximum as a power of 2 */
4027 
4028  /*
4029  * Allocate subxact information in the logical streaming context. We need
4030  * this information for the duration of the stream so that we can add the
4031  * subtransaction info to it. On stream stop we will flush this information
4032  * to the subxact file and reset the logical streaming context.
4033  */
4036  sizeof(SubXactInfo));
4037  MemoryContextSwitchTo(oldctx);
4038 
4039  if (len > 0)
4041 
4042  BufFileClose(fd);
4043 }
4044 
4045 /*
4046  * subxact_info_add
4047  * Add information about a subxact (offset in the main file).
4048  */
4049 static void
4051 {
4052  SubXactInfo *subxacts = subxact_data.subxacts;
4053  int64 i;
4054 
4055  /* We must have a valid top level stream xid and a stream fd. */
4057  Assert(stream_fd != NULL);
4058 
4059  /*
4060  * If the XID matches the toplevel transaction, we don't want to add it.
4061  */
4062  if (stream_xid == xid)
4063  return;
4064 
4065  /*
4066  * In most cases we're checking the same subxact as we've already seen in
4067  * the last call, so make sure to ignore it (this change comes later).
4068  */
4069  if (subxact_data.subxact_last == xid)
4070  return;
4071 
4072  /* OK, remember we're processing this XID. */
4073  subxact_data.subxact_last = xid;
4074 
4075  /*
4076  * Check if the transaction is already present in the array of subxacts. We
4077  * intentionally scan the array from the tail, because we're likely adding
4078  * a change for the most recent subtransactions.
4079  *
4080  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4081  * would allow us to use binary search here.
4082  */
4083  for (i = subxact_data.nsubxacts; i > 0; i--)
4084  {
4085  /* found, so we're done */
4086  if (subxacts[i - 1].xid == xid)
4087  return;
4088  }
4089 
4090  /* This is a new subxact, so we need to add it to the array. */
4091  if (subxact_data.nsubxacts == 0)
4092  {
4093  MemoryContext oldctx;
4094 
4096 
4097  /*
4098  * Allocate this memory for subxacts in per-stream context, see
4099  * subxact_info_read.
4100  */
4102  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4103  MemoryContextSwitchTo(oldctx);
4104  }
4106  {
4108  subxacts = repalloc(subxacts,
4110  }
4111 
4112  subxacts[subxact_data.nsubxacts].xid = xid;
4113 
4114  /*
4115  * Get the current offset of the stream file and store it as offset of
4116  * this subxact.
4117  */
4119  &subxacts[subxact_data.nsubxacts].fileno,
4120  &subxacts[subxact_data.nsubxacts].offset);
4121 
4123  subxact_data.subxacts = subxacts;
4124 }
4125 
4126 /* format filename for file containing the info about subxacts */
4127 static inline void
4128 subxact_filename(char *path, Oid subid, TransactionId xid)
4129 {
4130  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4131 }
4132 
4133 /* format filename for file containing serialized changes */
4134 static inline void
4135 changes_filename(char *path, Oid subid, TransactionId xid)
4136 {
4137  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4138 }
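/*
 * Illustrative sketch (not part of worker.c): with the formats above, a
 * subscription with OID 16394 streaming a remote transaction with XID 740
 * (both values hypothetical) gets "16394-740.changes" for the serialized
 * changes and "16394-740.subxacts" for the subxact offsets, both kept in the
 * worker's shared fileset rather than as ordinary files.
 */
#include <stdio.h>

int
main(void)
{
	unsigned int subid = 16394;	/* hypothetical subscription OID */
	unsigned int xid = 740;		/* hypothetical remote toplevel XID */
	char		path[1024];

	snprintf(path, sizeof(path), "%u-%u.changes", subid, xid);
	printf("%s\n", path);		/* prints: 16394-740.changes */

	snprintf(path, sizeof(path), "%u-%u.subxacts", subid, xid);
	printf("%s\n", path);		/* prints: 16394-740.subxacts */
	return 0;
}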
4139 
4140 /*
4141  * stream_cleanup_files
4142  * Cleanup files for a subscription / toplevel transaction.
4143  *
4144  * Remove files with serialized changes and subxact info for a particular
4145  * toplevel transaction. Each subscription has a separate set of files
4146  * for any toplevel transaction.
4147  */
4148 void
4149 stream_cleanup_files(Oid subid, TransactionId xid)
4150 {
4151  char path[MAXPGPATH];
4152 
4153  /* Delete the changes file. */
4154  changes_filename(path, subid, xid);
4155  BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
4156 
4157  /* Delete the subxact file, if it exists. */
4158  subxact_filename(path, subid, xid);
4159  BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4160 }
4161 
4162 /*
4163  * stream_open_file
4164  * Open a file that we'll use to serialize changes for a toplevel
4165  * transaction.
4166  *
4167  * Open a file for streamed changes from a toplevel transaction identified
4168  * by stream_xid (global variable). If it's the first chunk of streamed
4169  * changes for this transaction, create the buffile, otherwise open the
4170  * previously created file.
4171  */
4172 static void
4173 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4174 {
4175  char path[MAXPGPATH];
4176  MemoryContext oldcxt;
4177 
4178  Assert(OidIsValid(subid));
4179  Assert(TransactionIdIsValid(xid));
4180  Assert(stream_fd == NULL);
4181 
4182 
4183  changes_filename(path, subid, xid);
4184  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4185 
4186  /*
4187  * Create/open the buffiles under the logical streaming context so that we
4188  * have those files until stream stop.
4189  */
4190  oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
4191 
4192  /*
4193  * If this is the first streamed segment, create the changes file.
4194  * Otherwise, just open the file for writing, in append mode.
4195  */
4196  if (first_segment)
4197  stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
4198  path);
4199  else
4200  {
4201  /*
4202  * Open the file and seek to the end of the file because we always
4203  * append the changes file.
4204  */
4205  stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
4206  path, O_RDWR, false);
4207  BufFileSeek(stream_fd, 0, 0, SEEK_END);
4208  }
4209 
4210  MemoryContextSwitchTo(oldcxt);
4211 }
4212 
4213 /*
4214  * stream_close_file
4215  * Close the currently open file with streamed changes.
4216  */
4217 static void
4218 stream_close_file(void)
4219 {
4220  Assert(stream_fd != NULL);
4221 
4222  BufFileClose(stream_fd);
4223 
4224  stream_fd = NULL;
4225 }
4226 
4227 /*
4228  * stream_write_change
4229  * Serialize a change to a file for the current toplevel transaction.
4230  *
4231  * The change is serialized in a simple format, with length (not including
4232  * the length field itself), action code (identifying the message type) and
4233  * message contents (without the subxact TransactionId value).
4234  */
4235 static void
4236 stream_write_change(char action, StringInfo s)
4237 {
4238  int len;
4239 
4240  Assert(stream_fd != NULL);
4241 
4242  /* total on-disk size, including the action type character */
4243  len = (s->len - s->cursor) + sizeof(char);
4244 
4245  /* first write the size */
4246  BufFileWrite(stream_fd, &len, sizeof(len));
4247 
4248  /* then the action */
4249  BufFileWrite(stream_fd, &action, sizeof(action));
4250 
4251  /* and finally the remaining part of the buffer (after the XID) */
4252  len = (s->len - s->cursor);
4253 
4254  BufFileWrite(stream_fd, &s->data[s->cursor], len);
4255 }
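/*
 * Illustrative sketch (not part of worker.c): each record written by
 * stream_write_change() above is laid out as
 *
 *     int len | char action | (len - 1) bytes of message payload
 *
 * where len counts the action byte plus the payload but not the length field
 * itself. The reader below uses plain stdio instead of BufFile purely to show
 * how such a record would be decoded; it is a sketch, not the real spool
 * reader.
 */
#include <stdio.h>
#include <stdlib.h>

static int
example_read_one_change(FILE *fp)
{
	int			len;
	char		action;
	size_t		payload_len;
	char	   *payload;

	if (fread(&len, sizeof(len), 1, fp) != 1)
		return 0;				/* end of spool file */
	if (fread(&action, sizeof(action), 1, fp) != 1)
		return 0;

	payload_len = (size_t) len - 1;	/* len includes the action byte */
	payload = malloc(payload_len);
	if (payload == NULL || fread(payload, 1, payload_len, fp) != payload_len)
	{
		free(payload);
		return 0;
	}

	printf("action '%c', %zu payload bytes\n", action, payload_len);
	free(payload);
	return 1;
}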
4256 
4257 /*
4258  * stream_open_and_write_change
4259  * Serialize a message to a file for the given transaction.
4260  *
4261  * This function is similar to stream_write_change except that it will open the
4262  * target file if not already before writing the message and close the file at
4263  * the end.
4264  */
4265 static void
4266 stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
4267 {
4268  Assert(!in_streamed_transaction);
4269 
4270  if (!stream_fd)
4271  stream_start_internal(xid, false);
4272 
4273  stream_write_change(action, s);
4274  stream_stop_internal(xid);
4275 }
4276 
4277 /*
4278  * Cleanup the memory for subxacts and reset the related variables.
4279  */
4280 static inline void
4281 cleanup_subxact_info(void)
4282 {
4283  if (subxact_data.subxacts)
4284  pfree(subxact_data.subxacts);
4285 
4286  subxact_data.subxacts = NULL;
4287  subxact_data.subxact_last = InvalidTransactionId;
4288  subxact_data.nsubxacts = 0;
4289  subxact_data.nsubxacts_max = 0;
4290 }
4291 
4292 /*
4293  * Form the prepared transaction GID for two_phase transactions.
4294  *
4295  * Return the GID in the supplied buffer.
4296  */
4297 static void
4298 TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
4299 {
4300  Assert(subid != InvalidRepOriginId);
4301 
4302  if (!TransactionIdIsValid(xid))
4303  ereport(ERROR,
4304  (errcode(ERRCODE_PROTOCOL_VIOLATION),
4305  errmsg_internal("invalid two-phase transaction ID")));
4306 
4307  snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
4308 }
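/*
 * Illustrative sketch (not part of worker.c): the GID built above for a
 * subscription with OID 16394 preparing remote transaction 740 (hypothetical
 * values) is "pg_gid_16394_740"; this is the identifier later used with
 * COMMIT PREPARED / ROLLBACK PREPARED on the subscriber.
 */
#include <stdio.h>

int
main(void)
{
	char		gid[200];

	snprintf(gid, sizeof(gid), "pg_gid_%u_%u", 16394u, 740u);
	printf("%s\n", gid);		/* prints: pg_gid_16394_740 */
	return 0;
}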
4309 
4310 /*
4311  * Execute the initial sync with error handling. Disable the subscription,
4312  * if it's required.
4313  *
4314  * Allocate the slot name in long-lived context on return. Note that we don't
4315  * handle FATAL errors which are probably because of system resource error and
4316  * are not repeatable.
4317  */
4318 static void
4319 start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
4320 {
4321  char *syncslotname = NULL;
4322 
4323  Assert(am_tablesync_worker());
4324 
4325  PG_TRY();
4326  {
4327  /* Call initial sync. */
4328  syncslotname = LogicalRepSyncTableStart(origin_startpos);
4329  }
4330  PG_CATCH();
4331  {
4332  if (MySubscription->disableonerr)
4333  DisableSubscriptionAndExit();
4334  else
4335  {
4336  /*
4337  * Report the worker failed during table synchronization. Abort
4338  * the current transaction so that the stats message is sent in an
4339  * idle state.
4340  */
4341  AbortOutOfAnyTransaction();
4342  pgstat_report_subscription_error(MySubscription->oid, false);
4343 
4344  PG_RE_THROW();
4345  }
4346  }
4347  PG_END_TRY();
4348 
4349  /* allocate slot name in long-lived context */
4350  *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
4351  pfree(syncslotname);
4352 }
4353 
4354 /*
4355  * Run the apply loop with error handling. Disable the subscription,
4356  * if necessary.
4357  *
4358  * Note that we don't handle FATAL errors which are probably because
4359  * of system resource error and are not repeatable.
4360  */
4361 static void
4362 start_apply(XLogRecPtr origin_startpos)
4363 {
4364  PG_TRY();
4365  {
4366  LogicalRepApplyLoop(origin_startpos);
4367  }
4368  PG_CATCH();
4369  {
4370  if (MySubscription->disableonerr)
4371  DisableSubscriptionAndExit();
4372  else
4373  {
4374  /*
4375  * Report the worker failed while applying changes. Abort the
4376  * current transaction so that the stats message is sent in an
4377  * idle state.
4378  */
4379  AbortOutOfAnyTransaction();
4380  pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
4381 
4382  PG_RE_THROW();
4383  }
4384  }
4385  PG_END_TRY();
4386 }
4387 
4388 /*
4389  * Common initialization for leader apply worker and parallel apply worker.
4390  *
4391  * Initialize the database connection, in-memory subscription and necessary
4392  * config options.
4393  */
4394 void
4395 InitializeApplyWorker(void)
4396 {
4397  MemoryContext oldctx;
4398 
4399  /* Run as replica session replication role. */
4400  SetConfigOption("session_replication_role", "replica",
4401  PGC_SUSET, PGC_S_OVERRIDE);
4402 
4403  /* Connect to our database. */
4404  BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
4405  MyLogicalRepWorker->userid,
4406  0);
4407 
4408  /*
4409  * Set always-secure search path, so malicious users can't redirect user
4410  * code (e.g. pg_index.indexprs).
4411  */
4412  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4413 
4414  /* Load the subscription into persistent memory context. */
4415  ApplyContext = AllocSetContextCreate(TopMemoryContext,
4416  "ApplyContext",
4417  ALLOCSET_DEFAULT_SIZES);
4418  StartTransactionCommand();
4419  oldctx = MemoryContextSwitchTo(ApplyContext);
4420 
4421  MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
4422  if (!MySubscription)
4423  {
4424  ereport(LOG,
4425  /* translator: %s is the name of logical replication worker */
4426  (errmsg("%s for subscription %u will not start because the subscription was removed during startup",
4427  get_worker_name(), MyLogicalRepWorker->subid)));
4428 
4429  /* Ensure we remove no-longer-useful entry for worker's start time */
4430  ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4431 
4432  proc_exit(0);
4433  }
4434 
4435  MySubscriptionValid = true;
4436  MemoryContextSwitchTo(oldctx);
4437 
4438  if (!MySubscription->enabled)
4439  {
4440  ereport(LOG,
4441  /* translator: first %s is the name of logical replication worker */
4442  (errmsg("%s for subscription \"%s\" will not start because the subscription was disabled during startup",
4443  get_worker_name(), MySubscription->name)));
4444 
4445  apply_worker_exit();
4446  }
4447 
4448  /* Setup synchronous commit according to the user's wishes */
4449  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4450  PGC_BACKEND, PGC_S_OVERRIDE);
4451 
4452  /* Keep us informed about subscription changes. */
4453  CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4454  subscription_change_cb,
4455  (Datum) 0);
4456 
4457  if (am_tablesync_worker())
4458  ereport(LOG,
4459  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4460  MySubscription->name,
4461  get_rel_name(MyLogicalRepWorker->relid))));
4462  else
4463  ereport(LOG,
4464  /* translator: first %s is the name of logical replication worker */
4465  (errmsg("%s for subscription \"%s\" has started",
4466  get_worker_name(), MySubscription->name)));
4467 
4468  CommitTransactionCommand();
4469 }
4470 
4471 /* Logical Replication Apply worker entry point */
4472 void
4473 ApplyWorkerMain(Datum main_arg)
4474 {
4475  int worker_slot = DatumGetInt32(main_arg);
4476  char originname[NAMEDATALEN];
4477  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4478  char *myslotname = NULL;
4479  WalRcvStreamOptions options;
4480  int server_version;
4481 
4482  /* Attach to slot */
4483  logicalrep_worker_attach(worker_slot);
4484 
4485  /* Setup signal handling */
4486  pqsignal(SIGHUP, SignalHandlerForConfigReload);
4487  pqsignal(SIGTERM, die);
4488  BackgroundWorkerUnblockSignals();
4489 
4490  /*
4491  * We don't currently need any ResourceOwner in a walreceiver process, but
4492  * if we did, we could call CreateAuxProcessResourceOwner here.
4493  */
4494 
4495  /* Initialise stats to a sanish value */
4496  MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
4497  MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
4498 
4499  /* Load the libpq-specific functions */
4500  load_file("libpqwalreceiver", false);
4501 
4502  InitializeApplyWorker();
4503 
4504  /* Connect to the origin and start the replication. */
4505  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4506  MySubscription->conninfo);
4507 
4508  if (am_tablesync_worker())
4509  {
4510  start_table_sync(&origin_startpos, &myslotname);
4511 
4512  ReplicationOriginNameForLogicalRep(MySubscription->oid,
4513  MyLogicalRepWorker->relid,
4514  originname,
4515  sizeof(originname));
4516  set_apply_error_context_origin(originname);
4517  }
4518  else
4519  {
4520  /* This is the leader apply worker */
4521  RepOriginId originid;
4522  TimeLineID startpointTLI;
4523  char *err;
4524 
4525  myslotname = MySubscription->slotname;
4526 
4527  /*
4528  * This shouldn't happen if the subscription is enabled, but guard
4529  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
4530  * crash if slot is NULL.)
4531  */
4532  if (!myslotname)
4533  ereport(ERROR,
4534  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4535  errmsg("subscription has no replication slot set")));
4536 
4537  /* Setup replication origin tracking. */
4538  StartTransactionCommand();
4539  ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
4540  originname, sizeof(originname));
4541  originid = replorigin_by_name(originname, true);
4542  if (!OidIsValid(originid))
4543  originid = replorigin_create(originname);
4544  replorigin_session_setup(originid, 0);
4545  replorigin_session_origin = originid;
4546  origin_startpos = replorigin_session_get_progress(false);
4547  CommitTransactionCommand();
4548 
4549  LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
4550  MySubscription->name, &err);
4551  if (LogRepWorkerWalRcvConn == NULL)
4552  ereport(ERROR,
4553  (errcode(ERRCODE_CONNECTION_FAILURE),
4554  errmsg("could not connect to the publisher: %s", err)));
4555 
4556  /*
4557  * We don't really use the output of identify_system for anything, but it
4558  * does some initializations on the upstream so let's still call it.
4559  */
4560  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4561 
4562  set_apply_error_context_origin(originname);
4563  }
4564 
4565  /*
4566  * Setup callback for syscache so that we know when something changes in
4567  * the subscription relation state.
4568  */
4569  CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4570  invalidate_syncing_table_states,
4571  (Datum) 0);
4572 
4573  /* Build logical replication streaming options. */
4574  options.logical = true;
4575  options.startpoint = origin_startpos;
4576  options.slotname = myslotname;
4577 
4578  server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
4579  options.proto.logical.proto_version =
4580  server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
4581  server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
4582  server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
4583  LOGICALREP_PROTO_VERSION_NUM;
4584 
4585  options.proto.logical.publication_names = MySubscription->publications;
4586  options.proto.logical.binary = MySubscription->binary;
4587 
4588  /*
4589  * Assign the appropriate option value for streaming option according to
4590  * the 'streaming' mode and the publisher's ability to support that mode.
4591  */
4592  if (server_version >= 160000 &&
4593  MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
4594  {
4595  options.proto.logical.streaming_str = "parallel";
4596  MyLogicalRepWorker->parallel_apply = true;
4597  }
4598  else if (server_version >= 140000 &&
4599  MySubscription->stream != LOGICALREP_STREAM_OFF)
4600  {
4601  options.proto.logical.streaming_str = "on";
4602  MyLogicalRepWorker->parallel_apply = false;
4603  }
4604  else
4605  {
4606  options.proto.logical.streaming_str = NULL;
4607  MyLogicalRepWorker->parallel_apply = false;
4608  }
4609 
4610  options.proto.logical.twophase = false;
4611  options.proto.logical.origin = pstrdup(MySubscription->origin);
4612 
4613  if (!am_tablesync_worker())
4614  {
4615  /*
4616  * Even when the two_phase mode is requested by the user, it remains
4617  * as the tri-state PENDING until all tablesyncs have reached READY
4618  state. Only then can it become ENABLED.
4619  *
4620  * Note: If the subscription has no tables then leave the state as
4621  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4622  * work.
4623  */
4624  if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
4625  AllTablesyncsReady())
4626  {
4627  /* Start streaming with two_phase enabled */
4628  options.proto.logical.twophase = true;
4629  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4630 
4631  StartTransactionCommand();
4632  UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
4633  MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
4634  CommitTransactionCommand();
4635  }
4636  else
4637  {
4638  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4639  }
4640 
4641  ereport(DEBUG1,
4642  (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4643  MySubscription->name,
4644  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
4645  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
4646  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
4647  "?")));
4648  }
4649  else
4650  {
4651  /* Start normal logical streaming replication. */
4652  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4653  }
4654 
4655  /* Run the main loop. */
4656  start_apply(origin_startpos);
4657 
4658  proc_exit(0);
4659 }
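/*
 * Illustrative sketch (not part of worker.c): the streaming-option negotiation
 * in ApplyWorkerMain() above reduces to this mapping from the publisher's
 * version and the subscription's streaming setting to the value sent with
 * START_REPLICATION. The enum below is a simplified stand-in for the
 * subscription's streaming modes.
 */
typedef enum ExampleStreamMode
{
	EXAMPLE_STREAM_OFF,
	EXAMPLE_STREAM_ON,
	EXAMPLE_STREAM_PARALLEL
} ExampleStreamMode;

static const char *
example_streaming_option(int server_version, ExampleStreamMode mode)
{
	if (server_version >= 160000 && mode == EXAMPLE_STREAM_PARALLEL)
		return "parallel";		/* publisher can support parallel apply */
	if (server_version >= 140000 && mode != EXAMPLE_STREAM_OFF)
		return "on";			/* stream large transactions, apply serially */
	return NULL;				/* no streaming of in-progress transactions */
}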
4660 
4661 /*
4662  * After error recovery, disable the subscription in a new transaction
4663  * and exit cleanly.
4664  */
4665 static void
4666 DisableSubscriptionAndExit(void)
4667 {
4668  /*
4669  * Emit the error message, and recover from the error state to an idle
4670  * state
4671  */
4672  HOLD_INTERRUPTS();
4673 
4674  EmitErrorReport();
4675  AbortOutOfAnyTransaction();
4676  FlushErrorState();
4677 
4678  RESUME_INTERRUPTS();
4679 
4680  /* Report the worker failed during either table synchronization or apply */
4681  pgstat_report_subscription_error(MyLogicalRepWorker->subid,
4682  !am_tablesync_worker());
4683 
4684  /* Disable the subscription */
4685  StartTransactionCommand();
4686  DisableSubscription(MySubscription->oid);
4687  CommitTransactionCommand();
4688 
4689  /* Ensure we remove no-longer-useful entry for worker's start time */
4692 
4693  /* Notify the subscription has been disabled and exit */
4694  ereport(LOG,
4695  errmsg("subscription \"%s\" has been disabled because of an error",
4696  MySubscription->name));
4697 
4698  proc_exit(0);
4699 }
4700 
4701 /*
4702  * Is current process a logical replication worker?
4703  */
4704 bool
4705 IsLogicalWorker(void)
4706 {
4707  return MyLogicalRepWorker != NULL;
4708 }
4709 
4710 /*
4711  * Is current process a logical replication parallel apply worker?
4712  */
4713 bool
4714 IsLogicalParallelApplyWorker(void)
4715 {
4716  return IsLogicalWorker() && am_parallel_apply_worker();
4717 }
4718 
4719 /*
4720  * Start skipping changes of the transaction if the given LSN matches the
4721  * LSN specified by subscription's skiplsn.
4722  */
4723 static void
4724 maybe_start_skipping_changes(XLogRecPtr finish_lsn)
4725 {
4726  Assert(!is_skipping_changes());
4727  Assert(!in_remote_transaction);
4728  Assert(!in_streamed_transaction);
4729 
4730  /*
4731  * Quick return if it's not requested to skip this transaction. This
4732  * function is called for every remote transaction and we assume that
4733  * skipping the transaction is not used often.
4734  */
4735  if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
4736  MySubscription->skiplsn != finish_lsn))
4737  return;
4738 
4739  /* Start skipping all changes of this transaction */
4740  skip_xact_finish_lsn = finish_lsn;
4741 
4742  ereport(LOG,
4743  errmsg("logical replication starts skipping transaction at LSN %X/%X",
4744  LSN_FORMAT_ARGS(skip_xact_finish_lsn));
4745 }
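/*
 * Illustrative sketch (not part of worker.c): the %X/%X notation used by the
 * messages above prints the upper and lower 32-bit halves of a 64-bit LSN,
 * which is the form users compare against the subscription's skip LSN
 * (e.g. set with ALTER SUBSCRIPTION ... SKIP). The LSN value below is
 * hypothetical.
 */
#include <stdio.h>
#include <stdint.h>

int
main(void)
{
	uint64_t	finish_lsn = 0x16B3748;	/* hypothetical remote finish LSN */

	printf("%X/%X\n",
		   (unsigned int) (finish_lsn >> 32),	/* upper 32 bits */
		   (unsigned int) finish_lsn);	/* lower 32 bits: prints 0/16B3748 */
	return 0;
}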
4746 
4747 /*
4748  * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4749  */
4750 static void
4751 stop_skipping_changes(void)
4752 {
4753  if (!is_skipping_changes())
4754  return;
4755 
4756  ereport(LOG,
4757  (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4758  LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
4759 
4760  /* Stop skipping changes */
4761  skip_xact_finish_lsn = InvalidXLogRecPtr;
4762 }
4763 
4764 /*
4765  * Clear subskiplsn of pg_subscription catalog.
4766  *
4767  * finish_lsn is the transaction's finish LSN that is used to check if the
4768  * subskiplsn matches it. If it does not match, we raise a warning when
4769  * clearing the subskiplsn, in order to inform users of cases where, e.g.,
4770  * the user mistakenly specified the wrong subskiplsn.
4771  */
4772 static void
4773 clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
4774 {
4775  Relation rel;
4776  Form_pg_subscription subform;
4777  HeapTuple tup;
4778  XLogRecPtr myskiplsn = MySubscription->skiplsn;
4779  bool started_tx = false;
4780 
4782  return;
4783 
4784  if (!IsTransactionState())
4785  {
4786  StartTransactionCommand();
4787  started_tx = true;
4788  }
4789 
4790  /*
4791  * Protect subskiplsn of pg_subscription from being concurrently updated
4792  * while clearing it.
4793  */
4794  LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4795  AccessShareLock);
4796 
4797  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4798 
4799  /* Fetch the existing tuple. */
4800  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4801  ObjectIdGetDatum(MySubscription->oid));
4802 
4803  if (!HeapTupleIsValid(tup))
4804  elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4805 
4806  subform = (Form_pg_subscription) GETSTRUCT(tup);
4807 
4808  /*
4809  * Clear the subskiplsn. If the user has already changed subskiplsn before
4810  * clearing it we don't update the catalog and the replication origin
4811  * state won't get advanced. So in the worst case, if the server crashes
4812  * before sending an acknowledgment of the flush position the transaction
4813  * will be sent again and the user needs to set subskiplsn again. We can
4814  * reduce the possibility by logging a replication origin WAL record to
4815  * advance the origin LSN instead but there is no way to advance the
4816  * origin timestamp and it doesn't seem to be worth doing anything about
4817  * it since it's a very rare case.
4818  */
4819  if (subform->subskiplsn == myskiplsn)
4820  {
4821  bool nulls[Natts_pg_subscription];
4822  bool replaces[Natts_pg_subscription];
4823  Datum values[Natts_pg_subscription];
4824 
4825  memset(values, 0, sizeof(values));
4826  memset(nulls, false, sizeof(nulls));
4827  memset(replaces, false, sizeof(replaces));
4828 
4829  /* reset subskiplsn */
4830  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4831  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4832 
4833  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4834  replaces);
4835  CatalogTupleUpdate(rel, &tup->t_self, tup);
4836 
4837  if (myskiplsn != finish_lsn)
4838  ereport(WARNING,
4839  errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4840  errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4841  LSN_FORMAT_ARGS(finish_lsn),
4842  LSN_FORMAT_ARGS(myskiplsn)));
4843  }
4844 
4845  heap_freetuple(tup);
4846  table_close(rel, NoLock);
4847 
4848  if (started_tx)
4849  CommitTransactionCommand();
4850 }
4851 
4852 /* Error callback to give more context info about the change being applied */
4853 void
4854 apply_error_callback(void *arg)
4855 {
4856  ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
4857 
4858  if (apply_error_callback_arg.command == 0)
4859  return;
4860 
4861  Assert(errarg->origin_name);
4862 
4863  if (errarg->rel == NULL)
4864  {
4865  if (!TransactionIdIsValid(errarg->remote_xid))
4866  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4867  errarg->origin_name,
4868  logicalrep_message_type(errarg->command));
4869  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4870  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4871  errarg->origin_name,
4872  logicalrep_message_type(errarg->command),
4873  errarg->remote_xid);
4874  else
4875  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4876  errarg->origin_name,
4877  logicalrep_message_type(errarg->command),
4878  errarg->remote_xid,
4879  LSN_FORMAT_ARGS(errarg->finish_lsn));
4880  }
4881  else
4882  {
4883  if (errarg->remote_attnum < 0)
4884  {
4885  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4886  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4887  errarg->origin_name,
4888  logicalrep_message_type(errarg->command),
4889  errarg->rel->remoterel.nspname,
4890  errarg->rel->remoterel.relname,
4891  errarg->remote_xid);
4892  else
4893  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4894  errarg->origin_name,
4895  logicalrep_message_type(errarg->command),
4896  errarg->rel->remoterel.nspname,
4897  errarg->rel->remoterel.relname,
4898  errarg->remote_xid,
4899  LSN_FORMAT_ARGS(errarg->finish_lsn));
4900  }
4901  else
4902  {
4903  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4904  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4905  errarg->origin_name,
4906  logicalrep_message_type(errarg->command),
4907  errarg->rel->remoterel.nspname,
4908  errarg->rel->remoterel.relname,
4909  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4910  errarg->remote_xid);
4911  else
4912  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4913  errarg->origin_name,
4914  logicalrep_message_type(errarg->command),
4915  errarg->rel->remoterel.nspname,
4916  errarg->rel->remoterel.relname,
4917  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4918  errarg->remote_xid,
4919  LSN_FORMAT_ARGS(errarg->finish_lsn));
4920  }
4921  }
4922 }
4923 
4924 /* Set transaction information of apply error callback */
4925 static inline void
4926 set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
4927 {
4928  apply_error_callback_arg.remote_xid = xid;
4929  apply_error_callback_arg.finish_lsn = lsn;
4930 }
4931 
4932 /* Reset all information of apply error callback */
4933 static inline void
4934 reset_apply_error_context_info(void)
4935 {
4936  apply_error_callback_arg.command = 0;
4937  apply_error_callback_arg.rel = NULL;
4938  apply_error_callback_arg.remote_attnum = -1;
4939  set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
4940 }
4941 
4942 /*
4943  * Request wakeup of the workers for the given subscription OID
4944  * at commit of the current transaction.
4945  *
4946  * This is used to ensure that the workers process assorted changes
4947  * as soon as possible.
4948  */
4949 void
4950 LogicalRepWorkersWakeupAtCommit(Oid subid)
4951 {
4952  MemoryContext oldcxt;
4953 
4954  oldcxt = MemoryContextSwitchTo(TopTransactionContext);
4955  on_commit_wakeup_workers_subids =
4956  list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
4957  MemoryContextSwitchTo(oldcxt);
4958 }
4959 
4960 /*
4961  * Wake up the workers of any subscriptions that were changed in this xact.
4962  */
4963 void
4964 AtEOXact_LogicalRepWorkers(bool isCommit)
4965 {
4966  if (isCommit && on_commit_wakeup_workers_subids != NIL)
4967  {
4968  ListCell *lc;
4969 
4970  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
4971  foreach(lc, on_commit_wakeup_workers_subids)
4972  {
4973  Oid subid = lfirst_oid(lc);
4974  List *workers;
4975  ListCell *lc2;
4976 
4977  workers = logicalrep_workers_find(subid, true);
4978  foreach(lc2, workers)
4979  {
4980  LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
4981 
4981 
4982  logicalrep_worker_wakeup_ptr(worker);
4983  }
4984  }
4985  LWLockRelease(LogicalRepWorkerLock);
4986  }
4987 
4988  /* The List storage will be reclaimed automatically in xact cleanup. */
4989  on_commit_wakeup_workers_subids = NIL;
4990 }
4991 
4992 /*
4993  * Allocate the origin name in long-lived context for error context message.
4994  */
4995 void
4996 set_apply_error_context_origin(char *originname)
4997 {
4998  apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
4999  originname);
5000 }
5001 
5002 /*
5003  * Return the action to be taken for the given transaction. See
5004  * TransApplyAction for information on each of the actions.
5005  *
5006  * *winfo is assigned to the destination parallel worker info when the leader
5007  * apply worker has to pass all the transaction's changes to the parallel
5008  * apply worker.
5009  */
5010 static TransApplyAction
5011 get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
5012 {
5013  *winfo = NULL;
5014 
5015  if (am_parallel_apply_worker())
5016  {
5017  return TRANS_PARALLEL_APPLY;
5018  }
5019 
5020  /*
5021  * If we are processing this transaction using a parallel apply worker, we
5022  * either send the changes to the parallel worker or, if the worker is busy,
5023  * serialize the changes to a file which will later be processed by the
5024  * parallel worker.
5025  */
5026  *winfo = pa_find_worker(xid);
5027 
5028  if (*winfo && (*winfo)->serialize_changes)
5029  {
5030  return TRANS_LEADER_PARTIAL_SERIALIZE;
5031  }
5032  else if (*winfo)
5033  {
5034  return TRANS_LEADER_SEND_TO_PARALLEL;
5035  }
5036 
5037  /*
5038  * If no parallel worker is involved in processing this transaction, we
5039  * either apply the change directly or serialize it to a file which will
5040  * later be applied when the transaction finish message is processed.
5041  */
5042  else if (in_streamed_transaction)
5043  {
5044  return TRANS_LEADER_SERIALIZE;
5045  }
5046  else
5047  {
5048  return TRANS_LEADER_APPLY;
5049  }
5050 }
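/*
 * Illustrative sketch (not part of worker.c): the decision above collapses to
 * the following priority order, written here with plain booleans instead of
 * the worker-state checks used by the real function.
 */
typedef enum ExampleApplyAction
{
	EXAMPLE_PARALLEL_APPLY,				/* this process is a parallel apply worker */
	EXAMPLE_LEADER_PARTIAL_SERIALIZE,	/* parallel worker assigned but busy */
	EXAMPLE_LEADER_SEND_TO_PARALLEL,	/* parallel worker assigned and available */
	EXAMPLE_LEADER_SERIALIZE,			/* streamed xact, no parallel worker */
	EXAMPLE_LEADER_APPLY				/* ordinary xact, apply directly */
} ExampleApplyAction;

static ExampleApplyAction
example_apply_action(int am_parallel_worker, int have_parallel_worker,
					 int parallel_worker_busy, int in_streamed_xact)
{
	if (am_parallel_worker)
		return EXAMPLE_PARALLEL_APPLY;
	if (have_parallel_worker && parallel_worker_busy)
		return EXAMPLE_LEADER_PARTIAL_SERIALIZE;
	if (have_parallel_worker)
		return EXAMPLE_LEADER_SEND_TO_PARALLEL;
	if (in_streamed_xact)
		return EXAMPLE_LEADER_SERIALIZE;
	return EXAMPLE_LEADER_APPLY;
}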