PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from remote logical replication stream.
13  *
14  * The main worker (apply) is started by logical replication worker
15  * launcher for every enabled subscription in a database. It uses
16  * walsender protocol to communicate with publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "miscadmin.h"
27 #include "pgstat.h"
28 #include "funcapi.h"
29 
30 #include "access/xact.h"
31 #include "access/xlog_internal.h"
32 
33 #include "catalog/namespace.h"
36 
37 #include "commands/trigger.h"
38 
39 #include "executor/executor.h"
41 
42 #include "libpq/pqformat.h"
43 #include "libpq/pqsignal.h"
44 
45 #include "mb/pg_wchar.h"
46 
47 #include "nodes/makefuncs.h"
48 
49 #include "optimizer/planner.h"
50 
51 #include "parser/parse_relation.h"
52 
53 #include "postmaster/bgworker.h"
54 #include "postmaster/postmaster.h"
55 #include "postmaster/walwriter.h"
56 
57 #include "replication/decode.h"
58 #include "replication/logical.h"
63 #include "replication/origin.h"
64 #include "replication/snapbuild.h"
67 
68 #include "rewrite/rewriteHandler.h"
69 
70 #include "storage/bufmgr.h"
71 #include "storage/ipc.h"
72 #include "storage/lmgr.h"
73 #include "storage/proc.h"
74 #include "storage/procarray.h"
75 
76 #include "tcop/tcopprot.h"
77 
78 #include "utils/builtins.h"
79 #include "utils/catcache.h"
80 #include "utils/datum.h"
81 #include "utils/fmgroids.h"
82 #include "utils/guc.h"
83 #include "utils/inval.h"
84 #include "utils/lsyscache.h"
85 #include "utils/memutils.h"
86 #include "utils/timeout.h"
87 #include "utils/tqual.h"
88 #include "utils/syscache.h"
89 
90 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
91 
92 typedef struct FlushPosition
93 {
98 
100 
101 typedef struct SlotErrCallbackArg
102 {
104  int attnum;
106 
109 
111 
113 bool MySubscriptionValid = false;
114 
117 
118 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
119 
120 static void store_flush_position(XLogRecPtr remote_lsn);
121 
122 static void maybe_reread_subscription(void);
123 
124 /* Flags set by signal handlers */
125 static volatile sig_atomic_t got_SIGHUP = false;
126 
127 /*
128  * Should this worker apply changes for given relation.
129  *
130  * This is mainly needed for initial relation data sync as that runs in
131  * separate worker process running in parallel and we need some way to skip
132  * changes coming to the main apply worker during the sync of a table.
133  *
134  * Note we need to do smaller or equals comparison for SYNCDONE state because
135  * it might hold position of end of initial slot consistent point WAL
136  * record + 1 (ie start of next record) and next record can be COMMIT of
137  * transaction we are now processing (which is what we set remote_final_lsn
138  * to in apply_handle_begin).
139  */
140 static bool
142 {
143  if (am_tablesync_worker())
144  return MyLogicalRepWorker->relid == rel->localreloid;
145  else
146  return (rel->state == SUBREL_STATE_READY ||
147  (rel->state == SUBREL_STATE_SYNCDONE &&
148  rel->statelsn <= remote_final_lsn));
149 }
150 
151 /*
152  * Make sure that we started local transaction.
153  *
154  * Also switches to ApplyMessageContext as necessary.
155  */
156 static bool
158 {
159  if (IsTransactionState())
160  {
162 
163  if (CurrentMemoryContext != ApplyMessageContext)
164  MemoryContextSwitchTo(ApplyMessageContext);
165 
166  return false;
167  }
168 
171 
173 
174  MemoryContextSwitchTo(ApplyMessageContext);
175  return true;
176 }
177 
178 
179 /*
180  * Executor state preparation for evaluation of constraint expressions,
181  * indexes and triggers.
182  *
183  * This is based on similar code in copy.c
184  */
185 static EState *
187 {
188  EState *estate;
189  ResultRelInfo *resultRelInfo;
190  RangeTblEntry *rte;
191 
192  estate = CreateExecutorState();
193 
194  rte = makeNode(RangeTblEntry);
195  rte->rtekind = RTE_RELATION;
196  rte->relid = RelationGetRelid(rel->localrel);
197  rte->relkind = rel->localrel->rd_rel->relkind;
198  estate->es_range_table = list_make1(rte);
199 
200  resultRelInfo = makeNode(ResultRelInfo);
201  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
202 
203  estate->es_result_relations = resultRelInfo;
204  estate->es_num_result_relations = 1;
205  estate->es_result_relation_info = resultRelInfo;
206 
207  /* Triggers might need a slot */
208  if (resultRelInfo->ri_TrigDesc)
209  estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
210 
211  /* Prepare to catch AFTER triggers. */
213 
214  return estate;
215 }
216 
217 /*
218  * Executes default values for columns for which we can't map to remote
219  * relation columns.
220  *
221  * This allows us to support tables which have more columns on the downstream
222  * than on the upstream.
223  */
224 static void
226  TupleTableSlot *slot)
227 {
228  TupleDesc desc = RelationGetDescr(rel->localrel);
229  int num_phys_attrs = desc->natts;
230  int i;
231  int attnum,
232  num_defaults = 0;
233  int *defmap;
234  ExprState **defexprs;
235  ExprContext *econtext;
236 
237  econtext = GetPerTupleExprContext(estate);
238 
239  /* We got all the data via replication, no need to evaluate anything. */
240  if (num_phys_attrs == rel->remoterel.natts)
241  return;
242 
243  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
244  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
245 
246  for (attnum = 0; attnum < num_phys_attrs; attnum++)
247  {
248  Expr *defexpr;
249 
250  if (TupleDescAttr(desc, attnum)->attisdropped)
251  continue;
252 
253  if (rel->attrmap[attnum] >= 0)
254  continue;
255 
256  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
257 
258  if (defexpr != NULL)
259  {
260  /* Run the expression through planner */
261  defexpr = expression_planner(defexpr);
262 
263  /* Initialize executable expression in copycontext */
264  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
265  defmap[num_defaults] = attnum;
266  num_defaults++;
267  }
268 
269  }
270 
271  for (i = 0; i < num_defaults; i++)
272  slot->tts_values[defmap[i]] =
273  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
274 }
275 
276 /*
277  * Error callback to give more context info about type conversion failure.
278  */
279 static void
281 {
282  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
283  Oid remotetypoid,
284  localtypoid;
285 
286  if (errarg->attnum < 0)
287  return;
288 
289  remotetypoid = errarg->rel->atttyps[errarg->attnum];
290  localtypoid = logicalrep_typmap_getid(remotetypoid);
291  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
292  "remote type %s, local type %s",
293  errarg->rel->nspname, errarg->rel->relname,
294  errarg->rel->attnames[errarg->attnum],
295  format_type_be(remotetypoid),
296  format_type_be(localtypoid));
297 }
298 
299 /*
300  * Store data in C string form into slot.
301  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
302  * use better.
303  */
304 static void
306  char **values)
307 {
308  int natts = slot->tts_tupleDescriptor->natts;
309  int i;
310  SlotErrCallbackArg errarg;
311  ErrorContextCallback errcallback;
312 
313  ExecClearTuple(slot);
314 
315  /* Push callback + info on the error context stack */
316  errarg.rel = &rel->remoterel;
317  errarg.attnum = -1;
318  errcallback.callback = slot_store_error_callback;
319  errcallback.arg = (void *) &errarg;
320  errcallback.previous = error_context_stack;
321  error_context_stack = &errcallback;
322 
323  /* Call the "in" function for each non-dropped attribute */
324  for (i = 0; i < natts; i++)
325  {
327  int remoteattnum = rel->attrmap[i];
328 
329  if (!att->attisdropped && remoteattnum >= 0 &&
330  values[remoteattnum] != NULL)
331  {
332  Oid typinput;
333  Oid typioparam;
334 
335  errarg.attnum = remoteattnum;
336 
337  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
338  slot->tts_values[i] = OidInputFunctionCall(typinput,
339  values[remoteattnum],
340  typioparam,
341  att->atttypmod);
342  slot->tts_isnull[i] = false;
343  }
344  else
345  {
346  /*
347  * We assign NULL to dropped attributes, NULL values, and missing
348  * values (missing values should be later filled using
349  * slot_fill_defaults).
350  */
351  slot->tts_values[i] = (Datum) 0;
352  slot->tts_isnull[i] = true;
353  }
354  }
355 
356  /* Pop the error context stack */
357  error_context_stack = errcallback.previous;
358 
359  ExecStoreVirtualTuple(slot);
360 }
361 
362 /*
363  * Modify slot with user data provided as C strings.
364  * This is somewhat similar to heap_modify_tuple but also calls the type
365  * input function on the user data as the input is the text representation
366  * of the types.
367  */
368 static void
370  char **values, bool *replaces)
371 {
372  int natts = slot->tts_tupleDescriptor->natts;
373  int i;
374  SlotErrCallbackArg errarg;
375  ErrorContextCallback errcallback;
376 
377  slot_getallattrs(slot);
378  ExecClearTuple(slot);
379 
380  /* Push callback + info on the error context stack */
381  errarg.rel = &rel->remoterel;
382  errarg.attnum = -1;
383  errcallback.callback = slot_store_error_callback;
384  errcallback.arg = (void *) &errarg;
385  errcallback.previous = error_context_stack;
386  error_context_stack = &errcallback;
387 
388  /* Call the "in" function for each replaced attribute */
389  for (i = 0; i < natts; i++)
390  {
392  int remoteattnum = rel->attrmap[i];
393 
394  if (remoteattnum >= 0 && !replaces[remoteattnum])
395  continue;
396 
397  if (remoteattnum >= 0 && values[remoteattnum] != NULL)
398  {
399  Oid typinput;
400  Oid typioparam;
401 
402  errarg.attnum = remoteattnum;
403 
404  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
405  slot->tts_values[i] = OidInputFunctionCall(typinput,
406  values[remoteattnum],
407  typioparam,
408  att->atttypmod);
409  slot->tts_isnull[i] = false;
410  }
411  else
412  {
413  slot->tts_values[i] = (Datum) 0;
414  slot->tts_isnull[i] = true;
415  }
416  }
417 
418  /* Pop the error context stack */
419  error_context_stack = errcallback.previous;
420 
421  ExecStoreVirtualTuple(slot);
422 }
423 
424 /*
425  * Handle BEGIN message.
426  */
427 static void
429 {
430  LogicalRepBeginData begin_data;
431 
432  logicalrep_read_begin(s, &begin_data);
433 
434  remote_final_lsn = begin_data.final_lsn;
435 
436  in_remote_transaction = true;
437 
439 }
440 
441 /*
442  * Handle COMMIT message.
443  *
444  * TODO, support tracking of multiple origins
445  */
446 static void
448 {
449  LogicalRepCommitData commit_data;
450 
451  logicalrep_read_commit(s, &commit_data);
452 
453  Assert(commit_data.commit_lsn == remote_final_lsn);
454 
455  /* The synchronization worker runs in single transaction. */
457  {
458  /*
459  * Update origin state so we can restart streaming from correct
460  * position in case of crash.
461  */
464 
466  pgstat_report_stat(false);
467 
468  store_flush_position(commit_data.end_lsn);
469  }
470  else
471  {
472  /* Process any invalidation messages that might have accumulated. */
475  }
476 
477  in_remote_transaction = false;
478 
479  /* Process any tables that are being synchronized in parallel. */
480  process_syncing_tables(commit_data.end_lsn);
481 
483 }
484 
485 /*
486  * Handle ORIGIN message.
487  *
488  * TODO, support tracking of multiple origins
489  */
490 static void
492 {
493  /*
494  * ORIGIN message can only come inside remote transaction and before any
495  * actual writes.
496  */
497  if (!in_remote_transaction ||
499  ereport(ERROR,
500  (errcode(ERRCODE_PROTOCOL_VIOLATION),
501  errmsg("ORIGIN message sent out of order")));
502 }
503 
504 /*
505  * Handle RELATION message.
506  *
507  * Note we don't do validation against local schema here. The validation
508  * against local schema is postponed until first change for given relation
509  * comes as we only care about it when applying changes for it anyway and we
510  * do less locking this way.
511  */
512 static void
514 {
515  LogicalRepRelation *rel;
516 
517  rel = logicalrep_read_rel(s);
519 }
520 
521 /*
522  * Handle TYPE message.
523  *
524  * Note we don't do local mapping here, that's done when the type is
525  * actually used.
526  */
527 static void
529 {
530  LogicalRepTyp typ;
531 
532  logicalrep_read_typ(s, &typ);
534 }
535 
536 /*
537  * Get replica identity index or if it is not defined a primary key.
538  *
539  * If neither is defined, returns InvalidOid
540  */
541 static Oid
543 {
544  Oid idxoid;
545 
546  idxoid = RelationGetReplicaIndex(rel);
547 
548  if (!OidIsValid(idxoid))
549  idxoid = RelationGetPrimaryKeyIndex(rel);
550 
551  return idxoid;
552 }
553 
554 /*
555  * Handle INSERT message.
556  */
557 static void
559 {
561  LogicalRepTupleData newtup;
562  LogicalRepRelId relid;
563  EState *estate;
564  TupleTableSlot *remoteslot;
565  MemoryContext oldctx;
566 
568 
569  relid = logicalrep_read_insert(s, &newtup);
572  {
573  /*
574  * The relation can't become interesting in the middle of the
575  * transaction so it's safe to unlock it.
576  */
578  return;
579  }
580 
581  /* Initialize the executor state. */
582  estate = create_estate_for_relation(rel);
583  remoteslot = ExecInitExtraTupleSlot(estate);
585 
586  /* Process and store remote tuple in the slot */
588  slot_store_cstrings(remoteslot, rel, newtup.values);
589  slot_fill_defaults(rel, estate, remoteslot);
590  MemoryContextSwitchTo(oldctx);
591 
594 
595  /* Do the insert. */
596  ExecSimpleRelationInsert(estate, remoteslot);
597 
598  /* Cleanup. */
601 
602  /* Handle queued AFTER triggers. */
603  AfterTriggerEndQuery(estate);
604 
605  ExecResetTupleTable(estate->es_tupleTable, false);
606  FreeExecutorState(estate);
607 
609 
611 }
612 
613 /*
614  * Check if the logical replication relation is updatable and throw
615  * appropriate error if it isn't.
616  */
617 static void
619 {
620  /* Updatable, no error. */
621  if (rel->updatable)
622  return;
623 
624  /*
625  * We are in error mode so it's fine this is somewhat slow. It's better to
626  * give user correct error.
627  */
629  {
630  ereport(ERROR,
631  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
632  errmsg("publisher does not send replica identity column "
633  "expected by the logical replication target relation \"%s.%s\"",
634  rel->remoterel.nspname, rel->remoterel.relname)));
635  }
636 
637  ereport(ERROR,
638  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
639  errmsg("logical replication target relation \"%s.%s\" has "
640  "neither REPLICA IDENTITY index nor PRIMARY "
641  "KEY and published relation does not have "
642  "REPLICA IDENTITY FULL",
643  rel->remoterel.nspname, rel->remoterel.relname)));
644 }
645 
646 /*
647  * Handle UPDATE message.
648  *
649  * TODO: FDW support
650  */
651 static void
653 {
655  LogicalRepRelId relid;
656  Oid idxoid;
657  EState *estate;
658  EPQState epqstate;
659  LogicalRepTupleData oldtup;
660  LogicalRepTupleData newtup;
661  bool has_oldtup;
662  TupleTableSlot *localslot;
663  TupleTableSlot *remoteslot;
664  bool found;
665  MemoryContext oldctx;
666 
668 
669  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
670  &newtup);
673  {
674  /*
675  * The relation can't become interesting in the middle of the
676  * transaction so it's safe to unlock it.
677  */
679  return;
680  }
681 
682  /* Check if we can do the update. */
684 
685  /* Initialize the executor state. */
686  estate = create_estate_for_relation(rel);
687  remoteslot = ExecInitExtraTupleSlot(estate);
689  localslot = ExecInitExtraTupleSlot(estate);
691  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
692 
695 
696  /* Build the search tuple. */
698  slot_store_cstrings(remoteslot, rel,
699  has_oldtup ? oldtup.values : newtup.values);
700  MemoryContextSwitchTo(oldctx);
701 
702  /*
703  * Try to find tuple using either replica identity index, primary key or
704  * if needed, sequential scan.
705  */
706  idxoid = GetRelationIdentityOrPK(rel->localrel);
707  Assert(OidIsValid(idxoid) ||
708  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
709 
710  if (OidIsValid(idxoid))
711  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
713  remoteslot, localslot);
714  else
716  remoteslot, localslot);
717 
718  ExecClearTuple(remoteslot);
719 
720  /*
721  * Tuple found.
722  *
723  * Note this will fail if there are other conflicting unique indexes.
724  */
725  if (found)
726  {
727  /* Process and store remote tuple in the slot */
729  ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
730  slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
731  MemoryContextSwitchTo(oldctx);
732 
733  EvalPlanQualSetSlot(&epqstate, remoteslot);
734 
735  /* Do the actual update. */
736  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
737  }
738  else
739  {
740  /*
741  * The tuple to be updated could not be found.
742  *
743  * TODO what to do here, change the log level to LOG perhaps?
744  */
745  elog(DEBUG1,
746  "logical replication did not find row for update "
747  "in replication target relation \"%s\"",
749  }
750 
751  /* Cleanup. */
754 
755  /* Handle queued AFTER triggers. */
756  AfterTriggerEndQuery(estate);
757 
758  EvalPlanQualEnd(&epqstate);
759  ExecResetTupleTable(estate->es_tupleTable, false);
760  FreeExecutorState(estate);
761 
763 
765 }
766 
767 /*
768  * Handle DELETE message.
769  *
770  * TODO: FDW support
771  */
772 static void
774 {
776  LogicalRepTupleData oldtup;
777  LogicalRepRelId relid;
778  Oid idxoid;
779  EState *estate;
780  EPQState epqstate;
781  TupleTableSlot *remoteslot;
782  TupleTableSlot *localslot;
783  bool found;
784  MemoryContext oldctx;
785 
787 
788  relid = logicalrep_read_delete(s, &oldtup);
791  {
792  /*
793  * The relation can't become interesting in the middle of the
794  * transaction so it's safe to unlock it.
795  */
797  return;
798  }
799 
800  /* Check if we can do the delete. */
802 
803  /* Initialize the executor state. */
804  estate = create_estate_for_relation(rel);
805  remoteslot = ExecInitExtraTupleSlot(estate);
807  localslot = ExecInitExtraTupleSlot(estate);
809  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
810 
813 
814  /* Find the tuple using the replica identity index. */
816  slot_store_cstrings(remoteslot, rel, oldtup.values);
817  MemoryContextSwitchTo(oldctx);
818 
819  /*
820  * Try to find tuple using either replica identity index, primary key or
821  * if needed, sequential scan.
822  */
823  idxoid = GetRelationIdentityOrPK(rel->localrel);
824  Assert(OidIsValid(idxoid) ||
826 
827  if (OidIsValid(idxoid))
828  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
830  remoteslot, localslot);
831  else
833  remoteslot, localslot);
834  /* If found delete it. */
835  if (found)
836  {
837  EvalPlanQualSetSlot(&epqstate, localslot);
838 
839  /* Do the actual delete. */
840  ExecSimpleRelationDelete(estate, &epqstate, localslot);
841  }
842  else
843  {
844  /* The tuple to be deleted could not be found. */
845  ereport(DEBUG1,
846  (errmsg("logical replication could not find row for delete "
847  "in replication target %s",
849  }
850 
851  /* Cleanup. */
854 
855  /* Handle queued AFTER triggers. */
856  AfterTriggerEndQuery(estate);
857 
858  EvalPlanQualEnd(&epqstate);
859  ExecResetTupleTable(estate->es_tupleTable, false);
860  FreeExecutorState(estate);
861 
863 
865 }
866 
867 
868 /*
869  * Logical replication protocol message dispatcher.
870  */
871 static void
873 {
874  char action = pq_getmsgbyte(s);
875 
876  switch (action)
877  {
878  /* BEGIN */
879  case 'B':
881  break;
882  /* COMMIT */
883  case 'C':
885  break;
886  /* INSERT */
887  case 'I':
889  break;
890  /* UPDATE */
891  case 'U':
893  break;
894  /* DELETE */
895  case 'D':
897  break;
898  /* RELATION */
899  case 'R':
901  break;
902  /* TYPE */
903  case 'Y':
905  break;
906  /* ORIGIN */
907  case 'O':
909  break;
910  default:
911  ereport(ERROR,
912  (errcode(ERRCODE_PROTOCOL_VIOLATION),
913  errmsg("invalid logical replication message type %c", action)));
914  }
915 }
916 
917 /*
918  * Figure out which write/flush positions to report to the walsender process.
919  *
920  * We can't simply report back the last LSN the walsender sent us because the
921  * local transaction might not yet be flushed to disk locally. Instead we
922  * build a list that associates local with remote LSNs for every commit. When
923  * reporting back the flush position to the sender we iterate that list and
924  * check which entries on it are already locally flushed. Those we can report
925  * as having been flushed.
926  *
927  * The have_pending_txes is true if there are outstanding transactions that
928  * need to be flushed.
929  */
930 static void
932  bool *have_pending_txes)
933 {
934  dlist_mutable_iter iter;
935  XLogRecPtr local_flush = GetFlushRecPtr();
936 
937  *write = InvalidXLogRecPtr;
938  *flush = InvalidXLogRecPtr;
939 
940  dlist_foreach_modify(iter, &lsn_mapping)
941  {
942  FlushPosition *pos =
943  dlist_container(FlushPosition, node, iter.cur);
944 
945  *write = pos->remote_end;
946 
947  if (pos->local_end <= local_flush)
948  {
949  *flush = pos->remote_end;
950  dlist_delete(iter.cur);
951  pfree(pos);
952  }
953  else
954  {
955  /*
956  * Don't want to uselessly iterate over the rest of the list which
957  * could potentially be long. Instead get the last element and
958  * grab the write position from there.
959  */
960  pos = dlist_tail_element(FlushPosition, node,
961  &lsn_mapping);
962  *write = pos->remote_end;
963  *have_pending_txes = true;
964  return;
965  }
966  }
967 
968  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
969 }
970 
971 /*
972  * Store current remote/local lsn pair in the tracking list.
973  */
974 static void
976 {
977  FlushPosition *flushpos;
978 
979  /* Need to do this in permanent context */
980  MemoryContextSwitchTo(ApplyContext);
981 
982  /* Track commit lsn */
983  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
984  flushpos->local_end = XactLastCommitEnd;
985  flushpos->remote_end = remote_lsn;
986 
987  dlist_push_tail(&lsn_mapping, &flushpos->node);
988  MemoryContextSwitchTo(ApplyMessageContext);
989 }
990 
991 
992 /* Update statistics of the worker. */
993 static void
995 {
997  MyLogicalRepWorker->last_send_time = send_time;
999  if (reply)
1000  {
1002  MyLogicalRepWorker->reply_time = send_time;
1003  }
1004 }
1005 
1006 /*
1007  * Apply main loop.
1008  */
1009 static void
1011 {
1012  /*
1013  * Init the ApplyMessageContext which we clean up after each replication
1014  * protocol message.
1015  */
1016  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
1017  "ApplyMessageContext",
1019 
1020  /* mark as idle, before starting to loop */
1022 
1023  for (;;)
1024  {
1026  int rc;
1027  int len;
1028  char *buf = NULL;
1029  bool endofstream = false;
1030  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1031  bool ping_sent = false;
1032  long wait_time;
1033 
1035 
1036  MemoryContextSwitchTo(ApplyMessageContext);
1037 
1038  len = walrcv_receive(wrconn, &buf, &fd);
1039 
1040  if (len != 0)
1041  {
1042  /* Process the data */
1043  for (;;)
1044  {
1046 
1047  if (len == 0)
1048  {
1049  break;
1050  }
1051  else if (len < 0)
1052  {
1053  ereport(LOG,
1054  (errmsg("data stream from publisher has ended")));
1055  endofstream = true;
1056  break;
1057  }
1058  else
1059  {
1060  int c;
1061  StringInfoData s;
1062 
1063  /* Reset timeout. */
1064  last_recv_timestamp = GetCurrentTimestamp();
1065  ping_sent = false;
1066 
1067  /* Ensure we are reading the data into our memory context. */
1068  MemoryContextSwitchTo(ApplyMessageContext);
1069 
1070  s.data = buf;
1071  s.len = len;
1072  s.cursor = 0;
1073  s.maxlen = -1;
1074 
1075  c = pq_getmsgbyte(&s);
1076 
1077  if (c == 'w')
1078  {
1079  XLogRecPtr start_lsn;
1080  XLogRecPtr end_lsn;
1081  TimestampTz send_time;
1082 
1083  start_lsn = pq_getmsgint64(&s);
1084  end_lsn = pq_getmsgint64(&s);
1085  send_time = pq_getmsgint64(&s);
1086 
1087  if (last_received < start_lsn)
1088  last_received = start_lsn;
1089 
1090  if (last_received < end_lsn)
1091  last_received = end_lsn;
1092 
1093  UpdateWorkerStats(last_received, send_time, false);
1094 
1095  apply_dispatch(&s);
1096  }
1097  else if (c == 'k')
1098  {
1099  XLogRecPtr end_lsn;
1101  bool reply_requested;
1102 
1103  end_lsn = pq_getmsgint64(&s);
1104  timestamp = pq_getmsgint64(&s);
1105  reply_requested = pq_getmsgbyte(&s);
1106 
1107  if (last_received < end_lsn)
1108  last_received = end_lsn;
1109 
1110  send_feedback(last_received, reply_requested, false);
1111  UpdateWorkerStats(last_received, timestamp, true);
1112  }
1113  /* other message types are purposefully ignored */
1114 
1115  MemoryContextReset(ApplyMessageContext);
1116  }
1117 
1118  len = walrcv_receive(wrconn, &buf, &fd);
1119  }
1120  }
1121 
1122  /* confirm all writes so far */
1123  send_feedback(last_received, false, false);
1124 
1125  if (!in_remote_transaction)
1126  {
1127  /*
1128  * If we didn't get any transactions for a while there might be
1129  * unconsumed invalidation messages in the queue, consume them
1130  * now.
1131  */
1134 
1135  /* Process any table synchronization changes. */
1136  process_syncing_tables(last_received);
1137  }
1138 
1139  /* Cleanup the memory. */
1140  MemoryContextResetAndDeleteChildren(ApplyMessageContext);
1142 
1143  /* Check if we need to exit the streaming loop. */
1144  if (endofstream)
1145  {
1146  TimeLineID tli;
1147 
1148  walrcv_endstreaming(wrconn, &tli);
1149  break;
1150  }
1151 
1152  /*
1153  * Wait for more data or latch. If we have unflushed transactions,
1154  * wake up after WalWriterDelay to see if they've been flushed yet (in
1155  * which case we should send a feedback message). Otherwise, there's
1156  * no particular urgency about waking up unless we get data or a
1157  * signal.
1158  */
1159  if (!dlist_is_empty(&lsn_mapping))
1160  wait_time = WalWriterDelay;
1161  else
1162  wait_time = NAPTIME_PER_CYCLE;
1163 
1167  fd, wait_time,
1169 
1170  /* Emergency bailout if postmaster has died */
1171  if (rc & WL_POSTMASTER_DEATH)
1172  proc_exit(1);
1173 
1174  if (rc & WL_LATCH_SET)
1175  {
1178  }
1179 
1180  if (got_SIGHUP)
1181  {
1182  got_SIGHUP = false;
1184  }
1185 
1186  if (rc & WL_TIMEOUT)
1187  {
1188  /*
1189  * We didn't receive anything new. If we haven't heard anything
1190  * from the server for more than wal_receiver_timeout / 2, ping
1191  * the server. Also, if it's been longer than
1192  * wal_receiver_status_interval since the last update we sent,
1193  * send a status update to the master anyway, to report any
1194  * progress in applying WAL.
1195  */
1196  bool requestReply = false;
1197 
1198  /*
1199  * Check if time since last receive from standby has reached the
1200  * configured limit.
1201  */
1202  if (wal_receiver_timeout > 0)
1203  {
1205  TimestampTz timeout;
1206 
1207  timeout =
1208  TimestampTzPlusMilliseconds(last_recv_timestamp,
1210 
1211  if (now >= timeout)
1212  ereport(ERROR,
1213  (errmsg("terminating logical replication worker due to timeout")));
1214 
1215  /*
1216  * We didn't receive anything new, for half of receiver
1217  * replication timeout. Ping the server.
1218  */
1219  if (!ping_sent)
1220  {
1221  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1222  (wal_receiver_timeout / 2));
1223  if (now >= timeout)
1224  {
1225  requestReply = true;
1226  ping_sent = true;
1227  }
1228  }
1229  }
1230 
1231  send_feedback(last_received, requestReply, requestReply);
1232  }
1233  }
1234 }
1235 
1236 /*
1237  * Send a Standby Status Update message to server.
1238  *
1239  * 'recvpos' is the latest LSN we've received data to, force is set if we need
1240  * to send a response to avoid timeouts.
1241  */
1242 static void
1243 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1244 {
1245  static StringInfo reply_message = NULL;
1246  static TimestampTz send_time = 0;
1247 
1248  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1249  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1250  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1251 
1252  XLogRecPtr writepos;
1253  XLogRecPtr flushpos;
1254  TimestampTz now;
1255  bool have_pending_txes;
1256 
1257  /*
1258  * If the user doesn't want status to be reported to the publisher, be
1259  * sure to exit before doing anything at all.
1260  */
1261  if (!force && wal_receiver_status_interval <= 0)
1262  return;
1263 
1264  /* It's legal to not pass a recvpos */
1265  if (recvpos < last_recvpos)
1266  recvpos = last_recvpos;
1267 
1268  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1269 
1270  /*
1271  * No outstanding transactions to flush, we can report the latest received
1272  * position. This is important for synchronous replication.
1273  */
1274  if (!have_pending_txes)
1275  flushpos = writepos = recvpos;
1276 
1277  if (writepos < last_writepos)
1278  writepos = last_writepos;
1279 
1280  if (flushpos < last_flushpos)
1281  flushpos = last_flushpos;
1282 
1283  now = GetCurrentTimestamp();
1284 
1285  /* if we've already reported everything we're good */
1286  if (!force &&
1287  writepos == last_writepos &&
1288  flushpos == last_flushpos &&
1289  !TimestampDifferenceExceeds(send_time, now,
1291  return;
1292  send_time = now;
1293 
1294  if (!reply_message)
1295  {
1296  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1297 
1298  reply_message = makeStringInfo();
1299  MemoryContextSwitchTo(oldctx);
1300  }
1301  else
1302  resetStringInfo(reply_message);
1303 
1304  pq_sendbyte(reply_message, 'r');
1305  pq_sendint64(reply_message, recvpos); /* write */
1306  pq_sendint64(reply_message, flushpos); /* flush */
1307  pq_sendint64(reply_message, writepos); /* apply */
1308  pq_sendint64(reply_message, now); /* sendTime */
1309  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1310 
1311  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1312  force,
1313  (uint32) (recvpos >> 32), (uint32) recvpos,
1314  (uint32) (writepos >> 32), (uint32) writepos,
1315  (uint32) (flushpos >> 32), (uint32) flushpos
1316  );
1317 
1318  walrcv_send(wrconn, reply_message->data, reply_message->len);
1319 
1320  if (recvpos > last_recvpos)
1321  last_recvpos = recvpos;
1322  if (writepos > last_writepos)
1323  last_writepos = writepos;
1324  if (flushpos > last_flushpos)
1325  last_flushpos = flushpos;
1326 }
1327 
1328 /*
1329  * Reread subscription info if needed. Most changes will be exit.
1330  */
1331 static void
1333 {
1334  MemoryContext oldctx;
1336  bool started_tx = false;
1337 
1338  /* When cache state is valid there is nothing to do here. */
1339  if (MySubscriptionValid)
1340  return;
1341 
1342  /* This function might be called inside or outside of transaction. */
1343  if (!IsTransactionState())
1344  {
1346  started_tx = true;
1347  }
1348 
1349  /* Ensure allocations in permanent context. */
1350  oldctx = MemoryContextSwitchTo(ApplyContext);
1351 
1352  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1353 
1354  /*
1355  * Exit if the subscription was removed. This normally should not happen
1356  * as the worker gets killed during DROP SUBSCRIPTION.
1357  */
1358  if (!newsub)
1359  {
1360  ereport(LOG,
1361  (errmsg("logical replication apply worker for subscription \"%s\" will "
1362  "stop because the subscription was removed",
1363  MySubscription->name)));
1364 
1365  proc_exit(0);
1366  }
1367 
1368  /*
1369  * Exit if the subscription was disabled. This normally should not happen
1370  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1371  */
1372  if (!newsub->enabled)
1373  {
1374  ereport(LOG,
1375  (errmsg("logical replication apply worker for subscription \"%s\" will "
1376  "stop because the subscription was disabled",
1377  MySubscription->name)));
1378 
1379  proc_exit(0);
1380  }
1381 
1382  /*
1383  * Exit if connection string was changed. The launcher will start new
1384  * worker.
1385  */
1386  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1387  {
1388  ereport(LOG,
1389  (errmsg("logical replication apply worker for subscription \"%s\" will "
1390  "restart because the connection information was changed",
1391  MySubscription->name)));
1392 
1393  proc_exit(0);
1394  }
1395 
1396  /*
1397  * Exit if subscription name was changed (it's used for
1398  * fallback_application_name). The launcher will start new worker.
1399  */
1400  if (strcmp(newsub->name, MySubscription->name) != 0)
1401  {
1402  ereport(LOG,
1403  (errmsg("logical replication apply worker for subscription \"%s\" will "
1404  "restart because subscription was renamed",
1405  MySubscription->name)));
1406 
1407  proc_exit(0);
1408  }
1409 
1410  /* !slotname should never happen when enabled is true. */
1411  Assert(newsub->slotname);
1412 
1413  /*
1414  * We need to make new connection to new slot if slot name has changed so
1415  * exit here as well if that's the case.
1416  */
1417  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1418  {
1419  ereport(LOG,
1420  (errmsg("logical replication apply worker for subscription \"%s\" will "
1421  "restart because the replication slot name was changed",
1422  MySubscription->name)));
1423 
1424  proc_exit(0);
1425  }
1426 
1427  /*
1428  * Exit if publication list was changed. The launcher will start new
1429  * worker.
1430  */
1431  if (!equal(newsub->publications, MySubscription->publications))
1432  {
1433  ereport(LOG,
1434  (errmsg("logical replication apply worker for subscription \"%s\" will "
1435  "restart because subscription's publications were changed",
1436  MySubscription->name)));
1437 
1438  proc_exit(0);
1439  }
1440 
1441  /* Check for other changes that should never happen too. */
1442  if (newsub->dbid != MySubscription->dbid)
1443  {
1444  elog(ERROR, "subscription %u changed unexpectedly",
1446  }
1447 
1448  /* Clean old subscription info and switch to new one. */
1449  FreeSubscription(MySubscription);
1450  MySubscription = newsub;
1451 
1452  MemoryContextSwitchTo(oldctx);
1453 
1454  /* Change synchronous commit according to the user's wishes */
1455  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1457 
1458  if (started_tx)
1460 
1461  MySubscriptionValid = true;
1462 }
1463 
1464 /*
1465  * Callback from subscription syscache invalidation.
1466  */
1467 static void
1468 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1469 {
1470  MySubscriptionValid = false;
1471 }
1472 
1473 /* SIGHUP: set flag to reload configuration at next convenient time */
1474 static void
1476 {
1477  int save_errno = errno;
1478 
1479  got_SIGHUP = true;
1480 
1481  /* Waken anything waiting on the process latch */
1482  SetLatch(MyLatch);
1483 
1484  errno = save_errno;
1485 }
1486 
1487 /* Logical Replication Apply worker entry point */
1488 void
1490 {
1491  int worker_slot = DatumGetInt32(main_arg);
1492  MemoryContext oldctx;
1493  char originname[NAMEDATALEN];
1494  XLogRecPtr origin_startpos;
1495  char *myslotname;
1497 
1498  /* Attach to slot */
1499  logicalrep_worker_attach(worker_slot);
1500 
1501  /* Setup signal handling */
1503  pqsignal(SIGTERM, die);
1505 
1506  /* Initialise stats to a sanish value */
1509 
1510  /* Load the libpq-specific functions */
1511  load_file("libpqwalreceiver", false);
1512 
1515  "logical replication apply");
1516 
1517  /* Run as replica session replication role. */
1518  SetConfigOption("session_replication_role", "replica",
1520 
1521  /* Connect to our database. */
1524 
1525  /* Load the subscription into persistent memory context. */
1526  ApplyContext = AllocSetContextCreate(TopMemoryContext,
1527  "ApplyContext",
1530  oldctx = MemoryContextSwitchTo(ApplyContext);
1531  MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
1532  MySubscriptionValid = true;
1533  MemoryContextSwitchTo(oldctx);
1534 
1535  /* Setup synchronous commit according to the user's wishes */
1536  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1538 
1539  if (!MySubscription->enabled)
1540  {
1541  ereport(LOG,
1542  (errmsg("logical replication apply worker for subscription \"%s\" will not "
1543  "start because the subscription was disabled during startup",
1544  MySubscription->name)));
1545 
1546  proc_exit(0);
1547  }
1548 
1549  /* Keep us informed about subscription changes. */
1552  (Datum) 0);
1553 
1554  if (am_tablesync_worker())
1555  ereport(LOG,
1556  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1557  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
1558  else
1559  ereport(LOG,
1560  (errmsg("logical replication apply worker for subscription \"%s\" has started",
1561  MySubscription->name)));
1562 
1564 
1565  /* Connect to the origin and start the replication. */
1566  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1567  MySubscription->conninfo);
1568 
1569  if (am_tablesync_worker())
1570  {
1571  char *syncslotname;
1572 
1573  /* This is table synchroniation worker, call initial sync. */
1574  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1575 
1576  /* The slot name needs to be allocated in permanent memory context. */
1577  oldctx = MemoryContextSwitchTo(ApplyContext);
1578  myslotname = pstrdup(syncslotname);
1579  MemoryContextSwitchTo(oldctx);
1580 
1581  pfree(syncslotname);
1582  }
1583  else
1584  {
1585  /* This is main apply worker */
1586  RepOriginId originid;
1587  TimeLineID startpointTLI;
1588  char *err;
1589  int server_version;
1590 
1591  myslotname = MySubscription->slotname;
1592 
1593  /*
1594  * This shouldn't happen if the subscription is enabled, but guard
1595  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1596  * crash if slot is NULL.)
1597  */
1598  if (!myslotname)
1599  ereport(ERROR,
1600  (errmsg("subscription has no replication slot set")));
1601 
1602  /* Setup replication origin tracking. */
1604  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1605  originid = replorigin_by_name(originname, true);
1606  if (!OidIsValid(originid))
1607  originid = replorigin_create(originname);
1608  replorigin_session_setup(originid);
1609  replorigin_session_origin = originid;
1610  origin_startpos = replorigin_session_get_progress(false);
1612 
1613  wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
1614  &err);
1615  if (wrconn == NULL)
1616  ereport(ERROR,
1617  (errmsg("could not connect to the publisher: %s", err)));
1618 
1619  /*
1620  * We don't really use the output identify_system for anything but it
1621  * does some initializations on the upstream so let's still call it.
1622  */
1623  (void) walrcv_identify_system(wrconn, &startpointTLI,
1624  &server_version);
1625 
1626  }
1627 
1628  /*
1629  * Setup callback for syscache so that we know when something changes in
1630  * the subscription relation state.
1631  */
1634  (Datum) 0);
1635 
1636  /* Build logical replication streaming options. */
1637  options.logical = true;
1638  options.startpoint = origin_startpos;
1639  options.slotname = myslotname;
1640  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1641  options.proto.logical.publication_names = MySubscription->publications;
1642 
1643  /* Start normal logical streaming replication. */
1644  walrcv_startstreaming(wrconn, &options);
1645 
1646  /* Run the main loop. */
1647  LogicalRepApplyLoop(origin_startpos);
1648 
1649  proc_exit(0);
1650 }
1651 
1652 /*
1653  * Is current process a logical replication worker?
1654  */
1655 bool
1657 {
1658  return MyLogicalRepWorker != NULL;
1659 }
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
Subscription * MySubscription
Definition: worker.c:112
static void apply_handle_type(StringInfo s)
Definition: worker.c:528
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:141
#define NIL
Definition: pg_list.h:69
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:186
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:542
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1300
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1243
WalReceiverConn * wrconn
Definition: worker.c:110
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:254
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:975
uint32 TimeLineID
Definition: xlogdefs.h:45
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:277
void AcceptInvalidationMessages(void)
Definition: inval.c:679
static XLogRecPtr remote_final_lsn
Definition: worker.c:116
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4688
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static void apply_handle_insert(StringInfo s)
Definition: worker.c:558
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:99
#define DatumGetInt32(X)
Definition: postgres.h:478
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2962
#define RelationGetDescr(relation)
Definition: rel.h:428
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:544
dlist_node node
Definition: worker.c:94
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
int64 timestamp
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:57
int64 TimestampTz
Definition: timestamp.h:39
LogicalRepRelation * rel
Definition: worker.c:103
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:84
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
char * pstrdup(const char *in)
Definition: mcxt.c:1077
void CommitTransactionCommand(void)
Definition: xact.c:2750
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:994
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:338
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
StringInfo makeStringInfo(void)
Definition: stringinfo.c:28
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:256
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:324
Expr * expression_planner(Expr *expr)
Definition: planner.c:5939
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
#define InvalidBuffer
Definition: buf.h:25
uint16 RepOriginId
Definition: xlogdefs.h:51
XLogRecPtr last_lsn
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:252
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1145
XLogRecPtr remote_end
Definition: worker.c:96
int errcode(int sqlerrcode)
Definition: elog.c:575
char * format_type_be(Oid type_oid)
Definition: format_type.c:94
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:372
Datum * tts_values
Definition: tuptable.h:125
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8227
void PopActiveSnapshot(void)
Definition: snapmgr.c:812
Oid logicalrep_typmap_getid(Oid remoteid)
Definition: relation.c:446
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1010
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
List * es_range_table
Definition: execnodes.h:431
#define LOG
Definition: elog.h:26
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
struct SlotErrCallbackArg SlotErrCallbackArg
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:815
bool IsLogicalWorker(void)
Definition: worker.c:1656
int wal_receiver_status_interval
Definition: walreceiver.c:74
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
struct ErrorContextCallback * previous
Definition: elog.h:238
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:304
#define OidIsValid(objectId)
Definition: c.h:538
XLogRecPtr last_lsn
Definition: walsender.c:213
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
int natts
Definition: tupdesc.h:73
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
int wal_receiver_timeout
Definition: walreceiver.c:75
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:931
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3198
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:160
ErrorContextCallback * error_context_stack
Definition: elog.c:88
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:225
#define list_make1(x1)
Definition: pg_list.h:139
#define NAMEDATALEN
void FreeExecutorState(EState *estate)
Definition: execUtils.c:183
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
Definition: executor.h:475
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:158
static StringInfoData reply_message
Definition: walreceiver.c:111
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1468
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:950
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
#define ERROR
Definition: elog.h:43
static bool ensure_transaction(void)
Definition: worker.c:157
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
Definition: worker.c:90
bool in_remote_transaction
Definition: worker.c:115
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:376
Definition: guc.h:75
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:305
XLogRecPtr reply_lsn
static bool am_tablesync_worker(void)
static void apply_handle_delete(StringInfo s)
Definition: worker.c:773
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define DEBUG2
Definition: elog.h:24
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
char * c
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:248
void logicalrep_worker_attach(int slot)
Definition: launcher.c:604
#define NoLock
Definition: lockdefs.h:34
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6683
static char * buf
Definition: pg_test_fsync.c:66
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:733
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
bool * tts_isnull
Definition: tuptable.h:126
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:285
ResultRelInfo * es_result_relations
Definition: execnodes.h:441
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:436
XLogRecPtr startpoint
Definition: walreceiver.h:145
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:268
int pgsocket
Definition: port.h:22
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void apply_handle_begin(StringInfo s)
Definition: worker.c:428
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
Definition: postmaster.c:5550
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:223
TupleTableSlot * es_trig_tuple_slot
Definition: execnodes.h:460
#define ereport(elevel, rest)
Definition: elog.h:122
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2599
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1238
MemoryContext TopMemoryContext
Definition: mcxt.c:43
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:366
EState * CreateExecutorState(void)
Definition: execUtils.c:80
Definition: guc.h:72
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:618
MemoryContext ApplyContext
Definition: worker.c:108
static char ** options
XLogRecPtr final_lsn
Definition: logicalproto.h:67
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:447
union WalRcvStreamOptions::@97 proto
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
static void logicalrep_worker_sighup(SIGNAL_ARGS)
Definition: worker.c:1475
Node * build_column_default(Relation rel, int attrno)
#define SUBREL_STATE_SYNCDONE
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
List * es_tupleTable
Definition: execnodes.h:473
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1389
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
static void apply_handle_update(StringInfo s)
Definition: worker.c:652
uintptr_t Datum
Definition: postgres.h:372
void CommandCounterIncrement(void)
Definition: xact.c:922
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
#define PGINVALID_SOCKET
Definition: port.h:24
int es_num_result_relations
Definition: execnodes.h:442
void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2787
#define SIGHUP
Definition: win32.h:188
static void apply_handle_relation(StringInfo s)
Definition: worker.c:513
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
XLogRecPtr local_end
Definition: worker.c:95
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4343
static void apply_dispatch(StringInfo s)
Definition: worker.c:872
static void maybe_reread_subscription(void)
Definition: worker.c:1332
#define makeNode(_type_)
Definition: nodes.h:557
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
TimestampTz last_recv_time
static MemoryContext ApplyMessageContext
Definition: worker.c:107
#define SIGNAL_ARGS
Definition: c.h:1080
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:676
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
RepOriginId replorigin_session_origin
Definition: origin.c:155
static void apply_handle_origin(StringInfo s)
Definition: worker.c:491
void StartTransactionCommand(void)
Definition: xact.c:2680
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static int server_version
Definition: pg_dumpall.c:82
static void slot_store_error_callback(void *arg)
Definition: worker.c:280
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:211
bool IsTransactionState(void)
Definition: xact.c:350
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:258
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:92
int WalWriterDelay
Definition: walwriter.c:68
bool MySubscriptionValid
Definition: worker.c:113
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:480
void FreeSubscription(Subscription *sub)
RTEKind rtekind
Definition: parsenodes.h:944
static Datum values[MAXATTR]
Definition: bootstrap.c:163
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4363
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:740
void(* callback)(void *arg)
Definition: elog.h:239
static void slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:369
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
void die(SIGNAL_ARGS)
Definition: postgres.c:2617
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1010
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
int i
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define errcontext
Definition: elog.h:164
void * arg
struct Latch * MyLatch
Definition: globals.c:52
struct FlushPosition FlushPosition
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:113
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
HeapTuple tts_tuple
Definition: tuptable.h:120
#define elog
Definition: elog.h:219
XLogRecPtr commit_lsn
Definition: logicalproto.h:74
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1726
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1738
#define WL_LATCH_SET
Definition: latch.h:124
#define SUBREL_STATE_READY
#define RelationGetRelid(relation)
Definition: rel.h:416
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4667
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TimestampTz committime
Definition: logicalproto.h:76
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:415
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:1489
uint32 LogicalRepRelId
Definition: logicalproto.h:39
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:488
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:222
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:271
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5579
void pgstat_report_stat(bool force)
Definition: pgstat.c:812
static volatile sig_atomic_t got_SIGHUP
Definition: worker.c:125
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:443