PostgreSQL Source Code  git master
logicalproto.h File Reference
#include "access/xact.h"
#include "replication/reorderbuffer.h"
#include "utils/rel.h"
Include dependency graph for logicalproto.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepTupleData
 
struct  LogicalRepRelation
 
struct  LogicalRepTyp
 
struct  LogicalRepBeginData
 
struct  LogicalRepCommitData
 
struct  LogicalRepPreparedTxnData
 
struct  LogicalRepCommitPreparedTxnData
 
struct  LogicalRepRollbackPreparedTxnData
 

Macros

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1
 
#define LOGICALREP_PROTO_VERSION_NUM   1
 
#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2
 
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM   3
 
#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
 
#define LOGICALREP_COLUMN_NULL   'n'
 
#define LOGICALREP_COLUMN_UNCHANGED   'u'
 
#define LOGICALREP_COLUMN_TEXT   't'
 
#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */
 

Typedefs

typedef enum LogicalRepMsgType LogicalRepMsgType
 
typedef struct LogicalRepTupleData LogicalRepTupleData
 
typedef uint32 LogicalRepRelId
 
typedef struct LogicalRepRelation LogicalRepRelation
 
typedef struct LogicalRepTyp LogicalRepTyp
 
typedef struct LogicalRepBeginData LogicalRepBeginData
 
typedef struct LogicalRepCommitData LogicalRepCommitData
 
typedef struct LogicalRepPreparedTxnData LogicalRepPreparedTxnData
 
typedef struct LogicalRepCommitPreparedTxnData LogicalRepCommitPreparedTxnData
 
typedef struct LogicalRepRollbackPreparedTxnData LogicalRepRollbackPreparedTxnData
 

Enumerations

enum  LogicalRepMsgType {
  LOGICAL_REP_MSG_BEGIN = 'B', LOGICAL_REP_MSG_COMMIT = 'C', LOGICAL_REP_MSG_ORIGIN = 'O', LOGICAL_REP_MSG_INSERT = 'I',
  LOGICAL_REP_MSG_UPDATE = 'U', LOGICAL_REP_MSG_DELETE = 'D', LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R',
  LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', LOGICAL_REP_MSG_PREPARE = 'P',
  LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E',
  LOGICAL_REP_MSG_STREAM_COMMIT = 'c', LOGICAL_REP_MSG_STREAM_ABORT = 'A'
}
 

Functions

void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_begin_prepare (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin_prepare (StringInfo in, LogicalRepPreparedTxnData *begin_data)
 
void logicalrep_write_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_read_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_commit_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit_prepared (StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
 
void logicalrep_write_rollback_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
void logicalrep_read_rollback_prepared (StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
 
List * logicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
 
void logicalrep_write_message (StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel)
 
LogicalRepRelation * logicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
 
void logicalrep_read_typ (StringInfo out, LogicalRepTyp *ltyp)
 
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
 
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
 
void logicalrep_write_stream_stop (StringInfo out)
 
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
TransactionId logicalrep_read_stream_commit (StringInfo out, LogicalRepCommitData *commit_data)
 
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid)
 
void logicalrep_read_stream_abort (StringInfo in, TransactionId *xid, TransactionId *subxid)
 

Macro Definition Documentation

◆ LOGICALREP_COLUMN_BINARY

#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */

◆ LOGICALREP_COLUMN_NULL

#define LOGICALREP_COLUMN_NULL   'n'

Definition at line 89 of file logicalproto.h.

Referenced by logicalrep_read_tuple(), and logicalrep_write_tuple().

◆ LOGICALREP_COLUMN_TEXT

#define LOGICALREP_COLUMN_TEXT   't'

◆ LOGICALREP_COLUMN_UNCHANGED

#define LOGICALREP_COLUMN_UNCHANGED   'u'

◆ LOGICALREP_PROTO_MAX_VERSION_NUM

#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_TWOPHASE_VERSION_NUM

Definition at line 39 of file logicalproto.h.

Referenced by pgoutput_startup().

◆ LOGICALREP_PROTO_MIN_VERSION_NUM

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1

Definition at line 35 of file logicalproto.h.

Referenced by pgoutput_startup().

◆ LOGICALREP_PROTO_STREAM_VERSION_NUM

#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2

Definition at line 37 of file logicalproto.h.

Referenced by ApplyWorkerMain(), and pgoutput_startup().

◆ LOGICALREP_PROTO_TWOPHASE_VERSION_NUM

#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM   3

Definition at line 38 of file logicalproto.h.

Referenced by ApplyWorkerMain(), and pgoutput_startup().

◆ LOGICALREP_PROTO_VERSION_NUM

#define LOGICALREP_PROTO_VERSION_NUM   1

Definition at line 36 of file logicalproto.h.

Referenced by ApplyWorkerMain().

Typedef Documentation

◆ LogicalRepBeginData

◆ LogicalRepCommitData

◆ LogicalRepCommitPreparedTxnData

◆ LogicalRepMsgType

◆ LogicalRepPreparedTxnData

◆ LogicalRepRelation

◆ LogicalRepRelId

Definition at line 94 of file logicalproto.h.

◆ LogicalRepRollbackPreparedTxnData

◆ LogicalRepTupleData

◆ LogicalRepTyp

typedef struct LogicalRepTyp LogicalRepTyp

Enumeration Type Documentation

◆ LogicalRepMsgType

Enumerator
LOGICAL_REP_MSG_BEGIN 
LOGICAL_REP_MSG_COMMIT 
LOGICAL_REP_MSG_ORIGIN 
LOGICAL_REP_MSG_INSERT 
LOGICAL_REP_MSG_UPDATE 
LOGICAL_REP_MSG_DELETE 
LOGICAL_REP_MSG_TRUNCATE 
LOGICAL_REP_MSG_RELATION 
LOGICAL_REP_MSG_TYPE 
LOGICAL_REP_MSG_MESSAGE 
LOGICAL_REP_MSG_BEGIN_PREPARE 
LOGICAL_REP_MSG_PREPARE 
LOGICAL_REP_MSG_COMMIT_PREPARED 
LOGICAL_REP_MSG_ROLLBACK_PREPARED 
LOGICAL_REP_MSG_STREAM_START 
LOGICAL_REP_MSG_STREAM_END 
LOGICAL_REP_MSG_STREAM_COMMIT 
LOGICAL_REP_MSG_STREAM_ABORT 

Definition at line 51 of file logicalproto.h.

52 {
LogicalRepMsgType
Definition: logicalproto.h:51

Function Documentation

◆ logicalrep_read_begin()

void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData *  begin_data 
)

Definition at line 60 of file proto.c.

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), and LogicalRepBeginData::xid.

Referenced by apply_handle_begin().

61 {
62  /* read fields */
63  begin_data->final_lsn = pq_getmsgint64(in);
64  if (begin_data->final_lsn == InvalidXLogRecPtr)
65  elog(ERROR, "final_lsn not set in begin message");
66  begin_data->committime = pq_getmsgint64(in);
67  begin_data->xid = pq_getmsgint(in, 4);
68 }
TransactionId xid
Definition: logicalproto.h:124
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define ERROR
Definition: elog.h:46
XLogRecPtr final_lsn
Definition: logicalproto.h:122
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:123

◆ logicalrep_read_begin_prepare()

void logicalrep_read_begin_prepare ( StringInfo  in,
LogicalRepPreparedTxnData *  begin_data 
)

Definition at line 131 of file proto.c.

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_begin_prepare().

132 {
133  /* read fields */
134  begin_data->prepare_lsn = pq_getmsgint64(in);
135  if (begin_data->prepare_lsn == InvalidXLogRecPtr)
136  elog(ERROR, "prepare_lsn not set in begin prepare message");
137  begin_data->end_lsn = pq_getmsgint64(in);
138  if (begin_data->end_lsn == InvalidXLogRecPtr)
139  elog(ERROR, "end_lsn not set in begin prepare message");
140  begin_data->prepare_time = pq_getmsgint64(in);
141  begin_data->xid = pq_getmsgint(in, 4);
142 
143  /* read gid (copy it into a pre-allocated buffer) */
144  strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
145 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
#define ERROR
Definition: elog.h:46
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_commit()

void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData *  commit_data 
)

Definition at line 95 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

96 {
97  /* read flags (unused for now) */
98  uint8 flags = pq_getmsgbyte(in);
99 
100  if (flags != 0)
101  elog(ERROR, "unrecognized flags %u in commit message", flags);
102 
103  /* read fields */
104  commit_data->commit_lsn = pq_getmsgint64(in);
105  commit_data->end_lsn = pq_getmsgint64(in);
106  commit_data->committime = pq_getmsgint64(in);
107 }
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz committime
Definition: logicalproto.h:131

◆ logicalrep_read_commit_prepared()

void logicalrep_read_commit_prepared ( StringInfo  in,
LogicalRepCommitPreparedTxnData *  prepare_data 
)

Definition at line 260 of file proto.c.

References LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, elog, LogicalRepCommitPreparedTxnData::end_lsn, ERROR, LogicalRepCommitPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), strlcpy(), and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_handle_commit_prepared().

261 {
262  /* read flags */
263  uint8 flags = pq_getmsgbyte(in);
264 
265  if (flags != 0)
266  elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
267 
268  /* read fields */
269  prepare_data->commit_lsn = pq_getmsgint64(in);
270  if (prepare_data->commit_lsn == InvalidXLogRecPtr)
271  elog(ERROR, "commit_lsn is not set in commit prepared message");
272  prepare_data->end_lsn = pq_getmsgint64(in);
273  if (prepare_data->end_lsn == InvalidXLogRecPtr)
274  elog(ERROR, "end_lsn is not set in commit prepared message");
275  prepare_data->commit_time = pq_getmsgint64(in);
276  prepare_data->xid = pq_getmsgint(in, 4);
277 
278  /* read gid (copy it into a pre-allocated buffer) */
279  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
280 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_delete()

LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData *  oldtup 
)

Definition at line 523 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

524 {
525  char action;
526  LogicalRepRelId relid;
527 
528  /* read the relation id */
529  relid = pq_getmsgint(in, 4);
530 
531  /* read and verify action */
532  action = pq_getmsgbyte(in);
533  if (action != 'K' && action != 'O')
534  elog(ERROR, "expected action 'O' or 'K', got %c", action);
535 
536  logicalrep_read_tuple(in, oldtup);
537 
538  return relid;
539 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:816
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:94

◆ logicalrep_read_insert()

LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData *  newtup 
)

Definition at line 397 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

398 {
399  char action;
400  LogicalRepRelId relid;
401 
402  /* read the relation id */
403  relid = pq_getmsgint(in, 4);
404 
405  action = pq_getmsgbyte(in);
406  if (action != 'N')
407  elog(ERROR, "expected new tuple but got %d",
408  action);
409 
410  logicalrep_read_tuple(in, newtup);
411 
412  return relid;
413 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:816
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:94

◆ logicalrep_read_origin()

char* logicalrep_read_origin ( StringInfo  in,
XLogRecPtr *  origin_lsn 
)

Definition at line 362 of file proto.c.

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

363 {
364  /* fixed fields */
365  *origin_lsn = pq_getmsgint64(in);
366 
367  /* return origin */
368  return pstrdup(pq_getmsgstring(in));
369 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455

◆ logicalrep_read_prepare()

void logicalrep_read_prepare ( StringInfo  in,
LogicalRepPreparedTxnData *  prepare_data 
)

Definition at line 221 of file proto.c.

References logicalrep_read_prepare_common().

Referenced by apply_handle_prepare().

222 {
223  logicalrep_read_prepare_common(in, "prepare", prepare_data);
224 }
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:194

◆ logicalrep_read_rel()

LogicalRepRelation* logicalrep_read_rel ( StringInfo  in)

Definition at line 658 of file proto.c.

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

659 {
660  LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
661 
662  rel->remoteid = pq_getmsgint(in, 4);
663 
664  /* Read relation name from stream */
665  rel->nspname = pstrdup(logicalrep_read_namespace(in));
666  rel->relname = pstrdup(pq_getmsgstring(in));
667 
668  /* Read the replica identity. */
669  rel->replident = pq_getmsgbyte(in);
670 
671  /* Get attribute description */
672  logicalrep_read_attrs(in, rel);
673 
674  return rel;
675 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
LogicalRepRelId remoteid
Definition: logicalproto.h:100
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:940
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1002
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void * palloc(Size size)
Definition: mcxt.c:1062
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_rollback_prepared()

void logicalrep_read_rollback_prepared ( StringInfo  in,
LogicalRepRollbackPreparedTxnData *  rollback_data 
)

Definition at line 318 of file proto.c.

References elog, ERROR, LogicalRepRollbackPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, strlcpy(), and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_handle_rollback_prepared().

320 {
321  /* read flags */
322  uint8 flags = pq_getmsgbyte(in);
323 
324  if (flags != 0)
325  elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
326 
327  /* read fields */
328  rollback_data->prepare_end_lsn = pq_getmsgint64(in);
329  if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
330  elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
331  rollback_data->rollback_end_lsn = pq_getmsgint64(in);
332  if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
333  elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
334  rollback_data->prepare_time = pq_getmsgint64(in);
335  rollback_data->rollback_time = pq_getmsgint64(in);
336  rollback_data->xid = pq_getmsgint(in, 4);
337 
338  /* read gid (copy it into a pre-allocated buffer) */
339  strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
340 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_abort()

void logicalrep_read_stream_abort ( StringInfo  in,
TransactionId *  xid,
TransactionId *  subxid 
)

Definition at line 1126 of file proto.c.

References Assert, and pq_getmsgint().

Referenced by apply_handle_stream_abort().

1128 {
1129  Assert(xid && subxid);
1130 
1131  *xid = pq_getmsgint(in, 4);
1132  *subxid = pq_getmsgint(in, 4);
1133 }
#define Assert(condition)
Definition: c.h:804
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_commit()

TransactionId logicalrep_read_stream_commit ( StringInfo  out,
LogicalRepCommitData *  commit_data 
)

Definition at line 1084 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), pq_getmsgint(), and pq_getmsgint64().

Referenced by apply_handle_stream_commit().

1085 {
1086  TransactionId xid;
1087  uint8 flags;
1088 
1089  xid = pq_getmsgint(in, 4);
1090 
1091  /* read flags (unused for now) */
1092  flags = pq_getmsgbyte(in);
1093 
1094  if (flags != 0)
1095  elog(ERROR, "unrecognized flags %u in commit message", flags);
1096 
1097  /* read fields */
1098  commit_data->commit_lsn = pq_getmsgint64(in);
1099  commit_data->end_lsn = pq_getmsgint64(in);
1100  commit_data->committime = pq_getmsgint64(in);
1101 
1102  return xid;
1103 }
uint32 TransactionId
Definition: c.h:587
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:131

◆ logicalrep_read_stream_start()

TransactionId logicalrep_read_stream_start ( StringInfo  in,
bool *  first_segment 
)

Definition at line 1034 of file proto.c.

References Assert, pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_stream_start().

1035 {
1036  TransactionId xid;
1037 
1038  Assert(first_segment);
1039 
1040  xid = pq_getmsgint(in, 4);
1041  *first_segment = (pq_getmsgbyte(in) == 1);
1042 
1043  return xid;
1044 }
uint32 TransactionId
Definition: c.h:587
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define Assert(condition)
Definition: c.h:804
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_truncate()

List* logicalrep_read_truncate ( StringInfo  in,
bool *  cascade,
bool *  restart_seqs 
)

Definition at line 577 of file proto.c.

References i, lappend_oid(), NIL, pq_getmsgint(), TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by apply_handle_truncate().

579 {
580  int i;
581  int nrelids;
582  List *relids = NIL;
583  uint8 flags;
584 
585  nrelids = pq_getmsgint(in, 4);
586 
587  /* read and decode truncate flags */
588  flags = pq_getmsgint(in, 1);
589  *cascade = (flags & TRUNCATE_CASCADE) > 0;
590  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
591 
592  for (i = 0; i < nrelids; i++)
593  relids = lappend_oid(relids, pq_getmsgint(in, 4));
594 
595  return relids;
596 }
#define NIL
Definition: pg_list.h:65
unsigned char uint8
Definition: c.h:439
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
int i
#define TRUNCATE_CASCADE
Definition: proto.c:29
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
Definition: pg_list.h:50
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30

◆ logicalrep_read_typ()

void logicalrep_read_typ ( StringInfo  out,
LogicalRepTyp *  ltyp 
)

Definition at line 714 of file proto.c.

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

715 {
716  ltyp->remoteid = pq_getmsgint(in, 4);
717 
718  /* Read type name from stream */
720  ltyp->typname = pstrdup(pq_getmsgstring(in));
721 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1002
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_update()

LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool *  has_oldtuple,
LogicalRepTupleData *  oldtup,
LogicalRepTupleData *  newtup 
)

Definition at line 452 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

455 {
456  char action;
457  LogicalRepRelId relid;
458 
459  /* read the relation id */
460  relid = pq_getmsgint(in, 4);
461 
462  /* read and verify action */
463  action = pq_getmsgbyte(in);
464  if (action != 'K' && action != 'O' && action != 'N')
465  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
466  action);
467 
468  /* check for old tuple */
469  if (action == 'K' || action == 'O')
470  {
471  logicalrep_read_tuple(in, oldtup);
472  *has_oldtuple = true;
473 
474  action = pq_getmsgbyte(in);
475  }
476  else
477  *has_oldtuple = false;
478 
479  /* check for new tuple */
480  if (action != 'N')
481  elog(ERROR, "expected action 'N', got %c",
482  action);
483 
484  logicalrep_read_tuple(in, newtup);
485 
486  return relid;
487 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:816
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:94

◆ logicalrep_write_begin()

void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN *  txn 
)

Definition at line 46 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, LOGICAL_REP_MSG_BEGIN, pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_txn().

47 {
49 
50  /* fixed fields */
51  pq_sendint64(out, txn->final_lsn);
53  pq_sendint32(out, txn->xid);
54 }
TimestampTz commit_time
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
XLogRecPtr final_lsn
TransactionId xid
union ReorderBufferTXN::@103 xact_time

◆ logicalrep_write_begin_prepare()

void logicalrep_write_begin_prepare ( StringInfo  out,
ReorderBufferTXN *  txn 
)

Definition at line 113 of file proto.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_BEGIN_PREPARE, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_prepare_txn().

114 {
116 
117  /* fixed fields */
118  pq_sendint64(out, txn->final_lsn);
119  pq_sendint64(out, txn->end_lsn);
121  pq_sendint32(out, txn->xid);
122 
123  /* send gid */
124  pq_sendstring(out, txn->gid);
125 }
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
XLogRecPtr final_lsn
TransactionId xid
union ReorderBufferTXN::@103 xact_time
XLogRecPtr end_lsn
TimestampTz prepare_time

◆ logicalrep_write_commit()

void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)

Definition at line 75 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_COMMIT, pq_sendbyte(), pq_sendint64(), and ReorderBufferTXN::xact_time.

Referenced by pgoutput_commit_txn().

77 {
78  uint8 flags = 0;
79 
81 
82  /* send the flags field (unused for now) */
83  pq_sendbyte(out, flags);
84 
85  /* send fields */
86  pq_sendint64(out, commit_lsn);
87  pq_sendint64(out, txn->end_lsn);
89 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
union ReorderBufferTXN::@103 xact_time
XLogRecPtr end_lsn

◆ logicalrep_write_commit_prepared()

void logicalrep_write_commit_prepared ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)

Definition at line 230 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_COMMIT_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_commit_prepared_txn().

232 {
233  uint8 flags = 0;
234 
236 
237  /*
238  * This should only ever happen for two-phase commit transactions, in
239  * which case we expect to have a valid GID.
240  */
241  Assert(txn->gid != NULL);
242 
243  /* send the flags field */
244  pq_sendbyte(out, flags);
245 
246  /* send fields */
247  pq_sendint64(out, commit_lsn);
248  pq_sendint64(out, txn->end_lsn);
250  pq_sendint32(out, txn->xid);
251 
252  /* send gid */
253  pq_sendstring(out, txn->gid);
254 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn

◆ logicalrep_write_delete()

void logicalrep_write_delete ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
bool  binary 
)

Definition at line 493 of file proto.c.

References Assert, LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

495 {
496  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
497  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
498  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
499 
501 
502  /* transaction ID (if not valid, we're not streaming) */
503  if (TransactionIdIsValid(xid))
504  pq_sendint32(out, xid);
505 
506  /* use Oid as relation identifier */
507  pq_sendint32(out, RelationGetRelid(rel));
508 
509  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
510  pq_sendbyte(out, 'O'); /* old tuple follows */
511  else
512  pq_sendbyte(out, 'K'); /* old key follows */
513 
514  logicalrep_write_tuple(out, rel, oldtuple, binary);
515 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:727
Form_pg_class rd_rel
Definition: rel.h:109
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:477

◆ logicalrep_write_insert()

void logicalrep_write_insert ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 375 of file proto.c.

References LOGICAL_REP_MSG_INSERT, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

377 {
379 
380  /* transaction ID (if not valid, we're not streaming) */
381  if (TransactionIdIsValid(xid))
382  pq_sendint32(out, xid);
383 
384  /* use Oid as relation identifier */
385  pq_sendint32(out, RelationGetRelid(rel));
386 
387  pq_sendbyte(out, 'N'); /* new tuple follows */
388  logicalrep_write_tuple(out, rel, newtuple, binary);
389 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:727
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:477

◆ logicalrep_write_message()

void logicalrep_write_message ( StringInfo  out,
TransactionId  xid,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)

Definition at line 602 of file proto.c.

References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.

Referenced by pgoutput_message().

605 {
606  uint8 flags = 0;
607 
608  pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
609 
610  /* encode and send message flags */
611  if (transactional)
612  flags |= MESSAGE_TRANSACTIONAL;
613 
614  /* transaction ID (if not valid, we're not streaming) */
615  if (TransactionIdIsValid(xid))
616  pq_sendint32(out, xid);
617 
618  pq_sendint8(out, flags);
619  pq_sendint64(out, lsn);
620  pq_sendstring(out, prefix);
621  pq_sendint32(out, sz);
622  pq_sendbytes(out, message, sz);
623 }
unsigned char uint8
Definition: c.h:439
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129

◆ logicalrep_write_origin()

void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 346 of file proto.c.

References LOGICAL_REP_MSG_ORIGIN, pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by send_repl_origin().

348 {
349  pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
350 
351  /* fixed fields */
352  pq_sendint64(out, origin_lsn);
353 
354  /* origin string */
355  pq_sendstring(out, origin);
356 }
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_prepare()

void logicalrep_write_prepare ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 183 of file proto.c.

References LOGICAL_REP_MSG_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_prepare_txn().

185 {
186  logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
187  txn, prepare_lsn);
188 }
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:151

◆ logicalrep_write_rel()

void logicalrep_write_rel ( StringInfo  out,
TransactionId  xid,
Relation  rel 
)

Definition at line 629 of file proto.c.

References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

630 {
631  char *relname;
632 
633  pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
634 
635  /* transaction ID (if not valid, we're not streaming) */
636  if (TransactionIdIsValid(xid))
637  pq_sendint32(out, xid);
638 
639  /* use Oid as relation identifier */
640  pq_sendint32(out, RelationGetRelid(rel));
641 
642  /* send qualified relation name */
643  logicalrep_write_namespace(out, RelationGetNamespace(rel));
644  relname = RelationGetRelationName(rel);
645  pq_sendstring(out, relname);
646 
647  /* send replica identity */
648  pq_sendbyte(out, rel->rd_rel->relreplident);
649 
650  /* send the attribute info */
651  logicalrep_write_attrs(out, rel);
652 }
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:982
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
Form_pg_class rd_rel
Definition: rel.h:109
NameData relname
Definition: pg_class.h:38
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:882
#define RelationGetRelationName(relation)
Definition: rel.h:511
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:477
#define RelationGetNamespace(relation)
Definition: rel.h:518

◆ logicalrep_write_rollback_prepared()

void logicalrep_write_rollback_prepared ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)

Definition at line 286 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_ROLLBACK_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_rollback_prepared_txn().

289 {
290  uint8 flags = 0;
291 
292  pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
293 
294  /*
295  * This should only ever happen for two-phase commit transactions, in
296  * which case we expect to have a valid GID.
297  */
298  Assert(txn->gid != NULL);
299 
300  /* send the flags field */
301  pq_sendbyte(out, flags);
302 
303  /* send fields */
304  pq_sendint64(out, prepare_end_lsn);
305  pq_sendint64(out, txn->end_lsn);
306  pq_sendint64(out, prepare_time);
307  pq_sendint64(out, txn->xact_time.commit_time);
308  pq_sendint32(out, txn->xid);
309 
310  /* send gid */
311  pq_sendstring(out, txn->gid);
312 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn

◆ logicalrep_write_stream_abort()

void logicalrep_write_stream_abort ( StringInfo  out,
TransactionId  xid,
TransactionId  subxid 
)

Definition at line 1110 of file proto.c.

References Assert, LOGICAL_REP_MSG_STREAM_ABORT, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_abort().

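1112 {
1113  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1114 
1115  Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
1116 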
1117  /* transaction ID */
1118  pq_sendint32(out, xid);
1119  pq_sendint32(out, subxid);
1120 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_commit()

void logicalrep_write_stream_commit ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)

Definition at line 1059 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_STREAM_COMMIT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), TransactionIdIsValid, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_stream_commit().

1061 {
1062  uint8 flags = 0;
1063 
1064  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1065 
1066  Assert(TransactionIdIsValid(txn->xid));
1067 
1068  /* transaction ID */
1069  pq_sendint32(out, txn->xid);
1070 
1071  /* send the flags field (unused for now) */
1072  pq_sendbyte(out, flags);
1073 
1074  /* send fields */
1075  pq_sendint64(out, commit_lsn);
1076  pq_sendint64(out, txn->end_lsn);
1077  pq_sendint64(out, txn->xact_time.commit_time);
1078 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_start()

void logicalrep_write_stream_start ( StringInfo  out,
TransactionId  xid,
bool  first_segment 
)

Definition at line 1016 of file proto.c.

References Assert, LOGICAL_REP_MSG_STREAM_START, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_start().

1018 {
1019  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1020 
1021  Assert(TransactionIdIsValid(xid));
1022 
1023  /* transaction ID (we're starting to stream, so must be valid) */
1024  pq_sendint32(out, xid);
1025 
1026  /* 1 if this is the first streaming segment for this xid */
1027  pq_sendbyte(out, first_segment ? 1 : 0);
1028 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo  out)

Definition at line 1050 of file proto.c.

References LOGICAL_REP_MSG_STREAM_END, and pq_sendbyte().

Referenced by pgoutput_stream_stop().

1051 {
1052  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END);
1053 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_truncate()

void logicalrep_write_truncate ( StringInfo  out,
TransactionId  xid,
int  nrelids,
Oid  relids[],
bool  cascade,
bool  restart_seqs 
)

Definition at line 545 of file proto.c.

References i, LOGICAL_REP_MSG_TRUNCATE, pq_sendbyte(), pq_sendint32(), pq_sendint8(), TransactionIdIsValid, TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by pgoutput_truncate().

550 {
551  int i;
552  uint8 flags = 0;
553 
554  pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
555 
556  /* transaction ID (if not valid, we're not streaming) */
557  if (TransactionIdIsValid(xid))
558  pq_sendint32(out, xid);
559 
560  pq_sendint32(out, nrelids);
561 
562  /* encode and send truncate flags */
563  if (cascade)
564  flags |= TRUNCATE_CASCADE;
565  if (restart_seqs)
566  flags |= TRUNCATE_RESTART_SEQS;
567  pq_sendint8(out, flags);
568 
569  for (i = 0; i < nrelids; i++)
570  pq_sendint32(out, relids[i]);
571 }
unsigned char uint8
Definition: c.h:439
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
int i
#define TRUNCATE_CASCADE
Definition: proto.c:29
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30

◆ logicalrep_write_typ()

void logicalrep_write_typ ( StringInfo  out,
TransactionId  xid,
Oid  typoid 
)

Definition at line 683 of file proto.c.

References elog, ERROR, getBaseType(), GETSTRUCT, HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum, pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), TransactionIdIsValid, and TYPEOID.

Referenced by send_relation_and_attrs().

684 {
685  Oid basetypoid = getBaseType(typoid);
686  HeapTuple tup;
687  Form_pg_type typtup;
688 
689  pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
690 
691  /* transaction ID (if not valid, we're not streaming) */
692  if (TransactionIdIsValid(xid))
693  pq_sendint32(out, xid);
694 
695  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
696  if (!HeapTupleIsValid(tup))
697  elog(ERROR, "cache lookup failed for type %u", basetypoid);
698  typtup = (Form_pg_type) GETSTRUCT(tup);
699 
700  /* use Oid as relation identifier */
701  pq_sendint32(out, typoid);
702 
703  /* send qualified type name */
704  logicalrep_write_namespace(out, typtup->typnamespace);
705  pq_sendstring(out, NameStr(typtup->typname));
706 
707  ReleaseSysCache(tup);
708 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:982
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
unsigned int Oid
Definition: postgres_ext.h:31
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
#define elog(elevel,...)
Definition: elog.h:232
#define NameStr(name)
Definition: c.h:681
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2468

◆ logicalrep_write_update()

void logicalrep_write_update ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 419 of file proto.c.

References Assert, LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

421 {
422  pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
423 
424  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
425  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
426  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
427 
428  /* transaction ID (if not valid, we're not streaming) */
429  if (TransactionIdIsValid(xid))
430  pq_sendint32(out, xid);
431 
432  /* use Oid as relation identifier */
433  pq_sendint32(out, RelationGetRelid(rel));
434 
435  if (oldtuple != NULL)
436  {
437  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
438  pq_sendbyte(out, 'O'); /* old tuple follows */
439  else
440  pq_sendbyte(out, 'K'); /* old key follows */
441  logicalrep_write_tuple(out, rel, oldtuple, binary);
442  }
443 
444  pq_sendbyte(out, 'N'); /* new tuple follows */
445  logicalrep_write_tuple(out, rel, newtuple, binary);
446 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:727
Form_pg_class rd_rel
Definition: rel.h:109
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:477