1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2024, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from the remote logical replication stream.
13  *
14  * The main worker (apply) is started by the logical replication worker
15  * launcher for every enabled subscription in a database. It uses the
16  * walsender protocol to communicate with the publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *
22  * STREAMED TRANSACTIONS
23  * ---------------------
24  * Streamed transactions (large transactions exceeding a memory limit on the
25  * upstream) are applied using one of two approaches:
26  *
27  * 1) Write to temporary files and apply when the final commit arrives
28  *
29  * This approach is used when the user has set the subscription's streaming
30  * option as on.
31  *
32  * Unlike the regular (non-streamed) case, handling streamed transactions has
33  * to handle aborts of both the toplevel transaction and subtransactions. This
34  * is achieved by tracking offsets for subtransactions, which is then used
35  * to truncate the file with serialized changes.
36  *
37  * The files are placed in tmp file directory by default, and the filenames
38  * include both the XID of the toplevel transaction and OID of the
39  * subscription. This is necessary so that different workers processing a
40  * remote transaction with the same XID don't interfere.
41  *
42  * We use BufFiles instead of normal temporary files because the BufFile
43  * infrastructure (a) supports temporary files that exceed the OS file size
44  * limit, (b) provides automatic cleanup on error, and (c) lets these files
45  * survive across local transactions so that they can be opened and closed
46  * at stream start and stop. We decided to use the FileSet
47  * infrastructure as without that it deletes the files on the closure of the
48  * file and if we decide to keep stream files open across the start/stop stream
49  * then it will consume a lot of memory (more than 8K for each BufFile and
50  * there could be multiple such BufFiles as the subscriber could receive
51  * multiple start/stop streams for different transactions before getting the
52  * commit). Moreover, if we don't use FileSet then we also need to invent
53  * a new way to pass filenames to BufFile APIs so that we are allowed to open
54  * the file we desired across multiple stream-open calls for the same
55  * transaction.
56  *
57  * 2) Parallel apply workers.
58  *
59  * This approach is used when the user has set the subscription's streaming
60  * option as parallel. See logical/applyparallelworker.c for information about
61  * this approach.
62  *
63  * TWO_PHASE TRANSACTIONS
64  * ----------------------
65  * Two phase transactions are replayed at prepare and then committed or
66  * rolled back at commit prepared and rollback prepared respectively. It is
67  * possible to have a prepared transaction that arrives at the apply worker
68  * when the tablesync is busy doing the initial copy. In this case, the apply
69  * worker skips all the prepared operations [e.g. inserts] while the tablesync
70  * is still busy (see the condition of should_apply_changes_for_rel). The
71  * tablesync worker might not get such a prepared transaction because say it
72  * was prior to the initial consistent point but might have got some later
73  * commits. Now, the tablesync worker will exit without doing anything for the
74  * prepared transaction skipped by the apply worker as the sync location for it
75  * will be already ahead of the apply worker's current location. This would lead
76  * to an "empty prepare", because later when the apply worker does the commit
77  * prepare, there is nothing in it (the inserts were skipped earlier).
78  *
79  * To avoid this and similar prepare confusions, the subscription's two_phase
80  * commit is enabled only after the initial sync is over. The two_phase option
81  * has been implemented as a tri-state with values DISABLED, PENDING, and
82  * ENABLED.
83  *
84  * Even if the user specifies they want a subscription with two_phase = on,
85  * internally it will start with a tri-state of PENDING which only becomes
86  * ENABLED after all tablesync initializations are completed - i.e. when all
87  * tablesync workers have reached their READY state. In other words, the value
88  * PENDING is only a temporary state for subscription start-up.
89  *
90  * Until the two_phase is properly available (ENABLED) the subscription will
91  * behave as if two_phase = off. When the apply worker detects that all
92  * tablesyncs have become READY (while the tri-state was PENDING) it will
93  * restart the apply worker process. This happens in
94  * process_syncing_tables_for_apply.
95  *
96  * When the (re-started) apply worker finds that all tablesyncs are READY for a
97  * two_phase tri-state of PENDING it starts streaming messages with the
98  * two_phase option which in turn enables the decoding of two-phase commits at
99  * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100  * Now, it is possible that during the time we have not enabled two_phase, the
101  * publisher (replication server) would have skipped some prepares but we
102  * ensure that such prepares are sent along with commit prepare, see
103  * ReorderBufferFinishPrepared.
104  *
105  * If the subscription has no tables then a two_phase tri-state PENDING is
106  * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107  * PUBLICATION which might otherwise be disallowed (see below).
108  *
109  * If ever a user needs to be aware of the tri-state value, they can fetch it
110  * from the pg_subscription catalog (see column subtwophasestate).
111  *
112  * We don't allow toggling the two_phase option of a subscription because it can
113  * lead to an inconsistent replica. Consider, initially, it was on and we have
114  * received some prepare then we turn it off, now at commit time the server
115  * will send the entire transaction data along with the commit. With some more
116  * analysis, we can allow changing this option from off to on but not sure if
117  * that alone would be useful.
118  *
119  * Finally, to avoid problems mentioned in previous paragraphs from any
120  * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
121  * to 'off' and then again back to 'on') there is a restriction for
122  * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
123  * the two_phase tri-state is ENABLED, except when copy_data = false.
124  *
125  * We can get prepare of the same GID more than once for the genuine cases
126  * where we have defined multiple subscriptions for publications on the same
127  * server and prepared transaction has operations on tables subscribed to those
128  * subscriptions. For such cases, if we use the GID sent by publisher one of
129  * the prepares will be successful and others will fail, in which case the
130  * server will send them again. Now, this can lead to a deadlock if the user has
131  * set synchronous_standby_names for all the subscriptions on the subscriber. To
132  * avoid such deadlocks, we generate a unique GID (consisting of the
133  * subscription oid and the xid of the prepared transaction) for each prepare
134  * transaction on the subscriber.
135  *
136  * FAILOVER
137  * --------
138  * The logical slot on the primary can be synced to the standby by specifying
139  * failover = true when creating the subscription. Enabling failover allows us
140  * to smoothly transition to the promoted standby, ensuring that we can
141  * subscribe to the new primary without losing any data.
142  *-------------------------------------------------------------------------
143  */
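
/*
 * Illustrative sketch, not part of worker.c: the two_phase tri-state and the
 * subscriber-side GID described in the header comment can be modelled as
 * below. All names prefixed with "sketch_" or "Sketch" are hypothetical, and
 * the GID prefix is a placeholder, not the string the server actually uses.
 * The sketch assumes only what the comment states: a requested two_phase
 * starts as PENDING, becomes ENABLED once all tablesyncs are READY, stays
 * PENDING when the subscription has no tables, and the local GID combines
 * the subscription OID with the remote XID.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the tri-state kept in subtwophasestate. */
typedef enum
{
	SKETCH_TWOPHASE_DISABLED,
	SKETCH_TWOPHASE_PENDING,
	SKETCH_TWOPHASE_ENABLED
} SketchTwoPhaseState;

/* State a new subscription starts in, given the user's two_phase option. */
static SketchTwoPhaseState
sketch_initial_state(bool two_phase_requested)
{
	return two_phase_requested ? SKETCH_TWOPHASE_PENDING
		: SKETCH_TWOPHASE_DISABLED;
}

/*
 * State after the apply worker has inspected its tablesyncs. PENDING is
 * promoted only when there is at least one table and every table is READY;
 * a subscription without tables is left PENDING.
 */
static SketchTwoPhaseState
sketch_next_state(SketchTwoPhaseState cur, int ntables, int nready)
{
	if (cur == SKETCH_TWOPHASE_PENDING && ntables > 0 && nready == ntables)
		return SKETCH_TWOPHASE_ENABLED;
	return cur;
}

/*
 * Build a subscriber-local GID from the subscription OID and the remote XID,
 * so that two subscriptions replaying the same remote prepare never collide.
 * The "sketch_gid" prefix is an assumption made for this example.
 */
static void
sketch_make_gid(unsigned int subid, unsigned int xid, char *gid, size_t szgid)
{
	snprintf(gid, szgid, "sketch_gid_%u_%u", subid, xid);
}
```

/*
 * With this model, two subscriptions (say OIDs 16384 and 16385) that both
 * receive the prepare of remote XID 731 produce different local GIDs, which
 * is the property the header comment relies on to avoid the deadlock.
 */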
144 
145 #include "postgres.h"
146 
147 #include <sys/stat.h>
148 #include <unistd.h>
149 
150 #include "access/table.h"
151 #include "access/tableam.h"
152 #include "access/twophase.h"
153 #include "access/xact.h"
154 #include "catalog/indexing.h"
155 #include "catalog/pg_inherits.h"
156 #include "catalog/pg_subscription.h"
158 #include "commands/tablecmds.h"
159 #include "commands/trigger.h"
160 #include "executor/executor.h"
161 #include "executor/execPartition.h"
162 #include "libpq/pqformat.h"
163 #include "miscadmin.h"
164 #include "optimizer/optimizer.h"
165 #include "parser/parse_relation.h"
166 #include "pgstat.h"
167 #include "postmaster/bgworker.h"
168 #include "postmaster/interrupt.h"
169 #include "postmaster/walwriter.h"
174 #include "replication/origin.h"
175 #include "replication/walreceiver.h"
177 #include "rewrite/rewriteHandler.h"
178 #include "storage/buffile.h"
179 #include "storage/ipc.h"
180 #include "storage/lmgr.h"
181 #include "tcop/tcopprot.h"
182 #include "utils/acl.h"
183 #include "utils/dynahash.h"
184 #include "utils/guc.h"
185 #include "utils/inval.h"
186 #include "utils/lsyscache.h"
187 #include "utils/memutils.h"
188 #include "utils/pg_lsn.h"
189 #include "utils/rel.h"
190 #include "utils/rls.h"
191 #include "utils/snapmgr.h"
192 #include "utils/syscache.h"
193 #include "utils/usercontext.h"
194 
195 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
196 
197 typedef struct FlushPosition
198 {
203 
205 
206 typedef struct ApplyExecutionData
207 {
208  EState *estate; /* executor state, used to track resources */
209 
210  LogicalRepRelMapEntry *targetRel; /* replication target rel */
211  ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
212 
213  /* These fields are used when the target relation is partitioned: */
214  ModifyTableState *mtstate; /* dummy ModifyTable state */
215  PartitionTupleRouting *proute; /* partition routing info */
217 
218 /* Struct for saving and restoring apply errcontext information */
219 typedef struct ApplyErrorCallbackArg
220 {
221  LogicalRepMsgType command; /* 0 if invalid */
223 
224  /* Remote node information */
225  int remote_attnum; /* -1 if invalid */
228  char *origin_name;
230 
231 /*
232  * The action to be taken for the changes in the transaction.
233  *
234  * TRANS_LEADER_APPLY:
235  * This action means that we are in the leader apply worker or table sync
236  * worker. The changes of the transaction are either directly applied or
237  * are read from temporary files (for streaming transactions) and then
238  * applied by the worker.
239  *
240  * TRANS_LEADER_SERIALIZE:
241  * This action means that we are in the leader apply worker or table sync
242  * worker. Changes are written to temporary files and then applied when the
243  * final commit arrives.
244  *
245  * TRANS_LEADER_SEND_TO_PARALLEL:
246  * This action means that we are in the leader apply worker and need to send
247  * the changes to the parallel apply worker.
248  *
249  * TRANS_LEADER_PARTIAL_SERIALIZE:
250  * This action means that we are in the leader apply worker and have sent some
251  * changes directly to the parallel apply worker and the remaining changes are
252  * serialized to a file, due to timeout while sending data. The parallel apply
253  * worker will apply these serialized changes when the final commit arrives.
254  *
255  * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
256  * serializing changes, the leader worker also needs to serialize the
257  * STREAM_XXX message to a file, and wait for the parallel apply worker to
258  * finish the transaction when processing the transaction finish command. So
259  * this new action was introduced to keep the code and logic clear.
260  *
261  * TRANS_PARALLEL_APPLY:
262  * This action means that we are in the parallel apply worker and changes of
263  * the transaction are applied directly by the worker.
264  */
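265 typedef enum
266 {
267  /* The action for non-streaming transactions. */
268  TRANS_LEADER_APPLY,
269 
270  /* Actions for streaming transactions. */
271  TRANS_LEADER_SERIALIZE,
272  TRANS_LEADER_SEND_TO_PARALLEL,
273  TRANS_LEADER_PARTIAL_SERIALIZE,
274  TRANS_PARALLEL_APPLY,
275 } TransApplyAction;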
276 
277 /* errcontext tracker */
279 {
280  .command = 0,
281  .rel = NULL,
282  .remote_attnum = -1,
283  .remote_xid = InvalidTransactionId,
284  .finish_lsn = InvalidXLogRecPtr,
285  .origin_name = NULL,
286 };
287 
289 
292 
293 /* per stream context for streaming transactions */
295 
297 
299 static bool MySubscriptionValid = false;
300 
302 
305 
306 /* fields valid only when processing streamed transaction */
307 static bool in_streamed_transaction = false;
308 
310 
311 /*
312  * The number of changes applied by parallel apply worker during one streaming
313  * block.
314  */
316 
317 /* Are we initializing an apply worker? */
319 
320 /*
321  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
322  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
323  * Once we start skipping changes, we don't stop it until we skip all changes of
324  * the transaction even if pg_subscription is updated and MySubscription->skiplsn
325  * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
326  * we don't skip receiving and spooling the changes since we decide whether or not
327  * to skip applying the changes when starting to apply changes. The subskiplsn is
328  * cleared after successfully skipping the transaction or applying non-empty
329  * transaction. The latter prevents the mistakenly specified subskiplsn from
330  * being left. Note that we cannot skip the streaming transactions when using
331  * parallel apply workers because we cannot get the finish LSN before applying
332  * the changes. So, we don't start parallel apply worker when finish LSN is set
333  * by the user.
334  */
336 #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
337 
338 /* BufFile handle of the current streaming file */
339 static BufFile *stream_fd = NULL;
340 
341 typedef struct SubXactInfo
342 {
343  TransactionId xid; /* XID of the subxact */
344  int fileno; /* file number in the buffile */
345  off_t offset; /* offset in the file */
347 
348 /* Sub-transaction data for the current streaming transaction */
349 typedef struct ApplySubXactData
350 {
351  uint32 nsubxacts; /* number of sub-transactions */
352  uint32 nsubxacts_max; /* current capacity of subxacts */
353  TransactionId subxact_last; /* xid of the last sub-transaction */
354  SubXactInfo *subxacts; /* sub-xact offset in changes file */
356 
358 
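
/*
 * Illustrative sketch, not part of worker.c: how per-subtransaction offsets
 * allow a streamed subtransaction abort to be handled by truncating the
 * serialized changes. The real code tracks a BufFile fileno/offset pair in
 * SubXactInfo; this sketch assumes a single flat byte counter instead, and
 * every "sketch_" / "Sketch" name is hypothetical.
 */

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_MAX_SUBXACTS 8

typedef struct
{
	unsigned int xid;		/* XID of the subxact */
	size_t		offset;		/* where its first change starts */
} SketchSubXact;

typedef struct
{
	size_t		nbytes;		/* current length of the changes "file" */
	int			nsubxacts;	/* number of tracked subxacts */
	SketchSubXact subxacts[SKETCH_MAX_SUBXACTS];
} SketchStream;

/*
 * Append a change of "len" bytes belonging to "xid". The first time an XID
 * is seen, remember the offset at which its changes begin. (If the fixed
 * array is full the XID is simply not tracked; the real code grows its
 * array instead.)
 */
static void
sketch_write_change(SketchStream *st, unsigned int xid, size_t len)
{
	int			i;

	for (i = 0; i < st->nsubxacts; i++)
		if (st->subxacts[i].xid == xid)
			break;

	if (i == st->nsubxacts && st->nsubxacts < SKETCH_MAX_SUBXACTS)
	{
		st->subxacts[i].xid = xid;
		st->subxacts[i].offset = st->nbytes;
		st->nsubxacts++;
	}

	st->nbytes += len;
}

/*
 * Abort "xid": truncate back to the offset where it started and forget it
 * together with every subxact that began after it.
 */
static void
sketch_abort_subxact(SketchStream *st, unsigned int xid)
{
	int			i;

	for (i = 0; i < st->nsubxacts; i++)
	{
		if (st->subxacts[i].xid == xid)
		{
			st->nbytes = st->subxacts[i].offset;
			st->nsubxacts = i;
			return;
		}
	}
}
```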
359 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
360 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
361 
362 /*
363  * Information about subtransactions of a given toplevel transaction.
364  */
365 static void subxact_info_write(Oid subid, TransactionId xid);
366 static void subxact_info_read(Oid subid, TransactionId xid);
367 static void subxact_info_add(TransactionId xid);
368 static inline void cleanup_subxact_info(void);
369 
370 /*
371  * Serialize and deserialize changes for a toplevel transaction.
372  */
373 static void stream_open_file(Oid subid, TransactionId xid,
374  bool first_segment);
375 static void stream_write_change(char action, StringInfo s);
377 static void stream_close_file(void);
378 
379 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
380 
381 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
383  ResultRelInfo *relinfo,
384  TupleTableSlot *remoteslot);
386  ResultRelInfo *relinfo,
387  TupleTableSlot *remoteslot,
388  LogicalRepTupleData *newtup,
389  Oid localindexoid);
391  ResultRelInfo *relinfo,
392  TupleTableSlot *remoteslot,
393  Oid localindexoid);
394 static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
395  LogicalRepRelation *remoterel,
396  Oid localidxoid,
397  TupleTableSlot *remoteslot,
398  TupleTableSlot **localslot);
400  TupleTableSlot *remoteslot,
401  LogicalRepTupleData *newtup,
402  CmdType operation);
403 
404 /* Functions for skipping changes */
405 static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
406 static void stop_skipping_changes(void);
407 static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
408 
409 /* Functions for apply error callback */
410 static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
411 static inline void reset_apply_error_context_info(void);
412 
414  ParallelApplyWorkerInfo **winfo);
415 
416 /*
417  * Form the origin name for the subscription.
418  *
419  * This is a common function for tablesync and other workers. Tablesync workers
420  * must pass a valid relid. Other callers must pass relid = InvalidOid.
421  *
422  * Return the name in the supplied buffer.
423  */
424 void
425 ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
426  char *originname, Size szoriginname)
427 {
428  if (OidIsValid(relid))
429  {
430  /* Replication origin name for tablesync workers. */
431  snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
432  }
433  else
434  {
435  /* Replication origin name for non-tablesync workers. */
436  snprintf(originname, szoriginname, "pg_%u", suboid);
437  }
438 }
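
/*
 * Illustrative sketch, not part of worker.c: a standalone copy of the naming
 * rule above, so the two output shapes can be seen without a server build.
 * SketchOid, SKETCH_INVALID_OID and sketch_origin_name are hypothetical
 * stand-ins for Oid, InvalidOid and the function above.
 */

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef unsigned int SketchOid;

#define SKETCH_INVALID_OID ((SketchOid) 0)

/* Same branching as above: tablesync workers pass a valid relid. */
static void
sketch_origin_name(SketchOid suboid, SketchOid relid,
				   char *originname, size_t szoriginname)
{
	if (relid != SKETCH_INVALID_OID)
		snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
	else
		snprintf(originname, szoriginname, "pg_%u", suboid);
}
```

/*
 * For subscription OID 16384, the apply worker's origin is named "pg_16384",
 * while a tablesync worker for relation 16390 uses "pg_16384_16390".
 */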
439 
440 /*
441  * Should this worker apply changes for given relation.
442  *
443  * This is mainly needed for initial relation data sync as that runs in
444  * separate worker process running in parallel and we need some way to skip
445  * changes coming to the leader apply worker during the sync of a table.
446  *
447  * Note we need to do a less-than-or-equal comparison for SYNCDONE state because
448  * it might hold position of end of initial slot consistent point WAL
449  * record + 1 (ie start of next record) and next record can be COMMIT of
450  * transaction we are now processing (which is what we set remote_final_lsn
451  * to in apply_handle_begin).
452  *
453  * Note that for streaming transactions that are being applied in the parallel
454  * apply worker, we disallow applying changes if the target table in the
455  * subscription is not in the READY state, because we cannot decide whether to
456  * apply the change as we won't know remote_final_lsn by that time.
457  *
458  * We already checked this in pa_can_start() before assigning the
459  * streaming transaction to the parallel worker, but it also needs to be
460  * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
461  * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
462  * while applying this transaction.
463  */
464 static bool
465 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
466 {
467  switch (MyLogicalRepWorker->type)
468  {
469  case WORKERTYPE_TABLESYNC:
470  return MyLogicalRepWorker->relid == rel->localreloid;
471 
472  case WORKERTYPE_PARALLEL_APPLY:
473  /* We don't synchronize rel's that are in unknown state. */
474  if (rel->state != SUBREL_STATE_READY &&
475  rel->state != SUBREL_STATE_UNKNOWN)
476  ereport(ERROR,
477  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
478  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
479  MySubscription->name),
480  errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
481 
482  return rel->state == SUBREL_STATE_READY;
483 
484  case WORKERTYPE_APPLY:
485  return (rel->state == SUBREL_STATE_READY ||
486  (rel->state == SUBREL_STATE_SYNCDONE &&
487  rel->statelsn <= remote_final_lsn));
488 
489  case WORKERTYPE_UNKNOWN:
490  /* Should never happen. */
491  elog(ERROR, "Unknown worker type");
492  }
493 
494  return false; /* dummy for compiler */
495 }
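
/*
 * Illustrative sketch, not part of worker.c: the leader apply worker's rule
 * from the function above, reduced to plain values so the SYNCDONE boundary
 * case is easy to check. SketchRelState and sketch_leader_should_apply are
 * hypothetical names, and LSNs are modelled as plain 64-bit integers.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum
{
	SKETCH_REL_INIT,
	SKETCH_REL_SYNCDONE,
	SKETCH_REL_READY
} SketchRelState;

/*
 * Apply the change if the table is READY, or if it is SYNCDONE and the sync
 * ended at or before the commit position of the transaction being applied.
 */
static bool
sketch_leader_should_apply(SketchRelState state, uint64_t statelsn,
						   uint64_t remote_final_lsn)
{
	return state == SKETCH_REL_READY ||
		(state == SKETCH_REL_SYNCDONE && statelsn <= remote_final_lsn);
}
```

/*
 * The equality case matters: a table whose sync stopped exactly at the
 * position of the current transaction's COMMIT must still receive that
 * transaction's changes from the leader.
 */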
496 
497 /*
498  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
499  *
500  * Start a transaction, if this is the first step (else we keep using the
501  * existing transaction).
502  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
503  */
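504 static void
505 begin_replication_step(void)
506 {
507  SetCurrentStatementStartTimestamp();
508 
509  if (!IsTransactionState())
510  {
511  StartTransactionCommand();
512  maybe_reread_subscription();
513  }
514 
515  PushActiveSnapshot(GetTransactionSnapshot());
516 
517  MemoryContextSwitchTo(ApplyMessageContext);
518 }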
519 
520 /*
521  * Finish up one step of a replication transaction.
522  * Callers of begin_replication_step() must also call this.
523  *
524  * We don't close out the transaction here, but we should increment
525  * the command counter to make the effects of this step visible.
526  */
527 static void
528 end_replication_step(void)
529 {
530  PopActiveSnapshot();
531 
532  CommandCounterIncrement();
533 }
534 
535 /*
536  * Handle streamed transactions for both the leader apply worker and the
537  * parallel apply workers.
538  *
539  * In the streaming case (receiving a block of the streamed transaction), for
540  * serialize mode, simply redirect it to a file for the proper toplevel
541  * transaction, and for parallel mode, the leader apply worker will send the
542  * changes to parallel apply workers and the parallel apply worker will define
543  * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
544  * messages will be applied by both leader apply worker and parallel apply
545  * workers).
546  *
547  * Returns true for streamed transactions (when the change is either serialized
548  * to file or sent to parallel apply worker), false otherwise (regular mode or
549  * needs to be processed by parallel apply worker).
550  *
551  * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
552  * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
553  * to a parallel apply worker.
554  */
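555 static bool
556 handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
557 {
558  TransactionId current_xid;
559  ParallelApplyWorkerInfo *winfo;
560  TransApplyAction apply_action;
561  StringInfoData original_msg;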
562 
563  apply_action = get_transaction_apply_action(stream_xid, &winfo);
564 
565  /* not in streaming mode */
566  if (apply_action == TRANS_LEADER_APPLY)
567  return false;
568 
570 
571  /*
572  * The parallel apply worker needs the xid in this message to decide
573  * whether to define a savepoint, so save the original message that has
574  * not moved the cursor after the xid. We will serialize this message to a
575  * file in PARTIAL_SERIALIZE mode.
576  */
577  original_msg = *s;
578 
579  /*
580  * We should have received XID of the subxact as the first part of the
581  * message, so extract it.
582  */
583  current_xid = pq_getmsgint(s, 4);
584 
585  if (!TransactionIdIsValid(current_xid))
586  ereport(ERROR,
587  (errcode(ERRCODE_PROTOCOL_VIOLATION),
588  errmsg_internal("invalid transaction ID in streamed replication transaction")));
589 
590  switch (apply_action)
591  {
592  case TRANS_LEADER_SERIALIZE:
593  Assert(stream_fd);
594 
595  /* Add the new subxact to the array (unless already there). */
596  subxact_info_add(current_xid);
597 
598  /* Write the change to the current file */
599  stream_write_change(action, s);
600  return true;
601 
602  case TRANS_LEADER_SEND_TO_PARALLEL:
603  Assert(winfo);
604 
605  /*
606  * XXX The publisher side doesn't always send relation/type update
607  * messages after the streaming transaction, so also update the
608  * relation/type in leader apply worker. See function
609  * cleanup_rel_sync_cache.
610  */
611  if (pa_send_data(winfo, s->len, s->data))
612  return (action != LOGICAL_REP_MSG_RELATION &&
613  action != LOGICAL_REP_MSG_TYPE);
614 
615  /*
616  * Switch to serialize mode when we are not able to send the
617  * change to parallel apply worker.
618  */
619  pa_switch_to_partial_serialize(winfo, false);
620 
621  /* fall through */
622  case TRANS_LEADER_PARTIAL_SERIALIZE:
623  stream_write_change(action, &original_msg);
624 
625  /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
626  return (action != LOGICAL_REP_MSG_RELATION &&
627  action != LOGICAL_REP_MSG_TYPE);
628 
629  case TRANS_PARALLEL_APPLY:
630  parallel_stream_nchanges += 1;
631 
632  /* Define a savepoint for a subxact if needed. */
633  pa_start_subtrans(current_xid, stream_xid);
634  return false;
635 
636  default:
637  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
638  return false; /* silence compiler warning */
639  }
640 }
641 
642 /*
643  * Executor state preparation for evaluation of constraint expressions,
644  * indexes and triggers for the specified relation.
645  *
646  * Note that the caller must open and close any indexes to be updated.
647  */
648 static ApplyExecutionData *
649 create_edata_for_relation(LogicalRepRelMapEntry *rel)
650 {
651  ApplyExecutionData *edata;
652  EState *estate;
653  RangeTblEntry *rte;
654  List *perminfos = NIL;
655  ResultRelInfo *resultRelInfo;
656 
657  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
658  edata->targetRel = rel;
659 
660  edata->estate = estate = CreateExecutorState();
661 
662  rte = makeNode(RangeTblEntry);
663  rte->rtekind = RTE_RELATION;
664  rte->relid = RelationGetRelid(rel->localrel);
665  rte->relkind = rel->localrel->rd_rel->relkind;
666  rte->rellockmode = AccessShareLock;
667 
668  addRTEPermissionInfo(&perminfos, rte);
669 
670  ExecInitRangeTable(estate, list_make1(rte), perminfos);
671 
672  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
673 
674  /*
675  * Use Relation opened by logicalrep_rel_open() instead of opening it
676  * again.
677  */
678  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
679 
680  /*
681  * We put the ResultRelInfo in the es_opened_result_relations list, even
682  * though we don't populate the es_result_relations array. That's a bit
683  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
684  *
685  * ExecOpenIndices() is not called here either, each execution path doing
686  * an apply operation being responsible for that.
687  */
688  estate->es_opened_result_relations =
689  lappend(estate->es_opened_result_relations, resultRelInfo);
690 
691  estate->es_output_cid = GetCurrentCommandId(true);
692 
693  /* Prepare to catch AFTER triggers. */
694  AfterTriggerBeginQuery();
695 
696  /* other fields of edata remain NULL for now */
697 
698  return edata;
699 }
700 
701 /*
702  * Finish any operations related to the executor state created by
703  * create_edata_for_relation().
704  */
705 static void
706 finish_edata(ApplyExecutionData *edata)
707 {
708  EState *estate = edata->estate;
709 
710  /* Handle any queued AFTER triggers. */
711  AfterTriggerEndQuery(estate);
712 
713  /* Shut down tuple routing, if any was done. */
714  if (edata->proute)
715  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
716 
717  /*
718  * Cleanup. It might seem that we should call ExecCloseResultRelations()
719  * here, but we intentionally don't. It would close the rel we added to
720  * es_opened_result_relations above, which is wrong because we took no
721  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
722  * any other relations opened during execution.
723  */
724  ExecResetTupleTable(estate->es_tupleTable, false);
725  FreeExecutorState(estate);
726  pfree(edata);
727 }
728 
729 /*
730  * Evaluates default expressions for local columns that have no matching
731  * remote relation column.
732  *
733  * This allows us to support tables which have more columns on the downstream
734  * than on the upstream.
735  */
736 static void
737 slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
738  TupleTableSlot *slot)
739 {
740  TupleDesc desc = RelationGetDescr(rel->localrel);
741  int num_phys_attrs = desc->natts;
742  int i;
743  int attnum,
744  num_defaults = 0;
745  int *defmap;
746  ExprState **defexprs;
747  ExprContext *econtext;
748 
749  econtext = GetPerTupleExprContext(estate);
750 
751  /* We got all the data via replication, no need to evaluate anything. */
752  if (num_phys_attrs == rel->remoterel.natts)
753  return;
754 
755  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
756  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
757 
758  Assert(rel->attrmap->maplen == num_phys_attrs);
759  for (attnum = 0; attnum < num_phys_attrs; attnum++)
760  {
761  Expr *defexpr;
762 
763  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
764  continue;
765 
766  if (rel->attrmap->attnums[attnum] >= 0)
767  continue;
768 
769  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
770 
771  if (defexpr != NULL)
772  {
773  /* Run the expression through planner */
774  defexpr = expression_planner(defexpr);
775 
776  /* Initialize executable expression in copycontext */
777  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
778  defmap[num_defaults] = attnum;
779  num_defaults++;
780  }
781  }
782 
783  for (i = 0; i < num_defaults; i++)
784  slot->tts_values[defmap[i]] =
785  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
786 }
787 
788 /*
789  * Store tuple data into slot.
790  *
791  * Incoming data can be either text or binary format.
792  */
793 static void
794 slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
795  LogicalRepTupleData *tupleData)
796 {
797  int natts = slot->tts_tupleDescriptor->natts;
798  int i;
799 
800  ExecClearTuple(slot);
801 
802  /* Call the "in" function for each non-dropped, non-null attribute */
803  Assert(natts == rel->attrmap->maplen);
804  for (i = 0; i < natts; i++)
805  {
806  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
807  int remoteattnum = rel->attrmap->attnums[i];
808 
809  if (!att->attisdropped && remoteattnum >= 0)
810  {
811  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
812 
813  Assert(remoteattnum < tupleData->ncols);
814 
815  /* Set attnum for error callback */
817 
818  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
819  {
820  Oid typinput;
821  Oid typioparam;
822 
823  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
824  slot->tts_values[i] =
825  OidInputFunctionCall(typinput, colvalue->data,
826  typioparam, att->atttypmod);
827  slot->tts_isnull[i] = false;
828  }
829  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
830  {
831  Oid typreceive;
832  Oid typioparam;
833 
834  /*
835  * In some code paths we may be asked to re-parse the same
836  * tuple data. Reset the StringInfo's cursor so that works.
837  */
838  colvalue->cursor = 0;
839 
840  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
841  slot->tts_values[i] =
842  OidReceiveFunctionCall(typreceive, colvalue,
843  typioparam, att->atttypmod);
844 
845  /* Trouble if it didn't eat the whole buffer */
846  if (colvalue->cursor != colvalue->len)
847  ereport(ERROR,
848  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
849  errmsg("incorrect binary data format in logical replication column %d",
850  remoteattnum + 1)));
851  slot->tts_isnull[i] = false;
852  }
853  else
854  {
855  /*
856  * NULL value from remote. (We don't expect to see
857  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
858  * NULL.)
859  */
860  slot->tts_values[i] = (Datum) 0;
861  slot->tts_isnull[i] = true;
862  }
863 
864  /* Reset attnum for error callback */
866  }
867  else
868  {
869  /*
870  * We assign NULL to dropped attributes and missing values
871  * (missing values should be later filled using
872  * slot_fill_defaults).
873  */
874  slot->tts_values[i] = (Datum) 0;
875  slot->tts_isnull[i] = true;
876  }
877  }
878 
879  ExecStoreVirtualTuple(slot);
880 }
881 
882 /*
883  * Replace updated columns with data from the LogicalRepTupleData struct.
884  * This is somewhat similar to heap_modify_tuple but also calls the type
885  * input functions on the user data.
886  *
887  * "slot" is filled with a copy of the tuple in "srcslot", replacing
888  * columns provided in "tupleData" and leaving others as-is.
889  *
890  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
891  * storage for "srcslot". This is OK for current usage, but someday we may
892  * need to materialize "slot" at the end to make it independent of "srcslot".
893  */
894 static void
895 slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
896  LogicalRepRelMapEntry *rel,
897  LogicalRepTupleData *tupleData)
898 {
899  int natts = slot->tts_tupleDescriptor->natts;
900  int i;
901 
902  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
903  ExecClearTuple(slot);
904 
905  /*
906  * Copy all the column data from srcslot, so that we'll have valid values
907  * for unreplaced columns.
908  */
909  Assert(natts == srcslot->tts_tupleDescriptor->natts);
910  slot_getallattrs(srcslot);
911  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
912  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
913 
914  /* Call the "in" function for each replaced attribute */
915  Assert(natts == rel->attrmap->maplen);
916  for (i = 0; i < natts; i++)
917  {
919  int remoteattnum = rel->attrmap->attnums[i];
920 
921  if (remoteattnum < 0)
922  continue;
923 
924  Assert(remoteattnum < tupleData->ncols);
925 
926  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
927  {
928  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
929 
930  /* Set attnum for error callback */
932 
933  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
934  {
935  Oid typinput;
936  Oid typioparam;
937 
938  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
939  slot->tts_values[i] =
940  OidInputFunctionCall(typinput, colvalue->data,
941  typioparam, att->atttypmod);
942  slot->tts_isnull[i] = false;
943  }
944  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
945  {
946  Oid typreceive;
947  Oid typioparam;
948 
949  /*
950  * In some code paths we may be asked to re-parse the same
951  * tuple data. Reset the StringInfo's cursor so that works.
952  */
953  colvalue->cursor = 0;
954 
955  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
956  slot->tts_values[i] =
957  OidReceiveFunctionCall(typreceive, colvalue,
958  typioparam, att->atttypmod);
959 
960  /* Trouble if it didn't eat the whole buffer */
961  if (colvalue->cursor != colvalue->len)
962  ereport(ERROR,
963  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
964  errmsg("incorrect binary data format in logical replication column %d",
965  remoteattnum + 1)));
966  slot->tts_isnull[i] = false;
967  }
968  else
969  {
970  /* must be LOGICALREP_COLUMN_NULL */
971  slot->tts_values[i] = (Datum) 0;
972  slot->tts_isnull[i] = true;
973  }
974 
975  /* Reset attnum for error callback */
977  }
978  }
979 
980  /* And finally, declare that "slot" contains a valid virtual tuple */
981  ExecStoreVirtualTuple(slot);
982 }
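The copy-then-overwrite approach described above can be sketched in isolation. This is a minimal illustration, not the worker's code: `ColStatus`, `merge_columns`, and the use of plain `long` values in place of Datums are assumptions made for the example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical column states, mirroring the three cases handled above. */
typedef enum
{
	COL_UNCHANGED,				/* keep whatever the source tuple had */
	COL_NULL,					/* replace with NULL */
	COL_VALUE					/* replace with a new value */
} ColStatus;

/*
 * Copy every source column into the destination first, then overwrite only
 * the columns the update actually carries.
 */
void
merge_columns(int natts,
			  const long *src_values, const bool *src_isnull,
			  const ColStatus *status, const long *new_values,
			  long *dst_values, bool *dst_isnull)
{
	memcpy(dst_values, src_values, (size_t) natts * sizeof(long));
	memcpy(dst_isnull, src_isnull, (size_t) natts * sizeof(bool));

	for (int i = 0; i < natts; i++)
	{
		if (status[i] == COL_UNCHANGED)
			continue;			/* copied value stays in place */

		if (status[i] == COL_VALUE)
		{
			dst_values[i] = new_values[i];
			dst_isnull[i] = false;
		}
		else
		{
			/* COL_NULL */
			dst_values[i] = 0;
			dst_isnull[i] = true;
		}
	}
}
```

Copying everything up front keeps the loop body free of a separate "unchanged" branch that would otherwise have to read from the source tuple.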
983 
984 /*
985  * Handle BEGIN message.
986  */
987 static void
989 {
990  LogicalRepBeginData begin_data;
991 
992  /* There must not be an active streaming transaction. */
994 
995  logicalrep_read_begin(s, &begin_data);
996  set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
997 
998  remote_final_lsn = begin_data.final_lsn;
999 
1001 
1002  in_remote_transaction = true;
1003 
1005 }
1006 
1007 /*
1008  * Handle COMMIT message.
1009  *
1010  * TODO, support tracking of multiple origins
1011  */
1012 static void
1014 {
1015  LogicalRepCommitData commit_data;
1016 
1017  logicalrep_read_commit(s, &commit_data);
1018 
1019  if (commit_data.commit_lsn != remote_final_lsn)
1020  ereport(ERROR,
1021  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1022  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1023  LSN_FORMAT_ARGS(commit_data.commit_lsn),
1025 
1026  apply_handle_commit_internal(&commit_data);
1027 
1028  /* Process any tables that are being synchronized in parallel. */
1029  process_syncing_tables(commit_data.end_lsn);
1030 
1033 }
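The COMMIT handler above compares the commit LSN with the final LSN remembered from BEGIN and reports both in `%X/%X` form. The sketch below shows that comparison and formatting on a plain 64-bit integer; `format_lsn` and `commit_lsn_matches` are hypothetical helpers, and splitting the value into high and low 32-bit halves is an assumption about what the format arguments expand to.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Render a 64-bit LSN as "high/low" in upper-case hex (assumed layout). */
void
format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
	snprintf(buf, buflen, "%" PRIX32 "/%" PRIX32,
			 (uint32_t) (lsn >> 32), (uint32_t) lsn);
}

/* The commit message must carry the LSN announced by the BEGIN message. */
bool
commit_lsn_matches(uint64_t expected_final_lsn, uint64_t commit_lsn)
{
	return expected_final_lsn == commit_lsn;
}
```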
1034 
1035 /*
1036  * Handle BEGIN PREPARE message.
1037  */
1038 static void
1040 {
1041  LogicalRepPreparedTxnData begin_data;
1042 
1043  /* Tablesync should never receive prepare. */
1044  if (am_tablesync_worker())
1045  ereport(ERROR,
1046  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1047  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1048 
1049  /* There must not be an active streaming transaction. */
1051 
1052  logicalrep_read_begin_prepare(s, &begin_data);
1053  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1054 
1055  remote_final_lsn = begin_data.prepare_lsn;
1056 
1058 
1059  in_remote_transaction = true;
1060 
1062 }
1063 
1064 /*
1065  * Common function to prepare the GID.
1066  */
1067 static void
1069 {
1070  char gid[GIDSIZE];
1071 
1072  /*
1073  * Compute a unique GID for two_phase transactions. We don't use the GID
1074  * of the prepared transaction sent by the server, as that can lead to
1075  * deadlock when multiple subscriptions on the same node point to
1076  * publications on the same node. See comments atop worker.c.
1077  */
1078  TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1079  gid, sizeof(gid));
1080 
1081  /*
1082  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1083  * called within the PrepareTransactionBlock below.
1084  */
1085  if (!IsTransactionBlock())
1086  {
1088  CommitTransactionCommand(); /* Completes the preceding Begin command. */
1089  }
1090 
1091  /*
1092  * Update origin state so we can restart streaming from correct position
1093  * in case of crash.
1094  */
1095  replorigin_session_origin_lsn = prepare_data->end_lsn;
1097 
1099 }
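The comment above explains why the subscriber derives its own GID from the subscription OID and the remote XID instead of reusing the publisher's GID. A minimal sketch of that idea follows; `make_gid` and the `gid_<subid>_<xid>` layout are hypothetical and are not claimed to match what `TwoPhaseTransactionGid` produces.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Build a GID that is unique per (subscription, remote transaction) pair.
 * The "gid_%u_%u" layout is an assumption made for this example only.
 * Returns 0 on success, -1 if the buffer is too small.
 */
int
make_gid(unsigned int subid, unsigned int xid, char *gid, size_t szgid)
{
	int			n = snprintf(gid, szgid, "gid_%u_%u", subid, xid);

	return (n >= 0 && (size_t) n < szgid) ? 0 : -1;
}
```

Because the subscription OID is part of the name, two subscriptions that receive the same remote XID end up with different GIDs and cannot collide on the same prepared transaction.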
1100 
1101 /*
1102  * Handle PREPARE message.
1103  */
1104 static void
1106 {
1107  LogicalRepPreparedTxnData prepare_data;
1108 
1109  logicalrep_read_prepare(s, &prepare_data);
1110 
1111  if (prepare_data.prepare_lsn != remote_final_lsn)
1112  ereport(ERROR,
1113  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1114  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1115  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1117 
1118  /*
1119  * Unlike commit, here we always prepare the transaction, even if no
1120  * change has happened in this transaction or all changes were skipped.
1121  * This is because at commit prepared time we won't know whether we
1122  * skipped preparing a transaction for those reasons.
1123  *
1124  * XXX We could optimize this by first checking at commit prepared time
1125  * whether we have prepared the transaction, but that doesn't seem
1126  * worthwhile because such cases shouldn't be common.
1127  */
1129 
1130  apply_handle_prepare_internal(&prepare_data);
1131 
1134  pgstat_report_stat(false);
1135 
1137 
1138  in_remote_transaction = false;
1139 
1140  /* Process any tables that are being synchronized in parallel. */
1141  process_syncing_tables(prepare_data.end_lsn);
1142 
1143  /*
1144  * Since we have already prepared the transaction, if the server crashes
1145  * before clearing the subskiplsn, the subskiplsn will be left set but the
1146  * transaction won't be resent. That's okay because it's a rare case and
1147  * the subskiplsn will be cleared when finishing the next transaction.
1148  */
1151 
1154 }
1155 
1156 /*
1157  * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1158  *
1159  * Note that we don't need to wait here if the transaction was prepared in a
1160  * parallel apply worker. In that case, we have already waited for the prepare
1161  * to finish in apply_handle_stream_prepare() which will ensure all the
1162  * operations in that transaction have happened in the subscriber, so no
1163  * concurrent transaction can cause deadlock or transaction dependency issues.
1164  */
1165 static void
1167 {
1168  LogicalRepCommitPreparedTxnData prepare_data;
1169  char gid[GIDSIZE];
1170 
1171  logicalrep_read_commit_prepared(s, &prepare_data);
1172  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1173 
1174  /* Compute GID for two_phase transactions. */
1176  gid, sizeof(gid));
1177 
1178  /* There is no transaction when COMMIT PREPARED is called */
1180 
1181  /*
1182  * Update origin state so we can restart streaming from correct position
1183  * in case of crash.
1184  */
1185  replorigin_session_origin_lsn = prepare_data.end_lsn;
1187 
1188  FinishPreparedTransaction(gid, true);
1191  pgstat_report_stat(false);
1192 
1194  in_remote_transaction = false;
1195 
1196  /* Process any tables that are being synchronized in parallel. */
1197  process_syncing_tables(prepare_data.end_lsn);
1198 
1199  clear_subscription_skip_lsn(prepare_data.end_lsn);
1200 
1203 }
1204 
1205 /*
1206  * Handle a ROLLBACK PREPARED of a previously PREPARED transaction.
1207  *
1208  * Note that we don't need to wait here if the transaction was prepared in a
1209  * parallel apply worker. In that case, we have already waited for the prepare
1210  * to finish in apply_handle_stream_prepare() which will ensure all the
1211  * operations in that transaction have happened in the subscriber, so no
1212  * concurrent transaction can cause deadlock or transaction dependency issues.
1213  */
1214 static void
1216 {
1217  LogicalRepRollbackPreparedTxnData rollback_data;
1218  char gid[GIDSIZE];
1219 
1220  logicalrep_read_rollback_prepared(s, &rollback_data);
1221  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1222 
1223  /* Compute GID for two_phase transactions. */
1224  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1225  gid, sizeof(gid));
1226 
1227  /*
1228  * It is possible that we haven't received prepare because it occurred
1229  * before walsender reached a consistent point or the two_phase was still
1230  * not enabled by that time, so in such cases, we need to skip rollback
1231  * prepared.
1232  */
1233  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1234  rollback_data.prepare_time))
1235  {
1236  /*
1237  * Update origin state so we can restart streaming from correct
1238  * position in case of crash.
1239  */
1242 
1243  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1245  FinishPreparedTransaction(gid, false);
1248 
1250  }
1251 
1252  pgstat_report_stat(false);
1253 
1255  in_remote_transaction = false;
1256 
1257  /* Process any tables that are being synchronized in parallel. */
1259 
1262 }
1263 
1264 /*
1265  * Handle STREAM PREPARE.
1266  */
1267 static void
1269 {
1270  LogicalRepPreparedTxnData prepare_data;
1271  ParallelApplyWorkerInfo *winfo;
1272  TransApplyAction apply_action;
1273 
1274  /* Save the message before it is consumed. */
1275  StringInfoData original_msg = *s;
1276 
1278  ereport(ERROR,
1279  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1280  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1281 
1282  /* Tablesync should never receive prepare. */
1283  if (am_tablesync_worker())
1284  ereport(ERROR,
1285  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1286  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1287 
1288  logicalrep_read_stream_prepare(s, &prepare_data);
1289  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1290 
1291  apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1292 
1293  switch (apply_action)
1294  {
1295  case TRANS_LEADER_APPLY:
1296 
1297  /*
1298  * The transaction has been serialized to file, so replay all the
1299  * spooled operations.
1300  */
1302  prepare_data.xid, prepare_data.prepare_lsn);
1303 
1304  /* Mark the transaction as prepared. */
1305  apply_handle_prepare_internal(&prepare_data);
1306 
1308 
1310 
1311  in_remote_transaction = false;
1312 
1313  /* Unlink the files with serialized changes and subxact info. */
1315 
1316  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1317  break;
1318 
1320  Assert(winfo);
1321 
1322  if (pa_send_data(winfo, s->len, s->data))
1323  {
1324  /* Finish processing the streaming transaction. */
1325  pa_xact_finish(winfo, prepare_data.end_lsn);
1326  break;
1327  }
1328 
1329  /*
1330  * Switch to serialize mode when we are not able to send the
1331  * change to parallel apply worker.
1332  */
1333  pa_switch_to_partial_serialize(winfo, true);
1334 
1335  /* fall through */
1337  Assert(winfo);
1338 
1339  stream_open_and_write_change(prepare_data.xid,
1341  &original_msg);
1342 
1344 
1345  /* Finish processing the streaming transaction. */
1346  pa_xact_finish(winfo, prepare_data.end_lsn);
1347  break;
1348 
1349  case TRANS_PARALLEL_APPLY:
1350 
1351  /*
1352  * If the parallel apply worker is applying spooled messages then
1353  * close the file before preparing.
1354  */
1355  if (stream_fd)
1357 
1359 
1360  /* Mark the transaction as prepared. */
1361  apply_handle_prepare_internal(&prepare_data);
1362 
1364 
1366 
1368 
1371 
1373 
1374  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1375  break;
1376 
1377  default:
1378  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1379  break;
1380  }
1381 
1382  pgstat_report_stat(false);
1383 
1384  /* Process any tables that are being synchronized in parallel. */
1385  process_syncing_tables(prepare_data.end_lsn);
1386 
1387  /*
1388  * As in the prepare case, the subskiplsn could be left set after a
1389  * server crash, but that's okay. See the comments in apply_handle_prepare().
1390  */
1393 
1395 
1397 }
1398 
1399 /*
1400  * Handle ORIGIN message.
1401  *
1402  * TODO, support tracking of multiple origins
1403  */
1404 static void
1406 {
1407  /*
1408  * ORIGIN message can only come inside streaming transaction or inside
1409  * remote transaction and before any actual writes.
1410  */
1411  if (!in_streamed_transaction &&
1414  ereport(ERROR,
1415  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1416  errmsg_internal("ORIGIN message sent out of order")));
1417 }
1418 
1419 /*
1420  * Initialize fileset (if not already done).
1421  *
1422  * Create a new file when first_segment is true, otherwise open the existing
1423  * file.
1424  */
1425 void
1426 stream_start_internal(TransactionId xid, bool first_segment)
1427 {
1429 
1430  /*
1431  * Initialize the worker's stream_fileset if we haven't yet. This will be
1432  * used for the entire duration of the worker so create it in a permanent
1433  * context. We create this on the very first streaming message from any
1434  * transaction and then use it for this and other streaming transactions.
1435  * Now, we could create a fileset at the start of the worker as well but
1436  * then we won't be sure that it will ever be used.
1437  */
1439  {
1440  MemoryContext oldctx;
1441 
1443 
1446 
1447  MemoryContextSwitchTo(oldctx);
1448  }
1449 
1450  /* Open the spool file for this transaction. */
1451  stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1452 
1453  /* If this is not the first segment, open existing subxact file. */
1454  if (!first_segment)
1456 
1458 }
1459 
1460 /*
1461  * Handle STREAM START message.
1462  */
1463 static void
1465 {
1466  bool first_segment;
1467  ParallelApplyWorkerInfo *winfo;
1468  TransApplyAction apply_action;
1469 
1470  /* Save the message before it is consumed. */
1471  StringInfoData original_msg = *s;
1472 
1474  ereport(ERROR,
1475  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1476  errmsg_internal("duplicate STREAM START message")));
1477 
1478  /* There must not be an active streaming transaction. */
1480 
1481  /* notify handle methods we're processing a remote transaction */
1482  in_streamed_transaction = true;
1483 
1484  /* extract XID of the top-level transaction */
1485  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1486 
1488  ereport(ERROR,
1489  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1490  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1491 
1493 
1494  /* Try to allocate a worker for the streaming transaction. */
1495  if (first_segment)
1497 
1498  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1499 
1500  switch (apply_action)
1501  {
1503 
1504  /*
1505  * Function stream_start_internal starts a transaction. This
1506  * transaction will be committed on the stream stop unless it is a
1507  * tablesync worker in which case it will be committed after
1508  * processing all the messages. We need this transaction for
1509  * handling the BufFile, used for serializing the streaming data
1510  * and subxact info.
1511  */
1512  stream_start_internal(stream_xid, first_segment);
1513  break;
1514 
1516  Assert(winfo);
1517 
1518  /*
1519  * Once we start serializing the changes, the parallel apply
1520  * worker will wait for the leader to release the stream lock
1521  * until the end of the transaction. So, we don't need to release
1522  * the lock or increment the stream count in that case.
1523  */
1524  if (pa_send_data(winfo, s->len, s->data))
1525  {
1526  /*
1527  * Unlock the shared object lock so that the parallel apply
1528  * worker can continue to receive changes.
1529  */
1530  if (!first_segment)
1532 
1533  /*
1534  * Increment the number of streaming blocks waiting to be
1535  * processed by parallel apply worker.
1536  */
1538 
1539  /* Cache the parallel apply worker for this transaction. */
1541  break;
1542  }
1543 
1544  /*
1545  * Switch to serialize mode when we are not able to send the
1546  * change to parallel apply worker.
1547  */
1548  pa_switch_to_partial_serialize(winfo, !first_segment);
1549 
1550  /* fall through */
1552  Assert(winfo);
1553 
1554  /*
1555  * Open the spool file unless it was already opened when switching
1556  * to serialize mode. The transaction started in
1557  * stream_start_internal will be committed on the stream stop.
1558  */
1559  if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1560  stream_start_internal(stream_xid, first_segment);
1561 
1563 
1564  /* Cache the parallel apply worker for this transaction. */
1566  break;
1567 
1568  case TRANS_PARALLEL_APPLY:
1569  if (first_segment)
1570  {
1571  /* Hold the lock until the end of the transaction. */
1574 
1575  /*
1576  * Signal the leader apply worker, as it may be waiting for
1577  * us.
1578  */
1580  }
1581 
1583  break;
1584 
1585  default:
1586  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1587  break;
1588  }
1589 
1591 }
1592 
1593 /*
1594  * Update the information about subxacts and close the file.
1595  *
1596  * This function should be called when the stream_start_internal function has
1597  * been called.
1598  */
1599 void
1601 {
1602  /*
1603  * Serialize information about subxacts for the toplevel transaction, then
1604  * close the stream messages spool file.
1605  */
1608 
1609  /* We must be in a valid transaction state */
1611 
1612  /* Commit the per-stream transaction */
1614 
1615  /* Reset per-stream context */
1617 }
1618 
1619 /*
1620  * Handle STREAM STOP message.
1621  */
1622 static void
1624 {
1625  ParallelApplyWorkerInfo *winfo;
1626  TransApplyAction apply_action;
1627 
1629  ereport(ERROR,
1630  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1631  errmsg_internal("STREAM STOP message without STREAM START")));
1632 
1633  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1634 
1635  switch (apply_action)
1636  {
1639  break;
1640 
1642  Assert(winfo);
1643 
1644  /*
1645  * Lock before sending the STREAM_STOP message so that the leader
1646  * can hold the lock first and the parallel apply worker will wait
1647  * for leader to release the lock. See Locking Considerations atop
1648  * applyparallelworker.c.
1649  */
1651 
1652  if (pa_send_data(winfo, s->len, s->data))
1653  {
1655  break;
1656  }
1657 
1658  /*
1659  * Switch to serialize mode when we are not able to send the
1660  * change to parallel apply worker.
1661  */
1662  pa_switch_to_partial_serialize(winfo, true);
1663 
1664  /* fall through */
1669  break;
1670 
1671  case TRANS_PARALLEL_APPLY:
1672  elog(DEBUG1, "applied %u changes in the streaming chunk",
1674 
1675  /*
1676  * By the time the parallel apply worker is processing the changes in
1677  * the current streaming block, the leader apply worker may have
1678  * sent multiple streaming blocks. This can lead to the parallel apply
1679  * worker starting to wait even when there are more streaming chunks
1680  * in the queue. So, try to lock only if there is no message left
1681  * in the queue. See Locking Considerations atop
1682  * applyparallelworker.c.
1683  *
1684  * Note that here we have a race condition where we can start
1685  * waiting even when there are pending streaming chunks. This can
1686  * happen if the leader sends another streaming block and acquires
1687  * the stream lock again after the parallel apply worker checks
1688  * that there is no pending streaming block and before it actually
1689  * starts waiting on a lock. We can handle this case by not
1690  * allowing the leader to increment the stream block count during
1691  * the time parallel apply worker acquires the lock but it is not
1692  * clear whether that is worth the complexity.
1693  *
1694  * Now, if this missed chunk contains rollback to savepoint, then
1695  * there is a risk of deadlock which probably shouldn't happen
1696  * after restart.
1697  */
1699  break;
1700 
1701  default:
1702  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1703  break;
1704  }
1705 
1706  in_streamed_transaction = false;
1708 
1709  /*
1710  * The parallel apply worker could be in a transaction in which case we
1711  * need to report the state as STATE_IDLEINTRANSACTION.
1712  */
1715  else
1717 
1719 }
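The streaming handlers above reject messages that arrive in the wrong order: a second STREAM START, a STREAM STOP with no open stream, and a STREAM COMMIT, ABORT, or PREPARE that arrives before the stream was stopped. The sketch below models those ordering rules as a small state machine. `StreamMsg`, `StreamState`, and `stream_check` are hypothetical names, and the conditions are inferred from the error messages, not copied from the elided checks.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
	MSG_STREAM_START,
	MSG_STREAM_STOP,
	MSG_STREAM_COMMIT,
	MSG_STREAM_ABORT,
	MSG_STREAM_PREPARE
} StreamMsg;

typedef struct
{
	bool		in_streamed_transaction;
} StreamState;

/* Returns NULL if the message is acceptable, else a description of the error. */
const char *
stream_check(StreamState *st, StreamMsg msg)
{
	switch (msg)
	{
		case MSG_STREAM_START:
			if (st->in_streamed_transaction)
				return "duplicate STREAM START message";
			st->in_streamed_transaction = true;
			return NULL;
		case MSG_STREAM_STOP:
			if (!st->in_streamed_transaction)
				return "STREAM STOP message without STREAM START";
			st->in_streamed_transaction = false;
			return NULL;
		case MSG_STREAM_COMMIT:
			if (st->in_streamed_transaction)
				return "STREAM COMMIT message without STREAM STOP";
			return NULL;
		case MSG_STREAM_ABORT:
			if (st->in_streamed_transaction)
				return "STREAM ABORT message without STREAM STOP";
			return NULL;
		case MSG_STREAM_PREPARE:
			if (st->in_streamed_transaction)
				return "STREAM PREPARE message without STREAM STOP";
			return NULL;
	}
	return "unexpected message";
}
```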
1720 
1721 /*
1722  * Helper function to handle STREAM ABORT message when the transaction was
1723  * serialized to file.
1724  */
1725 static void
1727 {
1728  /*
1729  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1730  * just delete the files with serialized info.
1731  */
1732  if (xid == subxid)
1734  else
1735  {
1736  /*
1737  * OK, so it's a subxact. We need to read the subxact file for the
1738  * toplevel transaction, determine the offset tracked for the subxact,
1739  * and truncate the file with changes. We also remove the subxacts
1740  * with higher offsets (or rather higher XIDs).
1741  *
1742  * We intentionally scan the array from the tail, because we're likely
1743  * aborting one of the most recent subtransactions.
1744  *
1745  * We can't use binary search here as subxact XIDs won't necessarily
1746  * arrive in sorted order. Consider the case where we have released
1747  * the savepoints for multiple subtransactions and then performed a
1748  * rollback to savepoint for one of the earlier
1749  * subtransactions.
1750  */
1751  int64 i;
1752  int64 subidx;
1753  BufFile *fd;
1754  bool found = false;
1755  char path[MAXPGPATH];
1756 
1757  subidx = -1;
1760 
1761  for (i = subxact_data.nsubxacts; i > 0; i--)
1762  {
1763  if (subxact_data.subxacts[i - 1].xid == subxid)
1764  {
1765  subidx = (i - 1);
1766  found = true;
1767  break;
1768  }
1769  }
1770 
1771  /*
1772  * If it's an empty sub-transaction then we will not find the subxid
1773  * here so just cleanup the subxact info and return.
1774  */
1775  if (!found)
1776  {
1777  /* Cleanup the subxact info */
1781  return;
1782  }
1783 
1784  /* open the changes file */
1787  O_RDWR, false);
1788 
1789  /* OK, truncate the file at the right offset */
1791  subxact_data.subxacts[subidx].offset);
1792  BufFileClose(fd);
1793 
1794  /* discard the subxacts added later */
1795  subxact_data.nsubxacts = subidx;
1796 
1797  /* write the updated subxact list */
1799 
1802  }
1803 }
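The subtransaction abort path above scans the subxact array from the tail, truncates the changes file at the recorded offset, and discards every later entry. A minimal sketch of that bookkeeping, leaving out the file handling, might look as follows. `SubXactEntry`, `find_subxact_from_tail`, and `abort_subxact` are hypothetical names introduced for illustration.

```c
#include <assert.h>
#include <stdint.h>

typedef struct
{
	uint32_t	xid;			/* subtransaction XID */
	int64_t		offset;			/* where its first change starts in the file */
} SubXactEntry;

/* Linear scan from the tail; XIDs are not assumed to be sorted. */
int64_t
find_subxact_from_tail(const SubXactEntry *subxacts, int64_t nsubxacts,
					   uint32_t subxid)
{
	for (int64_t i = nsubxacts; i > 0; i--)
	{
		if (subxacts[i - 1].xid == subxid)
			return i - 1;
	}
	return -1;
}

/*
 * Returns the new number of tracked subxacts. When the subxact is found,
 * *truncate_to receives the offset at which the changes file should be cut.
 * When it is not found (an empty subtransaction), nothing changes.
 */
int64_t
abort_subxact(const SubXactEntry *subxacts, int64_t nsubxacts,
			  uint32_t subxid, int64_t *truncate_to)
{
	int64_t		idx = find_subxact_from_tail(subxacts, nsubxacts, subxid);

	if (idx < 0)
		return nsubxacts;

	*truncate_to = subxacts[idx].offset;
	return idx;					/* entries at idx and beyond are discarded */
}
```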
1804 
1805 /*
1806  * Handle STREAM ABORT message.
1807  */
1808 static void
1810 {
1811  TransactionId xid;
1812  TransactionId subxid;
1813  LogicalRepStreamAbortData abort_data;
1814  ParallelApplyWorkerInfo *winfo;
1815  TransApplyAction apply_action;
1816 
1817  /* Save the message before it is consumed. */
1818  StringInfoData original_msg = *s;
1819  bool toplevel_xact;
1820 
1822  ereport(ERROR,
1823  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1824  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1825 
1826  /* We receive abort information only when we can apply in parallel. */
1827  logicalrep_read_stream_abort(s, &abort_data,
1829 
1830  xid = abort_data.xid;
1831  subxid = abort_data.subxid;
1832  toplevel_xact = (xid == subxid);
1833 
1834  set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1835 
1836  apply_action = get_transaction_apply_action(xid, &winfo);
1837 
1838  switch (apply_action)
1839  {
1840  case TRANS_LEADER_APPLY:
1841 
1842  /*
1843  * We are in the leader apply worker and the transaction has been
1844  * serialized to file.
1845  */
1846  stream_abort_internal(xid, subxid);
1847 
1848  elog(DEBUG1, "finished processing the STREAM ABORT command");
1849  break;
1850 
1852  Assert(winfo);
1853 
1854  /*
1855  * For the case of aborting the subtransaction, we increment the
1856  * number of streaming blocks and take the lock again before
1857  * sending the STREAM_ABORT to ensure that the parallel apply
1858  * worker will wait on the lock for the next set of changes after
1859  * processing the STREAM_ABORT message if it is not already
1860  * waiting for STREAM_STOP message.
1861  *
1862  * It is important to perform this locking before sending the
1863  * STREAM_ABORT message so that the leader can hold the lock first
1864  * and the parallel apply worker will wait for the leader to
1865  * release the lock. This is the same as what we do in
1866  * apply_handle_stream_stop. See Locking Considerations atop
1867  * applyparallelworker.c.
1868  */
1869  if (!toplevel_xact)
1870  {
1874  }
1875 
1876  if (pa_send_data(winfo, s->len, s->data))
1877  {
1878  /*
1879  * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1880  * wait here for the parallel apply worker to finish as that
1881  * is not required to maintain the commit order and won't have
1882  * the risk of failures due to transaction dependencies and
1883  * deadlocks. However, it is possible that before the parallel
1884  * worker finishes and we clear the worker info, the xid
1885  * wraparound happens on the upstream and a new transaction
1886  * with the same xid can appear and that can lead to duplicate
1887  * entries in ParallelApplyTxnHash. Yet another problem could
1888  * be that we may have serialized the changes in partial
1889  * serialize mode and the file containing xact changes may
1890  * already exist, and after xid wraparound trying to create
1891  * the file for the same xid can lead to an error. To avoid
1892  * these problems, we decide to wait for the aborts to finish.
1893  *
1894  * Note, it is okay to not update the flush location position
1895  * for aborts as in worst case that means such a transaction
1896  * won't be sent again after restart.
1897  */
1898  if (toplevel_xact)
1900 
1901  break;
1902  }
1903 
1904  /*
1905  * Switch to serialize mode when we are not able to send the
1906  * change to parallel apply worker.
1907  */
1908  pa_switch_to_partial_serialize(winfo, true);
1909 
1910  /* fall through */
1912  Assert(winfo);
1913 
1914  /*
1915  * Parallel apply worker might have applied some changes, so write
1916  * the STREAM_ABORT message so that it can rollback the
1917  * subtransaction if needed.
1918  */
1920  &original_msg);
1921 
1922  if (toplevel_xact)
1923  {
1926  }
1927  break;
1928 
1929  case TRANS_PARALLEL_APPLY:
1930 
1931  /*
1932  * If the parallel apply worker is applying spooled messages then
1933  * close the file before aborting.
1934  */
1935  if (toplevel_xact && stream_fd)
1937 
1938  pa_stream_abort(&abort_data);
1939 
1940  /*
1941  * We need to wait after processing rollback to savepoint for the
1942  * next set of changes.
1943  *
1944  * We have a race condition here due to which we can start waiting
1945  * here even when there are more streaming chunks in the queue. See
1946  * apply_handle_stream_stop.
1947  */
1948  if (!toplevel_xact)
1950 
1951  elog(DEBUG1, "finished processing the STREAM ABORT command");
1952  break;
1953 
1954  default:
1955  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1956  break;
1957  }
1958 
1960 }
1961 
1962 /*
1963  * Ensure that the passed location is the end of the changes file.
1964  */
1965 static void
1966 ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1967  off_t offset)
1968 {
1969  char path[MAXPGPATH];
1970  BufFile *fd;
1971  int last_fileno;
1972  off_t last_offset;
1973 
1975 
1977 
1979 
1980  fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
1981 
1982  BufFileSeek(fd, 0, 0, SEEK_END);
1983  BufFileTell(fd, &last_fileno, &last_offset);
1984 
1985  BufFileClose(fd);
1986 
1988 
1989  if (last_fileno != fileno || last_offset != offset)
1990  elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
1991  path);
1992 }
1993 
1994 /*
1995  * Common spoolfile processing.
1996  */
1997 void
1999  XLogRecPtr lsn)
2000 {
2001  int nchanges;
2002  char path[MAXPGPATH];
2003  char *buffer = NULL;
2004  MemoryContext oldcxt;
2005  ResourceOwner oldowner;
2006  int fileno;
2007  off_t offset;
2008 
2009  if (!am_parallel_apply_worker())
2011 
2012  /* Make sure we have an open transaction */
2014 
2015  /*
2016  * Allocate file handle and memory required to process all the messages in
2017  * TopTransactionContext to avoid them getting reset after each message is
2018  * processed.
2019  */
2021 
2022  /* Open the spool file for the committed/prepared transaction */
2024  elog(DEBUG1, "replaying changes from file \"%s\"", path);
2025 
2026  /*
2027  * Make sure the file is owned by the toplevel transaction so that the
2028  * file will not be accidentally closed when aborting a subtransaction.
2029  */
2030  oldowner = CurrentResourceOwner;
2032 
2033  stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2034 
2035  CurrentResourceOwner = oldowner;
2036 
2037  buffer = palloc(BLCKSZ);
2038 
2039  MemoryContextSwitchTo(oldcxt);
2040 
2041  remote_final_lsn = lsn;
2042 
2043  /*
2044  * Make sure the handler functions called via apply_dispatch are aware
2045  * we're in a remote transaction.
2046  */
2047  in_remote_transaction = true;
2049 
2051 
2052  /*
2053  * Read the entries one by one and pass them through the same logic as in
2054  * apply_dispatch.
2055  */
2056  nchanges = 0;
2057  while (true)
2058  {
2060  size_t nbytes;
2061  int len;
2062 
2064 
2065  /* read length of the on-disk record */
2066  nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2067 
2068  /* have we reached end of the file? */
2069  if (nbytes == 0)
2070  break;
2071 
2072  /* do we have a correct length? */
2073  if (len <= 0)
2074  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2075  len, path);
2076 
2077  /* make sure we have sufficiently large buffer */
2078  buffer = repalloc(buffer, len);
2079 
2080  /* and finally read the data into the buffer */
2081  BufFileReadExact(stream_fd, buffer, len);
2082 
2083  BufFileTell(stream_fd, &fileno, &offset);
2084 
2085  /* init a stringinfo using the buffer and call apply_dispatch */
2086  initReadOnlyStringInfo(&s2, buffer, len);
2087 
2088  /* Ensure we are reading the data into our memory context. */
2090 
2091  apply_dispatch(&s2);
2092 
2094 
2095  MemoryContextSwitchTo(oldcxt);
2096 
2097  nchanges++;
2098 
2099  /*
2100  * It is possible the file has been closed because we have processed
2101  * the transaction end message like stream_commit in which case that
2102  * must be the last message.
2103  */
2104  if (!stream_fd)
2105  {
2106  ensure_last_message(stream_fileset, xid, fileno, offset);
2107  break;
2108  }
2109 
2110  if (nchanges % 1000 == 0)
2111  elog(DEBUG1, "replayed %d changes from file \"%s\"",
2112  nchanges, path);
2113  }
2114 
2115  if (stream_fd)
2117 
2118  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2119  nchanges, path);
2120 
2121  return;
2122 }
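The replay loop above reads length-prefixed records: an int length, then that many bytes, repeated until end of file, with a non-positive length treated as corruption. The sketch below shows the same record layout over an in-memory buffer instead of a BufFile. `spool_append`, `spool_replay`, and `sum_lengths` are hypothetical helpers, and the explicit bounds checks stand in for the error handling of the real file reads.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef void (*record_cb) (const unsigned char *data, int len, void *arg);

/* Append one record (int length + payload); returns the new used size. */
size_t
spool_append(unsigned char *spool, size_t used, const void *data, int len)
{
	memcpy(spool + used, &len, sizeof(len));
	memcpy(spool + used + sizeof(len), data, (size_t) len);
	return used + sizeof(len) + (size_t) len;
}

/* Replay every record in order; returns the record count, or -1 if corrupt. */
int
spool_replay(const unsigned char *spool, size_t used, record_cb cb, void *arg)
{
	size_t		pos = 0;
	int			nchanges = 0;

	while (pos < used)
	{
		int			len;

		/* read the length of the stored record */
		if (used - pos < sizeof(len))
			return -1;
		memcpy(&len, spool + pos, sizeof(len));
		pos += sizeof(len);

		/* reject non-positive or overlong lengths */
		if (len <= 0 || (size_t) len > used - pos)
			return -1;

		cb(spool + pos, len, arg);
		pos += (size_t) len;
		nchanges++;
	}
	return nchanges;
}

/* Example callback: accumulate the payload sizes into *(int *) arg. */
void
sum_lengths(const unsigned char *data, int len, void *arg)
{
	(void) data;
	*(int *) arg += len;
}
```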

/*
 * Handle STREAM COMMIT message.
 */
static void
apply_handle_stream_commit(StringInfo s)
{
    TransactionId xid;
    LogicalRepCommitData commit_data;
    ParallelApplyWorkerInfo *winfo;
    TransApplyAction apply_action;

    /* Save the message before it is consumed. */
    StringInfoData original_msg = *s;

    if (in_streamed_transaction)
        ereport(ERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg_internal("STREAM COMMIT message without STREAM STOP")));

    xid = logicalrep_read_stream_commit(s, &commit_data);
    set_apply_error_context_xact(xid, commit_data.commit_lsn);

    apply_action = get_transaction_apply_action(xid, &winfo);

    switch (apply_action)
    {
        case TRANS_LEADER_APPLY:

            /*
             * The transaction has been serialized to file, so replay all the
             * spooled operations.
             */
            apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
                                   commit_data.commit_lsn);

            apply_handle_commit_internal(&commit_data);

            /* Unlink the files with serialized changes and subxact info. */
            stream_cleanup_files(MyLogicalRepWorker->subid, xid);

            elog(DEBUG1, "finished processing the STREAM COMMIT command");
            break;

        case TRANS_LEADER_SEND_TO_PARALLEL:
            Assert(winfo);

            if (pa_send_data(winfo, s->len, s->data))
            {
                /* Finish processing the streaming transaction. */
                pa_xact_finish(winfo, commit_data.end_lsn);
                break;
            }

            /*
             * Switch to serialize mode when we are not able to send the
             * change to parallel apply worker.
             */
            pa_switch_to_partial_serialize(winfo, true);

            /* fall through */
        case TRANS_LEADER_PARTIAL_SERIALIZE:
            Assert(winfo);

            stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
                                         &original_msg);

            pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);

            /* Finish processing the streaming transaction. */
            pa_xact_finish(winfo, commit_data.end_lsn);
            break;

        case TRANS_PARALLEL_APPLY:

            /*
             * If the parallel apply worker is applying spooled messages then
             * close the file before committing.
             */
            if (stream_fd)
                stream_close_file();

            apply_handle_commit_internal(&commit_data);

            MyParallelShared->last_commit_end = XactLastCommitEnd;

            /*
             * It is important to set the transaction state as finished before
             * releasing the lock. See pa_wait_for_xact_finish.
             */
            pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
            pa_unlock_transaction(xid, AccessExclusiveLock);

            pa_reset_subtrans();

            elog(DEBUG1, "finished processing the STREAM COMMIT command");
            break;

        default:
            elog(ERROR, "unexpected apply action: %d", (int) apply_action);
            break;
    }

    /* Process any tables that are being synchronized in parallel. */
    process_syncing_tables(commit_data.end_lsn);

    pgstat_report_activity(STATE_IDLE, NULL);

    reset_apply_error_context_info();
}

/*
 * Helper function for apply_handle_commit and apply_handle_stream_commit.
 */
static void
apply_handle_commit_internal(LogicalRepCommitData *commit_data)
{
    if (is_skipping_changes())
    {
        stop_skipping_changes();

        /*
         * Start a new transaction to clear the subskiplsn, if not started
         * yet.
         */
        if (!IsTransactionState())
            StartTransactionCommand();
    }

    if (IsTransactionState())
    {
        /*
         * The transaction is either non-empty or skipped, so we clear the
         * subskiplsn.
         */
        clear_subscription_skip_lsn(commit_data->commit_lsn);

        /*
         * Update origin state so we can restart streaming from correct
         * position in case of crash.
         */
        replorigin_session_origin_lsn = commit_data->end_lsn;
        replorigin_session_origin_timestamp = commit_data->committime;

        CommitTransactionCommand();

        if (IsTransactionBlock())
        {
            EndTransactionBlock(false);
            CommitTransactionCommand();
        }

        pgstat_report_stat(false);

        store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
    }
    else
    {
        /* Process any invalidation messages that might have accumulated. */
        AcceptInvalidationMessages();
        maybe_reread_subscription();
    }

    in_remote_transaction = false;
}

/*
 * Handle RELATION message.
 *
 * Note that we don't validate against the local schema here. That validation
 * is postponed until the first change for the given relation arrives, since
 * we only care about it when applying changes for that relation, and we do
 * less locking this way.
 */
static void
apply_handle_relation(StringInfo s)
{
    LogicalRepRelation *rel;

    if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
        return;

    rel = logicalrep_read_rel(s);
    logicalrep_relmap_update(rel);

    /* Also reset all entries in the partition map that refer to remoterel. */
    logicalrep_partmap_reset_relmap(rel);
}

/*
 * Handle TYPE message.
 *
 * This implementation pays no attention to TYPE messages; we expect the user
 * to have set things up so that the incoming data is acceptable to the input
 * functions for the locally subscribed tables. Hence, we just read and
 * discard the message.
 */
static void
apply_handle_type(StringInfo s)
{
    LogicalRepTyp typ;

    if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
        return;

    logicalrep_read_typ(s, &typ);
}

/*
 * Check that we (the subscription owner) have sufficient privileges on the
 * target relation to perform the given operation.
 */
static void
TargetPrivilegesCheck(Relation rel, AclMode mode)
{
    Oid         relid;
    AclResult   aclresult;

    relid = RelationGetRelid(rel);
    aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
    if (aclresult != ACLCHECK_OK)
        aclcheck_error(aclresult,
                       get_relkind_objtype(rel->rd_rel->relkind),
                       get_rel_name(relid));

    /*
     * We lack the infrastructure to honor RLS policies. It might be possible
     * to add such infrastructure here, but tablesync workers lack it, too, so
     * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
     * but it seems dangerous to replicate a TRUNCATE and then refuse to
     * replicate subsequent INSERTs, so we forbid all commands the same.
     */
    if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
                        GetUserNameFromId(GetUserId(), true),
                        RelationGetRelationName(rel))));
}
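
/*
 * Illustrative sketch, not part of worker.c: the check above has two stages,
 * an ordinary privilege test followed by a blanket refusal when row-level
 * security is enabled. The standalone code below models that ordering with
 * a plain bitmask. The PRIV_* bits, CheckResult and target_check are
 * hypothetical stand-ins and do not correspond to the real ACL machinery.
 */

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical privilege bits, standing in for ACL_* modes. */
enum
{
    PRIV_SELECT = 1,
    PRIV_INSERT = 2,
    PRIV_UPDATE = 4,
    PRIV_DELETE = 8
};

typedef enum
{
    CHECK_OK,
    CHECK_NO_PRIVILEGE,
    CHECK_RLS_UNSUPPORTED
} CheckResult;

/*
 * First require every requested privilege bit, then reject any relation
 * with row-level security enabled, whatever the operation.
 */
static CheckResult
target_check(unsigned granted, unsigned required, bool rls_enabled)
{
    assert(required != 0);

    if ((granted & required) != required)
        return CHECK_NO_PRIVILEGE;
    if (rls_enabled)
        return CHECK_RLS_UNSUPPORTED;
    return CHECK_OK;
}
```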

/*
 * Handle INSERT message.
 */

static void
apply_handle_insert(StringInfo s)
{
    LogicalRepRelMapEntry *rel;
    LogicalRepTupleData newtup;
    LogicalRepRelId relid;
    UserContext ucxt;
    ApplyExecutionData *edata;
    EState     *estate;
    TupleTableSlot *remoteslot;
    MemoryContext oldctx;
    bool        run_as_owner;

    /*
     * Quick return if we are skipping data modification changes or handling
     * streamed transactions.
     */
    if (is_skipping_changes() ||
        handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
        return;

    begin_replication_step();

    relid = logicalrep_read_insert(s, &newtup);
    rel = logicalrep_rel_open(relid, RowExclusiveLock);
    if (!should_apply_changes_for_rel(rel))
    {
        /*
         * The relation can't become interesting in the middle of the
         * transaction so it's safe to unlock it.
         */
        logicalrep_rel_close(rel, RowExclusiveLock);
        end_replication_step();
        return;
    }

    /*
     * Make sure that any user-supplied code runs as the table owner, unless
     * the user has opted out of that behavior.
     */
    run_as_owner = MySubscription->runasowner;
    if (!run_as_owner)
        SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);

    /* Set relation for error callback */
    apply_error_callback_arg.rel = rel;

    /* Initialize the executor state. */
    edata = create_edata_for_relation(rel);
    estate = edata->estate;
    remoteslot = ExecInitExtraTupleSlot(estate,
                                        RelationGetDescr(rel->localrel),
                                        &TTSOpsVirtual);

    /* Process and store remote tuple in the slot */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    slot_store_data(remoteslot, rel, &newtup);
    slot_fill_defaults(rel, estate, remoteslot);
    MemoryContextSwitchTo(oldctx);

    /* For a partitioned table, insert the tuple into a partition. */
    if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        apply_handle_tuple_routing(edata,
                                   remoteslot, NULL, CMD_INSERT);
    else
        apply_handle_insert_internal(edata, edata->targetRelInfo,
                                     remoteslot);

    finish_edata(edata);

    /* Reset relation for error callback */
    apply_error_callback_arg.rel = NULL;

    if (!run_as_owner)
        RestoreUserContext(&ucxt);

    logicalrep_rel_close(rel, NoLock);

    end_replication_step();
}

/*
 * Workhorse for apply_handle_insert()
 * relinfo is for the relation we're actually inserting into
 * (could be a child partition of edata->targetRelInfo)
 */
static void
apply_handle_insert_internal(ApplyExecutionData *edata,
                             ResultRelInfo *relinfo,
                             TupleTableSlot *remoteslot)
{
    EState     *estate = edata->estate;

    /* We must open indexes here. */
    ExecOpenIndices(relinfo, false);

    /* Do the insert. */
    TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
    ExecSimpleRelationInsert(relinfo, estate, remoteslot);

    /* Cleanup. */
    ExecCloseIndices(relinfo);
}

/*
 * Check if the logical replication relation is updatable and throw
 * appropriate error if it isn't.
 */
static void
check_relation_updatable(LogicalRepRelMapEntry *rel)
{
    /*
     * For partitioned tables, we only need to care if the target partition is
     * updatable (aka has PK or RI defined for it).
     */
    if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        return;

    /* Updatable, no error. */
    if (rel->updatable)
        return;

    /*
     * We are in error mode, so it's fine that this is somewhat slow. It's
     * better to give the user a correct error.
     */
    if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
    {
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("publisher did not send replica identity column "
                        "expected by the logical replication target relation \"%s.%s\"",
                        rel->remoterel.nspname, rel->remoterel.relname)));
    }

    ereport(ERROR,
            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
             errmsg("logical replication target relation \"%s.%s\" has "
                    "neither REPLICA IDENTITY index nor PRIMARY "
                    "KEY and published relation does not have "
                    "REPLICA IDENTITY FULL",
                    rel->remoterel.nspname, rel->remoterel.relname)));
}

/*
 * Handle UPDATE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_update(StringInfo s)
{
    LogicalRepRelMapEntry *rel;
    LogicalRepRelId relid;
    UserContext ucxt;
    ApplyExecutionData *edata;
    EState     *estate;
    LogicalRepTupleData oldtup;
    LogicalRepTupleData newtup;
    bool        has_oldtup;
    TupleTableSlot *remoteslot;
    RTEPermissionInfo *target_perminfo;
    MemoryContext oldctx;
    bool        run_as_owner;

    /*
     * Quick return if we are skipping data modification changes or handling
     * streamed transactions.
     */
    if (is_skipping_changes() ||
        handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
        return;

    begin_replication_step();

    relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
                                   &newtup);
    rel = logicalrep_rel_open(relid, RowExclusiveLock);
    if (!should_apply_changes_for_rel(rel))
    {
        /*
         * The relation can't become interesting in the middle of the
         * transaction so it's safe to unlock it.
         */
        logicalrep_rel_close(rel, RowExclusiveLock);
        end_replication_step();
        return;
    }

    /* Set relation for error callback */
    apply_error_callback_arg.rel = rel;

    /* Check if we can do the update. */
    check_relation_updatable(rel);

    /*
     * Make sure that any user-supplied code runs as the table owner, unless
     * the user has opted out of that behavior.
     */
    run_as_owner = MySubscription->runasowner;
    if (!run_as_owner)
        SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);

    /* Initialize the executor state. */
    edata = create_edata_for_relation(rel);
    estate = edata->estate;
    remoteslot = ExecInitExtraTupleSlot(estate,
                                        RelationGetDescr(rel->localrel),
                                        &TTSOpsVirtual);

    /*
     * Populate updatedCols so that per-column triggers can fire, and so
     * executor can correctly pass down indexUnchanged hint. This could
     * include more columns than were actually changed on the publisher
     * because the logical replication protocol doesn't contain that
     * information. But it would for example exclude columns that only exist
     * on the subscriber, since we are not touching those.
     */
    target_perminfo = list_nth(estate->es_rteperminfos, 0);
    for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
    {
        Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
        int         remoteattnum = rel->attrmap->attnums[i];

        if (!att->attisdropped && remoteattnum >= 0)
        {
            Assert(remoteattnum < newtup.ncols);
            if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
                target_perminfo->updatedCols =
                    bms_add_member(target_perminfo->updatedCols,
                                   i + 1 - FirstLowInvalidHeapAttributeNumber);
        }
    }

    /* Build the search tuple. */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    slot_store_data(remoteslot, rel,
                    has_oldtup ? &oldtup : &newtup);
    MemoryContextSwitchTo(oldctx);

    /* For a partitioned table, apply update to correct partition. */
    if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        apply_handle_tuple_routing(edata,
                                   remoteslot, &newtup, CMD_UPDATE);
    else
        apply_handle_update_internal(edata, edata->targetRelInfo,
                                     remoteslot, &newtup, rel->localindexoid);

    finish_edata(edata);

    /* Reset relation for error callback */
    apply_error_callback_arg.rel = NULL;

    if (!run_as_owner)
        RestoreUserContext(&ucxt);

    logicalrep_rel_close(rel, NoLock);

    end_replication_step();
}
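
/*
 * Illustrative sketch, not part of worker.c: the updatedCols loop in the
 * UPDATE handler walks the local columns, maps each one to its remote
 * column, and records it as updated unless it is dropped, has no remote
 * counterpart, or arrived marked "unchanged". The standalone code below
 * shows that selection with a 64-bit mask instead of a Bitmapset and
 * without the attribute-number offset the real code applies. COL_UNCHANGED
 * and collect_updated_cols are hypothetical names.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical marker for "value not sent because it did not change". */
#define COL_UNCHANGED 'u'

/*
 * attrmap[i] is the remote column index for local column i, or -1 if the
 * column exists only locally. Returns a mask with bit i set for every local
 * column that should be treated as updated.
 */
static uint64_t
collect_updated_cols(const int *attrmap, const bool *dropped, int natts,
                     const char *colstatus, int ncols)
{
    uint64_t    updated = 0;

    assert(natts >= 0 && natts <= 64);

    for (int i = 0; i < natts; i++)
    {
        int         remoteattnum = attrmap[i];

        /* skip dropped columns and columns with no remote counterpart */
        if (dropped[i] || remoteattnum < 0)
            continue;

        assert(remoteattnum < ncols);
        if (colstatus[remoteattnum] != COL_UNCHANGED)
            updated |= UINT64_C(1) << i;
    }
    return updated;
}
```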

/*
 * Workhorse for apply_handle_update()
 * relinfo is for the relation we're actually updating in
 * (could be a child partition of edata->targetRelInfo)
 */
static void
apply_handle_update_internal(ApplyExecutionData *edata,
                             ResultRelInfo *relinfo,
                             TupleTableSlot *remoteslot,
                             LogicalRepTupleData *newtup,
                             Oid localindexoid)
{
    EState     *estate = edata->estate;
    LogicalRepRelMapEntry *relmapentry = edata->targetRel;
    Relation    localrel = relinfo->ri_RelationDesc;
    EPQState    epqstate;
    TupleTableSlot *localslot;
    bool        found;
    MemoryContext oldctx;

    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
    ExecOpenIndices(relinfo, false);

    found = FindReplTupleInLocalRel(edata, localrel,
                                    &relmapentry->remoterel,
                                    localindexoid,
                                    remoteslot, &localslot);
    ExecClearTuple(remoteslot);

    /*
     * Tuple found.
     *
     * Note this will fail if there are other conflicting unique indexes.
     */
    if (found)
    {
        /* Process and store remote tuple in the slot */
        oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
        slot_modify_data(remoteslot, localslot, relmapentry, newtup);
        MemoryContextSwitchTo(oldctx);

        EvalPlanQualSetSlot(&epqstate, remoteslot);

        /* Do the actual update. */
        TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
        ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
                                 remoteslot);
    }
    else
    {
        /*
         * The tuple to be updated could not be found. Do nothing except for
         * emitting a log message.
         *
         * XXX should this be promoted to ereport(LOG) perhaps?
         */
        elog(DEBUG1,
             "logical replication did not find row to be updated "
             "in replication target relation \"%s\"",
             RelationGetRelationName(localrel));
    }

    /* Cleanup. */
    ExecCloseIndices(relinfo);
    EvalPlanQualEnd(&epqstate);
}
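
/*
 * Illustrative sketch, not part of worker.c: the update workhorse first
 * locates the local row, then overlays the incoming values on it, and
 * silently does nothing when the row is missing. The standalone code below
 * mimics that flow on a small array, assuming that columns marked
 * "unchanged" keep their local value. Row, find_row, apply_update, NCOLS
 * and COL_UNCHANGED are hypothetical names.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define NCOLS 3
#define COL_UNCHANGED 'u'       /* hypothetical "keep local value" marker */

typedef struct Row
{
    int         key;
    int         cols[NCOLS];
} Row;

/* Linear search by key; returns NULL when no row matches. */
static Row *
find_row(Row *rows, size_t nrows, int key)
{
    for (size_t i = 0; i < nrows; i++)
    {
        if (rows[i].key == key)
            return &rows[i];
    }
    return NULL;
}

/*
 * Overlay newvals on the matching row, skipping unchanged columns.
 * Returns false, leaving the table untouched, when the row is missing.
 */
static bool
apply_update(Row *rows, size_t nrows, int key,
             const int *newvals, const char *status)
{
    Row        *local = find_row(rows, nrows, key);

    if (local == NULL)
        return false;           /* a caller could log this and move on */

    for (int i = 0; i < NCOLS; i++)
    {
        if (status[i] != COL_UNCHANGED)
            local->cols[i] = newvals[i];
    }
    return true;
}
```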

/*
 * Handle DELETE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_delete(StringInfo s)
{
    LogicalRepRelMapEntry *rel;
    LogicalRepTupleData oldtup;
    LogicalRepRelId relid;
    UserContext ucxt;
    ApplyExecutionData *edata;
    EState     *estate;
    TupleTableSlot *remoteslot;
    MemoryContext oldctx;
    bool        run_as_owner;

    /*
     * Quick return if we are skipping data modification changes or handling
     * streamed transactions.
     */
    if (is_skipping_changes() ||
        handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
        return;

    begin_replication_step();

    relid = logicalrep_read_delete(s, &oldtup);
    rel = logicalrep_rel_open(relid, RowExclusiveLock);
    if (!should_apply_changes_for_rel(rel))
    {
        /*
         * The relation can't become interesting in the middle of the
         * transaction so it's safe to unlock it.
         */
        logicalrep_rel_close(rel, RowExclusiveLock);
        end_replication_step();
        return;
    }

    /* Set relation for error callback */
    apply_error_callback_arg.rel = rel;

    /* Check if we can do the delete. */
    check_relation_updatable(rel);

    /*
     * Make sure that any user-supplied code runs as the table owner, unless
     * the user has opted out of that behavior.
     */
    run_as_owner = MySubscription->runasowner;
    if (!run_as_owner)
        SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);

    /* Initialize the executor state. */
    edata = create_edata_for_relation(rel);
    estate = edata->estate;
    remoteslot = ExecInitExtraTupleSlot(estate,
                                        RelationGetDescr(rel->localrel),
                                        &TTSOpsVirtual);

    /* Build the search tuple. */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    slot_store_data(remoteslot, rel, &oldtup);
    MemoryContextSwitchTo(oldctx);

    /* For a partitioned table, apply delete to correct partition. */
    if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        apply_handle_tuple_routing(edata,
                                   remoteslot, NULL, CMD_DELETE);
    else
        apply_handle_delete_internal(edata, edata->targetRelInfo,
                                     remoteslot, rel->localindexoid);

    finish_edata(edata);

    /* Reset relation for error callback */
    apply_error_callback_arg.rel = NULL;

    if (!run_as_owner)
        RestoreUserContext(&ucxt);

    logicalrep_rel_close(rel, NoLock);

    end_replication_step();
}

/*
 * Workhorse for apply_handle_delete()
 * relinfo is for the relation we're actually deleting from
 * (could be a child partition of edata->targetRelInfo)
 */
static void
apply_handle_delete_internal(ApplyExecutionData *edata,
                             ResultRelInfo *relinfo,
                             TupleTableSlot *remoteslot,
                             Oid localindexoid)
{
    EState     *estate = edata->estate;
    Relation    localrel = relinfo->ri_RelationDesc;
    LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
    EPQState    epqstate;
    TupleTableSlot *localslot;
    bool        found;

    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
    ExecOpenIndices(relinfo, false);

    found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
                                    remoteslot, &localslot);

    /* If found delete it. */
    if (found)
    {
        EvalPlanQualSetSlot(&epqstate, localslot);

        /* Do the actual delete. */
        TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
        ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
    }
    else
    {
        /*
         * The tuple to be deleted could not be found. Do nothing except for
         * emitting a log message.
         *
         * XXX should this be promoted to ereport(LOG) perhaps?
         */
        elog(DEBUG1,
             "logical replication did not find row to be deleted "
             "in replication target relation \"%s\"",
             RelationGetRelationName(localrel));
    }

    /* Cleanup. */
    ExecCloseIndices(relinfo);
    EvalPlanQualEnd(&epqstate);
}

/*
 * Try to find a tuple received from the publication side (in 'remoteslot') in
 * the corresponding local relation, using the replica identity index, the
 * primary key, another usable index or, if needed, a sequential scan.
 *
 * Local tuple, if found, is returned in '*localslot'.
 */
static bool
FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
                        LogicalRepRelation *remoterel,
                        Oid localidxoid,
                        TupleTableSlot *remoteslot,
                        TupleTableSlot **localslot)
{
    EState     *estate = edata->estate;
    bool        found;

    /*
     * Regardless of the top-level operation, we're performing a read here, so
     * check for SELECT privileges.
     */
    TargetPrivilegesCheck(localrel, ACL_SELECT);

    *localslot = table_slot_create(localrel, &estate->es_tupleTable);

    Assert(OidIsValid(localidxoid) ||
           (remoterel->replident == REPLICA_IDENTITY_FULL));

    if (OidIsValid(localidxoid))
    {
#ifdef USE_ASSERT_CHECKING
        Relation    idxrel = index_open(localidxoid, AccessShareLock);

        /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
        Assert(GetRelationIdentityOrPK(idxrel) == localidxoid ||
               IsIndexUsableForReplicaIdentityFull(BuildIndexInfo(idxrel),
                                                   edata->targetRel->attrmap));
        index_close(idxrel, AccessShareLock);
#endif

        found = RelationFindReplTupleByIndex(localrel, localidxoid,
                                             LockTupleExclusive,
                                             remoteslot, *localslot);
    }
    else
        found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
                                         remoteslot, *localslot);

    return found;
}

/*
 * This handles insert, update, delete on a partitioned table.
 */
static void
apply_handle_tuple_routing(ApplyExecutionData *edata,
                           TupleTableSlot *remoteslot,
                           LogicalRepTupleData *newtup,
                           CmdType operation)
{
    EState     *estate = edata->estate;
    LogicalRepRelMapEntry *relmapentry = edata->targetRel;
    ResultRelInfo *relinfo = edata->targetRelInfo;
    Relation    parentrel = relinfo->ri_RelationDesc;
    ModifyTableState *mtstate;
    PartitionTupleRouting *proute;
    ResultRelInfo *partrelinfo;
    Relation    partrel;
    TupleTableSlot *remoteslot_part;
    TupleConversionMap *map;
    MemoryContext oldctx;
    LogicalRepRelMapEntry *part_entry = NULL;
    AttrMap    *attrmap = NULL;

    /* ModifyTableState is needed for ExecFindPartition(). */
    edata->mtstate = mtstate = makeNode(ModifyTableState);
    mtstate->ps.plan = NULL;
    mtstate->ps.state = estate;
    mtstate->operation = operation;
    mtstate->resultRelInfo = relinfo;

    /* ... as is PartitionTupleRouting. */
    edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);

    /*
     * Find the partition to which the "search tuple" belongs.
     */
    Assert(remoteslot != NULL);
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
                                    remoteslot, estate);
    Assert(partrelinfo != NULL);
    partrel = partrelinfo->ri_RelationDesc;

    /*
     * Check for supported relkind. We need this since partitions might be of
     * unsupported relkinds; and the set of partitions can change, so checking
     * at CREATE/ALTER SUBSCRIPTION would be insufficient.
     */
    CheckSubscriptionRelkind(partrel->rd_rel->relkind,
                             get_namespace_name(RelationGetNamespace(partrel)),
                             RelationGetRelationName(partrel));

    /*
     * To perform any of the operations below, the tuple must match the
     * partition's rowtype. Convert if needed or just copy, using a dedicated
     * slot to store the tuple in any case.
     */
    remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
    if (remoteslot_part == NULL)
        remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
    map = ExecGetRootToChildMap(partrelinfo, estate);
    if (map != NULL)
    {
        attrmap = map->attrMap;
        remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
                                                remoteslot_part);
    }
    else
    {
        remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
        slot_getallattrs(remoteslot_part);
    }
    MemoryContextSwitchTo(oldctx);

    /* Check if we can do the update or delete on the leaf partition. */
    if (operation == CMD_UPDATE || operation == CMD_DELETE)
    {
        part_entry = logicalrep_partition_open(relmapentry, partrel,
                                               attrmap);
        check_relation_updatable(part_entry);
    }

    switch (operation)
    {
        case CMD_INSERT:
            apply_handle_insert_internal(edata, partrelinfo,
                                         remoteslot_part);
            break;

        case CMD_DELETE:
            apply_handle_delete_internal(edata, partrelinfo,
                                         remoteslot_part,
                                         part_entry->localindexoid);
            break;

        case CMD_UPDATE:

            /*
             * For UPDATE, depending on whether or not the updated tuple
             * satisfies the partition's constraint, perform a simple UPDATE
             * of the partition or move the updated tuple into a different
             * suitable partition.
             */
            {
                TupleTableSlot *localslot;
                ResultRelInfo *partrelinfo_new;
                Relation    partrel_new;
                bool        found;

                /* Get the matching local tuple from the partition. */
                found = FindReplTupleInLocalRel(edata, partrel,
                                                &part_entry->remoterel,
                                                part_entry->localindexoid,
                                                remoteslot_part, &localslot);
                if (!found)
                {
                    /*
                     * The tuple to be updated could not be found. Do nothing
                     * except for emitting a log message.
                     *
                     * XXX should this be promoted to ereport(LOG) perhaps?
                     */
                    elog(DEBUG1,
                         "logical replication did not find row to be updated "
                         "in replication target relation's partition \"%s\"",
                         RelationGetRelationName(partrel));
                    return;
                }

                /*
                 * Apply the update to the local tuple, putting the result in
                 * remoteslot_part.
                 */
                oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
                slot_modify_data(remoteslot_part, localslot, part_entry,
                                 newtup);
                MemoryContextSwitchTo(oldctx);

                /*
                 * Does the updated tuple still satisfy the current
                 * partition's constraint?
                 */
                if (!partrel->rd_rel->relispartition ||
                    ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
                                       false))
                {
                    /*
                     * Yes, so simply UPDATE the partition. We don't call
                     * apply_handle_update_internal() here, which would
                     * normally do the following work, to avoid repeating some
                     * work already done above to find the local tuple in the
                     * partition.
                     */
                    EPQState    epqstate;

                    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
                    ExecOpenIndices(partrelinfo, false);

                    EvalPlanQualSetSlot(&epqstate, remoteslot_part);
                    TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
                                          ACL_UPDATE);
                    ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
                                             localslot, remoteslot_part);
                    ExecCloseIndices(partrelinfo);
                    EvalPlanQualEnd(&epqstate);
                }
                else
                {
                    /* Move the tuple into the new partition. */

                    /*
                     * New partition will be found using tuple routing, which
                     * can only occur via the parent table. We might need to
                     * convert the tuple to the parent's rowtype. Note that
                     * this is the tuple found in the partition, not the
                     * original search tuple received by this function.
                     */
                    if (map)
                    {
                        TupleConversionMap *PartitionToRootMap =
                            convert_tuples_by_name(RelationGetDescr(partrel),
                                                   RelationGetDescr(parentrel));

                        remoteslot =
                            execute_attr_map_slot(PartitionToRootMap->attrMap,
                                                  remoteslot_part, remoteslot);
                    }
                    else
                    {
                        remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
                        slot_getallattrs(remoteslot);
                    }

                    /* Find the new partition. */
                    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
                    partrelinfo_new = ExecFindPartition(mtstate, relinfo,
                                                        proute, remoteslot,
                                                        estate);
                    MemoryContextSwitchTo(oldctx);
                    Assert(partrelinfo_new != partrelinfo);
                    partrel_new = partrelinfo_new->ri_RelationDesc;

                    /* Check that new partition also has supported relkind. */
                    CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
                                             get_namespace_name(RelationGetNamespace(partrel_new)),
                                             RelationGetRelationName(partrel_new));

                    /* DELETE old tuple found in the old partition. */
                    apply_handle_delete_internal(edata, partrelinfo,
                                                 localslot,
                                                 part_entry->localindexoid);

                    /* INSERT new tuple into the new partition. */

                    /*
                     * Convert the replacement tuple to match the destination
                     * partition rowtype.
                     */
                    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
                    remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
                    if (remoteslot_part == NULL)
                        remoteslot_part = table_slot_create(partrel_new,
                                                            &estate->es_tupleTable);
                    map = ExecGetRootToChildMap(partrelinfo_new, estate);
                    if (map != NULL)
                    {
                        remoteslot_part = execute_attr_map_slot(map->attrMap,
                                                                remoteslot,
                                                                remoteslot_part);
                    }
                    else
                    {
                        remoteslot_part = ExecCopySlot(remoteslot_part,
                                                       remoteslot);
                        slot_getallattrs(remoteslot);
                    }
                    MemoryContextSwitchTo(oldctx);
                    apply_handle_insert_internal(edata, partrelinfo_new,
                                                 remoteslot_part);
                }
            }
            break;

        default:
            elog(ERROR, "unrecognized CmdType: %d", (int) operation);
            break;
    }
}
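
/*
 * Illustrative sketch, not part of worker.c: for an UPDATE on a partitioned
 * table, the routing code above decides between updating the row in place
 * and moving it (delete from the old partition, insert into the new one),
 * depending on whether the new row still fits the partition it was found in.
 * The standalone code below shows that decision for integer range
 * partitions. RangePart, RouteAction, find_partition and route_update are
 * hypothetical names, and real partition constraints are far more general.
 */

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical range partition covering keys in [lo, hi). */
typedef struct RangePart
{
    int         lo;
    int         hi;
} RangePart;

typedef enum
{
    ROUTE_UPDATE_IN_PLACE,      /* new key still fits the source partition */
    ROUTE_MOVE,                 /* delete from source, insert into target */
    ROUTE_NO_PARTITION          /* no partition accepts one of the keys */
} RouteAction;

/* Return the index of the partition accepting key, or -1 if none does. */
static int
find_partition(const RangePart *parts, int nparts, int key)
{
    for (int i = 0; i < nparts; i++)
    {
        if (key >= parts[i].lo && key < parts[i].hi)
            return i;
    }
    return -1;
}

/* Locate source and target partitions and pick the action to take. */
static RouteAction
route_update(const RangePart *parts, int nparts,
             int old_key, int new_key, int *src, int *dst)
{
    assert(src != NULL && dst != NULL);

    *src = find_partition(parts, nparts, old_key);
    *dst = find_partition(parts, nparts, new_key);

    if (*src < 0 || *dst < 0)
        return ROUTE_NO_PARTITION;
    return (*src == *dst) ? ROUTE_UPDATE_IN_PLACE : ROUTE_MOVE;
}
```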

/*
 * Handle TRUNCATE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_truncate(StringInfo s)
{
    bool        cascade = false;
    bool        restart_seqs = false;
    List       *remote_relids = NIL;
    List       *remote_rels = NIL;
    List       *rels = NIL;
    List       *part_rels = NIL;
    List       *relids = NIL;
    List       *relids_logged = NIL;
    ListCell   *lc;
    LOCKMODE    lockmode = AccessExclusiveLock;

    /*
     * Quick return if we are skipping data modification changes or handling
     * streamed transactions.
     */
    if (is_skipping_changes() ||
        handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
        return;

    begin_replication_step();

    remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

    foreach(lc, remote_relids)
    {
        LogicalRepRelId relid = lfirst_oid(lc);
        LogicalRepRelMapEntry *rel;

        rel = logicalrep_rel_open(relid, lockmode);
        if (!should_apply_changes_for_rel(rel))
        {
            /*
             * The relation can't become interesting in the middle of the
             * transaction so it's safe to unlock it.
             */
            logicalrep_rel_close(rel, lockmode);
            continue;
        }

        remote_rels = lappend(remote_rels, rel);
        TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
        rels = lappend(rels, rel->localrel);
        relids = lappend_oid(relids, rel->localreloid);
        if (RelationIsLogicallyLogged(rel->localrel))
            relids_logged = lappend_oid(relids_logged, rel->localreloid);

        /*
         * Truncate partitions if we got a message to truncate a partitioned
         * table.
         */
        if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        {
            ListCell   *child;
            List       *children = find_all_inheritors(rel->localreloid,
                                                       lockmode,
                                                       NULL);

            foreach(child, children)
            {
                Oid         childrelid = lfirst_oid(child);
                Relation    childrel;

                if (list_member_oid(relids, childrelid))
                    continue;

                /* find_all_inheritors already got lock */
                childrel = table_open(childrelid, NoLock);

                /*
                 * Ignore temp tables of other backends. See similar code in
                 * ExecuteTruncate().
                 */
                if (RELATION_IS_OTHER_TEMP(childrel))
                {
                    table_close(childrel, lockmode);
                    continue;
                }

                TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
                rels = lappend(rels, childrel);
                part_rels = lappend(part_rels, childrel);
                relids = lappend_oid(relids, childrelid);
                /* Log this relation only if needed for logical decoding */
                if (RelationIsLogicallyLogged(childrel))
                    relids_logged = lappend_oid(relids_logged, childrelid);
            }
        }
    }

    /*
     * Even if we used CASCADE on the upstream primary we explicitly default
     * to replaying changes without further cascading. This might be later
     * changeable with a user specified option.
     *
     * MySubscription->runasowner tells us whether we want to execute
     * replication actions as the subscription owner; the last argument to
     * TruncateGuts tells it whether we want to switch to the table owner.
     * Those are exactly opposite conditions.
     */
    ExecuteTruncateGuts(rels,
                        relids,
                        relids_logged,
                        DROP_RESTRICT,
                        restart_seqs,
                        !MySubscription->runasowner);
    foreach(lc, remote_rels)
    {
        LogicalRepRelMapEntry *rel = lfirst(lc);

        logicalrep_rel_close(rel, NoLock);
    }
    foreach(lc, part_rels)
    {
        Relation    rel = lfirst(lc);

        table_close(rel, NoLock);
    }

    end_replication_step();
}
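

/*
 * Logical replication protocol message dispatcher.
 */
void
apply_dispatch(StringInfo s)
{
    LogicalRepMsgType action = pq_getmsgbyte(s);
    LogicalRepMsgType saved_command;

    /*
     * Set the current command being applied. Since this function can be
     * called recursively when applying spooled changes, save the current
     * command.
     */
    saved_command = apply_error_callback_arg.command;
    apply_error_callback_arg.command = action;

    switch (action)
    {
        case LOGICAL_REP_MSG_BEGIN:
            apply_handle_begin(s);
            break;

        case LOGICAL_REP_MSG_COMMIT:
            apply_handle_commit(s);
            break;

        case LOGICAL_REP_MSG_INSERT:
            apply_handle_insert(s);
            break;

        case LOGICAL_REP_MSG_UPDATE:
            apply_handle_update(s);
            break;

        case LOGICAL_REP_MSG_DELETE:
            apply_handle_delete(s);
            break;

        case LOGICAL_REP_MSG_TRUNCATE:
            apply_handle_truncate(s);
            break;

        case LOGICAL_REP_MSG_RELATION:
            apply_handle_relation(s);
            break;

        case LOGICAL_REP_MSG_TYPE:
            apply_handle_type(s);
            break;

        case LOGICAL_REP_MSG_ORIGIN:
            apply_handle_origin(s);
            break;

        case LOGICAL_REP_MSG_MESSAGE:

            /*
             * Logical replication does not use generic logical messages yet.
             * However, they could be used by other applications that use
             * this output plugin.
             */
            break;

        case LOGICAL_REP_MSG_STREAM_START:
            apply_handle_stream_start(s);
            break;

        case LOGICAL_REP_MSG_STREAM_STOP:
            apply_handle_stream_stop(s);
            break;

        case LOGICAL_REP_MSG_STREAM_ABORT:
            apply_handle_stream_abort(s);
            break;

        case LOGICAL_REP_MSG_STREAM_COMMIT:
            apply_handle_stream_commit(s);
            break;

        case LOGICAL_REP_MSG_BEGIN_PREPARE:
            apply_handle_begin_prepare(s);
            break;

        case LOGICAL_REP_MSG_PREPARE:
            apply_handle_prepare(s);
            break;

        case LOGICAL_REP_MSG_COMMIT_PREPARED:
            apply_handle_commit_prepared(s);
            break;

        case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
            apply_handle_rollback_prepared(s);
            break;

        case LOGICAL_REP_MSG_STREAM_PREPARE:
            apply_handle_stream_prepare(s);
            break;

        default:
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("invalid logical replication message type \"??? (%d)\"", action)));
    }

    /* Reset the current command */
    apply_error_callback_arg.command = saved_command;
}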
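
/*
 * Illustrative sketch, not part of worker.c: the dispatcher saves the
 * "current command" before handling a message and restores it afterwards,
 * so that a nested call made while replaying spooled changes does not
 * clobber the outer context. The standalone code below shows that
 * save/restore pattern. The '*' wrapper message, the Trace struct and the
 * dispatch function are hypothetical; only the idea of a leading type byte
 * is taken from the code above.
 */

```c
#include <assert.h>

#define TRACE_MAX 16

/* Records which command was current each time a handler finished. */
typedef struct Trace
{
    char        seen[TRACE_MAX];
    int         n;
} Trace;

/* The command being applied; 0 means "none". */
static char current_command = 0;

static void
trace_add(Trace *tr, char c)
{
    if (tr->n < TRACE_MAX)
        tr->seen[tr->n++] = c;
}

/*
 * Dispatch on the first byte of msg. '*' is a made-up wrapper whose payload
 * is another message, handled by a recursive call. Returns 1 on success and
 * 0 for an empty or unknown message.
 */
static int
dispatch(const char *msg, int len, Trace *tr)
{
    char        action;
    char        saved;
    int         ok = 1;

    if (len < 1)
        return 0;

    action = msg[0];
    saved = current_command;    /* remember the caller's command */
    current_command = action;

    switch (action)
    {
        case 'B':
        case 'C':
        case 'I':
            trace_add(tr, current_command);
            break;

        case '*':
            ok = dispatch(msg + 1, len - 1, tr);
            /* the nested call restored our command before returning */
            trace_add(tr, current_command);
            break;

        default:
            ok = 0;
            break;
    }

    current_command = saved;    /* reset for whoever called us */
    return ok;
}
```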
3374 
3375 /*
3376  * Figure out which write/flush positions to report to the walsender process.
3377  *
3378  * We can't simply report back the last LSN the walsender sent us because the
3379  * local transaction might not yet be flushed to disk locally. Instead we
3380  * build a list that associates local with remote LSNs for every commit. When
3381  * reporting back the flush position to the sender we iterate that list and
3382  * check which entries on it are already locally flushed. Those we can report
3383  * as having been flushed.
3384  *
3385  * The have_pending_txes is true if there are outstanding transactions that
3386  * need to be flushed.
3387  */
3388 static void
3390  bool *have_pending_txes)
3391 {
3392  dlist_mutable_iter iter;
3393  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3394 
3396  *flush = InvalidXLogRecPtr;
3397 
3399  {
3400  FlushPosition *pos =
3401  dlist_container(FlushPosition, node, iter.cur);
3402 
3403  *write = pos->remote_end;
3404 
3405  if (pos->local_end <= local_flush)
3406  {
3407  *flush = pos->remote_end;
3408  dlist_delete(iter.cur);
3409  pfree(pos);
3410  }
3411  else
3412  {
3413  /*
3414  * Don't want to uselessly iterate over the rest of the list which
3415  * could potentially be long. Instead get the last element and
3416  * grab the write position from there.
3417  */
3418  pos = dlist_tail_element(FlushPosition, node,
3419  &lsn_mapping);
3420  *write = pos->remote_end;
3421  *have_pending_txes = true;
3422  return;
3423  }
3424  }
3425 
3426  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3427 }
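
/*
 * Illustrative sketch, not part of worker.c: the list walk described above
 * can be modeled with a plain array kept in commit order. Entries whose
 * local end is already flushed are consumed and their remote end is reported
 * as flushed; the newest entry always supplies the write position. The names
 * Lsn, LsnPair and sketch_flush_position are hypothetical stand-ins, and the
 * array replaces the dlist used by the real code.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* hypothetical stand-in for XLogRecPtr */

typedef struct LsnPair
{
    Lsn local_end;              /* end LSN of the commit in the local WAL */
    Lsn remote_end;             /* end LSN of the same commit on the publisher */
} LsnPair;

/*
 * pairs[0..*n) is ordered by commit. Flushed entries are removed from the
 * front; *have_pending reports whether anything is left afterwards.
 */
static void
sketch_flush_position(LsnPair *pairs, size_t *n, Lsn local_flush,
                      Lsn *write, Lsn *flush, bool *have_pending)
{
    size_t done = 0;

    assert(pairs != NULL && n != NULL);

    *write = 0;
    *flush = 0;

    /* consume every entry that is already durable locally */
    while (done < *n && pairs[done].local_end <= local_flush)
    {
        *flush = pairs[done].remote_end;
        *write = pairs[done].remote_end;
        done++;
    }

    /* if something is still unflushed, the tail holds the newest write */
    if (done < *n)
        *write = pairs[*n - 1].remote_end;

    /* drop consumed entries by shifting the remainder to the front */
    for (size_t i = done; i < *n; i++)
        pairs[i - done] = pairs[i];
    *n -= done;

    *have_pending = (*n > 0);
}
```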
3428 
3429 /*
3430  * Store current remote/local lsn pair in the tracking list.
3431  */
3432 void
3434 {
3435  FlushPosition *flushpos;
3436 
3437  /*
3438  * Skip for parallel apply workers, because the lsn_mapping is maintained
3439  * by the leader apply worker.
3440  */
3442  return;
3443 
3444  /* Need to do this in permanent context */
3446 
3447  /* Track commit lsn */
3448  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3449  flushpos->local_end = local_lsn;
3450  flushpos->remote_end = remote_lsn;
3451 
3452  dlist_push_tail(&lsn_mapping, &flushpos->node);
3454 }
3455 
3456 
3457 /* Update statistics of the worker. */
3458 static void
3459 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3460 {
3461  MyLogicalRepWorker->last_lsn = last_lsn;
3462  MyLogicalRepWorker->last_send_time = send_time;
3464  if (reply)
3465  {
3466  MyLogicalRepWorker->reply_lsn = last_lsn;
3467  MyLogicalRepWorker->reply_time = send_time;
3468  }
3469 }
3470 
3471 /*
3472  * Apply main loop.
3473  */
3474 static void
3476 {
3477  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3478  bool ping_sent = false;
3479  TimeLineID tli;
3480  ErrorContextCallback errcallback;
3481 
3482  /*
3483  * Init the ApplyMessageContext which we clean up after each replication
3484  * protocol message.
3485  */
3487  "ApplyMessageContext",
3489 
3490  /*
3491  * This memory context is used for per-stream data when the streaming mode
3492  * is enabled. This context is reset on each stream stop.
3493  */
3495  "LogicalStreamingContext",
3497 
3498  /* mark as idle, before starting to loop */
3500 
3501  /*
3502  * Push apply error context callback. Fields will be filled while applying
3503  * a change.
3504  */
3505  errcallback.callback = apply_error_callback;
3506  errcallback.previous = error_context_stack;
3507  error_context_stack = &errcallback;
3509 
3510  /* This outer loop iterates once per wait. */
3511  for (;;)
3512  {
3514  int rc;
3515  int len;
3516  char *buf = NULL;
3517  bool endofstream = false;
3518  long wait_time;
3519 
3521 
3523 
3525 
3526  if (len != 0)
3527  {
3528  /* Loop to process all available data (without blocking). */
3529  for (;;)
3530  {
3532 
3533  if (len == 0)
3534  {
3535  break;
3536  }
3537  else if (len < 0)
3538  {
3539  ereport(LOG,
3540  (errmsg("data stream from publisher has ended")));
3541  endofstream = true;
3542  break;
3543  }
3544  else
3545  {
3546  int c;
3547  StringInfoData s;
3548 
3549  if (ConfigReloadPending)
3550  {
3551  ConfigReloadPending = false;
3553  }
3554 
3555  /* Reset timeout. */
3556  last_recv_timestamp = GetCurrentTimestamp();
3557  ping_sent = false;
3558 
3559  /* Ensure we are reading the data into our memory context. */
3561 
3563 
3564  c = pq_getmsgbyte(&s);
3565 
3566  if (c == 'w')
3567  {
3568  XLogRecPtr start_lsn;
3569  XLogRecPtr end_lsn;
3570  TimestampTz send_time;
3571 
3572  start_lsn = pq_getmsgint64(&s);
3573  end_lsn = pq_getmsgint64(&s);
3574  send_time = pq_getmsgint64(&s);
3575 
3576  if (last_received < start_lsn)
3577  last_received = start_lsn;
3578 
3579  if (last_received < end_lsn)
3580  last_received = end_lsn;
3581 
3582  UpdateWorkerStats(last_received, send_time, false);
3583 
3584  apply_dispatch(&s);
3585  }
3586  else if (c == 'k')
3587  {
3588  XLogRecPtr end_lsn;
3590  bool reply_requested;
3591 
3592  end_lsn = pq_getmsgint64(&s);
3593  timestamp = pq_getmsgint64(&s);
3594  reply_requested = pq_getmsgbyte(&s);
3595 
3596  if (last_received < end_lsn)
3597  last_received = end_lsn;
3598 
3599  send_feedback(last_received, reply_requested, false);
3600  UpdateWorkerStats(last_received, timestamp, true);
3601  }
3602  /* other message types are purposefully ignored */
3603 
3605  }
3606 
3608  }
3609  }
3610 
3611  /* confirm all writes so far */
3612  send_feedback(last_received, false, false);
3613 
3615  {
3616  /*
3617  * If we didn't get any transactions for a while there might be
3618  * unconsumed invalidation messages in the queue, consume them
3619  * now.
3620  */
3623 
3624  /* Process any table synchronization changes. */
3625  process_syncing_tables(last_received);
3626  }
3627 
3628  /* Cleanup the memory. */
3631 
3632  /* Check if we need to exit the streaming loop. */
3633  if (endofstream)
3634  break;
3635 
3636  /*
3637  * Wait for more data or latch. If we have unflushed transactions,
3638  * wake up after WalWriterDelay to see if they've been flushed yet (in
3639  * which case we should send a feedback message). Otherwise, there's
3640  * no particular urgency about waking up unless we get data or a
3641  * signal.
3642  */
3643  if (!dlist_is_empty(&lsn_mapping))
3644  wait_time = WalWriterDelay;
3645  else
3646  wait_time = NAPTIME_PER_CYCLE;
3647 
3651  fd, wait_time,
3652  WAIT_EVENT_LOGICAL_APPLY_MAIN);
3653 
3654  if (rc & WL_LATCH_SET)
3655  {
3658  }
3659 
3660  if (ConfigReloadPending)
3661  {
3662  ConfigReloadPending = false;
3664  }
3665 
3666  if (rc & WL_TIMEOUT)
3667  {
3668  /*
3669  * We didn't receive anything new. If we haven't heard anything
3670  * from the server for more than wal_receiver_timeout / 2, ping
3671  * the server. Also, if it's been longer than
3672  * wal_receiver_status_interval since the last update we sent,
3673  * send a status update to the primary anyway, to report any
3674  * progress in applying WAL.
3675  */
3676  bool requestReply = false;
3677 
3678  /*
3679  * Check if time since last receive from primary has reached the
3680  * configured limit.
3681  */
3682  if (wal_receiver_timeout > 0)
3683  {
3685  TimestampTz timeout;
3686 
3687  timeout =
3688  TimestampTzPlusMilliseconds(last_recv_timestamp,
3690 
3691  if (now >= timeout)
3692  ereport(ERROR,
3693  (errcode(ERRCODE_CONNECTION_FAILURE),
3694  errmsg("terminating logical replication worker due to timeout")));
3695 
3696  /* Check to see if it's time for a ping. */
3697  if (!ping_sent)
3698  {
3699  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3700  (wal_receiver_timeout / 2));
3701  if (now >= timeout)
3702  {
3703  requestReply = true;
3704  ping_sent = true;
3705  }
3706  }
3707  }
3708 
3709  send_feedback(last_received, requestReply, requestReply);
3710 
3711  /*
3712  * Force reporting to ensure long idle periods don't lead to
3713  * arbitrarily delayed stats. Stats can only be reported outside
3714  * of (implicit or explicit) transactions. That shouldn't lead to
3715  * stats being delayed for long, because transactions are either
3716  * sent as a whole on commit or streamed. Streamed transactions
3717  * are spilled to disk and applied on commit.
3718  */
3719  if (!IsTransactionState())
3720  pgstat_report_stat(true);
3721  }
3722  }
3723 
3724  /* Pop the error context stack */
3725  error_context_stack = errcallback.previous;
3727 
3728  /* All done */
3730 }
3731 
3732 /*
3733  * Send a Standby Status Update message to server.
3734  *
3735  * 'recvpos' is the latest LSN up to which we've received data; 'force' is
3736  * set if we need to send a response to avoid timeouts.
3737  */
3738 static void
3739 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3740 {
3741  static StringInfo reply_message = NULL;
3742  static TimestampTz send_time = 0;
3743 
3744  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3745  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3746  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3747 
3748  XLogRecPtr writepos;
3749  XLogRecPtr flushpos;
3750  TimestampTz now;
3751  bool have_pending_txes;
3752 
3753  /*
3754  * If the user doesn't want status to be reported to the publisher, be
3755  * sure to exit before doing anything at all.
3756  */
3757  if (!force && wal_receiver_status_interval <= 0)
3758  return;
3759 
3760  /* It's legal to not pass a recvpos */
3761  if (recvpos < last_recvpos)
3762  recvpos = last_recvpos;
3763 
3764  get_flush_position(&writepos, &flushpos, &have_pending_txes);
3765 
3766  /*
3767  * No outstanding transactions to flush, we can report the latest received
3768  * position. This is important for synchronous replication.
3769  */
3770  if (!have_pending_txes)
3771  flushpos = writepos = recvpos;
3772 
3773  if (writepos < last_writepos)
3774  writepos = last_writepos;
3775 
3776  if (flushpos < last_flushpos)
3777  flushpos = last_flushpos;
3778 
3780 
3781  /* if we've already reported everything we're good */
3782  if (!force &&
3783  writepos == last_writepos &&
3784  flushpos == last_flushpos &&
3785  !TimestampDifferenceExceeds(send_time, now,
3787  return;
3788  send_time = now;
3789 
3790  if (!reply_message)
3791  {
3793 
3795  MemoryContextSwitchTo(oldctx);
3796  }
3797  else
3799 
3800  pq_sendbyte(reply_message, 'r');
3801  pq_sendint64(reply_message, recvpos); /* write */
3802  pq_sendint64(reply_message, flushpos); /* flush */
3803  pq_sendint64(reply_message, writepos); /* apply */
3804  pq_sendint64(reply_message, now); /* sendTime */
3805  pq_sendbyte(reply_message, requestReply); /* replyRequested */
3806 
3807  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3808  force,
3809  LSN_FORMAT_ARGS(recvpos),
3810  LSN_FORMAT_ARGS(writepos),
3811  LSN_FORMAT_ARGS(flushpos));
3812 
3815 
3816  if (recvpos > last_recvpos)
3817  last_recvpos = recvpos;
3818  if (writepos > last_writepos)
3819  last_writepos = writepos;
3820  if (flushpos > last_flushpos)
3821  last_flushpos = flushpos;
3822 }
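
/*
 * Illustrative sketch, not part of worker.c: the reply built above is a
 * 34-byte message, one type byte 'r', four 64-bit integers in network byte
 * order, and one reply-request byte. The helper below lays the fields out in
 * the same order as the pq_send* calls in send_feedback(); put_be64 and
 * sketch_status_update are hypothetical names, and a raw byte buffer stands
 * in for StringInfo.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define STATUS_UPDATE_LEN 34    /* 1 + 4 * 8 + 1 bytes */

/* Store v at dst in big-endian (network) order; returns bytes written. */
static size_t
put_be64(uint8_t *dst, uint64_t v)
{
    for (int i = 0; i < 8; i++)
        dst[i] = (uint8_t) (v >> (56 - 8 * i));
    return 8;
}

/* Fill buf (at least STATUS_UPDATE_LEN bytes) and return the length used. */
static size_t
sketch_status_update(uint8_t *buf, uint64_t recvpos, uint64_t flushpos,
                     uint64_t writepos, int64_t now_us, int request_reply)
{
    size_t off = 0;

    assert(buf != NULL);

    buf[off++] = 'r';                               /* message type */
    off += put_be64(buf + off, recvpos);            /* "write" slot */
    off += put_be64(buf + off, flushpos);           /* "flush" slot */
    off += put_be64(buf + off, writepos);           /* "apply" slot */
    off += put_be64(buf + off, (uint64_t) now_us);  /* send time */
    buf[off++] = request_reply ? 1 : 0;             /* reply requested */

    return off;
}
```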
3823 
3824 /*
3825  * Exit routine for apply workers due to subscription parameter changes.
3826  */
3827 static void
3829 {
3831  {
3832  /*
3833  * Don't stop the parallel apply worker as the leader will detect the
3834  * subscription parameter change and restart logical replication later
3835  * anyway. This also prevents the leader from reporting errors when
3836  * trying to communicate with a stopped parallel apply worker, which
3837  * would accidentally disable subscriptions if disable_on_error was
3838  * set.
3839  */
3840  return;
3841  }
3842 
3843  /*
3844  * Reset the last-start time for this apply worker so that the launcher
3845  * will restart it without waiting for wal_retrieve_retry_interval if the
3846  * subscription is still active, and so that we won't leak that hash table
3847  * entry if it isn't.
3848  */
3849  if (am_leader_apply_worker())
3851 
3852  proc_exit(0);
3853 }
3854 
3855 /*
3856  * Reread subscription info if needed. Most changes will cause the worker to exit.
3857  */
3858 void
3860 {
3861  MemoryContext oldctx;
3863  bool started_tx = false;
3864 
3865  /* When cache state is valid there is nothing to do here. */
3866  if (MySubscriptionValid)
3867  return;
3868 
3869  /* This function might be called inside or outside of transaction. */
3870  if (!IsTransactionState())
3871  {
3873  started_tx = true;
3874  }
3875 
3876  /* Ensure allocations in permanent context. */
3878 
3880 
3881  /*
3882  * Exit if the subscription was removed. This normally should not happen
3883  * as the worker gets killed during DROP SUBSCRIPTION.
3884  */
3885  if (!newsub)
3886  {
3887  ereport(LOG,
3888  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3889  MySubscription->name)));
3890 
3891  /* Ensure we remove no-longer-useful entry for worker's start time */
3892  if (am_leader_apply_worker())
3894 
3895  proc_exit(0);
3896  }
3897 
3898  /* Exit if the subscription was disabled. */
3899  if (!newsub->enabled)
3900  {
3901  ereport(LOG,
3902  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
3903  MySubscription->name)));
3904 
3906  }
3907 
3908  /* !slotname should never happen when enabled is true. */
3909  Assert(newsub->slotname);
3910 
3911  /* two-phase cannot be altered while the worker is running */
3912  Assert(newsub->twophasestate == MySubscription->twophasestate);
3913 
3914  /*
3915  * Exit if any parameter that affects the remote connection was changed.
3916  * The launcher will start a new worker but note that the parallel apply
3917  * worker won't restart if the streaming option's value is changed from
3918  * 'parallel' to any other value or the server decides not to stream the
3919  * in-progress transaction.
3920  */
3921  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3922  strcmp(newsub->name, MySubscription->name) != 0 ||
3923  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3924  newsub->binary != MySubscription->binary ||
3925  newsub->stream != MySubscription->stream ||
3926  newsub->passwordrequired != MySubscription->passwordrequired ||
3927  strcmp(newsub->origin, MySubscription->origin) != 0 ||
3928  newsub->owner != MySubscription->owner ||
3929  !equal(newsub->publications, MySubscription->publications))
3930  {
3932  ereport(LOG,
3933  (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
3934  MySubscription->name)));
3935  else
3936  ereport(LOG,
3937  (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
3938  MySubscription->name)));
3939 
3941  }
3942 
3943  /*
3944  * Exit if the subscription owner's superuser privileges have been
3945  * revoked.
3946  */
3947  if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
3948  {
3950  ereport(LOG,
3951  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
3952  MySubscription->name));
3953  else
3954  ereport(LOG,
3955  errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
3956  MySubscription->name));
3957 
3959  }
3960 
3961  /* Check for other changes that should never happen too. */
3962  if (newsub->dbid != MySubscription->dbid)
3963  {
3964  elog(ERROR, "subscription %u changed unexpectedly",
3966  }
3967 
3968  /* Clean old subscription info and switch to new one. */
3971 
3972  MemoryContextSwitchTo(oldctx);
3973 
3974  /* Change synchronous commit according to the user's wishes */
3975  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3977 
3978  if (started_tx)
3980 
3981  MySubscriptionValid = true;
3982 }
3983 
3984 /*
3985  * Callback from subscription syscache invalidation.
3986  */
3987 static void
3988 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
3989 {
3990  MySubscriptionValid = false;
3991 }
3992 
3993 /*
3994  * subxact_info_write
3995  * Store information about subxacts for a toplevel transaction.
3996  *
3997  * For each subxact we store the offset of its first change in the main file.
3998  * The file is always over-written as a whole.
3999  *
4000  * XXX We should only store subxacts that were not aborted yet.
4001  */
4002 static void
4004 {
4005  char path[MAXPGPATH];
4006  Size len;
4007  BufFile *fd;
4008 
4010 
4011  /* construct the subxact filename */
4012  subxact_filename(path, subid, xid);
4013 
4014  /* Delete the subxacts file, if exists. */
4015  if (subxact_data.nsubxacts == 0)
4016  {
4019 
4020  return;
4021  }
4022 
4023  /*
4024  * Create the subxact file if it is not already created, otherwise open the
4025  * existing file.
4026  */
4028  true);
4029  if (fd == NULL)
4031 
4032  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4033 
4034  /* Write the subxact count and subxact info */
4037 
4038  BufFileClose(fd);
4039 
4040  /* free the memory allocated for subxact info */
4042 }
4043 
4044 /*
4045  * subxact_info_read
4046  * Restore information about subxacts of a streamed transaction.
4047  *
4048  * Read information about subxacts into the structure subxact_data that can be
4049  * used later.
4050  */
4051 static void
4053 {
4054  char path[MAXPGPATH];
4055  Size len;
4056  BufFile *fd;
4057  MemoryContext oldctx;
4058 
4062 
4063  /*
4064  * If the subxact file doesn't exist that means we don't have any subxact
4065  * info.
4066  */
4067  subxact_filename(path, subid, xid);
4069  true);
4070  if (fd == NULL)
4071  return;
4072 
4073  /* read number of subxact items */
4075 
4076  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4077 
4078  /* we keep the maximum as a power of 2 */
4080 
4081  /*
4082  * Allocate subxact information in the logical streaming context. We need
4083  * this information during the complete stream so that we can add the sub
4084  * transaction info to this. On stream stop we will flush this information
4085  * to the subxact file and reset the logical streaming context.
4086  */
4089  sizeof(SubXactInfo));
4090  MemoryContextSwitchTo(oldctx);
4091 
4092  if (len > 0)
4094 
4095  BufFileClose(fd);
4096 }
4097 
4098 /*
4099  * subxact_info_add
4100  * Add information about a subxact (offset in the main file).
4101  */
4102 static void
4104 {
4105  SubXactInfo *subxacts = subxact_data.subxacts;
4106  int64 i;
4107 
4108  /* We must have a valid top level stream xid and a stream fd. */
4110  Assert(stream_fd != NULL);
4111 
4112  /*
4113  * If the XID matches the toplevel transaction, we don't want to add it.
4114  */
4115  if (stream_xid == xid)
4116  return;
4117 
4118  /*
4119  * In most cases we're checking the same subxact as we've already seen in
4120  * the last call, so make sure to ignore it (this change comes later).
4121  */
4122  if (subxact_data.subxact_last == xid)
4123  return;
4124 
4125  /* OK, remember we're processing this XID. */
4126  subxact_data.subxact_last = xid;
4127 
4128  /*
4129  * Check if the transaction is already present in the array of subxacts. We
4130  * intentionally scan the array from the tail, because we're likely adding
4131  * a change for the most recent subtransactions.
4132  *
4133  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4134  * would allow us to use binary search here.
4135  */
4136  for (i = subxact_data.nsubxacts; i > 0; i--)
4137  {
4138  /* found, so we're done */
4139  if (subxacts[i - 1].xid == xid)
4140  return;
4141  }
4142 
4143  /* This is a new subxact, so we need to add it to the array. */
4144  if (subxact_data.nsubxacts == 0)
4145  {
4146  MemoryContext oldctx;
4147 
4149 
4150  /*
4151  * Allocate this memory for subxacts in per-stream context, see
4152  * subxact_info_read.
4153  */
4155  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4156  MemoryContextSwitchTo(oldctx);
4157  }
4159  {
4161  subxacts = repalloc(subxacts,
4163  }
4164 
4165  subxacts[subxact_data.nsubxacts].xid = xid;
4166 
4167  /*
4168  * Get the current offset of the stream file and store it as offset of
4169  * this subxact.
4170  */
4172  &subxacts[subxact_data.nsubxacts].fileno,
4173  &subxacts[subxact_data.nsubxacts].offset);
4174 
4176  subxact_data.subxacts = subxacts;
4177 }
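
/*
 * Illustrative sketch, not part of worker.c: the bookkeeping in
 * subxact_info_add() combines three shortcuts (skip the toplevel XID, skip
 * the XID seen in the previous call, scan from the tail) with an array whose
 * capacity stays a power of 2. The version below tracks only XIDs, uses
 * realloc instead of memory contexts, and assumes an initial capacity of 4
 * purely for illustration. SketchSubXacts and sketch_subxact_add are
 * hypothetical names.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct SketchSubXacts
{
    uint32_t *xids;             /* known subtransaction XIDs */
    uint32_t  n;                /* entries in use */
    uint32_t  max;              /* capacity, kept as a power of 2 */
    uint32_t  last;             /* XID seen in the previous call, 0 if none */
} SketchSubXacts;

/* Returns true if xid was newly recorded, false if skipped or known. */
static bool
sketch_subxact_add(SketchSubXacts *sx, uint32_t top_xid, uint32_t xid)
{
    assert(sx != NULL);

    /* the toplevel transaction and the most recent subxact need no work */
    if (xid == top_xid || xid == sx->last)
        return false;
    sx->last = xid;

    /* scan from the tail: recent subxacts are the likeliest match */
    for (uint32_t i = sx->n; i > 0; i--)
    {
        if (sx->xids[i - 1] == xid)
            return false;
    }

    /* grow by doubling when full (assumed initial capacity: 4) */
    if (sx->n == sx->max)
    {
        uint32_t  newmax = (sx->max == 0) ? 4 : sx->max * 2;
        uint32_t *p = realloc(sx->xids, newmax * sizeof(uint32_t));

        if (p == NULL)
            abort();
        sx->xids = p;
        sx->max = newmax;
    }

    sx->xids[sx->n++] = xid;
    return true;
}
```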
4178 
4179 /* format filename for file containing the info about subxacts */
4180 static inline void
4181 subxact_filename(char *path, Oid subid, TransactionId xid)
4182 {
4183  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4184 }
4185 
4186 /* format filename for file containing serialized changes */
4187 static inline void
4188 changes_filename(char *path, Oid subid, TransactionId xid)
4189 {
4190  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4191 }
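
/*
 * Illustrative sketch, not part of worker.c: because both helpers above embed
 * the subscription OID and the toplevel XID, two subscriptions that receive a
 * remote transaction with the same XID still get distinct file names. The
 * standalone copies below use unsigned int for Oid and TransactionId;
 * SKETCH_MAXPATH and the sketch_* names are hypothetical, and the OID and XID
 * values in any call are made up.
 */

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define SKETCH_MAXPATH 1024     /* stand-in for MAXPGPATH */

/* name of the file holding subxact offsets */
static void
sketch_subxact_filename(char *path, unsigned int subid, unsigned int xid)
{
    assert(path != NULL);
    snprintf(path, SKETCH_MAXPATH, "%u-%u.subxacts", subid, xid);
}

/* name of the file holding serialized changes */
static void
sketch_changes_filename(char *path, unsigned int subid, unsigned int xid)
{
    assert(path != NULL);
    snprintf(path, SKETCH_MAXPATH, "%u-%u.changes", subid, xid);
}
```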
4192 
4193 /*
4194  * stream_cleanup_files
4195  * Cleanup files for a subscription / toplevel transaction.
4196  *
4197  * Remove files with serialized changes and subxact info for a particular
4198  * toplevel transaction. Each subscription has a separate set of files
4199  * for any toplevel transaction.
4200  */
4201 void
4203 {
4204  char path[MAXPGPATH];
4205 
4206  /* Delete the changes file. */
4207  changes_filename(path, subid, xid);
4209 
4210  /* Delete the subxact file, if it exists. */
4211  subxact_filename(path, subid, xid);
4213 }
4214 
4215 /*
4216  * stream_open_file
4217  * Open a file that we'll use to serialize changes for a toplevel
4218  * transaction.
4219  *
4220  * Open a file for streamed changes from a toplevel transaction identified
4221  * by stream_xid (global variable). If it's the first chunk of streamed
4222  * changes for this transaction, create the buffile, otherwise open the
4223  * previously created file.
4224  */
4225 static void
4226 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4227 {
4228  char path[MAXPGPATH];
4229  MemoryContext oldcxt;
4230 
4231  Assert(OidIsValid(subid));
4233  Assert(stream_fd == NULL);
4234 
4235 
4236  changes_filename(path, subid, xid);
4237  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4238 
4239  /*
4240  * Create/open the buffiles under the logical streaming context so that we
4241  * have those files until stream stop.
4242  */
4244 
4245  /*
4246  * If this is the first streamed segment, create the changes file.
4247  * Otherwise, just open the file for writing, in append mode.
4248  */
4249  if (first_segment)
4251  path);
4252  else
4253  {
4254  /*
4255  * Open the file and seek to the end of the file because we always
4256  * append to the changes file.
4257  */
4259  path, O_RDWR, false);
4260  BufFileSeek(stream_fd, 0, 0, SEEK_END);
4261  }
4262 
4263  MemoryContextSwitchTo(oldcxt);
4264 }
4265 
4266 /*
4267  * stream_close_file
4268  * Close the currently open file with streamed changes.
4269  */
4270 static void
4272 {
4273  Assert(stream_fd != NULL);
4274 
4276 
4277  stream_fd = NULL;
4278 }
4279 
4280 /*
4281  * stream_write_change
4282  * Serialize a change to a file for the current toplevel transaction.
4283  *
4284  * The change is serialized in a simple format: the length (not including
4285  * the length field itself), the action code (identifying the message type)
4286  * and the message contents (without the subxact TransactionId value).
4287  */
4288 static void
4290 {
4291  int len;
4292 
4293  Assert(stream_fd != NULL);
4294 
4295  /* total on-disk size, including the action type character */
4296  len = (s->len - s->cursor) + sizeof(char);
4297 
4298  /* first write the size */
4299  BufFileWrite(stream_fd, &len, sizeof(len));
4300 
4301  /* then the action */
4302  BufFileWrite(stream_fd, &action, sizeof(action));
4303 
4304  /* and finally the remaining part of the buffer (after the XID) */
4305  len = (s->len - s->cursor);
4306 
4307  BufFileWrite(stream_fd, &s->data[s->cursor], len);
4308 }
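
/*
 * Illustrative sketch, not part of worker.c: each record written by
 * stream_write_change() is a native-endian int length, one action byte, and
 * the remaining payload, where the length counts the action byte plus the
 * payload but not itself. The pair below writes and reads one such record in
 * a memory buffer instead of a BufFile; sketch_write_change and
 * sketch_read_change are hypothetical names.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Append one record to out; returns the number of bytes written. */
static size_t
sketch_write_change(unsigned char *out, char action,
                    const char *payload, int payload_len)
{
    int    len = payload_len + (int) sizeof(char);  /* action + payload */
    size_t off = 0;

    assert(out != NULL && payload != NULL && payload_len >= 0);

    memcpy(out + off, &len, sizeof(len));
    off += sizeof(len);
    memcpy(out + off, &action, sizeof(action));
    off += sizeof(action);
    memcpy(out + off, payload, (size_t) payload_len);
    off += (size_t) payload_len;

    return off;
}

/* Decode one record from in; returns the number of bytes consumed. */
static size_t
sketch_read_change(const unsigned char *in, char *action,
                   const char **payload, int *payload_len)
{
    int len;

    assert(in != NULL);

    memcpy(&len, in, sizeof(len));
    *action = (char) in[sizeof(len)];
    *payload = (const char *) in + sizeof(len) + sizeof(char);
    *payload_len = len - (int) sizeof(char);

    return sizeof(len) + (size_t) len;
}
```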
4309 
4310 /*
4311  * stream_open_and_write_change
4312  * Serialize a message to a file for the given transaction.
4313  *
4314  * This function is similar to stream_write_change except that it will open the
4315  * target file, if it is not already open, before writing the message and
4316  * close the file at the end.
4317  */
4318 static void
4320 {
4322 
4323  if (!stream_fd)
4324  stream_start_internal(xid, false);
4325 
4327  stream_stop_internal(xid);
4328 }
4329 
4330 /*
4331  * Sets streaming options including replication slot name and origin start
4332  * position. Workers need these options for logical replication.
4333  */
4334 void
4336  char *slotname,
4337  XLogRecPtr *origin_startpos)
4338 {
4339  int server_version;
4340 
4341  options->logical = true;
4342  options->startpoint = *origin_startpos;
4343  options->slotname = slotname;
4344 
4346  options->proto.logical.proto_version =
4351 
4352  options->proto.logical.publication_names = MySubscription->publications;
4353  options->proto.logical.binary = MySubscription->binary;
4354 
4355  /*
4356  * Assign the appropriate option value for streaming option according to
4357  * the 'streaming' mode and the publisher's ability to support that mode.
4358  */
4359  if (server_version >= 160000 &&
4361  {
4362  options->proto.logical.streaming_str = "parallel";
4364  }
4365  else if (server_version >= 140000 &&
4367  {
4368  options->proto.logical.streaming_str = "on";
4370  }
4371  else
4372  {
4373  options->proto.logical.streaming_str = NULL;
4375  }
4376 
4377  options->proto.logical.twophase = false;
4378  options->proto.logical.origin = pstrdup(MySubscription->origin);
4379 }
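
/*
 * Illustrative sketch, not part of worker.c: the streaming option sent to
 * the publisher depends on both the requested mode and the publisher's
 * version, falling back to a weaker mode when the publisher is too old. The
 * version thresholds are taken from the listing above; the conditions on the
 * requested mode are an assumption, since those lines are not shown, and
 * SketchStreamMode and sketch_streaming_str are hypothetical names.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef enum SketchStreamMode
{
    SKETCH_STREAM_OFF,
    SKETCH_STREAM_ON,
    SKETCH_STREAM_PARALLEL
} SketchStreamMode;

/* Returns the option string to send, or NULL to disable streaming. */
static const char *
sketch_streaming_str(int server_version, SketchStreamMode requested)
{
    assert(server_version > 0);

    /* parallel apply needs a version 16 or newer publisher (assumed check) */
    if (server_version >= 160000 && requested == SKETCH_STREAM_PARALLEL)
        return "parallel";

    /* plain streaming needs a version 14 or newer publisher (assumed check) */
    if (server_version >= 140000 && requested != SKETCH_STREAM_OFF)
        return "on";

    return NULL;
}
```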
4380 
4381 /*
4382  * Cleanup the memory for subxacts and reset the related variables.
4383  */
4384 static inline void
4386 {
4387  if (subxact_data.subxacts)
4389 
4390  subxact_data.subxacts = NULL;
4392  subxact_data.nsubxacts = 0;
4394 }
4395 
4396 /*
4397  * Common function to run the apply loop with error handling. Disable the
4398  * subscription, if necessary.
4399  *
4400  * Note that we don't handle FATAL errors, which are probably caused by
4401  * system resource errors and are not repeatable.
4402  */
4403 void
4404 start_apply(XLogRecPtr origin_startpos)
4405 {
4406  PG_TRY();
4407  {
4408  LogicalRepApplyLoop(origin_startpos);
4409  }
4410  PG_CATCH();
4411  {
4414  else
4415  {
4416  /*
4417  * Report the worker failed while applying changes. Abort the
4418  * current transaction so that the stats message is sent in an
4419  * idle state.
4420  */
4423 
4424  PG_RE_THROW();
4425  }
4426  }
4427  PG_END_TRY();
4428 }
4429 
4430 /*
4431  * Runs the leader apply worker.
4432  *
4433  * It sets up replication origin, streaming options and then starts streaming.
4434  */
4435 static void
4437 {
4438  char originname[NAMEDATALEN];
4439  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4440  char *slotname = NULL;
4442  RepOriginId originid;
4443  TimeLineID startpointTLI;
4444  char *err;
4445  bool must_use_password;
4446 
4447  slotname = MySubscription->slotname;
4448 
4449  /*
4450  * This shouldn't happen if the subscription is enabled, but guard against
4451  * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4452  * slot is NULL.)
4453  */
4454  if (!slotname)
4455  ereport(ERROR,
4456  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4457  errmsg("subscription has no replication slot set")));
4458 
4459  /* Setup replication origin tracking. */
4461  originname, sizeof(originname));
4463  originid = replorigin_by_name(originname, true);
4464  if (!OidIsValid(originid))
4465  originid = replorigin_create(originname);
4466  replorigin_session_setup(originid, 0);
4467  replorigin_session_origin = originid;
4468  origin_startpos = replorigin_session_get_progress(false);
4470 
4471  /* Is the use of a password mandatory? */
4472  must_use_password = MySubscription->passwordrequired &&
4474 
4476  true, must_use_password,
4477  MySubscription->name, &err);
4478 
4479  if (LogRepWorkerWalRcvConn == NULL)
4480  ereport(ERROR,
4481  (errcode(ERRCODE_CONNECTION_FAILURE),
4482  errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
4483  MySubscription->name, err)));
4484 
4485  /*
4486  * We don't really use the output of identify_system for anything but it
4487  * does some initializations on the upstream so let's still call it.
4488  */
4489  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4490 
4491  set_apply_error_context_origin(originname);
4492 
4493  set_stream_options(&options, slotname, &origin_startpos);
4494 
4495  /*
4496  * Even when the two_phase mode is requested by the user, it remains as
4497  * the tri-state PENDING until all tablesyncs have reached READY state.
4498  * Only then can it become ENABLED.
4499  *
4500  * Note: If the subscription has no tables then leave the state as
4501  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4502  * work.
4503  */
4506  {
4507  /* Start streaming with two_phase enabled */
4508  options.proto.logical.twophase = true;
4510 
4515  }
4516  else
4517  {
4519  }
4520 
4521  ereport(DEBUG1,
4522  (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4527  "?")));
4528 
4529  /* Run the main loop. */
4530  start_apply(origin_startpos);
4531 }
4532 
4533 /*
4534  * Common initialization for leader apply worker, parallel apply worker and
4535  * tablesync worker.
4536  *
4537  * Initialize the database connection, in-memory subscription and necessary
4538  * config options.
4539  */
4540 void
4542 {
4543  MemoryContext oldctx;
4544 
4545  /* Run as replica session replication role. */
4546  SetConfigOption("session_replication_role", "replica",
4548 
4549  /* Connect to our database. */
4552  0);
4553 
4554  /*
4555  * Set always-secure search path, so malicious users can't redirect user
4556  * code (e.g. pg_index.indexprs).
4557  */
4558  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4559 
4560  /* Load the subscription into persistent memory context. */
4562  "ApplyContext",
4566 
4568  if (!MySubscription)
4569  {
4570  ereport(LOG,
4571  (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4573 
4574  /* Ensure we remove no-longer-useful entry for worker's start time */
4575  if (am_leader_apply_worker())
4577 
4578  proc_exit(0);
4579  }
4580 
4581  MySubscriptionValid = true;
4582  MemoryContextSwitchTo(oldctx);
4583 
4584  if (!MySubscription->enabled)
4585  {
4586  ereport(LOG,
4587  (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4588  MySubscription->name)));
4589 
4591  }
4592 
4593  /* Setup synchronous commit according to the user's wishes */
4594  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4596 
4597  /*
4598  * Keep us informed about subscription or role changes. Note that the
4599  * role's superuser privilege can be revoked.
4600  */
4601  CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4603  (Datum) 0);
4604 
4607  (Datum) 0);
4608 
4609  if (am_tablesync_worker())
4610  ereport(LOG,
4611  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4614  else
4615  ereport(LOG,
4616  (errmsg("logical replication apply worker for subscription \"%s\" has started",
4617  MySubscription->name)));
4618 
4620 }
4621 
4622 /* Common function to setup the leader apply or tablesync worker. */
4623 void
4624 SetupApplyOrSyncWorker(int worker_slot)
4625 {
4626  /* Attach to slot */
4627  logicalrep_worker_attach(worker_slot);
4628 
4630 
4631  /* Setup signal handling */
4633  pqsignal(SIGTERM, die);
4635 
4636  /*
4637  * We don't currently need any ResourceOwner in a walreceiver process, but
4638  * if we did, we could call CreateAuxProcessResourceOwner here.
4639  */
4640 
4641  /* Initialise stats to a sanish value */
4644 
4645  /* Load the libpq-specific functions */
4646  load_file("libpqwalreceiver", false);
4647 
4649 
4650  /* Connect to the origin and start the replication. */
4651  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4653 
4654  /*
4655  * Setup callback for syscache so that we know when something changes in
4656  * the subscription relation state.
4657  */
4658  CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4660  (Datum) 0);
4661 }
4662 
4663 /* Logical Replication Apply worker entry point */
4664 void
4666 {
4667  int worker_slot = DatumGetInt32(main_arg);
4668 
4669  InitializingApplyWorker = true;
4670 
4671  SetupApplyOrSyncWorker(worker_slot);
4672 
4673  InitializingApplyWorker = false;
4674 
4675  run_apply_worker();
4676 
4677  proc_exit(0);
4678 }
4679 
4680 /*
4681  * After error recovery, disable the subscription in a new transaction
4682  * and exit cleanly.
4683  */
4684 void
4686 {
4687  /*
4688  * Emit the error message, and recover from the error state to an idle
4689  * state
4690  */
4691  HOLD_INTERRUPTS();
4692 
4693  EmitErrorReport();
4695  FlushErrorState();
4696 
4698 
4699  /* Report that the worker failed during either table synchronization or apply */
4701  !am_tablesync_worker());
4702 
4703  /* Disable the subscription */
4707 
4708  /* Ensure we remove the no-longer-useful entry for the worker's start time */
4709  if (am_leader_apply_worker())
4711 
4712  /* Log that the subscription has been disabled, then exit */
4713  ereport(LOG,
4714  errmsg("subscription \"%s\" has been disabled because of an error",
4715  MySubscription->name));
4716 
4717  proc_exit(0);
4718 }
4719 
4720 /*
4721  * Is current process a logical replication worker?
4722  */
4723 bool
4724 IsLogicalWorker(void)
4725 {
4726  return MyLogicalRepWorker != NULL;
4727 }
4728 
4729 /*
4730  * Is current process a logical replication parallel apply worker?
4731  */
4732 bool
4733 IsLogicalParallelApplyWorker(void)
4734 {
4736 }
4737 
4738 /*
4739  * Start skipping changes of the transaction if the given LSN matches the
4740  * LSN specified by subscription's skiplsn.
4741  */
4742 static void
4743 maybe_start_skipping_changes(XLogRecPtr finish_lsn)
4744 {
4748 
4749  /*
4750  * Quick return if it's not requested to skip this transaction. This
4751  * function is called for every remote transaction and we assume that
4752  * skipping the transaction is not used often.
4753  */
4755  MySubscription->skiplsn != finish_lsn))
4756  return;
4757 
4758  /* Start skipping all changes of this transaction */
4759  skip_xact_finish_lsn = finish_lsn;
4760 
4761  ereport(LOG,
4762  errmsg("logical replication starts skipping transaction at LSN %X/%X",
4764 }
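The quick-return test in maybe_start_skipping_changes() can be illustrated outside the server. The following is a minimal sketch, not the PostgreSQL implementation: `lsn_t`, `INVALID_LSN`, `should_skip_transaction` and `format_lsn` are hypothetical names, and the "no skip LSN set" half of the condition is an assumption, because that line is missing from the listing above. The formatter splits a 64-bit LSN into two 32-bit halves, which is the `%X/%X` form used by the log messages.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-in for XLogRecPtr; 0 plays the role of InvalidXLogRecPtr. */
typedef uint64_t lsn_t;
#define INVALID_LSN ((lsn_t) 0)

/*
 * Hypothetical helper: skip only when a skip LSN is configured and it equals
 * the remote transaction's finish LSN. The INVALID_LSN test is an assumption.
 */
static bool
should_skip_transaction(lsn_t skiplsn, lsn_t finish_lsn)
{
    if (skiplsn == INVALID_LSN || skiplsn != finish_lsn)
        return false;           /* common case: nothing to skip */
    return true;
}

/*
 * Hypothetical helper: write the LSN as "high/low" in upper-case hex.
 * Returns the snprintf() result.
 */
static int
format_lsn(char *buf, size_t buflen, lsn_t lsn)
{
    assert(buf != NULL && buflen > 0);
    return snprintf(buf, buflen, "%X/%X",
                    (unsigned int) (uint32_t) (lsn >> 32),
                    (unsigned int) (uint32_t) lsn);
}
```

Calling `should_skip_transaction(skiplsn, finish_lsn)` once per remote transaction mirrors the cheap early exit described in the comment: the comparison is two integer tests, so the common non-skipping path costs almost nothing.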
4765 
4766 /*
4767  * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4768  */
4769 static void
4770 stop_skipping_changes(void)
4771 {
4772  if (!is_skipping_changes())
4773  return;
4774 
4775  ereport(LOG,
4776  (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4778 
4779  /* Stop skipping changes */
4781 }
4782 
4783 /*
4784  * Clear subskiplsn of pg_subscription catalog.
4785  *
4786  * finish_lsn is the transaction's finish LSN, which is compared against
4787  * subskiplsn. If they do not match, we raise a warning when clearing
4788  * subskiplsn, to inform users of cases where, for example, the wrong
4789  * subskiplsn was specified by mistake.
4790  */
4791 static void
4792 clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
4793 {
4794  Relation rel;
4795  Form_pg_subscription subform;
4796  HeapTuple tup;
4797  XLogRecPtr myskiplsn = MySubscription->skiplsn;
4798  bool started_tx = false;
4799 
4801  return;
4802 
4803  if (!IsTransactionState())
4804  {
4806  started_tx = true;
4807  }
4808 
4809  /*
4810  * Protect subskiplsn of pg_subscription from being concurrently updated
4811  * while clearing it.
4812  */
4813  LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4814  AccessShareLock);
4815 
4816  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4817 
4818  /* Fetch the existing tuple. */
4819  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4821 
4822  if (!HeapTupleIsValid(tup))
4823  elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4824 
4825  subform = (Form_pg_subscription) GETSTRUCT(tup);
4826 
4827  /*
4828  * Clear the subskiplsn. If the user has already changed subskiplsn before
4829  * we clear it, we don't update the catalog, and the replication origin
4830  * state won't get advanced. So in the worst case, if the server crashes
4831  * before sending an acknowledgment of the flush position, the transaction
4832  * will be sent again and the user needs to set subskiplsn again. We could
4833  * reduce the possibility by logging a replication origin WAL record to
4834  * advance the origin LSN instead, but there is no way to advance the
4835  * origin timestamp, and it doesn't seem worth doing anything about it
4836  * since it's a very rare case.
4837  */
4838  if (subform->subskiplsn == myskiplsn)
4839  {
4840  bool nulls[Natts_pg_subscription];
4841  bool replaces[Natts_pg_subscription];
4842  Datum values[Natts_pg_subscription];
4843 
4844  memset(values, 0, sizeof(values));
4845  memset(nulls, false, sizeof(nulls));
4846  memset(replaces, false, sizeof(replaces));
4847 
4848  /* reset subskiplsn */
4849  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4850  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4851 
4852  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4853  replaces);
4854  CatalogTupleUpdate(rel, &tup->t_self, tup);
4855 
4856  if (myskiplsn != finish_lsn)
4857  ereport(WARNING,
4858  errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4859  errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4860  LSN_FORMAT_ARGS(finish_lsn),
4861  LSN_FORMAT_ARGS(myskiplsn)));
4862  }
4863 
4864  heap_freetuple(tup);
4865  table_close(rel, NoLock);
4866 
4867  if (started_tx)
4869 }
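The values/nulls/replaces idiom used above to reset a single catalog column can be shown with plain arrays. This is a simplified sketch under stated assumptions, not heap_modify_tuple() itself: `Row`, `NATTS`, `ATTNUM_SKIPLSN`, `modify_row` and `clear_skiplsn` are hypothetical, and every column is modelled as a 64-bit integer. It shows only the core idea that columns whose `replaces` flag is false are copied unchanged from the old row.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define NATTS 4                 /* hypothetical number of columns */
#define ATTNUM_SKIPLSN 3        /* hypothetical 1-based attribute number */

typedef struct Row
{
    uint64_t    values[NATTS];
    bool        nulls[NATTS];
} Row;

/* Build a new row: take flagged columns from the inputs, the rest from old. */
static Row
modify_row(const Row *old, const uint64_t *values,
           const bool *nulls, const bool *replaces)
{
    Row         result;

    assert(old != NULL);
    result = *old;
    for (int i = 0; i < NATTS; i++)
    {
        if (replaces[i])
        {
            result.values[i] = values[i];
            result.nulls[i] = nulls[i];
        }
    }
    return result;
}

/* Reset only the skip-LSN column to 0, leaving every other column alone. */
static Row
clear_skiplsn(const Row *old)
{
    uint64_t    values[NATTS];
    bool        nulls[NATTS];
    bool        replaces[NATTS];

    memset(values, 0, sizeof(values));
    memset(nulls, 0, sizeof(nulls));
    memset(replaces, 0, sizeof(replaces));

    values[ATTNUM_SKIPLSN - 1] = 0;         /* 0 stands in for an invalid LSN */
    replaces[ATTNUM_SKIPLSN - 1] = true;

    return modify_row(old, values, nulls, replaces);
}
```

As in the listing, the original row is not modified in place; a new copy is produced and the caller decides what to do with it.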
4870 
4871 /* Error callback to give more context info about the change being applied */
4872 void
4873 apply_error_callback(void *arg)
4874 {
4876 
4878  return;
4879 
4880  Assert(errarg->origin_name);
4881 
4882  if (errarg->rel == NULL)
4883  {
4884  if (!TransactionIdIsValid(errarg->remote_xid))
4885  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4886  errarg->origin_name,
4887  logicalrep_message_type(errarg->command));
4888  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4889  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4890  errarg->origin_name,
4892  errarg->remote_xid);
4893  else
4894  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4895  errarg->origin_name,
4897  errarg->remote_xid,
4898  LSN_FORMAT_ARGS(errarg->finish_lsn));
4899  }
4900  else
4901  {
4902  if (errarg->remote_attnum < 0)
4903  {
4904  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4905  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4906  errarg->origin_name,
4908  errarg->rel->remoterel.nspname,
4909  errarg->rel->remoterel.relname,
4910  errarg->remote_xid);
4911  else
4912  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4913  errarg->origin_name,
4915  errarg->rel->remoterel.nspname,
4916  errarg->rel->remoterel.relname,
4917  errarg->remote_xid,
4918  LSN_FORMAT_ARGS(errarg->finish_lsn));
4919  }
4920  else
4921  {
4922  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4923  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4924  errarg->origin_name,
4926  errarg->rel->remoterel.nspname,
4927  errarg->rel->remoterel.relname,
4928  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4929  errarg->remote_xid);
4930  else
4931  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4932  errarg->origin_name,
4934  errarg->rel->remoterel.nspname,
4935  errarg->rel->remoterel.relname,
4936  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4937  errarg->remote_xid,
4938  LSN_FORMAT_ARGS(errarg->finish_lsn));
4939  }
4940  }
4941 }
4942 
4943 /* Set transaction information of apply error callback */
4944 static inline void
4945 set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
4946 {
4949 }
4950 
4951 /* Reset all information of apply error callback */
4952 static inline void
4953 reset_apply_error_context_info(void)
4954 {
4959 }
4960 
4961 /*
4962  * Request wakeup of the workers for the given subscription OID
4963  * at commit of the current transaction.
4964  *
4965  * This is used to ensure that the workers process assorted changes
4966  * as soon as possible.
4967  */
4968 void
4969 LogicalRepWorkersWakeupAtCommit(Oid subid)
4970 {
4971  MemoryContext oldcxt;
4972 
4976  MemoryContextSwitchTo(oldcxt);
4977 }
4978 
4979 /*
4980  * Wake up the workers of any subscriptions that were changed in this xact.
4981  */
4982 void
4983 AtEOXact_LogicalRepWorkers(bool isCommit)
4984 {
4985  if (isCommit && on_commit_wakeup_workers_subids != NIL)
4986  {
4987  ListCell *lc;
4988 
4989  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
4990  foreach(lc, on_commit_wakeup_workers_subids)
4991  {
4992  Oid subid = lfirst_oid(lc);
4993  List *workers;
4994  ListCell *lc2;
4995 
4996  workers = logicalrep_workers_find(subid, true, false);
4997  foreach(lc2, workers)
4998  {
4999  LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5000 
5002  }
5003  }
5004  LWLockRelease(LogicalRepWorkerLock);
5005  }
5006 
5007  /* The List storage will be reclaimed automatically in xact cleanup. */
5009 }
5010 
5011 /*
5012  * Allocate the origin name in long-lived context for error context message.
5013  */
5014 void
5015 set_apply_error_context_origin(char *originname)
5016 {
5018  originname);
5019 }
5020 
5021 /*
5022  * Return the action to be taken for the given transaction. See
5023  * TransApplyAction for information on each of the actions.
5024  *
5025  * *winfo is assigned to the destination parallel worker info when the leader
5026  * apply worker has to pass all the transaction's changes to the parallel
5027  * apply worker.
5028  */
5029 static TransApplyAction
5030 get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
5031 {
5032  *winfo = NULL;
5033 
5035  {
5036  return TRANS_PARALLEL_APPLY;
5037  }
5038 
5039  /*
5040  * If we are processing this transaction using a parallel apply worker,
5041  * then we either send the changes to the parallel worker or, if the worker
5042  * is busy, serialize the changes to a file which will later be processed
5043  * by the parallel worker.
5044  */
5045  *winfo = pa_find_worker(xid);
5046 
5047  if (*winfo && (*winfo)->serialize_changes)
5048  {
5050  }
5051  else if (*winfo)
5052  {
5054  }
5055 
5056  /*
5057  * If no parallel worker is involved in processing this transaction,
5058  * then we either apply the change directly or serialize it to a file,
5059  * which will later be applied when the transaction finish message is
5060  * processed.
5061  */
5062  else if (in_streamed_transaction)
5063  {
5064  return TRANS_LEADER_SERIALIZE;
5065  }
5066  else
5067  {
5068  return TRANS_LEADER_APPLY;
5069  }
5070 }
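The decision order in get_transaction_apply_action() can be condensed into a small stand-alone function. This is a sketch under stated assumptions rather than the server code: `ApplyActionSketch`, `WorkerInfoSketch` and `choose_apply_action` are hypothetical names, the lookup done by pa_find_worker() is replaced by a pointer the caller passes in, and the two return values that are missing from the listing (the serialize_changes branch and the plain worker branch) are assumed to map to partial serialization and send-to-parallel respectively, following the comment in the function.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical mirror of the TransApplyAction values. */
typedef enum
{
    ACTION_LEADER_APPLY,
    ACTION_LEADER_SERIALIZE,
    ACTION_LEADER_SEND_TO_PARALLEL,
    ACTION_LEADER_PARTIAL_SERIALIZE,
    ACTION_PARALLEL_APPLY
} ApplyActionSketch;

/* Hypothetical stand-in for the parallel worker info found for an XID. */
typedef struct
{
    bool        serialize_changes;
} WorkerInfoSketch;

/*
 * winfo is NULL when no parallel apply worker is assigned to the transaction.
 * The checks run in the same order as in the listing.
 */
static ApplyActionSketch
choose_apply_action(bool is_parallel_worker,
                    const WorkerInfoSketch *winfo,
                    bool in_streamed_transaction)
{
    if (is_parallel_worker)
        return ACTION_PARALLEL_APPLY;

    if (winfo != NULL && winfo->serialize_changes)
        return ACTION_LEADER_PARTIAL_SERIALIZE; /* assumed: branch elided above */
    else if (winfo != NULL)
        return ACTION_LEADER_SEND_TO_PARALLEL;  /* assumed: branch elided above */
    else if (in_streamed_transaction)
        return ACTION_LEADER_SERIALIZE;
    else
        return ACTION_LEADER_APPLY;
}
```

The ordering matters: a parallel apply worker never consults the worker lookup, and the leader falls back to serializing or applying directly only when no parallel worker is assigned.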
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:252
RepOriginId replorigin_session_origin
Definition: origin.c:155
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1097
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1237
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
#define ACL_DELETE
Definition: parsenodes.h:79
uint64 AclMode
Definition: parsenodes.h:74
#define ACL_INSERT
Definition: parsenodes.h:76
#define ACL_UPDATE
Definition: parsenodes.h:78
@ RTE_RELATION
Definition: parsenodes.h:1028
@ DROP_RESTRICT
Definition: parsenodes.h:2334
#define ACL_SELECT
Definition: parsenodes.h:77
#define ACL_TRUNCATE
Definition: parsenodes.h:80
int16 attnum
Definition: pg_attribute.h:74
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
void * arg
static PgChecksumMode mode
Definition: pg_checksums.c:56
#define NAMEDATALEN
#define MAXPGPATH
const void size_t len
static int server_version
Definition: pg_dumpall.c:110
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:255
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
#define list_make1(x1)
Definition: pg_list.h:212
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
#define lfirst_oid(lc)
Definition: pg_list.h:174
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static char ** options
void FreeSubscription(Subscription *sub)
void DisableSubscription(Oid subid)
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define LOGICALREP_STREAM_OFF
#define LOGICALREP_STREAM_PARALLEL
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
FormData_pg_subscription * Form_pg_subscription
#define die(msg)
static char * buf
Definition: pg_test_fsync.c:73
long pgstat_report_stat(bool force)
Definition: pgstat.c:622
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
int64 timestamp
Expr * expression_planner(Expr *expr)
Definition: planner.c:6580
pqsigfunc pqsignal(int signo, pqsigfunc func)
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:238
#define PGINVALID_SOCKET
Definition: port.h:31
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
char * c
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * s2
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:109
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:564
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:336
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1217
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:145
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:618
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:756
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:492
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition: proto.c:1192
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:74
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:278
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:700
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:376
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1137
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:436
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:239
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1087
MemoryContextSwitchTo(old_ctx)
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
#define RelationGetRelid(relation)
Definition: rel.h:505
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:701
#define RelationGetDescr(relation)
Definition: rel.h:531
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:658
#define RelationGetNamespace(relation)
Definition: rel.h:546
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:167
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
Node * build_column_default(Relation rel, int attrno)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:216
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:648
void PopActiveSnapshot(void)
Definition: snapmgr.c:743
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:540
bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo, AttrMap *attrmap)
Definition: relation.c:804
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:602
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:851
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:164
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:327
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:473
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:130
TransactionId remote_xid
Definition: worker.c:226
LogicalRepMsgType command
Definition: worker.c:221
XLogRecPtr finish_lsn
Definition: worker.c:227
LogicalRepRelMapEntry * rel
Definition: worker.c:222
ResultRelInfo * targetRelInfo
Definition: worker.c:211
EState * estate
Definition: worker.c:208
PartitionTupleRouting * proute
Definition: worker.c:215
ModifyTableState * mtstate
Definition: worker.c:214
LogicalRepRelMapEntry * targetRel
Definition: worker.c:210
uint32 nsubxacts
Definition: worker.c:351
uint32 nsubxacts_max
Definition: worker.c:352
SubXactInfo * subxacts
Definition: worker.c:354
TransactionId subxact_last
Definition: worker.c:353
AttrMap
Definition: attmap.h:35
int maplen
Definition: attmap.h:37
AttrNumber * attnums
Definition: attmap.h:36
List * es_rteperminfos
Definition: execnodes.h:632
List * es_tupleTable
Definition: execnodes.h:669
List * es_opened_result_relations
Definition: execnodes.h:645
CommandId es_output_cid
Definition: execnodes.h:639
struct ErrorContextCallback * previous
Definition: elog.h:295
void(* callback)(void *arg)
Definition: elog.h:296
dlist_node node
Definition: worker.c:199
XLogRecPtr remote_end
Definition: worker.c:201
XLogRecPtr local_end
Definition: worker.c:200
ItemPointerData t_self
Definition: htup.h:65
List
Definition: pg_list.h:54
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131
TimestampTz committime
Definition: logicalproto.h:138
LogicalRepRelation remoterel
StringInfoData * colvalues
Definition: logicalproto.h:87
TimestampTz last_recv_time
LogicalRepWorkerType type
TimestampTz reply_time
FileSet * stream_fileset
XLogRecPtr reply_lsn
XLogRecPtr last_lsn
TimestampTz last_send_time
CmdType operation
Definition: execnodes.h:1355
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1359
PlanState ps
Definition: execnodes.h:1354
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
Plan * plan
Definition: execnodes.h:1116
EState * state
Definition: execnodes.h:1118
Bitmapset * updatedCols
Definition: parsenodes.h:1299
RTEKind rtekind
Definition: parsenodes.h:1057
Form_pg_class rd_rel
Definition: rel.h:111
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:583
Relation ri_RelationDesc
Definition: execnodes.h:456
off_t offset
Definition: worker.c:345
TransactionId xid
Definition: worker.c:343
int fileno
Definition: worker.c:344
XLogRecPtr skiplsn
AttrMap * attrMap
Definition: tupconvert.h:28
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
dlist_node * cur
Definition: ilist.h:200
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:86
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
Definition: tablecmds.c:1884
bool AllTablesyncsReady(void)
Definition: tablesync.c:1732
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:281
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:667
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1757
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5040
void AfterTriggerBeginQuery(void)
Definition: trigger.c:5020
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:509
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
Definition: twophase.c:2692
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2633
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1503
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
static StringInfoData reply_message
Definition: walreceiver.c:131
int wal_receiver_status_interval
Definition: walreceiver.c:87
int wal_receiver_timeout
Definition: walreceiver.c:88
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:450
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:434
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:456
#define walrcv_server_version(conn)
Definition: walreceiver.h:446
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:452
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:442
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:454
int WalWriterDelay
Definition: walwriter.c:71
#define SIGHUP
Definition: win32_port.h:168
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED
static bool am_parallel_apply_worker(void)
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
@ FS_SERIALIZE_DONE
static bool am_tablesync_worker(void)
static bool am_leader_apply_worker(void)
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4976
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3979
bool IsTransactionState(void)
Definition: xact.c:385
void CommandCounterIncrement(void)
Definition: xact.c:1098
void StartTransactionCommand(void)
Definition: xact.c:3033
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:912
bool IsTransactionBlock(void)
Definition: xact.c:4958
void BeginTransactionBlock(void)
Definition: xact.c:3911
void CommitTransactionCommand(void)
Definition: xact.c:3131
bool EndTransactionBlock(bool chain)
Definition: xact.c:4031
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4849
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:827
#define GIDSIZE
Definition: xact.h:31
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6469
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:253
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59