PostgreSQL Source Code  git master
test_decoding.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * test_decoding.c
4  * example logical decoding output plugin
5  *
6  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/test_decoding/test_decoding.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "catalog/pg_type.h"
16 
17 #include "replication/logical.h"
18 #include "replication/origin.h"
19 
20 #include "utils/builtins.h"
21 #include "utils/lsyscache.h"
22 #include "utils/memutils.h"
23 #include "utils/rel.h"
24 
26 
27 /* These must be available to dlsym() */
28 extern void _PG_init(void);
30 
31 typedef struct
32 {
38  bool only_local;
40 
42  bool is_init);
45  ReorderBufferTXN *txn);
47  TestDecodingData *data,
48  ReorderBufferTXN *txn,
49  bool last_write);
51  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
53  ReorderBufferTXN *txn, Relation rel,
54  ReorderBufferChange *change);
56  ReorderBufferTXN *txn,
57  int nrelations, Relation relations[],
58  ReorderBufferChange *change);
60  RepOriginId origin_id);
62  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
63  bool transactional, const char *prefix,
64  Size sz, const char *message);
66  ReorderBufferTXN *txn);
68  ReorderBufferTXN *txn);
70  ReorderBufferTXN *txn,
71  XLogRecPtr abort_lsn);
73  ReorderBufferTXN *txn,
74  XLogRecPtr commit_lsn);
76  ReorderBufferTXN *txn,
77  Relation relation,
78  ReorderBufferChange *change);
80  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
81  bool transactional, const char *prefix,
82  Size sz, const char *message);
84  ReorderBufferTXN *txn,
85  int nrelations, Relation relations[],
86  ReorderBufferChange *change);
87 
/*
 * Module load hook.
 *
 * This plugin needs no load-time setup, so the body is intentionally empty;
 * it exists as the place where a plugin could perform such work.
 */
void
_PG_init(void)
{
	/* nothing to initialize */
}
93 
94 /* specify output plugin callbacks */
95 void
97 {
99 
115 }
116 
117 
118 /* initialize this plugin */
119 static void
121  bool is_init)
122 {
123  ListCell *option;
124  TestDecodingData *data;
125 
126  data = palloc0(sizeof(TestDecodingData));
128  "text conversion context",
130  data->include_xids = true;
131  data->include_timestamp = false;
132  data->skip_empty_xacts = false;
133  data->only_local = false;
134 
135  ctx->output_plugin_private = data;
136 
138  opt->receive_rewrites = false;
139 
140  foreach(option, ctx->output_plugin_options)
141  {
142  DefElem *elem = lfirst(option);
143 
144  Assert(elem->arg == NULL || IsA(elem->arg, String));
145 
146  if (strcmp(elem->defname, "include-xids") == 0)
147  {
148  /* if option does not provide a value, it means its value is true */
149  if (elem->arg == NULL)
150  data->include_xids = true;
151  else if (!parse_bool(strVal(elem->arg), &data->include_xids))
152  ereport(ERROR,
153  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
154  errmsg("could not parse value \"%s\" for parameter \"%s\"",
155  strVal(elem->arg), elem->defname)));
156  }
157  else if (strcmp(elem->defname, "include-timestamp") == 0)
158  {
159  if (elem->arg == NULL)
160  data->include_timestamp = true;
161  else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
162  ereport(ERROR,
163  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
164  errmsg("could not parse value \"%s\" for parameter \"%s\"",
165  strVal(elem->arg), elem->defname)));
166  }
167  else if (strcmp(elem->defname, "force-binary") == 0)
168  {
169  bool force_binary;
170 
171  if (elem->arg == NULL)
172  continue;
173  else if (!parse_bool(strVal(elem->arg), &force_binary))
174  ereport(ERROR,
175  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
176  errmsg("could not parse value \"%s\" for parameter \"%s\"",
177  strVal(elem->arg), elem->defname)));
178 
179  if (force_binary)
181  }
182  else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
183  {
184 
185  if (elem->arg == NULL)
186  data->skip_empty_xacts = true;
187  else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
188  ereport(ERROR,
189  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
190  errmsg("could not parse value \"%s\" for parameter \"%s\"",
191  strVal(elem->arg), elem->defname)));
192  }
193  else if (strcmp(elem->defname, "only-local") == 0)
194  {
195 
196  if (elem->arg == NULL)
197  data->only_local = true;
198  else if (!parse_bool(strVal(elem->arg), &data->only_local))
199  ereport(ERROR,
200  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
201  errmsg("could not parse value \"%s\" for parameter \"%s\"",
202  strVal(elem->arg), elem->defname)));
203  }
204  else if (strcmp(elem->defname, "include-rewrites") == 0)
205  {
206 
207  if (elem->arg == NULL)
208  continue;
209  else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
210  ereport(ERROR,
211  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
212  errmsg("could not parse value \"%s\" for parameter \"%s\"",
213  strVal(elem->arg), elem->defname)));
214  }
215  else
216  {
217  ereport(ERROR,
218  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
219  errmsg("option \"%s\" = \"%s\" is unknown",
220  elem->defname,
221  elem->arg ? strVal(elem->arg) : "(null)")));
222  }
223  }
224 }
225 
226 /* cleanup this plugin's resources */
227 static void
229 {
231 
232  /* cleanup our own resources via memory context reset */
234 }
235 
236 /* BEGIN callback */
237 static void
239 {
241 
242  data->xact_wrote_changes = false;
243  if (data->skip_empty_xacts)
244  return;
245 
246  pg_output_begin(ctx, data, txn, true);
247 }
248 
249 static void
251 {
252  OutputPluginPrepareWrite(ctx, last_write);
253  if (data->include_xids)
254  appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
255  else
256  appendStringInfoString(ctx->out, "BEGIN");
257  OutputPluginWrite(ctx, last_write);
258 }
259 
260 /* COMMIT callback */
261 static void
263  XLogRecPtr commit_lsn)
264 {
266 
267  if (data->skip_empty_xacts && !data->xact_wrote_changes)
268  return;
269 
270  OutputPluginPrepareWrite(ctx, true);
271  if (data->include_xids)
272  appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
273  else
274  appendStringInfoString(ctx->out, "COMMIT");
275 
276  if (data->include_timestamp)
277  appendStringInfo(ctx->out, " (at %s)",
279 
280  OutputPluginWrite(ctx, true);
281 }
282 
283 static bool
285  RepOriginId origin_id)
286 {
288 
289  if (data->only_local && origin_id != InvalidRepOriginId)
290  return true;
291  return false;
292 }
293 
294 /*
295  * Print literal `outputstr' already represented as string of type `typid'
296  * into stringbuf `s'.
297  *
298  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
299  * if standard_conforming_strings were enabled.
300  */
301 static void
302 print_literal(StringInfo s, Oid typid, char *outputstr)
303 {
304  const char *valptr;
305 
306  switch (typid)
307  {
308  case INT2OID:
309  case INT4OID:
310  case INT8OID:
311  case OIDOID:
312  case FLOAT4OID:
313  case FLOAT8OID:
314  case NUMERICOID:
315  /* NB: We don't care about Inf, NaN et al. */
316  appendStringInfoString(s, outputstr);
317  break;
318 
319  case BITOID:
320  case VARBITOID:
321  appendStringInfo(s, "B'%s'", outputstr);
322  break;
323 
324  case BOOLOID:
325  if (strcmp(outputstr, "t") == 0)
326  appendStringInfoString(s, "true");
327  else
328  appendStringInfoString(s, "false");
329  break;
330 
331  default:
332  appendStringInfoChar(s, '\'');
333  for (valptr = outputstr; *valptr; valptr++)
334  {
335  char ch = *valptr;
336 
337  if (SQL_STR_DOUBLE(ch, false))
338  appendStringInfoChar(s, ch);
339  appendStringInfoChar(s, ch);
340  }
341  appendStringInfoChar(s, '\'');
342  break;
343  }
344 }
345 
346 /* print the tuple 'tuple' into the StringInfo s */
347 static void
348 tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
349 {
350  int natt;
351 
352  /* print all columns individually */
353  for (natt = 0; natt < tupdesc->natts; natt++)
354  {
355  Form_pg_attribute attr; /* the attribute itself */
356  Oid typid; /* type of current attribute */
357  Oid typoutput; /* output function */
358  bool typisvarlena;
359  Datum origval; /* possibly toasted Datum */
360  bool isnull; /* column is null? */
361 
362  attr = TupleDescAttr(tupdesc, natt);
363 
364  /*
365  * don't print dropped columns, we can't be sure everything is
366  * available for them
367  */
368  if (attr->attisdropped)
369  continue;
370 
371  /*
372  * Don't print system columns, oid will already have been printed if
373  * present.
374  */
375  if (attr->attnum < 0)
376  continue;
377 
378  typid = attr->atttypid;
379 
380  /* get Datum from tuple */
381  origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
382 
383  if (isnull && skip_nulls)
384  continue;
385 
386  /* print attribute name */
387  appendStringInfoChar(s, ' ');
388  appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
389 
390  /* print attribute type */
391  appendStringInfoChar(s, '[');
393  appendStringInfoChar(s, ']');
394 
395  /* query output function */
396  getTypeOutputInfo(typid,
397  &typoutput, &typisvarlena);
398 
399  /* print separator */
400  appendStringInfoChar(s, ':');
401 
402  /* print data */
403  if (isnull)
404  appendStringInfoString(s, "null");
405  else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
406  appendStringInfoString(s, "unchanged-toast-datum");
407  else if (!typisvarlena)
408  print_literal(s, typid,
409  OidOutputFunctionCall(typoutput, origval));
410  else
411  {
412  Datum val; /* definitely detoasted Datum */
413 
414  val = PointerGetDatum(PG_DETOAST_DATUM(origval));
415  print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
416  }
417  }
418 }
419 
420 /*
421  * callback for individual changed tuples
422  */
423 static void
425  Relation relation, ReorderBufferChange *change)
426 {
427  TestDecodingData *data;
428  Form_pg_class class_form;
429  TupleDesc tupdesc;
430  MemoryContext old;
431 
432  data = ctx->output_plugin_private;
433 
434  /* output BEGIN if we haven't yet */
435  if (data->skip_empty_xacts && !data->xact_wrote_changes)
436  {
437  pg_output_begin(ctx, data, txn, false);
438  }
439  data->xact_wrote_changes = true;
440 
441  class_form = RelationGetForm(relation);
442  tupdesc = RelationGetDescr(relation);
443 
444  /* Avoid leaking memory by using and resetting our own context */
445  old = MemoryContextSwitchTo(data->context);
446 
447  OutputPluginPrepareWrite(ctx, true);
448 
449  appendStringInfoString(ctx->out, "table ");
452  class_form->relrewrite ?
453  get_rel_name(class_form->relrewrite) :
454  NameStr(class_form->relname)));
455  appendStringInfoChar(ctx->out, ':');
456 
457  switch (change->action)
458  {
460  appendStringInfoString(ctx->out, " INSERT:");
461  if (change->data.tp.newtuple == NULL)
462  appendStringInfoString(ctx->out, " (no-tuple-data)");
463  else
464  tuple_to_stringinfo(ctx->out, tupdesc,
465  &change->data.tp.newtuple->tuple,
466  false);
467  break;
469  appendStringInfoString(ctx->out, " UPDATE:");
470  if (change->data.tp.oldtuple != NULL)
471  {
472  appendStringInfoString(ctx->out, " old-key:");
473  tuple_to_stringinfo(ctx->out, tupdesc,
474  &change->data.tp.oldtuple->tuple,
475  true);
476  appendStringInfoString(ctx->out, " new-tuple:");
477  }
478 
479  if (change->data.tp.newtuple == NULL)
480  appendStringInfoString(ctx->out, " (no-tuple-data)");
481  else
482  tuple_to_stringinfo(ctx->out, tupdesc,
483  &change->data.tp.newtuple->tuple,
484  false);
485  break;
487  appendStringInfoString(ctx->out, " DELETE:");
488 
489  /* if there was no PK, we only know that a delete happened */
490  if (change->data.tp.oldtuple == NULL)
491  appendStringInfoString(ctx->out, " (no-tuple-data)");
492  /* In DELETE, only the replica identity is present; display that */
493  else
494  tuple_to_stringinfo(ctx->out, tupdesc,
495  &change->data.tp.oldtuple->tuple,
496  true);
497  break;
498  default:
499  Assert(false);
500  }
501 
504 
505  OutputPluginWrite(ctx, true);
506 }
507 
508 static void
510  int nrelations, Relation relations[], ReorderBufferChange *change)
511 {
512  TestDecodingData *data;
513  MemoryContext old;
514  int i;
515 
516  data = ctx->output_plugin_private;
517 
518  /* output BEGIN if we haven't yet */
519  if (data->skip_empty_xacts && !data->xact_wrote_changes)
520  {
521  pg_output_begin(ctx, data, txn, false);
522  }
523  data->xact_wrote_changes = true;
524 
525  /* Avoid leaking memory by using and resetting our own context */
526  old = MemoryContextSwitchTo(data->context);
527 
528  OutputPluginPrepareWrite(ctx, true);
529 
530  appendStringInfoString(ctx->out, "table ");
531 
532  for (i = 0; i < nrelations; i++)
533  {
534  if (i > 0)
535  appendStringInfoString(ctx->out, ", ");
536 
538  quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
539  NameStr(relations[i]->rd_rel->relname)));
540  }
541 
542  appendStringInfoString(ctx->out, ": TRUNCATE:");
543 
544  if (change->data.truncate.restart_seqs
545  || change->data.truncate.cascade)
546  {
547  if (change->data.truncate.restart_seqs)
548  appendStringInfoString(ctx->out, " restart_seqs");
549  if (change->data.truncate.cascade)
550  appendStringInfoString(ctx->out, " cascade");
551  }
552  else
553  appendStringInfoString(ctx->out, " (no-flags)");
554 
557 
558  OutputPluginWrite(ctx, true);
559 }
560 
561 static void
563  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
564  const char *prefix, Size sz, const char *message)
565 {
566  OutputPluginPrepareWrite(ctx, true);
567  appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
568  transactional, prefix, sz);
569  appendBinaryStringInfo(ctx->out, message, sz);
570  OutputPluginWrite(ctx, true);
571 }
572 
573 /*
574  * We never try to stream any empty xact so we don't need any special handling
575  * for skip_empty_xacts in streaming mode APIs.
576  */
577 static void
579  ReorderBufferTXN *txn)
580 {
582 
583  OutputPluginPrepareWrite(ctx, true);
584  if (data->include_xids)
585  appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
586  else
587  appendStringInfo(ctx->out, "opening a streamed block for transaction");
588  OutputPluginWrite(ctx, true);
589 }
590 
591 /*
592  * We never try to stream any empty xact so we don't need any special handling
593  * for skip_empty_xacts in streaming mode APIs.
594  */
595 static void
597  ReorderBufferTXN *txn)
598 {
600 
601  OutputPluginPrepareWrite(ctx, true);
602  if (data->include_xids)
603  appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
604  else
605  appendStringInfo(ctx->out, "closing a streamed block for transaction");
606  OutputPluginWrite(ctx, true);
607 }
608 
609 /*
610  * We never try to stream any empty xact so we don't need any special handling
611  * for skip_empty_xacts in streaming mode APIs.
612  */
613 static void
615  ReorderBufferTXN *txn,
616  XLogRecPtr abort_lsn)
617 {
619 
620  OutputPluginPrepareWrite(ctx, true);
621  if (data->include_xids)
622  appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
623  else
624  appendStringInfo(ctx->out, "aborting streamed (sub)transaction");
625  OutputPluginWrite(ctx, true);
626 }
627 
628 /*
629  * We never try to stream any empty xact so we don't need any special handling
630  * for skip_empty_xacts in streaming mode APIs.
631  */
632 static void
634  ReorderBufferTXN *txn,
635  XLogRecPtr commit_lsn)
636 {
638 
639  OutputPluginPrepareWrite(ctx, true);
640 
641  if (data->include_xids)
642  appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
643  else
644  appendStringInfo(ctx->out, "committing streamed transaction");
645 
646  if (data->include_timestamp)
647  appendStringInfo(ctx->out, " (at %s)",
649 
650  OutputPluginWrite(ctx, true);
651 }
652 
653 /*
654  * In streaming mode, we don't display the changes as the transaction can abort
655  * at a later point in time. We don't want users to see the changes until the
656  * transaction is committed.
657  */
658 static void
660  ReorderBufferTXN *txn,
661  Relation relation,
662  ReorderBufferChange *change)
663 {
665 
666  OutputPluginPrepareWrite(ctx, true);
667  if (data->include_xids)
668  appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
669  else
670  appendStringInfo(ctx->out, "streaming change for transaction");
671  OutputPluginWrite(ctx, true);
672 }
673 
674 /*
675  * In streaming mode, we don't display the contents for transactional messages
676  * as the transaction can abort at a later point in time. We don't want users to
677  * see the message contents until the transaction is committed.
678  */
679 static void
681  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
682  const char *prefix, Size sz, const char *message)
683 {
684  OutputPluginPrepareWrite(ctx, true);
685 
686  if (transactional)
687  {
688  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
689  transactional, prefix, sz);
690  }
691  else
692  {
693  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
694  transactional, prefix, sz);
695  appendBinaryStringInfo(ctx->out, message, sz);
696  }
697 
698  OutputPluginWrite(ctx, true);
699 }
700 
701 /*
702  * In streaming mode, we don't display the detailed information of Truncate.
703  * See pg_decode_stream_change.
704  */
705 static void
707  int nrelations, Relation relations[],
708  ReorderBufferChange *change)
709 {
711 
712  OutputPluginPrepareWrite(ctx, true);
713  if (data->include_xids)
714  appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
715  else
716  appendStringInfo(ctx->out, "streaming truncate for transaction");
717  OutputPluginWrite(ctx, true);
718 }
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
LogicalDecodeTruncateCB truncate_cb
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
TimestampTz commit_time
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:314
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
#define IsA(nodeptr, _type_)
Definition: nodes.h:580
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2784
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
struct ReorderBufferChange::@99::@101 truncate
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10727
#define RelationGetDescr(relation)
Definition: rel.h:482
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
#define PointerGetDatum(X)
Definition: postgres.h:556
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define RelationGetForm(relation)
Definition: rel.h:450
PG_MODULE_MAGIC
Definition: test_decoding.c:25
LogicalDecodeMessageCB message_cb
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1864
LogicalDecodeStreamMessageCB stream_message_cb
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
uint16 RepOriginId
Definition: xlogdefs.h:58
#define strVal(v)
Definition: value.h:54
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
int errcode(int sqlerrcode)
Definition: elog.c:610
void * output_plugin_private
Definition: logical.h:75
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
MemoryContext context
Definition: logical.h:35
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
LogicalDecodeStreamAbortCB stream_abort_cb
List * output_plugin_options
Definition: logical.h:58
bool parse_bool(const char *value, bool *result)
Definition: bool.c:30
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
OutputPluginOutputType output_type
Definition: output_plugin.h:28
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
static void print_literal(StringInfo s, Oid typid, char *outputstr)
#define ERROR
Definition: elog.h:43
LogicalDecodeCommitCB commit_cb
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3191
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Node * arg
Definition: parsenodes.h:733
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:10811
MemoryContext context
Definition: test_decoding.c:33
#define heap_getattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:762
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void * palloc0(Size size)
Definition: mcxt.c:980
LogicalDecodeChangeCB change_cb
uintptr_t Datum
Definition: postgres.h:367
LogicalDecodeStreamTruncateCB stream_truncate_cb
union ReorderBufferChange::@99 data
TransactionId xid
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
#define ereport(elevel,...)
Definition: elog.h:144
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:576
struct ReorderBufferChange::@99::@100 tp
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190
size_t Size
Definition: c.h:473
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStreamCommitCB stream_commit_cb
void _PG_init(void)
Definition: test_decoding.c:89
LogicalDecodeStartupCB startup_cb
#define InvalidRepOriginId
Definition: origin.h:33
FormData_pg_class * Form_pg_class
Definition: pg_class.h:153
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
LogicalDecodeStreamStartCB stream_start_cb
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1657
int errmsg(const char *fmt,...)
Definition: elog.c:824
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:589
StringInfo out
Definition: logical.h:70
int i
#define NameStr(name)
Definition: c.h:622
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1134
LogicalDecodeBeginCB begin_cb
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
char * defname
Definition: parsenodes.h:732
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1840
LogicalDecodeStreamChangeCB stream_change_cb
#define RelationGetRelid(relation)
Definition: rel.h:456
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
long val
Definition: informix.c:664
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: test_decoding.c:96
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1736
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:904