PostgreSQL Source Code  git master
pgoutput.c File Reference
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 
struct  PGOutputTxnData
 

Macros

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)
 
#define CHANGES_THRESHOLD   100
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 
typedef struct PGOutputTxnData PGOutputTxnData
 

Enumerations

enum  RowFilterPubAction { PUBACTION_INSERT , PUBACTION_UPDATE , PUBACTION_DELETE }
 

Functions

static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pgoutput_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static void pgoutput_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pgoutput_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pgoutput_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pgoutput_stream_start (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_stop (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_abort (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pgoutput_stream_commit (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_stream_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static List * LoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void send_relation_and_attrs (Relation relation, TransactionId xid, LogicalDecodingContext *ctx, Bitmapset *columns)
 
static void send_repl_origin (LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
 
static void update_replication_progress (LogicalDecodingContext *ctx, bool skipped_xact)
 
static void init_rel_sync_cache (MemoryContext cachectx)
 
static void cleanup_rel_sync_cache (TransactionId xid, bool is_commit)
 
static RelationSyncEntry * get_rel_sync_entry (PGOutputData *data, Relation relation)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void set_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static bool get_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static void init_tuple_slot (PGOutputData *data, Relation relation, RelationSyncEntry *entry)
 
static EState * create_estate_for_relation (Relation rel)
 
static void pgoutput_row_filter_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
static bool pgoutput_row_filter_exec_expr (ExprState *state, ExprContext *econtext)
 
static bool pgoutput_row_filter (Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
 
static void pgoutput_column_list_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void parse_output_parameters (List *options, PGOutputData *data)
 
static void pgoutput_send_begin (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void maybe_send_schema (LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
 
static void pgoutput_ensure_entry_cxt (PGOutputData *data, RelationSyncEntry *entry)
 

Variables

 PG_MODULE_MAGIC
 
static bool publications_valid
 
static bool in_streaming
 
static bool publish_no_origin
 
static HTAB * RelationSyncCache = NULL
 

Macro Definition Documentation

◆ CHANGES_THRESHOLD

#define CHANGES_THRESHOLD   100

◆ NUM_ROWFILTER_PUBACTIONS

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)

Definition at line 108 of file pgoutput.c.

Typedef Documentation

◆ PGOutputTxnData

◆ RelationSyncEntry

Enumeration Type Documentation

◆ RowFilterPubAction

Enumerator
PUBACTION_INSERT 
PUBACTION_UPDATE 
PUBACTION_DELETE 

Definition at line 101 of file pgoutput.c.

102 {
103  PUBACTION_INSERT,
104  PUBACTION_UPDATE,
105  PUBACTION_DELETE
106 };
@ PUBACTION_INSERT
Definition: pgoutput.c:103
@ PUBACTION_UPDATE
Definition: pgoutput.c:104
@ PUBACTION_DELETE
Definition: pgoutput.c:105

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks * cb)

Definition at line 250 of file pgoutput.c.

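251 {
252  AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
253 
254  cb->startup_cb = pgoutput_startup;
255  cb->begin_cb = pgoutput_begin_txn;
256  cb->change_cb = pgoutput_change;
257  cb->truncate_cb = pgoutput_truncate;
258  cb->message_cb = pgoutput_message;
259  cb->commit_cb = pgoutput_commit_txn;
260 
261  cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
262  cb->prepare_cb = pgoutput_prepare_txn;
263  cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
264  cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
265  cb->filter_by_origin_cb = pgoutput_origin_filter;
266  cb->shutdown_cb = pgoutput_shutdown;
267 
268  /* transaction streaming */
269  cb->stream_start_cb = pgoutput_stream_start;
270  cb->stream_stop_cb = pgoutput_stream_stop;
271  cb->stream_abort_cb = pgoutput_stream_abort;
272  cb->stream_commit_cb = pgoutput_stream_commit;
273  cb->stream_change_cb = pgoutput_change;
274  cb->stream_message_cb = pgoutput_message;
275  cb->stream_truncate_cb = pgoutput_truncate;
276  /* transaction streaming - two-phase commit */
277  cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
278 }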
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:914
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition: pgoutput.c:1367
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:600
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:411
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:1605
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:617
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:1722
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: pgoutput.c:645
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:1739
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: pgoutput.c:1839
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:1894
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:532
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:1868
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:631
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1820
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1789
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:250
static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: pgoutput.c:1675
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:568
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::message_cb, pgoutput_begin_prepare_txn(), pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_prepared_txn(), pgoutput_commit_txn(), pgoutput_message(), pgoutput_origin_filter(), pgoutput_prepare_txn(), pgoutput_rollback_prepared_txn(), pgoutput_shutdown(), pgoutput_startup(), pgoutput_stream_abort(), pgoutput_stream_commit(), pgoutput_stream_prepare_txn(), pgoutput_stream_start(), pgoutput_stream_stop(), pgoutput_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

◆ cleanup_rel_sync_cache()

static void cleanup_rel_sync_cache ( TransactionId  xid,
bool  is_commit 
)
static

Definition at line 2245 of file pgoutput.c.

2246 {
2247  HASH_SEQ_STATUS hash_seq;
2248  RelationSyncEntry *entry;
2249  ListCell *lc;
2250 
2251  Assert(RelationSyncCache != NULL);
2252 
2253  hash_seq_init(&hash_seq, RelationSyncCache);
2254  while ((entry = hash_seq_search(&hash_seq)) != NULL)
2255  {
2256  /*
2257  * We can set the schema_sent flag for an entry that has committed xid
2258  * in the list as that ensures that the subscriber would have the
2259  * corresponding schema and we don't need to send it unless there is
2260  * any invalidation for that relation.
2261  */
2262  foreach(lc, entry->streamed_txns)
2263  {
2264  if (xid == lfirst_xid(lc))
2265  {
2266  if (is_commit)
2267  entry->schema_sent = true;
2268 
2269  entry->streamed_txns =
2270  foreach_delete_current(entry->streamed_txns, lc);
2271  break;
2272  }
2273  }
2274  }
2275 }
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1431
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1421
Assert(fmt[strlen(fmt) - 1] !='\n')
#define lfirst_xid(lc)
Definition: pg_list.h:173
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:388
static HTAB * RelationSyncCache
Definition: pgoutput.c:213
List * streamed_txns
Definition: pgoutput.c:135

References Assert(), foreach_delete_current, hash_seq_init(), hash_seq_search(), lfirst_xid, RelationSyncCache, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_stream_abort(), and pgoutput_stream_commit().

◆ create_estate_for_relation()

static EState * create_estate_for_relation ( Relation  rel)
static

Definition at line 777 of file pgoutput.c.

778 {
779  EState *estate;
780  RangeTblEntry *rte;
781 
782  estate = CreateExecutorState();
783 
784  rte = makeNode(RangeTblEntry);
785  rte->rtekind = RTE_RELATION;
786  rte->relid = RelationGetRelid(rel);
787  rte->relkind = rel->rd_rel->relkind;
788  rte->rellockmode = AccessShareLock;
789  ExecInitRangeTable(estate, list_make1(rte));
790 
791  estate->es_output_cid = GetCurrentCommandId(false);
792 
793  return estate;
794 }
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:757
EState * CreateExecutorState(void)
Definition: execUtils.c:92
#define AccessShareLock
Definition: lockdefs.h:36
#define makeNode(_type_)
Definition: nodes.h:165
@ RTE_RELATION
Definition: parsenodes.h:982
#define list_make1(x1)
Definition: pg_list.h:210
#define RelationGetRelid(relation)
Definition: rel.h:501
CommandId es_output_cid
Definition: execnodes.h:625
RTEKind rtekind
Definition: parsenodes.h:1001
Form_pg_class rd_rel
Definition: rel.h:110
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:817

References AccessShareLock, CreateExecutorState(), EState::es_output_cid, ExecInitRangeTable(), GetCurrentCommandId(), list_make1, makeNode, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, RangeTblEntry::rellockmode, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by pgoutput_row_filter_init().

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData * data,
Relation  relation 
)
static

Definition at line 1976 of file pgoutput.c.

1977 {
1978  RelationSyncEntry *entry;
1979  bool found;
1980  MemoryContext oldctx;
1981  Oid relid = RelationGetRelid(relation);
1982 
1983  Assert(RelationSyncCache != NULL);
1984 
1985  /* Find cached relation info, creating if not found */
1986  entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
1987  (void *) &relid,
1988  HASH_ENTER, &found);
1989  Assert(entry != NULL);
1990 
1991  /* initialize entry, if it's new */
1992  if (!found)
1993  {
1994  entry->replicate_valid = false;
1995  entry->schema_sent = false;
1996  entry->streamed_txns = NIL;
1997  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1998  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1999  entry->new_slot = NULL;
2000  entry->old_slot = NULL;
2001  memset(entry->exprstate, 0, sizeof(entry->exprstate));
2002  entry->entry_cxt = NULL;
2003  entry->publish_as_relid = InvalidOid;
2004  entry->columns = NULL;
2005  entry->attrmap = NULL;
2006  }
2007 
2008  /* Validate the entry */
2009  if (!entry->replicate_valid)
2010  {
2011  Oid schemaId = get_rel_namespace(relid);
2012  List *pubids = GetRelationPublications(relid);
2013 
2014  /*
2015  * We don't acquire a lock on the namespace system table as we build
2016  * the cache entry using a historic snapshot and all the later changes
2017  * are absorbed while decoding WAL.
2018  */
2019  List *schemaPubids = GetSchemaPublications(schemaId);
2020  ListCell *lc;
2021  Oid publish_as_relid = relid;
2022  int publish_ancestor_level = 0;
2023  bool am_partition = get_rel_relispartition(relid);
2024  char relkind = get_rel_relkind(relid);
2025  List *rel_publications = NIL;
2026 
2027  /* Reload publications if needed before use. */
2028  if (!publications_valid)
2029  {
2030  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2031  if (data->publications)
2032  {
2033  list_free_deep(data->publications);
2034  data->publications = NIL;
2035  }
2036  data->publications = LoadPublications(data->publication_names);
2037  MemoryContextSwitchTo(oldctx);
2038  publications_valid = true;
2039  }
2040 
2041  /*
2042  * Reset schema_sent status as the relation definition may have
2043  * changed. Also reset pubactions to empty in case rel was dropped
2044  * from a publication. Also free any objects that depended on the
2045  * earlier definition.
2046  */
2047  entry->schema_sent = false;
2048  list_free(entry->streamed_txns);
2049  entry->streamed_txns = NIL;
2050  bms_free(entry->columns);
2051  entry->columns = NULL;
2052  entry->pubactions.pubinsert = false;
2053  entry->pubactions.pubupdate = false;
2054  entry->pubactions.pubdelete = false;
2055  entry->pubactions.pubtruncate = false;
2056 
2057  /*
2058  * Tuple slots cleanups. (Will be rebuilt later if needed).
2059  */
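2060  if (entry->old_slot)
2061  ExecDropSingleTupleTableSlot(entry->old_slot);
2062  if (entry->new_slot)
2063  ExecDropSingleTupleTableSlot(entry->new_slot);
2064 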
2065  entry->old_slot = NULL;
2066  entry->new_slot = NULL;
2067 
2068  if (entry->attrmap)
2069  free_attrmap(entry->attrmap);
2070  entry->attrmap = NULL;
2071 
2072  /*
2073  * Row filter cache cleanups.
2074  */
2075  if (entry->entry_cxt)
2076  MemoryContextDelete(entry->entry_cxt);
2077 
2078  entry->entry_cxt = NULL;
2079  entry->estate = NULL;
2080  memset(entry->exprstate, 0, sizeof(entry->exprstate));
2081 
2082  /*
2083  * Build publication cache. We can't use one provided by relcache as
2084  * relcache considers all publications that the given relation is in,
2085  * but here we only need to consider ones that the subscriber
2086  * requested.
2087  */
2088  foreach(lc, data->publications)
2089  {
2090  Publication *pub = lfirst(lc);
2091  bool publish = false;
2092 
2093  /*
2094  * Under what relid should we publish changes in this publication?
2095  * We'll use the top-most relid across all publications. Also
2096  * track the ancestor level for this publication.
2097  */
2098  Oid pub_relid = relid;
2099  int ancestor_level = 0;
2100 
2101  /*
2102  * If this is a FOR ALL TABLES publication, pick the partition
2103  * root and set the ancestor level accordingly.
2104  */
2105  if (pub->alltables)
2106  {
2107  publish = true;
2108  if (pub->pubviaroot && am_partition)
2109  {
2110  List *ancestors = get_partition_ancestors(relid);
2111 
2112  pub_relid = llast_oid(ancestors);
2113  ancestor_level = list_length(ancestors);
2114  }
2115  }
2116 
2117  if (!publish)
2118  {
2119  bool ancestor_published = false;
2120 
2121  /*
2122  * For a partition, check if any of the ancestors are
2123  * published. If so, note down the topmost ancestor that is
2124  * published via this publication, which will be used as the
2125  * relation via which to publish the partition's changes.
2126  */
2127  if (am_partition)
2128  {
2129  Oid ancestor;
2130  int level;
2131  List *ancestors = get_partition_ancestors(relid);
2132 
2133  ancestor = GetTopMostAncestorInPublication(pub->oid,
2134  ancestors,
2135  &level);
2136 
2137  if (ancestor != InvalidOid)
2138  {
2139  ancestor_published = true;
2140  if (pub->pubviaroot)
2141  {
2142  pub_relid = ancestor;
2143  ancestor_level = level;
2144  }
2145  }
2146  }
2147 
2148  if (list_member_oid(pubids, pub->oid) ||
2149  list_member_oid(schemaPubids, pub->oid) ||
2150  ancestor_published)
2151  publish = true;
2152  }
2153 
2154  /*
2155  * If the relation is to be published, determine actions to
2156  * publish, and list of columns, if appropriate.
2157  *
2158  * Don't publish changes for partitioned tables, because
2159  * publishing those of its partitions suffices, unless partition
2160  * changes won't be published due to pubviaroot being set.
2161  */
2162  if (publish &&
2163  (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2164  {
2165  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2166  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2167  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2168  entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2169 
2170  /*
2171  * We want to publish the changes as the top-most ancestor
2172  * across all publications. So we need to check if the already
2173  * calculated level is higher than the new one. If yes, we can
2174  * ignore the new value (as it's a child). Otherwise the new
2175  * value is an ancestor, so we keep it.
2176  */
2177  if (publish_ancestor_level > ancestor_level)
2178  continue;
2179 
2180  /*
2181  * If we found an ancestor higher up in the tree, discard the
2182  * list of publications through which we replicate it, and use
2183  * the new ancestor.
2184  */
2185  if (publish_ancestor_level < ancestor_level)
2186  {
2187  publish_as_relid = pub_relid;
2188  publish_ancestor_level = ancestor_level;
2189 
2190  /* reset the publication list for this relation */
2191  rel_publications = NIL;
2192  }
2193  else
2194  {
2195  /* Same ancestor level, has to be the same OID. */
2196  Assert(publish_as_relid == pub_relid);
2197  }
2198 
2199  /* Track publications for this ancestor. */
2200  rel_publications = lappend(rel_publications, pub);
2201  }
2202  }
2203 
2204  entry->publish_as_relid = publish_as_relid;
2205 
2206  /*
2207  * Initialize the tuple slot, map, and row filter. These are only used
2208  * when publishing inserts, updates, or deletes.
2209  */
2210  if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2211  entry->pubactions.pubdelete)
2212  {
2213  /* Initialize the tuple slot and map */
2214  init_tuple_slot(data, relation, entry);
2215 
2216  /* Initialize the row filter */
2217  pgoutput_row_filter_init(data, rel_publications, entry);
2218 
2219  /* Initialize the column list */
2220  pgoutput_column_list_init(data, rel_publications, entry);
2221  }
2222 
2223  list_free(pubids);
2224  list_free(schemaPubids);
2225  list_free(rel_publications);
2226 
2227  entry->replicate_valid = true;
2228  }
2229 
2230  return entry;
2231 }
void free_attrmap(AttrMap *map)
Definition: attmap.c:57
void bms_free(Bitmapset *a)
Definition: bitmapset.c:209
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1254
@ HASH_ENTER
Definition: hsearch.h:114
List * lappend(List *list, void *datum)
Definition: list.c:338
void list_free(List *list)
Definition: list.c:1545
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:721
void list_free_deep(List *list)
Definition: list.c:1559
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:2009
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1985
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1934
MemoryContext CacheMemoryContext
Definition: mcxt.c:133
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:376
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:135
List * get_partition_ancestors(Oid relid)
Definition: partition.c:133
const void * data
#define lfirst(lc)
Definition: pg_list.h:170
static int list_length(const List *l)
Definition: pg_list.h:150
#define NIL
Definition: pg_list.h:66
#define llast_oid(l)
Definition: pg_list.h:198
List * GetSchemaPublications(Oid schemaid)
List * GetRelationPublications(Oid relid)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:1752
static void init_tuple_slot(PGOutputData *data, Relation relation, RelationSyncEntry *entry)
Definition: pgoutput.c:1094
static void pgoutput_row_filter_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:848
static void pgoutput_column_list_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:994
static bool publications_valid
Definition: pgoutput.c:81
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
Definition: pg_list.h:52
PublicationActions pubactions
ExprState * exprstate[NUM_ROWFILTER_PUBACTIONS]
Definition: pgoutput.c:148
Bitmapset * columns
Definition: pgoutput.c:174
PublicationActions pubactions
Definition: pgoutput.c:139
TupleTableSlot * old_slot
Definition: pgoutput.c:151
bool replicate_valid
Definition: pgoutput.c:132
MemoryContext entry_cxt
Definition: pgoutput.c:180
EState * estate
Definition: pgoutput.c:149
TupleTableSlot * new_slot
Definition: pgoutput.c:150
AttrMap * attrmap
Definition: pgoutput.c:167

References Publication::alltables, Assert(), RelationSyncEntry::attrmap, bms_free(), CacheMemoryContext, RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecDropSingleTupleTableSlot(), RelationSyncEntry::exprstate, free_attrmap(), get_partition_ancestors(), get_rel_namespace(), get_rel_relispartition(), get_rel_relkind(), GetRelationPublications(), GetSchemaPublications(), GetTopMostAncestorInPublication(), HASH_ENTER, hash_search(), init_tuple_slot(), InvalidOid, lappend(), lfirst, list_free(), list_free_deep(), list_length(), list_member_oid(), llast_oid, LoadPublications(), MemoryContextDelete(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, NIL, Publication::oid, RelationSyncEntry::old_slot, pgoutput_column_list_init(), pgoutput_row_filter_init(), RelationSyncEntry::pubactions, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, publications_valid, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, PublicationActions::pubupdate, Publication::pubviaroot, RelationGetRelid, RelationSyncCache, RelationSyncEntry::replicate_valid, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ get_schema_sent_in_streamed_txn()

static bool get_schema_sent_in_streamed_txn ( RelationSyncEntry * entry,
TransactionId  xid 
)
static

Definition at line 1945 of file pgoutput.c.

1946 {
1947  return list_member_xid(entry->streamed_txns, xid);
1948 }
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:741

References list_member_xid(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  cachectx)
static

Definition at line 1914 of file pgoutput.c.

1915 {
1916  HASHCTL ctl;
1917 
1918  if (RelationSyncCache != NULL)
1919  return;
1920 
1921  /* Make a new hash table for the cache */
1922  ctl.keysize = sizeof(Oid);
1923  ctl.entrysize = sizeof(RelationSyncEntry);
1924  ctl.hcxt = cachectx;
1925 
1926  RelationSyncCache = hash_create("logical replication output relation cache",
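1927  128, &ctl,
1928  HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1929 
1930  Assert(RelationSyncCache != NULL);
1931 
1932  CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1933  CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1934  rel_sync_cache_publication_cb,
1935  (Datum) 0);
1936  CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1937  rel_sync_cache_publication_cb,
1938  (Datum) 0);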
1939 }
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1561
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:2332
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:2281
uintptr_t Datum
Definition: postgres.h:412
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
@ PUBLICATIONNAMESPACEMAP
Definition: syscache.h:82
@ PUBLICATIONRELMAP
Definition: syscache.h:85

References Assert(), CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, PUBLICATIONNAMESPACEMAP, PUBLICATIONRELMAP, rel_sync_cache_publication_cb(), rel_sync_cache_relation_cb(), and RelationSyncCache.

Referenced by pgoutput_startup().

◆ init_tuple_slot()

static void init_tuple_slot ( PGOutputData * data,
Relation  relation,
RelationSyncEntry * entry
)
static

Definition at line 1094 of file pgoutput.c.

1096 {
1097  MemoryContext oldctx;
1098  TupleDesc oldtupdesc;
1099  TupleDesc newtupdesc;
1100 
1101  oldctx = MemoryContextSwitchTo(data->cachectx);
1102 
1103  /*
1104  * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1105  * live as long as the cache remains.
1106  */
1107  oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
1108  newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
1109 
1110  entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1111  entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1112 
1113  MemoryContextSwitchTo(oldctx);
1114 
1115  /*
1116  * Cache the map that will be used to convert the relation's tuples into
1117  * the ancestor's format, if needed.
1118  */
1119  if (entry->publish_as_relid != RelationGetRelid(relation))
1120  {
1121  Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1122  TupleDesc indesc = RelationGetDescr(relation);
1123  TupleDesc outdesc = RelationGetDescr(ancestor);
1124 
1125  /* Map must live as long as the session does. */
1126  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1127 
1128  entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1129 
1130  MemoryContextSwitchTo(oldctx);
1131  RelationClose(ancestor);
1132  }
1133 }
AttrMap * build_attrmap_by_name_if_req(TupleDesc indesc, TupleDesc outdesc, bool missing_ok)
Definition: attmap.c:264
const TupleTableSlotOps TTSOpsHeapTuple
Definition: execTuples.c:84
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1238
#define RelationGetDescr(relation)
Definition: rel.h:527
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2054
void RelationClose(Relation relation)
Definition: relcache.c:2160
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition: tupdesc.c:111

References RelationSyncEntry::attrmap, build_attrmap_by_name_if_req(), CacheMemoryContext, CreateTupleDescCopy(), data, MakeSingleTupleTableSlot(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, RelationSyncEntry::old_slot, RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), and TTSOpsHeapTuple.

Referenced by get_rel_sync_entry().

◆ LoadPublications()

static List * LoadPublications ( List * pubnames)
static

Definition at line 1752 of file pgoutput.c.

1753 {
1754  List *result = NIL;
1755  ListCell *lc;
1756 
1757  foreach(lc, pubnames)
1758  {
1759  char *pubname = (char *) lfirst(lc);
1760  Publication *pub = GetPublicationByName(pubname, false);
1761 
1762  result = lappend(result, pub);
1763  }
1764 
1765  return result;
1766 }
Publication * GetPublicationByName(const char *pubname, bool missing_ok)

References GetPublicationByName(), lappend(), lfirst, and NIL.

Referenced by get_rel_sync_entry().

◆ maybe_send_schema()

static void maybe_send_schema ( LogicalDecodingContext * ctx,
ReorderBufferChange * change,
Relation  relation,
RelationSyncEntry * relentry
)
static

Definition at line 663 of file pgoutput.c.

666 {
667  bool schema_sent;
668  TransactionId xid = InvalidTransactionId;
669  TransactionId topxid = InvalidTransactionId;
670 
671  /*
672  * Remember XID of the (sub)transaction for the change. We don't care if
673  * it's top-level transaction or not (we have already sent that XID in
674  * start of the current streaming block).
675  *
676  * If we're not in a streaming block, just use InvalidTransactionId and
677  * the write methods will not include it.
678  */
679  if (in_streaming)
680  xid = change->txn->xid;
681 
682  if (change->txn->toptxn)
683  topxid = change->txn->toptxn->xid;
684  else
685  topxid = xid;
686 
687  /*
688  * Do we need to send the schema? We do track streamed transactions
689  * separately, because those may be applied later (and the regular
690  * transactions won't see their effects until then) and in an order that
691  * we don't know at this point.
692  *
693  * XXX There is a scope of optimization here. Currently, we always send
694  * the schema first time in a streaming transaction but we can probably
695  * avoid that by checking 'relentry->schema_sent' flag. However, before
696  * doing that we need to study its impact on the case where we have a mix
697  * of streaming and non-streaming transactions.
698  */
699  if (in_streaming)
700  schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
701  else
702  schema_sent = relentry->schema_sent;
703 
704  /* Nothing to do if we already sent the schema. */
705  if (schema_sent)
706  return;
707 
708  /*
709  * Send the schema. If the changes will be published using an ancestor's
710  * schema, not the relation's own, send that ancestor's schema before
711  * sending relation's own (XXX - maybe sending only the former suffices?).
712  */
713  if (relentry->publish_as_relid != RelationGetRelid(relation))
714  {
715  Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
716 
717  send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
718  RelationClose(ancestor);
719  }
720 
721  send_relation_and_attrs(relation, xid, ctx, relentry->columns);
722 
723  if (in_streaming)
724  set_schema_sent_in_streamed_txn(relentry, topxid);
725  else
726  relentry->schema_sent = true;
727 }
uint32 TransactionId
Definition: c.h:588
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, Bitmapset *columns)
Definition: pgoutput.c:733
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1955
static bool in_streaming
Definition: pgoutput.c:82
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1945
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
struct ReorderBufferTXN * toptxn
TransactionId xid
#define InvalidTransactionId
Definition: transam.h:31

References RelationSyncEntry::columns, get_schema_sent_in_streamed_txn(), in_streaming, InvalidTransactionId, RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetRelid, RelationIdGetRelation(), RelationSyncEntry::schema_sent, send_relation_and_attrs(), set_schema_sent_in_streamed_txn(), ReorderBufferTXN::toptxn, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ parse_output_parameters()

static void parse_output_parameters ( List * options,
PGOutputData * data
)
static

Definition at line 281 of file pgoutput.c.

282 {
283  ListCell *lc;
284  bool protocol_version_given = false;
285  bool publication_names_given = false;
286  bool binary_option_given = false;
287  bool messages_option_given = false;
288  bool streaming_given = false;
289  bool two_phase_option_given = false;
290  bool origin_option_given = false;
291 
292  data->binary = false;
293  data->streaming = false;
294  data->messages = false;
295  data->two_phase = false;
296 
297  foreach(lc, options)
298  {
299  DefElem *defel = (DefElem *) lfirst(lc);
300 
301  Assert(defel->arg == NULL || IsA(defel->arg, String));
302 
303  /* Check each param, whether or not we recognize it */
304  if (strcmp(defel->defname, "proto_version") == 0)
305  {
306  unsigned long parsed;
307  char *endptr;
308 
309  if (protocol_version_given)
310  ereport(ERROR,
311  (errcode(ERRCODE_SYNTAX_ERROR),
312  errmsg("conflicting or redundant options")));
313  protocol_version_given = true;
314 
315  errno = 0;
316  parsed = strtoul(strVal(defel->arg), &endptr, 10);
317  if (errno != 0 || *endptr != '\0')
318  ereport(ERROR,
319  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
320  errmsg("invalid proto_version")));
321 
322  if (parsed > PG_UINT32_MAX)
323  ereport(ERROR,
324  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
325  errmsg("proto_version \"%s\" out of range",
326  strVal(defel->arg))));
327 
328  data->protocol_version = (uint32) parsed;
329  }
330  else if (strcmp(defel->defname, "publication_names") == 0)
331  {
332  if (publication_names_given)
333  ereport(ERROR,
334  (errcode(ERRCODE_SYNTAX_ERROR),
335  errmsg("conflicting or redundant options")));
336  publication_names_given = true;
337 
338  if (!SplitIdentifierString(strVal(defel->arg), ',',
339  &data->publication_names))
340  ereport(ERROR,
341  (errcode(ERRCODE_INVALID_NAME),
342  errmsg("invalid publication_names syntax")));
343  }
344  else if (strcmp(defel->defname, "binary") == 0)
345  {
346  if (binary_option_given)
347  ereport(ERROR,
348  (errcode(ERRCODE_SYNTAX_ERROR),
349  errmsg("conflicting or redundant options")));
350  binary_option_given = true;
351 
352  data->binary = defGetBoolean(defel);
353  }
354  else if (strcmp(defel->defname, "messages") == 0)
355  {
356  if (messages_option_given)
357  ereport(ERROR,
358  (errcode(ERRCODE_SYNTAX_ERROR),
359  errmsg("conflicting or redundant options")));
360  messages_option_given = true;
361 
362  data->messages = defGetBoolean(defel);
363  }
364  else if (strcmp(defel->defname, "streaming") == 0)
365  {
366  if (streaming_given)
367  ereport(ERROR,
368  (errcode(ERRCODE_SYNTAX_ERROR),
369  errmsg("conflicting or redundant options")));
370  streaming_given = true;
371 
372  data->streaming = defGetBoolean(defel);
373  }
374  else if (strcmp(defel->defname, "two_phase") == 0)
375  {
376  if (two_phase_option_given)
377  ereport(ERROR,
378  (errcode(ERRCODE_SYNTAX_ERROR),
379  errmsg("conflicting or redundant options")));
380  two_phase_option_given = true;
381 
382  data->two_phase = defGetBoolean(defel);
383  }
384  else if (strcmp(defel->defname, "origin") == 0)
385  {
386  if (origin_option_given)
387  ereport(ERROR,
388  errcode(ERRCODE_SYNTAX_ERROR),
389  errmsg("conflicting or redundant options"));
390  origin_option_given = true;
391 
392  data->origin = defGetString(defel);
393  if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
394  publish_no_origin = true;
395  else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
396  publish_no_origin = false;
397  else
398  ereport(ERROR,
399  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
400  errmsg("unrecognized origin value: \"%s\"", data->origin));
401  }
402  else
403  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
404  }
405 }
unsigned int uint32
Definition: c.h:442
#define PG_UINT32_MAX
Definition: c.h:526
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define IsA(nodeptr, _type_)
Definition: nodes.h:168
#define LOGICALREP_ORIGIN_NONE
#define LOGICALREP_ORIGIN_ANY
static bool publish_no_origin
Definition: pgoutput.c:83
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
char * defname
Definition: parsenodes.h:779
Node * arg
Definition: parsenodes.h:780
Definition: value.h:64
#define strVal(v)
Definition: value.h:82
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3712

References DefElem::arg, Assert(), data, defGetBoolean(), defGetString(), DefElem::defname, elog(), ereport, errcode(), errmsg(), ERROR, IsA, lfirst, LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE, pg_strcasecmp(), PG_UINT32_MAX, publish_no_origin, SplitIdentifierString(), and strVal.

Referenced by pgoutput_startup().

◆ pgoutput_begin_prepare_txn()

static void pgoutput_begin_prepare_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn
)
static

Definition at line 600 of file pgoutput.c.

601 {
602  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
603 
604  OutputPluginPrepareWrite(ctx, !send_replication_origin);
605  logicalrep_write_begin_prepare(ctx->out, txn);
606 
607  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
608  send_replication_origin);
609 
610  OutputPluginWrite(ctx, true);
611 }
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:662
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:649
#define InvalidRepOriginId
Definition: origin.h:33
static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
Definition: pgoutput.c:2358
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:127
StringInfo out
Definition: logical.h:71
RepOriginId origin_id
XLogRecPtr origin_lsn

References InvalidRepOriginId, logicalrep_write_begin_prepare(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and send_repl_origin().

Referenced by _PG_output_plugin_init().

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn
)
static

Definition at line 532 of file pgoutput.c.

533 {
534  PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
535  sizeof(PGOutputTxnData));
536 
537  txn->output_plugin_private = txndata;
538 }
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1037
MemoryContext context
Definition: logical.h:36
void * output_plugin_private

References LogicalDecodingContext::context, MemoryContextAllocZero(), and ReorderBufferTXN::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
Relation  relation,
ReorderBufferChange * change
)
static

Definition at line 1367 of file pgoutput.c.

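1369 {
1370  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1371  PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1372  MemoryContext old;
1373  RelationSyncEntry *relentry;
1374  TransactionId xid = InvalidTransactionId;
1375  Relation ancestor = NULL;
1376  Relation targetrel = relation;
1377  ReorderBufferChangeType action = change->action;
1378  TupleTableSlot *old_slot = NULL;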
1379  TupleTableSlot *new_slot = NULL;
1380 
1381  update_replication_progress(ctx, false);
1382 
1383  if (!is_publishable_relation(relation))
1384  return;
1385 
1386  /*
1387  * Remember the xid for the change in streaming mode. We need to send xid
1388  * with each change in the streaming mode so that subscriber can make
1389  * their association and on aborts, it can discard the corresponding
1390  * changes.
1391  */
1392  if (in_streaming)
1393  xid = change->txn->xid;
1394 
1395  relentry = get_rel_sync_entry(data, relation);
1396 
1397  /* First check the table filter */
1398  switch (action)
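1399  {
1400  case REORDER_BUFFER_CHANGE_INSERT:
1401  if (!relentry->pubactions.pubinsert)
1402  return;
1403  break;
1404  case REORDER_BUFFER_CHANGE_UPDATE:
1405  if (!relentry->pubactions.pubupdate)
1406  return;
1407  break;
1408  case REORDER_BUFFER_CHANGE_DELETE:
1409  if (!relentry->pubactions.pubdelete)
1410  return;
1411  break;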
1412  default:
1413  Assert(false);
1414  }
1415 
1416  /* Avoid leaking memory by using and resetting our own context */
1417  old = MemoryContextSwitchTo(data->context);
1418 
1419  /* Send the data */
1420  switch (action)
1421  {
1422  case REORDER_BUFFER_CHANGE_INSERT:
1423  new_slot = relentry->new_slot;
1424  ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
1425  new_slot, false);
1426 
1427  /* Switch relation if publishing via root. */
1428  if (relentry->publish_as_relid != RelationGetRelid(relation))
1429  {
1430  Assert(relation->rd_rel->relispartition);
1431  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1432  targetrel = ancestor;
1433  /* Convert tuple if needed. */
1434  if (relentry->attrmap)
1435  {
1436  TupleDesc tupdesc = RelationGetDescr(targetrel);
1437 
1438  new_slot = execute_attr_map_slot(relentry->attrmap,
1439  new_slot,
1440  MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1441  }
1442  }
1443 
1444  /* Check row filter */
1445  if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
1446  &action))
1447  break;
1448 
1449  /*
1450  * Send BEGIN if we haven't yet.
1451  *
1452  * We send the BEGIN message after ensuring that we will actually
1453  * send the change. This avoids sending a pair of BEGIN/COMMIT
1454  * messages for empty transactions.
1455  */
1456  if (txndata && !txndata->sent_begin_txn)
1457  pgoutput_send_begin(ctx, txn);
1458 
1459  /*
1460  * Schema should be sent using the original relation because it
1461  * also sends the ancestor's relation.
1462  */
1463  maybe_send_schema(ctx, change, relation, relentry);
1464 
1465  OutputPluginPrepareWrite(ctx, true);
1466  logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1467  data->binary, relentry->columns);
1468  OutputPluginWrite(ctx, true);
1469  break;
1470  case REORDER_BUFFER_CHANGE_UPDATE:
1471  if (change->data.tp.oldtuple)
1472  {
1473  old_slot = relentry->old_slot;
1474  ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
1475  old_slot, false);
1476  }
1477 
1478  new_slot = relentry->new_slot;
1479  ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
1480  new_slot, false);
1481 
1482  /* Switch relation if publishing via root. */
1483  if (relentry->publish_as_relid != RelationGetRelid(relation))
1484  {
1485  Assert(relation->rd_rel->relispartition);
1486  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1487  targetrel = ancestor;
1488  /* Convert tuples if needed. */
1489  if (relentry->attrmap)
1490  {
1491  TupleDesc tupdesc = RelationGetDescr(targetrel);
1492 
1493  if (old_slot)
1494  old_slot = execute_attr_map_slot(relentry->attrmap,
1495  old_slot,
1496  MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1497 
1498  new_slot = execute_attr_map_slot(relentry->attrmap,
1499  new_slot,
1500  MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1501  }
1502  }
1503 
1504  /* Check row filter */
1505  if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
1506  relentry, &action))
1507  break;
1508 
1509  /* Send BEGIN if we haven't yet */
1510  if (txndata && !txndata->sent_begin_txn)
1511  pgoutput_send_begin(ctx, txn);
1512 
1513  maybe_send_schema(ctx, change, relation, relentry);
1514 
1515  OutputPluginPrepareWrite(ctx, true);
1516 
1517  /*
1518  * Updates could be transformed to inserts or deletes based on the
1519  * results of the row filter for old and new tuple.
1520  */
1521  switch (action)
1522  {
1523  case REORDER_BUFFER_CHANGE_INSERT:
1524  logicalrep_write_insert(ctx->out, xid, targetrel,
1525  new_slot, data->binary,
1526  relentry->columns);
1527  break;
1528  case REORDER_BUFFER_CHANGE_UPDATE:
1529  logicalrep_write_update(ctx->out, xid, targetrel,
1530  old_slot, new_slot, data->binary,
1531  relentry->columns);
1532  break;
1533  case REORDER_BUFFER_CHANGE_DELETE:
1534  logicalrep_write_delete(ctx->out, xid, targetrel,
1535  old_slot, data->binary,
1536  relentry->columns);
1537  break;
1538  default:
1539  Assert(false);
1540  }
1541 
1542  OutputPluginWrite(ctx, true);
1543  break;
1544  case REORDER_BUFFER_CHANGE_DELETE:
1545  if (change->data.tp.oldtuple)
1546  {
1547  old_slot = relentry->old_slot;
1548 
1549  ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
1550  old_slot, false);
1551 
1552  /* Switch relation if publishing via root. */
1553  if (relentry->publish_as_relid != RelationGetRelid(relation))
1554  {
1555  Assert(relation->rd_rel->relispartition);
1556  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1557  targetrel = ancestor;
1558  /* Convert tuple if needed. */
1559  if (relentry->attrmap)
1560  {
1561  TupleDesc tupdesc = RelationGetDescr(targetrel);
1562 
1563  old_slot = execute_attr_map_slot(relentry->attrmap,
1564  old_slot,
1565  MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1566  }
1567  }
1568 
1569  /* Check row filter */
1570  if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
1571  relentry, &action))
1572  break;
1573 
1574  /* Send BEGIN if we haven't yet */
1575  if (txndata && !txndata->sent_begin_txn)
1576  pgoutput_send_begin(ctx, txn);
1577 
1578  maybe_send_schema(ctx, change, relation, relentry);
1579 
1580  OutputPluginPrepareWrite(ctx, true);
1581  logicalrep_write_delete(ctx->out, xid, targetrel,
1582  old_slot, data->binary,
1583  relentry->columns);
1584  OutputPluginWrite(ctx, true);
1585  }
1586  else
1587  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1588  break;
1589  default:
1590  Assert(false);
1591  }
1592 
1593  if (RelationIsValid(ancestor))
1594  {
1595  RelationClose(ancestor);
1596  ancestor = NULL;
1597  }
1598 
1599  /* Cleanup */
1600  MemoryContextSwitchTo(old);
1601  MemoryContextReset(data->context);
1602 }
#define DEBUG1
Definition: elog.h:30
TupleTableSlot * MakeTupleTableSlot(TupleDesc tupleDesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1112
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1352
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:303
bool is_publishable_relation(Relation rel)
static void pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:546
static void update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
Definition: pgoutput.c:2396
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Relation relation)
Definition: pgoutput.c:1976
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:663
static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
Definition: pgoutput.c:1186
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
Definition: proto.c:458
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
Definition: proto.c:414
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns)
Definition: proto.c:533
#define RelationIsValid(relation)
Definition: rel.h:474
ReorderBufferChangeType
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:56
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:58
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:57
void * output_plugin_private
Definition: logical.h:76
ReorderBufferChangeType action
Definition: reorderbuffer.h:85
union ReorderBufferChange::@97 data
struct ReorderBufferChange::@97::@98 tp
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192

References generate_unaccent_rules::action, ReorderBufferChange::action, Assert(), RelationSyncEntry::attrmap, RelationSyncEntry::columns, ReorderBufferChange::data, data, DEBUG1, elog(), ExecStoreHeapTuple(), execute_attr_map_slot(), get_rel_sync_entry(), in_streaming, InvalidTransactionId, is_publishable_relation(), logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_update(), MakeTupleTableSlot(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, RelationSyncEntry::old_slot, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_row_filter(), pgoutput_send_begin(), RelationSyncEntry::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationData::rd_rel, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), RelationIsValid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, TTSOpsVirtual, ReorderBufferChange::txn, update_replication_progress(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_column_list_init()

static void pgoutput_column_list_init ( PGOutputData * data,
List * publications,
RelationSyncEntry * entry
)
static

Definition at line 994 of file pgoutput.c.

996 {
997  ListCell *lc;
998  bool first = true;
999  Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1000 
1001  /*
1002  * Find if there are any column lists for this relation. If there are,
1003  * build a bitmap using the column lists.
1004  *
1005  * Multiple publications might have multiple column lists for this
1006  * relation.
1007  *
1008  * Note that we don't support the case where the column list is different
1009  * for the same table when combining publications. See comments atop
1010  * fetch_table_list. But one can later change the publication so we still
1011  * need to check all the given publication-table mappings and report an
1012  * error if any publications have a different column list.
1013  *
1014  * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
1015  */
1016  foreach(lc, publications)
1017  {
1018  Publication *pub = lfirst(lc);
1019  HeapTuple cftuple = NULL;
1020  Datum cfdatum = 0;
1021  Bitmapset *cols = NULL;
1022 
1023  /*
1024  * If the publication is FOR ALL TABLES then it is treated the same as
1025  * if there are no column lists (even if other publications have a
1026  * list).
1027  */
1028  if (!pub->alltables)
1029  {
1030  bool pub_no_list = true;
1031 
1032  /*
1033  * Check for the presence of a column list in this publication.
1034  *
1035  * Note: If we find no pg_publication_rel row, it's a publication
1036  * defined for a whole schema, so it can't have a column list,
1037  * just like a FOR ALL TABLES publication.
1038  */
1039  cftuple = SearchSysCache2(PUBLICATIONRELMAP,
1040  ObjectIdGetDatum(entry->publish_as_relid),
1041  ObjectIdGetDatum(pub->oid));
1042 
1043  if (HeapTupleIsValid(cftuple))
1044  {
1045  /* Lookup the column list attribute. */
1046  cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
1047  Anum_pg_publication_rel_prattrs,
1048  &pub_no_list);
1049 
1050  /* Build the column list bitmap in the per-entry context. */
1051  if (!pub_no_list) /* when not null */
1052  {
1053  pgoutput_ensure_entry_cxt(data, entry);
1054 
1055  cols = pub_collist_to_bitmapset(cols, cfdatum,
1056  entry->entry_cxt);
1057 
1058  /*
1059  * If column list includes all the columns of the table,
1060  * set it to NULL.
1061  */
1062  if (bms_num_members(cols) == RelationGetNumberOfAttributes(relation))
1063  {
1064  bms_free(cols);
1065  cols = NULL;
1066  }
1067  }
1068 
1069  ReleaseSysCache(cftuple);
1070  }
1071  }
1072 
1073  if (first)
1074  {
1075  entry->columns = cols;
1076  first = false;
1077  }
1078  else if (!bms_equal(entry->columns, cols))
1079  ereport(ERROR,
1080  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1081  errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1082  get_namespace_name(RelationGetNamespace(relation)),
1083  RelationGetRelationName(relation)));
1084  } /* loop all subscribed publications */
1085 
1086  RelationClose(relation);
1087 }
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:94
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:649
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3331
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
static void pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
Definition: pgoutput.c:826
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:600
#define RelationGetNumberOfAttributes(relation)
Definition: rel.h:507
#define RelationGetRelationName(relation)
Definition: rel.h:535
#define RelationGetNamespace(relation)
Definition: rel.h:542
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1221
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1434
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:1184

References Publication::alltables, bms_equal(), bms_free(), bms_num_members(), RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, ereport, errcode(), errmsg(), ERROR, get_namespace_name(), HeapTupleIsValid, lfirst, ObjectIdGetDatum(), Publication::oid, pgoutput_ensure_entry_cxt(), pub_collist_to_bitmapset(), PUBLICATIONRELMAP, RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetNamespace, RelationGetNumberOfAttributes, RelationGetRelationName, RelationIdGetRelation(), ReleaseSysCache(), SearchSysCache2(), and SysCacheGetAttr().

Referenced by get_rel_sync_entry().

◆ pgoutput_commit_prepared_txn()

static void pgoutput_commit_prepared_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 631 of file pgoutput.c.

633 {
634  update_replication_progress(ctx, false);
635 
636  OutputPluginPrepareWrite(ctx, true);
637  logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
638  OutputPluginWrite(ctx, true);
639 }
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:248

References logicalrep_write_commit_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and update_replication_progress().

Referenced by _PG_output_plugin_init().

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 568 of file pgoutput.c.

570 {
571  PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
572  bool sent_begin_txn;
573 
574  Assert(txndata);
575 
576  /*
577  * We don't need to send the commit message unless some relevant change
578  * from this transaction has been sent to the downstream.
579  */
580  sent_begin_txn = txndata->sent_begin_txn;
581  update_replication_progress(ctx, !sent_begin_txn);
582  pfree(txndata);
583  txn->output_plugin_private = NULL;
584 
585  if (!sent_begin_txn)
586  {
587  elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
588  return;
589  }
590 
591  OutputPluginPrepareWrite(ctx, true);
592  logicalrep_write_commit(ctx->out, txn, commit_lsn);
593  OutputPluginWrite(ctx, true);
594 }
void pfree(void *pointer)
Definition: mcxt.c:1306
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:89
bool sent_begin_txn
Definition: pgoutput.c:209

References Assert(), DEBUG1, elog(), logicalrep_write_commit(), LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), PGOutputTxnData::sent_begin_txn, update_replication_progress(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_ensure_entry_cxt()

static void pgoutput_ensure_entry_cxt ( PGOutputData * data,
RelationSyncEntry * entry
)
static

Definition at line 826 of file pgoutput.c.

827 {
828  Relation relation;
829 
830  /* The context may already exist, in which case bail out. */
831  if (entry->entry_cxt)
832  return;
833 
834  relation = RelationIdGetRelation(entry->publish_as_relid);
835 
836  entry->entry_cxt = AllocSetContextCreate(data->cachectx,
837  "entry private context",
838  ALLOCSET_SMALL_SIZES);
839 
840  MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
841  RelationGetRelationName(relation));
842 }
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:163
#define MemoryContextCopyAndSetIdentifier(cxt, id)
Definition: memutils.h:101

References ALLOCSET_SMALL_SIZES, AllocSetContextCreate, data, RelationSyncEntry::entry_cxt, MemoryContextCopyAndSetIdentifier, RelationSyncEntry::publish_as_relid, RelationGetRelationName, and RelationIdGetRelation().

Referenced by pgoutput_column_list_init(), and pgoutput_row_filter_init().

◆ pgoutput_message()

static void pgoutput_message ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 1675 of file pgoutput.c.

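1678 {
1679  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1680  TransactionId xid = InvalidTransactionId;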
1681 
1682  update_replication_progress(ctx, false);
1683 
1684  if (!data->messages)
1685  return;
1686 
1687  /*
1688  * Remember the xid for the message in streaming mode. See
1689  * pgoutput_change.
1690  */
1691  if (in_streaming)
1692  xid = txn->xid;
1693 
1694  /*
1695  * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1696  */
1697  if (transactional)
1698  {
1699  PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1700 
1701  /* Send BEGIN if we haven't yet */
1702  if (txndata && !txndata->sent_begin_txn)
1703  pgoutput_send_begin(ctx, txn);
1704  }
1705 
1706  OutputPluginPrepareWrite(ctx, true);
1707  logicalrep_write_message(ctx->out,
1708  xid,
1709  message_lsn,
1710  transactional,
1711  prefix,
1712  sz,
1713  message);
1714  OutputPluginWrite(ctx, true);
1715 }
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:643

References data, if(), in_streaming, InvalidTransactionId, logicalrep_write_message(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_send_begin(), PGOutputTxnData::sent_begin_txn, update_replication_progress(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext * ctx,
RepOriginId  origin_id 
)
static

Definition at line 1722 of file pgoutput.c.

1724 {
1725  if (publish_no_origin && origin_id != InvalidRepOriginId)
1726  return true;
1727 
1728  return false;
1729 }

References InvalidRepOriginId, and publish_no_origin.

Referenced by _PG_output_plugin_init().

◆ pgoutput_prepare_txn()

static void pgoutput_prepare_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 617 of file pgoutput.c.

619 {
620  update_replication_progress(ctx, false);
621 
622  OutputPluginPrepareWrite(ctx, true);
623  logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
624  OutputPluginWrite(ctx, true);
625 }
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:198

References logicalrep_write_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and update_replication_progress().

Referenced by _PG_output_plugin_init().

◆ pgoutput_rollback_prepared_txn()

static void pgoutput_rollback_prepared_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)
static

Definition at line 645 of file pgoutput.c.

649 {
650  update_replication_progress(ctx, false);
651 
652  OutputPluginPrepareWrite(ctx, true);
653  logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
654  prepare_time);
655  OutputPluginWrite(ctx, true);
656 }
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:304

References logicalrep_write_rollback_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and update_replication_progress().

Referenced by _PG_output_plugin_init().

◆ pgoutput_row_filter()

static bool pgoutput_row_filter ( Relation  relation,
TupleTableSlot old_slot,
TupleTableSlot **  new_slot_ptr,
RelationSyncEntry entry,
ReorderBufferChangeType action 
)
static

Definition at line 1186 of file pgoutput.c.

1189 {
1190  TupleDesc desc;
1191  int i;
1192  bool old_matched,
1193  new_matched,
1194  result;
1195  TupleTableSlot *tmp_new_slot;
1196  TupleTableSlot *new_slot = *new_slot_ptr;
1197  ExprContext *ecxt;
1198  ExprState *filter_exprstate;
1199 
1200  /*
1201  * We need this map to avoid relying on ReorderBufferChangeType enums
1202  * having specific values.
1203  */
1204  static const int map_changetype_pubaction[] = {
1208  };
1209 
1213 
1214  Assert(new_slot || old_slot);
1215 
1216  /* Get the corresponding row filter */
1217  filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1218 
1219  /* Bail out if there is no row filter */
1220  if (!filter_exprstate)
1221  return true;
1222 
1223  elog(DEBUG3, "table \"%s.%s\" has row filter",
1225  RelationGetRelationName(relation));
1226 
1228 
1229  ecxt = GetPerTupleExprContext(entry->estate);
1230 
1231  /*
1232  * For the following occasions where there is only one tuple, we can
1233  * evaluate the row filter for that tuple and return.
1234  *
1235  * For inserts, we only have the new tuple.
1236  *
1237  * For updates, we can have only a new tuple when none of the replica
1238  * identity columns changed and none of those columns have external data
1239  * but we still need to evaluate the row filter for the new tuple as the
1240  * existing values of those columns might not match the filter. Also,
1241  * users can use constant expressions in the row filter, so we anyway need
1242  * to evaluate it for the new tuple.
1243  *
1244  * For deletes, we only have the old tuple.
1245  */
1246  if (!new_slot || !old_slot)
1247  {
1248  ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1249  result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1250 
1251  return result;
1252  }
1253 
1254  /*
1255  * Both the old and new tuples must be valid only for updates and need to
1256  * be checked against the row filter.
1257  */
1258  Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1259 
1260  slot_getallattrs(new_slot);
1261  slot_getallattrs(old_slot);
1262 
1263  tmp_new_slot = NULL;
1264  desc = RelationGetDescr(relation);
1265 
1266  /*
1267  * The new tuple might not have all the replica identity columns, in which
1268  * case it needs to be copied over from the old tuple.
1269  */
1270  for (i = 0; i < desc->natts; i++)
1271  {
1272  Form_pg_attribute att = TupleDescAttr(desc, i);
1273 
1274  /*
1275  * if the column in the new tuple or old tuple is null, nothing to do
1276  */
1277  if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1278  continue;
1279 
1280  /*
1281  * Unchanged toasted replica identity columns are only logged in the
1282  * old tuple. Copy this over to the new tuple. The changed (or WAL
1283  * Logged) toast values are always assembled in memory and set as
1284  * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1285  */
1286  if (att->attlen == -1 &&
1287  VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1288  !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1289  {
1290  if (!tmp_new_slot)
1291  {
1292  tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1293  ExecClearTuple(tmp_new_slot);
1294 
1295  memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1296  desc->natts * sizeof(Datum));
1297  memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1298  desc->natts * sizeof(bool));
1299  }
1300 
1301  tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1302  tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1303  }
1304  }
1305 
1306  ecxt->ecxt_scantuple = old_slot;
1307  old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1308 
1309  if (tmp_new_slot)
1310  {
1311  ExecStoreVirtualTuple(tmp_new_slot);
1312  ecxt->ecxt_scantuple = tmp_new_slot;
1313  }
1314  else
1315  ecxt->ecxt_scantuple = new_slot;
1316 
1317  new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1318 
1319  /*
1320  * Case 1: if both tuples don't match the row filter, bail out. Send
1321  * nothing.
1322  */
1323  if (!old_matched && !new_matched)
1324  return false;
1325 
1326  /*
1327  * Case 2: if the old tuple doesn't satisfy the row filter but the new
1328  * tuple does, transform the UPDATE into INSERT.
1329  *
1330  * Use the newly transformed tuple that must contain the column values for
1331  * all the replica identity columns. This is required to ensure that,
1332  * while inserting the tuple in the downstream node, we have all the
1333  * required column values.
1334  */
1335  if (!old_matched && new_matched)
1336  {
1338 
1339  if (tmp_new_slot)
1340  *new_slot_ptr = tmp_new_slot;
1341  }
1342 
1343  /*
1344  * Case 3: if the old tuple satisfies the row filter but the new tuple
1345  * doesn't, transform the UPDATE into DELETE.
1346  *
1347  * This transformation does not require another tuple. The Old tuple will
1348  * be used for DELETE.
1349  */
1350  else if (old_matched && !new_matched)
1352 
1353  /*
1354  * Case 4: if both tuples match the row filter, transformation isn't
1355  * required. (*action is default UPDATE).
1356  */
1357 
1358  return true;
1359 }
#define DEBUG3
Definition: elog.h:28
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1552
#define ResetPerTupleExprContext(estate)
Definition: executor.h:547
#define GetPerTupleExprContext(estate)
Definition: executor.h:538
int i
Definition: isn.c:73
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
Definition: pgoutput.c:803
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:328
TupleTableSlot * ecxt_scantuple
Definition: execnodes.h:247
bool * tts_isnull
Definition: tuptable.h:128
Datum * tts_values
Definition: tuptable.h:126
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:433
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:362

References generate_unaccent_rules::action, Assert(), DEBUG3, ExprContext::ecxt_scantuple, elog(), RelationSyncEntry::estate, ExecClearTuple(), ExecStoreVirtualTuple(), RelationSyncEntry::exprstate, get_namespace_name(), GetPerTupleExprContext, i, MakeSingleTupleTableSlot(), TupleDescData::natts, pgoutput_row_filter_exec_expr(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ResetPerTupleExprContext, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, TTSOpsVirtual, TupleDescAttr, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pgoutput_change().

◆ pgoutput_row_filter_exec_expr()

static bool pgoutput_row_filter_exec_expr ( ExprState state,
ExprContext econtext 
)
static

Definition at line 803 of file pgoutput.c.

804 {
805  Datum ret;
806  bool isnull;
807 
808  Assert(state != NULL);
809 
810  ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
811 
812  elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
813  isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
814  isnull ? "true" : "false");
815 
816  if (isnull)
817  return false;
818 
819  return DatumGetBool(ret);
820 }
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:336
static bool DatumGetBool(Datum X)
Definition: postgres.h:438
Definition: regguts.h:318

References Assert(), DatumGetBool(), DEBUG3, elog(), and ExecEvalExprSwitchContext().

Referenced by pgoutput_row_filter().

◆ pgoutput_row_filter_init()

static void pgoutput_row_filter_init ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 848 of file pgoutput.c.

850 {
851  ListCell *lc;
852  List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
853  bool no_filter[] = {false, false, false}; /* One per pubaction */
854  MemoryContext oldctx;
855  int idx;
856  bool has_filter = true;
857  Oid schemaid = get_rel_namespace(entry->publish_as_relid);
858 
859  /*
860  * Find if there are any row filters for this relation. If there are, then
861  * prepare the necessary ExprState and cache it in entry->exprstate. To
862  * build an expression state, we need to ensure the following:
863  *
864  * All the given publication-table mappings must be checked.
865  *
866  * Multiple publications might have multiple row filters for this
867  * relation. Since row filter usage depends on the DML operation, there
868  * are multiple lists (one for each operation) to which row filters will
869  * be appended.
870  *
871  * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row
872  * filter expression" so it takes precedence.
873  */
874  foreach(lc, publications)
875  {
876  Publication *pub = lfirst(lc);
877  HeapTuple rftuple = NULL;
878  Datum rfdatum = 0;
879  bool pub_no_filter = true;
880 
881  /*
882  * If the publication is FOR ALL TABLES, or the publication includes a
883  * FOR TABLES IN SCHEMA where the table belongs to the referred
884  * schema, then it is treated the same as if there are no row filters
885  * (even if other publications have a row filter).
886  */
887  if (!pub->alltables &&
889  ObjectIdGetDatum(schemaid),
890  ObjectIdGetDatum(pub->oid)))
891  {
892  /*
893  * Check for the presence of a row filter in this publication.
894  */
897  ObjectIdGetDatum(pub->oid));
898 
899  if (HeapTupleIsValid(rftuple))
900  {
901  /* Null indicates no filter. */
902  rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
903  Anum_pg_publication_rel_prqual,
904  &pub_no_filter);
905  }
906  }
907 
908  if (pub_no_filter)
909  {
910  if (rftuple)
911  ReleaseSysCache(rftuple);
912 
913  no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
914  no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
915  no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
916 
917  /*
918  * Quick exit if all the DML actions are published via this
919  * publication.
920  */
921  if (no_filter[PUBACTION_INSERT] &&
922  no_filter[PUBACTION_UPDATE] &&
923  no_filter[PUBACTION_DELETE])
924  {
925  has_filter = false;
926  break;
927  }
928 
929  /* No additional work for this publication. Next one. */
930  continue;
931  }
932 
933  /* Form the per pubaction row filter lists. */
934  if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
935  rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
936  TextDatumGetCString(rfdatum));
937  if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
938  rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
939  TextDatumGetCString(rfdatum));
940  if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
941  rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
942  TextDatumGetCString(rfdatum));
943 
944  ReleaseSysCache(rftuple);
945  } /* loop all subscribed publications */
946 
947  /* Clean the row filter */
948  for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
949  {
950  if (no_filter[idx])
951  {
952  list_free_deep(rfnodes[idx]);
953  rfnodes[idx] = NIL;
954  }
955  }
956 
957  if (has_filter)
958  {
960 
962 
963  /*
964  * Now all the filters for all pubactions are known. Combine them when
965  * their pubactions are the same.
966  */
967  oldctx = MemoryContextSwitchTo(entry->entry_cxt);
968  entry->estate = create_estate_for_relation(relation);
969  for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
970  {
971  List *filters = NIL;
972  Expr *rfnode;
973 
974  if (rfnodes[idx] == NIL)
975  continue;
976 
977  foreach(lc, rfnodes[idx])
978  filters = lappend(filters, stringToNode((char *) lfirst(lc)));
979 
980  /* combine the row filter and cache the ExprState */
981  rfnode = make_orclause(filters);
982  entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
983  } /* for each pubaction */
984  MemoryContextSwitchTo(oldctx);
985 
986  RelationClose(relation);
987  }
988 }
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:259
#define TextDatumGetCString(d)
Definition: builtins.h:89
ExprState * ExecPrepareExpr(Expr *node, EState *estate)
Definition: execExpr.c:747
Expr * make_orclause(List *orclauses)
Definition: makefuncs.c:652
#define NUM_ROWFILTER_PUBACTIONS
Definition: pgoutput.c:108
static EState * create_estate_for_relation(Relation rel)
Definition: pgoutput.c:777
void * stringToNode(const char *str)
Definition: read.c:90
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition: syscache.h:190

References Publication::alltables, create_estate_for_relation(), data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecPrepareExpr(), RelationSyncEntry::exprstate, get_rel_namespace(), HeapTupleIsValid, idx(), lappend(), lfirst, list_free_deep(), make_orclause(), MemoryContextSwitchTo(), NIL, NUM_ROWFILTER_PUBACTIONS, ObjectIdGetDatum(), Publication::oid, pgoutput_ensure_entry_cxt(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, PUBLICATIONNAMESPACEMAP, PUBLICATIONRELMAP, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationClose(), RelationIdGetRelation(), ReleaseSysCache(), SearchSysCache2(), SearchSysCacheExists2, stringToNode(), SysCacheGetAttr(), and TextDatumGetCString.

Referenced by get_rel_sync_entry().

◆ pgoutput_send_begin()

static void pgoutput_send_begin ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 546 of file pgoutput.c.

547 {
548  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
550 
551  Assert(txndata);
552  Assert(!txndata->sent_begin_txn);
553 
554  OutputPluginPrepareWrite(ctx, !send_replication_origin);
555  logicalrep_write_begin(ctx->out, txn);
556  txndata->sent_begin_txn = true;
557 
558  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
559  send_replication_origin);
560 
561  OutputPluginWrite(ctx, true);
562 }
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:60

References Assert(), InvalidRepOriginId, logicalrep_write_begin(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), send_repl_origin(), and PGOutputTxnData::sent_begin_txn.

Referenced by pgoutput_change(), pgoutput_message(), and pgoutput_truncate().

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 1739 of file pgoutput.c.

1740 {
1741  if (RelationSyncCache)
1742  {
1744  RelationSyncCache = NULL;
1745  }
1746 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:863

References hash_destroy(), and RelationSyncCache.

Referenced by _PG_output_plugin_init().

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 411 of file pgoutput.c.

413 {
415 
416  /* Create our memory context for private allocations. */
417  data->context = AllocSetContextCreate(ctx->context,
418  "logical replication output context",
420 
421  data->cachectx = AllocSetContextCreate(ctx->context,
422  "logical replication cache context",
424 
426 
427  /* This plugin uses binary protocol. */
429 
430  /*
431  * This is replication start and not slot initialization.
432  *
433  * Parse and validate options passed by the client.
434  */
435  if (!is_init)
436  {
437  /* Parse the params and ERROR if we see any we don't recognize */
439 
440  /* Check if we support requested protocol */
441  if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
442  ereport(ERROR,
443  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
444  errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
445  data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
446 
447  if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
448  ereport(ERROR,
449  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
450  errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
451  data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
452 
453  if (data->publication_names == NIL)
454  ereport(ERROR,
455  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
456  errmsg("publication_names parameter missing")));
457 
458  /*
459  * Decide whether to enable streaming. It is disabled by default, in
460  * which case we just update the flag in decoding context. Otherwise
461  * we only allow it with sufficient version of the protocol, and when
462  * the output plugin supports it.
463  */
464  if (!data->streaming)
465  ctx->streaming = false;
466  else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
467  ereport(ERROR,
468  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
469  errmsg("requested proto_version=%d does not support streaming, need %d or higher",
470  data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
471  else if (!ctx->streaming)
472  ereport(ERROR,
473  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
474  errmsg("streaming requested, but not supported by output plugin")));
475 
476  /* Also remember we're currently not streaming any transaction. */
477  in_streaming = false;
478 
479  /*
480  * Here, we just check whether the two-phase option is passed by
481  * plugin and decide whether to enable it at later point of time. It
482  * remains enabled if the previous start-up has done so. But we only
483  * allow the option to be passed in with sufficient version of the
484  * protocol, and when the output plugin supports it.
485  */
486  if (!data->two_phase)
487  ctx->twophase_opt_given = false;
488  else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
489  ereport(ERROR,
490  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
491  errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
492  data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
493  else if (!ctx->twophase)
494  ereport(ERROR,
495  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
496  errmsg("two-phase commit requested, but not supported by output plugin")));
497  else
498  ctx->twophase_opt_given = true;
499 
500  /* Init publication state. */
501  data->publications = NIL;
502  publications_valid = false;
505  (Datum) 0);
506 
507  /* Initialize relation schema cache. */
509  }
510  else
511  {
512  /*
513  * Disable the streaming and prepared transactions during the slot
514  * initialization mode.
515  */
516  ctx->streaming = false;
517  ctx->twophase = false;
518  }
519 }
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:36
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:38
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:39
#define LOGICALREP_PROTO_MAX_VERSION_NUM
Definition: logicalproto.h:40
void * palloc0(Size size)
Definition: mcxt.c:1230
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
static void parse_output_parameters(List *options, PGOutputData *data)
Definition: pgoutput.c:281
static void init_rel_sync_cache(MemoryContext cachectx)
Definition: pgoutput.c:1914
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1774
List * output_plugin_options
Definition: logical.h:59
OutputPluginOutputType output_type
Definition: output_plugin.h:28
@ PUBLICATIONOID
Definition: syscache.h:83

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CacheMemoryContext, CacheRegisterSyscacheCallback(), LogicalDecodingContext::context, data, ereport, errcode(), errmsg(), ERROR, in_streaming, init_rel_sync_cache(), LOGICALREP_PROTO_MAX_VERSION_NUM, LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0(), parse_output_parameters(), publication_invalidation_cb(), PUBLICATIONOID, publications_valid, LogicalDecodingContext::streaming, LogicalDecodingContext::twophase, and LogicalDecodingContext::twophase_opt_given.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_abort()

static void pgoutput_stream_abort ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 1839 of file pgoutput.c.

1842 {
1843  ReorderBufferTXN *toptxn;
1844 
1845  /*
1846  * The abort should happen outside streaming block, even for streamed
1847  * transactions. The transaction has to be marked as streamed, though.
1848  */
1849  Assert(!in_streaming);
1850 
1851  /* determine the toplevel transaction */
1852  toptxn = (txn->toptxn) ? txn->toptxn : txn;
1853 
1854  Assert(rbtxn_is_streamed(toptxn));
1855 
1856  OutputPluginPrepareWrite(ctx, true);
1857  logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
1858  OutputPluginWrite(ctx, true);
1859 
1860  cleanup_rel_sync_cache(toptxn->xid, false);
1861 }
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:2245
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid)
Definition: proto.c:1169
#define rbtxn_is_streamed(txn)

References Assert(), cleanup_rel_sync_cache(), in_streaming, logicalrep_write_stream_abort(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, ReorderBufferTXN::toptxn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_commit()

static void pgoutput_stream_commit ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 1868 of file pgoutput.c.

1871 {
1872  /*
1873  * The commit should happen outside streaming block, even for streamed
1874  * transactions. The transaction has to be marked as streamed, though.
1875  */
1876  Assert(!in_streaming);
1877  Assert(rbtxn_is_streamed(txn));
1878 
1879  update_replication_progress(ctx, false);
1880 
1881  OutputPluginPrepareWrite(ctx, true);
1882  logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1883  OutputPluginWrite(ctx, true);
1884 
1885  cleanup_rel_sync_cache(txn->xid, true);
1886 }
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1118

References Assert(), cleanup_rel_sync_cache(), in_streaming, logicalrep_write_stream_commit(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, update_replication_progress(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_prepare_txn()

static void pgoutput_stream_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 1894 of file pgoutput.c.

1897 {
1898  Assert(rbtxn_is_streamed(txn));
1899 
1900  update_replication_progress(ctx, false);
1901  OutputPluginPrepareWrite(ctx, true);
1902  logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1903  OutputPluginWrite(ctx, true);
1904 }
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:364

References Assert(), logicalrep_write_stream_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, and update_replication_progress().

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_start()

static void pgoutput_stream_start ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 1789 of file pgoutput.c.

1791 {
1792  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1793 
1794  /* we can't nest streaming of transactions */
1795  Assert(!in_streaming);
1796 
1797  /*
1798  * If we already sent the first stream for this transaction then don't
1799  * send the origin id in the subsequent streams.
1800  */
1801  if (rbtxn_is_streamed(txn))
1802  send_replication_origin = false;
1803 
1804  OutputPluginPrepareWrite(ctx, !send_replication_origin);
1806 
1808  send_replication_origin);
1809 
1810  OutputPluginWrite(ctx, true);
1811 
1812  /* we're streaming a chunk of transaction now */
1813  in_streaming = true;
1814 }
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1075
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), in_streaming, InvalidRepOriginId, InvalidXLogRecPtr, logicalrep_write_stream_start(), ReorderBufferTXN::origin_id, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, send_repl_origin(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_stop()

static void pgoutput_stream_stop ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 1820 of file pgoutput.c.

1822 {
1823  /* we should be streaming a transaction */
1825 
1826  OutputPluginPrepareWrite(ctx, true);
1828  OutputPluginWrite(ctx, true);
1829 
1830  /* we've stopped streaming a transaction */
1831  in_streaming = false;
1832 }
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1109

References Assert(), in_streaming, logicalrep_write_stream_stop(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_truncate()

static void pgoutput_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 1605 of file pgoutput.c.

1607 {
1610  MemoryContext old;
1611  RelationSyncEntry *relentry;
1612  int i;
1613  int nrelids;
1614  Oid *relids;
1616 
1617  update_replication_progress(ctx, false);
1618 
1619  /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1620  if (in_streaming)
1621  xid = change->txn->xid;
1622 
1623  old = MemoryContextSwitchTo(data->context);
1624 
1625  relids = palloc0(nrelations * sizeof(Oid));
1626  nrelids = 0;
1627 
1628  for (i = 0; i < nrelations; i++)
1629  {
1630  Relation relation = relations[i];
1631  Oid relid = RelationGetRelid(relation);
1632 
1633  if (!is_publishable_relation(relation))
1634  continue;
1635 
1636  relentry = get_rel_sync_entry(data, relation);
1637 
1638  if (!relentry->pubactions.pubtruncate)
1639  continue;
1640 
1641  /*
1642  * Don't send partitions if the publication wants to send only the
1643  * root tables through it.
1644  */
1645  if (relation->rd_rel->relispartition &&
1646  relentry->publish_as_relid != relid)
1647  continue;
1648 
1649  relids[nrelids++] = relid;
1650 
1651  /* Send BEGIN if we haven't yet */
1652  if (txndata && !txndata->sent_begin_txn)
1653  pgoutput_send_begin(ctx, txn);
1654 
1655  maybe_send_schema(ctx, change, relation, relentry);
1656  }
1657 
1658  if (nrelids > 0)
1659  {
1660  OutputPluginPrepareWrite(ctx, true);
1662  xid,
1663  nrelids,
1664  relids,
1665  change->data.truncate.cascade,
1666  change->data.truncate.restart_seqs);
1667  OutputPluginWrite(ctx, true);
1668  }
1669 
1670  MemoryContextSwitchTo(old);
1671  MemoryContextReset(data->context);
1672 }
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:586
struct ReorderBufferChange::@97::@99 truncate

References ReorderBufferChange::data, data, get_rel_sync_entry(), i, in_streaming, InvalidTransactionId, is_publishable_relation(), logicalrep_write_truncate(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), palloc0(), pgoutput_send_begin(), RelationSyncEntry::pubactions, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, RelationData::rd_rel, RelationGetRelid, ReorderBufferChange::truncate, ReorderBufferChange::txn, update_replication_progress(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1774 of file pgoutput.c.

1775 {
1776  publications_valid = false;
1777 
1778  /*
1779  * Also invalidate per-relation cache so that next time the filtering info
1780  * is checked it will be updated with the new publication settings.
1781  */
1782  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1783 }
void * arg

References arg, publications_valid, and rel_sync_cache_publication_cb().

Referenced by pgoutput_startup().

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 2332 of file pgoutput.c.

2333 {
2335  RelationSyncEntry *entry;
2336 
2337  /*
2338  * We can get here if the plugin was used in SQL interface as the
2339  * RelationSyncCache is destroyed when the decoding finishes, but there
2340  * is no way to unregister this invalidation callback.
2341  */
2342  if (RelationSyncCache == NULL)
2343  return;
2344 
2345  /*
2346  * There is no way to find which entry in our cache the hash belongs to so
2347  * mark the whole cache as invalid.
2348  */
2350  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2351  {
2352  entry->replicate_valid = false;
2353  }
2354 }
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225

References hash_seq_init(), hash_seq_search(), RelationSyncCache, RelationSyncEntry::replicate_valid, and status().

Referenced by init_rel_sync_cache(), and publication_invalidation_cb().

◆ rel_sync_cache_relation_cb()

static void rel_sync_cache_relation_cb ( Datum  arg,
Oid  relid 
)
static

Definition at line 2281 of file pgoutput.c.

2282 {
2283  RelationSyncEntry *entry;
2284 
2285  /*
2286  * We can get here if the plugin was used in SQL interface as the
2287  * RelationSyncCache is destroyed when the decoding finishes, but there
2288  * is no way to unregister the relcache invalidation callback.
2289  */
2290  if (RelationSyncCache == NULL)
2291  return;
2292 
2293  /*
2294  * Nobody keeps pointers to entries in this hash table around outside
2295  * logical decoding callback calls - but invalidation events can come in
2296  * *during* a callback if we do any syscache access in the callback.
2297  * Because of that we must mark the cache entry as invalid but not damage
2298  * any of its substructure here. The next get_rel_sync_entry() call will
2299  * rebuild it all.
2300  */
2301  if (OidIsValid(relid))
2302  {
2303  /*
2304  * Getting invalidations for relations that aren't in the table is
2305  * entirely normal. So we don't care if it's found or not.
2306  */
2307  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2308  HASH_FIND, NULL);
2309  if (entry != NULL)
2310  entry->replicate_valid = false;
2311  }
2312  else
2313  {
2314  /* Whole cache must be flushed. */
2316 
2318  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2319  {
2320  entry->replicate_valid = false;
2321  }
2322  }
2323 }
#define OidIsValid(objectId)
Definition: c.h:711
@ HASH_FIND
Definition: hsearch.h:113

References HASH_FIND, hash_search(), hash_seq_init(), hash_seq_search(), OidIsValid, RelationSyncCache, RelationSyncEntry::replicate_valid, and status().

Referenced by init_rel_sync_cache().

◆ send_relation_and_attrs()

static void send_relation_and_attrs ( Relation  relation,
TransactionId  xid,
LogicalDecodingContext *  ctx,
Bitmapset *  columns 
)
static

Definition at line 733 of file pgoutput.c.

736 {
737  TupleDesc desc = RelationGetDescr(relation);
738  int i;
739 
740  /*
741  * Write out type info if needed. We do that only for user-created types.
742  * We use FirstGenbkiObjectId as the cutoff, so that we only consider
743  * objects with hand-assigned OIDs to be "built in", not for instance any
744  * function or type defined in the information_schema. This is important
745  * because only hand-assigned OIDs can be expected to remain stable across
746  * major versions.
747  */
748  for (i = 0; i < desc->natts; i++)
749  {
750  Form_pg_attribute att = TupleDescAttr(desc, i);
751 
752  if (att->attisdropped || att->attgenerated)
753  continue;
754 
755  if (att->atttypid < FirstGenbkiObjectId)
756  continue;
757 
758  /* Skip this attribute if it's not present in the column list */
759  if (columns != NULL && !bms_is_member(att->attnum, columns))
760  continue;
761 
762  OutputPluginPrepareWrite(ctx, false);
763  logicalrep_write_typ(ctx->out, xid, att->atttypid);
764  OutputPluginWrite(ctx, false);
765  }
766 
767  OutputPluginPrepareWrite(ctx, false);
768  logicalrep_write_rel(ctx->out, xid, relation, columns);
769  OutputPluginWrite(ctx, false);
770 }
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:428
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns)
Definition: proto.c:670
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:725
#define FirstGenbkiObjectId
Definition: transam.h:195

References bms_is_member(), FirstGenbkiObjectId, i, logicalrep_write_rel(), logicalrep_write_typ(), TupleDescData::natts, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, and TupleDescAttr.

Referenced by maybe_send_schema().

◆ send_repl_origin()

static void send_repl_origin ( LogicalDecodingContext *  ctx,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
bool  send_origin 
)
static

Definition at line 2358 of file pgoutput.c.

2360 {
2361  if (send_origin)
2362  {
2363  char *origin;
2364 
2365  /*----------
2366  * XXX: which behaviour do we want here?
2367  *
2368  * Alternatives:
2369  * - don't send origin message if origin name not found
2370  * (that's what we do now)
2371  * - throw error - that will break replication, not good
2372  * - send some special "unknown" origin
2373  *----------
2374  */
2375  if (replorigin_by_oid(origin_id, true, &origin))
2376  {
2377  /* Message boundary */
2378  OutputPluginWrite(ctx, false);
2379  OutputPluginPrepareWrite(ctx, true);
2380 
2381  logicalrep_write_origin(ctx->out, origin, origin_lsn);
2382  }
2383  }
2384 }
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:461
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:385

References logicalrep_write_origin(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by pgoutput_begin_prepare_txn(), pgoutput_send_begin(), and pgoutput_stream_start().

◆ set_schema_sent_in_streamed_txn()

static void set_schema_sent_in_streamed_txn ( RelationSyncEntry *  entry,
TransactionId  xid 
)
static

Definition at line 1955 of file pgoutput.c.

1956 {
1957  MemoryContext oldctx;
1958 
1959  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1960 
1961  entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
1962 
1963  MemoryContextSwitchTo(oldctx);
1964 }
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:392

References CacheMemoryContext, lappend_xid(), MemoryContextSwitchTo(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

◆ update_replication_progress()

static void update_replication_progress ( LogicalDecodingContext *  ctx,
bool  skipped_xact 
)
static

Definition at line 2396 of file pgoutput.c.

2397 {
2398  static int changes_count = 0;
2399 
2400  /*
2401  * We don't want to try sending a keepalive message after processing each
2402  * change as that can have overhead. Tests revealed that there is no
2403  * noticeable overhead in doing it after continuously processing 100 or so
2404  * changes.
2405  */
2406 #define CHANGES_THRESHOLD 100
2407 
2408  /*
2409  * If we are at the end of transaction LSN, update progress tracking.
2410  * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
2411  * try to send a keepalive message if required.
2412  */
2413  if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
2414  {
2415  OutputPluginUpdateProgress(ctx, skipped_xact);
2416  changes_count = 0;
2417  }
2418 }
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact)
Definition: logical.c:675
#define CHANGES_THRESHOLD

References CHANGES_THRESHOLD, LogicalDecodingContext::end_xact, and OutputPluginUpdateProgress().

Referenced by pgoutput_change(), pgoutput_commit_prepared_txn(), pgoutput_commit_txn(), pgoutput_message(), pgoutput_prepare_txn(), pgoutput_rollback_prepared_txn(), pgoutput_stream_commit(), pgoutput_stream_prepare_txn(), and pgoutput_truncate().

Variable Documentation

◆ in_streaming

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 37 of file pgoutput.c.

◆ publications_valid

bool publications_valid
static

Definition at line 81 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ publish_no_origin

bool publish_no_origin
static

Definition at line 83 of file pgoutput.c.

Referenced by parse_output_parameters(), and pgoutput_origin_filter().

◆ RelationSyncCache