PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
proto.c File Reference
#include "postgres.h"
#include "access/sysattr.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "libpq/pqformat.h"
#include "replication/logicalproto.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
Include dependency graph for proto.c:

Go to the source code of this file.

Macros

#define LOGICALREP_IS_REPLICA_IDENTITY   1
 
#define MESSAGE_TRANSACTIONAL   (1<<0)
 
#define TRUNCATE_CASCADE   (1<<0)
 
#define TRUNCATE_RESTART_SEQS   (1<<1)
 

Functions

static void logicalrep_write_attrs (StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
static void logicalrep_write_tuple (StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
static void logicalrep_read_attrs (StringInfo in, LogicalRepRelation *rel)
 
static void logicalrep_read_tuple (StringInfo in, LogicalRepTupleData *tuple)
 
static void logicalrep_write_namespace (StringInfo out, Oid nspid)
 
static const char * logicalrep_read_namespace (StringInfo in)
 
void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_begin_prepare (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin_prepare (StringInfo in, LogicalRepPreparedTxnData *begin_data)
 
static void logicalrep_write_prepare_common (StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_write_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void logicalrep_read_prepare_common (StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_read_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_commit_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit_prepared (StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
 
void logicalrep_write_rollback_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
void logicalrep_read_rollback_prepared (StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
 
void logicalrep_write_stream_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_read_stream_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
 
List * logicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
 
void logicalrep_write_message (StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
LogicalRepRelation * logicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
 
void logicalrep_read_typ (StringInfo in, LogicalRepTyp *ltyp)
 
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
 
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
 
void logicalrep_write_stream_stop (StringInfo out)
 
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
TransactionId logicalrep_read_stream_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
 
void logicalrep_read_stream_abort (StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
 
const char * logicalrep_message_type (LogicalRepMsgType action)
 
bool logicalrep_should_publish_column (Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
 

Macro Definition Documentation

◆ LOGICALREP_IS_REPLICA_IDENTITY

#define LOGICALREP_IS_REPLICA_IDENTITY   1

Definition at line 26 of file proto.c.

◆ MESSAGE_TRANSACTIONAL

#define MESSAGE_TRANSACTIONAL   (1<<0)

Definition at line 28 of file proto.c.

◆ TRUNCATE_CASCADE

#define TRUNCATE_CASCADE   (1<<0)

Definition at line 29 of file proto.c.

◆ TRUNCATE_RESTART_SEQS

#define TRUNCATE_RESTART_SEQS   (1<<1)

Definition at line 30 of file proto.c.

Function Documentation

◆ logicalrep_message_type()

const char * logicalrep_message_type ( LogicalRepMsgType  action)

Definition at line 1212 of file proto.c.

1213{
1214 static char err_unknown[20];
1215
1216 switch (action)
1217 {
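1218 case LOGICAL_REP_MSG_BEGIN:
1219 return "BEGIN";
1220 case LOGICAL_REP_MSG_COMMIT:
1221 return "COMMIT";
1222 case LOGICAL_REP_MSG_ORIGIN:
1223 return "ORIGIN";
1224 case LOGICAL_REP_MSG_INSERT:
1225 return "INSERT";
1226 case LOGICAL_REP_MSG_UPDATE:
1227 return "UPDATE";
1228 case LOGICAL_REP_MSG_DELETE:
1229 return "DELETE";
1230 case LOGICAL_REP_MSG_TRUNCATE:
1231 return "TRUNCATE";
1232 case LOGICAL_REP_MSG_RELATION:
1233 return "RELATION";
1234 case LOGICAL_REP_MSG_TYPE:
1235 return "TYPE";
1236 case LOGICAL_REP_MSG_MESSAGE:
1237 return "MESSAGE";
1238 case LOGICAL_REP_MSG_BEGIN_PREPARE:
1239 return "BEGIN PREPARE";
1240 case LOGICAL_REP_MSG_PREPARE:
1241 return "PREPARE";
1242 case LOGICAL_REP_MSG_COMMIT_PREPARED:
1243 return "COMMIT PREPARED";
1244 case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
1245 return "ROLLBACK PREPARED";
1246 case LOGICAL_REP_MSG_STREAM_START:
1247 return "STREAM START";
1248 case LOGICAL_REP_MSG_STREAM_STOP:
1249 return "STREAM STOP";
1250 case LOGICAL_REP_MSG_STREAM_COMMIT:
1251 return "STREAM COMMIT";
1252 case LOGICAL_REP_MSG_STREAM_ABORT:
1253 return "STREAM ABORT";
1254 case LOGICAL_REP_MSG_STREAM_PREPARE:
1255 return "STREAM PREPARE";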
1256 }
1257
1258 /*
1259 * This message provides context in the error raised when applying a
1260 * logical message. So we can't throw an error here. Return an unknown
1261 * indicator value so that the original error is still reported.
1262 */
1263 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1264
1265 return err_unknown;
1266}
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
#define snprintf
Definition: port.h:239

References generate_unaccent_rules::action, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and snprintf.

Referenced by apply_error_callback().

◆ logicalrep_read_attrs()

static void logicalrep_read_attrs ( StringInfo  in,
LogicalRepRelation * rel
)
static

Definition at line 988 of file proto.c.

989{
990 int i;
991 int natts;
992 char **attnames;
993 Oid *atttyps;
994 Bitmapset *attkeys = NULL;
995
996 natts = pq_getmsgint(in, 2);
997 attnames = palloc(natts * sizeof(char *));
998 atttyps = palloc(natts * sizeof(Oid));
999
1000 /* read the attributes */
1001 for (i = 0; i < natts; i++)
1002 {
1003 uint8 flags;
1004
1005 /* Check for replica identity column */
1006 flags = pq_getmsgbyte(in);
1007 if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
1008 attkeys = bms_add_member(attkeys, i);
1009
1010 /* attribute name */
1011 attnames[i] = pstrdup(pq_getmsgstring(in));
1012
1013 /* attribute type id */
1014 atttyps[i] = (Oid) pq_getmsgint(in, 4);
1015
1016 /* we ignore attribute mode for now */
1017 (void) pq_getmsgint(in, 4);
1018 }
1019
1020 rel->attnames = attnames;
1021 rel->atttyps = atttyps;
1022 rel->attkeys = attkeys;
1023 rel->natts = natts;
1024}
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
uint8_t uint8
Definition: c.h:540
int i
Definition: isn.c:77
char * pstrdup(const char *in)
Definition: mcxt.c:1759
void * palloc(Size size)
Definition: mcxt.c:1365
unsigned int Oid
Definition: postgres_ext.h:32
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:26
Bitmapset * attkeys
Definition: logicalproto.h:115

References LogicalRepRelation::attkeys, LogicalRepRelation::attnames, LogicalRepRelation::atttyps, bms_add_member(), i, LOGICALREP_IS_REPLICA_IDENTITY, LogicalRepRelation::natts, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), and pstrdup().

Referenced by logicalrep_read_rel().

◆ logicalrep_read_begin()

void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData * begin_data
)

Definition at line 63 of file proto.c.

64{
65 /* read fields */
66 begin_data->final_lsn = pq_getmsgint64(in);
67 if (!XLogRecPtrIsValid(begin_data->final_lsn))
68 elog(ERROR, "final_lsn not set in begin message");
69 begin_data->committime = pq_getmsgint64(in);
70 begin_data->xid = pq_getmsgint(in, 4);
71}
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131
TimestampTz committime
Definition: logicalproto.h:130
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, pq_getmsgint(), pq_getmsgint64(), LogicalRepBeginData::xid, and XLogRecPtrIsValid.

Referenced by apply_handle_begin().

◆ logicalrep_read_begin_prepare()

void logicalrep_read_begin_prepare ( StringInfo  in,
LogicalRepPreparedTxnData * begin_data
)

Definition at line 134 of file proto.c.

135{
136 /* read fields */
137 begin_data->prepare_lsn = pq_getmsgint64(in);
138 if (!XLogRecPtrIsValid(begin_data->prepare_lsn))
139 elog(ERROR, "prepare_lsn not set in begin prepare message");
140 begin_data->end_lsn = pq_getmsgint64(in);
141 if (!XLogRecPtrIsValid(begin_data->end_lsn))
142 elog(ERROR, "end_lsn not set in begin prepare message");
143 begin_data->prepare_time = pq_getmsgint64(in);
144 begin_data->xid = pq_getmsgint(in, 4);
145
146 /* read gid (copy it into a pre-allocated buffer) */
147 strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
148}
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), LogicalRepPreparedTxnData::xid, and XLogRecPtrIsValid.

Referenced by apply_handle_begin_prepare().

◆ logicalrep_read_commit()

void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData * commit_data
)

Definition at line 98 of file proto.c.

99{
100 /* read flags (unused for now) */
101 uint8 flags = pq_getmsgbyte(in);
102
103 if (flags != 0)
104 elog(ERROR, "unrecognized flags %u in commit message", flags);
105
106 /* read fields */
107 commit_data->commit_lsn = pq_getmsgint64(in);
108 commit_data->end_lsn = pq_getmsgint64(in);
109 commit_data->committime = pq_getmsgint64(in);
110}
TimestampTz committime
Definition: logicalproto.h:138

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

◆ logicalrep_read_commit_prepared()

void logicalrep_read_commit_prepared ( StringInfo  in,
LogicalRepCommitPreparedTxnData * prepare_data
)

Definition at line 267 of file proto.c.

268{
269 /* read flags */
270 uint8 flags = pq_getmsgbyte(in);
271
272 if (flags != 0)
273 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
274
275 /* read fields */
276 prepare_data->commit_lsn = pq_getmsgint64(in);
277 if (!XLogRecPtrIsValid(prepare_data->commit_lsn))
278 elog(ERROR, "commit_lsn is not set in commit prepared message");
279 prepare_data->end_lsn = pq_getmsgint64(in);
280 if (!XLogRecPtrIsValid(prepare_data->end_lsn))
281 elog(ERROR, "end_lsn is not set in commit prepared message");
282 prepare_data->commit_time = pq_getmsgint64(in);
283 prepare_data->xid = pq_getmsgint(in, 4);
284
285 /* read gid (copy it into a pre-allocated buffer) */
286 strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
287}

References LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, elog, LogicalRepCommitPreparedTxnData::end_lsn, ERROR, LogicalRepCommitPreparedTxnData::gid, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), strlcpy(), LogicalRepCommitPreparedTxnData::xid, and XLogRecPtrIsValid.

Referenced by apply_handle_commit_prepared().

◆ logicalrep_read_delete()

LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData * oldtup
)

Definition at line 561 of file proto.c.

562{
563 char action;
564 LogicalRepRelId relid;
565
566 /* read the relation id */
567 relid = pq_getmsgint(in, 4);
568
569 /* read and verify action */
570 action = pq_getmsgbyte(in);
571 if (action != 'K' && action != 'O')
572 elog(ERROR, "expected action 'O' or 'K', got %c", action);
573
574 logicalrep_read_tuple(in, oldtup);
575
576 return relid;
577}
uint32 LogicalRepRelId
Definition: logicalproto.h:101
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:864

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

◆ logicalrep_read_insert()

LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData * newtup
)

Definition at line 428 of file proto.c.

429{
430 char action;
431 LogicalRepRelId relid;
432
433 /* read the relation id */
434 relid = pq_getmsgint(in, 4);
435
436 action = pq_getmsgbyte(in);
437 if (action != 'N')
438 elog(ERROR, "expected new tuple but got %d",
439 action);
440
441 logicalrep_read_tuple(in, newtup);
442
443 return relid;
444}

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

◆ logicalrep_read_namespace()

static const char * logicalrep_read_namespace ( StringInfo  in)
static

Definition at line 1050 of file proto.c.

1051{
1052 const char *nspname = pq_getmsgstring(in);
1053
1054 if (nspname[0] == '\0')
1055 nspname = "pg_catalog";
1056
1057 return nspname;
1058}

References pq_getmsgstring().

Referenced by logicalrep_read_rel(), and logicalrep_read_typ().

◆ logicalrep_read_origin()

char * logicalrep_read_origin ( StringInfo  in,
XLogRecPtr * origin_lsn
)

Definition at line 390 of file proto.c.

391{
392 /* fixed fields */
393 *origin_lsn = pq_getmsgint64(in);
394
395 /* return origin */
396 return pstrdup(pq_getmsgstring(in));
397}

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

◆ logicalrep_read_prepare()

void logicalrep_read_prepare ( StringInfo  in,
LogicalRepPreparedTxnData * prepare_data
)

Definition at line 228 of file proto.c.

229{
230 logicalrep_read_prepare_common(in, "prepare", prepare_data);
231}
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:199

References logicalrep_read_prepare_common().

Referenced by apply_handle_prepare().

◆ logicalrep_read_prepare_common()

static void logicalrep_read_prepare_common ( StringInfo  in,
char *  msgtype,
LogicalRepPreparedTxnData * prepare_data
)
static

Definition at line 199 of file proto.c.

201{
202 /* read flags */
203 uint8 flags = pq_getmsgbyte(in);
204
205 if (flags != 0)
206 elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
207
208 /* read fields */
209 prepare_data->prepare_lsn = pq_getmsgint64(in);
210 if (!XLogRecPtrIsValid(prepare_data->prepare_lsn))
211 elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
212 prepare_data->end_lsn = pq_getmsgint64(in);
213 if (!XLogRecPtrIsValid(prepare_data->end_lsn))
214 elog(ERROR, "end_lsn is not set in %s message", msgtype);
215 prepare_data->prepare_time = pq_getmsgint64(in);
216 prepare_data->xid = pq_getmsgint(in, 4);
217 if (prepare_data->xid == InvalidTransactionId)
218 elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
219
220 /* read gid (copy it into a pre-allocated buffer) */
221 strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
222}
#define InvalidTransactionId
Definition: transam.h:31

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidTransactionId, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), LogicalRepPreparedTxnData::xid, and XLogRecPtrIsValid.

Referenced by logicalrep_read_prepare(), and logicalrep_read_stream_prepare().

◆ logicalrep_read_rel()

LogicalRepRelation * logicalrep_read_rel ( StringInfo  in)

Definition at line 698 of file proto.c.

699{
700 LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
701
702 rel->remoteid = pq_getmsgint(in, 4);
703
704 /* Read relation name from stream */
705 rel->nspname = pstrdup(logicalrep_read_namespace(in));
706 rel->relname = pstrdup(pq_getmsgstring(in));
707
708 /* Read the replica identity. */
709 rel->replident = pq_getmsgbyte(in);
710
711 /* relkind is not sent */
712 rel->relkind = 0;
713
714 /* Get attribute description */
715 logicalrep_read_attrs(in, rel);
716
717 return rel;
718}
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:988
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1050
LogicalRepRelId remoteid
Definition: logicalproto.h:107

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relkind, LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

◆ logicalrep_read_rollback_prepared()

void logicalrep_read_rollback_prepared ( StringInfo  in,
LogicalRepRollbackPreparedTxnData * rollback_data
)

Definition at line 325 of file proto.c.

327{
328 /* read flags */
329 uint8 flags = pq_getmsgbyte(in);
330
331 if (flags != 0)
332 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
333
334 /* read fields */
335 rollback_data->prepare_end_lsn = pq_getmsgint64(in);
336 if (!XLogRecPtrIsValid(rollback_data->prepare_end_lsn))
337 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
338 rollback_data->rollback_end_lsn = pq_getmsgint64(in);
339 if (!XLogRecPtrIsValid(rollback_data->rollback_end_lsn))
340 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
341 rollback_data->prepare_time = pq_getmsgint64(in);
342 rollback_data->rollback_time = pq_getmsgint64(in);
343 rollback_data->xid = pq_getmsgint(in, 4);
344
345 /* read gid (copy it into a pre-allocated buffer) */
346 strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
347}

References elog, ERROR, LogicalRepRollbackPreparedTxnData::gid, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, strlcpy(), LogicalRepRollbackPreparedTxnData::xid, and XLogRecPtrIsValid.

Referenced by apply_handle_rollback_prepared().

◆ logicalrep_read_stream_abort()

void logicalrep_read_stream_abort ( StringInfo  in,
LogicalRepStreamAbortData * abort_data,
bool  read_abort_info 
)

Definition at line 1187 of file proto.c.

1190{
1191 Assert(abort_data);
1192
1193 abort_data->xid = pq_getmsgint(in, 4);
1194 abort_data->subxid = pq_getmsgint(in, 4);
1195
1196 if (read_abort_info)
1197 {
1198 abort_data->abort_lsn = pq_getmsgint64(in);
1199 abort_data->abort_time = pq_getmsgint64(in);
1200 }
1201 else
1202 {
1203 abort_data->abort_lsn = InvalidXLogRecPtr;
1204 abort_data->abort_time = 0;
1205 }
1206}
Assert(PointerIsAligned(start, uint64))
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References LogicalRepStreamAbortData::abort_lsn, LogicalRepStreamAbortData::abort_time, Assert(), InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), LogicalRepStreamAbortData::subxid, and LogicalRepStreamAbortData::xid.

Referenced by apply_handle_stream_abort().

◆ logicalrep_read_stream_commit()

TransactionId logicalrep_read_stream_commit ( StringInfo  in,
LogicalRepCommitData * commit_data
)

Definition at line 1132 of file proto.c.

1133{
1134 TransactionId xid;
1135 uint8 flags;
1136
1137 xid = pq_getmsgint(in, 4);
1138
1139 /* read flags (unused for now) */
1140 flags = pq_getmsgbyte(in);
1141
1142 if (flags != 0)
1143 elog(ERROR, "unrecognized flags %u in commit message", flags);
1144
1145 /* read fields */
1146 commit_data->commit_lsn = pq_getmsgint64(in);
1147 commit_data->end_lsn = pq_getmsgint64(in);
1148 commit_data->committime = pq_getmsgint64(in);
1149
1150 return xid;
1151}
uint32 TransactionId
Definition: c.h:661

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), pq_getmsgint(), and pq_getmsgint64().

Referenced by apply_handle_stream_commit().

◆ logicalrep_read_stream_prepare()

void logicalrep_read_stream_prepare ( StringInfo  in,
LogicalRepPreparedTxnData * prepare_data
)

Definition at line 365 of file proto.c.

366{
367 logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
368}

References logicalrep_read_prepare_common().

Referenced by apply_handle_stream_prepare().

◆ logicalrep_read_stream_start()

TransactionId logicalrep_read_stream_start ( StringInfo  in,
bool *  first_segment 
)

Definition at line 1082 of file proto.c.

1083{
1084 TransactionId xid;
1085
1086 Assert(first_segment);
1087
1088 xid = pq_getmsgint(in, 4);
1089 *first_segment = (pq_getmsgbyte(in) == 1);
1090
1091 return xid;
1092}

References Assert(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_stream_start().

◆ logicalrep_read_truncate()

List * logicalrep_read_truncate ( StringInfo  in,
bool *  cascade,
bool *  restart_seqs 
)

Definition at line 615 of file proto.c.

617{
618 int i;
619 int nrelids;
620 List *relids = NIL;
621 uint8 flags;
622
623 nrelids = pq_getmsgint(in, 4);
624
625 /* read and decode truncate flags */
626 flags = pq_getmsgint(in, 1);
627 *cascade = (flags & TRUNCATE_CASCADE) > 0;
628 *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
629
630 for (i = 0; i < nrelids; i++)
631 relids = lappend_oid(relids, pq_getmsgint(in, 4));
632
633 return relids;
634}
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
#define NIL
Definition: pg_list.h:68
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30
#define TRUNCATE_CASCADE
Definition: proto.c:29
Definition: pg_list.h:54

References i, lappend_oid(), NIL, pq_getmsgint(), TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by apply_handle_truncate().

◆ logicalrep_read_tuple()

static void logicalrep_read_tuple ( StringInfo  in,
LogicalRepTupleData * tuple
)
static

Definition at line 864 of file proto.c.

865{
866 int i;
867 int natts;
868
869 /* Get number of attributes */
870 natts = pq_getmsgint(in, 2);
871
872 /* Allocate space for per-column values; zero out unused StringInfoDatas */
873 tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
874 tuple->colstatus = (char *) palloc(natts * sizeof(char));
875 tuple->ncols = natts;
876
877 /* Read the data */
878 for (i = 0; i < natts; i++)
879 {
880 char *buff;
881 char kind;
882 int len;
883 StringInfo value = &tuple->colvalues[i];
884
885 kind = pq_getmsgbyte(in);
886 tuple->colstatus[i] = kind;
887
888 switch (kind)
889 {
890 case LOGICALREP_COLUMN_NULL:
891 /* nothing more to do */
892 break;
893 case LOGICALREP_COLUMN_UNCHANGED:
894 /* we don't receive the value of an unchanged column */
895 break;
896 case LOGICALREP_COLUMN_TEXT:
897 case LOGICALREP_COLUMN_BINARY:
898 len = pq_getmsgint(in, 4); /* read length */
899
900 /* and data */
901 buff = palloc(len + 1);
902 pq_copymsgbytes(in, buff, len);
903
904 /*
905 * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
906 * as input functions require that. For
907 * LOGICALREP_COLUMN_BINARY it's not technically required, but
908 * it's harmless.
909 */
910 buff[len] = '\0';
911
912 initStringInfoFromString(value, buff, len);
913 break;
914 default:
915 elog(ERROR, "unrecognized data representation type '%c'", kind);
916 }
917 }
918}
static struct @171 value
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:97
#define LOGICALREP_COLUMN_NULL
Definition: logicalproto.h:96
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:99
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:98
void * palloc0(Size size)
Definition: mcxt.c:1395
const void size_t len
void pq_copymsgbytes(StringInfo msg, void *buf, int datalen)
Definition: pqformat.c:528
static void initStringInfoFromString(StringInfo str, char *data, int len)
Definition: stringinfo.h:175
StringInfoData * colvalues
Definition: logicalproto.h:87

References LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, elog, ERROR, i, initStringInfoFromString(), len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_NULL, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, LogicalRepTupleData::ncols, palloc(), palloc0(), pq_copymsgbytes(), pq_getmsgbyte(), pq_getmsgint(), and value.

Referenced by logicalrep_read_delete(), logicalrep_read_insert(), and logicalrep_read_update().

◆ logicalrep_read_typ()

void logicalrep_read_typ ( StringInfo  in,
LogicalRepTyp * ltyp
)

Definition at line 757 of file proto.c.

758{
759 ltyp->remoteid = pq_getmsgint(in, 4);
760
761 /* Read type name from stream */
762 ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
763 ltyp->typname = pstrdup(pq_getmsgstring(in));
764}

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

◆ logicalrep_read_update()

LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool *  has_oldtuple,
LogicalRepTupleData * oldtup,
LogicalRepTupleData * newtup
)

Definition at line 487 of file proto.c.

490{
491 char action;
492 LogicalRepRelId relid;
493
494 /* read the relation id */
495 relid = pq_getmsgint(in, 4);
496
497 /* read and verify action */
498 action = pq_getmsgbyte(in);
499 if (action != 'K' && action != 'O' && action != 'N')
500 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
501 action);
502
503 /* check for old tuple */
504 if (action == 'K' || action == 'O')
505 {
506 logicalrep_read_tuple(in, oldtup);
507 *has_oldtuple = true;
508
509 action = pq_getmsgbyte(in);
510 }
511 else
512 *has_oldtuple = false;
513
514 /* check for new tuple */
515 if (action != 'N')
516 elog(ERROR, "expected action 'N', got %c",
517 action);
518
519 logicalrep_read_tuple(in, newtup);
520
521 return relid;
522}

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

◆ logicalrep_should_publish_column()

bool logicalrep_should_publish_column ( Form_pg_attribute  att,
Bitmapset * columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 1282 of file proto.c.

1284{
1285 if (att->attisdropped)
1286 return false;
1287
1288 /* If a column list is provided, publish only the cols in that list. */
1289 if (columns)
1290 return bms_is_member(att->attnum, columns);
1291
1292 /* All non-generated columns are always published. */
1293 if (!att->attgenerated)
1294 return true;
1295
1296 /*
1297 * Stored generated columns are only published when the user sets
1298 * publish_generated_columns as stored.
1299 */
1300 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1301 return include_gencols_type == PUBLISH_GENCOLS_STORED;
1302
1303 return false;
1304}
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510

References bms_is_member().

Referenced by logicalrep_write_attrs(), logicalrep_write_tuple(), and send_relation_and_attrs().

◆ logicalrep_write_attrs()

static void logicalrep_write_attrs ( StringInfo  out,
Relation  rel,
Bitmapset * columns,
PublishGencolsType  include_gencols_type 
)
static

Definition at line 924 of file proto.c.

926{
927 TupleDesc desc;
928 int i;
929 uint16 nliveatts = 0;
930 Bitmapset *idattrs = NULL;
931 bool replidentfull;
932
933 desc = RelationGetDescr(rel);
934
935 /* send number of live attributes */
936 for (i = 0; i < desc->natts; i++)
937 {
938 Form_pg_attribute att = TupleDescAttr(desc, i);
939
940 if (!logicalrep_should_publish_column(att, columns,
941 include_gencols_type))
942 continue;
943
944 nliveatts++;
945 }
946 pq_sendint16(out, nliveatts);
947
948 /* fetch bitmap of REPLICATION IDENTITY attributes */
949 replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
950 if (!replidentfull)
951 idattrs = RelationGetIdentityKeyBitmap(rel);
952
953 /* send the attributes */
954 for (i = 0; i < desc->natts; i++)
955 {
956 Form_pg_attribute att = TupleDescAttr(desc, i);
957 uint8 flags = 0;
958
959 if (!logicalrep_should_publish_column(att, columns,
960 include_gencols_type))
961 continue;
962
963 /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
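964 if (replidentfull ||
965 bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
966 idattrs))
967 flags |= LOGICALREP_IS_REPLICA_IDENTITY;
968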
969 pq_sendbyte(out, flags);
970
971 /* attribute name */
972 pq_sendstring(out, NameStr(att->attname));
973
974 /* attribute type id */
975 pq_sendint32(out, (int) att->atttypid);
976
977 /* attribute mode */
978 pq_sendint32(out, att->atttypmod);
979 }
980
981 bms_free(idattrs);
982}
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
#define NameStr(name)
Definition: c.h:755
uint16_t uint16
Definition: c.h:541
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:195
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:136
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:1282
#define RelationGetDescr(relation)
Definition: rel.h:541
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
Definition: relcache.c:5576
Form_pg_class rd_rel
Definition: rel.h:111
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160

References bms_free(), bms_is_member(), FirstLowInvalidHeapAttributeNumber, i, LOGICALREP_IS_REPLICA_IDENTITY, logicalrep_should_publish_column(), NameStr, TupleDescData::natts, pq_sendbyte(), pq_sendint16(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetDescr, RelationGetIdentityKeyBitmap(), and TupleDescAttr().

Referenced by logicalrep_write_rel().

◆ logicalrep_write_begin()

void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN * txn 
)

Definition at line 49 of file proto.c.

50{
51 pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
52
53 /* fixed fields */
54 pq_sendint64(out, txn->final_lsn);
55 pq_sendint64(out, txn->commit_time);
56 pq_sendint32(out, txn->xid);
57}
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
TimestampTz commit_time
XLogRecPtr final_lsn
TransactionId xid

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, LOGICAL_REP_MSG_BEGIN, pq_sendbyte(), pq_sendint32(), pq_sendint64(), and ReorderBufferTXN::xid.

Referenced by pgoutput_send_begin().

◆ logicalrep_write_begin_prepare()

void logicalrep_write_begin_prepare ( StringInfo  out,
ReorderBufferTXN * txn 
)

◆ logicalrep_write_commit()

void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)

Definition at line 78 of file proto.c.

80{
81 uint8 flags = 0;
82
83 pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
84
85 /* send the flags field (unused for now) */
86 pq_sendbyte(out, flags);
87
88 /* send fields */
89 pq_sendint64(out, commit_lsn);
90 pq_sendint64(out, txn->end_lsn);
91 pq_sendint64(out, txn->commit_time);
92}

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_COMMIT, pq_sendbyte(), and pq_sendint64().

Referenced by pgoutput_commit_txn().

◆ logicalrep_write_commit_prepared()

void logicalrep_write_commit_prepared ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)

Definition at line 237 of file proto.c.

239{
240 uint8 flags = 0;
241
242 pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
243
244 /*
245 * This should only ever happen for two-phase commit transactions, in
246 * which case we expect to have a valid GID.
247 */
248 Assert(txn->gid != NULL);
249
250 /* send the flags field */
251 pq_sendbyte(out, flags);
252
253 /* send fields */
254 pq_sendint64(out, commit_lsn);
255 pq_sendint64(out, txn->end_lsn);
256 pq_sendint64(out, txn->commit_time);
257 pq_sendint32(out, txn->xid);
258
259 /* send gid */
260 pq_sendstring(out, txn->gid);
261}

References Assert(), ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_COMMIT_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), and ReorderBufferTXN::xid.

Referenced by pgoutput_commit_prepared_txn().

◆ logicalrep_write_delete()

void logicalrep_write_delete ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot * oldslot,
bool  binary,
Bitmapset * columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 528 of file proto.c.

532{
533 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
534 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
536
537 pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
538
539 /* transaction ID (if not valid, we're not streaming) */
540 if (TransactionIdIsValid(xid))
541 pq_sendint32(out, xid);
542
543 /* use Oid as relation identifier */
544 pq_sendint32(out, RelationGetRelid(rel));
545
546 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
547 pq_sendbyte(out, 'O'); /* old tuple follows */
548 else
549 pq_sendbyte(out, 'K'); /* old key follows */
550
551 logicalrep_write_tuple(out, rel, oldslot, binary, columns,
552 include_gencols_type);
553}
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:770
#define RelationGetRelid(relation)
Definition: rel.h:515
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert(), LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

◆ logicalrep_write_insert()

void logicalrep_write_insert ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot * newslot,
bool  binary,
Bitmapset * columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 403 of file proto.c.

407{
408 pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
409
410 /* transaction ID (if not valid, we're not streaming) */
411 if (TransactionIdIsValid(xid))
412 pq_sendint32(out, xid);
413
414 /* use Oid as relation identifier */
415 pq_sendint32(out, RelationGetRelid(rel));
416
417 pq_sendbyte(out, 'N'); /* new tuple follows */
418 logicalrep_write_tuple(out, rel, newslot, binary, columns,
419 include_gencols_type);
420}

References LOGICAL_REP_MSG_INSERT, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

◆ logicalrep_write_message()

void logicalrep_write_message ( StringInfo  out,
TransactionId  xid,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)

Definition at line 640 of file proto.c.

643{
644 uint8 flags = 0;
645
646 pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
647
648 /* encode and send message flags */
649 if (transactional)
650 flags |= MESSAGE_TRANSACTIONAL;
651
652 /* transaction ID (if not valid, we're not streaming) */
653 if (TransactionIdIsValid(xid))
654 pq_sendint32(out, xid);
655
656 pq_sendint8(out, flags);
657 pq_sendint64(out, lsn);
658 pq_sendstring(out, prefix);
659 pq_sendint32(out, sz);
660 pq_sendbytes(out, message, sz);
661}
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:128
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28

References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.

Referenced by pgoutput_message().

◆ logicalrep_write_namespace()

static void logicalrep_write_namespace ( StringInfo  out,
Oid  nspid 
)
static

Definition at line 1030 of file proto.c.

1031{
1032 if (nspid == PG_CATALOG_NAMESPACE)
1033 pq_sendbyte(out, '\0');
1034 else
1035 {
1036 char *nspname = get_namespace_name(nspid);
1037
1038 if (nspname == NULL)
1039 elog(ERROR, "cache lookup failed for namespace %u",
1040 nspid);
1041
1042 pq_sendstring(out, nspname);
1043 }
1044}
int nspid
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3533

References elog, ERROR, get_namespace_name(), nspid, pq_sendbyte(), and pq_sendstring().

Referenced by logicalrep_write_rel(), and logicalrep_write_typ().

◆ logicalrep_write_origin()

void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 374 of file proto.c.

376{
377 pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
378
379 /* fixed fields */
380 pq_sendint64(out, origin_lsn);
381
382 /* origin string */
383 pq_sendstring(out, origin);
384}

References LOGICAL_REP_MSG_ORIGIN, pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by send_repl_origin().

◆ logicalrep_write_prepare()

void logicalrep_write_prepare ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 187 of file proto.c.

189{
190 logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
191 txn, prepare_lsn);
192}
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:155

References LOGICAL_REP_MSG_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_prepare_txn().

◆ logicalrep_write_prepare_common()

static void logicalrep_write_prepare_common ( StringInfo  out,
LogicalRepMsgType  type,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 155 of file proto.c.

157{
158 uint8 flags = 0;
159
160 pq_sendbyte(out, type);
161
162 /*
163 * This should only ever happen for two-phase commit transactions, in
164 * which case we expect to have a valid GID.
165 */
166 Assert(txn->gid != NULL);
167 Assert(rbtxn_is_prepared(txn));
168 Assert(TransactionIdIsValid(txn->xid));
169
170 /* send the flags field */
171 pq_sendbyte(out, flags);
172
173 /* send fields */
174 pq_sendint64(out, prepare_lsn);
175 pq_sendint64(out, txn->end_lsn);
176 pq_sendint64(out, txn->prepare_time);
177 pq_sendint32(out, txn->xid);
178
179 /* send gid */
180 pq_sendstring(out, txn->gid);
181}
#define rbtxn_is_prepared(txn)
const char * type

References Assert(), ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, rbtxn_is_prepared, TransactionIdIsValid, type, and ReorderBufferTXN::xid.

Referenced by logicalrep_write_prepare(), and logicalrep_write_stream_prepare().

◆ logicalrep_write_rel()

void logicalrep_write_rel ( StringInfo  out,
TransactionId  xid,
Relation  rel,
Bitmapset * columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 667 of file proto.c.

670{
671 char *relname;
672
673 pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
674
675 /* transaction ID (if not valid, we're not streaming) */
676 if (TransactionIdIsValid(xid))
677 pq_sendint32(out, xid);
678
679 /* use Oid as relation identifier */
680 pq_sendint32(out, RelationGetRelid(rel));
681
682 /* send qualified relation name */
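683 logicalrep_write_namespace(out, RelationGetNamespace(rel));
684 relname = RelationGetRelationName(rel);
685 pq_sendstring(out, relname);
686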
687 /* send replica identity */
688 pq_sendbyte(out, rel->rd_rel->relreplident);
689
690 /* send the attribute info */
691 logicalrep_write_attrs(out, rel, columns, include_gencols_type);
692}
NameData relname
Definition: pg_class.h:38
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1030
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:924
#define RelationGetRelationName(relation)
Definition: rel.h:549
#define RelationGetNamespace(relation)
Definition: rel.h:556

References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

◆ logicalrep_write_rollback_prepared()

void logicalrep_write_rollback_prepared ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)

Definition at line 293 of file proto.c.

296{
297 uint8 flags = 0;
298
299 pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
300
301 /*
302 * This should only ever happen for two-phase commit transactions, in
303 * which case we expect to have a valid GID.
304 */
305 Assert(txn->gid != NULL);
306
307 /* send the flags field */
308 pq_sendbyte(out, flags);
309
310 /* send fields */
311 pq_sendint64(out, prepare_end_lsn);
312 pq_sendint64(out, txn->end_lsn);
313 pq_sendint64(out, prepare_time);
314 pq_sendint64(out, txn->commit_time);
315 pq_sendint32(out, txn->xid);
316
317 /* send gid */
318 pq_sendstring(out, txn->gid);
319}

References Assert(), ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_ROLLBACK_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), and ReorderBufferTXN::xid.

Referenced by pgoutput_rollback_prepared_txn().

◆ logicalrep_write_stream_abort()

void logicalrep_write_stream_abort ( StringInfo  out,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  abort_lsn,
TimestampTz  abort_time,
bool  write_abort_info 
)

Definition at line 1161 of file proto.c.

1164{
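1165 pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1166
1167 Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
1168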
1169 /* transaction ID */
1170 pq_sendint32(out, xid);
1171 pq_sendint32(out, subxid);
1172
1173 if (write_abort_info)
1174 {
1175 pq_sendint64(out, abort_lsn);
1176 pq_sendint64(out, abort_time);
1177 }
1178}

References Assert(), LOGICAL_REP_MSG_STREAM_ABORT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), and TransactionIdIsValid.

Referenced by pgoutput_stream_abort().

◆ logicalrep_write_stream_commit()

void logicalrep_write_stream_commit ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)

Definition at line 1107 of file proto.c.

1109{
1110 uint8 flags = 0;
1111
1112 pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1113
1114 Assert(TransactionIdIsValid(txn->xid));
1115
1116 /* transaction ID */
1117 pq_sendint32(out, txn->xid);
1118
1119 /* send the flags field (unused for now) */
1120 pq_sendbyte(out, flags);
1121
1122 /* send fields */
1123 pq_sendint64(out, commit_lsn);
1124 pq_sendint64(out, txn->end_lsn);
1125 pq_sendint64(out, txn->commit_time);
1126}

References Assert(), ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_STREAM_COMMIT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), TransactionIdIsValid, and ReorderBufferTXN::xid.

Referenced by pgoutput_stream_commit().

◆ logicalrep_write_stream_prepare()

void logicalrep_write_stream_prepare ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 353 of file proto.c.

356{
357 logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
358 txn, prepare_lsn);
359}

References LOGICAL_REP_MSG_STREAM_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_stream_prepare_txn().

◆ logicalrep_write_stream_start()

void logicalrep_write_stream_start ( StringInfo  out,
TransactionId  xid,
bool  first_segment 
)

Definition at line 1064 of file proto.c.

1066{
1067 pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1068
1069 Assert(TransactionIdIsValid(xid));
1070
1071 /* transaction ID (we're starting to stream, so must be valid) */
1072 pq_sendint32(out, xid);
1073
1074 /* 1 if this is the first streaming segment for this xid */
1075 pq_sendbyte(out, first_segment ? 1 : 0);
1076}

References Assert(), LOGICAL_REP_MSG_STREAM_START, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_start().

◆ logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo  out)

Definition at line 1098 of file proto.c.

1099{
1100 pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
1101}

References LOGICAL_REP_MSG_STREAM_STOP, and pq_sendbyte().

Referenced by pgoutput_stream_stop().

◆ logicalrep_write_truncate()

void logicalrep_write_truncate ( StringInfo  out,
TransactionId  xid,
int  nrelids,
Oid  relids[],
bool  cascade,
bool  restart_seqs 
)

Definition at line 583 of file proto.c.

588{
589 int i;
590 uint8 flags = 0;
591
592 pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
593
594 /* transaction ID (if not valid, we're not streaming) */
595 if (TransactionIdIsValid(xid))
596 pq_sendint32(out, xid);
597
598 pq_sendint32(out, nrelids);
599
600 /* encode and send truncate flags */
601 if (cascade)
602 flags |= TRUNCATE_CASCADE;
603 if (restart_seqs)
604 flags |= TRUNCATE_RESTART_SEQS;
605 pq_sendint8(out, flags);
606
607 for (i = 0; i < nrelids; i++)
608 pq_sendint32(out, relids[i]);
609}

References i, LOGICAL_REP_MSG_TRUNCATE, pq_sendbyte(), pq_sendint32(), pq_sendint8(), TransactionIdIsValid, TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by pgoutput_truncate().

◆ logicalrep_write_tuple()

static void logicalrep_write_tuple ( StringInfo  out,
Relation  rel,
TupleTableSlot * slot,
bool  binary,
Bitmapset * columns,
PublishGencolsType  include_gencols_type 
)
static

Definition at line 770 of file proto.c.

773{
774 TupleDesc desc;
775 Datum *values;
776 bool *isnull;
777 int i;
778 uint16 nliveatts = 0;
779
780 desc = RelationGetDescr(rel);
781
782 for (i = 0; i < desc->natts; i++)
783 {
784 Form_pg_attribute att = TupleDescAttr(desc, i);
785
786 if (!logicalrep_should_publish_column(att, columns,
787 include_gencols_type))
788 continue;
789
790 nliveatts++;
791 }
792 pq_sendint16(out, nliveatts);
793
794 slot_getallattrs(slot);
795 values = slot->tts_values;
796 isnull = slot->tts_isnull;
797
798 /* Write the values */
799 for (i = 0; i < desc->natts; i++)
800 {
801 HeapTuple typtup;
802 Form_pg_type typclass;
803 Form_pg_attribute att = TupleDescAttr(desc, i);
804
805 if (!logicalrep_should_publish_column(att, columns,
806 include_gencols_type))
807 continue;
808
809 if (isnull[i])
810 {
811 pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
812 continue;
813 }
814
815 if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(values[i])))
816 {
817 /*
818 * Unchanged toasted datum. (Note that we don't promise to detect
819 * unchanged data in general; this is just a cheap check to avoid
820 * sending large values unnecessarily.)
821 */
822 pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
823 continue;
824 }
825
826 typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
827 if (!HeapTupleIsValid(typtup))
828 elog(ERROR, "cache lookup failed for type %u", att->atttypid);
829 typclass = (Form_pg_type) GETSTRUCT(typtup);
830
831 /*
832 * Send in binary if requested and type has suitable send function.
833 */
834 if (binary && OidIsValid(typclass->typsend))
835 {
836 bytea *outputbytes;
837 int len;
838
839 pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
840 outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
841 len = VARSIZE(outputbytes) - VARHDRSZ;
842 pq_sendint(out, len, 4); /* length */
843 pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
844 pfree(outputbytes);
845 }
846 else
847 {
848 char *outputstr;
849
850 pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
851 outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
852 pq_sendcountedtext(out, outputstr, strlen(outputstr));
853 pfree(outputstr);
854 }
855
856 ReleaseSysCache(typtup);
857 }
858}
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define VARHDRSZ
Definition: c.h:701
#define OidIsValid(objectId)
Definition: c.h:778
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1763
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1782
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
void pfree(void *pointer)
Definition: mcxt.c:1594
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
uint64_t Datum
Definition: postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:322
void pq_sendcountedtext(StringInfo buf, const char *str, int slen)
Definition: pqformat.c:142
static void pq_sendint(StringInfo buf, uint32 i, int b)
Definition: pqformat.h:171
bool * tts_isnull
Definition: tuptable.h:126
Datum * tts_values
Definition: tuptable.h:124
Definition: c.h:696
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:220
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:371
static bool VARATT_IS_EXTERNAL_ONDISK(const void *PTR)
Definition: varatt.h:361
static Size VARSIZE(const void *PTR)
Definition: varatt.h:298
static char * VARDATA(const void *PTR)
Definition: varatt.h:305

References DatumGetPointer(), elog, ERROR, GETSTRUCT(), HeapTupleIsValid, i, len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_NULL, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, logicalrep_should_publish_column(), TupleDescData::natts, ObjectIdGetDatum(), OidIsValid, OidOutputFunctionCall(), OidSendFunctionCall(), pfree(), pq_sendbyte(), pq_sendbytes(), pq_sendcountedtext(), pq_sendint(), pq_sendint16(), RelationGetDescr, ReleaseSysCache(), SearchSysCache1(), slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, TupleDescAttr(), values, VARATT_IS_EXTERNAL_ONDISK(), VARDATA(), VARHDRSZ, and VARSIZE().

Referenced by logicalrep_write_delete(), logicalrep_write_insert(), and logicalrep_write_update().

◆ logicalrep_write_typ()

void logicalrep_write_typ ( StringInfo  out,
TransactionId  xid,
Oid  typoid 
)

Definition at line 726 of file proto.c.

727{
728 Oid basetypoid = getBaseType(typoid);
729 HeapTuple tup;
730 Form_pg_type typtup;
731
732 pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
733
734 /* transaction ID (if not valid, we're not streaming) */
735 if (TransactionIdIsValid(xid))
736 pq_sendint32(out, xid);
737
738 tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
739 if (!HeapTupleIsValid(tup))
740 elog(ERROR, "cache lookup failed for type %u", basetypoid);
741 typtup = (Form_pg_type) GETSTRUCT(tup);
742
743 /* use Oid as type identifier */
744 pq_sendint32(out, typoid);
745
746 /* send qualified type name */
747 logicalrep_write_namespace(out, typtup->typnamespace);
748 pq_sendstring(out, NameStr(typtup->typname));
749
750 ReleaseSysCache(tup);
751}
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2688

References elog, ERROR, getBaseType(), GETSTRUCT(), HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

◆ logicalrep_write_update()

void logicalrep_write_update ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot * oldslot,
TupleTableSlot * newslot,
bool  binary,
Bitmapset * columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 450 of file proto.c.

454{
455 pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
456
457 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
458 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
460
461 /* transaction ID (if not valid, we're not streaming) */
462 if (TransactionIdIsValid(xid))
463 pq_sendint32(out, xid);
464
465 /* use Oid as relation identifier */
466 pq_sendint32(out, RelationGetRelid(rel));
467
468 if (oldslot != NULL)
469 {
470 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
471 pq_sendbyte(out, 'O'); /* old tuple follows */
472 else
473 pq_sendbyte(out, 'K'); /* old key follows */
474 logicalrep_write_tuple(out, rel, oldslot, binary, columns,
475 include_gencols_type);
476 }
477
478 pq_sendbyte(out, 'N'); /* new tuple follows */
479 logicalrep_write_tuple(out, rel, newslot, binary, columns,
480 include_gencols_type);
481}

References Assert(), LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().