PostgreSQL Source Code  git master
logicalproto.h File Reference
#include "access/xact.h"
#include "replication/reorderbuffer.h"
#include "utils/rel.h"
Include dependency graph for logicalproto.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepTupleData
 
struct  LogicalRepRelation
 
struct  LogicalRepTyp
 
struct  LogicalRepBeginData
 
struct  LogicalRepCommitData
 
struct  LogicalRepPreparedTxnData
 
struct  LogicalRepCommitPreparedTxnData
 
struct  LogicalRepRollbackPreparedTxnData
 

Macros

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1
 
#define LOGICALREP_PROTO_VERSION_NUM   1
 
#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2
 
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM   3
 
#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
 
#define LOGICALREP_COLUMN_NULL   'n'
 
#define LOGICALREP_COLUMN_UNCHANGED   'u'
 
#define LOGICALREP_COLUMN_TEXT   't'
 
#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */
 

Typedefs

typedef enum LogicalRepMsgType LogicalRepMsgType
 
typedef struct LogicalRepTupleData LogicalRepTupleData
 
typedef uint32 LogicalRepRelId
 
typedef struct LogicalRepRelation LogicalRepRelation
 
typedef struct LogicalRepTyp LogicalRepTyp
 
typedef struct LogicalRepBeginData LogicalRepBeginData
 
typedef struct LogicalRepCommitData LogicalRepCommitData
 
typedef struct LogicalRepPreparedTxnData LogicalRepPreparedTxnData
 
typedef struct LogicalRepCommitPreparedTxnData LogicalRepCommitPreparedTxnData
 
typedef struct LogicalRepRollbackPreparedTxnData LogicalRepRollbackPreparedTxnData
 

Enumerations

enum  LogicalRepMsgType {
  LOGICAL_REP_MSG_BEGIN = 'B', LOGICAL_REP_MSG_COMMIT = 'C', LOGICAL_REP_MSG_ORIGIN = 'O', LOGICAL_REP_MSG_INSERT = 'I',
  LOGICAL_REP_MSG_UPDATE = 'U', LOGICAL_REP_MSG_DELETE = 'D', LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R',
  LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', LOGICAL_REP_MSG_PREPARE = 'P',
  LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_STOP = 'E',
  LOGICAL_REP_MSG_STREAM_COMMIT = 'c', LOGICAL_REP_MSG_STREAM_ABORT = 'A', LOGICAL_REP_MSG_STREAM_PREPARE = 'p'
}
 

Functions

void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_begin_prepare (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin_prepare (StringInfo in, LogicalRepPreparedTxnData *begin_data)
 
void logicalrep_write_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_read_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_commit_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit_prepared (StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
 
void logicalrep_write_rollback_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
void logicalrep_read_rollback_prepared (StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
 
void logicalrep_write_stream_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_read_stream_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
 
List * logicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
 
void logicalrep_write_message (StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel)
 
LogicalRepRelation * logicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
 
void logicalrep_read_typ (StringInfo out, LogicalRepTyp *ltyp)
 
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
 
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
 
void logicalrep_write_stream_stop (StringInfo out)
 
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
TransactionId logicalrep_read_stream_commit (StringInfo out, LogicalRepCommitData *commit_data)
 
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid)
 
void logicalrep_read_stream_abort (StringInfo in, TransactionId *xid, TransactionId *subxid)
 
char * logicalrep_message_type (LogicalRepMsgType action)
 

Macro Definition Documentation

◆ LOGICALREP_COLUMN_BINARY

#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */

◆ LOGICALREP_COLUMN_NULL

#define LOGICALREP_COLUMN_NULL   'n'

Definition at line 90 of file logicalproto.h.

Referenced by logicalrep_read_tuple(), and logicalrep_write_tuple().

◆ LOGICALREP_COLUMN_TEXT

#define LOGICALREP_COLUMN_TEXT   't'

◆ LOGICALREP_COLUMN_UNCHANGED

#define LOGICALREP_COLUMN_UNCHANGED   'u'

◆ LOGICALREP_PROTO_MAX_VERSION_NUM

#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_TWOPHASE_VERSION_NUM

Definition at line 39 of file logicalproto.h.

Referenced by pgoutput_startup().

◆ LOGICALREP_PROTO_MIN_VERSION_NUM

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1

Definition at line 35 of file logicalproto.h.

Referenced by pgoutput_startup().

◆ LOGICALREP_PROTO_STREAM_VERSION_NUM

#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2

Definition at line 37 of file logicalproto.h.

Referenced by ApplyWorkerMain(), and pgoutput_startup().

◆ LOGICALREP_PROTO_TWOPHASE_VERSION_NUM

#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM   3

Definition at line 38 of file logicalproto.h.

Referenced by ApplyWorkerMain(), and pgoutput_startup().

◆ LOGICALREP_PROTO_VERSION_NUM

#define LOGICALREP_PROTO_VERSION_NUM   1

Definition at line 36 of file logicalproto.h.

Referenced by ApplyWorkerMain().

Typedef Documentation

◆ LogicalRepBeginData

◆ LogicalRepCommitData

◆ LogicalRepCommitPreparedTxnData

◆ LogicalRepMsgType

◆ LogicalRepPreparedTxnData

◆ LogicalRepRelation

◆ LogicalRepRelId

Definition at line 95 of file logicalproto.h.

◆ LogicalRepRollbackPreparedTxnData

◆ LogicalRepTupleData

◆ LogicalRepTyp

typedef struct LogicalRepTyp LogicalRepTyp

Enumeration Type Documentation

◆ LogicalRepMsgType

Enumerator
LOGICAL_REP_MSG_BEGIN 
LOGICAL_REP_MSG_COMMIT 
LOGICAL_REP_MSG_ORIGIN 
LOGICAL_REP_MSG_INSERT 
LOGICAL_REP_MSG_UPDATE 
LOGICAL_REP_MSG_DELETE 
LOGICAL_REP_MSG_TRUNCATE 
LOGICAL_REP_MSG_RELATION 
LOGICAL_REP_MSG_TYPE 
LOGICAL_REP_MSG_MESSAGE 
LOGICAL_REP_MSG_BEGIN_PREPARE 
LOGICAL_REP_MSG_PREPARE 
LOGICAL_REP_MSG_COMMIT_PREPARED 
LOGICAL_REP_MSG_ROLLBACK_PREPARED 
LOGICAL_REP_MSG_STREAM_START 
LOGICAL_REP_MSG_STREAM_STOP 
LOGICAL_REP_MSG_STREAM_COMMIT 
LOGICAL_REP_MSG_STREAM_ABORT 
LOGICAL_REP_MSG_STREAM_PREPARE 

Definition at line 51 of file logicalproto.h.

52 {
LogicalRepMsgType
Definition: logicalproto.h:51

Function Documentation

◆ logicalrep_message_type()

char* logicalrep_message_type ( LogicalRepMsgType  action)

Definition at line 1164 of file proto.c.

References elog, ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, and LOGICAL_REP_MSG_UPDATE.

Referenced by apply_error_callback().

1165 {
1166  switch (action)
1167  {
1168  case LOGICAL_REP_MSG_BEGIN:
1169  return "BEGIN";
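1170  case LOGICAL_REP_MSG_COMMIT:
1171  return "COMMIT";
1172  case LOGICAL_REP_MSG_ORIGIN:
1173  return "ORIGIN";
1174  case LOGICAL_REP_MSG_INSERT:
1175  return "INSERT";
1176  case LOGICAL_REP_MSG_UPDATE:
1177  return "UPDATE";
1178  case LOGICAL_REP_MSG_DELETE:
1179  return "DELETE";
1180  case LOGICAL_REP_MSG_TRUNCATE:
1181  return "TRUNCATE";
1182  case LOGICAL_REP_MSG_RELATION:
1183  return "RELATION";
1184  case LOGICAL_REP_MSG_TYPE:
1185  return "TYPE";
1186  case LOGICAL_REP_MSG_MESSAGE:
1187  return "MESSAGE";
1188  case LOGICAL_REP_MSG_BEGIN_PREPARE:
1189  return "BEGIN PREPARE";
1190  case LOGICAL_REP_MSG_PREPARE:
1191  return "PREPARE";
1192  case LOGICAL_REP_MSG_COMMIT_PREPARED:
1193  return "COMMIT PREPARED";
1194  case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
1195  return "ROLLBACK PREPARED";
1196  case LOGICAL_REP_MSG_STREAM_START:
1197  return "STREAM START";
1198  case LOGICAL_REP_MSG_STREAM_STOP:
1199  return "STREAM STOP";
1200  case LOGICAL_REP_MSG_STREAM_COMMIT:
1201  return "STREAM COMMIT";
1202  case LOGICAL_REP_MSG_STREAM_ABORT:
1203  return "STREAM ABORT";
1204  case LOGICAL_REP_MSG_STREAM_PREPARE:
1205  return "STREAM PREPARE";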
1206  }
1207 
1208  elog(ERROR, "invalid logical replication message type \"%c\"", action);
1209 
1210  return NULL; /* keep compiler quiet */
1211 }
#define ERROR
Definition: elog.h:46
#define elog(elevel,...)
Definition: elog.h:232

◆ logicalrep_read_begin()

void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData *begin_data
)

Definition at line 60 of file proto.c.

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), and LogicalRepBeginData::xid.

Referenced by apply_handle_begin().

61 {
62  /* read fields */
63  begin_data->final_lsn = pq_getmsgint64(in);
64  if (begin_data->final_lsn == InvalidXLogRecPtr)
65  elog(ERROR, "final_lsn not set in begin message");
66  begin_data->committime = pq_getmsgint64(in);
67  begin_data->xid = pq_getmsgint(in, 4);
68 }
TransactionId xid
Definition: logicalproto.h:125
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define ERROR
Definition: elog.h:46
XLogRecPtr final_lsn
Definition: logicalproto.h:123
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:124

◆ logicalrep_read_begin_prepare()

void logicalrep_read_begin_prepare ( StringInfo  in,
LogicalRepPreparedTxnData *begin_data
)

Definition at line 131 of file proto.c.

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_begin_prepare().

132 {
133  /* read fields */
134  begin_data->prepare_lsn = pq_getmsgint64(in);
135  if (begin_data->prepare_lsn == InvalidXLogRecPtr)
136  elog(ERROR, "prepare_lsn not set in begin prepare message");
137  begin_data->end_lsn = pq_getmsgint64(in);
138  if (begin_data->end_lsn == InvalidXLogRecPtr)
139  elog(ERROR, "end_lsn not set in begin prepare message");
140  begin_data->prepare_time = pq_getmsgint64(in);
141  begin_data->xid = pq_getmsgint(in, 4);
142 
143  /* read gid (copy it into a pre-allocated buffer) */
144  strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
145 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
#define ERROR
Definition: elog.h:46
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_commit()

void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData *commit_data
)

Definition at line 95 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

96 {
97  /* read flags (unused for now) */
98  uint8 flags = pq_getmsgbyte(in);
99 
100  if (flags != 0)
101  elog(ERROR, "unrecognized flags %u in commit message", flags);
102 
103  /* read fields */
104  commit_data->commit_lsn = pq_getmsgint64(in);
105  commit_data->end_lsn = pq_getmsgint64(in);
106  commit_data->committime = pq_getmsgint64(in);
107 }
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz committime
Definition: logicalproto.h:132

◆ logicalrep_read_commit_prepared()

void logicalrep_read_commit_prepared ( StringInfo  in,
LogicalRepCommitPreparedTxnData *prepare_data
)

Definition at line 264 of file proto.c.

References LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, elog, LogicalRepCommitPreparedTxnData::end_lsn, ERROR, LogicalRepCommitPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), strlcpy(), and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_handle_commit_prepared().

265 {
266  /* read flags */
267  uint8 flags = pq_getmsgbyte(in);
268 
269  if (flags != 0)
270  elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
271 
272  /* read fields */
273  prepare_data->commit_lsn = pq_getmsgint64(in);
274  if (prepare_data->commit_lsn == InvalidXLogRecPtr)
275  elog(ERROR, "commit_lsn is not set in commit prepared message");
276  prepare_data->end_lsn = pq_getmsgint64(in);
277  if (prepare_data->end_lsn == InvalidXLogRecPtr)
278  elog(ERROR, "end_lsn is not set in commit prepared message");
279  prepare_data->commit_time = pq_getmsgint64(in);
280  prepare_data->xid = pq_getmsgint(in, 4);
281 
282  /* read gid (copy it into a pre-allocated buffer) */
283  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
284 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_delete()

LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData *oldtup
)

Definition at line 548 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

549 {
550  char action;
551  LogicalRepRelId relid;
552 
553  /* read the relation id */
554  relid = pq_getmsgint(in, 4);
555 
556  /* read and verify action */
557  action = pq_getmsgbyte(in);
558  if (action != 'K' && action != 'O')
559  elog(ERROR, "expected action 'O' or 'K', got %c", action);
560 
561  logicalrep_read_tuple(in, oldtup);
562 
563  return relid;
564 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:841
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:95

◆ logicalrep_read_insert()

LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData *newtup
)

Definition at line 422 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

423 {
424  char action;
425  LogicalRepRelId relid;
426 
427  /* read the relation id */
428  relid = pq_getmsgint(in, 4);
429 
430  action = pq_getmsgbyte(in);
431  if (action != 'N')
432  elog(ERROR, "expected new tuple but got %d",
433  action);
434 
435  logicalrep_read_tuple(in, newtup);
436 
437  return relid;
438 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:841
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:95

◆ logicalrep_read_origin()

char* logicalrep_read_origin ( StringInfo  in,
XLogRecPtr *origin_lsn
)

Definition at line 387 of file proto.c.

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

388 {
389  /* fixed fields */
390  *origin_lsn = pq_getmsgint64(in);
391 
392  /* return origin */
393  return pstrdup(pq_getmsgstring(in));
394 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455

◆ logicalrep_read_prepare()

void logicalrep_read_prepare ( StringInfo  in,
LogicalRepPreparedTxnData *prepare_data
)

Definition at line 225 of file proto.c.

References logicalrep_read_prepare_common().

Referenced by apply_handle_prepare().

226 {
227  logicalrep_read_prepare_common(in, "prepare", prepare_data);
228 }
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:196

◆ logicalrep_read_rel()

LogicalRepRelation* logicalrep_read_rel ( StringInfo  in)

Definition at line 683 of file proto.c.

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

684 {
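685  LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
686 
687  rel->remoteid = pq_getmsgint(in, 4);
688 
689  /* Read relation name from stream */
690  rel->nspname = pstrdup(logicalrep_read_namespace(in));
691  rel->relname = pstrdup(pq_getmsgstring(in));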
692 
693  /* Read the replica identity. */
694  rel->replident = pq_getmsgbyte(in);
695 
696  /* Get attribute description */
697  logicalrep_read_attrs(in, rel);
698 
699  return rel;
700 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
LogicalRepRelId remoteid
Definition: logicalproto.h:101
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:965
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1027
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void * palloc(Size size)
Definition: mcxt.c:1062
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_rollback_prepared()

void logicalrep_read_rollback_prepared ( StringInfo  in,
LogicalRepRollbackPreparedTxnData *rollback_data
)

Definition at line 322 of file proto.c.

References elog, ERROR, LogicalRepRollbackPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, strlcpy(), and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_handle_rollback_prepared().

324 {
325  /* read flags */
326  uint8 flags = pq_getmsgbyte(in);
327 
328  if (flags != 0)
329  elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
330 
331  /* read fields */
332  rollback_data->prepare_end_lsn = pq_getmsgint64(in);
333  if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
334  elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
335  rollback_data->rollback_end_lsn = pq_getmsgint64(in);
336  if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
337  elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
338  rollback_data->prepare_time = pq_getmsgint64(in);
339  rollback_data->rollback_time = pq_getmsgint64(in);
340  rollback_data->xid = pq_getmsgint(in, 4);
341 
342  /* read gid (copy it into a pre-allocated buffer) */
343  strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
344 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_abort()

void logicalrep_read_stream_abort ( StringInfo  in,
TransactionId *xid,
TransactionId *subxid
)

Definition at line 1151 of file proto.c.

References Assert, and pq_getmsgint().

Referenced by apply_handle_stream_abort().

1153 {
1154  Assert(xid && subxid);
1155 
1156  *xid = pq_getmsgint(in, 4);
1157  *subxid = pq_getmsgint(in, 4);
1158 }
#define Assert(condition)
Definition: c.h:804
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_commit()

TransactionId logicalrep_read_stream_commit ( StringInfo  out,
LogicalRepCommitData *commit_data
)

Definition at line 1109 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), pq_getmsgint(), and pq_getmsgint64().

Referenced by apply_handle_stream_commit().

1110 {
1111  TransactionId xid;
1112  uint8 flags;
1113 
1114  xid = pq_getmsgint(in, 4);
1115 
1116  /* read flags (unused for now) */
1117  flags = pq_getmsgbyte(in);
1118 
1119  if (flags != 0)
1120  elog(ERROR, "unrecognized flags %u in commit message", flags);
1121 
1122  /* read fields */
1123  commit_data->commit_lsn = pq_getmsgint64(in);
1124  commit_data->end_lsn = pq_getmsgint64(in);
1125  commit_data->committime = pq_getmsgint64(in);
1126 
1127  return xid;
1128 }
uint32 TransactionId
Definition: c.h:587
unsigned char uint8
Definition: c.h:439
#define ERROR
Definition: elog.h:46
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:132

◆ logicalrep_read_stream_prepare()

void logicalrep_read_stream_prepare ( StringInfo  in,
LogicalRepPreparedTxnData *prepare_data
)

Definition at line 362 of file proto.c.

References logicalrep_read_prepare_common().

Referenced by apply_handle_stream_prepare().

363 {
364  logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
365 }
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:196

◆ logicalrep_read_stream_start()

TransactionId logicalrep_read_stream_start ( StringInfo  in,
bool *first_segment
)

Definition at line 1059 of file proto.c.

References Assert, pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_stream_start().

1060 {
1061  TransactionId xid;
1062 
1063  Assert(first_segment);
1064 
1065  xid = pq_getmsgint(in, 4);
1066  *first_segment = (pq_getmsgbyte(in) == 1);
1067 
1068  return xid;
1069 }
uint32 TransactionId
Definition: c.h:587
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define Assert(condition)
Definition: c.h:804
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_truncate()

List* logicalrep_read_truncate ( StringInfo  in,
bool *cascade,
bool *restart_seqs
)

Definition at line 602 of file proto.c.

References i, lappend_oid(), NIL, pq_getmsgint(), TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by apply_handle_truncate().

604 {
605  int i;
606  int nrelids;
607  List *relids = NIL;
608  uint8 flags;
609 
610  nrelids = pq_getmsgint(in, 4);
611 
612  /* read and decode truncate flags */
613  flags = pq_getmsgint(in, 1);
614  *cascade = (flags & TRUNCATE_CASCADE) > 0;
615  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
616 
617  for (i = 0; i < nrelids; i++)
618  relids = lappend_oid(relids, pq_getmsgint(in, 4));
619 
620  return relids;
621 }
#define NIL
Definition: pg_list.h:65
unsigned char uint8
Definition: c.h:439
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
int i
#define TRUNCATE_CASCADE
Definition: proto.c:29
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
Definition: pg_list.h:50
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30

◆ logicalrep_read_typ()

void logicalrep_read_typ ( StringInfo  out,
LogicalRepTyp *ltyp
)

Definition at line 739 of file proto.c.

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

740 {
741  ltyp->remoteid = pq_getmsgint(in, 4);
742 
743  /* Read type name from stream */
744  ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
745  ltyp->typname = pstrdup(pq_getmsgstring(in));
746 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1299
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1027
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_update()

LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool *has_oldtuple,
LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup
)

Definition at line 477 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

480 {
481  char action;
482  LogicalRepRelId relid;
483 
484  /* read the relation id */
485  relid = pq_getmsgint(in, 4);
486 
487  /* read and verify action */
488  action = pq_getmsgbyte(in);
489  if (action != 'K' && action != 'O' && action != 'N')
490  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
491  action);
492 
493  /* check for old tuple */
494  if (action == 'K' || action == 'O')
495  {
496  logicalrep_read_tuple(in, oldtup);
497  *has_oldtuple = true;
498 
499  action = pq_getmsgbyte(in);
500  }
501  else
502  *has_oldtuple = false;
503 
504  /* check for new tuple */
505  if (action != 'N')
506  elog(ERROR, "expected action 'N', got %c",
507  action);
508 
509  logicalrep_read_tuple(in, newtup);
510 
511  return relid;
512 }
#define ERROR
Definition: elog.h:46
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:841
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:232
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:95

◆ logicalrep_write_begin()

void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN *txn
)

Definition at line 46 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, LOGICAL_REP_MSG_BEGIN, pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_txn().

47 {
48  pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
49 
50  /* fixed fields */
51  pq_sendint64(out, txn->final_lsn);
52  pq_sendint64(out, txn->xact_time.commit_time);
53  pq_sendint32(out, txn->xid);
54 }
TimestampTz commit_time
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
XLogRecPtr final_lsn
TransactionId xid
union ReorderBufferTXN::@103 xact_time

◆ logicalrep_write_begin_prepare()

void logicalrep_write_begin_prepare ( StringInfo  out,
ReorderBufferTXN *txn
)

Definition at line 113 of file proto.c.

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_BEGIN_PREPARE, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_prepare_txn().

114 {
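115  pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
116 
117  /* fixed fields */
118  pq_sendint64(out, txn->final_lsn);
119  pq_sendint64(out, txn->end_lsn);
120  pq_sendint64(out, txn->xact_time.prepare_time);
121  pq_sendint32(out, txn->xid);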
122 
123  /* send gid */
124  pq_sendstring(out, txn->gid);
125 }
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
XLogRecPtr final_lsn
TransactionId xid
union ReorderBufferTXN::@103 xact_time
XLogRecPtr end_lsn
TimestampTz prepare_time

◆ logicalrep_write_commit()

void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN *txn,
XLogRecPtr  commit_lsn 
)

Definition at line 75 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_COMMIT, pq_sendbyte(), pq_sendint64(), and ReorderBufferTXN::xact_time.

Referenced by pgoutput_commit_txn().

77 {
78  uint8 flags = 0;
79 
80  pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
81 
82  /* send the flags field (unused for now) */
83  pq_sendbyte(out, flags);
84 
85  /* send fields */
86  pq_sendint64(out, commit_lsn);
87  pq_sendint64(out, txn->end_lsn);
88  pq_sendint64(out, txn->xact_time.commit_time);
89 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
union ReorderBufferTXN::@103 xact_time
XLogRecPtr end_lsn

◆ logicalrep_write_commit_prepared()

void logicalrep_write_commit_prepared ( StringInfo  out,
ReorderBufferTXN *txn,
XLogRecPtr  commit_lsn 
)

Definition at line 234 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_COMMIT_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_commit_prepared_txn().

236 {
237  uint8 flags = 0;
238 
239  pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
240 
241  /*
242  * This should only ever happen for two-phase commit transactions, in
243  * which case we expect to have a valid GID.
244  */
245  Assert(txn->gid != NULL);
246 
247  /* send the flags field */
248  pq_sendbyte(out, flags);
249 
250  /* send fields */
251  pq_sendint64(out, commit_lsn);
252  pq_sendint64(out, txn->end_lsn);
253  pq_sendint64(out, txn->xact_time.commit_time);
254  pq_sendint32(out, txn->xid);
255 
256  /* send gid */
257  pq_sendstring(out, txn->gid);
258 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn

◆ logicalrep_write_delete()

void logicalrep_write_delete ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
bool  binary 
)

Definition at line 518 of file proto.c.

References Assert, LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

520 {
521  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
522  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
523  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
524 
525  pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
526 
527  /* transaction ID (if not valid, we're not streaming) */
528  if (TransactionIdIsValid(xid))
529  pq_sendint32(out, xid);
530 
531  /* use Oid as relation identifier */
532  pq_sendint32(out, RelationGetRelid(rel));
533 
534  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
535  pq_sendbyte(out, 'O'); /* old tuple follows */
536  else
537  pq_sendbyte(out, 'K'); /* old key follows */
538 
539  logicalrep_write_tuple(out, rel, oldtuple, binary);
540 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:752
Form_pg_class rd_rel
Definition: rel.h:109
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:477

◆ logicalrep_write_insert()

void logicalrep_write_insert ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 400 of file proto.c.

References LOGICAL_REP_MSG_INSERT, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

402 {
403  pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
404 
405  /* transaction ID (if not valid, we're not streaming) */
406  if (TransactionIdIsValid(xid))
407  pq_sendint32(out, xid);
408 
409  /* use Oid as relation identifier */
410  pq_sendint32(out, RelationGetRelid(rel));
411 
412  pq_sendbyte(out, 'N'); /* new tuple follows */
413  logicalrep_write_tuple(out, rel, newtuple, binary);
414 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:752
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:477

◆ logicalrep_write_message()

void logicalrep_write_message ( StringInfo  out,
TransactionId  xid,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)

Definition at line 627 of file proto.c.

References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.

Referenced by pgoutput_message().

630 {
631  uint8 flags = 0;
632 
633  pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
634 
635  /* encode and send message flags */
636  if (transactional)
637  flags |= MESSAGE_TRANSACTIONAL;
638 
639  /* transaction ID (if not valid, we're not streaming) */
640  if (TransactionIdIsValid(xid))
641  pq_sendint32(out, xid);
642 
643  pq_sendint8(out, flags);
644  pq_sendint64(out, lsn);
645  pq_sendstring(out, prefix);
646  pq_sendint32(out, sz);
647  pq_sendbytes(out, message, sz);
648 }
unsigned char uint8
Definition: c.h:439
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129

◆ logicalrep_write_origin()

void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 371 of file proto.c.

References LOGICAL_REP_MSG_ORIGIN, pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by send_repl_origin().

373 {
374  pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
375 
376  /* fixed fields */
377  pq_sendint64(out, origin_lsn);
378 
379  /* origin string */
380  pq_sendstring(out, origin);
381 }
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_prepare()

void logicalrep_write_prepare ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 184 of file proto.c.

References LOGICAL_REP_MSG_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_prepare_txn().

186 {
187  logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
188  txn, prepare_lsn);
189 }
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:152

◆ logicalrep_write_rel()

void logicalrep_write_rel ( StringInfo  out,
TransactionId  xid,
Relation  rel 
)

Definition at line 654 of file proto.c.

References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

655 {
656  char *relname;
657 
658  pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
659 
660  /* transaction ID (if not valid, we're not streaming) */
661  if (TransactionIdIsValid(xid))
662  pq_sendint32(out, xid);
663 
664  /* use Oid as relation identifier */
665  pq_sendint32(out, RelationGetRelid(rel));
666 
667  /* send qualified relation name */
668  logicalrep_write_namespace(out, RelationGetNamespace(rel));
669  relname = RelationGetRelationName(rel);
670  pq_sendstring(out, relname);
671 
672  /* send replica identity */
673  pq_sendbyte(out, rel->rd_rel->relreplident);
674 
675  /* send the attribute info */
676  logicalrep_write_attrs(out, rel);
677 }
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1007
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
Form_pg_class rd_rel
Definition: rel.h:109
NameData relname
Definition: pg_class.h:38
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:907
#define RelationGetRelationName(relation)
Definition: rel.h:511
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:477
#define RelationGetNamespace(relation)
Definition: rel.h:518

◆ logicalrep_write_rollback_prepared()

void logicalrep_write_rollback_prepared ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)

Definition at line 290 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_ROLLBACK_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_rollback_prepared_txn().

293 {
294  uint8 flags = 0;
295 
296  pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
297 
298  /*
299  * This should only ever happen for two-phase commit transactions, in
300  * which case we expect to have a valid GID.
301  */
302  Assert(txn->gid != NULL);
303 
304  /* send the flags field */
305  pq_sendbyte(out, flags);
306 
307  /* send fields */
308  pq_sendint64(out, prepare_end_lsn);
309  pq_sendint64(out, txn->end_lsn);
310  pq_sendint64(out, prepare_time);
311  pq_sendint64(out, txn->xact_time.commit_time);
312  pq_sendint32(out, txn->xid);
313 
314  /* send gid */
315  pq_sendstring(out, txn->gid);
316 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn

◆ logicalrep_write_stream_abort()

void logicalrep_write_stream_abort ( StringInfo  out,
TransactionId  xid,
TransactionId  subxid 
)

Definition at line 1135 of file proto.c.

References Assert, LOGICAL_REP_MSG_STREAM_ABORT, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_abort().

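1137 {
1138  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1139 
1140  Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
1141 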
1142  /* transaction ID */
1143  pq_sendint32(out, xid);
1144  pq_sendint32(out, subxid);
1145 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_commit()

void logicalrep_write_stream_commit ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)

Definition at line 1084 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_STREAM_COMMIT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), TransactionIdIsValid, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_stream_commit().

1086 {
1087  uint8 flags = 0;
1088 
1089  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1090 
1091  Assert(TransactionIdIsValid(txn->xid));
1092 
1093  /* transaction ID */
1094  pq_sendint32(out, txn->xid);
1095 
1096  /* send the flags field (unused for now) */
1097  pq_sendbyte(out, flags);
1098 
1099  /* send fields */
1100  pq_sendint64(out, commit_lsn);
1101  pq_sendint64(out, txn->end_lsn);
1102  pq_sendint64(out, txn->xact_time.commit_time);
1103 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:439
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
union ReorderBufferTXN::@103 xact_time
#define Assert(condition)
Definition: c.h:804
XLogRecPtr end_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_prepare()

void logicalrep_write_stream_prepare ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 350 of file proto.c.

References LOGICAL_REP_MSG_STREAM_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_stream_prepare_txn().

353 {
354  logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
355  txn, prepare_lsn);
356 }
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:152

◆ logicalrep_write_stream_start()

void logicalrep_write_stream_start ( StringInfo  out,
TransactionId  xid,
bool  first_segment 
)

Definition at line 1041 of file proto.c.

References Assert, LOGICAL_REP_MSG_STREAM_START, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_start().

1043 {
1044  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1045 
1046  Assert(TransactionIdIsValid(xid));
1047 
1048  /* transaction ID (we're starting to stream, so must be valid) */
1049  pq_sendint32(out, xid);
1050 
1051  /* 1 if this is the first streaming segment for this xid */
1052  pq_sendbyte(out, first_segment ? 1 : 0);
1053 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo  out)

Definition at line 1075 of file proto.c.

References LOGICAL_REP_MSG_STREAM_STOP, and pq_sendbyte().

Referenced by pgoutput_stream_stop().

1076 {
1077  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
1078 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_truncate()

void logicalrep_write_truncate ( StringInfo  out,
TransactionId  xid,
int  nrelids,
Oid  relids[],
bool  cascade,
bool  restart_seqs 
)

Definition at line 570 of file proto.c.

References i, LOGICAL_REP_MSG_TRUNCATE, pq_sendbyte(), pq_sendint32(), pq_sendint8(), TransactionIdIsValid, TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by pgoutput_truncate().

575 {
576  int i;
577  uint8 flags = 0;
578 
579  pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
580 
581  /* transaction ID (if not valid, we're not streaming) */
582  if (TransactionIdIsValid(xid))
583  pq_sendint32(out, xid);
584 
585  pq_sendint32(out, nrelids);
586 
587  /* encode and send truncate flags */
588  if (cascade)
589  flags |= TRUNCATE_CASCADE;
590  if (restart_seqs)
591  flags |= TRUNCATE_RESTART_SEQS;
592  pq_sendint8(out, flags);
593 
594  for (i = 0; i < nrelids; i++)
595  pq_sendint32(out, relids[i]);
596 }
unsigned char uint8
Definition: c.h:439
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
int i
#define TRUNCATE_CASCADE
Definition: proto.c:29
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30

◆ logicalrep_write_typ()

void logicalrep_write_typ ( StringInfo  out,
TransactionId  xid,
Oid  typoid 
)

Definition at line 708 of file proto.c.

References elog, ERROR, getBaseType(), GETSTRUCT, HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum, pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), TransactionIdIsValid, and TYPEOID.

Referenced by send_relation_and_attrs().

709 {
710  Oid basetypoid = getBaseType(typoid);
711  HeapTuple tup;
712  Form_pg_type typtup;
713 
714  pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
715 
716  /* transaction ID (if not valid, we're not streaming) */
717  if (TransactionIdIsValid(xid))
718  pq_sendint32(out, xid);
719 
720  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
721  if (!HeapTupleIsValid(tup))
722  elog(ERROR, "cache lookup failed for type %u", basetypoid);
723  typtup = (Form_pg_type) GETSTRUCT(tup);
724 
725  /* use Oid as relation identifier */
726  pq_sendint32(out, typoid);
727 
728  /* send qualified type name */
729  logicalrep_write_namespace(out, typtup->typnamespace);
730  pq_sendstring(out, NameStr(typtup->typname));
731 
732  ReleaseSysCache(tup);
733 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1007
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
unsigned int Oid
Definition: postgres_ext.h:31
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
#define elog(elevel,...)
Definition: elog.h:232
#define NameStr(name)
Definition: c.h:681
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2468

◆ logicalrep_write_update()

void logicalrep_write_update ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 444 of file proto.c.

References Assert, LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

446 {
447  pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
448 
449  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
450  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
451  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
452 
453  /* transaction ID (if not valid, we're not streaming) */
454  if (TransactionIdIsValid(xid))
455  pq_sendint32(out, xid);
456 
457  /* use Oid as relation identifier */
458  pq_sendint32(out, RelationGetRelid(rel));
459 
460  if (oldtuple != NULL)
461  {
462  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
463  pq_sendbyte(out, 'O'); /* old tuple follows */
464  else
465  pq_sendbyte(out, 'K'); /* old key follows */
466  logicalrep_write_tuple(out, rel, oldtuple, binary);
467  }
468 
469  pq_sendbyte(out, 'N'); /* new tuple follows */
470  logicalrep_write_tuple(out, rel, newtuple, binary);
471 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:752
Form_pg_class rd_rel
Definition: rel.h:109
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:804
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:477