PostgreSQL Source Code  git master
pgoutput.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pgoutput.c
4  * Logical Replication output plugin
5  *
6  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/pgoutput/pgoutput.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "catalog/pg_publication.h"
16 
17 #include "fmgr.h"
18 
19 #include "replication/logical.h"
21 #include "replication/origin.h"
22 #include "replication/pgoutput.h"
23 
24 #include "utils/inval.h"
25 #include "utils/int8.h"
26 #include "utils/memutils.h"
27 #include "utils/syscache.h"
28 #include "utils/varlena.h"
29 
31 
33 
35  OutputPluginOptions *opt, bool is_init);
38  ReorderBufferTXN *txn);
40  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
42  ReorderBufferTXN *txn, Relation rel,
43  ReorderBufferChange *change);
45  ReorderBufferTXN *txn, int nrelations, Relation relations[],
46  ReorderBufferChange *change);
48  RepOriginId origin_id);
49 
50 static bool publications_valid;
51 
52 static List *LoadPublications(List *pubnames);
53 static void publication_invalidation_cb(Datum arg, int cacheid,
54  uint32 hashvalue);
55 
56 /* Entry in the map used to remember which relation schemas we sent. */
57 typedef struct RelationSyncEntry
58 {
59  Oid relid; /* relation oid */
60  bool schema_sent; /* did we send the schema? */
64 
65 /* Map used to remember which relation schemas we sent. */
66 static HTAB *RelationSyncCache = NULL;
67 
68 static void init_rel_sync_cache(MemoryContext decoding_context);
71 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
72  uint32 hashvalue);
73 
74 /*
75  * Specify output plugin callbacks
76  */
77 void
79 {
81 
89 }
90 
91 static void
93  List **publication_names)
94 {
95  ListCell *lc;
96  bool protocol_version_given = false;
97  bool publication_names_given = false;
98 
99  foreach(lc, options)
100  {
101  DefElem *defel = (DefElem *) lfirst(lc);
102 
103  Assert(defel->arg == NULL || IsA(defel->arg, String));
104 
105  /* Check each param, whether or not we recognize it */
106  if (strcmp(defel->defname, "proto_version") == 0)
107  {
108  int64 parsed;
109 
110  if (protocol_version_given)
111  ereport(ERROR,
112  (errcode(ERRCODE_SYNTAX_ERROR),
113  errmsg("conflicting or redundant options")));
114  protocol_version_given = true;
115 
116  if (!scanint8(strVal(defel->arg), true, &parsed))
117  ereport(ERROR,
118  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
119  errmsg("invalid proto_version")));
120 
121  if (parsed > PG_UINT32_MAX || parsed < 0)
122  ereport(ERROR,
123  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
124  errmsg("proto_version \"%s\" out of range",
125  strVal(defel->arg))));
126 
127  *protocol_version = (uint32) parsed;
128  }
129  else if (strcmp(defel->defname, "publication_names") == 0)
130  {
131  if (publication_names_given)
132  ereport(ERROR,
133  (errcode(ERRCODE_SYNTAX_ERROR),
134  errmsg("conflicting or redundant options")));
135  publication_names_given = true;
136 
137  if (!SplitIdentifierString(strVal(defel->arg), ',',
138  publication_names))
139  ereport(ERROR,
140  (errcode(ERRCODE_INVALID_NAME),
141  errmsg("invalid publication_names syntax")));
142  }
143  else
144  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
145  }
146 }
147 
148 /*
149  * Initialize this plugin
150  */
151 static void
153  bool is_init)
154 {
155  PGOutputData *data = palloc0(sizeof(PGOutputData));
156 
157  /* Create our memory context for private allocations. */
159  "logical replication output context",
161 
162  ctx->output_plugin_private = data;
163 
164  /* This plugin uses binary protocol. */
166 
167  /*
168  * This is replication start and not slot initialization.
169  *
170  * Parse and validate options passed by the client.
171  */
172  if (!is_init)
173  {
174  /* Parse the params and ERROR if we see any we don't recognize */
176  &data->protocol_version,
177  &data->publication_names);
178 
179  /* Check if we support requested protocol */
181  ereport(ERROR,
182  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
183  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
185 
187  ereport(ERROR,
188  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
189  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
191 
192  if (list_length(data->publication_names) < 1)
193  ereport(ERROR,
194  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
195  errmsg("publication_names parameter missing")));
196 
197  /* Init publication state. */
198  data->publications = NIL;
199  publications_valid = false;
202  (Datum) 0);
203 
204  /* Initialize relation schema cache. */
206  }
207 }
208 
209 /*
210  * BEGIN callback
211  */
212 static void
214 {
215  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
216 
217  OutputPluginPrepareWrite(ctx, !send_replication_origin);
218  logicalrep_write_begin(ctx->out, txn);
219 
220  if (send_replication_origin)
221  {
222  char *origin;
223 
224  /* Message boundary */
225  OutputPluginWrite(ctx, false);
226  OutputPluginPrepareWrite(ctx, true);
227 
228  /*----------
229  * XXX: which behaviour do we want here?
230  *
231  * Alternatives:
232  * - don't send origin message if origin name not found
233  * (that's what we do now)
234  * - throw error - that will break replication, not good
235  * - send some special "unknown" origin
236  *----------
237  */
238  if (replorigin_by_oid(txn->origin_id, true, &origin))
239  logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
240  }
241 
242  OutputPluginWrite(ctx, true);
243 }
244 
245 /*
246  * COMMIT callback
247  */
248 static void
250  XLogRecPtr commit_lsn)
251 {
253 
254  OutputPluginPrepareWrite(ctx, true);
255  logicalrep_write_commit(ctx->out, txn, commit_lsn);
256  OutputPluginWrite(ctx, true);
257 }
258 
259 /*
260  * Write the relation schema if the current schema hasn't been sent yet.
261  */
262 static void
264  Relation relation, RelationSyncEntry *relentry)
265 {
266  if (!relentry->schema_sent)
267  {
268  TupleDesc desc;
269  int i;
270 
271  desc = RelationGetDescr(relation);
272 
273  /*
274  * Write out type info if needed. We do that only for user-created
275  * types. We use FirstGenbkiObjectId as the cutoff, so that we only
276  * consider objects with hand-assigned OIDs to be "built in", not for
277  * instance any function or type defined in the information_schema.
278  * This is important because only hand-assigned OIDs can be expected
279  * to remain stable across major versions.
280  */
281  for (i = 0; i < desc->natts; i++)
282  {
283  Form_pg_attribute att = TupleDescAttr(desc, i);
284 
285  if (att->attisdropped || att->attgenerated)
286  continue;
287 
288  if (att->atttypid < FirstGenbkiObjectId)
289  continue;
290 
291  OutputPluginPrepareWrite(ctx, false);
292  logicalrep_write_typ(ctx->out, att->atttypid);
293  OutputPluginWrite(ctx, false);
294  }
295 
296  OutputPluginPrepareWrite(ctx, false);
297  logicalrep_write_rel(ctx->out, relation);
298  OutputPluginWrite(ctx, false);
299  relentry->schema_sent = true;
300  }
301 }
302 
303 /*
304  * Sends the decoded DML over wire.
305  */
306 static void
308  Relation relation, ReorderBufferChange *change)
309 {
311  MemoryContext old;
312  RelationSyncEntry *relentry;
313 
314  if (!is_publishable_relation(relation))
315  return;
316 
317  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
318 
319  /* First check the table filter */
320  switch (change->action)
321  {
323  if (!relentry->pubactions.pubinsert)
324  return;
325  break;
327  if (!relentry->pubactions.pubupdate)
328  return;
329  break;
331  if (!relentry->pubactions.pubdelete)
332  return;
333  break;
334  default:
335  Assert(false);
336  }
337 
338  /* Avoid leaking memory by using and resetting our own context */
339  old = MemoryContextSwitchTo(data->context);
340 
341  maybe_send_schema(ctx, relation, relentry);
342 
343  /* Send the data */
344  switch (change->action)
345  {
347  OutputPluginPrepareWrite(ctx, true);
348  logicalrep_write_insert(ctx->out, relation,
349  &change->data.tp.newtuple->tuple);
350  OutputPluginWrite(ctx, true);
351  break;
353  {
354  HeapTuple oldtuple = change->data.tp.oldtuple ?
355  &change->data.tp.oldtuple->tuple : NULL;
356 
357  OutputPluginPrepareWrite(ctx, true);
358  logicalrep_write_update(ctx->out, relation, oldtuple,
359  &change->data.tp.newtuple->tuple);
360  OutputPluginWrite(ctx, true);
361  break;
362  }
364  if (change->data.tp.oldtuple)
365  {
366  OutputPluginPrepareWrite(ctx, true);
367  logicalrep_write_delete(ctx->out, relation,
368  &change->data.tp.oldtuple->tuple);
369  OutputPluginWrite(ctx, true);
370  }
371  else
372  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
373  break;
374  default:
375  Assert(false);
376  }
377 
378  /* Cleanup */
381 }
382 
383 static void
385  int nrelations, Relation relations[], ReorderBufferChange *change)
386 {
388  MemoryContext old;
389  RelationSyncEntry *relentry;
390  int i;
391  int nrelids;
392  Oid *relids;
393 
394  old = MemoryContextSwitchTo(data->context);
395 
396  relids = palloc0(nrelations * sizeof(Oid));
397  nrelids = 0;
398 
399  for (i = 0; i < nrelations; i++)
400  {
401  Relation relation = relations[i];
402  Oid relid = RelationGetRelid(relation);
403 
404  if (!is_publishable_relation(relation))
405  continue;
406 
407  relentry = get_rel_sync_entry(data, relid);
408 
409  if (!relentry->pubactions.pubtruncate)
410  continue;
411 
412  relids[nrelids++] = relid;
413  maybe_send_schema(ctx, relation, relentry);
414  }
415 
416  if (nrelids > 0)
417  {
418  OutputPluginPrepareWrite(ctx, true);
420  nrelids,
421  relids,
422  change->data.truncate.cascade,
423  change->data.truncate.restart_seqs);
424  OutputPluginWrite(ctx, true);
425  }
426 
429 }
430 
431 /*
432  * Currently we always forward.
433  */
434 static bool
436  RepOriginId origin_id)
437 {
438  return false;
439 }
440 
441 /*
442  * Shutdown the output plugin.
443  *
444  * Note, we don't need to clean the data->context as it's child context
445  * of the ctx->context so it will be cleaned up by logical decoding machinery.
446  */
447 static void
449 {
450  if (RelationSyncCache)
451  {
452  hash_destroy(RelationSyncCache);
453  RelationSyncCache = NULL;
454  }
455 }
456 
457 /*
458  * Load publications from the list of publication names.
459  */
460 static List *
462 {
463  List *result = NIL;
464  ListCell *lc;
465 
466  foreach(lc, pubnames)
467  {
468  char *pubname = (char *) lfirst(lc);
469  Publication *pub = GetPublicationByName(pubname, false);
470 
471  result = lappend(result, pub);
472  }
473 
474  return result;
475 }
476 
477 /*
478  * Publication cache invalidation callback.
479  */
480 static void
481 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
482 {
483  publications_valid = false;
484 
485  /*
486  * Also invalidate per-relation cache so that next time the filtering info
487  * is checked it will be updated with the new publication settings.
488  */
489  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
490 }
491 
492 /*
493  * Initialize the relation schema sync cache for a decoding session.
494  *
495  * The hash table is destroyed at the end of a decoding session. While
496  * relcache invalidations still exist and will still be invoked, they
497  * will just see the null hash table global and take no action.
498  */
499 static void
501 {
502  HASHCTL ctl;
503  MemoryContext old_ctxt;
504 
505  if (RelationSyncCache != NULL)
506  return;
507 
508  /* Make a new hash table for the cache */
509  MemSet(&ctl, 0, sizeof(ctl));
510  ctl.keysize = sizeof(Oid);
511  ctl.entrysize = sizeof(RelationSyncEntry);
512  ctl.hcxt = cachectx;
513 
514  old_ctxt = MemoryContextSwitchTo(cachectx);
515  RelationSyncCache = hash_create("logical replication output relation cache",
516  128, &ctl,
518  (void) MemoryContextSwitchTo(old_ctxt);
519 
520  Assert(RelationSyncCache != NULL);
521 
525  (Datum) 0);
526 }
527 
528 /*
529  * Find or create entry in the relation schema cache.
530  */
531 static RelationSyncEntry *
533 {
534  RelationSyncEntry *entry;
535  bool found;
536  MemoryContext oldctx;
537 
538  Assert(RelationSyncCache != NULL);
539 
540  /* Find cached function info, creating if not found */
542  entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
543  (void *) &relid,
544  HASH_ENTER, &found);
545  MemoryContextSwitchTo(oldctx);
546  Assert(entry != NULL);
547 
548  /* Not found means schema wasn't sent */
549  if (!found || !entry->replicate_valid)
550  {
551  List *pubids = GetRelationPublications(relid);
552  ListCell *lc;
553 
554  /* Reload publications if needed before use. */
555  if (!publications_valid)
556  {
558  if (data->publications)
560 
562  MemoryContextSwitchTo(oldctx);
563  publications_valid = true;
564  }
565 
566  /*
567  * Build publication cache. We can't use one provided by relcache as
568  * relcache considers all publications given relation is in, but here
569  * we only need to consider ones that the subscriber requested.
570  */
571  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
572  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
573 
574  foreach(lc, data->publications)
575  {
576  Publication *pub = lfirst(lc);
577 
578  if (pub->alltables || list_member_oid(pubids, pub->oid))
579  {
580  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
581  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
582  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
584  }
585 
586  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
587  entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
588  break;
589  }
590 
591  list_free(pubids);
592 
593  entry->replicate_valid = true;
594  }
595 
596  if (!found)
597  entry->schema_sent = false;
598 
599  return entry;
600 }
601 
602 /*
603  * Relcache invalidation callback
604  */
605 static void
607 {
608  RelationSyncEntry *entry;
609 
610  /*
611  * We can get here if the plugin was used in SQL interface as the
612  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
613  * is no way to unregister the relcache invalidation callback.
614  */
615  if (RelationSyncCache == NULL)
616  return;
617 
618  /*
619  * Nobody keeps pointers to entries in this hash table around outside
620  * logical decoding callback calls - but invalidation events can come in
621  * *during* a callback if we access the relcache in the callback. Because
622  * of that we must mark the cache entry as invalid but not remove it from
623  * the hash while it could still be referenced, then prune it at a later
624  * safe point.
625  *
626  * Getting invalidations for relations that aren't in the table is
627  * entirely normal, since there's no way to unregister for an invalidation
628  * event. So we don't care if it's found or not.
629  */
630  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
631  HASH_FIND, NULL);
632 
633  /*
634  * Reset schema sent status as the relation definition may have changed.
635  */
636  if (entry != NULL)
637  entry->schema_sent = false;
638 }
639 
640 /*
641  * Publication relation map syscache invalidation callback
642  */
643 static void
645 {
647  RelationSyncEntry *entry;
648 
649  /*
650  * We can get here if the plugin was used in SQL interface as the
651  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
652  * is no way to unregister the relcache invalidation callback.
653  */
654  if (RelationSyncCache == NULL)
655  return;
656 
657  /*
658  * There is no way to find which entry in our cache the hash belongs to so
659  * mark the whole cache as invalid.
660  */
661  hash_seq_init(&status, RelationSyncCache);
662  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
663  entry->replicate_valid = false;
664 }
LogicalDecodeTruncateCB truncate_cb
#define NIL
Definition: pg_list.h:65
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
void logicalrep_write_typ(StringInfo out, Oid typoid)
Definition: proto.c:404
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:307
PublicationActions pubactions
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:814
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
static bool publications_valid
Definition: pgoutput.c:50
RepOriginId origin_id
bool replicate_valid
Definition: pgoutput.c:61
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:481
MemoryContext hcxt
Definition: hsearch.h:78
#define RelationGetDescr(relation)
Definition: rel.h:445
struct ReorderBufferChange::@101::@102 tp
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:78
MemoryContext context
Definition: pgoutput.h:20
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:606
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:448
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:249
static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:263
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
uint16 RepOriginId
Definition: xlogdefs.h:58
Size entrysize
Definition: hsearch.h:73
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:570
void * output_plugin_private
Definition: logical.h:76
#define MemSet(start, val, len)
Definition: c.h:955
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:75
MemoryContext context
Definition: logical.h:36
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
List * output_plugin_options
Definition: logical.h:59
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:113
#define PG_UINT32_MAX
Definition: c.h:442
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:435
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:644
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
void list_free_deep(List *list)
Definition: list.c:1391
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
XLogRecPtr origin_lsn
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1468
OutputPluginOutputType output_type
Definition: output_plugin.h:28
Definition: dynahash.c:208
List * GetRelationPublications(Oid relid)
#define ERROR
Definition: elog.h:43
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:27
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3644
bool is_publishable_relation(Relation rel)
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:152
List * publication_names
Definition: pgoutput.h:26
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
Definition: proto.c:255
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
unsigned int uint32
Definition: c.h:358
void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:302
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
Definition: proto.c:142
#define ereport(elevel, rest)
Definition: elog.h:141
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:532
Node * arg
Definition: parsenodes.h:731
void logicalrep_write_rel(StringInfo out, Relation rel)
Definition: proto.c:354
List * lappend(List *list, void *datum)
Definition: list.c:322
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:46
static HTAB * RelationSyncCache
Definition: pgoutput.c:66
#define HASH_BLOBS
Definition: hsearch.h:88
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:553
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
void * palloc0(Size size)
Definition: mcxt.c:980
LogicalDecodeChangeCB change_cb
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
uintptr_t Datum
Definition: postgres.h:367
Size keysize
Definition: hsearch.h:72
PG_MODULE_MAGIC
Definition: pgoutput.c:30
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:527
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
Definition: proto.c:185
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:675
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
union ReorderBufferChange::@101 data
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:435
static int list_length(const List *l)
Definition: pg_list.h:169
LogicalDecodeShutdownCB shutdown_cb
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:461
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names)
Definition: pgoutput.c:92
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
LogicalDecodeStartupCB startup_cb
#define InvalidRepOriginId
Definition: origin.h:33
List * publications
Definition: pgoutput.h:27
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:500
#define FirstGenbkiObjectId
Definition: transam.h:139
int errmsg(const char *fmt,...)
Definition: elog.c:784
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:540
void list_free(List *list)
Definition: list.c:1377
#define elog(elevel,...)
Definition: elog.h:226
StringInfo out
Definition: logical.h:71
int i
void * arg
LogicalDecodeBeginCB begin_cb
char * defname
Definition: parsenodes.h:730
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:384
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:213
LogicalDecodeFilterByOriginCB filter_by_origin_cb
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:419
struct ReorderBufferChange::@101::@103 truncate
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
PublicationActions pubactions
Definition: pgoutput.c:62
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:57
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:881
uint32 protocol_version
Definition: pgoutput.h:24