worker.c
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2021, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they
12  * arrive from the remote logical replication stream.
13  *
14  * The main worker (apply) is started by the logical replication launcher
15  * for every enabled subscription in a database. It uses the walsender
16  * protocol to communicate with the publisher.
17  *
18  * This module contains the server-facing code and shares the libpqwalreceiver
19  * module with the walreceiver to provide the libpq-specific functionality.
20  *
21  *
22  * STREAMED TRANSACTIONS
23  * ---------------------
24  * Streamed transactions (large transactions exceeding a memory limit on the
25  * upstream) are not applied immediately, but instead, the data is written
26  * to temporary files and then applied at once when the final commit arrives.
27  *
28  * Unlike the regular (non-streamed) case, handling streamed transactions
29  * also requires handling aborts of both the toplevel transaction and of
30  * subtransactions. This is achieved by tracking offsets for subtransactions,
31  * which are then used to truncate the file with serialized changes.
32  *
33  * The files are placed in the temporary-file directory by default, and the
34  * filenames include both the XID of the toplevel transaction and the OID of
35  * the subscription. This is necessary so that different workers processing a
36  * remote transaction with the same XID don't interfere with each other.
37  *
38  * We use BufFiles instead of plain temporary files because (a) the BufFile
39  * infrastructure supports temporary files that exceed the OS file size
40  * limit, (b) it provides automatic cleanup on error, and (c) it allows the
41  * files to survive across local transactions, so they can be opened at
42  * stream start and closed at stream stop. We use the SharedFileSet
43  * infrastructure because without it the files would be deleted as soon as
44  * they are closed, and keeping the stream files open across start/stop
45  * streams would consume a lot of memory (more than 8K for each BufFile, and
46  * there could be many such BufFiles, as the subscriber may receive multiple
47  * start/stop streams for different transactions before getting the commit).
48  * Moreover, without SharedFileSet we would also need to invent a new way to
49  * pass filenames to the BufFile APIs so that the desired file could be
50  * reopened across multiple stream-open calls for the same
51  * transaction.
52  *
53  * TWO_PHASE TRANSACTIONS
54  * ----------------------
55  * Two-phase transactions are replayed at prepare and then committed or
56  * rolled back at commit prepared and rollback prepared respectively. It is
57  * possible for a prepared transaction to arrive at the apply worker while
58  * the tablesync is busy doing the initial copy. In this case, the apply
59  * worker skips all the prepared operations (e.g. inserts) while the tablesync
60  * is still busy (see the condition in should_apply_changes_for_rel). The
61  * tablesync worker might not see such a prepared transaction at all, because,
62  * say, it was prior to the initial consistent point, but it might have seen
63  * some later commits. The tablesync worker will then exit without doing
64  * anything for the prepared transaction skipped by the apply worker, as its
65  * sync location will already be ahead of the apply worker's current location.
66  * This would lead to an "empty prepare", because later, when the apply worker
67  * handles the commit prepared, there is nothing in it (the inserts were skipped earlier).
68  *
69  * To avoid this and similar prepare confusions, the subscription's two_phase
70  * commit is enabled only after the initial sync is over. The two_phase option
71  * is implemented as a tri-state with values DISABLED, PENDING, and
72  * ENABLED.
73  *
74  * Even if the user specifies they want a subscription with two_phase = on,
75  * internally it will start with a tri-state of PENDING which only becomes
76  * ENABLED after all tablesync initializations are completed - i.e. when all
77  * tablesync workers have reached their READY state. In other words, the value
78  * PENDING is only a temporary state for subscription start-up.
79  *
80  * Until the two_phase is properly available (ENABLED) the subscription will
81  * behave as if two_phase = off. When the apply worker detects that all
82  * tablesyncs have become READY (while the tri-state was PENDING) it will
83  * restart the apply worker process. This happens in
84  * process_syncing_tables_for_apply.
85  *
86  * When the (re-started) apply worker finds that all tablesyncs are READY for
87  * a two_phase tri-state of PENDING, it starts streaming messages with the
88  * two_phase option, which in turn enables the decoding of two-phase commits
89  * at the publisher. Then, it updates the tri-state value from PENDING to
90  * ENABLED. Now, it is possible that during the time we had not enabled
91  * two_phase, the publisher (replication server) skipped some prepares, but we
92  * ensure that such prepares are sent along with commit prepared; see
93  * ReorderBufferFinishPrepared.
94  *
95  * If the subscription has no tables then a two_phase tri-state PENDING is
96  * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
97  * PUBLICATION which might otherwise be disallowed (see below).
98  *
99  * If ever a user needs to be aware of the tri-state value, they can fetch it
100  * from the pg_subscription catalog (see column subtwophasestate).
101  *
102  * We don't allow toggling the two_phase option of a subscription because it
103  * can lead to an inconsistent replica. Consider: initially it was on and we
104  * received some prepare, then we turn it off; at commit time the server will
105  * then send the entire transaction data along with the commit. With some more
106  * analysis we could allow changing this option from off to on, but it is not
107  * clear that alone would be useful.
108  *
109  * Finally, to avoid the problems mentioned in the previous paragraphs from
110  * any subsequent (not-READY) tablesyncs (which would require toggling the
111  * two_phase option from 'on' to 'off' and then back to 'on'), there is a
112  * restriction on ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not
113  * permitted when the two_phase tri-state is ENABLED, except when copy_data = false.
114  *
115  * We can legitimately get a prepare for the same GID more than once, when we
116  * have defined multiple subscriptions for publications on the same server and
117  * the prepared transaction has operations on tables subscribed to by those
118  * subscriptions. In such cases, if we used the GID sent by the publisher, one
119  * of the prepares would succeed and the others would fail, in which case the
120  * server would send them again. This can lead to a deadlock if the user has
121  * set synchronous_standby_names for all the subscriptions on the subscriber.
122  * To avoid such deadlocks, we generate a unique GID (consisting of the
123  * subscription OID and the XID of the prepared transaction) for each prepared
124  * transaction on the subscriber.
125  *-------------------------------------------------------------------------
126  */
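
As a point of reference for the spool-file naming described above, here is a minimal editorial sketch (not part of this file) of how helpers like changes_filename() and subxact_filename(), whose prototypes appear further down, could combine the subscription OID and toplevel XID. The function names and the ".changes"/".subxacts" suffixes are illustrative assumptions; the real format strings are not shown in this excerpt.

/* Editorial sketch only -- mirrors the naming scheme described above. */
static inline void
sketch_changes_filename(char *path, Oid subid, TransactionId xid)
{
	/* e.g. "16394-733.changes": unique per subscription OID and remote XID */
	snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
}

static inline void
sketch_subxact_filename(char *path, Oid subid, TransactionId xid)
{
	/* subtransaction offsets for the same toplevel transaction */
	snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
}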
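Likewise, a minimal sketch of the unique-GID scheme described in the last paragraph of the header comment. The "pg_gid_" prefix and the function name are assumptions for illustration only; TwoPhaseTransactionGid() later in the file is the authoritative rule.

/* Editorial sketch: a GID unique per (subscription, remote xid). */
static void
sketch_two_phase_gid(Oid subid, TransactionId xid, char *gid, int szgid)
{
	/*
	 * Because the pair (subscription OID, remote XID) is unique on this
	 * subscriber, two subscriptions preparing the same remote transaction
	 * cannot collide on the GID.  The prefix is illustrative.
	 */
	snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}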
127 
128 #include "postgres.h"
129 
130 #include <sys/stat.h>
131 #include <unistd.h>
132 
133 #include "access/table.h"
134 #include "access/tableam.h"
135 #include "access/twophase.h"
136 #include "access/xact.h"
137 #include "access/xlog_internal.h"
138 #include "catalog/catalog.h"
139 #include "catalog/namespace.h"
140 #include "catalog/partition.h"
141 #include "catalog/pg_inherits.h"
142 #include "catalog/pg_subscription.h"
143 #include "catalog/pg_subscription_rel.h"
144 #include "catalog/pg_tablespace.h"
145 #include "commands/tablecmds.h"
146 #include "commands/tablespace.h"
147 #include "commands/trigger.h"
148 #include "executor/executor.h"
149 #include "executor/execPartition.h"
151 #include "funcapi.h"
152 #include "libpq/pqformat.h"
153 #include "libpq/pqsignal.h"
154 #include "mb/pg_wchar.h"
155 #include "miscadmin.h"
156 #include "nodes/makefuncs.h"
157 #include "optimizer/optimizer.h"
158 #include "pgstat.h"
159 #include "postmaster/bgworker.h"
160 #include "postmaster/interrupt.h"
161 #include "postmaster/postmaster.h"
162 #include "postmaster/walwriter.h"
163 #include "replication/decode.h"
164 #include "replication/logical.h"
165 #include "replication/logicalproto.h"
166 #include "replication/logicalrelation.h"
167 #include "replication/logicalworker.h"
168 #include "replication/origin.h"
170 #include "replication/snapbuild.h"
171 #include "replication/walreceiver.h"
172 #include "replication/worker_internal.h"
173 #include "rewrite/rewriteHandler.h"
174 #include "storage/buffile.h"
175 #include "storage/bufmgr.h"
176 #include "storage/fd.h"
177 #include "storage/ipc.h"
178 #include "storage/lmgr.h"
179 #include "storage/proc.h"
180 #include "storage/procarray.h"
181 #include "tcop/tcopprot.h"
182 #include "utils/builtins.h"
183 #include "utils/catcache.h"
184 #include "utils/dynahash.h"
185 #include "utils/datum.h"
186 #include "utils/fmgroids.h"
187 #include "utils/guc.h"
188 #include "utils/inval.h"
189 #include "utils/lsyscache.h"
190 #include "utils/memutils.h"
191 #include "utils/rel.h"
192 #include "utils/syscache.h"
193 #include "utils/timeout.h"
194 
195 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
196 
197 typedef struct FlushPosition
198 {
202 } FlushPosition;
203 
205 
206 typedef struct SlotErrCallbackArg
207 {
208  LogicalRepRelMapEntry *rel;
209  int remote_attnum;
210 } SlotErrCallbackArg;
211 
212 typedef struct ApplyExecutionData
213 {
214  EState *estate; /* executor state, used to track resources */
215 
216  LogicalRepRelMapEntry *targetRel; /* replication target rel */
217  ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
218 
219  /* These fields are used when the target relation is partitioned: */
220  ModifyTableState *mtstate; /* dummy ModifyTable state */
221  PartitionTupleRouting *proute; /* partition routing info */
222 } ApplyExecutionData;
223 
224 /*
225  * Stream xid hash entry. Whenever we see a new xid we create this entry in
226  * the xidhash and, along with it, create the streaming file and store the
227  * fileset handle. The subxact file is created iff there is any subxact info
228  * under this xid. This entry is used on subsequent streams for the xid to
229  * look up the corresponding fileset handles, so storing them in the hash speeds up the lookup.
230  */
231 typedef struct StreamXidHash
232 {
233  TransactionId xid; /* xid is the hash key and must be first */
234  SharedFileSet *stream_fileset; /* shared file set for stream data */
235  SharedFileSet *subxact_fileset; /* shared file set for subxact info */
236 } StreamXidHash;
237 
240 
241 /* per stream context for streaming transactions */
243 
245 
247 bool MySubscriptionValid = false;
248 
251 
252 /* fields valid only when processing streamed transaction */
253 static bool in_streamed_transaction = false;
254 
255 static TransactionId stream_xid = InvalidTransactionId;
256 
257 /*
258  * Hash table for storing the streaming xid information along with shared file
259  * set for streaming and subxact files.
260  */
261 static HTAB *xidhash = NULL;
262 
263 /* BufFile handle of the current streaming file */
264 static BufFile *stream_fd = NULL;
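
To make the xidhash usage described above concrete, here is a small editorial sketch (not part of this file, function name hypothetical) of how a stream-open path might find or create the StreamXidHash entry for a streamed transaction, using the standard dynahash hash_search() API.

/*
 * Editorial sketch: find or create the xidhash entry for a streamed
 * transaction.  hash_search() is the regular dynahash API; the filesets
 * are left NULL here and would be created when the spool files are
 * first opened.
 */
static StreamXidHash *
sketch_lookup_stream_xid(TransactionId xid)
{
	bool		found;
	StreamXidHash *ent;

	ent = (StreamXidHash *) hash_search(xidhash, (void *) &xid,
										HASH_ENTER, &found);
	if (!found)
	{
		/* first time we see this xid: no filesets exist yet */
		ent->stream_fileset = NULL;
		ent->subxact_fileset = NULL;
	}

	return ent;
}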
265 
266 typedef struct SubXactInfo
267 {
268  TransactionId xid; /* XID of the subxact */
269  int fileno; /* file number in the buffile */
270  off_t offset; /* offset in the file */
271 } SubXactInfo;
272 
273 /* Sub-transaction data for the current streaming transaction */
274 typedef struct ApplySubXactData
275 {
276  uint32 nsubxacts; /* number of sub-transactions */
277  uint32 nsubxacts_max; /* current capacity of subxacts */
278  TransactionId subxact_last; /* xid of the last sub-transaction */
279  SubXactInfo *subxacts; /* sub-xact offset in changes file */
280 } ApplySubXactData;
281 
283 
284 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
285 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
286 
287 /*
288  * Information about subtransactions of a given toplevel transaction.
289  */
290 static void subxact_info_write(Oid subid, TransactionId xid);
291 static void subxact_info_read(Oid subid, TransactionId xid);
292 static void subxact_info_add(TransactionId xid);
293 static inline void cleanup_subxact_info(void);
294 
295 /*
296  * Serialize and deserialize changes for a toplevel transaction.
297  */
298 static void stream_cleanup_files(Oid subid, TransactionId xid);
299 static void stream_open_file(Oid subid, TransactionId xid, bool first);
300 static void stream_write_change(char action, StringInfo s);
301 static void stream_close_file(void);
302 
303 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
304 
305 static void store_flush_position(XLogRecPtr remote_lsn);
306 
307 static void maybe_reread_subscription(void);
308 
309 /* prototype needed because of stream_commit */
310 static void apply_dispatch(StringInfo s);
311 
312 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
314  ResultRelInfo *relinfo,
315  TupleTableSlot *remoteslot);
317  ResultRelInfo *relinfo,
318  TupleTableSlot *remoteslot,
319  LogicalRepTupleData *newtup);
321  ResultRelInfo *relinfo,
322  TupleTableSlot *remoteslot);
323 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
324  LogicalRepRelation *remoterel,
325  TupleTableSlot *remoteslot,
326  TupleTableSlot **localslot);
328  TupleTableSlot *remoteslot,
329  LogicalRepTupleData *newtup,
330  CmdType operation);
331 
332 /* Compute GID for two_phase transactions */
333 static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
334 
335 /* Common streaming function to apply all the spooled messages */
336 static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
337 
338 /*
339  * Should this worker apply changes for the given relation?
340  *
341  * This is mainly needed for the initial relation data sync, as that runs in
342  * a separate worker process in parallel, and we need some way to skip
343  * changes coming to the main apply worker during the sync of a table.
344  *
345  * Note that we need to do a less-than-or-equal comparison for the SYNCDONE
346  * state because it might hold the position of the end of the initial slot
347  * consistent point WAL record + 1 (i.e. the start of the next record), and
348  * that next record can be the COMMIT of the transaction we are now
349  * processing (which is what we set remote_final_lsn to in apply_handle_begin).
350  */
351 static bool
352 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
353 {
354  if (am_tablesync_worker())
355  return MyLogicalRepWorker->relid == rel->localreloid;
356  else
357  return (rel->state == SUBREL_STATE_READY ||
358  (rel->state == SUBREL_STATE_SYNCDONE &&
359  rel->statelsn <= remote_final_lsn));
360 }
361 
362 /*
363  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
364  *
365  * Start a transaction, if this is the first step (else we keep using the
366  * existing transaction).
367  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
368  */
369 static void
370 begin_replication_step(void)
371 {
373 
374  if (!IsTransactionState())
375  {
378  }
379 
381 
382  MemoryContextSwitchTo(ApplyMessageContext);
383 }
384 
385 /*
386  * Finish up one step of a replication transaction.
387  * Callers of begin_replication_step() must also call this.
388  *
389  * We don't close out the transaction here, but we should increment
390  * the command counter to make the effects of this step visible.
391  */
392 static void
393 end_replication_step(void)
394 {
396 
398 }
399 
400 /*
401  * Handle streamed transactions.
402  *
403  * If in streaming mode (receiving a block of streamed transaction), we
404  * simply redirect it to a file for the proper toplevel transaction.
405  *
406  * Returns true for streamed transactions, false otherwise (regular mode).
407  */
408 static bool
409 handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
410 {
411  TransactionId xid;
412 
413  /* not in streaming mode */
415  return false;
416 
417  Assert(stream_fd != NULL);
419 
420  /*
421  * We should have received XID of the subxact as the first part of the
422  * message, so extract it.
423  */
424  xid = pq_getmsgint(s, 4);
425 
426  if (!TransactionIdIsValid(xid))
427  ereport(ERROR,
428  (errcode(ERRCODE_PROTOCOL_VIOLATION),
429  errmsg_internal("invalid transaction ID in streamed replication transaction")));
430 
431  /* Add the new subxact to the array (unless already there). */
432  subxact_info_add(xid);
433 
434  /* write the change to the current file */
435  stream_write_change(action, s);
436 
437  return true;
438 }
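
handle_streamed_transaction() redirects each streamed change into the transaction's spool file via stream_write_change(), whose body is outside this excerpt. The on-disk format it must produce is the one apply_spooled_messages() reads back further down: an int length followed by that many bytes (the action byte plus the remaining message). A rough editorial sketch of such a writer follows; the function name is hypothetical, error handling and Asserts are elided, and the BufFileWrite() API of this era (returning the number of bytes written) is assumed.

/*
 * Editorial sketch of the writer side of the spool-file record format
 * that apply_spooled_messages() reads back: <int len><action><payload>.
 */
static void
sketch_write_spooled_change(BufFile *fd, char action, StringInfo s)
{
	int			len;

	/* total on-disk size, including the action byte */
	len = (s->len - s->cursor) + sizeof(char);

	/* first the length word ... */
	BufFileWrite(fd, &len, sizeof(len));

	/* ... then the action ... */
	BufFileWrite(fd, &action, sizeof(action));

	/* ... and finally the rest of the message, from the current cursor */
	BufFileWrite(fd, &s->data[s->cursor], s->len - s->cursor);
}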
439 
440 /*
441  * Executor state preparation for evaluation of constraint expressions,
442  * indexes and triggers for the specified relation.
443  *
444  * Note that the caller must open and close any indexes to be updated.
445  */
446 static ApplyExecutionData *
447 create_edata_for_relation(LogicalRepRelMapEntry *rel)
448 {
449  ApplyExecutionData *edata;
450  EState *estate;
451  RangeTblEntry *rte;
452  ResultRelInfo *resultRelInfo;
453 
454  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
455  edata->targetRel = rel;
456 
457  edata->estate = estate = CreateExecutorState();
458 
459  rte = makeNode(RangeTblEntry);
460  rte->rtekind = RTE_RELATION;
461  rte->relid = RelationGetRelid(rel->localrel);
462  rte->relkind = rel->localrel->rd_rel->relkind;
464  ExecInitRangeTable(estate, list_make1(rte));
465 
466  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
467 
468  /*
469  * Use Relation opened by logicalrep_rel_open() instead of opening it
470  * again.
471  */
472  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
473 
474  /*
475  * We put the ResultRelInfo in the es_opened_result_relations list, even
476  * though we don't populate the es_result_relations array. That's a bit
477  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
478  *
479  * ExecOpenIndices() is not called here either, each execution path doing
480  * an apply operation being responsible for that.
481  */
483  lappend(estate->es_opened_result_relations, resultRelInfo);
484 
485  estate->es_output_cid = GetCurrentCommandId(true);
486 
487  /* Prepare to catch AFTER triggers. */
489 
490  /* other fields of edata remain NULL for now */
491 
492  return edata;
493 }
494 
495 /*
496  * Finish any operations related to the executor state created by
497  * create_edata_for_relation().
498  */
499 static void
500 finish_edata(ApplyExecutionData *edata)
501 {
502  EState *estate = edata->estate;
503 
504  /* Handle any queued AFTER triggers. */
505  AfterTriggerEndQuery(estate);
506 
507  /* Shut down tuple routing, if any was done. */
508  if (edata->proute)
509  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
510 
511  /*
512  * Cleanup. It might seem that we should call ExecCloseResultRelations()
513  * here, but we intentionally don't. It would close the rel we added to
514  * es_opened_result_relations above, which is wrong because we took no
515  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
516  * any other relations opened during execution.
517  */
518  ExecResetTupleTable(estate->es_tupleTable, false);
519  FreeExecutorState(estate);
520  pfree(edata);
521 }
522 
523 /*
524  * Evaluates default values for columns for which we can't map to remote
525  * relation columns.
526  *
527  * This allows us to support tables which have more columns on the downstream
528  * than on the upstream.
529  */
530 static void
531 slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
532  TupleTableSlot *slot)
533 {
534  TupleDesc desc = RelationGetDescr(rel->localrel);
535  int num_phys_attrs = desc->natts;
536  int i;
537  int attnum,
538  num_defaults = 0;
539  int *defmap;
540  ExprState **defexprs;
541  ExprContext *econtext;
542 
543  econtext = GetPerTupleExprContext(estate);
544 
545  /* We got all the data via replication, no need to evaluate anything. */
546  if (num_phys_attrs == rel->remoterel.natts)
547  return;
548 
549  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
550  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
551 
552  Assert(rel->attrmap->maplen == num_phys_attrs);
553  for (attnum = 0; attnum < num_phys_attrs; attnum++)
554  {
555  Expr *defexpr;
556 
557  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
558  continue;
559 
560  if (rel->attrmap->attnums[attnum] >= 0)
561  continue;
562 
563  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
564 
565  if (defexpr != NULL)
566  {
567  /* Run the expression through planner */
568  defexpr = expression_planner(defexpr);
569 
570  /* Initialize executable expression in copycontext */
571  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
572  defmap[num_defaults] = attnum;
573  num_defaults++;
574  }
575 
576  }
577 
578  for (i = 0; i < num_defaults; i++)
579  slot->tts_values[defmap[i]] =
580  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
581 }
582 
583 /*
584  * Error callback to give more context info about data conversion failures
585  * while reading data from the remote server.
586  */
587 static void
588 slot_store_error_callback(void *arg)
589 {
590  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
592 
593  /* Nothing to do if remote attribute number is not set */
594  if (errarg->remote_attnum < 0)
595  return;
596 
597  rel = errarg->rel;
598  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
599  rel->remoterel.nspname, rel->remoterel.relname,
600  rel->remoterel.attnames[errarg->remote_attnum]);
601 }
602 
603 /*
604  * Store tuple data into slot.
605  *
606  * Incoming data can be either text or binary format.
607  */
608 static void
609 slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
610  LogicalRepTupleData *tupleData)
611 {
612  int natts = slot->tts_tupleDescriptor->natts;
613  int i;
614  SlotErrCallbackArg errarg;
615  ErrorContextCallback errcallback;
616 
617  ExecClearTuple(slot);
618 
619  /* Push callback + info on the error context stack */
620  errarg.rel = rel;
621  errarg.remote_attnum = -1;
622  errcallback.callback = slot_store_error_callback;
623  errcallback.arg = (void *) &errarg;
624  errcallback.previous = error_context_stack;
625  error_context_stack = &errcallback;
626 
627  /* Call the "in" function for each non-dropped, non-null attribute */
628  Assert(natts == rel->attrmap->maplen);
629  for (i = 0; i < natts; i++)
630  {
632  int remoteattnum = rel->attrmap->attnums[i];
633 
634  if (!att->attisdropped && remoteattnum >= 0)
635  {
636  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
637 
638  Assert(remoteattnum < tupleData->ncols);
639 
640  errarg.remote_attnum = remoteattnum;
641 
642  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
643  {
644  Oid typinput;
645  Oid typioparam;
646 
647  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
648  slot->tts_values[i] =
649  OidInputFunctionCall(typinput, colvalue->data,
650  typioparam, att->atttypmod);
651  slot->tts_isnull[i] = false;
652  }
653  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
654  {
655  Oid typreceive;
656  Oid typioparam;
657 
658  /*
659  * In some code paths we may be asked to re-parse the same
660  * tuple data. Reset the StringInfo's cursor so that works.
661  */
662  colvalue->cursor = 0;
663 
664  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
665  slot->tts_values[i] =
666  OidReceiveFunctionCall(typreceive, colvalue,
667  typioparam, att->atttypmod);
668 
669  /* Trouble if it didn't eat the whole buffer */
670  if (colvalue->cursor != colvalue->len)
671  ereport(ERROR,
672  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
673  errmsg("incorrect binary data format in logical replication column %d",
674  remoteattnum + 1)));
675  slot->tts_isnull[i] = false;
676  }
677  else
678  {
679  /*
680  * NULL value from remote. (We don't expect to see
681  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
682  * NULL.)
683  */
684  slot->tts_values[i] = (Datum) 0;
685  slot->tts_isnull[i] = true;
686  }
687 
688  errarg.remote_attnum = -1;
689  }
690  else
691  {
692  /*
693  * We assign NULL to dropped attributes and missing values
694  * (missing values should be later filled using
695  * slot_fill_defaults).
696  */
697  slot->tts_values[i] = (Datum) 0;
698  slot->tts_isnull[i] = true;
699  }
700  }
701 
702  /* Pop the error context stack */
703  error_context_stack = errcallback.previous;
704 
705  ExecStoreVirtualTuple(slot);
706 }
707 
708 /*
709  * Replace updated columns with data from the LogicalRepTupleData struct.
710  * This is somewhat similar to heap_modify_tuple but also calls the type
711  * input functions on the user data.
712  *
713  * "slot" is filled with a copy of the tuple in "srcslot", replacing
714  * columns provided in "tupleData" and leaving others as-is.
715  *
716  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
717  * storage for "srcslot". This is OK for current usage, but someday we may
718  * need to materialize "slot" at the end to make it independent of "srcslot".
719  */
720 static void
721 slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
722  LogicalRepRelMapEntry *rel,
723  LogicalRepTupleData *tupleData)
724 {
725  int natts = slot->tts_tupleDescriptor->natts;
726  int i;
727  SlotErrCallbackArg errarg;
728  ErrorContextCallback errcallback;
729 
730  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
731  ExecClearTuple(slot);
732 
733  /*
734  * Copy all the column data from srcslot, so that we'll have valid values
735  * for unreplaced columns.
736  */
737  Assert(natts == srcslot->tts_tupleDescriptor->natts);
738  slot_getallattrs(srcslot);
739  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
740  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
741 
742  /* For error reporting, push callback + info on the error context stack */
743  errarg.rel = rel;
744  errarg.remote_attnum = -1;
745  errcallback.callback = slot_store_error_callback;
746  errcallback.arg = (void *) &errarg;
747  errcallback.previous = error_context_stack;
748  error_context_stack = &errcallback;
749 
750  /* Call the "in" function for each replaced attribute */
751  Assert(natts == rel->attrmap->maplen);
752  for (i = 0; i < natts; i++)
753  {
755  int remoteattnum = rel->attrmap->attnums[i];
756 
757  if (remoteattnum < 0)
758  continue;
759 
760  Assert(remoteattnum < tupleData->ncols);
761 
762  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
763  {
764  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
765 
766  errarg.remote_attnum = remoteattnum;
767 
768  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
769  {
770  Oid typinput;
771  Oid typioparam;
772 
773  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
774  slot->tts_values[i] =
775  OidInputFunctionCall(typinput, colvalue->data,
776  typioparam, att->atttypmod);
777  slot->tts_isnull[i] = false;
778  }
779  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
780  {
781  Oid typreceive;
782  Oid typioparam;
783 
784  /*
785  * In some code paths we may be asked to re-parse the same
786  * tuple data. Reset the StringInfo's cursor so that works.
787  */
788  colvalue->cursor = 0;
789 
790  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
791  slot->tts_values[i] =
792  OidReceiveFunctionCall(typreceive, colvalue,
793  typioparam, att->atttypmod);
794 
795  /* Trouble if it didn't eat the whole buffer */
796  if (colvalue->cursor != colvalue->len)
797  ereport(ERROR,
798  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
799  errmsg("incorrect binary data format in logical replication column %d",
800  remoteattnum + 1)));
801  slot->tts_isnull[i] = false;
802  }
803  else
804  {
805  /* must be LOGICALREP_COLUMN_NULL */
806  slot->tts_values[i] = (Datum) 0;
807  slot->tts_isnull[i] = true;
808  }
809 
810  errarg.remote_attnum = -1;
811  }
812  }
813 
814  /* Pop the error context stack */
815  error_context_stack = errcallback.previous;
816 
817  /* And finally, declare that "slot" contains a valid virtual tuple */
818  ExecStoreVirtualTuple(slot);
819 }
820 
821 /*
822  * Handle BEGIN message.
823  */
824 static void
825 apply_handle_begin(StringInfo s)
826 {
827  LogicalRepBeginData begin_data;
828 
829  logicalrep_read_begin(s, &begin_data);
830 
831  remote_final_lsn = begin_data.final_lsn;
832 
833  in_remote_transaction = true;
834 
836 }
837 
838 /*
839  * Handle COMMIT message.
840  *
841  * TODO, support tracking of multiple origins
842  */
843 static void
844 apply_handle_commit(StringInfo s)
845 {
846  LogicalRepCommitData commit_data;
847 
848  logicalrep_read_commit(s, &commit_data);
849 
850  if (commit_data.commit_lsn != remote_final_lsn)
851  ereport(ERROR,
852  (errcode(ERRCODE_PROTOCOL_VIOLATION),
853  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
854  LSN_FORMAT_ARGS(commit_data.commit_lsn),
856 
857  apply_handle_commit_internal(&commit_data);
858 
859  /* Process any tables that are being synchronized in parallel. */
860  process_syncing_tables(commit_data.end_lsn);
861 
863 }
864 
865 /*
866  * Handle BEGIN PREPARE message.
867  */
868 static void
869 apply_handle_begin_prepare(StringInfo s)
870 {
871  LogicalRepPreparedTxnData begin_data;
872 
873  /* Tablesync should never receive prepare. */
874  if (am_tablesync_worker())
875  ereport(ERROR,
876  (errcode(ERRCODE_PROTOCOL_VIOLATION),
877  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
878 
879  logicalrep_read_begin_prepare(s, &begin_data);
880 
881  remote_final_lsn = begin_data.prepare_lsn;
882 
883  in_remote_transaction = true;
884 
886 }
887 
888 /*
889  * Common function to prepare the GID.
890  */
891 static void
892 apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
893 {
894  char gid[GIDSIZE];
895 
896  /*
897  * Compute a unique GID for two_phase transactions. We don't use the GID of
898  * the prepared transaction sent by the server, as that can lead to deadlock
899  * when we have multiple subscriptions on the same node pointing to
900  * publications on the same node. See the comments atop worker.c.
901  */
902  TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
903  gid, sizeof(gid));
904 
905  /*
906  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
907  * called within the PrepareTransactionBlock below.
908  */
910  CommitTransactionCommand(); /* Completes the preceding Begin command. */
911 
912  /*
913  * Update origin state so we can restart streaming from correct position
914  * in case of crash.
915  */
916  replorigin_session_origin_lsn = prepare_data->end_lsn;
918 
920 }
921 
922 /*
923  * Handle PREPARE message.
924  */
925 static void
926 apply_handle_prepare(StringInfo s)
927 {
928  LogicalRepPreparedTxnData prepare_data;
929 
930  logicalrep_read_prepare(s, &prepare_data);
931 
932  if (prepare_data.prepare_lsn != remote_final_lsn)
933  ereport(ERROR,
934  (errcode(ERRCODE_PROTOCOL_VIOLATION),
935  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
936  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
938 
939  /*
940  * Unlike commit, here we always prepare the transaction, even if no
941  * change has happened in this transaction. It is done this way because at
942  * commit prepared time, we won't know whether we skipped preparing a
943  * transaction because it contained no changes.
944  *
945  * XXX We could optimize this such that at commit prepared time we first
946  * check whether we have prepared the transaction or not, but that doesn't
947  * seem worthwhile because such cases shouldn't be common.
948  */
950 
951  apply_handle_prepare_internal(&prepare_data);
952 
955  pgstat_report_stat(false);
956 
957  store_flush_position(prepare_data.end_lsn);
958 
959  in_remote_transaction = false;
960 
961  /* Process any tables that are being synchronized in parallel. */
962  process_syncing_tables(prepare_data.end_lsn);
963 
965 }
966 
967 /*
968  * Handle a COMMIT PREPARED of a previously PREPARED transaction.
969  */
970 static void
971 apply_handle_commit_prepared(StringInfo s)
972 {
973  LogicalRepCommitPreparedTxnData prepare_data;
974  char gid[GIDSIZE];
975 
976  logicalrep_read_commit_prepared(s, &prepare_data);
977 
978  /* Compute GID for two_phase transactions. */
979  TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
980  gid, sizeof(gid));
981 
982  /* There is no transaction when COMMIT PREPARED is called */
984 
985  /*
986  * Update origin state so we can restart streaming from correct position
987  * in case of crash.
988  */
989  replorigin_session_origin_lsn = prepare_data.end_lsn;
991 
992  FinishPreparedTransaction(gid, true);
995  pgstat_report_stat(false);
996 
997  store_flush_position(prepare_data.end_lsn);
998  in_remote_transaction = false;
999 
1000  /* Process any tables that are being synchronized in parallel. */
1001  process_syncing_tables(prepare_data.end_lsn);
1002 
1004 }
1005 
1006 /*
1007  * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1008  */
1009 static void
1010 apply_handle_rollback_prepared(StringInfo s)
1011 {
1012  LogicalRepRollbackPreparedTxnData rollback_data;
1013  char gid[GIDSIZE];
1014 
1015  logicalrep_read_rollback_prepared(s, &rollback_data);
1016 
1017  /* Compute GID for two_phase transactions. */
1018  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1019  gid, sizeof(gid));
1020 
1021  /*
1022  * It is possible that we haven't received the prepare because it occurred
1023  * before the walsender reached a consistent point, or because two_phase
1024  * was not yet enabled at that time; in such cases we need to skip the
1025  * rollback prepared.
1026  */
1027  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1028  rollback_data.prepare_time))
1029  {
1030  /*
1031  * Update origin state so we can restart streaming from correct
1032  * position in case of crash.
1033  */
1036 
1037  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1039  FinishPreparedTransaction(gid, false);
1042  }
1043 
1044  pgstat_report_stat(false);
1045 
1046  store_flush_position(rollback_data.rollback_end_lsn);
1047  in_remote_transaction = false;
1048 
1049  /* Process any tables that are being synchronized in parallel. */
1051 
1053 }
1054 
1055 /*
1056  * Handle STREAM PREPARE.
1057  *
1058  * Logic is in two parts:
1059  * 1. Replay all the spooled operations
1060  * 2. Mark the transaction as prepared
1061  */
1062 static void
1063 apply_handle_stream_prepare(StringInfo s)
1064 {
1065  LogicalRepPreparedTxnData prepare_data;
1066 
1068  ereport(ERROR,
1069  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1070  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1071 
1072  /* Tablesync should never receive prepare. */
1073  if (am_tablesync_worker())
1074  ereport(ERROR,
1075  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1076  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1077 
1078  logicalrep_read_stream_prepare(s, &prepare_data);
1079 
1080  elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
1081 
1082  /* Replay all the spooled operations. */
1083  apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
1084 
1085  /* Mark the transaction as prepared. */
1086  apply_handle_prepare_internal(&prepare_data);
1087 
1089 
1090  pgstat_report_stat(false);
1091 
1092  store_flush_position(prepare_data.end_lsn);
1093 
1094  in_remote_transaction = false;
1095 
1096  /* unlink the files with serialized changes and subxact info. */
1098 
1099  /* Process any tables that are being synchronized in parallel. */
1100  process_syncing_tables(prepare_data.end_lsn);
1101 
1103 }
1104 
1105 /*
1106  * Handle ORIGIN message.
1107  *
1108  * TODO, support tracking of multiple origins
1109  */
1110 static void
1111 apply_handle_origin(StringInfo s)
1112 {
1113  /*
1114  * ORIGIN message can only come inside streaming transaction or inside
1115  * remote transaction and before any actual writes.
1116  */
1117  if (!in_streamed_transaction &&
1120  ereport(ERROR,
1121  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1122  errmsg_internal("ORIGIN message sent out of order")));
1123 }
1124 
1125 /*
1126  * Handle STREAM START message.
1127  */
1128 static void
1129 apply_handle_stream_start(StringInfo s)
1130 {
1131  bool first_segment;
1132  HASHCTL hash_ctl;
1133 
1135  ereport(ERROR,
1136  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1137  errmsg_internal("duplicate STREAM START message")));
1138 
1139  /*
1140  * Start a transaction on stream start; this transaction will be committed
1141  * on stream stop, unless it is a tablesync worker, in which case it
1142  * will be committed after processing all the messages. We need the
1143  * transaction for handling the buffile, used for serializing the
1144  * streaming data and subxact info.
1145  */
1147 
1148  /* notify handle methods we're processing a remote transaction */
1149  in_streamed_transaction = true;
1150 
1151  /* extract XID of the top-level transaction */
1152  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1153 
1155  ereport(ERROR,
1156  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1157  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1158 
1159  /*
1160  * Initialize the xidhash table if we haven't yet. This will be used for
1161  * the entire duration of the apply worker so create it in permanent
1162  * context.
1163  */
1164  if (xidhash == NULL)
1165  {
1166  hash_ctl.keysize = sizeof(TransactionId);
1167  hash_ctl.entrysize = sizeof(StreamXidHash);
1168  hash_ctl.hcxt = ApplyContext;
1169  xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
1171  }
1172 
1173  /* open the spool file for this transaction */
1175 
1176  /* if this is not the first segment, open existing subxact file */
1177  if (!first_segment)
1179 
1181 
1183 }
1184 
1185 /*
1186  * Handle STREAM STOP message.
1187  */
1188 static void
1189 apply_handle_stream_stop(StringInfo s)
1190 {
1192  ereport(ERROR,
1193  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1194  errmsg_internal("STREAM STOP message without STREAM START")));
1195 
1196  /*
1197  * Close the file with serialized changes, and serialize information about
1198  * subxacts for the toplevel transaction.
1199  */
1202 
1203  /* We must be in a valid transaction state */
1205 
1206  /* Commit the per-stream transaction */
1208 
1209  in_streamed_transaction = false;
1210 
1211  /* Reset per-stream context */
1212  MemoryContextReset(LogicalStreamingContext);
1213 
1215 }
1216 
1217 /*
1218  * Handle STREAM abort message.
1219  */
1220 static void
1221 apply_handle_stream_abort(StringInfo s)
1222 {
1223  TransactionId xid;
1224  TransactionId subxid;
1225 
1227  ereport(ERROR,
1228  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1229  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1230 
1231  logicalrep_read_stream_abort(s, &xid, &subxid);
1232 
1233  /*
1234  * If the two XIDs are the same, it's in fact an abort of the toplevel
1235  * xact, so just delete the files with serialized info.
1236  */
1237  if (xid == subxid)
1239  else
1240  {
1241  /*
1242  * OK, so it's a subxact. We need to read the subxact file for the
1243  * toplevel transaction, determine the offset tracked for the subxact,
1244  * and truncate the file with changes. We also remove the subxacts
1245  * with higher offsets (or rather higher XIDs).
1246  *
1247  * We intentionally scan the array from the tail, because we're likely
1248  * aborting a change for the most recent subtransactions.
1249  *
1250  * We can't use a binary search here, as subxact XIDs won't necessarily
1251  * arrive in sorted order; consider the case where we have released the
1252  * savepoint for multiple subtransactions and then performed a rollback
1253  * to savepoint for one of the earlier
1254  * sub-transactions.
1255  */
1256  int64 i;
1257  int64 subidx;
1258  BufFile *fd;
1259  bool found = false;
1260  char path[MAXPGPATH];
1261  StreamXidHash *ent;
1262 
1263  subidx = -1;
1266 
1267  for (i = subxact_data.nsubxacts; i > 0; i--)
1268  {
1269  if (subxact_data.subxacts[i - 1].xid == subxid)
1270  {
1271  subidx = (i - 1);
1272  found = true;
1273  break;
1274  }
1275  }
1276 
1277  /*
1278  * If it's an empty sub-transaction then we will not find the subxid
1279  * here so just cleanup the subxact info and return.
1280  */
1281  if (!found)
1282  {
1283  /* Cleanup the subxact info */
1287  return;
1288  }
1289 
1290  ent = (StreamXidHash *) hash_search(xidhash,
1291  (void *) &xid,
1292  HASH_FIND,
1293  NULL);
1294  if (!ent)
1295  ereport(ERROR,
1296  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1297  errmsg_internal("transaction %u not found in stream XID hash table",
1298  xid)));
1299 
1300  /* open the changes file */
1302  fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
1303 
1304  /* OK, truncate the file at the right offset */
1305  BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
1306  subxact_data.subxacts[subidx].offset);
1307  BufFileClose(fd);
1308 
1309  /* discard the subxacts added later */
1310  subxact_data.nsubxacts = subidx;
1311 
1312  /* write the updated subxact list */
1314 
1317  }
1318 }
1319 
1320 /*
1321  * Common spoolfile processing.
1322  */
1323 static void
1324 apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
1325 {
1327  int nchanges;
1328  char path[MAXPGPATH];
1329  char *buffer = NULL;
1330  StreamXidHash *ent;
1331  MemoryContext oldcxt;
1332  BufFile *fd;
1333 
1334  /* Make sure we have an open transaction */
1336 
1337  /*
1338  * Allocate file handle and memory required to process all the messages in
1339  * TopTransactionContext to avoid them getting reset after each message is
1340  * processed.
1341  */
1343 
1344  /* Open the spool file for the committed/prepared transaction */
1346  elog(DEBUG1, "replaying changes from file \"%s\"", path);
1347 
1348  ent = (StreamXidHash *) hash_search(xidhash,
1349  (void *) &xid,
1350  HASH_FIND,
1351  NULL);
1352  if (!ent)
1353  ereport(ERROR,
1354  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1355  errmsg_internal("transaction %u not found in stream XID hash table",
1356  xid)));
1357 
1358  fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
1359 
1360  buffer = palloc(BLCKSZ);
1361  initStringInfo(&s2);
1362 
1363  MemoryContextSwitchTo(oldcxt);
1364 
1365  remote_final_lsn = lsn;
1366 
1367  /*
1368  * Make sure the apply_dispatch handler methods are aware that we're in a
1369  * remote transaction.
1370  */
1371  in_remote_transaction = true;
1373 
1375 
1376  /*
1377  * Read the entries one by one and pass them through the same logic as in
1378  * apply_dispatch.
1379  */
1380  nchanges = 0;
1381  while (true)
1382  {
1383  int nbytes;
1384  int len;
1385 
1387 
1388  /* read length of the on-disk record */
1389  nbytes = BufFileRead(fd, &len, sizeof(len));
1390 
1391  /* have we reached end of the file? */
1392  if (nbytes == 0)
1393  break;
1394 
1395  /* do we have a correct length? */
1396  if (nbytes != sizeof(len))
1397  ereport(ERROR,
1399  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1400  path)));
1401 
1402  if (len <= 0)
1403  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
1404  len, path);
1405 
1406  /* make sure we have sufficiently large buffer */
1407  buffer = repalloc(buffer, len);
1408 
1409  /* and finally read the data into the buffer */
1410  if (BufFileRead(fd, buffer, len) != len)
1411  ereport(ERROR,
1413  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1414  path)));
1415 
1416  /* copy the buffer to the stringinfo and call apply_dispatch */
1417  resetStringInfo(&s2);
1418  appendBinaryStringInfo(&s2, buffer, len);
1419 
1420  /* Ensure we are reading the data into our memory context. */
1421  oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
1422 
1423  apply_dispatch(&s2);
1424 
1425  MemoryContextReset(ApplyMessageContext);
1426 
1427  MemoryContextSwitchTo(oldcxt);
1428 
1429  nchanges++;
1430 
1431  if (nchanges % 1000 == 0)
1432  elog(DEBUG1, "replayed %d changes from file \"%s\"",
1433  nchanges, path);
1434  }
1435 
1436  BufFileClose(fd);
1437 
1438  pfree(buffer);
1439  pfree(s2.data);
1440 
1441  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
1442  nchanges, path);
1443 
1444  return;
1445 }
1446 
1447 /*
1448  * Handle STREAM COMMIT message.
1449  */
1450 static void
1451 apply_handle_stream_commit(StringInfo s)
1452 {
1453  TransactionId xid;
1454  LogicalRepCommitData commit_data;
1455 
1457  ereport(ERROR,
1458  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1459  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1460 
1461  xid = logicalrep_read_stream_commit(s, &commit_data);
1462 
1463  elog(DEBUG1, "received commit for streamed transaction %u", xid);
1464 
1465  apply_spooled_messages(xid, commit_data.commit_lsn);
1466 
1467  apply_handle_commit_internal(&commit_data);
1468 
1469  /* unlink the files with serialized changes and subxact info */
1471 
1472  /* Process any tables that are being synchronized in parallel. */
1473  process_syncing_tables(commit_data.end_lsn);
1474 
1476 }
1477 
1478 /*
1479  * Helper function for apply_handle_commit and apply_handle_stream_commit.
1480  */
1481 static void
1482 apply_handle_commit_internal(LogicalRepCommitData *commit_data)
1483 {
1484  if (IsTransactionState())
1485  {
1486  /*
1487  * Update origin state so we can restart streaming from correct
1488  * position in case of crash.
1489  */
1490  replorigin_session_origin_lsn = commit_data->end_lsn;
1492 
1494  pgstat_report_stat(false);
1495 
1496  store_flush_position(commit_data->end_lsn);
1497  }
1498  else
1499  {
1500  /* Process any invalidation messages that might have accumulated. */
1503  }
1504 
1505  in_remote_transaction = false;
1506 }
1507 
1508 /*
1509  * Handle RELATION message.
1510  *
1511  * Note we don't do validation against the local schema here. That
1512  * validation is postponed until the first change for the given relation
1513  * arrives, as we only care about it when applying changes for that relation
1514  * anyway, and we do less locking this way.
1515  */
1516 static void
1517 apply_handle_relation(StringInfo s)
1518 {
1519  LogicalRepRelation *rel;
1520 
1522  return;
1523 
1524  rel = logicalrep_read_rel(s);
1526 }
1527 
1528 /*
1529  * Handle TYPE message.
1530  *
1531  * This is now vestigial; we read the info and discard it.
1532  */
1533 static void
1534 apply_handle_type(StringInfo s)
1535 {
1536  LogicalRepTyp typ;
1537 
1539  return;
1540 
1541  logicalrep_read_typ(s, &typ);
1542 }
1543 
1544 /*
1545  * Get the replica identity index or, if that is not defined, the primary key.
1546  *
1547  * If neither is defined, returns InvalidOid.
1548  */
1549 static Oid
1550 GetRelationIdentityOrPK(Relation rel)
1551 {
1552  Oid idxoid;
1553 
1554  idxoid = RelationGetReplicaIndex(rel);
1555 
1556  if (!OidIsValid(idxoid))
1557  idxoid = RelationGetPrimaryKeyIndex(rel);
1558 
1559  return idxoid;
1560 }
1561 
1562 /*
1563  * Handle INSERT message.
1564  */
1565 
1566 static void
1567 apply_handle_insert(StringInfo s)
1568 {
1569  LogicalRepRelMapEntry *rel;
1570  LogicalRepTupleData newtup;
1571  LogicalRepRelId relid;
1572  ApplyExecutionData *edata;
1573  EState *estate;
1574  TupleTableSlot *remoteslot;
1575  MemoryContext oldctx;
1576 
1578  return;
1579 
1581 
1582  relid = logicalrep_read_insert(s, &newtup);
1583  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1584  if (!should_apply_changes_for_rel(rel))
1585  {
1586  /*
1587  * The relation can't become interesting in the middle of the
1588  * transaction so it's safe to unlock it.
1589  */
1592  return;
1593  }
1594 
1595  /* Initialize the executor state. */
1596  edata = create_edata_for_relation(rel);
1597  estate = edata->estate;
1598  remoteslot = ExecInitExtraTupleSlot(estate,
1599  RelationGetDescr(rel->localrel),
1600  &TTSOpsVirtual);
1601 
1602  /* Process and store remote tuple in the slot */
1604  slot_store_data(remoteslot, rel, &newtup);
1605  slot_fill_defaults(rel, estate, remoteslot);
1606  MemoryContextSwitchTo(oldctx);
1607 
1608  /* For a partitioned table, insert the tuple into a partition. */
1609  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1611  remoteslot, NULL, CMD_INSERT);
1612  else
1614  remoteslot);
1615 
1616  finish_edata(edata);
1617 
1619 
1621 }
1622 
1623 /*
1624  * Workhorse for apply_handle_insert()
1625  * relinfo is for the relation we're actually inserting into
1626  * (could be a child partition of edata->targetRelInfo)
1627  */
1628 static void
1629 apply_handle_insert_internal(ApplyExecutionData *edata,
1630  ResultRelInfo *relinfo,
1631  TupleTableSlot *remoteslot)
1632 {
1633  EState *estate = edata->estate;
1634 
1635  /* We must open indexes here. */
1636  ExecOpenIndices(relinfo, false);
1637 
1638  /* Do the insert. */
1639  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1640 
1641  /* Cleanup. */
1642  ExecCloseIndices(relinfo);
1643 }
1644 
1645 /*
1646  * Check if the logical replication relation is updatable and throw
1647  * appropriate error if it isn't.
1648  */
1649 static void
1650 check_relation_updatable(LogicalRepRelMapEntry *rel)
1651 {
1652  /* Updatable, no error. */
1653  if (rel->updatable)
1654  return;
1655 
1656  /*
1657  * We are in error mode, so it's fine that this is somewhat slow. It's
1658  * better to give the user a correct error message.
1659  */
1661  {
1662  ereport(ERROR,
1663  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1664  errmsg("publisher did not send replica identity column "
1665  "expected by the logical replication target relation \"%s.%s\"",
1666  rel->remoterel.nspname, rel->remoterel.relname)));
1667  }
1668 
1669  ereport(ERROR,
1670  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1671  errmsg("logical replication target relation \"%s.%s\" has "
1672  "neither REPLICA IDENTITY index nor PRIMARY "
1673  "KEY and published relation does not have "
1674  "REPLICA IDENTITY FULL",
1675  rel->remoterel.nspname, rel->remoterel.relname)));
1676 }
1677 
1678 /*
1679  * Handle UPDATE message.
1680  *
1681  * TODO: FDW support
1682  */
1683 static void
1684 apply_handle_update(StringInfo s)
1685 {
1686  LogicalRepRelMapEntry *rel;
1687  LogicalRepRelId relid;
1688  ApplyExecutionData *edata;
1689  EState *estate;
1690  LogicalRepTupleData oldtup;
1691  LogicalRepTupleData newtup;
1692  bool has_oldtup;
1693  TupleTableSlot *remoteslot;
1694  RangeTblEntry *target_rte;
1695  MemoryContext oldctx;
1696 
1698  return;
1699 
1701 
1702  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1703  &newtup);
1704  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1705  if (!should_apply_changes_for_rel(rel))
1706  {
1707  /*
1708  * The relation can't become interesting in the middle of the
1709  * transaction so it's safe to unlock it.
1710  */
1713  return;
1714  }
1715 
1716  /* Check if we can do the update. */
1718 
1719  /* Initialize the executor state. */
1720  edata = create_edata_for_relation(rel);
1721  estate = edata->estate;
1722  remoteslot = ExecInitExtraTupleSlot(estate,
1723  RelationGetDescr(rel->localrel),
1724  &TTSOpsVirtual);
1725 
1726  /*
1727  * Populate updatedCols so that per-column triggers can fire, and so
1728  * executor can correctly pass down indexUnchanged hint. This could
1729  * include more columns than were actually changed on the publisher
1730  * because the logical replication protocol doesn't contain that
1731  * information. But it would for example exclude columns that only exist
1732  * on the subscriber, since we are not touching those.
1733  */
1734  target_rte = list_nth(estate->es_range_table, 0);
1735  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1736  {
1738  int remoteattnum = rel->attrmap->attnums[i];
1739 
1740  if (!att->attisdropped && remoteattnum >= 0)
1741  {
1742  Assert(remoteattnum < newtup.ncols);
1743  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1744  target_rte->updatedCols =
1745  bms_add_member(target_rte->updatedCols,
1747  }
1748  }
1749 
1750  /* Also populate extraUpdatedCols, in case we have generated columns */
1751  fill_extraUpdatedCols(target_rte, rel->localrel);
1752 
1753  /* Build the search tuple. */
1755  slot_store_data(remoteslot, rel,
1756  has_oldtup ? &oldtup : &newtup);
1757  MemoryContextSwitchTo(oldctx);
1758 
1759  /* For a partitioned table, apply update to correct partition. */
1760  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1762  remoteslot, &newtup, CMD_UPDATE);
1763  else
1765  remoteslot, &newtup);
1766 
1767  finish_edata(edata);
1768 
1770 
1772 }
1773 
1774 /*
1775  * Workhorse for apply_handle_update()
1776  * relinfo is for the relation we're actually updating in
1777  * (could be a child partition of edata->targetRelInfo)
1778  */
1779 static void
1780 apply_handle_update_internal(ApplyExecutionData *edata,
1781  ResultRelInfo *relinfo,
1782  TupleTableSlot *remoteslot,
1783  LogicalRepTupleData *newtup)
1784 {
1785  EState *estate = edata->estate;
1786  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1787  Relation localrel = relinfo->ri_RelationDesc;
1788  EPQState epqstate;
1789  TupleTableSlot *localslot;
1790  bool found;
1791  MemoryContext oldctx;
1792 
1793  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1794  ExecOpenIndices(relinfo, false);
1795 
1796  found = FindReplTupleInLocalRel(estate, localrel,
1797  &relmapentry->remoterel,
1798  remoteslot, &localslot);
1799  ExecClearTuple(remoteslot);
1800 
1801  /*
1802  * Tuple found.
1803  *
1804  * Note this will fail if there are other conflicting unique indexes.
1805  */
1806  if (found)
1807  {
1808  /* Process and store remote tuple in the slot */
1810  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1811  MemoryContextSwitchTo(oldctx);
1812 
1813  EvalPlanQualSetSlot(&epqstate, remoteslot);
1814 
1815  /* Do the actual update. */
1816  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
1817  remoteslot);
1818  }
1819  else
1820  {
1821  /*
1822  * The tuple to be updated could not be found. Do nothing except for
1823  * emitting a log message.
1824  *
1825  * XXX should this be promoted to ereport(LOG) perhaps?
1826  */
1827  elog(DEBUG1,
1828  "logical replication did not find row to be updated "
1829  "in replication target relation \"%s\"",
1830  RelationGetRelationName(localrel));
1831  }
1832 
1833  /* Cleanup. */
1834  ExecCloseIndices(relinfo);
1835  EvalPlanQualEnd(&epqstate);
1836 }
1837 
1838 /*
1839  * Handle DELETE message.
1840  *
1841  * TODO: FDW support
1842  */
1843 static void
1844 apply_handle_delete(StringInfo s)
1845 {
1846  LogicalRepRelMapEntry *rel;
1847  LogicalRepTupleData oldtup;
1848  LogicalRepRelId relid;
1849  ApplyExecutionData *edata;
1850  EState *estate;
1851  TupleTableSlot *remoteslot;
1852  MemoryContext oldctx;
1853 
1855  return;
1856 
1858 
1859  relid = logicalrep_read_delete(s, &oldtup);
1860  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1861  if (!should_apply_changes_for_rel(rel))
1862  {
1863  /*
1864  * The relation can't become interesting in the middle of the
1865  * transaction so it's safe to unlock it.
1866  */
1869  return;
1870  }
1871 
1872  /* Check if we can do the delete. */
1874 
1875  /* Initialize the executor state. */
1876  edata = create_edata_for_relation(rel);
1877  estate = edata->estate;
1878  remoteslot = ExecInitExtraTupleSlot(estate,
1879  RelationGetDescr(rel->localrel),
1880  &TTSOpsVirtual);
1881 
1882  /* Build the search tuple. */
1884  slot_store_data(remoteslot, rel, &oldtup);
1885  MemoryContextSwitchTo(oldctx);
1886 
1887  /* For a partitioned table, apply delete to correct partition. */
1888  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1890  remoteslot, NULL, CMD_DELETE);
1891  else
1893  remoteslot);
1894 
1895  finish_edata(edata);
1896 
1898 
1900 }
1901 
1902 /*
1903  * Workhorse for apply_handle_delete()
1904  * relinfo is for the relation we're actually deleting from
1905  * (could be a child partition of edata->targetRelInfo)
1906  */
1907 static void
1908 apply_handle_delete_internal(ApplyExecutionData *edata,
1909  ResultRelInfo *relinfo,
1910  TupleTableSlot *remoteslot)
1911 {
1912  EState *estate = edata->estate;
1913  Relation localrel = relinfo->ri_RelationDesc;
1914  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
1915  EPQState epqstate;
1916  TupleTableSlot *localslot;
1917  bool found;
1918 
1919  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1920  ExecOpenIndices(relinfo, false);
1921 
1922  found = FindReplTupleInLocalRel(estate, localrel, remoterel,
1923  remoteslot, &localslot);
1924 
1925  /* If found delete it. */
1926  if (found)
1927  {
1928  EvalPlanQualSetSlot(&epqstate, localslot);
1929 
1930  /* Do the actual delete. */
1931  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
1932  }
1933  else
1934  {
1935  /*
1936  * The tuple to be deleted could not be found. Do nothing except for
1937  * emitting a log message.
1938  *
1939  * XXX should this be promoted to ereport(LOG) perhaps?
1940  */
1941  elog(DEBUG1,
1942  "logical replication did not find row to be deleted "
1943  "in replication target relation \"%s\"",
1944  RelationGetRelationName(localrel));
1945  }
1946 
1947  /* Cleanup. */
1948  ExecCloseIndices(relinfo);
1949  EvalPlanQualEnd(&epqstate);
1950 }
1951 
1952 /*
1953  * Try to find a tuple received from the publication side (in 'remoteslot') in
1954  * the corresponding local relation using either the replica identity index,
1955  * the primary key or, if needed, a sequential scan.
1956  *
1957  * Local tuple, if found, is returned in '*localslot'.
1958  */
1959 static bool
1960 FindReplTupleInLocalRel(EState *estate, Relation localrel,
1961  LogicalRepRelation *remoterel,
1962  TupleTableSlot *remoteslot,
1963  TupleTableSlot **localslot)
1964 {
1965  Oid idxoid;
1966  bool found;
1967 
1968  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
1969 
1970  idxoid = GetRelationIdentityOrPK(localrel);
1971  Assert(OidIsValid(idxoid) ||
1972  (remoterel->replident == REPLICA_IDENTITY_FULL));
1973 
1974  if (OidIsValid(idxoid))
1975  found = RelationFindReplTupleByIndex(localrel, idxoid,
1976  LockTupleExclusive,
1977  remoteslot, *localslot);
1978  else
1979  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
1980  remoteslot, *localslot);
1981 
1982  return found;
1983 }
1984 
1985 /*
1986  * This handles insert, update, delete on a partitioned table.
1987  */
1988 static void
1989 apply_handle_tuple_routing(ApplyExecutionData *edata,
1990  TupleTableSlot *remoteslot,
1991  LogicalRepTupleData *newtup,
1992  CmdType operation)
1993 {
1994  EState *estate = edata->estate;
1995  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1996  ResultRelInfo *relinfo = edata->targetRelInfo;
1997  Relation parentrel = relinfo->ri_RelationDesc;
1998  ModifyTableState *mtstate;
1999  PartitionTupleRouting *proute;
2000  ResultRelInfo *partrelinfo;
2001  Relation partrel;
2002  TupleTableSlot *remoteslot_part;
2003  TupleConversionMap *map;
2004  MemoryContext oldctx;
2005 
2006  /* ModifyTableState is needed for ExecFindPartition(). */
2007  edata->mtstate = mtstate = makeNode(ModifyTableState);
2008  mtstate->ps.plan = NULL;
2009  mtstate->ps.state = estate;
2010  mtstate->operation = operation;
2011  mtstate->resultRelInfo = relinfo;
2012 
2013  /* ... as is PartitionTupleRouting. */
2014  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2015 
2016  /*
2017  * Find the partition to which the "search tuple" belongs.
2018  */
2019  Assert(remoteslot != NULL);
2020  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2021  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2022  remoteslot, estate);
2023  Assert(partrelinfo != NULL);
2024  partrel = partrelinfo->ri_RelationDesc;
2025 
2026  /*
2027  * To perform any of the operations below, the tuple must match the
2028  * partition's rowtype. Convert if needed or just copy, using a dedicated
2029  * slot to store the tuple in any case.
2030  */
2031  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2032  if (remoteslot_part == NULL)
2033  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2034  map = partrelinfo->ri_RootToPartitionMap;
2035  if (map != NULL)
2036  remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
2037  remoteslot_part);
2038  else
2039  {
2040  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2041  slot_getallattrs(remoteslot_part);
2042  }
2043  MemoryContextSwitchTo(oldctx);
2044 
2045  switch (operation)
2046  {
2047  case CMD_INSERT:
2048  apply_handle_insert_internal(edata, partrelinfo,
2049  remoteslot_part);
2050  break;
2051 
2052  case CMD_DELETE:
2053  apply_handle_delete_internal(edata, partrelinfo,
2054  remoteslot_part);
2055  break;
2056 
2057  case CMD_UPDATE:
2058 
2059  /*
2060  * For UPDATE, depending on whether or not the updated tuple
2061  * satisfies the partition's constraint, perform a simple UPDATE
2062  * of the partition or move the updated tuple into a different
2063  * suitable partition.
2064  */
2065  {
2066  AttrMap *attrmap = map ? map->attrMap : NULL;
2067  LogicalRepRelMapEntry *part_entry;
2068  TupleTableSlot *localslot;
2069  ResultRelInfo *partrelinfo_new;
2070  bool found;
2071 
2072  part_entry = logicalrep_partition_open(relmapentry, partrel,
2073  attrmap);
2074 
2075  /* Get the matching local tuple from the partition. */
2076  found = FindReplTupleInLocalRel(estate, partrel,
2077  &part_entry->remoterel,
2078  remoteslot_part, &localslot);
2079  if (!found)
2080  {
2081  /*
2082  * The tuple to be updated could not be found. Do nothing
2083  * except for emitting a log message.
2084  *
2085  * XXX should this be promoted to ereport(LOG) perhaps?
2086  */
2087  elog(DEBUG1,
2088  "logical replication did not find row to be updated "
2089  "in replication target relation's partition \"%s\"",
2090  RelationGetRelationName(partrel));
2091  return;
2092  }
2093 
2094  /*
2095  * Apply the update to the local tuple, putting the result in
2096  * remoteslot_part.
2097  */
2098  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2099  slot_modify_data(remoteslot_part, localslot, part_entry,
2100  newtup);
2101  MemoryContextSwitchTo(oldctx);
2102 
2103  /*
2104  * Does the updated tuple still satisfy the current
2105  * partition's constraint?
2106  */
2107  if (!partrel->rd_rel->relispartition ||
2108  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
2109  false))
2110  {
2111  /*
2112  * Yes, so simply UPDATE the partition. We don't call
2113  * apply_handle_update_internal() here, which would
2114  * normally do the following work, to avoid repeating some
2115  * work already done above to find the local tuple in the
2116  * partition.
2117  */
2118  EPQState epqstate;
2119 
2120  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2121  ExecOpenIndices(partrelinfo, false);
2122 
2123  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
2124  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
2125  localslot, remoteslot_part);
2126  ExecCloseIndices(partrelinfo);
2127  EvalPlanQualEnd(&epqstate);
2128  }
2129  else
2130  {
2131  /* Move the tuple into the new partition. */
2132 
2133  /*
2134  * New partition will be found using tuple routing, which
2135  * can only occur via the parent table. We might need to
2136  * convert the tuple to the parent's rowtype. Note that
2137  * this is the tuple found in the partition, not the
2138  * original search tuple received by this function.
2139  */
2140  if (map)
2141  {
2142  TupleConversionMap *PartitionToRootMap =
2143  convert_tuples_by_name(RelationGetDescr(partrel),
2144  RelationGetDescr(parentrel));
2145 
2146  remoteslot =
2147  execute_attr_map_slot(PartitionToRootMap->attrMap,
2148  remoteslot_part, remoteslot);
2149  }
2150  else
2151  {
2152  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
2153  slot_getallattrs(remoteslot);
2154  }
2155 
2156 
2157  /* Find the new partition. */
2158  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2159  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
2160  proute, remoteslot,
2161  estate);
2162  MemoryContextSwitchTo(oldctx);
2163  Assert(partrelinfo_new != partrelinfo);
2164 
2165  /* DELETE old tuple found in the old partition. */
2166  apply_handle_delete_internal(edata, partrelinfo,
2167  localslot);
2168 
2169  /* INSERT new tuple into the new partition. */
2170 
2171  /*
2172  * Convert the replacement tuple to match the destination
2173  * partition rowtype.
2174  */
2175  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2176  partrel = partrelinfo_new->ri_RelationDesc;
2177  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
2178  if (remoteslot_part == NULL)
2179  remoteslot_part = table_slot_create(partrel,
2180  &estate->es_tupleTable);
2181  map = partrelinfo_new->ri_RootToPartitionMap;
2182  if (map != NULL)
2183  {
2184  remoteslot_part = execute_attr_map_slot(map->attrMap,
2185  remoteslot,
2186  remoteslot_part);
2187  }
2188  else
2189  {
2190  remoteslot_part = ExecCopySlot(remoteslot_part,
2191  remoteslot);
2192  slot_getallattrs(remoteslot);
2193  }
2194  MemoryContextSwitchTo(oldctx);
2195  apply_handle_insert_internal(edata, partrelinfo_new,
2196  remoteslot_part);
2197  }
2198  }
2199  break;
2200 
2201  default:
2202  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
2203  break;
2204  }
2205 }
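
Illustration (not part of worker.c): the UPDATE branch above either updates the row in place or, when the changed tuple no longer satisfies the current partition's constraint, turns the operation into a DELETE from the old partition plus an INSERT into the newly routed one. A minimal standalone C sketch of that decision, using made-up range partitions instead of PostgreSQL's ExecPartitionCheck()/ExecFindPartition():

#include <stdio.h>

/* Toy range partitions: partition i holds keys in [lo, hi). */
struct part { int lo, hi; };

static int route(const struct part *parts, int nparts, int key)
{
    for (int i = 0; i < nparts; i++)
        if (key >= parts[i].lo && key < parts[i].hi)
            return i;
    return -1;                  /* no partition fits */
}

int main(void)
{
    struct part parts[] = {{0, 100}, {100, 200}};
    int oldkey = 42, newkey = 150;      /* invented values */

    int oldpart = route(parts, 2, oldkey);
    int newpart = route(parts, 2, newkey);

    if (newpart == oldpart)
        printf("UPDATE in place in partition %d\n", oldpart);
    else
        printf("DELETE from partition %d, INSERT into partition %d\n",
               oldpart, newpart);
    return 0;
}
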
2206 
2207 /*
2208  * Handle TRUNCATE message.
2209  *
2210  * TODO: FDW support
2211  */
2212 static void
2213 apply_handle_truncate(StringInfo s)
2214 {
2215  bool cascade = false;
2216  bool restart_seqs = false;
2217  List *remote_relids = NIL;
2218  List *remote_rels = NIL;
2219  List *rels = NIL;
2220  List *part_rels = NIL;
2221  List *relids = NIL;
2222  List *relids_logged = NIL;
2223  ListCell *lc;
2224  LOCKMODE lockmode = AccessExclusiveLock;
2225
2226  if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
2227  return;
2228
2229  begin_replication_step();
2230
2231  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
2232 
2233  foreach(lc, remote_relids)
2234  {
2235  LogicalRepRelId relid = lfirst_oid(lc);
2236  LogicalRepRelMapEntry *rel;
2237 
2238  rel = logicalrep_rel_open(relid, lockmode);
2239  if (!should_apply_changes_for_rel(rel))
2240  {
2241  /*
2242  * The relation can't become interesting in the middle of the
2243  * transaction so it's safe to unlock it.
2244  */
2245  logicalrep_rel_close(rel, lockmode);
2246  continue;
2247  }
2248 
2249  remote_rels = lappend(remote_rels, rel);
2250  rels = lappend(rels, rel->localrel);
2251  relids = lappend_oid(relids, rel->localreloid);
2252  if (RelationIsLogicallyLogged(rel->localrel))
2253  relids_logged = lappend_oid(relids_logged, rel->localreloid);
2254 
2255  /*
2256  * Truncate partitions if we got a message to truncate a partitioned
2257  * table.
2258  */
2259  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2260  {
2261  ListCell *child;
2262  List *children = find_all_inheritors(rel->localreloid,
2263  lockmode,
2264  NULL);
2265 
2266  foreach(child, children)
2267  {
2268  Oid childrelid = lfirst_oid(child);
2269  Relation childrel;
2270 
2271  if (list_member_oid(relids, childrelid))
2272  continue;
2273 
2274  /* find_all_inheritors already got lock */
2275  childrel = table_open(childrelid, NoLock);
2276 
2277  /*
2278  * Ignore temp tables of other backends. See similar code in
2279  * ExecuteTruncate().
2280  */
2281  if (RELATION_IS_OTHER_TEMP(childrel))
2282  {
2283  table_close(childrel, lockmode);
2284  continue;
2285  }
2286 
2287  rels = lappend(rels, childrel);
2288  part_rels = lappend(part_rels, childrel);
2289  relids = lappend_oid(relids, childrelid);
2290  /* Log this relation only if needed for logical decoding */
2291  if (RelationIsLogicallyLogged(childrel))
2292  relids_logged = lappend_oid(relids_logged, childrelid);
2293  }
2294  }
2295  }
2296 
2297  /*
2298  * Even if we used CASCADE on the upstream primary we explicitly default
2299  * to replaying changes without further cascading. This might later be made
2300  * configurable with a user-specified option.
2301  */
2302  ExecuteTruncateGuts(rels,
2303  relids,
2304  relids_logged,
2305  DROP_RESTRICT,
2306  restart_seqs);
2307  foreach(lc, remote_rels)
2308  {
2309  LogicalRepRelMapEntry *rel = lfirst(lc);
2310 
2311  logicalrep_rel_close(rel, NoLock);
2312  }
2313  foreach(lc, part_rels)
2314  {
2315  Relation rel = lfirst(lc);
2316 
2317  table_close(rel, NoLock);
2318  }
2319 
2320  end_replication_step();
2321 }
2322 
2323 
2324 /*
2325  * Logical replication protocol message dispatcher.
2326  */
2327 static void
2328 apply_dispatch(StringInfo s)
2329 {
2330  LogicalRepMsgType action = pq_getmsgbyte(s);
2331
2332  switch (action)
2333  {
2334  case LOGICAL_REP_MSG_BEGIN:
2335  apply_handle_begin(s);
2336  return;
2337
2338  case LOGICAL_REP_MSG_COMMIT:
2339  apply_handle_commit(s);
2340  return;
2341
2342  case LOGICAL_REP_MSG_INSERT:
2343  apply_handle_insert(s);
2344  return;
2345
2346  case LOGICAL_REP_MSG_UPDATE:
2347  apply_handle_update(s);
2348  return;
2349
2350  case LOGICAL_REP_MSG_DELETE:
2351  apply_handle_delete(s);
2352  return;
2353
2354  case LOGICAL_REP_MSG_TRUNCATE:
2355  apply_handle_truncate(s);
2356  return;
2357
2358  case LOGICAL_REP_MSG_RELATION:
2359  apply_handle_relation(s);
2360  return;
2361
2362  case LOGICAL_REP_MSG_TYPE:
2363  apply_handle_type(s);
2364  return;
2365
2366  case LOGICAL_REP_MSG_ORIGIN:
2367  apply_handle_origin(s);
2368  return;
2369
2370  case LOGICAL_REP_MSG_MESSAGE:
2371
2372  /*
2373  * Logical replication does not use generic logical messages yet.
2374  * Although, it could be used by other applications that use this
2375  * output plugin.
2376  */
2377  return;
2378
2379  case LOGICAL_REP_MSG_STREAM_START:
2380  apply_handle_stream_start(s);
2381  return;
2382
2383  case LOGICAL_REP_MSG_STREAM_END:
2384  apply_handle_stream_stop(s);
2385  return;
2386
2387  case LOGICAL_REP_MSG_STREAM_ABORT:
2388  apply_handle_stream_abort(s);
2389  return;
2390
2391  case LOGICAL_REP_MSG_STREAM_COMMIT:
2392  apply_handle_stream_commit(s);
2393  return;
2394
2395  case LOGICAL_REP_MSG_BEGIN_PREPARE:
2396  apply_handle_begin_prepare(s);
2397  return;
2398
2399  case LOGICAL_REP_MSG_PREPARE:
2400  apply_handle_prepare(s);
2401  return;
2402
2403  case LOGICAL_REP_MSG_COMMIT_PREPARED:
2404  apply_handle_commit_prepared(s);
2405  return;
2406
2407  case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
2408  apply_handle_rollback_prepared(s);
2409  return;
2410
2411  case LOGICAL_REP_MSG_STREAM_PREPARE:
2412  apply_handle_stream_prepare(s);
2413  return;
2414  }
2415 
2416  ereport(ERROR,
2417  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2418  errmsg_internal("invalid logical replication message type \"%c\"",
2419  action)));
2420 }
2421 
2422 /*
2423  * Figure out which write/flush positions to report to the walsender process.
2424  *
2425  * We can't simply report back the last LSN the walsender sent us because the
2426  * local transaction might not yet be flushed to disk locally. Instead we
2427  * build a list that associates local with remote LSNs for every commit. When
2428  * reporting back the flush position to the sender we iterate that list and
2429  * check which entries on it are already locally flushed. Those we can report
2430  * as having been flushed.
2431  *
2432  * The have_pending_txes is true if there are outstanding transactions that
2433  * need to be flushed.
2434  */
2435 static void
2436 get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
2437  bool *have_pending_txes)
2438 {
2439  dlist_mutable_iter iter;
2440  XLogRecPtr local_flush = GetFlushRecPtr();
2441 
2442  *write = InvalidXLogRecPtr;
2443  *flush = InvalidXLogRecPtr;
2444 
2445  dlist_foreach_modify(iter, &lsn_mapping)
2446  {
2447  FlushPosition *pos =
2448  dlist_container(FlushPosition, node, iter.cur);
2449
2450  *write = pos->remote_end;
2451 
2452  if (pos->local_end <= local_flush)
2453  {
2454  *flush = pos->remote_end;
2455  dlist_delete(iter.cur);
2456  pfree(pos);
2457  }
2458  else
2459  {
2460  /*
2461  * Don't want to uselessly iterate over the rest of the list which
2462  * could potentially be long. Instead get the last element and
2463  * grab the write position from there.
2464  */
2465  pos = dlist_tail_element(FlushPosition, node,
2466  &lsn_mapping);
2467  *write = pos->remote_end;
2468  *have_pending_txes = true;
2469  return;
2470  }
2471  }
2472 
2473  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
2474 }
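
Illustration (not part of worker.c): the same bookkeeping as get_flush_position(), reduced to a plain array. Commits are recorded as (local_end, remote_end) pairs in commit order, and the reportable remote flush position is the remote_end of the newest pair whose local_end is already flushed locally. The LSN values below are invented:

#include <stdio.h>
#include <stdint.h>

struct pair { uint64_t local_end; uint64_t remote_end; };

/* Return the highest remote_end whose local_end <= local_flush (0 if none). */
static uint64_t remote_flush_pos(const struct pair *p, int n, uint64_t local_flush)
{
    uint64_t flush = 0;

    for (int i = 0; i < n; i++)
    {
        if (p[i].local_end <= local_flush)
            flush = p[i].remote_end;    /* flushed locally, safe to report */
        else
            break;                      /* list is in commit order; stop */
    }
    return flush;
}

int main(void)
{
    struct pair mapping[] = {
        {1000, 500}, {2000, 600}, {3000, 700},
    };

    /* Only the first two local commits are flushed to disk so far. */
    printf("report remote flush = %llu\n",
           (unsigned long long) remote_flush_pos(mapping, 3, 2500));
    return 0;
}
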
2475 
2476 /*
2477  * Store current remote/local lsn pair in the tracking list.
2478  */
2479 static void
2480 store_flush_position(XLogRecPtr remote_lsn)
2481 {
2482  FlushPosition *flushpos;
2483 
2484  /* Need to do this in permanent context */
2485  MemoryContextSwitchTo(ApplyContext);
2486 
2487  /* Track commit lsn */
2488  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2489  flushpos->local_end = XactLastCommitEnd;
2490  flushpos->remote_end = remote_lsn;
2491 
2492  dlist_push_tail(&lsn_mapping, &flushpos->node);
2493  MemoryContextSwitchTo(ApplyMessageContext);
2494 }
2495 
2496 
2497 /* Update statistics of the worker. */
2498 static void
2499 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
2500 {
2501  MyLogicalRepWorker->last_lsn = last_lsn;
2502  MyLogicalRepWorker->last_send_time = send_time;
2503  MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
2504  if (reply)
2505  {
2506  MyLogicalRepWorker->reply_lsn = last_lsn;
2507  MyLogicalRepWorker->reply_time = send_time;
2508  }
2509 }
2510 
2511 /*
2512  * Apply main loop.
2513  */
2514 static void
2515 LogicalRepApplyLoop(XLogRecPtr last_received)
2516 {
2517  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2518  bool ping_sent = false;
2519  TimeLineID tli;
2520 
2521  /*
2522  * Init the ApplyMessageContext which we clean up after each replication
2523  * protocol message.
2524  */
2525  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
2526  "ApplyMessageContext",
2527  ALLOCSET_DEFAULT_SIZES);
2528
2529  /*
2530  * This memory context is used for per-stream data when the streaming mode
2531  * is enabled. This context is reset on each stream stop.
2532  */
2533  LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
2534  "LogicalStreamingContext",
2535  ALLOCSET_DEFAULT_SIZES);
2536
2537  /* mark as idle, before starting to loop */
2538  pgstat_report_activity(STATE_IDLE, NULL);
2539
2540  /* This outer loop iterates once per wait. */
2541  for (;;)
2542  {
2543  pgsocket fd = PGINVALID_SOCKET;
2544  int rc;
2545  int len;
2546  char *buf = NULL;
2547  bool endofstream = false;
2548  long wait_time;
2549 
2550  CHECK_FOR_INTERRUPTS();
2551
2552  MemoryContextSwitchTo(ApplyMessageContext);
2553 
2554  len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
2555 
2556  if (len != 0)
2557  {
2558  /* Loop to process all available data (without blocking). */
2559  for (;;)
2560  {
2561  CHECK_FOR_INTERRUPTS();
2562
2563  if (len == 0)
2564  {
2565  break;
2566  }
2567  else if (len < 0)
2568  {
2569  ereport(LOG,
2570  (errmsg("data stream from publisher has ended")));
2571  endofstream = true;
2572  break;
2573  }
2574  else
2575  {
2576  int c;
2577  StringInfoData s;
2578 
2579  /* Reset timeout. */
2580  last_recv_timestamp = GetCurrentTimestamp();
2581  ping_sent = false;
2582 
2583  /* Ensure we are reading the data into our memory context. */
2584  MemoryContextSwitchTo(ApplyMessageContext);
2585 
2586  s.data = buf;
2587  s.len = len;
2588  s.cursor = 0;
2589  s.maxlen = -1;
2590 
2591  c = pq_getmsgbyte(&s);
2592 
2593  if (c == 'w')
2594  {
2595  XLogRecPtr start_lsn;
2596  XLogRecPtr end_lsn;
2597  TimestampTz send_time;
2598 
2599  start_lsn = pq_getmsgint64(&s);
2600  end_lsn = pq_getmsgint64(&s);
2601  send_time = pq_getmsgint64(&s);
2602 
2603  if (last_received < start_lsn)
2604  last_received = start_lsn;
2605 
2606  if (last_received < end_lsn)
2607  last_received = end_lsn;
2608 
2609  UpdateWorkerStats(last_received, send_time, false);
2610 
2611  apply_dispatch(&s);
2612  }
2613  else if (c == 'k')
2614  {
2615  XLogRecPtr end_lsn;
2616  TimestampTz timestamp;
2617  bool reply_requested;
2618 
2619  end_lsn = pq_getmsgint64(&s);
2620  timestamp = pq_getmsgint64(&s);
2621  reply_requested = pq_getmsgbyte(&s);
2622 
2623  if (last_received < end_lsn)
2624  last_received = end_lsn;
2625 
2626  send_feedback(last_received, reply_requested, false);
2627  UpdateWorkerStats(last_received, timestamp, true);
2628  }
2629  /* other message types are purposefully ignored */
2630 
2631  MemoryContextReset(ApplyMessageContext);
2632  }
2633 
2634  len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
2635  }
2636  }
2637 
2638  /* confirm all writes so far */
2639  send_feedback(last_received, false, false);
2640 
2641  if (!in_remote_transaction && !in_streamed_transaction)
2642  {
2643  /*
2644  * If we didn't get any transactions for a while there might be
2645  * unconsumed invalidation messages in the queue, consume them
2646  * now.
2647  */
2648  AcceptInvalidationMessages();
2649  maybe_reread_subscription();
2650
2651  /* Process any table synchronization changes. */
2652  process_syncing_tables(last_received);
2653  }
2654 
2655  /* Cleanup the memory. */
2656  MemoryContextResetAndDeleteChildren(ApplyMessageContext);
2657  MemoryContextSwitchTo(TopMemoryContext);
2658
2659  /* Check if we need to exit the streaming loop. */
2660  if (endofstream)
2661  break;
2662 
2663  /*
2664  * Wait for more data or latch. If we have unflushed transactions,
2665  * wake up after WalWriterDelay to see if they've been flushed yet (in
2666  * which case we should send a feedback message). Otherwise, there's
2667  * no particular urgency about waking up unless we get data or a
2668  * signal.
2669  */
2670  if (!dlist_is_empty(&lsn_mapping))
2671  wait_time = WalWriterDelay;
2672  else
2673  wait_time = NAPTIME_PER_CYCLE;
2674
2675  rc = WaitLatchOrSocket(MyLatch,
2676  WL_SOCKET_READABLE | WL_LATCH_SET |
2677  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
2678  fd, wait_time,
2679  WAIT_EVENT_LOGICAL_APPLY_MAIN);
2680
2681  if (rc & WL_LATCH_SET)
2682  {
2683  ResetLatch(MyLatch);
2684  CHECK_FOR_INTERRUPTS();
2685  }
2686 
2687  if (ConfigReloadPending)
2688  {
2689  ConfigReloadPending = false;
2690  ProcessConfigFile(PGC_SIGHUP);
2691  }
2692 
2693  if (rc & WL_TIMEOUT)
2694  {
2695  /*
2696  * We didn't receive anything new. If we haven't heard anything
2697  * from the server for more than wal_receiver_timeout / 2, ping
2698  * the server. Also, if it's been longer than
2699  * wal_receiver_status_interval since the last update we sent,
2700  * send a status update to the primary anyway, to report any
2701  * progress in applying WAL.
2702  */
2703  bool requestReply = false;
2704 
2705  /*
2706  * Check if time since last receive from primary has reached the
2707  * configured limit.
2708  */
2709  if (wal_receiver_timeout > 0)
2710  {
2711  TimestampTz now = GetCurrentTimestamp();
2712  TimestampTz timeout;
2713 
2714  timeout =
2715  TimestampTzPlusMilliseconds(last_recv_timestamp,
2716  wal_receiver_timeout);
2717
2718  if (now >= timeout)
2719  ereport(ERROR,
2720  (errcode(ERRCODE_CONNECTION_FAILURE),
2721  errmsg("terminating logical replication worker due to timeout")));
2722 
2723  /* Check to see if it's time for a ping. */
2724  if (!ping_sent)
2725  {
2726  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2727  (wal_receiver_timeout / 2));
2728  if (now >= timeout)
2729  {
2730  requestReply = true;
2731  ping_sent = true;
2732  }
2733  }
2734  }
2735 
2736  send_feedback(last_received, requestReply, requestReply);
2737  }
2738  }
2739 
2740  /* All done */
2741  walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
2742 }
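
Illustration (not part of worker.c): the loop above reads one type byte per copy-data message and then a few big-endian 64-bit fields before acting on it. A standalone sketch of parsing a keepalive ('k') header, with a hand-rolled big-endian reader standing in for pq_getmsgint64() and made-up buffer contents:

#include <stdio.h>
#include <stdint.h>

static uint64_t read_be64(const unsigned char *p)
{
    uint64_t v = 0;
    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

int main(void)
{
    /* 'k', end_lsn = 0x0000000100000010, sendTime = 0, replyRequested = 1 */
    unsigned char msg[18] = {
        'k',
        0, 0, 0, 1, 0, 0, 0, 0x10,
        0, 0, 0, 0, 0, 0, 0, 0,
        1
    };

    if (msg[0] == 'k')
    {
        uint64_t end_lsn = read_be64(msg + 1);
        uint64_t send_time = read_be64(msg + 9);
        int reply_requested = msg[17];

        printf("keepalive: end_lsn=%X/%X reply=%d (sendTime=%llu)\n",
               (unsigned int) (end_lsn >> 32), (unsigned int) end_lsn,
               reply_requested, (unsigned long long) send_time);
    }
    return 0;
}
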
2743 
2744 /*
2745  * Send a Standby Status Update message to server.
2746  *
2747  * 'recvpos' is the latest LSN we've received data to; force is set if we need
2748  * to send a response to avoid timeouts.
2749  */
2750 static void
2751 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
2752 {
2753  static StringInfo reply_message = NULL;
2754  static TimestampTz send_time = 0;
2755 
2756  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2757  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2758  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2759 
2760  XLogRecPtr writepos;
2761  XLogRecPtr flushpos;
2762  TimestampTz now;
2763  bool have_pending_txes;
2764 
2765  /*
2766  * If the user doesn't want status to be reported to the publisher, be
2767  * sure to exit before doing anything at all.
2768  */
2769  if (!force && wal_receiver_status_interval <= 0)
2770  return;
2771 
2772  /* It's legal to not pass a recvpos */
2773  if (recvpos < last_recvpos)
2774  recvpos = last_recvpos;
2775 
2776  get_flush_position(&writepos, &flushpos, &have_pending_txes);
2777 
2778  /*
2779  * No outstanding transactions to flush, so we can report the latest received
2780  * position. This is important for synchronous replication.
2781  */
2782  if (!have_pending_txes)
2783  flushpos = writepos = recvpos;
2784 
2785  if (writepos < last_writepos)
2786  writepos = last_writepos;
2787 
2788  if (flushpos < last_flushpos)
2789  flushpos = last_flushpos;
2790 
2791  now = GetCurrentTimestamp();
2792 
2793  /* if we've already reported everything we're good */
2794  if (!force &&
2795  writepos == last_writepos &&
2796  flushpos == last_flushpos &&
2797  !TimestampDifferenceExceeds(send_time, now,
2798  wal_receiver_status_interval * 1000))
2799  return;
2800  send_time = now;
2801 
2802  if (!reply_message)
2803  {
2804  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
2805 
2806  reply_message = makeStringInfo();
2807  MemoryContextSwitchTo(oldctx);
2808  }
2809  else
2810  resetStringInfo(reply_message);
2811 
2812  pq_sendbyte(reply_message, 'r');
2813  pq_sendint64(reply_message, recvpos); /* write */
2814  pq_sendint64(reply_message, flushpos); /* flush */
2815  pq_sendint64(reply_message, writepos); /* apply */
2816  pq_sendint64(reply_message, now); /* sendTime */
2817  pq_sendbyte(reply_message, requestReply); /* replyRequested */
2818 
2819  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2820  force,
2821  LSN_FORMAT_ARGS(recvpos),
2822  LSN_FORMAT_ARGS(writepos),
2823  LSN_FORMAT_ARGS(flushpos));
2824 
2825  walrcv_send(LogRepWorkerWalRcvConn,
2826  reply_message->data, reply_message->len);
2827 
2828  if (recvpos > last_recvpos)
2829  last_recvpos = recvpos;
2830  if (writepos > last_writepos)
2831  last_writepos = writepos;
2832  if (flushpos > last_flushpos)
2833  last_flushpos = flushpos;
2834 }
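
Illustration (not part of worker.c): the wire layout send_feedback() produces with pq_sendbyte()/pq_sendint64() is one 'r' byte, then write/flush/apply positions and the send timestamp as big-endian 64-bit integers, then a reply-requested flag. A standalone encoder with invented values:

#include <stdio.h>
#include <stdint.h>

static void put_be64(unsigned char *p, uint64_t v)
{
    for (int i = 7; i >= 0; i--)
    {
        p[i] = (unsigned char) (v & 0xff);
        v >>= 8;
    }
}

int main(void)
{
    unsigned char reply[1 + 8 * 4 + 1];
    uint64_t recvpos = 0x100000020ULL;  /* hypothetical LSNs/timestamp */
    uint64_t flushpos = 0x100000010ULL;
    uint64_t writepos = 0x100000020ULL;
    uint64_t now = 0;

    reply[0] = 'r';
    put_be64(reply + 1, recvpos);       /* write */
    put_be64(reply + 9, flushpos);      /* flush */
    put_be64(reply + 17, writepos);     /* apply */
    put_be64(reply + 25, now);          /* sendTime */
    reply[33] = 0;                      /* replyRequested */

    printf("status update is %zu bytes, first byte '%c'\n",
           sizeof(reply), reply[0]);
    return 0;
}
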
2835 
2836 /*
2837  * Reread subscription info if needed. Most changes will result in the worker exiting.
2838  */
2839 static void
2840 maybe_reread_subscription(void)
2841 {
2842  MemoryContext oldctx;
2843  Subscription *newsub;
2844  bool started_tx = false;
2845 
2846  /* When cache state is valid there is nothing to do here. */
2847  if (MySubscriptionValid)
2848  return;
2849 
2850  /* This function might be called inside or outside of transaction. */
2851  if (!IsTransactionState())
2852  {
2853  StartTransactionCommand();
2854  started_tx = true;
2855  }
2856 
2857  /* Ensure allocations in permanent context. */
2858  oldctx = MemoryContextSwitchTo(ApplyContext);
2859 
2860  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
2861 
2862  /*
2863  * Exit if the subscription was removed. This normally should not happen
2864  * as the worker gets killed during DROP SUBSCRIPTION.
2865  */
2866  if (!newsub)
2867  {
2868  ereport(LOG,
2869  (errmsg("logical replication apply worker for subscription \"%s\" will "
2870  "stop because the subscription was removed",
2871  MySubscription->name)));
2872 
2873  proc_exit(0);
2874  }
2875 
2876  /*
2877  * Exit if the subscription was disabled. This normally should not happen
2878  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
2879  */
2880  if (!newsub->enabled)
2881  {
2882  ereport(LOG,
2883  (errmsg("logical replication apply worker for subscription \"%s\" will "
2884  "stop because the subscription was disabled",
2885  MySubscription->name)));
2886 
2887  proc_exit(0);
2888  }
2889 
2890  /* !slotname should never happen when enabled is true. */
2891  Assert(newsub->slotname);
2892 
2893  /* two-phase should not be altered */
2894  Assert(newsub->twophasestate == MySubscription->twophasestate);
2895 
2896  /*
2897  * Exit if any parameter that affects the remote connection was changed.
2898  * The launcher will start a new worker.
2899  */
2900  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
2901  strcmp(newsub->name, MySubscription->name) != 0 ||
2902  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
2903  newsub->binary != MySubscription->binary ||
2904  newsub->stream != MySubscription->stream ||
2905  !equal(newsub->publications, MySubscription->publications))
2906  {
2907  ereport(LOG,
2908  (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
2909  MySubscription->name)));
2910 
2911  proc_exit(0);
2912  }
2913 
2914  /* Check for other changes that should never happen too. */
2915  if (newsub->dbid != MySubscription->dbid)
2916  {
2917  elog(ERROR, "subscription %u changed unexpectedly",
2918  MyLogicalRepWorker->subid);
2919  }
2920 
2921  /* Clean old subscription info and switch to new one. */
2922  FreeSubscription(MySubscription);
2923  MySubscription = newsub;
2924 
2925  MemoryContextSwitchTo(oldctx);
2926 
2927  /* Change synchronous commit according to the user's wishes */
2928  SetConfigOption("synchronous_commit", MySubscription->synccommit,
2929  PGC_BACKEND, PGC_S_OVERRIDE);
2930
2931  if (started_tx)
2932  CommitTransactionCommand();
2933
2934  MySubscriptionValid = true;
2935 }
2936 
2937 /*
2938  * Callback from subscription syscache invalidation.
2939  */
2940 static void
2941 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
2942 {
2943  MySubscriptionValid = false;
2944 }
2945 
2946 /*
2947  * subxact_info_write
2948  * Store information about subxacts for a toplevel transaction.
2949  *
2950  * For each subxact we store the offset of its first change in the main file.
2951  * The file is always over-written as a whole.
2952  *
2953  * XXX We should only store subxacts that were not aborted yet.
2954  */
2955 static void
2956 subxact_info_write(Oid subid, TransactionId xid)
2957 {
2958  char path[MAXPGPATH];
2959  Size len;
2960  StreamXidHash *ent;
2961  BufFile *fd;
2962 
2962
2963  Assert(TransactionIdIsValid(xid));
2964
2965  /* Find the xid entry in the xidhash */
2966  ent = (StreamXidHash *) hash_search(xidhash,
2967  (void *) &xid,
2968  HASH_FIND,
2969  NULL);
2970  /* By this time we must have created the transaction entry */
2971  Assert(ent);
2972 
2973  /*
2974  * If there are no subtransactions then there is nothing to do, but if a
2975  * subxact file already exists then delete it.
2976  */
2977  if (subxact_data.nsubxacts == 0)
2978  {
2979  if (ent->subxact_fileset)
2980  {
2981  cleanup_subxact_info();
2982  SharedFileSetDeleteAll(ent->subxact_fileset);
2983  pfree(ent->subxact_fileset);
2984  ent->subxact_fileset = NULL;
2985  }
2986  return;
2987  }
2988 
2989  subxact_filename(path, subid, xid);
2990 
2991  /*
2992  * Create the subxact file if it is not already created, otherwise open the
2993  * existing file.
2994  */
2995  if (ent->subxact_fileset == NULL)
2996  {
2997  MemoryContext oldctx;
2998 
2999  /*
3000  * We need to maintain shared fileset across multiple stream
3001  * start/stop calls. So we need to allocate it in a persistent context.
3002  */
3003  oldctx = MemoryContextSwitchTo(ApplyContext);
3004  ent->subxact_fileset = palloc(sizeof(SharedFileSet));
3005  SharedFileSetInit(ent->subxact_fileset, NULL);
3006  MemoryContextSwitchTo(oldctx);
3007 
3008  fd = BufFileCreateShared(ent->subxact_fileset, path);
3009  }
3010  else
3011  fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
3012 
3013  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
3014 
3015  /* Write the subxact count and subxact info */
3016  BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
3017  BufFileWrite(fd, subxact_data.subxacts, len);
3018 
3019  BufFileClose(fd);
3020 
3021  /* free the memory allocated for subxact info */
3022  cleanup_subxact_info();
3023 }
3024 
3025 /*
3026  * subxact_info_read
3027  * Restore information about subxacts of a streamed transaction.
3028  *
3029  * Read information about subxacts into the structure subxact_data that can be
3030  * used later.
3031  */
3032 static void
3033 subxact_info_read(Oid subid, TransactionId xid)
3034 {
3035  char path[MAXPGPATH];
3036  Size len;
3037  BufFile *fd;
3038  StreamXidHash *ent;
3039  MemoryContext oldctx;
3040 
3041  Assert(!subxact_data.subxacts);
3042  Assert(subxact_data.nsubxacts == 0);
3043  Assert(subxact_data.nsubxacts_max == 0);
3044 
3045  /* Find the stream xid entry in the xidhash */
3046  ent = (StreamXidHash *) hash_search(xidhash,
3047  (void *) &xid,
3048  HASH_FIND,
3049  NULL);
3050  if (!ent)
3051  ereport(ERROR,
3052  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3053  errmsg_internal("transaction %u not found in stream XID hash table",
3054  xid)));
3055 
3056  /*
3057  * If subxact_fileset is not valid, that means we don't have any subxact
3058  * info.
3059  */
3060  if (ent->subxact_fileset == NULL)
3061  return;
3062 
3063  subxact_filename(path, subid, xid);
3064 
3065  fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
3066 
3067  /* read number of subxact items */
3068  if (BufFileRead(fd, &subxact_data.nsubxacts,
3069  sizeof(subxact_data.nsubxacts)) !=
3070  sizeof(subxact_data.nsubxacts))
3071  ereport(ERROR,
3072  (errcode_for_file_access(),
3073  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
3074  path)));
3075 
3076  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
3077 
3078  /* we keep the maximum as a power of 2 */
3079  subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
3080 
3081  /*
3082  * Allocate subxact information in the logical streaming context. We need
3083  * this information during the complete stream so that we can add the sub
3084  * transaction info to this. On stream stop we will flush this information
3085  * to the subxact file and reset the logical streaming context.
3086  */
3087  oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
3088  subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
3089  sizeof(SubXactInfo));
3090  MemoryContextSwitchTo(oldctx);
3091 
3092  if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
3093  ereport(ERROR,
3094  (errcode_for_file_access(),
3095  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
3096  path)));
3097 
3098  BufFileClose(fd);
3099 }
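
Illustration (not part of worker.c): the subxact file is simply a count followed by fixed-size records (the subxact XID plus the BufFile position of its first change). A standalone sketch of that shape using stdio and a simplified record type; the real SubXactInfo and BufFile positioning are defined in worker.c and buffile.c:

#include <stdio.h>
#include <stdint.h>

struct subxact { uint32_t xid; int fileno; long offset; };  /* simplified */

int main(void)
{
    struct subxact sx[2] = {{740, 0, 128}, {741, 0, 512}};  /* invented */
    uint32_t n = 2, nread = 0;
    struct subxact in[2];

    FILE *f = tmpfile();
    if (!f)
        return 1;

    /* write: count, then the records */
    fwrite(&n, sizeof(n), 1, f);
    fwrite(sx, sizeof(sx[0]), n, f);

    /* read it back the same way subxact_info_read does with BufFileRead */
    rewind(f);
    fread(&nread, sizeof(nread), 1, f);
    fread(in, sizeof(in[0]), nread, f);

    printf("%u subxacts, first xid %u at offset %ld\n",
           nread, in[0].xid, in[0].offset);
    fclose(f);
    return 0;
}
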
3100 
3101 /*
3102  * subxact_info_add
3103  * Add information about a subxact (offset in the main file).
3104  */
3105 static void
3106 subxact_info_add(TransactionId xid)
3107 {
3108  SubXactInfo *subxacts = subxact_data.subxacts;
3109  int64 i;
3110 
3111  /* We must have a valid top level stream xid and a stream fd. */
3112  Assert(TransactionIdIsValid(stream_xid));
3113  Assert(stream_fd != NULL);
3114 
3115  /*
3116  * If the XID matches the toplevel transaction, we don't want to add it.
3117  */
3118  if (stream_xid == xid)
3119  return;
3120 
3121  /*
3122  * In most cases we're checking the same subxact as we've already seen in
3123  * the last call, so make sure to ignore it (this change comes later).
3124  */
3125  if (subxact_data.subxact_last == xid)
3126  return;
3127 
3128  /* OK, remember we're processing this XID. */
3129  subxact_data.subxact_last = xid;
3130 
3131  /*
3132  * Check if the transaction is already present in the array of subxact. We
3133  * intentionally scan the array from the tail, because we're likely adding
3134  * a change for the most recent subtransactions.
3135  *
3136  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
3137  * would allow us to use binary search here.
3138  */
3139  for (i = subxact_data.nsubxacts; i > 0; i--)
3140  {
3141  /* found, so we're done */
3142  if (subxacts[i - 1].xid == xid)
3143  return;
3144  }
3145 
3146  /* This is a new subxact, so we need to add it to the array. */
3147  if (subxact_data.nsubxacts == 0)
3148  {
3149  MemoryContext oldctx;
3150 
3151  subxact_data.nsubxacts_max = 128;
3152 
3153  /*
3154  * Allocate this memory for subxacts in per-stream context, see
3155  * subxact_info_read.
3156  */
3157  oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
3158  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
3159  MemoryContextSwitchTo(oldctx);
3160  }
3161  else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
3162  {
3163  subxact_data.nsubxacts_max *= 2;
3164  subxacts = repalloc(subxacts,
3165  subxact_data.nsubxacts_max * sizeof(SubXactInfo));
3166  }
3167 
3168  subxacts[subxact_data.nsubxacts].xid = xid;
3169 
3170  /*
3171  * Get the current offset of the stream file and store it as offset of
3172  * this subxact.
3173  */
3174  BufFileTell(stream_fd,
3175  &subxacts[subxact_data.nsubxacts].fileno,
3176  &subxacts[subxact_data.nsubxacts].offset);
3177 
3178  subxact_data.nsubxacts++;
3179  subxact_data.subxacts = subxacts;
3180 }
3181 
3182 /* format filename for file containing the info about subxacts */
3183 static inline void
3184 subxact_filename(char *path, Oid subid, TransactionId xid)
3185 {
3186  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
3187 }
3188 
3189 /* format filename for file containing serialized changes */
3190 static inline void
3191 changes_filename(char *path, Oid subid, TransactionId xid)
3192 {
3193  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
3194 }
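
Illustration (not part of worker.c): for a hypothetical subscription OID and toplevel XID, the two helpers above produce names like the following (values invented):

#include <stdio.h>

int main(void)
{
    char path[1024];
    unsigned int subid = 16394, xid = 739;   /* hypothetical values */

    snprintf(path, sizeof(path), "%u-%u.changes", subid, xid);
    printf("changes file: %s\n", path);      /* 16394-739.changes */

    snprintf(path, sizeof(path), "%u-%u.subxacts", subid, xid);
    printf("subxact file: %s\n", path);      /* 16394-739.subxacts */
    return 0;
}
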
3195 
3196 /*
3197  * stream_cleanup_files
3198  * Cleanup files for a subscription / toplevel transaction.
3199  *
3200  * Remove files with serialized changes and subxact info for a particular
3201  * toplevel transaction. Each subscription has a separate set of files.
3202  */
3203 static void
3204 stream_cleanup_files(Oid subid, TransactionId xid)
3205 {
3206  char path[MAXPGPATH];
3207  StreamXidHash *ent;
3208 
3209  /* Find the xid entry in the xidhash */
3210  ent = (StreamXidHash *) hash_search(xidhash,
3211  (void *) &xid,
3212  HASH_FIND,
3213  NULL);
3214  if (!ent)
3215  ereport(ERROR,
3216  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3217  errmsg_internal("transaction %u not found in stream XID hash table",
3218  xid)));
3219 
3220  /* Delete the change file and release the stream fileset memory */
3221  changes_filename(path, subid, xid);
3222  SharedFileSetDeleteAll(ent->stream_fileset);
3223  pfree(ent->stream_fileset);
3224  ent->stream_fileset = NULL;
3225 
3226  /* Delete the subxact file and release the memory, if it exists */
3227  if (ent->subxact_fileset)
3228  {
3229  subxact_filename(path, subid, xid);
3230  SharedFileSetDeleteAll(ent->subxact_fileset);
3231  pfree(ent->subxact_fileset);
3232  ent->subxact_fileset = NULL;
3233  }
3234 
3235  /* Remove the xid entry from the stream xid hash */
3236  hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
3237 }
3238 
3239 /*
3240  * stream_open_file
3241  * Open a file that we'll use to serialize changes for a toplevel
3242  * transaction.
3243  *
3244  * Open a file for streamed changes from a toplevel transaction identified
3245  * by stream_xid (global variable). If it's the first chunk of streamed
3246  * changes for this transaction, initialize the shared fileset and create the
3247  * buffile, otherwise open the previously created file.
3248  *
3249  * This can only be called at the beginning of a "streaming" block, i.e.
3250  * between stream_start/stream_stop messages from the upstream.
3251  */
3252 static void
3253 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
3254 {
3255  char path[MAXPGPATH];
3256  bool found;
3257  MemoryContext oldcxt;
3258  StreamXidHash *ent;
3259 
3260  Assert(in_streamed_transaction);
3261  Assert(OidIsValid(subid));
3262  Assert(TransactionIdIsValid(xid));
3263  Assert(stream_fd == NULL);
3264 
3265  /* create or find the xid entry in the xidhash */
3266  ent = (StreamXidHash *) hash_search(xidhash,
3267  (void *) &xid,
3268  HASH_ENTER,
3269  &found);
3270 
3271  changes_filename(path, subid, xid);
3272  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
3273 
3274  /*
3275  * Create/open the buffiles under the logical streaming context so that we
3276  * have those files until stream stop.
3277  */
3278  oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
3279 
3280  /*
3281  * If this is the first streamed segment, the file must not exist, so make
3282  * sure we're the ones creating it. Otherwise just open the file for
3283  * writing, in append mode.
3284  */
3285  if (first_segment)
3286  {
3287  MemoryContext savectx;
3288  SharedFileSet *fileset;
3289 
3290  if (found)
3291  ereport(ERROR,
3292  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3293  errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
3294 
3295  /*
3296  * We need to maintain shared fileset across multiple stream
3297  * start/stop calls. So we need to allocate it in a persistent context.
3298  */
3299  savectx = MemoryContextSwitchTo(ApplyContext);
3300  fileset = palloc(sizeof(SharedFileSet));
3301 
3302  SharedFileSetInit(fileset, NULL);
3303  MemoryContextSwitchTo(savectx);
3304 
3305  stream_fd = BufFileCreateShared(fileset, path);
3306 
3307  /* Remember the fileset for the next stream of the same transaction */
3308  ent->xid = xid;
3309  ent->stream_fileset = fileset;
3310  ent->subxact_fileset = NULL;
3311  }
3312  else
3313  {
3314  if (!found)
3315  ereport(ERROR,
3316  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3317  errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
3318 
3319  /*
3320  * Open the file and seek to the end of the file because we always
3321  * append the changes file.
3322  */
3323  stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
3324  BufFileSeek(stream_fd, 0, 0, SEEK_END);
3325  }
3326 
3327  MemoryContextSwitchTo(oldcxt);
3328 }
3329 
3330 /*
3331  * stream_close_file
3332  * Close the currently open file with streamed changes.
3333  *
3334  * This can only be called at the end of a streaming block, i.e. at stream_stop
3335  * message from the upstream.
3336  */
3337 static void
3338 stream_close_file(void)
3339 {
3340  Assert(in_streamed_transaction);
3341  Assert(TransactionIdIsValid(stream_xid));
3342  Assert(stream_fd != NULL);
3343 
3344  BufFileClose(stream_fd);
3345 
3346  stream_xid = InvalidTransactionId;
3347  stream_fd = NULL;
3348 }
3349 
3350 /*
3351  * stream_write_change
3352  * Serialize a change to a file for the current toplevel transaction.
3353  *
3354  * The change is serialized in a simple format, with length (not including
3355  * the length), action code (identifying the message type) and message
3356  * contents (without the subxact TransactionId value).
3357  */
3358 static void
3359 stream_write_change(char action, StringInfo s)
3360 {
3361  int len;
3362 
3363  Assert(in_streamed_transaction);
3364  Assert(TransactionIdIsValid(stream_xid));
3365  Assert(stream_fd != NULL);
3366 
3367  /* total on-disk size, including the action type character */
3368  len = (s->len - s->cursor) + sizeof(char);
3369 
3370  /* first write the size */
3371  BufFileWrite(stream_fd, &len, sizeof(len));
3372 
3373  /* then the action */
3374  BufFileWrite(stream_fd, &action, sizeof(action));
3375 
3376  /* and finally the remaining part of the buffer (after the XID) */
3377  len = (s->len - s->cursor);
3378 
3379  BufFileWrite(stream_fd, &s->data[s->cursor], len);
3380 }
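
Illustration (not part of worker.c): each serialized change is a length (payload size plus the one-byte action), the action byte, and the remaining message bytes. A standalone sketch of the matching read-back loop using stdio in place of BufFile; in worker.c the spooled changes are read back with BufFileRead() when the transaction finally commits:

#include <stdio.h>
#include <stdlib.h>

int main(void)
{
    FILE *f = tmpfile();
    if (!f)
        return 1;

    /* write one record the way stream_write_change lays it out */
    const char payload[] = "tuple data";     /* stand-in message body */
    int len = (int) sizeof(payload) + 1;     /* + action byte */
    char action = 'I';

    fwrite(&len, sizeof(len), 1, f);
    fwrite(&action, sizeof(action), 1, f);
    fwrite(payload, sizeof(payload), 1, f);

    /* read it back: length, then action plus len - 1 payload bytes */
    rewind(f);
    while (fread(&len, sizeof(len), 1, f) == 1)
    {
        char *buf = malloc(len);

        fread(buf, 1, len, f);               /* action + payload */
        printf("action '%c', %d payload bytes\n", buf[0], len - 1);
        free(buf);
    }
    fclose(f);
    return 0;
}
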
3381 
3382 /*
3383  * Cleanup the memory for subxacts and reset the related variables.
3384  */
3385 static inline void
3386 cleanup_subxact_info()
3387 {
3388  if (subxact_data.subxacts)
3389  pfree(subxact_data.subxacts);
3390 
3391  subxact_data.subxacts = NULL;
3392  subxact_data.subxact_last = InvalidTransactionId;
3393  subxact_data.nsubxacts = 0;
3394  subxact_data.nsubxacts_max = 0;
3395 }
3396 
3397 /*
3398  * Form the prepared transaction GID for two_phase transactions.
3399  *
3400  * Return the GID in the supplied buffer.
3401  */
3402 static void
3403 TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
3404 {
3405  Assert(subid != InvalidRepOriginId);
3406 
3407  if (!TransactionIdIsValid(xid))
3408  ereport(ERROR,
3409  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3410  errmsg_internal("invalid two-phase transaction ID")));
3411 
3412  snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
3413 }
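
Illustration (not part of worker.c): for a hypothetical subscription OID and remote XID, the GID built above looks like this (values invented):

#include <stdio.h>

int main(void)
{
    char gid[64];
    unsigned int subid = 16394, xid = 739;   /* hypothetical values */

    snprintf(gid, sizeof(gid), "pg_gid_%u_%u", subid, xid);
    printf("%s\n", gid);                     /* pg_gid_16394_739 */
    return 0;
}
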
3414 
3415 /* Logical Replication Apply worker entry point */
3416 void
3417 ApplyWorkerMain(Datum main_arg)
3418 {
3419  int worker_slot = DatumGetInt32(main_arg);
3420  MemoryContext oldctx;
3421  char originname[NAMEDATALEN];
3422  XLogRecPtr origin_startpos;
3423  char *myslotname;
3424  WalRcvStreamOptions options;
3425  int server_version;
3426 
3427  /* Attach to slot */
3428  logicalrep_worker_attach(worker_slot);
3429 
3430  /* Setup signal handling */
3431  pqsignal(SIGHUP, SignalHandlerForConfigReload);
3432  pqsignal(SIGTERM, die);
3433  BackgroundWorkerUnblockSignals();
3434
3435  /*
3436  * We don't currently need any ResourceOwner in a walreceiver process, but
3437  * if we did, we could call CreateAuxProcessResourceOwner here.
3438  */
3439 
3440  /* Initialise stats to a sanish value */
3441  MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
3442  MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
3443
3444  /* Load the libpq-specific functions */
3445  load_file("libpqwalreceiver", false);
3446 
3447  /* Run as replica session replication role. */
3448  SetConfigOption("session_replication_role", "replica",
3449  PGC_SUSET, PGC_S_OVERRIDE);
3450
3451  /* Connect to our database. */
3452  BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
3453  MyLogicalRepWorker->userid,
3454  0);
3455 
3456  /*
3457  * Set always-secure search path, so malicious users can't redirect user
3458  * code (e.g. pg_index.indexprs).
3459  */
3460  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
3461 
3462  /* Load the subscription into persistent memory context. */
3463  ApplyContext = AllocSetContextCreate(TopMemoryContext,
3464  "ApplyContext",
3465  ALLOCSET_DEFAULT_SIZES);
3466  StartTransactionCommand();
3467  oldctx = MemoryContextSwitchTo(ApplyContext);
3468 
3469  MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
3470  if (!MySubscription)
3471  {
3472  ereport(LOG,
3473  (errmsg("logical replication apply worker for subscription %u will not "
3474  "start because the subscription was removed during startup",
3475  MyLogicalRepWorker->subid)));
3476  proc_exit(0);
3477  }
3478 
3479  MySubscriptionValid = true;
3480  MemoryContextSwitchTo(oldctx);
3481 
3482  if (!MySubscription->enabled)
3483  {
3484  ereport(LOG,
3485  (errmsg("logical replication apply worker for subscription \"%s\" will not "
3486  "start because the subscription was disabled during startup",
3487  MySubscription->name)));
3488 
3489  proc_exit(0);
3490  }
3491 
3492  /* Setup synchronous commit according to the user's wishes */
3493  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3494  PGC_BACKEND, PGC_S_OVERRIDE);
3495
3496  /* Keep us informed about subscription changes. */
3497  CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
3498  subscription_change_cb,
3499  (Datum) 0);
3500 
3501  if (am_tablesync_worker())
3502  ereport(LOG,
3503  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3504  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
3505  else
3506  ereport(LOG,
3507  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3508  MySubscription->name)));
3509 
3510  CommitTransactionCommand();
3511
3512  /* Connect to the origin and start the replication. */
3513  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3514  MySubscription->conninfo);
3515 
3516  if (am_tablesync_worker())
3517  {
3518  char *syncslotname;
3519 
3520  /* This is table synchronization worker, call initial sync. */
3521  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
3522 
3523  /* allocate slot name in long-lived context */
3524  myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3525 
3526  pfree(syncslotname);
3527  }
3528  else
3529  {
3530  /* This is main apply worker */
3531  RepOriginId originid;
3532  TimeLineID startpointTLI;
3533  char *err;
3534 
3535  myslotname = MySubscription->slotname;
3536 
3537  /*
3538  * This shouldn't happen if the subscription is enabled, but guard
3539  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3540  * crash if slot is NULL.)
3541  */
3542  if (!myslotname)
3543  ereport(ERROR,
3544  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3545  errmsg("subscription has no replication slot set")));
3546 
3547  /* Setup replication origin tracking. */
3548  StartTransactionCommand();
3549  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3550  originid = replorigin_by_name(originname, true);
3551  if (!OidIsValid(originid))
3552  originid = replorigin_create(originname);
3553  replorigin_session_setup(originid);
3554  replorigin_session_origin = originid;
3555  origin_startpos = replorigin_session_get_progress(false);
3556  CommitTransactionCommand();
3557
3558  LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
3559  MySubscription->name, &err);
3560  if (LogRepWorkerWalRcvConn == NULL)
3561  ereport(ERROR,
3562  (errcode(ERRCODE_CONNECTION_FAILURE),
3563  errmsg("could not connect to the publisher: %s", err)));
3564 
3565  /*
3566  * We don't really use the output of identify_system for anything, but it
3567  * does some initialization on the upstream, so let's still call it.
3568  */
3569  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
3570  }
3571 
3572  /*
3573  * Setup callback for syscache so that we know when something changes in
3574  * the subscription relation state.
3575  */
3576  CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
3577  invalidate_syncing_table_states,
3578  (Datum) 0);
3579 
3580  /* Build logical replication streaming options. */
3581  options.logical = true;
3582  options.startpoint = origin_startpos;
3583  options.slotname = myslotname;
3584 
3585  server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
3586  options.proto.logical.proto_version =
3587  server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
3588  server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
3589  LOGICALREP_PROTO_VERSION_NUM;
3590
3591  options.proto.logical.publication_names = MySubscription->publications;
3592  options.proto.logical.binary = MySubscription->binary;
3593  options.proto.logical.streaming = MySubscription->stream;
3594  options.proto.logical.twophase = false;
3595 
3596  if (!am_tablesync_worker())
3597  {
3598  /*
3599  * Even when the two_phase mode is requested by the user, it remains
3600  * as the tri-state PENDING until all tablesyncs have reached READY
3601  * state. Only then can it become ENABLED.
3602  *
3603  * Note: If the subscription has no tables then leave the state as
3604  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
3605  * work.
3606  */
3607  if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
3608  AllTablesyncsReady())
3609  {
3610  /* Start streaming with two_phase enabled */
3611  options.proto.logical.twophase = true;
3612  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3613 
3614  StartTransactionCommand();
3615  UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
3616  MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
3617  CommitTransactionCommand();
3618  }
3619  else
3620  {
3621  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3622  }
3623 
3624  ereport(DEBUG1,
3625  (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.",
3626  MySubscription->name,
3627  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
3628  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
3629  MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
3630  "?")));
3631  }
3632  else
3633  {
3634  /* Start normal logical streaming replication. */
3635  walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3636  }
3637 
3638  /* Run the main loop. */
3639  LogicalRepApplyLoop(origin_startpos);
3640 
3641  proc_exit(0);
3642 }
3643 
3644 /*
3645  * Is current process a logical replication worker?
3646  */
3647 bool
3648 IsLogicalWorker(void)
3649 {
3650  return MyLogicalRepWorker != NULL;
3651 }
Definition: snapmgr.c:680
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
bool * tts_isnull
Definition: tuptable.h:128
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:316
List * es_opened_result_relations
Definition: execnodes.h:578
#define RowExclusiveLock
Definition: lockdefs.h:38
#define LOGICALREP_TWOPHASE_STATE_PENDING
int errcode_for_file_access(void)
Definition: elog.c:721
#define SIGHUP
Definition: win32_port.h:159
EState * estate
Definition: worker.c:214
#define InvalidTransactionId
Definition: transam.h:31
SharedFileSet * stream_fileset
Definition: worker.c:234
#define RelationGetRelationName(relation)
Definition: rel.h:511
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
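The dynahash entries in this list (hash_create, hash_search, HASH_BLOBS) are the general pattern behind keyed lookup tables such as the xidhash variable declared elsewhere above. A minimal sketch of that pattern follows; the entry struct, table name, and sizes are chosen purely for illustration and are not the worker's actual bookkeeping.

/*
 * Illustrative only: a hash table keyed by TransactionId.  The entry layout
 * and names here are assumptions for the sketch.
 */
#include "postgres.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"

typedef struct DemoXidEntry
{
	TransactionId xid;			/* hash key */
	int			nchanges;		/* illustrative payload */
} DemoXidEntry;

static HTAB *demo_xidhash = NULL;

static DemoXidEntry *
demo_xid_lookup(TransactionId xid, bool create)
{
	HASHCTL		ctl;
	bool		found;

	if (demo_xidhash == NULL)
	{
		ctl.keysize = sizeof(TransactionId);
		ctl.entrysize = sizeof(DemoXidEntry);
		ctl.hcxt = TopMemoryContext;
		demo_xidhash = hash_create("demo xid hash", 1024, &ctl,
								   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
	}

	/* HASH_ENTER creates a new entry on miss; HASH_FIND only looks up */
	return (DemoXidEntry *) hash_search(demo_xidhash, &xid,
										create ? HASH_ENTER : HASH_FIND,
										&found);
}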
XLogRecPtr startpoint
Definition: walreceiver.h:170
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
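WaitLatchOrSocket, ResetLatch, and the WL_* flags listed here are the usual way a background worker sleeps until either network data or a latch wakeup arrives. A minimal sketch under the assumption of an already-connected socket and the WAIT_EVENT_LOGICAL_APPLY_MAIN wait-event identifier:

/*
 * Illustrative only: wait for socket data or a latch wakeup with a bounded
 * timeout, then clear the latch and service interrupts.
 */
#include "postgres.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "utils/wait_event.h"

static void
demo_wait_for_data(pgsocket fd, long timeout_ms)
{
	int			rc;

	rc = WaitLatchOrSocket(MyLatch,
						   WL_SOCKET_READABLE | WL_LATCH_SET |
						   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						   fd, timeout_ms,
						   WAIT_EVENT_LOGICAL_APPLY_MAIN);

	if (rc & WL_LATCH_SET)
	{
		ResetLatch(MyLatch);
		CHECK_FOR_INTERRUPTS();
	}
}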
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:441
int pgsocket
Definition: port.h:31
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:409
static void apply_handle_begin(StringInfo s)
Definition: worker.c:825
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:258
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2887
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2821
AttrMap * attrMap
Definition: tupconvert.h:28
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
Definition: buffile.c:284
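BufFileCreateShared, BufFileWrite, BufFileClose, BufFileOpenShared, and BufFileRead together give a write-then-reopen cycle over a SharedFileSet. A minimal round-trip sketch; the file name format and payload here are illustrative, not what the worker writes:

/*
 * Illustrative only: spool a value into a shared BufFile, close it, then
 * reopen the same file by name and read the value back.
 */
#include "postgres.h"
#include <fcntl.h>
#include "storage/buffile.h"
#include "storage/sharedfileset.h"

static void
demo_spool_roundtrip(SharedFileSet *fileset, Oid subid, TransactionId xid)
{
	char		path[MAXPGPATH];
	BufFile    *file;
	int			value = 42;
	int			readback = 0;

	/* hypothetical name; only uniqueness per (subid, xid) matters here */
	snprintf(path, sizeof(path), "demo-%u-%u.changes", subid, xid);

	/* write pass */
	file = BufFileCreateShared(fileset, path);
	BufFileWrite(file, &value, sizeof(value));
	BufFileClose(file);

	/* read pass */
	file = BufFileOpenShared(fileset, path, O_RDONLY);
	if (BufFileRead(file, &readback, sizeof(readback)) != sizeof(readback))
		elog(ERROR, "could not read back spooled value");
	BufFileClose(file);
}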
MemoryContext TopMemoryContext
Definition: mcxt.c:48
EState * CreateExecutorState(void)
Definition: execUtils.c:90
Definition: guc.h:72
int my_log2(long num)
Definition: dynahash.c:1765
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:1650
List * lappend(List *list, void *datum)
Definition: list.c:336
TransactionId xid
Definition: worker.c:268
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:869
MemoryContext ApplyContext
Definition: worker.c:239
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void BeginTransactionBlock(void)
Definition: xact.c:3637
XLogRecPtr final_lsn
Definition: logicalproto.h:123
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:844
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2413
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define HASH_BLOBS
Definition: hsearch.h:97
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:225
Node * build_column_default(Relation rel, int attrno)
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:1451
List * es_tupleTable
Definition: execnodes.h:602
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1435
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1191
void * palloc0(Size size)
Definition: mcxt.c:1093
char * s2
static void apply_handle_update(StringInfo s)
Definition: worker.c:1684
uintptr_t Datum
Definition: postgres.h:411
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1193
void CommandCounterIncrement(void)
Definition: xact.c:1021
#define PGINVALID_SOCKET
Definition: port.h:33
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1255
static void begin_replication_step(void)
Definition: worker.c:370
Size keysize
Definition: hsearch.h:75
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:1960
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:1908
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5708
Plan * plan
Definition: execnodes.h:966
static void apply_handle_relation(StringInfo s)
Definition: worker.c:1517
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:528
int16 attnum
Definition: pg_attribute.h:83
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:971
SharedFileSet * subxact_fileset
Definition: worker.c:235
#define ereport(elevel,...)
Definition: elog.h:157
Bitmapset * updatedCols
Definition: parsenodes.h:1149
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:37
XLogRecPtr local_end
Definition: worker.c:200
static MemoryContext LogicalStreamingContext
Definition: worker.c:242
struct SubXactInfo SubXactInfo
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
static HTAB * xidhash
Definition: worker.c:261
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
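pq_getmsgbyte is how the first byte of each replication message is pulled off the wire before dispatching on it, and LogicalRepMsgType (listed above) enumerates the possible values. A stripped-down sketch of that dispatch shape, which only logs instead of applying anything:

/*
 * Illustrative only: read the action byte and branch on it, in the general
 * style of apply_dispatch(); handlers are replaced with log messages.
 */
#include "postgres.h"
#include "libpq/pqformat.h"
#include "replication/logicalproto.h"

static void
demo_dispatch(StringInfo s)
{
	LogicalRepMsgType action = pq_getmsgbyte(s);

	switch (action)
	{
		case LOGICAL_REP_MSG_BEGIN:
			elog(DEBUG2, "BEGIN message");
			break;
		case LOGICAL_REP_MSG_INSERT:
			elog(DEBUG2, "INSERT message");
			break;
		case LOGICAL_REP_MSG_COMMIT:
			elog(DEBUG2, "COMMIT message");
			break;
		default:
			elog(ERROR, "unrecognized logical replication message type \"%c\"",
				 action);
	}
}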
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:743
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1409
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4670
static void apply_dispatch(StringInfo s)
Definition: worker.c:2328
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:38
static void maybe_reread_subscription(void)
Definition: worker.c:2840
#define makeNode(_type_)
Definition: nodes.h:587
TimestampTz last_recv_time
static MemoryContext ApplyMessageContext
Definition: worker.c:238
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:689
void SharedFileSetDeleteAll(SharedFileSet *fileset)
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:362
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:177
bool AllTablesyncsReady(void)
Definition: tablesync.c:1230
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
RepOriginId replorigin_session_origin
Definition: origin.c:154
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:631
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1111
void StartTransactionCommand(void)
Definition: xact.c:2838
AttrNumber * attnums
Definition: attmap.h:36
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
LogicalRepMsgType
Definition: logicalproto.h:51
static int server_version
Definition: pg_dumpall.c:83
size_t Size
Definition: c.h:540
static void slot_store_error_callback(void *arg)
Definition: worker.c:588
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:477
bool IsTransactionState(void)
Definition: xact.c:371
static void end_replication_step(void)
Definition: worker.c:393
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:424
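pq_sendbyte, pq_sendint64, resetStringInfo, and walrcv_send are the pieces used to push status replies back over the walreceiver connection. A sketch of assembling a standby status-update ('r') message; the field order mirrors the walsender feedback format, but treat the details as illustrative rather than the worker's own send_feedback code:

/*
 * Illustrative only: build an 'r' (standby status update) message and hand
 * it to walrcv_send on an existing connection.
 */
#include "postgres.h"
#include "libpq/pqformat.h"
#include "replication/walreceiver.h"
#include "utils/timestamp.h"

static void
demo_send_feedback(WalReceiverConn *conn,
				   XLogRecPtr writepos, XLogRecPtr flushpos, XLogRecPtr applypos)
{
	static StringInfoData buf;
	static bool initialized = false;

	if (!initialized)
	{
		initStringInfo(&buf);
		initialized = true;
	}

	resetStringInfo(&buf);
	pq_sendbyte(&buf, 'r');
	pq_sendint64(&buf, writepos);		/* position written to disk */
	pq_sendint64(&buf, flushpos);		/* position flushed to disk */
	pq_sendint64(&buf, applypos);		/* position applied locally */
	pq_sendint64(&buf, GetCurrentTimestamp());
	pq_sendbyte(&buf, false);			/* no reply requested */

	walrcv_send(conn, buf.data, buf.len);
}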
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
int WalWriterDelay
Definition: walwriter.c:70
bool MySubscriptionValid
Definition: worker.c:247
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1182
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:322
off_t offset
Definition: worker.c:270
#define InvalidRepOriginId
Definition: origin.h:33
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:500
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:538
void FreeSubscription(Subscription *sub)
RTEKind rtekind
Definition: parsenodes.h:995
#define AccessExclusiveLock
Definition: lockdefs.h:45
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:256
TransactionId xid
Definition: worker.c:233
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:2956
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4690
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:833
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:264
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1059
void * palloc(Size size)
Definition: mcxt.c:1062
TupleConversionMap * ri_RootToPartitionMap
Definition: execnodes.h:513
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pgstat_report_activity(BackendState state, const char *cmd_str)
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1286
static void stream_open_file(Oid subid, TransactionId xid, bool first)
Definition: worker.c:3253
static ApplySubXactData subxact_data
Definition: worker.c:282
static BufFile * stream_fd
Definition: worker.c:264
uint32 nsubxacts_max
Definition: worker.c:277
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:2515
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:232
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int i
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:543
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define errcontext
Definition: elog.h:204
static void cleanup_subxact_info(void)
Definition: worker.c:3386
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1697
void * arg
struct Latch * MyLatch
Definition: globals.c:57
struct FlushPosition FlushPosition
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:609
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
Definition: worker.c:3403
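TwoPhaseTransactionGid above derives a prepared-transaction GID from the subscription OID and remote XID so that concurrent subscriptions cannot collide on the same identifier. A sketch of that idea; the exact format string used here is an assumption:

/*
 * Illustrative only: compose a GID from (subid, xid).  GIDSIZE (xact.h)
 * bounds the caller's buffer.
 */
#include "postgres.h"
#include "access/xact.h"

static void
demo_twophase_gid(Oid subid, TransactionId xid, char *gid, int szgid)
{
	snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}

/* typical call site: char gid[GIDSIZE]; demo_twophase_gid(subid, xid, gid, sizeof(gid)); */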
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:123
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:761
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:586
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1010
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
Definition: pg_list.h:50
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1899
#define snprintf
Definition: port.h:216
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1644
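getTypeInputInfo/OidInputFunctionCall (text format) and getTypeBinaryInputInfo/OidReceiveFunctionCall (binary format) are the two lookup-then-call pairs for turning an incoming column value into a Datum. A minimal sketch of the text-format path:

/*
 * Illustrative only: look up the type's input function and call it on a
 * text-format value to produce a Datum.
 */
#include "postgres.h"
#include "fmgr.h"
#include "utils/lsyscache.h"

static Datum
demo_input_column(Oid typid, char *value, int32 typmod)
{
	Oid			typinput;
	Oid			typioparam;

	getTypeInputInfo(typid, &typinput, &typioparam);
	return OidInputFunctionCall(typinput, value, typioparam, typmod);
}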
#define WL_LATCH_SET
Definition: latch.h:125
#define RelationGetRelid(relation)
Definition: rel.h:477
static TransactionId stream_xid
Definition: worker.c:255
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
CmdType
Definition: nodes.h:682
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
#define die(msg)
Definition: pg_test_fsync.c:97
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4761
struct StreamXidHash StreamXidHash
TimestampTz committime
Definition: logicalproto.h:132
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:3417
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:1151
uint32 LogicalRepRelId
Definition: logicalproto.h:95
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1552
#define lfirst_oid(lc)
Definition: pg_list.h:171
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:227
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:268
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:892
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5737
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:2213
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)