PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
proto.c File Reference
#include "postgres.h"
#include "access/sysattr.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "libpq/pqformat.h"
#include "replication/logicalproto.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
Include dependency graph for proto.c:

Go to the source code of this file.

Macros

#define LOGICALREP_IS_REPLICA_IDENTITY   1
 

Functions

static void logicalrep_write_attrs (StringInfo out, Relation rel)
 
static void logicalrep_write_tuple (StringInfo out, Relation rel, HeapTuple tuple)
 
static void logicalrep_read_attrs (StringInfo in, LogicalRepRelation *rel)
 
static void logicalrep_read_tuple (StringInfo in, LogicalRepTupleData *tuple)
 
static void logicalrep_write_namespace (StringInfo out, Oid nspid)
 
static const char * logicalrep_read_namespace (StringInfo in)
 
void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, Relation rel, HeapTuple newtuple)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, Relation rel, HeapTuple oldtuple)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_rel (StringInfo out, Relation rel)
 
LogicalRepRelationlogicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, Oid typoid)
 
void logicalrep_read_typ (StringInfo in, LogicalRepTyp *ltyp)
 

Macro Definition Documentation

#define LOGICALREP_IS_REPLICA_IDENTITY   1

Definition at line 27 of file proto.c.

Referenced by logicalrep_read_attrs(), and logicalrep_write_attrs().

Function Documentation

static void logicalrep_read_attrs ( StringInfo  in,
LogicalRepRelation rel 
)
static

Definition at line 567 of file proto.c.

References LogicalRepRelation::attkeys, LogicalRepRelation::attnames, LogicalRepRelation::atttyps, bms_add_member(), i, LOGICALREP_IS_REPLICA_IDENTITY, LogicalRepRelation::natts, NULL, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), and pstrdup().

Referenced by logicalrep_read_rel().

568 {
569  int i;
570  int natts;
571  char **attnames;
572  Oid *atttyps;
573  Bitmapset *attkeys = NULL;
574 
575  natts = pq_getmsgint(in, 2);
576  attnames = palloc(natts * sizeof(char *));
577  atttyps = palloc(natts * sizeof(Oid));
578 
579  /* read the attributes */
580  for (i = 0; i < natts; i++)
581  {
582  uint8 flags;
583 
584  /* Check for replica identity column */
585  flags = pq_getmsgbyte(in);
586  if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
587  attkeys = bms_add_member(attkeys, i);
588 
589  /* attribute name */
590  attnames[i] = pstrdup(pq_getmsgstring(in));
591 
592  /* attribute type id */
593  atttyps[i] = (Oid) pq_getmsgint(in, 4);
594 
595  /* we ignore attribute mode for now */
596  (void) pq_getmsgint(in, 4);
597  }
598 
599  rel->attnames = attnames;
600  rel->atttyps = atttyps;
601  rel->attkeys = attkeys;
602  rel->natts = natts;
603 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:621
char * pstrdup(const char *in)
Definition: mcxt.c:1165
unsigned char uint8
Definition: c.h:263
unsigned int Oid
Definition: postgres_ext.h:31
Bitmapset * attkeys
Definition: logicalproto.h:50
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:27
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
#define NULL
Definition: c.h:226
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:668
void * palloc(Size size)
Definition: mcxt.c:891
int i
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData begin_data 
)

Definition at line 57 of file proto.c.

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), and LogicalRepBeginData::xid.

Referenced by apply_handle_begin().

58 {
59  /* read fields */
60  begin_data->final_lsn = pq_getmsgint64(in);
61  if (begin_data->final_lsn == InvalidXLogRecPtr)
62  elog(ERROR, "final_lsn not set in begin message");
63  begin_data->committime = pq_getmsgint64(in);
64  begin_data->xid = pq_getmsgint(in, 4);
65 }
TransactionId xid
Definition: logicalproto.h:67
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define ERROR
Definition: elog.h:43
XLogRecPtr final_lsn
Definition: logicalproto.h:65
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define elog
Definition: elog.h:219
TimestampTz committime
Definition: logicalproto.h:66
void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData commit_data 
)

Definition at line 92 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

93 {
94  /* read flags (unused for now) */
95  uint8 flags = pq_getmsgbyte(in);
96 
97  if (flags != 0)
98  elog(ERROR, "unknown flags %u in commit message", flags);
99 
100  /* read fields */
101  commit_data->commit_lsn = pq_getmsgint64(in);
102  commit_data->end_lsn = pq_getmsgint64(in);
103  commit_data->committime = pq_getmsgint64(in);
104 }
unsigned char uint8
Definition: c.h:263
#define ERROR
Definition: elog.h:43
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
#define elog
Definition: elog.h:219
XLogRecPtr commit_lsn
Definition: logicalproto.h:72
TimestampTz committime
Definition: logicalproto.h:74
LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData oldtup 
)

Definition at line 277 of file proto.c.

References elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

278 {
279  char action;
280  LogicalRepRelId relid;
281 
282  /* read the relation id */
283  relid = pq_getmsgint(in, 4);
284 
285  /* read and verify action */
286  action = pq_getmsgbyte(in);
287  if (action != 'K' && action != 'O')
288  elog(ERROR, "expected action 'O' or 'K', got %c", action);
289 
290  logicalrep_read_tuple(in, oldtup);
291 
292  return relid;
293 }
#define ERROR
Definition: elog.h:43
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:461
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define elog
Definition: elog.h:219
uint32 LogicalRepRelId
Definition: logicalproto.h:37
LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData newtup 
)

Definition at line 160 of file proto.c.

References elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

161 {
162  char action;
163  LogicalRepRelId relid;
164 
165  /* read the relation id */
166  relid = pq_getmsgint(in, 4);
167 
168  action = pq_getmsgbyte(in);
169  if (action != 'N')
170  elog(ERROR, "expected new tuple but got %d",
171  action);
172 
173  logicalrep_read_tuple(in, newtup);
174 
175  return relid;
176 }
#define ERROR
Definition: elog.h:43
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:461
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define elog
Definition: elog.h:219
uint32 LogicalRepRelId
Definition: logicalproto.h:37
static const char * logicalrep_read_namespace ( StringInfo  in)
static

Definition at line 629 of file proto.c.

References pq_getmsgstring().

Referenced by logicalrep_read_rel(), and logicalrep_read_typ().

630 {
631  const char *nspname = pq_getmsgstring(in);
632 
633  if (nspname[0] == '\0')
634  nspname = "pg_catalog";
635 
636  return nspname;
637 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:621
char* logicalrep_read_origin ( StringInfo  in,
XLogRecPtr origin_lsn 
)

Definition at line 126 of file proto.c.

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

127 {
128  /* fixed fields */
129  *origin_lsn = pq_getmsgint64(in);
130 
131  /* return origin */
132  return pstrdup(pq_getmsgstring(in));
133 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:621
char * pstrdup(const char *in)
Definition: mcxt.c:1165
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
LogicalRepRelation* logicalrep_read_rel ( StringInfo  in)

Definition at line 324 of file proto.c.

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

325 {
327 
328  rel->remoteid = pq_getmsgint(in, 4);
329 
330  /* Read relation name from stream */
332  rel->relname = pstrdup(pq_getmsgstring(in));
333 
334  /* Read the replica identity. */
335  rel->replident = pq_getmsgbyte(in);
336 
337  /* Get attribute description */
338  logicalrep_read_attrs(in, rel);
339 
340  return rel;
341 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:621
char * pstrdup(const char *in)
Definition: mcxt.c:1165
LogicalRepRelId remoteid
Definition: logicalproto.h:43
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:567
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:629
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
void * palloc(Size size)
Definition: mcxt.c:891
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
static void logicalrep_read_tuple ( StringInfo  in,
LogicalRepTupleData tuple 
)
static

Definition at line 461 of file proto.c.

References LogicalRepTupleData::changed, elog, ERROR, i, NULL, pq_getmsgbyte(), pq_getmsgbytes(), pq_getmsgint(), and LogicalRepTupleData::values.

Referenced by logicalrep_read_delete(), logicalrep_read_insert(), and logicalrep_read_update().

462 {
463  int i;
464  int natts;
465 
466  /* Get of attributes. */
467  natts = pq_getmsgint(in, 2);
468 
469  memset(tuple->changed, 0, sizeof(tuple->changed));
470 
471  /* Read the data */
472  for (i = 0; i < natts; i++)
473  {
474  char kind;
475  int len;
476 
477  kind = pq_getmsgbyte(in);
478 
479  switch (kind)
480  {
481  case 'n': /* null */
482  tuple->values[i] = NULL;
483  tuple->changed[i] = true;
484  break;
485  case 'u': /* unchanged column */
486  tuple->values[i] = (char *) 0xdeadbeef; /* make bad usage more obvious */
487  break;
488  case 't': /* text formatted value */
489  {
490  tuple->changed[i] = true;
491 
492  len = pq_getmsgint(in, 4); /* read length */
493 
494  /* and data */
495  tuple->values[i] = (char *) pq_getmsgbytes(in, len);
496  }
497  break;
498  default:
499  elog(ERROR, "unknown data representation type '%c'", kind);
500  }
501  }
502 }
const char * pq_getmsgbytes(StringInfo msg, int datalen)
Definition: pqformat.c:550
#define ERROR
Definition: elog.h:43
bool changed[MaxTupleAttributeNumber]
Definition: logicalproto.h:34
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
#define NULL
Definition: c.h:226
char * values[MaxTupleAttributeNumber]
Definition: logicalproto.h:33
int i
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define elog
Definition: elog.h:219
void logicalrep_read_typ ( StringInfo  in,
LogicalRepTyp ltyp 
)

Definition at line 376 of file proto.c.

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

377 {
378  ltyp->remoteid = pq_getmsgint(in, 4);
379 
380  /* Read tupe name from stream */
382  ltyp->typname = pstrdup(pq_getmsgstring(in));
383 }
char * typname
Definition: logicalproto.h:58
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:621
char * pstrdup(const char *in)
Definition: mcxt.c:1165
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:629
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
char * nspname
Definition: logicalproto.h:57
LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool has_oldtuple,
LogicalRepTupleData oldtup,
LogicalRepTupleData newtup 
)

Definition at line 211 of file proto.c.

References elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

214 {
215  char action;
216  LogicalRepRelId relid;
217 
218  /* read the relation id */
219  relid = pq_getmsgint(in, 4);
220 
221  /* read and verify action */
222  action = pq_getmsgbyte(in);
223  if (action != 'K' && action != 'O' && action != 'N')
224  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
225  action);
226 
227  /* check for old tuple */
228  if (action == 'K' || action == 'O')
229  {
230  logicalrep_read_tuple(in, oldtup);
231  *has_oldtuple = true;
232 
233  action = pq_getmsgbyte(in);
234  }
235  else
236  *has_oldtuple = false;
237 
238  /* check for new tuple */
239  if (action != 'N')
240  elog(ERROR, "expected action 'N', got %c",
241  action);
242 
243  logicalrep_read_tuple(in, newtup);
244 
245  return relid;
246 }
#define ERROR
Definition: elog.h:43
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:461
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define elog
Definition: elog.h:219
uint32 LogicalRepRelId
Definition: logicalproto.h:37
static void logicalrep_write_attrs ( StringInfo  out,
Relation  rel 
)
static

Definition at line 508 of file proto.c.

References tupleDesc::attrs, bms_free(), bms_is_member(), FirstLowInvalidHeapAttributeNumber, i, INDEX_ATTR_BITMAP_IDENTITY_KEY, LOGICALREP_IS_REPLICA_IDENTITY, NameStr, tupleDesc::natts, NULL, pq_sendbyte(), pq_sendint(), pq_sendstring(), RelationData::rd_rel, RelationGetDescr, RelationGetIndexAttrBitmap(), and REPLICA_IDENTITY_FULL.

Referenced by logicalrep_write_rel().

509 {
510  TupleDesc desc;
511  int i;
512  uint16 nliveatts = 0;
513  Bitmapset *idattrs = NULL;
514  bool replidentfull;
515 
516  desc = RelationGetDescr(rel);
517 
518  /* send number of live attributes */
519  for (i = 0; i < desc->natts; i++)
520  {
521  if (desc->attrs[i]->attisdropped)
522  continue;
523  nliveatts++;
524  }
525  pq_sendint(out, nliveatts, 2);
526 
527  /* fetch bitmap of REPLICATION IDENTITY attributes */
528  replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
529  if (!replidentfull)
530  idattrs = RelationGetIndexAttrBitmap(rel,
532 
533  /* send the attributes */
534  for (i = 0; i < desc->natts; i++)
535  {
536  Form_pg_attribute att = desc->attrs[i];
537  uint8 flags = 0;
538 
539  if (att->attisdropped)
540  continue;
541 
542  /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
543  if (replidentfull ||
545  idattrs))
547 
548  pq_sendbyte(out, flags);
549 
550  /* attribute name */
551  pq_sendstring(out, NameStr(att->attname));
552 
553  /* attribute type id */
554  pq_sendint(out, (int) att->atttypid, sizeof(att->atttypid));
555 
556  /* attribute mode */
557  pq_sendint(out, att->atttypmod, sizeof(att->atttypmod));
558  }
559 
560  bms_free(idattrs);
561 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define RelationGetDescr(relation)
Definition: rel.h:425
Form_pg_attribute * attrs
Definition: tupdesc.h:74
unsigned char uint8
Definition: c.h:263
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:28
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
Form_pg_class rd_rel
Definition: rel.h:113
int natts
Definition: tupdesc.h:73
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:27
unsigned short uint16
Definition: c.h:264
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
void bms_free(Bitmapset *a)
Definition: bitmapset.c:200
#define NULL
Definition: c.h:226
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
int i
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:4759
#define NameStr(name)
Definition: c.h:494
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:419
void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN txn 
)

Definition at line 43 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, pq_sendbyte(), pq_sendint(), pq_sendint64(), and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_txn().

44 {
45  pq_sendbyte(out, 'B'); /* BEGIN */
46 
47  /* fixed fields */
48  pq_sendint64(out, txn->final_lsn);
49  pq_sendint64(out, txn->commit_time);
50  pq_sendint(out, txn->xid, 4);
51 }
TimestampTz commit_time
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
XLogRecPtr final_lsn
TransactionId xid
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 72 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, pq_sendbyte(), and pq_sendint64().

Referenced by pgoutput_commit_txn().

74 {
75  uint8 flags = 0;
76 
77  pq_sendbyte(out, 'C'); /* sending COMMIT */
78 
79  /* send the flags field (unused for now) */
80  pq_sendbyte(out, flags);
81 
82  /* send fields */
83  pq_sendint64(out, commit_lsn);
84  pq_sendint64(out, txn->end_lsn);
85  pq_sendint64(out, txn->commit_time);
86 }
TimestampTz commit_time
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
unsigned char uint8
Definition: c.h:263
XLogRecPtr end_lsn
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
void logicalrep_write_delete ( StringInfo  out,
Relation  rel,
HeapTuple  oldtuple 
)

Definition at line 252 of file proto.c.

References Assert, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint(), RelationData::rd_rel, RelationGetRelid, REPLICA_IDENTITY_DEFAULT, REPLICA_IDENTITY_FULL, and REPLICA_IDENTITY_INDEX.

Referenced by pgoutput_change().

253 {
254  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
255  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
256  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
257 
258  pq_sendbyte(out, 'D'); /* action DELETE */
259 
260  /* use Oid as relation identifier */
261  pq_sendint(out, RelationGetRelid(rel), 4);
262 
263  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
264  pq_sendbyte(out, 'O'); /* old tuple follows */
265  else
266  pq_sendbyte(out, 'K'); /* old key follows */
267 
268  logicalrep_write_tuple(out, rel, oldtuple);
269 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define REPLICA_IDENTITY_DEFAULT
Definition: pg_class.h:175
Form_pg_class rd_rel
Definition: rel.h:113
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
#define Assert(condition)
Definition: c.h:670
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
#define REPLICA_IDENTITY_INDEX
Definition: pg_class.h:185
#define RelationGetRelid(relation)
Definition: rel.h:413
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
Definition: proto.c:389
void logicalrep_write_insert ( StringInfo  out,
Relation  rel,
HeapTuple  newtuple 
)

Definition at line 139 of file proto.c.

References Assert, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint(), RelationData::rd_rel, RelationGetRelid, REPLICA_IDENTITY_DEFAULT, REPLICA_IDENTITY_FULL, and REPLICA_IDENTITY_INDEX.

Referenced by pgoutput_change().

140 {
141  pq_sendbyte(out, 'I'); /* action INSERT */
142 
143  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
144  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
145  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
146 
147  /* use Oid as relation identifier */
148  pq_sendint(out, RelationGetRelid(rel), 4);
149 
150  pq_sendbyte(out, 'N'); /* new tuple follows */
151  logicalrep_write_tuple(out, rel, newtuple);
152 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define REPLICA_IDENTITY_DEFAULT
Definition: pg_class.h:175
Form_pg_class rd_rel
Definition: rel.h:113
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
#define Assert(condition)
Definition: c.h:670
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
#define REPLICA_IDENTITY_INDEX
Definition: pg_class.h:185
#define RelationGetRelid(relation)
Definition: rel.h:413
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
Definition: proto.c:389
static void logicalrep_write_namespace ( StringInfo  out,
Oid  nspid 
)
static

Definition at line 609 of file proto.c.

References elog, ERROR, get_namespace_name(), NULL, PG_CATALOG_NAMESPACE, pq_sendbyte(), and pq_sendstring().

Referenced by logicalrep_write_rel(), and logicalrep_write_typ().

610 {
611  if (nspid == PG_CATALOG_NAMESPACE)
612  pq_sendbyte(out, '\0');
613  else
614  {
615  char *nspname = get_namespace_name(nspid);
616 
617  if (nspname == NULL)
618  elog(ERROR, "cache lookup failed for namespace %u",
619  nspid);
620 
621  pq_sendstring(out, nspname);
622  }
623 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
#define ERROR
Definition: elog.h:43
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3006
#define PG_CATALOG_NAMESPACE
Definition: pg_namespace.h:71
#define NULL
Definition: c.h:226
#define elog
Definition: elog.h:219
void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 110 of file proto.c.

References pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by pgoutput_begin_txn().

112 {
113  pq_sendbyte(out, 'O'); /* ORIGIN */
114 
115  /* fixed fields */
116  pq_sendint64(out, origin_lsn);
117 
118  /* origin string */
119  pq_sendstring(out, origin);
120 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
void logicalrep_write_rel ( StringInfo  out,
Relation  rel 
)

Definition at line 299 of file proto.c.

References logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, and RelationGetRelid.

Referenced by pgoutput_change().

300 {
301  char *relname;
302 
303  pq_sendbyte(out, 'R'); /* sending RELATION */
304 
305  /* use Oid as relation identifier */
306  pq_sendint(out, RelationGetRelid(rel), 4);
307 
308  /* send qualified relation name */
310  relname = RelationGetRelationName(rel);
311  pq_sendstring(out, relname);
312 
313  /* send replica identity */
314  pq_sendbyte(out, rel->rd_rel->relreplident);
315 
316  /* send the attribute info */
317  logicalrep_write_attrs(out, rel);
318 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:609
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
Form_pg_class rd_rel
Definition: rel.h:113
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:508
#define RelationGetRelationName(relation)
Definition: rel.h:433
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
#define RelationGetRelid(relation)
Definition: rel.h:413
#define RelationGetNamespace(relation)
Definition: rel.h:440
static void logicalrep_write_tuple ( StringInfo  out,
Relation  rel,
HeapTuple  tuple 
)
static

Definition at line 389 of file proto.c.

References appendBinaryStringInfo(), tupleDesc::attrs, elog, enlargeStringInfo(), ERROR, GETSTRUCT, heap_deform_tuple(), HeapTupleIsValid, i, MaxTupleAttributeNumber, tupleDesc::natts, ObjectIdGetDatum, OidOutputFunctionCall(), pfree(), pq_sendbyte(), pq_sendint(), RelationGetDescr, ReleaseSysCache(), SearchSysCache1, HeapTupleData::t_len, TYPEOID, values, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by logicalrep_write_delete(), logicalrep_write_insert(), and logicalrep_write_update().

390 {
391  TupleDesc desc;
393  bool isnull[MaxTupleAttributeNumber];
394  int i;
395  uint16 nliveatts = 0;
396 
397  desc = RelationGetDescr(rel);
398 
399  for (i = 0; i < desc->natts; i++)
400  {
401  if (desc->attrs[i]->attisdropped)
402  continue;
403  nliveatts++;
404  }
405  pq_sendint(out, nliveatts, 2);
406 
407  /* try to allocate enough memory from the get-go */
408  enlargeStringInfo(out, tuple->t_len +
409  nliveatts * (1 + 4));
410 
411  heap_deform_tuple(tuple, desc, values, isnull);
412 
413  /* Write the values */
414  for (i = 0; i < desc->natts; i++)
415  {
416  HeapTuple typtup;
417  Form_pg_type typclass;
418  Form_pg_attribute att = desc->attrs[i];
419  char *outputstr;
420  int len;
421 
422  /* skip dropped columns */
423  if (att->attisdropped)
424  continue;
425 
426  if (isnull[i])
427  {
428  pq_sendbyte(out, 'n'); /* null column */
429  continue;
430  }
431  else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
432  {
433  pq_sendbyte(out, 'u'); /* unchanged toast column */
434  continue;
435  }
436 
437  typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
438  if (!HeapTupleIsValid(typtup))
439  elog(ERROR, "cache lookup failed for type %u", att->atttypid);
440  typclass = (Form_pg_type) GETSTRUCT(typtup);
441 
442  pq_sendbyte(out, 't'); /* 'text' data follows */
443 
444  outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
445  len = strlen(outputstr) + 1; /* null terminated */
446  pq_sendint(out, len, 4); /* length */
447  appendBinaryStringInfo(out, outputstr, len); /* data */
448 
449  pfree(outputstr);
450 
451  ReleaseSysCache(typtup);
452  }
453 }
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:317
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
#define RelationGetDescr(relation)
Definition: rel.h:425
Form_pg_attribute * attrs
Definition: tupdesc.h:74
FormData_pg_type * Form_pg_type
Definition: pg_type.h:233
int natts
Definition: tupdesc.h:73
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:149
unsigned short uint16
Definition: c.h:264
void pfree(void *pointer)
Definition: mcxt.c:992
#define ObjectIdGetDatum(X)
Definition: postgres.h:515
#define ERROR
Definition: elog.h:43
uint32 t_len
Definition: htup.h:64
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:277
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
uintptr_t Datum
Definition: postgres.h:374
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1083
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:935
static Datum values[MAXATTR]
Definition: bootstrap.c:162
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:2006
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
int i
#define elog
Definition: elog.h:219
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:240
void logicalrep_write_typ ( StringInfo  out,
Oid  typoid 
)

Definition at line 349 of file proto.c.

References elog, ERROR, getBaseType(), GETSTRUCT, HeapTupleIsValid, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum, pq_sendbyte(), pq_sendint(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1, and TYPEOID.

Referenced by pgoutput_change().

350 {
351  Oid basetypoid = getBaseType(typoid);
352  HeapTuple tup;
353  Form_pg_type typtup;
354 
355  pq_sendbyte(out, 'Y'); /* sending TYPE */
356 
357  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
358  if (!HeapTupleIsValid(tup))
359  elog(ERROR, "cache lookup failed for type %u", basetypoid);
360  typtup = (Form_pg_type) GETSTRUCT(tup);
361 
362  /* use Oid as relation identifier */
363  pq_sendint(out, typoid, 4);
364 
365  /* send qualified type name */
366  logicalrep_write_namespace(out, typtup->typnamespace);
367  pq_sendstring(out, NameStr(typtup->typname));
368 
369  ReleaseSysCache(tup);
370 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:609
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
FormData_pg_type * Form_pg_type
Definition: pg_type.h:233
unsigned int Oid
Definition: postgres_ext.h:31
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:149
#define ObjectIdGetDatum(X)
Definition: postgres.h:515
#define ERROR
Definition: elog.h:43
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1083
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
#define NameStr(name)
Definition: c.h:494
#define elog
Definition: elog.h:219
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2239
void logicalrep_write_update ( StringInfo  out,
Relation  rel,
HeapTuple  oldtuple,
HeapTuple  newtuple 
)

Definition at line 182 of file proto.c.

References Assert, logicalrep_write_tuple(), NULL, pq_sendbyte(), pq_sendint(), RelationData::rd_rel, RelationGetRelid, REPLICA_IDENTITY_DEFAULT, REPLICA_IDENTITY_FULL, and REPLICA_IDENTITY_INDEX.

Referenced by pgoutput_change().

184 {
185  pq_sendbyte(out, 'U'); /* action UPDATE */
186 
187  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
188  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
189  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
190 
191  /* use Oid as relation identifier */
192  pq_sendint(out, RelationGetRelid(rel), 4);
193 
194  if (oldtuple != NULL)
195  {
196  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
197  pq_sendbyte(out, 'O'); /* old tuple follows */
198  else
199  pq_sendbyte(out, 'K'); /* old key follows */
200  logicalrep_write_tuple(out, rel, oldtuple);
201  }
202 
203  pq_sendbyte(out, 'N'); /* new tuple follows */
204  logicalrep_write_tuple(out, rel, newtuple);
205 }
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define REPLICA_IDENTITY_DEFAULT
Definition: pg_class.h:175
Form_pg_class rd_rel
Definition: rel.h:113
#define REPLICA_IDENTITY_FULL
Definition: pg_class.h:179
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:670
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
#define REPLICA_IDENTITY_INDEX
Definition: pg_class.h:185
#define RelationGetRelid(relation)
Definition: rel.h:413
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
Definition: proto.c:389