PostgreSQL Source Code  git master
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2021, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from the remote logical replication stream.
13  *
14  * The main worker (apply) is started by the logical replication worker
15  * launcher for every enabled subscription in a database. It uses the
16  * walsender protocol to communicate with the publisher.
17  *
18  * This module includes server-facing code and shares the libpqwalreceiver
19  * module with walreceiver for providing the libpq-specific functionality.
20  *
21  *
22  * STREAMED TRANSACTIONS
23  * ---------------------
24  * Streamed transactions (large transactions exceeding a memory limit on the
25  * upstream) are not applied immediately, but instead, the data is written
26  * to temporary files and then applied at once when the final commit arrives.
27  *
28  * Unlike the regular (non-streamed) case, handling streamed transactions has
29  * to handle aborts of both the toplevel transaction and subtransactions. This
30  * is achieved by tracking offsets for subtransactions, which are then used
31  * to truncate the file with serialized changes.
32  *
33  * The files are placed in the tmp file directory by default, and the filenames
34  * include both the XID of the toplevel transaction and the OID of the
35  * subscription. This is necessary so that different workers processing a
36  * remote transaction with the same XID don't interfere.
37  *
38  * We use BufFiles instead of using normal temporary files because (a) the
39  * BufFile infrastructure supports temporary files that exceed the OS file size
40  * limit, (b) it provides automatic cleanup on error, and (c) it allows these
41  * files to survive across local transactions, so that they can be opened and
42  * closed at stream start and stop. We decided to use the FileSet
43  * infrastructure because without it the files are deleted when they are
44  * closed, and if we decided to keep stream files open across stream start/stop
45  * instead, it would consume a lot of memory (more than 8K for each BufFile, and
46  * there could be multiple such BufFiles as the subscriber could receive
47  * multiple start/stop streams for different transactions before getting the
48  * commit). Moreover, if we didn't use FileSet, we would also need to invent
49  * a new way to pass filenames to BufFile APIs so that we are allowed to open
50  * the desired file across multiple stream-open calls for the same
51  * transaction.
52  *
53  * TWO_PHASE TRANSACTIONS
54  * ----------------------
55  * Two-phase transactions are replayed at prepare and then committed or
56  * rolled back at commit prepared and rollback prepared, respectively. It is
57  * possible to have a prepared transaction that arrives at the apply worker
58  * when the tablesync is busy doing the initial copy. In this case, the apply
59  * worker skips all the prepared operations [e.g. inserts] while the tablesync
60  * is still busy (see the condition of should_apply_changes_for_rel). The
61  * tablesync worker might not get such a prepared transaction because, say, it
62  * was prior to the initial consistent point, but it might have got some later
63  * commits. Now, the tablesync worker will exit without doing anything for the
64  * prepared transaction skipped by the apply worker as the sync location for it
65  * will already be ahead of the apply worker's current location. This would lead
66  * to an "empty prepare", because later when the apply worker does the commit
67  * prepare, there is nothing in it (the inserts were skipped earlier).
68  *
69  * To avoid this and similar prepare confusions, the subscription's two_phase
70  * commit is enabled only after the initial sync is over. The two_phase option
71  * has been implemented as a tri-state with values DISABLED, PENDING, and
72  * ENABLED.
73  *
74  * Even if the user specifies they want a subscription with two_phase = on,
75  * internally it will start with a tri-state of PENDING which only becomes
76  * ENABLED after all tablesync initializations are completed - i.e. when all
77  * tablesync workers have reached their READY state. In other words, the value
78  * PENDING is only a temporary state for subscription start-up.
79  *
80  * Until the two_phase is properly available (ENABLED) the subscription will
81  * behave as if two_phase = off. When the apply worker detects that all
82  * tablesyncs have become READY (while the tri-state was PENDING) it will
83  * restart the apply worker process. This happens in
84  * process_syncing_tables_for_apply.
85  *
86  * When the (re-started) apply worker finds that all tablesyncs are READY for a
87  * two_phase tri-state of PENDING, it starts streaming messages with the
88  * two_phase option which in turn enables the decoding of two-phase commits at
89  * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
90  * Now, it is possible that during the time we have not enabled two_phase, the
91  * publisher (replication server) might have skipped some prepares, but we
92  * ensure that such prepares are sent along with the commit prepared; see
93  * ReorderBufferFinishPrepared.
94  *
95  * If the subscription has no tables then a two_phase tri-state PENDING is
96  * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
97  * PUBLICATION which might otherwise be disallowed (see below).
98  *
99  * If ever a user needs to be aware of the tri-state value, they can fetch it
100  * from the pg_subscription catalog (see column subtwophasestate).
101  *
102  * We don't allow toggling the two_phase option of a subscription because it
103  * can lead to an inconsistent replica. Consider that it was initially on and
104  * we have received some prepare, then we turn it off; now at commit time the
105  * server will send the entire transaction data along with the commit. With
106  * some more analysis, we could allow changing this option from off to on, but
107  * it's not clear that alone would be useful.
108  *
109  * Finally, to avoid problems mentioned in previous paragraphs from any
110  * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
111  * to 'off' and then again back to 'on') there is a restriction for
112  * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
113  * the two_phase tri-state is ENABLED, except when copy_data = false.
114  *
115  * We can get a prepare with the same GID more than once in genuine cases
116  * where we have defined multiple subscriptions for publications on the same
117  * server and the prepared transaction has operations on tables subscribed to
118  * by those subscriptions. In such cases, if we used the GID sent by the
119  * publisher, one of the prepares would succeed and the others would fail, in
120  * which case the server would send them again. This can lead to a deadlock if
121  * the user has set synchronous_standby_names for all the subscriptions on the
122  * subscriber. To avoid such deadlocks, we generate a unique GID (consisting of
123  * the subscription oid and the xid of the prepared transaction) for each
124  * prepared transaction on the subscriber.
125  *-------------------------------------------------------------------------
126  */
127 
128 #include "postgres.h"
129 
130 #include <sys/stat.h>
131 #include <unistd.h>
132 
133 #include "access/table.h"
134 #include "access/tableam.h"
135 #include "access/twophase.h"
136 #include "access/xact.h"
137 #include "access/xlog_internal.h"
138 #include "catalog/catalog.h"
139 #include "catalog/namespace.h"
140 #include "catalog/partition.h"
141 #include "catalog/pg_inherits.h"
142 #include "catalog/pg_subscription.h"
144 #include "catalog/pg_tablespace.h"
145 #include "commands/tablecmds.h"
146 #include "commands/tablespace.h"
147 #include "commands/trigger.h"
148 #include "executor/executor.h"
149 #include "executor/execPartition.h"
151 #include "funcapi.h"
152 #include "libpq/pqformat.h"
153 #include "libpq/pqsignal.h"
154 #include "mb/pg_wchar.h"
155 #include "miscadmin.h"
156 #include "nodes/makefuncs.h"
157 #include "optimizer/optimizer.h"
158 #include "pgstat.h"
159 #include "postmaster/bgworker.h"
160 #include "postmaster/interrupt.h"
161 #include "postmaster/postmaster.h"
162 #include "postmaster/walwriter.h"
163 #include "replication/decode.h"
164 #include "replication/logical.h"
168 #include "replication/origin.h"
170 #include "replication/snapbuild.h"
171 #include "replication/walreceiver.h"
173 #include "rewrite/rewriteHandler.h"
174 #include "storage/buffile.h"
175 #include "storage/bufmgr.h"
176 #include "storage/fd.h"
177 #include "storage/ipc.h"
178 #include "storage/lmgr.h"
179 #include "storage/proc.h"
180 #include "storage/procarray.h"
181 #include "tcop/tcopprot.h"
182 #include "utils/builtins.h"
183 #include "utils/catcache.h"
184 #include "utils/dynahash.h"
185 #include "utils/datum.h"
186 #include "utils/fmgroids.h"
187 #include "utils/guc.h"
188 #include "utils/inval.h"
189 #include "utils/lsyscache.h"
190 #include "utils/memutils.h"
191 #include "utils/rel.h"
192 #include "utils/syscache.h"
193 #include "utils/timeout.h"
194 
195 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
196 
197 typedef struct FlushPosition
198 {
202 } FlushPosition;
203 
205 
206 typedef struct ApplyExecutionData
207 {
208  EState *estate; /* executor state, used to track resources */
209 
210  LogicalRepRelMapEntry *targetRel; /* replication target rel */
211  ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
212 
213  /* These fields are used when the target relation is partitioned: */
214  ModifyTableState *mtstate; /* dummy ModifyTable state */
215  PartitionTupleRouting *proute; /* partition routing info */
217 
218 /* Struct for saving and restoring apply errcontext information */
219 typedef struct ApplyErrorCallbackArg
220 {
221  LogicalRepMsgType command; /* 0 if invalid */
223 
224  /* Remote node information */
225  int remote_attnum; /* -1 if invalid */
227  TimestampTz ts; /* commit, rollback, or prepare timestamp */
229 
231 {
232  .command = 0,
233  .rel = NULL,
234  .remote_attnum = -1,
235  .remote_xid = InvalidTransactionId,
236  .ts = 0,
237 };
238 
241 
242 /* per stream context for streaming transactions */
244 
246 
248 bool MySubscriptionValid = false;
249 
252 
253 /* fields valid only when processing streamed transaction */
254 static bool in_streamed_transaction = false;
255 
257 
258 /* BufFile handle of the current streaming file */
259 static BufFile *stream_fd = NULL;
260 
261 typedef struct SubXactInfo /* one entry per subxact of the current streamed xact */
262 {
263  TransactionId xid; /* XID of the subtransaction */
264  int fileno; /* file number in the changes BufFile (with offset, the recorded position) */
265  off_t offset; /* offset within that file; used to truncate the changes file if the subxact aborts */
266 } SubXactInfo;
267 
268 /* Sub-transaction data for the current streaming transaction */
269 typedef struct ApplySubXactData
270 {
271  uint32 nsubxacts; /* number of sub-transactions */
272  uint32 nsubxacts_max; /* current capacity of subxacts */
273  TransactionId subxact_last; /* xid of the last sub-transaction */
274  SubXactInfo *subxacts; /* sub-xact offset in changes file */
276 
278 
279 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
280 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
281 
282 /*
283  * Information about subtransactions of a given toplevel transaction.
284  */
285 static void subxact_info_write(Oid subid, TransactionId xid);
286 static void subxact_info_read(Oid subid, TransactionId xid);
287 static void subxact_info_add(TransactionId xid);
288 static inline void cleanup_subxact_info(void);
289 
290 /*
291  * Serialize and deserialize changes for a toplevel transaction.
292  */
293 static void stream_cleanup_files(Oid subid, TransactionId xid);
294 static void stream_open_file(Oid subid, TransactionId xid, bool first);
295 static void stream_write_change(char action, StringInfo s);
296 static void stream_close_file(void);
297 
298 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
299 
300 static void store_flush_position(XLogRecPtr remote_lsn);
301 
302 static void maybe_reread_subscription(void);
303 
304 /* prototype needed because of stream_commit */
305 static void apply_dispatch(StringInfo s);
306 
307 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
309  ResultRelInfo *relinfo,
310  TupleTableSlot *remoteslot);
312  ResultRelInfo *relinfo,
313  TupleTableSlot *remoteslot,
314  LogicalRepTupleData *newtup);
316  ResultRelInfo *relinfo,
317  TupleTableSlot *remoteslot);
318 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
319  LogicalRepRelation *remoterel,
320  TupleTableSlot *remoteslot,
321  TupleTableSlot **localslot);
323  TupleTableSlot *remoteslot,
324  LogicalRepTupleData *newtup,
325  CmdType operation);
326 
327 /* Compute GID for two_phase transactions */
328 static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
329 
330 /* Common streaming function to apply all the spooled messages */
331 static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
332 
333 /* Functions for apply error callback */
334 static void apply_error_callback(void *arg);
335 static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
336 static inline void reset_apply_error_context_info(void);
337 
338 /*
339  * Should this worker apply changes for given relation.
340  *
341  * This is mainly needed for initial relation data sync as that runs in
342  * separate worker process running in parallel and we need some way to skip
343  * changes coming to the main apply worker during the sync of a table.
344  *
345  * Note we need to do a less-than-or-equal comparison for SYNCDONE state
346  * because it might hold the position of the end of the initial slot's
347  * consistent point WAL record + 1 (i.e., start of the next record), and the
348  * next record can be the COMMIT of the transaction we are now processing
349  * (which is what we set remote_final_lsn to in apply_handle_begin).
350  */
351 static bool
353 {
354  if (am_tablesync_worker())
355  return MyLogicalRepWorker->relid == rel->localreloid;
356  else
357  return (rel->state == SUBREL_STATE_READY ||
358  (rel->state == SUBREL_STATE_SYNCDONE &&
359  rel->statelsn <= remote_final_lsn));
360 }
361 
362 /*
363  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
364  *
365  * Start a transaction, if this is the first step (else we keep using the
366  * existing transaction).
367  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
368  */
369 static void
371 {
373 
374  if (!IsTransactionState())
375  {
378  }
379 
381 
382  MemoryContextSwitchTo(ApplyMessageContext);
383 }
384 
385 /*
386  * Finish up one step of a replication transaction.
387  * Callers of begin_replication_step() must also call this.
388  *
389  * We don't close out the transaction here, but we should increment
390  * the command counter to make the effects of this step visible.
391  */
392 static void
394 {
396 
398 }
399 
400 /*
401  * Handle streamed transactions.
402  *
403  * If in streaming mode (receiving a block of streamed transaction), we
404  * simply redirect it to a file for the proper toplevel transaction.
405  *
406  * Returns true for streamed transactions, false otherwise (regular mode).
407  */
408 static bool
410 {
411  TransactionId xid;
412 
413  /* not in streaming mode */
415  return false;
416 
417  Assert(stream_fd != NULL);
419 
420  /*
421  * We should have received XID of the subxact as the first part of the
422  * message, so extract it.
423  */
424  xid = pq_getmsgint(s, 4);
425 
426  if (!TransactionIdIsValid(xid))
427  ereport(ERROR,
428  (errcode(ERRCODE_PROTOCOL_VIOLATION),
429  errmsg_internal("invalid transaction ID in streamed replication transaction")));
430 
431  /* Add the new subxact to the array (unless already there). */
432  subxact_info_add(xid);
433 
434  /* write the change to the current file */
435  stream_write_change(action, s);
436 
437  return true;
438 }
439 
440 /*
441  * Executor state preparation for evaluation of constraint expressions,
442  * indexes and triggers for the specified relation.
443  *
444  * Note that the caller must open and close any indexes to be updated.
445  */
446 static ApplyExecutionData *
448 {
449  ApplyExecutionData *edata;
450  EState *estate;
451  RangeTblEntry *rte;
452  ResultRelInfo *resultRelInfo;
453 
454  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
455  edata->targetRel = rel;
456 
457  edata->estate = estate = CreateExecutorState();
458 
459  rte = makeNode(RangeTblEntry);
460  rte->rtekind = RTE_RELATION;
461  rte->relid = RelationGetRelid(rel->localrel);
462  rte->relkind = rel->localrel->rd_rel->relkind;
464  ExecInitRangeTable(estate, list_make1(rte));
465 
466  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
467 
468  /*
469  * Use Relation opened by logicalrep_rel_open() instead of opening it
470  * again.
471  */
472  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
473 
474  /*
475  * We put the ResultRelInfo in the es_opened_result_relations list, even
476  * though we don't populate the es_result_relations array. That's a bit
477  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
478  *
479  * ExecOpenIndices() is not called here either, each execution path doing
480  * an apply operation being responsible for that.
481  */
483  lappend(estate->es_opened_result_relations, resultRelInfo);
484 
485  estate->es_output_cid = GetCurrentCommandId(true);
486 
487  /* Prepare to catch AFTER triggers. */
489 
490  /* other fields of edata remain NULL for now */
491 
492  return edata;
493 }
494 
495 /*
496  * Finish any operations related to the executor state created by
497  * create_edata_for_relation().
498  */
499 static void
501 {
502  EState *estate = edata->estate;
503 
504  /* Handle any queued AFTER triggers. */
505  AfterTriggerEndQuery(estate);
506 
507  /* Shut down tuple routing, if any was done. */
508  if (edata->proute)
509  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
510 
511  /*
512  * Cleanup. It might seem that we should call ExecCloseResultRelations()
513  * here, but we intentionally don't. It would close the rel we added to
514  * es_opened_result_relations above, which is wrong because we took no
515  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
516  * any other relations opened during execution.
517  */
518  ExecResetTupleTable(estate->es_tupleTable, false);
519  FreeExecutorState(estate);
520  pfree(edata);
521 }
522 
523 /*
524  * Evaluates default values for columns that we can't map to remote
525  * relation columns.
526  *
527  * This allows us to support tables which have more columns on the downstream
528  * than on the upstream.
529  */
530 static void
532  TupleTableSlot *slot)
533 {
534  TupleDesc desc = RelationGetDescr(rel->localrel);
535  int num_phys_attrs = desc->natts;
536  int i;
537  int attnum,
538  num_defaults = 0;
539  int *defmap;
540  ExprState **defexprs;
541  ExprContext *econtext;
542 
543  econtext = GetPerTupleExprContext(estate);
544 
545  /* We got all the data via replication, no need to evaluate anything. */
546  if (num_phys_attrs == rel->remoterel.natts)
547  return;
548 
549  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
550  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
551 
552  Assert(rel->attrmap->maplen == num_phys_attrs);
553  for (attnum = 0; attnum < num_phys_attrs; attnum++)
554  {
555  Expr *defexpr;
556 
557  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
558  continue;
559 
560  if (rel->attrmap->attnums[attnum] >= 0)
561  continue;
562 
563  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
564 
565  if (defexpr != NULL)
566  {
567  /* Run the expression through planner */
568  defexpr = expression_planner(defexpr);
569 
570  /* Initialize executable expression in copycontext */
571  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
572  defmap[num_defaults] = attnum;
573  num_defaults++;
574  }
575 
576  }
577 
578  for (i = 0; i < num_defaults; i++)
579  slot->tts_values[defmap[i]] =
580  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
581 }
582 
583 /*
584  * Store tuple data into slot.
585  *
586  * Incoming data can be either text or binary format.
587  */
588 static void
590  LogicalRepTupleData *tupleData)
591 {
592  int natts = slot->tts_tupleDescriptor->natts;
593  int i;
594 
595  ExecClearTuple(slot);
596 
597  /* Call the "in" function for each non-dropped, non-null attribute */
598  Assert(natts == rel->attrmap->maplen);
599  for (i = 0; i < natts; i++)
600  {
602  int remoteattnum = rel->attrmap->attnums[i];
603 
604  if (!att->attisdropped && remoteattnum >= 0)
605  {
606  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
607 
608  Assert(remoteattnum < tupleData->ncols);
609 
610  /* Set attnum for error callback */
611  apply_error_callback_arg.remote_attnum = remoteattnum;
612 
613  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
614  {
615  Oid typinput;
616  Oid typioparam;
617 
618  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
619  slot->tts_values[i] =
620  OidInputFunctionCall(typinput, colvalue->data,
621  typioparam, att->atttypmod);
622  slot->tts_isnull[i] = false;
623  }
624  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
625  {
626  Oid typreceive;
627  Oid typioparam;
628 
629  /*
630  * In some code paths we may be asked to re-parse the same
631  * tuple data. Reset the StringInfo's cursor so that works.
632  */
633  colvalue->cursor = 0;
634 
635  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
636  slot->tts_values[i] =
637  OidReceiveFunctionCall(typreceive, colvalue,
638  typioparam, att->atttypmod);
639 
640  /* Trouble if it didn't eat the whole buffer */
641  if (colvalue->cursor != colvalue->len)
642  ereport(ERROR,
643  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
644  errmsg("incorrect binary data format in logical replication column %d",
645  remoteattnum + 1)));
646  slot->tts_isnull[i] = false;
647  }
648  else
649  {
650  /*
651  * NULL value from remote. (We don't expect to see
652  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
653  * NULL.)
654  */
655  slot->tts_values[i] = (Datum) 0;
656  slot->tts_isnull[i] = true;
657  }
658 
659  /* Reset attnum for error callback */
660  apply_error_callback_arg.remote_attnum = -1;
661  }
662  else
663  {
664  /*
665  * We assign NULL to dropped attributes and missing values
666  * (missing values should be later filled using
667  * slot_fill_defaults).
668  */
669  slot->tts_values[i] = (Datum) 0;
670  slot->tts_isnull[i] = true;
671  }
672  }
673 
674  ExecStoreVirtualTuple(slot);
675 }
676 
677 /*
678  * Replace updated columns with data from the LogicalRepTupleData struct.
679  * This is somewhat similar to heap_modify_tuple but also calls the type
680  * input functions on the user data.
681  *
682  * "slot" is filled with a copy of the tuple in "srcslot", replacing
683  * columns provided in "tupleData" and leaving others as-is.
684  *
685  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
686  * storage for "srcslot". This is OK for current usage, but someday we may
687  * need to materialize "slot" at the end to make it independent of "srcslot".
688  */
689 static void
692  LogicalRepTupleData *tupleData)
693 {
694  int natts = slot->tts_tupleDescriptor->natts;
695  int i;
696 
697  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
698  ExecClearTuple(slot);
699 
700  /*
701  * Copy all the column data from srcslot, so that we'll have valid values
702  * for unreplaced columns.
703  */
704  Assert(natts == srcslot->tts_tupleDescriptor->natts);
705  slot_getallattrs(srcslot);
706  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
707  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
708 
709  /* Call the "in" function for each replaced attribute */
710  Assert(natts == rel->attrmap->maplen);
711  for (i = 0; i < natts; i++)
712  {
714  int remoteattnum = rel->attrmap->attnums[i];
715 
716  if (remoteattnum < 0)
717  continue;
718 
719  Assert(remoteattnum < tupleData->ncols);
720 
721  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
722  {
723  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
724 
725  /* Set attnum for error callback */
726  apply_error_callback_arg.remote_attnum = remoteattnum;
727 
728  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
729  {
730  Oid typinput;
731  Oid typioparam;
732 
733  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
734  slot->tts_values[i] =
735  OidInputFunctionCall(typinput, colvalue->data,
736  typioparam, att->atttypmod);
737  slot->tts_isnull[i] = false;
738  }
739  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
740  {
741  Oid typreceive;
742  Oid typioparam;
743 
744  /*
745  * In some code paths we may be asked to re-parse the same
746  * tuple data. Reset the StringInfo's cursor so that works.
747  */
748  colvalue->cursor = 0;
749 
750  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
751  slot->tts_values[i] =
752  OidReceiveFunctionCall(typreceive, colvalue,
753  typioparam, att->atttypmod);
754 
755  /* Trouble if it didn't eat the whole buffer */
756  if (colvalue->cursor != colvalue->len)
757  ereport(ERROR,
758  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
759  errmsg("incorrect binary data format in logical replication column %d",
760  remoteattnum + 1)));
761  slot->tts_isnull[i] = false;
762  }
763  else
764  {
765  /* must be LOGICALREP_COLUMN_NULL */
766  slot->tts_values[i] = (Datum) 0;
767  slot->tts_isnull[i] = true;
768  }
769 
770  /* Reset attnum for error callback */
771  apply_error_callback_arg.remote_attnum = -1;
772  }
773  }
774 
775  /* And finally, declare that "slot" contains a valid virtual tuple */
776  ExecStoreVirtualTuple(slot);
777 }
778 
779 /*
780  * Handle BEGIN message.
781  */
782 static void
784 {
785  LogicalRepBeginData begin_data;
786 
787  logicalrep_read_begin(s, &begin_data);
788  set_apply_error_context_xact(begin_data.xid, begin_data.committime);
789 
790  remote_final_lsn = begin_data.final_lsn;
791 
792  in_remote_transaction = true;
793 
795 }
796 
797 /*
798  * Handle COMMIT message.
799  *
800  * TODO, support tracking of multiple origins
801  */
802 static void
804 {
805  LogicalRepCommitData commit_data;
806 
807  logicalrep_read_commit(s, &commit_data);
808 
809  if (commit_data.commit_lsn != remote_final_lsn)
810  ereport(ERROR,
811  (errcode(ERRCODE_PROTOCOL_VIOLATION),
812  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
813  LSN_FORMAT_ARGS(commit_data.commit_lsn),
815 
816  apply_handle_commit_internal(&commit_data);
817 
818  /* Process any tables that are being synchronized in parallel. */
819  process_syncing_tables(commit_data.end_lsn);
820 
823 }
824 
825 /*
826  * Handle BEGIN PREPARE message.
827  */
828 static void
830 {
831  LogicalRepPreparedTxnData begin_data;
832 
833  /* Tablesync should never receive prepare. */
834  if (am_tablesync_worker())
835  ereport(ERROR,
836  (errcode(ERRCODE_PROTOCOL_VIOLATION),
837  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
838 
839  logicalrep_read_begin_prepare(s, &begin_data);
840  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
841 
842  remote_final_lsn = begin_data.prepare_lsn;
843 
844  in_remote_transaction = true;
845 
847 }
848 
849 /*
850  * Common function to prepare the GID.
851  */
852 static void
854 {
855  char gid[GIDSIZE];
856 
857  /*
858  * Compute unique GID for two_phase transactions. We don't use GID of
859  * prepared transaction sent by server as that can lead to deadlock when
860  * we have multiple subscriptions from same node point to publications on
861  * the same node. See comments atop worker.c
862  */
863  TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
864  gid, sizeof(gid));
865 
866  /*
867  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
868  * called within the PrepareTransactionBlock below.
869  */
871  CommitTransactionCommand(); /* Completes the preceding Begin command. */
872 
873  /*
874  * Update origin state so we can restart streaming from correct position
875  * in case of crash.
876  */
877  replorigin_session_origin_lsn = prepare_data->end_lsn;
879 
881 }
882 
883 /*
884  * Handle PREPARE message.
885  */
886 static void
888 {
889  LogicalRepPreparedTxnData prepare_data;
890 
891  logicalrep_read_prepare(s, &prepare_data);
892 
893  if (prepare_data.prepare_lsn != remote_final_lsn)
894  ereport(ERROR,
895  (errcode(ERRCODE_PROTOCOL_VIOLATION),
896  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
897  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
899 
900  /*
901  * Unlike commit, here, we always prepare the transaction even though no
902  * change has happened in this transaction. It is done this way because at
903  * commit prepared time, we won't know whether we have skipped preparing a
904  * transaction because of no change.
905  *
906  * XXX, We can optimize such that at commit prepared time, we first check
907  * whether we have prepared the transaction or not but that doesn't seem
908  * worthwhile because such cases shouldn't be common.
909  */
911 
912  apply_handle_prepare_internal(&prepare_data);
913 
916  pgstat_report_stat(false);
917 
918  store_flush_position(prepare_data.end_lsn);
919 
920  in_remote_transaction = false;
921 
922  /* Process any tables that are being synchronized in parallel. */
923  process_syncing_tables(prepare_data.end_lsn);
924 
927 }
928 
929 /*
930  * Handle a COMMIT PREPARED of a previously PREPARED transaction.
931  */
932 static void
934 {
935  LogicalRepCommitPreparedTxnData prepare_data;
936  char gid[GIDSIZE];
937 
938  logicalrep_read_commit_prepared(s, &prepare_data);
939  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
940 
941  /* Compute GID for two_phase transactions. */
942  TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
943  gid, sizeof(gid));
944 
945  /* There is no transaction when COMMIT PREPARED is called */
947 
948  /*
949  * Update origin state so we can restart streaming from correct position
950  * in case of crash.
951  */
952  replorigin_session_origin_lsn = prepare_data.end_lsn;
954 
955  FinishPreparedTransaction(gid, true);
958  pgstat_report_stat(false);
959 
960  store_flush_position(prepare_data.end_lsn);
961  in_remote_transaction = false;
962 
963  /* Process any tables that are being synchronized in parallel. */
964  process_syncing_tables(prepare_data.end_lsn);
965 
968 }
969 
970 /*
971  * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
972  */
973 static void
975 {
976  LogicalRepRollbackPreparedTxnData rollback_data;
977  char gid[GIDSIZE];
978 
979  logicalrep_read_rollback_prepared(s, &rollback_data);
980  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
981 
982  /* Compute GID for two_phase transactions. */
983  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
984  gid, sizeof(gid));
985 
986  /*
987  * It is possible that we haven't received prepare because it occurred
988  * before walsender reached a consistent point or the two_phase was still
989  * not enabled by that time, so in such cases, we need to skip rollback
990  * prepared.
991  */
992  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
993  rollback_data.prepare_time))
994  {
995  /*
996  * Update origin state so we can restart streaming from correct
997  * position in case of crash.
998  */
1001 
1002  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1004  FinishPreparedTransaction(gid, false);
1007  }
1008 
1009  pgstat_report_stat(false);
1010 
1011  store_flush_position(rollback_data.rollback_end_lsn);
1012  in_remote_transaction = false;
1013 
1014  /* Process any tables that are being synchronized in parallel. */
1016 
1019 }
1020 
1021 /*
1022  * Handle STREAM PREPARE.
1023  *
1024  * Logic is in two parts:
1025  * 1. Replay all the spooled operations
1026  * 2. Mark the transaction as prepared
1027  */
1028 static void
1030 {
1031  LogicalRepPreparedTxnData prepare_data;
1032 
1034  ereport(ERROR,
1035  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1036  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1037 
1038  /* Tablesync should never receive prepare. */
1039  if (am_tablesync_worker())
1040  ereport(ERROR,
1041  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1042  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1043 
1044  logicalrep_read_stream_prepare(s, &prepare_data);
1045  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
1046 
1047  elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
1048 
1049  /* Replay all the spooled operations. */
1050  apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
1051 
1052  /* Mark the transaction as prepared. */
1053  apply_handle_prepare_internal(&prepare_data);
1054 
1056 
1057  pgstat_report_stat(false);
1058 
1059  store_flush_position(prepare_data.end_lsn);
1060 
1061  in_remote_transaction = false;
1062 
1063  /* unlink the files with serialized changes and subxact info. */
1065 
1066  /* Process any tables that are being synchronized in parallel. */
1067  process_syncing_tables(prepare_data.end_lsn);
1068 
1070 
1072 }
1073 
1074 /*
1075  * Handle ORIGIN message.
1076  *
1077  * TODO, support tracking of multiple origins
1078  */
1079 static void
1081 {
1082  /*
1083  * ORIGIN message can only come inside streaming transaction or inside
1084  * remote transaction and before any actual writes.
1085  */
1086  if (!in_streamed_transaction &&
1089  ereport(ERROR,
1090  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1091  errmsg_internal("ORIGIN message sent out of order")));
1092 }
1093 
1094 /*
1095  * Handle STREAM START message.
1096  */
1097 static void
1099 {
1100  bool first_segment;
1101 
1103  ereport(ERROR,
1104  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1105  errmsg_internal("duplicate STREAM START message")));
1106 
1107  /*
1108  * Start a transaction on stream start, this transaction will be committed
1109  * on the stream stop unless it is a tablesync worker in which case it
1110  * will be committed after processing all the messages. We need the
1111  * transaction for handling the buffile, used for serializing the
1112  * streaming data and subxact info.
1113  */
1115 
1116  /* notify handle methods we're processing a remote transaction */
1117  in_streamed_transaction = true;
1118 
1119  /* extract XID of the top-level transaction */
1120  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1121 
1123  ereport(ERROR,
1124  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1125  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1126 
1128 
1129  /*
1130  * Initialize the worker's stream_fileset if we haven't yet. This will be
1131  * used for the entire duration of the worker so create it in a permanent
1132  * context. We create this on the very first streaming message from any
1133  * transaction and then use it for this and other streaming transactions.
1134  * Now, we could create a fileset at the start of the worker as well but
1135  * then we won't be sure that it will ever be used.
1136  */
1137  if (MyLogicalRepWorker->stream_fileset == NULL)
1138  {
1139  MemoryContext oldctx;
1140 
1141  oldctx = MemoryContextSwitchTo(ApplyContext);
1142 
1145 
1146  MemoryContextSwitchTo(oldctx);
1147  }
1148 
1149  /* open the spool file for this transaction */
1151 
1152  /* if this is not the first segment, open existing subxact file */
1153  if (!first_segment)
1155 
1157 
1159 }
1160 
1161 /*
1162  * Handle STREAM STOP message.
1163  */
1164 static void
1166 {
1168  ereport(ERROR,
1169  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1170  errmsg_internal("STREAM STOP message without STREAM START")));
1171 
1172  /*
1173  * Close the file with serialized changes, and serialize information about
1174  * subxacts for the toplevel transaction.
1175  */
1178 
1179  /* We must be in a valid transaction state */
1181 
1182  /* Commit the per-stream transaction */
1184 
1185  in_streamed_transaction = false;
1186 
1187  /* Reset per-stream context */
1188  MemoryContextReset(LogicalStreamingContext);
1189 
1192 }
1193 
1194 /*
1195  * Handle STREAM abort message.
1196  */
1197 static void
1199 {
1200  TransactionId xid;
1201  TransactionId subxid;
1202 
1204  ereport(ERROR,
1205  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1206  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1207 
1208  logicalrep_read_stream_abort(s, &xid, &subxid);
1209 
1210  /*
1211  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1212  * just delete the files with serialized info.
1213  */
1214  if (xid == subxid)
1215  {
1218  }
1219  else
1220  {
1221  /*
1222  * OK, so it's a subxact. We need to read the subxact file for the
1223  * toplevel transaction, determine the offset tracked for the subxact,
1224  * and truncate the file with changes. We also remove the subxacts
1225  * with higher offsets (or rather higher XIDs).
1226  *
1227  * We intentionally scan the array from the tail, because we're likely
1228  * aborting a change for the most recent subtransactions.
1229  *
1230  * We can't use the binary search here as subxact XIDs won't
1231  * necessarily arrive in sorted order, consider the case where we have
1232  * released the savepoint for multiple subtransactions and then
1233  * performed rollback to savepoint for one of the earlier
1234  * sub-transaction.
1235  */
1236  int64 i;
1237  int64 subidx;
1238  BufFile *fd;
1239  bool found = false;
1240  char path[MAXPGPATH];
1241 
1242  set_apply_error_context_xact(subxid, 0);
1243 
1244  subidx = -1;
1247 
1248  for (i = subxact_data.nsubxacts; i > 0; i--)
1249  {
1250  if (subxact_data.subxacts[i - 1].xid == subxid)
1251  {
1252  subidx = (i - 1);
1253  found = true;
1254  break;
1255  }
1256  }
1257 
1258  /*
1259  * If it's an empty sub-transaction then we will not find the subxid
1260  * here so just cleanup the subxact info and return.
1261  */
1262  if (!found)
1263  {
1264  /* Cleanup the subxact info */
1269  return;
1270  }
1271 
1272  /* open the changes file */
1275  O_RDWR, false);
1276 
1277  /* OK, truncate the file at the right offset */
1278  BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
1279  subxact_data.subxacts[subidx].offset);
1280  BufFileClose(fd);
1281 
1282  /* discard the subxacts added later */
1283  subxact_data.nsubxacts = subidx;
1284 
1285  /* write the updated subxact list */
1287 
1290  }
1291 
1293 }
1294 
1295 /*
1296  * Common spoolfile processing.
1297  */
1298 static void
1300 {
1302  int nchanges;
1303  char path[MAXPGPATH];
1304  char *buffer = NULL;
1305  MemoryContext oldcxt;
1306  BufFile *fd;
1307 
1308  /* Make sure we have an open transaction */
1310 
1311  /*
1312  * Allocate file handle and memory required to process all the messages in
1313  * TopTransactionContext to avoid them getting reset after each message is
1314  * processed.
1315  */
1317 
1318  /* Open the spool file for the committed/prepared transaction */
1320  elog(DEBUG1, "replaying changes from file \"%s\"", path);
1321 
1323  false);
1324 
1325  buffer = palloc(BLCKSZ);
1326  initStringInfo(&s2);
1327 
1328  MemoryContextSwitchTo(oldcxt);
1329 
1330  remote_final_lsn = lsn;
1331 
1332  /*
1333  * Make sure the handle apply_dispatch methods are aware we're in a remote
1334  * transaction.
1335  */
1336  in_remote_transaction = true;
1338 
1340 
1341  /*
1342  * Read the entries one by one and pass them through the same logic as in
1343  * apply_dispatch.
1344  */
1345  nchanges = 0;
1346  while (true)
1347  {
1348  int nbytes;
1349  int len;
1350 
1352 
1353  /* read length of the on-disk record */
1354  nbytes = BufFileRead(fd, &len, sizeof(len));
1355 
1356  /* have we reached end of the file? */
1357  if (nbytes == 0)
1358  break;
1359 
1360  /* do we have a correct length? */
1361  if (nbytes != sizeof(len))
1362  ereport(ERROR,
1364  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1365  path)));
1366 
1367  if (len <= 0)
1368  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
1369  len, path);
1370 
1371  /* make sure we have sufficiently large buffer */
1372  buffer = repalloc(buffer, len);
1373 
1374  /* and finally read the data into the buffer */
1375  if (BufFileRead(fd, buffer, len) != len)
1376  ereport(ERROR,
1378  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1379  path)));
1380 
1381  /* copy the buffer to the stringinfo and call apply_dispatch */
1382  resetStringInfo(&s2);
1383  appendBinaryStringInfo(&s2, buffer, len);
1384 
1385  /* Ensure we are reading the data into our memory context. */
1386  oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
1387 
1388  apply_dispatch(&s2);
1389 
1390  MemoryContextReset(ApplyMessageContext);
1391 
1392  MemoryContextSwitchTo(oldcxt);
1393 
1394  nchanges++;
1395 
1396  if (nchanges % 1000 == 0)
1397  elog(DEBUG1, "replayed %d changes from file \"%s\"",
1398  nchanges, path);
1399  }
1400 
1401  BufFileClose(fd);
1402 
1403  pfree(buffer);
1404  pfree(s2.data);
1405 
1406  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
1407  nchanges, path);
1408 
1409  return;
1410 }
1411 
1412 /*
1413  * Handle STREAM COMMIT message.
1414  */
1415 static void
1417 {
1418  TransactionId xid;
1419  LogicalRepCommitData commit_data;
1420 
1422  ereport(ERROR,
1423  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1424  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1425 
1426  xid = logicalrep_read_stream_commit(s, &commit_data);
1427  set_apply_error_context_xact(xid, commit_data.committime);
1428 
1429  elog(DEBUG1, "received commit for streamed transaction %u", xid);
1430 
1431  apply_spooled_messages(xid, commit_data.commit_lsn);
1432 
1433  apply_handle_commit_internal(&commit_data);
1434 
1435  /* unlink the files with serialized changes and subxact info */
1437 
1438  /* Process any tables that are being synchronized in parallel. */
1439  process_syncing_tables(commit_data.end_lsn);
1440 
1442 
1444 }
1445 
1446 /*
1447  * Helper function for apply_handle_commit and apply_handle_stream_commit.
1448  */
1449 static void
1451 {
1452  if (IsTransactionState())
1453  {
1454  /*
1455  * Update origin state so we can restart streaming from correct
1456  * position in case of crash.
1457  */
1458  replorigin_session_origin_lsn = commit_data->end_lsn;
1460 
1462  pgstat_report_stat(false);
1463 
1464  store_flush_position(commit_data->end_lsn);
1465  }
1466  else
1467  {
1468  /* Process any invalidation messages that might have accumulated. */
1471  }
1472 
1473  in_remote_transaction = false;
1474 }
1475 
1476 /*
1477  * Handle RELATION message.
1478  *
1479  * Note we don't do validation against local schema here. The validation
1480  * against local schema is postponed until first change for given relation
1481  * comes as we only care about it when applying changes for it anyway and we
1482  * do less locking this way.
1483  */
1484 static void
1486 {
1487  LogicalRepRelation *rel;
1488 
1490  return;
1491 
1492  rel = logicalrep_read_rel(s);
1494 }
1495 
1496 /*
1497  * Handle TYPE message.
1498  *
1499  * This is now vestigial; we read the info and discard it.
1500  */
1501 static void
1503 {
1504  LogicalRepTyp typ;
1505 
1507  return;
1508 
1509  logicalrep_read_typ(s, &typ);
1510 }
1511 
1512 /*
1513  * Get replica identity index or if it is not defined a primary key.
1514  *
1515  * If neither is defined, returns InvalidOid
1516  */
1517 static Oid
1519 {
1520  Oid idxoid;
1521 
1522  idxoid = RelationGetReplicaIndex(rel);
1523 
1524  if (!OidIsValid(idxoid))
1525  idxoid = RelationGetPrimaryKeyIndex(rel);
1526 
1527  return idxoid;
1528 }
1529 
1530 /*
1531  * Handle INSERT message.
1532  */
1533 
1534 static void
1536 {
1537  LogicalRepRelMapEntry *rel;
1538  LogicalRepTupleData newtup;
1539  LogicalRepRelId relid;
1540  ApplyExecutionData *edata;
1541  EState *estate;
1542  TupleTableSlot *remoteslot;
1543  MemoryContext oldctx;
1544 
1546  return;
1547 
1549 
1550  relid = logicalrep_read_insert(s, &newtup);
1551  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1552  if (!should_apply_changes_for_rel(rel))
1553  {
1554  /*
1555  * The relation can't become interesting in the middle of the
1556  * transaction so it's safe to unlock it.
1557  */
1560  return;
1561  }
1562 
1563  /* Set relation for error callback */
1564  apply_error_callback_arg.rel = rel;
1565 
1566  /* Initialize the executor state. */
1567  edata = create_edata_for_relation(rel);
1568  estate = edata->estate;
1569  remoteslot = ExecInitExtraTupleSlot(estate,
1570  RelationGetDescr(rel->localrel),
1571  &TTSOpsVirtual);
1572 
1573  /* Process and store remote tuple in the slot */
1575  slot_store_data(remoteslot, rel, &newtup);
1576  slot_fill_defaults(rel, estate, remoteslot);
1577  MemoryContextSwitchTo(oldctx);
1578 
1579  /* For a partitioned table, insert the tuple into a partition. */
1580  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1582  remoteslot, NULL, CMD_INSERT);
1583  else
1585  remoteslot);
1586 
1587  finish_edata(edata);
1588 
1589  /* Reset relation for error callback */
1590  apply_error_callback_arg.rel = NULL;
1591 
1593 
1595 }
1596 
1597 /*
1598  * Workhorse for apply_handle_insert()
1599  * relinfo is for the relation we're actually inserting into
1600  * (could be a child partition of edata->targetRelInfo)
1601  */
1602 static void
1604  ResultRelInfo *relinfo,
1605  TupleTableSlot *remoteslot)
1606 {
1607  EState *estate = edata->estate;
1608 
1609  /* We must open indexes here. */
1610  ExecOpenIndices(relinfo, false);
1611 
1612  /* Do the insert. */
1613  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1614 
1615  /* Cleanup. */
1616  ExecCloseIndices(relinfo);
1617 }
1618 
1619 /*
1620  * Check if the logical replication relation is updatable and throw
1621  * appropriate error if it isn't.
1622  */
1623 static void
1625 {
1626  /* Updatable, no error. */
1627  if (rel->updatable)
1628  return;
1629 
1630  /*
1631  * We are in error mode so it's fine this is somewhat slow. It's better to
1632  * give user correct error.
1633  */
1635  {
1636  ereport(ERROR,
1637  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1638  errmsg("publisher did not send replica identity column "
1639  "expected by the logical replication target relation \"%s.%s\"",
1640  rel->remoterel.nspname, rel->remoterel.relname)));
1641  }
1642 
1643  ereport(ERROR,
1644  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1645  errmsg("logical replication target relation \"%s.%s\" has "
1646  "neither REPLICA IDENTITY index nor PRIMARY "
1647  "KEY and published relation does not have "
1648  "REPLICA IDENTITY FULL",
1649  rel->remoterel.nspname, rel->remoterel.relname)));
1650 }
1651 
1652 /*
1653  * Handle UPDATE message.
1654  *
1655  * TODO: FDW support
1656  */
1657 static void
1659 {
1660  LogicalRepRelMapEntry *rel;
1661  LogicalRepRelId relid;
1662  ApplyExecutionData *edata;
1663  EState *estate;
1664  LogicalRepTupleData oldtup;
1665  LogicalRepTupleData newtup;
1666  bool has_oldtup;
1667  TupleTableSlot *remoteslot;
1668  RangeTblEntry *target_rte;
1669  MemoryContext oldctx;
1670 
1672  return;
1673 
1675 
1676  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1677  &newtup);
1678  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1679  if (!should_apply_changes_for_rel(rel))
1680  {
1681  /*
1682  * The relation can't become interesting in the middle of the
1683  * transaction so it's safe to unlock it.
1684  */
1687  return;
1688  }
1689 
1690  /* Set relation for error callback */
1691  apply_error_callback_arg.rel = rel;
1692 
1693  /* Check if we can do the update. */
1695 
1696  /* Initialize the executor state. */
1697  edata = create_edata_for_relation(rel);
1698  estate = edata->estate;
1699  remoteslot = ExecInitExtraTupleSlot(estate,
1700  RelationGetDescr(rel->localrel),
1701  &TTSOpsVirtual);
1702 
1703  /*
1704  * Populate updatedCols so that per-column triggers can fire, and so
1705  * executor can correctly pass down indexUnchanged hint. This could
1706  * include more columns than were actually changed on the publisher
1707  * because the logical replication protocol doesn't contain that
1708  * information. But it would for example exclude columns that only exist
1709  * on the subscriber, since we are not touching those.
1710  */
1711  target_rte = list_nth(estate->es_range_table, 0);
1712  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1713  {
1715  int remoteattnum = rel->attrmap->attnums[i];
1716 
1717  if (!att->attisdropped && remoteattnum >= 0)
1718  {
1719  Assert(remoteattnum < newtup.ncols);
1720  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1721  target_rte->updatedCols =
1722  bms_add_member(target_rte->updatedCols,
1724  }
1725  }
1726 
1727  /* Also populate extraUpdatedCols, in case we have generated columns */
1728  fill_extraUpdatedCols(target_rte, rel->localrel);
1729 
1730  /* Build the search tuple. */
1732  slot_store_data(remoteslot, rel,
1733  has_oldtup ? &oldtup : &newtup);
1734  MemoryContextSwitchTo(oldctx);
1735 
1736  /* For a partitioned table, apply update to correct partition. */
1737  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1739  remoteslot, &newtup, CMD_UPDATE);
1740  else
1742  remoteslot, &newtup);
1743 
1744  finish_edata(edata);
1745 
1746  /* Reset relation for error callback */
1747  apply_error_callback_arg.rel = NULL;
1748 
1750 
1752 }
1753 
1754 /*
1755  * Workhorse for apply_handle_update()
1756  * relinfo is for the relation we're actually updating in
1757  * (could be a child partition of edata->targetRelInfo)
1758  */
1759 static void
1761  ResultRelInfo *relinfo,
1762  TupleTableSlot *remoteslot,
1763  LogicalRepTupleData *newtup)
1764 {
1765  EState *estate = edata->estate;
1766  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1767  Relation localrel = relinfo->ri_RelationDesc;
1768  EPQState epqstate;
1769  TupleTableSlot *localslot;
1770  bool found;
1771  MemoryContext oldctx;
1772 
1773  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1774  ExecOpenIndices(relinfo, false);
1775 
1776  found = FindReplTupleInLocalRel(estate, localrel,
1777  &relmapentry->remoterel,
1778  remoteslot, &localslot);
1779  ExecClearTuple(remoteslot);
1780 
1781  /*
1782  * Tuple found.
1783  *
1784  * Note this will fail if there are other conflicting unique indexes.
1785  */
1786  if (found)
1787  {
1788  /* Process and store remote tuple in the slot */
1790  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1791  MemoryContextSwitchTo(oldctx);
1792 
1793  EvalPlanQualSetSlot(&epqstate, remoteslot);
1794 
1795  /* Do the actual update. */
1796  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
1797  remoteslot);
1798  }
1799  else
1800  {
1801  /*
1802  * The tuple to be updated could not be found. Do nothing except for
1803  * emitting a log message.
1804  *
1805  * XXX should this be promoted to ereport(LOG) perhaps?
1806  */
1807  elog(DEBUG1,
1808  "logical replication did not find row to be updated "
1809  "in replication target relation \"%s\"",
1810  RelationGetRelationName(localrel));
1811  }
1812 
1813  /* Cleanup. */
1814  ExecCloseIndices(relinfo);
1815  EvalPlanQualEnd(&epqstate);
1816 }
1817 
1818 /*
1819  * Handle DELETE message.
1820  *
1821  * TODO: FDW support
1822  */
1823 static void
1825 {
1826  LogicalRepRelMapEntry *rel;
1827  LogicalRepTupleData oldtup;
1828  LogicalRepRelId relid;
1829  ApplyExecutionData *edata;
1830  EState *estate;
1831  TupleTableSlot *remoteslot;
1832  MemoryContext oldctx;
1833 
1835  return;
1836 
1838 
1839  relid = logicalrep_read_delete(s, &oldtup);
1840  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1841  if (!should_apply_changes_for_rel(rel))
1842  {
1843  /*
1844  * The relation can't become interesting in the middle of the
1845  * transaction so it's safe to unlock it.
1846  */
1849  return;
1850  }
1851 
1852  /* Set relation for error callback */
1853  apply_error_callback_arg.rel = rel;
1854 
1855  /* Check if we can do the delete. */
1857 
1858  /* Initialize the executor state. */
1859  edata = create_edata_for_relation(rel);
1860  estate = edata->estate;
1861  remoteslot = ExecInitExtraTupleSlot(estate,
1862  RelationGetDescr(rel->localrel),
1863  &TTSOpsVirtual);
1864 
1865  /* Build the search tuple. */
1867  slot_store_data(remoteslot, rel, &oldtup);
1868  MemoryContextSwitchTo(oldctx);
1869 
1870  /* For a partitioned table, apply delete to correct partition. */
1871  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1873  remoteslot, NULL, CMD_DELETE);
1874  else
1876  remoteslot);
1877 
1878  finish_edata(edata);
1879 
1880  /* Reset relation for error callback */
1881  apply_error_callback_arg.rel = NULL;
1882 
1884 
1886 }
1887 
1888 /*
1889  * Workhorse for apply_handle_delete()
1890  * relinfo is for the relation we're actually deleting from
1891  * (could be a child partition of edata->targetRelInfo)
1892  */
1893 static void
1895  ResultRelInfo *relinfo,
1896  TupleTableSlot *remoteslot)
1897 {
1898  EState *estate = edata->estate;
1899  Relation localrel = relinfo->ri_RelationDesc;
1900  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
1901  EPQState epqstate;
1902  TupleTableSlot *localslot;
1903  bool found;
1904 
1905  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1906  ExecOpenIndices(relinfo, false);
1907 
1908  found = FindReplTupleInLocalRel(estate, localrel, remoterel,
1909  remoteslot, &localslot);
1910 
1911  /* If found delete it. */
1912  if (found)
1913  {
1914  EvalPlanQualSetSlot(&epqstate, localslot);
1915 
1916  /* Do the actual delete. */
1917  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
1918  }
1919  else
1920  {
1921  /*
1922  * The tuple to be deleted could not be found. Do nothing except for
1923  * emitting a log message.
1924  *
1925  * XXX should this be promoted to ereport(LOG) perhaps?
1926  */
1927  elog(DEBUG1,
1928  "logical replication did not find row to be deleted "
1929  "in replication target relation \"%s\"",
1930  RelationGetRelationName(localrel));
1931  }
1932 
1933  /* Cleanup. */
1934  ExecCloseIndices(relinfo);
1935  EvalPlanQualEnd(&epqstate);
1936 }
1937 
1938 /*
1939  * Try to find a tuple received from the publication side (in 'remoteslot') in
1940  * the corresponding local relation using either replica identity index,
1941  * primary key or if needed, sequential scan.
1942  *
1943  * Local tuple, if found, is returned in '*localslot'.
1944  */
1945 static bool
1947  LogicalRepRelation *remoterel,
1948  TupleTableSlot *remoteslot,
1949  TupleTableSlot **localslot)
1950 {
1951  Oid idxoid;
1952  bool found;
1953 
1954  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
1955 
1956  idxoid = GetRelationIdentityOrPK(localrel);
1957  Assert(OidIsValid(idxoid) ||
1958  (remoterel->replident == REPLICA_IDENTITY_FULL));
1959 
1960  if (OidIsValid(idxoid))
1961  found = RelationFindReplTupleByIndex(localrel, idxoid,
1963  remoteslot, *localslot);
1964  else
1965  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
1966  remoteslot, *localslot);
1967 
1968  return found;
1969 }
1970 
1971 /*
1972  * This handles insert, update, delete on a partitioned table.
1973  */
1974 static void
1976  TupleTableSlot *remoteslot,
1977  LogicalRepTupleData *newtup,
1978  CmdType operation)
1979 {
1980  EState *estate = edata->estate;
1981  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1982  ResultRelInfo *relinfo = edata->targetRelInfo;
1983  Relation parentrel = relinfo->ri_RelationDesc;
1984  ModifyTableState *mtstate;
1985  PartitionTupleRouting *proute;
1986  ResultRelInfo *partrelinfo;
1987  Relation partrel;
1988  TupleTableSlot *remoteslot_part;
1989  TupleConversionMap *map;
1990  MemoryContext oldctx;
1991 
1992  /* ModifyTableState is needed for ExecFindPartition(). */
1993  edata->mtstate = mtstate = makeNode(ModifyTableState);
1994  mtstate->ps.plan = NULL;
1995  mtstate->ps.state = estate;
1996  mtstate->operation = operation;
1997  mtstate->resultRelInfo = relinfo;
1998 
1999  /* ... as is PartitionTupleRouting. */
2000  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2001 
2002  /*
2003  * Find the partition to which the "search tuple" belongs.
2004  */
2005  Assert(remoteslot != NULL);
2007  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2008  remoteslot, estate);
2009  Assert(partrelinfo != NULL);
2010  partrel = partrelinfo->ri_RelationDesc;
2011 
2012  /*
2013  * To perform any of the operations below, the tuple must match the
2014  * partition's rowtype. Convert if needed or just copy, using a dedicated
2015  * slot to store the tuple in any case.
2016  */
2017  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2018  if (remoteslot_part == NULL)
2019  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2020  map = partrelinfo->ri_RootToPartitionMap;
2021  if (map != NULL)
2022  remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
2023  remoteslot_part);
2024  else
2025  {
2026  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2027  slot_getallattrs(remoteslot_part);
2028  }
2029  MemoryContextSwitchTo(oldctx);
2030 
2031  switch (operation)
2032  {
2033  case CMD_INSERT:
2034  apply_handle_insert_internal(edata, partrelinfo,
2035  remoteslot_part);
2036  break;
2037 
2038  case CMD_DELETE:
2039  apply_handle_delete_internal(edata, partrelinfo,
2040  remoteslot_part);
2041  break;
2042 
2043  case CMD_UPDATE:
2044 
2045  /*
2046  * For UPDATE, depending on whether or not the updated tuple
2047  * satisfies the partition's constraint, perform a simple UPDATE
2048  * of the partition or move the updated tuple into a different
2049  * suitable partition.
2050  */
2051  {
2052  AttrMap *attrmap = map ? map->attrMap : NULL;
2053  LogicalRepRelMapEntry *part_entry;
2054  TupleTableSlot *localslot;
2055  ResultRelInfo *partrelinfo_new;
2056  bool found;
2057 
2058  part_entry = logicalrep_partition_open(relmapentry, partrel,
2059  attrmap);
2060 
2061  /* Get the matching local tuple from the partition. */
2062  found = FindReplTupleInLocalRel(estate, partrel,
2063  &part_entry->remoterel,
2064  remoteslot_part, &localslot);
2065  if (!found)
2066  {
2067  /*
2068  * The tuple to be updated could not be found. Do nothing
2069  * except for emitting a log message.
2070  *
2071  * XXX should this be promoted to ereport(LOG) perhaps?
2072  */
2073  elog(DEBUG1,
2074  "logical replication did not find row to be updated "
2075  "in replication target relation's partition \"%s\"",
2076  RelationGetRelationName(partrel));
2077  return;
2078  }
2079 
2080  /*
2081  * Apply the update to the local tuple, putting the result in
2082  * remoteslot_part.
2083  */
2085  slot_modify_data(remoteslot_part, localslot, part_entry,
2086  newtup);
2087  MemoryContextSwitchTo(oldctx);
2088 
2089  /*
2090  * Does the updated tuple still satisfy the current
2091  * partition's constraint?
2092  */
2093  if (!partrel->rd_rel->relispartition ||
2094  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
2095  false))
2096  {
2097  /*
2098  * Yes, so simply UPDATE the partition. We don't call
2099  * apply_handle_update_internal() here, which would
2100  * normally do the following work, to avoid repeating some
2101  * work already done above to find the local tuple in the
2102  * partition.
2103  */
2104  EPQState epqstate;
2105 
2106  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2107  ExecOpenIndices(partrelinfo, false);
2108 
2109  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
2110  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
2111  localslot, remoteslot_part);
2112  ExecCloseIndices(partrelinfo);
2113  EvalPlanQualEnd(&epqstate);
2114  }
2115  else
2116  {
2117  /* Move the tuple into the new partition. */
2118 
2119  /*
2120  * New partition will be found using tuple routing, which
2121  * can only occur via the parent table. We might need to
2122  * convert the tuple to the parent's rowtype. Note that
2123  * this is the tuple found in the partition, not the
2124  * original search tuple received by this function.
2125  */
2126  if (map)
2127  {
2128  TupleConversionMap *PartitionToRootMap =
2130  RelationGetDescr(parentrel));
2131 
2132  remoteslot =
2133  execute_attr_map_slot(PartitionToRootMap->attrMap,
2134  remoteslot_part, remoteslot);
2135  }
2136  else
2137  {
2138  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
2139  slot_getallattrs(remoteslot);
2140  }
2141 
2142 
2143  /* Find the new partition. */
2145  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
2146  proute, remoteslot,
2147  estate);
2148  MemoryContextSwitchTo(oldctx);
2149  Assert(partrelinfo_new != partrelinfo);
2150 
2151  /* DELETE old tuple found in the old partition. */
2152  apply_handle_delete_internal(edata, partrelinfo,
2153  localslot);
2154 
2155  /* INSERT new tuple into the new partition. */
2156 
2157  /*
2158  * Convert the replacement tuple to match the destination
2159  * partition rowtype.
2160  */
2162  partrel = partrelinfo_new->ri_RelationDesc;
2163  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
2164  if (remoteslot_part == NULL)
2165  remoteslot_part = table_slot_create(partrel,
2166  &estate->es_tupleTable);
2167  map = partrelinfo_new->ri_RootToPartitionMap;
2168  if (map != NULL)
2169  {
2170  remoteslot_part = execute_attr_map_slot(map->attrMap,
2171  remoteslot,
2172  remoteslot_part);
2173  }
2174  else
2175  {
2176  remoteslot_part = ExecCopySlot(remoteslot_part,
2177  remoteslot);
2178  slot_getallattrs(remoteslot);
2179  }
2180  MemoryContextSwitchTo(oldctx);
2181  apply_handle_insert_internal(edata, partrelinfo_new,
2182  remoteslot_part);
2183  }
2184  }
2185  break;
2186 
2187  default:
2188  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
2189  break;
2190  }
2191 }
2192 
2193 /*
2194  * Handle TRUNCATE message.
 *
 * Reads the remote relation ids (and the CASCADE / RESTART IDENTITY flags)
 * from the message and opens each relation with AccessExclusiveLock.
 * Relations rejected by should_apply_changes_for_rel() are closed again and
 * skipped. For a local partitioned table, every partition returned by
 * find_all_inheritors() is added as well, except temp tables of other
 * backends. All collected relations are then truncated in a single
 * ExecuteTruncateGuts() call.
 *
 * Note: the remote CASCADE flag is read but not applied; DROP_RESTRICT is
 * always used locally (see the comment at the ExecuteTruncateGuts() call).
2195  *
2196  * TODO: FDW support
2197  */
2198 static void
2200 {
2201  bool cascade = false;
2202  bool restart_seqs = false;
2203  List *remote_relids = NIL;
2204  List *remote_rels = NIL;
2205  List *rels = NIL;
2206  List *part_rels = NIL;
2207  List *relids = NIL;
2208  List *relids_logged = NIL;
2209  ListCell *lc;
2210  LOCKMODE lockmode = AccessExclusiveLock;
2211 
2213  return;
2214 
2216 
2217  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
2218 
2219  foreach(lc, remote_relids)
2220  {
2221  LogicalRepRelId relid = lfirst_oid(lc);
2222  LogicalRepRelMapEntry *rel;
2223 
2224  rel = logicalrep_rel_open(relid, lockmode);
2225  if (!should_apply_changes_for_rel(rel))
2226  {
2227  /*
2228  * The relation can't become interesting in the middle of the
2229  * transaction so it's safe to unlock it.
2230  */
2231  logicalrep_rel_close(rel, lockmode);
2232  continue;
2233  }
2234 
2235  remote_rels = lappend(remote_rels, rel);
2236  rels = lappend(rels, rel->localrel);
2237  relids = lappend_oid(relids, rel->localreloid);
2239  relids_logged = lappend_oid(relids_logged, rel->localreloid);
2240 
2241  /*
2242  * Truncate partitions if we got a message to truncate a partitioned
2243  * table.
2244  */
2245  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2246  {
2247  ListCell *child;
2248  List *children = find_all_inheritors(rel->localreloid,
2249  lockmode,
2250  NULL);
2251 
2252  foreach(child, children)
2253  {
2254  Oid childrelid = lfirst_oid(child);
2255  Relation childrel;
2256 
 /* Skip relations already collected (named directly or found earlier). */
2257  if (list_member_oid(relids, childrelid))
2258  continue;
2259 
2260  /* find_all_inheritors already got lock */
2261  childrel = table_open(childrelid, NoLock);
2262 
2263  /*
2264  * Ignore temp tables of other backends. See similar code in
2265  * ExecuteTruncate().
2266  */
2267  if (RELATION_IS_OTHER_TEMP(childrel))
2268  {
2269  table_close(childrel, lockmode);
2270  continue;
2271  }
2272 
2273  rels = lappend(rels, childrel);
2274  part_rels = lappend(part_rels, childrel);
2275  relids = lappend_oid(relids, childrelid);
2276  /* Log this relation only if needed for logical decoding */
2277  if (RelationIsLogicallyLogged(childrel))
2278  relids_logged = lappend_oid(relids_logged, childrelid);
2279  }
2280  }
2281  }
2282 
2283  /*
2284  * Even if we used CASCADE on the upstream primary we explicitly default
2285  * to replaying changes without further cascading. This might be later
2286  * changeable with a user specified option.
2287  */
2288  ExecuteTruncateGuts(rels,
2289  relids,
2290  relids_logged,
2291  DROP_RESTRICT,
2292  restart_seqs);
2293  foreach(lc, remote_rels)
2294  {
2295  LogicalRepRelMapEntry *rel = lfirst(lc);
2296 
2298  }
 /*
  * Partitions were opened locally with table_open(); close them with NoLock
  * so the lock taken by find_all_inheritors() is not released here.
  */
2299  foreach(lc, part_rels)
2300  {
2301  Relation rel = lfirst(lc);
2302 
2303  table_close(rel, NoLock);
2304  }
2305 
2307 }
2308 
2309 
2310 /*
2311  * Logical replication protocol message dispatcher.
 *
 * Routes the message 's' to the handler for its message type 'action'.
 * While the handler runs, apply_error_callback_arg.command holds 'action' so
 * that the apply error context can report it. The previous value is
 * restored on normal return. An unknown message type raises a
 * protocol-violation ERROR.
2312  */
2313 static void
2315 {
2317  LogicalRepMsgType saved_command;
2318 
2319  /*
2320  * Set the current command being applied. Since this function can be
2321  * called recursively when applying spooled changes, save the current
2322  * command.
2323  */
2324  saved_command = apply_error_callback_arg.command;
2325  apply_error_callback_arg.command = action;
2326 
2327  switch (action)
2328  {
2329  case LOGICAL_REP_MSG_BEGIN:
2330  apply_handle_begin(s);
2331  break;
2332 
2335  break;
2336 
2339  break;
2340 
2343  break;
2344 
2347  break;
2348 
2351  break;
2352 
2355  break;
2356 
2357  case LOGICAL_REP_MSG_TYPE:
2358  apply_handle_type(s);
2359  break;
2360 
2363  break;
2364 
2366 
2367  /*
2368  * Logical replication does not use generic logical messages yet,
2369  * although other applications that use this output plugin may send
2370  * them, so they are simply ignored here.
2371  */
2372  break;
2373 
2376  break;
2377 
2380  break;
2381 
2384  break;
2385 
2388  break;
2389 
2392  break;
2393 
2396  break;
2397 
2400  break;
2401 
2404  break;
2405 
2408  break;
2409 
2410  default:
2411  ereport(ERROR,
2412  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2413  errmsg("invalid logical replication message type \"%c\"", action)));
2414  }
2415 
2416  /* Reset the current command */
2417  apply_error_callback_arg.command = saved_command;
2418 }
2419 
2420 /*
2421  * Figure out which write/flush positions to report to the walsender process.
2422  *
2423  * We can't simply report back the last LSN the walsender sent us because the
2424  * local transaction might not yet be flushed to disk locally. Instead we
2425  * build a list that associates local with remote LSNs for every commit. When
2426  * reporting back the flush position to the sender we iterate that list and
2427  * check which entries on it are already locally flushed. Those we can report
2428  * as having been flushed.
2429  *
2430  * *have_pending_txes is set to true if there are outstanding transactions
2431  * that still need to be flushed, false otherwise.
 *
 * Entries found to be flushed locally are removed from lsn_mapping and
 * freed. *write and *flush are InvalidXLogRecPtr if nothing qualifies.
2432  */
2433 static void
2435  bool *have_pending_txes)
2436 {
2437  dlist_mutable_iter iter;
2438  XLogRecPtr local_flush = GetFlushRecPtr();
2439 
2440  *write = InvalidXLogRecPtr;
2441  *flush = InvalidXLogRecPtr;
2442 
2443  dlist_foreach_modify(iter, &lsn_mapping)
2444  {
2445  FlushPosition *pos =
2447 
2448  *write = pos->remote_end;
2449 
 /* Already flushed locally: report it as flushed and drop the entry. */
2450  if (pos->local_end <= local_flush)
2451  {
2452  *flush = pos->remote_end;
2453  dlist_delete(iter.cur);
2454  pfree(pos);
2455  }
2456  else
2457  {
2458  /*
2459  * Don't want to uselessly iterate over the rest of the list which
2460  * could potentially be long. Instead get the last element and
2461  * grab the write position from there.
2462  */
2464  &lsn_mapping);
2465  *write = pos->remote_end;
2466  *have_pending_txes = true;
2467  return;
2468  }
2469  }
2470 
2471  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
2472 }
2473 
2474 /*
2475  * Store current remote/local lsn pair in the tracking list.
 *
 * The local side is XactLastCommitEnd and the remote side is remote_lsn.
 * The entry is palloc'd in ApplyContext so it outlives resets of
 * ApplyMessageContext. Entries are consumed and freed by
 * get_flush_position().
 *
 * NOTE: on return the current memory context is always ApplyMessageContext,
 * regardless of which context was current on entry.
2476  */
2477 static void
2479 {
2480  FlushPosition *flushpos;
2481 
2482  /* Need to do this in permanent context */
2483  MemoryContextSwitchTo(ApplyContext);
2484 
2485  /* Track commit lsn */
2486  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2487  flushpos->local_end = XactLastCommitEnd;
2488  flushpos->remote_end = remote_lsn;
2489 
2490  dlist_push_tail(&lsn_mapping, &flushpos->node);
2491  MemoryContextSwitchTo(ApplyMessageContext);
2492 }
2493 
2494 
2495 /*
 * Update statistics of the worker.
 *
 * Records last_lsn / send_time as the latest received position and sender
 * timestamp in MyLogicalRepWorker. When 'reply' is true, they are also
 * recorded as the latest reply values. The apply loop passes true only
 * while handling 'k' messages.
 */
2496 static void
2497 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
2498 {
2499  MyLogicalRepWorker->last_lsn = last_lsn;
2500  MyLogicalRepWorker->last_send_time = send_time;
2502  if (reply)
2503  {
2504  MyLogicalRepWorker->reply_lsn = last_lsn;
2505  MyLogicalRepWorker->reply_time = send_time;
2506  }
2507 }
2508 
2509 /*
2510  * Apply main loop.
 *
 * Creates ApplyMessageContext and LogicalStreamingContext under ApplyContext
 * and pushes apply_error_callback onto the error context stack. It then
 * repeatedly drains all available data from LogRepWorkerWalRcvConn:
 * - 'w' messages advance last_received and are passed to apply_dispatch().
 * - 'k' messages advance last_received and send feedback, forced when the
 *   sender requested a reply.
 * - Any other message type is ignored.
 *
 * After each batch it sends feedback and waits on the latch/socket. On
 * timeout (when wal_receiver_timeout > 0) it either pings the sender after
 * half the timeout, or raises an ERROR once the full timeout has elapsed
 * since the last receipt. The loop ends when the publisher ends the data
 * stream. The error context is then popped and streaming is ended.
2511  */
2512 static void
2514 {
2515  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2516  bool ping_sent = false;
2517  TimeLineID tli;
2518  ErrorContextCallback errcallback;
2519 
2520  /*
2521  * Init the ApplyMessageContext which we clean up after each replication
2522  * protocol message.
2523  */
2524  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
2525  "ApplyMessageContext",
2527 
2528  /*
2529  * This memory context is used for per-stream data when the streaming mode
2530  * is enabled. This context is reset on each stream stop.
2531  */
2532  LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
2533  "LogicalStreamingContext",
2535 
2536  /* mark as idle, before starting to loop */
2538 
2539  /*
2540  * Push apply error context callback. Fields will be filled during
2541  * applying a change.
2542  */
2543  errcallback.callback = apply_error_callback;
2544  errcallback.previous = error_context_stack;
2545  error_context_stack = &errcallback;
2546 
2547  /* This outer loop iterates once per wait. */
2548  for (;;)
2549  {
2551  int rc;
2552  int len;
2553  char *buf = NULL;
2554  bool endofstream = false;
2555  long wait_time;
2556 
2558 
2559  MemoryContextSwitchTo(ApplyMessageContext);
2560 
2561  len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
2562 
2563  if (len != 0)
2564  {
2565  /* Loop to process all available data (without blocking). */
2566  for (;;)
2567  {
2569 
2570  if (len == 0)
2571  {
2572  break;
2573  }
2574  else if (len < 0)
2575  {
2576  ereport(LOG,
2577  (errmsg("data stream from publisher has ended")));
2578  endofstream = true;
2579  break;
2580  }
2581  else
2582  {
2583  int c;
2584  StringInfoData s;
2585 
2586  /* Reset timeout. */
2587  last_recv_timestamp = GetCurrentTimestamp();
2588  ping_sent = false;
2589 
2590  /* Ensure we are reading the data into our memory context. */
2591  MemoryContextSwitchTo(ApplyMessageContext);
2592 
2593  s.data = buf;
2594  s.len = len;
2595  s.cursor = 0;
2596  s.maxlen = -1;
2597 
2598  c = pq_getmsgbyte(&s);
2599 
 /*
  * 'w': start LSN, end LSN and send time, followed by the payload
  * handed to apply_dispatch().
  */
2600  if (c == 'w')
2601  {
2602  XLogRecPtr start_lsn;
2603  XLogRecPtr end_lsn;
2604  TimestampTz send_time;
2605 
2606  start_lsn = pq_getmsgint64(&s);
2607  end_lsn = pq_getmsgint64(&s);
2608  send_time = pq_getmsgint64(&s);
2609 
2610  if (last_received < start_lsn)
2611  last_received = start_lsn;
2612 
2613  if (last_received < end_lsn)
2614  last_received = end_lsn;
2615 
2616  UpdateWorkerStats(last_received, send_time, false);
2617 
2618  apply_dispatch(&s);
2619  }
 /* 'k': end LSN, timestamp and a reply-requested flag. */
2620  else if (c == 'k')
2621  {
2622  XLogRecPtr end_lsn;
2624  bool reply_requested;
2625 
2626  end_lsn = pq_getmsgint64(&s);
2627  timestamp = pq_getmsgint64(&s);
2628  reply_requested = pq_getmsgbyte(&s);
2629 
2630  if (last_received < end_lsn)
2631  last_received = end_lsn;
2632 
2633  send_feedback(last_received, reply_requested, false);
2634  UpdateWorkerStats(last_received, timestamp, true);
2635  }
2636  /* other message types are purposefully ignored */
2637 
2638  MemoryContextReset(ApplyMessageContext);
2639  }
2640 
2641  len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
2642  }
2643  }
2644 
2645  /* confirm all writes so far */
2646  send_feedback(last_received, false, false);
2647 
2649  {
2650  /*
2651  * If we didn't get any transactions for a while there might be
2652  * unconsumed invalidation messages in the queue, consume them
2653  * now.
2654  */
2657 
2658  /* Process any table synchronization changes. */
2659  process_syncing_tables(last_received);
2660  }
2661 
2662  /* Cleanup the memory. */
2663  MemoryContextResetAndDeleteChildren(ApplyMessageContext);
2665 
2666  /* Check if we need to exit the streaming loop. */
2667  if (endofstream)
2668  break;
2669 
2670  /*
2671  * Wait for more data or latch. If we have unflushed transactions,
2672  * wake up after WalWriterDelay to see if they've been flushed yet (in
2673  * which case we should send a feedback message). Otherwise, there's
2674  * no particular urgency about waking up unless we get data or a
2675  * signal.
2676  */
2677  if (!dlist_is_empty(&lsn_mapping))
2678  wait_time = WalWriterDelay;
2679  else
2680  wait_time = NAPTIME_PER_CYCLE;
2681 
2685  fd, wait_time,
2687 
2688  if (rc & WL_LATCH_SET)
2689  {
2692  }
2693 
2694  if (ConfigReloadPending)
2695  {
2696  ConfigReloadPending = false;
2698  }
2699 
2700  if (rc & WL_TIMEOUT)
2701  {
2702  /*
2703  * We didn't receive anything new. If we haven't heard anything
2704  * from the server for more than wal_receiver_timeout / 2, ping
2705  * the server. Also, if it's been longer than
2706  * wal_receiver_status_interval since the last update we sent,
2707  * send a status update to the primary anyway, to report any
2708  * progress in applying WAL.
2709  */
2710  bool requestReply = false;
2711 
2712  /*
2713  * Check if time since last receive from primary has reached the
2714  * configured limit.
2715  */
2716  if (wal_receiver_timeout > 0)
2717  {
2719  TimestampTz timeout;
2720 
2721  timeout =
2722  TimestampTzPlusMilliseconds(last_recv_timestamp,
2724 
2725  if (now >= timeout)
2726  ereport(ERROR,
2727  (errcode(ERRCODE_CONNECTION_FAILURE),
2728  errmsg("terminating logical replication worker due to timeout")));
2729 
2730  /* Check to see if it's time for a ping. */
2731  if (!ping_sent)
2732  {
2733  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2734  (wal_receiver_timeout / 2));
2735  if (now >= timeout)
2736  {
2737  requestReply = true;
2738  ping_sent = true;
2739  }
2740  }
2741  }
2742 
2743  send_feedback(last_received, requestReply, requestReply);
2744  }
2745  }
2746 
2747  /* Pop the error context stack */
2748  error_context_stack = errcallback.previous;
2749 
2750  /* All done */
2751  walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
2752 }
2753 
2754 /*
2755  * Send a Standby Status Update message to server.
2756  *
2757  * 'recvpos' is the latest LSN we've received data to. 'force' is set if we
2758  * need to send a response to avoid timeouts. 'requestReply' is copied into
 * the message's reply-requested byte.
 *
 * Unless forced, nothing is sent when wal_receiver_status_interval <= 0, or
 * when neither the write nor the flush position has advanced and not enough
 * time has passed since the previous send. The last reported positions and
 * the send time are kept in function-static variables. Reported positions
 * never move backwards.
2759  */
2760 static void
2761 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
2762 {
2763  static StringInfo reply_message = NULL;
2764  static TimestampTz send_time = 0;
2765 
2766  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2767  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2768  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2769 
2770  XLogRecPtr writepos;
2771  XLogRecPtr flushpos;
2772  TimestampTz now;
2773  bool have_pending_txes;
2774 
2775  /*
2776  * If the user doesn't want status to be reported to the publisher, be
2777  * sure to exit before doing anything at all.
2778  */
2779  if (!force && wal_receiver_status_interval <= 0)
2780  return;
2781 
2782  /* It's legal to not pass a recvpos */
2783  if (recvpos < last_recvpos)
2784  recvpos = last_recvpos;
2785 
2786  get_flush_position(&writepos, &flushpos, &have_pending_txes);
2787 
2788  /*
2789  * No outstanding transactions to flush, we can report the latest received
2790  * position. This is important for synchronous replication.
2791  */
2792  if (!have_pending_txes)
2793  flushpos = writepos = recvpos;
2794 
2795  if (writepos < last_writepos)
2796  writepos = last_writepos;
2797 
2798  if (flushpos < last_flushpos)
2799  flushpos = last_flushpos;
2800 
2801  now = GetCurrentTimestamp();
2802 
2803  /* if we've already reported everything we're good */
2804  if (!force &&
2805  writepos == last_writepos &&
2806  flushpos == last_flushpos &&
2807  !TimestampDifferenceExceeds(send_time, now,
2809  return;
2810  send_time = now;
2811 
2812  if (!reply_message)
2813  {
2814  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
2815 
2816  reply_message = makeStringInfo();
2817  MemoryContextSwitchTo(oldctx);
2818  }
2819  else
2820  resetStringInfo(reply_message);
2821 
 /*
  * Field mapping: recvpos is sent as the "write" position, flushpos as
  * "flush" and writepos as "apply".
  */
2822  pq_sendbyte(reply_message, 'r');
2823  pq_sendint64(reply_message, recvpos); /* write */
2824  pq_sendint64(reply_message, flushpos); /* flush */
2825  pq_sendint64(reply_message, writepos); /* apply */
2826  pq_sendint64(reply_message, now); /* sendTime */
2827  pq_sendbyte(reply_message, requestReply); /* replyRequested */
2828 
2829  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2830  force,
2831  LSN_FORMAT_ARGS(recvpos),
2832  LSN_FORMAT_ARGS(writepos),
2833  LSN_FORMAT_ARGS(flushpos));
2834 
2835  walrcv_send(LogRepWorkerWalRcvConn,
2836  reply_message->data, reply_message->len);
2837 
2838  if (recvpos > last_recvpos)
2839  last_recvpos = recvpos;
2840  if (writepos > last_writepos)
2841  last_writepos = writepos;
2842  if (flushpos > last_flushpos)
2843  last_flushpos = flushpos;
2844 }
2845 
2846 /*
2847  * Reread subscription info if needed. Most changes will cause the worker to exit.
 *
 * Does nothing while MySubscriptionValid is true. Otherwise it reloads the
 * subscription from the catalog, starting a transaction if none is in
 * progress. It exits the worker with proc_exit(0) if the subscription was
 * removed or disabled, or if a connection-affecting parameter changed
 * (conninfo, name, slot name, binary, streaming, publications); the launcher
 * then starts a new worker. A changed dbid is an ERROR. Otherwise the new
 * subscription replaces MySubscription, synchronous_commit is re-applied,
 * and MySubscriptionValid is set again.
2848  */
2849 static void
2851 {
2852  MemoryContext oldctx;
2854  bool started_tx = false;
2855 
2856  /* When cache state is valid there is nothing to do here. */
2857  if (MySubscriptionValid)
2858  return;
2859 
2860  /* This function might be called inside or outside of transaction. */
2861  if (!IsTransactionState())
2862  {
2864  started_tx = true;
2865  }
2866 
2867  /* Ensure allocations in permanent context. */
2868  oldctx = MemoryContextSwitchTo(ApplyContext);
2869 
2870  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
2871 
2872  /*
2873  * Exit if the subscription was removed. This normally should not happen
2874  * as the worker gets killed during DROP SUBSCRIPTION.
2875  */
2876  if (!newsub)
2877  {
2878  ereport(LOG,
2879  (errmsg("logical replication apply worker for subscription \"%s\" will "
2880  "stop because the subscription was removed",
2881  MySubscription->name)));
2882 
2883  proc_exit(0);
2884  }
2885 
2886  /*
2887  * Exit if the subscription was disabled. This normally should not happen
2888  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
2889  */
2890  if (!newsub->enabled)
2891  {
2892  ereport(LOG,
2893  (errmsg("logical replication apply worker for subscription \"%s\" will "
2894  "stop because the subscription was disabled",
2895  MySubscription->name)));
2896 
2897  proc_exit(0);
2898  }
2899 
2900  /* !slotname should never happen when enabled is true. */
2901  Assert(newsub->slotname);
2902 
2903  /* two-phase should not be altered */
2904  Assert(newsub->twophasestate == MySubscription->twophasestate);
2905 
2906  /*
2907  * Exit if any parameter that affects the remote connection was changed.
2908  * The launcher will start a new worker.
2909  */
2910  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
2911  strcmp(newsub->name, MySubscription->name) != 0 ||
2912  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
2913  newsub->binary != MySubscription->binary ||
2914  newsub->stream != MySubscription->stream ||
2915  !equal(newsub->publications, MySubscription->publications))
2916  {
2917  ereport(LOG,
2918  (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
2919  MySubscription->name)));
2920 
2921  proc_exit(0);
2922  }
2923 
2924  /* Check for other changes that should never happen too. */
2925  if (newsub->dbid != MySubscription->dbid)
2926  {
2927  elog(ERROR, "subscription %u changed unexpectedly",
2929  }
2930 
2931  /* Clean old subscription info and switch to new one. */
2932  FreeSubscription(MySubscription);
2933  MySubscription = newsub;
2934 
2935  MemoryContextSwitchTo(oldctx);
2936 
2937  /* Change synchronous commit according to the user's wishes */
2938  SetConfigOption("synchronous_commit", MySubscription->synccommit,
2940 
2941  if (started_tx)
2943 
2944  MySubscriptionValid = true;
2945 }
2946 
2947 /*
2948  * Callback from subscription syscache invalidation.
2949  */
2950 static void
2951 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
2952 {
2953  MySubscriptionValid = false;
2954 }
2955 
2956 /*
2957  * subxact_info_write
2958  * Store information about subxacts for a toplevel transaction.
2959  *
2960  * For each subxact we store the offset of its first change in the main file.
2961  * The file is always over-written as a whole: first the subxact count, then
 * the array of SubXactInfo entries. If there are no subxacts, the file is
 * deleted instead of written. In both cases the in-memory subxact info is
 * released afterwards.
2962  *
2963  * XXX We should only store subxacts that were not aborted yet.
2964  */
2965 static void
2967 {
2968  char path[MAXPGPATH];
2969  Size len;
2970  BufFile *fd;
2971 
2973 
2974  /* construct the subxact filename */
2975  subxact_filename(path, subid, xid);
2976 
2977  /* Delete the subxacts file, if it exists. */
2978  if (subxact_data.nsubxacts == 0)
2979  {
2982 
2983  return;
2984  }
2985 
2986  /*
2987  * Create the subxact file if it does not already exist, otherwise open the
2988  * existing file.
2989  */
2991  true);
2992  if (fd == NULL)
2994 
2995  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2996 
2997  /* Write the subxact count and subxact info */
2998  BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
2999  BufFileWrite(fd, subxact_data.subxacts, len);
3000 
3001  BufFileClose(fd);
3002 
3003  /* free the memory allocated for subxact info */
3005 }
3006 
3007 /*
3008  * subxact_info_read
3009  * Restore information about subxacts of a streamed transaction.
3010  *
3011  * Read information about subxacts into the structure subxact_data that can be
3012  * used later.
 *
 * Expects subxact_data to be empty on entry (asserted). If the subxact file
 * does not exist, subxact_data is left empty. Otherwise the stored count
 * and SubXactInfo array are loaded into memory allocated in
 * LogicalStreamingContext. A short read raises an ERROR.
3013  */
3014 static void
3016 {
3017  char path[MAXPGPATH];
3018  Size len;
3019  BufFile *fd;
3020  MemoryContext oldctx;
3021 
3022  Assert(!subxact_data.subxacts);
3023  Assert(subxact_data.nsubxacts == 0);
3024  Assert(subxact_data.nsubxacts_max == 0);
3025 
3026  /*
3027  * If the subxact file doesn't exist that means we don't have any subxact
3028  * info.
3029  */
3030  subxact_filename(path, subid, xid);
3032  true);
3033  if (fd == NULL)
3034  return;
3035 
3036  /* read number of subxact items */
3037  if (BufFileRead(fd, &subxact_data.nsubxacts,
3038  sizeof(subxact_data.nsubxacts)) !=
3039  sizeof(subxact_data.nsubxacts))
3040  ereport(ERROR,
3042  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
3043  path)));
3044 
3045  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
3046 
3047  /*
 * We keep the maximum as a power of 2. subxact_info_add() grows the
 * array by doubling nsubxacts_max once it is full.
 */
3048  subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
3049 
3050  /*
3051  * Allocate subxact information in the logical streaming context. We need
3052  * this information during the complete stream so that we can add the sub
3053  * transaction info to this. On stream stop we will flush this information
3054  * to the subxact file and reset the logical streaming context.
3055  */
3056  oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
3057  subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
3058  sizeof(SubXactInfo));
3059  MemoryContextSwitchTo(oldctx);
3060 
3061  if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
3062  ereport(ERROR,
3064  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
3065  path)));
3066 
3067  BufFileClose(fd);
3068 }
3069 
3070 /*
3071  * subxact_info_add
3072  * Add information about a subxact (offset in the main file).
 *
 * Records 'xid' together with the current position (fileno, offset) of
 * stream_fd, so the change file can later be truncated back to where this
 * subxact started. The toplevel stream_xid and subxacts that are already
 * known are not added again. The array starts at 128 entries in
 * LogicalStreamingContext and doubles when full.
3073  */
3074 static void
3076 {
3077  SubXactInfo *subxacts = subxact_data.subxacts;
3078  int64 i;
3079 
3080  /* We must have a valid top level stream xid and a stream fd. */
3082  Assert(stream_fd != NULL);
3083 
3084  /*
3085  * If the XID matches the toplevel transaction, we don't want to add it.
3086  */
3087  if (stream_xid == xid)
3088  return;
3089 
3090  /*
3091  * In most cases we're checking the same subxact as we've already seen in
3092  * the last call, which already recorded (or found) it, so return early.
3093  */
3094  if (subxact_data.subxact_last == xid)
3095  return;
3096 
3097  /* OK, remember we're processing this XID. */
3098  subxact_data.subxact_last = xid;
3099 
3100  /*
3101  * Check if the transaction is already present in the array of subxacts. We
3102  * intentionally scan the array from the tail, because we're likely adding
3103  * a change for the most recent subtransactions.
3104  *
3105  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
3106  * would allow us to use binary search here.
3107  */
3108  for (i = subxact_data.nsubxacts; i > 0; i--)
3109  {
3110  /* found, so we're done */
3111  if (subxacts[i - 1].xid == xid)
3112  return;
3113  }
3114 
3115  /* This is a new subxact, so we need to add it to the array. */
3116  if (subxact_data.nsubxacts == 0)
3117  {
3118  MemoryContext oldctx;
3119 
3120  subxact_data.nsubxacts_max = 128;
3121 
3122  /*
3123  * Allocate this memory for subxacts in per-stream context, see
3124  * subxact_info_read.
3125  */
3126  oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
3127  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
3128  MemoryContextSwitchTo(oldctx);
3129  }
3130  else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
3131  {
3132  subxact_data.nsubxacts_max *= 2;
3133  subxacts = repalloc(subxacts,
3134  subxact_data.nsubxacts_max * sizeof(SubXactInfo));
3135  }
3136 
3137  subxacts[subxact_data.nsubxacts].xid = xid;
3138 
3139  /*
3140  * Get the current offset of the stream file and store it as offset of
3141  * this subxact.
3142  */
3143  BufFileTell(stream_fd,
3144  &subxacts[subxact_data.nsubxacts].fileno,
3145  &subxacts[subxact_data.nsubxacts].offset);
3146 
3147  subxact_data.nsubxacts++;
3148  subxact_data.subxacts = subxacts;
3149 }
3150 
3151 /* format filename for file containing the info about subxacts */
3152 static inline void
3153 subxact_filename(char *path, Oid subid, TransactionId xid)
3154 {
3155  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
3156 }
3157 
3158 /* format filename for file containing serialized changes */
3159 static inline void
3160 changes_filename(char *path, Oid subid, TransactionId xid)
3161 {
3162  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
3163 }
3164 
3165 /*
3166  * stream_cleanup_files
3167  * Cleanup files for a subscription / toplevel transaction.
3168  *
3169  * Remove files with serialized changes and subxact info for a particular
3170  * toplevel transaction. Each subscription has a separate set of files
3171  * for any toplevel transaction. The file names are built from (subid, xid)
 * by changes_filename() and subxact_filename().
3172  */
3173 static void
3175 {
3176  char path[MAXPGPATH];
3177 
3178  /* Delete the changes file. */
3179  changes_filename(path, subid, xid);
3181 
3182  /* Delete the subxact file, if it exists. */
3183  subxact_filename(path, subid, xid);
3185 }
3186 
3187 /*
3188  * stream_open_file
3189  * Open a file that we'll use to serialize changes for a toplevel
3190  * transaction.
3191  *
3192  * Open a file for streamed changes from the toplevel transaction 'xid' of
3193  * subscription 'subid'. If 'first_segment' is true (the first chunk of streamed
3194  * changes for this transaction), create the buffile. Otherwise open the
3195  * previously created file read-write and position stream_fd at its end.
 * The resulting handle is stored in the global stream_fd, which must be
 * NULL on entry.
 *
 * NOTE(review): 'xid' is presumably always the current stream_xid; the
 * visible code uses the parameter, not the global, to build the name.
3196  *
3197  * This can only be called at the beginning of a "streaming" block, i.e.
3198  * between stream_start/stream_stop messages from the upstream.
3199  */
3200 static void
3201 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
3202 {
3203  char path[MAXPGPATH];
3204  MemoryContext oldcxt;
3205 
3207  Assert(OidIsValid(subid));
3209  Assert(stream_fd == NULL);
3210 
3211 
3212  changes_filename(path, subid, xid);
3213  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
3214 
3215  /*
3216  * Create/open the buffiles under the logical streaming context so that we
3217  * have those files until stream stop.
3218  */
3219  oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
3220 
3221  /*
3222  * If this is the first streamed segment, create the changes file.
3223  * Otherwise, just open the file for writing, in append mode.
3224  */
3225  if (first_segment)
3227  path);
3228  else
3229  {
3230  /*
3231  * Open the file and seek to the end of the file because we always
3232  * append the changes file.
3233  */
3235  path, O_RDWR, false);
3236  BufFileSeek(stream_fd, 0, 0, SEEK_END);
3237  }
3238 
3239  MemoryContextSwitchTo(oldcxt);
3240 }
3241 
3242 /*
3243  * stream_close_file
3244  * Close the currently open file with streamed changes.
3245  *
3246  * This can only be called at the end of a streaming block, i.e. at stream_stop
3247  * message from the upstream. stream_fd must be open on entry and is reset to
 * NULL after the file is closed.
3248  */
3249 static void
3251 {
3254  Assert(stream_fd != NULL);
3255 
3256  BufFileClose(stream_fd);
3257 
3259  stream_fd = NULL;
3260 }
3261 
3262 /*
3263  * stream_write_change
3264  * Serialize a change to a file for the current toplevel transaction.
3265  *
3266  * The change is serialized in a simple format: an int length (which counts
3267  * the action byte and the payload but not the length field itself), the
3268  * action code (identifying the message type) and the remaining message
 * contents from s->cursor onward (i.e. without the subxact TransactionId
 * value already consumed by the caller).
 *
 * NOTE(review): the length counts the action as sizeof(char) while
 * sizeof(action) bytes are written. These agree only if 'action' is a
 * char; the signature is not visible here, so confirm.
3269  */
3270 static void
3272 {
3273  int len;
3274 
3277  Assert(stream_fd != NULL);
3278 
3279  /* total on-disk size, including the action type character */
3280  len = (s->len - s->cursor) + sizeof(char);
3281 
3282  /* first write the size */
3283  BufFileWrite(stream_fd, &len, sizeof(len));
3284 
3285  /* then the action */
3286  BufFileWrite(stream_fd, &action, sizeof(action));
3287 
3288  /* and finally the remaining part of the buffer (after the XID) */
3289  len = (s->len - s->cursor);
3290 
3291  BufFileWrite(stream_fd, &s->data[s->cursor], len);
3292 }
3293 
3294 /*
3295  * Cleanup the memory for subxacts and reset the related variables.
 *
 * Frees subxact_data.subxacts if it was allocated, then returns subxact_data
 * to its empty state: no array, no last-seen subxact
 * (InvalidTransactionId), and zero count and capacity.
3296  */
3297 static inline void
3299 {
3300  if (subxact_data.subxacts)
3301  pfree(subxact_data.subxacts);
3302 
3303  subxact_data.subxacts = NULL;
3304  subxact_data.subxact_last = InvalidTransactionId;
3305  subxact_data.nsubxacts = 0;
3306  subxact_data.nsubxacts_max = 0;
3307 }
3308 
3309 /*
3310  * Form the prepared transaction GID for two_phase transactions.
3311  *
3312  * Return the GID in the supplied buffer.
3313  */
3314 static void
3315 TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
3316 {
3317  Assert(subid != InvalidRepOriginId);
3318 
3319  if (!TransactionIdIsValid(xid))
3320  ereport(ERROR,
3321  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3322  errmsg_internal("invalid two-phase transaction ID")));
3323 
3324  snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
3325 }
3326 
3327 /* Logical Replication Apply worker entry point */
3328 void
3330 {
3331  int worker_slot = DatumGetInt32(main_arg);
3332  MemoryContext oldctx;
3333  char originname[NAMEDATALEN];
3334  XLogRecPtr origin_startpos;
3335  char *myslotname;
3337  int server_version;
3338 
3339  /* Attach to slot */
3340  logicalrep_worker_attach(worker_slot);
3341 
3342  /* Setup signal handling */
3344  pqsignal(SIGTERM, die);
3346 
3347  /*
3348  * We don't currently need any ResourceOwner in a walreceiver process, but
3349  * if we did, we could call CreateAuxProcessResourceOwner here.
3350  */
3351 
3352  /* Initialise stats to a sanish value */
3355 
3356  /* Load the libpq-specific functions */
3357  load_file("libpqwalreceiver", false);
3358 
3359  /* Run as replica session replication role. */
3360  SetConfigOption("session_replication_role", "replica",
3362 
3363  /* Connect to our database. */
3366  0);
3367 
3368  /*
3369  * Set always-secure search path, so malicious users can't redirect user
3370  * code (e.g. pg_index.indexprs).
3371  */
3372  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
3373 
3374  /* Load the subscription into persistent memory context. */
3375  ApplyContext = AllocSetContextCreate(TopMemoryContext,
3376  "ApplyContext",
3379  oldctx = MemoryContextSwitchTo(ApplyContext);
3380 
3381  MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
3382  if (!MySubscription)
3383  {
3384  ereport(LOG,
3385  (errmsg("logical replication apply worker for subscription %u will not "
3386  "start because the subscription was removed during startup",
3388  proc_exit(0);
3389  }
3390 
3391  MySubscriptionValid = true;
3392  MemoryContextSwitchTo(oldctx);
3393 
3394  if (!MySubscription->enabled)
3395  {
3396  ereport(LOG,
3397  (errmsg("logical replication apply worker for subscription \"%s\" will not "
3398  "start because the subscription was disabled during startup",
3399  MySubscription->name)));
3400 
3401  proc_exit(0);
3402  }
3403 
3404  /* Setup synchronous commit according to the user's wishes */
3405  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3407 
3408  /* Keep us informed about subscription changes. */
3411  (Datum) 0);
3412 
3413  if (am_tablesync_worker())
3414  ereport(LOG,
3415  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3416  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
3417  else
3418  ereport(LOG,
3419  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3420  MySubscription->name)));
3421 
3423 
3424  /* Connect to the origin and start the replication. */
3425  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3426  MySubscription->conninfo);
3427 
3428  if (am_tablesync_worker())
3429  {
3430  char *syncslotname;
3431 
3432  /* This is table synchronization worker, call initial sync. */
3433  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
3434 
3435  /* allocate slot name in long-lived context */
3436  myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3437 
3438  pfree(syncslotname);
3439  }
3440  else
3441  {
3442  /* This is main apply worker */
3443  RepOriginId originid;
3444  TimeLineID startpointTLI;
3445  char *err;
3446 
3447  myslotname = MySubscription->slotname;
3448 
3449  /*
3450  * This shouldn't happen if the subscription is enabled, but guard
3451  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3452  * crash if slot is NULL.)
3453  */
3454  if (!myslotname)
3455  ereport(ERROR,
3456  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3457  errmsg("subscription has no replication slot set")));
3458 
3459  /* Setup replication origin tracking. */
3461  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3462  originid = replorigin_by_name(originname, true);
3463  if (!OidIsValid(originid))
3464  originid = replorigin_create(originname);
3465  replorigin_session_setup(originid);
3466  replorigin_session_origin = originid;
3467  origin_startpos = replorigin_session_get_progress(false);
3469 
3470  LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
3471  MySubscription->name, &err);
3472  if (LogRepWorkerWalRcvConn == NULL)
3473  ereport(ERROR,
3474  (errcode(ERRCODE_CONNECTION_FAILURE),
3475  errmsg("could not connect to the publisher: %s", err)));
3476 
3477  /*
3478  * We don't really use the output identify_system for anything but it
3479  * does some initializations on the upstream so let's still call it.
3480  */
3481  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
3482  }
3483 
3484  /*
3485  * Setup callback for syscache so that we know when something changes in
3486  * the subscription relation state.
3487  */
3490  (Datum) 0);
3491 
3492  /* Build logical replication streaming options. */
3493  options.logical = true;
3494  options.startpoint = origin_startpos;
3495  options.slotname = myslotname;
3496 
3497  server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
3498  options.proto.logical.proto_version =
3499  server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
3500  server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
3502 
3503  options.proto.logical.publication_names = MySubscription->publications;
3504  options.proto.logical.binary = MySubscription->binary;
3505  options.proto.logical.streaming = MySubscription->stream;
3506  options.proto.logical.twophase = false;
3507 
3508  if (!am_tablesync_worker())
3509  {
3510  /*
3511  * Even when the two_phase mode is requested by the user, it remains
3512  * as the tri-state PENDING until all tablesyncs have reached READY
3513  * state. Only then, can it become ENABLED.
3514  *
3515  * Note: If the subscription has no tables then leave the state as
3516  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
3517  * work.
3518  */
3519  if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
3521  {
3522  /* Start streaming with two_phase enabled */
3523  options.proto.logical.twophase = true;
3524  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3525 
3530  }
3531  else
3532  {
3533  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3534  }
3535 
3536  ereport(DEBUG1,
3537  (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.",
3538  MySubscription->name,
3539  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
3540  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
3541  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
3542  "?")));
3543  }
3544  else
3545  {
3546  /* Start normal logical streaming replication. */
3547  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3548  }
3549 
3550  /* Run the main loop. */
3551  LogicalRepApplyLoop(origin_startpos);
3552 
3553  proc_exit(0);
3554 }
3555 
3556 /*
3557  * Is current process a logical replication worker?
3558  */
3559 bool
3561 {
3562  return MyLogicalRepWorker != NULL;
3563 }
3564 
3565 /* Error callback to give more context info about the change being applied */
3566 static void
3568 {
3571 
3572  if (apply_error_callback_arg.command == 0)
3573  return;
3574 
3575  initStringInfo(&buf);
3576  appendStringInfo(&buf, _("processing remote data during \"%s\""),
3577  logicalrep_message_type(errarg->command));
3578 
3579  /* append relation information */
3580  if (errarg->rel)
3581  {
3582  appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
3583  errarg->rel->remoterel.nspname,
3584  errarg->rel->remoterel.relname);
3585  if (errarg->remote_attnum >= 0)
3586  appendStringInfo(&buf, _(" column \"%s\""),
3587  errarg->rel->remoterel.attnames[errarg->remote_attnum]);
3588  }
3589 
3590  /* append transaction information */
3591  if (TransactionIdIsNormal(errarg->remote_xid))
3592  {
3593  appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
3594  if (errarg->ts != 0)
3595  appendStringInfo(&buf, _(" at %s"),
3596  timestamptz_to_str(errarg->ts));
3597  }
3598 
3599  errcontext("%s", buf.data);
3600  pfree(buf.data);
3601 }
3602 
3603 /* Set transaction information of apply error callback */
3604 static inline void
3606 {
3607  apply_error_callback_arg.remote_xid = xid;
3608  apply_error_callback_arg.ts = ts;
3609 }
3610 
3611 /* Reset all information of apply error callback */
3612 static inline void
3614 {
3615  apply_error_callback_arg.command = 0;
3616  apply_error_callback_arg.rel = NULL;
3617  apply_error_callback_arg.remote_attnum = -1;
3619 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
Subscription * MySubscription
Definition: worker.c:247
static void apply_handle_type(StringInfo s)
Definition: worker.c:1502
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:352
#define NIL
Definition: pg_list.h:65
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:1975
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:751
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:36
static void stream_close_file(void)
Definition: worker.c:3250
TransactionId xid
Definition: logicalproto.h:125
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:3271
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:1518
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:2761
static void subxact_info_add(TransactionId xid)
Definition: worker.c:3075
Relation ri_RelationDesc
Definition: execnodes.h:411
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:3015
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:447
#define AllocSetContextCreate
Definition: memutils.h:173
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:420
#define DEBUG1
Definition: elog.h:25
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:131
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
dlist_node * cur
Definition: ilist.h:180
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:2478
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:245
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1831
uint32 TimeLineID
Definition: xlogdefs.h:59
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:548
int fileno
Definition: worker.c:264
MemoryContext TopTransactionContext
Definition: mcxt.c:53
void AcceptInvalidationMessages(void)
Definition: inval.c:725
CommandId es_output_cid
Definition: execnodes.h:572
PartitionTupleRouting * proute
Definition: worker.c:215
static XLogRecPtr remote_final_lsn
Definition: worker.c:251
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4782
#define WL_TIMEOUT
Definition: latch.h:128
void ProcessConfigFile(GucContext context)
static void apply_handle_insert(StringInfo s)
Definition: worker.c:1535
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
uint32 TransactionId
Definition: c.h:587
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup)
Definition: worker.c:1760
static dlist_head lsn_mapping
Definition: worker.c:204
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1109
#define DatumGetInt32(X)
Definition: postgres.h:516
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:662
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3149
#define RelationGetDescr(relation)
Definition: rel.h:503
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:587
int LOCKMODE
Definition: lockdefs.h:26
LogicalRepMsgType command
Definition: worker.c:221
#define LOGICALREP_TWOPHASE_STATE_DISABLED
dlist_node node
Definition: worker.c:199
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:3160
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1198
int64 timestamp
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
int64 TimestampTz
Definition: timestamp.h:39
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1192
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:412
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:602
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
void CommitTransactionCommand(void)
Definition: xact.c:2949
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:2497
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:354
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:873
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:286
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:422
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2482
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:683
Expr * expression_planner(Expr *expr)
Definition: planner.c:5807
#define walrcv_server_version(conn)
Definition: walreceiver.h:414
int maplen
Definition: attmap.h:37
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:65
XLogRecPtr last_lsn
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1098
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:418
union WalRcvStreamOptions::@104 proto
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3715
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1206
XLogRecPtr remote_end
Definition: worker.c:201
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:92
CmdType operation
Definition: execnodes.h:1188
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:240
static void reset_apply_error_context_info(void)
Definition: worker.c:3613
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:433
Datum * tts_values
Definition: tuptable.h:126
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8680
LogicalRepRelMapEntry * targetRel
Definition: worker.c:210
void PopActiveSnapshot(void)
Definition: snapmgr.c:774
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:674
StringInfoData * colvalues
Definition: logicalproto.h:81
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:3153
void pgstat_report_stat(bool disconnect)
Definition: pgstat.c:858
EState * state
Definition: execnodes.h:968
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1071
List * es_range_table
Definition: execnodes.h:560
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
void FileSetInit(FileSet *fileset)
Definition: fileset.c:54
TransactionId subxact_last
Definition: worker.c:273
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:920
bool IsLogicalWorker(void)
Definition: worker.c:3560
void(* callback)(void *arg)
Definition: elog.h:247
int wal_receiver_status_interval
Definition: walreceiver.c:89
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
struct ErrorContextCallback * previous
Definition: elog.h:246
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
#define OidIsValid(objectId)
Definition: c.h:710
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:93
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:359
uint32 nsubxacts
Definition: worker.c:271
void BufFileClose(BufFile *file)
Definition: buffile.c:407
void ResetLatch(Latch *latch)
Definition: latch.c:660
int wal_receiver_timeout
Definition: walreceiver.c:90
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:2434
Definition: attmap.h:34
struct ApplyErrorCallbackArg ApplyErrorCallbackArg
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2834
#define LOGICALREP_TWOPHASE_STATE_ENABLED
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:422
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
ErrorContextCallback * error_context_stack
Definition: elog.c:93
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:531
#define list_make1(x1)
Definition: pg_list.h:206
#define NAMEDATALEN
void FreeExecutorState(EState *estate)
Definition: execUtils.c:186
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
Definition: executor.h:533
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:157
static StringInfoData reply_message
Definition: walreceiver.c:118
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:2951
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
FileSet * stream_fileset
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:91
static void apply_error_callback(void *arg)
Definition: worker.c:3567
void pfree(void *pointer)
Definition: mcxt.c:1169
ModifyTableState * mtstate
Definition: worker.c:214
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:515
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
SubXactInfo * subxacts
Definition: worker.c:274
static void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:3174
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:1299
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:46
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:887
PlanState ps
Definition: execnodes.h:1187
struct ApplySubXactData ApplySubXactData
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
Definition: worker.c:195
static ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:230
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1165
struct ApplyExecutionData ApplyExecutionData
bool in_remote_transaction
Definition: worker.c:250
void fill_extraUpdatedCols(RangeTblEntry *target_rte, Relation target_relation)
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1735
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:739
Definition: guc.h:75
#define MAXPGPATH
static bool in_streamed_transaction
Definition: worker.c:254
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:514
XLogRecPtr reply_lsn
#define GIDSIZE
Definition: xact.h:31
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:690
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1662
static bool am_tablesync_worker(void)
static void apply_handle_delete(StringInfo s)
Definition: worker.c:1824
char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1164
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:1450
#define DEBUG2
Definition: elog.h:24
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:1603
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:155
char * c
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1029
void logicalrep_worker_attach(int slot)
Definition: launcher.c:565
#define NoLock
Definition: lockdefs.h:34
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:8110
static char * buf
Definition: pg_test_fsync.c:68
ResultRelInfo * targetRelInfo
Definition: worker.c:211
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:680
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
bool * tts_isnull
Definition: tuptable.h:128
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:316
List * es_opened_result_relations
Definition: execnodes.h:578
#define RowExclusiveLock
Definition: lockdefs.h:38
#define LOGICALREP_TWOPHASE_STATE_PENDING
int errcode_for_file_access(void)
Definition: elog.c:721
#define SIGHUP
Definition: win32_port.h:167
TimestampTz ts
Definition: worker.c:227
EState * estate
Definition: worker.c:208
#define InvalidTransactionId
Definition: transam.h:31
#define RelationGetRelationName(relation)
Definition: rel.h:511
XLogRecPtr startpoint
Definition: walreceiver.h:170
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:441
int pgsocket
Definition: port.h:31
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:409
static void apply_handle_begin(StringInfo s)
Definition: worker.c:783
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:258
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2887
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2821
AttrMap * attrMap
Definition: tupconvert.h:28
MemoryContext TopMemoryContext
Definition: mcxt.c:48
EState * CreateExecutorState(void)
Definition: execUtils.c:90
Definition: guc.h:72
int my_log2(long num)
Definition: dynahash.c:1765
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:1624
List * lappend(List *list, void *datum)
Definition: list.c:336
TransactionId xid
Definition: worker.c:263
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:829
MemoryContext ApplyContext
Definition: worker.c:240
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void BeginTransactionBlock(void)
Definition: xact.c:3647
XLogRecPtr final_lsn
Definition: logicalproto.h:123
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:803
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2413
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:225
Node * build_column_default(Relation rel, int attrno)
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:1416
List * es_tupleTable
Definition: execnodes.h:602
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1498
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1191
void * palloc0(Size size)
Definition: mcxt.c:1093
char * s2
static void apply_handle_update(StringInfo s)
Definition: worker.c:1658
uintptr_t Datum
Definition: postgres.h:411
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1193
void CommandCounterIncrement(void)
Definition: xact.c:1021
#define PGINVALID_SOCKET
Definition: port.h:33
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1255
static void begin_replication_step(void)
Definition: worker.c:370
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:1946
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:1894
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5752
Plan * plan
Definition: execnodes.h:966
static void apply_handle_relation(StringInfo s)
Definition: worker.c:1485
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:528
int16 attnum
Definition: pg_attribute.h:83
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:933
#define ereport(elevel,...)
Definition: elog.h:157
Bitmapset * updatedCols
Definition: parsenodes.h:1161
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:37
XLogRecPtr local_end
Definition: worker.c:200
static MemoryContext LogicalStreamingContext
Definition: worker.c:243
struct SubXactInfo SubXactInfo
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:755
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1409
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4670
static void apply_dispatch(StringInfo s)
Definition: worker.c:2314
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:38
static void maybe_reread_subscription(void)
Definition: worker.c:2850
#define makeNode(_type_)
Definition: nodes.h:584
TimestampTz last_recv_time
static MemoryContext ApplyMessageContext
Definition: worker.c:239
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:689
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:362
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:177
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:262
bool AllTablesyncsReady(void)
Definition: tablesync.c:1230
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
RepOriginId replorigin_session_origin
Definition: origin.c:154
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:631
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1080
void StartTransactionCommand(void)
Definition: xact.c:2848
AttrNumber * attnums
Definition: attmap.h:36
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
LogicalRepRelMapEntry * rel
Definition: worker.c:222
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
LogicalRepMsgType
Definition: logicalproto.h:51
static int server_version
Definition: pg_dumpall.c:83
size_t Size
Definition: c.h:540
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:477
TransactionId remote_xid
Definition: worker.c:226
bool IsTransactionState(void)
Definition: xact.c:371
static void end_replication_step(void)
Definition: worker.c:393
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:424
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
int WalWriterDelay
Definition: walwriter.c:70
bool MySubscriptionValid
Definition: worker.c:248
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1182
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:322
off_t offset
Definition: worker.c:265
#define InvalidRepOriginId
Definition: origin.h:33
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:500
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:538
void FreeSubscription(Subscription *sub)
RTEKind rtekind
Definition: parsenodes.h:1007
#define AccessExclusiveLock
Definition: lockdefs.h:45
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:256
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:2966
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4690
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:833
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:264
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1059
void * palloc(Size size)
Definition: mcxt.c:1062
TupleConversionMap * ri_RootToPartitionMap
Definition: execnodes.h:513
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pgstat_report_activity(BackendState state, const char *cmd_str)
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1286
static void stream_open_file(Oid subid, TransactionId xid, bool first)
Definition: worker.c:3201
static ApplySubXactData subxact_data
Definition: worker.c:277
static BufFile * stream_fd
Definition: worker.c:259
uint32 nsubxacts_max
Definition: worker.c:272
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:2513
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:232
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int i
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:555
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define errcontext
Definition: elog.h:204
static void cleanup_subxact_info(void)
Definition: worker.c:3298
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1697
void * arg
struct Latch * MyLatch
Definition: globals.c:57
struct FlushPosition FlushPosition
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:589
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
Definition: worker.c:3315
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:123
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static void set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
Definition: worker.c:3605
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:761
TimestampTz committime
Definition: logicalproto.h:124
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:598
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:974
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
Definition: pg_list.h:50
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1899
#define snprintf
Definition: port.h:217
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1644
#define WL_LATCH_SET
Definition: latch.h:125
#define _(x)
Definition: elog.c:89
#define RelationGetRelid(relation)
Definition: rel.h:477
static TransactionId stream_xid
Definition: worker.c:256
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
CmdType
Definition: nodes.h:680
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
#define die(msg)
Definition: pg_test_fsync.c:97
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4761
TimestampTz committime
Definition: logicalproto.h:132
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:3329
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:1151
uint32 LogicalRepRelId
Definition: logicalproto.h:95
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1552
#define lfirst_oid(lc)
Definition: pg_list.h:171
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:227
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:268
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:853
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5781
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:2199
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)