PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pgoutput.c File Reference
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 
struct  PGOutputTxnData
 

Macros

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 
typedef struct PGOutputTxnData PGOutputTxnData
 

Enumerations

enum  RowFilterPubAction { PUBACTION_INSERT , PUBACTION_UPDATE , PUBACTION_DELETE }
 

Functions

 PG_MODULE_MAGIC_EXT (.name="pgoutput",.version=PG_VERSION)
 
static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pgoutput_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static void pgoutput_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, ReplOriginId origin_id)
 
static void pgoutput_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pgoutput_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pgoutput_stream_start (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_stop (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_abort (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pgoutput_stream_commit (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_stream_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static List * LoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
 
static void send_repl_origin (LogicalDecodingContext *ctx, ReplOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
 
static void init_rel_sync_cache (MemoryContext cachectx)
 
static void cleanup_rel_sync_cache (TransactionId xid, bool is_commit)
 
static RelationSyncEntry * get_rel_sync_entry (PGOutputData *data, Relation relation)
 
static void send_relation_and_attrs (Relation relation, TransactionId xid, LogicalDecodingContext *ctx, RelationSyncEntry *relentry)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
 
static void set_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static bool get_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static void init_tuple_slot (PGOutputData *data, Relation relation, RelationSyncEntry *entry)
 
static void pgoutput_memory_context_reset (void *arg)
 
static EState * create_estate_for_relation (Relation rel)
 
static void pgoutput_row_filter_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
static bool pgoutput_row_filter_exec_expr (ExprState *state, ExprContext *econtext)
 
static bool pgoutput_row_filter (Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
 
static void pgoutput_column_list_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void parse_output_parameters (List *options, PGOutputData *data)
 
static void pgoutput_send_begin (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void maybe_send_schema (LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
 
static void pgoutput_ensure_entry_cxt (PGOutputData *data, RelationSyncEntry *entry)
 
static void check_and_init_gencol (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 

Variables

static bool publications_valid
 
static HTAB * RelationSyncCache = NULL
 

Macro Definition Documentation

◆ NUM_ROWFILTER_PUBACTIONS

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)

Definition at line 106 of file pgoutput.c.

Typedef Documentation

◆ PGOutputTxnData

◆ RelationSyncEntry

Enumeration Type Documentation

◆ RowFilterPubAction

Enumerator
PUBACTION_INSERT 
PUBACTION_UPDATE 
PUBACTION_DELETE 

Definition at line 99 of file pgoutput.c.

100{
104};
@ PUBACTION_INSERT
Definition pgoutput.c:101
@ PUBACTION_UPDATE
Definition pgoutput.c:102
@ PUBACTION_DELETE
Definition pgoutput.c:103

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks * cb)

Definition at line 261 of file pgoutput.c.

262{
269
276
277 /* transaction streaming */
285 /* transaction streaming - two-phase commit */
287}
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition pgoutput.c:1484
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition pgoutput.c:664
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition pgoutput.c:451
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition pgoutput.c:1656
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition pgoutput.c:681
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition pgoutput.c:709
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition pgoutput.c:1788
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, ReplOriginId origin_id)
Definition pgoutput.c:1769
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition pgoutput.c:1894
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition pgoutput.c:1955
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition pgoutput.c:596
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition pgoutput.c:1927
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition pgoutput.c:695
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition pgoutput.c:1873
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition pgoutput.c:1841
static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition pgoutput.c:1724
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition pgoutput.c:632
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb

References OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::message_cb, pgoutput_begin_prepare_txn(), pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_prepared_txn(), pgoutput_commit_txn(), pgoutput_message(), pgoutput_origin_filter(), pgoutput_prepare_txn(), pgoutput_rollback_prepared_txn(), pgoutput_shutdown(), pgoutput_startup(), pgoutput_stream_abort(), pgoutput_stream_commit(), pgoutput_stream_prepare_txn(), pgoutput_stream_start(), pgoutput_stream_stop(), pgoutput_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

◆ check_and_init_gencol()

static void check_and_init_gencol ( PGOutputData * data,
List * publications,
RelationSyncEntry * entry
)
static

Definition at line 1065 of file pgoutput.c.

1067{
1069 TupleDesc desc = RelationGetDescr(relation);
1070 bool gencolpresent = false;
1071 bool first = true;
1072
1073 /* Check if there is any generated column present. */
1074 for (int i = 0; i < desc->natts; i++)
1075 {
1077
1078 if (att->attgenerated)
1079 {
1080 gencolpresent = true;
1081 break;
1082 }
1083 }
1084
1085 /* There are no generated columns to be published. */
1086 if (!gencolpresent)
1087 {
1089 return;
1090 }
1091
1092 /*
1093 * There may be a conflicting value for 'publish_generated_columns'
1094 * parameter in the publications.
1095 */
1096 foreach_ptr(Publication, pub, publications)
1097 {
1098 /*
1099 * The column list takes precedence over the
1100 * 'publish_generated_columns' parameter. Those will be checked later,
1101 * see pgoutput_column_list_init.
1102 */
1104 continue;
1105
1106 if (first)
1107 {
1108 entry->include_gencols_type = pub->pubgencols_type;
1109 first = false;
1110 }
1111 else if (entry->include_gencols_type != pub->pubgencols_type)
1112 ereport(ERROR,
1114 errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1116 RelationGetRelationName(relation)));
1117 }
1118}
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:150
int i
Definition isn.c:77
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3588
static char * errmsg
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols)
static int fb(int x)
#define RelationGetDescr(relation)
Definition rel.h:540
#define RelationGetRelationName(relation)
Definition rel.h:548
#define RelationGetNamespace(relation)
Definition rel.h:555
Relation RelationIdGetRelation(Oid relationId)
Definition relcache.c:2088
PublishGencolsType include_gencols_type
Definition pgoutput.c:141
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:193

References check_and_fetch_column_list(), ereport, errcode(), errmsg, ERROR, fb(), foreach_ptr, get_namespace_name(), i, RelationSyncEntry::include_gencols_type, TupleDescData::natts, RelationSyncEntry::publish_as_relid, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, RelationIdGetRelation(), and TupleDescCompactAttr().

Referenced by get_rel_sync_entry().

◆ cleanup_rel_sync_cache()

static void cleanup_rel_sync_cache ( TransactionId  xid,
bool  is_commit 
)
static

Definition at line 2383 of file pgoutput.c.

2384{
2386 RelationSyncEntry *entry;
2387
2389
2391 while ((entry = hash_seq_search(&hash_seq)) != NULL)
2392 {
2393 /*
2394 * We can set the schema_sent flag for an entry that has committed xid
2395 * in the list as that ensures that the subscriber would have the
2396 * corresponding schema and we don't need to send it unless there is
2397 * any invalidation for that relation.
2398 */
2400 {
2401 if (xid == streamed_txn)
2402 {
2403 if (is_commit)
2404 entry->schema_sent = true;
2405
2406 entry->streamed_txns =
2408 break;
2409 }
2410 }
2411 }
2412}
#define Assert(condition)
Definition c.h:927
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition dynahash.c:1415
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition dynahash.c:1380
#define foreach_delete_current(lst, var_or_cell)
Definition pg_list.h:391
#define foreach_xid(var, lst)
Definition pg_list.h:472
static HTAB * RelationSyncCache
Definition pgoutput.c:220
List * streamed_txns
Definition pgoutput.c:142

References Assert, fb(), foreach_delete_current, foreach_xid, hash_seq_init(), hash_seq_search(), RelationSyncCache, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_stream_abort(), and pgoutput_stream_commit().

◆ create_estate_for_relation()

static EState * create_estate_for_relation ( Relation  rel)
static

Definition at line 842 of file pgoutput.c.

843{
844 EState *estate;
846 List *perminfos = NIL;
847
848 estate = CreateExecutorState();
849
851 rte->rtekind = RTE_RELATION;
852 rte->relid = RelationGetRelid(rel);
853 rte->relkind = rel->rd_rel->relkind;
854 rte->rellockmode = AccessShareLock;
855
857
860
861 estate->es_output_cid = GetCurrentCommandId(false);
862
863 return estate;
864}
Bitmapset * bms_make_singleton(int x)
Definition bitmapset.c:216
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
Definition execUtils.c:778
EState * CreateExecutorState(void)
Definition execUtils.c:90
#define AccessShareLock
Definition lockdefs.h:36
#define makeNode(_type_)
Definition nodes.h:161
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:212
#define RelationGetRelid(relation)
Definition rel.h:514
CommandId es_output_cid
Definition execnodes.h:694
Definition pg_list.h:54
Form_pg_class rd_rel
Definition rel.h:111
CommandId GetCurrentCommandId(bool used)
Definition xact.c:831

References AccessShareLock, addRTEPermissionInfo(), bms_make_singleton(), CreateExecutorState(), EState::es_output_cid, ExecInitRangeTable(), fb(), GetCurrentCommandId(), list_make1, makeNode, NIL, RelationData::rd_rel, RelationGetRelid, and RTE_RELATION.

Referenced by pgoutput_row_filter_init().

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData * data,
Relation  relation 
)
static

Definition at line 2055 of file pgoutput.c.

2056{
2057 RelationSyncEntry *entry;
2058 bool found;
2060 Oid relid = RelationGetRelid(relation);
2061
2063
2064 /* Find cached relation info, creating if not found */
2066 &relid,
2067 HASH_ENTER, &found);
2068 Assert(entry != NULL);
2069
2070 /* initialize entry, if it's new */
2071 if (!found)
2072 {
2073 entry->replicate_valid = false;
2074 entry->schema_sent = false;
2076 entry->streamed_txns = NIL;
2077 entry->pubactions.pubinsert = entry->pubactions.pubupdate =
2078 entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
2079 entry->new_slot = NULL;
2080 entry->old_slot = NULL;
2081 memset(entry->exprstate, 0, sizeof(entry->exprstate));
2082 entry->entry_cxt = NULL;
2084 entry->columns = NULL;
2085 entry->attrmap = NULL;
2086 }
2087
2088 /* Validate the entry */
2089 if (!entry->replicate_valid)
2090 {
2093
2094 /*
2095 * We don't acquire a lock on the namespace system table as we build
2096 * the cache entry using a historic snapshot and all the later changes
2097 * are absorbed while decoding WAL.
2098 */
2100 ListCell *lc;
2101 Oid publish_as_relid = relid;
2102 int publish_ancestor_level = 0;
2104 char relkind = get_rel_relkind(relid);
2106
2107 /* Reload publications if needed before use. */
2108 if (!publications_valid)
2109 {
2110 MemoryContextReset(data->pubctx);
2111
2113 data->publications = LoadPublications(data->publication_names);
2115 publications_valid = true;
2116 }
2117
2118 /*
2119 * Reset schema_sent status as the relation definition may have
2120 * changed. Also reset pubactions to empty in case rel was dropped
2121 * from a publication. Also free any objects that depended on the
2122 * earlier definition.
2123 */
2124 entry->schema_sent = false;
2126 list_free(entry->streamed_txns);
2127 entry->streamed_txns = NIL;
2128 bms_free(entry->columns);
2129 entry->columns = NULL;
2130 entry->pubactions.pubinsert = false;
2131 entry->pubactions.pubupdate = false;
2132 entry->pubactions.pubdelete = false;
2133 entry->pubactions.pubtruncate = false;
2134
2135 /*
2136 * Tuple slots cleanups. (Will be rebuilt later if needed).
2137 */
2138 if (entry->old_slot)
2139 {
2140 TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2141
2142 Assert(desc->tdrefcount == -1);
2143
2145
2146 /*
2147 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2148 * do it now to avoid any leaks.
2149 */
2150 FreeTupleDesc(desc);
2151 }
2152 if (entry->new_slot)
2153 {
2154 TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2155
2156 Assert(desc->tdrefcount == -1);
2157
2159
2160 /*
2161 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2162 * do it now to avoid any leaks.
2163 */
2164 FreeTupleDesc(desc);
2165 }
2166
2167 entry->old_slot = NULL;
2168 entry->new_slot = NULL;
2169
2170 if (entry->attrmap)
2171 free_attrmap(entry->attrmap);
2172 entry->attrmap = NULL;
2173
2174 /*
2175 * Row filter cache cleanups.
2176 */
2177 if (entry->entry_cxt)
2179
2180 entry->entry_cxt = NULL;
2181 entry->estate = NULL;
2182 memset(entry->exprstate, 0, sizeof(entry->exprstate));
2183
2184 /*
2185 * Build publication cache. We can't use one provided by relcache as
2186 * relcache considers all publications that the given relation is in,
2187 * but here we only need to consider ones that the subscriber
2188 * requested.
2189 */
2190 foreach(lc, data->publications)
2191 {
2192 Publication *pub = lfirst(lc);
2193 bool publish = false;
2194
2195 /*
2196 * Under what relid should we publish changes in this publication?
2197 * We'll use the top-most relid across all publications. Also
2198 * track the ancestor level for this publication.
2199 */
2200 Oid pub_relid = relid;
2201 int ancestor_level = 0;
2202
2203 /*
2204 * If this is a FOR ALL TABLES publication, pick the partition
2205 * root and set the ancestor level accordingly.
2206 */
2207 if (pub->alltables)
2208 {
2210
2211 if (am_partition)
2212 {
2213 List *ancestors = get_partition_ancestors(relid);
2214 Oid last_ancestor_relid = llast_oid(ancestors);
2215
2216 /*
2217 * For a partition, changes are published via top-most
2218 * ancestor when pubviaroot is true, so populate pub_relid
2219 * accordingly.
2220 */
2221 if (pub->pubviaroot)
2222 {
2224 ancestor_level = list_length(ancestors);
2225 }
2226
2227 /*
2228 * Only the top-most ancestor can appear in the EXCEPT
2229 * clause. Therefore, for a partition, exclusion must be
2230 * evaluated at the top-most ancestor.
2231 */
2233 }
2234 else
2235 {
2236 /*
2237 * For a regular table or a root partitioned table, check
2238 * exclusion on table itself.
2239 */
2241 }
2242
2243 if (!list_member_oid(exceptpubids, pub->oid))
2244 publish = true;
2245
2247
2248 if (!publish)
2249 continue;
2250 }
2251
2252 if (!publish)
2253 {
2254 bool ancestor_published = false;
2255
2256 /*
2257 * For a partition, check if any of the ancestors are
2258 * published. If so, note down the topmost ancestor that is
2259 * published via this publication, which will be used as the
2260 * relation via which to publish the partition's changes.
2261 */
2262 if (am_partition)
2263 {
2264 Oid ancestor;
2265 int level;
2266 List *ancestors = get_partition_ancestors(relid);
2267
2269 ancestors,
2270 &level);
2271
2272 if (ancestor != InvalidOid)
2273 {
2274 ancestor_published = true;
2275 if (pub->pubviaroot)
2276 {
2278 ancestor_level = level;
2279 }
2280 }
2281 }
2282
2283 if (list_member_oid(pubids, pub->oid) ||
2286 publish = true;
2287 }
2288
2289 /*
2290 * If the relation is to be published, determine actions to
2291 * publish, and list of columns, if appropriate.
2292 *
2293 * Don't publish changes for partitioned tables, because
2294 * publishing those of its partitions suffices, unless partition
2295 * changes won't be published due to pubviaroot being set.
2296 */
2297 if (publish &&
2298 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2299 {
2304
2305 /*
2306 * We want to publish the changes as the top-most ancestor
2307 * across all publications. So we need to check if the already
2308 * calculated level is higher than the new one. If yes, we can
2309 * ignore the new value (as it's a child). Otherwise the new
2310 * value is an ancestor, so we keep it.
2311 */
2313 continue;
2314
2315 /*
2316 * If we found an ancestor higher up in the tree, discard the
2317 * list of publications through which we replicate it, and use
2318 * the new ancestor.
2319 */
2321 {
2322 publish_as_relid = pub_relid;
2324
2325 /* reset the publication list for this relation */
2327 }
2328 else
2329 {
2330 /* Same ancestor level, has to be the same OID. */
2331 Assert(publish_as_relid == pub_relid);
2332 }
2333
2334 /* Track publications for this ancestor. */
2336 }
2337 }
2338
2339 entry->publish_as_relid = publish_as_relid;
2340
2341 /*
2342 * Initialize the tuple slot, map, and row filter. These are only used
2343 * when publishing inserts, updates, or deletes.
2344 */
2345 if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2346 entry->pubactions.pubdelete)
2347 {
2348 /* Initialize the tuple slot and map */
2349 init_tuple_slot(data, relation, entry);
2350
2351 /* Initialize the row filter */
2353
2354 /* Check whether to publish generated columns. */
2356
2357 /* Initialize the column list */
2359 }
2360
2364
2365 entry->replicate_valid = true;
2366 }
2367
2368 return entry;
2369}
void free_attrmap(AttrMap *map)
Definition attmap.c:56
void bms_free(Bitmapset *a)
Definition bitmapset.c:239
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:952
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
@ HASH_ENTER
Definition hsearch.h:114
List * lappend(List *list, void *datum)
Definition list.c:339
void list_free(List *list)
Definition list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition list.c:722
bool get_rel_relispartition(Oid relid)
Definition lsyscache.c:2247
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2223
Oid get_rel_namespace(Oid relid)
Definition lsyscache.c:2172
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
List * get_partition_ancestors(Oid relid)
Definition partition.c:134
const void * data
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define llast_oid(l)
Definition pg_list.h:200
List * GetRelationIncludedPublications(Oid relid)
List * GetSchemaPublications(Oid schemaid)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetRelationExcludedPublications(Oid relid)
static List * LoadPublications(List *pubnames)
Definition pgoutput.c:1802
static void init_tuple_slot(PGOutputData *data, Relation relation, RelationSyncEntry *entry)
Definition pgoutput.c:1211
static void pgoutput_row_filter_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition pgoutput.c:918
static void check_and_init_gencol(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition pgoutput.c:1065
static void pgoutput_column_list_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition pgoutput.c:1124
static bool publications_valid
Definition pgoutput.c:86
#define InvalidOid
unsigned int Oid
PublicationActions pubactions
ExprState * exprstate[NUM_ROWFILTER_PUBACTIONS]
Definition pgoutput.c:155
Bitmapset * columns
Definition pgoutput.c:181
PublicationActions pubactions
Definition pgoutput.c:146
TupleTableSlot * old_slot
Definition pgoutput.c:158
MemoryContext entry_cxt
Definition pgoutput.c:187
EState * estate
Definition pgoutput.c:156
TupleTableSlot * new_slot
Definition pgoutput.c:157
AttrMap * attrmap
Definition pgoutput.c:174
TupleDesc tts_tupleDescriptor
Definition tuptable.h:129
void FreeTupleDesc(TupleDesc tupdesc)
Definition tupdesc.c:547

References Publication::alltables, Assert, RelationSyncEntry::attrmap, bms_free(), check_and_init_gencol(), RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecDropSingleTupleTableSlot(), RelationSyncEntry::exprstate, fb(), free_attrmap(), FreeTupleDesc(), get_partition_ancestors(), get_rel_namespace(), get_rel_relispartition(), get_rel_relkind(), GetRelationExcludedPublications(), GetRelationIncludedPublications(), GetSchemaPublications(), GetTopMostAncestorInPublication(), HASH_ENTER, hash_search(), RelationSyncEntry::include_gencols_type, init_tuple_slot(), InvalidOid, lappend(), lfirst, list_free(), list_length(), list_member_oid(), llast_oid, LoadPublications(), MemoryContextDelete(), MemoryContextReset(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, NIL, Publication::oid, RelationSyncEntry::old_slot, pgoutput_column_list_init(), pgoutput_row_filter_init(), RelationSyncEntry::pubactions, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, publications_valid, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, PublicationActions::pubupdate, Publication::pubviaroot, RelationGetRelid, RelationSyncCache, RelationSyncEntry::replicate_valid, RelationSyncEntry::schema_sent, RelationSyncEntry::streamed_txns, TupleDescData::tdrefcount, and TupleTableSlot::tts_tupleDescriptor.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ get_schema_sent_in_streamed_txn()

static bool get_schema_sent_in_streamed_txn ( RelationSyncEntry * entry,
TransactionId  xid 
)
static

Definition at line 2024 of file pgoutput.c.

2025{
2026 return list_member_xid(entry->streamed_txns, xid);
2027}
bool list_member_xid(const List *list, TransactionId datum)
Definition list.c:742

References list_member_xid(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  cachectx)
static

Definition at line 1975 of file pgoutput.c.

1976{
1977 HASHCTL ctl;
1978 static bool relation_callbacks_registered = false;
1979
1980 /* Nothing to do if hash table already exists */
1981 if (RelationSyncCache != NULL)
1982 return;
1983
1984 /* Make a new hash table for the cache */
1985 ctl.keysize = sizeof(Oid);
1986 ctl.entrysize = sizeof(RelationSyncEntry);
1987 ctl.hcxt = cachectx;
1988
1989 RelationSyncCache = hash_create("logical replication output relation cache",
1990 128, &ctl,
1992
1994
1995 /* No more to do if we already registered callbacks */
1997 return;
1998
1999 /* We must update the cache entry for a relation after a relcache flush */
2001
2002 /*
2003 * Flush all cache entries after a pg_namespace change, in case it was a
2004 * schema rename affecting a relation being replicated.
2005 *
2006 * XXX: It is not a good idea to invalidate all the relation entries in
2007 * RelationSyncCache on schema rename. We can optimize it to invalidate
2008 * only the required relations by either having a specific invalidation
2009 * message containing impacted relations or by having schema information
2010 * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
2011 * passed to the callback.
2012 */
2015 (Datum) 0);
2016
2018}
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:358
#define HASH_CONTEXT
Definition hsearch.h:102
#define HASH_ELEM
Definition hsearch.h:95
#define HASH_BLOBS
Definition hsearch.h:97
void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)
Definition inval.c:1816
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition inval.c:1858
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition pgoutput.c:2418
static void rel_sync_cache_publication_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
Definition pgoutput.c:2468
uint64_t Datum
Definition postgres.h:70
tree ctl
Definition radixtree.h:1838
Size keysize
Definition hsearch.h:75

References Assert, CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), ctl, fb(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::keysize, rel_sync_cache_publication_cb(), rel_sync_cache_relation_cb(), and RelationSyncCache.

Referenced by pgoutput_startup().

◆ init_tuple_slot()

static void init_tuple_slot ( PGOutputData * data,
Relation  relation,
RelationSyncEntry * entry
)
static

Definition at line 1211 of file pgoutput.c.

1213{
1217
1218 oldctx = MemoryContextSwitchTo(data->cachectx);
1219
1220 /*
1221 * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1222 * live as long as the cache remains.
1223 */
1226
1229
1231
1232 /*
1233 * Cache the map that will be used to convert the relation's tuples into
1234 * the ancestor's format, if needed.
1235 */
1236 if (entry->publish_as_relid != RelationGetRelid(relation))
1237 {
1239 TupleDesc indesc = RelationGetDescr(relation);
1241
1242 /* Map must live as long as the logical decoding context. */
1243 oldctx = MemoryContextSwitchTo(data->cachectx);
1244
1245 entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1246
1249 }
1250}
AttrMap * build_attrmap_by_name_if_req(TupleDesc indesc, TupleDesc outdesc, bool missing_ok)
Definition attmap.c:261
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsHeapTuple
Definition execTuples.c:85
void RelationClose(Relation relation)
Definition relcache.c:2209
TupleDesc CreateTupleDescCopyConstr(TupleDesc tupdesc)
Definition tupdesc.c:334

References RelationSyncEntry::attrmap, build_attrmap_by_name_if_req(), CreateTupleDescCopyConstr(), data, fb(), MakeSingleTupleTableSlot(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, RelationSyncEntry::old_slot, RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), and TTSOpsHeapTuple.

Referenced by get_rel_sync_entry().

◆ LoadPublications()

static List * LoadPublications ( List * pubnames)
static

Definition at line 1802 of file pgoutput.c.

1803{
1804 List *result = NIL;
1805 ListCell *lc;
1806
1807 foreach(lc, pubnames)
1808 {
1809 char *pubname = (char *) lfirst(lc);
1810 Publication *pub = GetPublicationByName(pubname, true);
1811
1812 if (pub)
1813 result = lappend(result, pub);
1814 else
1817 errmsg("skipped loading publication \"%s\"", pubname),
1818 errdetail("The publication does not exist at this point in the WAL."),
1819 errhint("Create the publication if it does not exist."));
1820 }
1821
1822 return result;
1823}
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:36
Publication * GetPublicationByName(const char *pubname, bool missing_ok)

References ereport, errcode(), errdetail(), errhint(), errmsg, fb(), GetPublicationByName(), lappend(), lfirst, NIL, and WARNING.

Referenced by get_rel_sync_entry().

◆ maybe_send_schema()

static void maybe_send_schema ( LogicalDecodingContext * ctx,
ReorderBufferChange * change,
Relation  relation,
RelationSyncEntry * relentry
)
static

Definition at line 727 of file pgoutput.c.

730{
732 bool schema_sent;
735
736 /*
737 * Remember the XID of the (sub)transaction for the change. We don't care
738 * if it's a top-level transaction or not (we have already sent that XID at
739 * the start of the current streaming block).
740 *
741 * If we're not in a streaming block, just use InvalidTransactionId and
742 * the write methods will not include it.
743 */
744 if (data->in_streaming)
745 xid = change->txn->xid;
746
747 if (rbtxn_is_subtxn(change->txn))
748 topxid = rbtxn_get_toptxn(change->txn)->xid;
749 else
750 topxid = xid;
751
752 /*
753 * Do we need to send the schema? We do track streamed transactions
754 * separately, because those may be applied later (and the regular
755 * transactions won't see their effects until then) and in an order that
756 * we don't know at this point.
757 *
758 * XXX There is scope for optimization here. Currently, we always send
759 * the schema the first time in a streaming transaction but we can probably
760 * avoid that by checking 'relentry->schema_sent' flag. However, before
761 * doing that we need to study its impact on the case where we have a mix
762 * of streaming and non-streaming transactions.
763 */
764 if (data->in_streaming)
766 else
767 schema_sent = relentry->schema_sent;
768
769 /* Nothing to do if we already sent the schema. */
770 if (schema_sent)
771 return;
772
773 /*
774 * Send the schema. If the changes will be published using an ancestor's
775 * schema, not the relation's own, send that ancestor's schema before
776 * sending relation's own (XXX - maybe sending only the former suffices?).
777 */
778 if (relentry->publish_as_relid != RelationGetRelid(relation))
779 {
780 Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
781
784 }
785
786 send_relation_and_attrs(relation, xid, ctx, relentry);
787
788 if (data->in_streaming)
790 else
791 relentry->schema_sent = true;
792}
uint32 TransactionId
Definition c.h:720
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, RelationSyncEntry *relentry)
Definition pgoutput.c:798
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition pgoutput.c:2034
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition pgoutput.c:2024
#define rbtxn_get_toptxn(txn)
#define rbtxn_is_subtxn(txn)
void * output_plugin_private
Definition logical.h:76
struct ReorderBufferTXN * txn
TransactionId xid
#define InvalidTransactionId
Definition transam.h:31

References data, fb(), get_schema_sent_in_streamed_txn(), InvalidTransactionId, LogicalDecodingContext::output_plugin_private, rbtxn_get_toptxn, rbtxn_is_subtxn, RelationClose(), RelationGetRelid, RelationIdGetRelation(), send_relation_and_attrs(), set_schema_sent_in_streamed_txn(), ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ parse_output_parameters()

static void parse_output_parameters ( List * options,
PGOutputData * data
)
static

Definition at line 290 of file pgoutput.c.

291{
292 ListCell *lc;
293 bool protocol_version_given = false;
294 bool publication_names_given = false;
295 bool binary_option_given = false;
296 bool messages_option_given = false;
297 bool streaming_given = false;
298 bool two_phase_option_given = false;
299 bool origin_option_given = false;
300
301 /* Initialize optional parameters to defaults */
302 data->binary = false;
303 data->streaming = LOGICALREP_STREAM_OFF;
304 data->messages = false;
305 data->two_phase = false;
306 data->publish_no_origin = false;
307
308 foreach(lc, options)
309 {
310 DefElem *defel = (DefElem *) lfirst(lc);
311
312 Assert(defel->arg == NULL || IsA(defel->arg, String));
313
314 /* Check each param, whether or not we recognize it */
315 if (strcmp(defel->defname, "proto_version") == 0)
316 {
317 unsigned long parsed;
318 char *endptr;
319
323 errmsg("conflicting or redundant options")));
325
326 errno = 0;
327 parsed = strtoul(strVal(defel->arg), &endptr, 10);
328 if (errno != 0 || *endptr != '\0')
331 errmsg("invalid proto_version")));
332
333 if (parsed > PG_UINT32_MAX)
336 errmsg("proto_version \"%s\" out of range",
337 strVal(defel->arg))));
338
339 data->protocol_version = (uint32) parsed;
340 }
341 else if (strcmp(defel->defname, "publication_names") == 0)
342 {
346 errmsg("conflicting or redundant options")));
348
349 /*
350 * Pass a copy of the DefElem->arg since SplitIdentifierString
351 * modifies its input.
352 */
353 if (!SplitIdentifierString(pstrdup(strVal(defel->arg)), ',',
354 &data->publication_names))
357 errmsg("invalid publication_names syntax")));
358 }
359 else if (strcmp(defel->defname, "binary") == 0)
360 {
364 errmsg("conflicting or redundant options")));
365 binary_option_given = true;
366
367 data->binary = defGetBoolean(defel);
368 }
369 else if (strcmp(defel->defname, "messages") == 0)
370 {
374 errmsg("conflicting or redundant options")));
376
377 data->messages = defGetBoolean(defel);
378 }
379 else if (strcmp(defel->defname, "streaming") == 0)
380 {
381 if (streaming_given)
384 errmsg("conflicting or redundant options")));
385 streaming_given = true;
386
387 data->streaming = defGetStreamingMode(defel);
388 }
389 else if (strcmp(defel->defname, "two_phase") == 0)
390 {
394 errmsg("conflicting or redundant options")));
396
397 data->two_phase = defGetBoolean(defel);
398 }
399 else if (strcmp(defel->defname, "origin") == 0)
400 {
401 char *origin;
402
406 errmsg("conflicting or redundant options"));
407 origin_option_given = true;
408
409 origin = defGetString(defel);
410 if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
411 data->publish_no_origin = true;
412 else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
413 data->publish_no_origin = false;
414 else
417 errmsg("unrecognized origin value: \"%s\"", origin));
418 }
419 else
420 elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
421 }
422
423 /* Check required options */
427 errmsg("option \"%s\" missing", "proto_version"));
431 errmsg("option \"%s\" missing", "publication_names"));
432}
#define PG_UINT32_MAX
Definition c.h:658
uint32_t uint32
Definition c.h:600
char * defGetString(DefElem *def)
Definition define.c:34
bool defGetBoolean(DefElem *def)
Definition define.c:93
#define elog(elevel,...)
Definition elog.h:226
char * pstrdup(const char *in)
Definition mcxt.c:1781
#define IsA(nodeptr, _type_)
Definition nodes.h:164
int pg_strcasecmp(const char *s1, const char *s2)
Definition value.h:64
char defGetStreamingMode(DefElem *def)
#define strVal(v)
Definition value.h:82
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition varlena.c:2777

References Assert, data, defGetBoolean(), defGetStreamingMode(), defGetString(), elog, ereport, errcode(), errmsg, ERROR, fb(), IsA, lfirst, pg_strcasecmp(), PG_UINT32_MAX, pstrdup(), SplitIdentifierString(), and strVal.

Referenced by pgoutput_startup().

◆ PG_MODULE_MAGIC_EXT()

PG_MODULE_MAGIC_EXT ( name = "pgoutput",
version = PG_VERSION 
)

◆ pgoutput_begin_prepare_txn()

static void pgoutput_begin_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 664 of file pgoutput.c.

665{
667
670
671 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
673
674 OutputPluginWrite(ctx, true);
675}
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition logical.c:696
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition logical.c:683
#define InvalidReplOriginId
Definition origin.h:33
static void send_repl_origin(LogicalDecodingContext *ctx, ReplOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
Definition pgoutput.c:2495
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition proto.c:116
XLogRecPtr origin_lsn
ReplOriginId origin_id

References fb(), InvalidReplOriginId, logicalrep_write_begin_prepare(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and send_repl_origin().

Referenced by _PG_output_plugin_init().

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 596 of file pgoutput.c.

597{
599 sizeof(PGOutputTxnData));
600
602}
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition mcxt.c:1266
MemoryContext context
Definition logical.h:36
void * output_plugin_private

References LogicalDecodingContext::context, fb(), MemoryContextAllocZero(), and ReorderBufferTXN::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
Relation  relation,
ReorderBufferChange * change
)
static

Definition at line 1484 of file pgoutput.c.

1486{
1493 Relation targetrel = relation;
1495 TupleTableSlot *old_slot = NULL;
1496 TupleTableSlot *new_slot = NULL;
1497
1498 if (!is_publishable_relation(relation))
1499 return;
1500
1501 /*
1502 * Remember the xid for the change in streaming mode. We need to send the
1503 * xid with each change in streaming mode so that the subscriber can
1504 * associate the change with its transaction and, on abort, discard the
1505 * corresponding changes.
1506 */
1507 if (data->in_streaming)
1508 xid = change->txn->xid;
1509
1510 relentry = get_rel_sync_entry(data, relation);
1511
1512 /* First check the table filter */
1513 switch (action)
1514 {
1516 if (!relentry->pubactions.pubinsert)
1517 return;
1518 break;
1520 if (!relentry->pubactions.pubupdate)
1521 return;
1522 break;
1524 if (!relentry->pubactions.pubdelete)
1525 return;
1526
1527 /*
1528 * This is only possible if deletes are allowed even when replica
1529 * identity is not defined for a table. Since the DELETE action
1530 * can't be published, we simply return.
1531 */
1532 if (!change->data.tp.oldtuple)
1533 {
1534 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1535 return;
1536 }
1537 break;
1538 default:
1539 Assert(false);
1540 }
1541
1542 /* Avoid leaking memory by using and resetting our own context */
1543 old = MemoryContextSwitchTo(data->context);
1544
1545 /* Switch relation if publishing via root. */
1546 if (relentry->publish_as_relid != RelationGetRelid(relation))
1547 {
1548 Assert(relation->rd_rel->relispartition);
1549 ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1551 }
1552
1553 if (change->data.tp.oldtuple)
1554 {
1555 old_slot = relentry->old_slot;
1556 ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1557
1558 /* Convert tuple if needed. */
1559 if (relentry->attrmap)
1560 {
1562 &TTSOpsVirtual, 0);
1563
1564 old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1565 }
1566 }
1567
1568 if (change->data.tp.newtuple)
1569 {
1570 new_slot = relentry->new_slot;
1571 ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1572
1573 /* Convert tuple if needed. */
1574 if (relentry->attrmap)
1575 {
1577 &TTSOpsVirtual, 0);
1578
1579 new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1580 }
1581 }
1582
1583 /*
1584 * Check row filter.
1585 *
1586 * Updates could be transformed to inserts or deletes based on the results
1587 * of the row filter for old and new tuple.
1588 */
1589 if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1590 goto cleanup;
1591
1592 /*
1593 * Send BEGIN if we haven't yet.
1594 *
1595 * We send the BEGIN message after ensuring that we will actually send the
1596 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1597 * transactions.
1598 */
1599 if (txndata && !txndata->sent_begin_txn)
1600 pgoutput_send_begin(ctx, txn);
1601
1602 /*
1603 * Schema should be sent using the original relation because it also sends
1604 * the ancestor's relation.
1605 */
1606 maybe_send_schema(ctx, change, relation, relentry);
1607
1608 OutputPluginPrepareWrite(ctx, true);
1609
1610 /* Send the data */
1611 switch (action)
1612 {
1614 logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1615 data->binary, relentry->columns,
1616 relentry->include_gencols_type);
1617 break;
1619 logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1620 new_slot, data->binary, relentry->columns,
1621 relentry->include_gencols_type);
1622 break;
1624 logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1625 data->binary, relentry->columns,
1626 relentry->include_gencols_type);
1627 break;
1628 default:
1629 Assert(false);
1630 }
1631
1632 OutputPluginWrite(ctx, true);
1633
1634cleanup:
1636 {
1638 ancestor = NULL;
1639 }
1640
1641 /* Drop the new slots that were used to store the converted tuples. */
1642 if (relentry->attrmap)
1643 {
1644 if (old_slot)
1646
1647 if (new_slot)
1649 }
1650
1652 MemoryContextReset(data->context);
1653}
static void cleanup(void)
Definition bootstrap.c:879
#define DEBUG1
Definition elog.h:30
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
TupleTableSlot * MakeTupleTableSlot(TupleDesc tupleDesc, const TupleTableSlotOps *tts_ops, uint16 flags)
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
bool is_publishable_relation(Relation rel)
static void pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition pgoutput.c:610
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Relation relation)
Definition pgoutput.c:2055
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition pgoutput.c:727
static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
Definition pgoutput.c:1303
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition proto.c:403
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition proto.c:528
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition proto.c:450
#define RelationIsValid(relation)
Definition rel.h:489
ReorderBufferChangeType
@ REORDER_BUFFER_CHANGE_INSERT
@ REORDER_BUFFER_CHANGE_DELETE
@ REORDER_BUFFER_CHANGE_UPDATE
struct ReorderBufferChange::@115::@116 tp
ReorderBufferChangeType action
union ReorderBufferChange::@115 data
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition tupconvert.c:193

References ReorderBufferChange::action, Assert, cleanup(), ReorderBufferChange::data, data, DEBUG1, elog, ExecDropSingleTupleTableSlot(), ExecStoreHeapTuple(), execute_attr_map_slot(), fb(), get_rel_sync_entry(), InvalidTransactionId, is_publishable_relation(), logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_update(), MakeTupleTableSlot(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), ReorderBufferChange::newtuple, ReorderBufferChange::oldtuple, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_row_filter(), pgoutput_send_begin(), RelationData::rd_rel, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), RelationIsValid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, TTSOpsVirtual, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_column_list_init()

static void pgoutput_column_list_init ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 1124 of file pgoutput.c.

1126{
1127 ListCell *lc;
1128 bool first = true;
1130 bool found_pub_collist = false;
1132
1134
1135 /*
1136 * Find if there are any column lists for this relation. If there are,
1137 * build a bitmap using the column lists.
1138 *
1139 * Multiple publications might have multiple column lists for this
1140 * relation.
1141 *
1142 * Note that we don't support the case where the column list is different
1143 * for the same table when combining publications. See comments atop
1144 * fetch_relation_list. But one can later change the publication so we
1145 * still need to check all the given publication-table mappings and report
1146 * an error if any publications have a different column list.
1147 */
1148 foreach(lc, publications)
1149 {
1150 Publication *pub = lfirst(lc);
1151 Bitmapset *cols = NULL;
1152
1153 /* Retrieve the bitmap of columns for a column list publication. */
1155 entry->publish_as_relid,
1156 entry->entry_cxt, &cols);
1157
1158 /*
1159 * For non-column list publications — e.g. TABLE (without a column
1160 * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1161 * of the table (including generated columns when
1162 * 'publish_generated_columns' parameter is true).
1163 */
1164 if (!cols)
1165 {
1166 /*
1167 * Cache the table columns for the first publication with no
1168 * specified column list to detect publication with a different
1169 * column list.
1170 */
1171 if (!relcols && (list_length(publications) > 1))
1172 {
1174
1175 relcols = pub_form_cols_map(relation,
1176 entry->include_gencols_type);
1178 }
1179
1180 cols = relcols;
1181 }
1182
1183 if (first)
1184 {
1185 entry->columns = cols;
1186 first = false;
1187 }
1188 else if (!bms_equal(entry->columns, cols))
1189 ereport(ERROR,
1191 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1193 RelationGetRelationName(relation)));
1194 } /* loop all subscribed publications */
1195
1196 /*
1197 * If no column list publications exist, columns to be published will be
1198 * computed later according to the 'publish_generated_columns' parameter.
1199 */
1200 if (!found_pub_collist)
1201 entry->columns = NULL;
1202
1203 RelationClose(relation);
1204}
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:142
Bitmapset * pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
static void pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
Definition pgoutput.c:896

References bms_equal(), check_and_fetch_column_list(), RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, ereport, errcode(), errmsg, ERROR, fb(), get_namespace_name(), RelationSyncEntry::include_gencols_type, lfirst, list_length(), MemoryContextSwitchTo(), pgoutput_ensure_entry_cxt(), pub_form_cols_map(), RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetNamespace, RelationGetRelationName, and RelationIdGetRelation().

Referenced by get_rel_sync_entry().

◆ pgoutput_commit_prepared_txn()

static void pgoutput_commit_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 695 of file pgoutput.c.

697{
698 OutputPluginUpdateProgress(ctx, false);
699
700 OutputPluginPrepareWrite(ctx, true);
701 logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
702 OutputPluginWrite(ctx, true);
703}
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact)
Definition logical.c:709
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition proto.c:237

References logicalrep_write_commit_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 632 of file pgoutput.c.

634{
636 bool sent_begin_txn;
637
639
640 /*
641 * We don't need to send the commit message unless some relevant change
642 * from this transaction has been sent to the downstream.
643 */
644 sent_begin_txn = txndata->sent_begin_txn;
645 OutputPluginUpdateProgress(ctx, !sent_begin_txn);
646 pfree(txndata);
648
649 if (!sent_begin_txn)
650 {
651 elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
652 return;
653 }
654
655 OutputPluginPrepareWrite(ctx, true);
656 logicalrep_write_commit(ctx->out, txn, commit_lsn);
657 OutputPluginWrite(ctx, true);
658}
void pfree(void *pointer)
Definition mcxt.c:1616
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition proto.c:78

References Assert, DEBUG1, elog, fb(), logicalrep_write_commit(), LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), pfree(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_ensure_entry_cxt()

static void pgoutput_ensure_entry_cxt ( PGOutputData * data,
RelationSyncEntry * entry
)
static

Definition at line 896 of file pgoutput.c.

897{
898 Relation relation;
899
900 /* The context may already exist, in which case bail out. */
901 if (entry->entry_cxt)
902 return;
903
904 relation = RelationIdGetRelation(entry->publish_as_relid);
905
906 entry->entry_cxt = AllocSetContextCreate(data->cachectx,
907 "entry private context",
909
911 RelationGetRelationName(relation));
912}
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_SMALL_SIZES
Definition memutils.h:170
#define MemoryContextCopyAndSetIdentifier(cxt, id)
Definition memutils.h:101

References ALLOCSET_SMALL_SIZES, AllocSetContextCreate, data, RelationSyncEntry::entry_cxt, MemoryContextCopyAndSetIdentifier, RelationSyncEntry::publish_as_relid, RelationGetRelationName, and RelationIdGetRelation().

Referenced by pgoutput_column_list_init(), and pgoutput_row_filter_init().

◆ pgoutput_memory_context_reset()

static void pgoutput_memory_context_reset ( void arg)
static

Definition at line 438 of file pgoutput.c.

439{
441 {
444 }
445}
void hash_destroy(HTAB *hashp)
Definition dynahash.c:865

References fb(), hash_destroy(), and RelationSyncCache.

Referenced by pgoutput_shutdown(), and pgoutput_startup().

◆ pgoutput_message()

static void pgoutput_message ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char * prefix,
Size  sz,
const char * message
)
static

Definition at line 1724 of file pgoutput.c.

1727{
1730
1731 if (!data->messages)
1732 return;
1733
1734 /*
1735 * Remember the xid for the message in streaming mode. See
1736 * pgoutput_change.
1737 */
1738 if (data->in_streaming)
1739 xid = txn->xid;
1740
1741 /*
1742 * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1743 */
1744 if (transactional)
1745 {
1747
1748 /* Send BEGIN if we haven't yet */
1749 if (txndata && !txndata->sent_begin_txn)
1750 pgoutput_send_begin(ctx, txn);
1751 }
1752
1753 OutputPluginPrepareWrite(ctx, true);
1755 xid,
1757 transactional,
1758 prefix,
1759 sz,
1760 message);
1761 OutputPluginWrite(ctx, true);
1762}
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition proto.c:640

References data, fb(), InvalidTransactionId, logicalrep_write_message(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_send_begin(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext ctx,
ReplOriginId  origin_id 
)
static

Definition at line 1769 of file pgoutput.c.

1771{
1773
1774 if (data->publish_no_origin && origin_id != InvalidReplOriginId)
1775 return true;
1776
1777 return false;
1778}

References data, InvalidReplOriginId, and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pgoutput_prepare_txn()

static void pgoutput_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 681 of file pgoutput.c.

683{
684 OutputPluginUpdateProgress(ctx, false);
685
686 OutputPluginPrepareWrite(ctx, true);
687 logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
688 OutputPluginWrite(ctx, true);
689}
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition proto.c:187

References logicalrep_write_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_rollback_prepared_txn()

static void pgoutput_rollback_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)
static

Definition at line 709 of file pgoutput.c.

713{
714 OutputPluginUpdateProgress(ctx, false);
715
716 OutputPluginPrepareWrite(ctx, true);
717 logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
718 prepare_time);
719 OutputPluginWrite(ctx, true);
720}
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition proto.c:293

References logicalrep_write_rollback_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_row_filter()

static bool pgoutput_row_filter ( Relation  relation,
TupleTableSlot * old_slot,
TupleTableSlot **  new_slot_ptr,
RelationSyncEntry * entry,
ReorderBufferChangeType * action
)
static

Definition at line 1303 of file pgoutput.c.

1306{
1307 TupleDesc desc;
1308 int i;
1309 bool old_matched,
1311 result;
1313 TupleTableSlot *new_slot = *new_slot_ptr;
1316
1317 /*
1318 * We need this map to avoid relying on ReorderBufferChangeType enums
1319 * having specific values.
1320 */
1321 static const int map_changetype_pubaction[] = {
1325 };
1326
1328 *action == REORDER_BUFFER_CHANGE_UPDATE ||
1329 *action == REORDER_BUFFER_CHANGE_DELETE);
1330
1331 Assert(new_slot || old_slot);
1332
1333 /* Get the corresponding row filter */
1335
1336 /* Bail out if there is no row filter */
1337 if (!filter_exprstate)
1338 return true;
1339
1340 elog(DEBUG3, "table \"%s.%s\" has row filter",
1342 RelationGetRelationName(relation));
1343
1345
1347
1348 /*
1349 * For the following occasions where there is only one tuple, we can
1350 * evaluate the row filter for that tuple and return.
1351 *
1352 * For inserts, we only have the new tuple.
1353 *
1354 * For updates, we can have only a new tuple when none of the replica
1355 * identity columns changed and none of those columns have external data
1356 * but we still need to evaluate the row filter for the new tuple as the
1357 * existing values of those columns might not match the filter. Also,
1358 * users can use constant expressions in the row filter, so we anyway need
1359 * to evaluate it for the new tuple.
1360 *
1361 * For deletes, we only have the old tuple.
1362 */
1363 if (!new_slot || !old_slot)
1364 {
1365 ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1367
1368 return result;
1369 }
1370
1371 /*
1372 * Both the old and new tuples must be valid only for updates and need to
1373 * be checked against the row filter.
1374 */
1376
1377 slot_getallattrs(new_slot);
1378 slot_getallattrs(old_slot);
1379
1381 desc = RelationGetDescr(relation);
1382
1383 /*
1384 * The new tuple might not have all the replica identity columns, in which
1385 * case it needs to be copied over from the old tuple.
1386 */
1387 for (i = 0; i < desc->natts; i++)
1388 {
1390
1391 /*
1392 * if the column in the new tuple or old tuple is null, nothing to do
1393 */
1394 if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1395 continue;
1396
1397 /*
1398 * Unchanged toasted replica identity columns are only logged in the
1399 * old tuple. Copy this over to the new tuple. The changed (or WAL
1400 * Logged) toast values are always assembled in memory and set as
1401 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1402 */
1403 if (att->attlen == -1 &&
1406 {
1407 if (!tmp_new_slot)
1408 {
1411
1412 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1413 desc->natts * sizeof(Datum));
1414 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1415 desc->natts * sizeof(bool));
1416 }
1417
1418 tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1419 tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1420 }
1421 }
1422
1423 ecxt->ecxt_scantuple = old_slot;
1425
1426 if (tmp_new_slot)
1427 {
1429 ecxt->ecxt_scantuple = tmp_new_slot;
1430 }
1431 else
1432 ecxt->ecxt_scantuple = new_slot;
1433
1435
1436 /*
1437 * Case 1: if both tuples don't match the row filter, bailout. Send
1438 * nothing.
1439 */
1440 if (!old_matched && !new_matched)
1441 return false;
1442
1443 /*
1444 * Case 2: if the old tuple doesn't satisfy the row filter but the new
1445 * tuple does, transform the UPDATE into INSERT.
1446 *
1447 * Use the newly transformed tuple that must contain the column values for
1448 * all the replica identity columns. This is required to ensure that,
1449 * while inserting the tuple in the downstream node, we have all the
1450 * required column values.
1451 */
1452 if (!old_matched && new_matched)
1453 {
1455
1456 if (tmp_new_slot)
1458 }
1459
1460 /*
1461 * Case 3: if the old tuple satisfies the row filter but the new tuple
1462 * doesn't, transform the UPDATE into DELETE.
1463 *
1464 * This transformation does not require another tuple. The Old tuple will
1465 * be used for DELETE.
1466 */
1467 else if (old_matched && !new_matched)
1469
1470 /*
1471 * Case 4: if both tuples match the row filter, transformation isn't
1472 * required. (*action is default UPDATE).
1473 */
1474
1475 return true;
1476}
#define DEBUG3
Definition elog.h:28
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
#define ResetPerTupleExprContext(estate)
Definition executor.h:669
#define GetPerTupleExprContext(estate)
Definition executor.h:660
static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
Definition pgoutput.c:873
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
bool * tts_isnull
Definition tuptable.h:133
Datum * tts_values
Definition tuptable.h:131
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
static void slot_getallattrs(TupleTableSlot *slot)
Definition tuptable.h:390
static bool VARATT_IS_EXTERNAL_ONDISK(const void *PTR)
Definition varatt.h:361

References Assert, DatumGetPointer(), DEBUG3, elog, RelationSyncEntry::estate, ExecClearTuple(), ExecStoreVirtualTuple(), RelationSyncEntry::exprstate, fb(), get_namespace_name(), GetPerTupleExprContext, i, MakeSingleTupleTableSlot(), TupleDescData::natts, pgoutput_row_filter_exec_expr(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ResetPerTupleExprContext, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, TTSOpsVirtual, TupleDescCompactAttr(), and VARATT_IS_EXTERNAL_ONDISK().

Referenced by pgoutput_change().

◆ pgoutput_row_filter_exec_expr()

static bool pgoutput_row_filter_exec_expr ( ExprState state,
ExprContext econtext 
)
static

Definition at line 873 of file pgoutput.c.

/*
 * Evaluate a row filter expression and reduce the result to a boolean.
 *
 * 'state' is the expression to evaluate (asserted to be non-NULL) and
 * 'econtext' is the expression context handed to
 * ExecEvalExprSwitchContext().
 *
 * Returns the boolean value of the result. A NULL result is reported as
 * false.
 */
874{
875 Datum ret;
876 bool isnull;
877
878 Assert(state != NULL);
879
880 ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
881
/* Log the outcome; a NULL result is printed as "false", matching the return value below. */
882 elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
883 isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
884 isnull ? "true" : "false");
885
/* ret is only read when the result is not NULL. */
886 if (isnull)
887 return false;
888
889 return DatumGetBool(ret);
890}
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition executor.h:439
static bool DatumGetBool(Datum X)
Definition postgres.h:100

References Assert, DatumGetBool(), DEBUG3, elog, ExecEvalExprSwitchContext(), and fb().

Referenced by pgoutput_row_filter().

◆ pgoutput_row_filter_init()

static void pgoutput_row_filter_init ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 918 of file pgoutput.c.

920{
921 ListCell *lc;
922 List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
923 bool no_filter[] = {false, false, false}; /* One per pubaction */
925 int idx;
926 bool has_filter = true;
928
929 /*
930 * Find if there are any row filters for this relation. If there are, then
931 * prepare the necessary ExprState and cache it in entry->exprstate. To
932 * build an expression state, we need to ensure the following:
933 *
934 * All the given publication-table mappings must be checked.
935 *
936 * Multiple publications might have multiple row filters for this
937 * relation. Since row filter usage depends on the DML operation, there
938 * are multiple lists (one for each operation) to which row filters will
939 * be appended.
940 *
941 * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
942 * expression" so it takes precedence.
943 */
944 foreach(lc, publications)
945 {
946 Publication *pub = lfirst(lc);
948 Datum rfdatum = 0;
949 bool pub_no_filter = true;
950
951 /*
952 * If the publication is FOR ALL TABLES, or the publication includes a
953 * FOR TABLES IN SCHEMA where the table belongs to the referred
954 * schema, then it is treated the same as if there are no row filters
955 * (even if other publications have a row filter).
956 */
957 if (!pub->alltables &&
960 ObjectIdGetDatum(pub->oid)))
961 {
962 /*
963 * Check for the presence of a row filter in this publication.
964 */
967 ObjectIdGetDatum(pub->oid));
968
970 {
971 /* Null indicates no filter. */
975 }
976 }
977
978 if (pub_no_filter)
979 {
980 if (rftuple)
982
986
987 /*
988 * Quick exit if all the DML actions are published via this
989 * publication.
990 */
994 {
995 has_filter = false;
996 break;
997 }
998
999 /* No additional work for this publication. Next one. */
1000 continue;
1001 }
1002
1003 /* Form the per pubaction row filter lists. */
1013
1015 } /* loop all subscribed publications */
1016
1017 /* Clean the row filter */
1018 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1019 {
1020 if (no_filter[idx])
1021 {
1023 rfnodes[idx] = NIL;
1024 }
1025 }
1026
1027 if (has_filter)
1028 {
1030
1032
1033 /*
1034 * Now all the filters for all pubactions are known. Combine them when
1035 * their pubactions are the same.
1036 */
1038 entry->estate = create_estate_for_relation(relation);
1039 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1040 {
1041 List *filters = NIL;
1042 Expr *rfnode;
1043
1044 if (rfnodes[idx] == NIL)
1045 continue;
1046
1047 foreach(lc, rfnodes[idx])
1049
1050 /* combine the row filter and cache the ExprState */
1052 entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1053 } /* for each pubaction */
1055
1056 RelationClose(relation);
1057 }
1058}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:262
#define TextDatumGetCString(d)
Definition builtins.h:99
ExprState * ExecPrepareExpr(Expr *node, EState *estate)
Definition execExpr.c:786
#define HeapTupleIsValid(tuple)
Definition htup.h:78
void list_free_deep(List *list)
Definition list.c:1560
Expr * make_orclause(List *orclauses)
Definition makefuncs.c:743
#define NUM_ROWFILTER_PUBACTIONS
Definition pgoutput.c:106
static EState * create_estate_for_relation(Relation rel)
Definition pgoutput.c:842
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
void * stringToNode(const char *str)
Definition read.c:90
Node * expand_generated_columns_in_expr(Node *node, Relation rel, int rt_index)
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache2(SysCacheIdentifier cacheId, Datum key1, Datum key2)
Definition syscache.c:230
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:595
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition syscache.h:102

References Publication::alltables, create_estate_for_relation(), data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecPrepareExpr(), expand_generated_columns_in_expr(), RelationSyncEntry::exprstate, fb(), get_rel_namespace(), HeapTupleIsValid, idx(), lappend(), lfirst, list_free_deep(), make_orclause(), MemoryContextSwitchTo(), NIL, NUM_ROWFILTER_PUBACTIONS, ObjectIdGetDatum(), Publication::oid, pgoutput_ensure_entry_cxt(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationClose(), RelationIdGetRelation(), ReleaseSysCache(), SearchSysCache2(), SearchSysCacheExists2, stringToNode(), SysCacheGetAttr(), and TextDatumGetCString.

Referenced by get_rel_sync_entry().

◆ pgoutput_send_begin()

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 1788 of file pgoutput.c.

1789{
1791}
static void pgoutput_memory_context_reset(void *arg)
Definition pgoutput.c:438

References fb(), and pgoutput_memory_context_reset().

Referenced by _PG_output_plugin_init().

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 451 of file pgoutput.c.

453{
455 static bool publication_callback_registered = false;
457
458 /* Create our memory context for private allocations. */
459 data->context = AllocSetContextCreate(ctx->context,
460 "logical replication output context",
462
463 data->cachectx = AllocSetContextCreate(ctx->context,
464 "logical replication cache context",
466
467 data->pubctx = AllocSetContextCreate(ctx->context,
468 "logical replication publication list context",
470
471 /*
472 * Ensure that RelationSyncCache is cleaned up even when logical decoding
473 * invoked via the SQL interface ends with an error.
474 */
478
480
481 /* This plugin uses binary protocol. */
483
484 /*
485 * This is replication start and not slot initialization.
486 *
487 * Parse and validate options passed by the client.
488 */
489 if (!is_init)
490 {
491 /* Parse the params and ERROR if we see any we don't recognize */
493
494 /* Check if we support requested protocol */
495 if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
498 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
499 data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
500
501 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
504 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
505 data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
506
507 /*
508 * Decide whether to enable streaming. It is disabled by default, in
509 * which case we just update the flag in decoding context. Otherwise
510 * we only allow it with sufficient version of the protocol, and when
511 * the output plugin supports it.
512 */
513 if (data->streaming == LOGICALREP_STREAM_OFF)
514 ctx->streaming = false;
515 else if (data->streaming == LOGICALREP_STREAM_ON &&
516 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
519 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
520 data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
521 else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
525 errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
527 else if (!ctx->streaming)
530 errmsg("streaming requested, but not supported by output plugin")));
531
532 /*
533 * Here, we just check whether the two-phase option is passed by the
534 * plugin and decide whether to enable it at a later point in time. It
535 * remains enabled if the previous start-up has done so. But we only
536 * allow the option to be passed in with sufficient version of the
537 * protocol, and when the output plugin supports it.
538 */
539 if (!data->two_phase)
540 ctx->twophase_opt_given = false;
541 else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
544 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
545 data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
546 else if (!ctx->twophase)
549 errmsg("two-phase commit requested, but not supported by output plugin")));
550 else
551 ctx->twophase_opt_given = true;
552
553 /* Init publication state. */
554 data->publications = NIL;
555 publications_valid = false;
556
557 /*
558 * Register callback for pg_publication if we didn't already do that
559 * during some previous call in this process.
560 */
562 {
565 (Datum) 0);
567 (Datum) 0);
569 }
570
571 /* Initialize relation schema cache. */
573 }
574 else
575 {
576 /*
577 * Disable the streaming and prepared transactions during the slot
578 * initialization mode.
579 */
580 ctx->streaming = false;
581 ctx->twophase = false;
582 }
583}
#define palloc0_object(type)
Definition fe_memutils.h:75
void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func, Datum arg)
Definition inval.c:1879
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
#define LOGICALREP_PROTO_MIN_VERSION_NUM
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
#define LOGICALREP_PROTO_MAX_VERSION_NUM
void MemoryContextRegisterResetCallback(MemoryContext context, MemoryContextCallback *cb)
Definition mcxt.c:582
MemoryContext CacheMemoryContext
Definition mcxt.c:169
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
@ OUTPUT_PLUGIN_BINARY_OUTPUT
static void parse_output_parameters(List *options, PGOutputData *data)
Definition pgoutput.c:290
static void init_rel_sync_cache(MemoryContext cachectx)
Definition pgoutput.c:1975
static void publication_invalidation_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
Definition pgoutput.c:1831
List * output_plugin_options
Definition logical.h:59
OutputPluginOutputType output_type

References ALLOCSET_DEFAULT_SIZES, ALLOCSET_SMALL_SIZES, AllocSetContextCreate, CacheMemoryContext, CacheRegisterRelSyncCallback(), CacheRegisterSyscacheCallback(), LogicalDecodingContext::context, data, ereport, errcode(), errmsg, ERROR, fb(), init_rel_sync_cache(), LOGICALREP_PROTO_MAX_VERSION_NUM, LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, MemoryContextRegisterResetCallback(), NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0_object, parse_output_parameters(), pgoutput_memory_context_reset(), publication_invalidation_cb(), publications_valid, rel_sync_cache_relation_cb(), LogicalDecodingContext::streaming, LogicalDecodingContext::twophase, and LogicalDecodingContext::twophase_opt_given.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_abort()

static void pgoutput_stream_abort ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 1894 of file pgoutput.c.

1897{
1898 ReorderBufferTXN *toptxn;
1900 bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1901
1902 /*
1903 * The abort should happen outside streaming block, even for streamed
1904 * transactions. The transaction has to be marked as streamed, though.
1905 */
1906 Assert(!data->in_streaming);
1907
1908 /* determine the toplevel transaction */
1909 toptxn = rbtxn_get_toptxn(txn);
1910
1911 Assert(rbtxn_is_streamed(toptxn));
1912
1913 OutputPluginPrepareWrite(ctx, true);
1914 logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1916
1917 OutputPluginWrite(ctx, true);
1918
1919 cleanup_rel_sync_cache(toptxn->xid, false);
1920}
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition pgoutput.c:2383
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
Definition proto.c:1161
#define rbtxn_is_streamed(txn)
TimestampTz abort_time

References ReorderBufferTXN::abort_time, Assert, cleanup_rel_sync_cache(), data, fb(), logicalrep_write_stream_abort(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_get_toptxn, rbtxn_is_streamed, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_commit()

static void pgoutput_stream_commit ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 1927 of file pgoutput.c.

1930{
1932
1933 /*
1934 * The commit should happen outside streaming block, even for streamed
1935 * transactions. The transaction has to be marked as streamed, though.
1936 */
1937 Assert(!data->in_streaming);
1939
1940 OutputPluginUpdateProgress(ctx, false);
1941
1942 OutputPluginPrepareWrite(ctx, true);
1943 logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1944 OutputPluginWrite(ctx, true);
1945
1946 cleanup_rel_sync_cache(txn->xid, true);
1947}
#define PG_USED_FOR_ASSERTS_ONLY
Definition c.h:243
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition proto.c:1107

References Assert, cleanup_rel_sync_cache(), data, logicalrep_write_stream_commit(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), PG_USED_FOR_ASSERTS_ONLY, rbtxn_is_streamed, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_prepare_txn()

static void pgoutput_stream_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 1955 of file pgoutput.c.

1958{
1960
1961 OutputPluginUpdateProgress(ctx, false);
1962 OutputPluginPrepareWrite(ctx, true);
1963 logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1964 OutputPluginWrite(ctx, true);
1965}
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition proto.c:353

References Assert, logicalrep_write_stream_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), and rbtxn_is_streamed.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_start()

static void pgoutput_stream_start ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 1841 of file pgoutput.c.

1843{
1846
1847 /* we can't nest streaming of transactions */
1848 Assert(!data->in_streaming);
1849
1850 /*
1851 * If we already sent the first stream for this transaction then don't
1852 * send the origin id in the subsequent streams.
1853 */
1854 if (rbtxn_is_streamed(txn))
1856
1859
1862
1863 OutputPluginWrite(ctx, true);
1864
1865 /* we're streaming a chunk of transaction now */
1866 data->in_streaming = true;
1867}
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition proto.c:1064
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References Assert, data, fb(), InvalidReplOriginId, InvalidXLogRecPtr, logicalrep_write_stream_start(), ReorderBufferTXN::origin_id, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, send_repl_origin(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_stop()

static void pgoutput_stream_stop ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 1873 of file pgoutput.c.

1875{
1877
1878 /* we should be streaming a transaction */
1879 Assert(data->in_streaming);
1880
1881 OutputPluginPrepareWrite(ctx, true);
1883 OutputPluginWrite(ctx, true);
1884
1885 /* we've stopped streaming a transaction */
1886 data->in_streaming = false;
1887}
void logicalrep_write_stream_stop(StringInfo out)
Definition proto.c:1098

References Assert, data, logicalrep_write_stream_stop(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_truncate()

static void pgoutput_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 1656 of file pgoutput.c.

1658{
1663 int i;
1664 int nrelids;
1665 Oid *relids;
1667
1668 /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1669 if (data->in_streaming)
1670 xid = change->txn->xid;
1671
1672 old = MemoryContextSwitchTo(data->context);
1673
1674 relids = palloc0(nrelations * sizeof(Oid));
1675 nrelids = 0;
1676
1677 for (i = 0; i < nrelations; i++)
1678 {
1679 Relation relation = relations[i];
1680 Oid relid = RelationGetRelid(relation);
1681
1682 if (!is_publishable_relation(relation))
1683 continue;
1684
1685 relentry = get_rel_sync_entry(data, relation);
1686
1687 if (!relentry->pubactions.pubtruncate)
1688 continue;
1689
1690 /*
1691 * Don't send partitions if the publication wants to send only the
1692 * root tables through it.
1693 */
1694 if (relation->rd_rel->relispartition &&
1695 relentry->publish_as_relid != relid)
1696 continue;
1697
1698 relids[nrelids++] = relid;
1699
1700 /* Send BEGIN if we haven't yet */
1701 if (txndata && !txndata->sent_begin_txn)
1702 pgoutput_send_begin(ctx, txn);
1703
1704 maybe_send_schema(ctx, change, relation, relentry);
1705 }
1706
1707 if (nrelids > 0)
1708 {
1709 OutputPluginPrepareWrite(ctx, true);
1711 xid,
1712 nrelids,
1713 relids,
1714 change->data.truncate.cascade,
1715 change->data.truncate.restart_seqs);
1716 OutputPluginWrite(ctx, true);
1717 }
1718
1720 MemoryContextReset(data->context);
1721}
void * palloc0(Size size)
Definition mcxt.c:1417
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition proto.c:583
struct ReorderBufferChange::@115::@117 truncate

References ReorderBufferChange::cascade, ReorderBufferChange::data, data, fb(), get_rel_sync_entry(), i, InvalidTransactionId, is_publishable_relation(), logicalrep_write_truncate(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), palloc0(), pgoutput_send_begin(), RelationData::rd_rel, RelationGetRelid, ReorderBufferChange::restart_seqs, ReorderBufferChange::truncate, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
SysCacheIdentifier  cacheid,
uint32  hashvalue 
)
static

Definition at line 1831 of file pgoutput.c.

/*
 * Invalidation callback: clear the file-level publications_valid flag.
 *
 * None of the arguments (arg, cacheid, hashvalue) are consulted; every
 * invocation resets the flag unconditionally.
 *
 * NOTE(review): presumably a false flag makes the publication list get
 * reloaded on next use -- confirm in get_rel_sync_entry().
 */
1833{
1834 publications_valid = false;
1835}

References publications_valid.

Referenced by pgoutput_startup().

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
SysCacheIdentifier  cacheid,
uint32  hashvalue 
)
static

Definition at line 2468 of file pgoutput.c.

2470{
2471 HASH_SEQ_STATUS status;
2472 RelationSyncEntry *entry;
2473
2474 /*
2475 * We can get here if the plugin was used in SQL interface as the
2476 * RelationSyncCache is destroyed when the decoding finishes, but there is
2477 * no way to unregister the invalidation callbacks.
2478 */
2479 if (RelationSyncCache == NULL)
2480 return;
2481
2482 /*
2483 * We have no easy way to identify which cache entries this invalidation
2484 * event might have affected, so just mark them all invalid.
2485 */
2487 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2488 {
2489 entry->replicate_valid = false;
2490 }
2491}

References fb(), hash_seq_init(), hash_seq_search(), RelationSyncCache, and RelationSyncEntry::replicate_valid.

Referenced by init_rel_sync_cache().

◆ rel_sync_cache_relation_cb()

static void rel_sync_cache_relation_cb ( Datum  arg,
Oid  relid 
)
static

Definition at line 2418 of file pgoutput.c.

2419{
2420 RelationSyncEntry *entry;
2421
2422 /*
2423 * We can get here if the plugin was used in SQL interface as the
2424 * RelationSyncCache is destroyed when the decoding finishes, but there is
2425 * no way to unregister the relcache invalidation callback.
2426 */
2427 if (RelationSyncCache == NULL)
2428 return;
2429
2430 /*
2431 * Nobody keeps pointers to entries in this hash table around outside
2432 * logical decoding callback calls - but invalidation events can come in
2433 * *during* a callback if we do any syscache access in the callback.
2434 * Because of that we must mark the cache entry as invalid but not damage
2435 * any of its substructure here. The next get_rel_sync_entry() call will
2436 * rebuild it all.
2437 */
2438 if (OidIsValid(relid))
2439 {
2440 /*
2441 * Getting invalidations for relations that aren't in the table is
2442 * entirely normal. So we don't care if it's found or not.
2443 */
2445 HASH_FIND, NULL);
2446 if (entry != NULL)
2447 entry->replicate_valid = false;
2448 }
2449 else
2450 {
2451 /* Whole cache must be flushed. */
2452 HASH_SEQ_STATUS status;
2453
2455 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2456 {
2457 entry->replicate_valid = false;
2458 }
2459 }
2460}
#define OidIsValid(objectId)
Definition c.h:842
@ HASH_FIND
Definition hsearch.h:113

References fb(), HASH_FIND, hash_search(), hash_seq_init(), hash_seq_search(), OidIsValid, RelationSyncCache, and RelationSyncEntry::replicate_valid.

Referenced by init_rel_sync_cache(), and pgoutput_startup().

◆ send_relation_and_attrs()

static void send_relation_and_attrs ( Relation  relation,
TransactionId  xid,
LogicalDecodingContext ctx,
RelationSyncEntry relentry 
)
static

Definition at line 798 of file pgoutput.c.

801{
802 TupleDesc desc = RelationGetDescr(relation);
803 Bitmapset *columns = relentry->columns;
804 PublishGencolsType include_gencols_type = relentry->include_gencols_type;
805 int i;
806
807 /*
808 * Write out type info if needed. We do that only for user-created types.
809 * We use FirstGenbkiObjectId as the cutoff, so that we only consider
810 * objects with hand-assigned OIDs to be "built in", not for instance any
811 * function or type defined in the information_schema. This is important
812 * because only hand-assigned OIDs can be expected to remain stable across
813 * major versions.
814 */
815 for (i = 0; i < desc->natts; i++)
816 {
818
820 include_gencols_type))
821 continue;
822
823 if (att->atttypid < FirstGenbkiObjectId)
824 continue;
825
826 OutputPluginPrepareWrite(ctx, false);
827 logicalrep_write_typ(ctx->out, xid, att->atttypid);
828 OutputPluginWrite(ctx, false);
829 }
830
831 OutputPluginPrepareWrite(ctx, false);
832 logicalrep_write_rel(ctx->out, xid, relation, columns,
833 include_gencols_type);
834 OutputPluginWrite(ctx, false);
835}
FormData_pg_attribute * Form_pg_attribute
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition proto.c:667
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition proto.c:726
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition proto.c:1282
#define FirstGenbkiObjectId
Definition transam.h:195
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:178

References fb(), FirstGenbkiObjectId, i, logicalrep_should_publish_column(), logicalrep_write_rel(), logicalrep_write_typ(), TupleDescData::natts, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, and TupleDescAttr().

Referenced by maybe_send_schema().

◆ send_repl_origin()

static void send_repl_origin ( LogicalDecodingContext ctx,
ReplOriginId  origin_id,
XLogRecPtr  origin_lsn,
bool  send_origin 
)
static

Definition at line 2495 of file pgoutput.c.

/*
 * Write a replication origin message for 'origin_id', if requested.
 *
 * Nothing happens unless 'send_origin' is true and replorigin_by_oid()
 * resolves 'origin_id' to a name. In that case the message currently being
 * written is ended with OutputPluginWrite(), a new one is started with
 * OutputPluginPrepareWrite(), and the origin name together with
 * 'origin_lsn' is written into ctx->out.
 *
 * NOTE(review): no OutputPluginWrite() is issued here for the origin
 * message itself; it appears the caller is expected to have a write in
 * progress on entry and to finish the new one afterwards -- confirm
 * against the callers.
 */
2497{
2498 if (send_origin)
2499 {
2500 char *origin;
2501
2502 /*----------
2503 * XXX: which behaviour do we want here?
2504 *
2505 * Alternatives:
2506 * - don't send origin message if origin name not found
2507 * (that's what we do now)
2508 * - throw error - that will break replication, not good
2509 * - send some special "unknown" origin
2510 *----------
2511 */
/* Second argument is missing_ok = true: an unknown origin is skipped rather than raising an error. */
2512 if (replorigin_by_oid(origin_id, true, &origin))
2513 {
2514 /* Message boundary */
2515 OutputPluginWrite(ctx, false);
2516 OutputPluginPrepareWrite(ctx, true);
2517
2518 logicalrep_write_origin(ctx->out, origin, origin_lsn);
2519 }
2520 }
2521}
bool replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
Definition origin.c:502
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition proto.c:374

References fb(), logicalrep_write_origin(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by pgoutput_begin_prepare_txn(), pgoutput_send_begin(), and pgoutput_stream_start().

◆ set_schema_sent_in_streamed_txn()

static void set_schema_sent_in_streamed_txn ( RelationSyncEntry entry,
TransactionId  xid 
)
static

Definition at line 2034 of file pgoutput.c.

2035{
2037
2039
2040 entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2041
2043}
List * lappend_xid(List *list, TransactionId datum)
Definition list.c:393

References CacheMemoryContext, fb(), lappend_xid(), MemoryContextSwitchTo(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

Variable Documentation

◆ publications_valid

bool publications_valid
static

Definition at line 86 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ RelationSyncCache