PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
test_decoding.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * test_decoding.c
4 * example logical decoding output plugin
5 *
6 * Copyright (c) 2012-2024, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/test_decoding/test_decoding.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include "catalog/pg_type.h"
16
17#include "replication/logical.h"
18#include "replication/origin.h"
19
20#include "utils/builtins.h"
21#include "utils/lsyscache.h"
22#include "utils/memutils.h"
23#include "utils/rel.h"
24
26
27typedef struct
28{
35
/*
 * Maintain the per-transaction level variables to track whether the
 * transaction and/or streams have written any changes. In streaming mode the
 * transaction can be decoded in streams, so along with tracking whether the
 * transaction has written any changes, we also need to track whether the
 * current stream has written any changes. This is required so that, if the
 * user has requested to skip empty transactions, we can skip the empty
 * streams even though the transaction has written some changes.
 */
typedef struct
{
	bool		xact_wrote_changes; /* anything emitted for this xact yet? */
	bool		stream_wrote_changes;	/* anything emitted in this stream? */
} TestDecodingTxnData;
50
52 bool is_init);
55 ReorderBufferTXN *txn);
59 bool last_write);
61 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
63 ReorderBufferTXN *txn, Relation relation,
64 ReorderBufferChange *change);
67 int nrelations, Relation relations[],
68 ReorderBufferChange *change);
70 RepOriginId origin_id);
73 bool transactional, const char *prefix,
74 Size sz, const char *message);
76 TransactionId xid,
77 const char *gid);
79 ReorderBufferTXN *txn);
82 XLogRecPtr prepare_lsn);
85 XLogRecPtr commit_lsn);
88 XLogRecPtr prepare_end_lsn,
89 TimestampTz prepare_time);
91 ReorderBufferTXN *txn);
95 bool last_write);
97 ReorderBufferTXN *txn);
100 XLogRecPtr abort_lsn);
102 ReorderBufferTXN *txn,
103 XLogRecPtr prepare_lsn);
105 ReorderBufferTXN *txn,
106 XLogRecPtr commit_lsn);
108 ReorderBufferTXN *txn,
109 Relation relation,
110 ReorderBufferChange *change);
113 bool transactional, const char *prefix,
114 Size sz, const char *message);
116 ReorderBufferTXN *txn,
117 int nrelations, Relation relations[],
118 ReorderBufferChange *change);
119
/*
 * Module load hook.  This plugin needs no load-time setup, so the body is
 * intentionally empty.
 */
void
_PG_init(void)
{
	/* other plugins can perform things here */
}
125
126/* specify output plugin callbacks */
127void
129{
151}
152
153
154/* initialize this plugin */
155static void
157 bool is_init)
158{
161 bool enable_streaming = false;
162
163 data = palloc0(sizeof(TestDecodingData));
164 data->context = AllocSetContextCreate(ctx->context,
165 "text conversion context",
167 data->include_xids = true;
168 data->include_timestamp = false;
169 data->skip_empty_xacts = false;
170 data->only_local = false;
171
173
175 opt->receive_rewrites = false;
176
177 foreach(option, ctx->output_plugin_options)
178 {
179 DefElem *elem = lfirst(option);
180
181 Assert(elem->arg == NULL || IsA(elem->arg, String));
182
183 if (strcmp(elem->defname, "include-xids") == 0)
184 {
185 /* if option does not provide a value, it means its value is true */
186 if (elem->arg == NULL)
187 data->include_xids = true;
188 else if (!parse_bool(strVal(elem->arg), &data->include_xids))
190 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191 errmsg("could not parse value \"%s\" for parameter \"%s\"",
192 strVal(elem->arg), elem->defname)));
193 }
194 else if (strcmp(elem->defname, "include-timestamp") == 0)
195 {
196 if (elem->arg == NULL)
197 data->include_timestamp = true;
198 else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
200 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
201 errmsg("could not parse value \"%s\" for parameter \"%s\"",
202 strVal(elem->arg), elem->defname)));
203 }
204 else if (strcmp(elem->defname, "force-binary") == 0)
205 {
206 bool force_binary;
207
208 if (elem->arg == NULL)
209 continue;
210 else if (!parse_bool(strVal(elem->arg), &force_binary))
212 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
213 errmsg("could not parse value \"%s\" for parameter \"%s\"",
214 strVal(elem->arg), elem->defname)));
215
216 if (force_binary)
218 }
219 else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
220 {
221
222 if (elem->arg == NULL)
223 data->skip_empty_xacts = true;
224 else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
226 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
227 errmsg("could not parse value \"%s\" for parameter \"%s\"",
228 strVal(elem->arg), elem->defname)));
229 }
230 else if (strcmp(elem->defname, "only-local") == 0)
231 {
232
233 if (elem->arg == NULL)
234 data->only_local = true;
235 else if (!parse_bool(strVal(elem->arg), &data->only_local))
237 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
238 errmsg("could not parse value \"%s\" for parameter \"%s\"",
239 strVal(elem->arg), elem->defname)));
240 }
241 else if (strcmp(elem->defname, "include-rewrites") == 0)
242 {
243
244 if (elem->arg == NULL)
245 continue;
246 else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
248 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
249 errmsg("could not parse value \"%s\" for parameter \"%s\"",
250 strVal(elem->arg), elem->defname)));
251 }
252 else if (strcmp(elem->defname, "stream-changes") == 0)
253 {
254 if (elem->arg == NULL)
255 continue;
256 else if (!parse_bool(strVal(elem->arg), &enable_streaming))
258 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
259 errmsg("could not parse value \"%s\" for parameter \"%s\"",
260 strVal(elem->arg), elem->defname)));
261 }
262 else
263 {
265 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
266 errmsg("option \"%s\" = \"%s\" is unknown",
267 elem->defname,
268 elem->arg ? strVal(elem->arg) : "(null)")));
269 }
270 }
271
272 ctx->streaming &= enable_streaming;
273}
274
275/* cleanup this plugin's resources */
276static void
278{
280
281 /* cleanup our own resources via memory context reset */
282 MemoryContextDelete(data->context);
283}
284
285/* BEGIN callback */
286static void
288{
290 TestDecodingTxnData *txndata =
292
293 txndata->xact_wrote_changes = false;
294 txn->output_plugin_private = txndata;
295
296 /*
297 * If asked to skip empty transactions, we'll emit BEGIN at the point
298 * where the first operation is received for this transaction.
299 */
300 if (data->skip_empty_xacts)
301 return;
302
303 pg_output_begin(ctx, data, txn, true);
304}
305
306static void
308{
309 OutputPluginPrepareWrite(ctx, last_write);
310 if (data->include_xids)
311 appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
312 else
313 appendStringInfoString(ctx->out, "BEGIN");
314 OutputPluginWrite(ctx, last_write);
315}
316
317/* COMMIT callback */
318static void
320 XLogRecPtr commit_lsn)
321{
324 bool xact_wrote_changes = txndata->xact_wrote_changes;
325
326 pfree(txndata);
327 txn->output_plugin_private = NULL;
328
329 if (data->skip_empty_xacts && !xact_wrote_changes)
330 return;
331
332 OutputPluginPrepareWrite(ctx, true);
333 if (data->include_xids)
334 appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
335 else
336 appendStringInfoString(ctx->out, "COMMIT");
337
338 if (data->include_timestamp)
339 appendStringInfo(ctx->out, " (at %s)",
341
342 OutputPluginWrite(ctx, true);
343}
344
345/* BEGIN PREPARE callback */
346static void
348{
350 TestDecodingTxnData *txndata =
352
353 txndata->xact_wrote_changes = false;
354 txn->output_plugin_private = txndata;
355
356 /*
357 * If asked to skip empty transactions, we'll emit BEGIN at the point
358 * where the first operation is received for this transaction.
359 */
360 if (data->skip_empty_xacts)
361 return;
362
363 pg_output_begin(ctx, data, txn, true);
364}
365
366/* PREPARE callback */
367static void
369 XLogRecPtr prepare_lsn)
370{
373
374 /*
375 * If asked to skip empty transactions, we'll emit PREPARE at the point
376 * where the first operation is received for this transaction.
377 */
378 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
379 return;
380
381 OutputPluginPrepareWrite(ctx, true);
382
383 appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
384 quote_literal_cstr(txn->gid));
385
386 if (data->include_xids)
387 appendStringInfo(ctx->out, ", txid %u", txn->xid);
388
389 if (data->include_timestamp)
390 appendStringInfo(ctx->out, " (at %s)",
392
393 OutputPluginWrite(ctx, true);
394}
395
396/* COMMIT PREPARED callback */
397static void
399 XLogRecPtr commit_lsn)
400{
402
403 OutputPluginPrepareWrite(ctx, true);
404
405 appendStringInfo(ctx->out, "COMMIT PREPARED %s",
406 quote_literal_cstr(txn->gid));
407
408 if (data->include_xids)
409 appendStringInfo(ctx->out, ", txid %u", txn->xid);
410
411 if (data->include_timestamp)
412 appendStringInfo(ctx->out, " (at %s)",
414
415 OutputPluginWrite(ctx, true);
416}
417
418/* ROLLBACK PREPARED callback */
419static void
421 ReorderBufferTXN *txn,
422 XLogRecPtr prepare_end_lsn,
423 TimestampTz prepare_time)
424{
426
427 OutputPluginPrepareWrite(ctx, true);
428
429 appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
430 quote_literal_cstr(txn->gid));
431
432 if (data->include_xids)
433 appendStringInfo(ctx->out, ", txid %u", txn->xid);
434
435 if (data->include_timestamp)
436 appendStringInfo(ctx->out, " (at %s)",
438
439 OutputPluginWrite(ctx, true);
440}
441
442/*
443 * Filter out two-phase transactions.
444 *
445 * Each plugin can implement its own filtering logic. Here we demonstrate a
446 * simple logic by checking the GID. If the GID contains the "_nodecode"
447 * substring, then we filter it out.
448 */
449static bool
451 const char *gid)
452{
453 if (strstr(gid, "_nodecode") != NULL)
454 return true;
455
456 return false;
457}
458
459static bool
461 RepOriginId origin_id)
462{
464
465 if (data->only_local && origin_id != InvalidRepOriginId)
466 return true;
467 return false;
468}
469
470/*
471 * Print literal `outputstr' already represented as string of type `typid'
472 * into stringbuf `s'.
473 *
474 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
475 * if standard_conforming_strings were enabled.
476 */
477static void
478print_literal(StringInfo s, Oid typid, char *outputstr)
479{
480 const char *valptr;
481
482 switch (typid)
483 {
484 case INT2OID:
485 case INT4OID:
486 case INT8OID:
487 case OIDOID:
488 case FLOAT4OID:
489 case FLOAT8OID:
490 case NUMERICOID:
491 /* NB: We don't care about Inf, NaN et al. */
492 appendStringInfoString(s, outputstr);
493 break;
494
495 case BITOID:
496 case VARBITOID:
497 appendStringInfo(s, "B'%s'", outputstr);
498 break;
499
500 case BOOLOID:
501 if (strcmp(outputstr, "t") == 0)
502 appendStringInfoString(s, "true");
503 else
504 appendStringInfoString(s, "false");
505 break;
506
507 default:
508 appendStringInfoChar(s, '\'');
509 for (valptr = outputstr; *valptr; valptr++)
510 {
511 char ch = *valptr;
512
513 if (SQL_STR_DOUBLE(ch, false))
516 }
517 appendStringInfoChar(s, '\'');
518 break;
519 }
520}
521
522/* print the tuple 'tuple' into the StringInfo s */
523static void
524tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
525{
526 int natt;
527
528 /* print all columns individually */
529 for (natt = 0; natt < tupdesc->natts; natt++)
530 {
531 Form_pg_attribute attr; /* the attribute itself */
532 Oid typid; /* type of current attribute */
533 Oid typoutput; /* output function */
534 bool typisvarlena;
535 Datum origval; /* possibly toasted Datum */
536 bool isnull; /* column is null? */
537
538 attr = TupleDescAttr(tupdesc, natt);
539
540 /*
541 * don't print dropped columns, we can't be sure everything is
542 * available for them
543 */
544 if (attr->attisdropped)
545 continue;
546
547 /*
548 * Don't print system columns, oid will already have been printed if
549 * present.
550 */
551 if (attr->attnum < 0)
552 continue;
553
554 typid = attr->atttypid;
555
556 /* get Datum from tuple */
557 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
558
559 if (isnull && skip_nulls)
560 continue;
561
562 /* print attribute name */
563 appendStringInfoChar(s, ' ');
565
566 /* print attribute type */
567 appendStringInfoChar(s, '[');
569 appendStringInfoChar(s, ']');
570
571 /* query output function */
572 getTypeOutputInfo(typid,
573 &typoutput, &typisvarlena);
574
575 /* print separator */
576 appendStringInfoChar(s, ':');
577
578 /* print data */
579 if (isnull)
580 appendStringInfoString(s, "null");
581 else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
582 appendStringInfoString(s, "unchanged-toast-datum");
583 else if (!typisvarlena)
584 print_literal(s, typid,
585 OidOutputFunctionCall(typoutput, origval));
586 else
587 {
588 Datum val; /* definitely detoasted Datum */
589
591 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
592 }
593 }
594}
595
596/*
597 * callback for individual changed tuples
598 */
599static void
601 Relation relation, ReorderBufferChange *change)
602{
604 TestDecodingTxnData *txndata;
605 Form_pg_class class_form;
606 TupleDesc tupdesc;
607 MemoryContext old;
608
610 txndata = txn->output_plugin_private;
611
612 /* output BEGIN if we haven't yet */
613 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
614 {
615 pg_output_begin(ctx, data, txn, false);
616 }
617 txndata->xact_wrote_changes = true;
618
619 class_form = RelationGetForm(relation);
620 tupdesc = RelationGetDescr(relation);
621
622 /* Avoid leaking memory by using and resetting our own context */
623 old = MemoryContextSwitchTo(data->context);
624
625 OutputPluginPrepareWrite(ctx, true);
626
627 appendStringInfoString(ctx->out, "table ");
630 class_form->relrewrite ?
631 get_rel_name(class_form->relrewrite) :
632 NameStr(class_form->relname)));
633 appendStringInfoChar(ctx->out, ':');
634
635 switch (change->action)
636 {
638 appendStringInfoString(ctx->out, " INSERT:");
639 if (change->data.tp.newtuple == NULL)
640 appendStringInfoString(ctx->out, " (no-tuple-data)");
641 else
642 tuple_to_stringinfo(ctx->out, tupdesc,
643 change->data.tp.newtuple,
644 false);
645 break;
647 appendStringInfoString(ctx->out, " UPDATE:");
648 if (change->data.tp.oldtuple != NULL)
649 {
650 appendStringInfoString(ctx->out, " old-key:");
651 tuple_to_stringinfo(ctx->out, tupdesc,
652 change->data.tp.oldtuple,
653 true);
654 appendStringInfoString(ctx->out, " new-tuple:");
655 }
656
657 if (change->data.tp.newtuple == NULL)
658 appendStringInfoString(ctx->out, " (no-tuple-data)");
659 else
660 tuple_to_stringinfo(ctx->out, tupdesc,
661 change->data.tp.newtuple,
662 false);
663 break;
665 appendStringInfoString(ctx->out, " DELETE:");
666
667 /* if there was no PK, we only know that a delete happened */
668 if (change->data.tp.oldtuple == NULL)
669 appendStringInfoString(ctx->out, " (no-tuple-data)");
670 /* In DELETE, only the replica identity is present; display that */
671 else
672 tuple_to_stringinfo(ctx->out, tupdesc,
673 change->data.tp.oldtuple,
674 true);
675 break;
676 default:
677 Assert(false);
678 }
679
681 MemoryContextReset(data->context);
682
683 OutputPluginWrite(ctx, true);
684}
685
686static void
688 int nrelations, Relation relations[], ReorderBufferChange *change)
689{
691 TestDecodingTxnData *txndata;
692 MemoryContext old;
693 int i;
694
696 txndata = txn->output_plugin_private;
697
698 /* output BEGIN if we haven't yet */
699 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
700 {
701 pg_output_begin(ctx, data, txn, false);
702 }
703 txndata->xact_wrote_changes = true;
704
705 /* Avoid leaking memory by using and resetting our own context */
706 old = MemoryContextSwitchTo(data->context);
707
708 OutputPluginPrepareWrite(ctx, true);
709
710 appendStringInfoString(ctx->out, "table ");
711
712 for (i = 0; i < nrelations; i++)
713 {
714 if (i > 0)
715 appendStringInfoString(ctx->out, ", ");
716
718 quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
719 NameStr(relations[i]->rd_rel->relname)));
720 }
721
722 appendStringInfoString(ctx->out, ": TRUNCATE:");
723
724 if (change->data.truncate.restart_seqs
725 || change->data.truncate.cascade)
726 {
727 if (change->data.truncate.restart_seqs)
728 appendStringInfoString(ctx->out, " restart_seqs");
729 if (change->data.truncate.cascade)
730 appendStringInfoString(ctx->out, " cascade");
731 }
732 else
733 appendStringInfoString(ctx->out, " (no-flags)");
734
736 MemoryContextReset(data->context);
737
738 OutputPluginWrite(ctx, true);
739}
740
741static void
743 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
744 const char *prefix, Size sz, const char *message)
745{
747 TestDecodingTxnData *txndata;
748
749 txndata = transactional ? txn->output_plugin_private : NULL;
750
751 /* output BEGIN if we haven't yet for transactional messages */
752 if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
753 pg_output_begin(ctx, data, txn, false);
754
755 if (transactional)
756 txndata->xact_wrote_changes = true;
757
758 OutputPluginPrepareWrite(ctx, true);
759 appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
760 transactional, prefix, sz);
761 appendBinaryStringInfo(ctx->out, message, sz);
762 OutputPluginWrite(ctx, true);
763}
764
765static void
767 ReorderBufferTXN *txn)
768{
771
772 /*
773 * Allocate the txn plugin data for the first stream in the transaction.
774 */
775 if (txndata == NULL)
776 {
777 txndata =
779 txndata->xact_wrote_changes = false;
780 txn->output_plugin_private = txndata;
781 }
782
783 txndata->stream_wrote_changes = false;
784 if (data->skip_empty_xacts)
785 return;
786 pg_output_stream_start(ctx, data, txn, true);
787}
788
789static void
791{
792 OutputPluginPrepareWrite(ctx, last_write);
793 if (data->include_xids)
794 appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
795 else
796 appendStringInfoString(ctx->out, "opening a streamed block for transaction");
797 OutputPluginWrite(ctx, last_write);
798}
799
800static void
802 ReorderBufferTXN *txn)
803{
806
807 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
808 return;
809
810 OutputPluginPrepareWrite(ctx, true);
811 if (data->include_xids)
812 appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
813 else
814 appendStringInfoString(ctx->out, "closing a streamed block for transaction");
815 OutputPluginWrite(ctx, true);
816}
817
818static void
820 ReorderBufferTXN *txn,
821 XLogRecPtr abort_lsn)
822{
824
825 /*
826 * stream abort can be sent for an individual subtransaction but we
827 * maintain the output_plugin_private only under the toptxn so if this is
828 * not the toptxn then fetch the toptxn.
829 */
830 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
832 bool xact_wrote_changes = txndata->xact_wrote_changes;
833
834 if (rbtxn_is_toptxn(txn))
835 {
836 Assert(txn->output_plugin_private != NULL);
837 pfree(txndata);
838 txn->output_plugin_private = NULL;
839 }
840
841 if (data->skip_empty_xacts && !xact_wrote_changes)
842 return;
843
844 OutputPluginPrepareWrite(ctx, true);
845 if (data->include_xids)
846 appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
847 else
848 appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
849 OutputPluginWrite(ctx, true);
850}
851
852static void
854 ReorderBufferTXN *txn,
855 XLogRecPtr prepare_lsn)
856{
859
860 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
861 return;
862
863 OutputPluginPrepareWrite(ctx, true);
864
865 if (data->include_xids)
866 appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
867 quote_literal_cstr(txn->gid), txn->xid);
868 else
869 appendStringInfo(ctx->out, "preparing streamed transaction %s",
870 quote_literal_cstr(txn->gid));
871
872 if (data->include_timestamp)
873 appendStringInfo(ctx->out, " (at %s)",
875
876 OutputPluginWrite(ctx, true);
877}
878
879static void
881 ReorderBufferTXN *txn,
882 XLogRecPtr commit_lsn)
883{
886 bool xact_wrote_changes = txndata->xact_wrote_changes;
887
888 pfree(txndata);
889 txn->output_plugin_private = NULL;
890
891 if (data->skip_empty_xacts && !xact_wrote_changes)
892 return;
893
894 OutputPluginPrepareWrite(ctx, true);
895
896 if (data->include_xids)
897 appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
898 else
899 appendStringInfoString(ctx->out, "committing streamed transaction");
900
901 if (data->include_timestamp)
902 appendStringInfo(ctx->out, " (at %s)",
904
905 OutputPluginWrite(ctx, true);
906}
907
908/*
909 * In streaming mode, we don't display the changes as the transaction can abort
910 * at a later point in time. We don't want users to see the changes until the
911 * transaction is committed.
912 */
913static void
915 ReorderBufferTXN *txn,
916 Relation relation,
917 ReorderBufferChange *change)
918{
921
922 /* output stream start if we haven't yet */
923 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
924 {
925 pg_output_stream_start(ctx, data, txn, false);
926 }
927 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
928
929 OutputPluginPrepareWrite(ctx, true);
930 if (data->include_xids)
931 appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
932 else
933 appendStringInfoString(ctx->out, "streaming change for transaction");
934 OutputPluginWrite(ctx, true);
935}
936
937/*
938 * In streaming mode, we don't display the contents for transactional messages
939 * as the transaction can abort at a later point in time. We don't want users to
940 * see the message contents until the transaction is committed.
941 */
942static void
944 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
945 const char *prefix, Size sz, const char *message)
946{
947 /* Output stream start if we haven't yet for transactional messages. */
948 if (transactional)
949 {
952
953 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
954 {
955 pg_output_stream_start(ctx, data, txn, false);
956 }
957 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
958 }
959
960 OutputPluginPrepareWrite(ctx, true);
961
962 if (transactional)
963 {
964 appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
965 transactional, prefix, sz);
966 }
967 else
968 {
969 appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
970 transactional, prefix, sz);
971 appendBinaryStringInfo(ctx->out, message, sz);
972 }
973
974 OutputPluginWrite(ctx, true);
975}
976
977/*
978 * In streaming mode, we don't display the detailed information of Truncate.
979 * See pg_decode_stream_change.
980 */
981static void
983 int nrelations, Relation relations[],
984 ReorderBufferChange *change)
985{
988
989 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
990 {
991 pg_output_stream_start(ctx, data, txn, false);
992 }
993 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
994
995 OutputPluginPrepareWrite(ctx, true);
996 if (data->include_xids)
997 appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
998 else
999 appendStringInfoString(ctx->out, "streaming truncate for transaction");
1000 OutputPluginWrite(ctx, true);
1001}
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1843
bool parse_bool(const char *value, bool *result)
Definition: bool.c:31
#define NameStr(name)
Definition: c.h:700
#define Assert(condition)
Definition: c.h:812
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1117
uint32 TransactionId
Definition: c.h:606
size_t Size
Definition: c.h:559
int64 TimestampTz
Definition: timestamp.h:39
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1763
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
char * format_type_be(Oid type_oid)
Definition: format_type.c:343
static Datum heap_getattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:792
long val
Definition: informix.c:689
int i
Definition: isn.c:72
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:722
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:709
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1928
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2907
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1952
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define IsA(nodeptr, _type_)
Definition: nodes.h:158
#define InvalidRepOriginId
Definition: origin.h:33
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
@ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
Definition: output_plugin.h:20
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
FormData_pg_class * Form_pg_class
Definition: pg_class.h:153
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
unsigned int Oid
Definition: postgres_ext.h:31
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
MemoryContextSwitchTo(old_ctx)
#define RelationGetForm(relation)
Definition: rel.h:499
#define RelationGetRelid(relation)
Definition: rel.h:505
#define RelationGetDescr(relation)
Definition: rel.h:531
#define rbtxn_is_toptxn(txn)
#define rbtxn_get_toptxn(txn)
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:53
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:12954
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:12870
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:94
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:230
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:179
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:191
char * defname
Definition: parsenodes.h:817
Node * arg
Definition: parsenodes.h:818
MemoryContext context
Definition: logical.h:36
StringInfo out
Definition: logical.h:71
void * output_plugin_private
Definition: logical.h:76
List * output_plugin_options
Definition: logical.h:59
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
OutputPluginOutputType output_type
Definition: output_plugin.h:28
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
struct ReorderBufferChange::@109::@111 truncate
struct ReorderBufferChange::@109::@110 tp
union ReorderBufferChange::@109 data
TimestampTz commit_time
union ReorderBufferTXN::@115 xact_time
void * output_plugin_private
TimestampTz prepare_time
TransactionId xid
Definition: value.h:64
MemoryContext context
Definition: test_decoding.c:29
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void _PG_init(void)
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
PG_MODULE_MAGIC
Definition: test_decoding.c:25
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void print_literal(StringInfo s, Oid typid, char *outputstr)
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:152
#define strVal(v)
Definition: value.h:82
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21