PostgreSQL Source Code  git master
pgoutput.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pgoutput.c
4  * Logical Replication output plugin
5  *
6  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/pgoutput/pgoutput.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/tupconvert.h"
16 #include "catalog/partition.h"
17 #include "catalog/pg_publication.h"
18 #include "fmgr.h"
19 #include "replication/logical.h"
21 #include "replication/origin.h"
22 #include "replication/pgoutput.h"
23 #include "utils/int8.h"
24 #include "utils/inval.h"
25 #include "utils/lsyscache.h"
26 #include "utils/memutils.h"
27 #include "utils/syscache.h"
28 #include "utils/varlena.h"
29 
31 
33 
35  OutputPluginOptions *opt, bool is_init);
38  ReorderBufferTXN *txn);
40  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
42  ReorderBufferTXN *txn, Relation rel,
43  ReorderBufferChange *change);
45  ReorderBufferTXN *txn, int nrelations, Relation relations[],
46  ReorderBufferChange *change);
48  RepOriginId origin_id);
49 
50 static bool publications_valid;
51 
52 static List *LoadPublications(List *pubnames);
53 static void publication_invalidation_cb(Datum arg, int cacheid,
54  uint32 hashvalue);
55 static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
56 
57 /*
58  * Entry in the map used to remember which relation schemas we sent.
59  *
60  * For partitions, 'pubactions' considers not only the table's own
61  * publications, but also those of all of its ancestors.
62  */
63 typedef struct RelationSyncEntry
64 {
65  Oid relid; /* relation oid */
66 
67  /*
68  * Did we send the schema? If ancestor relid is set, its schema must also
69  * have been sent for this to be true.
70  */
72 
75 
76  /*
77  * OID of the relation to publish changes as. For a partition, this may
78  * be set to one of its ancestors whose schema will be used when
79  * replicating changes, if publish_via_partition_root is set for the
80  * publication.
81  */
83 
84  /*
85  * Map used when replicating using an ancestor's schema to convert tuples
86  * from partition's type to the ancestor's; NULL if publish_as_relid is
87  * same as 'relid' or if unnecessary due to partition and the ancestor
88  * having identical TupleDesc.
89  */
92 
93 /* Map used to remember which relation schemas we sent. */
94 static HTAB *RelationSyncCache = NULL;
95 
96 static void init_rel_sync_cache(MemoryContext decoding_context);
99 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
100  uint32 hashvalue);
101 
102 /*
103  * Specify output plugin callbacks
104  */
105 void
107 {
109 
117 }
118 
119 static void
121  List **publication_names)
122 {
123  ListCell *lc;
124  bool protocol_version_given = false;
125  bool publication_names_given = false;
126 
127  foreach(lc, options)
128  {
129  DefElem *defel = (DefElem *) lfirst(lc);
130 
131  Assert(defel->arg == NULL || IsA(defel->arg, String));
132 
133  /* Check each param, whether or not we recognize it */
134  if (strcmp(defel->defname, "proto_version") == 0)
135  {
136  int64 parsed;
137 
138  if (protocol_version_given)
139  ereport(ERROR,
140  (errcode(ERRCODE_SYNTAX_ERROR),
141  errmsg("conflicting or redundant options")));
142  protocol_version_given = true;
143 
144  if (!scanint8(strVal(defel->arg), true, &parsed))
145  ereport(ERROR,
146  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
147  errmsg("invalid proto_version")));
148 
149  if (parsed > PG_UINT32_MAX || parsed < 0)
150  ereport(ERROR,
151  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
152  errmsg("proto_version \"%s\" out of range",
153  strVal(defel->arg))));
154 
155  *protocol_version = (uint32) parsed;
156  }
157  else if (strcmp(defel->defname, "publication_names") == 0)
158  {
159  if (publication_names_given)
160  ereport(ERROR,
161  (errcode(ERRCODE_SYNTAX_ERROR),
162  errmsg("conflicting or redundant options")));
163  publication_names_given = true;
164 
165  if (!SplitIdentifierString(strVal(defel->arg), ',',
166  publication_names))
167  ereport(ERROR,
168  (errcode(ERRCODE_INVALID_NAME),
169  errmsg("invalid publication_names syntax")));
170  }
171  else
172  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
173  }
174 }
175 
176 /*
177  * Initialize this plugin
178  */
179 static void
181  bool is_init)
182 {
183  PGOutputData *data = palloc0(sizeof(PGOutputData));
184 
185  /* Create our memory context for private allocations. */
187  "logical replication output context",
189 
190  ctx->output_plugin_private = data;
191 
192  /* This plugin uses binary protocol. */
194 
195  /*
196  * This is replication start and not slot initialization.
197  *
198  * Parse and validate options passed by the client.
199  */
200  if (!is_init)
201  {
202  /* Parse the params and ERROR if we see any we don't recognize */
204  &data->protocol_version,
205  &data->publication_names);
206 
207  /* Check if we support requested protocol */
209  ereport(ERROR,
210  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
211  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
213 
215  ereport(ERROR,
216  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
217  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
219 
220  if (list_length(data->publication_names) < 1)
221  ereport(ERROR,
222  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
223  errmsg("publication_names parameter missing")));
224 
225  /* Init publication state. */
226  data->publications = NIL;
227  publications_valid = false;
230  (Datum) 0);
231 
232  /* Initialize relation schema cache. */
234  }
235 }
236 
237 /*
238  * BEGIN callback
239  */
240 static void
242 {
243  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
244 
245  OutputPluginPrepareWrite(ctx, !send_replication_origin);
246  logicalrep_write_begin(ctx->out, txn);
247 
248  if (send_replication_origin)
249  {
250  char *origin;
251 
252  /* Message boundary */
253  OutputPluginWrite(ctx, false);
254  OutputPluginPrepareWrite(ctx, true);
255 
256  /*----------
257  * XXX: which behaviour do we want here?
258  *
259  * Alternatives:
260  * - don't send origin message if origin name not found
261  * (that's what we do now)
262  * - throw error - that will break replication, not good
263  * - send some special "unknown" origin
264  *----------
265  */
266  if (replorigin_by_oid(txn->origin_id, true, &origin))
267  logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
268  }
269 
270  OutputPluginWrite(ctx, true);
271 }
272 
273 /*
274  * COMMIT callback
275  */
276 static void
278  XLogRecPtr commit_lsn)
279 {
281 
282  OutputPluginPrepareWrite(ctx, true);
283  logicalrep_write_commit(ctx->out, txn, commit_lsn);
284  OutputPluginWrite(ctx, true);
285 }
286 
287 /*
288  * Write the current schema of the relation and its ancestor (if any) if not
289  * done yet.
290  */
291 static void
293  Relation relation, RelationSyncEntry *relentry)
294 {
295  if (relentry->schema_sent)
296  return;
297 
298  /* If needed, send the ancestor's schema first. */
299  if (relentry->publish_as_relid != RelationGetRelid(relation))
300  {
301  Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
302  TupleDesc indesc = RelationGetDescr(relation);
303  TupleDesc outdesc = RelationGetDescr(ancestor);
304  MemoryContext oldctx;
305 
306  /* Map must live as long as the session does. */
308  relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
309  CreateTupleDescCopy(outdesc));
310  MemoryContextSwitchTo(oldctx);
311  send_relation_and_attrs(ancestor, ctx);
312  RelationClose(ancestor);
313  }
314 
315  send_relation_and_attrs(relation, ctx);
316  relentry->schema_sent = true;
317 }
318 
319 /*
320  * Sends a relation
321  */
322 static void
324 {
325  TupleDesc desc = RelationGetDescr(relation);
326  int i;
327 
328  /*
329  * Write out type info if needed. We do that only for user-created types.
330  * We use FirstGenbkiObjectId as the cutoff, so that we only consider
331  * objects with hand-assigned OIDs to be "built in", not for instance any
332  * function or type defined in the information_schema. This is important
333  * because only hand-assigned OIDs can be expected to remain stable across
334  * major versions.
335  */
336  for (i = 0; i < desc->natts; i++)
337  {
338  Form_pg_attribute att = TupleDescAttr(desc, i);
339 
340  if (att->attisdropped || att->attgenerated)
341  continue;
342 
343  if (att->atttypid < FirstGenbkiObjectId)
344  continue;
345 
346  OutputPluginPrepareWrite(ctx, false);
347  logicalrep_write_typ(ctx->out, att->atttypid);
348  OutputPluginWrite(ctx, false);
349  }
350 
351  OutputPluginPrepareWrite(ctx, false);
352  logicalrep_write_rel(ctx->out, relation);
353  OutputPluginWrite(ctx, false);
354 }
355 
356 /*
357  * Sends the decoded DML over wire.
358  */
359 static void
361  Relation relation, ReorderBufferChange *change)
362 {
364  MemoryContext old;
365  RelationSyncEntry *relentry;
366 
367  if (!is_publishable_relation(relation))
368  return;
369 
370  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
371 
372  /* First check the table filter */
373  switch (change->action)
374  {
376  if (!relentry->pubactions.pubinsert)
377  return;
378  break;
380  if (!relentry->pubactions.pubupdate)
381  return;
382  break;
384  if (!relentry->pubactions.pubdelete)
385  return;
386  break;
387  default:
388  Assert(false);
389  }
390 
391  /* Avoid leaking memory by using and resetting our own context */
392  old = MemoryContextSwitchTo(data->context);
393 
394  maybe_send_schema(ctx, relation, relentry);
395 
396  /* Send the data */
397  switch (change->action)
398  {
400  {
401  HeapTuple tuple = &change->data.tp.newtuple->tuple;
402 
403  /* Switch relation if publishing via root. */
404  if (relentry->publish_as_relid != RelationGetRelid(relation))
405  {
406  Assert(relation->rd_rel->relispartition);
407  relation = RelationIdGetRelation(relentry->publish_as_relid);
408  /* Convert tuple if needed. */
409  if (relentry->map)
410  tuple = execute_attr_map_tuple(tuple, relentry->map);
411  }
412 
413  OutputPluginPrepareWrite(ctx, true);
414  logicalrep_write_insert(ctx->out, relation, tuple);
415  OutputPluginWrite(ctx, true);
416  break;
417  }
419  {
420  HeapTuple oldtuple = change->data.tp.oldtuple ?
421  &change->data.tp.oldtuple->tuple : NULL;
422  HeapTuple newtuple = &change->data.tp.newtuple->tuple;
423 
424  /* Switch relation if publishing via root. */
425  if (relentry->publish_as_relid != RelationGetRelid(relation))
426  {
427  Assert(relation->rd_rel->relispartition);
428  relation = RelationIdGetRelation(relentry->publish_as_relid);
429  /* Convert tuples if needed. */
430  if (relentry->map)
431  {
432  oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
433  newtuple = execute_attr_map_tuple(newtuple, relentry->map);
434  }
435  }
436 
437  OutputPluginPrepareWrite(ctx, true);
438  logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
439  OutputPluginWrite(ctx, true);
440  break;
441  }
443  if (change->data.tp.oldtuple)
444  {
445  HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
446 
447  /* Switch relation if publishing via root. */
448  if (relentry->publish_as_relid != RelationGetRelid(relation))
449  {
450  Assert(relation->rd_rel->relispartition);
451  relation = RelationIdGetRelation(relentry->publish_as_relid);
452  /* Convert tuple if needed. */
453  if (relentry->map)
454  oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
455  }
456 
457  OutputPluginPrepareWrite(ctx, true);
458  logicalrep_write_delete(ctx->out, relation, oldtuple);
459  OutputPluginWrite(ctx, true);
460  }
461  else
462  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
463  break;
464  default:
465  Assert(false);
466  }
467 
468  /* Cleanup */
471 }
472 
473 static void
475  int nrelations, Relation relations[], ReorderBufferChange *change)
476 {
478  MemoryContext old;
479  RelationSyncEntry *relentry;
480  int i;
481  int nrelids;
482  Oid *relids;
483 
484  old = MemoryContextSwitchTo(data->context);
485 
486  relids = palloc0(nrelations * sizeof(Oid));
487  nrelids = 0;
488 
489  for (i = 0; i < nrelations; i++)
490  {
491  Relation relation = relations[i];
492  Oid relid = RelationGetRelid(relation);
493 
494  if (!is_publishable_relation(relation))
495  continue;
496 
497  relentry = get_rel_sync_entry(data, relid);
498 
499  if (!relentry->pubactions.pubtruncate)
500  continue;
501 
502  /*
503  * Don't send partitions if the publication wants to send only the
504  * root tables through it.
505  */
506  if (relation->rd_rel->relispartition &&
507  relentry->publish_as_relid != relid)
508  continue;
509 
510  relids[nrelids++] = relid;
511  maybe_send_schema(ctx, relation, relentry);
512  }
513 
514  if (nrelids > 0)
515  {
516  OutputPluginPrepareWrite(ctx, true);
518  nrelids,
519  relids,
520  change->data.truncate.cascade,
521  change->data.truncate.restart_seqs);
522  OutputPluginWrite(ctx, true);
523  }
524 
527 }
528 
529 /*
530  * Currently we always forward.
531  */
532 static bool
534  RepOriginId origin_id)
535 {
536  return false;
537 }
538 
539 /*
540  * Shutdown the output plugin.
541  *
542  * Note, we don't need to clean the data->context as it's child context
543  * of the ctx->context so it will be cleaned up by logical decoding machinery.
544  */
545 static void
547 {
548  if (RelationSyncCache)
549  {
550  hash_destroy(RelationSyncCache);
551  RelationSyncCache = NULL;
552  }
553 }
554 
555 /*
556  * Load publications from the list of publication names.
557  */
558 static List *
560 {
561  List *result = NIL;
562  ListCell *lc;
563 
564  foreach(lc, pubnames)
565  {
566  char *pubname = (char *) lfirst(lc);
567  Publication *pub = GetPublicationByName(pubname, false);
568 
569  result = lappend(result, pub);
570  }
571 
572  return result;
573 }
574 
575 /*
576  * Publication cache invalidation callback.
577  */
578 static void
579 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
580 {
581  publications_valid = false;
582 
583  /*
584  * Also invalidate per-relation cache so that next time the filtering info
585  * is checked it will be updated with the new publication settings.
586  */
587  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
588 }
589 
590 /*
591  * Initialize the relation schema sync cache for a decoding session.
592  *
593  * The hash table is destroyed at the end of a decoding session. While
594  * relcache invalidations still exist and will still be invoked, they
595  * will just see the null hash table global and take no action.
596  */
597 static void
599 {
600  HASHCTL ctl;
601  MemoryContext old_ctxt;
602 
603  if (RelationSyncCache != NULL)
604  return;
605 
606  /* Make a new hash table for the cache */
607  MemSet(&ctl, 0, sizeof(ctl));
608  ctl.keysize = sizeof(Oid);
609  ctl.entrysize = sizeof(RelationSyncEntry);
610  ctl.hcxt = cachectx;
611 
612  old_ctxt = MemoryContextSwitchTo(cachectx);
613  RelationSyncCache = hash_create("logical replication output relation cache",
614  128, &ctl,
616  (void) MemoryContextSwitchTo(old_ctxt);
617 
618  Assert(RelationSyncCache != NULL);
619 
623  (Datum) 0);
624 }
625 
626 /*
627  * Find or create entry in the relation schema cache.
628  *
629  * This looks up publications that the given relation is directly or
630  * indirectly part of (the latter if it's really the relation's ancestor that
631  * is part of a publication) and fills up the found entry with the information
632  * about which operations to publish and whether to use an ancestor's schema
633  * when publishing.
634  */
635 static RelationSyncEntry *
637 {
638  RelationSyncEntry *entry;
639  bool am_partition = get_rel_relispartition(relid);
640  char relkind = get_rel_relkind(relid);
641  bool found;
642  MemoryContext oldctx;
643 
644  Assert(RelationSyncCache != NULL);
645 
646  /* Find cached function info, creating if not found */
648  entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
649  (void *) &relid,
650  HASH_ENTER, &found);
651  MemoryContextSwitchTo(oldctx);
652  Assert(entry != NULL);
653 
654  /* Not found means schema wasn't sent */
655  if (!found || !entry->replicate_valid)
656  {
657  List *pubids = GetRelationPublications(relid);
658  ListCell *lc;
660 
661  /* Reload publications if needed before use. */
662  if (!publications_valid)
663  {
665  if (data->publications)
667 
669  MemoryContextSwitchTo(oldctx);
670  publications_valid = true;
671  }
672 
673  /*
674  * Build publication cache. We can't use one provided by relcache as
675  * relcache considers all publications given relation is in, but here
676  * we only need to consider ones that the subscriber requested.
677  */
678  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
679  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
680 
681  foreach(lc, data->publications)
682  {
683  Publication *pub = lfirst(lc);
684  bool publish = false;
685 
686  if (pub->alltables)
687  {
688  publish = true;
689  if (pub->pubviaroot && am_partition)
690  publish_as_relid = llast_oid(get_partition_ancestors(relid));
691  }
692 
693  if (!publish)
694  {
695  bool ancestor_published = false;
696 
697  /*
698  * For a partition, check if any of the ancestors are
699  * published. If so, note down the topmost ancestor that is
700  * published via this publication, which will be used as the
701  * relation via which to publish the partition's changes.
702  */
703  if (am_partition)
704  {
705  List *ancestors = get_partition_ancestors(relid);
706  ListCell *lc2;
707 
708  /*
709  * Find the "topmost" ancestor that is in this
710  * publication.
711  */
712  foreach(lc2, ancestors)
713  {
714  Oid ancestor = lfirst_oid(lc2);
715 
717  pub->oid))
718  {
719  ancestor_published = true;
720  if (pub->pubviaroot)
721  publish_as_relid = ancestor;
722  }
723  }
724  }
725 
726  if (list_member_oid(pubids, pub->oid) || ancestor_published)
727  publish = true;
728  }
729 
730  /*
731  * Don't publish changes for partitioned tables, because
732  * publishing those of its partitions suffices, unless partition
733  * changes won't be published due to pubviaroot being set.
734  */
735  if (publish &&
736  (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
737  {
738  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
739  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
740  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
742  }
743 
744  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
745  entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
746  break;
747  }
748 
749  list_free(pubids);
750 
752  entry->replicate_valid = true;
753  }
754 
755  if (!found)
756  entry->schema_sent = false;
757 
758  return entry;
759 }
760 
761 /*
762  * Relcache invalidation callback
763  */
764 static void
766 {
767  RelationSyncEntry *entry;
768 
769  /*
770  * We can get here if the plugin was used in SQL interface as the
771  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
772  * is no way to unregister the relcache invalidation callback.
773  */
774  if (RelationSyncCache == NULL)
775  return;
776 
777  /*
778  * Nobody keeps pointers to entries in this hash table around outside
779  * logical decoding callback calls - but invalidation events can come in
780  * *during* a callback if we access the relcache in the callback. Because
781  * of that we must mark the cache entry as invalid but not remove it from
782  * the hash while it could still be referenced, then prune it at a later
783  * safe point.
784  *
785  * Getting invalidations for relations that aren't in the table is
786  * entirely normal, since there's no way to unregister for an invalidation
787  * event. So we don't care if it's found or not.
788  */
789  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
790  HASH_FIND, NULL);
791 
792  /*
793  * Reset schema sent status as the relation definition may have changed.
794  */
795  if (entry != NULL)
796  entry->schema_sent = false;
797 }
798 
799 /*
800  * Publication relation map syscache invalidation callback
801  */
802 static void
804 {
806  RelationSyncEntry *entry;
807 
808  /*
809  * We can get here if the plugin was used in SQL interface as the
810  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
811  * is no way to unregister the relcache invalidation callback.
812  */
813  if (RelationSyncCache == NULL)
814  return;
815 
816  /*
817  * There is no way to find which entry in our cache the hash belongs to so
818  * mark the whole cache as invalid.
819  */
820  hash_seq_init(&status, RelationSyncCache);
821  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
822  entry->replicate_valid = false;
823 }
LogicalDecodeTruncateCB truncate_cb
#define NIL
Definition: pg_list.h:65
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
void logicalrep_write_typ(StringInfo out, Oid typoid)
Definition: proto.c:400
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:360
PublicationActions pubactions
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:816
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition: tupdesc.c:110
#define IsA(nodeptr, _type_)
Definition: nodes.h:580
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
static bool publications_valid
Definition: pgoutput.c:50
RepOriginId origin_id
bool replicate_valid
Definition: pgoutput.c:73
#define HASH_CONTEXT
Definition: hsearch.h:93
struct ReorderBufferChange::@99::@101 truncate
#define HASH_ELEM
Definition: hsearch.h:87
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:579
MemoryContext hcxt
Definition: hsearch.h:78
#define RelationGetDescr(relation)
Definition: rel.h:482
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:106
MemoryContext context
Definition: pgoutput.h:20
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:765
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1915
TupleConversionMap * map
Definition: pgoutput.c:90
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:546
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:277
static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:292
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
uint16 RepOriginId
Definition: xlogdefs.h:58
Size entrysize
Definition: hsearch.h:73
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:610
void * output_plugin_private
Definition: logical.h:75
#define MemSet(start, val, len)
Definition: c.h:971
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:75
MemoryContext context
Definition: logical.h:35
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
#define llast_oid(l)
Definition: pg_list.h:217
List * output_plugin_options
Definition: logical.h:58
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:113
#define PG_UINT32_MAX
Definition: c.h:451
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:908
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:437
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:803
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void list_free_deep(List *list)
Definition: list.c:1390
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
XLogRecPtr origin_lsn
Oid publish_as_relid
Definition: pgoutput.c:82
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1468
OutputPluginOutputType output_type
Definition: output_plugin.h:28
Definition: dynahash.c:210
List * GetRelationPublications(Oid relid)
#define ERROR
Definition: elog.h:43
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:27
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3686
bool is_publishable_relation(Relation rel)
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:180
List * publication_names
Definition: pgoutput.h:26
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
Definition: proto.c:251
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
unsigned int uint32
Definition: c.h:367
void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:298
void RelationClose(Relation relation)
Definition: relcache.c:2102
HeapTuple execute_attr_map_tuple(HeapTuple tuple, TupleConversionMap *map)
Definition: tupconvert.c:139
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
Definition: proto.c:142
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:636
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:1939
Node * arg
Definition: parsenodes.h:733
void logicalrep_write_rel(StringInfo out, Relation rel)
Definition: proto.c:350
List * lappend(List *list, void *datum)
Definition: list.c:321
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:46
static HTAB * RelationSyncCache
Definition: pgoutput.c:94
#define HASH_BLOBS
Definition: hsearch.h:88
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:552
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
void * palloc0(Size size)
Definition: mcxt.c:980
LogicalDecodeChangeCB change_cb
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:318
uintptr_t Datum
Definition: postgres.h:367
union ReorderBufferChange::@99 data
Size keysize
Definition: hsearch.h:72
PG_MODULE_MAGIC
Definition: pgoutput.c:30
#define ereport(elevel,...)
Definition: elog.h:144
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:526
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
Definition: proto.c:181
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:674
struct ReorderBufferChange::@99::@100 tp
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:738
#define lfirst(lc)
Definition: pg_list.h:190
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:533
static int list_length(const List *l)
Definition: pg_list.h:169
LogicalDecodeShutdownCB shutdown_cb
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:559
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1391
static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names)
Definition: pgoutput.c:120
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1381
LogicalDecodeStartupCB startup_cb
#define InvalidRepOriginId
Definition: origin.h:33
List * publications
Definition: pgoutput.h:27
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:598
#define FirstGenbkiObjectId
Definition: transam.h:153
int errmsg(const char *fmt,...)
Definition: elog.c:824
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:539
static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
Definition: pgoutput.c:323
void list_free(List *list)
Definition: list.c:1376
#define elog(elevel,...)
Definition: elog.h:214
StringInfo out
Definition: logical.h:70
int i
void * arg
LogicalDecodeBeginCB begin_cb
char * defname
Definition: parsenodes.h:732
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:474
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:241
LogicalDecodeFilterByOriginCB filter_by_origin_cb
Definition: pg_list.h:50
List * get_partition_ancestors(Oid relid)
Definition: partition.c:115
#define RelationGetRelid(relation)
Definition: rel.h:456
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1996
#define lfirst_oid(lc)
Definition: pg_list.h:192
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
PublicationActions pubactions
Definition: pgoutput.c:74
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:55
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:897
uint32 protocol_version
Definition: pgoutput.h:24