PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from remote logical replication stream.
13  *
14  * The main worker (apply) is started by logical replication worker
15  * launcher for every enabled subscription in a database. It uses
16  * walsender protocol to communicate with publisher.
17  *
18  * The apply worker may spawn additional workers (sync) for initial data
19  * synchronization of tables.
20  *
21  * This module includes server facing code and shares libpqwalreceiver
22  * module with walreceiver for providing the libpq specific functionality.
23  *
24  *-------------------------------------------------------------------------
25  */
26 
27 #include "postgres.h"
28 
29 #include "miscadmin.h"
30 #include "pgstat.h"
31 #include "funcapi.h"
32 
33 #include "access/xact.h"
34 #include "access/xlog_internal.h"
35 
36 #include "catalog/namespace.h"
38 
39 #include "commands/trigger.h"
40 
41 #include "executor/executor.h"
43 
44 #include "libpq/pqformat.h"
45 #include "libpq/pqsignal.h"
46 
47 #include "mb/pg_wchar.h"
48 
49 #include "nodes/makefuncs.h"
50 
51 #include "optimizer/planner.h"
52 
53 #include "parser/parse_relation.h"
54 
55 #include "postmaster/bgworker.h"
56 #include "postmaster/postmaster.h"
57 
58 #include "replication/decode.h"
59 #include "replication/logical.h"
64 #include "replication/origin.h"
65 #include "replication/snapbuild.h"
68 
69 #include "rewrite/rewriteHandler.h"
70 
71 #include "storage/bufmgr.h"
72 #include "storage/ipc.h"
73 #include "storage/lmgr.h"
74 #include "storage/proc.h"
75 #include "storage/procarray.h"
76 
77 #include "utils/builtins.h"
78 #include "utils/catcache.h"
79 #include "utils/datum.h"
80 #include "utils/fmgroids.h"
81 #include "utils/guc.h"
82 #include "utils/inval.h"
83 #include "utils/lsyscache.h"
84 #include "utils/memutils.h"
85 #include "utils/timeout.h"
86 #include "utils/tqual.h"
87 #include "utils/syscache.h"
88 
89 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
90 
91 typedef struct FlushPosition
92 {
97 
99 
100 typedef struct SlotErrCallbackArg
101 {
103  int attnum;
105 
108 
110 
112 bool MySubscriptionValid = false;
113 
115 
116 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
117 
118 static void store_flush_position(XLogRecPtr remote_lsn);
119 
120 static void reread_subscription(void);
121 
122 /*
123  * Make sure that we started local transaction.
124  *
125  * Also switches to ApplyContext as necessary.
126  */
127 static bool
129 {
130  if (IsTransactionState())
131  {
132  if (CurrentMemoryContext != ApplyContext)
133  MemoryContextSwitchTo(ApplyContext);
134  return false;
135  }
136 
138 
139  if (!MySubscriptionValid)
141 
142  MemoryContextSwitchTo(ApplyContext);
143  return true;
144 }
145 
146 
147 /*
148  * Executor state preparation for evaluation of constraint expressions,
149  * indexes and triggers.
150  *
151  * This is based on similar code in copy.c
     *
     * Builds a new EState whose range table holds a single RTE_RELATION entry
     * for rel->localrel and whose only result relation is that same relation.
     * Returns the new EState; the callers visible in this file release it with
     * FreeExecutorState() once the change has been applied.
152  */
153 static EState *
155 {
156  EState *estate;
157  ResultRelInfo *resultRelInfo;
158  RangeTblEntry *rte;
159 
160  estate = CreateExecutorState();
161 
     /* One-entry range table describing the local target relation */
162  rte = makeNode(RangeTblEntry);
163  rte->rtekind = RTE_RELATION;
164  rte->relid = RelationGetRelid(rel->localrel);
165  rte->relkind = rel->localrel->rd_rel->relkind;
166  estate->es_range_table = list_make1(rte);
167 
     /*
      * Result relation info for the local relation: result relation index 1,
      * no partition root, no instrumentation options.
      */
168  resultRelInfo = makeNode(ResultRelInfo);
169  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
170 
     /* Register it as the only, and the current, result relation */
171  estate->es_result_relations = resultRelInfo;
172  estate->es_num_result_relations = 1;
173  estate->es_result_relation_info = resultRelInfo;
174 
175  /* Triggers might need a slot */
176  if (resultRelInfo->ri_TrigDesc)
177  estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
178 
179  return estate;
180 }
181 
182 /*
183  * Evaluates default expressions for local columns that have no matching
184  * column in the remote relation, and stores the results in the slot.
185  *
186  * This allows us to support tables which have more columns on the downstream
187  * than on the upstream.
     *
     * Only non-dropped local columns with rel->attrmap[attnum] < 0 are
     * considered, and only those for which build_column_default() returns an
     * expression. All other entries of the slot are left untouched.
188  */
189 static void
191  TupleTableSlot *slot)
192 {
193  TupleDesc desc = RelationGetDescr(rel->localrel);
194  int num_phys_attrs = desc->natts;
195  int i;
196  int attnum,
197  num_defaults = 0;
198  int *defmap;
199  ExprState **defexprs;
200  ExprContext *econtext;
201 
202  econtext = GetPerTupleExprContext(estate);
203 
204  /* We got all the data via replication, no need to evaluate anything. */
205  if (num_phys_attrs == rel->remoterel.natts)
206  return;
207 
     /*
      * defexprs[k] is the k-th default expression found and defmap[k] the
      * 0-based local column it belongs to; both are sized for the worst case
      * of one default per local column.
      */
208  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
209  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
210 
211  for (attnum = 0; attnum < num_phys_attrs; attnum++)
212  {
213  Expr *defexpr;
214 
     /* Dropped local columns never get a default */
215  if (desc->attrs[attnum]->attisdropped)
216  continue;
217 
     /* Column is mapped to a remote column, nothing to fill in */
218  if (rel->attrmap[attnum] >= 0)
219  continue;
220 
     /* attnum is 0-based here; build_column_default is given attnum + 1 */
221  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
222 
223  if (defexpr != NULL)
224  {
225  /* Run the expression through planner */
226  defexpr = expression_planner(defexpr);
227 
228  /* Initialize executable expression */
229  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
230  defmap[num_defaults] = attnum;
231  num_defaults++;
232  }
233 
234  }
235 
     /* Evaluate every collected default straight into its slot position */
236  for (i = 0; i < num_defaults; i++)
237  slot->tts_values[defmap[i]] =
238  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
239 }
240 
241 /*
242  * Error callback to give more context info about type conversion failure.
     *
     * 'arg' points to a SlotErrCallbackArg. Its 'rel' supplies the remote
     * relation's schema name, relation name, column names and column type OIDs;
     * its 'attnum' is the remote attribute number currently being converted.
     * The context line names the column together with its remote type and the
     * local type that logicalrep_typmap_getid() maps it to.
243  */
244 static void
246 {
247  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
248  Oid remotetypoid,
249  localtypoid;
250 
     /*
      * The installers of this callback initialize attnum to -1, so a negative
      * value means no column is being converted and there is nothing to add.
      */
251  if (errarg->attnum < 0)
252  return;
253 
254  remotetypoid = errarg->rel->atttyps[errarg->attnum];
255  localtypoid = logicalrep_typmap_getid(remotetypoid);
256  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
257  "remote type %s, local type %s",
258  errarg->rel->nspname, errarg->rel->relname,
259  errarg->rel->attnames[errarg->attnum],
260  format_type_be(remotetypoid),
261  format_type_be(localtypoid));
262 }
263 
264 /*
265  * Store data in C string form into slot.
266  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
267  * use better.
     *
     * 'values' is indexed by remote attribute number; rel->attrmap[i] gives the
     * remote attribute number for local column i (negative when the local
     * column has no remote counterpart). Every local column is written: either
     * with the converted remote value or with NULL. The slot is cleared first
     * and stored as a virtual tuple at the end.
268  */
269 static void
271  char **values)
272 {
273  int natts = slot->tts_tupleDescriptor->natts;
274  int i;
275  SlotErrCallbackArg errarg;
276  ErrorContextCallback errcallback;
277 
278  ExecClearTuple(slot);
279 
280  /* Push callback + info on the error context stack */
281  errarg.rel = &rel->remoterel;
282  errarg.attnum = -1;
283  errcallback.callback = slot_store_error_callback;
284  errcallback.arg = (void *) &errarg;
285  errcallback.previous = error_context_stack;
286  error_context_stack = &errcallback;
287 
288  /* Call the "in" function for each non-dropped attribute */
289  for (i = 0; i < natts; i++)
290  {
292  int remoteattnum = rel->attrmap[i];
293 
294  if (!att->attisdropped && remoteattnum >= 0 &&
295  values[remoteattnum] != NULL)
296  {
297  Oid typinput;
298  Oid typioparam;
299 
     /* Tell the error callback which remote column is being converted */
300  errarg.attnum = remoteattnum;
301 
     /* Convert the text value with the local column type's input function */
302  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
303  slot->tts_values[i] = OidInputFunctionCall(typinput,
304  values[remoteattnum],
305  typioparam,
306  att->atttypmod);
307  slot->tts_isnull[i] = false;
308  }
309  else
310  {
311  /*
312  * We assign NULL to dropped attributes, NULL values, and missing
313  * values (missing values should be later filled using
314  * slot_fill_defaults).
315  */
316  slot->tts_values[i] = (Datum) 0;
317  slot->tts_isnull[i] = true;
318  }
319  }
320 
321  /* Pop the error context stack */
322  error_context_stack = errcallback.previous;
323 
324  ExecStoreVirtualTuple(slot);
325 }
326 
327 /*
328  * Modify slot with user data provided as C strings.
329  * This is somewhat similar to heap_modify_tuple but also calls the type
330  * input function on the user data as the input is the text representation
331  * of the types.
     *
     * 'values' and 'replaces' are both indexed by remote attribute number;
     * rel->attrmap[i] gives the remote attribute number for local column i
     * (negative when the local column has no remote counterpart). Local columns
     * whose remote column is not flagged in 'replaces' are left untouched by
     * the loop below.
332  */
333 static void
335  char **values, bool *replaces)
336 {
337  int natts = slot->tts_tupleDescriptor->natts;
338  int i;
339  SlotErrCallbackArg errarg;
340  ErrorContextCallback errcallback;
341 
342  slot_getallattrs(slot);
343  ExecClearTuple(slot);
344 
345  /* Push callback + info on the error context stack */
346  errarg.rel = &rel->remoterel;
347  errarg.attnum = -1;
348  errcallback.callback = slot_store_error_callback;
349  errcallback.arg = (void *) &errarg;
350  errcallback.previous = error_context_stack;
351  error_context_stack = &errcallback;
352 
353  /* Call the "in" function for each replaced attribute */
354  for (i = 0; i < natts; i++)
355  {
357  int remoteattnum = rel->attrmap[i];
358 
     /* Mapped column that the publisher did not change: leave it alone */
359  if (remoteattnum >= 0 && !replaces[remoteattnum])
360  continue;
361 
362  if (remoteattnum >= 0 && values[remoteattnum] != NULL)
363  {
364  Oid typinput;
365  Oid typioparam;
366 
     /* Tell the error callback which remote column is being converted */
367  errarg.attnum = remoteattnum;
368 
     /*
      * values[] is indexed by remote attribute number, exactly like the
      * NULL check above and like slot_store_cstrings(). Indexing it by
      * the local column number would convert the wrong string whenever
      * local and remote column orders differ.
      */
369  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
370  slot->tts_values[i] = OidInputFunctionCall(typinput, values[remoteattnum],
371  typioparam,
372  att->atttypmod);
373  slot->tts_isnull[i] = false;
374  }
375  else
376  {
     /*
      * Replaced column whose new value is NULL.
      *
      * NOTE(review): a local column with no remote counterpart
      * (remoteattnum < 0) also ends up here and is set to NULL on every
      * update - confirm whether it should be skipped instead so that its
      * existing local value is kept.
      */
377  slot->tts_values[i] = (Datum) 0;
378  slot->tts_isnull[i] = true;
379  }
380  }
381 
382  /* Pop the error context stack */
383  error_context_stack = errcallback.previous;
384 
385  ExecStoreVirtualTuple(slot);
386 }
387 
388 /*
389  * Handle BEGIN message.
390  */
391 static void
393 {
394  LogicalRepBeginData begin_data;
395 
396  logicalrep_read_begin(s, &begin_data);
397 
400 
401  in_remote_transaction = true;
402 
404 }
405 
406 /*
407  * Handle COMMIT message.
408  *
409  * TODO, support tracking of multiple origins
410  */
411 static void
413 {
414  LogicalRepCommitData commit_data;
415 
416  logicalrep_read_commit(s, &commit_data);
417 
420 
421  if (IsTransactionState())
422  {
424 
425  store_flush_position(commit_data.end_lsn);
426  }
427 
428  in_remote_transaction = false;
429 
431 }
432 
433 /*
434  * Handle ORIGIN message.
435  *
436  * TODO, support tracking of multiple origins
437  */
438 static void
440 {
441  /*
442  * ORIGIN message can only come inside remote transaction and before
443  * any actual writes.
444  */
446  ereport(ERROR,
447  (errcode(ERRCODE_PROTOCOL_VIOLATION),
448  errmsg("ORIGIN message sent out of order")));
449 }
450 
451 /*
452  * Handle RELATION message.
453  *
454  * Note we don't do validation against local schema here. The validation
455  * against local schema is postponed until first change for given relation
456  * comes as we only care about it when applying changes for it anyway and we
457  * do less locking this way.
458  */
459 static void
461 {
462  LogicalRepRelation *rel;
463 
464  rel = logicalrep_read_rel(s);
466 }
467 
468 /*
469  * Handle TYPE message.
470  *
471  * Note we don't do local mapping here, that's done when the type is
472  * actually used.
473  */
474 static void
476 {
477  LogicalRepTyp typ;
478 
479  logicalrep_read_typ(s, &typ);
481 }
482 
483 /*
484  * Get the replica identity index of 'rel' or, if none is defined, its
     * primary key index.
485  *
486  * If neither is defined, returns InvalidOid
487  */
488 static Oid
490 {
491  Oid idxoid;
492 
493  idxoid = RelationGetReplicaIndex(rel);
494 
     /* No replica identity index, fall back to the primary key */
495  if (!OidIsValid(idxoid))
496  idxoid = RelationGetPrimaryKeyIndex(rel);
497 
498  return idxoid;
499 }
500 
501 /*
502  * Handle INSERT message.
503  */
504 static void
506 {
508  LogicalRepTupleData newtup;
509  LogicalRepRelId relid;
510  EState *estate;
511  TupleTableSlot *remoteslot;
512  MemoryContext oldctx;
513 
515 
516  relid = logicalrep_read_insert(s, &newtup);
518 
519  /* Initialize the executor state. */
520  estate = create_estate_for_relation(rel);
521  remoteslot = ExecInitExtraTupleSlot(estate);
523 
524  /* Process and store remote tuple in the slot */
526  slot_store_cstrings(remoteslot, rel, newtup.values);
527  slot_fill_defaults(rel, estate, remoteslot);
528  MemoryContextSwitchTo(oldctx);
529 
532 
533  /* Do the insert. */
534  ExecSimpleRelationInsert(estate, remoteslot);
535 
536  /* Cleanup. */
539  ExecResetTupleTable(estate->es_tupleTable, false);
540  FreeExecutorState(estate);
541 
543 
545 }
546 
547 /*
548  * Check if the logical replication relation is updatable and throw
549  * appropriate error if it isn't.
     *
     * Returns without doing anything when rel->updatable is set. Otherwise an
     * ERROR with ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE is reported, with one
     * of two messages depending on why the relation cannot be updated.
550  */
551 static void
553 {
554  /* Updatable, no error. */
555  if (rel->updatable)
556  return;
557 
558  /*
559  * We are in error mode so it's fine this is somewhat slow.
560  * It's better to give user correct error.
561  */
563  {
564  ereport(ERROR,
565  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
566  errmsg("publisher does not send replica identity column "
567  "expected by the logical replication target relation \"%s.%s\"",
568  rel->remoterel.nspname, rel->remoterel.relname)));
569  }
570 
     /* Remaining case: nothing on the local side can identify the row */
571  ereport(ERROR,
572  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
573  errmsg("logical replication target relation \"%s.%s\" has "
574  "neither REPLICA IDENTITY index nor PRIMARY "
575  "KEY and published relation does not have "
576  "REPLICA IDENTITY FULL",
577  rel->remoterel.nspname, rel->remoterel.relname)));
578 }
579 
580 /*
581  * Handle UPDATE message.
582  *
583  * TODO: FDW support
584  */
585 static void
587 {
589  LogicalRepRelId relid;
590  Oid idxoid;
591  EState *estate;
592  EPQState epqstate;
593  LogicalRepTupleData oldtup;
594  LogicalRepTupleData newtup;
595  bool has_oldtup;
596  TupleTableSlot *localslot;
597  TupleTableSlot *remoteslot;
598  bool found;
599  MemoryContext oldctx;
600 
602 
603  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
604  &newtup);
606 
607  /* Check if we can do the update. */
609 
610  /* Initialize the executor state. */
611  estate = create_estate_for_relation(rel);
612  remoteslot = ExecInitExtraTupleSlot(estate);
614  localslot = ExecInitExtraTupleSlot(estate);
616  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
617 
620 
621  /* Build the search tuple. */
623  slot_store_cstrings(remoteslot, rel,
624  has_oldtup ? oldtup.values : newtup.values);
625  MemoryContextSwitchTo(oldctx);
626 
627  /*
628  * Try to find tuple using either replica identity index, primary key
629  * or if needed, sequential scan.
630  */
631  idxoid = GetRelationIdentityOrPK(rel->localrel);
632  Assert(OidIsValid(idxoid) ||
633  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
634 
635  if (OidIsValid(idxoid))
636  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
638  remoteslot, localslot);
639  else
641  remoteslot, localslot);
642 
643  ExecClearTuple(remoteslot);
644 
645  /*
646  * Tuple found.
647  *
648  * Note this will fail if there are other conflicting unique indexes.
649  */
650  if (found)
651  {
652  /* Process and store remote tuple in the slot */
654  ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
655  slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
656  MemoryContextSwitchTo(oldctx);
657 
658  EvalPlanQualSetSlot(&epqstate, remoteslot);
659 
660  /* Do the actual update. */
661  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
662  }
663  else
664  {
665  /*
666  * The tuple to be updated could not be found.
667  *
668  * TODO what to do here, change the log level to LOG perhaps?
669  */
670  elog(DEBUG1,
671  "logical replication did not find row for update "
672  "in replication target relation \"%s\"",
674  }
675 
676  /* Cleanup. */
679  EvalPlanQualEnd(&epqstate);
680  ExecResetTupleTable(estate->es_tupleTable, false);
681  FreeExecutorState(estate);
682 
684 
686 }
687 
688 /*
689  * Handle DELETE message.
690  *
691  * TODO: FDW support
692  */
693 static void
695 {
697  LogicalRepTupleData oldtup;
698  LogicalRepRelId relid;
699  Oid idxoid;
700  EState *estate;
701  EPQState epqstate;
702  TupleTableSlot *remoteslot;
703  TupleTableSlot *localslot;
704  bool found;
705  MemoryContext oldctx;
706 
708 
709  relid = logicalrep_read_delete(s, &oldtup);
711 
712  /* Check if we can do the delete. */
714 
715  /* Initialize the executor state. */
716  estate = create_estate_for_relation(rel);
717  remoteslot = ExecInitExtraTupleSlot(estate);
719  localslot = ExecInitExtraTupleSlot(estate);
721  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
722 
725 
726  /* Find the tuple using the replica identity index. */
728  slot_store_cstrings(remoteslot, rel, oldtup.values);
729  MemoryContextSwitchTo(oldctx);
730 
731  /*
732  * Try to find tuple using either replica identity index, primary key
733  * or if needed, sequential scan.
734  */
735  idxoid = GetRelationIdentityOrPK(rel->localrel);
736  Assert(OidIsValid(idxoid) ||
738 
739  if (OidIsValid(idxoid))
740  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
742  remoteslot, localslot);
743  else
745  remoteslot, localslot);
746  /* If found delete it. */
747  if (found)
748  {
749  EvalPlanQualSetSlot(&epqstate, localslot);
750 
751  /* Do the actual delete. */
752  ExecSimpleRelationDelete(estate, &epqstate, localslot);
753  }
754  else
755  {
756  /* The tuple to be deleted could not be found.*/
757  ereport(DEBUG1,
758  (errmsg("logical replication could not find row for delete "
759  "in replication target %s",
761  }
762 
763  /* Cleanup. */
766  EvalPlanQualEnd(&epqstate);
767  ExecResetTupleTable(estate->es_tupleTable, false);
768  FreeExecutorState(estate);
769 
771 
773 }
774 
775 
776 /*
777  * Logical replication protocol message dispatcher.
778  */
779 static void
781 {
782  char action = pq_getmsgbyte(s);
783 
784  switch (action)
785  {
786  /* BEGIN */
787  case 'B':
789  break;
790  /* COMMIT */
791  case 'C':
793  break;
794  /* INSERT */
795  case 'I':
797  break;
798  /* UPDATE */
799  case 'U':
801  break;
802  /* DELETE */
803  case 'D':
805  break;
806  /* RELATION */
807  case 'R':
809  break;
810  /* TYPE */
811  case 'Y':
813  break;
814  /* ORIGIN */
815  case 'O':
817  break;
818  default:
819  ereport(ERROR,
820  (errcode(ERRCODE_PROTOCOL_VIOLATION),
821  errmsg("invalid logical replication message type %c", action)));
822  }
823 }
824 
825 /*
826  * Figure out which write/flush positions to report to the walsender process.
827  *
828  * We can't simply report back the last LSN the walsender sent us because the
829  * local transaction might not yet be flushed to disk locally. Instead we
830  * build a list that associates local with remote LSNs for every commit. When
831  * reporting back the flush position to the sender we iterate that list and
832  * check which entries on it are already locally flushed. Those we can report
833  * as having been flushed.
834  *
835  * On return, *write is the remote end LSN of the last entry in the list and
836  * *flush the remote end LSN of the last entry found to be flushed locally;
     * each is InvalidXLogRecPtr when no entry qualifies. *have_pending_txes is
     * true if there are outstanding transactions that still need to be flushed.
     * Entries reported as flushed are removed from the list and freed.
837  */
838 static void
840  bool *have_pending_txes)
841 {
842  dlist_mutable_iter iter;
843  XLogRecPtr local_flush = GetFlushRecPtr();
844 
845  *write = InvalidXLogRecPtr;
846  *flush = InvalidXLogRecPtr;
847 
848  dlist_foreach_modify(iter, &lsn_mapping)
849  {
850  FlushPosition *pos =
851  dlist_container(FlushPosition, node, iter.cur);
852 
853  *write = pos->remote_end;
854 
855  if (pos->local_end <= local_flush)
856  {
     /* Locally flushed: report it and drop the entry */
857  *flush = pos->remote_end;
858  dlist_delete(iter.cur);
859  pfree(pos);
860  }
861  else
862  {
863  /*
864  * Don't want to uselessly iterate over the rest of the list which
865  * could potentially be long. Instead get the last element and
866  * grab the write position from there.
867  */
868  pos = dlist_tail_element(FlushPosition, node,
869  &lsn_mapping);
870  *write = pos->remote_end;
871  *have_pending_txes = true;
872  return;
873  }
874  }
875 
     /* Every entry visited was flushed; anything left means pending work */
876  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
877 }
878 
879 /*
880  * Store current remote/local lsn pair in the tracking list.
     *
     * 'remote_lsn' is recorded as the remote end position and paired with
     * XactLastCommitEnd as the local end position. The new entry is appended
     * to the tail of lsn_mapping, which get_flush_position() walks from the
     * head.
     *
     * Note that this leaves ApplyContext as the current memory context on
     * return; the context that was current on entry is not restored.
881  */
882 static void
884 {
885  FlushPosition *flushpos;
886 
887  /* Need to do this in permanent context */
888  MemoryContextSwitchTo(ApplyCacheContext);
889 
890  /* Track commit lsn */
891  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
892  flushpos->local_end = XactLastCommitEnd;
893  flushpos->remote_end = remote_lsn;
894 
895  dlist_push_tail(&lsn_mapping, &flushpos->node);
896  MemoryContextSwitchTo(ApplyContext);
897 }
898 
899 
900 /*
     * Update statistics of the worker.
     *
     * Records 'last_lsn' and 'send_time' in MyLogicalRepWorker. When 'reply'
     * is true the same two values are also recorded as reply_lsn and
     * reply_time.
     */
901 static void
902 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
903 {
904  MyLogicalRepWorker->last_lsn = last_lsn;
905  MyLogicalRepWorker->last_send_time = send_time;
907  if (reply)
908  {
909  MyLogicalRepWorker->reply_lsn = last_lsn;
910  MyLogicalRepWorker->reply_time = send_time;
911  }
912 }
913 
914 /*
915  * Apply main loop.
916  */
917 static void
919 {
920  XLogRecPtr last_received = InvalidXLogRecPtr;
921 
922  /* Init the ApplyContext which we use for easier cleanup. */
924  "ApplyContext",
928 
929  /* mark as idle, before starting to loop */
931 
932  while (!got_SIGTERM)
933  {
935  int rc;
936  int len;
937  char *buf = NULL;
938  bool endofstream = false;
939  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
940  bool ping_sent = false;
941 
942  MemoryContextSwitchTo(ApplyContext);
943 
944  len = walrcv_receive(wrconn, &buf, &fd);
945 
946  if (len != 0)
947  {
948  /* Process the data */
949  for (;;)
950  {
952 
953  if (len == 0)
954  {
955  break;
956  }
957  else if (len < 0)
958  {
959  ereport(LOG,
960  (errmsg("data stream from publisher has ended")));
961  endofstream = true;
962  break;
963  }
964  else
965  {
966  int c;
967  StringInfoData s;
968 
969  /* Reset timeout. */
970  last_recv_timestamp = GetCurrentTimestamp();
971  ping_sent = false;
972 
973  /* Ensure we are reading the data into our memory context. */
974  MemoryContextSwitchTo(ApplyContext);
975 
976  s.data = buf;
977  s.len = len;
978  s.cursor = 0;
979  s.maxlen = -1;
980 
981  c = pq_getmsgbyte(&s);
982 
983  if (c == 'w')
984  {
985  XLogRecPtr start_lsn;
986  XLogRecPtr end_lsn;
987  TimestampTz send_time;
988 
989  start_lsn = pq_getmsgint64(&s);
990  end_lsn = pq_getmsgint64(&s);
991  send_time =
993 
994  if (last_received < start_lsn)
995  last_received = start_lsn;
996 
997  if (last_received < end_lsn)
998  last_received = end_lsn;
999 
1000  UpdateWorkerStats(last_received, send_time, false);
1001 
1002  apply_dispatch(&s);
1003  }
1004  else if (c == 'k')
1005  {
1008  bool reply_requested;
1009 
1010  endpos = pq_getmsgint64(&s);
1011  timestamp =
1013  reply_requested = pq_getmsgbyte(&s);
1014 
1015  send_feedback(endpos, reply_requested, false);
1016  UpdateWorkerStats(last_received, timestamp, true);
1017  }
1018  /* other message types are purposefully ignored */
1019  }
1020 
1021  len = walrcv_receive(wrconn, &buf, &fd);
1022  }
1023  }
1024 
1025  if (!in_remote_transaction)
1026  {
1027  /*
1028  * If we didn't get any transactions for a while there might be
1029  * unconsumed invalidation messages in the queue, consume them now.
1030  */
1032  /* Check for subscription change */
1033  if (!MySubscriptionValid)
1036  }
1037 
1038  /* confirm all writes at once */
1039  send_feedback(last_received, false, false);
1040 
1041  /* Cleanup the memory. */
1044 
1045  /* Check if we need to exit the streaming loop. */
1046  if (endofstream)
1047  break;
1048 
1049  /*
1050  * Wait for more data or latch.
1051  */
1055  fd, NAPTIME_PER_CYCLE,
1057 
1058  /* Emergency bailout if postmaster has died */
1059  if (rc & WL_POSTMASTER_DEATH)
1060  proc_exit(1);
1061 
1062  if (rc & WL_TIMEOUT)
1063  {
1064  /*
1065  * We didn't receive anything new. If we haven't heard
1066  * anything from the server for more than
1067  * wal_receiver_timeout / 2, ping the server. Also, if
1068  * it's been longer than wal_receiver_status_interval
1069  * since the last update we sent, send a status update to
1070  * the master anyway, to report any progress in applying
1071  * WAL.
1072  */
1073  bool requestReply = false;
1074 
1075  /*
1076  * Check if time since last receive from standby has
1077  * reached the configured limit.
1078  */
1079  if (wal_receiver_timeout > 0)
1080  {
1082  TimestampTz timeout;
1083 
1084  timeout =
1085  TimestampTzPlusMilliseconds(last_recv_timestamp,
1087 
1088  if (now >= timeout)
1089  ereport(ERROR,
1090  (errmsg("terminating logical replication worker due to timeout")));
1091 
1092  /*
1093  * We didn't receive anything new, for half of
1094  * receiver replication timeout. Ping the server.
1095  */
1096  if (!ping_sent)
1097  {
1098  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1099  (wal_receiver_timeout / 2));
1100  if (now >= timeout)
1101  {
1102  requestReply = true;
1103  ping_sent = true;
1104  }
1105  }
1106  }
1107 
1108  send_feedback(last_received, requestReply, requestReply);
1109  }
1110 
1112  }
1113 }
1114 
1115 /*
1116  * Send a Standby Status Update message to server.
1117  *
1118  * 'recvpos' is the latest LSN we've received data to; a value behind the one
1119  * reported last time is replaced by the one reported last time.
      * 'force' sends the message even if status reporting is disabled or nothing
      * has changed since the previous message, e.g. to avoid timeouts.
      * 'requestReply' is copied into the replyRequested byte of the message.
      *
      * The positions and the time of the last message are kept in static
      * variables, so reported positions never move backwards between calls.
1120  */
1121 static void
1122 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1123 {
1124  static StringInfo reply_message = NULL;
1125  static TimestampTz send_time = 0;
1126 
1127  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1128  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1129  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1130 
1131  XLogRecPtr writepos;
1132  XLogRecPtr flushpos;
1133  TimestampTz now;
1134  bool have_pending_txes;
1135 
1136  /*
1137  * If the user doesn't want status to be reported to the publisher, be
1138  * sure to exit before doing anything at all.
1139  */
1140  if (!force && wal_receiver_status_interval <= 0)
1141  return;
1142 
1143  /* It's legal to not pass a recvpos; never go behind the last one sent */
1144  if (recvpos < last_recvpos)
1145  recvpos = last_recvpos;
1146 
1147  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1148 
1149  /*
1150  * No outstanding transactions to flush, we can report the latest
1151  * received position. This is important for synchronous replication.
1152  */
1153  if (!have_pending_txes)
1154  flushpos = writepos = recvpos;
1155 
      /* Same for write and flush: never report less than last time */
1156  if (writepos < last_writepos)
1157  writepos = last_writepos;
1158 
1159  if (flushpos < last_flushpos)
1160  flushpos = last_flushpos;
1161 
1162  now = GetCurrentTimestamp();
1163 
1164  /* if we've already reported everything we're good */
1165  if (!force &&
1166  writepos == last_writepos &&
1167  flushpos == last_flushpos &&
1168  !TimestampDifferenceExceeds(send_time, now,
1170  return;
1171  send_time = now;
1172 
      /*
       * The reply buffer is created once, in ApplyCacheContext, and reused
       * (reset) on every later call.
       */
1173  if (!reply_message)
1174  {
1175  MemoryContext oldctx = MemoryContextSwitchTo(ApplyCacheContext);
1176  reply_message = makeStringInfo();
1177  MemoryContextSwitchTo(oldctx);
1178  }
1179  else
1180  resetStringInfo(reply_message);
1181 
      /*
       * Message layout: 'r', three LSNs, send time, reply-requested flag.
       * Note the mapping: the received position goes into the "write" field
       * and our write position into the "apply" field.
       */
1182  pq_sendbyte(reply_message, 'r');
1183  pq_sendint64(reply_message, recvpos); /* write */
1184  pq_sendint64(reply_message, flushpos); /* flush */
1185  pq_sendint64(reply_message, writepos); /* apply */
1186  pq_sendint64(reply_message, now); /* sendTime */
1187  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1188 
1189  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1190  force,
1191  (uint32) (recvpos >> 32), (uint32) recvpos,
1192  (uint32) (writepos >> 32), (uint32) writepos,
1193  (uint32) (flushpos >> 32), (uint32) flushpos
1194  );
1195 
1196  walrcv_send(wrconn, reply_message->data, reply_message->len);
1197 
      /* Remember what was reported, for the checks at the top next time */
1198  if (recvpos > last_recvpos)
1199  last_recvpos = recvpos;
1200  if (writepos > last_writepos)
1201  last_writepos = writepos;
1202  if (flushpos > last_flushpos)
1203  last_flushpos = flushpos;
1204 }
1205 
1206 
1207 /*
1208  * Reread subscription info and exit on change.
      *
      * Loads the subscription identified by MyLogicalRepWorker->subid again and
      * compares it with MySubscription:
      *  - removed, connection string changed, publication list changed, or
      *    disabled: log the reason, disconnect from the publisher and exit the
      *    process with status 0;
      *  - database, name or slot name changed: report an ERROR;
      *  - otherwise: free the old MySubscription, install the new one and mark
      *    it valid again.
1209  */
1210 static void
1212 {
1213  MemoryContext oldctx;
1215 
1216  /* Ensure allocations in permanent context. */
1217  oldctx = MemoryContextSwitchTo(ApplyCacheContext);
1218 
1219  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1220 
1221  /*
1222  * Exit if the subscription was removed.
1223  * This normally should not happen as the worker gets killed
1224  * during DROP SUBSCRIPTION.
1225  */
1226  if (!newsub)
1227  {
1228  ereport(LOG,
1229  (errmsg("logical replication worker for subscription \"%s\" will "
1230  "stop because the subscription was removed",
1231  MySubscription->name)));
1232 
1233  walrcv_disconnect(wrconn);
1234  proc_exit(0);
1235  }
1236 
1237  /*
1238  * Exit if connection string was changed. The launcher will start
1239  * new worker.
1240  */
1241  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1242  {
1243  ereport(LOG,
1244  (errmsg("logical replication worker for subscription \"%s\" will "
1245  "restart because the connection information was changed",
1246  MySubscription->name)));
1247 
1248  walrcv_disconnect(wrconn);
1249  proc_exit(0);
1250  }
1251 
1252  /*
1253  * Exit if publication list was changed. The launcher will start
1254  * new worker.
1255  */
1256  if (!equal(newsub->publications, MySubscription->publications))
1257  {
1258  ereport(LOG,
1259  (errmsg("logical replication worker for subscription \"%s\" will "
1260  "restart because subscription's publications were changed",
1261  MySubscription->name)));
1262 
1263  walrcv_disconnect(wrconn);
1264  proc_exit(0);
1265  }
1266 
1267  /*
1268  * Exit if the subscription was disabled.
1269  * This normally should not happen as the worker gets killed
1270  * during ALTER SUBSCRIPTION ... DISABLE.
1271  */
1272  if (!newsub->enabled)
1273  {
1274  ereport(LOG,
1275  (errmsg("logical replication worker for subscription \"%s\" will "
1276  "stop because the subscription was disabled",
1277  MySubscription->name)));
1278 
1279  walrcv_disconnect(wrconn);
1280  proc_exit(0);
1281  }
1282 
1283  /* Check for other changes that should never happen too. */
1284  if (newsub->dbid != MySubscription->dbid ||
1285  strcmp(newsub->name, MySubscription->name) != 0 ||
1286  strcmp(newsub->slotname, MySubscription->slotname) != 0)
1287  {
1288  elog(ERROR, "subscription %u changed unexpectedly",
1290  }
1291 
1292  /* Clean old subscription info and switch to new one. */
1293  FreeSubscription(MySubscription);
1294  MySubscription = newsub;
1295 
      /* Back to the memory context that was current on entry */
1296  MemoryContextSwitchTo(oldctx);
1297 
1298  MySubscriptionValid = true;
1299 }
1300 
1301 /*
1302  * Callback from subscription syscache invalidation.
      *
      * Only marks the cached subscription as stale by clearing
      * MySubscriptionValid; none of the arguments are used. The flag is set
      * back to true by reread_subscription() once MySubscription has been
      * reloaded.
1303  */
1304 static void
1305 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1306 {
1307  MySubscriptionValid = false;
1308 }
1309 
1310 
1311 /* Logical Replication Apply worker entry point */
1312 void
1314 {
1315  int worker_slot = DatumGetObjectId(main_arg);
1316  MemoryContext oldctx;
1317  char originname[NAMEDATALEN];
1318  RepOriginId originid;
1319  XLogRecPtr origin_startpos;
1320  char *err;
1321  int server_version;
1322  TimeLineID startpointTLI;
1324 
1325  /* Attach to slot */
1326  logicalrep_worker_attach(worker_slot);
1327 
1328  /* Setup signal handling */
1331 
1332  /* Initialise stats to a sanish value */
1335 
1336  /* Make it easy to identify our processes. */
1337  SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
1339 
1340  /* Load the libpq-specific functions */
1341  load_file("libpqwalreceiver", false);
1342 
1345  "logical replication apply");
1346 
1347  /* Run as replica session replication role. */
1348  SetConfigOption("session_replication_role", "replica",
1350 
1351  /* Connect to our database. */
1354 
1355  /* Load the subscription into persistent memory context. */
1357  ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext,
1358  "ApplyCacheContext",
1361  oldctx = MemoryContextSwitchTo(ApplyCacheContext);
1362  MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
1363  MySubscriptionValid = true;
1364  MemoryContextSwitchTo(oldctx);
1365 
1366  if (!MySubscription->enabled)
1367  {
1368  ereport(LOG,
1369  (errmsg("logical replication worker for subscription \"%s\" will not "
1370  "start because the subscription was disabled during startup",
1371  MySubscription->name)));
1372 
1373  proc_exit(0);
1374  }
1375 
1376  /* Keep us informed about subscription changes. */
1379  (Datum) 0);
1380 
1381  ereport(LOG,
1382  (errmsg("logical replication apply for subscription \"%s\" has started",
1383  MySubscription->name)));
1384 
1385  /* Setup replication origin tracking. */
1386  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1387  originid = replorigin_by_name(originname, true);
1388  if (!OidIsValid(originid))
1389  originid = replorigin_create(originname);
1390  replorigin_session_setup(originid);
1391  replorigin_session_origin = originid;
1392  origin_startpos = replorigin_session_get_progress(false);
1393 
1395 
1396  /* Connect to the origin and start the replication. */
1397  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1398  MySubscription->conninfo);
1399  wrconn = walrcv_connect(MySubscription->conninfo, true,
1400  MySubscription->name, &err);
1401  if (wrconn == NULL)
1402  ereport(ERROR,
1403  (errmsg("could not connect to the publisher: %s", err)));
1404 
1405  /*
1406  * We don't really use the output identify_system for anything
1407  * but it does some initializations on the upstream so let's still
1408  * call it.
1409  */
1410  (void) walrcv_identify_system(wrconn, &startpointTLI, &server_version);
1411 
1412  /* Build logical replication streaming options. */
1413  options.logical = true;
1414  options.startpoint = origin_startpos;
1415  options.slotname = MySubscription->slotname;
1416  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1417  options.proto.logical.publication_names = MySubscription->publications;
1418 
1419  /* Start streaming from the slot. */
1420  walrcv_startstreaming(wrconn, &options);
1421 
1422  /* Run the main loop. */
1423  ApplyLoop();
1424 
1425  walrcv_disconnect(wrconn);
1426 
1427  /* We should only get here if we received SIGTERM */
1428  proc_exit(0);
1429 }
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
Subscription * MySubscription
Definition: worker.c:111
static void apply_handle_type(StringInfo s)
Definition: worker.c:475
#define NIL
Definition: pg_list.h:69
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:154
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:489
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1219
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1122
WalReceiverConn * wrconn
Definition: worker.c:109
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:883
static MemoryContext ApplyCacheContext
Definition: worker.c:107
uint32 TimeLineID
Definition: xlogdefs.h:45
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:277
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4588
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define WL_TIMEOUT
Definition: latch.h:127
static void apply_handle_insert(StringInfo s)
Definition: worker.c:505
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:98
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2870
#define RelationGetDescr(relation)
Definition: rel.h:425
dlist_node node
Definition: worker.c:93
#define write(a, b, c)
Definition: win32.h:19
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1688
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2805
PGPROC * MyProc
Definition: proc.c:67
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:57
TimestampTz IntegerTimestampToTimestampTz(int64 timestamp)
Definition: timestamp.c:1740
LogicalRepRelation * rel
Definition: worker.c:102
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
#define DatumGetObjectId(X)
Definition: postgres.h:508
void CommitTransactionCommand(void)
Definition: xact.c:2745
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:902
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:336
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
StringInfo makeStringInfo(void)
Definition: stringinfo.c:29
static void reread_subscription(void)
Definition: worker.c:1211
union WalRcvStreamOptions::@53 proto
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:223
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:324
Expr * expression_planner(Expr *expr)
Definition: planner.c:5212
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
Form_pg_attribute * attrs
Definition: tupdesc.h:74
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
#define InvalidBuffer
Definition: buf.h:25
uint16 RepOriginId
Definition: xlogdefs.h:51
XLogRecPtr last_lsn
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:219
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1107
XLogRecPtr remote_end
Definition: worker.c:95
int errcode(int sqlerrcode)
Definition: elog.c:575
char * format_type_be(Oid type_oid)
Definition: format_type.c:94
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:367
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:189
Datum * tts_values
Definition: tuptable.h:125
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8174
void PopActiveSnapshot(void)
Definition: snapmgr.c:807
Oid logicalrep_typmap_getid(Oid remoteid)
Definition: relation.c:441
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:980
void ResetLatch(volatile Latch *latch)
Definition: latch.c:462
List * es_range_table
Definition: execnodes.h:372
#define LOG
Definition: elog.h:26
Form_pg_class rd_rel
Definition: rel.h:113
unsigned int Oid
Definition: postgres_ext.h:31
struct SlotErrCallbackArg SlotErrCallbackArg
int wal_receiver_status_interval
Definition: walreceiver.c:73
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1821
struct ErrorContextCallback * previous
Definition: elog.h:238
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
#define OidIsValid(objectId)
Definition: c.h:533
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
int natts
Definition: tupdesc.h:73
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:142
int wal_receiver_timeout
Definition: walreceiver.c:74
Latch procLatch
Definition: proc.h:93
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:839
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3028
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:160
ErrorContextCallback * error_context_stack
Definition: elog.c:88
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:190
#define list_make1(x1)
Definition: pg_list.h:133
#define NAMEDATALEN
void FreeExecutorState(EState *estate)
Definition: execUtils.c:167
static XLogRecPtr endpos
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
Definition: executor.h:338
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:165
static StringInfoData reply_message
Definition: walreceiver.c:110
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1305
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execQual.c:4266
double TimestampTz
Definition: timestamp.h:51
void pfree(void *pointer)
Definition: mcxt.c:992
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
#define ExecEvalExpr(expr, econtext, isNull)
Definition: executor.h:73
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:59
#define ERROR
Definition: elog.h:43
static bool ensure_transaction(void)
Definition: worker.c:128
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
Definition: worker.c:89
bool in_remote_transaction
Definition: worker.c:114
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:376
Definition: guc.h:75
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:270
static volatile sig_atomic_t got_SIGTERM
Definition: autovacuum.c:141
XLogRecPtr reply_lsn
static void apply_handle_delete(StringInfo s)
Definition: worker.c:694
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:145
#define DEBUG2
Definition: elog.h:24
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:151
char * c
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:215
void logicalrep_worker_attach(int slot)
Definition: launcher.c:407
#define NoLock
Definition: lockdefs.h:34
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6633
static char * buf
Definition: pg_test_fsync.c:65
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:728
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:152
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
bool * tts_isnull
Definition: tuptable.h:126
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
ResultRelInfo * es_result_relations
Definition: execnodes.h:381
#define RowExclusiveLock
Definition: lockdefs.h:38
#define RelationGetRelationName(relation)
Definition: rel.h:433
XLogRecPtr startpoint
Definition: walreceiver.h:144
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
unsigned int uint32
Definition: c.h:265
int pgsocket
Definition: port.h:22
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void apply_handle_begin(StringInfo s)
Definition: worker.c:392
RepOriginId replorigin_create(char *roname)
Definition: origin.c:235
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
Definition: postmaster.c:5468
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:230
TupleTableSlot * es_trig_tuple_slot
Definition: execnodes.h:387
#define ereport(elevel, rest)
Definition: elog.h:122
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2567
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1239
MemoryContext TopMemoryContext
Definition: mcxt.c:43
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:339
EState * CreateExecutorState(void)
Definition: execUtils.c:72
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:552
static MemoryContext ApplyContext
Definition: worker.c:106
static char ** options
XLogRecPtr final_lsn
Definition: logicalproto.h:65
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:412
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:88
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
Node * build_column_default(Relation rel, int attrno)
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
List * es_tupleTable
Definition: execnodes.h:398
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1379
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
static void apply_handle_update(StringInfo s)
Definition: worker.c:586
uintptr_t Datum
Definition: postgres.h:374
void CommandCounterIncrement(void)
Definition: xact.c:921
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
#define PGINVALID_SOCKET
Definition: port.h:24
void logicalrep_worker_sigterm(SIGNAL_ARGS)
Definition: launcher.c:457
int es_num_result_relations
Definition: execnodes.h:382
void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2617
static void apply_handle_relation(StringInfo s)
Definition: worker.c:460
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:80
XLogRecPtr local_end
Definition: worker.c:94
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
static void apply_dispatch(StringInfo s)
Definition: worker.c:780
#define makeNode(_type_)
Definition: nodes.h:556
TimestampTz last_recv_time
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:670
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:321
RepOriginId replorigin_session_origin
Definition: origin.c:150
static void apply_handle_origin(StringInfo s)
Definition: worker.c:439
void StartTransactionCommand(void)
Definition: xact.c:2675
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:133
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static int server_version
Definition: pg_dumpall.c:81
void CreateCacheMemoryContext(void)
Definition: catcache.c:525
static void slot_store_error_callback(void *arg)
Definition: worker.c:245
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:211
bool IsTransactionState(void)
Definition: xact.c:349
#define walrcv_disconnect(conn)
Definition: walreceiver.h:231
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:225
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:92
double timestamp
bool MySubscriptionValid
Definition: worker.c:112
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:33
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:343
void FreeSubscription(Subscription *sub)
RTEKind rtekind
Definition: parsenodes.h:882
static Datum values[MAXATTR]
Definition: bootstrap.c:162
void(* callback)(void *arg)
Definition: elog.h:239
static void slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:334
void * palloc(Size size)
Definition: mcxt.c:891
int errmsg(const char *fmt,...)
Definition: elog.c:797
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:143
int i
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define errcontext
Definition: elog.h:164
void * arg
struct FlushPosition FlushPosition
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:144
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
HeapTuple tts_tuple
Definition: tuptable.h:120
#define elog
Definition: elog.h:219
TimestampTz committime
Definition: logicalproto.h:66
XLogRecPtr commit_lsn
Definition: logicalproto.h:72
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1997
#define WL_LATCH_SET
Definition: latch.h:124
static void ApplyLoop(void)
Definition: worker.c:918
#define RelationGetRelid(relation)
Definition: rel.h:413
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1652
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4567
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TimestampTz committime
Definition: logicalproto.h:74
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:410
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:1313
uint32 LogicalRepRelId
Definition: logicalproto.h:37
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:488
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:228
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
MemoryContext CacheMemoryContext
Definition: mcxt.c:46
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5497
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:209
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:383