PostgreSQL Source Code  git master
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2021, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from the remote logical replication stream.
13  *
14  * The main worker (apply) is started by the logical replication worker
15  * launcher for every enabled subscription in a database. It uses the
16  * walsender protocol to communicate with the publisher.
17  *
18  * This module includes server-facing code and shares the libpqwalreceiver
19  * module with walreceiver for the libpq-specific functionality.
20  *
21  *
22  * STREAMED TRANSACTIONS
23  * ---------------------
24  * Streamed transactions (large transactions exceeding a memory limit on the
25  * upstream) are not applied immediately, but instead, the data is written
26  * to temporary files and then applied at once when the final commit arrives.
27  *
28  * Unlike the regular (non-streamed) case, handling streamed transactions has
29  * to handle aborts of both the toplevel transaction and subtransactions. This
30  * is achieved by tracking offsets for subtransactions, which are then used
31  * to truncate the file with serialized changes.
32  *
33  * The files are placed in the temporary file directory by default, and the
34  * filenames include both the XID of the toplevel transaction and the OID of
35  * the subscription. This is necessary so that different workers processing
36  * a remote transaction with the same XID don't interfere with each other.
37  *
38  * We use BufFiles instead of normal temporary files because (a) the BufFile
39  * infrastructure supports temporary files that exceed the OS file size
40  * limit, (b) it provides automatic cleanup on error, and (c) it lets these
41  * files survive across local transactions, so that they can be opened and
42  * closed at each stream start and stop. We decided to use the SharedFileSet
43  * infrastructure because without it the files are deleted when they are
44  * closed, and keeping the stream files open across stream start/stop would
45  * consume a lot of memory (more than 8K for each BufFile, and there could be
46  * multiple such BufFiles, as the subscriber could receive multiple
47  * start/stop streams for different transactions before getting the
48  * commit). Moreover, without SharedFileSet we would also need to invent
49  * a new way to pass filenames to the BufFile APIs so that we could open
50  * the file we want across multiple stream-open calls for the same
51  * transaction.
52  *-------------------------------------------------------------------------
53  */
54 
55 #include "postgres.h"
56 
57 #include <sys/stat.h>
58 #include <unistd.h>
59 
60 #include "access/table.h"
61 #include "access/tableam.h"
62 #include "access/xact.h"
63 #include "access/xlog_internal.h"
64 #include "catalog/catalog.h"
65 #include "catalog/namespace.h"
66 #include "catalog/partition.h"
67 #include "catalog/pg_inherits.h"
70 #include "catalog/pg_tablespace.h"
71 #include "commands/tablecmds.h"
72 #include "commands/tablespace.h"
73 #include "commands/trigger.h"
74 #include "executor/executor.h"
75 #include "executor/execPartition.h"
77 #include "funcapi.h"
78 #include "libpq/pqformat.h"
79 #include "libpq/pqsignal.h"
80 #include "mb/pg_wchar.h"
81 #include "miscadmin.h"
82 #include "nodes/makefuncs.h"
83 #include "optimizer/optimizer.h"
84 #include "pgstat.h"
85 #include "postmaster/bgworker.h"
86 #include "postmaster/interrupt.h"
87 #include "postmaster/postmaster.h"
88 #include "postmaster/walwriter.h"
89 #include "replication/decode.h"
90 #include "replication/logical.h"
94 #include "replication/origin.h"
96 #include "replication/snapbuild.h"
99 #include "rewrite/rewriteHandler.h"
100 #include "storage/buffile.h"
101 #include "storage/bufmgr.h"
102 #include "storage/fd.h"
103 #include "storage/ipc.h"
104 #include "storage/lmgr.h"
105 #include "storage/proc.h"
106 #include "storage/procarray.h"
107 #include "tcop/tcopprot.h"
108 #include "utils/builtins.h"
109 #include "utils/catcache.h"
110 #include "utils/dynahash.h"
111 #include "utils/datum.h"
112 #include "utils/fmgroids.h"
113 #include "utils/guc.h"
114 #include "utils/inval.h"
115 #include "utils/lsyscache.h"
116 #include "utils/memutils.h"
117 #include "utils/rel.h"
118 #include "utils/syscache.h"
119 #include "utils/timeout.h"
120 
121 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
122 
/*
 * NOTE(review): the member fields of FlushPosition are not visible in this
 * excerpt; presumably it records the LSN positions handled by
 * store_flush_position() — confirm against the full definition.
 */
123 typedef struct FlushPosition
124 {
128 } FlushPosition;
129 
131 
/*
 * Argument passed to slot_store_error_callback().  The callback reads
 * ->rel, ->local_attnum and ->remote_attnum.  Both attribute numbers are
 * set to -1 while no column is being converted, in which case the callback
 * adds no error context.
 */
132 typedef struct SlotErrCallbackArg
133 {
138 
139 /*
140  * Stream xid hash entry. Whenever we see a new xid we create this entry in
141  * xidhash, along with the streaming file, and store the fileset handle here.
142  * The subxact file is created only if there is subxact info for this xid.
143  * The entry is used by subsequent streams for the same xid to look up the
144  * corresponding fileset handles, so keeping them in a hash makes that fast.
 *
 * The key (xid) must be the first field: the hash table is created with
 * keysize = sizeof(TransactionId) and entrysize = sizeof(StreamXidHash).
145  */
146 typedef struct StreamXidHash
147 {
148  TransactionId xid; /* xid is the hash key and must be first */
149  SharedFileSet *stream_fileset; /* shared file set for stream data */
150  SharedFileSet *subxact_fileset; /* shared file set for subxact info */
151 } StreamXidHash;
152 
155 
156 /* per stream context for streaming transactions */
158 
160 
162 bool MySubscriptionValid = false;
163 
166 
167 /* fields valid only when processing streamed transaction */
169 
171 
172 /*
173  * Hash table for storing the streaming xid information along with the shared
174  * file sets for the streaming and subxact files.
 *
 * It is created lazily, in ApplyContext, when the first stream-start message
 * is handled, and is kept for the lifetime of the apply worker.
175  */
176 static HTAB *xidhash = NULL;
177 
178 /* BufFile handle of the current streaming file */
/* (asserted to be non-NULL whenever a change is spooled in streaming mode) */
179 static BufFile *stream_fd = NULL;
180 
/*
 * Per-subtransaction bookkeeping for a streamed transaction: (fileno, offset)
 * is the position in the changes BufFile to truncate back to if this
 * subtransaction is aborted.
 */
181 typedef struct SubXactInfo
182 {
183  TransactionId xid; /* XID of the subxact */
184  int fileno; /* file number in the buffile */
185  off_t offset; /* offset in the file */
186 } SubXactInfo;
187 
188 /* Sub-transaction data for the current streaming transaction */
189 typedef struct ApplySubXactData
190 {
191  uint32 nsubxacts; /* number of sub-transactions */
192  uint32 nsubxacts_max; /* current capacity of subxacts */
193  TransactionId subxact_last; /* xid of the last sub-transaction */
194  SubXactInfo *subxacts; /* per-subxact offsets in the changes file */
196 
198 
/*
 * Build the on-disk names of the subxact-info and changes files for a
 * toplevel xid of a subscription into "path".  Callers pass a
 * char[MAXPGPATH] buffer.
 */
199 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
200 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
201 
202 /*
203  * Information about subtransactions of a given toplevel transaction.
204  */
205 static void subxact_info_write(Oid subid, TransactionId xid);
206 static void subxact_info_read(Oid subid, TransactionId xid);
207 static void subxact_info_add(TransactionId xid);
208 static inline void cleanup_subxact_info(void);
209 
210 /*
211  * Serialize and deserialize changes for a toplevel transaction.
212  */
213 static void stream_cleanup_files(Oid subid, TransactionId xid);
214 static void stream_open_file(Oid subid, TransactionId xid, bool first);
215 static void stream_write_change(char action, StringInfo s);
216 static void stream_close_file(void);
217 
218 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
219 
220 static void store_flush_position(XLogRecPtr remote_lsn);
221 
222 static void maybe_reread_subscription(void);
223 
224 /* prototype needed because of stream_commit */
225 static void apply_dispatch(StringInfo s);
226 
228  LogicalRepCommitData *commit_data);
229 static void apply_handle_insert_internal(ResultRelInfo *relinfo,
230  EState *estate, TupleTableSlot *remoteslot);
231 static void apply_handle_update_internal(ResultRelInfo *relinfo,
232  EState *estate, TupleTableSlot *remoteslot,
233  LogicalRepTupleData *newtup,
234  LogicalRepRelMapEntry *relmapentry);
235 static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
236  TupleTableSlot *remoteslot,
237  LogicalRepRelation *remoterel);
238 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
239  LogicalRepRelation *remoterel,
240  TupleTableSlot *remoteslot,
241  TupleTableSlot **localslot);
242 static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
243  EState *estate,
244  TupleTableSlot *remoteslot,
245  LogicalRepTupleData *newtup,
246  LogicalRepRelMapEntry *relmapentry,
247  CmdType operation);
248 
249 /*
250  * Should this worker apply changes for the given relation?
251  *
252  * This is mainly needed for the initial relation data sync, which runs in a
253  * separate worker process in parallel; we need some way to skip changes
254  * arriving at the main apply worker while a table is being synced.
255  *
 * A tablesync worker applies changes only for the single relation it is
 * syncing (MyLogicalRepWorker->relid).  Any other worker applies changes for
 * relations in READY state, or in SYNCDONE state once statelsn is at or
 * before remote_final_lsn.
 *
256  * Note we need a less-than-or-equal comparison for SYNCDONE state because
257  * statelsn might hold the end of the initial slot's consistent-point WAL
258  * record + 1 (i.e. the start of the next record), and that next record can
259  * be the COMMIT of the transaction we are now processing (which is what we
260  * set remote_final_lsn to in apply_handle_begin).
261  */
262 static bool
264 {
265  if (am_tablesync_worker())
266  return MyLogicalRepWorker->relid == rel->localreloid;
267  else
268  return (rel->state == SUBREL_STATE_READY ||
269  (rel->state == SUBREL_STATE_SYNCDONE &&
270  rel->statelsn <= remote_final_lsn));
271 }
272 
273 /*
274  * Make sure that a local transaction has been started.
275  *
276  * Also switches to ApplyMessageContext as necessary.
 *
 * Returns false if a transaction was already in progress on entry, and true
 * otherwise.  On both paths CurrentMemoryContext is ApplyMessageContext when
 * the function returns.
277  */
278 static bool
280 {
281  if (IsTransactionState())
282  {
284 
	/* Already in a transaction: only fix up the memory context. */
285  if (CurrentMemoryContext != ApplyMessageContext)
286  MemoryContextSwitchTo(ApplyMessageContext);
287 
288  return false;
289  }
290 
293 
295 
296  MemoryContextSwitchTo(ApplyMessageContext);
297  return true;
298 }
299 
300 /*
301  * Handle streamed transactions.
302  *
303  * If in streaming mode (receiving a block of a streamed transaction), we
304  * simply redirect the change to a file for the proper toplevel transaction.
305  *
 * "action" is the message type and "s" the rest of the message.  In
 * streaming mode the leading 4-byte subxact XID is consumed from "s",
 * recorded via subxact_info_add(), and the change is then written with
 * stream_write_change().
 *
306  * Returns true for streamed transactions, false otherwise (regular mode).
307  */
308 static bool
310 {
311  TransactionId xid;
312 
313  /* not in streaming mode */
315  return false;
316 
317  Assert(stream_fd != NULL);
319 
320  /*
321  * We should have received XID of the subxact as the first part of the
322  * message, so extract it.
323  */
324  xid = pq_getmsgint(s, 4);
325 
327 
328  /* Add the new subxact to the array (unless already there). */
329  subxact_info_add(xid);
330 
331  /* write the change to the current file */
332  stream_write_change(action, s);
333 
334  return true;
335 }
336 
337 /*
338  * Executor state preparation for evaluation of constraint expressions,
339  * indexes and triggers.
340  *
341  * resultRelInfo is a ResultRelInfo for the relation to be passed to the
342  * executor routines. The caller must open and close any indexes to be
343  * updated independently of the relation registered here.
 *
 * Returns a new EState whose range table holds a single RTE for
 * rel->localrel; *resultRelInfo is set to a newly built ResultRelInfo for
 * that relation (range table index 1).  The matching cleanup helper that
 * follows this function ends the AFTER-trigger query and frees the EState.
344  */
345 static EState *
347  ResultRelInfo **resultRelInfo)
348 {
349  EState *estate;
350  RangeTblEntry *rte;
351 
352  estate = CreateExecutorState();
353 
354  rte = makeNode(RangeTblEntry);
355  rte->rtekind = RTE_RELATION;
356  rte->relid = RelationGetRelid(rel->localrel);
357  rte->relkind = rel->localrel->rd_rel->relkind;
359  ExecInitRangeTable(estate, list_make1(rte));
360 
361  *resultRelInfo = makeNode(ResultRelInfo);
362 
363  /*
364  * Use Relation opened by logicalrep_rel_open() instead of opening it
365  * again.
366  */
367  InitResultRelInfo(*resultRelInfo, rel->localrel, 1, NULL, 0);
368 
369  /*
370  * We put the ResultRelInfo in the es_opened_result_relations list, even
371  * though we don't populate the es_result_relations array. That's a bit
372  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
373  * Also, because we did not open the Relation ourselves here, there is no
374  * need to worry about closing it.
375  *
376  * ExecOpenIndices() is not called here either, each execution path doing
377  * an apply operation being responsible for that.
378  */
380  lappend(estate->es_opened_result_relations, *resultRelInfo);
381 
382  estate->es_output_cid = GetCurrentCommandId(true);
383 
384  /* Prepare to catch AFTER triggers. */
386 
387  return estate;
388 }
389 
390 /*
391  * Finish any operations related to the executor state created by
392  * create_estate_for_relation().
 *
 * Fires any queued AFTER triggers, resets the EState's tuple table without
 * dropping the slots' tuple descriptors, and frees the EState.  The estate
 * must not be used after this call.
393  */
394 static void
396 {
397  /* Handle any queued AFTER triggers. */
398  AfterTriggerEndQuery(estate);
399 
400  /* Cleanup. */
401  ExecResetTupleTable(estate->es_tupleTable, false);
402  FreeExecutorState(estate);
403 }
404 
405 /*
406  * Evaluate default expressions for local columns that cannot be mapped to
407  * any column of the remote relation, storing the results into "slot".
408  *
409  * This allows us to support tables which have more columns on the downstream
410  * than on the upstream.
 *
 * Nothing is done when the local and remote relations have the same number
 * of attributes.  Dropped and generated local columns are skipped, as are
 * columns that map to a remote attribute (attrmap->attnums[] >= 0).
 * Columns without a default expression keep whatever value the slot holds.
 *
 * NOTE(review): defmap/defexprs are palloc'd in the current memory context
 * and never freed here; presumably the caller's per-message context
 * reclaims them — confirm.
411  */
412 static void
414  TupleTableSlot *slot)
415 {
416  TupleDesc desc = RelationGetDescr(rel->localrel);
417  int num_phys_attrs = desc->natts;
418  int i;
419  int attnum,
420  num_defaults = 0;
421  int *defmap;
422  ExprState **defexprs;
423  ExprContext *econtext;
424 
425  econtext = GetPerTupleExprContext(estate);
426 
427  /* We got all the data via replication, no need to evaluate anything. */
428  if (num_phys_attrs == rel->remoterel.natts)
429  return;
430 
431  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
432  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
433 
434  Assert(rel->attrmap->maplen == num_phys_attrs);
435  for (attnum = 0; attnum < num_phys_attrs; attnum++)
436  {
437  Expr *defexpr;
438 
439  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
440  continue;
441 
	/* Column is supplied by the remote side; no default needed. */
442  if (rel->attrmap->attnums[attnum] >= 0)
443  continue;
444 
445  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
446 
447  if (defexpr != NULL)
448  {
449  /* Run the expression through planner */
450  defexpr = expression_planner(defexpr);
451 
452  /* Build the executable expression state; no parent PlanState is needed */
453  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
454  defmap[num_defaults] = attnum;
455  num_defaults++;
456  }
457 
458  }
459 
	/* Evaluate each collected default into its (0-based) slot column. */
460  for (i = 0; i < num_defaults; i++)
461  slot->tts_values[defmap[i]] =
462  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
463 }
464 
465 /*
466  * Error callback to give more context info about type conversion failure.
 *
 * Adds an errcontext line naming the target relation (remote schema and
 * name), the remote column, and the remote and local type names of the
 * column currently being converted.  Does nothing while
 * errarg->remote_attnum is negative, i.e. no column is being converted.
 * errarg->local_attnum is 0-based, hence the +1 for get_atttype().
467  */
468 static void
470 {
471  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
473  char *remotetypname;
474  Oid remotetypoid,
475  localtypoid;
476 
477  /* Nothing to do if remote attribute number is not set */
478  if (errarg->remote_attnum < 0)
479  return;
480 
481  rel = errarg->rel;
482  remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
483 
484  /* Fetch remote type name from the LogicalRepTypMap cache */
485  remotetypname = logicalrep_typmap_gettypname(remotetypoid);
486 
487  /* Fetch local type OID from the local sys cache */
488  localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
489 
490  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
491  "remote type %s, local type %s",
492  rel->remoterel.nspname, rel->remoterel.relname,
493  rel->remoterel.attnames[errarg->remote_attnum],
494  remotetypname,
495  format_type_be(localtypoid));
496 }
497 
498 /*
499  * Store tuple data into slot.
500  *
501  * Incoming data can be either text or binary format.
 *
 * Every local attribute is set.  An attribute that maps to a remote column
 * is converted with the type's input function (text) or receive function
 * (binary).  A binary value that is not fully consumed raises
 * ERRCODE_INVALID_BINARY_REPRESENTATION.  Any other column status is stored
 * as NULL.  Dropped and unmapped attributes are set to NULL, so that
 * defaults can be filled in afterwards.  The result is stored as a virtual
 * tuple.  Conversion errors get extra context from
 * slot_store_error_callback().
502  */
503 static void
505  LogicalRepTupleData *tupleData)
506 {
507  int natts = slot->tts_tupleDescriptor->natts;
508  int i;
509  SlotErrCallbackArg errarg;
510  ErrorContextCallback errcallback;
511 
512  ExecClearTuple(slot);
513 
514  /* Push callback + info on the error context stack */
515  errarg.rel = rel;
516  errarg.local_attnum = -1;
517  errarg.remote_attnum = -1;
518  errcallback.callback = slot_store_error_callback;
519  errcallback.arg = (void *) &errarg;
520  errcallback.previous = error_context_stack;
521  error_context_stack = &errcallback;
522 
523  /* Convert each non-dropped attribute that maps to a remote column */
524  Assert(natts == rel->attrmap->maplen);
525  for (i = 0; i < natts; i++)
526  {
528  int remoteattnum = rel->attrmap->attnums[i];
529 
530  if (!att->attisdropped && remoteattnum >= 0)
531  {
532  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
533 
534  Assert(remoteattnum < tupleData->ncols);
535 
		/* Tell the error callback which column is being converted. */
536  errarg.local_attnum = i;
537  errarg.remote_attnum = remoteattnum;
538 
539  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
540  {
541  Oid typinput;
542  Oid typioparam;
543 
544  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
545  slot->tts_values[i] =
546  OidInputFunctionCall(typinput, colvalue->data,
547  typioparam, att->atttypmod);
548  slot->tts_isnull[i] = false;
549  }
550  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
551  {
552  Oid typreceive;
553  Oid typioparam;
554 
555  /*
556  * In some code paths we may be asked to re-parse the same
557  * tuple data. Reset the StringInfo's cursor so that works.
558  */
559  colvalue->cursor = 0;
560 
561  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
562  slot->tts_values[i] =
563  OidReceiveFunctionCall(typreceive, colvalue,
564  typioparam, att->atttypmod);
565 
566  /* Trouble if it didn't eat the whole buffer */
567  if (colvalue->cursor != colvalue->len)
568  ereport(ERROR,
569  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
570  errmsg("incorrect binary data format in logical replication column %d",
571  remoteattnum + 1)));
572  slot->tts_isnull[i] = false;
573  }
574  else
575  {
576  /*
577  * NULL value from remote. (We don't expect to see
578  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
579  * NULL.)
580  */
581  slot->tts_values[i] = (Datum) 0;
582  slot->tts_isnull[i] = true;
583  }
584 
		/* Column done; stop attributing errors to it. */
585  errarg.local_attnum = -1;
586  errarg.remote_attnum = -1;
587  }
588  else
589  {
590  /*
591  * We assign NULL to dropped attributes and missing values
592  * (missing values should be later filled using
593  * slot_fill_defaults).
594  */
595  slot->tts_values[i] = (Datum) 0;
596  slot->tts_isnull[i] = true;
597  }
598  }
599 
600  /* Pop the error context stack */
601  error_context_stack = errcallback.previous;
602 
603  ExecStoreVirtualTuple(slot);
604 }
605 
606 /*
607  * Replace updated columns with data from the LogicalRepTupleData struct.
608  * This is somewhat similar to heap_modify_tuple but also calls the type
609  * input functions on the user data.
610  *
611  * "slot" is filled with a copy of the tuple in "srcslot", replacing
612  * columns provided in "tupleData" and leaving others as-is.
 * Local attributes with no remote counterpart (attrmap->attnums[] < 0) and
 * remote columns marked LOGICALREP_COLUMN_UNCHANGED keep the value copied
 * from "srcslot".
613  *
614  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
615  * storage for "srcslot". This is OK for current usage, but someday we may
616  * need to materialize "slot" at the end to make it independent of "srcslot".
617  */
618 static void
621  LogicalRepTupleData *tupleData)
622 {
623  int natts = slot->tts_tupleDescriptor->natts;
624  int i;
625  SlotErrCallbackArg errarg;
626  ErrorContextCallback errcallback;
627 
628  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
629  ExecClearTuple(slot);
630 
631  /*
632  * Copy all the column data from srcslot, so that we'll have valid values
633  * for unreplaced columns.
634  */
635  Assert(natts == srcslot->tts_tupleDescriptor->natts);
636  slot_getallattrs(srcslot);
637  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
638  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
639 
640  /* For error reporting, push callback + info on the error context stack */
641  errarg.rel = rel;
642  errarg.local_attnum = -1;
643  errarg.remote_attnum = -1;
644  errcallback.callback = slot_store_error_callback;
645  errcallback.arg = (void *) &errarg;
646  errcallback.previous = error_context_stack;
647  error_context_stack = &errcallback;
648 
649  /* Call the "in" function for each replaced attribute */
650  Assert(natts == rel->attrmap->maplen);
651  for (i = 0; i < natts; i++)
652  {
654  int remoteattnum = rel->attrmap->attnums[i];
655 
		/* Local-only column: keep the value copied from srcslot. */
656  if (remoteattnum < 0)
657  continue;
658 
659  Assert(remoteattnum < tupleData->ncols);
660 
661  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
662  {
663  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
664 
665  errarg.local_attnum = i;
666  errarg.remote_attnum = remoteattnum;
667 
668  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
669  {
670  Oid typinput;
671  Oid typioparam;
672 
673  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
674  slot->tts_values[i] =
675  OidInputFunctionCall(typinput, colvalue->data,
676  typioparam, att->atttypmod);
677  slot->tts_isnull[i] = false;
678  }
679  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
680  {
681  Oid typreceive;
682  Oid typioparam;
683 
684  /*
685  * In some code paths we may be asked to re-parse the same
686  * tuple data. Reset the StringInfo's cursor so that works.
687  */
688  colvalue->cursor = 0;
689 
690  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
691  slot->tts_values[i] =
692  OidReceiveFunctionCall(typreceive, colvalue,
693  typioparam, att->atttypmod);
694 
695  /* Trouble if it didn't eat the whole buffer */
696  if (colvalue->cursor != colvalue->len)
697  ereport(ERROR,
698  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
699  errmsg("incorrect binary data format in logical replication column %d",
700  remoteattnum + 1)));
701  slot->tts_isnull[i] = false;
702  }
703  else
704  {
705  /* must be LOGICALREP_COLUMN_NULL */
706  slot->tts_values[i] = (Datum) 0;
707  slot->tts_isnull[i] = true;
708  }
709 
710  errarg.local_attnum = -1;
711  errarg.remote_attnum = -1;
712  }
713  }
714 
715  /* Pop the error context stack */
716  error_context_stack = errcallback.previous;
717 
718  /* And finally, declare that "slot" contains a valid virtual tuple */
719  ExecStoreVirtualTuple(slot);
720 }
721 
722 /*
723  * Handle BEGIN message.
 *
 * Records the remote transaction's final LSN in remote_final_lsn, which the
 * COMMIT handler later asserts equals the commit LSN, and marks that we are
 * inside a remote transaction.
724  */
725 static void
727 {
728  LogicalRepBeginData begin_data;
729 
730  logicalrep_read_begin(s, &begin_data);
731 
732  remote_final_lsn = begin_data.final_lsn;
733 
734  in_remote_transaction = true;
735 
737 }
738 
739 /*
740  * Handle COMMIT message.
 *
 * Commits via apply_handle_commit_internal() and then lets table
 * synchronization make progress up to the commit's end LSN.  The commit
 * LSN is expected to match the final LSN announced by BEGIN.
741  *
742  * TODO, support tracking of multiple origins
743  */
744 static void
746 {
747  LogicalRepCommitData commit_data;
748 
749  logicalrep_read_commit(s, &commit_data);
750 
751  Assert(commit_data.commit_lsn == remote_final_lsn);
752 
753  apply_handle_commit_internal(s, &commit_data);
754 
755  /* Process any tables that are being synchronized in parallel. */
756  process_syncing_tables(commit_data.end_lsn);
757 
759 }
760 
761 /*
762  * Handle ORIGIN message.
 *
 * Raises ERRCODE_PROTOCOL_VIOLATION when the message arrives out of order.
 * NOTE(review): the guarding condition (lines 773-775) is not visible in
 * this excerpt; per the comment below, the message is otherwise accepted
 * without further action — confirm.
763  *
764  * TODO, support tracking of multiple origins
765  */
766 static void
768 {
769  /*
770  * ORIGIN message can only come inside streaming transaction or inside
771  * remote transaction and before any actual writes.
772  */
776  ereport(ERROR,
777  (errcode(ERRCODE_PROTOCOL_VIOLATION),
778  errmsg("ORIGIN message sent out of order")));
779 }
780 
781 /*
782  * Handle STREAM START message.
 *
 * Reads the toplevel XID (stream_xid) and whether this is the first
 * segment of that transaction.  It then creates xidhash on first use, keyed
 * by TransactionId and allocated in ApplyContext, and opens the spool
 * file.  For a non-first segment it also loads the existing subxact info.
783  */
784 static void
786 {
787  bool first_segment;
788  HASHCTL hash_ctl;
789 
791 
792  /*
793  * Start a transaction on stream start, this transaction will be committed
794  * on the stream stop unless it is a tablesync worker in which case it
795  * will be committed after processing all the messages. We need the
796  * transaction for handling the buffile, used for serializing the
797  * streaming data and subxact info.
798  */
800 
801  /* notify handle methods we're processing a remote transaction */
803 
804  /* extract XID of the top-level transaction */
805  stream_xid = logicalrep_read_stream_start(s, &first_segment);
806 
807  /*
808  * Initialize the xidhash table if we haven't yet. This will be used for
809  * the entire duration of the apply worker so create it in permanent
810  * context.
811  */
812  if (xidhash == NULL)
813  {
814  hash_ctl.keysize = sizeof(TransactionId);
815  hash_ctl.entrysize = sizeof(StreamXidHash);
816  hash_ctl.hcxt = ApplyContext;
817  xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
819  }
820 
821  /* open the spool file for this transaction */
823 
824  /* if this is not the first segment, open existing subxact file */
825  if (!first_segment)
827 
829 }
830 
831 /*
832  * Handle STREAM STOP message.
 *
 * Ends the current streamed block: closes the changes file, persists the
 * subxact info, commits the per-stream transaction and clears
 * in_streamed_transaction.  It also resets LogicalStreamingContext, so
 * nothing allocated there survives past this message.
833  */
834 static void
836 {
838 
839  /*
840  * Close the file with serialized changes, and serialize information about
841  * subxacts for the toplevel transaction.
842  */
845 
846  /* We must be in a valid transaction state */
848 
849  /* Commit the per-stream transaction */
851 
852  in_streamed_transaction = false;
853 
854  /* Reset per-stream context */
855  MemoryContextReset(LogicalStreamingContext);
856 
858 }
859 
860 /*
861  * Handle STREAM ABORT message.
 *
 * If xid == subxid, the whole toplevel transaction was aborted and its
 * spooled files are discarded.  Otherwise only the given subtransaction,
 * and every subxact recorded after it, is discarded: the changes file is
 * truncated back to that subxact's (fileno, offset), and the shortened
 * subxact list is written out again.
862  */
863 static void
865 {
866  TransactionId xid;
867  TransactionId subxid;
868 
870 
871  logicalrep_read_stream_abort(s, &xid, &subxid);
872 
873  /*
874  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
875  * just delete the files with serialized info.
876  */
877  if (xid == subxid)
879  else
880  {
881  /*
882  * OK, so it's a subxact. We need to read the subxact file for the
883  * toplevel transaction, determine the offset tracked for the subxact,
884  * and truncate the file with changes. We also remove the subxacts
885  * with higher offsets (or rather higher XIDs).
886  *
887  * We intentionally scan the array from the tail, because we're likely
888  * aborting a change for the most recent subtransactions.
889  *
890  * We can't use the binary search here as subxact XIDs won't
891  * necessarily arrive in sorted order, consider the case where we have
892  * released the savepoint for multiple subtransactions and then
893  * performed rollback to savepoint for one of the earlier
894  * sub-transaction.
895  */
896 
897  int64 i;
898  int64 subidx;
899  BufFile *fd;
900  bool found = false;
901  char path[MAXPGPATH];
902  StreamXidHash *ent;
903 
904  subidx = -1;
907 
		/* Find the most recent entry for subxid (scan from the tail). */
908  for (i = subxact_data.nsubxacts; i > 0; i--)
909  {
910  if (subxact_data.subxacts[i - 1].xid == subxid)
911  {
912  subidx = (i - 1);
913  found = true;
914  break;
915  }
916  }
917 
918  /*
919  * If it's an empty sub-transaction then we will not find the subxid
920  * here so just cleanup the subxact info and return.
921  */
922  if (!found)
923  {
924  /* Cleanup the subxact info */
926 
928  return;
929  }
930 
931  Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
932 
933  ent = (StreamXidHash *) hash_search(xidhash,
934  (void *) &xid,
935  HASH_FIND,
936  &found);
937  Assert(found);
938 
939  /* open the changes file */
941  fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
942 
943  /* OK, truncate the file at the right offset */
944  BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
945  subxact_data.subxacts[subidx].offset);
946  BufFileClose(fd);
947 
948  /* discard the subxacts added later */
		/* (subidx is within [0, nsubxacts) per the Assert above) */
949  subxact_data.nsubxacts = subidx;
950 
951  /* write the updated subxact list */
953 
955  }
956 }
957 
958 /*
959  * Handle STREAM COMMIT message.
 *
 * Replays every change spooled for the committed toplevel xid.  Each
 * on-disk record is an int length followed by that many bytes.  Each record
 * is copied into s2 and passed to apply_dispatch() in ApplyMessageContext,
 * which is reset after each record.  After the replay, the transaction is
 * committed via apply_handle_commit_internal() and table synchronization is
 * allowed to progress up to the commit's end LSN.
960  */
961 static void
963 {
964  TransactionId xid;
966  int nchanges;
967  char path[MAXPGPATH];
968  char *buffer = NULL;
969  bool found;
970  LogicalRepCommitData commit_data;
971  StreamXidHash *ent;
972  MemoryContext oldcxt;
973  BufFile *fd;
974 
976 
977  xid = logicalrep_read_stream_commit(s, &commit_data);
978 
979  elog(DEBUG1, "received commit for streamed transaction %u", xid);
980 
982 
983  /*
984  * Allocate file handle and memory required to process all the messages in
985  * TopTransactionContext to avoid them getting reset after each message is
986  * processed.
987  */
989 
990  /* open the spool file for the committed transaction */
992  elog(DEBUG1, "replaying changes from file \"%s\"", path);
993  ent = (StreamXidHash *) hash_search(xidhash,
994  (void *) &xid,
995  HASH_FIND,
996  &found);
997  Assert(found);
998  fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
999 
1000  buffer = palloc(BLCKSZ);
1001  initStringInfo(&s2);
1002 
1003  MemoryContextSwitchTo(oldcxt);
1004 
1005  remote_final_lsn = commit_data.commit_lsn;
1006 
1007  /*
1008  * Make sure the handle apply_dispatch methods are aware we're in a remote
1009  * transaction.
1010  */
1011  in_remote_transaction = true;
1013 
1014  /*
1015  * Read the entries one by one and pass them through the same logic as in
1016  * apply_dispatch.
1017  */
1018  nchanges = 0;
1019  while (true)
1020  {
1021  int nbytes;
1022  int len;
1023 
1025 
1026  /* read length of the on-disk record */
1027  nbytes = BufFileRead(fd, &len, sizeof(len));
1028 
1029  /* have we reached end of the file? */
1030  if (nbytes == 0)
1031  break;
1032 
1033  /* do we have a correct length? */
	/*
	 * NOTE(review): on a short read (0 < nbytes < sizeof(len)), and likewise
	 * for the payload read below, errno is not necessarily set, so %m may
	 * report an unrelated error — confirm against BufFileRead's contract.
	 */
1034  if (nbytes != sizeof(len))
1035  ereport(ERROR,
1037  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1038  path)));
1039 
1040  Assert(len > 0);
1041 
1042  /* make sure we have sufficiently large buffer */
	/*
	 * NOTE(review): this reallocates to exactly len bytes for every record,
	 * even when the current buffer is already large enough.
	 */
1043  buffer = repalloc(buffer, len);
1044 
1045  /* and finally read the data into the buffer */
1046  if (BufFileRead(fd, buffer, len) != len)
1047  ereport(ERROR,
1049  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1050  path)));
1051 
1052  /* copy the buffer to the stringinfo and call apply_dispatch */
1053  resetStringInfo(&s2);
1054  appendBinaryStringInfo(&s2, buffer, len);
1055 
1056  /* Ensure we are reading the data into our memory context. */
1057  oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
1058 
1059  apply_dispatch(&s2);
1060 
1061  MemoryContextReset(ApplyMessageContext);
1062 
1063  MemoryContextSwitchTo(oldcxt);
1064 
1065  nchanges++;
1066 
	/*
	 * NOTE(review): this progress message quotes the path with single
	 * quotes, while the other messages in this function use double quotes.
	 */
1067  if (nchanges % 1000 == 0)
1068  elog(DEBUG1, "replayed %d changes from file '%s'",
1069  nchanges, path);
1070  }
1071 
1072  BufFileClose(fd);
1073 
1074  pfree(buffer);
1075  pfree(s2.data);
1076 
1077  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
1078  nchanges, path);
1079 
1080  apply_handle_commit_internal(s, &commit_data);
1081 
1082  /* unlink the files with serialized changes and subxact info */
1084 
1085  /* Process any tables that are being synchronized in parallel. */
1086  process_syncing_tables(commit_data.end_lsn);
1087 
1089 }
1090 
1091 /*
1092  * Helper function for apply_handle_commit and apply_handle_stream_commit.
 *
 * If a local transaction is open, the following happens:
 *  - the replication origin LSN is set to the commit's end LSN;
 *  - statistics are reported;
 *  - the end LSN is recorded as a flush position.
 * Otherwise only accumulated invalidation messages are processed.  In both
 * cases in_remote_transaction is cleared.
1093  */
1094 static void
1096 {
1097  if (IsTransactionState())
1098  {
1099  /*
1100  * Update origin state so we can restart streaming from correct
1101  * position in case of crash.
1102  */
1103  replorigin_session_origin_lsn = commit_data->end_lsn;
1105 
1107  pgstat_report_stat(false);
1108 
1109  store_flush_position(commit_data->end_lsn);
1110  }
1111  else
1112  {
1113  /* Process any invalidation messages that might have accumulated. */
1116  }
1117 
1118  in_remote_transaction = false;
1119 }
1120 
1121 /*
1122  * Handle RELATION message.
1123  *
1124  * Note we don't do validation against local schema here. The validation
1125  * against local schema is postponed until first change for given relation
1126  * comes as we only care about it when applying changes for it anyway and we
1127  * do less locking this way.
 *
 * NOTE(review): the early-return condition (line 1134) and the use of the
 * parsed relation (line 1138) are not visible in this excerpt; presumably
 * the early return covers messages spooled by streaming mode — confirm.
1128  */
1129 static void
1131 {
1132  LogicalRepRelation *rel;
1133 
1135  return;
1136 
1137  rel = logicalrep_read_rel(s);
1139 }
1140 
1141 /*
1142  * Handle TYPE message.
1143  *
1144  * Note we don't do local mapping here, that's done when the type is
1145  * actually used.
 *
 * NOTE(review): the early-return condition (line 1152) and what is done
 * with the parsed type (line 1156) are not visible in this excerpt —
 * confirm against the full source.
1146  */
1147 static void
1149 {
1150  LogicalRepTyp typ;
1151 
1153  return;
1154 
1155  logicalrep_read_typ(s, &typ);
1157 }
1158 
1159 /*
1160  * Get the relation's replica identity index or, if none is defined, its
 * primary key index.  The replica identity index takes precedence.
1161  *
1162  * If neither is defined, returns InvalidOid.
1163  */
1164 static Oid
1166 {
1167  Oid idxoid;
1168 
1169  idxoid = RelationGetReplicaIndex(rel);
1170 
 /* Fall back to the primary key only when there is no replica index. */
1171  if (!OidIsValid(idxoid))
1172  idxoid = RelationGetPrimaryKeyIndex(rel);
1173 
1174  return idxoid;
1175 }
1176 
1177 /*
1178  * Handle INSERT message.
 *
 * Reads the new tuple with logicalrep_read_insert() and opens the target
 * relation.  If should_apply_changes_for_rel() rejects the relation, returns
 * early.  Otherwise builds an executor state, stores the remote tuple in a
 * virtual slot (completed by slot_fill_defaults()), and inserts it, via
 * apply_handle_tuple_routing() for a partitioned table or
 * apply_handle_insert_internal() otherwise.
1179  */
1180 
1181 static void
1183 {
1184  ResultRelInfo *resultRelInfo;
1185  LogicalRepRelMapEntry *rel;
1186  LogicalRepTupleData newtup;
1187  LogicalRepRelId relid;
1188  EState *estate;
1189  TupleTableSlot *remoteslot;
1190  MemoryContext oldctx;
1191 
1193  return;
1194 
1196 
1197  relid = logicalrep_read_insert(s, &newtup);
1198  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1199  if (!should_apply_changes_for_rel(rel))
1200  {
1201  /*
1202  * The relation can't become interesting in the middle of the
1203  * transaction so it's safe to unlock it.
1204  */
1206  return;
1207  }
1208 
1209  /* Initialize the executor state. */
1210  estate = create_estate_for_relation(rel, &resultRelInfo);
1211  remoteslot = ExecInitExtraTupleSlot(estate,
1212  RelationGetDescr(rel->localrel),
1213  &TTSOpsVirtual);
1214 
1215  /* Input functions may need an active snapshot, so get one */
1217 
1218  /* Process and store remote tuple in the slot */
1220  slot_store_data(remoteslot, rel, &newtup);
1221  slot_fill_defaults(rel, estate, remoteslot);
1222  MemoryContextSwitchTo(oldctx);
1223 
1224  /* For a partitioned table, insert the tuple into a partition. */
1225  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1226  apply_handle_tuple_routing(resultRelInfo, estate,
1227  remoteslot, NULL, rel, CMD_INSERT);
1228  else
1229  apply_handle_insert_internal(resultRelInfo, estate,
1230  remoteslot);
1231 
1233 
1234  finish_estate(estate);
1235 
1237 
1239 }
1240 
1241 /*
 * Workhorse for apply_handle_insert(); apply_handle_tuple_routing() also
 * uses it to insert into the chosen partition.
 *
 * Opens the indexes of 'relinfo', inserts 'remoteslot' with
 * ExecSimpleRelationInsert(), then closes the indexes again.
 */
1242 static void
1244  EState *estate, TupleTableSlot *remoteslot)
1245 {
1246  ExecOpenIndices(relinfo, false);
1247 
1248  /* Do the insert. */
1249  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1250 
1251  /* Cleanup. */
1252  ExecCloseIndices(relinfo);
1253 }
1254 
1255 /*
1256  * Check if the logical replication relation is updatable and throw an
1257  * appropriate error if it isn't.
 *
 * Returns normally only when rel->updatable is set; otherwise it always
 * raises an ERROR (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE) naming the
 * remote relation.
1258  */
1259 static void
1261 {
1262  /* Updatable, no error. */
1263  if (rel->updatable)
1264  return;
1265 
1266  /*
1267  * We are in error mode so it's fine this is somewhat slow. It's better to
1268  * give the user the correct error.
1269  */
1271  {
1272  ereport(ERROR,
1273  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1274  errmsg("publisher did not send replica identity column "
1275  "expected by the logical replication target relation \"%s.%s\"",
1276  rel->remoterel.nspname, rel->remoterel.relname)));
1277  }
1278 
1279  ereport(ERROR,
1280  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1281  errmsg("logical replication target relation \"%s.%s\" has "
1282  "neither REPLICA IDENTITY index nor PRIMARY "
1283  "KEY and published relation does not have "
1284  "REPLICA IDENTITY FULL",
1285  rel->remoterel.nspname, rel->remoterel.relname)));
1286 }
1287 
1288 /*
1289  * Handle UPDATE message.
 *
 * Reads the old tuple (if the publisher sent one) and the new tuple with
 * logicalrep_read_update().  Relations rejected by
 * should_apply_changes_for_rel() are skipped.  The search tuple is built from
 * the old tuple when present, otherwise from the new tuple, and the update
 * is applied via apply_handle_tuple_routing() for a partitioned table or
 * apply_handle_update_internal() otherwise.
1290  *
1291  * TODO: FDW support
1292  */
1293 static void
1295 {
1296  ResultRelInfo *resultRelInfo;
1297  LogicalRepRelMapEntry *rel;
1298  LogicalRepRelId relid;
1299  EState *estate;
1300  LogicalRepTupleData oldtup;
1301  LogicalRepTupleData newtup;
1302  bool has_oldtup;
1303  TupleTableSlot *remoteslot;
1304  RangeTblEntry *target_rte;
1305  MemoryContext oldctx;
1306 
1308  return;
1309 
1311 
1312  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1313  &newtup);
1314  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1315  if (!should_apply_changes_for_rel(rel))
1316  {
1317  /*
1318  * The relation can't become interesting in the middle of the
1319  * transaction so it's safe to unlock it.
1320  */
1322  return;
1323  }
1324 
1325  /* Check if we can do the update. */
1327 
1328  /* Initialize the executor state. */
1329  estate = create_estate_for_relation(rel, &resultRelInfo);
1330  remoteslot = ExecInitExtraTupleSlot(estate,
1331  RelationGetDescr(rel->localrel),
1332  &TTSOpsVirtual);
1333 
1334  /*
1335  * Populate updatedCols so that per-column triggers can fire, and so
1336  * executor can correctly pass down indexUnchanged hint. This could
1337  * include more columns than were actually changed on the publisher
1338  * because the logical replication protocol doesn't contain that
1339  * information. But it would for example exclude columns that only exist
1340  * on the subscriber, since we are not touching those.
1341  */
1342  target_rte = list_nth(estate->es_range_table, 0);
1343  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1344  {
1346  int remoteattnum = rel->attrmap->attnums[i];
1347 
 /* Skip dropped local columns and columns with no remote counterpart. */
1348  if (!att->attisdropped && remoteattnum >= 0)
1349  {
1350  Assert(remoteattnum < newtup.ncols);
1351  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1352  target_rte->updatedCols =
1353  bms_add_member(target_rte->updatedCols,
1355  }
1356  }
1357 
1358  /* Also populate extraUpdatedCols, in case we have generated columns */
1359  fill_extraUpdatedCols(target_rte, rel->localrel);
1360 
1362 
1363  /* Build the search tuple: the old tuple if it was sent, else the new one. */
1365  slot_store_data(remoteslot, rel,
1366  has_oldtup ? &oldtup : &newtup);
1367  MemoryContextSwitchTo(oldctx);
1368 
1369  /* For a partitioned table, apply update to correct partition. */
1370  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1371  apply_handle_tuple_routing(resultRelInfo, estate,
1372  remoteslot, &newtup, rel, CMD_UPDATE);
1373  else
1374  apply_handle_update_internal(resultRelInfo, estate,
1375  remoteslot, &newtup, rel);
1376 
1378 
1379  finish_estate(estate);
1380 
1382 
1384 }
1385 
1386 /*
 * Workhorse for apply_handle_update().
 *
 * Looks up the local row matching the search tuple in 'remoteslot' with
 * FindReplTupleInLocalRel().  If found, 'remoteslot' is reused to hold the
 * updated row (local row with 'newtup' applied by slot_modify_data()) and
 * ExecSimpleRelationUpdate() performs the update.  If not found, the change
 * is skipped with a DEBUG1 message.
 */
1387 static void
1389  EState *estate, TupleTableSlot *remoteslot,
1390  LogicalRepTupleData *newtup,
1391  LogicalRepRelMapEntry *relmapentry)
1392 {
1393  Relation localrel = relinfo->ri_RelationDesc;
1394  EPQState epqstate;
1395  TupleTableSlot *localslot;
1396  bool found;
1397  MemoryContext oldctx;
1398 
1399  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1400  ExecOpenIndices(relinfo, false);
1401 
1402  found = FindReplTupleInLocalRel(estate, localrel,
1403  &relmapentry->remoterel,
1404  remoteslot, &localslot);
 /* The search tuple is no longer needed; the slot is refilled below. */
1405  ExecClearTuple(remoteslot);
1406 
1407  /*
1408  * Tuple found.
1409  *
1410  * Note this will fail if there are other conflicting unique indexes.
1411  */
1412  if (found)
1413  {
1414  /* Process and store remote tuple in the slot */
1416  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1417  MemoryContextSwitchTo(oldctx);
1418 
1419  EvalPlanQualSetSlot(&epqstate, remoteslot);
1420 
1421  /* Do the actual update. */
1422  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
1423  remoteslot);
1424  }
1425  else
1426  {
1427  /*
1428  * The tuple to be updated could not be found.
1429  *
1430  * TODO what to do here, change the log level to LOG perhaps?
1431  */
1432  elog(DEBUG1,
1433  "logical replication did not find row for update "
1434  "in replication target relation \"%s\"",
1435  RelationGetRelationName(localrel));
1436  }
1437 
1438  /* Cleanup. */
1439  ExecCloseIndices(relinfo);
1440  EvalPlanQualEnd(&epqstate);
1441 }
1442 
1443 /*
1444  * Handle DELETE message.
 *
 * Reads the old (identity) tuple with logicalrep_read_delete().  Relations
 * rejected by should_apply_changes_for_rel() are skipped.  The old tuple is
 * stored in a virtual slot as the search tuple, and the delete is applied via
 * apply_handle_tuple_routing() for a partitioned table or
 * apply_handle_delete_internal() otherwise.
1445  *
1446  * TODO: FDW support
1447  */
1448 static void
1450 {
1451  ResultRelInfo *resultRelInfo;
1452  LogicalRepRelMapEntry *rel;
1453  LogicalRepTupleData oldtup;
1454  LogicalRepRelId relid;
1455  EState *estate;
1456  TupleTableSlot *remoteslot;
1457  MemoryContext oldctx;
1458 
1460  return;
1461 
1463 
1464  relid = logicalrep_read_delete(s, &oldtup);
1465  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1466  if (!should_apply_changes_for_rel(rel))
1467  {
1468  /*
1469  * The relation can't become interesting in the middle of the
1470  * transaction so it's safe to unlock it.
1471  */
1473  return;
1474  }
1475 
1476  /* Check if we can do the delete. */
1478 
1479  /* Initialize the executor state. */
1480  estate = create_estate_for_relation(rel, &resultRelInfo);
1481  remoteslot = ExecInitExtraTupleSlot(estate,
1482  RelationGetDescr(rel->localrel),
1483  &TTSOpsVirtual);
1484 
1486 
1487  /* Build the search tuple. */
1489  slot_store_data(remoteslot, rel, &oldtup);
1490  MemoryContextSwitchTo(oldctx);
1491 
1492  /* For a partitioned table, apply delete to correct partition. */
1493  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1494  apply_handle_tuple_routing(resultRelInfo, estate,
1495  remoteslot, NULL, rel, CMD_DELETE);
1496  else
1497  apply_handle_delete_internal(resultRelInfo, estate,
1498  remoteslot, &rel->remoterel);
1499 
1501 
1502  finish_estate(estate);
1503 
1505 
1507 }
1508 
1509 /*
 * Workhorse for apply_handle_delete(); apply_handle_tuple_routing() also
 * uses it, both for deletes on a partition and to remove the old row when
 * an update moves a row to another partition.
 *
 * Looks up the local row matching 'remoteslot' with FindReplTupleInLocalRel()
 * and deletes it with ExecSimpleRelationDelete().  A missing row is only
 * reported at DEBUG1.
 */
1510 static void
1512  TupleTableSlot *remoteslot,
1513  LogicalRepRelation *remoterel)
1514 {
1515  Relation localrel = relinfo->ri_RelationDesc;
1516  EPQState epqstate;
1517  TupleTableSlot *localslot;
1518  bool found;
1519 
1520  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1521  ExecOpenIndices(relinfo, false);
1522 
1523  found = FindReplTupleInLocalRel(estate, localrel, remoterel,
1524  remoteslot, &localslot);
1525 
1526  /* If found delete it. */
1527  if (found)
1528  {
1529  EvalPlanQualSetSlot(&epqstate, localslot);
1530 
1531  /* Do the actual delete. */
1532  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
1533  }
1534  else
1535  {
1536  /* The tuple to be deleted could not be found. */
1537  elog(DEBUG1,
1538  "logical replication did not find row for delete "
1539  "in replication target relation \"%s\"",
1540  RelationGetRelationName(localrel));
1541  }
1542 
1543  /* Cleanup. */
1544  ExecCloseIndices(relinfo);
1545  EvalPlanQualEnd(&epqstate);
1546 }
1547 
1548 /*
1549  * Try to find a tuple received from the publication side (in 'remoteslot') in
1550  * the corresponding local relation using either replica identity index,
1551  * primary key or if needed, sequential scan.
1552  *
1553  * Local tuple, if found, is returned in '*localslot'.
 *
 * Returns true if a matching tuple was found.  A slot for the local tuple is
 * always created in estate->es_tupleTable and returned through '*localslot',
 * even when no match is found.  Without a replica identity or primary key
 * index, the publisher is expected to use REPLICA IDENTITY FULL (asserted).
1554  */
1555 static bool
1557  LogicalRepRelation *remoterel,
1558  TupleTableSlot *remoteslot,
1559  TupleTableSlot **localslot)
1560 {
1561  Oid idxoid;
1562  bool found;
1563 
1564  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
1565 
1566  idxoid = GetRelationIdentityOrPK(localrel);
1567  Assert(OidIsValid(idxoid) ||
1568  (remoterel->replident == REPLICA_IDENTITY_FULL));
1569 
 /* Prefer an index lookup; fall back to a sequential scan. */
1570  if (OidIsValid(idxoid))
1571  found = RelationFindReplTupleByIndex(localrel, idxoid,
1573  remoteslot, *localslot);
1574  else
1575  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
1576  remoteslot, *localslot);
1577 
1578  return found;
1579 }
1580 
1581 /*
1582  * This handles insert, update, delete on a partitioned table.
 *
 * 'remoteslot' holds the search tuple in the partitioned (parent) table's
 * rowtype.  The target partition is found with ExecFindPartition(), the
 * tuple is converted (or copied) into the partition's rowtype, and the
 * operation is applied there.  For CMD_UPDATE, 'newtup' carries the new
 * column values.  If the updated row no longer satisfies the partition
 * constraint, it is moved: the old row is deleted from its partition and
 * the new row is inserted into the partition chosen by re-routing through
 * the parent.  If no matching row exists in the partition, the update is
 * skipped with a DEBUG1 message.  The tuple routing state is cleaned up
 * before returning in every case.
1583  */
1584 static void
1586  EState *estate,
1587  TupleTableSlot *remoteslot,
1588  LogicalRepTupleData *newtup,
1589  LogicalRepRelMapEntry *relmapentry,
1590  CmdType operation)
1591 {
1592  Relation parentrel = relinfo->ri_RelationDesc;
1593  ModifyTableState *mtstate = NULL;
1594  PartitionTupleRouting *proute = NULL;
1595  ResultRelInfo *partrelinfo;
1596  Relation partrel;
1597  TupleTableSlot *remoteslot_part;
1598  TupleConversionMap *map;
1599  MemoryContext oldctx;
1600 
1601  /* ModifyTableState is needed for ExecFindPartition(). */
1602  mtstate = makeNode(ModifyTableState);
1603  mtstate->ps.plan = NULL;
1604  mtstate->ps.state = estate;
1605  mtstate->operation = operation;
1606  mtstate->resultRelInfo = relinfo;
1607  proute = ExecSetupPartitionTupleRouting(estate, parentrel);
1608 
1609  /*
1610  * Find the partition to which the "search tuple" belongs.
1611  */
1612  Assert(remoteslot != NULL);
1614  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
1615  remoteslot, estate);
1616  Assert(partrelinfo != NULL);
1617  partrel = partrelinfo->ri_RelationDesc;
1618 
1619  /*
1620  * To perform any of the operations below, the tuple must match the
1621  * partition's rowtype. Convert if needed or just copy, using a dedicated
1622  * slot to store the tuple in any case.
1623  */
1624  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
1625  if (remoteslot_part == NULL)
1626  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
1627  map = partrelinfo->ri_RootToPartitionMap;
1628  if (map != NULL)
1629  remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
1630  remoteslot_part);
1631  else
1632  {
1633  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
1634  slot_getallattrs(remoteslot_part);
1635  }
1636  MemoryContextSwitchTo(oldctx);
1637 
1638  switch (operation)
1639  {
1640  case CMD_INSERT:
1641  apply_handle_insert_internal(partrelinfo, estate,
1642  remoteslot_part);
1643  break;
1644 
1645  case CMD_DELETE:
1646  apply_handle_delete_internal(partrelinfo, estate,
1647  remoteslot_part,
1648  &relmapentry->remoterel);
1649  break;
1650 
1651  case CMD_UPDATE:
1652 
1653  /*
1654  * For UPDATE, depending on whether or not the updated tuple
1655  * satisfies the partition's constraint, perform a simple UPDATE
1656  * of the partition or move the updated tuple into a different
1657  * suitable partition.
1658  */
1659  {
1660  AttrMap *attrmap = map ? map->attrMap : NULL;
1661  LogicalRepRelMapEntry *part_entry;
1662  TupleTableSlot *localslot;
1663  ResultRelInfo *partrelinfo_new;
1664  bool found;
1665 
1666  part_entry = logicalrep_partition_open(relmapentry, partrel,
1667  attrmap);
1668 
1669  /* Get the matching local tuple from the partition. */
1670  found = FindReplTupleInLocalRel(estate, partrel,
1671  &part_entry->remoterel,
1672  remoteslot_part, &localslot);
1673 
1675  if (found)
1676  {
1677  /* Apply the update. */
1678  slot_modify_data(remoteslot_part, localslot,
1679  part_entry,
1680  newtup);
1681  MemoryContextSwitchTo(oldctx);
1682  }
1683  else
1684  {
1685  /*
1686  * The tuple to be updated could not be found.
1687  *
1688  * TODO what to do here, change the log level to LOG
1689  * perhaps?
1690  */
1691  elog(DEBUG1,
1692  "logical replication did not find row for update "
1693  "in replication target relation \"%s\"",
1694  RelationGetRelationName(partrel));
 /* Restore the memory context, as the found branch does. */
 MemoryContextSwitchTo(oldctx);

 /*
  * Nothing to update.  Leave the switch instead of falling
  * through: the code below would otherwise UPDATE (or move
  * to another partition) using an empty 'localslot'.
  * Breaking out of the switch still runs
  * ExecCleanupTupleRouting() at the end of this function.
  */
 break;
1695  }
1696 
1697  /*
1698  * Does the updated tuple still satisfy the current
1699  * partition's constraint?
1700  */
1701  if (!partrel->rd_rel->relispartition ||
1702  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
1703  false))
1704  {
1705  /*
1706  * Yes, so simply UPDATE the partition. We don't call
1707  * apply_handle_update_internal() here, which would
1708  * normally do the following work, to avoid repeating some
1709  * work already done above to find the local tuple in the
1710  * partition.
1711  */
1712  EPQState epqstate;
1713 
1714  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1715  ExecOpenIndices(partrelinfo, false);
1716 
1717  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
1718  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
1719  localslot, remoteslot_part);
1720  ExecCloseIndices(partrelinfo);
1721  EvalPlanQualEnd(&epqstate);
1722  }
1723  else
1724  {
1725  /* Move the tuple into the new partition. */
1726 
1727  /*
1728  * New partition will be found using tuple routing, which
1729  * can only occur via the parent table. We might need to
1730  * convert the tuple to the parent's rowtype. Note that
1731  * this is the tuple found in the partition, not the
1732  * original search tuple received by this function.
1733  */
1734  if (map)
1735  {
1736  TupleConversionMap *PartitionToRootMap =
1738  RelationGetDescr(parentrel));
1739 
1740  remoteslot =
1741  execute_attr_map_slot(PartitionToRootMap->attrMap,
1742  remoteslot_part, remoteslot);
1743  }
1744  else
1745  {
1746  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
1747  slot_getallattrs(remoteslot);
1748  }
1749 
1750 
1751  /* Find the new partition. */
1753  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
1754  proute, remoteslot,
1755  estate);
1756  MemoryContextSwitchTo(oldctx);
1757  Assert(partrelinfo_new != partrelinfo);
1758 
1759  /* DELETE old tuple found in the old partition. */
1760  apply_handle_delete_internal(partrelinfo, estate,
1761  localslot,
1762  &relmapentry->remoterel);
1763 
1764  /* INSERT new tuple into the new partition. */
1765 
1766  /*
1767  * Convert the replacement tuple to match the destination
1768  * partition rowtype.
1769  */
1771  partrel = partrelinfo_new->ri_RelationDesc;
1772  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
1773  if (remoteslot_part == NULL)
1774  remoteslot_part = table_slot_create(partrel,
1775  &estate->es_tupleTable);
1776  map = partrelinfo_new->ri_RootToPartitionMap;
1777  if (map != NULL)
1778  {
1779  remoteslot_part = execute_attr_map_slot(map->attrMap,
1780  remoteslot,
1781  remoteslot_part);
1782  }
1783  else
1784  {
1785  remoteslot_part = ExecCopySlot(remoteslot_part,
1786  remoteslot);
 /* Deform the freshly copied slot, as in the first conversion above. */
1787  slot_getallattrs(remoteslot_part);
1788  }
1789  MemoryContextSwitchTo(oldctx);
1790  apply_handle_insert_internal(partrelinfo_new, estate,
1791  remoteslot_part);
1792  }
1793  }
1794  break;
1795 
1796  default:
1797  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
1798  break;
1799  }
1800 
1801  ExecCleanupTupleRouting(mtstate, proute);
1802 }
1803 
1804 /*
1805  * Handle TRUNCATE message.
 *
 * Collects every relation named in the message that this worker should
 * apply changes to, plus, for partitioned tables, all of their partitions
 * (skipping temp tables of other backends).  All of them are truncated with
 * a single ExecuteTruncateGuts() call using DROP_RESTRICT.  The 'cascade'
 * flag read from the message is deliberately not used.  'restart_seqs' is
 * passed through.
1806  *
1807  * TODO: FDW support
1808  */
1809 static void
1811 {
1812  bool cascade = false;
1813  bool restart_seqs = false;
1814  List *remote_relids = NIL;
1815  List *remote_rels = NIL;
1816  List *rels = NIL;
1817  List *part_rels = NIL;
1818  List *relids = NIL;
1819  List *relids_extra = NIL;
1820  List *relids_logged = NIL;
1821  ListCell *lc;
1822 
1824  return;
1825 
1827 
1828  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
1829 
1830  foreach(lc, remote_relids)
1831  {
1832  LogicalRepRelId relid = lfirst_oid(lc);
1833  LogicalRepRelMapEntry *rel;
1834 
1835  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1836  if (!should_apply_changes_for_rel(rel))
1837  {
1838  /*
1839  * The relation can't become interesting in the middle of the
1840  * transaction so it's safe to unlock it.
1841  */
1843  continue;
1844  }
1845 
1846  remote_rels = lappend(remote_rels, rel);
1847  rels = lappend(rels, rel->localrel);
1848  relids = lappend_oid(relids, rel->localreloid);
1849  relids_extra = lappend_int(relids_extra, TRUNCATE_REL_CONTEXT_NORMAL);
1851  relids_logged = lappend_oid(relids_logged, rel->localreloid);
1852 
1853  /*
1854  * Truncate partitions if we got a message to truncate a partitioned
1855  * table.
1856  */
1857  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1858  {
1859  ListCell *child;
1860  List *children = find_all_inheritors(rel->localreloid,
1862  NULL);
1863 
1864  foreach(child, children)
1865  {
1866  Oid childrelid = lfirst_oid(child);
1867  Relation childrel;
1868 
 /* Skip relations already queued (including the parent itself). */
1869  if (list_member_oid(relids, childrelid))
1870  continue;
1871 
1872  /* find_all_inheritors already got lock */
1873  childrel = table_open(childrelid, NoLock);
1874 
1875  /*
1876  * Ignore temp tables of other backends. See similar code in
1877  * ExecuteTruncate().
1878  */
1879  if (RELATION_IS_OTHER_TEMP(childrel))
1880  {
1881  table_close(childrel, RowExclusiveLock);
1882  continue;
1883  }
1884 
1885  rels = lappend(rels, childrel);
1886  part_rels = lappend(part_rels, childrel);
1887  relids = lappend_oid(relids, childrelid);
1888  relids_extra = lappend_int(relids_extra, TRUNCATE_REL_CONTEXT_CASCADING);
1889  /* Log this relation only if needed for logical decoding */
1890  if (RelationIsLogicallyLogged(childrel))
1891  relids_logged = lappend_oid(relids_logged, childrelid);
1892  }
1893  }
1894  }
1895 
1896  /*
1897  * Even if we used CASCADE on the upstream primary we explicitly default
1898  * to replaying changes without further cascading. This might later be
1899  * made changeable with a user-specified option.
1900  */
1901  ExecuteTruncateGuts(rels,
1902  relids,
1903  relids_extra,
1904  relids_logged,
1905  DROP_RESTRICT,
1906  restart_seqs);
1907  foreach(lc, remote_rels)
1908  {
1909  LogicalRepRelMapEntry *rel = lfirst(lc);
1910 
1912  }
 /* Partitions opened above are closed with NoLock. */
1913  foreach(lc, part_rels)
1914  {
1915  Relation rel = lfirst(lc);
1916 
1917  table_close(rel, NoLock);
1918  }
1919 
1921 }
1922 
1923 
1924 /*
1925  * Logical replication protocol message dispatcher.
 *
 * Routes one protocol message in 's' to the handler for its message type.
 * Every recognized type returns from inside the switch; reaching the code
 * after it means the type is unknown, and an ERROR with
 * ERRCODE_PROTOCOL_VIOLATION is raised.
1926  */
1927 static void
1929 {
1931 
1932  switch (action)
1933  {
1934  case LOGICAL_REP_MSG_BEGIN:
1935  apply_handle_begin(s);
1936  return;
1937 
1940  return;
1941 
1944  return;
1945 
1948  return;
1949 
1952  return;
1953 
1956  return;
1957 
1960  return;
1961 
1962  case LOGICAL_REP_MSG_TYPE:
1963  apply_handle_type(s);
1964  return;
1965 
1968  return;
1969 
1971 
1972  /*
1973  * Logical replication does not use generic logical messages yet,
1974  * although other applications that use this output plugin might
1975  * send them.
1976  */
1977  return;
1978 
1981  return;
1982 
1985  return;
1986 
1989  return;
1990 
1993  return;
1994  }
1995 
1996  ereport(ERROR,
1997  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1998  errmsg("invalid logical replication message type \"%c\"", action)));
1999 }
2000 
2001 /*
2002  * Figure out which write/flush positions to report to the walsender process.
2003  *
2004  * We can't simply report back the last LSN the walsender sent us because the
2005  * local transaction might not yet be flushed to disk locally. Instead we
2006  * build a list that associates local with remote LSNs for every commit. When
2007  * reporting back the flush position to the sender we iterate that list and
2008  * check which entries on it are already locally flushed. Those we can report
2009  * as having been flushed.
2010  *
 * Entries that are already flushed locally are removed from lsn_mapping and
 * freed.  *write and *flush stay InvalidXLogRecPtr when the list is empty;
 * *flush also stays invalid when no entry has been flushed yet.
 *
2011  * *have_pending_txes is set to true if there are outstanding transactions
2012  * that still need to be flushed.
2013  */
2014 static void
2016  bool *have_pending_txes)
2017 {
2018  dlist_mutable_iter iter;
2019  XLogRecPtr local_flush = GetFlushRecPtr();
2020 
2021  *write = InvalidXLogRecPtr;
2022  *flush = InvalidXLogRecPtr;
2023 
2024  dlist_foreach_modify(iter, &lsn_mapping)
2025  {
2026  FlushPosition *pos =
2028 
2029  *write = pos->remote_end;
2030 
2031  if (pos->local_end <= local_flush)
2032  {
2033  *flush = pos->remote_end;
2034  dlist_delete(iter.cur);
2035  pfree(pos);
2036  }
2037  else
2038  {
2039  /*
2040  * Don't want to uselessly iterate over the rest of the list which
2041  * could potentially be long. Instead get the last element and
2042  * grab the write position from there.
2043  */
2045  &lsn_mapping);
2046  *write = pos->remote_end;
2047  *have_pending_txes = true;
2048  return;
2049  }
2050  }
2051 
2052  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
2053 }
2054 
2055 /*
2056  * Store current remote/local lsn pair in the tracking list.
 *
 * Appends an entry pairing 'remote_lsn' with XactLastCommitEnd to
 * lsn_mapping; get_flush_position() later consumes these entries.  The entry
 * is allocated in ApplyContext so it outlives the per-message context.
 * NOTE(review): on exit the current context is always set to
 * ApplyMessageContext, not to whatever context the caller was using.
2057  */
2058 static void
2060 {
2061  FlushPosition *flushpos;
2062 
2063  /* Need to do this in permanent context */
2064  MemoryContextSwitchTo(ApplyContext);
2065 
2066  /* Track commit lsn */
2067  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2068  flushpos->local_end = XactLastCommitEnd;
2069  flushpos->remote_end = remote_lsn;
2070 
2071  dlist_push_tail(&lsn_mapping, &flushpos->node);
2072  MemoryContextSwitchTo(ApplyMessageContext);
2073 }
2074 
2075 
2076 /*
 * Update statistics of the worker.
 *
 * Records 'last_lsn' and 'send_time' in MyLogicalRepWorker as the last
 * received position and send time.  When 'reply' is true they are also
 * recorded as the reply position and time.
 */
2077 static void
2078 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
2079 {
2080  MyLogicalRepWorker->last_lsn = last_lsn;
2081  MyLogicalRepWorker->last_send_time = send_time;
2083  if (reply)
2084  {
2085  MyLogicalRepWorker->reply_lsn = last_lsn;
2086  MyLogicalRepWorker->reply_time = send_time;
2087  }
2088 }
2089 
2090 /*
2091  * Apply main loop.
 *
 * Repeatedly drains all data available from walrcv_receive().  'w' messages
 * advance last_received and are passed to apply_dispatch().  'k' messages
 * advance last_received and are answered with send_feedback().  After each
 * batch it sends feedback and then waits on the latch/socket.  If nothing has
 * been received for wal_receiver_timeout it errors out; after half that time
 * it asks the server for a reply.  The loop ends when the publisher's data
 * stream ends, after which walrcv_endstreaming() is called.
2092  */
2093 static void
2095 {
2096  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2097  bool ping_sent = false;
2098  TimeLineID tli;
2099 
2100  /*
2101  * Init the ApplyMessageContext which we clean up after each replication
2102  * protocol message.
2103  */
2104  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
2105  "ApplyMessageContext",
2107 
2108  /*
2109  * This memory context is used for per-stream data when the streaming mode
2110  * is enabled. This context is reset on each stream stop.
2111  */
2112  LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
2113  "LogicalStreamingContext",
2115 
2116  /* mark as idle, before starting to loop */
2118 
2119  /* This outer loop iterates once per wait. */
2120  for (;;)
2121  {
2123  int rc;
2124  int len;
2125  char *buf = NULL;
2126  bool endofstream = false;
2127  long wait_time;
2128 
2130 
2131  MemoryContextSwitchTo(ApplyMessageContext);
2132 
2133  len = walrcv_receive(wrconn, &buf, &fd);
2134 
2135  if (len != 0)
2136  {
2137  /* Loop to process all available data (without blocking). */
2138  for (;;)
2139  {
2141 
2142  if (len == 0)
2143  {
2144  break;
2145  }
2146  else if (len < 0)
2147  {
2148  ereport(LOG,
2149  (errmsg("data stream from publisher has ended")));
2150  endofstream = true;
2151  break;
2152  }
2153  else
2154  {
2155  int c;
2156  StringInfoData s;
2157 
2158  /* Reset timeout. */
2159  last_recv_timestamp = GetCurrentTimestamp();
2160  ping_sent = false;
2161 
2162  /* Ensure we are reading the data into our memory context. */
2163  MemoryContextSwitchTo(ApplyMessageContext);
2164 
2165  s.data = buf;
2166  s.len = len;
2167  s.cursor = 0;
2168  s.maxlen = -1;
2169 
2170  c = pq_getmsgbyte(&s);
2171 
 /* 'w': start LSN, end LSN, send time, then the payload. */
2172  if (c == 'w')
2173  {
2174  XLogRecPtr start_lsn;
2175  XLogRecPtr end_lsn;
2176  TimestampTz send_time;
2177 
2178  start_lsn = pq_getmsgint64(&s);
2179  end_lsn = pq_getmsgint64(&s);
2180  send_time = pq_getmsgint64(&s);
2181 
2182  if (last_received < start_lsn)
2183  last_received = start_lsn;
2184 
2185  if (last_received < end_lsn)
2186  last_received = end_lsn;
2187 
2188  UpdateWorkerStats(last_received, send_time, false);
2189 
2190  apply_dispatch(&s);
2191  }
 /* 'k': end LSN, timestamp, reply-requested flag. */
2192  else if (c == 'k')
2193  {
2194  XLogRecPtr end_lsn;
2196  bool reply_requested;
2197 
2198  end_lsn = pq_getmsgint64(&s);
2199  timestamp = pq_getmsgint64(&s);
2200  reply_requested = pq_getmsgbyte(&s);
2201 
2202  if (last_received < end_lsn)
2203  last_received = end_lsn;
2204 
2205  send_feedback(last_received, reply_requested, false);
2206  UpdateWorkerStats(last_received, timestamp, true);
2207  }
2208  /* other message types are purposefully ignored */
2209 
2210  MemoryContextReset(ApplyMessageContext);
2211  }
2212 
2213  len = walrcv_receive(wrconn, &buf, &fd);
2214  }
2215  }
2216 
2217  /* confirm all writes so far */
2218  send_feedback(last_received, false, false);
2219 
2221  {
2222  /*
2223  * If we didn't get any transactions for a while there might be
2224  * unconsumed invalidation messages in the queue, consume them
2225  * now.
2226  */
2229 
2230  /* Process any table synchronization changes. */
2231  process_syncing_tables(last_received);
2232  }
2233 
2234  /* Cleanup the memory. */
2235  MemoryContextResetAndDeleteChildren(ApplyMessageContext);
2237 
2238  /* Check if we need to exit the streaming loop. */
2239  if (endofstream)
2240  break;
2241 
2242  /*
2243  * Wait for more data or latch. If we have unflushed transactions,
2244  * wake up after WalWriterDelay to see if they've been flushed yet (in
2245  * which case we should send a feedback message). Otherwise, there's
2246  * no particular urgency about waking up unless we get data or a
2247  * signal.
2248  */
2249  if (!dlist_is_empty(&lsn_mapping))
2250  wait_time = WalWriterDelay;
2251  else
2252  wait_time = NAPTIME_PER_CYCLE;
2253 
2257  fd, wait_time,
2259 
2260  if (rc & WL_LATCH_SET)
2261  {
2264  }
2265 
2266  if (ConfigReloadPending)
2267  {
2268  ConfigReloadPending = false;
2270  }
2271 
2272  if (rc & WL_TIMEOUT)
2273  {
2274  /*
2275  * We didn't receive anything new. If we haven't heard anything
2276  * from the server for more than wal_receiver_timeout / 2, ping
2277  * the server. Also, if it's been longer than
2278  * wal_receiver_status_interval since the last update we sent,
2279  * send a status update to the primary anyway, to report any
2280  * progress in applying WAL.
2281  */
2282  bool requestReply = false;
2283 
2284  /*
2285  * Check if time since last receive from primary has reached the
2286  * configured limit.
2287  */
2288  if (wal_receiver_timeout > 0)
2289  {
2291  TimestampTz timeout;
2292 
2293  timeout =
2294  TimestampTzPlusMilliseconds(last_recv_timestamp,
2296 
2297  if (now >= timeout)
2298  ereport(ERROR,
2299  (errmsg("terminating logical replication worker due to timeout")));
2300 
2301  /* Check to see if it's time for a ping. */
2302  if (!ping_sent)
2303  {
2304  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2305  (wal_receiver_timeout / 2));
2306  if (now >= timeout)
2307  {
2308  requestReply = true;
2309  ping_sent = true;
2310  }
2311  }
2312  }
2313 
2314  send_feedback(last_received, requestReply, requestReply);
2315  }
2316  }
2317 
2318  /* All done */
2319  walrcv_endstreaming(wrconn, &tli);
2320 }
2321 
2322 /*
2323  * Send a Standby Status Update message to server.
2324  *
2325  * 'recvpos' is the latest LSN we've received data up to.  'force' is set if
2326  * we need to send a response to avoid timeouts: it bypasses both the
 * wal_receiver_status_interval <= 0 check and the "nothing new to report"
 * check.  'requestReply' is sent as the message's replyRequested byte.
 *
 * Reported positions never move backwards: each is clamped to the last value
 * sent, which is kept in function-local static variables.  When there are
 * no pending transactions, recvpos is reported as both write and flush
 * position.
2327  */
2328 static void
2329 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
2330 {
2331  static StringInfo reply_message = NULL;
2332  static TimestampTz send_time = 0;
2333 
2334  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2335  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2336  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2337 
2338  XLogRecPtr writepos;
2339  XLogRecPtr flushpos;
2340  TimestampTz now;
2341  bool have_pending_txes;
2342 
2343  /*
2344  * If the user doesn't want status to be reported to the publisher, be
2345  * sure to exit before doing anything at all.
2346  */
2347  if (!force && wal_receiver_status_interval <= 0)
2348  return;
2349 
2350  /* It's legal to not pass a recvpos; never report one older than sent */
2351  if (recvpos < last_recvpos)
2352  recvpos = last_recvpos;
2353 
2354  get_flush_position(&writepos, &flushpos, &have_pending_txes);
2355 
2356  /*
2357  * No outstanding transactions to flush, we can report the latest received
2358  * position. This is important for synchronous replication.
2359  */
2360  if (!have_pending_txes)
2361  flushpos = writepos = recvpos;
2362 
2363  if (writepos < last_writepos)
2364  writepos = last_writepos;
2365 
2366  if (flushpos < last_flushpos)
2367  flushpos = last_flushpos;
2368 
2369  now = GetCurrentTimestamp();
2370 
2371  /* if we've already reported everything we're good */
2372  if (!force &&
2373  writepos == last_writepos &&
2374  flushpos == last_flushpos &&
2375  !TimestampDifferenceExceeds(send_time, now,
2377  return;
2378  send_time = now;
2379 
 /* The reply buffer lives in ApplyContext and is reused across calls. */
2380  if (!reply_message)
2381  {
2382  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
2383 
2384  reply_message = makeStringInfo();
2385  MemoryContextSwitchTo(oldctx);
2386  }
2387  else
2388  resetStringInfo(reply_message);
2389 
 /*
  * NOTE(review): positions are sent in the order recvpos, flushpos,
  * writepos, labelled write/flush/apply below.
  */
2390  pq_sendbyte(reply_message, 'r');
2391  pq_sendint64(reply_message, recvpos); /* write */
2392  pq_sendint64(reply_message, flushpos); /* flush */
2393  pq_sendint64(reply_message, writepos); /* apply */
2394  pq_sendint64(reply_message, now); /* sendTime */
2395  pq_sendbyte(reply_message, requestReply); /* replyRequested */
2396 
2397  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2398  force,
2399  LSN_FORMAT_ARGS(recvpos),
2400  LSN_FORMAT_ARGS(writepos),
2401  LSN_FORMAT_ARGS(flushpos));
2402 
2403  walrcv_send(wrconn, reply_message->data, reply_message->len);
2404 
2405  if (recvpos > last_recvpos)
2406  last_recvpos = recvpos;
2407  if (writepos > last_writepos)
2408  last_writepos = writepos;
2409  if (flushpos > last_flushpos)
2410  last_flushpos = flushpos;
2411 }
2412 
2413 /*
2414  * Reread subscription info if needed. Most changes will cause the worker to exit.
2415  */
2416 static void
2418 {
2419  MemoryContext oldctx;
2421  bool started_tx = false;
2422 
2423  /* When cache state is valid there is nothing to do here. */
2424  if (MySubscriptionValid)
2425  return;
2426 
2427  /* This function might be called inside or outside of transaction. */
2428  if (!IsTransactionState())
2429  {
2431  started_tx = true;
2432  }
2433 
2434  /* Ensure allocations in permanent context. */
2435  oldctx = MemoryContextSwitchTo(ApplyContext);
2436 
2437  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
2438 
2439  /*
2440  * Exit if the subscription was removed. This normally should not happen
2441  * as the worker gets killed during DROP SUBSCRIPTION.
2442  */
2443  if (!newsub)
2444  {
2445  ereport(LOG,
2446  (errmsg("logical replication apply worker for subscription \"%s\" will "
2447  "stop because the subscription was removed",
2448  MySubscription->name)));
2449 
2450  proc_exit(0);
2451  }
2452 
2453  /*
2454  * Exit if the subscription was disabled. This normally should not happen
2455  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
2456  */
2457  if (!newsub->enabled)
2458  {
2459  ereport(LOG,
2460  (errmsg("logical replication apply worker for subscription \"%s\" will "
2461  "stop because the subscription was disabled",
2462  MySubscription->name)));
2463 
2464  proc_exit(0);
2465  }
2466 
2467  /* !slotname should never happen when enabled is true. */
2468  Assert(newsub->slotname);
2469 
2470  /*
2471  * Exit if any parameter that affects the remote connection was changed.
2472  * The launcher will start a new worker.
2473  */
2474  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
2475  strcmp(newsub->name, MySubscription->name) != 0 ||
2476  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
2477  newsub->binary != MySubscription->binary ||
2478  newsub->stream != MySubscription->stream ||
2479  !equal(newsub->publications, MySubscription->publications))
2480  {
2481  ereport(LOG,
2482  (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
2483  MySubscription->name)));
2484 
2485  proc_exit(0);
2486  }
2487 
2488  /* Check for other changes that should never happen too. */
2489  if (newsub->dbid != MySubscription->dbid)
2490  {
2491  elog(ERROR, "subscription %u changed unexpectedly",
2493  }
2494 
2495  /* Clean old subscription info and switch to new one. */
2496  FreeSubscription(MySubscription);
2497  MySubscription = newsub;
2498 
2499  MemoryContextSwitchTo(oldctx);
2500 
2501  /* Change synchronous commit according to the user's wishes */
2502  SetConfigOption("synchronous_commit", MySubscription->synccommit,
2504 
2505  if (started_tx)
2507 
2508  MySubscriptionValid = true;
2509 }
2510 
2511 /*
2512  * Callback from subscription syscache invalidation.
2513  */
2514 static void
2515 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
2516 {
2517  MySubscriptionValid = false;
2518 }
2519 
2520 /*
2521  * subxact_info_write
2522  * Store information about subxacts for a toplevel transaction.
2523  *
2524  * For each subxact we store the offset of its first change in the main file.
2525  * The file is always over-written as a whole.
2526  *
2527  * XXX We should only store subxacts that were not aborted yet.
2528  */
2529 static void
2531 {
2532  char path[MAXPGPATH];
2533  bool found;
2534  Size len;
2535  StreamXidHash *ent;
2536  BufFile *fd;
2537 
2539 
2540  /* find the xid entry in the xidhash */
2541  ent = (StreamXidHash *) hash_search(xidhash,
2542  (void *) &xid,
2543  HASH_FIND,
2544  &found);
2545  /* we must found the entry for its top transaction by this time */
2546  Assert(found);
2547 
2548  /*
2549  * If there is no subtransaction then nothing to do, but if already have
2550  * subxact file then delete that.
2551  */
2552  if (subxact_data.nsubxacts == 0)
2553  {
2554  if (ent->subxact_fileset)
2555  {
2558  pfree(ent->subxact_fileset);
2559  ent->subxact_fileset = NULL;
2560  }
2561  return;
2562  }
2563 
2564  subxact_filename(path, subid, xid);
2565 
2566  /*
2567  * Create the subxact file if it not already created, otherwise open the
2568  * existing file.
2569  */
2570  if (ent->subxact_fileset == NULL)
2571  {
2572  MemoryContext oldctx;
2573 
2574  /*
2575  * We need to maintain shared fileset across multiple stream
2576  * start/stop calls. So, need to allocate it in a persistent context.
2577  */
2578  oldctx = MemoryContextSwitchTo(ApplyContext);
2579  ent->subxact_fileset = palloc(sizeof(SharedFileSet));
2580  SharedFileSetInit(ent->subxact_fileset, NULL);
2581  MemoryContextSwitchTo(oldctx);
2582 
2583  fd = BufFileCreateShared(ent->subxact_fileset, path);
2584  }
2585  else
2586  fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
2587 
2588  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2589 
2590  /* Write the subxact count and subxact info */
2591  BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
2592  BufFileWrite(fd, subxact_data.subxacts, len);
2593 
2594  BufFileClose(fd);
2595 
2596  /* free the memory allocated for subxact info */
2598 }
2599 
2600 /*
2601  * subxact_info_read
2602  * Restore information about subxacts of a streamed transaction.
2603  *
2604  * Read information about subxacts into the structure subxact_data that can be
2605  * used later.
2606  */
2607 static void
2609 {
2610  char path[MAXPGPATH];
2611  bool found;
2612  Size len;
2613  BufFile *fd;
2614  StreamXidHash *ent;
2615  MemoryContext oldctx;
2616 
2618  Assert(!subxact_data.subxacts);
2619  Assert(subxact_data.nsubxacts == 0);
2620  Assert(subxact_data.nsubxacts_max == 0);
2621 
2622  /* Find the stream xid entry in the xidhash */
2623  ent = (StreamXidHash *) hash_search(xidhash,
2624  (void *) &xid,
2625  HASH_FIND,
2626  &found);
2627 
2628  /*
2629  * If subxact_fileset is not valid that mean we don't have any subxact
2630  * info
2631  */
2632  if (ent->subxact_fileset == NULL)
2633  return;
2634 
2635  subxact_filename(path, subid, xid);
2636 
2637  fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
2638 
2639  /* read number of subxact items */
2640  if (BufFileRead(fd, &subxact_data.nsubxacts,
2641  sizeof(subxact_data.nsubxacts)) !=
2642  sizeof(subxact_data.nsubxacts))
2643  ereport(ERROR,
2645  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
2646  path)));
2647 
2648  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2649 
2650  /* we keep the maximum as a power of 2 */
2651  subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
2652 
2653  /*
2654  * Allocate subxact information in the logical streaming context. We need
2655  * this information during the complete stream so that we can add the sub
2656  * transaction info to this. On stream stop we will flush this information
2657  * to the subxact file and reset the logical streaming context.
2658  */
2659  oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
2660  subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
2661  sizeof(SubXactInfo));
2662  MemoryContextSwitchTo(oldctx);
2663 
2664  if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
2665  ereport(ERROR,
2667  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
2668  path)));
2669 
2670  BufFileClose(fd);
2671 }
2672 
2673 /*
2674  * subxact_info_add
2675  * Add information about a subxact (offset in the main file).
2676  */
2677 static void
2679 {
2680  SubXactInfo *subxacts = subxact_data.subxacts;
2681  int64 i;
2682 
2683  /* We must have a valid top level stream xid and a stream fd. */
2685  Assert(stream_fd != NULL);
2686 
2687  /*
2688  * If the XID matches the toplevel transaction, we don't want to add it.
2689  */
2690  if (stream_xid == xid)
2691  return;
2692 
2693  /*
2694  * In most cases we're checking the same subxact as we've already seen in
2695  * the last call, so make sure to ignore it (this change comes later).
2696  */
2697  if (subxact_data.subxact_last == xid)
2698  return;
2699 
2700  /* OK, remember we're processing this XID. */
2701  subxact_data.subxact_last = xid;
2702 
2703  /*
2704  * Check if the transaction is already present in the array of subxact. We
2705  * intentionally scan the array from the tail, because we're likely adding
2706  * a change for the most recent subtransactions.
2707  *
2708  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
2709  * would allow us to use binary search here.
2710  */
2711  for (i = subxact_data.nsubxacts; i > 0; i--)
2712  {
2713  /* found, so we're done */
2714  if (subxacts[i - 1].xid == xid)
2715  return;
2716  }
2717 
2718  /* This is a new subxact, so we need to add it to the array. */
2719  if (subxact_data.nsubxacts == 0)
2720  {
2721  MemoryContext oldctx;
2722 
2723  subxact_data.nsubxacts_max = 128;
2724 
2725  /*
2726  * Allocate this memory for subxacts in per-stream context, see
2727  * subxact_info_read.
2728  */
2729  oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
2730  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
2731  MemoryContextSwitchTo(oldctx);
2732  }
2733  else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
2734  {
2735  subxact_data.nsubxacts_max *= 2;
2736  subxacts = repalloc(subxacts,
2737  subxact_data.nsubxacts_max * sizeof(SubXactInfo));
2738  }
2739 
2740  subxacts[subxact_data.nsubxacts].xid = xid;
2741 
2742  /*
2743  * Get the current offset of the stream file and store it as offset of
2744  * this subxact.
2745  */
2746  BufFileTell(stream_fd,
2747  &subxacts[subxact_data.nsubxacts].fileno,
2748  &subxacts[subxact_data.nsubxacts].offset);
2749 
2750  subxact_data.nsubxacts++;
2751  subxact_data.subxacts = subxacts;
2752 }
2753 
2754 /* format filename for file containing the info about subxacts */
2755 static inline void
2756 subxact_filename(char *path, Oid subid, TransactionId xid)
2757 {
2758  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
2759 }
2760 
2761 /* format filename for file containing serialized changes */
2762 static inline void
2763 changes_filename(char *path, Oid subid, TransactionId xid)
2764 {
2765  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
2766 }
2767 
2768 /*
2769  * stream_cleanup_files
2770  * Clean up files for a subscription / toplevel transaction.
2771  *
2772  * Remove files with serialized changes and subxact info for a particular
2773  * toplevel transaction. Each subscription has a separate set of files.
2774  */
2775 static void
2777 {
2778  char path[MAXPGPATH];
2779  StreamXidHash *ent;
2780  bool found = false;
2781 
2782  /* By this time we must have created the transaction entry */
2783  ent = (StreamXidHash *) hash_search(xidhash,
2784  (void *) &xid,
2785  HASH_FIND,
2786  &found);
2787  Assert(found);
2788 
2789  /* Delete the change file and release the stream fileset memory */
2790  changes_filename(path, subid, xid);
2792  pfree(ent->stream_fileset);
2793  ent->stream_fileset = NULL;
2794 
2795  /* Delete the subxact file and release the memory, if it exist */
2796  if (ent->subxact_fileset)
2797  {
2798  subxact_filename(path, subid, xid);
2800  pfree(ent->subxact_fileset);
2801  ent->subxact_fileset = NULL;
2802  }
2803 
2804  /* Remove the xid entry from the stream xid hash */
2805  hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
2806 }
2807 
2808 /*
2809  * stream_open_file
2810  * Open a file that we'll use to serialize changes for a toplevel
2811  * transaction.
2812  *
2813  * Open a file for streamed changes from a toplevel transaction identified
2814  * by stream_xid (global variable). If it's the first chunk of streamed
2815  * changes for this transaction, initialize the shared fileset and create the
2816  * buffile, otherwise open the previously created file.
2817  *
2818  * This can only be called at the beginning of a "streaming" block, i.e.
2819  * between stream_start/stream_stop messages from the upstream.
2820  */
2821 static void
2822 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
2823 {
2824  char path[MAXPGPATH];
2825  bool found;
2826  MemoryContext oldcxt;
2827  StreamXidHash *ent;
2828 
2830  Assert(OidIsValid(subid));
2832  Assert(stream_fd == NULL);
2833 
2834  /* create or find the xid entry in the xidhash */
2835  ent = (StreamXidHash *) hash_search(xidhash,
2836  (void *) &xid,
2838  &found);
2839  Assert(first_segment || found);
2840  changes_filename(path, subid, xid);
2841  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
2842 
2843  /*
2844  * Create/open the buffiles under the logical streaming context so that we
2845  * have those files until stream stop.
2846  */
2847  oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
2848 
2849  /*
2850  * If this is the first streamed segment, the file must not exist, so make
2851  * sure we're the ones creating it. Otherwise just open the file for
2852  * writing, in append mode.
2853  */
2854  if (first_segment)
2855  {
2856  MemoryContext savectx;
2857  SharedFileSet *fileset;
2858 
2859  /*
2860  * We need to maintain shared fileset across multiple stream
2861  * start/stop calls. So, need to allocate it in a persistent context.
2862  */
2863  savectx = MemoryContextSwitchTo(ApplyContext);
2864  fileset = palloc(sizeof(SharedFileSet));
2865 
2866  SharedFileSetInit(fileset, NULL);
2867  MemoryContextSwitchTo(savectx);
2868 
2869  stream_fd = BufFileCreateShared(fileset, path);
2870 
2871  /* Remember the fileset for the next stream of the same transaction */
2872  ent->xid = xid;
2873  ent->stream_fileset = fileset;
2874  ent->subxact_fileset = NULL;
2875  }
2876  else
2877  {
2878  /*
2879  * Open the file and seek to the end of the file because we always
2880  * append the changes file.
2881  */
2882  stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
2883  BufFileSeek(stream_fd, 0, 0, SEEK_END);
2884  }
2885 
2886  MemoryContextSwitchTo(oldcxt);
2887 }
2888 
2889 /*
2890  * stream_close_file
2891  * Close the currently open file with streamed changes.
2892  *
2893  * This can only be called at the end of a streaming block, i.e. at stream_stop
2894  * message from the upstream.
2895  */
2896 static void
2898 {
2901  Assert(stream_fd != NULL);
2902 
2903  BufFileClose(stream_fd);
2904 
2906  stream_fd = NULL;
2907 }
2908 
2909 /*
2910  * stream_write_change
2911  * Serialize a change to a file for the current toplevel transaction.
2912  *
2913  * The change is serialized in a simple format, with length (not including
2914  * the length), action code (identifying the message type) and message
2915  * contents (without the subxact TransactionId value).
2916  */
2917 static void
2919 {
2920  int len;
2921 
2924  Assert(stream_fd != NULL);
2925 
2926  /* total on-disk size, including the action type character */
2927  len = (s->len - s->cursor) + sizeof(char);
2928 
2929  /* first write the size */
2930  BufFileWrite(stream_fd, &len, sizeof(len));
2931 
2932  /* then the action */
2933  BufFileWrite(stream_fd, &action, sizeof(action));
2934 
2935  /* and finally the remaining part of the buffer (after the XID) */
2936  len = (s->len - s->cursor);
2937 
2938  BufFileWrite(stream_fd, &s->data[s->cursor], len);
2939 }
2940 
2941 /*
2942  * Clean up the memory for subxacts and reset the related variables.
2943  */
2944 static inline void
2946 {
2947  if (subxact_data.subxacts)
2948  pfree(subxact_data.subxacts);
2949 
2950  subxact_data.subxacts = NULL;
2951  subxact_data.subxact_last = InvalidTransactionId;
2952  subxact_data.nsubxacts = 0;
2953  subxact_data.nsubxacts_max = 0;
2954 }
2955 
2956 /* Logical Replication Apply worker entry point */
2957 void
2959 {
2960  int worker_slot = DatumGetInt32(main_arg);
2961  MemoryContext oldctx;
2962  char originname[NAMEDATALEN];
2963  XLogRecPtr origin_startpos;
2964  char *myslotname;
2966 
2967  /* Attach to slot */
2968  logicalrep_worker_attach(worker_slot);
2969 
2970  /* Setup signal handling */
2972  pqsignal(SIGTERM, die);
2974 
2975  /*
2976  * We don't currently need any ResourceOwner in a walreceiver process, but
2977  * if we did, we could call CreateAuxProcessResourceOwner here.
2978  */
2979 
2980  /* Initialise stats to a sanish value */
2983 
2984  /* Load the libpq-specific functions */
2985  load_file("libpqwalreceiver", false);
2986 
2987  /* Run as replica session replication role. */
2988  SetConfigOption("session_replication_role", "replica",
2990 
2991  /* Connect to our database. */
2994  0);
2995 
2996  /*
2997  * Set always-secure search path, so malicious users can't redirect user
2998  * code (e.g. pg_index.indexprs).
2999  */
3000  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
3001 
3002  /* Load the subscription into persistent memory context. */
3003  ApplyContext = AllocSetContextCreate(TopMemoryContext,
3004  "ApplyContext",
3007  oldctx = MemoryContextSwitchTo(ApplyContext);
3008 
3009  MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
3010  if (!MySubscription)
3011  {
3012  ereport(LOG,
3013  (errmsg("logical replication apply worker for subscription %u will not "
3014  "start because the subscription was removed during startup",
3016  proc_exit(0);
3017  }
3018 
3019  MySubscriptionValid = true;
3020  MemoryContextSwitchTo(oldctx);
3021 
3022  if (!MySubscription->enabled)
3023  {
3024  ereport(LOG,
3025  (errmsg("logical replication apply worker for subscription \"%s\" will not "
3026  "start because the subscription was disabled during startup",
3027  MySubscription->name)));
3028 
3029  proc_exit(0);
3030  }
3031 
3032  /* Setup synchronous commit according to the user's wishes */
3033  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3035 
3036  /* Keep us informed about subscription changes. */
3039  (Datum) 0);
3040 
3041  if (am_tablesync_worker())
3042  ereport(LOG,
3043  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3044  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
3045  else
3046  ereport(LOG,
3047  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3048  MySubscription->name)));
3049 
3051 
3052  /* Connect to the origin and start the replication. */
3053  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3054  MySubscription->conninfo);
3055 
3056  if (am_tablesync_worker())
3057  {
3058  char *syncslotname;
3059 
3060  /* This is table synchronization worker, call initial sync. */
3061  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
3062 
3063  /* allocate slot name in long-lived context */
3064  myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3065 
3066  pfree(syncslotname);
3067  }
3068  else
3069  {
3070  /* This is main apply worker */
3071  RepOriginId originid;
3072  TimeLineID startpointTLI;
3073  char *err;
3074 
3075  myslotname = MySubscription->slotname;
3076 
3077  /*
3078  * This shouldn't happen if the subscription is enabled, but guard
3079  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3080  * crash if slot is NULL.)
3081  */
3082  if (!myslotname)
3083  ereport(ERROR,
3084  (errmsg("subscription has no replication slot set")));
3085 
3086  /* Setup replication origin tracking. */
3088  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3089  originid = replorigin_by_name(originname, true);
3090  if (!OidIsValid(originid))
3091  originid = replorigin_create(originname);
3092  replorigin_session_setup(originid);
3093  replorigin_session_origin = originid;
3094  origin_startpos = replorigin_session_get_progress(false);
3096 
3097  wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
3098  &err);
3099  if (wrconn == NULL)
3100  ereport(ERROR,
3101  (errmsg("could not connect to the publisher: %s", err)));
3102 
3103  /*
3104  * We don't really use the output identify_system for anything but it
3105  * does some initializations on the upstream so let's still call it.
3106  */
3107  (void) walrcv_identify_system(wrconn, &startpointTLI);
3108  }
3109 
3110  /*
3111  * Setup callback for syscache so that we know when something changes in
3112  * the subscription relation state.
3113  */
3116  (Datum) 0);
3117 
3118  /* Build logical replication streaming options. */
3119  options.logical = true;
3120  options.startpoint = origin_startpos;
3121  options.slotname = myslotname;
3122  options.proto.logical.proto_version =
3123  walrcv_server_version(wrconn) >= 140000 ?
3125  options.proto.logical.publication_names = MySubscription->publications;
3126  options.proto.logical.binary = MySubscription->binary;
3127  options.proto.logical.streaming = MySubscription->stream;
3128 
3129  /* Start normal logical streaming replication. */
3130  walrcv_startstreaming(wrconn, &options);
3131 
3132  /* Run the main loop. */
3133  LogicalRepApplyLoop(origin_startpos);
3134 
3135  proc_exit(0);
3136 }
3137 
3138 /*
3139  * Is current process a logical replication worker?
3140  */
3141 bool
3143 {
3144  return MyLogicalRepWorker != NULL;
3145 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
Subscription * MySubscription
Definition: worker.c:161
static void apply_handle_type(StringInfo s)
Definition: worker.c:1148
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:263
#define NIL
Definition: pg_list.h:65
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:751
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:32
static void stream_close_file(void)
Definition: worker.c:2897
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:2918
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:1165
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:2329
static void subxact_info_add(TransactionId xid)
Definition: worker.c:2678
Relation ri_RelationDesc
Definition: execnodes.h:411
WalReceiverConn * wrconn
Definition: worker.c:159
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:2608
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:173
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:417
#define DEBUG1
Definition: elog.h:25
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
dlist_node * cur
Definition: ilist.h:180
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:2059
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1831
uint32 TimeLineID
Definition: xlogdefs.h:59
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:290
int fileno
Definition: worker.c:184
MemoryContext TopTransactionContext
Definition: mcxt.c:53
void AcceptInvalidationMessages(void)
Definition: inval.c:687
CommandId es_output_cid
Definition: execnodes.h:570
static XLogRecPtr remote_final_lsn
Definition: worker.c:165
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4760
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define WL_TIMEOUT
Definition: latch.h:128
void ProcessConfigFile(GucContext context)
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel)
Definition: worker.c:1511
static void apply_handle_insert(StringInfo s)
Definition: worker.c:1182
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
uint32 TransactionId
Definition: c.h:587
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:64
static dlist_head lsn_mapping
Definition: worker.c:130
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:852
MemoryContext hcxt
Definition: hsearch.h:86
#define DatumGetInt32(X)
Definition: postgres.h:516
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:650
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3105
#define RelationGetDescr(relation)
Definition: rel.h:483
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:587
static void apply_handle_update_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry)
Definition: worker.c:1388
dlist_node node
Definition: worker.c:125
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2763
static void apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data)
Definition: worker.c:1095
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:864
int64 timestamp
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
int64 TimestampTz
Definition: timestamp.h:39
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1188
LogicalRepRelMapEntry * rel
Definition: worker.c:134
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:409
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:344
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
void CommitTransactionCommand(void)
Definition: xact.c:2939
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:2078
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:366
void BufFileTruncateShared(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:861
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:419
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:425
Expr * expression_planner(Expr *expr)
Definition: planner.c:5653
#define walrcv_server_version(conn)
Definition: walreceiver.h:411
int maplen
Definition: attmap.h:37
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:65
Size entrysize
Definition: hsearch.h:76
XLogRecPtr last_lsn
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:785
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:415
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1203
XLogRecPtr remote_end
Definition: worker.c:127
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:82
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
CmdType operation
Definition: execnodes.h:1184
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:447
Datum * tts_values
Definition: tuptable.h:126
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8589
void PopActiveSnapshot(void)
Definition: snapmgr.c:759
#define TRUNCATE_REL_CONTEXT_CASCADING
Definition: tablecmds.h:30
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:643
StringInfoData * colvalues
Definition: logicalproto.h:71
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:2756
void pgstat_report_stat(bool disconnect)
Definition: pgstat.c:846
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
EState * state
Definition: execnodes.h:966
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1068
List * es_range_table
Definition: execnodes.h:558
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
Form_pg_class rd_rel
Definition: rel.h:110
unsigned int Oid
Definition: postgres_ext.h:31
TransactionId subxact_last
Definition: worker.c:193
struct SlotErrCallbackArg SlotErrCallbackArg
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:913
bool IsLogicalWorker(void)
Definition: worker.c:3142
void(* callback)(void *arg)
Definition: elog.h:247
int wal_receiver_status_interval
Definition: walreceiver.c:89
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
struct ErrorContextCallback * previous
Definition: elog.h:246
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
#define OidIsValid(objectId)
Definition: c.h:710
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:83
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
uint32 nsubxacts
Definition: worker.c:191
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:209
void BufFileClose(BufFile *file)
Definition: buffile.c:395
void ResetLatch(Latch *latch)
Definition: latch.c:660
int wal_receiver_timeout
Definition: walreceiver.c:90
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:2015
Definition: attmap.h:34
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2834
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:164
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
ErrorContextCallback * error_context_stack
Definition: elog.c:93
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:413
#define list_make1(x1)
Definition: pg_list.h:206
#define NAMEDATALEN
void FreeExecutorState(EState *estate)
Definition: execUtils.c:186
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
Definition: executor.h:532
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:171
static StringInfoData reply_message
Definition: walreceiver.c:118
Definition: dynahash.c:219
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:2515
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:81
void pfree(void *pointer)
Definition: mcxt.c:1169
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:515
SubXactInfo * subxacts
Definition: worker.c:194
static void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:2776
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:46
PlanState ps
Definition: execnodes.h:1183
static bool ensure_transaction(void)
Definition: worker.c:279
struct ApplySubXactData ApplySubXactData
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
Definition: worker.c:121
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_extra, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1748
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:835
bool in_remote_transaction
Definition: worker.c:164
void fill_extraUpdatedCols(RangeTblEntry *target_rte, Relation target_relation)
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:481
Definition: guc.h:75
#define MAXPGPATH
bool in_streamed_transaction
Definition: worker.c:168
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:513
XLogRecPtr reply_lsn
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:619
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1662
static bool am_tablesync_worker(void)
static void apply_handle_delete(StringInfo s)
Definition: worker.c:1449
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
#define DEBUG2
Definition: elog.h:24
BufFile * BufFileCreateShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:262
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:155
char * c
void logicalrep_worker_attach(int slot)
Definition: launcher.c:570
#define NoLock
Definition: lockdefs.h:34
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:8103
static char * buf
Definition: pg_test_fsync.c:68
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:680
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
bool * tts_isnull
Definition: tuptable.h:128
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:315
List * es_opened_result_relations
Definition: execnodes.h:576
#define RowExclusiveLock
Definition: lockdefs.h:38
int errcode_for_file_access(void)
Definition: elog.c:721
#define SIGHUP
Definition: win32_port.h:159
#define InvalidTransactionId
Definition: transam.h:31
SharedFileSet * stream_fileset
Definition: worker.c:149
#define RelationGetRelationName(relation)
Definition: rel.h:491
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
XLogRecPtr startpoint
Definition: walreceiver.h:170
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:203
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
unsigned int uint32
Definition: c.h:441
int pgsocket
Definition: port.h:31
List * publications
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:309
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
static void apply_handle_begin(StringInfo s)
Definition: worker.c:726
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:272
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel, ResultRelInfo **resultRelInfo)
Definition: worker.c:346
Oid get_atttype(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:938
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2887
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2821
AttrMap * attrMap
Definition: tupconvert.h:28
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
Definition: buffile.c:284
MemoryContext TopMemoryContext
Definition: mcxt.c:48
EState * CreateExecutorState(void)
Definition: execUtils.c:90
List * lappend_int(List *list, int datum)
Definition: list.c:354
Definition: guc.h:72
int my_log2(long num)
Definition: dynahash.c:1765
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:1260
List * lappend(List *list, void *datum)
Definition: list.c:336
TransactionId xid
Definition: worker.c:183
MemoryContext ApplyContext
Definition: worker.c:154
static char ** options
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
XLogRecPtr final_lsn
Definition: logicalproto.h:113
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:745
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2413
static void apply_handle_tuple_routing(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation)
Definition: worker.c:1585
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define HASH_BLOBS
Definition: hsearch.h:97
Node * build_column_default(Relation rel, int attrno)
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:962
List * es_tupleTable
Definition: execnodes.h:600
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1191
char * s2
static void apply_handle_update(StringInfo s)
Definition: worker.c:1294
uintptr_t Datum
Definition: postgres.h:411
void CommandCounterIncrement(void)
Definition: xact.c:1021
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1193
#define PGINVALID_SOCKET
Definition: port.h:33
Size keysize
Definition: hsearch.h:75
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:1556
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5694
Plan * plan
Definition: execnodes.h:964
static void apply_handle_relation(StringInfo s)
Definition: worker.c:1130
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:629
int16 attnum
Definition: pg_attribute.h:83
SharedFileSet * subxact_fileset
Definition: worker.c:150
#define ereport(elevel,...)
Definition: elog.h:157
Bitmapset * updatedCols
Definition: parsenodes.h:1149
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:33
XLogRecPtr local_end
Definition: worker.c:126
static MemoryContext LogicalStreamingContext
Definition: worker.c:157
struct SubXactInfo SubXactInfo
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
static HTAB * xidhash
Definition: worker.c:176
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:743
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4520
static void apply_dispatch(StringInfo s)
Definition: worker.c:1928
static void maybe_reread_subscription(void)
Definition: worker.c:2417
#define makeNode(_type_)
Definition: nodes.h:587
TimestampTz last_recv_time
static MemoryContext ApplyMessageContext
Definition: worker.c:153
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:689
void SharedFileSetDeleteAll(SharedFileSet *fileset)
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:177
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
RepOriginId replorigin_session_origin
Definition: origin.c:154
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:600
static void apply_handle_origin(StringInfo s)
Definition: worker.c:767
void StartTransactionCommand(void)
Definition: xact.c:2838
AttrNumber * attnums
Definition: attmap.h:36
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
LogicalRepMsgType
Definition: logicalproto.h:46
size_t Size
Definition: c.h:540
static void slot_store_error_callback(void *arg)
Definition: worker.c:469
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:219
bool IsTransactionState(void)
Definition: xact.c:371
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:421
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
int WalWriterDelay
Definition: walwriter.c:70
bool MySubscriptionValid
Definition: worker.c:162
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1182
off_t offset
Definition: worker.c:185
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:537
void FreeSubscription(Subscription *sub)
static void finish_estate(EState *estate)
Definition: worker.c:395
char * logicalrep_typmap_gettypname(Oid remoteid)
Definition: relation.c:501
RTEKind rtekind
Definition: parsenodes.h:995
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:213
TransactionId xid
Definition: worker.c:148
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:2530
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4540
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:833
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:802
void * palloc(Size size)
Definition: mcxt.c:1062
TupleConversionMap * ri_RootToPartitionMap
Definition: execnodes.h:512
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pgstat_report_activity(BackendState state, const char *cmd_str)
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1286
static void stream_open_file(Oid subid, TransactionId xid, bool first)
Definition: worker.c:2822
static ApplySubXactData subxact_data
Definition: worker.c:197
static BufFile * stream_fd
Definition: worker.c:179
uint32 nsubxacts_max
Definition: worker.c:192
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:2094
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:232
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int i
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:543
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define errcontext
Definition: elog.h:204
static void cleanup_subxact_info(void)
Definition: worker.c:2945
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1697
void * arg
struct Latch * MyLatch
Definition: globals.c:57
struct FlushPosition FlushPosition
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:504
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:123
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102
union WalRcvStreamOptions::@103 proto
#define TRUNCATE_REL_CONTEXT_NORMAL
Definition: tablecmds.h:28
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:761
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:586
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
Definition: pg_list.h:50
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1899
#define snprintf
Definition: port.h:216
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1644
#define WL_LATCH_SET
Definition: latch.h:125
#define RelationGetRelid(relation)
Definition: rel.h:457
static TransactionId stream_xid
Definition: worker.c:170
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
CmdType
Definition: nodes.h:682
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
#define die(msg)
Definition: pg_test_fsync.c:97
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4739
struct StreamXidHash StreamXidHash
TimestampTz committime
Definition: logicalproto.h:122
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:467
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:2958
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:894
uint32 LogicalRepRelId
Definition: logicalproto.h:85
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1552
#define lfirst_oid(lc)
Definition: pg_list.h:171
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:227
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:264
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot)
Definition: worker.c:1243
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5723
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:1810
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:401
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)