PostgreSQL Source Code  git master
proto.c File Reference
#include "postgres.h"
#include "access/sysattr.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "libpq/pqformat.h"
#include "replication/logicalproto.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
Include dependency graph for proto.c:

Go to the source code of this file.

Macros

#define LOGICALREP_IS_REPLICA_IDENTITY   1
 
#define MESSAGE_TRANSACTIONAL   (1<<0)
 
#define TRUNCATE_CASCADE   (1<<0)
 
#define TRUNCATE_RESTART_SEQS   (1<<1)
 

Functions

static void logicalrep_write_attrs (StringInfo out, Relation rel, Bitmapset *columns)
 
static void logicalrep_write_tuple (StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns)
 
static void logicalrep_read_attrs (StringInfo in, LogicalRepRelation *rel)
 
static void logicalrep_read_tuple (StringInfo in, LogicalRepTupleData *tuple)
 
static void logicalrep_write_namespace (StringInfo out, Oid nspid)
 
static const char * logicalrep_read_namespace (StringInfo in)
 
static bool column_in_column_list (int attnum, Bitmapset *columns)
 
void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_begin_prepare (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin_prepare (StringInfo in, LogicalRepPreparedTxnData *begin_data)
 
static void logicalrep_write_prepare_common (StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_write_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void logicalrep_read_prepare_common (StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_read_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_commit_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit_prepared (StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
 
void logicalrep_write_rollback_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
void logicalrep_read_rollback_prepared (StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
 
void logicalrep_write_stream_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_read_stream_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
 
List * logicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
 
void logicalrep_write_message (StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns)
 
LogicalRepRelation * logicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
 
void logicalrep_read_typ (StringInfo in, LogicalRepTyp *ltyp)
 
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
 
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
 
void logicalrep_write_stream_stop (StringInfo out)
 
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
TransactionId logicalrep_read_stream_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
 
void logicalrep_read_stream_abort (StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
 
const char * logicalrep_message_type (LogicalRepMsgType action)
 

Macro Definition Documentation

◆ LOGICALREP_IS_REPLICA_IDENTITY

#define LOGICALREP_IS_REPLICA_IDENTITY   1

Definition at line 26 of file proto.c.

◆ MESSAGE_TRANSACTIONAL

#define MESSAGE_TRANSACTIONAL   (1<<0)

Definition at line 28 of file proto.c.

◆ TRUNCATE_CASCADE

#define TRUNCATE_CASCADE   (1<<0)

Definition at line 29 of file proto.c.

◆ TRUNCATE_RESTART_SEQS

#define TRUNCATE_RESTART_SEQS   (1<<1)

Definition at line 30 of file proto.c.

Function Documentation

◆ column_in_column_list()

static bool column_in_column_list ( int  attnum,
Bitmapset * columns
)
static

Definition at line 50 of file proto.c.

51 {
52  return (columns == NULL || bms_is_member(attnum, columns));
53 }
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
int16 attnum
Definition: pg_attribute.h:74

References attnum, and bms_is_member().

Referenced by logicalrep_write_attrs(), and logicalrep_write_tuple().

◆ logicalrep_message_type()

const char* logicalrep_message_type ( LogicalRepMsgType  action)

Definition at line 1217 of file proto.c.

1218 {
1219  static char err_unknown[20];
1220 
1221  switch (action)
1222  {
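1223  case LOGICAL_REP_MSG_BEGIN:
1224  return "BEGIN";
1225  case LOGICAL_REP_MSG_COMMIT:
1226  return "COMMIT";
1227  case LOGICAL_REP_MSG_ORIGIN:
1228  return "ORIGIN";
1229  case LOGICAL_REP_MSG_INSERT:
1230  return "INSERT";
1231  case LOGICAL_REP_MSG_UPDATE:
1232  return "UPDATE";
1233  case LOGICAL_REP_MSG_DELETE:
1234  return "DELETE";
1235  case LOGICAL_REP_MSG_TRUNCATE:
1236  return "TRUNCATE";
1237  case LOGICAL_REP_MSG_RELATION:
1238  return "RELATION";
1239  case LOGICAL_REP_MSG_TYPE:
1240  return "TYPE";
1241  case LOGICAL_REP_MSG_MESSAGE:
1242  return "MESSAGE";
1243  case LOGICAL_REP_MSG_BEGIN_PREPARE:
1244  return "BEGIN PREPARE";
1245  case LOGICAL_REP_MSG_PREPARE:
1246  return "PREPARE";
1247  case LOGICAL_REP_MSG_COMMIT_PREPARED:
1248  return "COMMIT PREPARED";
1249  case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
1250  return "ROLLBACK PREPARED";
1251  case LOGICAL_REP_MSG_STREAM_START:
1252  return "STREAM START";
1253  case LOGICAL_REP_MSG_STREAM_STOP:
1254  return "STREAM STOP";
1255  case LOGICAL_REP_MSG_STREAM_COMMIT:
1256  return "STREAM COMMIT";
1257  case LOGICAL_REP_MSG_STREAM_ABORT:
1258  return "STREAM ABORT";
1259  case LOGICAL_REP_MSG_STREAM_PREPARE:
1260  return "STREAM PREPARE";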
1261  }
1262 
1263  /*
1264  * This message provides context in the error raised when applying a
1265  * logical message. So we can't throw an error here. Return an unknown
1266  * indicator value so that the original error is still reported.
1267  */
1268  snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1269 
1270  return err_unknown;
1271 }
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
#define snprintf
Definition: port.h:238

References generate_unaccent_rules::action, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and snprintf.

Referenced by apply_error_callback().

◆ logicalrep_read_attrs()

static void logicalrep_read_attrs ( StringInfo  in,
LogicalRepRelation * rel
)
static

Definition at line 993 of file proto.c.

994 {
995  int i;
996  int natts;
997  char **attnames;
998  Oid *atttyps;
999  Bitmapset *attkeys = NULL;
1000 
1001  natts = pq_getmsgint(in, 2);
1002  attnames = palloc(natts * sizeof(char *));
1003  atttyps = palloc(natts * sizeof(Oid));
1004 
1005  /* read the attributes */
1006  for (i = 0; i < natts; i++)
1007  {
1008  uint8 flags;
1009 
1010  /* Check for replica identity column */
1011  flags = pq_getmsgbyte(in);
1012  if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
1013  attkeys = bms_add_member(attkeys, i);
1014 
1015  /* attribute name */
1016  attnames[i] = pstrdup(pq_getmsgstring(in));
1017 
1018  /* attribute type id */
1019  atttyps[i] = (Oid) pq_getmsgint(in, 4);
1020 
1021  /* we ignore attribute mode for now */
1022  (void) pq_getmsgint(in, 4);
1023  }
1024 
1025  rel->attnames = attnames;
1026  rel->atttyps = atttyps;
1027  rel->attkeys = attkeys;
1028  rel->natts = natts;
1029 }
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
unsigned char uint8
Definition: c.h:504
int i
Definition: isn.c:73
char * pstrdup(const char *in)
Definition: mcxt.c:1695
void * palloc(Size size)
Definition: mcxt.c:1316
unsigned int Oid
Definition: postgres_ext.h:31
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
#define LOGICALREP_IS_REPLICA_IDENTITY
Definition: proto.c:26
Bitmapset * attkeys
Definition: logicalproto.h:115

References LogicalRepRelation::attkeys, LogicalRepRelation::attnames, LogicalRepRelation::atttyps, bms_add_member(), i, LOGICALREP_IS_REPLICA_IDENTITY, LogicalRepRelation::natts, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), and pstrdup().

Referenced by logicalrep_read_rel().

◆ logicalrep_read_begin()

void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData * begin_data
)

Definition at line 74 of file proto.c.

75 {
76  /* read fields */
77  begin_data->final_lsn = pq_getmsgint64(in);
78  if (begin_data->final_lsn == InvalidXLogRecPtr)
79  elog(ERROR, "final_lsn not set in begin message");
80  begin_data->committime = pq_getmsgint64(in);
81  begin_data->xid = pq_getmsgint(in, 4);
82 }
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131
TimestampTz committime
Definition: logicalproto.h:130
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), and LogicalRepBeginData::xid.

Referenced by apply_handle_begin().

◆ logicalrep_read_begin_prepare()

void logicalrep_read_begin_prepare ( StringInfo  in,
LogicalRepPreparedTxnData * begin_data
)

Definition at line 145 of file proto.c.

146 {
147  /* read fields */
148  begin_data->prepare_lsn = pq_getmsgint64(in);
149  if (begin_data->prepare_lsn == InvalidXLogRecPtr)
150  elog(ERROR, "prepare_lsn not set in begin prepare message");
151  begin_data->end_lsn = pq_getmsgint64(in);
152  if (begin_data->end_lsn == InvalidXLogRecPtr)
153  elog(ERROR, "end_lsn not set in begin prepare message");
154  begin_data->prepare_time = pq_getmsgint64(in);
155  begin_data->xid = pq_getmsgint(in, 4);
156 
157  /* read gid (copy it into a pre-allocated buffer) */
158  strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
159 }
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_begin_prepare().

◆ logicalrep_read_commit()

void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData * commit_data
)

Definition at line 109 of file proto.c.

110 {
111  /* read flags (unused for now) */
112  uint8 flags = pq_getmsgbyte(in);
113 
114  if (flags != 0)
115  elog(ERROR, "unrecognized flags %u in commit message", flags);
116 
117  /* read fields */
118  commit_data->commit_lsn = pq_getmsgint64(in);
119  commit_data->end_lsn = pq_getmsgint64(in);
120  commit_data->committime = pq_getmsgint64(in);
121 }
TimestampTz committime
Definition: logicalproto.h:138

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

◆ logicalrep_read_commit_prepared()

void logicalrep_read_commit_prepared ( StringInfo  in,
LogicalRepCommitPreparedTxnData * prepare_data
)

Definition at line 278 of file proto.c.

279 {
280  /* read flags */
281  uint8 flags = pq_getmsgbyte(in);
282 
283  if (flags != 0)
284  elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
285 
286  /* read fields */
287  prepare_data->commit_lsn = pq_getmsgint64(in);
288  if (prepare_data->commit_lsn == InvalidXLogRecPtr)
289  elog(ERROR, "commit_lsn is not set in commit prepared message");
290  prepare_data->end_lsn = pq_getmsgint64(in);
291  if (prepare_data->end_lsn == InvalidXLogRecPtr)
292  elog(ERROR, "end_lsn is not set in commit prepared message");
293  prepare_data->commit_time = pq_getmsgint64(in);
294  prepare_data->xid = pq_getmsgint(in, 4);
295 
296  /* read gid (copy it into a pre-allocated buffer) */
297  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
298 }

References LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, elog, LogicalRepCommitPreparedTxnData::end_lsn, ERROR, LogicalRepCommitPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), strlcpy(), and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_handle_commit_prepared().

◆ logicalrep_read_delete()

LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData * oldtup
)

Definition at line 564 of file proto.c.

565 {
566  char action;
567  LogicalRepRelId relid;
568 
569  /* read the relation id */
570  relid = pq_getmsgint(in, 4);
571 
572  /* read and verify action */
573  action = pq_getmsgbyte(in);
574  if (action != 'K' && action != 'O')
575  elog(ERROR, "expected action 'O' or 'K', got %c", action);
576 
577  logicalrep_read_tuple(in, oldtup);
578 
579  return relid;
580 }
uint32 LogicalRepRelId
Definition: logicalproto.h:101
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:866

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

◆ logicalrep_read_insert()

LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData * newtup
)

Definition at line 436 of file proto.c.

437 {
438  char action;
439  LogicalRepRelId relid;
440 
441  /* read the relation id */
442  relid = pq_getmsgint(in, 4);
443 
444  action = pq_getmsgbyte(in);
445  if (action != 'N')
446  elog(ERROR, "expected new tuple but got %d",
447  action);
448 
449  logicalrep_read_tuple(in, newtup);
450 
451  return relid;
452 }

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

◆ logicalrep_read_namespace()

static const char * logicalrep_read_namespace ( StringInfo  in)
static

Definition at line 1055 of file proto.c.

1056 {
1057  const char *nspname = pq_getmsgstring(in);
1058 
1059  if (nspname[0] == '\0')
1060  nspname = "pg_catalog";
1061 
1062  return nspname;
1063 }

References pq_getmsgstring().

Referenced by logicalrep_read_rel(), and logicalrep_read_typ().

◆ logicalrep_read_origin()

char* logicalrep_read_origin ( StringInfo  in,
XLogRecPtr * origin_lsn
)

Definition at line 401 of file proto.c.

402 {
403  /* fixed fields */
404  *origin_lsn = pq_getmsgint64(in);
405 
406  /* return origin */
407  return pstrdup(pq_getmsgstring(in));
408 }

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

◆ logicalrep_read_prepare()

void logicalrep_read_prepare ( StringInfo  in,
LogicalRepPreparedTxnData * prepare_data
)

Definition at line 239 of file proto.c.

240 {
241  logicalrep_read_prepare_common(in, "prepare", prepare_data);
242 }
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:210

References logicalrep_read_prepare_common().

Referenced by apply_handle_prepare().

◆ logicalrep_read_prepare_common()

static void logicalrep_read_prepare_common ( StringInfo  in,
char *  msgtype,
LogicalRepPreparedTxnData * prepare_data
)
static

Definition at line 210 of file proto.c.

212 {
213  /* read flags */
214  uint8 flags = pq_getmsgbyte(in);
215 
216  if (flags != 0)
217  elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
218 
219  /* read fields */
220  prepare_data->prepare_lsn = pq_getmsgint64(in);
221  if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
222  elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
223  prepare_data->end_lsn = pq_getmsgint64(in);
224  if (prepare_data->end_lsn == InvalidXLogRecPtr)
225  elog(ERROR, "end_lsn is not set in %s message", msgtype);
226  prepare_data->prepare_time = pq_getmsgint64(in);
227  prepare_data->xid = pq_getmsgint(in, 4);
228  if (prepare_data->xid == InvalidTransactionId)
229  elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
230 
231  /* read gid (copy it into a pre-allocated buffer) */
232  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
233 }
#define InvalidTransactionId
Definition: transam.h:31

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidTransactionId, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.

Referenced by logicalrep_read_prepare(), and logicalrep_read_stream_prepare().

◆ logicalrep_read_rel()

LogicalRepRelation* logicalrep_read_rel ( StringInfo  in)

Definition at line 700 of file proto.c.

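701 {
702  LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
703 
704  rel->remoteid = pq_getmsgint(in, 4);
705 
706  /* Read relation name from stream */
707  rel->nspname = pstrdup(logicalrep_read_namespace(in));
708  rel->relname = pstrdup(pq_getmsgstring(in));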
709 
710  /* Read the replica identity. */
711  rel->replident = pq_getmsgbyte(in);
712 
713  /* Get attribute description */
714  logicalrep_read_attrs(in, rel);
715 
716  return rel;
717 }
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:993
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1055
LogicalRepRelId remoteid
Definition: logicalproto.h:107

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

◆ logicalrep_read_rollback_prepared()

void logicalrep_read_rollback_prepared ( StringInfo  in,
LogicalRepRollbackPreparedTxnData * rollback_data
)

Definition at line 336 of file proto.c.

338 {
339  /* read flags */
340  uint8 flags = pq_getmsgbyte(in);
341 
342  if (flags != 0)
343  elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
344 
345  /* read fields */
346  rollback_data->prepare_end_lsn = pq_getmsgint64(in);
347  if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
348  elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
349  rollback_data->rollback_end_lsn = pq_getmsgint64(in);
350  if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
351  elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
352  rollback_data->prepare_time = pq_getmsgint64(in);
353  rollback_data->rollback_time = pq_getmsgint64(in);
354  rollback_data->xid = pq_getmsgint(in, 4);
355 
356  /* read gid (copy it into a pre-allocated buffer) */
357  strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
358 }

References elog, ERROR, LogicalRepRollbackPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, strlcpy(), and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_handle_rollback_prepared().

◆ logicalrep_read_stream_abort()

void logicalrep_read_stream_abort ( StringInfo  in,
LogicalRepStreamAbortData * abort_data,
bool  read_abort_info 
)

Definition at line 1192 of file proto.c.

1195 {
1196  Assert(abort_data);
1197 
1198  abort_data->xid = pq_getmsgint(in, 4);
1199  abort_data->subxid = pq_getmsgint(in, 4);
1200 
1201  if (read_abort_info)
1202  {
1203  abort_data->abort_lsn = pq_getmsgint64(in);
1204  abort_data->abort_time = pq_getmsgint64(in);
1205  }
1206  else
1207  {
1208  abort_data->abort_lsn = InvalidXLogRecPtr;
1209  abort_data->abort_time = 0;
1210  }
1211 }
#define Assert(condition)
Definition: c.h:858

References LogicalRepStreamAbortData::abort_lsn, LogicalRepStreamAbortData::abort_time, Assert, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), LogicalRepStreamAbortData::subxid, and LogicalRepStreamAbortData::xid.

Referenced by apply_handle_stream_abort().

◆ logicalrep_read_stream_commit()

TransactionId logicalrep_read_stream_commit ( StringInfo  in,
LogicalRepCommitData * commit_data
)

Definition at line 1137 of file proto.c.

1138 {
1139  TransactionId xid;
1140  uint8 flags;
1141 
1142  xid = pq_getmsgint(in, 4);
1143 
1144  /* read flags (unused for now) */
1145  flags = pq_getmsgbyte(in);
1146 
1147  if (flags != 0)
1148  elog(ERROR, "unrecognized flags %u in commit message", flags);
1149 
1150  /* read fields */
1151  commit_data->commit_lsn = pq_getmsgint64(in);
1152  commit_data->end_lsn = pq_getmsgint64(in);
1153  commit_data->committime = pq_getmsgint64(in);
1154 
1155  return xid;
1156 }
uint32 TransactionId
Definition: c.h:652

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), pq_getmsgint(), and pq_getmsgint64().

Referenced by apply_handle_stream_commit().

◆ logicalrep_read_stream_prepare()

void logicalrep_read_stream_prepare ( StringInfo  in,
LogicalRepPreparedTxnData * prepare_data
)

Definition at line 376 of file proto.c.

377 {
378  logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
379 }

References logicalrep_read_prepare_common().

Referenced by apply_handle_stream_prepare().

◆ logicalrep_read_stream_start()

TransactionId logicalrep_read_stream_start ( StringInfo  in,
bool * first_segment
)

Definition at line 1087 of file proto.c.

1088 {
1089  TransactionId xid;
1090 
1091  Assert(first_segment);
1092 
1093  xid = pq_getmsgint(in, 4);
1094  *first_segment = (pq_getmsgbyte(in) == 1);
1095 
1096  return xid;
1097 }

References Assert, pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_stream_start().

◆ logicalrep_read_truncate()

List* logicalrep_read_truncate ( StringInfo  in,
bool * cascade,
bool * restart_seqs
)

Definition at line 618 of file proto.c.

620 {
621  int i;
622  int nrelids;
623  List *relids = NIL;
624  uint8 flags;
625 
626  nrelids = pq_getmsgint(in, 4);
627 
628  /* read and decode truncate flags */
629  flags = pq_getmsgint(in, 1);
630  *cascade = (flags & TRUNCATE_CASCADE) > 0;
631  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
632 
633  for (i = 0; i < nrelids; i++)
634  relids = lappend_oid(relids, pq_getmsgint(in, 4));
635 
636  return relids;
637 }
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
#define NIL
Definition: pg_list.h:68
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30
#define TRUNCATE_CASCADE
Definition: proto.c:29
List
Definition: pg_list.h:54

References i, lappend_oid(), NIL, pq_getmsgint(), TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by apply_handle_truncate().

◆ logicalrep_read_tuple()

static void logicalrep_read_tuple ( StringInfo  in,
LogicalRepTupleData * tuple
)
static

Definition at line 866 of file proto.c.

867 {
868  int i;
869  int natts;
870 
871  /* Get number of attributes */
872  natts = pq_getmsgint(in, 2);
873 
874  /* Allocate space for per-column values; zero out unused StringInfoDatas */
875  tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
876  tuple->colstatus = (char *) palloc(natts * sizeof(char));
877  tuple->ncols = natts;
878 
879  /* Read the data */
880  for (i = 0; i < natts; i++)
881  {
882  char *buff;
883  char kind;
884  int len;
885  StringInfo value = &tuple->colvalues[i];
886 
887  kind = pq_getmsgbyte(in);
888  tuple->colstatus[i] = kind;
889 
890  switch (kind)
891  {
892  case LOGICALREP_COLUMN_NULL:
893  /* nothing more to do */
894  break;
895  case LOGICALREP_COLUMN_UNCHANGED:
896  /* we don't receive the value of an unchanged column */
897  break;
898  case LOGICALREP_COLUMN_TEXT:
899  case LOGICALREP_COLUMN_BINARY:
900  len = pq_getmsgint(in, 4); /* read length */
901 
902  /* and data */
903  buff = palloc(len + 1);
904  pq_copymsgbytes(in, buff, len);
905 
906  /*
907  * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
908  * as input functions require that. For
909  * LOGICALREP_COLUMN_BINARY it's not technically required, but
910  * it's harmless.
911  */
912  buff[len] = '\0';
913 
914  initStringInfoFromString(value, buff, len);
915  break;
916  default:
917  elog(ERROR, "unrecognized data representation type '%c'", kind);
918  }
919  }
920 }
static struct @155 value
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:97
#define LOGICALREP_COLUMN_NULL
Definition: logicalproto.h:96
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:99
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:98
void * palloc0(Size size)
Definition: mcxt.c:1346
const void size_t len
void pq_copymsgbytes(StringInfo msg, char *buf, int datalen)
Definition: pqformat.c:528
static void initStringInfoFromString(StringInfo str, char *data, int len)
Definition: stringinfo.h:148
StringInfoData * colvalues
Definition: logicalproto.h:87

References LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, elog, ERROR, i, initStringInfoFromString(), len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_NULL, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, LogicalRepTupleData::ncols, palloc(), palloc0(), pq_copymsgbytes(), pq_getmsgbyte(), pq_getmsgint(), and value.

Referenced by logicalrep_read_delete(), logicalrep_read_insert(), and logicalrep_read_update().

◆ logicalrep_read_typ()

void logicalrep_read_typ ( StringInfo  in,
LogicalRepTyp * ltyp
)

Definition at line 756 of file proto.c.

757 {
758  ltyp->remoteid = pq_getmsgint(in, 4);
759 
760  /* Read type name from stream */
761  ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
762  ltyp->typname = pstrdup(pq_getmsgstring(in));
763 }

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

◆ logicalrep_read_update()

LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool * has_oldtuple,
LogicalRepTupleData * oldtup,
LogicalRepTupleData * newtup
)

Definition at line 492 of file proto.c.

495 {
496  char action;
497  LogicalRepRelId relid;
498 
499  /* read the relation id */
500  relid = pq_getmsgint(in, 4);
501 
502  /* read and verify action */
503  action = pq_getmsgbyte(in);
504  if (action != 'K' && action != 'O' && action != 'N')
505  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
506  action);
507 
508  /* check for old tuple */
509  if (action == 'K' || action == 'O')
510  {
511  logicalrep_read_tuple(in, oldtup);
512  *has_oldtuple = true;
513 
514  action = pq_getmsgbyte(in);
515  }
516  else
517  *has_oldtuple = false;
518 
519  /* check for new tuple */
520  if (action != 'N')
521  elog(ERROR, "expected action 'N', got %c",
522  action);
523 
524  logicalrep_read_tuple(in, newtup);
525 
526  return relid;
527 }

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

◆ logicalrep_write_attrs()

static void logicalrep_write_attrs ( StringInfo  out,
Relation  rel,
Bitmapset * columns
)
static

Definition at line 926 of file proto.c.

927 {
928  TupleDesc desc;
929  int i;
930  uint16 nliveatts = 0;
931  Bitmapset *idattrs = NULL;
932  bool replidentfull;
933 
934  desc = RelationGetDescr(rel);
935 
936  /* send number of live attributes */
937  for (i = 0; i < desc->natts; i++)
938  {
939  Form_pg_attribute att = TupleDescAttr(desc, i);
940 
941  if (att->attisdropped || att->attgenerated)
942  continue;
943 
944  if (!column_in_column_list(att->attnum, columns))
945  continue;
946 
947  nliveatts++;
948  }
949  pq_sendint16(out, nliveatts);
950 
951  /* fetch bitmap of REPLICATION IDENTITY attributes */
952  replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
953  if (!replidentfull)
954  idattrs = RelationGetIdentityKeyBitmap(rel);
955 
956  /* send the attributes */
957  for (i = 0; i < desc->natts; i++)
958  {
959  Form_pg_attribute att = TupleDescAttr(desc, i);
960  uint8 flags = 0;
961 
962  if (att->attisdropped || att->attgenerated)
963  continue;
964 
965  if (!column_in_column_list(att->attnum, columns))
966  continue;
967 
968  /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
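969  if (replidentfull ||
970  bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
971  idattrs))
972  flags |= LOGICALREP_IS_REPLICA_IDENTITY;
973 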
974  pq_sendbyte(out, flags);
975 
976  /* attribute name */
977  pq_sendstring(out, NameStr(att->attname));
978 
979  /* attribute type id */
980  pq_sendint32(out, (int) att->atttypid);
981 
982  /* attribute mode */
983  pq_sendint32(out, att->atttypmod);
984  }
985 
986  bms_free(idattrs);
987 }
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
#define NameStr(name)
Definition: c.h:746
unsigned short uint16
Definition: c.h:505
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:195
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:136
static bool column_in_column_list(int attnum, Bitmapset *columns)
Definition: proto.c:50
#define RelationGetDescr(relation)
Definition: rel.h:531
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
Definition: relcache.c:5516
Form_pg_class rd_rel
Definition: rel.h:111
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References bms_free(), bms_is_member(), column_in_column_list(), FirstLowInvalidHeapAttributeNumber, i, LOGICALREP_IS_REPLICA_IDENTITY, NameStr, TupleDescData::natts, pq_sendbyte(), pq_sendint16(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetDescr, RelationGetIdentityKeyBitmap(), and TupleDescAttr.

Referenced by logicalrep_write_rel().

◆ logicalrep_write_begin()

void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN * txn 
)

Definition at line 60 of file proto.c.

61 {
62  pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
63 
64  /* fixed fields */
65  pq_sendint64(out, txn->final_lsn);
66  pq_sendint64(out, txn->xact_time.commit_time);
67  pq_sendint32(out, txn->xid);
68 }
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
TimestampTz commit_time
XLogRecPtr final_lsn
TransactionId xid
union ReorderBufferTXN::@111 xact_time

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, LOGICAL_REP_MSG_BEGIN, pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_send_begin().

◆ logicalrep_write_begin_prepare()

void logicalrep_write_begin_prepare ( StringInfo  out,
ReorderBufferTXN * txn 
)

◆ logicalrep_write_commit()

void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)

Definition at line 89 of file proto.c.

91 {
92  uint8 flags = 0;
93 
94  pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
95 
96  /* send the flags field (unused for now) */
97  pq_sendbyte(out, flags);
98 
99  /* send fields */
100  pq_sendint64(out, commit_lsn);
101  pq_sendint64(out, txn->end_lsn);
102  pq_sendint64(out, txn->xact_time.commit_time);
103 }

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_COMMIT, pq_sendbyte(), pq_sendint64(), and ReorderBufferTXN::xact_time.

Referenced by pgoutput_commit_txn().

◆ logicalrep_write_commit_prepared()

void logicalrep_write_commit_prepared ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)

Definition at line 248 of file proto.c.

250 {
251  uint8 flags = 0;
252 
253  pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
254 
255  /*
256  * This should only ever happen for two-phase commit transactions, in
257  * which case we expect to have a valid GID.
258  */
259  Assert(txn->gid != NULL);
260 
261  /* send the flags field */
262  pq_sendbyte(out, flags);
263 
264  /* send fields */
265  pq_sendint64(out, commit_lsn);
266  pq_sendint64(out, txn->end_lsn);
267  pq_sendint64(out, txn->xact_time.commit_time);
268  pq_sendint32(out, txn->xid);
269 
270  /* send gid */
271  pq_sendstring(out, txn->gid);
272 }

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_COMMIT_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_commit_prepared_txn().

◆ logicalrep_write_delete()

void logicalrep_write_delete ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot * oldslot,
bool  binary,
Bitmapset * columns 
)

Definition at line 533 of file proto.c.

536 {
537  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
538  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
539  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
540 
541  pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
542 
543  /* transaction ID (if not valid, we're not streaming) */
544  if (TransactionIdIsValid(xid))
545  pq_sendint32(out, xid);
546 
547  /* use Oid as relation identifier */
548  pq_sendint32(out, RelationGetRelid(rel));
549 
550  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
551  pq_sendbyte(out, 'O'); /* old tuple follows */
552  else
553  pq_sendbyte(out, 'K'); /* old key follows */
554 
555  logicalrep_write_tuple(out, rel, oldslot, binary, columns);
556 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns)
Definition: proto.c:769
#define RelationGetRelid(relation)
Definition: rel.h:505
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert, LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

◆ logicalrep_write_insert()

void logicalrep_write_insert ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot * newslot,
bool  binary,
Bitmapset * columns 
)

Definition at line 414 of file proto.c.

416 {
417  pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
418 
419  /* transaction ID (if not valid, we're not streaming) */
420  if (TransactionIdIsValid(xid))
421  pq_sendint32(out, xid);
422 
423  /* use Oid as relation identifier */
424  pq_sendint32(out, RelationGetRelid(rel));
425 
426  pq_sendbyte(out, 'N'); /* new tuple follows */
427  logicalrep_write_tuple(out, rel, newslot, binary, columns);
428 }

References LOGICAL_REP_MSG_INSERT, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

◆ logicalrep_write_message()

void logicalrep_write_message ( StringInfo  out,
TransactionId  xid,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)

Definition at line 643 of file proto.c.

646 {
647  uint8 flags = 0;
648 
649  pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
650 
651  /* encode and send message flags */
652  if (transactional)
653  flags |= MESSAGE_TRANSACTIONAL;
654 
655  /* transaction ID (if not valid, we're not streaming) */
656  if (TransactionIdIsValid(xid))
657  pq_sendint32(out, xid);
658 
659  pq_sendint8(out, flags);
660  pq_sendint64(out, lsn);
661  pq_sendstring(out, prefix);
662  pq_sendint32(out, sz);
663  pq_sendbytes(out, message, sz);
664 }
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:128
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28

References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.

Referenced by pgoutput_message().

◆ logicalrep_write_namespace()

static void logicalrep_write_namespace ( StringInfo  out,
Oid  nspid 
)
static

Definition at line 1035 of file proto.c.

1036 {
1037  if (nspid == PG_CATALOG_NAMESPACE)
1038  pq_sendbyte(out, '\0');
1039  else
1040  {
1041  char *nspname = get_namespace_name(nspid);
1042 
1043  if (nspname == NULL)
1044  elog(ERROR, "cache lookup failed for namespace %u",
1045  nspid);
1046 
1047  pq_sendstring(out, nspname);
1048  }
1049 }
int nspid
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366

References elog, ERROR, get_namespace_name(), nspid, pq_sendbyte(), and pq_sendstring().

Referenced by logicalrep_write_rel(), and logicalrep_write_typ().

◆ logicalrep_write_origin()

void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 385 of file proto.c.

387 {
388  pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
389 
390  /* fixed fields */
391  pq_sendint64(out, origin_lsn);
392 
393  /* origin string */
394  pq_sendstring(out, origin);
395 }

References LOGICAL_REP_MSG_ORIGIN, pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by send_repl_origin().

◆ logicalrep_write_prepare()

void logicalrep_write_prepare ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 198 of file proto.c.

200 {
201  logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
202  txn, prepare_lsn);
203 }
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:166

References LOGICAL_REP_MSG_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_prepare_txn().

◆ logicalrep_write_prepare_common()

static void logicalrep_write_prepare_common ( StringInfo  out,
LogicalRepMsgType  type,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 166 of file proto.c.

168 {
169  uint8 flags = 0;
170 
171  pq_sendbyte(out, type);
172 
173  /*
174  * This should only ever happen for two-phase commit transactions, in
175  * which case we expect to have a valid GID.
176  */
177  Assert(txn->gid != NULL);
178  Assert(rbtxn_prepared(txn));
179  Assert(TransactionIdIsValid(txn->xid));
180 
181  /* send the flags field */
182  pq_sendbyte(out, flags);
183 
184  /* send fields */
185  pq_sendint64(out, prepare_lsn);
186  pq_sendint64(out, txn->end_lsn);
187  pq_sendint64(out, txn->xact_time.prepare_time);
188  pq_sendint32(out, txn->xid);
189 
190  /* send gid */
191  pq_sendstring(out, txn->gid);
192 }
#define rbtxn_prepared(txn)
const char * type

References Assert, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, rbtxn_prepared, TransactionIdIsValid, type, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by logicalrep_write_prepare(), and logicalrep_write_stream_prepare().

◆ logicalrep_write_rel()

void logicalrep_write_rel ( StringInfo  out,
TransactionId  xid,
Relation  rel,
Bitmapset * columns 
)

Definition at line 670 of file proto.c.

672 {
673  char *relname;
674 
675  pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
676 
677  /* transaction ID (if not valid, we're not streaming) */
678  if (TransactionIdIsValid(xid))
679  pq_sendint32(out, xid);
680 
681  /* use Oid as relation identifier */
682  pq_sendint32(out, RelationGetRelid(rel));
683 
684  /* send qualified relation name */
685  logicalrep_write_namespace(out, RelationGetNamespace(rel));
686  relname = RelationGetRelationName(rel);
687  pq_sendstring(out, relname);
688 
689  /* send replica identity */
690  pq_sendbyte(out, rel->rd_rel->relreplident);
691 
692  /* send the attribute info */
693  logicalrep_write_attrs(out, rel, columns);
694 }
NameData relname
Definition: pg_class.h:38
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1035
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
Definition: proto.c:926
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetNamespace(relation)
Definition: rel.h:546

References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

◆ logicalrep_write_rollback_prepared()

void logicalrep_write_rollback_prepared ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)

Definition at line 304 of file proto.c.

307 {
308  uint8 flags = 0;
309 
310  pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
311 
312  /*
313  * This should only ever happen for two-phase commit transactions, in
314  * which case we expect to have a valid GID.
315  */
316  Assert(txn->gid != NULL);
317 
318  /* send the flags field */
319  pq_sendbyte(out, flags);
320 
321  /* send fields */
322  pq_sendint64(out, prepare_end_lsn);
323  pq_sendint64(out, txn->end_lsn);
324  pq_sendint64(out, prepare_time);
325  pq_sendint64(out, txn->xact_time.commit_time);
326  pq_sendint32(out, txn->xid);
327 
328  /* send gid */
329  pq_sendstring(out, txn->gid);
330 }

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_ROLLBACK_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_rollback_prepared_txn().

◆ logicalrep_write_stream_abort()

void logicalrep_write_stream_abort ( StringInfo  out,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  abort_lsn,
TimestampTz  abort_time,
bool  write_abort_info 
)

Definition at line 1166 of file proto.c.

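1169 {
1170  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1171 
1172  Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
1173 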
1174  /* transaction ID */
1175  pq_sendint32(out, xid);
1176  pq_sendint32(out, subxid);
1177 
1178  if (write_abort_info)
1179  {
1180  pq_sendint64(out, abort_lsn);
1181  pq_sendint64(out, abort_time);
1182  }
1183 }

References Assert, LOGICAL_REP_MSG_STREAM_ABORT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), and TransactionIdIsValid.

Referenced by pgoutput_stream_abort().

◆ logicalrep_write_stream_commit()

void logicalrep_write_stream_commit ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)

Definition at line 1112 of file proto.c.

1114 {
1115  uint8 flags = 0;
1116 
1117  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1118 
1119  Assert(TransactionIdIsValid(txn->xid));
1120 
1121  /* transaction ID */
1122  pq_sendint32(out, txn->xid);
1123 
1124  /* send the flags field (unused for now) */
1125  pq_sendbyte(out, flags);
1126 
1127  /* send fields */
1128  pq_sendint64(out, commit_lsn);
1129  pq_sendint64(out, txn->end_lsn);
1130  pq_sendint64(out, txn->xact_time.commit_time);
1131 }

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_STREAM_COMMIT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), TransactionIdIsValid, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_stream_commit().

◆ logicalrep_write_stream_prepare()

void logicalrep_write_stream_prepare ( StringInfo  out,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 364 of file proto.c.

367 {
368  logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
369  txn, prepare_lsn);
370 }

References LOGICAL_REP_MSG_STREAM_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_stream_prepare_txn().

◆ logicalrep_write_stream_start()

void logicalrep_write_stream_start ( StringInfo  out,
TransactionId  xid,
bool  first_segment 
)

Definition at line 1069 of file proto.c.

1071 {
1072  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1073 
1074  Assert(TransactionIdIsValid(xid));
1075 
1076  /* transaction ID (we're starting to stream, so must be valid) */
1077  pq_sendint32(out, xid);
1078 
1079  /* 1 if this is the first streaming segment for this xid */
1080  pq_sendbyte(out, first_segment ? 1 : 0);
1081 }

References Assert, LOGICAL_REP_MSG_STREAM_START, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_start().

◆ logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo  out)

Definition at line 1103 of file proto.c.

1104 {
1105  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
1106 }

References LOGICAL_REP_MSG_STREAM_STOP, and pq_sendbyte().

Referenced by pgoutput_stream_stop().

◆ logicalrep_write_truncate()

void logicalrep_write_truncate ( StringInfo  out,
TransactionId  xid,
int  nrelids,
Oid  relids[],
bool  cascade,
bool  restart_seqs 
)

Definition at line 586 of file proto.c.

591 {
592  int i;
593  uint8 flags = 0;
594 
595  pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
596 
597  /* transaction ID (if not valid, we're not streaming) */
598  if (TransactionIdIsValid(xid))
599  pq_sendint32(out, xid);
600 
601  pq_sendint32(out, nrelids);
602 
603  /* encode and send truncate flags */
604  if (cascade)
605  flags |= TRUNCATE_CASCADE;
606  if (restart_seqs)
607  flags |= TRUNCATE_RESTART_SEQS;
608  pq_sendint8(out, flags);
609 
610  for (i = 0; i < nrelids; i++)
611  pq_sendint32(out, relids[i]);
612 }

References i, LOGICAL_REP_MSG_TRUNCATE, pq_sendbyte(), pq_sendint32(), pq_sendint8(), TransactionIdIsValid, TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by pgoutput_truncate().

◆ logicalrep_write_tuple()

static void logicalrep_write_tuple ( StringInfo  out,
Relation  rel,
TupleTableSlot * slot,
bool  binary,
Bitmapset * columns 
)
static

Definition at line 769 of file proto.c.

771 {
772  TupleDesc desc;
773  Datum *values;
774  bool *isnull;
775  int i;
776  uint16 nliveatts = 0;
777 
778  desc = RelationGetDescr(rel);
779 
780  for (i = 0; i < desc->natts; i++)
781  {
782  Form_pg_attribute att = TupleDescAttr(desc, i);
783 
784  if (att->attisdropped || att->attgenerated)
785  continue;
786 
787  if (!column_in_column_list(att->attnum, columns))
788  continue;
789 
790  nliveatts++;
791  }
792  pq_sendint16(out, nliveatts);
793 
794  slot_getallattrs(slot);
795  values = slot->tts_values;
796  isnull = slot->tts_isnull;
797 
798  /* Write the values */
799  for (i = 0; i < desc->natts; i++)
800  {
801  HeapTuple typtup;
802  Form_pg_type typclass;
803  Form_pg_attribute att = TupleDescAttr(desc, i);
804 
805  if (att->attisdropped || att->attgenerated)
806  continue;
807 
808  if (!column_in_column_list(att->attnum, columns))
809  continue;
810 
811  if (isnull[i])
812  {
813  pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
814  continue;
815  }
816 
817  if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
818  {
819  /*
820  * Unchanged toasted datum. (Note that we don't promise to detect
821  * unchanged data in general; this is just a cheap check to avoid
822  * sending large values unnecessarily.)
823  */
824  pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
825  continue;
826  }
827 
828  typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
829  if (!HeapTupleIsValid(typtup))
830  elog(ERROR, "cache lookup failed for type %u", att->atttypid);
831  typclass = (Form_pg_type) GETSTRUCT(typtup);
832 
833  /*
834  * Send in binary if requested and type has suitable send function.
835  */
836  if (binary && OidIsValid(typclass->typsend))
837  {
838  bytea *outputbytes;
839  int len;
840 
841  pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
842  outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
843  len = VARSIZE(outputbytes) - VARHDRSZ;
844  pq_sendint(out, len, 4); /* length */
845  pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
846  pfree(outputbytes);
847  }
848  else
849  {
850  char *outputstr;
851 
852  pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
853  outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
854  pq_sendcountedtext(out, outputstr, strlen(outputstr));
855  pfree(outputstr);
856  }
857 
858  ReleaseSysCache(typtup);
859  }
860 }
static Datum values[MAXATTR]
Definition: bootstrap.c:152
#define VARHDRSZ
Definition: c.h:692
#define OidIsValid(objectId)
Definition: c.h:775
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1763
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1782
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
void pfree(void *pointer)
Definition: mcxt.c:1520
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
void pq_sendcountedtext(StringInfo buf, const char *str, int slen)
Definition: pqformat.c:142
static void pq_sendint(StringInfo buf, uint32 i, int b)
Definition: pqformat.h:171
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
Definition: c.h:687
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:266
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:218
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290
#define VARDATA(PTR)
Definition: varatt.h:278
#define VARSIZE(PTR)
Definition: varatt.h:279

References column_in_column_list(), elog, ERROR, GETSTRUCT, HeapTupleIsValid, i, len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_NULL, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, TupleDescData::natts, ObjectIdGetDatum(), OidIsValid, OidOutputFunctionCall(), OidSendFunctionCall(), pfree(), pq_sendbyte(), pq_sendbytes(), pq_sendcountedtext(), pq_sendint(), pq_sendint16(), RelationGetDescr, ReleaseSysCache(), SearchSysCache1(), slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, TupleDescAttr, values, VARATT_IS_EXTERNAL_ONDISK, VARDATA, VARHDRSZ, and VARSIZE.

Referenced by logicalrep_write_delete(), logicalrep_write_insert(), and logicalrep_write_update().

◆ logicalrep_write_typ()

void logicalrep_write_typ ( StringInfo  out,
TransactionId  xid,
Oid  typoid 
)

Definition at line 725 of file proto.c.

726 {
727  Oid basetypoid = getBaseType(typoid);
728  HeapTuple tup;
729  Form_pg_type typtup;
730 
731  pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
732 
733  /* transaction ID (if not valid, we're not streaming) */
734  if (TransactionIdIsValid(xid))
735  pq_sendint32(out, xid);
736 
737  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
738  if (!HeapTupleIsValid(tup))
739  elog(ERROR, "cache lookup failed for type %u", basetypoid);
740  typtup = (Form_pg_type) GETSTRUCT(tup);
741 
742  /* use Oid as relation identifier */
743  pq_sendint32(out, typoid);
744 
745  /* send qualified type name */
746  logicalrep_write_namespace(out, typtup->typnamespace);
747  pq_sendstring(out, NameStr(typtup->typname));
748 
749  ReleaseSysCache(tup);
750 }
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2521

References elog, ERROR, getBaseType(), GETSTRUCT, HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

◆ logicalrep_write_update()

void logicalrep_write_update ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot * oldslot,
TupleTableSlot * newslot,
bool  binary,
Bitmapset * columns 
)

Definition at line 458 of file proto.c.

461 {
462  pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
463 
464  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
465  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
466  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
467 
468  /* transaction ID (if not valid, we're not streaming) */
469  if (TransactionIdIsValid(xid))
470  pq_sendint32(out, xid);
471 
472  /* use Oid as relation identifier */
473  pq_sendint32(out, RelationGetRelid(rel));
474 
475  if (oldslot != NULL)
476  {
477  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
478  pq_sendbyte(out, 'O'); /* old tuple follows */
479  else
480  pq_sendbyte(out, 'K'); /* old key follows */
481  logicalrep_write_tuple(out, rel, oldslot, binary, columns);
482  }
483 
484  pq_sendbyte(out, 'N'); /* new tuple follows */
485  logicalrep_write_tuple(out, rel, newslot, binary, columns);
486 }

References Assert, LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().