PostgreSQL Source Code  git master
pgoutput.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pgoutput.c
4  * Logical Replication output plugin
5  *
6  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/pgoutput/pgoutput.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/tupconvert.h"
16 #include "catalog/partition.h"
17 #include "catalog/pg_publication.h"
18 #include "commands/defrem.h"
19 #include "fmgr.h"
20 #include "replication/logical.h"
22 #include "replication/origin.h"
23 #include "replication/pgoutput.h"
24 #include "utils/int8.h"
25 #include "utils/inval.h"
26 #include "utils/lsyscache.h"
27 #include "utils/memutils.h"
28 #include "utils/syscache.h"
29 #include "utils/varlena.h"
30 
32 
34 
36  OutputPluginOptions *opt, bool is_init);
39  ReorderBufferTXN *txn);
41  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
43  ReorderBufferTXN *txn, Relation rel,
44  ReorderBufferChange *change);
46  ReorderBufferTXN *txn, int nrelations, Relation relations[],
47  ReorderBufferChange *change);
49  RepOriginId origin_id);
50 
51 static bool publications_valid;
52 
53 static List *LoadPublications(List *pubnames);
54 static void publication_invalidation_cb(Datum arg, int cacheid,
55  uint32 hashvalue);
56 static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
57 
58 /*
59  * Entry in the map used to remember which relation schemas we sent.
60  *
61  * For partitions, 'pubactions' considers not only the table's own
62  * publications, but also those of all of its ancestors.
63  */
64 typedef struct RelationSyncEntry
65 {
66  Oid relid; /* relation oid */
67 
68  /*
69  * Did we send the schema? If ancestor relid is set, its schema must also
70  * have been sent for this to be true.
71  */
73 
76 
77  /*
78  * OID of the relation to publish changes as. For a partition, this may
79  * be set to one of its ancestors whose schema will be used when
80  * replicating changes, if publish_via_partition_root is set for the
81  * publication.
82  */
84 
85  /*
86  * Map used when replicating using an ancestor's schema to convert tuples
87  * from partition's type to the ancestor's; NULL if publish_as_relid is
88  * same as 'relid' or if unnecessary due to partition and the ancestor
89  * having identical TupleDesc.
90  */
93 
94 /* Map used to remember which relation schemas we sent. */
95 static HTAB *RelationSyncCache = NULL;
96 
97 static void init_rel_sync_cache(MemoryContext decoding_context);
100 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
101  uint32 hashvalue);
102 
103 /*
104  * Specify output plugin callbacks
105  */
106 void
108 {
110 
118 }
119 
120 static void
122  List **publication_names, bool *binary)
123 {
124  ListCell *lc;
125  bool protocol_version_given = false;
126  bool publication_names_given = false;
127  bool binary_option_given = false;
128 
129  *binary = false;
130 
131  foreach(lc, options)
132  {
133  DefElem *defel = (DefElem *) lfirst(lc);
134 
135  Assert(defel->arg == NULL || IsA(defel->arg, String));
136 
137  /* Check each param, whether or not we recognize it */
138  if (strcmp(defel->defname, "proto_version") == 0)
139  {
140  int64 parsed;
141 
142  if (protocol_version_given)
143  ereport(ERROR,
144  (errcode(ERRCODE_SYNTAX_ERROR),
145  errmsg("conflicting or redundant options")));
146  protocol_version_given = true;
147 
148  if (!scanint8(strVal(defel->arg), true, &parsed))
149  ereport(ERROR,
150  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
151  errmsg("invalid proto_version")));
152 
153  if (parsed > PG_UINT32_MAX || parsed < 0)
154  ereport(ERROR,
155  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
156  errmsg("proto_version \"%s\" out of range",
157  strVal(defel->arg))));
158 
159  *protocol_version = (uint32) parsed;
160  }
161  else if (strcmp(defel->defname, "publication_names") == 0)
162  {
163  if (publication_names_given)
164  ereport(ERROR,
165  (errcode(ERRCODE_SYNTAX_ERROR),
166  errmsg("conflicting or redundant options")));
167  publication_names_given = true;
168 
169  if (!SplitIdentifierString(strVal(defel->arg), ',',
170  publication_names))
171  ereport(ERROR,
172  (errcode(ERRCODE_INVALID_NAME),
173  errmsg("invalid publication_names syntax")));
174  }
175  else if (strcmp(defel->defname, "binary") == 0)
176  {
177  if (binary_option_given)
178  ereport(ERROR,
179  (errcode(ERRCODE_SYNTAX_ERROR),
180  errmsg("conflicting or redundant options")));
181  binary_option_given = true;
182 
183  *binary = defGetBoolean(defel);
184  }
185  else
186  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
187  }
188 }
189 
190 /*
191  * Initialize this plugin
192  */
193 static void
195  bool is_init)
196 {
197  PGOutputData *data = palloc0(sizeof(PGOutputData));
198 
199  /* Create our memory context for private allocations. */
201  "logical replication output context",
203 
204  ctx->output_plugin_private = data;
205 
206  /* This plugin uses binary protocol. */
208 
209  /*
210  * This is replication start and not slot initialization.
211  *
212  * Parse and validate options passed by the client.
213  */
214  if (!is_init)
215  {
216  /* Parse the params and ERROR if we see any we don't recognize */
218  &data->protocol_version,
219  &data->publication_names,
220  &data->binary);
221 
222  /* Check if we support requested protocol */
224  ereport(ERROR,
225  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
226  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
228 
230  ereport(ERROR,
231  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
232  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
234 
235  if (list_length(data->publication_names) < 1)
236  ereport(ERROR,
237  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
238  errmsg("publication_names parameter missing")));
239 
240  /* Init publication state. */
241  data->publications = NIL;
242  publications_valid = false;
245  (Datum) 0);
246 
247  /* Initialize relation schema cache. */
249  }
250 }
251 
252 /*
253  * BEGIN callback
254  */
255 static void
257 {
258  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
259 
260  OutputPluginPrepareWrite(ctx, !send_replication_origin);
261  logicalrep_write_begin(ctx->out, txn);
262 
263  if (send_replication_origin)
264  {
265  char *origin;
266 
267  /* Message boundary */
268  OutputPluginWrite(ctx, false);
269  OutputPluginPrepareWrite(ctx, true);
270 
271  /*----------
272  * XXX: which behaviour do we want here?
273  *
274  * Alternatives:
275  * - don't send origin message if origin name not found
276  * (that's what we do now)
277  * - throw error - that will break replication, not good
278  * - send some special "unknown" origin
279  *----------
280  */
281  if (replorigin_by_oid(txn->origin_id, true, &origin))
282  logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
283  }
284 
285  OutputPluginWrite(ctx, true);
286 }
287 
288 /*
289  * COMMIT callback
290  */
291 static void
293  XLogRecPtr commit_lsn)
294 {
296 
297  OutputPluginPrepareWrite(ctx, true);
298  logicalrep_write_commit(ctx->out, txn, commit_lsn);
299  OutputPluginWrite(ctx, true);
300 }
301 
302 /*
303  * Write the current schema of the relation and its ancestor (if any) if not
304  * done yet.
305  */
306 static void
308  Relation relation, RelationSyncEntry *relentry)
309 {
310  if (relentry->schema_sent)
311  return;
312 
313  /* If needed, send the ancestor's schema first. */
314  if (relentry->publish_as_relid != RelationGetRelid(relation))
315  {
316  Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
317  TupleDesc indesc = RelationGetDescr(relation);
318  TupleDesc outdesc = RelationGetDescr(ancestor);
319  MemoryContext oldctx;
320 
321  /* Map must live as long as the session does. */
323  relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
324  CreateTupleDescCopy(outdesc));
325  MemoryContextSwitchTo(oldctx);
326  send_relation_and_attrs(ancestor, ctx);
327  RelationClose(ancestor);
328  }
329 
330  send_relation_and_attrs(relation, ctx);
331  relentry->schema_sent = true;
332 }
333 
334 /*
335  * Sends a relation
336  */
337 static void
339 {
340  TupleDesc desc = RelationGetDescr(relation);
341  int i;
342 
343  /*
344  * Write out type info if needed. We do that only for user-created types.
345  * We use FirstGenbkiObjectId as the cutoff, so that we only consider
346  * objects with hand-assigned OIDs to be "built in", not for instance any
347  * function or type defined in the information_schema. This is important
348  * because only hand-assigned OIDs can be expected to remain stable across
349  * major versions.
350  */
351  for (i = 0; i < desc->natts; i++)
352  {
353  Form_pg_attribute att = TupleDescAttr(desc, i);
354 
355  if (att->attisdropped || att->attgenerated)
356  continue;
357 
358  if (att->atttypid < FirstGenbkiObjectId)
359  continue;
360 
361  OutputPluginPrepareWrite(ctx, false);
362  logicalrep_write_typ(ctx->out, att->atttypid);
363  OutputPluginWrite(ctx, false);
364  }
365 
366  OutputPluginPrepareWrite(ctx, false);
367  logicalrep_write_rel(ctx->out, relation);
368  OutputPluginWrite(ctx, false);
369 }
370 
371 /*
372  * Sends the decoded DML over wire.
373  */
374 static void
376  Relation relation, ReorderBufferChange *change)
377 {
379  MemoryContext old;
380  RelationSyncEntry *relentry;
381 
382  if (!is_publishable_relation(relation))
383  return;
384 
385  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
386 
387  /* First check the table filter */
388  switch (change->action)
389  {
391  if (!relentry->pubactions.pubinsert)
392  return;
393  break;
395  if (!relentry->pubactions.pubupdate)
396  return;
397  break;
399  if (!relentry->pubactions.pubdelete)
400  return;
401  break;
402  default:
403  Assert(false);
404  }
405 
406  /* Avoid leaking memory by using and resetting our own context */
407  old = MemoryContextSwitchTo(data->context);
408 
409  maybe_send_schema(ctx, relation, relentry);
410 
411  /* Send the data */
412  switch (change->action)
413  {
415  {
416  HeapTuple tuple = &change->data.tp.newtuple->tuple;
417 
418  /* Switch relation if publishing via root. */
419  if (relentry->publish_as_relid != RelationGetRelid(relation))
420  {
421  Assert(relation->rd_rel->relispartition);
422  relation = RelationIdGetRelation(relentry->publish_as_relid);
423  /* Convert tuple if needed. */
424  if (relentry->map)
425  tuple = execute_attr_map_tuple(tuple, relentry->map);
426  }
427 
428  OutputPluginPrepareWrite(ctx, true);
429  logicalrep_write_insert(ctx->out, relation, tuple,
430  data->binary);
431  OutputPluginWrite(ctx, true);
432  break;
433  }
435  {
436  HeapTuple oldtuple = change->data.tp.oldtuple ?
437  &change->data.tp.oldtuple->tuple : NULL;
438  HeapTuple newtuple = &change->data.tp.newtuple->tuple;
439 
440  /* Switch relation if publishing via root. */
441  if (relentry->publish_as_relid != RelationGetRelid(relation))
442  {
443  Assert(relation->rd_rel->relispartition);
444  relation = RelationIdGetRelation(relentry->publish_as_relid);
445  /* Convert tuples if needed. */
446  if (relentry->map)
447  {
448  oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
449  newtuple = execute_attr_map_tuple(newtuple, relentry->map);
450  }
451  }
452 
453  OutputPluginPrepareWrite(ctx, true);
454  logicalrep_write_update(ctx->out, relation, oldtuple, newtuple,
455  data->binary);
456  OutputPluginWrite(ctx, true);
457  break;
458  }
460  if (change->data.tp.oldtuple)
461  {
462  HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
463 
464  /* Switch relation if publishing via root. */
465  if (relentry->publish_as_relid != RelationGetRelid(relation))
466  {
467  Assert(relation->rd_rel->relispartition);
468  relation = RelationIdGetRelation(relentry->publish_as_relid);
469  /* Convert tuple if needed. */
470  if (relentry->map)
471  oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
472  }
473 
474  OutputPluginPrepareWrite(ctx, true);
475  logicalrep_write_delete(ctx->out, relation, oldtuple,
476  data->binary);
477  OutputPluginWrite(ctx, true);
478  }
479  else
480  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
481  break;
482  default:
483  Assert(false);
484  }
485 
486  /* Cleanup */
489 }
490 
491 static void
493  int nrelations, Relation relations[], ReorderBufferChange *change)
494 {
496  MemoryContext old;
497  RelationSyncEntry *relentry;
498  int i;
499  int nrelids;
500  Oid *relids;
501 
502  old = MemoryContextSwitchTo(data->context);
503 
504  relids = palloc0(nrelations * sizeof(Oid));
505  nrelids = 0;
506 
507  for (i = 0; i < nrelations; i++)
508  {
509  Relation relation = relations[i];
510  Oid relid = RelationGetRelid(relation);
511 
512  if (!is_publishable_relation(relation))
513  continue;
514 
515  relentry = get_rel_sync_entry(data, relid);
516 
517  if (!relentry->pubactions.pubtruncate)
518  continue;
519 
520  /*
521  * Don't send partitions if the publication wants to send only the
522  * root tables through it.
523  */
524  if (relation->rd_rel->relispartition &&
525  relentry->publish_as_relid != relid)
526  continue;
527 
528  relids[nrelids++] = relid;
529  maybe_send_schema(ctx, relation, relentry);
530  }
531 
532  if (nrelids > 0)
533  {
534  OutputPluginPrepareWrite(ctx, true);
536  nrelids,
537  relids,
538  change->data.truncate.cascade,
539  change->data.truncate.restart_seqs);
540  OutputPluginWrite(ctx, true);
541  }
542 
545 }
546 
547 /*
548  * Currently we always forward.
549  */
550 static bool
552  RepOriginId origin_id)
553 {
554  return false;
555 }
556 
557 /*
558  * Shutdown the output plugin.
559  *
560  * Note, we don't need to clean the data->context as it's child context
561  * of the ctx->context so it will be cleaned up by logical decoding machinery.
562  */
563 static void
565 {
566  if (RelationSyncCache)
567  {
568  hash_destroy(RelationSyncCache);
569  RelationSyncCache = NULL;
570  }
571 }
572 
573 /*
574  * Load publications from the list of publication names.
575  */
576 static List *
578 {
579  List *result = NIL;
580  ListCell *lc;
581 
582  foreach(lc, pubnames)
583  {
584  char *pubname = (char *) lfirst(lc);
585  Publication *pub = GetPublicationByName(pubname, false);
586 
587  result = lappend(result, pub);
588  }
589 
590  return result;
591 }
592 
593 /*
594  * Publication cache invalidation callback.
595  */
596 static void
597 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
598 {
599  publications_valid = false;
600 
601  /*
602  * Also invalidate per-relation cache so that next time the filtering info
603  * is checked it will be updated with the new publication settings.
604  */
605  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
606 }
607 
608 /*
609  * Initialize the relation schema sync cache for a decoding session.
610  *
611  * The hash table is destroyed at the end of a decoding session. While
612  * relcache invalidations still exist and will still be invoked, they
613  * will just see the null hash table global and take no action.
614  */
615 static void
617 {
618  HASHCTL ctl;
619  MemoryContext old_ctxt;
620 
621  if (RelationSyncCache != NULL)
622  return;
623 
624  /* Make a new hash table for the cache */
625  MemSet(&ctl, 0, sizeof(ctl));
626  ctl.keysize = sizeof(Oid);
627  ctl.entrysize = sizeof(RelationSyncEntry);
628  ctl.hcxt = cachectx;
629 
630  old_ctxt = MemoryContextSwitchTo(cachectx);
631  RelationSyncCache = hash_create("logical replication output relation cache",
632  128, &ctl,
634  (void) MemoryContextSwitchTo(old_ctxt);
635 
636  Assert(RelationSyncCache != NULL);
637 
641  (Datum) 0);
642 }
643 
644 /*
645  * Find or create entry in the relation schema cache.
646  *
647  * This looks up publications that the given relation is directly or
648  * indirectly part of (the latter if it's really the relation's ancestor that
649  * is part of a publication) and fills up the found entry with the information
650  * about which operations to publish and whether to use an ancestor's schema
651  * when publishing.
652  */
653 static RelationSyncEntry *
655 {
656  RelationSyncEntry *entry;
657  bool am_partition = get_rel_relispartition(relid);
658  char relkind = get_rel_relkind(relid);
659  bool found;
660  MemoryContext oldctx;
661 
662  Assert(RelationSyncCache != NULL);
663 
664  /* Find cached function info, creating if not found */
666  entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
667  (void *) &relid,
668  HASH_ENTER, &found);
669  MemoryContextSwitchTo(oldctx);
670  Assert(entry != NULL);
671 
672  /* Not found means schema wasn't sent */
673  if (!found || !entry->replicate_valid)
674  {
675  List *pubids = GetRelationPublications(relid);
676  ListCell *lc;
678 
679  /* Reload publications if needed before use. */
680  if (!publications_valid)
681  {
683  if (data->publications)
685 
687  MemoryContextSwitchTo(oldctx);
688  publications_valid = true;
689  }
690 
691  /*
692  * Build publication cache. We can't use one provided by relcache as
693  * relcache considers all publications given relation is in, but here
694  * we only need to consider ones that the subscriber requested.
695  */
696  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
697  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
698 
699  foreach(lc, data->publications)
700  {
701  Publication *pub = lfirst(lc);
702  bool publish = false;
703 
704  if (pub->alltables)
705  {
706  publish = true;
707  if (pub->pubviaroot && am_partition)
708  publish_as_relid = llast_oid(get_partition_ancestors(relid));
709  }
710 
711  if (!publish)
712  {
713  bool ancestor_published = false;
714 
715  /*
716  * For a partition, check if any of the ancestors are
717  * published. If so, note down the topmost ancestor that is
718  * published via this publication, which will be used as the
719  * relation via which to publish the partition's changes.
720  */
721  if (am_partition)
722  {
723  List *ancestors = get_partition_ancestors(relid);
724  ListCell *lc2;
725 
726  /*
727  * Find the "topmost" ancestor that is in this
728  * publication.
729  */
730  foreach(lc2, ancestors)
731  {
732  Oid ancestor = lfirst_oid(lc2);
733 
735  pub->oid))
736  {
737  ancestor_published = true;
738  if (pub->pubviaroot)
739  publish_as_relid = ancestor;
740  }
741  }
742  }
743 
744  if (list_member_oid(pubids, pub->oid) || ancestor_published)
745  publish = true;
746  }
747 
748  /*
749  * Don't publish changes for partitioned tables, because
750  * publishing those of its partitions suffices, unless partition
751  * changes won't be published due to pubviaroot being set.
752  */
753  if (publish &&
754  (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
755  {
756  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
757  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
758  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
760  }
761 
762  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
763  entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
764  break;
765  }
766 
767  list_free(pubids);
768 
770  entry->replicate_valid = true;
771  }
772 
773  if (!found)
774  entry->schema_sent = false;
775 
776  return entry;
777 }
778 
779 /*
780  * Relcache invalidation callback
781  */
782 static void
784 {
785  RelationSyncEntry *entry;
786 
787  /*
788  * We can get here if the plugin was used in SQL interface as the
789  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
790  * is no way to unregister the relcache invalidation callback.
791  */
792  if (RelationSyncCache == NULL)
793  return;
794 
795  /*
796  * Nobody keeps pointers to entries in this hash table around outside
797  * logical decoding callback calls - but invalidation events can come in
798  * *during* a callback if we access the relcache in the callback. Because
799  * of that we must mark the cache entry as invalid but not remove it from
800  * the hash while it could still be referenced, then prune it at a later
801  * safe point.
802  *
803  * Getting invalidations for relations that aren't in the table is
804  * entirely normal, since there's no way to unregister for an invalidation
805  * event. So we don't care if it's found or not.
806  */
807  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
808  HASH_FIND, NULL);
809 
810  /*
811  * Reset schema sent status as the relation definition may have changed.
812  */
813  if (entry != NULL)
814  entry->schema_sent = false;
815 }
816 
817 /*
818  * Publication relation map syscache invalidation callback
819  */
820 static void
822 {
824  RelationSyncEntry *entry;
825 
826  /*
827  * We can get here if the plugin was used in SQL interface as the
828  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
829  * is no way to unregister the relcache invalidation callback.
830  */
831  if (RelationSyncCache == NULL)
832  return;
833 
834  /*
835  * There is no way to find which entry in our cache the hash belongs to so
836  * mark the whole cache as invalid.
837  */
838  hash_seq_init(&status, RelationSyncCache);
839  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
840  entry->replicate_valid = false;
841 }
LogicalDecodeTruncateCB truncate_cb
#define NIL
Definition: pg_list.h:65
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
void logicalrep_write_typ(StringInfo out, Oid typoid)
Definition: proto.c:399
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:375
PublicationActions pubactions
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:835
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition: tupdesc.c:110
#define IsA(nodeptr, _type_)
Definition: nodes.h:580
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
static bool publications_valid
Definition: pgoutput.c:51
RepOriginId origin_id
bool replicate_valid
Definition: pgoutput.c:74
#define HASH_CONTEXT
Definition: hsearch.h:93
struct ReorderBufferChange::@99::@101 truncate
#define HASH_ELEM
Definition: hsearch.h:87
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:597
MemoryContext hcxt
Definition: hsearch.h:78
#define RelationGetDescr(relation)
Definition: rel.h:482
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:107
MemoryContext context
Definition: pgoutput.h:20
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:783
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1915
TupleConversionMap * map
Definition: pgoutput.c:91
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:564
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:292
static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:307
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
uint16 RepOriginId
Definition: xlogdefs.h:58
Size entrysize
Definition: hsearch.h:73
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:610
void * output_plugin_private
Definition: logical.h:75
#define MemSet(start, val, len)
Definition: c.h:949
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:74
MemoryContext context
Definition: logical.h:35
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
#define llast_oid(l)
Definition: pg_list.h:217
List * output_plugin_options
Definition: logical.h:58
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:112
#define PG_UINT32_MAX
Definition: c.h:458
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:927
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:432
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:821
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void list_free_deep(List *list)
Definition: list.c:1390
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
XLogRecPtr origin_lsn
Oid publish_as_relid
Definition: pgoutput.c:83
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1476
OutputPluginOutputType output_type
Definition: output_plugin.h:28
Definition: dynahash.c:220
bool defGetBoolean(DefElem *def)
Definition: define.c:111
List * GetRelationPublications(Oid relid)
#define ERROR
Definition: elog.h:43
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:27
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3686
bool is_publishable_relation(Relation rel)
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:194
List * publication_names
Definition: pgoutput.h:25
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
unsigned int uint32
Definition: c.h:374
void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:297
void RelationClose(Relation relation)
Definition: relcache.c:2103
HeapTuple execute_attr_map_tuple(HeapTuple tuple, TupleConversionMap *map)
Definition: tupconvert.c:139
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:654
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:1939
Node * arg
Definition: parsenodes.h:733
void logicalrep_write_rel(StringInfo out, Relation rel)
Definition: proto.c:349
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary)
Definition: proto.c:250
List * lappend(List *list, void *datum)
Definition: list.c:321
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:45
static HTAB * RelationSyncCache
Definition: pgoutput.c:95
static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names, bool *binary)
Definition: pgoutput.c:121
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary)
Definition: proto.c:141
#define HASH_BLOBS
Definition: hsearch.h:88
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:609
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
void * palloc0(Size size)
Definition: mcxt.c:980
LogicalDecodeChangeCB change_cb
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:328
uintptr_t Datum
Definition: postgres.h:367
union ReorderBufferChange::@99 data
Size keysize
Definition: hsearch.h:72
PG_MODULE_MAGIC
Definition: pgoutput.c:31
#define ereport(elevel,...)
Definition: elog.h:144
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:583
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:674
struct ReorderBufferChange::@99::@100 tp
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:551
bool binary
Definition: pgoutput.h:27
static int list_length(const List *l)
Definition: pg_list.h:169
LogicalDecodeShutdownCB shutdown_cb
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:577
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1410
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1400
LogicalDecodeStartupCB startup_cb
#define InvalidRepOriginId
Definition: origin.h:33
List * publications
Definition: pgoutput.h:26
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:616
#define FirstGenbkiObjectId
Definition: transam.h:153
int errmsg(const char *fmt,...)
Definition: elog.c:824
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:596
static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
Definition: pgoutput.c:338
void list_free(List *list)
Definition: list.c:1376
#define elog(elevel,...)
Definition: elog.h:214
StringInfo out
Definition: logical.h:70
int i
void * arg
LogicalDecodeBeginCB begin_cb
char * defname
Definition: parsenodes.h:732
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
Definition: proto.c:180
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:492
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:256
LogicalDecodeFilterByOriginCB filter_by_origin_cb
Definition: pg_list.h:50
List * get_partition_ancestors(Oid relid)
Definition: partition.c:115
#define RelationGetRelid(relation)
Definition: rel.h:456
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1997
#define lfirst_oid(lc)
Definition: pg_list.h:192
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
PublicationActions pubactions
Definition: pgoutput.c:75
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:55
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:904
uint32 protocol_version
Definition: pgoutput.h:24