PostgreSQL Source Code  git master
proto.c File Reference
#include "postgres.h"
#include "access/sysattr.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "libpq/pqformat.h"
#include "replication/logicalproto.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
Include dependency graph for proto.c:

Go to the source code of this file.

Macros

#define LOGICALREP_IS_REPLICA_IDENTITY   1
 
#define MESSAGE_TRANSACTIONAL   (1<<0)
 
#define TRUNCATE_CASCADE   (1<<0)
 
#define TRUNCATE_RESTART_SEQS   (1<<1)
 

Functions

static void logicalrep_write_attrs (StringInfo out, Relation rel)
 
static void logicalrep_write_tuple (StringInfo out, Relation rel, HeapTuple tuple, bool binary)
 
static void logicalrep_read_attrs (StringInfo in, LogicalRepRelation *rel)
 
static void logicalrep_read_tuple (StringInfo in, LogicalRepTupleData *tuple)
 
static void logicalrep_write_namespace (StringInfo out, Oid nspid)
 
static const char * logicalrep_read_namespace (StringInfo in)
 
void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_begin_prepare (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin_prepare (StringInfo in, LogicalRepPreparedTxnData *begin_data)
 
static void logicalrep_write_prepare_common (StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_write_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void logicalrep_read_prepare_common (StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_read_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_commit_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit_prepared (StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
 
void logicalrep_write_rollback_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
void logicalrep_read_rollback_prepared (StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
 
void logicalrep_write_stream_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_read_stream_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
 
List * logicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
 
void logicalrep_write_message (StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel)
 
LogicalRepRelation * logicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
 
void logicalrep_read_typ (StringInfo in, LogicalRepTyp *ltyp)
 
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
 
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
 
void logicalrep_write_stream_stop (StringInfo out)
 
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
TransactionId logicalrep_read_stream_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid)
 
void logicalrep_read_stream_abort (StringInfo in, TransactionId *xid, TransactionId *subxid)
 
char * logicalrep_message_type (LogicalRepMsgType action)
 

Macro Definition Documentation

◆ LOGICALREP_IS_REPLICA_IDENTITY

#define LOGICALREP_IS_REPLICA_IDENTITY   1

Definition at line 26 of file proto.c.

Referenced by logicalrep_read_attrs(), and logicalrep_write_attrs().

◆ MESSAGE_TRANSACTIONAL

#define MESSAGE_TRANSACTIONAL   (1<<0)

Definition at line 28 of file proto.c.

Referenced by logicalrep_write_message().

◆ TRUNCATE_CASCADE

#define TRUNCATE_CASCADE   (1<<0)

Definition at line 29 of file proto.c.

Referenced by logicalrep_read_truncate(), and logicalrep_write_truncate().

◆ TRUNCATE_RESTART_SEQS

#define TRUNCATE_RESTART_SEQS   (1<<1)

Definition at line 30 of file proto.c.

Referenced by logicalrep_read_truncate(), and logicalrep_write_truncate().

Function Documentation

◆ logicalrep_message_type()

char* logicalrep_message_type ( LogicalRepMsgType  action)

Definition at line 1164 of file proto.c.

References elog, ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, and LOGICAL_REP_MSG_UPDATE.

Referenced by apply_error_callback().

1165 {
1166  switch (action)
1167  {
1168  case LOGICAL_REP_MSG_BEGIN:
1169  return "BEGIN";
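1170  case LOGICAL_REP_MSG_COMMIT:
1171  return "COMMIT";
1172  case LOGICAL_REP_MSG_ORIGIN:
1173  return "ORIGIN";
1174  case LOGICAL_REP_MSG_INSERT:
1175  return "INSERT";
1176  case LOGICAL_REP_MSG_UPDATE:
1177  return "UPDATE";
1178  case LOGICAL_REP_MSG_DELETE:
1179  return "DELETE";
1180  case LOGICAL_REP_MSG_TRUNCATE:
1181  return "TRUNCATE";
1182  case LOGICAL_REP_MSG_RELATION:
1183  return "RELATION";
1184  case LOGICAL_REP_MSG_TYPE:
1185  return "TYPE";
1186  case LOGICAL_REP_MSG_MESSAGE:
1187  return "MESSAGE";
1188  case LOGICAL_REP_MSG_BEGIN_PREPARE:
1189  return "BEGIN PREPARE";
1190  case LOGICAL_REP_MSG_PREPARE:
1191  return "PREPARE";
1192  case LOGICAL_REP_MSG_COMMIT_PREPARED:
1193  return "COMMIT PREPARED";
1194  case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
1195  return "ROLLBACK PREPARED";
1196  case LOGICAL_REP_MSG_STREAM_START:
1197  return "STREAM START";
1198  case LOGICAL_REP_MSG_STREAM_STOP:
1199  return "STREAM STOP";
1200  case LOGICAL_REP_MSG_STREAM_COMMIT:
1201  return "STREAM COMMIT";
1202  case LOGICAL_REP_MSG_STREAM_ABORT:
1203  return "STREAM ABORT";
1204  case LOGICAL_REP_MSG_STREAM_PREPARE:
1205  return "STREAM PREPARE";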
1206  }
1207 
1208  elog(ERROR, "invalid logical replication message type \"%c\"", action);
1209 
1210  return NULL; /* keep compiler quiet */
1211 }
#define ERROR
Definition: elog.h:46
#define elog(elevel,...)
Definition: elog.h:232

◆ logicalrep_read_attrs()

static void logicalrep_read_attrs ( StringInfo  in,
LogicalRepRelation *  rel
)
static

Definition at line 965 of file proto.c.

References LogicalRepRelation::attkeys, LogicalRepRelation::attnames, LogicalRepRelation::atttyps, bms_add_member(), i, LOGICALREP_IS_REPLICA_IDENTITY, LogicalRepRelation::natts, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), and pstrdup().

Referenced by logicalrep_read_rel().

966 {
967  int i;
968  int natts;
969  char **attnames;
970  Oid *atttyps;
971  Bitmapset *attkeys = NULL;
972 
973  natts = pq_getmsgint(in, 2);
974  attnames = palloc(natts * sizeof(char *));
975  atttyps = palloc(natts * sizeof(Oid));
976 
977  /* read the attributes */
978  for (i = 0; i < natts; i++)
979  {
980  uint8 flags;
981 
982  /* Check for replica identity column */
983  flags = pq_getmsgbyte(in);
984  if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
985  attkeys = bms_add_member(attkeys, i);
986 
987  /* attribute name */
988  attnames[i] = pstrdup(pq_getmsgstring(in));
989 
990  /* attribute type id */
991  atttyps[i] = (Oid) pq_getmsgint(in, 4);
992 
993  /* we ignore attribute mode for now */
994  (void) pq_getmsgint(in, 4);
995  }
996 
997  rel->attnames = attnames;
998  rel->atttyps = atttyps;
999  rel->attkeys = attkeys;
1000  rel->natts = natts;
1001 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
unsigned char uint8
Definition: c.h:439
unsigned int Oid
Definition: postgres_ext.h:31
Bitmapset * attkeys
Definition: logicalproto.h:109
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:26
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
void * palloc(Size size)
Definition: mcxt.c:1062
int i
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_begin()

void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData *  begin_data
)

Definition at line 60 of file proto.c.

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), and LogicalRepBeginData::xid.

Referenced by apply_handle_begin().

61 {
62  /* read fields */
63  begin_data->final_lsn = pq_getmsgint64(in);
64  if (begin_data->final_lsn == InvalidXLogRecPtr)
65  elog(ERROR, "final_lsn not set in begin message");
66  begin_data->committime = pq_getmsgint64(in);
67  begin_data->xid = pq_getmsgint(in, 4);
68 }
TransactionId xid
Definition: logicalproto.h:125
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define ERROR
Definition: elog.h:46
XLogRecPtr final_lsn
Definition: logicalproto.h:123
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:124

◆ logicalrep_read_begin_prepare()

void logicalrep_read_begin_prepare ( StringInfo  in,
LogicalRepPreparedTxnData *  begin_data
)

Definition at line 131 of file proto.c.

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_begin_prepare().

132 {
133  /* read fields */
134  begin_data->prepare_lsn = pq_getmsgint64(in);
135  if (begin_data->prepare_lsn == InvalidXLogRecPtr)
136  elog(ERROR, "prepare_lsn not set in begin prepare message");
137  begin_data->end_lsn = pq_getmsgint64(in);
138  if (begin_data->end_lsn == InvalidXLogRecPtr)
139  elog(ERROR, "end_lsn not set in begin prepare message");
140  begin_data->prepare_time = pq_getmsgint64(in);
141  begin_data->xid = pq_getmsgint(in, 4);
142 
143  /* read gid (copy it into a pre-allocated buffer) */
144  strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
145 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
#define ERROR
Definition: elog.h:46
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_commit()

void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData *  commit_data
)

Definition at line 95 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

96 {
97  /* read flags (unused for now) */
98  uint8 flags = pq_getmsgbyte(in);
99 
100  if (flags != 0)
101  elog(ERROR, "unrecognized flags %u in commit message", flags);
102 
103  /* read fields */
104  commit_data->commit_lsn = pq_getmsgint64(in);
105  commit_data->end_lsn = pq_getmsgint64(in);
106  commit_data->committime = pq_getmsgint64(in);
107 }
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz committime
Definition: logicalproto.h:132

◆ logicalrep_read_commit_prepared()

void logicalrep_read_commit_prepared ( StringInfo  in,
LogicalRepCommitPreparedTxnData *  prepare_data
)

Definition at line 264 of file proto.c.

References LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, elog, LogicalRepCommitPreparedTxnData::end_lsn, ERROR, LogicalRepCommitPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), strlcpy(), and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_handle_commit_prepared().

265 {
266  /* read flags */
267  uint8 flags = pq_getmsgbyte(in);
268 
269  if (flags != 0)
270  elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
271 
272  /* read fields */
273  prepare_data->commit_lsn = pq_getmsgint64(in);
274  if (prepare_data->commit_lsn == InvalidXLogRecPtr)
275  elog(ERROR, "commit_lsn is not set in commit prepared message");
276  prepare_data->end_lsn = pq_getmsgint64(in);
277  if (prepare_data->end_lsn == InvalidXLogRecPtr)
278  elog(ERROR, "end_lsn is not set in commit prepared message");
279  prepare_data->commit_time = pq_getmsgint64(in);
280  prepare_data->xid = pq_getmsgint(in, 4);
281 
282  /* read gid (copy it into a pre-allocated buffer) */
283  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
284 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_delete()

LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData *  oldtup
)

Definition at line 548 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

549 {
550  char action;
551  LogicalRepRelId relid;
552 
553  /* read the relation id */
554  relid = pq_getmsgint(in, 4);
555 
556  /* read and verify action */
557  action = pq_getmsgbyte(in);
558  if (action != 'K' && action != 'O')
559  elog(ERROR, "expected action 'O' or 'K', got %c", action);
560 
561  logicalrep_read_tuple(in, oldtup);
562 
563  return relid;
564 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:841
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:95

◆ logicalrep_read_insert()

LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData *  newtup
)

Definition at line 422 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

423 {
424  char action;
425  LogicalRepRelId relid;
426 
427  /* read the relation id */
428  relid = pq_getmsgint(in, 4);
429 
430  action = pq_getmsgbyte(in);
431  if (action != 'N')
432  elog(ERROR, "expected new tuple but got %d",
433  action);
434 
435  logicalrep_read_tuple(in, newtup);
436 
437  return relid;
438 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:841
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:95

◆ logicalrep_read_namespace()

static const char * logicalrep_read_namespace ( StringInfo  in)
static

Definition at line 1027 of file proto.c.

References pq_getmsgstring().

Referenced by logicalrep_read_rel(), and logicalrep_read_typ().

1028 {
1029  const char *nspname = pq_getmsgstring(in);
1030 
1031  if (nspname[0] == '\0')
1032  nspname = "pg_catalog";
1033 
1034  return nspname;
1035 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581

◆ logicalrep_read_origin()

char* logicalrep_read_origin ( StringInfo  in,
XLogRecPtr *  origin_lsn
)

Definition at line 387 of file proto.c.

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

388 {
389  /* fixed fields */
390  *origin_lsn = pq_getmsgint64(in);
391 
392  /* return origin */
393  return pstrdup(pq_getmsgstring(in));
394 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455

◆ logicalrep_read_prepare()

void logicalrep_read_prepare ( StringInfo  in,
LogicalRepPreparedTxnData *  prepare_data
)

Definition at line 225 of file proto.c.

References logicalrep_read_prepare_common().

Referenced by apply_handle_prepare().

226 {
227  logicalrep_read_prepare_common(in, "prepare", prepare_data);
228 }
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:196

◆ logicalrep_read_prepare_common()

static void logicalrep_read_prepare_common ( StringInfo  in,
char *  msgtype,
LogicalRepPreparedTxnData *  prepare_data
)
static

Definition at line 196 of file proto.c.

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidTransactionId, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.

Referenced by logicalrep_read_prepare(), and logicalrep_read_stream_prepare().

198 {
199  /* read flags */
200  uint8 flags = pq_getmsgbyte(in);
201 
202  if (flags != 0)
203  elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
204 
205  /* read fields */
206  prepare_data->prepare_lsn = pq_getmsgint64(in);
207  if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
208  elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
209  prepare_data->end_lsn = pq_getmsgint64(in);
210  if (prepare_data->end_lsn == InvalidXLogRecPtr)
211  elog(ERROR, "end_lsn is not set in %s message", msgtype);
212  prepare_data->prepare_time = pq_getmsgint64(in);
213  prepare_data->xid = pq_getmsgint(in, 4);
214  if (prepare_data->xid == InvalidTransactionId)
215  elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
216 
217  /* read gid (copy it into a pre-allocated buffer) */
218  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
219 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
#define InvalidTransactionId
Definition: transam.h:31
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_rel()

LogicalRepRelation* logicalrep_read_rel ( StringInfo  in)

Definition at line 683 of file proto.c.

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

684 {
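685  LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
686 
687  rel->remoteid = pq_getmsgint(in, 4);
688 
689  /* Read relation name from stream */
690  rel->nspname = pstrdup(logicalrep_read_namespace(in));
691  rel->relname = pstrdup(pq_getmsgstring(in));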
692 
693  /* Read the replica identity. */
694  rel->replident = pq_getmsgbyte(in);
695 
696  /* Get attribute description */
697  logicalrep_read_attrs(in, rel);
698 
699  return rel;
700 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
LogicalRepRelId remoteid
Definition: logicalproto.h:101
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:965
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1027
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void * palloc(Size size)
Definition: mcxt.c:1062
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_rollback_prepared()

void logicalrep_read_rollback_prepared ( StringInfo  in,
LogicalRepRollbackPreparedTxnData *  rollback_data
)

Definition at line 322 of file proto.c.

References elog, ERROR, LogicalRepRollbackPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, strlcpy(), and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_handle_rollback_prepared().

324 {
325  /* read flags */
326  uint8 flags = pq_getmsgbyte(in);
327 
328  if (flags != 0)
329  elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
330 
331  /* read fields */
332  rollback_data->prepare_end_lsn = pq_getmsgint64(in);
333  if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
334  elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
335  rollback_data->rollback_end_lsn = pq_getmsgint64(in);
336  if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
337  elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
338  rollback_data->prepare_time = pq_getmsgint64(in);
339  rollback_data->rollback_time = pq_getmsgint64(in);
340  rollback_data->xid = pq_getmsgint(in, 4);
341 
342  /* read gid (copy it into a pre-allocated buffer) */
343  strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
344 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_abort()

void logicalrep_read_stream_abort ( StringInfo  in,
TransactionId *  xid,
TransactionId *  subxid
)

Definition at line 1151 of file proto.c.

References Assert, and pq_getmsgint().

Referenced by apply_handle_stream_abort().

1153 {
1154  Assert(xid && subxid);
1155 
1156  *xid = pq_getmsgint(in, 4);
1157  *subxid = pq_getmsgint(in, 4);
1158 }
#define Assert(condition)
Definition: c.h:804
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_commit()

TransactionId logicalrep_read_stream_commit ( StringInfo  in,
LogicalRepCommitData *  commit_data
)

Definition at line 1109 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), pq_getmsgint(), and pq_getmsgint64().

Referenced by apply_handle_stream_commit().

1110 {
1111  TransactionId xid;
1112  uint8 flags;
1113 
1114  xid = pq_getmsgint(in, 4);
1115 
1116  /* read flags (unused for now) */
1117  flags = pq_getmsgbyte(in);
1118 
1119  if (flags != 0)
1120  elog(ERROR, "unrecognized flags %u in commit message", flags);
1121 
1122  /* read fields */
1123  commit_data->commit_lsn = pq_getmsgint64(in);
1124  commit_data->end_lsn = pq_getmsgint64(in);
1125  commit_data->committime = pq_getmsgint64(in);
1126 
1127  return xid;
1128 }
uint32 TransactionId
Definition: c.h:587
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:132

◆ logicalrep_read_stream_prepare()

void logicalrep_read_stream_prepare ( StringInfo  in,
LogicalRepPreparedTxnData *  prepare_data
)

Definition at line 362 of file proto.c.

References logicalrep_read_prepare_common().

Referenced by apply_handle_stream_prepare().

363 {
364  logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
365 }
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:196

◆ logicalrep_read_stream_start()

TransactionId logicalrep_read_stream_start ( StringInfo  in,
bool *  first_segment
)

Definition at line 1059 of file proto.c.

References Assert, pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_stream_start().

1060 {
1061  TransactionId xid;
1062 
1063  Assert(first_segment);
1064 
1065  xid = pq_getmsgint(in, 4);
1066  *first_segment = (pq_getmsgbyte(in) == 1);
1067 
1068  return xid;
1069 }
uint32 TransactionId
Definition: c.h:587
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define Assert(condition)
Definition: c.h:804
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_truncate()

List* logicalrep_read_truncate ( StringInfo  in,
bool *  cascade,
bool *  restart_seqs
)

Definition at line 602 of file proto.c.

References i, lappend_oid(), NIL, pq_getmsgint(), TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by apply_handle_truncate().

604 {
605  int i;
606  int nrelids;
607  List *relids = NIL;
608  uint8 flags;
609 
610  nrelids = pq_getmsgint(in, 4);
611 
612  /* read and decode truncate flags */
613  flags = pq_getmsgint(in, 1);
614  *cascade = (flags & TRUNCATE_CASCADE) > 0;
615  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
616 
617  for (i = 0; i < nrelids; i++)
618  relids = lappend_oid(relids, pq_getmsgint(in, 4));
619 
620  return relids;
621 }
#define NIL
Definition: pg_list.h:65
unsigned char uint8
Definition: c.h:439
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
int i
#define TRUNCATE_CASCADE
Definition: proto.c:29
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
List
Definition: pg_list.h:50
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30

◆ logicalrep_read_tuple()

static void logicalrep_read_tuple ( StringInfo  in,
LogicalRepTupleData *  tuple
)
static

Definition at line 841 of file proto.c.

References LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, elog, ERROR, i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_NULL, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, StringInfoData::maxlen, LogicalRepTupleData::ncols, palloc(), palloc0(), pq_copymsgbytes(), pq_getmsgbyte(), pq_getmsgint(), and value.

Referenced by logicalrep_read_delete(), logicalrep_read_insert(), and logicalrep_read_update().

842 {
843  int i;
844  int natts;
845 
846  /* Get number of attributes */
847  natts = pq_getmsgint(in, 2);
848 
849  /* Allocate space for per-column values; zero out unused StringInfoDatas */
850  tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
851  tuple->colstatus = (char *) palloc(natts * sizeof(char));
852  tuple->ncols = natts;
853 
854  /* Read the data */
855  for (i = 0; i < natts; i++)
856  {
857  char kind;
858  int len;
859  StringInfo value = &tuple->colvalues[i];
860 
861  kind = pq_getmsgbyte(in);
862  tuple->colstatus[i] = kind;
863 
864  switch (kind)
865  {
866  case LOGICALREP_COLUMN_NULL:
867  /* nothing more to do */
868  break;
869  case LOGICALREP_COLUMN_UNCHANGED:
870  /* we don't receive the value of an unchanged column */
871  break;
872  case LOGICALREP_COLUMN_TEXT:
873  len = pq_getmsgint(in, 4); /* read length */
874 
875  /* and data */
876  value->data = palloc(len + 1);
877  pq_copymsgbytes(in, value->data, len);
878  value->data[len] = '\0';
879  /* make StringInfo fully valid */
880  value->len = len;
881  value->cursor = 0;
882  value->maxlen = len;
883  break;
884  case LOGICALREP_COLUMN_BINARY:
885  len = pq_getmsgint(in, 4); /* read length */
886 
887  /* and data */
888  value->data = palloc(len + 1);
889  pq_copymsgbytes(in, value->data, len);
890  /* not strictly necessary but per StringInfo practice */
891  value->data[len] = '\0';
892  /* make StringInfo fully valid */
893  value->len = len;
894  value->cursor = 0;
895  value->maxlen = len;
896  break;
897  default:
898  elog(ERROR, "unrecognized data representation type '%c'", kind);
899  }
900  }
901 }
#define LOGICALREP_COLUMN_NULL
Definition: logicalproto.h:90
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:92
StringInfoData * colvalues
Definition: logicalproto.h:81
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:93
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:91
#define ERROR
Definition: elog.h:46
void * palloc0(Size size)
Definition: mcxt.c:1093
static struct @143 value
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void pq_copymsgbytes(StringInfo msg, char *buf, int datalen)
Definition: pqformat.c:530
void * palloc(Size size)
Definition: mcxt.c:1062
#define elog(elevel,...)
Definition: elog.h:232
int i
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_typ()

void logicalrep_read_typ ( StringInfo  in,
LogicalRepTyp *  ltyp
)

Definition at line 739 of file proto.c.

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

740 {
741  ltyp->remoteid = pq_getmsgint(in, 4);
742 
743  /* Read type name from stream */
744  ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
745  ltyp->typname = pstrdup(pq_getmsgstring(in));
746 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1027
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_update()

LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool *  has_oldtuple,
LogicalRepTupleData *  oldtup,
LogicalRepTupleData *  newtup
)

Definition at line 477 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

480 {
481  char action;
482  LogicalRepRelId relid;
483 
484  /* read the relation id */
485  relid = pq_getmsgint(in, 4);
486 
487  /* read and verify action */
488  action = pq_getmsgbyte(in);
489  if (action != 'K' && action != 'O' && action != 'N')
490  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
491  action);
492 
493  /* check for old tuple */
494  if (action == 'K' || action == 'O')
495  {
496  logicalrep_read_tuple(in, oldtup);
497  *has_oldtuple = true;
498 
499  action = pq_getmsgbyte(in);
500  }
501  else
502  *has_oldtuple = false;
503 
504  /* check for new tuple */
505  if (action != 'N')
506  elog(ERROR, "expected action 'N', got %c",
507  action);
508 
509  logicalrep_read_tuple(in, newtup);
510 
511  return relid;
512 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:841
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:95

◆ logicalrep_write_attrs()

static void logicalrep_write_attrs ( StringInfo  out,
Relation  rel 
)
static

Definition at line 907 of file proto.c.

References bms_free(), bms_is_member(), FirstLowInvalidHeapAttributeNumber, i, LOGICALREP_IS_REPLICA_IDENTITY, NameStr, TupleDescData::natts, pq_sendbyte(), pq_sendint16(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetDescr, RelationGetIdentityKeyBitmap(), and TupleDescAttr.

Referenced by logicalrep_write_rel().

908 {
909  TupleDesc desc;
910  int i;
911  uint16 nliveatts = 0;
912  Bitmapset *idattrs = NULL;
913  bool replidentfull;
914 
915  desc = RelationGetDescr(rel);
916 
917  /* send number of live attributes */
918  for (i = 0; i < desc->natts; i++)
919  {
920  if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
921  continue;
922  nliveatts++;
923  }
924  pq_sendint16(out, nliveatts);
925 
926  /* fetch bitmap of REPLICATION IDENTITY attributes */
927  replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
928  if (!replidentfull)
929  idattrs = RelationGetIdentityKeyBitmap(rel);
930 
931  /* send the attributes */
932  for (i = 0; i < desc->natts; i++)
933  {
934  Form_pg_attribute att = TupleDescAttr(desc, i);
935  uint8 flags = 0;
936 
937  if (att->attisdropped || att->attgenerated)
938  continue;
939 
940  /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
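941  if (replidentfull ||
942  bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
943  idattrs))
944  flags |= LOGICALREP_IS_REPLICA_IDENTITY;
945 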
946  pq_sendbyte(out, flags);
947 
948  /* attribute name */
949  pq_sendstring(out, NameStr(att->attname));
950 
951  /* attribute type id */
952  pq_sendint32(out, (int) att->atttypid);
953 
954  /* attribute mode */
955  pq_sendint32(out, att->atttypmod);
956  }
957 
958  bms_free(idattrs);
959 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define RelationGetDescr(relation)
Definition: rel.h:503
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
unsigned char uint8
Definition: c.h:439
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
Form_pg_class rd_rel
Definition: rel.h:109
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
Definition: relcache.c:5247
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:26
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
unsigned short uint16
Definition: c.h:440
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
int i
#define NameStr(name)
Definition: c.h:681
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:427

◆ logicalrep_write_begin()

void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN * txn 
)

Definition at line 46 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, LOGICAL_REP_MSG_BEGIN, pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_txn().

47 {
48  pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
49 
50  /* fixed fields */
51  pq_sendint64(out, txn->final_lsn);
52  pq_sendint64(out, txn->xact_time.commit_time);
53  pq_sendint32(out, txn->xid);
54 }
TimestampTz commit_time
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
XLogRecPtr final_lsn
TransactionId xid
union ReorderBufferTXN::@103 xact_time

◆ logicalrep_write_begin_prepare()

void logicalrep_write_begin_prepare ( StringInfo  out,
ReorderBufferTXN * txn 
)

Definition at line 113 of file proto.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_BEGIN_PREPARE, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_prepare_txn().

114 {
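115  pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
116 
117  /* fixed fields */
118  pq_sendint64(out, txn->final_lsn);
119  pq_sendint64(out, txn->end_lsn);
120  pq_sendint64(out, txn->xact_time.prepare_time);
121  pq_sendint32(out, txn->xid);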
122 
123  /* send gid */
124  pq_sendstring(out, txn->gid);
125 }
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
XLogRecPtr final_lsn
TransactionId xid
union ReorderBufferTXN::@103 xact_time
XLogRecPtr end_lsn
TimestampTz prepare_time

◆ logicalrep_write_commit()

void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)

Definition at line 75 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_COMMIT, pq_sendbyte(), pq_sendint64(), and ReorderBufferTXN::xact_time.

Referenced by pgoutput_commit_txn().

77 {
78  uint8 flags = 0;
79 
80  pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
81 
82  /* send the flags field (unused for now) */
83  pq_sendbyte(out, flags);
84 
85  /* send fields */
86  pq_sendint64(out, commit_lsn);
87  pq_sendint64(out, txn->end_lsn);
88  pq_sendint64(out, txn->xact_time.commit_time);
89 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
union ReorderBufferTXN::@103 xact_time
XLogRecPtr end_lsn

◆ logicalrep_write_commit_prepared()

void logicalrep_write_commit_prepared ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)

Definition at line 234 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_COMMIT_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_commit_prepared_txn().

236 {
237  uint8 flags = 0;
238 
239  pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
240 
241  /*
242  * This should only ever happen for two-phase commit transactions, in
243  * which case we expect to have a valid GID.
244  */
245  Assert(txn->gid != NULL);
246 
247  /* send the flags field */
248  pq_sendbyte(out, flags);
249 
250  /* send fields */
251  pq_sendint64(out, commit_lsn);
252  pq_sendint64(out, txn->end_lsn);
253  pq_sendint64(out, txn->xact_time.commit_time);
254  pq_sendint32(out, txn->xid);
255 
256  /* send gid */
257  pq_sendstring(out, txn->gid);
258 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn

◆ logicalrep_write_delete()

void logicalrep_write_delete ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
bool  binary 
)

Definition at line 518 of file proto.c.

References Assert, LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

520 {
521  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
522  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
523  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
524 
525  pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
526 
527  /* transaction ID (if not valid, we're not streaming) */
528  if (TransactionIdIsValid(xid))
529  pq_sendint32(out, xid);
530 
531  /* use Oid as relation identifier */
532  pq_sendint32(out, RelationGetRelid(rel));
533 
534  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
535  pq_sendbyte(out, 'O'); /* old tuple follows */
536  else
537  pq_sendbyte(out, 'K'); /* old key follows */
538 
539  logicalrep_write_tuple(out, rel, oldtuple, binary);
540 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:752
Form_pg_class rd_rel
Definition: rel.h:109
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:477

◆ logicalrep_write_insert()

void logicalrep_write_insert ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 400 of file proto.c.

References LOGICAL_REP_MSG_INSERT, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

402 {
403  pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
404 
405  /* transaction ID (if not valid, we're not streaming) */
406  if (TransactionIdIsValid(xid))
407  pq_sendint32(out, xid);
408 
409  /* use Oid as relation identifier */
410  pq_sendint32(out, RelationGetRelid(rel));
411 
412  pq_sendbyte(out, 'N'); /* new tuple follows */
413  logicalrep_write_tuple(out, rel, newtuple, binary);
414 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:752
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:477

◆ logicalrep_write_message()

void logicalrep_write_message ( StringInfo  out,
TransactionId  xid,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)

Definition at line 627 of file proto.c.

References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.

Referenced by pgoutput_message().

630 {
631  uint8 flags = 0;
632 
633  pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
634 
635  /* encode and send message flags */
636  if (transactional)
637  flags |= MESSAGE_TRANSACTIONAL;
638 
639  /* transaction ID (if not valid, we're not streaming) */
640  if (TransactionIdIsValid(xid))
641  pq_sendint32(out, xid);
642 
643  pq_sendint8(out, flags);
644  pq_sendint64(out, lsn);
645  pq_sendstring(out, prefix);
646  pq_sendint32(out, sz);
647  pq_sendbytes(out, message, sz);
648 }
unsigned char uint8
Definition: c.h:439
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129

◆ logicalrep_write_namespace()

static void logicalrep_write_namespace ( StringInfo  out,
Oid  nspid 
)
static

Definition at line 1007 of file proto.c.

References elog, ERROR, get_namespace_name(), pq_sendbyte(), and pq_sendstring().

Referenced by logicalrep_write_rel(), and logicalrep_write_typ().

1008 {
1009  if (nspid == PG_CATALOG_NAMESPACE)
1010  pq_sendbyte(out, '\0');
1011  else
1012  {
1013  char *nspname = get_namespace_name(nspid);
1014 
1015  if (nspname == NULL)
1016  elog(ERROR, "cache lookup failed for namespace %u",
1017  nspid);
1018 
1019  pq_sendstring(out, nspname);
1020  }
1021 }
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define ERROR
Definition: elog.h:46
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3316
#define elog(elevel,...)
Definition: elog.h:232

◆ logicalrep_write_origin()

void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 371 of file proto.c.

References LOGICAL_REP_MSG_ORIGIN, pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by send_repl_origin().

373 {
374  pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
375 
376  /* fixed fields */
377  pq_sendint64(out, origin_lsn);
378 
379  /* origin string */
380  pq_sendstring(out, origin);
381 }
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_prepare()

void logicalrep_write_prepare ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 184 of file proto.c.

References LOGICAL_REP_MSG_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_prepare_txn().

186 {
187  logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
188  txn, prepare_lsn);
189 }
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:152

◆ logicalrep_write_prepare_common()

static void logicalrep_write_prepare_common ( StringInfo  out,
LogicalRepMsgType  type,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 152 of file proto.c.

References Assert, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, rbtxn_prepared, TransactionIdIsValid, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by logicalrep_write_prepare(), and logicalrep_write_stream_prepare().

154 {
155  uint8 flags = 0;
156 
157  pq_sendbyte(out, type);
158 
159  /*
160  * This should only ever happen for two-phase commit transactions, in
161  * which case we expect to have a valid GID.
162  */
163  Assert(txn->gid != NULL);
164  Assert(rbtxn_prepared(txn));
165  Assert(TransactionIdIsValid(txn->xid));
166 
167  /* send the flags field */
168  pq_sendbyte(out, flags);
169 
170  /* send fields */
171  pq_sendint64(out, prepare_lsn);
172  pq_sendint64(out, txn->end_lsn);
173  pq_sendint64(out, txn->xact_time.prepare_time);
174  pq_sendint32(out, txn->xid);
175 
176  /* send gid */
177  pq_sendstring(out, txn->gid);
178 }
#define rbtxn_prepared(txn)
unsigned char uint8
Definition: c.h:439
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
TimestampTz prepare_time
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_rel()

void logicalrep_write_rel ( StringInfo  out,
TransactionId  xid,
Relation  rel 
)

Definition at line 654 of file proto.c.

References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

655 {
656  char *relname;
657 
658  pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
659 
660  /* transaction ID (if not valid, we're not streaming) */
661  if (TransactionIdIsValid(xid))
662  pq_sendint32(out, xid);
663 
664  /* use Oid as relation identifier */
665  pq_sendint32(out, RelationGetRelid(rel));
666 
667  /* send qualified relation name */
668  logicalrep_write_namespace(out, RelationGetNamespace(rel));
669  relname = RelationGetRelationName(rel);
670  pq_sendstring(out, relname);
671 
672  /* send replica identity */
673  pq_sendbyte(out, rel->rd_rel->relreplident);
674 
675  /* send the attribute info */
676  logicalrep_write_attrs(out, rel);
677 }
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1007
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
Form_pg_class rd_rel
Definition: rel.h:109
NameData relname
Definition: pg_class.h:38
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:907
#define RelationGetRelationName(relation)
Definition: rel.h:511
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:477
#define RelationGetNamespace(relation)
Definition: rel.h:518

◆ logicalrep_write_rollback_prepared()

void logicalrep_write_rollback_prepared ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)

Definition at line 290 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_ROLLBACK_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_rollback_prepared_txn().

293 {
294  uint8 flags = 0;
295 
296  pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
297 
298  /*
299  * This should only ever happen for two-phase commit transactions, in
300  * which case we expect to have a valid GID.
301  */
302  Assert(txn->gid != NULL);
303 
304  /* send the flags field */
305  pq_sendbyte(out, flags);
306 
307  /* send fields */
308  pq_sendint64(out, prepare_end_lsn);
309  pq_sendint64(out, txn->end_lsn);
310  pq_sendint64(out, prepare_time);
311  pq_sendint64(out, txn->xact_time.commit_time);
312  pq_sendint32(out, txn->xid);
313 
314  /* send gid */
315  pq_sendstring(out, txn->gid);
316 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn

◆ logicalrep_write_stream_abort()

void logicalrep_write_stream_abort ( StringInfo  out,
TransactionId  xid,
TransactionId  subxid 
)

Definition at line 1135 of file proto.c.

References Assert, LOGICAL_REP_MSG_STREAM_ABORT, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_abort().

1137 {
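1138  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1139 
1140  Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
1141 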
1142  /* transaction ID */
1143  pq_sendint32(out, xid);
1144  pq_sendint32(out, subxid);
1145 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_commit()

void logicalrep_write_stream_commit ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)

Definition at line 1084 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_STREAM_COMMIT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), TransactionIdIsValid, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_stream_commit().

1086 {
1087  uint8 flags = 0;
1088 
1089  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1090 
1091  Assert(TransactionIdIsValid(txn->xid));
1092 
1093  /* transaction ID */
1094  pq_sendint32(out, txn->xid);
1095 
1096  /* send the flags field (unused for now) */
1097  pq_sendbyte(out, flags);
1098 
1099  /* send fields */
1100  pq_sendint64(out, commit_lsn);
1101  pq_sendint64(out, txn->end_lsn);
1102  pq_sendint64(out, txn->xact_time.commit_time);
1103 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_prepare()

void logicalrep_write_stream_prepare ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 350 of file proto.c.

References LOGICAL_REP_MSG_STREAM_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_stream_prepare_txn().

353 {
354  logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
355  txn, prepare_lsn);
356 }
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:152

◆ logicalrep_write_stream_start()

void logicalrep_write_stream_start ( StringInfo  out,
TransactionId  xid,
bool  first_segment 
)

Definition at line 1041 of file proto.c.

References Assert, LOGICAL_REP_MSG_STREAM_START, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_start().

1043 {
1044  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1045 
1046  Assert(TransactionIdIsValid(xid));
1047 
1048  /* transaction ID (we're starting to stream, so must be valid) */
1049  pq_sendint32(out, xid);
1050 
1051  /* 1 if this is the first streaming segment for this xid */
1052  pq_sendbyte(out, first_segment ? 1 : 0);
1053 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo  out)

Definition at line 1075 of file proto.c.

References LOGICAL_REP_MSG_STREAM_STOP, and pq_sendbyte().

Referenced by pgoutput_stream_stop().

1076 {
1077  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
1078 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_truncate()

void logicalrep_write_truncate ( StringInfo  out,
TransactionId  xid,
int  nrelids,
Oid  relids[],
bool  cascade,
bool  restart_seqs 
)

Definition at line 570 of file proto.c.

References i, LOGICAL_REP_MSG_TRUNCATE, pq_sendbyte(), pq_sendint32(), pq_sendint8(), TransactionIdIsValid, TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by pgoutput_truncate().

575 {
576  int i;
577  uint8 flags = 0;
578 
579  pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
580 
581  /* transaction ID (if not valid, we're not streaming) */
582  if (TransactionIdIsValid(xid))
583  pq_sendint32(out, xid);
584 
585  pq_sendint32(out, nrelids);
586 
587  /* encode and send truncate flags */
588  if (cascade)
589  flags |= TRUNCATE_CASCADE;
590  if (restart_seqs)
591  flags |= TRUNCATE_RESTART_SEQS;
592  pq_sendint8(out, flags);
593 
594  for (i = 0; i < nrelids; i++)
595  pq_sendint32(out, relids[i]);
596 }
unsigned char uint8
Definition: c.h:439
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
int i
#define TRUNCATE_CASCADE
Definition: proto.c:29
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30

◆ logicalrep_write_tuple()

static void logicalrep_write_tuple ( StringInfo  out,
Relation  rel,
HeapTuple  tuple,
bool  binary 
)
static

Definition at line 752 of file proto.c.

References elog, enlargeStringInfo(), ERROR, GETSTRUCT, heap_deform_tuple(), HeapTupleIsValid, i, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_NULL, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, MaxTupleAttributeNumber, TupleDescData::natts, ObjectIdGetDatum, OidIsValid, OidOutputFunctionCall(), OidSendFunctionCall(), pfree(), pq_sendbyte(), pq_sendbytes(), pq_sendcountedtext(), pq_sendint(), pq_sendint16(), RelationGetDescr, ReleaseSysCache(), SearchSysCache1(), HeapTupleData::t_len, TupleDescAttr, TYPEOID, values, VARATT_IS_EXTERNAL_ONDISK, VARDATA, VARHDRSZ, and VARSIZE.

Referenced by logicalrep_write_delete(), logicalrep_write_insert(), and logicalrep_write_update().

753 {
754  TupleDesc desc;
755  Datum values[MaxTupleAttributeNumber];
756  bool isnull[MaxTupleAttributeNumber];
757  int i;
758  uint16 nliveatts = 0;
759 
760  desc = RelationGetDescr(rel);
761 
762  for (i = 0; i < desc->natts; i++)
763  {
764  if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
765  continue;
766  nliveatts++;
767  }
768  pq_sendint16(out, nliveatts);
769 
770  /* try to allocate enough memory from the get-go */
771  enlargeStringInfo(out, tuple->t_len +
772  nliveatts * (1 + 4));
773 
774  heap_deform_tuple(tuple, desc, values, isnull);
775 
776  /* Write the values */
777  for (i = 0; i < desc->natts; i++)
778  {
779  HeapTuple typtup;
780  Form_pg_type typclass;
781  Form_pg_attribute att = TupleDescAttr(desc, i);
782 
783  if (att->attisdropped || att->attgenerated)
784  continue;
785 
786  if (isnull[i])
787  {
788  pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
789  continue;
790  }
791 
792  if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
793  {
794  /*
795  * Unchanged toasted datum. (Note that we don't promise to detect
796  * unchanged data in general; this is just a cheap check to avoid
797  * sending large values unnecessarily.)
798  */
799  pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
800  continue;
801  }
802 
803  typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
804  if (!HeapTupleIsValid(typtup))
805  elog(ERROR, "cache lookup failed for type %u", att->atttypid);
806  typclass = (Form_pg_type) GETSTRUCT(typtup);
807 
808  /*
809  * Send in binary if requested and type has suitable send function.
810  */
811  if (binary && OidIsValid(typclass->typsend))
812  {
813  bytea *outputbytes;
814  int len;
815 
816  pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
817  outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
818  len = VARSIZE(outputbytes) - VARHDRSZ;
819  pq_sendint(out, len, 4); /* length */
820  pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
821  pfree(outputbytes);
822  }
823  else
824  {
825  char *outputstr;
826 
827  pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
828  outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
829  pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
830  pfree(outputstr);
831  }
832 
833  ReleaseSysCache(typtup);
834  }
835 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:327
#define VARDATA(PTR)
Definition: postgres.h:315
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
static void pq_sendint(StringInfo buf, uint32 i, int b)
Definition: pqformat.h:172
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
#define RelationGetDescr(relation)
Definition: rel.h:503
#define VARSIZE(PTR)
Definition: postgres.h:316
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define VARHDRSZ
Definition: c.h:627
#define LOGICALREP_COLUMN_NULL
Definition: logicalproto.h:90
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:92
#define OidIsValid(objectId)
Definition: c.h:710
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:93
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:91
unsigned short uint16
Definition: c.h:440
void pfree(void *pointer)
Definition: mcxt.c:1169
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1672
uint32 t_len
Definition: htup.h:64
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
void pq_sendcountedtext(StringInfo buf, const char *str, int slen, bool countincludesself)
Definition: pqformat.c:142
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
uintptr_t Datum
Definition: postgres.h:411
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1249
static Datum values[MAXATTR]
Definition: bootstrap.c:156
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1653
#define elog(elevel,...)
Definition: elog.h:232
int i
Definition: c.h:621

◆ logicalrep_write_typ()

void logicalrep_write_typ ( StringInfo  out,
TransactionId  xid,
Oid  typoid 
)

Definition at line 708 of file proto.c.

References elog, ERROR, getBaseType(), GETSTRUCT, HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum, pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), TransactionIdIsValid, and TYPEOID.

Referenced by send_relation_and_attrs().

709 {
710  Oid basetypoid = getBaseType(typoid);
711  HeapTuple tup;
712  Form_pg_type typtup;
713 
714  pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
715 
716  /* transaction ID (if not valid, we're not streaming) */
717  if (TransactionIdIsValid(xid))
718  pq_sendint32(out, xid);
719 
720  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
721  if (!HeapTupleIsValid(tup))
722  elog(ERROR, "cache lookup failed for type %u", basetypoid);
723  typtup = (Form_pg_type) GETSTRUCT(tup);
724 
725  /* use Oid as relation identifier */
726  pq_sendint32(out, typoid);
727 
728  /* send qualified type name */
729  logicalrep_write_namespace(out, typtup->typnamespace);
730  pq_sendstring(out, NameStr(typtup->typname));
731 
732  ReleaseSysCache(tup);
733 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1007
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
unsigned int Oid
Definition: postgres_ext.h:31
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
#define elog(elevel,...)
Definition: elog.h:232
#define NameStr(name)
Definition: c.h:681
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2468

◆ logicalrep_write_update()

void logicalrep_write_update ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 444 of file proto.c.

References Assert, LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

446 {
447  pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
448 
449  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
450  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
451  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
452 
453  /* transaction ID (if not valid, we're not streaming) */
454  if (TransactionIdIsValid(xid))
455  pq_sendint32(out, xid);
456 
457  /* use Oid as relation identifier */
458  pq_sendint32(out, RelationGetRelid(rel));
459 
460  if (oldtuple != NULL)
461  {
462  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
463  pq_sendbyte(out, 'O'); /* old tuple follows */
464  else
465  pq_sendbyte(out, 'K'); /* old key follows */
466  logicalrep_write_tuple(out, rel, oldtuple, binary);
467  }
468 
469  pq_sendbyte(out, 'N'); /* new tuple follows */
470  logicalrep_write_tuple(out, rel, newtuple, binary);
471 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:752
Form_pg_class rd_rel
Definition: rel.h:109
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:477