PostgreSQL Source Code  git master
logicalproto.h File Reference
Include dependency graph for logicalproto.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepTupleData
 
struct  LogicalRepRelation
 
struct  LogicalRepTyp
 
struct  LogicalRepBeginData
 
struct  LogicalRepCommitData
 

Macros

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1
 
#define LOGICALREP_PROTO_VERSION_NUM   1
 
#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2
 
#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_STREAM_VERSION_NUM
 
#define LOGICALREP_COLUMN_NULL   'n'
 
#define LOGICALREP_COLUMN_UNCHANGED   'u'
 
#define LOGICALREP_COLUMN_TEXT   't'
 
#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */
 

Typedefs

typedef struct LogicalRepTupleData LogicalRepTupleData
 
typedef uint32 LogicalRepRelId
 
typedef struct LogicalRepRelation LogicalRepRelation
 
typedef struct LogicalRepTyp LogicalRepTyp
 
typedef struct LogicalRepBeginData LogicalRepBeginData
 
typedef struct LogicalRepCommitData LogicalRepCommitData
 

Functions

void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
 
List * logicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
 
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel)
 
LogicalRepRelation * logicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
 
void logicalrep_read_typ (StringInfo out, LogicalRepTyp *ltyp)
 
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
 
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
 
void logicalrep_write_stream_stop (StringInfo out)
 
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
TransactionId logicalrep_read_stream_commit (StringInfo out, LogicalRepCommitData *commit_data)
 
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid)
 
void logicalrep_read_stream_abort (StringInfo in, TransactionId *xid, TransactionId *subxid)
 

Macro Definition Documentation

◆ LOGICALREP_COLUMN_BINARY

#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */

◆ LOGICALREP_COLUMN_NULL

#define LOGICALREP_COLUMN_NULL   'n'

Definition at line 52 of file logicalproto.h.

Referenced by logicalrep_read_tuple(), and logicalrep_write_tuple().

◆ LOGICALREP_COLUMN_TEXT

#define LOGICALREP_COLUMN_TEXT   't'

◆ LOGICALREP_COLUMN_UNCHANGED

#define LOGICALREP_COLUMN_UNCHANGED   'u'

◆ LOGICALREP_PROTO_MAX_VERSION_NUM

#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_STREAM_VERSION_NUM

Definition at line 34 of file logicalproto.h.

Referenced by pgoutput_startup().

◆ LOGICALREP_PROTO_MIN_VERSION_NUM

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1

Definition at line 31 of file logicalproto.h.

Referenced by pgoutput_startup().

◆ LOGICALREP_PROTO_STREAM_VERSION_NUM

#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2

Definition at line 33 of file logicalproto.h.

Referenced by ApplyWorkerMain(), and pgoutput_startup().

◆ LOGICALREP_PROTO_VERSION_NUM

#define LOGICALREP_PROTO_VERSION_NUM   1

Definition at line 32 of file logicalproto.h.

Referenced by ApplyWorkerMain().

Typedef Documentation

◆ LogicalRepBeginData

typedef struct LogicalRepBeginData LogicalRepBeginData

◆ LogicalRepCommitData

typedef struct LogicalRepCommitData LogicalRepCommitData

◆ LogicalRepRelation

typedef struct LogicalRepRelation LogicalRepRelation

◆ LogicalRepRelId

typedef uint32 LogicalRepRelId

Definition at line 57 of file logicalproto.h.

◆ LogicalRepTupleData

typedef struct LogicalRepTupleData LogicalRepTupleData

◆ LogicalRepTyp

typedef struct LogicalRepTyp LogicalRepTyp

Function Documentation

◆ logicalrep_read_begin()

void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData *  begin_data 
)

Definition at line 59 of file proto.c.

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), and LogicalRepBeginData::xid.

Referenced by apply_handle_begin().

60 {
61  /* read fields */
62  begin_data->final_lsn = pq_getmsgint64(in);
63  if (begin_data->final_lsn == InvalidXLogRecPtr)
64  elog(ERROR, "final_lsn not set in begin message");
65  begin_data->committime = pq_getmsgint64(in);
66  begin_data->xid = pq_getmsgint(in, 4);
67 }
TransactionId xid
Definition: logicalproto.h:87
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define ERROR
Definition: elog.h:43
XLogRecPtr final_lsn
Definition: logicalproto.h:85
#define elog(elevel,...)
Definition: elog.h:214
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz committime
Definition: logicalproto.h:86

◆ logicalrep_read_commit()

void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData *  commit_data 
)

Definition at line 94 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

95 {
96  /* read flags (unused for now) */
97  uint8 flags = pq_getmsgbyte(in);
98 
99  if (flags != 0)
100  elog(ERROR, "unrecognized flags %u in commit message", flags);
101 
102  /* read fields */
103  commit_data->commit_lsn = pq_getmsgint64(in);
104  commit_data->end_lsn = pq_getmsgint64(in);
105  commit_data->committime = pq_getmsgint64(in);
106 }
unsigned char uint8
Definition: c.h:373
#define ERROR
Definition: elog.h:43
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:214
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
XLogRecPtr commit_lsn
Definition: logicalproto.h:92
TimestampTz committime
Definition: logicalproto.h:94

◆ logicalrep_read_delete()

LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData *  oldtup 
)

Definition at line 289 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

290 {
291  char action;
292  LogicalRepRelId relid;
293 
294  /* read the relation id */
295  relid = pq_getmsgint(in, 4);
296 
297  /* read and verify action */
298  action = pq_getmsgbyte(in);
299  if (action != 'K' && action != 'O')
300  elog(ERROR, "expected action 'O' or 'K', got %c", action);
301 
302  logicalrep_read_tuple(in, oldtup);
303 
304  return relid;
305 }
#define ERROR
Definition: elog.h:43
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:554
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:214
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:57

◆ logicalrep_read_insert()

LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData *  newtup 
)

Definition at line 163 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

164 {
165  char action;
166  LogicalRepRelId relid;
167 
168  /* read the relation id */
169  relid = pq_getmsgint(in, 4);
170 
171  action = pq_getmsgbyte(in);
172  if (action != 'N')
173  elog(ERROR, "expected new tuple but got %d",
174  action);
175 
176  logicalrep_read_tuple(in, newtup);
177 
178  return relid;
179 }
#define ERROR
Definition: elog.h:43
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:554
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:214
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:57

◆ logicalrep_read_origin()

char* logicalrep_read_origin ( StringInfo  in,
XLogRecPtr *  origin_lsn 
)

Definition at line 128 of file proto.c.

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

129 {
130  /* fixed fields */
131  *origin_lsn = pq_getmsgint64(in);
132 
133  /* return origin */
134  return pstrdup(pq_getmsgstring(in));
135 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1187
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455

◆ logicalrep_read_rel()

LogicalRepRelation* logicalrep_read_rel ( StringInfo  in)

Definition at line 397 of file proto.c.

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

398 {
399  LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
400 
401  rel->remoteid = pq_getmsgint(in, 4);
402 
403  /* Read relation name from stream */
404  rel->nspname = pstrdup(logicalrep_read_namespace(in));
405  rel->relname = pstrdup(pq_getmsgstring(in));
406 
407  /* Read the replica identity. */
408  rel->replident = pq_getmsgbyte(in);
409 
410  /* Get attribute description */
411  logicalrep_read_attrs(in, rel);
412 
413  return rel;
414 }
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1187
LogicalRepRelId remoteid
Definition: logicalproto.h:63
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:679
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:741
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void * palloc(Size size)
Definition: mcxt.c:950
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_abort()

void logicalrep_read_stream_abort ( StringInfo  in,
TransactionId *  xid,
TransactionId *  subxid 
)

Definition at line 865 of file proto.c.

References Assert, and pq_getmsgint().

Referenced by apply_handle_stream_abort().

867 {
868  Assert(xid && subxid);
869 
870  *xid = pq_getmsgint(in, 4);
871  *subxid = pq_getmsgint(in, 4);
872 }
#define Assert(condition)
Definition: c.h:746
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_stream_commit()

TransactionId logicalrep_read_stream_commit ( StringInfo  out,
LogicalRepCommitData *  commit_data 
)

Definition at line 823 of file proto.c.

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), pq_getmsgint(), and pq_getmsgint64().

Referenced by apply_handle_stream_commit().

824 {
825  TransactionId xid;
826  uint8 flags;
827 
828  xid = pq_getmsgint(in, 4);
829 
830  /* read flags (unused for now) */
831  flags = pq_getmsgbyte(in);
832 
833  if (flags != 0)
834  elog(ERROR, "unrecognized flags %u in commit message", flags);
835 
836  /* read fields */
837  commit_data->commit_lsn = pq_getmsgint64(in);
838  commit_data->end_lsn = pq_getmsgint64(in);
839  commit_data->committime = pq_getmsgint64(in);
840 
841  return xid;
842 }
uint32 TransactionId
Definition: c.h:521
unsigned char uint8
Definition: c.h:373
#define ERROR
Definition: elog.h:43
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:214
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
XLogRecPtr commit_lsn
Definition: logicalproto.h:92
TimestampTz committime
Definition: logicalproto.h:94

◆ logicalrep_read_stream_start()

TransactionId logicalrep_read_stream_start ( StringInfo  in,
bool *  first_segment 
)

Definition at line 773 of file proto.c.

References Assert, pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_stream_start().

774 {
775  TransactionId xid;
776 
777  Assert(first_segment);
778 
779  xid = pq_getmsgint(in, 4);
780  *first_segment = (pq_getmsgbyte(in) == 1);
781 
782  return xid;
783 }
uint32 TransactionId
Definition: c.h:521
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define Assert(condition)
Definition: c.h:746
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

◆ logicalrep_read_truncate()

List* logicalrep_read_truncate ( StringInfo  in,
bool *  cascade,
bool *  restart_seqs 
)

Definition at line 343 of file proto.c.

References i, lappend_oid(), NIL, pq_getmsgint(), TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by apply_handle_truncate().

345 {
346  int i;
347  int nrelids;
348  List *relids = NIL;
349  uint8 flags;
350 
351  nrelids = pq_getmsgint(in, 4);
352 
353  /* read and decode truncate flags */
354  flags = pq_getmsgint(in, 1);
355  *cascade = (flags & TRUNCATE_CASCADE) > 0;
356  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
357 
358  for (i = 0; i < nrelids; i++)
359  relids = lappend_oid(relids, pq_getmsgint(in, 4));
360 
361  return relids;
362 }
#define NIL
Definition: pg_list.h:65
unsigned char uint8
Definition: c.h:373
List * lappend_oid(List *list, Oid datum)
Definition: list.c:357
int i
#define TRUNCATE_CASCADE
Definition: proto.c:28
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
List
Definition: pg_list.h:50
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:29

◆ logicalrep_read_typ()

void logicalrep_read_typ ( StringInfo  out,
LogicalRepTyp *  ltyp 
)

Definition at line 453 of file proto.c.

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

454 {
455  ltyp->remoteid = pq_getmsgint(in, 4);
456 
457  /* Read type name from stream */
458  ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
459  ltyp->typname = pstrdup(pq_getmsgstring(in));
460 }
char * typname
Definition: logicalproto.h:79
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * pstrdup(const char *in)
Definition: mcxt.c:1187
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:741
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
char * nspname
Definition: logicalproto.h:78

◆ logicalrep_read_update()

LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool *  has_oldtuple,
LogicalRepTupleData *  oldtup,
LogicalRepTupleData *  newtup 
)

Definition at line 218 of file proto.c.

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

221 {
222  char action;
223  LogicalRepRelId relid;
224 
225  /* read the relation id */
226  relid = pq_getmsgint(in, 4);
227 
228  /* read and verify action */
229  action = pq_getmsgbyte(in);
230  if (action != 'K' && action != 'O' && action != 'N')
231  elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
232  action);
233 
234  /* check for old tuple */
235  if (action == 'K' || action == 'O')
236  {
237  logicalrep_read_tuple(in, oldtup);
238  *has_oldtuple = true;
239 
240  action = pq_getmsgbyte(in);
241  }
242  else
243  *has_oldtuple = false;
244 
245  /* check for new tuple */
246  if (action != 'N')
247  elog(ERROR, "expected action 'N', got %c",
248  action);
249 
250  logicalrep_read_tuple(in, newtup);
251 
252  return relid;
253 }
#define ERROR
Definition: elog.h:43
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:554
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
#define elog(elevel,...)
Definition: elog.h:214
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
uint32 LogicalRepRelId
Definition: logicalproto.h:57

◆ logicalrep_write_begin()

void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN *  txn 
)

Definition at line 45 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, pq_sendbyte(), pq_sendint32(), pq_sendint64(), and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_txn().

46 {
47  pq_sendbyte(out, 'B'); /* BEGIN */
48 
49  /* fixed fields */
50  pq_sendint64(out, txn->final_lsn);
51  pq_sendint64(out, txn->commit_time);
52  pq_sendint32(out, txn->xid);
53 }
TimestampTz commit_time
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
XLogRecPtr final_lsn
TransactionId xid

◆ logicalrep_write_commit()

void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)

Definition at line 74 of file proto.c.

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, pq_sendbyte(), and pq_sendint64().

Referenced by pgoutput_commit_txn().

76 {
77  uint8 flags = 0;
78 
79  pq_sendbyte(out, 'C'); /* sending COMMIT */
80 
81  /* send the flags field (unused for now) */
82  pq_sendbyte(out, flags);
83 
84  /* send fields */
85  pq_sendint64(out, commit_lsn);
86  pq_sendint64(out, txn->end_lsn);
87  pq_sendint64(out, txn->commit_time);
88 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:373
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
XLogRecPtr end_lsn

◆ logicalrep_write_delete()

void logicalrep_write_delete ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
bool  binary 
)

Definition at line 259 of file proto.c.

References Assert, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

261 {
262  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
263  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
264  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
265 
266  pq_sendbyte(out, 'D'); /* action DELETE */
267 
268  /* transaction ID (if not valid, we're not streaming) */
269  if (TransactionIdIsValid(xid))
270  pq_sendint32(out, xid);
271 
272  /* use Oid as relation identifier */
273  pq_sendint32(out, RelationGetRelid(rel));
274 
275  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
276  pq_sendbyte(out, 'O'); /* old tuple follows */
277  else
278  pq_sendbyte(out, 'K'); /* old key follows */
279 
280  logicalrep_write_tuple(out, rel, oldtuple, binary);
281 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:466
Form_pg_class rd_rel
Definition: rel.h:109
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:746
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:456

◆ logicalrep_write_insert()

void logicalrep_write_insert ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 141 of file proto.c.

References logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

143 {
144  pq_sendbyte(out, 'I'); /* action INSERT */
145 
146  /* transaction ID (if not valid, we're not streaming) */
147  if (TransactionIdIsValid(xid))
148  pq_sendint32(out, xid);
149 
150  /* use Oid as relation identifier */
151  pq_sendint32(out, RelationGetRelid(rel));
152 
153  pq_sendbyte(out, 'N'); /* new tuple follows */
154  logicalrep_write_tuple(out, rel, newtuple, binary);
155 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:466
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:456

◆ logicalrep_write_origin()

void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 112 of file proto.c.

References pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by pgoutput_begin_txn(), and pgoutput_stream_start().

114 {
115  pq_sendbyte(out, 'O'); /* ORIGIN */
116 
117  /* fixed fields */
118  pq_sendint64(out, origin_lsn);
119 
120  /* origin string */
121  pq_sendstring(out, origin);
122 }
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_rel()

void logicalrep_write_rel ( StringInfo  out,
TransactionId  xid,
Relation  rel 
)

Definition at line 368 of file proto.c.

References logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

369 {
370  char *relname;
371 
372  pq_sendbyte(out, 'R'); /* sending RELATION */
373 
374  /* transaction ID (if not valid, we're not streaming) */
375  if (TransactionIdIsValid(xid))
376  pq_sendint32(out, xid);
377 
378  /* use Oid as relation identifier */
379  pq_sendint32(out, RelationGetRelid(rel));
380 
381  /* send qualified relation name */
382  logicalrep_write_namespace(out, RelationGetNamespace(rel));
383  relname = RelationGetRelationName(rel);
384  pq_sendstring(out, relname);
385 
386  /* send replica identity */
387  pq_sendbyte(out, rel->rd_rel->relreplident);
388 
389  /* send the attribute info */
390  logicalrep_write_attrs(out, rel);
391 }
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:721
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
Form_pg_class rd_rel
Definition: rel.h:109
NameData relname
Definition: pg_class.h:38
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void logicalrep_write_attrs(StringInfo out, Relation rel)
Definition: proto.c:620
#define RelationGetRelationName(relation)
Definition: rel.h:490
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:456
#define RelationGetNamespace(relation)
Definition: rel.h:497

◆ logicalrep_write_stream_abort()

void logicalrep_write_stream_abort ( StringInfo  out,
TransactionId  xid,
TransactionId  subxid 
)

Definition at line 849 of file proto.c.

References Assert, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_abort().

851 {
852  pq_sendbyte(out, 'A'); /* action STREAM ABORT */
853 
854  Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
855 
856  /* transaction ID */
857  pq_sendint32(out, xid);
858  pq_sendint32(out, subxid);
859 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:746
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_commit()

void logicalrep_write_stream_commit ( StringInfo  out,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)

Definition at line 798 of file proto.c.

References Assert, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, pq_sendbyte(), pq_sendint32(), pq_sendint64(), TransactionIdIsValid, and ReorderBufferTXN::xid.

Referenced by pgoutput_stream_commit().

800 {
801  uint8 flags = 0;
802 
803  pq_sendbyte(out, 'c'); /* action STREAM COMMIT */
804 
805  Assert(TransactionIdIsValid(txn->xid));
806 
807  /* transaction ID */
808  pq_sendint32(out, txn->xid);
809 
810  /* send the flags field (unused for now) */
811  pq_sendbyte(out, flags);
812 
813  /* send fields */
814  pq_sendint64(out, commit_lsn);
815  pq_sendint64(out, txn->end_lsn);
816  pq_sendint64(out, txn->commit_time);
817 }
TimestampTz commit_time
unsigned char uint8
Definition: c.h:373
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
TransactionId xid
#define Assert(condition)
Definition: c.h:746
XLogRecPtr end_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_start()

void logicalrep_write_stream_start ( StringInfo  out,
TransactionId  xid,
bool  first_segment 
)

Definition at line 755 of file proto.c.

References Assert, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_start().

757 {
758  pq_sendbyte(out, 'S'); /* action STREAM START */
759 
760  Assert(TransactionIdIsValid(xid));
761 
762  /* transaction ID (we're starting to stream, so must be valid) */
763  pq_sendint32(out, xid);
764 
765  /* 1 if this is the first streaming segment for this xid */
766  pq_sendbyte(out, first_segment ? 1 : 0);
767 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:746
#define TransactionIdIsValid(xid)
Definition: transam.h:41

◆ logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo  out)

Definition at line 789 of file proto.c.

References pq_sendbyte().

Referenced by pgoutput_stream_stop().

790 {
791  pq_sendbyte(out, 'E'); /* action STREAM END */
792 }
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161

◆ logicalrep_write_truncate()

void logicalrep_write_truncate ( StringInfo  out,
TransactionId  xid,
int  nrelids,
Oid  relids[],
bool  cascade,
bool  restart_seqs 
)

Definition at line 311 of file proto.c.

References i, pq_sendbyte(), pq_sendint32(), pq_sendint8(), TransactionIdIsValid, TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by pgoutput_truncate().

316 {
317  int i;
318  uint8 flags = 0;
319 
320  pq_sendbyte(out, 'T'); /* action TRUNCATE */
321 
322  /* transaction ID (if not valid, we're not streaming) */
323  if (TransactionIdIsValid(xid))
324  pq_sendint32(out, xid);
325 
326  pq_sendint32(out, nrelids);
327 
328  /* encode and send truncate flags */
329  if (cascade)
330  flags |= TRUNCATE_CASCADE;
331  if (restart_seqs)
332  flags |= TRUNCATE_RESTART_SEQS;
333  pq_sendint8(out, flags);
334 
335  for (i = 0; i < nrelids; i++)
336  pq_sendint32(out, relids[i]);
337 }
unsigned char uint8
Definition: c.h:373
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
int i
#define TRUNCATE_CASCADE
Definition: proto.c:28
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:129
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:29

◆ logicalrep_write_typ()

void logicalrep_write_typ ( StringInfo  out,
TransactionId  xid,
Oid  typoid 
)

Definition at line 422 of file proto.c.

References elog, ERROR, getBaseType(), GETSTRUCT, HeapTupleIsValid, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum, pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), TransactionIdIsValid, and TYPEOID.

Referenced by send_relation_and_attrs().

423 {
424  Oid basetypoid = getBaseType(typoid);
425  HeapTuple tup;
426  Form_pg_type typtup;
427 
428  pq_sendbyte(out, 'Y'); /* sending TYPE */
429 
430  /* transaction ID (if not valid, we're not streaming) */
431  if (TransactionIdIsValid(xid))
432  pq_sendint32(out, xid);
433 
434  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
435  if (!HeapTupleIsValid(tup))
436  elog(ERROR, "cache lookup failed for type %u", basetypoid);
437  typtup = (Form_pg_type) GETSTRUCT(tup);
438 
439  /* use Oid as relation identifier */
440  pq_sendint32(out, typoid);
441 
442  /* send qualified type name */
443  logicalrep_write_namespace(out, typtup->typnamespace);
444  pq_sendstring(out, NameStr(typtup->typname));
445 
446  ReleaseSysCache(tup);
447 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:721
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
unsigned int Oid
Definition: postgres_ext.h:31
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
FormData_pg_type * Form_pg_type
Definition: pg_type.h:255
#define elog(elevel,...)
Definition: elog.h:214
#define NameStr(name)
Definition: c.h:623
#define TransactionIdIsValid(xid)
Definition: transam.h:41
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2409

◆ logicalrep_write_update()

void logicalrep_write_update ( StringInfo  out,
TransactionId  xid,
Relation  rel,
HeapTuple  oldtuple,
HeapTuple  newtuple,
bool  binary 
)

Definition at line 185 of file proto.c.

References Assert, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

187 {
188  pq_sendbyte(out, 'U'); /* action UPDATE */
189 
190  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
191  rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
192  rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
193 
194  /* transaction ID (if not valid, we're not streaming) */
195  if (TransactionIdIsValid(xid))
196  pq_sendint32(out, xid);
197 
198  /* use Oid as relation identifier */
199  pq_sendint32(out, RelationGetRelid(rel));
200 
201  if (oldtuple != NULL)
202  {
203  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
204  pq_sendbyte(out, 'O'); /* old tuple follows */
205  else
206  pq_sendbyte(out, 'K'); /* old key follows */
207  logicalrep_write_tuple(out, rel, oldtuple, binary);
208  }
209 
210  pq_sendbyte(out, 'N'); /* new tuple follows */
211  logicalrep_write_tuple(out, rel, newtuple, binary);
212 }
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
Definition: proto.c:466
Form_pg_class rd_rel
Definition: rel.h:109
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
#define Assert(condition)
Definition: c.h:746
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define RelationGetRelid(relation)
Definition: rel.h:456