PostgreSQL Source Code  git master
proto.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * proto.c
4  * logical replication protocol functions
5  *
6  * Copyright (c) 2015-2018, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/proto.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/sysattr.h"
16 #include "catalog/pg_namespace.h"
17 #include "catalog/pg_type.h"
18 #include "libpq/pqformat.h"
20 #include "utils/builtins.h"
21 #include "utils/lsyscache.h"
22 #include "utils/syscache.h"
23 
24 /*
25  * Protocol message flags.
26  */
27 #define LOGICALREP_IS_REPLICA_IDENTITY 1
28 
29 #define TRUNCATE_CASCADE (1<<0)
30 #define TRUNCATE_RESTART_SEQS (1<<1)
31 
32 static void logicalrep_write_attrs(StringInfo out, Relation rel);
33 static void logicalrep_write_tuple(StringInfo out, Relation rel,
34  HeapTuple tuple);
35 
38 
39 static void logicalrep_write_namespace(StringInfo out, Oid nspid);
40 static const char *logicalrep_read_namespace(StringInfo in);
41 
42 /*
43  * Write BEGIN to the output stream.
44  */
45 void
47 {
48  pq_sendbyte(out, 'B'); /* BEGIN */
49 
50  /* fixed fields */
51  pq_sendint64(out, txn->final_lsn);
52  pq_sendint64(out, txn->commit_time);
53  pq_sendint32(out, txn->xid);
54 }
55 
56 /*
57  * Read transaction BEGIN from the stream.
58  */
59 void
61 {
62  /* read fields */
63  begin_data->final_lsn = pq_getmsgint64(in);
64  if (begin_data->final_lsn == InvalidXLogRecPtr)
65  elog(ERROR, "final_lsn not set in begin message");
66  begin_data->committime = pq_getmsgint64(in);
67  begin_data->xid = pq_getmsgint(in, 4);
68 }
69 
70 
71 /*
72  * Write COMMIT to the output stream.
73  */
74 void
76  XLogRecPtr commit_lsn)
77 {
78  uint8 flags = 0;
79 
80  pq_sendbyte(out, 'C'); /* sending COMMIT */
81 
82  /* send the flags field (unused for now) */
83  pq_sendbyte(out, flags);
84 
85  /* send fields */
86  pq_sendint64(out, commit_lsn);
87  pq_sendint64(out, txn->end_lsn);
88  pq_sendint64(out, txn->commit_time);
89 }
90 
91 /*
92  * Read transaction COMMIT from the stream.
93  */
94 void
96 {
97  /* read flags (unused for now) */
98  uint8 flags = pq_getmsgbyte(in);
99 
100  if (flags != 0)
101  elog(ERROR, "unrecognized flags %u in commit message", flags);
102 
103  /* read fields */
104  commit_data->commit_lsn = pq_getmsgint64(in);
105  commit_data->end_lsn = pq_getmsgint64(in);
106  commit_data->committime = pq_getmsgint64(in);
107 }
108 
109 /*
110  * Write ORIGIN to the output stream.
111  */
112 void
113 logicalrep_write_origin(StringInfo out, const char *origin,
114  XLogRecPtr origin_lsn)
115 {
116  pq_sendbyte(out, 'O'); /* ORIGIN */
117 
118  /* fixed fields */
119  pq_sendint64(out, origin_lsn);
120 
121  /* origin string */
122  pq_sendstring(out, origin);
123 }
124 
125 /*
126  * Read ORIGIN from the output stream.
127  */
128 char *
130 {
131  /* fixed fields */
132  *origin_lsn = pq_getmsgint64(in);
133 
134  /* return origin */
135  return pstrdup(pq_getmsgstring(in));
136 }
137 
138 /*
139  * Write INSERT to the output stream.
140  */
141 void
143 {
144  pq_sendbyte(out, 'I'); /* action INSERT */
145 
146  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
147  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
148  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
149 
150  /* use Oid as relation identifier */
151  pq_sendint32(out, RelationGetRelid(rel));
152 
153  pq_sendbyte(out, 'N'); /* new tuple follows */
154  logicalrep_write_tuple(out, rel, newtuple);
155 }
156 
157 /*
158  * Read INSERT from stream.
159  *
160  * Fills the new tuple.
161  */
164 {
165  char action;
166  LogicalRepRelId relid;
167 
168  /* read the relation id */
169  relid = pq_getmsgint(in, 4);
170 
171  action = pq_getmsgbyte(in);
172  if (action != 'N')
173  elog(ERROR, "expected new tuple but got %d",
174  action);
175 
176  logicalrep_read_tuple(in, newtup);
177 
178  return relid;
179 }
180 
181 /*
182  * Write UPDATE to the output stream.
183  */
184 void
186  HeapTuple newtuple)
187 {
188  pq_sendbyte(out, 'U'); /* action UPDATE */
189 
190  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
191  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
192  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
193 
194  /* use Oid as relation identifier */
195  pq_sendint32(out, RelationGetRelid(rel));
196 
197  if (oldtuple != NULL)
198  {
199  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
200  pq_sendbyte(out, 'O'); /* old tuple follows */
201  else
202  pq_sendbyte(out, 'K'); /* old key follows */
203  logicalrep_write_tuple(out, rel, oldtuple);
204  }
205 
206  pq_sendbyte(out, 'N'); /* new tuple follows */
207  logicalrep_write_tuple(out, rel, newtuple);
208 }
209 
210 /*
211  * Read UPDATE from stream.
212  */
214 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
215  LogicalRepTupleData *oldtup,
216  LogicalRepTupleData *newtup)
217 {
218  char action;
219  LogicalRepRelId relid;
220 
221  /* read the relation id */
222  relid = pq_getmsgint(in, 4);
223 
224  /* read and verify action */
225  action = pq_getmsgbyte(in);
226  if (action != 'K' && action != 'O' && action != 'N')
227  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
228  action);
229 
230  /* check for old tuple */
231  if (action == 'K' || action == 'O')
232  {
233  logicalrep_read_tuple(in, oldtup);
234  *has_oldtuple = true;
235 
236  action = pq_getmsgbyte(in);
237  }
238  else
239  *has_oldtuple = false;
240 
241  /* check for new tuple */
242  if (action != 'N')
243  elog(ERROR, "expected action 'N', got %c",
244  action);
245 
246  logicalrep_read_tuple(in, newtup);
247 
248  return relid;
249 }
250 
251 /*
252  * Write DELETE to the output stream.
253  */
254 void
256 {
257  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
258  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
259  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
260 
261  pq_sendbyte(out, 'D'); /* action DELETE */
262 
263  /* use Oid as relation identifier */
264  pq_sendint32(out, RelationGetRelid(rel));
265 
266  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
267  pq_sendbyte(out, 'O'); /* old tuple follows */
268  else
269  pq_sendbyte(out, 'K'); /* old key follows */
270 
271  logicalrep_write_tuple(out, rel, oldtuple);
272 }
273 
274 /*
275  * Read DELETE from stream.
276  *
277  * Fills the old tuple.
278  */
281 {
282  char action;
283  LogicalRepRelId relid;
284 
285  /* read the relation id */
286  relid = pq_getmsgint(in, 4);
287 
288  /* read and verify action */
289  action = pq_getmsgbyte(in);
290  if (action != 'K' && action != 'O')
291  elog(ERROR, "expected action 'O' or 'K', got %c", action);
292 
293  logicalrep_read_tuple(in, oldtup);
294 
295  return relid;
296 }
297 
298 /*
299  * Write TRUNCATE to the output stream.
300  */
301 void
303  int nrelids,
304  Oid relids[],
305  bool cascade, bool restart_seqs)
306 {
307  int i;
308  uint8 flags = 0;
309 
310  pq_sendbyte(out, 'T'); /* action TRUNCATE */
311 
312  pq_sendint32(out, nrelids);
313 
314  /* encode and send truncate flags */
315  if (cascade)
316  flags |= TRUNCATE_CASCADE;
317  if (restart_seqs)
318  flags |= TRUNCATE_RESTART_SEQS;
319  pq_sendint8(out, flags);
320 
321  for (i = 0; i < nrelids; i++)
322  pq_sendint32(out, relids[i]);
323 }
324 
325 /*
326  * Read TRUNCATE from stream.
327  */
328 List *
330  bool *cascade, bool *restart_seqs)
331 {
332  int i;
333  int nrelids;
334  List *relids = NIL;
335  uint8 flags;
336 
337  nrelids = pq_getmsgint(in, 4);
338 
339  /* read and decode truncate flags */
340  flags = pq_getmsgint(in, 1);
341  *cascade = (flags & TRUNCATE_CASCADE) > 0;
342  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
343 
344  for (i = 0; i < nrelids; i++)
345  relids = lappend_oid(relids, pq_getmsgint(in, 4));
346 
347  return relids;
348 }
349 
350 /*
351  * Write relation description to the output stream.
352  */
353 void
355 {
356  char *relname;
357 
358  pq_sendbyte(out, 'R'); /* sending RELATION */
359 
360  /* use Oid as relation identifier */
361  pq_sendint32(out, RelationGetRelid(rel));
362 
363  /* send qualified relation name */
365  relname = RelationGetRelationName(rel);
366  pq_sendstring(out, relname);
367 
368  /* send replica identity */
369  pq_sendbyte(out, rel->rd_rel->relreplident);
370 
371  /* send the attribute info */
372  logicalrep_write_attrs(out, rel);
373 }
374 
375 /*
376  * Read the relation info from stream and return as LogicalRepRelation.
377  */
380 {
382 
383  rel->remoteid = pq_getmsgint(in, 4);
384 
385  /* Read relation name from stream */
387  rel->relname = pstrdup(pq_getmsgstring(in));
388 
389  /* Read the replica identity. */
390  rel->replident = pq_getmsgbyte(in);
391 
392  /* Get attribute description */
393  logicalrep_read_attrs(in, rel);
394 
395  return rel;
396 }
397 
398 /*
399  * Write type info to the output stream.
400  *
401  * This function will always write base type info.
402  */
403 void
405 {
406  Oid basetypoid = getBaseType(typoid);
407  HeapTuple tup;
408  Form_pg_type typtup;
409 
410  pq_sendbyte(out, 'Y'); /* sending TYPE */
411 
412  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
413  if (!HeapTupleIsValid(tup))
414  elog(ERROR, "cache lookup failed for type %u", basetypoid);
415  typtup = (Form_pg_type) GETSTRUCT(tup);
416 
417  /* use Oid as relation identifier */
418  pq_sendint32(out, typoid);
419 
420  /* send qualified type name */
421  logicalrep_write_namespace(out, typtup->typnamespace);
422  pq_sendstring(out, NameStr(typtup->typname));
423 
424  ReleaseSysCache(tup);
425 }
426 
427 /*
428  * Read type info from the output stream.
429  */
430 void
432 {
433  ltyp->remoteid = pq_getmsgint(in, 4);
434 
435  /* Read type name from stream */
437  ltyp->typname = pstrdup(pq_getmsgstring(in));
438 }
439 
440 /*
441  * Write a tuple to the outputstream, in the most efficient format possible.
442  */
443 static void
445 {
446  TupleDesc desc;
448  bool isnull[MaxTupleAttributeNumber];
449  int i;
450  uint16 nliveatts = 0;
451 
452  desc = RelationGetDescr(rel);
453 
454  for (i = 0; i < desc->natts; i++)
455  {
456  if (TupleDescAttr(desc, i)->attisdropped)
457  continue;
458  nliveatts++;
459  }
460  pq_sendint16(out, nliveatts);
461 
462  /* try to allocate enough memory from the get-go */
463  enlargeStringInfo(out, tuple->t_len +
464  nliveatts * (1 + 4));
465 
466  heap_deform_tuple(tuple, desc, values, isnull);
467 
468  /* Write the values */
469  for (i = 0; i < desc->natts; i++)
470  {
471  HeapTuple typtup;
472  Form_pg_type typclass;
473  Form_pg_attribute att = TupleDescAttr(desc, i);
474  char *outputstr;
475 
476  /* skip dropped columns */
477  if (att->attisdropped)
478  continue;
479 
480  if (isnull[i])
481  {
482  pq_sendbyte(out, 'n'); /* null column */
483  continue;
484  }
485  else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
486  {
487  pq_sendbyte(out, 'u'); /* unchanged toast column */
488  continue;
489  }
490 
491  typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
492  if (!HeapTupleIsValid(typtup))
493  elog(ERROR, "cache lookup failed for type %u", att->atttypid);
494  typclass = (Form_pg_type) GETSTRUCT(typtup);
495 
496  pq_sendbyte(out, 't'); /* 'text' data follows */
497 
498  outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
499  pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
500  pfree(outputstr);
501 
502  ReleaseSysCache(typtup);
503  }
504 }
505 
506 /*
507  * Read tuple in remote format from stream.
508  *
509  * The returned tuple points into the input stringinfo.
510  */
511 static void
513 {
514  int i;
515  int natts;
516 
517  /* Get number of attributes */
518  natts = pq_getmsgint(in, 2);
519 
520  memset(tuple->changed, 0, sizeof(tuple->changed));
521 
522  /* Read the data */
523  for (i = 0; i < natts; i++)
524  {
525  char kind;
526 
527  kind = pq_getmsgbyte(in);
528 
529  switch (kind)
530  {
531  case 'n': /* null */
532  tuple->values[i] = NULL;
533  tuple->changed[i] = true;
534  break;
535  case 'u': /* unchanged column */
536  /* we don't receive the value of an unchanged column */
537  tuple->values[i] = NULL;
538  break;
539  case 't': /* text formatted value */
540  {
541  int len;
542 
543  tuple->changed[i] = true;
544 
545  len = pq_getmsgint(in, 4); /* read length */
546 
547  /* and data */
548  tuple->values[i] = palloc(len + 1);
549  pq_copymsgbytes(in, tuple->values[i], len);
550  tuple->values[i][len] = '\0';
551  }
552  break;
553  default:
554  elog(ERROR, "unrecognized data representation type '%c'", kind);
555  }
556  }
557 }
558 
559 /*
560  * Write relation attributes to the stream.
561  */
562 static void
564 {
565  TupleDesc desc;
566  int i;
567  uint16 nliveatts = 0;
568  Bitmapset *idattrs = NULL;
569  bool replidentfull;
570 
571  desc = RelationGetDescr(rel);
572 
573  /* send number of live attributes */
574  for (i = 0; i < desc->natts; i++)
575  {
576  if (TupleDescAttr(desc, i)->attisdropped)
577  continue;
578  nliveatts++;
579  }
580  pq_sendint16(out, nliveatts);
581 
582  /* fetch bitmap of REPLICATION IDENTITY attributes */
583  replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
584  if (!replidentfull)
585  idattrs = RelationGetIndexAttrBitmap(rel,
587 
588  /* send the attributes */
589  for (i = 0; i < desc->natts; i++)
590  {
591  Form_pg_attribute att = TupleDescAttr(desc, i);
592  uint8 flags = 0;
593 
594  if (att->attisdropped)
595  continue;
596 
597  /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
598  if (replidentfull ||
600  idattrs))
602 
603  pq_sendbyte(out, flags);
604 
605  /* attribute name */
606  pq_sendstring(out, NameStr(att->attname));
607 
608  /* attribute type id */
609  pq_sendint32(out, (int) att->atttypid);
610 
611  /* attribute mode */
612  pq_sendint32(out, att->atttypmod);
613  }
614 
615  bms_free(idattrs);
616 }
617 
618 /*
619  * Read relation attribute names from the stream.
620  */
621 static void
623 {
624  int i;
625  int natts;
626  char **attnames;
627  Oid *atttyps;
628  Bitmapset *attkeys = NULL;
629 
630  natts = pq_getmsgint(in, 2);
631  attnames = palloc(natts * sizeof(char *));
632  atttyps = palloc(natts * sizeof(Oid));
633 
634  /* read the attributes */
635  for (i = 0; i < natts; i++)
636  {
637  uint8 flags;
638 
639  /* Check for replica identity column */
640  flags = pq_getmsgbyte(in);
641  if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
642  attkeys = bms_add_member(attkeys, i);
643 
644  /* attribute name */
645  attnames[i] = pstrdup(pq_getmsgstring(in));
646 
647  /* attribute type id */
648  atttyps[i] = (Oid) pq_getmsgint(in, 4);
649 
650  /* we ignore attribute mode for now */
651  (void) pq_getmsgint(in, 4);
652  }
653 
654  rel->attnames = attnames;
655  rel->atttyps = atttyps;
656  rel->attkeys = attkeys;
657  rel->natts = natts;
658 }
659 
660 /*
661  * Write the namespace name or empty string for pg_catalog (to save space).
662  */
663 static void
665 {
666  if (nspid == PG_CATALOG_NAMESPACE)
667  pq_sendbyte(out, '\0');
668  else
669  {
670  char *nspname = get_namespace_name(nspid);
671 
672  if (nspname == NULL)
673  elog(ERROR, "cache lookup failed for namespace %u",
674  nspid);
675 
676  pq_sendstring(out, nspname);
677  }
678 }
679 
680 /*
681  * Read the namespace name while treating empty string as pg_catalog.
682  */
683 static const char *
685 {
686  const char *nspname = pq_getmsgstring(in);
687 
688  if (nspname[0] == '\0')
689  nspname = "pg_catalog";
690 
691  return nspname;
692 }
#define NIL
Definition: pg_list.h:69
TimestampTz commit_time
void logicalrep_write_typ(StringInfo out, Oid typoid)
Definition: proto.c:404
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:314
TransactionId xid
Definition: logicalproto.h:68
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define GETSTRUCT(TUP)
Definition: htup_details.h:668
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:280
char * typname
Definition: logicalproto.h:60
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
#define RelationGetDescr(relation)
Definition: rel.h:433
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:60
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
static void pq_sendint32(StringInfo buf, int32 i)
Definition: pqformat.h:148
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:93
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:329
char * pstrdup(const char *in)
Definition: mcxt.c:1161
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:664
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:379
unsigned char uint8
Definition: c.h:323
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:75
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:28
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:113
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
Form_pg_class rd_rel
Definition: rel.h:84
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:164
Bitmapset * attkeys
Definition: logicalproto.h:52
int natts
Definition: tupdesc.h:82
static void pq_sendint8(StringInfo buf, int8 i)
Definition: pqformat.h:132
LogicalRepRelId remoteid
Definition: logicalproto.h:45
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:27
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:163
unsigned short uint16
Definition: c.h:324
void pfree(void *pointer)
Definition: mcxt.c:1031
#define ObjectIdGetDatum(X)
Definition: postgres.h:492
#define ERROR
Definition: elog.h:43
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:431
uint32 t_len
Definition: htup.h:64
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3051
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:36
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:512
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
Definition: proto.c:255
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:563
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:264
#define RelationGetRelationName(relation)
Definition: rel.h:441
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:197
XLogRecPtr final_lsn
void pq_sendcountedtext(StringInfo buf, const char *str, int slen, bool countincludesself)
Definition: pqformat.c:142
void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:302
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:622
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
Definition: proto.c:142
void logicalrep_write_rel(StringInfo out, Relation rel)
Definition: proto.c:354
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:46
XLogRecPtr final_lsn
Definition: logicalproto.h:66
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1112
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1160
TransactionId xid
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:684
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void bms_free(Bitmapset *a)
Definition: bitmapset.c:267
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
Definition: proto.c:185
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:699
void pq_copymsgbytes(StringInfo msg, char *buf, int datalen)
Definition: pqformat.c:530
XLogRecPtr end_lsn
FormData_pg_type * Form_pg_type
Definition: pg_type.h:247
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:214
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:764
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:95
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
static void pq_sendint16(StringInfo buf, int16 i)
Definition: pqformat.h:140
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1315
static Datum values[MAXATTR]
Definition: bootstrap.c:164
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1833
void * palloc(Size size)
Definition: mcxt.c:924
int i
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:4759
#define TRUNCATE_CASCADE
Definition: proto.c:29
#define NameStr(name)
Definition: c.h:576
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
Definition: proto.c:129
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define elog
Definition: elog.h:219
TimestampTz committime
Definition: logicalproto.h:67
XLogRecPtr commit_lsn
Definition: logicalproto.h:73
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2275
Definition: pg_list.h:45
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:486
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30
#define RelationGetRelid(relation)
Definition: rel.h:407
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
Definition: proto.c:444
TimestampTz committime
Definition: logicalproto.h:75
char * nspname
Definition: logicalproto.h:59
uint32 LogicalRepRelId
Definition: logicalproto.h:39
#define RelationGetNamespace(relation)
Definition: rel.h:448