PostgreSQL Source Code  git master
pgoutput.c File Reference
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 
struct  PGOutputTxnData
 

Macros

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 
typedef struct PGOutputTxnData PGOutputTxnData
 

Enumerations

enum  RowFilterPubAction { PUBACTION_INSERT , PUBACTION_UPDATE , PUBACTION_DELETE }
 

Functions

static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pgoutput_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static void pgoutput_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pgoutput_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pgoutput_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pgoutput_stream_start (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_stop (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_abort (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pgoutput_stream_commit (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_stream_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static ListLoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void send_repl_origin (LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
 
static void init_rel_sync_cache (MemoryContext cachectx)
 
static void cleanup_rel_sync_cache (TransactionId xid, bool is_commit)
 
static RelationSyncEntryget_rel_sync_entry (PGOutputData *data, Relation relation)
 
static void send_relation_and_attrs (Relation relation, TransactionId xid, LogicalDecodingContext *ctx, RelationSyncEntry *relentry)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void set_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static bool get_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static void init_tuple_slot (PGOutputData *data, Relation relation, RelationSyncEntry *entry)
 
static EStatecreate_estate_for_relation (Relation rel)
 
static void pgoutput_row_filter_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
static bool pgoutput_row_filter_exec_expr (ExprState *state, ExprContext *econtext)
 
static bool pgoutput_row_filter (Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
 
static void pgoutput_column_list_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void parse_output_parameters (List *options, PGOutputData *data)
 
static void pgoutput_send_begin (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void maybe_send_schema (LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
 
static void pgoutput_ensure_entry_cxt (PGOutputData *data, RelationSyncEntry *entry)
 
static void check_and_init_gencol (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 

Variables

 PG_MODULE_MAGIC
 
static bool publications_valid
 
static HTABRelationSyncCache = NULL
 

Macro Definition Documentation

◆ NUM_ROWFILTER_PUBACTIONS

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)

Definition at line 102 of file pgoutput.c.

Typedef Documentation

◆ PGOutputTxnData

◆ RelationSyncEntry

Enumeration Type Documentation

◆ RowFilterPubAction

Enumerator
PUBACTION_INSERT 
PUBACTION_UPDATE 
PUBACTION_DELETE 

Definition at line 95 of file pgoutput.c.

96 {
100 };
@ PUBACTION_INSERT
Definition: pgoutput.c:97
@ PUBACTION_UPDATE
Definition: pgoutput.c:98
@ PUBACTION_DELETE
Definition: pgoutput.c:99

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks cb)

Definition at line 253 of file pgoutput.c.

254 {
261 
268 
269  /* transaction streaming */
277  /* transaction streaming - two-phase commit */
279 }
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition: pgoutput.c:1438
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:622
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:424
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:1610
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:639
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:1723
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: pgoutput.c:667
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:1742
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: pgoutput.c:1845
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:1906
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:554
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:1878
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:653
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1824
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1792
static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: pgoutput.c:1678
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:590
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb

References OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::message_cb, pgoutput_begin_prepare_txn(), pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_prepared_txn(), pgoutput_commit_txn(), pgoutput_message(), pgoutput_origin_filter(), pgoutput_prepare_txn(), pgoutput_rollback_prepared_txn(), pgoutput_shutdown(), pgoutput_startup(), pgoutput_stream_abort(), pgoutput_stream_commit(), pgoutput_stream_prepare_txn(), pgoutput_stream_start(), pgoutput_stream_stop(), pgoutput_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

◆ check_and_init_gencol()

static void check_and_init_gencol ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 1020 of file pgoutput.c.

1022 {
1023  Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1024  TupleDesc desc = RelationGetDescr(relation);
1025  bool gencolpresent = false;
1026  bool first = true;
1027 
1028  /* Check if there is any generated column present. */
1029  for (int i = 0; i < desc->natts; i++)
1030  {
1031  Form_pg_attribute att = TupleDescAttr(desc, i);
1032 
1033  if (att->attgenerated)
1034  {
1035  gencolpresent = true;
1036  break;
1037  }
1038  }
1039 
1040  /* There are no generated columns to be published. */
1041  if (!gencolpresent)
1042  {
1043  entry->include_gencols = false;
1044  return;
1045  }
1046 
1047  /*
1048  * There may be a conflicting value for 'publish_generated_columns'
1049  * parameter in the publications.
1050  */
1051  foreach_ptr(Publication, pub, publications)
1052  {
1053  /*
1054  * The column list takes precedence over the
1055  * 'publish_generated_columns' parameter. Those will be checked later,
1056  * see pgoutput_column_list_init.
1057  */
1058  if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1059  continue;
1060 
1061  if (first)
1062  {
1063  entry->include_gencols = pub->pubgencols;
1064  first = false;
1065  }
1066  else if (entry->include_gencols != pub->pubgencols)
1067  ereport(ERROR,
1068  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1069  errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1071  RelationGetRelationName(relation)));
1072  }
1073 }
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int i
Definition: isn.c:72
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols)
#define RelationGetDescr(relation)
Definition: rel.h:531
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetNamespace(relation)
Definition: rel.h:546
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2061
bool include_gencols
Definition: pgoutput.c:134
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References check_and_fetch_column_list(), ereport, errcode(), errmsg(), ERROR, foreach_ptr, get_namespace_name(), i, RelationSyncEntry::include_gencols, TupleDescData::natts, RelationSyncEntry::publish_as_relid, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, RelationIdGetRelation(), and TupleDescAttr.

Referenced by get_rel_sync_entry().

◆ cleanup_rel_sync_cache()

static void cleanup_rel_sync_cache ( TransactionId  xid,
bool  is_commit 
)
static

Definition at line 2309 of file pgoutput.c.

2310 {
2311  HASH_SEQ_STATUS hash_seq;
2312  RelationSyncEntry *entry;
2313 
2314  Assert(RelationSyncCache != NULL);
2315 
2316  hash_seq_init(&hash_seq, RelationSyncCache);
2317  while ((entry = hash_seq_search(&hash_seq)) != NULL)
2318  {
2319  /*
2320  * We can set the schema_sent flag for an entry that has committed xid
2321  * in the list as that ensures that the subscriber would have the
2322  * corresponding schema and we don't need to send it unless there is
2323  * any invalidation for that relation.
2324  */
2325  foreach_xid(streamed_txn, entry->streamed_txns)
2326  {
2327  if (xid == streamed_txn)
2328  {
2329  if (is_commit)
2330  entry->schema_sent = true;
2331 
2332  entry->streamed_txns =
2333  foreach_delete_current(entry->streamed_txns, streamed_txn);
2334  break;
2335  }
2336  }
2337  }
2338 }
#define Assert(condition)
Definition: c.h:812
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1420
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
#define foreach_delete_current(lst, var_or_cell)
Definition: pg_list.h:391
#define foreach_xid(var, lst)
Definition: pg_list.h:472
static HTAB * RelationSyncCache
Definition: pgoutput.c:213
List * streamed_txns
Definition: pgoutput.c:135

References Assert, foreach_delete_current, foreach_xid, hash_seq_init(), hash_seq_search(), RelationSyncCache, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_stream_abort(), and pgoutput_stream_commit().

◆ create_estate_for_relation()

static EState * create_estate_for_relation ( Relation  rel)
static

Definition at line 798 of file pgoutput.c.

799 {
800  EState *estate;
801  RangeTblEntry *rte;
802  List *perminfos = NIL;
803 
804  estate = CreateExecutorState();
805 
806  rte = makeNode(RangeTblEntry);
807  rte->rtekind = RTE_RELATION;
808  rte->relid = RelationGetRelid(rel);
809  rte->relkind = rel->rd_rel->relkind;
810  rte->rellockmode = AccessShareLock;
811 
812  addRTEPermissionInfo(&perminfos, rte);
813 
814  ExecInitRangeTable(estate, list_make1(rte), perminfos);
815 
816  estate->es_output_cid = GetCurrentCommandId(false);
817 
818  return estate;
819 }
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos)
Definition: execUtils.c:730
EState * CreateExecutorState(void)
Definition: execUtils.c:88
#define AccessShareLock
Definition: lockdefs.h:36
#define makeNode(_type_)
Definition: nodes.h:155
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1017
#define NIL
Definition: pg_list.h:68
#define list_make1(x1)
Definition: pg_list.h:212
#define RelationGetRelid(relation)
Definition: rel.h:505
CommandId es_output_cid
Definition: execnodes.h:647
Definition: pg_list.h:54
RTEKind rtekind
Definition: parsenodes.h:1047
Form_pg_class rd_rel
Definition: rel.h:111
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:828

References AccessShareLock, addRTEPermissionInfo(), CreateExecutorState(), EState::es_output_cid, ExecInitRangeTable(), GetCurrentCommandId(), list_make1, makeNode, NIL, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by pgoutput_row_filter_init().

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData data,
Relation  relation 
)
static

Definition at line 2011 of file pgoutput.c.

2012 {
2013  RelationSyncEntry *entry;
2014  bool found;
2015  MemoryContext oldctx;
2016  Oid relid = RelationGetRelid(relation);
2017 
2018  Assert(RelationSyncCache != NULL);
2019 
2020  /* Find cached relation info, creating if not found */
2022  &relid,
2023  HASH_ENTER, &found);
2024  Assert(entry != NULL);
2025 
2026  /* initialize entry, if it's new */
2027  if (!found)
2028  {
2029  entry->replicate_valid = false;
2030  entry->schema_sent = false;
2031  entry->include_gencols = false;
2032  entry->streamed_txns = NIL;
2033  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
2034  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
2035  entry->new_slot = NULL;
2036  entry->old_slot = NULL;
2037  memset(entry->exprstate, 0, sizeof(entry->exprstate));
2038  entry->entry_cxt = NULL;
2039  entry->publish_as_relid = InvalidOid;
2040  entry->columns = NULL;
2041  entry->attrmap = NULL;
2042  }
2043 
2044  /* Validate the entry */
2045  if (!entry->replicate_valid)
2046  {
2047  Oid schemaId = get_rel_namespace(relid);
2048  List *pubids = GetRelationPublications(relid);
2049 
2050  /*
2051  * We don't acquire a lock on the namespace system table as we build
2052  * the cache entry using a historic snapshot and all the later changes
2053  * are absorbed while decoding WAL.
2054  */
2055  List *schemaPubids = GetSchemaPublications(schemaId);
2056  ListCell *lc;
2057  Oid publish_as_relid = relid;
2058  int publish_ancestor_level = 0;
2059  bool am_partition = get_rel_relispartition(relid);
2060  char relkind = get_rel_relkind(relid);
2061  List *rel_publications = NIL;
2062 
2063  /* Reload publications if needed before use. */
2064  if (!publications_valid)
2065  {
2067  if (data->publications)
2068  {
2069  list_free_deep(data->publications);
2070  data->publications = NIL;
2071  }
2072  data->publications = LoadPublications(data->publication_names);
2073  MemoryContextSwitchTo(oldctx);
2074  publications_valid = true;
2075  }
2076 
2077  /*
2078  * Reset schema_sent status as the relation definition may have
2079  * changed. Also reset pubactions to empty in case rel was dropped
2080  * from a publication. Also free any objects that depended on the
2081  * earlier definition.
2082  */
2083  entry->schema_sent = false;
2084  entry->include_gencols = false;
2085  list_free(entry->streamed_txns);
2086  entry->streamed_txns = NIL;
2087  bms_free(entry->columns);
2088  entry->columns = NULL;
2089  entry->pubactions.pubinsert = false;
2090  entry->pubactions.pubupdate = false;
2091  entry->pubactions.pubdelete = false;
2092  entry->pubactions.pubtruncate = false;
2093 
2094  /*
2095  * Tuple slots cleanups. (Will be rebuilt later if needed).
2096  */
2097  if (entry->old_slot)
2098  {
2099  TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2100 
2101  Assert(desc->tdrefcount == -1);
2102 
2104 
2105  /*
2106  * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2107  * do it now to avoid any leaks.
2108  */
2109  FreeTupleDesc(desc);
2110  }
2111  if (entry->new_slot)
2112  {
2113  TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2114 
2115  Assert(desc->tdrefcount == -1);
2116 
2118 
2119  /*
2120  * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2121  * do it now to avoid any leaks.
2122  */
2123  FreeTupleDesc(desc);
2124  }
2125 
2126  entry->old_slot = NULL;
2127  entry->new_slot = NULL;
2128 
2129  if (entry->attrmap)
2130  free_attrmap(entry->attrmap);
2131  entry->attrmap = NULL;
2132 
2133  /*
2134  * Row filter cache cleanups.
2135  */
2136  if (entry->entry_cxt)
2138 
2139  entry->entry_cxt = NULL;
2140  entry->estate = NULL;
2141  memset(entry->exprstate, 0, sizeof(entry->exprstate));
2142 
2143  /*
2144  * Build publication cache. We can't use one provided by relcache as
2145  * relcache considers all publications that the given relation is in,
2146  * but here we only need to consider ones that the subscriber
2147  * requested.
2148  */
2149  foreach(lc, data->publications)
2150  {
2151  Publication *pub = lfirst(lc);
2152  bool publish = false;
2153 
2154  /*
2155  * Under what relid should we publish changes in this publication?
2156  * We'll use the top-most relid across all publications. Also
2157  * track the ancestor level for this publication.
2158  */
2159  Oid pub_relid = relid;
2160  int ancestor_level = 0;
2161 
2162  /*
2163  * If this is a FOR ALL TABLES publication, pick the partition
2164  * root and set the ancestor level accordingly.
2165  */
2166  if (pub->alltables)
2167  {
2168  publish = true;
2169  if (pub->pubviaroot && am_partition)
2170  {
2171  List *ancestors = get_partition_ancestors(relid);
2172 
2173  pub_relid = llast_oid(ancestors);
2174  ancestor_level = list_length(ancestors);
2175  }
2176  }
2177 
2178  if (!publish)
2179  {
2180  bool ancestor_published = false;
2181 
2182  /*
2183  * For a partition, check if any of the ancestors are
2184  * published. If so, note down the topmost ancestor that is
2185  * published via this publication, which will be used as the
2186  * relation via which to publish the partition's changes.
2187  */
2188  if (am_partition)
2189  {
2190  Oid ancestor;
2191  int level;
2192  List *ancestors = get_partition_ancestors(relid);
2193 
2194  ancestor = GetTopMostAncestorInPublication(pub->oid,
2195  ancestors,
2196  &level);
2197 
2198  if (ancestor != InvalidOid)
2199  {
2200  ancestor_published = true;
2201  if (pub->pubviaroot)
2202  {
2203  pub_relid = ancestor;
2204  ancestor_level = level;
2205  }
2206  }
2207  }
2208 
2209  if (list_member_oid(pubids, pub->oid) ||
2210  list_member_oid(schemaPubids, pub->oid) ||
2211  ancestor_published)
2212  publish = true;
2213  }
2214 
2215  /*
2216  * If the relation is to be published, determine actions to
2217  * publish, and list of columns, if appropriate.
2218  *
2219  * Don't publish changes for partitioned tables, because
2220  * publishing those of its partitions suffices, unless partition
2221  * changes won't be published due to pubviaroot being set.
2222  */
2223  if (publish &&
2224  (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2225  {
2226  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2227  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2228  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2230 
2231  /*
2232  * We want to publish the changes as the top-most ancestor
2233  * across all publications. So we need to check if the already
2234  * calculated level is higher than the new one. If yes, we can
2235  * ignore the new value (as it's a child). Otherwise the new
2236  * value is an ancestor, so we keep it.
2237  */
2238  if (publish_ancestor_level > ancestor_level)
2239  continue;
2240 
2241  /*
2242  * If we found an ancestor higher up in the tree, discard the
2243  * list of publications through which we replicate it, and use
2244  * the new ancestor.
2245  */
2246  if (publish_ancestor_level < ancestor_level)
2247  {
2248  publish_as_relid = pub_relid;
2249  publish_ancestor_level = ancestor_level;
2250 
2251  /* reset the publication list for this relation */
2252  rel_publications = NIL;
2253  }
2254  else
2255  {
2256  /* Same ancestor level, has to be the same OID. */
2257  Assert(publish_as_relid == pub_relid);
2258  }
2259 
2260  /* Track publications for this ancestor. */
2261  rel_publications = lappend(rel_publications, pub);
2262  }
2263  }
2264 
2265  entry->publish_as_relid = publish_as_relid;
2266 
2267  /*
2268  * Initialize the tuple slot, map, and row filter. These are only used
2269  * when publishing inserts, updates, or deletes.
2270  */
2271  if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2272  entry->pubactions.pubdelete)
2273  {
2274  /* Initialize the tuple slot and map */
2275  init_tuple_slot(data, relation, entry);
2276 
2277  /* Initialize the row filter */
2278  pgoutput_row_filter_init(data, rel_publications, entry);
2279 
2280  /* Check whether to publish generated columns. */
2281  check_and_init_gencol(data, rel_publications, entry);
2282 
2283  /* Initialize the column list */
2284  pgoutput_column_list_init(data, rel_publications, entry);
2285  }
2286 
2287  list_free(pubids);
2288  list_free(schemaPubids);
2289  list_free(rel_publications);
2290 
2291  entry->replicate_valid = true;
2292  }
2293 
2294  return entry;
2295 }
void free_attrmap(AttrMap *map)
Definition: attmap.c:56
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1341
@ HASH_ENTER
Definition: hsearch.h:114
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_free(List *list)
Definition: list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
void list_free_deep(List *list)
Definition: list.c:1560
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:2027
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2003
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1952
MemoryContext CacheMemoryContext
Definition: mcxt.c:152
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
List * get_partition_ancestors(Oid relid)
Definition: partition.c:134
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define llast_oid(l)
Definition: pg_list.h:200
List * GetSchemaPublications(Oid schemaid)
List * GetRelationPublications(Oid relid)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:1755
static void init_tuple_slot(PGOutputData *data, Relation relation, RelationSyncEntry *entry)
Definition: pgoutput.c:1165
static void pgoutput_row_filter_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:873
static void check_and_init_gencol(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:1020
static void pgoutput_column_list_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:1079
static bool publications_valid
Definition: pgoutput.c:82
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
MemoryContextSwitchTo(old_ctx)
PublicationActions pubactions
ExprState * exprstate[NUM_ROWFILTER_PUBACTIONS]
Definition: pgoutput.c:148
Bitmapset * columns
Definition: pgoutput.c:174
PublicationActions pubactions
Definition: pgoutput.c:139
TupleTableSlot * old_slot
Definition: pgoutput.c:151
bool replicate_valid
Definition: pgoutput.c:126
MemoryContext entry_cxt
Definition: pgoutput.c:180
EState * estate
Definition: pgoutput.c:149
TupleTableSlot * new_slot
Definition: pgoutput.c:150
AttrMap * attrmap
Definition: pgoutput.c:167
int tdrefcount
Definition: tupdesc.h:84
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
void FreeTupleDesc(TupleDesc tupdesc)
Definition: tupdesc.c:331

References Publication::alltables, Assert, RelationSyncEntry::attrmap, bms_free(), CacheMemoryContext, check_and_init_gencol(), RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecDropSingleTupleTableSlot(), RelationSyncEntry::exprstate, free_attrmap(), FreeTupleDesc(), get_partition_ancestors(), get_rel_namespace(), get_rel_relispartition(), get_rel_relkind(), GetRelationPublications(), GetSchemaPublications(), GetTopMostAncestorInPublication(), HASH_ENTER, hash_search(), RelationSyncEntry::include_gencols, init_tuple_slot(), InvalidOid, lappend(), lfirst, list_free(), list_free_deep(), list_length(), list_member_oid(), llast_oid, LoadPublications(), MemoryContextDelete(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, NIL, Publication::oid, RelationSyncEntry::old_slot, pgoutput_column_list_init(), pgoutput_row_filter_init(), RelationSyncEntry::pubactions, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, publications_valid, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, PublicationActions::pubupdate, Publication::pubviaroot, RelationGetRelid, RelationSyncCache, RelationSyncEntry::replicate_valid, RelationSyncEntry::schema_sent, RelationSyncEntry::streamed_txns, TupleDescData::tdrefcount, and TupleTableSlot::tts_tupleDescriptor.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ get_schema_sent_in_streamed_txn()

static bool get_schema_sent_in_streamed_txn ( RelationSyncEntry entry,
TransactionId  xid 
)
static

Definition at line 1980 of file pgoutput.c.

1981 {
1982  return list_member_xid(entry->streamed_txns, xid);
1983 }
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:742

References list_member_xid(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  cachectx)
static

Definition at line 1926 of file pgoutput.c.

1927 {
1928  HASHCTL ctl;
1929  static bool relation_callbacks_registered = false;
1930 
1931  /* Nothing to do if hash table already exists */
1932  if (RelationSyncCache != NULL)
1933  return;
1934 
1935  /* Make a new hash table for the cache */
1936  ctl.keysize = sizeof(Oid);
1937  ctl.entrysize = sizeof(RelationSyncEntry);
1938  ctl.hcxt = cachectx;
1939 
1940  RelationSyncCache = hash_create("logical replication output relation cache",
1941  128, &ctl,
1943 
1944  Assert(RelationSyncCache != NULL);
1945 
1946  /* No more to do if we already registered callbacks */
1947  if (relation_callbacks_registered)
1948  return;
1949 
1950  /* We must update the cache entry for a relation after a relcache flush */
1952 
1953  /*
1954  * Flush all cache entries after a pg_namespace change, in case it was a
1955  * schema rename affecting a relation being replicated.
1956  */
1957  CacheRegisterSyscacheCallback(NAMESPACEOID,
1959  (Datum) 0);
1960 
1961  /*
1962  * Flush all cache entries after any publication changes. (We need no
1963  * callback entry for pg_publication, because publication_invalidation_cb
1964  * will take care of it.)
1965  */
1966  CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1968  (Datum) 0);
1969  CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1971  (Datum) 0);
1972 
1973  relation_callbacks_registered = true;
1974 }
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1746
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1704
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:2395
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:2344
uintptr_t Datum
Definition: postgres.h:64
tree ctl
Definition: radixtree.h:1853

References Assert, CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), ctl, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, rel_sync_cache_publication_cb(), rel_sync_cache_relation_cb(), and RelationSyncCache.

Referenced by pgoutput_startup().

◆ init_tuple_slot()

static void init_tuple_slot ( PGOutputData data,
Relation  relation,
RelationSyncEntry entry 
)
static

Definition at line 1165 of file pgoutput.c.

1167 {
1168  MemoryContext oldctx;
1169  TupleDesc oldtupdesc;
1170  TupleDesc newtupdesc;
1171 
1172  oldctx = MemoryContextSwitchTo(data->cachectx);
1173 
1174  /*
1175  * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1176  * live as long as the cache remains.
1177  */
1178  oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1179  newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1180 
1181  entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1182  entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1183 
1184  MemoryContextSwitchTo(oldctx);
1185 
1186  /*
1187  * Cache the map that will be used to convert the relation's tuples into
1188  * the ancestor's format, if needed.
1189  */
1190  if (entry->publish_as_relid != RelationGetRelid(relation))
1191  {
1192  Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1193  TupleDesc indesc = RelationGetDescr(relation);
1194  TupleDesc outdesc = RelationGetDescr(ancestor);
1195 
1196  /* Map must live as long as the session does. */
1198 
1199  entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1200 
1201  MemoryContextSwitchTo(oldctx);
1202  RelationClose(ancestor);
1203  }
1204 }
AttrMap * build_attrmap_by_name_if_req(TupleDesc indesc, TupleDesc outdesc, bool missing_ok)
Definition: attmap.c:263
const TupleTableSlotOps TTSOpsHeapTuple
Definition: execTuples.c:85
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1325
void RelationClose(Relation relation)
Definition: relcache.c:2183
TupleDesc CreateTupleDescCopyConstr(TupleDesc tupdesc)
Definition: tupdesc.c:173

References RelationSyncEntry::attrmap, build_attrmap_by_name_if_req(), CacheMemoryContext, CreateTupleDescCopyConstr(), data, MakeSingleTupleTableSlot(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, RelationSyncEntry::old_slot, RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), and TTSOpsHeapTuple.

Referenced by get_rel_sync_entry().

◆ LoadPublications()

static List * LoadPublications ( List pubnames)
static

Definition at line 1755 of file pgoutput.c.

1756 {
1757  List *result = NIL;
1758  ListCell *lc;
1759 
1760  foreach(lc, pubnames)
1761  {
1762  char *pubname = (char *) lfirst(lc);
1763  Publication *pub = GetPublicationByName(pubname, false);
1764 
1765  result = lappend(result, pub);
1766  }
1767 
1768  return result;
1769 }
Publication * GetPublicationByName(const char *pubname, bool missing_ok)

References GetPublicationByName(), lappend(), lfirst, and NIL.

Referenced by get_rel_sync_entry().

◆ maybe_send_schema()

static void maybe_send_schema ( LogicalDecodingContext ctx,
ReorderBufferChange change,
Relation  relation,
RelationSyncEntry relentry 
)
static

Definition at line 685 of file pgoutput.c.

688 {
690  bool schema_sent;
693 
694  /*
695  * Remember XID of the (sub)transaction for the change. We don't care if
696  * it's top-level transaction or not (we have already sent that XID in
697  * start of the current streaming block).
698  *
699  * If we're not in a streaming block, just use InvalidTransactionId and
700  * the write methods will not include it.
701  */
702  if (data->in_streaming)
703  xid = change->txn->xid;
704 
705  if (rbtxn_is_subtxn(change->txn))
706  topxid = rbtxn_get_toptxn(change->txn)->xid;
707  else
708  topxid = xid;
709 
710  /*
711  * Do we need to send the schema? We do track streamed transactions
712  * separately, because those may be applied later (and the regular
713  * transactions won't see their effects until then) and in an order that
714  * we don't know at this point.
715  *
716  * XXX There is a scope of optimization here. Currently, we always send
717  * the schema first time in a streaming transaction but we can probably
718  * avoid that by checking 'relentry->schema_sent' flag. However, before
719  * doing that we need to study its impact on the case where we have a mix
720  * of streaming and non-streaming transactions.
721  */
722  if (data->in_streaming)
723  schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
724  else
725  schema_sent = relentry->schema_sent;
726 
727  /* Nothing to do if we already sent the schema. */
728  if (schema_sent)
729  return;
730 
731  /*
732  * Send the schema. If the changes will be published using an ancestor's
733  * schema, not the relation's own, send that ancestor's schema before
734  * sending relation's own (XXX - maybe sending only the former suffices?).
735  */
736  if (relentry->publish_as_relid != RelationGetRelid(relation))
737  {
738  Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
739 
740  send_relation_and_attrs(ancestor, xid, ctx, relentry);
741  RelationClose(ancestor);
742  }
743 
744  send_relation_and_attrs(relation, xid, ctx, relentry);
745 
746  if (data->in_streaming)
747  set_schema_sent_in_streamed_txn(relentry, topxid);
748  else
749  relentry->schema_sent = true;
750 }
uint32 TransactionId
Definition: c.h:606
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, RelationSyncEntry *relentry)
Definition: pgoutput.c:756
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1990
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1980
#define rbtxn_get_toptxn(txn)
#define rbtxn_is_subtxn(txn)
void * output_plugin_private
Definition: logical.h:76
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
TransactionId xid
#define InvalidTransactionId
Definition: transam.h:31

References data, get_schema_sent_in_streamed_txn(), if(), InvalidTransactionId, LogicalDecodingContext::output_plugin_private, RelationSyncEntry::publish_as_relid, rbtxn_get_toptxn, rbtxn_is_subtxn, RelationClose(), RelationGetRelid, RelationIdGetRelation(), RelationSyncEntry::schema_sent, send_relation_and_attrs(), set_schema_sent_in_streamed_txn(), ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ parse_output_parameters()

static void parse_output_parameters ( List options,
PGOutputData data 
)
static

Definition at line 282 of file pgoutput.c.

283 {
284  ListCell *lc;
285  bool protocol_version_given = false;
286  bool publication_names_given = false;
287  bool binary_option_given = false;
288  bool messages_option_given = false;
289  bool streaming_given = false;
290  bool two_phase_option_given = false;
291  bool origin_option_given = false;
292 
293  data->binary = false;
294  data->streaming = LOGICALREP_STREAM_OFF;
295  data->messages = false;
296  data->two_phase = false;
297 
298  foreach(lc, options)
299  {
300  DefElem *defel = (DefElem *) lfirst(lc);
301 
302  Assert(defel->arg == NULL || IsA(defel->arg, String));
303 
304  /* Check each param, whether or not we recognize it */
305  if (strcmp(defel->defname, "proto_version") == 0)
306  {
307  unsigned long parsed;
308  char *endptr;
309 
310  if (protocol_version_given)
311  ereport(ERROR,
312  (errcode(ERRCODE_SYNTAX_ERROR),
313  errmsg("conflicting or redundant options")));
314  protocol_version_given = true;
315 
316  errno = 0;
317  parsed = strtoul(strVal(defel->arg), &endptr, 10);
318  if (errno != 0 || *endptr != '\0')
319  ereport(ERROR,
320  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
321  errmsg("invalid proto_version")));
322 
323  if (parsed > PG_UINT32_MAX)
324  ereport(ERROR,
325  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
326  errmsg("proto_version \"%s\" out of range",
327  strVal(defel->arg))));
328 
329  data->protocol_version = (uint32) parsed;
330  }
331  else if (strcmp(defel->defname, "publication_names") == 0)
332  {
333  if (publication_names_given)
334  ereport(ERROR,
335  (errcode(ERRCODE_SYNTAX_ERROR),
336  errmsg("conflicting or redundant options")));
337  publication_names_given = true;
338 
339  if (!SplitIdentifierString(strVal(defel->arg), ',',
340  &data->publication_names))
341  ereport(ERROR,
342  (errcode(ERRCODE_INVALID_NAME),
343  errmsg("invalid publication_names syntax")));
344  }
345  else if (strcmp(defel->defname, "binary") == 0)
346  {
347  if (binary_option_given)
348  ereport(ERROR,
349  (errcode(ERRCODE_SYNTAX_ERROR),
350  errmsg("conflicting or redundant options")));
351  binary_option_given = true;
352 
353  data->binary = defGetBoolean(defel);
354  }
355  else if (strcmp(defel->defname, "messages") == 0)
356  {
357  if (messages_option_given)
358  ereport(ERROR,
359  (errcode(ERRCODE_SYNTAX_ERROR),
360  errmsg("conflicting or redundant options")));
361  messages_option_given = true;
362 
363  data->messages = defGetBoolean(defel);
364  }
365  else if (strcmp(defel->defname, "streaming") == 0)
366  {
367  if (streaming_given)
368  ereport(ERROR,
369  (errcode(ERRCODE_SYNTAX_ERROR),
370  errmsg("conflicting or redundant options")));
371  streaming_given = true;
372 
373  data->streaming = defGetStreamingMode(defel);
374  }
375  else if (strcmp(defel->defname, "two_phase") == 0)
376  {
377  if (two_phase_option_given)
378  ereport(ERROR,
379  (errcode(ERRCODE_SYNTAX_ERROR),
380  errmsg("conflicting or redundant options")));
381  two_phase_option_given = true;
382 
383  data->two_phase = defGetBoolean(defel);
384  }
385  else if (strcmp(defel->defname, "origin") == 0)
386  {
387  char *origin;
388 
389  if (origin_option_given)
390  ereport(ERROR,
391  errcode(ERRCODE_SYNTAX_ERROR),
392  errmsg("conflicting or redundant options"));
393  origin_option_given = true;
394 
395  origin = defGetString(defel);
396  if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
397  data->publish_no_origin = true;
398  else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
399  data->publish_no_origin = false;
400  else
401  ereport(ERROR,
402  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
403  errmsg("unrecognized origin value: \"%s\"", origin));
404  }
405  else
406  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
407  }
408 
409  /* Check required options */
410  if (!protocol_version_given)
411  ereport(ERROR,
412  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
413  errmsg("option \"%s\" missing", "proto_version"));
414  if (!publication_names_given)
415  ereport(ERROR,
416  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
417  errmsg("option \"%s\" missing", "publication_names"));
418 }
#define PG_UINT32_MAX
Definition: c.h:544
uint32_t uint32
Definition: c.h:485
bool defGetBoolean(DefElem *def)
Definition: define.c:107
char * defGetString(DefElem *def)
Definition: define.c:48
#define elog(elevel,...)
Definition: elog.h:225
#define IsA(nodeptr, _type_)
Definition: nodes.h:158
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
char * defname
Definition: parsenodes.h:817
Node * arg
Definition: parsenodes.h:818
Definition: value.h:64
char defGetStreamingMode(DefElem *def)
#define strVal(v)
Definition: value.h:82
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3432

References DefElem::arg, Assert, data, defGetBoolean(), defGetStreamingMode(), defGetString(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, pg_strcasecmp(), PG_UINT32_MAX, SplitIdentifierString(), and strVal.

Referenced by pgoutput_startup().

◆ pgoutput_begin_prepare_txn()

static void pgoutput_begin_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 622 of file pgoutput.c.

623 {
624  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
625 
626  OutputPluginPrepareWrite(ctx, !send_replication_origin);
628 
629  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
630  send_replication_origin);
631 
632  OutputPluginWrite(ctx, true);
633 }
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:722
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:709
#define InvalidRepOriginId
Definition: origin.h:33
static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
Definition: pgoutput.c:2421
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:115
StringInfo out
Definition: logical.h:71
RepOriginId origin_id
XLogRecPtr origin_lsn

References InvalidRepOriginId, logicalrep_write_begin_prepare(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and send_repl_origin().

Referenced by _PG_output_plugin_init().

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 554 of file pgoutput.c.

555 {
557  sizeof(PGOutputTxnData));
558 
559  txn->output_plugin_private = txndata;
560 }
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
MemoryContext context
Definition: logical.h:36
void * output_plugin_private

References LogicalDecodingContext::context, MemoryContextAllocZero(), and ReorderBufferTXN::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 1438 of file pgoutput.c.

1440 {
1443  MemoryContext old;
1444  RelationSyncEntry *relentry;
1446  Relation ancestor = NULL;
1447  Relation targetrel = relation;
1449  TupleTableSlot *old_slot = NULL;
1450  TupleTableSlot *new_slot = NULL;
1451 
1452  if (!is_publishable_relation(relation))
1453  return;
1454 
1455  /*
1456  * Remember the xid for the change in streaming mode. We need to send xid
1457  * with each change in the streaming mode so that subscriber can make
1458  * their association and on aborts, it can discard the corresponding
1459  * changes.
1460  */
1461  if (data->in_streaming)
1462  xid = change->txn->xid;
1463 
1464  relentry = get_rel_sync_entry(data, relation);
1465 
1466  /* First check the table filter */
1467  switch (action)
1468  {
1470  if (!relentry->pubactions.pubinsert)
1471  return;
1472  break;
1474  if (!relentry->pubactions.pubupdate)
1475  return;
1476  break;
1478  if (!relentry->pubactions.pubdelete)
1479  return;
1480 
1481  /*
1482  * This is only possible if deletes are allowed even when replica
1483  * identity is not defined for a table. Since the DELETE action
1484  * can't be published, we simply return.
1485  */
1486  if (!change->data.tp.oldtuple)
1487  {
1488  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1489  return;
1490  }
1491  break;
1492  default:
1493  Assert(false);
1494  }
1495 
1496  /* Avoid leaking memory by using and resetting our own context */
1497  old = MemoryContextSwitchTo(data->context);
1498 
1499  /* Switch relation if publishing via root. */
1500  if (relentry->publish_as_relid != RelationGetRelid(relation))
1501  {
1502  Assert(relation->rd_rel->relispartition);
1503  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1504  targetrel = ancestor;
1505  }
1506 
1507  if (change->data.tp.oldtuple)
1508  {
1509  old_slot = relentry->old_slot;
1510  ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1511 
1512  /* Convert tuple if needed. */
1513  if (relentry->attrmap)
1514  {
1516  &TTSOpsVirtual);
1517 
1518  old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1519  }
1520  }
1521 
1522  if (change->data.tp.newtuple)
1523  {
1524  new_slot = relentry->new_slot;
1525  ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1526 
1527  /* Convert tuple if needed. */
1528  if (relentry->attrmap)
1529  {
1531  &TTSOpsVirtual);
1532 
1533  new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1534  }
1535  }
1536 
1537  /*
1538  * Check row filter.
1539  *
1540  * Updates could be transformed to inserts or deletes based on the results
1541  * of the row filter for old and new tuple.
1542  */
1543  if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1544  goto cleanup;
1545 
1546  /*
1547  * Send BEGIN if we haven't yet.
1548  *
1549  * We send the BEGIN message after ensuring that we will actually send the
1550  * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1551  * transactions.
1552  */
1553  if (txndata && !txndata->sent_begin_txn)
1554  pgoutput_send_begin(ctx, txn);
1555 
1556  /*
1557  * Schema should be sent using the original relation because it also sends
1558  * the ancestor's relation.
1559  */
1560  maybe_send_schema(ctx, change, relation, relentry);
1561 
1562  OutputPluginPrepareWrite(ctx, true);
1563 
1564  /* Send the data */
1565  switch (action)
1566  {
1568  logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1569  data->binary, relentry->columns,
1570  relentry->include_gencols);
1571  break;
1573  logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1574  new_slot, data->binary, relentry->columns,
1575  relentry->include_gencols);
1576  break;
1578  logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1579  data->binary, relentry->columns,
1580  relentry->include_gencols);
1581  break;
1582  default:
1583  Assert(false);
1584  }
1585 
1586  OutputPluginWrite(ctx, true);
1587 
1588 cleanup:
1589  if (RelationIsValid(ancestor))
1590  {
1591  RelationClose(ancestor);
1592  ancestor = NULL;
1593  }
1594 
1595  /* Drop the new slots that were used to store the converted tuples. */
1596  if (relentry->attrmap)
1597  {
1598  if (old_slot)
1599  ExecDropSingleTupleTableSlot(old_slot);
1600 
1601  if (new_slot)
1602  ExecDropSingleTupleTableSlot(new_slot);
1603  }
1604 
1605  MemoryContextSwitchTo(old);
1606  MemoryContextReset(data->context);
1607 }
static void cleanup(void)
Definition: bootstrap.c:704
#define DEBUG1
Definition: elog.h:30
TupleTableSlot * MakeTupleTableSlot(TupleDesc tupleDesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1199
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1439
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
bool is_publishable_relation(Relation rel)
static void pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:568
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Relation relation)
Definition: pgoutput.c:2011
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:685
static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
Definition: pgoutput.c:1257
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, bool include_gencols)
Definition: proto.c:402
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, bool include_gencols)
Definition: proto.c:523
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, bool include_gencols)
Definition: proto.c:447
#define RelationIsValid(relation)
Definition: rel.h:478
ReorderBufferChangeType
Definition: reorderbuffer.h:51
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:53
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
union ReorderBufferChange::@108 data
struct ReorderBufferChange::@108::@109 tp
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192

References generate_unaccent_rules::action, ReorderBufferChange::action, Assert, RelationSyncEntry::attrmap, cleanup(), RelationSyncEntry::columns, ReorderBufferChange::data, data, DEBUG1, elog, ExecDropSingleTupleTableSlot(), ExecStoreHeapTuple(), execute_attr_map_slot(), get_rel_sync_entry(), RelationSyncEntry::include_gencols, InvalidTransactionId, is_publishable_relation(), logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_update(), MakeTupleTableSlot(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, RelationSyncEntry::old_slot, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_row_filter(), pgoutput_send_begin(), RelationSyncEntry::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationData::rd_rel, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), RelationIsValid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, TTSOpsVirtual, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_column_list_init()

static void pgoutput_column_list_init ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 1079 of file pgoutput.c.

1081 {
1082  ListCell *lc;
1083  bool first = true;
1084  Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1085  bool found_pub_collist = false;
1086  Bitmapset *relcols = NULL;
1087 
1089 
1090  /*
1091  * Find if there are any column lists for this relation. If there are,
1092  * build a bitmap using the column lists.
1093  *
1094  * Multiple publications might have multiple column lists for this
1095  * relation.
1096  *
1097  * Note that we don't support the case where the column list is different
1098  * for the same table when combining publications. See comments atop
1099  * fetch_table_list. But one can later change the publication so we still
1100  * need to check all the given publication-table mappings and report an
1101  * error if any publications have a different column list.
1102  */
1103  foreach(lc, publications)
1104  {
1105  Publication *pub = lfirst(lc);
1106  Bitmapset *cols = NULL;
1107 
1108  /* Retrieve the bitmap of columns for a column list publication. */
1109  found_pub_collist |= check_and_fetch_column_list(pub,
1110  entry->publish_as_relid,
1111  entry->entry_cxt, &cols);
1112 
1113  /*
1114  * For non-column list publications — e.g. TABLE (without a column
1115  * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1116  * of the table (including generated columns when
1117  * 'publish_generated_columns' parameter is true).
1118  */
1119  if (!cols)
1120  {
1121  /*
1122  * Cache the table columns for the first publication with no
1123  * specified column list to detect publication with a different
1124  * column list.
1125  */
1126  if (!relcols && (list_length(publications) > 1))
1127  {
1129 
1130  relcols = pub_form_cols_map(relation, entry->include_gencols);
1131  MemoryContextSwitchTo(oldcxt);
1132  }
1133 
1134  cols = relcols;
1135  }
1136 
1137  if (first)
1138  {
1139  entry->columns = cols;
1140  first = false;
1141  }
1142  else if (!bms_equal(entry->columns, cols))
1143  ereport(ERROR,
1144  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1145  errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1147  RelationGetRelationName(relation)));
1148  } /* loop all subscribed publications */
1149 
1150  /*
1151  * If no column list publications exist, columns to be published will be
1152  * computed later according to the 'publish_generated_columns' parameter.
1153  */
1154  if (!found_pub_collist)
1155  entry->columns = NULL;
1156 
1157  RelationClose(relation);
1158 }
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:142
Bitmapset * pub_form_cols_map(Relation relation, bool include_gencols)
static void pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
Definition: pgoutput.c:851

References bms_equal(), check_and_fetch_column_list(), RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, ereport, errcode(), errmsg(), ERROR, get_namespace_name(), RelationSyncEntry::include_gencols, lfirst, list_length(), MemoryContextSwitchTo(), pgoutput_ensure_entry_cxt(), pub_form_cols_map(), RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetNamespace, RelationGetRelationName, and RelationIdGetRelation().

Referenced by get_rel_sync_entry().

◆ pgoutput_commit_prepared_txn()

static void pgoutput_commit_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 653 of file pgoutput.c.

655 {
656  OutputPluginUpdateProgress(ctx, false);
657 
658  OutputPluginPrepareWrite(ctx, true);
659  logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
660  OutputPluginWrite(ctx, true);
661 }
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact)
Definition: logical.c:735
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:236

References logicalrep_write_commit_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 590 of file pgoutput.c.

592 {
594  bool sent_begin_txn;
595 
596  Assert(txndata);
597 
598  /*
599  * We don't need to send the commit message unless some relevant change
600  * from this transaction has been sent to the downstream.
601  */
602  sent_begin_txn = txndata->sent_begin_txn;
603  OutputPluginUpdateProgress(ctx, !sent_begin_txn);
604  pfree(txndata);
605  txn->output_plugin_private = NULL;
606 
607  if (!sent_begin_txn)
608  {
609  elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
610  return;
611  }
612 
613  OutputPluginPrepareWrite(ctx, true);
614  logicalrep_write_commit(ctx->out, txn, commit_lsn);
615  OutputPluginWrite(ctx, true);
616 }
void pfree(void *pointer)
Definition: mcxt.c:1521
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:77
bool sent_begin_txn
Definition: pgoutput.c:209

References Assert, DEBUG1, elog, logicalrep_write_commit(), LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), pfree(), PGOutputTxnData::sent_begin_txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_ensure_entry_cxt()

static void pgoutput_ensure_entry_cxt ( PGOutputData data,
RelationSyncEntry entry 
)
static

Definition at line 851 of file pgoutput.c.

852 {
853  Relation relation;
854 
855  /* The context may already exist, in which case bail out. */
856  if (entry->entry_cxt)
857  return;
858 
859  relation = RelationIdGetRelation(entry->publish_as_relid);
860 
861  entry->entry_cxt = AllocSetContextCreate(data->cachectx,
862  "entry private context",
864 
866  RelationGetRelationName(relation));
867 }
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:170
#define MemoryContextCopyAndSetIdentifier(cxt, id)
Definition: memutils.h:101

References ALLOCSET_SMALL_SIZES, AllocSetContextCreate, data, RelationSyncEntry::entry_cxt, MemoryContextCopyAndSetIdentifier, RelationSyncEntry::publish_as_relid, RelationGetRelationName, and RelationIdGetRelation().

Referenced by pgoutput_column_list_init(), and pgoutput_row_filter_init().

◆ pgoutput_message()

static void pgoutput_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 1678 of file pgoutput.c.

1681 {
1684 
1685  if (!data->messages)
1686  return;
1687 
1688  /*
1689  * Remember the xid for the message in streaming mode. See
1690  * pgoutput_change.
1691  */
1692  if (data->in_streaming)
1693  xid = txn->xid;
1694 
1695  /*
1696  * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1697  */
1698  if (transactional)
1699  {
1701 
1702  /* Send BEGIN if we haven't yet */
1703  if (txndata && !txndata->sent_begin_txn)
1704  pgoutput_send_begin(ctx, txn);
1705  }
1706 
1707  OutputPluginPrepareWrite(ctx, true);
1709  xid,
1710  message_lsn,
1711  transactional,
1712  prefix,
1713  sz,
1714  message);
1715  OutputPluginWrite(ctx, true);
1716 }
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:633

References data, if(), InvalidTransactionId, logicalrep_write_message(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_send_begin(), PGOutputTxnData::sent_begin_txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext ctx,
RepOriginId  origin_id 
)
static

Definition at line 1723 of file pgoutput.c.

1725 {
1727 
1728  if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1729  return true;
1730 
1731  return false;
1732 }

References data, if(), InvalidRepOriginId, and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pgoutput_prepare_txn()

static void pgoutput_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 639 of file pgoutput.c.

641 {
642  OutputPluginUpdateProgress(ctx, false);
643 
644  OutputPluginPrepareWrite(ctx, true);
645  logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
646  OutputPluginWrite(ctx, true);
647 }
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:186

References logicalrep_write_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_rollback_prepared_txn()

static void pgoutput_rollback_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)
static

Definition at line 667 of file pgoutput.c.

671 {
672  OutputPluginUpdateProgress(ctx, false);
673 
674  OutputPluginPrepareWrite(ctx, true);
675  logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
676  prepare_time);
677  OutputPluginWrite(ctx, true);
678 }
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:292

References logicalrep_write_rollback_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_row_filter()

static bool pgoutput_row_filter ( Relation  relation,
TupleTableSlot old_slot,
TupleTableSlot **  new_slot_ptr,
RelationSyncEntry entry,
ReorderBufferChangeType action 
)
static

Definition at line 1257 of file pgoutput.c.

1260 {
1261  TupleDesc desc;
1262  int i;
1263  bool old_matched,
1264  new_matched,
1265  result;
1266  TupleTableSlot *tmp_new_slot;
1267  TupleTableSlot *new_slot = *new_slot_ptr;
1268  ExprContext *ecxt;
1269  ExprState *filter_exprstate;
1270 
1271  /*
1272  * We need this map to avoid relying on ReorderBufferChangeType enums
1273  * having specific values.
1274  */
1275  static const int map_changetype_pubaction[] = {
1279  };
1280 
1284 
1285  Assert(new_slot || old_slot);
1286 
1287  /* Get the corresponding row filter */
1288  filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1289 
1290  /* Bail out if there is no row filter */
1291  if (!filter_exprstate)
1292  return true;
1293 
1294  elog(DEBUG3, "table \"%s.%s\" has row filter",
1296  RelationGetRelationName(relation));
1297 
1299 
1300  ecxt = GetPerTupleExprContext(entry->estate);
1301 
1302  /*
1303  * For the following occasions where there is only one tuple, we can
1304  * evaluate the row filter for that tuple and return.
1305  *
1306  * For inserts, we only have the new tuple.
1307  *
1308  * For updates, we can have only a new tuple when none of the replica
1309  * identity columns changed and none of those columns have external data
1310  * but we still need to evaluate the row filter for the new tuple as the
1311  * existing values of those columns might not match the filter. Also,
1312  * users can use constant expressions in the row filter, so we anyway need
1313  * to evaluate it for the new tuple.
1314  *
1315  * For deletes, we only have the old tuple.
1316  */
1317  if (!new_slot || !old_slot)
1318  {
1319  ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1320  result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1321 
1322  return result;
1323  }
1324 
1325  /*
1326  * Both the old and new tuples must be valid only for updates and need to
1327  * be checked against the row filter.
1328  */
1329  Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1330 
1331  slot_getallattrs(new_slot);
1332  slot_getallattrs(old_slot);
1333 
1334  tmp_new_slot = NULL;
1335  desc = RelationGetDescr(relation);
1336 
1337  /*
1338  * The new tuple might not have all the replica identity columns, in which
1339  * case it needs to be copied over from the old tuple.
1340  */
1341  for (i = 0; i < desc->natts; i++)
1342  {
1343  Form_pg_attribute att = TupleDescAttr(desc, i);
1344 
1345  /*
1346  * if the column in the new tuple or old tuple is null, nothing to do
1347  */
1348  if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1349  continue;
1350 
1351  /*
1352  * Unchanged toasted replica identity columns are only logged in the
1353  * old tuple. Copy this over to the new tuple. The changed (or WAL
1354  * Logged) toast values are always assembled in memory and set as
1355  * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1356  */
1357  if (att->attlen == -1 &&
1358  VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1359  !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1360  {
1361  if (!tmp_new_slot)
1362  {
1363  tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1364  ExecClearTuple(tmp_new_slot);
1365 
1366  memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1367  desc->natts * sizeof(Datum));
1368  memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1369  desc->natts * sizeof(bool));
1370  }
1371 
1372  tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1373  tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1374  }
1375  }
1376 
1377  ecxt->ecxt_scantuple = old_slot;
1378  old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1379 
1380  if (tmp_new_slot)
1381  {
1382  ExecStoreVirtualTuple(tmp_new_slot);
1383  ecxt->ecxt_scantuple = tmp_new_slot;
1384  }
1385  else
1386  ecxt->ecxt_scantuple = new_slot;
1387 
1388  new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1389 
1390  /*
1391  * Case 1: if both tuples don't match the row filter, bailout. Send
1392  * nothing.
1393  */
1394  if (!old_matched && !new_matched)
1395  return false;
1396 
1397  /*
1398  * Case 2: if the old tuple doesn't satisfy the row filter but the new
1399  * tuple does, transform the UPDATE into INSERT.
1400  *
1401  * Use the newly transformed tuple that must contain the column values for
1402  * all the replica identity columns. This is required to ensure that the
1403  * while inserting the tuple in the downstream node, we have all the
1404  * required column values.
1405  */
1406  if (!old_matched && new_matched)
1407  {
1409 
1410  if (tmp_new_slot)
1411  *new_slot_ptr = tmp_new_slot;
1412  }
1413 
1414  /*
1415  * Case 3: if the old tuple satisfies the row filter but the new tuple
1416  * doesn't, transform the UPDATE into DELETE.
1417  *
1418  * This transformation does not require another tuple. The Old tuple will
1419  * be used for DELETE.
1420  */
1421  else if (old_matched && !new_matched)
1423 
1424  /*
1425  * Case 4: if both tuples match the row filter, transformation isn't
1426  * required. (*action is default UPDATE).
1427  */
1428 
1429  return true;
1430 }
#define DEBUG3
Definition: elog.h:28
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1639
#define ResetPerTupleExprContext(estate)
Definition: executor.h:570
#define GetPerTupleExprContext(estate)
Definition: executor.h:561
static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
Definition: pgoutput.c:828
TupleTableSlot * ecxt_scantuple
Definition: execnodes.h:258
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290

References generate_unaccent_rules::action, Assert, DEBUG3, ExprContext::ecxt_scantuple, elog, RelationSyncEntry::estate, ExecClearTuple(), ExecStoreVirtualTuple(), RelationSyncEntry::exprstate, get_namespace_name(), GetPerTupleExprContext, i, MakeSingleTupleTableSlot(), TupleDescData::natts, pgoutput_row_filter_exec_expr(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ResetPerTupleExprContext, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, TTSOpsVirtual, TupleDescAttr, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pgoutput_change().

◆ pgoutput_row_filter_exec_expr()

static bool pgoutput_row_filter_exec_expr ( ExprState state,
ExprContext econtext 
)
static

Definition at line 828 of file pgoutput.c.

829 {
830  Datum ret;
831  bool isnull;
832 
833  Assert(state != NULL);
834 
835  ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
836 
837  elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
838  isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
839  isnull ? "true" : "false");
840 
841  if (isnull)
842  return false;
843 
844  return DatumGetBool(ret);
845 }
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:359
static bool DatumGetBool(Datum X)
Definition: postgres.h:90
Definition: regguts.h:323

References Assert, DatumGetBool(), DEBUG3, elog, and ExecEvalExprSwitchContext().

Referenced by pgoutput_row_filter().

◆ pgoutput_row_filter_init()

static void pgoutput_row_filter_init ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 873 of file pgoutput.c.

875 {
876  ListCell *lc;
877  List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
878  bool no_filter[] = {false, false, false}; /* One per pubaction */
879  MemoryContext oldctx;
880  int idx;
881  bool has_filter = true;
882  Oid schemaid = get_rel_namespace(entry->publish_as_relid);
883 
884  /*
885  * Find if there are any row filters for this relation. If there are, then
886  * prepare the necessary ExprState and cache it in entry->exprstate. To
887  * build an expression state, we need to ensure the following:
888  *
889  * All the given publication-table mappings must be checked.
890  *
891  * Multiple publications might have multiple row filters for this
892  * relation. Since row filter usage depends on the DML operation, there
893  * are multiple lists (one for each operation) to which row filters will
894  * be appended.
895  *
896  * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
897  * expression" so it takes precedence.
898  */
899  foreach(lc, publications)
900  {
901  Publication *pub = lfirst(lc);
902  HeapTuple rftuple = NULL;
903  Datum rfdatum = 0;
904  bool pub_no_filter = true;
905 
906  /*
907  * If the publication is FOR ALL TABLES, or the publication includes a
908  * FOR TABLES IN SCHEMA where the table belongs to the referred
909  * schema, then it is treated the same as if there are no row filters
910  * (even if other publications have a row filter).
911  */
912  if (!pub->alltables &&
913  !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
914  ObjectIdGetDatum(schemaid),
915  ObjectIdGetDatum(pub->oid)))
916  {
917  /*
918  * Check for the presence of a row filter in this publication.
919  */
920  rftuple = SearchSysCache2(PUBLICATIONRELMAP,
922  ObjectIdGetDatum(pub->oid));
923 
924  if (HeapTupleIsValid(rftuple))
925  {
926  /* Null indicates no filter. */
927  rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
928  Anum_pg_publication_rel_prqual,
929  &pub_no_filter);
930  }
931  }
932 
933  if (pub_no_filter)
934  {
935  if (rftuple)
936  ReleaseSysCache(rftuple);
937 
938  no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
939  no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
940  no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
941 
942  /*
943  * Quick exit if all the DML actions are publicized via this
944  * publication.
945  */
946  if (no_filter[PUBACTION_INSERT] &&
947  no_filter[PUBACTION_UPDATE] &&
948  no_filter[PUBACTION_DELETE])
949  {
950  has_filter = false;
951  break;
952  }
953 
954  /* No additional work for this publication. Next one. */
955  continue;
956  }
957 
958  /* Form the per pubaction row filter lists. */
959  if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
960  rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
961  TextDatumGetCString(rfdatum));
962  if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
963  rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
964  TextDatumGetCString(rfdatum));
965  if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
966  rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
967  TextDatumGetCString(rfdatum));
968 
969  ReleaseSysCache(rftuple);
970  } /* loop all subscribed publications */
971 
972  /* Clean the row filter */
973  for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
974  {
975  if (no_filter[idx])
976  {
977  list_free_deep(rfnodes[idx]);
978  rfnodes[idx] = NIL;
979  }
980  }
981 
982  if (has_filter)
983  {
985 
987 
988  /*
989  * Now all the filters for all pubactions are known. Combine them when
990  * their pubactions are the same.
991  */
992  oldctx = MemoryContextSwitchTo(entry->entry_cxt);
993  entry->estate = create_estate_for_relation(relation);
994  for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
995  {
996  List *filters = NIL;
997  Expr *rfnode;
998 
999  if (rfnodes[idx] == NIL)
1000  continue;
1001 
1002  foreach(lc, rfnodes[idx])
1003  filters = lappend(filters, stringToNode((char *) lfirst(lc)));
1004 
1005  /* combine the row filter and cache the ExprState */
1006  rfnode = make_orclause(filters);
1007  entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1008  } /* for each pubaction */
1009  MemoryContextSwitchTo(oldctx);
1010 
1011  RelationClose(relation);
1012  }
1013 }
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:259
#define TextDatumGetCString(d)
Definition: builtins.h:98
ExprState * ExecPrepareExpr(Expr *node, EState *estate)
Definition: execExpr.c:743
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
Expr * make_orclause(List *orclauses)
Definition: makefuncs.c:693
#define NUM_ROWFILTER_PUBACTIONS
Definition: pgoutput.c:102
static EState * create_estate_for_relation(Relation rel)
Definition: pgoutput.c:798
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
void * stringToNode(const char *str)
Definition: read.c:90
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:600
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:232
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition: syscache.h:102

References Publication::alltables, create_estate_for_relation(), data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecPrepareExpr(), RelationSyncEntry::exprstate, get_rel_namespace(), HeapTupleIsValid, idx(), lappend(), lfirst, list_free_deep(), make_orclause(), MemoryContextSwitchTo(), NIL, NUM_ROWFILTER_PUBACTIONS, ObjectIdGetDatum(), Publication::oid, pgoutput_ensure_entry_cxt(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationClose(), RelationIdGetRelation(), ReleaseSysCache(), SearchSysCache2(), SearchSysCacheExists2, stringToNode(), SysCacheGetAttr(), and TextDatumGetCString.

Referenced by get_rel_sync_entry().

◆ pgoutput_send_begin()

static void pgoutput_send_begin ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 568 of file pgoutput.c.

569 {
570  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
572 
573  Assert(txndata);
574  Assert(!txndata->sent_begin_txn);
575 
576  OutputPluginPrepareWrite(ctx, !send_replication_origin);
577  logicalrep_write_begin(ctx->out, txn);
578  txndata->sent_begin_txn = true;
579 
580  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
581  send_replication_origin);
582 
583  OutputPluginWrite(ctx, true);
584 }
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:48

References Assert, InvalidRepOriginId, logicalrep_write_begin(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), send_repl_origin(), and PGOutputTxnData::sent_begin_txn.

Referenced by pgoutput_change(), pgoutput_message(), and pgoutput_truncate().

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 1742 of file pgoutput.c.

1743 {
1744  if (RelationSyncCache)
1745  {
1747  RelationSyncCache = NULL;
1748  }
1749 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865

References hash_destroy(), and RelationSyncCache.

Referenced by _PG_output_plugin_init().

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 424 of file pgoutput.c.

426 {
428  static bool publication_callback_registered = false;
429 
430  /* Create our memory context for private allocations. */
431  data->context = AllocSetContextCreate(ctx->context,
432  "logical replication output context",
434 
435  data->cachectx = AllocSetContextCreate(ctx->context,
436  "logical replication cache context",
438 
440 
441  /* This plugin uses binary protocol. */
443 
444  /*
445  * This is replication start and not slot initialization.
446  *
447  * Parse and validate options passed by the client.
448  */
449  if (!is_init)
450  {
451  /* Parse the params and ERROR if we see any we don't recognize */
453 
454  /* Check if we support requested protocol */
455  if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
456  ereport(ERROR,
457  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
458  errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
459  data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
460 
461  if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
462  ereport(ERROR,
463  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
464  errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
465  data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
466 
467  /*
468  * Decide whether to enable streaming. It is disabled by default, in
469  * which case we just update the flag in decoding context. Otherwise
470  * we only allow it with sufficient version of the protocol, and when
471  * the output plugin supports it.
472  */
473  if (data->streaming == LOGICALREP_STREAM_OFF)
474  ctx->streaming = false;
475  else if (data->streaming == LOGICALREP_STREAM_ON &&
476  data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
477  ereport(ERROR,
478  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
479  errmsg("requested proto_version=%d does not support streaming, need %d or higher",
480  data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
481  else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
483  ereport(ERROR,
484  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
485  errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
487  else if (!ctx->streaming)
488  ereport(ERROR,
489  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
490  errmsg("streaming requested, but not supported by output plugin")));
491 
492  /*
493  * Here, we just check whether the two-phase option is passed by
494  * plugin and decide whether to enable it at later point of time. It
495  * remains enabled if the previous start-up has done so. But we only
496  * allow the option to be passed in with sufficient version of the
497  * protocol, and when the output plugin supports it.
498  */
499  if (!data->two_phase)
500  ctx->twophase_opt_given = false;
501  else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
502  ereport(ERROR,
503  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
504  errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
505  data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
506  else if (!ctx->twophase)
507  ereport(ERROR,
508  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
509  errmsg("two-phase commit requested, but not supported by output plugin")));
510  else
511  ctx->twophase_opt_given = true;
512 
513  /* Init publication state. */
514  data->publications = NIL;
515  publications_valid = false;
516 
517  /*
518  * Register callback for pg_publication if we didn't already do that
519  * during some previous call in this process.
520  */
521  if (!publication_callback_registered)
522  {
523  CacheRegisterSyscacheCallback(PUBLICATIONOID,
525  (Datum) 0);
526  publication_callback_registered = true;
527  }
528 
529  /* Initialize relation schema cache. */
531  }
532  else
533  {
534  /*
535  * Disable the streaming and prepared transactions during the slot
536  * initialization mode.
537  */
538  ctx->streaming = false;
539  ctx->twophase = false;
540  }
541 }
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:40
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_MAX_VERSION_NUM
Definition: logicalproto.h:45
void * palloc0(Size size)
Definition: mcxt.c:1347
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
static void parse_output_parameters(List *options, PGOutputData *data)
Definition: pgoutput.c:282
static void init_rel_sync_cache(MemoryContext cachectx)
Definition: pgoutput.c:1926
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1777
List * output_plugin_options
Definition: logical.h:59
OutputPluginOutputType output_type
Definition: output_plugin.h:28

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CacheMemoryContext, CacheRegisterSyscacheCallback(), LogicalDecodingContext::context, data, ereport, errcode(), errmsg(), ERROR, init_rel_sync_cache(), LOGICALREP_PROTO_MAX_VERSION_NUM, LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0(), parse_output_parameters(), publication_invalidation_cb(), publications_valid, LogicalDecodingContext::streaming, LogicalDecodingContext::twophase, and LogicalDecodingContext::twophase_opt_given.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_abort()

static void pgoutput_stream_abort ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 1845 of file pgoutput.c.

1848 {
1849  ReorderBufferTXN *toptxn;
1851  bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1852 
1853  /*
1854  * The abort should happen outside streaming block, even for streamed
1855  * transactions. The transaction has to be marked as streamed, though.
1856  */
1857  Assert(!data->in_streaming);
1858 
1859  /* determine the toplevel transaction */
1860  toptxn = rbtxn_get_toptxn(txn);
1861 
1862  Assert(rbtxn_is_streamed(toptxn));
1863 
1864  OutputPluginPrepareWrite(ctx, true);
1865  logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1866  txn->xact_time.abort_time, write_abort_info);
1867 
1868  OutputPluginWrite(ctx, true);
1869 
1870  cleanup_rel_sync_cache(toptxn->xid, false);
1871 }
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:2309
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
Definition: proto.c:1145
#define rbtxn_is_streamed(txn)
TimestampTz abort_time
union ReorderBufferTXN::@114 xact_time

References ReorderBufferTXN::abort_time, Assert, cleanup_rel_sync_cache(), data, logicalrep_write_stream_abort(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_get_toptxn, rbtxn_is_streamed, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_commit()

static void pgoutput_stream_commit ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 1878 of file pgoutput.c.

1881 {
1883 
1884  /*
1885  * The commit should happen outside streaming block, even for streamed
1886  * transactions. The transaction has to be marked as streamed, though.
1887  */
1888  Assert(!data->in_streaming);
1889  Assert(rbtxn_is_streamed(txn));
1890 
1891  OutputPluginUpdateProgress(ctx, false);
1892 
1893  OutputPluginPrepareWrite(ctx, true);
1894  logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1895  OutputPluginWrite(ctx, true);
1896 
1897  cleanup_rel_sync_cache(txn->xid, true);
1898 }
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:201
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1091

References Assert, cleanup_rel_sync_cache(), data, logicalrep_write_stream_commit(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), PG_USED_FOR_ASSERTS_ONLY, rbtxn_is_streamed, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_prepare_txn()

static void pgoutput_stream_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 1906 of file pgoutput.c.

1909 {
1910  Assert(rbtxn_is_streamed(txn));
1911 
1912  OutputPluginUpdateProgress(ctx, false);
1913  OutputPluginPrepareWrite(ctx, true);
1914  logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1915  OutputPluginWrite(ctx, true);
1916 }
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:352

References Assert, logicalrep_write_stream_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), and rbtxn_is_streamed.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_start()

static void pgoutput_stream_start ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 1792 of file pgoutput.c.

1794 {
1796  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1797 
1798  /* we can't nest streaming of transactions */
1799  Assert(!data->in_streaming);
1800 
1801  /*
1802  * If we already sent the first stream for this transaction then don't
1803  * send the origin id in the subsequent streams.
1804  */
1805  if (rbtxn_is_streamed(txn))
1806  send_replication_origin = false;
1807 
1808  OutputPluginPrepareWrite(ctx, !send_replication_origin);
1810 
1812  send_replication_origin);
1813 
1814  OutputPluginWrite(ctx, true);
1815 
1816  /* we're streaming a chunk of transaction now */
1817  data->in_streaming = true;
1818 }
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1048
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert, data, InvalidRepOriginId, InvalidXLogRecPtr, logicalrep_write_stream_start(), ReorderBufferTXN::origin_id, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, send_repl_origin(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_stop()

static void pgoutput_stream_stop ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 1824 of file pgoutput.c.

1826 {
1828 
1829  /* we should be streaming a transaction */
1830  Assert(data->in_streaming);
1831 
1832  OutputPluginPrepareWrite(ctx, true);
1834  OutputPluginWrite(ctx, true);
1835 
1836  /* we've stopped streaming a transaction */
1837  data->in_streaming = false;
1838 }
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1082

References Assert, data, logicalrep_write_stream_stop(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_truncate()

static void pgoutput_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 1610 of file pgoutput.c.

1612 {
1615  MemoryContext old;
1616  RelationSyncEntry *relentry;
1617  int i;
1618  int nrelids;
1619  Oid *relids;
1621 
1622  /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1623  if (data->in_streaming)
1624  xid = change->txn->xid;
1625 
1626  old = MemoryContextSwitchTo(data->context);
1627 
1628  relids = palloc0(nrelations * sizeof(Oid));
1629  nrelids = 0;
1630 
1631  for (i = 0; i < nrelations; i++)
1632  {
1633  Relation relation = relations[i];
1634  Oid relid = RelationGetRelid(relation);
1635 
1636  if (!is_publishable_relation(relation))
1637  continue;
1638 
1639  relentry = get_rel_sync_entry(data, relation);
1640 
1641  if (!relentry->pubactions.pubtruncate)
1642  continue;
1643 
1644  /*
1645  * Don't send partitions if the publication wants to send only the
1646  * root tables through it.
1647  */
1648  if (relation->rd_rel->relispartition &&
1649  relentry->publish_as_relid != relid)
1650  continue;
1651 
1652  relids[nrelids++] = relid;
1653 
1654  /* Send BEGIN if we haven't yet */
1655  if (txndata && !txndata->sent_begin_txn)
1656  pgoutput_send_begin(ctx, txn);
1657 
1658  maybe_send_schema(ctx, change, relation, relentry);
1659  }
1660 
1661  if (nrelids > 0)
1662  {
1663  OutputPluginPrepareWrite(ctx, true);
1665  xid,
1666  nrelids,
1667  relids,
1668  change->data.truncate.cascade,
1669  change->data.truncate.restart_seqs);
1670  OutputPluginWrite(ctx, true);
1671  }
1672 
1673  MemoryContextSwitchTo(old);
1674  MemoryContextReset(data->context);
1675 }
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:576
struct ReorderBufferChange::@108::@110 truncate

References ReorderBufferChange::data, data, get_rel_sync_entry(), i, InvalidTransactionId, is_publishable_relation(), logicalrep_write_truncate(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), palloc0(), pgoutput_send_begin(), RelationSyncEntry::pubactions, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, RelationData::rd_rel, RelationGetRelid, ReorderBufferChange::truncate, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1777 of file pgoutput.c.

1778 {
1779  publications_valid = false;
1780 
1781  /*
1782  * Also invalidate per-relation cache so that next time the filtering info
1783  * is checked it will be updated with the new publication settings.
1784  */
1785  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1786 }
void * arg

References arg, publications_valid, and rel_sync_cache_publication_cb().

Referenced by pgoutput_startup().

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 2395 of file pgoutput.c.

2396 {
2397  HASH_SEQ_STATUS status;
2398  RelationSyncEntry *entry;
2399 
2400  /*
2401  * We can get here if the plugin was used in SQL interface as the
2402  * RelationSyncCache is destroyed when the decoding finishes, but there is
2403  * no way to unregister the invalidation callbacks.
2404  */
2405  if (RelationSyncCache == NULL)
2406  return;
2407 
2408  /*
2409  * We have no easy way to identify which cache entries this invalidation
2410  * event might have affected, so just mark them all invalid.
2411  */
2412  hash_seq_init(&status, RelationSyncCache);
2413  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2414  {
2415  entry->replicate_valid = false;
2416  }
2417 }

References hash_seq_init(), hash_seq_search(), RelationSyncCache, and RelationSyncEntry::replicate_valid.

Referenced by init_rel_sync_cache(), and publication_invalidation_cb().

◆ rel_sync_cache_relation_cb()

static void rel_sync_cache_relation_cb ( Datum  arg,
Oid  relid 
)
static

Definition at line 2344 of file pgoutput.c.

2345 {
2346  RelationSyncEntry *entry;
2347 
2348  /*
2349  * We can get here if the plugin was used in SQL interface as the
2350  * RelationSyncCache is destroyed when the decoding finishes, but there is
2351  * no way to unregister the relcache invalidation callback.
2352  */
2353  if (RelationSyncCache == NULL)
2354  return;
2355 
2356  /*
2357  * Nobody keeps pointers to entries in this hash table around outside
2358  * logical decoding callback calls - but invalidation events can come in
2359  * *during* a callback if we do any syscache access in the callback.
2360  * Because of that we must mark the cache entry as invalid but not damage
2361  * any of its substructure here. The next get_rel_sync_entry() call will
2362  * rebuild it all.
2363  */
2364  if (OidIsValid(relid))
2365  {
2366  /*
2367  * Getting invalidations for relations that aren't in the table is
2368  * entirely normal. So we don't care if it's found or not.
2369  */
2370  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2371  HASH_FIND, NULL);
2372  if (entry != NULL)
2373  entry->replicate_valid = false;
2374  }
2375  else
2376  {
2377  /* Whole cache must be flushed. */
2378  HASH_SEQ_STATUS status;
2379 
2380  hash_seq_init(&status, RelationSyncCache);
2381  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2382  {
2383  entry->replicate_valid = false;
2384  }
2385  }
2386 }
#define OidIsValid(objectId)
Definition: c.h:729
@ HASH_FIND
Definition: hsearch.h:113

References HASH_FIND, hash_search(), hash_seq_init(), hash_seq_search(), OidIsValid, RelationSyncCache, and RelationSyncEntry::replicate_valid.

Referenced by init_rel_sync_cache().

◆ send_relation_and_attrs()

static void send_relation_and_attrs ( Relation  relation,
TransactionId  xid,
LogicalDecodingContext ctx,
RelationSyncEntry relentry 
)
static

Definition at line 756 of file pgoutput.c.

759 {
760  TupleDesc desc = RelationGetDescr(relation);
761  Bitmapset *columns = relentry->columns;
762  bool include_gencols = relentry->include_gencols;
763  int i;
764 
765  /*
766  * Write out type info if needed. We do that only for user-created types.
767  * We use FirstGenbkiObjectId as the cutoff, so that we only consider
768  * objects with hand-assigned OIDs to be "built in", not for instance any
769  * function or type defined in the information_schema. This is important
770  * because only hand-assigned OIDs can be expected to remain stable across
771  * major versions.
772  */
773  for (i = 0; i < desc->natts; i++)
774  {
775  Form_pg_attribute att = TupleDescAttr(desc, i);
776 
777  if (!logicalrep_should_publish_column(att, columns, include_gencols))
778  continue;
779 
780  if (att->atttypid < FirstGenbkiObjectId)
781  continue;
782 
783  OutputPluginPrepareWrite(ctx, false);
784  logicalrep_write_typ(ctx->out, xid, att->atttypid);
785  OutputPluginWrite(ctx, false);
786  }
787 
788  OutputPluginPrepareWrite(ctx, false);
789  logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols);
790  OutputPluginWrite(ctx, false);
791 }
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, bool include_gencols)
Definition: proto.c:660
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:715
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, bool include_gencols)
Definition: proto.c:1265
#define FirstGenbkiObjectId
Definition: transam.h:195

References RelationSyncEntry::columns, FirstGenbkiObjectId, i, RelationSyncEntry::include_gencols, logicalrep_should_publish_column(), logicalrep_write_rel(), logicalrep_write_typ(), TupleDescData::natts, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, and TupleDescAttr.

Referenced by maybe_send_schema().

◆ send_repl_origin()

static void send_repl_origin ( LogicalDecodingContext ctx,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
bool  send_origin 
)
static

Definition at line 2421 of file pgoutput.c.

2423 {
2424  if (send_origin)
2425  {
2426  char *origin;
2427 
2428  /*----------
2429  * XXX: which behaviour do we want here?
2430  *
2431  * Alternatives:
2432  * - don't send origin message if origin name not found
2433  * (that's what we do now)
2434  * - throw error - that will break replication, not good
2435  * - send some special "unknown" origin
2436  *----------
2437  */
2438  if (replorigin_by_oid(origin_id, true, &origin))
2439  {
2440  /* Message boundary */
2441  OutputPluginWrite(ctx, false);
2442  OutputPluginPrepareWrite(ctx, true);
2443 
2444  logicalrep_write_origin(ctx->out, origin, origin_lsn);
2445  }
2446  }
2447 }
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:469
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:373

References logicalrep_write_origin(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by pgoutput_begin_prepare_txn(), pgoutput_send_begin(), and pgoutput_stream_start().

◆ set_schema_sent_in_streamed_txn()

static void set_schema_sent_in_streamed_txn ( RelationSyncEntry entry,
TransactionId  xid 
)
static

Definition at line 1990 of file pgoutput.c.

1991 {
1992  MemoryContext oldctx;
1993 
1995 
1996  entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
1997 
1998  MemoryContextSwitchTo(oldctx);
1999 }
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:393

References CacheMemoryContext, lappend_xid(), MemoryContextSwitchTo(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 38 of file pgoutput.c.

◆ publications_valid

bool publications_valid
static

Definition at line 82 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ RelationSyncCache