PostgreSQL Source Code  git master
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2020, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from remote logical replication stream.
13  *
14  * The main worker (apply) is started by logical replication worker
15  * launcher for every enabled subscription in a database. It uses
16  * walsender protocol to communicate with publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *
22  * STREAMED TRANSACTIONS
23  * ---------------------
24  * Streamed transactions (large transactions exceeding a memory limit on the
25  * upstream) are not applied immediately, but instead, the data is written
26  * to temporary files and then applied at once when the final commit arrives.
27  *
28  * Unlike the regular (non-streamed) case, handling streamed transactions has
29  * to handle aborts of both the toplevel transaction and subtransactions. This
30  * is achieved by tracking offsets for subtransactions, which are then used
31  * to truncate the file with serialized changes.
32  *
33  * The files are placed in tmp file directory by default, and the filenames
34  * include both the XID of the toplevel transaction and OID of the
35  * subscription. This is necessary so that different workers processing a
36  * remote transaction with the same XID don't interfere.
37  *
38  * We use BufFiles instead of using normal temporary files because (a) the
39  * BufFile infrastructure supports temporary files that exceed the OS file size
40  * limit, (b) provides a way for automatic clean up on the error and (c) provides
41  * a way for these files to survive across local transactions and to be opened
42  * and closed at stream start and stop. We decided to use SharedFileSet
43  * infrastructure as without that it deletes the files on the closure of the
44  * file and if we decide to keep stream files open across the start/stop stream
45  * then it will consume a lot of memory (more than 8K for each BufFile and
46  * there could be multiple such BufFiles as the subscriber could receive
47  * multiple start/stop streams for different transactions before getting the
48  * commit). Moreover, if we don't use SharedFileSet then we also need to invent
49  * a new way to pass filenames to BufFile APIs so that we are allowed to open
50  * the file we desired across multiple stream-open calls for the same
51  * transaction.
52  *-------------------------------------------------------------------------
53  */
54 
55 #include "postgres.h"
56 
57 #include <sys/stat.h>
58 #include <unistd.h>
59 
60 #include "access/table.h"
61 #include "access/tableam.h"
62 #include "access/xact.h"
63 #include "access/xlog_internal.h"
64 #include "catalog/catalog.h"
65 #include "catalog/namespace.h"
66 #include "catalog/partition.h"
67 #include "catalog/pg_inherits.h"
70 #include "catalog/pg_tablespace.h"
71 #include "commands/tablecmds.h"
72 #include "commands/tablespace.h"
73 #include "commands/trigger.h"
74 #include "executor/executor.h"
75 #include "executor/execPartition.h"
77 #include "funcapi.h"
78 #include "libpq/pqformat.h"
79 #include "libpq/pqsignal.h"
80 #include "mb/pg_wchar.h"
81 #include "miscadmin.h"
82 #include "nodes/makefuncs.h"
83 #include "optimizer/optimizer.h"
84 #include "pgstat.h"
85 #include "postmaster/bgworker.h"
86 #include "postmaster/interrupt.h"
87 #include "postmaster/postmaster.h"
88 #include "postmaster/walwriter.h"
89 #include "replication/decode.h"
90 #include "replication/logical.h"
94 #include "replication/origin.h"
96 #include "replication/snapbuild.h"
99 #include "rewrite/rewriteHandler.h"
100 #include "storage/buffile.h"
101 #include "storage/bufmgr.h"
102 #include "storage/fd.h"
103 #include "storage/ipc.h"
104 #include "storage/lmgr.h"
105 #include "storage/proc.h"
106 #include "storage/procarray.h"
107 #include "tcop/tcopprot.h"
108 #include "utils/builtins.h"
109 #include "utils/catcache.h"
110 #include "utils/dynahash.h"
111 #include "utils/datum.h"
112 #include "utils/fmgroids.h"
113 #include "utils/guc.h"
114 #include "utils/inval.h"
115 #include "utils/lsyscache.h"
116 #include "utils/memutils.h"
117 #include "utils/rel.h"
118 #include "utils/syscache.h"
119 #include "utils/timeout.h"
120 
121 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
122 
123 typedef struct FlushPosition
124 {
128 } FlushPosition;
129 
131 
132 typedef struct SlotErrCallbackArg
133 {
138 
139 /*
140  * Stream xid hash entry. Whenever we see a new xid we create this entry in the
141  * xidhash and along with it create the streaming file and store the fileset handle.
142  * The subxact file is created iff there is any subxact info under this xid. This
143  * entry is used on the subsequent streams for the xid to get the corresponding
144  * fileset handles, so storing them in hash makes the search faster.
145  */
146 typedef struct StreamXidHash
147 {
148  TransactionId xid; /* toplevel xid; it is the hash key and must be first */
149  SharedFileSet *stream_fileset; /* shared file set for the streamed changes file */
150  SharedFileSet *subxact_fileset; /* shared file set for subxact info; only created if this xid has subxact info */
151 } StreamXidHash;
152 
155 
156 /* per stream context for streaming transactions */
158 
160 
162 bool MySubscriptionValid = false;
163 
166 
167 /* fields valid only when processing streamed transaction */
169 
171 
172 /*
173  * Hash table for storing the streaming xid information along with shared file
174  * set for streaming and subxact files.
175  */
176 static HTAB *xidhash = NULL;
177 
178 /* BufFile handle of the current streaming file */
179 static BufFile *stream_fd = NULL;
180 
181 typedef struct SubXactInfo
182 {
183  TransactionId xid; /* XID of the subxact */
184  int fileno; /* file number in the buffile; with offset, the point the changes file is truncated to when this subxact aborts */
185  off_t offset; /* offset in that file */
186 } SubXactInfo;
187 
188 /* Sub-transaction data for the current streaming transaction */
189 typedef struct ApplySubXactData
190 {
191  uint32 nsubxacts; /* number of sub-transactions */
192  uint32 nsubxacts_max; /* current capacity of subxacts */
193  TransactionId subxact_last; /* xid of the last sub-transaction */
194  SubXactInfo *subxacts; /* sub-xact offset in changes file */
196 
198 
199 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
200 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
201 
202 /*
203  * Information about subtransactions of a given toplevel transaction.
204  */
205 static void subxact_info_write(Oid subid, TransactionId xid);
206 static void subxact_info_read(Oid subid, TransactionId xid);
207 static void subxact_info_add(TransactionId xid);
208 static inline void cleanup_subxact_info(void);
209 
210 /*
211  * Serialize and deserialize changes for a toplevel transaction.
212  */
213 static void stream_cleanup_files(Oid subid, TransactionId xid);
214 static void stream_open_file(Oid subid, TransactionId xid, bool first);
215 static void stream_write_change(char action, StringInfo s);
216 static void stream_close_file(void);
217 
218 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
219 
220 static void store_flush_position(XLogRecPtr remote_lsn);
221 
222 static void maybe_reread_subscription(void);
223 
224 /* prototype needed because of stream_commit */
225 static void apply_dispatch(StringInfo s);
226 
228  LogicalRepCommitData* commit_data);
229 static void apply_handle_insert_internal(ResultRelInfo *relinfo,
230  EState *estate, TupleTableSlot *remoteslot);
231 static void apply_handle_update_internal(ResultRelInfo *relinfo,
232  EState *estate, TupleTableSlot *remoteslot,
233  LogicalRepTupleData *newtup,
234  LogicalRepRelMapEntry *relmapentry);
235 static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
236  TupleTableSlot *remoteslot,
237  LogicalRepRelation *remoterel);
238 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
239  LogicalRepRelation *remoterel,
240  TupleTableSlot *remoteslot,
241  TupleTableSlot **localslot);
242 static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
243  EState *estate,
244  TupleTableSlot *remoteslot,
245  LogicalRepTupleData *newtup,
246  LogicalRepRelMapEntry *relmapentry,
247  CmdType operation);
248 
249 /*
250  * Should this worker apply changes for given relation.
251  *
252  * This is mainly needed for initial relation data sync as that runs in
253  * separate worker process running in parallel and we need some way to skip
254  * changes coming to the main apply worker during the sync of a table.
255  *
256  * Note we need to do a less-than-or-equal comparison for SYNCDONE state because
257  * it might hold position of end of initial slot consistent point WAL
258  * record + 1 (ie start of next record) and next record can be COMMIT of
259  * transaction we are now processing (which is what we set remote_final_lsn
260  * to in apply_handle_begin).
261  */
262 static bool
264 {
265  if (am_tablesync_worker())
266  return MyLogicalRepWorker->relid == rel->localreloid;
267  else
268  return (rel->state == SUBREL_STATE_READY ||
269  (rel->state == SUBREL_STATE_SYNCDONE &&
270  rel->statelsn <= remote_final_lsn));
271 }
272 
273 /*
274  * Make sure that we have started a local transaction.
275  *
276  * Also switches to ApplyMessageContext as necessary.
277  */
278 static bool
280 {
281  if (IsTransactionState())
282  {
284 
285  if (CurrentMemoryContext != ApplyMessageContext)
286  MemoryContextSwitchTo(ApplyMessageContext);
287 
288  return false;
289  }
290 
293 
295 
296  MemoryContextSwitchTo(ApplyMessageContext);
297  return true;
298 }
299 
300 /*
301  * Handle streamed transactions.
302  *
303  * If in streaming mode (receiving a block of streamed transaction), we
304  * simply redirect it to a file for the proper toplevel transaction.
305  *
306  * Returns true for streamed transactions, false otherwise (regular mode).
307  */
308 static bool
310 {
311  TransactionId xid;
312 
313  /* not in streaming mode */
315  return false;
316 
317  Assert(stream_fd != NULL);
319 
320  /*
321  * We should have received XID of the subxact as the first part of the
322  * message, so extract it.
323  */
324  xid = pq_getmsgint(s, 4);
325 
327 
328  /* Add the new subxact to the array (unless already there). */
329  subxact_info_add(xid);
330 
331  /* write the change to the current file */
332  stream_write_change(action, s);
333 
334  return true;
335 }
336 
337 /*
338  * Executor state preparation for evaluation of constraint expressions,
339  * indexes and triggers.
340  *
341  * This is based on similar code in copy.c
342  */
343 static EState *
345 {
346  EState *estate;
347  RangeTblEntry *rte;
348 
349  estate = CreateExecutorState();
350 
351  rte = makeNode(RangeTblEntry);
352  rte->rtekind = RTE_RELATION;
353  rte->relid = RelationGetRelid(rel->localrel);
354  rte->relkind = rel->localrel->rd_rel->relkind;
356  ExecInitRangeTable(estate, list_make1(rte));
357 
358  estate->es_output_cid = GetCurrentCommandId(true);
359 
360  /* Prepare to catch AFTER triggers. */
362 
363  return estate;
364 }
365 
366 /*
367  * Evaluates default values for local columns that can't be mapped to remote
368  * relation columns.
369  *
370  * This allows us to support tables which have more columns on the downstream
371  * than on the upstream.
372  */
373 static void
375  TupleTableSlot *slot)
376 {
377  TupleDesc desc = RelationGetDescr(rel->localrel);
378  int num_phys_attrs = desc->natts;
379  int i;
380  int attnum,
381  num_defaults = 0;
382  int *defmap;
383  ExprState **defexprs;
384  ExprContext *econtext;
385 
386  econtext = GetPerTupleExprContext(estate);
387 
388  /* We got all the data via replication, no need to evaluate anything. */
389  if (num_phys_attrs == rel->remoterel.natts)
390  return;
391 
392  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
393  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
394 
395  Assert(rel->attrmap->maplen == num_phys_attrs);
396  for (attnum = 0; attnum < num_phys_attrs; attnum++)
397  {
398  Expr *defexpr;
399 
400  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
401  continue;
402 
403  if (rel->attrmap->attnums[attnum] >= 0)
404  continue;
405 
406  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
407 
408  if (defexpr != NULL)
409  {
410  /* Run the expression through planner */
411  defexpr = expression_planner(defexpr);
412 
413  /* Initialize executable expression in copycontext */
414  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
415  defmap[num_defaults] = attnum;
416  num_defaults++;
417  }
418 
419  }
420 
421  for (i = 0; i < num_defaults; i++)
422  slot->tts_values[defmap[i]] =
423  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
424 }
425 
426 /*
427  * Error callback to give more context info about type conversion failure.
428  */
429 static void
431 {
432  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
434  char *remotetypname;
435  Oid remotetypoid,
436  localtypoid;
437 
438  /* Nothing to do if remote attribute number is not set */
439  if (errarg->remote_attnum < 0)
440  return;
441 
442  rel = errarg->rel;
443  remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
444 
445  /* Fetch remote type name from the LogicalRepTypMap cache */
446  remotetypname = logicalrep_typmap_gettypname(remotetypoid);
447 
448  /* Fetch local type OID from the local sys cache */
449  localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
450 
451  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
452  "remote type %s, local type %s",
453  rel->remoterel.nspname, rel->remoterel.relname,
454  rel->remoterel.attnames[errarg->remote_attnum],
455  remotetypname,
456  format_type_be(localtypoid));
457 }
458 
459 /*
460  * Store tuple data into slot.
461  *
462  * Incoming data can be either text or binary format.
463  */
464 static void
466  LogicalRepTupleData *tupleData)
467 {
468  int natts = slot->tts_tupleDescriptor->natts;
469  int i;
470  SlotErrCallbackArg errarg;
471  ErrorContextCallback errcallback;
472 
473  ExecClearTuple(slot);
474 
475  /* Push callback + info on the error context stack */
476  errarg.rel = rel;
477  errarg.local_attnum = -1;
478  errarg.remote_attnum = -1;
479  errcallback.callback = slot_store_error_callback;
480  errcallback.arg = (void *) &errarg;
481  errcallback.previous = error_context_stack;
482  error_context_stack = &errcallback;
483 
484  /* Call the "in" function for each non-dropped, non-null attribute */
485  Assert(natts == rel->attrmap->maplen);
486  for (i = 0; i < natts; i++)
487  {
489  int remoteattnum = rel->attrmap->attnums[i];
490 
491  if (!att->attisdropped && remoteattnum >= 0)
492  {
493  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
494 
495  Assert(remoteattnum < tupleData->ncols);
496 
497  errarg.local_attnum = i;
498  errarg.remote_attnum = remoteattnum;
499 
500  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
501  {
502  Oid typinput;
503  Oid typioparam;
504 
505  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
506  slot->tts_values[i] =
507  OidInputFunctionCall(typinput, colvalue->data,
508  typioparam, att->atttypmod);
509  slot->tts_isnull[i] = false;
510  }
511  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
512  {
513  Oid typreceive;
514  Oid typioparam;
515 
516  /*
517  * In some code paths we may be asked to re-parse the same
518  * tuple data. Reset the StringInfo's cursor so that works.
519  */
520  colvalue->cursor = 0;
521 
522  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
523  slot->tts_values[i] =
524  OidReceiveFunctionCall(typreceive, colvalue,
525  typioparam, att->atttypmod);
526 
527  /* Trouble if it didn't eat the whole buffer */
528  if (colvalue->cursor != colvalue->len)
529  ereport(ERROR,
530  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
531  errmsg("incorrect binary data format in logical replication column %d",
532  remoteattnum + 1)));
533  slot->tts_isnull[i] = false;
534  }
535  else
536  {
537  /*
538  * NULL value from remote. (We don't expect to see
539  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
540  * NULL.)
541  */
542  slot->tts_values[i] = (Datum) 0;
543  slot->tts_isnull[i] = true;
544  }
545 
546  errarg.local_attnum = -1;
547  errarg.remote_attnum = -1;
548  }
549  else
550  {
551  /*
552  * We assign NULL to dropped attributes and missing values
553  * (missing values should be later filled using
554  * slot_fill_defaults).
555  */
556  slot->tts_values[i] = (Datum) 0;
557  slot->tts_isnull[i] = true;
558  }
559  }
560 
561  /* Pop the error context stack */
562  error_context_stack = errcallback.previous;
563 
564  ExecStoreVirtualTuple(slot);
565 }
566 
567 /*
568  * Replace updated columns with data from the LogicalRepTupleData struct.
569  * This is somewhat similar to heap_modify_tuple but also calls the type
570  * input functions on the user data.
571  *
572  * "slot" is filled with a copy of the tuple in "srcslot", replacing
573  * columns provided in "tupleData" and leaving others as-is.
574  *
575  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
576  * storage for "srcslot". This is OK for current usage, but someday we may
577  * need to materialize "slot" at the end to make it independent of "srcslot".
578  */
579 static void
582  LogicalRepTupleData *tupleData)
583 {
584  int natts = slot->tts_tupleDescriptor->natts;
585  int i;
586  SlotErrCallbackArg errarg;
587  ErrorContextCallback errcallback;
588 
589  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
590  ExecClearTuple(slot);
591 
592  /*
593  * Copy all the column data from srcslot, so that we'll have valid values
594  * for unreplaced columns.
595  */
596  Assert(natts == srcslot->tts_tupleDescriptor->natts);
597  slot_getallattrs(srcslot);
598  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
599  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
600 
601  /* For error reporting, push callback + info on the error context stack */
602  errarg.rel = rel;
603  errarg.local_attnum = -1;
604  errarg.remote_attnum = -1;
605  errcallback.callback = slot_store_error_callback;
606  errcallback.arg = (void *) &errarg;
607  errcallback.previous = error_context_stack;
608  error_context_stack = &errcallback;
609 
610  /* Call the "in" function for each replaced attribute */
611  Assert(natts == rel->attrmap->maplen);
612  for (i = 0; i < natts; i++)
613  {
615  int remoteattnum = rel->attrmap->attnums[i];
616 
617  if (remoteattnum < 0)
618  continue;
619 
620  Assert(remoteattnum < tupleData->ncols);
621 
622  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
623  {
624  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
625 
626  errarg.local_attnum = i;
627  errarg.remote_attnum = remoteattnum;
628 
629  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
630  {
631  Oid typinput;
632  Oid typioparam;
633 
634  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
635  slot->tts_values[i] =
636  OidInputFunctionCall(typinput, colvalue->data,
637  typioparam, att->atttypmod);
638  slot->tts_isnull[i] = false;
639  }
640  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
641  {
642  Oid typreceive;
643  Oid typioparam;
644 
645  /*
646  * In some code paths we may be asked to re-parse the same
647  * tuple data. Reset the StringInfo's cursor so that works.
648  */
649  colvalue->cursor = 0;
650 
651  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
652  slot->tts_values[i] =
653  OidReceiveFunctionCall(typreceive, colvalue,
654  typioparam, att->atttypmod);
655 
656  /* Trouble if it didn't eat the whole buffer */
657  if (colvalue->cursor != colvalue->len)
658  ereport(ERROR,
659  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
660  errmsg("incorrect binary data format in logical replication column %d",
661  remoteattnum + 1)));
662  slot->tts_isnull[i] = false;
663  }
664  else
665  {
666  /* must be LOGICALREP_COLUMN_NULL */
667  slot->tts_values[i] = (Datum) 0;
668  slot->tts_isnull[i] = true;
669  }
670 
671  errarg.local_attnum = -1;
672  errarg.remote_attnum = -1;
673  }
674  }
675 
676  /* Pop the error context stack */
677  error_context_stack = errcallback.previous;
678 
679  /* And finally, declare that "slot" contains a valid virtual tuple */
680  ExecStoreVirtualTuple(slot);
681 }
682 
683 /*
684  * Handle BEGIN message.
685  */
686 static void
688 {
689  LogicalRepBeginData begin_data;
690 
691  logicalrep_read_begin(s, &begin_data);
692 
693  remote_final_lsn = begin_data.final_lsn;
694 
695  in_remote_transaction = true;
696 
698 }
699 
700 /*
701  * Handle COMMIT message.
702  *
703  * TODO, support tracking of multiple origins
704  */
705 static void
707 {
708  LogicalRepCommitData commit_data;
709 
710  logicalrep_read_commit(s, &commit_data);
711 
712  Assert(commit_data.commit_lsn == remote_final_lsn);
713 
714  apply_handle_commit_internal(s, &commit_data);
715 
716  /* Process any tables that are being synchronized in parallel. */
717  process_syncing_tables(commit_data.end_lsn);
718 
720 }
721 
722 /*
723  * Handle ORIGIN message.
724  *
725  * TODO, support tracking of multiple origins
726  */
727 static void
729 {
730  /*
731  * ORIGIN message can only come inside streaming transaction or inside
732  * remote transaction and before any actual writes.
733  */
737  ereport(ERROR,
738  (errcode(ERRCODE_PROTOCOL_VIOLATION),
739  errmsg("ORIGIN message sent out of order")));
740 }
741 
742 /*
743  * Handle STREAM START message.
744  */
745 static void
747 {
748  bool first_segment;
749  HASHCTL hash_ctl;
750 
752 
753  /*
754  * Start a transaction on stream start, this transaction will be committed
755  * on the stream stop unless it is a tablesync worker in which case it will
756  * be committed after processing all the messages. We need the transaction
757  * for handling the buffile, used for serializing the streaming data and
758  * subxact info.
759  */
761 
762  /* notify handle methods we're processing a remote transaction */
764 
765  /* extract XID of the top-level transaction */
766  stream_xid = logicalrep_read_stream_start(s, &first_segment);
767 
768  /*
769  * Initialize the xidhash table if we haven't yet. This will be used for
770  * the entire duration of the apply worker so create it in permanent
771  * context.
772  */
773  if (xidhash == NULL)
774  {
775  hash_ctl.keysize = sizeof(TransactionId);
776  hash_ctl.entrysize = sizeof(StreamXidHash);
777  hash_ctl.hcxt = ApplyContext;
778  xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
780  }
781 
782  /* open the spool file for this transaction */
784 
785  /* if this is not the first segment, open existing subxact file */
786  if (!first_segment)
788 
790 }
791 
792 /*
793  * Handle STREAM STOP message.
794  */
795 static void
797 {
799 
800  /*
801  * Close the file with serialized changes, and serialize information about
802  * subxacts for the toplevel transaction.
803  */
806 
807  /* We must be in a valid transaction state */
809 
810  /* The synchronization worker runs in single transaction. */
811  if (!am_tablesync_worker())
812  {
813  /* Commit the per-stream transaction */
815  }
816 
817  in_streamed_transaction = false;
818 
819  /* Reset per-stream context */
820  MemoryContextReset(LogicalStreamingContext);
821 
823 }
824 
825 /*
826  * Handle STREAM abort message.
827  */
828 static void
830 {
831  TransactionId xid;
832  TransactionId subxid;
833 
835 
836  logicalrep_read_stream_abort(s, &xid, &subxid);
837 
838  /*
839  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
840  * just delete the files with serialized info.
841  */
842  if (xid == subxid)
844  else
845  {
846  /*
847  * OK, so it's a subxact. We need to read the subxact file for the
848  * toplevel transaction, determine the offset tracked for the subxact,
849  * and truncate the file with changes. We also remove the subxacts
850  * with higher offsets (or rather higher XIDs).
851  *
852  * We intentionally scan the array from the tail, because we're likely
853  * aborting a change for the most recent subtransactions.
854  *
855  * We can't use the binary search here as subxact XIDs won't
856  * necessarily arrive in sorted order, consider the case where we have
857  * released the savepoint for multiple subtransactions and then
858  * performed rollback to savepoint for one of the earlier
859  * sub-transaction.
860  */
861 
862  int64 i;
863  int64 subidx;
864  BufFile *fd;
865  bool found = false;
866  char path[MAXPGPATH];
867  StreamXidHash *ent;
868 
869  subidx = -1;
872 
873  for (i = subxact_data.nsubxacts; i > 0; i--)
874  {
875  if (subxact_data.subxacts[i - 1].xid == subxid)
876  {
877  subidx = (i - 1);
878  found = true;
879  break;
880  }
881  }
882 
883  /*
884  * If it's an empty sub-transaction then we will not find the subxid
885  * here so just cleanup the subxact info and return.
886  */
887  if (!found)
888  {
889  /* Cleanup the subxact info */
891 
892  /* The synchronization worker runs in single transaction */
893  if (!am_tablesync_worker())
895  return;
896  }
897 
898  Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
899 
900  ent = (StreamXidHash *) hash_search(xidhash,
901  (void *) &xid,
902  HASH_FIND,
903  &found);
904  Assert(found);
905 
906  /* open the changes file */
908  fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
909 
910  /* OK, truncate the file at the right offset */
911  BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
912  subxact_data.subxacts[subidx].offset);
913  BufFileClose(fd);
914 
915  /* discard the subxacts added later */
916  subxact_data.nsubxacts = subidx;
917 
918  /* write the updated subxact list */
920 
921  if (!am_tablesync_worker())
923  }
924 }
925 
926 /*
927  * Handle STREAM COMMIT message.
928  */
929 static void
931 {
932  TransactionId xid;
934  int nchanges;
935  char path[MAXPGPATH];
936  char *buffer = NULL;
937  bool found;
938  LogicalRepCommitData commit_data;
939  StreamXidHash *ent;
940  MemoryContext oldcxt;
941  BufFile *fd;
942 
944 
945  xid = logicalrep_read_stream_commit(s, &commit_data);
946 
947  elog(DEBUG1, "received commit for streamed transaction %u", xid);
948 
950 
951  /*
952  * Allocate file handle and memory required to process all the messages in
953  * TopTransactionContext to avoid them getting reset after each message is
954  * processed.
955  */
957 
958  /* open the spool file for the committed transaction */
960  elog(DEBUG1, "replaying changes from file \"%s\"", path);
961  ent = (StreamXidHash *) hash_search(xidhash,
962  (void *) &xid,
963  HASH_FIND,
964  &found);
965  Assert(found);
966  fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
967 
968  buffer = palloc(BLCKSZ);
969  initStringInfo(&s2);
970 
971  MemoryContextSwitchTo(oldcxt);
972 
973  remote_final_lsn = commit_data.commit_lsn;
974 
975  /*
976  * Make sure the handle apply_dispatch methods are aware we're in a remote
977  * transaction.
978  */
979  in_remote_transaction = true;
981 
982  /*
983  * Read the entries one by one and pass them through the same logic as in
984  * apply_dispatch.
985  */
986  nchanges = 0;
987  while (true)
988  {
989  int nbytes;
990  int len;
991 
993 
994  /* read length of the on-disk record */
995  nbytes = BufFileRead(fd, &len, sizeof(len));
996 
997  /* have we reached end of the file? */
998  if (nbytes == 0)
999  break;
1000 
1001  /* do we have a correct length? */
1002  if (nbytes != sizeof(len))
1003  ereport(ERROR,
1005  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1006  path)));
1007 
1008  Assert(len > 0);
1009 
1010  /* make sure we have sufficiently large buffer */
1011  buffer = repalloc(buffer, len);
1012 
1013  /* and finally read the data into the buffer */
1014  if (BufFileRead(fd, buffer, len) != len)
1015  ereport(ERROR,
1017  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1018  path)));
1019 
1020  /* copy the buffer to the stringinfo and call apply_dispatch */
1021  resetStringInfo(&s2);
1022  appendBinaryStringInfo(&s2, buffer, len);
1023 
1024  /* Ensure we are reading the data into our memory context. */
1025  oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
1026 
1027  apply_dispatch(&s2);
1028 
1029  MemoryContextReset(ApplyMessageContext);
1030 
1031  MemoryContextSwitchTo(oldcxt);
1032 
1033  nchanges++;
1034 
1035  if (nchanges % 1000 == 0)
1036  elog(DEBUG1, "replayed %d changes from file '%s'",
1037  nchanges, path);
1038  }
1039 
1040  BufFileClose(fd);
1041 
1042  pfree(buffer);
1043  pfree(s2.data);
1044 
1045  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
1046  nchanges, path);
1047 
1048  apply_handle_commit_internal(s, &commit_data);
1049 
1050  /* unlink the files with serialized changes and subxact info */
1052 
1053  /* Process any tables that are being synchronized in parallel. */
1054  process_syncing_tables(commit_data.end_lsn);
1055 
1057 }
1058 
1059 /*
1060  * Helper function for apply_handle_commit and apply_handle_stream_commit.
1061  */
1062 static void
1064 {
1065  /* The synchronization worker runs in single transaction. */
1067  {
1068  /*
1069  * Update origin state so we can restart streaming from correct
1070  * position in case of crash.
1071  */
1072  replorigin_session_origin_lsn = commit_data->end_lsn;
1074 
1076  pgstat_report_stat(false);
1077 
1078  store_flush_position(commit_data->end_lsn);
1079  }
1080  else
1081  {
1082  /* Process any invalidation messages that might have accumulated. */
1085  }
1086 
1087  in_remote_transaction = false;
1088 }
1089 
1090 /*
1091  * Handle RELATION message.
1092  *
1093  * Note we don't do validation against local schema here. The validation
1094  * against local schema is postponed until first change for given relation
1095  * comes as we only care about it when applying changes for it anyway and we
1096  * do less locking this way.
1097  */
1098 static void
1100 {
1101  LogicalRepRelation *rel;
1102 
1104  return;
1105 
1106  rel = logicalrep_read_rel(s);
1108 }
1109 
1110 /*
1111  * Handle TYPE message.
1112  *
1113  * Note we don't do local mapping here, that's done when the type is
1114  * actually used.
1115  */
1116 static void
1118 {
1119  LogicalRepTyp typ;
1120 
1122  return;
1123 
1124  logicalrep_read_typ(s, &typ);
1126 }
1127 
1128 /*
1129  * Get the replica identity index or, if it is not defined, the primary key index.
1130  *
1131  * If neither is defined, returns InvalidOid
1132  */
1133 static Oid
1135 {
1136  Oid idxoid;
1137 
1138  idxoid = RelationGetReplicaIndex(rel);
1139 
1140  if (!OidIsValid(idxoid))
1141  idxoid = RelationGetPrimaryKeyIndex(rel);
1142 
1143  return idxoid;
1144 }
1145 
1146 /*
1147  * Handle INSERT message.
1148  */
1149 
1150 static void
1152 {
      /*
       * NOTE(review): this listing omits original lines 1151, 1161, 1164,
       * 1174, 1187, 1190, 1203, 1211 and 1213. Among the missing pieces are
       * the function name and parameter list, the condition guarding the
       * first early return, and the statement that assigns 'oldctx' (it is
       * only restored, never set, in the visible code). Restore them from
       * the upstream file before modifying this function.
       */
1153  ResultRelInfo *resultRelInfo;
1154  LogicalRepRelMapEntry *rel;
1155  LogicalRepTupleData newtup;
1156  LogicalRepRelId relid;
1157  EState *estate;
1158  TupleTableSlot *remoteslot;
1159  MemoryContext oldctx;
1160 
1162  return;
1163 
1165 
      /* Decode the message and open the mapped relation for writing. */
1166  relid = logicalrep_read_insert(s, &newtup);
1167  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1168  if (!should_apply_changes_for_rel(rel))
1169  {
1170  /*
1171  * The relation can't become interesting in the middle of the
1172  * transaction so it's safe to unlock it.
1173  */
1175  return;
1176  }
1177 
1178  /* Initialize the executor state. */
1179  estate = create_estate_for_relation(rel);
1180  remoteslot = ExecInitExtraTupleSlot(estate,
1181  RelationGetDescr(rel->localrel),
1182  &TTSOpsVirtual);
1183  resultRelInfo = makeNode(ResultRelInfo);
1184  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
1185 
1186  /* Input functions may need an active snapshot, so get one */
1188 
1189  /* Process and store remote tuple in the slot */
1191  slot_store_data(remoteslot, rel, &newtup);
1192  slot_fill_defaults(rel, estate, remoteslot);
1193  MemoryContextSwitchTo(oldctx);
1194 
1195  /* For a partitioned table, insert the tuple into a partition. */
1196  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1197  apply_handle_tuple_routing(resultRelInfo, estate,
1198  remoteslot, NULL, rel, CMD_INSERT);
1199  else
1200  apply_handle_insert_internal(resultRelInfo, estate,
1201  remoteslot);
1202 
1204 
1205  /* Handle queued AFTER triggers. */
1206  AfterTriggerEndQuery(estate);
1207 
      /* Release the tuple slots and then the executor state itself. */
1208  ExecResetTupleTable(estate->es_tupleTable, false);
1209  FreeExecutorState(estate);
1210 
1212 
1214 }
1215 
1216 /* Workhorse for apply_handle_insert() */
1217 static void
1219  EState *estate, TupleTableSlot *remoteslot)
1220 {
      /*
       * NOTE(review): original line 1218 (function name and first parameter,
       * evidently 'relinfo') is absent from this listing.
       *
       * Indexes are opened only for the duration of the insert and closed
       * again before returning.
       */
1221  ExecOpenIndices(relinfo, false);
1222 
1223  /* Do the insert. */
1224  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1225 
1226  /* Cleanup. */
1227  ExecCloseIndices(relinfo);
1228 }
1229 
1230 /*
1231  * Check if the logical replication relation is updatable and throw
1232  * appropriate error if it isn't.
1233  */
1234 static void
1236 {
      /*
       * NOTE(review): this listing omits original lines 1235 and 1245: the
       * function name and parameter list, and the condition that selects the
       * first of the two errors below. As shown, the braces after the comment
       * have no controlling statement; restore the missing line from the
       * upstream file before modifying this function.
       */
1237  /* Updatable, no error. */
1238  if (rel->updatable)
1239  return;
1240 
1241  /*
1242  * We are in error mode so it's fine this is somewhat slow. It's better to
1243  * give user correct error.
1244  */
1246  {
1247  ereport(ERROR,
1248  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1249  errmsg("publisher did not send replica identity column "
1250  "expected by the logical replication target relation \"%s.%s\"",
1251  rel->remoterel.nspname, rel->remoterel.relname)));
1252  }
1253 
      /* Otherwise report that no usable row identity exists on either side. */
1254  ereport(ERROR,
1255  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1256  errmsg("logical replication target relation \"%s.%s\" has "
1257  "neither REPLICA IDENTITY index nor PRIMARY "
1258  "KEY and published relation does not have "
1259  "REPLICA IDENTITY FULL",
1260  rel->remoterel.nspname, rel->remoterel.relname)));
1261 }
1262 
1263 /*
1264  * Handle UPDATE message.
1265  *
1266  * TODO: FDW support
1267  */
1268 static void
1270 {
      /*
       * NOTE(review): this listing omits original lines 1269, 1282, 1285,
       * 1296, 1301, 1321, 1330, 1337, 1340, 1353, 1361 and 1363. Among the
       * missing pieces are the function name and parameter list, the
       * condition guarding the first early return, the declaration of 'att'
       * used in the column loop, the last argument of bms_add_member(), and
       * the statement that assigns 'oldctx'. Restore them from the upstream
       * file before modifying this function.
       */
1271  ResultRelInfo *resultRelInfo;
1272  LogicalRepRelMapEntry *rel;
1273  LogicalRepRelId relid;
1274  EState *estate;
1275  LogicalRepTupleData oldtup;
1276  LogicalRepTupleData newtup;
1277  bool has_oldtup;
1278  TupleTableSlot *remoteslot;
1279  RangeTblEntry *target_rte;
1280  MemoryContext oldctx;
1281 
1283  return;
1284 
1286 
      /* Decode the message and open the mapped relation for writing. */
1287  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1288  &newtup);
1289  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1290  if (!should_apply_changes_for_rel(rel))
1291  {
1292  /*
1293  * The relation can't become interesting in the middle of the
1294  * transaction so it's safe to unlock it.
1295  */
1297  return;
1298  }
1299 
1300  /* Check if we can do the update. */
1302 
1303  /* Initialize the executor state. */
1304  estate = create_estate_for_relation(rel);
1305  remoteslot = ExecInitExtraTupleSlot(estate,
1306  RelationGetDescr(rel->localrel),
1307  &TTSOpsVirtual);
1308  resultRelInfo = makeNode(ResultRelInfo);
1309  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
1310 
1311  /*
1312  * Populate updatedCols so that per-column triggers can fire. This could
1313  * include more columns than were actually changed on the publisher
1314  * because the logical replication protocol doesn't contain that
1315  * information. But it would for example exclude columns that only exist
1316  * on the subscriber, since we are not touching those.
1317  */
1318  target_rte = list_nth(estate->es_range_table, 0);
1319  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1320  {
1322  int remoteattnum = rel->attrmap->attnums[i];
1323 
      /* Skip dropped local columns and those with no remote counterpart. */
1324  if (!att->attisdropped && remoteattnum >= 0)
1325  {
1326  Assert(remoteattnum < newtup.ncols);
1327  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1328  target_rte->updatedCols =
1329  bms_add_member(target_rte->updatedCols,
1331  }
1332  }
1333 
1334  /* Also populate extraUpdatedCols, in case we have generated columns */
1335  fill_extraUpdatedCols(target_rte, rel->localrel);
1336 
1338 
1339  /* Build the search tuple. */
      /* The old tuple is used as the search key when the message carries one. */
1341  slot_store_data(remoteslot, rel,
1342  has_oldtup ? &oldtup : &newtup);
1343  MemoryContextSwitchTo(oldctx);
1344 
1345  /* For a partitioned table, apply update to correct partition. */
1346  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1347  apply_handle_tuple_routing(resultRelInfo, estate,
1348  remoteslot, &newtup, rel, CMD_UPDATE);
1349  else
1350  apply_handle_update_internal(resultRelInfo, estate,
1351  remoteslot, &newtup, rel);
1352 
1354 
1355  /* Handle queued AFTER triggers. */
1356  AfterTriggerEndQuery(estate);
1357 
      /* Release the tuple slots and then the executor state itself. */
1358  ExecResetTupleTable(estate->es_tupleTable, false);
1359  FreeExecutorState(estate);
1360 
1362 
1364 }
1365 
1366 /* Workhorse for apply_handle_update() */
1367 static void
1369  EState *estate, TupleTableSlot *remoteslot,
1370  LogicalRepTupleData *newtup,
1371  LogicalRepRelMapEntry *relmapentry)
1372 {
      /*
       * NOTE(review): this listing omits original lines 1368 and 1395: the
       * function name with its first parameter (evidently 'relinfo'), and the
       * statement that assigns 'oldctx' before slot_modify_data(). Restore
       * them from the upstream file before modifying this function.
       */
1373  Relation localrel = relinfo->ri_RelationDesc;
1374  EPQState epqstate;
1375  TupleTableSlot *localslot;
1376  bool found;
1377  MemoryContext oldctx;
1378 
1379  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1380  ExecOpenIndices(relinfo, false);
1381 
      /*
       * 'remoteslot' holds the search tuple on entry; once the lookup is done
       * it is cleared so it can be reused for the replacement tuple below.
       */
1382  found = FindReplTupleInLocalRel(estate, localrel,
1383  &relmapentry->remoterel,
1384  remoteslot, &localslot);
1385  ExecClearTuple(remoteslot);
1386 
1387  /*
1388  * Tuple found.
1389  *
1390  * Note this will fail if there are other conflicting unique indexes.
1391  */
1392  if (found)
1393  {
1394  /* Process and store remote tuple in the slot */
1396  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1397  MemoryContextSwitchTo(oldctx);
1398 
1399  EvalPlanQualSetSlot(&epqstate, remoteslot);
1400 
1401  /* Do the actual update. */
1402  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
1403  remoteslot);
1404  }
1405  else
1406  {
1407  /*
1408  * The tuple to be updated could not be found.
1409  *
1410  * TODO what to do here, change the log level to LOG perhaps?
1411  */
1412  elog(DEBUG1,
1413  "logical replication did not find row for update "
1414  "in replication target relation \"%s\"",
1415  RelationGetRelationName(localrel));
1416  }
1417 
1418  /* Cleanup. */
1419  ExecCloseIndices(relinfo);
1420  EvalPlanQualEnd(&epqstate);
1421 }
1422 
1423 /*
1424  * Handle DELETE message.
1425  *
1426  * TODO: FDW support
1427  */
1428 static void
1430 {
      /*
       * NOTE(review): this listing omits original lines 1429, 1439, 1442,
       * 1452, 1457, 1467, 1470, 1482, 1490 and 1492. Among the missing pieces
       * are the function name and parameter list, the condition guarding the
       * first early return, the "can we delete" check announced by the
       * comment below, and the statement that assigns 'oldctx'. Restore them
       * from the upstream file before modifying this function.
       */
1431  ResultRelInfo *resultRelInfo;
1432  LogicalRepRelMapEntry *rel;
1433  LogicalRepTupleData oldtup;
1434  LogicalRepRelId relid;
1435  EState *estate;
1436  TupleTableSlot *remoteslot;
1437  MemoryContext oldctx;
1438 
1440  return;
1441 
1443 
      /* Decode the message and open the mapped relation for writing. */
1444  relid = logicalrep_read_delete(s, &oldtup);
1445  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1446  if (!should_apply_changes_for_rel(rel))
1447  {
1448  /*
1449  * The relation can't become interesting in the middle of the
1450  * transaction so it's safe to unlock it.
1451  */
1453  return;
1454  }
1455 
1456  /* Check if we can do the delete. */
1458 
1459  /* Initialize the executor state. */
1460  estate = create_estate_for_relation(rel);
1461  remoteslot = ExecInitExtraTupleSlot(estate,
1462  RelationGetDescr(rel->localrel),
1463  &TTSOpsVirtual);
1464  resultRelInfo = makeNode(ResultRelInfo);
1465  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
1466 
1468 
1469  /* Build the search tuple. */
1471  slot_store_data(remoteslot, rel, &oldtup);
1472  MemoryContextSwitchTo(oldctx);
1473 
1474  /* For a partitioned table, apply delete to correct partition. */
1475  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1476  apply_handle_tuple_routing(resultRelInfo, estate,
1477  remoteslot, NULL, rel, CMD_DELETE);
1478  else
1479  apply_handle_delete_internal(resultRelInfo, estate,
1480  remoteslot, &rel->remoterel);
1481 
1483 
1484  /* Handle queued AFTER triggers. */
1485  AfterTriggerEndQuery(estate);
1486 
      /* Release the tuple slots and then the executor state itself. */
1487  ExecResetTupleTable(estate->es_tupleTable, false);
1488  FreeExecutorState(estate);
1489 
1491 
1493 }
1494 
1495 /* Workhorse for apply_handle_delete() */
1496 static void
1498  TupleTableSlot *remoteslot,
1499  LogicalRepRelation *remoterel)
1500 {
      /*
       * NOTE(review): original line 1497 (function name and leading
       * parameters, evidently 'relinfo' and 'estate') is absent from this
       * listing.
       */
1501  Relation localrel = relinfo->ri_RelationDesc;
1502  EPQState epqstate;
1503  TupleTableSlot *localslot;
1504  bool found;
1505 
1506  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1507  ExecOpenIndices(relinfo, false);
1508 
      /* Look up the local row matching the search tuple in 'remoteslot'. */
1509  found = FindReplTupleInLocalRel(estate, localrel, remoterel,
1510  remoteslot, &localslot);
1511 
1512  /* If found delete it. */
1513  if (found)
1514  {
1515  EvalPlanQualSetSlot(&epqstate, localslot);
1516 
1517  /* Do the actual delete. */
1518  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
1519  }
1520  else
1521  {
1522  /* The tuple to be deleted could not be found. */
      /* Only logged at DEBUG1; the delete is otherwise silently skipped. */
1523  elog(DEBUG1,
1524  "logical replication could not find row for delete "
1525  "in replication target relation \"%s\"",
1526  RelationGetRelationName(localrel));
1527  }
1528 
1529  /* Cleanup. */
1530  ExecCloseIndices(relinfo);
1531  EvalPlanQualEnd(&epqstate);
1532 }
1533 
1534 /*
1535  * Try to find a tuple received from the publication side (in 'remoteslot') in
1536  * the corresponding local relation using either replica identity index,
1537  * primary key or if needed, sequential scan.
1538  *
1539  * Local tuple, if found, is returned in '*localslot'.
1540  */
1541 static bool
1543  LogicalRepRelation *remoterel,
1544  TupleTableSlot *remoteslot,
1545  TupleTableSlot **localslot)
1546 {
      /*
       * NOTE(review): this listing omits original lines 1542 and 1558: the
       * function name with its leading parameters (evidently 'estate' and
       * 'localrel'), and one argument line of the index lookup call below.
       */
1547  Oid idxoid;
1548  bool found;
1549 
      /* The result slot is registered in the executor state's tuple table. */
1550  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
1551 
      /*
       * Without a usable index the publisher must have sent full rows,
       * otherwise there is nothing to search by.
       */
1552  idxoid = GetRelationIdentityOrPK(localrel);
1553  Assert(OidIsValid(idxoid) ||
1554  (remoterel->replident == REPLICA_IDENTITY_FULL));
1555 
      /* Prefer the index lookup; fall back to a sequential scan. */
1556  if (OidIsValid(idxoid))
1557  found = RelationFindReplTupleByIndex(localrel, idxoid,
1559  remoteslot, *localslot);
1560  else
1561  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
1562  remoteslot, *localslot);
1563 
1564  return found;
1565 }
1566 
1567 /*
1568  * This handles insert, update, delete on a partitioned table.
1569  */
1570 static void
1572  EState *estate,
1573  TupleTableSlot *remoteslot,
1574  LogicalRepTupleData *newtup,
1575  LogicalRepRelMapEntry *relmapentry,
1576  CmdType operation)
1577 {
      /*
       * NOTE(review): this listing omits original lines 1571, 1599, 1660,
       * 1723, 1738 and 1756. Among the missing pieces are the function name
       * with its first parameter (evidently 'relinfo'), every statement that
       * assigns 'oldctx' (it is only restored in the visible code), and the
       * call that builds 'PartitionToRootMap'. Restore them from the
       * upstream file before modifying this function.
       *
       * Callers in this file pass 'newtup' only for CMD_UPDATE and NULL for
       * CMD_INSERT and CMD_DELETE.
       */
1578  Relation parentrel = relinfo->ri_RelationDesc;
1579  ModifyTableState *mtstate = NULL;
1580  PartitionTupleRouting *proute = NULL;
1581  ResultRelInfo *partrelinfo;
1582  Relation partrel;
1583  TupleTableSlot *remoteslot_part;
1584  TupleConversionMap *map;
1585  MemoryContext oldctx;
1586 
1587  /* ModifyTableState is needed for ExecFindPartition(). */
1588  mtstate = makeNode(ModifyTableState);
1589  mtstate->ps.plan = NULL;
1590  mtstate->ps.state = estate;
1591  mtstate->operation = operation;
1592  mtstate->resultRelInfo = relinfo;
1593  proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel);
1594 
1595  /*
1596  * Find the partition to which the "search tuple" belongs.
1597  */
1598  Assert(remoteslot != NULL);
1600  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
1601  remoteslot, estate);
1602  Assert(partrelinfo != NULL);
1603  partrel = partrelinfo->ri_RelationDesc;
1604 
1605  /*
1606  * To perform any of the operations below, the tuple must match the
1607  * partition's rowtype. Convert if needed or just copy, using a dedicated
1608  * slot to store the tuple in any case.
1609  */
1610  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
1611  if (remoteslot_part == NULL)
1612  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
1613  map = partrelinfo->ri_RootToPartitionMap;
1614  if (map != NULL)
1615  remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
1616  remoteslot_part);
1617  else
1618  {
1619  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
1620  slot_getallattrs(remoteslot_part);
1621  }
1622  MemoryContextSwitchTo(oldctx);
1623 
1624  switch (operation)
1625  {
1626  case CMD_INSERT:
1627  apply_handle_insert_internal(partrelinfo, estate,
1628  remoteslot_part);
1629  break;
1630 
1631  case CMD_DELETE:
1632  apply_handle_delete_internal(partrelinfo, estate,
1633  remoteslot_part,
1634  &relmapentry->remoterel);
1635  break;
1636 
1637  case CMD_UPDATE:
1638 
1639  /*
1640  * For UPDATE, depending on whether or not the updated tuple
1641  * satisfies the partition's constraint, perform a simple UPDATE
1642  * of the partition or move the updated tuple into a different
1643  * suitable partition.
1644  */
1645  {
1646  AttrMap *attrmap = map ? map->attrMap : NULL;
1647  LogicalRepRelMapEntry *part_entry;
1648  TupleTableSlot *localslot;
1649  ResultRelInfo *partrelinfo_new;
1650  bool found;
1651 
1652  part_entry = logicalrep_partition_open(relmapentry, partrel,
1653  attrmap);
1654 
1655  /* Get the matching local tuple from the partition. */
1656  found = FindReplTupleInLocalRel(estate, partrel,
1657  &part_entry->remoterel,
1658  remoteslot_part, &localslot);
1659 
1661  if (found)
1662  {
1663  /* Apply the update. */
1664  slot_modify_data(remoteslot_part, localslot,
1665  part_entry,
1666  newtup);
1667  MemoryContextSwitchTo(oldctx);
1668  }
1669  else
1670  {
1671  /*
1672  * The tuple to be updated could not be found.
1673  *
1674  * TODO what to do here, change the log level to LOG
1675  * perhaps?
1676  */
1677  elog(DEBUG1,
1678  "logical replication did not find row for update "
1679  "in replication target relation \"%s\"",
1680  RelationGetRelationName(partrel));
1681  }
1682 
      /*
       * NOTE(review): the not-found branch above does not return or
       * break, so the code below still runs with a 'localslot' for
       * which no matching row was found. Confirm whether that is
       * intended.
       */
1683  /*
1684  * Does the updated tuple still satisfy the current
1685  * partition's constraint?
1686  */
1687  if (!partrel->rd_rel->relispartition ||
1688  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
1689  false))
1690  {
1691  /*
1692  * Yes, so simply UPDATE the partition. We don't call
1693  * apply_handle_update_internal() here, which would
1694  * normally do the following work, to avoid repeating some
1695  * work already done above to find the local tuple in the
1696  * partition.
1697  */
1698  EPQState epqstate;
1699 
1700  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1701  ExecOpenIndices(partrelinfo, false);
1702 
1703  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
1704  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
1705  localslot, remoteslot_part);
1706  ExecCloseIndices(partrelinfo);
1707  EvalPlanQualEnd(&epqstate);
1708  }
1709  else
1710  {
1711  /* Move the tuple into the new partition. */
1712 
1713  /*
1714  * New partition will be found using tuple routing, which
1715  * can only occur via the parent table. We might need to
1716  * convert the tuple to the parent's rowtype. Note that
1717  * this is the tuple found in the partition, not the
1718  * original search tuple received by this function.
1719  */
1720  if (map)
1721  {
1722  TupleConversionMap *PartitionToRootMap =
1724  RelationGetDescr(parentrel));
1725 
1726  remoteslot =
1727  execute_attr_map_slot(PartitionToRootMap->attrMap,
1728  remoteslot_part, remoteslot);
1729  }
1730  else
1731  {
1732  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
1733  slot_getallattrs(remoteslot);
1734  }
1735 
1736 
1737  /* Find the new partition. */
1739  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
1740  proute, remoteslot,
1741  estate);
1742  MemoryContextSwitchTo(oldctx);
1743  Assert(partrelinfo_new != partrelinfo);
1744 
1745  /* DELETE old tuple found in the old partition. */
1746  apply_handle_delete_internal(partrelinfo, estate,
1747  localslot,
1748  &relmapentry->remoterel);
1749 
1750  /* INSERT new tuple into the new partition. */
1751 
1752  /*
1753  * Convert the replacement tuple to match the destination
1754  * partition rowtype.
1755  */
1757  partrel = partrelinfo_new->ri_RelationDesc;
1758  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
1759  if (remoteslot_part == NULL)
1760  remoteslot_part = table_slot_create(partrel,
1761  &estate->es_tupleTable);
1762  map = partrelinfo_new->ri_RootToPartitionMap;
1763  if (map != NULL)
1764  {
1765  remoteslot_part = execute_attr_map_slot(map->attrMap,
1766  remoteslot,
1767  remoteslot_part);
1768  }
1769  else
1770  {
1771  remoteslot_part = ExecCopySlot(remoteslot_part,
1772  remoteslot);
      /*
       * NOTE(review): the equivalent branch near the top of this
       * function calls slot_getallattrs() on 'remoteslot_part';
       * here it is called on 'remoteslot'. Confirm which slot is
       * meant.
       */
1773  slot_getallattrs(remoteslot);
1774  }
1775  MemoryContextSwitchTo(oldctx);
1776  apply_handle_insert_internal(partrelinfo_new, estate,
1777  remoteslot_part);
1778  }
1779  }
1780  break;
1781 
1782  default:
1783  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
1784  break;
1785  }
1786 
1787  ExecCleanupTupleRouting(mtstate, proute);
1788 }
1789 
1790 /*
1791  * Handle TRUNCATE message.
1792  *
1793  * TODO: FDW support
1794  */
1795 static void
1797 {
      /*
       * NOTE(review): this listing omits original lines 1796, 1808, 1811,
       * 1827, 1834, 1845, 1890 and 1899. Among the missing pieces are the
       * function name and parameter list, the condition guarding the first
       * early return, one line directly before the first append to
       * 'relids_logged' (possibly a condition, as for the child relations
       * further down), the middle argument of find_all_inheritors(), and the
       * body of the loop over 'remote_rels'. Restore them from the upstream
       * file before modifying this function.
       *
       * List roles, as used below:
       *   remote_rels   - relation map entries opened via logicalrep_rel_open()
       *   rels / relids - every local relation to truncate, and their OIDs
       *   part_rels     - child relations opened here via table_open()
       *   relids_logged - subset of OIDs passed on as "logged" relations
       */
1798  bool cascade = false;
1799  bool restart_seqs = false;
1800  List *remote_relids = NIL;
1801  List *remote_rels = NIL;
1802  List *rels = NIL;
1803  List *part_rels = NIL;
1804  List *relids = NIL;
1805  List *relids_logged = NIL;
1806  ListCell *lc;
1807 
1809  return;
1810 
1812 
1813  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
1814 
1815  foreach(lc, remote_relids)
1816  {
1817  LogicalRepRelId relid = lfirst_oid(lc);
1818  LogicalRepRelMapEntry *rel;
1819 
1820  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1821  if (!should_apply_changes_for_rel(rel))
1822  {
1823  /*
1824  * The relation can't become interesting in the middle of the
1825  * transaction so it's safe to unlock it.
1826  */
1828  continue;
1829  }
1830 
1831  remote_rels = lappend(remote_rels, rel);
1832  rels = lappend(rels, rel->localrel);
1833  relids = lappend_oid(relids, rel->localreloid);
1835  relids_logged = lappend_oid(relids_logged, rel->localreloid);
1836 
1837  /*
1838  * Truncate partitions if we got a message to truncate a partitioned
1839  * table.
1840  */
1841  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1842  {
1843  ListCell *child;
1844  List *children = find_all_inheritors(rel->localreloid,
1846  NULL);
1847 
1848  foreach(child, children)
1849  {
1850  Oid childrelid = lfirst_oid(child);
1851  Relation childrel;
1852 
      /* Already collected (e.g. the parent itself or a duplicate). */
1853  if (list_member_oid(relids, childrelid))
1854  continue;
1855 
1856  /* find_all_inheritors already got lock */
1857  childrel = table_open(childrelid, NoLock);
1858 
1859  /*
1860  * Ignore temp tables of other backends. See similar code in
1861  * ExecuteTruncate().
1862  */
1863  if (RELATION_IS_OTHER_TEMP(childrel))
1864  {
      /*
       * NOTE(review): this close passes RowExclusiveLock, whereas
       * the relation was opened with NoLock above and the other
       * children are closed with NoLock at the end. Confirm that
       * releasing the lock here is intended.
       */
1865  table_close(childrel, RowExclusiveLock);
1866  continue;
1867  }
1868 
1869  rels = lappend(rels, childrel);
1870  part_rels = lappend(part_rels, childrel);
1871  relids = lappend_oid(relids, childrelid);
1872  /* Log this relation only if needed for logical decoding */
1873  if (RelationIsLogicallyLogged(childrel))
1874  relids_logged = lappend_oid(relids_logged, childrelid);
1875  }
1876  }
1877  }
1878 
1879  /*
1880  * Even if we used CASCADE on the upstream primary we explicitly default
1881  * to replaying changes without further cascading. This might be later
1882  * changeable with a user specified option.
1883  */
1884  ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
1885 
1886  foreach(lc, remote_rels)
1887  {
1888  LogicalRepRelMapEntry *rel = lfirst(lc);
1889 
1891  }
      /* Close the child relations opened above without releasing their locks. */
1892  foreach(lc, part_rels)
1893  {
1894  Relation rel = lfirst(lc);
1895 
1896  table_close(rel, NoLock);
1897  }
1898 
1900 }
1901 
1902 
1903 /*
1904  * Logical replication protocol message dispatcher.
1905  */
1906 static void
1908 {
      /*
       * NOTE(review): this listing omits original lines 1907, 1909 and, for
       * every case except BEGIN and TYPE, the two lines holding the case
       * label and the handler call (1917-1918, 1921-1922, 1925-1926,
       * 1929-1930, 1933-1934, 1937-1938, 1945-1946, 1949-1950, 1953-1954,
       * 1957-1958, 1961-1962). Line 1909 presumably declares 'action' and
       * reads it from the message. Restore them from the upstream file
       * before modifying this function.
       *
       * Every recognized message type returns from inside the switch, so
       * falling out of it means the type byte was not recognized.
       */
1910 
1911  switch (action)
1912  {
1913  case LOGICAL_REP_MSG_BEGIN:
1914  apply_handle_begin(s);
1915  return;
1916 
1919  return;
1920 
1923  return;
1924 
1927  return;
1928 
1931  return;
1932 
1935  return;
1936 
1939  return;
1940 
1941  case LOGICAL_REP_MSG_TYPE:
1942  apply_handle_type(s);
1943  return;
1944 
1947  return;
1948 
1951  return;
1952 
1955  return;
1956 
1959  return;
1960 
1963  return;
1964  }
1965 
1966  ereport(ERROR,
1967  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1968  errmsg("invalid logical replication message type \"%c\"", action)));
1969 }
1970 
1971 /*
1972  * Figure out which write/flush positions to report to the walsender process.
1973  *
1974  * We can't simply report back the last LSN the walsender sent us because the
1975  * local transaction might not yet be flushed to disk locally. Instead we
1976  * build a list that associates local with remote LSNs for every commit. When
1977  * reporting back the flush position to the sender we iterate that list and
1978  * check which entries on it are already locally flushed. Those we can report
1979  * as having been flushed.
1980  *
1981  * The have_pending_txes is true if there are outstanding transactions that
1982  * need to be flushed.
1983  */
1984 static void
1986  bool *have_pending_txes)
1987 {
      /*
       * NOTE(review): this listing omits original lines 1985, 1997 and 2014:
       * the function name with its leading parameters (evidently 'write' and
       * 'flush'), the expression initializing 'pos' inside the loop, and the
       * start of the statement that re-points 'pos' at the list tail.
       * Restore them from the upstream file before modifying this function.
       */
1988  dlist_mutable_iter iter;
1989  XLogRecPtr local_flush = GetFlushRecPtr();
1990 
1991  *write = InvalidXLogRecPtr;
1992  *flush = InvalidXLogRecPtr;
1993 
      /* Entries are appended at the tail, so the list is walked oldest first. */
1994  dlist_foreach_modify(iter, &lsn_mapping)
1995  {
1996  FlushPosition *pos =
1998 
1999  *write = pos->remote_end;
2000 
      /* Locally flushed already: report it and drop the entry. */
2001  if (pos->local_end <= local_flush)
2002  {
2003  *flush = pos->remote_end;
2004  dlist_delete(iter.cur);
2005  pfree(pos);
2006  }
2007  else
2008  {
2009  /*
2010  * Don't want to uselessly iterate over the rest of the list which
2011  * could potentially be long. Instead get the last element and
2012  * grab the write position from there.
2013  */
2015  &lsn_mapping);
2016  *write = pos->remote_end;
2017  *have_pending_txes = true;
2018  return;
2019  }
2020  }
2021 
      /* Every entry visited was flushed and removed. */
2022  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
2023 }
2024 
2025 /*
2026  * Store current remote/local lsn pair in the tracking list.
2027  */
2028 static void
2030 {
      /*
       * NOTE(review): original line 2029 (function name and parameter list,
       * evidently a single 'remote_lsn') is absent from this listing.
       */
2031  FlushPosition *flushpos;
2032 
2033  /* Need to do this in permanent context */
2034  MemoryContextSwitchTo(ApplyContext);
2035 
2036  /* Track commit lsn */
2037  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2038  flushpos->local_end = XactLastCommitEnd;
2039  flushpos->remote_end = remote_lsn;
2040 
2041  dlist_push_tail(&lsn_mapping, &flushpos->node);
      /*
       * Note this switches to ApplyMessageContext unconditionally rather than
       * restoring whatever context was current on entry.
       */
2042  MemoryContextSwitchTo(ApplyMessageContext);
2043 }
2044 
2045 
2046 /* Update statistics of the worker. */
2047 static void
2048 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
2049 {
      /*
       * NOTE(review): original line 2052 (one statement between the
       * last_send_time assignment and the 'reply' check) is absent from this
       * listing.
       */
2050  MyLogicalRepWorker->last_lsn = last_lsn;
2051  MyLogicalRepWorker->last_send_time = send_time;
      /* The reply_* fields are only refreshed when 'reply' is set. */
2053  if (reply)
2054  {
2055  MyLogicalRepWorker->reply_lsn = last_lsn;
2056  MyLogicalRepWorker->reply_time = send_time;
2057  }
2058 }
2059 
2060 /*
2061  * Apply main loop.
2062  */
2063 static void
2065 {
      /*
       * NOTE(review): this listing omits original lines 2064, 2076, 2084,
       * 2087, 2092, 2099, 2110, 2165, 2190, 2197-2198, 2206, 2224-2226, 2228,
       * 2232-2233, 2239, 2260 and 2265. Among the missing pieces are the
       * function name and parameter list (presumably supplying
       * 'last_received'), the final argument of both AllocSetContextCreate()
       * calls, the declarations of 'fd', 'timestamp' and 'now', the
       * condition opening the block after the first send_feedback(), the
       * wait call that assigns 'rc', and the bodies of the latch and
       * config-reload branches. Restore them from the upstream file before
       * modifying this function.
       */
2066  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2067  bool ping_sent = false;
2068  TimeLineID tli;
2069 
2070  /*
2071  * Init the ApplyMessageContext which we clean up after each replication
2072  * protocol message.
2073  */
2074  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
2075  "ApplyMessageContext",
2077 
2078  /*
2079  * This memory context is used for per-stream data when the streaming mode
2080  * is enabled. This context is reset on each stream stop.
2081  */
2082  LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
2083  "LogicalStreamingContext",
2085 
2086  /* mark as idle, before starting to loop */
2088 
2089  /* This outer loop iterates once per wait. */
2090  for (;;)
2091  {
2093  int rc;
2094  int len;
2095  char *buf = NULL;
2096  bool endofstream = false;
2097  long wait_time;
2098 
2100 
2101  MemoryContextSwitchTo(ApplyMessageContext);
2102 
2103  len = walrcv_receive(wrconn, &buf, &fd);
2104 
2105  if (len != 0)
2106  {
2107  /* Loop to process all available data (without blocking). */
2108  for (;;)
2109  {
2111 
      /* len == 0: nothing more right now; len < 0: stream has ended. */
2112  if (len == 0)
2113  {
2114  break;
2115  }
2116  else if (len < 0)
2117  {
2118  ereport(LOG,
2119  (errmsg("data stream from publisher has ended")));
2120  endofstream = true;
2121  break;
2122  }
2123  else
2124  {
2125  int c;
2126  StringInfoData s;
2127 
2128  /* Reset timeout. */
2129  last_recv_timestamp = GetCurrentTimestamp();
2130  ping_sent = false;
2131 
2132  /* Ensure we are reading the data into our memory context. */
2133  MemoryContextSwitchTo(ApplyMessageContext);
2134 
      /* Wrap the received buffer in a StringInfo without copying it. */
2135  s.data = buf;
2136  s.len = len;
2137  s.cursor = 0;
2138  s.maxlen = -1;
2139 
2140  c = pq_getmsgbyte(&s);
2141 
      /* 'w': data message with an LSN/time header, then the payload. */
2142  if (c == 'w')
2143  {
2144  XLogRecPtr start_lsn;
2145  XLogRecPtr end_lsn;
2146  TimestampTz send_time;
2147 
2148  start_lsn = pq_getmsgint64(&s);
2149  end_lsn = pq_getmsgint64(&s);
2150  send_time = pq_getmsgint64(&s);
2151 
      /* 'last_received' only ever moves forward. */
2152  if (last_received < start_lsn)
2153  last_received = start_lsn;
2154 
2155  if (last_received < end_lsn)
2156  last_received = end_lsn;
2157 
2158  UpdateWorkerStats(last_received, send_time, false);
2159 
2160  apply_dispatch(&s);
2161  }
      /* 'k': keepalive carrying the sender's position and a reply flag. */
2162  else if (c == 'k')
2163  {
2164  XLogRecPtr end_lsn;
2166  bool reply_requested;
2167 
2168  end_lsn = pq_getmsgint64(&s);
2169  timestamp = pq_getmsgint64(&s);
2170  reply_requested = pq_getmsgbyte(&s);
2171 
2172  if (last_received < end_lsn)
2173  last_received = end_lsn;
2174 
2175  send_feedback(last_received, reply_requested, false);
2176  UpdateWorkerStats(last_received, timestamp, true);
2177  }
2178  /* other message types are purposefully ignored */
2179 
2180  MemoryContextReset(ApplyMessageContext);
2181  }
2182 
2183  len = walrcv_receive(wrconn, &buf, &fd);
2184  }
2185  }
2186 
2187  /* confirm all writes so far */
2188  send_feedback(last_received, false, false);
2189 
2191  {
2192  /*
2193  * If we didn't get any transactions for a while there might be
2194  * unconsumed invalidation messages in the queue, consume them
2195  * now.
2196  */
2199 
2200  /* Process any table synchronization changes. */
2201  process_syncing_tables(last_received);
2202  }
2203 
2204  /* Cleanup the memory. */
2205  MemoryContextResetAndDeleteChildren(ApplyMessageContext);
2207 
2208  /* Check if we need to exit the streaming loop. */
2209  if (endofstream)
2210  break;
2211 
2212  /*
2213  * Wait for more data or latch. If we have unflushed transactions,
2214  * wake up after WalWriterDelay to see if they've been flushed yet (in
2215  * which case we should send a feedback message). Otherwise, there's
2216  * no particular urgency about waking up unless we get data or a
2217  * signal.
2218  */
2219  if (!dlist_is_empty(&lsn_mapping))
2220  wait_time = WalWriterDelay;
2221  else
2222  wait_time = NAPTIME_PER_CYCLE;
2223 
2227  fd, wait_time,
2229 
2230  if (rc & WL_LATCH_SET)
2231  {
2234  }
2235 
2236  if (ConfigReloadPending)
2237  {
2238  ConfigReloadPending = false;
2240  }
2241 
2242  if (rc & WL_TIMEOUT)
2243  {
2244  /*
2245  * We didn't receive anything new. If we haven't heard anything
2246  * from the server for more than wal_receiver_timeout / 2, ping
2247  * the server. Also, if it's been longer than
2248  * wal_receiver_status_interval since the last update we sent,
2249  * send a status update to the primary anyway, to report any
2250  * progress in applying WAL.
2251  */
2252  bool requestReply = false;
2253 
2254  /*
2255  * Check if time since last receive from the publisher has reached
2256  * the configured limit.
2257  */
2258  if (wal_receiver_timeout > 0)
2259  {
2261  TimestampTz timeout;
2262 
2263  timeout =
2264  TimestampTzPlusMilliseconds(last_recv_timestamp,
2266 
2267  if (now >= timeout)
2268  ereport(ERROR,
2269  (errmsg("terminating logical replication worker due to timeout")));
2270 
2271  /* Check to see if it's time for a ping. */
2272  if (!ping_sent)
2273  {
2274  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2275  (wal_receiver_timeout / 2));
2276  if (now >= timeout)
2277  {
2278  requestReply = true;
2279  ping_sent = true;
2280  }
2281  }
2282  }
2283 
      /* A ping both forces the message out and asks for a reply. */
2284  send_feedback(last_received, requestReply, requestReply);
2285  }
2286  }
2287 
2288  /* All done */
2289  walrcv_endstreaming(wrconn, &tli);
2290 }
2291 
2292 /*
2293  * Send a Standby Status Update message to server.
2294  *
2295  * 'recvpos' is the latest LSN we've received data up to. 'force' is set when a
2296  * response must be sent to avoid timeouts; 'requestReply' asks the server to reply.
2297  */
2298 static void
2299 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
2300 {
      /*
       * NOTE(review): original line 2346 (the last argument of the
       * TimestampDifferenceExceeds() call below) is absent from this listing.
       *
       * The statics below persist across calls: the reusable message buffer,
       * the time of the last message sent, and the highest positions reported
       * so far, which keep the reported values from moving backwards.
       */
2301  static StringInfo reply_message = NULL;
2302  static TimestampTz send_time = 0;
2303 
2304  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2305  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2306  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2307 
2308  XLogRecPtr writepos;
2309  XLogRecPtr flushpos;
2310  TimestampTz now;
2311  bool have_pending_txes;
2312 
2313  /*
2314  * If the user doesn't want status to be reported to the publisher, be
2315  * sure to exit before doing anything at all.
2316  */
2317  if (!force && wal_receiver_status_interval <= 0)
2318  return;
2319 
2320  /* It's legal to not pass a recvpos */
2321  if (recvpos < last_recvpos)
2322  recvpos = last_recvpos;
2323 
2324  get_flush_position(&writepos, &flushpos, &have_pending_txes);
2325 
2326  /*
2327  * No outstanding transactions to flush, we can report the latest received
2328  * position. This is important for synchronous replication.
2329  */
2330  if (!have_pending_txes)
2331  flushpos = writepos = recvpos;
2332 
      /* Never report a position lower than one already reported. */
2333  if (writepos < last_writepos)
2334  writepos = last_writepos;
2335 
2336  if (flushpos < last_flushpos)
2337  flushpos = last_flushpos;
2338 
2339  now = GetCurrentTimestamp();
2340 
2341  /* if we've already reported everything we're good */
2342  if (!force &&
2343  writepos == last_writepos &&
2344  flushpos == last_flushpos &&
2345  !TimestampDifferenceExceeds(send_time, now,
2347  return;
2348  send_time = now;
2349 
      /* Allocate the buffer once in ApplyContext; afterwards just reset it. */
2350  if (!reply_message)
2351  {
2352  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
2353 
2354  reply_message = makeStringInfo();
2355  MemoryContextSwitchTo(oldctx);
2356  }
2357  else
2358  resetStringInfo(reply_message);
2359 
      /*
       * Field order on the wire: recvpos goes in the "write" field, flushpos
       * in "flush" and writepos in "apply".
       */
2360  pq_sendbyte(reply_message, 'r');
2361  pq_sendint64(reply_message, recvpos); /* write */
2362  pq_sendint64(reply_message, flushpos); /* flush */
2363  pq_sendint64(reply_message, writepos); /* apply */
2364  pq_sendint64(reply_message, now); /* sendTime */
2365  pq_sendbyte(reply_message, requestReply); /* replyRequested */
2366 
2367  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2368  force,
2369  (uint32) (recvpos >> 32), (uint32) recvpos,
2370  (uint32) (writepos >> 32), (uint32) writepos,
2371  (uint32) (flushpos >> 32), (uint32) flushpos
2372  );
2373 
2374  walrcv_send(wrconn, reply_message->data, reply_message->len);
2375 
      /* Remember the highest values sent for the next call's comparisons. */
2376  if (recvpos > last_recvpos)
2377  last_recvpos = recvpos;
2378  if (writepos > last_writepos)
2379  last_writepos = writepos;
2380  if (flushpos > last_flushpos)
2381  last_flushpos = flushpos;
2382 }
2383 
2384 /*
2385  * Reread subscription info if needed. Most changes will cause the worker to exit.
2386  */
2387 static void
2389 {
2390  MemoryContext oldctx;
2392  bool started_tx = false;
2393 
2394  /* When cache state is valid there is nothing to do here. */
2395  if (MySubscriptionValid)
2396  return;
2397 
2398  /* This function might be called inside or outside of transaction. */
2399  if (!IsTransactionState())
2400  {
2402  started_tx = true;
2403  }
2404 
2405  /* Ensure allocations in permanent context. */
2406  oldctx = MemoryContextSwitchTo(ApplyContext);
2407 
2408  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
2409 
2410  /*
2411  * Exit if the subscription was removed. This normally should not happen
2412  * as the worker gets killed during DROP SUBSCRIPTION.
2413  */
2414  if (!newsub)
2415  {
2416  ereport(LOG,
2417  (errmsg("logical replication apply worker for subscription \"%s\" will "
2418  "stop because the subscription was removed",
2419  MySubscription->name)));
2420 
2421  proc_exit(0);
2422  }
2423 
2424  /*
2425  * Exit if the subscription was disabled. This normally should not happen
2426  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
2427  */
2428  if (!newsub->enabled)
2429  {
2430  ereport(LOG,
2431  (errmsg("logical replication apply worker for subscription \"%s\" will "
2432  "stop because the subscription was disabled",
2433  MySubscription->name)));
2434 
2435  proc_exit(0);
2436  }
2437 
2438  /* !slotname should never happen when enabled is true. */
2439  Assert(newsub->slotname);
2440 
2441  /*
2442  * Exit if any parameter that affects the remote connection was changed.
2443  * The launcher will start a new worker.
2444  */
2445  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
2446  strcmp(newsub->name, MySubscription->name) != 0 ||
2447  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
2448  newsub->binary != MySubscription->binary ||
2449  newsub->stream != MySubscription->stream ||
2450  !equal(newsub->publications, MySubscription->publications))
2451  {
2452  ereport(LOG,
2453  (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
2454  MySubscription->name)));
2455 
2456  proc_exit(0);
2457  }
2458 
2459  /* Check for other changes that should never happen too. */
2460  if (newsub->dbid != MySubscription->dbid)
2461  {
2462  elog(ERROR, "subscription %u changed unexpectedly",
2464  }
2465 
2466  /* Clean old subscription info and switch to new one. */
2467  FreeSubscription(MySubscription);
2468  MySubscription = newsub;
2469 
2470  MemoryContextSwitchTo(oldctx);
2471 
2472  /* Change synchronous commit according to the user's wishes */
2473  SetConfigOption("synchronous_commit", MySubscription->synccommit,
2475 
2476  if (started_tx)
2478 
2479  MySubscriptionValid = true;
2480 }
2481 
2482 /*
2483  * Callback from subscription syscache invalidation.
2484  */
2485 static void
2486 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
2487 {
2488  MySubscriptionValid = false;
2489 }
2490 
2491 /*
2492  * subxact_info_write
2493  * Store information about subxacts for a toplevel transaction.
2494  *
2495  * For each subxact we store the offset of its first change in the main file.
2496  * The file is always over-written as a whole.
      *
      * The data written is subxact_data.nsubxacts followed by the
      * subxact_data.subxacts array. When nsubxacts is zero nothing is written;
      * instead an existing ent->subxact_fileset is freed and reset to NULL.
2497  *
2498  * XXX We should only store subxacts that were not aborted yet.
      *
      * NOTE(review): this listing skips its own line numbers 2501, 2509, 2527,
      * 2528 and 2568, so the function-name/parameter line and a few statements
      * are not visible here. The body uses subid and xid, which are presumably
      * the parameters -- confirm against the original file.
2499  */
2500 static void
2502 {
2503  char path[MAXPGPATH];
2504  bool found;
2505  Size len;
2506  StreamXidHash *ent;
2507  BufFile *fd;
2508 
2510 
2511  /* find the xid entry in the xidhash */
2512  ent = (StreamXidHash *) hash_search(xidhash,
2513  (void *) &xid,
2514  HASH_FIND,
2515  &found);
2516  /* we must have found the entry for its top transaction by this time */
2517  Assert(found);
2518 
2519  /*
2520  * If there is no subtransaction then nothing to do, but if already have
2521  * subxact file then delete that.
2522  */
2523  if (subxact_data.nsubxacts == 0)
2524  {
2525  if (ent->subxact_fileset)
2526  {
      /* NOTE(review): listing lines 2527-2528 are absent; presumably the file deletion happens there -- confirm. */
2529  pfree(ent->subxact_fileset);
2530  ent->subxact_fileset = NULL;
2531  }
2532  return;
2533  }
2534 
2535  subxact_filename(path, subid, xid);
2536 
2537  /*
2538  * Create the subxact file if it is not already created, otherwise open the
2539  * existing file.
2540  */
2541  if (ent->subxact_fileset == NULL)
2542  {
2543  MemoryContext oldctx;
2544 
2545  /*
2546  * We need to maintain shared fileset across multiple stream
2547  * start/stop calls. So, need to allocate it in a persistent context.
2548  */
2549  oldctx = MemoryContextSwitchTo(ApplyContext);
2550  ent->subxact_fileset = palloc(sizeof(SharedFileSet));
2551  SharedFileSetInit(ent->subxact_fileset, NULL);
2552  MemoryContextSwitchTo(oldctx);
2553 
2554  fd = BufFileCreateShared(ent->subxact_fileset, path);
2555  }
2556  else
2557  fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
2558 
      /* Size in bytes of the array portion that follows the count. */
2559  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2560 
2561  /* Write the subxact count and subxact info */
2562  BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
2563  BufFileWrite(fd, subxact_data.subxacts, len);
2564 
2565  BufFileClose(fd);
2566 
2567  /* free the memory allocated for subxact info */
      /* NOTE(review): listing line 2568, the statement this comment describes, is absent. */
2569 }
2570 
2571 /*
2572  * subxact_info_read
2573  * Restore information about subxacts of a streamed transaction.
2574  *
2575  * Read information about subxacts into the structure subxact_data that can be
2576  * used later.
      *
      * Expects subxact_data to be empty on entry (see the assertions). If the
      * xid's hash entry has no subxact_fileset, returns with subxact_data left
      * untouched. Otherwise reads the count, sizes nsubxacts_max from it,
      * allocates the array in LogicalStreamingContext and reads the entries.
      * A short read raises ERROR.
      *
      * NOTE(review): this listing skips its own line numbers 2579, 2588, 2615
      * and 2637 (the function-name line, one statement before the assertions
      * and the first argument line of both ereport calls). The page's
      * cross-reference index gives the prototype as
      * subxact_info_read(Oid subid, TransactionId xid).
2577  */
2578 static void
2580 {
2581  char path[MAXPGPATH];
2582  bool found;
2583  Size len;
2584  BufFile *fd;
2585  StreamXidHash *ent;
2586  MemoryContext oldctx;
2587 
2589  Assert(!subxact_data.subxacts);
2590  Assert(subxact_data.nsubxacts == 0);
2591  Assert(subxact_data.nsubxacts_max == 0);
2592 
2593  /* Find the stream xid entry in the xidhash */
2594  ent = (StreamXidHash *) hash_search(xidhash,
2595  (void *) &xid,
2596  HASH_FIND,
2597  &found);
2598 
      /*
      * NOTE(review): found is never examined and ent is dereferenced
      * unconditionally below. If the xid could be missing from xidhash,
      * hash_search presumably returns NULL for HASH_FIND and this would
      * crash -- confirm that every caller guarantees the entry exists.
      */
2599  /*
2600  * If subxact_fileset is not valid, that means we don't have any subxact
2601  * info
2602  */
2603  if (ent->subxact_fileset == NULL)
2604  return;
2605 
2606  subxact_filename(path, subid, xid);
2607 
2608  fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
2609 
2610  /* read number of subxact items */
2611  if (BufFileRead(fd, &subxact_data.nsubxacts,
2612  sizeof(subxact_data.nsubxacts)) !=
2613  sizeof(subxact_data.nsubxacts))
2614  ereport(ERROR,
2616  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
2617  path)));
2618 
      /* Size in bytes of the array that follows the count in the file. */
2619  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2620 
2621  /* we keep the maximum as a power of 2 */
2622  subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
2623 
2624  /*
2625  * Allocate subxact information in the logical streaming context. We need
2626  * this information during the complete stream so that we can add the sub
2627  * transaction info to this. On stream stop we will flush this information
2628  * to the subxact file and reset the logical streaming context.
2629  */
2630  oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
2631  subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
2632  sizeof(SubXactInfo));
2633  MemoryContextSwitchTo(oldctx);
2634 
      /* The array read is skipped entirely when the stored count is zero. */
2635  if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
2636  ereport(ERROR,
2638  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
2639  path)));
2640 
2641  BufFileClose(fd);
2642 }
2643 
2644 /*
2645  * subxact_info_add
2646  * Add information about a subxact (offset in the main file).
      *
      * Nothing is recorded when xid equals stream_xid, equals the xid seen on
      * the previous call (subxact_data.subxact_last), or is already present in
      * subxact_data.subxacts. Otherwise a new entry is appended that holds the
      * xid and the current BufFileTell() position of stream_fd. The array
      * starts at 128 entries and doubles whenever it is full.
      *
      * NOTE(review): this listing skips its own line numbers 2649 and 2655 (the
      * function-name line and one statement before the stream_fd assertion).
      * The page's cross-reference index gives the prototype as
      * subxact_info_add(TransactionId xid).
2647  */
2648 static void
2650 {
2651  SubXactInfo *subxacts = subxact_data.subxacts;
2652  int64 i;
2653 
2654  /* We must have a valid top level stream xid and a stream fd. */
2656  Assert(stream_fd != NULL);
2657 
2658  /*
2659  * If the XID matches the toplevel transaction, we don't want to add it.
2660  */
2661  if (stream_xid == xid)
2662  return;
2663 
2664  /*
2665  * In most cases we're checking the same subxact as we've already seen in
2666  * the last call, so make sure to ignore it (this change comes later).
2667  */
2668  if (subxact_data.subxact_last == xid)
2669  return;
2670 
2671  /* OK, remember we're processing this XID. */
2672  subxact_data.subxact_last = xid;
2673 
2674  /*
2675  * Check if the transaction is already present in the array of subxact. We
2676  * intentionally scan the array from the tail, because we're likely adding
2677  * a change for the most recent subtransactions.
2678  *
2679  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
2680  * would allow us to use binary search here.
2681  */
      /* i counts down from nsubxacts to 1, so the element examined is i - 1. */
2682  for (i = subxact_data.nsubxacts; i > 0; i--)
2683  {
2684  /* found, so we're done */
2685  if (subxacts[i - 1].xid == xid)
2686  return;
2687  }
2688 
2689  /* This is a new subxact, so we need to add it to the array. */
2690  if (subxact_data.nsubxacts == 0)
2691  {
2692  MemoryContext oldctx;
2693 
2694  subxact_data.nsubxacts_max = 128;
2695 
2696  /*
2697  * Allocate this memory for subxacts in per-stream context, see
2698  * subxact_info_read.
2699  */
2700  oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
2701  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
2702  MemoryContextSwitchTo(oldctx);
2703  }
2704  else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
2705  {
      /* Array is full: double its capacity. */
2706  subxact_data.nsubxacts_max *= 2;
2707  subxacts = repalloc(subxacts,
2708  subxact_data.nsubxacts_max * sizeof(SubXactInfo));
2709  }
2710 
2711  subxacts[subxact_data.nsubxacts].xid = xid;
2712 
2713  /*
2714  * Get the current offset of the stream file and store it as offset of
2715  * this subxact.
2716  */
2717  BufFileTell(stream_fd,
2718  &subxacts[subxact_data.nsubxacts].fileno,
2719  &subxacts[subxact_data.nsubxacts].offset);
2720 
      /* Publish the new entry; the local pointer may differ after palloc/repalloc. */
2721  subxact_data.nsubxacts++;
2722  subxact_data.subxacts = subxacts;
2723 }
2724 
2725 /* format filename for file containing the info about subxacts */
2726 static inline void
2727 subxact_filename(char *path, Oid subid, TransactionId xid)
2728 {
2729  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
2730 }
2731 
2732 /* format filename for file containing serialized changes */
2733 static inline void
2734 changes_filename(char *path, Oid subid, TransactionId xid)
2735 {
2736  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
2737 }
2738 
2739 /*
2740  * stream_cleanup_files
2741  * Cleanup files for a subscription / toplevel transaction.
2742  *
2743  * Remove files with serialized changes and subxact info for a particular
2744  * toplevel transaction. Each subscription has a separate set of files.
      *
      * The xid's entry is also removed from xidhash, and the memory of its
      * stream_fileset (and of subxact_fileset, when set) is released.
      *
      * NOTE(review): this listing skips its own line numbers 2747, 2762 and
      * 2770 (the function-name line and, presumably, the two calls that delete
      * the files). The page's cross-reference index gives the prototype as
      * stream_cleanup_files(Oid subid, TransactionId xid).
2745  */
2746 static void
2748 {
2749  char path[MAXPGPATH];
2750  StreamXidHash *ent;
2751 
2752  /* Remove the xid entry from the stream xid hash */
2753  ent = (StreamXidHash *) hash_search(xidhash,
2754  (void *) &xid,
2755  HASH_REMOVE,
2756  NULL);
2757  /* By this time we must have created the transaction entry */
2758  Assert(ent != NULL);
2759 
2760  /* Delete the change file and release the stream fileset memory */
2761  changes_filename(path, subid, xid);
2763  pfree(ent->stream_fileset);
2764  ent->stream_fileset = NULL;
2765 
2766  /* Delete the subxact file and release the memory, if it exists */
2767  if (ent->subxact_fileset)
2768  {
2769  subxact_filename(path, subid, xid);
2771  pfree(ent->subxact_fileset);
2772  ent->subxact_fileset = NULL;
2773  }
2774 }
2775 
2776 /*
2777  * stream_open_file
2778  * Open a file that we'll use to serialize changes for a toplevel
2779  * transaction.
2780  *
2781  * Open a file for streamed changes from a toplevel transaction identified
2782  * by stream_xid (global variable). If it's the first chunk of streamed
2783  * changes for this transaction, initialize the shared fileset and create the
2784  * buffile, otherwise open the previously created file.
2785  *
2786  * This can only be called at the beginning of a "streaming" block, i.e.
2787  * between stream_start/stream_stop messages from the upstream.
      *
      * On return the global stream_fd refers to the opened file; it must be
      * NULL on entry (asserted below). When first_segment is false the file is
      * positioned at its end.
      *
      * NOTE(review): this listing skips its own line numbers 2797, 2799 and
      * 2805 (two statements among the entry assertions and the action argument
      * of the hash_search call).
2788  */
2789 static void
2790 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
2791 {
2792  char path[MAXPGPATH];
2793  bool found;
2794  MemoryContext oldcxt;
2795  StreamXidHash *ent;
2796 
2798  Assert(OidIsValid(subid));
2800  Assert(stream_fd == NULL);
2801 
2802  /* create or find the xid entry in the xidhash */
2803  ent = (StreamXidHash *) hash_search(xidhash,
2804  (void *) &xid,
2806  &found);
      /* Only the first segment may legitimately find no existing entry. */
2807  Assert(first_segment || found);
2808  changes_filename(path, subid, xid);
2809  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
2810 
2811  /*
2812  * Create/open the buffiles under the logical streaming context so that we
2813  * have those files until stream stop.
2814  */
2815  oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
2816 
2817  /*
2818  * If this is the first streamed segment, the file must not exist, so make
2819  * sure we're the ones creating it. Otherwise just open the file for
2820  * writing, in append mode.
2821  */
2822  if (first_segment)
2823  {
2824  MemoryContext savectx;
2825  SharedFileSet *fileset;
2826 
2827  /*
2828  * We need to maintain shared fileset across multiple stream
2829  * start/stop calls. So, need to allocate it in a persistent context.
2830  */
2831  savectx = MemoryContextSwitchTo(ApplyContext);
2832  fileset = palloc(sizeof(SharedFileSet));
2833 
2834  SharedFileSetInit(fileset, NULL);
2835  MemoryContextSwitchTo(savectx);
2836 
2837  stream_fd = BufFileCreateShared(fileset, path);
2838 
2839  /* Remember the fileset for the next stream of the same transaction */
2840  ent->xid = xid;
2841  ent->stream_fileset = fileset;
2842  ent->subxact_fileset = NULL;
2843  }
2844  else
2845  {
2846  /*
2847  * Open the file and seek to the end of the file because we always
2848  * append the changes file.
2849  */
2850  stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
2851  BufFileSeek(stream_fd, 0, 0, SEEK_END);
2852  }
2853 
2854  MemoryContextSwitchTo(oldcxt);
2855 }
2856 
2857 /*
2858  * stream_close_file
2859  * Close the currently open file with streamed changes.
2860  *
2861  * This can only be called at the end of a streaming block, i.e. at stream_stop
2862  * message from the upstream.
      *
      * The global stream_fd must be non-NULL on entry (asserted); it is closed
      * with BufFileClose() and then reset to NULL.
      *
      * NOTE(review): this listing skips its own line numbers 2865, 2867, 2868
      * and 2873 (the function-name line, two statements before the stream_fd
      * assertion and one statement after the close). The page's cross-reference
      * index gives the prototype as stream_close_file(void).
2863  */
2864 static void
2866 {
2869  Assert(stream_fd != NULL);
2870 
2871  BufFileClose(stream_fd);
2872 
2874  stream_fd = NULL;
2875 }
2876 
2877 /*
2878  * stream_write_change
2879  * Serialize a change to a file for the current toplevel transaction.
2880  *
2881  * The change is serialized in a simple format, with length (not including
2882  * the length), action code (identifying the message type) and message
2883  * contents (without the subxact TransactionId value).
      *
      * Only the part of s from s->cursor to s->len is written, so whatever the
      * caller already consumed from the buffer is left out. The length field is
      * an int written in its in-memory representation.
      *
      * NOTE(review): this listing skips its own line numbers 2886, 2890 and
      * 2891 (the function-name line and two statements before the stream_fd
      * assertion). The page's cross-reference index gives the prototype as
      * stream_write_change(char action, StringInfo s).
2884  */
2885 static void
2887 {
2888  int len;
2889 
2892  Assert(stream_fd != NULL);
2893 
2894  /* total on-disk size, including the action type character */
2895  len = (s->len - s->cursor) + sizeof(char);
2896 
2897  /* first write the size */
2898  BufFileWrite(stream_fd, &len, sizeof(len));
2899 
2900  /* then the action */
2901  BufFileWrite(stream_fd, &action, sizeof(action));
2902 
2903  /* and finally the remaining part of the buffer (after the XID) */
2904  len = (s->len - s->cursor);
2905 
2906  BufFileWrite(stream_fd, &s->data[s->cursor], len);
2907 }
2908 
2909 /*
2910  * Cleanup the memory for subxacts and reset the related variables.
      *
      * Frees subxact_data.subxacts when it is set, then returns subxact_data to
      * its empty state: NULL array, InvalidTransactionId as the last-seen xid,
      * and zero for both the count and the capacity.
      *
      * NOTE(review): listing line 2913, which carries the function name and
      * parameter list, is absent from this copy.
2911  */
2912 static inline void
2914 {
2915  if (subxact_data.subxacts)
2916  pfree(subxact_data.subxacts);
2917 
2918  subxact_data.subxacts = NULL;
2919  subxact_data.subxact_last = InvalidTransactionId;
2920  subxact_data.nsubxacts = 0;
2921  subxact_data.nsubxacts_max = 0;
2922 }
2923 
2924 /*
      * Logical Replication Apply worker entry point.
      *
      * Visible steps, in order:
      *  - attach to the worker slot whose number is taken from main_arg;
      *  - install die as the SIGTERM handler and load "libpqwalreceiver";
      *  - set session_replication_role to "replica" and search_path to "";
      *  - create ApplyContext under TopMemoryContext and load the subscription
      *    into it, exiting via proc_exit(0) if it is missing or disabled;
      *  - apply the subscription's synchronous_commit setting;
      *  - for a table synchronization worker, call LogicalRepSyncTableStart()
      *    and copy the returned slot name into ApplyContext; otherwise set up
      *    the replication origin named "pg_<subscription oid>", connect to the
      *    publisher and call walrcv_identify_system();
      *  - fill in the streaming options, start streaming, and run
      *    LogicalRepApplyLoop() from origin_startpos.
      *
      * NOTE(review): this listing skips its own line numbers 2926, 2933, 2939,
      * 2941, 2949, 2950, 2957, 2960, 2961, 2973, 2974, 2983, 3002, 3005, 3006,
      * 3018, 3055, 3063, 3082, 3083 and 3092. The function-name line, the
      * declaration of options, and several statements or argument lists are
      * therefore not visible; consult the original worker.c for them.
      */
2925 void
2927 {
2928  int worker_slot = DatumGetInt32(main_arg);
2929  MemoryContext oldctx;
2930  char originname[NAMEDATALEN];
2931  XLogRecPtr origin_startpos;
2932  char *myslotname;
2934 
2935  /* Attach to slot */
2936  logicalrep_worker_attach(worker_slot);
2937 
2938  /* Setup signal handling */
2940  pqsignal(SIGTERM, die);
2942 
2943  /*
2944  * We don't currently need any ResourceOwner in a walreceiver process, but
2945  * if we did, we could call CreateAuxProcessResourceOwner here.
2946  */
2947 
2948  /* Initialise stats to a sanish value */
      /* NOTE(review): listing lines 2949-2950, the statement this comment describes, are absent. */
2951 
2952  /* Load the libpq-specific functions */
2953  load_file("libpqwalreceiver", false);
2954 
2955  /* Run as replica session replication role. */
2956  SetConfigOption("session_replication_role", "replica",
2958 
2959  /* Connect to our database. */
      /* NOTE(review): listing lines 2960-2961, the start of this call, are absent; only its last argument remains. */
2962  0);
2963 
2964  /*
2965  * Set always-secure search path, so malicious users can't redirect user
2966  * code (e.g. pg_index.indexprs).
2967  */
2968  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
2969 
2970  /* Load the subscription into persistent memory context. */
2971  ApplyContext = AllocSetContextCreate(TopMemoryContext,
2972  "ApplyContext",
2975  oldctx = MemoryContextSwitchTo(ApplyContext);
2976 
2977  MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
2978  if (!MySubscription)
2979  {
2980  ereport(LOG,
2981  (errmsg("logical replication apply worker for subscription %u will not "
2982  "start because the subscription was removed during startup",
2984  proc_exit(0);
2985  }
2986 
2987  MySubscriptionValid = true;
2988  MemoryContextSwitchTo(oldctx);
2989 
2990  if (!MySubscription->enabled)
2991  {
2992  ereport(LOG,
2993  (errmsg("logical replication apply worker for subscription \"%s\" will not "
2994  "start because the subscription was disabled during startup",
2995  MySubscription->name)));
2996 
2997  proc_exit(0);
2998  }
2999 
3000  /* Setup synchronous commit according to the user's wishes */
3001  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3003 
3004  /* Keep us informed about subscription changes. */
      /* NOTE(review): listing lines 3005-3006, the start of this registration call, are absent. */
3007  (Datum) 0);
3008 
3009  if (am_tablesync_worker())
3010  ereport(LOG,
3011  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3012  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
3013  else
3014  ereport(LOG,
3015  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3016  MySubscription->name)));
3017 
3019 
3020  /* Connect to the origin and start the replication. */
3021  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3022  MySubscription->conninfo);
3023 
3024  if (am_tablesync_worker())
3025  {
3026  char *syncslotname;
3027 
3028  /* This is table synchronization worker, call initial sync. */
3029  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
3030 
3031  /* allocate slot name in long-lived context */
3032  myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3033 
3034  pfree(syncslotname);
3035  }
3036  else
3037  {
3038  /* This is main apply worker */
3039  RepOriginId originid;
3040  TimeLineID startpointTLI;
3041  char *err;
3042 
3043  myslotname = MySubscription->slotname;
3044 
3045  /*
3046  * This shouldn't happen if the subscription is enabled, but guard
3047  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3048  * crash if slot is NULL.)
3049  */
3050  if (!myslotname)
3051  ereport(ERROR,
3052  (errmsg("subscription has no replication slot set")));
3053 
3054  /* Setup replication origin tracking. */
3056  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
      /* Look the origin up by name and create it when the lookup yields no valid id. */
3057  originid = replorigin_by_name(originname, true);
3058  if (!OidIsValid(originid))
3059  originid = replorigin_create(originname);
3060  replorigin_session_setup(originid);
3061  replorigin_session_origin = originid;
3062  origin_startpos = replorigin_session_get_progress(false);
3064 
3065  wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
3066  &err);
3067  if (wrconn == NULL)
3068  ereport(ERROR,
3069  (errmsg("could not connect to the publisher: %s", err)));
3070 
3071  /*
3072  * We don't really use the output identify_system for anything but it
3073  * does some initializations on the upstream so let's still call it.
3074  */
3075  (void) walrcv_identify_system(wrconn, &startpointTLI);
3076  }
3077 
3078  /*
3079  * Setup callback for syscache so that we know when something changes in
3080  * the subscription relation state.
3081  */
      /* NOTE(review): listing lines 3082-3083, the start of this registration call, are absent. */
3084  (Datum) 0);
3085 
3086  /* Build logical replication streaming options. */
3087  options.logical = true;
3088  options.startpoint = origin_startpos;
3089  options.slotname = myslotname;
      /* NOTE(review): the two alternatives of this conditional (listing line 3092) are absent; the choice depends on whether the publisher reports version 140000 or later. */
3090  options.proto.logical.proto_version =
3091  walrcv_server_version(wrconn) >= 140000 ?
3093  options.proto.logical.publication_names = MySubscription->publications;
3094  options.proto.logical.binary = MySubscription->binary;
3095  options.proto.logical.streaming = MySubscription->stream;
3096 
3097  /* Start normal logical streaming replication. */
3098  walrcv_startstreaming(wrconn, &options);
3099 
3100  /* Run the main loop. */
3101  LogicalRepApplyLoop(origin_startpos);
3102 
3103  proc_exit(0);
3104 }
3105 
3106 /*
3107  * Is current process a logical replication worker?
3108  */
3109 bool
3111 {
3112  return MyLogicalRepWorker != NULL;
3113 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
Subscription * MySubscription
Definition: worker.c:161
static void apply_handle_type(StringInfo s)
Definition: worker.c:1117
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:263
#define NIL
Definition: pg_list.h:65
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:750
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:32
static void stream_close_file(void)
Definition: worker.c:2865
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:344
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:2886
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:1134
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1196
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:2299
static void subxact_info_add(TransactionId xid)
Definition: worker.c:2649
Relation ri_RelationDesc
Definition: execnodes.h:412
WalReceiverConn * wrconn
Definition: worker.c:159
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:2579
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:414
#define DEBUG1
Definition: elog.h:25
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
dlist_node * cur
Definition: ilist.h:180
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:2029
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
uint32 TimeLineID
Definition: xlogdefs.h:52
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:289
int fileno
Definition: worker.c:184
MemoryContext TopTransactionContext
Definition: mcxt.c:49
void AcceptInvalidationMessages(void)
Definition: inval.c:684
CommandId es_output_cid
Definition: execnodes.h:531
static XLogRecPtr remote_final_lsn
Definition: worker.c:165
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4724
#define HASH_CONTEXT
Definition: hsearch.h:91
#define HASH_ELEM
Definition: hsearch.h:85
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel)
Definition: worker.c:1497
static void apply_handle_insert(StringInfo s)
Definition: worker.c:1151
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
uint32 TransactionId
Definition: c.h:575
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:64
static dlist_head lsn_mapping
Definition: worker.c:130
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:823
MemoryContext hcxt
Definition: hsearch.h:77
#define DatumGetInt32(X)
Definition: postgres.h:472
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:650
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3029
#define RelationGetDescr(relation)
Definition: rel.h:483
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:530
static void apply_handle_update_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry)
Definition: worker.c:1368
dlist_node node
Definition: worker.c:125
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2734
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3296
static void apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data)
Definition: worker.c:1063
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:829
int64 timestamp
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:59
int64 TimestampTz
Definition: timestamp.h:39
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1157
LogicalRepRelMapEntry * rel
Definition: worker.c:134
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:406
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:343
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
void CommitTransactionCommand(void)
Definition: xact.c:2948
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:2048
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:362
void BufFileTruncateShared(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:861
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:416
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:397
Expr * expression_planner(Expr *expr)
Definition: planner.c:6166
#define walrcv_server_version(conn)
Definition: walreceiver.h:408
int maplen
Definition: attmap.h:37
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:58
Size entrysize
Definition: hsearch.h:72
XLogRecPtr last_lsn
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:746
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:412
union WalRcvStreamOptions::@104 proto
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1187
XLogRecPtr remote_end
Definition: worker.c:127
int errcode(int sqlerrcode)
Definition: elog.c:691
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:81
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
CmdType operation
Definition: execnodes.h:1149
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:449
Datum * tts_values
Definition: tuptable.h:126
#define WL_SOCKET_READABLE
Definition: latch.h:125
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8423
void PopActiveSnapshot(void)
Definition: snapmgr.c:759
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:636
StringInfoData * colvalues
Definition: logicalproto.h:70
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2727
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
EState * state
Definition: execnodes.h:930
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1052
List * es_range_table
Definition: execnodes.h:519
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
Form_pg_class rd_rel
Definition: rel.h:110
unsigned int Oid
Definition: postgres_ext.h:31
TransactionId subxact_last
Definition: worker.c:193
struct SlotErrCallbackArg SlotErrCallbackArg
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:820
bool IsLogicalWorker(void)
Definition: worker.c:3110
void(* callback)(void *arg)
Definition: elog.h:243
int wal_receiver_status_interval
Definition: walreceiver.c:88
List * lappend_oid(List *list, Oid datum)
Definition: list.c:357
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1709
struct ErrorContextCallback * previous
Definition: elog.h:242
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
#define OidIsValid(objectId)
Definition: c.h:706
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:82
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
uint32 nsubxacts
Definition: worker.c:191
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:209
void BufFileClose(BufFile *file)
Definition: buffile.c:395
void ResetLatch(Latch *latch)
Definition: latch.c:588
int wal_receiver_timeout
Definition: walreceiver.c:89
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:1985
Definition: attmap.h:34
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2810
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:163
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
ErrorContextCallback * error_context_stack
Definition: elog.c:92
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:374
#define list_make1(x1)
Definition: pg_list.h:206
#define NAMEDATALEN
void FreeExecutorState(EState *estate)
Definition: execUtils.c:185
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
Definition: executor.h:509
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:173
static StringInfoData reply_message
Definition: walreceiver.c:117
Definition: dynahash.c:218
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:2486
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:80
void pfree(void *pointer)
Definition: mcxt.c:1057
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
SubXactInfo * subxacts
Definition: worker.c:194
static void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:2747
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:43
PlanState ps
Definition: execnodes.h:1148
static bool ensure_transaction(void)
Definition: worker.c:279
struct ApplySubXactData ApplySubXactData
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
Definition: worker.c:121
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
static void * list_nth(const List *list, int n)
Definition: pg_list.h:266
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:796
bool in_remote_transaction
Definition: worker.c:164
void fill_extraUpdatedCols(RangeTblEntry *target_rte, Relation target_relation)
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1686
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:453
Definition: guc.h:75
#define MAXPGPATH
bool in_streamed_transaction
Definition: worker.c:168
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:492
XLogRecPtr reply_lsn
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:580
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1666
static bool am_tablesync_worker(void)
static void apply_handle_delete(StringInfo s)
Definition: worker.c:1429
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define DEBUG2
Definition: elog.h:24
BufFile * BufFileCreateShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:262
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:155
char * c
void logicalrep_worker_attach(int slot)
Definition: launcher.c:629
#define NoLock
Definition: lockdefs.h:34
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7808
static char * buf
Definition: pg_test_fsync.c:68
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:680
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
bool * tts_isnull
Definition: tuptable.h:128
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:292
#define RowExclusiveLock
Definition: lockdefs.h:38
int errcode_for_file_access(void)
Definition: elog.c:714
#define SIGHUP
Definition: win32_port.h:159
#define InvalidTransactionId
Definition: transam.h:31
SharedFileSet * stream_fileset
Definition: worker.c:149
#define RelationGetRelationName(relation)
Definition: rel.h:491
XLogRecPtr startpoint
Definition: walreceiver.h:168
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:429
int pgsocket
Definition: port.h:31
List * publications
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:309
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void apply_handle_begin(StringInfo s)
Definition: worker.c:687
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:274
Oid get_atttype(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:911
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, ModifyTableState *mtstate, Relation rel)
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2822
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2756
AttrMap * attrMap
Definition: tupconvert.h:27
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
Definition: buffile.c:284
MemoryContext TopMemoryContext
Definition: mcxt.c:44
EState * CreateExecutorState(void)
Definition: execUtils.c:89
Definition: guc.h:72
int my_log2(long num)
Definition: dynahash.c:1730
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:1235
List * lappend(List *list, void *datum)
Definition: list.c:321
TransactionId xid
Definition: worker.c:183
MemoryContext ApplyContext
Definition: worker.c:154
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
XLogRecPtr final_lsn
Definition: logicalproto.h:112
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:706
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2390
static void apply_handle_tuple_routing(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation)
Definition: worker.c:1571
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
Node * build_column_default(Relation rel, int attrno)
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:930
List * es_tupleTable
Definition: execnodes.h:561
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
char * s2
static void apply_handle_update(StringInfo s)
Definition: worker.c:1269
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:326
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1021
#define PGINVALID_SOCKET
Definition: port.h:33
Size keysize
Definition: hsearch.h:71
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:1542
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5718
Plan * plan
Definition: execnodes.h:928
static void apply_handle_relation(StringInfo s)
Definition: worker.c:1099
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:632
int16 attnum
Definition: pg_attribute.h:79
SharedFileSet * subxact_fileset
Definition: worker.c:150
#define ereport(elevel,...)
Definition: elog.h:155
Bitmapset * updatedCols
Definition: parsenodes.h:1129
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:33
XLogRecPtr local_end
Definition: worker.c:126
static MemoryContext LogicalStreamingContext
Definition: worker.c:157
struct SubXactInfo SubXactInfo
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
static HTAB * xidhash
Definition: worker.c:176
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:743
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4502
static void apply_dispatch(StringInfo s)
Definition: worker.c:1907
static void maybe_reread_subscription(void)
Definition: worker.c:2388
#define makeNode(_type_)
Definition: nodes.h:575
TimestampTz last_recv_time
static MemoryContext ApplyMessageContext
Definition: worker.c:153
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:674
void SharedFileSetDeleteAll(SharedFileSet *fileset)
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:177
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:800
#define lfirst(lc)
Definition: pg_list.h:169
RepOriginId replorigin_session_origin
Definition: origin.c:154
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:594
static void apply_handle_origin(StringInfo s)
Definition: worker.c:728
void StartTransactionCommand(void)
Definition: xact.c:2847
AttrNumber * attnums
Definition: attmap.h:36
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
LogicalRepMsgType
Definition: logicalproto.h:46
size_t Size
Definition: c.h:528
static void slot_store_error_callback(void *arg)
Definition: worker.c:430
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:218
bool IsTransactionState(void)
Definition: xact.c:371
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:418
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:94
int WalWriterDelay
Definition: walwriter.c:70
bool MySubscriptionValid
Definition: worker.c:162
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1070
off_t offset
Definition: worker.c:185
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:514
void FreeSubscription(Subscription *sub)
char * logicalrep_typmap_gettypname(Oid remoteid)
Definition: relation.c:503
RTEKind rtekind
Definition: parsenodes.h:982
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:165
TransactionId xid
Definition: worker.c:148
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:2501
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4522
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:833
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:773
void * palloc(Size size)
Definition: mcxt.c:950
TupleConversionMap * ri_RootToPartitionMap
Definition: execnodes.h:491
int errmsg(const char *fmt,...)
Definition: elog.c:902
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1174
static void stream_open_file(Oid subid, TransactionId xid, bool first)
Definition: worker.c:2790
static ApplySubXactData subxact_data
Definition: worker.c:197
static BufFile * stream_fd
Definition: worker.c:179
uint32 nsubxacts_max
Definition: worker.c:192
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:2064
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:228
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int i
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:543
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define errcontext
Definition: elog.h:199
static void cleanup_subxact_info(void)
Definition: worker.c:2913
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1679
void * arg
struct Latch * MyLatch
Definition: globals.c:54
struct FlushPosition FlushPosition
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:465
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:122
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:761
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:586
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
Definition: pg_list.h:50
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1845
#define snprintf
Definition: port.h:215
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1648
#define WL_LATCH_SET
Definition: latch.h:124
#define RelationGetRelid(relation)
Definition: rel.h:457
static TransactionId stream_xid
Definition: worker.c:170
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
CmdType
Definition: nodes.h:670
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
#define die(msg)
Definition: pg_test_fsync.c:97
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4703
struct StreamXidHash StreamXidHash
TimestampTz committime
Definition: logicalproto.h:121
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:469
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:2926
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:865
uint32 LogicalRepRelId
Definition: logicalproto.h:84
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522
#define lfirst_oid(lc)
Definition: pg_list.h:171
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:217
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:257
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot)
Definition: worker.c:1218
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5747
void pgstat_report_stat(bool force)
Definition: pgstat.c:861
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:1796
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:398