PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
test_decoding.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * test_decoding.c
4 * example logical decoding output plugin
5 *
6 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/test_decoding/test_decoding.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include "catalog/pg_type.h"
16
17#include "replication/logical.h"
18#include "replication/origin.h"
19
20#include "utils/builtins.h"
21#include "utils/lsyscache.h"
22#include "utils/memutils.h"
23#include "utils/rel.h"
24
26 .name = "test_decoding",
27 .version = PG_VERSION
28);
29
30typedef struct
31{
38
39/*
40 * Maintain the per-transaction level variables to track whether the
41 * transaction and or streams have written any changes. In streaming mode the
42 * transaction can be decoded in streams so along with maintaining whether the
43 * transaction has written any changes, we also need to track whether the
44 * current stream has written any changes. This is required so that if user
45 * has requested to skip the empty transactions we can skip the empty streams
46 * even though the transaction has written some changes.
47 */
48typedef struct
49{
53
55 bool is_init);
58 ReorderBufferTXN *txn);
62 bool last_write);
64 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
66 ReorderBufferTXN *txn, Relation relation,
67 ReorderBufferChange *change);
70 int nrelations, Relation relations[],
71 ReorderBufferChange *change);
73 RepOriginId origin_id);
76 bool transactional, const char *prefix,
77 Size sz, const char *message);
79 TransactionId xid,
80 const char *gid);
82 ReorderBufferTXN *txn);
85 XLogRecPtr prepare_lsn);
88 XLogRecPtr commit_lsn);
91 XLogRecPtr prepare_end_lsn,
92 TimestampTz prepare_time);
94 ReorderBufferTXN *txn);
98 bool last_write);
100 ReorderBufferTXN *txn);
102 ReorderBufferTXN *txn,
103 XLogRecPtr abort_lsn);
105 ReorderBufferTXN *txn,
106 XLogRecPtr prepare_lsn);
108 ReorderBufferTXN *txn,
109 XLogRecPtr commit_lsn);
111 ReorderBufferTXN *txn,
112 Relation relation,
113 ReorderBufferChange *change);
116 bool transactional, const char *prefix,
117 Size sz, const char *message);
119 ReorderBufferTXN *txn,
120 int nrelations, Relation relations[],
121 ReorderBufferChange *change);
122
123void
125{
126 /* other plugins can perform things here */
127}
128
129/* specify output plugin callbacks */
130void
132{
154}
155
156
157/* initialize this plugin */
158static void
160 bool is_init)
161{
164 bool enable_streaming = false;
165
166 data = palloc0(sizeof(TestDecodingData));
167 data->context = AllocSetContextCreate(ctx->context,
168 "text conversion context",
170 data->include_xids = true;
171 data->include_timestamp = false;
172 data->skip_empty_xacts = false;
173 data->only_local = false;
174
176
178 opt->receive_rewrites = false;
179
180 foreach(option, ctx->output_plugin_options)
181 {
182 DefElem *elem = lfirst(option);
183
184 Assert(elem->arg == NULL || IsA(elem->arg, String));
185
186 if (strcmp(elem->defname, "include-xids") == 0)
187 {
188 /* if option does not provide a value, it means its value is true */
189 if (elem->arg == NULL)
190 data->include_xids = true;
191 else if (!parse_bool(strVal(elem->arg), &data->include_xids))
193 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
194 errmsg("could not parse value \"%s\" for parameter \"%s\"",
195 strVal(elem->arg), elem->defname)));
196 }
197 else if (strcmp(elem->defname, "include-timestamp") == 0)
198 {
199 if (elem->arg == NULL)
200 data->include_timestamp = true;
201 else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
203 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
204 errmsg("could not parse value \"%s\" for parameter \"%s\"",
205 strVal(elem->arg), elem->defname)));
206 }
207 else if (strcmp(elem->defname, "force-binary") == 0)
208 {
209 bool force_binary;
210
211 if (elem->arg == NULL)
212 continue;
213 else if (!parse_bool(strVal(elem->arg), &force_binary))
215 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
216 errmsg("could not parse value \"%s\" for parameter \"%s\"",
217 strVal(elem->arg), elem->defname)));
218
219 if (force_binary)
221 }
222 else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
223 {
224
225 if (elem->arg == NULL)
226 data->skip_empty_xacts = true;
227 else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
229 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
230 errmsg("could not parse value \"%s\" for parameter \"%s\"",
231 strVal(elem->arg), elem->defname)));
232 }
233 else if (strcmp(elem->defname, "only-local") == 0)
234 {
235
236 if (elem->arg == NULL)
237 data->only_local = true;
238 else if (!parse_bool(strVal(elem->arg), &data->only_local))
240 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
241 errmsg("could not parse value \"%s\" for parameter \"%s\"",
242 strVal(elem->arg), elem->defname)));
243 }
244 else if (strcmp(elem->defname, "include-rewrites") == 0)
245 {
246
247 if (elem->arg == NULL)
248 continue;
249 else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
251 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
252 errmsg("could not parse value \"%s\" for parameter \"%s\"",
253 strVal(elem->arg), elem->defname)));
254 }
255 else if (strcmp(elem->defname, "stream-changes") == 0)
256 {
257 if (elem->arg == NULL)
258 continue;
259 else if (!parse_bool(strVal(elem->arg), &enable_streaming))
261 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
262 errmsg("could not parse value \"%s\" for parameter \"%s\"",
263 strVal(elem->arg), elem->defname)));
264 }
265 else
266 {
268 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
269 errmsg("option \"%s\" = \"%s\" is unknown",
270 elem->defname,
271 elem->arg ? strVal(elem->arg) : "(null)")));
272 }
273 }
274
275 ctx->streaming &= enable_streaming;
276}
277
278/* cleanup this plugin's resources */
279static void
281{
283
284 /* cleanup our own resources via memory context reset */
285 MemoryContextDelete(data->context);
286}
287
288/* BEGIN callback */
289static void
291{
293 TestDecodingTxnData *txndata =
295
296 txndata->xact_wrote_changes = false;
297 txn->output_plugin_private = txndata;
298
299 /*
300 * If asked to skip empty transactions, we'll emit BEGIN at the point
301 * where the first operation is received for this transaction.
302 */
303 if (data->skip_empty_xacts)
304 return;
305
306 pg_output_begin(ctx, data, txn, true);
307}
308
309static void
311{
312 OutputPluginPrepareWrite(ctx, last_write);
313 if (data->include_xids)
314 appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
315 else
316 appendStringInfoString(ctx->out, "BEGIN");
317 OutputPluginWrite(ctx, last_write);
318}
319
320/* COMMIT callback */
321static void
323 XLogRecPtr commit_lsn)
324{
327 bool xact_wrote_changes = txndata->xact_wrote_changes;
328
329 pfree(txndata);
330 txn->output_plugin_private = NULL;
331
332 if (data->skip_empty_xacts && !xact_wrote_changes)
333 return;
334
335 OutputPluginPrepareWrite(ctx, true);
336 if (data->include_xids)
337 appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
338 else
339 appendStringInfoString(ctx->out, "COMMIT");
340
341 if (data->include_timestamp)
342 appendStringInfo(ctx->out, " (at %s)",
344
345 OutputPluginWrite(ctx, true);
346}
347
348/* BEGIN PREPARE callback */
349static void
351{
353 TestDecodingTxnData *txndata =
355
356 txndata->xact_wrote_changes = false;
357 txn->output_plugin_private = txndata;
358
359 /*
360 * If asked to skip empty transactions, we'll emit BEGIN at the point
361 * where the first operation is received for this transaction.
362 */
363 if (data->skip_empty_xacts)
364 return;
365
366 pg_output_begin(ctx, data, txn, true);
367}
368
369/* PREPARE callback */
370static void
372 XLogRecPtr prepare_lsn)
373{
376
377 /*
378 * If asked to skip empty transactions, we'll emit PREPARE at the point
379 * where the first operation is received for this transaction.
380 */
381 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
382 return;
383
384 OutputPluginPrepareWrite(ctx, true);
385
386 appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
387 quote_literal_cstr(txn->gid));
388
389 if (data->include_xids)
390 appendStringInfo(ctx->out, ", txid %u", txn->xid);
391
392 if (data->include_timestamp)
393 appendStringInfo(ctx->out, " (at %s)",
395
396 OutputPluginWrite(ctx, true);
397}
398
399/* COMMIT PREPARED callback */
400static void
402 XLogRecPtr commit_lsn)
403{
405
406 OutputPluginPrepareWrite(ctx, true);
407
408 appendStringInfo(ctx->out, "COMMIT PREPARED %s",
409 quote_literal_cstr(txn->gid));
410
411 if (data->include_xids)
412 appendStringInfo(ctx->out, ", txid %u", txn->xid);
413
414 if (data->include_timestamp)
415 appendStringInfo(ctx->out, " (at %s)",
417
418 OutputPluginWrite(ctx, true);
419}
420
421/* ROLLBACK PREPARED callback */
422static void
424 ReorderBufferTXN *txn,
425 XLogRecPtr prepare_end_lsn,
426 TimestampTz prepare_time)
427{
429
430 OutputPluginPrepareWrite(ctx, true);
431
432 appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
433 quote_literal_cstr(txn->gid));
434
435 if (data->include_xids)
436 appendStringInfo(ctx->out, ", txid %u", txn->xid);
437
438 if (data->include_timestamp)
439 appendStringInfo(ctx->out, " (at %s)",
441
442 OutputPluginWrite(ctx, true);
443}
444
445/*
446 * Filter out two-phase transactions.
447 *
448 * Each plugin can implement its own filtering logic. Here we demonstrate a
449 * simple logic by checking the GID. If the GID contains the "_nodecode"
450 * substring, then we filter it out.
451 */
452static bool
454 const char *gid)
455{
456 if (strstr(gid, "_nodecode") != NULL)
457 return true;
458
459 return false;
460}
461
462static bool
464 RepOriginId origin_id)
465{
467
468 if (data->only_local && origin_id != InvalidRepOriginId)
469 return true;
470 return false;
471}
472
473/*
474 * Print literal `outputstr' already represented as string of type `typid'
475 * into stringbuf `s'.
476 *
477 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
478 * if standard_conforming_strings were enabled.
479 */
480static void
481print_literal(StringInfo s, Oid typid, char *outputstr)
482{
483 const char *valptr;
484
485 switch (typid)
486 {
487 case INT2OID:
488 case INT4OID:
489 case INT8OID:
490 case OIDOID:
491 case FLOAT4OID:
492 case FLOAT8OID:
493 case NUMERICOID:
494 /* NB: We don't care about Inf, NaN et al. */
495 appendStringInfoString(s, outputstr);
496 break;
497
498 case BITOID:
499 case VARBITOID:
500 appendStringInfo(s, "B'%s'", outputstr);
501 break;
502
503 case BOOLOID:
504 if (strcmp(outputstr, "t") == 0)
505 appendStringInfoString(s, "true");
506 else
507 appendStringInfoString(s, "false");
508 break;
509
510 default:
511 appendStringInfoChar(s, '\'');
512 for (valptr = outputstr; *valptr; valptr++)
513 {
514 char ch = *valptr;
515
516 if (SQL_STR_DOUBLE(ch, false))
519 }
520 appendStringInfoChar(s, '\'');
521 break;
522 }
523}
524
525/* print the tuple 'tuple' into the StringInfo s */
526static void
527tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
528{
529 int natt;
530
531 /* print all columns individually */
532 for (natt = 0; natt < tupdesc->natts; natt++)
533 {
534 Form_pg_attribute attr; /* the attribute itself */
535 Oid typid; /* type of current attribute */
536 Oid typoutput; /* output function */
537 bool typisvarlena;
538 Datum origval; /* possibly toasted Datum */
539 bool isnull; /* column is null? */
540
541 attr = TupleDescAttr(tupdesc, natt);
542
543 /*
544 * don't print dropped columns, we can't be sure everything is
545 * available for them
546 */
547 if (attr->attisdropped)
548 continue;
549
550 /*
551 * Don't print system columns, oid will already have been printed if
552 * present.
553 */
554 if (attr->attnum < 0)
555 continue;
556
557 typid = attr->atttypid;
558
559 /* get Datum from tuple */
560 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
561
562 if (isnull && skip_nulls)
563 continue;
564
565 /* print attribute name */
566 appendStringInfoChar(s, ' ');
568
569 /* print attribute type */
570 appendStringInfoChar(s, '[');
572 appendStringInfoChar(s, ']');
573
574 /* query output function */
575 getTypeOutputInfo(typid,
576 &typoutput, &typisvarlena);
577
578 /* print separator */
579 appendStringInfoChar(s, ':');
580
581 /* print data */
582 if (isnull)
583 appendStringInfoString(s, "null");
584 else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
585 appendStringInfoString(s, "unchanged-toast-datum");
586 else if (!typisvarlena)
587 print_literal(s, typid,
588 OidOutputFunctionCall(typoutput, origval));
589 else
590 {
591 Datum val; /* definitely detoasted Datum */
592
594 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
595 }
596 }
597}
598
599/*
600 * callback for individual changed tuples
601 */
602static void
604 Relation relation, ReorderBufferChange *change)
605{
607 TestDecodingTxnData *txndata;
608 Form_pg_class class_form;
609 TupleDesc tupdesc;
610 MemoryContext old;
611
613 txndata = txn->output_plugin_private;
614
615 /* output BEGIN if we haven't yet */
616 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
617 {
618 pg_output_begin(ctx, data, txn, false);
619 }
620 txndata->xact_wrote_changes = true;
621
622 class_form = RelationGetForm(relation);
623 tupdesc = RelationGetDescr(relation);
624
625 /* Avoid leaking memory by using and resetting our own context */
626 old = MemoryContextSwitchTo(data->context);
627
628 OutputPluginPrepareWrite(ctx, true);
629
630 appendStringInfoString(ctx->out, "table ");
633 class_form->relrewrite ?
634 get_rel_name(class_form->relrewrite) :
635 NameStr(class_form->relname)));
636 appendStringInfoChar(ctx->out, ':');
637
638 switch (change->action)
639 {
641 appendStringInfoString(ctx->out, " INSERT:");
642 if (change->data.tp.newtuple == NULL)
643 appendStringInfoString(ctx->out, " (no-tuple-data)");
644 else
645 tuple_to_stringinfo(ctx->out, tupdesc,
646 change->data.tp.newtuple,
647 false);
648 break;
650 appendStringInfoString(ctx->out, " UPDATE:");
651 if (change->data.tp.oldtuple != NULL)
652 {
653 appendStringInfoString(ctx->out, " old-key:");
654 tuple_to_stringinfo(ctx->out, tupdesc,
655 change->data.tp.oldtuple,
656 true);
657 appendStringInfoString(ctx->out, " new-tuple:");
658 }
659
660 if (change->data.tp.newtuple == NULL)
661 appendStringInfoString(ctx->out, " (no-tuple-data)");
662 else
663 tuple_to_stringinfo(ctx->out, tupdesc,
664 change->data.tp.newtuple,
665 false);
666 break;
668 appendStringInfoString(ctx->out, " DELETE:");
669
670 /* if there was no PK, we only know that a delete happened */
671 if (change->data.tp.oldtuple == NULL)
672 appendStringInfoString(ctx->out, " (no-tuple-data)");
673 /* In DELETE, only the replica identity is present; display that */
674 else
675 tuple_to_stringinfo(ctx->out, tupdesc,
676 change->data.tp.oldtuple,
677 true);
678 break;
679 default:
680 Assert(false);
681 }
682
684 MemoryContextReset(data->context);
685
686 OutputPluginWrite(ctx, true);
687}
688
689static void
691 int nrelations, Relation relations[], ReorderBufferChange *change)
692{
694 TestDecodingTxnData *txndata;
695 MemoryContext old;
696 int i;
697
699 txndata = txn->output_plugin_private;
700
701 /* output BEGIN if we haven't yet */
702 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
703 {
704 pg_output_begin(ctx, data, txn, false);
705 }
706 txndata->xact_wrote_changes = true;
707
708 /* Avoid leaking memory by using and resetting our own context */
709 old = MemoryContextSwitchTo(data->context);
710
711 OutputPluginPrepareWrite(ctx, true);
712
713 appendStringInfoString(ctx->out, "table ");
714
715 for (i = 0; i < nrelations; i++)
716 {
717 if (i > 0)
718 appendStringInfoString(ctx->out, ", ");
719
721 quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
722 NameStr(relations[i]->rd_rel->relname)));
723 }
724
725 appendStringInfoString(ctx->out, ": TRUNCATE:");
726
727 if (change->data.truncate.restart_seqs
728 || change->data.truncate.cascade)
729 {
730 if (change->data.truncate.restart_seqs)
731 appendStringInfoString(ctx->out, " restart_seqs");
732 if (change->data.truncate.cascade)
733 appendStringInfoString(ctx->out, " cascade");
734 }
735 else
736 appendStringInfoString(ctx->out, " (no-flags)");
737
739 MemoryContextReset(data->context);
740
741 OutputPluginWrite(ctx, true);
742}
743
744static void
746 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
747 const char *prefix, Size sz, const char *message)
748{
750 TestDecodingTxnData *txndata;
751
752 txndata = transactional ? txn->output_plugin_private : NULL;
753
754 /* output BEGIN if we haven't yet for transactional messages */
755 if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
756 pg_output_begin(ctx, data, txn, false);
757
758 if (transactional)
759 txndata->xact_wrote_changes = true;
760
761 OutputPluginPrepareWrite(ctx, true);
762 appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
763 transactional, prefix, sz);
764 appendBinaryStringInfo(ctx->out, message, sz);
765 OutputPluginWrite(ctx, true);
766}
767
768static void
770 ReorderBufferTXN *txn)
771{
774
775 /*
776 * Allocate the txn plugin data for the first stream in the transaction.
777 */
778 if (txndata == NULL)
779 {
780 txndata =
782 txndata->xact_wrote_changes = false;
783 txn->output_plugin_private = txndata;
784 }
785
786 txndata->stream_wrote_changes = false;
787 if (data->skip_empty_xacts)
788 return;
789 pg_output_stream_start(ctx, data, txn, true);
790}
791
792static void
794{
795 OutputPluginPrepareWrite(ctx, last_write);
796 if (data->include_xids)
797 appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
798 else
799 appendStringInfoString(ctx->out, "opening a streamed block for transaction");
800 OutputPluginWrite(ctx, last_write);
801}
802
803static void
805 ReorderBufferTXN *txn)
806{
809
810 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
811 return;
812
813 OutputPluginPrepareWrite(ctx, true);
814 if (data->include_xids)
815 appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
816 else
817 appendStringInfoString(ctx->out, "closing a streamed block for transaction");
818 OutputPluginWrite(ctx, true);
819}
820
821static void
823 ReorderBufferTXN *txn,
824 XLogRecPtr abort_lsn)
825{
827
828 /*
829 * stream abort can be sent for an individual subtransaction but we
830 * maintain the output_plugin_private only under the toptxn so if this is
831 * not the toptxn then fetch the toptxn.
832 */
833 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
835 bool xact_wrote_changes = txndata->xact_wrote_changes;
836
837 if (rbtxn_is_toptxn(txn))
838 {
839 Assert(txn->output_plugin_private != NULL);
840 pfree(txndata);
841 txn->output_plugin_private = NULL;
842 }
843
844 if (data->skip_empty_xacts && !xact_wrote_changes)
845 return;
846
847 OutputPluginPrepareWrite(ctx, true);
848 if (data->include_xids)
849 appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
850 else
851 appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
852 OutputPluginWrite(ctx, true);
853}
854
855static void
857 ReorderBufferTXN *txn,
858 XLogRecPtr prepare_lsn)
859{
862
863 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
864 return;
865
866 OutputPluginPrepareWrite(ctx, true);
867
868 if (data->include_xids)
869 appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
870 quote_literal_cstr(txn->gid), txn->xid);
871 else
872 appendStringInfo(ctx->out, "preparing streamed transaction %s",
873 quote_literal_cstr(txn->gid));
874
875 if (data->include_timestamp)
876 appendStringInfo(ctx->out, " (at %s)",
878
879 OutputPluginWrite(ctx, true);
880}
881
882static void
884 ReorderBufferTXN *txn,
885 XLogRecPtr commit_lsn)
886{
889 bool xact_wrote_changes = txndata->xact_wrote_changes;
890
891 pfree(txndata);
892 txn->output_plugin_private = NULL;
893
894 if (data->skip_empty_xacts && !xact_wrote_changes)
895 return;
896
897 OutputPluginPrepareWrite(ctx, true);
898
899 if (data->include_xids)
900 appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
901 else
902 appendStringInfoString(ctx->out, "committing streamed transaction");
903
904 if (data->include_timestamp)
905 appendStringInfo(ctx->out, " (at %s)",
907
908 OutputPluginWrite(ctx, true);
909}
910
911/*
912 * In streaming mode, we don't display the changes as the transaction can abort
913 * at a later point in time. We don't want users to see the changes until the
914 * transaction is committed.
915 */
916static void
918 ReorderBufferTXN *txn,
919 Relation relation,
920 ReorderBufferChange *change)
921{
924
925 /* output stream start if we haven't yet */
926 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
927 {
928 pg_output_stream_start(ctx, data, txn, false);
929 }
930 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
931
932 OutputPluginPrepareWrite(ctx, true);
933 if (data->include_xids)
934 appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
935 else
936 appendStringInfoString(ctx->out, "streaming change for transaction");
937 OutputPluginWrite(ctx, true);
938}
939
940/*
941 * In streaming mode, we don't display the contents for transactional messages
942 * as the transaction can abort at a later point in time. We don't want users to
943 * see the message contents until the transaction is committed.
944 */
945static void
947 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
948 const char *prefix, Size sz, const char *message)
949{
950 /* Output stream start if we haven't yet for transactional messages. */
951 if (transactional)
952 {
955
956 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
957 {
958 pg_output_stream_start(ctx, data, txn, false);
959 }
960 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
961 }
962
963 OutputPluginPrepareWrite(ctx, true);
964
965 if (transactional)
966 {
967 appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
968 transactional, prefix, sz);
969 }
970 else
971 {
972 appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
973 transactional, prefix, sz);
974 appendBinaryStringInfo(ctx->out, message, sz);
975 }
976
977 OutputPluginWrite(ctx, true);
978}
979
980/*
981 * In streaming mode, we don't display the detailed information of Truncate.
982 * See pg_decode_stream_change.
983 */
984static void
986 int nrelations, Relation relations[],
987 ReorderBufferChange *change)
988{
991
992 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
993 {
994 pg_output_stream_start(ctx, data, txn, false);
995 }
996 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
997
998 OutputPluginPrepareWrite(ctx, true);
999 if (data->include_xids)
1000 appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
1001 else
1002 appendStringInfoString(ctx->out, "streaming truncate for transaction");
1003 OutputPluginWrite(ctx, true);
1004}
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1862
bool parse_bool(const char *value, bool *result)
Definition: bool.c:31
#define NameStr(name)
Definition: c.h:717
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1134
uint32 TransactionId
Definition: c.h:623
size_t Size
Definition: c.h:576
int64 TimestampTz
Definition: timestamp.h:39
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1763
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
char * format_type_be(Oid type_oid)
Definition: format_type.c:343
Assert(PointerIsAligned(start, uint64))
static Datum heap_getattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:904
long val
Definition: informix.c:689
int i
Definition: isn.c:77
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:703
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:690
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2068
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:3047
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:2092
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3506
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:414
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1294
void pfree(void *pointer)
Definition: mcxt.c:2150
void * palloc0(Size size)
Definition: mcxt.c:1973
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:485
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
#define IsA(nodeptr, _type_)
Definition: nodes.h:164
#define InvalidRepOriginId
Definition: origin.h:33
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
@ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
Definition: output_plugin.h:20
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
FormData_pg_class * Form_pg_class
Definition: pg_class.h:156
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:327
uintptr_t Datum
Definition: postgres.h:69
unsigned int Oid
Definition: postgres_ext.h:30
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
#define RelationGetForm(relation)
Definition: rel.h:510
#define RelationGetRelid(relation)
Definition: rel.h:516
#define RelationGetDescr(relation)
Definition: rel.h:542
#define rbtxn_is_toptxn(txn)
#define rbtxn_get_toptxn(txn)
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:53
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:13103
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:13019
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:281
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
char * defname
Definition: parsenodes.h:826
Node * arg
Definition: parsenodes.h:827
MemoryContext context
Definition: logical.h:36
StringInfo out
Definition: logical.h:71
void * output_plugin_private
Definition: logical.h:76
List * output_plugin_options
Definition: logical.h:59
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
OutputPluginOutputType output_type
Definition: output_plugin.h:28
struct ReorderBufferChange::@110::@112 truncate
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
union ReorderBufferChange::@110 data
struct ReorderBufferChange::@110::@111 tp
TimestampTz commit_time
void * output_plugin_private
TimestampTz prepare_time
TransactionId xid
union ReorderBufferTXN::@116 xact_time
Definition: value.h:64
MemoryContext context
Definition: test_decoding.c:32
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void _PG_init(void)
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void print_literal(StringInfo s, Oid typid, char *outputstr)
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
PG_MODULE_MAGIC_EXT(.name="test_decoding",.version=PG_VERSION)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
#define strVal(v)
Definition: value.h:82
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290
const char * name
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21