PostgreSQL Source Code  git master
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2020, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from remote logical replication stream.
13  *
14  * The main worker (apply) is started by logical replication worker
15  * launcher for every enabled subscription in a database. It uses
16  * walsender protocol to communicate with publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "access/table.h"
27 #include "access/tableam.h"
28 #include "access/xact.h"
29 #include "access/xlog_internal.h"
30 #include "catalog/catalog.h"
31 #include "catalog/namespace.h"
34 #include "commands/tablecmds.h"
35 #include "commands/trigger.h"
36 #include "executor/executor.h"
38 #include "funcapi.h"
39 #include "libpq/pqformat.h"
40 #include "libpq/pqsignal.h"
41 #include "mb/pg_wchar.h"
42 #include "miscadmin.h"
43 #include "nodes/makefuncs.h"
44 #include "optimizer/optimizer.h"
45 #include "parser/parse_relation.h"
46 #include "pgstat.h"
47 #include "postmaster/bgworker.h"
48 #include "postmaster/interrupt.h"
49 #include "postmaster/postmaster.h"
50 #include "postmaster/walwriter.h"
51 #include "replication/decode.h"
52 #include "replication/logical.h"
56 #include "replication/origin.h"
58 #include "replication/snapbuild.h"
61 #include "rewrite/rewriteHandler.h"
62 #include "storage/bufmgr.h"
63 #include "storage/ipc.h"
64 #include "storage/lmgr.h"
65 #include "storage/proc.h"
66 #include "storage/procarray.h"
67 #include "tcop/tcopprot.h"
68 #include "utils/builtins.h"
69 #include "utils/catcache.h"
70 #include "utils/datum.h"
71 #include "utils/fmgroids.h"
72 #include "utils/guc.h"
73 #include "utils/inval.h"
74 #include "utils/lsyscache.h"
75 #include "utils/memutils.h"
76 #include "utils/rel.h"
77 #include "utils/syscache.h"
78 #include "utils/timeout.h"
79 
80 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
81 
/*
 * Pairs one applied remote commit with its local commit position.
 * store_flush_position fills local_end from XactLastCommitEnd and remote_end
 * from the publisher's end LSN; get_flush_position reports remote_end back
 * once local_end has been flushed locally.  Entries are linked into
 * lsn_mapping through "node".  NOTE(review): the member declarations are not
 * present in this listing.
 */
82 typedef struct FlushPosition
83 {
88 
90 
/*
 * Argument for slot_store_error_callback: the relation map entry being
 * processed plus the local/remote attribute numbers of the column currently
 * being converted (both set to -1 when no conversion is in progress).
 */
91 typedef struct SlotErrCallbackArg
92 {
97 
100 
102 
/*
 * NOTE(review): presumably cleared when the subscription's catalog entry is
 * invalidated and consulted by maybe_reread_subscription -- confirm; the
 * code that reads or writes it is not visible in this listing.
 */
104 bool MySubscriptionValid = false;
105 
108 
109 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
110 
111 static void store_flush_position(XLogRecPtr remote_lsn);
112 
113 static void maybe_reread_subscription(void);
115 /*
116  * Should this worker apply changes for the given relation?
117  *
118  * This is mainly needed for initial relation data sync, as that runs in a
119  * separate worker process in parallel, and we need some way to skip
120  * changes coming to the main apply worker during the sync of a table.
121  *
122  * Note we need a less-than-or-equal comparison for SYNCDONE state because
123  * it might hold the position of the end of the initial slot's consistent
124  * point WAL record + 1 (i.e. the start of the next record), and the next
125  * record can be the COMMIT of the transaction we are now processing (which
126  * is what we set remote_final_lsn to in apply_handle_begin).
 *
 * A tablesync worker applies changes only for the single relation it is
 * syncing (MyLogicalRepWorker->relid).  Any other worker applies changes for
 * relations in READY state, or in SYNCDONE state once rel->statelsn is not
 * past remote_final_lsn.
 *
 * Returns true if the change should be applied, false if it should be skipped.
127  */
128 static bool
130 {
131  if (am_tablesync_worker())
132  return MyLogicalRepWorker->relid == rel->localreloid;
133  else
134  return (rel->state == SUBREL_STATE_READY ||
135  (rel->state == SUBREL_STATE_SYNCDONE &&
136  rel->statelsn <= remote_final_lsn));
137 }
138 
139 /*
140  * Make sure that we have started a local transaction.
141  *
142  * Also switches to ApplyMessageContext as necessary.
 *
 * Returns false if a transaction was already in progress (only the memory
 * context is adjusted), true if this call had to start one.
143  */
144 static bool
146 {
147  if (IsTransactionState())
148  {
150 
151  if (CurrentMemoryContext != ApplyMessageContext)
152  MemoryContextSwitchTo(ApplyMessageContext);
153 
154  return false;
155  }
156 
159 
161 
162  MemoryContextSwitchTo(ApplyMessageContext);
163  return true;
164 }
165 
166 
167 /*
168  * Executor state preparation for evaluation of constraint expressions,
169  * indexes and triggers.
170  *
171  * This is based on similar code in copy.c
 *
 * Builds an EState whose range table holds a single RTE_RELATION entry for
 * rel->localrel.  A single ResultRelInfo (range table index 1) is installed
 * as the only and current result relation.  es_output_cid is taken from
 * GetCurrentCommandId(true).  The caller owns the returned EState; the
 * apply_handle_* callers release it with FreeExecutorState.
172  */
173 static EState *
175 {
176  EState *estate;
177  ResultRelInfo *resultRelInfo;
178  RangeTblEntry *rte;
179 
180  estate = CreateExecutorState();
181 
182  rte = makeNode(RangeTblEntry);
183  rte->rtekind = RTE_RELATION;
184  rte->relid = RelationGetRelid(rel->localrel);
185  rte->relkind = rel->localrel->rd_rel->relkind;
187  ExecInitRangeTable(estate, list_make1(rte));
188 
189  resultRelInfo = makeNode(ResultRelInfo);
190  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
191 
192  estate->es_result_relations = resultRelInfo;
193  estate->es_num_result_relations = 1;
194  estate->es_result_relation_info = resultRelInfo;
195 
196  estate->es_output_cid = GetCurrentCommandId(true);
197 
198  /* Prepare to catch AFTER triggers. */
200 
201  return estate;
202 }
203 
204 /*
205  * Evaluates default values for local columns that cannot be mapped to
206  * remote relation columns.
207  *
208  * This allows us to support tables which have more columns on the downstream
209  * than on the upstream.
 *
 * Dropped and generated local columns are skipped, as are columns that map to
 * a remote column (rel->attrmap->attnums[attnum] >= 0).  For each remaining
 * column with a default expression, the default is evaluated in the
 * per-tuple ExprContext of "estate".  The result is written into the slot's
 * tts_values/tts_isnull at that column's position.  Columns without a default
 * are left as they are in the slot.
210  */
211 static void
213  TupleTableSlot *slot)
214 {
215  TupleDesc desc = RelationGetDescr(rel->localrel);
216  int num_phys_attrs = desc->natts;
217  int i;
218  int attnum,
219  num_defaults = 0;
220  int *defmap;
221  ExprState **defexprs;
222  ExprContext *econtext;
223 
224  econtext = GetPerTupleExprContext(estate);
225 
226  /* We got all the data via replication, no need to evaluate anything. */
227  if (num_phys_attrs == rel->remoterel.natts)
228  return;
229 
230  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
231  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
232 
233  Assert(rel->attrmap->maplen == num_phys_attrs);
234  for (attnum = 0; attnum < num_phys_attrs; attnum++)
235  {
236  Expr *defexpr;
237 
238  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
239  continue;
240 
241  if (rel->attrmap->attnums[attnum] >= 0)
242  continue;
243 
244  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
245 
246  if (defexpr != NULL)
247  {
248  /* Run the expression through planner */
249  defexpr = expression_planner(defexpr);
250 
251  /* Initialize executable expression (no parent PlanState) */
252  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
253  defmap[num_defaults] = attnum;
254  num_defaults++;
255  }
256 
257  }
258 
259  for (i = 0; i < num_defaults; i++)
260  slot->tts_values[defmap[i]] =
261  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
262 }
263 
264 /*
265  * Error callback to give more context info about type conversion failure.
 *
 * Installed on error_context_stack by slot_store_cstrings and
 * slot_modify_cstrings; "arg" points to their SlotErrCallbackArg.  Adds an
 * errcontext line only while a column conversion is in progress
 * (remote_attnum >= 0).  local_attnum is 0-based, hence the + 1 when looking
 * up the local attribute type.
266  */
267 static void
269 {
270  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
272  char *remotetypname;
273  Oid remotetypoid,
274  localtypoid;
275 
276  /* Nothing to do if remote attribute number is not set */
277  if (errarg->remote_attnum < 0)
278  return;
279 
280  rel = errarg->rel;
281  remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
282 
283  /* Fetch remote type name from the LogicalRepTypMap cache */
284  remotetypname = logicalrep_typmap_gettypname(remotetypoid);
285 
286  /* Fetch local type OID from the local sys cache */
287  localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
288 
289  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
290  "remote type %s, local type %s",
291  rel->remoterel.nspname, rel->remoterel.relname,
292  rel->remoterel.attnames[errarg->remote_attnum],
293  remotetypname,
294  format_type_be(localtypoid));
295 }
296 
297 /*
298  * Store data in C string form into slot.
299  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
300  * use better.
 *
 * "values" is indexed by remote attribute number: local column i takes
 * values[rel->attrmap->attnums[i]], and a NULL entry means SQL NULL.  Local
 * columns that are dropped, or that have no remote counterpart
 * (attnums[i] < 0), are set to NULL.  Each value is converted with the local
 * column type's input function, with slot_store_error_callback pushed for
 * error context.  The slot ends up holding a virtual tuple.
301  */
302 static void
304  char **values)
305 {
306  int natts = slot->tts_tupleDescriptor->natts;
307  int i;
308  SlotErrCallbackArg errarg;
309  ErrorContextCallback errcallback;
310 
311  ExecClearTuple(slot);
312 
313  /* Push callback + info on the error context stack */
314  errarg.rel = rel;
315  errarg.local_attnum = -1;
316  errarg.remote_attnum = -1;
317  errcallback.callback = slot_store_error_callback;
318  errcallback.arg = (void *) &errarg;
319  errcallback.previous = error_context_stack;
320  error_context_stack = &errcallback;
321 
322  /* Call the "in" function for each non-dropped attribute */
323  Assert(natts == rel->attrmap->maplen);
324  for (i = 0; i < natts; i++)
325  {
327  int remoteattnum = rel->attrmap->attnums[i];
328 
329  if (!att->attisdropped && remoteattnum >= 0 &&
330  values[remoteattnum] != NULL)
331  {
332  Oid typinput;
333  Oid typioparam;
334 
335  errarg.local_attnum = i;
336  errarg.remote_attnum = remoteattnum;
337 
338  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
339  slot->tts_values[i] =
340  OidInputFunctionCall(typinput, values[remoteattnum],
341  typioparam, att->atttypmod);
342  slot->tts_isnull[i] = false;
343 
344  errarg.local_attnum = -1;
345  errarg.remote_attnum = -1;
346  }
347  else
348  {
349  /*
350  * We assign NULL to dropped attributes, NULL values, and missing
351  * values (missing values should be later filled using
352  * slot_fill_defaults).
353  */
354  slot->tts_values[i] = (Datum) 0;
355  slot->tts_isnull[i] = true;
356  }
357  }
358 
359  /* Pop the error context stack */
360  error_context_stack = errcallback.previous;
361 
362  ExecStoreVirtualTuple(slot);
363 }
364 
365 /*
366  * Replace selected columns with user data provided as C strings.
367  * This is somewhat similar to heap_modify_tuple but also calls the type
368  * input functions on the user data.
369  * "slot" is filled with a copy of the tuple in "srcslot", with
370  * columns selected by the "replaces" array replaced with data values
371  * from "values".
372  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
373  * storage for "srcslot". This is OK for current usage, but someday we may
374  * need to materialize "slot" at the end to make it independent of "srcslot".
 *
 * Both "values" and "replaces" are indexed by remote attribute number: local
 * column i is looked up via rel->attrmap->attnums[i], and local columns
 * without a remote counterpart (negative entry) are always left unchanged.
 * A NULL entry in "values" for a replaced column stores SQL NULL.
375  */
376 static void
379  char **values, bool *replaces)
380 {
381  int natts = slot->tts_tupleDescriptor->natts;
382  int i;
383  SlotErrCallbackArg errarg;
384  ErrorContextCallback errcallback;
385 
386  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
387  ExecClearTuple(slot);
388 
389  /*
390  * Copy all the column data from srcslot, so that we'll have valid values
391  * for unreplaced columns.
392  */
393  Assert(natts == srcslot->tts_tupleDescriptor->natts);
394  slot_getallattrs(srcslot);
395  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
396  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
397 
398  /* For error reporting, push callback + info on the error context stack */
399  errarg.rel = rel;
400  errarg.local_attnum = -1;
401  errarg.remote_attnum = -1;
402  errcallback.callback = slot_store_error_callback;
403  errcallback.arg = (void *) &errarg;
404  errcallback.previous = error_context_stack;
405  error_context_stack = &errcallback;
406 
407  /* Call the "in" function for each replaced attribute */
408  Assert(natts == rel->attrmap->maplen);
409  for (i = 0; i < natts; i++)
410  {
412  int remoteattnum = rel->attrmap->attnums[i];
413 
414  if (remoteattnum < 0)
415  continue;
416 
417  if (!replaces[remoteattnum])
418  continue;
419 
420  if (values[remoteattnum] != NULL)
421  {
422  Oid typinput;
423  Oid typioparam;
424 
425  errarg.local_attnum = i;
426  errarg.remote_attnum = remoteattnum;
427 
428  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
429  slot->tts_values[i] =
430  OidInputFunctionCall(typinput, values[remoteattnum],
431  typioparam, att->atttypmod);
432  slot->tts_isnull[i] = false;
433 
434  errarg.local_attnum = -1;
435  errarg.remote_attnum = -1;
436  }
437  else
438  {
439  slot->tts_values[i] = (Datum) 0;
440  slot->tts_isnull[i] = true;
441  }
442  }
443 
444  /* Pop the error context stack */
445  error_context_stack = errcallback.previous;
446 
447  /* And finally, declare that "slot" contains a valid virtual tuple */
448  ExecStoreVirtualTuple(slot);
449 }
450 
451 /*
452  * Handle BEGIN message.
 *
 * Records the remote transaction's final LSN in remote_final_lsn and marks
 * in_remote_transaction.  remote_final_lsn is later compared against
 * relation sync state in should_apply_changes_for_rel, and asserted equal to
 * the COMMIT message's commit_lsn in apply_handle_commit.
453  */
454 static void
456 {
457  LogicalRepBeginData begin_data;
458 
459  logicalrep_read_begin(s, &begin_data);
460 
461  remote_final_lsn = begin_data.final_lsn;
462 
463  in_remote_transaction = true;
464 
466 }
467 
468 /*
469  * Handle COMMIT message.
470  *
471  * TODO, support tracking of multiple origins
 *
 * In the branch that finishes the local transaction, the publisher's end_lsn
 * is recorded with store_flush_position.  It is reported back to the
 * publisher once the matching local commit has been flushed (see
 * get_flush_position).  NOTE(review): the condition selecting that branch is
 * not visible in this listing.  In all cases in_remote_transaction is cleared
 * and process_syncing_tables is given the chance to act at end_lsn.
472  */
473 static void
475 {
476  LogicalRepCommitData commit_data;
477 
478  logicalrep_read_commit(s, &commit_data);
479 
480  Assert(commit_data.commit_lsn == remote_final_lsn);
481 
482  /* The synchronization worker runs in single transaction. */
484  {
485  /*
486  * Update origin state so we can restart streaming from correct
487  * position in case of crash.
488  */
491 
493  pgstat_report_stat(false);
494 
495  store_flush_position(commit_data.end_lsn);
496  }
497  else
498  {
499  /* Process any invalidation messages that might have accumulated. */
502  }
503 
504  in_remote_transaction = false;
505 
506  /* Process any tables that are being synchronized in parallel. */
507  process_syncing_tables(commit_data.end_lsn);
508 
510 }
511 
512 /*
513  * Handle ORIGIN message.
514  *
515  * TODO, support tracking of multiple origins
 *
 * Only validates message ordering: the visible code raises an ERROR with
 * ERRCODE_PROTOCOL_VIOLATION when the message arrives outside a remote
 * transaction (the remaining part of the condition is not visible in this
 * listing).  The origin name/LSN carried by the message are not used here.
516  */
517 static void
519 {
520  /*
521  * ORIGIN message can only come inside remote transaction and before any
522  * actual writes.
523  */
524  if (!in_remote_transaction ||
526  ereport(ERROR,
527  (errcode(ERRCODE_PROTOCOL_VIOLATION),
528  errmsg("ORIGIN message sent out of order")));
529 }
530 
531 /*
532  * Handle RELATION message.
533  *
534  * Note we don't do validation against local schema here. The validation
535  * against local schema is postponed until first change for given relation
536  * comes as we only care about it when applying changes for it anyway and we
537  * do less locking this way.
 *
 * The message is parsed with logicalrep_read_rel.  NOTE(review): presumably
 * the parsed description is then stored in the relation map cache -- that
 * statement is not visible in this listing; confirm.
538  */
539 static void
541 {
542  LogicalRepRelation *rel;
543 
544  rel = logicalrep_read_rel(s);
546 }
547 
548 /*
549  * Handle TYPE message.
550  *
551  * Note we don't do local mapping here, that's done when the type is
552  * actually used.
 *
 * The remote type description is parsed into a stack LogicalRepTyp.
 * NOTE(review): presumably it is then recorded in the remote type cache
 * consulted by logicalrep_typmap_gettypname -- that statement is not visible
 * in this listing; confirm.
553  */
554 static void
556 {
557  LogicalRepTyp typ;
558 
559  logicalrep_read_typ(s, &typ);
561 }
562 
563 /*
564  * Get the replica identity index or, if it is not defined, the primary key.
565  *
566  * If neither is defined, returns InvalidOid.  apply_handle_update and
 * apply_handle_delete then fall back to a sequential scan, which they only
 * expect when the remote relation uses REPLICA IDENTITY FULL.
567  */
568 static Oid
570 {
571  Oid idxoid;
572 
573  idxoid = RelationGetReplicaIndex(rel);
574 
575  if (!OidIsValid(idxoid))
576  idxoid = RelationGetPrimaryKeyIndex(rel);
577 
578  return idxoid;
579 }
580 
581 /*
582  * Handle INSERT message.
 *
 * The new tuple is decoded and, if this worker should apply changes for the
 * target relation, converted from text form into a virtual slot.  Mapped
 * columns come from slot_store_cstrings and unmapped local columns from
 * slot_fill_defaults.  The row is inserted with ExecSimpleRelationInsert,
 * queued AFTER triggers are fired, and the executor state is freed.
 * Changes for relations this worker should not apply are skipped.
583  */
584 static void
586 {
588  LogicalRepTupleData newtup;
589  LogicalRepRelId relid;
590  EState *estate;
591  TupleTableSlot *remoteslot;
592  MemoryContext oldctx;
593 
595 
596  relid = logicalrep_read_insert(s, &newtup);
599  {
600  /*
601  * The relation can't become interesting in the middle of the
602  * transaction so it's safe to unlock it.
603  */
605  return;
606  }
607 
608  /* Initialize the executor state. */
609  estate = create_estate_for_relation(rel);
610  remoteslot = ExecInitExtraTupleSlot(estate,
612  &TTSOpsVirtual);
613 
614  /* Input functions may need an active snapshot, so get one */
616 
617  /* Process and store remote tuple in the slot */
619  slot_store_cstrings(remoteslot, rel, newtup.values);
620  slot_fill_defaults(rel, estate, remoteslot);
621  MemoryContextSwitchTo(oldctx);
622 
624 
625  /* Do the insert. */
626  ExecSimpleRelationInsert(estate, remoteslot);
627 
628  /* Cleanup. */
631 
632  /* Handle queued AFTER triggers. */
633  AfterTriggerEndQuery(estate);
634 
635  ExecResetTupleTable(estate->es_tupleTable, false);
636  FreeExecutorState(estate);
637 
639 
641 }
642 
643 /*
644  * Check if the logical replication relation is updatable and throw
645  * appropriate error if it isn't.
 *
 * Returns normally only when rel->updatable is true; otherwise one of two
 * ERRORs is raised.  NOTE(review): the condition choosing the "publisher did
 * not send replica identity column" error is not visible in this listing.
 * Presumably it tests whether the local table has a replica identity or
 * primary key index; confirm.
646  */
647 static void
649 {
650  /* Updatable, no error. */
651  if (rel->updatable)
652  return;
653 
654  /*
655  * We are in error mode so it's fine this is somewhat slow. It's better to
656  * give user correct error.
657  */
659  {
660  ereport(ERROR,
661  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
662  errmsg("publisher did not send replica identity column "
663  "expected by the logical replication target relation \"%s.%s\"",
664  rel->remoterel.nspname, rel->remoterel.relname)));
665  }
666 
667  ereport(ERROR,
668  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
669  errmsg("logical replication target relation \"%s.%s\" has "
670  "neither REPLICA IDENTITY index nor PRIMARY "
671  "KEY and published relation does not have "
672  "REPLICA IDENTITY FULL",
673  rel->remoterel.nspname, rel->remoterel.relname)));
674 }
675 
676 /*
677  * Handle UPDATE message.
678  *
679  * TODO: FDW support
 *
 * The existing local row is located using the old tuple if the publisher sent
 * one (has_oldtup), otherwise using the new tuple's values.  The lookup goes
 * through the replica identity / primary key index, or a sequential scan when
 * neither exists.  If a row is found, the changed columns are overwritten via
 * slot_modify_cstrings and the row is updated with ExecSimpleRelationUpdate.
 * If no row is found, only a DEBUG1 message is logged.
680  */
681 static void
683 {
685  LogicalRepRelId relid;
686  Oid idxoid;
687  EState *estate;
688  EPQState epqstate;
689  LogicalRepTupleData oldtup;
690  LogicalRepTupleData newtup;
691  bool has_oldtup;
692  TupleTableSlot *localslot;
693  TupleTableSlot *remoteslot;
694  RangeTblEntry *target_rte;
695  bool found;
696  MemoryContext oldctx;
697 
699 
700  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
701  &newtup);
704  {
705  /*
706  * The relation can't become interesting in the middle of the
707  * transaction so it's safe to unlock it.
708  */
710  return;
711  }
712 
713  /* Check if we can do the update. */
715 
716  /* Initialize the executor state. */
717  estate = create_estate_for_relation(rel);
718  remoteslot = ExecInitExtraTupleSlot(estate,
720  &TTSOpsVirtual);
721  localslot = table_slot_create(rel->localrel,
722  &estate->es_tupleTable);
723  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
724 
725  /*
726  * Populate updatedCols so that per-column triggers can fire. This could
727  * include more columns than were actually changed on the publisher
728  * because the logical replication protocol doesn't contain that
729  * information. But it would for example exclude columns that only exist
730  * on the subscriber, since we are not touching those.
 *
 * NOTE(review): "i" below walks the columns of remoteslot's descriptor.
 * That descriptor uses the local column layout: slot_modify_cstrings asserts
 * it has the same natts as localslot, and it walks it through rel->attrmap.
 * However, newtup.changed is indexed by *remote* attribute number, as
 * slot_modify_cstrings reads it as replaces[rel->attrmap->attnums[i]].  When
 * local and remote column order differ, or local columns are dropped or
 * unmapped, the wrong columns get marked as updated.  Mapping through
 * rel->attrmap->attnums[i] (skipping negative entries) would make this
 * consistent with slot_modify_cstrings.
731  */
732  target_rte = list_nth(estate->es_range_table, 0);
733  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
734  {
735  if (newtup.changed[i])
736  target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
738  }
739 
742 
743  /* Build the search tuple. */
745  slot_store_cstrings(remoteslot, rel,
746  has_oldtup ? oldtup.values : newtup.values);
747  MemoryContextSwitchTo(oldctx);
748 
749  /*
750  * Try to find tuple using either replica identity index, primary key or
751  * if needed, sequential scan.
752  */
753  idxoid = GetRelationIdentityOrPK(rel->localrel);
754  Assert(OidIsValid(idxoid) ||
755  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
756 
757  if (OidIsValid(idxoid))
758  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
760  remoteslot, localslot);
761  else
763  remoteslot, localslot);
764 
765  ExecClearTuple(remoteslot);
766 
767  /*
768  * Tuple found.
769  *
770  * Note this will fail if there are other conflicting unique indexes.
771  */
772  if (found)
773  {
774  /* Process and store remote tuple in the slot */
776  slot_modify_cstrings(remoteslot, localslot, rel,
777  newtup.values, newtup.changed);
778  MemoryContextSwitchTo(oldctx);
779 
780  EvalPlanQualSetSlot(&epqstate, remoteslot);
781 
782  /* Do the actual update. */
783  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
784  }
785  else
786  {
787  /*
788  * The tuple to be updated could not be found.
789  *
790  * TODO what to do here, change the log level to LOG perhaps?
791  */
792  elog(DEBUG1,
793  "logical replication did not find row for update "
794  "in replication target relation \"%s\"",
796  }
797 
798  /* Cleanup. */
801 
802  /* Handle queued AFTER triggers. */
803  AfterTriggerEndQuery(estate);
804 
805  EvalPlanQualEnd(&epqstate);
806  ExecResetTupleTable(estate->es_tupleTable, false);
807  FreeExecutorState(estate);
808 
810 
812 }
813 
814 /*
815  * Handle DELETE message.
816  *
817  * TODO: FDW support
 *
 * A search tuple is built from the old key/tuple values sent by the
 * publisher.  The matching local row is looked up through the replica
 * identity / primary key index, or a sequential scan when neither exists.
 * A found row is deleted with ExecSimpleRelationDelete; a missing row only
 * produces a DEBUG1 message.
818  */
819 static void
821 {
823  LogicalRepTupleData oldtup;
824  LogicalRepRelId relid;
825  Oid idxoid;
826  EState *estate;
827  EPQState epqstate;
828  TupleTableSlot *remoteslot;
829  TupleTableSlot *localslot;
830  bool found;
831  MemoryContext oldctx;
832 
834 
835  relid = logicalrep_read_delete(s, &oldtup);
838  {
839  /*
840  * The relation can't become interesting in the middle of the
841  * transaction so it's safe to unlock it.
842  */
844  return;
845  }
846 
847  /* Check if we can do the delete. */
849 
850  /* Initialize the executor state. */
851  estate = create_estate_for_relation(rel);
852  remoteslot = ExecInitExtraTupleSlot(estate,
854  &TTSOpsVirtual);
855  localslot = table_slot_create(rel->localrel,
856  &estate->es_tupleTable);
857  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
858 
861 
862  /* Build the search tuple from the old key/tuple values. */
864  slot_store_cstrings(remoteslot, rel, oldtup.values);
865  MemoryContextSwitchTo(oldctx);
866 
867  /*
868  * Try to find tuple using either replica identity index, primary key or
869  * if needed, sequential scan.
870  */
871  idxoid = GetRelationIdentityOrPK(rel->localrel);
872  Assert(OidIsValid(idxoid) ||
873  (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
874 
875  if (OidIsValid(idxoid))
876  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
878  remoteslot, localslot);
879  else
881  remoteslot, localslot);
882  /* If found delete it. */
883  if (found)
884  {
885  EvalPlanQualSetSlot(&epqstate, localslot);
886 
887  /* Do the actual delete. */
888  ExecSimpleRelationDelete(estate, &epqstate, localslot);
889  }
890  else
891  {
892  /* The tuple to be deleted could not be found. */
893  elog(DEBUG1,
894  "logical replication could not find row for delete "
895  "in replication target relation \"%s\"",
897  }
898 
899  /* Cleanup. */
902 
903  /* Handle queued AFTER triggers. */
904  AfterTriggerEndQuery(estate);
905 
906  EvalPlanQualEnd(&epqstate);
907  ExecResetTupleTable(estate->es_tupleTable, false);
908  FreeExecutorState(estate);
909 
911 
913 }
914 
915 /*
916  * Handle TRUNCATE message.
917  *
918  * TODO: FDW support
 *
 * Collects every relation named in the message that this worker should apply
 * changes for, then truncates them together with ExecuteTruncateGuts.  The
 * message's restart_seqs flag is honoured.  Its cascade flag is read but
 * deliberately not used: DROP_RESTRICT is always passed (see the comment
 * before the call).  NOTE(review): the condition guarding the append to
 * relids_logged is not visible in this listing.
919  */
920 static void
922 {
923  bool cascade = false;
924  bool restart_seqs = false;
925  List *remote_relids = NIL;
926  List *remote_rels = NIL;
927  List *rels = NIL;
928  List *relids = NIL;
929  List *relids_logged = NIL;
930  ListCell *lc;
931 
933 
934  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
935 
936  foreach(lc, remote_relids)
937  {
938  LogicalRepRelId relid = lfirst_oid(lc);
940 
943  {
944  /*
945  * The relation can't become interesting in the middle of the
946  * transaction so it's safe to unlock it.
947  */
949  continue;
950  }
951 
952  remote_rels = lappend(remote_rels, rel);
953  rels = lappend(rels, rel->localrel);
954  relids = lappend_oid(relids, rel->localreloid);
956  relids_logged = lappend_oid(relids_logged, rel->localreloid);
957  }
958 
959  /*
960  * Even if we used CASCADE on the upstream master we explicitly default to
961  * replaying changes without further cascading. This might be later
962  * changeable with a user specified option.
963  */
964  ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
965 
966  foreach(lc, remote_rels)
967  {
968  LogicalRepRelMapEntry *rel = lfirst(lc);
969 
971  }
972 
974 }
975 
976 
977 /*
978  * Logical replication protocol message dispatcher.
 *
 * Reads the one-byte action code from the start of "s" and hands the rest of
 * the message to the matching apply_handle_* routine.  Unknown action codes
 * raise an ERROR with ERRCODE_PROTOCOL_VIOLATION.
979  */
980 static void
982 {
983  char action = pq_getmsgbyte(s);
984 
985  switch (action)
986  {
987  /* BEGIN */
988  case 'B':
990  break;
991  /* COMMIT */
992  case 'C':
994  break;
995  /* INSERT */
996  case 'I':
998  break;
999  /* UPDATE */
1000  case 'U':
1002  break;
1003  /* DELETE */
1004  case 'D':
1006  break;
1007  /* TRUNCATE */
1008  case 'T':
1010  break;
1011  /* RELATION */
1012  case 'R':
1014  break;
1015  /* TYPE */
1016  case 'Y':
1017  apply_handle_type(s);
1018  break;
1019  /* ORIGIN */
1020  case 'O':
1022  break;
1023  default:
1024  ereport(ERROR,
1025  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1026  errmsg("invalid logical replication message type \"%c\"", action)));
1027  }
1028 }
1029 
1030 /*
1031  * Figure out which write/flush positions to report to the walsender process.
1032  *
1033  * We can't simply report back the last LSN the walsender sent us because the
1034  * local transaction might not yet be flushed to disk locally. Instead we
1035  * build a list that associates local with remote LSNs for every commit. When
1036  * reporting back the flush position to the sender we iterate that list and
1037  * check which entries on it are already locally flushed. Those we can report
1038  * as having been flushed.
1039  *
1040  * *have_pending_txes is set to true if there are outstanding transactions
1041  * that still need to be flushed, false otherwise.
 *
 * Entries whose local commit is already flushed are removed from lsn_mapping
 * and freed; *flush becomes the remote end of the last such entry.  *write
 * becomes the remote end of the last entry in the list.  Both are
 * InvalidXLogRecPtr when the list is empty.
1042  */
1043 static void
1045  bool *have_pending_txes)
1046 {
1047  dlist_mutable_iter iter;
1048  XLogRecPtr local_flush = GetFlushRecPtr();
1049 
1050  *write = InvalidXLogRecPtr;
1051  *flush = InvalidXLogRecPtr;
1052 
1053  dlist_foreach_modify(iter, &lsn_mapping)
1054  {
1055  FlushPosition *pos =
1057 
1058  *write = pos->remote_end;
1059 
1060  if (pos->local_end <= local_flush)
1061  {
1062  *flush = pos->remote_end;
1063  dlist_delete(iter.cur);
1064  pfree(pos);
1065  }
1066  else
1067  {
1068  /*
1069  * Don't want to uselessly iterate over the rest of the list which
1070  * could potentially be long. Instead get the last element and
1071  * grab the write position from there.
1072  */
1074  &lsn_mapping);
1075  *write = pos->remote_end;
1076  *have_pending_txes = true;
1077  return;
1078  }
1079  }
1080 
1081  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
1082 }
1083 
1084 /*
1085  * Store current remote/local lsn pair in the tracking list.
 *
 * Appends an entry pairing remote_lsn with XactLastCommitEnd to lsn_mapping.
 * The entry is allocated in ApplyContext because ApplyMessageContext is reset
 * after every message, and the entry must survive until get_flush_position
 * sees it flushed.
 *
 * NOTE(review): on exit this switches to ApplyMessageContext unconditionally
 * rather than restoring the caller's previous context.  Callers are assumed
 * to be running in ApplyMessageContext.
1086  */
1087 static void
1089 {
1090  FlushPosition *flushpos;
1091 
1092  /* Need to do this in permanent context */
1093  MemoryContextSwitchTo(ApplyContext);
1094 
1095  /* Track commit lsn */
1096  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1097  flushpos->local_end = XactLastCommitEnd;
1098  flushpos->remote_end = remote_lsn;
1099 
1100  dlist_push_tail(&lsn_mapping, &flushpos->node);
1101  MemoryContextSwitchTo(ApplyMessageContext);
1102 }
1103 
1104 
1105 /*
 * Update statistics of the worker in MyLogicalRepWorker.
 *
 * last_lsn and send_time are the latest received WAL position and the
 * sender's timestamp for the message just processed.  When "reply" is true
 * (the keepalive path in LogicalRepApplyLoop), they are also recorded as
 * reply_lsn/reply_time.
 */
1106 static void
1107 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
1108 {
1109  MyLogicalRepWorker->last_lsn = last_lsn;
1110  MyLogicalRepWorker->last_send_time = send_time;
1112  if (reply)
1113  {
1114  MyLogicalRepWorker->reply_lsn = last_lsn;
1115  MyLogicalRepWorker->reply_time = send_time;
1116  }
1117 }
1118 
1119 /*
1120  * Apply main loop.
 *
 * Repeatedly drains messages from the publisher via walrcv_receive.
 *   - 'w' (WAL data) messages advance last_received and go to apply_dispatch.
 *   - 'k' (keepalive) messages advance last_received and may trigger
 *     immediate feedback.
 *   - Other message types are ignored.
 *
 * After each batch, feedback is sent.  When outside a remote transaction,
 * pending table synchronization is also processed.  The worker then waits on
 * its latch/socket: WalWriterDelay while commits are awaiting flush,
 * NAPTIME_PER_CYCLE otherwise.
 *
 * An ERROR is raised if nothing has been received for wal_receiver_timeout.
 * The function returns after the publisher ends the stream.
1121  */
1122 static void
1124 {
1125  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1126 
1127  /*
1128  * Init the ApplyMessageContext which we clean up after each replication
1129  * protocol message.
1130  */
1131  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
1132  "ApplyMessageContext",
1134 
1135  /* mark as idle, before starting to loop */
1137 
1138  for (;;)
1139  {
1141  int rc;
1142  int len;
1143  char *buf = NULL;
1144  bool endofstream = false;
1145  bool ping_sent = false;
1146  long wait_time;
1147 
1149 
1150  MemoryContextSwitchTo(ApplyMessageContext);
1151 
1152  len = walrcv_receive(wrconn, &buf, &fd);
1153 
1154  if (len != 0)
1155  {
1156  /* Process the data */
1157  for (;;)
1158  {
1160 
1161  if (len == 0)
1162  {
1163  break;
1164  }
1165  else if (len < 0)
1166  {
1167  ereport(LOG,
1168  (errmsg("data stream from publisher has ended")));
1169  endofstream = true;
1170  break;
1171  }
1172  else
1173  {
1174  int c;
1175  StringInfoData s;
1176 
1177  /* Reset timeout. */
1178  last_recv_timestamp = GetCurrentTimestamp();
1179  ping_sent = false;
1180 
1181  /* Ensure we are reading the data into our memory context. */
1182  MemoryContextSwitchTo(ApplyMessageContext);
1183 
1184  s.data = buf;
1185  s.len = len;
1186  s.cursor = 0;
1187  s.maxlen = -1;
1188 
1189  c = pq_getmsgbyte(&s);
1190 
1191  if (c == 'w')
1192  {
1193  XLogRecPtr start_lsn;
1194  XLogRecPtr end_lsn;
1195  TimestampTz send_time;
1196 
1197  start_lsn = pq_getmsgint64(&s);
1198  end_lsn = pq_getmsgint64(&s);
1199  send_time = pq_getmsgint64(&s);
1200 
1201  if (last_received < start_lsn)
1202  last_received = start_lsn;
1203 
1204  if (last_received < end_lsn)
1205  last_received = end_lsn;
1206 
1207  UpdateWorkerStats(last_received, send_time, false);
1208 
1209  apply_dispatch(&s);
1210  }
1211  else if (c == 'k')
1212  {
1213  XLogRecPtr end_lsn;
1215  bool reply_requested;
1216 
1217  end_lsn = pq_getmsgint64(&s);
1218  timestamp = pq_getmsgint64(&s);
1219  reply_requested = pq_getmsgbyte(&s);
1220 
1221  if (last_received < end_lsn)
1222  last_received = end_lsn;
1223 
1224  send_feedback(last_received, reply_requested, false);
1225  UpdateWorkerStats(last_received, timestamp, true);
1226  }
1227  /* other message types are purposefully ignored */
1228 
1229  MemoryContextReset(ApplyMessageContext);
1230  }
1231 
1232  len = walrcv_receive(wrconn, &buf, &fd);
1233  }
1234  }
1235 
1236  /* confirm all writes so far */
1237  send_feedback(last_received, false, false);
1238 
1239  if (!in_remote_transaction)
1240  {
1241  /*
1242  * If we didn't get any transactions for a while there might be
1243  * unconsumed invalidation messages in the queue, consume them
1244  * now.
1245  */
1248 
1249  /* Process any table synchronization changes. */
1250  process_syncing_tables(last_received);
1251  }
1252 
1253  /* Cleanup the memory. */
1254  MemoryContextResetAndDeleteChildren(ApplyMessageContext);
1256 
1257  /* Check if we need to exit the streaming loop. */
1258  if (endofstream)
1259  {
1260  TimeLineID tli;
1261 
1262  walrcv_endstreaming(wrconn, &tli);
1263  break;
1264  }
1265 
1266  /*
1267  * Wait for more data or latch. If we have unflushed transactions,
1268  * wake up after WalWriterDelay to see if they've been flushed yet (in
1269  * which case we should send a feedback message). Otherwise, there's
1270  * no particular urgency about waking up unless we get data or a
1271  * signal.
1272  */
1273  if (!dlist_is_empty(&lsn_mapping))
1274  wait_time = WalWriterDelay;
1275  else
1276  wait_time = NAPTIME_PER_CYCLE;
1277 
1281  fd, wait_time,
1283 
1284  if (rc & WL_LATCH_SET)
1285  {
1288  }
1289 
1290  if (ConfigReloadPending)
1291  {
1292  ConfigReloadPending = false;
1294  }
1295 
1296  if (rc & WL_TIMEOUT)
1297  {
1298  /*
1299  * We didn't receive anything new. If we haven't heard anything
1300  * from the server for more than wal_receiver_timeout / 2, ping
1301  * the server. Also, if it's been longer than
1302  * wal_receiver_status_interval since the last update we sent,
1303  * send a status update to the publisher anyway, to report any
1304  * progress in applying WAL.
1305  */
1306  bool requestReply = false;
1307 
1308  /*
1309  * Check if time since last receive from the publisher has reached
1310  * the configured limit.
1311  */
1312  if (wal_receiver_timeout > 0)
1313  {
1315  TimestampTz timeout;
1316 
1317  timeout =
1318  TimestampTzPlusMilliseconds(last_recv_timestamp,
1320 
1321  if (now >= timeout)
1322  ereport(ERROR,
1323  (errmsg("terminating logical replication worker due to timeout")));
1324 
1325  /*
1326  * We didn't receive anything new, for half of receiver
1327  * replication timeout. Ping the server.
 *
 * NOTE(review): ping_sent is declared, and initialized to false, inside
 * the outer for (;;) loop, so its value never survives to the next
 * iteration.  The !ping_sent test below is therefore always true here.
 * Once wal_receiver_timeout / 2 has elapsed since the last receive,
 * every timeout wakeup requests a reply rather than just the first.
 * If a single ping per silent period is intended, ping_sent should be
 * declared outside the loop.  It is already reset where data is received.
1328  */
1329  if (!ping_sent)
1330  {
1331  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1332  (wal_receiver_timeout / 2));
1333  if (now >= timeout)
1334  {
1335  requestReply = true;
1336  ping_sent = true;
1337  }
1338  }
1339  }
1340 
1341  send_feedback(last_received, requestReply, requestReply);
1342  }
1343  }
1344 }
1345 
1346 /*
1347  * Send a Standby Status Update message to server.
1348  *
1349  * 'recvpos' is the latest LSN we've received data to, force is set if we need
1350  * to send a response to avoid timeouts.
1351  */
1352 static void
1353 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1354 {
1355  static StringInfo reply_message = NULL;
1356  static TimestampTz send_time = 0;
1357 
1358  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1359  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1360  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1361 
1362  XLogRecPtr writepos;
1363  XLogRecPtr flushpos;
1364  TimestampTz now;
1365  bool have_pending_txes;
1366 
1367  /*
1368  * If the user doesn't want status to be reported to the publisher, be
1369  * sure to exit before doing anything at all.
1370  */
1371  if (!force && wal_receiver_status_interval <= 0)
1372  return;
1373 
1374  /* It's legal to not pass a recvpos */
1375  if (recvpos < last_recvpos)
1376  recvpos = last_recvpos;
1377 
1378  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1379 
1380  /*
1381  * No outstanding transactions to flush, we can report the latest received
1382  * position. This is important for synchronous replication.
1383  */
1384  if (!have_pending_txes)
1385  flushpos = writepos = recvpos;
1386 
1387  if (writepos < last_writepos)
1388  writepos = last_writepos;
1389 
1390  if (flushpos < last_flushpos)
1391  flushpos = last_flushpos;
1392 
1393  now = GetCurrentTimestamp();
1394 
1395  /* if we've already reported everything we're good */
1396  if (!force &&
1397  writepos == last_writepos &&
1398  flushpos == last_flushpos &&
1399  !TimestampDifferenceExceeds(send_time, now,
1401  return;
1402  send_time = now;
1403 
1404  if (!reply_message)
1405  {
1406  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1407 
1408  reply_message = makeStringInfo();
1409  MemoryContextSwitchTo(oldctx);
1410  }
1411  else
1412  resetStringInfo(reply_message);
1413 
1414  pq_sendbyte(reply_message, 'r');
1415  pq_sendint64(reply_message, recvpos); /* write */
1416  pq_sendint64(reply_message, flushpos); /* flush */
1417  pq_sendint64(reply_message, writepos); /* apply */
1418  pq_sendint64(reply_message, now); /* sendTime */
1419  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1420 
1421  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1422  force,
1423  (uint32) (recvpos >> 32), (uint32) recvpos,
1424  (uint32) (writepos >> 32), (uint32) writepos,
1425  (uint32) (flushpos >> 32), (uint32) flushpos
1426  );
1427 
1428  walrcv_send(wrconn, reply_message->data, reply_message->len);
1429 
1430  if (recvpos > last_recvpos)
1431  last_recvpos = recvpos;
1432  if (writepos > last_writepos)
1433  last_writepos = writepos;
1434  if (flushpos > last_flushpos)
1435  last_flushpos = flushpos;
1436 }
1437 
1438 /*
1439  * Reread subscription info if needed. Most changes will cause the worker to exit.
1440  */
1441 static void
1443 {
1444  MemoryContext oldctx;
1446  bool started_tx = false;
1447 
1448  /* When cache state is valid there is nothing to do here. */
1449  if (MySubscriptionValid)
1450  return;
1451 
1452  /* This function might be called inside or outside of transaction. */
1453  if (!IsTransactionState())
1454  {
1456  started_tx = true;
1457  }
1458 
1459  /* Ensure allocations in permanent context. */
1460  oldctx = MemoryContextSwitchTo(ApplyContext);
1461 
1462  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1463 
1464  /*
1465  * Exit if the subscription was removed. This normally should not happen
1466  * as the worker gets killed during DROP SUBSCRIPTION.
1467  */
1468  if (!newsub)
1469  {
1470  ereport(LOG,
1471  (errmsg("logical replication apply worker for subscription \"%s\" will "
1472  "stop because the subscription was removed",
1473  MySubscription->name)));
1474 
1475  proc_exit(0);
1476  }
1477 
1478  /*
1479  * Exit if the subscription was disabled. This normally should not happen
1480  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1481  */
1482  if (!newsub->enabled)
1483  {
1484  ereport(LOG,
1485  (errmsg("logical replication apply worker for subscription \"%s\" will "
1486  "stop because the subscription was disabled",
1487  MySubscription->name)));
1488 
1489  proc_exit(0);
1490  }
1491 
1492  /*
1493  * Exit if connection string was changed. The launcher will start new
1494  * worker.
1495  */
1496  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1497  {
1498  ereport(LOG,
1499  (errmsg("logical replication apply worker for subscription \"%s\" will "
1500  "restart because the connection information was changed",
1501  MySubscription->name)));
1502 
1503  proc_exit(0);
1504  }
1505 
1506  /*
1507  * Exit if subscription name was changed (it's used for
1508  * fallback_application_name). The launcher will start new worker.
1509  */
1510  if (strcmp(newsub->name, MySubscription->name) != 0)
1511  {
1512  ereport(LOG,
1513  (errmsg("logical replication apply worker for subscription \"%s\" will "
1514  "restart because subscription was renamed",
1515  MySubscription->name)));
1516 
1517  proc_exit(0);
1518  }
1519 
1520  /* !slotname should never happen when enabled is true. */
1521  Assert(newsub->slotname);
1522 
1523  /*
1524  * We need to make new connection to new slot if slot name has changed so
1525  * exit here as well if that's the case.
1526  */
1527  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1528  {
1529  ereport(LOG,
1530  (errmsg("logical replication apply worker for subscription \"%s\" will "
1531  "restart because the replication slot name was changed",
1532  MySubscription->name)));
1533 
1534  proc_exit(0);
1535  }
1536 
1537  /*
1538  * Exit if publication list was changed. The launcher will start new
1539  * worker.
1540  */
1541  if (!equal(newsub->publications, MySubscription->publications))
1542  {
1543  ereport(LOG,
1544  (errmsg("logical replication apply worker for subscription \"%s\" will "
1545  "restart because subscription's publications were changed",
1546  MySubscription->name)));
1547 
1548  proc_exit(0);
1549  }
1550 
1551  /* Check for other changes that should never happen too. */
1552  if (newsub->dbid != MySubscription->dbid)
1553  {
1554  elog(ERROR, "subscription %u changed unexpectedly",
1556  }
1557 
1558  /* Clean old subscription info and switch to new one. */
1559  FreeSubscription(MySubscription);
1560  MySubscription = newsub;
1561 
1562  MemoryContextSwitchTo(oldctx);
1563 
1564  /* Change synchronous commit according to the user's wishes */
1565  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1567 
1568  if (started_tx)
1570 
1571  MySubscriptionValid = true;
1572 }
1573 
1574 /*
1575  * Callback from subscription syscache invalidation.
1576  */
1577 static void
1578 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1579 {
1580  MySubscriptionValid = false;
1581 }
1582 
1583 /* Logical Replication Apply worker entry point */
1584 void
1586 {
1587  int worker_slot = DatumGetInt32(main_arg);
1588  MemoryContext oldctx;
1589  char originname[NAMEDATALEN];
1590  XLogRecPtr origin_startpos;
1591  char *myslotname;
1593 
1594  /* Attach to slot */
1595  logicalrep_worker_attach(worker_slot);
1596 
1597  /* Setup signal handling */
1599  pqsignal(SIGTERM, die);
1601 
1602  /*
1603  * We don't currently need any ResourceOwner in a walreceiver process, but
1604  * if we did, we could call CreateAuxProcessResourceOwner here.
1605  */
1606 
1607  /* Initialise stats to a sanish value */
1610 
1611  /* Load the libpq-specific functions */
1612  load_file("libpqwalreceiver", false);
1613 
1614  /* Run as replica session replication role. */
1615  SetConfigOption("session_replication_role", "replica",
1617 
1618  /* Connect to our database. */
1621  0);
1622 
1623  /* Load the subscription into persistent memory context. */
1624  ApplyContext = AllocSetContextCreate(TopMemoryContext,
1625  "ApplyContext",
1628  oldctx = MemoryContextSwitchTo(ApplyContext);
1629 
1630  MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
1631  if (!MySubscription)
1632  {
1633  ereport(LOG,
1634  (errmsg("logical replication apply worker for subscription %u will not "
1635  "start because the subscription was removed during startup",
1637  proc_exit(0);
1638  }
1639 
1640  MySubscriptionValid = true;
1641  MemoryContextSwitchTo(oldctx);
1642 
1643  if (!MySubscription->enabled)
1644  {
1645  ereport(LOG,
1646  (errmsg("logical replication apply worker for subscription \"%s\" will not "
1647  "start because the subscription was disabled during startup",
1648  MySubscription->name)));
1649 
1650  proc_exit(0);
1651  }
1652 
1653  /* Setup synchronous commit according to the user's wishes */
1654  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1656 
1657  /* Keep us informed about subscription changes. */
1660  (Datum) 0);
1661 
1662  if (am_tablesync_worker())
1663  ereport(LOG,
1664  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1665  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
1666  else
1667  ereport(LOG,
1668  (errmsg("logical replication apply worker for subscription \"%s\" has started",
1669  MySubscription->name)));
1670 
1672 
1673  /* Connect to the origin and start the replication. */
1674  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1675  MySubscription->conninfo);
1676 
1677  if (am_tablesync_worker())
1678  {
1679  char *syncslotname;
1680 
1681  /* This is table synchronization worker, call initial sync. */
1682  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1683 
1684  /* The slot name needs to be allocated in permanent memory context. */
1685  oldctx = MemoryContextSwitchTo(ApplyContext);
1686  myslotname = pstrdup(syncslotname);
1687  MemoryContextSwitchTo(oldctx);
1688 
1689  pfree(syncslotname);
1690  }
1691  else
1692  {
1693  /* This is main apply worker */
1694  RepOriginId originid;
1695  TimeLineID startpointTLI;
1696  char *err;
1697 
1698  myslotname = MySubscription->slotname;
1699 
1700  /*
1701  * This shouldn't happen if the subscription is enabled, but guard
1702  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1703  * crash if slot is NULL.)
1704  */
1705  if (!myslotname)
1706  ereport(ERROR,
1707  (errmsg("subscription has no replication slot set")));
1708 
1709  /* Setup replication origin tracking. */
1711  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1712  originid = replorigin_by_name(originname, true);
1713  if (!OidIsValid(originid))
1714  originid = replorigin_create(originname);
1715  replorigin_session_setup(originid);
1716  replorigin_session_origin = originid;
1717  origin_startpos = replorigin_session_get_progress(false);
1719 
1720  wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
1721  &err);
1722  if (wrconn == NULL)
1723  ereport(ERROR,
1724  (errmsg("could not connect to the publisher: %s", err)));
1725 
1726  /*
1727  * We don't really use the output identify_system for anything but it
1728  * does some initializations on the upstream so let's still call it.
1729  */
1730  (void) walrcv_identify_system(wrconn, &startpointTLI);
1731 
1732  }
1733 
1734  /*
1735  * Setup callback for syscache so that we know when something changes in
1736  * the subscription relation state.
1737  */
1740  (Datum) 0);
1741 
1742  /* Build logical replication streaming options. */
1743  options.logical = true;
1744  options.startpoint = origin_startpos;
1745  options.slotname = myslotname;
1746  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1747  options.proto.logical.publication_names = MySubscription->publications;
1748 
1749  /* Start normal logical streaming replication. */
1750  walrcv_startstreaming(wrconn, &options);
1751 
1752  /* Run the main loop. */
1753  LogicalRepApplyLoop(origin_startpos);
1754 
1755  proc_exit(0);
1756 }
1757 
1758 /*
1759  * Is current process a logical replication worker?
1760  */
1761 bool
1763 {
1764  return MyLogicalRepWorker != NULL;
1765 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
Subscription * MySubscription
Definition: worker.c:103
static void apply_handle_type(StringInfo s)
Definition: worker.c:555
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:129
#define NIL
Definition: pg_list.h:65
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:724
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:174
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:569
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1277
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1353
WalReceiverConn * wrconn
Definition: worker.c:101
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:280
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:1088
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
uint32 TimeLineID
Definition: xlogdefs.h:52
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:276
void AcceptInvalidationMessages(void)
Definition: inval.c:681
CommandId es_output_cid
Definition: execnodes.h:516
static XLogRecPtr remote_final_lsn
Definition: worker.c:107
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4539
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static void apply_handle_insert(StringInfo s)
Definition: worker.c:585
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
static dlist_head lsn_mapping
Definition: worker.c:89
#define DatumGetInt32(X)
Definition: postgres.h:472
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3018
#define RelationGetDescr(relation)
Definition: rel.h:454
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:529
dlist_node node
Definition: worker.c:84
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3114
int64 timestamp
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
int64 TimestampTz
Definition: timestamp.h:39
LogicalRepRelMapEntry * rel
Definition: worker.c:93
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:272
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:325
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void CommitTransactionCommand(void)
Definition: xact.c:2898
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:1107
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:352
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:282
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:375
Expr * expression_planner(Expr *expr)
Definition: planner.c:6051
int maplen
Definition: attmap.h:37
union WalRcvStreamOptions::@106 proto
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:58
XLogRecPtr last_lsn
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:278
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1188
XLogRecPtr remote_end
Definition: worker.c:86
int errcode(int sqlerrcode)
Definition: elog.c:608
char * format_type_be(Oid type_oid)
Definition: format_type.c:326
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:381
Datum * tts_values
Definition: tuptable.h:126
#define WL_SOCKET_READABLE
Definition: latch.h:125
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8262
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:600
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1053
List * es_range_table
Definition: execnodes.h:504
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
Form_pg_class rd_rel
Definition: rel.h:84
unsigned int Oid
Definition: postgres_ext.h:31
struct SlotErrCallbackArg SlotErrCallbackArg
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:798
bool IsLogicalWorker(void)
Definition: worker.c:1762
void(* callback)(void *arg)
Definition: elog.h:256
int wal_receiver_status_interval
Definition: walreceiver.c:77
List * lappend_oid(List *list, Oid datum)
Definition: list.c:358
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
struct ErrorContextCallback * previous
Definition: elog.h:255
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
#define OidIsValid(objectId)
Definition: c.h:645
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:208
void ResetLatch(Latch *latch)
Definition: latch.c:519
int wal_receiver_timeout
Definition: walreceiver.c:78
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:1044
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2944
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:159
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
ErrorContextCallback * error_context_stack
Definition: elog.c:91
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:212
#define list_make1(x1)
Definition: pg_list.h:227
#define NAMEDATALEN
void FreeExecutorState(EState *estate)
Definition: execUtils.c:190
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
Definition: executor.h:501
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:154
static StringInfoData reply_message
Definition: walreceiver.c:114
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1578
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:43
static bool ensure_transaction(void)
Definition: worker.c:145
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
Definition: worker.c:80
static void * list_nth(const List *list, int n)
Definition: pg_list.h:277
bool in_remote_transaction
Definition: worker.c:106
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1645
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:427
Definition: guc.h:75
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:303
XLogRecPtr reply_lsn
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
static bool am_tablesync_worker(void)
static void apply_handle_delete(StringInfo s)
Definition: worker.c:820
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define DEBUG2
Definition: elog.h:24
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:153
char * c
void logicalrep_worker_attach(int slot)
Definition: launcher.c:625
#define NoLock
Definition: lockdefs.h:34
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7559
static char * buf
Definition: pg_test_fsync.c:67
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:154
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
bool * tts_isnull
Definition: tuptable.h:128
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:285
ResultRelInfo * es_result_relations
Definition: execnodes.h:519
#define RowExclusiveLock
Definition: lockdefs.h:38
#define SIGHUP
Definition: win32_port.h:154
#define RelationGetRelationName(relation)
Definition: rel.h:462
XLogRecPtr startpoint
Definition: walreceiver.h:160
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
unsigned int uint32
Definition: c.h:359
int pgsocket
Definition: port.h:31
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void apply_handle_begin(StringInfo s)
Definition: worker.c:455
RepOriginId replorigin_create(char *roname)
Definition: origin.c:239
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
Oid get_atttype(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:861
#define ereport(elevel, rest)
Definition: elog.h:141
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2641
MemoryContext TopMemoryContext
Definition: mcxt.c:44
EState * CreateExecutorState(void)
Definition: execUtils.c:88
Definition: guc.h:72
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:648
List * lappend(List *list, void *datum)
Definition: list.c:322
MemoryContext ApplyContext
Definition: worker.c:99
static char ** options
XLogRecPtr final_lsn
Definition: logicalproto.h:66
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:474
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2484
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
Node * build_column_default(Relation rel, int attrno)
List * es_tupleTable
Definition: execnodes.h:551
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
static void apply_handle_update(StringInfo s)
Definition: worker.c:682
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1005
#define PGINVALID_SOCKET
Definition: port.h:33
int es_num_result_relations
Definition: execnodes.h:520
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5708
static void apply_handle_relation(StringInfo s)
Definition: worker.c:540
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int16 attnum
Definition: pg_attribute.h:79
Bitmapset * updatedCols
Definition: parsenodes.h:1121
XLogRecPtr local_end
Definition: worker.c:85
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4785
static void apply_dispatch(StringInfo s)
Definition: worker.c:981
static void maybe_reread_subscription(void)
Definition: worker.c:1442
#define makeNode(_type_)
Definition: nodes.h:573
TimestampTz last_recv_time
static MemoryContext ApplyMessageContext
Definition: worker.c:98
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
RepOriginId replorigin_session_origin
Definition: origin.c:152
static void apply_handle_origin(StringInfo s)
Definition: worker.c:518
void StartTransactionCommand(void)
Definition: xact.c:2797
AttrNumber * attnums
Definition: attmap.h:36
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static void slot_store_error_callback(void *arg)
Definition: worker.c:268
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:210
bool IsTransactionState(void)
Definition: xact.c:355
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:284
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
int WalWriterDelay
Definition: walwriter.c:70
bool MySubscriptionValid
Definition: worker.c:104
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:506
void FreeSubscription(Subscription *sub)
char * logicalrep_typmap_gettypname(Oid remoteid)
Definition: relation.c:435
RTEKind rtekind
Definition: parsenodes.h:974
static Datum values[MAXATTR]
Definition: bootstrap.c:167
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4805
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:817
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1123
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:228
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int i
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define errcontext
Definition: elog.h:183
void * arg
struct Latch * MyLatch
Definition: globals.c:54
struct FlushPosition FlushPosition
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:121
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:745
XLogRecPtr commit_lsn
Definition: logicalproto.h:73
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
static void slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:377
Definition: pg_list.h:50
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1730
#define snprintf
Definition: port.h:192
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1646
#define WL_LATCH_SET
Definition: latch.h:124
#define RelationGetRelid(relation)
Definition: rel.h:428
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
#define die(msg)
Definition: pg_test_fsync.c:96
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4518
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TimestampTz committime
Definition: logicalproto.h:75
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:401
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:1585
uint32 LogicalRepRelId
Definition: logicalproto.h:39
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522
#define lfirst_oid(lc)
Definition: pg_list.h:192
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:210
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:256
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5737
void pgstat_report_stat(bool force)
Definition: pgstat.c:806
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:921
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:264
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:521