PostgreSQL Source Code  git master
proto.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * proto.c
4  * logical replication protocol functions
5  *
6  * Copyright (c) 2015-2021, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/proto.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/sysattr.h"
16 #include "catalog/pg_namespace.h"
17 #include "catalog/pg_type.h"
18 #include "libpq/pqformat.h"
20 #include "utils/lsyscache.h"
21 #include "utils/syscache.h"
22 
23 /*
24  * Protocol message flags.
25  */
26 #define LOGICALREP_IS_REPLICA_IDENTITY 1
27 
28 #define TRUNCATE_CASCADE (1<<0)
29 #define TRUNCATE_RESTART_SEQS (1<<1)
30 
31 static void logicalrep_write_attrs(StringInfo out, Relation rel);
32 static void logicalrep_write_tuple(StringInfo out, Relation rel,
33  HeapTuple tuple, bool binary);
34 
37 
38 static void logicalrep_write_namespace(StringInfo out, Oid nspid);
39 static const char *logicalrep_read_namespace(StringInfo in);
40 
41 /*
42  * Write BEGIN to the output stream.
43  */
44 void
46 {
48 
49  /* fixed fields */
50  pq_sendint64(out, txn->final_lsn);
51  pq_sendint64(out, txn->commit_time);
52  pq_sendint32(out, txn->xid);
53 }
54 
55 /*
56  * Read transaction BEGIN from the stream.
57  */
58 void
60 {
61  /* read fields */
62  begin_data->final_lsn = pq_getmsgint64(in);
63  if (begin_data->final_lsn == InvalidXLogRecPtr)
64  elog(ERROR, "final_lsn not set in begin message");
65  begin_data->committime = pq_getmsgint64(in);
66  begin_data->xid = pq_getmsgint(in, 4);
67 }
68 
69 
70 /*
71  * Write COMMIT to the output stream.
72  */
73 void
75  XLogRecPtr commit_lsn)
76 {
77  uint8 flags = 0;
78 
80 
81  /* send the flags field (unused for now) */
82  pq_sendbyte(out, flags);
83 
84  /* send fields */
85  pq_sendint64(out, commit_lsn);
86  pq_sendint64(out, txn->end_lsn);
87  pq_sendint64(out, txn->commit_time);
88 }
89 
90 /*
91  * Read transaction COMMIT from the stream.
92  */
93 void
95 {
96  /* read flags (unused for now) */
97  uint8 flags = pq_getmsgbyte(in);
98 
99  if (flags != 0)
100  elog(ERROR, "unrecognized flags %u in commit message", flags);
101 
102  /* read fields */
103  commit_data->commit_lsn = pq_getmsgint64(in);
104  commit_data->end_lsn = pq_getmsgint64(in);
105  commit_data->committime = pq_getmsgint64(in);
106 }
107 
108 /*
109  * Write ORIGIN to the output stream.
110  */
111 void
112 logicalrep_write_origin(StringInfo out, const char *origin,
113  XLogRecPtr origin_lsn)
114 {
116 
117  /* fixed fields */
118  pq_sendint64(out, origin_lsn);
119 
120  /* origin string */
121  pq_sendstring(out, origin);
122 }
123 
124 /*
125  * Read ORIGIN from the stream.
126  */
127 char *
129 {
130  /* fixed fields */
131  *origin_lsn = pq_getmsgint64(in);
132 
133  /* return origin */
134  return pstrdup(pq_getmsgstring(in));
135 }
136 
137 /*
138  * Write INSERT to the output stream.
139  */
140 void
142  HeapTuple newtuple, bool binary)
143 {
145 
146  /* transaction ID (if not valid, we're not streaming) */
147  if (TransactionIdIsValid(xid))
148  pq_sendint32(out, xid);
149 
150  /* use Oid as relation identifier */
151  pq_sendint32(out, RelationGetRelid(rel));
152 
153  pq_sendbyte(out, 'N'); /* new tuple follows */
154  logicalrep_write_tuple(out, rel, newtuple, binary);
155 }
156 
157 /*
158  * Read INSERT from stream.
159  *
160  * Fills the new tuple.
161  */
164 {
165  char action;
166  LogicalRepRelId relid;
167 
168  /* read the relation id */
169  relid = pq_getmsgint(in, 4);
170 
171  action = pq_getmsgbyte(in);
172  if (action != 'N')
173  elog(ERROR, "expected new tuple but got %d",
174  action);
175 
176  logicalrep_read_tuple(in, newtup);
177 
178  return relid;
179 }
180 
181 /*
182  * Write UPDATE to the output stream.
183  */
184 void
186  HeapTuple oldtuple, HeapTuple newtuple, bool binary)
187 {
189 
190  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
191  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
192  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
193 
194  /* transaction ID (if not valid, we're not streaming) */
195  if (TransactionIdIsValid(xid))
196  pq_sendint32(out, xid);
197 
198  /* use Oid as relation identifier */
199  pq_sendint32(out, RelationGetRelid(rel));
200 
201  if (oldtuple != NULL)
202  {
203  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
204  pq_sendbyte(out, 'O'); /* old tuple follows */
205  else
206  pq_sendbyte(out, 'K'); /* old key follows */
207  logicalrep_write_tuple(out, rel, oldtuple, binary);
208  }
209 
210  pq_sendbyte(out, 'N'); /* new tuple follows */
211  logicalrep_write_tuple(out, rel, newtuple, binary);
212 }
213 
214 /*
215  * Read UPDATE from stream.
216  */
218 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
219  LogicalRepTupleData *oldtup,
220  LogicalRepTupleData *newtup)
221 {
222  char action;
223  LogicalRepRelId relid;
224 
225  /* read the relation id */
226  relid = pq_getmsgint(in, 4);
227 
228  /* read and verify action */
229  action = pq_getmsgbyte(in);
230  if (action != 'K' && action != 'O' && action != 'N')
231  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
232  action);
233 
234  /* check for old tuple */
235  if (action == 'K' || action == 'O')
236  {
237  logicalrep_read_tuple(in, oldtup);
238  *has_oldtuple = true;
239 
240  action = pq_getmsgbyte(in);
241  }
242  else
243  *has_oldtuple = false;
244 
245  /* check for new tuple */
246  if (action != 'N')
247  elog(ERROR, "expected action 'N', got %c",
248  action);
249 
250  logicalrep_read_tuple(in, newtup);
251 
252  return relid;
253 }
254 
255 /*
256  * Write DELETE to the output stream.
257  */
258 void
260  HeapTuple oldtuple, bool binary)
261 {
262  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
263  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
264  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
265 
267 
268  /* transaction ID (if not valid, we're not streaming) */
269  if (TransactionIdIsValid(xid))
270  pq_sendint32(out, xid);
271 
272  /* use Oid as relation identifier */
273  pq_sendint32(out, RelationGetRelid(rel));
274 
275  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
276  pq_sendbyte(out, 'O'); /* old tuple follows */
277  else
278  pq_sendbyte(out, 'K'); /* old key follows */
279 
280  logicalrep_write_tuple(out, rel, oldtuple, binary);
281 }
282 
283 /*
284  * Read DELETE from stream.
285  *
286  * Fills the old tuple.
287  */
290 {
291  char action;
292  LogicalRepRelId relid;
293 
294  /* read the relation id */
295  relid = pq_getmsgint(in, 4);
296 
297  /* read and verify action */
298  action = pq_getmsgbyte(in);
299  if (action != 'K' && action != 'O')
300  elog(ERROR, "expected action 'O' or 'K', got %c", action);
301 
302  logicalrep_read_tuple(in, oldtup);
303 
304  return relid;
305 }
306 
307 /*
308  * Write TRUNCATE to the output stream.
309  */
310 void
312  TransactionId xid,
313  int nrelids,
314  Oid relids[],
315  bool cascade, bool restart_seqs)
316 {
317  int i;
318  uint8 flags = 0;
319 
321 
322  /* transaction ID (if not valid, we're not streaming) */
323  if (TransactionIdIsValid(xid))
324  pq_sendint32(out, xid);
325 
326  pq_sendint32(out, nrelids);
327 
328  /* encode and send truncate flags */
329  if (cascade)
330  flags |= TRUNCATE_CASCADE;
331  if (restart_seqs)
332  flags |= TRUNCATE_RESTART_SEQS;
333  pq_sendint8(out, flags);
334 
335  for (i = 0; i < nrelids; i++)
336  pq_sendint32(out, relids[i]);
337 }
338 
339 /*
340  * Read TRUNCATE from stream.
341  */
342 List *
344  bool *cascade, bool *restart_seqs)
345 {
346  int i;
347  int nrelids;
348  List *relids = NIL;
349  uint8 flags;
350 
351  nrelids = pq_getmsgint(in, 4);
352 
353  /* read and decode truncate flags */
354  flags = pq_getmsgint(in, 1);
355  *cascade = (flags & TRUNCATE_CASCADE) > 0;
356  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
357 
358  for (i = 0; i < nrelids; i++)
359  relids = lappend_oid(relids, pq_getmsgint(in, 4));
360 
361  return relids;
362 }
363 
364 /*
365  * Write relation description to the output stream.
366  */
367 void
369 {
370  char *relname;
371 
373 
374  /* transaction ID (if not valid, we're not streaming) */
375  if (TransactionIdIsValid(xid))
376  pq_sendint32(out, xid);
377 
378  /* use Oid as relation identifier */
379  pq_sendint32(out, RelationGetRelid(rel));
380 
381  /* send qualified relation name */
383  relname = RelationGetRelationName(rel);
384  pq_sendstring(out, relname);
385 
386  /* send replica identity */
387  pq_sendbyte(out, rel->rd_rel->relreplident);
388 
389  /* send the attribute info */
390  logicalrep_write_attrs(out, rel);
391 }
392 
393 /*
394  * Read the relation info from stream and return as LogicalRepRelation.
395  */
398 {
400 
401  rel->remoteid = pq_getmsgint(in, 4);
402 
403  /* Read relation name from stream */
405  rel->relname = pstrdup(pq_getmsgstring(in));
406 
407  /* Read the replica identity. */
408  rel->replident = pq_getmsgbyte(in);
409 
410  /* Get attribute description */
411  logicalrep_read_attrs(in, rel);
412 
413  return rel;
414 }
415 
416 /*
417  * Write type info to the output stream.
418  *
419  * This function will always write base type info.
420  */
421 void
423 {
424  Oid basetypoid = getBaseType(typoid);
425  HeapTuple tup;
426  Form_pg_type typtup;
427 
429 
430  /* transaction ID (if not valid, we're not streaming) */
431  if (TransactionIdIsValid(xid))
432  pq_sendint32(out, xid);
433 
434  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
435  if (!HeapTupleIsValid(tup))
436  elog(ERROR, "cache lookup failed for type %u", basetypoid);
437  typtup = (Form_pg_type) GETSTRUCT(tup);
438 
439  /* use Oid as type identifier */
440  pq_sendint32(out, typoid);
441 
442  /* send qualified type name */
443  logicalrep_write_namespace(out, typtup->typnamespace);
444  pq_sendstring(out, NameStr(typtup->typname));
445 
446  ReleaseSysCache(tup);
447 }
448 
449 /*
450  * Read type info from the stream.
451  */
452 void
454 {
455  ltyp->remoteid = pq_getmsgint(in, 4);
456 
457  /* Read type name from stream */
459  ltyp->typname = pstrdup(pq_getmsgstring(in));
460 }
461 
462 /*
463  * Write a tuple to the output stream, in the most efficient format possible.
464  */
465 static void
467 {
468  TupleDesc desc;
470  bool isnull[MaxTupleAttributeNumber];
471  int i;
472  uint16 nliveatts = 0;
473 
474  desc = RelationGetDescr(rel);
475 
476  for (i = 0; i < desc->natts; i++)
477  {
478  if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
479  continue;
480  nliveatts++;
481  }
482  pq_sendint16(out, nliveatts);
483 
484  /* try to allocate enough memory from the get-go */
485  enlargeStringInfo(out, tuple->t_len +
486  nliveatts * (1 + 4));
487 
488  heap_deform_tuple(tuple, desc, values, isnull);
489 
490  /* Write the values */
491  for (i = 0; i < desc->natts; i++)
492  {
493  HeapTuple typtup;
494  Form_pg_type typclass;
495  Form_pg_attribute att = TupleDescAttr(desc, i);
496 
497  if (att->attisdropped || att->attgenerated)
498  continue;
499 
500  if (isnull[i])
501  {
503  continue;
504  }
505 
506  if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
507  {
508  /*
509  * Unchanged toasted datum. (Note that we don't promise to detect
510  * unchanged data in general; this is just a cheap check to avoid
511  * sending large values unnecessarily.)
512  */
514  continue;
515  }
516 
517  typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
518  if (!HeapTupleIsValid(typtup))
519  elog(ERROR, "cache lookup failed for type %u", att->atttypid);
520  typclass = (Form_pg_type) GETSTRUCT(typtup);
521 
522  /*
523  * Send in binary if requested and type has suitable send function.
524  */
525  if (binary && OidIsValid(typclass->typsend))
526  {
527  bytea *outputbytes;
528  int len;
529 
531  outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
532  len = VARSIZE(outputbytes) - VARHDRSZ;
533  pq_sendint(out, len, 4); /* length */
534  pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
535  pfree(outputbytes);
536  }
537  else
538  {
539  char *outputstr;
540 
542  outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
543  pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
544  pfree(outputstr);
545  }
546 
547  ReleaseSysCache(typtup);
548  }
549 }
550 
551 /*
552  * Read tuple in logical replication format from stream.
553  */
554 static void
556 {
557  int i;
558  int natts;
559 
560  /* Get number of attributes */
561  natts = pq_getmsgint(in, 2);
562 
563  /* Allocate space for per-column values; zero out unused StringInfoDatas */
564  tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
565  tuple->colstatus = (char *) palloc(natts * sizeof(char));
566  tuple->ncols = natts;
567 
568  /* Read the data */
569  for (i = 0; i < natts; i++)
570  {
571  char kind;
572  int len;
573  StringInfo value = &tuple->colvalues[i];
574 
575  kind = pq_getmsgbyte(in);
576  tuple->colstatus[i] = kind;
577 
578  switch (kind)
579  {
581  /* nothing more to do */
582  break;
584  /* we don't receive the value of an unchanged column */
585  break;
587  len = pq_getmsgint(in, 4); /* read length */
588 
589  /* and data */
590  value->data = palloc(len + 1);
591  pq_copymsgbytes(in, value->data, len);
592  value->data[len] = '\0';
593  /* make StringInfo fully valid */
594  value->len = len;
595  value->cursor = 0;
596  value->maxlen = len;
597  break;
599  len = pq_getmsgint(in, 4); /* read length */
600 
601  /* and data */
602  value->data = palloc(len + 1);
603  pq_copymsgbytes(in, value->data, len);
604  /* not strictly necessary but per StringInfo practice */
605  value->data[len] = '\0';
606  /* make StringInfo fully valid */
607  value->len = len;
608  value->cursor = 0;
609  value->maxlen = len;
610  break;
611  default:
612  elog(ERROR, "unrecognized data representation type '%c'", kind);
613  }
614  }
615 }
616 
617 /*
618  * Write relation attribute metadata to the stream.
619  */
620 static void
622 {
623  TupleDesc desc;
624  int i;
625  uint16 nliveatts = 0;
626  Bitmapset *idattrs = NULL;
627  bool replidentfull;
628 
629  desc = RelationGetDescr(rel);
630 
631  /* send number of live attributes */
632  for (i = 0; i < desc->natts; i++)
633  {
634  if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
635  continue;
636  nliveatts++;
637  }
638  pq_sendint16(out, nliveatts);
639 
640  /* fetch bitmap of REPLICA IDENTITY attributes */
641  replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
642  if (!replidentfull)
643  idattrs = RelationGetIndexAttrBitmap(rel,
645 
646  /* send the attributes */
647  for (i = 0; i < desc->natts; i++)
648  {
649  Form_pg_attribute att = TupleDescAttr(desc, i);
650  uint8 flags = 0;
651 
652  if (att->attisdropped || att->attgenerated)
653  continue;
654 
655  /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
656  if (replidentfull ||
658  idattrs))
660 
661  pq_sendbyte(out, flags);
662 
663  /* attribute name */
664  pq_sendstring(out, NameStr(att->attname));
665 
666  /* attribute type id */
667  pq_sendint32(out, (int) att->atttypid);
668 
669  /* attribute type modifier */
670  pq_sendint32(out, att->atttypmod);
671  }
672 
673  bms_free(idattrs);
674 }
675 
676 /*
677  * Read relation attribute metadata from the stream.
678  */
679 static void
681 {
682  int i;
683  int natts;
684  char **attnames;
685  Oid *atttyps;
686  Bitmapset *attkeys = NULL;
687 
688  natts = pq_getmsgint(in, 2);
689  attnames = palloc(natts * sizeof(char *));
690  atttyps = palloc(natts * sizeof(Oid));
691 
692  /* read the attributes */
693  for (i = 0; i < natts; i++)
694  {
695  uint8 flags;
696 
697  /* Check for replica identity column */
698  flags = pq_getmsgbyte(in);
699  if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
700  attkeys = bms_add_member(attkeys, i);
701 
702  /* attribute name */
703  attnames[i] = pstrdup(pq_getmsgstring(in));
704 
705  /* attribute type id */
706  atttyps[i] = (Oid) pq_getmsgint(in, 4);
707 
708  /* we ignore the attribute type modifier for now */
709  (void) pq_getmsgint(in, 4);
710  }
711 
712  rel->attnames = attnames;
713  rel->atttyps = atttyps;
714  rel->attkeys = attkeys;
715  rel->natts = natts;
716 }
717 
718 /*
719  * Write the namespace name or empty string for pg_catalog (to save space).
720  */
721 static void
723 {
724  if (nspid == PG_CATALOG_NAMESPACE)
725  pq_sendbyte(out, '\0');
726  else
727  {
728  char *nspname = get_namespace_name(nspid);
729 
730  if (nspname == NULL)
731  elog(ERROR, "cache lookup failed for namespace %u",
732  nspid);
733 
734  pq_sendstring(out, nspname);
735  }
736 }
737 
738 /*
739  * Read the namespace name while treating empty string as pg_catalog.
740  */
741 static const char *
743 {
744  const char *nspname = pq_getmsgstring(in);
745 
746  if (nspname[0] == '\0')
747  nspname = "pg_catalog";
748 
749  return nspname;
750 }
751 
752 /*
753  * Write the information for the start stream message to the output stream.
754  */
755 void
757  TransactionId xid, bool first_segment)
758 {
760 
762 
763  /* transaction ID (we're starting to stream, so must be valid) */
764  pq_sendint32(out, xid);
765 
766  /* 1 if this is the first streaming segment for this xid */
767  pq_sendbyte(out, first_segment ? 1 : 0);
768 }
769 
770 /*
771  * Read the information about the start stream message from the stream.
772  */
774 logicalrep_read_stream_start(StringInfo in, bool *first_segment)
775 {
776  TransactionId xid;
777 
778  Assert(first_segment);
779 
780  xid = pq_getmsgint(in, 4);
781  *first_segment = (pq_getmsgbyte(in) == 1);
782 
783  return xid;
784 }
785 
786 /*
787  * Write the stop stream message to the output stream.
788  */
789 void
791 {
793 }
794 
795 /*
796  * Write STREAM COMMIT to the output stream.
797  */
798 void
800  XLogRecPtr commit_lsn)
801 {
802  uint8 flags = 0;
803 
805 
807 
808  /* transaction ID */
809  pq_sendint32(out, txn->xid);
810 
811  /* send the flags field (unused for now) */
812  pq_sendbyte(out, flags);
813 
814  /* send fields */
815  pq_sendint64(out, commit_lsn);
816  pq_sendint64(out, txn->end_lsn);
817  pq_sendint64(out, txn->commit_time);
818 }
819 
820 /*
821  * Read STREAM COMMIT from the stream.
822  */
825 {
826  TransactionId xid;
827  uint8 flags;
828 
829  xid = pq_getmsgint(in, 4);
830 
831  /* read flags (unused for now) */
832  flags = pq_getmsgbyte(in);
833 
834  if (flags != 0)
835  elog(ERROR, "unrecognized flags %u in commit message", flags);
836 
837  /* read fields */
838  commit_data->commit_lsn = pq_getmsgint64(in);
839  commit_data->end_lsn = pq_getmsgint64(in);
840  commit_data->committime = pq_getmsgint64(in);
841 
842  return xid;
843 }
844 
845 /*
846  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
847  * the same for the top-level transaction abort.
848  */
849 void
851  TransactionId subxid)
852 {
854 
856 
857  /* transaction ID */
858  pq_sendint32(out, xid);
859  pq_sendint32(out, subxid);
860 }
861 
862 /*
863  * Read STREAM ABORT from the stream.
864  */
865 void
867  TransactionId *subxid)
868 {
869  Assert(xid && subxid);
870 
871  *xid = pq_getmsgint(in, 4);
872  *subxid = pq_getmsgint(in, 4);
873 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:466
#define NIL
Definition: pg_list.h:65
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
TimestampTz commit_time
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:314
TransactionId xid
Definition: logicalproto.h:114
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define VARDATA(PTR)
Definition: postgres.h:302
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:289
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:311
static void pq_sendint(StringInfo buf, uint32 i, int b)
Definition: pqformat.h:172
uint32 TransactionId
Definition: c.h:575
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:824
#define RelationGetDescr(relation)
Definition: rel.h:483
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:790
#define VARSIZE(PTR)
Definition: postgres.h:303
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:59
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define VARHDRSZ
Definition: c.h:615
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:343
char * pstrdup(const char *in)
Definition: mcxt.c:1187
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:722
#define LOGICALREP_COLUMN_NULL
Definition: logicalproto.h:79
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:397
static struct @144 value
unsigned char uint8
Definition: c.h:427
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid)
Definition: proto.c:850
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:81
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:74
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
Definition: proto.c:259
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:112
StringInfoData * colvalues
Definition: logicalproto.h:70
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
Form_pg_class rd_rel
Definition: rel.h:110
NameData relname
Definition: pg_class.h:38
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
#define OidIsValid(objectId)
Definition: c.h:698
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:82
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
Definition: proto.c:141
Bitmapset * attkeys
Definition: logicalproto.h:98
LogicalRepRelId remoteid
Definition: logicalproto.h:90
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:26
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:163
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:756
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:80
unsigned short uint16
Definition: c.h:428
void pfree(void *pointer)
Definition: mcxt.c:1057
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
Definition: proto.c:368
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:45
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:453
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1675
uint32 t_len
Definition: htup.h:64
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3289
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:555
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:621
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
#define RelationGetRelationName(relation)
Definition: rel.h:491
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
XLogRecPtr final_lsn
void pq_sendcountedtext(StringInfo buf, const char *str, int slen, bool countincludesself)
Definition: pqformat.c:142
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:680
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:799
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:45
XLogRecPtr final_lsn
Definition: logicalproto.h:112
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void * palloc0(Size size)
Definition: mcxt.c:981
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
TransactionId xid
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:742
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:792
void pq_copymsgbytes(StringInfo msg, char *buf, int datalen)
Definition: pqformat.c:530
XLogRecPtr end_lsn
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:218
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:94
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1249
static Datum values[MAXATTR]
Definition: bootstrap.c:165
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:774
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1656
void * palloc(Size size)
Definition: mcxt.c:950
#define elog(elevel,...)
Definition: elog.h:228
int i
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:422
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:4956
#define TRUNCATE_CASCADE
Definition: proto.c:28
#define NameStr(name)
Definition: c.h:669
Definition: c.h:609
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
Definition: proto.c:128
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
Definition: proto.c:185
TimestampTz committime
Definition: logicalproto.h:113
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2441
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129
Definition: pg_list.h:50
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:427
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:29
#define RelationGetRelid(relation)
Definition: rel.h:457
TimestampTz committime
Definition: logicalproto.h:121
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:866
uint32 LogicalRepRelId
Definition: logicalproto.h:84
#define RelationGetNamespace(relation)
Definition: rel.h:498