PostgreSQL Source Code  git master
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2019, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from the remote logical replication stream.
13  *
14  * The main worker (apply) is started by logical replication worker
15  * launcher for every enabled subscription in a database. It uses
16  * walsender protocol to communicate with publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "access/table.h"
27 #include "access/tableam.h"
28 #include "access/xact.h"
29 #include "access/xlog_internal.h"
30 #include "catalog/catalog.h"
31 #include "catalog/namespace.h"
34 #include "commands/tablecmds.h"
35 #include "commands/trigger.h"
36 #include "executor/executor.h"
38 #include "funcapi.h"
39 #include "libpq/pqformat.h"
40 #include "libpq/pqsignal.h"
41 #include "mb/pg_wchar.h"
42 #include "miscadmin.h"
43 #include "nodes/makefuncs.h"
44 #include "optimizer/optimizer.h"
45 #include "parser/parse_relation.h"
46 #include "pgstat.h"
47 #include "postmaster/bgworker.h"
48 #include "postmaster/postmaster.h"
49 #include "postmaster/walwriter.h"
50 #include "replication/decode.h"
51 #include "replication/logical.h"
55 #include "replication/origin.h"
57 #include "replication/snapbuild.h"
60 #include "rewrite/rewriteHandler.h"
61 #include "storage/bufmgr.h"
62 #include "storage/ipc.h"
63 #include "storage/lmgr.h"
64 #include "storage/proc.h"
65 #include "storage/procarray.h"
66 #include "tcop/tcopprot.h"
67 #include "utils/builtins.h"
68 #include "utils/catcache.h"
69 #include "utils/datum.h"
70 #include "utils/fmgroids.h"
71 #include "utils/guc.h"
72 #include "utils/inval.h"
73 #include "utils/lsyscache.h"
74 #include "utils/memutils.h"
75 #include "utils/rel.h"
76 #include "utils/syscache.h"
77 #include "utils/timeout.h"
78 
79 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
80 
81 typedef struct FlushPosition
82 {
87 
89 
90 typedef struct SlotErrCallbackArg
91 {
96 
99 
101 
103 bool MySubscriptionValid = false;
104 
107 
108 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
109 
110 static void store_flush_position(XLogRecPtr remote_lsn);
111 
112 static void maybe_reread_subscription(void);
113 
114 /* Flags set by signal handlers */
115 static volatile sig_atomic_t got_SIGHUP = false;
116 
117 /*
118  * Should this worker apply changes for the given relation?
119  *
120  * This is mainly needed for initial relation data sync as that runs in
121  * separate worker process running in parallel and we need some way to skip
122  * changes coming to the main apply worker during the sync of a table.
123  *
124  * Note we need to do a less-than-or-equal comparison for SYNCDONE state because
125  * it might hold position of end of initial slot consistent point WAL
126  * record + 1 (ie start of next record) and next record can be COMMIT of
127  * transaction we are now processing (which is what we set remote_final_lsn
128  * to in apply_handle_begin).
129  */
130 static bool
132 {
133  if (am_tablesync_worker())
134  return MyLogicalRepWorker->relid == rel->localreloid;
135  else
136  return (rel->state == SUBREL_STATE_READY ||
137  (rel->state == SUBREL_STATE_SYNCDONE &&
138  rel->statelsn <= remote_final_lsn));
139 }
140 
141 /*
142  * Make sure that we have started a local transaction.
143  *
144  * Also switches to ApplyMessageContext as necessary.
145  */
146 static bool
148 {
149  if (IsTransactionState())
150  {
152 
153  if (CurrentMemoryContext != ApplyMessageContext)
154  MemoryContextSwitchTo(ApplyMessageContext);
155 
156  return false;
157  }
158 
161 
163 
164  MemoryContextSwitchTo(ApplyMessageContext);
165  return true;
166 }
167 
168 
169 /*
170  * Executor state preparation for evaluation of constraint expressions,
171  * indexes and triggers.
172  *
173  * This is based on similar code in copy.c
174  */
175 static EState *
177 {
178  EState *estate;
179  ResultRelInfo *resultRelInfo;
180  RangeTblEntry *rte;
181 
182  estate = CreateExecutorState();
183 
184  rte = makeNode(RangeTblEntry);
185  rte->rtekind = RTE_RELATION;
186  rte->relid = RelationGetRelid(rel->localrel);
187  rte->relkind = rel->localrel->rd_rel->relkind;
189  ExecInitRangeTable(estate, list_make1(rte));
190 
191  resultRelInfo = makeNode(ResultRelInfo);
192  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
193 
194  estate->es_result_relations = resultRelInfo;
195  estate->es_num_result_relations = 1;
196  estate->es_result_relation_info = resultRelInfo;
197 
198  estate->es_output_cid = GetCurrentCommandId(true);
199 
200  /* Prepare to catch AFTER triggers. */
202 
203  return estate;
204 }
205 
206 /*
207  * Evaluates default values for local columns which we can't map to remote
208  * relation columns.
209  *
210  * This allows us to support tables which have more columns on the downstream
211  * than on the upstream.
212  */
213 static void
215  TupleTableSlot *slot)
216 {
217  TupleDesc desc = RelationGetDescr(rel->localrel);
218  int num_phys_attrs = desc->natts;
219  int i;
220  int attnum,
221  num_defaults = 0;
222  int *defmap;
223  ExprState **defexprs;
224  ExprContext *econtext;
225 
226  econtext = GetPerTupleExprContext(estate);
227 
228  /* We got all the data via replication, no need to evaluate anything. */
229  if (num_phys_attrs == rel->remoterel.natts)
230  return;
231 
232  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
233  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
234 
235  for (attnum = 0; attnum < num_phys_attrs; attnum++)
236  {
237  Expr *defexpr;
238 
239  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
240  continue;
241 
242  if (rel->attrmap[attnum] >= 0)
243  continue;
244 
245  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
246 
247  if (defexpr != NULL)
248  {
249  /* Run the expression through planner */
250  defexpr = expression_planner(defexpr);
251 
252  /* Initialize executable expression in copycontext */
253  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
254  defmap[num_defaults] = attnum;
255  num_defaults++;
256  }
257 
258  }
259 
260  for (i = 0; i < num_defaults; i++)
261  slot->tts_values[defmap[i]] =
262  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
263 }
264 
265 /*
266  * Error callback to give more context info about type conversion failure.
267  */
268 static void
270 {
271  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
273  char *remotetypname;
274  Oid remotetypoid,
275  localtypoid;
276 
277  /* Nothing to do if remote attribute number is not set */
278  if (errarg->remote_attnum < 0)
279  return;
280 
281  rel = errarg->rel;
282  remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
283 
284  /* Fetch remote type name from the LogicalRepTypMap cache */
285  remotetypname = logicalrep_typmap_gettypname(remotetypoid);
286 
287  /* Fetch local type OID from the local sys cache */
288  localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
289 
290  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
291  "remote type %s, local type %s",
292  rel->remoterel.nspname, rel->remoterel.relname,
293  rel->remoterel.attnames[errarg->remote_attnum],
294  remotetypname,
295  format_type_be(localtypoid));
296 }
297 
298 /*
299  * Store data in C string form into slot.
300  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
301  * use better.
302  */
303 static void
305  char **values)
306 {
307  int natts = slot->tts_tupleDescriptor->natts;
308  int i;
309  SlotErrCallbackArg errarg;
310  ErrorContextCallback errcallback;
311 
312  ExecClearTuple(slot);
313 
314  /* Push callback + info on the error context stack */
315  errarg.rel = rel;
316  errarg.local_attnum = -1;
317  errarg.remote_attnum = -1;
318  errcallback.callback = slot_store_error_callback;
319  errcallback.arg = (void *) &errarg;
320  errcallback.previous = error_context_stack;
321  error_context_stack = &errcallback;
322 
323  /* Call the "in" function for each non-dropped attribute */
324  for (i = 0; i < natts; i++)
325  {
327  int remoteattnum = rel->attrmap[i];
328 
329  if (!att->attisdropped && remoteattnum >= 0 &&
330  values[remoteattnum] != NULL)
331  {
332  Oid typinput;
333  Oid typioparam;
334 
335  errarg.local_attnum = i;
336  errarg.remote_attnum = remoteattnum;
337 
338  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
339  slot->tts_values[i] =
340  OidInputFunctionCall(typinput, values[remoteattnum],
341  typioparam, att->atttypmod);
342  slot->tts_isnull[i] = false;
343 
344  errarg.local_attnum = -1;
345  errarg.remote_attnum = -1;
346  }
347  else
348  {
349  /*
350  * We assign NULL to dropped attributes, NULL values, and missing
351  * values (missing values should be later filled using
352  * slot_fill_defaults).
353  */
354  slot->tts_values[i] = (Datum) 0;
355  slot->tts_isnull[i] = true;
356  }
357  }
358 
359  /* Pop the error context stack */
360  error_context_stack = errcallback.previous;
361 
362  ExecStoreVirtualTuple(slot);
363 }
364 
365 /*
366  * Modify slot with user data provided as C strings.
367  * This is somewhat similar to heap_modify_tuple but also calls the type
368  * input function on the user data as the input is the text representation
369  * of the types.
370  */
371 static void
373  char **values, bool *replaces)
374 {
375  int natts = slot->tts_tupleDescriptor->natts;
376  int i;
377  SlotErrCallbackArg errarg;
378  ErrorContextCallback errcallback;
379 
380  slot_getallattrs(slot);
381  ExecClearTuple(slot);
382 
383  /* Push callback + info on the error context stack */
384  errarg.rel = rel;
385  errarg.local_attnum = -1;
386  errarg.remote_attnum = -1;
387  errcallback.callback = slot_store_error_callback;
388  errcallback.arg = (void *) &errarg;
389  errcallback.previous = error_context_stack;
390  error_context_stack = &errcallback;
391 
392  /* Call the "in" function for each replaced attribute */
393  for (i = 0; i < natts; i++)
394  {
396  int remoteattnum = rel->attrmap[i];
397 
398  if (remoteattnum < 0)
399  continue;
400 
401  if (!replaces[remoteattnum])
402  continue;
403 
404  if (values[remoteattnum] != NULL)
405  {
406  Oid typinput;
407  Oid typioparam;
408 
409  errarg.local_attnum = i;
410  errarg.remote_attnum = remoteattnum;
411 
412  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
413  slot->tts_values[i] =
414  OidInputFunctionCall(typinput, values[remoteattnum],
415  typioparam, att->atttypmod);
416  slot->tts_isnull[i] = false;
417 
418  errarg.local_attnum = -1;
419  errarg.remote_attnum = -1;
420  }
421  else
422  {
423  slot->tts_values[i] = (Datum) 0;
424  slot->tts_isnull[i] = true;
425  }
426  }
427 
428  /* Pop the error context stack */
429  error_context_stack = errcallback.previous;
430 
431  ExecStoreVirtualTuple(slot);
432 }
433 
434 /*
435  * Handle BEGIN message.
436  */
437 static void
439 {
440  LogicalRepBeginData begin_data;
441 
442  logicalrep_read_begin(s, &begin_data);
443 
444  remote_final_lsn = begin_data.final_lsn;
445 
446  in_remote_transaction = true;
447 
449 }
450 
451 /*
452  * Handle COMMIT message.
453  *
454  * TODO, support tracking of multiple origins
455  */
456 static void
458 {
459  LogicalRepCommitData commit_data;
460 
461  logicalrep_read_commit(s, &commit_data);
462 
463  Assert(commit_data.commit_lsn == remote_final_lsn);
464 
465  /* The synchronization worker runs in single transaction. */
467  {
468  /*
469  * Update origin state so we can restart streaming from correct
470  * position in case of crash.
471  */
474 
476  pgstat_report_stat(false);
477 
478  store_flush_position(commit_data.end_lsn);
479  }
480  else
481  {
482  /* Process any invalidation messages that might have accumulated. */
485  }
486 
487  in_remote_transaction = false;
488 
489  /* Process any tables that are being synchronized in parallel. */
490  process_syncing_tables(commit_data.end_lsn);
491 
493 }
494 
495 /*
496  * Handle ORIGIN message.
497  *
498  * TODO, support tracking of multiple origins
499  */
500 static void
502 {
503  /*
504  * ORIGIN message can only come inside remote transaction and before any
505  * actual writes.
506  */
507  if (!in_remote_transaction ||
509  ereport(ERROR,
510  (errcode(ERRCODE_PROTOCOL_VIOLATION),
511  errmsg("ORIGIN message sent out of order")));
512 }
513 
514 /*
515  * Handle RELATION message.
516  *
517  * Note we don't do validation against local schema here. The validation
518  * against local schema is postponed until first change for given relation
519  * comes as we only care about it when applying changes for it anyway and we
520  * do less locking this way.
521  */
522 static void
524 {
525  LogicalRepRelation *rel;
526 
527  rel = logicalrep_read_rel(s);
529 }
530 
531 /*
532  * Handle TYPE message.
533  *
534  * Note we don't do local mapping here, that's done when the type is
535  * actually used.
536  */
537 static void
539 {
540  LogicalRepTyp typ;
541 
542  logicalrep_read_typ(s, &typ);
544 }
545 
546 /*
547  * Get the replica identity index or, if it is not defined, the primary key index.
548  *
549  * If neither is defined, returns InvalidOid
550  */
551 static Oid
553 {
554  Oid idxoid;
555 
556  idxoid = RelationGetReplicaIndex(rel);
557 
558  if (!OidIsValid(idxoid))
559  idxoid = RelationGetPrimaryKeyIndex(rel);
560 
561  return idxoid;
562 }
563 
564 /*
565  * Handle INSERT message.
566  */
567 static void
569 {
571  LogicalRepTupleData newtup;
572  LogicalRepRelId relid;
573  EState *estate;
574  TupleTableSlot *remoteslot;
575  MemoryContext oldctx;
576 
578 
579  relid = logicalrep_read_insert(s, &newtup);
582  {
583  /*
584  * The relation can't become interesting in the middle of the
585  * transaction so it's safe to unlock it.
586  */
588  return;
589  }
590 
591  /* Initialize the executor state. */
592  estate = create_estate_for_relation(rel);
593  remoteslot = ExecInitExtraTupleSlot(estate,
595  &TTSOpsVirtual);
596 
597  /* Input functions may need an active snapshot, so get one */
599 
600  /* Process and store remote tuple in the slot */
602  slot_store_cstrings(remoteslot, rel, newtup.values);
603  slot_fill_defaults(rel, estate, remoteslot);
604  MemoryContextSwitchTo(oldctx);
605 
607 
608  /* Do the insert. */
609  ExecSimpleRelationInsert(estate, remoteslot);
610 
611  /* Cleanup. */
614 
615  /* Handle queued AFTER triggers. */
616  AfterTriggerEndQuery(estate);
617 
618  ExecResetTupleTable(estate->es_tupleTable, false);
619  FreeExecutorState(estate);
620 
622 
624 }
625 
626 /*
627  * Check if the logical replication relation is updatable and throw
628  * appropriate error if it isn't.
629  */
630 static void
632 {
633  /* Updatable, no error. */
634  if (rel->updatable)
635  return;
636 
637  /*
638  * We are in error mode so it's fine this is somewhat slow. It's better to
639  * give user correct error.
640  */
642  {
643  ereport(ERROR,
644  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
645  errmsg("publisher did not send replica identity column "
646  "expected by the logical replication target relation \"%s.%s\"",
647  rel->remoterel.nspname, rel->remoterel.relname)));
648  }
649 
650  ereport(ERROR,
651  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
652  errmsg("logical replication target relation \"%s.%s\" has "
653  "neither REPLICA IDENTITY index nor PRIMARY "
654  "KEY and published relation does not have "
655  "REPLICA IDENTITY FULL",
656  rel->remoterel.nspname, rel->remoterel.relname)));
657 }
658 
659 /*
660  * Handle UPDATE message.
661  *
662  * TODO: FDW support
663  */
664 static void
666 {
668  LogicalRepRelId relid;
669  Oid idxoid;
670  EState *estate;
671  EPQState epqstate;
672  LogicalRepTupleData oldtup;
673  LogicalRepTupleData newtup;
674  bool has_oldtup;
675  TupleTableSlot *localslot;
676  TupleTableSlot *remoteslot;
677  bool found;
678  MemoryContext oldctx;
679 
681 
682  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
683  &newtup);
686  {
687  /*
688  * The relation can't become interesting in the middle of the
689  * transaction so it's safe to unlock it.
690  */
692  return;
693  }
694 
695  /* Check if we can do the update. */
697 
698  /* Initialize the executor state. */
699  estate = create_estate_for_relation(rel);
700  remoteslot = ExecInitExtraTupleSlot(estate,
702  &TTSOpsVirtual);
703  localslot = table_slot_create(rel->localrel,
704  &estate->es_tupleTable);
705  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
706 
709 
710  /* Build the search tuple. */
712  slot_store_cstrings(remoteslot, rel,
713  has_oldtup ? oldtup.values : newtup.values);
714  MemoryContextSwitchTo(oldctx);
715 
716  /*
717  * Try to find tuple using either replica identity index, primary key or
718  * if needed, sequential scan.
719  */
720  idxoid = GetRelationIdentityOrPK(rel->localrel);
721  Assert(OidIsValid(idxoid) ||
722  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
723 
724  if (OidIsValid(idxoid))
725  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
727  remoteslot, localslot);
728  else
730  remoteslot, localslot);
731 
732  ExecClearTuple(remoteslot);
733 
734  /*
735  * Tuple found.
736  *
737  * Note this will fail if there are other conflicting unique indexes.
738  */
739  if (found)
740  {
741  /* Process and store remote tuple in the slot */
743  ExecCopySlot(remoteslot, localslot);
744  slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
745  MemoryContextSwitchTo(oldctx);
746 
747  EvalPlanQualSetSlot(&epqstate, remoteslot);
748 
749  /* Do the actual update. */
750  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
751  }
752  else
753  {
754  /*
755  * The tuple to be updated could not be found.
756  *
757  * TODO what to do here, change the log level to LOG perhaps?
758  */
759  elog(DEBUG1,
760  "logical replication did not find row for update "
761  "in replication target relation \"%s\"",
763  }
764 
765  /* Cleanup. */
768 
769  /* Handle queued AFTER triggers. */
770  AfterTriggerEndQuery(estate);
771 
772  EvalPlanQualEnd(&epqstate);
773  ExecResetTupleTable(estate->es_tupleTable, false);
774  FreeExecutorState(estate);
775 
777 
779 }
780 
781 /*
782  * Handle DELETE message.
783  *
784  * TODO: FDW support
785  */
786 static void
788 {
790  LogicalRepTupleData oldtup;
791  LogicalRepRelId relid;
792  Oid idxoid;
793  EState *estate;
794  EPQState epqstate;
795  TupleTableSlot *remoteslot;
796  TupleTableSlot *localslot;
797  bool found;
798  MemoryContext oldctx;
799 
801 
802  relid = logicalrep_read_delete(s, &oldtup);
805  {
806  /*
807  * The relation can't become interesting in the middle of the
808  * transaction so it's safe to unlock it.
809  */
811  return;
812  }
813 
814  /* Check if we can do the delete. */
816 
817  /* Initialize the executor state. */
818  estate = create_estate_for_relation(rel);
819  remoteslot = ExecInitExtraTupleSlot(estate,
821  &TTSOpsVirtual);
822  localslot = table_slot_create(rel->localrel,
823  &estate->es_tupleTable);
824  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
825 
828 
829  /* Find the tuple using the replica identity index. */
831  slot_store_cstrings(remoteslot, rel, oldtup.values);
832  MemoryContextSwitchTo(oldctx);
833 
834  /*
835  * Try to find tuple using either replica identity index, primary key or
836  * if needed, sequential scan.
837  */
838  idxoid = GetRelationIdentityOrPK(rel->localrel);
839  Assert(OidIsValid(idxoid) ||
840  (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
841 
842  if (OidIsValid(idxoid))
843  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
845  remoteslot, localslot);
846  else
848  remoteslot, localslot);
849  /* If found delete it. */
850  if (found)
851  {
852  EvalPlanQualSetSlot(&epqstate, localslot);
853 
854  /* Do the actual delete. */
855  ExecSimpleRelationDelete(estate, &epqstate, localslot);
856  }
857  else
858  {
859  /* The tuple to be deleted could not be found. */
860  elog(DEBUG1,
861  "logical replication could not find row for delete "
862  "in replication target relation \"%s\"",
864  }
865 
866  /* Cleanup. */
869 
870  /* Handle queued AFTER triggers. */
871  AfterTriggerEndQuery(estate);
872 
873  EvalPlanQualEnd(&epqstate);
874  ExecResetTupleTable(estate->es_tupleTable, false);
875  FreeExecutorState(estate);
876 
878 
880 }
881 
882 /*
883  * Handle TRUNCATE message.
884  *
885  * TODO: FDW support
886  */
887 static void
889 {
890  bool cascade = false;
891  bool restart_seqs = false;
892  List *remote_relids = NIL;
893  List *remote_rels = NIL;
894  List *rels = NIL;
895  List *relids = NIL;
896  List *relids_logged = NIL;
897  ListCell *lc;
898 
900 
901  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
902 
903  foreach(lc, remote_relids)
904  {
905  LogicalRepRelId relid = lfirst_oid(lc);
907 
910  {
911  /*
912  * The relation can't become interesting in the middle of the
913  * transaction so it's safe to unlock it.
914  */
916  continue;
917  }
918 
919  remote_rels = lappend(remote_rels, rel);
920  rels = lappend(rels, rel->localrel);
921  relids = lappend_oid(relids, rel->localreloid);
923  relids_logged = lappend_oid(relids_logged, rel->localreloid);
924  }
925 
926  /*
927  * Even if we used CASCADE on the upstream master we explicitly default to
928  * replaying changes without further cascading. This might be later
929  * changeable with a user specified option.
930  */
931  ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
932 
933  foreach(lc, remote_rels)
934  {
935  LogicalRepRelMapEntry *rel = lfirst(lc);
936 
938  }
939 
941 }
942 
943 
944 /*
945  * Logical replication protocol message dispatcher.
946  */
947 static void
949 {
950  char action = pq_getmsgbyte(s);
951 
952  switch (action)
953  {
954  /* BEGIN */
955  case 'B':
957  break;
958  /* COMMIT */
959  case 'C':
961  break;
962  /* INSERT */
963  case 'I':
965  break;
966  /* UPDATE */
967  case 'U':
969  break;
970  /* DELETE */
971  case 'D':
973  break;
974  /* TRUNCATE */
975  case 'T':
977  break;
978  /* RELATION */
979  case 'R':
981  break;
982  /* TYPE */
983  case 'Y':
985  break;
986  /* ORIGIN */
987  case 'O':
989  break;
990  default:
991  ereport(ERROR,
992  (errcode(ERRCODE_PROTOCOL_VIOLATION),
993  errmsg("invalid logical replication message type \"%c\"", action)));
994  }
995 }
996 
997 /*
998  * Figure out which write/flush positions to report to the walsender process.
999  *
1000  * We can't simply report back the last LSN the walsender sent us because the
1001  * local transaction might not yet be flushed to disk locally. Instead we
1002  * build a list that associates local with remote LSNs for every commit. When
1003  * reporting back the flush position to the sender we iterate that list and
1004  * check which entries on it are already locally flushed. Those we can report
1005  * as having been flushed.
1006  *
1007  * The have_pending_txes is true if there are outstanding transactions that
1008  * need to be flushed.
1009  */
1010 static void
1012  bool *have_pending_txes)
1013 {
1014  dlist_mutable_iter iter;
1015  XLogRecPtr local_flush = GetFlushRecPtr();
1016 
1017  *write = InvalidXLogRecPtr;
1018  *flush = InvalidXLogRecPtr;
1019 
1020  dlist_foreach_modify(iter, &lsn_mapping)
1021  {
1022  FlushPosition *pos =
1024 
1025  *write = pos->remote_end;
1026 
1027  if (pos->local_end <= local_flush)
1028  {
1029  *flush = pos->remote_end;
1030  dlist_delete(iter.cur);
1031  pfree(pos);
1032  }
1033  else
1034  {
1035  /*
1036  * Don't want to uselessly iterate over the rest of the list which
1037  * could potentially be long. Instead get the last element and
1038  * grab the write position from there.
1039  */
1041  &lsn_mapping);
1042  *write = pos->remote_end;
1043  *have_pending_txes = true;
1044  return;
1045  }
1046  }
1047 
1048  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
1049 }
1050 
1051 /*
1052  * Store current remote/local lsn pair in the tracking list.
1053  */
1054 static void
1056 {
1057  FlushPosition *flushpos;
1058 
1059  /* Need to do this in permanent context */
1060  MemoryContextSwitchTo(ApplyContext);
1061 
1062  /* Track commit lsn */
1063  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1064  flushpos->local_end = XactLastCommitEnd;
1065  flushpos->remote_end = remote_lsn;
1066 
1067  dlist_push_tail(&lsn_mapping, &flushpos->node);
1068  MemoryContextSwitchTo(ApplyMessageContext);
1069 }
1070 
1071 
1072 /*
 * Update statistics of the worker.
 *
 * Stores 'last_lsn' and 'send_time' in MyLogicalRepWorker as last_lsn and
 * last_send_time. When 'reply' is true, the same two values are also
 * recorded as reply_lsn and reply_time.
 *
 * NOTE(review): listing line 1078 is absent from this extract (the numbering
 * jumps from 1077 to 1079); confirm the full body against the upstream file.
 */
1073 static void
1074 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
1075 {
1076  MyLogicalRepWorker->last_lsn = last_lsn;
1077  MyLogicalRepWorker->last_send_time = send_time;
 /* Only messages flagged as replies update the reply_* fields. */
1079  if (reply)
1080  {
1081  MyLogicalRepWorker->reply_lsn = last_lsn;
1082  MyLogicalRepWorker->reply_time = send_time;
1083  }
1084 }
1085 
1086 /*
1087  * Apply main loop.
1088  */
1089 static void
1091 {
1092  /*
1093  * Init the ApplyMessageContext which we clean up after each replication
1094  * protocol message.
1095  */
1096  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
1097  "ApplyMessageContext",
1099 
1100  /* mark as idle, before starting to loop */
1102 
1103  for (;;)
1104  {
1106  int rc;
1107  int len;
1108  char *buf = NULL;
1109  bool endofstream = false;
1110  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1111  bool ping_sent = false;
1112  long wait_time;
1113 
1115 
1116  MemoryContextSwitchTo(ApplyMessageContext);
1117 
1118  len = walrcv_receive(wrconn, &buf, &fd);
1119 
1120  if (len != 0)
1121  {
1122  /* Process the data */
1123  for (;;)
1124  {
1126 
1127  if (len == 0)
1128  {
1129  break;
1130  }
1131  else if (len < 0)
1132  {
1133  ereport(LOG,
1134  (errmsg("data stream from publisher has ended")));
1135  endofstream = true;
1136  break;
1137  }
1138  else
1139  {
1140  int c;
1141  StringInfoData s;
1142 
1143  /* Reset timeout. */
1144  last_recv_timestamp = GetCurrentTimestamp();
1145  ping_sent = false;
1146 
1147  /* Ensure we are reading the data into our memory context. */
1148  MemoryContextSwitchTo(ApplyMessageContext);
1149 
1150  s.data = buf;
1151  s.len = len;
1152  s.cursor = 0;
1153  s.maxlen = -1;
1154 
1155  c = pq_getmsgbyte(&s);
1156 
1157  if (c == 'w')
1158  {
1159  XLogRecPtr start_lsn;
1160  XLogRecPtr end_lsn;
1161  TimestampTz send_time;
1162 
1163  start_lsn = pq_getmsgint64(&s);
1164  end_lsn = pq_getmsgint64(&s);
1165  send_time = pq_getmsgint64(&s);
1166 
1167  if (last_received < start_lsn)
1168  last_received = start_lsn;
1169 
1170  if (last_received < end_lsn)
1171  last_received = end_lsn;
1172 
1173  UpdateWorkerStats(last_received, send_time, false);
1174 
1175  apply_dispatch(&s);
1176  }
1177  else if (c == 'k')
1178  {
1179  XLogRecPtr end_lsn;
1181  bool reply_requested;
1182 
1183  end_lsn = pq_getmsgint64(&s);
1184  timestamp = pq_getmsgint64(&s);
1185  reply_requested = pq_getmsgbyte(&s);
1186 
1187  if (last_received < end_lsn)
1188  last_received = end_lsn;
1189 
1190  send_feedback(last_received, reply_requested, false);
1191  UpdateWorkerStats(last_received, timestamp, true);
1192  }
1193  /* other message types are purposefully ignored */
1194 
1195  MemoryContextReset(ApplyMessageContext);
1196  }
1197 
1198  len = walrcv_receive(wrconn, &buf, &fd);
1199  }
1200  }
1201 
1202  /* confirm all writes so far */
1203  send_feedback(last_received, false, false);
1204 
1205  if (!in_remote_transaction)
1206  {
1207  /*
1208  * If we didn't get any transactions for a while there might be
1209  * unconsumed invalidation messages in the queue, consume them
1210  * now.
1211  */
1214 
1215  /* Process any table synchronization changes. */
1216  process_syncing_tables(last_received);
1217  }
1218 
1219  /* Cleanup the memory. */
1220  MemoryContextResetAndDeleteChildren(ApplyMessageContext);
1222 
1223  /* Check if we need to exit the streaming loop. */
1224  if (endofstream)
1225  {
1226  TimeLineID tli;
1227 
1228  walrcv_endstreaming(wrconn, &tli);
1229  break;
1230  }
1231 
1232  /*
1233  * Wait for more data or latch. If we have unflushed transactions,
1234  * wake up after WalWriterDelay to see if they've been flushed yet (in
1235  * which case we should send a feedback message). Otherwise, there's
1236  * no particular urgency about waking up unless we get data or a
1237  * signal.
1238  */
1239  if (!dlist_is_empty(&lsn_mapping))
1240  wait_time = WalWriterDelay;
1241  else
1242  wait_time = NAPTIME_PER_CYCLE;
1243 
1247  fd, wait_time,
1249 
1250  if (rc & WL_LATCH_SET)
1251  {
1254  }
1255 
1256  if (got_SIGHUP)
1257  {
1258  got_SIGHUP = false;
1260  }
1261 
1262  if (rc & WL_TIMEOUT)
1263  {
1264  /*
1265  * We didn't receive anything new. If we haven't heard anything
1266  * from the server for more than wal_receiver_timeout / 2, ping
1267  * the server. Also, if it's been longer than
1268  * wal_receiver_status_interval since the last update we sent,
1269  * send a status update to the master anyway, to report any
1270  * progress in applying WAL.
1271  */
1272  bool requestReply = false;
1273 
1274  /*
1275  * Check if time since last receive from standby has reached the
1276  * configured limit.
1277  */
1278  if (wal_receiver_timeout > 0)
1279  {
1281  TimestampTz timeout;
1282 
1283  timeout =
1284  TimestampTzPlusMilliseconds(last_recv_timestamp,
1286 
1287  if (now >= timeout)
1288  ereport(ERROR,
1289  (errmsg("terminating logical replication worker due to timeout")));
1290 
1291  /*
1292  * We didn't receive anything new, for half of receiver
1293  * replication timeout. Ping the server.
1294  */
1295  if (!ping_sent)
1296  {
1297  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1298  (wal_receiver_timeout / 2));
1299  if (now >= timeout)
1300  {
1301  requestReply = true;
1302  ping_sent = true;
1303  }
1304  }
1305  }
1306 
1307  send_feedback(last_received, requestReply, requestReply);
1308  }
1309  }
1310 }
1311 
1312 /*
1313  * Send a Standby Status Update message to server.
1314  *
1315  * 'recvpos' is the latest LSN we've received data to, force is set if we need
1316  * to send a response to avoid timeouts.
 *
 * 'requestReply' is written as the final (replyRequested) byte of the
 * message.
 *
 * The last reported receive/write/flush positions, the time of the last
 * send, and the reply buffer are kept in function-local statics, so they
 * persist across calls. A position is never reported lower than the value
 * reported previously.
1317  */
1318 static void
1319 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1320 {
 /* Reply buffer, created on first use and reused afterwards. */
1321  static StringInfo reply_message = NULL;
 /* Timestamp taken the last time execution got past the "nothing new" check. */
1322  static TimestampTz send_time = 0;
1323 
 /* Highest positions reported so far. */
1324  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1325  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1326  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1327 
1328  XLogRecPtr writepos;
1329  XLogRecPtr flushpos;
1330  TimestampTz now;
1331  bool have_pending_txes;
1332 
1333  /*
1334  * If the user doesn't want status to be reported to the publisher, be
1335  * sure to exit before doing anything at all.
 * A forced call bypasses this check.
1336  */
1337  if (!force && wal_receiver_status_interval <= 0)
1338  return;
1339 
1340  /* It's legal to not pass a recvpos; a lower value is raised to last_recvpos */
1341  if (recvpos < last_recvpos)
1342  recvpos = last_recvpos;
1343 
 /* Fills in writepos, flushpos and have_pending_txes. */
1344  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1345 
1346  /*
1347  * No outstanding transactions to flush, we can report the latest received
1348  * position. This is important for synchronous replication.
1349  */
1350  if (!have_pending_txes)
1351  flushpos = writepos = recvpos;
1352 
 /* Never move the reported write/flush positions backwards. */
1353  if (writepos < last_writepos)
1354  writepos = last_writepos;
1355 
1356  if (flushpos < last_flushpos)
1357  flushpos = last_flushpos;
1358 
1359  now = GetCurrentTimestamp();
1360 
1361  /* if we've already reported everything we're good */
 /*
  * NOTE(review): the final argument line of the TimestampDifferenceExceeds()
  * call (listing line 1366) is missing from this extract; confirm the
  * threshold against the upstream file.
  */
1362  if (!force &&
1363  writepos == last_writepos &&
1364  flushpos == last_flushpos &&
1365  !TimestampDifferenceExceeds(send_time, now,
1367  return;
1368  send_time = now;
1369 
 /*
  * Create the reply buffer under ApplyContext on first use, restoring the
  * caller's memory context afterwards; on later calls just reset it.
  */
1370  if (!reply_message)
1371  {
1372  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1373 
1374  reply_message = makeStringInfo();
1375  MemoryContextSwitchTo(oldctx);
1376  }
1377  else
1378  resetStringInfo(reply_message);
1379 
 /*
  * Build the message: type byte 'r' followed by the fields named in the
  * trailing comments. Note that recvpos is sent in the "write" field,
  * flushpos in the "flush" field and writepos in the "apply" field.
  */
1380  pq_sendbyte(reply_message, 'r');
1381  pq_sendint64(reply_message, recvpos); /* write */
1382  pq_sendint64(reply_message, flushpos); /* flush */
1383  pq_sendint64(reply_message, writepos); /* apply */
1384  pq_sendint64(reply_message, now); /* sendTime */
1385  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1386 
1387  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1388  force,
1389  (uint32) (recvpos >> 32), (uint32) recvpos,
1390  (uint32) (writepos >> 32), (uint32) writepos,
1391  (uint32) (flushpos >> 32), (uint32) flushpos
1392  );
1393 
1394  walrcv_send(wrconn, reply_message->data, reply_message->len);
1395 
 /* Remember what was reported; each position only ever advances. */
1396  if (recvpos > last_recvpos)
1397  last_recvpos = recvpos;
1398  if (writepos > last_writepos)
1399  last_writepos = writepos;
1400  if (flushpos > last_flushpos)
1401  last_flushpos = flushpos;
1402 }
1403 
1404 /*
1405  * Reread subscription info if needed. Most changes will cause the worker
      * to exit.
      *
      * Does nothing while MySubscriptionValid is true.  Otherwise the
      * subscription is fetched again with GetSubscription() while ApplyContext
      * is the current memory context, and compared with the cached copy in
      * MySubscription:
      *
      *  - subscription gone or disabled: LOG and proc_exit(0);
      *  - conninfo, name, slot name or publication list changed: LOG and
      *    proc_exit(0) (the comments below say the launcher starts a new
      *    worker);
      *  - dbid changed: elog(ERROR);
      *  - anything else: the old copy is freed, the new one becomes
      *    MySubscription, synchronous_commit is set again from it, and
      *    MySubscriptionValid is set back to true.
      *
      * NOTE(review): this copy of the listing has lost several lines (listing
      * numbers 1408, 1411, 1421, 1521, 1532 and 1535 are skipped).  Among them
      * are the function-name line, the declaration of 'newsub', and presumably
      * the calls that start and commit the transaction guarded by 'started_tx'
      * -- confirm against the original file.
1406  */
1407 static void
1409 {
1410  MemoryContext oldctx;
1412  bool started_tx = false;
1413 
1414  /* When cache state is valid there is nothing to do here. */
1415  if (MySubscriptionValid)
1416  return;
1417 
1418  /* This function might be called inside or outside of transaction. */
1419  if (!IsTransactionState())
1420  {
1422  started_tx = true;
1423  }
1424 
1425  /* Ensure allocations in permanent context. */
1426  oldctx = MemoryContextSwitchTo(ApplyContext);
1427 
      /* Second argument 'true' is missing_ok: a NULL result is handled below. */
1428  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1429 
1430  /*
1431  * Exit if the subscription was removed. This normally should not happen
1432  * as the worker gets killed during DROP SUBSCRIPTION.
1433  */
1434  if (!newsub)
1435  {
1436  ereport(LOG,
1437  (errmsg("logical replication apply worker for subscription \"%s\" will "
1438  "stop because the subscription was removed",
1439  MySubscription->name)));
1440 
1441  proc_exit(0);
1442  }
1443 
1444  /*
1445  * Exit if the subscription was disabled. This normally should not happen
1446  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1447  */
1448  if (!newsub->enabled)
1449  {
1450  ereport(LOG,
1451  (errmsg("logical replication apply worker for subscription \"%s\" will "
1452  "stop because the subscription was disabled",
1453  MySubscription->name)));
1454 
1455  proc_exit(0);
1456  }
1457 
1458  /*
1459  * Exit if connection string was changed. The launcher will start new
1460  * worker.
1461  */
1462  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1463  {
1464  ereport(LOG,
1465  (errmsg("logical replication apply worker for subscription \"%s\" will "
1466  "restart because the connection information was changed",
1467  MySubscription->name)));
1468 
1469  proc_exit(0);
1470  }
1471 
1472  /*
1473  * Exit if subscription name was changed (it's used for
1474  * fallback_application_name). The launcher will start new worker.
1475  */
1476  if (strcmp(newsub->name, MySubscription->name) != 0)
1477  {
      /* The message reports the old (cached) name, not the new one. */
1478  ereport(LOG,
1479  (errmsg("logical replication apply worker for subscription \"%s\" will "
1480  "restart because subscription was renamed",
1481  MySubscription->name)));
1482 
1483  proc_exit(0);
1484  }
1485 
1486  /* !slotname should never happen when enabled is true. */
1487  Assert(newsub->slotname);
1488 
1489  /*
1490  * We need to make new connection to new slot if slot name has changed so
1491  * exit here as well if that's the case.
1492  */
1493  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1494  {
1495  ereport(LOG,
1496  (errmsg("logical replication apply worker for subscription \"%s\" will "
1497  "restart because the replication slot name was changed",
1498  MySubscription->name)));
1499 
1500  proc_exit(0);
1501  }
1502 
1503  /*
1504  * Exit if publication list was changed. The launcher will start new
1505  * worker.
1506  */
1507  if (!equal(newsub->publications, MySubscription->publications))
1508  {
1509  ereport(LOG,
1510  (errmsg("logical replication apply worker for subscription \"%s\" will "
1511  "restart because subscription's publications were changed",
1512  MySubscription->name)));
1513 
1514  proc_exit(0);
1515  }
1516 
1517  /* Check for other changes that should never happen too. */
1518  if (newsub->dbid != MySubscription->dbid)
1519  {
      /* NOTE(review): the %u argument (listing line 1521) is missing here. */
1520  elog(ERROR, "subscription %u changed unexpectedly",
1522  }
1523 
1524  /* Clean old subscription info and switch to new one. */
1525  FreeSubscription(MySubscription);
1526  MySubscription = newsub;
1527 
1528  MemoryContextSwitchTo(oldctx);
1529 
1530  /* Change synchronous commit according to the user's wishes */
      /* NOTE(review): the remaining arguments (listing line 1532) are missing. */
1531  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1533 
      /* NOTE(review): the statement controlled by this 'if' (1535) is missing. */
1534  if (started_tx)
1536 
1537  MySubscriptionValid = true;
1538 }
1540 /*
1541  * Callback from subscription syscache invalidation.
      *
      * Only clears MySubscriptionValid; none of the three arguments is used.
      * The subscription itself is not reloaded here: the reread routine earlier
      * in this file returns early while the flag is true and does the actual
      * work once it has been cleared.
1542  */
1543 static void
1544 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1545 {
1546  MySubscriptionValid = false;
1547 }
1548 
1549 /*
      * SIGHUP: set flag to reload configuration at next convenient time
      *
      * The handler itself only raises got_SIGHUP and sets the process latch.
      * errno is saved on entry and restored on exit, so the interrupted code
      * sees the same errno it had before the signal arrived.
      *
      * NOTE(review): the function-name line (listing line 1551) is missing
      * from this copy; the cross-reference index names it
      * logicalrep_worker_sighup(SIGNAL_ARGS).
      */
1550 static void
1552 {
1553  int save_errno = errno;
1554 
1555  got_SIGHUP = true;
1556 
1557  /* Waken anything waiting on the process latch */
1558  SetLatch(MyLatch);
1559 
1560  errno = save_errno;
1561 }
1562 
1563 /*
      * Logical Replication Apply worker entry point
      *
      * 'main_arg' carries the worker slot number; it is read with
      * DatumGetInt32() and handed to logicalrep_worker_attach().
      *
      * Flow, as far as it is visible in this copy:
      *  1. attach to the worker slot and set up signal handling;
      *  2. load "libpqwalreceiver" and set session_replication_role to
      *     "replica";
      *  3. load the subscription while ApplyContext is the current memory
      *     context; if it is gone or disabled, LOG and proc_exit(0);
      *  4. apply the subscription's synchronous_commit setting;
      *  5. table synchronization worker: run LogicalRepSyncTableStart() and
      *     copy the slot name it returns into ApplyContext;
      *     main apply worker: use the subscription's slot name, set up the
      *     replication origin named "pg_<subscription oid>", and connect to the
      *     publisher;
      *  6. fill in the streaming options, start streaming and run
      *     LogicalRepApplyLoop().
      * The function finishes with proc_exit(0) instead of returning.
      *
      * NOTE(review): many lines of this function are missing from this copy
      * (the embedded listing numbers skip 1565, 1572, 1578, 1580, 1588-1589,
      * 1596, 1599-1600, 1606-1607, 1616, 1635, 1638-1639, 1651, 1690, 1698 and
      * 1718-1719).  That includes the function-name line, the declaration of
      * 'options', and the start of several calls whose tails are still
      * visible.  Confirm details against the original file.
      */
1564 void
1566 {
1567  int worker_slot = DatumGetInt32(main_arg);
1568  MemoryContext oldctx;
1569  char originname[NAMEDATALEN];
1570  XLogRecPtr origin_startpos;
1571  char *myslotname;
1573 
1574  /* Attach to slot */
1575  logicalrep_worker_attach(worker_slot);
1576 
1577  /* Setup signal handling */
      /* (only the SIGTERM registration is visible in this copy) */
1579  pqsignal(SIGTERM, die);
1581 
1582  /*
1583  * We don't currently need any ResourceOwner in a walreceiver process, but
1584  * if we did, we could call CreateAuxProcessResourceOwner here.
      *
      * NOTE(review): the wording says "walreceiver process" although this is
      * the apply worker entry point -- confirm the comment still applies.
1585  */
1586 
1587  /* Initialise stats to a sanish value */
      /* NOTE(review): the statement itself (listing 1588-1589) is missing. */
1590 
1591  /* Load the libpq-specific functions */
1592  load_file("libpqwalreceiver", false);
1593 
1594  /* Run as replica session replication role. */
1595  SetConfigOption("session_replication_role", "replica",
1597 
1598  /* Connect to our database. */
      /* NOTE(review): only the last argument of this call is still visible. */
1601  0);
1602 
1603  /* Load the subscription into persistent memory context. */
1604  ApplyContext = AllocSetContextCreate(TopMemoryContext,
1605  "ApplyContext",
1608  oldctx = MemoryContextSwitchTo(ApplyContext);
1609 
      /* missing_ok is true, so a vanished subscription comes back as NULL. */
1610  MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
1611  if (!MySubscription)
1612  {
1613  ereport(LOG,
1614  (errmsg("logical replication apply worker for subscription %u will not "
1615  "start because the subscription was removed during startup",
1617  proc_exit(0);
1618  }
1619 
1620  MySubscriptionValid = true;
1621  MemoryContextSwitchTo(oldctx);
1622 
1623  if (!MySubscription->enabled)
1624  {
1625  ereport(LOG,
1626  (errmsg("logical replication apply worker for subscription \"%s\" will not "
1627  "start because the subscription was disabled during startup",
1628  MySubscription->name)));
1629 
1630  proc_exit(0);
1631  }
1632 
1633  /* Setup synchronous commit according to the user's wishes */
1634  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1636 
1637  /* Keep us informed about subscription changes. */
      /* NOTE(review): only the last argument of the registration is visible. */
1640  (Datum) 0);
1641 
      /* Announce which kind of worker has started. */
1642  if (am_tablesync_worker())
1643  ereport(LOG,
1644  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1645  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
1646  else
1647  ereport(LOG,
1648  (errmsg("logical replication apply worker for subscription \"%s\" has started",
1649  MySubscription->name)));
1650 
1652 
1653  /* Connect to the origin and start the replication. */
1654  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1655  MySubscription->conninfo);
1656 
1657  if (am_tablesync_worker())
1658  {
1659  char *syncslotname;
1660 
1661  /* This is table synchronization worker, call initial sync. */
      /*
       * This branch never calls walrcv_connect(); presumably
       * LogicalRepSyncTableStart() sets up wrconn itself -- verify in
       * tablesync.c.  It also fills in origin_startpos.
       */
1662  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1663 
1664  /* The slot name needs to be allocated in permanent memory context. */
1665  oldctx = MemoryContextSwitchTo(ApplyContext);
1666  myslotname = pstrdup(syncslotname);
1667  MemoryContextSwitchTo(oldctx);
1668 
      /* The original string is released once it has been copied. */
1669  pfree(syncslotname);
1670  }
1671  else
1672  {
1673  /* This is main apply worker */
1674  RepOriginId originid;
1675  TimeLineID startpointTLI;
1676  char *err;
1677 
1678  myslotname = MySubscription->slotname;
1679 
1680  /*
1681  * This shouldn't happen if the subscription is enabled, but guard
1682  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1683  * crash if slot is NULL.)
1684  */
1685  if (!myslotname)
1686  ereport(ERROR,
1687  (errmsg("subscription has no replication slot set")));
1688 
1689  /* Setup replication origin tracking. */
      /*
       * The origin is named "pg_<subscription oid>".  It is looked up with
       * missing_ok = true and created when the lookup returns no valid id.
       * The session is then bound to it and the origin's recorded progress
       * becomes the streaming start position.
       */
1691  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1692  originid = replorigin_by_name(originname, true);
1693  if (!OidIsValid(originid))
1694  originid = replorigin_create(originname);
1695  replorigin_session_setup(originid);
1696  replorigin_session_origin = originid;
1697  origin_startpos = replorigin_session_get_progress(false);
1699 
      /* On failure wrconn is NULL and 'err' is used as the error text. */
1700  wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
1701  &err);
1702  if (wrconn == NULL)
1703  ereport(ERROR,
1704  (errmsg("could not connect to the publisher: %s", err)));
1705 
1706  /*
1707  * We don't really use the output identify_system for anything but it
1708  * does some initializations on the upstream so let's still call it.
1709  */
1710  (void) walrcv_identify_system(wrconn, &startpointTLI);
1711 
1712  }
1713 
1714  /*
1715  * Setup callback for syscache so that we know when something changes in
1716  * the subscription relation state.
1717  */
      /* NOTE(review): only the last argument of the registration is visible. */
1720  (Datum) 0);
1721 
1722  /* Build logical replication streaming options. */
1723  options.logical = true;
1724  options.startpoint = origin_startpos;
1725  options.slotname = myslotname;
1726  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1727  options.proto.logical.publication_names = MySubscription->publications;
1728 
1729  /* Start normal logical streaming replication. */
1730  walrcv_startstreaming(wrconn, &options);
1731 
1732  /* Run the main loop. */
1733  LogicalRepApplyLoop(origin_startpos);
1734 
1735  proc_exit(0);
1736 }
1737 
1738 /*
1739  * Is current process a logical replication worker?
      *
      * Returns true exactly when MyLogicalRepWorker is non-NULL.
      *
      * NOTE(review): the function-name line (listing line 1742) is missing
      * from this copy; the cross-reference index names it
      * IsLogicalWorker(void).
1740  */
1741 bool
1743 {
1744  return MyLogicalRepWorker != NULL;
1745 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
Subscription * MySubscription
Definition: worker.c:102
static void apply_handle_type(StringInfo s)
Definition: worker.c:538
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:476
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:131
#define NIL
Definition: pg_list.h:65
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:726
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:176
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:552
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1279
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1319
WalReceiverConn * wrconn
Definition: worker.c:100
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define AllocSetContextCreate
Definition: memutils.h:169
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:271
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:1055
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1796
uint32 TimeLineID
Definition: xlogdefs.h:52
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:280
void AcceptInvalidationMessages(void)
Definition: inval.c:681
CommandId es_output_cid
Definition: execnodes.h:517
static XLogRecPtr remote_final_lsn
Definition: worker.c:106
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4557
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static void apply_handle_insert(StringInfo s)
Definition: worker.c:568
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:426
static dlist_head lsn_mapping
Definition: worker.c:88
#define DatumGetInt32(X)
Definition: postgres.h:472
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2998
#define RelationGetDescr(relation)
Definition: rel.h:442
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:536
dlist_node node
Definition: worker.c:83
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3121
int64 timestamp
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
int64 TimestampTz
Definition: timestamp.h:39
LogicalRepRelMapEntry * rel
Definition: worker.c:92
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:263
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:329
char * pstrdup(const char *in)
Definition: mcxt.c:1161
void CommitTransactionCommand(void)
Definition: xact.c:2895
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:1074
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:353
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
StringInfo makeStringInfo(void)
Definition: stringinfo.c:28
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:273
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:379
Expr * expression_planner(Expr *expr)
Definition: planner.c:6046
union WalRcvStreamOptions::@106 proto
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
TimestampTz last_send_time
uint16 RepOriginId
Definition: xlogdefs.h:58
XLogRecPtr last_lsn
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:269
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1192
XLogRecPtr remote_end
Definition: worker.c:85
int errcode(int sqlerrcode)
Definition: elog.c:570
char * format_type_be(Oid type_oid)
Definition: format_type.c:326
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:368
Datum * tts_values
Definition: tuptable.h:126
#define WL_SOCKET_READABLE
Definition: latch.h:125
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8230
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:588
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1057
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
Form_pg_class rd_rel
Definition: rel.h:83
unsigned int Oid
Definition: postgres_ext.h:31
void SetLatch(Latch *latch)
Definition: latch.c:436
struct SlotErrCallbackArg SlotErrCallbackArg
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:805
bool IsLogicalWorker(void)
Definition: worker.c:1742
void(* callback)(void *arg)
Definition: elog.h:254
int wal_receiver_status_interval
Definition: walreceiver.c:75
List * lappend_oid(List *list, Oid datum)
Definition: list.c:357
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1668
struct ErrorContextCallback * previous
Definition: elog.h:253
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
#define OidIsValid(objectId)
Definition: c.h:638
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:212
void ResetLatch(Latch *latch)
Definition: latch.c:519
int wal_receiver_timeout
Definition: walreceiver.c:76
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:1011
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2939
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:163
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
ErrorContextCallback * error_context_stack
Definition: elog.c:88
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:214
#define list_make1(x1)
Definition: pg_list.h:227
#define NAMEDATALEN
void FreeExecutorState(EState *estate)
Definition: execUtils.c:192
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
Definition: executor.h:501
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:154
static StringInfoData reply_message
Definition: walreceiver.c:112
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1544
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1031
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:64
#define ERROR
Definition: elog.h:43
static bool ensure_transaction(void)
Definition: worker.c:147
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
Definition: worker.c:79
bool in_remote_transaction
Definition: worker.c:105
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1617
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:431
Definition: guc.h:75
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:304
XLogRecPtr reply_lsn
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:355
static bool am_tablesync_worker(void)
static void apply_handle_delete(StringInfo s)
Definition: worker.c:787
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:191
#define DEBUG2
Definition: elog.h:24
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
char * c
void logicalrep_worker_attach(int slot)
Definition: launcher.c:637
#define NoLock
Definition: lockdefs.h:34
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:7487
static char * buf
Definition: pg_test_fsync.c:68
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
bool * tts_isnull
Definition: tuptable.h:128
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:285
ResultRelInfo * es_result_relations
Definition: execnodes.h:520
#define RowExclusiveLock
Definition: lockdefs.h:38
#define SIGHUP
Definition: win32_port.h:163
#define RelationGetRelationName(relation)
Definition: rel.h:450
XLogRecPtr startpoint
Definition: walreceiver.h:153
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
unsigned int uint32
Definition: c.h:358
int pgsocket
Definition: port.h:31
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void apply_handle_begin(StringInfo s)
Definition: worker.c:438
RepOriginId replorigin_create(char *roname)
Definition: origin.c:243
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:219
Oid get_atttype(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:861
#define ereport(elevel, rest)
Definition: elog.h:141
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2641
MemoryContext TopMemoryContext
Definition: mcxt.c:44
EState * CreateExecutorState(void)
Definition: execUtils.c:88
Definition: guc.h:72
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:631
List * lappend(List *list, void *datum)
Definition: list.c:321
MemoryContext ApplyContext
Definition: worker.c:98
static char ** options
XLogRecPtr final_lsn
Definition: logicalproto.h:66
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:457
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
static void logicalrep_worker_sighup(SIGNAL_ARGS)
Definition: worker.c:1551
Node * build_column_default(Relation rel, int attrno)
List * es_tupleTable
Definition: execnodes.h:552
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1156
static void apply_handle_update(StringInfo s)
Definition: worker.c:665
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1003
#define PGINVALID_SOCKET
Definition: port.h:33
int es_num_result_relations
Definition: execnodes.h:521
void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2505
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5633
static void apply_handle_relation(StringInfo s)
Definition: worker.c:523
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int16 attnum
Definition: pg_attribute.h:79
XLogRecPtr local_end
Definition: worker.c:84
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4784
static void apply_dispatch(StringInfo s)
Definition: worker.c:948
static void maybe_reread_subscription(void)
Definition: worker.c:1408
#define makeNode(_type_)
Definition: nodes.h:572
TimestampTz last_recv_time
static MemoryContext ApplyMessageContext
Definition: worker.c:97
#define SIGNAL_ARGS
Definition: c.h:1259
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
RepOriginId replorigin_session_origin
Definition: origin.c:156
static void apply_handle_origin(StringInfo s)
Definition: worker.c:501
void StartTransactionCommand(void)
Definition: xact.c:2794
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static void slot_store_error_callback(void *arg)
Definition: worker.c:269
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:214
bool IsTransactionState(void)
Definition: xact.c:356
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:275
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
int WalWriterDelay
Definition: walwriter.c:68
bool MySubscriptionValid
Definition: worker.c:103
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:506
void FreeSubscription(Subscription *sub)
char * logicalrep_typmap_gettypname(Oid remoteid)
Definition: relation.c:422
RTEKind rtekind
Definition: parsenodes.h:974
static Datum values[MAXATTR]
Definition: bootstrap.c:167
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4804
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:818
static void slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:372
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:784
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1090
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define elog(elevel,...)
Definition: elog.h:226
int i
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define errcontext
Definition: elog.h:183
void * arg
struct Latch * MyLatch
Definition: globals.c:54
struct FlushPosition FlushPosition
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:121
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:746
XLogRecPtr commit_lsn
Definition: logicalproto.h:73
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
Definition: pg_list.h:50
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1730
#define snprintf
Definition: port.h:192
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1646
#define WL_LATCH_SET
Definition: latch.h:124
#define RelationGetRelid(relation)
Definition: rel.h:416
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:226
#define die(msg)
Definition: pg_test_fsync.c:97
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4536
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TimestampTz committime
Definition: logicalproto.h:75
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:388
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:1565
uint32 LogicalRepRelId
Definition: logicalproto.h:39
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1517
#define lfirst_oid(lc)
Definition: pg_list.h:192
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:210
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:263
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5662
void pgstat_report_stat(bool force)
Definition: pgstat.c:813
static volatile sig_atomic_t got_SIGHUP
Definition: worker.c:115
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:888
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:255
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:522