PostgreSQL Source Code  git master
pgoutput.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pgoutput.c
4  * Logical Replication output plugin
5  *
6  * Copyright (c) 2012-2021, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/pgoutput/pgoutput.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/tupconvert.h"
16 #include "catalog/partition.h"
17 #include "catalog/pg_publication.h"
18 #include "commands/defrem.h"
19 #include "fmgr.h"
20 #include "replication/logical.h"
22 #include "replication/origin.h"
23 #include "replication/pgoutput.h"
24 #include "utils/int8.h"
25 #include "utils/inval.h"
26 #include "utils/lsyscache.h"
27 #include "utils/memutils.h"
28 #include "utils/syscache.h"
29 #include "utils/varlena.h"
30 
32 
34 
36  OutputPluginOptions *opt, bool is_init);
39  ReorderBufferTXN *txn);
41  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
43  ReorderBufferTXN *txn, Relation rel,
44  ReorderBufferChange *change);
46  ReorderBufferTXN *txn, int nrelations, Relation relations[],
47  ReorderBufferChange *change);
49  RepOriginId origin_id);
50 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
51  ReorderBufferTXN *txn);
52 static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
53  ReorderBufferTXN *txn);
54 static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
55  ReorderBufferTXN *txn,
56  XLogRecPtr abort_lsn);
57 static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
58  ReorderBufferTXN *txn,
59  XLogRecPtr commit_lsn);
60 
61 static bool publications_valid;
62 static bool in_streaming;
63 
64 static List *LoadPublications(List *pubnames);
65 static void publication_invalidation_cb(Datum arg, int cacheid,
66  uint32 hashvalue);
67 static void send_relation_and_attrs(Relation relation, TransactionId xid,
69 
70 /*
71  * Entry in the map used to remember which relation schemas we sent.
72  *
73  * The schema_sent flag determines if the current schema record was already
74  * sent to the subscriber (in which case we don't need to send it again).
75  *
76  * The schema cache on downstream is however updated only at commit time,
77  * and with streamed transactions the commit order may be different from
78  * the order the transactions are sent in. Also, the (sub) transactions
79  * might get aborted so we need to send the schema for each (sub) transaction
80  * so that we don't lose the schema information on abort. For handling this,
81  * we maintain the list of xids (streamed_txns) for which we have already sent
82  * the schema.
83  *
84  * For partitions, 'pubactions' considers not only the table's own
85  * publications, but also those of all of its ancestors.
86  */
87 typedef struct RelationSyncEntry
88 {
89  Oid relid; /* relation oid */
90 
91  /*
92  * Did we send the schema? If ancestor relid is set, its schema must also
93  * have been sent for this to be true.
94  */
96  List *streamed_txns; /* streamed toplevel transactions with this
97  * schema */
98 
101 
102  /*
103  * OID of the relation to publish changes as. For a partition, this may
104  * be set to one of its ancestors whose schema will be used when
105  * replicating changes, if publish_via_partition_root is set for the
106  * publication.
107  */
109 
110  /*
111  * Map used when replicating using an ancestor's schema to convert tuples
112  * from partition's type to the ancestor's; NULL if publish_as_relid is
113  * same as 'relid' or if unnecessary due to partition and the ancestor
114  * having identical TupleDesc.
115  */
118 
119 /* Map used to remember which relation schemas we sent. */
120 static HTAB *RelationSyncCache = NULL;
121 
122 static void init_rel_sync_cache(MemoryContext decoding_context);
123 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
126 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
127  uint32 hashvalue);
129  TransactionId xid);
131  TransactionId xid);
132 
133 /*
134  * Specify output plugin callbacks
135  */
136 void
138 {
140 
148 
149  /* transaction streaming */
156 }
157 
158 static void
160  List **publication_names, bool *binary,
161  bool *enable_streaming)
162 {
163  ListCell *lc;
164  bool protocol_version_given = false;
165  bool publication_names_given = false;
166  bool binary_option_given = false;
167  bool streaming_given = false;
168 
169  *binary = false;
170 
171  foreach(lc, options)
172  {
173  DefElem *defel = (DefElem *) lfirst(lc);
174 
175  Assert(defel->arg == NULL || IsA(defel->arg, String));
176 
177  /* Check each param, whether or not we recognize it */
178  if (strcmp(defel->defname, "proto_version") == 0)
179  {
180  int64 parsed;
181 
182  if (protocol_version_given)
183  ereport(ERROR,
184  (errcode(ERRCODE_SYNTAX_ERROR),
185  errmsg("conflicting or redundant options")));
186  protocol_version_given = true;
187 
188  if (!scanint8(strVal(defel->arg), true, &parsed))
189  ereport(ERROR,
190  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191  errmsg("invalid proto_version")));
192 
193  if (parsed > PG_UINT32_MAX || parsed < 0)
194  ereport(ERROR,
195  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
196  errmsg("proto_version \"%s\" out of range",
197  strVal(defel->arg))));
198 
199  *protocol_version = (uint32) parsed;
200  }
201  else if (strcmp(defel->defname, "publication_names") == 0)
202  {
203  if (publication_names_given)
204  ereport(ERROR,
205  (errcode(ERRCODE_SYNTAX_ERROR),
206  errmsg("conflicting or redundant options")));
207  publication_names_given = true;
208 
209  if (!SplitIdentifierString(strVal(defel->arg), ',',
210  publication_names))
211  ereport(ERROR,
212  (errcode(ERRCODE_INVALID_NAME),
213  errmsg("invalid publication_names syntax")));
214  }
215  else if (strcmp(defel->defname, "binary") == 0)
216  {
217  if (binary_option_given)
218  ereport(ERROR,
219  (errcode(ERRCODE_SYNTAX_ERROR),
220  errmsg("conflicting or redundant options")));
221  binary_option_given = true;
222 
223  *binary = defGetBoolean(defel);
224  }
225  else if (strcmp(defel->defname, "streaming") == 0)
226  {
227  if (streaming_given)
228  ereport(ERROR,
229  (errcode(ERRCODE_SYNTAX_ERROR),
230  errmsg("conflicting or redundant options")));
231  streaming_given = true;
232 
233  *enable_streaming = defGetBoolean(defel);
234  }
235  else
236  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
237  }
238 }
239 
240 /*
241  * Initialize this plugin
242  */
243 static void
245  bool is_init)
246 {
247  bool enable_streaming = false;
248  PGOutputData *data = palloc0(sizeof(PGOutputData));
249 
250  /* Create our memory context for private allocations. */
252  "logical replication output context",
254 
255  ctx->output_plugin_private = data;
256 
257  /* This plugin uses binary protocol. */
259 
260  /*
261  * This is replication start and not slot initialization.
262  *
263  * Parse and validate options passed by the client.
264  */
265  if (!is_init)
266  {
267  /* Parse the params and ERROR if we see any we don't recognize */
269  &data->protocol_version,
270  &data->publication_names,
271  &data->binary,
272  &enable_streaming);
273 
274  /* Check if we support requested protocol */
276  ereport(ERROR,
277  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
278  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
280 
282  ereport(ERROR,
283  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
284  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
286 
287  if (list_length(data->publication_names) < 1)
288  ereport(ERROR,
289  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
290  errmsg("publication_names parameter missing")));
291 
292  /*
293  * Decide whether to enable streaming. It is disabled by default, in
294  * which case we just update the flag in decoding context. Otherwise
295  * we only allow it with sufficient version of the protocol, and when
296  * the output plugin supports it.
297  */
298  if (!enable_streaming)
299  ctx->streaming = false;
301  ereport(ERROR,
302  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
303  errmsg("requested proto_version=%d does not support streaming, need %d or higher",
305  else if (!ctx->streaming)
306  ereport(ERROR,
307  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
308  errmsg("streaming requested, but not supported by output plugin")));
309 
310  /* Also remember we're currently not streaming any transaction. */
311  in_streaming = false;
312 
313  /* Init publication state. */
314  data->publications = NIL;
315  publications_valid = false;
318  (Datum) 0);
319 
320  /* Initialize relation schema cache. */
322  }
323  else
324  {
325  /* Disable the streaming during the slot initialization mode. */
326  ctx->streaming = false;
327  }
328 }
329 
330 /*
331  * BEGIN callback
332  */
333 static void
335 {
336  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
337 
338  OutputPluginPrepareWrite(ctx, !send_replication_origin);
339  logicalrep_write_begin(ctx->out, txn);
340 
341  if (send_replication_origin)
342  {
343  char *origin;
344 
345  /* Message boundary */
346  OutputPluginWrite(ctx, false);
347  OutputPluginPrepareWrite(ctx, true);
348 
349  /*----------
350  * XXX: which behaviour do we want here?
351  *
352  * Alternatives:
353  * - don't send origin message if origin name not found
354  * (that's what we do now)
355  * - throw error - that will break replication, not good
356  * - send some special "unknown" origin
357  *----------
358  */
359  if (replorigin_by_oid(txn->origin_id, true, &origin))
360  logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
361  }
362 
363  OutputPluginWrite(ctx, true);
364 }
365 
366 /*
367  * COMMIT callback
368  */
369 static void
371  XLogRecPtr commit_lsn)
372 {
374 
375  OutputPluginPrepareWrite(ctx, true);
376  logicalrep_write_commit(ctx->out, txn, commit_lsn);
377  OutputPluginWrite(ctx, true);
378 }
379 
380 /*
381  * Write the current schema of the relation and its ancestor (if any) if not
382  * done yet.
383  */
384 static void
387  Relation relation, RelationSyncEntry *relentry)
388 {
389  bool schema_sent;
392 
393  /*
394  * Remember XID of the (sub)transaction for the change. We don't care if
395  * it's a top-level transaction or not (we have already sent that XID at the
396  * start of the current streaming block).
397  *
398  * If we're not in a streaming block, just use InvalidTransactionId and
399  * the write methods will not include it.
400  */
401  if (in_streaming)
402  xid = change->txn->xid;
403 
404  if (change->txn->toptxn)
405  topxid = change->txn->toptxn->xid;
406  else
407  topxid = xid;
408 
409  /*
410  * Do we need to send the schema? We do track streamed transactions
411  * separately, because those may be applied later (and the regular
412  * transactions won't see their effects until then) and in an order that
413  * we don't know at this point.
414  *
415  * XXX There is scope for optimization here. Currently, we always send
416  * the schema the first time in a streaming transaction but we can probably
417  * avoid that by checking 'relentry->schema_sent' flag. However, before
418  * doing that we need to study its impact on the case where we have a mix
419  * of streaming and non-streaming transactions.
420  */
421  if (in_streaming)
422  schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
423  else
424  schema_sent = relentry->schema_sent;
425 
426  if (schema_sent)
427  return;
428 
429  /* If needed, send the ancestor's schema first. */
430  if (relentry->publish_as_relid != RelationGetRelid(relation))
431  {
432  Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
433  TupleDesc indesc = RelationGetDescr(relation);
434  TupleDesc outdesc = RelationGetDescr(ancestor);
435  MemoryContext oldctx;
436 
437  /* Map must live as long as the session does. */
439  relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
440  CreateTupleDescCopy(outdesc));
441  MemoryContextSwitchTo(oldctx);
442  send_relation_and_attrs(ancestor, xid, ctx);
443  RelationClose(ancestor);
444  }
445 
446  send_relation_and_attrs(relation, xid, ctx);
447 
448  if (in_streaming)
449  set_schema_sent_in_streamed_txn(relentry, topxid);
450  else
451  relentry->schema_sent = true;
452 }
453 
454 /*
455  * Sends a relation
456  */
457 static void
460 {
461  TupleDesc desc = RelationGetDescr(relation);
462  int i;
463 
464  /*
465  * Write out type info if needed. We do that only for user-created types.
466  * We use FirstGenbkiObjectId as the cutoff, so that we only consider
467  * objects with hand-assigned OIDs to be "built in", not for instance any
468  * function or type defined in the information_schema. This is important
469  * because only hand-assigned OIDs can be expected to remain stable across
470  * major versions.
471  */
472  for (i = 0; i < desc->natts; i++)
473  {
474  Form_pg_attribute att = TupleDescAttr(desc, i);
475 
476  if (att->attisdropped || att->attgenerated)
477  continue;
478 
479  if (att->atttypid < FirstGenbkiObjectId)
480  continue;
481 
482  OutputPluginPrepareWrite(ctx, false);
483  logicalrep_write_typ(ctx->out, xid, att->atttypid);
484  OutputPluginWrite(ctx, false);
485  }
486 
487  OutputPluginPrepareWrite(ctx, false);
488  logicalrep_write_rel(ctx->out, xid, relation);
489  OutputPluginWrite(ctx, false);
490 }
491 
492 /*
493  * Sends the decoded DML over wire.
494  *
495  * This is called both in streaming and non-streaming modes.
496  */
497 static void
499  Relation relation, ReorderBufferChange *change)
500 {
502  MemoryContext old;
503  RelationSyncEntry *relentry;
505  Relation ancestor = NULL;
506 
507  if (!is_publishable_relation(relation))
508  return;
509 
510  /*
511  * Remember the xid for the change in streaming mode. We need to send xid
512  * with each change in the streaming mode so that subscriber can make
513  * their association and on aborts, it can discard the corresponding
514  * changes.
515  */
516  if (in_streaming)
517  xid = change->txn->xid;
518 
519  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
520 
521  /* First check the table filter */
522  switch (change->action)
523  {
525  if (!relentry->pubactions.pubinsert)
526  return;
527  break;
529  if (!relentry->pubactions.pubupdate)
530  return;
531  break;
533  if (!relentry->pubactions.pubdelete)
534  return;
535  break;
536  default:
537  Assert(false);
538  }
539 
540  /* Avoid leaking memory by using and resetting our own context */
541  old = MemoryContextSwitchTo(data->context);
542 
543  maybe_send_schema(ctx, txn, change, relation, relentry);
544 
545  /* Send the data */
546  switch (change->action)
547  {
549  {
550  HeapTuple tuple = &change->data.tp.newtuple->tuple;
551 
552  /* Switch relation if publishing via root. */
553  if (relentry->publish_as_relid != RelationGetRelid(relation))
554  {
555  Assert(relation->rd_rel->relispartition);
556  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
557  relation = ancestor;
558  /* Convert tuple if needed. */
559  if (relentry->map)
560  tuple = execute_attr_map_tuple(tuple, relentry->map);
561  }
562 
563  OutputPluginPrepareWrite(ctx, true);
564  logicalrep_write_insert(ctx->out, xid, relation, tuple,
565  data->binary);
566  OutputPluginWrite(ctx, true);
567  break;
568  }
570  {
571  HeapTuple oldtuple = change->data.tp.oldtuple ?
572  &change->data.tp.oldtuple->tuple : NULL;
573  HeapTuple newtuple = &change->data.tp.newtuple->tuple;
574 
575  /* Switch relation if publishing via root. */
576  if (relentry->publish_as_relid != RelationGetRelid(relation))
577  {
578  Assert(relation->rd_rel->relispartition);
579  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
580  relation = ancestor;
581  /* Convert tuples if needed. */
582  if (relentry->map)
583  {
584  oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
585  newtuple = execute_attr_map_tuple(newtuple, relentry->map);
586  }
587  }
588 
589  OutputPluginPrepareWrite(ctx, true);
590  logicalrep_write_update(ctx->out, xid, relation, oldtuple,
591  newtuple, data->binary);
592  OutputPluginWrite(ctx, true);
593  break;
594  }
596  if (change->data.tp.oldtuple)
597  {
598  HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
599 
600  /* Switch relation if publishing via root. */
601  if (relentry->publish_as_relid != RelationGetRelid(relation))
602  {
603  Assert(relation->rd_rel->relispartition);
604  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
605  relation = ancestor;
606  /* Convert tuple if needed. */
607  if (relentry->map)
608  oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
609  }
610 
611  OutputPluginPrepareWrite(ctx, true);
612  logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
613  data->binary);
614  OutputPluginWrite(ctx, true);
615  }
616  else
617  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
618  break;
619  default:
620  Assert(false);
621  }
622 
623  if (RelationIsValid(ancestor))
624  {
625  RelationClose(ancestor);
626  ancestor = NULL;
627  }
628 
629  /* Cleanup */
632 }
633 
634 static void
636  int nrelations, Relation relations[], ReorderBufferChange *change)
637 {
639  MemoryContext old;
640  RelationSyncEntry *relentry;
641  int i;
642  int nrelids;
643  Oid *relids;
645 
646  /* Remember the xid for the change in streaming mode. See pgoutput_change. */
647  if (in_streaming)
648  xid = change->txn->xid;
649 
650  old = MemoryContextSwitchTo(data->context);
651 
652  relids = palloc0(nrelations * sizeof(Oid));
653  nrelids = 0;
654 
655  for (i = 0; i < nrelations; i++)
656  {
657  Relation relation = relations[i];
658  Oid relid = RelationGetRelid(relation);
659 
660  if (!is_publishable_relation(relation))
661  continue;
662 
663  relentry = get_rel_sync_entry(data, relid);
664 
665  if (!relentry->pubactions.pubtruncate)
666  continue;
667 
668  /*
669  * Don't send partitions if the publication wants to send only the
670  * root tables through it.
671  */
672  if (relation->rd_rel->relispartition &&
673  relentry->publish_as_relid != relid)
674  continue;
675 
676  relids[nrelids++] = relid;
677  maybe_send_schema(ctx, txn, change, relation, relentry);
678  }
679 
680  if (nrelids > 0)
681  {
682  OutputPluginPrepareWrite(ctx, true);
684  xid,
685  nrelids,
686  relids,
687  change->data.truncate.cascade,
688  change->data.truncate.restart_seqs);
689  OutputPluginWrite(ctx, true);
690  }
691 
694 }
695 
696 /*
697  * Currently we always forward.
698  */
699 static bool
701  RepOriginId origin_id)
702 {
703  return false;
704 }
705 
706 /*
707  * Shutdown the output plugin.
708  *
709  * Note, we don't need to clean the data->context as it's a child context
710  * of the ctx->context so it will be cleaned up by logical decoding machinery.
711  */
712 static void
714 {
715  if (RelationSyncCache)
716  {
717  hash_destroy(RelationSyncCache);
718  RelationSyncCache = NULL;
719  }
720 }
721 
722 /*
723  * Load publications from the list of publication names.
724  */
725 static List *
727 {
728  List *result = NIL;
729  ListCell *lc;
730 
731  foreach(lc, pubnames)
732  {
733  char *pubname = (char *) lfirst(lc);
734  Publication *pub = GetPublicationByName(pubname, false);
735 
736  result = lappend(result, pub);
737  }
738 
739  return result;
740 }
741 
742 /*
743  * Publication cache invalidation callback.
744  */
745 static void
746 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
747 {
748  publications_valid = false;
749 
750  /*
751  * Also invalidate per-relation cache so that next time the filtering info
752  * is checked it will be updated with the new publication settings.
753  */
754  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
755 }
756 
757 /*
758  * START STREAM callback
759  */
760 static void
762  ReorderBufferTXN *txn)
763 {
764  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
765 
766  /* we can't nest streaming of transactions */
768 
769  /*
770  * If we already sent the first stream for this transaction then don't
771  * send the origin id in the subsequent streams.
772  */
773  if (rbtxn_is_streamed(txn))
774  send_replication_origin = false;
775 
776  OutputPluginPrepareWrite(ctx, !send_replication_origin);
778 
779  if (send_replication_origin)
780  {
781  char *origin;
782 
783  /* Message boundary */
784  OutputPluginWrite(ctx, false);
785  OutputPluginPrepareWrite(ctx, true);
786 
787  if (replorigin_by_oid(txn->origin_id, true, &origin))
789  }
790 
791  OutputPluginWrite(ctx, true);
792 
793  /* we're streaming a chunk of transaction now */
794  in_streaming = true;
795 }
796 
797 /*
798  * STOP STREAM callback
799  */
800 static void
802  ReorderBufferTXN *txn)
803 {
804  /* we should be streaming a transaction */
806 
807  OutputPluginPrepareWrite(ctx, true);
809  OutputPluginWrite(ctx, true);
810 
811  /* we've stopped streaming a transaction */
812  in_streaming = false;
813 }
814 
815 /*
816  * Notify downstream to discard the streamed transaction (along with all
817  * its subtransactions, if it's a toplevel transaction).
818  */
819 static void
821  ReorderBufferTXN *txn,
822  XLogRecPtr abort_lsn)
823 {
824  ReorderBufferTXN *toptxn;
825 
826  /*
827  * The abort should happen outside streaming block, even for streamed
828  * transactions. The transaction has to be marked as streamed, though.
829  */
831 
832  /* determine the toplevel transaction */
833  toptxn = (txn->toptxn) ? txn->toptxn : txn;
834 
835  Assert(rbtxn_is_streamed(toptxn));
836 
837  OutputPluginPrepareWrite(ctx, true);
838  logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
839  OutputPluginWrite(ctx, true);
840 
841  cleanup_rel_sync_cache(toptxn->xid, false);
842 }
843 
844 /*
845  * Notify downstream to apply the streamed transaction (along with all
846  * its subtransactions).
847  */
848 static void
850  ReorderBufferTXN *txn,
851  XLogRecPtr commit_lsn)
852 {
853  /*
854  * The commit should happen outside streaming block, even for streamed
855  * transactions. The transaction has to be marked as streamed, though.
856  */
859 
861 
862  OutputPluginPrepareWrite(ctx, true);
863  logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
864  OutputPluginWrite(ctx, true);
865 
866  cleanup_rel_sync_cache(txn->xid, true);
867 }
868 
869 /*
870  * Initialize the relation schema sync cache for a decoding session.
871  *
872  * The hash table is destroyed at the end of a decoding session. While
873  * relcache invalidations still exist and will still be invoked, they
874  * will just see the null hash table global and take no action.
875  */
876 static void
878 {
879  HASHCTL ctl;
880 
881  if (RelationSyncCache != NULL)
882  return;
883 
884  /* Make a new hash table for the cache */
885  ctl.keysize = sizeof(Oid);
886  ctl.entrysize = sizeof(RelationSyncEntry);
887  ctl.hcxt = cachectx;
888 
889  RelationSyncCache = hash_create("logical replication output relation cache",
890  128, &ctl,
892 
893  Assert(RelationSyncCache != NULL);
894 
898  (Datum) 0);
899 }
900 
901 /*
902  * We expect a relatively small number of streamed transactions.
903  */
904 static bool
906 {
907  ListCell *lc;
908 
909  foreach(lc, entry->streamed_txns)
910  {
911  if (xid == (uint32) lfirst_int(lc))
912  return true;
913  }
914 
915  return false;
916 }
917 
918 /*
919  * Add the xid in the rel sync entry for which we have already sent the schema
920  * of the relation.
921  */
922 static void
924 {
925  MemoryContext oldctx;
926 
928 
929  entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
930 
931  MemoryContextSwitchTo(oldctx);
932 }
933 
934 /*
935  * Find or create entry in the relation schema cache.
936  *
937  * This looks up publications that the given relation is directly or
938  * indirectly part of (the latter if it's really the relation's ancestor that
939  * is part of a publication) and fills up the found entry with the information
940  * about which operations to publish and whether to use an ancestor's schema
941  * when publishing.
942  */
943 static RelationSyncEntry *
945 {
946  RelationSyncEntry *entry;
947  bool am_partition = get_rel_relispartition(relid);
948  char relkind = get_rel_relkind(relid);
949  bool found;
950  MemoryContext oldctx;
951 
952  Assert(RelationSyncCache != NULL);
953 
954  /* Find cached relation info, creating if not found */
955  entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
956  (void *) &relid,
957  HASH_ENTER, &found);
958  Assert(entry != NULL);
959 
960  /* Not found means schema wasn't sent */
961  if (!found)
962  {
963  /* immediately make a new entry valid enough to satisfy callbacks */
964  entry->schema_sent = false;
965  entry->streamed_txns = NIL;
966  entry->replicate_valid = false;
967  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
968  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
969  entry->publish_as_relid = InvalidOid;
970  }
971 
972  /* Validate the entry */
973  if (!entry->replicate_valid)
974  {
975  List *pubids = GetRelationPublications(relid);
976  ListCell *lc;
978 
979  /* Reload publications if needed before use. */
980  if (!publications_valid)
981  {
983  if (data->publications)
985 
987  MemoryContextSwitchTo(oldctx);
988  publications_valid = true;
989  }
990 
991  /*
992  * Build publication cache. We can't use one provided by relcache as
993  * relcache considers all publications given relation is in, but here
994  * we only need to consider ones that the subscriber requested.
995  */
996  foreach(lc, data->publications)
997  {
998  Publication *pub = lfirst(lc);
999  bool publish = false;
1000 
1001  if (pub->alltables)
1002  {
1003  publish = true;
1004  if (pub->pubviaroot && am_partition)
1005  publish_as_relid = llast_oid(get_partition_ancestors(relid));
1006  }
1007 
1008  if (!publish)
1009  {
1010  bool ancestor_published = false;
1011 
1012  /*
1013  * For a partition, check if any of the ancestors are
1014  * published. If so, note down the topmost ancestor that is
1015  * published via this publication, which will be used as the
1016  * relation via which to publish the partition's changes.
1017  */
1018  if (am_partition)
1019  {
1020  List *ancestors = get_partition_ancestors(relid);
1021  ListCell *lc2;
1022 
1023  /*
1024  * Find the "topmost" ancestor that is in this
1025  * publication.
1026  */
1027  foreach(lc2, ancestors)
1028  {
1029  Oid ancestor = lfirst_oid(lc2);
1030 
1032  pub->oid))
1033  {
1034  ancestor_published = true;
1035  if (pub->pubviaroot)
1036  publish_as_relid = ancestor;
1037  }
1038  }
1039  }
1040 
1041  if (list_member_oid(pubids, pub->oid) || ancestor_published)
1042  publish = true;
1043  }
1044 
1045  /*
1046  * Don't publish changes for partitioned tables, because
1047  * publishing those of its partitions suffices, unless partition
1048  * changes won't be published due to pubviaroot being set.
1049  */
1050  if (publish &&
1051  (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
1052  {
1053  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
1054  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
1055  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
1057  }
1058 
1059  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
1060  entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
1061  break;
1062  }
1063 
1064  list_free(pubids);
1065 
1067  entry->replicate_valid = true;
1068  }
1069 
1070  return entry;
1071 }
1072 
1073 /*
1074  * Cleanup list of streamed transactions and update the schema_sent flag.
1075  *
1076  * When a streamed transaction commits or aborts, we need to remove the
1077  * toplevel XID from the schema cache. If the transaction aborted, the
1078  * subscriber will simply throw away the schema records we streamed, so
1079  * we don't need to do anything else.
1080  *
1081  * If the transaction is committed, the subscriber will update the relation
1082  * cache - so tweak the schema_sent flag accordingly.
1083  */
1084 static void
1086 {
1087  HASH_SEQ_STATUS hash_seq;
1088  RelationSyncEntry *entry;
1089  ListCell *lc;
1090 
1091  Assert(RelationSyncCache != NULL);
1092 
1093  hash_seq_init(&hash_seq, RelationSyncCache);
1094  while ((entry = hash_seq_search(&hash_seq)) != NULL)
1095  {
1096  /*
1097  * We can set the schema_sent flag for an entry that has committed xid
1098  * in the list as that ensures that the subscriber would have the
1099  * corresponding schema and we don't need to send it unless there is
1100  * any invalidation for that relation.
1101  */
1102  foreach(lc, entry->streamed_txns)
1103  {
1104  if (xid == (uint32) lfirst_int(lc))
1105  {
1106  if (is_commit)
1107  entry->schema_sent = true;
1108 
1109  entry->streamed_txns =
1111  break;
1112  }
1113  }
1114  }
1115 }
1116 
1117 /*
1118  * Relcache invalidation callback
1119  */
1120 static void
1122 {
1123  RelationSyncEntry *entry;
1124 
1125  /*
1126  * We can get here if the plugin was used in SQL interface as the
1127  * RelationSyncCache is destroyed when the decoding finishes, but there
1128  * is no way to unregister the relcache invalidation callback.
1129  */
1130  if (RelationSyncCache == NULL)
1131  return;
1132 
1133  /*
1134  * Nobody keeps pointers to entries in this hash table around outside
1135  * logical decoding callback calls - but invalidation events can come in
1136  * *during* a callback if we access the relcache in the callback. Because
1137  * of that we must mark the cache entry as invalid but not remove it from
1138  * the hash while it could still be referenced, then prune it at a later
1139  * safe point.
1140  *
1141  * Getting invalidations for relations that aren't in the table is
1142  * entirely normal, since there's no way to unregister for an invalidation
1143  * event. So we don't care if it's found or not.
1144  */
1145  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
1146  HASH_FIND, NULL);
1147 
1148  /*
1149  * Reset schema sent status as the relation definition may have changed.
1150  */
1151  if (entry != NULL)
1152  {
1153  entry->schema_sent = false;
1154  list_free(entry->streamed_txns);
1155  entry->streamed_txns = NIL;
1156  }
1157 }
1158 
1159 /*
1160  * Publication relation map syscache invalidation callback
1161  */
1162 static void
1164 {
1166  RelationSyncEntry *entry;
1167 
1168  /*
1169  * We can get here if the plugin was used in SQL interface as the
1170  * RelationSyncCache is destroyed when the decoding finishes, but there
1171  * is no way to unregister the syscache invalidation callback.
1172  */
1173  if (RelationSyncCache == NULL)
1174  return;
1175 
1176  /*
1177  * There is no way to find which entry in our cache the hash belongs to so
1178  * mark the whole cache as invalid.
1179  */
1180  hash_seq_init(&status, RelationSyncCache);
1181  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
1182  entry->replicate_valid = false;
1183 }
List * streamed_txns
Definition: pgoutput.c:96
LogicalDecodeTruncateCB truncate_cb
#define NIL
Definition: pg_list.h:65
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:498
static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names, bool *binary, bool *enable_streaming)
Definition: pgoutput.c:159
PublicationActions pubactions
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:862
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition: tupdesc.c:110
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:579
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
static bool publications_valid
Definition: pgoutput.c:61
RepOriginId origin_id
static bool in_streaming
Definition: pgoutput.c:62
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:311
bool replicate_valid
Definition: pgoutput.c:99
#define HASH_CONTEXT
Definition: hsearch.h:102
struct ReorderBufferChange::@99::@101 truncate
#define HASH_ELEM
Definition: hsearch.h:95
uint32 TransactionId
Definition: c.h:575
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:746
MemoryContext hcxt
Definition: hsearch.h:86
#define RelationGetDescr(relation)
Definition: rel.h:483
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:137
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:790
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:385
MemoryContext context
Definition: pgoutput.h:20
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:1121
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1947
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:761
TupleConversionMap * map
Definition: pgoutput.c:116
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:713
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:370
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
uint16 RepOriginId
Definition: xlogdefs.h:58
Size entrysize
Definition: hsearch.h:76
#define strVal(v)
Definition: value.h:54
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid)
Definition: proto.c:850
int errcode(int sqlerrcode)
Definition: elog.c:704
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx)
Definition: pgoutput.c:458
void * output_plugin_private
Definition: logical.h:75
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:74
MemoryContext context
Definition: logical.h:35
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
Definition: proto.c:259
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
#define llast_oid(l)
Definition: pg_list.h:196
LogicalDecodeStreamAbortCB stream_abort_cb
List * output_plugin_options
Definition: logical.h:58
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:112
#define PG_UINT32_MAX
Definition: c.h:513
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:432
Form_pg_class rd_rel
Definition: rel.h:110
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1163
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void list_free_deep(List *list)
Definition: list.c:1390
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
Definition: proto.c:141
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
XLogRecPtr origin_lsn
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1476
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:357
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:756
OutputPluginOutputType output_type
Definition: output_plugin.h:28
#define rbtxn_is_streamed(txn)
Definition: dynahash.c:219
bool defGetBoolean(DefElem *def)
Definition: define.c:111
List * GetRelationPublications(Oid relid)
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:849
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
Definition: proto.c:368
#define ERROR
Definition: elog.h:45
#define RelationIsValid(relation)
Definition: rel.h:430
#define lfirst_int(lc)
Definition: pg_list.h:170
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:31
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3741
bool is_publishable_relation(Relation rel)
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:244
List * publication_names
Definition: pgoutput.h:25
#define InvalidTransactionId
Definition: transam.h:31
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
unsigned int uint32
Definition: c.h:429
#define LOGICALREP_PROTO_MAX_VERSION_NUM
Definition: logicalproto.h:34
struct ReorderBufferTXN * toptxn
void RelationClose(Relation relation)
Definition: relcache.c:2123
HeapTuple execute_attr_map_tuple(HeapTuple tuple, TupleConversionMap *map)
Definition: tupconvert.c:139
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:799
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:944
List * lappend_int(List *list, int datum)
Definition: list.c:339
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:1971
Node * arg
Definition: parsenodes.h:734
List * lappend(List *list, void *datum)
Definition: list.c:321
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:45
static HTAB * RelationSyncCache
Definition: pgoutput.c:120
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:905
#define HASH_BLOBS
Definition: hsearch.h:97
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:646
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
void * palloc0(Size size)
Definition: mcxt.c:981
LogicalDecodeChangeCB change_cb
uintptr_t Datum
Definition: postgres.h:367
LogicalDecodeStreamTruncateCB stream_truncate_cb
union ReorderBufferChange::@99 data
TransactionId xid
Size keysize
Definition: hsearch.h:75
PG_MODULE_MAGIC
Definition: pgoutput.c:31
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:155
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:33
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:923
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:620
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:674
struct ReorderBufferChange::@99::@100 tp
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:792
#define lfirst(lc)
Definition: pg_list.h:169
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:700
bool binary
Definition: pgoutput.h:27
static int list_length(const List *l)
Definition: pg_list.h:149
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStreamCommitCB stream_commit_cb
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:726
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
LogicalDecodeStartupCB startup_cb
#define InvalidRepOriginId
Definition: origin.h:33
List * publications
Definition: pgoutput.h:26
LogicalDecodeStreamStartCB stream_start_cb
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:877
#define FirstGenbkiObjectId
Definition: transam.h:188
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:1085
int errmsg(const char *fmt,...)
Definition: elog.c:915
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:801
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:633
void list_free(List *list)
Definition: list.c:1376
#define elog(elevel,...)
Definition: elog.h:228
StringInfo out
Definition: logical.h:70
int i
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:422
void * arg
LogicalDecodeBeginCB begin_cb
char * defname
Definition: parsenodes.h:733
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
Definition: proto.c:185
LogicalDecodeStreamStopCB stream_stop_cb
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:635
static void status(const char *fmt,...) pg_attribute_printf(1, 2)
Definition: pg_regress.c:227
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:334
LogicalDecodeFilterByOriginCB filter_by_origin_cb
Definition: pg_list.h:50
LogicalDecodeStreamChangeCB stream_change_cb
List * get_partition_ancestors(Oid relid)
Definition: partition.c:115
#define RelationGetRelid(relation)
Definition: rel.h:457
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2017
#define lfirst_oid(lc)
Definition: pg_list.h:171
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: pgoutput.c:820
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
PublicationActions pubactions
Definition: pgoutput.c:100
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:55
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:951
uint32 protocol_version
Definition: pgoutput.h:24