PostgreSQL Source Code  git master
test_decoding.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * test_decoding.c
4  * example logical decoding output plugin
5  *
6  * Copyright (c) 2012-2022, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/test_decoding/test_decoding.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "catalog/pg_type.h"
16 
17 #include "replication/logical.h"
18 #include "replication/origin.h"
19 
20 #include "utils/builtins.h"
21 #include "utils/lsyscache.h"
22 #include "utils/memutils.h"
23 #include "utils/rel.h"
24 
26 
27 /* These must be available to dlsym() */
28 extern void _PG_init(void);
30 
31 typedef struct
32 {
37  bool only_local;
39 
40 /*
41  * Maintain the per-transaction level variables to track whether the
42  * transaction and or streams have written any changes. In streaming mode the
43  * transaction can be decoded in streams so along with maintaining whether the
44  * transaction has written any changes, we also need to track whether the
45  * current stream has written any changes. This is required so that if user
46  * has requested to skip the empty transactions we can skip the empty streams
47  * even though the transaction has written some changes.
48  */
49 typedef struct
50 {
54 
56  bool is_init);
59  ReorderBufferTXN *txn);
62  ReorderBufferTXN *txn,
63  bool last_write);
65  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
67  ReorderBufferTXN *txn, Relation rel,
68  ReorderBufferChange *change);
70  ReorderBufferTXN *txn,
71  int nrelations, Relation relations[],
72  ReorderBufferChange *change);
74  RepOriginId origin_id);
76  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
77  bool transactional, const char *prefix,
78  Size sz, const char *message);
80  TransactionId xid,
81  const char *gid);
83  ReorderBufferTXN *txn);
85  ReorderBufferTXN *txn,
86  XLogRecPtr prepare_lsn);
88  ReorderBufferTXN *txn,
89  XLogRecPtr commit_lsn);
91  ReorderBufferTXN *txn,
92  XLogRecPtr prepare_end_lsn,
93  TimestampTz prepare_time);
95  ReorderBufferTXN *txn);
98  ReorderBufferTXN *txn,
99  bool last_write);
101  ReorderBufferTXN *txn);
103  ReorderBufferTXN *txn,
104  XLogRecPtr abort_lsn);
106  ReorderBufferTXN *txn,
107  XLogRecPtr prepare_lsn);
109  ReorderBufferTXN *txn,
110  XLogRecPtr commit_lsn);
112  ReorderBufferTXN *txn,
113  Relation relation,
114  ReorderBufferChange *change);
116  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
117  bool transactional, const char *prefix,
118  Size sz, const char *message);
120  ReorderBufferTXN *txn,
121  int nrelations, Relation relations[],
122  ReorderBufferChange *change);
123 
124 void
125 _PG_init(void)
126 {
127  /* other plugins can perform things here */
128 }
129 
130 /* specify output plugin callbacks */
131 void
133 {
135 
157 }
158 
159 
160 /* initialize this plugin */
161 static void
163  bool is_init)
164 {
165  ListCell *option;
167  bool enable_streaming = false;
168 
169  data = palloc0(sizeof(TestDecodingData));
170  data->context = AllocSetContextCreate(ctx->context,
171  "text conversion context",
173  data->include_xids = true;
174  data->include_timestamp = false;
175  data->skip_empty_xacts = false;
176  data->only_local = false;
177 
179 
181  opt->receive_rewrites = false;
182 
183  foreach(option, ctx->output_plugin_options)
184  {
185  DefElem *elem = lfirst(option);
186 
187  Assert(elem->arg == NULL || IsA(elem->arg, String));
188 
189  if (strcmp(elem->defname, "include-xids") == 0)
190  {
191  /* if option does not provide a value, it means its value is true */
192  if (elem->arg == NULL)
193  data->include_xids = true;
194  else if (!parse_bool(strVal(elem->arg), &data->include_xids))
195  ereport(ERROR,
196  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
197  errmsg("could not parse value \"%s\" for parameter \"%s\"",
198  strVal(elem->arg), elem->defname)));
199  }
200  else if (strcmp(elem->defname, "include-timestamp") == 0)
201  {
202  if (elem->arg == NULL)
203  data->include_timestamp = true;
204  else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
205  ereport(ERROR,
206  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
207  errmsg("could not parse value \"%s\" for parameter \"%s\"",
208  strVal(elem->arg), elem->defname)));
209  }
210  else if (strcmp(elem->defname, "force-binary") == 0)
211  {
212  bool force_binary;
213 
214  if (elem->arg == NULL)
215  continue;
216  else if (!parse_bool(strVal(elem->arg), &force_binary))
217  ereport(ERROR,
218  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
219  errmsg("could not parse value \"%s\" for parameter \"%s\"",
220  strVal(elem->arg), elem->defname)));
221 
222  if (force_binary)
224  }
225  else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
226  {
227 
228  if (elem->arg == NULL)
229  data->skip_empty_xacts = true;
230  else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
231  ereport(ERROR,
232  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
233  errmsg("could not parse value \"%s\" for parameter \"%s\"",
234  strVal(elem->arg), elem->defname)));
235  }
236  else if (strcmp(elem->defname, "only-local") == 0)
237  {
238 
239  if (elem->arg == NULL)
240  data->only_local = true;
241  else if (!parse_bool(strVal(elem->arg), &data->only_local))
242  ereport(ERROR,
243  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
244  errmsg("could not parse value \"%s\" for parameter \"%s\"",
245  strVal(elem->arg), elem->defname)));
246  }
247  else if (strcmp(elem->defname, "include-rewrites") == 0)
248  {
249 
250  if (elem->arg == NULL)
251  continue;
252  else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
253  ereport(ERROR,
254  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
255  errmsg("could not parse value \"%s\" for parameter \"%s\"",
256  strVal(elem->arg), elem->defname)));
257  }
258  else if (strcmp(elem->defname, "stream-changes") == 0)
259  {
260  if (elem->arg == NULL)
261  continue;
262  else if (!parse_bool(strVal(elem->arg), &enable_streaming))
263  ereport(ERROR,
264  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
265  errmsg("could not parse value \"%s\" for parameter \"%s\"",
266  strVal(elem->arg), elem->defname)));
267  }
268  else
269  {
270  ereport(ERROR,
271  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
272  errmsg("option \"%s\" = \"%s\" is unknown",
273  elem->defname,
274  elem->arg ? strVal(elem->arg) : "(null)")));
275  }
276  }
277 
278  ctx->streaming &= enable_streaming;
279 }
280 
281 /* cleanup this plugin's resources */
282 static void
284 {
286 
287  /* cleanup our own resources via memory context reset */
288  MemoryContextDelete(data->context);
289 }
290 
291 /* BEGIN callback */
292 static void
294 {
296  TestDecodingTxnData *txndata =
298 
299  txndata->xact_wrote_changes = false;
300  txn->output_plugin_private = txndata;
301 
302  /*
303  * If asked to skip empty transactions, we'll emit BEGIN at the point
304  * where the first operation is received for this transaction.
305  */
306  if (data->skip_empty_xacts)
307  return;
308 
309  pg_output_begin(ctx, data, txn, true);
310 }
311 
312 static void
314 {
315  OutputPluginPrepareWrite(ctx, last_write);
316  if (data->include_xids)
317  appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
318  else
319  appendStringInfoString(ctx->out, "BEGIN");
320  OutputPluginWrite(ctx, last_write);
321 }
322 
323 /* COMMIT callback */
324 static void
326  XLogRecPtr commit_lsn)
327 {
330  bool xact_wrote_changes = txndata->xact_wrote_changes;
331 
332  pfree(txndata);
333  txn->output_plugin_private = NULL;
334 
335  if (data->skip_empty_xacts && !xact_wrote_changes)
336  return;
337 
338  OutputPluginPrepareWrite(ctx, true);
339  if (data->include_xids)
340  appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
341  else
342  appendStringInfoString(ctx->out, "COMMIT");
343 
344  if (data->include_timestamp)
345  appendStringInfo(ctx->out, " (at %s)",
347 
348  OutputPluginWrite(ctx, true);
349 }
350 
351 /* BEGIN PREPARE callback */
352 static void
354 {
356  TestDecodingTxnData *txndata =
358 
359  txndata->xact_wrote_changes = false;
360  txn->output_plugin_private = txndata;
361 
362  /*
363  * If asked to skip empty transactions, we'll emit BEGIN at the point
364  * where the first operation is received for this transaction.
365  */
366  if (data->skip_empty_xacts)
367  return;
368 
369  pg_output_begin(ctx, data, txn, true);
370 }
371 
372 /* PREPARE callback */
373 static void
375  XLogRecPtr prepare_lsn)
376 {
379 
380  /*
381  * If asked to skip empty transactions, we'll emit PREPARE at the point
382  * where the first operation is received for this transaction.
383  */
384  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
385  return;
386 
387  OutputPluginPrepareWrite(ctx, true);
388 
389  appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
390  quote_literal_cstr(txn->gid));
391 
392  if (data->include_xids)
393  appendStringInfo(ctx->out, ", txid %u", txn->xid);
394 
395  if (data->include_timestamp)
396  appendStringInfo(ctx->out, " (at %s)",
398 
399  OutputPluginWrite(ctx, true);
400 }
401 
402 /* COMMIT PREPARED callback */
403 static void
405  XLogRecPtr commit_lsn)
406 {
408 
409  OutputPluginPrepareWrite(ctx, true);
410 
411  appendStringInfo(ctx->out, "COMMIT PREPARED %s",
412  quote_literal_cstr(txn->gid));
413 
414  if (data->include_xids)
415  appendStringInfo(ctx->out, ", txid %u", txn->xid);
416 
417  if (data->include_timestamp)
418  appendStringInfo(ctx->out, " (at %s)",
420 
421  OutputPluginWrite(ctx, true);
422 }
423 
424 /* ROLLBACK PREPARED callback */
425 static void
427  ReorderBufferTXN *txn,
428  XLogRecPtr prepare_end_lsn,
429  TimestampTz prepare_time)
430 {
432 
433  OutputPluginPrepareWrite(ctx, true);
434 
435  appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
436  quote_literal_cstr(txn->gid));
437 
438  if (data->include_xids)
439  appendStringInfo(ctx->out, ", txid %u", txn->xid);
440 
441  if (data->include_timestamp)
442  appendStringInfo(ctx->out, " (at %s)",
444 
445  OutputPluginWrite(ctx, true);
446 }
447 
448 /*
449  * Filter out two-phase transactions.
450  *
451  * Each plugin can implement its own filtering logic. Here we demonstrate a
452  * simple logic by checking the GID. If the GID contains the "_nodecode"
453  * substring, then we filter it out.
454  */
455 static bool
457  const char *gid)
458 {
459  if (strstr(gid, "_nodecode") != NULL)
460  return true;
461 
462  return false;
463 }
464 
465 static bool
467  RepOriginId origin_id)
468 {
470 
471  if (data->only_local && origin_id != InvalidRepOriginId)
472  return true;
473  return false;
474 }
475 
476 /*
477  * Print literal `outputstr' already represented as string of type `typid'
478  * into stringbuf `s'.
479  *
480  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
481  * if standard_conforming_strings were enabled.
482  */
483 static void
484 print_literal(StringInfo s, Oid typid, char *outputstr)
485 {
486  const char *valptr;
487 
488  switch (typid)
489  {
490  case INT2OID:
491  case INT4OID:
492  case INT8OID:
493  case OIDOID:
494  case FLOAT4OID:
495  case FLOAT8OID:
496  case NUMERICOID:
497  /* NB: We don't care about Inf, NaN et al. */
498  appendStringInfoString(s, outputstr);
499  break;
500 
501  case BITOID:
502  case VARBITOID:
503  appendStringInfo(s, "B'%s'", outputstr);
504  break;
505 
506  case BOOLOID:
507  if (strcmp(outputstr, "t") == 0)
508  appendStringInfoString(s, "true");
509  else
510  appendStringInfoString(s, "false");
511  break;
512 
513  default:
514  appendStringInfoChar(s, '\'');
515  for (valptr = outputstr; *valptr; valptr++)
516  {
517  char ch = *valptr;
518 
519  if (SQL_STR_DOUBLE(ch, false))
520  appendStringInfoChar(s, ch);
521  appendStringInfoChar(s, ch);
522  }
523  appendStringInfoChar(s, '\'');
524  break;
525  }
526 }
527 
528 /* print the tuple 'tuple' into the StringInfo s */
529 static void
530 tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
531 {
532  int natt;
533 
534  /* print all columns individually */
535  for (natt = 0; natt < tupdesc->natts; natt++)
536  {
537  Form_pg_attribute attr; /* the attribute itself */
538  Oid typid; /* type of current attribute */
539  Oid typoutput; /* output function */
540  bool typisvarlena;
541  Datum origval; /* possibly toasted Datum */
542  bool isnull; /* column is null? */
543 
544  attr = TupleDescAttr(tupdesc, natt);
545 
546  /*
547  * don't print dropped columns, we can't be sure everything is
548  * available for them
549  */
550  if (attr->attisdropped)
551  continue;
552 
553  /*
554  * Don't print system columns, oid will already have been printed if
555  * present.
556  */
557  if (attr->attnum < 0)
558  continue;
559 
560  typid = attr->atttypid;
561 
562  /* get Datum from tuple */
563  origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
564 
565  if (isnull && skip_nulls)
566  continue;
567 
568  /* print attribute name */
569  appendStringInfoChar(s, ' ');
570  appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
571 
572  /* print attribute type */
573  appendStringInfoChar(s, '[');
575  appendStringInfoChar(s, ']');
576 
577  /* query output function */
578  getTypeOutputInfo(typid,
579  &typoutput, &typisvarlena);
580 
581  /* print separator */
582  appendStringInfoChar(s, ':');
583 
584  /* print data */
585  if (isnull)
586  appendStringInfoString(s, "null");
587  else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
588  appendStringInfoString(s, "unchanged-toast-datum");
589  else if (!typisvarlena)
590  print_literal(s, typid,
591  OidOutputFunctionCall(typoutput, origval));
592  else
593  {
594  Datum val; /* definitely detoasted Datum */
595 
597  print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
598  }
599  }
600 }
601 
602 /*
603  * callback for individual changed tuples
604  */
605 static void
607  Relation relation, ReorderBufferChange *change)
608 {
610  TestDecodingTxnData *txndata;
611  Form_pg_class class_form;
612  TupleDesc tupdesc;
613  MemoryContext old;
614 
616  txndata = txn->output_plugin_private;
617 
618  /* output BEGIN if we haven't yet */
619  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
620  {
621  pg_output_begin(ctx, data, txn, false);
622  }
623  txndata->xact_wrote_changes = true;
624 
625  class_form = RelationGetForm(relation);
626  tupdesc = RelationGetDescr(relation);
627 
628  /* Avoid leaking memory by using and resetting our own context */
629  old = MemoryContextSwitchTo(data->context);
630 
631  OutputPluginPrepareWrite(ctx, true);
632 
633  appendStringInfoString(ctx->out, "table ");
636  class_form->relrewrite ?
637  get_rel_name(class_form->relrewrite) :
638  NameStr(class_form->relname)));
639  appendStringInfoChar(ctx->out, ':');
640 
641  switch (change->action)
642  {
644  appendStringInfoString(ctx->out, " INSERT:");
645  if (change->data.tp.newtuple == NULL)
646  appendStringInfoString(ctx->out, " (no-tuple-data)");
647  else
648  tuple_to_stringinfo(ctx->out, tupdesc,
649  &change->data.tp.newtuple->tuple,
650  false);
651  break;
653  appendStringInfoString(ctx->out, " UPDATE:");
654  if (change->data.tp.oldtuple != NULL)
655  {
656  appendStringInfoString(ctx->out, " old-key:");
657  tuple_to_stringinfo(ctx->out, tupdesc,
658  &change->data.tp.oldtuple->tuple,
659  true);
660  appendStringInfoString(ctx->out, " new-tuple:");
661  }
662 
663  if (change->data.tp.newtuple == NULL)
664  appendStringInfoString(ctx->out, " (no-tuple-data)");
665  else
666  tuple_to_stringinfo(ctx->out, tupdesc,
667  &change->data.tp.newtuple->tuple,
668  false);
669  break;
671  appendStringInfoString(ctx->out, " DELETE:");
672 
673  /* if there was no PK, we only know that a delete happened */
674  if (change->data.tp.oldtuple == NULL)
675  appendStringInfoString(ctx->out, " (no-tuple-data)");
676  /* In DELETE, only the replica identity is present; display that */
677  else
678  tuple_to_stringinfo(ctx->out, tupdesc,
679  &change->data.tp.oldtuple->tuple,
680  true);
681  break;
682  default:
683  Assert(false);
684  }
685 
687  MemoryContextReset(data->context);
688 
689  OutputPluginWrite(ctx, true);
690 }
691 
692 static void
694  int nrelations, Relation relations[], ReorderBufferChange *change)
695 {
697  TestDecodingTxnData *txndata;
698  MemoryContext old;
699  int i;
700 
702  txndata = txn->output_plugin_private;
703 
704  /* output BEGIN if we haven't yet */
705  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
706  {
707  pg_output_begin(ctx, data, txn, false);
708  }
709  txndata->xact_wrote_changes = true;
710 
711  /* Avoid leaking memory by using and resetting our own context */
712  old = MemoryContextSwitchTo(data->context);
713 
714  OutputPluginPrepareWrite(ctx, true);
715 
716  appendStringInfoString(ctx->out, "table ");
717 
718  for (i = 0; i < nrelations; i++)
719  {
720  if (i > 0)
721  appendStringInfoString(ctx->out, ", ");
722 
724  quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
725  NameStr(relations[i]->rd_rel->relname)));
726  }
727 
728  appendStringInfoString(ctx->out, ": TRUNCATE:");
729 
730  if (change->data.truncate.restart_seqs
731  || change->data.truncate.cascade)
732  {
733  if (change->data.truncate.restart_seqs)
734  appendStringInfoString(ctx->out, " restart_seqs");
735  if (change->data.truncate.cascade)
736  appendStringInfoString(ctx->out, " cascade");
737  }
738  else
739  appendStringInfoString(ctx->out, " (no-flags)");
740 
742  MemoryContextReset(data->context);
743 
744  OutputPluginWrite(ctx, true);
745 }
746 
747 static void
749  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
750  const char *prefix, Size sz, const char *message)
751 {
752  OutputPluginPrepareWrite(ctx, true);
753  appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
754  transactional, prefix, sz);
755  appendBinaryStringInfo(ctx->out, message, sz);
756  OutputPluginWrite(ctx, true);
757 }
758 
759 static void
761  ReorderBufferTXN *txn)
762 {
765 
766  /*
767  * Allocate the txn plugin data for the first stream in the transaction.
768  */
769  if (txndata == NULL)
770  {
771  txndata =
773  txndata->xact_wrote_changes = false;
774  txn->output_plugin_private = txndata;
775  }
776 
777  txndata->stream_wrote_changes = false;
778  if (data->skip_empty_xacts)
779  return;
780  pg_output_stream_start(ctx, data, txn, true);
781 }
782 
783 static void
785 {
786  OutputPluginPrepareWrite(ctx, last_write);
787  if (data->include_xids)
788  appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
789  else
790  appendStringInfoString(ctx->out, "opening a streamed block for transaction");
791  OutputPluginWrite(ctx, last_write);
792 }
793 
794 static void
796  ReorderBufferTXN *txn)
797 {
800 
801  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
802  return;
803 
804  OutputPluginPrepareWrite(ctx, true);
805  if (data->include_xids)
806  appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
807  else
808  appendStringInfoString(ctx->out, "closing a streamed block for transaction");
809  OutputPluginWrite(ctx, true);
810 }
811 
812 static void
814  ReorderBufferTXN *txn,
815  XLogRecPtr abort_lsn)
816 {
818 
819  /*
820  * stream abort can be sent for an individual subtransaction but we
821  * maintain the output_plugin_private only under the toptxn so if this is
822  * not the toptxn then fetch the toptxn.
823  */
824  ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
825  TestDecodingTxnData *txndata = toptxn->output_plugin_private;
826  bool xact_wrote_changes = txndata->xact_wrote_changes;
827 
828  if (txn->toptxn == NULL)
829  {
830  Assert(txn->output_plugin_private != NULL);
831  pfree(txndata);
832  txn->output_plugin_private = NULL;
833  }
834 
835  if (data->skip_empty_xacts && !xact_wrote_changes)
836  return;
837 
838  OutputPluginPrepareWrite(ctx, true);
839  if (data->include_xids)
840  appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
841  else
842  appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
843  OutputPluginWrite(ctx, true);
844 }
845 
846 static void
848  ReorderBufferTXN *txn,
849  XLogRecPtr prepare_lsn)
850 {
853 
854  if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
855  return;
856 
857  OutputPluginPrepareWrite(ctx, true);
858 
859  if (data->include_xids)
860  appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
861  quote_literal_cstr(txn->gid), txn->xid);
862  else
863  appendStringInfo(ctx->out, "preparing streamed transaction %s",
864  quote_literal_cstr(txn->gid));
865 
866  if (data->include_timestamp)
867  appendStringInfo(ctx->out, " (at %s)",
869 
870  OutputPluginWrite(ctx, true);
871 }
872 
873 static void
875  ReorderBufferTXN *txn,
876  XLogRecPtr commit_lsn)
877 {
880  bool xact_wrote_changes = txndata->xact_wrote_changes;
881 
882  pfree(txndata);
883  txn->output_plugin_private = NULL;
884 
885  if (data->skip_empty_xacts && !xact_wrote_changes)
886  return;
887 
888  OutputPluginPrepareWrite(ctx, true);
889 
890  if (data->include_xids)
891  appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
892  else
893  appendStringInfoString(ctx->out, "committing streamed transaction");
894 
895  if (data->include_timestamp)
896  appendStringInfo(ctx->out, " (at %s)",
898 
899  OutputPluginWrite(ctx, true);
900 }
901 
902 /*
903  * In streaming mode, we don't display the changes as the transaction can abort
904  * at a later point in time. We don't want users to see the changes until the
905  * transaction is committed.
906  */
907 static void
909  ReorderBufferTXN *txn,
910  Relation relation,
911  ReorderBufferChange *change)
912 {
915 
916  /* output stream start if we haven't yet */
917  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
918  {
919  pg_output_stream_start(ctx, data, txn, false);
920  }
921  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
922 
923  OutputPluginPrepareWrite(ctx, true);
924  if (data->include_xids)
925  appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
926  else
927  appendStringInfoString(ctx->out, "streaming change for transaction");
928  OutputPluginWrite(ctx, true);
929 }
930 
931 /*
932  * In streaming mode, we don't display the contents for transactional messages
933  * as the transaction can abort at a later point in time. We don't want users to
934  * see the message contents until the transaction is committed.
935  */
936 static void
938  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
939  const char *prefix, Size sz, const char *message)
940 {
941  OutputPluginPrepareWrite(ctx, true);
942 
943  if (transactional)
944  {
945  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
946  transactional, prefix, sz);
947  }
948  else
949  {
950  appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
951  transactional, prefix, sz);
952  appendBinaryStringInfo(ctx->out, message, sz);
953  }
954 
955  OutputPluginWrite(ctx, true);
956 }
957 
958 /*
959  * In streaming mode, we don't display the detailed information of Truncate.
960  * See pg_decode_stream_change.
961  */
962 static void
964  int nrelations, Relation relations[],
965  ReorderBufferChange *change)
966 {
969 
970  if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
971  {
972  pg_output_stream_start(ctx, data, txn, false);
973  }
974  txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
975 
976  OutputPluginPrepareWrite(ctx, true);
977  if (data->include_xids)
978  appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
979  else
980  appendStringInfoString(ctx->out, "streaming truncate for transaction");
981  OutputPluginWrite(ctx, true);
982 }
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1768
bool parse_bool(const char *value, bool *result)
Definition: bool.c:30
#define NameStr(name)
Definition: c.h:681
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:963
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1161
uint32 TransactionId
Definition: c.h:587
size_t Size
Definition: c.h:540
int64 TimestampTz
Definition: timestamp.h:39
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ERROR
Definition: elog.h:33
#define ereport(elevel,...)
Definition: elog.h:143
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1639
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
char * format_type_be(Oid type_oid)
Definition: format_type.c:343
static Datum heap_getattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:788
long val
Definition: informix.c:664
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:662
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:649
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3326
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2864
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1933
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1909
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
void pfree(void *pointer)
Definition: mcxt.c:1175
void * palloc0(Size size)
Definition: mcxt.c:1099
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
#define IsA(nodeptr, _type_)
Definition: nodes.h:624
#define InvalidRepOriginId
Definition: origin.h:33
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
@ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
Definition: output_plugin.h:20
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
FormData_pg_class * Form_pg_class
Definition: pg_class.h:153
const void * data
#define lfirst(lc)
Definition: pg_list.h:169
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:327
uintptr_t Datum
Definition: postgres.h:411
#define PointerGetDatum(X)
Definition: postgres.h:600
unsigned int Oid
Definition: postgres_ext.h:31
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
#define RelationGetForm(relation)
Definition: rel.h:483
#define RelationGetRelid(relation)
Definition: rel.h:489
#define RelationGetDescr(relation)
Definition: rel.h:515
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:56
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:58
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:57
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:12192
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:12276
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
char * defname
Definition: parsenodes.h:765
Node * arg
Definition: parsenodes.h:766
MemoryContext context
Definition: logical.h:36
StringInfo out
Definition: logical.h:71
void * output_plugin_private
Definition: logical.h:76
List * output_plugin_options
Definition: logical.h:59
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
OutputPluginOutputType output_type
Definition: output_plugin.h:28
ReorderBufferChangeType action
Definition: reorderbuffer.h:85
struct ReorderBufferChange::@105::@107 truncate
struct ReorderBufferChange::@105::@106 tp
union ReorderBufferChange::@105 data
TimestampTz commit_time
struct ReorderBufferTXN * toptxn
void * output_plugin_private
TimestampTz prepare_time
TransactionId xid
union ReorderBufferTXN::@111 xact_time
Definition: value.h:58
MemoryContext context
Definition: test_decoding.c:33
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void _PG_init(void)
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
PG_MODULE_MAGIC
Definition: test_decoding.c:25
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void print_literal(StringInfo s, Oid typid, char *outputstr)
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define strVal(v)
Definition: value.h:72
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21