PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
pgoutput.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pgoutput.c
4  * Logical Replication output plugin
5  *
6  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/pgoutput/pgoutput.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "catalog/pg_publication.h"
16 
17 #include "replication/logical.h"
19 #include "replication/origin.h"
20 #include "replication/pgoutput.h"
21 
22 #include "utils/inval.h"
23 #include "utils/int8.h"
24 #include "utils/lsyscache.h"
25 #include "utils/memutils.h"
26 #include "utils/syscache.h"
27 #include "utils/varlena.h"
28 
30 
32 
34  OutputPluginOptions *opt, bool is_init);
37  ReorderBufferTXN *txn);
39  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
41  ReorderBufferTXN *txn, Relation rel,
42  ReorderBufferChange *change);
44  RepOriginId origin_id);
45 
46 static bool publications_valid;
47 
48 static List *LoadPublications(List *pubnames);
49 static void publication_invalidation_cb(Datum arg, int cacheid,
50  uint32 hashvalue);
51 
52 /* Entry in the map used to remember which relation schemas we sent. */
53 typedef struct RelationSyncEntry
54 {
55  Oid relid; /* relation oid */
56  bool schema_sent; /* did we send the schema? */
60 
61 /* Map used to remember which relation schemas we sent. */
62 static HTAB *RelationSyncCache = NULL;
63 
64 static void init_rel_sync_cache(MemoryContext decoding_context);
66 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
67 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
68  uint32 hashvalue);
69 
70 /*
71  * Specify output plugin callbacks
72  */
73 void
75 {
77 
84 }
85 
86 static void
88  List **publication_names)
89 {
90  ListCell *lc;
91  bool protocol_version_given = false;
92  bool publication_names_given = false;
93 
94  foreach(lc, options)
95  {
96  DefElem *defel = (DefElem *) lfirst(lc);
97 
98  Assert(defel->arg == NULL || IsA(defel->arg, String));
99 
100  /* Check each param, whether or not we recognize it */
101  if (strcmp(defel->defname, "proto_version") == 0)
102  {
103  int64 parsed;
104 
105  if (protocol_version_given)
106  ereport(ERROR,
107  (errcode(ERRCODE_SYNTAX_ERROR),
108  errmsg("conflicting or redundant options")));
109  protocol_version_given = true;
110 
111  if (!scanint8(strVal(defel->arg), true, &parsed))
112  ereport(ERROR,
113  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
114  errmsg("invalid proto_version")));
115 
116  if (parsed > PG_UINT32_MAX || parsed < 0)
117  ereport(ERROR,
118  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
119  errmsg("proto_version \"%s\" out of range",
120  strVal(defel->arg))));
121 
122  *protocol_version = (uint32) parsed;
123  }
124  else if (strcmp(defel->defname, "publication_names") == 0)
125  {
126  if (publication_names_given)
127  ereport(ERROR,
128  (errcode(ERRCODE_SYNTAX_ERROR),
129  errmsg("conflicting or redundant options")));
130  publication_names_given = true;
131 
132  if (!SplitIdentifierString(strVal(defel->arg), ',',
133  publication_names))
134  ereport(ERROR,
135  (errcode(ERRCODE_INVALID_NAME),
136  errmsg("invalid publication_names syntax")));
137  }
138  else
139  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
140  }
141 }
142 
143 /*
144  * Initialize this plugin
145  */
146 static void
148  bool is_init)
149 {
150  PGOutputData *data = palloc0(sizeof(PGOutputData));
151 
152  /* Create our memory context for private allocations. */
154  "logical replication output context",
158 
159  ctx->output_plugin_private = data;
160 
161  /* This plugin uses binary protocol. */
163 
164  /*
165  * This is replication start and not slot initialization.
166  *
167  * Parse and validate options passed by the client.
168  */
169  if (!is_init)
170  {
171  /* Parse the params and ERROR if we see any we don't recognize */
173  &data->protocol_version,
174  &data->publication_names);
175 
176  /* Check if we support requested protocol */
178  ereport(ERROR,
179  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
180  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
182 
184  ereport(ERROR,
185  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
186  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
188 
189  if (list_length(data->publication_names) < 1)
190  ereport(ERROR,
191  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
192  errmsg("publication_names parameter missing")));
193 
194  /* Init publication state. */
195  data->publications = NIL;
196  publications_valid = false;
199  (Datum) 0);
200 
201  /* Initialize relation schema cache. */
203  }
204 }
205 
206 /*
207  * BEGIN callback
208  */
209 static void
211 {
212  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
213 
214  OutputPluginPrepareWrite(ctx, !send_replication_origin);
215  logicalrep_write_begin(ctx->out, txn);
216 
217  if (send_replication_origin)
218  {
219  char *origin;
220 
221  /* Message boundary */
222  OutputPluginWrite(ctx, false);
223  OutputPluginPrepareWrite(ctx, true);
224 
225  /*----------
226  * XXX: which behaviour do we want here?
227  *
228  * Alternatives:
229  * - don't send origin message if origin name not found
230  * (that's what we do now)
231  * - throw error - that will break replication, not good
232  * - send some special "unknown" origin
233  *----------
234  */
235  if (replorigin_by_oid(txn->origin_id, true, &origin))
236  logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
237  }
238 
239  OutputPluginWrite(ctx, true);
240 }
241 
242 /*
243  * COMMIT callback
244  */
245 static void
247  XLogRecPtr commit_lsn)
248 {
250 
251  OutputPluginPrepareWrite(ctx, true);
252  logicalrep_write_commit(ctx->out, txn, commit_lsn);
253  OutputPluginWrite(ctx, true);
254 }
255 
256 /*
257  * Sends the decoded DML over wire.
258  */
259 static void
261  Relation relation, ReorderBufferChange *change)
262 {
264  MemoryContext old;
265  RelationSyncEntry *relentry;
266 
267  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
268 
269  /* First check the table filter */
270  switch (change->action)
271  {
273  if (!relentry->pubactions.pubinsert)
274  return;
275  break;
277  if (!relentry->pubactions.pubupdate)
278  return;
279  break;
281  if (!relentry->pubactions.pubdelete)
282  return;
283  break;
284  default:
285  Assert(false);
286  }
287 
288  /* Avoid leaking memory by using and resetting our own context */
289  old = MemoryContextSwitchTo(data->context);
290 
291  /*
292  * Write the relation schema if the current schema haven't been sent yet.
293  */
294  if (!relentry->schema_sent)
295  {
296  TupleDesc desc;
297  int i;
298 
299  desc = RelationGetDescr(relation);
300 
301  /*
302  * Write out type info if needed. We do that only for user created
303  * types.
304  */
305  for (i = 0; i < desc->natts; i++)
306  {
307  Form_pg_attribute att = TupleDescAttr(desc, i);
308 
309  if (att->attisdropped)
310  continue;
311 
312  if (att->atttypid < FirstNormalObjectId)
313  continue;
314 
315  OutputPluginPrepareWrite(ctx, false);
316  logicalrep_write_typ(ctx->out, att->atttypid);
317  OutputPluginWrite(ctx, false);
318  }
319 
320  OutputPluginPrepareWrite(ctx, false);
321  logicalrep_write_rel(ctx->out, relation);
322  OutputPluginWrite(ctx, false);
323  relentry->schema_sent = true;
324  }
325 
326  /* Send the data */
327  switch (change->action)
328  {
330  OutputPluginPrepareWrite(ctx, true);
331  logicalrep_write_insert(ctx->out, relation,
332  &change->data.tp.newtuple->tuple);
333  OutputPluginWrite(ctx, true);
334  break;
336  {
337  HeapTuple oldtuple = change->data.tp.oldtuple ?
338  &change->data.tp.oldtuple->tuple : NULL;
339 
340  OutputPluginPrepareWrite(ctx, true);
341  logicalrep_write_update(ctx->out, relation, oldtuple,
342  &change->data.tp.newtuple->tuple);
343  OutputPluginWrite(ctx, true);
344  break;
345  }
347  if (change->data.tp.oldtuple)
348  {
349  OutputPluginPrepareWrite(ctx, true);
350  logicalrep_write_delete(ctx->out, relation,
351  &change->data.tp.oldtuple->tuple);
352  OutputPluginWrite(ctx, true);
353  }
354  else
355  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
356  break;
357  default:
358  Assert(false);
359  }
360 
361  /* Cleanup */
364 }
365 
366 /*
367  * Currently we always forward.
368  */
369 static bool
371  RepOriginId origin_id)
372 {
373  return false;
374 }
375 
376 /*
377  * Shutdown the output plugin.
378  *
379  * Note, we don't need to clean the data->context as it's child context
380  * of the ctx->context so it will be cleaned up by logical decoding machinery.
381  */
382 static void
384 {
385  if (RelationSyncCache)
386  {
387  hash_destroy(RelationSyncCache);
388  RelationSyncCache = NULL;
389  }
390 }
391 
392 /*
393  * Load publications from the list of publication names.
394  */
395 static List *
397 {
398  List *result = NIL;
399  ListCell *lc;
400 
401  foreach(lc, pubnames)
402  {
403  char *pubname = (char *) lfirst(lc);
404  Publication *pub = GetPublicationByName(pubname, false);
405 
406  result = lappend(result, pub);
407  }
408 
409  return result;
410 }
411 
412 /*
413  * Publication cache invalidation callback.
414  */
415 static void
416 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
417 {
418  publications_valid = false;
419 
420  /*
421  * Also invalidate per-relation cache so that next time the filtering info
422  * is checked it will be updated with the new publication settings.
423  */
424  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
425 }
426 
427 /*
428  * Initialize the relation schema sync cache for a decoding session.
429  *
430  * The hash table is destroyed at the end of a decoding session. While
431  * relcache invalidations still exist and will still be invoked, they
432  * will just see the null hash table global and take no action.
433  */
434 static void
436 {
437  HASHCTL ctl;
438  MemoryContext old_ctxt;
439 
440  if (RelationSyncCache != NULL)
441  return;
442 
443  /* Make a new hash table for the cache */
444  MemSet(&ctl, 0, sizeof(ctl));
445  ctl.keysize = sizeof(Oid);
446  ctl.entrysize = sizeof(RelationSyncEntry);
447  ctl.hcxt = cachectx;
448 
449  old_ctxt = MemoryContextSwitchTo(cachectx);
450  RelationSyncCache = hash_create("logical replication output relation cache",
451  128, &ctl,
453  (void) MemoryContextSwitchTo(old_ctxt);
454 
455  Assert(RelationSyncCache != NULL);
456 
460  (Datum) 0);
461 }
462 
463 /*
464  * Find or create entry in the relation schema cache.
465  */
466 static RelationSyncEntry *
468 {
469  RelationSyncEntry *entry;
470  bool found;
471  MemoryContext oldctx;
472 
473  Assert(RelationSyncCache != NULL);
474 
475  /* Find cached function info, creating if not found */
477  entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
478  (void *) &relid,
479  HASH_ENTER, &found);
480  MemoryContextSwitchTo(oldctx);
481  Assert(entry != NULL);
482 
483  /* Not found means schema wasn't sent */
484  if (!found || !entry->replicate_valid)
485  {
486  List *pubids = GetRelationPublications(relid);
487  ListCell *lc;
488 
489  /* Reload publications if needed before use. */
490  if (!publications_valid)
491  {
493  if (data->publications)
495 
497  MemoryContextSwitchTo(oldctx);
498  publications_valid = true;
499  }
500 
501  /*
502  * Build publication cache. We can't use one provided by relcache as
503  * relcache considers all publications given relation is in, but here
504  * we only need to consider ones that the subscriber requested.
505  */
506  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
507  entry->pubactions.pubdelete = false;
508 
509  foreach(lc, data->publications)
510  {
511  Publication *pub = lfirst(lc);
512 
513  /*
514  * Skip tables that look like they are from a heap rewrite (see
515  * make_new_heap()). We need to skip them because the subscriber
516  * won't have a table by that name to receive the data. That
517  * means we won't ship the new data in, say, an added column with
518  * a DEFAULT, but if the user applies the same DDL manually on the
519  * subscriber, then this will work out for them.
520  *
521  * We only need to consider the alltables case, because such a
522  * transient heap won't be an explicit member of a publication.
523  */
524  if (pub->alltables)
525  {
526  char *relname = get_rel_name(relid);
527  unsigned int u;
528  int n;
529 
530  if (sscanf(relname, "pg_temp_%u%n", &u, &n) == 1 &&
531  relname[n] == '\0')
532  {
534  break;
535  }
536  }
537 
538  if (pub->alltables || list_member_oid(pubids, pub->oid))
539  {
540  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
541  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
542  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
543  }
544 
545  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
546  entry->pubactions.pubdelete)
547  break;
548  }
549 
550  list_free(pubids);
551 
552  entry->replicate_valid = true;
553  }
554 
555  if (!found)
556  entry->schema_sent = false;
557 
558  return entry;
559 }
560 
561 /*
562  * Relcache invalidation callback
563  */
564 static void
566 {
567  RelationSyncEntry *entry;
568 
569  /*
570  * We can get here if the plugin was used in SQL interface as the
571  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
572  * is no way to unregister the relcache invalidation callback.
573  */
574  if (RelationSyncCache == NULL)
575  return;
576 
577  /*
578  * Nobody keeps pointers to entries in this hash table around outside
579  * logical decoding callback calls - but invalidation events can come in
580  * *during* a callback if we access the relcache in the callback. Because
581  * of that we must mark the cache entry as invalid but not remove it from
582  * the hash while it could still be referenced, then prune it at a later
583  * safe point.
584  *
585  * Getting invalidations for relations that aren't in the table is
586  * entirely normal, since there's no way to unregister for an invalidation
587  * event. So we don't care if it's found or not.
588  */
589  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
590  HASH_FIND, NULL);
591 
592  /*
593  * Reset schema sent status as the relation definition may have changed.
594  */
595  if (entry != NULL)
596  entry->schema_sent = false;
597 }
598 
599 /*
600  * Publication relation map syscache invalidation callback
601  */
602 static void
604 {
606  RelationSyncEntry *entry;
607 
608  /*
609  * We can get here if the plugin was used in SQL interface as the
610  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
611  * is no way to unregister the relcache invalidation callback.
612  */
613  if (RelationSyncCache == NULL)
614  return;
615 
616  /*
617  * There is no way to find which entry in our cache the hash belongs to so
618  * mark the whole cache as invalid.
619  */
620  hash_seq_init(&status, RelationSyncCache);
621  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
622  entry->replicate_valid = false;
623 }
#define NIL
Definition: pg_list.h:69
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
void logicalrep_write_typ(StringInfo out, Oid typoid)
Definition: proto.c:349
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:260
PublicationActions pubactions
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:810
#define IsA(nodeptr, _type_)
Definition: nodes.h:561
#define DEBUG1
Definition: elog.h:25
static bool publications_valid
Definition: pgoutput.c:46
RepOriginId origin_id
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:35
bool replicate_valid
Definition: pgoutput.c:57
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:416
MemoryContext hcxt
Definition: hsearch.h:78
#define RelationGetDescr(relation)
Definition: rel.h:428
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:74
MemoryContext context
Definition: pgoutput.h:20
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:84
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:565
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1801
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:383
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:246
union ReorderBufferChange::@93 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
uint16 RepOriginId
Definition: xlogdefs.h:51
Size entrysize
Definition: hsearch.h:73
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
void * output_plugin_private
Definition: logical.h:71
#define MemSet(start, val, len)
Definition: c.h:863
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:72
MemoryContext context
Definition: logical.h:38
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
List * output_plugin_options
Definition: logical.h:54
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:110
#define PG_UINT32_MAX
Definition: c.h:326
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:902
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:417
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:603
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void list_free_deep(List *list)
Definition: list.c:1147
int natts
Definition: tupdesc.h:73
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
XLogRecPtr origin_lsn
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:162
#define FirstNormalObjectId
Definition: transam.h:94
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1431
OutputPluginOutputType output_type
Definition: output_plugin.h:28
Definition: dynahash.c:208
List * GetRelationPublications(Oid relid)
#define ERROR
Definition: elog.h:43
LogicalDecodeCommitCB commit_cb
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:27
struct ReorderBufferChange::@93::@94 tp
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3264
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:147
List * publication_names
Definition: pgoutput.h:26
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
Definition: proto.c:252
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
unsigned int uint32
Definition: c.h:258
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
Definition: proto.c:139
#define ereport(elevel, rest)
Definition: elog.h:122
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:467
Node * arg
Definition: parsenodes.h:720
void logicalrep_write_rel(StringInfo out, Relation rel)
Definition: proto.c:299
List * lappend(List *list, void *datum)
Definition: list.c:128
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:43
static HTAB * RelationSyncCache
Definition: pgoutput.c:62
#define HASH_BLOBS
Definition: hsearch.h:88
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:516
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1389
void * palloc0(Size size)
Definition: mcxt.c:877
LogicalDecodeChangeCB change_cb
Definition: output_plugin.h:99
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
uintptr_t Datum
Definition: postgres.h:372
Size keysize
Definition: hsearch.h:72
PG_MODULE_MAGIC
Definition: pgoutput.c:29
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:490
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
Definition: proto.c:182
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:505
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:681
#define lfirst(lc)
Definition: pg_list.h:106
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:370
static int list_length(const List *l)
Definition: pg_list.h:89
LogicalDecodeShutdownCB shutdown_cb
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:396
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1385
static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names)
Definition: pgoutput.c:87
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1375
LogicalDecodeStartupCB startup_cb
Definition: output_plugin.h:97
#define InvalidRepOriginId
Definition: origin.h:34
List * publications
Definition: pgoutput.h:27
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:435
int errmsg(const char *fmt,...)
Definition: elog.c:797
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
void list_free(List *list)
Definition: list.c:1133
StringInfo out
Definition: logical.h:66
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:163
int i
void * arg
LogicalDecodeBeginCB begin_cb
Definition: output_plugin.h:98
char * defname
Definition: parsenodes.h:719
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:164
#define elog
Definition: elog.h:219
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225
#define RELKIND_RELATION
Definition: pg_class.h:160
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:210
LogicalDecodeFilterByOriginCB filter_by_origin_cb
Definition: pg_list.h:45
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1726
#define RelationGetRelid(relation)
Definition: rel.h:416
MemoryContext CacheMemoryContext
Definition: mcxt.c:46
PublicationActions pubactions
Definition: pgoutput.c:58
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:55
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:789
uint32 protocol_version
Definition: pgoutput.h:24