PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from remote logical replication stream.
13  *
14  * The main worker (apply) is started by logical replication worker
15  * launcher for every enabled subscription in a database. It uses
16  * walsender protocol to communicate with publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "miscadmin.h"
27 #include "pgstat.h"
28 #include "funcapi.h"
29 
30 #include "access/xact.h"
31 #include "access/xlog_internal.h"
32 
33 #include "catalog/namespace.h"
36 
37 #include "commands/trigger.h"
38 
39 #include "executor/executor.h"
41 
42 #include "libpq/pqformat.h"
43 #include "libpq/pqsignal.h"
44 
45 #include "mb/pg_wchar.h"
46 
47 #include "nodes/makefuncs.h"
48 
49 #include "optimizer/planner.h"
50 
51 #include "parser/parse_relation.h"
52 
53 #include "postmaster/bgworker.h"
54 #include "postmaster/postmaster.h"
55 
56 #include "replication/decode.h"
57 #include "replication/logical.h"
62 #include "replication/origin.h"
63 #include "replication/snapbuild.h"
66 
67 #include "rewrite/rewriteHandler.h"
68 
69 #include "storage/bufmgr.h"
70 #include "storage/ipc.h"
71 #include "storage/lmgr.h"
72 #include "storage/proc.h"
73 #include "storage/procarray.h"
74 
75 #include "utils/builtins.h"
76 #include "utils/catcache.h"
77 #include "utils/datum.h"
78 #include "utils/fmgroids.h"
79 #include "utils/guc.h"
80 #include "utils/inval.h"
81 #include "utils/lsyscache.h"
82 #include "utils/memutils.h"
83 #include "utils/timeout.h"
84 #include "utils/tqual.h"
85 #include "utils/syscache.h"
86 
87 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
88 
89 typedef struct FlushPosition
90 {
95 
97 
98 typedef struct SlotErrCallbackArg
99 {
101  int attnum;
103 
106 
108 
110 bool MySubscriptionValid = false;
111 
114 
115 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
116 
117 static void store_flush_position(XLogRecPtr remote_lsn);
118 
119 static void reread_subscription(void);
120 
121 /*
122  * Should this worker apply changes for given relation.
123  *
124  * This is mainly needed for initial relation data sync as that runs in
125  * separate worker process running in parallel and we need some way to skip
126  * changes coming to the main apply worker during the sync of a table.
127  *
128  * Note we need to do smaller or equals comparison for SYNCDONE state because
129  * it might hold position of end of initial slot consistent point WAL
130  * record + 1 (ie start of next record) and next record can be COMMIT of
131  * transaction we are now processing (which is what we set remote_final_lsn
132  * to in apply_handle_begin).
133  */
134 static bool
136 {
137  if (am_tablesync_worker())
138  return MyLogicalRepWorker->relid == rel->localreloid;
139  else
140  return (rel->state == SUBREL_STATE_READY ||
141  (rel->state == SUBREL_STATE_SYNCDONE &&
142  rel->statelsn <= remote_final_lsn));
143 }
144 
145 /*
146  * Make sure that we started local transaction.
147  *
148  * Also switches to ApplyContext as necessary.
149  */
150 static bool
152 {
153  if (IsTransactionState())
154  {
155  if (CurrentMemoryContext != ApplyContext)
156  MemoryContextSwitchTo(ApplyContext);
157  return false;
158  }
159 
161 
162  if (!MySubscriptionValid)
164 
165  MemoryContextSwitchTo(ApplyContext);
166  return true;
167 }
168 
169 
170 /*
171  * Executor state preparation for evaluation of constraint expressions,
172  * indexes and triggers.
173  *
174  * This is based on similar code in copy.c
175  */
176 static EState *
178 {
179  EState *estate;
180  ResultRelInfo *resultRelInfo;
181  RangeTblEntry *rte;
182 
183  estate = CreateExecutorState();
184 
185  rte = makeNode(RangeTblEntry);
186  rte->rtekind = RTE_RELATION;
187  rte->relid = RelationGetRelid(rel->localrel);
188  rte->relkind = rel->localrel->rd_rel->relkind;
189  estate->es_range_table = list_make1(rte);
190 
191  resultRelInfo = makeNode(ResultRelInfo);
192  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
193 
194  estate->es_result_relations = resultRelInfo;
195  estate->es_num_result_relations = 1;
196  estate->es_result_relation_info = resultRelInfo;
197 
198  /* Triggers might need a slot */
199  if (resultRelInfo->ri_TrigDesc)
200  estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
201 
202  /* Prepare to catch AFTER triggers. */
204 
205  return estate;
206 }
207 
208 /*
209  * Executes default values for columns for which we can't map to remote
210  * relation columns.
211  *
212  * This allows us to support tables which have more columns on the downstream
213  * than on the upstream.
214  */
215 static void
217  TupleTableSlot *slot)
218 {
219  TupleDesc desc = RelationGetDescr(rel->localrel);
220  int num_phys_attrs = desc->natts;
221  int i;
222  int attnum,
223  num_defaults = 0;
224  int *defmap;
225  ExprState **defexprs;
226  ExprContext *econtext;
227 
228  econtext = GetPerTupleExprContext(estate);
229 
230  /* We got all the data via replication, no need to evaluate anything. */
231  if (num_phys_attrs == rel->remoterel.natts)
232  return;
233 
234  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
235  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
236 
237  for (attnum = 0; attnum < num_phys_attrs; attnum++)
238  {
239  Expr *defexpr;
240 
241  if (desc->attrs[attnum]->attisdropped)
242  continue;
243 
244  if (rel->attrmap[attnum] >= 0)
245  continue;
246 
247  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
248 
249  if (defexpr != NULL)
250  {
251  /* Run the expression through planner */
252  defexpr = expression_planner(defexpr);
253 
254  /* Initialize executable expression in copycontext */
255  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
256  defmap[num_defaults] = attnum;
257  num_defaults++;
258  }
259 
260  }
261 
262  for (i = 0; i < num_defaults; i++)
263  slot->tts_values[defmap[i]] =
264  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
265 }
266 
267 /*
268  * Error callback to give more context info about type conversion failure.
269  */
270 static void
272 {
273  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
274  Oid remotetypoid,
275  localtypoid;
276 
277  if (errarg->attnum < 0)
278  return;
279 
280  remotetypoid = errarg->rel->atttyps[errarg->attnum];
281  localtypoid = logicalrep_typmap_getid(remotetypoid);
282  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
283  "remote type %s, local type %s",
284  errarg->rel->nspname, errarg->rel->relname,
285  errarg->rel->attnames[errarg->attnum],
286  format_type_be(remotetypoid),
287  format_type_be(localtypoid));
288 }
289 
290 /*
291  * Store data in C string form into slot.
292  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
293  * use better.
294  */
295 static void
297  char **values)
298 {
299  int natts = slot->tts_tupleDescriptor->natts;
300  int i;
301  SlotErrCallbackArg errarg;
302  ErrorContextCallback errcallback;
303 
304  ExecClearTuple(slot);
305 
306  /* Push callback + info on the error context stack */
307  errarg.rel = &rel->remoterel;
308  errarg.attnum = -1;
309  errcallback.callback = slot_store_error_callback;
310  errcallback.arg = (void *) &errarg;
311  errcallback.previous = error_context_stack;
312  error_context_stack = &errcallback;
313 
314  /* Call the "in" function for each non-dropped attribute */
315  for (i = 0; i < natts; i++)
316  {
318  int remoteattnum = rel->attrmap[i];
319 
320  if (!att->attisdropped && remoteattnum >= 0 &&
321  values[remoteattnum] != NULL)
322  {
323  Oid typinput;
324  Oid typioparam;
325 
326  errarg.attnum = remoteattnum;
327 
328  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
329  slot->tts_values[i] = OidInputFunctionCall(typinput,
330  values[remoteattnum],
331  typioparam,
332  att->atttypmod);
333  slot->tts_isnull[i] = false;
334  }
335  else
336  {
337  /*
338  * We assign NULL to dropped attributes, NULL values, and missing
339  * values (missing values should be later filled using
340  * slot_fill_defaults).
341  */
342  slot->tts_values[i] = (Datum) 0;
343  slot->tts_isnull[i] = true;
344  }
345  }
346 
347  /* Pop the error context stack */
348  error_context_stack = errcallback.previous;
349 
350  ExecStoreVirtualTuple(slot);
351 }
352 
353 /*
354  * Modify slot with user data provided as C strings.
355  * This is somewhat similar to heap_modify_tuple but also calls the type
356  * input function on the user data as the input is the text representation
357  * of the types.
358  */
359 static void
361  char **values, bool *replaces)
362 {
363  int natts = slot->tts_tupleDescriptor->natts;
364  int i;
365  SlotErrCallbackArg errarg;
366  ErrorContextCallback errcallback;
367 
368  slot_getallattrs(slot);
369  ExecClearTuple(slot);
370 
371  /* Push callback + info on the error context stack */
372  errarg.rel = &rel->remoterel;
373  errarg.attnum = -1;
374  errcallback.callback = slot_store_error_callback;
375  errcallback.arg = (void *) &errarg;
376  errcallback.previous = error_context_stack;
377  error_context_stack = &errcallback;
378 
379  /* Call the "in" function for each replaced attribute */
380  for (i = 0; i < natts; i++)
381  {
383  int remoteattnum = rel->attrmap[i];
384 
385  if (remoteattnum >= 0 && !replaces[remoteattnum])
386  continue;
387 
388  if (remoteattnum >= 0 && values[remoteattnum] != NULL)
389  {
390  Oid typinput;
391  Oid typioparam;
392 
393  errarg.attnum = remoteattnum;
394 
395  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
396  slot->tts_values[i] = OidInputFunctionCall(typinput, values[i],
397  typioparam,
398  att->atttypmod);
399  slot->tts_isnull[i] = false;
400  }
401  else
402  {
403  slot->tts_values[i] = (Datum) 0;
404  slot->tts_isnull[i] = true;
405  }
406  }
407 
408  /* Pop the error context stack */
409  error_context_stack = errcallback.previous;
410 
411  ExecStoreVirtualTuple(slot);
412 }
413 
414 /*
415  * Handle BEGIN message.
416  */
417 static void
419 {
420  LogicalRepBeginData begin_data;
421 
422  logicalrep_read_begin(s, &begin_data);
423 
424  remote_final_lsn = begin_data.final_lsn;
425 
426  in_remote_transaction = true;
427 
429 }
430 
431 /*
432  * Handle COMMIT message.
433  *
434  * TODO, support tracking of multiple origins
435  */
436 static void
438 {
439  LogicalRepCommitData commit_data;
440 
441  logicalrep_read_commit(s, &commit_data);
442 
443  Assert(commit_data.commit_lsn == remote_final_lsn);
444 
445  /* The synchronization worker runs in single transaction. */
447  {
448  /*
449  * Update origin state so we can restart streaming from correct
450  * position in case of crash.
451  */
454 
456 
457  store_flush_position(commit_data.end_lsn);
458  }
459 
460  in_remote_transaction = false;
461 
462  /* Process any tables that are being synchronized in parallel. */
463  process_syncing_tables(commit_data.end_lsn);
464 
465  pgstat_report_stat(false);
467 }
468 
469 /*
470  * Handle ORIGIN message.
471  *
472  * TODO, support tracking of multiple origins
473  */
474 static void
476 {
477  /*
478  * ORIGIN message can only come inside remote transaction and before
479  * any actual writes.
480  */
481  if (!in_remote_transaction ||
483  ereport(ERROR,
484  (errcode(ERRCODE_PROTOCOL_VIOLATION),
485  errmsg("ORIGIN message sent out of order")));
486 }
487 
488 /*
489  * Handle RELATION message.
490  *
491  * Note we don't do validation against local schema here. The validation
492  * against local schema is postponed until first change for given relation
493  * comes as we only care about it when applying changes for it anyway and we
494  * do less locking this way.
495  */
496 static void
498 {
499  LogicalRepRelation *rel;
500 
501  rel = logicalrep_read_rel(s);
503 }
504 
505 /*
506  * Handle TYPE message.
507  *
508  * Note we don't do local mapping here, that's done when the type is
509  * actually used.
510  */
511 static void
513 {
514  LogicalRepTyp typ;
515 
516  logicalrep_read_typ(s, &typ);
518 }
519 
520 /*
521  * Get replica identity index or if it is not defined a primary key.
522  *
523  * If neither is defined, returns InvalidOid
524  */
525 static Oid
527 {
528  Oid idxoid;
529 
530  idxoid = RelationGetReplicaIndex(rel);
531 
532  if (!OidIsValid(idxoid))
533  idxoid = RelationGetPrimaryKeyIndex(rel);
534 
535  return idxoid;
536 }
537 
538 /*
539  * Handle INSERT message.
540  */
541 static void
543 {
545  LogicalRepTupleData newtup;
546  LogicalRepRelId relid;
547  EState *estate;
548  TupleTableSlot *remoteslot;
549  MemoryContext oldctx;
550 
552 
553  relid = logicalrep_read_insert(s, &newtup);
556  {
557  /*
558  * The relation can't become interesting in the middle of the
559  * transaction so it's safe to unlock it.
560  */
562  return;
563  }
564 
565  /* Initialize the executor state. */
566  estate = create_estate_for_relation(rel);
567  remoteslot = ExecInitExtraTupleSlot(estate);
569 
570  /* Process and store remote tuple in the slot */
572  slot_store_cstrings(remoteslot, rel, newtup.values);
573  slot_fill_defaults(rel, estate, remoteslot);
574  MemoryContextSwitchTo(oldctx);
575 
578 
579  /* Do the insert. */
580  ExecSimpleRelationInsert(estate, remoteslot);
581 
582  /* Cleanup. */
585 
586  /* Handle queued AFTER triggers. */
587  AfterTriggerEndQuery(estate);
588 
589  ExecResetTupleTable(estate->es_tupleTable, false);
590  FreeExecutorState(estate);
591 
593 
595 }
596 
597 /*
598  * Check if the logical replication relation is updatable and throw
599  * appropriate error if it isn't.
600  */
601 static void
603 {
604  /* Updatable, no error. */
605  if (rel->updatable)
606  return;
607 
608  /*
609  * We are in error mode so it's fine this is somewhat slow.
610  * It's better to give user correct error.
611  */
613  {
614  ereport(ERROR,
615  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
616  errmsg("publisher does not send replica identity column "
617  "expected by the logical replication target relation \"%s.%s\"",
618  rel->remoterel.nspname, rel->remoterel.relname)));
619  }
620 
621  ereport(ERROR,
622  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
623  errmsg("logical replication target relation \"%s.%s\" has "
624  "neither REPLICA IDENTITY index nor PRIMARY "
625  "KEY and published relation does not have "
626  "REPLICA IDENTITY FULL",
627  rel->remoterel.nspname, rel->remoterel.relname)));
628 }
629 
630 /*
631  * Handle UPDATE message.
632  *
633  * TODO: FDW support
634  */
635 static void
637 {
639  LogicalRepRelId relid;
640  Oid idxoid;
641  EState *estate;
642  EPQState epqstate;
643  LogicalRepTupleData oldtup;
644  LogicalRepTupleData newtup;
645  bool has_oldtup;
646  TupleTableSlot *localslot;
647  TupleTableSlot *remoteslot;
648  bool found;
649  MemoryContext oldctx;
650 
652 
653  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
654  &newtup);
657  {
658  /*
659  * The relation can't become interesting in the middle of the
660  * transaction so it's safe to unlock it.
661  */
663  return;
664  }
665 
666  /* Check if we can do the update. */
668 
669  /* Initialize the executor state. */
670  estate = create_estate_for_relation(rel);
671  remoteslot = ExecInitExtraTupleSlot(estate);
673  localslot = ExecInitExtraTupleSlot(estate);
675  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
676 
679 
680  /* Build the search tuple. */
682  slot_store_cstrings(remoteslot, rel,
683  has_oldtup ? oldtup.values : newtup.values);
684  MemoryContextSwitchTo(oldctx);
685 
686  /*
687  * Try to find tuple using either replica identity index, primary key
688  * or if needed, sequential scan.
689  */
690  idxoid = GetRelationIdentityOrPK(rel->localrel);
691  Assert(OidIsValid(idxoid) ||
692  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
693 
694  if (OidIsValid(idxoid))
695  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
697  remoteslot, localslot);
698  else
700  remoteslot, localslot);
701 
702  ExecClearTuple(remoteslot);
703 
704  /*
705  * Tuple found.
706  *
707  * Note this will fail if there are other conflicting unique indexes.
708  */
709  if (found)
710  {
711  /* Process and store remote tuple in the slot */
713  ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
714  slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
715  MemoryContextSwitchTo(oldctx);
716 
717  EvalPlanQualSetSlot(&epqstate, remoteslot);
718 
719  /* Do the actual update. */
720  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
721  }
722  else
723  {
724  /*
725  * The tuple to be updated could not be found.
726  *
727  * TODO what to do here, change the log level to LOG perhaps?
728  */
729  elog(DEBUG1,
730  "logical replication did not find row for update "
731  "in replication target relation \"%s\"",
733  }
734 
735  /* Cleanup. */
738 
739  /* Handle queued AFTER triggers. */
740  AfterTriggerEndQuery(estate);
741 
742  EvalPlanQualEnd(&epqstate);
743  ExecResetTupleTable(estate->es_tupleTable, false);
744  FreeExecutorState(estate);
745 
747 
749 }
750 
751 /*
752  * Handle DELETE message.
753  *
754  * TODO: FDW support
755  */
756 static void
758 {
760  LogicalRepTupleData oldtup;
761  LogicalRepRelId relid;
762  Oid idxoid;
763  EState *estate;
764  EPQState epqstate;
765  TupleTableSlot *remoteslot;
766  TupleTableSlot *localslot;
767  bool found;
768  MemoryContext oldctx;
769 
771 
772  relid = logicalrep_read_delete(s, &oldtup);
775  {
776  /*
777  * The relation can't become interesting in the middle of the
778  * transaction so it's safe to unlock it.
779  */
781  return;
782  }
783 
784  /* Check if we can do the delete. */
786 
787  /* Initialize the executor state. */
788  estate = create_estate_for_relation(rel);
789  remoteslot = ExecInitExtraTupleSlot(estate);
791  localslot = ExecInitExtraTupleSlot(estate);
793  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
794 
797 
798  /* Find the tuple using the replica identity index. */
800  slot_store_cstrings(remoteslot, rel, oldtup.values);
801  MemoryContextSwitchTo(oldctx);
802 
803  /*
804  * Try to find tuple using either replica identity index, primary key
805  * or if needed, sequential scan.
806  */
807  idxoid = GetRelationIdentityOrPK(rel->localrel);
808  Assert(OidIsValid(idxoid) ||
810 
811  if (OidIsValid(idxoid))
812  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
814  remoteslot, localslot);
815  else
817  remoteslot, localslot);
818  /* If found delete it. */
819  if (found)
820  {
821  EvalPlanQualSetSlot(&epqstate, localslot);
822 
823  /* Do the actual delete. */
824  ExecSimpleRelationDelete(estate, &epqstate, localslot);
825  }
826  else
827  {
828  /* The tuple to be deleted could not be found.*/
829  ereport(DEBUG1,
830  (errmsg("logical replication could not find row for delete "
831  "in replication target %s",
833  }
834 
835  /* Cleanup. */
838 
839  /* Handle queued AFTER triggers. */
840  AfterTriggerEndQuery(estate);
841 
842  EvalPlanQualEnd(&epqstate);
843  ExecResetTupleTable(estate->es_tupleTable, false);
844  FreeExecutorState(estate);
845 
847 
849 }
850 
851 
852 /*
853  * Logical replication protocol message dispatcher.
854  */
855 static void
857 {
858  char action = pq_getmsgbyte(s);
859 
860  switch (action)
861  {
862  /* BEGIN */
863  case 'B':
865  break;
866  /* COMMIT */
867  case 'C':
869  break;
870  /* INSERT */
871  case 'I':
873  break;
874  /* UPDATE */
875  case 'U':
877  break;
878  /* DELETE */
879  case 'D':
881  break;
882  /* RELATION */
883  case 'R':
885  break;
886  /* TYPE */
887  case 'Y':
889  break;
890  /* ORIGIN */
891  case 'O':
893  break;
894  default:
895  ereport(ERROR,
896  (errcode(ERRCODE_PROTOCOL_VIOLATION),
897  errmsg("invalid logical replication message type %c", action)));
898  }
899 }
900 
901 /*
902  * Figure out which write/flush positions to report to the walsender process.
903  *
904  * We can't simply report back the last LSN the walsender sent us because the
905  * local transaction might not yet be flushed to disk locally. Instead we
906  * build a list that associates local with remote LSNs for every commit. When
907  * reporting back the flush position to the sender we iterate that list and
908  * check which entries on it are already locally flushed. Those we can report
909  * as having been flushed.
910  *
911  * The have_pending_txes is true if there are outstanding transactions that
912  * need to be flushed.
913  */
914 static void
916  bool *have_pending_txes)
917 {
918  dlist_mutable_iter iter;
919  XLogRecPtr local_flush = GetFlushRecPtr();
920 
921  *write = InvalidXLogRecPtr;
922  *flush = InvalidXLogRecPtr;
923 
924  dlist_foreach_modify(iter, &lsn_mapping)
925  {
926  FlushPosition *pos =
927  dlist_container(FlushPosition, node, iter.cur);
928 
929  *write = pos->remote_end;
930 
931  if (pos->local_end <= local_flush)
932  {
933  *flush = pos->remote_end;
934  dlist_delete(iter.cur);
935  pfree(pos);
936  }
937  else
938  {
939  /*
940  * Don't want to uselessly iterate over the rest of the list which
941  * could potentially be long. Instead get the last element and
942  * grab the write position from there.
943  */
944  pos = dlist_tail_element(FlushPosition, node,
945  &lsn_mapping);
946  *write = pos->remote_end;
947  *have_pending_txes = true;
948  return;
949  }
950  }
951 
952  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
953 }
954 
955 /*
956  * Store current remote/local lsn pair in the tracking list.
957  */
958 static void
960 {
961  FlushPosition *flushpos;
962 
963  /* Need to do this in permanent context */
964  MemoryContextSwitchTo(ApplyCacheContext);
965 
966  /* Track commit lsn */
967  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
968  flushpos->local_end = XactLastCommitEnd;
969  flushpos->remote_end = remote_lsn;
970 
971  dlist_push_tail(&lsn_mapping, &flushpos->node);
972  MemoryContextSwitchTo(ApplyContext);
973 }
974 
975 
976 /* Update statistics of the worker. */
977 static void
979 {
981  MyLogicalRepWorker->last_send_time = send_time;
983  if (reply)
984  {
986  MyLogicalRepWorker->reply_time = send_time;
987  }
988 }
989 
990 /*
991  * Apply main loop.
992  */
993 static void
995 {
996  /* Init the ApplyContext which we use for easier cleanup. */
998  "ApplyContext",
1002 
1003  /* mark as idle, before starting to loop */
1005 
1006  while (!got_SIGTERM)
1007  {
1009  int rc;
1010  int len;
1011  char *buf = NULL;
1012  bool endofstream = false;
1013  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1014  bool ping_sent = false;
1015 
1016  MemoryContextSwitchTo(ApplyContext);
1017 
1018  len = walrcv_receive(wrconn, &buf, &fd);
1019 
1020  if (len != 0)
1021  {
1022  /* Process the data */
1023  for (;;)
1024  {
1026 
1027  if (len == 0)
1028  {
1029  break;
1030  }
1031  else if (len < 0)
1032  {
1033  ereport(LOG,
1034  (errmsg("data stream from publisher has ended")));
1035  endofstream = true;
1036  break;
1037  }
1038  else
1039  {
1040  int c;
1041  StringInfoData s;
1042 
1043  /* Reset timeout. */
1044  last_recv_timestamp = GetCurrentTimestamp();
1045  ping_sent = false;
1046 
1047  /* Ensure we are reading the data into our memory context. */
1048  MemoryContextSwitchTo(ApplyContext);
1049 
1050  s.data = buf;
1051  s.len = len;
1052  s.cursor = 0;
1053  s.maxlen = -1;
1054 
1055  c = pq_getmsgbyte(&s);
1056 
1057  if (c == 'w')
1058  {
1059  XLogRecPtr start_lsn;
1060  XLogRecPtr end_lsn;
1061  TimestampTz send_time;
1062 
1063  start_lsn = pq_getmsgint64(&s);
1064  end_lsn = pq_getmsgint64(&s);
1065  send_time = pq_getmsgint64(&s);
1066 
1067  if (last_received < start_lsn)
1068  last_received = start_lsn;
1069 
1070  if (last_received < end_lsn)
1071  last_received = end_lsn;
1072 
1073  UpdateWorkerStats(last_received, send_time, false);
1074 
1075  apply_dispatch(&s);
1076  }
1077  else if (c == 'k')
1078  {
1079  XLogRecPtr end_lsn;
1081  bool reply_requested;
1082 
1083  end_lsn = pq_getmsgint64(&s);
1084  timestamp = pq_getmsgint64(&s);
1085  reply_requested = pq_getmsgbyte(&s);
1086 
1087  if (last_received < end_lsn)
1088  last_received = end_lsn;
1089 
1090  send_feedback(last_received, reply_requested, false);
1091  UpdateWorkerStats(last_received, timestamp, true);
1092  }
1093  /* other message types are purposefully ignored */
1094  }
1095 
1096  len = walrcv_receive(wrconn, &buf, &fd);
1097  }
1098 
1099  /* confirm all writes at once */
1100  send_feedback(last_received, false, false);
1101  }
1102 
1103  if (!in_remote_transaction)
1104  {
1105  /*
1106  * If we didn't get any transactions for a while there might be
1107  * unconsumed invalidation messages in the queue, consume them now.
1108  */
1110  if (!MySubscriptionValid)
1112 
1113  /* Process any table synchronization changes. */
1114  process_syncing_tables(last_received);
1115  }
1116 
1117  /* Cleanup the memory. */
1120 
1121  /* Check if we need to exit the streaming loop. */
1122  if (endofstream)
1123  {
1124  TimeLineID tli;
1125  walrcv_endstreaming(wrconn, &tli);
1126  break;
1127  }
1128 
1129  /*
1130  * Wait for more data or latch.
1131  */
1135  fd, NAPTIME_PER_CYCLE,
1137 
1138  /* Emergency bailout if postmaster has died */
1139  if (rc & WL_POSTMASTER_DEATH)
1140  proc_exit(1);
1141 
1142  if (got_SIGHUP)
1143  {
1144  got_SIGHUP = false;
1146  }
1147 
1148  if (rc & WL_TIMEOUT)
1149  {
1150  /*
1151  * We didn't receive anything new. If we haven't heard
1152  * anything from the server for more than
1153  * wal_receiver_timeout / 2, ping the server. Also, if
1154  * it's been longer than wal_receiver_status_interval
1155  * since the last update we sent, send a status update to
1156  * the master anyway, to report any progress in applying
1157  * WAL.
1158  */
1159  bool requestReply = false;
1160 
1161  /*
1162  * Check if time since last receive from standby has
1163  * reached the configured limit.
1164  */
1165  if (wal_receiver_timeout > 0)
1166  {
1168  TimestampTz timeout;
1169 
1170  timeout =
1171  TimestampTzPlusMilliseconds(last_recv_timestamp,
1173 
1174  if (now >= timeout)
1175  ereport(ERROR,
1176  (errmsg("terminating logical replication worker due to timeout")));
1177 
1178  /*
1179  * We didn't receive anything new, for half of
1180  * receiver replication timeout. Ping the server.
1181  */
1182  if (!ping_sent)
1183  {
1184  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1185  (wal_receiver_timeout / 2));
1186  if (now >= timeout)
1187  {
1188  requestReply = true;
1189  ping_sent = true;
1190  }
1191  }
1192  }
1193 
1194  send_feedback(last_received, requestReply, requestReply);
1195  }
1196 
1198  }
1199 }
1200 
1201 /*
1202  * Send a Standby Status Update message to server.
1203  *
1204  * 'recvpos' is the latest LSN we've received data to, force is set if we need
1205  * to send a response to avoid timeouts.
1206  */
1207 static void
1208 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1209 {
1210  static StringInfo reply_message = NULL;
1211  static TimestampTz send_time = 0;
1212 
1213  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1214  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1215  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1216 
1217  XLogRecPtr writepos;
1218  XLogRecPtr flushpos;
1219  TimestampTz now;
1220  bool have_pending_txes;
1221 
1222  /*
1223  * If the user doesn't want status to be reported to the publisher, be
1224  * sure to exit before doing anything at all.
1225  */
1226  if (!force && wal_receiver_status_interval <= 0)
1227  return;
1228 
1229  /* It's legal to not pass a recvpos */
1230  if (recvpos < last_recvpos)
1231  recvpos = last_recvpos;
1232 
1233  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1234 
1235  /*
1236  * No outstanding transactions to flush, we can report the latest
1237  * received position. This is important for synchronous replication.
1238  */
1239  if (!have_pending_txes)
1240  flushpos = writepos = recvpos;
1241 
1242  if (writepos < last_writepos)
1243  writepos = last_writepos;
1244 
1245  if (flushpos < last_flushpos)
1246  flushpos = last_flushpos;
1247 
1248  now = GetCurrentTimestamp();
1249 
1250  /* if we've already reported everything we're good */
1251  if (!force &&
1252  writepos == last_writepos &&
1253  flushpos == last_flushpos &&
1254  !TimestampDifferenceExceeds(send_time, now,
1256  return;
1257  send_time = now;
1258 
1259  if (!reply_message)
1260  {
1261  MemoryContext oldctx = MemoryContextSwitchTo(ApplyCacheContext);
1262  reply_message = makeStringInfo();
1263  MemoryContextSwitchTo(oldctx);
1264  }
1265  else
1266  resetStringInfo(reply_message);
1267 
1268  pq_sendbyte(reply_message, 'r');
1269  pq_sendint64(reply_message, recvpos); /* write */
1270  pq_sendint64(reply_message, flushpos); /* flush */
1271  pq_sendint64(reply_message, writepos); /* apply */
1272  pq_sendint64(reply_message, now); /* sendTime */
1273  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1274 
1275  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1276  force,
1277  (uint32) (recvpos >> 32), (uint32) recvpos,
1278  (uint32) (writepos >> 32), (uint32) writepos,
1279  (uint32) (flushpos >> 32), (uint32) flushpos
1280  );
1281 
1282  walrcv_send(wrconn, reply_message->data, reply_message->len);
1283 
1284  if (recvpos > last_recvpos)
1285  last_recvpos = recvpos;
1286  if (writepos > last_writepos)
1287  last_writepos = writepos;
1288  if (flushpos > last_flushpos)
1289  last_flushpos = flushpos;
1290 }
1291 
1292 
1293 /*
1294  * Reread subscription info and exit on change.
1295  */
1296 static void
1298 {
1299  MemoryContext oldctx;
1301  bool started_tx = false;
1302 
1303  /* This function might be called inside or outside of transaction. */
1304  if (!IsTransactionState())
1305  {
1307  started_tx = true;
1308  }
1309 
1310  /* Ensure allocations in permanent context. */
1311  oldctx = MemoryContextSwitchTo(ApplyCacheContext);
1312 
1313  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1314 
1315  /*
1316  * Exit if the subscription was removed.
1317  * This normally should not happen as the worker gets killed
1318  * during DROP SUBSCRIPTION.
1319  */
1320  if (!newsub)
1321  {
1322  ereport(LOG,
1323  (errmsg("logical replication worker for subscription \"%s\" will "
1324  "stop because the subscription was removed",
1325  MySubscription->name)));
1326 
1327  walrcv_disconnect(wrconn);
1328  proc_exit(0);
1329  }
1330 
1331  /*
1332  * Exit if connection string was changed. The launcher will start
1333  * new worker.
1334  */
1335  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1336  {
1337  ereport(LOG,
1338  (errmsg("logical replication worker for subscription \"%s\" will "
1339  "restart because the connection information was changed",
1340  MySubscription->name)));
1341 
1342  walrcv_disconnect(wrconn);
1343  proc_exit(0);
1344  }
1345 
1346  /*
1347  * Exit if subscription name was changed (it's used for
1348  * fallback_application_name). The launcher will start new worker.
1349  */
1350  if (strcmp(newsub->name, MySubscription->name) != 0)
1351  {
1352  ereport(LOG,
1353  (errmsg("logical replication worker for subscription \"%s\" will "
1354  "restart because subscription was renamed",
1355  MySubscription->name)));
1356 
1357  walrcv_disconnect(wrconn);
1358  proc_exit(0);
1359  }
1360 
1361  /*
1362  * We need to make new connection to new slot if slot name has changed
1363  * so exit here as well if that's the case.
1364  */
1365  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1366  {
1367  ereport(LOG,
1368  (errmsg("logical replication worker for subscription \"%s\" will "
1369  "restart because the replication slot name was changed",
1370  MySubscription->name)));
1371 
1372  walrcv_disconnect(wrconn);
1373  proc_exit(0);
1374  }
1375 
1376  /*
1377  * Exit if publication list was changed. The launcher will start
1378  * new worker.
1379  */
1380  if (!equal(newsub->publications, MySubscription->publications))
1381  {
1382  ereport(LOG,
1383  (errmsg("logical replication worker for subscription \"%s\" will "
1384  "restart because subscription's publications were changed",
1385  MySubscription->name)));
1386 
1387  walrcv_disconnect(wrconn);
1388  proc_exit(0);
1389  }
1390 
1391  /*
1392  * Exit if the subscription was disabled.
1393  * This normally should not happen as the worker gets killed
1394  * during ALTER SUBSCRIPTION ... DISABLE.
1395  */
1396  if (!newsub->enabled)
1397  {
1398  ereport(LOG,
1399  (errmsg("logical replication worker for subscription \"%s\" will "
1400  "stop because the subscription was disabled",
1401  MySubscription->name)));
1402 
1403  walrcv_disconnect(wrconn);
1404  proc_exit(0);
1405  }
1406 
1407  /* Check for other changes that should never happen too. */
1408  if (newsub->dbid != MySubscription->dbid)
1409  {
1410  elog(ERROR, "subscription %u changed unexpectedly",
1412  }
1413 
1414  /* Clean old subscription info and switch to new one. */
1415  FreeSubscription(MySubscription);
1416  MySubscription = newsub;
1417 
1418  MemoryContextSwitchTo(oldctx);
1419 
1420  /* Change synchronous commit according to the user's wishes */
1421  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1423 
1424  if (started_tx)
1426 
1427  MySubscriptionValid = true;
1428 }
1429 
1430 /*
1431  * Callback from subscription syscache invalidation.
1432  */
1433 static void
1434 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1435 {
1436  MySubscriptionValid = false;
1437 }
1438 
1439 
1440 /* Logical Replication Apply worker entry point */
1441 void
1443 {
1444  int worker_slot = DatumGetInt32(main_arg);
1445  MemoryContext oldctx;
1446  char originname[NAMEDATALEN];
1447  XLogRecPtr origin_startpos;
1448  char *myslotname;
1450 
1451  /* Attach to slot */
1452  logicalrep_worker_attach(worker_slot);
1453 
1454  /* Setup signal handling */
1458 
1459  /* Initialise stats to a sanish value */
1462 
1463  /* Make it easy to identify our processes. */
1464  SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
1466 
1467  /* Load the libpq-specific functions */
1468  load_file("libpqwalreceiver", false);
1469 
1472  "logical replication apply");
1473 
1474  /* Run as replica session replication role. */
1475  SetConfigOption("session_replication_role", "replica",
1477 
1478  /* Connect to our database. */
1481 
1482  /* Load the subscription into persistent memory context. */
1484  ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext,
1485  "ApplyCacheContext",
1488  oldctx = MemoryContextSwitchTo(ApplyCacheContext);
1489  MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
1490  MySubscriptionValid = true;
1491  MemoryContextSwitchTo(oldctx);
1492 
1493  /* Setup synchronous commit according to the user's wishes */
1494  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1496 
1497  if (!MySubscription->enabled)
1498  {
1499  ereport(LOG,
1500  (errmsg("logical replication worker for subscription \"%s\" will not "
1501  "start because the subscription was disabled during startup",
1502  MySubscription->name)));
1503 
1504  proc_exit(0);
1505  }
1506 
1507  /* Keep us informed about subscription changes. */
1510  (Datum) 0);
1511 
1512  if (am_tablesync_worker())
1513  elog(LOG, "logical replication sync for subscription %s, table %s started",
1514  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid));
1515  else
1516  elog(LOG, "logical replication apply for subscription %s started",
1517  MySubscription->name);
1518 
1520 
1521  /* Connect to the origin and start the replication. */
1522  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1523  MySubscription->conninfo);
1524 
1525  if (am_tablesync_worker())
1526  {
1527  char *syncslotname;
1528 
1529  /* This is table synchroniation worker, call initial sync. */
1530  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1531 
1532  /* The slot name needs to be allocated in permanent memory context. */
1533  oldctx = MemoryContextSwitchTo(ApplyCacheContext);
1534  myslotname = pstrdup(syncslotname);
1535  MemoryContextSwitchTo(oldctx);
1536 
1537  pfree(syncslotname);
1538  }
1539  else
1540  {
1541  /* This is main apply worker */
1542  RepOriginId originid;
1543  TimeLineID startpointTLI;
1544  char *err;
1545  int server_version;
1546 
1547  myslotname = MySubscription->slotname;
1548 
1549  /* Setup replication origin tracking. */
1551  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1552  originid = replorigin_by_name(originname, true);
1553  if (!OidIsValid(originid))
1554  originid = replorigin_create(originname);
1555  replorigin_session_setup(originid);
1556  replorigin_session_origin = originid;
1557  origin_startpos = replorigin_session_get_progress(false);
1559 
1560  wrconn = walrcv_connect(MySubscription->conninfo, true, myslotname,
1561  &err);
1562  if (wrconn == NULL)
1563  ereport(ERROR,
1564  (errmsg("could not connect to the publisher: %s", err)));
1565 
1566  /*
1567  * We don't really use the output identify_system for anything
1568  * but it does some initializations on the upstream so let's still
1569  * call it.
1570  */
1571  (void) walrcv_identify_system(wrconn, &startpointTLI,
1572  &server_version);
1573 
1574  }
1575 
1576  /*
1577  * Setup callback for syscache so that we know when something
1578  * changes in the subscription relation state.
1579  */
1582  (Datum) 0);
1583 
1584  /* Build logical replication streaming options. */
1585  options.logical = true;
1586  options.startpoint = origin_startpos;
1587  options.slotname = myslotname;
1588  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1589  options.proto.logical.publication_names = MySubscription->publications;
1590 
1591  /* Start normal logical streaming replication. */
1592  walrcv_startstreaming(wrconn, &options);
1593 
1594  /* Run the main loop. */
1595  LogicalRepApplyLoop(origin_startpos);
1596 
1597  walrcv_disconnect(wrconn);
1598 
1599  /* We should only get here if we received SIGTERM */
1600  proc_exit(0);
1601 }
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
Subscription * MySubscription
Definition: worker.c:109
static void apply_handle_type(StringInfo s)
Definition: worker.c:512
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:135
#define NIL
Definition: pg_list.h:69
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
void logicalrep_worker_sighup(SIGNAL_ARGS)
Definition: launcher.c:635
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:177
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:526
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1260
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1208
WalReceiverConn * wrconn
Definition: worker.c:107
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:254
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:959
MemoryContext ApplyCacheContext
Definition: worker.c:105
uint32 TimeLineID
Definition: xlogdefs.h:45
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:277
void AcceptInvalidationMessages(void)
Definition: inval.c:672
static XLogRecPtr remote_final_lsn
Definition: worker.c:113
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4665
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static void apply_handle_insert(StringInfo s)
Definition: worker.c:542
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:96
#define DatumGetInt32(X)
Definition: postgres.h:478
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2962
#define RelationGetDescr(relation)
Definition: rel.h:429
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:466
dlist_node node
Definition: worker.c:91
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2958
int64 timestamp
PGPROC * MyProc
Definition: proc.c:67
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:57
int64 TimestampTz
Definition: timestamp.h:39
LogicalRepRelation * rel
Definition: worker.c:100
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
char * pstrdup(const char *in)
Definition: mcxt.c:1077
void CommitTransactionCommand(void)
Definition: xact.c:2747
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:978
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:338
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
StringInfo makeStringInfo(void)
Definition: stringinfo.c:29
static void reread_subscription(void)
Definition: worker.c:1297
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:256
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:324
Expr * expression_planner(Expr *expr)
Definition: planner.c:5928
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
Form_pg_attribute * attrs
Definition: tupdesc.h:74
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
#define InvalidBuffer
Definition: buf.h:25
uint16 RepOriginId
Definition: xlogdefs.h:51
XLogRecPtr last_lsn
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:252
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1110
XLogRecPtr remote_end
Definition: worker.c:93
int errcode(int sqlerrcode)
Definition: elog.c:575
char * format_type_be(Oid type_oid)
Definition: format_type.c:94
static volatile sig_atomic_t got_SIGHUP
Definition: autovacuum.c:140
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:365
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:189
Datum * tts_values
Definition: tuptable.h:125
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8223
void PopActiveSnapshot(void)
Definition: snapmgr.c:807
Oid logicalrep_typmap_getid(Oid remoteid)
Definition: relation.c:439
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:983
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
List * es_range_table
Definition: execnodes.h:411
#define LOG
Definition: elog.h:26
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
struct SlotErrCallbackArg SlotErrCallbackArg
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:751
int wal_receiver_status_interval
Definition: walreceiver.c:74
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
struct ErrorContextCallback * previous
Definition: elog.h:238
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
#define OidIsValid(objectId)
Definition: c.h:538
XLogRecPtr last_lsn
Definition: walsender.c:206
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
int natts
Definition: tupdesc.h:73
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:162
int wal_receiver_timeout
Definition: walreceiver.c:75
Latch procLatch
Definition: proc.h:103
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:915
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3107
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:160
ErrorContextCallback * error_context_stack
Definition: elog.c:88
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:216
#define list_make1(x1)
Definition: pg_list.h:139
#define NAMEDATALEN
void FreeExecutorState(EState *estate)
Definition: execUtils.c:178
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
Definition: executor.h:455
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:157
static StringInfoData reply_message
Definition: walreceiver.c:111
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1434
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:950
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
#define ERROR
Definition: elog.h:43
static bool ensure_transaction(void)
Definition: worker.c:151
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
Definition: worker.c:87
bool in_remote_transaction
Definition: worker.c:112
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:376
Definition: guc.h:75
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:296
static volatile sig_atomic_t got_SIGTERM
Definition: autovacuum.c:142
XLogRecPtr reply_lsn
static bool am_tablesync_worker(void)
static void apply_handle_delete(StringInfo s)
Definition: worker.c:757
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define DEBUG2
Definition: elog.h:24
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:151
char * c
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:248
void logicalrep_worker_attach(int slot)
Definition: launcher.c:535
#define NoLock
Definition: lockdefs.h:34
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6676
static char * buf
Definition: pg_test_fsync.c:66
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:728
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:152
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
bool * tts_isnull
Definition: tuptable.h:126
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:265
ResultRelInfo * es_result_relations
Definition: execnodes.h:421
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:437
XLogRecPtr startpoint
Definition: walreceiver.h:147
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
unsigned int uint32
Definition: c.h:268
int pgsocket
Definition: port.h:22
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void apply_handle_begin(StringInfo s)
Definition: worker.c:418
RepOriginId replorigin_create(char *roname)
Definition: origin.c:235
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
Definition: postmaster.c:5489
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:222
TupleTableSlot * es_trig_tuple_slot
Definition: execnodes.h:427
#define ereport(elevel, rest)
Definition: elog.h:122
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2599
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1239
MemoryContext TopMemoryContext
Definition: mcxt.c:43
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:378
EState * CreateExecutorState(void)
Definition: execUtils.c:80
Definition: guc.h:72
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:602
static MemoryContext ApplyContext
Definition: worker.c:104
static char ** options
XLogRecPtr final_lsn
Definition: logicalproto.h:65
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:437
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
Node * build_column_default(Relation rel, int attrno)
#define SUBREL_STATE_SYNCDONE
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
List * es_tupleTable
Definition: execnodes.h:440
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1382
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
static void apply_handle_update(StringInfo s)
Definition: worker.c:636
uintptr_t Datum
Definition: postgres.h:372
void CommandCounterIncrement(void)
Definition: xact.c:922
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
#define PGINVALID_SOCKET
Definition: port.h:24
void logicalrep_worker_sigterm(SIGNAL_ARGS)
Definition: launcher.c:621
int es_num_result_relations
Definition: execnodes.h:422
void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2696
#define SIGHUP
Definition: win32.h:188
static void apply_handle_relation(StringInfo s)
Definition: worker.c:497
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
union WalRcvStreamOptions::@97 proto
XLogRecPtr local_end
Definition: worker.c:92
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4157
static void apply_dispatch(StringInfo s)
Definition: worker.c:856
#define makeNode(_type_)
Definition: nodes.h:557
TimestampTz last_recv_time
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:675
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
RepOriginId replorigin_session_origin
Definition: origin.c:150
static void apply_handle_origin(StringInfo s)
Definition: worker.c:475
void StartTransactionCommand(void)
Definition: xact.c:2677
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static int server_version
Definition: pg_dumpall.c:80
void CreateCacheMemoryContext(void)
Definition: catcache.c:525
static void slot_store_error_callback(void *arg)
Definition: worker.c:271
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:211
bool IsTransactionState(void)
Definition: xact.c:350
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:258
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:92
bool MySubscriptionValid
Definition: worker.c:110
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:33
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:460
void FreeSubscription(Subscription *sub)
RTEKind rtekind
Definition: parsenodes.h:929
static Datum values[MAXATTR]
Definition: bootstrap.c:163
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4177
void(* callback)(void *arg)
Definition: elog.h:239
static void slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:360
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:994
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:163
int i
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define errcontext
Definition: elog.h:164
void * arg
struct FlushPosition FlushPosition
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:164
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:113
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
HeapTuple tts_tuple
Definition: tuptable.h:120
#define elog
Definition: elog.h:219
XLogRecPtr commit_lsn
Definition: logicalproto.h:72
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1726
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1738
#define WL_LATCH_SET
Definition: latch.h:124
#define SUBREL_STATE_READY
#define RelationGetRelid(relation)
Definition: rel.h:417
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4644
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TimestampTz committime
Definition: logicalproto.h:74
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:408
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:1442
uint32 LogicalRepRelId
Definition: logicalproto.h:37
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:488
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:219
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:193
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
MemoryContext CacheMemoryContext
Definition: mcxt.c:46
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5518
void pgstat_report_stat(bool force)
Definition: pgstat.c:780
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:423