PostgreSQL Source Code  git master
test_decoding.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * test_decoding.c
4  * example logical decoding output plugin
5  *
6  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/test_decoding/test_decoding.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "catalog/pg_type.h"
16 
17 #include "replication/logical.h"
18 #include "replication/origin.h"
19 
20 #include "utils/builtins.h"
21 #include "utils/lsyscache.h"
22 #include "utils/memutils.h"
23 #include "utils/rel.h"
24 
26 
27 /* These must be available to dlsym() */
28 extern void _PG_init(void);
30 
31 typedef struct
32 {
37  bool only_local;
39 
40 /*
41  * Maintain the per-transaction level variables to track whether the
42  * transaction and/or streams have written any changes. In streaming mode the
43  * transaction can be decoded in streams, so along with maintaining whether the
44  * transaction has written any changes, we also need to track whether the
45  * current stream has written any changes. This is required so that if the user
46  * has requested to skip the empty transactions we can skip the empty streams
47  * even though the transaction has written some changes.
48  */
49 typedef struct
50 {
54 
56  bool is_init);
59  ReorderBufferTXN *txn);
61  TestDecodingData *data,
62  ReorderBufferTXN *txn,
63  bool last_write);
65  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
67  ReorderBufferTXN *txn, Relation rel,
68  ReorderBufferChange *change);
70  ReorderBufferTXN *txn,
71  int nrelations, Relation relations[],
72  ReorderBufferChange *change);
74  RepOriginId origin_id);
76  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
77  bool transactional, const char *prefix,
78  Size sz, const char *message);
80  ReorderBufferTXN *txn);
82  TestDecodingData *data,
83  ReorderBufferTXN *txn,
84  bool last_write);
86  ReorderBufferTXN *txn);
88  ReorderBufferTXN *txn,
89  XLogRecPtr abort_lsn);
91  ReorderBufferTXN *txn,
92  XLogRecPtr commit_lsn);
94  ReorderBufferTXN *txn,
95  Relation relation,
96  ReorderBufferChange *change);
98  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
99  bool transactional, const char *prefix,
100  Size sz, const char *message);
102  ReorderBufferTXN *txn,
103  int nrelations, Relation relations[],
104  ReorderBufferChange *change);
105 
/*
 * _PG_init
 *
 * Declared extern above so that it can be located via dlsym().  This example
 * plugin has nothing to set up here, so the body is deliberately empty;
 * other plugins can perform things here.
 */
void
_PG_init(void)
{
	return;
}
111 
112 /* specify output plugin callbacks */
113 void
115 {
117 
133 }
134 
135 
136 /*
 * Initialize this plugin: allocate the TestDecodingData that is stored in
 * ctx->output_plugin_private, set the option defaults, and then apply every
 * entry of ctx->output_plugin_options.
 *
 * An unknown option name, or a value that parse_bool() rejects, raises ERROR
 * with ERRCODE_INVALID_PARAMETER_VALUE.
 *
 * NOTE(review): several lines of this function, including the one carrying
 * its name and first parameters, are absent from this listing; presumably
 * this is pg_decode_startup() -- confirm against the original file.
 */
137 static void
139  bool is_init)
140 {
141  ListCell *option;
142  TestDecodingData *data;
143  bool enable_streaming = false;
144 
145  data = palloc0(sizeof(TestDecodingData));
147  "text conversion context",
 /* option defaults; the loop below may override them */
149  data->include_xids = true;
150  data->include_timestamp = false;
151  data->skip_empty_xacts = false;
152  data->only_local = false;
153 
154  ctx->output_plugin_private = data;
155 
157  opt->receive_rewrites = false;
158 
159  foreach(option, ctx->output_plugin_options)
160  {
161  DefElem *elem = lfirst(option);
162 
 /* option values, when present, are expected to be String nodes */
163  Assert(elem->arg == NULL || IsA(elem->arg, String));
164 
165  if (strcmp(elem->defname, "include-xids") == 0)
166  {
167  /* if option does not provide a value, it means its value is true */
168  if (elem->arg == NULL)
169  data->include_xids = true;
170  else if (!parse_bool(strVal(elem->arg), &data->include_xids))
171  ereport(ERROR,
172  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
173  errmsg("could not parse value \"%s\" for parameter \"%s\"",
174  strVal(elem->arg), elem->defname)));
175  }
176  else if (strcmp(elem->defname, "include-timestamp") == 0)
177  {
 /* a bare option (no value) means true */
178  if (elem->arg == NULL)
179  data->include_timestamp = true;
180  else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
181  ereport(ERROR,
182  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
183  errmsg("could not parse value \"%s\" for parameter \"%s\"",
184  strVal(elem->arg), elem->defname)));
185  }
186  else if (strcmp(elem->defname, "force-binary") == 0)
187  {
188  bool force_binary;
189 
 /*
  * Unlike the options above, a bare "force-binary" (no value) is
  * skipped rather than treated as true.
  */
190  if (elem->arg == NULL)
191  continue;
192  else if (!parse_bool(strVal(elem->arg), &force_binary))
193  ereport(ERROR,
194  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
195  errmsg("could not parse value \"%s\" for parameter \"%s\"",
196  strVal(elem->arg), elem->defname)));
197 
 /* NOTE(review): the statement guarded by this test is missing from this listing */
198  if (force_binary)
200  }
201  else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
202  {
203 
 /* a bare option (no value) means true */
204  if (elem->arg == NULL)
205  data->skip_empty_xacts = true;
206  else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
207  ereport(ERROR,
208  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
209  errmsg("could not parse value \"%s\" for parameter \"%s\"",
210  strVal(elem->arg), elem->defname)));
211  }
212  else if (strcmp(elem->defname, "only-local") == 0)
213  {
214 
 /* a bare option (no value) means true */
215  if (elem->arg == NULL)
216  data->only_local = true;
217  else if (!parse_bool(strVal(elem->arg), &data->only_local))
218  ereport(ERROR,
219  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
220  errmsg("could not parse value \"%s\" for parameter \"%s\"",
221  strVal(elem->arg), elem->defname)));
222  }
223  else if (strcmp(elem->defname, "include-rewrites") == 0)
224  {
225 
 /* a bare option leaves opt->receive_rewrites at its default (false) */
226  if (elem->arg == NULL)
227  continue;
228  else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
229  ereport(ERROR,
230  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
231  errmsg("could not parse value \"%s\" for parameter \"%s\"",
232  strVal(elem->arg), elem->defname)));
233  }
234  else if (strcmp(elem->defname, "stream-changes") == 0)
235  {
 /* a bare option leaves enable_streaming at its default (false) */
236  if (elem->arg == NULL)
237  continue;
238  else if (!parse_bool(strVal(elem->arg), &enable_streaming))
239  ereport(ERROR,
240  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
241  errmsg("could not parse value \"%s\" for parameter \"%s\"",
242  strVal(elem->arg), elem->defname)));
243  }
244  else
245  {
 /* reject anything we do not recognize */
246  ereport(ERROR,
247  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
248  errmsg("option \"%s\" = \"%s\" is unknown",
249  elem->defname,
250  elem->arg ? strVal(elem->arg) : "(null)")));
251  }
252  }
253 
 /*
  * Streaming stays on only if ctx->streaming was already set and the
  * "stream-changes" option was given a true value.
  */
254  ctx->streaming &= enable_streaming;
255 }
256 
257 /* cleanup this plugin's resources */
258 static void
260 {
262 
263  /* cleanup our own resources via memory context reset */
265 }
266 
267 /*
 * BEGIN callback.
 *
 * Attaches fresh per-transaction state to txn->output_plugin_private with
 * xact_wrote_changes cleared.  When skip_empty_xacts is set the BEGIN line is
 * not written here; the change and truncate callbacks emit it when the first
 * change of the transaction arrives.  Otherwise BEGIN is written immediately.
 *
 * NOTE(review): the signature line and the statements that fetch 'data' and
 * allocate 'txndata' are missing from this listing.
 */
268 static void
270 {
272  TestDecodingTxnData *txndata =
274 
275  txndata->xact_wrote_changes = false;
276  txn->output_plugin_private = txndata;
277 
 /* defer BEGIN until we know the transaction is not empty */
278  if (data->skip_empty_xacts)
279  return;
280 
281  pg_output_begin(ctx, data, txn, true);
282 }
283 
/*
 * Write the BEGIN line for 'txn' to ctx->out: "BEGIN <xid>" when
 * data->include_xids is set, plain "BEGIN" otherwise.  'last_write' is passed
 * through unchanged to OutputPluginPrepareWrite() and OutputPluginWrite().
 *
 * NOTE(review): the signature line is missing from this listing.
 */
284 static void
286 {
287  OutputPluginPrepareWrite(ctx, last_write);
288  if (data->include_xids)
289  appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
290  else
291  appendStringInfoString(ctx->out, "BEGIN");
292  OutputPluginWrite(ctx, last_write);
293 }
294 
295 /*
 * COMMIT callback.
 *
 * Releases the per-transaction state and writes "COMMIT" (with the xid when
 * include_xids is set, and with a timestamp suffix when include_timestamp is
 * set).  Nothing is written when skip_empty_xacts is set and the transaction
 * wrote no changes.
 *
 * NOTE(review): the signature line, the statements that fetch 'data' and
 * 'txndata', and the timestamp argument are missing from this listing.
 */
296 static void
298  XLogRecPtr commit_lsn)
299 {
 /* remember the flag: txndata is freed just below */
302  bool xact_wrote_changes = txndata->xact_wrote_changes;
303 
304  pfree(txndata);
305  txn->output_plugin_private = NULL;
306 
 /* empty transaction: BEGIN was never emitted, so skip COMMIT as well */
307  if (data->skip_empty_xacts && !xact_wrote_changes)
308  return;
309 
310  OutputPluginPrepareWrite(ctx, true);
311  if (data->include_xids)
312  appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
313  else
314  appendStringInfoString(ctx->out, "COMMIT");
315 
316  if (data->include_timestamp)
317  appendStringInfo(ctx->out, " (at %s)",
319 
320  OutputPluginWrite(ctx, true);
321 }
322 
/*
 * Origin filter.
 *
 * Returns true only when the only_local option is set and 'origin_id' is
 * something other than InvalidRepOriginId; returns false in every other case.
 * Presumably a true result asks the caller to filter the change out --
 * confirm against the filter_by_origin_cb contract.
 *
 * NOTE(review): the signature line and the statement that fetches 'data' are
 * missing from this listing.
 */
323 static bool
325  RepOriginId origin_id)
326 {
328 
329  if (data->only_local && origin_id != InvalidRepOriginId)
330  return true;
331  return false;
332 }
333 
334 /*
335  * Print literal `outputstr' already represented as string of type `typid'
336  * into stringbuf `s'.
337  *
338  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
339  * if standard_conforming_strings were enabled.
340  */
341 static void
342 print_literal(StringInfo s, Oid typid, char *outputstr)
343 {
344  const char *valptr;
345 
346  switch (typid)
347  {
348  case INT2OID:
349  case INT4OID:
350  case INT8OID:
351  case OIDOID:
352  case FLOAT4OID:
353  case FLOAT8OID:
354  case NUMERICOID:
355  /* NB: We don't care about Inf, NaN et al. */
356  appendStringInfoString(s, outputstr);
357  break;
358 
359  case BITOID:
360  case VARBITOID:
361  appendStringInfo(s, "B'%s'", outputstr);
362  break;
363 
364  case BOOLOID:
365  if (strcmp(outputstr, "t") == 0)
366  appendStringInfoString(s, "true");
367  else
368  appendStringInfoString(s, "false");
369  break;
370 
371  default:
372  appendStringInfoChar(s, '\'');
373  for (valptr = outputstr; *valptr; valptr++)
374  {
375  char ch = *valptr;
376 
377  if (SQL_STR_DOUBLE(ch, false))
378  appendStringInfoChar(s, ch);
379  appendStringInfoChar(s, ch);
380  }
381  appendStringInfoChar(s, '\'');
382  break;
383  }
384 }
385 
386 /* print the tuple 'tuple' into the StringInfo s */
387 static void
388 tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
389 {
390  int natt;
391 
392  /* print all columns individually */
393  for (natt = 0; natt < tupdesc->natts; natt++)
394  {
395  Form_pg_attribute attr; /* the attribute itself */
396  Oid typid; /* type of current attribute */
397  Oid typoutput; /* output function */
398  bool typisvarlena;
399  Datum origval; /* possibly toasted Datum */
400  bool isnull; /* column is null? */
401 
402  attr = TupleDescAttr(tupdesc, natt);
403 
404  /*
405  * don't print dropped columns, we can't be sure everything is
406  * available for them
407  */
408  if (attr->attisdropped)
409  continue;
410 
411  /*
412  * Don't print system columns, oid will already have been printed if
413  * present.
414  */
415  if (attr->attnum < 0)
416  continue;
417 
418  typid = attr->atttypid;
419 
420  /* get Datum from tuple */
421  origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
422 
423  if (isnull && skip_nulls)
424  continue;
425 
426  /* print attribute name */
427  appendStringInfoChar(s, ' ');
428  appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
429 
430  /* print attribute type */
431  appendStringInfoChar(s, '[');
433  appendStringInfoChar(s, ']');
434 
435  /* query output function */
436  getTypeOutputInfo(typid,
437  &typoutput, &typisvarlena);
438 
439  /* print separator */
440  appendStringInfoChar(s, ':');
441 
442  /* print data */
443  if (isnull)
444  appendStringInfoString(s, "null");
445  else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
446  appendStringInfoString(s, "unchanged-toast-datum");
447  else if (!typisvarlena)
448  print_literal(s, typid,
449  OidOutputFunctionCall(typoutput, origval));
450  else
451  {
452  Datum val; /* definitely detoasted Datum */
453 
454  val = PointerGetDatum(PG_DETOAST_DATUM(origval));
455  print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
456  }
457  }
458 }
459 
460 /*
461  * callback for individual changed tuples
462  *
 * Writes one line per change: "table <name>:" followed by " INSERT:",
 * " UPDATE:" or " DELETE:" and the tuple data printed by
 * tuple_to_stringinfo().  When skip_empty_xacts is set and this is the first
 * change of the transaction, the deferred BEGIN line is written first.
 *
 * NOTE(review): the signature line, the case labels of the switch, and a few
 * statements (part of the relation-name output and the code following the
 * switch) are missing from this listing.
 */
463 static void
465  Relation relation, ReorderBufferChange *change)
466 {
467  TestDecodingData *data;
468  TestDecodingTxnData *txndata;
469  Form_pg_class class_form;
470  TupleDesc tupdesc;
471  MemoryContext old;
472 
473  data = ctx->output_plugin_private;
474  txndata = txn->output_plugin_private;
475 
476  /* output BEGIN if we haven't yet */
477  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
478  {
 /* last_write is false here: the change itself is written below */
479  pg_output_begin(ctx, data, txn, false);
480  }
481  txndata->xact_wrote_changes = true;
482 
483  class_form = RelationGetForm(relation);
484  tupdesc = RelationGetDescr(relation);
485 
486  /* Avoid leaking memory by using and resetting our own context */
487  old = MemoryContextSwitchTo(data->context);
488 
489  OutputPluginPrepareWrite(ctx, true);
490 
491  appendStringInfoString(ctx->out, "table ");
 /*
  * When relrewrite is set, the name looked up for that relation is used
  * in place of this relation's own relname.
  */
494  class_form->relrewrite ?
495  get_rel_name(class_form->relrewrite) :
496  NameStr(class_form->relname)));
497  appendStringInfoChar(ctx->out, ':');
498 
499  switch (change->action)
500  {
 /* insert: print the whole new tuple, nulls included */
502  appendStringInfoString(ctx->out, " INSERT:");
503  if (change->data.tp.newtuple == NULL)
504  appendStringInfoString(ctx->out, " (no-tuple-data)");
505  else
506  tuple_to_stringinfo(ctx->out, tupdesc,
507  &change->data.tp.newtuple->tuple,
508  false);
509  break;
 /* update: optional old key (nulls skipped), then the new tuple */
511  appendStringInfoString(ctx->out, " UPDATE:");
512  if (change->data.tp.oldtuple != NULL)
513  {
514  appendStringInfoString(ctx->out, " old-key:");
515  tuple_to_stringinfo(ctx->out, tupdesc,
516  &change->data.tp.oldtuple->tuple,
517  true);
518  appendStringInfoString(ctx->out, " new-tuple:");
519  }
520 
521  if (change->data.tp.newtuple == NULL)
522  appendStringInfoString(ctx->out, " (no-tuple-data)");
523  else
524  tuple_to_stringinfo(ctx->out, tupdesc,
525  &change->data.tp.newtuple->tuple,
526  false);
527  break;
 /* delete: only the old tuple, if any, is available */
529  appendStringInfoString(ctx->out, " DELETE:");
530 
531  /* if there was no PK, we only know that a delete happened */
532  if (change->data.tp.oldtuple == NULL)
533  appendStringInfoString(ctx->out, " (no-tuple-data)");
534  /* In DELETE, only the replica identity is present; display that */
535  else
536  tuple_to_stringinfo(ctx->out, tupdesc,
537  &change->data.tp.oldtuple->tuple,
538  true);
539  break;
540  default:
 /* any other action is unexpected here */
541  Assert(false);
542  }
543 
546 
547  OutputPluginWrite(ctx, true);
548 }
549 
/*
 * Callback for TRUNCATE.
 *
 * Writes "table <rel>, <rel>, ...: TRUNCATE:" followed by " restart_seqs"
 * and/or " cascade" when the corresponding flags are set on the change, or
 * " (no-flags)" when neither is.  When skip_empty_xacts is set and this is
 * the first change of the transaction, the deferred BEGIN line is written
 * first.
 *
 * NOTE(review): the signature line, the call that appends each relation
 * name, and the statements just before the final write are missing from this
 * listing.
 */
550 static void
552  int nrelations, Relation relations[], ReorderBufferChange *change)
553 {
554  TestDecodingData *data;
555  TestDecodingTxnData *txndata;
556  MemoryContext old;
557  int i;
558 
559  data = ctx->output_plugin_private;
560  txndata = txn->output_plugin_private;
561 
562  /* output BEGIN if we haven't yet */
563  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
564  {
565  pg_output_begin(ctx, data, txn, false);
566  }
567  txndata->xact_wrote_changes = true;
568 
569  /* Avoid leaking memory by using and resetting our own context */
570  old = MemoryContextSwitchTo(data->context);
571 
572  OutputPluginPrepareWrite(ctx, true);
573 
574  appendStringInfoString(ctx->out, "table ");
575 
 /* comma-separated list of schema-qualified relation names */
576  for (i = 0; i < nrelations; i++)
577  {
578  if (i > 0)
579  appendStringInfoString(ctx->out, ", ");
580 
582  quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
583  NameStr(relations[i]->rd_rel->relname)));
584  }
585 
586  appendStringInfoString(ctx->out, ": TRUNCATE:");
587 
588  if (change->data.truncate.restart_seqs
589  || change->data.truncate.cascade)
590  {
591  if (change->data.truncate.restart_seqs)
592  appendStringInfoString(ctx->out, " restart_seqs");
593  if (change->data.truncate.cascade)
594  appendStringInfoString(ctx->out, " cascade");
595  }
596  else
597  appendStringInfoString(ctx->out, " (no-flags)");
598 
601 
602  OutputPluginWrite(ctx, true);
603 }
604 
/*
 * Callback for logical decoding messages.
 *
 * Writes a header giving the transactional flag, the prefix and the size,
 * then appends the 'sz' bytes of 'message' verbatim.
 *
 * NOTE(review): the first line of the signature is missing from this
 * listing.
 */
605 static void
607  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
608  const char *prefix, Size sz, const char *message)
609 {
610  OutputPluginPrepareWrite(ctx, true);
611  appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
612  transactional, prefix, sz);
 /* length-delimited append: uses sz rather than a terminating NUL */
613  appendBinaryStringInfo(ctx->out, message, sz);
614  OutputPluginWrite(ctx, true);
615 }
616 
/*
 * Stream-start callback.
 *
 * Ensures the transaction has plugin state attached (created on its first
 * stream), clears the per-stream stream_wrote_changes flag, and writes the
 * "opening a streamed block" line unless skip_empty_xacts is set, in which
 * case that line is deferred until the stream produces a change.
 *
 * NOTE(review): the signature line, the statements that fetch 'data' and
 * 'txndata', and the allocation expression are missing from this listing.
 */
617 static void
619  ReorderBufferTXN *txn)
620 {
623 
624  /*
625  * Allocate the txn plugin data for the first stream in the transaction.
626  */
627  if (txndata == NULL)
628  {
629  txndata =
631  txndata->xact_wrote_changes = false;
632  txn->output_plugin_private = txndata;
633  }
634 
 /* every stream starts out empty, whatever earlier streams wrote */
635  txndata->stream_wrote_changes = false;
636  if (data->skip_empty_xacts)
637  return;
638  pg_output_stream_start(ctx, data, txn, true);
639 }
640 
/*
 * Write the "opening a streamed block for transaction" line for 'txn' to
 * ctx->out, including the xid when data->include_xids is set.  'last_write'
 * is passed through unchanged to OutputPluginPrepareWrite() and
 * OutputPluginWrite().
 *
 * NOTE(review): the signature line is missing from this listing.
 */
641 static void
643 {
644  OutputPluginPrepareWrite(ctx, last_write);
645  if (data->include_xids)
646  appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
647  else
648  appendStringInfoString(ctx->out, "opening a streamed block for transaction");
649  OutputPluginWrite(ctx, last_write);
650 }
651 
/*
 * Stream-stop callback.
 *
 * Writes the "closing a streamed block" line, including the xid when
 * include_xids is set.  Nothing is written when skip_empty_xacts is set and
 * the current stream wrote no changes.
 *
 * NOTE(review): the signature line and the statements that fetch 'data' and
 * 'txndata' are missing from this listing.
 */
652 static void
654  ReorderBufferTXN *txn)
655 {
658 
 /* empty stream: its opening line was never written, so skip the close too */
659  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
660  return;
661 
662  OutputPluginPrepareWrite(ctx, true);
663  if (data->include_xids)
664  appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
665  else
666  appendStringInfoString(ctx->out, "closing a streamed block for transaction");
667  OutputPluginWrite(ctx, true);
668 }
669 
/*
 * Stream-abort callback.
 *
 * Writes the "aborting streamed (sub)transaction" line, including the xid
 * when include_xids is set.  The plugin state is freed only when 'txn' is
 * itself the top-level transaction.  Nothing is written when skip_empty_xacts
 * is set and the top-level transaction wrote no changes.
 *
 * NOTE(review): the signature line and the statement that fetches 'data' are
 * missing from this listing.
 */
670 static void
672  ReorderBufferTXN *txn,
673  XLogRecPtr abort_lsn)
674 {
676 
677  /*
678  * stream abort can be sent for an individual subtransaction but we
679  * maintain the output_plugin_private only under the toptxn so if this is
680  * not the toptxn then fetch the toptxn.
681  */
682  ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
683  TestDecodingTxnData *txndata = toptxn->output_plugin_private;
 /* remember the flag: txndata may be freed just below */
684  bool xact_wrote_changes = txndata->xact_wrote_changes;
685 
 /* only an abort of the top-level transaction releases the state */
686  if (txn->toptxn == NULL)
687  {
688  Assert(txn->output_plugin_private != NULL);
689  pfree(txndata);
690  txn->output_plugin_private = NULL;
691  }
692 
693  if (data->skip_empty_xacts && !xact_wrote_changes)
694  return;
695 
696  OutputPluginPrepareWrite(ctx, true);
697  if (data->include_xids)
698  appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
699  else
700  appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
701  OutputPluginWrite(ctx, true);
702 }
703 
/*
 * Stream-commit callback.
 *
 * Releases the per-transaction state and writes the "committing streamed
 * transaction" line (with the xid when include_xids is set, and with a
 * timestamp suffix when include_timestamp is set).  Nothing is written when
 * skip_empty_xacts is set and the transaction wrote no changes.
 *
 * NOTE(review): the signature line, the statements that fetch 'data' and
 * 'txndata', and the timestamp argument are missing from this listing.
 */
704 static void
706  ReorderBufferTXN *txn,
707  XLogRecPtr commit_lsn)
708 {
 /* remember the flag: txndata is freed just below */
711  bool xact_wrote_changes = txndata->xact_wrote_changes;
712 
713  pfree(txndata);
714  txn->output_plugin_private = NULL;
715 
716  if (data->skip_empty_xacts && !xact_wrote_changes)
717  return;
718 
719  OutputPluginPrepareWrite(ctx, true);
720 
721  if (data->include_xids)
722  appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
723  else
724  appendStringInfoString(ctx->out, "committing streamed transaction");
725 
726  if (data->include_timestamp)
727  appendStringInfo(ctx->out, " (at %s)",
729 
730  OutputPluginWrite(ctx, true);
731 }
732 
733 /*
734  * In streaming mode, we don't display the changes as the transaction can abort
735  * at a later point in time. We don't want users to see the changes until the
736  * transaction is committed.
 *
 * Only a fixed "streaming change" line is written, with the xid when
 * include_xids is set.  Both the per-transaction and the per-stream
 * "wrote changes" flags are set.
 *
 * NOTE(review): the first line of the signature and the statements that
 * fetch 'data' and 'txndata' are missing from this listing.
737  */
738 static void
740  ReorderBufferTXN *txn,
741  Relation relation,
742  ReorderBufferChange *change)
743 {
746 
747  /* output stream start if we haven't yet */
748  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
749  {
750  pg_output_stream_start(ctx, data, txn, false);
751  }
752  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
753 
754  OutputPluginPrepareWrite(ctx, true);
755  if (data->include_xids)
756  appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
757  else
758  appendStringInfoString(ctx->out, "streaming change for transaction");
759  OutputPluginWrite(ctx, true);
760 }
761 
762 /*
763  * In streaming mode, we don't display the contents for transactional messages
764  * as the transaction can abort at a later point in time. We don't want users to
765  * see the message contents until the transaction is committed.
 *
 * Non-transactional messages are written in full: the header is followed by
 * the 'sz' bytes of 'message'.
 *
 * NOTE(review): the first line of the signature is missing from this
 * listing.
766  */
767 static void
769  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
770  const char *prefix, Size sz, const char *message)
771 {
772  OutputPluginPrepareWrite(ctx, true);
773 
774  if (transactional)
775  {
 /* header only; the content is withheld */
776  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
777  transactional, prefix, sz);
778  }
779  else
780  {
781  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
782  transactional, prefix, sz);
783  appendBinaryStringInfo(ctx->out, message, sz);
784  }
785 
786  OutputPluginWrite(ctx, true);
787 }
788 
789 /*
790  * In streaming mode, we don't display the detailed information of Truncate.
791  * See pg_decode_stream_change.
 *
 * Only a fixed "streaming truncate" line is written, with the xid when
 * include_xids is set.  Both the per-transaction and the per-stream
 * "wrote changes" flags are set.
 *
 * NOTE(review): the first line of the signature and the statements that
 * fetch 'data' and 'txndata' are missing from this listing.
792  */
793 static void
795  int nrelations, Relation relations[],
796  ReorderBufferChange *change)
797 {
800 
 /* output stream start if we haven't yet */
801  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
802  {
803  pg_output_stream_start(ctx, data, txn, false);
804  }
805  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
806 
807  OutputPluginPrepareWrite(ctx, true);
808  if (data->include_xids)
809  appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
810  else
811  appendStringInfoString(ctx->out, "streaming truncate for transaction");
812  OutputPluginWrite(ctx, true);
813 }
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
LogicalDecodeTruncateCB truncate_cb
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
TimestampTz commit_time
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:314
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
#define IsA(nodeptr, _type_)
Definition: nodes.h:578
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
#define AllocSetContextCreate
Definition: memutils.h:170
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2789
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10934
#define RelationGetDescr(relation)
Definition: rel.h:483
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
#define PointerGetDatum(X)
Definition: postgres.h:556
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define RelationGetForm(relation)
Definition: rel.h:451
PG_MODULE_MAGIC
Definition: test_decoding.c:25
LogicalDecodeMessageCB message_cb
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1869
LogicalDecodeStreamMessageCB stream_message_cb
union ReorderBufferChange::@98 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
uint16 RepOriginId
Definition: xlogdefs.h:58
#define strVal(v)
Definition: value.h:54
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
int errcode(int sqlerrcode)
Definition: elog.c:691
void * output_plugin_private
Definition: logical.h:75
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
MemoryContext context
Definition: logical.h:35
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
LogicalDecodeStreamAbortCB stream_abort_cb
List * output_plugin_options
Definition: logical.h:58
bool parse_bool(const char *value, bool *result)
Definition: bool.c:30
unsigned int Oid
Definition: postgres_ext.h:31
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
OutputPluginOutputType output_type
Definition: output_plugin.h:28
void pfree(void *pointer)
Definition: mcxt.c:1057
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
static void print_literal(StringInfo s, Oid typid, char *outputstr)
#define ERROR
Definition: elog.h:43
LogicalDecodeCommitCB commit_cb
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3196
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
struct ReorderBufferTXN * toptxn
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Node * arg
Definition: parsenodes.h:734
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:11018
MemoryContext context
Definition: test_decoding.c:33
#define heap_getattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:762
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void * palloc0(Size size)
Definition: mcxt.c:981
LogicalDecodeChangeCB change_cb
uintptr_t Datum
Definition: postgres.h:367
LogicalDecodeStreamTruncateCB stream_truncate_cb
TransactionId xid
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
#define ereport(elevel,...)
Definition: elog.h:155
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:585
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:800
#define lfirst(lc)
Definition: pg_list.h:169
size_t Size
Definition: c.h:528
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStreamCommitCB stream_commit_cb
void _PG_init(void)
LogicalDecodeStartupCB startup_cb
#define InvalidRepOriginId
Definition: origin.h:33
FormData_pg_class * Form_pg_class
Definition: pg_class.h:153
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
LogicalDecodeStreamStartCB stream_start_cb
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1657
int errmsg(const char *fmt,...)
Definition: elog.c:902
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:598
struct ReorderBufferChange::@98::@100 truncate
StringInfo out
Definition: logical.h:70
struct ReorderBufferChange::@98::@99 tp
int i
#define NameStr(name)
Definition: c.h:677
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1160
LogicalDecodeBeginCB begin_cb
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
char * defname
Definition: parsenodes.h:733
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1845
LogicalDecodeStreamChangeCB stream_change_cb
void * output_plugin_private
#define RelationGetRelid(relation)
Definition: rel.h:457
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
long val
Definition: informix.c:664
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1772
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:959