PostgreSQL Source Code  git master
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2020, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from remote logical replication stream.
13  *
14  * The main worker (apply) is started by the logical replication worker
15  * launcher for every enabled subscription in a database. It uses the
16  * walsender protocol to communicate with the publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *
22  * STREAMED TRANSACTIONS
23  * ---------------------
24  * Streamed transactions (large transactions exceeding a memory limit on the
25  * upstream) are not applied immediately, but instead, the data is written
26  * to temporary files and then applied at once when the final commit arrives.
27  *
28  * Unlike the regular (non-streamed) case, handling streamed transactions has
29  * to handle aborts of both the toplevel transaction and subtransactions. This
30  * is achieved by tracking offsets for subtransactions, which are then used
31  * to truncate the file with serialized changes.
32  *
33  * The files are placed in tmp file directory by default, and the filenames
34  * include both the XID of the toplevel transaction and OID of the
35  * subscription. This is necessary so that different workers processing a
36  * remote transaction with the same XID don't interfere.
37  *
38  * We use BufFiles instead of normal temporary files because the BufFile
39  * infrastructure (a) supports temporary files that exceed the OS file size
40  * limit, (b) provides automatic cleanup on error and (c) lets these files
41  * survive across local transactions, so that they can be opened and closed at
42  * each stream start and stop. We decided to use the SharedFileSet
43  * infrastructure because without it the files are deleted when they are
44  * closed, and keeping the stream files open across stream start/stop instead
45  * would consume a lot of memory (more than 8K for each BufFile, and
46  * there could be multiple such BufFiles as the subscriber could receive
47  * multiple start/stop streams for different transactions before getting the
48  * commit). Moreover, if we don't use SharedFileSet then we would also need to
49  * invent a new way to pass filenames to BufFile APIs so that we can open the
50  * desired file across multiple stream-open calls for the same
51  * transaction.
52  *-------------------------------------------------------------------------
53  */
54 
55 #include "postgres.h"
56 
57 #include <sys/stat.h>
58 #include <unistd.h>
59 
60 #include "access/table.h"
61 #include "access/tableam.h"
62 #include "access/xact.h"
63 #include "access/xlog_internal.h"
64 #include "catalog/catalog.h"
65 #include "catalog/namespace.h"
66 #include "catalog/partition.h"
67 #include "catalog/pg_inherits.h"
70 #include "catalog/pg_tablespace.h"
71 #include "commands/tablecmds.h"
72 #include "commands/tablespace.h"
73 #include "commands/trigger.h"
74 #include "executor/executor.h"
75 #include "executor/execPartition.h"
77 #include "funcapi.h"
78 #include "libpq/pqformat.h"
79 #include "libpq/pqsignal.h"
80 #include "mb/pg_wchar.h"
81 #include "miscadmin.h"
82 #include "nodes/makefuncs.h"
83 #include "optimizer/optimizer.h"
84 #include "parser/analyze.h"
85 #include "parser/parse_relation.h"
86 #include "pgstat.h"
87 #include "postmaster/bgworker.h"
88 #include "postmaster/interrupt.h"
89 #include "postmaster/postmaster.h"
90 #include "postmaster/walwriter.h"
91 #include "replication/decode.h"
92 #include "replication/logical.h"
96 #include "replication/origin.h"
98 #include "replication/snapbuild.h"
101 #include "rewrite/rewriteHandler.h"
102 #include "storage/buffile.h"
103 #include "storage/bufmgr.h"
104 #include "storage/fd.h"
105 #include "storage/ipc.h"
106 #include "storage/lmgr.h"
107 #include "storage/proc.h"
108 #include "storage/procarray.h"
109 #include "tcop/tcopprot.h"
110 #include "utils/builtins.h"
111 #include "utils/catcache.h"
112 #include "utils/dynahash.h"
113 #include "utils/datum.h"
114 #include "utils/fmgroids.h"
115 #include "utils/guc.h"
116 #include "utils/inval.h"
117 #include "utils/lsyscache.h"
118 #include "utils/memutils.h"
119 #include "utils/rel.h"
120 #include "utils/syscache.h"
121 #include "utils/timeout.h"
122 
123 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
124 
/*
 * NOTE(review): the members of this struct (source lines 127-129) are not
 * visible in this listing.  Judging by store_flush_position(XLogRecPtr
 * remote_lsn), it presumably records flush LSNs to report back to the
 * publisher -- confirm against the full source.
 */
125 typedef struct FlushPosition
126 {
130 } FlushPosition;
131 
133 
134 typedef struct SlotErrCallbackArg
135 {
140 
141 /*
142  * Stream xid hash entry. Whenever we see a new xid we create this entry in the
143  * xidhash and along with it create the streaming file and store the fileset handle.
144  * The subxact file is created iff there is any subxact info under this xid. This
145  * entry is used on the subsequent streams for the xid to get the corresponding
146  * fileset handles, so storing them in hash makes the search faster.
147  */
148 typedef struct StreamXidHash
149 {
 /*
  * xidhash is created with keysize = sizeof(TransactionId) and entrysize =
  * sizeof(StreamXidHash), so the hash key is the leading TransactionId of
  * each entry -- which is why xid must stay the first member.
  */
150  TransactionId xid; /* xid is the hash key and must be first */
151  SharedFileSet *stream_fileset; /* shared file set for stream data */
 /*
  * The subxact file is created only if the xid has subxact info, so
  * presumably this is NULL otherwise -- confirm.
  */
152  SharedFileSet *subxact_fileset; /* shared file set for subxact info */
153 } StreamXidHash;
154 
157 
158 /* per stream context for streaming transactions */
160 
162 
/*
 * NOTE(review): presumably cleared when the cached MySubscription data
 * becomes stale, prompting a re-read -- confirm against its writers.
 */
164 bool MySubscriptionValid = false;
165 
168 
169 /* fields valid only when processing streamed transaction */
171 
173 
174 /*
175  * Hash table for storing the streaming xid information along with shared file
176  * set for streaming and subxact files.
177  */
/*
 * Created lazily while handling the first STREAM START message, with
 * ApplyContext as its memory context so that it lives for the whole life
 * of the apply worker.
 */
178 static HTAB *xidhash = NULL;
179 
180 /* BufFile handle of the current streaming file */
/*
 * handle_streamed_transaction asserts this is non-NULL while in streaming
 * mode; presumably it is opened at STREAM START and closed at STREAM STOP.
 */
181 static BufFile *stream_fd = NULL;
182 
/*
 * Per-subtransaction bookkeeping for a streamed toplevel transaction.
 *
 * (fileno, offset) is the position handed to BufFileTruncateShared() when
 * this subtransaction is aborted, which discards its changes together with
 * everything written to the changes file after it.
 */
183 typedef struct SubXactInfo
184 {
185  TransactionId xid; /* XID of the subxact */
186  int fileno; /* file number in the buffile */
187  off_t offset; /* offset in the file */
188 } SubXactInfo;
189 
190 /* Sub-transaction data for the current streaming transaction */
191 typedef struct ApplySubXactData
192 {
193  uint32 nsubxacts; /* number of sub-transactions */
194  uint32 nsubxacts_max; /* current capacity of subxacts */
195  TransactionId subxact_last; /* xid of the last sub-transaction */
196  SubXactInfo *subxacts; /* sub-xact offset in changes file */
198 
200 
/*
 * Build into "path" the names of the subxact-info and changes files for the
 * toplevel transaction xid of subscription subid (the file names include
 * both).  Callers presumably pass a MAXPGPATH-sized buffer -- confirm.
 */
201 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
202 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
203 
204 /*
205  * Information about subtransactions of a given toplevel transaction.
206  */
207 static void subxact_info_write(Oid subid, TransactionId xid);
208 static void subxact_info_read(Oid subid, TransactionId xid);
209 static void subxact_info_add(TransactionId xid);
210 static inline void cleanup_subxact_info(void);
211 
212 /*
213  * Serialize and deserialize changes for a toplevel transaction.
214  */
215 static void stream_cleanup_files(Oid subid, TransactionId xid);
216 static void stream_open_file(Oid subid, TransactionId xid, bool first);
217 static void stream_write_change(char action, StringInfo s);
218 static void stream_close_file(void);
219 
220 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
221 
222 static void store_flush_position(XLogRecPtr remote_lsn);
223 
224 static void maybe_reread_subscription(void);
225 
226 /* prototype needed because of stream_commit */
227 static void apply_dispatch(StringInfo s);
228 
/*
 * Per-relation workers behind the INSERT/UPDATE/DELETE handlers.
 * apply_handle_tuple_routing is used when the local target is a
 * partitioned table and routes the change to the right partition.
 */
229 static void apply_handle_insert_internal(ResultRelInfo *relinfo,
230  EState *estate, TupleTableSlot *remoteslot);
231 static void apply_handle_update_internal(ResultRelInfo *relinfo,
232  EState *estate, TupleTableSlot *remoteslot,
233  LogicalRepTupleData *newtup,
234  LogicalRepRelMapEntry *relmapentry);
235 static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
236  TupleTableSlot *remoteslot,
237  LogicalRepRelation *remoterel);
238 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
239  LogicalRepRelation *remoterel,
240  TupleTableSlot *remoteslot,
241  TupleTableSlot **localslot);
242 static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
243  EState *estate,
244  TupleTableSlot *remoteslot,
245  LogicalRepTupleData *newtup,
246  LogicalRepRelMapEntry *relmapentry,
247  CmdType operation);
248 
249 /*
250  * Should this worker apply changes for given relation.
251  *
252  * This is mainly needed for initial relation data sync as that runs in
253  * separate worker process running in parallel and we need some way to skip
254  * changes coming to the main apply worker during the sync of a table.
255  *
256  * Note we need a less-than-or-equal comparison for the SYNCDONE state because
257  * it might hold the position of the end of the initial slot consistent point
258  * WAL record + 1 (i.e., the start of the next record), and the next record can
259  * be the COMMIT of the transaction we are now processing (which is what we set
260  * remote_final_lsn to in apply_handle_begin).
261  */
262 static bool
264 {
 /* A tablesync worker only applies changes for the one table it syncs. */
265  if (am_tablesync_worker())
266  return MyLogicalRepWorker->relid == rel->localreloid;
267  else
 /*
  * The main apply worker applies changes for READY tables, and for
  * SYNCDONE tables whose recorded statelsn is at or before the final LSN
  * of the remote transaction currently being applied.
  */
268  return (rel->state == SUBREL_STATE_READY ||
269  (rel->state == SUBREL_STATE_SYNCDONE &&
270  rel->statelsn <= remote_final_lsn));
271 }
272 
273 /*
274  * Make sure that we started local transaction.
275  *
276  * Also switches to ApplyMessageContext as necessary.
277  */
278 static bool
280 {
 /*
  * Returns false if a transaction was already in progress and true if
  * this call had to start one.  Either way, on return the current memory
  * context is ApplyMessageContext.
  */
281  if (IsTransactionState())
282  {
284 
 /* Already in a transaction: only the memory context may need fixing. */
285  if (CurrentMemoryContext != ApplyMessageContext)
286  MemoryContextSwitchTo(ApplyMessageContext);
287 
288  return false;
289  }
290 
293 
295 
296  MemoryContextSwitchTo(ApplyMessageContext);
297  return true;
298 }
299 
300 /*
301  * Handle streamed transactions.
302  *
303  * If in streaming mode (receiving a block of streamed transaction), we
304  * simply redirect it to a file for the proper toplevel transaction.
305  *
306  * Returns true for streamed transactions, false otherwise (regular mode).
307  */
308 static bool
310 {
311  TransactionId xid;
312 
313  /* not in streaming mode */
315  return false;
316 
317  Assert(stream_fd != NULL);
319 
320  /*
321  * We should have received XID of the subxact as the first part of the
322  * message, so extract it.
323  */
324  xid = pq_getmsgint(s, 4);
 /*
  * pq_getmsgint() advanced s->cursor past the 4-byte XID.  Presumably
  * stream_write_change() spools only the remainder of the message (from
  * the cursor on) -- confirm in its definition.
  */
325 
327 
328  /* Add the new subxact to the array (unless already there). */
329  subxact_info_add(xid);
330 
331  /* write the change to the current file */
332  stream_write_change(action, s);
333 
334  return true;
335 }
336 
337 /*
338  * Executor state preparation for evaluation of constraint expressions,
339  * indexes and triggers.
340  *
341  * This is based on similar code in copy.c
342  */
343 static EState *
345 {
346  EState *estate;
347  ResultRelInfo *resultRelInfo;
348  RangeTblEntry *rte;
349 
350  estate = CreateExecutorState();
351 
 /* A single-entry range table describing the local target relation. */
352  rte = makeNode(RangeTblEntry);
353  rte->rtekind = RTE_RELATION;
354  rte->relid = RelationGetRelid(rel->localrel);
355  rte->relkind = rel->localrel->rd_rel->relkind;
357  ExecInitRangeTable(estate, list_make1(rte));
358 
 /* The only result relation, at range table index 1 (the RTE above). */
359  resultRelInfo = makeNode(ResultRelInfo);
360  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
361 
362  estate->es_result_relations = resultRelInfo;
363  estate->es_num_result_relations = 1;
364  estate->es_result_relation_info = resultRelInfo;
365 
366  estate->es_output_cid = GetCurrentCommandId(true);
367 
368  /* Prepare to catch AFTER triggers. */
370 
 /*
  * The caller ends the AFTER-trigger query and frees the EState when done
  * (as apply_handle_insert does).
  */
371  return estate;
372 }
373 
374 /*
375  * Evaluates default values for local columns that can't be mapped to remote
376  * relation columns.
377  *
378  * This allows us to support tables which have more columns on the downstream
379  * than on the upstream.
380  */
381 static void
383  TupleTableSlot *slot)
384 {
385  TupleDesc desc = RelationGetDescr(rel->localrel);
386  int num_phys_attrs = desc->natts;
387  int i;
388  int attnum,
389  num_defaults = 0;
390  int *defmap;
391  ExprState **defexprs;
392  ExprContext *econtext;
393 
394  econtext = GetPerTupleExprContext(estate);
395 
396  /* We got all the data via replication, no need to evaluate anything. */
397  if (num_phys_attrs == rel->remoterel.natts)
398  return;
399 
 /* defmap[k] is the 0-based local attnum whose value defexprs[k] computes. */
400  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
401  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
402 
403  Assert(rel->attrmap->maplen == num_phys_attrs);
404  for (attnum = 0; attnum < num_phys_attrs; attnum++)
405  {
406  Expr *defexpr;
407 
 /* Dropped and generated columns never receive a default here. */
408  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
409  continue;
410 
 /* A non-negative map entry means the value came from the remote side. */
411  if (rel->attrmap->attnums[attnum] >= 0)
412  continue;
413 
414  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
415 
416  if (defexpr != NULL)
417  {
418  /* Run the expression through planner */
419  defexpr = expression_planner(defexpr);
420 
421  /* Build the executable expression state (no parent plan node) */
422  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
423  defmap[num_defaults] = attnum;
424  num_defaults++;
425  }
426 
427  }
428 
 /*
  * Overwrite the NULLs that slot_store_data stored for unmapped columns
  * with the evaluated defaults.
  */
429  for (i = 0; i < num_defaults; i++)
430  slot->tts_values[defmap[i]] =
431  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
432 }
433 
434 /*
435  * Error callback to give more context info about type conversion failure.
436  */
437 static void
439 {
 /*
  * Invoked while an error is being reported.  errarg identifies the column
  * (if any) whose value was being converted when the error occurred.
  */
440  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
442  char *remotetypname;
443  Oid remotetypoid,
444  localtypoid;
445 
446  /* Nothing to do if remote attribute number is not set */
447  if (errarg->remote_attnum < 0)
448  return;
449 
450  rel = errarg->rel;
451  remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
452 
453  /* Fetch remote type name from the LogicalRepTypMap cache */
454  remotetypname = logicalrep_typmap_gettypname(remotetypoid);
455 
456  /* Fetch local type OID from the local sys cache */
 /* local_attnum is 0-based; catalog attribute numbers start at 1. */
457  localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
458 
459  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
460  "remote type %s, local type %s",
461  rel->remoterel.nspname, rel->remoterel.relname,
462  rel->remoterel.attnames[errarg->remote_attnum],
463  remotetypname,
464  format_type_be(localtypoid));
465 }
466 
467 /*
468  * Store tuple data into slot.
469  *
470  * Incoming data can be either text or binary format.
471  */
472 static void
474  LogicalRepTupleData *tupleData)
475 {
476  int natts = slot->tts_tupleDescriptor->natts;
477  int i;
478  SlotErrCallbackArg errarg;
479  ErrorContextCallback errcallback;
480 
481  ExecClearTuple(slot);
482 
483  /* Push callback + info on the error context stack */
484  errarg.rel = rel;
485  errarg.local_attnum = -1;
486  errarg.remote_attnum = -1;
487  errcallback.callback = slot_store_error_callback;
488  errcallback.arg = (void *) &errarg;
489  errcallback.previous = error_context_stack;
490  error_context_stack = &errcallback;
491 
492  /* Call the "in" function for each non-dropped, non-null attribute */
493  Assert(natts == rel->attrmap->maplen);
494  for (i = 0; i < natts; i++)
495  {
497  int remoteattnum = rel->attrmap->attnums[i];
 /*
  * attrmap translates local column i into its index in the remote tuple;
  * a negative value means the publisher does not send this column.
  */
498 
499  if (!att->attisdropped && remoteattnum >= 0)
500  {
501  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
502 
 /*
  * NOTE(review): this bounds check runs after &colvalues[remoteattnum]
  * has been formed.  Only the address is taken, but checking first would
  * read more naturally.
  */
503  Assert(remoteattnum < tupleData->ncols);
504 
505  errarg.local_attnum = i;
506  errarg.remote_attnum = remoteattnum;
507 
508  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
509  {
510  Oid typinput;
511  Oid typioparam;
512 
513  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
514  slot->tts_values[i] =
515  OidInputFunctionCall(typinput, colvalue->data,
516  typioparam, att->atttypmod);
517  slot->tts_isnull[i] = false;
518  }
519  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
520  {
521  Oid typreceive;
522  Oid typioparam;
523 
524  /*
525  * In some code paths we may be asked to re-parse the same
526  * tuple data. Reset the StringInfo's cursor so that works.
527  */
528  colvalue->cursor = 0;
529 
530  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
531  slot->tts_values[i] =
532  OidReceiveFunctionCall(typreceive, colvalue,
533  typioparam, att->atttypmod);
534 
535  /* Trouble if it didn't eat the whole buffer */
536  if (colvalue->cursor != colvalue->len)
537  ereport(ERROR,
538  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
539  errmsg("incorrect binary data format in logical replication column %d",
540  remoteattnum + 1)));
541  slot->tts_isnull[i] = false;
542  }
543  else
544  {
545  /*
546  * NULL value from remote. (We don't expect to see
547  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
548  * NULL.)
549  */
550  slot->tts_values[i] = (Datum) 0;
551  slot->tts_isnull[i] = true;
552  }
553 
 /* Later errors must not be attributed to this column. */
554  errarg.local_attnum = -1;
555  errarg.remote_attnum = -1;
556  }
557  else
558  {
559  /*
560  * We assign NULL to dropped attributes and missing values
561  * (missing values should be later filled using
562  * slot_fill_defaults).
563  */
564  slot->tts_values[i] = (Datum) 0;
565  slot->tts_isnull[i] = true;
566  }
567  }
568 
569  /* Pop the error context stack */
570  error_context_stack = errcallback.previous;
571 
572  ExecStoreVirtualTuple(slot);
573 }
574 
575 /*
576  * Replace updated columns with data from the LogicalRepTupleData struct.
577  * This is somewhat similar to heap_modify_tuple but also calls the type
578  * input functions on the user data.
579  *
580  * "slot" is filled with a copy of the tuple in "srcslot", replacing
581  * columns provided in "tupleData" and leaving others as-is.
582  *
583  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
584  * storage for "srcslot". This is OK for current usage, but someday we may
585  * need to materialize "slot" at the end to make it independent of "srcslot".
586  */
587 static void
590  LogicalRepTupleData *tupleData)
591 {
592  int natts = slot->tts_tupleDescriptor->natts;
593  int i;
594  SlotErrCallbackArg errarg;
595  ErrorContextCallback errcallback;
596 
597  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
598  ExecClearTuple(slot);
599 
600  /*
601  * Copy all the column data from srcslot, so that we'll have valid values
602  * for unreplaced columns.
603  */
604  Assert(natts == srcslot->tts_tupleDescriptor->natts);
605  slot_getallattrs(srcslot);
606  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
607  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
608 
609  /* For error reporting, push callback + info on the error context stack */
610  errarg.rel = rel;
611  errarg.local_attnum = -1;
612  errarg.remote_attnum = -1;
613  errcallback.callback = slot_store_error_callback;
614  errcallback.arg = (void *) &errarg;
615  errcallback.previous = error_context_stack;
616  error_context_stack = &errcallback;
617 
618  /* Call the "in" function for each replaced attribute */
619  Assert(natts == rel->attrmap->maplen);
620  for (i = 0; i < natts; i++)
621  {
623  int remoteattnum = rel->attrmap->attnums[i];
624 
 /*
  * Local columns with no remote counterpart keep the value copied from
  * srcslot.  NOTE(review): unlike slot_store_data there is no attisdropped
  * test; presumably dropped columns are never mapped -- confirm.
  */
625  if (remoteattnum < 0)
626  continue;
627 
628  Assert(remoteattnum < tupleData->ncols);
629 
 /* Columns the publisher marked UNCHANGED also keep srcslot's value. */
630  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
631  {
632  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
633 
634  errarg.local_attnum = i;
635  errarg.remote_attnum = remoteattnum;
636 
637  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
638  {
639  Oid typinput;
640  Oid typioparam;
641 
642  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
643  slot->tts_values[i] =
644  OidInputFunctionCall(typinput, colvalue->data,
645  typioparam, att->atttypmod);
646  slot->tts_isnull[i] = false;
647  }
648  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
649  {
650  Oid typreceive;
651  Oid typioparam;
652 
653  /*
654  * In some code paths we may be asked to re-parse the same
655  * tuple data. Reset the StringInfo's cursor so that works.
656  */
657  colvalue->cursor = 0;
658 
659  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
660  slot->tts_values[i] =
661  OidReceiveFunctionCall(typreceive, colvalue,
662  typioparam, att->atttypmod);
663 
664  /* Trouble if it didn't eat the whole buffer */
665  if (colvalue->cursor != colvalue->len)
666  ereport(ERROR,
667  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
668  errmsg("incorrect binary data format in logical replication column %d",
669  remoteattnum + 1)));
670  slot->tts_isnull[i] = false;
671  }
672  else
673  {
674  /* must be LOGICALREP_COLUMN_NULL */
675  slot->tts_values[i] = (Datum) 0;
676  slot->tts_isnull[i] = true;
677  }
678 
 /* Later errors must not be attributed to this column. */
679  errarg.local_attnum = -1;
680  errarg.remote_attnum = -1;
681  }
682  }
683 
684  /* Pop the error context stack */
685  error_context_stack = errcallback.previous;
686 
687  /* And finally, declare that "slot" contains a valid virtual tuple */
688  ExecStoreVirtualTuple(slot);
689 }
690 
691 /*
692  * Handle BEGIN message.
693  */
694 static void
696 {
697  LogicalRepBeginData begin_data;
698 
699  logicalrep_read_begin(s, &begin_data);
700 
 /*
  * Remember where the remote transaction ends: apply_handle_commit asserts
  * the COMMIT matches it, and should_apply_changes_for_rel compares the
  * statelsn of SYNCDONE tables against it.
  */
701  remote_final_lsn = begin_data.final_lsn;
702 
703  in_remote_transaction = true;
704 
706 }
707 
708 /*
709  * Handle COMMIT message.
710  *
711  * TODO, support tracking of multiple origins
712  */
713 static void
715 {
716  LogicalRepCommitData commit_data;
717 
718  logicalrep_read_commit(s, &commit_data);
719 
 /* The COMMIT must belong to the transaction whose final LSN BEGIN announced. */
720  Assert(commit_data.commit_lsn == remote_final_lsn);
721 
722  /* The synchronization worker runs in single transaction. */
724  {
725  /*
726  * Update origin state so we can restart streaming from correct
727  * position in case of crash.
728  */
731 
733  pgstat_report_stat(false);
734 
735  store_flush_position(commit_data.end_lsn);
736  }
737  else
738  {
739  /* Process any invalidation messages that might have accumulated. */
742  }
743 
744  in_remote_transaction = false;
745 
746  /* Process any tables that are being synchronized in parallel. */
747  process_syncing_tables(commit_data.end_lsn);
748 
750 }
751 
752 /*
753  * Handle ORIGIN message.
754  *
755  * TODO, support tracking of multiple origins
756  */
757 static void
759 {
 /*
  * Apart from this ordering check, the visible code does not act on the
  * message; tracking of multiple origins is not supported (see TODO).
  */
760  /*
761  * ORIGIN message can only come inside streaming transaction or inside
762  * remote transaction and before any actual writes.
763  */
767  ereport(ERROR,
768  (errcode(ERRCODE_PROTOCOL_VIOLATION),
769  errmsg("ORIGIN message sent out of order")));
770 }
771 
772 /*
773  * Handle STREAM START message.
774  */
775 static void
777 {
778  bool first_segment;
779  HASHCTL hash_ctl;
780 
782 
783  /*
784  * Start a transaction on stream start, this transaction will be committed
785  * on the stream stop. We need the transaction for handling the buffile,
786  * used for serializing the streaming data and subxact info.
787  */
789 
790  /* notify handle methods we're processing a remote transaction */
792 
793  /* extract XID of the top-level transaction */
794  stream_xid = logicalrep_read_stream_start(s, &first_segment);
 /*
  * first_segment is false when this xid has been streamed before; in that
  * case its existing subxact file is reopened below.
  */
795 
796  /*
797  * Initialize the xidhash table if we haven't yet. This will be used for
798  * the entire duration of the apply worker so create it in permanent
799  * context.
800  */
801  if (xidhash == NULL)
802  {
 /*
  * The key is the leading TransactionId of each StreamXidHash entry;
  * 1024 is only the initial size estimate.
  */
803  hash_ctl.keysize = sizeof(TransactionId);
804  hash_ctl.entrysize = sizeof(StreamXidHash);
805  hash_ctl.hcxt = ApplyContext;
806  xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
808  }
809 
810  /* open the spool file for this transaction */
812 
813  /* if this is not the first segment, open existing subxact file */
814  if (!first_segment)
816 
818 }
819 
820 /*
821  * Handle STREAM STOP message.
822  */
823 static void
825 {
827 
828  /*
829  * Close the file with serialized changes, and serialize information about
830  * subxacts for the toplevel transaction.
831  */
834 
835  /* We must be in a valid transaction state */
837 
838  /* Commit the per-stream transaction */
840 
 /*
  * Leave streaming mode; presumably this is the flag that
  * handle_streamed_transaction checks before spooling a change.
  */
841  in_streamed_transaction = false;
842 
843  /* Reset per-stream context */
844  MemoryContextReset(LogicalStreamingContext);
845 
847 }
848 
849 /*
850  * Handle STREAM abort message.
851  */
852 static void
854 {
855  TransactionId xid;
856  TransactionId subxid;
857 
859 
860  logicalrep_read_stream_abort(s, &xid, &subxid);
861 
862  /*
863  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
864  * just delete the files with serialized info.
865  */
866  if (xid == subxid)
868  else
869  {
870  /*
871  * OK, so it's a subxact. We need to read the subxact file for the
872  * toplevel transaction, determine the offset tracked for the subxact,
873  * and truncate the file with changes. We also remove the subxacts
874  * with higher offsets (or rather higher XIDs).
875  *
876  * We intentionally scan the array from the tail, because we're likely
877  * aborting a change for the most recent subtransactions.
878  *
879  * We can't use the binary search here as subxact XIDs won't
880  * necessarily arrive in sorted order, consider the case where we have
881  * released the savepoint for multiple subtransactions and then
882  * performed rollback to savepoint for one of the earlier
883  * sub-transaction.
884  */
885 
886  int64 i;
887  int64 subidx;
888  BufFile *fd;
889  bool found = false;
890  char path[MAXPGPATH];
891  StreamXidHash *ent;
892 
 /*
  * i and subidx are signed 64-bit, so both the count-down from the
  * uint32 nsubxacts and the -1 "not found" value are representable.
  */
893  subidx = -1;
896 
897  for (i = subxact_data.nsubxacts; i > 0; i--)
898  {
899  if (subxact_data.subxacts[i - 1].xid == subxid)
900  {
901  subidx = (i - 1);
902  found = true;
903  break;
904  }
905  }
906 
907  /*
908  * If it's an empty sub-transaction then we will not find the subxid
909  * here so just cleanup the subxact info and return.
910  */
911  if (!found)
912  {
913  /* Cleanup the subxact info */
916  return;
917  }
918 
919  Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
920 
 /* "found" is reused here as hash_search's result flag. */
921  ent = (StreamXidHash *) hash_search(xidhash,
922  (void *) &xid,
923  HASH_FIND,
924  &found);
925  Assert(found);
926 
927  /* open the changes file */
929  fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
930 
931  /* OK, truncate the file at the right offset */
932  BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
933  subxact_data.subxacts[subidx].offset);
934  BufFileClose(fd);
935 
 /*
  * Entries at index >= subidx were recorded at or after the aborted
  * subxact; the truncation above already discarded their changes.
  */
936  /* discard the subxacts added later */
937  subxact_data.nsubxacts = subidx;
938 
939  /* write the updated subxact list */
942  }
943 }
944 
945 /*
946  * Handle STREAM COMMIT message.
947  */
948 static void
950 {
951  TransactionId xid;
953  int nchanges;
954  char path[MAXPGPATH];
955  char *buffer = NULL;
956  bool found;
957  LogicalRepCommitData commit_data;
958  StreamXidHash *ent;
959  MemoryContext oldcxt;
960  BufFile *fd;
961 
963 
964  xid = logicalrep_read_stream_commit(s, &commit_data);
965 
966  elog(DEBUG1, "received commit for streamed transaction %u", xid);
967 
969 
970  /*
971  * Allocate file handle and memory required to process all the messages in
972  * TopTransactionContext to avoid them getting reset after each message is
973  * processed.
974  */
976 
977  /* open the spool file for the committed transaction */
979  elog(DEBUG1, "replaying changes from file \"%s\"", path);
980  ent = (StreamXidHash *) hash_search(xidhash,
981  (void *) &xid,
982  HASH_FIND,
983  &found);
984  Assert(found);
985  fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
986 
987  buffer = palloc(BLCKSZ);
988  initStringInfo(&s2);
989 
990  MemoryContextSwitchTo(oldcxt);
991 
992  remote_final_lsn = commit_data.commit_lsn;
993 
994  /*
995  * Make sure the handle apply_dispatch methods are aware we're in a remote
996  * transaction.
997  */
998  in_remote_transaction = true;
1000 
1001  /*
1002  * Read the entries one by one and pass them through the same logic as in
1003  * apply_dispatch.
1004  */
1005  nchanges = 0;
1006  while (true)
1007  {
1008  int nbytes;
1009  int len;
1010 
1012 
 /* Each spooled record is an int length followed by that many bytes. */
1013  /* read length of the on-disk record */
1014  nbytes = BufFileRead(fd, &len, sizeof(len));
1015 
1016  /* have we reached end of the file? */
1017  if (nbytes == 0)
1018  break;
1019 
 /*
  * NOTE(review): a short BufFileRead() is not necessarily an OS error, so
  * errno -- and hence %m in the messages below -- may not describe the
  * failure; consider reporting the byte counts instead.
  */
1020  /* do we have a correct length? */
1021  if (nbytes != sizeof(len))
1022  ereport(ERROR,
1024  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1025  path)));
1026 
1027  Assert(len > 0);
1028 
 /*
  * NOTE(review): this resizes buffer to exactly len for every record,
  * shrinking as well as growing it; growing only when len exceeds the
  * current size would avoid needless reallocations.
  */
1029  /* make sure we have sufficiently large buffer */
1030  buffer = repalloc(buffer, len);
1031 
1032  /* and finally read the data into the buffer */
1033  if (BufFileRead(fd, buffer, len) != len)
1034  ereport(ERROR,
1036  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1037  path)));
1038 
1039  /* copy the buffer to the stringinfo and call apply_dispatch */
1040  resetStringInfo(&s2);
1041  appendBinaryStringInfo(&s2, buffer, len);
1042 
1043  /* Ensure we are reading the data into our memory context. */
1044  oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
1045 
1046  apply_dispatch(&s2);
1047 
1048  MemoryContextReset(ApplyMessageContext);
1049 
1050  MemoryContextSwitchTo(oldcxt);
1051 
1052  nchanges++;
1053 
 /*
  * NOTE(review): this message puts single quotes around the file name,
  * unlike the double quotes used by the other messages in this function.
  */
1054  if (nchanges % 1000 == 0)
1055  elog(DEBUG1, "replayed %d changes from file '%s'",
1056  nchanges, path);
1057  }
1058 
1059  BufFileClose(fd);
1060 
1061  /*
1062  * Update origin state so we can restart streaming from correct position
1063  * in case of crash.
1064  */
1065  replorigin_session_origin_lsn = commit_data.end_lsn;
1067 
1068  pfree(buffer);
1069  pfree(s2.data);
1070 
1072  pgstat_report_stat(false);
1073 
1074  store_flush_position(commit_data.end_lsn);
1075 
1076  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
1077  nchanges, path);
1078 
1079  in_remote_transaction = false;
1080 
1081  /* Process any tables that are being synchronized in parallel. */
1082  process_syncing_tables(commit_data.end_lsn);
1083 
1084  /* unlink the files with serialized changes and subxact info */
1086 
1088 }
1089 
1090 /*
1091  * Handle RELATION message.
1092  *
1093  * Note we don't do validation against local schema here. The validation
1094  * against local schema is postponed until first change for given relation
1095  * comes as we only care about it when applying changes for it anyway and we
1096  * do less locking this way.
1097  */
1098 static void
1100 {
1101  LogicalRepRelation *rel;
1102 
 /* Inside a stream, the message is spooled to the transaction's file. */
1103  if (handle_streamed_transaction('R', s))
1104  return;
1105 
 /* Otherwise decode the relation description now. */
1106  rel = logicalrep_read_rel(s);
1108 }
1109 
1110 /*
1111  * Handle TYPE message.
1112  *
1113  * Note we don't do local mapping here, that's done when the type is
1114  * actually used.
1115  */
1116 static void
1118 {
1119  LogicalRepTyp typ;
1120 
 /* Inside a stream, the message is spooled to the transaction's file. */
1121  if (handle_streamed_transaction('Y', s))
1122  return;
1123 
 /* Otherwise decode the type description now. */
1124  logicalrep_read_typ(s, &typ);
1126 }
1127 
1128 /*
1129  * Get the replica identity index or, if it is not defined, the primary key.
1130  *
1131  * If neither is defined, returns InvalidOid
1132  */
1133 static Oid
1135 {
1136  Oid idxoid;
1137 
 /* An explicitly configured replica identity index takes precedence. */
1138  idxoid = RelationGetReplicaIndex(rel);
1139 
 /* Fall back to the primary key; the result may still be InvalidOid. */
1140  if (!OidIsValid(idxoid))
1141  idxoid = RelationGetPrimaryKeyIndex(rel);
1142 
1143  return idxoid;
1144 }
1145 
1146 /*
1147  * Handle INSERT message.
1148  */
1149 
1150 static void
1152 {
1153  LogicalRepRelMapEntry *rel;
1154  LogicalRepTupleData newtup;
1155  LogicalRepRelId relid;
1156  EState *estate;
1157  TupleTableSlot *remoteslot;
1158  MemoryContext oldctx;
1159 
 /* Inside a stream, the change is spooled rather than applied now. */
1160  if (handle_streamed_transaction('I', s))
1161  return;
1162 
1164 
1165  relid = logicalrep_read_insert(s, &newtup);
1166  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1167  if (!should_apply_changes_for_rel(rel))
1168  {
1169  /*
1170  * The relation can't become interesting in the middle of the
1171  * transaction so it's safe to unlock it.
1172  */
1174  return;
1175  }
1176 
1177  /* Initialize the executor state. */
1178  estate = create_estate_for_relation(rel);
 /*
  * The slot uses the local relation's descriptor, so remote values are
  * converted to the local column types.
  */
1179  remoteslot = ExecInitExtraTupleSlot(estate,
1180  RelationGetDescr(rel->localrel),
1181  &TTSOpsVirtual);
1182 
1183  /* Input functions may need an active snapshot, so get one */
1185 
 /*
  * Conversion runs in the memory context selected on the preceding
  * (unseen) line, presumably the per-tuple context; oldctx restores the
  * previous one afterwards.  Local columns the publisher does not send
  * are filled with their defaults.
  */
1186  /* Process and store remote tuple in the slot */
1188  slot_store_data(remoteslot, rel, &newtup);
1189  slot_fill_defaults(rel, estate, remoteslot);
1190  MemoryContextSwitchTo(oldctx);
1191 
1192  /* For a partitioned table, insert the tuple into a partition. */
1193  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1195  remoteslot, NULL, rel, CMD_INSERT);
1196  else
1198  remoteslot);
1199 
1201 
1202  /* Handle queued AFTER triggers. */
1203  AfterTriggerEndQuery(estate);
1204 
 /* Release the executor state built by create_estate_for_relation. */
1205  ExecResetTupleTable(estate->es_tupleTable, false);
1206  FreeExecutorState(estate);
1207 
1209 
1211 }
1212 
1213 /* Workhorse for apply_handle_insert() */
1214 static void
1216  EState *estate, TupleTableSlot *remoteslot)
1217 {
1218  ExecOpenIndices(relinfo, false);
1219 
1220  /* Do the insert. */
1221  ExecSimpleRelationInsert(estate, remoteslot);
1222 
1223  /* Cleanup. */
1224  ExecCloseIndices(relinfo);
1225 }
1226 
1227 /*
1228  * Check if the logical replication relation is updatable and throw
1229  * appropriate error if it isn't.
1230  */
1231 static void
1233 {
1234  /* Updatable, no error. */
1235  if (rel->updatable)
1236  return;
1237 
1238  /*
1239  * We are in error mode so it's fine this is somewhat slow. It's better to
1240  * give user correct error.
1241  */
1243  {
1244  ereport(ERROR,
1245  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1246  errmsg("publisher did not send replica identity column "
1247  "expected by the logical replication target relation \"%s.%s\"",
1248  rel->remoterel.nspname, rel->remoterel.relname)));
1249  }
1250 
1251  ereport(ERROR,
1252  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1253  errmsg("logical replication target relation \"%s.%s\" has "
1254  "neither REPLICA IDENTITY index nor PRIMARY "
1255  "KEY and published relation does not have "
1256  "REPLICA IDENTITY FULL",
1257  rel->remoterel.nspname, rel->remoterel.relname)));
1258 }
1259 
1260 /*
1261  * Handle UPDATE message.
1262  *
1263  * TODO: FDW support
1264  */
1265 static void
1267 {
1268  LogicalRepRelMapEntry *rel;
1269  LogicalRepRelId relid;
1270  EState *estate;
1271  LogicalRepTupleData oldtup;
1272  LogicalRepTupleData newtup;
1273  bool has_oldtup;
1274  TupleTableSlot *remoteslot;
1275  RangeTblEntry *target_rte;
1276  MemoryContext oldctx;
1277 
1278  if (handle_streamed_transaction('U', s))
1279  return;
1280 
1282 
1283  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1284  &newtup);
1285  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1286  if (!should_apply_changes_for_rel(rel))
1287  {
1288  /*
1289  * The relation can't become interesting in the middle of the
1290  * transaction so it's safe to unlock it.
1291  */
1293  return;
1294  }
1295 
1296  /* Check if we can do the update. */
1298 
1299  /* Initialize the executor state. */
1300  estate = create_estate_for_relation(rel);
1301  remoteslot = ExecInitExtraTupleSlot(estate,
1302  RelationGetDescr(rel->localrel),
1303  &TTSOpsVirtual);
1304 
1305  /*
1306  * Populate updatedCols so that per-column triggers can fire. This could
1307  * include more columns than were actually changed on the publisher
1308  * because the logical replication protocol doesn't contain that
1309  * information. But it would for example exclude columns that only exist
1310  * on the subscriber, since we are not touching those.
1311  */
1312  target_rte = list_nth(estate->es_range_table, 0);
1313  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1314  {
1316  int remoteattnum = rel->attrmap->attnums[i];
1317 
1318  if (!att->attisdropped && remoteattnum >= 0)
1319  {
1320  Assert(remoteattnum < newtup.ncols);
1321  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1322  target_rte->updatedCols =
1323  bms_add_member(target_rte->updatedCols,
1325  }
1326  }
1327 
1328  fill_extraUpdatedCols(target_rte, RelationGetDescr(rel->localrel));
1329 
1331 
1332  /* Build the search tuple. */
1334  slot_store_data(remoteslot, rel,
1335  has_oldtup ? &oldtup : &newtup);
1336  MemoryContextSwitchTo(oldctx);
1337 
1338  /* For a partitioned table, apply update to correct partition. */
1339  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1341  remoteslot, &newtup, rel, CMD_UPDATE);
1342  else
1344  remoteslot, &newtup, rel);
1345 
1347 
1348  /* Handle queued AFTER triggers. */
1349  AfterTriggerEndQuery(estate);
1350 
1351  ExecResetTupleTable(estate->es_tupleTable, false);
1352  FreeExecutorState(estate);
1353 
1355 
1357 }
1358 
1359 /* Workhorse for apply_handle_update() */
1360 static void
1362  EState *estate, TupleTableSlot *remoteslot,
1363  LogicalRepTupleData *newtup,
1364  LogicalRepRelMapEntry *relmapentry)
1365 {
1366  Relation localrel = relinfo->ri_RelationDesc;
1367  EPQState epqstate;
1368  TupleTableSlot *localslot;
1369  bool found;
1370  MemoryContext oldctx;
1371 
1372  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1373  ExecOpenIndices(relinfo, false);
1374 
1375  found = FindReplTupleInLocalRel(estate, localrel,
1376  &relmapentry->remoterel,
1377  remoteslot, &localslot);
1378  ExecClearTuple(remoteslot);
1379 
1380  /*
1381  * Tuple found.
1382  *
1383  * Note this will fail if there are other conflicting unique indexes.
1384  */
1385  if (found)
1386  {
1387  /* Process and store remote tuple in the slot */
1389  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1390  MemoryContextSwitchTo(oldctx);
1391 
1392  EvalPlanQualSetSlot(&epqstate, remoteslot);
1393 
1394  /* Do the actual update. */
1395  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
1396  }
1397  else
1398  {
1399  /*
1400  * The tuple to be updated could not be found.
1401  *
1402  * TODO what to do here, change the log level to LOG perhaps?
1403  */
1404  elog(DEBUG1,
1405  "logical replication did not find row for update "
1406  "in replication target relation \"%s\"",
1407  RelationGetRelationName(localrel));
1408  }
1409 
1410  /* Cleanup. */
1411  ExecCloseIndices(relinfo);
1412  EvalPlanQualEnd(&epqstate);
1413 }
1414 
1415 /*
1416  * Handle DELETE message.
1417  *
1418  * TODO: FDW support
1419  */
1420 static void
1422 {
1423  LogicalRepRelMapEntry *rel;
1424  LogicalRepTupleData oldtup;
1425  LogicalRepRelId relid;
1426  EState *estate;
1427  TupleTableSlot *remoteslot;
1428  MemoryContext oldctx;
1429 
1430  if (handle_streamed_transaction('D', s))
1431  return;
1432 
1434 
1435  relid = logicalrep_read_delete(s, &oldtup);
1436  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1437  if (!should_apply_changes_for_rel(rel))
1438  {
1439  /*
1440  * The relation can't become interesting in the middle of the
1441  * transaction so it's safe to unlock it.
1442  */
1444  return;
1445  }
1446 
1447  /* Check if we can do the delete. */
1449 
1450  /* Initialize the executor state. */
1451  estate = create_estate_for_relation(rel);
1452  remoteslot = ExecInitExtraTupleSlot(estate,
1453  RelationGetDescr(rel->localrel),
1454  &TTSOpsVirtual);
1455 
1457 
1458  /* Build the search tuple. */
1460  slot_store_data(remoteslot, rel, &oldtup);
1461  MemoryContextSwitchTo(oldctx);
1462 
1463  /* For a partitioned table, apply delete to correct partition. */
1464  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1466  remoteslot, NULL, rel, CMD_DELETE);
1467  else
1469  remoteslot, &rel->remoterel);
1470 
1472 
1473  /* Handle queued AFTER triggers. */
1474  AfterTriggerEndQuery(estate);
1475 
1476  ExecResetTupleTable(estate->es_tupleTable, false);
1477  FreeExecutorState(estate);
1478 
1480 
1482 }
1483 
1484 /* Workhorse for apply_handle_delete() */
1485 static void
1487  TupleTableSlot *remoteslot,
1488  LogicalRepRelation *remoterel)
1489 {
1490  Relation localrel = relinfo->ri_RelationDesc;
1491  EPQState epqstate;
1492  TupleTableSlot *localslot;
1493  bool found;
1494 
1495  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1496  ExecOpenIndices(relinfo, false);
1497 
1498  found = FindReplTupleInLocalRel(estate, localrel, remoterel,
1499  remoteslot, &localslot);
1500 
1501  /* If found delete it. */
1502  if (found)
1503  {
1504  EvalPlanQualSetSlot(&epqstate, localslot);
1505 
1506  /* Do the actual delete. */
1507  ExecSimpleRelationDelete(estate, &epqstate, localslot);
1508  }
1509  else
1510  {
1511  /* The tuple to be deleted could not be found. */
1512  elog(DEBUG1,
1513  "logical replication could not find row for delete "
1514  "in replication target relation \"%s\"",
1515  RelationGetRelationName(localrel));
1516  }
1517 
1518  /* Cleanup. */
1519  ExecCloseIndices(relinfo);
1520  EvalPlanQualEnd(&epqstate);
1521 }
1522 
1523 /*
1524  * Try to find a tuple received from the publication side (in 'remoteslot') in
1525  * the corresponding local relation using either replica identity index,
1526  * primary key or if needed, sequential scan.
1527  *
1528  * Local tuple, if found, is returned in '*localslot'.
1529  */
1530 static bool
1532  LogicalRepRelation *remoterel,
1533  TupleTableSlot *remoteslot,
1534  TupleTableSlot **localslot)
1535 {
1536  Oid idxoid;
1537  bool found;
1538 
1539  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
1540 
1541  idxoid = GetRelationIdentityOrPK(localrel);
1542  Assert(OidIsValid(idxoid) ||
1543  (remoterel->replident == REPLICA_IDENTITY_FULL));
1544 
1545  if (OidIsValid(idxoid))
1546  found = RelationFindReplTupleByIndex(localrel, idxoid,
1548  remoteslot, *localslot);
1549  else
1550  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
1551  remoteslot, *localslot);
1552 
1553  return found;
1554 }
1555 
1556 /*
1557  * This handles insert, update, delete on a partitioned table.
1558  */
1559 static void
1561  EState *estate,
1562  TupleTableSlot *remoteslot,
1563  LogicalRepTupleData *newtup,
1564  LogicalRepRelMapEntry *relmapentry,
1565  CmdType operation)
1566 {
1567  Relation parentrel = relinfo->ri_RelationDesc;
1568  ModifyTableState *mtstate = NULL;
1569  PartitionTupleRouting *proute = NULL;
1570  ResultRelInfo *partrelinfo;
1571  Relation partrel;
1572  TupleTableSlot *remoteslot_part;
1573  PartitionRoutingInfo *partinfo;
1574  TupleConversionMap *map;
1575  MemoryContext oldctx;
1576 
1577  /* ModifyTableState is needed for ExecFindPartition(). */
1578  mtstate = makeNode(ModifyTableState);
1579  mtstate->ps.plan = NULL;
1580  mtstate->ps.state = estate;
1581  mtstate->operation = operation;
1582  mtstate->resultRelInfo = relinfo;
1583  proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel);
1584 
1585  /*
1586  * Find the partition to which the "search tuple" belongs.
1587  */
1588  Assert(remoteslot != NULL);
1590  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
1591  remoteslot, estate);
1592  Assert(partrelinfo != NULL);
1593  partrel = partrelinfo->ri_RelationDesc;
1594 
1595  /*
1596  * To perform any of the operations below, the tuple must match the
1597  * partition's rowtype. Convert if needed or just copy, using a dedicated
1598  * slot to store the tuple in any case.
1599  */
1600  partinfo = partrelinfo->ri_PartitionInfo;
1601  remoteslot_part = partinfo->pi_PartitionTupleSlot;
1602  if (remoteslot_part == NULL)
1603  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
1604  map = partinfo->pi_RootToPartitionMap;
1605  if (map != NULL)
1606  remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
1607  remoteslot_part);
1608  else
1609  {
1610  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
1611  slot_getallattrs(remoteslot_part);
1612  }
1613  MemoryContextSwitchTo(oldctx);
1614 
1615  estate->es_result_relation_info = partrelinfo;
1616  switch (operation)
1617  {
1618  case CMD_INSERT:
1619  apply_handle_insert_internal(partrelinfo, estate,
1620  remoteslot_part);
1621  break;
1622 
1623  case CMD_DELETE:
1624  apply_handle_delete_internal(partrelinfo, estate,
1625  remoteslot_part,
1626  &relmapentry->remoterel);
1627  break;
1628 
1629  case CMD_UPDATE:
1630 
1631  /*
1632  * For UPDATE, depending on whether or not the updated tuple
1633  * satisfies the partition's constraint, perform a simple UPDATE
1634  * of the partition or move the updated tuple into a different
1635  * suitable partition.
1636  */
1637  {
1638  AttrMap *attrmap = map ? map->attrMap : NULL;
1639  LogicalRepRelMapEntry *part_entry;
1640  TupleTableSlot *localslot;
1641  ResultRelInfo *partrelinfo_new;
1642  bool found;
1643 
1644  part_entry = logicalrep_partition_open(relmapentry, partrel,
1645  attrmap);
1646 
1647  /* Get the matching local tuple from the partition. */
1648  found = FindReplTupleInLocalRel(estate, partrel,
1649  &part_entry->remoterel,
1650  remoteslot_part, &localslot);
1651 
1653  if (found)
1654  {
1655  /* Apply the update. */
1656  slot_modify_data(remoteslot_part, localslot,
1657  part_entry,
1658  newtup);
1659  MemoryContextSwitchTo(oldctx);
1660  }
1661  else
1662  {
1663  /*
1664  * The tuple to be updated could not be found.
1665  *
1666  * TODO what to do here, change the log level to LOG
1667  * perhaps?
1668  */
1669  elog(DEBUG1,
1670  "logical replication did not find row for update "
1671  "in replication target relation \"%s\"",
1672  RelationGetRelationName(partrel));
1673  }
1674 
1675  /*
1676  * Does the updated tuple still satisfy the current
1677  * partition's constraint?
1678  */
1679  if (!partrel->rd_rel->relispartition ||
1680  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
1681  false))
1682  {
1683  /*
1684  * Yes, so simply UPDATE the partition. We don't call
1685  * apply_handle_update_internal() here, which would
1686  * normally do the following work, to avoid repeating some
1687  * work already done above to find the local tuple in the
1688  * partition.
1689  */
1690  EPQState epqstate;
1691 
1692  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1693  ExecOpenIndices(partrelinfo, false);
1694 
1695  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
1696  ExecSimpleRelationUpdate(estate, &epqstate, localslot,
1697  remoteslot_part);
1698  ExecCloseIndices(partrelinfo);
1699  EvalPlanQualEnd(&epqstate);
1700  }
1701  else
1702  {
1703  /* Move the tuple into the new partition. */
1704 
1705  /*
1706  * New partition will be found using tuple routing, which
1707  * can only occur via the parent table. We might need to
1708  * convert the tuple to the parent's rowtype. Note that
1709  * this is the tuple found in the partition, not the
1710  * original search tuple received by this function.
1711  */
1712  if (map)
1713  {
1714  TupleConversionMap *PartitionToRootMap =
1716  RelationGetDescr(parentrel));
1717 
1718  remoteslot =
1719  execute_attr_map_slot(PartitionToRootMap->attrMap,
1720  remoteslot_part, remoteslot);
1721  }
1722  else
1723  {
1724  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
1725  slot_getallattrs(remoteslot);
1726  }
1727 
1728 
1729  /* Find the new partition. */
1731  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
1732  proute, remoteslot,
1733  estate);
1734  MemoryContextSwitchTo(oldctx);
1735  Assert(partrelinfo_new != partrelinfo);
1736 
1737  /* DELETE old tuple found in the old partition. */
1738  estate->es_result_relation_info = partrelinfo;
1739  apply_handle_delete_internal(partrelinfo, estate,
1740  localslot,
1741  &relmapentry->remoterel);
1742 
1743  /* INSERT new tuple into the new partition. */
1744 
1745  /*
1746  * Convert the replacement tuple to match the destination
1747  * partition rowtype.
1748  */
1750  partrel = partrelinfo_new->ri_RelationDesc;
1751  partinfo = partrelinfo_new->ri_PartitionInfo;
1752  remoteslot_part = partinfo->pi_PartitionTupleSlot;
1753  if (remoteslot_part == NULL)
1754  remoteslot_part = table_slot_create(partrel,
1755  &estate->es_tupleTable);
1756  map = partinfo->pi_RootToPartitionMap;
1757  if (map != NULL)
1758  {
1759  remoteslot_part = execute_attr_map_slot(map->attrMap,
1760  remoteslot,
1761  remoteslot_part);
1762  }
1763  else
1764  {
1765  remoteslot_part = ExecCopySlot(remoteslot_part,
1766  remoteslot);
1767  slot_getallattrs(remoteslot);
1768  }
1769  MemoryContextSwitchTo(oldctx);
1770  estate->es_result_relation_info = partrelinfo_new;
1771  apply_handle_insert_internal(partrelinfo_new, estate,
1772  remoteslot_part);
1773  }
1774  }
1775  break;
1776 
1777  default:
1778  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
1779  break;
1780  }
1781 
1782  ExecCleanupTupleRouting(mtstate, proute);
1783 }
1784 
1785 /*
1786  * Handle TRUNCATE message.
1787  *
1788  * TODO: FDW support
1789  */
1790 static void
1792 {
1793  bool cascade = false;
1794  bool restart_seqs = false;
1795  List *remote_relids = NIL;
1796  List *remote_rels = NIL;
1797  List *rels = NIL;
1798  List *part_rels = NIL;
1799  List *relids = NIL;
1800  List *relids_logged = NIL;
1801  ListCell *lc;
1802 
1803  if (handle_streamed_transaction('T', s))
1804  return;
1805 
1807 
1808  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
1809 
1810  foreach(lc, remote_relids)
1811  {
1812  LogicalRepRelId relid = lfirst_oid(lc);
1813  LogicalRepRelMapEntry *rel;
1814 
1815  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1816  if (!should_apply_changes_for_rel(rel))
1817  {
1818  /*
1819  * The relation can't become interesting in the middle of the
1820  * transaction so it's safe to unlock it.
1821  */
1823  continue;
1824  }
1825 
1826  remote_rels = lappend(remote_rels, rel);
1827  rels = lappend(rels, rel->localrel);
1828  relids = lappend_oid(relids, rel->localreloid);
1830  relids_logged = lappend_oid(relids_logged, rel->localreloid);
1831 
1832  /*
1833  * Truncate partitions if we got a message to truncate a partitioned
1834  * table.
1835  */
1836  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1837  {
1838  ListCell *child;
1839  List *children = find_all_inheritors(rel->localreloid,
1841  NULL);
1842 
1843  foreach(child, children)
1844  {
1845  Oid childrelid = lfirst_oid(child);
1846  Relation childrel;
1847 
1848  if (list_member_oid(relids, childrelid))
1849  continue;
1850 
1851  /* find_all_inheritors already got lock */
1852  childrel = table_open(childrelid, NoLock);
1853 
1854  /*
1855  * Ignore temp tables of other backends. See similar code in
1856  * ExecuteTruncate().
1857  */
1858  if (RELATION_IS_OTHER_TEMP(childrel))
1859  {
1860  table_close(childrel, RowExclusiveLock);
1861  continue;
1862  }
1863 
1864  rels = lappend(rels, childrel);
1865  part_rels = lappend(part_rels, childrel);
1866  relids = lappend_oid(relids, childrelid);
1867  /* Log this relation only if needed for logical decoding */
1868  if (RelationIsLogicallyLogged(childrel))
1869  relids_logged = lappend_oid(relids_logged, childrelid);
1870  }
1871  }
1872  }
1873 
1874  /*
1875  * Even if we used CASCADE on the upstream primary we explicitly default
1876  * to replaying changes without further cascading. This might be later
1877  * changeable with a user specified option.
1878  */
1879  ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
1880 
1881  foreach(lc, remote_rels)
1882  {
1883  LogicalRepRelMapEntry *rel = lfirst(lc);
1884 
1886  }
1887  foreach(lc, part_rels)
1888  {
1889  Relation rel = lfirst(lc);
1890 
1891  table_close(rel, NoLock);
1892  }
1893 
1895 }
1896 
1897 
1898 /*
1899  * Logical replication protocol message dispatcher.
1900  */
1901 static void
1903 {
1904  char action = pq_getmsgbyte(s);
1905 
1906  switch (action)
1907  {
1908  /* BEGIN */
1909  case 'B':
1910  apply_handle_begin(s);
1911  break;
1912  /* COMMIT */
1913  case 'C':
1915  break;
1916  /* INSERT */
1917  case 'I':
1919  break;
1920  /* UPDATE */
1921  case 'U':
1923  break;
1924  /* DELETE */
1925  case 'D':
1927  break;
1928  /* TRUNCATE */
1929  case 'T':
1931  break;
1932  /* RELATION */
1933  case 'R':
1935  break;
1936  /* TYPE */
1937  case 'Y':
1938  apply_handle_type(s);
1939  break;
1940  /* ORIGIN */
1941  case 'O':
1943  break;
1944  /* STREAM START */
1945  case 'S':
1947  break;
1948  /* STREAM END */
1949  case 'E':
1951  break;
1952  /* STREAM ABORT */
1953  case 'A':
1955  break;
1956  /* STREAM COMMIT */
1957  case 'c':
1959  break;
1960  default:
1961  ereport(ERROR,
1962  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1963  errmsg("invalid logical replication message type \"%c\"", action)));
1964  }
1965 }
1966 
1967 /*
1968  * Figure out which write/flush positions to report to the walsender process.
1969  *
1970  * We can't simply report back the last LSN the walsender sent us because the
1971  * local transaction might not yet be flushed to disk locally. Instead we
1972  * build a list that associates local with remote LSNs for every commit. When
1973  * reporting back the flush position to the sender we iterate that list and
1974  * check which entries on it are already locally flushed. Those we can report
1975  * as having been flushed.
1976  *
1977  * The have_pending_txes is true if there are outstanding transactions that
1978  * need to be flushed.
1979  */
1980 static void
1982  bool *have_pending_txes)
1983 {
1984  dlist_mutable_iter iter;
1985  XLogRecPtr local_flush = GetFlushRecPtr();
1986 
1987  *write = InvalidXLogRecPtr;
1988  *flush = InvalidXLogRecPtr;
1989 
1990  dlist_foreach_modify(iter, &lsn_mapping)
1991  {
1992  FlushPosition *pos =
1994 
1995  *write = pos->remote_end;
1996 
1997  if (pos->local_end <= local_flush)
1998  {
1999  *flush = pos->remote_end;
2000  dlist_delete(iter.cur);
2001  pfree(pos);
2002  }
2003  else
2004  {
2005  /*
2006  * Don't want to uselessly iterate over the rest of the list which
2007  * could potentially be long. Instead get the last element and
2008  * grab the write position from there.
2009  */
2011  &lsn_mapping);
2012  *write = pos->remote_end;
2013  *have_pending_txes = true;
2014  return;
2015  }
2016  }
2017 
2018  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
2019 }
2020 
2021 /*
2022  * Store current remote/local lsn pair in the tracking list.
2023  */
2024 static void
2026 {
2027  FlushPosition *flushpos;
2028 
2029  /* Need to do this in permanent context */
2030  MemoryContextSwitchTo(ApplyContext);
2031 
2032  /* Track commit lsn */
2033  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2034  flushpos->local_end = XactLastCommitEnd;
2035  flushpos->remote_end = remote_lsn;
2036 
2037  dlist_push_tail(&lsn_mapping, &flushpos->node);
2038  MemoryContextSwitchTo(ApplyMessageContext);
2039 }
2040 
2041 
2042 /* Update statistics of the worker. */
2043 static void
2044 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
2045 {
2046  MyLogicalRepWorker->last_lsn = last_lsn;
2047  MyLogicalRepWorker->last_send_time = send_time;
2049  if (reply)
2050  {
2051  MyLogicalRepWorker->reply_lsn = last_lsn;
2052  MyLogicalRepWorker->reply_time = send_time;
2053  }
2054 }
2055 
2056 /*
2057  * Apply main loop.
2058  */
2059 static void
2061 {
2062  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2063  bool ping_sent = false;
2064 
2065  /*
2066  * Init the ApplyMessageContext which we clean up after each replication
2067  * protocol message.
2068  */
2069  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
2070  "ApplyMessageContext",
2072 
2073  /*
2074  * This memory context is used for per-stream data when the streaming mode
2075  * is enabled. This context is reset on each stream stop.
2076  */
2077  LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
2078  "LogicalStreamingContext",
2080 
2081  /* mark as idle, before starting to loop */
2083 
2084  /* This outer loop iterates once per wait. */
2085  for (;;)
2086  {
2088  int rc;
2089  int len;
2090  char *buf = NULL;
2091  bool endofstream = false;
2092  long wait_time;
2093 
2095 
2096  MemoryContextSwitchTo(ApplyMessageContext);
2097 
2098  len = walrcv_receive(wrconn, &buf, &fd);
2099 
2100  if (len != 0)
2101  {
2102  /* Loop to process all available data (without blocking). */
2103  for (;;)
2104  {
2106 
2107  if (len == 0)
2108  {
2109  break;
2110  }
2111  else if (len < 0)
2112  {
2113  ereport(LOG,
2114  (errmsg("data stream from publisher has ended")));
2115  endofstream = true;
2116  break;
2117  }
2118  else
2119  {
2120  int c;
2121  StringInfoData s;
2122 
2123  /* Reset timeout. */
2124  last_recv_timestamp = GetCurrentTimestamp();
2125  ping_sent = false;
2126 
2127  /* Ensure we are reading the data into our memory context. */
2128  MemoryContextSwitchTo(ApplyMessageContext);
2129 
2130  s.data = buf;
2131  s.len = len;
2132  s.cursor = 0;
2133  s.maxlen = -1;
2134 
2135  c = pq_getmsgbyte(&s);
2136 
2137  if (c == 'w')
2138  {
2139  XLogRecPtr start_lsn;
2140  XLogRecPtr end_lsn;
2141  TimestampTz send_time;
2142 
2143  start_lsn = pq_getmsgint64(&s);
2144  end_lsn = pq_getmsgint64(&s);
2145  send_time = pq_getmsgint64(&s);
2146 
2147  if (last_received < start_lsn)
2148  last_received = start_lsn;
2149 
2150  if (last_received < end_lsn)
2151  last_received = end_lsn;
2152 
2153  UpdateWorkerStats(last_received, send_time, false);
2154 
2155  apply_dispatch(&s);
2156  }
2157  else if (c == 'k')
2158  {
2159  XLogRecPtr end_lsn;
2161  bool reply_requested;
2162 
2163  end_lsn = pq_getmsgint64(&s);
2164  timestamp = pq_getmsgint64(&s);
2165  reply_requested = pq_getmsgbyte(&s);
2166 
2167  if (last_received < end_lsn)
2168  last_received = end_lsn;
2169 
2170  send_feedback(last_received, reply_requested, false);
2171  UpdateWorkerStats(last_received, timestamp, true);
2172  }
2173  /* other message types are purposefully ignored */
2174 
2175  MemoryContextReset(ApplyMessageContext);
2176  }
2177 
2178  len = walrcv_receive(wrconn, &buf, &fd);
2179  }
2180  }
2181 
2182  /* confirm all writes so far */
2183  send_feedback(last_received, false, false);
2184 
2186  {
2187  /*
2188  * If we didn't get any transactions for a while there might be
2189  * unconsumed invalidation messages in the queue, consume them
2190  * now.
2191  */
2194 
2195  /* Process any table synchronization changes. */
2196  process_syncing_tables(last_received);
2197  }
2198 
2199  /* Cleanup the memory. */
2200  MemoryContextResetAndDeleteChildren(ApplyMessageContext);
2202 
2203  /* Check if we need to exit the streaming loop. */
2204  if (endofstream)
2205  {
2206  TimeLineID tli;
2207 
2208  walrcv_endstreaming(wrconn, &tli);
2209  break;
2210  }
2211 
2212  /*
2213  * Wait for more data or latch. If we have unflushed transactions,
2214  * wake up after WalWriterDelay to see if they've been flushed yet (in
2215  * which case we should send a feedback message). Otherwise, there's
2216  * no particular urgency about waking up unless we get data or a
2217  * signal.
2218  */
2219  if (!dlist_is_empty(&lsn_mapping))
2220  wait_time = WalWriterDelay;
2221  else
2222  wait_time = NAPTIME_PER_CYCLE;
2223 
2227  fd, wait_time,
2229 
2230  if (rc & WL_LATCH_SET)
2231  {
2234  }
2235 
2236  if (ConfigReloadPending)
2237  {
2238  ConfigReloadPending = false;
2240  }
2241 
2242  if (rc & WL_TIMEOUT)
2243  {
2244  /*
2245  * We didn't receive anything new. If we haven't heard anything
2246  * from the server for more than wal_receiver_timeout / 2, ping
2247  * the server. Also, if it's been longer than
2248  * wal_receiver_status_interval since the last update we sent,
2249  * send a status update to the primary anyway, to report any
2250  * progress in applying WAL.
2251  */
2252  bool requestReply = false;
2253 
2254  /*
2255  * Check if time since last receive from standby has reached the
2256  * configured limit.
2257  */
2258  if (wal_receiver_timeout > 0)
2259  {
2261  TimestampTz timeout;
2262 
2263  timeout =
2264  TimestampTzPlusMilliseconds(last_recv_timestamp,
2266 
2267  if (now >= timeout)
2268  ereport(ERROR,
2269  (errmsg("terminating logical replication worker due to timeout")));
2270 
2271  /* Check to see if it's time for a ping. */
2272  if (!ping_sent)
2273  {
2274  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2275  (wal_receiver_timeout / 2));
2276  if (now >= timeout)
2277  {
2278  requestReply = true;
2279  ping_sent = true;
2280  }
2281  }
2282  }
2283 
2284  send_feedback(last_received, requestReply, requestReply);
2285  }
2286  }
2287 }
2288 
2289 /*
2290  * Send a Standby Status Update message to server.
2291  *
2292  * 'recvpos' is the latest LSN we've received data to, force is set if we need
2293  * to send a response to avoid timeouts.
2294  */
2295 static void
2296 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
2297 {
2298  static StringInfo reply_message = NULL;
2299  static TimestampTz send_time = 0;
2300 
2301  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2302  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2303  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2304 
2305  XLogRecPtr writepos;
2306  XLogRecPtr flushpos;
2307  TimestampTz now;
2308  bool have_pending_txes;
2309 
2310  /*
2311  * If the user doesn't want status to be reported to the publisher, be
2312  * sure to exit before doing anything at all.
2313  */
2314  if (!force && wal_receiver_status_interval <= 0)
2315  return;
2316 
2317  /* It's legal to not pass a recvpos */
2318  if (recvpos < last_recvpos)
2319  recvpos = last_recvpos;
2320 
2321  get_flush_position(&writepos, &flushpos, &have_pending_txes);
2322 
2323  /*
2324  * No outstanding transactions to flush, we can report the latest received
2325  * position. This is important for synchronous replication.
2326  */
2327  if (!have_pending_txes)
2328  flushpos = writepos = recvpos;
2329 
2330  if (writepos < last_writepos)
2331  writepos = last_writepos;
2332 
2333  if (flushpos < last_flushpos)
2334  flushpos = last_flushpos;
2335 
2336  now = GetCurrentTimestamp();
2337 
2338  /* if we've already reported everything we're good */
2339  if (!force &&
2340  writepos == last_writepos &&
2341  flushpos == last_flushpos &&
2342  !TimestampDifferenceExceeds(send_time, now,
2344  return;
2345  send_time = now;
2346 
2347  if (!reply_message)
2348  {
2349  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
2350 
2351  reply_message = makeStringInfo();
2352  MemoryContextSwitchTo(oldctx);
2353  }
2354  else
2355  resetStringInfo(reply_message);
2356 
2357  pq_sendbyte(reply_message, 'r');
2358  pq_sendint64(reply_message, recvpos); /* write */
2359  pq_sendint64(reply_message, flushpos); /* flush */
2360  pq_sendint64(reply_message, writepos); /* apply */
2361  pq_sendint64(reply_message, now); /* sendTime */
2362  pq_sendbyte(reply_message, requestReply); /* replyRequested */
2363 
2364  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2365  force,
2366  (uint32) (recvpos >> 32), (uint32) recvpos,
2367  (uint32) (writepos >> 32), (uint32) writepos,
2368  (uint32) (flushpos >> 32), (uint32) flushpos
2369  );
2370 
2371  walrcv_send(wrconn, reply_message->data, reply_message->len);
2372 
2373  if (recvpos > last_recvpos)
2374  last_recvpos = recvpos;
2375  if (writepos > last_writepos)
2376  last_writepos = writepos;
2377  if (flushpos > last_flushpos)
2378  last_flushpos = flushpos;
2379 }
2380 
2381 /*
2382  * Reread subscription info if needed. Most changes will be exit.
2383  */
2384 static void
2386 {
2387  MemoryContext oldctx;
2389  bool started_tx = false;
2390 
2391  /* When cache state is valid there is nothing to do here. */
2392  if (MySubscriptionValid)
2393  return;
2394 
2395  /* This function might be called inside or outside of transaction. */
2396  if (!IsTransactionState())
2397  {
2399  started_tx = true;
2400  }
2401 
2402  /* Ensure allocations in permanent context. */
2403  oldctx = MemoryContextSwitchTo(ApplyContext);
2404 
2405  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
2406 
2407  /*
2408  * Exit if the subscription was removed. This normally should not happen
2409  * as the worker gets killed during DROP SUBSCRIPTION.
2410  */
2411  if (!newsub)
2412  {
2413  ereport(LOG,
2414  (errmsg("logical replication apply worker for subscription \"%s\" will "
2415  "stop because the subscription was removed",
2416  MySubscription->name)));
2417 
2418  proc_exit(0);
2419  }
2420 
2421  /*
2422  * Exit if the subscription was disabled. This normally should not happen
2423  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
2424  */
2425  if (!newsub->enabled)
2426  {
2427  ereport(LOG,
2428  (errmsg("logical replication apply worker for subscription \"%s\" will "
2429  "stop because the subscription was disabled",
2430  MySubscription->name)));
2431 
2432  proc_exit(0);
2433  }
2434 
2435  /* !slotname should never happen when enabled is true. */
2436  Assert(newsub->slotname);
2437 
2438  /*
2439  * Exit if any parameter that affects the remote connection was changed.
2440  * The launcher will start a new worker.
2441  */
2442  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
2443  strcmp(newsub->name, MySubscription->name) != 0 ||
2444  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
2445  newsub->binary != MySubscription->binary ||
2446  newsub->stream != MySubscription->stream ||
2447  !equal(newsub->publications, MySubscription->publications))
2448  {
2449  ereport(LOG,
2450  (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
2451  MySubscription->name)));
2452 
2453  proc_exit(0);
2454  }
2455 
2456  /* Check for other changes that should never happen too. */
2457  if (newsub->dbid != MySubscription->dbid)
2458  {
2459  elog(ERROR, "subscription %u changed unexpectedly",
2461  }
2462 
2463  /* Clean old subscription info and switch to new one. */
2464  FreeSubscription(MySubscription);
2465  MySubscription = newsub;
2466 
2467  MemoryContextSwitchTo(oldctx);
2468 
2469  /* Change synchronous commit according to the user's wishes */
2470  SetConfigOption("synchronous_commit", MySubscription->synccommit,
2472 
2473  if (started_tx)
2475 
2476  MySubscriptionValid = true;
2477 }
2478 
2479 /*
2480  * Callback from subscription syscache invalidation.
2481  */
2482 static void
2483 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
2484 {
2485  MySubscriptionValid = false;
2486 }
2487 
2488 /*
2489  * subxact_info_write
2490  * Store information about subxacts for a toplevel transaction.
2491  *
2492  * For each subxact we store offset of it's first change in the main file.
2493  * The file is always over-written as a whole.
2494  *
2495  * XXX We should only store subxacts that were not aborted yet.
2496  */
2497 static void
2499 {
2500  char path[MAXPGPATH];
2501  bool found;
2502  Size len;
2503  StreamXidHash *ent;
2504  BufFile *fd;
2505 
2507 
2508  /* find the xid entry in the xidhash */
2509  ent = (StreamXidHash *) hash_search(xidhash,
2510  (void *) &xid,
2511  HASH_FIND,
2512  &found);
2513  /* we must found the entry for its top transaction by this time */
2514  Assert(found);
2515 
2516  /*
2517  * If there is no subtransaction then nothing to do, but if already have
2518  * subxact file then delete that.
2519  */
2520  if (subxact_data.nsubxacts == 0)
2521  {
2522  if (ent->subxact_fileset)
2523  {
2526  pfree(ent->subxact_fileset);
2527  ent->subxact_fileset = NULL;
2528  }
2529  return;
2530  }
2531 
2532  subxact_filename(path, subid, xid);
2533 
2534  /*
2535  * Create the subxact file if it not already created, otherwise open the
2536  * existing file.
2537  */
2538  if (ent->subxact_fileset == NULL)
2539  {
2540  MemoryContext oldctx;
2541 
2542  /*
2543  * We need to maintain shared fileset across multiple stream
2544  * start/stop calls. So, need to allocate it in a persistent context.
2545  */
2546  oldctx = MemoryContextSwitchTo(ApplyContext);
2547  ent->subxact_fileset = palloc(sizeof(SharedFileSet));
2548  SharedFileSetInit(ent->subxact_fileset, NULL);
2549  MemoryContextSwitchTo(oldctx);
2550 
2551  fd = BufFileCreateShared(ent->subxact_fileset, path);
2552  }
2553  else
2554  fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
2555 
2556  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2557 
2558  /* Write the subxact count and subxact info */
2559  BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
2560  BufFileWrite(fd, subxact_data.subxacts, len);
2561 
2562  BufFileClose(fd);
2563 
2564  /* free the memory allocated for subxact info */
2566 }
2567 
2568 /*
2569  * subxact_info_read
2570  * Restore information about subxacts of a streamed transaction.
2571  *
2572  * Read information about subxacts into the structure subxact_data that can be
2573  * used later.
2574  */
2575 static void
2577 {
2578  char path[MAXPGPATH];
2579  bool found;
2580  Size len;
2581  BufFile *fd;
2582  StreamXidHash *ent;
2583  MemoryContext oldctx;
2584 
2586  Assert(!subxact_data.subxacts);
2587  Assert(subxact_data.nsubxacts == 0);
2588  Assert(subxact_data.nsubxacts_max == 0);
2589 
2590  /* Find the stream xid entry in the xidhash */
2591  ent = (StreamXidHash *) hash_search(xidhash,
2592  (void *) &xid,
2593  HASH_FIND,
2594  &found);
2595 
2596  /*
2597  * If subxact_fileset is not valid that mean we don't have any subxact
2598  * info
2599  */
2600  if (ent->subxact_fileset == NULL)
2601  return;
2602 
2603  subxact_filename(path, subid, xid);
2604 
2605  fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
2606 
2607  /* read number of subxact items */
2608  if (BufFileRead(fd, &subxact_data.nsubxacts,
2609  sizeof(subxact_data.nsubxacts)) !=
2610  sizeof(subxact_data.nsubxacts))
2611  ereport(ERROR,
2613  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
2614  path)));
2615 
2616  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2617 
2618  /* we keep the maximum as a power of 2 */
2619  subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
2620 
2621  /*
2622  * Allocate subxact information in the logical streaming context. We need
2623  * this information during the complete stream so that we can add the sub
2624  * transaction info to this. On stream stop we will flush this information
2625  * to the subxact file and reset the logical streaming context.
2626  */
2627  oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
2628  subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
2629  sizeof(SubXactInfo));
2630  MemoryContextSwitchTo(oldctx);
2631 
2632  if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
2633  ereport(ERROR,
2635  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
2636  path)));
2637 
2638  BufFileClose(fd);
2639 }
2640 
2641 /*
2642  * subxact_info_add
2643  * Add information about a subxact (offset in the main file).
2644  */
2645 static void
2647 {
2648  SubXactInfo *subxacts = subxact_data.subxacts;
2649  int64 i;
2650 
2651  /* We must have a valid top level stream xid and a stream fd. */
2653  Assert(stream_fd != NULL);
2654 
2655  /*
2656  * If the XID matches the toplevel transaction, we don't want to add it.
2657  */
2658  if (stream_xid == xid)
2659  return;
2660 
2661  /*
2662  * In most cases we're checking the same subxact as we've already seen in
2663  * the last call, so make sure to ignore it (this change comes later).
2664  */
2665  if (subxact_data.subxact_last == xid)
2666  return;
2667 
2668  /* OK, remember we're processing this XID. */
2669  subxact_data.subxact_last = xid;
2670 
2671  /*
2672  * Check if the transaction is already present in the array of subxact. We
2673  * intentionally scan the array from the tail, because we're likely adding
2674  * a change for the most recent subtransactions.
2675  *
2676  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
2677  * would allow us to use binary search here.
2678  */
2679  for (i = subxact_data.nsubxacts; i > 0; i--)
2680  {
2681  /* found, so we're done */
2682  if (subxacts[i - 1].xid == xid)
2683  return;
2684  }
2685 
2686  /* This is a new subxact, so we need to add it to the array. */
2687  if (subxact_data.nsubxacts == 0)
2688  {
2689  MemoryContext oldctx;
2690 
2691  subxact_data.nsubxacts_max = 128;
2692 
2693  /*
2694  * Allocate this memory for subxacts in per-stream context, see
2695  * subxact_info_read.
2696  */
2697  oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
2698  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
2699  MemoryContextSwitchTo(oldctx);
2700  }
2701  else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
2702  {
2703  subxact_data.nsubxacts_max *= 2;
2704  subxacts = repalloc(subxacts,
2705  subxact_data.nsubxacts_max * sizeof(SubXactInfo));
2706  }
2707 
2708  subxacts[subxact_data.nsubxacts].xid = xid;
2709 
2710  /*
2711  * Get the current offset of the stream file and store it as offset of
2712  * this subxact.
2713  */
2714  BufFileTell(stream_fd,
2715  &subxacts[subxact_data.nsubxacts].fileno,
2716  &subxacts[subxact_data.nsubxacts].offset);
2717 
2718  subxact_data.nsubxacts++;
2719  subxact_data.subxacts = subxacts;
2720 }
2721 
2722 /* format filename for file containing the info about subxacts */
2723 static inline void
2724 subxact_filename(char *path, Oid subid, TransactionId xid)
2725 {
2726  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
2727 }
2728 
2729 /* format filename for file containing serialized changes */
2730 static inline void
2731 changes_filename(char *path, Oid subid, TransactionId xid)
2732 {
2733  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
2734 }
2735 
2736 /*
2737  * stream_cleanup_files
2738  * Cleanup files for a subscription / toplevel transaction.
2739  *
2740  * Remove files with serialized changes and subxact info for a particular
2741  * toplevel transaction. Each subscription has a separate set of files.
2742  */
2743 static void
2745 {
2746  char path[MAXPGPATH];
2747  StreamXidHash *ent;
2748 
2749  /* Remove the xid entry from the stream xid hash */
2750  ent = (StreamXidHash *) hash_search(xidhash,
2751  (void *) &xid,
2752  HASH_REMOVE,
2753  NULL);
2754  /* By this time we must have created the transaction entry */
2755  Assert(ent != NULL);
2756 
2757  /* Delete the change file and release the stream fileset memory */
2758  changes_filename(path, subid, xid);
2760  pfree(ent->stream_fileset);
2761  ent->stream_fileset = NULL;
2762 
2763  /* Delete the subxact file and release the memory, if it exist */
2764  if (ent->subxact_fileset)
2765  {
2766  subxact_filename(path, subid, xid);
2768  pfree(ent->subxact_fileset);
2769  ent->subxact_fileset = NULL;
2770  }
2771 }
2772 
2773 /*
2774  * stream_open_file
2775  * Open a file that we'll use to serialize changes for a toplevel
2776  * transaction.
2777  *
2778  * Open a file for streamed changes from a toplevel transaction identified
2779  * by stream_xid (global variable). If it's the first chunk of streamed
2780  * changes for this transaction, initialize the shared fileset and create the
2781  * buffile, otherwise open the previously created file.
2782  *
2783  * This can only be called at the beginning of a "streaming" block, i.e.
2784  * between stream_start/stream_stop messages from the upstream.
2785  */
2786 static void
2787 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
2788 {
2789  char path[MAXPGPATH];
2790  bool found;
2791  MemoryContext oldcxt;
2792  StreamXidHash *ent;
2793 
2795  Assert(OidIsValid(subid));
2797  Assert(stream_fd == NULL);
2798 
2799  /* create or find the xid entry in the xidhash */
2800  ent = (StreamXidHash *) hash_search(xidhash,
2801  (void *) &xid,
2803  &found);
2804  Assert(first_segment || found);
2805  changes_filename(path, subid, xid);
2806  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
2807 
2808  /*
2809  * Create/open the buffiles under the logical streaming context so that we
2810  * have those files until stream stop.
2811  */
2812  oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
2813 
2814  /*
2815  * If this is the first streamed segment, the file must not exist, so make
2816  * sure we're the ones creating it. Otherwise just open the file for
2817  * writing, in append mode.
2818  */
2819  if (first_segment)
2820  {
2821  MemoryContext savectx;
2822  SharedFileSet *fileset;
2823 
2824  /*
2825  * We need to maintain shared fileset across multiple stream
2826  * start/stop calls. So, need to allocate it in a persistent context.
2827  */
2828  savectx = MemoryContextSwitchTo(ApplyContext);
2829  fileset = palloc(sizeof(SharedFileSet));
2830 
2831  SharedFileSetInit(fileset, NULL);
2832  MemoryContextSwitchTo(savectx);
2833 
2834  stream_fd = BufFileCreateShared(fileset, path);
2835 
2836  /* Remember the fileset for the next stream of the same transaction */
2837  ent->xid = xid;
2838  ent->stream_fileset = fileset;
2839  ent->subxact_fileset = NULL;
2840  }
2841  else
2842  {
2843  /*
2844  * Open the file and seek to the end of the file because we always
2845  * append the changes file.
2846  */
2847  stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
2848  BufFileSeek(stream_fd, 0, 0, SEEK_END);
2849  }
2850 
2851  MemoryContextSwitchTo(oldcxt);
2852 }
2853 
2854 /*
2855  * stream_close_file
2856  * Close the currently open file with streamed changes.
2857  *
2858  * This can only be called at the end of a streaming block, i.e. at stream_stop
2859  * message from the upstream.
2860  */
2861 static void
2863 {
2866  Assert(stream_fd != NULL);
2867 
2868  BufFileClose(stream_fd);
2869 
2871  stream_fd = NULL;
2872 }
2873 
2874 /*
2875  * stream_write_change
2876  * Serialize a change to a file for the current toplevel transaction.
2877  *
2878  * The change is serialized in a simple format, with length (not including
2879  * the length), action code (identifying the message type) and message
2880  * contents (without the subxact TransactionId value).
2881  */
2882 static void
2884 {
2885  int len;
2886 
2889  Assert(stream_fd != NULL);
2890 
2891  /* total on-disk size, including the action type character */
2892  len = (s->len - s->cursor) + sizeof(char);
2893 
2894  /* first write the size */
2895  BufFileWrite(stream_fd, &len, sizeof(len));
2896 
2897  /* then the action */
2898  BufFileWrite(stream_fd, &action, sizeof(action));
2899 
2900  /* and finally the remaining part of the buffer (after the XID) */
2901  len = (s->len - s->cursor);
2902 
2903  BufFileWrite(stream_fd, &s->data[s->cursor], len);
2904 }
2905 
2906 /*
2907  * Cleanup the memory for subxacts and reset the related variables.
2908  */
2909 static inline void
2911 {
2912  if (subxact_data.subxacts)
2913  pfree(subxact_data.subxacts);
2914 
2915  subxact_data.subxacts = NULL;
2916  subxact_data.subxact_last = InvalidTransactionId;
2917  subxact_data.nsubxacts = 0;
2918  subxact_data.nsubxacts_max = 0;
2919 }
2920 
2921 /* Logical Replication Apply worker entry point */
2922 void
2924 {
2925  int worker_slot = DatumGetInt32(main_arg);
2926  MemoryContext oldctx;
2927  char originname[NAMEDATALEN];
2928  XLogRecPtr origin_startpos;
2929  char *myslotname;
2931 
2932  /* Attach to slot */
2933  logicalrep_worker_attach(worker_slot);
2934 
2935  /* Setup signal handling */
2937  pqsignal(SIGTERM, die);
2939 
2940  /*
2941  * We don't currently need any ResourceOwner in a walreceiver process, but
2942  * if we did, we could call CreateAuxProcessResourceOwner here.
2943  */
2944 
2945  /* Initialise stats to a sanish value */
2948 
2949  /* Load the libpq-specific functions */
2950  load_file("libpqwalreceiver", false);
2951 
2952  /* Run as replica session replication role. */
2953  SetConfigOption("session_replication_role", "replica",
2955 
2956  /* Connect to our database. */
2959  0);
2960 
2961  /*
2962  * Set always-secure search path, so malicious users can't redirect user
2963  * code (e.g. pg_index.indexprs).
2964  */
2965  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
2966 
2967  /* Load the subscription into persistent memory context. */
2968  ApplyContext = AllocSetContextCreate(TopMemoryContext,
2969  "ApplyContext",
2972  oldctx = MemoryContextSwitchTo(ApplyContext);
2973 
2974  MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
2975  if (!MySubscription)
2976  {
2977  ereport(LOG,
2978  (errmsg("logical replication apply worker for subscription %u will not "
2979  "start because the subscription was removed during startup",
2981  proc_exit(0);
2982  }
2983 
2984  MySubscriptionValid = true;
2985  MemoryContextSwitchTo(oldctx);
2986 
2987  if (!MySubscription->enabled)
2988  {
2989  ereport(LOG,
2990  (errmsg("logical replication apply worker for subscription \"%s\" will not "
2991  "start because the subscription was disabled during startup",
2992  MySubscription->name)));
2993 
2994  proc_exit(0);
2995  }
2996 
2997  /* Setup synchronous commit according to the user's wishes */
2998  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3000 
3001  /* Keep us informed about subscription changes. */
3004  (Datum) 0);
3005 
3006  if (am_tablesync_worker())
3007  ereport(LOG,
3008  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3009  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
3010  else
3011  ereport(LOG,
3012  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3013  MySubscription->name)));
3014 
3016 
3017  /* Connect to the origin and start the replication. */
3018  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3019  MySubscription->conninfo);
3020 
3021  if (am_tablesync_worker())
3022  {
3023  char *syncslotname;
3024 
3025  /* This is table synchronization worker, call initial sync. */
3026  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
3027 
3028  /* The slot name needs to be allocated in permanent memory context. */
3029  oldctx = MemoryContextSwitchTo(ApplyContext);
3030  myslotname = pstrdup(syncslotname);
3031  MemoryContextSwitchTo(oldctx);
3032 
3033  pfree(syncslotname);
3034  }
3035  else
3036  {
3037  /* This is main apply worker */
3038  RepOriginId originid;
3039  TimeLineID startpointTLI;
3040  char *err;
3041 
3042  myslotname = MySubscription->slotname;
3043 
3044  /*
3045  * This shouldn't happen if the subscription is enabled, but guard
3046  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3047  * crash if slot is NULL.)
3048  */
3049  if (!myslotname)
3050  ereport(ERROR,
3051  (errmsg("subscription has no replication slot set")));
3052 
3053  /* Setup replication origin tracking. */
3055  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3056  originid = replorigin_by_name(originname, true);
3057  if (!OidIsValid(originid))
3058  originid = replorigin_create(originname);
3059  replorigin_session_setup(originid);
3060  replorigin_session_origin = originid;
3061  origin_startpos = replorigin_session_get_progress(false);
3063 
3064  wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
3065  &err);
3066  if (wrconn == NULL)
3067  ereport(ERROR,
3068  (errmsg("could not connect to the publisher: %s", err)));
3069 
3070  /*
3071  * We don't really use the output identify_system for anything but it
3072  * does some initializations on the upstream so let's still call it.
3073  */
3074  (void) walrcv_identify_system(wrconn, &startpointTLI);
3075 
3076  }
3077 
3078  /*
3079  * Setup callback for syscache so that we know when something changes in
3080  * the subscription relation state.
3081  */
3084  (Datum) 0);
3085 
3086  /* Build logical replication streaming options. */
3087  options.logical = true;
3088  options.startpoint = origin_startpos;
3089  options.slotname = myslotname;
3090  options.proto.logical.proto_version =
3091  walrcv_server_version(wrconn) >= 140000 ?
3093  options.proto.logical.publication_names = MySubscription->publications;
3094  options.proto.logical.binary = MySubscription->binary;
3095  options.proto.logical.streaming = MySubscription->stream;
3096 
3097  /* Start normal logical streaming replication. */
3098  walrcv_startstreaming(wrconn, &options);
3099 
3100  /* Run the main loop. */
3101  LogicalRepApplyLoop(origin_startpos);
3102 
3103  proc_exit(0);
3104 }
3105 
3106 /*
3107  * Is current process a logical replication worker?
3108  */
3109 bool
3111 {
3112  return MyLogicalRepWorker != NULL;
3113 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
Subscription * MySubscription
Definition: worker.c:163
static void apply_handle_type(StringInfo s)
Definition: worker.c:1117
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:263
#define NIL
Definition: pg_list.h:65
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:765
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:32
static void stream_close_file(void)
Definition: worker.c:2862
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:344
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:2883
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:1134
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1277
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:2296
static void subxact_info_add(TransactionId xid)
Definition: worker.c:2646
Relation ri_RelationDesc
Definition: execnodes.h:413
WalReceiverConn * wrconn
Definition: worker.c:161
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:2576
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:414
#define DEBUG1
Definition: elog.h:25
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
dlist_node * cur
Definition: ilist.h:180
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:2025
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
uint32 TimeLineID
Definition: xlogdefs.h:52
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:289
int fileno
Definition: worker.c:186
MemoryContext TopTransactionContext
Definition: mcxt.c:49
void AcceptInvalidationMessages(void)
Definition: inval.c:684
CommandId es_output_cid
Definition: execnodes.h:519
static XLogRecPtr remote_final_lsn
Definition: worker.c:167
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4723
#define HASH_CONTEXT
Definition: hsearch.h:91
#define HASH_ELEM
Definition: hsearch.h:85
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel)
Definition: worker.c:1486
static void apply_handle_insert(StringInfo s)
Definition: worker.c:1151
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
uint32 TransactionId
Definition: c.h:520
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:64
static dlist_head lsn_mapping
Definition: worker.c:132
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:823
MemoryContext hcxt
Definition: hsearch.h:77
#define DatumGetInt32(X)
Definition: postgres.h:472
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:650
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3032
#define RelationGetDescr(relation)
Definition: rel.h:482
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:529
static void apply_handle_update_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry)
Definition: worker.c:1361
dlist_node node
Definition: worker.c:127
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2731
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3132
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:853
int64 timestamp
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:59
int64 TimestampTz
Definition: timestamp.h:39
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1161
LogicalRepRelMapEntry * rel
Definition: worker.c:136
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:406
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:343
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
struct PartitionRoutingInfo * ri_PartitionInfo
Definition: execnodes.h:487
char * pstrdup(const char *in)
Definition: mcxt.c:1187
void CommitTransactionCommand(void)
Definition: xact.c:2947
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:2044
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:362
void BufFileTruncateShared(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:861
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:416
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:397
Expr * expression_planner(Expr *expr)
Definition: planner.c:6177
#define walrcv_server_version(conn)
Definition: walreceiver.h:408
int maplen
Definition: attmap.h:37
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:58
Size entrysize
Definition: hsearch.h:72
XLogRecPtr last_lsn
static bool handle_streamed_transaction(const char action, StringInfo s)
Definition: worker.c:309
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:776
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:412
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1186
XLogRecPtr remote_end
Definition: worker.c:129
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:54
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
CmdType operation
Definition: execnodes.h:1153
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:415
Datum * tts_values
Definition: tuptable.h:126
#define WL_SOCKET_READABLE
Definition: latch.h:125
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8426
void PopActiveSnapshot(void)
Definition: snapmgr.c:759
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:635
StringInfoData * colvalues
Definition: logicalproto.h:43
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2724
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
EState * state
Definition: execnodes.h:934
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1051
List * es_range_table
Definition: execnodes.h:507
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
TransactionId subxact_last
Definition: worker.c:195
struct SlotErrCallbackArg SlotErrCallbackArg
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:816
bool IsLogicalWorker(void)
Definition: worker.c:3110
void(* callback)(void *arg)
Definition: elog.h:229
int wal_receiver_status_interval
Definition: walreceiver.c:88
List * lappend_oid(List *list, Oid datum)
Definition: list.c:357
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1673
struct ErrorContextCallback * previous
Definition: elog.h:228
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
#define OidIsValid(objectId)
Definition: c.h:651
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:55
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
uint32 nsubxacts
Definition: worker.c:193
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:209
void BufFileClose(BufFile *file)
Definition: buffile.c:395
void ResetLatch(Latch *latch)
Definition: latch.c:588
int wal_receiver_timeout
Definition: walreceiver.c:89
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:1981
Definition: attmap.h:34
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2921
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:163
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
ErrorContextCallback * error_context_stack
Definition: elog.c:92
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:382
#define list_make1(x1)
Definition: pg_list.h:226
#define NAMEDATALEN
void FreeExecutorState(EState *estate)
Definition: execUtils.c:191
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
Definition: executor.h:507
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:173
static StringInfoData reply_message
Definition: walreceiver.c:124
Definition: dynahash.c:218
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:2483
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:53
TupleConversionMap * pi_RootToPartitionMap
Definition: execPartition.h:37
void pfree(void *pointer)
Definition: mcxt.c:1057
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
SubXactInfo * subxacts
Definition: worker.c:196
static void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:2744
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:43
PlanState ps
Definition: execnodes.h:1152
static bool ensure_transaction(void)
Definition: worker.c:279
struct ApplySubXactData ApplySubXactData
void fill_extraUpdatedCols(RangeTblEntry *target_rte, TupleDesc tupdesc)
Definition: analyze.c:2368
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
Definition: worker.c:123
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
static void * list_nth(const List *list, int n)
Definition: pg_list.h:286
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:824
bool in_remote_transaction
Definition: worker.c:166
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1685
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:453
Definition: guc.h:75
#define MAXPGPATH
bool in_streamed_transaction
Definition: worker.c:170
XLogRecPtr reply_lsn
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:588
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1666
static bool am_tablesync_worker(void)
static void apply_handle_delete(StringInfo s)
Definition: worker.c:1421
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define DEBUG2
Definition: elog.h:24
BufFile * BufFileCreateShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:262
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:155
char * c
void logicalrep_worker_attach(int slot)
Definition: launcher.c:629
#define NoLock
Definition: lockdefs.h:34
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7714
static char * buf
Definition: pg_test_fsync.c:68
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:680
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
bool * tts_isnull
Definition: tuptable.h:128
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:290
ResultRelInfo * es_result_relations
Definition: execnodes.h:522
#define RowExclusiveLock
Definition: lockdefs.h:38
int errcode_for_file_access(void)
Definition: elog.c:633
#define SIGHUP
Definition: win32_port.h:153
#define InvalidTransactionId
Definition: transam.h:31
SharedFileSet * stream_fileset
Definition: worker.c:151
#define RelationGetRelationName(relation)
Definition: rel.h:490
XLogRecPtr startpoint
Definition: walreceiver.h:168
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:374
int pgsocket
Definition: port.h:31
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void apply_handle_begin(StringInfo s)
Definition: worker.c:695
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:237
Oid get_atttype(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:911
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, ModifyTableState *mtstate, Relation rel)
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2817
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2751
AttrMap * attrMap
Definition: tupconvert.h:27
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
Definition: buffile.c:284
MemoryContext TopMemoryContext
Definition: mcxt.c:44
EState * CreateExecutorState(void)
Definition: execUtils.c:89
Definition: guc.h:72
int my_log2(long num)
Definition: dynahash.c:1730
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:1232
List * lappend(List *list, void *datum)
Definition: list.c:321
TransactionId xid
Definition: worker.c:185
MemoryContext ApplyContext
Definition: worker.c:156
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
XLogRecPtr final_lsn
Definition: logicalproto.h:85
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:714
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2475
static void apply_handle_tuple_routing(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation)
Definition: worker.c:1560
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
Node * build_column_default(Relation rel, int attrno)
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:949
List * es_tupleTable
Definition: execnodes.h:554
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
char * s2
static void apply_handle_update(StringInfo s)
Definition: worker.c:1266
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:326
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1021
#define PGINVALID_SOCKET
Definition: port.h:33
Size keysize
Definition: hsearch.h:71
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:1531
int es_num_result_relations
Definition: execnodes.h:523
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5759
Plan * plan
Definition: execnodes.h:932
static void apply_handle_relation(StringInfo s)
Definition: worker.c:1099
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:598
int16 attnum
Definition: pg_attribute.h:79
SharedFileSet * subxact_fileset
Definition: worker.c:152
#define ereport(elevel,...)
Definition: elog.h:144
Bitmapset * updatedCols
Definition: parsenodes.h:1124
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:33
XLogRecPtr local_end
Definition: worker.c:128
static MemoryContext LogicalStreamingContext
Definition: worker.c:159
struct SubXactInfo SubXactInfo
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
static HTAB * xidhash
Definition: worker.c:178
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:743
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4432
static void apply_dispatch(StringInfo s)
Definition: worker.c:1902
static void maybe_reread_subscription(void)
Definition: worker.c:2385
#define makeNode(_type_)
Definition: nodes.h:576
TimestampTz last_recv_time
static MemoryContext ApplyMessageContext
Definition: worker.c:155
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:674
void SharedFileSetDeleteAll(SharedFileSet *fileset)
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:177
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:189
RepOriginId replorigin_session_origin
Definition: origin.c:154
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:593
static void apply_handle_origin(StringInfo s)
Definition: worker.c:758
void StartTransactionCommand(void)
Definition: xact.c:2846
AttrNumber * attnums
Definition: attmap.h:36
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:473
static void slot_store_error_callback(void *arg)
Definition: worker.c:438
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:218
bool IsTransactionState(void)
Definition: xact.c:371
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:418
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:94
int WalWriterDelay
Definition: walwriter.c:70
bool MySubscriptionValid
Definition: worker.c:164
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1070
off_t offset
Definition: worker.c:187
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:512
void FreeSubscription(Subscription *sub)
char * logicalrep_typmap_gettypname(Oid remoteid)
Definition: relation.c:469
RTEKind rtekind
Definition: parsenodes.h:977
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:165
TransactionId xid
Definition: worker.c:150
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:2498
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4452
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:833
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:773
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:824
static void stream_open_file(Oid subid, TransactionId xid, bool first)
Definition: worker.c:2787
static ApplySubXactData subxact_data
Definition: worker.c:199
static BufFile * stream_fd
Definition: worker.c:181
uint32 nsubxacts_max
Definition: worker.c:194
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:2060
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:214
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int i
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:543
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define errcontext
Definition: elog.h:185
static void cleanup_subxact_info(void)
Definition: worker.c:2910
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1764
void * arg
struct Latch * MyLatch
Definition: globals.c:54
struct FlushPosition FlushPosition
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:473
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:123
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
union WalRcvStreamOptions::@103 proto
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:761
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:586
#define TransactionIdIsValid(xid)
Definition: transam.h:41
XLogRecPtr commit_lsn
Definition: logicalproto.h:92
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
Definition: pg_list.h:50
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1840
#define snprintf
Definition: port.h:193
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1648
#define WL_LATCH_SET
Definition: latch.h:124
#define RelationGetRelid(relation)
Definition: rel.h:456
static TransactionId stream_xid
Definition: worker.c:172
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
CmdType
Definition: nodes.h:671
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
#define die(msg)
Definition: pg_test_fsync.c:97
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4702
struct StreamXidHash StreamXidHash
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TimestampTz committime
Definition: logicalproto.h:94
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:435
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:2923
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:865
uint32 LogicalRepRelId
Definition: logicalproto.h:57
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522
#define lfirst_oid(lc)
Definition: pg_list.h:191
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:215
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:256
TupleTableSlot * pi_PartitionTupleSlot
Definition: execPartition.h:49
static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot)
Definition: worker.c:1215
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5788
void pgstat_report_stat(bool force)
Definition: pgstat.c:839
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:1791
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:398
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:524