PostgreSQL Source Code  git master
proto.c File Reference
#include "postgres.h"
#include "access/sysattr.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "libpq/pqformat.h"
#include "replication/logicalproto.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
Include dependency graph for proto.c:

Go to the source code of this file.

Macros

#define LOGICALREP_IS_REPLICA_IDENTITY   1
 
#define MESSAGE_TRANSACTIONAL   (1<<0)
 
#define TRUNCATE_CASCADE   (1<<0)
 
#define TRUNCATE_RESTART_SEQS   (1<<1)
 

Functions

static void logicalrep_write_attrs (StringInfo out, Relation rel)
 
static void logicalrep_write_tuple (StringInfo out, Relation rel, HeapTuple tuple, bool binary)
 
static void logicalrep_read_attrs (StringInfo in, LogicalRepRelation *rel)
 
static void logicalrep_read_tuple (StringInfo in, LogicalRepTupleData *tuple)
 
static void logicalrep_write_namespace (StringInfo out, Oid nspid)
 
static const char * logicalrep_read_namespace (StringInfo in)
 
void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
 
List * logicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
 
void logicalrep_write_message (StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel)
 
LogicalRepRelation * logicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
 
void logicalrep_read_typ (StringInfo in, LogicalRepTyp *ltyp)
 
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
 
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
 
void logicalrep_write_stream_stop (StringInfo out)
 
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
TransactionId logicalrep_read_stream_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid)
 
void logicalrep_read_stream_abort (StringInfo in, TransactionId *xid, TransactionId *subxid)
 

Macro Definition Documentation

◆ LOGICALREP_IS_REPLICA_IDENTITY

#define LOGICALREP_IS_REPLICA_IDENTITY   1

Definition at line 26 of file proto.c.

Referenced by logicalrep_read_attrs(), and logicalrep_write_attrs().

◆ MESSAGE_TRANSACTIONAL

#define MESSAGE_TRANSACTIONAL   (1<<0)

Definition at line 28 of file proto.c.

Referenced by logicalrep_write_message().

◆ TRUNCATE_CASCADE

#define TRUNCATE_CASCADE   (1<<0)

Definition at line 29 of file proto.c.

Referenced by logicalrep_read_truncate(), and logicalrep_write_truncate().

◆ TRUNCATE_RESTART_SEQS

#define TRUNCATE_RESTART_SEQS   (1<<1)

Definition at line 30 of file proto.c.

Referenced by logicalrep_read_truncate(), and logicalrep_write_truncate().

Function Documentation

◆ logicalrep_read_attrs()

static void logicalrep_read_attrs ( StringInfo  in,
LogicalRepRelation *  rel 
)
static

Definition at line 708 of file proto.c.

References LogicalRepRelation::attkeys, LogicalRepRelation::attnames, LogicalRepRelation::atttyps, bms_add_member(), i, LOGICALREP_IS_REPLICA_IDENTITY, LogicalRepRelation::natts, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), and pstrdup().

Referenced by logicalrep_read_rel().

709 {
710  int i;
711  int natts;
712  char **attnames;
713  Oid *atttyps;
714  Bitmapset *attkeys = NULL;
715 
716  natts = pq_getmsgint(in, 2);
717  attnames = palloc(natts * sizeof(char *));
718  atttyps = palloc(natts * sizeof(Oid));
719 
720  /* read the attributes */
721  for (i = 0; i < natts; i++)
722  {
723  uint8 flags;
724 
725  /* Check for replica identity column */
726  flags = pq_getmsgbyte(in);
727  if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
728  attkeys = bms_add_member(attkeys, i);
729 
730  /* attribute name */
731  attnames[i] = pstrdup(pq_getmsgstring(in));
732 
733  /* attribute type id */
734  atttyps[i] = (Oid) pq_getmsgint(in, 4);
735 
736  /* we ignore attribute mode for now */
737  (void) pq_getmsgint(in, 4);
738  }
739 
740  rel->attnames = attnames;
741  rel->atttyps = atttyps;
742  rel->attkeys = attkeys;
743  rel->natts = natts;
744 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
unsigned char uint8
Definition: c.h:439
unsigned int Oid
Definition: postgres_ext.h:31
Bitmapset * attkeys
Definition: logicalproto.h:99
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:26
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
void * palloc(Size size)
Definition: mcxt.c:1062
int i
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_begin()

void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData *  begin_data 
)

Definition at line 60 of file proto.c.

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), and LogicalRepBeginData::xid.

Referenced by apply_handle_begin().

61 {
62  /* read fields */
63  begin_data->final_lsn = pq_getmsgint64(in);
64  if (begin_data->final_lsn == InvalidXLogRecPtr)
65  elog(ERROR, "final_lsn not set in begin message");
66  begin_data->committime = pq_getmsgint64(in);
67  begin_data->xid = pq_getmsgint(in, 4);
68 }
TransactionId xid
Definition: logicalproto.h:115
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define ERROR
Definition: elog.h:46
XLogRecPtr final_lsn
Definition: logicalproto.h:113
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:114

◆ logicalrep_read_commit()

void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData *  commit_data 
)

Definition at line 95 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

96 {
97  /* read flags (unused for now) */
98  uint8 flags = pq_getmsgbyte(in);
99 
100  if (flags != 0)
101  elog(ERROR, "unrecognized flags %u in commit message", flags);
102 
103  /* read fields */
104  commit_data->commit_lsn = pq_getmsgint64(in);
105  commit_data->end_lsn = pq_getmsgint64(in);
106  commit_data->committime = pq_getmsgint64(in);
107 }
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz committime
Definition: logicalproto.h:122

◆ logicalrep_read_delete()

LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData *  oldtup 
)

Definition at line 290 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

291 {
292  char action;
293  LogicalRepRelId relid;
294 
295  /* read the relation id */
296  relid = pq_getmsgint(in, 4);
297 
298  /* read and verify action */
299  action = pq_getmsgbyte(in);
300  if (action != 'K' && action != 'O')
301  elog(ERROR, "expected action 'O' or 'K', got %c", action);
302 
303  logicalrep_read_tuple(in, oldtup);
304 
305  return relid;
306 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:583
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:85

◆ logicalrep_read_insert()

LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData *  newtup 
)

Definition at line 164 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

165 {
166  char action;
167  LogicalRepRelId relid;
168 
169  /* read the relation id */
170  relid = pq_getmsgint(in, 4);
171 
172  action = pq_getmsgbyte(in);
173  if (action != 'N')
174  elog(ERROR, "expected new tuple but got %d",
175  action);
176 
177  logicalrep_read_tuple(in, newtup);
178 
179  return relid;
180 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:583
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:85

◆ logicalrep_read_namespace()

static const char * logicalrep_read_namespace ( StringInfo  in)
static

Definition at line 770 of file proto.c.

References pq_getmsgstring().

Referenced by logicalrep_read_rel(), and logicalrep_read_typ().

771 {
772  const char *nspname = pq_getmsgstring(in);
773 
774  if (nspname[0] == '\0')
775  nspname = "pg_catalog";
776 
777  return nspname;
778 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581

◆ logicalrep_read_origin()

char* logicalrep_read_origin ( StringInfo  in,
XLogRecPtr *  origin_lsn 
)

Definition at line 129 of file proto.c.

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

130 {
131  /* fixed fields */
132  *origin_lsn = pq_getmsgint64(in);
133 
134  /* return origin */
135  return pstrdup(pq_getmsgstring(in));
136 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455

◆ logicalrep_read_rel()

LogicalRepRelation* logicalrep_read_rel ( StringInfo  in)

Definition at line 425 of file proto.c.

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

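426 {
427  LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
428 
429  rel->remoteid = pq_getmsgint(in, 4);
430 
431  /* Read relation name from stream */
432  rel->nspname = pstrdup(logicalrep_read_namespace(in));
433  rel->relname = pstrdup(pq_getmsgstring(in));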
434 
435  /* Read the replica identity. */
436  rel->replident = pq_getmsgbyte(in);
437 
438  /* Get attribute description */
439  logicalrep_read_attrs(in, rel);
440 
441  return rel;
442 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
LogicalRepRelId remoteid
Definition: logicalproto.h:91
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:708
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:770
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void * palloc(Size size)
Definition: mcxt.c:1062
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_abort()

void logicalrep_read_stream_abort ( StringInfo  in,
TransactionId *  xid,
TransactionId *  subxid 
)

Definition at line 894 of file proto.c.

References Assert, and pq_getmsgint().

Referenced by apply_handle_stream_abort().

896 {
897  Assert(xid && subxid);
898 
899  *xid = pq_getmsgint(in, 4);
900  *subxid = pq_getmsgint(in, 4);
901 }
#define Assert(condition)
Definition: c.h:804
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_commit()

TransactionId logicalrep_read_stream_commit ( StringInfo  in,
LogicalRepCommitData *  commit_data 
)

Definition at line 852 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), pq_getmsgint(), and pq_getmsgint64().

Referenced by apply_handle_stream_commit().

853 {
854  TransactionId xid;
855  uint8 flags;
856 
857  xid = pq_getmsgint(in, 4);
858 
859  /* read flags (unused for now) */
860  flags = pq_getmsgbyte(in);
861 
862  if (flags != 0)
863  elog(ERROR, "unrecognized flags %u in commit message", flags);
864 
865  /* read fields */
866  commit_data->commit_lsn = pq_getmsgint64(in);
867  commit_data->end_lsn = pq_getmsgint64(in);
868  commit_data->committime = pq_getmsgint64(in);
869 
870  return xid;
871 }
uint32 TransactionId
Definition: c.h:587
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:122

◆ logicalrep_read_stream_start()

TransactionId logicalrep_read_stream_start ( StringInfo  in,
bool *  first_segment 
)

Definition at line 802 of file proto.c.

References Assert, pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_stream_start().

803 {
804  TransactionId xid;
805 
806  Assert(first_segment);
807 
808  xid = pq_getmsgint(in, 4);
809  *first_segment = (pq_getmsgbyte(in) == 1);
810 
811  return xid;
812 }
uint32 TransactionId
Definition: c.h:587
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define Assert(condition)
Definition: c.h:804
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_truncate()

List* logicalrep_read_truncate ( StringInfo  in,
bool *  cascade,
bool *  restart_seqs 
)

Definition at line 344 of file proto.c.

References i, lappend_oid(), NIL, pq_getmsgint(), TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by apply_handle_truncate().

346 {
347  int i;
348  int nrelids;
349  List *relids = NIL;
350  uint8 flags;
351 
352  nrelids = pq_getmsgint(in, 4);
353 
354  /* read and decode truncate flags */
355  flags = pq_getmsgint(in, 1);
356  *cascade = (flags & TRUNCATE_CASCADE) > 0;
357  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
358 
359  for (i = 0; i < nrelids; i++)
360  relids = lappend_oid(relids, pq_getmsgint(in, 4));
361 
362  return relids;
363 }
#define NIL
Definition: pg_list.h:65
unsigned char uint8
Definition: c.h:439
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
int i
#define TRUNCATE_CASCADE
Definition: proto.c:29
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
Definition: pg_list.h:50
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30

◆ logicalrep_read_tuple()

static void logicalrep_read_tuple ( StringInfo  in,
LogicalRepTupleData *  tuple 
)
static

Definition at line 583 of file proto.c.

References LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, elog, ERROR, i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_NULL, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, StringInfoData::maxlen, LogicalRepTupleData::ncols, palloc(), palloc0(), pq_copymsgbytes(), pq_getmsgbyte(), pq_getmsgint(), and value.

Referenced by logicalrep_read_delete(), logicalrep_read_insert(), and logicalrep_read_update().

584 {
585  int i;
586  int natts;
587 
588  /* Get number of attributes */
589  natts = pq_getmsgint(in, 2);
590 
591  /* Allocate space for per-column values; zero out unused StringInfoDatas */
592  tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
593  tuple->colstatus = (char *) palloc(natts * sizeof(char));
594  tuple->ncols = natts;
595 
596  /* Read the data */
597  for (i = 0; i < natts; i++)
598  {
599  char kind;
600  int len;
601  StringInfo value = &tuple->colvalues[i];
602 
603  kind = pq_getmsgbyte(in);
604  tuple->colstatus[i] = kind;
605 
606  switch (kind)
607  {
608  case LOGICALREP_COLUMN_NULL:
609  /* nothing more to do */
610  break;
611  case LOGICALREP_COLUMN_UNCHANGED:
612  /* we don't receive the value of an unchanged column */
613  break;
614  case LOGICALREP_COLUMN_TEXT:
615  len = pq_getmsgint(in, 4); /* read length */
616 
617  /* and data */
618  value->data = palloc(len + 1);
619  pq_copymsgbytes(in, value->data, len);
620  value->data[len] = '\0';
621  /* make StringInfo fully valid */
622  value->len = len;
623  value->cursor = 0;
624  value->maxlen = len;
625  break;
626  case LOGICALREP_COLUMN_BINARY:
627  len = pq_getmsgint(in, 4); /* read length */
628 
629  /* and data */
630  value->data = palloc(len + 1);
631  pq_copymsgbytes(in, value->data, len);
632  /* not strictly necessary but per StringInfo practice */
633  value->data[len] = '\0';
634  /* make StringInfo fully valid */
635  value->len = len;
636  value->cursor = 0;
637  value->maxlen = len;
638  break;
639  default:
640  elog(ERROR, "unrecognized data representation type '%c'", kind);
641  }
642  }
643 }
static struct @142 value
#define LOGICALREP_COLUMN_NULL
Definition: logicalproto.h:80
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:82
StringInfoData * colvalues
Definition: logicalproto.h:71
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:83
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:81
#define ERROR
Definition: elog.h:46
void * palloc0(Size size)
Definition: mcxt.c:1093
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void pq_copymsgbytes(StringInfo msg, char *buf, int datalen)
Definition: pqformat.c:530
void * palloc(Size size)
Definition: mcxt.c:1062
#define elog(elevel,...)
Definition: elog.h:232
int i
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_typ()

void logicalrep_read_typ ( StringInfo  in,
LogicalRepTyp *  ltyp 
)

Definition at line 481 of file proto.c.

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

482 {
483  ltyp->remoteid = pq_getmsgint(in, 4);
484 
485  /* Read type name from stream */
486  ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
487  ltyp->typname = pstrdup(pq_getmsgstring(in));
488 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:770
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_update()

LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool *  has_oldtuple,
LogicalRepTupleData *  oldtup,
LogicalRepTupleData *  newtup 
)

Definition at line 219 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

222 {
223  char action;
224  LogicalRepRelId relid;
225 
226  /* read the relation id */
227  relid = pq_getmsgint(in, 4);
228 
229  /* read and verify action */
230  action = pq_getmsgbyte(in);
231  if (action != 'K' && action != 'O' && action != 'N')
232  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
233  action);
234 
235  /* check for old tuple */
236  if (action == 'K' || action == 'O')
237  {
238  logicalrep_read_tuple(in, oldtup);
239  *has_oldtuple = true;
240 
241  action = pq_getmsgbyte(in);
242  }
243  else
244  *has_oldtuple = false;
245 
246  /* check for new tuple */
247  if (action != 'N')
248  elog(ERROR, "expected action 'N', got %c",
249  action);
250 
251  logicalrep_read_tuple(in, newtup);
252 
253  return relid;
254 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:583
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:85

◆ logicalrep_write_attrs()

static void logicalrep_write_attrs ( StringInfo  out,
Relation  rel 
)
static

Definition at line 649 of file proto.c.

References bms_free(), bms_is_member(), FirstLowInvalidHeapAttributeNumber, i, INDEX_ATTR_BITMAP_IDENTITY_KEY, LOGICALREP_IS_REPLICA_IDENTITY, NameStr, TupleDescData::natts, pq_sendbyte(), pq_sendint16(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetDescr, RelationGetIndexAttrBitmap(), and TupleDescAttr.

Referenced by logicalrep_write_rel().

650 {
651  TupleDesc desc;
652  int i;
653  uint16 nliveatts = 0;
654  Bitmapset *idattrs = NULL;
655  bool replidentfull;
656 
657  desc = RelationGetDescr(rel);
658 
659  /* send number of live attributes */
660  for (i = 0; i < desc->natts; i++)
661  {
662  if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
663  continue;
664  nliveatts++;
665  }
666  pq_sendint16(out, nliveatts);
667 
668  /* fetch bitmap of REPLICATION IDENTITY attributes */
669  replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
670  if (!replidentfull)
671  idattrs = RelationGetIndexAttrBitmap(rel,
672  INDEX_ATTR_BITMAP_IDENTITY_KEY);
673 
674  /* send the attributes */
675  for (i = 0; i < desc->natts; i++)
676  {
677  Form_pg_attribute att = TupleDescAttr(desc, i);
678  uint8 flags = 0;
679 
680  if (att->attisdropped || att->attgenerated)
681  continue;
682 
683  /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
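684  if (replidentfull ||
685  bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
686  idattrs))
687  flags |= LOGICALREP_IS_REPLICA_IDENTITY;
688 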
689  pq_sendbyte(out, flags);
690 
691  /* attribute name */
692  pq_sendstring(out, NameStr(att->attname));
693 
694  /* attribute type id */
695  pq_sendint32(out, (int) att->atttypid);
696 
697  /* attribute mode */
698  pq_sendint32(out, att->atttypmod);
699  }
700 
701  bms_free(idattrs);
702 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define RelationGetDescr(relation)
Definition: rel.h:483
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
unsigned char uint8
Definition: c.h:439
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
Form_pg_class rd_rel
Definition: rel.h:110
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:26
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
unsigned short uint16
Definition: c.h:440
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:203
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
int i
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:4981
#define NameStr(name)
Definition: c.h:681
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:427

◆ logicalrep_write_begin()

void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN *  txn 
)

Definition at line 46 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, LOGICAL_REP_MSG_BEGIN, pq_sendbyte(), pq_sendint32(), pq_sendint64(), and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_txn().

47 {
48  pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
49 
50  /* fixed fields */
51  pq_sendint64(out, txn->final_lsn);
52  pq_sendint64(out, txn->commit_time);
53  pq_sendint32(out, txn->xid);
54 }
TimestampTz commit_time
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
XLogRecPtr final_lsn
TransactionId xid

◆ logicalrep_write_commit()

void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)

Definition at line 75 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_COMMIT, pq_sendbyte(), and pq_sendint64().

Referenced by pgoutput_commit_txn().

77 {
78  uint8 flags = 0;
79 
80  pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
81 
82  /* send the flags field (unused for now) */
83  pq_sendbyte(out, flags);
84 
85  /* send fields */
86  pq_sendint64(out, commit_lsn);
87  pq_sendint64(out, txn->end_lsn);
88  pq_sendint64(out, txn->commit_time);
89 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
XLogRecPtr end_lsn

◆ logicalrep_write_delete()

void logicalrep_write_delete ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
bool  binary 
)

Definition at line 260 of file proto.c.

References Assert, LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

262 {
263  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
264  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
265  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
266 
267  pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
268 
269  /* transaction ID (if not valid, we're not streaming) */
270  if (TransactionIdIsValid(xid))
271  pq_sendint32(out, xid);
272 
273  /* use Oid as relation identifier */
274  pq_sendint32(out, RelationGetRelid(rel));
275 
276  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
277  pq_sendbyte(out, 'O'); /* old tuple follows */
278  else
279  pq_sendbyte(out, 'K'); /* old key follows */
280 
281  logicalrep_write_tuple(out, rel, oldtuple, binary);
282 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:494
Form_pg_class rd_rel
Definition: rel.h:110
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:457

◆ logicalrep_write_insert()

void logicalrep_write_insert ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 142 of file proto.c.

References LOGICAL_REP_MSG_INSERT, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

144 {
145  pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
146 
147  /* transaction ID (if not valid, we're not streaming) */
148  if (TransactionIdIsValid(xid))
149  pq_sendint32(out, xid);
150 
151  /* use Oid as relation identifier */
152  pq_sendint32(out, RelationGetRelid(rel));
153 
154  pq_sendbyte(out, 'N'); /* new tuple follows */
155  logicalrep_write_tuple(out, rel, newtuple, binary);
156 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:494
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:457

◆ logicalrep_write_message()

void logicalrep_write_message ( StringInfo  out,
TransactionId  xid,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)

Definition at line 369 of file proto.c.

References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.

Referenced by pgoutput_message().

372 {
373  uint8 flags = 0;
374 
375  pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
376 
377  /* encode and send message flags */
378  if (transactional)
379  flags |= MESSAGE_TRANSACTIONAL;
380 
381  /* transaction ID (if not valid, we're not streaming) */
382  if (TransactionIdIsValid(xid))
383  pq_sendint32(out, xid);
384 
385  pq_sendint8(out, flags);
386  pq_sendint64(out, lsn);
387  pq_sendstring(out, prefix);
388  pq_sendint32(out, sz);
389  pq_sendbytes(out, message, sz);
390 }
unsigned char uint8
Definition: c.h:439
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129

◆ logicalrep_write_namespace()

static void logicalrep_write_namespace ( StringInfo  out,
Oid  nspid 
)
static

Definition at line 750 of file proto.c.

References elog, ERROR, get_namespace_name(), pq_sendbyte(), and pq_sendstring().

Referenced by logicalrep_write_rel(), and logicalrep_write_typ().

751 {
752  if (nspid == PG_CATALOG_NAMESPACE)
753  pq_sendbyte(out, '\0');
754  else
755  {
756  char *nspname = get_namespace_name(nspid);
757 
758  if (nspname == NULL)
759  elog(ERROR, "cache lookup failed for namespace %u",
760  nspid);
761 
762  pq_sendstring(out, nspname);
763  }
764 }
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define ERROR
Definition: elog.h:46
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3316
#define elog(elevel,...)
Definition: elog.h:232

◆ logicalrep_write_origin()

void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 113 of file proto.c.

References LOGICAL_REP_MSG_ORIGIN, pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by pgoutput_begin_txn(), and pgoutput_stream_start().

115 {
116  pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
117 
118  /* fixed fields */
119  pq_sendint64(out, origin_lsn);
120 
121  /* origin string */
122  pq_sendstring(out, origin);
123 }
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_rel()

void logicalrep_write_rel ( StringInfo  out,
TransactionId  xid,
Relation  rel 
)

Definition at line 396 of file proto.c.

References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

397 {
398  char *relname;
399 
400  pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
401 
402  /* transaction ID (if not valid, we're not streaming) */
403  if (TransactionIdIsValid(xid))
404  pq_sendint32(out, xid);
405 
406  /* use Oid as relation identifier */
407  pq_sendint32(out, RelationGetRelid(rel));
408 
409  /* send qualified relation name */
410  logicalrep_write_namespace(out, RelationGetNamespace(rel));
411  relname = RelationGetRelationName(rel);
412  pq_sendstring(out, relname);
413 
414  /* send replica identity */
415  pq_sendbyte(out, rel->rd_rel->relreplident);
416 
417  /* send the attribute info */
418  logicalrep_write_attrs(out, rel);
419 }
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:750
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
Form_pg_class rd_rel
Definition: rel.h:110
NameData relname
Definition: pg_class.h:38
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:649
#define RelationGetRelationName(relation)
Definition: rel.h:491
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:457
#define RelationGetNamespace(relation)
Definition: rel.h:498

◆ logicalrep_write_stream_abort()

void logicalrep_write_stream_abort ( StringInfo  out,
TransactionId  xid,
TransactionId  subxid 
)

Definition at line 878 of file proto.c.

References Assert, LOGICAL_REP_MSG_STREAM_ABORT, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_abort().

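880 {
881  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
882 
883  Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
884 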
885  /* transaction ID */
886  pq_sendint32(out, xid);
887  pq_sendint32(out, subxid);
888 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_commit()

void logicalrep_write_stream_commit ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 827 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_STREAM_COMMIT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), TransactionIdIsValid, and ReorderBufferTXN::xid.

Referenced by pgoutput_stream_commit().

829 {
830  uint8 flags = 0;
831 
832  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
833 
834  Assert(TransactionIdIsValid(txn->xid));
835 
836  /* transaction ID */
837  pq_sendint32(out, txn->xid);
838 
839  /* send the flags field (unused for now) */
840  pq_sendbyte(out, flags);
841 
842  /* send fields */
843  pq_sendint64(out, commit_lsn);
844  pq_sendint64(out, txn->end_lsn);
845  pq_sendint64(out, txn->commit_time);
846 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_start()

void logicalrep_write_stream_start ( StringInfo  out,
TransactionId  xid,
bool  first_segment 
)

Definition at line 784 of file proto.c.

References Assert, LOGICAL_REP_MSG_STREAM_START, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_start().

786 {
787  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
788 
789  Assert(TransactionIdIsValid(xid));
790 
791  /* transaction ID (we're starting to stream, so must be valid) */
792  pq_sendint32(out, xid);
793 
794  /* 1 if this is the first streaming segment for this xid */
795  pq_sendbyte(out, first_segment ? 1 : 0);
796 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo  out)

Definition at line 818 of file proto.c.

References LOGICAL_REP_MSG_STREAM_END, and pq_sendbyte().

Referenced by pgoutput_stream_stop().

819 {
820  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END);
821 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_truncate()

void logicalrep_write_truncate ( StringInfo  out,
TransactionId  xid,
int  nrelids,
Oid  relids[],
bool  cascade,
bool  restart_seqs 
)

Definition at line 312 of file proto.c.

References i, LOGICAL_REP_MSG_TRUNCATE, pq_sendbyte(), pq_sendint32(), pq_sendint8(), TransactionIdIsValid, TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by pgoutput_truncate().

317 {
318  int i;
319  uint8 flags = 0;
320 
321  pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
322 
323  /* transaction ID (if not valid, we're not streaming) */
324  if (TransactionIdIsValid(xid))
325  pq_sendint32(out, xid);
326 
327  pq_sendint32(out, nrelids);
328 
329  /* encode and send truncate flags */
330  if (cascade)
331  flags |= TRUNCATE_CASCADE;
332  if (restart_seqs)
333  flags |= TRUNCATE_RESTART_SEQS;
334  pq_sendint8(out, flags);
335 
336  for (i = 0; i < nrelids; i++)
337  pq_sendint32(out, relids[i]);
338 }
unsigned char uint8
Definition: c.h:439
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
int i
#define TRUNCATE_CASCADE
Definition: proto.c:29
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30

◆ logicalrep_write_tuple()

static void logicalrep_write_tuple ( StringInfo  out,
Relation  rel,
HeapTuple  tuple,
bool  binary 
)
static

Definition at line 494 of file proto.c.

References elog, enlargeStringInfo(), ERROR, GETSTRUCT, heap_deform_tuple(), HeapTupleIsValid, i, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_NULL, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, MaxTupleAttributeNumber, TupleDescData::natts, ObjectIdGetDatum, OidIsValid, OidOutputFunctionCall(), OidSendFunctionCall(), pfree(), pq_sendbyte(), pq_sendbytes(), pq_sendcountedtext(), pq_sendint(), pq_sendint16(), RelationGetDescr, ReleaseSysCache(), SearchSysCache1(), HeapTupleData::t_len, TupleDescAttr, TYPEOID, values, VARATT_IS_EXTERNAL_ONDISK, VARDATA, VARHDRSZ, and VARSIZE.

Referenced by logicalrep_write_delete(), logicalrep_write_insert(), and logicalrep_write_update().

495 {
496  TupleDesc desc;
497  Datum values[MaxTupleAttributeNumber];
498  bool isnull[MaxTupleAttributeNumber];
499  int i;
500  uint16 nliveatts = 0;
501 
502  desc = RelationGetDescr(rel);
503 
504  for (i = 0; i < desc->natts; i++)
505  {
506  if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
507  continue;
508  nliveatts++;
509  }
510  pq_sendint16(out, nliveatts);
511 
512  /* try to allocate enough memory from the get-go */
513  enlargeStringInfo(out, tuple->t_len +
514  nliveatts * (1 + 4));
515 
516  heap_deform_tuple(tuple, desc, values, isnull);
517 
518  /* Write the values */
519  for (i = 0; i < desc->natts; i++)
520  {
521  HeapTuple typtup;
522  Form_pg_type typclass;
523  Form_pg_attribute att = TupleDescAttr(desc, i);
524 
525  if (att->attisdropped || att->attgenerated)
526  continue;
527 
528  if (isnull[i])
529  {
530  pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
531  continue;
532  }
533 
534  if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
535  {
536  /*
537  * Unchanged toasted datum. (Note that we don't promise to detect
538  * unchanged data in general; this is just a cheap check to avoid
539  * sending large values unnecessarily.)
540  */
541  pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
542  continue;
543  }
544 
545  typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
546  if (!HeapTupleIsValid(typtup))
547  elog(ERROR, "cache lookup failed for type %u", att->atttypid);
548  typclass = (Form_pg_type) GETSTRUCT(typtup);
549 
550  /*
551  * Send in binary if requested and type has suitable send function.
552  */
553  if (binary && OidIsValid(typclass->typsend))
554  {
555  bytea *outputbytes;
556  int len;
557 
558  pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
559  outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
560  len = VARSIZE(outputbytes) - VARHDRSZ;
561  pq_sendint(out, len, 4); /* length */
562  pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
563  pfree(outputbytes);
564  }
565  else
566  {
567  char *outputstr;
568 
569  pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
570  outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
571  pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
572  pfree(outputstr);
573  }
574 
575  ReleaseSysCache(typtup);
576  }
577 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:327
#define VARDATA(PTR)
Definition: postgres.h:315
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
static void pq_sendint(StringInfo buf, uint32 i, int b)
Definition: pqformat.h:172
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
#define RelationGetDescr(relation)
Definition: rel.h:483
#define VARSIZE(PTR)
Definition: postgres.h:316
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define VARHDRSZ
Definition: c.h:627
#define LOGICALREP_COLUMN_NULL
Definition: logicalproto.h:80
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:82
#define OidIsValid(objectId)
Definition: c.h:710
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:83
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:81
unsigned short uint16
Definition: c.h:440
void pfree(void *pointer)
Definition: mcxt.c:1169
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1675
uint32 t_len
Definition: htup.h:64
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:203
void pq_sendcountedtext(StringInfo buf, const char *str, int slen, bool countincludesself)
Definition: pqformat.c:142
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
uintptr_t Datum
Definition: postgres.h:411
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1249
static Datum values[MAXATTR]
Definition: bootstrap.c:166
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1656
#define elog(elevel,...)
Definition: elog.h:232
int i
Definition: c.h:621

◆ logicalrep_write_typ()

void logicalrep_write_typ ( StringInfo  out,
TransactionId  xid,
Oid  typoid 
)

Definition at line 450 of file proto.c.

References elog, ERROR, getBaseType(), GETSTRUCT, HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum, pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), TransactionIdIsValid, and TYPEOID.

Referenced by send_relation_and_attrs().

451 {
452  Oid basetypoid = getBaseType(typoid);
453  HeapTuple tup;
454  Form_pg_type typtup;
455 
456  pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
457 
458  /* transaction ID (if not valid, we're not streaming) */
459  if (TransactionIdIsValid(xid))
460  pq_sendint32(out, xid);
461 
462  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
463  if (!HeapTupleIsValid(tup))
464  elog(ERROR, "cache lookup failed for type %u", basetypoid);
465  typtup = (Form_pg_type) GETSTRUCT(tup);
466 
467  /* use Oid as relation identifier */
468  pq_sendint32(out, typoid);
469 
470  /* send qualified type name */
471  logicalrep_write_namespace(out, typtup->typnamespace);
472  pq_sendstring(out, NameStr(typtup->typname));
473 
474  ReleaseSysCache(tup);
475 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:750
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
unsigned int Oid
Definition: postgres_ext.h:31
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
#define elog(elevel,...)
Definition: elog.h:232
#define NameStr(name)
Definition: c.h:681
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2468

◆ logicalrep_write_update()

void logicalrep_write_update ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 186 of file proto.c.

References Assert, LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

188 {
189  pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
190 
191  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
192  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
193  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
194 
195  /* transaction ID (if not valid, we're not streaming) */
196  if (TransactionIdIsValid(xid))
197  pq_sendint32(out, xid);
198 
199  /* use Oid as relation identifier */
200  pq_sendint32(out, RelationGetRelid(rel));
201 
202  if (oldtuple != NULL)
203  {
204  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
205  pq_sendbyte(out, 'O'); /* old tuple follows */
206  else
207  pq_sendbyte(out, 'K'); /* old key follows */
208  logicalrep_write_tuple(out, rel, oldtuple, binary);
209  }
210 
211  pq_sendbyte(out, 'N'); /* new tuple follows */
212  logicalrep_write_tuple(out, rel, newtuple, binary);
213 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:494
Form_pg_class rd_rel
Definition: rel.h:110
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:457