PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
logicalproto.h File Reference
#include "access/xact.h"
#include "executor/tuptable.h"
#include "replication/reorderbuffer.h"
#include "utils/rel.h"
Include dependency graph for logicalproto.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepTupleData
 
struct  LogicalRepRelation
 
struct  LogicalRepTyp
 
struct  LogicalRepBeginData
 
struct  LogicalRepCommitData
 
struct  LogicalRepPreparedTxnData
 
struct  LogicalRepCommitPreparedTxnData
 
struct  LogicalRepRollbackPreparedTxnData
 
struct  LogicalRepStreamAbortData
 

Macros

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1
 
#define LOGICALREP_PROTO_VERSION_NUM   1
 
#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2
 
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM   3
 
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM   4
 
#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
 
#define LOGICALREP_COLUMN_NULL   'n'
 
#define LOGICALREP_COLUMN_UNCHANGED   'u'
 
#define LOGICALREP_COLUMN_TEXT   't'
 
#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */
 

Typedefs

typedef enum LogicalRepMsgType LogicalRepMsgType
 
typedef struct LogicalRepTupleData LogicalRepTupleData
 
typedef uint32 LogicalRepRelId
 
typedef struct LogicalRepRelation LogicalRepRelation
 
typedef struct LogicalRepTyp LogicalRepTyp
 
typedef struct LogicalRepBeginData LogicalRepBeginData
 
typedef struct LogicalRepCommitData LogicalRepCommitData
 
typedef struct LogicalRepPreparedTxnData LogicalRepPreparedTxnData
 
typedef struct LogicalRepCommitPreparedTxnData LogicalRepCommitPreparedTxnData
 
typedef struct LogicalRepRollbackPreparedTxnData LogicalRepRollbackPreparedTxnData
 
typedef struct LogicalRepStreamAbortData LogicalRepStreamAbortData
 

Enumerations

enum  LogicalRepMsgType {
  LOGICAL_REP_MSG_BEGIN = 'B' , LOGICAL_REP_MSG_COMMIT = 'C' , LOGICAL_REP_MSG_ORIGIN = 'O' , LOGICAL_REP_MSG_INSERT = 'I' ,
  LOGICAL_REP_MSG_UPDATE = 'U' , LOGICAL_REP_MSG_DELETE = 'D' , LOGICAL_REP_MSG_TRUNCATE = 'T' , LOGICAL_REP_MSG_RELATION = 'R' ,
  LOGICAL_REP_MSG_TYPE = 'Y' , LOGICAL_REP_MSG_MESSAGE = 'M' , LOGICAL_REP_MSG_BEGIN_PREPARE = 'b' , LOGICAL_REP_MSG_PREPARE = 'P' ,
  LOGICAL_REP_MSG_COMMIT_PREPARED = 'K' , LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r' , LOGICAL_REP_MSG_STREAM_START = 'S' , LOGICAL_REP_MSG_STREAM_STOP = 'E' ,
  LOGICAL_REP_MSG_STREAM_COMMIT = 'c' , LOGICAL_REP_MSG_STREAM_ABORT = 'A' , LOGICAL_REP_MSG_STREAM_PREPARE = 'p'
}
 

Functions

void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_begin_prepare (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin_prepare (StringInfo in, LogicalRepPreparedTxnData *begin_data)
 
void logicalrep_write_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_read_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_commit_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit_prepared (StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
 
void logicalrep_write_rollback_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
void logicalrep_read_rollback_prepared (StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
 
void logicalrep_write_stream_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_read_stream_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, bool include_gencols)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, bool include_gencols)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, bool include_gencols)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
 
Listlogicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
 
void logicalrep_write_message (StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, bool include_gencols)
 
LogicalRepRelationlogicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
 
void logicalrep_read_typ (StringInfo in, LogicalRepTyp *ltyp)
 
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
 
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
 
void logicalrep_write_stream_stop (StringInfo out)
 
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
TransactionId logicalrep_read_stream_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
 
void logicalrep_read_stream_abort (StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
 
const char * logicalrep_message_type (LogicalRepMsgType action)
 
bool logicalrep_should_publish_column (Form_pg_attribute att, Bitmapset *columns, bool include_gencols)
 

Macro Definition Documentation

◆ LOGICALREP_COLUMN_BINARY

#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */

Definition at line 99 of file logicalproto.h.

◆ LOGICALREP_COLUMN_NULL

#define LOGICALREP_COLUMN_NULL   'n'

Definition at line 96 of file logicalproto.h.

◆ LOGICALREP_COLUMN_TEXT

#define LOGICALREP_COLUMN_TEXT   't'

Definition at line 98 of file logicalproto.h.

◆ LOGICALREP_COLUMN_UNCHANGED

#define LOGICALREP_COLUMN_UNCHANGED   'u'

Definition at line 97 of file logicalproto.h.

◆ LOGICALREP_PROTO_MAX_VERSION_NUM

#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM

Definition at line 45 of file logicalproto.h.

◆ LOGICALREP_PROTO_MIN_VERSION_NUM

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1

Definition at line 40 of file logicalproto.h.

◆ LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM

#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM   4

Definition at line 44 of file logicalproto.h.

◆ LOGICALREP_PROTO_STREAM_VERSION_NUM

#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2

Definition at line 42 of file logicalproto.h.

◆ LOGICALREP_PROTO_TWOPHASE_VERSION_NUM

#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM   3

Definition at line 43 of file logicalproto.h.

◆ LOGICALREP_PROTO_VERSION_NUM

#define LOGICALREP_PROTO_VERSION_NUM   1

Definition at line 41 of file logicalproto.h.

Typedef Documentation

◆ LogicalRepBeginData

◆ LogicalRepCommitData

◆ LogicalRepCommitPreparedTxnData

◆ LogicalRepMsgType

◆ LogicalRepPreparedTxnData

◆ LogicalRepRelation

◆ LogicalRepRelId

Definition at line 101 of file logicalproto.h.

◆ LogicalRepRollbackPreparedTxnData

◆ LogicalRepStreamAbortData

◆ LogicalRepTupleData

◆ LogicalRepTyp

typedef struct LogicalRepTyp LogicalRepTyp

Enumeration Type Documentation

◆ LogicalRepMsgType

Enumerator
LOGICAL_REP_MSG_BEGIN 
LOGICAL_REP_MSG_COMMIT 
LOGICAL_REP_MSG_ORIGIN 
LOGICAL_REP_MSG_INSERT 
LOGICAL_REP_MSG_UPDATE 
LOGICAL_REP_MSG_DELETE 
LOGICAL_REP_MSG_TRUNCATE 
LOGICAL_REP_MSG_RELATION 
LOGICAL_REP_MSG_TYPE 
LOGICAL_REP_MSG_MESSAGE 
LOGICAL_REP_MSG_BEGIN_PREPARE 
LOGICAL_REP_MSG_PREPARE 
LOGICAL_REP_MSG_COMMIT_PREPARED 
LOGICAL_REP_MSG_ROLLBACK_PREPARED 
LOGICAL_REP_MSG_STREAM_START 
LOGICAL_REP_MSG_STREAM_STOP 
LOGICAL_REP_MSG_STREAM_COMMIT 
LOGICAL_REP_MSG_STREAM_ABORT 
LOGICAL_REP_MSG_STREAM_PREPARE 

Definition at line 57 of file logicalproto.h.

58{
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63

Function Documentation

◆ logicalrep_message_type()

const char * logicalrep_message_type ( LogicalRepMsgType  action)

Definition at line 1196 of file proto.c.

1197{
1198 static char err_unknown[20];
1199
1200 switch (action)
1201 {
1203 return "BEGIN";
1205 return "COMMIT";
1207 return "ORIGIN";
1209 return "INSERT";
1211 return "UPDATE";
1213 return "DELETE";
1215 return "TRUNCATE";
1217 return "RELATION";
1219 return "TYPE";
1221 return "MESSAGE";
1223 return "BEGIN PREPARE";
1225 return "PREPARE";
1227 return "COMMIT PREPARED";
1229 return "ROLLBACK PREPARED";
1231 return "STREAM START";
1233 return "STREAM STOP";
1235 return "STREAM COMMIT";
1237 return "STREAM ABORT";
1239 return "STREAM PREPARE";
1240 }
1241
1242 /*
1243 * This message provides context in the error raised when applying a
1244 * logical message. So we can't throw an error here. Return an unknown
1245 * indicator value so that the original error is still reported.
1246 */
1247 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1248
1249 return err_unknown;
1250}
#define snprintf
Definition: port.h:238

References generate_unaccent_rules::action, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and snprintf.

Referenced by apply_error_callback().

◆ logicalrep_read_begin()

void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData begin_data 
)

Definition at line 62 of file proto.c.

63{
64 /* read fields */
65 begin_data->final_lsn = pq_getmsgint64(in);
66 if (begin_data->final_lsn == InvalidXLogRecPtr)
67 elog(ERROR, "final_lsn not set in begin message");
68 begin_data->committime = pq_getmsgint64(in);
69 begin_data->xid = pq_getmsgint(in, 4);
70}
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131
TimestampTz committime
Definition: logicalproto.h:130
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), and LogicalRepBeginData::xid.

Referenced by apply_handle_begin().

◆ logicalrep_read_begin_prepare()

void logicalrep_read_begin_prepare ( StringInfo  in,
LogicalRepPreparedTxnData begin_data 
)

Definition at line 133 of file proto.c.

134{
135 /* read fields */
136 begin_data->prepare_lsn = pq_getmsgint64(in);
137 if (begin_data->prepare_lsn == InvalidXLogRecPtr)
138 elog(ERROR, "prepare_lsn not set in begin prepare message");
139 begin_data->end_lsn = pq_getmsgint64(in);
140 if (begin_data->end_lsn == InvalidXLogRecPtr)
141 elog(ERROR, "end_lsn not set in begin prepare message");
142 begin_data->prepare_time = pq_getmsgint64(in);
143 begin_data->xid = pq_getmsgint(in, 4);
144
145 /* read gid (copy it into a pre-allocated buffer) */
146 strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
147}
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_begin_prepare().

◆ logicalrep_read_commit()

void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData commit_data 
)

Definition at line 97 of file proto.c.

98{
99 /* read flags (unused for now) */
100 uint8 flags = pq_getmsgbyte(in);
101
102 if (flags != 0)
103 elog(ERROR, "unrecognized flags %u in commit message", flags);
104
105 /* read fields */
106 commit_data->commit_lsn = pq_getmsgint64(in);
107 commit_data->end_lsn = pq_getmsgint64(in);
108 commit_data->committime = pq_getmsgint64(in);
109}
uint8_t uint8
Definition: c.h:483
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
TimestampTz committime
Definition: logicalproto.h:138

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

◆ logicalrep_read_commit_prepared()

void logicalrep_read_commit_prepared ( StringInfo  in,
LogicalRepCommitPreparedTxnData prepare_data 
)

Definition at line 266 of file proto.c.

267{
268 /* read flags */
269 uint8 flags = pq_getmsgbyte(in);
270
271 if (flags != 0)
272 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
273
274 /* read fields */
275 prepare_data->commit_lsn = pq_getmsgint64(in);
276 if (prepare_data->commit_lsn == InvalidXLogRecPtr)
277 elog(ERROR, "commit_lsn is not set in commit prepared message");
278 prepare_data->end_lsn = pq_getmsgint64(in);
279 if (prepare_data->end_lsn == InvalidXLogRecPtr)
280 elog(ERROR, "end_lsn is not set in commit prepared message");
281 prepare_data->commit_time = pq_getmsgint64(in);
282 prepare_data->xid = pq_getmsgint(in, 4);
283
284 /* read gid (copy it into a pre-allocated buffer) */
285 strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
286}

References LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, elog, LogicalRepCommitPreparedTxnData::end_lsn, ERROR, LogicalRepCommitPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), strlcpy(), and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_handle_commit_prepared().

◆ logicalrep_read_delete()

LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData oldtup 
)

Definition at line 554 of file proto.c.

555{
556 char action;
557 LogicalRepRelId relid;
558
559 /* read the relation id */
560 relid = pq_getmsgint(in, 4);
561
562 /* read and verify action */
563 action = pq_getmsgbyte(in);
564 if (action != 'K' && action != 'O')
565 elog(ERROR, "expected action 'O' or 'K', got %c", action);
566
567 logicalrep_read_tuple(in, oldtup);
568
569 return relid;
570}
uint32 LogicalRepRelId
Definition: logicalproto.h:101
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:850

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

◆ logicalrep_read_insert()

LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData newtup 
)

Definition at line 425 of file proto.c.

426{
427 char action;
428 LogicalRepRelId relid;
429
430 /* read the relation id */
431 relid = pq_getmsgint(in, 4);
432
433 action = pq_getmsgbyte(in);
434 if (action != 'N')
435 elog(ERROR, "expected new tuple but got %d",
436 action);
437
438 logicalrep_read_tuple(in, newtup);
439
440 return relid;
441}

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

◆ logicalrep_read_origin()

char * logicalrep_read_origin ( StringInfo  in,
XLogRecPtr origin_lsn 
)

Definition at line 389 of file proto.c.

390{
391 /* fixed fields */
392 *origin_lsn = pq_getmsgint64(in);
393
394 /* return origin */
395 return pstrdup(pq_getmsgstring(in));
396}
char * pstrdup(const char *in)
Definition: mcxt.c:1696

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

◆ logicalrep_read_prepare()

void logicalrep_read_prepare ( StringInfo  in,
LogicalRepPreparedTxnData prepare_data 
)

Definition at line 227 of file proto.c.

228{
229 logicalrep_read_prepare_common(in, "prepare", prepare_data);
230}
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:198

References logicalrep_read_prepare_common().

Referenced by apply_handle_prepare().

◆ logicalrep_read_rel()

LogicalRepRelation * logicalrep_read_rel ( StringInfo  in)

Definition at line 690 of file proto.c.

691{
693
694 rel->remoteid = pq_getmsgint(in, 4);
695
696 /* Read relation name from stream */
698 rel->relname = pstrdup(pq_getmsgstring(in));
699
700 /* Read the replica identity. */
701 rel->replident = pq_getmsgbyte(in);
702
703 /* Get attribute description */
704 logicalrep_read_attrs(in, rel);
705
706 return rel;
707}
void * palloc(Size size)
Definition: mcxt.c:1317
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:972
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1034
LogicalRepRelId remoteid
Definition: logicalproto.h:107

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

◆ logicalrep_read_rollback_prepared()

void logicalrep_read_rollback_prepared ( StringInfo  in,
LogicalRepRollbackPreparedTxnData rollback_data 
)

Definition at line 324 of file proto.c.

326{
327 /* read flags */
328 uint8 flags = pq_getmsgbyte(in);
329
330 if (flags != 0)
331 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
332
333 /* read fields */
334 rollback_data->prepare_end_lsn = pq_getmsgint64(in);
335 if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
336 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
337 rollback_data->rollback_end_lsn = pq_getmsgint64(in);
338 if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
339 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
340 rollback_data->prepare_time = pq_getmsgint64(in);
341 rollback_data->rollback_time = pq_getmsgint64(in);
342 rollback_data->xid = pq_getmsgint(in, 4);
343
344 /* read gid (copy it into a pre-allocated buffer) */
345 strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
346}

References elog, ERROR, LogicalRepRollbackPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, strlcpy(), and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_handle_rollback_prepared().

◆ logicalrep_read_stream_abort()

void logicalrep_read_stream_abort ( StringInfo  in,
LogicalRepStreamAbortData abort_data,
bool  read_abort_info 
)

Definition at line 1171 of file proto.c.

1174{
1175 Assert(abort_data);
1176
1177 abort_data->xid = pq_getmsgint(in, 4);
1178 abort_data->subxid = pq_getmsgint(in, 4);
1179
1180 if (read_abort_info)
1181 {
1182 abort_data->abort_lsn = pq_getmsgint64(in);
1183 abort_data->abort_time = pq_getmsgint64(in);
1184 }
1185 else
1186 {
1187 abort_data->abort_lsn = InvalidXLogRecPtr;
1188 abort_data->abort_time = 0;
1189 }
1190}
#define Assert(condition)
Definition: c.h:812

References LogicalRepStreamAbortData::abort_lsn, LogicalRepStreamAbortData::abort_time, Assert, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), LogicalRepStreamAbortData::subxid, and LogicalRepStreamAbortData::xid.

Referenced by apply_handle_stream_abort().

◆ logicalrep_read_stream_commit()

TransactionId logicalrep_read_stream_commit ( StringInfo  in,
LogicalRepCommitData commit_data 
)

Definition at line 1116 of file proto.c.

1117{
1118 TransactionId xid;
1119 uint8 flags;
1120
1121 xid = pq_getmsgint(in, 4);
1122
1123 /* read flags (unused for now) */
1124 flags = pq_getmsgbyte(in);
1125
1126 if (flags != 0)
1127 elog(ERROR, "unrecognized flags %u in commit message", flags);
1128
1129 /* read fields */
1130 commit_data->commit_lsn = pq_getmsgint64(in);
1131 commit_data->end_lsn = pq_getmsgint64(in);
1132 commit_data->committime = pq_getmsgint64(in);
1133
1134 return xid;
1135}
uint32 TransactionId
Definition: c.h:606

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), pq_getmsgint(), and pq_getmsgint64().

Referenced by apply_handle_stream_commit().

◆ logicalrep_read_stream_prepare()

void logicalrep_read_stream_prepare ( StringInfo  in,
LogicalRepPreparedTxnData prepare_data 
)

Definition at line 364 of file proto.c.

365{
366 logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
367}

References logicalrep_read_prepare_common().

Referenced by apply_handle_stream_prepare().

◆ logicalrep_read_stream_start()

TransactionId logicalrep_read_stream_start ( StringInfo  in,
bool *  first_segment 
)

Definition at line 1066 of file proto.c.

1067{
1068 TransactionId xid;
1069
1070 Assert(first_segment);
1071
1072 xid = pq_getmsgint(in, 4);
1073 *first_segment = (pq_getmsgbyte(in) == 1);
1074
1075 return xid;
1076}

References Assert, pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_stream_start().

◆ logicalrep_read_truncate()

List * logicalrep_read_truncate ( StringInfo  in,
bool *  cascade,
bool *  restart_seqs 
)

Definition at line 608 of file proto.c.

610{
611 int i;
612 int nrelids;
613 List *relids = NIL;
614 uint8 flags;
615
616 nrelids = pq_getmsgint(in, 4);
617
618 /* read and decode truncate flags */
619 flags = pq_getmsgint(in, 1);
620 *cascade = (flags & TRUNCATE_CASCADE) > 0;
621 *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
622
623 for (i = 0; i < nrelids; i++)
624 relids = lappend_oid(relids, pq_getmsgint(in, 4));
625
626 return relids;
627}
int i
Definition: isn.c:72
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
#define NIL
Definition: pg_list.h:68
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30
#define TRUNCATE_CASCADE
Definition: proto.c:29
Definition: pg_list.h:54

References i, lappend_oid(), NIL, pq_getmsgint(), TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by apply_handle_truncate().

◆ logicalrep_read_typ()

void logicalrep_read_typ ( StringInfo  in,
LogicalRepTyp ltyp 
)

Definition at line 746 of file proto.c.

747{
748 ltyp->remoteid = pq_getmsgint(in, 4);
749
750 /* Read type name from stream */
752 ltyp->typname = pstrdup(pq_getmsgstring(in));
753}

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

◆ logicalrep_read_update()

LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool *  has_oldtuple,
LogicalRepTupleData oldtup,
LogicalRepTupleData newtup 
)

Definition at line 482 of file proto.c.

485{
486 char action;
487 LogicalRepRelId relid;
488
489 /* read the relation id */
490 relid = pq_getmsgint(in, 4);
491
492 /* read and verify action */
493 action = pq_getmsgbyte(in);
494 if (action != 'K' && action != 'O' && action != 'N')
495 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
496 action);
497
498 /* check for old tuple */
499 if (action == 'K' || action == 'O')
500 {
501 logicalrep_read_tuple(in, oldtup);
502 *has_oldtuple = true;
503
504 action = pq_getmsgbyte(in);
505 }
506 else
507 *has_oldtuple = false;
508
509 /* check for new tuple */
510 if (action != 'N')
511 elog(ERROR, "expected action 'N', got %c",
512 action);
513
514 logicalrep_read_tuple(in, newtup);
515
516 return relid;
517}

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

◆ logicalrep_should_publish_column()

bool logicalrep_should_publish_column ( Form_pg_attribute  att,
Bitmapset columns,
bool  include_gencols 
)

Definition at line 1265 of file proto.c.

1267{
1268 if (att->attisdropped)
1269 return false;
1270
1271 /* If a column list is provided, publish only the cols in that list. */
1272 if (columns)
1273 return bms_is_member(att->attnum, columns);
1274
1275 /* All non-generated columns are always published. */
1276 return att->attgenerated ? include_gencols : true;
1277}
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510

References bms_is_member().

Referenced by logicalrep_write_attrs(), logicalrep_write_tuple(), and send_relation_and_attrs().

◆ logicalrep_write_begin()

void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN txn 
)

Definition at line 48 of file proto.c.

49{
51
52 /* fixed fields */
53 pq_sendint64(out, txn->final_lsn);
55 pq_sendint32(out, txn->xid);
56}
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
TimestampTz commit_time
XLogRecPtr final_lsn
union ReorderBufferTXN::@115 xact_time
TransactionId xid

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, LOGICAL_REP_MSG_BEGIN, pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_send_begin().

◆ logicalrep_write_begin_prepare()

void logicalrep_write_begin_prepare ( StringInfo  out,
ReorderBufferTXN txn 
)

Definition at line 115 of file proto.c.

116{
118
119 /* fixed fields */
120 pq_sendint64(out, txn->final_lsn);
121 pq_sendint64(out, txn->end_lsn);
123 pq_sendint32(out, txn->xid);
124
125 /* send gid */
126 pq_sendstring(out, txn->gid);
127}
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:195
XLogRecPtr end_lsn
TimestampTz prepare_time

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_BEGIN_PREPARE, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_prepare_txn().

◆ logicalrep_write_commit()

void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 77 of file proto.c.

79{
80 uint8 flags = 0;
81
83
84 /* send the flags field (unused for now) */
85 pq_sendbyte(out, flags);
86
87 /* send fields */
88 pq_sendint64(out, commit_lsn);
89 pq_sendint64(out, txn->end_lsn);
91}

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_COMMIT, pq_sendbyte(), pq_sendint64(), and ReorderBufferTXN::xact_time.

Referenced by pgoutput_commit_txn().

◆ logicalrep_write_commit_prepared()

void logicalrep_write_commit_prepared ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 236 of file proto.c.

238{
239 uint8 flags = 0;
240
242
243 /*
244 * This should only ever happen for two-phase commit transactions, in
245 * which case we expect to have a valid GID.
246 */
247 Assert(txn->gid != NULL);
248
249 /* send the flags field */
250 pq_sendbyte(out, flags);
251
252 /* send fields */
253 pq_sendint64(out, commit_lsn);
254 pq_sendint64(out, txn->end_lsn);
256 pq_sendint32(out, txn->xid);
257
258 /* send gid */
259 pq_sendstring(out, txn->gid);
260}

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_COMMIT_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_commit_prepared_txn().

◆ logicalrep_write_delete()

void logicalrep_write_delete ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot oldslot,
bool  binary,
Bitmapset columns,
bool  include_gencols 
)

Definition at line 523 of file proto.c.

526{
527 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
528 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
529 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
530
532
533 /* transaction ID (if not valid, we're not streaming) */
534 if (TransactionIdIsValid(xid))
535 pq_sendint32(out, xid);
536
537 /* use Oid as relation identifier */
539
540 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
541 pq_sendbyte(out, 'O'); /* old tuple follows */
542 else
543 pq_sendbyte(out, 'K'); /* old key follows */
544
545 logicalrep_write_tuple(out, rel, oldslot, binary, columns, include_gencols);
546}
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, bool include_gencols)
Definition: proto.c:759
#define RelationGetRelid(relation)
Definition: rel.h:505
Form_pg_class rd_rel
Definition: rel.h:111
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert, LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

◆ logicalrep_write_insert()

void logicalrep_write_insert ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot newslot,
bool  binary,
Bitmapset columns,
bool  include_gencols 
)

Definition at line 402 of file proto.c.

405{
407
408 /* transaction ID (if not valid, we're not streaming) */
409 if (TransactionIdIsValid(xid))
410 pq_sendint32(out, xid);
411
412 /* use Oid as relation identifier */
414
415 pq_sendbyte(out, 'N'); /* new tuple follows */
416 logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
417}

References LOGICAL_REP_MSG_INSERT, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

◆ logicalrep_write_message()

void logicalrep_write_message ( StringInfo  out,
TransactionId  xid,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)

Definition at line 633 of file proto.c.

636{
637 uint8 flags = 0;
638
640
641 /* encode and send message flags */
642 if (transactional)
643 flags |= MESSAGE_TRANSACTIONAL;
644
645 /* transaction ID (if not valid, we're not streaming) */
646 if (TransactionIdIsValid(xid))
647 pq_sendint32(out, xid);
648
649 pq_sendint8(out, flags);
650 pq_sendint64(out, lsn);
651 pq_sendstring(out, prefix);
652 pq_sendint32(out, sz);
653 pq_sendbytes(out, message, sz);
654}
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:128
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28

References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.

Referenced by pgoutput_message().

◆ logicalrep_write_origin()

void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 373 of file proto.c.

375{
377
378 /* fixed fields */
379 pq_sendint64(out, origin_lsn);
380
381 /* origin string */
382 pq_sendstring(out, origin);
383}

References LOGICAL_REP_MSG_ORIGIN, pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by send_repl_origin().

◆ logicalrep_write_prepare()

void logicalrep_write_prepare ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 186 of file proto.c.

188{
190 txn, prepare_lsn);
191}
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:154

References LOGICAL_REP_MSG_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_prepare_txn().

◆ logicalrep_write_rel()

void logicalrep_write_rel ( StringInfo  out,
TransactionId  xid,
Relation  rel,
Bitmapset columns,
bool  include_gencols 
)

Definition at line 660 of file proto.c.

662{
663 char *relname;
664
666
667 /* transaction ID (if not valid, we're not streaming) */
668 if (TransactionIdIsValid(xid))
669 pq_sendint32(out, xid);
670
671 /* use Oid as relation identifier */
673
674 /* send qualified relation name */
678
679 /* send replica identity */
680 pq_sendbyte(out, rel->rd_rel->relreplident);
681
682 /* send the attribute info */
683 logicalrep_write_attrs(out, rel, columns, include_gencols);
684}
NameData relname
Definition: pg_class.h:38
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1014
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, bool include_gencols)
Definition: proto.c:910
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetNamespace(relation)
Definition: rel.h:546

References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

◆ logicalrep_write_rollback_prepared()

void logicalrep_write_rollback_prepared ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)

Definition at line 292 of file proto.c.

295{
296 uint8 flags = 0;
297
299
300 /*
301 * This should only ever happen for two-phase commit transactions, in
302 * which case we expect to have a valid GID.
303 */
304 Assert(txn->gid != NULL);
305
306 /* send the flags field */
307 pq_sendbyte(out, flags);
308
309 /* send fields */
310 pq_sendint64(out, prepare_end_lsn);
311 pq_sendint64(out, txn->end_lsn);
312 pq_sendint64(out, prepare_time);
314 pq_sendint32(out, txn->xid);
315
316 /* send gid */
317 pq_sendstring(out, txn->gid);
318}

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_ROLLBACK_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_rollback_prepared_txn().

◆ logicalrep_write_stream_abort()

void logicalrep_write_stream_abort ( StringInfo  out,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  abort_lsn,
TimestampTz  abort_time,
bool  write_abort_info 
)

Definition at line 1145 of file proto.c.

1148{
1150
1152
1153 /* transaction ID */
1154 pq_sendint32(out, xid);
1155 pq_sendint32(out, subxid);
1156
1157 if (write_abort_info)
1158 {
1159 pq_sendint64(out, abort_lsn);
1160 pq_sendint64(out, abort_time);
1161 }
1162}

References Assert, LOGICAL_REP_MSG_STREAM_ABORT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), and TransactionIdIsValid.

Referenced by pgoutput_stream_abort().

◆ logicalrep_write_stream_commit()

void logicalrep_write_stream_commit ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 1091 of file proto.c.

1093{
1094 uint8 flags = 0;
1095
1097
1099
1100 /* transaction ID */
1101 pq_sendint32(out, txn->xid);
1102
1103 /* send the flags field (unused for now) */
1104 pq_sendbyte(out, flags);
1105
1106 /* send fields */
1107 pq_sendint64(out, commit_lsn);
1108 pq_sendint64(out, txn->end_lsn);
1110}

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_STREAM_COMMIT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), TransactionIdIsValid, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_stream_commit().

◆ logicalrep_write_stream_prepare()

void logicalrep_write_stream_prepare ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 352 of file proto.c.

355{
357 txn, prepare_lsn);
358}

References LOGICAL_REP_MSG_STREAM_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_stream_prepare_txn().

◆ logicalrep_write_stream_start()

void logicalrep_write_stream_start ( StringInfo  out,
TransactionId  xid,
bool  first_segment 
)

Definition at line 1048 of file proto.c.

1050{
1052
1054
1055 /* transaction ID (we're starting to stream, so must be valid) */
1056 pq_sendint32(out, xid);
1057
1058 /* 1 if this is the first streaming segment for this xid */
1059 pq_sendbyte(out, first_segment ? 1 : 0);
1060}

References Assert, LOGICAL_REP_MSG_STREAM_START, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_start().

◆ logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo  out)

Definition at line 1082 of file proto.c.

1083{
1085}

References LOGICAL_REP_MSG_STREAM_STOP, and pq_sendbyte().

Referenced by pgoutput_stream_stop().

◆ logicalrep_write_truncate()

void logicalrep_write_truncate ( StringInfo  out,
TransactionId  xid,
int  nrelids,
Oid  relids[],
bool  cascade,
bool  restart_seqs 
)

Definition at line 576 of file proto.c.

581{
582 int i;
583 uint8 flags = 0;
584
586
587 /* transaction ID (if not valid, we're not streaming) */
588 if (TransactionIdIsValid(xid))
589 pq_sendint32(out, xid);
590
591 pq_sendint32(out, nrelids);
592
593 /* encode and send truncate flags */
594 if (cascade)
595 flags |= TRUNCATE_CASCADE;
596 if (restart_seqs)
597 flags |= TRUNCATE_RESTART_SEQS;
598 pq_sendint8(out, flags);
599
600 for (i = 0; i < nrelids; i++)
601 pq_sendint32(out, relids[i]);
602}

References i, LOGICAL_REP_MSG_TRUNCATE, pq_sendbyte(), pq_sendint32(), pq_sendint8(), TransactionIdIsValid, TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by pgoutput_truncate().

◆ logicalrep_write_typ()

void logicalrep_write_typ ( StringInfo  out,
TransactionId  xid,
Oid  typoid 
)

Definition at line 715 of file proto.c.

716{
717 Oid basetypoid = getBaseType(typoid);
718 HeapTuple tup;
719 Form_pg_type typtup;
720
722
723 /* transaction ID (if not valid, we're not streaming) */
724 if (TransactionIdIsValid(xid))
725 pq_sendint32(out, xid);
726
727 tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
728 if (!HeapTupleIsValid(tup))
729 elog(ERROR, "cache lookup failed for type %u", basetypoid);
730 typtup = (Form_pg_type) GETSTRUCT(tup);
731
732 /* use Oid as type identifier */
733 pq_sendint32(out, typoid);
734
735 /* send qualified type name */
736 logicalrep_write_namespace(out, typtup->typnamespace);
737 pq_sendstring(out, NameStr(typtup->typname));
738
739 ReleaseSysCache(tup);
740}
#define NameStr(name)
Definition: c.h:700
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2521
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
unsigned int Oid
Definition: postgres_ext.h:31
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:221

References elog, ERROR, getBaseType(), GETSTRUCT, HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

◆ logicalrep_write_update()

void logicalrep_write_update ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot oldslot,
TupleTableSlot newslot,
bool  binary,
Bitmapset columns,
bool  include_gencols 
)

Definition at line 447 of file proto.c.

450{
452
453 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
454 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
455 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
456
457 /* transaction ID (if not valid, we're not streaming) */
458 if (TransactionIdIsValid(xid))
459 pq_sendint32(out, xid);
460
461 /* use Oid as relation identifier */
463
464 if (oldslot != NULL)
465 {
466 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
467 pq_sendbyte(out, 'O'); /* old tuple follows */
468 else
469 pq_sendbyte(out, 'K'); /* old key follows */
470 logicalrep_write_tuple(out, rel, oldslot, binary, columns,
471 include_gencols);
472 }
473
474 pq_sendbyte(out, 'N'); /* new tuple follows */
475 logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
476}

References Assert, LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().