PostgreSQL Source Code  git master
logicalproto.h File Reference
Include dependency graph for logicalproto.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepTupleData
 
struct  LogicalRepRelation
 
struct  LogicalRepTyp
 
struct  LogicalRepBeginData
 
struct  LogicalRepCommitData
 

Macros

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1
 
#define LOGICALREP_PROTO_VERSION_NUM   1
 
#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2
 
#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_STREAM_VERSION_NUM
 
#define LOGICALREP_COLUMN_NULL   'n'
 
#define LOGICALREP_COLUMN_UNCHANGED   'u'
 
#define LOGICALREP_COLUMN_TEXT   't'
 
#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */
 

Typedefs

typedef enum LogicalRepMsgType LogicalRepMsgType
 
typedef struct LogicalRepTupleData LogicalRepTupleData
 
typedef uint32 LogicalRepRelId
 
typedef struct LogicalRepRelation LogicalRepRelation
 
typedef struct LogicalRepTyp LogicalRepTyp
 
typedef struct LogicalRepBeginData LogicalRepBeginData
 
typedef struct LogicalRepCommitData LogicalRepCommitData
 

Enumerations

enum  LogicalRepMsgType {
  LOGICAL_REP_MSG_BEGIN = 'B', LOGICAL_REP_MSG_COMMIT = 'C', LOGICAL_REP_MSG_ORIGIN = 'O', LOGICAL_REP_MSG_INSERT = 'I',
  LOGICAL_REP_MSG_UPDATE = 'U', LOGICAL_REP_MSG_DELETE = 'D', LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R',
  LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
  LOGICAL_REP_MSG_STREAM_ABORT = 'A'
}
 

Functions

void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
 
List * logicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
 
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel)
 
LogicalRepRelation * logicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
 
void logicalrep_read_typ (StringInfo out, LogicalRepTyp *ltyp)
 
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
 
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
 
void logicalrep_write_stream_stop (StringInfo out)
 
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
TransactionId logicalrep_read_stream_commit (StringInfo out, LogicalRepCommitData *commit_data)
 
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid)
 
void logicalrep_read_stream_abort (StringInfo in, TransactionId *xid, TransactionId *subxid)
 

Macro Definition Documentation

◆ LOGICALREP_COLUMN_BINARY

#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */

◆ LOGICALREP_COLUMN_NULL

#define LOGICALREP_COLUMN_NULL   'n'

Definition at line 79 of file logicalproto.h.

Referenced by logicalrep_read_tuple(), and logicalrep_write_tuple().

◆ LOGICALREP_COLUMN_TEXT

#define LOGICALREP_COLUMN_TEXT   't'

◆ LOGICALREP_COLUMN_UNCHANGED

#define LOGICALREP_COLUMN_UNCHANGED   'u'

◆ LOGICALREP_PROTO_MAX_VERSION_NUM

#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_STREAM_VERSION_NUM

Definition at line 34 of file logicalproto.h.

Referenced by pgoutput_startup().

◆ LOGICALREP_PROTO_MIN_VERSION_NUM

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1

Definition at line 31 of file logicalproto.h.

Referenced by pgoutput_startup().

◆ LOGICALREP_PROTO_STREAM_VERSION_NUM

#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2

Definition at line 33 of file logicalproto.h.

Referenced by ApplyWorkerMain(), and pgoutput_startup().

◆ LOGICALREP_PROTO_VERSION_NUM

#define LOGICALREP_PROTO_VERSION_NUM   1

Definition at line 32 of file logicalproto.h.

Referenced by ApplyWorkerMain().

Typedef Documentation

◆ LogicalRepBeginData

◆ LogicalRepCommitData

◆ LogicalRepMsgType

◆ LogicalRepRelation

◆ LogicalRepRelId

Definition at line 84 of file logicalproto.h.

◆ LogicalRepTupleData

◆ LogicalRepTyp

typedef struct LogicalRepTyp LogicalRepTyp

Enumeration Type Documentation

◆ LogicalRepMsgType

Enumerator
LOGICAL_REP_MSG_BEGIN 
LOGICAL_REP_MSG_COMMIT 
LOGICAL_REP_MSG_ORIGIN 
LOGICAL_REP_MSG_INSERT 
LOGICAL_REP_MSG_UPDATE 
LOGICAL_REP_MSG_DELETE 
LOGICAL_REP_MSG_TRUNCATE 
LOGICAL_REP_MSG_RELATION 
LOGICAL_REP_MSG_TYPE 
LOGICAL_REP_MSG_STREAM_START 
LOGICAL_REP_MSG_STREAM_END 
LOGICAL_REP_MSG_STREAM_COMMIT 
LOGICAL_REP_MSG_STREAM_ABORT 

Definition at line 46 of file logicalproto.h.

Function Documentation

◆ logicalrep_read_begin()

void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData *  begin_data 
)

Definition at line 59 of file proto.c.

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), and LogicalRepBeginData::xid.

Referenced by apply_handle_begin().

60 {
61  /* read fields */
62  begin_data->final_lsn = pq_getmsgint64(in);
63  if (begin_data->final_lsn == InvalidXLogRecPtr)
64  elog(ERROR, "final_lsn not set in begin message");
65  begin_data->committime = pq_getmsgint64(in);
66  begin_data->xid = pq_getmsgint(in, 4);
67 }
TransactionId xid
Definition: logicalproto.h:114
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define ERROR
Definition: elog.h:45
XLogRecPtr final_lsn
Definition: logicalproto.h:112
#define elog(elevel,...)
Definition: elog.h:228
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:113

◆ logicalrep_read_commit()

void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData *  commit_data 
)

Definition at line 94 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

95 {
96  /* read flags (unused for now) */
97  uint8 flags = pq_getmsgbyte(in);
98 
99  if (flags != 0)
100  elog(ERROR, "unrecognized flags %u in commit message", flags);
101 
102  /* read fields */
103  commit_data->commit_lsn = pq_getmsgint64(in);
104  commit_data->end_lsn = pq_getmsgint64(in);
105  commit_data->committime = pq_getmsgint64(in);
106 }
unsigned char uint8
Definition: c.h:427
#define ERROR
Definition: elog.h:45
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:228
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz committime
Definition: logicalproto.h:121

◆ logicalrep_read_delete()

LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData *  oldtup 
)

Definition at line 289 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

290 {
291  char action;
292  LogicalRepRelId relid;
293 
294  /* read the relation id */
295  relid = pq_getmsgint(in, 4);
296 
297  /* read and verify action */
298  action = pq_getmsgbyte(in);
299  if (action != 'K' && action != 'O')
300  elog(ERROR, "expected action 'O' or 'K', got %c", action);
301 
302  logicalrep_read_tuple(in, oldtup);
303 
304  return relid;
305 }
#define ERROR
Definition: elog.h:45
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:555
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:228
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:84

◆ logicalrep_read_insert()

LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData *  newtup 
)

Definition at line 163 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

164 {
165  char action;
166  LogicalRepRelId relid;
167 
168  /* read the relation id */
169  relid = pq_getmsgint(in, 4);
170 
171  action = pq_getmsgbyte(in);
172  if (action != 'N')
173  elog(ERROR, "expected new tuple but got %d",
174  action);
175 
176  logicalrep_read_tuple(in, newtup);
177 
178  return relid;
179 }
#define ERROR
Definition: elog.h:45
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:555
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:228
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:84

◆ logicalrep_read_origin()

char* logicalrep_read_origin ( StringInfo  in,
XLogRecPtr *  origin_lsn 
)

Definition at line 128 of file proto.c.

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

129 {
130  /* fixed fields */
131  *origin_lsn = pq_getmsgint64(in);
132 
133  /* return origin */
134  return pstrdup(pq_getmsgstring(in));
135 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1187
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455

◆ logicalrep_read_rel()

LogicalRepRelation* logicalrep_read_rel ( StringInfo  in)

Definition at line 397 of file proto.c.

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

398 {
399  LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
400 
401  rel->remoteid = pq_getmsgint(in, 4);
402 
403  /* Read relation name from stream */
404  rel->nspname = pstrdup(logicalrep_read_namespace(in));
405  rel->relname = pstrdup(pq_getmsgstring(in));
406 
407  /* Read the replica identity. */
408  rel->replident = pq_getmsgbyte(in);
409 
410  /* Get attribute description */
411  logicalrep_read_attrs(in, rel);
412 
413  return rel;
414 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1187
LogicalRepRelId remoteid
Definition: logicalproto.h:90
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:680
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:742
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void * palloc(Size size)
Definition: mcxt.c:950
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_abort()

void logicalrep_read_stream_abort ( StringInfo  in,
TransactionId *  xid,
TransactionId *  subxid 
)

Definition at line 866 of file proto.c.

References Assert, and pq_getmsgint().

Referenced by apply_handle_stream_abort().

868 {
869  Assert(xid && subxid);
870 
871  *xid = pq_getmsgint(in, 4);
872  *subxid = pq_getmsgint(in, 4);
873 }
#define Assert(condition)
Definition: c.h:792
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_commit()

TransactionId logicalrep_read_stream_commit ( StringInfo  out,
LogicalRepCommitData *  commit_data 
)

Definition at line 824 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), pq_getmsgint(), and pq_getmsgint64().

Referenced by apply_handle_stream_commit().

825 {
826  TransactionId xid;
827  uint8 flags;
828 
829  xid = pq_getmsgint(in, 4);
830 
831  /* read flags (unused for now) */
832  flags = pq_getmsgbyte(in);
833 
834  if (flags != 0)
835  elog(ERROR, "unrecognized flags %u in commit message", flags);
836 
837  /* read fields */
838  commit_data->commit_lsn = pq_getmsgint64(in);
839  commit_data->end_lsn = pq_getmsgint64(in);
840  commit_data->committime = pq_getmsgint64(in);
841 
842  return xid;
843 }
uint32 TransactionId
Definition: c.h:575
unsigned char uint8
Definition: c.h:427
#define ERROR
Definition: elog.h:45
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:228
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:121

◆ logicalrep_read_stream_start()

TransactionId logicalrep_read_stream_start ( StringInfo  in,
bool *  first_segment 
)

Definition at line 774 of file proto.c.

References Assert, pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_stream_start().

775 {
776  TransactionId xid;
777 
778  Assert(first_segment);
779 
780  xid = pq_getmsgint(in, 4);
781  *first_segment = (pq_getmsgbyte(in) == 1);
782 
783  return xid;
784 }
uint32 TransactionId
Definition: c.h:575
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define Assert(condition)
Definition: c.h:792
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_truncate()

List* logicalrep_read_truncate ( StringInfo  in,
bool *  cascade,
bool *  restart_seqs 
)

Definition at line 343 of file proto.c.

References i, lappend_oid(), NIL, pq_getmsgint(), TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by apply_handle_truncate().

345 {
346  int i;
347  int nrelids;
348  List *relids = NIL;
349  uint8 flags;
350 
351  nrelids = pq_getmsgint(in, 4);
352 
353  /* read and decode truncate flags */
354  flags = pq_getmsgint(in, 1);
355  *cascade = (flags & TRUNCATE_CASCADE) > 0;
356  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
357 
358  for (i = 0; i < nrelids; i++)
359  relids = lappend_oid(relids, pq_getmsgint(in, 4));
360 
361  return relids;
362 }
#define NIL
Definition: pg_list.h:65
unsigned char uint8
Definition: c.h:427
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
int i
#define TRUNCATE_CASCADE
Definition: proto.c:28
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
Definition: pg_list.h:50
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:29

◆ logicalrep_read_typ()

void logicalrep_read_typ ( StringInfo  out,
LogicalRepTyp *  ltyp 
)

Definition at line 453 of file proto.c.

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

454 {
455  ltyp->remoteid = pq_getmsgint(in, 4);
456 
457  /* Read type name from stream */
458  ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
459  ltyp->typname = pstrdup(pq_getmsgstring(in));
460 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1187
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:742
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_update()

LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool *  has_oldtuple,
LogicalRepTupleData *  oldtup,
LogicalRepTupleData *  newtup 
)

Definition at line 218 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

221 {
222  char action;
223  LogicalRepRelId relid;
224 
225  /* read the relation id */
226  relid = pq_getmsgint(in, 4);
227 
228  /* read and verify action */
229  action = pq_getmsgbyte(in);
230  if (action != 'K' && action != 'O' && action != 'N')
231  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
232  action);
233 
234  /* check for old tuple */
235  if (action == 'K' || action == 'O')
236  {
237  logicalrep_read_tuple(in, oldtup);
238  *has_oldtuple = true;
239 
240  action = pq_getmsgbyte(in);
241  }
242  else
243  *has_oldtuple = false;
244 
245  /* check for new tuple */
246  if (action != 'N')
247  elog(ERROR, "expected action 'N', got %c",
248  action);
249 
250  logicalrep_read_tuple(in, newtup);
251 
252  return relid;
253 }
#define ERROR
Definition: elog.h:45
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:555
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:228
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:84

◆ logicalrep_write_begin()

void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN *  txn 
)

Definition at line 45 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, LOGICAL_REP_MSG_BEGIN, pq_sendbyte(), pq_sendint32(), pq_sendint64(), and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_txn().

46 {
47  pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
48 
49  /* fixed fields */
50  pq_sendint64(out, txn->final_lsn);
51  pq_sendint64(out, txn->commit_time);
52  pq_sendint32(out, txn->xid);
53 }
TimestampTz commit_time
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
XLogRecPtr final_lsn
TransactionId xid

◆ logicalrep_write_commit()

void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)

Definition at line 74 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_COMMIT, pq_sendbyte(), and pq_sendint64().

Referenced by pgoutput_commit_txn().

76 {
77  uint8 flags = 0;
78 
79  pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
80 
81  /* send the flags field (unused for now) */
82  pq_sendbyte(out, flags);
83 
84  /* send fields */
85  pq_sendint64(out, commit_lsn);
86  pq_sendint64(out, txn->end_lsn);
87  pq_sendint64(out, txn->commit_time);
88 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:427
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
XLogRecPtr end_lsn

◆ logicalrep_write_delete()

void logicalrep_write_delete ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
bool  binary 
)

Definition at line 259 of file proto.c.

References Assert, LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

261 {
262  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
263  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
264  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
265 
266  pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
267 
268  /* transaction ID (if not valid, we're not streaming) */
269  if (TransactionIdIsValid(xid))
270  pq_sendint32(out, xid);
271 
272  /* use Oid as relation identifier */
273  pq_sendint32(out, RelationGetRelid(rel));
274 
275  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
276  pq_sendbyte(out, 'O'); /* old tuple follows */
277  else
278  pq_sendbyte(out, 'K'); /* old key follows */
279 
280  logicalrep_write_tuple(out, rel, oldtuple, binary);
281 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:466
Form_pg_class rd_rel
Definition: rel.h:110
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:792
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:457

◆ logicalrep_write_insert()

void logicalrep_write_insert ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 141 of file proto.c.

References LOGICAL_REP_MSG_INSERT, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

143 {
144  pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
145 
146  /* transaction ID (if not valid, we're not streaming) */
147  if (TransactionIdIsValid(xid))
148  pq_sendint32(out, xid);
149 
150  /* use Oid as relation identifier */
151  pq_sendint32(out, RelationGetRelid(rel));
152 
153  pq_sendbyte(out, 'N'); /* new tuple follows */
154  logicalrep_write_tuple(out, rel, newtuple, binary);
155 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:466
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:457

◆ logicalrep_write_origin()

void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 112 of file proto.c.

References LOGICAL_REP_MSG_ORIGIN, pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by pgoutput_begin_txn(), and pgoutput_stream_start().

114 {
115  pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
116 
117  /* fixed fields */
118  pq_sendint64(out, origin_lsn);
119 
120  /* origin string */
121  pq_sendstring(out, origin);
122 }
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_rel()

void logicalrep_write_rel ( StringInfo  out,
TransactionId  xid,
Relation  rel 
)

Definition at line 368 of file proto.c.

References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

369 {
370  char *relname;
371 
372  pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
373 
374  /* transaction ID (if not valid, we're not streaming) */
375  if (TransactionIdIsValid(xid))
376  pq_sendint32(out, xid);
377 
378  /* use Oid as relation identifier */
379  pq_sendint32(out, RelationGetRelid(rel));
380 
381  /* send qualified relation name */
382  logicalrep_write_namespace(out, RelationGetNamespace(rel));
383  relname = RelationGetRelationName(rel);
384  pq_sendstring(out, relname);
385 
386  /* send replica identity */
387  pq_sendbyte(out, rel->rd_rel->relreplident);
388 
389  /* send the attribute info */
390  logicalrep_write_attrs(out, rel);
391 }
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:722
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
Form_pg_class rd_rel
Definition: rel.h:110
NameData relname
Definition: pg_class.h:38
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:621
#define RelationGetRelationName(relation)
Definition: rel.h:491
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:457
#define RelationGetNamespace(relation)
Definition: rel.h:498

◆ logicalrep_write_stream_abort()

void logicalrep_write_stream_abort ( StringInfo  out,
TransactionId  xid,
TransactionId  subxid 
)

Definition at line 850 of file proto.c.

References Assert, LOGICAL_REP_MSG_STREAM_ABORT, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_abort().

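852 {
853  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
854 
855  Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
856 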
857  /* transaction ID */
858  pq_sendint32(out, xid);
859  pq_sendint32(out, subxid);
860 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:792
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_commit()

void logicalrep_write_stream_commit ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)

Definition at line 799 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_STREAM_COMMIT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), TransactionIdIsValid, and ReorderBufferTXN::xid.

Referenced by pgoutput_stream_commit().

801 {
802  uint8 flags = 0;
803 
804  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
805 
806  Assert(TransactionIdIsValid(txn->xid));
807 
808  /* transaction ID */
809  pq_sendint32(out, txn->xid);
810 
811  /* send the flags field (unused for now) */
812  pq_sendbyte(out, flags);
813 
814  /* send fields */
815  pq_sendint64(out, commit_lsn);
816  pq_sendint64(out, txn->end_lsn);
817  pq_sendint64(out, txn->commit_time);
818 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:427
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
#define Assert(condition)
Definition: c.h:792
XLogRecPtr end_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_start()

void logicalrep_write_stream_start ( StringInfo  out,
TransactionId  xid,
bool  first_segment 
)

Definition at line 756 of file proto.c.

References Assert, LOGICAL_REP_MSG_STREAM_START, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_start().

758 {
759  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
760 
761  Assert(TransactionIdIsValid(xid));
762 
763  /* transaction ID (we're starting to stream, so must be valid) */
764  pq_sendint32(out, xid);
765 
766  /* 1 if this is the first streaming segment for this xid */
767  pq_sendbyte(out, first_segment ? 1 : 0);
768 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:792
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo  out)

Definition at line 790 of file proto.c.

References LOGICAL_REP_MSG_STREAM_END, and pq_sendbyte().

Referenced by pgoutput_stream_stop().

791 {
792  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END);
793 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_truncate()

void logicalrep_write_truncate ( StringInfo  out,
TransactionId  xid,
int  nrelids,
Oid  relids[],
bool  cascade,
bool  restart_seqs 
)

Definition at line 311 of file proto.c.

References i, LOGICAL_REP_MSG_TRUNCATE, pq_sendbyte(), pq_sendint32(), pq_sendint8(), TransactionIdIsValid, TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by pgoutput_truncate().

316 {
317  int i;
318  uint8 flags = 0;
319 
320  pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
321 
322  /* transaction ID (if not valid, we're not streaming) */
323  if (TransactionIdIsValid(xid))
324  pq_sendint32(out, xid);
325 
326  pq_sendint32(out, nrelids);
327 
328  /* encode and send truncate flags */
329  if (cascade)
330  flags |= TRUNCATE_CASCADE;
331  if (restart_seqs)
332  flags |= TRUNCATE_RESTART_SEQS;
333  pq_sendint8(out, flags);
334 
335  for (i = 0; i < nrelids; i++)
336  pq_sendint32(out, relids[i]);
337 }
unsigned char uint8
Definition: c.h:427
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
int i
#define TRUNCATE_CASCADE
Definition: proto.c:28
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:29

◆ logicalrep_write_typ()

void logicalrep_write_typ ( StringInfo  out,
TransactionId  xid,
Oid  typoid 
)

Definition at line 422 of file proto.c.

References elog, ERROR, getBaseType(), GETSTRUCT, HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum, pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), TransactionIdIsValid, and TYPEOID.

Referenced by send_relation_and_attrs().

423 {
424  Oid basetypoid = getBaseType(typoid);
425  HeapTuple tup;
426  Form_pg_type typtup;
427 
428  pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
429 
430  /* transaction ID (if not valid, we're not streaming) */
431  if (TransactionIdIsValid(xid))
432  pq_sendint32(out, xid);
433 
434  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
435  if (!HeapTupleIsValid(tup))
436  elog(ERROR, "cache lookup failed for type %u", basetypoid);
437  typtup = (Form_pg_type) GETSTRUCT(tup);
438 
439  /* use Oid as relation identifier */
440  pq_sendint32(out, typoid);
441 
442  /* send qualified type name */
443  logicalrep_write_namespace(out, typtup->typnamespace);
444  pq_sendstring(out, NameStr(typtup->typname));
445 
446  ReleaseSysCache(tup);
447 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:722
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
unsigned int Oid
Definition: postgres_ext.h:31
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:45
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
#define elog(elevel,...)
Definition: elog.h:228
#define NameStr(name)
Definition: c.h:669
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2441

◆ logicalrep_write_update()

void logicalrep_write_update ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 185 of file proto.c.

References Assert, LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

187 {
188  pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
189 
190  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
191  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
192  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
193 
194  /* transaction ID (if not valid, we're not streaming) */
195  if (TransactionIdIsValid(xid))
196  pq_sendint32(out, xid);
197 
198  /* use Oid as relation identifier */
199  pq_sendint32(out, RelationGetRelid(rel));
200 
201  if (oldtuple != NULL)
202  {
203  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
204  pq_sendbyte(out, 'O'); /* old tuple follows */
205  else
206  pq_sendbyte(out, 'K'); /* old key follows */
207  logicalrep_write_tuple(out, rel, oldtuple, binary);
208  }
209 
210  pq_sendbyte(out, 'N'); /* new tuple follows */
211  logicalrep_write_tuple(out, rel, newtuple, binary);
212 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:466
Form_pg_class rd_rel
Definition: rel.h:110
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:792
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:457