PostgreSQL Source Code  git master
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2020, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from the remote logical replication stream.
13  *
14  * The main worker (apply) is started by the logical replication worker
15  * launcher for every enabled subscription in a database. It uses the
16  * walsender protocol to communicate with the publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "access/table.h"
27 #include "access/tableam.h"
28 #include "access/xact.h"
29 #include "access/xlog_internal.h"
30 #include "catalog/catalog.h"
31 #include "catalog/namespace.h"
34 #include "commands/tablecmds.h"
35 #include "commands/trigger.h"
36 #include "executor/executor.h"
38 #include "funcapi.h"
39 #include "libpq/pqformat.h"
40 #include "libpq/pqsignal.h"
41 #include "mb/pg_wchar.h"
42 #include "miscadmin.h"
43 #include "nodes/makefuncs.h"
44 #include "optimizer/optimizer.h"
45 #include "parser/analyze.h"
46 #include "parser/parse_relation.h"
47 #include "pgstat.h"
48 #include "postmaster/bgworker.h"
49 #include "postmaster/interrupt.h"
50 #include "postmaster/postmaster.h"
51 #include "postmaster/walwriter.h"
52 #include "replication/decode.h"
53 #include "replication/logical.h"
57 #include "replication/origin.h"
59 #include "replication/snapbuild.h"
62 #include "rewrite/rewriteHandler.h"
63 #include "storage/bufmgr.h"
64 #include "storage/ipc.h"
65 #include "storage/lmgr.h"
66 #include "storage/proc.h"
67 #include "storage/procarray.h"
68 #include "tcop/tcopprot.h"
69 #include "utils/builtins.h"
70 #include "utils/catcache.h"
71 #include "utils/datum.h"
72 #include "utils/fmgroids.h"
73 #include "utils/guc.h"
74 #include "utils/inval.h"
75 #include "utils/lsyscache.h"
76 #include "utils/memutils.h"
77 #include "utils/rel.h"
78 #include "utils/syscache.h"
79 #include "utils/timeout.h"
80 
/*
 * Upper bound on how long the apply loop sleeps waiting for data when no
 * locally-unflushed transactions are pending (the loop uses WalWriterDelay
 * instead while lsn_mapping is non-empty).
 */
81 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
82 
/*
 * Element of the lsn_mapping list: pairs the local end LSN of an applied
 * commit (local_end) with the corresponding remote end LSN (remote_end).
 * The member declarations are not present in this listing; the field names
 * come from store_flush_position() and get_flush_position().
 */
83 typedef struct FlushPosition
84 {
89 
91 
/*
 * Argument of slot_store_error_callback(): the relation map entry being
 * filled (rel) and the local/remote attribute numbers currently being
 * converted (local_attnum/remote_attnum, -1 when no conversion is running).
 * The member declarations are not present in this listing.
 */
92 typedef struct SlotErrCallbackArg
93 {
98 
101 
103 
/*
 * NOTE(review): presumably cleared when the cached subscription has to be
 * re-read (see maybe_reread_subscription) -- confirm against the code not
 * shown in this listing.
 */
105 bool MySubscriptionValid = false;
106 
109 
/* Forward declarations of static functions defined later in this file. */
110 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
111 
112 static void store_flush_position(XLogRecPtr remote_lsn);
113 
114 static void maybe_reread_subscription(void);
115 
116 static void apply_handle_insert_internal(ResultRelInfo *relinfo,
117  EState *estate, TupleTableSlot *remoteslot);
118 static void apply_handle_update_internal(ResultRelInfo *relinfo,
119  EState *estate, TupleTableSlot *remoteslot,
120  LogicalRepTupleData *newtup,
121  LogicalRepRelMapEntry *relmapentry);
122 static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
123  TupleTableSlot *remoteslot,
124  LogicalRepRelation *remoterel);
125 
126 /*
127  * Should this worker apply changes for given relation.
128  *
129  * This is mainly needed for initial relation data sync as that runs in
130  * separate worker process running in parallel and we need some way to skip
131  * changes coming to the main apply worker during the sync of a table.
132  *
133  * Note we need to do smaller or equals comparison for SYNCDONE state because
134  * it might hold position of end of initial slot consistent point WAL
135  * record + 1 (ie start of next record) and next record can be COMMIT of
136  * transaction we are now processing (which is what we set remote_final_lsn
137  * to in apply_handle_begin).
 *
 * Returns true when changes for the relation map entry "rel" should be
 * applied by the current worker.
138  */
139 static bool
141 {
 /* A table sync worker applies changes only for the relation it syncs. */
142  if (am_tablesync_worker())
143  return MyLogicalRepWorker->relid == rel->localreloid;
144  else
 /*
  * The main apply worker applies changes only once the relation is READY,
  * or is SYNCDONE at an LSN no later than the final LSN of the remote
  * transaction currently being processed.
  */
145  return (rel->state == SUBREL_STATE_READY ||
146  (rel->state == SUBREL_STATE_SYNCDONE &&
147  rel->statelsn <= remote_final_lsn));
148 }
149 
150 /*
151  * Make sure that we started local transaction.
152  *
153  * Also switches to ApplyMessageContext as necessary.
 *
 * Returns false when a transaction was already in progress and true when
 * this call had to start one.  NOTE(review): the statements between the
 * check and the final context switch (presumably the ones that start the
 * transaction) are not present in this listing.
154  */
155 static bool
157 {
158  if (IsTransactionState())
159  {
161 
 /* Already inside a transaction: only make sure the context is right. */
162  if (CurrentMemoryContext != ApplyMessageContext)
163  MemoryContextSwitchTo(ApplyMessageContext);
164 
165  return false;
166  }
167 
170 
172 
173  MemoryContextSwitchTo(ApplyMessageContext);
174  return true;
175 }
176 
177 
178 /*
179  * Executor state preparation for evaluation of constraint expressions,
180  * indexes and triggers.
181  *
182  * This is based on similar code in copy.c
 *
 * Builds a new EState whose range table holds a single RTE for the local
 * relation of "rel", with that relation registered as the only result
 * relation.  Callers in this file release it with FreeExecutorState().
183  */
184 static EState *
186 {
187  EState *estate;
188  ResultRelInfo *resultRelInfo;
189  RangeTblEntry *rte;
190 
191  estate = CreateExecutorState();
192 
 /* One-entry range table describing the local target relation. */
193  rte = makeNode(RangeTblEntry);
194  rte->rtekind = RTE_RELATION;
195  rte->relid = RelationGetRelid(rel->localrel);
196  rte->relkind = rel->localrel->rd_rel->relkind;
198  ExecInitRangeTable(estate, list_make1(rte));
199 
 /* The result relation refers to range table index 1, the RTE above. */
200  resultRelInfo = makeNode(ResultRelInfo);
201  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
202 
203  estate->es_result_relations = resultRelInfo;
204  estate->es_num_result_relations = 1;
205  estate->es_result_relation_info = resultRelInfo;
206 
207  estate->es_output_cid = GetCurrentCommandId(true);
208 
 /* NOTE(review): the statement following this comment is not in this listing. */
209  /* Prepare to catch AFTER triggers. */
211 
212  return estate;
213 }
214 
215 /*
216  * Executes default values for columns for which we can't map to remote
217  * relation columns.
218  *
219  * This allows us to support tables which have more columns on the downstream
220  * than on the upstream.
 *
 * Only local columns that are not dropped, not generated, have no remote
 * counterpart in rel->attrmap and do have a default expression are filled.
 * The defaults are evaluated in the per-tuple expression context of
 * "estate" and written directly into slot->tts_values / slot->tts_isnull.
221  */
222 static void
224  TupleTableSlot *slot)
225 {
226  TupleDesc desc = RelationGetDescr(rel->localrel);
227  int num_phys_attrs = desc->natts;
228  int i;
229  int attnum,
230  num_defaults = 0;
231  int *defmap;
232  ExprState **defexprs;
233  ExprContext *econtext;
234 
235  econtext = GetPerTupleExprContext(estate);
236 
237  /* We got all the data via replication, no need to evaluate anything. */
238  if (num_phys_attrs == rel->remoterel.natts)
239  return;
240 
 /*
  * defmap[k] is the 0-based local column index whose default expression is
  * defexprs[k].  Both arrays are palloc'd in the current memory context and
  * are not freed here.
  */
241  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
242  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
243 
244  Assert(rel->attrmap->maplen == num_phys_attrs);
245  for (attnum = 0; attnum < num_phys_attrs; attnum++)
246  {
247  Expr *defexpr;
248 
 /* Dropped and generated columns never get a default here. */
249  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
250  continue;
251 
 /* Columns that the publisher sends are already filled in. */
252  if (rel->attrmap->attnums[attnum] >= 0)
253  continue;
254 
 /* build_column_default() takes a 1-based attribute number. */
255  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
256 
257  if (defexpr != NULL)
258  {
259  /* Run the expression through planner */
260  defexpr = expression_planner(defexpr);
261 
262  /* Initialize executable expression in copycontext */
263  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
264  defmap[num_defaults] = attnum;
265  num_defaults++;
266  }
267 
268  }
269 
 /* Evaluate the collected defaults and store the results into the slot. */
270  for (i = 0; i < num_defaults; i++)
271  slot->tts_values[defmap[i]] =
272  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
273 }
274 
275 /*
276  * Error callback to give more context info about type conversion failure.
 *
 * "arg" points to a SlotErrCallbackArg.  When remote_attnum is set (>= 0),
 * the context line names the remote relation and column, the remote type
 * name from the LogicalRepTypMap cache, and the local column's type.
 * local_attnum is a 0-based index, hence the "+ 1" passed to get_atttype().
277  */
278 static void
280 {
281  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
283  char *remotetypname;
284  Oid remotetypoid,
285  localtypoid;
286 
287  /* Nothing to do if remote attribute number is not set */
288  if (errarg->remote_attnum < 0)
289  return;
290 
291  rel = errarg->rel;
292  remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
293 
294  /* Fetch remote type name from the LogicalRepTypMap cache */
295  remotetypname = logicalrep_typmap_gettypname(remotetypoid);
296 
297  /* Fetch local type OID from the local sys cache */
298  localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
299 
300  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
301  "remote type %s, local type %s",
302  rel->remoterel.nspname, rel->remoterel.relname,
303  rel->remoterel.attnames[errarg->remote_attnum],
304  remotetypname,
305  format_type_be(localtypoid));
306 }
307 
308 /*
309  * Store data in C string form into slot.
310  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
311  * use better.
 *
 * "values" is indexed by remote attribute number, as mapped through
 * rel->attrmap; a NULL entry stands for an SQL NULL.  Every non-dropped
 * local column with a remote counterpart and a non-NULL string is converted
 * with its type's input function; all other columns are set to NULL.  The
 * slot ends up holding a virtual tuple.
312  */
313 static void
315  char **values)
316 {
317  int natts = slot->tts_tupleDescriptor->natts;
318  int i;
319  SlotErrCallbackArg errarg;
320  ErrorContextCallback errcallback;
321 
322  ExecClearTuple(slot);
323 
324  /* Push callback + info on the error context stack */
325  errarg.rel = rel;
326  errarg.local_attnum = -1;
327  errarg.remote_attnum = -1;
328  errcallback.callback = slot_store_error_callback;
329  errcallback.arg = (void *) &errarg;
330  errcallback.previous = error_context_stack;
331  error_context_stack = &errcallback;
332 
333  /* Call the "in" function for each non-dropped attribute */
334  Assert(natts == rel->attrmap->maplen);
335  for (i = 0; i < natts; i++)
336  {
338  int remoteattnum = rel->attrmap->attnums[i];
339 
340  if (!att->attisdropped && remoteattnum >= 0 &&
341  values[remoteattnum] != NULL)
342  {
343  Oid typinput;
344  Oid typioparam;
345 
 /* Tell the error callback which column is being converted. */
346  errarg.local_attnum = i;
347  errarg.remote_attnum = remoteattnum;
348 
349  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
350  slot->tts_values[i] =
351  OidInputFunctionCall(typinput, values[remoteattnum],
352  typioparam, att->atttypmod);
353  slot->tts_isnull[i] = false;
354 
355  errarg.local_attnum = -1;
356  errarg.remote_attnum = -1;
357  }
358  else
359  {
360  /*
361  * We assign NULL to dropped attributes, NULL values, and missing
362  * values (missing values should be later filled using
363  * slot_fill_defaults).
364  */
365  slot->tts_values[i] = (Datum) 0;
366  slot->tts_isnull[i] = true;
367  }
368  }
369 
370  /* Pop the error context stack */
371  error_context_stack = errcallback.previous;
372 
373  ExecStoreVirtualTuple(slot);
374 }
375 
376 /*
377  * Replace selected columns with user data provided as C strings.
378  * This is somewhat similar to heap_modify_tuple but also calls the type
379  * input functions on the user data.
380  * "slot" is filled with a copy of the tuple in "srcslot", with
381  * columns selected by the "replaces" array replaced with data values
382  * from "values".
383  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
384  * storage for "srcslot". This is OK for current usage, but someday we may
385  * need to materialize "slot" at the end to make it independent of "srcslot".
 *
 * Both "values" and "replaces" are indexed by remote attribute number (via
 * rel->attrmap).  Local columns without a remote counterpart, and columns
 * whose replaces[] entry is false, keep the value copied from "srcslot".  A
 * replaced column whose string is NULL becomes SQL NULL.
386  */
387 static void
390  char **values, bool *replaces)
391 {
392  int natts = slot->tts_tupleDescriptor->natts;
393  int i;
394  SlotErrCallbackArg errarg;
395  ErrorContextCallback errcallback;
396 
397  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
398  ExecClearTuple(slot);
399 
400  /*
401  * Copy all the column data from srcslot, so that we'll have valid values
402  * for unreplaced columns.
403  */
404  Assert(natts == srcslot->tts_tupleDescriptor->natts);
405  slot_getallattrs(srcslot);
406  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
407  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
408 
409  /* For error reporting, push callback + info on the error context stack */
410  errarg.rel = rel;
411  errarg.local_attnum = -1;
412  errarg.remote_attnum = -1;
413  errcallback.callback = slot_store_error_callback;
414  errcallback.arg = (void *) &errarg;
415  errcallback.previous = error_context_stack;
416  error_context_stack = &errcallback;
417 
418  /* Call the "in" function for each replaced attribute */
419  Assert(natts == rel->attrmap->maplen);
420  for (i = 0; i < natts; i++)
421  {
423  int remoteattnum = rel->attrmap->attnums[i];
424 
 /* Local-only columns keep the value copied from srcslot. */
425  if (remoteattnum < 0)
426  continue;
427 
428  if (!replaces[remoteattnum])
429  continue;
430 
431  if (values[remoteattnum] != NULL)
432  {
433  Oid typinput;
434  Oid typioparam;
435 
 /* Tell the error callback which column is being converted. */
436  errarg.local_attnum = i;
437  errarg.remote_attnum = remoteattnum;
438 
439  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
440  slot->tts_values[i] =
441  OidInputFunctionCall(typinput, values[remoteattnum],
442  typioparam, att->atttypmod);
443  slot->tts_isnull[i] = false;
444 
445  errarg.local_attnum = -1;
446  errarg.remote_attnum = -1;
447  }
448  else
449  {
450  slot->tts_values[i] = (Datum) 0;
451  slot->tts_isnull[i] = true;
452  }
453  }
454 
455  /* Pop the error context stack */
456  error_context_stack = errcallback.previous;
457 
458  /* And finally, declare that "slot" contains a valid virtual tuple */
459  ExecStoreVirtualTuple(slot);
460 }
461 
462 /*
463  * Handle BEGIN message.
 *
 * Saves the remote transaction's final LSN in remote_final_lsn (consulted by
 * should_apply_changes_for_rel() and checked again when COMMIT arrives) and
 * marks that a remote transaction is in progress.
464  */
465 static void
467 {
468  LogicalRepBeginData begin_data;
469 
470  logicalrep_read_begin(s, &begin_data);
471 
472  remote_final_lsn = begin_data.final_lsn;
473 
474  in_remote_transaction = true;
475 
477 }
478 
479 /*
480  * Handle COMMIT message.
481  *
482  * TODO, support tracking of multiple origins
 *
 * NOTE(review): several statements of this function (the condition of the
 * branch below, presumably the local commit itself, and trailing cleanup)
 * are not present in this listing.
483  */
484 static void
486 {
487  LogicalRepCommitData commit_data;
488 
489  logicalrep_read_commit(s, &commit_data);
490 
 /* The COMMIT must close the transaction announced by the last BEGIN. */
491  Assert(commit_data.commit_lsn == remote_final_lsn);
492 
493  /* The synchronization worker runs in single transaction. */
495  {
496  /*
497  * Update origin state so we can restart streaming from correct
498  * position in case of crash.
499  */
502 
504  pgstat_report_stat(false);
505 
 /*
  * Pair the remote end LSN with the local commit so that feedback can
  * report it as flushed once the local commit has been flushed.
  */
506  store_flush_position(commit_data.end_lsn);
507  }
508  else
509  {
510  /* Process any invalidation messages that might have accumulated. */
513  }
514 
515  in_remote_transaction = false;
516 
517  /* Process any tables that are being synchronized in parallel. */
518  process_syncing_tables(commit_data.end_lsn);
519 
521 }
522 
523 /*
524  * Handle ORIGIN message.
525  *
526  * TODO, support tracking of multiple origins
 *
 * In the visible code the message is only checked for ordering: a protocol
 * violation is raised if it arrives outside a remote transaction.  The
 * second part of the condition is not present in this listing.
527  */
528 static void
530 {
531  /*
532  * ORIGIN message can only come inside remote transaction and before any
533  * actual writes.
534  */
535  if (!in_remote_transaction ||
537  ereport(ERROR,
538  (errcode(ERRCODE_PROTOCOL_VIOLATION),
539  errmsg("ORIGIN message sent out of order")));
540 }
541 
542 /*
543  * Handle RELATION message.
544  *
545  * Note we don't do validation against local schema here. The validation
546  * against local schema is postponed until first change for given relation
547  * comes as we only care about it when applying changes for it anyway and we
548  * do less locking this way.
 *
 * NOTE(review): the statement that consumes the parsed relation (presumably
 * storing it in the relation map cache) is not present in this listing.
549  */
550 static void
552 {
553  LogicalRepRelation *rel;
554 
555  rel = logicalrep_read_rel(s);
557 }
558 
559 /*
560  * Handle TYPE message.
561  *
562  * Note we don't do local mapping here, that's done when the type is
563  * actually used.
 *
 * NOTE(review): the statement that consumes the parsed type (presumably
 * caching it for later lookups such as logicalrep_typmap_gettypname()) is
 * not present in this listing.
564  */
565 static void
567 {
568  LogicalRepTyp typ;
569 
570  logicalrep_read_typ(s, &typ);
572 }
573 
574 /*
575  * Get replica identity index or if it is not defined a primary key.
576  *
577  * If neither is defined, returns InvalidOid
 *
 * The UPDATE/DELETE workhorses use the result to choose between an index
 * lookup and a sequential scan when locating the local row.
578  */
579 static Oid
581 {
582  Oid idxoid;
583 
584  idxoid = RelationGetReplicaIndex(rel);
585 
 /* Fall back to the primary key when no replica identity index is set. */
586  if (!OidIsValid(idxoid))
587  idxoid = RelationGetPrimaryKeyIndex(rel);
588 
589  return idxoid;
590 }
591 
592 /*
593  * Handle INSERT message.
 *
 * Builds the new row from the message, filling local-only columns with their
 * defaults, and inserts it into the local relation.  NOTE(review): several
 * statements are not present in this listing, including the relation
 * open/close, the should-apply check that guards the early return below,
 * snapshot handling and (presumably) the apply_handle_insert_internal() call.
594  */
595 
596 static void
598 {
600  LogicalRepTupleData newtup;
601  LogicalRepRelId relid;
602  EState *estate;
603  TupleTableSlot *remoteslot;
604  MemoryContext oldctx;
605 
607 
608  relid = logicalrep_read_insert(s, &newtup);
611  {
612  /*
613  * The relation can't become interesting in the middle of the
614  * transaction so it's safe to unlock it.
615  */
617  return;
618  }
619 
620  /* Initialize the executor state. */
621  estate = create_estate_for_relation(rel);
622  remoteslot = ExecInitExtraTupleSlot(estate,
624  &TTSOpsVirtual);
625 
626  /* Input functions may need an active snapshot, so get one */
628 
629  /* Process and store remote tuple in the slot */
631  slot_store_cstrings(remoteslot, rel, newtup.values);
632  slot_fill_defaults(rel, estate, remoteslot);
633  MemoryContextSwitchTo(oldctx);
634 
 /* Only plain tables are expected as targets here. */
635  Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
637  remoteslot);
638 
640 
641  /* Handle queued AFTER triggers. */
642  AfterTriggerEndQuery(estate);
643 
 /* Release the executor's tuple slots, then the executor state itself. */
644  ExecResetTupleTable(estate->es_tupleTable, false);
645  FreeExecutorState(estate);
646 
648 
650 }
651 
652 /* Workhorse for apply_handle_insert() */
/*
 * Opens the indexes of relinfo, inserts the tuple in remoteslot through
 * ExecSimpleRelationInsert() and closes the indexes again.
 * NOTE(review): ExecSimpleRelationInsert() receives only estate, so it
 * presumably targets estate's current result relation, which is assumed to
 * be relinfo -- confirm.
 */
653 static void
655  EState *estate, TupleTableSlot *remoteslot)
656 {
657  ExecOpenIndices(relinfo, false);
658 
659  /* Do the insert. */
660  ExecSimpleRelationInsert(estate, remoteslot);
661 
662  /* Cleanup. */
663  ExecCloseIndices(relinfo);
664 }
665 
666 /*
667  * Check if the logical replication relation is updatable and throw
668  * appropriate error if it isn't.
 *
 * Returns normally only when rel->updatable is set; otherwise it always
 * raises ERROR, picking one of the two messages below based on a condition
 * that is not present in this listing.
669  */
670 static void
672 {
673  /* Updatable, no error. */
674  if (rel->updatable)
675  return;
676 
677  /*
678  * We are in error mode so it's fine this is somewhat slow. It's better to
679  * give user correct error.
680  */
682  {
683  ereport(ERROR,
684  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
685  errmsg("publisher did not send replica identity column "
686  "expected by the logical replication target relation \"%s.%s\"",
687  rel->remoterel.nspname, rel->remoterel.relname)));
688  }
689 
690  ereport(ERROR,
691  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
692  errmsg("logical replication target relation \"%s.%s\" has "
693  "neither REPLICA IDENTITY index nor PRIMARY "
694  "KEY and published relation does not have "
695  "REPLICA IDENTITY FULL",
696  rel->remoterel.nspname, rel->remoterel.relname)));
697 }
698 
699 /*
700  * Handle UPDATE message.
701  *
702  * TODO: FDW support
 *
 * Locates the local row (by the old tuple if the message carries one,
 * otherwise by the new one) and applies the changed columns to it.
 *
 * The target RTE's updatedCols is derived from newtup.changed[], which is
 * indexed by remote attribute number (it is passed as "replaces" to
 * slot_modify_cstrings()).  Each local column is therefore mapped through
 * rel->attrmap first, so column-order differences and subscriber-only
 * columns do not mark the wrong columns as updated.
703  */
704 static void
706 {
708  LogicalRepRelId relid;
709  EState *estate;
710  LogicalRepTupleData oldtup;
711  LogicalRepTupleData newtup;
712  bool has_oldtup;
713  TupleTableSlot *remoteslot;
714  RangeTblEntry *target_rte;
715  MemoryContext oldctx;
716 
718 
719  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
720  &newtup);
723  {
724  /*
725  * The relation can't become interesting in the middle of the
726  * transaction so it's safe to unlock it.
727  */
729  return;
730  }
731 
732  /* Check if we can do the update. */
734 
735  /* Initialize the executor state. */
736  estate = create_estate_for_relation(rel);
737  remoteslot = ExecInitExtraTupleSlot(estate,
739  &TTSOpsVirtual);
740 
741  /*
742  * Populate updatedCols so that per-column triggers can fire. This could
743  * include more columns than were actually changed on the publisher
744  * because the logical replication protocol doesn't contain that
745  * information. But it would for example exclude columns that only exist
746  * on the subscriber, since we are not touching those.
747  */
748  target_rte = list_nth(estate->es_range_table, 0);
 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
 {
  int remoteattnum = rel->attrmap->attnums[i];

  /*
   * newtup.changed[] is indexed by remote attribute number, so translate
   * the local column first.  Dropped and subscriber-only columns are
   * never written by replication and are left out of updatedCols.
   */
  if (!TupleDescAttr(remoteslot->tts_tupleDescriptor, i)->attisdropped &&
   remoteattnum >= 0 &&
   newtup.changed[remoteattnum])
   target_rte->updatedCols =
    bms_add_member(target_rte->updatedCols,
          i + 1 - FirstLowInvalidHeapAttributeNumber);
 }
755 
757 
759 
760  /* Build the search tuple. */
762  slot_store_cstrings(remoteslot, rel,
763  has_oldtup ? oldtup.values : newtup.values);
764  MemoryContextSwitchTo(oldctx);
765 
766  Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
768  remoteslot, &newtup, rel);
769 
771 
772  /* Handle queued AFTER triggers. */
773  AfterTriggerEndQuery(estate);
774 
775  ExecResetTupleTable(estate->es_tupleTable, false);
776  FreeExecutorState(estate);
777 
779 
781 }
782 
783 /* Workhorse for apply_handle_update() */
/*
 * Finds the local row matching the key values in remoteslot.  The lookup
 * uses the replica identity / primary key index when one exists and a
 * sequential scan otherwise; the Assert requires REPLICA IDENTITY FULL on
 * the publisher in the latter case.  If the row is found, remoteslot is
 * rebuilt as that row with the changed remote columns applied, and the row
 * is updated.  If it is not found, only a DEBUG1 message is emitted.
 */
784 static void
786  EState *estate, TupleTableSlot *remoteslot,
787  LogicalRepTupleData *newtup,
788  LogicalRepRelMapEntry *relmapentry)
789 {
790  Relation localrel = relinfo->ri_RelationDesc;
791  Oid idxoid;
792  EPQState epqstate;
793  TupleTableSlot *localslot;
794  bool found;
795  MemoryContext oldctx;
796 
797  localslot = table_slot_create(localrel, &estate->es_tupleTable);
798  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
799 
800  ExecOpenIndices(relinfo, false);
801 
802  /*
803  * Try to find tuple using either replica identity index, primary key or
804  * if needed, sequential scan.
805  */
806  idxoid = GetRelationIdentityOrPK(localrel);
807  Assert(OidIsValid(idxoid) ||
808  (relmapentry->remoterel.replident == REPLICA_IDENTITY_FULL));
809 
810  if (OidIsValid(idxoid))
811  found = RelationFindReplTupleByIndex(localrel, idxoid,
813  remoteslot, localslot);
814  else
816  remoteslot, localslot);
817 
 /* remoteslot held the search key; clear it so it can hold the new row. */
818  ExecClearTuple(remoteslot);
819 
820  /*
821  * Tuple found.
822  *
823  * Note this will fail if there are other conflicting unique indexes.
824  */
825  if (found)
826  {
 /*
  * Unchanged pass-by-ref columns of remoteslot will point into
  * localslot's storage (see slot_modify_cstrings()), so localslot must
  * remain valid until the update below is done.
  */
827  /* Process and store remote tuple in the slot */
829  slot_modify_cstrings(remoteslot, localslot, relmapentry,
830  newtup->values, newtup->changed);
831  MemoryContextSwitchTo(oldctx);
832 
833  EvalPlanQualSetSlot(&epqstate, remoteslot);
834 
835  /* Do the actual update. */
836  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
837  }
838  else
839  {
840  /*
841  * The tuple to be updated could not be found.
842  *
843  * TODO what to do here, change the log level to LOG perhaps?
844  */
845  elog(DEBUG1,
846  "logical replication did not find row for update "
847  "in replication target relation \"%s\"",
848  RelationGetRelationName(localrel));
849  }
850 
851  /* Cleanup. */
852  ExecCloseIndices(relinfo);
853  EvalPlanQualEnd(&epqstate);
854 }
855 
856 /*
857  * Handle DELETE message.
858  *
859  * TODO: FDW support
 *
 * Builds a search tuple from the old tuple carried by the message and
 * deletes the matching local row.  NOTE(review): several statements are not
 * present in this listing, including the relation open/close, the
 * should-apply check guarding the early return, the updatability check,
 * snapshot handling and (presumably) the apply_handle_delete_internal() call.
860  */
861 static void
863 {
865  LogicalRepTupleData oldtup;
866  LogicalRepRelId relid;
867  EState *estate;
868  TupleTableSlot *remoteslot;
869  MemoryContext oldctx;
870 
872 
873  relid = logicalrep_read_delete(s, &oldtup);
876  {
877  /*
878  * The relation can't become interesting in the middle of the
879  * transaction so it's safe to unlock it.
880  */
882  return;
883  }
884 
885  /* Check if we can do the delete. */
887 
888  /* Initialize the executor state. */
889  estate = create_estate_for_relation(rel);
890  remoteslot = ExecInitExtraTupleSlot(estate,
892  &TTSOpsVirtual);
893 
895 
896  /* Build the search tuple. */
898  slot_store_cstrings(remoteslot, rel, oldtup.values);
899  MemoryContextSwitchTo(oldctx);
900 
 /* Only plain tables are expected as targets here. */
901  Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
903  remoteslot, &rel->remoterel);
904 
906 
907  /* Handle queued AFTER triggers. */
908  AfterTriggerEndQuery(estate);
909 
 /* Release the executor's tuple slots, then the executor state itself. */
910  ExecResetTupleTable(estate->es_tupleTable, false);
911  FreeExecutorState(estate);
912 
914 
916 }
917 
918 /* Workhorse for apply_handle_delete() */
/*
 * Finds the local row matching the key values in remoteslot.  The lookup
 * uses the replica identity / primary key index when one exists and a
 * sequential scan otherwise; the Assert requires REPLICA IDENTITY FULL on
 * the publisher in the latter case.  A found row is deleted; otherwise only
 * a DEBUG1 message is emitted.
 */
919 static void
921  TupleTableSlot *remoteslot,
922  LogicalRepRelation *remoterel)
923 {
924  Relation localrel = relinfo->ri_RelationDesc;
925  Oid idxoid;
926  EPQState epqstate;
927  TupleTableSlot *localslot;
928  bool found;
929 
930  localslot = table_slot_create(localrel, &estate->es_tupleTable);
931  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
932 
933  ExecOpenIndices(relinfo, false);
934 
935  /*
936  * Try to find tuple using either replica identity index, primary key or
937  * if needed, sequential scan.
938  */
939  idxoid = GetRelationIdentityOrPK(localrel);
940  Assert(OidIsValid(idxoid) ||
941  (remoterel->replident == REPLICA_IDENTITY_FULL));
942 
943  if (OidIsValid(idxoid))
944  found = RelationFindReplTupleByIndex(localrel, idxoid,
946  remoteslot, localslot);
947  else
949  remoteslot, localslot);
950 
951  /* If found delete it. */
952  if (found)
953  {
954  EvalPlanQualSetSlot(&epqstate, localslot);
955 
956  /* Do the actual delete. */
957  ExecSimpleRelationDelete(estate, &epqstate, localslot);
958  }
959  else
960  {
961  /* The tuple to be deleted could not be found. */
962  elog(DEBUG1,
963  "logical replication could not find row for delete "
964  "in replication target relation \"%s\"",
965  RelationGetRelationName(localrel));
966  }
967 
968  /* Cleanup. */
969  ExecCloseIndices(relinfo);
970  EvalPlanQualEnd(&epqstate);
971 }
972 
973 /*
974  * Handle TRUNCATE message.
975  *
976  * TODO: FDW support
 *
 * Collects the local relations of all listed remote relations that this
 * worker should apply changes for, and truncates them with a single
 * ExecuteTruncateGuts() call.  The CASCADE flag from the message is read
 * but deliberately not used (DROP_RESTRICT is always passed); restart_seqs
 * is passed through.  NOTE(review): several statements are not present in
 * this listing, including the relation open/close and the condition for
 * adding a relation to relids_logged.
977  */
978 static void
980 {
981  bool cascade = false;
982  bool restart_seqs = false;
983  List *remote_relids = NIL;
984  List *remote_rels = NIL;
985  List *rels = NIL;
986  List *relids = NIL;
987  List *relids_logged = NIL;
988  ListCell *lc;
989 
991 
992  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
993 
994  foreach(lc, remote_relids)
995  {
996  LogicalRepRelId relid = lfirst_oid(lc);
998 
1000  if (!should_apply_changes_for_rel(rel))
1001  {
1002  /*
1003  * The relation can't become interesting in the middle of the
1004  * transaction so it's safe to unlock it.
1005  */
1007  continue;
1008  }
1009 
 /* remote_rels is kept so each entry can be handled after the truncate. */
1010  remote_rels = lappend(remote_rels, rel);
1011  rels = lappend(rels, rel->localrel);
1012  relids = lappend_oid(relids, rel->localreloid);
1014  relids_logged = lappend_oid(relids_logged, rel->localreloid);
1015  }
1016 
1017  /*
1018  * Even if we used CASCADE on the upstream master we explicitly default to
1019  * replaying changes without further cascading. This might be later
1020  * changeable with a user specified option.
1021  */
1022  ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
1023 
1024  foreach(lc, remote_rels)
1025  {
1026  LogicalRepRelMapEntry *rel = lfirst(lc);
1027 
1029  }
1030 
1032 }
1033 
1034 
1035 /*
1036  * Logical replication protocol message dispatcher.
 *
 * Reads the one-byte message type from "s" and hands the rest of the
 * message to the matching handler.  An unknown type raises a protocol
 * violation.  NOTE(review): most of the handler calls are not present in
 * this listing.
1037  */
1038 static void
1040 {
1041  char action = pq_getmsgbyte(s);
1042 
1043  switch (action)
1044  {
1045  /* BEGIN */
1046  case 'B':
1047  apply_handle_begin(s);
1048  break;
1049  /* COMMIT */
1050  case 'C':
1052  break;
1053  /* INSERT */
1054  case 'I':
1056  break;
1057  /* UPDATE */
1058  case 'U':
1060  break;
1061  /* DELETE */
1062  case 'D':
1064  break;
1065  /* TRUNCATE */
1066  case 'T':
1068  break;
1069  /* RELATION */
1070  case 'R':
1072  break;
1073  /* TYPE */
1074  case 'Y':
1075  apply_handle_type(s);
1076  break;
1077  /* ORIGIN */
1078  case 'O':
1080  break;
1081  default:
1082  ereport(ERROR,
1083  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1084  errmsg("invalid logical replication message type \"%c\"", action)));
1085  }
1086 }
1087 
1088 /*
1089  * Figure out which write/flush positions to report to the walsender process.
1090  *
1091  * We can't simply report back the last LSN the walsender sent us because the
1092  * local transaction might not yet be flushed to disk locally. Instead we
1093  * build a list that associates local with remote LSNs for every commit. When
1094  * reporting back the flush position to the sender we iterate that list and
1095  * check which entries on it are already locally flushed. Those we can report
1096  * as having been flushed.
1097  *
1098  * The have_pending_txes is true if there are outstanding transactions that
1099  * need to be flushed.
 *
 * Entries whose local_end is at or before the current local WAL flush
 * position are removed from lsn_mapping and freed, and *flush receives the
 * remote_end of the last such entry.  *write receives the remote_end of the
 * last entry in the list.  Both remain InvalidXLogRecPtr when the list is
 * empty.
1100  */
1101 static void
1103  bool *have_pending_txes)
1104 {
1105  dlist_mutable_iter iter;
1106  XLogRecPtr local_flush = GetFlushRecPtr();
1107 
1108  *write = InvalidXLogRecPtr;
1109  *flush = InvalidXLogRecPtr;
1110 
 /* The "modify" iterator allows deleting the current element. */
1111  dlist_foreach_modify(iter, &lsn_mapping)
1112  {
1113  FlushPosition *pos =
1115 
1116  *write = pos->remote_end;
1117 
1118  if (pos->local_end <= local_flush)
1119  {
1120  *flush = pos->remote_end;
1121  dlist_delete(iter.cur);
1122  pfree(pos);
1123  }
1124  else
1125  {
1126  /*
1127  * Don't want to uselessly iterate over the rest of the list which
1128  * could potentially be long. Instead get the last element and
1129  * grab the write position from there.
1130  */
1132  &lsn_mapping);
1133  *write = pos->remote_end;
1134  *have_pending_txes = true;
1135  return;
1136  }
1137  }
1138 
1139  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
1140 }
1141 
1142 /*
1143  * Store current remote/local lsn pair in the tracking list.
 *
 * The local side is taken from XactLastCommitEnd.  The entry is allocated in
 * ApplyContext so that it survives the per-message resets of
 * ApplyMessageContext; get_flush_position() frees it once it is flushed.
 * NOTE(review): the function always switches back to ApplyMessageContext,
 * so it assumes the caller was running in that context.
1144  */
1145 static void
1147 {
1148  FlushPosition *flushpos;
1149 
1150  /* Need to do this in permanent context */
1151  MemoryContextSwitchTo(ApplyContext);
1152 
1153  /* Track commit lsn */
1154  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1155  flushpos->local_end = XactLastCommitEnd;
1156  flushpos->remote_end = remote_lsn;
1157 
1158  dlist_push_tail(&lsn_mapping, &flushpos->node);
1159  MemoryContextSwitchTo(ApplyMessageContext);
1160 }
1161 
1162 
1163 /* Update statistics of the worker. */
/*
 * Records last_lsn and send_time in MyLogicalRepWorker as the latest
 * received position and send time.  When "reply" is true (the apply loop
 * passes true for 'k' keepalive messages and false for 'w' data messages),
 * they are also recorded as the latest reply.  NOTE(review): one statement
 * of this function is not present in this listing.
 */
1164 static void
1165 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
1166 {
1167  MyLogicalRepWorker->last_lsn = last_lsn;
1168  MyLogicalRepWorker->last_send_time = send_time;
1170  if (reply)
1171  {
1172  MyLogicalRepWorker->reply_lsn = last_lsn;
1173  MyLogicalRepWorker->reply_time = send_time;
1174  }
1175 }
1176 
1177 /*
1178  * Apply main loop.
1179  */
1180 static void
1182 {
1183  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1184 
1185  /*
1186  * Init the ApplyMessageContext which we clean up after each replication
1187  * protocol message.
1188  */
1189  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
1190  "ApplyMessageContext",
1192 
1193  /* mark as idle, before starting to loop */
1195 
1196  for (;;)
1197  {
1199  int rc;
1200  int len;
1201  char *buf = NULL;
1202  bool endofstream = false;
1203  bool ping_sent = false;
1204  long wait_time;
1205 
1207 
1208  MemoryContextSwitchTo(ApplyMessageContext);
1209 
1210  len = walrcv_receive(wrconn, &buf, &fd);
1211 
1212  if (len != 0)
1213  {
1214  /* Process the data */
1215  for (;;)
1216  {
1218 
1219  if (len == 0)
1220  {
1221  break;
1222  }
1223  else if (len < 0)
1224  {
1225  ereport(LOG,
1226  (errmsg("data stream from publisher has ended")));
1227  endofstream = true;
1228  break;
1229  }
1230  else
1231  {
1232  int c;
1233  StringInfoData s;
1234 
1235  /* Reset timeout. */
1236  last_recv_timestamp = GetCurrentTimestamp();
1237  ping_sent = false;
1238 
1239  /* Ensure we are reading the data into our memory context. */
1240  MemoryContextSwitchTo(ApplyMessageContext);
1241 
1242  s.data = buf;
1243  s.len = len;
1244  s.cursor = 0;
1245  s.maxlen = -1;
1246 
1247  c = pq_getmsgbyte(&s);
1248 
1249  if (c == 'w')
1250  {
1251  XLogRecPtr start_lsn;
1252  XLogRecPtr end_lsn;
1253  TimestampTz send_time;
1254 
1255  start_lsn = pq_getmsgint64(&s);
1256  end_lsn = pq_getmsgint64(&s);
1257  send_time = pq_getmsgint64(&s);
1258 
1259  if (last_received < start_lsn)
1260  last_received = start_lsn;
1261 
1262  if (last_received < end_lsn)
1263  last_received = end_lsn;
1264 
1265  UpdateWorkerStats(last_received, send_time, false);
1266 
1267  apply_dispatch(&s);
1268  }
1269  else if (c == 'k')
1270  {
1271  XLogRecPtr end_lsn;
1273  bool reply_requested;
1274 
1275  end_lsn = pq_getmsgint64(&s);
1276  timestamp = pq_getmsgint64(&s);
1277  reply_requested = pq_getmsgbyte(&s);
1278 
1279  if (last_received < end_lsn)
1280  last_received = end_lsn;
1281 
1282  send_feedback(last_received, reply_requested, false);
1283  UpdateWorkerStats(last_received, timestamp, true);
1284  }
1285  /* other message types are purposefully ignored */
1286 
1287  MemoryContextReset(ApplyMessageContext);
1288  }
1289 
1290  len = walrcv_receive(wrconn, &buf, &fd);
1291  }
1292  }
1293 
1294  /* confirm all writes so far */
1295  send_feedback(last_received, false, false);
1296 
1297  if (!in_remote_transaction)
1298  {
1299  /*
1300  * If we didn't get any transactions for a while there might be
1301  * unconsumed invalidation messages in the queue, consume them
1302  * now.
1303  */
1306 
1307  /* Process any table synchronization changes. */
1308  process_syncing_tables(last_received);
1309  }
1310 
1311  /* Cleanup the memory. */
1312  MemoryContextResetAndDeleteChildren(ApplyMessageContext);
1314 
1315  /* Check if we need to exit the streaming loop. */
1316  if (endofstream)
1317  {
1318  TimeLineID tli;
1319 
1320  walrcv_endstreaming(wrconn, &tli);
1321  break;
1322  }
1323 
1324  /*
1325  * Wait for more data or latch. If we have unflushed transactions,
1326  * wake up after WalWriterDelay to see if they've been flushed yet (in
1327  * which case we should send a feedback message). Otherwise, there's
1328  * no particular urgency about waking up unless we get data or a
1329  * signal.
1330  */
1331  if (!dlist_is_empty(&lsn_mapping))
1332  wait_time = WalWriterDelay;
1333  else
1334  wait_time = NAPTIME_PER_CYCLE;
1335 
1339  fd, wait_time,
1341 
1342  if (rc & WL_LATCH_SET)
1343  {
1346  }
1347 
1348  if (ConfigReloadPending)
1349  {
1350  ConfigReloadPending = false;
1352  }
1353 
1354  if (rc & WL_TIMEOUT)
1355  {
1356  /*
1357  * We didn't receive anything new. If we haven't heard anything
1358  * from the server for more than wal_receiver_timeout / 2, ping
1359  * the server. Also, if it's been longer than
1360  * wal_receiver_status_interval since the last update we sent,
1361  * send a status update to the master anyway, to report any
1362  * progress in applying WAL.
1363  */
1364  bool requestReply = false;
1365 
1366  /*
1367  * Check if time since last receive from standby has reached the
1368  * configured limit.
1369  */
1370  if (wal_receiver_timeout > 0)
1371  {
1373  TimestampTz timeout;
1374 
1375  timeout =
1376  TimestampTzPlusMilliseconds(last_recv_timestamp,
1378 
1379  if (now >= timeout)
1380  ereport(ERROR,
1381  (errmsg("terminating logical replication worker due to timeout")));
1382 
1383  /*
1384  * We didn't receive anything new, for half of receiver
1385  * replication timeout. Ping the server.
1386  */
1387  if (!ping_sent)
1388  {
1389  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1390  (wal_receiver_timeout / 2));
1391  if (now >= timeout)
1392  {
1393  requestReply = true;
1394  ping_sent = true;
1395  }
1396  }
1397  }
1398 
1399  send_feedback(last_received, requestReply, requestReply);
1400  }
1401  }
1402 }
1403 
1404 /*
1405  * Send a Standby Status Update message to server.
1406  *
1407  * 'recvpos' is the latest LSN we've received data to, force is set if we need
1408  * to send a response to avoid timeouts.
 *
 * 'requestReply' is sent as the message's replyRequested byte; the apply
 * loop sets it when it has heard nothing from the server for a while and
 * wants to ping it.
 *
 * Unless 'force' is set, nothing is sent when status reporting is disabled
 * (wal_receiver_status_interval <= 0), or when the write and flush positions
 * have not advanced since the previous message and that message is still
 * recent.  NOTE(review): the interval used for "recent" is not visible in
 * this view; presumably it is derived from wal_receiver_status_interval.
 *
 * Reported positions never go backwards: each one is clamped to the highest
 * value sent so far, which is remembered in function-level statics.
 *
 * NOTE: on the wire, recvpos fills the "write" field, flushpos the "flush"
 * field and writepos the "apply" field, while the DEBUG2 message below labels
 * the same values recv/write/flush.
1409  */
1410 static void
1411 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1412 {
	/*
	 * Reused between calls.  It is allocated in ApplyContext (below) because
	 * callers may be running in ApplyMessageContext, which is reset after
	 * every incoming message.
	 */
1413  static StringInfo reply_message = NULL;
1414  static TimestampTz send_time = 0;
1415 
	/* Highest positions reported so far; used for clamping and dedup. */
1416  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1417  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1418  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1419 
1420  XLogRecPtr writepos;
1421  XLogRecPtr flushpos;
1422  TimestampTz now;
1423  bool have_pending_txes;
1424 
1425  /*
1426  * If the user doesn't want status to be reported to the publisher, be
1427  * sure to exit before doing anything at all.
1428  */
1429  if (!force && wal_receiver_status_interval <= 0)
1430  return;
1431 
1432  /* It's legal to not pass a recvpos */
	/* Any recvpos older than the last one reported is replaced by it. */
1433  if (recvpos < last_recvpos)
1434  recvpos = last_recvpos;
1435 
1436  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1437 
1438  /*
1439  * No outstanding transactions to flush, we can report the latest received
1440  * position. This is important for synchronous replication.
1441  */
1442  if (!have_pending_txes)
1443  flushpos = writepos = recvpos;
1444 
1445  if (writepos < last_writepos)
1446  writepos = last_writepos;
1447 
1448  if (flushpos < last_flushpos)
1449  flushpos = last_flushpos;
1450 
1451  now = GetCurrentTimestamp();
1452 
1453  /* if we've already reported everything we're good */
1454  if (!force &&
1455  writepos == last_writepos &&
1456  flushpos == last_flushpos &&
1457  !TimestampDifferenceExceeds(send_time, now,
1459  return;
1460  send_time = now;
1461 
1462  if (!reply_message)
1463  {
1464  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1465 
1466  reply_message = makeStringInfo();
1467  MemoryContextSwitchTo(oldctx);
1468  }
1469  else
1470  resetStringInfo(reply_message);
1471 
	/* Field comments name the protocol slots, not the local variables. */
1472  pq_sendbyte(reply_message, 'r');
1473  pq_sendint64(reply_message, recvpos); /* write */
1474  pq_sendint64(reply_message, flushpos); /* flush */
1475  pq_sendint64(reply_message, writepos); /* apply */
1476  pq_sendint64(reply_message, now); /* sendTime */
1477  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1478 
1479  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1480  force,
1481  (uint32) (recvpos >> 32), (uint32) recvpos,
1482  (uint32) (writepos >> 32), (uint32) writepos,
1483  (uint32) (flushpos >> 32), (uint32) flushpos
1484  );
1485 
1486  walrcv_send(wrconn, reply_message->data, reply_message->len);
1487 
	/* Remember what was reported so later calls never regress. */
1488  if (recvpos > last_recvpos)
1489  last_recvpos = recvpos;
1490  if (writepos > last_writepos)
1491  last_writepos = writepos;
1492  if (flushpos > last_flushpos)
1493  last_flushpos = flushpos;
1494 }
1495 
1496 /*
1497  * Reread subscription info if needed. Most changes cause the worker to exit.
 *
 * Does nothing while MySubscriptionValid is set; subscription_change_cb
 * clears that flag on a subscription syscache invalidation.  Otherwise the
 * subscription is fetched again into ApplyContext and compared with the
 * cached MySubscription:
 *
 * - removed, disabled, or a changed conninfo, name, slot name or
 *   publication list: log the reason and proc_exit(0) (for the restart
 *   cases, the comments below note that the launcher starts a new worker);
 * - a changed dbid is reported with elog(ERROR), as it should never happen;
 * - otherwise the old entry is freed, the new one becomes MySubscription,
 *   synchronous_commit is re-applied from it, and the cache is marked valid.
 *
 * May be called inside or outside a transaction; 'started_tx' records
 * whether this function opened one itself.
1498  */
1499 static void
1501 {
1502  MemoryContext oldctx;
1504  bool started_tx = false;
1505 
1506  /* When cache state is valid there is nothing to do here. */
1507  if (MySubscriptionValid)
1508  return;
1509 
1510  /* This function might be called inside or outside of transaction. */
1511  if (!IsTransactionState())
1512  {
1514  started_tx = true;
1515  }
1516 
1517  /* Ensure allocations in permanent context. */
1518  oldctx = MemoryContextSwitchTo(ApplyContext);
1519 
	/* missing_ok = true: a dropped subscription yields NULL, handled below. */
1520  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1521 
1522  /*
1523  * Exit if the subscription was removed. This normally should not happen
1524  * as the worker gets killed during DROP SUBSCRIPTION.
1525  */
1526  if (!newsub)
1527  {
1528  ereport(LOG,
1529  (errmsg("logical replication apply worker for subscription \"%s\" will "
1530  "stop because the subscription was removed",
1531  MySubscription->name)));
1532 
1533  proc_exit(0);
1534  }
1535 
1536  /*
1537  * Exit if the subscription was disabled. This normally should not happen
1538  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1539  */
1540  if (!newsub->enabled)
1541  {
1542  ereport(LOG,
1543  (errmsg("logical replication apply worker for subscription \"%s\" will "
1544  "stop because the subscription was disabled",
1545  MySubscription->name)));
1546 
1547  proc_exit(0);
1548  }
1549 
1550  /*
1551  * Exit if connection string was changed. The launcher will start new
1552  * worker.
1553  */
1554  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1555  {
1556  ereport(LOG,
1557  (errmsg("logical replication apply worker for subscription \"%s\" will "
1558  "restart because the connection information was changed",
1559  MySubscription->name)));
1560 
1561  proc_exit(0);
1562  }
1563 
1564  /*
1565  * Exit if subscription name was changed (it's used for
1566  * fallback_application_name). The launcher will start new worker.
1567  */
1568  if (strcmp(newsub->name, MySubscription->name) != 0)
1569  {
1570  ereport(LOG,
1571  (errmsg("logical replication apply worker for subscription \"%s\" will "
1572  "restart because subscription was renamed",
1573  MySubscription->name)));
1574 
1575  proc_exit(0);
1576  }
1577 
1578  /* !slotname should never happen when enabled is true. */
1579  Assert(newsub->slotname);
1580 
1581  /*
1582  * We need to make new connection to new slot if slot name has changed so
1583  * exit here as well if that's the case.
1584  */
1585  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1586  {
1587  ereport(LOG,
1588  (errmsg("logical replication apply worker for subscription \"%s\" will "
1589  "restart because the replication slot name was changed",
1590  MySubscription->name)));
1591 
1592  proc_exit(0);
1593  }
1594 
1595  /*
1596  * Exit if publication list was changed. The launcher will start new
1597  * worker.
1598  */
1599  if (!equal(newsub->publications, MySubscription->publications))
1600  {
1601  ereport(LOG,
1602  (errmsg("logical replication apply worker for subscription \"%s\" will "
1603  "restart because subscription's publications were changed",
1604  MySubscription->name)));
1605 
1606  proc_exit(0);
1607  }
1608 
1609  /* Check for other changes that should never happen too. */
1610  if (newsub->dbid != MySubscription->dbid)
1611  {
1612  elog(ERROR, "subscription %u changed unexpectedly",
1614  }
1615 
1616  /* Clean old subscription info and switch to new one. */
1617  FreeSubscription(MySubscription);
1618  MySubscription = newsub;
1619 
1620  MemoryContextSwitchTo(oldctx);
1621 
1622  /* Change synchronous commit according to the user's wishes */
1623  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1625 
	/* Close the transaction only if this function opened it. */
1626  if (started_tx)
1628 
	/* The cached copy is current again until the next invalidation. */
1629  MySubscriptionValid = true;
1630 }
1631 
1632 /*
1633  * Callback from subscription syscache invalidation.
1634  */
1635 static void
1636 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1637 {
1638  MySubscriptionValid = false;
1639 }
1640 
1641 /*
 * Logical Replication Apply worker entry point.
 *
 * 'main_arg' is the worker slot number to attach to (read via DatumGetInt32).
 * The same entry point runs both the main apply worker and table
 * synchronization workers (am_tablesync_worker()).
 *
 * Common startup:
 * - attach to the slot and set up signal handling;
 * - load libpqwalreceiver;
 * - switch to the "replica" session_replication_role;
 * - connect to the database;
 * - load the subscription into the long-lived ApplyContext.
 * If the subscription was removed or disabled meanwhile, the worker logs
 * this and exits with proc_exit(0).
 *
 * A tablesync worker then obtains its slot name and start position from
 * LogicalRepSyncTableStart().  The main apply worker uses the subscription's
 * own slot, resumes from its replication origin's recorded progress, and
 * connects to the publisher with walrcv_connect().  In both cases streaming
 * starts at origin_startpos, LogicalRepApplyLoop() runs, and the process
 * exits with proc_exit(0) once the loop returns.
 */
1642 void
1644 {
1645  int worker_slot = DatumGetInt32(main_arg);
1646  MemoryContext oldctx;
1647  char originname[NAMEDATALEN];
1648  XLogRecPtr origin_startpos;
1649  char *myslotname;
1651 
1652  /* Attach to slot */
1653  logicalrep_worker_attach(worker_slot);
1654 
1655  /* Setup signal handling */
1657  pqsignal(SIGTERM, die);
1659 
1660  /*
1661  * We don't currently need any ResourceOwner in a walreceiver process, but
1662  * if we did, we could call CreateAuxProcessResourceOwner here.
1663  */
1664 
1665  /* Initialise stats to a sanish value */
1668 
1669  /* Load the libpq-specific functions */
1670  load_file("libpqwalreceiver", false);
1671 
1672  /* Run as replica session replication role. */
1673  SetConfigOption("session_replication_role", "replica",
1675 
1676  /* Connect to our database. */
1679  0);
1680 
1681  /* Load the subscription into persistent memory context. */
1682  ApplyContext = AllocSetContextCreate(TopMemoryContext,
1683  "ApplyContext",
1686  oldctx = MemoryContextSwitchTo(ApplyContext);
1687 
1688  MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
1689  if (!MySubscription)
1690  {
1691  ereport(LOG,
1692  (errmsg("logical replication apply worker for subscription %u will not "
1693  "start because the subscription was removed during startup",
1695  proc_exit(0);
1696  }
1697 
1698  MySubscriptionValid = true;
1699  MemoryContextSwitchTo(oldctx);
1700 
1701  if (!MySubscription->enabled)
1702  {
1703  ereport(LOG,
1704  (errmsg("logical replication apply worker for subscription \"%s\" will not "
1705  "start because the subscription was disabled during startup",
1706  MySubscription->name)));
1707 
1708  proc_exit(0);
1709  }
1710 
1711  /* Setup synchronous commit according to the user's wishes */
1712  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1714 
1715  /* Keep us informed about subscription changes. */
1718  (Datum) 0);
1719 
1720  if (am_tablesync_worker())
1721  ereport(LOG,
1722  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1723  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
1724  else
1725  ereport(LOG,
1726  (errmsg("logical replication apply worker for subscription \"%s\" has started",
1727  MySubscription->name)));
1728 
1730 
1731  /* Connect to the origin and start the replication. */
1732  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1733  MySubscription->conninfo);
1734 
1735  if (am_tablesync_worker())
1736  {
1737  char *syncslotname;
1738 
1739  /* This is table synchronization worker, call initial sync. */
1740  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1741 
1742  /* The slot name needs to be allocated in permanent memory context. */
1743  oldctx = MemoryContextSwitchTo(ApplyContext);
1744  myslotname = pstrdup(syncslotname);
1745  MemoryContextSwitchTo(oldctx);
1746 
1747  pfree(syncslotname);
1748  }
1749  else
1750  {
1751  /* This is main apply worker */
1752  RepOriginId originid;
1753  TimeLineID startpointTLI;
1754  char *err;
1755 
1756  myslotname = MySubscription->slotname;
1757 
1758  /*
1759  * This shouldn't happen if the subscription is enabled, but guard
1760  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1761  * crash if slot is NULL.)
1762  */
1763  if (!myslotname)
1764  ereport(ERROR,
1765  (errmsg("subscription has no replication slot set")));
1766 
1767  /* Setup replication origin tracking. */
		/*
		 * The origin is named "pg_<subscription OID>" and is created if it
		 * does not exist yet; its recorded progress becomes origin_startpos,
		 * the position streaming resumes from.
		 */
1769  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1770  originid = replorigin_by_name(originname, true);
1771  if (!OidIsValid(originid))
1772  originid = replorigin_create(originname);
1773  replorigin_session_setup(originid);
1774  replorigin_session_origin = originid;
1775  origin_startpos = replorigin_session_get_progress(false);
1777 
1778  wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
1779  &err);
1780  if (wrconn == NULL)
1781  ereport(ERROR,
1782  (errmsg("could not connect to the publisher: %s", err)));
1783 
1784  /*
1785  * We don't really use the output identify_system for anything but it
1786  * does some initializations on the upstream so let's still call it.
1787  */
1788  (void) walrcv_identify_system(wrconn, &startpointTLI);
1789 
1790  }
1791 
1792  /*
1793  * Setup callback for syscache so that we know when something changes in
1794  * the subscription relation state.
1795  */
1798  (Datum) 0);
1799 
1800  /* Build logical replication streaming options. */
1801  options.logical = true;
1802  options.startpoint = origin_startpos;
1803  options.slotname = myslotname;
1804  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1805  options.proto.logical.publication_names = MySubscription->publications;
1806 
1807  /* Start normal logical streaming replication. */
1808  walrcv_startstreaming(wrconn, &options);
1809 
1810  /* Run the main loop. */
1811  LogicalRepApplyLoop(origin_startpos);
1812 
1813  proc_exit(0);
1814 }
1815 
1816 /*
1817  * Is current process a logical replication worker?
1818  */
1819 bool
1821 {
1822  return MyLogicalRepWorker != NULL;
1823 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
Subscription * MySubscription
Definition: worker.c:104
static void apply_handle_type(StringInfo s)
Definition: worker.c:566
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:140
#define NIL
Definition: pg_list.h:65
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:724
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:185
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:580
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1277
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1411
Relation ri_RelationDesc
Definition: execnodes.h:413
WalReceiverConn * wrconn
Definition: worker.c:102
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:279
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:1146
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
uint32 TimeLineID
Definition: xlogdefs.h:52
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:276
void AcceptInvalidationMessages(void)
Definition: inval.c:681
CommandId es_output_cid
Definition: execnodes.h:522
static XLogRecPtr remote_final_lsn
Definition: worker.c:108
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4543
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel)
Definition: worker.c:920
static void apply_handle_insert(StringInfo s)
Definition: worker.c:597
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
static dlist_head lsn_mapping
Definition: worker.c:90
#define DatumGetInt32(X)
Definition: postgres.h:472
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3028
#define RelationGetDescr(relation)
Definition: rel.h:462
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:529
static void apply_handle_update_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry)
Definition: worker.c:785
dlist_node node
Definition: worker.c:85
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3062
int64 timestamp
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
int64 TimestampTz
Definition: timestamp.h:39
LogicalRepRelMapEntry * rel
Definition: worker.c:94
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:271
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:325
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void CommitTransactionCommand(void)
Definition: xact.c:2898
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:1165
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:363
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:281
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:375
Expr * expression_planner(Expr *expr)
Definition: planner.c:6046
int maplen
Definition: attmap.h:37
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:58
XLogRecPtr last_lsn
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:277
union WalRcvStreamOptions::@104 proto
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1188
XLogRecPtr remote_end
Definition: worker.c:87
int errcode(int sqlerrcode)
Definition: elog.c:610
char * format_type_be(Oid type_oid)
Definition: format_type.c:327
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:381
Datum * tts_values
Definition: tuptable.h:126
#define WL_SOCKET_READABLE
Definition: latch.h:125
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8384
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:608
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1053
List * es_range_table
Definition: execnodes.h:510
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
Form_pg_class rd_rel
Definition: rel.h:89
unsigned int Oid
Definition: postgres_ext.h:31
struct SlotErrCallbackArg SlotErrCallbackArg
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:817
bool IsLogicalWorker(void)
Definition: worker.c:1820
void(* callback)(void *arg)
Definition: elog.h:229
int wal_receiver_status_interval
Definition: walreceiver.c:88
List * lappend_oid(List *list, Oid datum)
Definition: list.c:358
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
struct ErrorContextCallback * previous
Definition: elog.h:228
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
#define OidIsValid(objectId)
Definition: c.h:644
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:208
void ResetLatch(Latch *latch)
Definition: latch.c:540
int wal_receiver_timeout
Definition: walreceiver.c:89
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:1102
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2932
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:159
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
ErrorContextCallback * error_context_stack
Definition: elog.c:92
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:223
#define list_make1(x1)
Definition: pg_list.h:227
#define NAMEDATALEN
void FreeExecutorState(EState *estate)
Definition: execUtils.c:190
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
Definition: executor.h:506
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:154
static StringInfoData reply_message
Definition: walreceiver.c:124
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1636
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:43
static bool ensure_transaction(void)
Definition: worker.c:156
void fill_extraUpdatedCols(RangeTblEntry *target_rte, TupleDesc tupdesc)
Definition: analyze.c:2359
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
Definition: worker.c:81
static void * list_nth(const List *list, int n)
Definition: pg_list.h:277
bool in_remote_transaction
Definition: worker.c:107
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1669
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:427
Definition: guc.h:75
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:314
XLogRecPtr reply_lsn
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
static bool am_tablesync_worker(void)
static void apply_handle_delete(StringInfo s)
Definition: worker.c:862
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define DEBUG2
Definition: elog.h:24
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:153
char * c
void logicalrep_worker_attach(int slot)
Definition: launcher.c:625
#define NoLock
Definition: lockdefs.h:34
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7658
static char * buf
Definition: pg_test_fsync.c:67
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:154
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
bool * tts_isnull
Definition: tuptable.h:128
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:290
ResultRelInfo * es_result_relations
Definition: execnodes.h:525
#define RowExclusiveLock
Definition: lockdefs.h:38
#define SIGHUP
Definition: win32_port.h:153
#define RelationGetRelationName(relation)
Definition: rel.h:470
XLogRecPtr startpoint
Definition: walreceiver.h:159
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:390
unsigned int uint32
Definition: c.h:367
int pgsocket
Definition: port.h:31
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void apply_handle_begin(StringInfo s)
Definition: worker.c:466
RepOriginId replorigin_create(char *roname)
Definition: origin.c:239
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
Oid get_atttype(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:862
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2702
MemoryContext TopMemoryContext
Definition: mcxt.c:44
EState * CreateExecutorState(void)
Definition: execUtils.c:88
Definition: guc.h:72
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:671
List * lappend(List *list, void *datum)
Definition: list.c:322
MemoryContext ApplyContext
Definition: worker.c:100
static char ** options
XLogRecPtr final_lsn
Definition: logicalproto.h:67
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:485
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2486
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
Node * build_column_default(Relation rel, int attrno)
List * es_tupleTable
Definition: execnodes.h:557
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
static void apply_handle_update(StringInfo s)
Definition: worker.c:705
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1005
#define PGINVALID_SOCKET
Definition: port.h:33
int es_num_result_relations
Definition: execnodes.h:526
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5726
static void apply_handle_relation(StringInfo s)
Definition: worker.c:551
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int16 attnum
Definition: pg_attribute.h:79
#define ereport(elevel,...)
Definition: elog.h:144
Bitmapset * updatedCols
Definition: parsenodes.h:1122
XLogRecPtr local_end
Definition: worker.c:86
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4432
static void apply_dispatch(StringInfo s)
Definition: worker.c:1039
static void maybe_reread_subscription(void)
Definition: worker.c:1500
#define makeNode(_type_)
Definition: nodes.h:574
TimestampTz last_recv_time
static MemoryContext ApplyMessageContext
Definition: worker.c:99
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:738
#define lfirst(lc)
Definition: pg_list.h:190
RepOriginId replorigin_session_origin
Definition: origin.c:152
static void apply_handle_origin(StringInfo s)
Definition: worker.c:529
void StartTransactionCommand(void)
Definition: xact.c:2797
AttrNumber * attnums
Definition: attmap.h:36
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static void slot_store_error_callback(void *arg)
Definition: worker.c:279
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:210
bool IsTransactionState(void)
Definition: xact.c:355
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:283
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
int WalWriterDelay
Definition: walwriter.c:70
bool MySubscriptionValid
Definition: worker.c:105
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:511
void FreeSubscription(Subscription *sub)
char * logicalrep_typmap_gettypname(Oid remoteid)
Definition: relation.c:435
RTEKind rtekind
Definition: parsenodes.h:975
static Datum values[MAXATTR]
Definition: bootstrap.c:167
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4452
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:817
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:824
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1181
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:214
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int i
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define errcontext
Definition: elog.h:185
void * arg
struct Latch * MyLatch
Definition: globals.c:54
struct FlushPosition FlushPosition
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:123
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:745
XLogRecPtr commit_lsn
Definition: logicalproto.h:74
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
static void slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:388
Definition: pg_list.h:50
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1791
#define snprintf
Definition: port.h:192
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1648
#define WL_LATCH_SET
Definition: latch.h:124
#define RelationGetRelid(relation)
Definition: rel.h:436
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
#define die(msg)
Definition: pg_test_fsync.c:96
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4522
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TimestampTz committime
Definition: logicalproto.h:76
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:401
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:1643
uint32 LogicalRepRelId
Definition: logicalproto.h:39
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522
#define lfirst_oid(lc)
Definition: pg_list.h:192
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:215
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:256
static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot)
Definition: worker.c:654
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5755
void pgstat_report_stat(bool force)
Definition: pgstat.c:809
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:979
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:263
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:527