PostgreSQL Source Code git master
proto.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * proto.c
4 * logical replication protocol functions
5 *
6 * Copyright (c) 2015-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/proto.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include "access/sysattr.h"
17#include "catalog/pg_type.h"
18#include "libpq/pqformat.h"
20#include "utils/lsyscache.h"
21#include "utils/syscache.h"
22
23/*
24 * Protocol message flags.
25 */
26#define LOGICALREP_IS_REPLICA_IDENTITY 1
27
28#define MESSAGE_TRANSACTIONAL (1<<0)
29#define TRUNCATE_CASCADE (1<<0)
30#define TRUNCATE_RESTART_SEQS (1<<1)
31
32static void logicalrep_write_attrs(StringInfo out, Relation rel,
33 Bitmapset *columns,
34 PublishGencolsType include_gencols_type);
35static void logicalrep_write_tuple(StringInfo out, Relation rel,
36 TupleTableSlot *slot,
37 bool binary, Bitmapset *columns,
38 PublishGencolsType include_gencols_type);
41
43static const char *logicalrep_read_namespace(StringInfo in);
44
45/*
46 * Write BEGIN to the output stream.
47 */
48void
50{
52
53 /* fixed fields */
54 pq_sendint64(out, txn->final_lsn);
56 pq_sendint32(out, txn->xid);
57}
58
59/*
60 * Read transaction BEGIN from the stream.
61 */
62void
64{
65 /* read fields */
66 begin_data->final_lsn = pq_getmsgint64(in);
67 if (begin_data->final_lsn == InvalidXLogRecPtr)
68 elog(ERROR, "final_lsn not set in begin message");
69 begin_data->committime = pq_getmsgint64(in);
70 begin_data->xid = pq_getmsgint(in, 4);
71}
72
73
74/*
75 * Write COMMIT to the output stream.
76 */
77void
79 XLogRecPtr commit_lsn)
80{
81 uint8 flags = 0;
82
84
85 /* send the flags field (unused for now) */
86 pq_sendbyte(out, flags);
87
88 /* send fields */
89 pq_sendint64(out, commit_lsn);
90 pq_sendint64(out, txn->end_lsn);
92}
93
94/*
95 * Read transaction COMMIT from the stream.
96 */
97void
99{
100 /* read flags (unused for now) */
101 uint8 flags = pq_getmsgbyte(in);
102
103 if (flags != 0)
104 elog(ERROR, "unrecognized flags %u in commit message", flags);
105
106 /* read fields */
107 commit_data->commit_lsn = pq_getmsgint64(in);
108 commit_data->end_lsn = pq_getmsgint64(in);
109 commit_data->committime = pq_getmsgint64(in);
110}
111
112/*
113 * Write BEGIN PREPARE to the output stream.
114 */
115void
117{
119
120 /* fixed fields */
121 pq_sendint64(out, txn->final_lsn);
122 pq_sendint64(out, txn->end_lsn);
124 pq_sendint32(out, txn->xid);
125
126 /* send gid */
127 pq_sendstring(out, txn->gid);
128}
129
130/*
131 * Read transaction BEGIN PREPARE from the stream.
132 */
133void
135{
136 /* read fields */
137 begin_data->prepare_lsn = pq_getmsgint64(in);
138 if (begin_data->prepare_lsn == InvalidXLogRecPtr)
139 elog(ERROR, "prepare_lsn not set in begin prepare message");
140 begin_data->end_lsn = pq_getmsgint64(in);
141 if (begin_data->end_lsn == InvalidXLogRecPtr)
142 elog(ERROR, "end_lsn not set in begin prepare message");
143 begin_data->prepare_time = pq_getmsgint64(in);
144 begin_data->xid = pq_getmsgint(in, 4);
145
146 /* read gid (copy it into a pre-allocated buffer) */
147 strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
148}
149
150/*
151 * The core functionality for logicalrep_write_prepare and
152 * logicalrep_write_stream_prepare.
153 */
154static void
156 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
157{
158 uint8 flags = 0;
159
160 pq_sendbyte(out, type);
161
162 /*
163 * This should only ever happen for two-phase commit transactions, in
164 * which case we expect to have a valid GID.
165 */
166 Assert(txn->gid != NULL);
169
170 /* send the flags field */
171 pq_sendbyte(out, flags);
172
173 /* send fields */
174 pq_sendint64(out, prepare_lsn);
175 pq_sendint64(out, txn->end_lsn);
177 pq_sendint32(out, txn->xid);
178
179 /* send gid */
180 pq_sendstring(out, txn->gid);
181}
182
183/*
184 * Write PREPARE to the output stream.
185 */
186void
188 XLogRecPtr prepare_lsn)
189{
191 txn, prepare_lsn);
192}
193
194/*
195 * The core functionality for logicalrep_read_prepare and
196 * logicalrep_read_stream_prepare.
197 */
198static void
200 LogicalRepPreparedTxnData *prepare_data)
201{
202 /* read flags */
203 uint8 flags = pq_getmsgbyte(in);
204
205 if (flags != 0)
206 elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
207
208 /* read fields */
209 prepare_data->prepare_lsn = pq_getmsgint64(in);
210 if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
211 elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
212 prepare_data->end_lsn = pq_getmsgint64(in);
213 if (prepare_data->end_lsn == InvalidXLogRecPtr)
214 elog(ERROR, "end_lsn is not set in %s message", msgtype);
215 prepare_data->prepare_time = pq_getmsgint64(in);
216 prepare_data->xid = pq_getmsgint(in, 4);
217 if (prepare_data->xid == InvalidTransactionId)
218 elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
219
220 /* read gid (copy it into a pre-allocated buffer) */
221 strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
222}
223
224/*
225 * Read transaction PREPARE from the stream.
226 */
227void
229{
230 logicalrep_read_prepare_common(in, "prepare", prepare_data);
231}
232
233/*
234 * Write COMMIT PREPARED to the output stream.
235 */
236void
238 XLogRecPtr commit_lsn)
239{
240 uint8 flags = 0;
241
243
244 /*
245 * This should only ever happen for two-phase commit transactions, in
246 * which case we expect to have a valid GID.
247 */
248 Assert(txn->gid != NULL);
249
250 /* send the flags field */
251 pq_sendbyte(out, flags);
252
253 /* send fields */
254 pq_sendint64(out, commit_lsn);
255 pq_sendint64(out, txn->end_lsn);
257 pq_sendint32(out, txn->xid);
258
259 /* send gid */
260 pq_sendstring(out, txn->gid);
261}
262
263/*
264 * Read transaction COMMIT PREPARED from the stream.
265 */
266void
268{
269 /* read flags */
270 uint8 flags = pq_getmsgbyte(in);
271
272 if (flags != 0)
273 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
274
275 /* read fields */
276 prepare_data->commit_lsn = pq_getmsgint64(in);
277 if (prepare_data->commit_lsn == InvalidXLogRecPtr)
278 elog(ERROR, "commit_lsn is not set in commit prepared message");
279 prepare_data->end_lsn = pq_getmsgint64(in);
280 if (prepare_data->end_lsn == InvalidXLogRecPtr)
281 elog(ERROR, "end_lsn is not set in commit prepared message");
282 prepare_data->commit_time = pq_getmsgint64(in);
283 prepare_data->xid = pq_getmsgint(in, 4);
284
285 /* read gid (copy it into a pre-allocated buffer) */
286 strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
287}
288
289/*
290 * Write ROLLBACK PREPARED to the output stream.
291 */
292void
294 XLogRecPtr prepare_end_lsn,
295 TimestampTz prepare_time)
296{
297 uint8 flags = 0;
298
300
301 /*
302 * This should only ever happen for two-phase commit transactions, in
303 * which case we expect to have a valid GID.
304 */
305 Assert(txn->gid != NULL);
306
307 /* send the flags field */
308 pq_sendbyte(out, flags);
309
310 /* send fields */
311 pq_sendint64(out, prepare_end_lsn);
312 pq_sendint64(out, txn->end_lsn);
313 pq_sendint64(out, prepare_time);
315 pq_sendint32(out, txn->xid);
316
317 /* send gid */
318 pq_sendstring(out, txn->gid);
319}
320
321/*
322 * Read transaction ROLLBACK PREPARED from the stream.
323 */
324void
327{
328 /* read flags */
329 uint8 flags = pq_getmsgbyte(in);
330
331 if (flags != 0)
332 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
333
334 /* read fields */
335 rollback_data->prepare_end_lsn = pq_getmsgint64(in);
336 if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
337 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
338 rollback_data->rollback_end_lsn = pq_getmsgint64(in);
339 if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
340 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
341 rollback_data->prepare_time = pq_getmsgint64(in);
342 rollback_data->rollback_time = pq_getmsgint64(in);
343 rollback_data->xid = pq_getmsgint(in, 4);
344
345 /* read gid (copy it into a pre-allocated buffer) */
346 strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
347}
348
349/*
350 * Write STREAM PREPARE to the output stream.
351 */
352void
354 ReorderBufferTXN *txn,
355 XLogRecPtr prepare_lsn)
356{
358 txn, prepare_lsn);
359}
360
361/*
362 * Read STREAM PREPARE from the stream.
363 */
364void
366{
367 logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
368}
369
370/*
371 * Write ORIGIN to the output stream.
372 */
373void
374logicalrep_write_origin(StringInfo out, const char *origin,
375 XLogRecPtr origin_lsn)
376{
378
379 /* fixed fields */
380 pq_sendint64(out, origin_lsn);
381
382 /* origin string */
383 pq_sendstring(out, origin);
384}
385
386/*
387 * Read ORIGIN from the output stream.
388 */
389char *
391{
392 /* fixed fields */
393 *origin_lsn = pq_getmsgint64(in);
394
395 /* return origin */
396 return pstrdup(pq_getmsgstring(in));
397}
398
399/*
400 * Write INSERT to the output stream.
401 */
402void
404 TupleTableSlot *newslot, bool binary,
405 Bitmapset *columns,
406 PublishGencolsType include_gencols_type)
407{
409
410 /* transaction ID (if not valid, we're not streaming) */
411 if (TransactionIdIsValid(xid))
412 pq_sendint32(out, xid);
413
414 /* use Oid as relation identifier */
416
417 pq_sendbyte(out, 'N'); /* new tuple follows */
418 logicalrep_write_tuple(out, rel, newslot, binary, columns,
419 include_gencols_type);
420}
421
422/*
423 * Read INSERT from stream.
424 *
425 * Fills the new tuple.
426 */
429{
430 char action;
431 LogicalRepRelId relid;
432
433 /* read the relation id */
434 relid = pq_getmsgint(in, 4);
435
436 action = pq_getmsgbyte(in);
437 if (action != 'N')
438 elog(ERROR, "expected new tuple but got %d",
439 action);
440
441 logicalrep_read_tuple(in, newtup);
442
443 return relid;
444}
445
446/*
447 * Write UPDATE to the output stream.
448 */
449void
451 TupleTableSlot *oldslot, TupleTableSlot *newslot,
452 bool binary, Bitmapset *columns,
453 PublishGencolsType include_gencols_type)
454{
456
457 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
458 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
460
461 /* transaction ID (if not valid, we're not streaming) */
462 if (TransactionIdIsValid(xid))
463 pq_sendint32(out, xid);
464
465 /* use Oid as relation identifier */
467
468 if (oldslot != NULL)
469 {
470 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
471 pq_sendbyte(out, 'O'); /* old tuple follows */
472 else
473 pq_sendbyte(out, 'K'); /* old key follows */
474 logicalrep_write_tuple(out, rel, oldslot, binary, columns,
475 include_gencols_type);
476 }
477
478 pq_sendbyte(out, 'N'); /* new tuple follows */
479 logicalrep_write_tuple(out, rel, newslot, binary, columns,
480 include_gencols_type);
481}
482
483/*
484 * Read UPDATE from stream.
485 */
487logicalrep_read_update(StringInfo in, bool *has_oldtuple,
488 LogicalRepTupleData *oldtup,
489 LogicalRepTupleData *newtup)
490{
491 char action;
492 LogicalRepRelId relid;
493
494 /* read the relation id */
495 relid = pq_getmsgint(in, 4);
496
497 /* read and verify action */
498 action = pq_getmsgbyte(in);
499 if (action != 'K' && action != 'O' && action != 'N')
500 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
501 action);
502
503 /* check for old tuple */
504 if (action == 'K' || action == 'O')
505 {
506 logicalrep_read_tuple(in, oldtup);
507 *has_oldtuple = true;
508
509 action = pq_getmsgbyte(in);
510 }
511 else
512 *has_oldtuple = false;
513
514 /* check for new tuple */
515 if (action != 'N')
516 elog(ERROR, "expected action 'N', got %c",
517 action);
518
519 logicalrep_read_tuple(in, newtup);
520
521 return relid;
522}
523
524/*
525 * Write DELETE to the output stream.
526 */
527void
529 TupleTableSlot *oldslot, bool binary,
530 Bitmapset *columns,
531 PublishGencolsType include_gencols_type)
532{
533 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
534 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
536
538
539 /* transaction ID (if not valid, we're not streaming) */
540 if (TransactionIdIsValid(xid))
541 pq_sendint32(out, xid);
542
543 /* use Oid as relation identifier */
545
546 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
547 pq_sendbyte(out, 'O'); /* old tuple follows */
548 else
549 pq_sendbyte(out, 'K'); /* old key follows */
550
551 logicalrep_write_tuple(out, rel, oldslot, binary, columns,
552 include_gencols_type);
553}
554
555/*
556 * Read DELETE from stream.
557 *
558 * Fills the old tuple.
559 */
562{
563 char action;
564 LogicalRepRelId relid;
565
566 /* read the relation id */
567 relid = pq_getmsgint(in, 4);
568
569 /* read and verify action */
570 action = pq_getmsgbyte(in);
571 if (action != 'K' && action != 'O')
572 elog(ERROR, "expected action 'O' or 'K', got %c", action);
573
574 logicalrep_read_tuple(in, oldtup);
575
576 return relid;
577}
578
579/*
580 * Write TRUNCATE to the output stream.
581 */
582void
584 TransactionId xid,
585 int nrelids,
586 Oid relids[],
587 bool cascade, bool restart_seqs)
588{
589 int i;
590 uint8 flags = 0;
591
593
594 /* transaction ID (if not valid, we're not streaming) */
595 if (TransactionIdIsValid(xid))
596 pq_sendint32(out, xid);
597
598 pq_sendint32(out, nrelids);
599
600 /* encode and send truncate flags */
601 if (cascade)
602 flags |= TRUNCATE_CASCADE;
603 if (restart_seqs)
604 flags |= TRUNCATE_RESTART_SEQS;
605 pq_sendint8(out, flags);
606
607 for (i = 0; i < nrelids; i++)
608 pq_sendint32(out, relids[i]);
609}
610
611/*
612 * Read TRUNCATE from stream.
613 */
614List *
616 bool *cascade, bool *restart_seqs)
617{
618 int i;
619 int nrelids;
620 List *relids = NIL;
621 uint8 flags;
622
623 nrelids = pq_getmsgint(in, 4);
624
625 /* read and decode truncate flags */
626 flags = pq_getmsgint(in, 1);
627 *cascade = (flags & TRUNCATE_CASCADE) > 0;
628 *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
629
630 for (i = 0; i < nrelids; i++)
631 relids = lappend_oid(relids, pq_getmsgint(in, 4));
632
633 return relids;
634}
635
636/*
637 * Write MESSAGE to stream
638 */
639void
641 bool transactional, const char *prefix, Size sz,
642 const char *message)
643{
644 uint8 flags = 0;
645
647
648 /* encode and send message flags */
649 if (transactional)
650 flags |= MESSAGE_TRANSACTIONAL;
651
652 /* transaction ID (if not valid, we're not streaming) */
653 if (TransactionIdIsValid(xid))
654 pq_sendint32(out, xid);
655
656 pq_sendint8(out, flags);
657 pq_sendint64(out, lsn);
658 pq_sendstring(out, prefix);
659 pq_sendint32(out, sz);
660 pq_sendbytes(out, message, sz);
661}
662
663/*
664 * Write relation description to the output stream.
665 */
666void
668 Bitmapset *columns,
669 PublishGencolsType include_gencols_type)
670{
671 char *relname;
672
674
675 /* transaction ID (if not valid, we're not streaming) */
676 if (TransactionIdIsValid(xid))
677 pq_sendint32(out, xid);
678
679 /* use Oid as relation identifier */
681
682 /* send qualified relation name */
686
687 /* send replica identity */
688 pq_sendbyte(out, rel->rd_rel->relreplident);
689
690 /* send the attribute info */
691 logicalrep_write_attrs(out, rel, columns, include_gencols_type);
692}
693
694/*
695 * Read the relation info from stream and return as LogicalRepRelation.
696 */
699{
701
702 rel->remoteid = pq_getmsgint(in, 4);
703
704 /* Read relation name from stream */
706 rel->relname = pstrdup(pq_getmsgstring(in));
707
708 /* Read the replica identity. */
709 rel->replident = pq_getmsgbyte(in);
710
711 /* Get attribute description */
712 logicalrep_read_attrs(in, rel);
713
714 return rel;
715}
716
717/*
718 * Write type info to the output stream.
719 *
720 * This function will always write base type info.
721 */
722void
724{
725 Oid basetypoid = getBaseType(typoid);
726 HeapTuple tup;
727 Form_pg_type typtup;
728
730
731 /* transaction ID (if not valid, we're not streaming) */
732 if (TransactionIdIsValid(xid))
733 pq_sendint32(out, xid);
734
735 tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
736 if (!HeapTupleIsValid(tup))
737 elog(ERROR, "cache lookup failed for type %u", basetypoid);
738 typtup = (Form_pg_type) GETSTRUCT(tup);
739
740 /* use Oid as type identifier */
741 pq_sendint32(out, typoid);
742
743 /* send qualified type name */
744 logicalrep_write_namespace(out, typtup->typnamespace);
745 pq_sendstring(out, NameStr(typtup->typname));
746
747 ReleaseSysCache(tup);
748}
749
750/*
751 * Read type info from the output stream.
752 */
753void
755{
756 ltyp->remoteid = pq_getmsgint(in, 4);
757
758 /* Read type name from stream */
760 ltyp->typname = pstrdup(pq_getmsgstring(in));
761}
762
763/*
764 * Write a tuple to the outputstream, in the most efficient format possible.
765 */
766static void
768 bool binary, Bitmapset *columns,
769 PublishGencolsType include_gencols_type)
770{
771 TupleDesc desc;
772 Datum *values;
773 bool *isnull;
774 int i;
775 uint16 nliveatts = 0;
776
777 desc = RelationGetDescr(rel);
778
779 for (i = 0; i < desc->natts; i++)
780 {
781 Form_pg_attribute att = TupleDescAttr(desc, i);
782
783 if (!logicalrep_should_publish_column(att, columns,
784 include_gencols_type))
785 continue;
786
787 nliveatts++;
788 }
789 pq_sendint16(out, nliveatts);
790
791 slot_getallattrs(slot);
792 values = slot->tts_values;
793 isnull = slot->tts_isnull;
794
795 /* Write the values */
796 for (i = 0; i < desc->natts; i++)
797 {
798 HeapTuple typtup;
799 Form_pg_type typclass;
800 Form_pg_attribute att = TupleDescAttr(desc, i);
801
802 if (!logicalrep_should_publish_column(att, columns,
803 include_gencols_type))
804 continue;
805
806 if (isnull[i])
807 {
809 continue;
810 }
811
812 if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
813 {
814 /*
815 * Unchanged toasted datum. (Note that we don't promise to detect
816 * unchanged data in general; this is just a cheap check to avoid
817 * sending large values unnecessarily.)
818 */
820 continue;
821 }
822
823 typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
824 if (!HeapTupleIsValid(typtup))
825 elog(ERROR, "cache lookup failed for type %u", att->atttypid);
826 typclass = (Form_pg_type) GETSTRUCT(typtup);
827
828 /*
829 * Send in binary if requested and type has suitable send function.
830 */
831 if (binary && OidIsValid(typclass->typsend))
832 {
833 bytea *outputbytes;
834 int len;
835
837 outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
838 len = VARSIZE(outputbytes) - VARHDRSZ;
839 pq_sendint(out, len, 4); /* length */
840 pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
841 pfree(outputbytes);
842 }
843 else
844 {
845 char *outputstr;
846
848 outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
849 pq_sendcountedtext(out, outputstr, strlen(outputstr));
850 pfree(outputstr);
851 }
852
853 ReleaseSysCache(typtup);
854 }
855}
856
857/*
858 * Read tuple in logical replication format from stream.
859 */
860static void
862{
863 int i;
864 int natts;
865
866 /* Get number of attributes */
867 natts = pq_getmsgint(in, 2);
868
869 /* Allocate space for per-column values; zero out unused StringInfoDatas */
870 tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
871 tuple->colstatus = (char *) palloc(natts * sizeof(char));
872 tuple->ncols = natts;
873
874 /* Read the data */
875 for (i = 0; i < natts; i++)
876 {
877 char *buff;
878 char kind;
879 int len;
880 StringInfo value = &tuple->colvalues[i];
881
882 kind = pq_getmsgbyte(in);
883 tuple->colstatus[i] = kind;
884
885 switch (kind)
886 {
888 /* nothing more to do */
889 break;
891 /* we don't receive the value of an unchanged column */
892 break;
895 len = pq_getmsgint(in, 4); /* read length */
896
897 /* and data */
898 buff = palloc(len + 1);
899 pq_copymsgbytes(in, buff, len);
900
901 /*
902 * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
903 * as input functions require that. For
904 * LOGICALREP_COLUMN_BINARY it's not technically required, but
905 * it's harmless.
906 */
907 buff[len] = '\0';
908
910 break;
911 default:
912 elog(ERROR, "unrecognized data representation type '%c'", kind);
913 }
914 }
915}
916
917/*
918 * Write relation attribute metadata to the stream.
919 */
920static void
922 PublishGencolsType include_gencols_type)
923{
924 TupleDesc desc;
925 int i;
926 uint16 nliveatts = 0;
927 Bitmapset *idattrs = NULL;
928 bool replidentfull;
929
930 desc = RelationGetDescr(rel);
931
932 /* send number of live attributes */
933 for (i = 0; i < desc->natts; i++)
934 {
935 Form_pg_attribute att = TupleDescAttr(desc, i);
936
937 if (!logicalrep_should_publish_column(att, columns,
938 include_gencols_type))
939 continue;
940
941 nliveatts++;
942 }
943 pq_sendint16(out, nliveatts);
944
945 /* fetch bitmap of REPLICATION IDENTITY attributes */
946 replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
947 if (!replidentfull)
948 idattrs = RelationGetIdentityKeyBitmap(rel);
949
950 /* send the attributes */
951 for (i = 0; i < desc->natts; i++)
952 {
953 Form_pg_attribute att = TupleDescAttr(desc, i);
954 uint8 flags = 0;
955
956 if (!logicalrep_should_publish_column(att, columns,
957 include_gencols_type))
958 continue;
959
960 /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
961 if (replidentfull ||
963 idattrs))
965
966 pq_sendbyte(out, flags);
967
968 /* attribute name */
969 pq_sendstring(out, NameStr(att->attname));
970
971 /* attribute type id */
972 pq_sendint32(out, (int) att->atttypid);
973
974 /* attribute mode */
975 pq_sendint32(out, att->atttypmod);
976 }
977
978 bms_free(idattrs);
979}
980
981/*
982 * Read relation attribute metadata from the stream.
983 */
984static void
986{
987 int i;
988 int natts;
989 char **attnames;
990 Oid *atttyps;
991 Bitmapset *attkeys = NULL;
992
993 natts = pq_getmsgint(in, 2);
994 attnames = palloc(natts * sizeof(char *));
995 atttyps = palloc(natts * sizeof(Oid));
996
997 /* read the attributes */
998 for (i = 0; i < natts; i++)
999 {
1000 uint8 flags;
1001
1002 /* Check for replica identity column */
1003 flags = pq_getmsgbyte(in);
1005 attkeys = bms_add_member(attkeys, i);
1006
1007 /* attribute name */
1008 attnames[i] = pstrdup(pq_getmsgstring(in));
1009
1010 /* attribute type id */
1011 atttyps[i] = (Oid) pq_getmsgint(in, 4);
1012
1013 /* we ignore attribute mode for now */
1014 (void) pq_getmsgint(in, 4);
1015 }
1016
1017 rel->attnames = attnames;
1018 rel->atttyps = atttyps;
1019 rel->attkeys = attkeys;
1020 rel->natts = natts;
1021}
1022
1023/*
1024 * Write the namespace name or empty string for pg_catalog (to save space).
1025 */
1026static void
1028{
1029 if (nspid == PG_CATALOG_NAMESPACE)
1030 pq_sendbyte(out, '\0');
1031 else
1032 {
1033 char *nspname = get_namespace_name(nspid);
1034
1035 if (nspname == NULL)
1036 elog(ERROR, "cache lookup failed for namespace %u",
1037 nspid);
1038
1039 pq_sendstring(out, nspname);
1040 }
1041}
1042
1043/*
1044 * Read the namespace name while treating empty string as pg_catalog.
1045 */
1046static const char *
1048{
1049 const char *nspname = pq_getmsgstring(in);
1050
1051 if (nspname[0] == '\0')
1052 nspname = "pg_catalog";
1053
1054 return nspname;
1055}
1056
1057/*
1058 * Write the information for the start stream message to the output stream.
1059 */
1060void
1062 TransactionId xid, bool first_segment)
1063{
1065
1067
1068 /* transaction ID (we're starting to stream, so must be valid) */
1069 pq_sendint32(out, xid);
1070
1071 /* 1 if this is the first streaming segment for this xid */
1072 pq_sendbyte(out, first_segment ? 1 : 0);
1073}
1074
1075/*
1076 * Read the information about the start stream message from output stream.
1077 */
1080{
1081 TransactionId xid;
1082
1083 Assert(first_segment);
1084
1085 xid = pq_getmsgint(in, 4);
1086 *first_segment = (pq_getmsgbyte(in) == 1);
1087
1088 return xid;
1089}
1090
1091/*
1092 * Write the stop stream message to the output stream.
1093 */
1094void
1096{
1098}
1099
1100/*
1101 * Write STREAM COMMIT to the output stream.
1102 */
1103void
1105 XLogRecPtr commit_lsn)
1106{
1107 uint8 flags = 0;
1108
1110
1112
1113 /* transaction ID */
1114 pq_sendint32(out, txn->xid);
1115
1116 /* send the flags field (unused for now) */
1117 pq_sendbyte(out, flags);
1118
1119 /* send fields */
1120 pq_sendint64(out, commit_lsn);
1121 pq_sendint64(out, txn->end_lsn);
1123}
1124
1125/*
1126 * Read STREAM COMMIT from the output stream.
1127 */
1130{
1131 TransactionId xid;
1132 uint8 flags;
1133
1134 xid = pq_getmsgint(in, 4);
1135
1136 /* read flags (unused for now) */
1137 flags = pq_getmsgbyte(in);
1138
1139 if (flags != 0)
1140 elog(ERROR, "unrecognized flags %u in commit message", flags);
1141
1142 /* read fields */
1143 commit_data->commit_lsn = pq_getmsgint64(in);
1144 commit_data->end_lsn = pq_getmsgint64(in);
1145 commit_data->committime = pq_getmsgint64(in);
1146
1147 return xid;
1148}
1149
1150/*
1151 * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1152 * same for the top-level transaction abort.
1153 *
1154 * If write_abort_info is true, send the abort_lsn and abort_time fields,
1155 * otherwise don't.
1156 */
1157void
1159 TransactionId subxid, XLogRecPtr abort_lsn,
1160 TimestampTz abort_time, bool write_abort_info)
1161{
1163
1165
1166 /* transaction ID */
1167 pq_sendint32(out, xid);
1168 pq_sendint32(out, subxid);
1169
1170 if (write_abort_info)
1171 {
1172 pq_sendint64(out, abort_lsn);
1173 pq_sendint64(out, abort_time);
1174 }
1175}
1176
1177/*
1178 * Read STREAM ABORT from the output stream.
1179 *
1180 * If read_abort_info is true, read the abort_lsn and abort_time fields,
1181 * otherwise don't.
1182 */
1183void
1185 LogicalRepStreamAbortData *abort_data,
1186 bool read_abort_info)
1187{
1188 Assert(abort_data);
1189
1190 abort_data->xid = pq_getmsgint(in, 4);
1191 abort_data->subxid = pq_getmsgint(in, 4);
1192
1193 if (read_abort_info)
1194 {
1195 abort_data->abort_lsn = pq_getmsgint64(in);
1196 abort_data->abort_time = pq_getmsgint64(in);
1197 }
1198 else
1199 {
1200 abort_data->abort_lsn = InvalidXLogRecPtr;
1201 abort_data->abort_time = 0;
1202 }
1203}
1204
1205/*
1206 * Get string representing LogicalRepMsgType.
1207 */
1208const char *
1210{
1211 static char err_unknown[20];
1212
1213 switch (action)
1214 {
1216 return "BEGIN";
1218 return "COMMIT";
1220 return "ORIGIN";
1222 return "INSERT";
1224 return "UPDATE";
1226 return "DELETE";
1228 return "TRUNCATE";
1230 return "RELATION";
1232 return "TYPE";
1234 return "MESSAGE";
1236 return "BEGIN PREPARE";
1238 return "PREPARE";
1240 return "COMMIT PREPARED";
1242 return "ROLLBACK PREPARED";
1244 return "STREAM START";
1246 return "STREAM STOP";
1248 return "STREAM COMMIT";
1250 return "STREAM ABORT";
1252 return "STREAM PREPARE";
1253 }
1254
1255 /*
1256 * This message provides context in the error raised when applying a
1257 * logical message. So we can't throw an error here. Return an unknown
1258 * indicator value so that the original error is still reported.
1259 */
1260 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1261
1262 return err_unknown;
1263}
1264
1265/*
1266 * Check if the column 'att' of a table should be published.
1267 *
1268 * 'columns' represents the publication column list (if any) for that table.
1269 *
1270 * 'include_gencols_type' value indicates whether generated columns should be
1271 * published when there is no column list. Typically, this will have the same
1272 * value as the 'publish_generated_columns' publication parameter.
1273 *
1274 * Note that generated columns can be published only when present in a
1275 * publication column list, or when include_gencols_type is
1276 * PUBLISH_GENCOLS_STORED.
1277 */
1278bool
1280 PublishGencolsType include_gencols_type)
1281{
1282 if (att->attisdropped)
1283 return false;
1284
1285 /* If a column list is provided, publish only the cols in that list. */
1286 if (columns)
1287 return bms_is_member(att->attnum, columns);
1288
1289 /* All non-generated columns are always published. */
1290 if (!att->attgenerated)
1291 return true;
1292
1293 /*
1294 * Stored generated columns are only published when the user sets
1295 * publish_generated_columns as stored.
1296 */
1297 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1298 return include_gencols_type == PUBLISH_GENCOLS_STORED;
1299
1300 return false;
1301}
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define NameStr(name)
Definition: c.h:717
uint8_t uint8
Definition: c.h:500
#define VARHDRSZ
Definition: c.h:663
uint16_t uint16
Definition: c.h:501
uint32 TransactionId
Definition: c.h:623
#define OidIsValid(objectId)
Definition: c.h:746
size_t Size
Definition: c.h:576
int nspid
int64 TimestampTz
Definition: timestamp.h:39
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1763
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1782
Assert(PointerIsAligned(start, uint64))
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
static struct @165 value
int i
Definition: isn.c:74
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:97
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
uint32 LogicalRepRelId
Definition: logicalproto.h:101
#define LOGICALREP_COLUMN_NULL
Definition: logicalproto.h:96
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:99
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:98
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2604
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3449
char * pstrdup(const char *in)
Definition: mcxt.c:1699
void pfree(void *pointer)
Definition: mcxt.c:1524
void * palloc0(Size size)
Definition: mcxt.c:1347
void * palloc(Size size)
Definition: mcxt.c:1317
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
NameData relname
Definition: pg_class.h:38
const void size_t len
#define NIL
Definition: pg_list.h:68
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
#define snprintf
Definition: port.h:239
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uintptr_t Datum
Definition: postgres.h:69
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
unsigned int Oid
Definition: postgres_ext.h:32
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:195
void pq_copymsgbytes(StringInfo msg, void *buf, int datalen)
Definition: pqformat.c:528
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
void pq_sendcountedtext(StringInfo buf, const char *str, int slen)
Definition: pqformat.c:142
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:128
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:136
static void pq_sendint(StringInfo buf, uint32 i, int b)
Definition: pqformat.h:171
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:98
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1027
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:561
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:78
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:293
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:325
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:155
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:403
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:374
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:667
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:134
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
Definition: proto.c:1158
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:199
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:754
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:487
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:640
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:985
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:615
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition: proto.c:1184
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:187
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:63
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
Definition: proto.c:390
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:723
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:528
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:583
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:767
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:26
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:861
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:49
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:267
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:698
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:237
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1104
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:353
#define TRUNCATE_CASCADE
Definition: proto.c:29
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1209
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:116
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:921
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:1279
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:365
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1061
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:450
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1129
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:428
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:228
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1047
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1095
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1079
#define RelationGetRelid(relation)
Definition: rel.h:512
#define RelationGetDescr(relation)
Definition: rel.h:538
#define RelationGetRelationName(relation)
Definition: rel.h:546
#define RelationGetNamespace(relation)
Definition: rel.h:553
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
Definition: relcache.c:5504
#define rbtxn_is_prepared(txn)
static void initStringInfoFromString(StringInfo str, char *data, int len)
Definition: stringinfo.h:175
Definition: pg_list.h:54
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131
TimestampTz committime
Definition: logicalproto.h:130
TimestampTz committime
Definition: logicalproto.h:138
LogicalRepRelId remoteid
Definition: logicalproto.h:107
Bitmapset * attkeys
Definition: logicalproto.h:115
StringInfoData * colvalues
Definition: logicalproto.h:87
Form_pg_class rd_rel
Definition: rel.h:111
TimestampTz commit_time
XLogRecPtr final_lsn
XLogRecPtr end_lsn
TimestampTz prepare_time
TransactionId xid
union ReorderBufferTXN::@116 xact_time
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
Definition: c.h:658
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:221
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:154
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290
#define VARDATA(PTR)
Definition: varatt.h:278
#define VARSIZE(PTR)
Definition: varatt.h:279
const char * type
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28