PostgreSQL Source Code  git master
pgoutput.c File Reference
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 
struct  PGOutputTxnData
 

Macros

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 
typedef struct PGOutputTxnData PGOutputTxnData
 

Enumerations

enum  RowFilterPubAction { PUBACTION_INSERT , PUBACTION_UPDATE , PUBACTION_DELETE }
 

Functions

static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pgoutput_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static void pgoutput_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pgoutput_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pgoutput_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pgoutput_stream_start (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_stop (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_abort (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pgoutput_stream_commit (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_stream_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static List * LoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void send_relation_and_attrs (Relation relation, TransactionId xid, LogicalDecodingContext *ctx, Bitmapset *columns)
 
static void send_repl_origin (LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
 
static void init_rel_sync_cache (MemoryContext cachectx)
 
static void cleanup_rel_sync_cache (TransactionId xid, bool is_commit)
 
static RelationSyncEntry * get_rel_sync_entry (PGOutputData *data, Relation relation)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void set_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static bool get_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static void init_tuple_slot (PGOutputData *data, Relation relation, RelationSyncEntry *entry)
 
static EState * create_estate_for_relation (Relation rel)
 
static void pgoutput_row_filter_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
static bool pgoutput_row_filter_exec_expr (ExprState *state, ExprContext *econtext)
 
static bool pgoutput_row_filter (Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
 
static void pgoutput_column_list_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void parse_output_parameters (List *options, PGOutputData *data)
 
static void pgoutput_send_begin (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void maybe_send_schema (LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
 
static void pgoutput_ensure_entry_cxt (PGOutputData *data, RelationSyncEntry *entry)
 

Variables

 PG_MODULE_MAGIC
 
static bool publications_valid
 
static bool in_streaming
 
static bool publish_no_origin
 
static HTAB * RelationSyncCache = NULL
 

Macro Definition Documentation

◆ NUM_ROWFILTER_PUBACTIONS

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)

Definition at line 108 of file pgoutput.c.

Typedef Documentation

◆ PGOutputTxnData

◆ RelationSyncEntry

Enumeration Type Documentation

◆ RowFilterPubAction

Enumerator
PUBACTION_INSERT 
PUBACTION_UPDATE 
PUBACTION_DELETE 

Definition at line 101 of file pgoutput.c.

102 {
106 };
@ PUBACTION_INSERT
Definition: pgoutput.c:103
@ PUBACTION_UPDATE
Definition: pgoutput.c:104
@ PUBACTION_DELETE
Definition: pgoutput.c:105

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks * cb)

Definition at line 250 of file pgoutput.c.

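251 {
252  cb->startup_cb = pgoutput_startup;
253  cb->begin_cb = pgoutput_begin_txn;
254  cb->change_cb = pgoutput_change;
255  cb->truncate_cb = pgoutput_truncate;
256  cb->message_cb = pgoutput_message;
257  cb->commit_cb = pgoutput_commit_txn;
258 
259  cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
260  cb->prepare_cb = pgoutput_prepare_txn;
261  cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
262  cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
263  cb->filter_by_origin_cb = pgoutput_origin_filter;
264  cb->shutdown_cb = pgoutput_shutdown;
265 
266  /* transaction streaming */
267  cb->stream_start_cb = pgoutput_stream_start;
268  cb->stream_stop_cb = pgoutput_stream_stop;
269  cb->stream_abort_cb = pgoutput_stream_abort;
270  cb->stream_commit_cb = pgoutput_stream_commit;
271  cb->stream_change_cb = pgoutput_change;
272  cb->stream_message_cb = pgoutput_message;
273  cb->stream_truncate_cb = pgoutput_truncate;
274  /* transaction streaming - two-phase commit */
275  cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
276 }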
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition: pgoutput.c:1401
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:615
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:409
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:1560
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:632
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:1673
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: pgoutput.c:660
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:1690
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: pgoutput.c:1790
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:1849
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:547
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:1823
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:646
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1771
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1740
static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: pgoutput.c:1628
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:583
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb

References OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::message_cb, pgoutput_begin_prepare_txn(), pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_prepared_txn(), pgoutput_commit_txn(), pgoutput_message(), pgoutput_origin_filter(), pgoutput_prepare_txn(), pgoutput_rollback_prepared_txn(), pgoutput_shutdown(), pgoutput_startup(), pgoutput_stream_abort(), pgoutput_stream_commit(), pgoutput_stream_prepare_txn(), pgoutput_stream_start(), pgoutput_stream_stop(), pgoutput_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

◆ cleanup_rel_sync_cache()

static void cleanup_rel_sync_cache ( TransactionId  xid,
bool  is_commit 
)
static

Definition at line 2223 of file pgoutput.c.

2224 {
2225  HASH_SEQ_STATUS hash_seq;
2226  RelationSyncEntry *entry;
2227  ListCell *lc;
2228 
2229  Assert(RelationSyncCache != NULL);
2230 
2231  hash_seq_init(&hash_seq, RelationSyncCache);
2232  while ((entry = hash_seq_search(&hash_seq)) != NULL)
2233  {
2234  /*
2235  * We can set the schema_sent flag for an entry that has committed xid
2236  * in the list as that ensures that the subscriber would have the
2237  * corresponding schema and we don't need to send it unless there is
2238  * any invalidation for that relation.
2239  */
2240  foreach(lc, entry->streamed_txns)
2241  {
2242  if (xid == lfirst_xid(lc))
2243  {
2244  if (is_commit)
2245  entry->schema_sent = true;
2246 
2247  entry->streamed_txns =
2248  foreach_delete_current(entry->streamed_txns, lc);
2249  break;
2250  }
2251  }
2252  }
2253 }
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1431
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1421
Assert(fmt[strlen(fmt) - 1] !='\n')
#define lfirst_xid(lc)
Definition: pg_list.h:175
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:390
static HTAB * RelationSyncCache
Definition: pgoutput.c:213
List * streamed_txns
Definition: pgoutput.c:135

References Assert(), foreach_delete_current, hash_seq_init(), hash_seq_search(), lfirst_xid, RelationSyncCache, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_stream_abort(), and pgoutput_stream_commit().

◆ create_estate_for_relation()

static EState * create_estate_for_relation ( Relation  rel)
static

Definition at line 792 of file pgoutput.c.

793 {
794  EState *estate;
795  RangeTblEntry *rte;
796  List *perminfos = NIL;
797 
798  estate = CreateExecutorState();
799 
800  rte = makeNode(RangeTblEntry);
801  rte->rtekind = RTE_RELATION;
802  rte->relid = RelationGetRelid(rel);
803  rte->relkind = rel->rd_rel->relkind;
804  rte->rellockmode = AccessShareLock;
805 
806  addRTEPermissionInfo(&perminfos, rte);
807 
808  ExecInitRangeTable(estate, list_make1(rte), perminfos);
809 
810  estate->es_output_cid = GetCurrentCommandId(false);
811 
812  return estate;
813 }
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos)
Definition: execUtils.c:759
EState * CreateExecutorState(void)
Definition: execUtils.c:93
#define AccessShareLock
Definition: lockdefs.h:36
#define makeNode(_type_)
Definition: nodes.h:176
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1014
#define NIL
Definition: pg_list.h:68
#define list_make1(x1)
Definition: pg_list.h:212
#define RelationGetRelid(relation)
Definition: rel.h:504
CommandId es_output_cid
Definition: execnodes.h:631
Definition: pg_list.h:54
RTEKind rtekind
Definition: parsenodes.h:1033
Form_pg_class rd_rel
Definition: rel.h:111
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:818

References AccessShareLock, addRTEPermissionInfo(), CreateExecutorState(), EState::es_output_cid, ExecInitRangeTable(), GetCurrentCommandId(), list_make1, makeNode, NIL, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, RangeTblEntry::rellockmode, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by pgoutput_row_filter_init().

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData * data,
Relation  relation 
)
static

Definition at line 1954 of file pgoutput.c.

1955 {
1956  RelationSyncEntry *entry;
1957  bool found;
1958  MemoryContext oldctx;
1959  Oid relid = RelationGetRelid(relation);
1960 
1961  Assert(RelationSyncCache != NULL);
1962 
1963  /* Find cached relation info, creating if not found */
1964  entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
1965  &relid,
1966  HASH_ENTER, &found);
1967  Assert(entry != NULL);
1968 
1969  /* initialize entry, if it's new */
1970  if (!found)
1971  {
1972  entry->replicate_valid = false;
1973  entry->schema_sent = false;
1974  entry->streamed_txns = NIL;
1975  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1976  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1977  entry->new_slot = NULL;
1978  entry->old_slot = NULL;
1979  memset(entry->exprstate, 0, sizeof(entry->exprstate));
1980  entry->entry_cxt = NULL;
1981  entry->publish_as_relid = InvalidOid;
1982  entry->columns = NULL;
1983  entry->attrmap = NULL;
1984  }
1985 
1986  /* Validate the entry */
1987  if (!entry->replicate_valid)
1988  {
1989  Oid schemaId = get_rel_namespace(relid);
1990  List *pubids = GetRelationPublications(relid);
1991 
1992  /*
1993  * We don't acquire a lock on the namespace system table as we build
1994  * the cache entry using a historic snapshot and all the later changes
1995  * are absorbed while decoding WAL.
1996  */
1997  List *schemaPubids = GetSchemaPublications(schemaId);
1998  ListCell *lc;
1999  Oid publish_as_relid = relid;
2000  int publish_ancestor_level = 0;
2001  bool am_partition = get_rel_relispartition(relid);
2002  char relkind = get_rel_relkind(relid);
2003  List *rel_publications = NIL;
2004 
2005  /* Reload publications if needed before use. */
2006  if (!publications_valid)
2007  {
2008  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2009  if (data->publications)
2010  {
2011  list_free_deep(data->publications);
2012  data->publications = NIL;
2013  }
2014  data->publications = LoadPublications(data->publication_names);
2015  MemoryContextSwitchTo(oldctx);
2016  publications_valid = true;
2017  }
2018 
2019  /*
2020  * Reset schema_sent status as the relation definition may have
2021  * changed. Also reset pubactions to empty in case rel was dropped
2022  * from a publication. Also free any objects that depended on the
2023  * earlier definition.
2024  */
2025  entry->schema_sent = false;
2026  list_free(entry->streamed_txns);
2027  entry->streamed_txns = NIL;
2028  bms_free(entry->columns);
2029  entry->columns = NULL;
2030  entry->pubactions.pubinsert = false;
2031  entry->pubactions.pubupdate = false;
2032  entry->pubactions.pubdelete = false;
2033  entry->pubactions.pubtruncate = false;
2034 
2035  /*
2036  * Tuple slots cleanups. (Will be rebuilt later if needed).
2037  */
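2038  if (entry->old_slot)
2039  ExecDropSingleTupleTableSlot(entry->old_slot);
2040  if (entry->new_slot)
2041  ExecDropSingleTupleTableSlot(entry->new_slot);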
2042 
2043  entry->old_slot = NULL;
2044  entry->new_slot = NULL;
2045 
2046  if (entry->attrmap)
2047  free_attrmap(entry->attrmap);
2048  entry->attrmap = NULL;
2049 
2050  /*
2051  * Row filter cache cleanups.
2052  */
2053  if (entry->entry_cxt)
2054  MemoryContextDelete(entry->entry_cxt);
2055 
2056  entry->entry_cxt = NULL;
2057  entry->estate = NULL;
2058  memset(entry->exprstate, 0, sizeof(entry->exprstate));
2059 
2060  /*
2061  * Build publication cache. We can't use one provided by relcache as
2062  * relcache considers all publications that the given relation is in,
2063  * but here we only need to consider ones that the subscriber
2064  * requested.
2065  */
2066  foreach(lc, data->publications)
2067  {
2068  Publication *pub = lfirst(lc);
2069  bool publish = false;
2070 
2071  /*
2072  * Under what relid should we publish changes in this publication?
2073  * We'll use the top-most relid across all publications. Also
2074  * track the ancestor level for this publication.
2075  */
2076  Oid pub_relid = relid;
2077  int ancestor_level = 0;
2078 
2079  /*
2080  * If this is a FOR ALL TABLES publication, pick the partition
2081  * root and set the ancestor level accordingly.
2082  */
2083  if (pub->alltables)
2084  {
2085  publish = true;
2086  if (pub->pubviaroot && am_partition)
2087  {
2088  List *ancestors = get_partition_ancestors(relid);
2089 
2090  pub_relid = llast_oid(ancestors);
2091  ancestor_level = list_length(ancestors);
2092  }
2093  }
2094 
2095  if (!publish)
2096  {
2097  bool ancestor_published = false;
2098 
2099  /*
2100  * For a partition, check if any of the ancestors are
2101  * published. If so, note down the topmost ancestor that is
2102  * published via this publication, which will be used as the
2103  * relation via which to publish the partition's changes.
2104  */
2105  if (am_partition)
2106  {
2107  Oid ancestor;
2108  int level;
2109  List *ancestors = get_partition_ancestors(relid);
2110 
2111  ancestor = GetTopMostAncestorInPublication(pub->oid,
2112  ancestors,
2113  &level);
2114 
2115  if (ancestor != InvalidOid)
2116  {
2117  ancestor_published = true;
2118  if (pub->pubviaroot)
2119  {
2120  pub_relid = ancestor;
2121  ancestor_level = level;
2122  }
2123  }
2124  }
2125 
2126  if (list_member_oid(pubids, pub->oid) ||
2127  list_member_oid(schemaPubids, pub->oid) ||
2128  ancestor_published)
2129  publish = true;
2130  }
2131 
2132  /*
2133  * If the relation is to be published, determine actions to
2134  * publish, and list of columns, if appropriate.
2135  *
2136  * Don't publish changes for partitioned tables, because
2137  * publishing those of its partitions suffices, unless partition
2138  * changes won't be published due to pubviaroot being set.
2139  */
2140  if (publish &&
2141  (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2142  {
2143  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2144  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2145  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2146  entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2147 
2148  /*
2149  * We want to publish the changes as the top-most ancestor
2150  * across all publications. So we need to check if the already
2151  * calculated level is higher than the new one. If yes, we can
2152  * ignore the new value (as it's a child). Otherwise the new
2153  * value is an ancestor, so we keep it.
2154  */
2155  if (publish_ancestor_level > ancestor_level)
2156  continue;
2157 
2158  /*
2159  * If we found an ancestor higher up in the tree, discard the
2160  * list of publications through which we replicate it, and use
2161  * the new ancestor.
2162  */
2163  if (publish_ancestor_level < ancestor_level)
2164  {
2165  publish_as_relid = pub_relid;
2166  publish_ancestor_level = ancestor_level;
2167 
2168  /* reset the publication list for this relation */
2169  rel_publications = NIL;
2170  }
2171  else
2172  {
2173  /* Same ancestor level, has to be the same OID. */
2174  Assert(publish_as_relid == pub_relid);
2175  }
2176 
2177  /* Track publications for this ancestor. */
2178  rel_publications = lappend(rel_publications, pub);
2179  }
2180  }
2181 
2182  entry->publish_as_relid = publish_as_relid;
2183 
2184  /*
2185  * Initialize the tuple slot, map, and row filter. These are only used
2186  * when publishing inserts, updates, or deletes.
2187  */
2188  if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2189  entry->pubactions.pubdelete)
2190  {
2191  /* Initialize the tuple slot and map */
2192  init_tuple_slot(data, relation, entry);
2193 
2194  /* Initialize the row filter */
2195  pgoutput_row_filter_init(data, rel_publications, entry);
2196 
2197  /* Initialize the column list */
2198  pgoutput_column_list_init(data, rel_publications, entry);
2199  }
2200 
2201  list_free(pubids);
2202  list_free(schemaPubids);
2203  list_free(rel_publications);
2204 
2205  entry->replicate_valid = true;
2206  }
2207 
2208  return entry;
2209 }
void free_attrmap(AttrMap *map)
Definition: attmap.c:57
void bms_free(Bitmapset *a)
Definition: bitmapset.c:209
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1255
@ HASH_ENTER
Definition: hsearch.h:114
List * lappend(List *list, void *datum)
Definition: list.c:338
void list_free(List *list)
Definition: list.c:1545
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:721
void list_free_deep(List *list)
Definition: list.c:1559
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:2009
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1985
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1934
MemoryContext CacheMemoryContext
Definition: mcxt.c:144
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
List * get_partition_ancestors(Oid relid)
Definition: partition.c:133
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define llast_oid(l)
Definition: pg_list.h:200
List * GetSchemaPublications(Oid schemaid)
List * GetRelationPublications(Oid relid)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:1703
static void init_tuple_slot(PGOutputData *data, Relation relation, RelationSyncEntry *entry)
Definition: pgoutput.c:1128
static void pgoutput_row_filter_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:867
static void pgoutput_column_list_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:1013
static bool publications_valid
Definition: pgoutput.c:83
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
PublicationActions pubactions
ExprState * exprstate[NUM_ROWFILTER_PUBACTIONS]
Definition: pgoutput.c:148
Bitmapset * columns
Definition: pgoutput.c:174
PublicationActions pubactions
Definition: pgoutput.c:139
TupleTableSlot * old_slot
Definition: pgoutput.c:151
bool replicate_valid
Definition: pgoutput.c:132
MemoryContext entry_cxt
Definition: pgoutput.c:180
EState * estate
Definition: pgoutput.c:149
TupleTableSlot * new_slot
Definition: pgoutput.c:150
AttrMap * attrmap
Definition: pgoutput.c:167

References Publication::alltables, Assert(), RelationSyncEntry::attrmap, bms_free(), CacheMemoryContext, RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecDropSingleTupleTableSlot(), RelationSyncEntry::exprstate, free_attrmap(), get_partition_ancestors(), get_rel_namespace(), get_rel_relispartition(), get_rel_relkind(), GetRelationPublications(), GetSchemaPublications(), GetTopMostAncestorInPublication(), HASH_ENTER, hash_search(), init_tuple_slot(), InvalidOid, lappend(), lfirst, list_free(), list_free_deep(), list_length(), list_member_oid(), llast_oid, LoadPublications(), MemoryContextDelete(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, NIL, Publication::oid, RelationSyncEntry::old_slot, pgoutput_column_list_init(), pgoutput_row_filter_init(), RelationSyncEntry::pubactions, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, publications_valid, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, PublicationActions::pubupdate, Publication::pubviaroot, RelationGetRelid, RelationSyncCache, RelationSyncEntry::replicate_valid, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ get_schema_sent_in_streamed_txn()

static bool get_schema_sent_in_streamed_txn ( RelationSyncEntry * entry,
TransactionId  xid 
)
static

Definition at line 1923 of file pgoutput.c.

1924 {
1925  return list_member_xid(entry->streamed_txns, xid);
1926 }
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:741

References list_member_xid(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  cachectx)
static

Definition at line 1869 of file pgoutput.c.

1870 {
1871  HASHCTL ctl;
1872  static bool relation_callbacks_registered = false;
1873 
1874  /* Nothing to do if hash table already exists */
1875  if (RelationSyncCache != NULL)
1876  return;
1877 
1878  /* Make a new hash table for the cache */
1879  ctl.keysize = sizeof(Oid);
1880  ctl.entrysize = sizeof(RelationSyncEntry);
1881  ctl.hcxt = cachectx;
1882 
1883  RelationSyncCache = hash_create("logical replication output relation cache",
1884  128, &ctl,
1885  HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1886 
1887  Assert(RelationSyncCache != NULL);
1888 
1889  /* No more to do if we already registered callbacks */
1890  if (relation_callbacks_registered)
1891  return;
1892 
1893  /* We must update the cache entry for a relation after a relcache flush */
1894  CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1895 
1896  /*
1897  * Flush all cache entries after a pg_namespace change, in case it was a
1898  * schema rename affecting a relation being replicated.
1899  */
1900  CacheRegisterSyscacheCallback(NAMESPACEOID,
1901  rel_sync_cache_publication_cb,
1902  (Datum) 0);
1903 
1904  /*
1905  * Flush all cache entries after any publication changes. (We need no
1906  * callback entry for pg_publication, because publication_invalidation_cb
1907  * will take care of it.)
1908  */
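1909  CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1910  rel_sync_cache_publication_cb,
1911  (Datum) 0);
1912  CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1913  rel_sync_cache_publication_cb,
1914  (Datum) 0);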
1915 
1916  relation_callbacks_registered = true;
1917 }
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1561
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:2310
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:2259
uintptr_t Datum
Definition: postgres.h:64
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
@ PUBLICATIONNAMESPACEMAP
Definition: syscache.h:82
@ PUBLICATIONRELMAP
Definition: syscache.h:85
@ NAMESPACEOID
Definition: syscache.h:70

References Assert(), CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, NAMESPACEOID, PUBLICATIONNAMESPACEMAP, PUBLICATIONRELMAP, rel_sync_cache_publication_cb(), rel_sync_cache_relation_cb(), and RelationSyncCache.

Referenced by pgoutput_startup().

◆ init_tuple_slot()

static void init_tuple_slot ( PGOutputData * data,
Relation  relation,
RelationSyncEntry * entry 
)
static

Definition at line 1128 of file pgoutput.c.

1130 {
1131  MemoryContext oldctx;
1132  TupleDesc oldtupdesc;
1133  TupleDesc newtupdesc;
1134 
1135  oldctx = MemoryContextSwitchTo(data->cachectx);
1136 
1137  /*
1138  * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1139  * live as long as the cache remains.
1140  */
1141  oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
1142  newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
1143 
1144  entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1145  entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1146 
1147  MemoryContextSwitchTo(oldctx);
1148 
1149  /*
1150  * Cache the map that will be used to convert the relation's tuples into
1151  * the ancestor's format, if needed.
1152  */
1153  if (entry->publish_as_relid != RelationGetRelid(relation))
1154  {
1155  Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1156  TupleDesc indesc = RelationGetDescr(relation);
1157  TupleDesc outdesc = RelationGetDescr(ancestor);
1158 
1159  /* Map must live as long as the session does. */
1160  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1161 
1162  entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1163 
1164  MemoryContextSwitchTo(oldctx);
1165  RelationClose(ancestor);
1166  }
1167 }
AttrMap * build_attrmap_by_name_if_req(TupleDesc indesc, TupleDesc outdesc, bool missing_ok)
Definition: attmap.c:264
const TupleTableSlotOps TTSOpsHeapTuple
Definition: execTuples.c:84
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1239
#define RelationGetDescr(relation)
Definition: rel.h:530
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2054
void RelationClose(Relation relation)
Definition: relcache.c:2160
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition: tupdesc.c:111

References RelationSyncEntry::attrmap, build_attrmap_by_name_if_req(), CacheMemoryContext, CreateTupleDescCopy(), data, MakeSingleTupleTableSlot(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, RelationSyncEntry::old_slot, RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), and TTSOpsHeapTuple.

Referenced by get_rel_sync_entry().

◆ LoadPublications()

static List * LoadPublications ( List * pubnames)
static

Definition at line 1703 of file pgoutput.c.

1704 {
1705  List *result = NIL;
1706  ListCell *lc;
1707 
1708  foreach(lc, pubnames)
1709  {
1710  char *pubname = (char *) lfirst(lc);
1711  Publication *pub = GetPublicationByName(pubname, false);
1712 
1713  result = lappend(result, pub);
1714  }
1715 
1716  return result;
1717 }
Publication * GetPublicationByName(const char *pubname, bool missing_ok)

References GetPublicationByName(), lappend(), lfirst, and NIL.

Referenced by get_rel_sync_entry().

◆ maybe_send_schema()

static void maybe_send_schema ( LogicalDecodingContext * ctx,
ReorderBufferChange * change,
Relation  relation,
RelationSyncEntry * relentry
)
static

Definition at line 678 of file pgoutput.c.

681 {
682  bool schema_sent;
685 
686  /*
687  * Remember XID of the (sub)transaction for the change. We don't care if
688  * it's top-level transaction or not (we have already sent that XID in
689  * start of the current streaming block).
690  *
691  * If we're not in a streaming block, just use InvalidTransactionId and
692  * the write methods will not include it.
693  */
694  if (in_streaming)
695  xid = change->txn->xid;
696 
697  if (rbtxn_is_subtxn(change->txn))
698  topxid = rbtxn_get_toptxn(change->txn)->xid;
699  else
700  topxid = xid;
701 
702  /*
703  * Do we need to send the schema? We do track streamed transactions
704  * separately, because those may be applied later (and the regular
705  * transactions won't see their effects until then) and in an order that
706  * we don't know at this point.
707  *
708  * XXX There is a scope of optimization here. Currently, we always send
709  * the schema first time in a streaming transaction but we can probably
710  * avoid that by checking 'relentry->schema_sent' flag. However, before
711  * doing that we need to study its impact on the case where we have a mix
712  * of streaming and non-streaming transactions.
713  */
714  if (in_streaming)
715  schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
716  else
717  schema_sent = relentry->schema_sent;
718 
719  /* Nothing to do if we already sent the schema. */
720  if (schema_sent)
721  return;
722 
723  /*
724  * Send the schema. If the changes will be published using an ancestor's
725  * schema, not the relation's own, send that ancestor's schema before
726  * sending relation's own (XXX - maybe sending only the former suffices?).
727  */
728  if (relentry->publish_as_relid != RelationGetRelid(relation))
729  {
730  Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
731 
732  send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
733  RelationClose(ancestor);
734  }
735 
736  send_relation_and_attrs(relation, xid, ctx, relentry->columns);
737 
738  if (in_streaming)
739  set_schema_sent_in_streamed_txn(relentry, topxid);
740  else
741  relentry->schema_sent = true;
742 }
uint32 TransactionId
Definition: c.h:636
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, Bitmapset *columns)
Definition: pgoutput.c:748
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1933
static bool in_streaming
Definition: pgoutput.c:84
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1923
#define rbtxn_get_toptxn(txn)
#define rbtxn_is_subtxn(txn)
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:97
TransactionId xid
#define InvalidTransactionId
Definition: transam.h:31

References RelationSyncEntry::columns, get_schema_sent_in_streamed_txn(), in_streaming, InvalidTransactionId, RelationSyncEntry::publish_as_relid, rbtxn_get_toptxn, rbtxn_is_subtxn, RelationClose(), RelationGetRelid, RelationIdGetRelation(), RelationSyncEntry::schema_sent, send_relation_and_attrs(), set_schema_sent_in_streamed_txn(), ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ parse_output_parameters()

static void parse_output_parameters ( List * options,
PGOutputData * data
)
static

Definition at line 279 of file pgoutput.c.

280 {
281  ListCell *lc;
282  bool protocol_version_given = false;
283  bool publication_names_given = false;
284  bool binary_option_given = false;
285  bool messages_option_given = false;
286  bool streaming_given = false;
287  bool two_phase_option_given = false;
288  bool origin_option_given = false;
289 
290  data->binary = false;
291  data->streaming = LOGICALREP_STREAM_OFF;
292  data->messages = false;
293  data->two_phase = false;
294 
295  foreach(lc, options)
296  {
297  DefElem *defel = (DefElem *) lfirst(lc);
298 
299  Assert(defel->arg == NULL || IsA(defel->arg, String));
300 
301  /* Check each param, whether or not we recognize it */
302  if (strcmp(defel->defname, "proto_version") == 0)
303  {
304  unsigned long parsed;
305  char *endptr;
306 
307  if (protocol_version_given)
308  ereport(ERROR,
309  (errcode(ERRCODE_SYNTAX_ERROR),
310  errmsg("conflicting or redundant options")));
311  protocol_version_given = true;
312 
313  errno = 0;
314  parsed = strtoul(strVal(defel->arg), &endptr, 10);
315  if (errno != 0 || *endptr != '\0')
316  ereport(ERROR,
317  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
318  errmsg("invalid proto_version")));
319 
320  if (parsed > PG_UINT32_MAX)
321  ereport(ERROR,
322  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
323  errmsg("proto_version \"%s\" out of range",
324  strVal(defel->arg))));
325 
326  data->protocol_version = (uint32) parsed;
327  }
328  else if (strcmp(defel->defname, "publication_names") == 0)
329  {
330  if (publication_names_given)
331  ereport(ERROR,
332  (errcode(ERRCODE_SYNTAX_ERROR),
333  errmsg("conflicting or redundant options")));
334  publication_names_given = true;
335 
336  if (!SplitIdentifierString(strVal(defel->arg), ',',
337  &data->publication_names))
338  ereport(ERROR,
339  (errcode(ERRCODE_INVALID_NAME),
340  errmsg("invalid publication_names syntax")));
341  }
342  else if (strcmp(defel->defname, "binary") == 0)
343  {
344  if (binary_option_given)
345  ereport(ERROR,
346  (errcode(ERRCODE_SYNTAX_ERROR),
347  errmsg("conflicting or redundant options")));
348  binary_option_given = true;
349 
350  data->binary = defGetBoolean(defel);
351  }
352  else if (strcmp(defel->defname, "messages") == 0)
353  {
354  if (messages_option_given)
355  ereport(ERROR,
356  (errcode(ERRCODE_SYNTAX_ERROR),
357  errmsg("conflicting or redundant options")));
358  messages_option_given = true;
359 
360  data->messages = defGetBoolean(defel);
361  }
362  else if (strcmp(defel->defname, "streaming") == 0)
363  {
364  if (streaming_given)
365  ereport(ERROR,
366  (errcode(ERRCODE_SYNTAX_ERROR),
367  errmsg("conflicting or redundant options")));
368  streaming_given = true;
369 
370  data->streaming = defGetStreamingMode(defel);
371  }
372  else if (strcmp(defel->defname, "two_phase") == 0)
373  {
374  if (two_phase_option_given)
375  ereport(ERROR,
376  (errcode(ERRCODE_SYNTAX_ERROR),
377  errmsg("conflicting or redundant options")));
378  two_phase_option_given = true;
379 
380  data->two_phase = defGetBoolean(defel);
381  }
382  else if (strcmp(defel->defname, "origin") == 0)
383  {
384  if (origin_option_given)
385  ereport(ERROR,
386  errcode(ERRCODE_SYNTAX_ERROR),
387  errmsg("conflicting or redundant options"));
388  origin_option_given = true;
389 
390  data->origin = defGetString(defel);
391  if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
392  publish_no_origin = true;
393  else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
394  publish_no_origin = false;
395  else
396  ereport(ERROR,
397  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
398  errmsg("unrecognized origin value: \"%s\"", data->origin));
399  }
400  else
401  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
402  }
403 }
unsigned int uint32
Definition: c.h:490
#define PG_UINT32_MAX
Definition: c.h:574
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define IsA(nodeptr, _type_)
Definition: nodes.h:179
#define LOGICALREP_ORIGIN_NONE
#define LOGICALREP_ORIGIN_ANY
#define LOGICALREP_STREAM_OFF
static bool publish_no_origin
Definition: pgoutput.c:85
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
char * defname
Definition: parsenodes.h:810
Node * arg
Definition: parsenodes.h:811
Definition: value.h:64
char defGetStreamingMode(DefElem *def)
#define strVal(v)
Definition: value.h:82
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3454

References DefElem::arg, Assert(), data, defGetBoolean(), defGetStreamingMode(), defGetString(), DefElem::defname, elog(), ereport, errcode(), errmsg(), ERROR, IsA, lfirst, LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE, LOGICALREP_STREAM_OFF, pg_strcasecmp(), PG_UINT32_MAX, publish_no_origin, SplitIdentifierString(), and strVal.

Referenced by pgoutput_startup().

◆ pgoutput_begin_prepare_txn()

static void pgoutput_begin_prepare_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn
)
static

Definition at line 615 of file pgoutput.c.

616 {
617  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
618 
619  OutputPluginPrepareWrite(ctx, !send_replication_origin);
621 
622  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
623  send_replication_origin);
624 
625  OutputPluginWrite(ctx, true);
626 }
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:702
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:689
#define InvalidRepOriginId
Definition: origin.h:33
static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
Definition: pgoutput.c:2336
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:127
StringInfo out
Definition: logical.h:71
RepOriginId origin_id
XLogRecPtr origin_lsn

References InvalidRepOriginId, logicalrep_write_begin_prepare(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and send_repl_origin().

Referenced by _PG_output_plugin_init().

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn
)
static

Definition at line 547 of file pgoutput.c.

548 {
550  sizeof(PGOutputTxnData));
551 
552  txn->output_plugin_private = txndata;
553 }
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1064
MemoryContext context
Definition: logical.h:36
void * output_plugin_private

References LogicalDecodingContext::context, MemoryContextAllocZero(), and ReorderBufferTXN::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
Relation  relation,
ReorderBufferChange * change
)
static

Definition at line 1401 of file pgoutput.c.

1403 {
1406  MemoryContext old;
1407  RelationSyncEntry *relentry;
1409  Relation ancestor = NULL;
1410  Relation targetrel = relation;
1412  TupleTableSlot *old_slot = NULL;
1413  TupleTableSlot *new_slot = NULL;
1414 
1415  if (!is_publishable_relation(relation))
1416  return;
1417 
1418  /*
1419  * Remember the xid for the change in streaming mode. We need to send xid
1420  * with each change in the streaming mode so that subscriber can make
1421  * their association and on aborts, it can discard the corresponding
1422  * changes.
1423  */
1424  if (in_streaming)
1425  xid = change->txn->xid;
1426 
1427  relentry = get_rel_sync_entry(data, relation);
1428 
1429  /* First check the table filter */
1430  switch (action)
1431  {
1433  if (!relentry->pubactions.pubinsert)
1434  return;
1435  break;
1437  if (!relentry->pubactions.pubupdate)
1438  return;
1439  break;
1441  if (!relentry->pubactions.pubdelete)
1442  return;
1443 
1444  /*
1445  * This is only possible if deletes are allowed even when replica
1446  * identity is not defined for a table. Since the DELETE action
1447  * can't be published, we simply return.
1448  */
1449  if (!change->data.tp.oldtuple)
1450  {
1451  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1452  return;
1453  }
1454  break;
1455  default:
1456  Assert(false);
1457  }
1458 
1459  /* Avoid leaking memory by using and resetting our own context */
1460  old = MemoryContextSwitchTo(data->context);
1461 
1462  /* Switch relation if publishing via root. */
1463  if (relentry->publish_as_relid != RelationGetRelid(relation))
1464  {
1465  Assert(relation->rd_rel->relispartition);
1466  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1467  targetrel = ancestor;
1468  }
1469 
1470  if (change->data.tp.oldtuple)
1471  {
1472  old_slot = relentry->old_slot;
1473  ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
1474 
1475  /* Convert tuple if needed. */
1476  if (relentry->attrmap)
1477  {
1479  &TTSOpsVirtual);
1480 
1481  old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1482  }
1483  }
1484 
1485  if (change->data.tp.newtuple)
1486  {
1487  new_slot = relentry->new_slot;
1488  ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
1489 
1490  /* Convert tuple if needed. */
1491  if (relentry->attrmap)
1492  {
1494  &TTSOpsVirtual);
1495 
1496  new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1497  }
1498  }
1499 
1500  /*
1501  * Check row filter.
1502  *
1503  * Updates could be transformed to inserts or deletes based on the results
1504  * of the row filter for old and new tuple.
1505  */
1506  if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1507  goto cleanup;
1508 
1509  /*
1510  * Send BEGIN if we haven't yet.
1511  *
1512  * We send the BEGIN message after ensuring that we will actually send the
1513  * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1514  * transactions.
1515  */
1516  if (txndata && !txndata->sent_begin_txn)
1517  pgoutput_send_begin(ctx, txn);
1518 
1519  /*
1520  * Schema should be sent using the original relation because it also sends
1521  * the ancestor's relation.
1522  */
1523  maybe_send_schema(ctx, change, relation, relentry);
1524 
1525  OutputPluginPrepareWrite(ctx, true);
1526 
1527  /* Send the data */
1528  switch (action)
1529  {
1531  logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1532  data->binary, relentry->columns);
1533  break;
1535  logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1536  new_slot, data->binary, relentry->columns);
1537  break;
1539  logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1540  data->binary, relentry->columns);
1541  break;
1542  default:
1543  Assert(false);
1544  }
1545 
1546  OutputPluginWrite(ctx, true);
1547 
1548 cleanup:
1549  if (RelationIsValid(ancestor))
1550  {
1551  RelationClose(ancestor);
1552  ancestor = NULL;
1553  }
1554 
1555  MemoryContextSwitchTo(old);
1556  MemoryContextReset(data->context);
1557 }
static void cleanup(void)
Definition: bootstrap.c:696
#define DEBUG1
Definition: elog.h:30
TupleTableSlot * MakeTupleTableSlot(TupleDesc tupleDesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1113
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1353
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:330
bool is_publishable_relation(Relation rel)
static void pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:561
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Relation relation)
Definition: pgoutput.c:1954
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:678
static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
Definition: pgoutput.c:1220
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
Definition: proto.c:458
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
Definition: proto.c:414
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns)
Definition: proto.c:533
#define RelationIsValid(relation)
Definition: rel.h:477
ReorderBufferChangeType
Definition: reorderbuffer.h:64
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:65
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:67
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:66
void * output_plugin_private
Definition: logical.h:76
ReorderBufferChangeType action
Definition: reorderbuffer.h:94
struct ReorderBufferChange::@99::@100 tp
union ReorderBufferChange::@99 data
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192

References generate_unaccent_rules::action, ReorderBufferChange::action, Assert(), RelationSyncEntry::attrmap, cleanup(), RelationSyncEntry::columns, ReorderBufferChange::data, data, DEBUG1, elog(), ExecStoreHeapTuple(), execute_attr_map_slot(), get_rel_sync_entry(), in_streaming, InvalidTransactionId, is_publishable_relation(), logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_update(), MakeTupleTableSlot(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, RelationSyncEntry::old_slot, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_row_filter(), pgoutput_send_begin(), RelationSyncEntry::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationData::rd_rel, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), RelationIsValid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, TTSOpsVirtual, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_column_list_init()

static void pgoutput_column_list_init ( PGOutputData * data,
List * publications,
RelationSyncEntry * entry
)
static

Definition at line 1013 of file pgoutput.c.

1015 {
1016  ListCell *lc;
1017  bool first = true;
1018  Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1019 
1020  /*
1021  * Find if there are any column lists for this relation. If there are,
1022  * build a bitmap using the column lists.
1023  *
1024  * Multiple publications might have multiple column lists for this
1025  * relation.
1026  *
1027  * Note that we don't support the case where the column list is different
1028  * for the same table when combining publications. See comments atop
1029  * fetch_table_list. But one can later change the publication so we still
1030  * need to check all the given publication-table mappings and report an
1031  * error if any publications have a different column list.
1032  *
1033  * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
1034  */
1035  foreach(lc, publications)
1036  {
1037  Publication *pub = lfirst(lc);
1038  HeapTuple cftuple = NULL;
1039  Datum cfdatum = 0;
1040  Bitmapset *cols = NULL;
1041 
1042  /*
1043  * If the publication is FOR ALL TABLES then it is treated the same as
1044  * if there are no column lists (even if other publications have a
1045  * list).
1046  */
1047  if (!pub->alltables)
1048  {
1049  bool pub_no_list = true;
1050 
1051  /*
1052  * Check for the presence of a column list in this publication.
1053  *
1054  * Note: If we find no pg_publication_rel row, it's a publication
1055  * defined for a whole schema, so it can't have a column list,
1056  * just like a FOR ALL TABLES publication.
1057  */
1060  ObjectIdGetDatum(pub->oid));
1061 
1062  if (HeapTupleIsValid(cftuple))
1063  {
1064  /* Lookup the column list attribute. */
1065  cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
1066  Anum_pg_publication_rel_prattrs,
1067  &pub_no_list);
1068 
1069  /* Build the column list bitmap in the per-entry context. */
1070  if (!pub_no_list) /* when not null */
1071  {
1072  int i;
1073  int nliveatts = 0;
1074  TupleDesc desc = RelationGetDescr(relation);
1075 
1077 
1078  cols = pub_collist_to_bitmapset(cols, cfdatum,
1079  entry->entry_cxt);
1080 
1081  /* Get the number of live attributes. */
1082  for (i = 0; i < desc->natts; i++)
1083  {
1084  Form_pg_attribute att = TupleDescAttr(desc, i);
1085 
1086  if (att->attisdropped || att->attgenerated)
1087  continue;
1088 
1089  nliveatts++;
1090  }
1091 
1092  /*
1093  * If column list includes all the columns of the table,
1094  * set it to NULL.
1095  */
1096  if (bms_num_members(cols) == nliveatts)
1097  {
1098  bms_free(cols);
1099  cols = NULL;
1100  }
1101  }
1102 
1103  ReleaseSysCache(cftuple);
1104  }
1105  }
1106 
1107  if (first)
1108  {
1109  entry->columns = cols;
1110  first = false;
1111  }
1112  else if (!bms_equal(entry->columns, cols))
1113  ereport(ERROR,
1114  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1115  errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1117  RelationGetRelationName(relation)));
1118  } /* loop all subscribed publications */
1119 
1120  RelationClose(relation);
1121 }
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:94
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:665
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
int i
Definition: isn.c:73
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3324
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
static void pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
Definition: pgoutput.c:845
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
#define RelationGetRelationName(relation)
Definition: rel.h:538
#define RelationGetNamespace(relation)
Definition: rel.h:545
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:866
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1079
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:829
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References Publication::alltables, bms_equal(), bms_free(), bms_num_members(), RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, ereport, errcode(), errmsg(), ERROR, get_namespace_name(), HeapTupleIsValid, i, lfirst, TupleDescData::natts, ObjectIdGetDatum(), Publication::oid, pgoutput_ensure_entry_cxt(), pub_collist_to_bitmapset(), PUBLICATIONRELMAP, RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetDescr, RelationGetNamespace, RelationGetRelationName, RelationIdGetRelation(), ReleaseSysCache(), SearchSysCache2(), SysCacheGetAttr(), and TupleDescAttr.

Referenced by get_rel_sync_entry().

◆ pgoutput_commit_prepared_txn()

static void pgoutput_commit_prepared_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn
)
static

Definition at line 646 of file pgoutput.c.

648 {
649  OutputPluginUpdateProgress(ctx, false);
650 
651  OutputPluginPrepareWrite(ctx, true);
652  logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
653  OutputPluginWrite(ctx, true);
654 }
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact)
Definition: logical.c:715
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:248

References logicalrep_write_commit_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn
)
static

Definition at line 583 of file pgoutput.c.

585 {
587  bool sent_begin_txn;
588 
589  Assert(txndata);
590 
591  /*
592  * We don't need to send the commit message unless some relevant change
593  * from this transaction has been sent to the downstream.
594  */
595  sent_begin_txn = txndata->sent_begin_txn;
596  OutputPluginUpdateProgress(ctx, !sent_begin_txn);
597  pfree(txndata);
598  txn->output_plugin_private = NULL;
599 
600  if (!sent_begin_txn)
601  {
602  elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
603  return;
604  }
605 
606  OutputPluginPrepareWrite(ctx, true);
607  logicalrep_write_commit(ctx->out, txn, commit_lsn);
608  OutputPluginWrite(ctx, true);
609 }
void pfree(void *pointer)
Definition: mcxt.c:1456
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:89
bool sent_begin_txn
Definition: pgoutput.c:209

References Assert(), DEBUG1, elog(), logicalrep_write_commit(), LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), pfree(), PGOutputTxnData::sent_begin_txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_ensure_entry_cxt()

static void pgoutput_ensure_entry_cxt ( PGOutputData * data,
RelationSyncEntry * entry
)
static

Definition at line 845 of file pgoutput.c.

846 {
847  Relation relation;
848 
849  /* The context may already exist, in which case bail out. */
850  if (entry->entry_cxt)
851  return;
852 
853  relation = RelationIdGetRelation(entry->publish_as_relid);
854 
855  entry->entry_cxt = AllocSetContextCreate(data->cachectx,
856  "entry private context",
858 
860  RelationGetRelationName(relation));
861 }
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:163
#define MemoryContextCopyAndSetIdentifier(cxt, id)
Definition: memutils.h:101

References ALLOCSET_SMALL_SIZES, AllocSetContextCreate, data, RelationSyncEntry::entry_cxt, MemoryContextCopyAndSetIdentifier, RelationSyncEntry::publish_as_relid, RelationGetRelationName, and RelationIdGetRelation().

Referenced by pgoutput_column_list_init(), and pgoutput_row_filter_init().

◆ pgoutput_message()

static void pgoutput_message ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message
)
static

Definition at line 1628 of file pgoutput.c.

1631 {
1634 
1635  if (!data->messages)
1636  return;
1637 
1638  /*
1639  * Remember the xid for the message in streaming mode. See
1640  * pgoutput_change.
1641  */
1642  if (in_streaming)
1643  xid = txn->xid;
1644 
1645  /*
1646  * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1647  */
1648  if (transactional)
1649  {
1651 
1652  /* Send BEGIN if we haven't yet */
1653  if (txndata && !txndata->sent_begin_txn)
1654  pgoutput_send_begin(ctx, txn);
1655  }
1656 
1657  OutputPluginPrepareWrite(ctx, true);
1659  xid,
1660  message_lsn,
1661  transactional,
1662  prefix,
1663  sz,
1664  message);
1665  OutputPluginWrite(ctx, true);
1666 }
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:643

References data, if(), in_streaming, InvalidTransactionId, logicalrep_write_message(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_send_begin(), PGOutputTxnData::sent_begin_txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext * ctx,
RepOriginId  origin_id
)
static

Definition at line 1673 of file pgoutput.c.

1675 {
1676  if (publish_no_origin && origin_id != InvalidRepOriginId)
1677  return true;
1678 
1679  return false;
1680 }

References InvalidRepOriginId, and publish_no_origin.

Referenced by _PG_output_plugin_init().

◆ pgoutput_prepare_txn()

static void pgoutput_prepare_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_lsn
)
static

Definition at line 632 of file pgoutput.c.

634 {
635  OutputPluginUpdateProgress(ctx, false);
636 
637  OutputPluginPrepareWrite(ctx, true);
638  logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
639  OutputPluginWrite(ctx, true);
640 }
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:198

References logicalrep_write_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_rollback_prepared_txn()

static void pgoutput_rollback_prepared_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time
)
static

Definition at line 660 of file pgoutput.c.

664 {
665  OutputPluginUpdateProgress(ctx, false);
666 
667  OutputPluginPrepareWrite(ctx, true);
668  logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
669  prepare_time);
670  OutputPluginWrite(ctx, true);
671 }
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:304

References logicalrep_write_rollback_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_row_filter()

static bool pgoutput_row_filter ( Relation  relation,
TupleTableSlot *  old_slot,
TupleTableSlot **  new_slot_ptr,
RelationSyncEntry *  entry,
ReorderBufferChangeType *  action 
)
static

Definition at line 1220 of file pgoutput.c.

1223 {
1224  TupleDesc desc;
1225  int i;
1226  bool old_matched,
1227  new_matched,
1228  result;
1229  TupleTableSlot *tmp_new_slot;
1230  TupleTableSlot *new_slot = *new_slot_ptr;
1231  ExprContext *ecxt;
1232  ExprState *filter_exprstate;
1233 
1234  /*
1235  * We need this map to avoid relying on ReorderBufferChangeType enums
1236  * having specific values.
1237  */
1238  static const int map_changetype_pubaction[] = {
1242  };
1243 
1247 
1248  Assert(new_slot || old_slot);
1249 
1250  /* Get the corresponding row filter */
1251  filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1252 
1253  /* Bail out if there is no row filter */
1254  if (!filter_exprstate)
1255  return true;
1256 
1257  elog(DEBUG3, "table \"%s.%s\" has row filter",
1259  RelationGetRelationName(relation));
1260 
1262 
1263  ecxt = GetPerTupleExprContext(entry->estate);
1264 
1265  /*
1266  * For the following occasions where there is only one tuple, we can
1267  * evaluate the row filter for that tuple and return.
1268  *
1269  * For inserts, we only have the new tuple.
1270  *
1271  * For updates, we can have only a new tuple when none of the replica
1272  * identity columns changed and none of those columns have external data
1273  * but we still need to evaluate the row filter for the new tuple as the
1274  * existing values of those columns might not match the filter. Also,
1275  * users can use constant expressions in the row filter, so we anyway need
1276  * to evaluate it for the new tuple.
1277  *
1278  * For deletes, we only have the old tuple.
1279  */
1280  if (!new_slot || !old_slot)
1281  {
1282  ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1283  result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1284 
1285  return result;
1286  }
1287 
1288  /*
1289  * Both the old and new tuples can be present only for updates, and both
1290  * need to be checked against the row filter.
1291  */
1292  Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1293 
1294  slot_getallattrs(new_slot);
1295  slot_getallattrs(old_slot);
1296 
1297  tmp_new_slot = NULL;
1298  desc = RelationGetDescr(relation);
1299 
1300  /*
1301  * The new tuple might not have all the replica identity columns, in which
1302  * case it needs to be copied over from the old tuple.
1303  */
1304  for (i = 0; i < desc->natts; i++)
1305  {
1306  Form_pg_attribute att = TupleDescAttr(desc, i);
1307 
1308  /*
1309  * if the column in the new tuple or old tuple is null, nothing to do
1310  */
1311  if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1312  continue;
1313 
1314  /*
1315  * Unchanged toasted replica identity columns are only logged in the
1316  * old tuple. Copy this over to the new tuple. The changed (or WAL
1317  * Logged) toast values are always assembled in memory and set as
1318  * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1319  */
1320  if (att->attlen == -1 &&
1321  VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1322  !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1323  {
1324  if (!tmp_new_slot)
1325  {
1326  tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1327  ExecClearTuple(tmp_new_slot);
1328 
1329  memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1330  desc->natts * sizeof(Datum));
1331  memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1332  desc->natts * sizeof(bool));
1333  }
1334 
1335  tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1336  tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1337  }
1338  }
1339 
1340  ecxt->ecxt_scantuple = old_slot;
1341  old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1342 
1343  if (tmp_new_slot)
1344  {
1345  ExecStoreVirtualTuple(tmp_new_slot);
1346  ecxt->ecxt_scantuple = tmp_new_slot;
1347  }
1348  else
1349  ecxt->ecxt_scantuple = new_slot;
1350 
1351  new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1352 
1353  /*
1354  * Case 1: if both tuples don't match the row filter, bailout. Send
1355  * nothing.
1356  */
1357  if (!old_matched && !new_matched)
1358  return false;
1359 
1360  /*
1361  * Case 2: if the old tuple doesn't satisfy the row filter but the new
1362  * tuple does, transform the UPDATE into INSERT.
1363  *
1364  * Use the newly transformed tuple that must contain the column values for
1365  * all the replica identity columns. This is required to ensure that,
1366  * while inserting the tuple in the downstream node, we have all the
1367  * required column values.
1368  */
1369  if (!old_matched && new_matched)
1370  {
1372 
1373  if (tmp_new_slot)
1374  *new_slot_ptr = tmp_new_slot;
1375  }
1376 
1377  /*
1378  * Case 3: if the old tuple satisfies the row filter but the new tuple
1379  * doesn't, transform the UPDATE into DELETE.
1380  *
1381  * This transformation does not require another tuple. The Old tuple will
1382  * be used for DELETE.
1383  */
1384  else if (old_matched && !new_matched)
1386 
1387  /*
1388  * Case 4: if both tuples match the row filter, transformation isn't
1389  * required. (*action is default UPDATE).
1390  */
1391 
1392  return true;
1393 }
#define DEBUG3
Definition: elog.h:28
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1553
#define ResetPerTupleExprContext(estate)
Definition: executor.h:558
#define GetPerTupleExprContext(estate)
Definition: executor.h:549
static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
Definition: pgoutput.c:822
TupleTableSlot * ecxt_scantuple
Definition: execnodes.h:249
bool * tts_isnull
Definition: tuptable.h:128
Datum * tts_values
Definition: tuptable.h:126
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:433
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:362
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290

References generate_unaccent_rules::action, Assert(), DEBUG3, ExprContext::ecxt_scantuple, elog(), RelationSyncEntry::estate, ExecClearTuple(), ExecStoreVirtualTuple(), RelationSyncEntry::exprstate, get_namespace_name(), GetPerTupleExprContext, i, MakeSingleTupleTableSlot(), TupleDescData::natts, pgoutput_row_filter_exec_expr(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ResetPerTupleExprContext, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, TTSOpsVirtual, TupleDescAttr, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pgoutput_change().

◆ pgoutput_row_filter_exec_expr()

static bool pgoutput_row_filter_exec_expr ( ExprState *  state,
ExprContext *  econtext 
)
static

Definition at line 822 of file pgoutput.c.

823 {
824  Datum ret;
825  bool isnull;
826 
827  Assert(state != NULL);
828 
829  ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
830 
831  elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
832  isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
833  isnull ? "true" : "false");
834 
835  if (isnull)
836  return false;
837 
838  return DatumGetBool(ret);
839 }
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:347
static bool DatumGetBool(Datum X)
Definition: postgres.h:90
Definition: regguts.h:323

References Assert(), DatumGetBool(), DEBUG3, elog(), and ExecEvalExprSwitchContext().

Referenced by pgoutput_row_filter().

◆ pgoutput_row_filter_init()

static void pgoutput_row_filter_init ( PGOutputData *  data,
List *  publications,
RelationSyncEntry *  entry 
)
static

Definition at line 867 of file pgoutput.c.

869 {
870  ListCell *lc;
871  List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
872  bool no_filter[] = {false, false, false}; /* One per pubaction */
873  MemoryContext oldctx;
874  int idx;
875  bool has_filter = true;
876  Oid schemaid = get_rel_namespace(entry->publish_as_relid);
877 
878  /*
879  * Find if there are any row filters for this relation. If there are, then
880  * prepare the necessary ExprState and cache it in entry->exprstate. To
881  * build an expression state, we need to ensure the following:
882  *
883  * All the given publication-table mappings must be checked.
884  *
885  * Multiple publications might have multiple row filters for this
886  * relation. Since row filter usage depends on the DML operation, there
887  * are multiple lists (one for each operation) to which row filters will
888  * be appended.
889  *
890  * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
891  * expression" so it takes precedence.
892  */
893  foreach(lc, publications)
894  {
895  Publication *pub = lfirst(lc);
896  HeapTuple rftuple = NULL;
897  Datum rfdatum = 0;
898  bool pub_no_filter = true;
899 
900  /*
901  * If the publication is FOR ALL TABLES, or the publication includes a
902  * FOR TABLES IN SCHEMA where the table belongs to the referred
903  * schema, then it is treated the same as if there are no row filters
904  * (even if other publications have a row filter).
905  */
906  if (!pub->alltables &&
908  ObjectIdGetDatum(schemaid),
909  ObjectIdGetDatum(pub->oid)))
910  {
911  /*
912  * Check for the presence of a row filter in this publication.
913  */
916  ObjectIdGetDatum(pub->oid));
917 
918  if (HeapTupleIsValid(rftuple))
919  {
920  /* Null indicates no filter. */
921  rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
922  Anum_pg_publication_rel_prqual,
923  &pub_no_filter);
924  }
925  }
926 
927  if (pub_no_filter)
928  {
929  if (rftuple)
930  ReleaseSysCache(rftuple);
931 
932  no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
933  no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
934  no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
935 
936  /*
937  * Quick exit if every DML action is already known to be published
938  * without a row filter.
939  */
940  if (no_filter[PUBACTION_INSERT] &&
941  no_filter[PUBACTION_UPDATE] &&
942  no_filter[PUBACTION_DELETE])
943  {
944  has_filter = false;
945  break;
946  }
947 
948  /* No additional work for this publication. Next one. */
949  continue;
950  }
951 
952  /* Form the per pubaction row filter lists. */
953  if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
954  rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
955  TextDatumGetCString(rfdatum));
956  if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
957  rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
958  TextDatumGetCString(rfdatum));
959  if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
960  rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
961  TextDatumGetCString(rfdatum));
962 
963  ReleaseSysCache(rftuple);
964  } /* loop all subscribed publications */
965 
966  /* Discard the collected row filters of pubactions published without a filter */
967  for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
968  {
969  if (no_filter[idx])
970  {
971  list_free_deep(rfnodes[idx]);
972  rfnodes[idx] = NIL;
973  }
974  }
975 
976  if (has_filter)
977  {
979 
981 
982  /*
983  * Now all the filters for all pubactions are known. Combine them when
984  * their pubactions are the same.
985  */
986  oldctx = MemoryContextSwitchTo(entry->entry_cxt);
987  entry->estate = create_estate_for_relation(relation);
988  for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
989  {
990  List *filters = NIL;
991  Expr *rfnode;
992 
993  if (rfnodes[idx] == NIL)
994  continue;
995 
996  foreach(lc, rfnodes[idx])
997  filters = lappend(filters, stringToNode((char *) lfirst(lc)));
998 
999  /* combine the row filter and cache the ExprState */
1000  rfnode = make_orclause(filters);
1001  entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1002  } /* for each pubaction */
1003  MemoryContextSwitchTo(oldctx);
1004 
1005  RelationClose(relation);
1006  }
1007 }
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:259
#define TextDatumGetCString(d)
Definition: builtins.h:95
ExprState * ExecPrepareExpr(Expr *node, EState *estate)
Definition: execExpr.c:735
Expr * make_orclause(List *orclauses)
Definition: makefuncs.c:655
#define NUM_ROWFILTER_PUBACTIONS
Definition: pgoutput.c:108
static EState * create_estate_for_relation(Relation rel)
Definition: pgoutput.c:792
void * stringToNode(const char *str)
Definition: read.c:90
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition: syscache.h:193

References Publication::alltables, create_estate_for_relation(), data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecPrepareExpr(), RelationSyncEntry::exprstate, get_rel_namespace(), HeapTupleIsValid, idx(), lappend(), lfirst, list_free_deep(), make_orclause(), MemoryContextSwitchTo(), NIL, NUM_ROWFILTER_PUBACTIONS, ObjectIdGetDatum(), Publication::oid, pgoutput_ensure_entry_cxt(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, PUBLICATIONNAMESPACEMAP, PUBLICATIONRELMAP, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationClose(), RelationIdGetRelation(), ReleaseSysCache(), SearchSysCache2(), SearchSysCacheExists2, stringToNode(), SysCacheGetAttr(), and TextDatumGetCString.

Referenced by get_rel_sync_entry().

◆ pgoutput_send_begin()

static void pgoutput_send_begin ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn 
)
static

Definition at line 561 of file pgoutput.c.

562 {
563  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
565 
566  Assert(txndata);
567  Assert(!txndata->sent_begin_txn);
568 
569  OutputPluginPrepareWrite(ctx, !send_replication_origin);
570  logicalrep_write_begin(ctx->out, txn);
571  txndata->sent_begin_txn = true;
572 
573  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
574  send_replication_origin);
575 
576  OutputPluginWrite(ctx, true);
577 }
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:60

References Assert(), InvalidRepOriginId, logicalrep_write_begin(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), send_repl_origin(), and PGOutputTxnData::sent_begin_txn.

Referenced by pgoutput_change(), pgoutput_message(), and pgoutput_truncate().

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext *  ctx)
static

Definition at line 1690 of file pgoutput.c.

1691 {
1692  if (RelationSyncCache)
1693  {
1695  RelationSyncCache = NULL;
1696  }
1697 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:863

References hash_destroy(), and RelationSyncCache.

Referenced by _PG_output_plugin_init().

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext *  ctx,
OutputPluginOptions *  opt,
bool  is_init 
)
static

Definition at line 409 of file pgoutput.c.

411 {
413  static bool publication_callback_registered = false;
414 
415  /* Create our memory context for private allocations. */
416  data->context = AllocSetContextCreate(ctx->context,
417  "logical replication output context",
419 
420  data->cachectx = AllocSetContextCreate(ctx->context,
421  "logical replication cache context",
423 
425 
426  /* This plugin uses binary protocol. */
428 
429  /*
430  * This is replication start and not slot initialization.
431  *
432  * Parse and validate options passed by the client.
433  */
434  if (!is_init)
435  {
436  /* Parse the params and ERROR if we see any we don't recognize */
438 
439  /* Check if we support requested protocol */
440  if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
441  ereport(ERROR,
442  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
443  errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
444  data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
445 
446  if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
447  ereport(ERROR,
448  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
449  errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
450  data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
451 
452  if (data->publication_names == NIL)
453  ereport(ERROR,
454  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
455  errmsg("publication_names parameter missing")));
456 
457  /*
458  * Decide whether to enable streaming. It is disabled by default, in
459  * which case we just update the flag in decoding context. Otherwise
460  * we only allow it with sufficient version of the protocol, and when
461  * the output plugin supports it.
462  */
463  if (data->streaming == LOGICALREP_STREAM_OFF)
464  ctx->streaming = false;
465  else if (data->streaming == LOGICALREP_STREAM_ON &&
466  data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
467  ereport(ERROR,
468  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
469  errmsg("requested proto_version=%d does not support streaming, need %d or higher",
470  data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
471  else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
473  ereport(ERROR,
474  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
475  errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
477  else if (!ctx->streaming)
478  ereport(ERROR,
479  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
480  errmsg("streaming requested, but not supported by output plugin")));
481 
482  /* Also remember we're currently not streaming any transaction. */
483  in_streaming = false;
484 
485  /*
486  * Here, we just check whether the two-phase option is passed by the
487  * plugin and decide whether to enable it at a later point in time. It
488  * remains enabled if the previous start-up has done so. But we only
489  * allow the option to be passed in with sufficient version of the
490  * protocol, and when the output plugin supports it.
491  */
492  if (!data->two_phase)
493  ctx->twophase_opt_given = false;
494  else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
495  ereport(ERROR,
496  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
497  errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
498  data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
499  else if (!ctx->twophase)
500  ereport(ERROR,
501  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
502  errmsg("two-phase commit requested, but not supported by output plugin")));
503  else
504  ctx->twophase_opt_given = true;
505 
506  /* Init publication state. */
507  data->publications = NIL;
508  publications_valid = false;
509 
510  /*
511  * Register callback for pg_publication if we didn't already do that
512  * during some previous call in this process.
513  */
514  if (!publication_callback_registered)
515  {
518  (Datum) 0);
519  publication_callback_registered = true;
520  }
521 
522  /* Initialize relation schema cache. */
524  }
525  else
526  {
527  /*
528  * Disable the streaming and prepared transactions during the slot
529  * initialization mode.
530  */
531  ctx->streaming = false;
532  ctx->twophase = false;
533  }
534 }
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:40
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_MAX_VERSION_NUM
Definition: logicalproto.h:45
void * palloc0(Size size)
Definition: mcxt.c:1257
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
#define LOGICALREP_STREAM_ON
#define LOGICALREP_STREAM_PARALLEL
static void parse_output_parameters(List *options, PGOutputData *data)
Definition: pgoutput.c:279
static void init_rel_sync_cache(MemoryContext cachectx)
Definition: pgoutput.c:1869
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1725
List * output_plugin_options
Definition: logical.h:59
OutputPluginOutputType output_type
Definition: output_plugin.h:28
@ PUBLICATIONOID
Definition: syscache.h:83

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CacheMemoryContext, CacheRegisterSyscacheCallback(), LogicalDecodingContext::context, data, ereport, errcode(), errmsg(), ERROR, in_streaming, init_rel_sync_cache(), LOGICALREP_PROTO_MAX_VERSION_NUM, LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_STREAM_OFF, LOGICALREP_STREAM_ON, LOGICALREP_STREAM_PARALLEL, NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0(), parse_output_parameters(), publication_invalidation_cb(), PUBLICATIONOID, publications_valid, LogicalDecodingContext::streaming, LogicalDecodingContext::twophase, and LogicalDecodingContext::twophase_opt_given.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_abort()

static void pgoutput_stream_abort ( struct LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 1790 of file pgoutput.c.

1793 {
1794  ReorderBufferTXN *toptxn;
1796  bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1797 
1798  /*
1799  * The abort should happen outside streaming block, even for streamed
1800  * transactions. The transaction has to be marked as streamed, though.
1801  */
1802  Assert(!in_streaming);
1803 
1804  /* determine the toplevel transaction */
1805  toptxn = rbtxn_get_toptxn(txn);
1806 
1807  Assert(rbtxn_is_streamed(toptxn));
1808 
1809  OutputPluginPrepareWrite(ctx, true);
1810  logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1811  txn->xact_time.abort_time, write_abort_info);
1812 
1813  OutputPluginWrite(ctx, true);
1814 
1815  cleanup_rel_sync_cache(toptxn->xid, false);
1816 }
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:2223
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
Definition: proto.c:1166
#define rbtxn_is_streamed(txn)
union ReorderBufferTXN::@105 xact_time
TimestampTz abort_time

References ReorderBufferTXN::abort_time, Assert(), cleanup_rel_sync_cache(), data, in_streaming, LOGICALREP_STREAM_PARALLEL, logicalrep_write_stream_abort(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_get_toptxn, rbtxn_is_streamed, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_commit()

static void pgoutput_stream_commit ( struct LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 1823 of file pgoutput.c.

1826 {
1827  /*
1828  * The commit should happen outside streaming block, even for streamed
1829  * transactions. The transaction has to be marked as streamed, though.
1830  */
1831  Assert(!in_streaming);
1832  Assert(rbtxn_is_streamed(txn));
1833 
1834  OutputPluginUpdateProgress(ctx, false);
1835 
1836  OutputPluginPrepareWrite(ctx, true);
1837  logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1838  OutputPluginWrite(ctx, true);
1839 
1840  cleanup_rel_sync_cache(txn->xid, true);
1841 }
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1112

References Assert(), cleanup_rel_sync_cache(), in_streaming, logicalrep_write_stream_commit(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), rbtxn_is_streamed, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_prepare_txn()

static void pgoutput_stream_prepare_txn ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 1849 of file pgoutput.c.

1852 {
1853  Assert(rbtxn_is_streamed(txn));
1854 
1855  OutputPluginUpdateProgress(ctx, false);
1856  OutputPluginPrepareWrite(ctx, true);
1857  logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1858  OutputPluginWrite(ctx, true);
1859 }
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:364

References Assert(), logicalrep_write_stream_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), and rbtxn_is_streamed.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_start()

static void pgoutput_stream_start ( struct LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn 
)
static

Definition at line 1740 of file pgoutput.c.

1742 {
1743  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1744 
1745  /* we can't nest streaming of transactions */
1746  Assert(!in_streaming);
1747 
1748  /*
1749  * If we already sent the first stream for this transaction then don't
1750  * send the origin id in the subsequent streams.
1751  */
1752  if (rbtxn_is_streamed(txn))
1753  send_replication_origin = false;
1754 
1755  OutputPluginPrepareWrite(ctx, !send_replication_origin);
1757 
1759  send_replication_origin);
1760 
1761  OutputPluginWrite(ctx, true);
1762 
1763  /* we're streaming a chunk of transaction now */
1764  in_streaming = true;
1765 }
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1069
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), in_streaming, InvalidRepOriginId, InvalidXLogRecPtr, logicalrep_write_stream_start(), ReorderBufferTXN::origin_id, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, send_repl_origin(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_stop()

static void pgoutput_stream_stop ( struct LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn 
)
static

Definition at line 1771 of file pgoutput.c.

1773 {
1774  /* we should be streaming a transaction */
1776 
1777  OutputPluginPrepareWrite(ctx, true);
1779  OutputPluginWrite(ctx, true);
1780 
1781  /* we've stopped streaming a transaction */
1782  in_streaming = false;
1783 }
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1103

References Assert(), in_streaming, logicalrep_write_stream_stop(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_truncate()

static void pgoutput_truncate ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange *  change 
)
static

Definition at line 1560 of file pgoutput.c.

1562 {
1565  MemoryContext old;
1566  RelationSyncEntry *relentry;
1567  int i;
1568  int nrelids;
1569  Oid *relids;
1571 
1572  /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1573  if (in_streaming)
1574  xid = change->txn->xid;
1575 
1576  old = MemoryContextSwitchTo(data->context);
1577 
1578  relids = palloc0(nrelations * sizeof(Oid));
1579  nrelids = 0;
1580 
1581  for (i = 0; i < nrelations; i++)
1582  {
1583  Relation relation = relations[i];
1584  Oid relid = RelationGetRelid(relation);
1585 
1586  if (!is_publishable_relation(relation))
1587  continue;
1588 
1589  relentry = get_rel_sync_entry(data, relation);
1590 
1591  if (!relentry->pubactions.pubtruncate)
1592  continue;
1593 
1594  /*
1595  * Don't send partitions if the publication wants to send only the
1596  * root tables through it.
1597  */
1598  if (relation->rd_rel->relispartition &&
1599  relentry->publish_as_relid != relid)
1600  continue;
1601 
1602  relids[nrelids++] = relid;
1603 
1604  /* Send BEGIN if we haven't yet */
1605  if (txndata && !txndata->sent_begin_txn)
1606  pgoutput_send_begin(ctx, txn);
1607 
1608  maybe_send_schema(ctx, change, relation, relentry);
1609  }
1610 
1611  if (nrelids > 0)
1612  {
1613  OutputPluginPrepareWrite(ctx, true);
1615  xid,
1616  nrelids,
1617  relids,
1618  change->data.truncate.cascade,
1619  change->data.truncate.restart_seqs);
1620  OutputPluginWrite(ctx, true);
1621  }
1622 
1623  MemoryContextSwitchTo(old);
1624  MemoryContextReset(data->context);
1625 }
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:586
struct ReorderBufferChange::@99::@101 truncate

References ReorderBufferChange::data, data, get_rel_sync_entry(), i, in_streaming, InvalidTransactionId, is_publishable_relation(), logicalrep_write_truncate(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), palloc0(), pgoutput_send_begin(), RelationSyncEntry::pubactions, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, RelationData::rd_rel, RelationGetRelid, ReorderBufferChange::truncate, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1725 of file pgoutput.c.

1726 {
1727  publications_valid = false;
1728 
1729  /*
1730  * Also invalidate per-relation cache so that next time the filtering info
1731  * is checked it will be updated with the new publication settings.
1732  */
1733  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1734 }
void * arg

References arg, publications_valid, and rel_sync_cache_publication_cb().

Referenced by pgoutput_startup().

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 2310 of file pgoutput.c.

2311 {
2312  HASH_SEQ_STATUS status;
2313  RelationSyncEntry *entry;
2314 
2315  /*
2316  * We can get here if the plugin was used in SQL interface as the
2317  * RelationSyncCache is destroyed when the decoding finishes, but there
2318  * is no way to unregister the invalidation callbacks.
2319  */
2320  if (RelationSyncCache == NULL)
2321  return;
2322 
2323  /*
2324  * We have no easy way to identify which cache entries this invalidation
2325  * event might have affected, so just mark them all invalid.
2326  */
2327  hash_seq_init(&status, RelationSyncCache);
2328  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2329  {
2330  entry->replicate_valid = false;
2331  }
2332 }

References hash_seq_init(), hash_seq_search(), RelationSyncCache, and RelationSyncEntry::replicate_valid.

Referenced by init_rel_sync_cache(), and publication_invalidation_cb().

◆ rel_sync_cache_relation_cb()

/*
 * Relcache invalidation callback for RelationSyncCache.
 *
 * With a valid relid only the entry for that relation (if there is one) is
 * marked invalid; with an invalid relid every entry is.
 *
 * Nobody holds pointers to entries of this hash table outside logical
 * decoding callback calls, but invalidation events can arrive *during* a
 * callback whenever the callback does syscache access.  Therefore this
 * function only clears replicate_valid and must not damage any of the
 * entry's substructure; the next get_rel_sync_entry() call rebuilds it.
 *
 * arg is not used.
 */
static void
rel_sync_cache_relation_cb(Datum arg, Oid relid)
{
	RelationSyncEntry *cached;
	HASH_SEQ_STATUS scan;

	/*
	 * The callback can fire after RelationSyncCache is gone: when the plugin
	 * is used through the SQL interface the cache is destroyed once decoding
	 * finishes, but the relcache invalidation callback cannot be
	 * unregistered.
	 */
	if (RelationSyncCache == NULL)
		return;

	if (!OidIsValid(relid))
	{
		/* No specific relation given: the whole cache must be flushed. */
		hash_seq_init(&scan, RelationSyncCache);
		while ((cached = (RelationSyncEntry *) hash_seq_search(&scan)) != NULL)
			cached->replicate_valid = false;

		return;
	}

	/*
	 * Invalidations for relations that are not in the table are entirely
	 * normal, so a failed lookup is silently ignored.
	 */
	cached = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
											   HASH_FIND, NULL);
	if (cached != NULL)
		cached->replicate_valid = false;
}
#define OidIsValid(objectId)
Definition: c.h:759
@ HASH_FIND
Definition: hsearch.h:113

References HASH_FIND, hash_search(), hash_seq_init(), hash_seq_search(), OidIsValid, RelationSyncCache, and RelationSyncEntry::replicate_valid.

Referenced by init_rel_sync_cache().

◆ send_relation_and_attrs()

/*
 * Write the type messages (where needed) and the relation message for
 * 'relation' to the output plugin stream.
 *
 * relation - relation whose tuple descriptor is walked and which is passed
 *            to logicalrep_write_rel()
 * xid      - transaction id forwarded to the logicalrep_write_* calls
 * ctx      - decoding context; ctx->out receives the messages
 * columns  - column list as a set of attribute numbers, or NULL for no
 *            column filtering
 *
 * Each message is emitted as its own OutputPluginPrepareWrite() /
 * OutputPluginWrite() pair.
 */
static void
send_relation_and_attrs(Relation relation, TransactionId xid,
						LogicalDecodingContext *ctx,
						Bitmapset *columns)
{
	TupleDesc	tupdesc = RelationGetDescr(relation);
	int			attidx;

	/*
	 * Write out type info if needed.  We do that only for user-created
	 * types.  FirstGenbkiObjectId is the cutoff, so that only objects with
	 * hand-assigned OIDs count as "built in", and not, for instance, any
	 * function or type defined in the information_schema.  This matters
	 * because only hand-assigned OIDs can be expected to remain stable
	 * across major versions.
	 */
	for (attidx = 0; attidx < tupdesc->natts; attidx++)
	{
		Form_pg_attribute attr = TupleDescAttr(tupdesc, attidx);
		bool		send_type;

		/*
		 * A type message is wanted only for a live, non-generated column
		 * whose type is at or above the cutoff and which is present in the
		 * column list (when one was given).
		 */
		send_type = !attr->attisdropped &&
			!attr->attgenerated &&
			attr->atttypid >= FirstGenbkiObjectId &&
			(columns == NULL || bms_is_member(attr->attnum, columns));

		if (!send_type)
			continue;

		OutputPluginPrepareWrite(ctx, false);
		logicalrep_write_typ(ctx->out, xid, attr->atttypid);
		OutputPluginWrite(ctx, false);
	}

	/* Finally the relation description itself. */
	OutputPluginPrepareWrite(ctx, false);
	logicalrep_write_rel(ctx->out, xid, relation, columns);
	OutputPluginWrite(ctx, false);
}
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:444
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns)
Definition: proto.c:670
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:725
#define FirstGenbkiObjectId
Definition: transam.h:195

References bms_is_member(), FirstGenbkiObjectId, i, logicalrep_write_rel(), logicalrep_write_typ(), TupleDescData::natts, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, and TupleDescAttr.

Referenced by maybe_send_schema().

◆ send_repl_origin()

/*
 * Send an origin message for origin_id, if requested and if the origin's
 * name can be looked up.
 *
 * ctx         - decoding context; ctx->out receives the origin message
 * origin_id   - replication origin to look up via replorigin_by_oid()
 * origin_lsn  - LSN written along with the origin name
 * send_origin - when false the function does nothing at all
 *
 * The function first finishes the write that is currently in progress
 * (presumably one the caller prepared -- confirm against the callers) and
 * then prepares a new one for the origin message.  It does NOT call
 * OutputPluginWrite() for that new message, so the write is still open when
 * the function returns.
 */
static void
send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
				 XLogRecPtr origin_lsn, bool send_origin)
{
	char	   *origin_name;

	if (!send_origin)
		return;

	/*----------
	 * XXX: which behaviour do we want here?
	 *
	 * Alternatives:
	 *	- don't send origin message if origin name not found
	 *	  (that's what we do now)
	 *	- throw error - that will break replication, not good
	 *	- send some special "unknown" origin
	 *----------
	 */
	if (!replorigin_by_oid(origin_id, true, &origin_name))
		return;

	/* Message boundary */
	OutputPluginWrite(ctx, false);
	OutputPluginPrepareWrite(ctx, true);

	logicalrep_write_origin(ctx->out, origin_name, origin_lsn);
}
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:465
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:385

References logicalrep_write_origin(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by pgoutput_begin_prepare_txn(), pgoutput_send_begin(), and pgoutput_stream_start().

◆ set_schema_sent_in_streamed_txn()

/*
 * Append xid to entry->streamed_txns.
 *
 * entry - relation sync cache entry whose streamed_txns list is extended
 * xid   - transaction id to append
 *
 * The append is done with CacheMemoryContext as the current memory context,
 * presumably so that the list lives as long as the cache entry rather than
 * only as long as the caller's context.  The previous context is restored
 * before returning.
 */
static void
set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
	MemoryContext oldctx;

	/* Remember the caller's context so that it can be restored below. */
	oldctx = MemoryContextSwitchTo(CacheMemoryContext);

	entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);

	MemoryContextSwitchTo(oldctx);
}
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:392

References CacheMemoryContext, lappend_xid(), MemoryContextSwitchTo(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

Variable Documentation

◆ in_streaming

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 39 of file pgoutput.c.

◆ publications_valid

bool publications_valid
static

Definition at line 83 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ publish_no_origin

bool publish_no_origin
static

Definition at line 85 of file pgoutput.c.

Referenced by parse_output_parameters(), and pgoutput_origin_filter().

◆ RelationSyncCache