PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
pgoutput.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pgoutput.c
4  * Logical Replication output plugin
5  *
6  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/pgoutput/pgoutput.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "catalog/pg_publication.h"
16 
17 #include "replication/logical.h"
19 #include "replication/origin.h"
20 #include "replication/pgoutput.h"
21 
22 #include "utils/inval.h"
23 #include "utils/int8.h"
24 #include "utils/memutils.h"
25 #include "utils/syscache.h"
26 #include "utils/varlena.h"
27 
29 
31 
33  OutputPluginOptions *opt, bool is_init);
36  ReorderBufferTXN *txn);
38  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
40  ReorderBufferTXN *txn, Relation rel,
41  ReorderBufferChange *change);
43  RepOriginId origin_id);
44 
45 static bool publications_valid;
46 
47 static List *LoadPublications(List *pubnames);
48 static void publication_invalidation_cb(Datum arg, int cacheid,
49  uint32 hashvalue);
50 
51 /* Entry in the map used to remember which relation schemas we sent. */
52 typedef struct RelationSyncEntry
53 {
54  Oid relid; /* relation oid */
55  bool schema_sent; /* did we send the schema? */
59 
60 /* Map used to remember which relation schemas we sent. */
62 
63 static void init_rel_sync_cache(MemoryContext decoding_context);
65 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
66 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
67  uint32 hashvalue);
68 
69 /*
70  * Specify output plugin callbacks
71  */
/*
 * NOTE(review): only the return type and the braces of this function survive
 * in this listing; embedded source lines 73, 75 and 77-82 are absent.  The
 * comment above says the function specifies the output plugin callbacks, but
 * the individual assignments cannot be documented from what is visible here.
 */
72 void
74 {
76 
83 }
84 
85 static void
87  List **publication_names)
88 {
89  ListCell *lc;
90  bool protocol_version_given = false;
91  bool publication_names_given = false;
92 
93  foreach(lc, options)
94  {
95  DefElem *defel = (DefElem *) lfirst(lc);
96 
97  Assert(defel->arg == NULL || IsA(defel->arg, String));
98 
99  /* Check each param, whether or not we recognize it */
100  if (strcmp(defel->defname, "proto_version") == 0)
101  {
102  int64 parsed;
103 
104  if (protocol_version_given)
105  ereport(ERROR,
106  (errcode(ERRCODE_SYNTAX_ERROR),
107  errmsg("conflicting or redundant options")));
108  protocol_version_given = true;
109 
110  if (!scanint8(strVal(defel->arg), true, &parsed))
111  ereport(ERROR,
112  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
113  errmsg("invalid proto_version")));
114 
115  if (parsed > PG_UINT32_MAX || parsed < 0)
116  ereport(ERROR,
117  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
118  errmsg("proto_verson \"%s\" out of range",
119  strVal(defel->arg))));
120 
121  *protocol_version = (uint32) parsed;
122  }
123  else if (strcmp(defel->defname, "publication_names") == 0)
124  {
125  if (publication_names_given)
126  ereport(ERROR,
127  (errcode(ERRCODE_SYNTAX_ERROR),
128  errmsg("conflicting or redundant options")));
129  publication_names_given = true;
130 
131  if (!SplitIdentifierString(strVal(defel->arg), ',',
132  publication_names))
133  ereport(ERROR,
134  (errcode(ERRCODE_INVALID_NAME),
135  errmsg("invalid publication_names syntax")));
136  }
137  else
138  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
139  }
140 }
141 
142 /*
143  * Initialize this plugin
144  */
/*
 * Allocates the plugin's zeroed PGOutputData, stores it in
 * ctx->output_plugin_private and, when is_init is false, validates the
 * client options (protocol version bounds, at least one publication name)
 * and resets the publication state.
 *
 * NOTE(review): this listing has lost embedded source lines 146, 152,
 * 154-156, 161, 171, 176, 180, 182, 186, 196-197 and 201 (the function name
 * line, several call heads and their trailing arguments), so the block is not
 * compilable as shown and those statements are not documented here.
 */
145 static void
147  bool is_init)
148 {
149  PGOutputData *data = palloc0(sizeof(PGOutputData));
150 
151  /* Create our memory context for private allocations. */
153  "logical replication output context",
157 
158  ctx->output_plugin_private = data;
159 
160  /* This plugin uses binary protocol. */
162 
163  /*
164  * This is replication start and not slot initialization.
165  *
166  * Parse and validate options passed by the client.
167  */
168  if (!is_init)
169  {
170  /* Parse the params and ERROR if we see any we don't recognize */
172  &data->protocol_version,
173  &data->publication_names);
174 
175  /* Check if we support requested protocol */
/*
 * NOTE(review): both messages below format the versions with %d, while
 * protocol_version is declared uint32; %u looks like the matching
 * conversion -- confirm against the missing argument lines.
 */
177  ereport(ERROR,
178  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
179  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
181 
183  ereport(ERROR,
184  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
185  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
187 
/* An empty (or never supplied) publication list is rejected. */
188  if (list_length(data->publication_names) < 1)
189  ereport(ERROR,
190  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191  errmsg("publication_names parameter missing")));
192 
193  /* Init publication state. */
194  data->publications = NIL;
195  publications_valid = false;
198  (Datum) 0);
199 
200  /* Initialize relation schema cache. */
202  }
203 }
204 
205 /*
206  * BEGIN callback
207  */
208 static void
210 {
211  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
212 
213  OutputPluginPrepareWrite(ctx, !send_replication_origin);
214  logicalrep_write_begin(ctx->out, txn);
215 
216  if (send_replication_origin)
217  {
218  char *origin;
219 
220  /* Message boundary */
221  OutputPluginWrite(ctx, false);
222  OutputPluginPrepareWrite(ctx, true);
223 
224  /*----------
225  * XXX: which behaviour do we want here?
226  *
227  * Alternatives:
228  * - don't send origin message if origin name not found
229  * (that's what we do now)
230  * - throw error - that will break replication, not good
231  * - send some special "unknown" origin
232  *----------
233  */
234  if (replorigin_by_oid(txn->origin_id, true, &origin))
235  logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
236  }
237 
238  OutputPluginWrite(ctx, true);
239 }
240 
241 /*
242  * COMMIT callback
243  */
/*
 * Writes a single COMMIT message for txn, carrying commit_lsn, as the last
 * write of the transaction.
 *
 * NOTE(review): embedded source lines 245 (function name line) and 248 (the
 * first statement of the body) are missing from this listing.
 */
244 static void
246  XLogRecPtr commit_lsn)
247 {
249 
250  OutputPluginPrepareWrite(ctx, true);
251  logicalrep_write_commit(ctx->out, txn, commit_lsn);
252  OutputPluginWrite(ctx, true);
253 }
254 
255 /*
256  * Sends the decoded DML over wire.
257  */
/*
 * Per-change callback.  Looks up the relation's sync entry, returns without
 * output when the change's action is not published for the relation, sends
 * type and relation schema messages if the schema has not been sent yet, and
 * finally writes the INSERT, UPDATE or DELETE message.
 *
 * NOTE(review): this listing has lost embedded source lines 259 (function
 * name line), 262, 271/275/279 and 328/334/345 (the case labels of both
 * switches) and 361-362 (the statements under "Cleanup").  `data` is used
 * below but its declaration is not visible -- presumably on line 262.
 */
258 static void
260  Relation relation, ReorderBufferChange *change)
261 {
263  MemoryContext old;
264  RelationSyncEntry *relentry;
265 
266  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
267 
268  /* First check the table filter */
/* These early returns happen before the memory context switch below. */
269  switch (change->action)
270  {
272  if (!relentry->pubactions.pubinsert)
273  return;
274  break;
276  if (!relentry->pubactions.pubupdate)
277  return;
278  break;
280  if (!relentry->pubactions.pubdelete)
281  return;
282  break;
283  default:
284  Assert(false);
285  }
286 
287  /* Avoid leaking memory by using and resetting our own context */
288  old = MemoryContextSwitchTo(data->context);
289 
290  /*
291  * Write the relation schema if the current schema haven't been sent yet.
292  */
293  if (!relentry->schema_sent)
294  {
295  TupleDesc desc;
296  int i;
297 
298  desc = RelationGetDescr(relation);
299 
300  /*
301  * Write out type info if needed. We do that only for user created
302  * types.
303  */
304  for (i = 0; i < desc->natts; i++)
305  {
306  Form_pg_attribute att = desc->attrs[i];
307 
308  if (att->attisdropped)
309  continue;
310 
/* Type OIDs below FirstNormalObjectId are skipped (not user created). */
311  if (att->atttypid < FirstNormalObjectId)
312  continue;
313 
314  OutputPluginPrepareWrite(ctx, false);
315  logicalrep_write_typ(ctx->out, att->atttypid);
316  OutputPluginWrite(ctx, false);
317  }
318 
319  OutputPluginPrepareWrite(ctx, false);
320  logicalrep_write_rel(ctx->out, relation);
321  OutputPluginWrite(ctx, false);
/* Remember it so later changes to this relation skip the schema messages. */
322  relentry->schema_sent = true;
323  }
324 
325  /* Send the data */
326  switch (change->action)
327  {
329  OutputPluginPrepareWrite(ctx, true);
330  logicalrep_write_insert(ctx->out, relation,
331  &change->data.tp.newtuple->tuple);
332  OutputPluginWrite(ctx, true);
333  break;
335  {
/* The old tuple is optional for this action; NULL is passed when absent. */
336  HeapTuple oldtuple = change->data.tp.oldtuple ?
337  &change->data.tp.oldtuple->tuple : NULL;
338 
339  OutputPluginPrepareWrite(ctx, true);
340  logicalrep_write_update(ctx->out, relation, oldtuple,
341  &change->data.tp.newtuple->tuple);
342  OutputPluginWrite(ctx, true);
343  break;
344  }
/* Without an old tuple nothing is written; the skip is logged at DEBUG1. */
346  if (change->data.tp.oldtuple)
347  {
348  OutputPluginPrepareWrite(ctx, true);
349  logicalrep_write_delete(ctx->out, relation,
350  &change->data.tp.oldtuple->tuple);
351  OutputPluginWrite(ctx, true);
352  }
353  else
354  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
355  break;
356  default:
357  Assert(false);
358  }
359 
360  /* Cleanup */
363 }
364 
365 /*
366  * Currently we always forward.
367  */
368 static bool
370  RepOriginId origin_id)
371 {
372  return false;
373 }
374 
375 /*
376  * Shutdown the output plugin.
377  *
378  * Note, we don't need to clean the data->context as it's child context
379  * of the ctx->context so it will be cleaned up by logical decoding machinery.
380  */
381 static void
383 {
384  if (RelationSyncCache)
385  {
386  hash_destroy(RelationSyncCache);
387  RelationSyncCache = NULL;
388  }
389 }
390 
391 /*
392  * Load publications from the list of publication names.
393  */
394 static List *
396 {
397  List *result = NIL;
398  ListCell *lc;
399 
400  foreach(lc, pubnames)
401  {
402  char *pubname = (char *) lfirst(lc);
403  Publication *pub = GetPublicationByName(pubname, false);
404 
405  result = lappend(result, pub);
406  }
407 
408  return result;
409 }
410 
411 /*
412  * Publication cache invalidation callback.
413  */
414 static void
415 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
416 {
417  publications_valid = false;
418 
419  /*
420  * Also invalidate per-relation cache so that next time the filtering info
421  * is checked it will be updated with the new publication settings.
422  */
423  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
424 }
425 
426 /*
427  * Initialize the relation schema sync cache for a decoding session.
428  *
429  * The hash table is destroyed at the end of a decoding session. While
430  * relcache invalidations still exist and will still be invoked, they
431  * will just see the null hash table global and take no action.
432  */
/*
 * Creates the global RelationSyncCache hash table, keyed by Oid with
 * RelationSyncEntry-sized entries, using cachectx as the table's memory
 * context.  Does nothing if the table already exists.
 *
 * NOTE(review): embedded source lines 434 (function name line), 451 (the
 * final argument of hash_create) and 455-458 (the head of the trailing call
 * ending in "(Datum) 0)") are missing from this listing.
 */
433 static void
435 {
436  HASHCTL ctl;
437  MemoryContext old_ctxt;
438 
/* A table already exists: keep it. */
439  if (RelationSyncCache != NULL)
440  return;
441 
442  /* Make a new hash table for the cache */
443  MemSet(&ctl, 0, sizeof(ctl));
444  ctl.keysize = sizeof(Oid);
445  ctl.entrysize = sizeof(RelationSyncEntry);
446  ctl.hcxt = cachectx;
447 
/* Create the table with cachectx current, then restore the old context. */
448  old_ctxt = MemoryContextSwitchTo(cachectx);
449  RelationSyncCache = hash_create("logical replication output relation cache",
450  128, &ctl,
452  (void) MemoryContextSwitchTo(old_ctxt);
453 
454  Assert(RelationSyncCache != NULL);
455 
459  (Datum) 0);
460 }
461 
462 /*
463  * Find or create entry in the relation schema cache.
464  */
/*
 * Returns the RelationSyncCache entry for relid, creating it if needed.
 * When the entry is new or its replicate_valid flag is clear, pubactions is
 * rebuilt from data->publications and replicate_valid is set.  A newly
 * created entry also gets schema_sent = false.
 *
 * NOTE(review): embedded source lines 466 (function name line), 475, 491,
 * 493 and 495 are missing from this listing.  oldctx is read on lines 479
 * and 496 but the assignments that set it are not visible; they are
 * presumably on the missing lines 475 and 491 -- confirm.
 */
465 static RelationSyncEntry *
467 {
468  RelationSyncEntry *entry;
469  bool found;
470  MemoryContext oldctx;
471 
472  Assert(RelationSyncCache != NULL);
473 
474  /* Find cached function info, creating if not found */
476  entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
477  (void *) &relid,
478  HASH_ENTER, &found);
479  MemoryContextSwitchTo(oldctx);
480  Assert(entry != NULL);
481 
482  /* Not found means schema wasn't sent */
/* A new entry is always rebuilt; replicate_valid is only read when found. */
483  if (!found || !entry->replicate_valid)
484  {
485  List *pubids = GetRelationPublications(relid);
486  ListCell *lc;
487 
488  /* Reload publications if needed before use. */
489  if (!publications_valid)
490  {
492  if (data->publications)
494 
496  MemoryContextSwitchTo(oldctx);
497  publications_valid = true;
498  }
499 
500  /*
501  * Build publication cache. We can't use one provided by relcache as
502  * relcache considers all publications given relation is in, but here
503  * we only need to consider ones that the subscriber requested.
504  */
505  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
506  entry->pubactions.pubdelete = false;
507 
/* OR together the actions of every loaded publication covering relid. */
508  foreach(lc, data->publications)
509  {
510  Publication *pub = lfirst(lc);
511 
512  if (pub->alltables || list_member_oid(pubids, pub->oid))
513  {
514  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
515  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
516  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
517  }
518 
/* Once all three actions are set no further publication can add anything. */
519  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
520  entry->pubactions.pubdelete)
521  break;
522  }
523 
524  list_free(pubids);
525 
526  entry->replicate_valid = true;
527  }
528 
529  if (!found)
530  entry->schema_sent = false;
531 
532  return entry;
533 }
534 
535 /*
536  * Relcache invalidation callback
537  */
538 static void
540 {
541  RelationSyncEntry *entry;
542 
543  /*
544  * We can get here if the plugin was used in SQL interface as the
545  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
546  * is no way to unregister the relcache invalidation callback.
547  */
548  if (RelationSyncCache == NULL)
549  return;
550 
551  /*
552  * Nobody keeps pointers to entries in this hash table around outside
553  * logical decoding callback calls - but invalidation events can come in
554  * *during* a callback if we access the relcache in the callback. Because
555  * of that we must mark the cache entry as invalid but not remove it from
556  * the hash while it could still be referenced, then prune it at a later
557  * safe point.
558  *
559  * Getting invalidations for relations that aren't in the table is
560  * entirely normal, since there's no way to unregister for an invalidation
561  * event. So we don't care if it's found or not.
562  */
563  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
564  HASH_FIND, NULL);
565 
566  /*
567  * Reset schema sent status as the relation definition may have changed.
568  */
569  if (entry != NULL)
570  entry->schema_sent = false;
571 }
572 
573 /*
574  * Publication relation map syscache invalidation callback
575  */
576 static void
578 {
580  RelationSyncEntry *entry;
581 
582  /*
583  * We can get here if the plugin was used in SQL interface as the
584  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
585  * is no way to unregister the relcache invalidation callback.
586  */
587  if (RelationSyncCache == NULL)
588  return;
589 
590  /*
591  * There is no way to find which entry in our cache the hash belongs to so
592  * mark the whole cache as invalid.
593  */
594  hash_seq_init(&status, RelationSyncCache);
595  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
596  entry->replicate_valid = false;
597 }
#define NIL
Definition: pg_list.h:69
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
void logicalrep_write_typ(StringInfo out, Oid typoid)
Definition: proto.c:349
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:259
PublicationActions pubactions
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:793
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
#define DEBUG1
Definition: elog.h:25
static bool publications_valid
Definition: pgoutput.c:45
RepOriginId origin_id
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:35
bool replicate_valid
Definition: pgoutput.c:56
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:415
MemoryContext hcxt
Definition: hsearch.h:78
#define RelationGetDescr(relation)
Definition: rel.h:428
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:73
MemoryContext context
Definition: pgoutput.h:20
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:539
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:382
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:245
union ReorderBufferChange::@93 data
Form_pg_attribute * attrs
Definition: tupdesc.h:74
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
uint16 RepOriginId
Definition: xlogdefs.h:51
Size entrysize
Definition: hsearch.h:73
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
void * output_plugin_private
Definition: logical.h:71
#define MemSet(start, val, len)
Definition: c.h:857
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:72
MemoryContext context
Definition: logical.h:38
return result
Definition: formatting.c:1632
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
List * output_plugin_options
Definition: logical.h:54
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:110
#define PG_UINT32_MAX
Definition: c.h:341
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:398
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:577
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void list_free_deep(List *list)
Definition: list.c:1147
int natts
Definition: tupdesc.h:73
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
XLogRecPtr origin_lsn
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:162
#define FirstNormalObjectId
Definition: transam.h:94
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1431
OutputPluginOutputType output_type
Definition: output_plugin.h:28
Definition: dynahash.c:193
List * GetRelationPublications(Oid relid)
#define ERROR
Definition: elog.h:43
LogicalDecodeCommitCB commit_cb
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:27
struct ReorderBufferChange::@93::@94 tp
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3248
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:146
List * publication_names
Definition: pgoutput.h:26
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
Definition: proto.c:252
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
unsigned int uint32
Definition: c.h:268
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
Definition: proto.c:139
#define ereport(elevel, rest)
Definition: elog.h:122
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:466
Node * arg
Definition: parsenodes.h:720
void logicalrep_write_rel(StringInfo out, Relation rel)
Definition: proto.c:299
List * lappend(List *list, void *datum)
Definition: list.c:128
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:43
static HTAB * RelationSyncCache
Definition: pgoutput.c:61
#define HASH_BLOBS
Definition: hsearch.h:88
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:516
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1389
void * palloc0(Size size)
Definition: mcxt.c:878
LogicalDecodeChangeCB change_cb
Definition: output_plugin.h:99
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
uintptr_t Datum
Definition: postgres.h:372
Size keysize
Definition: hsearch.h:72
PG_MODULE_MAGIC
Definition: pgoutput.c:28
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:490
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
Definition: proto.c:182
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:505
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
#define lfirst(lc)
Definition: pg_list.h:106
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:369
static int list_length(const List *l)
Definition: pg_list.h:89
LogicalDecodeShutdownCB shutdown_cb
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:395
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1351
static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names)
Definition: pgoutput.c:86
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1341
LogicalDecodeStartupCB startup_cb
Definition: output_plugin.h:97
#define InvalidRepOriginId
Definition: origin.h:34
List * publications
Definition: pgoutput.h:27
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:434
int errmsg(const char *fmt,...)
Definition: elog.c:797
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
void list_free(List *list)
Definition: list.c:1133
StringInfo out
Definition: logical.h:66
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:163
int i
void * arg
LogicalDecodeBeginCB begin_cb
Definition: output_plugin.h:98
char * defname
Definition: parsenodes.h:719
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:164
#define elog
Definition: elog.h:219
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:224
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:209
LogicalDecodeFilterByOriginCB filter_by_origin_cb
Definition: pg_list.h:45
#define RelationGetRelid(relation)
Definition: rel.h:416
MemoryContext CacheMemoryContext
Definition: mcxt.c:46
PublicationActions pubactions
Definition: pgoutput.c:57
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:55
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:783
uint32 protocol_version
Definition: pgoutput.h:24