PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from remote logical replication stream.
13  *
14  * The main worker (apply) is started by the logical replication worker
15  * launcher for every enabled subscription in a database. It uses the
16  * walsender protocol to communicate with the publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "miscadmin.h"
27 #include "pgstat.h"
28 #include "funcapi.h"
29 
30 #include "access/xact.h"
31 #include "access/xlog_internal.h"
32 
33 #include "catalog/namespace.h"
36 
37 #include "commands/trigger.h"
38 
39 #include "executor/executor.h"
41 
42 #include "libpq/pqformat.h"
43 #include "libpq/pqsignal.h"
44 
45 #include "mb/pg_wchar.h"
46 
47 #include "nodes/makefuncs.h"
48 
49 #include "optimizer/planner.h"
50 
51 #include "parser/parse_relation.h"
52 
53 #include "postmaster/bgworker.h"
54 #include "postmaster/postmaster.h"
55 
56 #include "replication/decode.h"
57 #include "replication/logical.h"
62 #include "replication/origin.h"
63 #include "replication/snapbuild.h"
66 
67 #include "rewrite/rewriteHandler.h"
68 
69 #include "storage/bufmgr.h"
70 #include "storage/ipc.h"
71 #include "storage/lmgr.h"
72 #include "storage/proc.h"
73 #include "storage/procarray.h"
74 
75 #include "tcop/tcopprot.h"
76 
77 #include "utils/builtins.h"
78 #include "utils/catcache.h"
79 #include "utils/datum.h"
80 #include "utils/fmgroids.h"
81 #include "utils/guc.h"
82 #include "utils/inval.h"
83 #include "utils/lsyscache.h"
84 #include "utils/memutils.h"
85 #include "utils/timeout.h"
86 #include "utils/tqual.h"
87 #include "utils/syscache.h"
88 
89 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
90 
91 typedef struct FlushPosition
92 {
97 
99 
100 typedef struct SlotErrCallbackArg
101 {
103  int attnum;
105 
108 
110 
112 bool MySubscriptionValid = false;
113 
116 
117 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
118 
119 static void store_flush_position(XLogRecPtr remote_lsn);
120 
121 static void maybe_reread_subscription(void);
122 
123 /* Flags set by signal handlers */
124 static volatile sig_atomic_t got_SIGHUP = false;
125 
126 /*
127  * Should this worker apply changes for given relation.
128  *
129  * This is mainly needed for initial relation data sync as that runs in
130  * separate worker process running in parallel and we need some way to skip
131  * changes coming to the main apply worker during the sync of a table.
     *
     * A tablesync worker applies changes only for the relation recorded in
     * MyLogicalRepWorker->relid. Any other worker applies changes when the
     * relation state is READY, or when it is SYNCDONE and its state LSN is
     * not beyond remote_final_lsn.
132  *
133  * Note we need to do a less-than-or-equal comparison for SYNCDONE state because
134  * it might hold position of end of initial slot consistent point WAL
135  * record + 1 (ie start of next record) and next record can be COMMIT of
136  * transaction we are now processing (which is what we set remote_final_lsn
137  * to in apply_handle_begin).
138  */
139 static bool
141 {
142  if (am_tablesync_worker())
143  return MyLogicalRepWorker->relid == rel->localreloid;
144  else
145  return (rel->state == SUBREL_STATE_READY ||
146  (rel->state == SUBREL_STATE_SYNCDONE &&
147  rel->statelsn <= remote_final_lsn));
148 }
149 
150 /*
151  * Make sure that we started local transaction.
152  *
153  * Also switches to ApplyMessageContext as necessary.
154  */
155 static bool
157 {
158  if (IsTransactionState())
159  {
161 
162  if (CurrentMemoryContext != ApplyMessageContext)
163  MemoryContextSwitchTo(ApplyMessageContext);
164 
165  return false;
166  }
167 
170 
172 
173  MemoryContextSwitchTo(ApplyMessageContext);
174  return true;
175 }
176 
177 
178 /*
179  * Executor state preparation for evaluation of constraint expressions,
180  * indexes and triggers.
181  *
182  * This is based on similar code in copy.c
183  */
184 static EState *
186 {
187  EState *estate;
188  ResultRelInfo *resultRelInfo;
189  RangeTblEntry *rte;
190 
191  estate = CreateExecutorState();
192 
193  rte = makeNode(RangeTblEntry);
194  rte->rtekind = RTE_RELATION;
195  rte->relid = RelationGetRelid(rel->localrel);
196  rte->relkind = rel->localrel->rd_rel->relkind;
197  estate->es_range_table = list_make1(rte);
198 
199  resultRelInfo = makeNode(ResultRelInfo);
200  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
201 
202  estate->es_result_relations = resultRelInfo;
203  estate->es_num_result_relations = 1;
204  estate->es_result_relation_info = resultRelInfo;
205 
206  /* Triggers might need a slot */
207  if (resultRelInfo->ri_TrigDesc)
208  estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
209 
210  /* Prepare to catch AFTER triggers. */
212 
213  return estate;
214 }
215 
216 /*
217  * Evaluates default expressions for local columns that cannot be mapped to
218  * remote relation columns, and stores the results into the slot.
219  *
220  * This allows us to support tables which have more columns on the downstream
221  * than on the upstream.
     *
     * Only non-dropped local attributes whose rel->attrmap[] entry is negative
     * and for which build_column_default() returns an expression are written;
     * all other slot entries are left as the caller set them.
222  */
223 static void
225  TupleTableSlot *slot)
226 {
227  TupleDesc desc = RelationGetDescr(rel->localrel);
228  int num_phys_attrs = desc->natts;
229  int i;
230  int attnum,
231  num_defaults = 0;
232  int *defmap;
233  ExprState **defexprs;
234  ExprContext *econtext;
235 
236  econtext = GetPerTupleExprContext(estate);
237 
238  /* We got all the data via replication, no need to evaluate anything. */
239  if (num_phys_attrs == rel->remoterel.natts)
240  return;
241 
     /* Worst case every local attribute needs a default, so size for all. */
242  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
243  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
244 
245  for (attnum = 0; attnum < num_phys_attrs; attnum++)
246  {
247  Expr *defexpr;
248 
249  if (desc->attrs[attnum]->attisdropped)
250  continue;
251 
     /* A non-negative map entry means the remote side supplies this column. */
252  if (rel->attrmap[attnum] >= 0)
253  continue;
254 
     /*
      * attnum is a 0-based index here; build_column_default() is passed
      * attnum + 1 (presumably a 1-based attribute number -- confirm).
      */
255  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
256 
257  if (defexpr != NULL)
258  {
259  /* Run the expression through planner */
260  defexpr = expression_planner(defexpr);
261 
262  /* Initialize executable expression and remember its target column */
263  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
264  defmap[num_defaults] = attnum;
265  num_defaults++;
266  }
267 
268  }
269 
     /* Evaluate the collected defaults into the slot's value/isnull arrays. */
270  for (i = 0; i < num_defaults; i++)
271  slot->tts_values[defmap[i]] =
272  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
273 }
274 
275 /*
276  * Error callback to give more context info about type conversion failure.
     *
     * 'arg' points to a SlotErrCallbackArg. Its attnum field is the remote
     * attribute currently being converted; a negative value means no attribute
     * conversion is in progress and no context line is added.
     *
     * NOTE(review): remotetypoid is read from the remote relation's atttyps[]
     * yet is formatted with format_type_be(), which presumably resolves OIDs
     * against the local catalog -- confirm the "remote type" text is correct
     * when publisher and subscriber type OIDs differ.
277  */
278 static void
280 {
281  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
282  Oid remotetypoid,
283  localtypoid;
284 
     /* The slot_store/slot_modify loops set attnum to -1 until a column is being parsed. */
285  if (errarg->attnum < 0)
286  return;
287 
288  remotetypoid = errarg->rel->atttyps[errarg->attnum];
289  localtypoid = logicalrep_typmap_getid(remotetypoid);
290  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
291  "remote type %s, local type %s",
292  errarg->rel->nspname, errarg->rel->relname,
293  errarg->rel->attnames[errarg->attnum],
294  format_type_be(remotetypoid),
295  format_type_be(localtypoid));
296 }
297 
298 /*
299  * Store data in C string form into slot.
300  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
301  * use better.
302  */
303 static void
305  char **values)
306 {
307  int natts = slot->tts_tupleDescriptor->natts;
308  int i;
309  SlotErrCallbackArg errarg;
310  ErrorContextCallback errcallback;
311 
312  ExecClearTuple(slot);
313 
314  /* Push callback + info on the error context stack */
315  errarg.rel = &rel->remoterel;
316  errarg.attnum = -1;
317  errcallback.callback = slot_store_error_callback;
318  errcallback.arg = (void *) &errarg;
319  errcallback.previous = error_context_stack;
320  error_context_stack = &errcallback;
321 
322  /* Call the "in" function for each non-dropped attribute */
323  for (i = 0; i < natts; i++)
324  {
326  int remoteattnum = rel->attrmap[i];
327 
328  if (!att->attisdropped && remoteattnum >= 0 &&
329  values[remoteattnum] != NULL)
330  {
331  Oid typinput;
332  Oid typioparam;
333 
334  errarg.attnum = remoteattnum;
335 
336  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
337  slot->tts_values[i] = OidInputFunctionCall(typinput,
338  values[remoteattnum],
339  typioparam,
340  att->atttypmod);
341  slot->tts_isnull[i] = false;
342  }
343  else
344  {
345  /*
346  * We assign NULL to dropped attributes, NULL values, and missing
347  * values (missing values should be later filled using
348  * slot_fill_defaults).
349  */
350  slot->tts_values[i] = (Datum) 0;
351  slot->tts_isnull[i] = true;
352  }
353  }
354 
355  /* Pop the error context stack */
356  error_context_stack = errcallback.previous;
357 
358  ExecStoreVirtualTuple(slot);
359 }
360 
361 /*
362  * Modify slot with user data provided as C strings.
363  * This is somewhat similar to heap_modify_tuple but also calls the type
364  * input function on the user data as the input is the text representation
365  * of the types.
     *
     * 'values' and 'replaces' are indexed by remote attribute number; the slot
     * arrays are indexed by local attribute number. rel->attrmap[] translates
     * a local index into the remote one (negative when there is no remote
     * counterpart). Local attributes whose remote column is not flagged in
     * 'replaces' keep the value already in the slot.
     *
     * NOTE(review): a local attribute with no remote counterpart
     * (remoteattnum < 0) falls through to the else branch and is set to NULL
     * on every call -- confirm that overwriting local-only columns is intended.
366  */
367 static void
369  char **values, bool *replaces)
370 {
371  int natts = slot->tts_tupleDescriptor->natts;
372  int i;
373  SlotErrCallbackArg errarg;
374  ErrorContextCallback errcallback;
375 
     /* Populate the slot's value/isnull arrays before the tuple is cleared. */
376  slot_getallattrs(slot);
377  ExecClearTuple(slot);
378 
379  /* Push callback + info on the error context stack */
380  errarg.rel = &rel->remoterel;
381  errarg.attnum = -1;
382  errcallback.callback = slot_store_error_callback;
383  errcallback.arg = (void *) &errarg;
384  errcallback.previous = error_context_stack;
385  error_context_stack = &errcallback;
386 
387  /* Call the "in" function for each replaced attribute */
388  for (i = 0; i < natts; i++)
389  {
391  int remoteattnum = rel->attrmap[i];
392 
     /* Remote column exists but was not changed: keep the current value. */
393  if (remoteattnum >= 0 && !replaces[remoteattnum])
394  continue;
395 
396  if (remoteattnum >= 0 && values[remoteattnum] != NULL)
397  {
398  Oid typinput;
399  Oid typioparam;
400 
401  errarg.attnum = remoteattnum;
402 
     /*
      * The text to parse must be fetched with the remote attribute
      * number, matching the NULL check above; indexing values[] with
      * the local index reads the wrong column whenever local and
      * remote column order differ.
      */
403  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
404  slot->tts_values[i] = OidInputFunctionCall(typinput, values[remoteattnum],
405  typioparam,
406  att->atttypmod);
407  slot->tts_isnull[i] = false;
408  }
409  else
410  {
411  slot->tts_values[i] = (Datum) 0;
412  slot->tts_isnull[i] = true;
413  }
414  }
415 
416  /* Pop the error context stack */
417  error_context_stack = errcallback.previous;
418 
419  ExecStoreVirtualTuple(slot);
420 }
421 
422 /*
423  * Handle BEGIN message.
424  */
425 static void
427 {
428  LogicalRepBeginData begin_data;
429 
430  logicalrep_read_begin(s, &begin_data);
431 
432  remote_final_lsn = begin_data.final_lsn;
433 
434  in_remote_transaction = true;
435 
437 }
438 
439 /*
440  * Handle COMMIT message.
441  *
442  * TODO, support tracking of multiple origins
443  */
444 static void
446 {
447  LogicalRepCommitData commit_data;
448 
449  logicalrep_read_commit(s, &commit_data);
450 
451  Assert(commit_data.commit_lsn == remote_final_lsn);
452 
453  /* The synchronization worker runs in single transaction. */
455  {
456  /*
457  * Update origin state so we can restart streaming from correct
458  * position in case of crash.
459  */
462 
464  pgstat_report_stat(false);
465 
466  store_flush_position(commit_data.end_lsn);
467  }
468  else
469  {
470  /* Process any invalidation messages that might have accumulated. */
473  }
474 
475  in_remote_transaction = false;
476 
477  /* Process any tables that are being synchronized in parallel. */
478  process_syncing_tables(commit_data.end_lsn);
479 
481 }
482 
483 /*
484  * Handle ORIGIN message.
485  *
486  * TODO, support tracking of multiple origins
487  */
488 static void
490 {
491  /*
492  * ORIGIN message can only come inside remote transaction and before any
493  * actual writes.
494  */
495  if (!in_remote_transaction ||
497  ereport(ERROR,
498  (errcode(ERRCODE_PROTOCOL_VIOLATION),
499  errmsg("ORIGIN message sent out of order")));
500 }
501 
502 /*
503  * Handle RELATION message.
504  *
505  * Note we don't do validation against local schema here. The validation
506  * against local schema is postponed until first change for given relation
507  * comes as we only care about it when applying changes for it anyway and we
508  * do less locking this way.
509  */
510 static void
512 {
513  LogicalRepRelation *rel;
514 
515  rel = logicalrep_read_rel(s);
517 }
518 
519 /*
520  * Handle TYPE message.
521  *
522  * Note we don't do local mapping here, that's done when the type is
523  * actually used.
524  */
525 static void
527 {
528  LogicalRepTyp typ;
529 
530  logicalrep_read_typ(s, &typ);
532 }
533 
534 /*
535  * Get the replica identity index or, if it is not defined, the primary key index.
536  *
537  * If neither is defined, returns InvalidOid
538  */
539 static Oid
541 {
542  Oid idxoid;
543 
544  idxoid = RelationGetReplicaIndex(rel);
545 
546  if (!OidIsValid(idxoid))
547  idxoid = RelationGetPrimaryKeyIndex(rel);
548 
549  return idxoid;
550 }
551 
552 /*
553  * Handle INSERT message.
554  */
555 static void
557 {
559  LogicalRepTupleData newtup;
560  LogicalRepRelId relid;
561  EState *estate;
562  TupleTableSlot *remoteslot;
563  MemoryContext oldctx;
564 
566 
567  relid = logicalrep_read_insert(s, &newtup);
570  {
571  /*
572  * The relation can't become interesting in the middle of the
573  * transaction so it's safe to unlock it.
574  */
576  return;
577  }
578 
579  /* Initialize the executor state. */
580  estate = create_estate_for_relation(rel);
581  remoteslot = ExecInitExtraTupleSlot(estate);
583 
584  /* Process and store remote tuple in the slot */
586  slot_store_cstrings(remoteslot, rel, newtup.values);
587  slot_fill_defaults(rel, estate, remoteslot);
588  MemoryContextSwitchTo(oldctx);
589 
592 
593  /* Do the insert. */
594  ExecSimpleRelationInsert(estate, remoteslot);
595 
596  /* Cleanup. */
599 
600  /* Handle queued AFTER triggers. */
601  AfterTriggerEndQuery(estate);
602 
603  ExecResetTupleTable(estate->es_tupleTable, false);
604  FreeExecutorState(estate);
605 
607 
609 }
610 
611 /*
612  * Check if the logical replication relation is updatable and throw
613  * appropriate error if it isn't.
614  */
615 static void
617 {
618  /* Updatable, no error. */
619  if (rel->updatable)
620  return;
621 
622  /*
623  * We are in error mode so it's fine this is somewhat slow. It's better to
624  * give user correct error.
625  */
627  {
628  ereport(ERROR,
629  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
630  errmsg("publisher does not send replica identity column "
631  "expected by the logical replication target relation \"%s.%s\"",
632  rel->remoterel.nspname, rel->remoterel.relname)));
633  }
634 
635  ereport(ERROR,
636  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
637  errmsg("logical replication target relation \"%s.%s\" has "
638  "neither REPLICA IDENTITY index nor PRIMARY "
639  "KEY and published relation does not have "
640  "REPLICA IDENTITY FULL",
641  rel->remoterel.nspname, rel->remoterel.relname)));
642 }
643 
644 /*
645  * Handle UPDATE message.
646  *
647  * TODO: FDW support
648  */
649 static void
651 {
653  LogicalRepRelId relid;
654  Oid idxoid;
655  EState *estate;
656  EPQState epqstate;
657  LogicalRepTupleData oldtup;
658  LogicalRepTupleData newtup;
659  bool has_oldtup;
660  TupleTableSlot *localslot;
661  TupleTableSlot *remoteslot;
662  bool found;
663  MemoryContext oldctx;
664 
666 
667  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
668  &newtup);
671  {
672  /*
673  * The relation can't become interesting in the middle of the
674  * transaction so it's safe to unlock it.
675  */
677  return;
678  }
679 
680  /* Check if we can do the update. */
682 
683  /* Initialize the executor state. */
684  estate = create_estate_for_relation(rel);
685  remoteslot = ExecInitExtraTupleSlot(estate);
687  localslot = ExecInitExtraTupleSlot(estate);
689  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
690 
693 
694  /* Build the search tuple. */
696  slot_store_cstrings(remoteslot, rel,
697  has_oldtup ? oldtup.values : newtup.values);
698  MemoryContextSwitchTo(oldctx);
699 
700  /*
701  * Try to find tuple using either replica identity index, primary key or
702  * if needed, sequential scan.
703  */
704  idxoid = GetRelationIdentityOrPK(rel->localrel);
705  Assert(OidIsValid(idxoid) ||
706  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
707 
708  if (OidIsValid(idxoid))
709  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
711  remoteslot, localslot);
712  else
714  remoteslot, localslot);
715 
716  ExecClearTuple(remoteslot);
717 
718  /*
719  * Tuple found.
720  *
721  * Note this will fail if there are other conflicting unique indexes.
722  */
723  if (found)
724  {
725  /* Process and store remote tuple in the slot */
727  ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
728  slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
729  MemoryContextSwitchTo(oldctx);
730 
731  EvalPlanQualSetSlot(&epqstate, remoteslot);
732 
733  /* Do the actual update. */
734  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
735  }
736  else
737  {
738  /*
739  * The tuple to be updated could not be found.
740  *
741  * TODO what to do here, change the log level to LOG perhaps?
742  */
743  elog(DEBUG1,
744  "logical replication did not find row for update "
745  "in replication target relation \"%s\"",
747  }
748 
749  /* Cleanup. */
752 
753  /* Handle queued AFTER triggers. */
754  AfterTriggerEndQuery(estate);
755 
756  EvalPlanQualEnd(&epqstate);
757  ExecResetTupleTable(estate->es_tupleTable, false);
758  FreeExecutorState(estate);
759 
761 
763 }
764 
765 /*
766  * Handle DELETE message.
767  *
768  * TODO: FDW support
769  */
770 static void
772 {
774  LogicalRepTupleData oldtup;
775  LogicalRepRelId relid;
776  Oid idxoid;
777  EState *estate;
778  EPQState epqstate;
779  TupleTableSlot *remoteslot;
780  TupleTableSlot *localslot;
781  bool found;
782  MemoryContext oldctx;
783 
785 
786  relid = logicalrep_read_delete(s, &oldtup);
789  {
790  /*
791  * The relation can't become interesting in the middle of the
792  * transaction so it's safe to unlock it.
793  */
795  return;
796  }
797 
798  /* Check if we can do the delete. */
800 
801  /* Initialize the executor state. */
802  estate = create_estate_for_relation(rel);
803  remoteslot = ExecInitExtraTupleSlot(estate);
805  localslot = ExecInitExtraTupleSlot(estate);
807  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
808 
811 
812  /* Find the tuple using the replica identity index. */
814  slot_store_cstrings(remoteslot, rel, oldtup.values);
815  MemoryContextSwitchTo(oldctx);
816 
817  /*
818  * Try to find tuple using either replica identity index, primary key or
819  * if needed, sequential scan.
820  */
821  idxoid = GetRelationIdentityOrPK(rel->localrel);
822  Assert(OidIsValid(idxoid) ||
824 
825  if (OidIsValid(idxoid))
826  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
828  remoteslot, localslot);
829  else
831  remoteslot, localslot);
832  /* If found delete it. */
833  if (found)
834  {
835  EvalPlanQualSetSlot(&epqstate, localslot);
836 
837  /* Do the actual delete. */
838  ExecSimpleRelationDelete(estate, &epqstate, localslot);
839  }
840  else
841  {
842  /* The tuple to be deleted could not be found. */
843  ereport(DEBUG1,
844  (errmsg("logical replication could not find row for delete "
845  "in replication target %s",
847  }
848 
849  /* Cleanup. */
852 
853  /* Handle queued AFTER triggers. */
854  AfterTriggerEndQuery(estate);
855 
856  EvalPlanQualEnd(&epqstate);
857  ExecResetTupleTable(estate->es_tupleTable, false);
858  FreeExecutorState(estate);
859 
861 
863 }
864 
865 
866 /*
867  * Logical replication protocol message dispatcher.
868  */
869 static void
871 {
872  char action = pq_getmsgbyte(s);
873 
874  switch (action)
875  {
876  /* BEGIN */
877  case 'B':
879  break;
880  /* COMMIT */
881  case 'C':
883  break;
884  /* INSERT */
885  case 'I':
887  break;
888  /* UPDATE */
889  case 'U':
891  break;
892  /* DELETE */
893  case 'D':
895  break;
896  /* RELATION */
897  case 'R':
899  break;
900  /* TYPE */
901  case 'Y':
903  break;
904  /* ORIGIN */
905  case 'O':
907  break;
908  default:
909  ereport(ERROR,
910  (errcode(ERRCODE_PROTOCOL_VIOLATION),
911  errmsg("invalid logical replication message type %c", action)));
912  }
913 }
914 
915 /*
916  * Figure out which write/flush positions to report to the walsender process.
917  *
918  * We can't simply report back the last LSN the walsender sent us because the
919  * local transaction might not yet be flushed to disk locally. Instead we
920  * build a list that associates local with remote LSNs for every commit. When
921  * reporting back the flush position to the sender we iterate that list and
922  * check which entries on it are already locally flushed. Those we can report
923  * as having been flushed.
924  *
925  * *have_pending_txes is set to true if there are outstanding transactions
926  * that need to be flushed.
     *
     * On return *write holds the remote end LSN of the last list entry and
     * *flush the remote end LSN of the last entry whose local end LSN is
     * already flushed; both stay InvalidXLogRecPtr when there is no such
     * entry. Entries found to be flushed are unlinked from lsn_mapping and
     * freed.
927  */
928 static void
930  bool *have_pending_txes)
931 {
932  dlist_mutable_iter iter;
933  XLogRecPtr local_flush = GetFlushRecPtr();
934 
935  *write = InvalidXLogRecPtr;
936  *flush = InvalidXLogRecPtr;
937 
     /* Mutable iteration because flushed entries are deleted while walking. */
938  dlist_foreach_modify(iter, &lsn_mapping)
939  {
940  FlushPosition *pos =
941  dlist_container(FlushPosition, node, iter.cur);
942 
943  *write = pos->remote_end;
944 
945  if (pos->local_end <= local_flush)
946  {
947  *flush = pos->remote_end;
948  dlist_delete(iter.cur);
949  pfree(pos);
950  }
951  else
952  {
953  /*
954  * Don't want to uselessly iterate over the rest of the list which
955  * could potentially be long. Instead get the last element and
956  * grab the write position from there.
957  */
958  pos = dlist_tail_element(FlushPosition, node,
959  &lsn_mapping);
960  *write = pos->remote_end;
961  *have_pending_txes = true;
962  return;
963  }
964  }
965 
     /* Reached only when every visited entry was flushed and removed. */
966  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
967 }
968 
969 /*
970  * Store current remote/local lsn pair in the tracking list.
     *
     * Appends a FlushPosition pairing XactLastCommitEnd (local) with
     * 'remote_lsn' (remote) to the tail of lsn_mapping. The entry is allocated
     * in ApplyContext. Note that the function leaves ApplyMessageContext as
     * the current memory context instead of restoring the caller's context.
971  */
972 static void
974 {
975  FlushPosition *flushpos;
976 
977  /* Need to do this in permanent context */
978  MemoryContextSwitchTo(ApplyContext);
979 
980  /* Track commit lsn */
981  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
982  flushpos->local_end = XactLastCommitEnd;
983  flushpos->remote_end = remote_lsn;
984 
985  dlist_push_tail(&lsn_mapping, &flushpos->node);
986  MemoryContextSwitchTo(ApplyMessageContext);
987 }
988 
989 
990 /* Update statistics of the worker. */
991 static void
993 {
995  MyLogicalRepWorker->last_send_time = send_time;
997  if (reply)
998  {
1000  MyLogicalRepWorker->reply_time = send_time;
1001  }
1002 }
1003 
1004 /*
1005  * Apply main loop.
1006  */
1007 static void
1009 {
1010  /*
1011  * Init the ApplyMessageContext which we clean up after each replication
1012  * protocol message.
1013  */
1014  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
1015  "ApplyMessageContext",
1017 
1018  /* mark as idle, before starting to loop */
1020 
1021  for (;;)
1022  {
1024  int rc;
1025  int len;
1026  char *buf = NULL;
1027  bool endofstream = false;
1028  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1029  bool ping_sent = false;
1030 
1032 
1033  MemoryContextSwitchTo(ApplyMessageContext);
1034 
1035  len = walrcv_receive(wrconn, &buf, &fd);
1036 
1037  if (len != 0)
1038  {
1039  /* Process the data */
1040  for (;;)
1041  {
1043 
1044  if (len == 0)
1045  {
1046  break;
1047  }
1048  else if (len < 0)
1049  {
1050  ereport(LOG,
1051  (errmsg("data stream from publisher has ended")));
1052  endofstream = true;
1053  break;
1054  }
1055  else
1056  {
1057  int c;
1058  StringInfoData s;
1059 
1060  /* Reset timeout. */
1061  last_recv_timestamp = GetCurrentTimestamp();
1062  ping_sent = false;
1063 
1064  /* Ensure we are reading the data into our memory context. */
1065  MemoryContextSwitchTo(ApplyMessageContext);
1066 
1067  s.data = buf;
1068  s.len = len;
1069  s.cursor = 0;
1070  s.maxlen = -1;
1071 
1072  c = pq_getmsgbyte(&s);
1073 
1074  if (c == 'w')
1075  {
1076  XLogRecPtr start_lsn;
1077  XLogRecPtr end_lsn;
1078  TimestampTz send_time;
1079 
1080  start_lsn = pq_getmsgint64(&s);
1081  end_lsn = pq_getmsgint64(&s);
1082  send_time = pq_getmsgint64(&s);
1083 
1084  if (last_received < start_lsn)
1085  last_received = start_lsn;
1086 
1087  if (last_received < end_lsn)
1088  last_received = end_lsn;
1089 
1090  UpdateWorkerStats(last_received, send_time, false);
1091 
1092  apply_dispatch(&s);
1093  }
1094  else if (c == 'k')
1095  {
1096  XLogRecPtr end_lsn;
1098  bool reply_requested;
1099 
1100  end_lsn = pq_getmsgint64(&s);
1101  timestamp = pq_getmsgint64(&s);
1102  reply_requested = pq_getmsgbyte(&s);
1103 
1104  if (last_received < end_lsn)
1105  last_received = end_lsn;
1106 
1107  send_feedback(last_received, reply_requested, false);
1108  UpdateWorkerStats(last_received, timestamp, true);
1109  }
1110  /* other message types are purposefully ignored */
1111 
1112  MemoryContextReset(ApplyMessageContext);
1113  }
1114 
1115  len = walrcv_receive(wrconn, &buf, &fd);
1116  }
1117 
1118  /* confirm all writes at once */
1119  send_feedback(last_received, false, false);
1120  }
1121 
1122  if (!in_remote_transaction)
1123  {
1124  /*
1125  * If we didn't get any transactions for a while there might be
1126  * unconsumed invalidation messages in the queue, consume them
1127  * now.
1128  */
1131 
1132  /* Process any table synchronization changes. */
1133  process_syncing_tables(last_received);
1134  }
1135 
1136  /* Cleanup the memory. */
1137  MemoryContextResetAndDeleteChildren(ApplyMessageContext);
1139 
1140  /* Check if we need to exit the streaming loop. */
1141  if (endofstream)
1142  {
1143  TimeLineID tli;
1144 
1145  walrcv_endstreaming(wrconn, &tli);
1146  break;
1147  }
1148 
1149  /*
1150  * Wait for more data or latch.
1151  */
1155  fd, NAPTIME_PER_CYCLE,
1157 
1158  /* Emergency bailout if postmaster has died */
1159  if (rc & WL_POSTMASTER_DEATH)
1160  proc_exit(1);
1161 
1162  if (rc & WL_LATCH_SET)
1163  {
1166  }
1167 
1168  if (got_SIGHUP)
1169  {
1170  got_SIGHUP = false;
1172  }
1173 
1174  if (rc & WL_TIMEOUT)
1175  {
1176  /*
1177  * We didn't receive anything new. If we haven't heard anything
1178  * from the server for more than wal_receiver_timeout / 2, ping
1179  * the server. Also, if it's been longer than
1180  * wal_receiver_status_interval since the last update we sent,
1181  * send a status update to the master anyway, to report any
1182  * progress in applying WAL.
1183  */
1184  bool requestReply = false;
1185 
1186  /*
1187  * Check if time since last receive from the publisher has reached the
1188  * configured limit.
1189  */
1190  if (wal_receiver_timeout > 0)
1191  {
1193  TimestampTz timeout;
1194 
1195  timeout =
1196  TimestampTzPlusMilliseconds(last_recv_timestamp,
1198 
1199  if (now >= timeout)
1200  ereport(ERROR,
1201  (errmsg("terminating logical replication worker due to timeout")));
1202 
1203  /*
1204  * We didn't receive anything new, for half of receiver
1205  * replication timeout. Ping the server.
1206  */
1207  if (!ping_sent)
1208  {
1209  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1210  (wal_receiver_timeout / 2));
1211  if (now >= timeout)
1212  {
1213  requestReply = true;
1214  ping_sent = true;
1215  }
1216  }
1217  }
1218 
1219  send_feedback(last_received, requestReply, requestReply);
1220  }
1221  }
1222 }
1223 
1224 /*
1225  * Send a Standby Status Update message to server.
1226  *
1227  * 'recvpos' is the latest LSN we've received data to, force is set if we need
1228  * to send a response to avoid timeouts.
      *
      * 'requestReply' is copied into the last byte of the message.
      *
      * The function keeps static state across calls: the reusable reply buffer,
      * the time of the last send, and the last reported receive/write/flush
      * positions. Reported positions never move backwards. Nothing is sent
      * when 'force' is false and either wal_receiver_status_interval is not
      * positive, or the write and flush positions are unchanged and the
      * TimestampDifferenceExceeds() check against the last send time does not
      * fire.
1229  */
1230 static void
1231 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1232 {
1233  static StringInfo reply_message = NULL;
1234  static TimestampTz send_time = 0;
1235 
1236  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1237  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1238  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1239 
1240  XLogRecPtr writepos;
1241  XLogRecPtr flushpos;
1242  TimestampTz now;
1243  bool have_pending_txes;
1244 
1245  /*
1246  * If the user doesn't want status to be reported to the publisher, be
1247  * sure to exit before doing anything at all.
1248  */
1249  if (!force && wal_receiver_status_interval <= 0)
1250  return;
1251 
1252  /* A recvpos older than the last one reported is replaced by that one */
1253  if (recvpos < last_recvpos)
1254  recvpos = last_recvpos;
1255 
1256  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1257 
1258  /*
1259  * No outstanding transactions to flush, we can report the latest received
1260  * position. This is important for synchronous replication.
1261  */
1262  if (!have_pending_txes)
1263  flushpos = writepos = recvpos;
1264 
      /* Never report a position behind one that was already reported. */
1265  if (writepos < last_writepos)
1266  writepos = last_writepos;
1267 
1268  if (flushpos < last_flushpos)
1269  flushpos = last_flushpos;
1270 
1271  now = GetCurrentTimestamp();
1272 
1273  /* if we've already reported everything we're good */
1274  if (!force &&
1275  writepos == last_writepos &&
1276  flushpos == last_flushpos &&
1277  !TimestampDifferenceExceeds(send_time, now,
1279  return;
1280  send_time = now;
1281 
      /* Create the reply buffer once in ApplyContext; afterwards just reset it. */
1282  if (!reply_message)
1283  {
1284  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1285 
1286  reply_message = makeStringInfo();
1287  MemoryContextSwitchTo(oldctx);
1288  }
1289  else
1290  resetStringInfo(reply_message);
1291 
      /*
       * Message layout: 'r', three 64-bit positions, a 64-bit send time and a
       * reply-requested byte. Note which local variable fills which field:
       * recvpos goes into the "write" field and writepos into "apply".
       */
1292  pq_sendbyte(reply_message, 'r');
1293  pq_sendint64(reply_message, recvpos); /* write */
1294  pq_sendint64(reply_message, flushpos); /* flush */
1295  pq_sendint64(reply_message, writepos); /* apply */
1296  pq_sendint64(reply_message, now); /* sendTime */
1297  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1298 
1299  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1300  force,
1301  (uint32) (recvpos >> 32), (uint32) recvpos,
1302  (uint32) (writepos >> 32), (uint32) writepos,
1303  (uint32) (flushpos >> 32), (uint32) flushpos
1304  );
1305 
1306  walrcv_send(wrconn, reply_message->data, reply_message->len);
1307 
      /* Remember what was reported so later calls can detect progress. */
1308  if (recvpos > last_recvpos)
1309  last_recvpos = recvpos;
1310  if (writepos > last_writepos)
1311  last_writepos = writepos;
1312  if (flushpos > last_flushpos)
1313  last_flushpos = flushpos;
1314 }
1315 
1316 /*
1317  * Reread subscription info if needed. Most changes will cause the worker to exit.
1318  */
1319 static void
1321 {
1322  MemoryContext oldctx;
1324  bool started_tx = false;
1325 
1326  /* When cache state is valid there is nothing to do here. */
1327  if (MySubscriptionValid)
1328  return;
1329 
1330  /* This function might be called inside or outside of transaction. */
1331  if (!IsTransactionState())
1332  {
1334  started_tx = true;
1335  }
1336 
1337  /* Ensure allocations in permanent context. */
1338  oldctx = MemoryContextSwitchTo(ApplyContext);
1339 
1340  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1341 
1342  /*
1343  * Exit if the subscription was removed. This normally should not happen
1344  * as the worker gets killed during DROP SUBSCRIPTION.
1345  */
1346  if (!newsub)
1347  {
1348  ereport(LOG,
1349  (errmsg("logical replication apply worker for subscription \"%s\" will "
1350  "stop because the subscription was removed",
1351  MySubscription->name)));
1352 
1353  proc_exit(0);
1354  }
1355 
1356  /*
1357  * Exit if the subscription was disabled. This normally should not happen
1358  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1359  */
1360  if (!newsub->enabled)
1361  {
1362  ereport(LOG,
1363  (errmsg("logical replication apply worker for subscription \"%s\" will "
1364  "stop because the subscription was disabled",
1365  MySubscription->name)));
1366 
1367  proc_exit(0);
1368  }
1369 
1370  /*
1371  * Exit if connection string was changed. The launcher will start new
1372  * worker.
1373  */
1374  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1375  {
1376  ereport(LOG,
1377  (errmsg("logical replication apply worker for subscription \"%s\" will "
1378  "restart because the connection information was changed",
1379  MySubscription->name)));
1380 
1381  proc_exit(0);
1382  }
1383 
1384  /*
1385  * Exit if subscription name was changed (it's used for
1386  * fallback_application_name). The launcher will start new worker.
1387  */
1388  if (strcmp(newsub->name, MySubscription->name) != 0)
1389  {
1390  ereport(LOG,
1391  (errmsg("logical replication apply worker for subscription \"%s\" will "
1392  "restart because subscription was renamed",
1393  MySubscription->name)));
1394 
1395  proc_exit(0);
1396  }
1397 
1398  /* !slotname should never happen when enabled is true. */
1399  Assert(newsub->slotname);
1400 
1401  /*
1402  * We need to make new connection to new slot if slot name has changed so
1403  * exit here as well if that's the case.
1404  */
1405  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1406  {
1407  ereport(LOG,
1408  (errmsg("logical replication apply worker for subscription \"%s\" will "
1409  "restart because the replication slot name was changed",
1410  MySubscription->name)));
1411 
1412  proc_exit(0);
1413  }
1414 
1415  /*
1416  * Exit if publication list was changed. The launcher will start new
1417  * worker.
1418  */
1419  if (!equal(newsub->publications, MySubscription->publications))
1420  {
1421  ereport(LOG,
1422  (errmsg("logical replication apply worker for subscription \"%s\" will "
1423  "restart because subscription's publications were changed",
1424  MySubscription->name)));
1425 
1426  proc_exit(0);
1427  }
1428 
1429  /* Check for other changes that should never happen too. */
1430  if (newsub->dbid != MySubscription->dbid)
1431  {
1432  elog(ERROR, "subscription %u changed unexpectedly",
1434  }
1435 
1436  /* Clean old subscription info and switch to new one. */
1437  FreeSubscription(MySubscription);
1438  MySubscription = newsub;
1439 
1440  MemoryContextSwitchTo(oldctx);
1441 
1442  /* Change synchronous commit according to the user's wishes */
1443  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1445 
1446  if (started_tx)
1448 
1449  MySubscriptionValid = true;
1450 }
1451 
1452 /*
1453  * Callback from subscription syscache invalidation.
 *
 * Only clears MySubscriptionValid; none of the arguments are used. The
 * cached subscription itself is not touched here -- the reread happens in
 * the function above that tests MySubscriptionValid on entry.
1454  */
1455 static void
1456 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1457 {
1458  MySubscriptionValid = false;
1459 }
1460 
1461 /*
 * SIGHUP: set flag to reload configuration at next convenient time
 *
 * The handler only sets got_SIGHUP and sets the process latch; errno is
 * saved on entry and restored on exit so the handler leaves it as it found
 * it.
 */
1462 static void
/*
 * NOTE(review): listing line 1463 (function name and parameter list) is
 * missing here; the cross-reference index at the end of this listing gives it
 * as logicalrep_worker_sighup(SIGNAL_ARGS).
 */
1464 {
1465  int save_errno = errno;
1466 
1467  got_SIGHUP = true;
1468 
1469  /* Waken anything waiting on the process latch */
1470  SetLatch(MyLatch);
1471 
1472  errno = save_errno;
1473 }
1474 
1475 /*
 * Logical Replication Apply worker entry point
 *
 * main_arg carries the worker slot number (read with DatumGetInt32).
 *
 * Visible steps, in order:
 *	- attach to the worker slot and install die() as the SIGTERM handler;
 *	- load the "libpqwalreceiver" library;
 *	- set session_replication_role to "replica";
 *	- create ApplyContext under TopMemoryContext and load the subscription
 *	  into it; exit with proc_exit(0) if the subscription is disabled;
 *	- for a table synchronization worker, run LogicalRepSyncTableStart() and
 *	  copy the returned slot name into ApplyContext; otherwise set up the
 *	  replication origin "pg_<subscription oid>", connect to the publisher
 *	  and call walrcv_identify_system();
 *	- fill in the streaming options, start streaming and run
 *	  LogicalRepApplyLoop(); finally proc_exit(0).
 *
 * NOTE(review): this listing skips numbered lines 1477, 1484, 1490, 1492,
 * 1495-1496, 1501-1502, 1507, 1510-1511, 1516-1517, 1525, 1538-1539, 1551,
 * 1591, 1599 and 1620-1621, so several statements below are missing or
 * truncated as shown.
 */
1476 void
/*
 * NOTE(review): listing line 1477 (function name and parameter list) is
 * missing here; the cross-reference index at the end of this listing gives it
 * as ApplyWorkerMain(Datum main_arg).
 */
1478 {
1479  int worker_slot = DatumGetInt32(main_arg);
1480  MemoryContext oldctx;
1481  char originname[NAMEDATALEN];
1482  XLogRecPtr origin_startpos;
1483  char *myslotname;
/*
 * NOTE(review): listing line 1484 is missing; `options`, used further down,
 * has no visible declaration in this extract.
 */
1485 
1486  /* Attach to slot */
1487  logicalrep_worker_attach(worker_slot);
1488 
1489  /* Setup signal handling */
1491  pqsignal(SIGTERM, die);
1493 
1494  /* Initialise stats to a sanish value */
1497 
1498  /* Load the libpq-specific functions */
1499  load_file("libpqwalreceiver", false);
1500 
1503  "logical replication apply");
1504 
1505  /* Run as replica session replication role. */
1506  SetConfigOption("session_replication_role", "replica",
1508 
1509  /* Connect to our database. */
1512 
1513  /* Load the subscription into persistent memory context. */
1514  ApplyContext = AllocSetContextCreate(TopMemoryContext,
1515  "ApplyContext",
1518  oldctx = MemoryContextSwitchTo(ApplyContext);
/* Second argument is false here; no NULL check follows. */
1519  MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
1520  MySubscriptionValid = true;
1521  MemoryContextSwitchTo(oldctx);
1522 
1523  /* Setup synchronous commit according to the user's wishes */
1524  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1526 
/* Nothing to do for a disabled subscription: log and exit cleanly. */
1527  if (!MySubscription->enabled)
1528  {
1529  ereport(LOG,
1530  (errmsg("logical replication apply worker for subscription \"%s\" will not "
1531  "start because the subscription was disabled during startup",
1532  MySubscription->name)));
1533 
1534  proc_exit(0);
1535  }
1536 
1537  /* Keep us informed about subscription changes. */
1540  (Datum) 0);
1541 
/* Log which kind of worker this is. */
1542  if (am_tablesync_worker())
1543  ereport(LOG,
1544  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1545  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
1546  else
1547  ereport(LOG,
1548  (errmsg("logical replication apply worker for subscription \"%s\" has started",
1549  MySubscription->name)));
1550 
1552 
1553  /* Connect to the origin and start the replication. */
1554  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1555  MySubscription->conninfo);
1556 
1557  if (am_tablesync_worker())
1558  {
1559  char *syncslotname;
1560 
1561  /* This is table synchronization worker, call initial sync. */
/*
 * NOTE(review): wrconn is not assigned in this branch; presumably
 * LogicalRepSyncTableStart() establishes it -- confirm.
 */
1562  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1563 
1564  /* The slot name needs to be allocated in permanent memory context. */
1565  oldctx = MemoryContextSwitchTo(ApplyContext);
1566  myslotname = pstrdup(syncslotname);
1567  MemoryContextSwitchTo(oldctx);
1568 
/* The original string is released once it has been copied. */
1569  pfree(syncslotname);
1570  }
1571  else
1572  {
1573  /* This is main apply worker */
1574  RepOriginId originid;
1575  TimeLineID startpointTLI;
1576  char *err;
1577  int server_version;
1578 
1579  myslotname = MySubscription->slotname;
1580 
1581  /*
1582  * This shouldn't happen if the subscription is enabled, but guard
1583  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1584  * crash if slot is NULL.)
1585  */
1586  if (!myslotname)
1587  ereport(ERROR,
1588  (errmsg("subscription has no replication slot set")));
1589 
1590  /* Setup replication origin tracking. */
/* The origin is named after the subscription OID; created if not found. */
1592  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1593  originid = replorigin_by_name(originname, true);
1594  if (!OidIsValid(originid))
1595  originid = replorigin_create(originname);
1596  replorigin_session_setup(originid);
1597  replorigin_session_origin = originid;
/* The origin's recorded progress becomes the streaming start position. */
1598  origin_startpos = replorigin_session_get_progress(false);
1600 
1601  wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
1602  &err);
1603  if (wrconn == NULL)
1604  ereport(ERROR,
1605  (errmsg("could not connect to the publisher: %s", err)));
1606 
1607  /*
1608  * We don't really use the output identify_system for anything but it
1609  * does some initializations on the upstream so let's still call it.
1610  */
1611  (void) walrcv_identify_system(wrconn, &startpointTLI,
1612  &server_version);
1613 
1614  }
1615 
1616  /*
1617  * Setup callback for syscache so that we know when something changes in
1618  * the subscription relation state.
1619  */
1622  (Datum) 0);
1623 
1624  /* Build logical replication streaming options. */
1625  options.logical = true;
1626  options.startpoint = origin_startpos;
1627  options.slotname = myslotname;
1628  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1629  options.proto.logical.publication_names = MySubscription->publications;
1630 
1631  /* Start normal logical streaming replication. */
1632  walrcv_startstreaming(wrconn, &options);
1633 
1634  /* Run the main loop. */
1635  LogicalRepApplyLoop(origin_startpos);
1636 
1637  proc_exit(0);
1638 }
1639 
1640 /*
1641  * Is current process a logical replication worker?
 *
 * True exactly when MyLogicalRepWorker is non-NULL.
1642  */
1643 bool
/*
 * NOTE(review): listing line 1644 (function name and parameter list) is
 * missing here; the cross-reference index at the end of this listing gives it
 * as IsLogicalWorker(void).
 */
1645 {
1646  return MyLogicalRepWorker != NULL;
1647 }
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
Subscription * MySubscription
Definition: worker.c:111
static void apply_handle_type(StringInfo s)
Definition: worker.c:526
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:140
#define NIL
Definition: pg_list.h:69
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:185
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:540
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1299
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1231
WalReceiverConn * wrconn
Definition: worker.c:109
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:254
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:973
uint32 TimeLineID
Definition: xlogdefs.h:45
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:277
void AcceptInvalidationMessages(void)
Definition: inval.c:679
static XLogRecPtr remote_final_lsn
Definition: worker.c:115
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4676
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static void apply_handle_insert(StringInfo s)
Definition: worker.c:556
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:98
#define DatumGetInt32(X)
Definition: postgres.h:478
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2962
#define RelationGetDescr(relation)
Definition: rel.h:428
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:518
dlist_node node
Definition: worker.c:93
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
int64 timestamp
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:57
int64 TimestampTz
Definition: timestamp.h:39
LogicalRepRelation * rel
Definition: worker.c:102
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
char * pstrdup(const char *in)
Definition: mcxt.c:1077
void CommitTransactionCommand(void)
Definition: xact.c:2748
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:992
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:338
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
StringInfo makeStringInfo(void)
Definition: stringinfo.c:28
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:256
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:324
Expr * expression_planner(Expr *expr)
Definition: planner.c:5931
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
Form_pg_attribute * attrs
Definition: tupdesc.h:74
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
#define InvalidBuffer
Definition: buf.h:25
uint16 RepOriginId
Definition: xlogdefs.h:51
XLogRecPtr last_lsn
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:252
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1110
XLogRecPtr remote_end
Definition: worker.c:95
int errcode(int sqlerrcode)
Definition: elog.c:575
char * format_type_be(Oid type_oid)
Definition: format_type.c:94
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:368
Datum * tts_values
Definition: tuptable.h:125
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8221
void PopActiveSnapshot(void)
Definition: snapmgr.c:812
Oid logicalrep_typmap_getid(Oid remoteid)
Definition: relation.c:442
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:983
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
List * es_range_table
Definition: execnodes.h:431
#define LOG
Definition: elog.h:26
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
struct SlotErrCallbackArg SlotErrCallbackArg
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:789
bool IsLogicalWorker(void)
Definition: worker.c:1644
int wal_receiver_status_interval
Definition: walreceiver.c:74
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
struct ErrorContextCallback * previous
Definition: elog.h:238
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:304
#define OidIsValid(objectId)
Definition: c.h:538
XLogRecPtr last_lsn
Definition: walsender.c:213
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
int natts
Definition: tupdesc.h:73
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
int wal_receiver_timeout
Definition: walreceiver.c:75
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:929
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3153
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:160
ErrorContextCallback * error_context_stack
Definition: elog.c:88
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:224
#define list_make1(x1)
Definition: pg_list.h:139
#define NAMEDATALEN
void FreeExecutorState(EState *estate)
Definition: execUtils.c:178
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
Definition: executor.h:456
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:158
static StringInfoData reply_message
Definition: walreceiver.c:111
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1456
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:950
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
#define ERROR
Definition: elog.h:43
static bool ensure_transaction(void)
Definition: worker.c:156
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
Definition: worker.c:89
bool in_remote_transaction
Definition: worker.c:114
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:376
Definition: guc.h:75
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:304
XLogRecPtr reply_lsn
static bool am_tablesync_worker(void)
static void apply_handle_delete(StringInfo s)
Definition: worker.c:771
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define DEBUG2
Definition: elog.h:24
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:151
char * c
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:248
void logicalrep_worker_attach(int slot)
Definition: launcher.c:546
#define NoLock
Definition: lockdefs.h:34
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6672
static char * buf
Definition: pg_test_fsync.c:66
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:733
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:152
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
bool * tts_isnull
Definition: tuptable.h:126
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:266
ResultRelInfo * es_result_relations
Definition: execnodes.h:441
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:436
XLogRecPtr startpoint
Definition: walreceiver.h:145
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:268
int pgsocket
Definition: port.h:22
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void apply_handle_begin(StringInfo s)
Definition: worker.c:426
RepOriginId replorigin_create(char *roname)
Definition: origin.c:235
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
Definition: postmaster.c:5489
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:223
TupleTableSlot * es_trig_tuple_slot
Definition: execnodes.h:457
#define ereport(elevel, rest)
Definition: elog.h:122
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2599
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1237
MemoryContext TopMemoryContext
Definition: mcxt.c:43
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:366
EState * CreateExecutorState(void)
Definition: execUtils.c:80
Definition: guc.h:72
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:616
MemoryContext ApplyContext
Definition: worker.c:107
static char ** options
XLogRecPtr final_lsn
Definition: logicalproto.h:67
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:445
union WalRcvStreamOptions::@97 proto
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
static void logicalrep_worker_sighup(SIGNAL_ARGS)
Definition: worker.c:1463
Node * build_column_default(Relation rel, int attrno)
#define SUBREL_STATE_SYNCDONE
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
List * es_tupleTable
Definition: execnodes.h:470
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1389
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
static void apply_handle_update(StringInfo s)
Definition: worker.c:650
uintptr_t Datum
Definition: postgres.h:372
void CommandCounterIncrement(void)
Definition: xact.c:922
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
#define PGINVALID_SOCKET
Definition: port.h:24
int es_num_result_relations
Definition: execnodes.h:442
void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2742
#define SIGHUP
Definition: win32.h:188
static void apply_handle_relation(StringInfo s)
Definition: worker.c:511
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
XLogRecPtr local_end
Definition: worker.c:94
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4167
static void apply_dispatch(StringInfo s)
Definition: worker.c:870
static void maybe_reread_subscription(void)
Definition: worker.c:1320
#define makeNode(_type_)
Definition: nodes.h:557
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
TimestampTz last_recv_time
static MemoryContext ApplyMessageContext
Definition: worker.c:106
#define SIGNAL_ARGS
Definition: c.h:1079
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
RepOriginId replorigin_session_origin
Definition: origin.c:150
static void apply_handle_origin(StringInfo s)
Definition: worker.c:489
void StartTransactionCommand(void)
Definition: xact.c:2678
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static int server_version
Definition: pg_dumpall.c:82
static void slot_store_error_callback(void *arg)
Definition: worker.c:279
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:211
bool IsTransactionState(void)
Definition: xact.c:350
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:258
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:92
bool MySubscriptionValid
Definition: worker.c:112
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:461
void FreeSubscription(Subscription *sub)
RTEKind rtekind
Definition: parsenodes.h:936
static Datum values[MAXATTR]
Definition: bootstrap.c:163
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4187
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:740
void(* callback)(void *arg)
Definition: elog.h:239
static void slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:368
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
void die(SIGNAL_ARGS)
Definition: postgres.c:2617
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1008
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
int i
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define errcontext
Definition: elog.h:164
void * arg
struct Latch * MyLatch
Definition: globals.c:52
struct FlushPosition FlushPosition
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:113
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
HeapTuple tts_tuple
Definition: tuptable.h:120
#define elog
Definition: elog.h:219
XLogRecPtr commit_lsn
Definition: logicalproto.h:74
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1726
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1738
#define WL_LATCH_SET
Definition: latch.h:124
#define SUBREL_STATE_READY
#define RelationGetRelid(relation)
Definition: rel.h:416
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4655
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TimestampTz committime
Definition: logicalproto.h:76
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:411
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:1477
uint32 LogicalRepRelId
Definition: logicalproto.h:39
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:488
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:220
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:258
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5518
void pgstat_report_stat(bool force)
Definition: pgstat.c:812
static volatile sig_atomic_t got_SIGHUP
Definition: worker.c:124
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:443