PostgreSQL Source Code  git master
worker.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * worker.c
3  * PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2022, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  * This file contains the worker which applies logical changes as they come
12  * from remote logical replication stream.
13  *
14  * The main worker (apply) is started by logical replication worker
15  * launcher for every enabled subscription in a database. It uses
16  * walsender protocol to communicate with publisher.
17  *
18  * This module includes server facing code and shares libpqwalreceiver
19  * module with walreceiver for providing the libpq specific functionality.
20  *
21  *
22  * STREAMED TRANSACTIONS
23  * ---------------------
24  * Streamed transactions (large transactions exceeding a memory limit on the
25  * upstream) are not applied immediately, but instead, the data is written
26  * to temporary files and then applied at once when the final commit arrives.
27  *
28  * Unlike the regular (non-streamed) case, handling streamed transactions has
29  * to handle aborts of both the toplevel transaction and subtransactions. This
30  * is achieved by tracking offsets for subtransactions, which is then used
31  * to truncate the file with serialized changes.
32  *
33  * The files are placed in tmp file directory by default, and the filenames
34  * include both the XID of the toplevel transaction and OID of the
35  * subscription. This is necessary so that different workers processing a
36  * remote transaction with the same XID don't interfere with each other.
37  *
38  * We use BufFiles instead of using normal temporary files because (a) the
39  * BufFile infrastructure supports temporary files that exceed the OS file size
40  * limit, (b) provides a way for automatic clean up on the error and (c) provides
41  * a way to survive these files across local transactions and allow to open and
42  * close at stream start and stop. We decided to use FileSet
43  * infrastructure as without that it deletes the files on the closure of the
44  * file and if we decide to keep stream files open across the start/stop stream
45  * then it will consume a lot of memory (more than 8K for each BufFile and
46  * there could be multiple such BufFiles as the subscriber could receive
47  * multiple start/stop streams for different transactions before getting the
48  * commit). Moreover, if we don't use FileSet then we also need to invent
49  * a new way to pass filenames to BufFile APIs so that we are allowed to open
50  * the file we desired across multiple stream-open calls for the same
51  * transaction.
52  *
53  * TWO_PHASE TRANSACTIONS
54  * ----------------------
55  * Two phase transactions are replayed at prepare and then committed or
56  * rolled back at commit prepared and rollback prepared respectively. It is
57  * possible to have a prepared transaction that arrives at the apply worker
58  * when the tablesync is busy doing the initial copy. In this case, the apply
59  * worker skips all the prepared operations [e.g. inserts] while the tablesync
60  * is still busy (see the condition of should_apply_changes_for_rel). The
61  * tablesync worker might not get such a prepared transaction because say it
62  * was prior to the initial consistent point but might have got some later
63  * commits. Now, the tablesync worker will exit without doing anything for the
64  * prepared transaction skipped by the apply worker as the sync location for it
65  * will be already ahead of the apply worker's current location. This would lead
66  * to an "empty prepare", because later when the apply worker does the commit
67  * prepared, there is nothing in it (the inserts were skipped earlier).
68  *
69  * To avoid this, and similar prepare confusions the subscription's two_phase
70  * commit is enabled only after the initial sync is over. The two_phase option
71  * has been implemented as a tri-state with values DISABLED, PENDING, and
72  * ENABLED.
73  *
74  * Even if the user specifies they want a subscription with two_phase = on,
75  * internally it will start with a tri-state of PENDING which only becomes
76  * ENABLED after all tablesync initializations are completed - i.e. when all
77  * tablesync workers have reached their READY state. In other words, the value
78  * PENDING is only a temporary state for subscription start-up.
79  *
80  * Until the two_phase is properly available (ENABLED) the subscription will
81  * behave as if two_phase = off. When the apply worker detects that all
82  * tablesyncs have become READY (while the tri-state was PENDING) it will
83  * restart the apply worker process. This happens in
84  * process_syncing_tables_for_apply.
85  *
86  * When the (re-started) apply worker finds that all tablesyncs are READY for a
87  * two_phase tri-state of PENDING it starts streaming messages with the
88  * two_phase option which in turn enables the decoding of two-phase commits at
89  * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
90  * Now, it is possible that during the time we have not enabled two_phase, the
91  * publisher (replication server) would have skipped some prepares but we
92  * ensure that such prepares are sent along with commit prepare, see
93  * ReorderBufferFinishPrepared.
94  *
95  * If the subscription has no tables then a two_phase tri-state PENDING is
96  * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
97  * PUBLICATION which might otherwise be disallowed (see below).
98  *
99  * If ever a user needs to be aware of the tri-state value, they can fetch it
100  * from the pg_subscription catalog (see column subtwophasestate).
101  *
102  * We don't allow to toggle two_phase option of a subscription because it can
103  * lead to an inconsistent replica. Consider, initially, it was on and we have
104  * received some prepare then we turn it off, now at commit time the server
105  * will send the entire transaction data along with the commit. With some more
106  * analysis, we can allow changing this option from off to on but not sure if
107  * that alone would be useful.
108  *
109  * Finally, to avoid problems mentioned in previous paragraphs from any
110  * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
111  * to 'off' and then again back to 'on') there is a restriction for
112  * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
113  * the two_phase tri-state is ENABLED, except when copy_data = false.
114  *
115  * We can get prepare of the same GID more than once for the genuine cases
116  * where we have defined multiple subscriptions for publications on the same
117  * server and prepared transaction has operations on tables subscribed to those
118  * subscriptions. For such cases, if we use the GID sent by publisher one of
119  * the prepares will be successful and others will fail, in which case the
120  * server will send them again. Now, this can lead to a deadlock if user has
121  * set synchronous_standby_names for all the subscriptions on subscriber. To
122  * avoid such deadlocks, we generate a unique GID (consisting of the
123  * subscription oid and the xid of the prepared transaction) for each prepare
124  * transaction on the subscriber.
125  *-------------------------------------------------------------------------
126  */
127 
128 #include "postgres.h"
129 
130 #include <sys/stat.h>
131 #include <unistd.h>
132 
133 #include "access/table.h"
134 #include "access/tableam.h"
135 #include "access/twophase.h"
136 #include "access/xact.h"
137 #include "access/xlog_internal.h"
138 #include "catalog/catalog.h"
139 #include "catalog/namespace.h"
140 #include "catalog/partition.h"
141 #include "catalog/pg_inherits.h"
142 #include "catalog/pg_subscription.h"
144 #include "catalog/pg_tablespace.h"
145 #include "commands/tablecmds.h"
146 #include "commands/tablespace.h"
147 #include "commands/trigger.h"
148 #include "executor/executor.h"
149 #include "executor/execPartition.h"
151 #include "funcapi.h"
152 #include "libpq/pqformat.h"
153 #include "libpq/pqsignal.h"
154 #include "mb/pg_wchar.h"
155 #include "miscadmin.h"
156 #include "nodes/makefuncs.h"
157 #include "optimizer/optimizer.h"
158 #include "pgstat.h"
159 #include "postmaster/bgworker.h"
160 #include "postmaster/interrupt.h"
161 #include "postmaster/postmaster.h"
162 #include "postmaster/walwriter.h"
163 #include "replication/decode.h"
164 #include "replication/logical.h"
168 #include "replication/origin.h"
170 #include "replication/snapbuild.h"
171 #include "replication/walreceiver.h"
173 #include "rewrite/rewriteHandler.h"
174 #include "storage/buffile.h"
175 #include "storage/bufmgr.h"
176 #include "storage/fd.h"
177 #include "storage/ipc.h"
178 #include "storage/lmgr.h"
179 #include "storage/proc.h"
180 #include "storage/procarray.h"
181 #include "tcop/tcopprot.h"
182 #include "utils/acl.h"
183 #include "utils/builtins.h"
184 #include "utils/catcache.h"
185 #include "utils/dynahash.h"
186 #include "utils/datum.h"
187 #include "utils/fmgroids.h"
188 #include "utils/guc.h"
189 #include "utils/inval.h"
190 #include "utils/lsyscache.h"
191 #include "utils/memutils.h"
192 #include "utils/rel.h"
193 #include "utils/rls.h"
194 #include "utils/syscache.h"
195 #include "utils/timeout.h"
196 
197 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
198 
199 typedef struct FlushPosition
200 {
205 
207 
208 typedef struct ApplyExecutionData
209 {
210  EState *estate; /* executor state, used to track resources */
211 
212  LogicalRepRelMapEntry *targetRel; /* replication target rel */
213  ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
214 
215  /* These fields are used when the target relation is partitioned: */
216  ModifyTableState *mtstate; /* dummy ModifyTable state */
217  PartitionTupleRouting *proute; /* partition routing info */
219 
220 /* Struct for saving and restoring apply errcontext information */
221 typedef struct ApplyErrorCallbackArg
222 {
223  LogicalRepMsgType command; /* 0 if invalid */
225 
226  /* Remote node information */
227  int remote_attnum; /* -1 if invalid */
229  TimestampTz ts; /* commit, rollback, or prepare timestamp */
231 
233 {
234  .command = 0,
235  .rel = NULL,
236  .remote_attnum = -1,
237  .remote_xid = InvalidTransactionId,
238  .ts = 0,
239 };
240 
243 
244 /* per stream context for streaming transactions */
246 
248 
250 bool MySubscriptionValid = false;
251 
254 
255 /* fields valid only when processing streamed transaction */
256 static bool in_streamed_transaction = false;
257 
259 
260 /* BufFile handle of the current streaming file */
261 static BufFile *stream_fd = NULL;
262 
263 typedef struct SubXactInfo
264 {
265  TransactionId xid; /* XID of the subxact */
266  int fileno; /* file number in the buffile */
267  off_t offset; /* offset in the file */
269 
270 /* Sub-transaction data for the current streaming transaction */
271 typedef struct ApplySubXactData
272 {
273  uint32 nsubxacts; /* number of sub-transactions */
274  uint32 nsubxacts_max; /* current capacity of subxacts */
275  TransactionId subxact_last; /* xid of the last sub-transaction */
276  SubXactInfo *subxacts; /* sub-xact offset in changes file */
278 
280 
281 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
282 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
283 
284 /*
285  * Information about subtransactions of a given toplevel transaction.
286  */
287 static void subxact_info_write(Oid subid, TransactionId xid);
288 static void subxact_info_read(Oid subid, TransactionId xid);
289 static void subxact_info_add(TransactionId xid);
290 static inline void cleanup_subxact_info(void);
291 
292 /*
293  * Serialize and deserialize changes for a toplevel transaction.
294  */
295 static void stream_cleanup_files(Oid subid, TransactionId xid);
296 static void stream_open_file(Oid subid, TransactionId xid, bool first);
297 static void stream_write_change(char action, StringInfo s);
298 static void stream_close_file(void);
299 
300 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
301 
302 static void store_flush_position(XLogRecPtr remote_lsn);
303 
304 static void maybe_reread_subscription(void);
305 
306 /* prototype needed because of stream_commit */
307 static void apply_dispatch(StringInfo s);
308 
309 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
311  ResultRelInfo *relinfo,
312  TupleTableSlot *remoteslot);
314  ResultRelInfo *relinfo,
315  TupleTableSlot *remoteslot,
316  LogicalRepTupleData *newtup);
318  ResultRelInfo *relinfo,
319  TupleTableSlot *remoteslot);
320 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
321  LogicalRepRelation *remoterel,
322  TupleTableSlot *remoteslot,
323  TupleTableSlot **localslot);
325  TupleTableSlot *remoteslot,
326  LogicalRepTupleData *newtup,
327  CmdType operation);
328 
329 /* Compute GID for two_phase transactions */
330 static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
331 
332 /* Common streaming function to apply all the spooled messages */
333 static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
334 
335 /* Functions for apply error callback */
336 static void apply_error_callback(void *arg);
337 static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
338 static inline void reset_apply_error_context_info(void);
339 
340 /*
341  * Should this worker apply changes for given relation.
342  *
343  * This is mainly needed for initial relation data sync as that runs in
344  * separate worker process running in parallel and we need some way to skip
345  * changes coming to the main apply worker during the sync of a table.
346  *
347  * Note we need to do a less-than-or-equal comparison for SYNCDONE state because
348  * it might hold position of end of initial slot consistent point WAL
349  * record + 1 (ie start of next record) and next record can be COMMIT of
350  * transaction we are now processing (which is what we set remote_final_lsn
351  * to in apply_handle_begin).
352  */
353 static bool
355 {
356  if (am_tablesync_worker())
357  return MyLogicalRepWorker->relid == rel->localreloid;
358  else
359  return (rel->state == SUBREL_STATE_READY ||
360  (rel->state == SUBREL_STATE_SYNCDONE &&
361  rel->statelsn <= remote_final_lsn));
362 }
363 
364 /*
365  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
366  *
367  * Start a transaction, if this is the first step (else we keep using the
368  * existing transaction).
369  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
370  */
371 static void
373 {
375 
376  if (!IsTransactionState())
377  {
380  }
381 
383 
385 }
386 
387 /*
388  * Finish up one step of a replication transaction.
389  * Callers of begin_replication_step() must also call this.
390  *
391  * We don't close out the transaction here, but we should increment
392  * the command counter to make the effects of this step visible.
393  */
394 static void
396 {
398 
400 }
401 
402 /*
403  * Handle streamed transactions.
404  *
405  * If in streaming mode (receiving a block of streamed transaction), we
406  * simply redirect it to a file for the proper toplevel transaction.
407  *
408  * Returns true for streamed transactions, false otherwise (regular mode).
409  */
410 static bool
412 {
413  TransactionId xid;
414 
415  /* not in streaming mode */
417  return false;
418 
419  Assert(stream_fd != NULL);
421 
422  /*
423  * We should have received XID of the subxact as the first part of the
424  * message, so extract it.
425  */
426  xid = pq_getmsgint(s, 4);
427 
428  if (!TransactionIdIsValid(xid))
429  ereport(ERROR,
430  (errcode(ERRCODE_PROTOCOL_VIOLATION),
431  errmsg_internal("invalid transaction ID in streamed replication transaction")));
432 
433  /* Add the new subxact to the array (unless already there). */
434  subxact_info_add(xid);
435 
436  /* write the change to the current file */
438 
439  return true;
440 }
441 
442 /*
443  * Executor state preparation for evaluation of constraint expressions,
444  * indexes and triggers for the specified relation.
445  *
446  * Note that the caller must open and close any indexes to be updated.
447  */
448 static ApplyExecutionData *
450 {
451  ApplyExecutionData *edata;
452  EState *estate;
453  RangeTblEntry *rte;
454  ResultRelInfo *resultRelInfo;
455 
456  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
457  edata->targetRel = rel;
458 
459  edata->estate = estate = CreateExecutorState();
460 
461  rte = makeNode(RangeTblEntry);
462  rte->rtekind = RTE_RELATION;
463  rte->relid = RelationGetRelid(rel->localrel);
464  rte->relkind = rel->localrel->rd_rel->relkind;
466  ExecInitRangeTable(estate, list_make1(rte));
467 
468  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
469 
470  /*
471  * Use Relation opened by logicalrep_rel_open() instead of opening it
472  * again.
473  */
474  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
475 
476  /*
477  * We put the ResultRelInfo in the es_opened_result_relations list, even
478  * though we don't populate the es_result_relations array. That's a bit
479  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
480  *
481  * ExecOpenIndices() is not called here either, each execution path doing
482  * an apply operation being responsible for that.
483  */
485  lappend(estate->es_opened_result_relations, resultRelInfo);
486 
487  estate->es_output_cid = GetCurrentCommandId(true);
488 
489  /* Prepare to catch AFTER triggers. */
491 
492  /* other fields of edata remain NULL for now */
493 
494  return edata;
495 }
496 
497 /*
498  * Finish any operations related to the executor state created by
499  * create_edata_for_relation().
500  */
501 static void
503 {
504  EState *estate = edata->estate;
505 
506  /* Handle any queued AFTER triggers. */
507  AfterTriggerEndQuery(estate);
508 
509  /* Shut down tuple routing, if any was done. */
510  if (edata->proute)
511  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
512 
513  /*
514  * Cleanup. It might seem that we should call ExecCloseResultRelations()
515  * here, but we intentionally don't. It would close the rel we added to
516  * es_opened_result_relations above, which is wrong because we took no
517  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
518  * any other relations opened during execution.
519  */
520  ExecResetTupleTable(estate->es_tupleTable, false);
521  FreeExecutorState(estate);
522  pfree(edata);
523 }
524 
525 /*
526  * Executes default values for columns for which we can't map to remote
527  * relation columns.
528  *
529  * This allows us to support tables which have more columns on the downstream
530  * than on the upstream.
531  */
532 static void
534  TupleTableSlot *slot)
535 {
536  TupleDesc desc = RelationGetDescr(rel->localrel);
537  int num_phys_attrs = desc->natts;
538  int i;
539  int attnum,
540  num_defaults = 0;
541  int *defmap;
542  ExprState **defexprs;
543  ExprContext *econtext;
544 
545  econtext = GetPerTupleExprContext(estate);
546 
547  /* We got all the data via replication, no need to evaluate anything. */
548  if (num_phys_attrs == rel->remoterel.natts)
549  return;
550 
551  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
552  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
553 
554  Assert(rel->attrmap->maplen == num_phys_attrs);
555  for (attnum = 0; attnum < num_phys_attrs; attnum++)
556  {
557  Expr *defexpr;
558 
559  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
560  continue;
561 
562  if (rel->attrmap->attnums[attnum] >= 0)
563  continue;
564 
565  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
566 
567  if (defexpr != NULL)
568  {
569  /* Run the expression through planner */
570  defexpr = expression_planner(defexpr);
571 
572  /* Initialize executable expression for later evaluation */
573  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
574  defmap[num_defaults] = attnum;
575  num_defaults++;
576  }
577 
578  }
579 
580  for (i = 0; i < num_defaults; i++)
581  slot->tts_values[defmap[i]] =
582  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
583 }
584 
585 /*
586  * Store tuple data into slot.
587  *
588  * Incoming data can be either text or binary format.
589  */
590 static void
592  LogicalRepTupleData *tupleData)
593 {
594  int natts = slot->tts_tupleDescriptor->natts;
595  int i;
596 
597  ExecClearTuple(slot);
598 
599  /* Call the "in" function for each non-dropped, non-null attribute */
600  Assert(natts == rel->attrmap->maplen);
601  for (i = 0; i < natts; i++)
602  {
604  int remoteattnum = rel->attrmap->attnums[i];
605 
606  if (!att->attisdropped && remoteattnum >= 0)
607  {
608  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
609 
610  Assert(remoteattnum < tupleData->ncols);
611 
612  /* Set attnum for error callback */
614 
615  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
616  {
617  Oid typinput;
618  Oid typioparam;
619 
620  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
621  slot->tts_values[i] =
622  OidInputFunctionCall(typinput, colvalue->data,
623  typioparam, att->atttypmod);
624  slot->tts_isnull[i] = false;
625  }
626  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
627  {
628  Oid typreceive;
629  Oid typioparam;
630 
631  /*
632  * In some code paths we may be asked to re-parse the same
633  * tuple data. Reset the StringInfo's cursor so that works.
634  */
635  colvalue->cursor = 0;
636 
637  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
638  slot->tts_values[i] =
639  OidReceiveFunctionCall(typreceive, colvalue,
640  typioparam, att->atttypmod);
641 
642  /* Trouble if it didn't eat the whole buffer */
643  if (colvalue->cursor != colvalue->len)
644  ereport(ERROR,
645  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
646  errmsg("incorrect binary data format in logical replication column %d",
647  remoteattnum + 1)));
648  slot->tts_isnull[i] = false;
649  }
650  else
651  {
652  /*
653  * NULL value from remote. (We don't expect to see
654  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
655  * NULL.)
656  */
657  slot->tts_values[i] = (Datum) 0;
658  slot->tts_isnull[i] = true;
659  }
660 
661  /* Reset attnum for error callback */
663  }
664  else
665  {
666  /*
667  * We assign NULL to dropped attributes and missing values
668  * (missing values should be later filled using
669  * slot_fill_defaults).
670  */
671  slot->tts_values[i] = (Datum) 0;
672  slot->tts_isnull[i] = true;
673  }
674  }
675 
676  ExecStoreVirtualTuple(slot);
677 }
678 
679 /*
680  * Replace updated columns with data from the LogicalRepTupleData struct.
681  * This is somewhat similar to heap_modify_tuple but also calls the type
682  * input functions on the user data.
683  *
684  * "slot" is filled with a copy of the tuple in "srcslot", replacing
685  * columns provided in "tupleData" and leaving others as-is.
686  *
687  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
688  * storage for "srcslot". This is OK for current usage, but someday we may
689  * need to materialize "slot" at the end to make it independent of "srcslot".
690  */
691 static void
694  LogicalRepTupleData *tupleData)
695 {
696  int natts = slot->tts_tupleDescriptor->natts;
697  int i;
698 
699  /* We'll fill "slot" with a virtual tuple, so we must start with an empty slot */
700  ExecClearTuple(slot);
701 
702  /*
703  * Copy all the column data from srcslot, so that we'll have valid values
704  * for unreplaced columns.
705  */
706  Assert(natts == srcslot->tts_tupleDescriptor->natts);
707  slot_getallattrs(srcslot);
708  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
709  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
710 
711  /* Call the "in" function for each replaced attribute */
712  Assert(natts == rel->attrmap->maplen);
713  for (i = 0; i < natts; i++)
714  {
716  int remoteattnum = rel->attrmap->attnums[i];
717 
718  if (remoteattnum < 0)
719  continue;
720 
721  Assert(remoteattnum < tupleData->ncols);
722 
723  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
724  {
725  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
726 
727  /* Set attnum for error callback */
729 
730  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
731  {
732  Oid typinput;
733  Oid typioparam;
734 
735  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
736  slot->tts_values[i] =
737  OidInputFunctionCall(typinput, colvalue->data,
738  typioparam, att->atttypmod);
739  slot->tts_isnull[i] = false;
740  }
741  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
742  {
743  Oid typreceive;
744  Oid typioparam;
745 
746  /*
747  * In some code paths we may be asked to re-parse the same
748  * tuple data. Reset the StringInfo's cursor so that works.
749  */
750  colvalue->cursor = 0;
751 
752  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
753  slot->tts_values[i] =
754  OidReceiveFunctionCall(typreceive, colvalue,
755  typioparam, att->atttypmod);
756 
757  /* Trouble if it didn't eat the whole buffer */
758  if (colvalue->cursor != colvalue->len)
759  ereport(ERROR,
760  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
761  errmsg("incorrect binary data format in logical replication column %d",
762  remoteattnum + 1)));
763  slot->tts_isnull[i] = false;
764  }
765  else
766  {
767  /* must be LOGICALREP_COLUMN_NULL */
768  slot->tts_values[i] = (Datum) 0;
769  slot->tts_isnull[i] = true;
770  }
771 
772  /* Reset attnum for error callback */
774  }
775  }
776 
777  /* And finally, declare that "slot" contains a valid virtual tuple */
778  ExecStoreVirtualTuple(slot);
779 }
780 
781 /*
782  * Handle BEGIN message.
783  */
784 static void
786 {
787  LogicalRepBeginData begin_data;
788 
789  logicalrep_read_begin(s, &begin_data);
790  set_apply_error_context_xact(begin_data.xid, begin_data.committime);
791 
792  remote_final_lsn = begin_data.final_lsn;
793 
794  in_remote_transaction = true;
795 
797 }
798 
799 /*
800  * Handle COMMIT message.
801  *
802  * TODO, support tracking of multiple origins
803  */
804 static void
806 {
807  LogicalRepCommitData commit_data;
808 
809  logicalrep_read_commit(s, &commit_data);
810 
811  if (commit_data.commit_lsn != remote_final_lsn)
812  ereport(ERROR,
813  (errcode(ERRCODE_PROTOCOL_VIOLATION),
814  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
815  LSN_FORMAT_ARGS(commit_data.commit_lsn),
817 
818  apply_handle_commit_internal(&commit_data);
819 
820  /* Process any tables that are being synchronized in parallel. */
821  process_syncing_tables(commit_data.end_lsn);
822 
825 }
826 
827 /*
828  * Handle BEGIN PREPARE message.
829  */
830 static void
832 {
833  LogicalRepPreparedTxnData begin_data;
834 
835  /* Tablesync should never receive prepare. */
836  if (am_tablesync_worker())
837  ereport(ERROR,
838  (errcode(ERRCODE_PROTOCOL_VIOLATION),
839  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
840 
841  logicalrep_read_begin_prepare(s, &begin_data);
842  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
843 
844  remote_final_lsn = begin_data.prepare_lsn;
845 
846  in_remote_transaction = true;
847 
849 }
850 
851 /*
852  * Common function to prepare the GID.
853  */
854 static void
856 {
857  char gid[GIDSIZE];
858 
859  /*
860  * Compute unique GID for two_phase transactions. We don't use GID of
861  * prepared transaction sent by server as that can lead to deadlock when
862  * we have multiple subscriptions from the same node pointing to publications on
863  * the same node. See comments atop worker.c
864  */
866  gid, sizeof(gid));
867 
868  /*
869  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
870  * called within the PrepareTransactionBlock below.
871  */
873  CommitTransactionCommand(); /* Completes the preceding Begin command. */
874 
875  /*
876  * Update origin state so we can restart streaming from correct position
877  * in case of crash.
878  */
879  replorigin_session_origin_lsn = prepare_data->end_lsn;
881 
883 }
884 
885 /*
886  * Handle PREPARE message.
887  */
888 static void
890 {
891  LogicalRepPreparedTxnData prepare_data;
892 
893  logicalrep_read_prepare(s, &prepare_data);
894 
895  if (prepare_data.prepare_lsn != remote_final_lsn)
896  ereport(ERROR,
897  (errcode(ERRCODE_PROTOCOL_VIOLATION),
898  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
899  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
901 
902  /*
903  * Unlike commit, here, we always prepare the transaction even though no
904  * change has happened in this transaction. It is done this way because at
905  * commit prepared time, we won't know whether we have skipped preparing a
906  * transaction because of no change.
907  *
908  * XXX, We can optimize such that at commit prepared time, we first check
909  * whether we have prepared the transaction or not but that doesn't seem
910  * worthwhile because such cases shouldn't be common.
911  */
913 
914  apply_handle_prepare_internal(&prepare_data);
915 
918  pgstat_report_stat(false);
919 
920  store_flush_position(prepare_data.end_lsn);
921 
922  in_remote_transaction = false;
923 
924  /* Process any tables that are being synchronized in parallel. */
925  process_syncing_tables(prepare_data.end_lsn);
926 
929 }
930 
931 /*
932  * Handle a COMMIT PREPARED of a previously PREPARED transaction.
933  */
934 static void
936 {
937  LogicalRepCommitPreparedTxnData prepare_data;
938  char gid[GIDSIZE];
939 
940  logicalrep_read_commit_prepared(s, &prepare_data);
941  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
942 
943  /* Compute GID for two_phase transactions. */
945  gid, sizeof(gid));
946 
947  /* There is no transaction when COMMIT PREPARED is called */
949 
950  /*
951  * Update origin state so we can restart streaming from correct position
952  * in case of crash.
953  */
954  replorigin_session_origin_lsn = prepare_data.end_lsn;
956 
957  FinishPreparedTransaction(gid, true);
960  pgstat_report_stat(false);
961 
962  store_flush_position(prepare_data.end_lsn);
963  in_remote_transaction = false;
964 
965  /* Process any tables that are being synchronized in parallel. */
966  process_syncing_tables(prepare_data.end_lsn);
967 
970 }
971 
972 /*
973  * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
974  */
975 static void
977 {
978  LogicalRepRollbackPreparedTxnData rollback_data;
979  char gid[GIDSIZE];
980 
981  logicalrep_read_rollback_prepared(s, &rollback_data);
982  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
983 
984  /* Compute GID for two_phase transactions. */
986  gid, sizeof(gid));
987 
988  /*
989  * It is possible that we haven't received prepare because it occurred
990  * before walsender reached a consistent point or the two_phase was still
991  * not enabled by that time, so in such cases, we need to skip rollback
992  * prepared.
993  */
994  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
995  rollback_data.prepare_time))
996  {
997  /*
998  * Update origin state so we can restart streaming from correct
999  * position in case of crash.
1000  */
1003 
1004  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1006  FinishPreparedTransaction(gid, false);
1009  }
1010 
1011  pgstat_report_stat(false);
1012 
1013  store_flush_position(rollback_data.rollback_end_lsn);
1014  in_remote_transaction = false;
1015 
1016  /* Process any tables that are being synchronized in parallel. */
1018 
1021 }
1022 
1023 /*
1024  * Handle STREAM PREPARE.
1025  *
1026  * Logic is in two parts:
1027  * 1. Replay all the spooled operations
1028  * 2. Mark the transaction as prepared
1029  */
1030 static void
1032 {
1033  LogicalRepPreparedTxnData prepare_data;
1034 
1036  ereport(ERROR,
1037  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1038  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1039 
1040  /* Tablesync should never receive prepare. */
1041  if (am_tablesync_worker())
1042  ereport(ERROR,
1043  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1044  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1045 
1046  logicalrep_read_stream_prepare(s, &prepare_data);
1047  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
1048 
1049  elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
1050 
1051  /* Replay all the spooled operations. */
1052  apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
1053 
1054  /* Mark the transaction as prepared. */
1055  apply_handle_prepare_internal(&prepare_data);
1056 
1058 
1059  pgstat_report_stat(false);
1060 
1061  store_flush_position(prepare_data.end_lsn);
1062 
1063  in_remote_transaction = false;
1064 
1065  /* unlink the files with serialized changes and subxact info. */
1067 
1068  /* Process any tables that are being synchronized in parallel. */
1069  process_syncing_tables(prepare_data.end_lsn);
1070 
1072 
1074 }
1075 
1076 /*
1077  * Handle ORIGIN message.
1078  *
1079  * TODO, support tracking of multiple origins
1080  */
1081 static void
1083 {
1084  /*
1085  * ORIGIN message can only come inside streaming transaction or inside
1086  * remote transaction and before any actual writes.
1087  */
1088  if (!in_streamed_transaction &&
1091  ereport(ERROR,
1092  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1093  errmsg_internal("ORIGIN message sent out of order")));
1094 }
1095 
1096 /*
1097  * Handle STREAM START message.
1098  */
1099 static void
1101 {
1102  bool first_segment;
1103 
1105  ereport(ERROR,
1106  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1107  errmsg_internal("duplicate STREAM START message")));
1108 
1109  /*
1110  * Start a transaction on stream start. This transaction will be committed
1111  * on stream stop, unless this is a tablesync worker, in which case it
1112  * will be committed after processing all the messages. We need the
1113  * transaction for handling the BufFile used for serializing the
1114  * streaming data and subxact info.
1115  */
1117 
1118  /* notify handler methods that we're processing a streamed transaction */
1119  in_streamed_transaction = true;
1120 
1121  /* extract XID of the top-level transaction */
1122  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1123 
1125  ereport(ERROR,
1126  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1127  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1128 
1130 
1131  /*
1132  * Initialize the worker's stream_fileset if we haven't yet. This will be
1133  * used for the entire duration of the worker so create it in a permanent
1134  * context. We create this on the very first streaming message from any
1135  * transaction and then use it for this and other streaming transactions.
1136  * Now, we could create a fileset at the start of the worker as well but
1137  * then we won't be sure that it will ever be used.
1138  */
1139  if (MyLogicalRepWorker->stream_fileset == NULL)
1140  {
1141  MemoryContext oldctx;
1142 
1144 
1147 
1148  MemoryContextSwitchTo(oldctx);
1149  }
1150 
1151  /* open the spool file for this transaction */
1153 
1154  /* if this is not the first segment, open existing subxact file */
1155  if (!first_segment)
1157 
1159 
1161 }
1162 
1163 /*
1164  * Handle STREAM STOP message.
1165  */
1166 static void
1168 {
1170  ereport(ERROR,
1171  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1172  errmsg_internal("STREAM STOP message without STREAM START")));
1173 
1174  /*
1175  * Close the file with serialized changes, and serialize information about
1176  * subxacts for the toplevel transaction.
1177  */
1180 
1181  /* We must be in a valid transaction state */
1183 
1184  /* Commit the per-stream transaction */
1186 
1187  in_streamed_transaction = false;
1188 
1189  /* Reset per-stream context */
1191 
1194 }
1195 
1196 /*
1197  * Handle STREAM abort message.
1198  */
1199 static void
1201 {
1202  TransactionId xid;
1203  TransactionId subxid;
1204 
1206  ereport(ERROR,
1207  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1208  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1209 
1210  logicalrep_read_stream_abort(s, &xid, &subxid);
1211 
1212  /*
1213  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1214  * just delete the files with serialized info.
1215  */
1216  if (xid == subxid)
1217  {
1220  }
1221  else
1222  {
1223  /*
1224  * OK, so it's a subxact. We need to read the subxact file for the
1225  * toplevel transaction, determine the offset tracked for the subxact,
1226  * and truncate the file with changes. We also remove the subxacts
1227  * with higher offsets (or rather higher XIDs).
1228  *
1229  * We intentionally scan the array from the tail, because we're likely
1230  * aborting a change for the most recent subtransactions.
1231  *
1232  * We can't use a binary search here, as subxact XIDs won't
1233  * necessarily arrive in sorted order; consider the case where we have
1234  * released the savepoints for multiple subtransactions and then
1235  * performed a rollback to savepoint for one of the earlier
1236  * subtransactions.
1237  */
1238  int64 i;
1239  int64 subidx;
1240  BufFile *fd;
1241  bool found = false;
1242  char path[MAXPGPATH];
1243 
1244  set_apply_error_context_xact(subxid, 0);
1245 
1246  subidx = -1;
1249 
1250  for (i = subxact_data.nsubxacts; i > 0; i--)
1251  {
1252  if (subxact_data.subxacts[i - 1].xid == subxid)
1253  {
1254  subidx = (i - 1);
1255  found = true;
1256  break;
1257  }
1258  }
1259 
1260  /*
1261  * If it's an empty subtransaction, then we will not find the subxid
1262  * here, so just clean up the subxact info and return.
1263  */
1264  if (!found)
1265  {
1266  /* Cleanup the subxact info */
1271  return;
1272  }
1273 
1274  /* open the changes file */
1277  O_RDWR, false);
1278 
1279  /* OK, truncate the file at the right offset */
1281  subxact_data.subxacts[subidx].offset);
1282  BufFileClose(fd);
1283 
1284  /* discard the subxacts added later */
1285  subxact_data.nsubxacts = subidx;
1286 
1287  /* write the updated subxact list */
1289 
1292  }
1293 
1295 }
1296 
1297 /*
1298  * Common spoolfile processing.
1299  */
1300 static void
1302 {
1304  int nchanges;
1305  char path[MAXPGPATH];
1306  char *buffer = NULL;
1307  MemoryContext oldcxt;
1308  BufFile *fd;
1309 
1310  /* Make sure we have an open transaction */
1312 
1313  /*
1314  * Allocate file handle and memory required to process all the messages in
1315  * TopTransactionContext to avoid them getting reset after each message is
1316  * processed.
1317  */
1319 
1320  /* Open the spool file for the committed/prepared transaction */
1322  elog(DEBUG1, "replaying changes from file \"%s\"", path);
1323 
1325  false);
1326 
1327  buffer = palloc(BLCKSZ);
1328  initStringInfo(&s2);
1329 
1330  MemoryContextSwitchTo(oldcxt);
1331 
1332  remote_final_lsn = lsn;
1333 
1334  /*
1335  * Make sure the apply_dispatch handler methods are aware that we're in a
1336  * remote transaction.
1337  */
1338  in_remote_transaction = true;
1340 
1342 
1343  /*
1344  * Read the entries one by one and pass them through the same logic as in
1345  * apply_dispatch.
1346  */
1347  nchanges = 0;
1348  while (true)
1349  {
1350  int nbytes;
1351  int len;
1352 
1354 
1355  /* read length of the on-disk record */
1356  nbytes = BufFileRead(fd, &len, sizeof(len));
1357 
1358  /* have we reached end of the file? */
1359  if (nbytes == 0)
1360  break;
1361 
1362  /* do we have a correct length? */
1363  if (nbytes != sizeof(len))
1364  ereport(ERROR,
1366  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1367  path)));
1368 
1369  if (len <= 0)
1370  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
1371  len, path);
1372 
1373  /* make sure we have sufficiently large buffer */
1374  buffer = repalloc(buffer, len);
1375 
1376  /* and finally read the data into the buffer */
1377  if (BufFileRead(fd, buffer, len) != len)
1378  ereport(ERROR,
1380  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1381  path)));
1382 
1383  /* copy the buffer to the stringinfo and call apply_dispatch */
1384  resetStringInfo(&s2);
1385  appendBinaryStringInfo(&s2, buffer, len);
1386 
1387  /* Ensure we are reading the data into our memory context. */
1389 
1390  apply_dispatch(&s2);
1391 
1393 
1394  MemoryContextSwitchTo(oldcxt);
1395 
1396  nchanges++;
1397 
1398  if (nchanges % 1000 == 0)
1399  elog(DEBUG1, "replayed %d changes from file \"%s\"",
1400  nchanges, path);
1401  }
1402 
1403  BufFileClose(fd);
1404 
1405  pfree(buffer);
1406  pfree(s2.data);
1407 
1408  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
1409  nchanges, path);
1410 
1411  return;
1412 }
1413 
1414 /*
1415  * Handle STREAM COMMIT message.
1416  */
1417 static void
1419 {
1420  TransactionId xid;
1421  LogicalRepCommitData commit_data;
1422 
1424  ereport(ERROR,
1425  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1426  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1427 
1428  xid = logicalrep_read_stream_commit(s, &commit_data);
1429  set_apply_error_context_xact(xid, commit_data.committime);
1430 
1431  elog(DEBUG1, "received commit for streamed transaction %u", xid);
1432 
1433  apply_spooled_messages(xid, commit_data.commit_lsn);
1434 
1435  apply_handle_commit_internal(&commit_data);
1436 
1437  /* unlink the files with serialized changes and subxact info */
1439 
1440  /* Process any tables that are being synchronized in parallel. */
1441  process_syncing_tables(commit_data.end_lsn);
1442 
1444 
1446 }
1447 
1448 /*
1449  * Helper function for apply_handle_commit and apply_handle_stream_commit.
1450  */
1451 static void
1453 {
1454  if (IsTransactionState())
1455  {
1456  /*
1457  * Update origin state so we can restart streaming from correct
1458  * position in case of crash.
1459  */
1460  replorigin_session_origin_lsn = commit_data->end_lsn;
1462 
1464  pgstat_report_stat(false);
1465 
1466  store_flush_position(commit_data->end_lsn);
1467  }
1468  else
1469  {
1470  /* Process any invalidation messages that might have accumulated. */
1473  }
1474 
1475  in_remote_transaction = false;
1476 }
1477 
1478 /*
1479  * Handle RELATION message.
1480  *
1481  * Note that we don't validate against the local schema here. That validation
1482  * is postponed until the first change for the given relation arrives, since
1483  * we only care about it when applying changes to that relation anyway, and
1484  * we do less locking this way.
1485  */
1486 static void
1488 {
1489  LogicalRepRelation *rel;
1490 
1492  return;
1493 
1494  rel = logicalrep_read_rel(s);
1496 }
1497 
1498 /*
1499  * Handle TYPE message.
1500  *
1501  * This implementation pays no attention to TYPE messages; we expect the user
1502  * to have set things up so that the incoming data is acceptable to the input
1503  * functions for the locally subscribed tables. Hence, we just read and
1504  * discard the message.
1505  */
1506 static void
1508 {
1509  LogicalRepTyp typ;
1510 
1512  return;
1513 
1514  logicalrep_read_typ(s, &typ);
1515 }
1516 
1517 /*
1518  * Get the replica identity index or, if none is defined, the primary key index.
1519  *
1520  * If neither is defined, returns InvalidOid.
1521  */
1522 static Oid
1524 {
1525  Oid idxoid;
1526 
1527  idxoid = RelationGetReplicaIndex(rel);
1528 
1529  if (!OidIsValid(idxoid))
1530  idxoid = RelationGetPrimaryKeyIndex(rel);
1531 
1532  return idxoid;
1533 }
1534 
1535 /*
1536  * Check that we (the subscription owner) have sufficient privileges on the
1537  * target relation to perform the given operation.
1538  */
1539 static void
1541 {
1542  Oid relid;
1543  AclResult aclresult;
1544 
1545  relid = RelationGetRelid(rel);
1546  aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
1547  if (aclresult != ACLCHECK_OK)
1548  aclcheck_error(aclresult,
1549  get_relkind_objtype(rel->rd_rel->relkind),
1550  get_rel_name(relid));
1551 
1552  /*
1553  * We lack the infrastructure to honor RLS policies. It might be possible
1554  * to add such infrastructure here, but tablesync workers lack it, too, so
1555  * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
1556  * but it seems dangerous to replicate a TRUNCATE and then refuse to
1557  * replicate subsequent INSERTs, so we forbid all commands the same.
1558  */
1559  if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
1560  ereport(ERROR,
1561  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1562  errmsg("\"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1563  GetUserNameFromId(GetUserId(), true),
1564  RelationGetRelationName(rel))));
1565 }
1566 
1567 /*
1568  * Handle INSERT message.
1569  */
1570 
1571 static void
1573 {
1574  LogicalRepRelMapEntry *rel;
1575  LogicalRepTupleData newtup;
1576  LogicalRepRelId relid;
1577  ApplyExecutionData *edata;
1578  EState *estate;
1579  TupleTableSlot *remoteslot;
1580  MemoryContext oldctx;
1581 
1583  return;
1584 
1586 
1587  relid = logicalrep_read_insert(s, &newtup);
1588  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1589  if (!should_apply_changes_for_rel(rel))
1590  {
1591  /*
1592  * The relation can't become interesting in the middle of the
1593  * transaction so it's safe to unlock it.
1594  */
1597  return;
1598  }
1599 
1600  /* Set relation for error callback */
1602 
1603  /* Initialize the executor state. */
1604  edata = create_edata_for_relation(rel);
1605  estate = edata->estate;
1606  remoteslot = ExecInitExtraTupleSlot(estate,
1607  RelationGetDescr(rel->localrel),
1608  &TTSOpsVirtual);
1609 
1610  /* Process and store remote tuple in the slot */
1612  slot_store_data(remoteslot, rel, &newtup);
1613  slot_fill_defaults(rel, estate, remoteslot);
1614  MemoryContextSwitchTo(oldctx);
1615 
1616  /* For a partitioned table, insert the tuple into a partition. */
1617  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1619  remoteslot, NULL, CMD_INSERT);
1620  else
1622  remoteslot);
1623 
1624  finish_edata(edata);
1625 
1626  /* Reset relation for error callback */
1628 
1630 
1632 }
1633 
1634 /*
1635  * Workhorse for apply_handle_insert()
1636  * relinfo is for the relation we're actually inserting into
1637  * (could be a child partition of edata->targetRelInfo)
1638  */
1639 static void
1641  ResultRelInfo *relinfo,
1642  TupleTableSlot *remoteslot)
1643 {
1644  EState *estate = edata->estate;
1645 
1646  /* We must open indexes here. */
1647  ExecOpenIndices(relinfo, false);
1648 
1649  /* Do the insert. */
1651  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1652 
1653  /* Cleanup. */
1654  ExecCloseIndices(relinfo);
1655 }
1656 
1657 /*
1658  * Check if the logical replication relation is updatable and throw
1659  * appropriate error if it isn't.
1660  */
1661 static void
1663 {
1664  /* Updatable, no error. */
1665  if (rel->updatable)
1666  return;
1667 
1668  /*
1669  * We are in error mode, so it's fine that this is somewhat slow. It's better
1670  * to give the user the correct error.
1671  */
1673  {
1674  ereport(ERROR,
1675  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1676  errmsg("publisher did not send replica identity column "
1677  "expected by the logical replication target relation \"%s.%s\"",
1678  rel->remoterel.nspname, rel->remoterel.relname)));
1679  }
1680 
1681  ereport(ERROR,
1682  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1683  errmsg("logical replication target relation \"%s.%s\" has "
1684  "neither REPLICA IDENTITY index nor PRIMARY "
1685  "KEY and published relation does not have "
1686  "REPLICA IDENTITY FULL",
1687  rel->remoterel.nspname, rel->remoterel.relname)));
1688 }
1689 
1690 /*
1691  * Handle UPDATE message.
1692  *
1693  * TODO: FDW support
1694  */
1695 static void
1697 {
1698  LogicalRepRelMapEntry *rel;
1699  LogicalRepRelId relid;
1700  ApplyExecutionData *edata;
1701  EState *estate;
1702  LogicalRepTupleData oldtup;
1703  LogicalRepTupleData newtup;
1704  bool has_oldtup;
1705  TupleTableSlot *remoteslot;
1706  RangeTblEntry *target_rte;
1707  MemoryContext oldctx;
1708 
1710  return;
1711 
1713 
1714  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1715  &newtup);
1716  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1717  if (!should_apply_changes_for_rel(rel))
1718  {
1719  /*
1720  * The relation can't become interesting in the middle of the
1721  * transaction so it's safe to unlock it.
1722  */
1725  return;
1726  }
1727 
1728  /* Set relation for error callback */
1730 
1731  /* Check if we can do the update. */
1733 
1734  /* Initialize the executor state. */
1735  edata = create_edata_for_relation(rel);
1736  estate = edata->estate;
1737  remoteslot = ExecInitExtraTupleSlot(estate,
1738  RelationGetDescr(rel->localrel),
1739  &TTSOpsVirtual);
1740 
1741  /*
1742  * Populate updatedCols so that per-column triggers can fire, and so
1743  * executor can correctly pass down indexUnchanged hint. This could
1744  * include more columns than were actually changed on the publisher
1745  * because the logical replication protocol doesn't contain that
1746  * information. But it would for example exclude columns that only exist
1747  * on the subscriber, since we are not touching those.
1748  */
1749  target_rte = list_nth(estate->es_range_table, 0);
1750  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1751  {
1753  int remoteattnum = rel->attrmap->attnums[i];
1754 
1755  if (!att->attisdropped && remoteattnum >= 0)
1756  {
1757  Assert(remoteattnum < newtup.ncols);
1758  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1759  target_rte->updatedCols =
1760  bms_add_member(target_rte->updatedCols,
1762  }
1763  }
1764 
1765  /* Also populate extraUpdatedCols, in case we have generated columns */
1766  fill_extraUpdatedCols(target_rte, rel->localrel);
1767 
1768  /* Build the search tuple. */
1770  slot_store_data(remoteslot, rel,
1771  has_oldtup ? &oldtup : &newtup);
1772  MemoryContextSwitchTo(oldctx);
1773 
1774  /* For a partitioned table, apply update to correct partition. */
1775  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1777  remoteslot, &newtup, CMD_UPDATE);
1778  else
1780  remoteslot, &newtup);
1781 
1782  finish_edata(edata);
1783 
1784  /* Reset relation for error callback */
1786 
1788 
1790 }
1791 
1792 /*
1793  * Workhorse for apply_handle_update()
1794  * relinfo is for the relation we're actually updating in
1795  * (could be a child partition of edata->targetRelInfo)
1796  */
1797 static void
1799  ResultRelInfo *relinfo,
1800  TupleTableSlot *remoteslot,
1801  LogicalRepTupleData *newtup)
1802 {
1803  EState *estate = edata->estate;
1804  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1805  Relation localrel = relinfo->ri_RelationDesc;
1806  EPQState epqstate;
1807  TupleTableSlot *localslot;
1808  bool found;
1809  MemoryContext oldctx;
1810 
1811  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1812  ExecOpenIndices(relinfo, false);
1813 
1814  found = FindReplTupleInLocalRel(estate, localrel,
1815  &relmapentry->remoterel,
1816  remoteslot, &localslot);
1817  ExecClearTuple(remoteslot);
1818 
1819  /*
1820  * Tuple found.
1821  *
1822  * Note this will fail if there are other conflicting unique indexes.
1823  */
1824  if (found)
1825  {
1826  /* Process and store remote tuple in the slot */
1828  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1829  MemoryContextSwitchTo(oldctx);
1830 
1831  EvalPlanQualSetSlot(&epqstate, remoteslot);
1832 
1833  /* Do the actual update. */
1835  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
1836  remoteslot);
1837  }
1838  else
1839  {
1840  /*
1841  * The tuple to be updated could not be found. Do nothing except for
1842  * emitting a log message.
1843  *
1844  * XXX should this be promoted to ereport(LOG) perhaps?
1845  */
1846  elog(DEBUG1,
1847  "logical replication did not find row to be updated "
1848  "in replication target relation \"%s\"",
1849  RelationGetRelationName(localrel));
1850  }
1851 
1852  /* Cleanup. */
1853  ExecCloseIndices(relinfo);
1854  EvalPlanQualEnd(&epqstate);
1855 }
1856 
1857 /*
1858  * Handle DELETE message.
1859  *
1860  * TODO: FDW support
1861  */
1862 static void
1864 {
1865  LogicalRepRelMapEntry *rel;
1866  LogicalRepTupleData oldtup;
1867  LogicalRepRelId relid;
1868  ApplyExecutionData *edata;
1869  EState *estate;
1870  TupleTableSlot *remoteslot;
1871  MemoryContext oldctx;
1872 
1874  return;
1875 
1877 
1878  relid = logicalrep_read_delete(s, &oldtup);
1879  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1880  if (!should_apply_changes_for_rel(rel))
1881  {
1882  /*
1883  * The relation can't become interesting in the middle of the
1884  * transaction so it's safe to unlock it.
1885  */
1888  return;
1889  }
1890 
1891  /* Set relation for error callback */
1893 
1894  /* Check if we can do the delete. */
1896 
1897  /* Initialize the executor state. */
1898  edata = create_edata_for_relation(rel);
1899  estate = edata->estate;
1900  remoteslot = ExecInitExtraTupleSlot(estate,
1901  RelationGetDescr(rel->localrel),
1902  &TTSOpsVirtual);
1903 
1904  /* Build the search tuple. */
1906  slot_store_data(remoteslot, rel, &oldtup);
1907  MemoryContextSwitchTo(oldctx);
1908 
1909  /* For a partitioned table, apply delete to correct partition. */
1910  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1912  remoteslot, NULL, CMD_DELETE);
1913  else
1915  remoteslot);
1916 
1917  finish_edata(edata);
1918 
1919  /* Reset relation for error callback */
1921 
1923 
1925 }
1926 
1927 /*
1928  * Workhorse for apply_handle_delete()
1929  * relinfo is for the relation we're actually deleting from
1930  * (could be a child partition of edata->targetRelInfo)
1931  */
1932 static void
1934  ResultRelInfo *relinfo,
1935  TupleTableSlot *remoteslot)
1936 {
1937  EState *estate = edata->estate;
1938  Relation localrel = relinfo->ri_RelationDesc;
1939  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
1940  EPQState epqstate;
1941  TupleTableSlot *localslot;
1942  bool found;
1943 
1944  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1945  ExecOpenIndices(relinfo, false);
1946 
1947  found = FindReplTupleInLocalRel(estate, localrel, remoterel,
1948  remoteslot, &localslot);
1949 
1950  /* If found delete it. */
1951  if (found)
1952  {
1953  EvalPlanQualSetSlot(&epqstate, localslot);
1954 
1955  /* Do the actual delete. */
1957  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
1958  }
1959  else
1960  {
1961  /*
1962  * The tuple to be deleted could not be found. Do nothing except for
1963  * emitting a log message.
1964  *
1965  * XXX should this be promoted to ereport(LOG) perhaps?
1966  */
1967  elog(DEBUG1,
1968  "logical replication did not find row to be deleted "
1969  "in replication target relation \"%s\"",
1970  RelationGetRelationName(localrel));
1971  }
1972 
1973  /* Cleanup. */
1974  ExecCloseIndices(relinfo);
1975  EvalPlanQualEnd(&epqstate);
1976 }
1977 
1978 /*
1979  * Try to find a tuple received from the publication side (in 'remoteslot') in
1980  * the corresponding local relation using either the replica identity index,
1981  * the primary key or, if needed, a sequential scan.
1982  *
1983  * Local tuple, if found, is returned in '*localslot'.
1984  */
1985 static bool
1987  LogicalRepRelation *remoterel,
1988  TupleTableSlot *remoteslot,
1989  TupleTableSlot **localslot)
1990 {
1991  Oid idxoid;
1992  bool found;
1993 
1994  /*
1995  * Regardless of the top-level operation, we're performing a read here, so
1996  * check for SELECT privileges.
1997  */
1998  TargetPrivilegesCheck(localrel, ACL_SELECT);
1999 
2000  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2001 
2002  idxoid = GetRelationIdentityOrPK(localrel);
2003  Assert(OidIsValid(idxoid) ||
2004  (remoterel->replident == REPLICA_IDENTITY_FULL));
2005 
2006  if (OidIsValid(idxoid))
2007  found = RelationFindReplTupleByIndex(localrel, idxoid,
2009  remoteslot, *localslot);
2010  else
2011  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2012  remoteslot, *localslot);
2013 
2014  return found;
2015 }
2016 
2017 /*
2018  * This handles insert, update, delete on a partitioned table.
2019  */
2020 static void
2022  TupleTableSlot *remoteslot,
2023  LogicalRepTupleData *newtup,
2024  CmdType operation)
2025 {
2026  EState *estate = edata->estate;
2027  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2028  ResultRelInfo *relinfo = edata->targetRelInfo;
2029  Relation parentrel = relinfo->ri_RelationDesc;
2030  ModifyTableState *mtstate;
2031  PartitionTupleRouting *proute;
2032  ResultRelInfo *partrelinfo;
2033  Relation partrel;
2034  TupleTableSlot *remoteslot_part;
2035  TupleConversionMap *map;
2036  MemoryContext oldctx;
2037 
2038  /* ModifyTableState is needed for ExecFindPartition(). */
2039  edata->mtstate = mtstate = makeNode(ModifyTableState);
2040  mtstate->ps.plan = NULL;
2041  mtstate->ps.state = estate;
2042  mtstate->operation = operation;
2043  mtstate->resultRelInfo = relinfo;
2044 
2045  /* ... as is PartitionTupleRouting. */
2046  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2047 
2048  /*
2049  * Find the partition to which the "search tuple" belongs.
2050  */
2051  Assert(remoteslot != NULL);
2053  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2054  remoteslot, estate);
2055  Assert(partrelinfo != NULL);
2056  partrel = partrelinfo->ri_RelationDesc;
2057 
2058  /*
2059  * To perform any of the operations below, the tuple must match the
2060  * partition's rowtype. Convert if needed or just copy, using a dedicated
2061  * slot to store the tuple in any case.
2062  */
2063  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2064  if (remoteslot_part == NULL)
2065  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2066  map = partrelinfo->ri_RootToPartitionMap;
2067  if (map != NULL)
2068  remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
2069  remoteslot_part);
2070  else
2071  {
2072  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2073  slot_getallattrs(remoteslot_part);
2074  }
2075  MemoryContextSwitchTo(oldctx);
2076 
2077  switch (operation)
2078  {
2079  case CMD_INSERT:
2080  apply_handle_insert_internal(edata, partrelinfo,
2081  remoteslot_part);
2082  break;
2083 
2084  case CMD_DELETE:
2085  apply_handle_delete_internal(edata, partrelinfo,
2086  remoteslot_part);
2087  break;
2088 
2089  case CMD_UPDATE:
2090 
2091  /*
2092  * For UPDATE, depending on whether or not the updated tuple
2093  * satisfies the partition's constraint, perform a simple UPDATE
2094  * of the partition or move the updated tuple into a different
2095  * suitable partition.
2096  */
2097  {
2098  AttrMap *attrmap = map ? map->attrMap : NULL;
2099  LogicalRepRelMapEntry *part_entry;
2100  TupleTableSlot *localslot;
2101  ResultRelInfo *partrelinfo_new;
2102  bool found;
2103 
2104  part_entry = logicalrep_partition_open(relmapentry, partrel,
2105  attrmap);
2106 
2107  /* Get the matching local tuple from the partition. */
2108  found = FindReplTupleInLocalRel(estate, partrel,
2109  &part_entry->remoterel,
2110  remoteslot_part, &localslot);
2111  if (!found)
2112  {
2113  /*
2114  * The tuple to be updated could not be found. Do nothing
2115  * except for emitting a log message.
2116  *
2117  * XXX should this be promoted to ereport(LOG) perhaps?
2118  */
2119  elog(DEBUG1,
2120  "logical replication did not find row to be updated "
2121  "in replication target relation's partition \"%s\"",
2122  RelationGetRelationName(partrel));
2123  return;
2124  }
2125 
2126  /*
2127  * Apply the update to the local tuple, putting the result in
2128  * remoteslot_part.
2129  */
2131  slot_modify_data(remoteslot_part, localslot, part_entry,
2132  newtup);
2133  MemoryContextSwitchTo(oldctx);
2134 
2135  /*
2136  * Does the updated tuple still satisfy the current
2137  * partition's constraint?
2138  */
2139  if (!partrel->rd_rel->relispartition ||
2140  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
2141  false))
2142  {
2143  /*
2144  * Yes, so simply UPDATE the partition. We don't call
2145  * apply_handle_update_internal() here, which would
2146  * normally do the following work, to avoid repeating some
2147  * work already done above to find the local tuple in the
2148  * partition.
2149  */
2150  EPQState epqstate;
2151 
2152  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2153  ExecOpenIndices(partrelinfo, false);
2154 
2155  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
2157  ACL_UPDATE);
2158  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
2159  localslot, remoteslot_part);
2160  ExecCloseIndices(partrelinfo);
2161  EvalPlanQualEnd(&epqstate);
2162  }
2163  else
2164  {
2165  /* Move the tuple into the new partition. */
2166 
2167  /*
2168  * New partition will be found using tuple routing, which
2169  * can only occur via the parent table. We might need to
2170  * convert the tuple to the parent's rowtype. Note that
2171  * this is the tuple found in the partition, not the
2172  * original search tuple received by this function.
2173  */
2174  if (map)
2175  {
2176  TupleConversionMap *PartitionToRootMap =
2178  RelationGetDescr(parentrel));
2179 
2180  remoteslot =
2181  execute_attr_map_slot(PartitionToRootMap->attrMap,
2182  remoteslot_part, remoteslot);
2183  }
2184  else
2185  {
2186  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
2187  slot_getallattrs(remoteslot);
2188  }
2189 
2190 
2191  /* Find the new partition. */
2193  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
2194  proute, remoteslot,
2195  estate);
2196  MemoryContextSwitchTo(oldctx);
2197  Assert(partrelinfo_new != partrelinfo);
2198 
2199  /* DELETE old tuple found in the old partition. */
2200  apply_handle_delete_internal(edata, partrelinfo,
2201  localslot);
2202 
2203  /* INSERT new tuple into the new partition. */
2204 
2205  /*
2206  * Convert the replacement tuple to match the destination
2207  * partition rowtype.
2208  */
2210  partrel = partrelinfo_new->ri_RelationDesc;
2211  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
2212  if (remoteslot_part == NULL)
2213  remoteslot_part = table_slot_create(partrel,
2214  &estate->es_tupleTable);
2215  map = partrelinfo_new->ri_RootToPartitionMap;
2216  if (map != NULL)
2217  {
2218  remoteslot_part = execute_attr_map_slot(map->attrMap,
2219  remoteslot,
2220  remoteslot_part);
2221  }
2222  else
2223  {
2224  remoteslot_part = ExecCopySlot(remoteslot_part,
2225  remoteslot);
2226  slot_getallattrs(remoteslot);
2227  }
2228  MemoryContextSwitchTo(oldctx);
2229  apply_handle_insert_internal(edata, partrelinfo_new,
2230  remoteslot_part);
2231  }
2232  }
2233  break;
2234 
2235  default:
2236  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
2237  break;
2238  }
2239 }
2240 
2241 /*
2242  * Handle TRUNCATE message.
2243  *
2244  * TODO: FDW support
2245  */
2246 static void
2248 {
2249  bool cascade = false;
2250  bool restart_seqs = false;
2251  List *remote_relids = NIL;
2252  List *remote_rels = NIL;
2253  List *rels = NIL;
2254  List *part_rels = NIL;
2255  List *relids = NIL;
2256  List *relids_logged = NIL;
2257  ListCell *lc;
2258  LOCKMODE lockmode = AccessExclusiveLock;
2259 
2261  return;
2262 
2264 
2265  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
2266 
2267  foreach(lc, remote_relids)
2268  {
2269  LogicalRepRelId relid = lfirst_oid(lc);
2270  LogicalRepRelMapEntry *rel;
2271 
2272  rel = logicalrep_rel_open(relid, lockmode);
2273  if (!should_apply_changes_for_rel(rel))
2274  {
2275  /*
2276  * The relation can't become interesting in the middle of the
2277  * transaction so it's safe to unlock it.
2278  */
2279  logicalrep_rel_close(rel, lockmode);
2280  continue;
2281  }
2282 
2283  remote_rels = lappend(remote_rels, rel);
2285  rels = lappend(rels, rel->localrel);
2286  relids = lappend_oid(relids, rel->localreloid);
2288  relids_logged = lappend_oid(relids_logged, rel->localreloid);
2289 
2290  /*
2291  * Truncate partitions if we got a message to truncate a partitioned
2292  * table.
2293  */
2294  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2295  {
2296  ListCell *child;
2297  List *children = find_all_inheritors(rel->localreloid,
2298  lockmode,
2299  NULL);
2300 
2301  foreach(child, children)
2302  {
2303  Oid childrelid = lfirst_oid(child);
2304  Relation childrel;
2305 
2306  if (list_member_oid(relids, childrelid))
2307  continue;
2308 
2309  /* find_all_inheritors already got lock */
2310  childrel = table_open(childrelid, NoLock);
2311 
2312  /*
2313  * Ignore temp tables of other backends. See similar code in
2314  * ExecuteTruncate().
2315  */
2316  if (RELATION_IS_OTHER_TEMP(childrel))
2317  {
2318  table_close(childrel, lockmode);
2319  continue;
2320  }
2321 
2323  rels = lappend(rels, childrel);
2324  part_rels = lappend(part_rels, childrel);
2325  relids = lappend_oid(relids, childrelid);
2326  /* Log this relation only if needed for logical decoding */
2327  if (RelationIsLogicallyLogged(childrel))
2328  relids_logged = lappend_oid(relids_logged, childrelid);
2329  }
2330  }
2331  }
2332 
2333  /*
2334  * Even if we used CASCADE on the upstream primary we explicitly default
2335  * to replaying changes without further cascading. This might be later
2336  * changeable with a user specified option.
2337  */
2338  ExecuteTruncateGuts(rels,
2339  relids,
2340  relids_logged,
2341  DROP_RESTRICT,
2342  restart_seqs);
2343  foreach(lc, remote_rels)
2344  {
2345  LogicalRepRelMapEntry *rel = lfirst(lc);
2346 
2348  }
2349  foreach(lc, part_rels)
2350  {
2351  Relation rel = lfirst(lc);
2352 
2353  table_close(rel, NoLock);
2354  }
2355 
2357 }
2358 
2359 
2360 /*
2361  * Logical replication protocol message dispatcher.
2362  */
2363 static void
2365 {
2367  LogicalRepMsgType saved_command;
2368 
2369  /*
2370  * Set the current command being applied. Since this function can be
2371  * called recursively when applying spooled changes, save the current
2372  * command.
2373  */
2374  saved_command = apply_error_callback_arg.command;
2376 
2377  switch (action)
2378  {
2379  case LOGICAL_REP_MSG_BEGIN:
2380  apply_handle_begin(s);
2381  break;
2382 
2385  break;
2386 
2389  break;
2390 
2393  break;
2394 
2397  break;
2398 
2401  break;
2402 
2405  break;
2406 
2407  case LOGICAL_REP_MSG_TYPE:
2408  apply_handle_type(s);
2409  break;
2410 
2413  break;
2414 
2416 
2417  /*
2418  * Logical replication does not use generic logical messages yet.
2419  * Although, it could be used by other applications that use this
2420  * output plugin.
2421  */
2422  break;
2423 
2426  break;
2427 
2430  break;
2431 
2434  break;
2435 
2438  break;
2439 
2442  break;
2443 
2446  break;
2447 
2450  break;
2451 
2454  break;
2455 
2458  break;
2459 
2460  default:
2461  ereport(ERROR,
2462  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2463  errmsg("invalid logical replication message type \"%c\"", action)));
2464  }
2465 
2466  /* Reset the current command */
2467  apply_error_callback_arg.command = saved_command;
2468 }
2469 
2470 /*
2471  * Figure out which write/flush positions to report to the walsender process.
2472  *
2473  * We can't simply report back the last LSN the walsender sent us because the
2474  * local transaction might not yet be flushed to disk locally. Instead we
2475  * build a list that associates local with remote LSNs for every commit. When
2476  * reporting back the flush position to the sender we iterate that list and
2477  * check which entries on it are already locally flushed. Those we can report
2478  * as having been flushed.
2479  *
2480  * The have_pending_txes is true if there are outstanding transactions that
2481  * need to be flushed.
2482  */
2483 static void
2485  bool *have_pending_txes)
2486 {
2487  dlist_mutable_iter iter;
2488  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
2489 
2491  *flush = InvalidXLogRecPtr;
2492 
2494  {
2495  FlushPosition *pos =
2496  dlist_container(FlushPosition, node, iter.cur);
2497 
2498  *write = pos->remote_end;
2499 
2500  if (pos->local_end <= local_flush)
2501  {
2502  *flush = pos->remote_end;
2503  dlist_delete(iter.cur);
2504  pfree(pos);
2505  }
2506  else
2507  {
2508  /*
2509  * Don't want to uselessly iterate over the rest of the list which
2510  * could potentially be long. Instead get the last element and
2511  * grab the write position from there.
2512  */
2513  pos = dlist_tail_element(FlushPosition, node,
2514  &lsn_mapping);
2515  *write = pos->remote_end;
2516  *have_pending_txes = true;
2517  return;
2518  }
2519  }
2520 
2521  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
2522 }
2523 
2524 /*
2525  * Store current remote/local lsn pair in the tracking list.
2526  */
2527 static void
2529 {
2530  FlushPosition *flushpos;
2531 
2532  /* Need to do this in permanent context */
2534 
2535  /* Track commit lsn */
2536  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2537  flushpos->local_end = XactLastCommitEnd;
2538  flushpos->remote_end = remote_lsn;
2539 
2540  dlist_push_tail(&lsn_mapping, &flushpos->node);
2542 }
2543 
2544 
2545 /* Update statistics of the worker. */
2546 static void
2547 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
2548 {
2549  MyLogicalRepWorker->last_lsn = last_lsn;
2550  MyLogicalRepWorker->last_send_time = send_time;
2552  if (reply)
2553  {
2554  MyLogicalRepWorker->reply_lsn = last_lsn;
2555  MyLogicalRepWorker->reply_time = send_time;
2556  }
2557 }
2558 
2559 /*
2560  * Apply main loop.
2561  */
2562 static void
2564 {
2565  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2566  bool ping_sent = false;
2567  TimeLineID tli;
2568  ErrorContextCallback errcallback;
2569 
2570  /*
2571  * Init the ApplyMessageContext which we clean up after each replication
2572  * protocol message.
2573  */
2575  "ApplyMessageContext",
2577 
2578  /*
2579  * This memory context is used for per-stream data when the streaming mode
2580  * is enabled. This context is reset on each stream stop.
2581  */
2583  "LogicalStreamingContext",
2585 
2586  /* mark as idle, before starting to loop */
2588 
2589  /*
2590  * Push apply error context callback. Fields will be filled during
2591  * applying a change.
2592  */
2593  errcallback.callback = apply_error_callback;
2594  errcallback.previous = error_context_stack;
2595  error_context_stack = &errcallback;
2596 
2597  /* This outer loop iterates once per wait. */
2598  for (;;)
2599  {
2601  int rc;
2602  int len;
2603  char *buf = NULL;
2604  bool endofstream = false;
2605  long wait_time;
2606 
2608 
2610 
2612 
2613  if (len != 0)
2614  {
2615  /* Loop to process all available data (without blocking). */
2616  for (;;)
2617  {
2619 
2620  if (len == 0)
2621  {
2622  break;
2623  }
2624  else if (len < 0)
2625  {
2626  ereport(LOG,
2627  (errmsg("data stream from publisher has ended")));
2628  endofstream = true;
2629  break;
2630  }
2631  else
2632  {
2633  int c;
2634  StringInfoData s;
2635 
2636  /* Reset timeout. */
2637  last_recv_timestamp = GetCurrentTimestamp();
2638  ping_sent = false;
2639 
2640  /* Ensure we are reading the data into our memory context. */
2642 
2643  s.data = buf;
2644  s.len = len;
2645  s.cursor = 0;
2646  s.maxlen = -1;
2647 
2648  c = pq_getmsgbyte(&s);
2649 
2650  if (c == 'w')
2651  {
2652  XLogRecPtr start_lsn;
2653  XLogRecPtr end_lsn;
2654  TimestampTz send_time;
2655 
2656  start_lsn = pq_getmsgint64(&s);
2657  end_lsn = pq_getmsgint64(&s);
2658  send_time = pq_getmsgint64(&s);
2659 
2660  if (last_received < start_lsn)
2661  last_received = start_lsn;
2662 
2663  if (last_received < end_lsn)
2664  last_received = end_lsn;
2665 
2666  UpdateWorkerStats(last_received, send_time, false);
2667 
2668  apply_dispatch(&s);
2669  }
2670  else if (c == 'k')
2671  {
2672  XLogRecPtr end_lsn;
2674  bool reply_requested;
2675 
2676  end_lsn = pq_getmsgint64(&s);
2677  timestamp = pq_getmsgint64(&s);
2678  reply_requested = pq_getmsgbyte(&s);
2679 
2680  if (last_received < end_lsn)
2681  last_received = end_lsn;
2682 
2683  send_feedback(last_received, reply_requested, false);
2684  UpdateWorkerStats(last_received, timestamp, true);
2685  }
2686  /* other message types are purposefully ignored */
2687 
2689  }
2690 
2692  }
2693  }
2694 
2695  /* confirm all writes so far */
2696  send_feedback(last_received, false, false);
2697 
2699  {
2700  /*
2701  * If we didn't get any transactions for a while there might be
2702  * unconsumed invalidation messages in the queue, consume them
2703  * now.
2704  */
2707 
2708  /* Process any table synchronization changes. */
2709  process_syncing_tables(last_received);
2710  }
2711 
2712  /* Cleanup the memory. */
2715 
2716  /* Check if we need to exit the streaming loop. */
2717  if (endofstream)
2718  break;
2719 
2720  /*
2721  * Wait for more data or latch. If we have unflushed transactions,
2722  * wake up after WalWriterDelay to see if they've been flushed yet (in
2723  * which case we should send a feedback message). Otherwise, there's
2724  * no particular urgency about waking up unless we get data or a
2725  * signal.
2726  */
2727  if (!dlist_is_empty(&lsn_mapping))
2728  wait_time = WalWriterDelay;
2729  else
2730  wait_time = NAPTIME_PER_CYCLE;
2731 
2735  fd, wait_time,
2737 
2738  if (rc & WL_LATCH_SET)
2739  {
2742  }
2743 
2744  if (ConfigReloadPending)
2745  {
2746  ConfigReloadPending = false;
2748  }
2749 
2750  if (rc & WL_TIMEOUT)
2751  {
2752  /*
2753  * We didn't receive anything new. If we haven't heard anything
2754  * from the server for more than wal_receiver_timeout / 2, ping
2755  * the server. Also, if it's been longer than
2756  * wal_receiver_status_interval since the last update we sent,
2757  * send a status update to the primary anyway, to report any
2758  * progress in applying WAL.
2759  */
2760  bool requestReply = false;
2761 
2762  /*
2763  * Check if time since last receive from primary has reached the
2764  * configured limit.
2765  */
2766  if (wal_receiver_timeout > 0)
2767  {
2769  TimestampTz timeout;
2770 
2771  timeout =
2772  TimestampTzPlusMilliseconds(last_recv_timestamp,
2774 
2775  if (now >= timeout)
2776  ereport(ERROR,
2777  (errcode(ERRCODE_CONNECTION_FAILURE),
2778  errmsg("terminating logical replication worker due to timeout")));
2779 
2780  /* Check to see if it's time for a ping. */
2781  if (!ping_sent)
2782  {
2783  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2784  (wal_receiver_timeout / 2));
2785  if (now >= timeout)
2786  {
2787  requestReply = true;
2788  ping_sent = true;
2789  }
2790  }
2791  }
2792 
2793  send_feedback(last_received, requestReply, requestReply);
2794  }
2795  }
2796 
2797  /* Pop the error context stack */
2798  error_context_stack = errcallback.previous;
2799 
2800  /* All done */
2802 }
2803 
2804 /*
2805  * Send a Standby Status Update message to server.
2806  *
2807  * 'recvpos' is the latest LSN we've received data to, force is set if we need
2808  * to send a response to avoid timeouts.
2809  */
2810 static void
2811 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
2812 {
2813  static StringInfo reply_message = NULL;
2814  static TimestampTz send_time = 0;
2815 
2816  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2817  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2818  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2819 
2820  XLogRecPtr writepos;
2821  XLogRecPtr flushpos;
2822  TimestampTz now;
2823  bool have_pending_txes;
2824 
2825  /*
2826  * If the user doesn't want status to be reported to the publisher, be
2827  * sure to exit before doing anything at all.
2828  */
2829  if (!force && wal_receiver_status_interval <= 0)
2830  return;
2831 
2832  /* It's legal to not pass a recvpos */
2833  if (recvpos < last_recvpos)
2834  recvpos = last_recvpos;
2835 
2836  get_flush_position(&writepos, &flushpos, &have_pending_txes);
2837 
2838  /*
2839  * No outstanding transactions to flush, we can report the latest received
2840  * position. This is important for synchronous replication.
2841  */
2842  if (!have_pending_txes)
2843  flushpos = writepos = recvpos;
2844 
2845  if (writepos < last_writepos)
2846  writepos = last_writepos;
2847 
2848  if (flushpos < last_flushpos)
2849  flushpos = last_flushpos;
2850 
2852 
2853  /* if we've already reported everything we're good */
2854  if (!force &&
2855  writepos == last_writepos &&
2856  flushpos == last_flushpos &&
2857  !TimestampDifferenceExceeds(send_time, now,
2859  return;
2860  send_time = now;
2861 
2862  if (!reply_message)
2863  {
2865 
2867  MemoryContextSwitchTo(oldctx);
2868  }
2869  else
2871 
2872  pq_sendbyte(reply_message, 'r');
2873  pq_sendint64(reply_message, recvpos); /* write */
2874  pq_sendint64(reply_message, flushpos); /* flush */
2875  pq_sendint64(reply_message, writepos); /* apply */
2876  pq_sendint64(reply_message, now); /* sendTime */
2877  pq_sendbyte(reply_message, requestReply); /* replyRequested */
2878 
2879  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2880  force,
2881  LSN_FORMAT_ARGS(recvpos),
2882  LSN_FORMAT_ARGS(writepos),
2883  LSN_FORMAT_ARGS(flushpos));
2884 
2887 
2888  if (recvpos > last_recvpos)
2889  last_recvpos = recvpos;
2890  if (writepos > last_writepos)
2891  last_writepos = writepos;
2892  if (flushpos > last_flushpos)
2893  last_flushpos = flushpos;
2894 }
2895 
2896 /*
2897  * Reread subscription info if needed. Most changes will cause the worker to exit.
2898  */
2899 static void
2901 {
2902  MemoryContext oldctx;
2904  bool started_tx = false;
2905 
2906  /* When cache state is valid there is nothing to do here. */
2907  if (MySubscriptionValid)
2908  return;
2909 
2910  /* This function might be called inside or outside of transaction. */
2911  if (!IsTransactionState())
2912  {
2914  started_tx = true;
2915  }
2916 
2917  /* Ensure allocations in permanent context. */
2919 
2921 
2922  /*
2923  * Exit if the subscription was removed. This normally should not happen
2924  * as the worker gets killed during DROP SUBSCRIPTION.
2925  */
2926  if (!newsub)
2927  {
2928  ereport(LOG,
2929  (errmsg("logical replication apply worker for subscription \"%s\" will "
2930  "stop because the subscription was removed",
2931  MySubscription->name)));
2932 
2933  proc_exit(0);
2934  }
2935 
2936  /*
2937  * Exit if the subscription was disabled. This normally should not happen
2938  * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
2939  */
2940  if (!newsub->enabled)
2941  {
2942  ereport(LOG,
2943  (errmsg("logical replication apply worker for subscription \"%s\" will "
2944  "stop because the subscription was disabled",
2945  MySubscription->name)));
2946 
2947  proc_exit(0);
2948  }
2949 
2950  /* !slotname should never happen when enabled is true. */
2951  Assert(newsub->slotname);
2952 
2953  /* two-phase should not be altered */
2954  Assert(newsub->twophasestate == MySubscription->twophasestate);
2955 
2956  /*
2957  * Exit if any parameter that affects the remote connection was changed.
2958  * The launcher will start a new worker.
2959  */
2960  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
2961  strcmp(newsub->name, MySubscription->name) != 0 ||
2962  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
2963  newsub->binary != MySubscription->binary ||
2964  newsub->stream != MySubscription->stream ||
2965  newsub->owner != MySubscription->owner ||
2966  !equal(newsub->publications, MySubscription->publications))
2967  {
2968  ereport(LOG,
2969  (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
2970  MySubscription->name)));
2971 
2972  proc_exit(0);
2973  }
2974 
2975  /* Check for other changes that should never happen too. */
2976  if (newsub->dbid != MySubscription->dbid)
2977  {
2978  elog(ERROR, "subscription %u changed unexpectedly",
2980  }
2981 
2982  /* Clean old subscription info and switch to new one. */
2985 
2986  MemoryContextSwitchTo(oldctx);
2987 
2988  /* Change synchronous commit according to the user's wishes */
2989  SetConfigOption("synchronous_commit", MySubscription->synccommit,
2991 
2992  if (started_tx)
2994 
2995  MySubscriptionValid = true;
2996 }
2997 
2998 /*
2999  * Callback from subscription syscache invalidation.
3000  */
3001 static void
3002 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
3003 {
3004  MySubscriptionValid = false;
3005 }
3006 
3007 /*
3008  * subxact_info_write
3009  * Store information about subxacts for a toplevel transaction.
3010  *
3011  * For each subxact we store offset of its first change in the main file.
3012  * The file is always over-written as a whole.
3013  *
3014  * XXX We should only store subxacts that were not aborted yet.
3015  */
3016 static void
3018 {
3019  char path[MAXPGPATH];
3020  Size len;
3021  BufFile *fd;
3022 
3024 
3025  /* construct the subxact filename */
3026  subxact_filename(path, subid, xid);
3027 
3028  /* Delete the subxacts file, if exists. */
3029  if (subxact_data.nsubxacts == 0)
3030  {
3033 
3034  return;
3035  }
3036 
3037  /*
3038  * Create the subxact file if it is not already created, otherwise open the
3039  * existing file.
3040  */
3042  true);
3043  if (fd == NULL)
3045 
3046  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
3047 
3048  /* Write the subxact count and subxact info */
3051 
3052  BufFileClose(fd);
3053 
3054  /* free the memory allocated for subxact info */
3056 }
3057 
3058 /*
3059  * subxact_info_read
3060  * Restore information about subxacts of a streamed transaction.
3061  *
3062  * Read information about subxacts into the structure subxact_data that can be
3063  * used later.
3064  */
3065 static void
3067 {
3068  char path[MAXPGPATH];
3069  Size len;
3070  BufFile *fd;
3071  MemoryContext oldctx;
3072 
3076 
3077  /*
3078  * If the subxact file doesn't exist that means we don't have any subxact
3079  * info.
3080  */
3081  subxact_filename(path, subid, xid);
3083  true);
3084  if (fd == NULL)
3085  return;
3086 
3087  /* read number of subxact items */
3089  sizeof(subxact_data.nsubxacts)) !=
3090  sizeof(subxact_data.nsubxacts))
3091  ereport(ERROR,
3093  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
3094  path)));
3095 
3096  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
3097 
3098  /* we keep the maximum as a power of 2 */
3100 
3101  /*
3102  * Allocate subxact information in the logical streaming context. We need
3103  * this information during the complete stream so that we can add the sub
3104  * transaction info to this. On stream stop we will flush this information
3105  * to the subxact file and reset the logical streaming context.
3106  */
3109  sizeof(SubXactInfo));
3110  MemoryContextSwitchTo(oldctx);
3111 
3112  if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
3113  ereport(ERROR,
3115  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
3116  path)));
3117 
3118  BufFileClose(fd);
3119 }
3120 
3121 /*
3122  * subxact_info_add
3123  * Add information about a subxact (offset in the main file).
3124  */
3125 static void
3127 {
3128  SubXactInfo *subxacts = subxact_data.subxacts;
3129  int64 i;
3130 
3131  /* We must have a valid top level stream xid and a stream fd. */
3133  Assert(stream_fd != NULL);
3134 
3135  /*
3136  * If the XID matches the toplevel transaction, we don't want to add it.
3137  */
3138  if (stream_xid == xid)
3139  return;
3140 
3141  /*
3142  * In most cases we're checking the same subxact as we've already seen in
3143  * the last call, so make sure to ignore it (this change comes later).
3144  */
3145  if (subxact_data.subxact_last == xid)
3146  return;
3147 
3148  /* OK, remember we're processing this XID. */
3149  subxact_data.subxact_last = xid;
3150 
3151  /*
3152  * Check if the transaction is already present in the array of subxact. We
3153  * intentionally scan the array from the tail, because we're likely adding
3154  * a change for the most recent subtransactions.
3155  *
3156  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
3157  * would allow us to use binary search here.
3158  */
3159  for (i = subxact_data.nsubxacts; i > 0; i--)
3160  {
3161  /* found, so we're done */
3162  if (subxacts[i - 1].xid == xid)
3163  return;
3164  }
3165 
3166  /* This is a new subxact, so we need to add it to the array. */
3167  if (subxact_data.nsubxacts == 0)
3168  {
3169  MemoryContext oldctx;
3170 
3172 
3173  /*
3174  * Allocate this memory for subxacts in per-stream context, see
3175  * subxact_info_read.
3176  */
3178  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
3179  MemoryContextSwitchTo(oldctx);
3180  }
3182  {
3184  subxacts = repalloc(subxacts,
3186  }
3187 
3188  subxacts[subxact_data.nsubxacts].xid = xid;
3189 
3190  /*
3191  * Get the current offset of the stream file and store it as offset of
3192  * this subxact.
3193  */
3195  &subxacts[subxact_data.nsubxacts].fileno,
3196  &subxacts[subxact_data.nsubxacts].offset);
3197 
3199  subxact_data.subxacts = subxacts;
3200 }
3201 
3202 /* format filename for file containing the info about subxacts */
3203 static inline void
3204 subxact_filename(char *path, Oid subid, TransactionId xid)
3205 {
3206  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
3207 }
3208 
3209 /* format filename for file containing serialized changes */
3210 static inline void
3211 changes_filename(char *path, Oid subid, TransactionId xid)
3212 {
3213  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
3214 }
3215 
3216 /*
3217  * stream_cleanup_files
3218  * Cleanup files for a subscription / toplevel transaction.
3219  *
3220  * Remove files with serialized changes and subxact info for a particular
3221  * toplevel transaction. Each subscription has a separate set of files
3222  * for any toplevel transaction.
3223  */
3224 static void
3226 {
3227  char path[MAXPGPATH];
3228 
3229  /* Delete the changes file. */
3230  changes_filename(path, subid, xid);
3232 
3233  /* Delete the subxact file, if it exists. */
3234  subxact_filename(path, subid, xid);
3236 }
3237 
3238 /*
3239  * stream_open_file
3240  * Open a file that we'll use to serialize changes for a toplevel
3241  * transaction.
3242  *
3243  * Open a file for streamed changes from a toplevel transaction identified
3244  * by stream_xid (global variable). If it's the first chunk of streamed
3245  * changes for this transaction, create the buffile, otherwise open the
3246  * previously created file.
3247  *
3248  * This can only be called at the beginning of a "streaming" block, i.e.
3249  * between stream_start/stream_stop messages from the upstream.
3250  */
3251 static void
3252 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
3253 {
3254  char path[MAXPGPATH];
3255  MemoryContext oldcxt;
3256 
3258  Assert(OidIsValid(subid));
3260  Assert(stream_fd == NULL);
3261 
3262 
3263  changes_filename(path, subid, xid);
3264  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
3265 
3266  /*
3267  * Create/open the buffiles under the logical streaming context so that we
3268  * have those files until stream stop.
3269  */
3271 
3272  /*
3273  * If this is the first streamed segment, create the changes file.
3274  * Otherwise, just open the file for writing, in append mode.
3275  */
3276  if (first_segment)
3278  path);
3279  else
3280  {
3281  /*
3282  * Open the file and seek to the end of the file because we always
3283  * append the changes file.
3284  */
3286  path, O_RDWR, false);
3287  BufFileSeek(stream_fd, 0, 0, SEEK_END);
3288  }
3289 
3290  MemoryContextSwitchTo(oldcxt);
3291 }
3292 
3293 /*
3294  * stream_close_file
3295  * Close the currently open file with streamed changes.
3296  *
3297  * This can only be called at the end of a streaming block, i.e. at stream_stop
3298  * message from the upstream.
3299  */
3300 static void
3302 {
3305  Assert(stream_fd != NULL);
3306 
3308 
3310  stream_fd = NULL;
3311 }
3312 
3313 /*
3314  * stream_write_change
3315  * Serialize a change to a file for the current toplevel transaction.
3316  *
3317  * The change is serialized in a simple format, with length (not including
3318  * the length), action code (identifying the message type) and message
3319  * contents (without the subxact TransactionId value).
3320  */
3321 static void
3323 {
3324  int len;
3325 
3328  Assert(stream_fd != NULL);
3329 
3330  /* total on-disk size, including the action type character */
3331  len = (s->len - s->cursor) + sizeof(char);
3332 
3333  /* first write the size */
3334  BufFileWrite(stream_fd, &len, sizeof(len));
3335 
3336  /* then the action */
3337  BufFileWrite(stream_fd, &action, sizeof(action));
3338 
3339  /* and finally the remaining part of the buffer (after the XID) */
3340  len = (s->len - s->cursor);
3341 
3342  BufFileWrite(stream_fd, &s->data[s->cursor], len);
3343 }
3344 
3345 /*
3346  * Cleanup the memory for subxacts and reset the related variables.
3347  */
3348 static inline void
3350 {
3351  if (subxact_data.subxacts)
3353 
3354  subxact_data.subxacts = NULL;
3356  subxact_data.nsubxacts = 0;
3358 }
3359 
3360 /*
3361  * Form the prepared transaction GID for two_phase transactions.
3362  *
3363  * Return the GID in the supplied buffer.
3364  */
3365 static void
3366 TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
3367 {
3368  Assert(subid != InvalidRepOriginId);
3369 
3370  if (!TransactionIdIsValid(xid))
3371  ereport(ERROR,
3372  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3373  errmsg_internal("invalid two-phase transaction ID")));
3374 
3375  snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
3376 }
3377 
3378 /* Logical Replication Apply worker entry point */
3379 void
3381 {
3382  int worker_slot = DatumGetInt32(main_arg);
3384  MemoryContext oldctx;
3385  char originname[NAMEDATALEN];
3386  XLogRecPtr origin_startpos;
3387  char *myslotname;
3389  int server_version;
3390 
3391  /* Attach to slot */
3392  logicalrep_worker_attach(worker_slot);
3393 
3394  /* Setup signal handling */
3396  pqsignal(SIGTERM, die);
3398 
3399  /*
3400  * We don't currently need any ResourceOwner in a walreceiver process, but
3401  * if we did, we could call CreateAuxProcessResourceOwner here.
3402  */
3403 
3404  /* Initialise stats to a sanish value */
3407 
3408  /* Load the libpq-specific functions */
3409  load_file("libpqwalreceiver", false);
3410 
3411  /* Run as replica session replication role. */
3412  SetConfigOption("session_replication_role", "replica",
3414 
3415  /* Connect to our database. */
3418  0);
3419 
3420  /*
3421  * Set always-secure search path, so malicious users can't redirect user
3422  * code (e.g. pg_index.indexprs).
3423  */
3424  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
3425 
3426  /* Load the subscription into persistent memory context. */
3428  "ApplyContext",
3432 
3434  if (!MySubscription)
3435  {
3436  ereport(LOG,
3437  (errmsg("logical replication apply worker for subscription %u will not "
3438  "start because the subscription was removed during startup",
3440  proc_exit(0);
3441  }
3442 
3443  MySubscriptionValid = true;
3444  MemoryContextSwitchTo(oldctx);
3445 
3446  if (!MySubscription->enabled)
3447  {
3448  ereport(LOG,
3449  (errmsg("logical replication apply worker for subscription \"%s\" will not "
3450  "start because the subscription was disabled during startup",
3451  MySubscription->name)));
3452 
3453  proc_exit(0);
3454  }
3455 
3456  /* Setup synchronous commit according to the user's wishes */
3457  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3459 
3460  /* Keep us informed about subscription changes. */
3463  (Datum) 0);
3464 
3465  if (am_tablesync_worker())
3466  ereport(LOG,
3467  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3469  else
3470  ereport(LOG,
3471  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3472  MySubscription->name)));
3473 
3475 
3476  /* Connect to the origin and start the replication. */
3477  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3479 
3480  if (am_tablesync_worker())
3481  {
3482  char *syncslotname;
3483 
3484  PG_TRY();
3485  {
3486  /* This is table synchronization worker, call initial sync. */
3487  syncslotname = LogicalRepSyncTableStart(&origin_startpos);
3488  }
3489  PG_CATCH();
3490  {
3491  MemoryContext ecxt = MemoryContextSwitchTo(cctx);
3492  ErrorData *errdata = CopyErrorData();
3493 
3494  /*
3495  * Report the table sync error. There is no corresponding message
3496  * type for table synchronization.
3497  */
3501  0, /* message type */
3503  errdata->message);
3504  MemoryContextSwitchTo(ecxt);
3505  PG_RE_THROW();
3506  }
3507  PG_END_TRY();
3508 
3509  /* allocate slot name in long-lived context */
3510  myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3511 
3512  pfree(syncslotname);
3513  }
3514  else
3515  {
3516  /* This is main apply worker */
3517  RepOriginId originid;
3518  TimeLineID startpointTLI;
3519  char *err;
3520 
3521  myslotname = MySubscription->slotname;
3522 
3523  /*
3524  * This shouldn't happen if the subscription is enabled, but guard
3525  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3526  * crash if slot is NULL.)
3527  */
3528  if (!myslotname)
3529  ereport(ERROR,
3530  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3531  errmsg("subscription has no replication slot set")));
3532 
3533  /* Setup replication origin tracking. */
3535  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3536  originid = replorigin_by_name(originname, true);
3537  if (!OidIsValid(originid))
3538  originid = replorigin_create(originname);
3539  replorigin_session_setup(originid);
3540  replorigin_session_origin = originid;
3541  origin_startpos = replorigin_session_get_progress(false);
3543 
3545  MySubscription->name, &err);
3546  if (LogRepWorkerWalRcvConn == NULL)
3547  ereport(ERROR,
3548  (errcode(ERRCODE_CONNECTION_FAILURE),
3549  errmsg("could not connect to the publisher: %s", err)));
3550 
3551  /*
3552  * We don't really use the output identify_system for anything but it
3553  * does some initializations on the upstream so let's still call it.
3554  */
3555  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
3556  }
3557 
3558  /*
3559  * Setup callback for syscache so that we know when something changes in
3560  * the subscription relation state.
3561  */
3564  (Datum) 0);
3565 
3566  /* Build logical replication streaming options. */
3567  options.logical = true;
3568  options.startpoint = origin_startpos;
3569  options.slotname = myslotname;
3570 
3572  options.proto.logical.proto_version =
3576 
3577  options.proto.logical.publication_names = MySubscription->publications;
3578  options.proto.logical.binary = MySubscription->binary;
3579  options.proto.logical.streaming = MySubscription->stream;
3580  options.proto.logical.twophase = false;
3581 
3582  if (!am_tablesync_worker())
3583  {
3584  /*
3585  * Even when the two_phase mode is requested by the user, it remains
3586  * as the tri-state PENDING until all tablesyncs have reached READY
3587  * state. Only then, can it become ENABLED.
3588  *
3589  * Note: If the subscription has no tables then leave the state as
3590  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
3591  * work.
3592  */
3595  {
3596  /* Start streaming with two_phase enabled */
3597  options.proto.logical.twophase = true;
3599 
3604  }
3605  else
3606  {
3608  }
3609 
3610  ereport(DEBUG1,
3611  (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.",
3616  "?")));
3617  }
3618  else
3619  {
3620  /* Start normal logical streaming replication. */
3622  }
3623 
3624  /* Run the main loop. */
3625  PG_TRY();
3626  {
3627  LogicalRepApplyLoop(origin_startpos);
3628  }
3629  PG_CATCH();
3630  {
3631  /* report the apply error */
3633  {
3634  MemoryContext ecxt = MemoryContextSwitchTo(cctx);
3635  ErrorData *errdata = CopyErrorData();
3636 
3641  : InvalidOid,
3644  errdata->message);
3645  MemoryContextSwitchTo(ecxt);
3646  }
3647 
3648  PG_RE_THROW();
3649  }
3650  PG_END_TRY();
3651 
3652  proc_exit(0);
3653 }
3654 
3655 /*
3656  * Is current process a logical replication worker?
3657  */
3658 bool
3660 {
3661  return MyLogicalRepWorker != NULL;
3662 }
3663 
3664 /* Error callback to give more context info about the change being applied */
3665 static void
3667 {
3670 
3672  return;
3673 
3674  initStringInfo(&buf);
3675  appendStringInfo(&buf, _("processing remote data during \"%s\""),
3676  logicalrep_message_type(errarg->command));
3677 
3678  /* append relation information */
3679  if (errarg->rel)
3680  {
3681  appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
3682  errarg->rel->remoterel.nspname,
3683  errarg->rel->remoterel.relname);
3684  if (errarg->remote_attnum >= 0)
3685  appendStringInfo(&buf, _(" column \"%s\""),
3686  errarg->rel->remoterel.attnames[errarg->remote_attnum]);
3687  }
3688 
3689  /* append transaction information */
3690  if (TransactionIdIsNormal(errarg->remote_xid))
3691  {
3692  appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
3693  if (errarg->ts != 0)
3694  appendStringInfo(&buf, _(" at %s"),
3695  timestamptz_to_str(errarg->ts));
3696  }
3697 
3698  errcontext("%s", buf.data);
3699  pfree(buf.data);
3700 }
3701 
3702 /* Set transaction information of apply error callback */
3703 static inline void
3705 {
3708 }
3709 
3710 /* Reset all information of apply error callback */
3711 static inline void
3713 {
3718 }
AclResult
Definition: acl.h:178
@ ACLCHECK_OK
Definition: acl.h:179
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3308
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4682
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:1662
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:3204
static void begin_replication_step(void)
Definition: worker.c:372
static void end_replication_step(void)
Definition: worker.c:395
static void cleanup_subxact_info(void)
Definition: worker.c:3349
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1031
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:1640
static void maybe_reread_subscription(void)
Definition: worker.c:2900
static void subxact_info_add(TransactionId xid)
Definition: worker.c:3126
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:449
static MemoryContext ApplyMessageContext
Definition: worker.c:241
static void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:3225
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:354
static void apply_handle_type(StringInfo s)
Definition: worker.c:1507
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:2247
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:1301
static void set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
Definition: worker.c:3704
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:2547
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup)
Definition: worker.c:1798
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
Definition: worker.c:3366
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:3002
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:411
struct ApplyExecutionData ApplyExecutionData
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:3211
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:1523
static BufFile * stream_fd
Definition: worker.c:261
static void apply_dispatch(StringInfo s)
Definition: worker.c:2364
static void apply_handle_update(StringInfo s)
Definition: worker.c:1696
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:1418
struct ApplySubXactData ApplySubXactData
#define NAPTIME_PER_CYCLE
Definition: worker.c:197
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:2484
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:935
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:2563
bool IsLogicalWorker(void)
Definition: worker.c:3659
static ApplySubXactData subxact_data
Definition: worker.c:279
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:2528
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:2021
static ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:232
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:1986
bool in_remote_transaction
Definition: worker.c:252
static void apply_handle_delete(StringInfo s)
Definition: worker.c:1863
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:3322
static void apply_handle_begin(StringInfo s)
Definition: worker.c:785
static dlist_head lsn_mapping
Definition: worker.c:206
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:1933
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:591
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:502
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:692
static void apply_handle_commit(StringInfo s)
Definition: worker.c:805
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1200
static void apply_handle_relation(StringInfo s)
Definition: worker.c:1487
struct ApplyErrorCallbackArg ApplyErrorCallbackArg
MemoryContext ApplyContext
Definition: worker.c:242
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:3017
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:1540
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:889
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:976
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1167
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1082
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:2811
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:247
static XLogRecPtr remote_final_lsn
Definition: worker.c:253
bool MySubscriptionValid
Definition: worker.c:250
static MemoryContext LogicalStreamingContext
Definition: worker.c:245
static void stream_open_file(Oid subid, TransactionId xid, bool first)
Definition: worker.c:3252
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:1452
static bool in_streamed_transaction
Definition: worker.c:256
struct SubXactInfo SubXactInfo
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:831
struct FlushPosition FlushPosition
void ApplyWorkerMain(Datum main_arg)
Definition: worker.c:3380
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1100
static void apply_error_callback(void *arg)
Definition: worker.c:3666
Subscription * MySubscription
Definition: worker.c:249
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:855
static void stream_close_file(void)
Definition: worker.c:3301
static TransactionId stream_xid
Definition: worker.c:258
static void apply_handle_insert(StringInfo s)
Definition: worker.c:1572
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:533
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:3066
static void reset_apply_error_context_info(void)
Definition: worker.c:3712
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
@ STATE_RUNNING
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:738
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:555
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:286
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:755
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:873
void BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:598
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:662
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:262
void BufFileClose(BufFile *file)
Definition: buffile.c:407
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:359
unsigned int uint32
Definition: c.h:441
uint32 TransactionId
Definition: c.h:587
#define OidIsValid(objectId)
Definition: c.h:710
size_t Size
Definition: c.h:540
int64 TimestampTz
Definition: timestamp.h:39
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
int my_log2(long num)
Definition: dynahash.c:1765
int errmsg_internal(const char *fmt,...)
Definition: elog.c:991
int errcode_for_file_access(void)
Definition: elog.c:716
ErrorContextCallback * error_context_stack
Definition: elog.c:93
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define _(x)
Definition: elog.c:89
ErrorData * CopyErrorData(void)
Definition: elog.c:1555
#define LOG
Definition: elog.h:25
#define PG_RE_THROW()
Definition: elog.h:340
#define errcontext
Definition: elog.h:190
#define PG_END_TRY()
Definition: elog.h:324
#define PG_TRY()
Definition: elog.h:299
#define DEBUG2
Definition: elog.h:23
#define DEBUG1
Definition: elog.h:24
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define PG_CATCH()
Definition: elog.h:309
#define ereport(elevel,...)
Definition: elog.h:143
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3170
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:123
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1697
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1193
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2834
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2413
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1191
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1552
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1831
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:751
EState * CreateExecutorState(void)
Definition: execUtils.c:90
void FreeExecutorState(EState *estate)
Definition: execUtils.c:186
#define GetPerTupleExprContext(estate)
Definition: executor.h:533
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:538
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:227
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:316
void FileSetInit(FileSet *fileset)
Definition: fileset.c:54
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1662
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1644
struct Latch * MyLatch
Definition: globals.c:57
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:8141
@ PGC_S_OVERRIDE
Definition: guc.h:116
@ PGC_SUSET
Definition: guc.h:75
@ PGC_SIGHUP
Definition: guc.h:72
@ PGC_BACKEND
Definition: guc.h:74
void ProcessConfigFile(GucContext context)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:515
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
#define DLIST_STATIC_INIT(name)
Definition: ilist.h:248
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
#define write(a, b, c)
Definition: win32.h:14
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void AcceptInvalidationMessages(void)
Definition: inval.c:745
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1518
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
void ResetLatch(Latch *latch)
Definition: latch.c:660
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
void logicalrep_worker_attach(int slot)
Definition: launcher.c:565
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
Definition: list.c:336
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:701
int LOCKMODE
Definition: lockdefs.h:26
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
@ LockTupleExclusive
Definition: lockoptions.h:58
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:37
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:38
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:91
LogicalRepMsgType
Definition: logicalproto.h:52
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:56
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:53
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:63
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:54
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:55
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:57
uint32 LogicalRepRelId
Definition: logicalproto.h:95
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:36
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:93
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:92
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1899
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2821
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2887
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
MemoryContext TopTransactionContext
Definition: mcxt.c:53
void pfree(void *pointer)
Definition: mcxt.c:1169
MemoryContext TopMemoryContext
Definition: mcxt.c:48
void * palloc0(Size size)
Definition: mcxt.c:1093
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1182
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1286
void * palloc(Size size)
Definition: mcxt.c:1062
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:910
Oid GetUserId(void)
Definition: miscinit.c:495
CmdType
Definition: nodes.h:684
@ CMD_INSERT
Definition: nodes.h:688
@ CMD_DELETE
Definition: nodes.h:689
@ CMD_UPDATE
Definition: nodes.h:687
#define makeNode(_type_)
Definition: nodes.h:587
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:240
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1071
RepOriginId replorigin_session_origin
Definition: origin.c:154
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1206
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:155
#define InvalidRepOriginId
Definition: origin.h:33
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define ACL_DELETE
Definition: parsenodes.h:85
#define ACL_INSERT
Definition: parsenodes.h:82
#define ACL_UPDATE
Definition: parsenodes.h:84
@ RTE_RELATION
Definition: parsenodes.h:990
@ DROP_RESTRICT
Definition: parsenodes.h:1861
#define ACL_SELECT
Definition: parsenodes.h:83
#define ACL_TRUNCATE
Definition: parsenodes.h:86
uint32 AclMode
Definition: parsenodes.h:80
int16 attnum
Definition: pg_attribute.h:83
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
void * arg
static PgChecksumMode mode
Definition: pg_checksums.c:65
#define NAMEDATALEN
#define MAXPGPATH
const void size_t len
static int server_version
Definition: pg_dumpall.c:83
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:256
#define lfirst(lc)
Definition: pg_list.h:169
#define NIL
Definition: pg_list.h:65
#define list_make1(x1)
Definition: pg_list.h:206
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
#define lfirst_oid(lc)
Definition: pg_list.h:171
static char ** options
void FreeSubscription(Subscription *sub)
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
#define die(msg)
Definition: pg_test_fsync.c:98
static char * buf
Definition: pg_test_fsync.c:70
void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, LogicalRepMsgType command, TransactionId xid, const char *errmsg)
Definition: pgstat.c:1952
void pgstat_report_stat(bool disconnect)
Definition: pgstat.c:861
int64 timestamp
Expr * expression_planner(Expr *expr)
Definition: planner.c:5807
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:225
#define PGINVALID_SOCKET
Definition: port.h:31
uintptr_t Datum
Definition: postgres.h:411
#define DatumGetInt32(X)
Definition: postgres.h:516
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5815
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5786
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
char * c
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * s2
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:548
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:322
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:131
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:602
char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1164
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:739
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:477
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:264
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:683
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:1151
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:362
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1109
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:422
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:225
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1059
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
#define RelationGetRelid(relation)
Definition: rel.h:478
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:675
#define RelationGetDescr(relation)
Definition: rel.h:504
#define RelationGetRelationName(relation)
Definition: rel.h:512
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:632
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4837
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4858
void fill_extraUpdatedCols(RangeTblEntry *target_rte, Relation target_relation)
Node * build_column_default(Relation rel, int attrno)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:180
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
void PopActiveSnapshot(void)
Definition: snapmgr.c:774
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:680
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:528
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:157
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:258
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:433
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
TransactionId remote_xid
Definition: worker.c:228
LogicalRepMsgType command
Definition: worker.c:223
TimestampTz ts
Definition: worker.c:229
LogicalRepRelMapEntry * rel
Definition: worker.c:224
ResultRelInfo * targetRelInfo
Definition: worker.c:213
EState * estate
Definition: worker.c:210
PartitionTupleRouting * proute
Definition: worker.c:217
ModifyTableState * mtstate
Definition: worker.c:216
LogicalRepRelMapEntry * targetRel
Definition: worker.c:212
uint32 nsubxacts
Definition: worker.c:273
uint32 nsubxacts_max
Definition: worker.c:274
SubXactInfo * subxacts
Definition: worker.c:276
TransactionId subxact_last
Definition: worker.c:275
Definition: attmap.h:35
int maplen
Definition: attmap.h:37
AttrNumber * attnums
Definition: attmap.h:36
List * es_range_table
Definition: execnodes.h:565
List * es_tupleTable
Definition: execnodes.h:607
List * es_opened_result_relations
Definition: execnodes.h:583
CommandId es_output_cid
Definition: execnodes.h:577
struct ErrorContextCallback * previous
Definition: elog.h:232
void(* callback)(void *arg)
Definition: elog.h:233
char * message
Definition: elog.h:368
dlist_node node
Definition: worker.c:201
XLogRecPtr remote_end
Definition: worker.c:203
XLogRecPtr local_end
Definition: worker.c:202
Definition: pg_list.h:51
XLogRecPtr final_lsn
Definition: logicalproto.h:123
TransactionId xid
Definition: logicalproto.h:125
TimestampTz committime
Definition: logicalproto.h:124
TimestampTz committime
Definition: logicalproto.h:132
LogicalRepRelation remoterel
StringInfoData * colvalues
Definition: logicalproto.h:81
TimestampTz last_recv_time
TimestampTz reply_time
FileSet * stream_fileset
XLogRecPtr reply_lsn
XLogRecPtr last_lsn
TimestampTz last_send_time
CmdType operation
Definition: execnodes.h:1193
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1197
PlanState ps
Definition: execnodes.h:1192
Plan * plan
Definition: execnodes.h:971
EState * state
Definition: execnodes.h:973
Bitmapset * updatedCols
Definition: parsenodes.h:1161
RTEKind rtekind
Definition: parsenodes.h:1007
Form_pg_class rd_rel
Definition: rel.h:109
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:519
Relation ri_RelationDesc
Definition: execnodes.h:416
TupleConversionMap * ri_RootToPartitionMap
Definition: execnodes.h:518
off_t offset
Definition: worker.c:267
TransactionId xid
Definition: worker.c:265
int fileno
Definition: worker.c:266
AttrMap * attrMap
Definition: tupconvert.h:28
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
bool * tts_isnull
Definition: tuptable.h:128
Datum * tts_values
Definition: tuptable.h:126
dlist_node * cur
Definition: ilist.h:180
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:98
@ SUBSCRIPTIONOID
Definition: syscache.h:97
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1755
bool AllTablesyncsReady(void)
Definition: tablesync.c:1258
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:270
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:589
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:922
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1283
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4771
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4751
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:177
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2534
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1461
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
@ WAIT_EVENT_LOGICAL_APPLY_MAIN
Definition: wait_event.h:43
static StringInfoData reply_message
Definition: walreceiver.c:118
int wal_receiver_status_interval
Definition: walreceiver.c:89
int wal_receiver_timeout
Definition: walreceiver.c:90
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:418
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:424
#define walrcv_server_version(conn)
Definition: walreceiver.h:414
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:420
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:412
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:422
int WalWriterDelay
Definition: walwriter.c:70
#define SIGHUP
Definition: win32_port.h:167
static bool am_tablesync_worker(void)
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3776
bool IsTransactionState(void)
Definition: xact.c:373
void CommandCounterIncrement(void)
Definition: xact.c:1073
void StartTransactionCommand(void)
Definition: xact.c:2909
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:885
void BeginTransactionBlock(void)
Definition: xact.c:3708
void CommitTransactionCommand(void)
Definition: xact.c:3010
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:813
#define GIDSIZE
Definition: xact.h:31
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:8713
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:344
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59