PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
pgoutput.c File Reference
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 
struct  PGOutputTxnData
 

Macros

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 
typedef struct PGOutputTxnData PGOutputTxnData
 

Enumerations

enum  RowFilterPubAction { PUBACTION_INSERT , PUBACTION_UPDATE , PUBACTION_DELETE }
 

Functions

static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pgoutput_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static void pgoutput_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pgoutput_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pgoutput_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pgoutput_stream_start (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_stop (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_abort (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pgoutput_stream_commit (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_stream_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static ListLoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void send_repl_origin (LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
 
static void init_rel_sync_cache (MemoryContext cachectx)
 
static void cleanup_rel_sync_cache (TransactionId xid, bool is_commit)
 
static RelationSyncEntryget_rel_sync_entry (PGOutputData *data, Relation relation)
 
static void send_relation_and_attrs (Relation relation, TransactionId xid, LogicalDecodingContext *ctx, RelationSyncEntry *relentry)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void set_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static bool get_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static void init_tuple_slot (PGOutputData *data, Relation relation, RelationSyncEntry *entry)
 
static EStatecreate_estate_for_relation (Relation rel)
 
static void pgoutput_row_filter_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
static bool pgoutput_row_filter_exec_expr (ExprState *state, ExprContext *econtext)
 
static bool pgoutput_row_filter (Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
 
static void pgoutput_column_list_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void parse_output_parameters (List *options, PGOutputData *data)
 
static void pgoutput_send_begin (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void maybe_send_schema (LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
 
static void pgoutput_ensure_entry_cxt (PGOutputData *data, RelationSyncEntry *entry)
 
static void check_and_init_gencol (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 

Variables

 PG_MODULE_MAGIC
 
static bool publications_valid
 
static HTABRelationSyncCache = NULL
 

Macro Definition Documentation

◆ NUM_ROWFILTER_PUBACTIONS

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)

Definition at line 103 of file pgoutput.c.

Typedef Documentation

◆ PGOutputTxnData

◆ RelationSyncEntry

Enumeration Type Documentation

◆ RowFilterPubAction

Enumerator
PUBACTION_INSERT 
PUBACTION_UPDATE 
PUBACTION_DELETE 

Definition at line 96 of file pgoutput.c.

97{
101};
@ PUBACTION_INSERT
Definition: pgoutput.c:98
@ PUBACTION_UPDATE
Definition: pgoutput.c:99
@ PUBACTION_DELETE
Definition: pgoutput.c:100

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks cb)

Definition at line 257 of file pgoutput.c.

258{
265
272
273 /* transaction streaming */
281 /* transaction streaming - two-phase commit */
283}
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition: pgoutput.c:1452
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:632
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:428
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:1624
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:649
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:1737
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: pgoutput.c:677
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:1756
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: pgoutput.c:1853
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:1914
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:564
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:1886
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:663
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1832
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1800
static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: pgoutput.c:1692
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:600
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb

References OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::message_cb, pgoutput_begin_prepare_txn(), pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_prepared_txn(), pgoutput_commit_txn(), pgoutput_message(), pgoutput_origin_filter(), pgoutput_prepare_txn(), pgoutput_rollback_prepared_txn(), pgoutput_shutdown(), pgoutput_startup(), pgoutput_stream_abort(), pgoutput_stream_commit(), pgoutput_stream_prepare_txn(), pgoutput_stream_start(), pgoutput_stream_stop(), pgoutput_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

◆ check_and_init_gencol()

static void check_and_init_gencol ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 1033 of file pgoutput.c.

1035{
1037 TupleDesc desc = RelationGetDescr(relation);
1038 bool gencolpresent = false;
1039 bool first = true;
1040
1041 /* Check if there is any generated column present. */
1042 for (int i = 0; i < desc->natts; i++)
1043 {
1044 Form_pg_attribute att = TupleDescAttr(desc, i);
1045
1046 if (att->attgenerated)
1047 {
1048 gencolpresent = true;
1049 break;
1050 }
1051 }
1052
1053 /* There are no generated columns to be published. */
1054 if (!gencolpresent)
1055 {
1056 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1057 return;
1058 }
1059
1060 /*
1061 * There may be a conflicting value for 'publish_generated_columns'
1062 * parameter in the publications.
1063 */
1064 foreach_ptr(Publication, pub, publications)
1065 {
1066 /*
1067 * The column list takes precedence over the
1068 * 'publish_generated_columns' parameter. Those will be checked later,
1069 * see pgoutput_column_list_init.
1070 */
1071 if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1072 continue;
1073
1074 if (first)
1075 {
1076 entry->include_gencols_type = pub->pubgencols_type;
1077 first = false;
1078 }
1079 else if (entry->include_gencols_type != pub->pubgencols_type)
1080 ereport(ERROR,
1081 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1082 errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1084 RelationGetRelationName(relation)));
1085 }
1086}
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int i
Definition: isn.c:72
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3405
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols)
#define RelationGetDescr(relation)
Definition: rel.h:538
#define RelationGetRelationName(relation)
Definition: rel.h:546
#define RelationGetNamespace(relation)
Definition: rel.h:553
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2056
PublishGencolsType include_gencols_type
Definition: pgoutput.c:138
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:154

References check_and_fetch_column_list(), ereport, errcode(), errmsg(), ERROR, foreach_ptr, get_namespace_name(), i, RelationSyncEntry::include_gencols_type, TupleDescData::natts, RelationSyncEntry::publish_as_relid, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, RelationIdGetRelation(), and TupleDescAttr().

Referenced by get_rel_sync_entry().

◆ cleanup_rel_sync_cache()

static void cleanup_rel_sync_cache ( TransactionId  xid,
bool  is_commit 
)
static

Definition at line 2309 of file pgoutput.c.

2310{
2311 HASH_SEQ_STATUS hash_seq;
2312 RelationSyncEntry *entry;
2313
2314 Assert(RelationSyncCache != NULL);
2315
2316 hash_seq_init(&hash_seq, RelationSyncCache);
2317 while ((entry = hash_seq_search(&hash_seq)) != NULL)
2318 {
2319 /*
2320 * We can set the schema_sent flag for an entry that has committed xid
2321 * in the list as that ensures that the subscriber would have the
2322 * corresponding schema and we don't need to send it unless there is
2323 * any invalidation for that relation.
2324 */
2325 foreach_xid(streamed_txn, entry->streamed_txns)
2326 {
2327 if (xid == streamed_txn)
2328 {
2329 if (is_commit)
2330 entry->schema_sent = true;
2331
2332 entry->streamed_txns =
2333 foreach_delete_current(entry->streamed_txns, streamed_txn);
2334 break;
2335 }
2336 }
2337 }
2338}
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1420
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
Assert(PointerIsAligned(start, uint64))
#define foreach_delete_current(lst, var_or_cell)
Definition: pg_list.h:391
#define foreach_xid(var, lst)
Definition: pg_list.h:472
static HTAB * RelationSyncCache
Definition: pgoutput.c:217
List * streamed_txns
Definition: pgoutput.c:139

References Assert(), foreach_delete_current, foreach_xid, hash_seq_init(), hash_seq_search(), RelationSyncCache, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_stream_abort(), and pgoutput_stream_commit().

◆ create_estate_for_relation()

static EState * create_estate_for_relation ( Relation  rel)
static

Definition at line 810 of file pgoutput.c.

811{
812 EState *estate;
813 RangeTblEntry *rte;
814 List *perminfos = NIL;
815
816 estate = CreateExecutorState();
817
818 rte = makeNode(RangeTblEntry);
819 rte->rtekind = RTE_RELATION;
820 rte->relid = RelationGetRelid(rel);
821 rte->relkind = rel->rd_rel->relkind;
822 rte->rellockmode = AccessShareLock;
823
824 addRTEPermissionInfo(&perminfos, rte);
825
826 ExecInitRangeTable(estate, list_make1(rte), perminfos,
828
829 estate->es_output_cid = GetCurrentCommandId(false);
830
831 return estate;
832}
Bitmapset * bms_make_singleton(int x)
Definition: bitmapset.c:216
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
Definition: execUtils.c:775
EState * CreateExecutorState(void)
Definition: execUtils.c:88
#define AccessShareLock
Definition: lockdefs.h:36
#define makeNode(_type_)
Definition: nodes.h:155
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1026
#define NIL
Definition: pg_list.h:68
#define list_make1(x1)
Definition: pg_list.h:212
#define RelationGetRelid(relation)
Definition: rel.h:512
CommandId es_output_cid
Definition: execnodes.h:674
Definition: pg_list.h:54
RTEKind rtekind
Definition: parsenodes.h:1056
Form_pg_class rd_rel
Definition: rel.h:111
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:828

References AccessShareLock, addRTEPermissionInfo(), bms_make_singleton(), CreateExecutorState(), EState::es_output_cid, ExecInitRangeTable(), GetCurrentCommandId(), list_make1, makeNode, NIL, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by pgoutput_row_filter_init().

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData data,
Relation  relation 
)
static

Definition at line 2014 of file pgoutput.c.

2015{
2016 RelationSyncEntry *entry;
2017 bool found;
2018 MemoryContext oldctx;
2019 Oid relid = RelationGetRelid(relation);
2020
2021 Assert(RelationSyncCache != NULL);
2022
2023 /* Find cached relation info, creating if not found */
2025 &relid,
2026 HASH_ENTER, &found);
2027 Assert(entry != NULL);
2028
2029 /* initialize entry, if it's new */
2030 if (!found)
2031 {
2032 entry->replicate_valid = false;
2033 entry->schema_sent = false;
2034 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2035 entry->streamed_txns = NIL;
2036 entry->pubactions.pubinsert = entry->pubactions.pubupdate =
2037 entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
2038 entry->new_slot = NULL;
2039 entry->old_slot = NULL;
2040 memset(entry->exprstate, 0, sizeof(entry->exprstate));
2041 entry->entry_cxt = NULL;
2043 entry->columns = NULL;
2044 entry->attrmap = NULL;
2045 }
2046
2047 /* Validate the entry */
2048 if (!entry->replicate_valid)
2049 {
2050 Oid schemaId = get_rel_namespace(relid);
2051 List *pubids = GetRelationPublications(relid);
2052
2053 /*
2054 * We don't acquire a lock on the namespace system table as we build
2055 * the cache entry using a historic snapshot and all the later changes
2056 * are absorbed while decoding WAL.
2057 */
2058 List *schemaPubids = GetSchemaPublications(schemaId);
2059 ListCell *lc;
2060 Oid publish_as_relid = relid;
2061 int publish_ancestor_level = 0;
2062 bool am_partition = get_rel_relispartition(relid);
2063 char relkind = get_rel_relkind(relid);
2064 List *rel_publications = NIL;
2065
2066 /* Reload publications if needed before use. */
2067 if (!publications_valid)
2068 {
2069 MemoryContextReset(data->pubctx);
2070
2071 oldctx = MemoryContextSwitchTo(data->pubctx);
2072 data->publications = LoadPublications(data->publication_names);
2073 MemoryContextSwitchTo(oldctx);
2074 publications_valid = true;
2075 }
2076
2077 /*
2078 * Reset schema_sent status as the relation definition may have
2079 * changed. Also reset pubactions to empty in case rel was dropped
2080 * from a publication. Also free any objects that depended on the
2081 * earlier definition.
2082 */
2083 entry->schema_sent = false;
2084 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2085 list_free(entry->streamed_txns);
2086 entry->streamed_txns = NIL;
2087 bms_free(entry->columns);
2088 entry->columns = NULL;
2089 entry->pubactions.pubinsert = false;
2090 entry->pubactions.pubupdate = false;
2091 entry->pubactions.pubdelete = false;
2092 entry->pubactions.pubtruncate = false;
2093
2094 /*
2095 * Tuple slots cleanups. (Will be rebuilt later if needed).
2096 */
2097 if (entry->old_slot)
2098 {
2099 TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2100
2101 Assert(desc->tdrefcount == -1);
2102
2104
2105 /*
2106 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2107 * do it now to avoid any leaks.
2108 */
2109 FreeTupleDesc(desc);
2110 }
2111 if (entry->new_slot)
2112 {
2113 TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2114
2115 Assert(desc->tdrefcount == -1);
2116
2118
2119 /*
2120 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2121 * do it now to avoid any leaks.
2122 */
2123 FreeTupleDesc(desc);
2124 }
2125
2126 entry->old_slot = NULL;
2127 entry->new_slot = NULL;
2128
2129 if (entry->attrmap)
2130 free_attrmap(entry->attrmap);
2131 entry->attrmap = NULL;
2132
2133 /*
2134 * Row filter cache cleanups.
2135 */
2136 if (entry->entry_cxt)
2138
2139 entry->entry_cxt = NULL;
2140 entry->estate = NULL;
2141 memset(entry->exprstate, 0, sizeof(entry->exprstate));
2142
2143 /*
2144 * Build publication cache. We can't use one provided by relcache as
2145 * relcache considers all publications that the given relation is in,
2146 * but here we only need to consider ones that the subscriber
2147 * requested.
2148 */
2149 foreach(lc, data->publications)
2150 {
2151 Publication *pub = lfirst(lc);
2152 bool publish = false;
2153
2154 /*
2155 * Under what relid should we publish changes in this publication?
2156 * We'll use the top-most relid across all publications. Also
2157 * track the ancestor level for this publication.
2158 */
2159 Oid pub_relid = relid;
2160 int ancestor_level = 0;
2161
2162 /*
2163 * If this is a FOR ALL TABLES publication, pick the partition
2164 * root and set the ancestor level accordingly.
2165 */
2166 if (pub->alltables)
2167 {
2168 publish = true;
2169 if (pub->pubviaroot && am_partition)
2170 {
2171 List *ancestors = get_partition_ancestors(relid);
2172
2173 pub_relid = llast_oid(ancestors);
2174 ancestor_level = list_length(ancestors);
2175 }
2176 }
2177
2178 if (!publish)
2179 {
2180 bool ancestor_published = false;
2181
2182 /*
2183 * For a partition, check if any of the ancestors are
2184 * published. If so, note down the topmost ancestor that is
2185 * published via this publication, which will be used as the
2186 * relation via which to publish the partition's changes.
2187 */
2188 if (am_partition)
2189 {
2190 Oid ancestor;
2191 int level;
2192 List *ancestors = get_partition_ancestors(relid);
2193
2194 ancestor = GetTopMostAncestorInPublication(pub->oid,
2195 ancestors,
2196 &level);
2197
2198 if (ancestor != InvalidOid)
2199 {
2200 ancestor_published = true;
2201 if (pub->pubviaroot)
2202 {
2203 pub_relid = ancestor;
2204 ancestor_level = level;
2205 }
2206 }
2207 }
2208
2209 if (list_member_oid(pubids, pub->oid) ||
2210 list_member_oid(schemaPubids, pub->oid) ||
2211 ancestor_published)
2212 publish = true;
2213 }
2214
2215 /*
2216 * If the relation is to be published, determine actions to
2217 * publish, and list of columns, if appropriate.
2218 *
2219 * Don't publish changes for partitioned tables, because
2220 * publishing those of its partitions suffices, unless partition
2221 * changes won't be published due to pubviaroot being set.
2222 */
2223 if (publish &&
2224 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2225 {
2230
2231 /*
2232 * We want to publish the changes as the top-most ancestor
2233 * across all publications. So we need to check if the already
2234 * calculated level is higher than the new one. If yes, we can
2235 * ignore the new value (as it's a child). Otherwise the new
2236 * value is an ancestor, so we keep it.
2237 */
2238 if (publish_ancestor_level > ancestor_level)
2239 continue;
2240
2241 /*
2242 * If we found an ancestor higher up in the tree, discard the
2243 * list of publications through which we replicate it, and use
2244 * the new ancestor.
2245 */
2246 if (publish_ancestor_level < ancestor_level)
2247 {
2248 publish_as_relid = pub_relid;
2249 publish_ancestor_level = ancestor_level;
2250
2251 /* reset the publication list for this relation */
2252 rel_publications = NIL;
2253 }
2254 else
2255 {
2256 /* Same ancestor level, has to be the same OID. */
2257 Assert(publish_as_relid == pub_relid);
2258 }
2259
2260 /* Track publications for this ancestor. */
2261 rel_publications = lappend(rel_publications, pub);
2262 }
2263 }
2264
2265 entry->publish_as_relid = publish_as_relid;
2266
2267 /*
2268 * Initialize the tuple slot, map, and row filter. These are only used
2269 * when publishing inserts, updates, or deletes.
2270 */
2271 if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2272 entry->pubactions.pubdelete)
2273 {
2274 /* Initialize the tuple slot and map */
2275 init_tuple_slot(data, relation, entry);
2276
2277 /* Initialize the row filter */
2278 pgoutput_row_filter_init(data, rel_publications, entry);
2279
2280 /* Check whether to publish generated columns. */
2281 check_and_init_gencol(data, rel_publications, entry);
2282
2283 /* Initialize the column list */
2284 pgoutput_column_list_init(data, rel_publications, entry);
2285 }
2286
2287 list_free(pubids);
2288 list_free(schemaPubids);
2289 list_free(rel_publications);
2290
2291 entry->replicate_valid = true;
2292 }
2293
2294 return entry;
2295}
void free_attrmap(AttrMap *map)
Definition: attmap.c:56
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1441
@ HASH_ENTER
Definition: hsearch.h:114
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_free(List *list)
Definition: list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:2066
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2042
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1991
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
List * get_partition_ancestors(Oid relid)
Definition: partition.c:134
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define llast_oid(l)
Definition: pg_list.h:200
List * GetRelationPublications(Oid relid)
List * GetSchemaPublications(Oid schemaid)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:1769
static void init_tuple_slot(PGOutputData *data, Relation relation, RelationSyncEntry *entry)
Definition: pgoutput.c:1179
static void pgoutput_row_filter_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:886
static void check_and_init_gencol(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:1033
static void pgoutput_column_list_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:1092
static bool publications_valid
Definition: pgoutput.c:83
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
PublicationActions pubactions
ExprState * exprstate[NUM_ROWFILTER_PUBACTIONS]
Definition: pgoutput.c:152
Bitmapset * columns
Definition: pgoutput.c:178
PublicationActions pubactions
Definition: pgoutput.c:143
TupleTableSlot * old_slot
Definition: pgoutput.c:155
bool replicate_valid
Definition: pgoutput.c:127
MemoryContext entry_cxt
Definition: pgoutput.c:184
EState * estate
Definition: pgoutput.c:153
TupleTableSlot * new_slot
Definition: pgoutput.c:154
AttrMap * attrmap
Definition: pgoutput.c:171
int tdrefcount
Definition: tupdesc.h:134
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
void FreeTupleDesc(TupleDesc tupdesc)
Definition: tupdesc.c:479

References Publication::alltables, Assert(), RelationSyncEntry::attrmap, bms_free(), check_and_init_gencol(), RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecDropSingleTupleTableSlot(), RelationSyncEntry::exprstate, free_attrmap(), FreeTupleDesc(), get_partition_ancestors(), get_rel_namespace(), get_rel_relispartition(), get_rel_relkind(), GetRelationPublications(), GetSchemaPublications(), GetTopMostAncestorInPublication(), HASH_ENTER, hash_search(), RelationSyncEntry::include_gencols_type, init_tuple_slot(), InvalidOid, lappend(), lfirst, list_free(), list_length(), list_member_oid(), llast_oid, LoadPublications(), MemoryContextDelete(), MemoryContextReset(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, NIL, Publication::oid, RelationSyncEntry::old_slot, pgoutput_column_list_init(), pgoutput_row_filter_init(), RelationSyncEntry::pubactions, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, publications_valid, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, PublicationActions::pubupdate, Publication::pubviaroot, RelationGetRelid, RelationSyncCache, RelationSyncEntry::replicate_valid, RelationSyncEntry::schema_sent, RelationSyncEntry::streamed_txns, TupleDescData::tdrefcount, and TupleTableSlot::tts_tupleDescriptor.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ get_schema_sent_in_streamed_txn()

static bool get_schema_sent_in_streamed_txn ( RelationSyncEntry entry,
TransactionId  xid 
)
static

Definition at line 1983 of file pgoutput.c.

1984{
1985 return list_member_xid(entry->streamed_txns, xid);
1986}
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:742

References list_member_xid(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  cachectx)
static

Definition at line 1934 of file pgoutput.c.

1935{
1936 HASHCTL ctl;
1937 static bool relation_callbacks_registered = false;
1938
1939 /* Nothing to do if hash table already exists */
1940 if (RelationSyncCache != NULL)
1941 return;
1942
1943 /* Make a new hash table for the cache */
1944 ctl.keysize = sizeof(Oid);
1945 ctl.entrysize = sizeof(RelationSyncEntry);
1946 ctl.hcxt = cachectx;
1947
1948 RelationSyncCache = hash_create("logical replication output relation cache",
1949 128, &ctl,
1951
1952 Assert(RelationSyncCache != NULL);
1953
1954 /* No more to do if we already registered callbacks */
1955 if (relation_callbacks_registered)
1956 return;
1957
1958 /* We must update the cache entry for a relation after a relcache flush */
1960
1961 /*
1962 * Flush all cache entries after a pg_namespace change, in case it was a
1963 * schema rename affecting a relation being replicated.
1964 *
1965 * XXX: It is not a good idea to invalidate all the relation entries in
1966 * RelationSyncCache on schema rename. We can optimize it to invalidate
1967 * only the required relations by either having a specific invalidation
1968 * message containing impacted relations or by having schema information
1969 * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
1970 * passed to the callback.
1971 */
1972 CacheRegisterSyscacheCallback(NAMESPACEOID,
1974 (Datum) 0);
1975
1976 relation_callbacks_registered = true;
1977}
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1844
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1802
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:2394
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:2344
uintptr_t Datum
Definition: postgres.h:69
tree ctl
Definition: radixtree.h:1838

References Assert(), CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), ctl, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, rel_sync_cache_publication_cb(), rel_sync_cache_relation_cb(), and RelationSyncCache.

Referenced by pgoutput_startup().

◆ init_tuple_slot()

static void init_tuple_slot ( PGOutputData data,
Relation  relation,
RelationSyncEntry entry 
)
static

Definition at line 1179 of file pgoutput.c.

1181{
1182 MemoryContext oldctx;
1183 TupleDesc oldtupdesc;
1184 TupleDesc newtupdesc;
1185
1186 oldctx = MemoryContextSwitchTo(data->cachectx);
1187
1188 /*
1189 * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1190 * live as long as the cache remains.
1191 */
1192 oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1193 newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1194
1195 entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1196 entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1197
1198 MemoryContextSwitchTo(oldctx);
1199
1200 /*
1201 * Cache the map that will be used to convert the relation's tuples into
1202 * the ancestor's format, if needed.
1203 */
1204 if (entry->publish_as_relid != RelationGetRelid(relation))
1205 {
1207 TupleDesc indesc = RelationGetDescr(relation);
1208 TupleDesc outdesc = RelationGetDescr(ancestor);
1209
1210 /* Map must live as long as the logical decoding context. */
1211 oldctx = MemoryContextSwitchTo(data->cachectx);
1212
1213 entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1214
1215 MemoryContextSwitchTo(oldctx);
1216 RelationClose(ancestor);
1217 }
1218}
AttrMap * build_attrmap_by_name_if_req(TupleDesc indesc, TupleDesc outdesc, bool missing_ok)
Definition: attmap.c:261
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1425
const TupleTableSlotOps TTSOpsHeapTuple
Definition: execTuples.c:85
void RelationClose(Relation relation)
Definition: relcache.c:2178
TupleDesc CreateTupleDescCopyConstr(TupleDesc tupdesc)
Definition: tupdesc.c:322

References RelationSyncEntry::attrmap, build_attrmap_by_name_if_req(), CreateTupleDescCopyConstr(), data, MakeSingleTupleTableSlot(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, RelationSyncEntry::old_slot, RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), and TTSOpsHeapTuple.

Referenced by get_rel_sync_entry().

◆ LoadPublications()

static List * LoadPublications ( List pubnames)
static

Definition at line 1769 of file pgoutput.c.

1770{
1771 List *result = NIL;
1772 ListCell *lc;
1773
1774 foreach(lc, pubnames)
1775 {
1776 char *pubname = (char *) lfirst(lc);
1777 Publication *pub = GetPublicationByName(pubname, false);
1778
1779 result = lappend(result, pub);
1780 }
1781
1782 return result;
1783}
Publication * GetPublicationByName(const char *pubname, bool missing_ok)

References GetPublicationByName(), lappend(), lfirst, and NIL.

Referenced by get_rel_sync_entry().

◆ maybe_send_schema()

static void maybe_send_schema ( LogicalDecodingContext ctx,
ReorderBufferChange change,
Relation  relation,
RelationSyncEntry relentry 
)
static

Definition at line 695 of file pgoutput.c.

698{
700 bool schema_sent;
703
704 /*
705 * Remember XID of the (sub)transaction for the change. We don't care if
706 * it's top-level transaction or not (we have already sent that XID in
707 * start of the current streaming block).
708 *
709 * If we're not in a streaming block, just use InvalidTransactionId and
710 * the write methods will not include it.
711 */
712 if (data->in_streaming)
713 xid = change->txn->xid;
714
715 if (rbtxn_is_subtxn(change->txn))
716 topxid = rbtxn_get_toptxn(change->txn)->xid;
717 else
718 topxid = xid;
719
720 /*
721 * Do we need to send the schema? We do track streamed transactions
722 * separately, because those may be applied later (and the regular
723 * transactions won't see their effects until then) and in an order that
724 * we don't know at this point.
725 *
726 * XXX There is a scope of optimization here. Currently, we always send
727 * the schema first time in a streaming transaction but we can probably
728 * avoid that by checking 'relentry->schema_sent' flag. However, before
729 * doing that we need to study its impact on the case where we have a mix
730 * of streaming and non-streaming transactions.
731 */
732 if (data->in_streaming)
733 schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
734 else
735 schema_sent = relentry->schema_sent;
736
737 /* Nothing to do if we already sent the schema. */
738 if (schema_sent)
739 return;
740
741 /*
742 * Send the schema. If the changes will be published using an ancestor's
743 * schema, not the relation's own, send that ancestor's schema before
744 * sending relation's own (XXX - maybe sending only the former suffices?).
745 */
746 if (relentry->publish_as_relid != RelationGetRelid(relation))
747 {
748 Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
749
750 send_relation_and_attrs(ancestor, xid, ctx, relentry);
751 RelationClose(ancestor);
752 }
753
754 send_relation_and_attrs(relation, xid, ctx, relentry);
755
756 if (data->in_streaming)
757 set_schema_sent_in_streamed_txn(relentry, topxid);
758 else
759 relentry->schema_sent = true;
760}
uint32 TransactionId
Definition: c.h:623
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, RelationSyncEntry *relentry)
Definition: pgoutput.c:766
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1993
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1983
#define rbtxn_get_toptxn(txn)
#define rbtxn_is_subtxn(txn)
void * output_plugin_private
Definition: logical.h:76
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
TransactionId xid
#define InvalidTransactionId
Definition: transam.h:31

References data, get_schema_sent_in_streamed_txn(), if(), InvalidTransactionId, LogicalDecodingContext::output_plugin_private, RelationSyncEntry::publish_as_relid, rbtxn_get_toptxn, rbtxn_is_subtxn, RelationClose(), RelationGetRelid, RelationIdGetRelation(), RelationSyncEntry::schema_sent, send_relation_and_attrs(), set_schema_sent_in_streamed_txn(), ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ parse_output_parameters()

static void parse_output_parameters ( List options,
PGOutputData data 
)
static

Definition at line 286 of file pgoutput.c.

287{
288 ListCell *lc;
289 bool protocol_version_given = false;
290 bool publication_names_given = false;
291 bool binary_option_given = false;
292 bool messages_option_given = false;
293 bool streaming_given = false;
294 bool two_phase_option_given = false;
295 bool origin_option_given = false;
296
297 data->binary = false;
298 data->streaming = LOGICALREP_STREAM_OFF;
299 data->messages = false;
300 data->two_phase = false;
301
302 foreach(lc, options)
303 {
304 DefElem *defel = (DefElem *) lfirst(lc);
305
306 Assert(defel->arg == NULL || IsA(defel->arg, String));
307
308 /* Check each param, whether or not we recognize it */
309 if (strcmp(defel->defname, "proto_version") == 0)
310 {
311 unsigned long parsed;
312 char *endptr;
313
314 if (protocol_version_given)
316 (errcode(ERRCODE_SYNTAX_ERROR),
317 errmsg("conflicting or redundant options")));
318 protocol_version_given = true;
319
320 errno = 0;
321 parsed = strtoul(strVal(defel->arg), &endptr, 10);
322 if (errno != 0 || *endptr != '\0')
324 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
325 errmsg("invalid proto_version")));
326
327 if (parsed > PG_UINT32_MAX)
329 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
330 errmsg("proto_version \"%s\" out of range",
331 strVal(defel->arg))));
332
333 data->protocol_version = (uint32) parsed;
334 }
335 else if (strcmp(defel->defname, "publication_names") == 0)
336 {
337 if (publication_names_given)
339 (errcode(ERRCODE_SYNTAX_ERROR),
340 errmsg("conflicting or redundant options")));
341 publication_names_given = true;
342
343 if (!SplitIdentifierString(strVal(defel->arg), ',',
344 &data->publication_names))
346 (errcode(ERRCODE_INVALID_NAME),
347 errmsg("invalid publication_names syntax")));
348 }
349 else if (strcmp(defel->defname, "binary") == 0)
350 {
351 if (binary_option_given)
353 (errcode(ERRCODE_SYNTAX_ERROR),
354 errmsg("conflicting or redundant options")));
355 binary_option_given = true;
356
357 data->binary = defGetBoolean(defel);
358 }
359 else if (strcmp(defel->defname, "messages") == 0)
360 {
361 if (messages_option_given)
363 (errcode(ERRCODE_SYNTAX_ERROR),
364 errmsg("conflicting or redundant options")));
365 messages_option_given = true;
366
367 data->messages = defGetBoolean(defel);
368 }
369 else if (strcmp(defel->defname, "streaming") == 0)
370 {
371 if (streaming_given)
373 (errcode(ERRCODE_SYNTAX_ERROR),
374 errmsg("conflicting or redundant options")));
375 streaming_given = true;
376
377 data->streaming = defGetStreamingMode(defel);
378 }
379 else if (strcmp(defel->defname, "two_phase") == 0)
380 {
381 if (two_phase_option_given)
383 (errcode(ERRCODE_SYNTAX_ERROR),
384 errmsg("conflicting or redundant options")));
385 two_phase_option_given = true;
386
387 data->two_phase = defGetBoolean(defel);
388 }
389 else if (strcmp(defel->defname, "origin") == 0)
390 {
391 char *origin;
392
393 if (origin_option_given)
395 errcode(ERRCODE_SYNTAX_ERROR),
396 errmsg("conflicting or redundant options"));
397 origin_option_given = true;
398
399 origin = defGetString(defel);
400 if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
401 data->publish_no_origin = true;
402 else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
403 data->publish_no_origin = false;
404 else
406 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
407 errmsg("unrecognized origin value: \"%s\"", origin));
408 }
409 else
410 elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
411 }
412
413 /* Check required options */
414 if (!protocol_version_given)
416 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
417 errmsg("option \"%s\" missing", "proto_version"));
418 if (!publication_names_given)
420 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
421 errmsg("option \"%s\" missing", "publication_names"));
422}
#define PG_UINT32_MAX
Definition: c.h:561
uint32_t uint32
Definition: c.h:502
char * defGetString(DefElem *def)
Definition: define.c:35
bool defGetBoolean(DefElem *def)
Definition: define.c:94
#define elog(elevel,...)
Definition: elog.h:225
#define IsA(nodeptr, _type_)
Definition: nodes.h:158
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
char * defname
Definition: parsenodes.h:826
Node * arg
Definition: parsenodes.h:827
Definition: value.h:64
char defGetStreamingMode(DefElem *def)
#define strVal(v)
Definition: value.h:82
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3525

References DefElem::arg, Assert(), data, defGetBoolean(), defGetStreamingMode(), defGetString(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, pg_strcasecmp(), PG_UINT32_MAX, SplitIdentifierString(), and strVal.

Referenced by pgoutput_startup().

◆ pgoutput_begin_prepare_txn()

static void pgoutput_begin_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 632 of file pgoutput.c.

633{
634 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
635
636 OutputPluginPrepareWrite(ctx, !send_replication_origin);
638
639 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
640 send_replication_origin);
641
642 OutputPluginWrite(ctx, true);
643}
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:703
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:690
#define InvalidRepOriginId
Definition: origin.h:33
static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
Definition: pgoutput.c:2420
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:116
StringInfo out
Definition: logical.h:71
RepOriginId origin_id
XLogRecPtr origin_lsn

References InvalidRepOriginId, logicalrep_write_begin_prepare(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and send_repl_origin().

Referenced by _PG_output_plugin_init().

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 564 of file pgoutput.c.

565{
567 sizeof(PGOutputTxnData));
568
569 txn->output_plugin_private = txndata;
570}
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
MemoryContext context
Definition: logical.h:36
void * output_plugin_private

References LogicalDecodingContext::context, MemoryContextAllocZero(), and ReorderBufferTXN::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 1452 of file pgoutput.c.

1454{
1457 MemoryContext old;
1458 RelationSyncEntry *relentry;
1460 Relation ancestor = NULL;
1461 Relation targetrel = relation;
1463 TupleTableSlot *old_slot = NULL;
1464 TupleTableSlot *new_slot = NULL;
1465
1466 if (!is_publishable_relation(relation))
1467 return;
1468
1469 /*
1470 * Remember the xid for the change in streaming mode. We need to send xid
1471 * with each change in the streaming mode so that subscriber can make
1472 * their association and on aborts, it can discard the corresponding
1473 * changes.
1474 */
1475 if (data->in_streaming)
1476 xid = change->txn->xid;
1477
1478 relentry = get_rel_sync_entry(data, relation);
1479
1480 /* First check the table filter */
1481 switch (action)
1482 {
1484 if (!relentry->pubactions.pubinsert)
1485 return;
1486 break;
1488 if (!relentry->pubactions.pubupdate)
1489 return;
1490 break;
1492 if (!relentry->pubactions.pubdelete)
1493 return;
1494
1495 /*
1496 * This is only possible if deletes are allowed even when replica
1497 * identity is not defined for a table. Since the DELETE action
1498 * can't be published, we simply return.
1499 */
1500 if (!change->data.tp.oldtuple)
1501 {
1502 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1503 return;
1504 }
1505 break;
1506 default:
1507 Assert(false);
1508 }
1509
1510 /* Avoid leaking memory by using and resetting our own context */
1511 old = MemoryContextSwitchTo(data->context);
1512
1513 /* Switch relation if publishing via root. */
1514 if (relentry->publish_as_relid != RelationGetRelid(relation))
1515 {
1516 Assert(relation->rd_rel->relispartition);
1517 ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1518 targetrel = ancestor;
1519 }
1520
1521 if (change->data.tp.oldtuple)
1522 {
1523 old_slot = relentry->old_slot;
1524 ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1525
1526 /* Convert tuple if needed. */
1527 if (relentry->attrmap)
1528 {
1530 &TTSOpsVirtual);
1531
1532 old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1533 }
1534 }
1535
1536 if (change->data.tp.newtuple)
1537 {
1538 new_slot = relentry->new_slot;
1539 ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1540
1541 /* Convert tuple if needed. */
1542 if (relentry->attrmap)
1543 {
1545 &TTSOpsVirtual);
1546
1547 new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1548 }
1549 }
1550
1551 /*
1552 * Check row filter.
1553 *
1554 * Updates could be transformed to inserts or deletes based on the results
1555 * of the row filter for old and new tuple.
1556 */
1557 if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1558 goto cleanup;
1559
1560 /*
1561 * Send BEGIN if we haven't yet.
1562 *
1563 * We send the BEGIN message after ensuring that we will actually send the
1564 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1565 * transactions.
1566 */
1567 if (txndata && !txndata->sent_begin_txn)
1568 pgoutput_send_begin(ctx, txn);
1569
1570 /*
1571 * Schema should be sent using the original relation because it also sends
1572 * the ancestor's relation.
1573 */
1574 maybe_send_schema(ctx, change, relation, relentry);
1575
1576 OutputPluginPrepareWrite(ctx, true);
1577
1578 /* Send the data */
1579 switch (action)
1580 {
1582 logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1583 data->binary, relentry->columns,
1584 relentry->include_gencols_type);
1585 break;
1587 logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1588 new_slot, data->binary, relentry->columns,
1589 relentry->include_gencols_type);
1590 break;
1592 logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1593 data->binary, relentry->columns,
1594 relentry->include_gencols_type);
1595 break;
1596 default:
1597 Assert(false);
1598 }
1599
1600 OutputPluginWrite(ctx, true);
1601
1602cleanup:
1603 if (RelationIsValid(ancestor))
1604 {
1605 RelationClose(ancestor);
1606 ancestor = NULL;
1607 }
1608
1609 /* Drop the new slots that were used to store the converted tuples. */
1610 if (relentry->attrmap)
1611 {
1612 if (old_slot)
1614
1615 if (new_slot)
1617 }
1618
1620 MemoryContextReset(data->context);
1621}
static void cleanup(void)
Definition: bootstrap.c:713
#define DEBUG1
Definition: elog.h:30
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
TupleTableSlot * MakeTupleTableSlot(TupleDesc tupleDesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1299
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1539
bool is_publishable_relation(Relation rel)
static void pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:578
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Relation relation)
Definition: pgoutput.c:2014
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:695
static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
Definition: pgoutput.c:1271
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:403
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:528
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:450
#define RelationIsValid(relation)
Definition: rel.h:485
ReorderBufferChangeType
Definition: reorderbuffer.h:51
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:53
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
union ReorderBufferChange::@110 data
struct ReorderBufferChange::@110::@111 tp
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192

References generate_unaccent_rules::action, ReorderBufferChange::action, Assert(), RelationSyncEntry::attrmap, cleanup(), RelationSyncEntry::columns, ReorderBufferChange::data, data, DEBUG1, elog, ExecDropSingleTupleTableSlot(), ExecStoreHeapTuple(), execute_attr_map_slot(), get_rel_sync_entry(), RelationSyncEntry::include_gencols_type, InvalidTransactionId, is_publishable_relation(), logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_update(), MakeTupleTableSlot(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, ReorderBufferChange::newtuple, RelationSyncEntry::old_slot, ReorderBufferChange::oldtuple, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_row_filter(), pgoutput_send_begin(), RelationSyncEntry::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationData::rd_rel, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), RelationIsValid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, TTSOpsVirtual, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_column_list_init()

static void pgoutput_column_list_init ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 1092 of file pgoutput.c.

1094{
1095 ListCell *lc;
1096 bool first = true;
1098 bool found_pub_collist = false;
1099 Bitmapset *relcols = NULL;
1100
1102
1103 /*
1104 * Find if there are any column lists for this relation. If there are,
1105 * build a bitmap using the column lists.
1106 *
1107 * Multiple publications might have multiple column lists for this
1108 * relation.
1109 *
1110 * Note that we don't support the case where the column list is different
1111 * for the same table when combining publications. See comments atop
1112 * fetch_table_list. But one can later change the publication so we still
1113 * need to check all the given publication-table mappings and report an
1114 * error if any publications have a different column list.
1115 */
1116 foreach(lc, publications)
1117 {
1118 Publication *pub = lfirst(lc);
1119 Bitmapset *cols = NULL;
1120
1121 /* Retrieve the bitmap of columns for a column list publication. */
1122 found_pub_collist |= check_and_fetch_column_list(pub,
1123 entry->publish_as_relid,
1124 entry->entry_cxt, &cols);
1125
1126 /*
1127 * For non-column list publications — e.g. TABLE (without a column
1128 * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1129 * of the table (including generated columns when
1130 * 'publish_generated_columns' parameter is true).
1131 */
1132 if (!cols)
1133 {
1134 /*
1135 * Cache the table columns for the first publication with no
1136 * specified column list to detect publication with a different
1137 * column list.
1138 */
1139 if (!relcols && (list_length(publications) > 1))
1140 {
1142
1143 relcols = pub_form_cols_map(relation,
1144 entry->include_gencols_type);
1145 MemoryContextSwitchTo(oldcxt);
1146 }
1147
1148 cols = relcols;
1149 }
1150
1151 if (first)
1152 {
1153 entry->columns = cols;
1154 first = false;
1155 }
1156 else if (!bms_equal(entry->columns, cols))
1157 ereport(ERROR,
1158 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1159 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1161 RelationGetRelationName(relation)));
1162 } /* loop all subscribed publications */
1163
1164 /*
1165 * If no column list publications exist, columns to be published will be
1166 * computed later according to the 'publish_generated_columns' parameter.
1167 */
1168 if (!found_pub_collist)
1169 entry->columns = NULL;
1170
1171 RelationClose(relation);
1172}
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:142
Bitmapset * pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
static void pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
Definition: pgoutput.c:864

References bms_equal(), check_and_fetch_column_list(), RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, ereport, errcode(), errmsg(), ERROR, get_namespace_name(), RelationSyncEntry::include_gencols_type, lfirst, list_length(), MemoryContextSwitchTo(), pgoutput_ensure_entry_cxt(), pub_form_cols_map(), RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetNamespace, RelationGetRelationName, and RelationIdGetRelation().

Referenced by get_rel_sync_entry().

◆ pgoutput_commit_prepared_txn()

static void pgoutput_commit_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 663 of file pgoutput.c.

665{
666 OutputPluginUpdateProgress(ctx, false);
667
668 OutputPluginPrepareWrite(ctx, true);
669 logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
670 OutputPluginWrite(ctx, true);
671}
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact)
Definition: logical.c:716
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:237

References logicalrep_write_commit_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 600 of file pgoutput.c.

602{
604 bool sent_begin_txn;
605
606 Assert(txndata);
607
608 /*
609 * We don't need to send the commit message unless some relevant change
610 * from this transaction has been sent to the downstream.
611 */
612 sent_begin_txn = txndata->sent_begin_txn;
613 OutputPluginUpdateProgress(ctx, !sent_begin_txn);
614 pfree(txndata);
615 txn->output_plugin_private = NULL;
616
617 if (!sent_begin_txn)
618 {
619 elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
620 return;
621 }
622
623 OutputPluginPrepareWrite(ctx, true);
624 logicalrep_write_commit(ctx->out, txn, commit_lsn);
625 OutputPluginWrite(ctx, true);
626}
void pfree(void *pointer)
Definition: mcxt.c:1524
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:78
bool sent_begin_txn
Definition: pgoutput.c:213

References Assert(), DEBUG1, elog, logicalrep_write_commit(), LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), pfree(), PGOutputTxnData::sent_begin_txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_ensure_entry_cxt()

static void pgoutput_ensure_entry_cxt ( PGOutputData data,
RelationSyncEntry entry 
)
static

Definition at line 864 of file pgoutput.c.

865{
866 Relation relation;
867
868 /* The context may already exist, in which case bail out. */
869 if (entry->entry_cxt)
870 return;
871
872 relation = RelationIdGetRelation(entry->publish_as_relid);
873
874 entry->entry_cxt = AllocSetContextCreate(data->cachectx,
875 "entry private context",
877
879 RelationGetRelationName(relation));
880}
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:170
#define MemoryContextCopyAndSetIdentifier(cxt, id)
Definition: memutils.h:101

References ALLOCSET_SMALL_SIZES, AllocSetContextCreate, data, RelationSyncEntry::entry_cxt, MemoryContextCopyAndSetIdentifier, RelationSyncEntry::publish_as_relid, RelationGetRelationName, and RelationIdGetRelation().

Referenced by pgoutput_column_list_init(), and pgoutput_row_filter_init().

◆ pgoutput_message()

static void pgoutput_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 1692 of file pgoutput.c.

1695{
1698
1699 if (!data->messages)
1700 return;
1701
1702 /*
1703 * Remember the xid for the message in streaming mode. See
1704 * pgoutput_change.
1705 */
1706 if (data->in_streaming)
1707 xid = txn->xid;
1708
1709 /*
1710 * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1711 */
1712 if (transactional)
1713 {
1715
1716 /* Send BEGIN if we haven't yet */
1717 if (txndata && !txndata->sent_begin_txn)
1718 pgoutput_send_begin(ctx, txn);
1719 }
1720
1721 OutputPluginPrepareWrite(ctx, true);
1723 xid,
1724 message_lsn,
1725 transactional,
1726 prefix,
1727 sz,
1728 message);
1729 OutputPluginWrite(ctx, true);
1730}
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:640

References data, if(), InvalidTransactionId, logicalrep_write_message(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_send_begin(), PGOutputTxnData::sent_begin_txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext ctx,
RepOriginId  origin_id 
)
static

Definition at line 1737 of file pgoutput.c.

1739{
1741
1742 if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1743 return true;
1744
1745 return false;
1746}

References data, if(), InvalidRepOriginId, and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pgoutput_prepare_txn()

static void pgoutput_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 649 of file pgoutput.c.

651{
652 OutputPluginUpdateProgress(ctx, false);
653
654 OutputPluginPrepareWrite(ctx, true);
655 logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
656 OutputPluginWrite(ctx, true);
657}
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:187

References logicalrep_write_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_rollback_prepared_txn()

static void pgoutput_rollback_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)
static

Definition at line 677 of file pgoutput.c.

681{
682 OutputPluginUpdateProgress(ctx, false);
683
684 OutputPluginPrepareWrite(ctx, true);
685 logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
686 prepare_time);
687 OutputPluginWrite(ctx, true);
688}
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:293

References logicalrep_write_rollback_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_row_filter()

static bool pgoutput_row_filter ( Relation  relation,
TupleTableSlot old_slot,
TupleTableSlot **  new_slot_ptr,
RelationSyncEntry entry,
ReorderBufferChangeType action 
)
static

Definition at line 1271 of file pgoutput.c.

1274{
1275 TupleDesc desc;
1276 int i;
1277 bool old_matched,
1278 new_matched,
1279 result;
1280 TupleTableSlot *tmp_new_slot;
1281 TupleTableSlot *new_slot = *new_slot_ptr;
1282 ExprContext *ecxt;
1283 ExprState *filter_exprstate;
1284
1285 /*
1286 * We need this map to avoid relying on ReorderBufferChangeType enums
1287 * having specific values.
1288 */
1289 static const int map_changetype_pubaction[] = {
1293 };
1294
1298
1299 Assert(new_slot || old_slot);
1300
1301 /* Get the corresponding row filter */
1302 filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1303
1304 /* Bail out if there is no row filter */
1305 if (!filter_exprstate)
1306 return true;
1307
1308 elog(DEBUG3, "table \"%s.%s\" has row filter",
1310 RelationGetRelationName(relation));
1311
1313
1314 ecxt = GetPerTupleExprContext(entry->estate);
1315
1316 /*
1317 * For the following occasions where there is only one tuple, we can
1318 * evaluate the row filter for that tuple and return.
1319 *
1320 * For inserts, we only have the new tuple.
1321 *
1322 * For updates, we can have only a new tuple when none of the replica
1323 * identity columns changed and none of those columns have external data
1324 * but we still need to evaluate the row filter for the new tuple as the
1325 * existing values of those columns might not match the filter. Also,
1326 * users can use constant expressions in the row filter, so we anyway need
1327 * to evaluate it for the new tuple.
1328 *
1329 * For deletes, we only have the old tuple.
1330 */
1331 if (!new_slot || !old_slot)
1332 {
1333 ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1334 result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1335
1336 return result;
1337 }
1338
1339 /*
1340 * Both the old and new tuples must be valid only for updates and need to
1341 * be checked against the row filter.
1342 */
1343 Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1344
1345 slot_getallattrs(new_slot);
1346 slot_getallattrs(old_slot);
1347
1348 tmp_new_slot = NULL;
1349 desc = RelationGetDescr(relation);
1350
1351 /*
1352 * The new tuple might not have all the replica identity columns, in which
1353 * case it needs to be copied over from the old tuple.
1354 */
1355 for (i = 0; i < desc->natts; i++)
1356 {
1358
1359 /*
1360 * if the column in the new tuple or old tuple is null, nothing to do
1361 */
1362 if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1363 continue;
1364
1365 /*
1366 * Unchanged toasted replica identity columns are only logged in the
1367 * old tuple. Copy this over to the new tuple. The changed (or WAL
1368 * Logged) toast values are always assembled in memory and set as
1369 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1370 */
1371 if (att->attlen == -1 &&
1374 {
1375 if (!tmp_new_slot)
1376 {
1377 tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1378 ExecClearTuple(tmp_new_slot);
1379
1380 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1381 desc->natts * sizeof(Datum));
1382 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1383 desc->natts * sizeof(bool));
1384 }
1385
1386 tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1387 tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1388 }
1389 }
1390
1391 ecxt->ecxt_scantuple = old_slot;
1392 old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1393
1394 if (tmp_new_slot)
1395 {
1396 ExecStoreVirtualTuple(tmp_new_slot);
1397 ecxt->ecxt_scantuple = tmp_new_slot;
1398 }
1399 else
1400 ecxt->ecxt_scantuple = new_slot;
1401
1402 new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1403
1404 /*
1405 * Case 1: if both tuples don't match the row filter, bailout. Send
1406 * nothing.
1407 */
1408 if (!old_matched && !new_matched)
1409 return false;
1410
1411 /*
1412 * Case 2: if the old tuple doesn't satisfy the row filter but the new
1413 * tuple does, transform the UPDATE into INSERT.
1414 *
1415 * Use the newly transformed tuple that must contain the column values for
1416 * all the replica identity columns. This is required to ensure that the
1417 * while inserting the tuple in the downstream node, we have all the
1418 * required column values.
1419 */
1420 if (!old_matched && new_matched)
1421 {
1423
1424 if (tmp_new_slot)
1425 *new_slot_ptr = tmp_new_slot;
1426 }
1427
1428 /*
1429 * Case 3: if the old tuple satisfies the row filter but the new tuple
1430 * doesn't, transform the UPDATE into DELETE.
1431 *
1432 * This transformation does not require another tuple. The Old tuple will
1433 * be used for DELETE.
1434 */
1435 else if (old_matched && !new_matched)
1437
1438 /*
1439 * Case 4: if both tuples match the row filter, transformation isn't
1440 * required. (*action is default UPDATE).
1441 */
1442
1443 return true;
1444}
#define DEBUG3
Definition: elog.h:28
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1739
#define ResetPerTupleExprContext(estate)
Definition: executor.h:646
#define GetPerTupleExprContext(estate)
Definition: executor.h:637
static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
Definition: pgoutput.c:841
int16 attlen
Definition: tupdesc.h:71
TupleTableSlot * ecxt_scantuple
Definition: execnodes.h:268
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:169
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290

References generate_unaccent_rules::action, Assert(), CompactAttribute::attlen, DEBUG3, ExprContext::ecxt_scantuple, elog, RelationSyncEntry::estate, ExecClearTuple(), ExecStoreVirtualTuple(), RelationSyncEntry::exprstate, get_namespace_name(), GetPerTupleExprContext, i, MakeSingleTupleTableSlot(), TupleDescData::natts, pgoutput_row_filter_exec_expr(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ResetPerTupleExprContext, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, TTSOpsVirtual, TupleDescCompactAttr(), and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pgoutput_change().

◆ pgoutput_row_filter_exec_expr()

static bool pgoutput_row_filter_exec_expr ( ExprState state,
ExprContext econtext 
)
static

Definition at line 841 of file pgoutput.c.

842{
843 Datum ret;
844 bool isnull;
845
846 Assert(state != NULL);
847
848 ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
849
850 elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
851 isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
852 isnull ? "true" : "false");
853
854 if (isnull)
855 return false;
856
857 return DatumGetBool(ret);
858}
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:417
static bool DatumGetBool(Datum X)
Definition: postgres.h:95
Definition: regguts.h:323

References Assert(), DatumGetBool(), DEBUG3, elog, and ExecEvalExprSwitchContext().

Referenced by pgoutput_row_filter().

◆ pgoutput_row_filter_init()

static void pgoutput_row_filter_init ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 886 of file pgoutput.c.

888{
889 ListCell *lc;
890 List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
891 bool no_filter[] = {false, false, false}; /* One per pubaction */
892 MemoryContext oldctx;
893 int idx;
894 bool has_filter = true;
895 Oid schemaid = get_rel_namespace(entry->publish_as_relid);
896
897 /*
898 * Find if there are any row filters for this relation. If there are, then
899 * prepare the necessary ExprState and cache it in entry->exprstate. To
900 * build an expression state, we need to ensure the following:
901 *
902 * All the given publication-table mappings must be checked.
903 *
904 * Multiple publications might have multiple row filters for this
905 * relation. Since row filter usage depends on the DML operation, there
906 * are multiple lists (one for each operation) to which row filters will
907 * be appended.
908 *
909 * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
910 * expression" so it takes precedence.
911 */
912 foreach(lc, publications)
913 {
914 Publication *pub = lfirst(lc);
915 HeapTuple rftuple = NULL;
916 Datum rfdatum = 0;
917 bool pub_no_filter = true;
918
919 /*
920 * If the publication is FOR ALL TABLES, or the publication includes a
921 * FOR TABLES IN SCHEMA where the table belongs to the referred
922 * schema, then it is treated the same as if there are no row filters
923 * (even if other publications have a row filter).
924 */
925 if (!pub->alltables &&
926 !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
927 ObjectIdGetDatum(schemaid),
928 ObjectIdGetDatum(pub->oid)))
929 {
930 /*
931 * Check for the presence of a row filter in this publication.
932 */
933 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
935 ObjectIdGetDatum(pub->oid));
936
937 if (HeapTupleIsValid(rftuple))
938 {
939 /* Null indicates no filter. */
940 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
941 Anum_pg_publication_rel_prqual,
942 &pub_no_filter);
943 }
944 }
945
946 if (pub_no_filter)
947 {
948 if (rftuple)
949 ReleaseSysCache(rftuple);
950
951 no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
952 no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
953 no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
954
955 /*
956 * Quick exit if all the DML actions are publicized via this
957 * publication.
958 */
959 if (no_filter[PUBACTION_INSERT] &&
960 no_filter[PUBACTION_UPDATE] &&
961 no_filter[PUBACTION_DELETE])
962 {
963 has_filter = false;
964 break;
965 }
966
967 /* No additional work for this publication. Next one. */
968 continue;
969 }
970
971 /* Form the per pubaction row filter lists. */
972 if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
973 rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
974 TextDatumGetCString(rfdatum));
975 if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
976 rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
977 TextDatumGetCString(rfdatum));
978 if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
979 rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
980 TextDatumGetCString(rfdatum));
981
982 ReleaseSysCache(rftuple);
983 } /* loop all subscribed publications */
984
985 /* Clean the row filter */
986 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
987 {
988 if (no_filter[idx])
989 {
990 list_free_deep(rfnodes[idx]);
991 rfnodes[idx] = NIL;
992 }
993 }
994
995 if (has_filter)
996 {
998
1000
1001 /*
1002 * Now all the filters for all pubactions are known. Combine them when
1003 * their pubactions are the same.
1004 */
1005 oldctx = MemoryContextSwitchTo(entry->entry_cxt);
1006 entry->estate = create_estate_for_relation(relation);
1007 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1008 {
1009 List *filters = NIL;
1010 Expr *rfnode;
1011
1012 if (rfnodes[idx] == NIL)
1013 continue;
1014
1015 foreach(lc, rfnodes[idx])
1016 filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
1017
1018 /* combine the row filter and cache the ExprState */
1019 rfnode = make_orclause(filters);
1020 entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1021 } /* for each pubaction */
1022 MemoryContextSwitchTo(oldctx);
1023
1024 RelationClose(relation);
1025 }
1026}
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:259
#define TextDatumGetCString(d)
Definition: builtins.h:98
ExprState * ExecPrepareExpr(Expr *node, EState *estate)
Definition: execExpr.c:765
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void list_free_deep(List *list)
Definition: list.c:1560
Expr * make_orclause(List *orclauses)
Definition: makefuncs.c:743
#define NUM_ROWFILTER_PUBACTIONS
Definition: pgoutput.c:103
static EState * create_estate_for_relation(Relation rel)
Definition: pgoutput.c:810
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
void * stringToNode(const char *str)
Definition: read.c:90
Node * expand_generated_columns_in_expr(Node *node, Relation rel, int rt_index)
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:600
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:232
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition: syscache.h:102

References Publication::alltables, create_estate_for_relation(), data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecPrepareExpr(), expand_generated_columns_in_expr(), RelationSyncEntry::exprstate, get_rel_namespace(), HeapTupleIsValid, idx(), lappend(), lfirst, list_free_deep(), make_orclause(), MemoryContextSwitchTo(), NIL, NUM_ROWFILTER_PUBACTIONS, ObjectIdGetDatum(), Publication::oid, pgoutput_ensure_entry_cxt(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationClose(), RelationIdGetRelation(), ReleaseSysCache(), SearchSysCache2(), SearchSysCacheExists2, stringToNode(), SysCacheGetAttr(), and TextDatumGetCString.

Referenced by get_rel_sync_entry().

◆ pgoutput_send_begin()

static void pgoutput_send_begin ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 578 of file pgoutput.c.

579{
580 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
582
583 Assert(txndata);
584 Assert(!txndata->sent_begin_txn);
585
586 OutputPluginPrepareWrite(ctx, !send_replication_origin);
587 logicalrep_write_begin(ctx->out, txn);
588 txndata->sent_begin_txn = true;
589
590 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
591 send_replication_origin);
592
593 OutputPluginWrite(ctx, true);
594}
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:49

References Assert(), InvalidRepOriginId, logicalrep_write_begin(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), send_repl_origin(), and PGOutputTxnData::sent_begin_txn.

Referenced by pgoutput_change(), pgoutput_message(), and pgoutput_truncate().

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 1756 of file pgoutput.c.

1757{
1759 {
1761 RelationSyncCache = NULL;
1762 }
1763}
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865

References hash_destroy(), and RelationSyncCache.

Referenced by _PG_output_plugin_init().

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 428 of file pgoutput.c.

430{
432 static bool publication_callback_registered = false;
433
434 /* Create our memory context for private allocations. */
435 data->context = AllocSetContextCreate(ctx->context,
436 "logical replication output context",
438
439 data->cachectx = AllocSetContextCreate(ctx->context,
440 "logical replication cache context",
442
443 data->pubctx = AllocSetContextCreate(ctx->context,
444 "logical replication publication list context",
446
448
449 /* This plugin uses binary protocol. */
451
452 /*
453 * This is replication start and not slot initialization.
454 *
455 * Parse and validate options passed by the client.
456 */
457 if (!is_init)
458 {
459 /* Parse the params and ERROR if we see any we don't recognize */
461
462 /* Check if we support requested protocol */
463 if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
465 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
466 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
467 data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
468
469 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
471 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
472 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
473 data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
474
475 /*
476 * Decide whether to enable streaming. It is disabled by default, in
477 * which case we just update the flag in decoding context. Otherwise
478 * we only allow it with sufficient version of the protocol, and when
479 * the output plugin supports it.
480 */
481 if (data->streaming == LOGICALREP_STREAM_OFF)
482 ctx->streaming = false;
483 else if (data->streaming == LOGICALREP_STREAM_ON &&
484 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
486 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
487 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
488 data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
489 else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
492 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
493 errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
495 else if (!ctx->streaming)
497 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
498 errmsg("streaming requested, but not supported by output plugin")));
499
500 /*
501 * Here, we just check whether the two-phase option is passed by
502 * plugin and decide whether to enable it at later point of time. It
503 * remains enabled if the previous start-up has done so. But we only
504 * allow the option to be passed in with sufficient version of the
505 * protocol, and when the output plugin supports it.
506 */
507 if (!data->two_phase)
508 ctx->twophase_opt_given = false;
509 else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
511 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
512 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
513 data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
514 else if (!ctx->twophase)
516 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
517 errmsg("two-phase commit requested, but not supported by output plugin")));
518 else
519 ctx->twophase_opt_given = true;
520
521 /* Init publication state. */
522 data->publications = NIL;
523 publications_valid = false;
524
525 /*
526 * Register callback for pg_publication if we didn't already do that
527 * during some previous call in this process.
528 */
529 if (!publication_callback_registered)
530 {
531 CacheRegisterSyscacheCallback(PUBLICATIONOID,
533 (Datum) 0);
535 (Datum) 0);
536 publication_callback_registered = true;
537 }
538
539 /* Initialize relation schema cache. */
541 }
542 else
543 {
544 /*
545 * Disable the streaming and prepared transactions during the slot
546 * initialization mode.
547 */
548 ctx->streaming = false;
549 ctx->twophase = false;
550 }
551}
void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func, Datum arg)
Definition: inval.c:1865
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:40
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_MAX_VERSION_NUM
Definition: logicalproto.h:45
void * palloc0(Size size)
Definition: mcxt.c:1347
MemoryContext CacheMemoryContext
Definition: mcxt.c:152
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
static void parse_output_parameters(List *options, PGOutputData *data)
Definition: pgoutput.c:286
static void init_rel_sync_cache(MemoryContext cachectx)
Definition: pgoutput.c:1934
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1791
List * output_plugin_options
Definition: logical.h:59
OutputPluginOutputType output_type
Definition: output_plugin.h:28

References ALLOCSET_DEFAULT_SIZES, ALLOCSET_SMALL_SIZES, AllocSetContextCreate, CacheMemoryContext, CacheRegisterRelSyncCallback(), CacheRegisterSyscacheCallback(), LogicalDecodingContext::context, data, ereport, errcode(), errmsg(), ERROR, init_rel_sync_cache(), LOGICALREP_PROTO_MAX_VERSION_NUM, LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0(), parse_output_parameters(), publication_invalidation_cb(), publications_valid, rel_sync_cache_relation_cb(), LogicalDecodingContext::streaming, LogicalDecodingContext::twophase, and LogicalDecodingContext::twophase_opt_given.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_abort()

static void pgoutput_stream_abort ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 1853 of file pgoutput.c.

1856{
1857 ReorderBufferTXN *toptxn;
1859 bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1860
1861 /*
1862 * The abort should happen outside streaming block, even for streamed
1863 * transactions. The transaction has to be marked as streamed, though.
1864 */
1865 Assert(!data->in_streaming);
1866
1867 /* determine the toplevel transaction */
1868 toptxn = rbtxn_get_toptxn(txn);
1869
1870 Assert(rbtxn_is_streamed(toptxn));
1871
1872 OutputPluginPrepareWrite(ctx, true);
1873 logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1874 txn->xact_time.abort_time, write_abort_info);
1875
1876 OutputPluginWrite(ctx, true);
1877
1878 cleanup_rel_sync_cache(toptxn->xid, false);
1879}
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:2309
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
Definition: proto.c:1158
#define rbtxn_is_streamed(txn)
TimestampTz abort_time
union ReorderBufferTXN::@116 xact_time

References ReorderBufferTXN::abort_time, Assert(), cleanup_rel_sync_cache(), data, logicalrep_write_stream_abort(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_get_toptxn, rbtxn_is_streamed, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_commit()

static void pgoutput_stream_commit ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 1886 of file pgoutput.c.

1889{
1891
1892 /*
1893 * The commit should happen outside streaming block, even for streamed
1894 * transactions. The transaction has to be marked as streamed, though.
1895 */
1896 Assert(!data->in_streaming);
1898
1899 OutputPluginUpdateProgress(ctx, false);
1900
1901 OutputPluginPrepareWrite(ctx, true);
1902 logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1903 OutputPluginWrite(ctx, true);
1904
1905 cleanup_rel_sync_cache(txn->xid, true);
1906}
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:224
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1104

References Assert(), cleanup_rel_sync_cache(), data, logicalrep_write_stream_commit(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), PG_USED_FOR_ASSERTS_ONLY, rbtxn_is_streamed, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_prepare_txn()

static void pgoutput_stream_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 1914 of file pgoutput.c.

1917{
1919
1920 OutputPluginUpdateProgress(ctx, false);
1921 OutputPluginPrepareWrite(ctx, true);
1922 logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1923 OutputPluginWrite(ctx, true);
1924}
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:353

References Assert(), logicalrep_write_stream_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), and rbtxn_is_streamed.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_start()

static void pgoutput_stream_start ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 1800 of file pgoutput.c.

1802{
1804 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1805
1806 /* we can't nest streaming of transactions */
1807 Assert(!data->in_streaming);
1808
1809 /*
1810 * If we already sent the first stream for this transaction then don't
1811 * send the origin id in the subsequent streams.
1812 */
1813 if (rbtxn_is_streamed(txn))
1814 send_replication_origin = false;
1815
1816 OutputPluginPrepareWrite(ctx, !send_replication_origin);
1818
1820 send_replication_origin);
1821
1822 OutputPluginWrite(ctx, true);
1823
1824 /* we're streaming a chunk of transaction now */
1825 data->in_streaming = true;
1826}
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1061
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), data, InvalidRepOriginId, InvalidXLogRecPtr, logicalrep_write_stream_start(), ReorderBufferTXN::origin_id, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, send_repl_origin(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_stop()

static void pgoutput_stream_stop ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 1832 of file pgoutput.c.

1834{
1836
1837 /* we should be streaming a transaction */
1838 Assert(data->in_streaming);
1839
1840 OutputPluginPrepareWrite(ctx, true);
1842 OutputPluginWrite(ctx, true);
1843
1844 /* we've stopped streaming a transaction */
1845 data->in_streaming = false;
1846}
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1095

References Assert(), data, logicalrep_write_stream_stop(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_truncate()

static void pgoutput_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 1624 of file pgoutput.c.

1626{
1629 MemoryContext old;
1630 RelationSyncEntry *relentry;
1631 int i;
1632 int nrelids;
1633 Oid *relids;
1635
1636 /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1637 if (data->in_streaming)
1638 xid = change->txn->xid;
1639
1640 old = MemoryContextSwitchTo(data->context);
1641
1642 relids = palloc0(nrelations * sizeof(Oid));
1643 nrelids = 0;
1644
1645 for (i = 0; i < nrelations; i++)
1646 {
1647 Relation relation = relations[i];
1648 Oid relid = RelationGetRelid(relation);
1649
1650 if (!is_publishable_relation(relation))
1651 continue;
1652
1653 relentry = get_rel_sync_entry(data, relation);
1654
1655 if (!relentry->pubactions.pubtruncate)
1656 continue;
1657
1658 /*
1659 * Don't send partitions if the publication wants to send only the
1660 * root tables through it.
1661 */
1662 if (relation->rd_rel->relispartition &&
1663 relentry->publish_as_relid != relid)
1664 continue;
1665
1666 relids[nrelids++] = relid;
1667
1668 /* Send BEGIN if we haven't yet */
1669 if (txndata && !txndata->sent_begin_txn)
1670 pgoutput_send_begin(ctx, txn);
1671
1672 maybe_send_schema(ctx, change, relation, relentry);
1673 }
1674
1675 if (nrelids > 0)
1676 {
1677 OutputPluginPrepareWrite(ctx, true);
1679 xid,
1680 nrelids,
1681 relids,
1682 change->data.truncate.cascade,
1683 change->data.truncate.restart_seqs);
1684 OutputPluginWrite(ctx, true);
1685 }
1686
1688 MemoryContextReset(data->context);
1689}
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:583
struct ReorderBufferChange::@110::@112 truncate

References ReorderBufferChange::cascade, ReorderBufferChange::data, data, get_rel_sync_entry(), i, InvalidTransactionId, is_publishable_relation(), logicalrep_write_truncate(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), palloc0(), pgoutput_send_begin(), RelationSyncEntry::pubactions, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, RelationData::rd_rel, RelationGetRelid, ReorderBufferChange::restart_seqs, ReorderBufferChange::truncate, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1791 of file pgoutput.c.

1792{
1793 publications_valid = false;
1794}

References publications_valid.

Referenced by pgoutput_startup().

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 2394 of file pgoutput.c.

2395{
2396 HASH_SEQ_STATUS status;
2397 RelationSyncEntry *entry;
2398
2399 /*
2400 * We can get here if the plugin was used in SQL interface as the
2401 * RelationSyncCache is destroyed when the decoding finishes, but there is
2402 * no way to unregister the invalidation callbacks.
2403 */
2404 if (RelationSyncCache == NULL)
2405 return;
2406
2407 /*
2408 * We have no easy way to identify which cache entries this invalidation
2409 * event might have affected, so just mark them all invalid.
2410 */
2412 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2413 {
2414 entry->replicate_valid = false;
2415 }
2416}

References hash_seq_init(), hash_seq_search(), RelationSyncCache, and RelationSyncEntry::replicate_valid.

Referenced by init_rel_sync_cache().

◆ rel_sync_cache_relation_cb()

static void rel_sync_cache_relation_cb ( Datum  arg,
Oid  relid 
)
static

Definition at line 2344 of file pgoutput.c.

2345{
2346 RelationSyncEntry *entry;
2347
2348 /*
2349 * We can get here if the plugin was used in SQL interface as the
2350 * RelationSyncCache is destroyed when the decoding finishes, but there is
2351 * no way to unregister the relcache invalidation callback.
2352 */
2353 if (RelationSyncCache == NULL)
2354 return;
2355
2356 /*
2357 * Nobody keeps pointers to entries in this hash table around outside
2358 * logical decoding callback calls - but invalidation events can come in
2359 * *during* a callback if we do any syscache access in the callback.
2360 * Because of that we must mark the cache entry as invalid but not damage
2361 * any of its substructure here. The next get_rel_sync_entry() call will
2362 * rebuild it all.
2363 */
2364 if (OidIsValid(relid))
2365 {
2366 /*
2367 * Getting invalidations for relations that aren't in the table is
2368 * entirely normal. So we don't care if it's found or not.
2369 */
2371 HASH_FIND, NULL);
2372 if (entry != NULL)
2373 entry->replicate_valid = false;
2374 }
2375 else
2376 {
2377 /* Whole cache must be flushed. */
2378 HASH_SEQ_STATUS status;
2379
2381 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2382 {
2383 entry->replicate_valid = false;
2384 }
2385 }
2386}
#define OidIsValid(objectId)
Definition: c.h:746
@ HASH_FIND
Definition: hsearch.h:113

References HASH_FIND, hash_search(), hash_seq_init(), hash_seq_search(), OidIsValid, RelationSyncCache, and RelationSyncEntry::replicate_valid.

Referenced by init_rel_sync_cache(), and pgoutput_startup().

◆ send_relation_and_attrs()

static void send_relation_and_attrs ( Relation  relation,
TransactionId  xid,
LogicalDecodingContext ctx,
RelationSyncEntry relentry 
)
static

Definition at line 766 of file pgoutput.c.

769{
770 TupleDesc desc = RelationGetDescr(relation);
771 Bitmapset *columns = relentry->columns;
772 PublishGencolsType include_gencols_type = relentry->include_gencols_type;
773 int i;
774
775 /*
776 * Write out type info if needed. We do that only for user-created types.
777 * We use FirstGenbkiObjectId as the cutoff, so that we only consider
778 * objects with hand-assigned OIDs to be "built in", not for instance any
779 * function or type defined in the information_schema. This is important
780 * because only hand-assigned OIDs can be expected to remain stable across
781 * major versions.
782 */
783 for (i = 0; i < desc->natts; i++)
784 {
785 Form_pg_attribute att = TupleDescAttr(desc, i);
786
787 if (!logicalrep_should_publish_column(att, columns,
788 include_gencols_type))
789 continue;
790
791 if (att->atttypid < FirstGenbkiObjectId)
792 continue;
793
794 OutputPluginPrepareWrite(ctx, false);
795 logicalrep_write_typ(ctx->out, xid, att->atttypid);
796 OutputPluginWrite(ctx, false);
797 }
798
799 OutputPluginPrepareWrite(ctx, false);
800 logicalrep_write_rel(ctx->out, xid, relation, columns,
801 include_gencols_type);
802 OutputPluginWrite(ctx, false);
803}
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:667
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:723
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:1279
#define FirstGenbkiObjectId
Definition: transam.h:195

References RelationSyncEntry::columns, FirstGenbkiObjectId, i, RelationSyncEntry::include_gencols_type, logicalrep_should_publish_column(), logicalrep_write_rel(), logicalrep_write_typ(), TupleDescData::natts, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, and TupleDescAttr().

Referenced by maybe_send_schema().

◆ send_repl_origin()

static void send_repl_origin ( LogicalDecodingContext ctx,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
bool  send_origin 
)
static

Definition at line 2420 of file pgoutput.c.

2422{
2423 if (send_origin)
2424 {
2425 char *origin;
2426
2427 /*----------
2428 * XXX: which behaviour do we want here?
2429 *
2430 * Alternatives:
2431 * - don't send origin message if origin name not found
2432 * (that's what we do now)
2433 * - throw error - that will break replication, not good
2434 * - send some special "unknown" origin
2435 *----------
2436 */
2437 if (replorigin_by_oid(origin_id, true, &origin))
2438 {
2439 /* Message boundary */
2440 OutputPluginWrite(ctx, false);
2441 OutputPluginPrepareWrite(ctx, true);
2442
2443 logicalrep_write_origin(ctx->out, origin, origin_lsn);
2444 }
2445 }
2446}
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:469
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:374

References logicalrep_write_origin(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by pgoutput_begin_prepare_txn(), pgoutput_send_begin(), and pgoutput_stream_start().

◆ set_schema_sent_in_streamed_txn()

static void set_schema_sent_in_streamed_txn ( RelationSyncEntry entry,
TransactionId  xid 
)
static

Definition at line 1993 of file pgoutput.c.

1994{
1995 MemoryContext oldctx;
1996
1998
1999 entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2000
2001 MemoryContextSwitchTo(oldctx);
2002}
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:393

References CacheMemoryContext, lappend_xid(), MemoryContextSwitchTo(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 39 of file pgoutput.c.

◆ publications_valid

bool publications_valid
static

Definition at line 83 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ RelationSyncCache