PostgreSQL Source Code  git master
proto.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * proto.c
4  * logical replication protocol functions
5  *
6  * Copyright (c) 2015-2021, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/proto.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/sysattr.h"
16 #include "catalog/pg_namespace.h"
17 #include "catalog/pg_type.h"
18 #include "libpq/pqformat.h"
20 #include "utils/lsyscache.h"
21 #include "utils/syscache.h"
22 
23 /*
24  * Protocol message flags.
25  */
    /*
     * Bit tested in the per-attribute flags byte of the relation attribute
     * metadata: marks a column that is part of the replica identity.
     */
26 #define LOGICALREP_IS_REPLICA_IDENTITY 1
27 
    /* Bit in the flags byte of a MESSAGE: set when the message is transactional. */
28 #define MESSAGE_TRANSACTIONAL (1<<0)
    /*
     * Bits in the flags byte of a TRUNCATE.  This is a different flags field
     * from the ones above, which is why bit 0 is used again.
     */
29 #define TRUNCATE_CASCADE (1<<0)
30 #define TRUNCATE_RESTART_SEQS (1<<1)
31 
32 static void logicalrep_write_attrs(StringInfo out, Relation rel);
33 static void logicalrep_write_tuple(StringInfo out, Relation rel,
34  HeapTuple tuple, bool binary);
35 
38 
39 static void logicalrep_write_namespace(StringInfo out, Oid nspid);
40 static const char *logicalrep_read_namespace(StringInfo in);
41 
42 /*
43  * Write BEGIN to the output stream.
44  */
45 void
47 {
49 
50  /* fixed fields */
51  pq_sendint64(out, txn->final_lsn);
53  pq_sendint32(out, txn->xid);
54 }
55 
56 /*
57  * Read transaction BEGIN from the stream.
58  */
59 void
61 {
62  /* read fields */
63  begin_data->final_lsn = pq_getmsgint64(in);
64  if (begin_data->final_lsn == InvalidXLogRecPtr)
65  elog(ERROR, "final_lsn not set in begin message");
66  begin_data->committime = pq_getmsgint64(in);
67  begin_data->xid = pq_getmsgint(in, 4);
68 }
69 
70 
71 /*
72  * Write COMMIT to the output stream.
73  */
74 void
76  XLogRecPtr commit_lsn)
77 {
78  uint8 flags = 0;
79 
81 
82  /* send the flags field (unused for now) */
83  pq_sendbyte(out, flags);
84 
85  /* send fields */
86  pq_sendint64(out, commit_lsn);
87  pq_sendint64(out, txn->end_lsn);
89 }
90 
91 /*
92  * Read transaction COMMIT from the stream.
93  */
94 void
96 {
97  /* read flags (unused for now) */
98  uint8 flags = pq_getmsgbyte(in);
99 
100  if (flags != 0)
101  elog(ERROR, "unrecognized flags %u in commit message", flags);
102 
103  /* read fields */
104  commit_data->commit_lsn = pq_getmsgint64(in);
105  commit_data->end_lsn = pq_getmsgint64(in);
106  commit_data->committime = pq_getmsgint64(in);
107 }
108 
109 /*
110  * Write BEGIN PREPARE to the output stream.
111  */
112 void
114 {
116 
117  /* fixed fields */
118  pq_sendint64(out, txn->final_lsn);
119  pq_sendint64(out, txn->end_lsn);
121  pq_sendint32(out, txn->xid);
122 
123  /* send gid */
124  pq_sendstring(out, txn->gid);
125 }
126 
127 /*
128  * Read transaction BEGIN PREPARE from the stream.
129  */
130 void
132 {
133  /* read fields */
134  begin_data->prepare_lsn = pq_getmsgint64(in);
135  if (begin_data->prepare_lsn == InvalidXLogRecPtr)
136  elog(ERROR, "prepare_lsn not set in begin prepare message");
137  begin_data->end_lsn = pq_getmsgint64(in);
138  if (begin_data->end_lsn == InvalidXLogRecPtr)
139  elog(ERROR, "end_lsn not set in begin prepare message");
140  begin_data->prepare_time = pq_getmsgint64(in);
141  begin_data->xid = pq_getmsgint(in, 4);
142 
143  /* read gid (copy it into a pre-allocated buffer) */
144  strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
145 }
146 
147 /*
148  * The core functionality for logicalrep_write_prepare and
149  * logicalrep_write_stream_prepare.
150  */
151 static void
153  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
154 {
155  uint8 flags = 0;
156 
157  pq_sendbyte(out, type);
158 
159  /*
160  * This should only ever happen for two-phase commit transactions, in
161  * which case we expect to have a valid GID.
162  */
163  Assert(txn->gid != NULL);
164  Assert(rbtxn_prepared(txn));
166 
167  /* send the flags field */
168  pq_sendbyte(out, flags);
169 
170  /* send fields */
171  pq_sendint64(out, prepare_lsn);
172  pq_sendint64(out, txn->end_lsn);
174  pq_sendint32(out, txn->xid);
175 
176  /* send gid */
177  pq_sendstring(out, txn->gid);
178 }
179 
180 /*
181  * Write PREPARE to the output stream.
182  */
183 void
185  XLogRecPtr prepare_lsn)
186 {
188  txn, prepare_lsn);
189 }
190 
191 /*
192  * The core functionality for logicalrep_read_prepare and
193  * logicalrep_read_stream_prepare.
194  */
195 static void
197  LogicalRepPreparedTxnData *prepare_data)
198 {
199  /* read flags */
200  uint8 flags = pq_getmsgbyte(in);
201 
202  if (flags != 0)
203  elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
204 
205  /* read fields */
206  prepare_data->prepare_lsn = pq_getmsgint64(in);
207  if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
208  elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
209  prepare_data->end_lsn = pq_getmsgint64(in);
210  if (prepare_data->end_lsn == InvalidXLogRecPtr)
211  elog(ERROR, "end_lsn is not set in %s message", msgtype);
212  prepare_data->prepare_time = pq_getmsgint64(in);
213  prepare_data->xid = pq_getmsgint(in, 4);
214  if (prepare_data->xid == InvalidTransactionId)
215  elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
216 
217  /* read gid (copy it into a pre-allocated buffer) */
218  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
219 }
220 
221 /*
222  * Read transaction PREPARE from the stream.
223  */
224 void
226 {
227  logicalrep_read_prepare_common(in, "prepare", prepare_data);
228 }
229 
230 /*
231  * Write COMMIT PREPARED to the output stream.
232  */
233 void
235  XLogRecPtr commit_lsn)
236 {
237  uint8 flags = 0;
238 
240 
241  /*
242  * This should only ever happen for two-phase commit transactions, in
243  * which case we expect to have a valid GID.
244  */
245  Assert(txn->gid != NULL);
246 
247  /* send the flags field */
248  pq_sendbyte(out, flags);
249 
250  /* send fields */
251  pq_sendint64(out, commit_lsn);
252  pq_sendint64(out, txn->end_lsn);
254  pq_sendint32(out, txn->xid);
255 
256  /* send gid */
257  pq_sendstring(out, txn->gid);
258 }
259 
260 /*
261  * Read transaction COMMIT PREPARED from the stream.
262  */
263 void
265 {
266  /* read flags */
267  uint8 flags = pq_getmsgbyte(in);
268 
269  if (flags != 0)
270  elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
271 
272  /* read fields */
273  prepare_data->commit_lsn = pq_getmsgint64(in);
274  if (prepare_data->commit_lsn == InvalidXLogRecPtr)
275  elog(ERROR, "commit_lsn is not set in commit prepared message");
276  prepare_data->end_lsn = pq_getmsgint64(in);
277  if (prepare_data->end_lsn == InvalidXLogRecPtr)
278  elog(ERROR, "end_lsn is not set in commit prepared message");
279  prepare_data->commit_time = pq_getmsgint64(in);
280  prepare_data->xid = pq_getmsgint(in, 4);
281 
282  /* read gid (copy it into a pre-allocated buffer) */
283  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
284 }
285 
286 /*
287  * Write ROLLBACK PREPARED to the output stream.
288  */
289 void
291  XLogRecPtr prepare_end_lsn,
292  TimestampTz prepare_time)
293 {
294  uint8 flags = 0;
295 
297 
298  /*
299  * This should only ever happen for two-phase commit transactions, in
300  * which case we expect to have a valid GID.
301  */
302  Assert(txn->gid != NULL);
303 
304  /* send the flags field */
305  pq_sendbyte(out, flags);
306 
307  /* send fields */
308  pq_sendint64(out, prepare_end_lsn);
309  pq_sendint64(out, txn->end_lsn);
310  pq_sendint64(out, prepare_time);
312  pq_sendint32(out, txn->xid);
313 
314  /* send gid */
315  pq_sendstring(out, txn->gid);
316 }
317 
318 /*
319  * Read transaction ROLLBACK PREPARED from the stream.
320  */
321 void
323  LogicalRepRollbackPreparedTxnData *rollback_data)
324 {
325  /* read flags */
326  uint8 flags = pq_getmsgbyte(in);
327 
328  if (flags != 0)
329  elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
330 
331  /* read fields */
332  rollback_data->prepare_end_lsn = pq_getmsgint64(in);
333  if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
334  elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
335  rollback_data->rollback_end_lsn = pq_getmsgint64(in);
336  if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
337  elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
338  rollback_data->prepare_time = pq_getmsgint64(in);
339  rollback_data->rollback_time = pq_getmsgint64(in);
340  rollback_data->xid = pq_getmsgint(in, 4);
341 
342  /* read gid (copy it into a pre-allocated buffer) */
343  strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
344 }
345 
346 /*
347  * Write STREAM PREPARE to the output stream.
348  */
349 void
351  ReorderBufferTXN *txn,
352  XLogRecPtr prepare_lsn)
353 {
355  txn, prepare_lsn);
356 }
357 
358 /*
359  * Read STREAM PREPARE from the stream.
360  */
361 void
363 {
364  logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
365 }
366 
367 /*
368  * Write ORIGIN to the output stream.
369  */
370 void
371 logicalrep_write_origin(StringInfo out, const char *origin,
372  XLogRecPtr origin_lsn)
373 {
375 
376  /* fixed fields */
377  pq_sendint64(out, origin_lsn);
378 
379  /* origin string */
380  pq_sendstring(out, origin);
381 }
382 
383 /*
384  * Read ORIGIN from the stream.
385  */
386 char *
388 {
389  /* fixed fields */
390  *origin_lsn = pq_getmsgint64(in);
391 
392  /* return origin */
393  return pstrdup(pq_getmsgstring(in));
394 }
395 
396 /*
397  * Write INSERT to the output stream.
398  */
399 void
401  HeapTuple newtuple, bool binary)
402 {
404 
405  /* transaction ID (if not valid, we're not streaming) */
406  if (TransactionIdIsValid(xid))
407  pq_sendint32(out, xid);
408 
409  /* use Oid as relation identifier */
410  pq_sendint32(out, RelationGetRelid(rel));
411 
412  pq_sendbyte(out, 'N'); /* new tuple follows */
413  logicalrep_write_tuple(out, rel, newtuple, binary);
414 }
415 
416 /*
417  * Read INSERT from stream.
418  *
419  * Fills the new tuple.
420  */
423 {
424  char action;
425  LogicalRepRelId relid;
426 
427  /* read the relation id */
428  relid = pq_getmsgint(in, 4);
429 
430  action = pq_getmsgbyte(in);
431  if (action != 'N')
432  elog(ERROR, "expected new tuple but got %d",
433  action);
434 
435  logicalrep_read_tuple(in, newtup);
436 
437  return relid;
438 }
439 
440 /*
441  * Write UPDATE to the output stream.
     *
     * Fields emitted by the statements visible here, in order:
     *   - xid, but only when it is a valid transaction ID (i.e. streaming)
     *   - the relation's OID, used as the relation identifier
     *   - when oldtuple is non-NULL: the byte 'O' if the relation's replica
     *     identity is FULL, otherwise 'K', followed by the old tuple
     *   - the byte 'N' followed by the new tuple
     *
     * The relation must use replica identity DEFAULT, FULL or INDEX; this is
     * only checked by an assertion.
     *
     * NOTE(review): listing lines 444 and 447 are absent from this copy;
     * presumably they hold the function name with its leading parameters and
     * the statement that sends the message-type byte -- confirm against the
     * real proto.c.
442  */
443 void
445  HeapTuple oldtuple, HeapTuple newtuple, bool binary)
446 {
448 
449  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
450  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
451  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
452 
453  /* transaction ID (if not valid, we're not streaming) */
454  if (TransactionIdIsValid(xid))
455  pq_sendint32(out, xid);
456 
457  /* use Oid as relation identifier */
458  pq_sendint32(out, RelationGetRelid(rel));
459 
     /* the old tuple is optional; its tag byte tells the reader what it holds */
460  if (oldtuple != NULL)
461  {
462  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
463  pq_sendbyte(out, 'O'); /* old tuple follows */
464  else
465  pq_sendbyte(out, 'K'); /* old key follows */
466  logicalrep_write_tuple(out, rel, oldtuple, binary);
467  }
468 
469  pq_sendbyte(out, 'N'); /* new tuple follows */
470  logicalrep_write_tuple(out, rel, newtuple, binary);
471 }
472 
473 /*
474  * Read UPDATE from stream.
     *
     * Reads a 4-byte relation id, then one action byte:
     *   - 'K' or 'O': an old tuple follows and is read into oldtup;
     *     *has_oldtuple is set to true and the next action byte is read
     *   - 'N': there is no old tuple; *has_oldtuple is set to false
     * Any other first action byte raises an error.  The action byte that
     * precedes the new tuple must be 'N', otherwise an error is raised; the
     * new tuple is then read into newtup.
     *
     * Returns the relation id read from the message.
     *
     * NOTE(review): listing line 476 (presumably the return type) is absent
     * from this copy.
475  */
477 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
478  LogicalRepTupleData *oldtup,
479  LogicalRepTupleData *newtup)
480 {
481  char action;
482  LogicalRepRelId relid;
483 
484  /* read the relation id */
485  relid = pq_getmsgint(in, 4);
486 
487  /* read and verify action */
488  action = pq_getmsgbyte(in);
489  if (action != 'K' && action != 'O' && action != 'N')
490  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
491  action);
492 
493  /* check for old tuple */
494  if (action == 'K' || action == 'O')
495  {
496  logicalrep_read_tuple(in, oldtup);
497  *has_oldtuple = true;
498 
     /* fetch the tag of whatever follows the old tuple; verified below */
499  action = pq_getmsgbyte(in);
500  }
501  else
502  *has_oldtuple = false;
503 
504  /* check for new tuple */
505  if (action != 'N')
506  elog(ERROR, "expected action 'N', got %c",
507  action);
508 
509  logicalrep_read_tuple(in, newtup);
510 
511  return relid;
512 }
513 
514 /*
515  * Write DELETE to the output stream.
516  */
517 void
519  HeapTuple oldtuple, bool binary)
520 {
521  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
522  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
523  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
524 
526 
527  /* transaction ID (if not valid, we're not streaming) */
528  if (TransactionIdIsValid(xid))
529  pq_sendint32(out, xid);
530 
531  /* use Oid as relation identifier */
532  pq_sendint32(out, RelationGetRelid(rel));
533 
534  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
535  pq_sendbyte(out, 'O'); /* old tuple follows */
536  else
537  pq_sendbyte(out, 'K'); /* old key follows */
538 
539  logicalrep_write_tuple(out, rel, oldtuple, binary);
540 }
541 
542 /*
543  * Read DELETE from stream.
544  *
545  * Fills the old tuple.
546  */
549 {
550  char action;
551  LogicalRepRelId relid;
552 
553  /* read the relation id */
554  relid = pq_getmsgint(in, 4);
555 
556  /* read and verify action */
557  action = pq_getmsgbyte(in);
558  if (action != 'K' && action != 'O')
559  elog(ERROR, "expected action 'O' or 'K', got %c", action);
560 
561  logicalrep_read_tuple(in, oldtup);
562 
563  return relid;
564 }
565 
566 /*
567  * Write TRUNCATE to the output stream.
     *
     * Fields emitted by the statements visible here, in order:
     *   - xid, but only when it is a valid transaction ID (i.e. streaming)
     *   - nrelids as a 32-bit integer
     *   - one flags byte: TRUNCATE_CASCADE is set when cascade is true,
     *     TRUNCATE_RESTART_SEQS when restart_seqs is true
     *   - the first nrelids entries of relids[], each as a 32-bit integer
     *
     * NOTE(review): listing lines 570 and 579 are absent from this copy;
     * presumably they hold the function name with the "out" parameter and the
     * statement that sends the message-type byte -- confirm.
568  */
569 void
571  TransactionId xid,
572  int nrelids,
573  Oid relids[],
574  bool cascade, bool restart_seqs)
575 {
576  int i;
577  uint8 flags = 0;
578 
580 
581  /* transaction ID (if not valid, we're not streaming) */
582  if (TransactionIdIsValid(xid))
583  pq_sendint32(out, xid);
584 
     /* the count precedes the flags byte; the reader consumes them in this order */
585  pq_sendint32(out, nrelids);
586 
587  /* encode and send truncate flags */
588  if (cascade)
589  flags |= TRUNCATE_CASCADE;
590  if (restart_seqs)
591  flags |= TRUNCATE_RESTART_SEQS;
592  pq_sendint8(out, flags);
593 
     /* relation OIDs, in array order */
594  for (i = 0; i < nrelids; i++)
595  pq_sendint32(out, relids[i]);
596 }
597 
598 /*
599  * Read TRUNCATE from stream.
     *
     * Reads a 4-byte relation count and a 1-byte flags field, then that many
     * 4-byte relation ids.
     *
     * *cascade and *restart_seqs are set from the TRUNCATE_CASCADE and
     * TRUNCATE_RESTART_SEQS bits of the flags field.
     *
     * Returns the relation ids as an OID list in the order they were read,
     * or NIL when the count is not positive.
     *
     * NOTE(review): listing line 602 (presumably the function name and the
     * "in" parameter) is absent from this copy.
600  */
601 List *
603  bool *cascade, bool *restart_seqs)
604 {
605  int i;
606  int nrelids;
607  List *relids = NIL;
608  uint8 flags;
609 
610  nrelids = pq_getmsgint(in, 4);
611 
612  /* read and decode truncate flags */
613  flags = pq_getmsgint(in, 1);
614  *cascade = (flags & TRUNCATE_CASCADE) > 0;
615  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
616 
     /* collect the relation ids, preserving message order */
617  for (i = 0; i < nrelids; i++)
618  relids = lappend_oid(relids, pq_getmsgint(in, 4));
619 
620  return relids;
621 }
622 
623 /*
624  * Write MESSAGE to stream
625  */
626 void
628  bool transactional, const char *prefix, Size sz,
629  const char *message)
630 {
631  uint8 flags = 0;
632 
634 
635  /* encode and send message flags */
636  if (transactional)
637  flags |= MESSAGE_TRANSACTIONAL;
638 
639  /* transaction ID (if not valid, we're not streaming) */
640  if (TransactionIdIsValid(xid))
641  pq_sendint32(out, xid);
642 
643  pq_sendint8(out, flags);
644  pq_sendint64(out, lsn);
645  pq_sendstring(out, prefix);
646  pq_sendint32(out, sz);
647  pq_sendbytes(out, message, sz);
648 }
649 
650 /*
651  * Write relation description to the output stream.
652  */
653 void
655 {
656  char *relname;
657 
659 
660  /* transaction ID (if not valid, we're not streaming) */
661  if (TransactionIdIsValid(xid))
662  pq_sendint32(out, xid);
663 
664  /* use Oid as relation identifier */
665  pq_sendint32(out, RelationGetRelid(rel));
666 
667  /* send qualified relation name */
669  relname = RelationGetRelationName(rel);
670  pq_sendstring(out, relname);
671 
672  /* send replica identity */
673  pq_sendbyte(out, rel->rd_rel->relreplident);
674 
675  /* send the attribute info */
676  logicalrep_write_attrs(out, rel);
677 }
678 
679 /*
680  * Read the relation info from stream and return as LogicalRepRelation.
681  */
684 {
686 
687  rel->remoteid = pq_getmsgint(in, 4);
688 
689  /* Read relation name from stream */
691  rel->relname = pstrdup(pq_getmsgstring(in));
692 
693  /* Read the replica identity. */
694  rel->replident = pq_getmsgbyte(in);
695 
696  /* Get attribute description */
697  logicalrep_read_attrs(in, rel);
698 
699  return rel;
700 }
701 
702 /*
703  * Write type info to the output stream.
704  *
705  * This function will always write base type info.
     *
     * The catalog lookup is done on getBaseType(typoid), so the namespace and
     * type name that are sent belong to the base type, while the identifier
     * that is sent is typoid itself.  Raises an error if the type cache
     * lookup for the base type fails.
     *
     * NOTE(review): listing lines 708 and 714 are absent from this copy;
     * presumably they hold the function signature and the statement that
     * sends the message-type byte -- confirm.
706  */
707 void
709 {
710  Oid basetypoid = getBaseType(typoid);
711  HeapTuple tup;
712  Form_pg_type typtup;
713 
715 
716  /* transaction ID (if not valid, we're not streaming) */
717  if (TransactionIdIsValid(xid))
718  pq_sendint32(out, xid);
719 
     /* look up the base type's catalog row; released at the end */
720  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
721  if (!HeapTupleIsValid(tup))
722  elog(ERROR, "cache lookup failed for type %u", basetypoid);
723  typtup = (Form_pg_type) GETSTRUCT(tup);
724 
725  /* use Oid as type identifier (the original typoid, not the base type) */
726  pq_sendint32(out, typoid);
727 
728  /* send qualified type name */
729  logicalrep_write_namespace(out, typtup->typnamespace);
730  pq_sendstring(out, NameStr(typtup->typname));
731 
732  ReleaseSysCache(tup);
733 }
734 
735 /*
736  * Read type info from the stream.
737  */
738 void
740 {
741  ltyp->remoteid = pq_getmsgint(in, 4);
742 
743  /* Read type name from stream */
745  ltyp->typname = pstrdup(pq_getmsgstring(in));
746 }
747 
748 /*
749  * Write a tuple to the output stream, in the most efficient format possible.
     *
     * First sends, as a 16-bit integer, the number of live attributes, i.e.
     * those that are neither dropped nor generated.  Then walks the
     * attributes in descriptor order, skipping dropped and generated ones:
     *   - a NULL value, or a varlena value that is still an external on-disk
     *     TOAST pointer, is not sent as data
     *   - otherwise the value is sent in binary form (4-byte length plus the
     *     type's send-function output) when "binary" is true and the type has
     *     a send function, else as counted text from the type's output
     *     function
     *
     * NOTE(review): listing lines 752, 755, 788, 799, 816 and 827 are absent
     * from this copy.  Presumably they hold the signature, the declaration of
     * "values", and the statements that send the per-column kind byte
     * (null / unchanged / binary / text) -- confirm against the real proto.c.
750  */
751 static void
753 {
754  TupleDesc desc;
756  bool isnull[MaxTupleAttributeNumber];
757  int i;
758  uint16 nliveatts = 0;
759 
760  desc = RelationGetDescr(rel);
761 
     /* count the attributes that will actually appear in the message */
762  for (i = 0; i < desc->natts; i++)
763  {
764  if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
765  continue;
766  nliveatts++;
767  }
768  pq_sendint16(out, nliveatts);
769 
770  /* try to allocate enough memory from the get-go */
     /* presumably (1 + 4) = one kind byte plus a 4-byte length per column */
771  enlargeStringInfo(out, tuple->t_len +
772  nliveatts * (1 + 4));
773 
774  heap_deform_tuple(tuple, desc, values, isnull);
775 
776  /* Write the values */
777  for (i = 0; i < desc->natts; i++)
778  {
779  HeapTuple typtup;
780  Form_pg_type typclass;
781  Form_pg_attribute att = TupleDescAttr(desc, i);
782 
     /* must mirror the filter used for nliveatts above */
783  if (att->attisdropped || att->attgenerated)
784  continue;
785 
786  if (isnull[i])
787  {
789  continue;
790  }
791 
792  if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
793  {
794  /*
795  * Unchanged toasted datum. (Note that we don't promise to detect
796  * unchanged data in general; this is just a cheap check to avoid
797  * sending large values unnecessarily.)
798  */
800  continue;
801  }
802 
     /* the type's catalog row supplies the send/output function OIDs */
803  typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
804  if (!HeapTupleIsValid(typtup))
805  elog(ERROR, "cache lookup failed for type %u", att->atttypid);
806  typclass = (Form_pg_type) GETSTRUCT(typtup);
807 
808  /*
809  * Send in binary if requested and type has suitable send function.
810  */
811  if (binary && OidIsValid(typclass->typsend))
812  {
813  bytea *outputbytes;
814  int len;
815 
817  outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
     /* payload length excludes the varlena header */
818  len = VARSIZE(outputbytes) - VARHDRSZ;
819  pq_sendint(out, len, 4); /* length */
820  pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
821  pfree(outputbytes);
822  }
823  else
824  {
825  char *outputstr;
826 
828  outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
829  pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
830  pfree(outputstr);
831  }
832 
833  ReleaseSysCache(typtup);
834  }
835 }
836 
837 /*
838  * Read tuple in logical replication format from stream.
     *
     * Reads a 2-byte column count, allocates tuple->colvalues (zeroed) and
     * tuple->colstatus with that many entries, and sets tuple->ncols.  For
     * each column one kind byte is read and stored in colstatus[i]; for the
     * kinds that carry data, a 4-byte length and that many bytes follow and
     * are copied into a freshly allocated, NUL-terminated buffer in
     * colvalues[i].  An unknown kind byte raises an error.
     *
     * NOTE(review): listing lines 841, 866, 869, 872 and 884 are absent from
     * this copy.  Presumably they hold the signature and the four case labels
     * (LOGICALREP_COLUMN_NULL, _UNCHANGED, _TEXT, _BINARY from
     * logicalproto.h) -- confirm which data branch is text and which is
     * binary against the real proto.c.
839  */
840 static void
842 {
843  int i;
844  int natts;
845 
846  /* Get number of attributes */
847  natts = pq_getmsgint(in, 2);
848 
849  /* Allocate space for per-column values; zero out unused StringInfoDatas */
850  tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
851  tuple->colstatus = (char *) palloc(natts * sizeof(char));
852  tuple->ncols = natts;
853 
854  /* Read the data */
855  for (i = 0; i < natts; i++)
856  {
857  char kind;
858  int len;
859  StringInfo value = &tuple->colvalues[i];
860 
861  kind = pq_getmsgbyte(in);
862  tuple->colstatus[i] = kind;
863 
864  switch (kind)
865  {
867  /* nothing more to do */
868  break;
870  /* we don't receive the value of an unchanged column */
871  break;
873  len = pq_getmsgint(in, 4); /* read length */
874 
875  /* and data */
     /* one extra byte is allocated for the terminating NUL */
876  value->data = palloc(len + 1);
877  pq_copymsgbytes(in, value->data, len);
878  value->data[len] = '\0';
879  /* make StringInfo fully valid */
880  value->len = len;
881  value->cursor = 0;
882  value->maxlen = len;
883  break;
885  len = pq_getmsgint(in, 4); /* read length */
886 
887  /* and data */
888  value->data = palloc(len + 1);
889  pq_copymsgbytes(in, value->data, len);
890  /* not strictly necessary but per StringInfo practice */
891  value->data[len] = '\0';
892  /* make StringInfo fully valid */
893  value->len = len;
894  value->cursor = 0;
895  value->maxlen = len;
896  break;
897  default:
898  elog(ERROR, "unrecognized data representation type '%c'", kind);
899  }
900  }
901 }
902 
903 /*
904  * Write relation attribute metadata to the stream.
     *
     * Sends the number of live attributes (neither dropped nor generated) as
     * a 16-bit integer, then for each live attribute, in descriptor order:
     * a flags byte, the attribute name, the attribute type OID and the
     * attribute type modifier (the last two as 32-bit integers).
     *
     * NOTE(review): listing lines 907, 942 and 944 are absent from this copy.
     * Presumably they hold the signature, the bitmap membership test against
     * idattrs, and the statement that sets LOGICALREP_IS_REPLICA_IDENTITY in
     * flags -- confirm against the real proto.c.
905  */
906 static void
908 {
909  TupleDesc desc;
910  int i;
911  uint16 nliveatts = 0;
912  Bitmapset *idattrs = NULL;
913  bool replidentfull;
914 
915  desc = RelationGetDescr(rel);
916 
917  /* send number of live attributes */
918  for (i = 0; i < desc->natts; i++)
919  {
920  if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
921  continue;
922  nliveatts++;
923  }
924  pq_sendint16(out, nliveatts);
925 
926  /* fetch bitmap of REPLICA IDENTITY attributes (not needed when FULL) */
927  replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
928  if (!replidentfull)
929  idattrs = RelationGetIdentityKeyBitmap(rel);
930 
931  /* send the attributes */
932  for (i = 0; i < desc->natts; i++)
933  {
934  Form_pg_attribute att = TupleDescAttr(desc, i);
935  uint8 flags = 0;
936 
     /* must mirror the filter used for nliveatts above */
937  if (att->attisdropped || att->attgenerated)
938  continue;
939 
940  /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
941  if (replidentfull ||
943  idattrs))
945 
946  pq_sendbyte(out, flags);
947 
948  /* attribute name */
949  pq_sendstring(out, NameStr(att->attname));
950 
951  /* attribute type id */
952  pq_sendint32(out, (int) att->atttypid);
953 
954  /* attribute type modifier (atttypmod) */
955  pq_sendint32(out, att->atttypmod);
956  }
957 
     /* idattrs is still NULL here when the replica identity is FULL */
958  bms_free(idattrs);
959 }
960 
961 /*
962  * Read relation attribute metadata from the stream.
     *
     * Reads a 2-byte attribute count, then per attribute: a flags byte, the
     * attribute name, a 4-byte type OID and a 4-byte value that is discarded.
     *
     * On return rel->natts holds the count, rel->attnames and rel->atttyps
     * hold newly allocated arrays of that length (names are copied with
     * pstrdup), and rel->attkeys holds the zero-based positions of the
     * attributes whose flags byte had LOGICALREP_IS_REPLICA_IDENTITY set
     * (NULL if there were none).
     *
     * NOTE(review): listing line 965 (presumably the signature) is absent
     * from this copy.
963  */
964 static void
966 {
967  int i;
968  int natts;
969  char **attnames;
970  Oid *atttyps;
971  Bitmapset *attkeys = NULL;
972 
973  natts = pq_getmsgint(in, 2);
974  attnames = palloc(natts * sizeof(char *));
975  atttyps = palloc(natts * sizeof(Oid));
976 
977  /* read the attributes */
978  for (i = 0; i < natts; i++)
979  {
980  uint8 flags;
981 
982  /* Check for replica identity column */
983  flags = pq_getmsgbyte(in);
984  if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
985  attkeys = bms_add_member(attkeys, i);
986 
987  /* attribute name */
988  attnames[i] = pstrdup(pq_getmsgstring(in));
989 
990  /* attribute type id */
991  atttyps[i] = (Oid) pq_getmsgint(in, 4);
992 
993  /* we ignore the attribute type modifier for now, but must still consume it */
994  (void) pq_getmsgint(in, 4);
995  }
996 
997  rel->attnames = attnames;
998  rel->atttyps = atttyps;
999  rel->attkeys = attkeys;
1000  rel->natts = natts;
1001 }
1002 
1003 /*
1004  * Write the namespace name or empty string for pg_catalog (to save space).
      *
      * For PG_CATALOG_NAMESPACE a single zero byte is sent, which the reader
      * sees as an empty string.  For any other namespace the name is looked up
      * and sent as a string; an error is raised if the lookup returns NULL.
      *
      * NOTE(review): listing line 1007 (the signature) is absent from this
      * copy; the forward declaration near the top of the file gives it as
      * (StringInfo out, Oid nspid).
1005  */
1006 static void
1008 {
1009  if (nspid == PG_CATALOG_NAMESPACE)
1010  pq_sendbyte(out, '\0');
1011  else
1012  {
1013  char *nspname = get_namespace_name(nspid);
1014 
1015  if (nspname == NULL)
1016  elog(ERROR, "cache lookup failed for namespace %u",
1017  nspid);
1018 
1019  pq_sendstring(out, nspname);
1020  }
1021 }
1022 
1023 /*
1024  * Read the namespace name while treating empty string as pg_catalog.
      *
      * Returns either the pointer handed back by pq_getmsgstring() or, for an
      * empty name, the string literal "pg_catalog".  No copy is made here, so
      * a caller that needs to keep or modify the name must copy it.
      *
      * NOTE(review): listing line 1027 (the signature) is absent from this
      * copy; the forward declaration near the top of the file gives it as
      * (StringInfo in).
1025  */
1026 static const char *
1028 {
1029  const char *nspname = pq_getmsgstring(in);
1030 
1031  if (nspname[0] == '\0')
1032  nspname = "pg_catalog";
1033 
1034  return nspname;
1035 }
1036 
1037 /*
1038  * Write the information for the start stream message to the output stream.
1039  */
1040 void
1042  TransactionId xid, bool first_segment)
1043 {
1045 
1047 
1048  /* transaction ID (we're starting to stream, so must be valid) */
1049  pq_sendint32(out, xid);
1050 
1051  /* 1 if this is the first streaming segment for this xid */
1052  pq_sendbyte(out, first_segment ? 1 : 0);
1053 }
1054 
1055 /*
1056  * Read the information about the start stream message from the stream.
1057  */
1060 {
1061  TransactionId xid;
1062 
1063  Assert(first_segment);
1064 
1065  xid = pq_getmsgint(in, 4);
1066  *first_segment = (pq_getmsgbyte(in) == 1);
1067 
1068  return xid;
1069 }
1070 
1071 /*
1072  * Write the stop stream message to the output stream.
1073  */
1074 void
1076 {
1078 }
1079 
1080 /*
1081  * Write STREAM COMMIT to the output stream.
1082  */
1083 void
1085  XLogRecPtr commit_lsn)
1086 {
1087  uint8 flags = 0;
1088 
1090 
1092 
1093  /* transaction ID */
1094  pq_sendint32(out, txn->xid);
1095 
1096  /* send the flags field (unused for now) */
1097  pq_sendbyte(out, flags);
1098 
1099  /* send fields */
1100  pq_sendint64(out, commit_lsn);
1101  pq_sendint64(out, txn->end_lsn);
1102  pq_sendint64(out, txn->xact_time.commit_time);
1103 }
1104 
1105 /*
1106  * Read STREAM COMMIT from the stream.
1107  */
1110 {
1111  TransactionId xid;
1112  uint8 flags;
1113 
1114  xid = pq_getmsgint(in, 4);
1115 
1116  /* read flags (unused for now) */
1117  flags = pq_getmsgbyte(in);
1118 
1119  if (flags != 0)
1120  elog(ERROR, "unrecognized flags %u in commit message", flags);
1121 
1122  /* read fields */
1123  commit_data->commit_lsn = pq_getmsgint64(in);
1124  commit_data->end_lsn = pq_getmsgint64(in);
1125  commit_data->committime = pq_getmsgint64(in);
1126 
1127  return xid;
1128 }
1129 
1130 /*
1131  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1132  * same for the top-level transaction abort.
1133  */
1134 void
1136  TransactionId subxid)
1137 {
1139 
1141 
1142  /* transaction ID */
1143  pq_sendint32(out, xid);
1144  pq_sendint32(out, subxid);
1145 }
1146 
1147 /*
1148  * Read STREAM ABORT from the stream.
1149  */
1150 void
1152  TransactionId *subxid)
1153 {
1154  Assert(xid && subxid);
1155 
1156  *xid = pq_getmsgint(in, 4);
1157  *subxid = pq_getmsgint(in, 4);
1158 }
1159 
1160 /*
1161  * Get string representing LogicalRepMsgType.
1162  */
1163 char *
1165 {
1166  switch (action)
1167  {
1168  case LOGICAL_REP_MSG_BEGIN:
1169  return "BEGIN";
1171  return "COMMIT";
1173  return "ORIGIN";
1175  return "INSERT";
1177  return "UPDATE";
1179  return "DELETE";
1181  return "TRUNCATE";
1183  return "RELATION";
1184  case LOGICAL_REP_MSG_TYPE:
1185  return "TYPE";
1187  return "MESSAGE";
1189  return "BEGIN PREPARE";
1191  return "PREPARE";
1193  return "COMMIT PREPARED";
1195  return "ROLLBACK PREPARED";
1197  return "STREAM START";
1199  return "STREAM STOP";
1201  return "STREAM COMMIT";
1203  return "STREAM ABORT";
1205  return "STREAM PREPARE";
1206  }
1207 
1208  elog(ERROR, "invalid logical replication message type \"%c\"", action);
1209 
1210  return NULL; /* keep compiler quiet */
1211 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:752
#define NIL
Definition: pg_list.h:65
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
TimestampTz commit_time
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:327
TransactionId xid
Definition: logicalproto.h:125
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define rbtxn_prepared(txn)
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:152
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:131
#define VARDATA(PTR)
Definition: postgres.h:315
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:548
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:570
static void pq_sendint(StringInfo buf, uint32 i, int b)
Definition: pqformat.h:172
uint32 TransactionId
Definition: c.h:587
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:627
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1109
#define RelationGetDescr(relation)
Definition: rel.h:503
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1075
#define VARSIZE(PTR)
Definition: postgres.h:316
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
int64 TimestampTz
Definition: timestamp.h:39
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define VARHDRSZ
Definition: c.h:627
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:602
char * pstrdup(const char *in)
Definition: mcxt.c:1299
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1007
#define LOGICALREP_COLUMN_NULL
Definition: logicalproto.h:90
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:683
unsigned char uint8
Definition: c.h:439
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid)
Definition: proto.c:1135
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:92
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:75
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
Definition: proto.c:518
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:371
StringInfoData * colvalues
Definition: logicalproto.h:81
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
Form_pg_class rd_rel
Definition: rel.h:109
NameData relname
Definition: pg_class.h:38
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
#define OidIsValid(objectId)
Definition: c.h:710
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:93
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
Definition: proto.c:400
Bitmapset * attkeys
Definition: logicalproto.h:109
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:196
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
Definition: relcache.c:5247
LogicalRepRelId remoteid
Definition: logicalproto.h:101
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:26
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:422
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1041
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:91
unsigned short uint16
Definition: c.h:440
void pfree(void *pointer)
Definition: mcxt.c:1169
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
Definition: proto.c:654
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:739
char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1164
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1672
uint32 t_len
Definition: htup.h:64
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3316
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:841
#define InvalidTransactionId
Definition: transam.h:31
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:907
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
#define RelationGetRelationName(relation)
Definition: rel.h:511
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
XLogRecPtr final_lsn
void pq_sendcountedtext(StringInfo buf, const char *str, int slen, bool countincludesself)
Definition: pqformat.c:142
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:965
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1084
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:46
XLogRecPtr final_lsn
Definition: logicalproto.h:123
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:225
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:184
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28
void * palloc0(Size size)
Definition: mcxt.c:1093
uintptr_t Datum
Definition: postgres.h:411
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:290
TransactionId xid
static struct @143 value
union ReorderBufferTXN::@103 xact_time
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1027
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:350
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:113
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:362
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
void pq_copymsgbytes(StringInfo msg, char *buf, int datalen)
Definition: pqformat.c:530
XLogRecPtr end_lsn
LogicalRepMsgType
Definition: logicalproto.h:51
size_t Size
Definition: c.h:540
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:477
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:322
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1249
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:234
static Datum values[MAXATTR]
Definition: bootstrap.c:156
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:264
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1059
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1653
void * palloc(Size size)
Definition: mcxt.c:1062
#define elog(elevel,...)
Definition: elog.h:232
int i
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:708
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define TRUNCATE_CASCADE
Definition: proto.c:29
#define NameStr(name)
Definition: c.h:681
TimestampTz prepare_time
Definition: c.h:621
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
Definition: proto.c:387
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
Definition: proto.c:444
TimestampTz committime
Definition: logicalproto.h:124
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2468
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129
Definition: pg_list.h:50
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:427
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30
#define RelationGetRelid(relation)
Definition: rel.h:477
TimestampTz committime
Definition: logicalproto.h:132
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:1151
uint32 LogicalRepRelId
Definition: logicalproto.h:95
#define RelationGetNamespace(relation)
Definition: rel.h:518