PostgreSQL Source Code  git master
proto.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * proto.c
4  * logical replication protocol functions
5  *
6  * Copyright (c) 2015-2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/proto.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/sysattr.h"
16 #include "catalog/pg_namespace.h"
17 #include "catalog/pg_type.h"
18 #include "libpq/pqformat.h"
20 #include "utils/lsyscache.h"
21 #include "utils/syscache.h"
22 
23 /*
24  * Protocol message flags.
25  */
26 #define LOGICALREP_IS_REPLICA_IDENTITY 1
27 
28 #define MESSAGE_TRANSACTIONAL (1<<0)
29 #define TRUNCATE_CASCADE (1<<0)
30 #define TRUNCATE_RESTART_SEQS (1<<1)
31 
32 static void logicalrep_write_attrs(StringInfo out, Relation rel,
33  Bitmapset *columns, bool include_gencols);
34 static void logicalrep_write_tuple(StringInfo out, Relation rel,
35  TupleTableSlot *slot,
36  bool binary, Bitmapset *columns,
37  bool include_gencols);
40 
42 static const char *logicalrep_read_namespace(StringInfo in);
43 
44 /*
45  * Write BEGIN to the output stream.
46  */
47 void
49 {
51 
52  /* fixed fields */
53  pq_sendint64(out, txn->final_lsn);
55  pq_sendint32(out, txn->xid);
56 }
57 
58 /*
59  * Read transaction BEGIN from the stream.
60  */
61 void
63 {
64  /* read fields */
65  begin_data->final_lsn = pq_getmsgint64(in);
66  if (begin_data->final_lsn == InvalidXLogRecPtr)
67  elog(ERROR, "final_lsn not set in begin message");
68  begin_data->committime = pq_getmsgint64(in);
69  begin_data->xid = pq_getmsgint(in, 4);
70 }
71 
72 
73 /*
74  * Write COMMIT to the output stream.
75  */
76 void
78  XLogRecPtr commit_lsn)
79 {
80  uint8 flags = 0;
81 
83 
84  /* send the flags field (unused for now) */
85  pq_sendbyte(out, flags);
86 
87  /* send fields */
88  pq_sendint64(out, commit_lsn);
89  pq_sendint64(out, txn->end_lsn);
91 }
92 
93 /*
94  * Read transaction COMMIT from the stream.
95  */
96 void
98 {
99  /* read flags (unused for now) */
100  uint8 flags = pq_getmsgbyte(in);
101 
102  if (flags != 0)
103  elog(ERROR, "unrecognized flags %u in commit message", flags);
104 
105  /* read fields */
106  commit_data->commit_lsn = pq_getmsgint64(in);
107  commit_data->end_lsn = pq_getmsgint64(in);
108  commit_data->committime = pq_getmsgint64(in);
109 }
110 
111 /*
112  * Write BEGIN PREPARE to the output stream.
113  */
114 void
116 {
118 
119  /* fixed fields */
120  pq_sendint64(out, txn->final_lsn);
121  pq_sendint64(out, txn->end_lsn);
123  pq_sendint32(out, txn->xid);
124 
125  /* send gid */
126  pq_sendstring(out, txn->gid);
127 }
128 
129 /*
130  * Read transaction BEGIN PREPARE from the stream.
131  */
132 void
134 {
135  /* read fields */
136  begin_data->prepare_lsn = pq_getmsgint64(in);
137  if (begin_data->prepare_lsn == InvalidXLogRecPtr)
138  elog(ERROR, "prepare_lsn not set in begin prepare message");
139  begin_data->end_lsn = pq_getmsgint64(in);
140  if (begin_data->end_lsn == InvalidXLogRecPtr)
141  elog(ERROR, "end_lsn not set in begin prepare message");
142  begin_data->prepare_time = pq_getmsgint64(in);
143  begin_data->xid = pq_getmsgint(in, 4);
144 
145  /* read gid (copy it into a pre-allocated buffer) */
146  strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
147 }
148 
149 /*
150  * The core functionality for logicalrep_write_prepare and
151  * logicalrep_write_stream_prepare.
152  */
153 static void
155  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
156 {
157  uint8 flags = 0;
158 
159  pq_sendbyte(out, type);
160 
161  /*
162  * This should only ever happen for two-phase commit transactions, in
163  * which case we expect to have a valid GID.
164  */
165  Assert(txn->gid != NULL);
166  Assert(rbtxn_prepared(txn));
168 
169  /* send the flags field */
170  pq_sendbyte(out, flags);
171 
172  /* send fields */
173  pq_sendint64(out, prepare_lsn);
174  pq_sendint64(out, txn->end_lsn);
176  pq_sendint32(out, txn->xid);
177 
178  /* send gid */
179  pq_sendstring(out, txn->gid);
180 }
181 
182 /*
183  * Write PREPARE to the output stream.
184  */
185 void
187  XLogRecPtr prepare_lsn)
188 {
190  txn, prepare_lsn);
191 }
192 
193 /*
194  * The core functionality for logicalrep_read_prepare and
195  * logicalrep_read_stream_prepare.
196  */
197 static void
199  LogicalRepPreparedTxnData *prepare_data)
200 {
201  /* read flags */
202  uint8 flags = pq_getmsgbyte(in);
203 
204  if (flags != 0)
205  elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
206 
207  /* read fields */
208  prepare_data->prepare_lsn = pq_getmsgint64(in);
209  if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
210  elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
211  prepare_data->end_lsn = pq_getmsgint64(in);
212  if (prepare_data->end_lsn == InvalidXLogRecPtr)
213  elog(ERROR, "end_lsn is not set in %s message", msgtype);
214  prepare_data->prepare_time = pq_getmsgint64(in);
215  prepare_data->xid = pq_getmsgint(in, 4);
216  if (prepare_data->xid == InvalidTransactionId)
217  elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
218 
219  /* read gid (copy it into a pre-allocated buffer) */
220  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
221 }
222 
223 /*
224  * Read transaction PREPARE from the stream.
225  */
226 void
228 {
229  logicalrep_read_prepare_common(in, "prepare", prepare_data);
230 }
231 
232 /*
233  * Write COMMIT PREPARED to the output stream.
234  */
235 void
237  XLogRecPtr commit_lsn)
238 {
239  uint8 flags = 0;
240 
242 
243  /*
244  * This should only ever happen for two-phase commit transactions, in
245  * which case we expect to have a valid GID.
246  */
247  Assert(txn->gid != NULL);
248 
249  /* send the flags field */
250  pq_sendbyte(out, flags);
251 
252  /* send fields */
253  pq_sendint64(out, commit_lsn);
254  pq_sendint64(out, txn->end_lsn);
256  pq_sendint32(out, txn->xid);
257 
258  /* send gid */
259  pq_sendstring(out, txn->gid);
260 }
261 
262 /*
263  * Read transaction COMMIT PREPARED from the stream.
264  */
265 void
267 {
268  /* read flags */
269  uint8 flags = pq_getmsgbyte(in);
270 
271  if (flags != 0)
272  elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
273 
274  /* read fields */
275  prepare_data->commit_lsn = pq_getmsgint64(in);
276  if (prepare_data->commit_lsn == InvalidXLogRecPtr)
277  elog(ERROR, "commit_lsn is not set in commit prepared message");
278  prepare_data->end_lsn = pq_getmsgint64(in);
279  if (prepare_data->end_lsn == InvalidXLogRecPtr)
280  elog(ERROR, "end_lsn is not set in commit prepared message");
281  prepare_data->commit_time = pq_getmsgint64(in);
282  prepare_data->xid = pq_getmsgint(in, 4);
283 
284  /* read gid (copy it into a pre-allocated buffer) */
285  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
286 }
287 
288 /*
289  * Write ROLLBACK PREPARED to the output stream.
290  */
291 void
293  XLogRecPtr prepare_end_lsn,
294  TimestampTz prepare_time)
295 {
296  uint8 flags = 0;
297 
299 
300  /*
301  * This should only ever happen for two-phase commit transactions, in
302  * which case we expect to have a valid GID.
303  */
304  Assert(txn->gid != NULL);
305 
306  /* send the flags field */
307  pq_sendbyte(out, flags);
308 
309  /* send fields */
310  pq_sendint64(out, prepare_end_lsn);
311  pq_sendint64(out, txn->end_lsn);
312  pq_sendint64(out, prepare_time);
314  pq_sendint32(out, txn->xid);
315 
316  /* send gid */
317  pq_sendstring(out, txn->gid);
318 }
319 
320 /*
321  * Read transaction ROLLBACK PREPARED from the stream.
322  */
323 void
325  LogicalRepRollbackPreparedTxnData *rollback_data)
326 {
327  /* read flags */
328  uint8 flags = pq_getmsgbyte(in);
329 
330  if (flags != 0)
331  elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
332 
333  /* read fields */
334  rollback_data->prepare_end_lsn = pq_getmsgint64(in);
335  if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
336  elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
337  rollback_data->rollback_end_lsn = pq_getmsgint64(in);
338  if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
339  elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
340  rollback_data->prepare_time = pq_getmsgint64(in);
341  rollback_data->rollback_time = pq_getmsgint64(in);
342  rollback_data->xid = pq_getmsgint(in, 4);
343 
344  /* read gid (copy it into a pre-allocated buffer) */
345  strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
346 }
347 
348 /*
349  * Write STREAM PREPARE to the output stream.
350  */
351 void
353  ReorderBufferTXN *txn,
354  XLogRecPtr prepare_lsn)
355 {
357  txn, prepare_lsn);
358 }
359 
360 /*
361  * Read STREAM PREPARE from the stream.
362  */
363 void
365 {
366  logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
367 }
368 
369 /*
370  * Write ORIGIN to the output stream.
371  */
372 void
373 logicalrep_write_origin(StringInfo out, const char *origin,
374  XLogRecPtr origin_lsn)
375 {
377 
378  /* fixed fields */
379  pq_sendint64(out, origin_lsn);
380 
381  /* origin string */
382  pq_sendstring(out, origin);
383 }
384 
385 /*
386  * Read ORIGIN from the output stream.
387  */
388 char *
390 {
391  /* fixed fields */
392  *origin_lsn = pq_getmsgint64(in);
393 
394  /* return origin */
395  return pstrdup(pq_getmsgstring(in));
396 }
397 
398 /*
399  * Write INSERT to the output stream.
400  */
401 void
403  TupleTableSlot *newslot, bool binary,
404  Bitmapset *columns, bool include_gencols)
405 {
407 
408  /* transaction ID (if not valid, we're not streaming) */
409  if (TransactionIdIsValid(xid))
410  pq_sendint32(out, xid);
411 
412  /* use Oid as relation identifier */
413  pq_sendint32(out, RelationGetRelid(rel));
414 
415  pq_sendbyte(out, 'N'); /* new tuple follows */
416  logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
417 }
418 
419 /*
420  * Read INSERT from stream.
421  *
422  * Fills the new tuple.
423  */
426 {
427  char action;
428  LogicalRepRelId relid;
429 
430  /* read the relation id */
431  relid = pq_getmsgint(in, 4);
432 
433  action = pq_getmsgbyte(in);
434  if (action != 'N')
435  elog(ERROR, "expected new tuple but got %d",
436  action);
437 
438  logicalrep_read_tuple(in, newtup);
439 
440  return relid;
441 }
442 
443 /*
444  * Write UPDATE to the output stream.
445  */
446 void
448  TupleTableSlot *oldslot, TupleTableSlot *newslot,
449  bool binary, Bitmapset *columns, bool include_gencols)
450 {
452 
453  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
454  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
455  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
456 
457  /* transaction ID (if not valid, we're not streaming) */
458  if (TransactionIdIsValid(xid))
459  pq_sendint32(out, xid);
460 
461  /* use Oid as relation identifier */
462  pq_sendint32(out, RelationGetRelid(rel));
463 
464  if (oldslot != NULL)
465  {
466  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
467  pq_sendbyte(out, 'O'); /* old tuple follows */
468  else
469  pq_sendbyte(out, 'K'); /* old key follows */
470  logicalrep_write_tuple(out, rel, oldslot, binary, columns,
471  include_gencols);
472  }
473 
474  pq_sendbyte(out, 'N'); /* new tuple follows */
475  logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
476 }
477 
478 /*
479  * Read UPDATE from stream.
480  */
482 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
483  LogicalRepTupleData *oldtup,
484  LogicalRepTupleData *newtup)
485 {
486  char action;
487  LogicalRepRelId relid;
488 
489  /* read the relation id */
490  relid = pq_getmsgint(in, 4);
491 
492  /* read and verify action */
493  action = pq_getmsgbyte(in);
494  if (action != 'K' && action != 'O' && action != 'N')
495  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
496  action);
497 
498  /* check for old tuple */
499  if (action == 'K' || action == 'O')
500  {
501  logicalrep_read_tuple(in, oldtup);
502  *has_oldtuple = true;
503 
504  action = pq_getmsgbyte(in);
505  }
506  else
507  *has_oldtuple = false;
508 
509  /* check for new tuple */
510  if (action != 'N')
511  elog(ERROR, "expected action 'N', got %c",
512  action);
513 
514  logicalrep_read_tuple(in, newtup);
515 
516  return relid;
517 }
518 
519 /*
520  * Write DELETE to the output stream.
521  */
522 void
524  TupleTableSlot *oldslot, bool binary,
525  Bitmapset *columns, bool include_gencols)
526 {
527  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
528  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
529  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
530 
532 
533  /* transaction ID (if not valid, we're not streaming) */
534  if (TransactionIdIsValid(xid))
535  pq_sendint32(out, xid);
536 
537  /* use Oid as relation identifier */
538  pq_sendint32(out, RelationGetRelid(rel));
539 
540  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
541  pq_sendbyte(out, 'O'); /* old tuple follows */
542  else
543  pq_sendbyte(out, 'K'); /* old key follows */
544 
545  logicalrep_write_tuple(out, rel, oldslot, binary, columns, include_gencols);
546 }
547 
548 /*
549  * Read DELETE from stream.
550  *
551  * Fills the old tuple.
552  */
555 {
556  char action;
557  LogicalRepRelId relid;
558 
559  /* read the relation id */
560  relid = pq_getmsgint(in, 4);
561 
562  /* read and verify action */
563  action = pq_getmsgbyte(in);
564  if (action != 'K' && action != 'O')
565  elog(ERROR, "expected action 'O' or 'K', got %c", action);
566 
567  logicalrep_read_tuple(in, oldtup);
568 
569  return relid;
570 }
571 
572 /*
573  * Write TRUNCATE to the output stream.
574  */
575 void
577  TransactionId xid,
578  int nrelids,
579  Oid relids[],
580  bool cascade, bool restart_seqs)
581 {
582  int i;
583  uint8 flags = 0;
584 
586 
587  /* transaction ID (if not valid, we're not streaming) */
588  if (TransactionIdIsValid(xid))
589  pq_sendint32(out, xid);
590 
591  pq_sendint32(out, nrelids);
592 
593  /* encode and send truncate flags */
594  if (cascade)
595  flags |= TRUNCATE_CASCADE;
596  if (restart_seqs)
597  flags |= TRUNCATE_RESTART_SEQS;
598  pq_sendint8(out, flags);
599 
600  for (i = 0; i < nrelids; i++)
601  pq_sendint32(out, relids[i]);
602 }
603 
604 /*
605  * Read TRUNCATE from stream.
606  */
607 List *
609  bool *cascade, bool *restart_seqs)
610 {
611  int i;
612  int nrelids;
613  List *relids = NIL;
614  uint8 flags;
615 
616  nrelids = pq_getmsgint(in, 4);
617 
618  /* read and decode truncate flags */
619  flags = pq_getmsgint(in, 1);
620  *cascade = (flags & TRUNCATE_CASCADE) > 0;
621  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
622 
623  for (i = 0; i < nrelids; i++)
624  relids = lappend_oid(relids, pq_getmsgint(in, 4));
625 
626  return relids;
627 }
628 
629 /*
630  * Write MESSAGE to stream
631  */
632 void
634  bool transactional, const char *prefix, Size sz,
635  const char *message)
636 {
637  uint8 flags = 0;
638 
640 
641  /* encode and send message flags */
642  if (transactional)
643  flags |= MESSAGE_TRANSACTIONAL;
644 
645  /* transaction ID (if not valid, we're not streaming) */
646  if (TransactionIdIsValid(xid))
647  pq_sendint32(out, xid);
648 
649  pq_sendint8(out, flags);
650  pq_sendint64(out, lsn);
651  pq_sendstring(out, prefix);
652  pq_sendint32(out, sz);
653  pq_sendbytes(out, message, sz);
654 }
655 
656 /*
657  * Write relation description to the output stream.
658  */
659 void
661  Bitmapset *columns, bool include_gencols)
662 {
663  char *relname;
664 
666 
667  /* transaction ID (if not valid, we're not streaming) */
668  if (TransactionIdIsValid(xid))
669  pq_sendint32(out, xid);
670 
671  /* use Oid as relation identifier */
672  pq_sendint32(out, RelationGetRelid(rel));
673 
674  /* send qualified relation name */
677  pq_sendstring(out, relname);
678 
679  /* send replica identity */
680  pq_sendbyte(out, rel->rd_rel->relreplident);
681 
682  /* send the attribute info */
683  logicalrep_write_attrs(out, rel, columns, include_gencols);
684 }
685 
686 /*
687  * Read the relation info from stream and return as LogicalRepRelation.
688  */
691 {
693 
694  rel->remoteid = pq_getmsgint(in, 4);
695 
696  /* Read relation name from stream */
698  rel->relname = pstrdup(pq_getmsgstring(in));
699 
700  /* Read the replica identity. */
701  rel->replident = pq_getmsgbyte(in);
702 
703  /* Get attribute description */
704  logicalrep_read_attrs(in, rel);
705 
706  return rel;
707 }
708 
709 /*
710  * Write type info to the output stream.
711  *
712  * This function will always write base type info.
713  */
714 void
716 {
717  Oid basetypoid = getBaseType(typoid);
718  HeapTuple tup;
719  Form_pg_type typtup;
720 
722 
723  /* transaction ID (if not valid, we're not streaming) */
724  if (TransactionIdIsValid(xid))
725  pq_sendint32(out, xid);
726 
727  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
728  if (!HeapTupleIsValid(tup))
729  elog(ERROR, "cache lookup failed for type %u", basetypoid);
730  typtup = (Form_pg_type) GETSTRUCT(tup);
731 
732  /* use Oid as type identifier */
733  pq_sendint32(out, typoid);
734 
735  /* send qualified type name */
736  logicalrep_write_namespace(out, typtup->typnamespace);
737  pq_sendstring(out, NameStr(typtup->typname));
738 
739  ReleaseSysCache(tup);
740 }
741 
742 /*
743  * Read type info from the output stream.
744  */
745 void
747 {
748  ltyp->remoteid = pq_getmsgint(in, 4);
749 
750  /* Read type name from stream */
752  ltyp->typname = pstrdup(pq_getmsgstring(in));
753 }
754 
755 /*
756  * Write a tuple to the outputstream, in the most efficient format possible.
757  */
758 static void
760  bool binary, Bitmapset *columns, bool include_gencols)
761 {
762  TupleDesc desc;
763  Datum *values;
764  bool *isnull;
765  int i;
766  uint16 nliveatts = 0;
767 
768  desc = RelationGetDescr(rel);
769 
770  for (i = 0; i < desc->natts; i++)
771  {
772  Form_pg_attribute att = TupleDescAttr(desc, i);
773 
774  if (!logicalrep_should_publish_column(att, columns, include_gencols))
775  continue;
776 
777  nliveatts++;
778  }
779  pq_sendint16(out, nliveatts);
780 
781  slot_getallattrs(slot);
782  values = slot->tts_values;
783  isnull = slot->tts_isnull;
784 
785  /* Write the values */
786  for (i = 0; i < desc->natts; i++)
787  {
788  HeapTuple typtup;
789  Form_pg_type typclass;
790  Form_pg_attribute att = TupleDescAttr(desc, i);
791 
792  if (!logicalrep_should_publish_column(att, columns, include_gencols))
793  continue;
794 
795  if (isnull[i])
796  {
798  continue;
799  }
800 
801  if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
802  {
803  /*
804  * Unchanged toasted datum. (Note that we don't promise to detect
805  * unchanged data in general; this is just a cheap check to avoid
806  * sending large values unnecessarily.)
807  */
809  continue;
810  }
811 
812  typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
813  if (!HeapTupleIsValid(typtup))
814  elog(ERROR, "cache lookup failed for type %u", att->atttypid);
815  typclass = (Form_pg_type) GETSTRUCT(typtup);
816 
817  /*
818  * Send in binary if requested and type has suitable send function.
819  */
820  if (binary && OidIsValid(typclass->typsend))
821  {
822  bytea *outputbytes;
823  int len;
824 
826  outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
827  len = VARSIZE(outputbytes) - VARHDRSZ;
828  pq_sendint(out, len, 4); /* length */
829  pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
830  pfree(outputbytes);
831  }
832  else
833  {
834  char *outputstr;
835 
837  outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
838  pq_sendcountedtext(out, outputstr, strlen(outputstr));
839  pfree(outputstr);
840  }
841 
842  ReleaseSysCache(typtup);
843  }
844 }
845 
846 /*
847  * Read tuple in logical replication format from stream.
848  */
849 static void
851 {
852  int i;
853  int natts;
854 
855  /* Get number of attributes */
856  natts = pq_getmsgint(in, 2);
857 
858  /* Allocate space for per-column values; zero out unused StringInfoDatas */
859  tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
860  tuple->colstatus = (char *) palloc(natts * sizeof(char));
861  tuple->ncols = natts;
862 
863  /* Read the data */
864  for (i = 0; i < natts; i++)
865  {
866  char *buff;
867  char kind;
868  int len;
869  StringInfo value = &tuple->colvalues[i];
870 
871  kind = pq_getmsgbyte(in);
872  tuple->colstatus[i] = kind;
873 
874  switch (kind)
875  {
877  /* nothing more to do */
878  break;
880  /* we don't receive the value of an unchanged column */
881  break;
884  len = pq_getmsgint(in, 4); /* read length */
885 
886  /* and data */
887  buff = palloc(len + 1);
888  pq_copymsgbytes(in, buff, len);
889 
890  /*
891  * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
892  * as input functions require that. For
893  * LOGICALREP_COLUMN_BINARY it's not technically required, but
894  * it's harmless.
895  */
896  buff[len] = '\0';
897 
899  break;
900  default:
901  elog(ERROR, "unrecognized data representation type '%c'", kind);
902  }
903  }
904 }
905 
906 /*
907  * Write relation attribute metadata to the stream.
908  */
909 static void
911  bool include_gencols)
912 {
913  TupleDesc desc;
914  int i;
915  uint16 nliveatts = 0;
916  Bitmapset *idattrs = NULL;
917  bool replidentfull;
918 
919  desc = RelationGetDescr(rel);
920 
921  /* send number of live attributes */
922  for (i = 0; i < desc->natts; i++)
923  {
924  Form_pg_attribute att = TupleDescAttr(desc, i);
925 
926  if (!logicalrep_should_publish_column(att, columns, include_gencols))
927  continue;
928 
929  nliveatts++;
930  }
931  pq_sendint16(out, nliveatts);
932 
933  /* fetch bitmap of REPLICATION IDENTITY attributes */
934  replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
935  if (!replidentfull)
936  idattrs = RelationGetIdentityKeyBitmap(rel);
937 
938  /* send the attributes */
939  for (i = 0; i < desc->natts; i++)
940  {
941  Form_pg_attribute att = TupleDescAttr(desc, i);
942  uint8 flags = 0;
943 
944  if (!logicalrep_should_publish_column(att, columns, include_gencols))
945  continue;
946 
947  /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
948  if (replidentfull ||
950  idattrs))
952 
953  pq_sendbyte(out, flags);
954 
955  /* attribute name */
956  pq_sendstring(out, NameStr(att->attname));
957 
958  /* attribute type id */
959  pq_sendint32(out, (int) att->atttypid);
960 
961  /* attribute mode */
962  pq_sendint32(out, att->atttypmod);
963  }
964 
965  bms_free(idattrs);
966 }
967 
968 /*
969  * Read relation attribute metadata from the stream.
970  */
971 static void
973 {
974  int i;
975  int natts;
976  char **attnames;
977  Oid *atttyps;
978  Bitmapset *attkeys = NULL;
979 
980  natts = pq_getmsgint(in, 2);
981  attnames = palloc(natts * sizeof(char *));
982  atttyps = palloc(natts * sizeof(Oid));
983 
984  /* read the attributes */
985  for (i = 0; i < natts; i++)
986  {
987  uint8 flags;
988 
989  /* Check for replica identity column */
990  flags = pq_getmsgbyte(in);
991  if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
992  attkeys = bms_add_member(attkeys, i);
993 
994  /* attribute name */
995  attnames[i] = pstrdup(pq_getmsgstring(in));
996 
997  /* attribute type id */
998  atttyps[i] = (Oid) pq_getmsgint(in, 4);
999 
1000  /* we ignore attribute mode for now */
1001  (void) pq_getmsgint(in, 4);
1002  }
1003 
1004  rel->attnames = attnames;
1005  rel->atttyps = atttyps;
1006  rel->attkeys = attkeys;
1007  rel->natts = natts;
1008 }
1009 
1010 /*
1011  * Write the namespace name or empty string for pg_catalog (to save space).
1012  */
1013 static void
1015 {
1016  if (nspid == PG_CATALOG_NAMESPACE)
1017  pq_sendbyte(out, '\0');
1018  else
1019  {
1020  char *nspname = get_namespace_name(nspid);
1021 
1022  if (nspname == NULL)
1023  elog(ERROR, "cache lookup failed for namespace %u",
1024  nspid);
1025 
1026  pq_sendstring(out, nspname);
1027  }
1028 }
1029 
1030 /*
1031  * Read the namespace name while treating empty string as pg_catalog.
1032  */
1033 static const char *
1035 {
1036  const char *nspname = pq_getmsgstring(in);
1037 
1038  if (nspname[0] == '\0')
1039  nspname = "pg_catalog";
1040 
1041  return nspname;
1042 }
1043 
1044 /*
1045  * Write the information for the start stream message to the output stream.
1046  */
1047 void
1049  TransactionId xid, bool first_segment)
1050 {
1052 
1054 
1055  /* transaction ID (we're starting to stream, so must be valid) */
1056  pq_sendint32(out, xid);
1057 
1058  /* 1 if this is the first streaming segment for this xid */
1059  pq_sendbyte(out, first_segment ? 1 : 0);
1060 }
1061 
1062 /*
1063  * Read the information about the start stream message from output stream.
1064  */
1067 {
1068  TransactionId xid;
1069 
1070  Assert(first_segment);
1071 
1072  xid = pq_getmsgint(in, 4);
1073  *first_segment = (pq_getmsgbyte(in) == 1);
1074 
1075  return xid;
1076 }
1077 
1078 /*
1079  * Write the stop stream message to the output stream.
1080  */
1081 void
1083 {
1085 }
1086 
1087 /*
1088  * Write STREAM COMMIT to the output stream.
1089  */
1090 void
1092  XLogRecPtr commit_lsn)
1093 {
1094  uint8 flags = 0;
1095 
1097 
1099 
1100  /* transaction ID */
1101  pq_sendint32(out, txn->xid);
1102 
1103  /* send the flags field (unused for now) */
1104  pq_sendbyte(out, flags);
1105 
1106  /* send fields */
1107  pq_sendint64(out, commit_lsn);
1108  pq_sendint64(out, txn->end_lsn);
1109  pq_sendint64(out, txn->xact_time.commit_time);
1110 }
1111 
1112 /*
1113  * Read STREAM COMMIT from the output stream.
1114  */
1117 {
1118  TransactionId xid;
1119  uint8 flags;
1120 
1121  xid = pq_getmsgint(in, 4);
1122 
1123  /* read flags (unused for now) */
1124  flags = pq_getmsgbyte(in);
1125 
1126  if (flags != 0)
1127  elog(ERROR, "unrecognized flags %u in commit message", flags);
1128 
1129  /* read fields */
1130  commit_data->commit_lsn = pq_getmsgint64(in);
1131  commit_data->end_lsn = pq_getmsgint64(in);
1132  commit_data->committime = pq_getmsgint64(in);
1133 
1134  return xid;
1135 }
1136 
1137 /*
1138  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1139  * same for the top-level transaction abort.
1140  *
1141  * If write_abort_info is true, send the abort_lsn and abort_time fields,
1142  * otherwise don't.
1143  */
1144 void
1146  TransactionId subxid, XLogRecPtr abort_lsn,
1147  TimestampTz abort_time, bool write_abort_info)
1148 {
1150 
1152 
1153  /* transaction ID */
1154  pq_sendint32(out, xid);
1155  pq_sendint32(out, subxid);
1156 
1157  if (write_abort_info)
1158  {
1159  pq_sendint64(out, abort_lsn);
1160  pq_sendint64(out, abort_time);
1161  }
1162 }
1163 
1164 /*
1165  * Read STREAM ABORT from the output stream.
1166  *
1167  * If read_abort_info is true, read the abort_lsn and abort_time fields,
1168  * otherwise don't.
1169  */
1170 void
1172  LogicalRepStreamAbortData *abort_data,
1173  bool read_abort_info)
1174 {
1175  Assert(abort_data);
1176 
1177  abort_data->xid = pq_getmsgint(in, 4);
1178  abort_data->subxid = pq_getmsgint(in, 4);
1179 
1180  if (read_abort_info)
1181  {
1182  abort_data->abort_lsn = pq_getmsgint64(in);
1183  abort_data->abort_time = pq_getmsgint64(in);
1184  }
1185  else
1186  {
1187  abort_data->abort_lsn = InvalidXLogRecPtr;
1188  abort_data->abort_time = 0;
1189  }
1190 }
1191 
1192 /*
1193  * Get string representing LogicalRepMsgType.
1194  */
1195 const char *
1197 {
1198  static char err_unknown[20];
1199 
1200  switch (action)
1201  {
1202  case LOGICAL_REP_MSG_BEGIN:
1203  return "BEGIN";
1205  return "COMMIT";
1207  return "ORIGIN";
1209  return "INSERT";
1211  return "UPDATE";
1213  return "DELETE";
1215  return "TRUNCATE";
1217  return "RELATION";
1218  case LOGICAL_REP_MSG_TYPE:
1219  return "TYPE";
1221  return "MESSAGE";
1223  return "BEGIN PREPARE";
1225  return "PREPARE";
1227  return "COMMIT PREPARED";
1229  return "ROLLBACK PREPARED";
1231  return "STREAM START";
1233  return "STREAM STOP";
1235  return "STREAM COMMIT";
1237  return "STREAM ABORT";
1239  return "STREAM PREPARE";
1240  }
1241 
1242  /*
1243  * This message provides context in the error raised when applying a
1244  * logical message. So we can't throw an error here. Return an unknown
1245  * indicator value so that the original error is still reported.
1246  */
1247  snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1248 
1249  return err_unknown;
1250 }
1251 
1252 /*
1253  * Check if the column 'att' of a table should be published.
1254  *
1255  * 'columns' represents the publication column list (if any) for that table.
1256  *
1257  * 'include_gencols' flag indicates whether generated columns should be
1258  * published when there is no column list. Typically, this will have the same
1259  * value as the 'publish_generated_columns' publication parameter.
1260  *
1261  * Note that generated columns can be published only when present in a
1262  * publication column list, or when include_gencols is true.
1263  */
1264 bool
1266  bool include_gencols)
1267 {
1268  if (att->attisdropped)
1269  return false;
1270 
1271  /* If a column list is provided, publish only the cols in that list. */
1272  if (columns)
1273  return bms_is_member(att->attnum, columns);
1274 
1275  /* All non-generated columns are always published. */
1276  return att->attgenerated ? include_gencols : true;
1277 }
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define NameStr(name)
Definition: c.h:700
uint8_t uint8
Definition: c.h:483
#define VARHDRSZ
Definition: c.h:646
#define Assert(condition)
Definition: c.h:812
uint16_t uint16
Definition: c.h:484
uint32 TransactionId
Definition: c.h:606
#define OidIsValid(objectId)
Definition: c.h:729
size_t Size
Definition: c.h:559
int nspid
int64 TimestampTz
Definition: timestamp.h:39
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1763
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1782
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
static struct @160 value
int i
Definition: isn.c:72
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:97
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
uint32 LogicalRepRelId
Definition: logicalproto.h:101
#define LOGICALREP_COLUMN_NULL
Definition: logicalproto.h:96
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:99
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:98
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2521
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
void * palloc(Size size)
Definition: mcxt.c:1317
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
NameData relname
Definition: pg_class.h:38
const void size_t len
#define NIL
Definition: pg_list.h:68
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
#define snprintf
Definition: port.h:238
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
unsigned int Oid
Definition: postgres_ext.h:31
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579
void pq_copymsgbytes(StringInfo msg, char *buf, int datalen)
Definition: pqformat.c:528
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:195
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
void pq_sendcountedtext(StringInfo buf, const char *str, int slen)
Definition: pqformat.c:142
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:128
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:136
static void pq_sendint(StringInfo buf, uint32 i, int b)
Definition: pqformat.h:171
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:97
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1014
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:554
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:77
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:292
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:324
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1196
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:154
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, bool include_gencols)
Definition: proto.c:660
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:373
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:133
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
Definition: proto.c:1145
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:198
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:608
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:746
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, bool include_gencols)
Definition: proto.c:402
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, bool include_gencols)
Definition: proto.c:759
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:482
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:633
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:972
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition: proto.c:1171
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:186
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:62
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:715
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:576
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, bool include_gencols)
Definition: proto.c:1265
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:26
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, bool include_gencols)
Definition: proto.c:523
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:850
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:48
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:266
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:690
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:236
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1091
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:352
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, bool include_gencols)
Definition: proto.c:910
#define TRUNCATE_CASCADE
Definition: proto.c:29
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, bool include_gencols)
Definition: proto.c:447
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:115
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:364
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1048
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
Definition: proto.c:389
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1116
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:425
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:227
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1034
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1082
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1066
#define RelationGetRelid(relation)
Definition: rel.h:505
#define RelationGetDescr(relation)
Definition: rel.h:531
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetNamespace(relation)
Definition: rel.h:546
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
Definition: relcache.c:5506
#define rbtxn_prepared(txn)
static void initStringInfoFromString(StringInfo str, char *data, int len)
Definition: stringinfo.h:148
Definition: pg_list.h:54
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131
TimestampTz committime
Definition: logicalproto.h:130
TimestampTz committime
Definition: logicalproto.h:138
LogicalRepRelId remoteid
Definition: logicalproto.h:107
Bitmapset * attkeys
Definition: logicalproto.h:115
StringInfoData * colvalues
Definition: logicalproto.h:87
Form_pg_class rd_rel
Definition: rel.h:111
TimestampTz commit_time
XLogRecPtr final_lsn
XLogRecPtr end_lsn
TimestampTz prepare_time
TransactionId xid
union ReorderBufferTXN::@114 xact_time
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
Definition: c.h:641
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:221
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290
#define VARDATA(PTR)
Definition: varatt.h:278
#define VARSIZE(PTR)
Definition: varatt.h:279
const char * type
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28