PostgreSQL Source Code  git master
logicalproto.h File Reference
Include dependency graph for logicalproto.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepTupleData
 
struct  LogicalRepRelation
 
struct  LogicalRepTyp
 
struct  LogicalRepBeginData
 
struct  LogicalRepCommitData
 

Macros

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1
 
#define LOGICALREP_PROTO_VERSION_NUM   1
 
#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2
 
#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_STREAM_VERSION_NUM
 
#define LOGICALREP_COLUMN_NULL   'n'
 
#define LOGICALREP_COLUMN_UNCHANGED   'u'
 
#define LOGICALREP_COLUMN_TEXT   't'
 
#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */
 

Typedefs

typedef enum LogicalRepMsgType LogicalRepMsgType
 
typedef struct LogicalRepTupleData LogicalRepTupleData
 
typedef uint32 LogicalRepRelId
 
typedef struct LogicalRepRelation LogicalRepRelation
 
typedef struct LogicalRepTyp LogicalRepTyp
 
typedef struct LogicalRepBeginData LogicalRepBeginData
 
typedef struct LogicalRepCommitData LogicalRepCommitData
 

Enumerations

enum  LogicalRepMsgType {
  LOGICAL_REP_MSG_BEGIN = 'B', LOGICAL_REP_MSG_COMMIT = 'C', LOGICAL_REP_MSG_ORIGIN = 'O', LOGICAL_REP_MSG_INSERT = 'I',
  LOGICAL_REP_MSG_UPDATE = 'U', LOGICAL_REP_MSG_DELETE = 'D', LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R',
  LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E',
  LOGICAL_REP_MSG_STREAM_COMMIT = 'c', LOGICAL_REP_MSG_STREAM_ABORT = 'A'
}
 

Functions

void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
 
Listlogicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
 
void logicalrep_write_message (StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel)
 
LogicalRepRelationlogicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
 
void logicalrep_read_typ (StringInfo out, LogicalRepTyp *ltyp)
 
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
 
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
 
void logicalrep_write_stream_stop (StringInfo out)
 
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
TransactionId logicalrep_read_stream_commit (StringInfo out, LogicalRepCommitData *commit_data)
 
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid)
 
void logicalrep_read_stream_abort (StringInfo in, TransactionId *xid, TransactionId *subxid)
 

Macro Definition Documentation

◆ LOGICALREP_COLUMN_BINARY

#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */

◆ LOGICALREP_COLUMN_NULL

#define LOGICALREP_COLUMN_NULL   'n'

Definition at line 80 of file logicalproto.h.

Referenced by logicalrep_read_tuple(), and logicalrep_write_tuple().

◆ LOGICALREP_COLUMN_TEXT

#define LOGICALREP_COLUMN_TEXT   't'

◆ LOGICALREP_COLUMN_UNCHANGED

#define LOGICALREP_COLUMN_UNCHANGED   'u'

◆ LOGICALREP_PROTO_MAX_VERSION_NUM

#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_STREAM_VERSION_NUM

Definition at line 34 of file logicalproto.h.

Referenced by pgoutput_startup().

◆ LOGICALREP_PROTO_MIN_VERSION_NUM

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1

Definition at line 31 of file logicalproto.h.

Referenced by pgoutput_startup().

◆ LOGICALREP_PROTO_STREAM_VERSION_NUM

#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2

Definition at line 33 of file logicalproto.h.

Referenced by ApplyWorkerMain(), and pgoutput_startup().

◆ LOGICALREP_PROTO_VERSION_NUM

#define LOGICALREP_PROTO_VERSION_NUM   1

Definition at line 32 of file logicalproto.h.

Referenced by ApplyWorkerMain().

Typedef Documentation

◆ LogicalRepBeginData

◆ LogicalRepCommitData

◆ LogicalRepMsgType

◆ LogicalRepRelation

◆ LogicalRepRelId

Definition at line 85 of file logicalproto.h.

◆ LogicalRepTupleData

◆ LogicalRepTyp

typedef struct LogicalRepTyp LogicalRepTyp

Enumeration Type Documentation

◆ LogicalRepMsgType

Enumerator
LOGICAL_REP_MSG_BEGIN 
LOGICAL_REP_MSG_COMMIT 
LOGICAL_REP_MSG_ORIGIN 
LOGICAL_REP_MSG_INSERT 
LOGICAL_REP_MSG_UPDATE 
LOGICAL_REP_MSG_DELETE 
LOGICAL_REP_MSG_TRUNCATE 
LOGICAL_REP_MSG_RELATION 
LOGICAL_REP_MSG_TYPE 
LOGICAL_REP_MSG_MESSAGE 
LOGICAL_REP_MSG_STREAM_START 
LOGICAL_REP_MSG_STREAM_END 
LOGICAL_REP_MSG_STREAM_COMMIT 
LOGICAL_REP_MSG_STREAM_ABORT 

Definition at line 46 of file logicalproto.h.

Function Documentation

◆ logicalrep_read_begin()

void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData begin_data 
)

Definition at line 60 of file proto.c.

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), and LogicalRepBeginData::xid.

Referenced by apply_handle_begin().

61 {
62  /* read fields */
63  begin_data->final_lsn = pq_getmsgint64(in);
64  if (begin_data->final_lsn == InvalidXLogRecPtr)
65  elog(ERROR, "final_lsn not set in begin message");
66  begin_data->committime = pq_getmsgint64(in);
67  begin_data->xid = pq_getmsgint(in, 4);
68 }
TransactionId xid
Definition: logicalproto.h:115
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define ERROR
Definition: elog.h:46
XLogRecPtr final_lsn
Definition: logicalproto.h:113
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:114

◆ logicalrep_read_commit()

void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData commit_data 
)

Definition at line 95 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

96 {
97  /* read flags (unused for now) */
98  uint8 flags = pq_getmsgbyte(in);
99 
100  if (flags != 0)
101  elog(ERROR, "unrecognized flags %u in commit message", flags);
102 
103  /* read fields */
104  commit_data->commit_lsn = pq_getmsgint64(in);
105  commit_data->end_lsn = pq_getmsgint64(in);
106  commit_data->committime = pq_getmsgint64(in);
107 }
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz committime
Definition: logicalproto.h:122

◆ logicalrep_read_delete()

LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData oldtup 
)

Definition at line 290 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

291 {
292  char action;
293  LogicalRepRelId relid;
294 
295  /* read the relation id */
296  relid = pq_getmsgint(in, 4);
297 
298  /* read and verify action */
299  action = pq_getmsgbyte(in);
300  if (action != 'K' && action != 'O')
301  elog(ERROR, "expected action 'O' or 'K', got %c", action);
302 
303  logicalrep_read_tuple(in, oldtup);
304 
305  return relid;
306 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:583
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:85

◆ logicalrep_read_insert()

LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData newtup 
)

Definition at line 164 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

165 {
166  char action;
167  LogicalRepRelId relid;
168 
169  /* read the relation id */
170  relid = pq_getmsgint(in, 4);
171 
172  action = pq_getmsgbyte(in);
173  if (action != 'N')
174  elog(ERROR, "expected new tuple but got %d",
175  action);
176 
177  logicalrep_read_tuple(in, newtup);
178 
179  return relid;
180 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:583
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:85

◆ logicalrep_read_origin()

char* logicalrep_read_origin ( StringInfo  in,
XLogRecPtr origin_lsn 
)

Definition at line 129 of file proto.c.

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

130 {
131  /* fixed fields */
132  *origin_lsn = pq_getmsgint64(in);
133 
134  /* return origin */
135  return pstrdup(pq_getmsgstring(in));
136 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455

◆ logicalrep_read_rel()

LogicalRepRelation* logicalrep_read_rel ( StringInfo  in)

Definition at line 425 of file proto.c.

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

426 {
428 
429  rel->remoteid = pq_getmsgint(in, 4);
430 
431  /* Read relation name from stream */
433  rel->relname = pstrdup(pq_getmsgstring(in));
434 
435  /* Read the replica identity. */
436  rel->replident = pq_getmsgbyte(in);
437 
438  /* Get attribute description */
439  logicalrep_read_attrs(in, rel);
440 
441  return rel;
442 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
LogicalRepRelId remoteid
Definition: logicalproto.h:91
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:708
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:770
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void * palloc(Size size)
Definition: mcxt.c:1062
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_abort()

void logicalrep_read_stream_abort ( StringInfo  in,
TransactionId xid,
TransactionId subxid 
)

Definition at line 894 of file proto.c.

References Assert, and pq_getmsgint().

Referenced by apply_handle_stream_abort().

896 {
897  Assert(xid && subxid);
898 
899  *xid = pq_getmsgint(in, 4);
900  *subxid = pq_getmsgint(in, 4);
901 }
#define Assert(condition)
Definition: c.h:804
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_commit()

TransactionId logicalrep_read_stream_commit ( StringInfo  out,
LogicalRepCommitData commit_data 
)

Definition at line 852 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), pq_getmsgint(), and pq_getmsgint64().

Referenced by apply_handle_stream_commit().

853 {
854  TransactionId xid;
855  uint8 flags;
856 
857  xid = pq_getmsgint(in, 4);
858 
859  /* read flags (unused for now) */
860  flags = pq_getmsgbyte(in);
861 
862  if (flags != 0)
863  elog(ERROR, "unrecognized flags %u in commit message", flags);
864 
865  /* read fields */
866  commit_data->commit_lsn = pq_getmsgint64(in);
867  commit_data->end_lsn = pq_getmsgint64(in);
868  commit_data->committime = pq_getmsgint64(in);
869 
870  return xid;
871 }
uint32 TransactionId
Definition: c.h:587
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:122

◆ logicalrep_read_stream_start()

TransactionId logicalrep_read_stream_start ( StringInfo  in,
bool first_segment 
)

Definition at line 802 of file proto.c.

References Assert, pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_stream_start().

803 {
804  TransactionId xid;
805 
806  Assert(first_segment);
807 
808  xid = pq_getmsgint(in, 4);
809  *first_segment = (pq_getmsgbyte(in) == 1);
810 
811  return xid;
812 }
uint32 TransactionId
Definition: c.h:587
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define Assert(condition)
Definition: c.h:804
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_truncate()

List* logicalrep_read_truncate ( StringInfo  in,
bool cascade,
bool restart_seqs 
)

Definition at line 344 of file proto.c.

References i, lappend_oid(), NIL, pq_getmsgint(), TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by apply_handle_truncate().

346 {
347  int i;
348  int nrelids;
349  List *relids = NIL;
350  uint8 flags;
351 
352  nrelids = pq_getmsgint(in, 4);
353 
354  /* read and decode truncate flags */
355  flags = pq_getmsgint(in, 1);
356  *cascade = (flags & TRUNCATE_CASCADE) > 0;
357  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
358 
359  for (i = 0; i < nrelids; i++)
360  relids = lappend_oid(relids, pq_getmsgint(in, 4));
361 
362  return relids;
363 }
#define NIL
Definition: pg_list.h:65
unsigned char uint8
Definition: c.h:439
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
int i
#define TRUNCATE_CASCADE
Definition: proto.c:29
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
Definition: pg_list.h:50
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30

◆ logicalrep_read_typ()

void logicalrep_read_typ ( StringInfo  out,
LogicalRepTyp ltyp 
)

Definition at line 481 of file proto.c.

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

482 {
483  ltyp->remoteid = pq_getmsgint(in, 4);
484 
485  /* Read type name from stream */
487  ltyp->typname = pstrdup(pq_getmsgstring(in));
488 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:770
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_update()

LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool has_oldtuple,
LogicalRepTupleData oldtup,
LogicalRepTupleData newtup 
)

Definition at line 219 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

222 {
223  char action;
224  LogicalRepRelId relid;
225 
226  /* read the relation id */
227  relid = pq_getmsgint(in, 4);
228 
229  /* read and verify action */
230  action = pq_getmsgbyte(in);
231  if (action != 'K' && action != 'O' && action != 'N')
232  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
233  action);
234 
235  /* check for old tuple */
236  if (action == 'K' || action == 'O')
237  {
238  logicalrep_read_tuple(in, oldtup);
239  *has_oldtuple = true;
240 
241  action = pq_getmsgbyte(in);
242  }
243  else
244  *has_oldtuple = false;
245 
246  /* check for new tuple */
247  if (action != 'N')
248  elog(ERROR, "expected action 'N', got %c",
249  action);
250 
251  logicalrep_read_tuple(in, newtup);
252 
253  return relid;
254 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:583
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:85

◆ logicalrep_write_begin()

void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN txn 
)

Definition at line 46 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, LOGICAL_REP_MSG_BEGIN, pq_sendbyte(), pq_sendint32(), pq_sendint64(), and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_txn().

47 {
49 
50  /* fixed fields */
51  pq_sendint64(out, txn->final_lsn);
52  pq_sendint64(out, txn->commit_time);
53  pq_sendint32(out, txn->xid);
54 }
TimestampTz commit_time
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
XLogRecPtr final_lsn
TransactionId xid

◆ logicalrep_write_commit()

void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 75 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_COMMIT, pq_sendbyte(), and pq_sendint64().

Referenced by pgoutput_commit_txn().

77 {
78  uint8 flags = 0;
79 
81 
82  /* send the flags field (unused for now) */
83  pq_sendbyte(out, flags);
84 
85  /* send fields */
86  pq_sendint64(out, commit_lsn);
87  pq_sendint64(out, txn->end_lsn);
88  pq_sendint64(out, txn->commit_time);
89 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
XLogRecPtr end_lsn

◆ logicalrep_write_delete()

void logicalrep_write_delete ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
bool  binary 
)

Definition at line 260 of file proto.c.

References Assert, LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

262 {
263  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
264  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
265  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
266 
268 
269  /* transaction ID (if not valid, we're not streaming) */
270  if (TransactionIdIsValid(xid))
271  pq_sendint32(out, xid);
272 
273  /* use Oid as relation identifier */
274  pq_sendint32(out, RelationGetRelid(rel));
275 
276  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
277  pq_sendbyte(out, 'O'); /* old tuple follows */
278  else
279  pq_sendbyte(out, 'K'); /* old key follows */
280 
281  logicalrep_write_tuple(out, rel, oldtuple, binary);
282 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:494
Form_pg_class rd_rel
Definition: rel.h:110
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:457

◆ logicalrep_write_insert()

void logicalrep_write_insert ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 142 of file proto.c.

References LOGICAL_REP_MSG_INSERT, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

144 {
146 
147  /* transaction ID (if not valid, we're not streaming) */
148  if (TransactionIdIsValid(xid))
149  pq_sendint32(out, xid);
150 
151  /* use Oid as relation identifier */
152  pq_sendint32(out, RelationGetRelid(rel));
153 
154  pq_sendbyte(out, 'N'); /* new tuple follows */
155  logicalrep_write_tuple(out, rel, newtuple, binary);
156 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:494
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:457

◆ logicalrep_write_message()

void logicalrep_write_message ( StringInfo  out,
TransactionId  xid,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)

Definition at line 369 of file proto.c.

References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.

Referenced by pgoutput_message().

372 {
373  uint8 flags = 0;
374 
376 
377  /* encode and send message flags */
378  if (transactional)
379  flags |= MESSAGE_TRANSACTIONAL;
380 
381  /* transaction ID (if not valid, we're not streaming) */
382  if (TransactionIdIsValid(xid))
383  pq_sendint32(out, xid);
384 
385  pq_sendint8(out, flags);
386  pq_sendint64(out, lsn);
387  pq_sendstring(out, prefix);
388  pq_sendint32(out, sz);
389  pq_sendbytes(out, message, sz);
390 }
unsigned char uint8
Definition: c.h:439
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129

◆ logicalrep_write_origin()

void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 113 of file proto.c.

References LOGICAL_REP_MSG_ORIGIN, pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by pgoutput_begin_txn(), and pgoutput_stream_start().

115 {
117 
118  /* fixed fields */
119  pq_sendint64(out, origin_lsn);
120 
121  /* origin string */
122  pq_sendstring(out, origin);
123 }
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_rel()

void logicalrep_write_rel ( StringInfo  out,
TransactionId  xid,
Relation  rel 
)

Definition at line 396 of file proto.c.

References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

397 {
398  char *relname;
399 
401 
402  /* transaction ID (if not valid, we're not streaming) */
403  if (TransactionIdIsValid(xid))
404  pq_sendint32(out, xid);
405 
406  /* use Oid as relation identifier */
407  pq_sendint32(out, RelationGetRelid(rel));
408 
409  /* send qualified relation name */
411  relname = RelationGetRelationName(rel);
412  pq_sendstring(out, relname);
413 
414  /* send replica identity */
415  pq_sendbyte(out, rel->rd_rel->relreplident);
416 
417  /* send the attribute info */
418  logicalrep_write_attrs(out, rel);
419 }
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:750
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
Form_pg_class rd_rel
Definition: rel.h:110
NameData relname
Definition: pg_class.h:38
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:649
#define RelationGetRelationName(relation)
Definition: rel.h:491
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:457
#define RelationGetNamespace(relation)
Definition: rel.h:498

◆ logicalrep_write_stream_abort()

void logicalrep_write_stream_abort ( StringInfo  out,
TransactionId  xid,
TransactionId  subxid 
)

Definition at line 878 of file proto.c.

References Assert, LOGICAL_REP_MSG_STREAM_ABORT, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_abort().

880 {
882 
884 
885  /* transaction ID */
886  pq_sendint32(out, xid);
887  pq_sendint32(out, subxid);
888 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_commit()

void logicalrep_write_stream_commit ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 827 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_STREAM_COMMIT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), TransactionIdIsValid, and ReorderBufferTXN::xid.

Referenced by pgoutput_stream_commit().

829 {
830  uint8 flags = 0;
831 
833 
835 
836  /* transaction ID */
837  pq_sendint32(out, txn->xid);
838 
839  /* send the flags field (unused for now) */
840  pq_sendbyte(out, flags);
841 
842  /* send fields */
843  pq_sendint64(out, commit_lsn);
844  pq_sendint64(out, txn->end_lsn);
845  pq_sendint64(out, txn->commit_time);
846 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_start()

void logicalrep_write_stream_start ( StringInfo  out,
TransactionId  xid,
bool  first_segment 
)

Definition at line 784 of file proto.c.

References Assert, LOGICAL_REP_MSG_STREAM_START, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_start().

786 {
788 
790 
791  /* transaction ID (we're starting to stream, so must be valid) */
792  pq_sendint32(out, xid);
793 
794  /* 1 if this is the first streaming segment for this xid */
795  pq_sendbyte(out, first_segment ? 1 : 0);
796 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo  out)

Definition at line 818 of file proto.c.

References LOGICAL_REP_MSG_STREAM_END, and pq_sendbyte().

Referenced by pgoutput_stream_stop().

819 {
821 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_truncate()

void logicalrep_write_truncate ( StringInfo  out,
TransactionId  xid,
int  nrelids,
Oid  relids[],
bool  cascade,
bool  restart_seqs 
)

Definition at line 312 of file proto.c.

References i, LOGICAL_REP_MSG_TRUNCATE, pq_sendbyte(), pq_sendint32(), pq_sendint8(), TransactionIdIsValid, TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by pgoutput_truncate().

317 {
318  int i;
319  uint8 flags = 0;
320 
322 
323  /* transaction ID (if not valid, we're not streaming) */
324  if (TransactionIdIsValid(xid))
325  pq_sendint32(out, xid);
326 
327  pq_sendint32(out, nrelids);
328 
329  /* encode and send truncate flags */
330  if (cascade)
331  flags |= TRUNCATE_CASCADE;
332  if (restart_seqs)
333  flags |= TRUNCATE_RESTART_SEQS;
334  pq_sendint8(out, flags);
335 
336  for (i = 0; i < nrelids; i++)
337  pq_sendint32(out, relids[i]);
338 }
unsigned char uint8
Definition: c.h:439
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
int i
#define TRUNCATE_CASCADE
Definition: proto.c:29
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30

◆ logicalrep_write_typ()

void logicalrep_write_typ ( StringInfo  out,
TransactionId  xid,
Oid  typoid 
)

Definition at line 450 of file proto.c.

References elog, ERROR, getBaseType(), GETSTRUCT, HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum, pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), TransactionIdIsValid, and TYPEOID.

Referenced by send_relation_and_attrs().

451 {
452  Oid basetypoid = getBaseType(typoid);
453  HeapTuple tup;
454  Form_pg_type typtup;
455 
457 
458  /* transaction ID (if not valid, we're not streaming) */
459  if (TransactionIdIsValid(xid))
460  pq_sendint32(out, xid);
461 
462  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
463  if (!HeapTupleIsValid(tup))
464  elog(ERROR, "cache lookup failed for type %u", basetypoid);
465  typtup = (Form_pg_type) GETSTRUCT(tup);
466 
467  /* use Oid as relation identifier */
468  pq_sendint32(out, typoid);
469 
470  /* send qualified type name */
471  logicalrep_write_namespace(out, typtup->typnamespace);
472  pq_sendstring(out, NameStr(typtup->typname));
473 
474  ReleaseSysCache(tup);
475 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:750
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
unsigned int Oid
Definition: postgres_ext.h:31
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
#define elog(elevel,...)
Definition: elog.h:232
#define NameStr(name)
Definition: c.h:681
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2468

◆ logicalrep_write_update()

void logicalrep_write_update ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 186 of file proto.c.

References Assert, LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

188 {
190 
191  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
192  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
193  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
194 
195  /* transaction ID (if not valid, we're not streaming) */
196  if (TransactionIdIsValid(xid))
197  pq_sendint32(out, xid);
198 
199  /* use Oid as relation identifier */
200  pq_sendint32(out, RelationGetRelid(rel));
201 
202  if (oldtuple != NULL)
203  {
204  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
205  pq_sendbyte(out, 'O'); /* old tuple follows */
206  else
207  pq_sendbyte(out, 'K'); /* old key follows */
208  logicalrep_write_tuple(out, rel, oldtuple, binary);
209  }
210 
211  pq_sendbyte(out, 'N'); /* new tuple follows */
212  logicalrep_write_tuple(out, rel, newtuple, binary);
213 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:494
Form_pg_class rd_rel
Definition: rel.h:110
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:457