PostgreSQL Source Code  git master
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from remote logical replication stream.
13  *
14  * The main worker (apply) is started by logical replication worker
15  * launcher for every enabled subscription in a database. It uses
16  * walsender protocol to communicate with publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "miscadmin.h"
27 #include "pgstat.h"
28 #include "funcapi.h"
29 
30 #include "access/xact.h"
31 #include "access/xlog_internal.h"
32 
33 #include "catalog/namespace.h"
36 
37 #include "commands/trigger.h"
38 
39 #include "executor/executor.h"
41 
42 #include "libpq/pqformat.h"
43 #include "libpq/pqsignal.h"
44 
45 #include "mb/pg_wchar.h"
46 
47 #include "nodes/makefuncs.h"
48 
49 #include "optimizer/planner.h"
50 
51 #include "parser/parse_relation.h"
52 
53 #include "postmaster/bgworker.h"
54 #include "postmaster/postmaster.h"
55 #include "postmaster/walwriter.h"
56 
57 #include "replication/decode.h"
58 #include "replication/logical.h"
63 #include "replication/origin.h"
64 #include "replication/snapbuild.h"
67 
68 #include "rewrite/rewriteHandler.h"
69 
70 #include "storage/bufmgr.h"
71 #include "storage/ipc.h"
72 #include "storage/lmgr.h"
73 #include "storage/proc.h"
74 #include "storage/procarray.h"
75 
76 #include "tcop/tcopprot.h"
77 
78 #include "utils/builtins.h"
79 #include "utils/catcache.h"
80 #include "utils/datum.h"
81 #include "utils/fmgroids.h"
82 #include "utils/guc.h"
83 #include "utils/inval.h"
84 #include "utils/lsyscache.h"
85 #include "utils/memutils.h"
86 #include "utils/timeout.h"
87 #include "utils/tqual.h"
88 #include "utils/syscache.h"
89 
90 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
91 
92 typedef struct FlushPosition
93 {
98 
100 
101 typedef struct SlotErrCallbackArg
102 {
104  int attnum;
106 
109 
111 
113 bool MySubscriptionValid = false;
114 
117 
118 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
119 
120 static void store_flush_position(XLogRecPtr remote_lsn);
121 
122 static void maybe_reread_subscription(void);
123 
124 /* Flags set by signal handlers */
125 static volatile sig_atomic_t got_SIGHUP = false;
126 
127 /*
128  * Should this worker apply changes for given relation.
129  *
130  * This is mainly needed for initial relation data sync as that runs in
131  * separate worker process running in parallel and we need some way to skip
132  * changes coming to the main apply worker during the sync of a table.
133  *
134  * Note we need to do smaller or equals comparison for SYNCDONE state because
135  * it might hold position of end of initial slot consistent point WAL
136  * record + 1 (ie start of next record) and next record can be COMMIT of
137  * transaction we are now processing (which is what we set remote_final_lsn
138  * to in apply_handle_begin).
139  */
140 static bool
142 {
143  if (am_tablesync_worker())
144  return MyLogicalRepWorker->relid == rel->localreloid;
145  else
146  return (rel->state == SUBREL_STATE_READY ||
147  (rel->state == SUBREL_STATE_SYNCDONE &&
148  rel->statelsn <= remote_final_lsn));
149 }
150 
151 /*
152  * Make sure that we started local transaction.
153  *
154  * Also switches to ApplyMessageContext as necessary.
155  */
156 static bool
158 {
159  if (IsTransactionState())
160  {
162 
163  if (CurrentMemoryContext != ApplyMessageContext)
164  MemoryContextSwitchTo(ApplyMessageContext);
165 
166  return false;
167  }
168 
171 
173 
174  MemoryContextSwitchTo(ApplyMessageContext);
175  return true;
176 }
177 
178 
179 /*
180  * Executor state preparation for evaluation of constraint expressions,
181  * indexes and triggers.
182  *
183  * This is based on similar code in copy.c
184  */
185 static EState *
187 {
188  EState *estate;
189  ResultRelInfo *resultRelInfo;
190  RangeTblEntry *rte;
191 
192  estate = CreateExecutorState();
193 
194  rte = makeNode(RangeTblEntry);
195  rte->rtekind = RTE_RELATION;
196  rte->relid = RelationGetRelid(rel->localrel);
197  rte->relkind = rel->localrel->rd_rel->relkind;
198  estate->es_range_table = list_make1(rte);
199 
200  resultRelInfo = makeNode(ResultRelInfo);
201  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
202 
203  estate->es_result_relations = resultRelInfo;
204  estate->es_num_result_relations = 1;
205  estate->es_result_relation_info = resultRelInfo;
206 
207  estate->es_output_cid = GetCurrentCommandId(true);
208 
209  /* Triggers might need a slot */
210  if (resultRelInfo->ri_TrigDesc)
211  estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
212 
213  /* Prepare to catch AFTER triggers. */
215 
216  return estate;
217 }
218 
219 /*
220  * Executes default values for columns for which we can't map to remote
221  * relation columns.
222  *
223  * This allows us to support tables which have more columns on the downstream
224  * than on the upstream.
225  */
226 static void
228  TupleTableSlot *slot)
229 {
230  TupleDesc desc = RelationGetDescr(rel->localrel);
231  int num_phys_attrs = desc->natts;
232  int i;
233  int attnum,
234  num_defaults = 0;
235  int *defmap;
236  ExprState **defexprs;
237  ExprContext *econtext;
238 
239  econtext = GetPerTupleExprContext(estate);
240 
241  /* We got all the data via replication, no need to evaluate anything. */
242  if (num_phys_attrs == rel->remoterel.natts)
243  return;
244 
245  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
246  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
247 
248  for (attnum = 0; attnum < num_phys_attrs; attnum++)
249  {
250  Expr *defexpr;
251 
252  if (TupleDescAttr(desc, attnum)->attisdropped)
253  continue;
254 
255  if (rel->attrmap[attnum] >= 0)
256  continue;
257 
258  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
259 
260  if (defexpr != NULL)
261  {
262  /* Run the expression through planner */
263  defexpr = expression_planner(defexpr);
264 
265  /* Initialize executable expression in copycontext */
266  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
267  defmap[num_defaults] = attnum;
268  num_defaults++;
269  }
270 
271  }
272 
273  for (i = 0; i < num_defaults; i++)
274  slot->tts_values[defmap[i]] =
275  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
276 }
277 
278 /*
279  * Error callback to give more context info about type conversion failure.
280  */
281 static void
283 {
284  SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
285  Oid remotetypoid,
286  localtypoid;
287 
288  if (errarg->attnum < 0)
289  return;
290 
291  remotetypoid = errarg->rel->atttyps[errarg->attnum];
292  localtypoid = logicalrep_typmap_getid(remotetypoid);
293  errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
294  "remote type %s, local type %s",
295  errarg->rel->nspname, errarg->rel->relname,
296  errarg->rel->attnames[errarg->attnum],
297  format_type_be(remotetypoid),
298  format_type_be(localtypoid));
299 }
300 
301 /*
302  * Store data in C string form into slot.
303  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
304  * use better.
305  */
306 static void
308  char **values)
309 {
310  int natts = slot->tts_tupleDescriptor->natts;
311  int i;
312  SlotErrCallbackArg errarg;
313  ErrorContextCallback errcallback;
314 
315  ExecClearTuple(slot);
316 
317  /* Push callback + info on the error context stack */
318  errarg.rel = &rel->remoterel;
319  errarg.attnum = -1;
320  errcallback.callback = slot_store_error_callback;
321  errcallback.arg = (void *) &errarg;
322  errcallback.previous = error_context_stack;
323  error_context_stack = &errcallback;
324 
325  /* Call the "in" function for each non-dropped attribute */
326  for (i = 0; i < natts; i++)
327  {
329  int remoteattnum = rel->attrmap[i];
330 
331  if (!att->attisdropped && remoteattnum >= 0 &&
332  values[remoteattnum] != NULL)
333  {
334  Oid typinput;
335  Oid typioparam;
336 
337  errarg.attnum = remoteattnum;
338 
339  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
340  slot->tts_values[i] = OidInputFunctionCall(typinput,
341  values[remoteattnum],
342  typioparam,
343  att->atttypmod);
344  slot->tts_isnull[i] = false;
345  }
346  else
347  {
348  /*
349  * We assign NULL to dropped attributes, NULL values, and missing
350  * values (missing values should be later filled using
351  * slot_fill_defaults).
352  */
353  slot->tts_values[i] = (Datum) 0;
354  slot->tts_isnull[i] = true;
355  }
356  }
357 
358  /* Pop the error context stack */
359  error_context_stack = errcallback.previous;
360 
361  ExecStoreVirtualTuple(slot);
362 }
363 
364 /*
365  * Modify slot with user data provided as C strings.
366  * This is somewhat similar to heap_modify_tuple but also calls the type
367  * input function on the user data as the input is the text representation
368  * of the types.
369  */
370 static void
372  char **values, bool *replaces)
373 {
374  int natts = slot->tts_tupleDescriptor->natts;
375  int i;
376  SlotErrCallbackArg errarg;
377  ErrorContextCallback errcallback;
378 
379  slot_getallattrs(slot);
380  ExecClearTuple(slot);
381 
382  /* Push callback + info on the error context stack */
383  errarg.rel = &rel->remoterel;
384  errarg.attnum = -1;
385  errcallback.callback = slot_store_error_callback;
386  errcallback.arg = (void *) &errarg;
387  errcallback.previous = error_context_stack;
388  error_context_stack = &errcallback;
389 
390  /* Call the "in" function for each replaced attribute */
391  for (i = 0; i < natts; i++)
392  {
394  int remoteattnum = rel->attrmap[i];
395 
396  if (remoteattnum < 0)
397  continue;
398 
399  if (!replaces[remoteattnum])
400  continue;
401 
402  if (values[remoteattnum] != NULL)
403  {
404  Oid typinput;
405  Oid typioparam;
406 
407  errarg.attnum = remoteattnum;
408 
409  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
410  slot->tts_values[i] = OidInputFunctionCall(typinput,
411  values[remoteattnum],
412  typioparam,
413  att->atttypmod);
414  slot->tts_isnull[i] = false;
415  }
416  else
417  {
418  slot->tts_values[i] = (Datum) 0;
419  slot->tts_isnull[i] = true;
420  }
421  }
422 
423  /* Pop the error context stack */
424  error_context_stack = errcallback.previous;
425 
426  ExecStoreVirtualTuple(slot);
427 }
428 
429 /*
430  * Handle BEGIN message.
431  */
432 static void
434 {
435  LogicalRepBeginData begin_data;
436 
437  logicalrep_read_begin(s, &begin_data);
438 
439  remote_final_lsn = begin_data.final_lsn;
440 
441  in_remote_transaction = true;
442 
444 }
445 
446 /*
447  * Handle COMMIT message.
448  *
449  * TODO, support tracking of multiple origins
450  */
451 static void
453 {
454  LogicalRepCommitData commit_data;
455 
456  logicalrep_read_commit(s, &commit_data);
457 
458  Assert(commit_data.commit_lsn == remote_final_lsn);
459 
460  /* The synchronization worker runs in single transaction. */
462  {
463  /*
464  * Update origin state so we can restart streaming from correct
465  * position in case of crash.
466  */
469 
471  pgstat_report_stat(false);
472 
473  store_flush_position(commit_data.end_lsn);
474  }
475  else
476  {
477  /* Process any invalidation messages that might have accumulated. */
480  }
481 
482  in_remote_transaction = false;
483 
484  /* Process any tables that are being synchronized in parallel. */
485  process_syncing_tables(commit_data.end_lsn);
486 
488 }
489 
490 /*
491  * Handle ORIGIN message.
492  *
493  * TODO, support tracking of multiple origins
494  */
495 static void
497 {
498  /*
499  * ORIGIN message can only come inside remote transaction and before any
500  * actual writes.
501  */
502  if (!in_remote_transaction ||
504  ereport(ERROR,
505  (errcode(ERRCODE_PROTOCOL_VIOLATION),
506  errmsg("ORIGIN message sent out of order")));
507 }
508 
509 /*
510  * Handle RELATION message.
511  *
512  * Note we don't do validation against local schema here. The validation
513  * against local schema is postponed until first change for given relation
514  * comes as we only care about it when applying changes for it anyway and we
515  * do less locking this way.
516  */
517 static void
519 {
520  LogicalRepRelation *rel;
521 
522  rel = logicalrep_read_rel(s);
524 }
525 
526 /*
527  * Handle TYPE message.
528  *
529  * Note we don't do local mapping here, that's done when the type is
530  * actually used.
531  */
532 static void
534 {
535  LogicalRepTyp typ;
536 
537  logicalrep_read_typ(s, &typ);
539 }
540 
541 /*
542  * Get replica identity index or if it is not defined a primary key.
543  *
544  * If neither is defined, returns InvalidOid
545  */
546 static Oid
548 {
549  Oid idxoid;
550 
551  idxoid = RelationGetReplicaIndex(rel);
552 
553  if (!OidIsValid(idxoid))
554  idxoid = RelationGetPrimaryKeyIndex(rel);
555 
556  return idxoid;
557 }
558 
559 /*
560  * Handle INSERT message.
561  */
562 static void
564 {
566  LogicalRepTupleData newtup;
567  LogicalRepRelId relid;
568  EState *estate;
569  TupleTableSlot *remoteslot;
570  MemoryContext oldctx;
571 
573 
574  relid = logicalrep_read_insert(s, &newtup);
577  {
578  /*
579  * The relation can't become interesting in the middle of the
580  * transaction so it's safe to unlock it.
581  */
583  return;
584  }
585 
586  /* Initialize the executor state. */
587  estate = create_estate_for_relation(rel);
588  remoteslot = ExecInitExtraTupleSlot(estate);
590 
591  /* Process and store remote tuple in the slot */
593  slot_store_cstrings(remoteslot, rel, newtup.values);
594  slot_fill_defaults(rel, estate, remoteslot);
595  MemoryContextSwitchTo(oldctx);
596 
599 
600  /* Do the insert. */
601  ExecSimpleRelationInsert(estate, remoteslot);
602 
603  /* Cleanup. */
606 
607  /* Handle queued AFTER triggers. */
608  AfterTriggerEndQuery(estate);
609 
610  ExecResetTupleTable(estate->es_tupleTable, false);
611  FreeExecutorState(estate);
612 
614 
616 }
617 
618 /*
619  * Check if the logical replication relation is updatable and throw
620  * appropriate error if it isn't.
621  */
622 static void
624 {
625  /* Updatable, no error. */
626  if (rel->updatable)
627  return;
628 
629  /*
630  * We are in error mode so it's fine this is somewhat slow. It's better to
631  * give user correct error.
632  */
634  {
635  ereport(ERROR,
636  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
637  errmsg("publisher did not send replica identity column "
638  "expected by the logical replication target relation \"%s.%s\"",
639  rel->remoterel.nspname, rel->remoterel.relname)));
640  }
641 
642  ereport(ERROR,
643  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
644  errmsg("logical replication target relation \"%s.%s\" has "
645  "neither REPLICA IDENTITY index nor PRIMARY "
646  "KEY and published relation does not have "
647  "REPLICA IDENTITY FULL",
648  rel->remoterel.nspname, rel->remoterel.relname)));
649 }
650 
651 /*
652  * Handle UPDATE message.
653  *
654  * TODO: FDW support
655  */
656 static void
658 {
660  LogicalRepRelId relid;
661  Oid idxoid;
662  EState *estate;
663  EPQState epqstate;
664  LogicalRepTupleData oldtup;
665  LogicalRepTupleData newtup;
666  bool has_oldtup;
667  TupleTableSlot *localslot;
668  TupleTableSlot *remoteslot;
669  bool found;
670  MemoryContext oldctx;
671 
673 
674  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
675  &newtup);
678  {
679  /*
680  * The relation can't become interesting in the middle of the
681  * transaction so it's safe to unlock it.
682  */
684  return;
685  }
686 
687  /* Check if we can do the update. */
689 
690  /* Initialize the executor state. */
691  estate = create_estate_for_relation(rel);
692  remoteslot = ExecInitExtraTupleSlot(estate);
694  localslot = ExecInitExtraTupleSlot(estate);
696  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
697 
700 
701  /* Build the search tuple. */
703  slot_store_cstrings(remoteslot, rel,
704  has_oldtup ? oldtup.values : newtup.values);
705  MemoryContextSwitchTo(oldctx);
706 
707  /*
708  * Try to find tuple using either replica identity index, primary key or
709  * if needed, sequential scan.
710  */
711  idxoid = GetRelationIdentityOrPK(rel->localrel);
712  Assert(OidIsValid(idxoid) ||
713  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
714 
715  if (OidIsValid(idxoid))
716  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
718  remoteslot, localslot);
719  else
721  remoteslot, localslot);
722 
723  ExecClearTuple(remoteslot);
724 
725  /*
726  * Tuple found.
727  *
728  * Note this will fail if there are other conflicting unique indexes.
729  */
730  if (found)
731  {
732  /* Process and store remote tuple in the slot */
734  ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
735  slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
736  MemoryContextSwitchTo(oldctx);
737 
738  EvalPlanQualSetSlot(&epqstate, remoteslot);
739 
740  /* Do the actual update. */
741  ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
742  }
743  else
744  {
745  /*
746  * The tuple to be updated could not be found.
747  *
748  * TODO what to do here, change the log level to LOG perhaps?
749  */
750  elog(DEBUG1,
751  "logical replication did not find row for update "
752  "in replication target relation \"%s\"",
754  }
755 
756  /* Cleanup. */
759 
760  /* Handle queued AFTER triggers. */
761  AfterTriggerEndQuery(estate);
762 
763  EvalPlanQualEnd(&epqstate);
764  ExecResetTupleTable(estate->es_tupleTable, false);
765  FreeExecutorState(estate);
766 
768 
770 }
771 
772 /*
773  * Handle DELETE message.
774  *
775  * TODO: FDW support
776  */
777 static void
779 {
781  LogicalRepTupleData oldtup;
782  LogicalRepRelId relid;
783  Oid idxoid;
784  EState *estate;
785  EPQState epqstate;
786  TupleTableSlot *remoteslot;
787  TupleTableSlot *localslot;
788  bool found;
789  MemoryContext oldctx;
790 
792 
793  relid = logicalrep_read_delete(s, &oldtup);
796  {
797  /*
798  * The relation can't become interesting in the middle of the
799  * transaction so it's safe to unlock it.
800  */
802  return;
803  }
804 
805  /* Check if we can do the delete. */
807 
808  /* Initialize the executor state. */
809  estate = create_estate_for_relation(rel);
810  remoteslot = ExecInitExtraTupleSlot(estate);
812  localslot = ExecInitExtraTupleSlot(estate);
814  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
815 
818 
819  /* Find the tuple using the replica identity index. */
821  slot_store_cstrings(remoteslot, rel, oldtup.values);
822  MemoryContextSwitchTo(oldctx);
823 
824  /*
825  * Try to find tuple using either replica identity index, primary key or
826  * if needed, sequential scan.
827  */
828  idxoid = GetRelationIdentityOrPK(rel->localrel);
829  Assert(OidIsValid(idxoid) ||
831 
832  if (OidIsValid(idxoid))
833  found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
835  remoteslot, localslot);
836  else
838  remoteslot, localslot);
839  /* If found delete it. */
840  if (found)
841  {
842  EvalPlanQualSetSlot(&epqstate, localslot);
843 
844  /* Do the actual delete. */
845  ExecSimpleRelationDelete(estate, &epqstate, localslot);
846  }
847  else
848  {
849  /* The tuple to be deleted could not be found. */
850  ereport(DEBUG1,
851  (errmsg("logical replication could not find row for delete "
852  "in replication target relation \"%s\"",
854  }
855 
856  /* Cleanup. */
859 
860  /* Handle queued AFTER triggers. */
861  AfterTriggerEndQuery(estate);
862 
863  EvalPlanQualEnd(&epqstate);
864  ExecResetTupleTable(estate->es_tupleTable, false);
865  FreeExecutorState(estate);
866 
868 
870 }
871 
872 
873 /*
874  * Logical replication protocol message dispatcher.
875  */
876 static void
878 {
879  char action = pq_getmsgbyte(s);
880 
881  switch (action)
882  {
883  /* BEGIN */
884  case 'B':
886  break;
887  /* COMMIT */
888  case 'C':
890  break;
891  /* INSERT */
892  case 'I':
894  break;
895  /* UPDATE */
896  case 'U':
898  break;
899  /* DELETE */
900  case 'D':
902  break;
903  /* RELATION */
904  case 'R':
906  break;
907  /* TYPE */
908  case 'Y':
910  break;
911  /* ORIGIN */
912  case 'O':
914  break;
915  default:
916  ereport(ERROR,
917  (errcode(ERRCODE_PROTOCOL_VIOLATION),
918  errmsg("invalid logical replication message type \"%c\"", action)));
919  }
920 }
921 
922 /*
923  * Figure out which write/flush positions to report to the walsender process.
924  *
925  * We can't simply report back the last LSN the walsender sent us because the
926  * local transaction might not yet be flushed to disk locally. Instead we
927  * build a list that associates local with remote LSNs for every commit. When
928  * reporting back the flush position to the sender we iterate that list and
929  * check which entries on it are already locally flushed. Those we can report
930  * as having been flushed.
931  *
932  * The have_pending_txes is true if there are outstanding transactions that
933  * need to be flushed.
934  */
935 static void
937  bool *have_pending_txes)
938 {
939  dlist_mutable_iter iter;
940  XLogRecPtr local_flush = GetFlushRecPtr();
941 
942  *write = InvalidXLogRecPtr;
943  *flush = InvalidXLogRecPtr;
944 
945  dlist_foreach_modify(iter, &lsn_mapping)
946  {
947  FlushPosition *pos =
949 
950  *write = pos->remote_end;
951 
952  if (pos->local_end <= local_flush)
953  {
954  *flush = pos->remote_end;
955  dlist_delete(iter.cur);
956  pfree(pos);
957  }
958  else
959  {
960  /*
961  * Don't want to uselessly iterate over the rest of the list which
962  * could potentially be long. Instead get the last element and
963  * grab the write position from there.
964  */
966  &lsn_mapping);
967  *write = pos->remote_end;
968  *have_pending_txes = true;
969  return;
970  }
971  }
972 
973  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
974 }
975 
976 /*
977  * Store current remote/local lsn pair in the tracking list.
978  */
979 static void
981 {
982  FlushPosition *flushpos;
983 
984  /* Need to do this in permanent context */
985  MemoryContextSwitchTo(ApplyContext);
986 
987  /* Track commit lsn */
988  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
989  flushpos->local_end = XactLastCommitEnd;
990  flushpos->remote_end = remote_lsn;
991 
992  dlist_push_tail(&lsn_mapping, &flushpos->node);
993  MemoryContextSwitchTo(ApplyMessageContext);
994 }
995 
996 
997 /* Update statistics of the worker. */
998 static void
1000 {
1002  MyLogicalRepWorker->last_send_time = send_time;
1004  if (reply)
1005  {
1007  MyLogicalRepWorker->reply_time = send_time;
1008  }
1009 }
1010 
1011 /*
1012  * Apply main loop.
1013  */
1014 static void
1016 {
1017  /*
1018  * Init the ApplyMessageContext which we clean up after each replication
1019  * protocol message.
1020  */
1021  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
1022  "ApplyMessageContext",
1024 
1025  /* mark as idle, before starting to loop */
1027 
1028  for (;;)
1029  {
1031  int rc;
1032  int len;
1033  char *buf = NULL;
1034  bool endofstream = false;
1035  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1036  bool ping_sent = false;
1037  long wait_time;
1038 
1040 
1041  MemoryContextSwitchTo(ApplyMessageContext);
1042 
1043  len = walrcv_receive(wrconn, &buf, &fd);
1044 
1045  if (len != 0)
1046  {
1047  /* Process the data */
1048  for (;;)
1049  {
1051 
1052  if (len == 0)
1053  {
1054  break;
1055  }
1056  else if (len < 0)
1057  {
1058  ereport(LOG,
1059  (errmsg("data stream from publisher has ended")));
1060  endofstream = true;
1061  break;
1062  }
1063  else
1064  {
1065  int c;
1066  StringInfoData s;
1067 
1068  /* Reset timeout. */
1069  last_recv_timestamp = GetCurrentTimestamp();
1070  ping_sent = false;
1071 
1072  /* Ensure we are reading the data into our memory context. */
1073  MemoryContextSwitchTo(ApplyMessageContext);
1074 
1075  s.data = buf;
1076  s.len = len;
1077  s.cursor = 0;
1078  s.maxlen = -1;
1079 
1080  c = pq_getmsgbyte(&s);
1081 
1082  if (c == 'w')
1083  {
1084  XLogRecPtr start_lsn;
1085  XLogRecPtr end_lsn;
1086  TimestampTz send_time;
1087 
1088  start_lsn = pq_getmsgint64(&s);
1089  end_lsn = pq_getmsgint64(&s);
1090  send_time = pq_getmsgint64(&s);
1091 
1092  if (last_received < start_lsn)
1093  last_received = start_lsn;
1094 
1095  if (last_received < end_lsn)
1096  last_received = end_lsn;
1097 
1098  UpdateWorkerStats(last_received, send_time, false);
1099 
1100  apply_dispatch(&s);
1101  }
1102  else if (c == 'k')
1103  {
1104  XLogRecPtr end_lsn;
1106  bool reply_requested;
1107 
1108  end_lsn = pq_getmsgint64(&s);
1109  timestamp = pq_getmsgint64(&s);
1110  reply_requested = pq_getmsgbyte(&s);
1111 
1112  if (last_received < end_lsn)
1113  last_received = end_lsn;
1114 
1115  send_feedback(last_received, reply_requested, false);
1116  UpdateWorkerStats(last_received, timestamp, true);
1117  }
1118  /* other message types are purposefully ignored */
1119 
1120  MemoryContextReset(ApplyMessageContext);
1121  }
1122 
1123  len = walrcv_receive(wrconn, &buf, &fd);
1124  }
1125  }
1126 
1127  /* confirm all writes so far */
1128  send_feedback(last_received, false, false);
1129 
1130  if (!in_remote_transaction)
1131  {
1132  /*
1133  * If we didn't get any transactions for a while there might be
1134  * unconsumed invalidation messages in the queue, consume them
1135  * now.
1136  */
1139 
1140  /* Process any table synchronization changes. */
1141  process_syncing_tables(last_received);
1142  }
1143 
1144  /* Cleanup the memory. */
1145  MemoryContextResetAndDeleteChildren(ApplyMessageContext);
1147 
1148  /* Check if we need to exit the streaming loop. */
1149  if (endofstream)
1150  {
1151  TimeLineID tli;
1152 
1153  walrcv_endstreaming(wrconn, &tli);
1154  break;
1155  }
1156 
1157  /*
1158  * Wait for more data or latch. If we have unflushed transactions,
1159  * wake up after WalWriterDelay to see if they've been flushed yet (in
1160  * which case we should send a feedback message). Otherwise, there's
1161  * no particular urgency about waking up unless we get data or a
1162  * signal.
1163  */
1164  if (!dlist_is_empty(&lsn_mapping))
1165  wait_time = WalWriterDelay;
1166  else
1167  wait_time = NAPTIME_PER_CYCLE;
1168 
1172  fd, wait_time,
1174 
1175  /* Emergency bailout if postmaster has died */
1176  if (rc & WL_POSTMASTER_DEATH)
1177  proc_exit(1);
1178 
1179  if (rc & WL_LATCH_SET)
1180  {
1183  }
1184 
1185  if (got_SIGHUP)
1186  {
1187  got_SIGHUP = false;
1189  }
1190 
1191  if (rc & WL_TIMEOUT)
1192  {
1193  /*
1194  * We didn't receive anything new. If we haven't heard anything
1195  * from the server for more than wal_receiver_timeout / 2, ping
1196  * the server. Also, if it's been longer than
1197  * wal_receiver_status_interval since the last update we sent,
1198  * send a status update to the master anyway, to report any
1199  * progress in applying WAL.
1200  */
1201  bool requestReply = false;
1202 
1203  /*
1204  * Check if time since last receive from standby has reached the
1205  * configured limit.
1206  */
1207  if (wal_receiver_timeout > 0)
1208  {
1210  TimestampTz timeout;
1211 
1212  timeout =
1213  TimestampTzPlusMilliseconds(last_recv_timestamp,
1215 
1216  if (now >= timeout)
1217  ereport(ERROR,
1218  (errmsg("terminating logical replication worker due to timeout")));
1219 
1220  /*
1221  * We didn't receive anything new, for half of receiver
1222  * replication timeout. Ping the server.
1223  */
1224  if (!ping_sent)
1225  {
1226  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1227  (wal_receiver_timeout / 2));
1228  if (now >= timeout)
1229  {
1230  requestReply = true;
1231  ping_sent = true;
1232  }
1233  }
1234  }
1235 
1236  send_feedback(last_received, requestReply, requestReply);
1237  }
1238  }
1239 }
1240 
1241 /*
1242  * Send a Standby Status Update message to server.
1243  *
1244  * 'recvpos' is the latest LSN we've received data to, force is set if we need
1245  * to send a response to avoid timeouts.
1246  */
1247 static void
1248 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1249 {
1250  static StringInfo reply_message = NULL;
1251  static TimestampTz send_time = 0;
1252 
1253  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1254  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1255  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1256 
1257  XLogRecPtr writepos;
1258  XLogRecPtr flushpos;
1259  TimestampTz now;
1260  bool have_pending_txes;
1261 
1262  /*
1263  * If the user doesn't want status to be reported to the publisher, be
1264  * sure to exit before doing anything at all.
1265  */
1266  if (!force && wal_receiver_status_interval <= 0)
1267  return;
1268 
1269  /* It's legal to not pass a recvpos */
1270  if (recvpos < last_recvpos)
1271  recvpos = last_recvpos;
1272 
1273  get_flush_position(&writepos, &flushpos, &have_pending_txes);
1274 
1275  /*
1276  * No outstanding transactions to flush, we can report the latest received
1277  * position. This is important for synchronous replication.
1278  */
1279  if (!have_pending_txes)
1280  flushpos = writepos = recvpos;
1281 
1282  if (writepos < last_writepos)
1283  writepos = last_writepos;
1284 
1285  if (flushpos < last_flushpos)
1286  flushpos = last_flushpos;
1287 
1288  now = GetCurrentTimestamp();
1289 
1290  /* if we've already reported everything we're good */
1291  if (!force &&
1292  writepos == last_writepos &&
1293  flushpos == last_flushpos &&
1294  !TimestampDifferenceExceeds(send_time, now,
1296  return;
1297  send_time = now;
1298 
1299  if (!reply_message)
1300  {
1301  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1302 
1303  reply_message = makeStringInfo();
1304  MemoryContextSwitchTo(oldctx);
1305  }
1306  else
1307  resetStringInfo(reply_message);
1308 
1309  pq_sendbyte(reply_message, 'r');
1310  pq_sendint64(reply_message, recvpos); /* write */
1311  pq_sendint64(reply_message, flushpos); /* flush */
1312  pq_sendint64(reply_message, writepos); /* apply */
1313  pq_sendint64(reply_message, now); /* sendTime */
1314  pq_sendbyte(reply_message, requestReply); /* replyRequested */
1315 
1316  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1317  force,
1318  (uint32) (recvpos >> 32), (uint32) recvpos,
1319  (uint32) (writepos >> 32), (uint32) writepos,
1320  (uint32) (flushpos >> 32), (uint32) flushpos
1321  );
1322 
1323  walrcv_send(wrconn, reply_message->data, reply_message->len);
1324 
1325  if (recvpos > last_recvpos)
1326  last_recvpos = recvpos;
1327  if (writepos > last_writepos)
1328  last_writepos = writepos;
1329  if (flushpos > last_flushpos)
1330  last_flushpos = flushpos;
1331 }
1332 
1333 /*
1334  * Reread subscription info if needed. Most changes will be exit.
1335  */
1336 static void
1338 {
1339  MemoryContext oldctx;
1341  bool started_tx = false;
1342 
1343  /* When cache state is valid there is nothing to do here. */
1344  if (MySubscriptionValid)
1345  return;
1346 
1347  /* This function might be called inside or outside of transaction. */
1348  if (!IsTransactionState())
1349  {
1351  started_tx = true;
1352  }
1353 
1354  /* Ensure allocations in permanent context. */
1355  oldctx = MemoryContextSwitchTo(ApplyContext);
1356 
1357  newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1358 
1359  /*
1360  * Exit if the subscription was removed. This normally should not happen
1361  * as the worker gets killed during DROP SUBSCRIPTION.
1362  */
1363  if (!newsub)
1364  {
1365  ereport(LOG,
1366  (errmsg("logical replication apply worker for subscription \"%s\" will "
1367  "stop because the subscription was removed",
1368  MySubscription->name)));
1369 
1370  proc_exit(0);
1371  }
1372 
1373  /*
1374  * Exit if the subscription was disabled. This normally should not happen
1375  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1376  */
1377  if (!newsub->enabled)
1378  {
1379  ereport(LOG,
1380  (errmsg("logical replication apply worker for subscription \"%s\" will "
1381  "stop because the subscription was disabled",
1382  MySubscription->name)));
1383 
1384  proc_exit(0);
1385  }
1386 
1387  /*
1388  * Exit if connection string was changed. The launcher will start new
1389  * worker.
1390  */
1391  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1392  {
1393  ereport(LOG,
1394  (errmsg("logical replication apply worker for subscription \"%s\" will "
1395  "restart because the connection information was changed",
1396  MySubscription->name)));
1397 
1398  proc_exit(0);
1399  }
1400 
1401  /*
1402  * Exit if subscription name was changed (it's used for
1403  * fallback_application_name). The launcher will start new worker.
1404  */
1405  if (strcmp(newsub->name, MySubscription->name) != 0)
1406  {
1407  ereport(LOG,
1408  (errmsg("logical replication apply worker for subscription \"%s\" will "
1409  "restart because subscription was renamed",
1410  MySubscription->name)));
1411 
1412  proc_exit(0);
1413  }
1414 
1415  /* !slotname should never happen when enabled is true. */
1416  Assert(newsub->slotname);
1417 
1418  /*
1419  * We need to make new connection to new slot if slot name has changed so
1420  * exit here as well if that's the case.
1421  */
1422  if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1423  {
1424  ereport(LOG,
1425  (errmsg("logical replication apply worker for subscription \"%s\" will "
1426  "restart because the replication slot name was changed",
1427  MySubscription->name)));
1428 
1429  proc_exit(0);
1430  }
1431 
1432  /*
1433  * Exit if publication list was changed. The launcher will start new
1434  * worker.
1435  */
1436  if (!equal(newsub->publications, MySubscription->publications))
1437  {
1438  ereport(LOG,
1439  (errmsg("logical replication apply worker for subscription \"%s\" will "
1440  "restart because subscription's publications were changed",
1441  MySubscription->name)));
1442 
1443  proc_exit(0);
1444  }
1445 
1446  /* Check for other changes that should never happen too. */
1447  if (newsub->dbid != MySubscription->dbid)
1448  {
1449  elog(ERROR, "subscription %u changed unexpectedly",
1451  }
1452 
1453  /* Clean old subscription info and switch to new one. */
1454  FreeSubscription(MySubscription);
1455  MySubscription = newsub;
1456 
1457  MemoryContextSwitchTo(oldctx);
1458 
1459  /* Change synchronous commit according to the user's wishes */
1460  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1462 
1463  if (started_tx)
1465 
1466  MySubscriptionValid = true;
1467 }
1468 
1469 /*
1470  * Callback from subscription syscache invalidation.
1471  */
1472 static void
1473 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1474 {
1475  MySubscriptionValid = false;
1476 }
1477 
1478 /* SIGHUP: set flag to reload configuration at next convenient time */
1479 static void
1481 {
1482  int save_errno = errno;
1483 
1484  got_SIGHUP = true;
1485 
1486  /* Waken anything waiting on the process latch */
1487  SetLatch(MyLatch);
1488 
1489  errno = save_errno;
1490 }
1491 
1492 /* Logical Replication Apply worker entry point */
1493 void
1495 {
1496  int worker_slot = DatumGetInt32(main_arg);
1497  MemoryContext oldctx;
1498  char originname[NAMEDATALEN];
1499  XLogRecPtr origin_startpos;
1500  char *myslotname;
1502 
1503  /* Attach to slot */
1504  logicalrep_worker_attach(worker_slot);
1505 
1506  /* Setup signal handling */
1508  pqsignal(SIGTERM, die);
1510 
1511  /* Initialise stats to a sanish value */
1514 
1515  /* Load the libpq-specific functions */
1516  load_file("libpqwalreceiver", false);
1517 
1518  Assert(CurrentResourceOwner == NULL);
1520  "logical replication apply");
1521 
1522  /* Run as replica session replication role. */
1523  SetConfigOption("session_replication_role", "replica",
1525 
1526  /* Connect to our database. */
1529 
1530  /* Load the subscription into persistent memory context. */
1531  ApplyContext = AllocSetContextCreate(TopMemoryContext,
1532  "ApplyContext",
1535  oldctx = MemoryContextSwitchTo(ApplyContext);
1536  MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
1537  MySubscriptionValid = true;
1538  MemoryContextSwitchTo(oldctx);
1539 
1540  /* Setup synchronous commit according to the user's wishes */
1541  SetConfigOption("synchronous_commit", MySubscription->synccommit,
1543 
1544  if (!MySubscription->enabled)
1545  {
1546  ereport(LOG,
1547  (errmsg("logical replication apply worker for subscription \"%s\" will not "
1548  "start because the subscription was disabled during startup",
1549  MySubscription->name)));
1550 
1551  proc_exit(0);
1552  }
1553 
1554  /* Keep us informed about subscription changes. */
1557  (Datum) 0);
1558 
1559  if (am_tablesync_worker())
1560  ereport(LOG,
1561  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1562  MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
1563  else
1564  ereport(LOG,
1565  (errmsg("logical replication apply worker for subscription \"%s\" has started",
1566  MySubscription->name)));
1567 
1569 
1570  /* Connect to the origin and start the replication. */
1571  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1572  MySubscription->conninfo);
1573 
1574  if (am_tablesync_worker())
1575  {
1576  char *syncslotname;
1577 
1578  /* This is table synchroniation worker, call initial sync. */
1579  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1580 
1581  /* The slot name needs to be allocated in permanent memory context. */
1582  oldctx = MemoryContextSwitchTo(ApplyContext);
1583  myslotname = pstrdup(syncslotname);
1584  MemoryContextSwitchTo(oldctx);
1585 
1586  pfree(syncslotname);
1587  }
1588  else
1589  {
1590  /* This is main apply worker */
1591  RepOriginId originid;
1592  TimeLineID startpointTLI;
1593  char *err;
1594  int server_version;
1595 
1596  myslotname = MySubscription->slotname;
1597 
1598  /*
1599  * This shouldn't happen if the subscription is enabled, but guard
1600  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1601  * crash if slot is NULL.)
1602  */
1603  if (!myslotname)
1604  ereport(ERROR,
1605  (errmsg("subscription has no replication slot set")));
1606 
1607  /* Setup replication origin tracking. */
1609  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1610  originid = replorigin_by_name(originname, true);
1611  if (!OidIsValid(originid))
1612  originid = replorigin_create(originname);
1613  replorigin_session_setup(originid);
1614  replorigin_session_origin = originid;
1615  origin_startpos = replorigin_session_get_progress(false);
1617 
1618  wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
1619  &err);
1620  if (wrconn == NULL)
1621  ereport(ERROR,
1622  (errmsg("could not connect to the publisher: %s", err)));
1623 
1624  /*
1625  * We don't really use the output identify_system for anything but it
1626  * does some initializations on the upstream so let's still call it.
1627  */
1628  (void) walrcv_identify_system(wrconn, &startpointTLI,
1629  &server_version);
1630 
1631  }
1632 
1633  /*
1634  * Setup callback for syscache so that we know when something changes in
1635  * the subscription relation state.
1636  */
1639  (Datum) 0);
1640 
1641  /* Build logical replication streaming options. */
1642  options.logical = true;
1643  options.startpoint = origin_startpos;
1644  options.slotname = myslotname;
1645  options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1646  options.proto.logical.publication_names = MySubscription->publications;
1647 
1648  /* Start normal logical streaming replication. */
1649  walrcv_startstreaming(wrconn, &options);
1650 
1651  /* Run the main loop. */
1652  LogicalRepApplyLoop(origin_startpos);
1653 
1654  proc_exit(0);
1655 }
1656 
1657 /*
1658  * Is current process a logical replication worker?
1659  */
1660 bool
1662 {
1663  return MyLogicalRepWorker != NULL;
1664 }
void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
Subscription * MySubscription
Definition: worker.c:112
static void apply_handle_type(StringInfo s)
Definition: worker.c:533
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:141
#define NIL
Definition: pg_list.h:69
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
static EState * create_estate_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:186
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:547
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1306
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:1248
WalReceiverConn * wrconn
Definition: worker.c:110
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:255
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:980
uint32 TimeLineID
Definition: xlogdefs.h:45
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:277
void AcceptInvalidationMessages(void)
Definition: inval.c:679
CommandId es_output_cid
Definition: execnodes.h:439
static XLogRecPtr remote_final_lsn
Definition: worker.c:116
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4691
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
static void apply_handle_insert(StringInfo s)
Definition: worker.c:563
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
static dlist_head lsn_mapping
Definition: worker.c:99
#define DatumGetInt32(X)
Definition: postgres.h:478
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2984
#define RelationGetDescr(relation)
Definition: rel.h:437
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:544
dlist_node node
Definition: worker.c:94
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
int64 timestamp
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:57
int64 TimestampTz
Definition: timestamp.h:39
LogicalRepRelation * rel
Definition: worker.c:103
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:90
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
char * pstrdup(const char *in)
Definition: mcxt.c:1076
void CommitTransactionCommand(void)
Definition: xact.c:2744
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:999
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:340
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
StringInfo makeStringInfo(void)
Definition: stringinfo.c:28
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:257
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:324
Expr * expression_planner(Expr *expr)
Definition: planner.c:6026
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
TimestampTz last_send_time
#define InvalidBuffer
Definition: buf.h:25
uint16 RepOriginId
Definition: xlogdefs.h:51
XLogRecPtr last_lsn
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:253
void proc_exit(int code)
Definition: ipc.c:99
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1148
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
XLogRecPtr remote_end
Definition: worker.c:96
int errcode(int sqlerrcode)
Definition: elog.c:575
char * format_type_be(Oid type_oid)
Definition: format_type.c:94
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:372
Datum * tts_values
Definition: tuptable.h:125
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8254
void PopActiveSnapshot(void)
Definition: snapmgr.c:812
Oid logicalrep_typmap_getid(Oid remoteid)
Definition: relation.c:446
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1013
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
List * es_range_table
Definition: execnodes.h:432
#define LOG
Definition: elog.h:26
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
struct SlotErrCallbackArg SlotErrCallbackArg
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:815
bool IsLogicalWorker(void)
Definition: worker.c:1661
void(* callback)(void *arg)
Definition: elog.h:239
int wal_receiver_status_interval
Definition: walreceiver.c:74
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
struct ErrorContextCallback * previous
Definition: elog.h:238
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:304
#define OidIsValid(objectId)
Definition: c.h:576
XLogRecPtr last_lsn
Definition: walsender.c:213
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
static int fd(const char *x, int i)
Definition: preproc-init.c:105
int natts
Definition: tupdesc.h:79
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
int wal_receiver_timeout
Definition: walreceiver.c:75
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:936
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3208
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:160
ErrorContextCallback * error_context_stack
Definition: elog.c:88
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:227
#define list_make1(x1)
Definition: pg_list.h:139
#define NAMEDATALEN
void FreeExecutorState(EState *estate)
Definition: execUtils.c:186
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define GetPerTupleExprContext(estate)
Definition: executor.h:467
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:158
static StringInfoData reply_message
Definition: walreceiver.c:111
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:1473
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:949
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
#define ERROR
Definition: elog.h:43
static bool ensure_transaction(void)
Definition: worker.c:157
LogicalRepRelation remoterel
#define NAPTIME_PER_CYCLE
Definition: worker.c:90
bool in_remote_transaction
Definition: worker.c:115
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:376
Definition: guc.h:75
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
Definition: worker.c:307
XLogRecPtr reply_lsn
static bool am_tablesync_worker(void)
static void apply_handle_delete(StringInfo s)
Definition: worker.c:778
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:170
#define DEBUG2
Definition: elog.h:24
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
char * c
#define walrcv_identify_system(conn, primary_tli, server_version)
Definition: walreceiver.h:249
void logicalrep_worker_attach(int slot)
Definition: launcher.c:612
#define NoLock
Definition: lockdefs.h:34
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:6700
static char * buf
Definition: pg_test_fsync.c:67
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:733
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
bool * tts_isnull
Definition: tuptable.h:126
void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:276
ResultRelInfo * es_result_relations
Definition: execnodes.h:442
#define RowExclusiveLock
Definition: lockdefs.h:38
#define SIGHUP
Definition: win32_port.h:163
#define RelationGetRelationName(relation)
Definition: rel.h:445
XLogRecPtr startpoint
Definition: walreceiver.h:146
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
unsigned int uint32
Definition: c.h:296
int pgsocket
Definition: port.h:31
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void apply_handle_begin(StringInfo s)
Definition: worker.c:433
RepOriginId replorigin_create(char *roname)
Definition: origin.c:242
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
Definition: postmaster.c:5611
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:223
TupleTableSlot * es_trig_tuple_slot
Definition: execnodes.h:461
#define ereport(elevel, rest)
Definition: elog.h:122
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2632
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1238
MemoryContext TopMemoryContext
Definition: mcxt.c:43
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:367
EState * CreateExecutorState(void)
Definition: execUtils.c:81
Definition: guc.h:72
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:623
MemoryContext ApplyContext
Definition: worker.c:108
static char ** options
XLogRecPtr final_lsn
Definition: logicalproto.h:67
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
static void apply_handle_commit(StringInfo s)
Definition: worker.c:452
union WalRcvStreamOptions::@97 proto
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
static void logicalrep_worker_sighup(SIGNAL_ARGS)
Definition: worker.c:1480
Node * build_column_default(Relation rel, int attrno)
#define SUBREL_STATE_SYNCDONE
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
List * es_tupleTable
Definition: execnodes.h:474
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:342
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1389
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
static void apply_handle_update(StringInfo s)
Definition: worker.c:657
uintptr_t Datum
Definition: postgres.h:372
void CommandCounterIncrement(void)
Definition: xact.c:915
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
#define PGINVALID_SOCKET
Definition: port.h:33
int es_num_result_relations
Definition: execnodes.h:443
void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2794
static void apply_handle_relation(StringInfo s)
Definition: worker.c:518
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
XLogRecPtr local_end
Definition: worker.c:95
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4526
static void apply_dispatch(StringInfo s)
Definition: worker.c:877
static void maybe_reread_subscription(void)
Definition: worker.c:1337
#define makeNode(_type_)
Definition: nodes.h:560
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
TimestampTz last_recv_time
static MemoryContext ApplyMessageContext
Definition: worker.c:107
#define SIGNAL_ARGS
Definition: c.h:1066
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:670
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
RepOriginId replorigin_session_origin
Definition: origin.c:155
static void apply_handle_origin(StringInfo s)
Definition: worker.c:496
void StartTransactionCommand(void)
Definition: xact.c:2673
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static int server_version
Definition: pg_dumpall.c:82
static void slot_store_error_callback(void *arg)
Definition: worker.c:282
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:211
bool IsTransactionState(void)
Definition: xact.c:351
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:259
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:92
int WalWriterDelay
Definition: walwriter.c:68
bool MySubscriptionValid
Definition: worker.c:113
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:472
void FreeSubscription(Subscription *sub)
RTEKind rtekind
Definition: parsenodes.h:951
static Datum values[MAXATTR]
Definition: bootstrap.c:164
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4546
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:733
static void slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values, bool *replaces)
Definition: worker.c:371
void * palloc(Size size)
Definition: mcxt.c:848
int errmsg(const char *fmt,...)
Definition: elog.c:797
void die(SIGNAL_ARGS)
Definition: postgres.c:2652
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:1015
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
int i
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define errcontext
Definition: elog.h:164
void * arg
struct Latch * MyLatch
Definition: globals.c:52
struct FlushPosition FlushPosition
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:113
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
HeapTuple tts_tuple
Definition: tuptable.h:120
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:680
#define elog
Definition: elog.h:219
XLogRecPtr commit_lsn
Definition: logicalproto.h:74
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1745
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1733
#define WL_LATCH_SET
Definition: latch.h:124
#define SUBREL_STATE_READY
#define RelationGetRelid(relation)
Definition: rel.h:425
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:224
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4670
void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TimestampTz committime
Definition: logicalproto.h:76
void logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Definition: relation.c:415
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:1494
uint32 LogicalRepRelId
Definition: logicalproto.h:39
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:488
TimestampTz reply_time
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:212
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:271
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5640
void pgstat_report_stat(bool force)
Definition: pgstat.c:812
static volatile sig_atomic_t got_SIGHUP
Definition: worker.c:125
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:243
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:444