PostgreSQL Source Code  git master
pgoutput.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pgoutput.c
4  * Logical Replication output plugin
5  *
6  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/pgoutput/pgoutput.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "catalog/pg_publication.h"
16 #include "fmgr.h"
17 #include "replication/logical.h"
19 #include "replication/origin.h"
20 #include "replication/pgoutput.h"
21 #include "utils/int8.h"
22 #include "utils/inval.h"
23 #include "utils/memutils.h"
24 #include "utils/syscache.h"
25 #include "utils/varlena.h"
26 
28 
30 
32  OutputPluginOptions *opt, bool is_init);
35  ReorderBufferTXN *txn);
37  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
39  ReorderBufferTXN *txn, Relation rel,
40  ReorderBufferChange *change);
42  ReorderBufferTXN *txn, int nrelations, Relation relations[],
43  ReorderBufferChange *change);
45  RepOriginId origin_id);
46 
47 static bool publications_valid;
48 
49 static List *LoadPublications(List *pubnames);
50 static void publication_invalidation_cb(Datum arg, int cacheid,
51  uint32 hashvalue);
52 
53 /* Entry in the map used to remember which relation schemas we sent. */
54 typedef struct RelationSyncEntry
55 {
56  Oid relid; /* relation oid */
57  bool schema_sent; /* did we send the schema? */
61 
62 /* Map used to remember which relation schemas we sent. */
63 static HTAB *RelationSyncCache = NULL;
64 
65 static void init_rel_sync_cache(MemoryContext decoding_context);
68 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
69  uint32 hashvalue);
70 
71 /*
72  * Specify output plugin callbacks
73  */
74 void
76 {
78 
86 }
87 
88 static void
90  List **publication_names)
91 {
92  ListCell *lc;
93  bool protocol_version_given = false;
94  bool publication_names_given = false;
95 
96  foreach(lc, options)
97  {
98  DefElem *defel = (DefElem *) lfirst(lc);
99 
100  Assert(defel->arg == NULL || IsA(defel->arg, String));
101 
102  /* Check each param, whether or not we recognize it */
103  if (strcmp(defel->defname, "proto_version") == 0)
104  {
105  int64 parsed;
106 
107  if (protocol_version_given)
108  ereport(ERROR,
109  (errcode(ERRCODE_SYNTAX_ERROR),
110  errmsg("conflicting or redundant options")));
111  protocol_version_given = true;
112 
113  if (!scanint8(strVal(defel->arg), true, &parsed))
114  ereport(ERROR,
115  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
116  errmsg("invalid proto_version")));
117 
118  if (parsed > PG_UINT32_MAX || parsed < 0)
119  ereport(ERROR,
120  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
121  errmsg("proto_version \"%s\" out of range",
122  strVal(defel->arg))));
123 
124  *protocol_version = (uint32) parsed;
125  }
126  else if (strcmp(defel->defname, "publication_names") == 0)
127  {
128  if (publication_names_given)
129  ereport(ERROR,
130  (errcode(ERRCODE_SYNTAX_ERROR),
131  errmsg("conflicting or redundant options")));
132  publication_names_given = true;
133 
134  if (!SplitIdentifierString(strVal(defel->arg), ',',
135  publication_names))
136  ereport(ERROR,
137  (errcode(ERRCODE_INVALID_NAME),
138  errmsg("invalid publication_names syntax")));
139  }
140  else
141  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
142  }
143 }
144 
145 /*
146  * Initialize this plugin
147  */
148 static void
150  bool is_init)
151 {
152  PGOutputData *data = palloc0(sizeof(PGOutputData));
153 
154  /* Create our memory context for private allocations. */
156  "logical replication output context",
158 
159  ctx->output_plugin_private = data;
160 
161  /* This plugin uses binary protocol. */
163 
164  /*
165  * This is replication start and not slot initialization.
166  *
167  * Parse and validate options passed by the client.
168  */
169  if (!is_init)
170  {
171  /* Parse the params and ERROR if we see any we don't recognize */
173  &data->protocol_version,
174  &data->publication_names);
175 
176  /* Check if we support requested protocol */
178  ereport(ERROR,
179  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
180  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
182 
184  ereport(ERROR,
185  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
186  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
188 
189  if (list_length(data->publication_names) < 1)
190  ereport(ERROR,
191  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
192  errmsg("publication_names parameter missing")));
193 
194  /* Init publication state. */
195  data->publications = NIL;
196  publications_valid = false;
199  (Datum) 0);
200 
201  /* Initialize relation schema cache. */
203  }
204 }
205 
206 /*
207  * BEGIN callback
208  */
209 static void
211 {
212  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
213 
214  OutputPluginPrepareWrite(ctx, !send_replication_origin);
215  logicalrep_write_begin(ctx->out, txn);
216 
217  if (send_replication_origin)
218  {
219  char *origin;
220 
221  /* Message boundary */
222  OutputPluginWrite(ctx, false);
223  OutputPluginPrepareWrite(ctx, true);
224 
225  /*----------
226  * XXX: which behaviour do we want here?
227  *
228  * Alternatives:
229  * - don't send origin message if origin name not found
230  * (that's what we do now)
231  * - throw error - that will break replication, not good
232  * - send some special "unknown" origin
233  *----------
234  */
235  if (replorigin_by_oid(txn->origin_id, true, &origin))
236  logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
237  }
238 
239  OutputPluginWrite(ctx, true);
240 }
241 
242 /*
243  * COMMIT callback
244  */
245 static void
247  XLogRecPtr commit_lsn)
248 {
250 
251  OutputPluginPrepareWrite(ctx, true);
252  logicalrep_write_commit(ctx->out, txn, commit_lsn);
253  OutputPluginWrite(ctx, true);
254 }
255 
256 /*
257  * Write the relation schema if the current schema hasn't been sent yet.
258  */
259 static void
261  Relation relation, RelationSyncEntry *relentry)
262 {
263  if (!relentry->schema_sent)
264  {
265  TupleDesc desc;
266  int i;
267 
268  desc = RelationGetDescr(relation);
269 
270  /*
271  * Write out type info if needed. We do that only for user-created
272  * types. We use FirstGenbkiObjectId as the cutoff, so that we only
273  * consider objects with hand-assigned OIDs to be "built in", not for
274  * instance any function or type defined in the information_schema.
275  * This is important because only hand-assigned OIDs can be expected
276  * to remain stable across major versions.
277  */
278  for (i = 0; i < desc->natts; i++)
279  {
280  Form_pg_attribute att = TupleDescAttr(desc, i);
281 
282  if (att->attisdropped || att->attgenerated)
283  continue;
284 
285  if (att->atttypid < FirstGenbkiObjectId)
286  continue;
287 
288  OutputPluginPrepareWrite(ctx, false);
289  logicalrep_write_typ(ctx->out, att->atttypid);
290  OutputPluginWrite(ctx, false);
291  }
292 
293  OutputPluginPrepareWrite(ctx, false);
294  logicalrep_write_rel(ctx->out, relation);
295  OutputPluginWrite(ctx, false);
296  relentry->schema_sent = true;
297  }
298 }
299 
300 /*
301  * Sends the decoded DML over wire.
302  */
303 static void
305  Relation relation, ReorderBufferChange *change)
306 {
308  MemoryContext old;
309  RelationSyncEntry *relentry;
310 
311  if (!is_publishable_relation(relation))
312  return;
313 
314  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
315 
316  /* First check the table filter */
317  switch (change->action)
318  {
320  if (!relentry->pubactions.pubinsert)
321  return;
322  break;
324  if (!relentry->pubactions.pubupdate)
325  return;
326  break;
328  if (!relentry->pubactions.pubdelete)
329  return;
330  break;
331  default:
332  Assert(false);
333  }
334 
335  /* Avoid leaking memory by using and resetting our own context */
336  old = MemoryContextSwitchTo(data->context);
337 
338  maybe_send_schema(ctx, relation, relentry);
339 
340  /* Send the data */
341  switch (change->action)
342  {
344  OutputPluginPrepareWrite(ctx, true);
345  logicalrep_write_insert(ctx->out, relation,
346  &change->data.tp.newtuple->tuple);
347  OutputPluginWrite(ctx, true);
348  break;
350  {
351  HeapTuple oldtuple = change->data.tp.oldtuple ?
352  &change->data.tp.oldtuple->tuple : NULL;
353 
354  OutputPluginPrepareWrite(ctx, true);
355  logicalrep_write_update(ctx->out, relation, oldtuple,
356  &change->data.tp.newtuple->tuple);
357  OutputPluginWrite(ctx, true);
358  break;
359  }
361  if (change->data.tp.oldtuple)
362  {
363  OutputPluginPrepareWrite(ctx, true);
364  logicalrep_write_delete(ctx->out, relation,
365  &change->data.tp.oldtuple->tuple);
366  OutputPluginWrite(ctx, true);
367  }
368  else
369  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
370  break;
371  default:
372  Assert(false);
373  }
374 
375  /* Cleanup */
378 }
379 
380 static void
382  int nrelations, Relation relations[], ReorderBufferChange *change)
383 {
385  MemoryContext old;
386  RelationSyncEntry *relentry;
387  int i;
388  int nrelids;
389  Oid *relids;
390 
391  old = MemoryContextSwitchTo(data->context);
392 
393  relids = palloc0(nrelations * sizeof(Oid));
394  nrelids = 0;
395 
396  for (i = 0; i < nrelations; i++)
397  {
398  Relation relation = relations[i];
399  Oid relid = RelationGetRelid(relation);
400 
401  if (!is_publishable_relation(relation))
402  continue;
403 
404  relentry = get_rel_sync_entry(data, relid);
405 
406  if (!relentry->pubactions.pubtruncate)
407  continue;
408 
409  relids[nrelids++] = relid;
410  maybe_send_schema(ctx, relation, relentry);
411  }
412 
413  if (nrelids > 0)
414  {
415  OutputPluginPrepareWrite(ctx, true);
417  nrelids,
418  relids,
419  change->data.truncate.cascade,
420  change->data.truncate.restart_seqs);
421  OutputPluginWrite(ctx, true);
422  }
423 
426 }
427 
428 /*
429  * Currently we always forward.
430  */
431 static bool
433  RepOriginId origin_id)
434 {
435  return false;
436 }
437 
438 /*
439  * Shutdown the output plugin.
440  *
441  * Note, we don't need to clean the data->context as it's child context
442  * of the ctx->context so it will be cleaned up by logical decoding machinery.
443  */
444 static void
446 {
447  if (RelationSyncCache)
448  {
449  hash_destroy(RelationSyncCache);
450  RelationSyncCache = NULL;
451  }
452 }
453 
454 /*
455  * Load publications from the list of publication names.
456  */
457 static List *
459 {
460  List *result = NIL;
461  ListCell *lc;
462 
463  foreach(lc, pubnames)
464  {
465  char *pubname = (char *) lfirst(lc);
466  Publication *pub = GetPublicationByName(pubname, false);
467 
468  result = lappend(result, pub);
469  }
470 
471  return result;
472 }
473 
474 /*
475  * Publication cache invalidation callback.
476  */
477 static void
478 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
479 {
480  publications_valid = false;
481 
482  /*
483  * Also invalidate per-relation cache so that next time the filtering info
484  * is checked it will be updated with the new publication settings.
485  */
486  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
487 }
488 
489 /*
490  * Initialize the relation schema sync cache for a decoding session.
491  *
492  * The hash table is destroyed at the end of a decoding session. While
493  * relcache invalidations still exist and will still be invoked, they
494  * will just see the null hash table global and take no action.
495  */
496 static void
498 {
499  HASHCTL ctl;
500  MemoryContext old_ctxt;
501 
502  if (RelationSyncCache != NULL)
503  return;
504 
505  /* Make a new hash table for the cache */
506  MemSet(&ctl, 0, sizeof(ctl));
507  ctl.keysize = sizeof(Oid);
508  ctl.entrysize = sizeof(RelationSyncEntry);
509  ctl.hcxt = cachectx;
510 
511  old_ctxt = MemoryContextSwitchTo(cachectx);
512  RelationSyncCache = hash_create("logical replication output relation cache",
513  128, &ctl,
515  (void) MemoryContextSwitchTo(old_ctxt);
516 
517  Assert(RelationSyncCache != NULL);
518 
522  (Datum) 0);
523 }
524 
525 /*
526  * Find or create entry in the relation schema cache.
527  */
528 static RelationSyncEntry *
530 {
531  RelationSyncEntry *entry;
532  bool found;
533  MemoryContext oldctx;
534 
535  Assert(RelationSyncCache != NULL);
536 
537  /* Find cached function info, creating if not found */
539  entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
540  (void *) &relid,
541  HASH_ENTER, &found);
542  MemoryContextSwitchTo(oldctx);
543  Assert(entry != NULL);
544 
545  /* Not found means schema wasn't sent */
546  if (!found || !entry->replicate_valid)
547  {
548  List *pubids = GetRelationPublications(relid);
549  ListCell *lc;
550 
551  /* Reload publications if needed before use. */
552  if (!publications_valid)
553  {
555  if (data->publications)
557 
559  MemoryContextSwitchTo(oldctx);
560  publications_valid = true;
561  }
562 
563  /*
564  * Build publication cache. We can't use one provided by relcache as
565  * relcache considers all publications given relation is in, but here
566  * we only need to consider ones that the subscriber requested.
567  */
568  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
569  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
570 
571  foreach(lc, data->publications)
572  {
573  Publication *pub = lfirst(lc);
574 
575  if (pub->alltables || list_member_oid(pubids, pub->oid))
576  {
577  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
578  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
579  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
581  }
582 
583  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
584  entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
585  break;
586  }
587 
588  list_free(pubids);
589 
590  entry->replicate_valid = true;
591  }
592 
593  if (!found)
594  entry->schema_sent = false;
595 
596  return entry;
597 }
598 
599 /*
600  * Relcache invalidation callback
601  */
602 static void
604 {
605  RelationSyncEntry *entry;
606 
607  /*
608  * We can get here if the plugin was used in SQL interface as the
609  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
610  * is no way to unregister the relcache invalidation callback.
611  */
612  if (RelationSyncCache == NULL)
613  return;
614 
615  /*
616  * Nobody keeps pointers to entries in this hash table around outside
617  * logical decoding callback calls - but invalidation events can come in
618  * *during* a callback if we access the relcache in the callback. Because
619  * of that we must mark the cache entry as invalid but not remove it from
620  * the hash while it could still be referenced, then prune it at a later
621  * safe point.
622  *
623  * Getting invalidations for relations that aren't in the table is
624  * entirely normal, since there's no way to unregister for an invalidation
625  * event. So we don't care if it's found or not.
626  */
627  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
628  HASH_FIND, NULL);
629 
630  /*
631  * Reset schema sent status as the relation definition may have changed.
632  */
633  if (entry != NULL)
634  entry->schema_sent = false;
635 }
636 
637 /*
638  * Publication relation map syscache invalidation callback
639  */
640 static void
642 {
644  RelationSyncEntry *entry;
645 
646  /*
647  * We can get here if the plugin was used in SQL interface as the
648  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
649  * is no way to unregister the relcache invalidation callback.
650  */
651  if (RelationSyncCache == NULL)
652  return;
653 
654  /*
655  * There is no way to find which entry in our cache the hash belongs to so
656  * mark the whole cache as invalid.
657  */
658  hash_seq_init(&status, RelationSyncCache);
659  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
660  entry->replicate_valid = false;
661 }
LogicalDecodeTruncateCB truncate_cb
#define NIL
Definition: pg_list.h:65
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
void logicalrep_write_typ(StringInfo out, Oid typoid)
Definition: proto.c:404
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:304
PublicationActions pubactions
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:814
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
static bool publications_valid
Definition: pgoutput.c:47
RepOriginId origin_id
bool replicate_valid
Definition: pgoutput.c:58
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:478
MemoryContext hcxt
Definition: hsearch.h:78
#define RelationGetDescr(relation)
Definition: rel.h:448
struct ReorderBufferChange::@101::@102 tp
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:75
MemoryContext context
Definition: pgoutput.h:20
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:603
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:445
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:246
static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:260
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
uint16 RepOriginId
Definition: xlogdefs.h:58
Size entrysize
Definition: hsearch.h:73
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:608
void * output_plugin_private
Definition: logical.h:75
#define MemSet(start, val, len)
Definition: c.h:962
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:75
MemoryContext context
Definition: logical.h:35
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
List * output_plugin_options
Definition: logical.h:58
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:113
#define PG_UINT32_MAX
Definition: c.h:443
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:431
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:641
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void list_free_deep(List *list)
Definition: list.c:1391
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
XLogRecPtr origin_lsn
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1468
OutputPluginOutputType output_type
Definition: output_plugin.h:28
Definition: dynahash.c:208
List * GetRelationPublications(Oid relid)
#define ERROR
Definition: elog.h:43
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:27
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3652
bool is_publishable_relation(Relation rel)
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:149
List * publication_names
Definition: pgoutput.h:26
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
Definition: proto.c:255
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
unsigned int uint32
Definition: c.h:359
void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:302
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
Definition: proto.c:142
#define ereport(elevel, rest)
Definition: elog.h:141
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:529
Node * arg
Definition: parsenodes.h:731
void logicalrep_write_rel(StringInfo out, Relation rel)
Definition: proto.c:354
List * lappend(List *list, void *datum)
Definition: list.c:322
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:46
static HTAB * RelationSyncCache
Definition: pgoutput.c:63
#define HASH_BLOBS
Definition: hsearch.h:88
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:549
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
void * palloc0(Size size)
Definition: mcxt.c:980
LogicalDecodeChangeCB change_cb
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
uintptr_t Datum
Definition: postgres.h:367
Size keysize
Definition: hsearch.h:72
PG_MODULE_MAGIC
Definition: pgoutput.c:27
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:523
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
Definition: proto.c:185
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:675
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
union ReorderBufferChange::@101 data
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:432
static int list_length(const List *l)
Definition: pg_list.h:169
LogicalDecodeShutdownCB shutdown_cb
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:458
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names)
Definition: pgoutput.c:89
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
LogicalDecodeStartupCB startup_cb
#define InvalidRepOriginId
Definition: origin.h:33
List * publications
Definition: pgoutput.h:27
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:497
#define FirstGenbkiObjectId
Definition: transam.h:139
int errmsg(const char *fmt,...)
Definition: elog.c:822
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:536
void list_free(List *list)
Definition: list.c:1377
#define elog(elevel,...)
Definition: elog.h:228
StringInfo out
Definition: logical.h:70
int i
void * arg
LogicalDecodeBeginCB begin_cb
char * defname
Definition: parsenodes.h:730
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:381
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:226
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:210
LogicalDecodeFilterByOriginCB filter_by_origin_cb
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:422
struct ReorderBufferChange::@101::@103 truncate
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
PublicationActions pubactions
Definition: pgoutput.c:59
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:56
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:888
uint32 protocol_version
Definition: pgoutput.h:24