PostgreSQL Source Code git master
logicalproto.h File Reference
#include "access/xact.h"
#include "executor/tuptable.h"
#include "replication/reorderbuffer.h"
#include "utils/rel.h"
Include dependency graph for logicalproto.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepTupleData
 
struct  LogicalRepRelation
 
struct  LogicalRepTyp
 
struct  LogicalRepBeginData
 
struct  LogicalRepCommitData
 
struct  LogicalRepPreparedTxnData
 
struct  LogicalRepCommitPreparedTxnData
 
struct  LogicalRepRollbackPreparedTxnData
 
struct  LogicalRepStreamAbortData
 

Macros

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1
 
#define LOGICALREP_PROTO_VERSION_NUM   1
 
#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2
 
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM   3
 
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM   4
 
#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
 
#define LOGICALREP_COLUMN_NULL   'n'
 
#define LOGICALREP_COLUMN_UNCHANGED   'u'
 
#define LOGICALREP_COLUMN_TEXT   't'
 
#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */
 

Typedefs

typedef enum LogicalRepMsgType LogicalRepMsgType
 
typedef struct LogicalRepTupleData LogicalRepTupleData
 
typedef uint32 LogicalRepRelId
 
typedef struct LogicalRepRelation LogicalRepRelation
 
typedef struct LogicalRepTyp LogicalRepTyp
 
typedef struct LogicalRepBeginData LogicalRepBeginData
 
typedef struct LogicalRepCommitData LogicalRepCommitData
 
typedef struct LogicalRepPreparedTxnData LogicalRepPreparedTxnData
 
typedef struct LogicalRepCommitPreparedTxnData LogicalRepCommitPreparedTxnData
 
typedef struct LogicalRepRollbackPreparedTxnData LogicalRepRollbackPreparedTxnData
 
typedef struct LogicalRepStreamAbortData LogicalRepStreamAbortData
 

Enumerations

enum  LogicalRepMsgType {
  LOGICAL_REP_MSG_BEGIN = 'B' , LOGICAL_REP_MSG_COMMIT = 'C' , LOGICAL_REP_MSG_ORIGIN = 'O' , LOGICAL_REP_MSG_INSERT = 'I' ,
  LOGICAL_REP_MSG_UPDATE = 'U' , LOGICAL_REP_MSG_DELETE = 'D' , LOGICAL_REP_MSG_TRUNCATE = 'T' , LOGICAL_REP_MSG_RELATION = 'R' ,
  LOGICAL_REP_MSG_TYPE = 'Y' , LOGICAL_REP_MSG_MESSAGE = 'M' , LOGICAL_REP_MSG_BEGIN_PREPARE = 'b' , LOGICAL_REP_MSG_PREPARE = 'P' ,
  LOGICAL_REP_MSG_COMMIT_PREPARED = 'K' , LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r' , LOGICAL_REP_MSG_STREAM_START = 'S' , LOGICAL_REP_MSG_STREAM_STOP = 'E' ,
  LOGICAL_REP_MSG_STREAM_COMMIT = 'c' , LOGICAL_REP_MSG_STREAM_ABORT = 'A' , LOGICAL_REP_MSG_STREAM_PREPARE = 'p'
}
 

Functions

void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_begin_prepare (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin_prepare (StringInfo in, LogicalRepPreparedTxnData *begin_data)
 
void logicalrep_write_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_read_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_commit_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit_prepared (StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
 
void logicalrep_write_rollback_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
void logicalrep_read_rollback_prepared (StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
 
void logicalrep_write_stream_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_read_stream_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
 
Listlogicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
 
void logicalrep_write_message (StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
LogicalRepRelationlogicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
 
void logicalrep_read_typ (StringInfo in, LogicalRepTyp *ltyp)
 
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
 
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
 
void logicalrep_write_stream_stop (StringInfo out)
 
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
TransactionId logicalrep_read_stream_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
 
void logicalrep_read_stream_abort (StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
 
const char * logicalrep_message_type (LogicalRepMsgType action)
 
bool logicalrep_should_publish_column (Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
 

Macro Definition Documentation

◆ LOGICALREP_COLUMN_BINARY

#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */

Definition at line 99 of file logicalproto.h.

◆ LOGICALREP_COLUMN_NULL

#define LOGICALREP_COLUMN_NULL   'n'

Definition at line 96 of file logicalproto.h.

◆ LOGICALREP_COLUMN_TEXT

#define LOGICALREP_COLUMN_TEXT   't'

Definition at line 98 of file logicalproto.h.

◆ LOGICALREP_COLUMN_UNCHANGED

#define LOGICALREP_COLUMN_UNCHANGED   'u'

Definition at line 97 of file logicalproto.h.

◆ LOGICALREP_PROTO_MAX_VERSION_NUM

#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM

Definition at line 45 of file logicalproto.h.

◆ LOGICALREP_PROTO_MIN_VERSION_NUM

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1

Definition at line 40 of file logicalproto.h.

◆ LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM

#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM   4

Definition at line 44 of file logicalproto.h.

◆ LOGICALREP_PROTO_STREAM_VERSION_NUM

#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2

Definition at line 42 of file logicalproto.h.

◆ LOGICALREP_PROTO_TWOPHASE_VERSION_NUM

#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM   3

Definition at line 43 of file logicalproto.h.

◆ LOGICALREP_PROTO_VERSION_NUM

#define LOGICALREP_PROTO_VERSION_NUM   1

Definition at line 41 of file logicalproto.h.

Typedef Documentation

◆ LogicalRepBeginData

◆ LogicalRepCommitData

◆ LogicalRepCommitPreparedTxnData

◆ LogicalRepMsgType

◆ LogicalRepPreparedTxnData

◆ LogicalRepRelation

◆ LogicalRepRelId

Definition at line 101 of file logicalproto.h.

◆ LogicalRepRollbackPreparedTxnData

◆ LogicalRepStreamAbortData

◆ LogicalRepTupleData

◆ LogicalRepTyp

typedef struct LogicalRepTyp LogicalRepTyp

Enumeration Type Documentation

◆ LogicalRepMsgType

Enumerator
LOGICAL_REP_MSG_BEGIN 
LOGICAL_REP_MSG_COMMIT 
LOGICAL_REP_MSG_ORIGIN 
LOGICAL_REP_MSG_INSERT 
LOGICAL_REP_MSG_UPDATE 
LOGICAL_REP_MSG_DELETE 
LOGICAL_REP_MSG_TRUNCATE 
LOGICAL_REP_MSG_RELATION 
LOGICAL_REP_MSG_TYPE 
LOGICAL_REP_MSG_MESSAGE 
LOGICAL_REP_MSG_BEGIN_PREPARE 
LOGICAL_REP_MSG_PREPARE 
LOGICAL_REP_MSG_COMMIT_PREPARED 
LOGICAL_REP_MSG_ROLLBACK_PREPARED 
LOGICAL_REP_MSG_STREAM_START 
LOGICAL_REP_MSG_STREAM_STOP 
LOGICAL_REP_MSG_STREAM_COMMIT 
LOGICAL_REP_MSG_STREAM_ABORT 
LOGICAL_REP_MSG_STREAM_PREPARE 

Definition at line 57 of file logicalproto.h.

58{
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63

Function Documentation

◆ logicalrep_message_type()

const char * logicalrep_message_type ( LogicalRepMsgType  action)

Definition at line 1209 of file proto.c.

1210{
1211 static char err_unknown[20];
1212
1213 switch (action)
1214 {
1216 return "BEGIN";
1218 return "COMMIT";
1220 return "ORIGIN";
1222 return "INSERT";
1224 return "UPDATE";
1226 return "DELETE";
1228 return "TRUNCATE";
1230 return "RELATION";
1232 return "TYPE";
1234 return "MESSAGE";
1236 return "BEGIN PREPARE";
1238 return "PREPARE";
1240 return "COMMIT PREPARED";
1242 return "ROLLBACK PREPARED";
1244 return "STREAM START";
1246 return "STREAM STOP";
1248 return "STREAM COMMIT";
1250 return "STREAM ABORT";
1252 return "STREAM PREPARE";
1253 }
1254
1255 /*
1256 * This message provides context in the error raised when applying a
1257 * logical message. So we can't throw an error here. Return an unknown
1258 * indicator value so that the original error is still reported.
1259 */
1260 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1261
1262 return err_unknown;
1263}
#define snprintf
Definition: port.h:239

References generate_unaccent_rules::action, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and snprintf.

Referenced by apply_error_callback().

◆ logicalrep_read_begin()

void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData begin_data 
)

Definition at line 63 of file proto.c.

64{
65 /* read fields */
66 begin_data->final_lsn = pq_getmsgint64(in);
67 if (begin_data->final_lsn == InvalidXLogRecPtr)
68 elog(ERROR, "final_lsn not set in begin message");
69 begin_data->committime = pq_getmsgint64(in);
70 begin_data->xid = pq_getmsgint(in, 4);
71}
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131
TimestampTz committime
Definition: logicalproto.h:130
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), and LogicalRepBeginData::xid.

Referenced by apply_handle_begin().

◆ logicalrep_read_begin_prepare()

void logicalrep_read_begin_prepare ( StringInfo  in,
LogicalRepPreparedTxnData begin_data 
)

Definition at line 134 of file proto.c.

135{
136 /* read fields */
137 begin_data->prepare_lsn = pq_getmsgint64(in);
138 if (begin_data->prepare_lsn == InvalidXLogRecPtr)
139 elog(ERROR, "prepare_lsn not set in begin prepare message");
140 begin_data->end_lsn = pq_getmsgint64(in);
141 if (begin_data->end_lsn == InvalidXLogRecPtr)
142 elog(ERROR, "end_lsn not set in begin prepare message");
143 begin_data->prepare_time = pq_getmsgint64(in);
144 begin_data->xid = pq_getmsgint(in, 4);
145
146 /* read gid (copy it into a pre-allocated buffer) */
147 strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
148}
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_begin_prepare().

◆ logicalrep_read_commit()

void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData commit_data 
)

Definition at line 98 of file proto.c.

99{
100 /* read flags (unused for now) */
101 uint8 flags = pq_getmsgbyte(in);
102
103 if (flags != 0)
104 elog(ERROR, "unrecognized flags %u in commit message", flags);
105
106 /* read fields */
107 commit_data->commit_lsn = pq_getmsgint64(in);
108 commit_data->end_lsn = pq_getmsgint64(in);
109 commit_data->committime = pq_getmsgint64(in);
110}
uint8_t uint8
Definition: c.h:486
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
TimestampTz committime
Definition: logicalproto.h:138

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

◆ logicalrep_read_commit_prepared()

void logicalrep_read_commit_prepared ( StringInfo  in,
LogicalRepCommitPreparedTxnData prepare_data 
)

Definition at line 267 of file proto.c.

268{
269 /* read flags */
270 uint8 flags = pq_getmsgbyte(in);
271
272 if (flags != 0)
273 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
274
275 /* read fields */
276 prepare_data->commit_lsn = pq_getmsgint64(in);
277 if (prepare_data->commit_lsn == InvalidXLogRecPtr)
278 elog(ERROR, "commit_lsn is not set in commit prepared message");
279 prepare_data->end_lsn = pq_getmsgint64(in);
280 if (prepare_data->end_lsn == InvalidXLogRecPtr)
281 elog(ERROR, "end_lsn is not set in commit prepared message");
282 prepare_data->commit_time = pq_getmsgint64(in);
283 prepare_data->xid = pq_getmsgint(in, 4);
284
285 /* read gid (copy it into a pre-allocated buffer) */
286 strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
287}

References LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, elog, LogicalRepCommitPreparedTxnData::end_lsn, ERROR, LogicalRepCommitPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), strlcpy(), and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_handle_commit_prepared().

◆ logicalrep_read_delete()

LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData oldtup 
)

Definition at line 561 of file proto.c.

562{
563 char action;
564 LogicalRepRelId relid;
565
566 /* read the relation id */
567 relid = pq_getmsgint(in, 4);
568
569 /* read and verify action */
570 action = pq_getmsgbyte(in);
571 if (action != 'K' && action != 'O')
572 elog(ERROR, "expected action 'O' or 'K', got %c", action);
573
574 logicalrep_read_tuple(in, oldtup);
575
576 return relid;
577}
uint32 LogicalRepRelId
Definition: logicalproto.h:101
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:861

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

◆ logicalrep_read_insert()

LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData newtup 
)

Definition at line 428 of file proto.c.

429{
430 char action;
431 LogicalRepRelId relid;
432
433 /* read the relation id */
434 relid = pq_getmsgint(in, 4);
435
436 action = pq_getmsgbyte(in);
437 if (action != 'N')
438 elog(ERROR, "expected new tuple but got %d",
439 action);
440
441 logicalrep_read_tuple(in, newtup);
442
443 return relid;
444}

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

◆ logicalrep_read_origin()

char * logicalrep_read_origin ( StringInfo  in,
XLogRecPtr origin_lsn 
)

Definition at line 390 of file proto.c.

391{
392 /* fixed fields */
393 *origin_lsn = pq_getmsgint64(in);
394
395 /* return origin */
396 return pstrdup(pq_getmsgstring(in));
397}
char * pstrdup(const char *in)
Definition: mcxt.c:1696

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

◆ logicalrep_read_prepare()

void logicalrep_read_prepare ( StringInfo  in,
LogicalRepPreparedTxnData prepare_data 
)

Definition at line 228 of file proto.c.

229{
230 logicalrep_read_prepare_common(in, "prepare", prepare_data);
231}
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:199

References logicalrep_read_prepare_common().

Referenced by apply_handle_prepare().

◆ logicalrep_read_rel()

LogicalRepRelation * logicalrep_read_rel ( StringInfo  in)

Definition at line 698 of file proto.c.

699{
701
702 rel->remoteid = pq_getmsgint(in, 4);
703
704 /* Read relation name from stream */
706 rel->relname = pstrdup(pq_getmsgstring(in));
707
708 /* Read the replica identity. */
709 rel->replident = pq_getmsgbyte(in);
710
711 /* Get attribute description */
712 logicalrep_read_attrs(in, rel);
713
714 return rel;
715}
void * palloc(Size size)
Definition: mcxt.c:1317
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:985
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1047
LogicalRepRelId remoteid
Definition: logicalproto.h:107

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

◆ logicalrep_read_rollback_prepared()

void logicalrep_read_rollback_prepared ( StringInfo  in,
LogicalRepRollbackPreparedTxnData rollback_data 
)

Definition at line 325 of file proto.c.

327{
328 /* read flags */
329 uint8 flags = pq_getmsgbyte(in);
330
331 if (flags != 0)
332 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
333
334 /* read fields */
335 rollback_data->prepare_end_lsn = pq_getmsgint64(in);
336 if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
337 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
338 rollback_data->rollback_end_lsn = pq_getmsgint64(in);
339 if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
340 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
341 rollback_data->prepare_time = pq_getmsgint64(in);
342 rollback_data->rollback_time = pq_getmsgint64(in);
343 rollback_data->xid = pq_getmsgint(in, 4);
344
345 /* read gid (copy it into a pre-allocated buffer) */
346 strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
347}

References elog, ERROR, LogicalRepRollbackPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, strlcpy(), and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_handle_rollback_prepared().

◆ logicalrep_read_stream_abort()

void logicalrep_read_stream_abort ( StringInfo  in,
LogicalRepStreamAbortData abort_data,
bool  read_abort_info 
)

Definition at line 1184 of file proto.c.

1187{
1188 Assert(abort_data);
1189
1190 abort_data->xid = pq_getmsgint(in, 4);
1191 abort_data->subxid = pq_getmsgint(in, 4);
1192
1193 if (read_abort_info)
1194 {
1195 abort_data->abort_lsn = pq_getmsgint64(in);
1196 abort_data->abort_time = pq_getmsgint64(in);
1197 }
1198 else
1199 {
1200 abort_data->abort_lsn = InvalidXLogRecPtr;
1201 abort_data->abort_time = 0;
1202 }
1203}
#define Assert(condition)
Definition: c.h:815

References LogicalRepStreamAbortData::abort_lsn, LogicalRepStreamAbortData::abort_time, Assert, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), LogicalRepStreamAbortData::subxid, and LogicalRepStreamAbortData::xid.

Referenced by apply_handle_stream_abort().

◆ logicalrep_read_stream_commit()

TransactionId logicalrep_read_stream_commit ( StringInfo  in,
LogicalRepCommitData commit_data 
)

Definition at line 1129 of file proto.c.

1130{
1131 TransactionId xid;
1132 uint8 flags;
1133
1134 xid = pq_getmsgint(in, 4);
1135
1136 /* read flags (unused for now) */
1137 flags = pq_getmsgbyte(in);
1138
1139 if (flags != 0)
1140 elog(ERROR, "unrecognized flags %u in commit message", flags);
1141
1142 /* read fields */
1143 commit_data->commit_lsn = pq_getmsgint64(in);
1144 commit_data->end_lsn = pq_getmsgint64(in);
1145 commit_data->committime = pq_getmsgint64(in);
1146
1147 return xid;
1148}
uint32 TransactionId
Definition: c.h:609

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), pq_getmsgint(), and pq_getmsgint64().

Referenced by apply_handle_stream_commit().

◆ logicalrep_read_stream_prepare()

void logicalrep_read_stream_prepare ( StringInfo  in,
LogicalRepPreparedTxnData prepare_data 
)

Definition at line 365 of file proto.c.

366{
367 logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
368}

References logicalrep_read_prepare_common().

Referenced by apply_handle_stream_prepare().

◆ logicalrep_read_stream_start()

TransactionId logicalrep_read_stream_start ( StringInfo  in,
bool *  first_segment 
)

Definition at line 1079 of file proto.c.

1080{
1081 TransactionId xid;
1082
1083 Assert(first_segment);
1084
1085 xid = pq_getmsgint(in, 4);
1086 *first_segment = (pq_getmsgbyte(in) == 1);
1087
1088 return xid;
1089}

References Assert, pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_stream_start().

◆ logicalrep_read_truncate()

List * logicalrep_read_truncate ( StringInfo  in,
bool *  cascade,
bool *  restart_seqs 
)

Definition at line 615 of file proto.c.

617{
618 int i;
619 int nrelids;
620 List *relids = NIL;
621 uint8 flags;
622
623 nrelids = pq_getmsgint(in, 4);
624
625 /* read and decode truncate flags */
626 flags = pq_getmsgint(in, 1);
627 *cascade = (flags & TRUNCATE_CASCADE) > 0;
628 *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
629
630 for (i = 0; i < nrelids; i++)
631 relids = lappend_oid(relids, pq_getmsgint(in, 4));
632
633 return relids;
634}
int i
Definition: isn.c:72
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
#define NIL
Definition: pg_list.h:68
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30
#define TRUNCATE_CASCADE
Definition: proto.c:29
Definition: pg_list.h:54

References i, lappend_oid(), NIL, pq_getmsgint(), TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by apply_handle_truncate().

◆ logicalrep_read_typ()

void logicalrep_read_typ ( StringInfo  in,
LogicalRepTyp ltyp 
)

Definition at line 754 of file proto.c.

755{
756 ltyp->remoteid = pq_getmsgint(in, 4);
757
758 /* Read type name from stream */
760 ltyp->typname = pstrdup(pq_getmsgstring(in));
761}

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

◆ logicalrep_read_update()

LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool *  has_oldtuple,
LogicalRepTupleData oldtup,
LogicalRepTupleData newtup 
)

Definition at line 487 of file proto.c.

490{
491 char action;
492 LogicalRepRelId relid;
493
494 /* read the relation id */
495 relid = pq_getmsgint(in, 4);
496
497 /* read and verify action */
498 action = pq_getmsgbyte(in);
499 if (action != 'K' && action != 'O' && action != 'N')
500 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
501 action);
502
503 /* check for old tuple */
504 if (action == 'K' || action == 'O')
505 {
506 logicalrep_read_tuple(in, oldtup);
507 *has_oldtuple = true;
508
509 action = pq_getmsgbyte(in);
510 }
511 else
512 *has_oldtuple = false;
513
514 /* check for new tuple */
515 if (action != 'N')
516 elog(ERROR, "expected action 'N', got %c",
517 action);
518
519 logicalrep_read_tuple(in, newtup);
520
521 return relid;
522}

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

◆ logicalrep_should_publish_column()

bool logicalrep_should_publish_column ( Form_pg_attribute  att,
Bitmapset columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 1279 of file proto.c.

1281{
1282 if (att->attisdropped)
1283 return false;
1284
1285 /* If a column list is provided, publish only the cols in that list. */
1286 if (columns)
1287 return bms_is_member(att->attnum, columns);
1288
1289 /* All non-generated columns are always published. */
1290 if (!att->attgenerated)
1291 return true;
1292
1293 /*
1294 * Stored generated columns are only published when the user sets
1295 * publish_generated_columns as stored.
1296 */
1297 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1298 return include_gencols_type == PUBLISH_GENCOLS_STORED;
1299
1300 return false;
1301}
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510

References bms_is_member().

Referenced by logicalrep_write_attrs(), logicalrep_write_tuple(), and send_relation_and_attrs().

◆ logicalrep_write_begin()

void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN txn 
)

Definition at line 49 of file proto.c.

50{
52
53 /* fixed fields */
54 pq_sendint64(out, txn->final_lsn);
56 pq_sendint32(out, txn->xid);
57}
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
TimestampTz commit_time
XLogRecPtr final_lsn
TransactionId xid
union ReorderBufferTXN::@116 xact_time

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, LOGICAL_REP_MSG_BEGIN, pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_send_begin().

◆ logicalrep_write_begin_prepare()

void logicalrep_write_begin_prepare ( StringInfo  out,
ReorderBufferTXN txn 
)

Definition at line 116 of file proto.c.

117{
119
120 /* fixed fields */
121 pq_sendint64(out, txn->final_lsn);
122 pq_sendint64(out, txn->end_lsn);
124 pq_sendint32(out, txn->xid);
125
126 /* send gid */
127 pq_sendstring(out, txn->gid);
128}
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:195
XLogRecPtr end_lsn
TimestampTz prepare_time

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_BEGIN_PREPARE, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_prepare_txn().

◆ logicalrep_write_commit()

void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 78 of file proto.c.

80{
81 uint8 flags = 0;
82
84
85 /* send the flags field (unused for now) */
86 pq_sendbyte(out, flags);
87
88 /* send fields */
89 pq_sendint64(out, commit_lsn);
90 pq_sendint64(out, txn->end_lsn);
92}

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_COMMIT, pq_sendbyte(), pq_sendint64(), and ReorderBufferTXN::xact_time.

Referenced by pgoutput_commit_txn().

◆ logicalrep_write_commit_prepared()

void logicalrep_write_commit_prepared ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 237 of file proto.c.

239{
240 uint8 flags = 0;
241
243
244 /*
245 * This should only ever happen for two-phase commit transactions, in
246 * which case we expect to have a valid GID.
247 */
248 Assert(txn->gid != NULL);
249
250 /* send the flags field */
251 pq_sendbyte(out, flags);
252
253 /* send fields */
254 pq_sendint64(out, commit_lsn);
255 pq_sendint64(out, txn->end_lsn);
257 pq_sendint32(out, txn->xid);
258
259 /* send gid */
260 pq_sendstring(out, txn->gid);
261}

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_COMMIT_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_commit_prepared_txn().

◆ logicalrep_write_delete()

void logicalrep_write_delete ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot oldslot,
bool  binary,
Bitmapset columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 528 of file proto.c.

532{
533 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
534 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
536
538
539 /* transaction ID (if not valid, we're not streaming) */
540 if (TransactionIdIsValid(xid))
541 pq_sendint32(out, xid);
542
543 /* use Oid as relation identifier */
545
546 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
547 pq_sendbyte(out, 'O'); /* old tuple follows */
548 else
549 pq_sendbyte(out, 'K'); /* old key follows */
550
551 logicalrep_write_tuple(out, rel, oldslot, binary, columns,
552 include_gencols_type);
553}
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:767
#define RelationGetRelid(relation)
Definition: rel.h:506
Form_pg_class rd_rel
Definition: rel.h:111
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert, LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

◆ logicalrep_write_insert()

void logicalrep_write_insert ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot newslot,
bool  binary,
Bitmapset columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 403 of file proto.c.

407{
409
410 /* transaction ID (if not valid, we're not streaming) */
411 if (TransactionIdIsValid(xid))
412 pq_sendint32(out, xid);
413
414 /* use Oid as relation identifier */
416
417 pq_sendbyte(out, 'N'); /* new tuple follows */
418 logicalrep_write_tuple(out, rel, newslot, binary, columns,
419 include_gencols_type);
420}

References LOGICAL_REP_MSG_INSERT, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

◆ logicalrep_write_message()

void logicalrep_write_message ( StringInfo  out,
TransactionId  xid,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)

Definition at line 640 of file proto.c.

643{
644 uint8 flags = 0;
645
647
648 /* encode and send message flags */
649 if (transactional)
650 flags |= MESSAGE_TRANSACTIONAL;
651
652 /* transaction ID (if not valid, we're not streaming) */
653 if (TransactionIdIsValid(xid))
654 pq_sendint32(out, xid);
655
656 pq_sendint8(out, flags);
657 pq_sendint64(out, lsn);
658 pq_sendstring(out, prefix);
659 pq_sendint32(out, sz);
660 pq_sendbytes(out, message, sz);
661}
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:128
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28

References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.

Referenced by pgoutput_message().

◆ logicalrep_write_origin()

void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 374 of file proto.c.

376{
378
379 /* fixed fields */
380 pq_sendint64(out, origin_lsn);
381
382 /* origin string */
383 pq_sendstring(out, origin);
384}

References LOGICAL_REP_MSG_ORIGIN, pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by send_repl_origin().

◆ logicalrep_write_prepare()

void logicalrep_write_prepare ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 187 of file proto.c.

189{
191 txn, prepare_lsn);
192}
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:155

References LOGICAL_REP_MSG_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_prepare_txn().

◆ logicalrep_write_rel()

void logicalrep_write_rel ( StringInfo  out,
TransactionId  xid,
Relation  rel,
Bitmapset columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 667 of file proto.c.

670{
671 char *relname;
672
674
675 /* transaction ID (if not valid, we're not streaming) */
676 if (TransactionIdIsValid(xid))
677 pq_sendint32(out, xid);
678
679 /* use Oid as relation identifier */
681
682 /* send qualified relation name */
686
687 /* send replica identity */
688 pq_sendbyte(out, rel->rd_rel->relreplident);
689
690 /* send the attribute info */
691 logicalrep_write_attrs(out, rel, columns, include_gencols_type);
692}
NameData relname
Definition: pg_class.h:38
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1027
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:921
#define RelationGetRelationName(relation)
Definition: rel.h:540
#define RelationGetNamespace(relation)
Definition: rel.h:547

References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

◆ logicalrep_write_rollback_prepared()

void logicalrep_write_rollback_prepared ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)

Definition at line 293 of file proto.c.

296{
297 uint8 flags = 0;
298
300
301 /*
302 * This should only ever happen for two-phase commit transactions, in
303 * which case we expect to have a valid GID.
304 */
305 Assert(txn->gid != NULL);
306
307 /* send the flags field */
308 pq_sendbyte(out, flags);
309
310 /* send fields */
311 pq_sendint64(out, prepare_end_lsn);
312 pq_sendint64(out, txn->end_lsn);
313 pq_sendint64(out, prepare_time);
315 pq_sendint32(out, txn->xid);
316
317 /* send gid */
318 pq_sendstring(out, txn->gid);
319}

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_ROLLBACK_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_rollback_prepared_txn().

◆ logicalrep_write_stream_abort()

void logicalrep_write_stream_abort ( StringInfo  out,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  abort_lsn,
TimestampTz  abort_time,
bool  write_abort_info 
)

Definition at line 1158 of file proto.c.

1161{
1163
1165
1166 /* transaction ID */
1167 pq_sendint32(out, xid);
1168 pq_sendint32(out, subxid);
1169
1170 if (write_abort_info)
1171 {
1172 pq_sendint64(out, abort_lsn);
1173 pq_sendint64(out, abort_time);
1174 }
1175}

References Assert, LOGICAL_REP_MSG_STREAM_ABORT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), and TransactionIdIsValid.

Referenced by pgoutput_stream_abort().

◆ logicalrep_write_stream_commit()

void logicalrep_write_stream_commit ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 1104 of file proto.c.

1106{
1107 uint8 flags = 0;
1108
1110
1112
1113 /* transaction ID */
1114 pq_sendint32(out, txn->xid);
1115
1116 /* send the flags field (unused for now) */
1117 pq_sendbyte(out, flags);
1118
1119 /* send fields */
1120 pq_sendint64(out, commit_lsn);
1121 pq_sendint64(out, txn->end_lsn);
1123}

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_STREAM_COMMIT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), TransactionIdIsValid, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_stream_commit().

◆ logicalrep_write_stream_prepare()

void logicalrep_write_stream_prepare ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 353 of file proto.c.

356{
358 txn, prepare_lsn);
359}

References LOGICAL_REP_MSG_STREAM_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_stream_prepare_txn().

◆ logicalrep_write_stream_start()

void logicalrep_write_stream_start ( StringInfo  out,
TransactionId  xid,
bool  first_segment 
)

Definition at line 1061 of file proto.c.

1063{
1065
1067
1068 /* transaction ID (we're starting to stream, so must be valid) */
1069 pq_sendint32(out, xid);
1070
1071 /* 1 if this is the first streaming segment for this xid */
1072 pq_sendbyte(out, first_segment ? 1 : 0);
1073}

References Assert, LOGICAL_REP_MSG_STREAM_START, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_start().

◆ logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo  out)

Definition at line 1095 of file proto.c.

1096{
1098}

References LOGICAL_REP_MSG_STREAM_STOP, and pq_sendbyte().

Referenced by pgoutput_stream_stop().

◆ logicalrep_write_truncate()

void logicalrep_write_truncate ( StringInfo  out,
TransactionId  xid,
int  nrelids,
Oid  relids[],
bool  cascade,
bool  restart_seqs 
)

Definition at line 583 of file proto.c.

588{
589 int i;
590 uint8 flags = 0;
591
593
594 /* transaction ID (if not valid, we're not streaming) */
595 if (TransactionIdIsValid(xid))
596 pq_sendint32(out, xid);
597
598 pq_sendint32(out, nrelids);
599
600 /* encode and send truncate flags */
601 if (cascade)
602 flags |= TRUNCATE_CASCADE;
603 if (restart_seqs)
604 flags |= TRUNCATE_RESTART_SEQS;
605 pq_sendint8(out, flags);
606
607 for (i = 0; i < nrelids; i++)
608 pq_sendint32(out, relids[i]);
609}

References i, LOGICAL_REP_MSG_TRUNCATE, pq_sendbyte(), pq_sendint32(), pq_sendint8(), TransactionIdIsValid, TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by pgoutput_truncate().

◆ logicalrep_write_typ()

void logicalrep_write_typ ( StringInfo  out,
TransactionId  xid,
Oid  typoid 
)

Definition at line 723 of file proto.c.

724{
725 Oid basetypoid = getBaseType(typoid);
726 HeapTuple tup;
727 Form_pg_type typtup;
728
730
731 /* transaction ID (if not valid, we're not streaming) */
732 if (TransactionIdIsValid(xid))
733 pq_sendint32(out, xid);
734
735 tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
736 if (!HeapTupleIsValid(tup))
737 elog(ERROR, "cache lookup failed for type %u", basetypoid);
738 typtup = (Form_pg_type) GETSTRUCT(tup);
739
740 /* use Oid as type identifier */
741 pq_sendint32(out, typoid);
742
743 /* send qualified type name */
744 logicalrep_write_namespace(out, typtup->typnamespace);
745 pq_sendstring(out, NameStr(typtup->typname));
746
747 ReleaseSysCache(tup);
748}
#define NameStr(name)
Definition: c.h:703
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2548
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
unsigned int Oid
Definition: postgres_ext.h:32
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:221

References elog, ERROR, getBaseType(), GETSTRUCT(), HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

◆ logicalrep_write_update()

void logicalrep_write_update ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot oldslot,
TupleTableSlot newslot,
bool  binary,
Bitmapset columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 450 of file proto.c.

454{
456
457 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
458 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
460
461 /* transaction ID (if not valid, we're not streaming) */
462 if (TransactionIdIsValid(xid))
463 pq_sendint32(out, xid);
464
465 /* use Oid as relation identifier */
467
468 if (oldslot != NULL)
469 {
470 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
471 pq_sendbyte(out, 'O'); /* old tuple follows */
472 else
473 pq_sendbyte(out, 'K'); /* old key follows */
474 logicalrep_write_tuple(out, rel, oldslot, binary, columns,
475 include_gencols_type);
476 }
477
478 pq_sendbyte(out, 'N'); /* new tuple follows */
479 logicalrep_write_tuple(out, rel, newslot, binary, columns,
480 include_gencols_type);
481}

References Assert, LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().