PostgreSQL Source Code  git master
pgoutput.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pgoutput.c
4  * Logical Replication output plugin
5  *
6  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/pgoutput/pgoutput.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/tupconvert.h"
16 #include "catalog/partition.h"
17 #include "catalog/pg_publication.h"
20 #include "commands/defrem.h"
22 #include "executor/executor.h"
23 #include "fmgr.h"
24 #include "nodes/makefuncs.h"
25 #include "optimizer/optimizer.h"
26 #include "parser/parse_relation.h"
27 #include "replication/logical.h"
29 #include "replication/origin.h"
30 #include "replication/pgoutput.h"
31 #include "utils/builtins.h"
32 #include "utils/inval.h"
33 #include "utils/lsyscache.h"
34 #include "utils/memutils.h"
35 #include "utils/rel.h"
36 #include "utils/syscache.h"
37 #include "utils/varlena.h"
38 
40 
42  OutputPluginOptions *opt, bool is_init);
45  ReorderBufferTXN *txn);
47  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
49  ReorderBufferTXN *txn, Relation relation,
50  ReorderBufferChange *change);
52  ReorderBufferTXN *txn, int nrelations, Relation relations[],
53  ReorderBufferChange *change);
55  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
56  bool transactional, const char *prefix,
57  Size sz, const char *message);
59  RepOriginId origin_id);
61  ReorderBufferTXN *txn);
63  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
65  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
67  ReorderBufferTXN *txn,
68  XLogRecPtr prepare_end_lsn,
69  TimestampTz prepare_time);
70 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
71  ReorderBufferTXN *txn);
72 static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
73  ReorderBufferTXN *txn);
74 static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
75  ReorderBufferTXN *txn,
76  XLogRecPtr abort_lsn);
77 static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
78  ReorderBufferTXN *txn,
79  XLogRecPtr commit_lsn);
81  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
82 
/*
 * Validity flag for the loaded publication list.  It is reset to false when
 * the publication state is initialised at plugin startup.
 * NOTE(review): presumably also cleared by publication_invalidation_cb so
 * that the list is reloaded on demand -- that code is not visible in this
 * listing; confirm.
 */
83 static bool publications_valid;
84 
85 static List *LoadPublications(List *pubnames);
86 static void publication_invalidation_cb(Datum arg, int cacheid,
87  uint32 hashvalue);
88 static void send_relation_and_attrs(Relation relation, TransactionId xid,
90  Bitmapset *columns);
92  RepOriginId origin_id, XLogRecPtr origin_lsn,
93  bool send_origin);
94 
95 /*
96  * Only 3 publication actions are used for row filtering ("insert", "update",
97  * "delete"). See RelationSyncEntry.exprstate[].
 *
 * NOTE(review): the enum header and its enumerators are missing from this
 * listing; only the braces remain.  The rest of the file indexes arrays with
 * PUBACTION_INSERT, PUBACTION_UPDATE and PUBACTION_DELETE, so those are
 * presumably the enumerators declared here -- confirm against the real file.
98  */
100 {
104 };
105 
/*
 * Number of per-action slots.  Used as the loop bound when walking the
 * per-action row filter arrays; the definition relies on PUBACTION_DELETE
 * being the highest-valued action.
 */
106 #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
107 
108 /*
109  * Entry in the map used to remember which relation schemas we sent.
110  *
111  * The schema_sent flag determines if the current schema record for the
112  * relation (and for its ancestor if publish_as_relid is set) was already
113  * sent to the subscriber (in which case we don't need to send it again).
114  *
115  * The schema cache on downstream is however updated only at commit time,
116  * and with streamed transactions the commit order may be different from
117  * the order the transactions are sent in. Also, the (sub) transactions
118  * might get aborted so we need to send the schema for each (sub) transaction
119  * so that we don't lose the schema information on abort. For handling this,
120  * we maintain the list of xids (streamed_txns) for those we have already sent
121  * the schema.
122  *
123  * For partitions, 'pubactions' considers not only the table's own
124  * publications, but also those of all of its ancestors.
 *
 * NOTE(review): several field declarations and the closing
 * "} RelationSyncEntry;" line are missing from this listing.  Code elsewhere
 * in the file accesses schema_sent, exprstate[], publish_as_relid, columns
 * and entry_cxt on this struct, so those are presumably the fields whose
 * declarations were lost below -- confirm against the real file.
125  */
126 typedef struct RelationSyncEntry
127 {
128  Oid relid; /* relation oid */
129 
130  bool replicate_valid; /* overall validity flag for entry */
131 
 /* NOTE(review): one declaration is missing here (presumably schema_sent). */
133  List *streamed_txns; /* streamed toplevel transactions with this
134  * schema */
135 
136  /* are we publishing this rel? */
 /* NOTE(review): the field this comment describes is missing here. */
138 
139  /*
140  * ExprState array for row filter. Different publication actions don't
141  * allow multiple expressions to always be combined into one, because
142  * updates or deletes restrict the column in expression to be part of the
143  * replica identity index whereas inserts do not have this restriction, so
144  * there is one ExprState per publication action.
145  */
 /* NOTE(review): the exprstate[] declaration is missing here. */
147  EState *estate; /* executor state used for row filter */
148  TupleTableSlot *new_slot; /* slot for storing new tuple */
149  TupleTableSlot *old_slot; /* slot for storing old tuple */
150 
151  /*
152  * OID of the relation to publish changes as. For a partition, this may
153  * be set to one of its ancestors whose schema will be used when
154  * replicating changes, if publish_via_partition_root is set for the
155  * publication.
156  */
 /* NOTE(review): the publish_as_relid declaration is missing here. */
158 
159  /*
160  * Map used when replicating using an ancestor's schema to convert tuples
161  * from partition's type to the ancestor's; NULL if publish_as_relid is
162  * same as 'relid' or if unnecessary due to partition and the ancestor
163  * having identical TupleDesc.
164  */
 /* NOTE(review): the conversion map declaration is missing here. */
166 
167  /*
168  * Columns included in the publication, or NULL if all columns are
169  * included implicitly. Note that the attnums in this bitmap are not
170  * shifted by FirstLowInvalidHeapAttributeNumber.
171  */
 /* NOTE(review): the columns declaration is missing here. */
173 
174  /*
175  * Private context to store additional data for this entry - state for the
176  * row filter expressions, column list, etc.
177  */
 /* NOTE(review): the entry_cxt declaration and the struct's closing line are missing here. */
180 
181 /*
182  * Maintain a per-transaction level variable to track whether the transaction
183  * has sent BEGIN. BEGIN is only sent when the first change in a transaction
184  * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
185  * messages for empty transactions which saves network bandwidth.
186  *
187  * This optimization is not used for prepared transactions because if the
188  * WALSender restarts after prepare of a transaction and before commit prepared
189  * of the same transaction then we won't be able to figure out if we have
190  * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
191  * because we would have lost the in-memory txndata information that was
192  * present prior to the restart. This will result in sending a spurious
193  * COMMIT PREPARED without a corresponding prepared transaction at the
194  * downstream which would lead to an error when it tries to process it.
195  *
196  * XXX We could achieve this optimization by changing protocol to send
197  * additional information so that downstream can detect that the corresponding
198  * prepare has not been sent. However, adding such a check for every
199  * transaction in the downstream could be costly so we might want to do it
200  * optionally.
201  *
202  * We also don't have this optimization for streamed transactions because
203  * they can contain prepared transactions.
 *
 * The struct is hung off txn->output_plugin_private by the BEGIN callback
 * and freed again by the COMMIT callback.
 *
 * NOTE(review): the closing "} PGOutputTxnData;" line is missing from this
 * listing.
204  */
205 typedef struct PGOutputTxnData
206 {
207  bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
209 
210 /* Map used to remember which relation schemas we sent. */
211 static HTAB *RelationSyncCache = NULL;
212 
213 static void init_rel_sync_cache(MemoryContext cachectx);
214 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
216  Relation relation);
217 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
218 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
219  uint32 hashvalue);
221  TransactionId xid);
223  TransactionId xid);
224 static void init_tuple_slot(PGOutputData *data, Relation relation,
225  RelationSyncEntry *entry);
226 
227 /* row filter routines */
230  List *publications,
231  RelationSyncEntry *entry);
233  ExprContext *econtext);
234 static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
235  TupleTableSlot **new_slot_ptr,
236  RelationSyncEntry *entry,
238 
239 /* column list routines */
241  List *publications,
242  RelationSyncEntry *entry);
243 
244 /*
245  * Specify output plugin callbacks
 *
 * NOTE(review): the function name/parameter line and every callback
 * assignment are missing from this listing; only the return type, the braces
 * and two section comments remain.  Judging by those comments the body fills
 * in a callback table, including the transaction streaming and two-phase
 * streaming callbacks -- confirm against the real file.
246  */
247 void
249 {
256 
263 
264  /* transaction streaming */
272  /* transaction streaming - two-phase commit */
274 }
275 
/*
 * Parse the option list supplied by the client and store the results in
 * *data.
 *
 * Recognized options are proto_version, publication_names, binary, messages,
 * streaming, two_phase and origin.  Each may appear at most once; a repeated
 * option raises ERRCODE_SYNTAX_ERROR and an unknown option name raises an
 * error through elog().  proto_version and publication_names are mandatory.
 *
 * NOTE(review): the signature line is missing from this listing.  The body
 * iterates over a list named "options" and writes through a pointer named
 * "data"; presumably this is
 * parse_output_parameters(List *options, PGOutputData *data) -- confirm.
 */
276 static void
278 {
279  ListCell *lc;
280  bool protocol_version_given = false;
281  bool publication_names_given = false;
282  bool binary_option_given = false;
283  bool messages_option_given = false;
284  bool streaming_given = false;
285  bool two_phase_option_given = false;
286  bool origin_option_given = false;
287 
 /*
  * Defaults for options the client may omit.  NOTE(review):
  * publish_no_origin gets no default here; it is only written in the
  * "origin" branch below, so it presumably relies on *data having been
  * zero-initialized by the caller -- confirm.
  */
288  data->binary = false;
289  data->streaming = LOGICALREP_STREAM_OFF;
290  data->messages = false;
291  data->two_phase = false;
292 
293  foreach(lc, options)
294  {
295  DefElem *defel = (DefElem *) lfirst(lc);
296 
 /*
  * NOTE(review): arg is allowed to be NULL here, yet the proto_version
  * and publication_names branches pass it straight to strVal().  Verify
  * that those options can never arrive without a value.
  */
297  Assert(defel->arg == NULL || IsA(defel->arg, String));
298 
299  /* Check each param, whether or not we recognize it */
300  if (strcmp(defel->defname, "proto_version") == 0)
301  {
302  unsigned long parsed;
303  char *endptr;
304 
305  if (protocol_version_given)
306  ereport(ERROR,
307  (errcode(ERRCODE_SYNTAX_ERROR),
308  errmsg("conflicting or redundant options")));
309  protocol_version_given = true;
310 
 /*
  * strtoul() reports overflow only through errno, so clear it first.
  * The *endptr test rejects trailing garbage; note that an empty
  * string passes this test and yields 0.
  */
311  errno = 0;
312  parsed = strtoul(strVal(defel->arg), &endptr, 10);
313  if (errno != 0 || *endptr != '\0')
314  ereport(ERROR,
315  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
316  errmsg("invalid proto_version")));
317 
 /* unsigned long may be wider than 32 bits; range-check before narrowing */
318  if (parsed > PG_UINT32_MAX)
319  ereport(ERROR,
320  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
321  errmsg("proto_version \"%s\" out of range",
322  strVal(defel->arg))));
323 
324  data->protocol_version = (uint32) parsed;
325  }
326  else if (strcmp(defel->defname, "publication_names") == 0)
327  {
328  if (publication_names_given)
329  ereport(ERROR,
330  (errcode(ERRCODE_SYNTAX_ERROR),
331  errmsg("conflicting or redundant options")));
332  publication_names_given = true;
333 
 /* comma-separated identifier list, split into data->publication_names */
334  if (!SplitIdentifierString(strVal(defel->arg), ',',
335  &data->publication_names))
336  ereport(ERROR,
337  (errcode(ERRCODE_INVALID_NAME),
338  errmsg("invalid publication_names syntax")));
339  }
340  else if (strcmp(defel->defname, "binary") == 0)
341  {
342  if (binary_option_given)
343  ereport(ERROR,
344  (errcode(ERRCODE_SYNTAX_ERROR),
345  errmsg("conflicting or redundant options")));
346  binary_option_given = true;
347 
348  data->binary = defGetBoolean(defel);
349  }
350  else if (strcmp(defel->defname, "messages") == 0)
351  {
352  if (messages_option_given)
353  ereport(ERROR,
354  (errcode(ERRCODE_SYNTAX_ERROR),
355  errmsg("conflicting or redundant options")));
356  messages_option_given = true;
357 
358  data->messages = defGetBoolean(defel);
359  }
360  else if (strcmp(defel->defname, "streaming") == 0)
361  {
362  if (streaming_given)
363  ereport(ERROR,
364  (errcode(ERRCODE_SYNTAX_ERROR),
365  errmsg("conflicting or redundant options")));
366  streaming_given = true;
367 
368  data->streaming = defGetStreamingMode(defel);
369  }
370  else if (strcmp(defel->defname, "two_phase") == 0)
371  {
372  if (two_phase_option_given)
373  ereport(ERROR,
374  (errcode(ERRCODE_SYNTAX_ERROR),
375  errmsg("conflicting or redundant options")));
376  two_phase_option_given = true;
377 
378  data->two_phase = defGetBoolean(defel);
379  }
380  else if (strcmp(defel->defname, "origin") == 0)
381  {
382  char *origin;
383 
384  if (origin_option_given)
385  ereport(ERROR,
386  errcode(ERRCODE_SYNTAX_ERROR),
387  errmsg("conflicting or redundant options"));
388  origin_option_given = true;
389 
 /* the value is matched case-insensitively against the two known origins */
390  origin = defGetString(defel);
391  if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
392  data->publish_no_origin = true;
393  else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
394  data->publish_no_origin = false;
395  else
396  ereport(ERROR,
397  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
398  errmsg("unrecognized origin value: \"%s\"", origin));
399  }
400  else
401  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
402  }
403 
404  /* Check required options */
405  if (!protocol_version_given)
406  ereport(ERROR,
407  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
408  errmsg("proto_version option missing"));
409  if (!publication_names_given)
410  ereport(ERROR,
411  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
412  errmsg("publication_names option missing"));
413 }
414 
415 /*
416  * Initialize this plugin
 *
 * Two memory contexts are created under ctx->context: one for private
 * allocations (data->context) and one for cached state (data->cachectx).
 *
 * When is_init is false (replication start) the client options are
 * validated: the protocol version must lie between
 * LOGICALREP_PROTO_MIN_VERSION_NUM and LOGICALREP_PROTO_MAX_VERSION_NUM,
 * and streaming and two-phase are each accepted only with a new enough
 * protocol version and when ctx->streaming / ctx->twophase is already set.
 * The publication state is then reset and the pg_publication syscache
 * callback is registered once per process.
 *
 * When is_init is true (slot initialization) streaming and two-phase are
 * simply switched off on the decoding context.
 *
 * NOTE(review): many lines are missing from this listing, among them the
 * name/first-parameter line, the declaration of "data", the trailing
 * arguments of both AllocSetContextCreate() calls, the call that parses the
 * options, part of the parallel-streaming check, the callback argument of
 * CacheRegisterSyscacheCallback() and the relation cache initialization
 * call.  Restore them from the real file before relying on this text.
417  */
418 static void
420  bool is_init)
421 {
 /* process-lifetime flag: the syscache callback must be registered only once */
423  static bool publication_callback_registered = false;
424 
425  /* Create our memory context for private allocations. */
426  data->context = AllocSetContextCreate(ctx->context,
427  "logical replication output context",
429 
430  data->cachectx = AllocSetContextCreate(ctx->context,
431  "logical replication cache context",
433 
435 
436  /* This plugin uses binary protocol. */
438 
439  /*
440  * This is replication start and not slot initialization.
441  *
442  * Parse and validate options passed by the client.
443  */
444  if (!is_init)
445  {
446  /* Parse the params and ERROR if we see any we don't recognize */
448 
449  /* Check if we support requested protocol */
 /*
  * NOTE(review): protocol_version is stored as uint32 by the option
  * parser but is printed with %d in the messages below.
  */
450  if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
451  ereport(ERROR,
452  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
453  errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
454  data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
455 
456  if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
457  ereport(ERROR,
458  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
459  errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
460  data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
461 
462  /*
463  * Decide whether to enable streaming. It is disabled by default, in
464  * which case we just update the flag in decoding context. Otherwise
465  * we only allow it with sufficient version of the protocol, and when
466  * the output plugin supports it.
467  */
468  if (data->streaming == LOGICALREP_STREAM_OFF)
469  ctx->streaming = false;
470  else if (data->streaming == LOGICALREP_STREAM_ON &&
471  data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
472  ereport(ERROR,
473  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
474  errmsg("requested proto_version=%d does not support streaming, need %d or higher",
475  data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
 /*
  * NOTE(review): the second half of this condition and the argument
  * line of the errmsg() call are missing from this listing.
  */
476  else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
478  ereport(ERROR,
479  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
480  errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
482  else if (!ctx->streaming)
483  ereport(ERROR,
484  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
485  errmsg("streaming requested, but not supported by output plugin")));
486 
487  /*
488  * Here, we just check whether the two-phase option is passed by
489  * plugin and decide whether to enable it at later point of time. It
490  * remains enabled if the previous start-up has done so. But we only
491  * allow the option to be passed in with sufficient version of the
492  * protocol, and when the output plugin supports it.
493  */
494  if (!data->two_phase)
495  ctx->twophase_opt_given = false;
496  else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
497  ereport(ERROR,
498  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
499  errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
500  data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
501  else if (!ctx->twophase)
502  ereport(ERROR,
503  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
504  errmsg("two-phase commit requested, but not supported by output plugin")));
505  else
506  ctx->twophase_opt_given = true;
507 
508  /* Init publication state. */
509  data->publications = NIL;
510  publications_valid = false;
511 
512  /*
513  * Register callback for pg_publication if we didn't already do that
514  * during some previous call in this process.
515  */
516  if (!publication_callback_registered)
517  {
518  CacheRegisterSyscacheCallback(PUBLICATIONOID,
520  (Datum) 0);
521  publication_callback_registered = true;
522  }
523 
524  /* Initialize relation schema cache. */
526  }
527  else
528  {
529  /*
530  * Disable the streaming and prepared transactions during the slot
531  * initialization mode.
532  */
533  ctx->streaming = false;
534  ctx->twophase = false;
535  }
536 }
537 
538 /*
539  * BEGIN callback.
540  *
541  * Don't send the BEGIN message here; instead, postpone it until the first
542  * change. In logical replication, a common scenario is to replicate a set of
543  * tables (instead of all tables) and transactions whose changes were on
544  * the table(s) that are not published will produce empty transactions. These
545  * empty transactions will send BEGIN and COMMIT messages to subscribers,
546  * using bandwidth on something with little/no use for logical replication.
 *
 * All this callback does is attach a PGOutputTxnData to
 * txn->output_plugin_private so later callbacks can tell whether BEGIN has
 * been sent.
 *
 * NOTE(review): the signature line and the first line of the txndata
 * allocation are missing from this listing; only its sizeof argument
 * remains.  The BEGIN-sending code asserts that sent_begin_txn starts out
 * false, so the allocation is presumably zero-initializing -- confirm.
547  */
548 static void
550 {
552  sizeof(PGOutputTxnData));
553 
554  txn->output_plugin_private = txndata;
555 }
556 
557 /*
558  * Send BEGIN.
559  *
560  * This is called while processing the first change of the transaction.
 *
 * Writes the BEGIN message, marks the per-transaction state as "BEGIN
 * sent", and then passes the transaction's origin id and LSN to
 * send_repl_origin(), together with a flag that is true only when the
 * origin id is valid.
 *
 * NOTE(review): the signature line and the declaration of txndata are
 * missing from this listing; txndata is presumably fetched from
 * txn->output_plugin_private -- confirm.
561  */
562 static void
564 {
565  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
567 
568  Assert(txndata);
569  Assert(!txndata->sent_begin_txn);
570 
 /* the flag passed here is inverted: true only when no origin will follow */
571  OutputPluginPrepareWrite(ctx, !send_replication_origin);
572  logicalrep_write_begin(ctx->out, txn);
573  txndata->sent_begin_txn = true;
574 
575  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
576  send_replication_origin);
577 
578  OutputPluginWrite(ctx, true);
579 }
580 
581 /*
582  * COMMIT callback
 *
 * Frees the per-transaction state and writes a COMMIT message, but only if
 * BEGIN was actually sent for this transaction; otherwise the transaction
 * is skipped and a DEBUG1 message is logged.
 *
 * NOTE(review): the first signature line and the declaration of txndata
 * are missing from this listing; txndata is presumably read from
 * txn->output_plugin_private -- confirm.
583  */
584 static void
586  XLogRecPtr commit_lsn)
587 {
589  bool sent_begin_txn;
590 
591  Assert(txndata);
592 
593  /*
594  * We don't need to send the commit message unless some relevant change
595  * from this transaction has been sent to the downstream.
596  */
 /* copy the flag first: txndata is freed before the flag is tested below */
597  sent_begin_txn = txndata->sent_begin_txn;
598  OutputPluginUpdateProgress(ctx, !sent_begin_txn);
599  pfree(txndata);
600  txn->output_plugin_private = NULL;
601 
602  if (!sent_begin_txn)
603  {
604  elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
605  return;
606  }
607 
608  OutputPluginPrepareWrite(ctx, true);
609  logicalrep_write_commit(ctx->out, txn, commit_lsn);
610  OutputPluginWrite(ctx, true);
611 }
612 
613 /*
614  * BEGIN PREPARE callback
 *
 * Unlike the plain BEGIN callback, nothing is deferred here: the message is
 * prepared and written immediately, followed by the origin information
 * handled by send_repl_origin().
 *
 * NOTE(review): the signature line and the call that writes the message
 * body between the prepare-write and the origin handling are missing from
 * this listing.
615  */
616 static void
618 {
619  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
620 
621  OutputPluginPrepareWrite(ctx, !send_replication_origin);
623 
624  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
625  send_replication_origin);
626 
627  OutputPluginWrite(ctx, true);
628 }
629 
630 /*
631  * PREPARE callback
 *
 * Updates progress, then writes a PREPARE message for txn at prepare_lsn.
 *
 * NOTE(review): the first signature line (function name and leading
 * parameters) is missing from this listing.
632  */
633 static void
635  XLogRecPtr prepare_lsn)
636 {
637  OutputPluginUpdateProgress(ctx, false);
638 
639  OutputPluginPrepareWrite(ctx, true);
640  logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
641  OutputPluginWrite(ctx, true);
642 }
643 
644 /*
645  * COMMIT PREPARED callback
 *
 * Updates progress, then writes a COMMIT PREPARED message for txn at
 * commit_lsn.
 *
 * NOTE(review): the first signature line (function name and leading
 * parameters) is missing from this listing.
646  */
647 static void
649  XLogRecPtr commit_lsn)
650 {
651  OutputPluginUpdateProgress(ctx, false);
652 
653  OutputPluginPrepareWrite(ctx, true);
654  logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
655  OutputPluginWrite(ctx, true);
656 }
657 
658 /*
659  * ROLLBACK PREPARED callback
 *
 * Updates progress, then writes a ROLLBACK PREPARED message carrying the
 * end LSN and timestamp of the original prepare.
 *
 * NOTE(review): the first signature line (function name and leading
 * parameter) is missing from this listing.
660  */
661 static void
663  ReorderBufferTXN *txn,
664  XLogRecPtr prepare_end_lsn,
665  TimestampTz prepare_time)
666 {
667  OutputPluginUpdateProgress(ctx, false);
668 
669  OutputPluginPrepareWrite(ctx, true);
670  logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
671  prepare_time);
672  OutputPluginWrite(ctx, true);
673 }
674 
675 /*
676  * Write the current schema of the relation and its ancestor (if any) if not
677  * done yet.
 *
 * "Done yet" is tracked differently depending on the mode.  Inside a
 * streaming block it is looked up and recorded per top-level transaction id
 * through get_schema_sent_in_streamed_txn() and
 * set_schema_sent_in_streamed_txn(); otherwise the entry's schema_sent flag
 * is used.
 *
 * NOTE(review): the first signature line and the declarations of data, xid
 * and topxid are missing from this listing.  Outside a streaming block xid
 * is never assigned in the visible code, so the value passed on is whatever
 * its declaration initializes it to (InvalidTransactionId according to the
 * comment below) -- confirm.
678  */
679 static void
681  ReorderBufferChange *change,
682  Relation relation, RelationSyncEntry *relentry)
683 {
685  bool schema_sent;
688 
689  /*
690  * Remember XID of the (sub)transaction for the change. We don't care if
691  * it's top-level transaction or not (we have already sent that XID in
692  * start of the current streaming block).
693  *
694  * If we're not in a streaming block, just use InvalidTransactionId and
695  * the write methods will not include it.
696  */
697  if (data->in_streaming)
698  xid = change->txn->xid;
699 
 /* the streamed-schema bookkeeping is keyed by the top-level xid, not the subxact's */
700  if (rbtxn_is_subtxn(change->txn))
701  topxid = rbtxn_get_toptxn(change->txn)->xid;
702  else
703  topxid = xid;
704 
705  /*
706  * Do we need to send the schema? We do track streamed transactions
707  * separately, because those may be applied later (and the regular
708  * transactions won't see their effects until then) and in an order that
709  * we don't know at this point.
710  *
711  * XXX There is scope for optimization here. Currently, we always send
712  * the schema first time in a streaming transaction but we can probably
713  * avoid that by checking 'relentry->schema_sent' flag. However, before
714  * doing that we need to study its impact on the case where we have a mix
715  * of streaming and non-streaming transactions.
716  */
717  if (data->in_streaming)
718  schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
719  else
720  schema_sent = relentry->schema_sent;
721 
722  /* Nothing to do if we already sent the schema. */
723  if (schema_sent)
724  return;
725 
726  /*
727  * Send the schema. If the changes will be published using an ancestor's
728  * schema, not the relation's own, send that ancestor's schema before
729  * sending relation's own (XXX - maybe sending only the former suffices?).
730  */
731  if (relentry->publish_as_relid != RelationGetRelid(relation))
732  {
733  Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
734 
 /* the same column list is applied to the ancestor and to the relation itself */
735  send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
736  RelationClose(ancestor);
737  }
738 
739  send_relation_and_attrs(relation, xid, ctx, relentry->columns);
740 
 /* record the send in the same place the check above read from */
741  if (data->in_streaming)
742  set_schema_sent_in_streamed_txn(relentry, topxid);
743  else
744  relentry->schema_sent = true;
745 }
746 
747 /*
748  * Sends a relation
 *
 * First a type message is written for every attribute that is not dropped,
 * not generated, has a type OID at or above FirstGenbkiObjectId, and is in
 * the column list (when one is given).  Then one relation message is written
 * for the relation itself.  Each message gets its own
 * OutputPluginPrepareWrite()/OutputPluginWrite() pair.
 *
 * columns may be NULL, in which case no attribute is filtered out by the
 * column-list test.
 *
 * NOTE(review): the first signature lines are missing from this listing; a
 * forward declaration earlier in the file shows the leading parameters as
 * (Relation relation, TransactionId xid, ...), and the body also uses a
 * decoding context named ctx.
749  */
750 static void
753  Bitmapset *columns)
754 {
755  TupleDesc desc = RelationGetDescr(relation);
756  int i;
757 
758  /*
759  * Write out type info if needed. We do that only for user-created types.
760  * We use FirstGenbkiObjectId as the cutoff, so that we only consider
761  * objects with hand-assigned OIDs to be "built in", not for instance any
762  * function or type defined in the information_schema. This is important
763  * because only hand-assigned OIDs can be expected to remain stable across
764  * major versions.
765  */
766  for (i = 0; i < desc->natts; i++)
767  {
768  Form_pg_attribute att = TupleDescAttr(desc, i);
769 
770  if (att->attisdropped || att->attgenerated)
771  continue;
772 
773  if (att->atttypid < FirstGenbkiObjectId)
774  continue;
775 
776  /* Skip this attribute if it's not present in the column list */
777  if (columns != NULL && !bms_is_member(att->attnum, columns))
778  continue;
779 
780  OutputPluginPrepareWrite(ctx, false);
781  logicalrep_write_typ(ctx->out, xid, att->atttypid);
782  OutputPluginWrite(ctx, false);
783  }
784 
785  OutputPluginPrepareWrite(ctx, false);
786  logicalrep_write_rel(ctx->out, xid, relation, columns);
787  OutputPluginWrite(ctx, false);
788 }
789 
790 /*
791  * Executor state preparation for evaluation of row filter expressions for the
792  * specified relation.
 *
 * Builds a fresh EState whose range table holds a single RTE_RELATION entry
 * for the relation, with matching permission info, and sets the output
 * command id from GetCurrentCommandId(false).  Returns the new EState.
 *
 * NOTE(review): the signature line and one statement after the relkind
 * assignment are missing from this listing.  The body refers to the
 * relation as "rel".
793  */
794 static EState *
796 {
797  EState *estate;
798  RangeTblEntry *rte;
799  List *perminfos = NIL;
800 
801  estate = CreateExecutorState();
802 
803  rte = makeNode(RangeTblEntry);
804  rte->rtekind = RTE_RELATION;
805  rte->relid = RelationGetRelid(rel);
806  rte->relkind = rel->rd_rel->relkind;
808 
809  addRTEPermissionInfo(&perminfos, rte);
810 
811  ExecInitRangeTable(estate, list_make1(rte), perminfos);
812 
813  estate->es_output_cid = GetCurrentCommandId(false);
814 
815  return estate;
816 }
817 
818 /*
819  * Evaluates row filter.
820  *
821  * If the row filter evaluates to NULL, it is taken as false i.e. the change
822  * isn't replicated.
 *
 * Returns the boolean result of the expression, with NULL mapped to false.
 * The outcome is also logged at DEBUG3.
 *
 * NOTE(review): the signature line is missing from this listing; the body
 * uses an expression state named "state" (must not be NULL) and an
 * expression context named "econtext".
823  */
824 static bool
826 {
827  Datum ret;
828  bool isnull;
829 
830  Assert(state != NULL);
831 
832  ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
833 
 /* the first argument nests: isnull ? "false" : (DatumGetBool(ret) ? "true" : "false") */
834  elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
835  isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
836  isnull ? "true" : "false");
837 
838  if (isnull)
839  return false;
840 
841  return DatumGetBool(ret);
842 }
843 
844 /*
845  * Make sure the per-entry memory context exists.
 *
 * Does nothing when entry->entry_cxt is already set.  Otherwise creates the
 * context as a child of data->cachectx; the name of the relation identified
 * by entry->publish_as_relid is passed to a call whose first line is
 * missing (presumably to label the context).
 *
 * NOTE(review): the signature line, the last argument of
 * AllocSetContextCreate() and the first line of the following call are
 * missing from this listing.
 *
 * NOTE(review): the relation opened with RelationIdGetRelation() has no
 * visible RelationClose() before the function ends, unlike the row filter
 * and column list initializers in this file.  Check whether a relcache
 * reference is leaked here.
846  */
847 static void
849 {
850  Relation relation;
851 
852  /* The context may already exist, in which case bail out. */
853  if (entry->entry_cxt)
854  return;
855 
856  relation = RelationIdGetRelation(entry->publish_as_relid);
857 
858  entry->entry_cxt = AllocSetContextCreate(data->cachectx,
859  "entry private context",
861 
863  RelationGetRelationName(relation));
864 }
865 
866 /*
867  * Initialize the row filter.
 *
 * Walks the given publications and collects, per publication action
 * (insert/update/delete), the row filter expressions that apply to the
 * relation identified by entry->publish_as_relid.  A publication that
 * covers the relation without a filter cancels filtering for every action
 * it publishes.  Where filters remain, those of one action are OR-ed
 * together and the prepared ExprState is cached in entry->exprstate[],
 * allocated in the entry's private memory context.
 *
 * NOTE(review): the first signature line, one argument of the
 * PUBLICATIONRELMAP lookup, and two statements at the top of the
 * "has_filter" block are missing from this listing.  Those two are
 * presumably the declaration that opens "relation" and the call that makes
 * sure entry->entry_cxt exists -- confirm.
868  */
869 static void
871  RelationSyncEntry *entry)
872 {
873  ListCell *lc;
874  List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
875  bool no_filter[] = {false, false, false}; /* One per pubaction */
876  MemoryContext oldctx;
877  int idx;
878  bool has_filter = true;
879  Oid schemaid = get_rel_namespace(entry->publish_as_relid);
880 
881  /*
882  * Find if there are any row filters for this relation. If there are, then
883  * prepare the necessary ExprState and cache it in entry->exprstate. To
884  * build an expression state, we need to ensure the following:
885  *
886  * All the given publication-table mappings must be checked.
887  *
888  * Multiple publications might have multiple row filters for this
889  * relation. Since row filter usage depends on the DML operation, there
890  * are multiple lists (one for each operation) to which row filters will
891  * be appended.
892  *
893  * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
894  * expression" so it takes precedence.
895  */
896  foreach(lc, publications)
897  {
898  Publication *pub = lfirst(lc);
899  HeapTuple rftuple = NULL;
900  Datum rfdatum = 0;
 /* stays true unless a pg_publication_rel row with a non-null filter is found */
901  bool pub_no_filter = true;
902 
903  /*
904  * If the publication is FOR ALL TABLES, or the publication includes a
905  * FOR TABLES IN SCHEMA where the table belongs to the referred
906  * schema, then it is treated the same as if there are no row filters
907  * (even if other publications have a row filter).
908  */
909  if (!pub->alltables &&
910  !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
911  ObjectIdGetDatum(schemaid),
912  ObjectIdGetDatum(pub->oid)))
913  {
914  /*
915  * Check for the presence of a row filter in this publication.
916  */
917  rftuple = SearchSysCache2(PUBLICATIONRELMAP,
919  ObjectIdGetDatum(pub->oid));
920 
921  if (HeapTupleIsValid(rftuple))
922  {
923  /* Null indicates no filter. */
924  rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
925  Anum_pg_publication_rel_prqual,
926  &pub_no_filter);
927  }
928  }
929 
930  if (pub_no_filter)
931  {
 /* the tuple may have been found but carry a null filter; release it */
932  if (rftuple)
933  ReleaseSysCache(rftuple);
934 
935  no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
936  no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
937  no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
938 
939  /*
940  * Quick exit if all the DML actions are published without a filter
941  * by the publications seen so far.
942  */
943  if (no_filter[PUBACTION_INSERT] &&
944  no_filter[PUBACTION_UPDATE] &&
945  no_filter[PUBACTION_DELETE])
946  {
947  has_filter = false;
948  break;
949  }
950 
951  /* No additional work for this publication. Next one. */
952  continue;
953  }
954 
955  /* Form the per pubaction row filter lists. */
 /*
  * Each list gets its own TextDatumGetCString() result, presumably so
  * that list_free_deep() below can free the lists independently.
  */
956  if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
957  rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
958  TextDatumGetCString(rfdatum));
959  if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
960  rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
961  TextDatumGetCString(rfdatum));
962  if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
963  rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
964  TextDatumGetCString(rfdatum));
965 
 /*
  * rftuple is valid here: pub_no_filter can only have become false
  * through SysCacheGetAttr() on a valid tuple.
  */
966  ReleaseSysCache(rftuple);
967  } /* loop all subscribed publications */
968 
969  /* Clean the row filter */
 /*
  * A filter collected from an earlier publication is discarded when a
  * later publication turned out to publish the same action unfiltered.
  */
970  for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
971  {
972  if (no_filter[idx])
973  {
974  list_free_deep(rfnodes[idx]);
975  rfnodes[idx] = NIL;
976  }
977  }
978 
979  if (has_filter)
980  {
982 
984 
985  /*
986  * Now all the filters for all pubactions are known. Combine them when
987  * their pubactions are the same.
988  */
989  oldctx = MemoryContextSwitchTo(entry->entry_cxt);
990  entry->estate = create_estate_for_relation(relation);
991  for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
992  {
993  List *filters = NIL;
994  Expr *rfnode;
995 
 /* no filter for this action: leave entry->exprstate[idx] untouched */
996  if (rfnodes[idx] == NIL)
997  continue;
998 
999  foreach(lc, rfnodes[idx])
1000  filters = lappend(filters, stringToNode((char *) lfirst(lc)));
1001 
1002  /* combine the row filter and cache the ExprState */
1003  rfnode = make_orclause(filters);
1004  entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1005  } /* for each pubaction */
1006  MemoryContextSwitchTo(oldctx);
1007 
1008  RelationClose(relation);
1009  }
1010 }
1011 
1012 /*
1013  * Initialize the column list.
 *
 * For every given publication, works out the column list that applies to
 * the relation identified by entry->publish_as_relid.  NULL means "all
 * columns"; a list naming every live (not dropped, not generated) column is
 * normalized to NULL as well.  The first publication's result is stored in
 * entry->columns; any later publication whose result differs raises
 * ERRCODE_FEATURE_NOT_SUPPORTED.
 *
 * NOTE(review): the first signature line, one argument of the
 * PUBLICATIONRELMAP lookup, one statement before the
 * pub_collist_to_bitmapset() call and one errmsg() argument line are missing
 * from this listing.  The lost statement is presumably the call that makes
 * sure entry->entry_cxt exists, since that context is used right after it
 * -- confirm.
1014  */
1015 static void
1017  RelationSyncEntry *entry)
1018 {
1019  ListCell *lc;
1020  bool first = true;
1021  Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1022 
1023  /*
1024  * Find if there are any column lists for this relation. If there are,
1025  * build a bitmap using the column lists.
1026  *
1027  * Multiple publications might have multiple column lists for this
1028  * relation.
1029  *
1030  * Note that we don't support the case where the column list is different
1031  * for the same table when combining publications. See comments atop
1032  * fetch_table_list. But one can later change the publication so we still
1033  * need to check all the given publication-table mappings and report an
1034  * error if any publications have a different column list.
1035  *
1036  * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
1037  */
1038  foreach(lc, publications)
1039  {
1040  Publication *pub = lfirst(lc);
1041  HeapTuple cftuple = NULL;
1042  Datum cfdatum = 0;
 /* NULL stands for "all columns" and is what every no-list case ends up with */
1043  Bitmapset *cols = NULL;
1044 
1045  /*
1046  * If the publication is FOR ALL TABLES then it is treated the same as
1047  * if there are no column lists (even if other publications have a
1048  * list).
1049  */
1050  if (!pub->alltables)
1051  {
1052  bool pub_no_list = true;
1053 
1054  /*
1055  * Check for the presence of a column list in this publication.
1056  *
1057  * Note: If we find no pg_publication_rel row, it's a publication
1058  * defined for a whole schema, so it can't have a column list,
1059  * just like a FOR ALL TABLES publication.
1060  */
1061  cftuple = SearchSysCache2(PUBLICATIONRELMAP,
1063  ObjectIdGetDatum(pub->oid));
1064 
1065  if (HeapTupleIsValid(cftuple))
1066  {
1067  /* Lookup the column list attribute. */
1068  cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
1069  Anum_pg_publication_rel_prattrs,
1070  &pub_no_list);
1071 
1072  /* Build the column list bitmap in the per-entry context. */
1073  if (!pub_no_list) /* when not null */
1074  {
1075  int i;
1076  int nliveatts = 0;
1077  TupleDesc desc = RelationGetDescr(relation);
1078 
1080 
1081  cols = pub_collist_to_bitmapset(cols, cfdatum,
1082  entry->entry_cxt);
1083 
1084  /* Get the number of live attributes. */
1085  for (i = 0; i < desc->natts; i++)
1086  {
1087  Form_pg_attribute att = TupleDescAttr(desc, i);
1088 
1089  if (att->attisdropped || att->attgenerated)
1090  continue;
1091 
1092  nliveatts++;
1093  }
1094 
1095  /*
1096  * If column list includes all the columns of the table,
1097  * set it to NULL.
1098  */
1099  if (bms_num_members(cols) == nliveatts)
1100  {
1101  bms_free(cols);
1102  cols = NULL;
1103  }
1104  }
1105 
1106  ReleaseSysCache(cftuple);
1107  }
1108  }
1109 
 /*
  * The first publication sets the reference list; every later one must
  * produce an equal set, otherwise an error is raised.
  */
1110  if (first)
1111  {
1112  entry->columns = cols;
1113  first = false;
1114  }
1115  else if (!bms_equal(entry->columns, cols))
1116  ereport(ERROR,
1117  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1118  errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1120  RelationGetRelationName(relation)));
1121  } /* loop all subscribed publications */
1122 
1123  RelationClose(relation);
1124 }
1125 
1126 /*
1127  * Initialize the slot for storing new and old tuples, and build the map that
1128  * will be used to convert the relation's tuples into the ancestor's format.
 *
 * Sets entry->old_slot and entry->new_slot to single-tuple slots built on
 * copies of the relation's tuple descriptor, created while data->cachectx is
 * the current memory context. entry->attrmap is only assigned when
 * entry->publish_as_relid differs from the relation's own OID.
 *
 * NOTE(review): the embedded listing numbers skip 1131 and 1163, so the line
 * carrying the function name (and presumably the "data" and "relation"
 * parameters) and the statement announced by the "Map must live as long as
 * the session does" comment are not shown in this rendering.
1129  */
1130 static void
1132  RelationSyncEntry *entry)
1133 {
1134  MemoryContext oldctx;
1135  TupleDesc oldtupdesc;
1136  TupleDesc newtupdesc;
1137 
1138  oldctx = MemoryContextSwitchTo(data->cachectx);
1139 
1140  /*
1141  * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1142  * live as long as the cache remains.
1143  */
1144  oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1145  newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1146 
1147  entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1148  entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1149 
      /* Back to the context that was current on entry. */
1150  MemoryContextSwitchTo(oldctx);
1151 
1152  /*
1153  * Cache the map that will be used to convert the relation's tuples into
1154  * the ancestor's format, if needed.
1155  */
1156  if (entry->publish_as_relid != RelationGetRelid(relation))
1157  {
1158  Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1159  TupleDesc indesc = RelationGetDescr(relation);
1160  TupleDesc outdesc = RelationGetDescr(ancestor);
1161 
1162  /* Map must live as long as the session does. */
      /* NOTE(review): listing line 1163 is not shown here. */
1164 
      /* Map from the relation's descriptor to the ancestor's descriptor. */
1165  entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1166 
1167  MemoryContextSwitchTo(oldctx);
      /* Drop the reference taken by RelationIdGetRelation() above. */
1168  RelationClose(ancestor);
1169  }
1170 }
1171 
1172 /*
1173  * Change is checked against the row filter if any.
1174  *
1175  * Returns true if the change is to be replicated, else false.
1176  *
1177  * For inserts, evaluate the row filter for new tuple.
1178  * For deletes, evaluate the row filter for old tuple.
1179  * For updates, evaluate the row filter for old and new tuple.
1180  *
1181  * For updates, if both evaluations are true, we allow sending the UPDATE and
1182  * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
1183  * only one of the tuples matches the row filter expression, we transform
1184  * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1185  * following rules:
1186  *
1187  * Case 1: old-row (no match) new-row (no match) -> (drop change)
1188  * Case 2: old-row (no match) new row (match) -> INSERT
1189  * Case 3: old-row (match) new-row (no match) -> DELETE
1190  * Case 4: old-row (match) new row (match) -> UPDATE
1191  *
1192  * The new action is updated in the action parameter.
1193  *
1194  * The new slot could be updated when transforming the UPDATE into INSERT,
1195  * because the original new tuple might not have column values from the replica
1196  * identity.
1197  *
1198  * Examples:
1199  * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1200  * Since the old tuple satisfies, the initial table synchronization copied this
1201  * row (or another method was used to guarantee that there is data
1202  * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1203  * row filter, so from a data consistency perspective, that row should be
1204  * removed on the subscriber. The UPDATE should be transformed into a DELETE
1205  * statement and be sent to the subscriber. Keeping this row on the subscriber
1206  * is undesirable because it doesn't reflect what was defined in the row filter
1207  * expression on the publisher. This row on the subscriber would likely not be
1208  * modified by replication again. If someone inserted a new row with the same
1209  * old identifier, replication could stop due to a constraint violation.
1210  *
1211  * Let's say the old tuple doesn't match the row filter but the new tuple does.
1212  * Since the old tuple doesn't satisfy, the initial table synchronization
1213  * probably didn't copy this row. However, after the UPDATE the new tuple does
1214  * satisfy the row filter, so from a data consistency perspective, that row
1215  * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1216  * statements have no effect (it matches no row -- see
1217  * apply_handle_update_internal()). So, the UPDATE should be transformed into an
1218  * INSERT statement and be sent to the subscriber. However, this might surprise
1219  * someone who expects the data set to satisfy the row filter expression on the
1220  * provider.
1221  */
/*
 * NOTE(review): the embedded listing numbers skip 1223 and 1225 below, so the
 * line carrying the function name and the line carrying the last parameter
 * are not shown in this rendering. The body reads "relation" and "old_slot"
 * and dereferences "action", which are presumably declared on those lines.
 */
1222 static bool
1224  TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1226 {
1227  TupleDesc desc;
1228  int i;
1229  bool old_matched,
1230  new_matched,
1231  result;
1232  TupleTableSlot *tmp_new_slot;
1233  TupleTableSlot *new_slot = *new_slot_ptr;
1234  ExprContext *ecxt;
1235  ExprState *filter_exprstate;
1236 
1237  /*
1238  * We need this map to avoid relying on ReorderBufferChangeType enums
1239  * having specific values.
1240  */
1241  static const int map_changetype_pubaction[] = {
      /* NOTE(review): listing lines 1242-1244 (presumably the initializers) are not shown. */
1245  };
1246 
      /* NOTE(review): listing lines 1247-1249 are not shown in this rendering. */
1250 
1251  Assert(new_slot || old_slot);
1252 
1253  /* Get the corresponding row filter */
1254  filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1255 
1256  /* Bail out if there is no row filter */
1257  if (!filter_exprstate)
1258  return true;
1259 
1260  elog(DEBUG3, "table \"%s.%s\" has row filter",
      /* NOTE(review): listing line 1261 (the argument for the first %s) is not shown. */
1262  RelationGetRelationName(relation));
1263 
      /* NOTE(review): listing line 1264 is not shown in this rendering. */
1265 
1266  ecxt = GetPerTupleExprContext(entry->estate);
1267 
1268  /*
1269  * For the following occasions where there is only one tuple, we can
1270  * evaluate the row filter for that tuple and return.
1271  *
1272  * For inserts, we only have the new tuple.
1273  *
1274  * For updates, we can have only a new tuple when none of the replica
1275  * identity columns changed and none of those columns have external data
1276  * but we still need to evaluate the row filter for the new tuple as the
1277  * existing values of those columns might not match the filter. Also,
1278  * users can use constant expressions in the row filter, so we anyway need
1279  * to evaluate it for the new tuple.
1280  *
1281  * For deletes, we only have the old tuple.
1282  */
1283  if (!new_slot || !old_slot)
1284  {
1285  ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1286  result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1287 
1288  return result;
1289  }
1290 
1291  /*
1292  * Both the old and new tuples must be valid only for updates and need to
1293  * be checked against the row filter.
1294  */
1295  Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1296 
      /* The loop below reads tts_values/tts_isnull for every attribute. */
1297  slot_getallattrs(new_slot);
1298  slot_getallattrs(old_slot);
1299 
1300  tmp_new_slot = NULL;
1301  desc = RelationGetDescr(relation);
1302 
1303  /*
1304  * The new tuple might not have all the replica identity columns, in which
1305  * case it needs to be copied over from the old tuple.
1306  */
1307  for (i = 0; i < desc->natts; i++)
1308  {
1309  Form_pg_attribute att = TupleDescAttr(desc, i);
1310 
1311  /*
1312  * if the column in the new tuple or old tuple is null, nothing to do
1313  */
1314  if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1315  continue;
1316 
1317  /*
1318  * Unchanged toasted replica identity columns are only logged in the
1319  * old tuple. Copy this over to the new tuple. The changed (or WAL
1320  * Logged) toast values are always assembled in memory and set as
1321  * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1322  */
1323  if (att->attlen == -1 &&
1324  VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1325  !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1326  {
      /*
       * Create the temporary slot lazily, on the first column that needs
       * patching, seeded with the new tuple's values and null flags.
       */
1327  if (!tmp_new_slot)
1328  {
1329  tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1330  ExecClearTuple(tmp_new_slot);
1331 
1332  memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1333  desc->natts * sizeof(Datum));
1334  memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1335  desc->natts * sizeof(bool));
1336  }
1337 
1338  tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1339  tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1340  }
1341  }
1342 
      /* Evaluate the filter on the old tuple first ... */
1343  ecxt->ecxt_scantuple = old_slot;
1344  old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1345 
      /* ... then on the new tuple, using the patched copy if one was built. */
1346  if (tmp_new_slot)
1347  {
1348  ExecStoreVirtualTuple(tmp_new_slot);
1349  ecxt->ecxt_scantuple = tmp_new_slot;
1350  }
1351  else
1352  ecxt->ecxt_scantuple = new_slot;
1353 
1354  new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1355 
1356  /*
1357  * Case 1: if both tuples don't match the row filter, bailout. Send
1358  * nothing.
1359  */
1360  if (!old_matched && !new_matched)
1361  return false;
1362 
1363  /*
1364  * Case 2: if the old tuple doesn't satisfy the row filter but the new
1365  * tuple does, transform the UPDATE into INSERT.
1366  *
1367  * Use the newly transformed tuple that must contain the column values for
1368  * all the replica identity columns. This is required to ensure that,
1369  * while inserting the tuple in the downstream node, we have all the
1370  * required column values.
1371  */
1372  if (!old_matched && new_matched)
1373  {
      /* NOTE(review): listing line 1374 (presumably the *action update) is not shown. */
1375 
1376  if (tmp_new_slot)
1377  *new_slot_ptr = tmp_new_slot;
1378  }
1379 
1380  /*
1381  * Case 3: if the old tuple satisfies the row filter but the new tuple
1382  * doesn't, transform the UPDATE into DELETE.
1383  *
1384  * This transformation does not require another tuple. The Old tuple will
1385  * be used for DELETE.
1386  */
1387  else if (old_matched && !new_matched)
      /*
       * NOTE(review): listing line 1388, the statement controlled by this
       * else-if, is not shown in this rendering.
       */
1389 
1390  /*
1391  * Case 4: if both tuples match the row filter, transformation isn't
1392  * required. (*action is default UPDATE).
1393  */
1394 
1395  return true;
1396 }
1397 
1398 /*
1399  * Sends the decoded DML over wire.
1400  *
1401  * This is called both in streaming and non-streaming modes.
 *
 * Flow visible below: skip relations that are not publishable, look up the
 * relation's sync entry, drop the change if the matching pubaction is off,
 * store the old/new heap tuples into the entry's slots (converting them via
 * relentry->attrmap when one is set), apply the row filter, and finally write
 * the insert/update/delete message. All work after the table filter runs with
 * data->context current; that context is reset before returning.
 *
 * NOTE(review): the embedded listing numbers skip 1404, 1407, 1408, 1411,
 * 1414, 1435, 1439, 1443, 1481, 1496, 1533, 1537 and 1541. The function-name
 * line, the declarations of "data", "txndata", "xid" and "action", every
 * "case" label, and the first half of both "slot" declarations are therefore
 * not shown in this rendering.
1402  */
1403 static void
1405  Relation relation, ReorderBufferChange *change)
1406 {
1409  MemoryContext old;
1410  RelationSyncEntry *relentry;
1412  Relation ancestor = NULL;
1413  Relation targetrel = relation;
1415  TupleTableSlot *old_slot = NULL;
1416  TupleTableSlot *new_slot = NULL;
1417 
1418  if (!is_publishable_relation(relation))
1419  return;
1420 
1421  /*
1422  * Remember the xid for the change in streaming mode. We need to send xid
1423  * with each change in the streaming mode so that subscriber can make
1424  * their association and on aborts, it can discard the corresponding
1425  * changes.
1426  */
1427  if (data->in_streaming)
1428  xid = change->txn->xid;
1429 
1430  relentry = get_rel_sync_entry(data, relation);
1431 
1432  /* First check the table filter */
1433  switch (action)
1434  {
      /* NOTE(review): the case labels (listing lines 1435, 1439, 1443) are not shown. */
1436  if (!relentry->pubactions.pubinsert)
1437  return;
1438  break;
1440  if (!relentry->pubactions.pubupdate)
1441  return;
1442  break;
1444  if (!relentry->pubactions.pubdelete)
1445  return;
1446 
1447  /*
1448  * This is only possible if deletes are allowed even when replica
1449  * identity is not defined for a table. Since the DELETE action
1450  * can't be published, we simply return.
1451  */
1452  if (!change->data.tp.oldtuple)
1453  {
1454  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1455  return;
1456  }
1457  break;
1458  default:
1459  Assert(false);
1460  }
1461 
1462  /* Avoid leaking memory by using and resetting our own context */
1463  old = MemoryContextSwitchTo(data->context);
1464 
1465  /* Switch relation if publishing via root. */
1466  if (relentry->publish_as_relid != RelationGetRelid(relation))
1467  {
1468  Assert(relation->rd_rel->relispartition);
1469  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1470  targetrel = ancestor;
1471  }
1472 
1473  if (change->data.tp.oldtuple)
1474  {
1475  old_slot = relentry->old_slot;
1476  ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1477 
1478  /* Convert tuple if needed. */
1479  if (relentry->attrmap)
1480  {
      /* NOTE(review): listing line 1481 (start of the "slot" declaration) is not shown. */
1482  &TTSOpsVirtual);
1483 
1484  old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1485  }
1486  }
1487 
1488  if (change->data.tp.newtuple)
1489  {
1490  new_slot = relentry->new_slot;
1491  ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1492 
1493  /* Convert tuple if needed. */
1494  if (relentry->attrmap)
1495  {
      /* NOTE(review): listing line 1496 (start of the "slot" declaration) is not shown. */
1497  &TTSOpsVirtual);
1498 
1499  new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1500  }
1501  }
1502 
1503  /*
1504  * Check row filter.
1505  *
1506  * Updates could be transformed to inserts or deletes based on the results
1507  * of the row filter for old and new tuple.
1508  */
1509  if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1510  goto cleanup;
1511 
1512  /*
1513  * Send BEGIN if we haven't yet.
1514  *
1515  * We send the BEGIN message after ensuring that we will actually send the
1516  * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1517  * transactions.
1518  */
1519  if (txndata && !txndata->sent_begin_txn)
1520  pgoutput_send_begin(ctx, txn);
1521 
1522  /*
1523  * Schema should be sent using the original relation because it also sends
1524  * the ancestor's relation.
1525  */
1526  maybe_send_schema(ctx, change, relation, relentry);
1527 
1528  OutputPluginPrepareWrite(ctx, true);
1529 
1530  /* Send the data */
      /* "action" may have been rewritten by pgoutput_row_filter() above. */
1531  switch (action)
1532  {
      /* NOTE(review): the case labels (listing lines 1533, 1537, 1541) are not shown. */
1534  logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1535  data->binary, relentry->columns);
1536  break;
1538  logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1539  new_slot, data->binary, relentry->columns);
1540  break;
1542  logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1543  data->binary, relentry->columns);
1544  break;
1545  default:
1546  Assert(false);
1547  }
1548 
1549  OutputPluginWrite(ctx, true);
1550 
      /* Reached both after a successful write and when the row filter rejects the change. */
1551 cleanup:
1552  if (RelationIsValid(ancestor))
1553  {
1554  RelationClose(ancestor);
1555  ancestor = NULL;
1556  }
1557 
1558  MemoryContextSwitchTo(old);
1559  MemoryContextReset(data->context);
1560 }
1561 
/*
 * Send a TRUNCATE for the subset of the given relations that should be
 * published.
 *
 * A relation is kept when it is publishable, its sync entry has
 * pubactions.pubtruncate set, and it is not a partition being published
 * under a different relid. The schema is sent for each kept relation, and a
 * single message is written only if at least one relation was kept. Work
 * runs with data->context current; that context is reset before returning.
 *
 * NOTE(review): the embedded listing numbers skip 1563, 1566, 1567, 1573 and
 * 1617, so the function-name line, the declarations of "data", "txndata" and
 * "xid", and the start of the write call are not shown in this rendering.
 */
1562 static void
1564  int nrelations, Relation relations[], ReorderBufferChange *change)
1565 {
1568  MemoryContext old;
1569  RelationSyncEntry *relentry;
1570  int i;
1571  int nrelids;
1572  Oid *relids;
1574 
1575  /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1576  if (data->in_streaming)
1577  xid = change->txn->xid;
1578 
1579  old = MemoryContextSwitchTo(data->context);
1580 
      /* Sized for the worst case: every input relation is kept. */
1581  relids = palloc0(nrelations * sizeof(Oid));
1582  nrelids = 0;
1583 
1584  for (i = 0; i < nrelations; i++)
1585  {
1586  Relation relation = relations[i];
1587  Oid relid = RelationGetRelid(relation);
1588 
1589  if (!is_publishable_relation(relation))
1590  continue;
1591 
1592  relentry = get_rel_sync_entry(data, relation);
1593 
1594  if (!relentry->pubactions.pubtruncate)
1595  continue;
1596 
1597  /*
1598  * Don't send partitions if the publication wants to send only the
1599  * root tables through it.
1600  */
1601  if (relation->rd_rel->relispartition &&
1602  relentry->publish_as_relid != relid)
1603  continue;
1604 
1605  relids[nrelids++] = relid;
1606 
1607  /* Send BEGIN if we haven't yet */
1608  if (txndata && !txndata->sent_begin_txn)
1609  pgoutput_send_begin(ctx, txn);
1610 
1611  maybe_send_schema(ctx, change, relation, relentry);
1612  }
1613 
1614  if (nrelids > 0)
1615  {
1616  OutputPluginPrepareWrite(ctx, true);
      /* NOTE(review): listing line 1617 (start of the write call) is not shown. */
1618  xid,
1619  nrelids,
1620  relids,
1621  change->data.truncate.cascade,
1622  change->data.truncate.restart_seqs);
1623  OutputPluginWrite(ctx, true);
1624  }
1625 
1626  MemoryContextSwitchTo(old);
1627  MemoryContextReset(data->context);
1628 }
1629 
/*
 * Send a logical decoding message downstream.
 *
 * Does nothing unless data->messages is set. For transactional messages,
 * BEGIN is sent first if it has not been sent yet for this transaction.
 *
 * NOTE(review): the embedded listing numbers skip 1631, 1635, 1636, 1653 and
 * 1661, so the function-name line, the declarations of "data", "xid" and
 * "txndata", and the start of the write call are not shown in this rendering.
 */
1630 static void
1632  XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1633  const char *message)
1634 {
1637 
1638  if (!data->messages)
1639  return;
1640 
1641  /*
1642  * Remember the xid for the message in streaming mode. See
1643  * pgoutput_change.
1644  */
1645  if (data->in_streaming)
1646  xid = txn->xid;
1647 
1648  /*
1649  * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1650  */
1651  if (transactional)
1652  {
      /* NOTE(review): listing line 1653 (presumably the "txndata" declaration) is not shown. */
1654 
1655  /* Send BEGIN if we haven't yet */
1656  if (txndata && !txndata->sent_begin_txn)
1657  pgoutput_send_begin(ctx, txn);
1658  }
1659 
1660  OutputPluginPrepareWrite(ctx, true);
      /* NOTE(review): listing line 1661 (start of the write call) is not shown. */
1662  xid,
1663  message_lsn,
1664  transactional,
1665  prefix,
1666  sz,
1667  message);
1668  OutputPluginWrite(ctx, true);
1669 }
1670 
1671 /*
1672  * Return true if the data is associated with an origin and the user has
1673  * requested the changes that don't have an origin, false otherwise.
 *
 * Concretely: true only when data->publish_no_origin is set and origin_id is
 * not InvalidRepOriginId.
 *
 * NOTE(review): the embedded listing numbers skip 1676 and 1679, so the
 * function-name line and the declaration of "data" are not shown in this
 * rendering.
1674  */
1675 static bool
1677  RepOriginId origin_id)
1678 {
1680 
1681  if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1682  return true;
1683 
1684  return false;
1685 }
1686 
1687 /*
1688  * Shutdown the output plugin.
1689  *
1690  * Note, we don't need to clean the data->context and data->cachectx as
1691  * they are child contexts of the ctx->context so they will be cleaned up by
1692  * logical decoding machinery.
 *
 * The only visible effect is resetting the RelationSyncCache global to NULL
 * when it is set.
 *
 * NOTE(review): the embedded listing numbers skip 1695 and 1699, so the
 * function-name line and the statement preceding the NULL assignment are not
 * shown in this rendering.
1693  */
1694 static void
1696 {
1697  if (RelationSyncCache)
1698  {
      /* NOTE(review): listing line 1699 is not shown in this rendering. */
1700  RelationSyncCache = NULL;
1701  }
1702 }
1703 
1704 /*
1705  * Load publications from the list of publication names.
 *
 * Returns a new list with one Publication pointer per entry of "pubnames",
 * in the same order. Each entry of "pubnames" is treated as a C string.
 *
 * NOTE(review): the embedded listing numbers skip 1708, so the line carrying
 * the function name and the "pubnames" parameter is not shown in this
 * rendering.
1706  */
1707 static List *
1709 {
1710  List *result = NIL;
1711  ListCell *lc;
1712 
1713  foreach(lc, pubnames)
1714  {
1715  char *pubname = (char *) lfirst(lc);
      /* NOTE(review): second argument is presumably "missing_ok" - confirm against GetPublicationByName(). */
1716  Publication *pub = GetPublicationByName(pubname, false);
1717 
1718  result = lappend(result, pub);
1719  }
1720 
1721  return result;
1722 }
1723 
1724 /*
1725  * Publication syscache invalidation callback.
1726  *
1727  * Called for invalidations on pg_publication.
 *
 * Clears the publications_valid flag and forwards the same arguments to
 * rel_sync_cache_publication_cb().
 *
 * NOTE(review): the embedded listing numbers skip 1730, so the line carrying
 * the function name and the "arg", "cacheid" and "hashvalue" parameters is
 * not shown in this rendering.
1728  */
1729 static void
1731 {
1732  publications_valid = false;
1733 
1734  /*
1735  * Also invalidate per-relation cache so that next time the filtering info
1736  * is checked it will be updated with the new publication settings.
1737  */
1738  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1739 }
1740 
1741 /*
1742  * START STREAM callback
 *
 * Opens a streamed chunk for the transaction and sets data->in_streaming.
 * The replication origin is only considered for sending when the transaction
 * has one and has not been streamed before.
 *
 * NOTE(review): the embedded listing numbers skip 1745, 1748, 1762 and 1764,
 * so the function-name line, the declaration of "data", and two call lines
 * between the prepare/write pair are not shown in this rendering.
1743  */
1744 static void
1746  ReorderBufferTXN *txn)
1747 {
1749  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1750 
1751  /* we can't nest streaming of transactions */
1752  Assert(!data->in_streaming);
1753 
1754  /*
1755  * If we already sent the first stream for this transaction then don't
1756  * send the origin id in the subsequent streams.
1757  */
1758  if (rbtxn_is_streamed(txn))
1759  send_replication_origin = false;
1760 
1761  OutputPluginPrepareWrite(ctx, !send_replication_origin);
      /* NOTE(review): listing line 1762 is not shown in this rendering. */
1763 
      /* NOTE(review): listing line 1764 (start of this call) is not shown. */
1765  send_replication_origin);
1766 
1767  OutputPluginWrite(ctx, true);
1768 
1769  /* we're streaming a chunk of transaction now */
1770  data->in_streaming = true;
1771 }
1772 
1773 /*
1774  * STOP STREAM callback
 *
 * Closes the current streamed chunk and clears data->in_streaming.
 *
 * NOTE(review): the embedded listing numbers skip 1777, 1780 and 1786, so
 * the function-name line, the declaration of "data", and the call between
 * the prepare/write pair are not shown in this rendering.
1775  */
1776 static void
1778  ReorderBufferTXN *txn)
1779 {
1781 
1782  /* we should be streaming a transaction */
1783  Assert(data->in_streaming);
1784 
1785  OutputPluginPrepareWrite(ctx, true);
      /* NOTE(review): listing line 1786 is not shown in this rendering. */
1787  OutputPluginWrite(ctx, true);
1788 
1789  /* we've stopped streaming a transaction */
1790  data->in_streaming = false;
1791 }
1792 
1793 /*
1794  * Notify downstream to discard the streamed transaction (along with all
1795  * its subtransactions, if it's a toplevel transaction).
 *
 * The message carries both the toplevel xid and the xid of "txn". Afterwards
 * the toplevel xid is removed from the per-relation streamed_txns lists via
 * cleanup_rel_sync_cache(..., false).
 *
 * NOTE(review): the embedded listing numbers skip 1798 and 1803, so the
 * function-name line and the declaration of "data" are not shown in this
 * rendering.
1796  */
1797 static void
1799  ReorderBufferTXN *txn,
1800  XLogRecPtr abort_lsn)
1801 {
1802  ReorderBufferTXN *toptxn;
      /* Extra abort info is written only when streaming mode is "parallel". */
1804  bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1805 
1806  /*
1807  * The abort should happen outside streaming block, even for streamed
1808  * transactions. The transaction has to be marked as streamed, though.
1809  */
1810  Assert(!data->in_streaming);
1811 
1812  /* determine the toplevel transaction */
1813  toptxn = rbtxn_get_toptxn(txn);
1814 
1815  Assert(rbtxn_is_streamed(toptxn));
1816 
1817  OutputPluginPrepareWrite(ctx, true);
1818  logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1819  txn->xact_time.abort_time, write_abort_info);
1820 
1821  OutputPluginWrite(ctx, true);
1822 
1823  cleanup_rel_sync_cache(toptxn->xid, false);
1824 }
1825 
1826 /*
1827  * Notify downstream to apply the streamed transaction (along with all
1828  * its subtransactions).
 *
 * After the commit message is written, cleanup_rel_sync_cache(..., true) is
 * called for the transaction's xid.
 *
 * NOTE(review): the embedded listing numbers skip 1831 and 1835, so the
 * function-name line and the declaration of "data" are not shown in this
 * rendering.
1829  */
1830 static void
1832  ReorderBufferTXN *txn,
1833  XLogRecPtr commit_lsn)
1834 {
1836 
1837  /*
1838  * The commit should happen outside streaming block, even for streamed
1839  * transactions. The transaction has to be marked as streamed, though.
1840  */
1841  Assert(!data->in_streaming);
1842  Assert(rbtxn_is_streamed(txn));
1843 
1844  OutputPluginUpdateProgress(ctx, false);
1845 
1846  OutputPluginPrepareWrite(ctx, true);
1847  logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1848  OutputPluginWrite(ctx, true);
1849 
1850  cleanup_rel_sync_cache(txn->xid, true);
1851 }
1852 
1853 /*
1854  * PREPARE callback (for streaming two-phase commit).
1855  *
1856  * Notify the downstream to prepare the transaction.
 *
 * The transaction must already be marked as streamed (asserted below).
 *
 * NOTE(review): the embedded listing numbers skip 1859, so the line carrying
 * the function name and the "ctx" parameter is not shown in this rendering.
1857  */
1858 static void
1860  ReorderBufferTXN *txn,
1861  XLogRecPtr prepare_lsn)
1862 {
1863  Assert(rbtxn_is_streamed(txn));
1864 
1865  OutputPluginUpdateProgress(ctx, false);
1866  OutputPluginPrepareWrite(ctx, true);
1867  logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1868  OutputPluginWrite(ctx, true);
1869 }
1870 
1871 /*
1872  * Initialize the relation schema sync cache for a decoding session.
1873  *
1874  * The hash table is destroyed at the end of a decoding session. While
1875  * relcache invalidations still exist and will still be invoked, they
1876  * will just see the null hash table global and take no action.
 *
 * The table is keyed by Oid, holds RelationSyncEntry values and uses
 * "cachectx" as its memory context. Invalidation callbacks are registered
 * only once per process, guarded by a function-local static flag.
 *
 * NOTE(review): the embedded listing numbers skip 1879, 1895, 1904, 1911,
 * 1920 and 1923, so the function-name line, the hash_create() flags, the
 * relcache callback registration, and the callback argument of each
 * CacheRegisterSyscacheCallback() call are not shown in this rendering.
1877  */
1878 static void
1880 {
1881  HASHCTL ctl;
1882  static bool relation_callbacks_registered = false;
1883 
1884  /* Nothing to do if hash table already exists */
1885  if (RelationSyncCache != NULL)
1886  return;
1887 
1888  /* Make a new hash table for the cache */
1889  ctl.keysize = sizeof(Oid);
1890  ctl.entrysize = sizeof(RelationSyncEntry);
1891  ctl.hcxt = cachectx;
1892 
1893  RelationSyncCache = hash_create("logical replication output relation cache",
1894  128, &ctl,
      /* NOTE(review): listing line 1895 (the flags argument) is not shown. */
1896 
1897  Assert(RelationSyncCache != NULL);
1898 
1899  /* No more to do if we already registered callbacks */
1900  if (relation_callbacks_registered)
1901  return;
1902 
1903  /* We must update the cache entry for a relation after a relcache flush */
      /* NOTE(review): listing line 1904 (the registration call) is not shown. */
1905 
1906  /*
1907  * Flush all cache entries after a pg_namespace change, in case it was a
1908  * schema rename affecting a relation being replicated.
1909  */
1910  CacheRegisterSyscacheCallback(NAMESPACEOID,
      /* NOTE(review): listing line 1911 (the callback argument) is not shown. */
1912  (Datum) 0);
1913 
1914  /*
1915  * Flush all cache entries after any publication changes. (We need no
1916  * callback entry for pg_publication, because publication_invalidation_cb
1917  * will take care of it.)
1918  */
1919  CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
      /* NOTE(review): listing line 1920 (the callback argument) is not shown. */
1921  (Datum) 0);
1922  CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
      /* NOTE(review): listing line 1923 (the callback argument) is not shown. */
1924  (Datum) 0);
1925 
1926  relation_callbacks_registered = true;
1927 }
1928 
1929 /*
 * Return true if "xid" is a member of entry->streamed_txns.
 *
1930  * We expect a relatively small number of streamed transactions.
 *
 * NOTE(review): the embedded listing numbers skip 1933, so the line carrying
 * the function name and the "entry" and "xid" parameters is not shown in
 * this rendering.
1931  */
1932 static bool
1934 {
1935  return list_member_xid(entry->streamed_txns, xid);
1936 }
1937 
1938 /*
1939  * Add the xid in the rel sync entry for which we have already sent the schema
1940  * of the relation.
 *
 * The xid is appended to entry->streamed_txns.
 *
 * NOTE(review): the embedded listing numbers skip 1943 and 1947, so the
 * function-name line and the statement that assigns "oldctx" (presumably a
 * memory context switch, given the restore below) are not shown in this
 * rendering.
1941  */
1942 static void
1944 {
1945  MemoryContext oldctx;
1946 
      /* NOTE(review): listing line 1947 is not shown in this rendering. */
1948 
1949  entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
1950 
1951  MemoryContextSwitchTo(oldctx);
1952 }
1953 
1954 /*
1955  * Find or create entry in the relation schema cache.
1956  *
1957  * This looks up publications that the given relation is directly or
1958  * indirectly part of (the latter if it's really the relation's ancestor that
1959  * is part of a publication) and fills up the found entry with the information
1960  * about which operations to publish and whether to use an ancestor's schema
1961  * when publishing.
 *
 * An entry is (re)built whenever its replicate_valid flag is false: cached
 * state from the previous definition is released, pubactions are OR-ed
 * together over all matching publications in data->publications, and the
 * relid to publish as is taken from the publication(s) with the highest
 * ancestor level. Slots, row filter and column list are only initialized
 * when insert, update or delete is published.
 *
 * NOTE(review): the embedded listing numbers skip 1964, 1974, 2018, 2049,
 * 2051, 2064 and 2156, so the function-name line, the start of the hash
 * lookup, the assignment of "oldctx", the statements controlled by three
 * cleanup "if"s, and one line after the pubdelete update are not shown in
 * this rendering.
1962  */
1963 static RelationSyncEntry *
1965 {
1966  RelationSyncEntry *entry;
1967  bool found;
1968  MemoryContext oldctx;
1969  Oid relid = RelationGetRelid(relation);
1970 
1971  Assert(RelationSyncCache != NULL);
1972 
1973  /* Find cached relation info, creating if not found */
      /* NOTE(review): listing line 1974 (start of the lookup call) is not shown. */
1975  &relid,
1976  HASH_ENTER, &found);
1977  Assert(entry != NULL);
1978 
1979  /* initialize entry, if it's new */
1980  if (!found)
1981  {
1982  entry->replicate_valid = false;
1983  entry->schema_sent = false;
1984  entry->streamed_txns = NIL;
1985  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1986  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1987  entry->new_slot = NULL;
1988  entry->old_slot = NULL;
1989  memset(entry->exprstate, 0, sizeof(entry->exprstate));
1990  entry->entry_cxt = NULL;
1991  entry->publish_as_relid = InvalidOid;
1992  entry->columns = NULL;
1993  entry->attrmap = NULL;
1994  }
1995 
1996  /* Validate the entry */
1997  if (!entry->replicate_valid)
1998  {
1999  Oid schemaId = get_rel_namespace(relid);
2000  List *pubids = GetRelationPublications(relid);
2001 
2002  /*
2003  * We don't acquire a lock on the namespace system table as we build
2004  * the cache entry using a historic snapshot and all the later changes
2005  * are absorbed while decoding WAL.
2006  */
2007  List *schemaPubids = GetSchemaPublications(schemaId);
2008  ListCell *lc;
2009  Oid publish_as_relid = relid;
2010  int publish_ancestor_level = 0;
2011  bool am_partition = get_rel_relispartition(relid);
2012  char relkind = get_rel_relkind(relid);
2013  List *rel_publications = NIL;
2014 
2015  /* Reload publications if needed before use. */
2016  if (!publications_valid)
2017  {
      /* NOTE(review): listing line 2018 (presumably the assignment of "oldctx") is not shown. */
2019  if (data->publications)
2020  {
2021  list_free_deep(data->publications);
2022  data->publications = NIL;
2023  }
2024  data->publications = LoadPublications(data->publication_names);
2025  MemoryContextSwitchTo(oldctx);
2026  publications_valid = true;
2027  }
2028 
2029  /*
2030  * Reset schema_sent status as the relation definition may have
2031  * changed. Also reset pubactions to empty in case rel was dropped
2032  * from a publication. Also free any objects that depended on the
2033  * earlier definition.
2034  */
2035  entry->schema_sent = false;
2036  list_free(entry->streamed_txns);
2037  entry->streamed_txns = NIL;
2038  bms_free(entry->columns);
2039  entry->columns = NULL;
2040  entry->pubactions.pubinsert = false;
2041  entry->pubactions.pubupdate = false;
2042  entry->pubactions.pubdelete = false;
2043  entry->pubactions.pubtruncate = false;
2044 
2045  /*
2046  * Tuple slots cleanups. (Will be rebuilt later if needed).
2047  */
2048  if (entry->old_slot)
      /* NOTE(review): listing line 2049 (the controlled statement) is not shown. */
2050  if (entry->new_slot)
      /* NOTE(review): listing line 2051 (the controlled statement) is not shown. */
2052 
2053  entry->old_slot = NULL;
2054  entry->new_slot = NULL;
2055 
2056  if (entry->attrmap)
2057  free_attrmap(entry->attrmap);
2058  entry->attrmap = NULL;
2059 
2060  /*
2061  * Row filter cache cleanups.
2062  */
2063  if (entry->entry_cxt)
      /* NOTE(review): listing line 2064 (the controlled statement) is not shown. */
2065 
2066  entry->entry_cxt = NULL;
2067  entry->estate = NULL;
2068  memset(entry->exprstate, 0, sizeof(entry->exprstate));
2069 
2070  /*
2071  * Build publication cache. We can't use one provided by relcache as
2072  * relcache considers all publications that the given relation is in,
2073  * but here we only need to consider ones that the subscriber
2074  * requested.
2075  */
2076  foreach(lc, data->publications)
2077  {
2078  Publication *pub = lfirst(lc);
2079  bool publish = false;
2080 
2081  /*
2082  * Under what relid should we publish changes in this publication?
2083  * We'll use the top-most relid across all publications. Also
2084  * track the ancestor level for this publication.
2085  */
2086  Oid pub_relid = relid;
2087  int ancestor_level = 0;
2088 
2089  /*
2090  * If this is a FOR ALL TABLES publication, pick the partition
2091  * root and set the ancestor level accordingly.
2092  */
2093  if (pub->alltables)
2094  {
2095  publish = true;
2096  if (pub->pubviaroot && am_partition)
2097  {
2098  List *ancestors = get_partition_ancestors(relid);
2099 
      /* The last list element is used as the root; the level is the list length. */
2100  pub_relid = llast_oid(ancestors);
2101  ancestor_level = list_length(ancestors);
2102  }
2103  }
2104 
2105  if (!publish)
2106  {
2107  bool ancestor_published = false;
2108 
2109  /*
2110  * For a partition, check if any of the ancestors are
2111  * published. If so, note down the topmost ancestor that is
2112  * published via this publication, which will be used as the
2113  * relation via which to publish the partition's changes.
2114  */
2115  if (am_partition)
2116  {
2117  Oid ancestor;
2118  int level;
2119  List *ancestors = get_partition_ancestors(relid);
2120 
2121  ancestor = GetTopMostAncestorInPublication(pub->oid,
2122  ancestors,
2123  &level);
2124 
2125  if (ancestor != InvalidOid)
2126  {
2127  ancestor_published = true;
      /* Without pubviaroot the partition keeps publishing under its own relid. */
2128  if (pub->pubviaroot)
2129  {
2130  pub_relid = ancestor;
2131  ancestor_level = level;
2132  }
2133  }
2134  }
2135 
2136  if (list_member_oid(pubids, pub->oid) ||
2137  list_member_oid(schemaPubids, pub->oid) ||
2138  ancestor_published)
2139  publish = true;
2140  }
2141 
2142  /*
2143  * If the relation is to be published, determine actions to
2144  * publish, and list of columns, if appropriate.
2145  *
2146  * Don't publish changes for partitioned tables, because
2147  * publishing those of its partitions suffices, unless partition
2148  * changes won't be published due to pubviaroot being set.
2149  */
2150  if (publish &&
2151  (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2152  {
2153  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2154  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2155  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
      /* NOTE(review): listing line 2156 is not shown in this rendering. */
2157 
2158  /*
2159  * We want to publish the changes as the top-most ancestor
2160  * across all publications. So we need to check if the already
2161  * calculated level is higher than the new one. If yes, we can
2162  * ignore the new value (as it's a child). Otherwise the new
2163  * value is an ancestor, so we keep it.
2164  */
2165  if (publish_ancestor_level > ancestor_level)
2166  continue;
2167 
2168  /*
2169  * If we found an ancestor higher up in the tree, discard the
2170  * list of publications through which we replicate it, and use
2171  * the new ancestor.
2172  */
2173  if (publish_ancestor_level < ancestor_level)
2174  {
2175  publish_as_relid = pub_relid;
2176  publish_ancestor_level = ancestor_level;
2177 
2178  /* reset the publication list for this relation */
2179  rel_publications = NIL;
2180  }
2181  else
2182  {
2183  /* Same ancestor level, has to be the same OID. */
2184  Assert(publish_as_relid == pub_relid);
2185  }
2186 
2187  /* Track publications for this ancestor. */
2188  rel_publications = lappend(rel_publications, pub);
2189  }
2190  }
2191 
2192  entry->publish_as_relid = publish_as_relid;
2193 
2194  /*
2195  * Initialize the tuple slot, map, and row filter. These are only used
2196  * when publishing inserts, updates, or deletes.
2197  */
2198  if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2199  entry->pubactions.pubdelete)
2200  {
2201  /* Initialize the tuple slot and map */
2202  init_tuple_slot(data, relation, entry);
2203 
2204  /* Initialize the row filter */
2205  pgoutput_row_filter_init(data, rel_publications, entry);
2206 
2207  /* Initialize the column list */
2208  pgoutput_column_list_init(data, rel_publications, entry);
2209  }
2210 
      /* Only the list cells are freed; the Publication objects belong to data->publications. */
2211  list_free(pubids);
2212  list_free(schemaPubids);
2213  list_free(rel_publications);
2214 
2215  entry->replicate_valid = true;
2216  }
2217 
2218  return entry;
2219 }
2220 
2221 /*
2222  * Cleanup list of streamed transactions and update the schema_sent flag.
2223  *
2224  * When a streamed transaction commits or aborts, we need to remove the
2225  * toplevel XID from the schema cache. If the transaction aborted, the
2226  * subscriber will simply throw away the schema records we streamed, so
2227  * we don't need to do anything else.
2228  *
2229  * If the transaction is committed, the subscriber will update the relation
2230  * cache - so tweak the schema_sent flag accordingly.
2231  */
2232 static void
2234 {
2235  HASH_SEQ_STATUS hash_seq;
2236  RelationSyncEntry *entry;
2237 
2238  Assert(RelationSyncCache != NULL);
2239 
2240  hash_seq_init(&hash_seq, RelationSyncCache);
2241  while ((entry = hash_seq_search(&hash_seq)) != NULL)
2242  {
2243  /*
2244  * We can set the schema_sent flag for an entry that has committed xid
2245  * in the list as that ensures that the subscriber would have the
2246  * corresponding schema and we don't need to send it unless there is
2247  * any invalidation for that relation.
2248  */
2249  foreach_xid(streamed_txn, entry->streamed_txns)
2250  {
2251  if (xid == streamed_txn)
2252  {
2253  if (is_commit)
2254  entry->schema_sent = true;
2255 
2256  entry->streamed_txns =
2257  foreach_delete_current(entry->streamed_txns, streamed_txn);
2258  break;
2259  }
2260  }
2261  }
2262 }
2263 
2264 /*
2265  * Relcache invalidation callback
2266  */
2267 static void
2269 {
2270  RelationSyncEntry *entry;
2271 
2272  /*
2273  * We can get here if the plugin was used in SQL interface as the
2274  * RelationSyncCache is destroyed when the decoding finishes, but there is
2275  * no way to unregister the relcache invalidation callback.
2276  */
2277  if (RelationSyncCache == NULL)
2278  return;
2279 
2280  /*
2281  * Nobody keeps pointers to entries in this hash table around outside
2282  * logical decoding callback calls - but invalidation events can come in
2283  * *during* a callback if we do any syscache access in the callback.
2284  * Because of that we must mark the cache entry as invalid but not damage
2285  * any of its substructure here. The next get_rel_sync_entry() call will
2286  * rebuild it all.
2287  */
2288  if (OidIsValid(relid))
2289  {
2290  /*
2291  * Getting invalidations for relations that aren't in the table is
2292  * entirely normal. So we don't care if it's found or not.
2293  */
2294  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2295  HASH_FIND, NULL);
2296  if (entry != NULL)
2297  entry->replicate_valid = false;
2298  }
2299  else
2300  {
2301  /* Whole cache must be flushed. */
2302  HASH_SEQ_STATUS status;
2303 
2304  hash_seq_init(&status, RelationSyncCache);
2305  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2306  {
2307  entry->replicate_valid = false;
2308  }
2309  }
2310 }
2311 
2312 /*
2313  * Publication relation/schema map syscache invalidation callback
2314  *
2315  * Called for invalidations on pg_publication, pg_publication_rel,
2316  * pg_publication_namespace, and pg_namespace.
2317  */
2318 static void
2320 {
2321  HASH_SEQ_STATUS status;
2322  RelationSyncEntry *entry;
2323 
2324  /*
2325  * We can get here if the plugin was used in SQL interface as the
2326  * RelationSyncCache is destroyed when the decoding finishes, but there is
2327  * no way to unregister the invalidation callbacks.
2328  */
2329  if (RelationSyncCache == NULL)
2330  return;
2331 
2332  /*
2333  * We have no easy way to identify which cache entries this invalidation
2334  * event might have affected, so just mark them all invalid.
2335  */
2336  hash_seq_init(&status, RelationSyncCache);
2337  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2338  {
2339  entry->replicate_valid = false;
2340  }
2341 }
2342 
2343 /* Send Replication origin */
2344 static void
2346  XLogRecPtr origin_lsn, bool send_origin)
2347 {
2348  if (send_origin)
2349  {
2350  char *origin;
2351 
2352  /*----------
2353  * XXX: which behaviour do we want here?
2354  *
2355  * Alternatives:
2356  * - don't send origin message if origin name not found
2357  * (that's what we do now)
2358  * - throw error - that will break replication, not good
2359  * - send some special "unknown" origin
2360  *----------
2361  */
2362  if (replorigin_by_oid(origin_id, true, &origin))
2363  {
2364  /* Message boundary */
2365  OutputPluginWrite(ctx, false);
2366  OutputPluginPrepareWrite(ctx, true);
2367 
2368  logicalrep_write_origin(ctx->out, origin, origin_lsn);
2369  }
2370  }
2371 }
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:259
void free_attrmap(AttrMap *map)
Definition: attmap.c:57
AttrMap * build_attrmap_by_name_if_req(TupleDesc indesc, TupleDesc outdesc, bool missing_ok)
Definition: attmap.c:264
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:155
void bms_free(Bitmapset *a)
Definition: bitmapset.c:252
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:764
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:523
static void cleanup(void)
Definition: bootstrap.c:686
#define TextDatumGetCString(d)
Definition: builtins.h:98
unsigned int uint32
Definition: c.h:495
#define PG_UINT32_MAX
Definition: c.h:579
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:171
uint32 TransactionId
Definition: c.h:641
#define OidIsValid(objectId)
Definition: c.h:764
size_t Size
Definition: c.h:594
int64 TimestampTz
Definition: timestamp.h:39
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:863
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1431
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1421
int errcode(int sqlerrcode)
Definition: elog.c:860
int errmsg(const char *fmt,...)
Definition: elog.c:1075
#define DEBUG3
Definition: elog.h:28
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
ExprState * ExecPrepareExpr(Expr *node, EState *estate)
Definition: execExpr.c:733
TupleTableSlot * MakeTupleTableSlot(TupleDesc tupleDesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1111
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1551
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1253
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1351
const TupleTableSlotOps TTSOpsHeapTuple
Definition: execTuples.c:84
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1237
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos)
Definition: execUtils.c:733
EState * CreateExecutorState(void)
Definition: execUtils.c:93
#define ResetPerTupleExprContext(estate)
Definition: executor.h:558
#define GetPerTupleExprContext(estate)
Definition: executor.h:549
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:347
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1559
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1517
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:393
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:742
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_free(List *list)
Definition: list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
void list_free_deep(List *list)
Definition: list.c:1560
#define AccessShareLock
Definition: lockdefs.h:36
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:716
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact)
Definition: logical.c:729
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:703
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:40
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_MAX_VERSION_NUM
Definition: logicalproto.h:45
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:2004
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3321
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1980
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1929
Expr * make_orclause(List *orclauses)
Definition: makefuncs.c:655
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:330
void pfree(void *pointer)
Definition: mcxt.c:1431
void * palloc0(Size size)
Definition: mcxt.c:1232
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1077
MemoryContext CacheMemoryContext
Definition: mcxt.c:144
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403
#define AllocSetContextCreate
Definition: memutils.h:128
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:152
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:162
#define MemoryContextCopyAndSetIdentifier(cxt, id)
Definition: memutils.h:100
#define IsA(nodeptr, _type_)
Definition: nodes.h:158
#define makeNode(_type_)
Definition: nodes.h:155
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:466
#define InvalidRepOriginId
Definition: origin.h:33
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1006
List * get_partition_ancestors(Oid relid)
Definition: partition.c:135
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
void * arg
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define foreach_delete_current(lst, var_or_cell)
Definition: pg_list.h:391
#define foreach_xid(var, lst)
Definition: pg_list.h:472
#define list_make1(x1)
Definition: pg_list.h:212
#define llast_oid(l)
Definition: pg_list.h:200
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
List * GetSchemaPublications(Oid schemaid)
List * GetRelationPublications(Oid relid)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
bool is_publishable_relation(Relation rel)
#define LOGICALREP_ORIGIN_NONE
#define LOGICALREP_STREAM_ON
#define LOGICALREP_ORIGIN_ANY
#define LOGICALREP_STREAM_OFF
#define LOGICALREP_STREAM_PARALLEL
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:1708
static void pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:563
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:2319
struct RelationSyncEntry RelationSyncEntry
static void pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
Definition: pgoutput.c:848
static void parse_output_parameters(List *options, PGOutputData *data)
Definition: pgoutput.c:277
static void init_tuple_slot(PGOutputData *data, Relation relation, RelationSyncEntry *entry)
Definition: pgoutput.c:1131
static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
Definition: pgoutput.c:825
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition: pgoutput.c:1404
#define NUM_ROWFILTER_PUBACTIONS
Definition: pgoutput.c:106
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:617
struct PGOutputTxnData PGOutputTxnData
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:419
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:1563
static void init_rel_sync_cache(MemoryContext cachectx)
Definition: pgoutput.c:1879
RowFilterPubAction
Definition: pgoutput.c:100
@ PUBACTION_INSERT
Definition: pgoutput.c:101
@ PUBACTION_UPDATE
Definition: pgoutput.c:102
@ PUBACTION_DELETE
Definition: pgoutput.c:103
PG_MODULE_MAGIC
Definition: pgoutput.c:39
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:2268
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:634
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Relation relation)
Definition: pgoutput.c:1964
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:1676
static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
Definition: pgoutput.c:2345
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: pgoutput.c:662
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:1695
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:2233
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: pgoutput.c:1798
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:1859
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:680
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:549
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, Bitmapset *columns)
Definition: pgoutput.c:751
static HTAB * RelationSyncCache
Definition: pgoutput.c:211
static void pgoutput_row_filter_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:870
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:1831
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:648
static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
Definition: pgoutput.c:1223
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1943
static void pgoutput_column_list_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:1016
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1777
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1745
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1730
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:248
static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: pgoutput.c:1631
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:585
static bool publications_valid
Definition: pgoutput.c:83
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1933
static EState * create_estate_for_relation(Relation rel)
Definition: pgoutput.c:795
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
static bool DatumGetBool(Datum X)
Definition: postgres.h:90
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:89
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:304
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:385
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
Definition: proto.c:1166
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns)
Definition: proto.c:670
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:643
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
Definition: proto.c:458
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:198
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
Definition: proto.c:414
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:725
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:586
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:60
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns)
Definition: proto.c:533
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:248
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1112
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:364
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:127
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1069
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1103
void * stringToNode(const char *str)
Definition: read.c:90
#define RelationGetRelid(relation)
Definition: rel.h:504
#define RelationGetDescr(relation)
Definition: rel.h:530
#define RelationGetRelationName(relation)
Definition: rel.h:538
#define RelationIsValid(relation)
Definition: rel.h:477
#define RelationGetNamespace(relation)
Definition: rel.h:545
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2057
void RelationClose(Relation relation)
Definition: relcache.c:2188
#define rbtxn_is_streamed(txn)
#define rbtxn_get_toptxn(txn)
#define rbtxn_is_subtxn(txn)
ReorderBufferChangeType
Definition: reorderbuffer.h:45
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:46
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:48
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:47
Definition: attmap.h:35
char * defname
Definition: parsenodes.h:802
Node * arg
Definition: parsenodes.h:803
CommandId es_output_cid
Definition: execnodes.h:637
TupleTableSlot * ecxt_scantuple
Definition: execnodes.h:255
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
Definition: dynahash.c:220
Definition: pg_list.h:54
MemoryContext context
Definition: logical.h:36
StringInfo out
Definition: logical.h:71
void * output_plugin_private
Definition: logical.h:76
List * output_plugin_options
Definition: logical.h:59
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
OutputPluginOutputType output_type
Definition: output_plugin.h:28
bool sent_begin_txn
Definition: pgoutput.c:207
PublicationActions pubactions
RTEKind rtekind
Definition: parsenodes.h:1025
Form_pg_class rd_rel
Definition: rel.h:111
ExprState * exprstate[NUM_ROWFILTER_PUBACTIONS]
Definition: pgoutput.c:146
Bitmapset * columns
Definition: pgoutput.c:172
PublicationActions pubactions
Definition: pgoutput.c:137
TupleTableSlot * old_slot
Definition: pgoutput.c:149
bool replicate_valid
Definition: pgoutput.c:130
MemoryContext entry_cxt
Definition: pgoutput.c:178
EState * estate
Definition: pgoutput.c:147
TupleTableSlot * new_slot
Definition: pgoutput.c:148
List * streamed_txns
Definition: pgoutput.c:133
AttrMap * attrmap
Definition: pgoutput.c:165
struct ReorderBufferChange::@100::@101 tp
ReorderBufferChangeType action
Definition: reorderbuffer.h:75
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:78
struct ReorderBufferChange::@100::@102 truncate
union ReorderBufferChange::@100 data
RepOriginId origin_id
TimestampTz abort_time
void * output_plugin_private
XLogRecPtr origin_lsn
TransactionId xid
union ReorderBufferTXN::@106 xact_time
Definition: value.h:64
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
Definition: regguts.h:323
char defGetStreamingMode(DefElem *def)
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:267
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:480
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:230
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition: syscache.h:97
#define InvalidTransactionId
Definition: transam.h:31
#define FirstGenbkiObjectId
Definition: transam.h:195
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
TupleDesc CreateTupleDescCopyConstr(TupleDesc tupdesc)
Definition: tupdesc.c:173
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:433
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:362
#define strVal(v)
Definition: value.h:82
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3456
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:818
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28