26#define LOGICALREP_IS_REPLICA_IDENTITY 1
28#define MESSAGE_TRANSACTIONAL (1<<0)
29#define TRUNCATE_CASCADE (1<<0)
30#define TRUNCATE_RESTART_SEQS (1<<1)
34 PublishGencolsType include_gencols_type);
38 PublishGencolsType include_gencols_type);
68 elog(
ERROR,
"final_lsn not set in begin message");
104 elog(
ERROR,
"unrecognized flags %u in commit message", flags);
139 elog(
ERROR,
"prepare_lsn not set in begin prepare message");
142 elog(
ERROR,
"end_lsn not set in begin prepare message");
206 elog(
ERROR,
"unrecognized flags %u in %s message", flags, msgtype);
211 elog(
ERROR,
"prepare_lsn is not set in %s message", msgtype);
214 elog(
ERROR,
"end_lsn is not set in %s message", msgtype);
218 elog(
ERROR,
"invalid two-phase transaction ID in %s message", msgtype);
273 elog(
ERROR,
"unrecognized flags %u in commit prepared message", flags);
278 elog(
ERROR,
"commit_lsn is not set in commit prepared message");
281 elog(
ERROR,
"end_lsn is not set in commit prepared message");
332 elog(
ERROR,
"unrecognized flags %u in rollback prepared message", flags);
337 elog(
ERROR,
"prepare_end_lsn is not set in rollback prepared message");
340 elog(
ERROR,
"rollback_end_lsn is not set in rollback prepared message");
406 PublishGencolsType include_gencols_type)
419 include_gencols_type);
438 elog(
ERROR,
"expected new tuple but got %d",
453 PublishGencolsType include_gencols_type)
457 Assert(rel->
rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
458 rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459 rel->
rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
470 if (rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL)
475 include_gencols_type);
480 include_gencols_type);
500 elog(
ERROR,
"expected action 'N', 'O' or 'K', got %c",
507 *has_oldtuple =
true;
512 *has_oldtuple =
false;
516 elog(
ERROR,
"expected action 'N', got %c",
531 PublishGencolsType include_gencols_type)
533 Assert(rel->
rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
534 rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535 rel->
rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
546 if (rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL)
552 include_gencols_type);
587 bool cascade,
bool restart_seqs)
607 for (
i = 0;
i < nrelids;
i++)
616 bool *cascade,
bool *restart_seqs)
630 for (
i = 0;
i < nrelids;
i++)
641 bool transactional,
const char *prefix,
Size sz,
669 PublishGencolsType include_gencols_type)
737 elog(
ERROR,
"cache lookup failed for type %u", basetypoid);
769 PublishGencolsType include_gencols_type)
784 include_gencols_type))
803 include_gencols_type))
825 elog(
ERROR,
"cache lookup failed for type %u", att->atttypid);
872 tuple->
ncols = natts;
875 for (
i = 0;
i < natts;
i++)
912 elog(
ERROR,
"unrecognized data representation type '%c'", kind);
922 PublishGencolsType include_gencols_type)
938 include_gencols_type))
946 replidentfull = (rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL);
957 include_gencols_type))
994 attnames =
palloc(natts *
sizeof(
char *));
998 for (
i = 0;
i < natts;
i++)
1029 if (
nspid == PG_CATALOG_NAMESPACE)
1035 if (nspname == NULL)
1036 elog(
ERROR,
"cache lookup failed for namespace %u",
1051 if (nspname[0] ==
'\0')
1052 nspname =
"pg_catalog";
1140 elog(
ERROR,
"unrecognized flags %u in commit message", flags);
1170 if (write_abort_info)
1186 bool read_abort_info)
1193 if (read_abort_info)
1211 static char err_unknown[20];
1236 return "BEGIN PREPARE";
1240 return "COMMIT PREPARED";
1242 return "ROLLBACK PREPARED";
1244 return "STREAM START";
1246 return "STREAM STOP";
1248 return "STREAM COMMIT";
1250 return "STREAM ABORT";
1252 return "STREAM PREPARE";
1280 PublishGencolsType include_gencols_type)
1282 if (att->attisdropped)
1290 if (!att->attgenerated)
1297 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1298 return include_gencols_type == PUBLISH_GENCOLS_STORED;
void bms_free(Bitmapset *a)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define OidIsValid(objectId)
char * OidOutputFunctionCall(Oid functionId, Datum val)
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Assert(PointerIsAligned(start, uint64))
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
List * lappend_oid(List *list, Oid datum)
#define LOGICALREP_COLUMN_UNCHANGED
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_STREAM_COMMIT
#define LOGICALREP_COLUMN_NULL
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
Oid getBaseType(Oid typid)
char * get_namespace_name(Oid nspid)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
FormData_pg_attribute * Form_pg_attribute
FormData_pg_type * Form_pg_type
size_t strlcpy(char *dst, const char *src, size_t siz)
static Datum ObjectIdGetDatum(Oid X)
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
#define TRUNCATE_RESTART_SEQS
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
#define MESSAGE_TRANSACTIONAL
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
#define LOGICALREP_IS_REPLICA_IDENTITY
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
const char * logicalrep_message_type(LogicalRepMsgType action)
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
static const char * logicalrep_read_namespace(StringInfo in)
void logicalrep_write_stream_stop(StringInfo out)
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
#define rbtxn_is_prepared(txn)
static void initStringInfoFromString(StringInfo str, char *data, int len)
XLogRecPtr prepare_end_lsn
XLogRecPtr rollback_end_lsn
TimestampTz rollback_time
StringInfoData * colvalues
union ReorderBufferTXN::@116 xact_time
#define FirstLowInvalidHeapAttributeNumber
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
static void slot_getallattrs(TupleTableSlot *slot)
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
#define InvalidXLogRecPtr