26#define LOGICALREP_IS_REPLICA_IDENTITY 1
28#define MESSAGE_TRANSACTIONAL (1<<0)
29#define TRUNCATE_CASCADE (1<<0)
30#define TRUNCATE_RESTART_SEQS (1<<1)
33 Bitmapset *columns,
bool include_gencols);
37 bool include_gencols);
67 elog(
ERROR,
"final_lsn not set in begin message");
103 elog(
ERROR,
"unrecognized flags %u in commit message", flags);
138 elog(
ERROR,
"prepare_lsn not set in begin prepare message");
141 elog(
ERROR,
"end_lsn not set in begin prepare message");
205 elog(
ERROR,
"unrecognized flags %u in %s message", flags, msgtype);
210 elog(
ERROR,
"prepare_lsn is not set in %s message", msgtype);
213 elog(
ERROR,
"end_lsn is not set in %s message", msgtype);
217 elog(
ERROR,
"invalid two-phase transaction ID in %s message", msgtype);
272 elog(
ERROR,
"unrecognized flags %u in commit prepared message", flags);
277 elog(
ERROR,
"commit_lsn is not set in commit prepared message");
280 elog(
ERROR,
"end_lsn is not set in commit prepared message");
331 elog(
ERROR,
"unrecognized flags %u in rollback prepared message", flags);
336 elog(
ERROR,
"prepare_end_lsn is not set in rollback prepared message");
339 elog(
ERROR,
"rollback_end_lsn is not set in rollback prepared message");
404 Bitmapset *columns,
bool include_gencols)
435 elog(
ERROR,
"expected new tuple but got %d",
449 bool binary,
Bitmapset *columns,
bool include_gencols)
453 Assert(rel->
rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
454 rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
455 rel->
rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
466 if (rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL)
495 elog(
ERROR,
"expected action 'N', 'O' or 'K', got %c",
502 *has_oldtuple =
true;
507 *has_oldtuple =
false;
511 elog(
ERROR,
"expected action 'N', got %c",
525 Bitmapset *columns,
bool include_gencols)
527 Assert(rel->
rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
528 rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
529 rel->
rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
540 if (rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL)
580 bool cascade,
bool restart_seqs)
600 for (
i = 0;
i < nrelids;
i++)
609 bool *cascade,
bool *restart_seqs)
623 for (
i = 0;
i < nrelids;
i++)
634 bool transactional,
const char *prefix,
Size sz,
661 Bitmapset *columns,
bool include_gencols)
729 elog(
ERROR,
"cache lookup failed for type %u", basetypoid);
760 bool binary,
Bitmapset *columns,
bool include_gencols)
814 elog(
ERROR,
"cache lookup failed for type %u", att->atttypid);
861 tuple->
ncols = natts;
864 for (
i = 0;
i < natts;
i++)
901 elog(
ERROR,
"unrecognized data representation type '%c'", kind);
911 bool include_gencols)
934 replidentfull = (rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL);
981 attnames =
palloc(natts *
sizeof(
char *));
985 for (
i = 0;
i < natts;
i++)
1016 if (
nspid == PG_CATALOG_NAMESPACE)
1022 if (nspname == NULL)
1023 elog(
ERROR,
"cache lookup failed for namespace %u",
1038 if (nspname[0] ==
'\0')
1039 nspname =
"pg_catalog";
1127 elog(
ERROR,
"unrecognized flags %u in commit message", flags);
1157 if (write_abort_info)
1173 bool read_abort_info)
1180 if (read_abort_info)
1198 static char err_unknown[20];
1223 return "BEGIN PREPARE";
1227 return "COMMIT PREPARED";
1229 return "ROLLBACK PREPARED";
1231 return "STREAM START";
1233 return "STREAM STOP";
1235 return "STREAM COMMIT";
1237 return "STREAM ABORT";
1239 return "STREAM PREPARE";
1266 bool include_gencols)
1268 if (att->attisdropped)
1276 return att->attgenerated ? include_gencols :
true;
void bms_free(Bitmapset *a)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define Assert(condition)
#define OidIsValid(objectId)
char * OidOutputFunctionCall(Oid functionId, Datum val)
bytea * OidSendFunctionCall(Oid functionId, Datum val)
#define HeapTupleIsValid(tuple)
List * lappend_oid(List *list, Oid datum)
#define LOGICALREP_COLUMN_UNCHANGED
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_STREAM_COMMIT
#define LOGICALREP_COLUMN_NULL
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
Oid getBaseType(Oid typid)
char * get_namespace_name(Oid nspid)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
FormData_pg_attribute * Form_pg_attribute
FormData_pg_type * Form_pg_type
size_t strlcpy(char *dst, const char *src, size_t siz)
static Datum ObjectIdGetDatum(Oid X)
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
#define TRUNCATE_RESTART_SEQS
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, bool include_gencols)
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, bool include_gencols)
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, bool include_gencols)
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
#define MESSAGE_TRANSACTIONAL
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, bool include_gencols)
#define LOGICALREP_IS_REPLICA_IDENTITY
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, bool include_gencols)
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, bool include_gencols)
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, bool include_gencols)
const char * logicalrep_message_type(LogicalRepMsgType action)
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
static const char * logicalrep_read_namespace(StringInfo in)
void logicalrep_write_stream_stop(StringInfo out)
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
#define rbtxn_prepared(txn)
static void initStringInfoFromString(StringInfo str, char *data, int len)
XLogRecPtr prepare_end_lsn
XLogRecPtr rollback_end_lsn
TimestampTz rollback_time
StringInfoData * colvalues
union ReorderBufferTXN::@115 xact_time
#define FirstLowInvalidHeapAttributeNumber
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
static void slot_getallattrs(TupleTableSlot *slot)
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
#define InvalidXLogRecPtr