PostgreSQL Source Code  git master
proto.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * proto.c
4  * logical replication protocol functions
5  *
6  * Copyright (c) 2015-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/proto.c
10  *
11  *-------------------------------------------------------------------------
12  */
#include "postgres.h"

#include "access/sysattr.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "libpq/pqformat.h"
#include "replication/logicalproto.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
22 
23 /*
24  * Protocol message flags.
25  */
26 #define LOGICALREP_IS_REPLICA_IDENTITY 1
27 
28 #define TRUNCATE_CASCADE (1<<0)
29 #define TRUNCATE_RESTART_SEQS (1<<1)
30 
31 static void logicalrep_write_attrs(StringInfo out, Relation rel);
32 static void logicalrep_write_tuple(StringInfo out, Relation rel,
33  HeapTuple tuple, bool binary);
34 
37 
38 static void logicalrep_write_namespace(StringInfo out, Oid nspid);
39 static const char *logicalrep_read_namespace(StringInfo in);
40 
41 /*
42  * Write BEGIN to the output stream.
43  */
44 void
46 {
47  pq_sendbyte(out, 'B'); /* BEGIN */
48 
49  /* fixed fields */
50  pq_sendint64(out, txn->final_lsn);
51  pq_sendint64(out, txn->commit_time);
52  pq_sendint32(out, txn->xid);
53 }
54 
55 /*
56  * Read transaction BEGIN from the stream.
57  */
58 void
60 {
61  /* read fields */
62  begin_data->final_lsn = pq_getmsgint64(in);
63  if (begin_data->final_lsn == InvalidXLogRecPtr)
64  elog(ERROR, "final_lsn not set in begin message");
65  begin_data->committime = pq_getmsgint64(in);
66  begin_data->xid = pq_getmsgint(in, 4);
67 }
68 
69 
70 /*
71  * Write COMMIT to the output stream.
72  */
73 void
75  XLogRecPtr commit_lsn)
76 {
77  uint8 flags = 0;
78 
79  pq_sendbyte(out, 'C'); /* sending COMMIT */
80 
81  /* send the flags field (unused for now) */
82  pq_sendbyte(out, flags);
83 
84  /* send fields */
85  pq_sendint64(out, commit_lsn);
86  pq_sendint64(out, txn->end_lsn);
87  pq_sendint64(out, txn->commit_time);
88 }
89 
90 /*
91  * Read transaction COMMIT from the stream.
92  */
93 void
95 {
96  /* read flags (unused for now) */
97  uint8 flags = pq_getmsgbyte(in);
98 
99  if (flags != 0)
100  elog(ERROR, "unrecognized flags %u in commit message", flags);
101 
102  /* read fields */
103  commit_data->commit_lsn = pq_getmsgint64(in);
104  commit_data->end_lsn = pq_getmsgint64(in);
105  commit_data->committime = pq_getmsgint64(in);
106 }
107 
108 /*
109  * Write ORIGIN to the output stream.
110  */
111 void
112 logicalrep_write_origin(StringInfo out, const char *origin,
113  XLogRecPtr origin_lsn)
114 {
115  pq_sendbyte(out, 'O'); /* ORIGIN */
116 
117  /* fixed fields */
118  pq_sendint64(out, origin_lsn);
119 
120  /* origin string */
121  pq_sendstring(out, origin);
122 }
123 
124 /*
125  * Read ORIGIN from the output stream.
126  */
127 char *
129 {
130  /* fixed fields */
131  *origin_lsn = pq_getmsgint64(in);
132 
133  /* return origin */
134  return pstrdup(pq_getmsgstring(in));
135 }
136 
137 /*
138  * Write INSERT to the output stream.
139  */
140 void
141 logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary)
142 {
143  pq_sendbyte(out, 'I'); /* action INSERT */
144 
145  /* use Oid as relation identifier */
146  pq_sendint32(out, RelationGetRelid(rel));
147 
148  pq_sendbyte(out, 'N'); /* new tuple follows */
149  logicalrep_write_tuple(out, rel, newtuple, binary);
150 }
151 
152 /*
153  * Read INSERT from stream.
154  *
155  * Fills the new tuple.
156  */
159 {
160  char action;
161  LogicalRepRelId relid;
162 
163  /* read the relation id */
164  relid = pq_getmsgint(in, 4);
165 
166  action = pq_getmsgbyte(in);
167  if (action != 'N')
168  elog(ERROR, "expected new tuple but got %d",
169  action);
170 
171  logicalrep_read_tuple(in, newtup);
172 
173  return relid;
174 }
175 
176 /*
177  * Write UPDATE to the output stream.
178  */
179 void
181  HeapTuple newtuple, bool binary)
182 {
183  pq_sendbyte(out, 'U'); /* action UPDATE */
184 
185  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
186  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
187  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
188 
189  /* use Oid as relation identifier */
190  pq_sendint32(out, RelationGetRelid(rel));
191 
192  if (oldtuple != NULL)
193  {
194  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
195  pq_sendbyte(out, 'O'); /* old tuple follows */
196  else
197  pq_sendbyte(out, 'K'); /* old key follows */
198  logicalrep_write_tuple(out, rel, oldtuple, binary);
199  }
200 
201  pq_sendbyte(out, 'N'); /* new tuple follows */
202  logicalrep_write_tuple(out, rel, newtuple, binary);
203 }
204 
205 /*
206  * Read UPDATE from stream.
207  */
209 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
210  LogicalRepTupleData *oldtup,
211  LogicalRepTupleData *newtup)
212 {
213  char action;
214  LogicalRepRelId relid;
215 
216  /* read the relation id */
217  relid = pq_getmsgint(in, 4);
218 
219  /* read and verify action */
220  action = pq_getmsgbyte(in);
221  if (action != 'K' && action != 'O' && action != 'N')
222  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
223  action);
224 
225  /* check for old tuple */
226  if (action == 'K' || action == 'O')
227  {
228  logicalrep_read_tuple(in, oldtup);
229  *has_oldtuple = true;
230 
231  action = pq_getmsgbyte(in);
232  }
233  else
234  *has_oldtuple = false;
235 
236  /* check for new tuple */
237  if (action != 'N')
238  elog(ERROR, "expected action 'N', got %c",
239  action);
240 
241  logicalrep_read_tuple(in, newtup);
242 
243  return relid;
244 }
245 
246 /*
247  * Write DELETE to the output stream.
248  */
249 void
250 logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary)
251 {
252  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
253  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
254  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
255 
256  pq_sendbyte(out, 'D'); /* action DELETE */
257 
258  /* use Oid as relation identifier */
259  pq_sendint32(out, RelationGetRelid(rel));
260 
261  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
262  pq_sendbyte(out, 'O'); /* old tuple follows */
263  else
264  pq_sendbyte(out, 'K'); /* old key follows */
265 
266  logicalrep_write_tuple(out, rel, oldtuple, binary);
267 }
268 
269 /*
270  * Read DELETE from stream.
271  *
272  * Fills the old tuple.
273  */
276 {
277  char action;
278  LogicalRepRelId relid;
279 
280  /* read the relation id */
281  relid = pq_getmsgint(in, 4);
282 
283  /* read and verify action */
284  action = pq_getmsgbyte(in);
285  if (action != 'K' && action != 'O')
286  elog(ERROR, "expected action 'O' or 'K', got %c", action);
287 
288  logicalrep_read_tuple(in, oldtup);
289 
290  return relid;
291 }
292 
293 /*
294  * Write TRUNCATE to the output stream.
295  */
296 void
298  int nrelids,
299  Oid relids[],
300  bool cascade, bool restart_seqs)
301 {
302  int i;
303  uint8 flags = 0;
304 
305  pq_sendbyte(out, 'T'); /* action TRUNCATE */
306 
307  pq_sendint32(out, nrelids);
308 
309  /* encode and send truncate flags */
310  if (cascade)
311  flags |= TRUNCATE_CASCADE;
312  if (restart_seqs)
313  flags |= TRUNCATE_RESTART_SEQS;
314  pq_sendint8(out, flags);
315 
316  for (i = 0; i < nrelids; i++)
317  pq_sendint32(out, relids[i]);
318 }
319 
320 /*
321  * Read TRUNCATE from stream.
322  */
323 List *
325  bool *cascade, bool *restart_seqs)
326 {
327  int i;
328  int nrelids;
329  List *relids = NIL;
330  uint8 flags;
331 
332  nrelids = pq_getmsgint(in, 4);
333 
334  /* read and decode truncate flags */
335  flags = pq_getmsgint(in, 1);
336  *cascade = (flags & TRUNCATE_CASCADE) > 0;
337  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
338 
339  for (i = 0; i < nrelids; i++)
340  relids = lappend_oid(relids, pq_getmsgint(in, 4));
341 
342  return relids;
343 }
344 
345 /*
346  * Write relation description to the output stream.
347  */
348 void
350 {
351  char *relname;
352 
353  pq_sendbyte(out, 'R'); /* sending RELATION */
354 
355  /* use Oid as relation identifier */
356  pq_sendint32(out, RelationGetRelid(rel));
357 
358  /* send qualified relation name */
360  relname = RelationGetRelationName(rel);
361  pq_sendstring(out, relname);
362 
363  /* send replica identity */
364  pq_sendbyte(out, rel->rd_rel->relreplident);
365 
366  /* send the attribute info */
367  logicalrep_write_attrs(out, rel);
368 }
369 
370 /*
371  * Read the relation info from stream and return as LogicalRepRelation.
372  */
375 {
377 
378  rel->remoteid = pq_getmsgint(in, 4);
379 
380  /* Read relation name from stream */
382  rel->relname = pstrdup(pq_getmsgstring(in));
383 
384  /* Read the replica identity. */
385  rel->replident = pq_getmsgbyte(in);
386 
387  /* Get attribute description */
388  logicalrep_read_attrs(in, rel);
389 
390  return rel;
391 }
392 
393 /*
394  * Write type info to the output stream.
395  *
396  * This function will always write base type info.
397  */
398 void
400 {
401  Oid basetypoid = getBaseType(typoid);
402  HeapTuple tup;
403  Form_pg_type typtup;
404 
405  pq_sendbyte(out, 'Y'); /* sending TYPE */
406 
407  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
408  if (!HeapTupleIsValid(tup))
409  elog(ERROR, "cache lookup failed for type %u", basetypoid);
410  typtup = (Form_pg_type) GETSTRUCT(tup);
411 
412  /* use Oid as relation identifier */
413  pq_sendint32(out, typoid);
414 
415  /* send qualified type name */
416  logicalrep_write_namespace(out, typtup->typnamespace);
417  pq_sendstring(out, NameStr(typtup->typname));
418 
419  ReleaseSysCache(tup);
420 }
421 
422 /*
423  * Read type info from the output stream.
424  */
425 void
427 {
428  ltyp->remoteid = pq_getmsgint(in, 4);
429 
430  /* Read type name from stream */
432  ltyp->typname = pstrdup(pq_getmsgstring(in));
433 }
434 
435 /*
436  * Write a tuple to the outputstream, in the most efficient format possible.
437  */
438 static void
440 {
441  TupleDesc desc;
443  bool isnull[MaxTupleAttributeNumber];
444  int i;
445  uint16 nliveatts = 0;
446 
447  desc = RelationGetDescr(rel);
448 
449  for (i = 0; i < desc->natts; i++)
450  {
451  if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
452  continue;
453  nliveatts++;
454  }
455  pq_sendint16(out, nliveatts);
456 
457  /* try to allocate enough memory from the get-go */
458  enlargeStringInfo(out, tuple->t_len +
459  nliveatts * (1 + 4));
460 
461  heap_deform_tuple(tuple, desc, values, isnull);
462 
463  /* Write the values */
464  for (i = 0; i < desc->natts; i++)
465  {
466  HeapTuple typtup;
467  Form_pg_type typclass;
468  Form_pg_attribute att = TupleDescAttr(desc, i);
469  char *outputstr;
470 
471  if (att->attisdropped || att->attgenerated)
472  continue;
473 
474  if (isnull[i])
475  {
477  continue;
478  }
479 
480  if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
481  {
482  /*
483  * Unchanged toasted datum. (Note that we don't promise to detect
484  * unchanged data in general; this is just a cheap check to avoid
485  * sending large values unnecessarily.)
486  */
488  continue;
489  }
490 
491  typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
492  if (!HeapTupleIsValid(typtup))
493  elog(ERROR, "cache lookup failed for type %u", att->atttypid);
494  typclass = (Form_pg_type) GETSTRUCT(typtup);
495 
496  /*
497  * Send in binary if requested and type has suitable send function.
498  */
499  if (binary && OidIsValid(typclass->typsend))
500  {
501  bytea *outputbytes;
502  int len;
503 
505  outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
506  len = VARSIZE(outputbytes) - VARHDRSZ;
507  pq_sendint(out, len, 4); /* length */
508  pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
509  pfree(outputbytes);
510  }
511  else
512  {
514  outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
515  pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
516  pfree(outputstr);
517  }
518 
519  ReleaseSysCache(typtup);
520  }
521 }
522 
523 /*
524  * Read tuple in logical replication format from stream.
525  */
526 static void
528 {
529  int i;
530  int natts;
531 
532  /* Get number of attributes */
533  natts = pq_getmsgint(in, 2);
534 
535  /* Allocate space for per-column values; zero out unused StringInfoDatas */
536  tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
537  tuple->colstatus = (char *) palloc(natts * sizeof(char));
538  tuple->ncols = natts;
539 
540  /* Read the data */
541  for (i = 0; i < natts; i++)
542  {
543  char kind;
544  int len;
545  StringInfo value = &tuple->colvalues[i];
546 
547  kind = pq_getmsgbyte(in);
548  tuple->colstatus[i] = kind;
549 
550  switch (kind)
551  {
553  /* nothing more to do */
554  break;
556  /* we don't receive the value of an unchanged column */
557  break;
559  len = pq_getmsgint(in, 4); /* read length */
560 
561  /* and data */
562  value->data = palloc(len + 1);
563  pq_copymsgbytes(in, value->data, len);
564  value->data[len] = '\0';
565  /* make StringInfo fully valid */
566  value->len = len;
567  value->cursor = 0;
568  value->maxlen = len;
569  break;
571  len = pq_getmsgint(in, 4); /* read length */
572 
573  /* and data */
574  value->data = palloc(len + 1);
575  pq_copymsgbytes(in, value->data, len);
576  /* not strictly necessary but per StringInfo practice */
577  value->data[len] = '\0';
578  /* make StringInfo fully valid */
579  value->len = len;
580  value->cursor = 0;
581  value->maxlen = len;
582  break;
583  default:
584  elog(ERROR, "unrecognized data representation type '%c'", kind);
585  }
586  }
587 }
588 
589 /*
590  * Write relation attribute metadata to the stream.
591  */
592 static void
594 {
595  TupleDesc desc;
596  int i;
597  uint16 nliveatts = 0;
598  Bitmapset *idattrs = NULL;
599  bool replidentfull;
600 
601  desc = RelationGetDescr(rel);
602 
603  /* send number of live attributes */
604  for (i = 0; i < desc->natts; i++)
605  {
606  if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
607  continue;
608  nliveatts++;
609  }
610  pq_sendint16(out, nliveatts);
611 
612  /* fetch bitmap of REPLICATION IDENTITY attributes */
613  replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
614  if (!replidentfull)
615  idattrs = RelationGetIndexAttrBitmap(rel,
617 
618  /* send the attributes */
619  for (i = 0; i < desc->natts; i++)
620  {
621  Form_pg_attribute att = TupleDescAttr(desc, i);
622  uint8 flags = 0;
623 
624  if (att->attisdropped || att->attgenerated)
625  continue;
626 
627  /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
628  if (replidentfull ||
630  idattrs))
632 
633  pq_sendbyte(out, flags);
634 
635  /* attribute name */
636  pq_sendstring(out, NameStr(att->attname));
637 
638  /* attribute type id */
639  pq_sendint32(out, (int) att->atttypid);
640 
641  /* attribute mode */
642  pq_sendint32(out, att->atttypmod);
643  }
644 
645  bms_free(idattrs);
646 }
647 
648 /*
649  * Read relation attribute metadata from the stream.
650  */
651 static void
653 {
654  int i;
655  int natts;
656  char **attnames;
657  Oid *atttyps;
658  Bitmapset *attkeys = NULL;
659 
660  natts = pq_getmsgint(in, 2);
661  attnames = palloc(natts * sizeof(char *));
662  atttyps = palloc(natts * sizeof(Oid));
663 
664  /* read the attributes */
665  for (i = 0; i < natts; i++)
666  {
667  uint8 flags;
668 
669  /* Check for replica identity column */
670  flags = pq_getmsgbyte(in);
671  if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
672  attkeys = bms_add_member(attkeys, i);
673 
674  /* attribute name */
675  attnames[i] = pstrdup(pq_getmsgstring(in));
676 
677  /* attribute type id */
678  atttyps[i] = (Oid) pq_getmsgint(in, 4);
679 
680  /* we ignore attribute mode for now */
681  (void) pq_getmsgint(in, 4);
682  }
683 
684  rel->attnames = attnames;
685  rel->atttyps = atttyps;
686  rel->attkeys = attkeys;
687  rel->natts = natts;
688 }
689 
690 /*
691  * Write the namespace name or empty string for pg_catalog (to save space).
692  */
693 static void
695 {
696  if (nspid == PG_CATALOG_NAMESPACE)
697  pq_sendbyte(out, '\0');
698  else
699  {
700  char *nspname = get_namespace_name(nspid);
701 
702  if (nspname == NULL)
703  elog(ERROR, "cache lookup failed for namespace %u",
704  nspid);
705 
706  pq_sendstring(out, nspname);
707  }
708 }
709 
710 /*
711  * Read the namespace name while treating empty string as pg_catalog.
712  */
713 static const char *
715 {
716  const char *nspname = pq_getmsgstring(in);
717 
718  if (nspname[0] == '\0')
719  nspname = "pg_catalog";
720 
721  return nspname;
722 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:439
#define NIL
Definition: pg_list.h:65
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
TimestampTz commit_time
void logicalrep_write_typ(StringInfo out, Oid typoid)
Definition: proto.c:399
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:314
TransactionId xid
Definition: logicalproto.h:81
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define VARDATA(PTR)
Definition: postgres.h:302
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:275
static void pq_sendint(StringInfo buf, uint32 i, int b)
Definition: pqformat.h:172
char * typname
Definition: logicalproto.h:73
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
#define RelationGetDescr(relation)
Definition: rel.h:482
#define VARSIZE(PTR)
Definition: postgres.h:303
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:59
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define VARHDRSZ
Definition: c.h:568
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:324
char * pstrdup(const char *in)
Definition: mcxt.c:1186
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:694
#define LOGICALREP_COLUMN_NULL
Definition: logicalproto.h:46
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:374
unsigned char uint8
Definition: c.h:372
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:48
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:74
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:112
StringInfoData * colvalues
Definition: logicalproto.h:37
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
Form_pg_class rd_rel
Definition: rel.h:109
NameData relname
Definition: pg_class.h:38
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:357
#define OidIsValid(objectId)
Definition: c.h:651
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:49
Bitmapset * attkeys
Definition: logicalproto.h:65
LogicalRepRelId remoteid
Definition: logicalproto.h:57
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:26
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:158
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:47
unsigned short uint16
Definition: c.h:373
void pfree(void *pointer)
Definition: mcxt.c:1056
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:426
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1676
uint32 t_len
Definition: htup.h:64
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3191
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:527
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:593
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
#define RelationGetRelationName(relation)
Definition: rel.h:490
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
XLogRecPtr final_lsn
void pq_sendcountedtext(StringInfo buf, const char *str, int slen, bool countincludesself)
Definition: pqformat.c:142
void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:297
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:652
void logicalrep_write_rel(StringInfo out, Relation rel)
Definition: proto.c:349
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary)
Definition: proto.c:250
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:45
XLogRecPtr final_lsn
Definition: logicalproto.h:79
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary)
Definition: proto.c:141
void * palloc0(Size size)
Definition: mcxt.c:980
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
TransactionId xid
static struct @143 value
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:714
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:745
void pq_copymsgbytes(StringInfo msg, char *buf, int datalen)
Definition: pqformat.c:530
XLogRecPtr end_lsn
FormData_pg_type * Form_pg_type
Definition: pg_type.h:255
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:209
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:94
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1249
static Datum values[MAXATTR]
Definition: bootstrap.c:167
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1657
void * palloc(Size size)
Definition: mcxt.c:949
#define elog(elevel,...)
Definition: elog.h:214
int i
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:4937
#define TRUNCATE_CASCADE
Definition: proto.c:28
#define NameStr(name)
Definition: c.h:622
Definition: c.h:562
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
Definition: proto.c:128
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
Definition: proto.c:180
TimestampTz committime
Definition: logicalproto.h:80
XLogRecPtr commit_lsn
Definition: logicalproto.h:86
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2409
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129
Definition: pg_list.h:50
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:427
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:29
#define RelationGetRelid(relation)
Definition: rel.h:456
TimestampTz committime
Definition: logicalproto.h:88
char * nspname
Definition: logicalproto.h:72
uint32 LogicalRepRelId
Definition: logicalproto.h:51
#define RelationGetNamespace(relation)
Definition: rel.h:497