PostgreSQL Source Code  git master
proto.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * proto.c
4  * logical replication protocol functions
5  *
6  * Copyright (c) 2015-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/proto.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/sysattr.h"
16 #include "catalog/pg_namespace.h"
17 #include "catalog/pg_type.h"
18 #include "libpq/pqformat.h"
20 #include "utils/builtins.h"
21 #include "utils/lsyscache.h"
22 #include "utils/syscache.h"
23 
24 /*
25  * Protocol message flags.
26  */
27 #define LOGICALREP_IS_REPLICA_IDENTITY 1
28 
29 #define TRUNCATE_CASCADE (1<<0)
30 #define TRUNCATE_RESTART_SEQS (1<<1)
31 
32 static void logicalrep_write_attrs(StringInfo out, Relation rel);
33 static void logicalrep_write_tuple(StringInfo out, Relation rel,
34  HeapTuple tuple);
35 
38 
39 static void logicalrep_write_namespace(StringInfo out, Oid nspid);
40 static const char *logicalrep_read_namespace(StringInfo in);
41 
42 /*
43  * Write BEGIN to the output stream.
44  */
45 void
47 {
48  pq_sendbyte(out, 'B'); /* BEGIN */
49 
50  /* fixed fields */
51  pq_sendint64(out, txn->final_lsn);
52  pq_sendint64(out, txn->commit_time);
53  pq_sendint32(out, txn->xid);
54 }
55 
56 /*
57  * Read transaction BEGIN from the stream.
58  */
59 void
61 {
62  /* read fields */
63  begin_data->final_lsn = pq_getmsgint64(in);
64  if (begin_data->final_lsn == InvalidXLogRecPtr)
65  elog(ERROR, "final_lsn not set in begin message");
66  begin_data->committime = pq_getmsgint64(in);
67  begin_data->xid = pq_getmsgint(in, 4);
68 }
69 
70 
71 /*
72  * Write COMMIT to the output stream.
73  */
74 void
76  XLogRecPtr commit_lsn)
77 {
78  uint8 flags = 0;
79 
80  pq_sendbyte(out, 'C'); /* sending COMMIT */
81 
82  /* send the flags field (unused for now) */
83  pq_sendbyte(out, flags);
84 
85  /* send fields */
86  pq_sendint64(out, commit_lsn);
87  pq_sendint64(out, txn->end_lsn);
88  pq_sendint64(out, txn->commit_time);
89 }
90 
91 /*
92  * Read transaction COMMIT from the stream.
93  */
94 void
96 {
97  /* read flags (unused for now) */
98  uint8 flags = pq_getmsgbyte(in);
99 
100  if (flags != 0)
101  elog(ERROR, "unrecognized flags %u in commit message", flags);
102 
103  /* read fields */
104  commit_data->commit_lsn = pq_getmsgint64(in);
105  commit_data->end_lsn = pq_getmsgint64(in);
106  commit_data->committime = pq_getmsgint64(in);
107 }
108 
109 /*
110  * Write ORIGIN to the output stream.
111  */
112 void
113 logicalrep_write_origin(StringInfo out, const char *origin,
114  XLogRecPtr origin_lsn)
115 {
116  pq_sendbyte(out, 'O'); /* ORIGIN */
117 
118  /* fixed fields */
119  pq_sendint64(out, origin_lsn);
120 
121  /* origin string */
122  pq_sendstring(out, origin);
123 }
124 
125 /*
126  * Read ORIGIN from the output stream.
127  */
128 char *
130 {
131  /* fixed fields */
132  *origin_lsn = pq_getmsgint64(in);
133 
134  /* return origin */
135  return pstrdup(pq_getmsgstring(in));
136 }
137 
138 /*
139  * Write INSERT to the output stream.
140  */
141 void
143 {
144  pq_sendbyte(out, 'I'); /* action INSERT */
145 
146  /* use Oid as relation identifier */
147  pq_sendint32(out, RelationGetRelid(rel));
148 
149  pq_sendbyte(out, 'N'); /* new tuple follows */
150  logicalrep_write_tuple(out, rel, newtuple);
151 }
152 
153 /*
154  * Read INSERT from stream.
155  *
156  * Fills the new tuple.
157  */
160 {
161  char action;
162  LogicalRepRelId relid;
163 
164  /* read the relation id */
165  relid = pq_getmsgint(in, 4);
166 
167  action = pq_getmsgbyte(in);
168  if (action != 'N')
169  elog(ERROR, "expected new tuple but got %d",
170  action);
171 
172  logicalrep_read_tuple(in, newtup);
173 
174  return relid;
175 }
176 
177 /*
178  * Write UPDATE to the output stream.
179  */
180 void
182  HeapTuple newtuple)
183 {
184  pq_sendbyte(out, 'U'); /* action UPDATE */
185 
186  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
187  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
188  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
189 
190  /* use Oid as relation identifier */
191  pq_sendint32(out, RelationGetRelid(rel));
192 
193  if (oldtuple != NULL)
194  {
195  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
196  pq_sendbyte(out, 'O'); /* old tuple follows */
197  else
198  pq_sendbyte(out, 'K'); /* old key follows */
199  logicalrep_write_tuple(out, rel, oldtuple);
200  }
201 
202  pq_sendbyte(out, 'N'); /* new tuple follows */
203  logicalrep_write_tuple(out, rel, newtuple);
204 }
205 
206 /*
207  * Read UPDATE from stream.
208  */
210 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
211  LogicalRepTupleData *oldtup,
212  LogicalRepTupleData *newtup)
213 {
214  char action;
215  LogicalRepRelId relid;
216 
217  /* read the relation id */
218  relid = pq_getmsgint(in, 4);
219 
220  /* read and verify action */
221  action = pq_getmsgbyte(in);
222  if (action != 'K' && action != 'O' && action != 'N')
223  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
224  action);
225 
226  /* check for old tuple */
227  if (action == 'K' || action == 'O')
228  {
229  logicalrep_read_tuple(in, oldtup);
230  *has_oldtuple = true;
231 
232  action = pq_getmsgbyte(in);
233  }
234  else
235  *has_oldtuple = false;
236 
237  /* check for new tuple */
238  if (action != 'N')
239  elog(ERROR, "expected action 'N', got %c",
240  action);
241 
242  logicalrep_read_tuple(in, newtup);
243 
244  return relid;
245 }
246 
247 /*
248  * Write DELETE to the output stream.
249  */
250 void
252 {
253  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
254  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
255  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
256 
257  pq_sendbyte(out, 'D'); /* action DELETE */
258 
259  /* use Oid as relation identifier */
260  pq_sendint32(out, RelationGetRelid(rel));
261 
262  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
263  pq_sendbyte(out, 'O'); /* old tuple follows */
264  else
265  pq_sendbyte(out, 'K'); /* old key follows */
266 
267  logicalrep_write_tuple(out, rel, oldtuple);
268 }
269 
270 /*
271  * Read DELETE from stream.
272  *
273  * Fills the old tuple.
274  */
277 {
278  char action;
279  LogicalRepRelId relid;
280 
281  /* read the relation id */
282  relid = pq_getmsgint(in, 4);
283 
284  /* read and verify action */
285  action = pq_getmsgbyte(in);
286  if (action != 'K' && action != 'O')
287  elog(ERROR, "expected action 'O' or 'K', got %c", action);
288 
289  logicalrep_read_tuple(in, oldtup);
290 
291  return relid;
292 }
293 
294 /*
295  * Write TRUNCATE to the output stream.
296  */
297 void
299  int nrelids,
300  Oid relids[],
301  bool cascade, bool restart_seqs)
302 {
303  int i;
304  uint8 flags = 0;
305 
306  pq_sendbyte(out, 'T'); /* action TRUNCATE */
307 
308  pq_sendint32(out, nrelids);
309 
310  /* encode and send truncate flags */
311  if (cascade)
312  flags |= TRUNCATE_CASCADE;
313  if (restart_seqs)
314  flags |= TRUNCATE_RESTART_SEQS;
315  pq_sendint8(out, flags);
316 
317  for (i = 0; i < nrelids; i++)
318  pq_sendint32(out, relids[i]);
319 }
320 
321 /*
322  * Read TRUNCATE from stream.
323  */
324 List *
326  bool *cascade, bool *restart_seqs)
327 {
328  int i;
329  int nrelids;
330  List *relids = NIL;
331  uint8 flags;
332 
333  nrelids = pq_getmsgint(in, 4);
334 
335  /* read and decode truncate flags */
336  flags = pq_getmsgint(in, 1);
337  *cascade = (flags & TRUNCATE_CASCADE) > 0;
338  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
339 
340  for (i = 0; i < nrelids; i++)
341  relids = lappend_oid(relids, pq_getmsgint(in, 4));
342 
343  return relids;
344 }
345 
346 /*
347  * Write relation description to the output stream.
348  */
349 void
351 {
352  char *relname;
353 
354  pq_sendbyte(out, 'R'); /* sending RELATION */
355 
356  /* use Oid as relation identifier */
357  pq_sendint32(out, RelationGetRelid(rel));
358 
359  /* send qualified relation name */
361  relname = RelationGetRelationName(rel);
362  pq_sendstring(out, relname);
363 
364  /* send replica identity */
365  pq_sendbyte(out, rel->rd_rel->relreplident);
366 
367  /* send the attribute info */
368  logicalrep_write_attrs(out, rel);
369 }
370 
371 /*
372  * Read the relation info from stream and return as LogicalRepRelation.
373  */
376 {
378 
379  rel->remoteid = pq_getmsgint(in, 4);
380 
381  /* Read relation name from stream */
383  rel->relname = pstrdup(pq_getmsgstring(in));
384 
385  /* Read the replica identity. */
386  rel->replident = pq_getmsgbyte(in);
387 
388  /* Get attribute description */
389  logicalrep_read_attrs(in, rel);
390 
391  return rel;
392 }
393 
394 /*
395  * Write type info to the output stream.
396  *
397  * This function will always write base type info.
398  */
399 void
401 {
402  Oid basetypoid = getBaseType(typoid);
403  HeapTuple tup;
404  Form_pg_type typtup;
405 
406  pq_sendbyte(out, 'Y'); /* sending TYPE */
407 
408  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
409  if (!HeapTupleIsValid(tup))
410  elog(ERROR, "cache lookup failed for type %u", basetypoid);
411  typtup = (Form_pg_type) GETSTRUCT(tup);
412 
413  /* use Oid as relation identifier */
414  pq_sendint32(out, typoid);
415 
416  /* send qualified type name */
417  logicalrep_write_namespace(out, typtup->typnamespace);
418  pq_sendstring(out, NameStr(typtup->typname));
419 
420  ReleaseSysCache(tup);
421 }
422 
423 /*
424  * Read type info from the output stream.
425  */
426 void
428 {
429  ltyp->remoteid = pq_getmsgint(in, 4);
430 
431  /* Read type name from stream */
433  ltyp->typname = pstrdup(pq_getmsgstring(in));
434 }
435 
436 /*
437  * Write a tuple to the outputstream, in the most efficient format possible.
438  */
439 static void
441 {
442  TupleDesc desc;
444  bool isnull[MaxTupleAttributeNumber];
445  int i;
446  uint16 nliveatts = 0;
447 
448  desc = RelationGetDescr(rel);
449 
450  for (i = 0; i < desc->natts; i++)
451  {
452  if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
453  continue;
454  nliveatts++;
455  }
456  pq_sendint16(out, nliveatts);
457 
458  /* try to allocate enough memory from the get-go */
459  enlargeStringInfo(out, tuple->t_len +
460  nliveatts * (1 + 4));
461 
462  heap_deform_tuple(tuple, desc, values, isnull);
463 
464  /* Write the values */
465  for (i = 0; i < desc->natts; i++)
466  {
467  HeapTuple typtup;
468  Form_pg_type typclass;
469  Form_pg_attribute att = TupleDescAttr(desc, i);
470  char *outputstr;
471 
472  if (att->attisdropped || att->attgenerated)
473  continue;
474 
475  if (isnull[i])
476  {
477  pq_sendbyte(out, 'n'); /* null column */
478  continue;
479  }
480  else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
481  {
482  pq_sendbyte(out, 'u'); /* unchanged toast column */
483  continue;
484  }
485 
486  typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
487  if (!HeapTupleIsValid(typtup))
488  elog(ERROR, "cache lookup failed for type %u", att->atttypid);
489  typclass = (Form_pg_type) GETSTRUCT(typtup);
490 
491  pq_sendbyte(out, 't'); /* 'text' data follows */
492 
493  outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
494  pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
495  pfree(outputstr);
496 
497  ReleaseSysCache(typtup);
498  }
499 }
500 
501 /*
502  * Read tuple in remote format from stream.
503  *
504  * The returned tuple points into the input stringinfo.
505  */
506 static void
508 {
509  int i;
510  int natts;
511 
512  /* Get number of attributes */
513  natts = pq_getmsgint(in, 2);
514 
515  memset(tuple->changed, 0, sizeof(tuple->changed));
516 
517  /* Read the data */
518  for (i = 0; i < natts; i++)
519  {
520  char kind;
521 
522  kind = pq_getmsgbyte(in);
523 
524  switch (kind)
525  {
526  case 'n': /* null */
527  tuple->values[i] = NULL;
528  tuple->changed[i] = true;
529  break;
530  case 'u': /* unchanged column */
531  /* we don't receive the value of an unchanged column */
532  tuple->values[i] = NULL;
533  break;
534  case 't': /* text formatted value */
535  {
536  int len;
537 
538  tuple->changed[i] = true;
539 
540  len = pq_getmsgint(in, 4); /* read length */
541 
542  /* and data */
543  tuple->values[i] = palloc(len + 1);
544  pq_copymsgbytes(in, tuple->values[i], len);
545  tuple->values[i][len] = '\0';
546  }
547  break;
548  default:
549  elog(ERROR, "unrecognized data representation type '%c'", kind);
550  }
551  }
552 }
553 
554 /*
555  * Write relation attributes to the stream.
556  */
557 static void
559 {
560  TupleDesc desc;
561  int i;
562  uint16 nliveatts = 0;
563  Bitmapset *idattrs = NULL;
564  bool replidentfull;
565 
566  desc = RelationGetDescr(rel);
567 
568  /* send number of live attributes */
569  for (i = 0; i < desc->natts; i++)
570  {
571  if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
572  continue;
573  nliveatts++;
574  }
575  pq_sendint16(out, nliveatts);
576 
577  /* fetch bitmap of REPLICATION IDENTITY attributes */
578  replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
579  if (!replidentfull)
580  idattrs = RelationGetIndexAttrBitmap(rel,
582 
583  /* send the attributes */
584  for (i = 0; i < desc->natts; i++)
585  {
586  Form_pg_attribute att = TupleDescAttr(desc, i);
587  uint8 flags = 0;
588 
589  if (att->attisdropped || att->attgenerated)
590  continue;
591 
592  /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
593  if (replidentfull ||
595  idattrs))
597 
598  pq_sendbyte(out, flags);
599 
600  /* attribute name */
601  pq_sendstring(out, NameStr(att->attname));
602 
603  /* attribute type id */
604  pq_sendint32(out, (int) att->atttypid);
605 
606  /* attribute mode */
607  pq_sendint32(out, att->atttypmod);
608  }
609 
610  bms_free(idattrs);
611 }
612 
613 /*
614  * Read relation attribute names from the stream.
615  */
616 static void
618 {
619  int i;
620  int natts;
621  char **attnames;
622  Oid *atttyps;
623  Bitmapset *attkeys = NULL;
624 
625  natts = pq_getmsgint(in, 2);
626  attnames = palloc(natts * sizeof(char *));
627  atttyps = palloc(natts * sizeof(Oid));
628 
629  /* read the attributes */
630  for (i = 0; i < natts; i++)
631  {
632  uint8 flags;
633 
634  /* Check for replica identity column */
635  flags = pq_getmsgbyte(in);
636  if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
637  attkeys = bms_add_member(attkeys, i);
638 
639  /* attribute name */
640  attnames[i] = pstrdup(pq_getmsgstring(in));
641 
642  /* attribute type id */
643  atttyps[i] = (Oid) pq_getmsgint(in, 4);
644 
645  /* we ignore attribute mode for now */
646  (void) pq_getmsgint(in, 4);
647  }
648 
649  rel->attnames = attnames;
650  rel->atttyps = atttyps;
651  rel->attkeys = attkeys;
652  rel->natts = natts;
653 }
654 
655 /*
656  * Write the namespace name or empty string for pg_catalog (to save space).
657  */
658 static void
660 {
661  if (nspid == PG_CATALOG_NAMESPACE)
662  pq_sendbyte(out, '\0');
663  else
664  {
665  char *nspname = get_namespace_name(nspid);
666 
667  if (nspname == NULL)
668  elog(ERROR, "cache lookup failed for namespace %u",
669  nspid);
670 
671  pq_sendstring(out, nspname);
672  }
673 }
674 
675 /*
676  * Read the namespace name while treating empty string as pg_catalog.
677  */
678 static const char *
680 {
681  const char *nspname = pq_getmsgstring(in);
682 
683  if (nspname[0] == '\0')
684  nspname = "pg_catalog";
685 
686  return nspname;
687 }
#define NIL
Definition: pg_list.h:65
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
TimestampTz commit_time
void logicalrep_write_typ(StringInfo out, Oid typoid)
Definition: proto.c:400
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:314
TransactionId xid
Definition: logicalproto.h:69
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:276
char * typname
Definition: logicalproto.h:61
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
#define RelationGetDescr(relation)
Definition: rel.h:482
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:325
char * pstrdup(const char *in)
Definition: mcxt.c:1186
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:659
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:375
unsigned char uint8
Definition: c.h:365
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:75
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:113
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
Form_pg_class rd_rel
Definition: rel.h:109
NameData relname
Definition: pg_class.h:38
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:357
Bitmapset * attkeys
Definition: logicalproto.h:53
LogicalRepRelId remoteid
Definition: logicalproto.h:45
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:27
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:159
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
unsigned short uint16
Definition: c.h:366
void pfree(void *pointer)
Definition: mcxt.c:1056
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:427
uint32 t_len
Definition: htup.h:64
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3155
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:507
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
Definition: proto.c:251
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:558
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
#define RelationGetRelationName(relation)
Definition: rel.h:490
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
XLogRecPtr final_lsn
void pq_sendcountedtext(StringInfo buf, const char *str, int slen, bool countincludesself)
Definition: pqformat.c:142
void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:298
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:617
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
Definition: proto.c:142
void logicalrep_write_rel(StringInfo out, Relation rel)
Definition: proto.c:350
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:46
XLogRecPtr final_lsn
Definition: logicalproto.h:67
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
TransactionId xid
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:679
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
Definition: proto.c:181
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:738
void pq_copymsgbytes(StringInfo msg, char *buf, int datalen)
Definition: pqformat.c:530
XLogRecPtr end_lsn
FormData_pg_type * Form_pg_type
Definition: pg_type.h:255
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:210
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1249
static Datum values[MAXATTR]
Definition: bootstrap.c:167
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1657
void * palloc(Size size)
Definition: mcxt.c:949
#define elog(elevel,...)
Definition: elog.h:214
int i
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:4936
#define TRUNCATE_CASCADE
Definition: proto.c:29
#define NameStr(name)
Definition: c.h:615
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
Definition: proto.c:129
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:68
XLogRecPtr commit_lsn
Definition: logicalproto.h:74
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2360
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129
Definition: pg_list.h:50
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:427
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30
#define RelationGetRelid(relation)
Definition: rel.h:456
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
Definition: proto.c:440
TimestampTz committime
Definition: logicalproto.h:76
char * nspname
Definition: logicalproto.h:60
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define RelationGetNamespace(relation)
Definition: rel.h:497