PostgreSQL Source Code  git master
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2022, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from remote logical replication stream.
13  *
14  * The main worker (apply) is started by logical replication worker
15  * launcher for every enabled subscription in a database. It uses
16  * walsender protocol to communicate with publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *
22  * STREAMED TRANSACTIONS
23  * ---------------------
24  * Streamed transactions (large transactions exceeding a memory limit on the
25  * upstream) are not applied immediately, but instead, the data is written
26  * to temporary files and then applied at once when the final commit arrives.
27  *
28  * Unlike the regular (non-streamed) case, handling streamed transactions has
29  * to handle aborts of both the toplevel transaction and subtransactions. This
30  * is achieved by tracking offsets for subtransactions, which is then used
31  * to truncate the file with serialized changes.
32  *
33  * The files are placed in tmp file directory by default, and the filenames
34  * include both the XID of the toplevel transaction and OID of the
35  * subscription. This is necessary so that different workers processing a
36  * remote transaction with the same XID don't interfere.
37  *
38  * We use BufFiles instead of using normal temporary files because (a) the
39  * BufFile infrastructure supports temporary files that exceed the OS file size
40  * limit, (b) provides a way for automatic clean up on the error and (c) provides
41  * a way for these files to survive across local transactions, allowing them to be
42  * opened and closed at stream start and stop. We decided to use FileSet
43  * infrastructure as without that it deletes the files on the closure of the
44  * file and if we decide to keep stream files open across the start/stop stream
45  * then it will consume a lot of memory (more than 8K for each BufFile and
46  * there could be multiple such BufFiles as the subscriber could receive
47  * multiple start/stop streams for different transactions before getting the
48  * commit). Moreover, if we don't use FileSet then we also need to invent
49  * a new way to pass filenames to BufFile APIs so that we are allowed to open
50  * the file we desired across multiple stream-open calls for the same
51  * transaction.
52  *
53  * TWO_PHASE TRANSACTIONS
54  * ----------------------
55  * Two phase transactions are replayed at prepare and then committed or
56  * rolled back at commit prepared and rollback prepared respectively. It is
57  * possible to have a prepared transaction that arrives at the apply worker
58  * when the tablesync is busy doing the initial copy. In this case, the apply
59  * worker skips all the prepared operations [e.g. inserts] while the tablesync
60  * is still busy (see the condition of should_apply_changes_for_rel). The
61  * tablesync worker might not get such a prepared transaction because say it
62  * was prior to the initial consistent point but might have got some later
63  * commits. Now, the tablesync worker will exit without doing anything for the
64  * prepared transaction skipped by the apply worker as the sync location for it
65  * will be already ahead of the apply worker's current location. This would lead
66  * to an "empty prepare", because later when the apply worker does the commit
67  * prepare, there is nothing in it (the inserts were skipped earlier).
68  *
69  * To avoid this and similar prepare confusions, the subscription's two_phase
70  * commit is enabled only after the initial sync is over. The two_phase option
71  * has been implemented as a tri-state with values DISABLED, PENDING, and
72  * ENABLED.
73  *
74  * Even if the user specifies they want a subscription with two_phase = on,
75  * internally it will start with a tri-state of PENDING which only becomes
76  * ENABLED after all tablesync initializations are completed - i.e. when all
77  * tablesync workers have reached their READY state. In other words, the value
78  * PENDING is only a temporary state for subscription start-up.
79  *
80  * Until the two_phase is properly available (ENABLED) the subscription will
81  * behave as if two_phase = off. When the apply worker detects that all
82  * tablesyncs have become READY (while the tri-state was PENDING) it will
83  * restart the apply worker process. This happens in
84  * process_syncing_tables_for_apply.
85  *
86  * When the (re-started) apply worker finds that all tablesyncs are READY for a
87  * two_phase tri-state of PENDING, it starts streaming messages with the
88  * two_phase option which in turn enables the decoding of two-phase commits at
89  * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
90  * Now, it is possible that during the time we have not enabled two_phase, the
91  * publisher (replication server) would have skipped some prepares but we
92  * ensure that such prepares are sent along with commit prepare, see
93  * ReorderBufferFinishPrepared.
94  *
95  * If the subscription has no tables then a two_phase tri-state PENDING is
96  * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
97  * PUBLICATION which might otherwise be disallowed (see below).
98  *
99  * If ever a user needs to be aware of the tri-state value, they can fetch it
100  * from the pg_subscription catalog (see column subtwophasestate).
101  *
102  * We don't allow toggling the two_phase option of a subscription because it can
103  * lead to an inconsistent replica. Consider that it was initially on and we have
104  * received some prepares; then we turn it off, and now at commit time the server
105  * will send the entire transaction data along with the commit. With some more
106  * analysis, we can allow changing this option from off to on but not sure if
107  * that alone would be useful.
108  *
109  * Finally, to avoid problems mentioned in previous paragraphs from any
110  * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
111  * to 'off' and then again back to 'on') there is a restriction for
112  * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
113  * the two_phase tri-state is ENABLED, except when copy_data = false.
114  *
115  * We can get prepare of the same GID more than once for the genuine cases
116  * where we have defined multiple subscriptions for publications on the same
117  * server and prepared transaction has operations on tables subscribed to those
118  * subscriptions. For such cases, if we use the GID sent by publisher one of
119  * the prepares will be successful and others will fail, in which case the
120  * server will send them again. Now, this can lead to a deadlock if user has
121  * set synchronous_standby_names for all the subscriptions on subscriber. To
122  * avoid such deadlocks, we generate a unique GID (consisting of the
123  * subscription oid and the xid of the prepared transaction) for each prepare
124  * transaction on the subscriber.
125  *-------------------------------------------------------------------------
126  */
127 
128 #include "postgres.h"
129 
130 #include <sys/stat.h>
131 #include <unistd.h>
132 
133 #include "access/table.h"
134 #include "access/tableam.h"
135 #include "access/twophase.h"
136 #include "access/xact.h"
137 #include "access/xlog_internal.h"
138 #include "catalog/catalog.h"
139 #include "catalog/indexing.h"
140 #include "catalog/namespace.h"
141 #include "catalog/partition.h"
142 #include "catalog/pg_inherits.h"
143 #include "catalog/pg_subscription.h"
145 #include "catalog/pg_tablespace.h"
146 #include "commands/tablecmds.h"
147 #include "commands/tablespace.h"
148 #include "commands/trigger.h"
149 #include "executor/executor.h"
150 #include "executor/execPartition.h"
152 #include "funcapi.h"
153 #include "libpq/pqformat.h"
154 #include "libpq/pqsignal.h"
155 #include "mb/pg_wchar.h"
156 #include "miscadmin.h"
157 #include "nodes/makefuncs.h"
158 #include "optimizer/optimizer.h"
159 #include "pgstat.h"
160 #include "postmaster/bgworker.h"
161 #include "postmaster/interrupt.h"
162 #include "postmaster/postmaster.h"
163 #include "postmaster/walwriter.h"
164 #include "replication/decode.h"
165 #include "replication/logical.h"
169 #include "replication/origin.h"
171 #include "replication/snapbuild.h"
172 #include "replication/walreceiver.h"
174 #include "rewrite/rewriteHandler.h"
175 #include "storage/buffile.h"
176 #include "storage/bufmgr.h"
177 #include "storage/fd.h"
178 #include "storage/ipc.h"
179 #include "storage/lmgr.h"
180 #include "storage/proc.h"
181 #include "storage/procarray.h"
182 #include "tcop/tcopprot.h"
183 #include "utils/acl.h"
184 #include "utils/builtins.h"
185 #include "utils/catcache.h"
186 #include "utils/dynahash.h"
187 #include "utils/datum.h"
188 #include "utils/fmgroids.h"
189 #include "utils/guc.h"
190 #include "utils/inval.h"
191 #include "utils/lsyscache.h"
192 #include "utils/memutils.h"
193 #include "utils/pg_lsn.h"
194 #include "utils/rel.h"
195 #include "utils/rls.h"
196 #include "utils/syscache.h"
197 #include "utils/timeout.h"
198 
199 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
200 
201 typedef struct FlushPosition
202 {
207 
209 
210 typedef struct ApplyExecutionData
211 {
212  EState *estate; /* executor state, used to track resources */
213 
214  LogicalRepRelMapEntry *targetRel; /* replication target rel */
215  ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
216 
217  /* These fields are used when the target relation is partitioned: */
218  ModifyTableState *mtstate; /* dummy ModifyTable state */
219  PartitionTupleRouting *proute; /* partition routing info */
221 
222 /* Struct for saving and restoring apply errcontext information */
223 typedef struct ApplyErrorCallbackArg
224 {
225  LogicalRepMsgType command; /* 0 if invalid */
227 
228  /* Remote node information */
229  int remote_attnum; /* -1 if invalid */
232  char *origin_name;
234 
236 {
237  .command = 0,
238  .rel = NULL,
239  .remote_attnum = -1,
240  .remote_xid = InvalidTransactionId,
241  .finish_lsn = InvalidXLogRecPtr,
242  .origin_name = NULL,
243 };
244 
247 
248 /* per stream context for streaming transactions */
250 
252 
254 static bool MySubscriptionValid = false;
255 
258 
259 /* fields valid only when processing streamed transaction */
260 static bool in_streamed_transaction = false;
261 
263 
264 /*
265  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
266  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
267  * Once we start skipping changes, we don't stop it until we skip all changes of
268  * the transaction even if pg_subscription is updated and MySubscription->skiplsn
269  * gets changed or reset during that. Also, in streaming transaction cases, we
270  * don't skip receiving and spooling the changes since we decide whether or not
271  * to skip applying the changes when starting to apply changes. The subskiplsn is
272  * cleared after successfully skipping the transaction or applying non-empty
273  * transaction. The latter prevents the mistakenly specified subskiplsn from
274  * being left.
275  */
277 #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
278 
279 /* BufFile handle of the current streaming file */
280 static BufFile *stream_fd = NULL;
281 
282 typedef struct SubXactInfo
283 {
284  TransactionId xid; /* XID of the subxact */
285  int fileno; /* file number in the buffile */
286  off_t offset; /* offset in the file */
288 
289 /* Sub-transaction data for the current streaming transaction */
290 typedef struct ApplySubXactData
291 {
292  uint32 nsubxacts; /* number of sub-transactions */
293  uint32 nsubxacts_max; /* current capacity of subxacts */
294  TransactionId subxact_last; /* xid of the last sub-transaction */
295  SubXactInfo *subxacts; /* sub-xact offset in changes file */
297 
299 
300 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
301 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
302 
303 /*
304  * Information about subtransactions of a given toplevel transaction.
305  */
306 static void subxact_info_write(Oid subid, TransactionId xid);
307 static void subxact_info_read(Oid subid, TransactionId xid);
308 static void subxact_info_add(TransactionId xid);
309 static inline void cleanup_subxact_info(void);
310 
311 /*
312  * Serialize and deserialize changes for a toplevel transaction.
313  */
314 static void stream_cleanup_files(Oid subid, TransactionId xid);
315 static void stream_open_file(Oid subid, TransactionId xid, bool first);
316 static void stream_write_change(char action, StringInfo s);
317 static void stream_close_file(void);
318 
319 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
320 
321 static void store_flush_position(XLogRecPtr remote_lsn);
322 
323 static void maybe_reread_subscription(void);
324 
325 static void DisableSubscriptionAndExit(void);
326 
327 /* prototype needed because of stream_commit */
328 static void apply_dispatch(StringInfo s);
329 
330 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
332  ResultRelInfo *relinfo,
333  TupleTableSlot *remoteslot);
335  ResultRelInfo *relinfo,
336  TupleTableSlot *remoteslot,
337  LogicalRepTupleData *newtup);
339  ResultRelInfo *relinfo,
340  TupleTableSlot *remoteslot);
341 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
342  LogicalRepRelation *remoterel,
343  TupleTableSlot *remoteslot,
344  TupleTableSlot **localslot);
346  TupleTableSlot *remoteslot,
347  LogicalRepTupleData *newtup,
348  CmdType operation);
349 
350 /* Compute GID for two_phase transactions */
351 static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
352 
353 /* Common streaming function to apply all the spooled messages */
354 static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
355 
356 /* Functions for skipping changes */
357 static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
358 static void stop_skipping_changes(void);
359 static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
360 
361 /* Functions for apply error callback */
362 static void apply_error_callback(void *arg);
363 static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
364 static inline void reset_apply_error_context_info(void);
365 
366 /*
367  * Should this worker apply changes for given relation.
368  *
369  * This is mainly needed for initial relation data sync as that runs in
370  * separate worker process running in parallel and we need some way to skip
371  * changes coming to the main apply worker during the sync of a table.
372  *
373  * Note we need to do a less-than-or-equal comparison for SYNCDONE state because
374  * it might hold position of end of initial slot consistent point WAL
375  * record + 1 (ie start of next record) and next record can be COMMIT of
376  * transaction we are now processing (which is what we set remote_final_lsn
377  * to in apply_handle_begin).
378  */
379 static bool
381 {
382  if (am_tablesync_worker())
383  return MyLogicalRepWorker->relid == rel->localreloid;
384  else
385  return (rel->state == SUBREL_STATE_READY ||
386  (rel->state == SUBREL_STATE_SYNCDONE &&
387  rel->statelsn <= remote_final_lsn));
388 }
389 
390 /*
391  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
392  *
393  * Start a transaction, if this is the first step (else we keep using the
394  * existing transaction).
395  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
396  */
397 static void
399 {
401 
402  if (!IsTransactionState())
403  {
406  }
407 
409 
411 }
412 
413 /*
414  * Finish up one step of a replication transaction.
415  * Callers of begin_replication_step() must also call this.
416  *
417  * We don't close out the transaction here, but we should increment
418  * the command counter to make the effects of this step visible.
419  */
420 static void
422 {
424 
426 }
427 
428 /*
429  * Handle streamed transactions.
430  *
431  * If in streaming mode (receiving a block of streamed transaction), we
432  * simply redirect it to a file for the proper toplevel transaction.
433  *
434  * Returns true for streamed transactions, false otherwise (regular mode).
435  */
436 static bool
438 {
439  TransactionId xid;
440 
441  /* not in streaming mode */
443  return false;
444 
445  Assert(stream_fd != NULL);
447 
448  /*
449  * We should have received XID of the subxact as the first part of the
450  * message, so extract it.
451  */
452  xid = pq_getmsgint(s, 4);
453 
454  if (!TransactionIdIsValid(xid))
455  ereport(ERROR,
456  (errcode(ERRCODE_PROTOCOL_VIOLATION),
457  errmsg_internal("invalid transaction ID in streamed replication transaction")));
458 
459  /* Add the new subxact to the array (unless already there). */
460  subxact_info_add(xid);
461 
462  /* write the change to the current file */
464 
465  return true;
466 }
467 
468 /*
469  * Executor state preparation for evaluation of constraint expressions,
470  * indexes and triggers for the specified relation.
471  *
472  * Note that the caller must open and close any indexes to be updated.
473  */
474 static ApplyExecutionData *
476 {
477  ApplyExecutionData *edata;
478  EState *estate;
479  RangeTblEntry *rte;
480  ResultRelInfo *resultRelInfo;
481 
482  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
483  edata->targetRel = rel;
484 
485  edata->estate = estate = CreateExecutorState();
486 
487  rte = makeNode(RangeTblEntry);
488  rte->rtekind = RTE_RELATION;
489  rte->relid = RelationGetRelid(rel->localrel);
490  rte->relkind = rel->localrel->rd_rel->relkind;
492  ExecInitRangeTable(estate, list_make1(rte));
493 
494  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
495 
496  /*
497  * Use Relation opened by logicalrep_rel_open() instead of opening it
498  * again.
499  */
500  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
501 
502  /*
503  * We put the ResultRelInfo in the es_opened_result_relations list, even
504  * though we don't populate the es_result_relations array. That's a bit
505  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
506  *
507  * ExecOpenIndices() is not called here either, each execution path doing
508  * an apply operation being responsible for that.
509  */
511  lappend(estate->es_opened_result_relations, resultRelInfo);
512 
513  estate->es_output_cid = GetCurrentCommandId(true);
514 
515  /* Prepare to catch AFTER triggers. */
517 
518  /* other fields of edata remain NULL for now */
519 
520  return edata;
521 }
522 
523 /*
524  * Finish any operations related to the executor state created by
525  * create_edata_for_relation().
526  */
527 static void
529 {
530  EState *estate = edata->estate;
531 
532  /* Handle any queued AFTER triggers. */
533  AfterTriggerEndQuery(estate);
534 
535  /* Shut down tuple routing, if any was done. */
536  if (edata->proute)
537  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
538 
539  /*
540  * Cleanup. It might seem that we should call ExecCloseResultRelations()
541  * here, but we intentionally don't. It would close the rel we added to
542  * es_opened_result_relations above, which is wrong because we took no
543  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
544  * any other relations opened during execution.
545  */
546  ExecResetTupleTable(estate->es_tupleTable, false);
547  FreeExecutorState(estate);
548  pfree(edata);
549 }
550 
551 /*
552  * Evaluates default values for local columns that can't be mapped to remote
553  * relation columns.
554  *
555  * This allows us to support tables which have more columns on the downstream
556  * than on the upstream.
557  */
558 static void
560  TupleTableSlot *slot)
561 {
562  TupleDesc desc = RelationGetDescr(rel->localrel);
563  int num_phys_attrs = desc->natts;
564  int i;
565  int attnum,
566  num_defaults = 0;
567  int *defmap;
568  ExprState **defexprs;
569  ExprContext *econtext;
570 
571  econtext = GetPerTupleExprContext(estate);
572 
573  /* We got all the data via replication, no need to evaluate anything. */
574  if (num_phys_attrs == rel->remoterel.natts)
575  return;
576 
577  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
578  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
579 
580  Assert(rel->attrmap->maplen == num_phys_attrs);
581  for (attnum = 0; attnum < num_phys_attrs; attnum++)
582  {
583  Expr *defexpr;
584 
585  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
586  continue;
587 
588  if (rel->attrmap->attnums[attnum] >= 0)
589  continue;
590 
591  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
592 
593  if (defexpr != NULL)
594  {
595  /* Run the expression through planner */
596  defexpr = expression_planner(defexpr);
597 
598  /* Initialize executable expression in copycontext */
599  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
600  defmap[num_defaults] = attnum;
601  num_defaults++;
602  }
603  }
604 
605  for (i = 0; i < num_defaults; i++)
606  slot->tts_values[defmap[i]] =
607  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
608 }
609 
610 /*
611  * Store tuple data into slot.
612  *
613  * Incoming data can be either text or binary format.
614  */
615 static void
617  LogicalRepTupleData *tupleData)
618 {
619  int natts = slot->tts_tupleDescriptor->natts;
620  int i;
621 
622  ExecClearTuple(slot);
623 
624  /* Call the "in" function for each non-dropped, non-null attribute */
625  Assert(natts == rel->attrmap->maplen);
626  for (i = 0; i < natts; i++)
627  {
629  int remoteattnum = rel->attrmap->attnums[i];
630 
631  if (!att->attisdropped && remoteattnum >= 0)
632  {
633  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
634 
635  Assert(remoteattnum < tupleData->ncols);
636 
637  /* Set attnum for error callback */
639 
640  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
641  {
642  Oid typinput;
643  Oid typioparam;
644 
645  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
646  slot->tts_values[i] =
647  OidInputFunctionCall(typinput, colvalue->data,
648  typioparam, att->atttypmod);
649  slot->tts_isnull[i] = false;
650  }
651  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
652  {
653  Oid typreceive;
654  Oid typioparam;
655 
656  /*
657  * In some code paths we may be asked to re-parse the same
658  * tuple data. Reset the StringInfo's cursor so that works.
659  */
660  colvalue->cursor = 0;
661 
662  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
663  slot->tts_values[i] =
664  OidReceiveFunctionCall(typreceive, colvalue,
665  typioparam, att->atttypmod);
666 
667  /* Trouble if it didn't eat the whole buffer */
668  if (colvalue->cursor != colvalue->len)
669  ereport(ERROR,
670  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
671  errmsg("incorrect binary data format in logical replication column %d",
672  remoteattnum + 1)));
673  slot->tts_isnull[i] = false;
674  }
675  else
676  {
677  /*
678  * NULL value from remote. (We don't expect to see
679  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
680  * NULL.)
681  */
682  slot->tts_values[i] = (Datum) 0;
683  slot->tts_isnull[i] = true;
684  }
685 
686  /* Reset attnum for error callback */
688  }
689  else
690  {
691  /*
692  * We assign NULL to dropped attributes and missing values
693  * (missing values should be later filled using
694  * slot_fill_defaults).
695  */
696  slot->tts_values[i] = (Datum) 0;
697  slot->tts_isnull[i] = true;
698  }
699  }
700 
701  ExecStoreVirtualTuple(slot);
702 }
703 
704 /*
705  * Replace updated columns with data from the LogicalRepTupleData struct.
706  * This is somewhat similar to heap_modify_tuple but also calls the type
707  * input functions on the user data.
708  *
709  * "slot" is filled with a copy of the tuple in "srcslot", replacing
710  * columns provided in "tupleData" and leaving others as-is.
711  *
712  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
713  * storage for "srcslot". This is OK for current usage, but someday we may
714  * need to materialize "slot" at the end to make it independent of "srcslot".
715  */
716 static void
719  LogicalRepTupleData *tupleData)
720 {
721  int natts = slot->tts_tupleDescriptor->natts;
722  int i;
723 
724  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
725  ExecClearTuple(slot);
726 
727  /*
728  * Copy all the column data from srcslot, so that we'll have valid values
729  * for unreplaced columns.
730  */
731  Assert(natts == srcslot->tts_tupleDescriptor->natts);
732  slot_getallattrs(srcslot);
733  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
734  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
735 
736  /* Call the "in" function for each replaced attribute */
737  Assert(natts == rel->attrmap->maplen);
738  for (i = 0; i < natts; i++)
739  {
741  int remoteattnum = rel->attrmap->attnums[i];
742 
743  if (remoteattnum < 0)
744  continue;
745 
746  Assert(remoteattnum < tupleData->ncols);
747 
748  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
749  {
750  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
751 
752  /* Set attnum for error callback */
754 
755  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
756  {
757  Oid typinput;
758  Oid typioparam;
759 
760  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
761  slot->tts_values[i] =
762  OidInputFunctionCall(typinput, colvalue->data,
763  typioparam, att->atttypmod);
764  slot->tts_isnull[i] = false;
765  }
766  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
767  {
768  Oid typreceive;
769  Oid typioparam;
770 
771  /*
772  * In some code paths we may be asked to re-parse the same
773  * tuple data. Reset the StringInfo's cursor so that works.
774  */
775  colvalue->cursor = 0;
776 
777  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
778  slot->tts_values[i] =
779  OidReceiveFunctionCall(typreceive, colvalue,
780  typioparam, att->atttypmod);
781 
782  /* Trouble if it didn't eat the whole buffer */
783  if (colvalue->cursor != colvalue->len)
784  ereport(ERROR,
785  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
786  errmsg("incorrect binary data format in logical replication column %d",
787  remoteattnum + 1)));
788  slot->tts_isnull[i] = false;
789  }
790  else
791  {
792  /* must be LOGICALREP_COLUMN_NULL */
793  slot->tts_values[i] = (Datum) 0;
794  slot->tts_isnull[i] = true;
795  }
796 
797  /* Reset attnum for error callback */
799  }
800  }
801 
802  /* And finally, declare that "slot" contains a valid virtual tuple */
803  ExecStoreVirtualTuple(slot);
804 }
805 
806 /*
807  * Handle BEGIN message.
808  */
809 static void
811 {
812  LogicalRepBeginData begin_data;
813 
814  logicalrep_read_begin(s, &begin_data);
815  set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
816 
817  remote_final_lsn = begin_data.final_lsn;
818 
820 
821  in_remote_transaction = true;
822 
824 }
825 
826 /*
827  * Handle COMMIT message.
828  *
829  * TODO, support tracking of multiple origins
830  */
831 static void
833 {
834  LogicalRepCommitData commit_data;
835 
836  logicalrep_read_commit(s, &commit_data);
837 
838  if (commit_data.commit_lsn != remote_final_lsn)
839  ereport(ERROR,
840  (errcode(ERRCODE_PROTOCOL_VIOLATION),
841  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
842  LSN_FORMAT_ARGS(commit_data.commit_lsn),
844 
845  apply_handle_commit_internal(&commit_data);
846 
847  /* Process any tables that are being synchronized in parallel. */
848  process_syncing_tables(commit_data.end_lsn);
849 
852 }
853 
854 /*
855  * Handle BEGIN PREPARE message.
856  */
857 static void
859 {
860  LogicalRepPreparedTxnData begin_data;
861 
862  /* Tablesync should never receive prepare. */
863  if (am_tablesync_worker())
864  ereport(ERROR,
865  (errcode(ERRCODE_PROTOCOL_VIOLATION),
866  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
867 
868  logicalrep_read_begin_prepare(s, &begin_data);
869  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
870 
871  remote_final_lsn = begin_data.prepare_lsn;
872 
874 
875  in_remote_transaction = true;
876 
878 }
879 
880 /*
881  * Common function to prepare the GID.
882  */
883 static void
885 {
886  char gid[GIDSIZE];
887 
888  /*
889  * Compute unique GID for two_phase transactions. We don't use GID of
890  * prepared transaction sent by server as that can lead to deadlock when
891  * we have multiple subscriptions from same node point to publications on
892  * the same node. See comments atop worker.c
893  */
895  gid, sizeof(gid));
896 
897  /*
898  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
899  * called within the PrepareTransactionBlock below.
900  */
902  CommitTransactionCommand(); /* Completes the preceding Begin command. */
903 
904  /*
905  * Update origin state so we can restart streaming from correct position
906  * in case of crash.
907  */
908  replorigin_session_origin_lsn = prepare_data->end_lsn;
910 
912 }
913 
914 /*
915  * Handle PREPARE message.
916  */
917 static void
919 {
920  LogicalRepPreparedTxnData prepare_data;
921 
922  logicalrep_read_prepare(s, &prepare_data);
923 
924  if (prepare_data.prepare_lsn != remote_final_lsn)
925  ereport(ERROR,
926  (errcode(ERRCODE_PROTOCOL_VIOLATION),
927  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
928  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
930 
931  /*
932  * Unlike commit, here, we always prepare the transaction even though no
933  * change has happened in this transaction or all changes are skipped. It
934  * is done this way because at commit prepared time, we won't know whether
935  * we have skipped preparing a transaction because of those reasons.
936  *
937  * XXX, We can optimize such that at commit prepared time, we first check
938  * whether we have prepared the transaction or not but that doesn't seem
939  * worthwhile because such cases shouldn't be common.
940  */
942 
943  apply_handle_prepare_internal(&prepare_data);
944 
947  pgstat_report_stat(false);
948 
949  store_flush_position(prepare_data.end_lsn);
950 
951  in_remote_transaction = false;
952 
953  /* Process any tables that are being synchronized in parallel. */
954  process_syncing_tables(prepare_data.end_lsn);
955 
956  /*
957  * Since we have already prepared the transaction, in a case where the
958  * server crashes before clearing the subskiplsn, it will be left but the
959  * transaction won't be resent. But that's okay because it's a rare case
960  * and the subskiplsn will be cleared when finishing the next transaction.
961  */
964 
967 }
968 
969 /*
970  * Handle a COMMIT PREPARED of a previously PREPARED transaction.
 *
 * Reads the message and installs its XID and commit_lsn as error context.
 * It then computes the GID into gid[]. The first line of that call is missing
 * here; it is presumably TwoPhaseTransactionGid(), as in the ROLLBACK
 * PREPARED handler.
 *
 * Next it sets replorigin_session_origin_lsn to end_lsn, so that streaming
 * can restart from the right position after a crash. It commits the prepared
 * transaction with FinishPreparedTransaction(gid, true), reports stats,
 * stores end_lsn as the flush position, processes syncing tables and calls
 * clear_subscription_skip_lsn(end_lsn).
 *
 * NOTE(review): source lines 973 (the signature), 982, 986, 993, 996-997 and
 * 1008-1009 are absent from this listing.
971  */
972 static void
974 {
975  LogicalRepCommitPreparedTxnData prepare_data;
976  char gid[GIDSIZE];
977 
978  logicalrep_read_commit_prepared(s, &prepare_data);
979  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
980 
981  /* Compute GID for two_phase transactions. */
983  gid, sizeof(gid));
984 
985  /* There is no transaction when COMMIT PREPARED is called */
987 
988  /*
989  * Update origin state so we can restart streaming from correct position
990  * in case of crash.
991  */
992  replorigin_session_origin_lsn = prepare_data.end_lsn;
994 
995  FinishPreparedTransaction(gid, true);
998  pgstat_report_stat(false);
999 
1000  store_flush_position(prepare_data.end_lsn);
1001  in_remote_transaction = false;
1002 
1003  /* Process any tables that are being synchronized in parallel. */
1004  process_syncing_tables(prepare_data.end_lsn);
1005 
1006  clear_subscription_skip_lsn(prepare_data.end_lsn);
1007 
1010 }
1011 
1012 /*
1013  * Handle a ROLLBACK PREPARED of a previously PREPARED transaction.
 *
 * The GID is derived from MySubscription->oid and the remote XID.
 *
 * The prepared transaction is only finished, with
 * FinishPreparedTransaction(gid, false), when LookupGXact() finds it for the
 * given prepare_end_lsn and prepare_time. The comment in the body explains
 * why it may legitimately be absent.
 *
 * Either way, stats are reported, rollback_end_lsn is stored as the flush
 * position and in_remote_transaction is cleared.
 *
 * NOTE(review): source lines 1016 (the signature), 1041-1042, 1045,
 * 1047-1048, 1050, 1059 and 1061-1062 are absent from this listing. That
 * includes, presumably, the call introduced by the comment at 1058.
1014  */
1015 static void
1017 {
1018  LogicalRepRollbackPreparedTxnData rollback_data;
1019  char gid[GIDSIZE];
1020 
1021  logicalrep_read_rollback_prepared(s, &rollback_data);
1022  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1023 
1024  /* Compute GID for two_phase transactions. */
1025  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1026  gid, sizeof(gid));
1027 
1028  /*
1029  * It is possible that we haven't received prepare because it occurred
1030  * before walsender reached a consistent point or the two_phase was still
1031  * not enabled by that time, so in such cases, we need to skip rollback
1032  * prepared.
1033  */
1034  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1035  rollback_data.prepare_time))
1036  {
1037  /*
1038  * Update origin state so we can restart streaming from correct
1039  * position in case of crash.
1040  */
1043 
1044  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1046  FinishPreparedTransaction(gid, false);
1049 
1051  }
1052 
1053  pgstat_report_stat(false);
1054 
1055  store_flush_position(rollback_data.rollback_end_lsn);
1056  in_remote_transaction = false;
1057 
1058  /* Process any tables that are being synchronized in parallel. */
1060 
1063 }
1064 
1065 /*
1066  * Handle STREAM PREPARE.
1067  *
1068  * Logic is in two parts:
1069  * 1. Replay all the spooled operations
1070  * 2. Mark the transaction as prepared
 *
 * Two protocol-violation ERRORs guard the handler:
 * - the message arrived without a preceding STREAM STOP (the condition, at
 *   line 1077, is absent from this listing);
 * - a tablesync worker received it.
 *
 * The spooled changes for prepare_data.xid are replayed, passing
 * prepare_lsn, and the transaction is prepared. Afterwards stats are
 * reported, end_lsn is stored as the flush position, in_remote_transaction
 * is cleared and tables being synchronized are processed.
 *
 * NOTE(review): source lines 1073 (the signature), 1077, 1099, 1108,
 * 1117-1118, 1120 and 1122 are absent from this listing.
1071  */
1072 static void
1074 {
1075  LogicalRepPreparedTxnData prepare_data;
1076 
1078  ereport(ERROR,
1079  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1080  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1081 
1082  /* Tablesync should never receive prepare. */
1083  if (am_tablesync_worker())
1084  ereport(ERROR,
1085  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1086  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1087 
1088  logicalrep_read_stream_prepare(s, &prepare_data);
1089  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1090 
1091  elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
1092 
1093  /* Replay all the spooled operations. */
1094  apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
1095 
1096  /* Mark the transaction as prepared. */
1097  apply_handle_prepare_internal(&prepare_data);
1098 
1100 
1101  pgstat_report_stat(false);
1102 
1103  store_flush_position(prepare_data.end_lsn);
1104 
1105  in_remote_transaction = false;
1106 
1107  /* unlink the files with serialized changes and subxact info. */
1109 
1110  /* Process any tables that are being synchronized in parallel. */
1111  process_syncing_tables(prepare_data.end_lsn);
1112 
1113  /*
1114  * Similar to prepare case, the subskiplsn could be left in a case of
1115  * server crash but it's okay. See the comments in apply_handle_prepare().
1116  */
1119 
1121 
1123 }
1124 
1125 /*
1126  * Handle ORIGIN message.
1127  *
1128  * TODO: support tracking of multiple origins
 *
 * Nothing is applied here. The handler only validates message ordering and
 * raises a protocol-violation ERROR when the message arrives out of place.
 *
 * NOTE(review): lines 1138-1139, which complete the condition started at
 * 1137, are absent from this listing, as is the signature at 1131.
1129  */
1130 static void
1132 {
1133  /*
1134  * ORIGIN message can only come inside streaming transaction or inside
1135  * remote transaction and before any actual writes.
1136  */
1137  if (!in_streamed_transaction &&
1140  ereport(ERROR,
1141  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1142  errmsg_internal("ORIGIN message sent out of order")));
1143 }
1144 
1145 /*
1146  * Handle STREAM START message.
 *
 * Visible steps:
 * - set in_streamed_transaction;
 * - read the top-level XID into stream_xid (per the ERROR text, an invalid
 *   XID is rejected);
 * - lazily set up MyLogicalRepWorker->stream_fileset on the first streamed
 *   message;
 * - open the spool file for the transaction and, for non-first segments,
 *   the existing subxact file.
 *
 * NOTE(review): many lines are absent from this listing:
 * - the signature (1149);
 * - the conditions guarding the ERRORs (1153, 1173);
 * - the body of "if (!first_segment)" (1205);
 * - the fileset allocation itself (1192, 1194-1195);
 * - lines 1165, 1178, 1201, 1207 and 1209.
 * The description above relies on the in-body comments for those parts.
1147  */
1148 static void
1150 {
1151  bool first_segment;
1152 
1154  ereport(ERROR,
1155  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1156  errmsg_internal("duplicate STREAM START message")));
1157 
1158  /*
1159  * Start a transaction on stream start; this transaction will be committed
1160  * on the stream stop unless it is a tablesync worker in which case it
1161  * will be committed after processing all the messages. We need the
1162  * transaction for handling the buffile, used for serializing the
1163  * streaming data and subxact info.
1164  */
1166 
1167  /* notify handler methods we're processing a remote transaction */
1168  in_streamed_transaction = true;
1169 
1170  /* extract XID of the top-level transaction */
1171  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1172 
1174  ereport(ERROR,
1175  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1176  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1177 
1179 
1180  /*
1181  * Initialize the worker's stream_fileset if we haven't yet. This will be
1182  * used for the entire duration of the worker so create it in a permanent
1183  * context. We create this on the very first streaming message from any
1184  * transaction and then use it for this and other streaming transactions.
1185  * Now, we could create a fileset at the start of the worker as well but
1186  * then we won't be sure that it will ever be used.
1187  */
1188  if (MyLogicalRepWorker->stream_fileset == NULL)
1189  {
1190  MemoryContext oldctx;
1191 
1193 
1196 
1197  MemoryContextSwitchTo(oldctx);
1198  }
1199 
1200  /* open the spool file for this transaction */
1202 
1203  /* if this is not the first segment, open existing subxact file */
1204  if (!first_segment)
1206 
1208 
1210 }
1211 
1212 /*
1213  * Handle STREAM STOP message.
 *
 * Per the comments below, this handler:
 * - closes the spool file and serializes the subxact info for the
 *   top-level transaction;
 * - commits the per-stream transaction;
 * - clears in_streamed_transaction;
 * - resets the per-stream memory context.
 *
 * NOTE(review): only two statements are visible: the ERROR for a STREAM STOP
 * without STREAM START, and the "in_streamed_transaction = false" assignment.
 * Statements at 1216 (the signature), 1218, 1227-1228, 1231, 1234, 1239 and
 * 1241-1242 are absent from this listing.
1214  */
1215 static void
1217 {
1219  ereport(ERROR,
1220  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1221  errmsg_internal("STREAM STOP message without STREAM START")));
1222 
1223  /*
1224  * Close the file with serialized changes, and serialize information about
1225  * subxacts for the toplevel transaction.
1226  */
1229 
1230  /* We must be in a valid transaction state */
1232 
1233  /* Commit the per-stream transaction */
1235 
1236  in_streamed_transaction = false;
1237 
1238  /* Reset per-stream context */
1240 
1243 }
1244 
1245 /*
1246  * Handle STREAM ABORT message.
 *
 * When xid == subxid, the whole top-level transaction was aborted and its
 * files with serialized info are deleted.
 *
 * Otherwise the subxact list is searched from the tail for subxid. If it is
 * found, the changes file is truncated at that subtransaction's recorded
 * offset. nsubxacts is then cut back to its index, dropping it and every
 * subxact recorded after it. An unknown subxid means an empty
 * subtransaction, for which only the subxact info is cleaned up.
 *
 * NOTE(review): source lines 1249 (the signature), 1254, 1267-1268, 1293,
 * 1296-1297, 1316-1319, 1324-1325, 1329, 1337, 1339-1340 and 1343 are absent
 * from this listing.
1247  */
1248 static void
1250 {
1251  TransactionId xid;
1252  TransactionId subxid;
1253 
1255  ereport(ERROR,
1256  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1257  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1258 
1259  logicalrep_read_stream_abort(s, &xid, &subxid);
1260 
1261  /*
1262  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1263  * just delete the files with serialized info.
1264  */
1265  if (xid == subxid)
1266  {
1269  }
1270  else
1271  {
1272  /*
1273  * OK, so it's a subxact. We need to read the subxact file for the
1274  * toplevel transaction, determine the offset tracked for the subxact,
1275  * and truncate the file with changes. We also remove the subxacts
1276  * with higher offsets (or rather higher XIDs).
1277  *
1278  * We intentionally scan the array from the tail, because we're likely
1279  * aborting a change for the most recent subtransactions.
1280  *
1281  * We can't use the binary search here as subxact XIDs won't
1282  * necessarily arrive in sorted order; consider the case where we have
1283  * released the savepoint for multiple subtransactions and then
1284  * performed rollback to savepoint for one of the earlier
1285  * sub-transactions.
1286  */
1287  int64 i;
1288  int64 subidx;
1289  BufFile *fd;
1290  bool found = false;
1291  char path[MAXPGPATH];
1292 
1294 
1295  subidx = -1;
1298 
1299  for (i = subxact_data.nsubxacts; i > 0; i--)
1300  {
1301  if (subxact_data.subxacts[i - 1].xid == subxid)
1302  {
1303  subidx = (i - 1);
1304  found = true;
1305  break;
1306  }
1307  }
1308 
1309  /*
1310  * If it's an empty sub-transaction then we will not find the subxid
1311  * here so just clean up the subxact info and return.
1312  */
1313  if (!found)
1314  {
1315  /* Clean up the subxact info */
1320  return;
1321  }
1322 
1323  /* open the changes file */
1326  O_RDWR, false);
1327 
1328  /* OK, truncate the file at the right offset */
1330  subxact_data.subxacts[subidx].offset);
1331  BufFileClose(fd);
1332 
1333  /* discard the subxacts added later */
1334  subxact_data.nsubxacts = subidx;
1335 
1336  /* write the updated subxact list */
1338 
1341  }
1342 
1344 }
1345 
1346 /*
1347  * Common spoolfile processing.
 *
 * Replays the changes spooled to disk for a committed/prepared streamed
 * transaction. Before replay, remote_final_lsn is set to lsn and
 * in_remote_transaction to true.
 *
 * Record format and loop:
 * - Each on-disk record is an int length (which must be > 0) followed by
 *   that many bytes.
 * - Each record is copied into a StringInfo and passed to apply_dispatch().
 * - A read returning 0 bytes for the length word ends the loop. A short
 *   read or a non-positive length raises an ERROR.
 * - Progress is logged at DEBUG1 every 1000 changes.
 *
 * NOTE(review): lines 1350 (the signature), 1352, 1359, 1362, 1369, 1372,
 * 1375, 1390, 1392, 1404, 1416, 1430, 1439 and 1443 are absent from this
 * listing. They include the declaration of s2 and the call that opens fd.
1348  */
1349 static void
1351 {
1353  int nchanges;
1354  char path[MAXPGPATH];
1355  char *buffer = NULL;
1356  MemoryContext oldcxt;
1357  BufFile *fd;
1358 
1360 
1361  /* Make sure we have an open transaction */
1363 
1364  /*
1365  * Allocate file handle and memory required to process all the messages in
1366  * TopTransactionContext to avoid them getting reset after each message is
1367  * processed.
1368  */
1370 
1371  /* Open the spool file for the committed/prepared transaction */
1373  elog(DEBUG1, "replaying changes from file \"%s\"", path);
1374 
1376  false);
1377 
1378  buffer = palloc(BLCKSZ);
1379  initStringInfo(&s2);
1380 
1381  MemoryContextSwitchTo(oldcxt);
1382 
1383  remote_final_lsn = lsn;
1384 
1385  /*
1386  * Make sure the apply_dispatch handler methods are aware we're in a remote
1387  * transaction.
1388  */
1389  in_remote_transaction = true;
1391 
1393 
1394  /*
1395  * Read the entries one by one and pass them through the same logic as in
1396  * apply_dispatch.
1397  */
1398  nchanges = 0;
1399  while (true)
1400  {
1401  int nbytes;
1402  int len;
1403 
1405 
1406  /* read length of the on-disk record */
1407  nbytes = BufFileRead(fd, &len, sizeof(len));
1408 
1409  /* have we reached end of the file? */
1410  if (nbytes == 0)
1411  break;
1412 
1413  /* do we have a correct length? */
1414  if (nbytes != sizeof(len))
1415  ereport(ERROR,
1417  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1418  path)));
1419 
1420  if (len <= 0)
1421  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
1422  len, path);
1423 
1424  /* resize the buffer to hold exactly len bytes */
1425  buffer = repalloc(buffer, len);
1426 
1427  /* and finally read the data into the buffer */
1428  if (BufFileRead(fd, buffer, len) != len)
1429  ereport(ERROR,
1431  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1432  path)));
1433 
1434  /* copy the buffer to the stringinfo and call apply_dispatch */
1435  resetStringInfo(&s2);
1436  appendBinaryStringInfo(&s2, buffer, len);
1437 
1438  /* Ensure we are reading the data into our memory context. */
1440 
1441  apply_dispatch(&s2);
1442 
1444 
1445  MemoryContextSwitchTo(oldcxt);
1446 
1447  nchanges++;
1448 
1449  if (nchanges % 1000 == 0)
1450  elog(DEBUG1, "replayed %d changes from file \"%s\"",
1451  nchanges, path);
1452  }
1453 
1454  BufFileClose(fd);
1455 
1456  pfree(buffer);
1457  pfree(s2.data);
1458 
1459  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
1460  nchanges, path);
1461 
1462  return;
1463 }
1464 
1465 /*
1466  * Handle STREAM COMMIT message.
 *
 * Replays the spooled changes for the streamed transaction, passing
 * commit_lsn. It then finishes the commit via apply_handle_commit_internal()
 * and processes tables being synchronized in parallel.
 *
 * A protocol-violation ERROR is raised when the message arrives without a
 * preceding STREAM STOP. The guarding condition, at line 1474, is absent
 * from this listing.
 *
 * NOTE(review): source lines 1469 (the signature), 1474, 1489, 1494 and 1496
 * are absent from this listing.
1467  */
1468 static void
1470 {
1471  TransactionId xid;
1472  LogicalRepCommitData commit_data;
1473 
1475  ereport(ERROR,
1476  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1477  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1478 
1479  xid = logicalrep_read_stream_commit(s, &commit_data);
1480  set_apply_error_context_xact(xid, commit_data.commit_lsn);
1481 
1482  elog(DEBUG1, "received commit for streamed transaction %u", xid);
1483 
1484  apply_spooled_messages(xid, commit_data.commit_lsn);
1485 
1486  apply_handle_commit_internal(&commit_data);
1487 
1488  /* unlink the files with serialized changes and subxact info */
1490 
1491  /* Process any tables that are being synchronized in parallel. */
1492  process_syncing_tables(commit_data.end_lsn);
1493 
1495 
1497 }
1498 
1499 /*
1500  * Helper function for apply_handle_commit and apply_handle_stream_commit.
 *
 * If a transaction is active once the skip handling is done, this helper:
 * - installs end_lsn as replorigin_session_origin_lsn;
 * - reports stats;
 * - stores end_lsn as the flush position.
 * The comments state that the subskiplsn is cleared in that case.
 *
 * Otherwise only accumulated invalidation messages are processed, per the
 * comment at 1539. In all cases in_remote_transaction is cleared on return.
 *
 * NOTE(review): source lines 1503 (the signature), 1507, 1514, 1523, 1530,
 * 1532 and 1540-1541 are absent from this listing, so the exact skip/commit
 * calls are not shown.
1501  */
1502 static void
1504 {
1505  if (is_skipping_changes())
1506  {
1508 
1509  /*
1510  * Start a new transaction to clear the subskiplsn, if not started
1511  * yet.
1512  */
1513  if (!IsTransactionState())
1515  }
1516 
1517  if (IsTransactionState())
1518  {
1519  /*
1520  * The transaction is either non-empty or skipped, so we clear the
1521  * subskiplsn.
1522  */
1524 
1525  /*
1526  * Update origin state so we can restart streaming from correct
1527  * position in case of crash.
1528  */
1529  replorigin_session_origin_lsn = commit_data->end_lsn;
1531 
1533  pgstat_report_stat(false);
1534 
1535  store_flush_position(commit_data->end_lsn);
1536  }
1537  else
1538  {
1539  /* Process any invalidation messages that might have accumulated. */
1542  }
1543 
1544  in_remote_transaction = false;
1545 }
1546 
1547 /*
1548  * Handle RELATION message.
1549  *
1550  * Note we don't do validation against local schema here. The validation
1551  * against local schema is postponed until first change for given relation
1552  * comes as we only care about it when applying changes for it anyway and we
1553  * do less locking this way.
 *
 * The visible code parses the message with logicalrep_read_rel(). Per the
 * comment at 1566, partition-map entries referring to the remote relation
 * are reset as well.
 *
 * NOTE(review): lines 1556 (the signature), 1560 (the condition for the
 * early return), 1564 and 1567 are absent from this listing.
1554  */
1555 static void
1557 {
1558  LogicalRepRelation *rel;
1559 
1561  return;
1562 
1563  rel = logicalrep_read_rel(s);
1565 
1566  /* Also reset all entries in the partition map that refer to remoterel. */
1568 }
1569 
1570 /*
1571  * Handle TYPE message.
1572  *
1573  * This implementation pays no attention to TYPE messages; we expect the user
1574  * to have set things up so that the incoming data is acceptable to the input
1575  * functions for the locally subscribed tables. Hence, we just read and
1576  * discard the message.
 *
 * NOTE(review): lines 1579 (the signature) and 1583 (the condition guarding
 * the early return) are absent from this listing.
1577  */
1578 static void
1580 {
1581  LogicalRepTyp typ;
1582 
1584  return;
1585 
1586  logicalrep_read_typ(s, &typ);
1587 }
1588 
1589 /*
1590  * Get the replica identity index or, if it is not defined, the primary key.
1591  *
1592  * If neither is defined, returns InvalidOid.
 *
 * The primary-key lookup is only consulted when RelationGetReplicaIndex()
 * returns an invalid OID.
 *
 * NOTE(review): the signature line (1595) is absent from this listing.
1593  */
1594 static Oid
1596 {
1597  Oid idxoid;
1598 
1599  idxoid = RelationGetReplicaIndex(rel);
1600 
1601  if (!OidIsValid(idxoid))
1602  idxoid = RelationGetPrimaryKeyIndex(rel);
1603 
1604  return idxoid;
1605 }
1606 
1607 /*
1608  * Check that we (the subscription owner) have sufficient privileges on the
1609  * target relation to perform the given operation.
 *
 * 'mode' is checked with pg_class_aclcheck() against GetUserId(), and a
 * failure is reported through aclcheck_error(). Independently of the ACL
 * check, an ERROR is raised if row-level security is enabled for the
 * relation, because RLS policies cannot be honored here.
 *
 * NOTE(review): the signature line (1612) is absent from this listing.
1610  */
1611 static void
1613 {
1614  Oid relid;
1615  AclResult aclresult;
1616 
1617  relid = RelationGetRelid(rel);
1618  aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
1619  if (aclresult != ACLCHECK_OK)
1620  aclcheck_error(aclresult,
1621  get_relkind_objtype(rel->rd_rel->relkind),
1622  get_rel_name(relid));
1623 
1624  /*
1625  * We lack the infrastructure to honor RLS policies. It might be possible
1626  * to add such infrastructure here, but tablesync workers lack it, too, so
1627  * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
1628  * but it seems dangerous to replicate a TRUNCATE and then refuse to
1629  * replicate subsequent INSERTs, so we forbid all commands the same.
1630  */
1631  if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
1632  ereport(ERROR,
1633  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1634  errmsg("\"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1635  GetUserNameFromId(GetUserId(), true),
1636  RelationGetRelationName(rel))));
1637 }
1638 
1639 /*
1640  * Handle INSERT message.
 *
 * Steps:
 * - Read the new tuple and open the target relation with RowExclusiveLock.
 * - Return early if changes should not be applied to the relation.
 * - Otherwise build executor state and store the remote tuple in a virtual
 *   slot, with missing columns filled by slot_fill_defaults().
 * - Insert it, routing through the partition logic when the local table is
 *   partitioned (per the comment at 1693).
 * - Finish the executor state.
 *
 * NOTE(review): source lines 1644 (the signature), 1659, 1662, 1672-1673,
 * 1678, 1688, 1695, 1698, 1704, 1706 and 1708 are absent from this listing.
1641  */
1642 
1643 static void
1645 {
1646  LogicalRepRelMapEntry *rel;
1647  LogicalRepTupleData newtup;
1648  LogicalRepRelId relid;
1649  ApplyExecutionData *edata;
1650  EState *estate;
1651  TupleTableSlot *remoteslot;
1652  MemoryContext oldctx;
1653 
1654  /*
1655  * Quick return if we are skipping data modification changes or handling
1656  * streamed transactions.
1657  */
1658  if (is_skipping_changes() ||
1660  return;
1661 
1663 
1664  relid = logicalrep_read_insert(s, &newtup);
1665  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1666  if (!should_apply_changes_for_rel(rel))
1667  {
1668  /*
1669  * The relation can't become interesting in the middle of the
1670  * transaction so it's safe to unlock it.
1671  */
1674  return;
1675  }
1676 
1677  /* Set relation for error callback */
1679 
1680  /* Initialize the executor state. */
1681  edata = create_edata_for_relation(rel);
1682  estate = edata->estate;
1683  remoteslot = ExecInitExtraTupleSlot(estate,
1684  RelationGetDescr(rel->localrel),
1685  &TTSOpsVirtual);
1686 
1687  /* Process and store remote tuple in the slot */
1689  slot_store_data(remoteslot, rel, &newtup);
1690  slot_fill_defaults(rel, estate, remoteslot);
1691  MemoryContextSwitchTo(oldctx);
1692 
1693  /* For a partitioned table, insert the tuple into a partition. */
1694  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1696  remoteslot, NULL, CMD_INSERT);
1697  else
1699  remoteslot);
1700 
1701  finish_edata(edata);
1702 
1703  /* Reset relation for error callback */
1705 
1707 
1709 }
1710 
1711 /*
1712  * Workhorse for apply_handle_insert()
1713  * relinfo is for the relation we're actually inserting into
1714  * (could be a child partition of edata->targetRelInfo)
 *
 * Opens the indexes of relinfo, inserts remoteslot with
 * ExecSimpleRelationInsert() and closes the indexes again.
 *
 * NOTE(review): lines 1717 (the first signature line) and 1727 are absent
 * from this listing.
1715  */
1716 static void
1718  ResultRelInfo *relinfo,
1719  TupleTableSlot *remoteslot)
1720 {
1721  EState *estate = edata->estate;
1722 
1723  /* We must open indexes here. */
1724  ExecOpenIndices(relinfo, false);
1725 
1726  /* Do the insert. */
1728  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1729 
1730  /* Cleanup. */
1731  ExecCloseIndices(relinfo);
1732 }
1733 
1734 /*
1735  * Check if the logical replication relation is updatable and throw
1736  * appropriate error if it isn't.
 *
 * Partitioned tables return immediately; only the target partition's
 * updatability matters. When the relation is not updatable, one of two
 * ERRORs is raised:
 * - the publisher did not send a replica identity column, or
 * - the target has neither a REPLICA IDENTITY index nor a PRIMARY KEY, and
 *   the published relation does not have REPLICA IDENTITY FULL.
 *
 * NOTE(review): line 1756, the condition that selects the first ERROR, is
 * absent from this listing, as is the signature at 1739.
1737  */
1738 static void
1740 {
1741  /*
1742  * For partitioned tables, we only need to care if the target partition is
1743  * updatable (aka has PK or RI defined for it).
1744  */
1745  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1746  return;
1747 
1748  /* Updatable, no error. */
1749  if (rel->updatable)
1750  return;
1751 
1752  /*
1753  * We are in error mode so it's fine this is somewhat slow. It's better to
1754  * give user correct error.
1755  */
1757  {
1758  ereport(ERROR,
1759  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1760  errmsg("publisher did not send replica identity column "
1761  "expected by the logical replication target relation \"%s.%s\"",
1762  rel->remoterel.nspname, rel->remoterel.relname)));
1763  }
1764 
1765  ereport(ERROR,
1766  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1767  errmsg("logical replication target relation \"%s.%s\" has "
1768  "neither REPLICA IDENTITY index nor PRIMARY "
1769  "KEY and published relation does not have "
1770  "REPLICA IDENTITY FULL",
1771  rel->remoterel.nspname, rel->remoterel.relname)));
1772 }
1773 
1774 /*
1775  * Handle UPDATE message.
1776  *
1777  * TODO: FDW support
 *
 * Steps:
 * - Read the old tuple (if sent) and the new tuple, and open the target
 *   relation with RowExclusiveLock.
 * - Return early if changes should not be applied to the relation.
 * - Mark in the range table entry's updatedCols every local column that is
 *   non-dropped, maps to a remote column (remoteattnum >= 0) and whose
 *   remote value is not LOGICALREP_COLUMN_UNCHANGED.
 * - Build the search tuple from the old tuple when one was sent, otherwise
 *   from the new one.
 * - Apply the update, through the partition logic for partitioned tables.
 *
 * NOTE(review): source lines 1780 (the signature), 1798, 1801, 1812-1813,
 * 1818, 1821, 1841, 1850, 1858, 1865, 1868, 1874, 1876 and 1878 are absent
 * from this listing.
1778  */
1779 static void
1781 {
1782  LogicalRepRelMapEntry *rel;
1783  LogicalRepRelId relid;
1784  ApplyExecutionData *edata;
1785  EState *estate;
1786  LogicalRepTupleData oldtup;
1787  LogicalRepTupleData newtup;
1788  bool has_oldtup;
1789  TupleTableSlot *remoteslot;
1790  RangeTblEntry *target_rte;
1791  MemoryContext oldctx;
1792 
1793  /*
1794  * Quick return if we are skipping data modification changes or handling
1795  * streamed transactions.
1796  */
1797  if (is_skipping_changes() ||
1799  return;
1800 
1802 
1803  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1804  &newtup);
1805  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1806  if (!should_apply_changes_for_rel(rel))
1807  {
1808  /*
1809  * The relation can't become interesting in the middle of the
1810  * transaction so it's safe to unlock it.
1811  */
1814  return;
1815  }
1816 
1817  /* Set relation for error callback */
1819 
1820  /* Check if we can do the update. */
1822 
1823  /* Initialize the executor state. */
1824  edata = create_edata_for_relation(rel);
1825  estate = edata->estate;
1826  remoteslot = ExecInitExtraTupleSlot(estate,
1827  RelationGetDescr(rel->localrel),
1828  &TTSOpsVirtual);
1829 
1830  /*
1831  * Populate updatedCols so that per-column triggers can fire, and so
1832  * executor can correctly pass down indexUnchanged hint. This could
1833  * include more columns than were actually changed on the publisher
1834  * because the logical replication protocol doesn't contain that
1835  * information. But it would for example exclude columns that only exist
1836  * on the subscriber, since we are not touching those.
1837  */
1838  target_rte = list_nth(estate->es_range_table, 0);
1839  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1840  {
1842  int remoteattnum = rel->attrmap->attnums[i];
1843 
1844  if (!att->attisdropped && remoteattnum >= 0)
1845  {
1846  Assert(remoteattnum < newtup.ncols);
1847  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1848  target_rte->updatedCols =
1849  bms_add_member(target_rte->updatedCols,
1851  }
1852  }
1853 
1854  /* Also populate extraUpdatedCols, in case we have generated columns */
1855  fill_extraUpdatedCols(target_rte, rel->localrel);
1856 
1857  /* Build the search tuple. */
1859  slot_store_data(remoteslot, rel,
1860  has_oldtup ? &oldtup : &newtup);
1861  MemoryContextSwitchTo(oldctx);
1862 
1863  /* For a partitioned table, apply update to correct partition. */
1864  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1866  remoteslot, &newtup, CMD_UPDATE);
1867  else
1869  remoteslot, &newtup);
1870 
1871  finish_edata(edata);
1872 
1873  /* Reset relation for error callback */
1875 
1877 
1879 }
1880 
1881 /*
1882  * Workhorse for apply_handle_update()
1883  * relinfo is for the relation we're actually updating in
1884  * (could be a child partition of edata->targetRelInfo)
 *
 * Looks up the local tuple matching the search tuple in remoteslot.
 * - If found, remoteslot is cleared and refilled by slot_modify_data() from
 *   localslot and newtup, and the row is updated with
 *   ExecSimpleRelationUpdate().
 * - A missing row is only logged at DEBUG1.
 * Indexes and the EPQ state are opened on entry and released on exit.
 *
 * NOTE(review): lines 1887 (the first signature line), 1916 and 1923 are
 * absent from this listing.
1885  */
1886 static void
1888  ResultRelInfo *relinfo,
1889  TupleTableSlot *remoteslot,
1890  LogicalRepTupleData *newtup)
1891 {
1892  EState *estate = edata->estate;
1893  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1894  Relation localrel = relinfo->ri_RelationDesc;
1895  EPQState epqstate;
1896  TupleTableSlot *localslot;
1897  bool found;
1898  MemoryContext oldctx;
1899 
1900  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1901  ExecOpenIndices(relinfo, false);
1902 
1903  found = FindReplTupleInLocalRel(estate, localrel,
1904  &relmapentry->remoterel,
1905  remoteslot, &localslot);
1906  ExecClearTuple(remoteslot);
1907 
1908  /*
1909  * Tuple found.
1910  *
1911  * Note this will fail if there are other conflicting unique indexes.
1912  */
1913  if (found)
1914  {
1915  /* Process and store remote tuple in the slot */
1917  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1918  MemoryContextSwitchTo(oldctx);
1919 
1920  EvalPlanQualSetSlot(&epqstate, remoteslot);
1921 
1922  /* Do the actual update. */
1924  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
1925  remoteslot);
1926  }
1927  else
1928  {
1929  /*
1930  * The tuple to be updated could not be found. Do nothing except for
1931  * emitting a log message.
1932  *
1933  * XXX should this be promoted to ereport(LOG) perhaps?
1934  */
1935  elog(DEBUG1,
1936  "logical replication did not find row to be updated "
1937  "in replication target relation \"%s\"",
1938  RelationGetRelationName(localrel));
1939  }
1940 
1941  /* Cleanup. */
1942  ExecCloseIndices(relinfo);
1943  EvalPlanQualEnd(&epqstate);
1944 }
1945 
1946 /*
1947  * Handle DELETE message.
1948  *
1949  * TODO: FDW support
 *
 * Steps:
 * - Read the old tuple and open the target relation with RowExclusiveLock.
 * - Return early if changes should not be applied to the relation.
 * - Build the search tuple from the old tuple in a virtual slot.
 * - Apply the delete, through the partition logic for partitioned tables
 *   (per the comment at 2003).
 * - Finish the executor state.
 *
 * NOTE(review): source lines 1952 (the signature), 1967, 1970, 1980-1981,
 * 1986, 1989, 1999, 2005, 2008, 2014, 2016 and 2018 are absent from this
 * listing.
1950  */
1951 static void
1953 {
1954  LogicalRepRelMapEntry *rel;
1955  LogicalRepTupleData oldtup;
1956  LogicalRepRelId relid;
1957  ApplyExecutionData *edata;
1958  EState *estate;
1959  TupleTableSlot *remoteslot;
1960  MemoryContext oldctx;
1961 
1962  /*
1963  * Quick return if we are skipping data modification changes or handling
1964  * streamed transactions.
1965  */
1966  if (is_skipping_changes() ||
1968  return;
1969 
1971 
1972  relid = logicalrep_read_delete(s, &oldtup);
1973  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1974  if (!should_apply_changes_for_rel(rel))
1975  {
1976  /*
1977  * The relation can't become interesting in the middle of the
1978  * transaction so it's safe to unlock it.
1979  */
1982  return;
1983  }
1984 
1985  /* Set relation for error callback */
1987 
1988  /* Check if we can do the delete. */
1990 
1991  /* Initialize the executor state. */
1992  edata = create_edata_for_relation(rel);
1993  estate = edata->estate;
1994  remoteslot = ExecInitExtraTupleSlot(estate,
1995  RelationGetDescr(rel->localrel),
1996  &TTSOpsVirtual);
1997 
1998  /* Build the search tuple. */
2000  slot_store_data(remoteslot, rel, &oldtup);
2001  MemoryContextSwitchTo(oldctx);
2002 
2003  /* For a partitioned table, apply delete to correct partition. */
2004  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2006  remoteslot, NULL, CMD_DELETE);
2007  else
2009  remoteslot);
2010 
2011  finish_edata(edata);
2012 
2013  /* Reset relation for error callback */
2015 
2017 
2019 }
2020 
2021 /*
2022  * Workhorse for apply_handle_delete()
2023  * relinfo is for the relation we're actually deleting from
2024  * (could be a child partition of edata->targetRelInfo)
 *
 * Looks up the local tuple matching the search tuple in remoteslot.
 * - If found, the row is deleted with ExecSimpleRelationDelete().
 * - A missing row is only logged at DEBUG1.
 * Indexes and the EPQ state are opened on entry and released on exit.
 *
 * NOTE(review): lines 2027 (the first signature line) and 2050 are absent
 * from this listing.
2025  */
2026 static void
2028  ResultRelInfo *relinfo,
2029  TupleTableSlot *remoteslot)
2030 {
2031  EState *estate = edata->estate;
2032  Relation localrel = relinfo->ri_RelationDesc;
2033  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2034  EPQState epqstate;
2035  TupleTableSlot *localslot;
2036  bool found;
2037 
2038  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2039  ExecOpenIndices(relinfo, false);
2040 
2041  found = FindReplTupleInLocalRel(estate, localrel, remoterel,
2042  remoteslot, &localslot);
2043 
2044  /* If found, delete it. */
2045  if (found)
2046  {
2047  EvalPlanQualSetSlot(&epqstate, localslot);
2048 
2049  /* Do the actual delete. */
2051  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2052  }
2053  else
2054  {
2055  /*
2056  * The tuple to be deleted could not be found. Do nothing except for
2057  * emitting a log message.
2058  *
2059  * XXX should this be promoted to ereport(LOG) perhaps?
2060  */
2061  elog(DEBUG1,
2062  "logical replication did not find row to be deleted "
2063  "in replication target relation \"%s\"",
2064  RelationGetRelationName(localrel));
2065  }
2066 
2067  /* Cleanup. */
2068  ExecCloseIndices(relinfo);
2069  EvalPlanQualEnd(&epqstate);
2070 }
2071 
2072 /*
2073  * Try to find a tuple received from the publication side (in 'remoteslot') in
2074  * the corresponding local relation using either replica identity index,
2075  * primary key or if needed, sequential scan.
2076  *
2077  * Local tuple, if found, is returned in '*localslot'.
 *
 * SELECT privilege on localrel is checked first. *localslot is always
 * created (registered in estate->es_tupleTable), even when no match is
 * found.
 *
 * The Assert records the expectation that a relation with neither a replica
 * identity index nor a primary key is only reached when the remote relation
 * has REPLICA IDENTITY FULL. Returns true if a matching tuple was found.
 *
 * NOTE(review): lines 2080 (the first signature line) and 2102 (part of the
 * index-lookup call) are absent from this listing.
2078  */
2079 static bool
2081  LogicalRepRelation *remoterel,
2082  TupleTableSlot *remoteslot,
2083  TupleTableSlot **localslot)
2084 {
2085  Oid idxoid;
2086  bool found;
2087 
2088  /*
2089  * Regardless of the top-level operation, we're performing a read here, so
2090  * check for SELECT privileges.
2091  */
2092  TargetPrivilegesCheck(localrel, ACL_SELECT);
2093 
2094  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2095 
2096  idxoid = GetRelationIdentityOrPK(localrel);
2097  Assert(OidIsValid(idxoid) ||
2098  (remoterel->replident == REPLICA_IDENTITY_FULL));
2099 
2100  if (OidIsValid(idxoid))
2101  found = RelationFindReplTupleByIndex(localrel, idxoid,
2103  remoteslot, *localslot);
2104  else
2105  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2106  remoteslot, *localslot);
2107 
2108  return found;
2109 }
2110 
2111 /*
2112  * This handles insert, update, delete on a partitioned table.
2113  */
2114 static void
2116  TupleTableSlot *remoteslot,
2117  LogicalRepTupleData *newtup,
2118  CmdType operation)
2119 {
2120  EState *estate = edata->estate;
2121  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2122  ResultRelInfo *relinfo = edata->targetRelInfo;
2123  Relation parentrel = relinfo->ri_RelationDesc;
2124  ModifyTableState *mtstate;
2125  PartitionTupleRouting *proute;
2126  ResultRelInfo *partrelinfo;
2127  Relation partrel;
2128  TupleTableSlot *remoteslot_part;
2129  TupleConversionMap *map;
2130  MemoryContext oldctx;
2131  LogicalRepRelMapEntry *part_entry = NULL;
2132  AttrMap *attrmap = NULL;
2133 
2134  /* ModifyTableState is needed for ExecFindPartition(). */
2135  edata->mtstate = mtstate = makeNode(ModifyTableState);
2136  mtstate->ps.plan = NULL;
2137  mtstate->ps.state = estate;
2138  mtstate->operation = operation;
2139  mtstate->resultRelInfo = relinfo;
2140 
2141  /* ... as is PartitionTupleRouting. */
2142  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2143 
2144  /*
2145  * Find the partition to which the "search tuple" belongs.
2146  */
2147  Assert(remoteslot != NULL);
2149  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2150  remoteslot, estate);
2151  Assert(partrelinfo != NULL);
2152  partrel = partrelinfo->ri_RelationDesc;
2153 
2154  /*
2155  * To perform any of the operations below, the tuple must match the
2156  * partition's rowtype. Convert if needed or just copy, using a dedicated
2157  * slot to store the tuple in any case.
2158  */
2159  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2160  if (remoteslot_part == NULL)
2161  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2162  map = partrelinfo->ri_RootToPartitionMap;
2163  if (map != NULL)
2164  {
2165  attrmap = map->attrMap;
2166  remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
2167  remoteslot_part);
2168  }
2169  else
2170  {
2171  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2172  slot_getallattrs(remoteslot_part);
2173  }
2174  MemoryContextSwitchTo(oldctx);
2175 
2176  /* Check if we can do the update or delete on the leaf partition. */
2177  if (operation == CMD_UPDATE || operation == CMD_DELETE)
2178  {
2179  part_entry = logicalrep_partition_open(relmapentry, partrel,
2180  attrmap);
2181  check_relation_updatable(part_entry);
2182  }
2183 
2184  switch (operation)
2185  {
2186  case CMD_INSERT:
2187  apply_handle_insert_internal(edata, partrelinfo,
2188  remoteslot_part);
2189  break;
2190 
2191  case CMD_DELETE:
2192  apply_handle_delete_internal(edata, partrelinfo,
2193  remoteslot_part);
2194  break;
2195 
2196  case CMD_UPDATE:
2197 
2198  /*
2199  * For UPDATE, depending on whether or not the updated tuple
2200  * satisfies the partition's constraint, perform a simple UPDATE
2201  * of the partition or move the updated tuple into a different
2202  * suitable partition.
2203  */
2204  {
2205  TupleTableSlot *localslot;
2206  ResultRelInfo *partrelinfo_new;
2207  bool found;
2208 
2209  /* Get the matching local tuple from the partition. */
2210  found = FindReplTupleInLocalRel(estate, partrel,
2211  &part_entry->remoterel,
2212  remoteslot_part, &localslot);
2213  if (!found)
2214  {
2215  /*
2216  * The tuple to be updated could not be found. Do nothing
2217  * except for emitting a log message.
2218  *
2219  * XXX should this be promoted to ereport(LOG) perhaps?
2220  */
2221  elog(DEBUG1,
2222  "logical replication did not find row to be updated "
2223  "in replication target relation's partition \"%s\"",
2224  RelationGetRelationName(partrel));
2225  return;
2226  }
2227 
2228  /*
2229  * Apply the update to the local tuple, putting the result in
2230  * remoteslot_part.
2231  */
2233  slot_modify_data(remoteslot_part, localslot, part_entry,
2234  newtup);
2235  MemoryContextSwitchTo(oldctx);
2236 
2237  /*
2238  * Does the updated tuple still satisfy the current
2239  * partition's constraint?
2240  */
2241  if (!partrel->rd_rel->relispartition ||
2242  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
2243  false))
2244  {
2245  /*
2246  * Yes, so simply UPDATE the partition. We don't call
2247  * apply_handle_update_internal() here, which would
2248  * normally do the following work, to avoid repeating some
2249  * work already done above to find the local tuple in the
2250  * partition.
2251  */
2252  EPQState epqstate;
2253 
2254  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2255  ExecOpenIndices(partrelinfo, false);
2256 
2257  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
2259  ACL_UPDATE);
2260  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
2261  localslot, remoteslot_part);
2262  ExecCloseIndices(partrelinfo);
2263  EvalPlanQualEnd(&epqstate);
2264  }
2265  else
2266  {
2267  /* Move the tuple into the new partition. */
2268 
2269  /*
2270  * New partition will be found using tuple routing, which
2271  * can only occur via the parent table. We might need to
2272  * convert the tuple to the parent's rowtype. Note that
2273  * this is the tuple found in the partition, not the
2274  * original search tuple received by this function.
2275  */
2276  if (map)
2277  {
2278  TupleConversionMap *PartitionToRootMap =
2280  RelationGetDescr(parentrel));
2281 
2282  remoteslot =
2283  execute_attr_map_slot(PartitionToRootMap->attrMap,
2284  remoteslot_part, remoteslot);
2285  }
2286  else
2287  {
2288  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
2289  slot_getallattrs(remoteslot);
2290  }
2291 
2292 
2293  /* Find the new partition. */
2295  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
2296  proute, remoteslot,
2297  estate);
2298  MemoryContextSwitchTo(oldctx);
2299  Assert(partrelinfo_new != partrelinfo);
2300 
2301  /* DELETE old tuple found in the old partition. */
2302  apply_handle_delete_internal(edata, partrelinfo,
2303  localslot);
2304 
2305  /* INSERT new tuple into the new partition. */
2306 
2307  /*
2308  * Convert the replacement tuple to match the destination
2309  * partition rowtype.
2310  */
2312  partrel = partrelinfo_new->ri_RelationDesc;
2313  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
2314  if (remoteslot_part == NULL)
2315  remoteslot_part = table_slot_create(partrel,
2316  &estate->es_tupleTable);
2317  map = partrelinfo_new->ri_RootToPartitionMap;
2318  if (map != NULL)
2319  {
2320  remoteslot_part = execute_attr_map_slot(map->attrMap,
2321  remoteslot,
2322  remoteslot_part);
2323  }
2324  else
2325  {
2326  remoteslot_part = ExecCopySlot(remoteslot_part,
2327  remoteslot);
2328  slot_getallattrs(remoteslot);
2329  }
2330  MemoryContextSwitchTo(oldctx);
2331  apply_handle_insert_internal(edata, partrelinfo_new,
2332  remoteslot_part);
2333  }
2334  }
2335  break;
2336 
2337  default:
2338  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
2339  break;
2340  }
2341 }
2342 
2343 /*
2344  * Handle TRUNCATE message.
      *
      * Each remote relation named in the message is opened with
      * AccessExclusiveLock.  Relations this worker should not apply changes for
      * are closed (and unlocked) again right away.  For every remaining
      * relation that is a local partitioned table, all of its partitions are
      * collected as well, except temporary tables of other backends.  All
      * collected relations are then truncated with a single
      * ExecuteTruncateGuts() call using DROP_RESTRICT, i.e. without cascading
      * even if CASCADE was used upstream, and finally closed again.
      *
      * The function returns early when data modification changes are being
      * skipped or when the change belongs to a streamed transaction.
2345  *
2346  * TODO: FDW support
2347  */
2348 static void
2350 {
      /*
       * cascade is filled in from the message but deliberately not used; see
       * the ExecuteTruncateGuts() call below.
       */
2351  bool cascade = false;
2352  bool restart_seqs = false;
2353  List *remote_relids = NIL;
      /*
       * remote_rels holds the opened relation map entries and part_rels the
       * partitions opened by this function; both are closed after the
       * truncate.  rels/relids list everything to truncate and relids_logged
       * the OIDs to be logged (see the RelationIsLogicallyLogged() check for
       * partitions).
       */
2354  List *remote_rels = NIL;
2355  List *rels = NIL;
2356  List *part_rels = NIL;
2357  List *relids = NIL;
2358  List *relids_logged = NIL;
2359  ListCell *lc;
2360  LOCKMODE lockmode = AccessExclusiveLock;
2361 
2362  /*
2363  * Quick return if we are skipping data modification changes or handling
2364  * streamed transactions.
2365  */
2366  if (is_skipping_changes() ||
2368  return;
2369 
2371 
2372  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
2373 
2374  foreach(lc, remote_relids)
2375  {
2376  LogicalRepRelId relid = lfirst_oid(lc);
2377  LogicalRepRelMapEntry *rel;
2378 
2379  rel = logicalrep_rel_open(relid, lockmode);
2380  if (!should_apply_changes_for_rel(rel))
2381  {
2382  /*
2383  * The relation can't become interesting in the middle of the
2384  * transaction so it's safe to unlock it.
2385  */
2386  logicalrep_rel_close(rel, lockmode);
2387  continue;
2388  }
2389 
2390  remote_rels = lappend(remote_rels, rel);
2392  rels = lappend(rels, rel->localrel);
2393  relids = lappend_oid(relids, rel->localreloid);
2395  relids_logged = lappend_oid(relids_logged, rel->localreloid);
2396 
2397  /*
2398  * Truncate partitions if we got a message to truncate a partitioned
2399  * table.
2400  */
2401  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2402  {
2403  ListCell *child;
2404  List *children = find_all_inheritors(rel->localreloid,
2405  lockmode,
2406  NULL);
2407 
2408  foreach(child, children)
2409  {
2410  Oid childrelid = lfirst_oid(child);
2411  Relation childrel;
2412 
             /*
              * Skip relations already collected, whether named earlier in the
              * message or reached through an earlier partitioned table.
              */
2413  if (list_member_oid(relids, childrelid))
2414  continue;
2415 
2416  /* find_all_inheritors already got lock */
2417  childrel = table_open(childrelid, NoLock);
2418 
2419  /*
2420  * Ignore temp tables of other backends. See similar code in
2421  * ExecuteTruncate().
2422  */
2423  if (RELATION_IS_OTHER_TEMP(childrel))
2424  {
                 /*
                  * Not truncated, so close it now.  Passing lockmode rather
                  * than NoLock presumably also releases the lock taken by
                  * find_all_inheritors().
                  */
2425  table_close(childrel, lockmode);
2426  continue;
2427  }
2428 
2430  rels = lappend(rels, childrel);
2431  part_rels = lappend(part_rels, childrel);
2432  relids = lappend_oid(relids, childrelid);
2433  /* Log this relation only if needed for logical decoding */
2434  if (RelationIsLogicallyLogged(childrel))
2435  relids_logged = lappend_oid(relids_logged, childrelid);
2436  }
2437  }
2438  }
2439 
2440  /*
2441  * Even if we used CASCADE on the upstream primary we explicitly default
2442  * to replaying changes without further cascading. This might be later
2443  * changeable with a user specified option.
2444  */
2445  ExecuteTruncateGuts(rels,
2446  relids,
2447  relids_logged,
2448  DROP_RESTRICT,
2449  restart_seqs);
      /* Close the relation map entries opened in the first loop. */
2450  foreach(lc, remote_rels)
2451  {
2452  LogicalRepRelMapEntry *rel = lfirst(lc);
2453 
2455  }
      /*
       * Close the partitions opened above.  NoLock: unlike the lockmode close
       * of uninteresting relations, the lock is not released here.
       */
2456  foreach(lc, part_rels)
2457  {
2458  Relation rel = lfirst(lc);
2459 
2460  table_close(rel, NoLock);
2461  }
2462 
2464 }
2465 
2466 
2467 /*
2468  * Logical replication protocol message dispatcher.
      *
      * The message type ("action") is stored in apply_error_callback_arg.command
      * while its handler runs, so that errors raised meanwhile can report which
      * command was being applied.  The previous value is saved and restored
      * because this function can be called recursively when applying spooled
      * changes.  An unrecognized message type raises a protocol-violation
      * ERROR.
2469  */
2470 static void
2472 {
2474  LogicalRepMsgType saved_command;
2475 
2476  /*
2477  * Set the current command being applied. Since this function can be
2478  * called recursively when applying spooled changes, save the current
2479  * command.
2480  */
2481  saved_command = apply_error_callback_arg.command;
2483 
2484  switch (action)
2485  {
2486  case LOGICAL_REP_MSG_BEGIN:
2487  apply_handle_begin(s);
2488  break;
2489 
2492  break;
2493 
2496  break;
2497 
2500  break;
2501 
2504  break;
2505 
2508  break;
2509 
2512  break;
2513 
2514  case LOGICAL_REP_MSG_TYPE:
2515  apply_handle_type(s);
2516  break;
2517 
2520  break;
2521 
2523 
2524  /*
2525  * Logical replication does not use generic logical messages yet.
2526  * Although, it could be used by other applications that use this
2527  * output plugin.
2528  */
2529  break;
2530 
2533  break;
2534 
2537  break;
2538 
2541  break;
2542 
2545  break;
2546 
2549  break;
2550 
2553  break;
2554 
2557  break;
2558 
2561  break;
2562 
2565  break;
2566 
2567  default:
2568  ereport(ERROR,
2569  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2570  errmsg("invalid logical replication message type \"%c\"", action)));
2571  }
2572 
      /*
       * Only reached when the handler returns normally; ereport(ERROR) does not
       * return, so on error the failing command is still recorded while the
       * error context callback runs.
       */
2573  /* Reset the current command */
2574  apply_error_callback_arg.command = saved_command;
2575 }
2576 
2577 /*
2578  * Figure out which write/flush positions to report to the walsender process.
2579  *
2580  * We can't simply report back the last LSN the walsender sent us because the
2581  * local transaction might not yet be flushed to disk locally. Instead we
2582  * build a list that associates local with remote LSNs for every commit. When
2583  * reporting back the flush position to the sender we iterate that list and
2584  * check which entries on it are already locally flushed. Those we can report
2585  * as having been flushed.
2586  *
      * Entries whose local end is at or before the current local flush position
      * (GetFlushRecPtr()) are removed from lsn_mapping, freed, and advance
      * *flush.  The scan stops at the first entry that is not yet flushed, and
      * *write is then taken from the tail entry of the list; otherwise *write
      * ends up as the remote end of the last entry removed.
      *
2587  * *have_pending_txes is set to true if there are outstanding transactions that
2588  * need to be flushed.
2589  */
2590 static void
2592  bool *have_pending_txes)
2593 {
2594  dlist_mutable_iter iter;
2595  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
2596 
2598  *flush = InvalidXLogRecPtr;
2599 
2601  {
2602  FlushPosition *pos =
2603  dlist_container(FlushPosition, node, iter.cur);
2604 
2605  *write = pos->remote_end;
2606 
         /* Local commit already flushed: its remote end can be reported. */
2607  if (pos->local_end <= local_flush)
2608  {
2609  *flush = pos->remote_end;
2610  dlist_delete(iter.cur);
2611  pfree(pos);
2612  }
2613  else
2614  {
2615  /*
2616  * Don't want to uselessly iterate over the rest of the list which
2617  * could potentially be long. Instead get the last element and
2618  * grab the write position from there.
2619  */
2620  pos = dlist_tail_element(FlushPosition, node,
2621  &lsn_mapping);
2622  *write = pos->remote_end;
2623  *have_pending_txes = true;
2624  return;
2625  }
2626  }
2627 
2628  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
2629 }
2630 
2631 /*
2632  * Store current remote/local lsn pair in the tracking list.
      *
      * A FlushPosition pairing remote_lsn with XactLastCommitEnd (presumably the
      * end of the local commit just performed) is appended to the tail of
      * lsn_mapping; get_flush_position() later removes entries from the front
      * once their local end has been flushed.
2633  */
2634 static void
2636 {
2637  FlushPosition *flushpos;
2638 
      /*
       * The entry must stay allocated until get_flush_position() frees it,
       * i.e. well beyond the per-message memory of the apply loop.
       */
2639  /* Need to do this in permanent context */
2641 
2642  /* Track commit lsn */
2643  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2644  flushpos->local_end = XactLastCommitEnd;
2645  flushpos->remote_end = remote_lsn;
2646 
2647  dlist_push_tail(&lsn_mapping, &flushpos->node);
2649 }
2650 
2651 
2652 /* Update statistics of the worker. */
/*
 * last_lsn and send_time are stored in MyLogicalRepWorker as the latest
 * received position and sender timestamp.  When 'reply' is true (the apply
 * loop passes true for keepalive messages and false for data messages) they
 * are also recorded as the reply position and time.
 */
2653 static void
2654 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
2655 {
2656  MyLogicalRepWorker->last_lsn = last_lsn;
2657  MyLogicalRepWorker->last_send_time = send_time;
2659  if (reply)
2660  {
2661  MyLogicalRepWorker->reply_lsn = last_lsn;
2662  MyLogicalRepWorker->reply_time = send_time;
2663  }
2664 }
2665 
2666 /*
2667  * Apply main loop.
      *
      * Repeatedly reads everything currently available from the publisher
      * connection.  'w' messages carry start_lsn, end_lsn and send_time followed
      * by a logical replication message, which is handed to apply_dispatch();
      * 'k' keepalive messages carry end_lsn, a timestamp and a reply-requested
      * flag.  Other message types are ignored.  After each batch, feedback is
      * sent to the publisher and per-message memory is cleaned up; the loop then
      * waits for new data, the latch, or a timeout.  On timeout the worker pings
      * the publisher once wal_receiver_timeout / 2 has passed without data and
      * errors out after wal_receiver_timeout.  The loop ends when the
      * publisher's data stream ends.
2668  */
2669 static void
2671 {
2672  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2673  bool ping_sent = false;
2674  TimeLineID tli;
2675  ErrorContextCallback errcallback;
2676 
2677  /*
2678  * Init the ApplyMessageContext which we clean up after each replication
2679  * protocol message.
2680  */
2682  "ApplyMessageContext",
2684 
2685  /*
2686  * This memory context is used for per-stream data when the streaming mode
2687  * is enabled. This context is reset on each stream stop.
2688  */
2690  "LogicalStreamingContext",
2692 
2693  /* mark as idle, before starting to loop */
2695 
2696  /*
2697  * Push apply error context callback. Fields will be filled while applying
2698  * a change.
2699  */
2700  errcallback.callback = apply_error_callback;
2701  errcallback.previous = error_context_stack;
2702  error_context_stack = &errcallback;
2703 
2704  /* This outer loop iterates once per wait. */
2705  for (;;)
2706  {
2708  int rc;
2709  int len;
2710  char *buf = NULL;
2711  bool endofstream = false;
2712  long wait_time;
2713 
2715 
2717 
2719 
2720  if (len != 0)
2721  {
2722  /* Loop to process all available data (without blocking). */
2723  for (;;)
2724  {
2726 
2727  if (len == 0)
2728  {
2729  break;
2730  }
2731  else if (len < 0)
2732  {
2733  ereport(LOG,
2734  (errmsg("data stream from publisher has ended")));
2735  endofstream = true;
2736  break;
2737  }
2738  else
2739  {
2740  int c;
2741  StringInfoData s;
2742 
2743  /* Reset timeout. */
2744  last_recv_timestamp = GetCurrentTimestamp();
2745  ping_sent = false;
2746 
2747  /* Ensure we are reading the data into our memory context. */
2749 
                 /* Wrap the received buffer for the pq_getmsg*() readers. */
2750  s.data = buf;
2751  s.len = len;
2752  s.cursor = 0;
2753  s.maxlen = -1;
2754 
2755  c = pq_getmsgbyte(&s);
2756 
                 /* 'w': start_lsn, end_lsn, send_time, then the payload. */
2757  if (c == 'w')
2758  {
2759  XLogRecPtr start_lsn;
2760  XLogRecPtr end_lsn;
2761  TimestampTz send_time;
2762 
2763  start_lsn = pq_getmsgint64(&s);
2764  end_lsn = pq_getmsgint64(&s);
2765  send_time = pq_getmsgint64(&s);
2766 
                     /* last_received only ever moves forward. */
2767  if (last_received < start_lsn)
2768  last_received = start_lsn;
2769 
2770  if (last_received < end_lsn)
2771  last_received = end_lsn;
2772 
2773  UpdateWorkerStats(last_received, send_time, false);
2774 
2775  apply_dispatch(&s);
2776  }
                 /* 'k': keepalive with end_lsn, timestamp, reply flag. */
2777  else if (c == 'k')
2778  {
2779  XLogRecPtr end_lsn;
2781  bool reply_requested;
2782 
2783  end_lsn = pq_getmsgint64(&s);
2784  timestamp = pq_getmsgint64(&s);
2785  reply_requested = pq_getmsgbyte(&s);
2786 
2787  if (last_received < end_lsn)
2788  last_received = end_lsn;
2789 
                     /* A requested reply forces feedback to be sent. */
2790  send_feedback(last_received, reply_requested, false);
2791  UpdateWorkerStats(last_received, timestamp, true);
2792  }
2793  /* other message types are purposefully ignored */
2794 
2796  }
2797 
2799  }
2800  }
2801 
2802  /* confirm all writes so far */
2803  send_feedback(last_received, false, false);
2804 
2806  {
2807  /*
2808  * If we didn't get any transactions for a while there might be
2809  * unconsumed invalidation messages in the queue, consume them
2810  * now.
2811  */
2814 
2815  /* Process any table synchronization changes. */
2816  process_syncing_tables(last_received);
2817  }
2818 
2819  /* Cleanup the memory. */
2822 
2823  /* Check if we need to exit the streaming loop. */
2824  if (endofstream)
2825  break;
2826 
2827  /*
2828  * Wait for more data or latch. If we have unflushed transactions,
2829  * wake up after WalWriterDelay to see if they've been flushed yet (in
2830  * which case we should send a feedback message). Otherwise, there's
2831  * no particular urgency about waking up unless we get data or a
2832  * signal.
2833  */
2834  if (!dlist_is_empty(&lsn_mapping))
2835  wait_time = WalWriterDelay;
2836  else
2837  wait_time = NAPTIME_PER_CYCLE;
2838 
2842  fd, wait_time,
2844 
2845  if (rc & WL_LATCH_SET)
2846  {
2849  }
2850 
2851  if (ConfigReloadPending)
2852  {
2853  ConfigReloadPending = false;
2855  }
2856 
2857  if (rc & WL_TIMEOUT)
2858  {
2859  /*
2860  * We didn't receive anything new. If we haven't heard anything
2861  * from the server for more than wal_receiver_timeout / 2, ping
2862  * the server. Also, if it's been longer than
2863  * wal_receiver_status_interval since the last update we sent,
2864  * send a status update to the primary anyway, to report any
2865  * progress in applying WAL.
2866  */
2867  bool requestReply = false;
2868 
2869  /*
2870  * Check if time since last receive from primary has reached the
2871  * configured limit.
2872  */
2873  if (wal_receiver_timeout > 0)
2874  {
2876  TimestampTz timeout;
2877 
2878  timeout =
2879  TimestampTzPlusMilliseconds(last_recv_timestamp,
2881 
2882  if (now >= timeout)
2883  ereport(ERROR,
2884  (errcode(ERRCODE_CONNECTION_FAILURE),
2885  errmsg("terminating logical replication worker due to timeout")));
2886 
2887  /* Check to see if it's time for a ping. */
2888  if (!ping_sent)
2889  {
2890  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2891  (wal_receiver_timeout / 2));
2892  if (now >= timeout)
2893  {
2894  requestReply = true;
2895  ping_sent = true;
2896  }
2897  }
2898  }
2899 
             /* A ping both forces the status message and requests a reply. */
2900  send_feedback(last_received, requestReply, requestReply);
2901 
2902  /*
2903  * Force reporting to ensure long idle periods don't lead to
2904  * arbitrarily delayed stats. Stats can only be reported outside
2905  * of (implicit or explicit) transactions. That shouldn't lead to
2906  * stats being delayed for long, because transactions are either
2907  * sent as a whole on commit or streamed. Streamed transactions
2908  * are spilled to disk and applied on commit.
2909  */
2910  if (!IsTransactionState())
2911  pgstat_report_stat(true);
2912  }
2913  }
2914 
2915  /* Pop the error context stack */
2916  error_context_stack = errcallback.previous;
2917 
2918  /* All done */
2920 }
2921 
2922 /*
2923  * Send a Standby Status Update message to server.
2924  *
2925  * 'recvpos' is the latest LSN we've received data to, force is set if we need
2926  * to send a response to avoid timeouts.
      *
      * 'requestReply' asks the server to reply right away.  Unless 'force' is
      * set, nothing is sent when wal_receiver_status_interval <= 0, nor when the
      * write and flush positions are unchanged since the last message and not
      * enough time has passed since the previous send (the
      * TimestampDifferenceExceeds() check).  Reported positions never move
      * backwards: the last values sent are kept in static variables.
2927  */
2928 static void
2929 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
2930 {
      /* Function-local state preserved across calls. */
2931  static StringInfo reply_message = NULL;
2932  static TimestampTz send_time = 0;
2933 
2934  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2935  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2936  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2937 
2938  XLogRecPtr writepos;
2939  XLogRecPtr flushpos;
2940  TimestampTz now;
2941  bool have_pending_txes;
2942 
2943  /*
2944  * If the user doesn't want status to be reported to the publisher, be
2945  * sure to exit before doing anything at all.
2946  */
2947  if (!force && wal_receiver_status_interval <= 0)
2948  return;
2949 
      /* A recvpos behind the last one reported (e.g. invalid) is replaced by it. */
2950  /* It's legal to not pass a recvpos */
2951  if (recvpos < last_recvpos)
2952  recvpos = last_recvpos;
2953 
2954  get_flush_position(&writepos, &flushpos, &have_pending_txes);
2955 
2956  /*
2957  * No outstanding transactions to flush, we can report the latest received
2958  * position. This is important for synchronous replication.
2959  */
2960  if (!have_pending_txes)
2961  flushpos = writepos = recvpos;
2962 
2963  if (writepos < last_writepos)
2964  writepos = last_writepos;
2965 
2966  if (flushpos < last_flushpos)
2967  flushpos = last_flushpos;
2968 
2970 
2971  /* if we've already reported everything we're good */
2972  if (!force &&
2973  writepos == last_writepos &&
2974  flushpos == last_flushpos &&
2975  !TimestampDifferenceExceeds(send_time, now,
2977  return;
2978  send_time = now;
2979 
      /* The message buffer is allocated on first use and kept for later calls. */
2980  if (!reply_message)
2981  {
2983 
2985  MemoryContextSwitchTo(oldctx);
2986  }
2987  else
2989 
      /*
       * NOTE(review): the three positions are sent in the order recvpos,
       * flushpos, writepos while the trailing comments label them write, flush,
       * apply, i.e. writepos lands in the "apply" field.  Confirm this mapping
       * is intended.
       */
2990  pq_sendbyte(reply_message, 'r');
2991  pq_sendint64(reply_message, recvpos); /* write */
2992  pq_sendint64(reply_message, flushpos); /* flush */
2993  pq_sendint64(reply_message, writepos); /* apply */
2994  pq_sendint64(reply_message, now); /* sendTime */
2995  pq_sendbyte(reply_message, requestReply); /* replyRequested */
2996 
2997  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2998  force,
2999  LSN_FORMAT_ARGS(recvpos),
3000  LSN_FORMAT_ARGS(writepos),
3001  LSN_FORMAT_ARGS(flushpos));
3002 
3005 
      /* Remember what was reported so later calls never go backwards. */
3006  if (recvpos > last_recvpos)
3007  last_recvpos = recvpos;
3008  if (writepos > last_writepos)
3009  last_writepos = writepos;
3010  if (flushpos > last_flushpos)
3011  last_flushpos = flushpos;
3012 }
3013 
3014 /*
3015  * Reread subscription info if needed. Most changes make the worker exit.
      *
      * Nothing is done while MySubscriptionValid is set.  Otherwise the
      * subscription is reloaded (inside a transaction started here if none is
      * active).  The worker exits if the subscription was removed or disabled,
      * or if a parameter affecting the remote connection changed (conninfo,
      * name, slot name, binary, stream, owner, publications).  A changed
      * database raises ERROR.  Otherwise the new information replaces
      * MySubscription and synchronous_commit is reapplied.
3016  */
3017 static void
3019 {
3020  MemoryContext oldctx;
3022  bool started_tx = false;
3023 
3024  /* When cache state is valid there is nothing to do here. */
3025  if (MySubscriptionValid)
3026  return;
3027 
3028  /* This function might be called inside or outside of transaction. */
3029  if (!IsTransactionState())
3030  {
3032  started_tx = true;
3033  }
3034 
3035  /* Ensure allocations in permanent context. */
3037 
3039 
3040  /*
3041  * Exit if the subscription was removed. This normally should not happen
3042  * as the worker gets killed during DROP SUBSCRIPTION.
3043  */
3044  if (!newsub)
3045  {
3046  ereport(LOG,
3047  (errmsg("logical replication apply worker for subscription \"%s\" will "
3048  "stop because the subscription was removed",
3049  MySubscription->name)));
3050 
3051  proc_exit(0);
3052  }
3053 
3054  /* Exit if the subscription was disabled. */
3055  if (!newsub->enabled)
3056  {
3057  ereport(LOG,
3058  (errmsg("logical replication apply worker for subscription \"%s\" will "
3059  "stop because the subscription was disabled",
3060  MySubscription->name)));
3061 
3062  proc_exit(0);
3063  }
3064 
3065  /* !slotname should never happen when enabled is true. */
3066  Assert(newsub->slotname);
3067 
3068  /* two-phase should not be altered */
3069  Assert(newsub->twophasestate == MySubscription->twophasestate);
3070 
3071  /*
3072  * Exit if any parameter that affects the remote connection was changed.
3073  * The launcher will start a new worker.
3074  */
3075  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3076  strcmp(newsub->name, MySubscription->name) != 0 ||
3077  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3078  newsub->binary != MySubscription->binary ||
3079  newsub->stream != MySubscription->stream ||
3080  newsub->owner != MySubscription->owner ||
3081  !equal(newsub->publications, MySubscription->publications))
3082  {
3083  ereport(LOG,
3084  (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
3085  MySubscription->name)));
3086 
3087  proc_exit(0);
3088  }
3089 
3090  /* Check for other changes that should never happen too. */
3091  if (newsub->dbid != MySubscription->dbid)
3092  {
3093  elog(ERROR, "subscription %u changed unexpectedly",
3095  }
3096 
3097  /* Clean old subscription info and switch to new one. */
3100 
3101  MemoryContextSwitchTo(oldctx);
3102 
3103  /* Change synchronous commit according to the user's wishes */
3104  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3106 
3107  if (started_tx)
3109 
3110  MySubscriptionValid = true;
3111 }
3112 
3113 /*
3114  * Callback from subscription syscache invalidation.
3115  */
3116 static void
3117 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
3118 {
3119  MySubscriptionValid = false;
3120 }
3121 
3122 /*
3123  * subxact_info_write
3124  * Store information about subxacts for a toplevel transaction.
3125  *
3126  * For each subxact we store the offset of its first change in the main file.
3127  * The file is always over-written as a whole.
      * Its layout is the subxact count followed by nsubxacts SubXactInfo
      * entries.  When there are no subxacts, the file is deleted instead.
3128  *
3129  * XXX We should only store subxacts that were not aborted yet.
3130  */
3131 static void
3133 {
3134  char path[MAXPGPATH];
3135  Size len;
3136  BufFile *fd;
3137 
3139 
3140  /* construct the subxact filename */
3141  subxact_filename(path, subid, xid);
3142 
3143  /* Delete the subxacts file, if it exists. */
3144  if (subxact_data.nsubxacts == 0)
3145  {
3148 
3149  return;
3150  }
3151 
3152  /*
3153  * Create the subxact file if it does not exist yet, otherwise open the
3154  * existing file.
3155  */
3157  true);
3158  if (fd == NULL)
3160 
      /* Size of the SubXactInfo array that follows the count. */
3161  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
3162 
3163  /* Write the subxact count and subxact info */
3166 
3167  BufFileClose(fd);
3168 
3169  /* free the memory allocated for subxact info */
3171 }
3172 
3173 /*
3174  * subxact_info_read
3175  * Restore information about subxacts of a streamed transaction.
3176  *
3177  * Read information about subxacts into the structure subxact_data that can be
3178  * used later.
      *
      * The file holds the subxact count followed by that many SubXactInfo
      * entries (the layout written by subxact_info_write).  A missing file means
      * there is no subxact info; a short read of either part raises ERROR.
3179  */
3180 static void
3182 {
3183  char path[MAXPGPATH];
3184  Size len;
3185  BufFile *fd;
3186  MemoryContext oldctx;
3187 
3191 
3192  /*
3193  * If the subxact file doesn't exist that means we don't have any subxact
3194  * info.
3195  */
3196  subxact_filename(path, subid, xid);
3198  true);
3199  if (fd == NULL)
3200  return;
3201 
3202  /* read number of subxact items */
3204  sizeof(subxact_data.nsubxacts)) !=
3205  sizeof(subxact_data.nsubxacts))
3206  ereport(ERROR,
3208  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
3209  path)));
3210 
3211  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
3212 
3213  /* we keep the maximum as a power of 2 */
3215 
3216  /*
3217  * Allocate subxact information in the logical streaming context. We need
3218  * this information during the complete stream so that we can add the sub
3219  * transaction info to this. On stream stop we will flush this information
3220  * to the subxact file and reset the logical streaming context.
3221  */
3224  sizeof(SubXactInfo));
3225  MemoryContextSwitchTo(oldctx);
3226 
      /* A count of zero leaves nothing more to read. */
3227  if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
3228  ereport(ERROR,
3230  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
3231  path)));
3232 
3233  BufFileClose(fd);
3234 }
3235 
3236 /*
3237  * subxact_info_add
3238  * Add information about a subxact (offset in the main file).
      *
      * The toplevel XID itself, the subxact handled by the previous call, and
      * any subxact already present in subxact_data are ignored.  Otherwise the
      * XID is appended to subxact_data together with the current position
      * (fileno, offset) of the stream file; these offsets are what allows the
      * changes file to be truncated when the subxact aborts.
3239  */
3240 static void
3242 {
3243  SubXactInfo *subxacts = subxact_data.subxacts;
3244  int64 i;
3245 
3246  /* We must have a valid top level stream xid and a stream fd. */
3248  Assert(stream_fd != NULL);
3249 
3250  /*
3251  * If the XID matches the toplevel transaction, we don't want to add it.
3252  */
3253  if (stream_xid == xid)
3254  return;
3255 
3256  /*
3257  * In most cases we're checking the same subxact as we've already seen in
3258  * the last call, so make sure to ignore it (this change comes later).
3259  */
3260  if (subxact_data.subxact_last == xid)
3261  return;
3262 
3263  /* OK, remember we're processing this XID. */
3264  subxact_data.subxact_last = xid;
3265 
3266  /*
3267  * Check if the transaction is already present in the array of subxact. We
3268  * intentionally scan the array from the tail, because we're likely adding
3269  * a change for the most recent subtransactions.
3270  *
3271  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
3272  * would allow us to use binary search here.
3273  */
3274  for (i = subxact_data.nsubxacts; i > 0; i--)
3275  {
3276  /* found, so we're done */
3277  if (subxacts[i - 1].xid == xid)
3278  return;
3279  }
3280 
3281  /* This is a new subxact, so we need to add it to the array. */
3282  if (subxact_data.nsubxacts == 0)
3283  {
3284  MemoryContext oldctx;
3285 
3287 
3288  /*
3289  * Allocate this memory for subxacts in per-stream context, see
3290  * subxact_info_read.
3291  */
3293  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
3294  MemoryContextSwitchTo(oldctx);
3295  }
3297  {
3299  subxacts = repalloc(subxacts,
3301  }
3302 
3303  subxacts[subxact_data.nsubxacts].xid = xid;
3304 
3305  /*
3306  * Get the current offset of the stream file and store it as offset of
3307  * this subxact.
3308  */
3310  &subxacts[subxact_data.nsubxacts].fileno,
3311  &subxacts[subxact_data.nsubxacts].offset);
3312 
      /* The array may have been (re)allocated above; publish it back. */
3314  subxact_data.subxacts = subxacts;
3315 }
3316 
3317 /* format filename for file containing the info about subxacts */
3318 static inline void
3319 subxact_filename(char *path, Oid subid, TransactionId xid)
3320 {
3321  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
3322 }
3323 
3324 /* format filename for file containing serialized changes */
3325 static inline void
3326 changes_filename(char *path, Oid subid, TransactionId xid)
3327 {
3328  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
3329 }
3330 
3331 /*
3332  * stream_cleanup_files
3333  * Cleanup files for a subscription / toplevel transaction.
3334  *
3335  * Remove files with serialized changes and subxact info for a particular
3336  * toplevel transaction. Each subscription has a separate set of files
3337  * for any toplevel transaction.
3338  */
3339 static void
3341 {
      /* Reused for both file names below. */
3342  char path[MAXPGPATH];
3343 
3344  /* Delete the changes file. */
3345  changes_filename(path, subid, xid);
3347 
3348  /* Delete the subxact file, if it exists. */
3349  subxact_filename(path, subid, xid);
3351 }
3352 
3353 /*
3354  * stream_open_file
3355  * Open a file that we'll use to serialize changes for a toplevel
3356  * transaction.
3357  *
3358  * Open a file for streamed changes from the toplevel transaction identified
3359  * by 'xid' (presumably the current stream_xid). If 'first_segment' is true,
3360  * create the buffile; otherwise open the previously created file and
3361  * position at its end, so that new changes are appended.
3362  *
3363  * This can only be called at the beginning of a "streaming" block, i.e.
3364  * between stream_start/stream_stop messages from the upstream.
3365  */
3366 static void
3367 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
3368 {
3369  char path[MAXPGPATH];
3370  MemoryContext oldcxt;
3371 
3373  Assert(OidIsValid(subid));
      /* No other stream file may be open at this point. */
3375  Assert(stream_fd == NULL);
3376 
3377 
3378  changes_filename(path, subid, xid);
3379  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
3380 
3381  /*
3382  * Create/open the buffiles under the logical streaming context so that we
3383  * have those files until stream stop.
3384  */
3386 
3387  /*
3388  * If this is the first streamed segment, create the changes file.
3389  * Otherwise, just open the file for writing, in append mode.
3390  */
3391  if (first_segment)
3393  path);
3394  else
3395  {
3396  /*
3397  * Open the file and seek to the end of the file because we always
3398  * append the changes file.
3399  */
3401  path, O_RDWR, false);
3402  BufFileSeek(stream_fd, 0, 0, SEEK_END);
3403  }
3404 
3405  MemoryContextSwitchTo(oldcxt);
3406 }
3407 
3408 /*
3409  * stream_close_file
3410  * Close the currently open file with streamed changes.
3411  *
3412  * This can only be called at the end of a streaming block, i.e. at stream_stop
3413  * message from the upstream.
3414  */
3415 static void
3417 {
3420  Assert(stream_fd != NULL);
3421 
3423 
      /* Cleared so the next stream_open_file() (which asserts NULL) can run. */
3425  stream_fd = NULL;
3426 }
3427 
3428 /*
3429  * stream_write_change
3430  * Serialize a change to a file for the current toplevel transaction.
3431  *
3432  * The change is serialized in a simple format, with length (not including
3433  * the length), action code (identifying the message type) and message
3434  * contents (without the subxact TransactionId value).
      *
      * On-disk record: an int length (payload bytes plus one for the action),
      * the action, then the unread remainder of 's' starting at s->cursor.
3435  */
3436 static void
3438 {
3439  int len;
3440 
3443  Assert(stream_fd != NULL);
3444 
      /*
       * NOTE(review): the length counts sizeof(char) for the action while the
       * action itself is written with sizeof(action); the two agree only if
       * action is a 1-byte type.  Confirm against the declaration.
       */
3445  /* total on-disk size, including the action type character */
3446  len = (s->len - s->cursor) + sizeof(char);
3447 
3448  /* first write the size */
3449  BufFileWrite(stream_fd, &len, sizeof(len));
3450 
3451  /* then the action */
3452  BufFileWrite(stream_fd, &action, sizeof(action));
3453 
3454  /* and finally the remaining part of the buffer (after the XID) */
3455  len = (s->len - s->cursor);
3456 
3457  BufFileWrite(stream_fd, &s->data[s->cursor], len);
3458 }
3459 
3460 /*
3461  * Cleanup the memory for subxacts and reset the related variables.
      *
      * The subxacts pointer is cleared so that no stale array is reused, and the
      * subxact count is reset to zero.
3462  */
3463 static inline void
3465 {
3466  if (subxact_data.subxacts)
3468 
3469  subxact_data.subxacts = NULL;
3471  subxact_data.nsubxacts = 0;
3473 }
3474 
3475 /*
3476  * Form the prepared transaction GID for two_phase transactions.
3477  *
3478  * Return the GID in the supplied buffer.
3479  */
3480 static void
3481 TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
3482 {
3483  Assert(subid != InvalidRepOriginId);
3484 
3485  if (!TransactionIdIsValid(xid))
3486  ereport(ERROR,
3487  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3488  errmsg_internal("invalid two-phase transaction ID")));
3489 
3490  snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
3491 }
3492 
3493 /*
3494  * Execute the initial sync with error handling. Disable the subscription,
3495  * if it's required.
3496  *
3497  * Allocate the slot name in long-lived context on return. Note that we don't
3498  * handle FATAL errors which are probably because of system resource error and
3499  * are not repeatable.
      *
      * 'origin_startpos' is passed to LogicalRepSyncTableStart().  On success
      * '*myslotname' receives a copy, allocated in ApplyContext, of the slot name
      * it returned, and the original string is freed.
3500  */
3501 static void
3502 start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
3503 {
3504  char *syncslotname = NULL;
3505 
3507 
3508  PG_TRY();
3509  {
3510  /* Call initial sync. */
3511  syncslotname = LogicalRepSyncTableStart(origin_startpos);
3512  }
3513  PG_CATCH();
3514  {
3517  else
3518  {
3519  /*
3520  * Report the worker failed during table synchronization. Abort
3521  * the current transaction so that the stats message is sent in an
3522  * idle state.
3523  */
3526 
3527  PG_RE_THROW();
3528  }
3529  }
3530  PG_END_TRY();
3531 
3532  /* allocate slot name in long-lived context */
3533  *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3534  pfree(syncslotname);
3535 }
3536 
3537 /*
3538  * Run the apply loop with error handling. Disable the subscription,
3539  * if necessary.
3540  *
3541  * Note that we don't handle FATAL errors which are probably because
3542  * of system resource error and are not repeatable.
      *
      * 'origin_startpos' is passed through unchanged to LogicalRepApplyLoop().
3543  */
3544 static void
3545 start_apply(XLogRecPtr origin_startpos)
3546 {
3547  PG_TRY();
3548  {
3549  LogicalRepApplyLoop(origin_startpos);
3550  }
3551  PG_CATCH();
3552  {
3555  else
3556  {
3557  /*
3558  * Report the worker failed while applying changes. Abort the
3559  * current transaction so that the stats message is sent in an
3560  * idle state.
3561  */
3564 
3565  PG_RE_THROW();
3566  }
3567  }
3568  PG_END_TRY();
3569 }
3570 
3571 /* Logical Replication Apply worker entry point */
3572 void
3574 {
3575  int worker_slot = DatumGetInt32(main_arg);
3576  MemoryContext oldctx;
3577  char originname[NAMEDATALEN];
3578  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
3579  char *myslotname = NULL;
3581  int server_version;
3582 
3583  /* Attach to slot */
3584  logicalrep_worker_attach(worker_slot);
3585 
3586  /* Setup signal handling */
3588  pqsignal(SIGTERM, die);
3590 
3591  /*
3592  * We don't currently need any ResourceOwner in a walreceiver process, but
3593  * if we did, we could call CreateAuxProcessResourceOwner here.
3594  */
3595 
3596  /* Initialise stats to a sanish value */
3599 
3600  /* Load the libpq-specific functions */
3601  load_file("libpqwalreceiver", false);
3602 
3603  /* Run as replica session replication role. */
3604  SetConfigOption("session_replication_role", "replica",
3606 
3607  /* Connect to our database. */
3610  0);
3611 
3612  /*
3613  * Set always-secure search path, so malicious users can't redirect user
3614  * code (e.g. pg_index.indexprs).
3615  */
3616  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
3617 
3618  /* Load the subscription into persistent memory context. */
3620  "ApplyContext",
3624 
3626  if (!MySubscription)
3627  {
3628  ereport(LOG,
3629  (errmsg("logical replication apply worker for subscription %u will not "
3630  "start because the subscription was removed during startup",
3632  proc_exit(0);
3633  }
3634 
3635  MySubscriptionValid = true;
3636  MemoryContextSwitchTo(oldctx);
3637 
3638  if (!MySubscription->enabled)
3639  {
3640  ereport(LOG,
3641  (errmsg("logical replication apply worker for subscription \"%s\" will not "
3642  "start because the subscription was disabled during startup",
3643  MySubscription->name)));
3644 
3645  proc_exit(0);
3646  }
3647 
3648  /* Setup synchronous commit according to the user's wishes */
3649  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3651 
3652  /* Keep us informed about subscription changes. */
3655  (Datum) 0);
3656 
3657  if (am_tablesync_worker())
3658  ereport(LOG,
3659  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3661  else
3662  ereport(LOG,
3663  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3664  MySubscription->name)));
3665 
3667 
3668  /* Connect to the origin and start the replication. */
3669  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3671 
3672  if (am_tablesync_worker())
3673  {
3674  start_table_sync(&origin_startpos, &myslotname);
3675 
3676  /*
3677  * Allocate the origin name in long-lived context for error context
3678  * message.
3679  */
3682  originname,
3683  sizeof(originname));
3685  originname);
3686  }
3687  else
3688  {
3689  /* This is main apply worker */
3690  RepOriginId originid;
3691  TimeLineID startpointTLI;
3692  char *err;
3693 
3694  myslotname = MySubscription->slotname;
3695 
3696  /*
3697  * This shouldn't happen if the subscription is enabled, but guard
3698  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3699  * crash if slot is NULL.)
3700  */
3701  if (!myslotname)
3702  ereport(ERROR,
3703  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3704  errmsg("subscription has no replication slot set")));
3705 
3706  /* Setup replication origin tracking. */
3708  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3709  originid = replorigin_by_name(originname, true);
3710  if (!OidIsValid(originid))
3711  originid = replorigin_create(originname);
3712  replorigin_session_setup(originid);
3713  replorigin_session_origin = originid;
3714  origin_startpos = replorigin_session_get_progress(false);
3716 
3718  MySubscription->name, &err);
3719  if (LogRepWorkerWalRcvConn == NULL)
3720  ereport(ERROR,
3721  (errcode(ERRCODE_CONNECTION_FAILURE),
3722  errmsg("could not connect to the publisher: %s", err)));
3723 
3724  /*
3725  * We don't really use the output identify_system for anything but it
3726  * does some initializations on the upstream so let's still call it.
3727  */
3728  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
3729 
3730  /*
3731  * Allocate the origin name in long-lived context for error context
3732  * message.
3733  */
3735  originname);
3736  }
3737 
3738  /*
3739  * Setup callback for syscache so that we know when something changes in
3740  * the subscription relation state.
3741  */
3744  (Datum) 0);
3745 
3746  /* Build logical replication streaming options. */
3747  options.logical = true;
3748  options.startpoint = origin_startpos;
3749  options.slotname = myslotname;
3750 
3752  options.proto.logical.proto_version =
3756 
3757  options.proto.logical.publication_names = MySubscription->publications;
3758  options.proto.logical.binary = MySubscription->binary;
3759  options.proto.logical.streaming = MySubscription->stream;
3760  options.proto.logical.twophase = false;
3761 
3762  if (!am_tablesync_worker())
3763  {
3764  /*
3765  * Even when the two_phase mode is requested by the user, it remains
3766  * as the tri-state PENDING until all tablesyncs have reached READY
3767  * state. Only then, can it become ENABLED.
3768  *
3769  * Note: If the subscription has no tables then leave the state as
3770  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
3771  * work.
3772  */
3775  {
3776  /* Start streaming with two_phase enabled */
3777  options.proto.logical.twophase = true;
3779 
3784  }
3785  else
3786  {
3788  }
3789 
3790  ereport(DEBUG1,
3791  (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s",
3796  "?")));
3797  }
3798  else
3799  {
3800  /* Start normal logical streaming replication. */
3802  }
3803 
3804  /* Run the main loop. */
3805  start_apply(origin_startpos);
3806 
3807  proc_exit(0);
3808 }
3809 
3810 /*
3811  * After error recovery, disable the subscription in a new transaction
3812  * and exit cleanly.
3813  */
3814 static void
3816 {
3817  /*
3818  * Emit the error message, and recover from the error state to an idle
3819  * state
3820  */
3821  HOLD_INTERRUPTS();
3822 
3823  EmitErrorReport();
3825  FlushErrorState();
3826 
3828 
3829  /* Report the worker failed during either table synchronization or apply */
3831  !am_tablesync_worker());
3832 
3833  /* Disable the subscription */
3837 
3838  /* Notify the subscription has been disabled and exit */
3839  ereport(LOG,
3840  errmsg("logical replication subscription \"%s\" has been disabled due to an error",
3841  MySubscription->name));
3842 
3843  proc_exit(0);
3844 }
3845 
3846 /*
3847  * Is current process a logical replication worker?
3848  */
3849 bool
3851 {
3852  return MyLogicalRepWorker != NULL;
3853 }
3854 
3855 /*
3856  * Start skipping changes of the transaction if the given LSN matches the
3857  * LSN specified by subscription's skiplsn.
3858  */
3859 static void
3861 {
3865 
3866  /*
3867  * Quick return if it's not requested to skip this transaction. This
3868  * function is called for every remote transaction and we assume that
3869  * skipping the transaction is not used often.
3870  */
3872  MySubscription->skiplsn != finish_lsn))
3873  return;
3874 
3875  /* Start skipping all changes of this transaction */
3876  skip_xact_finish_lsn = finish_lsn;
3877 
3878  ereport(LOG,
3879  errmsg("start skipping logical replication transaction finished at %X/%X",
3881 }
3882 
3883 /*
3884  * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
3885  */
3886 static void
3888 {
3889  if (!is_skipping_changes())
3890  return;
3891 
3892  ereport(LOG,
3893  (errmsg("done skipping logical replication transaction finished at %X/%X",
3895 
3896  /* Stop skipping changes */
3898 }
3899 
3900 /*
3901  * Clear subskiplsn of pg_subscription catalog.
3902  *
3903  * finish_lsn is the transaction's finish LSN that is used to check if the
3904  * subskiplsn matches it. If not matched, we raise a warning when clearing the
3905  * subskiplsn in order to inform users for cases e.g., where the user mistakenly
3906  * specified the wrong subskiplsn.
3907  */
3908 static void
3910 {
3911  Relation rel;
3912  Form_pg_subscription subform;
3913  HeapTuple tup;
3914  XLogRecPtr myskiplsn = MySubscription->skiplsn;
3915  bool started_tx = false;
3916 
3917  if (likely(XLogRecPtrIsInvalid(myskiplsn)))
3918  return;
3919 
3920  if (!IsTransactionState())
3921  {
3923  started_tx = true;
3924  }
3925 
3926  /*
3927  * Protect subskiplsn of pg_subscription from being concurrently updated
3928  * while clearing it.
3929  */
3930  LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
3931  AccessShareLock);
3932 
3933  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
3934 
3935  /* Fetch the existing tuple. */
3938 
3939  if (!HeapTupleIsValid(tup))
3940  elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
3941 
3942  subform = (Form_pg_subscription) GETSTRUCT(tup);
3943 
3944  /*
3945  * Clear the subskiplsn. If the user has already changed subskiplsn before
3946  * clearing it we don't update the catalog and the replication origin
3947  * state won't get advanced. So in the worst case, if the server crashes
3948  * before sending an acknowledgment of the flush position the transaction
3949  * will be sent again and the user needs to set subskiplsn again. We can
3950  * reduce the possibility by logging a replication origin WAL record to
3951  * advance the origin LSN instead but there is no way to advance the
3952  * origin timestamp and it doesn't seem to be worth doing anything about
3953  * it since it's a very rare case.
3954  */
3955  if (subform->subskiplsn == myskiplsn)
3956  {
3957  bool nulls[Natts_pg_subscription];
3958  bool replaces[Natts_pg_subscription];
3959  Datum values[Natts_pg_subscription];
3960 
3961  memset(values, 0, sizeof(values));
3962  memset(nulls, false, sizeof(nulls));
3963  memset(replaces, false, sizeof(replaces));
3964 
3965  /* reset subskiplsn */
3966  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
3967  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
3968 
3969  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
3970  replaces);
3971  CatalogTupleUpdate(rel, &tup->t_self, tup);
3972 
3973  if (myskiplsn != finish_lsn)
3974  ereport(WARNING,
3975  errmsg("skip-LSN of logical replication subscription \"%s\" cleared", MySubscription->name),
3976  errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X",
3977  LSN_FORMAT_ARGS(finish_lsn),
3978  LSN_FORMAT_ARGS(myskiplsn)));
3979  }
3980 
3981  heap_freetuple(tup);
3982  table_close(rel, NoLock);
3983 
3984  if (started_tx)
3986 }
3987 
3988 /* Error callback to give more context info about the change being applied */
3989 static void
3991 {
3993 
3995  return;
3996 
3997  Assert(errarg->origin_name);
3998 
3999  if (errarg->rel == NULL)
4000  {
4001  if (!TransactionIdIsValid(errarg->remote_xid))
4002  errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
4003  errarg->origin_name,
4004  logicalrep_message_type(errarg->command));
4005  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4006  errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
4007  errarg->origin_name,
4009  errarg->remote_xid);
4010  else
4011  errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
4012  errarg->origin_name,
4014  errarg->remote_xid,
4015  LSN_FORMAT_ARGS(errarg->finish_lsn));
4016  }
4017  else if (errarg->remote_attnum < 0)
4018  errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
4019  errarg->origin_name,
4021  errarg->rel->remoterel.nspname,
4022  errarg->rel->remoterel.relname,
4023  errarg->remote_xid,
4024  LSN_FORMAT_ARGS(errarg->finish_lsn));
4025  else
4026  errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
4027  errarg->origin_name,
4029  errarg->rel->remoterel.nspname,
4030  errarg->rel->remoterel.relname,
4031  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4032  errarg->remote_xid,
4033  LSN_FORMAT_ARGS(errarg->finish_lsn));
4034 }
4035 
4036 /* Set transaction information of apply error callback */
4037 static inline void
4039 {
4042 }
4043 
4044 /* Reset all information of apply error callback */
4045 static inline void
4047 {
4052 }
AclResult
Definition: acl.h:181
@ ACLCHECK_OK
Definition: acl.h:182
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3512
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:5007
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:1739
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:3319
static void begin_replication_step(void)
Definition: worker.c:398
static void end_replication_step(void)
Definition: worker.c:421
static void cleanup_subxact_info(void)
Definition: worker.c:3464
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1073
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:1717
static void maybe_reread_subscription(void)
Definition: worker.c:3018
static void subxact_info_add(TransactionId xid)
Definition: worker.c:3241
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:475
static MemoryContext ApplyMessageContext
Definition: worker.c:245
static void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:3340
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:380
static void apply_handle_type(StringInfo s)
Definition: worker.c:1579
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:2349
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:1350
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:2654
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup)
Definition: worker.c:1887
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
Definition: worker.c:3481
static void start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
Definition: worker.c:3502
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:3117
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:437
struct ApplyExecutionData ApplyExecutionData
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:3326
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:1595
static BufFile * stream_fd
Definition: worker.c:280
static void apply_dispatch(StringInfo s)
Definition: worker.c:2471
static void apply_handle_update(StringInfo s)
Definition: worker.c:1780
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:1469
static void stop_skipping_changes(void)
Definition: worker.c:3887
struct ApplySubXactData ApplySubXactData
#define NAPTIME_PER_CYCLE
Definition: worker.c:199
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:2591
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:973
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:2670
bool IsLogicalWorker(void)
Definition: worker.c:3850
static ApplySubXactData subxact_data
Definition: worker.c:298
static void DisableSubscriptionAndExit(void)
Definition: worker.c:3815
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:2635
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:2115
static ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:235
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:2080
bool in_remote_transaction
Definition: worker.c:256
static XLogRecPtr skip_xact_finish_lsn
Definition: worker.c:276
static void apply_handle_delete(StringInfo s)
Definition: worker.c:1952
#define is_skipping_changes()
Definition: worker.c:277
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:3437
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition: worker.c:3909
static void apply_handle_begin(StringInfo s)
Definition: worker.c:810
static dlist_head lsn_mapping
Definition: worker.c:208
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:2027
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:616
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:528
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:717
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:4038
static void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:3545
static void apply_handle_commit(StringInfo s)
Definition: worker.c:832
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1249
static void apply_handle_relation(StringInfo s)
Definition: worker.c:1556
struct ApplyErrorCallbackArg ApplyErrorCallbackArg
MemoryContext ApplyContext
Definition: worker.c:246
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:3132
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:1612
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:918
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1016
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1216
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1131
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:2929
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:251
static XLogRecPtr remote_final_lsn
Definition: worker.c:257
static bool MySubscriptionValid
Definition: worker.c:254
static MemoryContext LogicalStreamingContext
Definition: worker.c:249
static void stream_open_file(Oid subid, TransactionId xid, bool first)
Definition: worker.c:3367
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:1503
static bool in_streamed_transaction
Definition: worker.c:260
struct SubXactInfo SubXactInfo
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:858
struct FlushPosition FlushPosition
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:3573
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1149
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:3860
static void apply_error_callback(void *arg)
Definition: worker.c:3990
Subscription * MySubscription
Definition: worker.c:253
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:884
static void stream_close_file(void)
Definition: worker.c:3416
static TransactionId stream_xid
Definition: worker.c:262
static void apply_handle_insert(StringInfo s)
Definition: worker.c:1644
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:559
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:3181
static void reset_apply_error_context_info(void)
Definition: worker.c:4046
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1705
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
@ STATE_RUNNING
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:738
static Datum values[MAXATTR]
Definition: bootstrap.c:156
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:582
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:286
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:782
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:900
void BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:625
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:689
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:262
void BufFileClose(BufFile *file)
Definition: buffile.c:407
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:359
unsigned int uint32
Definition: c.h:452
#define likely(x)
Definition: c.h:283
uint32 TransactionId
Definition: c.h:598
#define OidIsValid(objectId)
Definition: c.h:721
size_t Size
Definition: c.h:551
int64 TimestampTz
Definition: timestamp.h:39
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
int my_log2(long num)
Definition: dynahash.c:1765
int errmsg_internal(const char *fmt,...)
Definition: elog.c:991
void EmitErrorReport(void)
Definition: elog.c:1504
int errcode_for_file_access(void)
Definition: elog.c:716
int errdetail(const char *fmt,...)
Definition: elog.c:1037
ErrorContextCallback * error_context_stack
Definition: elog.c:93
void FlushErrorState(void)
Definition: elog.c:1649
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define LOG
Definition: elog.h:25
#define PG_RE_THROW()
Definition: elog.h:340
#define errcontext
Definition: elog.h:190
#define PG_END_TRY()
Definition: elog.h:324
#define PG_TRY()
Definition: elog.h:299
#define WARNING
Definition: elog.h:30
#define DEBUG2
Definition: elog.h:23
#define DEBUG1
Definition: elog.h:24
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define PG_CATCH()
Definition: elog.h:309
#define ereport(elevel,...)
Definition: elog.h:143
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3564
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:160
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1782
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1195
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2932
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2511
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1191
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1552
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1831
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:751
EState * CreateExecutorState(void)
Definition: execUtils.c:90
void FreeExecutorState(EState *estate)
Definition: execUtils.c:186
#define GetPerTupleExprContext(estate)
Definition: executor.h:537
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:542
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:229
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:320
void FileSetInit(FileSet *fileset)
Definition: fileset.c:54
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1648
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1630
struct Latch * MyLatch
Definition: globals.c:58
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:8376
@ PGC_S_OVERRIDE
Definition: guc.h:120
@ PGC_SUSET
Definition: guc.h:75
@ PGC_SIGHUP
Definition: guc.h:72
@ PGC_BACKEND
Definition: guc.h:74
void ProcessConfigFile(GucContext context)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:649
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:515
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
#define write(a, b, c)
Definition: win32.h:14
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void AcceptInvalidationMessages(void)
Definition: inval.c:746
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:524
void ResetLatch(Latch *latch)
Definition: latch.c:683
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
void logicalrep_worker_attach(int slot)
Definition: launcher.c:564
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:58
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
Definition: list.c:336
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:701
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
int LOCKMODE
Definition: lockdefs.h:26
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
@ LockTupleExclusive
Definition: lockoptions.h:58
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:38
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:39
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:92
LogicalRepMsgType
Definition: logicalproto.h:53
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:57
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:54
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:55
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:63
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:56
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:58
uint32 LogicalRepRelId
Definition: logicalproto.h:96
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:37
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:94
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:93
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1909
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2831
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2897
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
MemoryContext TopTransactionContext
Definition: mcxt.c:53
void pfree(void *pointer)
Definition: mcxt.c:1175
MemoryContext TopMemoryContext
Definition: mcxt.c:48
void * palloc0(Size size)
Definition: mcxt.c:1099
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1188
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1292
void * palloc(Size size)
Definition: mcxt.c:1068
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:134
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:132
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:913
Oid GetUserId(void)
Definition: miscinit.c:492
CmdType
Definition: nodes.h:719
@ CMD_INSERT
Definition: nodes.h:723
@ CMD_DELETE
Definition: nodes.h:724
@ CMD_UPDATE
Definition: nodes.h:722
#define makeNode(_type_)
Definition: nodes.h:621
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:240
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1071
RepOriginId replorigin_session_origin
Definition: origin.c:155
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1206
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
#define InvalidRepOriginId
Definition: origin.h:33
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define ACL_DELETE
Definition: parsenodes.h:85
#define ACL_INSERT
Definition: parsenodes.h:82
#define ACL_UPDATE
Definition: parsenodes.h:84
@ RTE_RELATION
Definition: parsenodes.h:998
@ DROP_RESTRICT
Definition: parsenodes.h:2207
#define ACL_SELECT
Definition: parsenodes.h:83
#define ACL_TRUNCATE
Definition: parsenodes.h:86
uint32 AclMode
Definition: parsenodes.h:80
int16 attnum
Definition: pg_attribute.h:83
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
void * arg
static PgChecksumMode mode
Definition: pg_checksums.c:65
#define NAMEDATALEN
#define MAXPGPATH
const void size_t len
static int server_version
Definition: pg_dumpall.c:85
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:256
#define lfirst(lc)
Definition: pg_list.h:169
#define NIL
Definition: pg_list.h:65
#define list_make1(x1)
Definition: pg_list.h:206
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
#define lfirst_oid(lc)
Definition: pg_list.h:171
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
static char ** options
void FreeSubscription(Subscription *sub)
void DisableSubscription(Oid subid)
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
FormData_pg_subscription * Form_pg_subscription
#define die(msg)
Definition: pg_test_fsync.c:95
static char * buf
Definition: pg_test_fsync.c:67
long pgstat_report_stat(bool force)
Definition: pgstat.c:565
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
int64 timestamp
Expr * expression_planner(Expr *expr)
Definition: planner.c:5879
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:225
#define PGINVALID_SOCKET
Definition: port.h:31
uintptr_t Datum
Definition: postgres.h:411
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define DatumGetInt32(X)
Definition: postgres.h:516
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5713
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5684
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
char * c
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * s2
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:109
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:563
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:336
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:145
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:617
char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1197
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:755
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:492
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:74
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:278
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:699
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:1184
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:376
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1142
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:436
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:239
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1092
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
#define RelationGetRelid(relation)
Definition: rel.h:488
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:685
#define RelationGetDescr(relation)
Definition: rel.h:514
#define RelationGetRelationName(relation)
Definition: rel.h:522
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:642
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4863
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4884
void fill_extraUpdatedCols(RangeTblEntry *target_rte, Relation target_relation)
Node * build_column_default(Relation rel, int attrno)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:180
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
void PopActiveSnapshot(void)
Definition: snapmgr.c:776
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:682
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:523
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:585
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:157
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:319
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:456
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
TransactionId remote_xid
Definition: worker.c:230
LogicalRepMsgType command
Definition: worker.c:225
XLogRecPtr finish_lsn
Definition: worker.c:231
LogicalRepRelMapEntry * rel
Definition: worker.c:226
ResultRelInfo * targetRelInfo
Definition: worker.c:215
EState * estate
Definition: worker.c:212
PartitionTupleRouting * proute
Definition: worker.c:219
ModifyTableState * mtstate
Definition: worker.c:218
LogicalRepRelMapEntry * targetRel
Definition: worker.c:214
uint32 nsubxacts
Definition: worker.c:292
uint32 nsubxacts_max
Definition: worker.c:293
SubXactInfo * subxacts
Definition: worker.c:295
TransactionId subxact_last
Definition: worker.c:294
Definition: attmap.h:35
int maplen
Definition: attmap.h:37
AttrNumber * attnums
Definition: attmap.h:36
List * es_range_table
Definition: execnodes.h:592
List * es_tupleTable
Definition: execnodes.h:634
List * es_opened_result_relations
Definition: execnodes.h:610
CommandId es_output_cid
Definition: execnodes.h:604
struct ErrorContextCallback * previous
Definition: elog.h:232
void(* callback)(void *arg)
Definition: elog.h:233
dlist_node node
Definition: worker.c:203
XLogRecPtr remote_end
Definition: worker.c:205
XLogRecPtr local_end
Definition: worker.c:204
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:51
XLogRecPtr final_lsn
Definition: logicalproto.h:124
TransactionId xid
Definition: logicalproto.h:126
TimestampTz committime
Definition: logicalproto.h:133
LogicalRepRelation remoterel
StringInfoData * colvalues
Definition: logicalproto.h:82
TimestampTz last_recv_time
TimestampTz reply_time
FileSet * stream_fileset
XLogRecPtr reply_lsn
XLogRecPtr last_lsn
TimestampTz last_send_time
CmdType operation
Definition: execnodes.h:1226
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1230
PlanState ps
Definition: execnodes.h:1225
Plan * plan
Definition: execnodes.h:998
EState * state
Definition: execnodes.h:1000
Bitmapset * updatedCols
Definition: parsenodes.h:1169
RTEKind rtekind
Definition: parsenodes.h:1015
Form_pg_class rd_rel
Definition: rel.h:109
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:540
Relation ri_RelationDesc
Definition: execnodes.h:433
TupleConversionMap * ri_RootToPartitionMap
Definition: execnodes.h:539
off_t offset
Definition: worker.c:286
TransactionId xid
Definition: worker.c:284
int fileno
Definition: worker.c:285
XLogRecPtr skiplsn
AttrMap * attrMap
Definition: tupconvert.h:28
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
bool * tts_isnull
Definition: tuptable.h:128
Datum * tts_values
Definition: tuptable.h:126
dlist_node * cur
Definition: ilist.h:180
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:179
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:100
@ SUBSCRIPTIONOID
Definition: syscache.h:99
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1782
bool AllTablesyncsReady(void)
Definition: tablesync.c:1519
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:271
void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname, int szorgname)
Definition: tablesync.c:1168
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:590
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:1183
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1544
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4981
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4961
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:177
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2579
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1482
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
@ WAIT_EVENT_LOGICAL_APPLY_MAIN
Definition: wait_event.h:43
static StringInfoData reply_message
Definition: walreceiver.c:119
int wal_receiver_status_interval
Definition: walreceiver.c:90
int wal_receiver_timeout
Definition: walreceiver.c:91
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:418
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:424
#define walrcv_server_version(conn)
Definition: walreceiver.h:414
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:420
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:412
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:422
int WalWriterDelay
Definition: walwriter.c:70
#define SIGHUP
Definition: win32_port.h:167
static bool am_tablesync_worker(void)
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3788
bool IsTransactionState(void)
Definition: xact.c:374
void CommandCounterIncrement(void)
Definition: xact.c:1074
void StartTransactionCommand(void)
Definition: xact.c:2925
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:886
void BeginTransactionBlock(void)
Definition: xact.c:3720
void CommitTransactionCommand(void)
Definition: xact.c:3022
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4662
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:814
#define GIDSIZE
Definition: xact.h:31
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:5927
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:252
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59