PostgreSQL Source Code  git master
proto.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * proto.c
4  * logical replication protocol functions
5  *
6  * Copyright (c) 2015-2022, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/proto.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/sysattr.h"
16 #include "catalog/pg_namespace.h"
17 #include "catalog/pg_type.h"
18 #include "libpq/pqformat.h"
20 #include "utils/lsyscache.h"
21 #include "utils/syscache.h"
22 
23 /*
24  * Protocol message flags.
25  */
26 #define LOGICALREP_IS_REPLICA_IDENTITY 1
27 
28 #define MESSAGE_TRANSACTIONAL (1<<0)
29 #define TRUNCATE_CASCADE (1<<0)
30 #define TRUNCATE_RESTART_SEQS (1<<1)
31 
32 static void logicalrep_write_attrs(StringInfo out, Relation rel,
33  Bitmapset *columns);
34 static void logicalrep_write_tuple(StringInfo out, Relation rel,
35  TupleTableSlot *slot,
36  bool binary, Bitmapset *columns);
39 
40 static void logicalrep_write_namespace(StringInfo out, Oid nspid);
41 static const char *logicalrep_read_namespace(StringInfo in);
42 
43 /*
44  * Check if a column is covered by a column list.
45  *
46  * Need to be careful about NULL, which is treated as a column list covering
47  * all columns.
48  */
49 static bool
51 {
52  return (columns == NULL || bms_is_member(attnum, columns));
53 }
54 
55 
56 /*
57  * Write BEGIN to the output stream.
58  */
59 void
61 {
63 
64  /* fixed fields */
65  pq_sendint64(out, txn->final_lsn);
67  pq_sendint32(out, txn->xid);
68 }
69 
70 /*
71  * Read transaction BEGIN from the stream.
72  */
73 void
75 {
76  /* read fields */
77  begin_data->final_lsn = pq_getmsgint64(in);
78  if (begin_data->final_lsn == InvalidXLogRecPtr)
79  elog(ERROR, "final_lsn not set in begin message");
80  begin_data->committime = pq_getmsgint64(in);
81  begin_data->xid = pq_getmsgint(in, 4);
82 }
83 
84 
85 /*
86  * Write COMMIT to the output stream.
87  */
88 void
90  XLogRecPtr commit_lsn)
91 {
92  uint8 flags = 0;
93 
95 
96  /* send the flags field (unused for now) */
97  pq_sendbyte(out, flags);
98 
99  /* send fields */
100  pq_sendint64(out, commit_lsn);
101  pq_sendint64(out, txn->end_lsn);
103 }
104 
105 /*
106  * Read transaction COMMIT from the stream.
107  */
108 void
110 {
111  /* read flags (unused for now) */
112  uint8 flags = pq_getmsgbyte(in);
113 
114  if (flags != 0)
115  elog(ERROR, "unrecognized flags %u in commit message", flags);
116 
117  /* read fields */
118  commit_data->commit_lsn = pq_getmsgint64(in);
119  commit_data->end_lsn = pq_getmsgint64(in);
120  commit_data->committime = pq_getmsgint64(in);
121 }
122 
123 /*
124  * Write BEGIN PREPARE to the output stream.
125  */
126 void
128 {
130 
131  /* fixed fields */
132  pq_sendint64(out, txn->final_lsn);
133  pq_sendint64(out, txn->end_lsn);
135  pq_sendint32(out, txn->xid);
136 
137  /* send gid */
138  pq_sendstring(out, txn->gid);
139 }
140 
141 /*
142  * Read transaction BEGIN PREPARE from the stream.
143  */
144 void
146 {
147  /* read fields */
148  begin_data->prepare_lsn = pq_getmsgint64(in);
149  if (begin_data->prepare_lsn == InvalidXLogRecPtr)
150  elog(ERROR, "prepare_lsn not set in begin prepare message");
151  begin_data->end_lsn = pq_getmsgint64(in);
152  if (begin_data->end_lsn == InvalidXLogRecPtr)
153  elog(ERROR, "end_lsn not set in begin prepare message");
154  begin_data->prepare_time = pq_getmsgint64(in);
155  begin_data->xid = pq_getmsgint(in, 4);
156 
157  /* read gid (copy it into a pre-allocated buffer) */
158  strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
159 }
160 
161 /*
162  * The core functionality for logicalrep_write_prepare and
163  * logicalrep_write_stream_prepare.
164  */
165 static void
167  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
168 {
169  uint8 flags = 0;
170 
171  pq_sendbyte(out, type);
172 
173  /*
174  * This should only ever happen for two-phase commit transactions, in
175  * which case we expect to have a valid GID.
176  */
177  Assert(txn->gid != NULL);
178  Assert(rbtxn_prepared(txn));
180 
181  /* send the flags field */
182  pq_sendbyte(out, flags);
183 
184  /* send fields */
185  pq_sendint64(out, prepare_lsn);
186  pq_sendint64(out, txn->end_lsn);
188  pq_sendint32(out, txn->xid);
189 
190  /* send gid */
191  pq_sendstring(out, txn->gid);
192 }
193 
194 /*
195  * Write PREPARE to the output stream.
196  */
197 void
199  XLogRecPtr prepare_lsn)
200 {
202  txn, prepare_lsn);
203 }
204 
205 /*
206  * The core functionality for logicalrep_read_prepare and
207  * logicalrep_read_stream_prepare.
208  */
209 static void
211  LogicalRepPreparedTxnData *prepare_data)
212 {
213  /* read flags */
214  uint8 flags = pq_getmsgbyte(in);
215 
216  if (flags != 0)
217  elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
218 
219  /* read fields */
220  prepare_data->prepare_lsn = pq_getmsgint64(in);
221  if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
222  elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
223  prepare_data->end_lsn = pq_getmsgint64(in);
224  if (prepare_data->end_lsn == InvalidXLogRecPtr)
225  elog(ERROR, "end_lsn is not set in %s message", msgtype);
226  prepare_data->prepare_time = pq_getmsgint64(in);
227  prepare_data->xid = pq_getmsgint(in, 4);
228  if (prepare_data->xid == InvalidTransactionId)
229  elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
230 
231  /* read gid (copy it into a pre-allocated buffer) */
232  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
233 }
234 
235 /*
236  * Read transaction PREPARE from the stream.
237  */
238 void
240 {
241  logicalrep_read_prepare_common(in, "prepare", prepare_data);
242 }
243 
244 /*
245  * Write COMMIT PREPARED to the output stream.
246  */
247 void
249  XLogRecPtr commit_lsn)
250 {
251  uint8 flags = 0;
252 
254 
255  /*
256  * This should only ever happen for two-phase commit transactions, in
257  * which case we expect to have a valid GID.
258  */
259  Assert(txn->gid != NULL);
260 
261  /* send the flags field */
262  pq_sendbyte(out, flags);
263 
264  /* send fields */
265  pq_sendint64(out, commit_lsn);
266  pq_sendint64(out, txn->end_lsn);
268  pq_sendint32(out, txn->xid);
269 
270  /* send gid */
271  pq_sendstring(out, txn->gid);
272 }
273 
274 /*
275  * Read transaction COMMIT PREPARED from the stream.
276  */
277 void
279 {
280  /* read flags */
281  uint8 flags = pq_getmsgbyte(in);
282 
283  if (flags != 0)
284  elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
285 
286  /* read fields */
287  prepare_data->commit_lsn = pq_getmsgint64(in);
288  if (prepare_data->commit_lsn == InvalidXLogRecPtr)
289  elog(ERROR, "commit_lsn is not set in commit prepared message");
290  prepare_data->end_lsn = pq_getmsgint64(in);
291  if (prepare_data->end_lsn == InvalidXLogRecPtr)
292  elog(ERROR, "end_lsn is not set in commit prepared message");
293  prepare_data->commit_time = pq_getmsgint64(in);
294  prepare_data->xid = pq_getmsgint(in, 4);
295 
296  /* read gid (copy it into a pre-allocated buffer) */
297  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
298 }
299 
300 /*
301  * Write ROLLBACK PREPARED to the output stream.
302  */
303 void
305  XLogRecPtr prepare_end_lsn,
306  TimestampTz prepare_time)
307 {
308  uint8 flags = 0;
309 
311 
312  /*
313  * This should only ever happen for two-phase commit transactions, in
314  * which case we expect to have a valid GID.
315  */
316  Assert(txn->gid != NULL);
317 
318  /* send the flags field */
319  pq_sendbyte(out, flags);
320 
321  /* send fields */
322  pq_sendint64(out, prepare_end_lsn);
323  pq_sendint64(out, txn->end_lsn);
324  pq_sendint64(out, prepare_time);
326  pq_sendint32(out, txn->xid);
327 
328  /* send gid */
329  pq_sendstring(out, txn->gid);
330 }
331 
332 /*
333  * Read transaction ROLLBACK PREPARED from the stream.
334  */
335 void
337  LogicalRepRollbackPreparedTxnData *rollback_data)
338 {
339  /* read flags */
340  uint8 flags = pq_getmsgbyte(in);
341 
342  if (flags != 0)
343  elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
344 
345  /* read fields */
346  rollback_data->prepare_end_lsn = pq_getmsgint64(in);
347  if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
348  elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
349  rollback_data->rollback_end_lsn = pq_getmsgint64(in);
350  if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
351  elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
352  rollback_data->prepare_time = pq_getmsgint64(in);
353  rollback_data->rollback_time = pq_getmsgint64(in);
354  rollback_data->xid = pq_getmsgint(in, 4);
355 
356  /* read gid (copy it into a pre-allocated buffer) */
357  strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
358 }
359 
360 /*
361  * Write STREAM PREPARE to the output stream.
362  */
363 void
365  ReorderBufferTXN *txn,
366  XLogRecPtr prepare_lsn)
367 {
369  txn, prepare_lsn);
370 }
371 
372 /*
373  * Read STREAM PREPARE from the stream.
374  */
375 void
377 {
378  logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
379 }
380 
381 /*
382  * Write ORIGIN to the output stream.
383  */
384 void
385 logicalrep_write_origin(StringInfo out, const char *origin,
386  XLogRecPtr origin_lsn)
387 {
389 
390  /* fixed fields */
391  pq_sendint64(out, origin_lsn);
392 
393  /* origin string */
394  pq_sendstring(out, origin);
395 }
396 
397 /*
398  * Read ORIGIN from the output stream.
399  */
400 char *
402 {
403  /* fixed fields */
404  *origin_lsn = pq_getmsgint64(in);
405 
406  /* return origin */
407  return pstrdup(pq_getmsgstring(in));
408 }
409 
410 /*
411  * Write INSERT to the output stream.
412  */
413 void
415  TupleTableSlot *newslot, bool binary, Bitmapset *columns)
416 {
418 
419  /* transaction ID (if not valid, we're not streaming) */
420  if (TransactionIdIsValid(xid))
421  pq_sendint32(out, xid);
422 
423  /* use Oid as relation identifier */
424  pq_sendint32(out, RelationGetRelid(rel));
425 
426  pq_sendbyte(out, 'N'); /* new tuple follows */
427  logicalrep_write_tuple(out, rel, newslot, binary, columns);
428 }
429 
430 /*
431  * Read INSERT from stream.
432  *
433  * Fills the new tuple.
434  */
437 {
438  char action;
439  LogicalRepRelId relid;
440 
441  /* read the relation id */
442  relid = pq_getmsgint(in, 4);
443 
444  action = pq_getmsgbyte(in);
445  if (action != 'N')
446  elog(ERROR, "expected new tuple but got %d",
447  action);
448 
449  logicalrep_read_tuple(in, newtup);
450 
451  return relid;
452 }
453 
454 /*
455  * Write UPDATE to the output stream.
456  */
457 void
459  TupleTableSlot *oldslot, TupleTableSlot *newslot,
460  bool binary, Bitmapset *columns)
461 {
463 
464  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
465  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
466  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
467 
468  /* transaction ID (if not valid, we're not streaming) */
469  if (TransactionIdIsValid(xid))
470  pq_sendint32(out, xid);
471 
472  /* use Oid as relation identifier */
473  pq_sendint32(out, RelationGetRelid(rel));
474 
475  if (oldslot != NULL)
476  {
477  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
478  pq_sendbyte(out, 'O'); /* old tuple follows */
479  else
480  pq_sendbyte(out, 'K'); /* old key follows */
481  logicalrep_write_tuple(out, rel, oldslot, binary, columns);
482  }
483 
484  pq_sendbyte(out, 'N'); /* new tuple follows */
485  logicalrep_write_tuple(out, rel, newslot, binary, columns);
486 }
487 
488 /*
489  * Read UPDATE from stream.
490  */
492 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
493  LogicalRepTupleData *oldtup,
494  LogicalRepTupleData *newtup)
495 {
496  char action;
497  LogicalRepRelId relid;
498 
499  /* read the relation id */
500  relid = pq_getmsgint(in, 4);
501 
502  /* read and verify action */
503  action = pq_getmsgbyte(in);
504  if (action != 'K' && action != 'O' && action != 'N')
505  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
506  action);
507 
508  /* check for old tuple */
509  if (action == 'K' || action == 'O')
510  {
511  logicalrep_read_tuple(in, oldtup);
512  *has_oldtuple = true;
513 
514  action = pq_getmsgbyte(in);
515  }
516  else
517  *has_oldtuple = false;
518 
519  /* check for new tuple */
520  if (action != 'N')
521  elog(ERROR, "expected action 'N', got %c",
522  action);
523 
524  logicalrep_read_tuple(in, newtup);
525 
526  return relid;
527 }
528 
529 /*
530  * Write DELETE to the output stream.
531  */
532 void
534  TupleTableSlot *oldslot, bool binary,
535  Bitmapset *columns)
536 {
537  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
538  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
539  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
540 
542 
543  /* transaction ID (if not valid, we're not streaming) */
544  if (TransactionIdIsValid(xid))
545  pq_sendint32(out, xid);
546 
547  /* use Oid as relation identifier */
548  pq_sendint32(out, RelationGetRelid(rel));
549 
550  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
551  pq_sendbyte(out, 'O'); /* old tuple follows */
552  else
553  pq_sendbyte(out, 'K'); /* old key follows */
554 
555  logicalrep_write_tuple(out, rel, oldslot, binary, columns);
556 }
557 
558 /*
559  * Read DELETE from stream.
560  *
561  * Fills the old tuple.
562  */
565 {
566  char action;
567  LogicalRepRelId relid;
568 
569  /* read the relation id */
570  relid = pq_getmsgint(in, 4);
571 
572  /* read and verify action */
573  action = pq_getmsgbyte(in);
574  if (action != 'K' && action != 'O')
575  elog(ERROR, "expected action 'O' or 'K', got %c", action);
576 
577  logicalrep_read_tuple(in, oldtup);
578 
579  return relid;
580 }
581 
582 /*
583  * Write TRUNCATE to the output stream.
584  */
585 void
587  TransactionId xid,
588  int nrelids,
589  Oid relids[],
590  bool cascade, bool restart_seqs)
591 {
592  int i;
593  uint8 flags = 0;
594 
596 
597  /* transaction ID (if not valid, we're not streaming) */
598  if (TransactionIdIsValid(xid))
599  pq_sendint32(out, xid);
600 
601  pq_sendint32(out, nrelids);
602 
603  /* encode and send truncate flags */
604  if (cascade)
605  flags |= TRUNCATE_CASCADE;
606  if (restart_seqs)
607  flags |= TRUNCATE_RESTART_SEQS;
608  pq_sendint8(out, flags);
609 
610  for (i = 0; i < nrelids; i++)
611  pq_sendint32(out, relids[i]);
612 }
613 
614 /*
615  * Read TRUNCATE from stream.
616  */
617 List *
619  bool *cascade, bool *restart_seqs)
620 {
621  int i;
622  int nrelids;
623  List *relids = NIL;
624  uint8 flags;
625 
626  nrelids = pq_getmsgint(in, 4);
627 
628  /* read and decode truncate flags */
629  flags = pq_getmsgint(in, 1);
630  *cascade = (flags & TRUNCATE_CASCADE) > 0;
631  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
632 
633  for (i = 0; i < nrelids; i++)
634  relids = lappend_oid(relids, pq_getmsgint(in, 4));
635 
636  return relids;
637 }
638 
639 /*
640  * Write MESSAGE to stream
641  */
642 void
644  bool transactional, const char *prefix, Size sz,
645  const char *message)
646 {
647  uint8 flags = 0;
648 
650 
651  /* encode and send message flags */
652  if (transactional)
653  flags |= MESSAGE_TRANSACTIONAL;
654 
655  /* transaction ID (if not valid, we're not streaming) */
656  if (TransactionIdIsValid(xid))
657  pq_sendint32(out, xid);
658 
659  pq_sendint8(out, flags);
660  pq_sendint64(out, lsn);
661  pq_sendstring(out, prefix);
662  pq_sendint32(out, sz);
663  pq_sendbytes(out, message, sz);
664 }
665 
666 /*
667  * Write relation description to the output stream.
668  */
669 void
671  Bitmapset *columns)
672 {
673  char *relname;
674 
676 
677  /* transaction ID (if not valid, we're not streaming) */
678  if (TransactionIdIsValid(xid))
679  pq_sendint32(out, xid);
680 
681  /* use Oid as relation identifier */
682  pq_sendint32(out, RelationGetRelid(rel));
683 
684  /* send qualified relation name */
687  pq_sendstring(out, relname);
688 
689  /* send replica identity */
690  pq_sendbyte(out, rel->rd_rel->relreplident);
691 
692  /* send the attribute info */
693  logicalrep_write_attrs(out, rel, columns);
694 }
695 
696 /*
697  * Read the relation info from stream and return as LogicalRepRelation.
698  */
701 {
703 
704  rel->remoteid = pq_getmsgint(in, 4);
705 
706  /* Read relation name from stream */
708  rel->relname = pstrdup(pq_getmsgstring(in));
709 
710  /* Read the replica identity. */
711  rel->replident = pq_getmsgbyte(in);
712 
713  /* Get attribute description */
714  logicalrep_read_attrs(in, rel);
715 
716  return rel;
717 }
718 
719 /*
720  * Write type info to the output stream.
721  *
722  * This function will always write base type info.
723  */
724 void
726 {
727  Oid basetypoid = getBaseType(typoid);
728  HeapTuple tup;
729  Form_pg_type typtup;
730 
732 
733  /* transaction ID (if not valid, we're not streaming) */
734  if (TransactionIdIsValid(xid))
735  pq_sendint32(out, xid);
736 
737  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
738  if (!HeapTupleIsValid(tup))
739  elog(ERROR, "cache lookup failed for type %u", basetypoid);
740  typtup = (Form_pg_type) GETSTRUCT(tup);
741 
742  /* use Oid as relation identifier */
743  pq_sendint32(out, typoid);
744 
745  /* send qualified type name */
746  logicalrep_write_namespace(out, typtup->typnamespace);
747  pq_sendstring(out, NameStr(typtup->typname));
748 
749  ReleaseSysCache(tup);
750 }
751 
752 /*
753  * Read type info from the output stream.
754  */
755 void
757 {
758  ltyp->remoteid = pq_getmsgint(in, 4);
759 
760  /* Read type name from stream */
762  ltyp->typname = pstrdup(pq_getmsgstring(in));
763 }
764 
765 /*
766  * Write a tuple to the outputstream, in the most efficient format possible.
767  */
768 static void
770  bool binary, Bitmapset *columns)
771 {
772  TupleDesc desc;
773  Datum *values;
774  bool *isnull;
775  int i;
776  uint16 nliveatts = 0;
777 
778  desc = RelationGetDescr(rel);
779 
780  for (i = 0; i < desc->natts; i++)
781  {
782  Form_pg_attribute att = TupleDescAttr(desc, i);
783 
784  if (att->attisdropped || att->attgenerated)
785  continue;
786 
787  if (!column_in_column_list(att->attnum, columns))
788  continue;
789 
790  nliveatts++;
791  }
792  pq_sendint16(out, nliveatts);
793 
794  slot_getallattrs(slot);
795  values = slot->tts_values;
796  isnull = slot->tts_isnull;
797 
798  /* Write the values */
799  for (i = 0; i < desc->natts; i++)
800  {
801  HeapTuple typtup;
802  Form_pg_type typclass;
803  Form_pg_attribute att = TupleDescAttr(desc, i);
804 
805  if (att->attisdropped || att->attgenerated)
806  continue;
807 
808  if (!column_in_column_list(att->attnum, columns))
809  continue;
810 
811  if (isnull[i])
812  {
814  continue;
815  }
816 
817  if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
818  {
819  /*
820  * Unchanged toasted datum. (Note that we don't promise to detect
821  * unchanged data in general; this is just a cheap check to avoid
822  * sending large values unnecessarily.)
823  */
825  continue;
826  }
827 
828  typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
829  if (!HeapTupleIsValid(typtup))
830  elog(ERROR, "cache lookup failed for type %u", att->atttypid);
831  typclass = (Form_pg_type) GETSTRUCT(typtup);
832 
833  /*
834  * Send in binary if requested and type has suitable send function.
835  */
836  if (binary && OidIsValid(typclass->typsend))
837  {
838  bytea *outputbytes;
839  int len;
840 
842  outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
843  len = VARSIZE(outputbytes) - VARHDRSZ;
844  pq_sendint(out, len, 4); /* length */
845  pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
846  pfree(outputbytes);
847  }
848  else
849  {
850  char *outputstr;
851 
853  outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
854  pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
855  pfree(outputstr);
856  }
857 
858  ReleaseSysCache(typtup);
859  }
860 }
861 
862 /*
863  * Read tuple in logical replication format from stream.
864  */
865 static void
867 {
868  int i;
869  int natts;
870 
871  /* Get number of attributes */
872  natts = pq_getmsgint(in, 2);
873 
874  /* Allocate space for per-column values; zero out unused StringInfoDatas */
875  tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
876  tuple->colstatus = (char *) palloc(natts * sizeof(char));
877  tuple->ncols = natts;
878 
879  /* Read the data */
880  for (i = 0; i < natts; i++)
881  {
882  char kind;
883  int len;
884  StringInfo value = &tuple->colvalues[i];
885 
886  kind = pq_getmsgbyte(in);
887  tuple->colstatus[i] = kind;
888 
889  switch (kind)
890  {
892  /* nothing more to do */
893  break;
895  /* we don't receive the value of an unchanged column */
896  break;
898  len = pq_getmsgint(in, 4); /* read length */
899 
900  /* and data */
901  value->data = palloc(len + 1);
902  pq_copymsgbytes(in, value->data, len);
903  value->data[len] = '\0';
904  /* make StringInfo fully valid */
905  value->len = len;
906  value->cursor = 0;
907  value->maxlen = len;
908  break;
910  len = pq_getmsgint(in, 4); /* read length */
911 
912  /* and data */
913  value->data = palloc(len + 1);
914  pq_copymsgbytes(in, value->data, len);
915  /* not strictly necessary but per StringInfo practice */
916  value->data[len] = '\0';
917  /* make StringInfo fully valid */
918  value->len = len;
919  value->cursor = 0;
920  value->maxlen = len;
921  break;
922  default:
923  elog(ERROR, "unrecognized data representation type '%c'", kind);
924  }
925  }
926 }
927 
928 /*
929  * Write relation attribute metadata to the stream.
930  */
931 static void
933 {
934  TupleDesc desc;
935  int i;
936  uint16 nliveatts = 0;
937  Bitmapset *idattrs = NULL;
938  bool replidentfull;
939 
940  desc = RelationGetDescr(rel);
941 
942  /* send number of live attributes */
943  for (i = 0; i < desc->natts; i++)
944  {
945  Form_pg_attribute att = TupleDescAttr(desc, i);
946 
947  if (att->attisdropped || att->attgenerated)
948  continue;
949 
950  if (!column_in_column_list(att->attnum, columns))
951  continue;
952 
953  nliveatts++;
954  }
955  pq_sendint16(out, nliveatts);
956 
957  /* fetch bitmap of REPLICATION IDENTITY attributes */
958  replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
959  if (!replidentfull)
960  idattrs = RelationGetIdentityKeyBitmap(rel);
961 
962  /* send the attributes */
963  for (i = 0; i < desc->natts; i++)
964  {
965  Form_pg_attribute att = TupleDescAttr(desc, i);
966  uint8 flags = 0;
967 
968  if (att->attisdropped || att->attgenerated)
969  continue;
970 
971  if (!column_in_column_list(att->attnum, columns))
972  continue;
973 
974  /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
975  if (replidentfull ||
977  idattrs))
979 
980  pq_sendbyte(out, flags);
981 
982  /* attribute name */
983  pq_sendstring(out, NameStr(att->attname));
984 
985  /* attribute type id */
986  pq_sendint32(out, (int) att->atttypid);
987 
988  /* attribute mode */
989  pq_sendint32(out, att->atttypmod);
990  }
991 
992  bms_free(idattrs);
993 }
994 
995 /*
996  * Read relation attribute metadata from the stream.
997  */
998 static void
1000 {
1001  int i;
1002  int natts;
1003  char **attnames;
1004  Oid *atttyps;
1005  Bitmapset *attkeys = NULL;
1006 
1007  natts = pq_getmsgint(in, 2);
1008  attnames = palloc(natts * sizeof(char *));
1009  atttyps = palloc(natts * sizeof(Oid));
1010 
1011  /* read the attributes */
1012  for (i = 0; i < natts; i++)
1013  {
1014  uint8 flags;
1015 
1016  /* Check for replica identity column */
1017  flags = pq_getmsgbyte(in);
1018  if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
1019  attkeys = bms_add_member(attkeys, i);
1020 
1021  /* attribute name */
1022  attnames[i] = pstrdup(pq_getmsgstring(in));
1023 
1024  /* attribute type id */
1025  atttyps[i] = (Oid) pq_getmsgint(in, 4);
1026 
1027  /* we ignore attribute mode for now */
1028  (void) pq_getmsgint(in, 4);
1029  }
1030 
1031  rel->attnames = attnames;
1032  rel->atttyps = atttyps;
1033  rel->attkeys = attkeys;
1034  rel->natts = natts;
1035 }
1036 
1037 /*
1038  * Write the namespace name or empty string for pg_catalog (to save space).
1039  */
1040 static void
1042 {
1043  if (nspid == PG_CATALOG_NAMESPACE)
1044  pq_sendbyte(out, '\0');
1045  else
1046  {
1047  char *nspname = get_namespace_name(nspid);
1048 
1049  if (nspname == NULL)
1050  elog(ERROR, "cache lookup failed for namespace %u",
1051  nspid);
1052 
1053  pq_sendstring(out, nspname);
1054  }
1055 }
1056 
1057 /*
1058  * Read the namespace name while treating empty string as pg_catalog.
1059  */
1060 static const char *
1062 {
1063  const char *nspname = pq_getmsgstring(in);
1064 
1065  if (nspname[0] == '\0')
1066  nspname = "pg_catalog";
1067 
1068  return nspname;
1069 }
1070 
1071 /*
1072  * Write the information for the start stream message to the output stream.
1073  */
1074 void
1076  TransactionId xid, bool first_segment)
1077 {
1079 
1081 
1082  /* transaction ID (we're starting to stream, so must be valid) */
1083  pq_sendint32(out, xid);
1084 
1085  /* 1 if this is the first streaming segment for this xid */
1086  pq_sendbyte(out, first_segment ? 1 : 0);
1087 }
1088 
1089 /*
1090  * Read the information about the start stream message from output stream.
1091  */
1094 {
1095  TransactionId xid;
1096 
1097  Assert(first_segment);
1098 
1099  xid = pq_getmsgint(in, 4);
1100  *first_segment = (pq_getmsgbyte(in) == 1);
1101 
1102  return xid;
1103 }
1104 
1105 /*
1106  * Write the stop stream message to the output stream.
1107  */
1108 void
1110 {
1112 }
1113 
1114 /*
1115  * Write STREAM COMMIT to the output stream.
1116  */
1117 void
1119  XLogRecPtr commit_lsn)
1120 {
1121  uint8 flags = 0;
1122 
1124 
1126 
1127  /* transaction ID */
1128  pq_sendint32(out, txn->xid);
1129 
1130  /* send the flags field (unused for now) */
1131  pq_sendbyte(out, flags);
1132 
1133  /* send fields */
1134  pq_sendint64(out, commit_lsn);
1135  pq_sendint64(out, txn->end_lsn);
1136  pq_sendint64(out, txn->xact_time.commit_time);
1137 }
1138 
1139 /*
1140  * Read STREAM COMMIT from the output stream.
1141  */
1144 {
1145  TransactionId xid;
1146  uint8 flags;
1147 
1148  xid = pq_getmsgint(in, 4);
1149 
1150  /* read flags (unused for now) */
1151  flags = pq_getmsgbyte(in);
1152 
1153  if (flags != 0)
1154  elog(ERROR, "unrecognized flags %u in commit message", flags);
1155 
1156  /* read fields */
1157  commit_data->commit_lsn = pq_getmsgint64(in);
1158  commit_data->end_lsn = pq_getmsgint64(in);
1159  commit_data->committime = pq_getmsgint64(in);
1160 
1161  return xid;
1162 }
1163 
1164 /*
1165  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1166  * same for the top-level transaction abort.
1167  */
1168 void
1170  TransactionId subxid)
1171 {
1173 
1175 
1176  /* transaction ID */
1177  pq_sendint32(out, xid);
1178  pq_sendint32(out, subxid);
1179 }
1180 
1181 /*
1182  * Read STREAM ABORT from the output stream.
1183  */
1184 void
1186  TransactionId *subxid)
1187 {
1188  Assert(xid && subxid);
1189 
1190  *xid = pq_getmsgint(in, 4);
1191  *subxid = pq_getmsgint(in, 4);
1192 }
1193 
1194 /*
1195  * Get string representing LogicalRepMsgType.
1196  */
1197 char *
1199 {
1200  switch (action)
1201  {
1202  case LOGICAL_REP_MSG_BEGIN:
1203  return "BEGIN";
1205  return "COMMIT";
1207  return "ORIGIN";
1209  return "INSERT";
1211  return "UPDATE";
1213  return "DELETE";
1215  return "TRUNCATE";
1217  return "RELATION";
1218  case LOGICAL_REP_MSG_TYPE:
1219  return "TYPE";
1221  return "MESSAGE";
1223  return "BEGIN PREPARE";
1225  return "PREPARE";
1227  return "COMMIT PREPARED";
1229  return "ROLLBACK PREPARED";
1231  return "STREAM START";
1233  return "STREAM STOP";
1235  return "STREAM COMMIT";
1237  return "STREAM ABORT";
1239  return "STREAM PREPARE";
1240  }
1241 
1242  elog(ERROR, "invalid logical replication message type \"%c\"", action);
1243 
1244  return NULL; /* keep compiler quiet */
1245 }
void bms_free(Bitmapset *a)
Definition: bitmapset.c:209
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:428
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:739
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define NameStr(name)
Definition: c.h:682
unsigned short uint16
Definition: c.h:441
#define VARHDRSZ
Definition: c.h:628
unsigned char uint8
Definition: c.h:440
uint32 TransactionId
Definition: c.h:588
#define OidIsValid(objectId)
Definition: c.h:711
size_t Size
Definition: c.h:541
int64 TimestampTz
Definition: timestamp.h:39
#define ERROR
Definition: elog.h:39
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1705
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1724
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:649
static struct @143 value
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend_oid(List *list, Oid datum)
Definition: list.c:374
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:92
LogicalRepMsgType
Definition: logicalproto.h:53
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:57
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:54
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:55
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:63
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:56
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:58
uint32 LogicalRepRelId
Definition: logicalproto.h:96
#define LOGICALREP_COLUMN_NULL
Definition: logicalproto.h:91
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:94
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:93
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3331
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2479
char * pstrdup(const char *in)
Definition: mcxt.c:1483
void pfree(void *pointer)
Definition: mcxt.c:1306
void * palloc0(Size size)
Definition: mcxt.c:1230
void * palloc(Size size)
Definition: mcxt.c:1199
int16 attnum
Definition: pg_attribute.h:83
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
NameData relname
Definition: pg_class.h:38
const void size_t len
#define NIL
Definition: pg_list.h:66
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:328
uintptr_t Datum
Definition: postgres.h:412
#define VARDATA(PTR)
Definition: postgres.h:316
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:600
#define VARSIZE(PTR)
Definition: postgres.h:317
unsigned int Oid
Definition: postgres_ext.h:31
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
void pq_copymsgbytes(StringInfo msg, char *buf, int datalen)
Definition: pqformat.c:530
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
void pq_sendcountedtext(StringInfo buf, const char *str, int slen, bool countincludesself)
Definition: pqformat.c:142
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
static void pq_sendint(StringInfo buf, uint32 i, int b)
Definition: pqformat.h:172
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:109
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1041
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:564
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:89
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:304
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:336
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:166
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:385
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:145
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
Definition: proto.c:932
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:210
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:618
char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1198
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:756
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns)
Definition: proto.c:670
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:492
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:643
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
Definition: proto.c:458
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:999
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:198
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:74
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
Definition: proto.c:414
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:725
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:586
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:26
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:866
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:60
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:278
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns)
Definition: proto.c:533
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:700
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:248
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1118
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:364
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid)
Definition: proto.c:1169
#define TRUNCATE_CASCADE
Definition: proto.c:29
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:1185
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:127
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:376
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1075
static bool column_in_column_list(int attnum, Bitmapset *columns)
Definition: proto.c:50
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns)
Definition: proto.c:769
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
Definition: proto.c:401
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1143
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:436
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:239
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1061
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1109
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1093
#define RelationGetRelid(relation)
Definition: rel.h:501
#define RelationGetDescr(relation)
Definition: rel.h:527
#define RelationGetRelationName(relation)
Definition: rel.h:535
#define RelationGetNamespace(relation)
Definition: rel.h:542
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
Definition: relcache.c:5412
#define rbtxn_prepared(txn)
Definition: pg_list.h:52
XLogRecPtr final_lsn
Definition: logicalproto.h:124
TransactionId xid
Definition: logicalproto.h:126
TimestampTz committime
Definition: logicalproto.h:125
TimestampTz committime
Definition: logicalproto.h:133
LogicalRepRelId remoteid
Definition: logicalproto.h:102
Bitmapset * attkeys
Definition: logicalproto.h:110
StringInfoData * colvalues
Definition: logicalproto.h:82
Form_pg_class rd_rel
Definition: rel.h:110
TimestampTz commit_time
XLogRecPtr final_lsn
XLogRecPtr end_lsn
TimestampTz prepare_time
TransactionId xid
union ReorderBufferTXN::@103 xact_time
bool * tts_isnull
Definition: tuptable.h:128
Datum * tts_values
Definition: tuptable.h:126
Definition: c.h:623
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1221
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1173
@ TYPEOID
Definition: syscache.h:114
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:362
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28